Blocking Queues

You have now seen the low-level building blocks that form the foundations of concurrent programming in Java. However, for practical programming, you want to stay away from the low-level constructs whenever possible. It is much easier and safer to use higher-level structures that have been implemented by concurrency experts.

Many threading problems can be formulated elegantly and safely by using one or more queues. Producer threads insert items into the queue, and consumer threads retrieve them. The queue lets you safely hand over data from one thread to another. For example, consider our bank transfer program. Rather than accessing the bank object directly, the transfer threads insert transfer instruction objects into a queue. Another thread removes the instructions from the queue and carries out the transfers. Only that thread has access to the internals of the bank object. No synchronization is necessary. (Of course, the implementors of the threadsafe queue classes had to worry about locks and conditions, but that was their problem, not yours.)
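The bank example above can be sketched roughly as follows. This is a minimal illustration, not the book's bank program: the class TransferQueueSketch, the Transfer type, the DONE marker, and the account numbers and amounts are all assumptions made for the example. Only the bank thread reads or writes the balances array while the transfers are running.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class TransferQueueSketch {
    // Hypothetical transfer instruction; not part of any library.
    static final class Transfer {
        final int from, to;
        final long amount;
        Transfer(int from, int to, long amount) {
            this.from = from; this.to = to; this.amount = amount;
        }
    }

    // Marker object telling the bank thread that no more work will arrive.
    static final Transfer DONE = new Transfer(-1, -1, 0);

    static long[] run() throws InterruptedException {
        long[] balances = {1000, 1000, 1000};
        BlockingQueue<Transfer> queue = new LinkedBlockingQueue<>();

        // The only thread that touches the balances while work is in flight.
        Thread bank = new Thread(() -> {
            try {
                while (true) {
                    Transfer t = queue.take();
                    if (t == DONE) break;
                    balances[t.from] -= t.amount;
                    balances[t.to] += t.amount;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        bank.start();

        // Client threads only enqueue instructions.
        Thread[] clients = new Thread[3];
        for (int i = 0; i < clients.length; i++) {
            int from = i;
            clients[i] = new Thread(() -> {
                try {
                    for (int k = 0; k < 100; k++)
                        queue.put(new Transfer(from, (from + 1) % 3, 5));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            clients[i].start();
        }
        for (Thread c : clients) c.join();
        queue.put(DONE);   // all instructions are queued; tell the bank to stop
        bank.join();       // after join, the final balances are visible here
        return balances;
    }

    public static void main(String[] args) throws InterruptedException {
        long[] b = run();
        System.out.println("Total: " + (b[0] + b[1] + b[2]));
    }
}
```

Because the clients never touch the balances directly, the example needs no locks of its own; the queue does all the hand-over work.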

A blocking queue causes a thread to block when it tries to add an element to a full queue or to remove an element from an empty queue. Blocking queues are a useful tool for coordinating the work of multiple threads. Worker threads can periodically deposit intermediate results in a blocking queue. Other worker threads remove the intermediate results and modify them further. The queue automatically balances the workload. If the first set of threads runs slower than the second, the second set blocks while waiting for the results. If the first set of threads runs faster, the queue fills up until the second set catches up. The table below shows the methods for blocking queues.

The blocking queue methods fall into three categories, depending on their action when the queue is full or empty. If you use the queue as a thread management tool, you will want to use the put and take methods. The add, remove, and element operations throw an exception when you try to add to a full queue or get the head of an empty queue. Of course, in a multithreaded program, the queue might become full or empty at any time, so you will instead want to use the offer, poll, and peek methods. These methods simply return with a failure indicator instead of throwing an exception if they cannot carry out their tasks.

NOTE: The poll and peek methods return null to indicate failure. Therefore, it is illegal to insert null values into these queues.
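To make the difference between the throwing and the non-throwing methods concrete, here is a small sketch that exercises them on a queue of capacity 1. The class name QueueMethodCategories and the method outcomes are made up for this illustration; the queue calls themselves are standard java.util.concurrent API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class QueueMethodCategories {
    // Calls the non-blocking methods and records what each one did.
    static List<String> outcomes() {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        List<String> log = new ArrayList<>();

        log.add("offer(a) -> " + q.offer("a"));   // queue was empty: succeeds
        log.add("offer(b) -> " + q.offer("b"));   // queue is full: returns false
        try {
            q.add("b");                           // same situation, but add throws
        } catch (IllegalStateException e) {
            log.add("add(b) -> IllegalStateException");
        }
        log.add("peek() -> " + q.peek());         // head stays in the queue
        log.add("poll() -> " + q.poll());         // head is removed
        log.add("poll() -> " + q.poll());         // queue is empty: returns null
        try {
            q.element();                          // empty queue: element throws
        } catch (NoSuchElementException e) {
            log.add("element() -> NoSuchElementException");
        }
        try {
            q.offer(null);                        // null elements are rejected
        } catch (NullPointerException e) {
            log.add("offer(null) -> NullPointerException");
        }
        return log;
    }

    public static void main(String[] args) {
        outcomes().forEach(System.out::println);
    }
}
```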

There are also variants of the offer and poll methods with a timeout. For example, a call such as q.offer(x, 100, TimeUnit.MILLISECONDS), where q is the queue and x is the element to add, tries for 100 milliseconds to insert an element at the tail of the queue. If it succeeds, it returns true; otherwise, it returns false when it times out. Similarly, the call q.poll(100, TimeUnit.MILLISECONDS) tries for 100 milliseconds to remove the head of the queue. If it succeeds, it returns the head; otherwise, it returns null when it times out.

The put method blocks if the queue is full, and the take method blocks if the queue is empty. They behave like the timed offer and poll methods with an unlimited wait.
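The timed variants can be tried out with a sketch like the following. The class TimedQueueCalls and its two helper methods are hypothetical names chosen for this example; both calls are set up so that they are expected to time out.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class TimedQueueCalls {
    // Tries to add to a queue that is already full and stays full.
    static boolean offerToFullQueue() throws InterruptedException {
        BlockingQueue<Integer> q = new ArrayBlockingQueue<>(1);
        q.put(1);                                      // fills the queue
        return q.offer(2, 100, TimeUnit.MILLISECONDS); // gives up after about 100 ms
    }

    // Tries to remove from a queue that is empty and stays empty.
    static Integer pollFromEmptyQueue() throws InterruptedException {
        BlockingQueue<Integer> q = new ArrayBlockingQueue<>(1);
        return q.poll(100, TimeUnit.MILLISECONDS);     // gives up after about 100 ms
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("offer succeeded: " + offerToFullQueue());
        System.out.println("poll returned: " + pollFromEmptyQueue());
    }
}
```

Replacing the timed offer with put in the first method would make the thread wait forever, since no other thread ever removes an element.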

The java.util.concurrent package supplies several variations of blocking queues. By default, the LinkedBlockingQueue has no upper bound on its capacity, but a maximum capacity can be optionally specified. The LinkedBlockingDeque is a double-ended version. The ArrayBlockingQueue is constructed with a given capacity and an optional parameter to require fairness. If fairness is specified, then the longest-waiting threads are given preferential treatment. As always, fairness exacts a significant performance penalty, and you should only use it if your problem specifically requires it.
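As a rough illustration of these construction options, the following sketch builds an unbounded linked queue, a capped linked queue, and a fair array queue, and reports how much room each has left. The class name QueueConstruction and the capacities 100 and 10 are arbitrary choices for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueConstruction {
    static int[] remainingCapacities() {
        // No capacity argument: the bound is Integer.MAX_VALUE, effectively unbounded.
        BlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
        // Optional maximum capacity.
        BlockingQueue<String> capped = new LinkedBlockingQueue<>(100);
        // Capacity is mandatory; 'true' asks for fair (longest-waiting-first) access.
        BlockingQueue<String> fair = new ArrayBlockingQueue<>(10, true);

        capped.offer("x");   // uses up one of the 100 slots
        return new int[] {
            unbounded.remainingCapacity(),
            capped.remainingCapacity(),
            fair.remainingCapacity()
        };
    }

    public static void main(String[] args) {
        for (int c : remainingCapacities()) System.out.println(c);
    }
}
```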

Blocking Queue Methods

The PriorityBlockingQueue is a priority queue, not a first-in/first-out queue. Elements are removed in order of their priority. The queue has unbounded capacity, but retrieval will block if the queue is empty.
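The following sketch shows the removal order of a PriorityBlockingQueue. It assumes the default natural ordering of Integer, under which the smallest value has the highest priority; the class name PriorityOrderDemo is made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

class PriorityOrderDemo {
    // Inserts values out of order and removes them in priority order.
    static List<Integer> drainInPriorityOrder() throws InterruptedException {
        PriorityBlockingQueue<Integer> q = new PriorityBlockingQueue<>();
        q.put(5);   // put never blocks here: the queue is unbounded
        q.put(1);
        q.put(3);

        List<Integer> removed = new ArrayList<>();
        while (!q.isEmpty())
            removed.add(q.take());   // take would block if the queue were empty
        return removed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(drainInPriorityOrder());   // prints [1, 3, 5]
    }
}
```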

Finally, a DelayQueue contains objects that implement the Delayed interface. That interface extends Comparable<Delayed> and declares a single method, long getDelay(TimeUnit unit).

The getDelay method returns the remaining delay of the object. A zero or negative value indicates that the delay has elapsed. Elements can only be removed from a DelayQueue if their delay has elapsed. You also need to implement the compareTo method. The DelayQueue uses that method to sort the entries.
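One possible way to implement Delayed is sketched below. The Reminder class, its fields, and the delays of 100 and 300 milliseconds are assumptions made for the example; storing an absolute due time based on System.nanoTime is a common approach, not the only one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayQueueSketch {
    // Hypothetical element type that becomes available after a given delay.
    static final class Reminder implements Delayed {
        final String text;
        final long dueNanos;   // absolute time at which the delay has elapsed

        Reminder(String text, long delayMillis) {
            this.text = text;
            this.dueNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Zero or negative once the due time has passed.
            return unit.convert(dueNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                                other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    static List<String> run() throws InterruptedException {
        DelayQueue<Reminder> q = new DelayQueue<>();
        q.put(new Reminder("later", 300));
        q.put(new Reminder("sooner", 100));

        List<String> log = new ArrayList<>();
        Reminder early = q.poll();            // nothing has expired yet
        log.add(early == null ? "nothing ready" : early.text);
        log.add(q.take().text);               // blocks until the shortest delay elapses
        log.add(q.take().text);
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

The elements come out in order of expiry rather than insertion, because the queue sorts them with compareTo.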

The program in the listing below shows how to use a blocking queue to control a set of threads. The program searches through all files in a directory and its subdirectories, printing lines that contain a given keyword.

A producer thread enumerates all files in all subdirectories and places them in a blocking queue. This operation is fast, and the queue would quickly fill up with all files in the file system if it were not bounded.

We also start a large number of search threads. Each search thread takes a file from the queue, opens it, prints all lines containing the keyword, and then takes the next file. We use a trick to terminate the application when no further work is required. In order to signal completion, the enumeration thread places a dummy object into the queue. (This is similar to a dummy suitcase with a label “last bag” on a baggage claim belt.) When a search thread takes the dummy, it puts it back and terminates.

Note that no explicit thread synchronization is required. In this application, we use the queue data structure as a synchronization mechanism.
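The design described above might look roughly like the following sketch. It is not the original listing: the class KeywordSearchSketch, the queue capacity of 10, and the use of java.nio.file are assumptions, and the matching lines are collected in a list (and printed by main) so that the result can be inspected.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Stream;

class KeywordSearchSketch {
    // The "last bag" marker; recognized by identity, never opened as a file.
    private static final Path DUMMY = Paths.get("");

    static List<String> search(Path root, String keyword, int searchers)
            throws InterruptedException {
        BlockingQueue<Path> queue = new ArrayBlockingQueue<>(10);  // bounded on purpose
        List<String> hits = new CopyOnWriteArrayList<>();

        // Producer: enumerates files, blocking whenever the queue is full.
        Thread producer = new Thread(() -> {
            try {
                try (Stream<Path> paths = Files.walk(root)) {
                    Iterator<Path> it = paths.filter(Files::isRegularFile).iterator();
                    while (it.hasNext()) queue.put(it.next());
                } catch (IOException | UncheckedIOException e) {
                    System.err.println("Enumeration stopped: " + e);
                }
                queue.put(DUMMY);   // signal completion even if the walk failed
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Consumers: take files until the dummy shows up, then pass it on.
        Runnable searcher = () -> {
            try {
                while (true) {
                    Path file = queue.take();
                    if (file == DUMMY) {
                        queue.put(file);   // put it back for the other searchers
                        return;
                    }
                    try (Stream<String> lines = Files.lines(file)) {
                        lines.filter(line -> line.contains(keyword))
                             .forEach(line -> hits.add(file + ": " + line));
                    } catch (IOException | UncheckedIOException e) {
                        // Unreadable or non-text file: skip it.
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        producer.start();
        Thread[] workers = new Thread[searchers];
        for (int i = 0; i < searchers; i++) {
            workers[i] = new Thread(searcher);
            workers[i].start();
        }
        producer.join();
        for (Thread w : workers) w.join();
        return hits;
    }

    public static void main(String[] args) throws InterruptedException {
        Path root = Paths.get(args.length > 0 ? args[0] : ".");
        String keyword = args.length > 1 ? args[1] : "volatile";
        search(root, keyword, 8).forEach(System.out::println);
    }
}
```

Putting the dummy back is what lets a single marker stop any number of search threads: each one sees it, returns it to the queue, and exits.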

java.util.concurrent.ArrayBlockingQueue<E>

  • ArrayBlockingQueue(int capacity)
  • ArrayBlockingQueue(int capacity, boolean fair)
    constructs a blocking queue with the given capacity and fairness settings. The queue is implemented as a circular array.

java.util.concurrent.LinkedBlockingQueue<E>
java.util.concurrent.LinkedBlockingDeque<E>

  • LinkedBlockingQueue()
  • LinkedBlockingDeque()
    constructs an unbounded blocking queue or deque, implemented as a linked list.
  • LinkedBlockingQueue(int capacity)
  • LinkedBlockingDeque(int capacity)
    constructs a bounded blocking queue or deque with the given capacity, implemented as a linked list.

java.util.concurrent.DelayQueue<E extends Delayed>

  • DelayQueue()
    constructs an unbounded blocking queue of Delayed elements. Only elements whose delay has expired can be removed from the queue.

java.util.concurrent.Delayed

  • long getDelay(TimeUnit unit)
    gets the delay for this object, measured in the given time unit.

java.util.concurrent.PriorityBlockingQueue<E>

  • PriorityBlockingQueue()
  • PriorityBlockingQueue(int initialCapacity)
  • PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator)
    constructs an unbounded blocking priority queue implemented as a heap.

java.util.concurrent.BlockingQueue<E>

  • void put(E element)
    adds the element, blocking if necessary.
  • E take()
    removes and returns the head element, blocking if necessary.
  • boolean offer(E element, long time, TimeUnit unit)
    adds the given element and returns true if successful, blocking if necessary until the element has been added or the time has elapsed.
  • E poll(long time, TimeUnit unit)
    removes and returns the head element, blocking if necessary until an element is available or the time has elapsed. Returns null upon failure.

java.util.concurrent.BlockingDeque<E>

  • void putFirst(E element)
  • void putLast(E element)
    adds the element, blocking if necessary.
  • E takeFirst()
  • E takeLast()
    removes and returns the head or tail element, blocking if necessary.
  • boolean offerFirst(E element, long time, TimeUnit unit)
  • boolean offerLast(E element, long time, TimeUnit unit)
    adds the given element and returns true if successful, blocking if necessary until the element has been added or the time has elapsed.
  • E pollFirst(long time, TimeUnit unit)
  • E pollLast(long time, TimeUnit unit)
    removes and returns the head or tail element, blocking if necessary until an element is available or the time has elapsed. Returns null upon failure.
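The deque methods listed above can be tried out with a short sketch such as the following. The class name DequeEndsDemo, the capacity of 2, and the 50 millisecond timeouts are arbitrary choices for the illustration.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

class DequeEndsDemo {
    static String run() throws InterruptedException {
        BlockingDeque<String> d = new LinkedBlockingDeque<>(2);
        d.putLast("b");
        d.putFirst("a");   // the deque now holds a, b and is full

        // No room left, so the timed offer gives up and returns false.
        boolean added = d.offerLast("c", 50, TimeUnit.MILLISECONDS);

        String tail = d.takeLast();    // removes "b"
        String head = d.takeFirst();   // removes "a"

        // The deque is empty, so the timed poll gives up and returns null.
        String none = d.pollFirst(50, TimeUnit.MILLISECONDS);

        return added + " " + tail + " " + head + " " + none;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());   // prints false b a null
    }
}
```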

