Distributed Transitive Closure Algorithms, Part 3 – Up and Running with Pig

The programs below can be run locally on a single computer, but Pig and Hadoop are designed to run on large clusters. I used Amazon’s EC2 computers for my tests. This post describes how to get a cluster on EC2 up and running with the Pig transitive closure algorithms (using Hadoop with Java will be the subject of the next post).

Amazon’s EC2 service allows you to rent from one to several dozen computers and pay only for the time you use. At the time of writing, they also offer up to 750 hours of time on their “Micro” instances at no charge. The micro instances are a good way to get started and experiment, but they are too underpowered for most real usage.

Launching a Cluster

The Apache Whirr project offers a quick and easy way to launch clusters on Amazon EC2 or other cloud computing providers. Whirr operates using recipes: configuration files that specify the properties of your cluster and, in the case of Hadoop, Pig, and many other software packages, what software should be installed on the cluster when it is launched. I never launched clusters directly on EC2 without Whirr, so I can’t compare the two experiences. However, Whirr definitely made it easy to get EC2 clusters up and running. For a quick introduction to Whirr for EC2, this blog post is a good start, and will help you configure and use the following Whirr recipe.

Here is the Whirr recipe that I used:

whirr.instance-templates=1 hadoop-namenode+hadoop-jobtracker+pig-client,
    3 hadoop-datanode+hadoop-tasktracker

This recipe launches a cluster of 4 of Amazon’s XL Cluster Compute instances, which are optimized for high-speed communication between the different machines. I found this to be helpful, in terms of speed of execution, for the distributed transitive closure algorithms, as there is lots of data being transferred across the network during the computation. Note, however, that these instances are not free, and are quite a bit more expensive per machine than the commodity M1 or M2 instances.

Once the cluster is started, the final output from Whirr will be a list of machines in the new cluster, their IP addresses, and the ssh command to log in. There should be a line such as the following:

[hadoop-namenode+hadoop-jobtracker+pig-client]: ssh -i /home/eric/.ssh/id_rsa -o "UserKnownHostsFile /dev/null" -o StrictHostKeyChecking=no eric@

This tells you the IP address of the NameNode and JobTracker, which is the main machine in the cluster responsible for assigning tasks to the other machines. Additionally, this instance has been configured with Pig. This is the machine you will want to ssh into to run the distributed transitive closure algorithms.

Pig Implementation

Pig is a very nice scripting language built on top of Hadoop. Initially, I used Pig to implement the transitive closure algorithms.

Here is the Pig implementation of seminaive transitive closure. It is written using the Python API, which allows you to embed Pig inside of a Python script. Pig, by itself, has no capability for performing iterative or recursive computations. In the code below, Python is used to control and monitor the iterative computation. I had no experience with Pig before embarking on this project, and I was fortunate enough to find this prior work on Transitive Closure in Pig. The code below was originally based off of Julien’s code, and I’m thankful to that blog post for getting me started quickly with iterative Pig computations. It’s entirely possible that I introduced mistakes or inefficiencies in the following code, but at the least, hopefully it is helpful for anyone else planning to implement seminaive or smart transitive closure.


import time
import sys
from org.apache.pig.scripting import *

def main():
    Pig.fs("rmr out/tc")
    Q = Pig.compile("""
        SET default_parallel 4; -- set to fully utilize cluster
        SET job.name '$iteration_name';
        SET mapred.min.split.size  3146000; -- = 3MB
        SET mapred.max.split.size  3146000;
        SET pig.maxCombinedSplitSize 3146000; -- Most important, not sure if other two are needed

        E = LOAD '$Edges' USING PigStorage(',') AS (x:int, y:int);
        Delta = LOAD '$Delta' USING PigStorage(',') AS (x:int, y:int);
        T = LOAD '$T' USING PigStorage(',') AS (x:int, y:int);

        DeltaPrimeBigSchema = JOIN Delta BY y, E by x USING 'replicated';
        DeltaPrime = FOREACH DeltaPrimeBigSchema GENERATE Delta::x as x, E::y as y;
        DeltaCogroup = COGROUP DeltaPrime BY (x, y), T by (x, y);
        DeltaFiltered = FILTER DeltaCogroup BY IsEmpty(T);
        Delta = FOREACH DeltaFiltered GENERATE group.x, group.y;

        T = UNION T, Delta;
        STORE Delta INTO '$newDelta' USING PigStorage(',');
        STORE T INTO '$newT' USING PigStorage(',');
    """)
    Edges = sys.argv[1]
    Delta = Edges
    T = Edges
    start = time.time()
    for i in range(100):
        iteration_name = "tc" + str(i + 1)
        newDelta = "out/tc/delta_" + str(i + 1)
        newT = "out/tc/T_" + str(i + 1)
        #countQDerivations = "out/tc/derivations_q_" + str(i + 1)
        #countPDerivations = "out/tc/derivations_p_" + str(i + 1)
        job = Q.bind().runSingle()
        if not job.isSuccessful():
            raise RuntimeError('failed')
        Delta = newDelta
        T = newT
        f = open(sys.argv[2], 'a')
        f.write(iteration_name + "," + str(time.time() - start) + "\n")
        f.close()
        start = time.time()

        # detect if we are done
        if not job.result("Delta").iterator().hasNext():
            break

# To run: pig seminaivetc.py <path to edge list> <path to time log>
if __name__ == '__main__':
    main()
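
To make the logic of the Pig script easier to follow, here is a minimal in-memory sketch of the same seminaive iteration in plain Python. It is not the distributed implementation, and the function name `seminaive_closure` is made up for illustration. The variables `total` and `delta` play the roles of the T and Delta relations in the script.

```python
def seminaive_closure(edges):
    """Return the transitive closure of an iterable of (x, y) edges."""
    edges = set(edges)
    # Index edges by source node so the join "Delta.y = E.x" is a dict lookup.
    successors = {}
    for x, y in edges:
        successors.setdefault(x, set()).add(y)

    total = set(edges)  # plays the role of T
    delta = set(edges)  # plays the role of Delta
    while delta:
        # Join the newest pairs with the edge relation (DeltaPrime in the script).
        derived = set()
        for x, y in delta:
            for z in successors.get(y, ()):
                derived.add((x, z))
        # Keep only pairs not seen before (the COGROUP / IsEmpty(T) filter).
        delta = derived - total
        total |= delta
    return total


# Small demonstration on a three-edge chain.
print(sorted(seminaive_closure([(1, 2), (2, 3), (3, 4)])))
```

Each pass only extends the pairs discovered in the previous pass, which is what distinguishes seminaive from naive evaluation. The loop ends when a pass discovers nothing new, just as the script stops when Delta is empty.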

Here is the Pig code for Smart:


import sys
import time
from org.apache.pig.scripting import *

def main():
    # cleanup output directory before starting
    Pig.fs("rmr out/tc")
    PigJob = Pig.compile("""
        SET default_parallel 4;  -- set to fully utilize cluster
        SET job.name '$iteration_name';
        SET mapred.min.split.size  8388608; -- = 8MB
        SET mapred.max.split.size  8388608;
        SET pig.maxCombinedSplitSize 8388608; -- Most important, not sure if other two are needed
        Q = LOAD '$Q' USING PigStorage(',')  AS (x:int, y:int); 
        Q2 = LOAD '$Q' USING PigStorage(',')  AS (x:int, y:int); -- self joins not supported 
        P = LOAD '$P' USING PigStorage(',')  AS (x:int, y:int);
        DeltaPTemp = JOIN P BY x, Q BY y;
        DeltaP = FOREACH DeltaPTemp GENERATE Q::x AS x, P::y AS y;
        PDup = UNION DeltaP, P, Q;
        P = DISTINCT PDup;
        QJoinQ = JOIN Q by y, Q2 by x;
        QQ = FOREACH QJoinQ GENERATE Q::x AS x, Q2::y AS y;
        QQCogroup = COGROUP QQ BY (x, y), P BY (x, y);
        QQFiltered = FILTER QQCogroup BY IsEmpty(P);
        Q = FOREACH QQFiltered GENERATE group.x, group.y;
        STORE P INTO '$newP' USING PigStorage(',');
        STORE Q INTO '$newQ' USING PigStorage(',');
    """)
    Q = sys.argv[1]
    P = sys.argv[2]
    start = time.time()
    for i in range(10):
        iteration_name = "smarttc" + str(i + 1)
        newQ = "out/tc/q_" + str(i + 1)
        newP = "out/tc/p_" + str(i + 1)
        #countQDerivations = "out/tc/derivations_q_" + str(i + 1)
        #countPDerivations = "out/tc/derivations_p_" + str(i + 1)
        job = PigJob.bind().runSingle()
        if not job.isSuccessful():
            raise RuntimeError('failed')
        Q = newQ
        P = newP
        f = open(sys.argv[3], 'a')
        f.write(iteration_name + "," + str(time.time() - start) + "\n")
        f.close()
        start = time.time()
        # detect if we are done
        if not job.result("Q").iterator().hasNext():
            break

# To run: pig smarttc.py <path to q> <path to empty file> <path to time log>
# output goes in out/tc/p_<last>/part-m-* (can be in multiple files)
if __name__ == '__main__':
    main()
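
As with Seminaive, a small in-memory Python sketch may help in reading the Smart script. It follows the same steps as the Pig statements (extend P with Q joined to P, then square Q and drop anything already in P), but it is only an illustration. The names `compose` and `smart_closure` are hypothetical helpers, not part of the original code.

```python
def compose(left, right):
    """Return {(x, z) : (x, y) in left and (y, z) in right}."""
    by_source = {}
    for y, z in right:
        by_source.setdefault(y, set()).add(z)
    return set((x, z) for x, y in left for z in by_source.get(y, ()))


def smart_closure(edges):
    """Return the transitive closure of an iterable of (x, y) edges."""
    q = set(edges)  # Q starts as the edge relation
    p = set()       # P starts empty (the empty input file in the Pig version)
    while q:
        # P = DISTINCT (DeltaP UNION P UNION Q), where DeltaP joins Q with P.
        p = compose(q, p) | p | q
        # Q = (Q joined with itself) minus everything already in the new P.
        q = compose(q, q) - p
    return p


# Small demonstration on a four-edge chain.
print(sorted(smart_closure([(1, 2), (2, 3), (3, 4), (4, 5)])))
```

Because Q is joined with itself, the path lengths it covers double on every pass, so Smart needs far fewer iterations than Seminaive on graphs with long paths, at the cost of more work per iteration.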

Transferring the Code and Data to EC2

In order to run the Pig code, you will have to transfer it to the EC2 pig-client instance started by Whirr. The following commands will use SCP to transfer the files to your cluster (modify the commands to match the output of the Whirr launch-cluster command):

scp -i /home/eric/.ssh/id_rsa -o "UserKnownHostsFile /dev/null" -o StrictHostKeyChecking=no ~/pig/seminaivetc.py eric@
scp -i /home/eric/.ssh/id_rsa -o "UserKnownHostsFile /dev/null" -o StrictHostKeyChecking=no ~/pig/smarttc.py eric@

Additionally, you will need to place some sample graph data on the cluster. To transfer files to your cluster’s Hadoop file system, issue the following commands to configure your local Hadoop to connect to the cluster’s file system and copy over the data files. (If you don’t have Hadoop installed locally, you can use SCP to upload the data files and then transfer them to the cluster’s Hadoop file system once you’ve ssh’ed into your cluster.)

~/.whirr/tc-4cc4xl/hadoop-proxy.sh &
export HADOOP_CONF_DIR=~/.whirr/tc-4cc4xl
hadoop fs -put ~/data/graph.txt /tmp/graph.txt
hadoop fs -put ~/data/empty.txt /tmp/empty.txt

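The above assumes that graph.txt contains a directed edge list, one edge per line, where the nodes are integers. Note that the Pig scripts above load their input with PigStorage(','), so the separator in the file must be a comma (or the PigStorage argument must be changed to match the file). For example, here is a tab-separated edge list representing the fused-together binary trees of the Afrati et al. paper: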

1	2
1	3
2	4
2	5
3	6
3	7
4	8
4	9
5	10
5	11
6	12
6	13
7	14
7	15
8	16
8	17
9	18
9	19
10	20
10	21
11	22
11	23
12	24
12	25
13	26
13	27
14	28
14	29
15	30
15	31
16	32
17	32
18	33
19	33
20	34
21	34
22	35
23	35
24	36
25	36
26	37
27	37
28	38
29	38
30	39
31	39
32	40
33	40
34	41
35	41
36	42
37	42
38	43
39	43
40	44
41	44
42	45
43	45
44	46
45	46
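
If your edge list is tab-separated like the one here, a short Python helper can rewrite it with commas to match the PigStorage(',') loaders in the scripts. This is only a sketch: the function name `tabs_to_commas` is made up for illustration, and it assumes every non-blank line holds exactly two integers.

```python
def tabs_to_commas(text):
    """Convert a tab-separated edge list into comma-separated lines."""
    lines = []
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue  # skip blank lines
        source, target = line.split("\t")
        # int() rejects anything that is not an integer node id.
        lines.append("%d,%d" % (int(source), int(target)))
    return "".join(entry + "\n" for entry in lines)


# Small demonstration on the first two edges of the list.
print(tabs_to_commas("1\t2\n1\t3\n"))
```

You would read graph.txt, pass its contents through the function, and write the result to the file you upload with hadoop fs -put.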

And for, ahem, legacy reasons, the Smart implementation requires an empty text file (representing the initially empty P relation).

Running Seminaive

Using the specific address and username output by the Whirr launch-cluster command, you can ssh into your cluster:

ssh -i /home/eric/.ssh/id_rsa -o "UserKnownHostsFile /dev/null" -o StrictHostKeyChecking=no eric@

Once in, you can run the Seminaive algorithm on the graph in graph.txt as follows (note that /tmp/graph.txt must be a location in the cluster’s Hadoop file system, not the local file system):

pig seminaivetc.py /tmp/graph.txt time_log.txt

The last argument is simply a file name to which Python appends the time required for each iteration of the algorithm. Once Pig has finished running, the output can be found in the Hadoop file system under out/tc; the T_<n> directory from the last iteration holds the full transitive closure of the graph. You can copy it to the cluster’s local file system for viewing with the following command:

hadoop fs -copyToLocal out/tc ./
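
The timing file has one line per iteration in the form name,seconds (see the f.write call in the scripts). Here is a minimal sketch for summarizing it. The function name `summarize_time_log` is hypothetical, and it assumes the file contains only lines in that form.

```python
def summarize_time_log(text):
    """Return (per_iteration, total_seconds) parsed from 'name,seconds' lines."""
    per_iteration = []
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue  # skip blank lines
        # Split on the last comma: the iteration name comes first.
        name, seconds = line.rsplit(",", 1)
        per_iteration.append((name, float(seconds)))
    total = sum(seconds for _, seconds in per_iteration)
    return per_iteration, total


# Small demonstration with made-up timings.
iterations, total = summarize_time_log("tc1,12.5\ntc2,7.5\n")
print(iterations, total)
```

Since the scripts open the log in append mode, running them twice with the same file name accumulates both runs in one file; delete the file or pick a new name between runs if you want separate totals.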

Running Smart

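Running Smart is almost identical to running Seminaive; the only difference is the extra argument for the empty file. The output is placed under the same out/tc directory in the Hadoop file system, with the closure in the p_<n> directory from the last iteration.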

pig smarttc.py /tmp/graph.txt /tmp/empty.txt time_log.txt
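
Once the output has been copied to the local file system with the copyToLocal command shown earlier, the result may be spread over several part files. Here is a minimal sketch for reading them back into one set of pairs, for example to compare the Seminaive and Smart results. The function name `load_pairs` is hypothetical, and it assumes the files are named part-* and contain comma-separated integer pairs, as the STORE statements in the scripts produce.

```python
import glob
import os


def load_pairs(directory):
    """Read every part-* file in a local directory into a set of (x, y) pairs."""
    pairs = set()
    for path in sorted(glob.glob(os.path.join(directory, "part-*"))):
        handle = open(path)
        try:
            for line in handle:
                line = line.strip()
                if line:
                    x, y = line.split(",")
                    pairs.add((int(x), int(y)))
        finally:
            handle.close()
    return pairs
```

Loading the final T_<n> directory from a Seminaive run and the final p_<n> directory from a Smart run on the same graph should give equal sets.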
