
Anatomy of a File Read

To get an idea of how data flows between the client interacting with HDFS, the namenode, and the datanodes, consider the figure below, which shows the main sequence of events when reading a file.

Figure: Main sequence of events when a client reads a file from HDFS

The client opens the file it wishes to read by calling open() on the FileSystem object, which for HDFS is an instance of DistributedFileSystem (step 1 in the figure). DistributedFileSystem calls the namenode, using RPC, to determine the locations of the first few blocks in the file (step 2). For each block, the namenode returns the addresses of the datanodes that have a copy of that block. Furthermore, the datanodes are sorted according to their proximity to the client (according to the topology of the cluster’s network; see “Network Topology and Hadoop”). If the client is itself a datanode (in the case of a MapReduce task, for instance), then it will read from the local datanode if it hosts a copy of the block.

The DistributedFileSystem returns an FSDataInputStream (an input stream that supports file seeks) to the client for it to read data from. FSDataInputStream in turn wraps a DFSInputStream, which manages the datanode and namenode I/O.

The client then calls read() on the stream (step 3). DFSInputStream, which has stored the datanode addresses for the first few blocks in the file, then connects to the first (closest) datanode for the first block in the file. Data is streamed from the datanode back to the client, which calls read() repeatedly on the stream (step 4). When the end of the block is reached, DFSInputStream will close the connection to the datanode, then find the best datanode for the next block (step 5). This happens transparently to the client, which from its point of view is just reading a continuous stream. Blocks are read in order, with the DFSInputStream opening new connections to datanodes as the client reads through the stream. It will also call the namenode to retrieve the datanode locations for the next batch of blocks as needed. When the client has finished reading, it calls close() on the FSDataInputStream (step 6).
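From the application’s point of view, all of this sits behind the ordinary stream API. The following is a minimal sketch of a client driving steps 1, 3–4, and 6; the URI and file are illustrative, not from the original text:

  import java.io.InputStream;
  import java.net.URI;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IOUtils;

  public class FileReadExample {
    public static void main(String[] args) throws Exception {
      String uri = "hdfs://namenode/user/tom/quangle.txt"; // hypothetical file
      Configuration conf = new Configuration();
      FileSystem fs = FileSystem.get(URI.create(uri), conf); // a DistributedFileSystem for hdfs: URIs
      InputStream in = null;
      try {
        in = fs.open(new Path(uri));                    // step 1: returns an FSDataInputStream
        IOUtils.copyBytes(in, System.out, 4096, false); // steps 3-4: repeated read() calls
      } finally {
        IOUtils.closeStream(in);                        // step 6: close()
      }
    }
  }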

During reading, if the DFSInputStream encounters an error while communicating with a datanode, it will try the next closest one for that block. It will also remember datanodes that have failed so that it doesn’t needlessly retry them for later blocks. The DFSInputStream also verifies checksums for the data transferred to it from the datanode. If a corrupted block is found, it is reported to the namenode before the DFSInputStream attempts to read a replica of the block from another datanode.

One important aspect of this design is that the client contacts datanodes directly to retrieve data and is guided by the namenode to the best datanode for each block. This design allows HDFS to scale to a large number of concurrent clients, since the data traffic is spread across all the datanodes in the cluster. The namenode meanwhile merely has to service block location requests (which it stores in memory, making them very efficient) and does not, for example, serve data, which would quickly become a bottleneck as the number of clients grew.

Network Topology and Hadoop

What does it mean for two nodes in a local network to be “close” to each other? In the context of high-volume data processing, the limiting factor is the rate at which we can transfer data between nodes: bandwidth is a scarce commodity. The idea is to use the bandwidth between two nodes as a measure of distance.

Rather than measuring bandwidth between nodes, which can be difficult to do in practice (it requires a quiet cluster, and the number of pairs of nodes in a cluster grows as the square of the number of nodes), Hadoop takes a simple approach in which the network is represented as a tree and the distance between two nodes is the sum of their distances to their closest common ancestor. Levels in the tree are not predefined, but it is common to have levels that correspond to the data center, the rack, and the node that a process is running on. The idea is that the bandwidth available for each of the following scenarios becomes progressively less:

  • Processes on the same node
  • Different nodes on the same rack
  • Nodes on different racks in the same data center
  • Nodes in different data centers†

For example, imagine a node n1 on rack r1 in data center d1. This can be represented as /d1/r1/n1. Using this notation, here are the distances for the four scenarios (a short sketch computing these distances appears after the list):

  • distance(/d1/r1/n1, /d1/r1/n1) = 0 (processes on the same node)
  • distance(/d1/r1/n1, /d1/r1/n2) = 2 (different nodes on the same rack)
  • distance(/d1/r1/n1, /d1/r2/n3) = 4 (nodes on different racks in the same data center)
  • distance(/d1/r1/n1, /d2/r3/n4) = 6 (nodes in different data centers)

† At the time of this writing, Hadoop is not suited for running across data centers.
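To make the arithmetic concrete, here is a minimal sketch (not Hadoop’s actual NetworkTopology implementation) that computes this distance for nodes written in the /data-center/rack/node notation used above:

  public class TopologyDistance {
    // Distance = each node's number of hops up to the closest common ancestor, summed.
    static int distance(String a, String b) {
      String[] pa = a.substring(1).split("/");
      String[] pb = b.substring(1).split("/");
      int common = 0;
      while (common < pa.length && common < pb.length && pa[common].equals(pb[common])) {
        common++;
      }
      return (pa.length - common) + (pb.length - common);
    }

    public static void main(String[] args) {
      System.out.println(distance("/d1/r1/n1", "/d1/r1/n1")); // 0: same node
      System.out.println(distance("/d1/r1/n1", "/d1/r1/n2")); // 2: same rack
      System.out.println(distance("/d1/r1/n1", "/d1/r2/n3")); // 4: same data center
      System.out.println(distance("/d1/r1/n1", "/d2/r3/n4")); // 6: different data centers
    }
  }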

Mathematically inclined readers will notice that this distance is an example of a distance metric. Finally, it is important to realize that Hadoop cannot divine your network topology for you; it needs some help (we’ll cover how to configure topology in “Network Topology”). By default, though, it assumes that the network is flat: a single-level hierarchy, or in other words, that all nodes are on a single rack in a single data center. For small clusters, this may actually be the case, and no further configuration is required.


Anatomy of a File Write

Next we’ll look at how files are written to HDFS. Although quite detailed, it is instructive to understand the data flow, since it clarifies HDFS’s coherency model. We’ll consider the case of creating a new file, writing data to it, then closing the file.

The client creates the file by calling create() on DistributedFileSystem (step 1 in the figure below). DistributedFileSystem makes an RPC call to the namenode to create a new file in the filesystem’s namespace, with no blocks associated with it (step 2). The namenode performs various checks to make sure the file doesn’t already exist and that the client has the right permissions to create the file. If these checks pass, the namenode makes a record of the new file; otherwise, file creation fails and the client is thrown an IOException. The DistributedFileSystem returns an FSDataOutputStream for the client to start writing data to. Just as in the read case, FSDataOutputStream wraps a DFSOutputStream, which handles communication with the datanodes and namenode.

As the client writes data (step 3), DFSOutputStream splits it into packets, which it writes to an internal queue, called the data queue. The data queue is consumed by the DataStreamer, whose responsibility it is to ask the namenode to allocate new blocks by picking a list of suitable datanodes to store the replicas. The list of datanodes forms a pipeline; we’ll assume the replication level is three, so there are three nodes in the pipeline. The DataStreamer streams the packets to the first datanode in the pipeline, which stores the packet and forwards it to the second datanode in the pipeline. Similarly, the second datanode stores the packet and forwards it to the third (and last) datanode in the pipeline (step 4).

Figure: A client writing data to HDFS

DFSOutputStream also maintains an internal queue of packets that are waiting to be acknowledged by datanodes, called the ack queue. A packet is removed from the ack queue only when it has been acknowledged by all the datanodes in the pipeline (step 5).

If a datanode fails while data is being written to it, then the following actions are taken, which are transparent to the client writing the data. First, the pipeline is closed, and any packets in the ack queue are added to the front of the data queue so that datanodes that are downstream from the failed node will not miss any packets. The current block on the good datanodes is given a new identity, which is communicated to the namenode, so that the partial block on the failed datanode will be deleted if the failed datanode recovers later on. The failed datanode is removed from the pipeline, and the remainder of the block’s data is written to the two good datanodes in the pipeline. The namenode notices that the block is under-replicated, and it arranges for a further replica to be created on another node. Subsequent blocks are then treated as normal.

It’s possible, but unlikely, that multiple datanodes fail while a block is being written. As long as dfs.replication.min replicas (default one) are written, the write will succeed, and the block will be asynchronously replicated across the cluster until its target replication factor is reached (dfs.replication, which defaults to three).
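The target replication factor can also be set per file. Here is a minimal sketch (the path and values are illustrative) using the FileSystem.create() overload that takes an explicit replication factor:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataOutputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class PerFileReplication {
    public static void main(String[] args) throws Exception {
      FileSystem fs = FileSystem.get(new Configuration());
      // Create a file whose blocks should be kept at two replicas,
      // overriding the cluster-wide dfs.replication default of three.
      FSDataOutputStream out = fs.create(
          new Path("/user/tom/two-replicas.txt"), // hypothetical path
          true,              // overwrite if present
          4096,              // buffer size in bytes
          (short) 2,         // replication factor for this file
          64 * 1024 * 1024); // block size in bytes
      out.close();
    }
  }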

When the client has finished writing data, it calls close() on the stream (step 6). This action flushes all the remaining packets to the datanode pipeline and waits for acknowledgments before contacting the namenode to signal that the file is complete (step 7). The namenode already knows which blocks the file is made up of (via DataStreamer asking for block allocations), so it only has to wait for blocks to be minimally replicated before returning successfully.
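From the client’s side, the whole write path again sits behind the ordinary stream API. A minimal sketch (the path and payload are illustrative, not from the original text):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataOutputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class FileWriteExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      FileSystem fs = FileSystem.get(conf); // a DistributedFileSystem when the default FS is HDFS
      FSDataOutputStream out = fs.create(new Path("/user/tom/hello.txt")); // steps 1-2
      out.write("Hello, HDFS\n".getBytes("UTF-8")); // step 3: data queued as packets and pipelined
      out.close(); // step 6: flush remaining packets, await acks, then signal the namenode (step 7)
    }
  }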

Replica Placement

How does the namenode choose which datanodes to store replicas on? There’s a tradeoff between reliability and write bandwidth and read bandwidth here. For example, placing all replicas on a single node incurs the lowest write bandwidth penalty since the replication pipeline runs on a single node, but this offers no real redundancy (if the node fails, the data for that block is lost). Also, the read bandwidth is high for off-rack reads. At the other extreme, placing replicas in different data centers may maximize redundancy, but at the cost of bandwidth. Even in the same data center (which is what all Hadoop clusters to date have run in), there are a variety of placement strategies. Indeed, Hadoop changed its placement strategy in release 0.17.0 to one that helps keep a fairly even distribution of blocks across the cluster. And from 0.21.0, block placement policies are pluggable.

Hadoop’s default strategy is to place the first replica on the same node as the client (for clients running outside the cluster, a node is chosen at random, although the system tries not to pick nodes that are too full or too busy). The second replica is placed on a different rack from the first (off-rack), chosen at random. The third replica is placed on the same rack as the second, but on a different node chosen at random. Further replicas are placed on random nodes on the cluster, although the system tries to avoid placing too many replicas on the same rack.

Once the replica locations have been chosen, a pipeline is built, taking network topology into account. For a replication factor of 3, the pipeline might look like the one in the figure below. Overall, this strategy gives a good balance among reliability (blocks are stored on two racks), write bandwidth (writes only have to traverse a single network switch), read performance (there’s a choice of two racks to read from), and block distribution across the cluster (clients only write a single block on the local rack).

Figure: A typical replica pipeline

Coherency Model

A coherency model for a filesystem describes the data visibility of reads and writes for a file. HDFS trades off some POSIX requirements for performance, so some operations may behave differently than you expect them to.

After creating a file, it is visible in the filesystem namespace, as expected:
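(The snippets in this section are minimal sketches reconstructing the examples this passage refers to; they assume an initialized FileSystem instance fs, the relevant org.apache.hadoop.fs imports, and JUnit/Hamcrest’s assertThat and is.)

  Path p = new Path("p");
  fs.create(p);
  assertThat(fs.exists(p), is(true));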

However, any content written to the file is not guaranteed to be visible, even if the stream is flushed. So the file appears to have a length of zero:
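Under the same assumptions, a sketch of this behavior:

  Path p = new Path("p");
  OutputStream out = fs.create(p);
  out.write("content".getBytes("UTF-8"));
  out.flush();
  assertThat(fs.getFileStatus(p).getLen(), is(0L));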

Once more than a block’s worth of data has been written, the first block will be visible to new readers. This is true of subsequent blocks, too: it is always the current block being written that is not visible to other readers.

HDFS provides a method for forcing all buffers to be synchronized to the datanodes via the sync() method on FSDataOutputStream. After a successful return from sync(), HDFS guarantees that the data written up to that point in the file is persisted and visible to all new readers:
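A sketch of sync() in action, under the same assumptions:

  Path p = new Path("p");
  FSDataOutputStream out = fs.create(p);
  out.write("content".getBytes("UTF-8"));
  out.flush();
  out.sync();
  assertThat(fs.getFileStatus(p).getLen(), is((long) "content".length()));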

This behavior is similar to the fsync system call in POSIX that commits buffered data for a file descriptor. For example, using the standard Java API to write a local file, we are guaranteed to see the content after flushing the stream and synchronizing:
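A sketch of the local-file analogue, assuming a java.io.File named localFile:

  FileOutputStream out = new FileOutputStream(localFile);
  out.write("content".getBytes("UTF-8"));
  out.flush();          // flush to the operating system
  out.getFD().sync();   // sync to disk
  assertThat(localFile.length(), is((long) "content".length()));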

Consequences for application design

This coherency model has implications for the way you design applications. With no calls to sync(), you should be prepared to lose up to a block of data in the event of client or system failure. For many applications, this is unacceptable, so you should call sync() at suitable points, such as after writing a certain number of records or number of bytes. Though the sync() operation is designed to not unduly tax HDFS, it does have some overhead, so there is a trade-off between data robustness and throughput. What is an acceptable trade-off is application-dependent, and suitable values can be selected after measuring your application’s performance with different sync() frequencies.
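For example, a writer might call sync() after every fixed number of records. This is a minimal sketch (the interval, path, and records array are hypothetical; it assumes an initialized FileSystem fs and a byte[][] of record payloads):

  int syncInterval = 1000; // hypothetical: tune by measuring throughput vs. robustness
  FSDataOutputStream out = fs.create(new Path("/user/tom/records.txt")); // hypothetical path
  for (int i = 0; i < records.length; i++) {
    out.write(records[i]);
    if ((i + 1) % syncInterval == 0) {
      out.sync(); // bounds loss to at most syncInterval unsynced records
    }
  }
  out.close();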

Releases of Hadoop up to and including 0.20 do not have a working implementation of sync(); however, this has been remedied from 0.21.0 onward. Also, from that version, sync() is deprecated in favor of hflush(), which only guarantees that new readers will see all data written to that point, and hsync(), which makes a stronger guarantee that the operating system has flushed the data to disk (like POSIX fsync), although data may still be in the disk cache.

