Monday, July 7, 2008

Cascading Hadoop

When running MapReduce stuff, it very quickly becomes apparent that simple jobs need to be queued up, then an operation run on the output of another, or perhaps an operation run on multiple outputs from others - almost like a cascaded effect, one could say... Enter Cascading, a project that aims to build fault-tolerant workflows on top of Hadoop.

I have only just started to play with it (like Hadoop, it is not in the public Maven repositories, so of course I wrote my own pom to do a local install). It is a new project with a small team (one guy?), but it looks promising, although I think it is missing a few more getting-started examples for common operations - but I figure if people blog about it like this, the examples will pop up pretty quickly.

One of the nice features that immediately attracted me to it was the fact that it can do visualisations of the workflow for you like so (this is just an example workflow, not the code below):


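Getting one of these diagrams is straightforward - a Flow can write out a DOT file that Graphviz will render. A minimal sketch, reusing the source, sink and pipe from the example below (the file name is just a placeholder):

// the connected flow can describe itself as a Graphviz DOT file
Flow flow = new FlowConnector().connect( source, sink, pipe );
flow.writeDOT( "flow.dot" );
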
And here is the example. I am using my standard subset of GBIF data, grouping the records by scientific name and then sorting them on resource and basisOfRecord (some databases can't mix GROUP BY and ORDER BY in SQL without creating temp tables, so this seemed like a nice example).


import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.operation.regex.RegexSplitter;
import cascading.pipe.Each;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.scheme.TextLine;
import cascading.tap.Lfs;
import cascading.tap.Tap;
import cascading.tuple.Fields;

public void run(String input, String output) {
  // source is the input file - here I read from the local file system
  // (i.e. not the distributed fs)
  Tap source = new Lfs( new TextLine(), input );

  // sink is the output file - here I write to the local file system
  // (i.e. not the distributed fs); 'true' means replace any existing output
  Tap sink = new Lfs( new TextLine(), output, true );

  // my tab file schema
  Fields dwcFields = new Fields( "resource", "kingdom", "phylum", "class", "order", "family",
      "genus", "scientificName", "basisOfRecord", "latitude", "longitude" );

  // parse the data - RegexSplitter splits the "line" field on tabs by default
  Pipe pipe = new Each( "parser", new Fields( "line" ), new RegexSplitter( dwcFields ) );

  // define some group and sort fields
  Fields groupFields = new Fields( "scientificName" );
  Fields sortFields = new Fields( "resource", "basisOfRecord" );

  // a group by with a sort...
  // note that this takes the previous pipe
  pipe = new GroupBy( pipe, groupFields, sortFields );

  // connect the assembly to the SOURCE and SINK taps
  Flow parsedLogFlow = new FlowConnector().connect( source, sink, pipe );

  // start execution of the flow (either locally or on the cluster)
  parsedLogFlow.start();

  // block until the flow completes
  parsedLogFlow.complete();
}
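
For completeness, here is roughly how I call it - a minimal sketch where the class name and file paths are just placeholders, not my real file locations:

public static void main(String[] args) throws Exception {
  // hypothetical harness - the paths stand in for my local tab file and output directory
  new CascadingExample().run( "/tmp/dwc_subset.txt", "/tmp/dwc_grouped" );
}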


So this was very simple, and it was only the first night playing. Note that this code makes no mention of a MapReduce job, or anything more complex than a simple tap, pipe, sink workflow...

I will proceed by trying a much more complex workflow - I think splitting the world data into the 2.8 grids I proposed earlier (6 zoom levels), followed by some breakdowns for the various analytics I anticipate producing. Then I will report back with metrics from running on EC2.
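
To give an idea of the grid splitting, here is a rough sketch of a custom Cascading Function that could assign each record to a cell - purely hypothetical code, and it assumes the 2.8 refers to a fixed cell size in degrees:

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

// hypothetical: emits a "cellId" field computed from latitude and longitude
public class CellIdFunction extends BaseOperation implements Function {

  private final double cellSize; // cell size in degrees - an assumption on my part

  public CellIdFunction( double cellSize ) {
    super( 2, new Fields( "cellId" ) );
    this.cellSize = cellSize;
  }

  public void operate( FlowProcess flowProcess, FunctionCall functionCall ) {
    TupleEntry args = functionCall.getArguments();
    double lat = Double.parseDouble( args.getString( "latitude" ) );
    double lng = Double.parseDouble( args.getString( "longitude" ) );

    // number the cells left to right, bottom to top
    int cellsPerRow = (int) Math.ceil( 360d / cellSize );
    int col = (int) Math.floor( ( lng + 180d ) / cellSize );
    int row = (int) Math.floor( ( lat + 90d ) / cellSize );

    functionCall.getOutputCollector().add( new Tuple( row * cellsPerRow + col ) );
  }
}

This would slot into the assembly with something like:

pipe = new Each( pipe, new Fields( "latitude", "longitude" ), new CellIdFunction( 2.8 ), Fields.ALL );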
What I would really like to do is have some nice metadata that accompanies the data files at each step and gives the semantics of the file - e.g. something that describes the columns in the tab file - so I expect to use the TDWG vocabularies and do some RDF (perhaps RDF represented as JSON?). This way I can set up the Fields automatically, based on the content of the file, and accept different input formats.
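
As a first step in that direction, even something crude would do - a rough sketch with a hypothetical helper that builds the Fields from a one-line sidecar file of column names, standing in for a proper TDWG/RDF description:

import java.io.BufferedReader;
import java.io.FileReader;

import cascading.tuple.Fields;

// hypothetical helper: builds the schema from a sidecar file whose first line
// is a tab separated list of column names (a stand-in for richer RDF metadata)
public static Fields fieldsFromDescriptor( String descriptorFile ) throws Exception {
  BufferedReader reader = new BufferedReader( new FileReader( descriptorFile ) );
  try {
    String[] names = reader.readLine().split( "\t" );
    return new Fields( names );
  } finally {
    reader.close();
  }
}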
