There are a handful of files for controlling the configuration of a Hadoop installation; the most important ones are listed in Table.

These files are all found in the conf directory of the Hadoop distribution. The configuration directory can be relocated to another part of the filesystem (outside the Hadoop installation, which makes upgrades marginally easier) as long as daemons are started with the --config option specifying the location of this directory on the local filesystem.

Configuration Management

Hadoop does not have a single, global location for configuration information. Instead, each Hadoop node in the cluster has its own set of configuration files, and it is up to administrators to ensure that they are kept in sync across the system. Hadoop provides a rudimentary facility for synchronizing configuration using rsync (see upcoming discussion); alternatively, there are parallel shell tools that can help do this, like dsh or pdsh.

Hadoop is designed so that it is possible to have a single set of configuration files that are used for all master and worker machines. The great advantage of this is simplicity, both conceptually (since there is only one configuration to deal with) and operationally (as the Hadoop scripts are sufficient to manage a single configuration setup).

For some clusters, the one-size-fits-all configuration model breaks down. For example, if you expand the cluster with new machines that have a different hardware specification to the existing ones, then you need a different configuration for the new machines to take advantage of their extra resources.

In these cases, you need to have the concept of a class of machine, and maintain a separate configuration for each class. Hadoop doesn’t provide tools to do this, but there are several excellent tools for doing precisely this type of configuration management, such as Chef, Puppet, cfengine, and bcfg2.

For a cluster of any size, it can be a challenge to keep all of the machines in sync. Consider what happens if a machine is unavailable when you push out an update: who ensures that it gets the update when it becomes available again? This is a big problem and can lead to divergent installations, so even if you use the Hadoop control scripts for managing Hadoop, it may be a good idea to use configuration management tools for maintaining the cluster. These tools are also excellent for doing regular maintenance, such as patching security holes and updating system packages.
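One simple way to notice divergent installations is to compare a digest of the configuration held on each node. The following is a minimal sketch of that idea, not a facility that Hadoop provides: the function names are hypothetical, and it assumes the file contents have already been collected from each host by whatever tool you use.

```python
import hashlib
from collections import Counter


def config_digest(files):
    """Return one SHA-256 digest for a mapping of filename -> file contents (bytes)."""
    h = hashlib.sha256()
    for name in sorted(files):  # sort so the digest does not depend on dict order
        h.update(name.encode("utf-8"))
        h.update(b"\0")
        h.update(files[name])
        h.update(b"\0")
    return h.hexdigest()


def find_divergent_nodes(configs_by_host):
    """Return the hosts whose configuration differs from the most common one."""
    digests = {host: config_digest(files) for host, files in configs_by_host.items()}
    if not digests:
        return []
    majority, _ = Counter(digests.values()).most_common(1)[0]
    return sorted(host for host, d in digests.items() if d != majority)


# Hypothetical hosts and contents, for illustration only.
configs = {
    "node1": {"hadoop-env.sh": b"export HADOOP_HEAPSIZE=1000\n"},
    "node2": {"hadoop-env.sh": b"export HADOOP_HEAPSIZE=1000\n"},
    "node3": {"hadoop-env.sh": b"export HADOOP_HEAPSIZE=2000\n"},
}
print(find_divergent_nodes(configs))
```

A check like this only reports drift; pushing the correct files back out is still the job of the configuration management tool.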

Control scripts

Hadoop comes with scripts for running commands, and for starting and stopping daemons, across the whole cluster. To use these scripts (which can be found in the bin directory), you need to tell Hadoop which machines are in the cluster. There are two files for this purpose, called masters and slaves, each of which contains a list of machine hostnames or IP addresses, one per line. The name of the masters file is misleading, in that the file determines which machine or machines should run a secondary namenode. The slaves file lists the machines that the datanodes and tasktrackers should run on. Both files reside in the configuration directory, although the slaves file may be placed elsewhere (and given another name) by changing the HADOOP_SLAVES setting in hadoop-env.sh. These files do not need to be distributed to worker nodes, since they are used only by the control scripts running on the namenode or jobtracker.

You don’t need to specify which machine (or machines) the namenode and jobtracker run on in the masters file, as this is determined by the machine the scripts are run on. (In fact, specifying these in the masters file would cause a secondary namenode to run there, which isn’t always what you want.) For example, the start-dfs.sh script, which starts all the HDFS daemons in the cluster, runs the namenode on the machine the script is run on. In slightly more detail, it:

1. Starts a namenode on the local machine (the machine that the script is run on)
2. Starts a datanode on each machine listed in the slaves file
3. Starts a secondary namenode on each machine listed in the masters file

There is a similar script called start-mapred.sh, which starts all the MapReduce daemons in the cluster. More specifically, it:

1. Starts a jobtracker on the local machine
2. Starts a tasktracker on each machine listed in the slaves file

Note that masters is not used by the MapReduce control scripts.
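The behavior of the two start scripts described above can be modeled in a few lines. This is only an illustrative sketch of which daemon ends up on which host; the function names and the example.com hostnames are hypothetical, and the real scripts do their work over SSH rather than by returning a list.

```python
def read_host_list(text):
    """Parse a masters or slaves file: one hostname or IP address per line."""
    return [line.strip() for line in text.splitlines() if line.strip()]


def plan_start_dfs(local_host, masters, slaves):
    """Daemons that start-dfs.sh would start, as (host, daemon) pairs."""
    plan = [(local_host, "namenode")]
    plan += [(host, "datanode") for host in slaves]
    plan += [(host, "secondarynamenode") for host in masters]
    return plan


def plan_start_mapred(local_host, slaves):
    """Daemons that start-mapred.sh would start; the masters file is not consulted."""
    return [(local_host, "jobtracker")] + [(host, "tasktracker") for host in slaves]


masters = read_host_list("snn.example.com\n")
slaves = read_host_list("worker1.example.com\nworker2.example.com\n")

for host, daemon in plan_start_dfs("nn.example.com", masters, slaves):
    print(host, daemon)
for host, daemon in plan_start_mapred("jt.example.com", slaves):
    print(host, daemon)
```

Note that the namenode and jobtracker hosts appear only as the machine the script is run on, never in either file.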

Also provided are stop-dfs.sh and stop-mapred.sh scripts to stop the daemons started by the corresponding start script. These scripts start and stop Hadoop daemons using the hadoop-daemon.sh script. If you use the aforementioned scripts, you shouldn’t call hadoop-daemon.sh directly. But if you need to control Hadoop daemons from another system or from your own scripts, then the hadoop-daemon.sh script is a good integration point. Likewise, hadoop-daemons.sh (with an “s”) is handy for starting the same daemon on a set of hosts.

Master node scenarios

Depending on the size of the cluster, there are various configurations for running the master daemons: the namenode, secondary namenode, and jobtracker. On a small cluster (a few tens of nodes), it is convenient to put them on a single machine; however, as the cluster gets larger, there are good reasons to separate them.

The namenode has high memory requirements, as it holds file and block metadata for the entire namespace in memory. The secondary namenode, while idle most of the time, has a comparable memory footprint to the primary when it creates a checkpoint. (This is explained in detail in “The filesystem image and edit log.”) For filesystems with a large number of files, there may not be enough physical memory on one machine to run both the primary and secondary namenode.

The secondary namenode keeps a copy of the latest checkpoint of the filesystem metadata that it creates. Keeping this (stale) backup on a different node to the namenode allows recovery in the event of loss (or corruption) of all the namenode’s metadata files. (This is discussed further in the chapter “Administering Hadoop.”)

On a busy cluster running lots of MapReduce jobs, the jobtracker uses considerable memory and CPU resources, so it should run on a dedicated node.

Whether the master daemons run on one or more nodes, the following instructions apply:

• Run the HDFS control scripts from the namenode machine. The masters file should contain the address of the secondary namenode.
• Run the MapReduce control scripts from the jobtracker machine.

When the namenode and jobtracker are on separate nodes, their slaves files need to be kept in sync, since each node in the cluster should run a datanode and a tasktracker.

Environment Settings

In this section, we consider how to set the variables in hadoop-env.sh.

Memory

By default, Hadoop allocates 1,000 MB (1 GB) of memory to each daemon it runs. This is controlled by the HADOOP_HEAPSIZE setting in hadoop-env.sh. In addition, the tasktracker launches separate child JVMs to run map and reduce tasks in, so we need to factor these into the total memory footprint of a worker machine.

The maximum number of map tasks that will be run on a tasktracker at one time is controlled by the mapred.tasktracker.map.tasks.maximum property, which defaults to two tasks. There is a corresponding property for reduce tasks, mapred.tasktracker.reduce.tasks.maximum, which also defaults to two tasks. The memory given to each of these child JVMs can be changed by setting the mapred.child.java.opts property.

The default setting is -Xmx200m, which gives each task 200 MB of memory. (Incidentally, you can provide extra JVM options here, too. For example, you might enable verbose GC logging to debug GC.) The default configuration therefore uses 2,800 MB of memory for a worker machine (see Table).

The number of tasks that can be run simultaneously on a tasktracker is governed by the number of processors available on the machine. Because MapReduce jobs are normally I/O-bound, it makes sense to have more tasks than processors to get better utilization. The amount of oversubscription depends on the CPU utilization of jobs you run, but a good rule of thumb is to have a factor of between one and two more tasks (counting both map and reduce tasks) than processors.

For example, if you had 8 processors and you wanted to run 2 processes on each processor, then you could set each of mapred.tasktracker.map.tasks.maximum and mapred.tasktracker.reduce.tasks.maximum to 7 (not 8, since the datanode and the tasktracker each take one slot). If you also increased the memory available to each child task to 400 MB, then the total memory usage would be 7,600 MB (see Table).
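The arithmetic behind the 2,800 MB and 7,600 MB figures can be checked with a short calculation. This is a sketch of the reasoning in the text; the function names are hypothetical, and it counts only JVM heap, not other processes on the machine.

```python
def worker_memory_mb(map_slots, reduce_slots, child_heap_mb, daemon_heap_mb=1000):
    """Total JVM heap for one worker: datanode + tasktracker + all child task JVMs."""
    daemons = 2 * daemon_heap_mb  # datanode and tasktracker
    children = (map_slots + reduce_slots) * child_heap_mb
    return daemons + children


def slots_per_type(processors, processes_per_processor):
    """Split the process budget evenly between map and reduce slots, after
    reserving one process each for the datanode and the tasktracker."""
    total = processors * processes_per_processor
    return (total - 2) // 2


# Default configuration: two map slots, two reduce slots, -Xmx200m per child.
print(worker_memory_mb(2, 2, 200))  # 2800

# Eight processors, two processes per processor, 400 MB per child.
slots = slots_per_type(8, 2)
print(slots, worker_memory_mb(slots, slots, 400))  # 7 7600
```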

Whether this Java memory allocation will fit into 8 GB of physical memory depends on the other processes that are running on the machine. If you are running Streaming or Pipes programs, this allocation will probably be inappropriate (and the memory allocated to the child should be dialed down), since it doesn’t allow enough memory for users’ (Streaming or Pipes) processes to run. The thing to avoid is processes being swapped out, as this leads to severe performance degradation. The precise memory settings are necessarily very cluster-dependent and can be optimized over time with experience gained from monitoring the memory usage across the cluster. Tools like Ganglia (“GangliaContext” on page 308) are good for gathering this information.

Hadoop also provides settings to control how much memory is used for MapReduce operations. These can be set on a per-job basis and are covered in the section on “Shuffle and Sort.”

For the master node, each of the namenode, secondary namenode, and jobtracker daemons uses 1,000 MB by default, a total of 3,000 MB.

A namenode can eat up memory, since a reference to every block of every file is maintained in memory. For example, 1,000 MB is enough for a few million files. You can increase the namenode’s memory without changing the memory allocated to other Hadoop daemons by setting HADOOP_NAMENODE_OPTS in hadoop-env.sh to include a JVM option for setting the memory size. HADOOP_NAMENODE_OPTS allows you to pass extra options to the namenode’s JVM. So, for example, if using a Sun JVM, -Xmx2000m would specify that 2,000 MB of memory should be allocated to the namenode.

If you change the namenode’s memory allocation, don’t forget to do the same for the secondary namenode (using the HADOOP_SECONDARYNAMENODE_OPTS variable), since its memory requirements are comparable to the primary namenode’s. In this case, you will probably also want to run the secondary namenode on a different machine.

There are corresponding environment variables for the other Hadoop daemons, so you can customize their memory allocations, if desired. See hadoop-env.sh for details.

Java

The location of the Java implementation to use is determined by the JAVA_HOME setting in hadoop-env.sh or, if it is not set there, by the JAVA_HOME shell environment variable. It’s a good idea to set the value in hadoop-env.sh, so that it is clearly defined in one place and to ensure that the whole cluster is using the same version of Java.

System logfiles

System logfiles produced by Hadoop are stored in $HADOOP_INSTALL/logs by default. This can be changed using the HADOOP_LOG_DIR setting in hadoop-env.sh. It’s a good idea to change this so that logfiles are kept out of the directory that Hadoop is installed in, since this keeps logfiles in one place even after the installation directory changes after an upgrade. A common choice is /var/log/hadoop, set by including the line export HADOOP_LOG_DIR=/var/log/hadoop in hadoop-env.sh.

The log directory will be created if it doesn’t already exist (if not, confirm that the Hadoop user has permission to create it).

Each Hadoop daemon running on a machine produces two logfiles. The first is the log output written via log4j. This file, which ends in .log, should be the first port of call when diagnosing problems, since most application log messages are written here. The standard Hadoop log4j configuration uses a DailyRollingFileAppender to rotate logfiles. Old logfiles are never deleted, so you should arrange for them to be periodically deleted or archived, so as to not run out of disk space on the local node.

The second logfile is the combined standard output and standard error log. This logfile, which ends in .out, usually contains little or no output, since Hadoop uses log4j for logging. It is only rotated when the daemon is restarted, and only the last five logs are retained. Old logfiles are suffixed with a number between 1 and 5, with 5 being the oldest file.

Logfile names (of both types) are a combination of the name of the user running the daemon, the daemon name, and the machine hostname. For example, hadoop-tom-datanode-sturges.local.log.2008-07-04 is the name of a logfile after it has been rotated. This naming structure makes it possible to archive logs from all machines in the cluster in a single directory, if needed, since the filenames are unique.

The username in the logfile name is actually the default for the HADOOP_IDENT_STRING setting in hadoop-env.sh.
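The naming scheme can be expressed as a small function. This is an illustrative sketch only: the function and its parameters are hypothetical, and the pattern is inferred from the example filename in the text rather than taken from the Hadoop scripts.

```python
def log_file_name(ident, daemon, hostname, suffix="log", rotated_on=None):
    """Build a daemon logfile name: hadoop-<ident>-<daemon>-<hostname>.<suffix>.

    ident stands for the HADOOP_IDENT_STRING value (by default, the user
    running the daemon); rotated_on is the date appended to a rotated .log file.
    """
    name = "hadoop-{}-{}-{}.{}".format(ident, daemon, hostname, suffix)
    if rotated_on is not None:
        name += "." + rotated_on
    return name


print(log_file_name("tom", "datanode", "sturges.local", rotated_on="2008-07-04"))
print(log_file_name("tom", "datanode", "sturges.local", suffix="out"))
```

Because the hostname is part of the name, files produced this way on different machines do not collide when archived into one directory.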
If you wish to give the Hadoop instance a different identity for the purposes of naming the logfiles, change HADOOP_IDENT_STRING to be the identifier you want.

SSH settings

The control scripts allow you to run commands on (remote) worker nodes from the master node using SSH. It can be useful to customize the SSH settings, for various reasons. For example, you may want to reduce the connection timeout (using the ConnectTimeout option) so the control scripts don’t hang around waiting to see whether a dead node is going to respond. Obviously, this can be taken too far. If the timeout is too low, then busy nodes will be skipped, which is bad.

Another useful SSH setting is StrictHostKeyChecking, which can be set to no to automatically add new host keys to the known hosts files. The default, ask, is to prompt the user to confirm they have verified the key fingerprint, which is not a suitable setting in a large cluster environment.

To pass extra options to SSH, define the HADOOP_SSH_OPTS environment variable in hadoop-env.sh. See the ssh and ssh_config manual pages for more SSH settings.

The Hadoop control scripts can distribute configuration files to all nodes of the cluster using rsync. This is not enabled by default, but by defining the HADOOP_MASTER setting in hadoop-env.sh, worker daemons will rsync the tree rooted at HADOOP_MASTER to the local node’s HADOOP_INSTALL whenever the daemon starts up.

What if you have two masters, a namenode and a jobtracker, on separate machines? You can pick one as the source and the other can rsync from it, along with all the workers. In fact, you could use any machine, even one outside the Hadoop cluster, to rsync from.

Because HADOOP_MASTER is unset by default, there is a bootstrapping problem: how do we make sure hadoop-env.sh with HADOOP_MASTER set is present on worker nodes? For small clusters, it is easy to write a small script to copy hadoop-env.sh from the master to all of the worker nodes.
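Such a copy script might look like the following sketch, which only builds the scp command for each worker and prints it. The hostnames, the source path, and the destination directory are all assumptions made for illustration; ConnectTimeout is the SSH option mentioned earlier, passed through scp’s -o flag.

```python
import shlex


def scp_commands(hosts, source, dest_dir, connect_timeout=5):
    """Return one scp argument list per worker host; nothing is executed here."""
    commands = []
    for host in hosts:
        commands.append([
            "scp",
            "-o", "ConnectTimeout={}".format(connect_timeout),
            source,
            "{}:{}".format(host, dest_dir),
        ])
    return commands


# Hypothetical workers; in practice these could be read from the slaves file.
workers = ["worker1.example.com", "worker2.example.com"]
for cmd in scp_commands(workers, "conf/hadoop-env.sh", "/opt/hadoop/conf/"):
    print(" ".join(shlex.quote(arg) for arg in cmd))
    # To actually copy the file, pass cmd to subprocess.run(cmd, check=True).
```

Keeping the commands as argument lists avoids shell quoting problems if a path ever contains spaces.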
For larger clusters, tools like dsh can do the copies in parallel. Alternatively, a suitable hadoop-env.sh can be created as a part of the automated installation script (such as Kickstart).

When starting a large cluster with rsyncing enabled, the worker nodes can overwhelm the master node with rsync requests since the workers start at around the same time. To avoid this, set the HADOOP_SLAVE_SLEEP setting to a small number of seconds, such as 0.1, for one-tenth of a second. When running commands on all nodes of the cluster, the master will sleep for this period between invoking the command on each worker machine in turn.

For more discussion on the security implications of SSH host keys, consult the article “SSH Host Key Protection” by Brian Hatch at http://www.securityfocus.com/infocus/1806.

Important Hadoop Daemon Properties

Hadoop has a bewildering number of configuration properties. In this section, we address the ones that you need to define (or at least understand why the default is appropriate) for any real-world working cluster. These properties are set in the Hadoop site files: core-site.xml, hdfs-site.xml, and mapred-site.xml. Example shows a typical set of files. Notice that most are marked as final, in order to prevent them from being overridden by job configurations. You can learn more about how to write Hadoop’s configuration files in “The Configuration API.”

Example. A typical set of site configuration files

HDFS

To run HDFS, you need to designate one machine as a namenode. In this case, the property fs.default.name is an HDFS filesystem URI, whose host is the namenode’s hostname or IP address, and port is the port that the namenode will listen on for RPCs. If no port is specified, the default of 8020 is used.

The masters file that is used by the control scripts is not used by the HDFS (or MapReduce) daemons to determine hostnames. In fact, because the masters file is only used by the scripts, you can ignore it if you don’t use them.
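Each site file is a list of property elements inside a configuration element, with an optional final flag per property. As a rough sketch of that layout, the following renders a core-site.xml containing only fs.default.name. The render_site_xml helper is hypothetical, and the namenode hostname is a placeholder.

```python
import xml.etree.ElementTree as ET


def render_site_xml(properties):
    """Render {name: (value, is_final)} as the text of a Hadoop site file."""
    root = ET.Element("configuration")
    for name, (value, is_final) in properties.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value
        if is_final:
            # A final property cannot be overridden by job configurations.
            ET.SubElement(prop, "final").text = "true"
    return '<?xml version="1.0"?>\n' + ET.tostring(root, encoding="unicode")


core_site = render_site_xml({"fs.default.name": ("hdfs://namenode/", True)})
print(core_site)
```

The output is written on a single line; a real site file would normally be indented by hand for readability.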
The fs.default.name property also doubles as specifying the default filesystem. The default filesystem is used to resolve relative paths, which are handy to use since they save typing (and avoid hardcoding knowledge of a particular namenode’s address). For example, with the default filesystem defined in Example, the relative URI /a/b is resolved to hdfs://namenode/a/b.

If you are running HDFS, the fact that fs.default.name is used to specify both the HDFS namenode and the default filesystem means HDFS has to be the default filesystem in the server configuration. Bear in mind, however, that it is possible to specify a different filesystem as the default in the client configuration, for convenience. For example, if you use both HDFS and S3 filesystems, then you have a choice of specifying either as the default in the client configuration, which allows you to refer to the default with a relative URI and the other with an absolute URI.

There are a few other configuration properties you should set for HDFS: those that set the storage directories for the namenode and for datanodes. The property dfs.name.dir specifies a list of directories where the namenode stores persistent filesystem metadata (the edit log and the filesystem image). A copy of each of the metadata files is stored in each directory for redundancy. It’s common to configure dfs.name.dir so that the namenode metadata is written to one or two local disks, and a remote disk, such as an NFS-mounted directory. Such a setup guards against failure of a local disk and failure of the entire namenode, since in both cases the files can be recovered and used to start a new namenode. (The secondary namenode takes only periodic checkpoints of the namenode, so it does not provide an up-to-date backup of the namenode.)

You should also set the dfs.data.dir property, which specifies a list of directories for a datanode to store its blocks.
Unlike the namenode, which uses multiple directories for redundancy, a datanode round-robins writes between its storage directories, so for performance you should specify a storage directory for each local disk. Read performance also benefits from having multiple disks for storage, because blocks will be spread across them, and concurrent reads for distinct blocks will be correspondingly spread across disks.

For maximum performance, you should mount storage disks with the noatime option. This setting means that last accessed time information is not written on file reads, which gives significant performance gains.

Finally, you should configure where the secondary namenode stores its checkpoints of the filesystem. The fs.checkpoint.dir property specifies a list of directories where the checkpoints are kept. Like the storage directories for the namenode, which keep redundant copies of the namenode metadata, the checkpointed filesystem image is stored in each checkpoint directory for redundancy.

Table summarizes the important configuration properties for HDFS.

Note that the storage directories for HDFS are under Hadoop’s temporary directory by default (the hadoop.tmp.dir property, whose default is /tmp/hadoop-${user.name}). Therefore, it is critical that these properties are set so that data is not lost by the system clearing out temporary directories.

MapReduce

To run MapReduce, you need to designate one machine as a jobtracker, which on small clusters may be the same machine as the namenode. To do this, set the mapred.job.tracker property to the hostname or IP address and port that the jobtracker will listen on. Note that this property is not a URI, but a host-port pair, separated by a colon. The port number 8021 is a common choice.

During a MapReduce job, intermediate data and working files are written to temporary local files. Since this data includes the potentially very large output of map tasks, you need to ensure that the mapred.local.dir property, which controls the location of local temporary storage, is configured to use disk partitions that are large enough. The mapred.local.dir property takes a comma-separated list of directory names, and you should use all available local disks to spread disk I/O. Typically, you will use the same disks and partitions (but different directories) for MapReduce temporary data as you use for datanode block storage, as governed by the dfs.data.dir property, discussed earlier.
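To illustrate using the same disks but different directories, the following sketch derives both property values from one list of mount points. The mount points and subdirectory names are hypothetical; only the comma-separated format comes from the text.

```python
def storage_dirs(mount_points, subdir):
    """Join one directory per disk into the comma-separated form the properties expect."""
    return ",".join("{}/{}".format(m.rstrip("/"), subdir) for m in mount_points)


disks = ["/disk1", "/disk2"]  # hypothetical mount points, one per local disk

dfs_data_dir = storage_dirs(disks, "hdfs/data")          # value for dfs.data.dir
mapred_local_dir = storage_dirs(disks, "mapred/local")   # value for mapred.local.dir

print(dfs_data_dir)       # /disk1/hdfs/data,/disk2/hdfs/data
print(mapred_local_dir)   # /disk1/mapred/local,/disk2/mapred/local
```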

MapReduce uses a distributed filesystem to share files (such as the job JAR file) with the tasktrackers that run the MapReduce tasks. The mapred.system.dir property is used to specify a directory where these files can be stored. This directory is resolved relative to the default filesystem (configured in fs.default.name), which is usually HDFS.

Finally, you should set the mapred.tasktracker.map.tasks.maximum and mapred.tasktracker.reduce.tasks.maximum properties to reflect the number of available cores on the tasktracker machines and mapred.child.java.opts to reflect the amount of memory available for the tasktracker child JVMs. See the discussion in “Memory.”

Table summarizes the important configuration properties for MapReduce.

Hadoop daemons generally run both an RPC server (Table) for communication between daemons and an HTTP server to provide web pages for human consumption (Table). Each server is configured by setting the network address and port number to listen on. By specifying the network address as 0.0.0.0, Hadoop will bind to all addresses on the machine. Alternatively, you can specify a single address to bind to. A port number of 0 instructs the server to start on a free port: this is generally discouraged, since it is incompatible with setting cluster-wide firewall policies.

In addition to an RPC server, datanodes run a TCP/IP server for block transfers. The server address and port are set by the dfs.datanode.address property, which has a default value of 0.0.0.0:50010.
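The two special values, 0.0.0.0 for the address and 0 for the port, can be summarized with a small helper that explains an address setting. The helper is purely illustrative and not part of Hadoop, and the second address below is a made-up example.

```python
def describe_address(address):
    """Explain a 'host:port' server address as used by the daemon address properties."""
    host, _, port_text = address.rpartition(":")
    port = int(port_text)
    binding = "all addresses" if host == "0.0.0.0" else "only " + host
    port_note = "a free port chosen at startup" if port == 0 else "port {}".format(port)
    return "binds to {} on {}".format(binding, port_note)


print(describe_address("0.0.0.0:50010"))  # default value of dfs.datanode.address
print(describe_address("10.0.0.5:0"))     # hypothetical single address, free port
```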

There are also settings for controlling which network interfaces the datanodes and tasktrackers report as their IP addresses (for HTTP and RPC servers). The relevant properties are dfs.datanode.dns.interface and mapred.tasktracker.dns.interface, both of which are set to default, which will use the default network interface. You can set this explicitly to report the address of a particular interface (eth0, for example).

This section discusses some other properties that you might consider setting.

Cluster membership

To aid the addition and removal of nodes in the future, you can specify a file containing a list of authorized machines that may join the cluster as datanodes or tasktrackers.

The file is specified using the dfs.hosts (for datanodes) and mapred.hosts (for tasktrackers) properties, as well as the corresponding dfs.hosts.exclude and mapred.hosts.exclude files used for decommissioning. See “Commissioning and Decommissioning Nodes” for further discussion.

Buffer size

Hadoop uses a buffer size of 4 KB (4,096 bytes) for its I/O operations. This is a conservative setting, and with modern hardware and operating systems, you will likely see performance benefits by increasing it; 64 KB (65,536 bytes) or 128 KB (131,072 bytes) are common choices. Set this using the io.file.buffer.size property in core-site.xml.

HDFS block size

The HDFS block size is 64 MB by default, but many clusters use 128 MB (134,217,728 bytes) or even 256 MB (268,435,456 bytes) to ease memory pressure on the namenode and to give mappers more data to work on. Set this using the dfs.block.size property in hdfs-site.xml.
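The effect of block size on the namenode can be seen by counting the blocks needed for a file of a given size. The sketch below uses a 1 TB file as an arbitrary example; the function name is hypothetical.

```python
MB = 1024 * 1024


def blocks_for_file(file_size_bytes, block_size_bytes):
    """Number of HDFS blocks needed to hold a file of the given size."""
    return -(-file_size_bytes // block_size_bytes)  # integer ceiling division


one_tb = 1024 * 1024 * MB
for block_mb in (64, 128, 256):
    block_size = block_mb * MB  # the byte value to use for dfs.block.size
    print(block_mb, block_size, blocks_for_file(one_tb, block_size))
```

Doubling the block size halves the number of blocks the namenode has to track for the same data: 16,384 blocks at 64 MB become 8,192 at 128 MB and 4,096 at 256 MB.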

Reserved storage space

By default, datanodes will try to use all of the space available in their storage directories. If you want to reserve some space on the storage volumes for non-HDFS use, then you can set dfs.datanode.du.reserved to the amount, in bytes, of space to reserve.

Trash

Hadoop filesystems have a trash facility, in which deleted files are not actually deleted, but rather are moved to a trash folder, where they remain for a minimum period before being permanently deleted by the system. The minimum period in minutes that a file will remain in the trash is set using the fs.trash.interval configuration property in core-site.xml. By default, the trash interval is zero, which disables trash.

Like in many operating systems, Hadoop’s trash facility is a user-level feature, meaning that only files that are deleted using the filesystem shell are put in the trash. Files deleted programmatically are deleted immediately. It is possible to use the trash programmatically, however, by constructing a Trash instance, then calling its moveToTrash() method with the Path of the file intended for deletion. The method returns a value indicating success; a value of false means either that trash is not enabled or that the file is already in the trash.

When trash is enabled, each user has her own trash directory called .Trash in her home directory. File recovery is simple: you look for the file in a subdirectory of .Trash and move it out of the trash subtree.

HDFS will automatically delete files in trash folders, but other filesystems will not, so you have to arrange for this to be done periodically. You can expunge the trash, which will delete files that have been in the trash longer than their minimum period, using the filesystem shell command hadoop fs -expunge.

The Trash class exposes an expunge() method that has the same effect.
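The rule that an expunge applies can be modeled as a simple time comparison. This is a simplified sketch of the behavior described above, not the Trash class itself: the function, its arguments, and the example paths are hypothetical, and times are plain seconds since the epoch.

```python
def expired_entries(deletion_times, now, trash_interval_minutes):
    """Return the paths that an expunge would remove, in sorted order.

    deletion_times maps a path to the time at which it was moved to the trash.
    An interval of zero means that trash is disabled, so nothing is listed.
    """
    if trash_interval_minutes <= 0:
        return []
    cutoff = now - trash_interval_minutes * 60
    return sorted(path for path, moved in deletion_times.items() if moved < cutoff)


in_trash = {
    "/user/tom/.Trash/old.txt": 1000,
    "/user/tom/.Trash/new.txt": 9000,
}
# With a 60-minute interval, only files trashed more than 3,600 seconds ago expire.
print(expired_entries(in_trash, now=10000, trash_interval_minutes=60))
```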

On a shared cluster, it shouldn’t be possible for one user’s errant MapReduce program to bring down nodes in the cluster. This can happen if the map or reduce task has a memory leak, for example, because the machine on which the tasktracker is running will run out of memory and may affect the other running processes. To prevent this situation, you can set mapred.child.ulimit, which sets a maximum limit on the virtual memory of the child process launched by the tasktracker. It is set in kilobytes, and should be comfortably larger than the memory of the JVM set by mapred.child.java.opts; otherwise, the child JVM might not start.
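Because the two settings use different units, it is easy to get the limit wrong. The following sketch derives a kilobyte limit from the -Xmx value in mapred.child.java.opts. The function names are hypothetical, and the headroom factor of 2 is an assumption chosen for illustration, not a recommendation from Hadoop.

```python
import re


def heap_mb_from_opts(java_opts):
    """Extract the -Xmx value, in MB, from a mapred.child.java.opts string."""
    match = re.search(r"-Xmx(\d+)([mMgG])", java_opts)
    if match is None:
        raise ValueError("no -Xmx option with an m or g suffix found")
    size, unit = int(match.group(1)), match.group(2).lower()
    return size * 1024 if unit == "g" else size


def child_ulimit_kb(java_opts, headroom=2.0):
    """Suggest a mapred.child.ulimit value in KB; the headroom factor is an assumption."""
    return int(heap_mb_from_opts(java_opts) * headroom * 1024)


print(child_ulimit_kb("-Xmx400m"))  # 819200
```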

As an alternative, you can use limits.conf to set process limits at the operating system level.

Job scheduler

Particularly in a multiuser MapReduce setting, consider changing the default FIFO job scheduler to one of the more fully featured alternatives. See “Job Scheduling.”

User Account Creation

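Once you have a Hadoop cluster up and running, you need to give users access to it. This involves creating a home directory for each user and setting ownership permissions on it, for example with hadoop fs -mkdir /user/username followed by hadoop fs -chown username:username /user/username.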

This is a good time to set space limits on the directory. For example, hadoop dfsadmin -setSpaceQuota 1t /user/username sets a 1 TB limit on the given user directory.