How Do Joins Perform - Hadoop

MapReduce can perform joins between large datasets, but writing the code to do joins from scratch is fairly involved. Rather than writing MapReduce programs, you might consider using a higher-level framework such as Pig, Hive, or Cascading, in which join operations are a core part of the implementation.

Let’s briefly consider the problem we are trying to solve. We have two datasets (for example, the weather stations database and the weather records) and we want to reconcile the two. For instance, we might want to see each station’s history, with the station’s metadata inlined in each output row.

How we implement the join depends on how large the datasets are and how they are partitioned. If one dataset is large (the weather records) but the other one is small enough to be distributed to each node in the cluster (as the station metadata is), then the join can be effected by a MapReduce job that brings the records for each station together (a partial sort on station ID, for example). The mapper or reducer uses the smaller dataset to look up the station metadata for a station ID, so it can be written out with each record. See “Side Data Distribution” for a discussion of this approach, where we focus on the mechanics of distributing the data to tasktrackers.
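To make this concrete, here is a minimal sketch of the small-dataset case. It assumes a hypothetical tab-separated stations file (station ID, then station name) that has been shipped to every task via the distributed cache (for example, with the -files option), and a fixed-width station ID at the start of each weather record; both the file name and the record layout are assumptions for illustration.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class StationLookupMapper extends Mapper<LongWritable, Text, Text, Text> {
  private Map<String, String> stationNames = new HashMap<String, String>();

  @Override
  protected void setup(Context context) throws IOException {
    // stations.tsv was distributed with -files, so it appears in the
    // task's working directory; load it into memory once per task.
    BufferedReader in = new BufferedReader(new FileReader("stations.tsv"));
    String line;
    while ((line = in.readLine()) != null) {
      String[] fields = line.split("\t");
      stationNames.put(fields[0], fields[1]);
    }
    in.close();
  }

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String record = value.toString();
    String stationId = record.substring(0, 11); // assumed fixed-width layout
    String name = stationNames.get(stationId);
    if (name != null) { // inner join: drop records with no matching station
      context.write(new Text(stationId), new Text(name + "\t" + record));
    }
  }
}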

If the join is performed by the mapper, it is called a map-side join, whereas if it is performed by the reducer it is called a reduce-side join.

If both datasets are too large for either to be copied to each node in the cluster, then we can still join them using MapReduce with a map-side or reduce-side join, depending on how the data is structured. One common example of this case is a user database and a log of some user activity (such as access logs). For a popular service, it is not feasible to distribute the user database (or the logs) to all the MapReduce nodes.

Map-Side Joins

A map-side join between large inputs works by performing the join before the data reaches the map function. For this to work, though, the inputs to each map must be partitioned and sorted in a particular way. Each input dataset must be divided into the same number of partitions, and it must be sorted by the same key (the join key) in each source. All the records for a particular key must reside in the same partition. This may sound like a strict requirement (and it is), but it actually fits the description of the output of a MapReduce job.


A map-side join can be used to join the outputs of several jobs that had the same number of reducers, the same keys, and output files that are not splittable (by being smaller than an HDFS block, or by virtue of being gzip compressed, for example). In the context of the weather example, if we ran a partial sort on the stations file by station ID, and another, identical sort on the records, again by station ID, and with the same number of reducers, then the two outputs would satisfy the conditions for running a map-side join.

Use a CompositeInputFormat from the org.apache.hadoop.mapred.join package to run a map-side join. The input sources and join type (inner or outer) for CompositeInputFormat are configured through a join expression that is written according to a simple grammar. The package documentation has details and examples.
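As a rough sketch of the configuration (a fragment using the old mapred API that this package belongs to, with hypothetical input paths):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.join.CompositeInputFormat;

JobConf conf = new JobConf();
conf.setInputFormat(CompositeInputFormat.class);
// compose() builds the join expression string; "inner" could be replaced
// with "outer". Both inputs must be identically partitioned and sorted
// on the join key.
conf.set("mapred.join.expr", CompositeInputFormat.compose(
    "inner", KeyValueTextInputFormat.class,
    new Path("/sorted/stations"), new Path("/sorted/records")));
// The mapper then receives the join key together with a TupleWritable
// containing one value from each source.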

The org.apache.hadoop.examples.Join example is a general-purpose command-line program for running a map-side join, since it allows you to run a MapReduce job for any specified mapper and reducer over multiple inputs that are joined with a given join operation.

Reduce-Side Joins

A reduce-side join is more general than a map-side join, in that the input datasets don’t have to be structured in any particular way, but it is less efficient because both datasets have to go through the MapReduce shuffle. The basic idea is that the mapper tags each record with its source and uses the join key as the map output key, so that the records with the same key are brought together in the reducer. We use several ingredients to make this work in practice:

Multiple inputs

The input sources for the datasets generally have different formats, so it is very convenient to use the MultipleInputs class (see “Multiple Inputs”) to separate the logic for parsing and tagging each source.

Secondary sort

As described, the reducer will see the records from both sources that have the same key, but they are not guaranteed to be in any particular order. However, to perform the join, it is important to have the data from one source before the other. For the weather data join, the station record must be the first of the values seen for each key, so the reducer can fill in the weather records with the station name and emit them straightaway. Of course, it would be possible to receive the records in any order if we buffered them in memory, but this should be avoided, since the number of records in any group may be very large and exceed the amount of memory available to the reducer.

We saw in “Secondary Sort” how to impose an order on the values for each key that the reducers see, so we use this technique here. To tag each record, we use TextPair from Chapter 4 for the keys, to store the station ID and the tag. The only requirement for the tag values is that they sort in such a way that the station records come before the weather records. This can be achieved by tagging station records as 0 and weather records as 1. The mapper classes to do this are shown in the examples below.

Example. Mapper for tagging station records for a reduce-side join
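The listings themselves are not reproduced on this page, so here is a sketch of the two tagging mappers. TextPair is the custom key class from Chapter 4 mentioned above; NcdcStationMetadataParser and NcdcRecordParser are assumed helper classes that parse the two file formats.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Emits (station ID, "0") -> station name. The "0" tag makes station
// records sort ahead of weather records for the same station ID.
public class JoinStationMapper
    extends Mapper<LongWritable, Text, TextPair, Text> {
  private NcdcStationMetadataParser parser = new NcdcStationMetadataParser();

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    if (parser.parse(value)) {
      context.write(new TextPair(parser.getStationId(), "0"),
          new Text(parser.getStationName()));
    }
  }
}

// Emits (station ID, "1") -> full weather record.
public class JoinRecordMapper
    extends Mapper<LongWritable, Text, TextPair, Text> {
  private NcdcRecordParser parser = new NcdcRecordParser();

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    parser.parse(value);
    context.write(new TextPair(parser.getStationId(), "1"), value);
  }
}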

The reducer knows that it will receive the station record first, so it extracts the station name from the value and writes it out as part of every output record.

Example. Reducer for joining tagged station records with tagged weather records
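Again the listing is missing from this page; a sketch of the reducer, under the same assumptions:

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class JoinReducer extends Reducer<TextPair, Text, Text, Text> {

  @Override
  protected void reduce(TextPair key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    Iterator<Text> iter = values.iterator();
    // The secondary sort guarantees the station record (tag "0") arrives
    // first. Copy it, because the framework reuses the Text object.
    Text stationName = new Text(iter.next());
    while (iter.hasNext()) {
      Text record = iter.next();
      Text outValue = new Text(stationName.toString() + "\t" + record.toString());
      // Output key is the station ID alone (the first part of the TextPair).
      context.write(key.getFirst(), outValue);
    }
  }
}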

The code assumes that every station ID in the weather records has exactly one matching record in the station dataset. If this were not the case, we would need to generalize the code to put the tag into the value objects, by using another TextPair. The reduce() method would then be able to tell which entries were station names and detect (and handle) missing or duplicate entries, before processing the weather records.

Because objects in the reducer’s values iterator are re-used (for efficiency purposes), it is vital that the code makes a copy of the first Text object from the values iterator:
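In the reducer sketch above, this is the line that makes the copy:

Text stationName = new Text(iter.next()); // copy: safe to keep across iterations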

If the copy is not made, the stationName reference will still point at the value just read when it is turned into a string, which is a bug.

Tying the job together is the driver class, shown in the example below. The essential point is that we partition and group on the first part of the key, the station ID, which we do with a custom partitioner (KeyPartitioner) and a custom comparator, FirstComparator (from TextPair).

Example. Application to join weather records with station names
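A sketch of what that driver might look like, assuming the TextPair key class (with its FirstComparator) from Chapter 4 and the mapper and reducer classes sketched above:

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JoinRecordWithStationName extends Configured implements Tool {

  // Partition on the station ID (first part of the key) only, so that
  // records with the same station ID reach the same reducer whatever
  // their tag.
  public static class KeyPartitioner extends Partitioner<TextPair, Text> {
    @Override
    public int getPartition(TextPair key, Text value, int numPartitions) {
      return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 3) {
      System.err.printf("Usage: %s <records input> <stations input> <output>\n",
          getClass().getSimpleName());
      return -1;
    }

    Job job = new Job(getConf(), "Join weather records with station names");
    job.setJarByClass(getClass());

    // Each input gets its own mapper via MultipleInputs.
    MultipleInputs.addInputPath(job, new Path(args[0]),
        TextInputFormat.class, JoinRecordMapper.class);
    MultipleInputs.addInputPath(job, new Path(args[1]),
        TextInputFormat.class, JoinStationMapper.class);
    FileOutputFormat.setOutputPath(job, new Path(args[2]));

    // Partition and group on the station ID only; the tag still takes part
    // in the sort, so station records precede weather records in each group.
    job.setPartitionerClass(KeyPartitioner.class);
    job.setGroupingComparatorClass(TextPair.FirstComparator.class);

    job.setMapOutputKeyClass(TextPair.class);
    job.setReducerClass(JoinReducer.class);
    job.setOutputKeyClass(Text.class);

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new JoinRecordWithStationName(), args));
  }
}

It could then be run with something like (hypothetical paths): hadoop jar join.jar JoinRecordWithStationName records/ stations/ output/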

