Building Applications with ZooKeeper - Hadoop

Having covered ZooKeeper in some depth, let’s turn back to writing some useful applications with it.

A Configuration Service

One of the most basic services that a distributed application needs is a configuration service so that common pieces of configuration information can be shared by machines in a cluster. At the simplest level, ZooKeeper can act as a highly available store for configuration, allowing application participants to retrieve or update configuration files. Using ZooKeeper watches, it is possible to create an active configuration service,where interested clients are notified of changes in configuration.

Let’s write such a service. We make a couple of assumptions that simplify the implementation (they could be removed with a little more work). First, the only configuration values we need to store are strings, and keys are just znode paths, so we use a znode to store each key-value pair. Second, there is a single client that performs updates at any one time. Among other things, this model fits with the idea of a master (such as thenamenode in HDFS) that wishes to update information that its workers need to follow.

We wrap the code up in a class called ActiveKeyValueStore:

The contract of the write() method is that a key with the given value is written to ZooKeeper. It hides the difference between creating a new znode and updating an existing znode with a new value, by testing first for the znode using the exists operation and then performing the appropriate operation. The other detail worth mentioning is the need to convert the string value to a byte array, for which we just use the getBytes() method with a UTF-8 encoding.

To illustrate the use of the ActiveKeyValueStore, consider a ConfigUpdater class that updates a configuration property with a value. The listing appears in Example .

Example . An application that updates a property in ZooKeeper at random times

The program is simple. A ConfigUpdater has an ActiveKeyValueStore that connects to ZooKeeper in ConfigUpdater’s constructor. The run() method loops forever, updating the /config znode at random times with random values.

Next, let’s look at how to read the /config configuration property. First, we add a read method to ActiveKeyValueStore:

The getData() method of ZooKeeper takes the path, a Watcher, and a Stat object. The Stat object is filled in with values by getData(), and is used to pass information back to the caller. In this way, the caller can get both the data and the metadata for a znode, although in this case, we pass a null Stat because we are not interested in the metadata.

As a consumer of the service, ConfigWatcher (see Example ) creates an ActiveKey ValueStore, and after starting, calls the store’s read() method (in its displayConfig() method) to pass a reference to itself as the watcher. It displays the initial value of the configuration that it reads.

Example . An application that watches for updates of a property in ZooKeeper and prints them to the console

When the ConfigUpdater updates the znode, ZooKeeper causes the watcher to fire with an event type of EventType.NodeDataChanged. ConfigWatcher acts on this event in its process() method by reading and displaying the latest version of the config.

Because watches are one-time signals, we tell ZooKeeper of the new watch each time we call read() on ActiveKeyValueStore this ensures we see future updates. Furthermore, we are not guaranteed to receive every update, since between the receipt of the watch event and the next read, the znode may have been updated, possibly many times, and as the client has no watch registered during that period, it is not notified. For theconfiguration service, this is not a problem because clients care only about the latest value of a property, as it takes precedence over previous values, but in general you should be aware of this potential limitation.

Let’s see the code in action. Launch the ConfigUpdater in one terminal window:

Then launch the ConfigWatcher in another window immediately afterward:

The Resilient ZooKeeper Application

The first of the Fallacies of Distributed Computing† states that “The network is reliable.” As they stand, the programs so far have been assuming a reliable network, so when they run on a real network, they can fail in several ways. Let’s examine possible failure modes and what we can do to correct them so that our programs are resilient in the face of failure.


Every ZooKeeper operation in the Java API declares two types of exception in its throws clause: InterruptedException and KeeperException.


An InterruptedException is thrown if the operation is interrupted. There is a standard Java mechanism for canceling blocking methods, which is to call interrupt() on the thread from which the blocking method was called. A successful cancellation will result in an InterruptedException. ZooKeeper adheres to this standard, so you can cancel a ZooKeeper operation in this way. Classes or libraries that use ZooKeeper should usually propagate the InterruptedException so that their clients can cancel their operations.

