Hadoop’s MapReduce on YARN: What the framework does before your Mapper/Reducer methods are called

0. Motivation

To write MapReduce programs that are more complex than the typical WordCount examples, it is often important to have a deeper understanding of what the framework itself does.

None of the publications and books I have seen explain these details, so I had to work them out myself. I used the Hadoop 2.3.0 source code for my analysis.

My goal was to get answers to the following questions:

  1. Where does the serialization take place?
  2. Where are the key/value pairs generated from the HDFS file data?
  3. Which class runs the loop over all key/value pairs?
  4. How is the Avro serialization detected/chosen, and why are the special Avro MapReduce classes necessary?

1. Method call stack

To get started with the analysis, I wrote an example MapReduce application and placed a dummy Exception inside the map method to obtain a stack trace.
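The snippet below is a minimal sketch of that idea (the Mapper name MyMapper matches the stack trace below, but the key/value types and the surrounding code are only illustrative): the exception is never thrown, its stack trace is simply printed to the task log.

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class MyMapper extends Mapper<Text, IntWritable, Text, IntWritable> {

      @Override
      protected void map(Text key, IntWritable value, Context context)
          throws IOException, InterruptedException {
        // Dummy exception: not thrown, only used to print the current call stack,
        // i.e. the chain of framework methods that lead to this map() invocation.
        new Exception("dummy exception to show the framework call stack").printStackTrace();

        // ... the actual map logic would follow here ...
        context.write(key, value);
      }
    }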

The resulting stack trace delivers the following information:

No | class name                                      | method name  | source code line
---|--------------------------------------------------|--------------|-----------------
 1 | org.apache.hadoop.mapred.YarnChild               | main         | 163
 2 | org.apache.hadoop.security.UserGroupInformation  | doAs         | 1548
 3 | javax.security.auth.Subject                      | doAs         | 415
 4 | java.security.AccessController                   | doPrivileged | -
 5 | org.apache.hadoop.mapred.YarnChild$2             | run          | 168
 6 | org.apache.hadoop.mapred.MapTask                 | run          | 340
 7 | org.apache.hadoop.mapred.MapTask                 | runNewMapper | 764
 8 | org.apache.hadoop.mapreduce.Mapper               | run          | 145
 9 | MyMapper                                         | map          | ...

2. Remarks to the different steps

2.1 – 2.5 Dispatching Job context

In steps 1 – 5 the job context is dispatched, along with some more general initialization steps that are not relevant to the focus of this blog post.

2.6 and 2.7 runNewMapper

Step 6 simply calls step 7, passing the umbilical that connects the job's application master with the worker nodes that execute the tasks (map and reduce).

First the mapper and input format instances are created using the information from the job/task context.
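Paraphrased from MapTask.runNewMapper in the Hadoop 2.3.0 sources, this looks roughly as follows:

    // Paraphrased from MapTask.runNewMapper (Hadoop 2.3.0): mapper and input
    // format are instantiated reflectively from the classes configured in the job.
    org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE> mapper =
        (org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE>)
            ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
    org.apache.hadoop.mapreduce.InputFormat<INKEY, INVALUE> inputFormat =
        (org.apache.hadoop.mapreduce.InputFormat<INKEY, INVALUE>)
            ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);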

Next the RecordReader is created using the now available input format instance. NewTrackingRecordReader is an inner class defined in MapTask. Its constructor creates an instance of a mapreduce.RecordReader by calling createRecordReader on the input format.
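The relevant part of that constructor looks roughly like this (paraphrased from the Hadoop 2.3.0 sources; counter and statistics handling is omitted):

    // Paraphrased from MapTask.NewTrackingRecordReader (Hadoop 2.3.0).
    NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split,
        org.apache.hadoop.mapreduce.InputFormat<K, V> inputFormat,
        TaskReporter reporter,
        org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
        throws InterruptedException, IOException {
      this.reporter = reporter;
      // The wrapped ("real") RecordReader is the one provided by the configured
      // InputFormat, e.g. a SequenceFileRecordReader for SequenceFileInputFormat.
      this.real = inputFormat.createRecordReader(split, taskContext);
    }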

This RecordReader is later used in the Mapper's run loop to get the key/value pairs. In my example the RecordReader was the one for SequenceFiles, org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader.
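Its nextKeyValue method looks roughly as follows (paraphrased from the Hadoop 2.3.0 sources, comments added); in refers to the underlying SequenceFile.Reader:

    // Paraphrased from org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader.
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      if (!more) {
        return false;
      }
      long pos = in.getPosition();
      // The SequenceFile.Reader deserializes the next key ...
      key = (K) in.next(key);
      if (key == null || (pos >= end && in.syncSeen())) {
        // End of the split reached.
        more = false;
        key = null;
        value = null;
      } else {
        // ... and, if there is one, the corresponding value.
        value = (V) in.getCurrentValue(value);
      }
      return more;
    }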

As we can see, the method nextKeyValue internally calls the corresponding methods of SequenceFile.Reader which is associated with the SequenceFileInputFormat I used in my Job definition.

The keyDeserializer is created in the init method of SequenceFile.Reader using the SerializationFactory. How this works will be explained soon.

The serializer and deserializer are retrieved from the SerializationFactory, which makes use of the Serializations registered in the JobContext / Configuration. First the constructor fills a list with all registered Serializations. The Serialization used to serialize and deserialize instances of a given class is then determined by the method getSerialization, which loops over all Serializations and checks whether their accept method indicates that the Serialization can be used for that class.
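The getSerialization method boils down to the following loop (paraphrased from org.apache.hadoop.io.serializer.SerializationFactory in Hadoop 2.3.0):

    // Paraphrased; "serializations" was filled in the constructor from the
    // io.serializations entries of the Configuration
    // (e.g. WritableSerialization, AvroSerialization, ...).
    public <T> Serialization<T> getSerialization(Class<T> c) {
      for (Serialization serialization : serializations) {
        if (serialization.accept(c)) {
          // The first Serialization that accepts the class wins.
          return (Serialization<T>) serialization;
        }
      }
      return null;
    }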

2.8 Mapper delegation to custom map method

The custom Mapper extends the framework Mapper. The framework's run method calls the map method, whose default implementation is an identity mapping that is typically overridden by the custom Mapper implementation.
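The relevant methods of the framework Mapper look roughly like this (paraphrased from org.apache.hadoop.mapreduce.Mapper; error handling details omitted):

    // The run method: loops over all key/value pairs delivered by the context
    // (i.e. by the RecordReader behind it) and calls map() for each pair.
    public void run(Context context) throws IOException, InterruptedException {
      setup(context);
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
      cleanup(context);
    }

    // The default map implementation: an identity mapping that simply writes
    // the incoming pair to the output. Custom Mappers override this method.
    protected void map(KEYIN key, VALUEIN value, Context context)
        throws IOException, InterruptedException {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }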

2.9 Custom Mapper implementation

In this way the run method delegates each key/value pair to the custom map implementation.

3. Answers to the questions

3.1 Where does the serialization take place?

The configured InputFormat creates a RecordReader/Reader, which determines the Serialization that can be used for the configured key/value classes. In my example I used a SequenceFile as input, where the key and value classes are read from the SequenceFile header block (before the real data are loaded).

Seen from the RecordReader's perspective, deserialized key and value instances are loaded from the HDFS file (split).

3.2 Where are the key/value pairs generated from the HDFS file data?

The inner class Reader of SequenceFile calls the methods of the Serialization implementation to deserialize the data and delivers the resulting object instances to its callers.

3.3 Which class runs the loop over all key/value pairs?

The run method of the class org.apache.hadoop.mapreduce.Mapper loops as long as context.nextKeyValue() returns true. With each retrieved key and value the Mapper calls the custom implementation of the map method.

3.4 How is the Avro serialization detected/chosen and why are the special Avro MapReduce classes necessary?

The class SerializationFactory checks which Serialization accepts the requested class. If the Avro Serialization accepts it (i.e. the class is an Avro class), that Serialization is used for serializing and deserializing the data.

The second part of this question is not answered by my analysis (yet)… to be continued.

 

Avro reuse of custom defined types

It took me some time to find out that Avro allows us to reuse custom types, so that we can build complex types that are composed of other complex types we defined ourselves.

Look at the following example schema definition:
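(The record name CompoundType and its fields below are invented for illustration; the essential point is that the second field refers to org.woopi.avro.CompoundSubTypeExtended by name instead of repeating the type definition.)

    {
      "namespace": "org.woopi.avro",
      "type": "record",
      "name": "CompoundType",
      "fields": [
        { "name": "id",  "type": "long" },
        { "name": "sub", "type": "org.woopi.avro.CompoundSubTypeExtended" }
      ]
    }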

This kind of schema definition is allowed if there is another schema definition file which defines the type org.woopi.avro.CompoundSubTypeExtended.

I used the following example for my tests:
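(Again, the fields are invented for illustration; the important part is that this second file actually defines the record org.woopi.avro.CompoundSubTypeExtended that the first schema refers to.)

    {
      "namespace": "org.woopi.avro",
      "type": "record",
      "name": "CompoundSubTypeExtended",
      "fields": [
        { "name": "name",  "type": "string" },
        { "name": "value", "type": "int" }
      ]
    }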

If you compile the files with the Avro schema compiler that is included in the avro-tools jar file avro-tools-1.7.6.jar (which can be downloaded from an Avro Java jar mirror), it is important that the compiler gets all schema files at once, so that it can resolve types that are defined in one file and used in another.

It is possible to use the java command with multiple individual schema files and directories as input:
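(A sketch using the hypothetical file names from the schemas above, listing the file that defines the referenced type first; the last argument is the output directory for the generated Java classes.)

    java -jar avro-tools-1.7.6.jar compile schema \
        CompoundSubTypeExtended.avsc CompoundType.avsc src/main/java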

Or for ANT fans:
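(One possible way, using a plain <java> task to call the same avro-tools compiler with the hypothetical file names from above; a dedicated Avro Ant task could be used instead.)

    <java jar="avro-tools-1.7.6.jar" fork="true" failonerror="true">
      <arg value="compile"/>
      <arg value="schema"/>
      <arg value="CompoundSubTypeExtended.avsc"/>
      <arg value="CompoundType.avsc"/>
      <arg value="src/main/java"/>
    </java>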