Analytics/Kraken/Tutorial

Cool, Hadoop! What can I do?

Pig is a data processing language designed to run on top of a Hadoop cluster which makes it easy to run map reduce jobs. It comes with an interactive REPL, Grunt[1], and there are growing number of mature open source libraries for common tasks [2][3].

Resources edit

To get started writing pig scripts, there is a excellent (though outdated) O'Reilly Media publication Programming Pig[4] freely available online and a nice IBM DeveloperWorks introduction[5]. There is also a thorough official documentation wiki from the apache software foundation[6].

Examples edit

But perhaps the best place to start is looking at some example scripts to start playing with. The analytics team has a repository hosted on Github which contains a variety of Pig scripts [7] and user defined functions (UDFs)[8] which you can check out.

As a disclaimer, none of this code is very mature and a lot can depend on your set up, so it may take a while to get things actually working. For example, in order to use the piggybank UDFs or special Wikipedia specific UDFs you'll need to reference the shared library directory 'hdfs:///libs'. Our geocoding solution is also a big hacky at the moment, requiring you to place the geocoding database (usually called 'GeoIP.dat') in your hdfs home directory.

Here is a slightly modified and lightly annotated basic script which counts web requests count.pig:


REGISTER 'hdfs:///libs/piggybank.jar' --need to load jar files in order to use them
REGISTER 'hdfs:///libs/kraken.jar'
REGISTER 'hdfs:///libs/geoip-1.2.3.jar' --this needs to be located in your home directory
-- it is also necessary AFAWK to place the geocoding database, GeoIP.dat in your home directory
-- in order for geocoding to work

DEFINE EXTRACT org.apache.pig.piggybank.evaluation.string.RegexExtract();
--define statements shorten UDF reference
DEFINE PARSE org.wikimedia.analytics.kraken.pig.ParseWikiUrl();
DEFINE GEO org.wikimedia.analytics.kraken.pig.GetCountryCode('GeoIP.dat');
--or allow constructor arguments to be passed
DEFINE TO_MONTH org.wikimedia.analytics.kraken.pig.ConvertDateFormat('yyyy-MM-dd\'T\'HH:mm:ss', 'yyyy-MM');
DEFINE TO_MONTH_MS org.wikimedia.analytics.kraken.pig.ConvertDateFormat('yyyy-MM-dd\'T\'HH:mm:ss.SSS', 'yyyy-MM');
DEFINE HAS_MS org.wikimedia.analytics.kraken.pig.RegexMatch('.*\\.[0-9]{3}');

-- LOAD just takes a directory name and allows you 
-- to specify a schema with the AS command
LOG_FIELDS     = LOAD '$input' USING PigStorage(' ') AS (
    hostname,
    udplog_sequence,
    timestamp:chararray,
    request_time,
    remote_addr:chararray,
    http_status,
    bytes_sent,
    request_method:chararray,
    uri:chararray,
    proxy_host,
    content_type:chararray,
    referer,
    x_forwarded_for,
    user_agent );

LOG_FIELDS     = FILTER LOG_FIELDS BY (request_method MATCHES '(GET|get)');
LOG_FIELDS     = FILTER LOG_FIELDS BY content_type == 'text/html' OR (content_type == '-');

PARSED    = FOREACH LOG_FIELDS GENERATE
		    FLATTEN(PARSE(uri)) AS (language_code:chararray, isMobile:chararray, domain:chararray),
		    GEO(remote_addr) AS country,
		    (HAS_MS(timestamp) ? TO_MONTH_MS(timestamp) : TO_MONTH(timestamp)) AS month;

FILTERED    = FILTER PARSED BY (domain eq 'wikipedia.org');
        
GROUPED        = GROUP FILTERED BY (month, language_code, isMobile, country);

COUNT    = FOREACH GROUPED GENERATE
            FLATTEN(group) AS (month, language_code, isMobile, country),
            COUNT_STAR(FILTERED);

-- grunt / pig won't actually do anything until they see a STORE or DUMP command
STORE COUNT into '$output';

and the call to invoke it from inside of grunt if the script is in your home directory on HDFS:

grunt> exec -param input=/traffic/zero -param output=zero_counts hdfs:///user/<USER_NAME>/count.pig 

which should create a collection of files on HDFS in a directory names 'counts' within your home directory.

Monitoring edit

To monitor one of your jobs once it has started, go to http://jobs.analytics.wikimedia.org/cluster/apps/RUNNING and you can see the number of mappers and reducers finished.

(If you are using the Browser Configured Proxy, the job URL is http://analytics1010.eqiad.wmnet:8088/cluster/apps/RUNNING)

Known issues / Annoyances / Feature requests edit

  • Grunt periodically loses the ability to find files on the local file system. So when running a script which lives on an01, you might get the error "ERROR 1000: Error during parsing. File not found: src/kraken/src/main/pig/topk.pig". If this happens, just execute the script directly from the an01 shell with "$ pig ...".
  • The MaxMind GeoIP library requires that the actual database file (not the jar) live in your home directory so that it can be replicated across all of the nodes.

Links edit