My starting point was Nate Murray's post, which showed how to integrate a Hadoop Streaming job into Cascading. The limits of this approach are:
- Data is passed in and out of the Python code as text.
- You have to parse the fields out of the lines; i.e., the data is no longer represented as tuples.
My solution has the following elements:
- We create a customized Cascading tap which encodes/decodes tuples using Hadoop typed bytes.
- We "wrap" our Python code using Dumbo so that we can let Dumbo handle converting the typed bytes to native Python types.
- We set up a Cascading streaming job which uses Dumbo to run our map-reduce job.
Creating A Cascading Tap For TypedBytes
Update 04-26-2011: there's a problem with my original code; the source part of the tap doesn't properly set the field names based on the names in the dictionary.

import java.beans.ConstructorProperties;
import java.io.IOException;

import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.Tuples;
import cascading.scheme.Scheme;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.typedbytes.*;

/**
 * A TypedBytesMapScheme is a type of {@link Scheme} which uses
 * sequence files to encode the tuples.
 * The tuples are encoded as Java Map objects and then serialized using Hadoop typed bytes.
 * We use Map objects so that we know which field corresponds to each position.
 * We waste space by encoding the field names with each tuple, but when we decode
 * the tuples in Python we get a dictionary, so we know what each field is.
 */
public class TypedBytesMapScheme extends Scheme {
  /** Field serialVersionUID */
  private static final long serialVersionUID = 1L;

  /** Protected for use by TempDfs and other subclasses. Not for general consumption. */
  protected TypedBytesMapScheme() {
    super( null );
  }

  /**
   * Creates a new scheme instance that stores the given field names.
   *
   * @param fields
   */
  @ConstructorProperties({"fields"})
  public TypedBytesMapScheme( Fields fields ) {
    super( fields, fields );
  }

  @Override
  public void sourceInit( Tap tap, JobConf conf ) {
    conf.setInputFormat( SequenceFileInputFormat.class );
  }

  @Override
  public void sinkInit( Tap tap, JobConf conf ) {
    conf.setOutputKeyClass( TypedBytesWritable.class );   // supports TapCollector
    conf.setOutputValueClass( TypedBytesWritable.class ); // supports TapCollector
    conf.setOutputFormat( SequenceFileOutputFormat.class );
  }

  @Override
  public Tuple source( Object key, Object value ) {
    // cast the value to a TypedBytesWritable; it should decode to a Map
    TypedBytesWritable bytes = (TypedBytesWritable) value;
    java.util.Map<String, Object> items = (java.util.Map<String, Object>) bytes.getValue();

    // build a new tuple by looking up each source field by name in the map
    Tuple data = new Tuple();
    Fields sfields = this.getSourceFields();

    for( int i = 0; i < sfields.size(); i++ ) {
      // get the field at position i; it may be a String or an io.Text
      Comparable f = sfields.get( i );

      if( f instanceof org.apache.hadoop.io.Text ) {
        org.apache.hadoop.io.Text name = (org.apache.hadoop.io.Text) f;
        data.add( items.get( name.toString() ) );
      }
      else if( f instanceof String ) {
        String name = (String) f;
        data.add( items.get( name ) );
      }
      else {
        // Not sure what to do; throwing an exception appears to cause a problem,
        // so fall back to looking the item up by position.
        System.out.println( "TypedBytesMapScheme: field name wasn't a String or Text; falling back to the positional key" );
        data.add( items.get( i ) );
        //throw new Exception( "Field isn't named by a string. We can't handle this currently" );
      }
    }

    return data;
  }

  @Override
  public void sink( TupleEntry tupleEntry, OutputCollector outputCollector ) throws IOException {
    Tuple result = getSinkFields() != null ? tupleEntry.selectTuple( getSinkFields() ) : tupleEntry.getTuple();

    java.util.Map<String, Object> objs = new java.util.HashMap<String, Object>();

    // tupleEntry.getFields() returns the fields actually in the tuple;
    // getSinkFields() is the field selector in the tap.
    // We apply this selector to the actual fields.
    Fields sfields = tupleEntry.getFields().select( getSinkFields() );

    if( sfields == null )
      System.out.println( "\t TypedBytesMapScheme: sink fields is null" );

    for( int i = 0; i < sfields.size(); i++ ) {
      // get the name of this field and the corresponding item
      String name = sfields.get( i ).toString();
      Object item = result.get( i );

      // unwrap Hadoop Writable wrappers so typed bytes serializes the native value
      if( item instanceof org.apache.hadoop.io.Text )
        item = item.toString();
      else if( item instanceof org.apache.hadoop.io.LongWritable )
        item = ( (org.apache.hadoop.io.LongWritable) item ).get();
      else if( item instanceof org.apache.hadoop.io.IntWritable )
        item = ( (org.apache.hadoop.io.IntWritable) item ).get();
      else if( item instanceof org.apache.hadoop.io.DoubleWritable )
        item = ( (org.apache.hadoop.io.DoubleWritable) item ).get();

      objs.put( name, item );
    }

    // the key isn't meaningful here; the whole tuple is carried in the value as a map
    TypedBytesWritable outKey = new TypedBytesWritable();
    TypedBytesWritable outValue = new TypedBytesWritable();
    outKey.setValue( 0 );
    outValue.setValue( objs );

    outputCollector.collect( outKey, outValue );
  }
}
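For reference, here is a minimal sketch of how this scheme might be used to build the taps on either side of the streaming job. The field names are hypothetical, and it assumes Cascading's stock Hfs tap:

//hypothetical field names; the paths match the ones used by the streaming job below
Fields fields = new Fields( "user", "score" );
Scheme scheme = new TypedBytesMapScheme( fields );
//an upstream flow can sink its tuples here as typed-bytes encoded maps...
Tap pyIn = new Hfs( scheme, "somepath/in", SinkMode.REPLACE );
//...and a downstream flow can source the streaming job's output as ordinary named tuples
Tap pyOut = new Hfs( scheme, "somepath/out" );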
Setting up a Cascading Flow which uses Dumbo
Below is the Java code for setting up the MapReduce flow, which uses Dumbo to call the Python mapper and reducer.
//Path to the python executable
String pyexe="/usr/local/python2.6/bin/python ";
//path to the python script to execute
String pyscript="Utilities/dpf/cascading/run_each_pipe.py";
//Paths of the input/output files for our job
//These files will be written/read by other flows in our cascade
//The should contain tuples encoded using our TypedBytesMapScheme.
String pymrin="somepath/in";
String pymrout="somepath/out";
//create a list of directories we need on our python path
//common.pyc is part of the dumbo/backends and typedbytes should be in our
//python path so we probably don't need the next line.
String pypath="common.pyc:typedbytes-0.3.6-py2.6.egg";
//How much memory to allow the Mapper/Reducer
//in bytes. I had to increase this beyond the default
//otherwise my python scripts ran into memory errors loading c-extensions
String memlimit="1256000000";
//set the home directory and python egg cache
//I use the LinuxTaskController so that MR jobs
//run as the user who submitted them so we can
//access files in our home directory
//We still need to set the home directory otherwise it defaults
//to the home directory for the map reduce user.
String homedir=System.getProperty("user.home");
//set the python egg cache to the users home directory
String pyeggcache=System.getProperty("user.home")+"/.python-eggs";
//We can create the job configuration using the same parameters
//we would pass to hadoop streaming on the command line
JobConf streamConf = StreamJob.createJob( new String[]{
"-input", pymrin,
"-output", pymrout,
// The first number is the iteration number, which should just be 0.
// The second number determines how much memory the process can use;
// it's equivalent to using the -memlimit option with dumbo.
"-mapper", pyexe+pyscript+ " map 0 "+memlimit,
//the reducer runs the same script with "red" in place of "map"
//(to make the job map-only, could we just leave out the reducer?)
"-reducer", pyexe+pyscript+" red 0 "+memlimit,
//Environment variables used by dumbo
"-cmdenv","dumbo_mrbase_class=dumbo.backends.common.MapRedBase",
"-cmdenv","dumbo_jk_class=dumbo.backends.common.JoinKey",
"-cmdenv","dumbo_runinfo_class=dumbo.backends.streaming.StreamingRunInfo",
"-cmdenv","PYTHONPATH="+pypath,
"-cmdenv","PYTHON_EGG_CACHE="+pyeggcache,
//set the home directory to the user submitting the job because
//otherwise it appears to be set to the mapred user which causes problems
"-cmdenv","HOME="+homedir,
"-inputformat","org.apache.hadoop.streaming.AutoInputFormat",
//use a sequence file for the output format,
"-outputformat","org.apache.hadoop.mapred.SequenceFileOutputFormat",
"-jobconf","stream.map.input=typedbytes",
"-jobconf","stream.reduce.input=typedbytes",
"-jobconf","stream.map.output=typedbytes",
"-jobconf","stream.reduce.output=typedbytes",
"-jobconf","mapred.job.name="+pipe_func,
//Increase the memory for the virtual machines so we don't run out of memory loading the dll
//"-jobconf","mapred.child.java.opts=-Xmx1000m"
});
boolean deleteSinkOnInit=true;
mrflow = new MapReduceFlow("streaming flow", streamConf, deleteSinkOnInit);
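The MapReduceFlow can then be joined with the flows that produce somepath/in and consume somepath/out. Here is a minimal sketch, where upstreamFlow and downstreamFlow are placeholders for the other flows in your cascade:

//the CascadeConnector orders the flows by matching sink and source paths,
//so the streaming flow runs after the flow that writes its input
//and before the flow that reads its output
Cascade cascade = new CascadeConnector().connect( upstreamFlow, mrflow, downstreamFlow );
cascade.complete();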
Python Job
On the Python side, you create mappers and reducers as you would for a regular dumbo job:

class Mapper(Base):
"""
This is the base mapper which pushes data through a pipe
"""
def __call__(self,key,value):
"""Push the values through the operator
Key - shouldn't contain any useful information
value - Should be a dictionary representing the tuple
"""
#..your code..
yield out_key,out
class Reducer(Base):
"""
The reducer
"""
def __call__(self,key,valgen):
"""Push the values through the operator
Parameters
------------------------------------------------------------------------------------------------
valgen - Generator to iterate over the values
"""
#.. your code here
yield key,out
def run():
"""We add this run function so that other functions can actually invoke it."""
import dumbo
job=dumbo.Job()
job.additer(Mapper,Reducer)
job.run()
if __name__ == "__main__":
run()