Saturday 8 June 2013

Hadoop Streaming in Python: Nuances & Sample

Hadoop Streaming allows us to write mappers and reducers in languages like Python, Ruby and even C. In Python, the amount of code needed to write a mapper and reducer is less than in Java. Streaming also has some nuances to watch out for.

1) Streaming works like Unix command-line pipes, i.e. data is written and read as text. Apart from the first input and the last output, standard input and standard output are used: your mapper in Python prints to its standard output and the reducer reads that data from its standard input.
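
For illustration, a minimal sketch of a script that relies only on this contract (a plain identity mapper, not part of the sample project) would be:

#!/usr/bin/python
# Identity mapper sketch: read lines from standard input and write them
# unchanged to standard output; this is all that streaming requires.
import sys

for line in sys.stdin:
    sys.stdout.write(line)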

2) The mapper and reducer work on splits. A split is the chunk of data a single mapper is given; it need not be the same size as the HDFS block. From observation, this value can change as the job proceeds.

3) Streaming works on lines of input rather than (key, value) pairs, although the notion of (key, value) pairs can be used in streaming too. If you don't specify the (key, value) fields and the separator, defaults are used: tab as the separator, the whole input line as the key, and an empty value. What this means for us developers is that if you don't use/impose the notion of key-value pairs, you end up doing the sorting within a group in your reducer code; i.e. when the whole line is used as the key, your reducer may not necessarily receive the shuffled and sorted input it expects. When you write mappers and reducers in Java, your code actually receives key-value pairs. Here you won't, so watch out for the sorting.

To impose (key, value) pairs, use the following parameters when running your streaming program (an example invocation with them follows).
-D stream.map.output.field.separator=, means "," is used as the separator. The default is tab.
-D stream.num.map.output.key.fields=4 means everything in the output line up to the 4th separator is used as the key and the rest as the value.
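
For example, these generic options go right after the streaming jar on the command line (the jar and HDFS paths here are placeholders, not the ones from the sample below):

hadoop jar hadoop-streaming-1.0.3.jar \
-D stream.map.output.field.separator=, \
-D stream.num.map.output.key.fields=4 \
-input /path/to/input \
-output /path/to/output \
-file ./mapper.py -mapper 'mapper.py' \
-file ./reducer.py -reducer 'reducer.py'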

4) Re-order the fields in your map output, together with the key parameters above, to ensure the reducer's input is grouped the way you want it to be.

5) Within your Python code, add #!/usr/bin/python at the very beginning to tell Hadoop which interpreter to use for the mapper code and reducer code.

6) Check the mapper and reducer on the command line with sample data. If they work there, they should work on Hadoop. For example, this checks a mapper and reducer written in Python:
cat ./datagen/small-earthquake.data | ./stream/mapper.py | sort | ./stream/reducer.py


Sample scenario: A 2 GB file of earthquake information is available. Here, we write a mapper to filter the input and a reducer to count the data it receives. The file includes the following columns in order: year, city name, quake magnitude, latitude, longitude. The mapper filters the input to the years between 1960 and 2000. The reducer counts the number of quakes for each city in that data.


The mapper and reducer code in Python are shown below.

Notice that the mapper rearranges the order of columns to <city, year>, and tab is used as the separator since it is the default. Also, since city is the first column in the map output, the output will be grouped on the basis of the first column, i.e. up to the first tab delimiter. Refer to the nuances above.
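
A minimal sketch of such a mapper and reducer (assuming the input file is comma-separated in the column order described above; the actual code is in the linked project below):

#!/usr/bin/python
# mapper.py sketch: filter quakes to years 1960-2000 and emit "city<tab>year"
import sys

for line in sys.stdin:
    fields = line.strip().split(',')  # input assumed to be comma-separated
    if len(fields) < 5:
        continue  # skip malformed lines
    year, city = fields[0], fields[1]
    try:
        if 1960 <= int(year) <= 2000:
            # city comes first so the default tab-separated key groups by city
            print("%s\t%s" % (city, year))
    except ValueError:
        continue  # skip lines with a non-numeric year

#!/usr/bin/python
# reducer.py sketch: count quakes per city from the sorted, tab-separated map output
import sys

current_city = None
count = 0

for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    city = line.split('\t', 1)[0]  # the year value is not needed for counting
    if city == current_city:
        count += 1
    else:
        if current_city is not None:
            print("%s\t%d" % (current_city, count))
        current_city = city
        count = 1

if current_city is not None:
    print("%s\t%d" % (current_city, count))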

Run the streaming command on Hadoop as follows:


hadoop jar ~/Development/hadoop-1.0.3/contrib/streaming/hadoop-streaming-1.0.3.jar \
-input /hadoop/streampython/large-earthquake.data \
-output  /hadoop/streampython/filtered \
-file ./stream/mapper.py \
-mapper 'mapper.py' \
-file ./stream/reducer.py  \
-reducer 'reducer.py'


Sample code: The Git/Eclipse project for this is available at the link below. It also has a data-gen script that can generate 2 GB plus of test earthquake data.

https://drive.google.com/folderview?id=0B-oeIeog2xb7RWlpRDBROEdFT2c&usp=sharing
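
If you just want some throwaway test data without pulling the project, a rough sketch of such a generator (the city list, value ranges, record count and output file name are made up here) could be:

#!/usr/bin/python
# datagen sketch: write comma-separated lines in the order
# year, city name, quake magnitude, latitude, longitude
import random

cities = ['Tokyo', 'Lima', 'Anchorage', 'Kathmandu', 'Santiago']

with open('small-earthquake.data', 'w') as out:
    for _ in range(100000):
        out.write('%d,%s,%.1f,%.4f,%.4f\n' % (
            random.randint(1900, 2013),      # year
            random.choice(cities),           # city name
            random.uniform(2.0, 9.5),        # magnitude
            random.uniform(-90.0, 90.0),     # latitude
            random.uniform(-180.0, 180.0)))  # longitude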

Screens:
a) Input files:

b) The SPLIT_RAW_BYTES for the map task.

c) Job Status with two reducers

Some Errors to watch out for:
1) You see the following on the JobTracker -> TaskTracker page for your maps. Your maps seem to have been killed more times than allowed.


java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2
 at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:362)
 at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:576)
 at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:135)
 at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57)
 at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
 at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:436)
 at org.apache.hadoop.mapred.MapTask.run(MapTask.java:372)
 at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:416)
 at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
 at org.apache.hadoop.mapred.Child.main(Child.java:249)


For me, this was because I had not added #!/usr/bin/python to my Python files. Once it was added, there were no issues. There was no need to change permissions on any files.

2) Basic Hadoop Nuances:

a) Add the following property to your core-site.xml. This tells Hadoop which location to use for its tmp folder, a location that does not get wiped out across re-boots.

<property>
  <name>hadoop.tmp.dir</name>
  <value>/home/harisankar/Hadoop/tmp</value>
</property>

b) Add the following two properties to your hdfs-site.xml. These tell Hadoop where to store its namenode details and the data chunks. You can see the data chunks for your files there.

<property>
  <name>dfs.name.dir</name>
  <value>/home/harisankar/Hadoop/name</value>
</property>

<property>
  <name>dfs.data.dir</name>
  <value>/home/harisankar/Hadoop/data</value>
</property>

Otherwise your datanode and namenode will play up after a re-boot. You may see exceptions like "could only be replicated to 0 nodes" and "datanode's connection to namenode timed out after n tries" in the log files.
