My starting point was Nate Murray's post, which showed how to integrate a Hadoop Streaming job into Cascading. The limits of this approach are:
- Data is passed in and out of the python code as text.
- You have to parse the fields out of the lines; i.e., the data is no longer represented as tuples.
My solution has the following elements:
- We create a custom Cascading tap which encodes/decodes tuples using Hadoop typed bytes.
- We "wrap" our python code using Dumbo so that Dumbo handles converting the typed bytes to native python types.
- We set up a Cascading streaming job which uses Dumbo to run our map-reduce job.
Creating A Cascading Tap For TypedBytes
Update (04-26-2011): There's a problem with my original code. The source part of the tap doesn't properly set the field names based on the names in the dictionary.

import java.beans.ConstructorProperties;
import java.io.IOException;

import cascading.scheme.Scheme;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.typedbytes.TypedBytesWritable;

/**
 * A TypedBytesMapScheme is a type of {@link Scheme} which uses sequence files to encode the tuples.
 * Each tuple is encoded as a java Map and then serialized using hadoop typed bytes.
 * We use Map objects so that we know which field corresponds to each position.
 * We waste space by encoding the field names with each tuple, but when we decode
 * the tuples in python we get a dictionary, so we know what each field is.
 */
public class TypedBytesMapScheme extends Scheme {

  /** Field serialVersionUID */
  private static final long serialVersionUID = 1L;

  /** Protected for use by TempDfs and other subclasses. Not for general consumption. */
  protected TypedBytesMapScheme() {
    super( null );
  }

  /**
   * Creates a new scheme that sources/sinks the given field names.
   *
   * @param fields the fields to source/sink
   */
  @ConstructorProperties({"fields"})
  public TypedBytesMapScheme( Fields fields ) {
    super( fields, fields );
  }

  @Override
  public void sourceInit( Tap tap, JobConf conf ) {
    conf.setInputFormat( SequenceFileInputFormat.class );
  }

  @Override
  public void sinkInit( Tap tap, JobConf conf ) {
    conf.setOutputKeyClass( TypedBytesWritable.class );   // supports TapCollector
    conf.setOutputValueClass( TypedBytesWritable.class ); // supports TapCollector
    conf.setOutputFormat( SequenceFileOutputFormat.class );
  }

  @Override
  public Tuple source( Object key, Object value ) {
    // the value is a TypedBytesWritable holding a Map of field name -> field value
    TypedBytesWritable bytes = (TypedBytesWritable) value;
    java.util.Map<String, Object> items = (java.util.Map<String, Object>) bytes.getValue();

    // build the tuple in the order given by the source fields
    Tuple data = new Tuple();
    Fields sfields = getSourceFields();

    for( int i = 0; i < sfields.size(); i++ ) {
      // the field name at position i may be a String or an io.Text
      Comparable f = sfields.get( i );

      if( f instanceof org.apache.hadoop.io.Text ) {
        data.add( items.get( ( (org.apache.hadoop.io.Text) f ).toString() ) );
      } else if( f instanceof String ) {
        data.add( items.get( (String) f ) );
      } else {
        // the field name wasn't a String or Text, so we don't know which map entry to fetch;
        // throwing an exception appears to cause a problem, so just log and fall back to the
        // positional key
        System.out.println( "TypedBytesMapScheme: field name wasn't a String or Text; falling back to position " + i );
        data.add( items.get( i ) );
      }
    }

    return data;
  }

  @Override
  public void sink( TupleEntry tupleEntry, OutputCollector outputCollector ) throws IOException {
    Tuple result = getSinkFields() != null ? tupleEntry.selectTuple( getSinkFields() ) : tupleEntry.getTuple();

    // tupleEntry.getFields() returns the fields actually in the tuple, while
    // getSinkFields() is the field selector on the tap; apply the selector to the
    // actual fields to get the field names in selection order
    Fields sfields = tupleEntry.getFields().select( getSinkFields() );

    java.util.Map<String, Object> objs = new java.util.HashMap<String, Object>();

    for( int i = 0; i < sfields.size(); i++ ) {
      String name = sfields.get( i ).toString();
      Object item = result.get( i );

      // unwrap common Writable types so typed bytes serializes the native value
      if( item instanceof org.apache.hadoop.io.Text ) {
        item = item.toString();
      } else if( item instanceof org.apache.hadoop.io.LongWritable ) {
        item = ( (org.apache.hadoop.io.LongWritable) item ).get();
      } else if( item instanceof org.apache.hadoop.io.IntWritable ) {
        item = ( (org.apache.hadoop.io.IntWritable) item ).get();
      } else if( item instanceof org.apache.hadoop.io.DoubleWritable ) {
        item = ( (org.apache.hadoop.io.DoubleWritable) item ).get();
      }

      objs.put( name, item );
    }

    TypedBytesWritable key = new TypedBytesWritable();
    TypedBytesWritable value = new TypedBytesWritable();
    key.setValue( 0 );
    value.setValue( objs );
    outputCollector.collect( key, value );
  }
}
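To use the scheme, you wrap it in an ordinary Hfs tap. Here is a minimal sketch; the field names below are placeholders, while "somepath/in" and "somepath/out" are the paths the streaming job reads and writes in the next section:

import cascading.tap.Hfs;
import cascading.tap.Tap;
import cascading.tuple.Fields;

// "user" and "score" are hypothetical field names; substitute the fields your flows produce
Fields fields = new Fields( "user", "score" );

// tuples written through this scheme arrive on the python side as dictionaries
Tap pyIn = new Hfs( new TypedBytesMapScheme( fields ), "somepath/in" );

// the trailing true deletes any existing output before the flow runs
Tap pyOut = new Hfs( new TypedBytesMapScheme( fields ), "somepath/out", true );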
Setting up a Cascading Flow which uses Dumbo
Below is the Java code for setting up the MapReduce flow which uses Dumbo to call the python mapper and reducer.
// path to the python executable
String pyexe = "/usr/local/python2.6/bin/python ";
// path to the python script to execute
String pyscript = "Utilities/dpf/cascading/run_each_pipe.py";

// input/output paths for our job; these files are written/read by other flows
// in our cascade and should contain tuples encoded using our TypedBytesMapScheme
String pymrin = "somepath/in";
String pymrout = "somepath/out";

// directories we need on our python path.
// common.pyc is part of the dumbo backends and typedbytes should already be on
// our python path, so we probably don't need this
String pypath = "common.pyc:typedbytes-0.3.6-py2.6.egg";

// how much memory (in bytes) to allow the mapper/reducer; I had to increase this
// beyond the default, otherwise my python scripts ran into memory errors loading c-extensions
String memlimit = "1256000000";

// set the home directory and python egg cache.
// I use the LinuxTaskController so that MR jobs run as the user who submitted them,
// which lets us access files in that user's home directory. We still need to set HOME
// explicitly, otherwise it defaults to the home directory of the mapred user.
String homedir = System.getProperty( "user.home" );
// set the python egg cache to the user's home directory
String pyeggcache = System.getProperty( "user.home" ) + "/.python-eggs";

// create the job configuration using the same parameters we would pass on the command line
JobConf streamConf = StreamJob.createJob( new String[]{
  "-input", pymrin,
  "-output", pymrout,
  // the first argument ("0") is the iteration number; the second is how much memory
  // the process can use, equivalent to dumbo's -memlimit option
  "-mapper", pyexe + pyscript + " map 0 " + memlimit,
  // to make the job map-only we could omit the reducer
  "-reducer", pyexe + pyscript + " red 0 " + memlimit,
  // environment variables used by dumbo
  "-cmdenv", "dumbo_mrbase_class=dumbo.backends.common.MapRedBase",
  "-cmdenv", "dumbo_jk_class=dumbo.backends.common.JoinKey",
  "-cmdenv", "dumbo_runinfo_class=dumbo.backends.streaming.StreamingRunInfo",
  "-cmdenv", "PYTHONPATH=" + pypath,
  "-cmdenv", "PYTHON_EGG_CACHE=" + pyeggcache,
  // set the home directory to the user submitting the job, because otherwise it
  // appears to be set to the mapred user, which causes problems
  "-cmdenv", "HOME=" + homedir,
  "-inputformat", "org.apache.hadoop.streaming.AutoInputFormat",
  // use a sequence file for the output format
  "-outputformat", "org.apache.hadoop.mapred.SequenceFileOutputFormat",
  "-jobconf", "stream.map.input=typedbytes",
  "-jobconf", "stream.reduce.input=typedbytes",
  "-jobconf", "stream.map.output=typedbytes",
  "-jobconf", "stream.reduce.output=typedbytes",
  // pipe_func (defined elsewhere in my code) holds the name used for the job
  "-jobconf", "mapred.job.name=" + pipe_func,
  // increase the memory for the child JVMs so we don't run out of memory loading the dll
  //"-jobconf", "mapred.child.java.opts=-Xmx1000m"
  } );

boolean deleteSinkOnInit = true;
mrflow = new MapReduceFlow( "streaming flow", streamConf, deleteSinkOnInit );
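Because the streaming flow's input and output paths line up with the sink and source taps of the surrounding flows, everything can be scheduled together with a CascadeConnector. A minimal sketch, assuming hypothetical upstreamFlow and downstreamFlow that write to somepath/in and read from somepath/out through the TypedBytesMapScheme taps shown earlier:

import cascading.cascade.Cascade;
import cascading.cascade.CascadeConnector;

// upstreamFlow and downstreamFlow are placeholders for the other flows in the cascade;
// the connector orders the flows by matching their source and sink tap paths
Cascade cascade = new CascadeConnector().connect( upstreamFlow, mrflow, downstreamFlow );
cascade.complete();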
Python Job
On the python side, you create Mappers and Reducers as you would for a regular dumbo job:

class Mapper(Base):  # Base: application-specific base class, defined elsewhere in my code
    """This is the base mapper which pushes data through a pipe."""

    def __call__(self, key, value):
        """Push the values through the operator.

        key   - shouldn't contain any useful information
        value - should be a dictionary representing the tuple
        """
        # ..your code..
        yield out_key, out


class Reducer(Base):
    """The reducer."""

    def __call__(self, key, valgen):
        """Push the values through the operator.

        Parameters
        ----------
        valgen - generator to iterate over the values
        """
        # ..your code here
        yield key, out


def run():
    """We add this run function so that other functions can actually invoke it."""
    import dumbo
    job = dumbo.Job()
    job.additer(Mapper, Reducer)
    job.run()


if __name__ == "__main__":
    run()