Constant width Used for program listings, as well as within paragraphs to refer to program elements such as variable or function names, databases, data types, environment variables, statements, and keywords.
Constant width bold Shows commands or other text that should be typed literally by the user. Constant width italic Shows text that should be replaced with user-supplied values or by values determined by context. This icon indicates a warning or caution. Using Code Examples This book is here to help you get your job done. In general, you may use the code in this book in your programs and documentation.
For example, writing a program that uses several chunks of code from this book does not require permission. Answering a question by citing this book and quoting example code does not require permission. We appreciate, but do not require, attribution. An attribution usually includes the title, author, publisher, and ISBN.
Copyright Eric Sammer. Technology professionals, software developers, web designers, and business and creative professionals use Safari Books Online as their primary resource for research, problem solving, learning, and certification training. Safari Books Online offers a range of product mixes and pricing programs for organizations, government agencies, and individuals.
For more information about Safari Books Online, please visit us online. To comment or ask technical questions about this book, send email to bookquestions@oreilly.com. None of this was possible without the support and hard work of the larger Apache Hadoop community and ecosystem projects. I want to encourage all readers to get involved in the community and open source in general. Both Matt and Tom White coached me through the proposal process.
Aparna Ramani, Rob Weltman, Jolly Chen, and Helen Friedland were instrumental throughout this process and forgiving of my constant interruptions of their teams. Special thanks to Christophe Bisciglia for giving me an opportunity at Cloudera and for the advice along the way. To those whom I may have omitted from this list, please forgive me. Finally, a special thank you to Kathy Sammer for her unwavering support, and for teaching me to do exactly what others say you cannot.
Companies are storing more data from more sources in more formats than ever before. Organizations are finding new ways to use data that was previously believed to be of little value, or far too expensive to retain, to better serve their constituents. Sourcing and storing data is one half of the equation. Processing that data to produce information is fundamental to the daily operations of every modern business.
Fraud detection in commerce and finance, anomaly detection in operational systems, demographic analysis in advertising, and many other applications have had to deal with these issues for decades. What has happened is that the volume, velocity, and variety of this data have changed, and in some cases, rather dramatically. This makes sense, as many algorithms benefit from access to more data. Take, for instance, the problem of recommending products to a visitor of an ecommerce website.
You could simply show each visitor a rotating list of products they could buy, hoping that one would appeal to them. The question is, what do you need to improve the chance of showing the right person the right product? Customers who already bought a specific brand of laptop computer from you may be interested in compatible accessories and upgrades. No matter the solution, all of the algorithms behind these options require data.
I once worked on a data-driven marketing project for a company that sold beauty products. Using purchase transactions of all customers over a long period of time, the company was able to predict when a customer would run out of a given product after purchasing it. As it turned out, simply offering them the same thing about a week before they ran out resulted in a very noticeable lift in sales.
Knowing more about a problem space generally leads to better decisions (or algorithm efficacy), which in turn leads to happier users, more money, reduced fraud, healthier people, safer conditions, or whatever the desired result might be. Apache Hadoop is a platform that provides pragmatic, cost-effective, scalable infrastructure for building many of the types of applications described earlier. Made up of a distributed filesystem called the Hadoop Distributed Filesystem (HDFS) and a computation layer that implements a processing paradigm called MapReduce, Hadoop is an open source, batch data processing system for enormous amounts of data.
We live in a flawed world, and Hadoop is designed to survive in it by not only tolerating hardware and software failures, but also treating them as first-class conditions that happen regularly. Hadoop uses a cluster of plain old commodity servers with no specialized hardware or network infrastructure to form a single, logical storage and compute platform, or cluster, that can be shared by multiple individuals or groups.
Computation in Hadoop MapReduce is performed in parallel, automatically, with a simple abstraction for developers that obviates complex synchronization and network programming. Unlike many other distributed data processing systems, Hadoop runs the user-provided processing logic on the machine where the data lives rather than dragging the data across the network; a huge win for performance. For those interested in the history, Hadoop was modeled after two papers produced by Google, one of the many companies to have these kinds of data-intensive processing problems.
The first, presented in 2003, describes a pragmatic, scalable, distributed filesystem optimized for storing enormous datasets, called the Google Filesystem, or GFS. In addition to simple storage, GFS was built to support large-scale, data-intensive, distributed processing applications. The following year, another paper, titled "MapReduce: Simplified Data Processing on Large Clusters," was presented, defining a programming model and accompanying framework that provided automatic parallelization, fault tolerance, and the scale to process hundreds of terabytes of data in a single job over thousands of machines.
When paired, these two systems could be used to build large data processing clusters on relatively inexpensive, commodity machines. Interest and investment in Hadoop has led to an entire ecosystem of related software, both open source and commercial. Within the Apache Software Foundation alone, projects that explicitly make use of, or integrate with, Hadoop are springing up regularly. Some of these projects make authoring MapReduce jobs easier and more accessible, while others focus on getting data in and out of HDFS, simplify operations, enable deployment in cloud environments, and so on.
Here is a sampling of the more popular projects with which you should familiarize yourself: Apache Hive Hive creates a relational database-style abstraction that allows developers to write a dialect of SQL, which in turn is executed as one or more MapReduce jobs on the cluster. Because so many developers and analysts already know SQL, Hive provides a quick way to reduce the learning curve to adopting Hadoop and writing MapReduce jobs. For this reason, Hive is by far one of the most popular Hadoop ecosystem projects.
Hive works by defining a table-like schema over an existing set of files in HDFS and handling the gory details of extracting records from those files when a query is run. The data on disk is never actually changed, just parsed at query time. HiveQL statements are interpreted and an execution plan of prebuilt map and reduce classes is assembled to perform the MapReduce equivalent of the SQL statement.
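This schema-on-read behavior can be illustrated with a toy sketch (plain Python, not Hive's actual implementation): a schema is applied to raw delimited text only while records are being extracted, and the files themselves are never rewritten.

```python
def scan(lines, schema):
    """Parse delimited text records at query time, Hive-style schema-on-read.

    The input lines are never modified; the schema (a list of
    (column_name, type) pairs) is applied only during extraction.
    """
    for line in lines:
        fields = line.rstrip("\n").split("\t")
        # Zip each raw field with its declared column and cast it on the fly.
        yield {name: cast(raw) for (name, cast), raw in zip(schema, fields)}

# Hypothetical two-column table over tab-separated text:
rows = list(scan(["1\talice\n", "2\tbob\n"], [("id", int), ("name", str)]))
```

The point is that the "table" exists only in the reader's interpretation; changing the schema changes nothing on disk.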
Apache Pig Like Hive, Pig spares developers from writing raw MapReduce code by hand. Instead, users write data processing jobs in a high-level scripting language from which Pig builds an execution plan and executes a series of MapReduce jobs to do the heavy lifting. Apache Sqoop Not only does Hadoop not want to replace your database, it wants to be friends with it. Exchanging data with relational databases is one of the most popular integration points with Apache Hadoop. Using MapReduce, Sqoop performs these operations in parallel with no need to write code. Sqoop supports a variety of databases through pluggable connectors. Many of these connectors are open source, while others are free or available from commercial vendors at a cost.
Apache Flume Apache Flume is a streaming data collection and aggregation system designed to transport massive volumes of data into systems such as Hadoop. It offers native connectivity and support for writing directly to HDFS, and simplifies reliable, streaming data delivery from a variety of sources including RPC services, log4j appenders, syslog, and even the output from OS commands.
Data can be routed, load-balanced, replicated to multiple destinations, and aggregated from thousands of hosts by a tier of agents. Apache Oozie Apache Oozie is a workflow engine and scheduler built specifically for large-scale job orchestration on a Hadoop cluster. Workflows can be triggered by time or events such as data arriving in a directory, and job failure handling logic can be implemented so that policies are adhered to.
Oozie presents a REST service for programmatic management of workflows and status retrieval. Apache Whirr Run as a command-line tool either locally or within the cloud, Whirr can spin up instances, deploy Hadoop, configure the software, and tear it down on demand. Under the hood, Whirr uses the powerful jclouds library so that it is cloud provider-neutral. Apache HBase Data in HBase is stored in a semi-columnar format partitioned by rows into regions.
Today, HBase is used to serve huge amounts of data to real-time systems in major production deployments. Apache ZooKeeper A true workhorse, Apache ZooKeeper is a distributed, consensus-based coordination system used to support distributed applications. Distributed applications that require leader election, locking, group membership, service location, and configuration services can use ZooKeeper rather than reimplement the complex coordination and error handling that comes with these functions. In fact, many projects within the Hadoop ecosystem use ZooKeeper for exactly this purpose (most notably, HBase).
Apache HCatalog A relatively new entry, Apache HCatalog is a service that provides shared schema and data access abstraction services to applications within the ecosystem. The long-term goal of HCatalog is to enable interoperability between tools such as Apache Hive and Pig so that they can share dataset metadata information. The Hadoop ecosystem is exploding into the commercial world as well.
Hadoop is fast becoming (or, as a growing group would argue, has already become) the de facto standard for truly large-scale data processing in the data center. Those in the latter camp may be rolling their eyes at the prospect of dealing with yet another system. Administrators do, however, play an absolutely critical role in planning, installing, configuring, maintaining, and monitoring Hadoop clusters. Hadoop is a comparatively low-level system, leaning heavily on the host operating system for many features, and it works best when developers and administrators collaborate regularly.
What you do impacts how things work. The so-called big data space is all the rage, sure, but more importantly, Hadoop is growing and changing at a staggering rate. Each new version (and there have been a few big ones in the past year or two) brings another truckload of features for both developers and administrators alike. Where necessary, the differences are called out, and a section in Chapter 4 is devoted to walking you through the most commonly encountered versions.
This book is intended to be a pragmatic guide to running Hadoop in production. Those who have some familiarity with Hadoop may already know alternative methods for installation or have differing thoughts on how to properly tune the number of map slots based on CPU utilization. The goal is not to enumerate all possible scenarios, but rather to call out what works, as demonstrated in critical deployments. Chapters 2 and 3 provide the necessary background, describing what HDFS and MapReduce are, why they exist, and at a high level, how they work.
Chapter 4 walks you through the process of planning for a Hadoop deployment, including hardware selection, basic resource planning, operating system selection and configuration, Hadoop distribution and version selection, and network concerns for Hadoop clusters.
Those that have strong security requirements or want to understand identity, access, and authorization within Hadoop will want to pay particular attention to Chapter 6. Chapter 7 explains the nuts and bolts of sharing a single large cluster across multiple groups and why this is beneficial while still adhering to service-level agreements by managing and allocating resources accordingly. Once everything is up and running, Chapter 8 acts as a run book for the most common operations and tasks.
Chapter 9 is the rainy day chapter, covering the theory and practice of troubleshooting complex distributed systems such as Hadoop, including some real-world war stories. In an attempt to minimize those rainy days, Chapter 10 is all about how to effectively monitor your Hadoop cluster. Finally, Chapter 11 provides some basic tools and techniques for backing up Hadoop and dealing with catastrophic failure.
HDFS was built to support high throughput, streaming reads and writes of extremely large files. Traditional large storage area networks (SANs) and network attached storage (NAS) offer centralized, low-latency access to either a block device or a filesystem on the order of terabytes in size. These systems are fantastic as the backing store for relational databases, content delivery systems, and similar types of data storage needs because they can support full-featured POSIX semantics, scale to meet the size requirements of these systems, and offer low-latency access to data.
Imagine for a second, though, hundreds or thousands of machines all waking up at the same time and pulling hundreds of terabytes of data from a centralized storage system at once. HDFS takes a different approach: it accomplishes availability and high throughput through application-level replication of data, and it assumes that batch performance is more important than interactive response times. See Chapter 3 for details. MapReduce, for instance, takes advantage of how the data in HDFS is split on ingestion into blocks and pushes computation to the machine where blocks can be read locally.
Design HDFS, in many ways, follows traditional filesystem design. Files are stored as opaque blocks and metadata exists that keeps track of the filename to block mapping, directory tree structure, permissions, and so forth. This is similar to common Linux filesystems such as ext3. So what makes HDFS different? Traditional filesystems are implemented as kernel modules (in Linux, at least) and, together with userland tools, can be mounted and made available to end users. HDFS, by contrast, is a userspace filesystem.
This is a fancy way of saying that the filesystem code runs outside the kernel as OS processes and, by extension, is not registered with or exposed via the Linux VFS layer. While this is much simpler, more flexible, and arguably safer to implement, it means that you don't mount HDFS as you would ext3, for instance, and that it requires applications to be explicitly built for it. In addition to being a userspace filesystem, HDFS is a distributed filesystem. Distributed filesystems are used to overcome the limits of what an individual disk or machine is capable of supporting. Each machine in a cluster stores a subset of the data that makes up the complete filesystem with the idea being that, as we need to store more block data, we simply add more machines, each with multiple disks.
Another major difference between HDFS and other filesystems is its block size. It is common that general purpose filesystems use a 4 KB or 8 KB block size for data. Hadoop, on the other hand, uses the significantly larger block size of 64 MB by default. Increasing the block size means data will be written in larger contiguous chunks on disk, which in turn means data can be written and read in larger sequential operations.
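A quick back-of-the-envelope calculation shows why the larger block size also matters for metadata: each block is an entry the namenode must track, so fewer, larger blocks mean far less bookkeeping. This is plain Python arithmetic with a hypothetical 1 TB file:

```python
def block_count(file_size, block_size):
    """Number of blocks needed to store a file of file_size bytes."""
    return -(-file_size // block_size)  # ceiling division for positive ints

one_tb = 1 << 40  # 1 TB, a hypothetical very large file

# At a typical local-filesystem block size versus HDFS's 64 MB default:
ext_blocks = block_count(one_tb, 4 * 1024)            # 4 KB blocks
hdfs_blocks = block_count(one_tb, 64 * 1024 * 1024)   # 64 MB blocks
```

With 4 KB blocks the file occupies over 268 million blocks; at 64 MB it occupies 16,384, a difference of four orders of magnitude in tracking overhead.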
Rather than rely on specialized storage subsystem data protection, HDFS replicates each block to multiple machines in the cluster. By default, each block in a file is replicated three times. Because files in HDFS are write once, once a replica is written, it is not possible for it to change. Having multiple replicas means multiple machine failures are easily tolerated, but there are also more opportunities to read data from a machine closest to an application on the network.
HDFS actively tracks and manages the number of available replicas of a block as well. Should the number of copies of a block drop below the configured replication factor, the filesystem automatically makes a new copy from one of the remaining replicas. Daemons There are three daemons that make up a standard HDFS cluster, each of which serves a distinct role, shown in the following table.

Table: HDFS daemons

  Daemon               Per cluster   Purpose
  Namenode             1             Stores filesystem metadata, stores the file to block map, and provides a global picture of the filesystem
  Secondary namenode   1             Performs internal namenode transaction log checkpointing
  Datanode             Many          Stores block data (file contents)

Blocks are nothing more than chunks of a file, binary blobs of data.
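The replica tracking described above (detecting blocks whose live replica count has fallen below the configured factor) can be sketched as a simple pass over a block-to-datanode map. This is a toy model in Python, not Hadoop's actual code; the names are invented:

```python
def under_replicated(block_locations, live_nodes, replication=3):
    """Return blocks whose live replica count is below the target factor.

    block_locations: dict of block_id -> set of datanode names holding it
    live_nodes: set of datanodes currently heartbeating
    Blocks with zero live replicas are lost, not re-replicable,
    so they are excluded here.
    """
    result = {}
    for block_id, nodes in block_locations.items():
        live = nodes & live_nodes
        if 0 < len(live) < replication:
            result[block_id] = replication - len(live)  # copies still needed
    return result

# Hypothetical cluster state: dn4 has died, leaving b2 with one live replica.
locations = {"b1": {"dn1", "dn2", "dn3"}, "b2": {"dn1", "dn4"}}
missing = under_replicated(locations, {"dn1", "dn2", "dn3"})
```

A real namenode would then schedule copies from one of the surviving replicas; the sketch only identifies the deficit.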
In production systems, these disks are usually reserved exclusively for Hadoop. Storage can be added to a cluster by adding more datanodes with additional disk capacity, or even adding disks to existing datanodes. This keeps with the commodity hardware design goal and reduces cost as clusters grow in size.
Rather than rely on a RAID controller for data safety, block data is simply written to multiple machines. Having multiple copies of each block on separate machines means that not only are we protected against data loss if a machine disappears, but during processing, any copy of this data can be used. By having more than one option, the scheduler that decides where to perform processing has a better chance of being able to do so on a machine that holds the data.
This is covered in greater detail in Chapter 3. The lack of RAID can be controversial. In fact, many believe RAID simply makes disks faster, akin to a magic go-fast turbo button. This, however, is not always the case. Typically, datanodes have a large number of independent disks, each of which stores full blocks. While datanodes are responsible for storing block data, the namenode (NN) is the daemon that stores the filesystem metadata and maintains a complete picture of the filesystem.
Datanodes regularly report their status to the namenode in a heartbeat. This means that, at any given time, the namenode has a complete view of all datanodes in the cluster, their current health, and what blocks they have available. The block report is simply a list of all blocks the datanode currently has on its disks and allows the namenode to keep track of any changes.
This is also necessary because, while the file to block mapping on the namenode is stored on disk, the locations of the blocks are not written to disk. This may seem counterintuitive at first, but it means a change in IP address or hostname of any of the datanodes does not impact the underlying storage of the filesystem metadata. Another nice side effect of this is that, should a datanode experience failure of a motherboard, administrators can simply remove its hard drives, place them into a new chassis, and start up the new machine.
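Because block locations live only in memory and are rebuilt entirely from datanode reports, the namenode's bookkeeping can be modeled as below. This is an illustrative Python sketch; the class and method names are made up, and the real namenode's data structures are far more elaborate:

```python
class BlockMap:
    """In-memory block-to-datanode map, rebuilt from block reports.

    Nothing here is persisted: if the process restarts, the map is
    empty until every datanode reports again.
    """
    def __init__(self):
        self.locations = {}  # block_id -> set of datanode names

    def process_report(self, datanode, block_ids):
        # A full report replaces everything previously known about this
        # datanode, so first drop its stale entries...
        for nodes in self.locations.values():
            nodes.discard(datanode)
        # ...then record exactly what it says it has now.
        for block_id in block_ids:
            self.locations.setdefault(block_id, set()).add(datanode)
```

Notice that a datanode changing hostname or chassis is invisible to this structure: the next report under the new identity simply repopulates the map.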
As far as the namenode is concerned, the blocks have simply moved to a new datanode. The downside is that, when initially starting a cluster (or restarting it, for that matter), the namenode must wait to receive block reports from all datanodes to know all blocks are present.
The namenode filesystem metadata is served entirely from RAM for fast lookup and retrieval, and thus places a cap on how much metadata the namenode can handle. Finally, the third HDFS process is called the secondary namenode and performs some internal housekeeping for the namenode. Despite its name, the secondary namenode is not a backup for the namenode and performs a completely different function. The secondary namenode may have the worst name for a process in the history of computing. It has tricked many new to Hadoop into believing that, should the evil robot apocalypse occur, their cluster will continue to function when their namenode becomes sentient and walks out of the data center.
This library encapsulates most of the gory details related to communicating with the namenode and datanodes when necessary, as well as dealing with the numerous failure cases that can occur when working with a distributed filesystem. As shown in the accompanying figure, the client begins by contacting the namenode, indicating which file it would like to read. The client identity is first validated (either by trusting the client and allowing it to specify a username or by using a strong authentication mechanism such as Kerberos; see Chapter 6) and then checked against the owner and permissions of the file.
If the file exists and the user has access to it, the namenode responds to the client with the first block ID and the list of datanodes on which a copy of the block can be found, sorted by their distance to the client. With the block IDs and datanode hostnames, the client can now contact the most appropriate datanode directly and read the block data it needs. This process repeats until all blocks in the file have been read or the client closes the file stream.
Rather than give up, the library will automatically attempt to read another replica of the data from another datanode. If all replicas are unavailable, the read operation fails and the client receives an exception. Another corner case that can occur is that the information returned by the namenode about block locations can be outdated by the time the client attempts to contact a datanode, in which case either a retry will occur if there are other replicas or the read will fail.
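This retry-across-replicas behavior can be sketched in a few lines. This is a toy model in Python, not the Hadoop client library; `fetch` is a hypothetical stand-in for the actual datanode transfer protocol:

```python
def read_block(replicas, fetch):
    """Try replicas in order; fall back to the next one on failure.

    replicas: datanode names sorted nearest-first, as the namenode
              returns them to the client
    fetch:    callable(datanode) -> bytes, raising IOError on failure
    """
    last_error = None
    for datanode in replicas:
        try:
            return fetch(datanode)
        except IOError as err:
            last_error = err  # dead node or stale location; try the next
    # Only when every replica has failed does the caller see an exception.
    raise IOError("all replicas failed") from last_error
```

The sorted-by-distance list means the common case reads from the nearest copy, while failures cost one extra round trip per dead replica rather than a failed read.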
While rare, these kinds of corner cases are part of what makes troubleshooting a large distributed system such as Hadoop so complex. See Chapter 9 for a tour of what can go wrong and how to diagnose the problem. Remember that clients need not actually implement this logic; this is simply an overview of how data is written to the cluster by the underlying Hadoop library. Application developers use mostly familiar APIs to open files, write to a stream, and close them similarly to how they would with traditional local files.
A request is sent to the namenode to create the file metadata if the user has the necessary permissions to do so. The metadata entry for the new file is made; however, it initially has no associated blocks. A response to the client indicates the open request was successful and that it may now begin writing data. As the client writes data to the stream it is split into packets (not to be confused with TCP packets or HDFS blocks), which are queued in memory.
A separate thread in the client consumes packets from this queue and, as necessary, contacts the namenode requesting a set of datanodes to which replicas of the next block should be written. The client then makes a direct connection to the first datanode in the list, which makes a connection to the second, which connects to the third. This forms the replication pipeline to be used for this block of data, as shown in the accompanying figure. Data packets are then streamed to the first datanode, which writes the data to disk, and to the next datanode in the pipeline, which writes to its disk, and so on.
The client application maintains a list of packets for which acknowledgments have not yet been received and when it receives a response, it knows the data has been written to all nodes in the pipeline.
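The acknowledgment tracking described above can be sketched as follows. This is toy Python; a real client tracks acknowledgments per pipeline node with considerably more machinery, and `send` here is a hypothetical stand-in for streaming a packet down the pipeline:

```python
def pipeline_write(packets, pipeline, send):
    """Stream packets through a replication pipeline, tracking acks.

    pipeline: ordered list of datanodes forming the pipeline
    send:     callable(packet, pipeline) -> set of nodes that acknowledged

    Returns the packets not acknowledged by every node; on a pipeline
    failure these are exactly the packets that must be re-sent.
    """
    unacked = []
    for packet in packets:
        acked_by = send(packet, pipeline)
        if set(acked_by) != set(pipeline):
            # Not confirmed on every replica: keep it for re-delivery.
            unacked.append(packet)
    return unacked
```

A packet leaves the pending list only once every node in the pipeline has acknowledged it, which is precisely why the client can safely re-queue everything since the last ack when a node dies mid-write.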
This process of writing packets to the pipeline continues until the block size is reached, at which point the client goes back to the namenode for the next set of datanodes to write to. Of course, things are not always this simple, and failures can occur.

(Figure: The HDFS write path)

The most common type of failure is that a datanode in the replication pipeline fails to write data for one reason or another (a disk dies or a datanode fails completely, for instance). When this happens, the pipeline is immediately closed and all packets that had been sent since the last acknowledgment are pushed back into the queue to be written so that any datanodes past the failed node in the pipeline will receive the data. The current block is given a new ID on the remaining healthy datanodes. This is done so that, should the failed datanode return, the abandoned block will appear to not belong to any file and be discarded automatically.
A new replication pipeline containing the remaining datanodes is opened and the write resumes. At this point, things are mostly back to normal and the write operation continues until the file is closed. The namenode will notice that one of the blocks in the file is under-replicated and will arrange for a new replica to be created asynchronously. A client can recover from multiple failed datanodes provided at least a minimum number of replicas are written (by default, this is one). Managing Filesystem Metadata The namenode stores its filesystem metadata on local filesystem disks in a few different files, the two most important of which are fsimage and edits.
Just like a database would, fsimage contains a complete snapshot of the filesystem metadata whereas edits contains only the incremental changes made to that metadata. Upon namenode startup, the fsimage file is loaded into RAM and any changes in the edits file are replayed, bringing the in-memory view of the filesystem up to date.
In more recent versions of Hadoop (specifically, Apache Hadoop 2.0), the on-disk format has changed. Conceptually, metadata storage is similar, although transactions are no longer stored in a single edits file. Instead, the namenode periodically rolls the edits file (closes one file and opens a new file), numbering them by transaction ID. That being said, you should never make direct changes to these files unless you really know what you are doing. The rest of this book will simply refer to these files using their base names, fsimage and edits, to refer generally to their function.
Recall from earlier that the namenode writes changes only to its write ahead log, edits. Over time, the edits file grows and grows and, as with any log-based system such as this, would take a long time to replay in the event of server failure. Similar to a relational database, the edits file needs to be periodically applied to the fsimage file. The problem is that the namenode may not have the available resources (CPU or RAM) to do this while continuing to provide service to the cluster.
This is where the secondary namenode comes in. The exact interaction that occurs between the namenode and the secondary namenode (shown in the accompanying figure) is as follows:

1. The secondary namenode instructs the namenode to roll its edits file and begin writing to edits.new.
2. The secondary namenode loads fsimage, replays edits on top of it, and writes a new, compacted fsimage file to disk.
3. The secondary namenode sends the new fsimage file to the namenode, which adopts it.
4. The namenode renames edits.new to edits.

This process is slightly different for Apache Hadoop 2.0. Newer versions of Hadoop use a defined number of transactions rather than file size to determine when to perform a checkpoint.
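The merge at the heart of checkpointing can be modeled as replaying a list of logged operations over a snapshot. This is a toy sketch in Python with an invented three-field edit record; the real fsimage and edits files are binary formats with many more operation types:

```python
def checkpoint(fsimage, edits):
    """Merge an edit log into a metadata snapshot, secondary-namenode style.

    fsimage: dict of path -> metadata (a stand-in for the real snapshot)
    edits:   list of (op, path, value) tuples, oldest first

    Returns a new, compacted snapshot; the input snapshot is untouched,
    mirroring how the secondary writes a fresh fsimage file to disk.
    """
    merged = dict(fsimage)
    for op, path, value in edits:
        if op in ("create", "update"):
            merged[path] = value
        elif op == "delete":
            merged.pop(path, None)
    return merged

# Hypothetical log: create /b, modify /a, then delete /a.
snapshot = {"/a": 1}
log = [("create", "/b", 2), ("update", "/a", 3), ("delete", "/a", None)]
compacted = checkpoint(snapshot, log)
```

The payoff is that replay after a restart starts from the compacted snapshot plus a short log, rather than a log that has grown for months.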
Namenode High Availability As administrators responsible for the health and service of large-scale systems, the notion of a single point of failure should make us a bit uneasy or worse. Unfortunately, for a long time the HDFS namenode was exactly that: a single point of failure. Recently, the Hadoop community as a whole has invested heavily in making the namenode highly available, opening Hadoop to additional mission-critical deployments.
The edits write ahead log needs to be available to both namenodes, and therefore is stored on a shared storage device. Currently, an NFS filer is required as the shared storage, although there are plans to remove this dependency. Datanodes are also aware of both namenodes in an HA configuration and send block reports to both servers. A high-availability pair of namenodes can be configured for manual or automatic failover. In the default manual failover mode, a command must be sent to effect a state transition from one namenode to the other. When configured for automatic failover, each namenode runs an additional process called a failover controller that monitors the health of the process and coordinates state transitions.
Just as in other HA systems, there are two primary types of failover: graceful failover, initiated by an administrator, and nongraceful failover, which is the result of a detected fault in the active process.
If both processes were allowed to continue running, they could both write to the shared state and corrupt the filesystem metadata. This is commonly called a split brain scenario. Most administrators who want high availability will also want to configure automatic failover as well. See the accompanying figure for an example of automatic failover. When running with high availability, the standby namenode takes over the role of the secondary namenode, described earlier.
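The guard against split brain (promote the standby only once the old active is positively fenced off from shared state) can be captured in a small sketch. This is illustrative Python; `fence` and `promote` are hypothetical stand-ins for real fencing scripts and state transitions:

```python
def fail_over(active, standby, fence, promote):
    """Promote the standby only after the old active is positively fenced.

    fence:   callable(node) -> bool, True once the node can no longer
             write to the shared edits storage
    promote: callable(node) that transitions it to the active state
    """
    if not fence(active):
        # Without confirmed fencing, promotion risks two writers
        # corrupting the shared filesystem metadata (split brain).
        raise RuntimeError("refusing to promote: %s not fenced" % active)
    promote(standby)
    return standby
```

The important property is the ordering: fencing must succeed, and be confirmed, before the standby ever touches shared state.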
In other words, there is no separate secondary namenode process in an HA cluster, only a pair of namenode processes. Those that already run Hadoop clusters that have a dedicated machine on which they run the secondary namenode process can repurpose that machine to be a second namenode in most cases. These tools, after all, support health checks, in and out of band communication, and fencing plug-ins already. Unfortunately, HA is a tougher nut to crack than simply killing a process and starting a new one elsewhere.
The real challenge with implementing a highly available namenode stems from the fact that datanode block reports are not written to disk. As we saw earlier, receiving and processing block reports from hundreds or thousands of machines is actually the part of cluster startup that takes time, on the order of tens of minutes or more.
This type of interruption is still far outside of the acceptable service-level agreement for many mission-critical systems. Namenode Federation Large-scale users of Hadoop have had another obstacle with which to contend: the limit of how much metadata the namenode can store in memory. In order to scale the namenode beyond the amount of physical memory that could be stuffed into a single server, there needed to be a way to move from a scale-up to a scale-out approach.
This technique is called namespace federation and refers to assembling one logical namespace from a number of autonomous systems. An example of a federated namespace is the Linux filesystem: many devices can be mounted at various points to form a single namespace that clients can address without concern for which underlying device actually contains the data. Namenode federation (shown in the accompanying figure) works around the memory limitation of the namenode by allowing the filesystem namespace to be broken up into slices and spread across multiple namenodes.
Just as it sounds, this is really just like running a number of separate namenodes, each of which is responsible for a different part of the directory structure. The one major way in which namenode federation is different from running several discrete clusters is that each datanode stores blocks for multiple namenodes. While blocks from different pools are stored on the same disks (there is no physical separation), they are logically exclusive.
Each datanode sends heartbeats and block reports to each namenode. Clients often do not want to have to worry about multiple namenodes, so a special client API implementation called ViewFS can be used that maps slices of the filesystem to the proper namenode. Federation also allows us to use namespace partitioning to control the availability and fault tolerance of different slices of the filesystem.
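A rough sketch of what a ViewFS-style client does under the hood: it resolves a path against a mount table of namespace slices using a longest-prefix match. The mount points and namenode addresses below are purely illustrative assumptions, and the real implementation lives in the Hadoop Java client, not in Python.

```python
# Hypothetical mount table mapping namespace slices to namenodes.
# All names and addresses here are illustrative, not real defaults.
MOUNT_TABLE = {
    "/user": "hdfs://namenode1:8020",
    "/data/logs": "hdfs://namenode2:8020",
    "/data": "hdfs://namenode3:8020",
}

def resolve_namenode(path):
    """Return the namenode serving a path, via longest-prefix match."""
    best = None
    for mount, namenode in MOUNT_TABLE.items():
        if path == mount or path.startswith(mount + "/"):
            # Keep the most specific (longest) matching mount point.
            if best is None or len(mount) > len(best[0]):
                best = (mount, namenode)
    if best is None:
        raise ValueError("no mount covers " + path)
    return best[1]
```

With this table, a path under /data/logs resolves to the second namenode even though /data is also mounted, because the longer prefix wins.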
Figure: Namenode federation overview

That is, it is possible to enable them independently of each other, as they speak to two different problems. This means a namespace can be partitioned, and some (or all) of those partitions may be served by an HA pair of namenodes. All other access methods are built on top of this API and, by definition, can expose only as much functionality as it permits. The API does differ where necessary in order to provide the features and guarantees it advertises, but most of these are obvious or documented.
In order to access HDFS, clients (applications written against the API) must have a copy of configuration data that tells them where the namenode is running. This is analogous to an Oracle client application requiring the tnsnames.ora file. Each application must also have access to the Hadoop library JAR file. Clients can be on the same physical machines as any of the Hadoop daemons, or they can be separate from the cluster proper.
They just happen to be running on the same physical machines where HDFS stores its block data.

Command-Line Tools

Hadoop comes with a number of command-line tools that enable basic filesystem operations. Running hadoop fs will display basic usage information, as shown in Example . If the full URL syntax is not used, the value is taken from the fs.default.name configuration property.
Files are uploaded using either -put or the synonym -copyFromLocal and are downloaded with -get or -copyToLocal. This can be done by using the -setrep command, which takes a replication factor and an optional flag (-R) to indicate it should operate recursively (see Example ). You may notice that only files have a block list. Directories in HDFS are purely metadata entries and have no block data. That is, development of a kernel module is not required.
This allows legacy applications and systems to continue to read and write files to a regular directory on a Linux server that is backed by HDFS. All of the properties of HDFS are still present: no in-place modification of files, comparatively high latency, poor random access performance, optimization for large streaming operations, and huge scale. It is only a compatibility layer that can expose HDFS to applications that perform only basic file operations. WebHDFS is available starting with Apache Hadoop 1.0. Because it is served by the embedded web server in each daemon, WebHDFS clients must be able to communicate with each node of the cluster, just like native Java clients.
HttpFS primarily exists to solve this problem and instead acts as a gateway service that can span network segments. The upside to HttpFS is that it minimizes the footprint required to communicate with the cluster, but at the cost of total scale and capacity, because all data between clients and HDFS must now travel through a single node. Of course, it is perfectly fine to run multiple HttpFS proxies to overcome this problem.
The decision can be based on the required data throughput as well as network design and security requirements. Designed to simplify the development of large-scale, distributed, fault-tolerant data processing applications, MapReduce is foremost a way of writing applications.
In MapReduce, developers write jobs that consist primarily of a map function and a reduce function, and the framework handles the gory details of parallelizing the work, scheduling parts of the job on worker machines, monitoring for and recovering from failures, and so forth. Developers are shielded from having to implement complex and repetitious code and instead focus on algorithms and business logic. User-provided code is invoked by the framework rather than the other way around. This is much like Java application servers that invoke servlets upon receiving an HTTP request; the container is responsible for setup and teardown as well as providing a runtime environment for user-supplied code.
J. Dean and S. Ghemawat, "MapReduce: Simplified Data Processing on Large Clusters" (OSDI 2004). Hadoop MapReduce is an open source implementation of the model described in this paper and tracks the implementation closely. Specifically developed to deal with large-scale workloads, MapReduce provides the following features:

Simplicity of development
MapReduce is dead simple for developers: no socket programming, no threading or fancy synchronization logic, no management of retries, no special techniques to deal with enormous amounts of data.
Developers use functional programming concepts to build data processing applications that operate on one record at a time. The reduce function then operates on the intermediate key-value pairs, processing all values that have the same key together and outputting the result.
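The split of responsibilities just described can be illustrated with a toy, in-process sketch (this is not the Hadoop API): the "framework" function applies a user-supplied map function to every record, groups the intermediate pairs by key, and hands each key together with all of its values to a user-supplied reduce function. The word-count functions are the classic illustration, not code from the book.

```python
from collections import defaultdict

def run_mapreduce(records, map_fn, reduce_fn):
    """Toy framework: map every record, group by key, then reduce each group."""
    grouped = defaultdict(list)
    for record in records:                     # map phase
        for key, value in map_fn(record):
            grouped[key].append(value)
    # reduce phase: all values sharing a key are processed together
    return {key: reduce_fn(key, values) for key, values in grouped.items()}

# User-supplied functions: emit (word, 1) per word; sum the ones per word.
def word_map(line):
    for word in line.split():
        yield word, 1

def word_reduce(word, counts):
    return sum(counts)
```

Note that the framework, not the user code, owns iteration, grouping, and invocation order; the user supplies only the two per-record and per-group functions.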
These primitives can be used to implement filtering, projection, grouping, aggregation, and other common data processing functions.

Scale
Since tasks do not communicate with one another explicitly and do not share state, they can execute in parallel and on separate machines. Additional machines can be added to the cluster and applications immediately take advantage of the additional hardware with no change at all.
MapReduce is designed to be a shared-nothing system. The framework is responsible for splitting a MapReduce job into tasks. Tasks are then executed on worker nodes or (less pleasantly) slaves. MapReduce treats failure as a first-class citizen and supports reexecution of failed tasks on healthy worker nodes in the cluster. Should a worker node fail, all tasks are assumed to be lost, in which case they are simply rescheduled elsewhere.
The unit of work is always the task, and it either completes successfully or it fails completely. In MapReduce, users write a client application that submits one or more jobs that contain user-supplied map and reduce code and a job configuration file to a cluster of machines. The job contains a map function and a reduce function, along with job configuration information that controls various aspects of its execution. A job processes an input dataset specified by the user and usually outputs one as well. Commonly, the input and output datasets are one or more files on a distributed filesystem.
The Stages of MapReduce

A MapReduce job is made up of four distinct stages, executed in order: client job submission, map task execution, shuffle and sort, and reduce task execution. Client applications can really be any type of application the developer desires, from command-line tools to services. The job itself is made up of code written by a developer against the MapReduce APIs and the configuration, which specifies things such as the input and output datasets.
As described earlier, the client application submits a job to the cluster using the framework APIs. A master process, called the jobtracker in Hadoop MapReduce, is responsible for accepting these submissions (more on the role of the jobtracker later). The framework gets to decide how to split the input dataset into chunks, or input splits, of data that can be processed in parallel. In Hadoop MapReduce, the component that does this is called an input format, and Hadoop comes with a small library of them for common file formats.
A sample of log records might look something like this:

[INFO - com.Main] Application started!
[WARN - com.Main] Something hinky is going down.
[INFO - com.Main] False alarm. No worries.
[INFO - com.Main] Good morning. It's coffee time.

For each input split, a map task is created that runs the user-supplied map function on each record in the split. Map tasks are executed in parallel. This means each chunk of the input dataset is being processed at the same time by various machines that make up the cluster. The map function takes a key-value pair as input and produces zero or more intermediate key-value pairs.
The input format is responsible for turning each record into its key-value pair representation. For now, trust that one of the built-in input formats will turn each line of the file into a value with the byte offset into the file provided as the key. Getting back to our example, we want to write a map function that will filter records for those within a specific timeframe, and then count all events of each severity.
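As a sketch of the map logic just described (in Python rather than Hadoop's Java API), the function below receives a (byte offset, line) pair from the input format and emits a (severity, 1) pair for each parseable record. The bracketed [LEVEL - class] layout is taken from the sample records above; the date filter from the running example is omitted here because the timestamp format is not reproduced in this excerpt.

```python
import re

# Matches the "[LEVEL - class]" fragment seen in the sample log records.
SEVERITY = re.compile(r"\[(\w+) - ")

def log_map(offset, line):
    """Map function sketch: emit (severity, 1) for each parseable record.

    The key (byte offset) is ignored here; a real job would also filter
    on the record's timestamp, which this sketch omits.
    """
    match = SEVERITY.search(line)
    if match:
        yield match.group(1), 1
```

Feeding the four sample records through this function would yield three (INFO, 1) pairs and one (WARN, 1) pair, which is exactly the intermediate data the reducer will later count.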
A map function can do just about whatever it wants with each record. First, we see that the key INFO repeats, which makes sense because our sample contained three INFO records that would have matched the date in question. The other notable effect is that the output records are not in the order we would expect. This is because the framework sorts the output of each map task by its key. Just like outputting the value 1 for each record, the rationale behind sorting the data will become clear in a moment. Further, each key is assigned to a partition using a component called the partitioner.
In Hadoop MapReduce, the default partitioner implementation is a hash partitioner that takes a hash of the key, modulo the number of configured reducers in the job, to get a partition number. Because the hash implementation used by Hadoop ensures the hash of the key INFO is always the same on all machines, all INFO records are guaranteed to be placed in the same partition. For all intents and purposes, you can picture a partition number next to each record; it would be the same for all records with the same key. See Figure for a high-level overview of the execution of the map phase.
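The partitioner's behavior can be sketched in a few lines. Python's built-in hash() is randomized between processes, so this sketch uses a simple deterministic hash in the spirit of Java's String.hashCode(); the constants are illustrative, and the point is only that the same key always yields the same partition number on every machine.

```python
def stable_hash(key):
    """A deterministic string hash, loosely shaped like Java's hashCode."""
    h = 0
    for ch in key:
        h = (h * 31 + ord(ch)) & 0x7FFFFFFF  # keep it non-negative
    return h

def partition(key, num_reducers):
    """Hash partitioner: hash of the key, modulo the reducer count."""
    return stable_hash(key) % num_reducers
```

Because stable_hash("INFO") never changes, every (INFO, 1) pair lands in the same partition no matter which mapper produced it, which is the guarantee the shuffle relies on.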
A number of guarantees, however, are made to the developer with respect to the reducers that need to be fulfilled. For example, if a reducer receives the INFO key, it will always receive the three number 1 values. This makes sense given the preceding requirement. The next phase of processing, called the shuffle and sort, is responsible for enforcing these guarantees. When started, each reducer is assigned one of the partitions on which it should work. First, they copy the intermediate key-value data from each worker for their assigned partition. The reducer assigned partition 1, for example, would need to fetch each piece of its partition data from potentially every other worker in the cluster.
A logical view of the intermediate data across all machines in the cluster would show each worker holding a sorted run of key-value pairs for every partition.
To minimize the total runtime of the job, the framework is permitted to begin copying intermediate data from completed map tasks as soon as they are finished. Remember that the shuffle and sort is being performed by the reduce tasks, each of which takes up resources in the cluster. We want to start the copy phase soon enough that most of the intermediate data is copied before the final map task completes, but not so soon that the reduce tasks sit idle, taking up resources that could be used by other reduce tasks. See mapred.reduce.slowstart.completed.maps. Once the reducer has received its data, it is left with many small bits of its partition, each of which is sorted by key.
What we want is a single list of key-value pairs, still sorted by key, so we have all values for each key together. The easiest way to accomplish this is by performing a merge sort of the data. A merge sort takes a number of sorted items and merges them together to form a fully sorted list using a minimal amount of memory. Separate files are written so that reducers do not have to coordinate access to a shared file. This greatly reduces complexity and lets each reducer run at whatever speed it can.
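The merge described above can be sketched with Python's standard library: heapq.merge lazily merge-sorts any number of already-sorted runs using minimal memory, and itertools.groupby then yields each key together with all of its values, which is exactly the contract the reduce function expects. This is an illustration of the technique, not Hadoop's implementation.

```python
import heapq
from itertools import groupby
from operator import itemgetter

def merged_groups(runs):
    """Merge sorted runs of (key, value) pairs and group values by key.

    Each run is assumed to be sorted by key, as map output is; the merge
    is lazy, so the full dataset never needs to fit in memory at once.
    """
    merged = heapq.merge(*runs)
    for key, pairs in groupby(merged, key=itemgetter(0)):
        yield key, [value for _, value in pairs]
```

Applied to the running example, three sorted runs containing (INFO, 1) and (WARN, 1) pairs collapse into one group per severity, ready for a reduce function that simply sums the values.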
The format of the file depends on the output format specified by the author of the MapReduce job in the job configuration. To produce the same output in a relational database, we would use a SQL query that groups by severity and counts each group.

MapReduce is a batch data processing system
The design of MapReduce assumes that jobs will run on the order of minutes, if not hours. It is optimized for full table scan style operations. Consequently, it underwhelms when attempting to mimic low-latency, random access patterns found in traditional online transaction processing (OLTP) systems.
MapReduce is not a relational database killer, nor does it purport to be.

MapReduce is overly simplistic
One of its greatest features is also one of its biggest drawbacks: MapReduce is simple. In cases where a developer knows something special about the data and wants to make certain optimizations, he may find the model limiting. This can be very true.
Certainly for basic query-like functionality, no one wants to write map and reduce functions. Higher-level languages built atop MapReduce exist to simplify life, and unless you truly need the ability to touch terabytes or more of raw data, it can be overkill.

Not all algorithms can be parallelized
There are entire classes of problems that cannot easily be parallelized. The act of training a model in machine learning, for instance, cannot be parallelized for many types of models. This is true for many algorithms where there is shared state or dependent variables that must be maintained and updated centrally.
Sometimes such an algorithm can be restructured into a parallel form. Other times, while this is possible, it may not be ideal for a host of reasons. Knowing how to identify these kinds of problems and create alternative solutions is far beyond the scope of this book and an art in its own right. Hadoop MapReduce is inherently aware of HDFS and can use the namenode during the scheduling of tasks to decide the best placement of map tasks with respect to machines where there is a local copy of the data. This avoids a significant amount of network overhead during processing, as workers do not need to copy data over the network to access it, and it removes one of the primary bottlenecks when processing huge amounts of data.
Client applications written against the Hadoop APIs can submit jobs either synchronously, blocking for the result, or asynchronously, polling the master for job status. Cluster daemons are long-lived, while user tasks are executed in ephemeral child processes. Although executing a separate process incurs the overhead of launching a separate JVM, it isolates the framework from untrusted user code that could (and in many cases does) fail in destructive ways. Since MapReduce is specifically targeting batch processing tasks, the additional overhead, while undesirable, is not necessarily a showstopper.
One of the ingredients in the secret sauce of MapReduce is the notion of data locality, by which we mean the ability to execute computation on the same machine where the data being processed is stored. When a job executes, workers fetch the data from the central storage system, process it, and write the result back to the storage device. The problem is that this can lead to a storm effect when there are a large number of workers attempting to fetch the same data at the same time and, for large datasets, quickly causes bandwidth contention.
MapReduce flips this model on its head: rather than moving the data to the computation, it moves the computation to the data. Blocks that make up files are distributed to nodes when they are initially written, and when computation is performed, the user-supplied code is executed on a machine where the block is stored locally whenever possible. Remember that HDFS stores multiple replicas of each block.
This is not just for data availability in the face of failures, but also to increase the chance that a machine with a copy of the data has available capacity to run a task.

Daemons

There are two major daemons in Hadoop MapReduce: the jobtracker and the tasktracker.

Jobtracker

The jobtracker is the master process, responsible for accepting job submissions from clients, scheduling tasks to run on worker nodes, and providing administrative functions such as worker health and task progress monitoring to the cluster.
There is one jobtracker per MapReduce cluster and it usually runs on reliable hardware since a failure of the master will result in the failure of all running jobs. Just like the relationship between datanodes and the namenode in HDFS, tasktrackers inform the jobtracker as to their current health and status by way of regular heartbeats.
After a configurable period of no heartbeats, a tasktracker is assumed dead. The jobtracker uses a thread pool to process heartbeats and client requests in parallel. When a job is submitted, information about each task that makes up the job is stored in memory. This task information updates with each tasktracker heartbeat while the tasks are running, providing a near real-time view of task progress and health.
After the job completes, this information is retained for a configurable window of time or until a specified number of jobs have been executed. On an active cluster where many jobs, each with many tasks, are running, this information can consume a considerable amount of RAM. For this reason, monitoring jobtracker memory utilization is absolutely critical. The jobtracker provides an administrative web interface that, while a charming flashback to the anti-design of the early web, is incredibly information-rich and useful. As tasktrackers all must report in to the jobtracker, a complete view of the available cluster resources is available via the administrative interface.
If you are to be responsible for a production Hadoop cluster, you will find yourself checking this interface constantly throughout the day. The act of deciding which tasks of a job should be executed on which worker nodes is referred to as task scheduling. This is not scheduling in the way that the cron daemon executes jobs at given times, but instead is more like the way the OS kernel schedules process CPU time.
Much like CPU time sharing, tasks in a MapReduce cluster share worker node resources, or space, but instead of context switching (that is, pausing the execution of a task to give another task time to run), when a task executes, it executes completely. Understanding task scheduling (and by extension, resource allocation and sharing) is so important that an entire chapter (Chapter 7) is dedicated to the subject.

Tasktracker

The second daemon, the tasktracker, accepts task assignments from the jobtracker, instantiates the user code, executes those tasks locally, and reports progress back to the jobtracker periodically.
There is always a single tasktracker on each worker node. Both tasktrackers and datanodes run on the same machines, which makes each node both a compute node and a storage node. Each tasktracker is configured with a specific number of map and reduce task slots that indicate how many of each type of task it is capable of executing in parallel.
A task slot is exactly what it sounds like; it is an allocation of available resources on a worker node to which a task may be assigned, in which case it is executed. A tasktracker executes some number of map tasks and reduce tasks in parallel, so there is concurrency both within a worker where many tasks run, and at the cluster level where many workers exist. Map and reduce slots are configured separately because they consume resources differently.
You may have picked up on the idea that deciding the number of map and reduce task slots is extremely important to making full use of the worker node hardware, and you would be correct. Upon receiving a task assignment from the jobtracker, the tasktracker executes an attempt of the task in a separate process. The distinction between a task and a task attempt is important: a task is the logical unit of work, while a task attempt is a specific, physical instance of that task being executed.
Communication between the task attempt (usually called the child, or child process) and the tasktracker is maintained via an RPC connection over the loopback interface called the umbilical protocol. As soon as the task completes, the child exits and the slot becomes available for assignment. The tasktracker uses a list of user-specified directories (each of which is assumed to be on a separate physical device) to hold the intermediate map output and reducer input during job execution.
This is required because this data is usually too large to fit exclusively in memory for large jobs or when many jobs are running in parallel. Tasktrackers, like the jobtracker, also have an embedded web server and user interface.

When It All Goes Wrong

Rather than panic when things go wrong, MapReduce is designed to treat failures as common and has very well-defined semantics for dealing with the inevitable.
With tens, hundreds, or even thousands of machines making up a Hadoop cluster, machines — and especially hard disks — fail at a significant rate. In addition to faulty servers, there can sometimes be errant user MapReduce jobs, network failures, and even errors in the data. When a failure is detected by the tasktracker, it is reported to the jobtracker in the next heartbeat.
The jobtracker, in turn, notes the failure and, if additional attempts are permitted (the default limit is four attempts), reschedules the task to run. The task may be run either on the same machine or on another machine in the cluster, depending on available capacity. Should multiple tasks from the same job fail on the same tasktracker, that tasktracker is excluded from running further tasks for that job. If multiple tasks from different jobs repeatedly fail on a specific tasktracker, the tasktracker in question is added to a global blacklist for 24 hours, which prevents any tasks from being scheduled on that tasktracker.
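The retry semantics described here can be sketched as a simple loop (an illustration, not the jobtracker's actual code): each attempt either succeeds or raises, and once the attempt limit (four by default in Hadoop MapReduce) is exhausted, the task, and with it the job, fails.

```python
MAX_ATTEMPTS = 4  # Hadoop's default limit on attempts per task

def run_with_retries(task, max_attempts=MAX_ATTEMPTS):
    """Run a task attempt up to max_attempts times, rescheduling on failure."""
    for attempt in range(1, max_attempts + 1):
        try:
            return task(attempt)   # a task attempt either completes or raises
        except Exception:
            continue               # failure noted; task is rescheduled
    raise RuntimeError("task failed after %d attempts" % max_attempts)
```

A transient failure (a dying disk, a flaky network) is absorbed by rescheduling; only a task that fails on every attempt takes the job down with it.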
The jobtracker, after a configurable amount of time with no heartbeats, will consider the tasktracker dead along with any tasks it was assigned.