A diagram of the architecture of Pregel+ is shown in the figure below. Pregel+ is implemented in C/C++ as a group of header files, and users only need to include the necessary base classes and implement the application logic in their subclasses. A Pregel+ application program is compiled using GCC, preferably with -O2 option enabled. Click here to see how to run a Pregel+ program in your cluster.
Pregel+ communicates with HDFS (e.g. for graph loading and result dumping) through libhdfs, a JNI based C API for HDFS. Each computing unit (or, worker) of Pregel+ is simply an MPI process and communications are implemented using MPI communication primitives. One may deploy Pregel+ with any Hadoop and MPI version, such as Hadoop 1.2.1 and MPICH 3.0.4.
Unlike Pregel, Pregel+ also makes the master a worker, and the task of fault recovery can be implemented by a script as follows. The script loops a Pregel+ job, which runs for at most Δ supersteps before dumping the intermediate results to HDFS. Meanwhile, the script also monitors the cluster condition. If a machine is down, the script kills the current job and restarts another job loading the latest intermediate results from HDFS. Fault tolerance is achieved by the data replication in HDFS.
In this web page, we just provide a high-level description of Pregel+'s base classes. Users need to subclass them with their template arguments properly specified in the application code. Click here to check the detailed programming interface.
The Vertex class has an abstract compute() function, where users implement their vertex-centric computing logic. An Vertex object maintains three fields: a vertex ID of type <I>, a vertex value type <V>, and a boolean state indicating whether it is active. Unlike other Pregel-like systems where a vertex object also maintains an adjacency list of items representing the out-edges with value type <E>, we choose to let users implement the adjacent list(s) in <V>. This is because in some algorithms like bi-directional BFS, we also need to maintain the in-neighbors, while in other algorithms we do not need the adjacency list at all (e.g. the k-means algorithm of this paper). Moreover, while <E> is required to indicate the edge length for weighted graphs, it is useless for unweighted graphs, and the adjacency list can be simply implemented as std::vector<I> to save space. In this sense, our approach is more flexible.
The Worker class takes the user-defined Vertex subclass as an argument, and has two abstract functions for users to specify:
VertexT* toVertex(char* line)
void toline(VertexT* v, BufferedWriter& writer)
The first function defines how to parse a line from the input graph data on HDFS into a vertex object. When a job runs, this function will be called for each line of the input graph data, before graph computation begins. Note that the function returns a pointer, which means that users need to call new VertexT in the function and set its field properly before returning it. We return VertexT * instead of VertexT to avoid the copying of a potentially large vertex object, and vertex deletion is automatically handled by our system.
The second function defines what to write to HDFS (using BufferedWriter& writer) for a processed vertex VertexT* v. This function is called for each vertex after the graph computation is done.
We assume users are familiar with the concept of message combiner. Otherwise, you may check Section 3.2 of this paper. The Combiner class has only one abstract function for users to specify:
void combine(MessageT& old_msg, MessageT& new_msg)
Here, MessageT& old_msg refers to the combined message. When a vertex in the current worker sends a message to vertex tgt, the system checks whether it is the first message on this worker targeted at tgt. If so, it becomes old_msg; otherwise, combine(old_msg, new_msg) is called to combine that message (i.e. new_msg) with the combined message (i.e. old_msg).
We assume users are familiar with the concept of aggregator. Otherwise, you may check Section 3.3 of this paper. The Aggregator class has five abstract functions for users to specify:
void init()
void stepPartial(VertexT* v)
void stepFinal(PartialT* p)
PartialT* finishPartial()
FinalT* finishFinal()
We illustrate the usage of the above functions by describing how our aggregator works. One may then check some of our application codes that use aggregator to better understand its usage.
Each worker maintains an aggregator object. At the beginning of a superstep (before vertex-centric computation), the aggregator object calls init() to initialize its state. Then, for each vertex v that is active at the beginning of the superstep, v.compute() is first called, followed by the aggregator calling stepPartial(v). In stepPartial(v), the aggregator checks v's state (which may already be updated by v.compute()) and updates its own state. Note that stepPartial(v) is called as long as v is active at the begnning of the superstep, even when v votes to halt in compute(). After vertex computation finishes, the system calls finishPartial() to obtain the partially aggregated state (of type PartialT) for each worker. This partially aggregated state is presented as the circles marked with p in the above figure.
Before the next superstep begins, the partially aggregated value p of each worker is sent to the master, where the master calls stepFinal(PartialT* p) to update the state of its aggregator with that of p (note that our system makes master also a worker, which processes a fraction of vertices and has its own aggregator object). After stepFinal(PartialT* p) is called for all partially aggregated values, the master calls finishFinal() to obtain the final aggregated value (i.e. the circle marked with f in the above figure) and broadcasts it back to each worker. If one wants to post-process the aggregated result before the next superstep, one may do that in finishFinal() (which is similar to the master-compute functionality of Giraph and GPS).
If you have many vertex states to aggregate (possibly with different logic), you may implement them as the fields of your Aggregator subclass and implement the abstract functions to update them properly.
In a distributed system, in order to send a main-memory object to another machine, one needs to define how to map the object into its serial representation (and vice versa). The main-memory object is first serialized to a binary stream, which is sent to the target machine; after receiving the binary stream, the receiving machine deserializes the stream to obtain the main-memory object.
In Java-based systems such as Hadoop and Giraph, this is achieved by the Writable interface. Any type of object that needs to be sent through the network should implement the Writable interface. For example, if an integer needs to be transmitted though the network, one should use IntWritable instead of simply using int.
In Pregel+, we are able to decouple the (de)serialization operation from the object type, thanks to C++'s support of operator overloading. Specifically, one may use any data type in their code, and if the data needs to be transmitted over the network, one needs to define the (de)serialization function for its type by overloading the operators << and >>. For example, if we have a type (C/C++ basic data type, struct, or class) T that should be serializable, we define the following two functions:
obinstream& operator>>(obinstream& m, T& data)
ibinstream& operator<<(ibinstream& m, const T& data)
Here, the usage is similar to cout<< and cin>>, except that we are writing binary streams instead of text streams. We were careless during the design and the name obinstream and ibinstream are reversed from their meaning, but we leave them as they are, since revising them involves updating a lot of places throughout the system and application codes. The first function specifies how to serialize data to the input stream m, which is then sent to the target machine. The second function specifies how to deserialize the received output stream m to obtain the object data. After users define the (de)serialization operations, the system knows how to convert T-typed object to/from the serial data streams.
For basic C/C++ data types such as int, double and std::string, users can directly call m<<data and m>>data since they are already defined in utils/serialization.h. Similarly, utils/serialization.h also defines (de)serialization functions for STL containers such as std::vector<T> and __gnu_cxx::hash_map<K, V>. As long as the template argument(s) are serializable, the container type is serializable. For example, one can directly call m<<data for data of type std::vector<int> since int is serializable.
In most cases, the (de)serialization functions are called by the system rather than users. However, one may need to define the (de)serialization functions for user-defined types, where the predefined (de)serialization functions can be used in a recursive manner. For example, suppose that we have a type:
struct my_type
{
int state;
vector<int> out_neighbors;
};
We may define the (de)serialization functions of my_type using the predefined (de)serialization functions of its components as follows:
obinstream& operator>>(obinstream& m, my_type& data)
{
m >> state;
m >> out_neighbors;
return m;
}
ibinstream& operator<<(ibinstream& m, const my_type& data)
{
m << state;
m << out_neighbors;
return m;
}
We now use class Vertex<I, V, M> in basic/Vertex.h to illustrate what types should be serializable. If the (de)serialization functions are not defined for those types, the application code cannot pass compilation. Firstly, each vertex should be serializable, since after vertices are loaded in parallel by different workers from HDFS, they need to be exchanged though the network, so that finally each worker w only holds those vertices v with hash(v) = w before graph computation. In our system, the user-defined Vertex subclass is serializable, as long as all its template arguments <I>, <V>, <M> are serializable, which should be guaranteed by users. Secondly, since the computation is done by message passing, the message type <M> should be serializable (of course, it is already required by the first requirement just mentioned). Thirdly, if aggregator is used, a partially aggregated value of type PartialT should be serializable since it is sent from a worker to the master though the network; similarly, the final aggregated value of type FinalT should be serializable, since it is broadcast back from the master to all workers though the network.
All the communication operations are handled by the system itself, and application programmers may safely skip this part. For those who would like to change or extend the system code, we provide more details here.
Besides the basic mode, Pregel+ also supports the vertex-mirroring mode which is designed to cope with high degree vertices. The motivation of vertex mirroring is as follows: suppose our cluster contains 100 workers, and consider a vertex v with degree 1M that wants to send its state a(v) to all its neighbors. In the basic mode, 1M messages are sent through the network. Another option is to construct a mirror of v in each worker that contains v's neighbors beforehand, and v only sends a(v) to all its mirrors, which then forward it to the local neighbors. At most 100 messages are sent in this case, which not only significantly reduces the number of messages, but also prevents the worker that v resides in from becoming a performance bottleneck.
However, a vertex that gets mirrored cannot enjoy the benefit of message combining, since it is not aware of the target vertex (but rather, its mirrors). Therefore, we should only construct mirrors for high degree vertices. To set the mirroring threshold, one should call the following function, which is defined in utils/ghost.
set_ghost_threshold(int degree_threshold)
Instead of using the Vertex and Worker classes in the basic package, one should use the GVertex and GWorker classes in the ghost package to enabling vertex mirroring. Moreover, instead of calling send_message(tgt, msg) to send a message to each target vertex tgt, a vertex v subclassing GVertex can only call broadcast(msg) in its compute() function to broadcast msg to its out-neighbors. Click here to check the detailed programming interface.
In many Pregel algorithms, a vertex v may need to request another vertex u for its state a(u). Using the basic mode, v needs to send itself as a requester to u, and in the next superstep, u receives the request and send a(u) back to v. Therefore, v obtain a(u) in the third superstep.
However, the above process requires three supersteps and cumbersome coding. We provide the RVertex and RWorker classes in the reqresp package to facilitate the request-respond process, while all APIs of the basic mode remain valid. Specifically, the RVertex class provides two additional functions:
request(tgt)
RespondT get_respond(tgt)
Vertex v may now simply call request(u) to send request to u, and call get_respond(u) to get a(u) in the next superstep. However, users need to implement on more abstract function (besides compute()) to indicate what a vertex should respond (e.g. a(u)) once receiving a request:
RespondT respond()