Sunday, April 24, 2011

Using Dumbo to connect Cascading and Python

This post describes how I use Dumbo to combine code written in Python with Cascading. I want to use Cascading to express complicated work flows that can then be scheduled as a series of map-reduce jobs. Dumbo 0.21 added support for expressing jobs as DAGs, which partially addresses one of my main reasons for using Cascading. Cascading, however, also supports operations like merging tuple streams, which is a big requirement for the types of work flows I create. Finally, I want to use Python because I think it's an ideal high-level language for rapidly prototyping and implementing the operations that I use in my work flows.

My starting point was Nate Murray's post, which showed how to integrate a Hadoop Streaming job into Cascading. The limits of this approach are:
  1. Data is passed in and out of the Python code as text.
  2. You have to parse the fields out of the lines; i.e., the data is no longer represented as tuples.

My solution has the following elements:
  1. We create a customized Cascading tap which will encode/decode tuples using Hadoop typed bytes.
  2. We "wrap" our Python code using Dumbo so that Dumbo handles converting the typed bytes to native Python types.
  3. We set up a Cascading streaming job which uses Dumbo to run our map-reduce job.

Creating A Cascading Tap For TypedBytes

Update 04-26-2011: There's a problem with my original code; the source part of the tap doesn't properly set the field names based on the names in the dictionary.

Below is my code for creating a custom tap to read/write typed bytes data.


The tuples are encoded as field name, value pairs. This is space inefficient because the field names are the same for each tuple. Alternatively, we could encode just the values and then use some mechanism external to the sequence file to store the mapping between fields and tuple position. I found this latter approach to be too error prone when working on the Python side. It's far easier to encode the field names so that in Python you get a dictionary when you decode the typed bytes.
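To make the encoding concrete, here is a small sketch (with hypothetical field names) of what a single record in the sequence file looks like; on the Python side, Dumbo decodes the value below into the dictionary {'user': 'alice', 'score': 3}.

//A tuple with fields ("user", "score") and values ("alice", 3) is written as
//one key/value pair; the key carries no information, only the value matters.
java.util.Map<String, Object> record = new java.util.HashMap<String, Object>();
record.put("user", "alice");
record.put("score", 3);

TypedBytesWritable key = new TypedBytesWritable();
TypedBytesWritable value = new TypedBytesWritable();
key.setValue(0);        //dummy key, just as in the sink code below
value.setValue(record); //decoded as a python dictionary by Dumbo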


The code is based on the TextLine scheme in Cascading, which I modified to suit my needs; I only really needed to change the source and sink functions.




import java.beans.ConstructorProperties;
import java.io.IOException;

import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.Tuples;
import cascading.scheme.Scheme;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.typedbytes.*;

/**
 * A TypedBytesMapScheme is a type of {@link Scheme} that uses
 * sequence files to encode the tuples.
 * The tuples are encoded as Java Map objects and then serialized using Hadoop typed bytes.
 * We use Map objects so that we know which field corresponds to each position. 
 * We waste space by encoding the field names with each tuple, but when we decode
 * the tuples in python we get a dictionary so we know what each field is.
 */
public class TypedBytesMapScheme extends Scheme
{
 /** Field serialVersionUID */
 private static final long serialVersionUID = 1L;

 /** Protected for use by TempDfs and other subclasses. Not for general consumption. */
 protected TypedBytesMapScheme()
 {
  super( null );
 }

 /**
  * Creates a new TypedBytesMapScheme instance that stores the given field names.
  *
  * @param fields
  */
 @ConstructorProperties({"fields"})
 public TypedBytesMapScheme( Fields fields )
 {
  super( fields, fields );
 }

 @Override
 public void sourceInit( Tap tap, JobConf conf )
 {
  conf.setInputFormat( SequenceFileInputFormat.class );
 }

 @Override
 public void sinkInit( Tap tap, JobConf conf )
 {
  conf.setOutputKeyClass( TypedBytesWritable.class ); // supports TapCollector
  conf.setOutputValueClass( TypedBytesWritable.class ); // supports TapCollector
  conf.setOutputFormat( SequenceFileOutputFormat.class );
 }

 @Override
 public Tuple source( Object key, Object value ) 
 {

  //cast the value to a TypedBytesWritable object
  TypedBytesWritable bytes = (TypedBytesWritable)value;

  //It should be a Map
  java.util.Map <String, Object>  items=(java.util.Map <String, Object >) bytes.getValue();
  
  //create a new tuple
  Tuple data=new Tuple();
  
  Fields sfields=this.getSourceFields();
  
  for (int i=0;i<sfields.size();i++){
   //get the field name at position i
   Comparable f=sfields.get(i);

   //the field name may be a Text or a String
   if (f instanceof org.apache.hadoop.io.Text){
    org.apache.hadoop.io.Text name = (org.apache.hadoop.io.Text)f;
    data.add(items.get(name.toString()));
   }
   else if (f instanceof String){
    String name = (String)f;
    data.add(items.get(name));
   }
   else{
    //The field name isn't a String or a Text; throwing an exception here
    //appears to cause problems, so fall back to looking the value up by position.
    System.out.println("TypedBytesMapScheme: field name wasn't a String or Text; falling back to a positional lookup");
    data.add(items.get(i));
   }
  }
  

  return data;
 }

 @Override
 public void sink( TupleEntry tupleEntry, OutputCollector outputCollector ) throws IOException
 {
  Tuple result = getSinkFields() != null ? tupleEntry.selectTuple( getSinkFields() ) : tupleEntry.getTuple();

  java.util.Map <String,Object> objs=new java.util.HashMap<String,Object>();  

  //loop over all the entries in the tuple
  
  Object item;
  
  //tupleentry.getFields() returns the fields actually in the tuple
  // getSinkFields() is the field selector in the tap
  // we apply this selector to the actual fields 
  Fields sfields=tupleEntry.getFields().select(getSinkFields());
  
  if (sfields == null){
   System.out.println("\t TypedBytesMapScheme: sink fields are null");
  }
  for (int i=0;i < sfields.size();i++){
   
   //get the name of this field
   String name = sfields.get(i).toString();
   
   item = result.get(i); 
   //Unwrap common Hadoop Writable types so that the typed bytes
   //serialization sees plain Java values.
   if (item instanceof org.apache.hadoop.io.Text){
    item=item.toString();
   }
   else if(item instanceof org.apache.hadoop.io.LongWritable){
    item=((org.apache.hadoop.io.LongWritable)item).get();
   }
   else if(item instanceof org.apache.hadoop.io.IntWritable){
    item=((org.apache.hadoop.io.IntWritable)item).get();
   }
   else if(item instanceof org.apache.hadoop.io.DoubleWritable){
    item=((org.apache.hadoop.io.DoubleWritable)item).get();
   }

   objs.put(name, item);
  }
  


  TypedBytesWritable value = new TypedBytesWritable();
  TypedBytesWritable key = new TypedBytesWritable();
  key.setValue(0);
  value.setValue(objs);
  outputCollector.collect(key,value );
 }
}
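For completeness, here is a rough sketch of how the scheme can be plugged into an ordinary Cascading flow, assuming the Cascading 1.x Hfs/FlowConnector API; the paths and field names below are hypothetical. The real flows in my cascades do more than copy tuples, but this shows where the tap fits.

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.operation.Identity;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.tap.Hfs;
import cascading.tap.Tap;
import cascading.tuple.Fields;

public class TypedBytesMapSchemeExample
{
 public static void main( String[] args )
 {
  //hypothetical field names; they become the keys of the python dictionaries
  Fields fields = new Fields( "user", "score" );

  //taps that read/write typed bytes sequence files via our scheme
  Tap source = new Hfs( new TypedBytesMapScheme( fields ), "somepath/raw" );
  Tap sink = new Hfs( new TypedBytesMapScheme( fields ), "somepath/in" );

  //a simple pass-through pipe; real flows would add their own operations
  Pipe pipe = new Each( new Pipe( "copy" ), new Identity() );

  Flow flow = new FlowConnector().connect( source, sink, pipe );
  flow.complete();
 }
}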

Setting up a Cascading Flow which uses Dumbo


Below is the Java code for setting up the MapReduce flow which uses Dumbo to call the Python mapper and reducer.


//Path to the python executable
String pyexe="/usr/local/python2.6/bin/python ";
 
//path to the python script to execute  
String pyscript="Utilities/dpf/cascading/run_each_pipe.py";

//Paths of the input/output files for our job
//These files will be written/read by other flows in our cascade.
//They should contain tuples encoded using our TypedBytesMapScheme.
String pymrin="somepath/in";
String pymrout="somepath/out";

//create a list of directories we need on our python path
//common.pyc is part of dumbo/backends; typedbytes should already be on our
//python path, so we probably don't need the next line.
String pypath="common.pyc:typedbytes-0.3.6-py2.6.egg";


//How much memory to allow the mapper/reducer, in bytes.
//I had to increase this beyond the default; otherwise my python
//scripts ran into memory errors loading C extensions.
String memlimit="1256000000";

//set the home directory and python egg cache
//I use the LinuxTaskController so that MR jobs
//run as the user who submitted them so we can 
//access files in our home directory
//We still need to set the home directory otherwise it defaults
//to the home directory for the map reduce user.
String homedir=System.getProperty("user.home");
//set the python egg cache to the users home directory
String pyeggcache=System.getProperty("user.home")+"/.python-eggs";

//We can create the job configuration using the same parameters
//we would pass to hadoop streaming on the command line.
JobConf streamConf = StreamJob.createJob( new String[]{
  "-input", pymrin, 
  "-output", pymrout,  

  // After the "map"/"red" argument, the first number is the iteration
  // (just 0 here) and the second is the memory limit; this is equivalent
  // to using the -memlimit option with dumbo
  "-mapper", pyexe+pyscript+ "  map 0 "+memlimit,
  //presumably leaving out -reducer would make the job map-only
  "-reducer", pyexe+pyscript +" red  0 "+memlimit,
  //Environment variables used by dumbo
  "-cmdenv","dumbo_mrbase_class=dumbo.backends.common.MapRedBase",
  "-cmdenv","dumbo_jk_class=dumbo.backends.common.JoinKey",
  "-cmdenv","dumbo_runinfo_class=dumbo.backends.streaming.StreamingRunInfo",
  "-cmdenv","PYTHONPATH="+pypath,
  "-cmdenv","PYTHON_EGG_CACHE="+pyeggcache,
  //set the home directory to the user submitting the job because
  //otherwise it appears to be set to the mapred user which causes problems
  "-cmdenv","HOME="+homedir,     
  "-inputformat","org.apache.hadoop.streaming.AutoInputFormat",
  //use a sequence file for the output format,
  "-outputformat","org.apache.hadoop.mapred.SequenceFileOutputFormat",
  "-jobconf","stream.map.input=typedbytes",
  "-jobconf","stream.reduce.input=typedbytes", 
  "-jobconf","stream.map.output=typedbytes", 
  "-jobconf","stream.reduce.output=typedbytes",
  "-jobconf","mapred.job.name="+pipe_func,
  //Increase the memory for the virtual machines so we don't run out of memory loading the dll
  //"-jobconf","mapred.child.java.opts=-Xmx1000m"    

});
boolean deleteSinkOnInit=true;
MapReduceFlow mrflow = new MapReduceFlow("streaming flow", streamConf, deleteSinkOnInit);
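The resulting MapReduceFlow can then be linked into a larger cascade together with the flows that produce pymrin and consume pymrout. Below is a minimal sketch, assuming upstreamFlow and downstreamFlow are hypothetical Flows built elsewhere whose taps on those paths use the TypedBytesMapScheme.

import cascading.cascade.Cascade;
import cascading.cascade.CascadeConnector;

//the connector orders the flows by their shared source/sink taps
Cascade cascade = new CascadeConnector().connect( upstreamFlow, mrflow, downstreamFlow );
cascade.complete();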

Python Job

On the Python side, you create mappers and reducers as you would for a regular dumbo job:

class Mapper(Base):
 """
 This is the base mapper which pushes data through a pipe 
 """
 
 def __call__(self,key,value):
  """Push the values through the operator

  Key - shouldn't contain any useful information
  value - Should be a dictionary representing the tuple 
  """

  #..your code..
  yield out_key,out
   
   
class Reducer(Base):
 """
 The reducer
 """
 
 def __call__(self,key,valgen):
  """Push the values through the operator
  
  Parameters
  ----------
  valgen - generator used to iterate over the values
  """

  #.. your code here  
  yield key,out

def run():
 """We add this run function so that other functions can actually invoke it."""

 import dumbo
 job=dumbo.Job()
 job.additer(Mapper,Reducer)
 job.run()

if __name__ == "__main__":
 run()
