
Nick Nack

Why

Ever wanted to split your Hadoop streaming output into multiple directories? How about filtering by multiple keys in a single pass?

Nick Nack makes your Hadoop streaming jobs even better by hooking into the rich support for writing to multiple outputs that Hadoop offers. This library (and further documentation) is tailored for working with mrjob, but can be used with any Hadoop streaming job.

How

Start slicing your mapper or reducer output with these simple steps.

import json

import mrjob.protocol
from mrjob.job import MRJob


class MyJob(MRJob):

  # tell mrjob not to format our output -- we're going to leave that to hadoop
  OUTPUT_PROTOCOL = mrjob.protocol.RawProtocol

  # tell hadoop to massage our mrjob output using this output format
  HADOOP_OUTPUT_FORMAT = 'nicknack.MultipleValueOutputFormat'

  # mrjob 0.5.3+ only, see note below if you are using an older version
  LIBJARS = ['nicknack-1.0.1.jar']

  def mapper(self, _, line):
    yield "csv-output", '1,two,"3 four"'
    yield "json-output", json.dumps({"one": 1, "two": "two"})


if __name__ == '__main__':
  MyJob.run()

mrjob 0.5.3+

  • As long as the nicknack jar is in the same directory as your script, it will be discovered and uploaded by mrjob.
    See the docs for details.

mrjob pre-0.5.3

  • Tell mrjob to copy the jar onto the remote machine and pass it to Hadoop:
> python myjob.py --bootstrap 'sudo cp nicknack-1.0.1.jar# /home/hadoop/nicknack.jar' --hadoop-arg -libjars --hadoop-arg /home/hadoop/nicknack.jar

Or using a mrjob.conf config file:

runners:
  emr:
    bootstrap:
      - "sudo cp nicknack-1.0.1.jar# /home/hadoop/nicknack.jar"
    hadoop_extra_args:
      - "-libjars"
      - "/home/hadoop/nicknack.jar"

That’s it! Your mrjob streaming output will now be processed by a Nick Nack formatter.

Pro tip: mrjob will only apply the HADOOP_OUTPUT_FORMAT parameter to the last step in your sequence of steps. So along with normal MapReduce jobs, it will also work on your map-only or reduce-only jobs.

Formatters

To use a Nick Nack formatter you must set both the OUTPUT_PROTOCOL and the HADOOP_OUTPUT_FORMAT fields in your class definition. The package name for all the Nick Nack formatters is nicknack, so prefix the formatter name with this. For example, to use the MultipleValueOutputFormat formatter, your class should have:

OUTPUT_PROTOCOL = mrjob.protocol.RawProtocol
HADOOP_OUTPUT_FORMAT = 'nicknack.MultipleValueOutputFormat'
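
With RawProtocol, mrjob writes each (key, value) pair through verbatim, joined by a tab, which is exactly the key/value line format the Hadoop streaming side (and hence the Nick Nack formatters) sees. A rough sketch of that serialization:

```python
def raw_protocol_line(key, value):
    # mrjob's RawProtocol passes key and value through untouched,
    # tab-separated -- this is the line Hadoop streaming receives
    return "%s\t%s" % (key, value)
```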

MultipleValueOutputFormat

Use Case: You want to split your output into directories specified by a key, and not include the key in the output.

mrjob Output:

yield "filename1", json.dumps({"some values": "other JSON"})
yield "otherfile", json.dumps({"other values": "more JSON"})

Hadoop Input:

filename1	{"some values": "other JSON"}
otherfile	{"other values": "more JSON"}

Output in [outputdir]/filename1/part-00000:

{"some values": "other JSON"}

Output in [outputdir]/otherfile/part-00000:

{"other values": "more JSON"}

MultipleTextOutputFormatByKey

Use Case: You want to split your output into directories specified by a key, but also keep the keys in the output.

  • This is nearly the same as MultipleValueOutputFormat, except both the key and value are included in the output.

mrjob Output:

yield "filename1", json.dumps({"some values": "other JSON"})
yield "otherfile", json.dumps({"other values": "more JSON"})

Hadoop Input:

filename1	{"some values": "other JSON"}
otherfile	{"other values": "more JSON"}

Output in [outputdir]/filename1/part-00000:

filename1	{"some values": "other JSON"}

Output in [outputdir]/otherfile/part-00000:

otherfile	{"other values": "more JSON"}

MultipleSimpleOutputFormat

Use Case: You want to split your output into specific directories, where the directory name isn’t a part of the output. Your key should be a (filename, key) tuple within a space-delimited string (we break on the first space to determine the filename; the remainder is the key).

Pro tip: Your directory names must not contain spaces. If you must have spaces, use MultipleJSONOutputFormat or MultipleCSVOutputFormat instead.
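
In Python terms, the first-space split described above behaves like this sketch (the actual splitting happens inside the Java formatter; this just illustrates the rule):

```python
def split_simple_key(key):
    # break on the FIRST space only: the directory name comes first,
    # everything after that first space is the output key
    dirname, _, out_key = key.partition(" ")
    return dirname, out_key
```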

mrjob Output:

yield "dirname1 key1", json.dumps({"some values": "other JSON"})
yield "dirname2 key2", json.dumps({"other values": "more JSON"})

Hadoop Input:

dirname1 key1	{"some values": "other JSON"}
dirname2 key2	{"other values": "more JSON"}

Output in [outputdir]/dirname1/part-00000:

key1	{"some values": "other JSON"}

Output in [outputdir]/dirname2/part-00000:

key2	{"other values": "more JSON"}

MultipleJSONOutputFormat

Use Case: You want to split your output into specific directories, where the directory name isn’t a part of the output. Your key should be a JSON array containing exactly two elements: 1) the directory name and 2) the key to write in the output.

  • This formatter adds a tiny bit of overhead, as we need to decode the JSON key for each line processed.
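
The decoding step is equivalent to this sketch (the real work happens in the Java formatter; the two-element-array rule is the point):

```python
import json

def split_json_key(key):
    # the key must be a JSON array of exactly two elements:
    # [directory_name, output_key]
    dirname, out_key = json.loads(key)
    return dirname, out_key
```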

mrjob Output:

yield json.dumps(["dirname1", "key1"]), json.dumps({"some values": "other JSON"})
yield json.dumps(["dirname2", "key2"]), json.dumps({"other values": "more JSON"})

Hadoop Input:

["dirname1", "key1"]	{"some values": "other JSON"}
["dirname2", "key2"]	{"other values": "more JSON"}

Output in [outputdir]/dirname1/part-00000:

key1	{"some values": "other JSON"}

Output in [outputdir]/dirname2/part-00000:

key2	{"other values": "more JSON"}

MultipleCSVOutputFormat

Use Case: You want to split your output into specific directories, where the directory name isn’t a part of the output. Your key should be a CSV row containing exactly two elements: 1) the directory name and 2) the key to write in the output.

  • This formatter adds a tiny bit of overhead, as we need to decode the CSV key for each line processed.

Pro tip: If your keys are funky (contain quotes, spaces, ...), consider using MultipleJSONOutputFormat to avoid CSV escaping issues.
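
The CSV decoding is equivalent to this sketch; note that standard CSV quoting lets the directory name or key itself contain commas:

```python
import csv
import io

def split_csv_key(key):
    # parse the key as a single CSV row with exactly two fields
    dirname, out_key = next(csv.reader(io.StringIO(key)))
    return dirname, out_key
```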

mrjob Output:

yield "dirname1,key1", json.dumps({"some values": "other JSON"})
yield "dirname2,key2", json.dumps({"other values": "more JSON"})

Hadoop Input:

dirname1,key1	{"some values": "other JSON"}
dirname2,key2	{"other values": "more JSON"}

Output in [outputdir]/dirname1/part-00000:

key1	{"some values": "other JSON"}

Output in [outputdir]/dirname2/part-00000:

key2	{"other values": "more JSON"}

MultipleLeafValueOutputFormat

Warning! Advanced usage only. You probably want MultipleValueOutputFormat instead.

Use Case: You want complete control of the output files, including the leaf file (usually part-00XXX). This means the job must output unique keys per process or else you will lose data. For example, you may use this format if you guarantee each key is unique per reducer.
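
One way to guarantee uniqueness is to fold the task's partition number into the filename. Hadoop streaming exposes job configuration to your script as environment variables (dots become underscores), so a sketch like the following works, assuming the mapreduce.task.partition property is available (older Hadoop versions named it mapred.task.partition):

```python
import os

def unique_leaf_key(prefix, default_partition=0):
    # Hadoop streaming passes job config to the script as environment
    # variables with dots replaced by underscores; mapreduce.task.partition
    # identifies the current map/reduce task within the job
    # (assumption: a reasonably modern Hadoop sets this variable)
    partition = int(os.environ.get("mapreduce_task_partition",
                                   default_partition))
    return "%s/part-%05d" % (prefix, partition)
```

Yielding unique_leaf_key("full") as the key from each reducer then mimics Hadoop's usual part-00XXX naming while keeping one file per task.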

mrjob Output:

yield "full/filename", json.dumps({"some values": "other JSON"})
yield "otherpath/filename", json.dumps({"other values": "more JSON"})

Hadoop Input:

full/filename	{"some values": "other JSON"}
otherpath/filename	{"other values": "more JSON"}

Output in [outputdir]/full/filename:

{"some values": "other JSON"}

Output in [outputdir]/otherpath/filename:

{"other values": "more JSON"}

Credits

The majority of this project is shamelessly copied from oddjob; the original author deserves any and all credit. I couldn’t even come up with an original name.