An InterruptedException does not indicate a failure, but rather that the operation has been canceled, so in the configuration application example, it is appropriate to propagate the exception, causing the application to terminate.


A KeeperException is thrown if the ZooKeeper server signals an error or if there is a communication problem with the server. There are various subclasses of KeeperException for different error cases. For example, KeeperException.NoNodeExcep tion is a subclass of KeeperException that is thrown if you try to perform an operation on a znode that doesn’t exist.

Every subclass of KeeperException has a corresponding code with information about the type of error. For example, for KeeperException.NoNodeException the code is Keep erException.Code.NONODE (an enum value).

There are two ways then to handle KeeperException: either catch KeeperException and test its code to determine what remedying action to take, or catch the equivalent KeeperException subclasses and perform the appropriate action in each catch block.

KeeperExceptions fall into three broad categories.

A state exception occurs when the operation fails because it cannot be applied to the znode tree. State exceptions usually happen because another process is mutating a znode at the same time. For example, a setData operation with a version number will fail with a KeeperException.BadVersionException if the znode is updated by another process first, since the version number does not match. The programmer is
usually aware that this kind of conflict is possible and will code to deal with it.

Some state exceptions indicate an error in the program, such as KeeperExcep tion.NoChildrenForEphemeralsException, which is thrown when trying to create a child znode of an ephemeral znode.

For more detail, see the excellent article “Dealing with InterruptedException” by Brian Goetz.

Recoverable exceptions.Recoverable exceptions are those from which the application can recover within the same ZooKeeper session. A recoverable exception is manifested by KeeperException.ConnectionLossException, which means that the connection to ZooKeeper has been lost. ZooKeeper will try to reconnect, and in most cases the reconnection will succeed and ensure that the session is intact.

However, ZooKeeper cannot tell whether the operation that failed with KeeperExcep tion.ConnectionLossException was applied. This is an example of partial failure (which we introduced at the beginning of the chapter). The onus is therefore on the programmer to deal with the uncertainty, and the action that should be taken depends on the application.

At this point, it is useful to make a distinction between idempotent and nonidempotent operations. An idempotent operation is one that may be applied one or more times with the same result, such as a read request or an unconditional setData. These can simply be retried.

A nonidempotent operation cannot be indiscriminately retried, as the effect of applying it multiple times is not the same as applying it once. The program needs a way of detecting whether its update was applied by encoding information in the znode’s path name or its data. We shall discuss how to deal with failed nonidempotent operations in “Recoverable exceptions” , when we look at the implementation of a lock service.

Unrecoverable exceptions.In some cases, the ZooKeeper session becomes invalid perhaps because of a timeout or because the session was closed (both get a KeeperEx ception.SessionExpiredException), or perhaps because authentication failed (Keeper Exception.AuthFailedException). In any case, all ephemeral nodes associated with the session will be lost, so the application needs to rebuild its state before reconnecting to ZooKeeper.

A reliable configuration service

Going back to the write() method in ActiveKeyValueStore, recall that it is composed of an exists operation followed by either a create or a setData:

Taken as a whole, the write() method is idempotent, so we can afford to unconditionally retry it. Here’s a modified version of the write() method that retries in a loop.

It is set to try a maximum number of retries (MAX_RETRIES) and sleeps for RETRY_PERIOD_SECONDS between each attempt:

The code is careful not to retry KeeperException.SessionExpiredException, since when a session expires, the ZooKeeper object enters the CLOSED state, from which it can never reconnect (refer to Figure ). We simply rethrow the exception§ and let the caller create a new ZooKeeper instance, so that the whole write() method can be retried. A simple way to create a new instance is to create a new ConfigUpdater (which we’ve actually renamed ResilientConfigUpdater) to recover from an expired session:

Another way of writing the code would be to have a single catch block, just for KeeperException, and a test to see whether its code has the value KeeperException.Code.SESSIONEXPIRED. Which method you use is a matter of style, since they both behave in the same way.

