Instead, we recommend you create and IMPORT a macro file containing the REGISTER and DEFINE statements. This is what we use in the sample code repo:
-- Paths
%DEFAULT dsfp_dir           '/path/to/data_science_fun_pack';
-- Versions; must include the leading dash when a version is given
%DEFAULT datafu_version     '-1.2.1';
%DEFAULT piggybank_version  '';
%DEFAULT pigsy_version      '-2.1.0-SNAPSHOT';

REGISTER '$dsfp_dir/pig/pig/contrib/piggybank/java/piggybank$piggybank_version.jar';
REGISTER '$dsfp_dir/pig/datafu/datafu-pig/build/libs/datafu-pig$datafu_version.jar';
REGISTER '$dsfp_dir/pig/pigsy/target/pigsy$pigsy_version.jar';

DEFINE Transpose  datafu.pig.util.TransposeTupleToBag();
DEFINE Digest     datafu.pig.hash.Hasher('murmur3-32');
First, we define a few string defaults. Making the common root path a %DEFAULT means you can override it at runtime, and it simplifies the lines that follow. Parameterizing the versions makes them visible and also lets you easily toggle between versions from the command line for smoke testing.
Next we register the jars, interpolating the paths and versions, and then define the standard collection of UDFs we use. These definitions are executed by every script that imports the file, but we were unable to detect any impact on execution time.
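To put the macro file to use, IMPORT it at the top of each script. Here is a minimal sketch, assuming you saved the statements above as define_udfs.pig under the book's code directory (the path and filename are placeholders of our choosing):

IMPORT '/path/to/data_science_fun_pack/book/define_udfs.pig';
-- The REGISTER and DEFINE statements now apply here, so Transpose and Digest
-- may be used below exactly as if they appeared in this script.
-- To override the root path for a single run, pass -param dsfp_dir=... on the pig command line.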
One reason you might find yourself splitting a table is to create multiple files on disk according to some key.
If instead you're looking to partition records directly into files named for a key, use the MultiStorage storefunc from the Piggybank UDF collection. As opposed to SPLIT, each record goes into exactly one file.
There might be many reasons to do this splitting, but one of the best is to accomplish the equivalent of what traditional database admins call "horizontal partitioning". You are still free to access the table as a whole, but in cases where one field is used over and over again to subset the data, the filtering can be done without ever touching the excluded data. Modern databases have this feature built in and will apply it on your behalf based on the query, but our application of it here is purely ad-hoc: you will need to specify the subset of files yourself at load time to take advantage of the filtering. Here is how to partition player seasons by primary team:
bat_season = LOAD 'bat_season' AS (...);

STORE bat_season INTO '/data/out/baseball/seasons_by_team'
  USING MultiStorage('/data/out/baseball/seasons_by_team', '10'); -- team_id is field 10
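Once the seasons are stored this way, a later script can name just the team subdirectories it needs at load time, which delivers the filtering benefit described above. A sketch; the team IDs are illustrative and the schema is elided just as before:

-- Hadoop glob syntax in the path picks out only the two teams' partitions
bos_nyy_seasons = LOAD '/data/out/baseball/seasons_by_team/{BOS,NYA}' AS (...);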
Here we do the same for game events, storing one copy partitioned by away team and another by home team:

STORE events INTO '$out_dir/evs_away' USING MultiStorage('$out_dir/evs_away','5'); -- field 5: away_team_id
STORE events INTO '$out_dir/evs_home' USING MultiStorage('$out_dir/evs_home','6'); -- field 6: home_team_id
This script will run a map-only job with 9 map tasks (assuming 1 GB or more of data and a 128 MB block size). With MultiStorage, all Boston Red Sox (team ID BOS) home games that come from, say, the fifth map task will go into $out_dir/evs_home/BOS/part-m-00004 (contrast that with the normal case of $out_dir/evs_home/part-m-00004). Each map task writes its records into the subdirectory named for the team, with the part-m- file named for its task ID index.
Since most teams appear within each input split, each subdirectory will have a full set of part-m-00000 through part-m-00008 files. In our runs, we ended up with XXX output files: not catastrophic, but (a) against best practices, (b) annoying to administer, and (c) the cause of either nonlocal map tasks (if splits are combined) or a proliferation of downstream map tasks (if splits are not combined). The methods of (REF) "Cleaning up Many Small Files" would work, but you'll need to run a cleanup job per team. Better by far is to precede the STORE USING MultiStorage step with a GROUP BY team_id. We'll learn all about grouping in the next chapter, but its use here should be clear enough: all of each team's events will be sent to a common reducer, and as long as the Pig pig.output.lazy option is set, the other reducers will not output files.
events_by_away = FOREACH (GROUP events BY away_team_id) GENERATE FLATTEN(events);
events_by_home = FOREACH (GROUP events BY home_team_id) GENERATE FLATTEN(events);

STORE events_by_away INTO '$out_dir/evs_away-g' USING MultiStorage('$out_dir/evs_away-g','5'); -- field 5: away_team_id
STORE events_by_home INTO '$out_dir/evs_home-g' USING MultiStorage('$out_dir/evs_home-g','6'); -- field 6: home_team_id
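The grouped variant above assumes the lazy-output option mentioned a moment ago has been switched on; one way to do that is a set statement at the top of the script:

SET pig.output.lazy true;  -- reducers that receive no records will not create empty part files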
The output has a directory for each key, and within each directory the same part-NNNNN files as any map-reduce job. This means the count of output files is the number of keys times the number of output slots, which can lead to a severe many-small-files problem. As mentioned in Chapter 3 (REF), many small files is Not Good. If you precede the STORE operation with a GROUP BY on the key, the reducer guarantee ensures that each subdirectory will have only one output file.
We won't go into much detail, but one final set of patterns is splitting a table into uniform chunks. If you don't need the chunks to be exactly sized, you can apply a final ORDER BY operation on a uniformly-distributed key; see the section on "Shuffling the Records in a Table" in the next chapter (REF).
To split into chunks with an exact number of lines, first use RANK to number each line, then prepare a chunk key by dividing the line number by the chunk size (rounding down), and store into chunks using MultiStorage. Since the rank operation's reducers number their records sequentially, only a few reducers are involved with each chunk, and so you won't hit the small-files problem. Splitting a table into blocks of fixed byte size is naturally provided by the HDFS block size parameter, but we're not aware of a good way to do so explicitly.
An ORDER BY statement with parallelism forced to (output size / desired chunk size) will give you roughly uniform chunks. For exactly-sized chunks, here is the RANK-based approach:
SET DEFAULT_PARALLEL 3;
%DEFAULT chunk_size 10000;

-- Supply enough keys to rank to ensure a stable sorting
bat_seasons_ranked  = RANK bat_seasons BY player_id, year_id;
bat_seasons_chunked = FOREACH bat_seasons_ranked GENERATE
  SPRINTF('%03d', (long)FLOOR(rank_bat_seasons / $chunk_size)) AS chunk_key, player_id..;

-- Writes the chunk key into the file, like it or not.
STORE bat_seasons_chunked INTO '$out_dir/bat_seasons_chunked'
  USING MultiStorage('$out_dir/bat_seasons_chunked','0'); -- field 0: chunk_key
Note that in current versions of Pig, the RANK operator forces parallelism of one. If that's unacceptable, we'll quickly sketch a final alternative but send you to the sample code for details. You can instead apply RANK with no sort key (so it runs on the map side), take the rank modulo the number of chunks, group on the result, and store with MultiStorage. This will, however, make the actual chunk sizes non-uniform by up to roughly the number of map tasks: the final lines of each map task are more likely to short-change the higher-numbered chunks. On the upside, the final chunk isn't shorter than the rest (as it is with the prior method or the Unix split command).
%DEFAULT n_chunks 8;

bats_ranked_m  = FOREACH (RANK bat_seasons) GENERATE
  (rank_bat_seasons % $n_chunks) AS chunk_key, player_id..;
bats_chunked_m = FOREACH (GROUP bats_ranked_m BY chunk_key) GENERATE FLATTEN(bats_ranked_m);

STORE bats_chunked_m INTO '$out_dir/bats_chunked_m'
  USING MultiStorage('$out_dir/bats_chunked_m','0'); -- field 0: chunk_key
With no sort key fields, the RANK is done on the map side, avoiding the single-reducer drawback noted above.
The Many Small Files problem is so pernicious because Hadoop natively assigns each mapper only one file, and so a normal mapper-only job can only preserve or increase the number of files. We know of two ways to reorganize the records of a table into fewer files.
One is to perform a final ORDER BY operation [1]. Since this gives the side benefit of allowing certain optimized join operations, we like to do this for "gold" datasets that will be used by many future jobs.
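A minimal sketch of that first approach, assuming the batting seasons are the "gold" table in question; the sort key, the PARALLEL value, and the output path are illustrative:

-- PARALLEL fixes the number of reducers, and therefore the number of output
-- files; in practice, choose roughly (input size / desired file size).
bat_seasons_gold = ORDER bat_seasons BY player_id, year_id PARALLEL 3;
STORE bat_seasons_gold INTO '$out_dir/bat_seasons_gold';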
Sorting is a fairly expensive operation, though; luckily, Pig can consolidate files reasonably well with a mapper-only job by setting the pig.splitCombination configuration to true and setting pig.maxCombinedSplitSize to the size of the input divided by the number of files you'd like to produce.
set pig.splitCombination true;
set pig.maxCombinedSplitSize 2100100100;
The maxCombinedSplitSize should be much larger than the HDFS block size so that blocks are fully used. Also note the old sailor's trick in the last line: since there's no essential difference between 2 billion bytes, 2 gigabytes, or a number nearby, the value 2100100100 is much easier to read accurately than 2000000000 or 2147483648.
Lastly, you can set the pig.additional.jars and udf.import.list Java properties. For packages that you want to regard as being effectively built in, this is our favorite method, but also the hardest to figure out. We can't go into the details (there are many; see the Pig documentation), but we can show you how to match what we used above:
# Remove backslashes and spaces: these must sit on the same line
pig.additional.jars=\
/path/to/data_science_fun_pack/pig/datafu/datafu-pig/build/libs/datafu-pig-1.2.1.jar:\
/path/to/data_science_fun_pack/pig/pig/contrib/piggybank/java/piggybank.jar:\
/path/to/data_science_fun_pack/pig/pigsy/target/pigsy-2.1.0.jar

# Remove backslashes and spaces: these also must sit on the same line
udf.import.list=\
datafu.pig.bags:datafu.pig.hash:datafu.pig.stats:datafu.pig.sets:datafu.pig.util:\
org.apache.pig.piggybank.evaluation:pigsy.text