// ===========================================================================
// REVIEW NOTES (documentation only; the five original lines follow unchanged)
//
// State of this copy
//   * Line breaks have been collapsed, so `//` comments and the
//     `#include` / `#define` / `#ifdef` directives now share physical lines
//     with code.
//   * Text between a '<' and a later '>' appears to have been stripped:
//     template arguments are gone (`vector nbs`, `new hash_map`,
//     `public VertexOL`, `public Combiner`) and so are longer stretches, e.g.
//     `for(int i=0; i* list = ...` and `for(int i=0; ipush_back(pair)`.
//     Parts of compute(), the aggregator's class header and init, and the
//     entry-list loop of toVertex() are therefore not visible here.
//     Restore the file from version control before building; nothing in
//     these notes reconstructs the missing code.
//
// SPQueryNQValue (per-vertex static data)
//   nbs  - neighbor ids, filled by toVertex().
//   list - untyped pointer; init() points it at a new hash_map when in_H is
//          true, otherwise at a new vector. get_hub_table() and
//          get_entry_list() only cast the pointer; they do not check in_H.
//   NOTE(review): no constructor or copy operations are declared. `list` and
//   `in_H` are indeterminate until init() runs, yet the destructor always
//   deletes `list`; an implicit copy would delete the same pointer twice;
//   and calling init() on a value that already owns a container (operator>>
//   calls it unconditionally) drops the earlier allocation.
//   operator<< / operator>> write and read nbs, then in_H, then the
//   container selected by in_H; the reader calls init() before filling it.
//
// Globals
//   in_path / out_path / use_combiner are consumed by main().
//   not_reached (-1) is the initial value of both halves of a vertex's
//   query state. fwd_msg / back_msg / met_msg are not referenced in the
//   text that survives in this copy.
//
// SPQueryAggField
//   min, hubgate and hop (commented "bound"), serialised in that order.
//
// SPQueryVertex
//   init_value(): returns (not_reached, not_reached), with v1 = 0 on the
//     query source and v2 = 0 on the query destination.
//   compute(), superstep 1 (visible part): calls forceTerminate() when
//     src == dst; a source outside H sends every entry (v1) of its entry
//     list the stored distance (v2); a source inside H looks the
//     destination up in its hub table.
//     NOTE(review): the distance is narrowed with `(char) dist`; a value
//     that does not fit in a char will not arrive intact - confirm that
//     hop counts are always small enough.
//   The remainder of compute() is damaged in this copy.
//
// Aggregator (class header and init are missing in this copy)
//   Per-vertex step (signature not visible; presumably stepPartial):
//     superstep 2 - a negative last element of nbs is decoded as
//       cur = -last - 1, removed from nbs, and field.hop is lowered to cur
//       when cur is smaller.
//     later supersteps - when both halves of qvalue() differ from -1, their
//       sum replaces field.min if it is smaller.
//   stepFinal(): superstep 1 keeps the smaller hop and copies a non-empty
//     hubgate from the partial; superstep 2 keeps the smaller hop; later
//     supersteps keep the smaller min.
//   finishPartial() / finishFinal() return &field; finishFinal() also
//     prints bound and min when DEBUG_MODE is defined.
//
// SPQueryWorkerOL
//   toVertex(): tokenises one input line in place with strtok(): id (up to
//     the tab), in_H flag, neighbor count and neighbors, then either
//     (dst, hop) pairs until the end of the line (in_H) or a counted entry
//     list (loop body damaged in this copy).
//     NOTE(review): every strtok() result is passed straight to atoi(); a
//     short or malformed line hands it NULL. An odd number of trailing
//     tokens in the in_H case does the same for `hop`.
//   toQuery(): parses "src dst" into an intpair.
//   init(): activates the source and the destination when get_vpos() does
//     not return -1.
//   dump(): only the vertex whose id equals the query source writes.
//     src != dst - writes the smaller of agg->min and agg->hop, or
//       "not reachable" when that value is still INT_MAX.
//     src == dst - NOTE(review): the format "%d %d\0\n" contains an
//       embedded NUL, so sprintf() stops after the two ids; no tab,
//       distance or newline is produced, unlike the documented
//       "src dst \t hop_dist" output. Confirm the intended format.
//
// SPQueryCombiner::combine() ORs the incoming char into the stored one.
// main() copies in_path / out_path into WorkerParams, sets force_write to
// true and native_dispatcher to false, installs the combiner when
// use_combiner is set, and runs the worker.
// ===========================================================================
//hub-acc querying for undirected graph #include "ol/pregel-ol-dev.h" #include "utils/type.h" #define DEBUG_MODE//comment it out when running experiments //input line format: (from hubacc_ug_merge.cpp) //vid \t in_H==false num nb1 nb2 ... h_num h1 h2 ... //vid \t in_H==true num nb1 nb2 ... dstHVid1 hop1 dstHVid2 hop2 ... //output line format: src dst \t hop_dist //logic: //superstep 1: //- s sends all entry_vertices v a msg dist(s, v) to activate them; //- t sends entry list L(t) to aggregator //superstep 2: //- v in L(s) gets L(t) from aggregator, compute dist(s, v)+dist(v, u)+dist(u, t) for all u in L(t); //- v sends the min dist to aggregator //special case: //if s is high-deg, look up dist(s, t) from L(s) //- if found, t is also high-deg, return dist(s, t) and terminate; (technically, s sets its qvalue() accordingly for correct dumping) //- otherwise, s does not send anything in superstep 1, and get L(t) to compute upperbound //(t send (t, dist_tt=0) to agg if t is in H, since s might not be in H) string in_path = "/ol_merged"; string out_path = "/ol_out"; bool use_combiner = true; //-------------------------------------------------- //Step 1: define static field of vertex: adj-list struct SPQueryNQValue { vector nbs; void * list; bool in_H; void init(bool is_inH) { in_H = is_inH; if (in_H) list = new hash_map; else list=new vector; } vector* get_entry_list() const { return (vector*)list; } hash_map* get_hub_table() const { return (hash_map*)list; } ~SPQueryNQValue() { if(in_H) delete (hash_map*)list; else delete (vector*)list; } }; ibinstream & operator<<(ibinstream & m, const SPQueryNQValue & v) { m << v.nbs; m << v.in_H; //must do this first, later parts decides on this if (v.in_H) m << *(v.get_hub_table()); else m << *(v.get_entry_list()); return m; } obinstream & operator>>(obinstream & m, SPQueryNQValue & v) { m >> v.nbs; m >> v.in_H; //must do this first, later parts decides on this v.init(v.in_H); //allocate space first !!! 
if (v.in_H) m >> *(v.get_hub_table()); else m >> *(v.get_entry_list()); return m; } //-------------------------------------------------- //Step 2: define query type: here, it is intpair (src, dst) //-------------------------------------------------- //Step 3: define query-specific vertex state: intpair (hop_from_src, hop_to_dst) int not_reached = -1; //-------------------------------------------------- //Step 4: define msg type: here, it is char char fwd_msg = 1; //01 char back_msg = 2; //10 char met_msg = 3; //11 //-------------------------------------------------- //Step 5: define vertex class struct SPQueryAggField { int min; vector hubgate; int hop; //bound }; ibinstream & operator<<(ibinstream & m, const SPQueryAggField & v) { m << v.min; m << v.hubgate; m << v.hop; return m; } obinstream & operator>>(obinstream & m, SPQueryAggField & v) { m >> v.min; m >> v.hubgate; m >> v.hop; return m; } class SPQueryVertex: public VertexOL { public: //Step 5.1: define UDF1: query -> vertex's query-specific init state virtual intpair init_value(intpair& query) { intpair pair(not_reached, not_reached); if (id == query.v1) pair.v1 = 0; if (id == query.v2) pair.v2 = 0; return pair; } //Step 5.2: vertex.compute virtual void compute(MessageContainer& messages) { if (superstep() == 1) //only s and t are active { intpair query = get_query(); if (query.v1 == query.v2) { forceTerminate(); } else { if (id == get_query().v1) { if (!nqvalue().in_H) { #ifdef DEBUG_MODE//@@@@@@@@@@@@@@@ cout << "s not in H: s -> v in L(s)" << endl; //@@@@@@@@@@@@@@@ #endif//@@@@@@@@@@@@@@@ //s --dist(s, v)--> L(s) vector* list = nqvalue().get_entry_list(); for (vector::iterator it = list->begin(); it != list->end(); it++) { int entry = it->v1; int dist = it->v2; send_message(entry, (char) dist); } } else //src is in H, do not send to neighbors { hash_map & L_s=*(nqvalue().get_hub_table()); hash_map::iterator it=L_s.find(get_query().v2); if(it != L_s.end()) { //special case: t is in H int 
dist_st=it->second; #ifdef DEBUG_MODE//@@@@@@@@@@@@@@@ cout<<"s and T in H: s -> agg with "< agg, which is done in aggregator } //s and t cannot vote to halt yet } else if(superstep()==2) //s, t and all v in L(s) are active { vector & nbs=nqvalue().nbs; if(id==get_query().v1) { //src if(!nqvalue().in_H)//high-deg vertex does not propagate msgs { //forward broadcast for(int i=0; i & L_s=*(nqvalue().get_hub_table()); hash_map::iterator it=L_s.find(get_query().v2); if(it == L_s.end()) { //special case: t is not in H vector & L_t=((SPQueryAggField*)get_agg())->hubgate; int min=INT_MAX; for(int i=0; i* list = v->nqvalue().get_entry_list(); field.hubgate = *list; } } } else if (SPQueryVertex::superstep() == 2) { vector & nbs = v->nqvalue().nbs; int size = nbs.size(); if (size > 0 && nbs[size - 1] < 0) //v is in L(s) { int cur = -nbs[size - 1] - 1; nbs.resize(size - 1); //remove last item if (field.hop > cur) field.hop = cur; } } else { int dist1 = v->qvalue().v1; int dist2 = v->qvalue().v2; if (dist1 != -1 && dist2 != -1) //both reachable { int dist = dist1 + dist2; if (dist < field.min) field.min = dist; } } } virtual void stepFinal(SPQueryAggField* part) { if (SPQueryVertex::superstep() == 1) { if (field.hop > part->hop) field.hop = part->hop; //this is necessary for the case when s and t are all in H if (part->hubgate.size() > 0) field.hubgate = part->hubgate; } else if (SPQueryVertex::superstep() == 2) { if (field.hop > part->hop) field.hop = part->hop; } else { if ((*part).min < field.min) field.min = (*part).min; } } virtual SPQueryAggField* finishPartial() { return &field; } virtual SPQueryAggField* finishFinal() { #ifdef DEBUG_MODE//@@@@@@@@@@@@@@@ cout << "bound = " << field.hop << ", min = " << field.min << endl; //@@@@@@@@@@@@@@@ #endif//@@@@@@@@@@@@@@@ return &field; } }; //-------------------------------------------------- //Step 7: define worker class class SPQueryWorkerOL: public WorkerOL_auto { public: char buf[50]; SPQueryWorkerOL() : WorkerOL_auto(true) 
{ } //Step 7.1: UDF: line -> vertex virtual SPQueryVertex* toVertex(char* line) { char * pch; SPQueryVertex* v = new SPQueryVertex(); pch = strtok(line, "\t"); v->id = atoi(pch); pch = strtok(NULL, " "); int in_H = atoi(pch); v->nqvalue().init(in_H); pch = strtok(NULL, " "); int num = atoi(pch); for (int i = 0; i < num; i++) { pch = strtok(NULL, " "); int nb = atoi(pch); v->nqvalue().nbs.push_back(nb); } if (in_H) { //reads in: dstHVid1 hop1 dstHVid2 hop2 ... hash_map* table=v->nqvalue().get_hub_table(); while((pch=strtok(NULL, " "))!=NULL) { int dst=atoi(pch); pch=strtok(NULL, " "); int hop=atoi(pch); (*table)[dst]=hop; } } else { //reads in: h_num h1 h2 ... vector* list=v->nqvalue().get_entry_list(); pch=strtok(NULL, " "); int hnum=atoi(pch); for(int i=0; ipush_back(pair); } } return v; } //Step 7.2: UDF: query string -> query (src_id) virtual intpair toQuery(char* line) { char * pch; pch = strtok(line, " "); int src = atoi(pch); pch = strtok(NULL, " "); int dst = atoi(pch); return intpair(src, dst); } //Step 7.3: UDF: vertex init virtual void init(VertexContainer& vertex_vec) { int src = get_query().v1; int pos = get_vpos(src); if (pos != -1) activate(pos); //------ int dst = get_query().v2; pos = get_vpos(dst); if (pos != -1) activate(pos); } //Step 7.4: UDF: task_dump virtual void dump(SPQueryVertex* vertex, BufferedWriter& writer) { //one entry for each meeting vertex, but dist may be the upperbound (which may be output many times) if (vertex->id == get_query().v1) //only let one vertex (here is src) output smallest hop-dist { if (get_query().v1 == get_query().v2) { sprintf(buf, "%d %d\0\n", get_query().v1, get_query().v2); writer.write(buf); } else { int cand1 = (get_agg())->min; int cand2 = (get_agg())->hop; if (cand1 > cand2) cand1 = cand2; if (cand1 == INT_MAX) sprintf(buf, "%d %d\tnot reachable\n", get_query().v1, get_query().v2); else sprintf(buf, "%d %d\t%d\n", get_query().v1, get_query().v2, cand1); writer.write(buf); } } } }; class SPQueryCombiner: 
public Combiner { //combiner won't take effect in superstep 1 (all tgts are different) public: virtual void combine(char & old, const char & new_msg) { old |= new_msg; } }; int main(int argc, char* argv[]) { WorkerParams param; param.input_path = in_path; param.output_path = out_path; param.force_write = true; param.native_dispatcher = false; SPQueryWorkerOL worker; SPQueryCombiner combiner; if (use_combiner) worker.setCombiner(&combiner); worker.run(param); return 0; }