An alternative way of dealing with session expiry would be to look for a KeeperState of type Expired in the watcher (that would be the ConnectionWatcher in the example here), and create a new connection when this is detected. This way, we would just keep retrying in the write() method, even if we got a KeeperException.SessionExpiredExcep tion, since the connection should eventually be reestablished. Regardless of the precisemechanics of how we recover from an expired session, the important point is that it is a different kind of failure from connection loss and needs to be handled differently.

There’s actually another failure mode that we’ve ignored here. When the ZooKeeper object is created, it tries to connect to a ZooKeeper server.

If the connection fails or times out, then it tries another server in the ensemble. If, after trying all of the servers in the ensemble, it can’t connect, then it throws an IOException. The likelihood of all ZooKeeperservers being unavailable is low; nevertheless, some applications may choose to retry the operation in a loop until ZooKeeper is available.

This is just one strategy for retry handling there are many others, such as using exponential backoff where the period between retries is multiplied by a constant each time. The package in Hadoop Core is a set of utilities for adding retry logic into your code in a reusable way, and it may be helpful for building ZooKeeper applications.

A Lock Service

A distributed lock is a mechanism for providing mutual exclusion between a collection of processes. At any one time, only a single process may hold the lock. Distributed locks can be used for leader election in a large distributed system, where the leader is the process that holds the lock at any point in time.

Do not confuse ZooKeeper’s own leader election with a general leader election service, which can be built using ZooKeeper primitives. Zoo- Keeper’s own leader election is not exposed publicly, unlike the type ofgeneral leader election service we are describing here, which is designed to be used by distributed systems that need to agree upon a master process.

To implement a distributed lock using ZooKeeper, we use sequential znodes to impose an order on the processes vying for the lock. The idea is simple: first designate a lock znode, typically describing the entity being locked on, say /leader; then clients that want to acquire the lock create sequential ephemeral znodes as children of the lock znode.

At any point in time, the client with the lowest sequence number holds the lock. For example, if two clients create znodes at around the same time, /leader/lock-1 and /leader/lock-2, then the client that created /leader/lock-1 holds the lock, since itsznode has the lowest sequence number. The ZooKeeper service is the arbiter of order, since it assigns the sequence numbers.

The lock may be released simply by deleting the znode /leader/lock-1; alternatively, if the client process dies, it will be deleted by virtue of it being an ephemeral znode. The client that created /leader/lock-2 will then hold the lock, since it has the next lowest sequence number. It will be notified that it has the lock by creating a watch that fires when znodes go away.

The pseudocode for lock acquisition is as follows:

  1. Create an ephemeral sequential znode named lock- under the lock znode and remember its actual path name (the return value of the create operation)
  2. .Get the children of the lock znode and set a watch.
  3. If the path name of the znode created in 1 has the lowest number of the children returned in 2, then the lock has been acquired. Exit.
  4. Wait for the notification from the watch set in 2 and go to step 2.

The herd effect

Although this algorithm is correct, there are some problems with it. The first problem is that this implementation suffers from the herd effect. Consider hundreds or thousands of clients, all trying to acquire the lock. Each client places a watch on the lock znode for changes in its set of children. Every time the lock is released, or another process starts the lock acquisition process, the watch fires and every client receives a notification.

The “herd effect” refers to a large number of clients being notified of the same event, when only a small number of them can actually proceed. In this case, only one client will successfully acquire the lock, and the process of maintaining and sending watch events to all clients causes traffic spikes, which put pressure on the ZooKeeper servers.

To avoid the herd effect, we need to refine the condition for notification. The key observation for implementing locks is that a client needs to be notified only when the child znode with the previous sequence number goes away, not when any child znode is deleted (or created). In our example, if clients have created the znodes /leader/ lock-1, /leader/lock-2, and /leader/lock-3, then the client holding /leader/lock-3 onlyneeds to be notified when /leader/lock-2 disappears. It does not need to be notified when /leader/lock-1 disappears or when a new znode /leader/lock-4 is added.

