
Distributed Transitive Closure Algorithms, Part 4 – Java Implementations

This post contains code implementing the Seminaive and Smart transitive closure algorithms. The algorithms are the same as in the previous post (Part 3), but these implementations are written in Java and run directly on Hadoop. For these iterative algorithms, running Java code instead of Pig results in greatly improved performance.

Part 3 contains information on using Whirr to get an Amazon EC2 cluster running. The same instructions apply to getting a cluster set up for running the Java implementations.

First up is the implementation of Seminaive transitive closure. I tried to follow the pseudocode from Part 2 closely.

import java.io.FileWriter;
import java.io.BufferedWriter;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.HashMap;
import org.apache.hadoop.fs.FileUtil;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.*;
import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Task;

import org.apache.hadoop.mapreduce.Partitioner;

import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapreduce.*;

import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class SeminaiveTC extends Configured implements Tool {
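    // Command-line arguments (see run() below):
    //   args[0] - number of reduce tasks, also used as the modulus for hash-partitioning node ids
    //   args[1] - local path to the tab-separated edge list, copied into HDFS as E_0 and DeltaT_0
    //   args[2] - local file to which per-iteration timings are appended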
    private static final int splitSize = 5242880; // 5MB

    public static class JoinPartitioner extends Partitioner<IntWritable, EdgeWithOrigin> {

      /** Partition directly on the precomputed hash key emitted by the mapper. */
      public int getPartition(IntWritable key, EdgeWithOrigin value, int numReduceTasks) {
        return key.get();
      }

    }

    public static class CombinedInputFormat extends CombineFileInputFormat<LongWritable, Text> {
        public CombinedInputFormat() {
            super();
            this.setMaxSplitSize(splitSize);
        }

        @Override
        public RecordReader<LongWritable, Text> createRecordReader (InputSplit split, TaskAttemptContext context) throws IOException {

            CombineFileSplit combineSplit = (CombineFileSplit) split;

            return new CombineFileRecordReader<LongWritable, Text>(combineSplit, context, myCombineFileRecordReader.class);
        }

        protected boolean isSplitable(JobContext context, Path file) {
            return true;
        }

        public static class myCombineFileRecordReader extends RecordReader<LongWritable, Text> {
            private LineRecordReader linerecord;

            private int index;

            public myCombineFileRecordReader(CombineFileSplit split,
                    TaskAttemptContext context, Integer index)
                throws IOException, InterruptedException {

                this.index = index;
            }

            public void initialize(InputSplit split, TaskAttemptContext context) throws IOException {
                CombineFileSplit combineSplit = (CombineFileSplit) split;

                linerecord = new LineRecordReader();

                FileSplit fileSplit = new FileSplit(combineSplit.getPath(index), combineSplit.getOffset(index), combineSplit.getLength(index), combineSplit.getLocations());

                linerecord.initialize(fileSplit, context);
            }

            @Override
            public void close() throws IOException {
                if (linerecord != null) {
                    linerecord.close();
                    linerecord = null;
                }
            }

            @Override
            public Text getCurrentValue() {
                return linerecord.getCurrentValue();
            }

            @Override
            public LongWritable getCurrentKey() {
                return linerecord.getCurrentKey();
            }

            @Override
            public float getProgress() throws IOException {
                return linerecord.getProgress();
            }

            @Override
            public boolean nextKeyValue() throws IOException {
                return linerecord.nextKeyValue();
            }

        }
    }

    public static class Edge implements WritableComparable<Edge> {
        public int x;
		public int y;

		public Edge(int x, int y) {
			this.x = x;
			this.y = y;
		}

		public Edge() {
			this(0, 0);
	    }

		public void write(DataOutput out) throws IOException {
			out.writeInt(x);
			out.writeInt(y);
		}

		public void readFields(DataInput in) throws IOException {
			x = in.readInt();
			y = in.readInt();
		}

		public String toString() {
			return Integer.toString(x) + "\t" +
			    Integer.toString(y);
		}

        public int compareTo(Edge other) {
                if (other.x == x) {
					Integer temp = new Integer(y);
					return temp.compareTo(other.y);
				} else {
					Integer temp = new Integer(x);
					return temp.compareTo(other.x);
				}
		}

		public boolean equals(Object o) {
			if (!(o instanceof Edge)) {
				return false;
			}
			Edge other = (Edge) o;
			return x == other.x && y == other.y;
		}

		public int hashCode() {
			return x ^ y;
		}
	}

    public static class EdgeWithOrigin implements WritableComparable<EdgeWithOrigin> {
        public int x;
		public int y;
		public boolean left;

		public EdgeWithOrigin(int x, int y, boolean left) {
			this.x = x;
			this.y = y;
			this.left = left;
		}

		public EdgeWithOrigin() {
			this(0,0,false);
		}

		public void write(DataOutput out) throws IOException {
			out.writeInt(x);
			out.writeInt(y);
			out.writeBoolean(left);
		}

		public void readFields(DataInput in) throws IOException {
			x = in.readInt();
			y = in.readInt();
			left = in.readBoolean();
		}

		public String toString() {
			return Integer.toString(x) + "\t" +
			    Integer.toString(y) + "\t" +
				String.valueOf(left);
		}

        public int compareTo(EdgeWithOrigin other) {
			if (other.left == left) {
                if (other.x == x) {
					Integer temp = new Integer(y);
					return temp.compareTo(other.y);
				} else {
					Integer temp = new Integer(x);
					return temp.compareTo(other.x);
				}
			} else {
				if (left) {
					return -1;
				} else {
					return 1;
				}
			}
		}

		public boolean equals(Object o) {
			if (!(o instanceof EdgeWithOrigin)) {
				return false;
			}
			EdgeWithOrigin other = (EdgeWithOrigin) o;
			return x == other.x && y == other.y && left == other.left;
		}

		public int hashCode() {
			return x ^ y;
		}
	}

	public static class JoinMap extends Mapper<Text, Text, IntWritable, EdgeWithOrigin>
	{
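		// Each edge (a, b) is tagged with whether it came from the left-side relation
		// (DeltaT) and is sent to the reducers for h(a) and h(b), so that edges that
		// can join meet at a common reducer.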
		@Override
		public void map(Text x, Text y, Context context)
				throws IOException, InterruptedException {

			boolean left = false;

			Configuration conf = context.getConfiguration();
            String leftSideLocation = conf.get("leftSide");
            int hashCode = conf.getInt("hashCode", 3);

			// Requires the initial data set be in its own folder
            if ( leftSideLocation.equals( ((FileSplit)context.getInputSplit()).getPath().getParent().toString() ) ) {
                left = true;
            }

            int a = Integer.parseInt(x.toString());
            int b = Integer.parseInt(y.toString());
			int hashA = a % hashCode;
			int hashB = b % hashCode;

			if (hashA == hashB) {
				context.write(new IntWritable(hashA), new EdgeWithOrigin(a, b, left));
			} else {
				context.write(new IntWritable(hashA), new EdgeWithOrigin(a, b, left));
				context.write(new IntWritable(hashB), new EdgeWithOrigin(a, b, left));
			}
		}
	}

	public static class JoinReducer extends Reducer<IntWritable, EdgeWithOrigin, Edge, BooleanWritable>
	{
	    BooleanWritable trueOut = new BooleanWritable(true);
		public void reduce(IntWritable key, Iterable<EdgeWithOrigin> values, Context context) throws IOException, InterruptedException
		{
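			// canReachFromX is keyed on x and stores all nodes x can reach via a right-side (E) edge
			// canReachY is keyed on y and stores all nodes that can reach y via a left-side (DeltaT) edge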
			HashMap<Integer, LinkedList<Integer>> canReachFromX = new HashMap<Integer, LinkedList<Integer>>();
			HashMap<Integer, LinkedList<Integer>> canReachY = new HashMap<Integer, LinkedList<Integer>>();
			int hashX, hashY;

			Configuration conf = context.getConfiguration();
            int hashCode = conf.getInt("hashCode", 3);

			Iterator<EdgeWithOrigin> valueIt = values.iterator();
			while (valueIt.hasNext()) {
				EdgeWithOrigin e = valueIt.next();

				if (!e.left) {
				    if (canReachFromX.containsKey(e.x)) {
				        canReachFromX.get(e.x).add(e.y);
				    } else {
				        LinkedList<Integer> xLinkedList = new LinkedList<Integer>();
				        xLinkedList.add(e.y);
				        canReachFromX.put(e.x, xLinkedList);
				    }
				}

				if (e.left) {
				    if (canReachY.containsKey(e.y)) {
				        canReachY.get(e.y).add(e.x);
				    } else {
				        LinkedList<Integer> yLinkedList = new LinkedList<Integer>();
				        yLinkedList.add(e.x);
				        canReachY.put(e.y, yLinkedList);
				    }
			    }

				hashX = e.x % hashCode;
				hashY = e.y % hashCode;

				// If h(x) == i && (x, y) \notin left, output (n, y) for all nodes n that can reach x
				if (!e.left && hashX == key.get()) {
				    if (canReachY.containsKey(e.x)) {
				        for (int n : canReachY.get(e.x)) {
				            context.write(new Edge(n, e.y), trueOut);
				        }
				    }
			    }

			    if (e.left && hashY == key.get()) {
				    if (canReachFromX.containsKey(e.y)) {
				        for (int n : canReachFromX.get(e.y)) {
				            context.write(new Edge(e.x, n), trueOut);
				        }
				    }
			    }

			}

		}
	}

	public static class SetDiffMap extends Mapper<LongWritable, Text, Edge, BooleanWritable>
	{
		@Override
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
		    String line = value.toString();
			String[] nodes = line.split("\t");

			boolean left = false;
            if ( nodes.length == 3 ) {
                left = true;
            }

            int a = Integer.parseInt(nodes[0]);
            int b = Integer.parseInt(nodes[1]);
            context.write(new Edge(a, b), new BooleanWritable(left));
		}
	}

    public static class SetDiffReducer extends Reducer<Edge, BooleanWritable, Edge, NullWritable>
	{
		public void reduce(Edge key, Iterable<BooleanWritable> values, Context context) throws IOException, InterruptedException
		{
			Iterator<BooleanWritable> valueIt = values.iterator();
			boolean emptyRight = true;
			while (valueIt.hasNext()) {
				boolean inLeftRelation = valueIt.next().get();

				if (!inLeftRelation) {
				    emptyRight = false;
				    break;
				}
	        }

	        if (emptyRight) {
	            context.write(key, NullWritable.get());
	        }
	    }
	}

	public int run(String[] args) throws Exception  {
	    Configuration config = new Configuration();
        FileSystem hdfs = FileSystem.get(config);
        Path srcPath = new Path(args[1]);
        Path dstPath = new Path("out/E_0/" + srcPath.getName());
        Path dstPath2 = new Path("out/DeltaT_0/" + srcPath.getName());
        hdfs.copyFromLocalFile(srcPath, dstPath);
        hdfs.copyFromLocalFile(srcPath, dstPath2);

	    String E = "out/E_0";
	    String DeltaT = "out/DeltaT_0";
	    String timeLogFile = args[2];
	    int iteration = 1;
	    Path tempPath, tempPath2;
        boolean success = false;

	    while (iteration < 100) {

	        // Job 1 - join(DeltaT, E)
	        Configuration job1conf = new Configuration();

            job1conf.set("mapred.tasktracker.map.tasks.maximum", "3");
	        job1conf.set("mapred.max.split.size", "" + splitSize);
	        job1conf.set("mapred.reduce.tasks", args[0]);
	        job1conf.set("io.file.buffer.size", "65536");
	        job1conf.set("io.sort.factor", "100");
	        job1conf.set("io.sort.mb", "500");
	        job1conf.set("mapred.child.java.opts", "-Xmx1024m");
	        job1conf.set("mapred.child.ulimit", "2097152"); // In KB = 2GB

		    tempPath = new Path(DeltaT);
	        job1conf.set("leftSide", tempPath.getFileSystem(getConf()).makeQualified(tempPath).toString());
	        job1conf.setInt("hashCode", Integer.parseInt(args[0]));

		    Job job1 = new Job(job1conf);
		    job1.setJarByClass(SeminaiveTC.class);
		    job1.setJobName("JoinDeltaTE" + iteration);

		    job1.setMapOutputKeyClass(IntWritable.class);
		    job1.setMapOutputValueClass(EdgeWithOrigin.class);
		    job1.setOutputKeyClass(Edge.class);
		    job1.setOutputValueClass(NullWritable.class);
		    job1.setInputFormatClass(KeyValueTextInputFormat.class);

		    job1.setMapperClass(JoinMap.class);
		    job1.setReducerClass(JoinReducer.class);
		    job1.setPartitionerClass(JoinPartitioner.class);

		    FileInputFormat.setInputPaths(job1, new Path(DeltaT));
		    FileInputFormat.addInputPath(job1, new Path(E));
		    FileOutputFormat.setOutputPath(job1, new Path("out/JoinDeltaTE_" + iteration));

	        // Job 2 - set-diff(JoinDeltaTE, T)

	        Configuration job2conf = new Configuration();
            job2conf.set("mapred.tasktracker.map.tasks.maximum", "3");
	        job2conf.set("mapred.max.split.size", "" + splitSize); // More important to change in CombineFileInput above
	        job2conf.set("mapred.reduce.tasks", args[0]);
	        job2conf.set("io.file.buffer.size", "65536");
	        job2conf.set("io.sort.factor", "100");
	        job2conf.set("io.sort.mb", "500");
	        job2conf.set("mapred.child.java.opts", "-Xmx1024m");
	        job2conf.set("mapred.child.ulimit", "2097152"); // In KB = 2GB

		    Job job2 = new Job(job2conf);
		    job2.setJarByClass(SeminaiveTC.class);
		    job2.setJobName("SetDiff" + iteration);

		    job2.setMapOutputKeyClass(Edge.class);
		    job2.setMapOutputValueClass(BooleanWritable.class);
		    job2.setOutputKeyClass(Edge.class);
		    job2.setOutputValueClass(NullWritable.class);

		    job2.setInputFormatClass(CombinedInputFormat.class);

		    job2.setMapperClass(SetDiffMap.class);
		    job2.setReducerClass(SetDiffReducer.class);

		    FileInputFormat.setInputPaths(job2, new Path("out/JoinDeltaTE_" + iteration));
		    FileInputFormat.addInputPath(job2, new Path(E));
		    for (int i = 1; i < iteration; i++) {
		        FileInputFormat.addInputPath(job2, new Path("out/DeltaT_" + i));
		    }
		    FileOutputFormat.setOutputPath(job2, new Path("out/DeltaT_" + iteration));

            FileWriter fileWriter = new FileWriter(timeLogFile, true);
            long time = System.currentTimeMillis();
            long startTime = time;
            long newTime;
            fileWriter.write("iteration" + iteration + "\t");

		    success = job1.waitForCompletion(true);

		    newTime = System.currentTimeMillis();
            fileWriter.write( (newTime - time) / 1000 + "\t");
            time = newTime;

            if (success) success = job2.waitForCompletion(true);

		    newTime = System.currentTimeMillis();
            fileWriter.write( (newTime - time) / 1000 + "\t" + (newTime - startTime) / 1000 + "\n");

            fileWriter.close();

            if (!success) {
                break;
            }

            Counters counters = job2.getCounters();
            long sizeQ = counters.findCounter(Task.Counter.REDUCE_OUTPUT_RECORDS).getValue();

            if (sizeQ < 1) {
                break;
            }

            DeltaT = "out/DeltaT_" + iteration;
            iteration++;
        }
		return success ? 0 : 1;
	}

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		int result = ToolRunner.run(new SeminaiveTC(), args);
		System.exit(result);
	}

}

However, this was my first experience implementing a non-trivial algorithm in Hadoop and, consequently, my first experience performance-tuning Hadoop. Some parts of the code explicitly set configuration parameters such as the split size and the number of map and reduce tasks. It would probably be preferable to handle these settings in a more elegant manner, and it is very likely that my exact parameters are not optimal; they are simply the best I found through trial and error.
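As a rough sketch of one possible cleanup (not something the code above actually does), the tuning keys that every job repeats could be set in a single helper and reused; the parameter names below are the same Hadoop 1.x keys used in the listings:

// Hypothetical helper: applies the shared Hadoop 1.x tuning keys used in the listings above.
// Each job could then start from tunedConf(numReducers) instead of repeating these settings.
public static Configuration tunedConf(int numReducers) {
    Configuration conf = new Configuration();
    conf.set("mapred.max.split.size", "" + splitSize);
    conf.setInt("mapred.reduce.tasks", numReducers);
    conf.set("io.file.buffer.size", "65536");
    conf.set("io.sort.factor", "100");
    conf.set("io.sort.mb", "500");
    conf.set("mapred.child.java.opts", "-Xmx1024m");
    conf.set("mapred.child.ulimit", "2097152"); // in KB = 2GB
    return conf;
}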

Also, I should note that the custom partitioner defined at the top of the source code dramatically improved the algorithm’s performance. It helps ensure that, even with a large number of reducers, the output of each iteration is not too fragmented (fewer big files are generally better, performance-wise, than many very small files).
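As a minimal sketch (my own variant, reusing the IntWritable key and EdgeWithOrigin value types from the listing above, not code that appears in it), the same idea can be made slightly more defensive by bounding the key with the number of reduce tasks; this behaves exactly like JoinPartitioner whenever the hash modulus equals the reducer count:

public static class BoundedJoinPartitioner extends Partitioner<IntWritable, EdgeWithOrigin> {
    @Override
    public int getPartition(IntWritable key, EdgeWithOrigin value, int numReduceTasks) {
        // Same as JoinPartitioner when key.get() is already in [0, numReduceTasks),
        // but never returns an out-of-range partition if the two ever disagree.
        return key.get() % numReduceTasks;
    }
}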

After compiling the above Java file into a .jar and uploading it to your cluster, you can run the program with the following command, where <num reducers> is the number of reducer tasks you wish to use:

hadoop jar SeminaiveTC.jar SeminaiveTC <num reducers> graph.txt time_log.txt
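The graph.txt input is read with KeyValueTextInputFormat, so each line should contain one directed edge as two integer node ids separated by a single tab. For example (the values here are only an illustration):

0	1
1	2
2	0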

Here is the code for SmartTC, which also attempts to adhere closely to the pseudocode from Part 2.

import java.io.FileWriter;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.HashMap;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.*;
import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class SmartTC extends Configured implements Tool {
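    // Command-line arguments (see run() below):
    //   args[0] - number of reduce tasks, also used as the modulus for hash-partitioning node ids
    //   args[1] - local path to the tab-separated edge list, copied into HDFS as Q_0
    //   args[2] - local file to which per-iteration timings are appended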
    private static final int splitSize = 129554432; // roughly 124MB

	public static class JoinPartitioner extends Partitioner<IntWritable, EdgeWithOrigin> {

      /** Partition directly on the precomputed hash key emitted by the mapper. */
      public int getPartition(IntWritable key, EdgeWithOrigin value, int numReduceTasks) {
        return key.get();
      }
    }

    public static class OneSourceJoinPartitioner extends Partitioner<IntWritable, Edge> {

      /** Partition directly on the precomputed hash key emitted by the mapper. */
      public int getPartition(IntWritable key, Edge value, int numReduceTasks) {
        return key.get();
      }
    }

    public static class Edge implements WritableComparable<Edge> {
        public int x;
		public int y;

		public Edge(int x, int y) {
			this.x = x;
			this.y = y;
		}

		public Edge() {
			this(0, 0);
	    }

		public void write(DataOutput out) throws IOException {
			out.writeInt(x);
			out.writeInt(y);
		}

		public void readFields(DataInput in) throws IOException {
			x = in.readInt();
			y = in.readInt();
		}

		public String toString() {
			return Integer.toString(x) + "\t" +
			    Integer.toString(y);
		}

        public int compareTo(Edge other) {
                if (other.x == x) {
					Integer temp = new Integer(y);
					return temp.compareTo(other.y);
				} else {
					Integer temp = new Integer(x);
					return temp.compareTo(other.x);
				}
		}

		public boolean equals(Object o) {
			if (!(o instanceof Edge)) {
				return false;
			}
			Edge other = (Edge) o;
			return x == other.x && y == other.y;
		}

		public int hashCode() {
			return x ^ y;
		}
	}

    public static class EdgeWithOrigin implements WritableComparable<EdgeWithOrigin> {
        public int x;
		public int y;
		public boolean left;

		public EdgeWithOrigin(int x, int y, boolean left) {
			this.x = x;
			this.y = y;
			this.left = left;
		}

		public EdgeWithOrigin() {
			this(0,0,false);
		}

		public void write(DataOutput out) throws IOException {
			out.writeInt(x);
			out.writeInt(y);
			out.writeBoolean(left);
		}

		public void readFields(DataInput in) throws IOException {
			x = in.readInt();
			y = in.readInt();
			left = in.readBoolean();
		}

		public String toString() {
			return Integer.toString(x) + "\t" +
			    Integer.toString(y) + "\t" +
				String.valueOf(left);
		}

        public int compareTo(EdgeWithOrigin other) {
			if (other.left == left) {
                if (other.x == x) {
					Integer temp = new Integer(y);
					return temp.compareTo(other.y);
				} else {
					Integer temp = new Integer(x);
					return temp.compareTo(other.x);
				}
			} else {
				if (left) {
					return -1;
				} else {
					return 1;
				}
			}
		}

		public boolean equals(Object o) {
			if (!(o instanceof EdgeWithOrigin)) {
				return false;
			}
			EdgeWithOrigin other = (EdgeWithOrigin) o;
			return x == other.x && y == other.y && left == other.left;
		}

		public int hashCode() {
			return x ^ y;
		}
	}

	public static class JoinMap extends Mapper<Text, Text, IntWritable, EdgeWithOrigin>
	{
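		// Left-side (Q) edges are routed to the reducer for h(b) and right-side (P)
		// edges to the reducer for h(a), so a Q edge (a, b) and a P edge (b, c) meet
		// at the reducer for h(b).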
		@Override
		public void map(Text x, Text y, Context context)
				throws IOException, InterruptedException {

			boolean left = false;

			Configuration conf = context.getConfiguration();
            String leftSideLocation = conf.get("leftSide");
            int hashCode = conf.getInt("hashCode", 3);

			// Requires the initial data set be in its own folder
            if ( leftSideLocation.equals( ((FileSplit)context.getInputSplit()).getPath().getParent().toString() ) ) {
                left = true;
            }

            int a = Integer.parseInt(x.toString());
            int b = Integer.parseInt(y.toString());
			int hashA = a % hashCode;
			int hashB = b % hashCode;

			if (left) {
			    context.write(new IntWritable(hashB), new EdgeWithOrigin(a, b, left));
			} else {
			    context.write(new IntWritable(hashA), new EdgeWithOrigin(a, b, left));
			}

		}
	}

	public static class JoinReducer extends Reducer<IntWritable, EdgeWithOrigin, Edge, NullWritable>
	{
		public void reduce(IntWritable key, Iterable<EdgeWithOrigin> values, Context context) throws IOException, InterruptedException
		{
		    // reachableFrom is keyed on x and stores a list containing all nodes x can reach (in right side relation of join)
		    // canReach is keyed on y and stores a list of all nodes that can reach y (in left side relation of join)
			HashMap<Integer, LinkedList<Integer>> canReach = new HashMap<Integer, LinkedList<Integer>>();
			HashMap<Integer, LinkedList<Integer>> reachableFrom = new HashMap<Integer, LinkedList<Integer>>();
			int hashX, hashY;

			Configuration conf = context.getConfiguration();
            int hashCode = conf.getInt("hashCode", 3);

			Iterator<EdgeWithOrigin> valueIt = values.iterator();
			while (valueIt.hasNext()) {
				EdgeWithOrigin e = valueIt.next();

				if (!e.left) { // implies that h(x) == key
				    if (reachableFrom.containsKey(e.x)) {
				        reachableFrom.get(e.x).add(e.y);
				    } else {
				        LinkedList<Integer> xLinkedList = new LinkedList<Integer>();
				        xLinkedList.add(e.y);
				        reachableFrom.put(e.x, xLinkedList);
				    }

				    // Search left-side relation store and see if it contains (*, x)
				    // If so, emit (*, y) for all nodes in left-side relation that can reach x
				    if (canReach.containsKey(e.x)) {
				        for (int n : canReach.get(e.x)) {
				            context.write(new Edge(n, e.y), NullWritable.get());
				        }
				    }
				} else { // implies that h(y) == key
				    if (canReach.containsKey(e.y)) {
				        canReach.get(e.y).add(e.x);
				    } else {
				        LinkedList<Integer> yLinkedList = new LinkedList<Integer>();
				        yLinkedList.add(e.x);
				        canReach.put(e.y, yLinkedList);
				    }

				    // Search right-side relation store and see if it contains (y, *)
				    // If so, emit (x, *) for all nodes in right-side relation reachable from y
				    if (reachableFrom.containsKey(e.y)) {
				        for (int n : reachableFrom.get(e.y)) {
				            context.write(new Edge(e.x, n), NullWritable.get());
				        }
				    }
			    }

			}

		}
	}

	public static class SetDiffMap extends Mapper<LongWritable, Text, Edge, BooleanWritable>
	{
		@Override
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
		    String line = value.toString();
			String[] nodes = line.split("\t");

			boolean left = false;

            if ( nodes.length == 3 ) {
                left = true;
            }

            int a = Integer.parseInt(nodes[0]);
            int b = Integer.parseInt(nodes[1]);
            context.write(new Edge(a, b), new BooleanWritable(left));
		}
	}

    public static class SetDiffReducer extends Reducer<Edge, BooleanWritable, Edge, NullWritable>
	{
		public void reduce(Edge key, Iterable<BooleanWritable> values, Context context) throws IOException, InterruptedException
		{
			Iterator<BooleanWritable> valueIt = values.iterator();
			boolean emptyRight = true;
			while (valueIt.hasNext()) {
				boolean inLeftRelation = valueIt.next().get();

				if (!inLeftRelation) {
				    emptyRight = false;
				    break;
				}
	        }

	        if (emptyRight) {
	            context.write(key, NullWritable.get());
	        }
	    }
	}

	public static class DupElimMap extends Mapper<Text, Text, Edge, NullWritable>
	{
		@Override
		public void map(Text x, Text y, Context context)
				throws IOException, InterruptedException {

            int a = Integer.parseInt(x.toString());
            int b = Integer.parseInt(y.toString());

			context.write(new Edge(a, b), NullWritable.get());
		}
	}

	public static class DupElimReducer extends Reducer<Edge, NullWritable, Edge, NullWritable>
	{
		public void reduce(Edge key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException
		{
		    context.write(key, NullWritable.get());
		}
	}

	public static class DupElimReducer2 extends Reducer<Edge, NullWritable, Edge, BooleanWritable>
	{
	    BooleanWritable trueOut = new BooleanWritable(true);
		public void reduce(Edge key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException
		{
		    context.write(key, trueOut);
		}
	}

    public static class OneSourceJoinMap extends Mapper<Text, Text, IntWritable, Edge>
	{

		@Override
		public void map(Text x, Text y, Context context)
				throws IOException, InterruptedException {

			Configuration conf = context.getConfiguration();
            int hashCode = conf.getInt("hashCode", 3);

            int a = Integer.parseInt(x.toString());
            int b = Integer.parseInt(y.toString());
			int hashA = a % hashCode;
			int hashB = b % hashCode;

			if (hashA == hashB) {
				context.write(new IntWritable(hashA), new Edge(a, b));
			} else {
				context.write(new IntWritable(hashA), new Edge(a, b));
				context.write(new IntWritable(hashB), new Edge(a, b));
			}
		}
	}

	public static class OneSourceJoinReducer extends Reducer<IntWritable, Edge, Edge, BooleanWritable>
	{
	    BooleanWritable trueOut = new BooleanWritable(true);
		public void reduce(IntWritable key, Iterable<Edge> values, Context context) throws IOException, InterruptedException
		{
			HashMap<Integer, LinkedList<Integer>> canReachFromX = new HashMap<Integer, LinkedList<Integer>>();
			HashMap<Integer, LinkedList<Integer>> canReachY = new HashMap<Integer, LinkedList<Integer>>();
			int hashX, hashY;

			Configuration conf = context.getConfiguration();
            int hashCode = conf.getInt("hashCode", 3);

			Iterator<Edge> valueIt = values.iterator();
			while (valueIt.hasNext()) {
				Edge e = valueIt.next();

				// Corrects derivation count for cliques (where joining (0, 0) and (0, 0) produced *two* (0,0) results)
				// But might break proper derivation count when joining (0,0) and (0, *).
				if (e.x == e.y) {
				    context.write(e, trueOut);
				    continue;
				}

				if (canReachFromX.containsKey(e.x)) {
				    canReachFromX.get(e.x).add(e.y);
				} else {
				    LinkedList<Integer> xLinkedList = new LinkedList<Integer>();
				    xLinkedList.add(e.y);
				    canReachFromX.put(e.x, xLinkedList);
				}

				if (canReachY.containsKey(e.y)) {
				    canReachY.get(e.y).add(e.x);
				} else {
				    LinkedList<Integer> yLinkedList = new LinkedList<Integer>();
				    yLinkedList.add(e.x);
				    canReachY.put(e.y, yLinkedList);
				}

				hashX = e.x % hashCode;
				hashY = e.y % hashCode;

				// If h(x) == i, output (n, y) for all nodes n that can reach x
				if (hashX == key.get()) {
				    if (canReachY.containsKey(e.x)) {
				        for (int n : canReachY.get(e.x)) {
				            context.write(new Edge(n, e.y), trueOut);
				        }
				    }
			    }

			    if (hashY == key.get()) {
				    if (canReachFromX.containsKey(e.y)) {
				        for (int n : canReachFromX.get(e.y)) {
				            context.write(new Edge(e.x, n), trueOut);
				        }
				    }
			    }

			}

		}
	}

	public int run(String[] args) throws Exception  {
		Configuration config = new Configuration();
        FileSystem hdfs = FileSystem.get(config);
        Path QPath = new Path(args[1]);
        Path dstPath = new Path("out/Q_0/" + QPath.getName());
        hdfs.copyFromLocalFile(QPath, dstPath);
        Path dstPath2 = new Path("out/P_0/");
        hdfs.mkdirs(dstPath2);
	    String Q = "out/Q_0/";
	    String P = "out/P_0/";
	    int iteration = 1;
	    Path tempPath, tempPath2;
	    String timeLogFile = args[2];
        boolean success = false;

	    while (iteration < 100) {
	        // Job 1 - join(Q, P)

	        Configuration job1conf = new Configuration();
	        job1conf.set("mapred.max.split.size", "" + splitSize);
	        job1conf.set("mapred.reduce.tasks", args[0]);
	        job1conf.set("io.file.buffer.size", "65536");
	        job1conf.set("io.sort.factor", "100");
	        job1conf.set("io.sort.mb", "500");
	        job1conf.set("mapred.child.java.opts", "-Xmx1024m");
	        job1conf.set("mapred.child.ulimit", "2097152"); // In KB = 2GB
	        job1conf.setInt("hashCode", Integer.parseInt(args[0]));

		    tempPath = new Path(Q);
	        job1conf.set("leftSide", tempPath.getFileSystem(getConf()).makeQualified(tempPath).toString());

	        Job job1 = new Job(job1conf);

		    job1.setJarByClass(SmartTC.class);
		    job1.setJobName("JoinQP" + iteration);

		    job1.setMapOutputKeyClass(IntWritable.class);
		    job1.setMapOutputValueClass(EdgeWithOrigin.class);
		    job1.setOutputKeyClass(Edge.class);
		    job1.setOutputValueClass(NullWritable.class);
		    job1.setInputFormatClass(KeyValueTextInputFormat.class);

		    job1.setMapperClass(JoinMap.class);
		    job1.setReducerClass(JoinReducer.class);
		    job1.setPartitionerClass(JoinPartitioner.class);

		    FileInputFormat.setInputPaths(job1, new Path(Q));
		    FileInputFormat.addInputPath(job1, new Path(P));
		    FileOutputFormat.setOutputPath(job1, new Path("out/DeltaP_" + iteration));

		    // Job 2 - dup-elim(union(Q, P, DeltaP))
			Configuration job2conf = new Configuration();
	        job2conf.set("mapred.max.split.size", "" + splitSize);
	        job2conf.set("mapred.reduce.tasks", args[0]);
	        job2conf.set("io.file.buffer.size", "65536");
	        job2conf.set("io.sort.factor", "100");
	        job2conf.set("io.sort.mb", "500");
	        job2conf.set("mapred.child.java.opts", "-Xmx1024m");
	        job2conf.set("mapred.child.ulimit", "2097152"); // In KB = 2GB

	        Job job2 = new Job(job2conf);
		    job2.setJarByClass(SmartTC.class);
		    job2.setJobName("DupElimQPDeltaP" + iteration);

		    job2.setMapOutputKeyClass(Edge.class);
		    job2.setMapOutputValueClass(NullWritable.class);
		    job2.setOutputKeyClass(Edge.class);
		    job2.setOutputValueClass(NullWritable.class);
		    job2.setInputFormatClass(KeyValueTextInputFormat.class);

		    job2.setMapperClass(DupElimMap.class);
		    job2.setReducerClass(DupElimReducer.class);

		    FileInputFormat.setInputPaths(job2, new Path(Q));
		    FileInputFormat.addInputPath(job2, new Path(P));
		    FileInputFormat.addInputPath(job2, new Path("out/DeltaP_" + iteration));
		    FileOutputFormat.setOutputPath(job2, new Path("out/P_" + iteration));

		    // Job 3 - join(Q,Q)

		    Configuration job3conf = new Configuration();
	        job3conf.set("mapred.max.split.size", "" + splitSize);
	        job3conf.set("mapred.reduce.tasks", args[0]);
	        job3conf.set("io.file.buffer.size", "65536");
	        job3conf.set("io.sort.factor", "100");
	        job3conf.set("io.sort.mb", "500");
	        job3conf.set("mapred.child.java.opts", "-Xmx1024m");
	        job3conf.set("mapred.child.ulimit", "2097152"); // In KB = 2GB
	        job3conf.setInt("hashCode", Integer.parseInt(args[0]));

	        Job job3 = new Job(job3conf);

		    job3.setJarByClass(SmartTC.class);
		    job3.setJobName("JoinQQ" + iteration);

		    job3.setMapOutputKeyClass(IntWritable.class);
		    job3.setMapOutputValueClass(Edge.class);
		    job3.setOutputKeyClass(Edge.class);
		    job3.setOutputValueClass(NullWritable.class);
		    job3.setInputFormatClass(KeyValueTextInputFormat.class);

		    job3.setMapperClass(OneSourceJoinMap.class);
		    job3.setReducerClass(OneSourceJoinReducer.class);
		    job3.setPartitionerClass(OneSourceJoinPartitioner.class);

		    FileInputFormat.setInputPaths(job3, new Path(Q));
		    FileOutputFormat.setOutputPath(job3, new Path("out/JoinQQ_" + iteration));

	        // Job 5 - set-diff(QQ, P)
		    Configuration job5conf = new Configuration();
	        job5conf.set("mapred.max.split.size", "" + splitSize);
	        job5conf.set("mapred.reduce.tasks", args[0]);
	        job5conf.set("io.file.buffer.size", "65536");
	        job5conf.set("io.sort.factor", "100");
	        job5conf.set("io.sort.mb", "500");
	        job5conf.set("mapred.child.java.opts", "-Xmx1024m");
	        job5conf.set("mapred.child.ulimit", "2097152"); // In KB = 2GB

	        Job job5 = new Job(job5conf);

		    job5.setJarByClass(SmartTC.class);
		    job5.setJobName("SetDiffQP" + iteration);

		    job5.setMapOutputKeyClass(Edge.class);
		    job5.setMapOutputValueClass(BooleanWritable.class);
		    job5.setOutputKeyClass(Edge.class);
		    job5.setOutputValueClass(NullWritable.class);

		    job5.setMapperClass(SetDiffMap.class);
		    job5.setReducerClass(SetDiffReducer.class);

		    FileInputFormat.setInputPaths(job5, new Path("out/JoinQQ_" + iteration));
		    FileInputFormat.addInputPath(job5, new Path("out/P_" + iteration));
		    FileOutputFormat.setOutputPath(job5, new Path("out/Q_" + iteration));

		    FileWriter fileWriter = new FileWriter(timeLogFile, true);
            long time = System.currentTimeMillis();
            long startTime = time;
            long newTime;
            fileWriter.write("iteration" + iteration + "\t");

		    success = job1.waitForCompletion(true);
            newTime = System.currentTimeMillis();
            fileWriter.write( (newTime - time) / 1000 + "\t");
            time = newTime;
            if (success) success = job2.waitForCompletion(true);
		    newTime = System.currentTimeMillis();
            fileWriter.write( (newTime - time) / 1000 + "\t");
            time = newTime;
            if (success) success = job3.waitForCompletion(true);
		    newTime = System.currentTimeMillis();
            fileWriter.write( (newTime - time) / 1000 + "\t");
            time = newTime;

            if (success) success = job5.waitForCompletion(true);

		    newTime = System.currentTimeMillis();
            fileWriter.write( (newTime - time) / 1000 + "\t" + (newTime - startTime) / 1000 + "\n");

            fileWriter.close();

            if (!success) {
                break;
            }

            Counters counters = job5.getCounters();
            long sizeQ = counters.findCounter(Task.Counter.REDUCE_OUTPUT_RECORDS).getValue();

            if (sizeQ < 1) {
                break;
            }

            Q = "out/Q_" + iteration;
            P = "out/P_" + iteration;
            iteration++;

        }
		return success ? 0 : 1;
	}

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		int result = ToolRunner.run(new SmartTC(), args);
		System.exit(result);
	}
}

After compiling and uploading the .jar to your cluster, the SmartTC program can be run with the following command (again with <num reducers> set to the number of reducer tasks):

hadoop jar SmartTC.jar SmartTC <num reducers> graph.txt time_log.txt