The Hardness of Probabilistic Queries

Since I started at UW last quarter, I have been spending a lot of time working on hardness results for queries over probabilistic databases. A (tuple-independent) probabilistic database is a normal relational database except that every tuple is annotated with a number between 0 and 1. This number gives the probability that the tuple is true in a randomly chosen world. Each tuple is assumed to be independent of all other tuples. The probability that a Boolean query (i.e., a query which returns true or false) holds on a probabilistic database is given by the sum of the probabilities of all possible worlds in which that query holds.

The normal measure of complexity used in database theory is data complexity – for a given query, how does the running time of evaluating the query change with the size of the database? The normal operators of the relational calculus have polynomial data complexity: when considering only data complexity, any query, no matter how complex, runs in time polynomial in the size of the database.

The status for queries over probabilistic databases is not so nice. The results returned by a probabilistic query must also include a probability. In the case of a simple Boolean query such as:

{Q = \exists x.R(x)}

We want the output to be a number between 0 and 1 indicating the probability that Q is true. We use the fact that P(Q) = 1 - P(\lnot Q), and \lnot Q = \forall x.\lnot R(x). This shows that P(\lnot Q) is the probability that all the tuples in R are false. We can compute this easily, using the tuple-independence condition:

{P(\lnot R(x_1) \land \lnot R(x_2) \land \cdots \land \lnot R(x_n))}

{= P(\lnot R(x_1)) \cdots P(\lnot R(x_n))}

So Q can be answered in polynomial time.
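
To make this concrete, here is a minimal Python sketch of the complement trick just described; the list of tuple probabilities is made up for illustration:

# With tuple-independence, P(exists x. R(x)) = 1 - prod_i (1 - p_i),
# where p_i = P(R(x_i)).
def prob_exists(tuple_probs):
    """tuple_probs: list of P(R(x_i)) values for the tuples of R."""
    p_not_q = 1.0
    for p in tuple_probs:
        p_not_q *= (1.0 - p)   # P(not R(x_i)); multiplication is valid by independence
    return 1.0 - p_not_q

# Example: three tuples in R
print(prob_exists([0.5, 0.2, 0.9]))  # 1 - 0.5*0.8*0.1 = 0.96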

Consider this query:

{h_0 = \exists x \exists y.R(x) \land S(x,y) \land T(y)}

We still have tuple-level independence, but now that we have added joins, we introduced correlations between possible elements of the answer set. There is no obvious way to efficiently calculate the probability that h_0 is true. In fact, a large amount of work has gone into showing that no such algorithm is ever likely to be found, for h_0 and a broad class of other positive, existentially quantified first-order queries. The culmination of this, for tuple-independent probabilistic databases and positive queries, can be found in the paper The dichotomy of probabilistic inference for unions of conjunctive queries (2012). It establishes that, for any query without universal quantifiers or negation, evaluating that query over a probabilistic database is either in PTIME or #P-hard in the size of the database.

A Brief Introduction to #P

#P is the complexity class containing counting problems associated with problems in NP. An example of a #P problem is counting the number of satisfying assignments of the variables in an instance of 3SAT. A #P-hard problem is a problem such that a polynomial time solution would allow you to compute, in polynomial time, the solution for any problem in #P.  #P-hard problems are at least as hard as NP-hard problems. A #P-complete problem is a problem that is #P-hard and is itself in #P. The canonical #P-complete problem is #SAT, counting the number of satisfying solutions to an arbitrary Boolean formula.

One of the interesting things about the #P class is that there are #P-complete problems where the associated decision problem is in P. Or in other words, determining whether a solution exists can be done in polynomial time, but counting the number of solutions is #P-hard.

In a 1983 paper, The complexity of counting cuts and of computing the probability that a graph is connected, Provan and Ball prove that #PP2CNF is #P-complete. The problem is defined as follows: let X_1, \dots, X_n and Y_1, \dots, Y_n be Boolean variables, and let E be the edge set of a bipartite graph connecting X variables to Y variables. The positive, partitioned 2-CNF (PP2CNF) formula is:

{\Phi = \land_{(i,j)\in E} (X_i \lor Y_j)}

The #PP2CNF problem asks for the number of satisfying assignments of \Phi. An immediate consequence is that the dual problem, #PP2DNF, which counts the satisfying assignments of the following formula, is also #P-hard:

{\Phi = \lor_{(i,j)\in E} (X_i \land Y_j)}

Proving h_0 is #P-hard

To show that h_0 is #P-hard, we reduce from #PP2DNF. If any #PP2DNF instance can be mapped, in polynomial time, to a probabilistic database from which P(h_0) determines the number of satisfying assignments, then evaluating h_0 is at least as hard as #PP2DNF. What follows is directly from the paper by Dalvi and Suciu (The dichotomy of probabilistic inference for unions of conjunctive queries).

Recall h_0’s definition:

{h_0 = \exists x \exists y.R(x) \land S(x,y) \land T(y)}

Given a PP2DNF instance \Phi with E \subseteq [n_1] \times [n_2], define a database with three tables: R(x), S(x,y), and T(y).

Set R = [n_1], S = E, T = [n_2]. Define the probabilities as follows:

{P(R(i)) = \frac{1}{2}}

{P(S(i, j)) = 1}

{P(T(j)) = \frac{1}{2}}

Then every possible world has probability 1 / 2^{n_1 + n_2}. In each possible world, h_0 is true if and only if there exists an (i, j) pair in E such that R(i) and T(j) are true, which is exactly the condition for the corresponding assignment of the X and Y variables to satisfy the PP2DNF instance \Phi. We have:

{P(h_0) = 1 / 2^{n_1 + n_2} \, \# \Phi }

If we can find P(h_0) in polynomial time, we can also find \# \Phi. This completes the hardness proof for h_0.
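
As a sanity check, here is a small Python sketch of the reduction on a made-up edge set, verifying by brute force that P(h_0) equals \# \Phi divided by 2^{n_1 + n_2}:

from itertools import product

# Build the probabilistic instance from a PP2DNF formula given by edges
# E subset of [n1] x [n2], then brute-force both sides. The edge set below
# is made up purely for illustration.
n1, n2 = 2, 2
E = [(1, 1), (2, 2)]          # Phi = (X1 and Y1) or (X2 and Y2)

# Count satisfying assignments of Phi directly.
num_sat = sum(
    any(xs[i - 1] and ys[j - 1] for (i, j) in E)
    for xs in product([False, True], repeat=n1)
    for ys in product([False, True], repeat=n2)
)

# Compute P(h_0) by enumerating possible worlds: each R(i) and T(j) is
# independently present with probability 1/2, and every S edge is certain.
p_h0 = 0.0
for r in product([False, True], repeat=n1):       # which R tuples are present
    for t in product([False, True], repeat=n2):   # which T tuples are present
        if any(r[i - 1] and t[j - 1] for (i, j) in E):
            p_h0 += 1.0 / 2 ** (n1 + n2)          # every world has this probability

assert abs(p_h0 - num_sat / 2 ** (n1 + n2)) < 1e-12
print(num_sat, p_h0)   # 7 and 7/16 for this example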

Wrapping Up

Although this is a fairly straightforward proof that h_0 is #P-hard, the full dichotomy result, which classifies every union of positive existential queries as either PTIME or #P-hard, requires an extensive amount of machinery. While the discussion in this post barely scratches the surface, it hopefully has illustrated that evaluating queries over probabilistic databases is a challenging problem that relates directly to an interesting area of complexity theory.

Entity Classification and Relation Extraction: Joint Probabilistic Models

The goal of relation extraction is to automatically build structured, queryable representations of unstructured text. The sheer volume of available data prohibits manual extraction, yet the ambiguity of natural language makes automated extraction difficult. I construct a joint probabilistic model of the components necessary for relation extraction – entity classification and relation extraction – that allows for a rich representation of the relation extraction process. This joint model helps alleviate the problem of error propagation common in pipelined models, where the output of one component is fed as a (potentially incorrect) input to the next component.

1 Introduction

Even just considering the scientific domain, the speed at which scientists output new knowledge in the form of unstructured text far outpaces the ability of any one individual to read and process it. This fundamental inability of individuals to keep up with the amount of textual information available has led to attempts to develop tools that automate part of the process. One such tool, relation extraction, attempts to automatically identify relationships expressed in text.

In this post I show that joint modeling of entity classification and relation extraction is able to improve performance in both tasks over individual approaches. Joint inference allows confident predictions from one classifier to correct errors in the output of another classifier. Beyond applying the joint model to entity classification and relation extraction on the corpus used in Roth and Yih [2007], I also applied the same techniques to the recently released Google relation extraction corpus. While the granularity of labels in the Google data is more suited to a distant supervision setting, I was able to extract a subset amenable to the approach of this post. I also compare exact and approximate inference in joint models and find that, for the models I construct, there is little difference in the final predictions whether approximate or exact inference is used.

2 Related Work

GuoDong et al. [2005] and Zhao and Grishman [2005] are representative of early, supervised approaches to the relation extraction problem. GuoDong et al. [2005] use lexical, syntactic, and semantic features to attempt to predict relationships. Zhao and Grishman [2005] use string kernel methods to perform the same supervised classification task. Jiang and Zhai [2007] present a comprehensive overview of feature selection for relation extraction.

Recently, distant supervision (Mintz et al. [2009]) techniques have become the focus of much relation extraction research. Distant supervision begins with a database of known facts then mines large bodies of text to find sentences that contain two entities participating in a relation. These sentences are considered positive examples for training. This method incorporates large amounts of potentially incorrect examples, but compensates for this noise by building training sets far larger than is possible through human annotation.

Both direct and distant supervision methods typically rely on a “black box” named entity recognizer to examine each sentence and extract the entity mentions. This pipelined approach can introduce errors into relation extraction when the named entity recognizer fails to extract the correct entity mention. GuoDong et al. [2005] acknowledge this problem in the pipelined approach and attempt to compensate for it by including a wide variety of “shallow” and “deep” linguistic features, with the expectation that errors in one set of features can be compensated for by the other features.

Roth and Yih [2002] present an early approach to modeling relation extraction as the joint inference problem of recognizing entities and relations simultaneously. Roth and Yih [2007] refine this work and present additional results and new evaluation metrics.

These, and other works, introduce a joint model of the components of relation extraction to overcome propagation of errors from the pipelined approach. In many cases, however, joint models have been reported to perform no better, or even worse, than the “simpler” pipelined approach. See Riedel and McCallum [2011] for a discussion of this issue.

My work further explores the questions raised in Roth and Yih [2007] and Kate and Mooney [2010], where joint inference is applied to the supervised entity classification and relation extraction setting.

3 Problem Statement

Given an input sentence X with distinguished entity mentions E1, E2, …, En, the goal is to predict the following:

  • The types of each entity Ei
  • The type of each possible relation between ordered pairs of entities (Ei,Ej),i≠j.

With this definition, I am restricting my attention to relationships expressed entirely within a single sentence. Relationships that span multiple sentences or documents are a more challenging problem beyond the scope of my current model.

Each corpus, in addition to sentences requiring the above classification of entities and relations, also has an associated set of constraints imposed between relation and entity types to ensure coherent predictions.

For example, the data set from Roth and Yih [2007] has the constraint (Live In, Person, Location), expressing that only a person and location can participate in the lives in relation.
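
Here is a rough sketch of how such constraint triples might be represented and checked; the relation and type names are my own stand-ins rather than the corpus’s exact labels:

# Each constrained relation maps to the required (first entity type, second entity type).
CONSTRAINTS = {
    "live_in":  ("Person", "Location"),
    "kill":     ("Person", "Person"),
    "work_for": ("Person", "Organization"),
}

def is_consistent(relation, type_i, type_j):
    """True if predicting `relation` between entities of the given types
    does not violate a constraint. Unconstrained relations always pass."""
    if relation not in CONSTRAINTS:
        return True
    expected_i, expected_j = CONSTRAINTS[relation]
    return type_i == expected_i and type_j == expected_j

print(is_consistent("live_in", "Person", "Location"))  # True
print(is_consistent("live_in", "Person", "Person"))    # False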

4 Tested Approaches

4.1 Relation Classification Only

My initial approach was to use a popular natural language processing toolkit from Stanford to process each input sentence and perform the entity classification as a pre-processing step. I then trained a variety of classifiers from the scikit-learn library to perform the relation classification between entities, treating the entity type predictions of the Stanford NLP toolkit as a ground truth.

This is a common approach in relation extraction, and representative of approaches following the so-called “pipeline” model, where distinct components are executed in sequence on the input data and the final prediction is made by the last classifier, using all previous models’ output as input features. This approach can be very successful, but it had obvious limitations for my intended work:

  • This approach treats entity classification as a "black box", with no probability estimates output for each class
  • Furthermore, it abstracts away a large part of the difficulty of performing relation extraction by assuming entity classification is a solved problem
  • It relies on input sentences coming from a domain on which the Stanford named entity classifier is already trained, which does not hold in a general setting

For these reasons, I moved beyond using the Stanford named entity classification toolkit and developed my own models to handle the entity classification task.

4.2 Separate Entity and Relation Classification

My baseline implementation used two models to independently classify entities and relations.

The entity classifier accepts a set of features from the words in and around an entity mention, along with features such as the capitalization pattern of words. The word-based features include unigrams, bigrams, and trigrams of the words themselves, the parts-of-speech tags, and word/part-of-speech pairs. These are standard features from the literature (Roth and Yih [2007], Kate and Mooney [2010]).

The relation classifier uses a set of similar features from the two entity mentions and similar features drawn from the words between them.
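
A simplified sketch of this kind of feature extraction is below; it covers only word n-grams around the mention and a capitalization pattern (omitting the part-of-speech features), and the window size and feature names are my own choices:

# Simplified entity-mention features: context unigrams, mention bigrams,
# and a crude capitalization pattern.
def entity_features(tokens, start, end, window=2):
    """tokens: sentence as a list of words; [start, end) is the entity span."""
    mention = tokens[start:end]
    context = tokens[max(0, start - window):start] + tokens[end:end + window]
    feats = {}
    for w in mention + context:
        feats["uni=" + w.lower()] = 1
    for a, b in zip(mention, mention[1:]):
        feats["bi=" + a.lower() + "_" + b.lower()] = 1
    # X for capitalized words in the mention, x otherwise.
    feats["cap=" + "".join("X" if w[0].isupper() else "x" for w in mention)] = 1
    return feats

tokens = "Sirhan Sirhan was convicted of killing Senator Kennedy".split()
print(entity_features(tokens, 0, 2))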

I used scikit-learn’s implementations to build the entity and relation classifiers, with my own code written to process the data, extract features, perform k-fold cross-validation and report statistics. I tested several classifiers:

  • Naive Bayes – while a simple naive Bayes model performed reasonably well in terms of precision and recall, the probability estimates derived from a multiclass naive Bayes classifier are essentially worthless. Almost all of the probability mass returned for a given input is centered on one class. In other words, the classifier is usually far too confident in its predictions. This precludes using its output for a joint inference model, and I did not consider this model further.
  • Multiclass Logistic Regression – This performed well, with a short training and testing time on my datasets, and returns a reasonable approximation of the probability distribution over the output classes. This is the model I primarily used and for which I report results below (a minimal usage sketch follows this list).
  • Linear SVM – This also performed well in terms of recall and precision, and returned probability estimates that worked well for the subsequent joint inference model. However, scikit-learn implements Platt scaling to return probability estimates for SVMs. This involves fitting a logistic regression model to the SVM's output scores, which is a high-overhead operation and significantly increased train and test times. The study by Niculescu-Mizil and Caruana [2005] indicates that, with proper tuning, the probabilities output by an SVM using Platt scaling will be superior to those of logistic regression, but I viewed quantifying this over my datasets as beyond the scope of my project and relied on the (possibly slightly inferior) probability estimates of logistic regression for reasons of speed.
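
The following is a minimal sketch of the logistic-regression setup on toy feature dicts (not the real corpus features); the important part is predict_proba, whose per-class probabilities feed the joint model:

from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LogisticRegression

# Toy training data: feature dicts like those produced by the feature
# extractor sketched earlier, with made-up labels.
train_feats = [{"uni=senator": 1, "cap=X": 1},
               {"uni=minnesota": 1, "cap=X": 1},
               {"uni=was": 1, "cap=x": 1}]
train_labels = ["Person", "Location", "Other"]

vec = DictVectorizer()
X = vec.fit_transform(train_feats)
clf = LogisticRegression(max_iter=1000).fit(X, train_labels)

test = vec.transform([{"uni=senator": 1, "cap=X": 1}])
print(dict(zip(clf.classes_, clf.predict_proba(test)[0])))  # class -> probability
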
4.2.1 Alternate Feature Set

I also trained the separate models using an augmented feature set, known in the literature as an Omniscient classifier. This feature set uses the true labels of the entities as input features for the relation classifier, and the true labels of the relations as input features for the entity classifier. This is an entirely unrealistic model, as it presupposes the true labels are always known, but it serves as an upper bound on the performance possible by jointly considering entity and relation classification. Omniscient features significantly increase precision, recall, and F1 scores, as reported in the results. Interestingly, even with the true labels available at train and test time, joint inference is still able to improve the performance of the Omniscient classifiers.

4.3 Bayesian Network for Joint Inference – Approach #1

Figure 1: The Bayesian network built according to approach #1, corresponding to a sentence with two entities.

Roth and Yih [2002] present, to the best of my knowledge, one of the earliest attempts at performing relation extraction by building a joint model over entity and relation classifiers. Following their approach, I built a Bayesian network model that incorporated the independent classifiers trained as I described in the previous section. Training the classifiers proceeds as before, but at test time, the predictions of the classifiers are used to construct a Bayesian network. The most probable explanation (MPE) of the constructed network determines the final predictions for entities and relations within an entire sentence.

My initial approach to modeling the extraction process as a Bayesian network constructed a network for each sentence, with a node for each entity and potential relation, as in figure 1. This is consistent with the method described in Roth and Yih [2002].

The network probabilities for entity nodes come directly from the probability estimates output by the trained entity classifier. The conditional probability tables (CPTs) for the relation nodes are likewise determined by the probabilities from the relation classifier, except for the application of constraints.

The intent of the joint model is to enforce constraints, as described in section 3, and thereby help correct erroneous predictions from one classifier by considering the predictions of another classifier. Therefore, the conditional probability of a relation node taking on a value that is not consistent with its constraints is set to zero, regardless of the probabilities output by the relation classifier.

As an example of this, consider the entity pair, “Sirhan Sirhan” and “Senator Kennedy”. If the entity classifier predicts with 90% probability that “Sirhan Sirhan” is a person and with 51% probability “Senator Kennedy” is a location, yet the relation classifier reports 95% probability that the “kill” relation holds between them, the goal of the Bayesian network model is to infer that “Senator Kennedy” is a person and, in a sense, override the entity classifier’s wrong but not very confident prediction that “Senator Kennedy” is a location.

Unfortunately, this network configuration heavily promotes an MPE that, if the base classifier predictions violate the constraints, simply predicts the same entity types as the base classifiers and defaults the relation to “other”. This is because relation predictions that are not consistent with the constraints have probability 0, which means that the “other” relation must have probability 1. This dominates even confident predictions of relations that satisfy the constraints.

This results in a joint model that only in rare cases outputs a final result better than just using the base classifiers independently, and overall performance (as measured by precision, recall, and F1 scores) was almost unchanged from the separate classifier approach. This was consistent with the results reported in Roth and Yih [2002], where the joint model only made a small difference. As a consequence, I developed a slightly more complicated Bayesian network for modeling sentence-level predictions that was better able to enforce the constraints and increase recall, precision, and F1 scores.

4.4 Bayesian Network for Joint Inference – Approach #2

Figure 2: The Bayesian network built according to approach #2, corresponding to a sentence with three entities. Nodes are generated for each entity and possible relation. I nodes are indicators used to enforce constraints between entity and relation types.

Instead of directly connecting entity to relation nodes, I defined an additional indicator variable for each possible relation, with incoming edges from the corresponding entities and relation nodes. The indicator CPTs are entirely deterministic. For all assignments of its entity and relation variables that do not violate a constraint, its value is true with probability 1. For all other assignments, its value is false with probability 1. An example network following this construction is shown in figure 2.

At inference time, the indicator variables are “observed” as evidence; in other words, instead of finding the most probable explanation’s assignment of all variables, we find the MPE of the entity and relation variables given that all indicator variables are observed to be true. In contrast to the previous approach, this network is able to correct many mistakes made by the underlying classifiers: the MPE explanation simply chooses the assignment of entity and relation variables that maximizes their joint likelihood subject to the constraints imposed by the network, which is exactly the desired effect. As seen in the following results section, using this construction of the Bayesian network has a significant positive impact on overall performance.
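
To illustrate the effect, here is a brute-force sketch of the constrained MPE computation for a two-entity sentence; the labels and classifier probabilities mirror the Sirhan Sirhan / Senator Kennedy example above but are otherwise made up:

from itertools import product

# Pick the entity/relation assignment with the highest product of classifier
# probabilities, restricted to assignments on which the constraint indicator
# would be observed true.
ENTITY_TYPES = ["Person", "Location"]
RELATIONS = ["kill", "live_in", "other"]
CONSTRAINTS = {"kill": ("Person", "Person"), "live_in": ("Person", "Location")}

p_e1 = {"Person": 0.90, "Location": 0.10}   # entity classifier for "Sirhan Sirhan"
p_e2 = {"Person": 0.49, "Location": 0.51}   # entity classifier for "Senator Kennedy"
p_rel = {"kill": 0.95, "live_in": 0.01, "other": 0.04}

best, best_score = None, -1.0
for t1, t2, r in product(ENTITY_TYPES, ENTITY_TYPES, RELATIONS):
    if r in CONSTRAINTS and CONSTRAINTS[r] != (t1, t2):
        continue  # the indicator variable would be false; assignment excluded
    score = p_e1[t1] * p_e2[t2] * p_rel[r]
    if score > best_score:
        best, best_score = (t1, t2, r), score

print(best)  # ('Person', 'Person', 'kill'): the weak 51% "Location" guess is overridden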

4.5 Circuit Compilation

Figure 3: A comparison of the compilation and inference times per sentence, based on the number of entities in each sentence. Inference must be performed for each sentence in the test set, but compilation only needs to be performed once for each corpus as a pre-processing step.

A significant drawback of the joint model is the increased time spent in generating predictions. My initial program, written in Python, trains and tests the entity and relation classifiers, then generates a text file encoding the network and probabilities in the SamIAm Bayesian network file format (Darwiche). Next, a call is made to a Java program (the core of which is supplied with the SamIAm library, which I modified to interact with Python and return the MPE instantiation assignments). This performs approximate inference on the network. For these simple networks, the approximate answers are very close to exact; however, even approximate inference takes several seconds, as shown in figure 3.

As dynamic inference requires approximately 3 seconds to complete per sentence, in a corpus with only 1000 sentences, k-fold cross-validation requires 50 minutes just for the dynamic inference. To speed this up, I made use of the ACE package (Darwiche and Chavira), which compiles Bayesian networks into an arithmetic circuit.

For each corpus and its associated constraints, I used the ACE package to compile a Bayesian network for sentences with between two and eleven entities. Each compiled circuit represents a network encoding the appropriate constraints for all n(n - 1) possible relations between the n entities. The ACE package is distributed with a Java program that reads in a compiled network and returns the marginal probabilities; I altered this inference engine to accept as arguments the updated CPTs of the circuit gates corresponding to entity and relation probabilities, and to perform two passes over the circuit to calculate the MPE.

The benefits of compiling a Bayesian network into a circuit come from the following:

  • Circuits enable calculating the (exact) MPE of a Bayesian network in time linear in the size of the circuit (Darwiche [2009])
  • In contrast, exact calculation of the MPE in the network is NP-hard
  • This enables a significant speed increase over dynamic inference (see figure 3)
  • Compiling a circuit only has to be done once. The CPTs encoded by the Bayesian network are translated into the circuit, and can be updated efficiently to incorporate the probability estimates of the base classifiers

Note that the circuits themselves may be of size exponential in the input network, but in many cases, such as the networks I generated, much more compact circuits can be found.

Using compiled circuits, overall inference time for a corpus of 1000 sentences drops from ≈ 50 minutes to ≈ 6 minutes.
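
As a rough illustration of why the compiled approach is fast, the sketch below evaluates a tiny hand-built arithmetic circuit bottom-up and re-evaluates it after overwriting leaf parameters; it is not a circuit produced by ACE, just the general compile-once, update-and-evaluate pattern:

# An arithmetic circuit as a DAG of sum, product, and leaf nodes. Per-sentence
# inference amounts to overwriting leaf weights and doing one bottom-up pass.
class Node:
    def __init__(self, op, children=None, weight=None):
        self.op = op                # "add", "mul", or "leaf"
        self.children = children or []
        self.weight = weight        # only meaningful for leaves

    def evaluate(self):
        if self.op == "leaf":
            return self.weight
        vals = [c.evaluate() for c in self.children]
        if self.op == "mul":
            out = 1.0
            for v in vals:
                out *= v
            return out
        return sum(vals)            # "add"

# Circuit for a single Bernoulli variable with an indicator per value:
# P(evidence) = theta_true * lambda_true + theta_false * lambda_false.
theta_t, theta_f = Node("leaf", weight=0.7), Node("leaf", weight=0.3)
lam_t, lam_f = Node("leaf", weight=1.0), Node("leaf", weight=1.0)
root = Node("add", [Node("mul", [theta_t, lam_t]), Node("mul", [theta_f, lam_f])])

print(root.evaluate())      # 1.0: no evidence
lam_f.weight = 0.0          # "observe" the variable to be true
theta_t.weight = 0.9        # per-sentence parameter update from a classifier
print(root.evaluate())      # 0.9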

5 Results

5.1 Roth and Yih Data

Figure 4: A sample sentence from the Roth and Yih [2007] corpus.

The first dataset I used came from Roth and Yih [2007]. It was manually created from 1437 sentences exhibiting at least one of the following relations: located in, works for, organization based in, lives in, or kills. From these sentences, there are 5336 entities and 19048 pairs of entities (potential binary relations). A sample sentence from this dataset is shown in figure 4. This dataset has been used in other papers subsequently, including Kate and Mooney [2010].

Results are presented, averaged over 5-fold cross-validation, in tables 1 (for entities) and 2 (for relations).

The Separate lines present the results using separately trained and tested entity and relation classifiers, with no joint inference performed. This can be seen as a baseline for comparing with the performances of the joint model.

The Joint lines present the results of building a Bayesian network, according to the approach described in section 4.4. The probabilities used in the Bayesian network come from the Separate classifiers, but predictions are made at the sentence-level based on the most probable explanation of the Bayesian network.

For entity classification, the joint model provides a slight increase in F1 for the person and location entity types. These are consistent in magnitude with the results reported in Roth and Yih [2007].

However, for relation classification, the joint model increases F1 scores for every relation. The increases for the works for and kills relations are particularly high. It is interesting to see that the joint model tends not to increase recall, and sometimes even harms it, but increases precision by a significant amount. The results here are comparable to those in Roth and Yih [2007], with the largest increases in F1 having the same magnitude as in my model.

These results show that the joint model is capable of significantly improving the relation classification performance.

Finally, the Omniscient and Omniscient + Joint lines display the results for entity and relation classifiers augmented with the omniscient features described in section 4.2.1. While this represents an unrealistic feature set for a true application, it is interesting to note that even in this case, the joint model never hurts and often improves performance.

Table 1: Results of relation classification on the Roth and Yih [2007] corpus. Experiments are conducted using 5-fold cross-validation.

Table 2: Results of entity classification on the Roth and Yih [2007] corpus. Experiments are conducted using 5-fold cross-validation.

5.2 Google’s Relation Extraction Corpus

My original goal was to use the recently released Google relation extraction corpus. It contains a number of relations, such as birth place, where each relation is accompanied by a paragraph from Wikipedia which contains text sufficient to infer the relation.

However, the Google corpus offers a challenge for evaluating results: it contains one label per paragraph, but many of the paragraphs contain multiple relations. I ran into problems such as the following: the classifier predicts that a person was born in Minnesota, which is correct and labeled so in the data, but also that he was affiliated with several schools and other relations that may or may not be correct (the paragraph they are drawn from only has the birthplace label).

To bypass this problem, and to enable testing my model on a second corpus, I only considered the birth place and place of death relations from the Google corpus. For these, there is presumably only one correct answer. I further simplified the problem by filtering the relations to those which contained a single sentence mentioning both the person and the location. Some of these sentences may not actually express the relation, but I relied on there being enough correct labels to outweigh any noise in the data.

Additionally, unlike the previous corpus, the Google corpus does not explicitly mark the entity mentions in each sentence. To overcome this, I used the Stanford NLP toolkit to automatically extract entities and used its predicted types as the ground truth.

Results are presented, averaged over 5-fold cross-validation, in tables 3 (for entities) and 4 (for relations).

Because the entity labels are generated automatically and likely contain numerous errors, it is difficult to draw any conclusions from the entity results over the Google corpus. However, for the relations (which are human-annotated), the joint model once again improves on the performance of the independent classifiers. As with the previous corpus, joint inference tends not to help recall, or even slightly harms it, but boosts precision, even when omniscient features are used.

Table 3: Results of entity classification on a subset of the Google relation extraction corpus. Experiments are conducted using 5-fold cross-validation.

Table 4: Results of relation classification on a subset of the Google relation extraction corpus. Experiments are conducted using 5-fold cross-validation.

6 Conclusions

  • Benefits of Joint Inference: Consistent with previously published results, my approach finds that joint inference between entity and relation classification at the sentence level provides a small but significant increase in overall performance, as measured by precision, recall, and F1 scores.
  • Consistency of Predictions: By incorporating the outputs of entity and relation classifiers into a joint probabilistic model, I ensured that the predictions for each sentence are self-consistent: the probabilistic models enforce constraints present in the data, such as the constraint that only two people can participate in the kills relation.
  • Compiling Bayesian Networks: Pre-compiling Bayesian networks greatly speeds up the inference process. Contrary to my original expectations, the ability to perform exact inference does not significantly enhance final prediction quality.
  • Exact versus Approximate Inference: In the graphical models I constructed to examine the influence of joint inference, whether the inference procedure returns exact or approximate answers makes essentially no difference. There are two primary reasons for this. First, the graphical model structures are relatively simple, and approximate inference such as loopy belief propagation has been empirically shown to achieve good approximations for these cases. Second, the underlying probabilities encoded in the Bayesian networks are themselves highly noisy approximations. In hindsight, this is an obvious limitation when considering any potential performance gain granted by exact inference.

7 Future Work

  • Quality of Classifier-Generated Probabilities: An interesting aspect suggested, but not explored, by my work is quantifying the dependence of the joint model on the quality of the probabilities generated by the individual classifiers. An investigation of this aspect would combine elements of sensitivity analysis of Bayesian networks and accurate probability estimates from classifiers, two well-researched areas that have not (to the best of my knowledge) been studied from the vantage point of estimating the joint impact of sensitivity and probability estimates on an information extraction task.
  • Learning Constraints from Data: This is mentioned as a possibility in Roth and Yih [2002], but not pursued. Rather than having to manually specify the constraints for each corpus, it should be possible to learn such constraints from the data, similar to the method of learning the MLE estimates of Bayesian network parameters discussed in class.

References

Adnan Darwiche. Samiam. Software available from http://reasoning.cs.ucla.edu/samiam.

Adnan Darwiche. Modeling and reasoning with Bayesian networks. Cambridge University Press, 2009.

Adnan Darwiche and Mark Chavira. Ace, an arithmetic circuit compiler. 2007. URL: http://reasoning.cs.ucla.edu/ace.

Zhou GuoDong, Su Jian, Zhang Jie, and Zhang Min. Exploring various knowledge in relation extraction. In Proceedings of the 43rd Annual Meeting on Association for Computational Linguistics, pages 427–434. Association for Computational Linguistics, 2005.

Jing Jiang and ChengXiang Zhai. A systematic exploration of the feature space for relation extraction. In HLT-NAACL, pages 113–120, 2007.

Rohit J Kate and Raymond J Mooney. Joint entity and relation extraction using card-pyramid parsing. In Proceedings of the Fourteenth Conference on Computational Natural Language Learning, pages 203–212. Association for Computational Linguistics, 2010.

Mike Mintz, Steven Bills, Rion Snow, and Dan Jurafsky. Distant supervision for relation extraction without labeled data. In Proceedings of the Joint Conference of the 47th Annual Meeting of the ACL and the 4th International Joint Conference on Natural Language Processing of the AFNLP: Volume 2-Volume 2, pages 1003–1011. Association for Computational Linguistics, 2009.

Alexandru Niculescu-Mizil and Rich Caruana. Predicting good probabilities with supervised learning. In Proceedings of the 22nd international conference on Machine learning, pages 625–632. ACM, 2005.

Sebastian Riedel and Andrew McCallum. Fast and robust joint models for biomedical event extraction. In Proceedings of the Conference on Empirical Methods in Natural Language Processing, pages 1–12. Association for Computational Linguistics, 2011.

Dan Roth and Wen-tau Yih. Probabilistic reasoning for entity & relation recognition. In Proceedings of the 19th international conference on Computational linguistics-Volume 1, pages 1–7. Association for Computational Linguistics, 2002.

Dan Roth and Wen-tau Yih. Global inference for entity and relation identification via a linear programming formulation. Introduction to Statistical Relational Learning, pages 553–580, 2007.

Shubin Zhao and Ralph Grishman. Extracting relations with integrated information using kernel methods. In Proceedings of the 43rd Annual Meeting on Association for Computational Linguistics, pages 419–426. Association for Computational Linguistics, 2005.

Relax, Compensate, and then Recover: Notes on the paper by Arthur Choi and Adnan Darwiche

I’ve been reading a lot of papers since starting at UW and I thought it might be nice to keep some written notes about a few of them.

To start off with, I am reading Choi and Darwiche’s 2011 survey-style paper on probabilistic inference, Relax, Compensate, and then Recover. The PDF is available here.

Overview of the Paper

The paper discusses exact probabilistic inference and presents a framework (called relax, compensate, and recover) for approximate probabilistic inference. From what I have encountered so far, the term probabilistic inference is used in the CS community exclusively when talking about calculating probabilities using graphical models (such as Bayesian networks), which are statistical models used to represent the joint distribution and conditional dependencies over a set of random variables.

For example, variables might represent our knowledge about particular aspects of the world, such as whether the lawn is wet or whether it is raining outside. Usually, some form of evidence is noted, such as the absence of a wet lawn, and an example of probabilistic inference is then to calculate the probability that it is raining after taking into account this evidence. Calculating probabilities on graphical models is a difficult computational problem, and so this paper explores ways to expand the cases where exact inference can be used and find better approximation techniques for intractable cases.

The approximate inference technique is based on the idea of taking the input graphical model, then “relaxing” some of its equivalence constraints, and performing exact inference on the now approximate model. The notions of compensation and recovery are explained later. Their purpose is to nudge the approximate model closer to the original model, which hopefully means that our solution will move closer to the true solution of the original problem.

The exact inference technique is based on the idea of converting a probabilistic inference problem into a propositional knowledge base (this Darwiche paper also refers to these as Boolean functions). The idea here is that, given a knowledge base in a certain form (detailed later), the problem can be compiled into a form that supports tractable answering of queries.

Interestingly, even though their approximate inference technique relies on exact inference over a modified model, they note that their exact and inexact approaches have not been integrated together in practice (at time of writing, at least).

They conclude the introduction with a note that their frameworks have performed well at several recent Uncertainty in Artificial Intelligence (UAI) inference competitions.

Exact Probabilistic Inference

The first concept introduced in this section is treewidth. Treewidth is a numeric characterization of a graph’s structure that is often used to bound the complexity of graph algorithms. As the paper notes, existing exact inference algorithms have runtimes that are generally exponential in the model’s treewidth.

The second concept introduced is negation normal form (NNF), which they define as a directed acyclic graph (or circuit) where internal nodes are labeled with either AND or OR, and the leaf nodes are all literals or explicitly labeled true or false. In other words, an NNF is a boolean circuit diagram built with only AND and OR gates.

An NNF can exhibit zero, one or both of the following properties: decomposability, which means that the left and right branches of an AND node share no variables, and determinism, which means that the left and right branches of an OR node are mutually exclusive (the expression in the left branch cannot be true if the expression in the right branch is true, and vice versa).

The basic idea of this section is that any type of graphical model can be encoded into a propositional knowledge base in conjunctive normal form (CNF), where each clause has a weight. If you translate the CNF into a decomposable, deterministic NNF (dd-NNF), then you can perform efficient inference in time linear in the NNF size. In this case, the NNF represents an arithmetic circuit and a single pass from the leaves to the root will compute a probability. This sounds great, and often is, but they point out the linear run time is in the size of the dd-NNF, not in the size of the original knowledge base, and unfortunately the conversion from CNF to dd-NNF can result in an exponential size increase.
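
As a toy illustration of what the weighted encoding sets up, the sketch below computes a weighted model count by brute force; the clauses and weights are invented and far simpler than the actual encoding of a Bayesian network:

from itertools import product

# Weighted model counting: sum, over assignments satisfying the clauses,
# of the product of literal weights.
variables = ["A", "B"]
clauses = [[("A", True), ("B", True)]]            # the single clause (A or B)
weights = {("A", True): 0.6, ("A", False): 0.4,
           ("B", True): 0.3, ("B", False): 0.7}

total = 0.0
for values in product([True, False], repeat=len(variables)):
    assign = dict(zip(variables, values))
    if all(any(assign[v] == sign for v, sign in clause) for clause in clauses):
        w = 1.0
        for v in variables:
            w *= weights[(v, assign[v])]
        total += w

print(total)   # 0.72 = 1 - 0.4*0.7: the weight of assignments violating (A or B) is excluded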

Still, they state that a key advantage of this compilation into a dd-NNF is that it “exploits local structure (i.e., the structure of the parameters that quantify the network)”. I wasn’t sure what this meant, and the paper doesn’t elaborate, but there’s actually a chapter about this in Darwiche’s book, and in a less easily digestible form in this paper. The simplest form of “local structure”, which just means any structure arising from the conditional probability tables of a graphical model instead of its network topology, is determinism: instead of saying X is 50% likely to occur when Y occurs, you say that X occurs with certainty when Y occurs. These constraints are deterministic because they always happen, not just some of the time. Awareness of deterministic constraints can help you avoid performing inference over the entire structure of the graph. Another example of local structure that can make inference easier is context-specific independence: if X takes on a specific value, one of its children may become independent of its other parents. There are other examples, and I haven’t read far enough to see exactly how dd-NNFs make it easier to exploit local structure, but at least this gives some of the intuition behind what local structure is referring to.

Approximate Probabilistic Inference

In this section,  Choi and Darwiche elaborate on how to perform approximate inference by taking the original problem, creating a relaxed version of it, and performing exact inference on the now-approximate model.

First up is the concept of relaxing equivalence constraints, which say that the value of variable X must be the same as Y in a valid model. Relaxing this equality constraint means simply ignoring the fact that X and Y should take on the same value. Exact inference in the relaxed model may produce a solution where, for instance, X is true and Y is false; this is invalid in the original problem, but okay in the relaxed model.

Next is compensation, which helps move the solutions on the relaxed model closer to the exact solution to the original model. In general, this is done by manipulating the approximate model to enforce a weaker notion of equivalence on the relaxed equality constraint. What exactly a weaker notion of equivalence looks like depends on the type of model, and I’ll talk about that more below.

Finally, recovery refers to restoring the equivalence constraints that, when removed, caused the worst decrease in accuracy of the approximate solution. They point to their previous work that proposes a number of recovery heuristics to try to identify the equivalence constraints whose relaxation most damaged the accuracy of the solution.

The authors also note that the exact solutions on a relaxed model coincide with the solutions produced by the mini-bucket elimination algorithm of Rina Dechter (http://www.ics.uci.edu/~csp/r62a.pdf), and that the relax, compensate, and recover framework subsumes loopy belief propagation. Perhaps some discussion of how and why this is so could serve as a future post; I don’t know enough details of either algorithm to contribute much now.

It’s worth noting that there is a connection between this inference framework and solving weighted MAX-SAT, which the authors briefly mention in this paper. They have a previous paper that discusses how to relax and compensate for weighted MAX-SAT that I found quite useful for wrapping my head around what “weaker notion of equivalency” actually means: it’s any property of the variables involved in a relaxation that, while not necessarily implying that the variables adopt the same value, helps in some fashion to move the solution of the approximate model closer to the solution of the original model. For weighted MAX-SAT, weaker equivalency corresponds to a relationship between the weights of the relaxed variables and the weights of (slightly) constrained optimal solutions in the relaxed model. For Bayesian networks, weaker equivalency corresponds to the relaxed variables having the same marginal probabilities (meaning that the probability of X being true is the same as the probability of Y being true), which makes a kind of intuitive sense: if the values of X and Y are the same, as in the original model, they must have the same marginals. So a slightly weaker definition of “the same” could be that their marginals alone are equal. There is some more detail about this concept in this paper, and much more detail in their other work.

One not-completely obvious point about the relaxation of equivalence constraints: many times what you have in a graphical model is not an equivalence constraint but a correlation between variables. For instance, if you have connected variables X and Y, you can insert a new variable Y’ between X and Y, with the equivalence constraint that Y = Y’. When you relax the model, you let Y and Y’ take on different values: this means that the connection between X and Y is broken, as X interacts with Y’, which is not related to Y in the relaxed model.

Conclusion

The paper concludes with a pointer to their Bayesian network toolkit, SamIam, which includes implementations of the inference framework they discuss.

They identify as future work the integration between their approximate and exact inference frameworks, as well as noting the potential for the further incorporation of knowledge compilation.

Cloud Recognition in the Miami Skyline

For our senior design project at UC Davis, Saerorm Park, Liz Shigetoshi, Ashish Subedy and I worked on automatically classifying cloud types in webcam images from Miami. This work was done in association with mentors at Lawrence Berkeley National Laboratory, who are interested in the problem as a way to automatically gather climate data to aid accurate climate modeling.

Here is the poster we presented at the UC Davis Engineering Showcase: Cloud Recognition in the Miami Skyline

Here is a video Saerorm made showing some of the edge detection/cloud pixel identification results: http://youtu.be/I4PF9cH7M2I

Distributed Transitive Closure Algorithms, Part 5 – The End

The previous four posts summarized the basics of two algorithms for computing the transitive closure, along with an outline of the technical details of implementation and testing.

To conclude, I am also posting a link to the honors thesis at UC Davis that resulted from this work:

Distributed Algorithms for the Transitive Closure

Distributed Transitive Closure Algorithms, Part 4 – Java Implementations

This post contains code implementing the Seminaive and Smart transitive closure algorithms. The algorithms are the same as in the previous post (Part 3), but these implementations are written in Java. These run directly on Hadoop. For these iterative algorithms, running Java code instead of Pig results in greatly improved performance.

Part 3 contains information on using Whirr to get an Amazon EC2 cluster running. The same instructions apply to getting a cluster set up for running the Java implementations.

First up is the implementation of Seminaive transitive closure. I tried to follow closely the pseudocode from Part 2.

import java.io.FileWriter;
import java.io.BufferedWriter;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.HashMap;
import org.apache.hadoop.fs.FileUtil;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.*;
import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Task;

import org.apache.hadoop.mapreduce.Partitioner;

import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapreduce.*;

import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class SeminaiveTC extends Configured implements Tool {
    private static final int splitSize = 5242880; // 5MB

    public static class JoinPartitioner extends Partitioner<IntWritable, EdgeWithOrigin> {

      /** Partition directly by the integer key, which already holds the hash bucket id. */
      public int getPartition(IntWritable key, EdgeWithOrigin value, int numReduceTasks) {
        return key.get();
      }

    }

    public static class CombinedInputFormat extends CombineFileInputFormat<LongWritable, Text> {
        public CombinedInputFormat() {
            super();
            this.setMaxSplitSize(splitSize);
        }

        @Override
        public RecordReader<LongWritable, Text> createRecordReader (InputSplit split, TaskAttemptContext context) throws IOException {

            CombineFileSplit combineSplit = (CombineFileSplit) split;

            RecordReader<LongWritable, Text> rr = new CombineFileRecordReader<LongWritable, Text>(combineSplit, context, myCombineFileRecordReader.class);
            return rr;
        }

        protected boolean isSplitable(JobContext context, Path file) {
            return true;
        }

        public static class myCombineFileRecordReader extends RecordReader<LongWritable, Text> {
            private LineRecordReader linerecord;

            private int index;

            public myCombineFileRecordReader(CombineFileSplit split,
                    TaskAttemptContext context, Integer index)
                throws IOException, InterruptedException {

                this.index = index;
                InputSplit is = (InputSplit) split;
            }

            public void initialize(InputSplit split, TaskAttemptContext context) throws IOException {
                CombineFileSplit combineSplit = (CombineFileSplit) split;

                linerecord = new LineRecordReader();

                FileSplit fileSplit = new FileSplit(combineSplit.getPath(index), combineSplit.getOffset(index), combineSplit.getLength(index), combineSplit.getLocations());

                linerecord.initialize(fileSplit, context);
            }

            @Override
            public void close() throws IOException {
                if (linerecord != null) {
                    linerecord.close();
                    linerecord = null;
                }
            }

            @Override
            public Text getCurrentValue() {
                return linerecord.getCurrentValue();
            }

            @Override
            public LongWritable getCurrentKey() {
                return linerecord.getCurrentKey();
            }

            @Override
            public float getProgress() throws IOException {
                return linerecord.getProgress();
            }

            @Override
            public boolean nextKeyValue() throws IOException {
                return linerecord.nextKeyValue();
            }

        }
    }

    public static class Edge implements WritableComparable<Edge> {
        public int x;
		public int y;

		public Edge(int x, int y) {
			this.x = x;
			this.y = y;
		}

		public Edge() {
			this(0, 0);
	    }

		public void write(DataOutput out) throws IOException {
			out.writeInt(x);
			out.writeInt(y);
		}

		public void readFields(DataInput in) throws IOException {
			x = in.readInt();
			y = in.readInt();
		}

		public String toString() {
			return Integer.toString(x) + "\t" +
			    Integer.toString(y);
		}

        public int compareTo(Edge other) {
                if (other.x == x) {
					Integer temp = new Integer(y);
					return temp.compareTo(other.y);
				} else {
					Integer temp = new Integer(x);
					return temp.compareTo(other.x);
				}
		}

		public boolean equals(Object o) {
			if (!(o instanceof Edge)) {
				return false;
			}
			Edge other = (Edge) o;
			return x == other.x && y == other.y;
		}

		public int hashCode() {
			return x ^ y;
		}
	}

    public static class EdgeWithOrigin implements WritableComparable<EdgeWithOrigin> {
        public int x;
		public int y;
		public boolean left;

		public EdgeWithOrigin(int x, int y, boolean left) {
			this.x = x;
			this.y = y;
			this.left = left;
		}

		public EdgeWithOrigin() {
			this(0,0,false);
		}

		public void write(DataOutput out) throws IOException {
			out.writeInt(x);
			out.writeInt(y);
			out.writeBoolean(left);
		}

		public void readFields(DataInput in) throws IOException {
			x = in.readInt();
			y = in.readInt();
			left = in.readBoolean();
		}

		public String toString() {
			return Integer.toString(x) + "\t" +
			    Integer.toString(y) + "\t" +
				String.valueOf(left);
		}

        public int compareTo(EdgeWithOrigin other) {
			if (other.left == left) {
                if (other.x == x) {
					Integer temp = new Integer(y);
					return temp.compareTo(other.y);
				} else {
					Integer temp = new Integer(x);
					return temp.compareTo(other.x);
				}
			} else {
				if (left) {
					return -1;
				} else {
					return 1;
				}
			}
		}

		public boolean equals(Object o) {
			if (!(o instanceof EdgeWithOrigin)) {
				return false;
			}
			EdgeWithOrigin other = (EdgeWithOrigin) o;
			return x == other.x && y == other.y && left == other.left;
		}

		public int hashCode() {
			return x ^ y;
		}
	}

	public static class JoinMap extends Mapper<Text, Text, IntWritable, EdgeWithOrigin>
	{
		@Override
		public void map(Text x, Text y, Context context)
				throws IOException, InterruptedException {

			boolean left = false;

			Configuration conf = context.getConfiguration();
            String leftSideLocation = conf.get("leftSide");
            int hashCode = conf.getInt("hashCode", 3);

			// Requires the initial data set be in its own folder
            if ( leftSideLocation.equals( ((FileSplit)context.getInputSplit()).getPath().getParent().toString() ) ) {
                left = true;
            }

            int a = Integer.parseInt(x.toString());
            int b = Integer.parseInt(y.toString());
			int hashA = a % hashCode;
			int hashB = b % hashCode;
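			// Each edge goes to the reduce bucket of both endpoints (once if they collide),
			// so a DeltaT edge (n, a) and an E edge (a, b) always meet in bucket h(a).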

			if (hashA == hashB) {
				context.write(new IntWritable(hashA), new EdgeWithOrigin(a, b, left));
			} else {
				context.write(new IntWritable(hashA), new EdgeWithOrigin(a, b, left));
				context.write(new IntWritable(hashB), new EdgeWithOrigin(a, b, left));
			}
		}
	}

	public static class JoinReducer extends Reducer<IntWritable, EdgeWithOrigin, Edge, BooleanWritable>
	{
	    BooleanWritable trueOut = new BooleanWritable(true);
		public void reduce(IntWritable key, Iterable<EdgeWithOrigin> values, Context context) throws IOException, InterruptedException
		{
			HashMap<Integer, LinkedList<Integer>> canReachFromX = new HashMap<Integer, LinkedList<Integer>>();
			HashMap<Integer, LinkedList<Integer>> canReachY = new HashMap<Integer, LinkedList<Integer>>();
			int hashX, hashY;

			Configuration conf = context.getConfiguration();
            int hashCode = conf.getInt("hashCode", 3);

			Iterator<EdgeWithOrigin> valueIt = values.iterator();
			while (valueIt.hasNext()) {
				EdgeWithOrigin e = valueIt.next();

				if (!e.left) {
				    if (canReachFromX.containsKey(e.x)) {
				        canReachFromX.get(e.x).add(e.y);
				    } else {
				        LinkedList<Integer> xLinkedList = new LinkedList<Integer>();
				        xLinkedList.add(e.y);
				        canReachFromX.put(e.x, xLinkedList);
				    }
				}

				if (e.left) {
				    if (canReachY.containsKey(e.y)) {
				        canReachY.get(e.y).add(e.x);
				    } else {
				        LinkedList<Integer> yLinkedList = new LinkedList<Integer>();
				        yLinkedList.add(e.x);
				        canReachY.put(e.y, yLinkedList);
				    }
			    }

				hashX = e.x % hashCode;
				hashY = e.y % hashCode;

				// If h(x) == i && (x, y) \notin left, output (n, y) for all nodes n that can reach x
				if (!e.left && hashX == key.get()) {
				    if (canReachY.containsKey(e.x)) {
				        for (int n : canReachY.get(e.x)) {
				            context.write(new Edge(n, e.y), trueOut);
				        }
				    }
			    }

			    if (e.left && hashY == key.get()) {
				    if (canReachFromX.containsKey(e.y)) {
				        for (int n : canReachFromX.get(e.y)) {
				            context.write(new Edge(e.x, n), trueOut);
				        }
				    }
			    }

			}

		}
	}

	public static class SetDiffMap extends Mapper<LongWritable, Text, Edge, BooleanWritable>
	{
		@Override
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
		    String line = value.toString();
			String[] nodes = line.split("\t");

			boolean left = false;
            if ( nodes.length == 3 ) {
                left = true;
            }

            int a = Integer.parseInt(nodes[0].toString());
            int b = Integer.parseInt(nodes[1].toString());
            context.write(new Edge(a, b), new BooleanWritable(left));
		}
	}

    public static class SetDiffReducer extends Reducer<Edge, BooleanWritable, Edge, NullWritable>
	{
		public void reduce(Edge key, Iterable<BooleanWritable> values, Context context) throws IOException, InterruptedException
		{
			Iterator<BooleanWritable> valueIt = values.iterator();
			boolean emptyRight = true;
			while (valueIt.hasNext()) {
				boolean inLeftRelation = valueIt.next().get();

				if (!inLeftRelation) {
				    emptyRight = false;
				    break;
				}
	        }

	        if (emptyRight) {
	            context.write(key, NullWritable.get());
	        }
	    }
	}

	public int run(String[] args) throws Exception  {
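	    // Each iteration runs two MapReduce jobs: Job 1 joins the newly discovered edges
	    // (DeltaT) with the base edge relation E; Job 2 subtracts everything already known
	    // (E and the DeltaT_i produced by earlier iterations) from the join result to form
	    // the next DeltaT. The loop stops when an iteration produces no new edges
	    // (or after 100 iterations).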
	    Configuration config = new Configuration();
        FileSystem hdfs = FileSystem.get(config);
        Path srcPath = new Path(args[1]);
        Path dstPath = new Path("out/E_0/" + srcPath.getName());
        Path dstPath2 = new Path("out/DeltaT_0/" + srcPath.getName());
        hdfs.copyFromLocalFile(srcPath, dstPath);
        hdfs.copyFromLocalFile(srcPath, dstPath2);

	    String E = "out/E_0";
	    String DeltaT = "out/DeltaT_0";
	    String timeLogFile = args[2];
	    int iteration = 1;
	    Path tempPath, tempPath2;
        boolean success = false;

	    while (iteration < 100) {

	        // Job 1 - join(DeltaT, E)
	        Configuration job1conf = new Configuration();

            job1conf.set("mapred.tasktracker.map.tasks.maximum", "3");
	        job1conf.set("mapred.max.split.size", "" + splitSize);
	        job1conf.set("mapred.reduce.tasks", args[0]);
	        job1conf.set("io.file.buffer.size", "65536");
	        job1conf.set("io.sort.factor", "100");
	        job1conf.set("io.sort.mb", "500");
	        job1conf.set("mapred.child.java.opts", "-Xmx1024m");
	        job1conf.set("mapred.child.ulimit", "2097152"); // In KB = 2GB

		    tempPath = new Path(DeltaT);
	        job1conf.set("leftSide", tempPath.getFileSystem(getConf()).makeQualified(tempPath).toString());
	        job1conf.setInt("hashCode", Integer.parseInt(args[0]));

		    Job job1 = new Job(job1conf);
		    job1.setJarByClass(SeminaiveTC.class);
		    job1.setJobName("JoinDeltaTE" + iteration);

		    job1.setMapOutputKeyClass(IntWritable.class);
		    job1.setMapOutputValueClass(EdgeWithOrigin.class);
		    job1.setOutputKeyClass(Edge.class);
		    job1.setOutputValueClass(BooleanWritable.class);
		    job1.setInputFormatClass(KeyValueTextInputFormat.class);

		    job1.setMapperClass(JoinMap.class);
		    job1.setReducerClass(JoinReducer.class);
		    job1.setPartitionerClass(JoinPartitioner.class);

		    FileInputFormat.setInputPaths(job1, new Path(DeltaT));
		    FileInputFormat.addInputPath(job1, new Path(E));
		    FileOutputFormat.setOutputPath(job1, new Path("out/JoinDeltaTE_" + iteration));

	        // Job 2 - set-diff(JoinDeltaTE, T)

	        Configuration job2conf = new Configuration();
            job2conf.set("mapred.tasktracker.map.tasks.maximum", "3");
	        job2conf.set("mapred.max.split.size", "" + splitSize); // More important to change in CombineFileInput above
	        job2conf.set("mapred.reduce.tasks", args[0]);
	        job2conf.set("io.file.buffer.size", "65536");
	        job2conf.set("io.sort.factor", "100");
	        job2conf.set("io.sort.mb", "500");
	        job2conf.set("mapred.child.java.opts", "-Xmx1024m");
	        job2conf.set("mapred.child.ulimit", "2097152"); // In KB = 2GB

		    Job job2 = new Job(job2conf);
		    job2.setJarByClass(SeminaiveTC.class);
		    job2.setJobName("SetDiff" + iteration);

		    job2.setMapOutputKeyClass(Edge.class);
		    job2.setMapOutputValueClass(BooleanWritable.class);
		    job2.setOutputKeyClass(Edge.class);
		    job2.setOutputValueClass(NullWritable.class);

		    job2.setInputFormatClass(CombinedInputFormat.class);

		    job2.setMapperClass(SetDiffMap.class);
		    job2.setReducerClass(SetDiffReducer.class);

		    FileInputFormat.setInputPaths(job2, new Path("out/JoinDeltaTE_" + iteration));
		    FileInputFormat.addInputPath(job2, new Path(E));
		    for (int i = 1; i < iteration; i++) {
		        FileInputFormat.addInputPath(job2, new Path("out/DeltaT_" + i));
		    }
		    FileOutputFormat.setOutputPath(job2, new Path("out/DeltaT_" + iteration));

            FileWriter fileWriter = new FileWriter(timeLogFile, true);
            long time = System.currentTimeMillis();
            long startTime = time;
            long newTime;
            fileWriter.write("iteration" + iteration + "\t");

		    success = job1.waitForCompletion(true);

		    newTime = System.currentTimeMillis();
            fileWriter.write( (newTime - time) / 1000 + "\t");
            time = newTime;

            if (success) success = job2.waitForCompletion(true);

		    newTime = System.currentTimeMillis();
            fileWriter.write( (newTime - time) / 1000 + "\t" + (newTime - startTime) / 1000 + "\n");

            fileWriter.close();

            if (!success) {
                break;
            }

            Counters counters = job2.getCounters();
            long sizeQ = counters.findCounter(Task.Counter.REDUCE_OUTPUT_RECORDS).getValue();

            if (sizeQ < 1) {
                break;
            }

            DeltaT = "out/DeltaT_" + iteration;
            iteration++;
        }
		return success ? 0 : 1;
	}

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		int result = ToolRunner.run(new SeminaiveTC(), args);
		System.exit(result);
	}

}

However, this was my first experience implementing a non-trivial algorithm in Hadoop and, consequently, my first experience trying to performance-tune Hadoop. Some parts of the code explicitly set configuration parameters such as the split size, the number of map and reduce tasks, and so on. It would probably be preferable to handle these configuration settings in a more elegant manner, and it is very likely that my exact parameters are not optimal; they are simply the best I found through trial and error.
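
One less hard-coded approach (this is only a sketch, not what the code above does) is to lean on the fact that these classes already extend Configured and are launched through ToolRunner, which parses Hadoop's generic -D options into the job Configuration. The class name TunableTCDriver below is made up for illustration; the property names are the same old mapred.* names used above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TunableTCDriver extends Configured implements Tool {
    public int run(String[] args) throws Exception {
        // getConf() already contains any -D key=value pairs parsed by ToolRunner,
        // so tuning values can be supplied at launch time instead of being hard-coded.
        Configuration conf = getConf();
        int reducers = conf.getInt("mapred.reduce.tasks", 4);
        long splitSize = conf.getLong("mapred.max.split.size", 33554432L); // default 32MB
        System.out.println("reducers=" + reducers + ", splitSize=" + splitSize);
        // ... pass conf (or a copy of it) to each Job, as in the run() methods above ...
        return 0;
    }

    public static void main(String[] args) throws Exception {
        // e.g. hadoop jar SeminaiveTC.jar TunableTCDriver -D mapred.reduce.tasks=8 -D mapred.max.split.size=67108864
        System.exit(ToolRunner.run(new TunableTCDriver(), args));
    }
}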

Also, I should note that the custom partitioner, defined near the top of the source code, dramatically improved the algorithm’s performance. It helps ensure that, even with a large number of reducers, the output of each iteration is not too fragmented (fewer, larger files are generally better for performance than many very small files). A minimal sketch of the idea follows.
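
The sketch below only illustrates the mechanism; the actual partitioner used above is JoinPartitioner, and the class name DirectBucketPartitioner is made up. Hadoop's default HashPartitioner assigns a key to reducer hash(key) mod numReduceTasks, while the partitioners in this code send hash bucket i straight to reducer i, which assumes the map output keys already fall in [0, number of reducers), as they do when the hashCode parameter equals the reducer count.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Partitioner;

// Sketch: route hash bucket i directly to reducer i.
// Assumes every key is already in [0, numReduceTasks), i.e. the "hashCode"
// job parameter was set to the number of reducers.
public class DirectBucketPartitioner<V extends Writable> extends Partitioner<IntWritable, V> {
    @Override
    public int getPartition(IntWritable key, V value, int numReduceTasks) {
        // The default HashPartitioner would compute
        //   (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks
        // instead of using the key's value directly.
        return key.get() % numReduceTasks; // the modulo is only a safety net
    }
}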

After compiling the above Java file into a .jar and uploading it to your cluster, you can run the program with the following command, where <num reducers> is the number of reducer tasks you wish to use:

hadoop jar SeminaiveTC.jar SeminaiveTC <num reducers> graph.txt time_log.txt

Here is the code for SmartTC, which also attempts to adhere closely to the pseudocode in the previous post.

import java.io.FileWriter;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.HashMap;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.*;
import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class SmartTC extends Configured implements Tool {
    private static final int splitSize = 129554432; // split size in bytes (~124MB)

	public static class JoinPartitioner extends Partitioner<IntWritable, EdgeWithOrigin> {

      /** Send each key (a hash bucket id) directly to the reducer with that index. */
      public int getPartition(IntWritable key, EdgeWithOrigin value, int numReduceTasks) {
        return key.get();
      }
    }

    public static class OneSourceJoinPartitioner extends Partitioner<IntWritable, Edge> {

      /** Send each key (a hash bucket id) directly to the reducer with that index. */
      public int getPartition(IntWritable key, Edge value, int numReduceTasks) {
        return key.get();
      }
    }

    public static class Edge implements WritableComparable<Edge> {
        public int x;
		public int y;

		public Edge(int x, int y) {
			this.x = x;
			this.y = y;
		}

		public Edge() {
			this(0, 0);
	    }

		public void write(DataOutput out) throws IOException {
			out.writeInt(x);
			out.writeInt(y);
		}

		public void readFields(DataInput in) throws IOException {
			x = in.readInt();
			y = in.readInt();
		}

		public String toString() {
			return Integer.toString(x) + "\t" +
			    Integer.toString(y);
		}

        public int compareTo(Edge other) {
                if (other.x == x) {
					Integer temp = new Integer(y);
					return temp.compareTo(other.y);
				} else {
					Integer temp = new Integer(x);
					return temp.compareTo(other.x);
				}
		}

		public boolean equals(Object o) {
			if (!(o instanceof Edge)) {
				return false;
			}
			Edge other = (Edge) o;
			return x == other.x && y == other.y;
		}

		public int hashCode() {
			return x ^ y;
		}
	}

    public static class EdgeWithOrigin implements WritableComparable<EdgeWithOrigin> {
        public int x;
		public int y;
		public boolean left;

		public EdgeWithOrigin(int x, int y, boolean left) {
			this.x = x;
			this.y = y;
			this.left = left;
		}

		public EdgeWithOrigin() {
			this(0,0,false);
		}

		public void write(DataOutput out) throws IOException {
			out.writeInt(x);
			out.writeInt(y);
			out.writeBoolean(left);
		}

		public void readFields(DataInput in) throws IOException {
			x = in.readInt();
			y = in.readInt();
			left = in.readBoolean();
		}

		public String toString() {
			return Integer.toString(x) + "\t" +
			    Integer.toString(y) + "\t" +
				String.valueOf(left);
		}

        public int compareTo(EdgeWithOrigin other) {
			if (other.left == left) {
                if (other.x == x) {
					Integer temp = new Integer(y);
					return temp.compareTo(other.y);
				} else {
					Integer temp = new Integer(x);
					return temp.compareTo(other.x);
				}
			} else {
				if (left) {
					return -1;
				} else {
					return 1;
				}
			}
		}

		public boolean equals(Object o) {
			if (!(o instanceof EdgeWithOrigin)) {
				return false;
			}
			EdgeWithOrigin other = (EdgeWithOrigin) o;
			return x == other.x && y == other.y && left == other.left;
		}

		public int hashCode() {
			return x ^ y;
		}
	}

	public static class JoinMap extends Mapper<Text, Text, IntWritable, EdgeWithOrigin>
	{
		@Override
		public void map(Text x, Text y, Context context)
				throws IOException, InterruptedException {

			boolean left = false;

			Configuration conf = context.getConfiguration();
            String leftSideLocation = conf.get("leftSide");
            int hashCode = conf.getInt("hashCode", 3);

			// Requires the initial data set be in its own folder
            if ( leftSideLocation.equals( ((FileSplit)context.getInputSplit()).getPath().getParent().toString() ) ) {
                left = true;
            }

            int a = Integer.parseInt(x.toString());
            int b = Integer.parseInt(y.toString());
			int hashA = a % hashCode;
			int hashB = b % hashCode;

			if (left) {
			    context.write(new IntWritable(hashB), new EdgeWithOrigin(a, b, left));
			} else {
			    context.write(new IntWritable(hashA), new EdgeWithOrigin(a, b, left));
			}

		}
	}

	public static class JoinReducer extends Reducer<IntWritable, EdgeWithOrigin, Edge, NullWritable>
	{
		public void reduce(IntWritable key, Iterable<EdgeWithOrigin> values, Context context) throws IOException, InterruptedException
		{
		    // reachableFrom is keyed on x and stores a list containing all nodes x can reach (in right side relation of join)
		    // canReach is keyed on y and stores a list of all nodes that can reach y (in left side relation of join)
			HashMap<Integer, LinkedList<Integer>> canReach = new HashMap<Integer, LinkedList<Integer>>();
			HashMap<Integer, LinkedList<Integer>> reachableFrom = new HashMap<Integer, LinkedList<Integer>>();
			int hashX, hashY;

			Configuration conf = context.getConfiguration();
            int hashCode = conf.getInt("hashCode", 3);

			Iterator<EdgeWithOrigin> valueIt = values.iterator();
			while (valueIt.hasNext()) {
				EdgeWithOrigin e = valueIt.next();

				if (!e.left) { // implies that h(x) == key
				    if (reachableFrom.containsKey(e.x)) {
				        reachableFrom.get(e.x).add(e.y);
				    } else {
				        LinkedList<Integer> xLinkedList = new LinkedList<Integer>();
				        xLinkedList.add(e.y);
				        reachableFrom.put(e.x, xLinkedList);
				    }

				    // Search left-side relation store and see if it contains (*, x)
				    // If so, emit (*, y) for all nodes in left-side relation that can reach x
				    if (canReach.containsKey(e.x)) {
				        for (int n : canReach.get(e.x)) {
				            context.write(new Edge(n, e.y), NullWritable.get());
				        }
				    }
				} else { // implies that h(y) == key
				    if (canReach.containsKey(e.y)) {
				        canReach.get(e.y).add(e.x);
				    } else {
				        LinkedList<Integer> yLinkedList = new LinkedList<Integer>();
				        yLinkedList.add(e.x);
				        canReach.put(e.y, yLinkedList);
				    }

				    // Search right-side relation store and see if it contains (y, *)
				    // If so, emit (x, *) for all nodes in right-side relation reachable from y
				    if (reachableFrom.containsKey(e.y)) {
				        for (int n : reachableFrom.get(e.y)) {
				            context.write(new Edge(e.x, n), NullWritable.get());
				        }
				    }
			    }

			}

		}
	}

	public static class SetDiffMap extends Mapper<LongWritable, Text, Edge, BooleanWritable>
	{
		@Override
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
		    String line = value.toString();
			String[] nodes = line.split("\t");

			boolean left = false;

            if ( nodes.length == 3 ) {
                left = true;
            }

            int a = Integer.parseInt(nodes[0].toString());
            int b = Integer.parseInt(nodes[1].toString());
            context.write(new Edge(a, b), new BooleanWritable(left));
		}
	}

    public static class SetDiffReducer extends Reducer<Edge, BooleanWritable, Edge, NullWritable>
	{
		public void reduce(Edge key, Iterable<BooleanWritable> values, Context context) throws IOException, InterruptedException
		{
			Iterator<BooleanWritable> valueIt = values.iterator();
			boolean emptyRight = true;
			while (valueIt.hasNext()) {
				boolean inLeftRelation = valueIt.next().get();

				if (!inLeftRelation) {
				    emptyRight = false;
				    break;
				}
	        }

	        if (emptyRight) {
	            context.write(key, NullWritable.get());
	        }
	    }
	}

	public static class DupElimMap extends Mapper<Text, Text, Edge, NullWritable>
	{
		@Override
		public void map(Text x, Text y, Context context)
				throws IOException, InterruptedException {

            int a = Integer.parseInt(x.toString());
            int b = Integer.parseInt(y.toString());

			context.write(new Edge(a, b), NullWritable.get());
		}
	}

	public static class DupElimReducer extends Reducer<Edge, NullWritable, Edge, NullWritable>
	{
		public void reduce(Edge key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException
		{
		    context.write(key, NullWritable.get());
		}
	}

	public static class DupElimReducer2 extends Reducer<Edge, NullWritable, Edge, BooleanWritable>
	{
	    BooleanWritable trueOut = new BooleanWritable(true);
		public void reduce(Edge key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException
		{
		    context.write(key, trueOut);
		}
	}

    public static class OneSourceJoinMap extends Mapper<Text, Text, IntWritable, Edge>
	{

		@Override
		public void map(Text x, Text y, Context context)
				throws IOException, InterruptedException {

			Configuration conf = context.getConfiguration();
            int hashCode = conf.getInt("hashCode", 3);

            int a = Integer.parseInt(x.toString());
            int b = Integer.parseInt(y.toString());
			int hashA = a % hashCode;
			int hashB = b % hashCode;

			if (hashA == hashB) {
				context.write(new IntWritable(hashA), new Edge(a, b));
			} else {
				context.write(new IntWritable(hashA), new Edge(a, b));
				context.write(new IntWritable(hashB), new Edge(a, b));
			}
		}
	}

	public static class OneSourceJoinReducer extends Reducer<IntWritable, Edge, Edge, BooleanWritable>
	{
	    BooleanWritable trueOut = new BooleanWritable(true);
		public void reduce(IntWritable key, Iterable<Edge> values, Context context) throws IOException, InterruptedException
		{
			HashMap<Integer, LinkedList<Integer>> canReachFromX = new HashMap<Integer, LinkedList<Integer>>();
			HashMap<Integer, LinkedList<Integer>> canReachY = new HashMap<Integer, LinkedList<Integer>>();
			int hashX, hashY;

			Configuration conf = context.getConfiguration();
            int hashCode = conf.getInt("hashCode", 3);

			Iterator<Edge> valueIt = values.iterator();
			while (valueIt.hasNext()) {
				Edge e = valueIt.next();

				// Corrects derivation count for cliques (where joining (0, 0) and (0, 0) produced *two* (0,0) results)
				// But might break proper derivation count when joining (0,0) and (0, *).
				if (e.x == e.y) {
				    context.write(e, trueOut);
				    continue;
				}

				if (canReachFromX.containsKey(e.x)) {
				    canReachFromX.get(e.x).add(e.y);
				} else {
				    LinkedList<Integer> xLinkedList = new LinkedList<Integer>();
				    xLinkedList.add(e.y);
				    canReachFromX.put(e.x, xLinkedList);
				}

				if (canReachY.containsKey(e.y)) {
				    canReachY.get(e.y).add(e.x);
				} else {
				    LinkedList<Integer> yLinkedList = new LinkedList<Integer>();
				    yLinkedList.add(e.x);
				    canReachY.put(e.y, yLinkedList);
				}

				hashX = e.x % hashCode;
				hashY = e.y % hashCode;

				// If h(x) == i, output (n, y) for all nodes n that can reach x
				if (hashX == key.get()) {
				    if (canReachY.containsKey(e.x)) {
				        for (int n : canReachY.get(e.x)) {
				            context.write(new Edge(n, e.y), trueOut);
				        }
				    }
			    }

			    if (hashY == key.get()) {
				    if (canReachFromX.containsKey(e.y)) {
				        for (int n : canReachFromX.get(e.y)) {
				            context.write(new Edge(e.x, n), trueOut);
				        }
				    }
			    }

			}

		}
	}

	public int run(String[] args) throws Exception  {
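		// Each iteration runs four MapReduce jobs: Job 1 joins Q with P (JoinQP),
		// Job 2 deduplicates the union of Q, P, and that join result to form the new P,
		// Job 3 joins Q with itself (JoinQQ), and Job 5 subtracts the new P from the
		// Q-Q join result to form the new Q. The loop stops when Q becomes empty
		// (or after 100 iterations).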
		Configuration config = new Configuration();
        FileSystem hdfs = FileSystem.get(config);
        Path QPath = new Path(args[1]);
        Path dstPath = new Path("out/Q_0/" + QPath.getName());
        hdfs.copyFromLocalFile(QPath, dstPath);
        Path dstPath2 = new Path("out/P_0/");
        hdfs.mkdirs(dstPath2);
	    String Q = "out/Q_0/";
	    String P = "out/P_0/";
	    int iteration = 1;
	    Path tempPath, tempPath2;
	    String timeLogFile = args[2];
        boolean success = false;

	    while (iteration < 100) {
	        // Job 1 - join(Q, P)

	        Configuration job1conf = new Configuration();
	        job1conf.set("mapred.max.split.size", "" + splitSize);
	        job1conf.set("mapred.reduce.tasks", args[0]);
	        job1conf.set("io.file.buffer.size", "65536");
	        job1conf.set("io.sort.factor", "100");
	        job1conf.set("io.sort.mb", "500");
	        job1conf.set("mapred.child.java.opts", "-Xmx1024m");
	        job1conf.set("mapred.child.ulimit", "2097152"); // In KB = 2GB
	        job1conf.setInt("hashCode", Integer.parseInt(args[0]));

		    tempPath = new Path(Q);
	        job1conf.set("leftSide", tempPath.getFileSystem(getConf()).makeQualified(tempPath).toString());

	        Job job1 = new Job(job1conf);

		    job1.setJarByClass(SmartTC.class);
		    job1.setJobName("JoinQP" + iteration);

		    job1.setMapOutputKeyClass(IntWritable.class);
		    job1.setMapOutputValueClass(EdgeWithOrigin.class);
		    job1.setOutputKeyClass(Edge.class);
		    job1.setOutputValueClass(NullWritable.class);
		    job1.setInputFormatClass(KeyValueTextInputFormat.class);

		    job1.setMapperClass(JoinMap.class);
		    job1.setReducerClass(JoinReducer.class);
		    job1.setPartitionerClass(JoinPartitioner.class);

		    FileInputFormat.setInputPaths(job1, new Path(Q));
		    FileInputFormat.addInputPath(job1, new Path(P));
		    FileOutputFormat.setOutputPath(job1, new Path("out/DeltaP_" + iteration));

		    // Job 2 - dup-elim(union(Q, P, DeltaP))
			Configuration job2conf = new Configuration();
	        job2conf.set("mapred.max.split.size", "" + splitSize);
	        job2conf.set("mapred.reduce.tasks", args[0]);
	        job2conf.set("io.file.buffer.size", "65536");
	        job2conf.set("io.sort.factor", "100");
	        job2conf.set("io.sort.mb", "500");
	        job2conf.set("mapred.child.java.opts", "-Xmx1024m");
	        job2conf.set("mapred.child.ulimit", "2097152"); // In KB = 2GB

	        Job job2 = new Job(job2conf);
		    job2.setJarByClass(SmartTC.class);
		    job2.setJobName("DupElimQPDeltaP" + iteration);

		    job2.setMapOutputKeyClass(Edge.class);
		    job2.setMapOutputValueClass(NullWritable.class);
		    job2.setOutputKeyClass(Edge.class);
		    job2.setOutputValueClass(NullWritable.class);
		    job2.setInputFormatClass(KeyValueTextInputFormat.class);

		    job2.setMapperClass(DupElimMap.class);
		    job2.setReducerClass(DupElimReducer.class);

		    FileInputFormat.setInputPaths(job2, new Path(Q));
		    FileInputFormat.addInputPath(job2, new Path(P));
		    FileInputFormat.addInputPath(job2, new Path("out/DeltaP_" + iteration));
		    FileOutputFormat.setOutputPath(job2, new Path("out/P_" + iteration));

		    // Job 3 - join(Q,Q)

		    Configuration job3conf = new Configuration();
	        job3conf.set("mapred.max.split.size", "" + splitSize);
	        job3conf.set("mapred.reduce.tasks", args[0]);
	        job3conf.set("io.file.buffer.size", "65536");
	        job3conf.set("io.sort.factor", "100");
	        job3conf.set("io.sort.mb", "500");
	        job3conf.set("mapred.child.java.opts", "-Xmx1024m");
	        job3conf.set("mapred.child.ulimit", "2097152"); // In KB = 2GB
	        job3conf.setInt("hashCode", Integer.parseInt(args[0]));

	        Job job3 = new Job(job3conf);

		    job3.setJarByClass(SmartTC.class);
		    job3.setJobName("JoinQQ" + iteration);

		    job3.setMapOutputKeyClass(IntWritable.class);
		    job3.setMapOutputValueClass(Edge.class);
		    job3.setOutputKeyClass(Edge.class);
		    job3.setOutputValueClass(NullWritable.class);
		    job3.setInputFormatClass(KeyValueTextInputFormat.class);

		    job3.setMapperClass(OneSourceJoinMap.class);
		    job3.setReducerClass(OneSourceJoinReducer.class);
		    job3.setPartitionerClass(OneSourceJoinPartitioner.class);

		    FileInputFormat.setInputPaths(job3, new Path(Q));
		    FileOutputFormat.setOutputPath(job3, new Path("out/JoinQQ_" + iteration));

	        // Job 5 - set-diff(QQ, P)
		    Configuration job5conf = new Configuration();
	        job5conf.set("mapred.max.split.size", "" + splitSize);
	        job5conf.set("mapred.reduce.tasks", args[0]);
	        job5conf.set("io.file.buffer.size", "65536");
	        job5conf.set("io.sort.factor", "100");
	        job5conf.set("io.sort.mb", "500");
	        job5conf.set("mapred.child.java.opts", "-Xmx1024m");
	        job5conf.set("mapred.child.ulimit", "2097152"); // In KB = 2GB

	        Job job5 = new Job(job5conf);

		    job5.setJarByClass(SmartTC.class);
		    job5.setJobName("SetDiffQP" + iteration);

		    job5.setMapOutputKeyClass(Edge.class);
		    job5.setMapOutputValueClass(BooleanWritable.class);
		    job5.setOutputKeyClass(Edge.class);
		    job5.setOutputValueClass(NullWritable.class);

		    job5.setMapperClass(SetDiffMap.class);
		    job5.setReducerClass(SetDiffReducer.class);

		    FileInputFormat.setInputPaths(job5, new Path("out/JoinQQ_" + iteration));
		    FileInputFormat.addInputPath(job5, new Path("out/P_" + iteration));
		    FileOutputFormat.setOutputPath(job5, new Path("out/Q_" + iteration));

		    FileWriter fileWriter = new FileWriter(timeLogFile, true);
            long time = System.currentTimeMillis();
            long startTime = time;
            long newTime;
            fileWriter.write("iteration" + iteration + "\t");

		    success = job1.waitForCompletion(true);
            newTime = System.currentTimeMillis();
            fileWriter.write( (newTime - time) / 1000 + "\t");
            time = newTime;
            if (success) success = job2.waitForCompletion(true);
		    newTime = System.currentTimeMillis();
            fileWriter.write( (newTime - time) / 1000 + "\t");
            time = newTime;
            if (success) success = job3.waitForCompletion(true);
		    newTime = System.currentTimeMillis();
            fileWriter.write( (newTime - time) / 1000 + "\t");
            time = newTime;

            if (success) success = job5.waitForCompletion(true);

		    newTime = System.currentTimeMillis();
            fileWriter.write( (newTime - time) / 1000 + "\t" + (newTime - startTime) / 1000 + "\n");

            fileWriter.close();

            if (!success) {
                break;
            }

            Counters counters = job5.getCounters();
            long sizeQ = counters.findCounter(Task.Counter.REDUCE_OUTPUT_RECORDS).getValue();

            if (sizeQ < 1) {
                break;
            }

            Q = "out/Q_" + iteration;
            P = "out/P_" + iteration;
            iteration++;

        }
		return success ? 0 : 1;
	}

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		int result = ToolRunner.run(new SmartTC(), args);
		System.exit(result);
	}
}

After compiling and uploading the .jar to your cluster, the Smart TC program can be run with the following command, where <num reducers> is the number of reducer tasks you wish to use:

hadoop jar SmartTC.jar SmartTC <num reducers> graph.txt time_log.txt

Distributed Transitive Closure Algorithms, Part 3 – Up and Running with Pig

The programs below can be run locally on a single computer, but Pig and Hadoop are designed to run on large clusters. I used Amazon’s EC2 computers for my tests. This post describes how to get a cluster on EC2 up and running with the Pig transitive closure algorithms (using Hadoop with Java will be the subject of the next post).

Amazon’s EC2 service allows you to rent from one to several dozen computers and pay only for the time you use. At the time of writing, they also offer up to 750 hours of time on their “Micro” instances with no charge. The micro instances are nice to get started and experiment with, but they are too underpowered for most real usage.

Launching a Cluster

The Apache Whirr project offers a quick and easy way to launch clusters on Amazon EC2 or other cloud computing providers. Whirr operates using recipes: configuration files that specify the properties of your cluster and, in the case of Hadoop, Pig, and many other software packages, what software should be installed on the cluster when it is launched. I didn’t spend any time launching clusters directly on EC2 without Whirr, so I can’t directly compare the experience, but Whirr definitely made it easy to get EC2 clusters up and running. For a quick introduction to Whirr on EC2, this blog post is a good start and will help you configure and use the following Whirr recipe.

Here is the Whirr recipe that I used:

whirr.cluster-name=tc-4cc4xl
whirr.cluster-user=${sys:user.name}
whirr.instance-templates=1 hadoop-namenode+hadoop-jobtracker+pig-client,3 hadoop-datanode+hadoop-tasktracker
whirr.hadoop.version=1.1.2
whirr.pig.version=0.10.1
whirr.pig.tarball.url=http://apache.osuosl.org/pig/pig-${whirr.pig.version}/pig-${whirr.pig.version}.tar.gz
whirr.provider=aws-ec2
whirr.identity=YOURIDENTITYHERE
whirr.credential=YOURCREDENTIALSHERE
whirr.hardware-id=cc1.4xlarge
whirr.image-id=us-east-1/ami-59204e30
whirr.location-id=us-east-1
hadoop-hdfs.dfs.replication=1

This recipe launches a cluster of four of Amazon’s XL Cluster Compute (cc1.4xlarge) instances, which are optimized for high-speed communication between machines. That helps the distributed transitive closure algorithms, which move a lot of data across the network during the computation. Note, however, that these instances are not free and are quite a bit more expensive per machine than the commodity M1 or M2 instances.

Once the cluster is started, the final output from Whirr will be a list of machines in the new cluster, their IP addresses, and the ssh command to log in. There should be a line such as the following:

[hadoop-namenode+hadoop-jobtracker+pig-client]: ssh -i /home/eric/.ssh/id_rsa -o "UserKnownHostsFile /dev/null" -o StrictHostKeyChecking=no eric@50.17.173.124

This tells you the IP address of the machine running the NameNode and JobTracker, the main node in the cluster responsible for assigning tasks to the other machines. This instance has also been configured with Pig, and it is the machine you will want to ssh into to run the distributed transitive closure algorithms.

Pig Implementation

Pig is a very nice scripting language built on top of Hadoop. Initially, I used Pig to implement the transitive closure algorithms.

Here is the Pig implementation of seminaive transitive closure. It is written using the Python API, which allows you to embed Pig inside a Python script. Pig, by itself, has no capability for performing iterative or recursive computations; in the code below, Python is used to control and monitor the iterative computation. I had no experience with Pig before embarking on this project, and I was fortunate enough to find this prior work on Transitive Closure in Pig. The code below was originally based on Julien’s code, and I’m thankful to that blog post for getting me started quickly with iterative Pig computations. It’s entirely possible that I introduced mistakes or inefficiencies in the following code, but at the least, hopefully it is helpful for anyone else planning to implement seminaive or smart transitive closure.

#!/usr/bin/python

import time
import sys
from org.apache.pig.scripting import *

def main():
    Pig.fs("rmr out/tc")
    
    Q = Pig.compile("""
        SET default_parallel 4; -- set to fully utilize cluster
        SET job.name '$iteration_name';
        
        SET mapred.min.split.size  3146000; -- = 3MB
        SET mapred.max.split.size  3146000;
        SET pig.maxCombinedSplitSize 3146000; -- Most important, not sure if other two are needed

        E = LOAD '$Edges' USING PigStorage(',') AS (x:int, y:int);
        Delta = LOAD '$Delta' USING PigStorage(',') AS (x:int, y:int);
        T = LOAD '$T' USING PigStorage(',') AS (x:int, y:int);

        DeltaPrimeBigSchema = JOIN Delta BY y, E by x USING 'replicated';
        DeltaPrime = FOREACH DeltaPrimeBigSchema GENERATE Delta::x as x, E::y as y;
        DeltaCogroup = COGROUP DeltaPrime BY (x, y), T by (x, y);
        DeltaFiltered = FILTER DeltaCogroup BY IsEmpty(T);
        Delta = FOREACH DeltaFiltered GENERATE group.x, group.y;

        T = UNION T, Delta;
        
        STORE Delta INTO '$newDelta' USING PigStorage(',');
        STORE T INTO '$newT' USING PigStorage(',');
    """
    )
    
    Edges = sys.argv[1]
    Delta = Edges
    T = Edges
    
    start = time.time()
    for i in range(100):
        iteration_name = "tc" + str(i + 1)
        newDelta = "out/tc/delta_" + str(i + 1)
        newT = "out/tc/T_" + str(i + 1)
        #countQDerivations = "out/tc/derivations_q_" + str(i + 1)
        #countPDerivations = "out/tc/derivations_p_" + str(i + 1)
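        # bind() with no arguments binds the $parameters in the Pig script
        # ($Edges, $Delta, $T, $newDelta, $newT, $iteration_name) to the
        # Python variables of the same names in the enclosing scope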
        job = Q.bind().runSingle()
        if not job.isSuccessful():
            raise Exception('failed')
        Delta = newDelta
        T = newT
        
        f = open(sys.argv[2], 'a')
        f.write(iteration_name + "," + str(time.time() - start) + "\n")
        f.close()
        start = time.time()

        # detect if we are done
        if not job.result("Delta").iterator().hasNext():
            break

# To run: pig seminaivetc.py <path to edge list> <path to time log file>
if __name__ == '__main__':
    main()
    

Here is the Pig code for Smart:

#!/usr/bin/python

import sys
import time
from org.apache.pig.scripting import *

def main():
    # cleanup output directory before starting
    Pig.fs("rmr out/tc")
    
    PigJob = Pig.compile("""
        SET default_parallel 4;  -- set to fully utilize cluster
        SET job.name '$iteration_name';
        
        SET mapred.min.split.size  8388608; -- = 8MB
        SET mapred.max.split.size  8388608;
        SET pig.maxCombinedSplitSize 8388608; -- Most important, not sure if other two are needed
        
        Q = LOAD '$Q' USING PigStorage(',')  AS (x:int, y:int); 
        Q2 = LOAD '$Q' USING PigStorage(',')  AS (x:int, y:int); -- self joins not supported 
        P = LOAD '$P' USING PigStorage(',')  AS (x:int, y:int);
        
        DeltaPTemp = JOIN P BY x, Q BY y;
        DeltaP = FOREACH DeltaPTemp GENERATE Q::x AS x, P::y AS y;
        PDup = UNION DeltaP, P, Q;
        P = DISTINCT PDup;
       
        QJoinQ = JOIN Q by y, Q2 by x;
        QQ = FOREACH QJoinQ GENERATE Q::x AS x, Q2::y AS y;
        QQCogroup = COGROUP QQ BY (x, y), P BY (x, y);
        QQFiltered = FILTER QQCogroup BY IsEmpty(P);
        Q = FOREACH QQFiltered GENERATE group.x, group.y;
        
        STORE P INTO '$newP' USING PigStorage(',');
        STORE Q INTO '$newQ' USING PigStorage(',');
    """
    )
    
    Q = sys.argv[1]
    P = sys.argv[2]
    
    start = time.time()
    for i in range(10):
        iteration_name = "smarttc" + str(i + 1)
        newQ = "out/tc/q_" + str(i + 1)
        newP = "out/tc/p_" + str(i + 1)
        #countQDerivations = "out/tc/derivations_q_" + str(i + 1)
        #countPDerivations = "out/tc/derivations_p_" + str(i + 1)
        job = PigJob.bind().runSingle()
        if not job.isSuccessful():
            raise Exception('failed')
        Q = newQ
        P = newP
        
        f = open(sys.argv[3], 'a')
        f.write(iteration_name + "," + str(time.time() - start) + "\n")
        f.close()
        start = time.time()
        
        # detect if we are done
        if not job.result("Q").iterator().hasNext():
            break
            

# To run: pig smarttc.py <path to q> <path to empty file> <path to time log file>
# output goes in out/tc/p_<last>/part-m-* (can be in multiple files)
if __name__ == '__main__':
    main()

Transferring the Code and Data to EC2

In order to run the Pig code, you will have to transfer it to the EC2 pig-client instance started by Whirr. The following commands will use SCP to transfer the files to your cluster (modify the commands to match the output of the Whirr launch-cluster command):

scp -i /home/eric/.ssh/id_rsa -o "UserKnownHostsFile /dev/null" -o StrictHostKeyChecking=no ~/pig/seminaivetc.py eric@174.129.130.47:/home/users/eric/
scp -i /home/eric/.ssh/id_rsa -o "UserKnownHostsFile /dev/null" -o StrictHostKeyChecking=no ~/pig/smarttc.py eric@174.129.130.47:/home/users/eric/

Additionally, you will need to place some sample graph data on the cluster. To transfer files to your cluster’s Hadoop file system, issue the following commands, which configure your local Hadoop installation to connect to the cluster’s file system and copy over the data files. (If you don’t have Hadoop installed locally, you can use SCP to upload the data files and then move them into the cluster’s Hadoop file system once you’ve ssh’ed into your cluster.)

~/.whirr/tc-4cc4xl/hadoop-proxy.sh &
export HADOOP_CONF_DIR=~/.whirr/tc-4cc4xl
hadoop fs -put ~/data/graph.txt /tmp/graph.txt
hadoop fs -put ~/data/empty.txt /tmp/empty.txt

The above assumes that graph.txt contains a tab-separated directed edge list, where the nodes are integers. (Note that the Pig scripts above load their input with PigStorage(','); if your edge list is tab-separated, change the delimiter in the LOAD statements to '\t' or convert the file to comma-separated values.) For example, here is an edge list representing the fused-together binary trees of the Afrati et al. paper:

1	2
1	3
2	4
2	5
3	6
3	7
4	8
4	9
5	10
5	11
6	12
6	13
7	14
7	15
8	16
8	17
9	18
9	19
10	20
10	21
11	22
11	23
12	24
12	25
13	26
13	27
14	28
14	29
15	30
15	31
16	32
17	32
18	33
19	33
20	34
21	34
22	35
23	35
24	36
25	36
26	37
27	37
28	38
29	38
30	39
31	39
32	40
33	40
34	41
35	41
36	42
37	42
38	43
39	43
40	44
41	44
42	45
43	45
44	46
45	46

And for, ahem, legacy reasons, the Smart implementation requires an empty text file (representing the initially empty P relation).

Running Seminaive

Using the specific address and username output by the Whirr launch-cluster command, you can ssh into your cluster:

ssh -i /home/eric/.ssh/id_rsa -o "UserKnownHostsFile /dev/null" -o StrictHostKeyChecking=no eric@54.234.206.45

Once in, you can run the Seminaive algorithm on the graph in graph.txt as follows (note that /tmp/graph.txt must refer to a location in the cluster’s Hadoop file system, not the local file system):

pig seminaivetc.py /tmp/graph.txt time_log.txt

The last argument is simply a file name where the Python driver records the time taken by each iteration of the algorithm. Once Pig has finished running, the full transitive closure of the graph can be found in the Hadoop file system under out/tc. You can copy it to the cluster’s local file system for viewing with the following command:

hadoop fs -copyToLocal out/tc ./

Running Smart

Running Smart is almost identical to running Seminaive. The output is placed into the same directory in the Hadoop file system.

pig smarttc.py /tmp/graph.txt /tmp/empty.txt time_log.txt