Recoverable exceptions

Another problem with the lock algorithm as it stands is that it doesn’t handle the case when the create operation fails due to connection loss. Recall that in this case we do not know if the operation succeeded or failed. Creating a sequential znode is a nonidempotent operation, so we can’t simply retry, since if the first create hadsucceeded, we would have an orphaned znode that would never be deleted (until the client session ended, at least). Deadlock would be the unfortunate result.

The problem is that after reconnecting, the client can’t tell whether it created any of the child znodes. By embedding an identifier in the znode name, if it suffers a connection loss, it can check to see whether any of the children of the lock node have its identifier in their name. If a child contains its identifier, it knows that the create operation succeeded, and it shouldn’t create another child znode. If no child has the identifier in itsname, then the client can safely create a new sequential child znode.

The client’s session identifier is a long integer that is unique for the ZooKeeper service and therefore ideal for the purpose of identifying a client across connection loss events. The session identifier can be obtained by calling the getSessionId() method on the ZooKeeper Java class.

The ephemeral sequential znode should be created with a name of the form lock-<sessionId>-, so that when the sequence number is appended by ZooKeeper, the name becomes lock-<sessionId>-<sequenceNumber>. The sequence numbers are unique to the parent, not to the name of the child, so this technique allows the child znodes to identify their creators as well as impose an order of creation.

Unrecoverable exceptions

If a client’s ZooKeeper session expires, the ephemeral znode created by the client will be deleted, effectively relinquishing the lock or at least forfeiting the client’s turn to acquire the lock. The application using the lock should realize that it no longer holds the lock, clean up its state, and then start again by creating a new lock object and trying to acquire it. Notice that it is the application that controls this process, not the lock implementation, since it cannot second-guess how the application needs to clean up its state.


Implementing a distributed lock correctly is a delicate matter, since accounting for all of the failure modes is nontrivial. ZooKeeper comes with a production-quality lock implementation in Java called WriteLock that is very easy for clients to use.

More Distributed Data Structures and Protocols

There are many distributed data structures and protocols that can be built with Zoo- Keeper, such as barriers, queues, and two-phase commit. One interesting thing to note is that these are synchronous protocols, even though we use asynchronous ZooKeeper primitives (such as notifications) to build them.

The ZooKeeper website describes several such data structures and protocols in pseudocode. ZooKeeper comes with implementations of some of these standard recipes; they can be found in the recipes directory of the distribution.


BookKeeper is a highly available and reliable logging service. It can be used to provide write-ahead logging, which is a common technique for ensuring data integrity in storage systems. In a system using write-ahead logging, every write operation is written to the transaction log before it is applied. Using this procedure, we don’t have to write the data to permanent storage after every write operation because in the event of a systemfailure, the latest state may be recovered by replaying the transaction log for any writes that had not been applied.

BookKeeper clients create logs called ledgers, and each record appended to a ledger is called a ledger entry, which is simply a byte array. Ledgers are managed by bookies, which are servers that replicate the ledger data. Note that ledger data is not stored in ZooKeeper, only metadata is.

Traditionally, the challenge has been to make systems that use write-ahead logging robust in the face of failure of the node writing the transaction log. This is usually done by replicating the transaction log in some manner. Hadoop’s HDFS namenode, for instance, writes its edit log to multiple disks, one of which is typically an NFS mounted disk. However, in the event of failure of the primary, failover is still manual. By providing logging as a highly available service, BookKeeper promises to make failover transparent, since it can tolerate the loss of bookie servers.

BookKeeper is provided in the contrib directory of the ZooKeeper distribution, where you can find more information on how to use it.

All rights reserved © 2018 Wisdom IT Services India Pvt. Ltd Protection Status

Hadoop Topics