Sunday, April 24, 2011

Using Dumbo to connect Cascading and Python

This post describes how I use Dumbo to combine code written in Python with Cascading. I want to use Cascading to express complicated work flows which can then be scheduled as a series of map-reduce jobs. Dumbo 0.21 added support for expressing jobs as DAGs, which partially addresses one of my main reasons for using Cascading. Cascading, however, also supports operations like merging tuple streams, which is a big requirement for the types of work flows I create. Finally, I want to use Python because I think it's an ideal high-level language for rapidly prototyping and implementing the operations that I use in my work flows.

My starting point was Nate Murray's post, which showed how to integrate a Hadoop Streaming job into Cascading. The limits of this approach are
  1. Data is passed in and out of the python code as text.
  2. You have to parse the fields out of the lines; i.e., the data is no longer represented as tuples.

My solution has the following elements:
  1. We create a customized Cascading tap which will encode/decode tuples using Hadoop typed bytes.
  2. We "wrap" our Python code using Dumbo so that we can let Dumbo handle converting the typed bytes to native Python types.
  3. We set up a Cascading streaming job which uses Dumbo to run our map-reduce job.

Creating A Cascading Tap For TypedBytes

Update 04-26-2011: There's a problem with my original code. The source part of the tap doesn't properly set the field names based on the names in the dictionary.

Below is my code for creating a custom tap to read/write typed bytes data.


The tuples are encoded as field name, value pairs. This is space inefficient because the field names are the same for every tuple. Alternatively, we could encode just the values and then use some mechanism external to the sequence file to store the mapping between fields and tuple position. I found this latter approach to be too error prone when working on the Python side. It's far easier to encode the field names so that in Python you get a dictionary when you decode the typed bytes.
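
To make this concrete, here is a rough sketch of what a single encoded record looks like; the field names and values ("name", "count", "alice", 3) are just made-up examples:

//A tuple with fields ("name", "count") and values ("alice", 3) is stored as a
//single TypedBytesWritable whose value is a Map from field name to value.
java.util.Map<String, Object> record = new java.util.HashMap<String, Object>();
record.put("name", "alice");
record.put("count", 3);
//When Dumbo decodes the typed bytes on the Python side, the mapper/reducer
//sees the equivalent dictionary: {'name': 'alice', 'count': 3}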


The code is based on the TextLine scheme in Cascading, which I modified to suit my needs. I only really needed to modify the source and sink functions.




import java.beans.ConstructorProperties;
import java.io.IOException;

import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.Tuples;
import cascading.scheme.Scheme;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.typedbytes.*;

/**
 * A TypedBytesMapScheme is a type of {@link Scheme}, which uses
 * sequence files to encode the tuples. 
 * The tuples are encoded as Java Map objects and then serialized using Hadoop typed bytes.
 * We use Map objects so that we know which field corresponds to each position. 
 * We waste space by encoding the field names with each tuple, but when we decode
 * the tuples in python we get a dictionary so we know what each field is.
 */
public class TypedBytesMapScheme extends Scheme
{
 /** Field serialVersionUID */
 private static final long serialVersionUID = 1L;

 /** Protected for use by TempDfs and other subclasses. Not for general consumption. */
 protected TypedBytesMapScheme()
 {
  super( null );
 }

 /**
  * Creates a new TypedBytesMapScheme instance that stores the given field names.
  *
  * @param fields
  */
 @ConstructorProperties({"fields"})
 public TypedBytesMapScheme( Fields fields )
 {
  super( fields, fields );
 }

 @Override
 public void sourceInit( Tap tap, JobConf conf )
 {
  conf.setInputFormat( SequenceFileInputFormat.class );
 }

 @Override
 public void sinkInit( Tap tap, JobConf conf )
 {
  conf.setOutputKeyClass( TypedBytesWritable.class ); // supports TapCollector
  conf.setOutputValueClass( TypedBytesWritable.class ); // supports TapCollector
  conf.setOutputFormat( SequenceFileOutputFormat.class );
 }

 @Override
 public Tuple source( Object key, Object value ) 
 {

  //cast value to a typedbytes writable objects
  TypedBytesWritable bytes = (TypedBytesWritable)value;

  //It should be a Map
  java.util.Map <String, Object>  items=(java.util.Map <String, Object >) bytes.getValue();
  
  //create a new tuple
  Tuple data=new Tuple();
  
  Fields sfields=this.getSourceFields();
  
  for (int i=0;i<sfields.size();i++){
   //get the field at position i
   Comparable f=sfields.get(i);
   
   //The field name may be a String or an io.Text
   if (f instanceof org.apache.hadoop.io.Text){
    org.apache.hadoop.io.Text name = (org.apache.hadoop.io.Text)f;
    data.add(items.get(name.toString()));
   }
   else if (f instanceof String){
    String name = (String)f;
    data.add(items.get(name));
   }
   else{
    //Not sure what to do here; throwing an exception appears to cause a problem,
    //so just log it and fall back to looking up the value by its position.
    System.out.println("Error TypedBytesMapScheme: field name wasn't a String or Text; falling back to the positional index");
    data.add(items.get(i));
    //throw new Exception("Field isn't named by a string. We can't handle this currently");
   }
  }
  

  return data;
 }

 @Override
 public void sink( TupleEntry tupleEntry, OutputCollector outputCollector ) throws IOException
 {
  Tuple result = getSinkFields() != null ? tupleEntry.selectTuple( getSinkFields() ) : tupleEntry.getTuple();

  java.util.Map <String,Object> objs=new java.util.HashMap<String,Object>();  

  //loop over all the entries in the tuple
  
  Object item;
  
  //tupleentry.getFields() returns the fields actually in the tuple
  // getSinkFields() is the field selector in the tap
  // we apply this selector to the actual fields 
  Fields sfields=tupleEntry.getFields().select(getSinkFields());
  
   if (sfields == null){
    System.out.println("\t TypedBytesMapScheme: sinkfields is Null");
   }
   //(if sfields is ALL we just want all the fields in the tuple entry,
   // which is what the select() above gives us, so nothing special to do)
   //Debug output: System.out.println("\t TypedBytesMapScheme: sfields.size="+sfields.size());
  for (int i=0;i < sfields.size();i++){
   
   //get the name of this field
   String name = sfields.get(i).toString();
   
   item = result.get(i); 
    //If the item is a Hadoop Writable wrapper (Text, LongWritable, etc.),
    //unwrap it to a plain Java type so that TypedBytesWritable can serialize it.
    if (item instanceof org.apache.hadoop.io.Text){
     item=item.toString();
    }
    else if(item instanceof org.apache.hadoop.io.LongWritable){
     item=((org.apache.hadoop.io.LongWritable)item).get();
    }
    else if(item instanceof org.apache.hadoop.io.IntWritable){
     item=((org.apache.hadoop.io.IntWritable)item).get();
    }
    else if(item instanceof org.apache.hadoop.io.DoubleWritable){
     item=((org.apache.hadoop.io.DoubleWritable)item).get();
    }

   
   objs.put(name, item);
  }
  


  TypedBytesWritable value = new TypedBytesWritable();
  TypedBytesWritable key = new TypedBytesWritable();
  key.setValue(0);
  value.setValue(objs);
  outputCollector.collect(key,value );
 }
}
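
To use the scheme you wrap it in a tap, the same way you would with TextLine. Below is a minimal sketch; the paths and field names are placeholders, and it assumes the Cascading 1.x Hfs constructors:

//imports: cascading.tap.Hfs, cascading.tap.Tap, cascading.tuple.Fields
Fields fields = new Fields("name", "count");
Tap source = new Hfs(new TypedBytesMapScheme(fields), "somepath/in");
Tap sink = new Hfs(new TypedBytesMapScheme(fields), "somepath/out", true); //true = delete the sink on init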

Setting up a Cascading Flow which uses Dumbo


Below is the Java code for setting up the MapReduce flow which uses Dumbo to run the Python mapper and reducer.


//Path to the python executable
String pyexe="/usr/local/python2.6/bin/python ";
 
//path to the python script to execute  
String pyscript="Utilities/dpf/cascading/run_each_pipe.py";

//Paths of the input/output files for our job
//These files will be written/read by other flows in our cascade
//The should contain tuples encoded using our TypedBytesMapScheme.
String pymrin="somepath/in";
String pymrout="somepath/out";

//create a list of directories we need on our python path
//common.pyc is part of the dumbo/backends and typedbytes should be in our
//python path so we probably don't need the next line.
String pypath="common.pyc:typedbytes-0.3.6-py2.6.egg";


//How much memory to allow the Mapper/Reducer
//in bytes. I had to increase this beyond the default
//otherwise my python scripts ran into memory errors loading c-extensions
String memlimit="1256000000";

//set the home directory and python egg cache
//I use the LinuxTaskController so that MR jobs
//run as the user who submitted them so we can 
//access files in our home directory
//We still need to set the home directory otherwise it defaults
//to the home directory for the map reduce user.
String homedir=System.getProperty("user.home");
//set the python egg cache to the users home directory
String pyeggcache=System.getProperty("user.home")+"/.python-eggs";

//We can create the job configuration using the same parameters
//we would pass to hadoop streaming on the command line.
//Note that we specify both a mapper and a reducer here, so this is not a map-only job.
JobConf streamConf = StreamJob.createJob( new String[]{
  "-input", pymrin, 
  "-output", pymrout,  

  // The first argument after map/red is the iteration number, which should just be 0.
  // The second number determines how much memory the process can use;
  // it's equivalent to using the -memlimit option with dumbo.
  "-mapper", pyexe+pyscript+ "  map 0 "+memlimit,
  //don't specify the reducer to make it map only?
  "-reducer", "/usr/local/python2.6/bin/python "+pyscript +" red  0 "+memlimit,
  //Environment variables used by dumbo
  "-cmdenv","dumbo_mrbase_class=dumbo.backends.common.MapRedBase",
  "-cmdenv","dumbo_jk_class=dumbo.backends.common.JoinKey",
  "-cmdenv","dumbo_runinfo_class=dumbo.backends.streaming.StreamingRunInfo",
  "-cmdenv","PYTHONPATH="+pypath,
  "-cmdenv","PYTHON_EGG_CACHE="+pyeggcache,
  //set the home directory to the user submitting the job because
  //otherwise it appears to be set to the mapred user which causes problems
  "-cmdenv","HOME="+homedir,     
  "-inputformat","org.apache.hadoop.streaming.AutoInputFormat",
  //use a sequence file for the output format,
  "-outputformat","org.apache.hadoop.mapred.SequenceFileOutputFormat",
  "-jobconf","stream.map.input=typedbytes",
  "-jobconf","stream.reduce.input=typedbytes", 
  "-jobconf","stream.map.output=typedbytes", 
  "-jobconf","stream.reduce.output=typedbytes",
  "-jobconf","mapred.job.name="+pipe_func,
  //Increase the memory for the virtual machines so we don't run out of memory loading the dll
  //"-jobconf","mapred.child.java.opts=-Xmx1000m"    

});
boolean deleteSinkOnInit=true;
MapReduceFlow mrflow = new MapReduceFlow("streaming flow", streamConf, deleteSinkOnInit);
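
The resulting flow can then be joined with the other flows in the work flow. A rough sketch follows; upstreamFlow and downstreamFlow are placeholders for whatever flows write somepath/in and read somepath/out via the TypedBytesMapScheme taps:

//imports: cascading.cascade.Cascade, cascading.cascade.CascadeConnector
Cascade cascade = new CascadeConnector().connect(upstreamFlow, mrflow, downstreamFlow);
cascade.complete();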

Python Job

On the Python side, you create mappers and reducers as you would for a regular Dumbo job:

class Mapper(Base):
    """
    This is the base mapper which pushes data through a pipe.
    """

    def __call__(self, key, value):
        """Push the values through the operator.

        key - shouldn't contain any useful information
        value - should be a dictionary representing the tuple
        """

        #..your code..
        yield out_key, out


class Reducer(Base):
    """
    The reducer.
    """

    def __call__(self, key, valgen):
        """Push the values through the operator.

        Parameters
        ----------
        valgen - generator to iterate over the values
        """

        #.. your code here
        yield key, out


def run():
    """We add this run function so that other functions can actually invoke it."""

    import dumbo
    job = dumbo.Job()
    job.additer(Mapper, Reducer)
    job.run()


if __name__ == "__main__":
    run()
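
When the streaming flow runs, Hadoop invokes this script with the arguments set up in the Java code above, i.e. roughly "python run_each_pipe.py map 0 <memlimit>" for the map tasks and "python run_each_pipe.py red 0 <memlimit>" for the reduce tasks. As noted in the comments on the Java side, the number after map/red is the iteration (0 here) and the last argument is the memory limit.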

Thursday, April 14, 2011

Problems setting up LinuxTaskController with Hadoop (cloudera release 3)

This post describes some of the problems I ran into trying to set up the LinuxTaskController using Cloudera (CDH3u0). I wanted to set up the LinuxTaskController so that MapReduce jobs would run as the user who submitted them.

I started by following the Instructions for setting up security in CDH3 (note: I skipped all the steps except installing the secure packages and setting up secure MapReduce).

Most of the problems I had were because of spacing issues in the taskcontroller.cfg file (a sketch of a cleaned-up file follows the list below).
  1. I needed to add at least one newline after the final line of the taskcontroller.cfg (which for me sets the value of mapred.tasktracker.group). (This was a known bug in CDH3B4, but since it's no longer described in the latest docs, I assume it's supposed to be fixed. It's possible it was fixed and there was a problem with my upgrade from CDH3B4 to CDH3u0.)
  2. Extra spaces at the end of lines.
    • I had extra spaces at the end of the line "mapred.local.dir=/somedir"
    • This caused the task controller to fail to start any task attempts
    • I discovered this by looking at my task controller log file where I saw an exception:
      • Failed to create directory "/somedir /tasktracker/jlewilocal"  (notice the space)
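
For reference, a cleaned-up taskcontroller.cfg ends up looking roughly like the sketch below; the path and group values are just examples, and I'm only showing the entries mentioned above (the CDH3 docs list the full set of required entries). The important points are no trailing spaces after the values and at least one newline after the final line:

mapred.local.dir=/somedir
mapred.tasktracker.group=mapred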
 

Saturday, April 9, 2011

Debugging Hadoop Streaming using Eclipse

This post describes how I set up Eclipse so I could debug what was happening when I tried to run a Hadoop Streaming job with the "-conf" option.

It is based on:
  1. Cloudera's screencast on setting up eclipse 
  2. Hadoop Wiki on Setting Up Eclipse

I'm not a Java or Eclipse expert, so if you see a way to do something better please let me know.

I'm using the Helios release of Eclipse and the Cloudera 3 Beta 4 (CDH3B4) release of Hadoop.

Setting Up Eclipse
  1. Download a tarball containing the Cloudera source from the Cloudera 3 archive 
  2. Unpack the tarball
  3. In Eclipse create a new project
  4. In the "Java Settings" dialog set the default output folder to something other than bin
    "build/eclipse-classes"
      We do this because the "bin" is used to contain hadoop shell scripts
  5. Close the project in eclipse
  6. Copy the contents of the unpacked cloudera tarball to the directory for your new eclipse project e.g
    mv some-dir/hadoop-0.20.2-CDH3B4/*  ~/workspace/your-project/
    
  7. Open the project and refresh it (right click the project and hit F5)
  8. At this point, in the Eclipse package explorer you should see src listed under your project
      Unfortunately, the src isn't set up properly given the package names
  9. Set up the src folders in Eclipse 
    1. Right click your project and select Properties
    2. Select Java Build Path -> Source tab
    3. Remove the entry project/src
    4. Click Add Folder and add the following entries
      1. src/core
      2. src/contrib/streaming/src/java
      3. src/mapred
      4. Set the output folder to project/eclipse-build
    5. Click on Libraries and add all the jars in
      • project/lib
  10. In the Project menu, uncheck "Build Automatically"
  11. Create a new Ant builder
    1. Right click your project and select Properties
    2. Select Builders
    3. Click New and select Ant Builder
    4. For the buildfile, select Browse Workspace and then select build.xml in your project
    5. Click on Targets and set the targets to run
      1. After a "Clean": compile-core-classes, compile-core
      2. For Manual Build: compile, compile-core-classes, compile-mapred-classes, compile-contrib
  12. You should now be able to build it by clicking Project -> Build Project
  13. To debug a file, e.g. "HadoopStreaming.java", just right click it and select Debug As -> Java Application
    • If you get a warning about errors in the project, try clicking Proceed
Final Notes
  • The wiki describes an ant task which automatically sets up Eclipse. Unfortunately the Eclipse templates don't appear to be included in the Cloudera CDH3B4 release, although you could try downloading them from the Apache repository.
  • On another system I had some trouble getting Eclipse to stop at breakpoints. This seems to be a Java+Eclipse issue.