This is a study guide I made for the Cloudera Certified Administrator for Apache Hadoop (CCAH) exam, which I passed. The exam code is CCA-410. I did not take the $3k class offered by Cloudera. I just used the O'Reilly books "Hadoop Operations" and "Hadoop: The Definitive Guide" to study. I also used the Cloudera support website documentation along with their pre-test docs. The Apache Hadoop website is also a great source for even more documentation.
This study guide was made for the version of Hadoop that Cloudera uses in their own distro (CDH4), which is 2.0.0. Hadoop development moves fast and this info might be out of date, so verify anything you read below. I would hope that most primary functions will stay the same and that only some property names will be deprecated and changed. Here is a link for information about the test.

############
### HDFS ###
############

Hadoop Distributed Filesystem (HDFS)

Built to support high throughput, streaming reads and writes of extremely large files.

NAS and SANs offer centralized, low-latency access to either a block device or a filesystem on the order of terabytes in size. They do not scale to meet the needs of thousands of machines pulling hundreds of gigabytes of content all at one time.

### Goals for HDFS ###

* Store millions of large files, each greater than tens of gigabytes, and filesystem sizes reaching tens of petabytes.
* Use a scale-out model based on inexpensive commodity servers with internal JBOD ("Just a bunch of disks") rather than RAID to achieve large-scale storage. Accomplish availability and high throughput through application-level replication of data.
* Optimize for large, streaming reads and writes rather than low-latency access to many small files. Batch performance is more important than interactive response times.
* Gracefully deal with component failures of machines and disks.
* Support the functionality and scale requirements of MapReduce processing.

### HDFS Design ###

- HDFS is a userspace filesystem. It runs as a process of the OS. It uses the local machine's filesystem to store its own files. HDFS is not a POSIX-compliant filesystem.
- HDFS is a distributed filesystem. Each machine in a cluster stores a subset of the data (blocks) that makes up the complete filesystem. Filesystem metadata is stored on a centralized server (NameNode), acting as a directory of block data and providing a global picture of the filesystem's state. The admin controls how many times each block is replicated across the cluster. The default replication factor is 3. Should the number of copies of a block drop below the configured replication factor, the filesystem automatically makes a new copy from one of the remaining replicas.
  Benefits of this replication:
  - Multiple machine failures can be more easily tolerated.
  - Data can be read from the machine closest to an application on the network.
  - During processing, any copy of the data can be used, giving the scheduler a better chance of finding available resources.
  - No need for an expensive, specialized storage system (e.g. RAID) to protect block data.
- HDFS has a larger block size than most other filesystems. The default is 64 MB and some go as high as 1 GB. Increasing the block size means data will be written in larger contiguous chunks on disk, which in turn means data can be written and read in larger sequential operations. This minimizes drive seek operations and results in better performance when doing large streaming I/O operations.
- A block consists of the raw bytes of a portion of the file being stored. All block files start with the prefix blk_. The metadata file (with a .meta suffix) is made up of a header with version and type information, followed by a series of checksums for sections of the block.
- Files in HDFS are write once: once a replica is written, it is not possible for it to change. Benefits of this include:
  - Removing the need for complex reasoning about the consistency between replicas.
  - Applications being able to read any of the available replicas when accessing a file.
- A file in HDFS that is smaller than a single block does not occupy a full block's worth of underlying storage. So if a file is 10 MB but the block size is 64 MB, the block only occupies 10 MB until more data is added.
- Multiple files in HDFS can use different block sizes. HDFS provides an API to specify the block size when you create a file:
  FileSystem.create(Path, overwrite, bufferSize, replication, blockSize, progress)
- Every datanode runs a block scanner that periodically (every three weeks by default) verifies all the blocks stored on the datanode. This allows bad blocks to be detected and fixed before they are read by clients. The DataBlockScanner maintains a list of blocks to verify and scans them one by one for checksum errors.
- HDFS permits a client to read a file that is open for writing. When reading a file open for writing, the length of the last block still being written is unknown to the NameNode. The last incomplete block is not visible and neither are any subsequent blocks which are going to be completed. Only the blocks which were completed before opening the stream for reading are available. In other words, if the block being written is not finished yet, the reader can't see it.
- A HAR file (Hadoop Archive) groups together small files into a single Hadoop archive file.
  - Useful when there are already lots of small files in HDFS which need to be grouped together before some expensive jobs.
  - Implemented as a MapReduce job.
  - Use a har:// URL to access each file from the archive and view the archive as a folder.
  - Use a normal hdfs:// URL to access the actual content of the archive in HDFS. HARs are stored in HDFS as folders which contain a file with the concatenation of all the input files.

### Daemons that make up HDFS ###

There are three daemons that make up a standard HDFS cluster.

NameNode - 1 per cluster. Filesystem metadata is stored on a centralized server, acting as a directory of block data and providing a global picture of the filesystem's state.
Secondary NameNode - 1 per cluster. Performs internal NameNode transaction log checkpointing.
Datanode - Many per cluster. Stores block data (contents of files).

### NameNode ###

- Clients connect to the NameNode to perform filesystem operations.
- Datanodes regularly report their status to the NameNode in a heartbeat. The heartbeats carry information about total storage capacity, fraction of storage in use, and the number of data transfers currently in progress. These statistics are used for the NameNode's block allocation and load balancing decisions.
- The NameNode does not directly send requests to DataNodes. It uses replies to heartbeats to send instructions to the DataNodes.
  The instructions include commands to replicate blocks to other nodes, remove local block replicas, re-register and send an immediate block report, and shut down the node.
- At any given time, the NameNode has a complete view of all datanodes in the cluster, their current health, and what blocks they have available.
- When a datanode initially starts up, and every hour thereafter, it sends a block report to the NameNode. A block report is simply a list of all blocks the datanode currently has on its disks. The NameNode keeps track of all the changes.
- The file-to-block mapping on the NameNode is stored on disk. The host-specific location of the blocks is not recorded. DataNodes send out their block lists on startup and periodically after that. The NameNode uses these reports to see where all of its blocks are. This allows you to move disks from one datanode to another and not worry about a changing hostname or IP.
- The NameNode stores its filesystem metadata on local filesystem disks in a few different files, the two most important of which are fsimage and edits.
- Fsimage contains a complete snapshot of the filesystem metadata, including a serialized form of all the directory and file inodes in the filesystem. Each inode is an internal representation of a file or directory's metadata and contains such information as the file's replication level, modification and access times, access permissions, block size, and the blocks a file is made up of. For directories, the modification time, permissions, and quota metadata are stored.
- The edits file (journal) contains only incremental modifications made to the metadata. It is a write-ahead log, which reduces I/O operations to sequential, append-only operations (in the context of the NameNode, since it serves directly from RAM). This avoids costly seek operations and yields better overall performance.
- Upon NameNode startup, the fsimage file is loaded into RAM and any changes in the edits file are replayed, bringing the in-memory view of the filesystem up to date.
- Each client-initiated transaction (copy, move, etc.) is recorded in the journal, and the journal file is flushed and synced before the acknowledgment is sent to the client. The NameNode updates its in-memory (RAM) representation of the filesystem metadata after the edit log has been modified.
- Starting a NameNode will bring it into service after it loads the fsimage, replays the transaction log, sees some percentage of blocks (minimally replicated) from the datanodes, and is stable for some additional amount of time.
  - This threshold defaults to 99.9% and is set with the dfs.safemode.threshold.pct parameter. The additional stable time is set with the dfs.safemode.extension parameter.
  - During safe mode, replication of blocks is prohibited, but the NameNode does offer a read-only view of the filesystem to clients. The NameNode waits until all or a majority of DataNodes have reported their blocks. Depending on how the safe mode parameters are configured, the NameNode will stay in safe mode until a specific percentage of the blocks in the system is minimally replicated. Safe mode is exited when the minimal replication condition is reached, plus an extension time of 30 seconds. The minimal replication condition is when 99.9% of the blocks in the whole filesystem meet their minimum replication level (which defaults to one and is set by dfs.replication.min).
    hadoop dfsadmin -safemode get
- NameNode filesystem metadata is served entirely from RAM. This makes it fast, but limits the amount of metadata a box can handle. Roughly 1 million blocks occupy roughly 1 GB of heap.
- Hadoop's default strategy is for the NameNode to place the first replica on the same node as the client (for clients running outside the cluster, a node is chosen at random, although the system tries not to pick nodes that are too full or too busy).
  The second replica is placed on a different rack from the first (off-rack), chosen at random. The third replica is placed on the same rack as the second, but on a different node chosen at random. Further replicas are placed on random nodes in the cluster, although the system tries to avoid placing too many replicas on the same rack.

### Secondary NameNode ###

- IT IS NOT A BACKUP FOR THE NameNode! The name is horrible, but it is what it is.
- The NameNode's edits file needs to be periodically applied to the fsimage file. The NameNode may not have the available resources (CPU or RAM) to do this while continuing to provide service to the cluster, so the secondary NameNode applies the updates from the edits file to the fsimage file and sends it back to the primary. This is known as the checkpointing process.
- The checkpoint file is never changed by the NameNode, only by the secondary NameNode.
- This application of the updates (checkpointing) to the fsimage file occurs every 60 minutes by default or whenever the NameNode's edits file reaches 64 MB, whichever happens first. Newer versions of Hadoop use a defined number of transactions rather than file size to determine when to perform a checkpoint.
- If the secondary NameNode is not running at all, the edit log will grow significantly and it will slow the system down. Also, the system will stay in safe mode for an extended time on restart, since the NameNode needs to combine the edit log and the current filesystem checkpoint image.

### Datanode ###

- The daemon responsible for storing and retrieving block data (chunks of a file) is called the Datanode (DN).
- The Datanode has direct local access to one or more disks (commonly called data disks) in a server, on which it is permitted to store block data.
- Pointing a Datanode to new disks in existing servers, or adding new servers with more disks, increases the amount of storage in the cluster.
- Block data is streamed to and from datanodes directly, so bandwidth is not limited by a single node.
- Datanodes regularly report their status to the NameNode in a heartbeat.
- When a datanode initially starts up, and every hour thereafter, it sends a block report to the NameNode. A block report is simply a list of all blocks the datanode currently has on its disks. The NameNode keeps track of all the changes.

### Process of reading a file ###

Reading a file in HDFS called /foo/bar.txt.

1. The client uses a Hadoop client program to make the request.
2. The client program reads the cluster config file on the local machine, which tells it where the NameNode is located. This has to be configured ahead of time.
3. The client contacts the NameNode and requests the file it would like to read.
4. The client is validated by username or by a strong authentication mechanism like Kerberos.
5. Once the client is validated, the request is checked against the owner and permissions of the file.
6. If the file exists and the user has access to it, the NameNode responds with the first block ID and provides a list of datanodes where a copy of the block can be found, sorted by their distance to the client (reader).
7. The client now contacts the most appropriate datanode directly and reads the block data it needs. This process repeats until all blocks in the file have been read or the client closes the file stream.

- If the datanode dies while the file is being read, the library will automatically attempt to read another replica of the data from another datanode. If all replicas are unavailable, the read operation fails and the client receives an exception.
- If the information returned by the NameNode about block locations is outdated by the time the client attempts to contact a datanode, a retry will occur if there are other replicas; otherwise the read will fail.

### Process of writing a file ###

Writing a new file to HDFS called /foo/babar.txt.

1. The client uses a Hadoop client program to make the request.
2. The client program reads the cluster config file on the local machine, which tells it where the NameNode is located. This has to be configured ahead of time.
3. A request is sent to the NameNode to create the file metadata.
4. The client is validated by username or by an authentication mechanism like Kerberos.
5. If the user has the necessary permissions to do so, the metadata entry for the new file is made. However, it initially has no associated blocks.
6. The NameNode responds to the client and indicates the open request was successful and that it may now begin writing data.
7. The client starts breaking up the file into pieces (packets, not TCP ones), queues them in memory, and starts a data stream from this queue.
8. The client contacts the NameNode requesting a set of datanodes to which replicas of the next block should be written.
9. The NameNode responds and the client's data packets are then streamed to the first datanode, which writes the data to disk, and to the next datanode, which writes to its disk, and so on. This is called a replication pipeline. Each datanode in the replication pipeline acknowledges each packet as it's successfully written.
10. The client application maintains a list of packets for which acknowledgments have not yet been received. When it receives a response, it knows the data has been written to all nodes in the pipeline. This process of writing packets to the pipeline continues until the block size is reached, at which point the client goes back to the NameNode for the next set of datanodes to write to.
11. Eventually, the client indicates it's finished sending data by closing the stream, which flushes any remaining packets out to disk and updates the NameNode to indicate the file is now complete.
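
The packet queue and acknowledgment list from steps 7 to 10 can be pictured with a small toy model. This is only a sketch for study purposes, not how the HDFS client is actually implemented: ToyDataNode, split_into_packets, and write_block are hypothetical names, the packet size is tiny on purpose, and the datanodes here simply keep packets in memory instead of forwarding them over the network.

```python
from collections import deque


class ToyDataNode:
    """Hypothetical stand-in for a datanode; keeps written packets in memory."""

    def __init__(self, name):
        self.name = name
        self.stored = []

    def write(self, packet):
        self.stored.append(packet)
        return True  # acknowledgment for this packet


def split_into_packets(data, packet_size):
    """Break the client's bytes into packets (the last one may be shorter)."""
    return [data[i:i + packet_size] for i in range(0, len(data), packet_size)]


def write_block(data, pipeline, packet_size=4):
    """Send packets down the pipeline; return how many were never fully acknowledged."""
    data_queue = deque(split_into_packets(data, packet_size))
    ack_queue = deque()  # packets sent but not yet acknowledged by every datanode
    while data_queue:
        packet = data_queue.popleft()
        ack_queue.append(packet)
        # In HDFS each datanode forwards the packet to the next one; here we just loop.
        acks = [node.write(packet) for node in pipeline]
        if all(acks):
            ack_queue.popleft()  # written everywhere, so the client can forget it
    return len(ack_queue)


pipeline = [ToyDataNode("dn1"), ToyDataNode("dn2"), ToyDataNode("dn3")]
unacked = write_block(b"hello hdfs!", pipeline)
```

After the call, every toy datanode holds the same sequence of packets and the acknowledgment queue is empty, which mirrors the point in step 10 that an acknowledged packet has been written to all nodes in the pipeline.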

- If a datanode in the pipeline fails to write, the pipeline is immediately closed and all packets that had been sent since the last acknowledgment are pushed back into the queue to be written, so that any datanodes past the failed node in the pipeline will receive the data. The current block is given a new ID on the remaining healthy datanodes. This is done so that, should the failed datanode return, the abandoned block will appear to not belong to any file and be discarded automatically. A new replication pipeline containing the remaining datanodes is opened and the write resumes.
- When a new block is created, HDFS places the first replica on the node where the writer is located. The second and the third replicas are placed on two different nodes in a different rack. The rest are placed on random nodes, with the restrictions that no more than one replica is placed on any one node and no more than two replicas are placed in the same rack, if possible.

### NameNode High Availability ###

- NameNode high availability (or HA) is deployed as an active/passive (standby) pair of NameNodes. The edits write-ahead log needs to be available to both NameNodes, and therefore is stored on a shared storage device. Currently, an NFS filer is required as the shared storage, although there are plans to remove this dependency. As the active NameNode writes to the edits log, the standby NameNode is constantly replaying transactions to ensure it is up to date and ready to take over in the case of failure.
- Datanodes are also aware of both NameNodes in an HA configuration and send block reports to both servers.
- A high-availability pair of NameNodes can be configured for manual or automatic failover. The default is manual failover.
- In a manual failover, a command must be sent to effect a state transition from one NameNode to the other.
- In automatic failover, each NameNode runs an additional process called a "failover controller" that monitors the health of the process and coordinates state transitions.
  - A graceful failover is initiated by the admin.
  - A nongraceful failover is triggered when a fault is detected in the active process by the failover controller.
- The system can use a series of increasingly drastic fencing techniques to ensure the failed node (which could still think it's active) is actually stopped: tell it to stop via RPC, or send an IPMI reboot message to the failed host.
- It's impossible to know if a NameNode has relinquished active status or if it's simply inaccessible from the standby.
- When running in HA mode, the standby NameNode takes over the role of the secondary NameNode. There is no separate secondary NameNode process in an HA cluster, only a pair of NameNode processes. Most people repurpose their secondary NameNode machine to be a second NameNode.
- Manual failover from the active NN to the standby NN looks like this:
  hdfs haadmin -failover hadoop1 hadoop2
  This takes hadoop1 out of the active role (fencing it if needed) and brings up hadoop2 as the active NN.

### NameNode Federation ###

- Helps overcome the limit of how much metadata the NameNode can store in memory by splitting it up across multiple NameNodes. This gives us one logical namespace from a bunch of different NameNodes. It is similar to the Linux filesystem, where many different devices can be mounted at different points but still sit under one named root /.
- Each datanode has a block pool for each namespace. While blocks from different pools are stored on the same disks (there is no physical separation), they are logically exclusive. Each datanode sends heartbeats and block reports to each NameNode.
- NameNodes do not communicate with one another, and failure of one does not affect the others.
- Clients view the namespace via an API implementation called ViewFS. This maps slices of the filesystem to the proper NameNode.
  It is configured on the client side via the local core-site.xml file.
- Federation does not support overlapping mount points as of right now.

### Clients ###

- Clients can read and write data to HDFS using different tools and APIs.
- Clients can be on the same physical machines as any of the Hadoop daemons, or they can be on a host separate from the cluster.
- Clients that regularly use one datanode can cause the node to become unbalanced because of the block placement policy. The NameNode will assign the local machine as the destination for the first replica when an HDFS client is running on a datanode. This causes more blocks to be kept on the local datanode than on others. To help fix this, run the balancer:
  hadoop balancer -threshold N
  (where N is the percentage of disk utilization within which datanodes should be of one another)
  This can be killed at any time without any repercussions. Apache Hadoop users can use the start-balancer.sh script.

### Commands ###

- Hadoop comes with a number of command-line tools that enable basic filesystem operations.
- HDFS commands are subcommands of the hadoop command-line utility.

# Display basic usage information
hadoop fs

# List files in a dir. Uses the fs.default.name value in the core-site.xml file if full URL syntax is not used.
hadoop fs -ls /user/dude
or
hadoop fs -ls hdfs://NameNode.blah.com:8020/home/dude

# Upload a file with -put or -copyFromLocal, which copies a file from the local filesystem
hadoop fs -put /etc/resolv.conf /user/dude/

# Download a file from HDFS using -get or -copyToLocal
hadoop fs -get /user/dude/resolv.conf ./

# Set a replication factor for a file, or recursively for a dir of files with -R
hadoop fs -setrep -R 5 /user/dude/rep5/

# Run a fsck on the files we set the rep factor on and see if it looks correct
hadoop fsck /user/dude/rep5 -files -blocks -locations

#################
### MapReduce ###
#################

- There are 2 versions of MapReduce in the ecosystem right now: V1 and V2.
- V1 is the original MapReduce that uses tasktracker and jobtracker daemons.
- V2 is called YARN. YARN splits up the two major functionalities of the JobTracker, resource management and job scheduling/monitoring, into separate daemons: resource manager, application master, and node manager.

### MapReduce v1 ###

- Developers write jobs (code) that consist of a map function and a reduce function, along with job configuration information that controls various aspects of execution.
- The jobs are broken up into tasks, the tasks are scheduled to run on machines, each task's health is monitored, and in case of any failures the necessary tasks are retried.
- The TaskTracker is a process that runs on slave (data) nodes and is responsible for instantiating and monitoring individual map and reduce tasks. It starts separate JVM processes to do the actual work (called Task Instances).
- A MapReduce job is made up of four distinct stages, executed in order: client job submission, map task execution, shuffle and sort, and reduce task execution.
- The MapReduce framework provides a set of APIs for submitting jobs and interacting with the cluster.
- A job is made up of code written by a developer against the MapReduce APIs and the configuration, which specifies things such as the input and output datasets.
- The jobtracker process (running on another host in the cluster) is responsible for accepting the job submissions, scheduling tasks to run on worker nodes, and providing administrative functions such as worker health and task progress monitoring to the cluster.
- Speculative execution - Speculative execution helps to offset slow workers. The jobtracker will create multiple instances of the same task, take the first result into consideration, and kill the other instance of the task.
- Job submission occurs over the network from any machine. It does not have to be one from the cluster.
- There is one jobtracker per MapReduce cluster.
  If it dies, all running jobs fail. So put it on a reliable server.
- Tasktrackers inform the jobtracker of their current health and status by way of regular heartbeats. Each heartbeat contains the total number of map and reduce task slots available, the number occupied, and detailed information about any currently executing tasks. After a configured period of no heartbeats, the tasktracker is considered dead.
- When a job is submitted to the jobtracker, information about each task that makes up the job is stored in memory. After the job completes, this information is retained for a configurable window of time or until a specified number of jobs have been executed. On an active cluster where many jobs, each with many tasks, are running, this information can consume a considerable amount of RAM. Because of this, monitoring jobtracker memory utilization is critical.
- The act of deciding which tasks of a job should be executed on which worker nodes is referred to as task scheduling. The scheduler decides when tasks get executed and in what order.
- There is 1 tasktracker process on every worker node. It accepts task assignments from the jobtracker, instantiates the user code, executes those tasks locally, and reports progress back to the jobtracker periodically.
- Both tasktracker and datanode processes run on the same machines, which makes each node both a compute node and a storage node.
- Each tasktracker is configured with a specific number of map and reduce task slots that indicate how many of each type of task it is capable of executing in parallel.
- Tasktrackers allow more map tasks than reduce tasks to execute in parallel because they consume resources differently.
- Upon receiving a task assignment from the jobtracker, the tasktracker executes an attempt of the task in a separate process. A task is the logical unit of work, while a task attempt is a specific, physical instance of that task being executed.
  Attempts may fail, but each task in a job will have at least 1 attempt. Communication between the task attempt (usually called the child, or child process) and the tasktracker is maintained via an RPC connection over the loopback interface called the umbilical protocol.
- The tasktracker uses a list of user-specified directories to hold the intermediate map output and reducer input during job execution. These user-specified directories are kept on the local machine's filesystem.
- When a failure is detected by the tasktracker, it is reported to the jobtracker in the next heartbeat. The task is rescheduled. If enough tasks from the same job fail on the same tasktracker, then the node is added to a job-level blacklist. If multiple tasks from different jobs repeatedly fail on a specific tasktracker, the tasktracker in question is added to a global blacklist for 24 hours.
- If the tasktracker daemon or the entire worker node is lost, the jobtracker, after a configurable amount of time with no heartbeats, will consider the tasktracker dead along with any tasks it was assigned. Tasks are rescheduled on other nodes.
- If the jobtracker fails, all jobs will fail eventually. This is a single point of failure in Hadoop.
- A benefit of MapReduce is data locality. This is the ability to execute computation on the same machine where the data being processed is stored. This helps remove the "Store Effect" of all machines smashing a SAN for large datasets and bogging down the network.

### High-level overview of a MapReduce v1 job ###

1. The user creates a job and submits it to the jobtracker process.
2. A map task is created that runs the user-supplied map function on each record. The map function takes a key-value pair as input and produces zero or more intermediate key-value pairs. Map tasks are executed in parallel by various machines across the cluster. Mapper output (intermediate data) is stored on the local filesystem (NOT HDFS) of each individual mapper node.
   This is typically a temporary directory location which can be set up in the config by the Hadoop administrator. The intermediate data is cleaned up after the Hadoop job completes. When you set the number of reducers to zero, no reducers will be executed, and the output of each mapper will be stored in a separate file on HDFS. The output of the reduce is normally stored in HDFS for reliability.
   When the mapper creates a key-value pair, each key is assigned to a partition using a component called the partitioner. The partitioner is applied to each key-value pair. It takes a hash of the key, modulo the number of configured reducers in the job, to get a partition number. Keys that are the same are put into the same partition. Try to picture a partition number next to each key-value pair (record). Example:
     partition 1, dog => poodle
     partition 2, cat => bengal
     partition 3, bird => hawk
     partition 1, dog => pug
     partition 3, bird => duck
3. The next step is shuffle and sort. This is performed by the reducers (reduce tasks). Each reducer is assigned one of the partitions on which it should work. This is a flurry of network copies between each reducer in the cluster so it can get the partition (intermediate key-value) data it was assigned to work on. The number of reducers is defined by the developer. This step uses a lot of bandwidth between servers and benefits from very fast networking like 10G.
4. After the partition data has been copied, we can start performing a merge sort of the data. A merge sort takes a number of sorted items and merges them together to form a fully sorted list. Each reducer produces a separate output file, usually in HDFS. Each reducer output file is usually named part-nnnnn, where nnnnn is the number of the reduce task within the job. The output format of the file is specified by the author of the MapReduce job.
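
The partition, shuffle, and sort steps above can be tried out on the dog/cat/bird records with a few lines of Python. This is a rough sketch of the idea only, not Hadoop code: partition_for, shuffle, and reduce_partition are hypothetical names, and zlib.crc32 is used as a stand-in hash because it gives the same value on every run (Hadoop's default partitioner hashes the key object itself).

```python
import zlib
from collections import defaultdict


def partition_for(key, num_reducers):
    """Hash of the key modulo the number of reducers (crc32 is a stand-in hash)."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def shuffle(map_outputs, num_reducers):
    """Group intermediate (key, value) pairs by partition, as each reducer would fetch them."""
    partitions = defaultdict(list)
    for key, value in map_outputs:
        partitions[partition_for(key, num_reducers)].append((key, value))
    return partitions


def reduce_partition(pairs):
    """Sort one partition and collect the values for each key.

    For simplicity the whole (key, value) pair is sorted, so values come out
    ordered too; a real job only guarantees ordering by key.
    """
    grouped = defaultdict(list)
    for key, value in sorted(pairs):
        grouped[key].append(value)
    return dict(grouped)


map_outputs = [("dog", "poodle"), ("cat", "bengal"), ("bird", "hawk"),
               ("dog", "pug"), ("bird", "duck")]
partitions = shuffle(map_outputs, num_reducers=3)
results = {p: reduce_partition(pairs) for p, pairs in partitions.items()}
```

Which partition number each key lands in depends on the hash, so the numbers will not match the example above. The thing to notice is that both dog records always end up in the same partition, and so does every other pair of records sharing a key.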

### MapReduce v2 (YARN) ###

- YARN splits up the two major functionalities of the JobTracker, resource management and job scheduling/monitoring, into separate daemons: ResourceManager, ApplicationMaster, and NodeManager.
- Resource management (what was the JobTracker in v1) is now done by a daemon called the ResourceManager (RM). It is responsible for creating and allocating resources to multiple applications. Each application is an individual MapReduce job. The ResourceManager daemon is still centralized (global) on one machine. The ResourceManager assumes the responsibility to negotiate a specified container in which to start the ApplicationMaster and then launches the ApplicationMaster. On successful container allocations, the ApplicationMaster launches the container by providing the container launch specification to the NodeManager. These containers can be launched on any node with a NodeManager in the cluster.
- The NodeManager (NM) is another daemon that runs on each worker node in the cluster. It runs in place of the traditional tasktracker. The NodeManager launches any type of process, dictated by the application, in an application container. It also manages the ApplicationMaster daemon.
- Jobs (applications) are now managed and executed by the per-application ApplicationMaster (AM) daemon. This daemon can run on any node of the cluster. It is tasked with negotiating resources from the ResourceManager and working with the NodeManager(s) to execute and monitor the tasks. Jobs are now isolated from each other and are decentralized. Every application has its own instance of an ApplicationMaster. The AM is essentially user code and it is not to be trusted. It is not run at a privileged level.
- An application can make specific resource requests via the ApplicationMaster to satisfy its resource needs. The Scheduler responds to a resource request by granting a container, which satisfies the requirements laid out by the ApplicationMaster in the initial ResourceRequest.
  The ApplicationMaster has to take the container and present it to the NodeManager managing the host on which the container was allocated, in order to use the resources for launching its tasks.
- MRv2 maintains API compatibility with the previous stable release (hadoop-0.20.205). This means that all MapReduce jobs should still run unchanged on top of MRv2 with just a recompile.
- Don't run MRv1 and YARN on the same set of nodes at the same time. It is not supported.

##########################
### Planning a cluster ###
##########################

### Distribution and software version ###

- Download tarballs from the Apache Software Foundation. v1.0 has support for building RPMs or DEBs.
- Cloudera has its own distro called CDH. It backports critical fixes, provides packages for a number of different operating systems, and has a commercial-grade QA and testing process.
- CDH currently includes Apache Hadoop, Apache HBase, Apache Hive, Apache Pig, Apache Sqoop, Apache Flume, Apache ZooKeeper, Apache Oozie, Apache Mahout, and Hue.
- CDH4 is based on Apache Hadoop 2.0.0.
- CDH is available as tarballs, Red Hat Enterprise Linux 5 and 6 RPMs, SUSE Enterprise Linux RPMs, and Debian deb packages.

### Hardware ###

- Master nodes (jobtracker, NameNode, secondary NameNode)
  - Redundancy is key. Dual power supplies, bonded NICs. Possibly RAID 10 for the NameNode.
  - Lots and lots of RAM. As much as you can afford.
  - OS disks should be a RAID 1 mirror.
  - NameNode and jobtracker produce a lot of logs, so have a good amount of disk space.
  - Small cluster (20 workers or fewer): dual quad-core 2.6 GHz CPU, 24 GB of DDR3 RAM, dual 1 Gb Ethernet NICs, a SAS drive controller, and at least two SATA II drives in a JBOD configuration in addition to the host OS device.
  - Medium cluster (up to 300 workers): 48 GB of RAM, and the rest the same as the small cluster.
  - Large cluster: master nodes should have a total of 96 GB of RAM, and the rest the same as the small cluster.
  - NameNode considerations
    - Length of filenames actually starts to matter at scale. The longer the name, the more memory it takes up.
    - Lots of small files use up metadata. The more files there are to track, the more memory is needed.
    - NameNodes don't need lots of storage as they only store what fits in memory. The metadata dump can't be bigger than memory.
    - NameNode disk reliability is very important. Use RAID or send multiple copies to different JBOD disks. A copy of the data should also be written to an NFS share as a last-ditch recovery option.
  - Secondary NameNode is usually the same hardware config as the NameNode.
  - Jobtracker hardware is similar to the NameNode as it is RAM hungry. The jobtracker keeps metadata about the last 100 (by default) jobs executed on the cluster in RAM. This can be tuned.
- Worker/datanodes
  - Data disks should be as large as you can afford, set up in a JBOD configuration. 20-30% of the machine's raw disk capacity needs to be reserved for temporary data.
  - Each task needs roughly 2 GB to 4 GB of memory, depending on the task being performed.
  - Each worker node in the cluster executes a predetermined number of map and reduce tasks simultaneously. A cluster administrator configures the number of these slots, and Hadoop's task scheduler (a function of the jobtracker) assigns tasks that need to execute to available slots. Each slot can be thought of as a compute unit consuming some amount of CPU, memory, and disk I/O resources, depending on the task being performed. It is very dynamic and has to be tuned to the hardware you have.
  - Mid size would be 2 x 6-core processors, 64 GB ECC RAM, 12 SATA drives, SAS 6 Gb/s, 2 x 1 Gb Ethernet. Allocate memory to take advantage of triple or quad channel.
  - High end would be 2 x 6-core processors, 96 GB ECC RAM, 2 x SAS 6 Gb/s, 24 x 1 TB SFF Nearline/MDL, 1 x 10 Gb NIC. Allocate memory to take advantage of triple or quad channel.
- Hadoop does not generally benefit from virtualization. Guest OSes are unaware of one another as they perform I/O scheduling operations and, as a result, can cause excessive drive seek. Hadoop us
- Blade enclosures commonly share I/O planes and network connectivity, and the blades themselves usually have little to no local storage. They are built for more compute-intensive workloads.
- JBOD is preferred for datanodes. Multiple independent disks doing sequential reads is better than a group of disks working together. Also, variation in drive speeds will not matter to independent disks.
- SANs and NAS devices are not good for Hadoop. They are usually oversubscribed, so only a small number of machines can do concurrent I/O at one time.

### Cluster sizing ###
- You could choose based on your storage needs.
- Sample growth plan

  Item                           Value       How it is derived
  -----------------------------  ----------  ----------------------------------------------------
  Average daily ingest rate      2 TB
  Replication factor             3
  Daily raw consumption          6 TB        Ingest x replication
  Node raw storage               24 TB       12 x 2 TB SATA II HDD
  MapReduce temp space reserve   25%         For intermediate MapReduce data
  Node-usable raw storage        18 TB       Node raw storage - MapReduce reserve
  1 year (flat growth)           122 nodes   Ingest x replication x 365 / node-usable raw storage

- Watch out for the pitfall of trading CPU for disk space density. You could try to purchase machines that are half as powerful, but buy twice as many. Then you run into power, space, cooling, and network port density issues. It's preferable to purchase reasonably dense machines without falling outside the normal boundaries of what is considered commodity hardware.

### OS selection, prep, layout ###
- Linux is your best bet for a good production-quality OS. Pick the distro you're most comfortable with and that works best with your hardware. Also look at the support and tools offered for each.
- Use a software configuration management system to help you configure a large number of machines.
- Directory layout
  - Hadoop home - Hadoop software is installed here. Usually in /opt or /usr/local or /usr.
  - Datanode data directories - used by the datanode to store HDFS block data. The datanode assumes that each directory provided is a separate physical device with independent spindles. Often put on the same devices as the tasktracker MapReduce local directories.
  - NameNode directories - used by the NameNode to store filesystem metadata. The NameNode assumes that each directory provided is a separate physical device and replicates all writes to each device synchronously. The directories all require the same amount of space and generally do not use more than 100 GB.
  - MapReduce local directories - one or more directories used by the tasktracker to store temporary data during a MapReduce job. The more spindles the better. Often put on the same devices as the datanode data directories.
  - Hadoop log directory - used by all daemons to store log data as well as job and task-level data.
  - Hadoop pid directory - small, does not grow. Daemons store their process IDs in here.
  - Hadoop temp directory - for small, short-lived files Hadoop sometimes needs to create. /tmp/hadoop-${user.name} by default.
- The Java JDK is critical for Hadoop since it runs on Java. The HotSpot JVM is, by far, the best performing, most stable implementation available. Cloudera's RPM packages require the Oracle Java RPM.
- All machines in the cluster should run the exact same version of Java, down to the patch level. Use the 64-bit version.
- Hadoop can be deployed in standalone mode, pseudo-distributed mode, or fully-distributed mode.

### Hostnames ###
- Tasktrackers and datanodes heartbeat into the jobtracker and NameNode (respectively) every few seconds. The heartbeat includes the identity of the machine, either by hostname or by IP address. This identity is how the master will refer to the machine. If you contact the NameNode to open a file that lives on a worker, it will return the name the worker uses in its heartbeat.
- Clients must be able to resolve the hostname and communicate with the worker using the identifier as it was reported to the NameNode.
This means the hostname of each node in the cluster needs to be set correctly in all places on the OS.

### Users, Groups, Privs ###
- Try to run HDFS daemons as one user and MapReduce daemons as another.
  - Jobtracker, tasktracker, and child tasks run as user mapred. In secure mode child tasks are run as the user that submitted the job.
  - NameNode, secondary NameNode, and datanode run as user hdfs.

### Tuning and conf changes ###
- /etc/security/limits.conf file. Raise the open file limits for the Hadoop daemons:
    hdfs   - nofile 32768
    mapred - nofile 32768
    hbase  - nofile 32768
- /etc/sysctl.conf. Change the following Linux kernel parameters:
    vm.swappiness=0
    vm.overcommit_memory=1
- Do not use the Linux Logical Volume Manager (LVM) for Hadoop data disks.
- When mounting filesystems, disable directory and file access times (atime).

### Network usage and design ###
- HDFS and MapReduce exhibit strong East/West, or full node-to-node, communication patterns.
- Tasktrackers regularly heartbeat small bits of information to the jobtracker to indicate they're alive.
- During job submission, the jobtracker communicates with the NameNode, but also in the form of small RPC requests.
- Each reducer must fetch the map output data for its partition from each tasktracker in the cluster.
- The shuffle phase accounts for a rather significant amount of East/West traffic within the cluster.
- Traditional Tree Topology
  - Predominant architecture deployed in data centers today.
  - Has multiple tiers, each of which brings together (or aggregates) the branches of another tier.
  - Hosts are connected to leaf or access switches in a tree, which are then connected via one or more uplinks to the next tier.
  - Example: the first tier is 48x1GbE switches with four 10GbE ports each. The second tier is one 48-port 10GbE switch that ties all of the trunked 10GbE ports together. Hosts sit on the 1GbE ports. This will get you 576 hosts (12 leaf switches x 48 ports). Ports are oversubscribed 1.2:1.
  - Adding more hosts means adding more tiers towards the top.
The problem is that adding more tiers adds more hops, and the amount of oversubscription is compounded with each tier in the tree.
  - Cluster data ingress and egress should be nearest to the root of a tree network; this prevents branch monopolization and unbalanced traffic patterns.
  - A tree network works for small and midsize networks that fit within two tiers.
  - Never place low-latency services on the cluster distribution switch.
- Spine Fabric
  - Preferred topology for Hadoop traffic because it is optimized for east/west traffic patterns and is redundant.
  - Tries to be as close to equal distance between any two hosts as possible.
  - Each leaf switch (1st tier) has one or more uplinks to every switch in the second tier, called the spine.
  - A routing protocol such as IS-IS, OSPF, or EIGRP is run with equal cost multipath (ECMP) routes so that traffic has multiple path options and takes the shortest path between two hosts.
  - ECMP routing says we can simply take a different path to the same place; the bandwidth isn't gone, just spread out.
  - This equidistant, uniform bandwidth configuration is perfect for applications with strong East/West traffic patterns.

##########################
### Install and Config ###
##########################

### Install ###
- Tarball installs are difficult because of all the extra steps needed, like permissions and log relocation. They are, however, the most flexible.
- Installing from RPM or Deb packages is easier. It saves you from making common mistakes and helps keep to best practices.
- Packages install to places like /usr/bin, /usr/lib, /etc/hadoop.
- For a production environment root privs will be required. It's harder to install without root privs. You can do it with the tarball install and change the owner to someone other than root.
- CDH install mods the /etc/security/limits.d file for you.
It uses the alternatives system, which allows multiple alternative versions of files to be installed; the software is installed to /usr/lib/hadoop-0.20 and symlinked to /usr/lib/hadoop by way of alternatives.

### Config ###
- Global, cluster-level parameters control how the software itself is deployed, service identification, access controls, and integration with the OS and external systems.
- Not all of these parameters can be changed without restarting the daemons they affect.
- Developers or automated systems that submit jobs can override these values in code or from the command line, where applicable.
- Some parameters can be specified per operation, provided the code or context supports it.
- The following configuration files exist in the conf directory:
  - hadoop-env.sh - A Bourne shell fragment sourced by the Hadoop scripts. This file specifies environment variables that affect the JDK used by Hadoop, daemon JDK options, the pid file, and log file directories.
  - core-site.xml - An XML file that specifies parameters relevant to all Hadoop daemons and clients.
  - hdfs-site.xml - An XML file that specifies parameters used by the HDFS daemons and clients.
  - mapred-site.xml - An XML file that specifies parameters used by the MapReduce daemons and clients.
  - log4j.properties - A Java property file that contains all log configuration information.
  - masters (optional) - A newline-separated list of machines that run the secondary NameNode, used only by the start-*.sh helper scripts.
  - slaves (optional) - A newline-separated list of machine names that run the datanode / tasktracker pair of daemons, used only by the start-*.sh helper scripts.
  - fair-scheduler.xml (optional) - The file used to specify the resource pools and settings for the Fair Scheduler task scheduler plugin for MapReduce.
  - capacity-scheduler.xml (optional) - The name of the file used to specify the queues and settings for the Capacity Scheduler task scheduler plugin for MapReduce.
  - dfs.include (optional, conventional name) - A newline-separated list of machine names that are permitted to connect to the NameNode. An empty list means all hosts are permitted to connect.
  - dfs.exclude (optional, conventional name) - A newline-separated list of machine names that are not permitted to connect to the NameNode.
  - hadoop-policy.xml - An XML file that defines which users and / or groups are permitted to invoke specific RPC functions when communicating with Hadoop.
  - mapred-queue-acls.xml - An XML file that defines which users and / or groups are permitted to submit jobs to which MapReduce job queues.
  - taskcontroller.cfg - A Java property-style file that defines values used by the setuid task-controller MapReduce helper program used when operating in secure mode.
- Hadoop scripts ensure the conf directory is always at the head of the classpath so files can easily be located by the code.
- Administrators can mark a property as final to prevent developers from overriding it. This is done by adding <final>true</final> to the property definition.

### core-site.xml important properties ###
- fs.default.name - This parameter is now deprecated and has been replaced by fs.defaultFS. It was still on the test in 2013, so know that the parameter is a URL that specifies the default filesystem used by clients. The default is file:/// (local filesystem); production clusters usually use hdfs://hostname:port, which points to the NameNode.
- fs.defaultFS - Replaces fs.default.name. Default value is file:///. The name of the default filesystem. A URI whose scheme and authority determine the FileSystem implementation. The URI's scheme determines the config property (fs.SCHEME.impl) naming the FileSystem implementation class. The URI's authority is used to determine the host, port, etc. for a filesystem.
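To make the scheme/authority split of fs.defaultFS concrete, here is a minimal Python sketch that pulls a URI apart with the standard library URL parser. It is only an illustration: the hostname namenode.example.com, the port 8020, and the helper name describe_default_fs are assumptions made up for this example and are not part of Hadoop.

```python
from urllib.parse import urlsplit


def describe_default_fs(uri):
    """Split an fs.defaultFS style URI into scheme and authority parts.

    Hypothetical helper for illustration only; Hadoop does this
    internally in Java.
    """
    parts = urlsplit(uri)
    return {
        # The scheme selects which FileSystem implementation is used.
        "scheme": parts.scheme,
        # Config property that names the implementation class.
        "impl_property": f"fs.{parts.scheme}.impl",
        # The authority carries host and port (both None for file:///).
        "host": parts.hostname,
        "port": parts.port,
    }


if __name__ == "__main__":
    # Example values are assumptions, not defaults from any real cluster.
    print(describe_default_fs("hdfs://namenode.example.com:8020"))
    print(describe_default_fs("file:///"))
```

With an hdfs:// URI the authority points clients at the NameNode; with file:/// there is no authority at all, which is why the default value only ever touches the local filesystem.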
- fs.checkpoint.dir - Specifies the comma-separated list of directories used by the secondary NameNode to store filesystem metadata during a checkpoint operation.
- fs.checkpoint.size - Default is 67108864 (64 MB). The size of the current edit log (in bytes) that triggers a periodic checkpoint even if the fs.checkpoint.period hasn't expired.
- io.file.buffer.size - Used as a general purpose buffer size. The property should be set to a multiple of the system page size, is defined in bytes, and is 4 KB by default. Try a starting value of 65536.
- fs.trash.interval - Number of minutes after which the checkpoint gets deleted. If zero, the trash feature is disabled. This option may be configured both on the server and the client. If trash is disabled server side then the client side configuration is checked. If trash is enabled on the server side then the value configured on the server is used and the client configuration value is ignored.
- hadoop.security.authentication - Possible values are simple (no authentication) and kerberos.

### hdfs-site.xml important properties ###
- dfs.name.dir - Specifies a comma-separated list of local directories (with no spaces) in which the NameNode should store a copy of the HDFS filesystem metadata. You are encouraged to specify two internal disks and a low-latency, highly reliable NFS mount.
- dfs.name.edits.dir or dfs.namenode.edits.dir - Determines where on the local filesystem the DFS name node should store the transaction (edits) file. If this is a comma-delimited list of directories then the transaction file is replicated in all of the directories, for redundancy. Default value is the same as dfs.name.dir.
- dfs.data.dir or dfs.datanode.data.dir - Indicates where datanodes should store HDFS block data. Also a comma-separated list, but rather than mirroring data to each directory specified, the datanode round-robins blocks between disks.
- dfs.replication - Default block replication (3).
The actual number of replicas can be specified when the file is created. The default is used if replication is not specified at create time.
- dfs.blocksize - The default block size for new files, in bytes.
- dfs.permissions.enabled - If "true", enable permission checking in HDFS.
- dfs.datanode.du.reserved - Reserved space in bytes per volume. Always leave this much space free for non-DFS use.
- dfs.permissions.supergroup - A designated group that is given special privileges equivalent to being the superuser.
- dfs.namenode.name.dir.restore - Set to true to enable the NameNode to attempt recovering a previously failed dfs.namenode.name.dir. When enabled, a recovery of any failed directory is attempted during checkpoint.
- dfs.namenode.shared.edits.dir - A directory on shared storage between the multiple namenodes in an HA cluster. This directory will be written by the active and read by the standby in order to keep the namespaces synchronized. This directory does not need to be listed in dfs.namenode.edits.dir above. It should be left empty in a non-HA cluster.
- dfs.namenode.safemode.threshold-pct - Specifies the percentage of blocks that should satisfy the minimal replication requirement defined by dfs.namenode.replication.min. Values less than or equal to 0 mean not to wait for any particular percentage of blocks before exiting safemode. Values greater than 1 will make safe mode permanent.
- dfs.namenode.safemode.min.datanodes - Specifies the number of datanodes that must be considered alive before the name node exits safemode.
- dfs.hosts - Names a file that contains a list of hosts that are permitted to connect to the namenode. The full pathname of the file must be specified. If the value is empty, all hosts are permitted.
- dfs.hosts.exclude - Names a file that contains a list of hosts that are not permitted to connect to the namenode. The full pathname of the file must be specified. If the value is empty, no hosts are excluded.
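For anyone who has not yet seen what these properties look like on disk, the following Python sketch renders a few of them in the name/value XML layout that the *-site.xml files use. It is a study aid under stated assumptions: the function name render_site_xml is hypothetical, the two file paths are made-up examples, and the values are not recommendations.

```python
import xml.etree.ElementTree as ET


def render_site_xml(properties, final_keys=()):
    """Render a dict of Hadoop properties as *-site.xml style XML.

    Hypothetical helper for illustration; real clusters usually
    manage these files by hand or with configuration management.
    """
    root = ET.Element("configuration")
    for name, value in properties.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = str(value)
        if name in final_keys:
            # Marking a property final blocks client-side overrides.
            ET.SubElement(prop, "final").text = "true"
    return ET.tostring(root, encoding="unicode")


if __name__ == "__main__":
    example = {
        "dfs.replication": 3,
        # Paths below are made-up examples, not defaults.
        "dfs.hosts": "/etc/hadoop/conf/dfs.include",
        "dfs.hosts.exclude": "/etc/hadoop/conf/dfs.exclude",
    }
    print(render_site_xml(example, final_keys={"dfs.replication"}))
```

The output is emitted on a single line; a real hdfs-site.xml would normally be indented for readability, but the element structure is the same.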
### mapred-site.xml important properties ###
- mapred.job.tracker - Just as fs.default.name tells clients where the NameNode is, this provides the same for tasktrackers and MapReduce clients with respect to the jobtracker. The parameter is a hostname (or IP address) and port pair on which the jobtracker listens for RPC communication. The standard port is 8021.
- mapred.local.dir - Map tasks in a MapReduce job use the machine's local disk to store their intermediate output. Takes a comma-separated list of directories, sometimes sharing disks with the data in dfs.data.dir.
- mapred.child.ulimit - The maximum virtual memory, in KB, of a process launched by the Map-Reduce framework. This can be used to control both the Mapper/Reducer tasks and applications using Hadoop Pipes, Hadoop Streaming, etc. By default it is left unspecified to let cluster admins control it via limits.conf and other such relevant mechanisms.
- mapred.tasktracker.map.tasks.maximum - The maximum number of map tasks that will be run simultaneously by a task tracker.
- mapred.tasktracker.reduce.tasks.maximum - The maximum number of reduce tasks that will be run simultaneously by a task tracker.
- io.sort.mb - The total amount of buffer memory to use while sorting files, in megabytes. By default, gives each merge stream 1 MB, which should minimize seeks.
- io.sort.factor - The number of streams to merge at once while sorting files. This determines the number of open file handles.
- mapred.map.tasks - The default number of map tasks per job. Ignored when mapred.job.tracker is "local".
- mapred.reduce.tasks - The default number of reduce tasks per job. Typically set to 99% of the cluster's reduce capacity, so that if a node fails the reduces can still be executed in a single wave. Ignored when mapred.job.tracker is "local".
- mapred.map.max.attempts - The maximum number of attempts per map task. In other words, the framework will try to execute a map task this many times before giving up on it.
- mapred.reduce.max.attempts - The maximum number of attempts per reduce task.
In other words, the framework will try to execute a reduce task this many times before giving up on it.
- mapred.reduce.parallel.copies - The default number of parallel transfers run by reduce during the copy (shuffle) phase.
- mapred.task.timeout - The number of milliseconds before a task will be terminated if it neither reads an input, writes an output, nor updates its status string.
- mapred.hosts - Names a file that contains the list of nodes that may connect to the jobtracker. If the value is empty, all hosts are permitted.
- mapred.hosts.exclude - Names a file that contains the list of hosts that should be excluded by the jobtracker. If the value is empty, no hosts are excluded.
- Tuning a worker node - Start with a baseline configuration of 1.5 tasks for each physical CPU core. Allocate two thirds as map task slots and the remaining one third as reduce task slots.

### Logging ###
- Hadoop uses the Java logging package log4j to control log output. The log4j.properties file lives in the conf/ dir. This file controls the overall log levels of both the Hadoop daemons and MapReduce jobs.
- System logfiles produced by Hadoop are stored in $HADOOP_INSTALL/logs by default.
- This can be changed using the HADOOP_LOG_DIR setting in hadoop-env.sh.
- Each Hadoop daemon running on a machine produces two logfiles. The first is the log output written via log4j. This file, which ends in .log, should be checked first when diagnosing problems because most application log messages are written here. The standard Hadoop log4j configuration uses a Daily Rolling File Appender to rotate logfiles. Old logfiles are never deleted, so you should arrange for them to be periodically deleted or archived, so as to not run out of disk space on the local node.
- The second logfile is the combined standard output and standard error log. This logfile, which ends in .out, usually contains little or no output, since Hadoop uses log4j for logging.
It is rotated only when the daemon is restarted, and only the last five logs are retained. Old logfiles are suffixed with a number between 1 and 5, with 5 being the oldest file.
- Logfile names (of both types) are a combination of the name of the user running the daemon, the daemon name, and the machine hostname. For example, hadoop-blah-datanode-poofy.local.log.2008-07-04 is the name of a logfile after it has been rotated.
- A logger is a named channel for log events that has a specified minimum log level.
- The log levels, in order of most severe to least, are FATAL, ERROR, WARN, INFO, DEBUG, and TRACE. Log events at or above the specified severity are accepted, while less severe events are simply discarded.
- Each logger has a parent logger from which it inherits its configuration information.
- Loggers output their log events to an appender, which is responsible for handling the event. The most common appenders log to disk, but you can send events to places like syslog or the console.
- log4j.rootLogger specifies the default log level and appender to be used by all loggers.
- Loggers can be specified by using the naming convention log4j.logger.logger-name.
- The value of a logger parameter is always a log level, a comma, and the name of one or more appenders. Ex: hadoop.root.logger=INFO,console and then lower down you might see log4j.rootLogger=${hadoop.root.logger}, EventCounter

### Monitoring cluster ###
- Start the NameNodes and datanodes, and verify that all datanodes are communicating with each NameNode.
- Each NameNode has two primary URLs: one that shows the view of the namespace managed by that NameNode, and an aggregate cluster view that shows all NameNodes that make up the cluster.
- URL of the view of the namespace managed by that NameNode: http://NameNode.blah.com:50070/dfshealth.jsp
- The HTML page has the following info on it.
  NameNode localhost.localdomain:8020 (active)
  Started:       Sat Sep 13 4:4:44 EST 2013
  Version:       2.0.0-cdh4.3.0, 234929d9s9839s98sd7f9d7s9g
  Compiled:      Mon May 3 3:3:33 by blah from Unknown
  Upgrades:      There are no upgrades in progress
  Cluster ID:    cluster11
  Blockpool ID:  BP-23434343333-127.0.0.1-34322343ll3434
  [Browse the filesystem] [NameNode Logs]
  --------------------------------------------------------------
  Cluster Summary
  Security is OFF
  124 files and directories, 91 blocks = 215 total.
  Heap Memory used 41.31 MB is 50% of Commited Heap Memory 81.06 MB. Max Heap Memory is 171.94 MB.
  Non Heap Memory used 47.55 MB is 70% of Commited Non Heap Memory 67.05 MB. Max Non Heap Memory is 130 MB.
  Configured Capacity                : 11.07 GB
  DFS Used                           : 58.96 MB
  Non DFS Used                       : 2.95 GB
  DFS Remaining                      : 8.06 GB
  DFS Used%                          : 0.52%
  DFS Remaining%                     : 72.82%
  Block Pool Used                    : 58.96 MB
  Block Pool Used%                   : 0.52%
  DataNodes usages                   : Min %   Median %   Max %   stdev %
                                       0.52%   0.52%      0.52%   0.00%
  [Live Nodes]                       : 1 (Decommissioned: 0)
  [Dead Nodes]                       : 0 (Decommissioned: 0)
  [Decommissioning Nodes]            : 0
  Number of Under-Replicated Blocks  : 91

  NameNode Journal Status:
  Current transaction ID: 894
  Journal Manager                    State
  ---------------------------------  --------------------------------------------
  FileJournalManager(root=/dfs/nn)   EditLogFileOutputStream(/dfs/nn/current/edits_inprogress_00000000844

  NameNode Storage:
  Storage Directory   Type              State
  ------------------  ----------------  ------
  /dfs/nn             IMAGE_AND_EDITS   Active

- URL of the aggregate cluster view that shows all NameNodes that make up the cluster. This is the federated NameNode cluster view.
  http://NameNode.blah.com:50070/dfsclusterhealth.jsp
- The HTML page looks like:

  Cluster 'cluster11'
  Cluster Summary
  Total files and directories : 124
  Configured Capacity         : 11.07 GB
  DFS Used                    : 58.96 MB
  Non DFS Used                : 2.95 GB
  DFS Remaining               : 8.06 GB
  DFS Used%                   : 0.52%
  DFS Remaining%              : 72.82%
  -----------------------------------------
  NameNodes
  Number of NameNodes : 1

  NameNode                        : localhost.localdomain
  Blockpool Used                  : 58.96 MB
  Files and Directories           : 124
  Blocks                          : 91
  Missing Blocks                  : 0
  Live Datanode (Decommissioned)  : 1 (0)
  Dead Datanode (Decommissioned)  : 0 (0)

- URL of the view of the jobtracker: http://jobtracker.blah.com:50030/jobtracker.jsp
- The HTML page looks like:

  localhost Hadoop Map/Reduce Administration
  State:       RUNNING
  Started:     Sat Sep 14 12:22:33 PDT 2013
  Version:     2.0.0.-mr1-cdh4.3.0, Unknown
  Compiled:    Mon May 14 12:22:33 by blah from Unknown
  Identifier:  202023003020

  Cluster Summary (Heap Size is 81.06 MB/171.94 MB)
  Running Map Tasks       : 0
  Running Reduce Tasks    : 0
  Total Submissions       : 0
  Nodes                   : 1
  Occupied Map Slots      : 0
  Occupied Reduce Slots   : 0
  Reserved Reduce Slots   : 0
  Map Task Capacity       : 0
  Reduce Task Capacity    : 0
  Avg. Task/Node          : 6.00
  Blacklisted Nodes       : 0
  Excluded Nodes          : 0

  Scheduling Information
  Queue Name   State     Scheduling Information
  -----------  --------  ----------------------
  default      running   N/A

  Filter (Jobid, Priority, User, Name) ______________
  Running Jobs
  ------------
  None
  Retired Jobs
  ------------
  None
  Local Logs
  [Log] directory, [Job Tracker History]

- Command-line cluster report: sudo -u hdfs hadoop dfsadmin -report

### Quick HTTP Ports Reference ###

  HDFS                     Port    Config Parameter
  -----------------------  ------  --------------------------------
  Namenode                 50070   dfs.http.address
  Datanodes                50075   dfs.datanode.http.address
  Secondary NameNode       50090   dfs.secondary.http.address
  Backup/Checkpoint node   50105   dfs.backup.http.address

  MapReduce                Port    Config Parameter
  -----------------------  ------  --------------------------------
  JobTracker               50030   mapred.job.tracker.http.address
  Tasktrackers             50060   mapred.task.tracker.http.address

#################################
### Quotas and Job Schedulers ###
#################################

### Quota ###
- Setting a space quota on a directory (size is in bytes): hadoop dfsadmin -setSpaceQuota size path
- Looking at the quotas of a directory: hadoop fs -count -q /user/blah
- Column one is the file count quota and column two is the remaining file count. The third and fourth columns are the space quota and remaining space in bytes.
- Remember: the HDFS quota is applied to the physical (post-replication) size rather than the logical (pre-replication) size.
- You can also apply quotas on the number of files that may be created within a directory: hadoop dfsadmin -setQuota number path

### Schedulers ###
- Task slots are shared across jobs that are executing concurrently. The scheduler is responsible for assigning tasks to open slots on tasktrackers.
- The scheduler is a plug-in within the jobtracker.
- FIFO (First In First Out) Scheduler
  - Default scheduler in Hadoop.
  - Very simple concept.
Jobs are processed in the order they are received.
  - Supports five levels of job prioritization: very low, low, normal, high, very high. Each is implemented as a separate queue.
  - Higher-priority jobs are processed before lower-priority jobs. No matter when the jobs come in, higher always trumps lower.
  - To select a scheduler, set the mapred.jobtracker.taskScheduler property in the mapred-site.xml file.
- Fair Scheduler
  - Jobs, which are submitted to queues, are placed into pools. Each pool is assigned a number of task slots based on a number of factors including the total slot capacity of the cluster, the current demand (where "demand" is the number of tasks in a pool) on other pools, minimum slot guarantees, and available slot capacity. Pools may optionally have minimum slot guarantees. These pools are said to have an SLA, with the minimum number of slots providing the vehicle for ensuring task scheduling within a given period of time. Beyond the minimum slot guarantees, each pool gets an equal number of the remaining available slots on the cluster; this is where the "fair share" portion of the name comes from.
  - By default, no pools have minimums and so all pools simply receive an equal number of slots.
  - Pools can be added dynamically, in which case the number of slots they receive is adjusted.
  - The scheduler determines to which pool a job (and really, its tasks) is assigned by a MapReduce property, usually set to user.name. This yields a pool per user, each of which receives an equal number of slots.
  - Each time a tasktracker heartbeats to the jobtracker and reports available slots, the rules are evaluated and changes are made.
  - Total capacity - In the context of scheduling, total capacity (or total cluster capacity) is the sum of all slots of each type (map slots and reduce slots) regardless of their state. Total capacity is not a static number.
  - Total available capacity - The total available capacity is the number of open slots in a cluster.
  - Pool - A pool is a container for a group of jobs and the recipient of resource allocation.
  - Demand - A pool is said to have demand if and only if there are queued tasks that should be assigned to it.
  - Fair share - The "fair" number of slots a pool should receive.
  - Minimum share - An administrator-configured number of slots that a pool is guaranteed to receive. By default, pools have no minimum share.
  - When assigning tasks, the scheduler first looks at the demand for each pool. If a pool has no demand, it is given no slots.
  - The scheduler gives each pool with demand its minimum share, if a minimum share is configured. Minimum shares are always handled first.
  - Once the minimum shares are satisfied, the scheduler switches to allocating the remaining slots.
  - If the sum of the minimum shares of pools with demand is greater than the total capacity, the minimum shares of each pool are adjusted pro rata.
  - The scheduler now tries to assign the rest of the slots evenly across the pools.
  - The scheduler never allocates more slots than the demand. If a user wants 10 slots and 20 are available then they only get 10 for their pool.
  - In addition to, or in place of, a minimum share, pools may also have a weight. Pools with greater weight receive more slots during fair share allocation (weight does not impact minimum share allocation). The weight of a pool simply acts as a multiplier; a weight of 2 means the pool receives two slots to every one slot the other pools receive. By default, pools have a weight of 1.
  - Job priorities, like those supported in the FIFO scheduler, are also supported in the Fair Scheduler. When a priority is set, it simply affects the weight of the job; the higher the priority, the greater the weight, and the more tasks assigned during fair share allocation.
  - If two or more jobs are submitted to the same pool by different users, the Fair Scheduler uses another instance of itself to schedule jobs within each pool. It evenly splits up the slots among the jobs in the pool.
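The allocation rules above (demand first, then minimum shares with pro rata scaling, then weighted fair share capped at demand) can be sketched in a few lines of Python. This is a simplified model written for study purposes, not the Fair Scheduler's actual code: the Pool class, the allocate_slots function, the one-slot-at-a-time loop, and the tie-breaking by pool name are all assumptions of this sketch, and weights are assumed to be positive.

```python
from dataclasses import dataclass


@dataclass
class Pool:
    demand: int          # tasks that want a slot
    min_share: int = 0   # administrator-configured guarantee
    weight: float = 1.0  # multiplier for fair share allocation


def allocate_slots(total_slots, pools):
    """Return {pool name: slots} using the simplified rules in these notes."""
    alloc = {name: 0 for name in pools}
    # Pools without demand receive nothing.
    active = {name: p for name, p in pools.items() if p.demand > 0}

    # Step 1: minimum shares come first, never exceeding demand.
    mins = {name: min(p.min_share, p.demand) for name, p in active.items()}
    total_min = sum(mins.values())
    if total_min > total_slots:
        # Not enough capacity: scale minimum shares pro rata (rounded down).
        mins = {name: m * total_slots // total_min for name, m in mins.items()}
    alloc.update(mins)

    # Step 2: hand out the remaining slots one at a time. Each slot goes
    # to the pool with unmet demand that has received the fewest extra
    # slots relative to its weight (ties broken by pool name).
    extra = {name: 0 for name in active}
    remaining = total_slots - sum(alloc.values())
    while remaining > 0:
        hungry = [n for n, p in active.items() if alloc[n] < p.demand]
        if not hungry:
            break  # never allocate more than the demand
        pick = min(hungry, key=lambda n: (extra[n] / active[n].weight, n))
        alloc[pick] += 1
        extra[pick] += 1
        remaining -= 1
    return alloc


if __name__ == "__main__":
    pools = {
        "etl": Pool(demand=30, min_share=4),
        "adhoc": Pool(demand=10),
        "idle": Pool(demand=0, min_share=5),
    }
    print(allocate_slots(20, pools))
```

In the example, the idle pool gets nothing despite its minimum share because it has no demand, the etl pool gets its 4 guaranteed slots first, and the other 16 slots are split evenly between etl and adhoc.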
  - When a job is submitted to a pool with a minimum share and those slots have been given away, the scheduler can either kill the tasks occupying them or wait for them to complete. Killing tasks to reclaim the slots is called preemption.
  - Delayed task assignment (sometimes called delay scheduling) is done by the scheduler to increase the data locality hit ratio and, as a result, the performance of a job. This lets a free slot on a tasktracker remain open for a short amount of time if there is no queued task that would prefer to run on the host in question.
  - Choose the Fair Scheduler over the Capacity Scheduler if:
    - You have a slow network and data locality makes a significant difference to job runtime. Features like delay scheduling can make a dramatic difference in the effective locality rate of map tasks.
    - You have a lot of variability in the utilization between pools. The Fair Scheduler's preemption model effects much greater overall cluster utilization by giving away otherwise reserved resources when they're not used.
    - You require jobs within a pool to make equal progress rather than running in FIFO order.
- Capacity Scheduler (CS)
  - An administrator configures one or more queues, each with a capacity, a predetermined fraction of the total cluster slot capacity. It is reserved for the queue in question and is not given away in the absence of demand.
  - During the tasktracker heartbeat, slots are given to queues, with the most starved queues receiving slots first. Queue starvation is measured by dividing the number of running tasks in the queue by the queue's capacity or, in other words, its percentage used. Any additional slots beyond the sum of the queue capacities defined may be freely assigned to any queue, as needed, by the scheduler.
  - Within a queue, jobs for the same user are FIFO ordered.
  - CS additionally understands scheduling tasks based on the (user-defined) memory consumption of a job's tasks.
    It uses information collected by the tasktracker to aid in scheduling decisions.
  - Administrators may specify a default virtual and physical memory limit on tasks that users may optionally override upon job submission. The scheduler then uses this information to decide on which tasktracker to place the tasks.
  - The CS has a feature where job initialization is performed lazily, which can reduce the required memory footprint of the jobtracker. This feature allows an administrator to specify a maximum number of jobs to initialize, per user, per queue.
  - Choose the Capacity Scheduler if:
    - You know a lot about your cluster workloads and utilization and simply want to enforce resource allocation.
    - You have very little fluctuation within queue utilization. The CS's more rigid resource allocation makes sense when all queues are at capacity almost all the time.
    - You have high variance in the memory requirements of jobs and you need the CS's memory-based scheduling support.
    - You demand scheduler determinism.

###############################
### Monitoring and Managing ###
###############################

- Starting a process using scripts can be done from the /etc/init.d dir: run the script named after the process you want to start with the start, stop, or restart command. You need to become root before doing this.
- Starting a process in the foreground can be done with the hadoop command "hadoop process" where process is the name of the process you want to start.
- Adding a datanode:
  1. Add the host's IP address to the include file (set by dfs.hosts) if using host include functionality.
  2. Run hdfs dfsadmin -refreshNodes as the HDFS superuser.
  3. Update any rack awareness files.
  4. Start the datanode process on the datanode.
  5. Check hadoop dfsadmin -report or the web UI for the new node.
  6. Run the balancer to redistribute the blocks.
- Adding a tasktracker node:
  1. Follow the procedure for adding a datanode to HDFS.
  2. Run the balancer utility to distribute existing block data to the new datanode.
  3. Start the tasktracker process.
  4.
     Confirm that the jobtracker can communicate with the new tasktracker by checking the number of available tasktrackers in its web user interface.
- The decommissioning process relies on the HDFS host include and exclude files. If you are not using these files, it is not possible to gracefully decommission a datanode. To decommission a datanode, do the following:
  1. Add the IP address of the node to the exclude file (set by dfs.hosts.exclude).
  2. Run hadoop dfsadmin -refreshNodes as the HDFS superuser.
  3. Monitor the NameNode web UI to make sure it starts the decommission process.
  4. Wait (this can take a very long time) and keep watching the NameNode web UI until it says "Decommissioned" next to the node's name.
  5. Stop the datanode process.
  6. Remove the node from the HDFS include and exclude files as well as any rack topology database.
  7. Run hadoop dfsadmin -refreshNodes so the NameNode sees the removal.
- Check the health of the HDFS filesystem with the fsck command. HDFS is considered healthy if and only if all files have a minimum number of replicas available. The command is:
    sudo -u hdfs hadoop fsck /
- Dealing with a failed disk
  - Whether a disk (data directory) is healthy is determined by these checks:
    1. The specified path is a directory.
    2. The directory exists.
    3. The directory is readable.
    4. The directory is writable.
  - By default, if one disk fails the node is considered dead. To fix and replace:
    1. Stop any Hadoop-related processes (optionally following the decommissioning process for the datanode).
    2. Replace any failed disks.
    3. Follow the process for adding the node back into the cluster.
    4. Run the Hadoop fsck utility to validate the health of HDFS. Over-replicated blocks are normal immediately after a node is reintroduced to the cluster; this is corrected automatically over time (the excess replicas are deleted).
- When a datanode loses its network connection for a few minutes
  - The replication factor is actively maintained by the NameNode. The NameNode monitors the status of all datanodes and keeps track of which blocks are located on each node.
    Once the NameNode sees that the datanode is not available, it notices that certain blocks are under-replicated and begins replicating the blocks that were stored on the dead datanode. The NameNode orchestrates the replication of data blocks from one datanode to another. The replication data transfer happens directly between datanodes and the data never passes through the NameNode. The NameNode also makes sure that not all replicas of a block are located on one rack. If the NameNode detects that all of a block's replicas have ended up on one rack, it treats the block as mis-replicated and replicates the block to a different rack.
  - However, if the datanode comes back up, the over-replicated data will be deleted. Note: the data might be deleted from the original datanode.

#################
### Ecosystem ###
#################

- Apache Hive
  - Converts your SQL query into a series of MapReduce jobs for execution on a Hadoop cluster. Hive organizes data into tables, which provide a means for attaching structure to data stored in HDFS. Metadata such as table schemas is stored in a database called the metastore. Everyone can use the metastore if it is centrally kept.
  - To install, just download the tarball from the website and untar it:
      % tar xzf hive-x.y.z.tar.gz
  - It's handy to put Hive on your path to make it easy to launch:
      % export HIVE_INSTALL=/home/blah/hive-x.y.z-dev
      % export PATH=$PATH:$HIVE_INSTALL/bin
  - Now type hive to launch the Hive shell:
      % hive
      hive>
  - Log files are located at /tmp/$USER/hive.log. The logging configuration is in conf/hive-log4j.properties; you can edit this file to change log levels and other logging-related settings.
- Pig
  - Pig is made up of two pieces:
    - The language used to express data flows, called Pig Latin.
    - The execution environment to run Pig Latin programs. There are currently two environments: local execution in a single JVM and distributed execution on a Hadoop cluster.
  - A Pig Latin program is a script made up of a series of operations, or transformations, that are applied to the input data to produce output. Taken as a whole, the operations describe a data flow, which the Pig execution environment translates into an executable representation and then runs. Under the covers, Pig turns the transformations into a series of MapReduce jobs.
  - Pig runs as a client-side application. Even if you want to run Pig on a Hadoop cluster, there is nothing extra to install on the cluster: Pig launches jobs and interacts with HDFS (or other Hadoop filesystems) from your workstation.
  - To install (you need Java 6)
    - Download and untar:
        % tar xzf pig-x.y.z.tar.gz
    - Put Pig on your path:
        % export PIG_INSTALL=/home/tom/pig-x.y.z
        % export PATH=$PATH:$PIG_INSTALL/bin
    - You also need to set the JAVA_HOME environment variable to point to a suitable Java installation.
  - You can execute Pig in local or MapReduce mode. Local mode is for small local files (pig -x local). In MapReduce mode (pig -x mapreduce), Pig translates queries into MapReduce jobs and runs them on a Hadoop cluster.
  - Grunt is an interactive shell for running Pig commands. Just run Pig with no options.
- ZooKeeper
  - Distributed coordination service with a set of tools to build distributed applications that can safely handle partial failures.
  - ZooKeeper is, at its core, a stripped-down filesystem that exposes a few simple operations, and some extra abstractions such as ordering and notifications.
  - ZooKeeper runs on a collection of machines and is designed to be highly available. No single point of failure.
  - ZooKeeper interactions support participants that do not need to know about one another. Processes can drop notes to each other even if they don't know the other process.
  - Installing
    - Download and untar:
        % tar xzf zookeeper-x.y.z.tar.gz
    - Set the environment variables:
        % export ZOOKEEPER_INSTALL=/home/blah/zookeeper-x.y.z
        % export PATH=$PATH:$ZOOKEEPER_INSTALL/bin
    - The config file is zoo.cfg and is usually placed in the conf/ dir:
        tickTime=2000
        dataDir=/Users/tom/zookeeper
        clientPort=2181
    - Start a local ZooKeeper server:
        % zkServer.sh start
- Sqoop
  - Allows users to extract data from a structured data store (like an RDBMS) into Hadoop for further processing.
  - This processing can be done with MapReduce programs or other higher-level tools such as Hive.
  - When the final results of an analytic pipeline are available, Sqoop can export these results back to the data store for consumption by other clients.
  - Installing
    - Download and untar:
        % tar xvzf sqoop-x.y.z.tar.gz
    - Export the environment variable:
        % export SQOOP_HOME=/home/blah/sqoop
    - Then run it (the standard location is /usr/bin/):
        % $SQOOP_HOME/bin/sqoop
  - Sqoop has an extension framework that makes it possible to import data from, and export data to, any external storage system that has bulk data transfer capabilities. A Sqoop connector is a modular component that uses this framework to enable Sqoop imports and exports.
  - Sqoop ships with connectors for working with a range of popular relational databases, including MySQL, PostgreSQL, Oracle, SQL Server, and DB2.
  - By default, Sqoop will generate comma-delimited text files for imported data.
- Mahout: A scalable machine learning and data mining library.
- Cassandra: A scalable multi-master database with no single points of failure.
- Chukwa: A data collection system for managing large distributed systems.
- HBase: A scalable, distributed database that supports structured data storage for large tables.
- The Apache Ambari project is aimed at making Hadoop management simpler by developing software for provisioning, managing, and monitoring Apache Hadoop clusters.
  Ambari provides an intuitive, easy-to-use Hadoop management web UI backed by its RESTful APIs.
- Solr is the popular, blazing fast open source enterprise search platform from the Apache Lucene project. Its major features include powerful full-text search, hit highlighting, faceted search, near real-time indexing, dynamic clustering, database integration, rich document (e.g., Word, PDF) handling, and geospatial search.
- Oozie is a workflow scheduler system to manage Apache Hadoop jobs. Oozie Workflow jobs are Directed Acyclic Graphs (DAGs) of actions.