HDFS - Hadoop

Persistent Data Structures

As an administrator, it is invaluable to have a basic understanding of how the components of HDFS (the namenode, the secondary namenode, and the datanodes) organize their persistent data on disk. Knowing which files are which can help you diagnose problems or spot that something is awry.

Namenode directory structure

A newly formatted namenode creates the following directory structure:

Recall from the chapter on setting up a Hadoop cluster that the dfs.name.dir property is a list of directories, with the same contents mirrored in each directory. This mechanism provides resilience, particularly if one of the directories is an NFS mount, as is recommended.

The VERSION file is a Java properties file that contains information about the version of HDFS that is running. Here are the contents of a typical file:

The layoutVersion is a negative integer that defines the version of HDFS’s persistent data structures. This version number has no relation to the release number of the Hadoop distribution. Whenever the layout changes, the version number is decremented (for example, the version after −18 is −19). When this happens, HDFS needs to be upgraded, since a newer namenode (or datanode) will not operate if its storage layout is an older version. Upgrading HDFS is covered in “Upgrades”.

The namespaceID is a unique identifier for the filesystem, which is created when the filesystem is first formatted. The namenode uses it to identify new datanodes, since they will not know the namespaceID until they have registered with the namenode.

The cTime property marks the creation time of the namenode’s storage. For newly formatted storage, the value is always zero, but it is updated to a timestamp whenever the filesystem is upgraded. The storageType indicates that this storage directory contains data structures for a namenode.
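Because VERSION is a Java properties file, its fields are easy to inspect programmatically. The following is a minimal Python sketch, not part of Hadoop: the function name parse_version is hypothetical, and the sample values are made up for illustration rather than taken from a real cluster.

```python
def parse_version(text):
    """Parse the key=value lines of a Java-properties-style VERSION file."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        # Skip blank lines and comments (properties files use # or !).
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


# Hypothetical contents; real values differ per cluster.
SAMPLE = """\
#Illustrative sample only
namespaceID=134368441
cTime=0
storageType=NAME_NODE
layoutVersion=-18
"""

props = parse_version(SAMPLE)
# layoutVersion is negative and is decremented when the layout changes.
assert int(props["layoutVersion"]) < 0
# cTime is zero for newly formatted storage.
print(props["storageType"], props["cTime"])
```

Comparing the namespaceID and layoutVersion parsed from a namenode and a datanode in this way is one quick check that both belong to the same filesystem.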

The other files in the namenode’s storage directory are edits, fsimage, and fstime. These are all binary files, which use Hadoop Writable objects as their serialization format (see “Serialization”). To understand what these files are for, we need to dig into the workings of the namenode a little more.

The filesystem image and edit log

When a filesystem client performs a write operation (such as creating or moving a file), it is first recorded in the edit log. The namenode also has an in-memory representation of the filesystem metadata, which it updates after the edit log has been modified. The in-memory metadata is used to serve read requests.

The edit log is flushed and synced after every write before a success code is returned to the client. For namenodes that write to multiple directories, the write must be flushed and synced to every copy before returning successfully. This ensures that no operation is lost due to machine failure.

The fsimage file is a persistent checkpoint of the filesystem metadata. However, it is not updated for every filesystem write operation, since writing out the fsimage file, which can grow to be gigabytes in size, would be very slow. This does not compromise resilience, because if the namenode fails, the latest state of its metadata can be reconstructed by loading the fsimage from disk into memory, then applying each of the operations in the edit log. In fact, this is precisely what the namenode does when it starts up (see “Safe Mode”).
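The recovery idea (load the checkpoint, then replay the log) can be illustrated with a toy model. This is a sketch under simplifying assumptions, not the namenode's implementation: the namespace is a plain dictionary, the operation tuples are invented for the example, and the real files are binary Writable records.

```python
def apply_op(namespace, op):
    """Apply one logged operation to the in-memory namespace (a dict)."""
    kind = op[0]
    if kind == "create":
        namespace[op[1]] = {"replication": op[2]}
    elif kind == "rename":
        namespace[op[2]] = namespace.pop(op[1])
    elif kind == "delete":
        del namespace[op[1]]
    else:
        raise ValueError("unknown operation: %r" % (kind,))


def recover(fsimage, edits):
    """Rebuild the latest metadata: load the checkpoint, then replay edits."""
    # Copy the checkpoint so the stored image itself is left untouched.
    namespace = {path: dict(meta) for path, meta in fsimage.items()}
    for op in edits:
        apply_op(namespace, op)
    return namespace


# Hypothetical checkpoint and edit log.
fsimage = {"/user/tom/a": {"replication": 3}}
edits = [
    ("create", "/user/tom/b", 3),
    ("rename", "/user/tom/a", "/user/tom/c"),
    ("delete", "/user/tom/b"),
]
state = recover(fsimage, edits)
print(sorted(state))  # ['/user/tom/c']
```

The cost of recovery grows with the length of the edit list, which is why a long edit log makes for a slow restart.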

The fsimage file contains a serialized form of all the directory and file inodes in the filesystem. Each inode is an internal representation of a file or directory’s metadata and contains such information as the file’s replication level, modification and access times, access permissions, block size, and the blocks a file is made up of. For directories, the modification time, permissions, and quota metadata is stored.

The fsimage file does not record the datanodes on which the blocks are stored. Instead, the namenode keeps this mapping in memory, which it constructs by asking the datanodes for their block lists when they join the cluster and periodically afterward to ensure the namenode’s block mapping is up-to-date.
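As a rough illustration of how such a mapping can be derived purely from block lists, here is a small Python sketch. The datanode names, block IDs, and the function build_block_map are hypothetical; the real block report protocol carries more information than a list of IDs.

```python
from collections import defaultdict


def build_block_map(block_reports):
    """Invert per-datanode block lists into a block -> datanodes mapping."""
    block_map = defaultdict(set)
    for datanode, block_ids in block_reports.items():
        for block_id in block_ids:
            block_map[block_id].add(datanode)
    return block_map


# Hypothetical block reports from two datanodes.
reports = {
    "datanode1": ["blk_1", "blk_2"],
    "datanode2": ["blk_2", "blk_3"],
}
block_map = build_block_map(reports)
print(sorted(block_map["blk_2"]))  # ['datanode1', 'datanode2']
```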

As described, the edits file would grow without bound. Though this state of affairs would have no impact on the system while the namenode is running, if the namenode were restarted, it would take a long time to apply each of the operations in its (very long) edit log. During this time, the filesystem would be offline, which is generally undesirable.

The solution is to run the secondary namenode, whose purpose is to produce checkpoints of the primary’s in-memory filesystem metadata.* The checkpointing process proceeds as follows (and is shown schematically in Figure ):

  1. The secondary asks the primary to roll its edits file, so new edits go to a new file.
  2. The secondary retrieves fsimage and edits from the primary (using HTTP GET).
  3. The secondary loads fsimage into memory, applies each operation from edits, then creates a new consolidated fsimage file.
  4. The secondary sends the new fsimage back to the primary (using HTTP POST).
  5. The primary replaces the old fsimage with the new one from the secondary, and the old edits file with the new one it started in step 1. It also updates the fstime file to record the time that the checkpoint was taken.
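The five steps above can be mimicked with a toy model. This is only a sketch under stated assumptions: the Primary class, its method names, and the dictionary-based image are invented for illustration, the HTTP transfers are replaced by in-process copies, and every edit is treated as a simple "set metadata for path" operation.

```python
import time


class Primary:
    """Toy stand-in for the primary namenode's on-disk state."""

    def __init__(self):
        self.fsimage = {}      # last checkpoint: path -> metadata
        self.edits = []        # current edit log
        self.edits_new = None  # log opened while a checkpoint is in progress
        self.fstime = None     # time of the last checkpoint

    def log(self, path, meta):
        # New edits go to the rolled file once a checkpoint has started.
        target = self.edits if self.edits_new is None else self.edits_new
        target.append((path, meta))

    def roll_edits(self):
        self.edits_new = []

    def install(self, new_fsimage):
        self.fsimage = new_fsimage
        self.edits = self.edits_new
        self.edits_new = None
        self.fstime = time.time()


def checkpoint(primary):
    primary.roll_edits()                 # step 1: roll the edits file
    image = dict(primary.fsimage)        # step 2: fetch fsimage ...
    edits = list(primary.edits)          #         ... and edits
    for path, meta in edits:             # step 3: merge edits into the image
        image[path] = meta
    primary.install(image)               # steps 4 and 5: send back and swap


primary = Primary()
primary.log("/a", {"replication": 3})
primary.log("/b", {"replication": 2})
checkpoint(primary)
print(sorted(primary.fsimage), primary.edits)  # ['/a', '/b'] []
```

In this single-threaded sketch the new edit log is always empty after the checkpoint; on a live namenode it may already hold edits that arrived while the checkpoint was being taken.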

At the end of the process, the primary has an up-to-date fsimage file and a shorter edits file (it is not necessarily empty, as it may have received some edits while the checkpoint was being taken). It is possible for an administrator to run this process manually while the namenode is in safe mode, using the hadoop dfsadmin -saveNamespace command.

This procedure makes it clear why the secondary has similar memory requirements to the primary (since it loads the fsimage into memory), which is the reason that the secondary needs a dedicated machine on large clusters.

* From Hadoop version 0.21.0 onward, the secondary namenode will be deprecated and replaced by a checkpoint node, which has the same functionality. At the same time, a new type of namenode, called a backupnode, will be introduced, whose purpose is to maintain an up-to-date copy of the namenode metadata and to act as a replacement for storing a copy of the metadata on NFS.


The schedule for checkpointing is controlled by two configuration parameters. The secondary namenode checkpoints every hour (fs.checkpoint.period in seconds) or sooner if the edit log has reached 64 MB (fs.checkpoint.size in bytes), which it checks every five minutes.
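The two triggers combine as a simple "whichever comes first" rule. The sketch below expresses that rule in Python using the default values quoted above; the function name should_checkpoint and its arguments are hypothetical and do not correspond to a Hadoop API.

```python
CHECKPOINT_PERIOD_SECONDS = 60 * 60       # fs.checkpoint.period: one hour
CHECKPOINT_SIZE_BYTES = 64 * 1024 * 1024  # fs.checkpoint.size: 64 MB


def should_checkpoint(seconds_since_last, edit_log_bytes,
                      period=CHECKPOINT_PERIOD_SECONDS,
                      size=CHECKPOINT_SIZE_BYTES):
    """Return True if either the time or the size trigger has fired."""
    return seconds_since_last >= period or edit_log_bytes >= size


print(should_checkpoint(600, 10 * 1024 * 1024))   # False: neither trigger
print(should_checkpoint(600, 70 * 1024 * 1024))   # True: edit log too large
print(should_checkpoint(3600, 0))                 # True: an hour has passed
```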

Secondary namenode directory structure

A useful side effect of the checkpointing process is that the secondary has a checkpoint at the end of the process, which can be found in a subdirectory called previous.checkpoint.

This can be used as a source for making (stale) backups of the namenode’s metadata:

The layout of this directory and of the secondary’s current directory is identical to the namenode’s. This is by design, since in the event of total namenode failure (when there are no recoverable backups, even from NFS), it allows recovery from a secondary namenode. This can be achieved either by copying the relevant storage directory to a new namenode, or, if the secondary is taking over as the new primary namenode, by using the -importCheckpoint option when starting the namenode daemon. The -importCheckpoint option will load the namenode metadata from the latest checkpoint in the directory defined by the fs.checkpoint.dir property, but only if there is no metadata in the dfs.name.dir directory, so there is no risk of overwriting precious metadata.

Datanode directory structure

Unlike namenodes, datanodes do not need to be explicitly formatted, since they create their storage directories automatically on startup. Here are the key files and directories:

A datanode’s VERSION file is very similar to the namenode’s:

The namespaceID, cTime, and layoutVersion are all the same as the values in the namenode (in fact, the namespaceID is retrieved from the namenode when the datanode first connects). The storageID is unique to the datanode (it is the same across all storage directories) and is used by the namenode to uniquely identify the datanode. The storageType identifies this directory as a datanode storage directory.

The other files in the datanode’s current storage directory are the files with the blk_ prefix. There are two types: the HDFS blocks themselves and the metadata for a block (with a .meta suffix). A block file consists of the raw bytes of a portion of the file being stored; the metadata file is made up of a header with version and type information, followed by a series of checksums for sections of the block.

When the number of blocks in a directory grows to a certain size, the datanode creates a new subdirectory in which to place new blocks and their accompanying metadata. It creates a new subdirectory every time the number of blocks in a directory reaches 64 (set by the dfs.datanode.numblocks configuration property). The effect is to have a tree with high fan-out, so even for systems with a very large number of blocks, the directories will only be a few levels deep. By taking this measure, the datanode ensures that there is a manageable number of files per directory, which avoids the problems that most operating systems encounter when there are a large number of files (tens or hundreds of thousands) in a single directory.

If the configuration property dfs.data.dir specifies multiple directories (on different drives), blocks are written to each in a round-robin fashion. Note that blocks are not replicated on each drive on a single datanode: block replication is across distinct datanodes.
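Round-robin placement across storage directories can be pictured as cycling through the configured list. The following Python sketch is illustrative only: the directory paths, block IDs, and the function assign_blocks are hypothetical, and the datanode's actual volume selection is not shown in this text.

```python
from itertools import cycle


def assign_blocks(block_ids, data_dirs):
    """Assign each new block to the next storage directory in turn."""
    dirs = cycle(data_dirs)
    return {block_id: next(dirs) for block_id in block_ids}


# Hypothetical dfs.data.dir entries on two different drives.
data_dirs = ["/disk1/hdfs/data", "/disk2/hdfs/data"]
placement = assign_blocks(["blk_1", "blk_2", "blk_3"], data_dirs)
for block_id, directory in placement.items():
    print(block_id, directory)
```

Each block lands in exactly one directory, which matches the point above that a single datanode does not replicate a block across its own drives.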

Safe Mode

When the namenode starts, the first thing it does is load its image file (fsimage) into memory and apply the edits from the edit log (edits). Once it has reconstructed a consistent in-memory image of the filesystem metadata, it creates a new fsimage file (effectively doing the checkpoint itself, without recourse to the secondary namenode) and an empty edit log. Only at this point does the namenode start listening for RPC and HTTP requests. However, the namenode is running in safe mode, which means that it offers only a read-only view of the filesystem to clients.

Strictly speaking, in safe mode, only filesystem operations that access the filesystem metadata (like producing a directory listing) are guaranteed to work. Reading a file will work only if the blocks are available on the current set of datanodes in the cluster, and file modifications (writes, deletes, or renames) will always fail.

Recall that the locations of blocks in the system are not persisted by the namenode; this information resides with the datanodes, each of which holds a list of the blocks it is storing. During normal operation of the system, the namenode has a map of block locations stored in memory. Safe mode is needed to give the datanodes time to check in to the namenode with their block lists, so the namenode can be informed of enough block locations to run the filesystem effectively. If the namenode didn’t wait for enough datanodes to check in, then it would start the process of replicating blocks to new datanodes, which would be unnecessary in most cases (since it only needed to wait for the extra datanodes to check in), and would put a great strain on the cluster’s resources.

Indeed, while in safe mode, the namenode does not issue any block replication or deletion instructions to datanodes.

Safe mode is exited when the minimal replication condition is reached, plus an extension time of 30 seconds. The minimal replication condition is when 99.9% of the blocks in the whole filesystem meet their minimum replication level (which defaults to one, and is set by dfs.replication.min, see Table).
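The minimal replication condition amounts to a ratio test over the blocks the namenode knows about. The Python sketch below shows that test with the defaults quoted above (a minimum replication of one and a 99.9% threshold); the function name and the list-of-counts input are hypothetical, and the 30-second extension is not modeled.

```python
def minimal_replication_reached(replica_counts, min_replication=1,
                                threshold_pct=0.999):
    """Check whether enough blocks meet their minimum replication level.

    replica_counts holds the number of reported replicas for each block.
    """
    if not replica_counts:
        # No blocks in the system, so there is nothing to wait for.
        return True
    satisfied = sum(1 for n in replica_counts if n >= min_replication)
    return satisfied / len(replica_counts) >= threshold_pct


# 2,000 blocks, one of which has no reported replica yet: 99.95% satisfied.
print(minimal_replication_reached([1] * 1999 + [0]))        # True
# Ten blocks still unreported: 99.5% satisfied, below the threshold.
print(minimal_replication_reached([1] * 1990 + [0] * 10))   # False
```

With a threshold above 1, the ratio can never reach it for a non-empty set of blocks, so the condition is never met.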

When you are starting a newly formatted HDFS cluster, the namenode does not go into safe mode since there are no blocks in the system.

safemode properties

Entering and leaving safe mode

To see whether the namenode is in safe mode, you can use the dfsadmin command hadoop dfsadmin -safemode get.

The front page of the HDFS web UI provides another indication of whether the namenode is in safe mode.

Sometimes you want to wait for the namenode to exit safe mode before carrying out a command, particularly in scripts. The wait option achieves this: hadoop dfsadmin -safemode wait.

An administrator has the ability to make the namenode enter or leave safe mode at any time. It is sometimes necessary to do this when carrying out maintenance on the cluster or after upgrading a cluster to confirm that data is still readable. To enter safe mode, use hadoop dfsadmin -safemode enter.

You can use this command when the namenode is still in safe mode while starting up to ensure that it never leaves safe mode. Another way of making sure that the namenode stays in safe mode indefinitely is to set the property dfs.safemode.threshold.pct to a value over one.

You can make the namenode leave safe mode by using hadoop dfsadmin -safemode leave.

Audit Logging

HDFS has the ability to log all filesystem access requests, a feature that some organizations require for auditing purposes. Audit logging is implemented using log4j logging at the INFO level, and in the default configuration it is disabled, as the log threshold is set to WARN in log4j.properties:

You can enable audit logging by replacing WARN with INFO, and the result will be a log line written to the namenode’s log for every HDFS event. Here’s an example for a list status request on /user/tom:

It is a good idea to configure log4j so that the audit log is written to a separate file and isn’t mixed up with the namenode’s other log entries. An example of how to do this can be found on the Hadoop wiki at http://wiki.apache.org/hadoop/HowToConfigure.



The dfsadmin tool is a multipurpose tool for finding information about the state of HDFS, as well as performing administration operations on HDFS. It is invoked as hadoop dfsadmin. Commands that alter HDFS state typically require superuser privileges.

The available commands to dfsadmin are described in Table

available commands to dfsadmin

Filesystem check (fsck)

Hadoop provides an fsck utility for checking the health of files in HDFS. The tool looks for blocks that are missing from all datanodes, as well as under- or over-replicated blocks. Here is an example of checking the whole filesystem for a small cluster:

fsck recursively walks the filesystem namespace, starting at the given path (here the filesystem root), and checks the files it finds. It prints a dot for every file it checks. To check a file, fsck retrieves the metadata for the file’s blocks and looks for problems or inconsistencies. Note that fsck retrieves all of its information from the namenode; it does not communicate with any datanodes to actually retrieve any block data.

Most of the output from fsck is self-explanatory, but here are some of the conditions it looks for:

Over-replicated blocks

These are blocks that exceed their target replication for the file they belong to. Over-replication is not normally a problem, and HDFS will automatically delete excess replicas.

Under-replicated blocks

These are blocks that do not meet their target replication for the file they belong to. HDFS will automatically create new replicas of under-replicated blocks until they meet the target replication. You can get information about the blocks being replicated (or waiting to be replicated) using hadoop dfsadmin -metasave.

Misreplicated blocks

These are blocks that do not satisfy the block replica placement policy (see “Replica Placement”). For example, for a replication level of three in a multirack cluster, if all three replicas of a block are on the same rack, then the block is misreplicated, since the replicas should be spread across at least two racks for resilience.

A misreplicated block is not fixed automatically by HDFS (at the time of this writing). As a workaround, you can fix the problem manually by increasing the replication of the file the block belongs to (using hadoop fs -setrep), waiting until the block gets replicated, then decreasing the replication of the file back to its original value.

Corrupt blocks

These are blocks whose replicas are all corrupt. Blocks with at least one noncorrupt replica are not reported as corrupt; the namenode will replicate the noncorrupt replica until the target replication is met.

Missing replica

These are blocks with no replicas anywhere in the cluster.
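The conditions above can be summarized as a small decision procedure. The following Python sketch is a simplified reading of the definitions in this section, not fsck's actual logic: the function names and arguments are hypothetical, and the misreplication check covers only the "all replicas on one rack" example given above.

```python
def classify_block(healthy_replicas, corrupt_replicas, target_replication):
    """Classify a block by its replica counts, following the text above."""
    if healthy_replicas == 0 and corrupt_replicas == 0:
        return "missing"            # no replicas anywhere in the cluster
    if healthy_replicas == 0:
        return "corrupt"            # every replica is corrupt
    if healthy_replicas < target_replication:
        return "under-replicated"
    if healthy_replicas > target_replication:
        return "over-replicated"
    return "ok"


def is_misreplicated(replica_racks, cluster_is_multirack=True):
    """True if several replicas all sit on one rack of a multirack cluster."""
    return (cluster_is_multirack
            and len(replica_racks) > 1
            and len(set(replica_racks)) < 2)


print(classify_block(2, 1, 3))                        # under-replicated
print(classify_block(0, 3, 3))                        # corrupt
print(is_misreplicated(["rack1", "rack1", "rack1"]))  # True
```

Note that a block with one healthy replica and two corrupt ones is classified as under-replicated rather than corrupt, in line with the definition of corrupt blocks above.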

Corrupt or missing blocks are the biggest cause for concern, as they mean data has been lost. By default, fsck leaves files with corrupt or missing blocks, but you can tell it to perform one of the following actions on them:

  • Move the affected files to the /lost+found directory in HDFS, using the -move option. Files are broken into chains of contiguous blocks to aid any salvaging efforts you may attempt.
  • Delete the affected files, using the -delete option. Files cannot be recovered after being deleted.

The fsck tool provides an easy way to find out which blocks are in any particular file. For example:

This says that the file /user/tom/part-00007 is made up of one block and shows the datanodes where the block is located. The fsck options used are as follows:

  • The -files option shows the line with the filename, size, number of blocks, and its health (whether there are any missing blocks).
  • The -blocks option shows information about each block in the file, one line per block.
  • The -racks option displays the rack location and the datanode addresses for each block.

Running hadoop fsck without any arguments displays full usage instructions.

Datanode block scanner

Every datanode runs a block scanner, which periodically verifies all the blocks stored on the datanode. This allows bad blocks to be detected and fixed before they are read by clients. The DataBlockScanner maintains a list of blocks to verify and scans them one by one for checksum errors. The scanner employs a throttling mechanism to preserve disk bandwidth on the datanode.

Blocks are periodically verified every three weeks to guard against disk errors over time (this is controlled by the dfs.datanode.scan.period.hours property, which defaults to 504 hours). Corrupt blocks are reported to the namenode to be fixed.
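Verification against stored checksums can be sketched as follows. This is an illustration under assumptions, not the DataBlockScanner's code: it assumes CRC32 checksums over 512-byte sections (neither detail is stated in this text), uses Python's zlib.crc32, and ignores the header of the .meta file.

```python
import zlib

BYTES_PER_CHECKSUM = 512  # assumed section size for this sketch


def compute_checksums(data, chunk=BYTES_PER_CHECKSUM):
    """Return one CRC32 value per fixed-size section of the block."""
    return [zlib.crc32(data[i:i + chunk]) for i in range(0, len(data), chunk)]


def verify_block(data, stored_checksums, chunk=BYTES_PER_CHECKSUM):
    """A block is healthy if every section still matches its checksum."""
    return compute_checksums(data, chunk) == stored_checksums


block = bytes(range(256)) * 5            # 1,280 bytes, so three sections
meta = compute_checksums(block)          # what the .meta file would hold

# Simulate a disk error by overwriting a single byte.
corrupted = block[:100] + b"\x00" + block[101:]

print(len(meta))                      # 3
print(verify_block(block, meta))      # True
print(verify_block(corrupted, meta))  # False
```

Keeping one checksum per section rather than one per block narrows down where in the block the damage lies.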

You can get a block verification report for a datanode by visiting the datanode’s web interface at http://datanode:50075/blockScannerReport. Here’s an example of a report, which should be self-explanatory:

By specifying the listblocks parameter, http://datanode:50075/blockScannerReport?listblocks, the report is preceded by a list of all the blocks on the datanode along with their latest verification status. Here is a snippet of the block list (lines are split to fit the page):

The first column is the block ID, followed by some key-value pairs. The status can be one of failed or ok according to whether the last scan of the block detected a checksum error. The type of scan is local if it was performed by the background thread, remote if it was performed by a client or a remote datanode, or none if a scan of this block has yet to be made. The last piece of information is the scan time, which is displayed as the number of milliseconds since midnight 1 January 1970, and also as a more readable value.
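If you need to interpret a raw scan time yourself, converting milliseconds since the epoch to a readable timestamp takes one call. The snippet below is a small Python sketch; the function name scan_time_to_utc is hypothetical, and it reports the time in UTC, which may differ from the time zone used in the report.

```python
from datetime import datetime, timezone


def scan_time_to_utc(millis):
    """Convert milliseconds since midnight 1 January 1970 to a UTC datetime."""
    return datetime.fromtimestamp(millis / 1000, tz=timezone.utc)


print(scan_time_to_utc(0).isoformat())  # 1970-01-01T00:00:00+00:00
```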


Over time, the distribution of blocks across datanodes can become unbalanced. An unbalanced cluster can affect locality for MapReduce, and it puts a greater strain on the highly utilized datanodes, so it’s best avoided.

The balancer program is a Hadoop daemon that re-distributes blocks by moving them from over-utilized datanodes to under-utilized datanodes, while adhering to the block replica placement policy that makes data loss unlikely by placing block replicas on different racks (see “Replica Placement”). It moves blocks until the cluster is deemed to be balanced, which means that the utilization of every datanode (ratio of used space on the node to total capacity of the node) differs from the utilization of the cluster (ratio of used space on the cluster to total capacity of the cluster) by no more than a given threshold percentage. You can start the balancer with:

% start-balancer.sh

The -threshold argument specifies the threshold percentage that defines what it means for the cluster to be balanced. The flag is optional; if it is omitted, the threshold is 10%. At any one time, only one balancer may be running on the cluster.
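The balanced condition is a comparison of each node's utilization against the cluster-wide figure. Here is a minimal Python sketch of that definition, using the default threshold of 10%; the function names and the (used, capacity) tuples are hypothetical, and the sketch only tests for balance rather than deciding which blocks to move.

```python
def utilization(used, capacity):
    """Utilization as a percentage of capacity."""
    return 100.0 * used / capacity


def is_balanced(nodes, threshold_pct=10.0):
    """nodes is a list of (used, capacity) pairs, one per datanode."""
    cluster = utilization(sum(used for used, _ in nodes),
                          sum(capacity for _, capacity in nodes))
    return all(abs(utilization(used, capacity) - cluster) <= threshold_pct
               for used, capacity in nodes)


# Cluster utilization is 50% in both cases.
print(is_balanced([(80, 100), (20, 100)]))  # False: nodes differ by 30 points
print(is_balanced([(55, 100), (45, 100)]))  # True: nodes differ by 5 points
```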

The balancer runs until the cluster is balanced, it cannot move any more blocks, or it loses contact with the namenode. It produces a logfile in the standard log directory, where it writes a line for every iteration of redistribution that it carries out. Here is the output from a short run on a small cluster:

The balancer is designed to run in the background without unduly taxing the cluster or interfering with other clients using the cluster. It limits the bandwidth that it uses to copy a block from one node to another. The default is a modest 1 MB/s, but this can be changed by setting the dfs.balance.bandwidthPerSec property in hdfs-site.xml, specified in bytes.
