Friday, December 20, 2013

Convert a bag of key-value pairs to a map in Pig

Here is my input file, data.txt. Its fields are tab-separated, which is what the default PigStorage loader expects:
id_1    key_1  v_i1_a1
id_1    key_2  v_i1_a2
id_2    key_1  v_i2_a1
id_1    key_3  v_i1_a3
id_2    key_3  v_i2_a3
id_1    key_4  v_i1_a4
For each ID, I want a map of its key-value pairs, like this:
(id_1, [key_1#v_i1_a1, key_2#v_i1_a2, key_3#v_i1_a3, key_4#v_i1_a4])
(id_2, [key_1#v_i2_a1, key_3#v_i2_a3])
Does the following Pig script work? TOMAP and BagToTuple are both built-in UDFs.
data = load 'data.txt' as (id: chararray, key: chararray, value: chararray);

-- Group data by id
g_data = group data by id;
describe g_data;

-- Convert key-value pair bag to tuple, then convert to map
to_tuple = foreach g_data generate group, TOMAP(BagToTuple(data.(key, value)));
dump to_tuple;
describe to_tuple;
Unfortunately, the above script doesn't work. The output looks like this:
g_data: {group: chararray,data: {(id: chararray,key: chararray,value: chararray)}}
(id_1,)
(id_2,)
to_tuple: {group: chararray,map[]}
What's wrong? Let's remove TOMAP and run the script again:
...
to_tuple = foreach g_data generate group, BagToTuple(data.(key, value)) as kv;
...
The result looks like this:
g_data: {group: chararray,data: {(id: chararray,key: chararray,value: chararray)}}
(id_1,(key_4,v_i1_a4,key_3,v_i1_a3,key_2,v_i1_a2,key_1,v_i1_a1))
(id_2,(key_3,v_i2_a3,key_1,v_i2_a1))
to_tuple: {group: chararray,kv: (key: chararray,value: chararray)}
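To see why that schema is misleading, here is a small plain-Java model of what BagToTuple does at runtime. It is only a sketch and not Pig's implementation: bags and tuples are modeled as lists of strings, and the class name BagToTupleModel is hypothetical (List.of needs Java 9 or later). It shows that the output tuple grows with the bag, reaching 8 fields for id_1. The schema reported by describe, however, lists only key and value.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of BagToTuple's runtime behavior (hypothetical class,
// not Pig code): bags and tuples are modeled as lists of strings.
public class BagToTupleModel {

    // Concatenate the fields of every tuple in the bag into one flat "tuple".
    static List<String> bagToTuple(List<List<String>> bag) {
        List<String> flat = new ArrayList<>();
        for (List<String> tuple : bag) {
            flat.addAll(tuple);
        }
        return flat;
    }

    public static void main(String[] args) {
        // The (key, value) bag for id_1, in the order Pig emitted it above.
        List<List<String>> bag = List.of(
            List.of("key_4", "v_i1_a4"),
            List.of("key_3", "v_i1_a3"),
            List.of("key_2", "v_i1_a2"),
            List.of("key_1", "v_i1_a1"));

        List<String> kv = bagToTuple(bag);
        // The declared schema (key, value) has 2 fields, but the runtime
        // tuple has 2 * bag.size() fields.
        System.out.println(kv.size());  // 8
        System.out.println(kv);
    }
}
```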
Did you spot the problem? BagToTuple correctly converts the bag into a single tuple. However, the schema of kv declares only two fields (key and value), while the actual tuple holds every key and value in the bag. What about flattening the tuple and using a project-range? Unfortunately, that still doesn't work: only one key-value pair ends up in the map.
to_tuple = foreach g_data generate group, FLATTEN(BagToTuple(data.(key, value)));
...

to_map = foreach to_tuple generate group, TOMAP($1 ..);
dump to_map;
describe to_map;

g_data: {group: chararray,data: {(id: chararray,key: chararray,value: chararray)}}
(id_1,key_4,v_i1_a4,key_3,v_i1_a3,key_2,v_i1_a2,key_1,v_i1_a1)
(id_2,key_3,v_i2_a3,key_1,v_i2_a1)
to_tuple: {group: chararray,org.apache.pig.builtin.bagtotuple_8::key: chararray,org.apache.pig.builtin.bagtotuple_8::value: chararray}
(id_1,[key_4#v_i1_a4])
(id_2,[key_3#v_i2_a3])
to_map: {group: chararray,map[]}
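Why only one pair? My reading, stated as an assumption: to_tuple has a declared schema of three columns (group, key, value), so Pig expands $1 .. to just those columns, and TOMAP never sees the remaining fields. The plain-Java model below is a hypothetical ProjectRangeModel class, not Pig code. It compares two cases: expanding the range against the declared schema width, and running it to the end of the row, which is what happens when the schema is unknown.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (hypothetical class, not Pig code). Assumption: with a
// declared schema, "$1 .." is expanded to the columns that schema lists,
// so extra runtime fields are never passed to TOMAP.
public class ProjectRangeModel {

    // Project fields [start, declaredWidth) of the row.
    static List<String> projectRange(List<String> row, int start, int declaredWidth) {
        return row.subList(start, Math.min(declaredWidth, row.size()));
    }

    // Pair up alternating key/value arguments into a map.
    static Map<String, String> toMap(List<String> args) {
        Map<String, String> map = new HashMap<>();
        for (int i = 0; i + 1 < args.size(); i += 2) {
            map.put(args.get(i), args.get(i + 1));
        }
        return map;
    }

    public static void main(String[] args) {
        // Flattened row for id_1 at runtime: 9 fields.
        List<String> row = List.of("id_1",
            "key_4", "v_i1_a4", "key_3", "v_i1_a3",
            "key_2", "v_i1_a2", "key_1", "v_i1_a1");

        // Declared schema (group, key, value) has width 3: one pair survives.
        System.out.println(toMap(projectRange(row, 1, 3)));  // {key_4=v_i1_a4}

        // Unknown schema: the range runs to the end of the row.
        System.out.println(toMap(projectRange(row, 1, row.size())).size());  // 4
    }
}
```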

The problem is caused by BagToTuple, which sets its output schema to the schema of the tuples inside the bag. That is incorrect, because the length of the generated tuple is not known until runtime: it depends on how many tuples the bag contains. BagToTuple should leave the output schema unknown and let Pig figure it out. The fix is very simple: take a copy of BagToTuple.java (here placed in my_package) and remove its outputSchema method entirely.

You also need to work around a second issue, this one with TOMAP. TOMAP takes alternating key and value arguments, so TOMAP(BagToTuple(..)) won't work: the whole tuple is passed as its single first argument. Instead, FLATTEN the tuple and pass its fields to TOMAP with a positional project-range, like this:

define MyBagToTuple my_package.BagToTuple();

data = load 'data.txt' as (id: chararray, key: chararray, value: chararray);

-- Group data by id
g_data = group data by id;
describe g_data;

-- Convert key-value pair bag to tuple, then convert to map
to_tuple = foreach g_data generate group, FLATTEN(MyBagToTuple(data.(key, value)));
dump to_tuple;
describe to_tuple;

to_map = foreach to_tuple generate $0 as id, TOMAP($1 ..) as kv;
dump to_map;
describe to_map;

g_data: {group: chararray,data: {(id: chararray,key: chararray,value: chararray)}}
(id_1,key_4,v_i1_a4,key_3,v_i1_a3,key_2,v_i1_a2,key_1,v_i1_a1)
(id_2,key_3,v_i2_a3,key_1,v_i2_a1)
Schema for to_tuple unknown.
(id_1,[key_3#v_i1_a3,key_2#v_i1_a2,key_1#v_i1_a1,key_4#v_i1_a4])
(id_2,[key_3#v_i2_a3,key_1#v_i2_a1])
to_map: {id: bytearray,kv: map[]}
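The TOMAP side of the fix can also be modeled in a few lines of plain Java. The ToMapModel class below is hypothetical and is not Pig's source. It assumes TOMAP returns null when it receives fewer than two arguments, an assumption inferred from the empty (id_1,) output of the first script. In the model, passing the whole tuple as one argument produces no map, while passing the flattened fields as separate arguments pairs them up.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of how TOMAP treats its arguments (hypothetical class,
// not Pig's source). Assumption: fewer than two arguments yields null.
public class ToMapModel {

    static Map<Object, Object> toMap(Object... args) {
        if (args.length < 2) {
            return null;  // e.g. one whole tuple passed as a single argument
        }
        Map<Object, Object> map = new HashMap<>();
        // Pair up alternating key/value arguments; in this model a trailing
        // unpaired argument is ignored.
        for (int i = 0; i + 1 < args.length; i += 2) {
            map.put(args[i], args[i + 1]);
        }
        return map;
    }

    public static void main(String[] args) {
        List<String> tuple = List.of("key_3", "v_i2_a3", "key_1", "v_i2_a1");

        // Like TOMAP(BagToTuple(..)): the tuple arrives as one argument.
        System.out.println(toMap(tuple));  // null

        // Like TOMAP($1 ..) after FLATTEN: the fields arrive separately.
        System.out.println(toMap(tuple.toArray()).size());  // 2
    }
}
```

A Java HashMap makes no ordering guarantee. So in this model, as in the output above, the order of the pairs in the map need not match the order of the input fields.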