#include "basic/pregel-dev.h"
#include "triple.h"

#include <cctype>
#include <cstdio>
#include <string>
#include <vector>

using namespace std;

// NOTE(review): this file was recovered from a copy in which every span
// between a '<' and the following '>' had been stripped. All template
// arguments, both operator<< bodies, the loop bodies in broadcast()/compute()
// and the base-class lists were reconstructed from the surviving code and are
// marked individually below. Please diff against the upstream version.

//splitter: \t
//input/output line format: vid subject nb1 nb1_predicate nb2 nb2_predicate ... (for a literal field, nb is the value field)
//for output line, we first output all non-sink neighbors, followed by all sink neighbors

// Per-vertex state. nb, nb_type, elabel and nb_id are parallel arrays with one
// entry per (neighbor-or-literal, predicate) pair read from the input line.
struct O2IValue
{
    string subject;
    vector<string> nb;      // literal value; "" placeholder for non-literal entries
    vector<bool> nb_type;   // is_literal = true
    vector<string> elabel;  // predicate of each entry
    vector<int> nb_id;      // neighbor vertex id; 0 placeholder for literal entries
    //------
    vector<int> in_nb;         // ids collected from incoming messages
    vector<string> in_elabel;  // labels collected from incoming messages
};

// Serializes the five input-side fields. in_nb / in_elabel are not written,
// mirroring operator>> below.
// NOTE(review): body reconstructed to be symmetric with operator>>.
ibinstream & operator<<(ibinstream & m, const O2IValue & v)
{
    m << v.subject;
    m << v.nb;
    m << v.nb_type;
    m << v.elabel;
    m << v.nb_id;
    return m;
}

// Deserializes the five input-side fields in the order they are written.
obinstream & operator>>(obinstream & m, O2IValue & v)
{
    m >> v.subject;
    m >> v.nb;
    m >> v.nb_type;
    m >> v.elabel;
    m >> v.nb_id;
    return m;
}

// Message payload: an integer id together with a label string.
struct int_string
{
    int id;
    string label;
};

// NOTE(review): body reconstructed to be symmetric with operator>>.
ibinstream & operator<<(ibinstream & m, const int_string & v)
{
    m << v.id;
    m << v.label;
    return m;
}

obinstream & operator>>(obinstream & m, int_string & v)
{
    m >> v.id;
    m >> v.label;
    return m;
}

//====================================

// NOTE(review): the Vertex template arguments were lost; <VertexID, O2IValue,
// int_string> is inferred from value()'s fields, the "%d" formatting of the id
// and the message struct above. Confirm VertexID is the intended key type.
class O2IVertex_pregel : public Vertex<VertexID, O2IValue, int_string>
{
public:
    // Sends (own id, predicate) to every non-literal neighbor.
    // NOTE(review): loop body reconstructed; only the three local references
    // and the start of the for-header survived.
    void broadcast()
    {
        vector<bool>& nb_type = value().nb_type;
        vector<string>& elabel = value().elabel;
        vector<int>& nb_id = value().nb_id;
        for (size_t i = 0; i < nb_type.size(); i++)
        {
            if (!nb_type[i])
            {
                int_string msg;
                msg.id = id;
                msg.label = elabel[i];
                send_message(nb_id[i], msg);
            }
        }
    }

    // NOTE(review): the function header, the superstep test, both loop bodies
    // and the tail of this function were reconstructed. What survived: the
    // in_nb/in_elabel references, the "collect in-neighbors" comment, and the
    // nb_type/elabel/nb references plus the elabel2/nb2 temporaries.
    virtual void compute(MessageContainer & messages)
    {
        if (step_num() == 1)
        {
            broadcast();
        }
        else
        {
            vector<int>& in_nb = value().in_nb;
            vector<string>& in_elabel = value().in_elabel;
            //collect in-neighbors
            for (size_t i = 0; i < messages.size(); i++)
            {
                in_nb.push_back(messages[i].id);
                in_elabel.push_back(messages[i].label);
            }
            vector<bool>& nb_type = value().nb_type;
            vector<string>& elabel = value().elabel;
            vector<string>& nb = value().nb;
            //----
            // Keep only literal entries: toline() prints nb[i] as a string, and
            // non-literal entries only hold the "" placeholder there.
            vector<string> elabel2;
            vector<string> nb2;
            for (size_t i = 0; i < nb_type.size(); i++)
            {
                if (nb_type[i])
                {
                    elabel2.push_back(elabel[i]);
                    nb2.push_back(nb[i]);
                }
            }
            elabel.swap(elabel2);
            nb.swap(nb2);
            vote_to_halt();
        }
    }
};

// NOTE(review): the base-class list was lost; Worker<O2IVertex_pregel> is
// inferred from the toVertex()/toline() signatures.
class O2IWorker_pregel : public Worker<O2IVertex_pregel>
{
    char* buf;  // scratch buffer, used only for formatting integers

public:
    O2IWorker_pregel()
    {
        buf = new char[TRIPLE_STRBUF_SIZE];
    }

    ~O2IWorker_pregel()
    {
        delete[] buf;  // array form: buf comes from new char[]
    }

    //C version
    // Parses one input line into a newly allocated vertex and returns it.
    virtual O2IVertex_pregel* toVertex(char* line)
    {
        //vid subject nb1 nb1_predicate nb2 nb2_predicate ... (for a literal field, nb is the value field)
        O2IVertex_pregel* v = new O2IVertex_pregel;
        O2IValue& val = v->value();
        statement o;
        const char* l = line;
        o.skipWhiteSpace(l);
        v->id = o.readNum(l);
        o.skipWhiteSpace(l);
        val.subject = o.readResource(l);
        o.skipWhiteSpace(l);
        while (!o.eol(l))
        {
            // presumably nextChar() peeks without consuming, since the value is
            // read from the same cursor right after — confirm in triple.h
            char c = o.nextChar(l);
            // cast: isdigit() is undefined for negative char values
            if (isdigit(static_cast<unsigned char>(c)))
            {
                int nb = o.readNum(l);
                val.nb_id.push_back(nb);
                val.nb.push_back("");//dummy
                val.nb_type.push_back(false);
            }
            else
            {
                string object = o.readNode(l);
                val.nb_id.push_back(0);//dummy
                val.nb.push_back(object);
                val.nb_type.push_back(true);
            }
            o.skipWhiteSpace(l);
            string predicate = o.readResource(l);
            val.elabel.push_back(predicate);
            o.skipWhiteSpace(l);
        }
        return v;
    }

    // Writes one tab-separated output line for v: id, subject, then every
    // (in_nb, in_elabel) pair, then every (nb, elabel) pair, then '\n'.
    // Strings go to the writer directly so their length is not limited by
    // TRIPLE_STRBUF_SIZE; only integers are formatted through buf.
    virtual void toline(O2IVertex_pregel* v, BufferedWriter & writer)
    {
        //vid subject nb1 nb1_predicate nb2 nb2_predicate ...
        //first output all non-sink neighbors, followed by all sink neighbors
        O2IValue& val = v->value();

        snprintf(buf, TRIPLE_STRBUF_SIZE, "%d\t", v->id);
        writer.write(buf);
        writer.write(val.subject.c_str());

        vector<int>& in_nb = val.in_nb;
        vector<string>& in_elabel = val.in_elabel;
        for (size_t i = 0; i < in_nb.size(); i++)
        {
            snprintf(buf, TRIPLE_STRBUF_SIZE, "\t%d\t", in_nb[i]);
            writer.write(buf);
            writer.write(in_elabel[i].c_str());
        }

        vector<string>& elabel = val.elabel;
        vector<string>& nb = val.nb;
        for (size_t i = 0; i < nb.size(); i++)
        {
            writer.write("\t");
            writer.write(nb[i].c_str());
            writer.write("\t");
            writer.write(elabel[i].c_str());
        }
        writer.write("\n");
    }
};

// Configures a worker with the given input/output paths and runs the job.
void pregel_O2I(string in_path, string out_path)
{
    WorkerParams param;
    param.input_path = in_path;
    param.output_path = out_path;
    param.force_write = true;
    param.native_dispatcher = false;
    O2IWorker_pregel worker;
    worker.run(param);
}

int main(int argc, char* argv[])
{
    init_workers();
    pregel_O2I("/dblp_int", "/dblp_in");
    worker_finalize();
    return 0;
}