libfaster API Documentation  Development Version
Super fast distributed computing
pagerank-bulk2.cpp
#include <algorithm>
#include <chrono>
#include <cmath>
#include <cstdlib>
#include <deque>
#include <iostream>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

#include "libfaster.h"
4 
5 using namespace std;
6 using namespace faster;
7 
// Weight given to the rank received over links. Each node also receives a
// uniform base share of (1 - dumpingFactor) / numNodes per iteration (see
// givePageRank). The identifier's spelling is kept because the user functions
// below refer to it.
constexpr double dumpingFactor = 0.85;

using std::chrono::duration_cast;
using std::chrono::milliseconds;
using std::chrono::system_clock;

// Number of node ids in the graph (largest id + 1). It is computed on the
// driver in main() and registered with fastContext::registerGlobal so the
// user-defined functions can read it.
size_t numNodes = 0;
15 
// Parses one line of an adjacency-list text file into (node id, out-neighbours).
//
// Expected line format: "<key> <edge> <edge> ...". Tokens are separated by
// whitespace, and parsing stops at the first token that is not an integer.
//
// @param input  one text line of the input file.
// @return       (key, edges). If the line holds no integer at all, the key is 0
//               and the edge list is empty. Previously an empty line left `key`
//               uninitialized, because a failed stream sentry does not assign
//               the target.
std::pair<int, std::vector<int>> toAList(std::string & input){
    // Counting spaces gives a cheap size hint for the edge list: an exact or
    // upper bound when tokens are single-space separated. It only affects the
    // reservation, never the parse result.
    std::size_t numSpaces = 0;
    for (const char c : input){
        if (c == ' ')
            ++numSpaces;
    }

    std::vector<int> edges;
    edges.reserve(numSpaces);

    std::stringstream ss(input);
    int key = 0;
    ss >> key;

    int edge;
    while (ss >> edge){
        edges.push_back(edge);
    }

    return std::make_pair(key, std::move(edges));
}
42 
43 pair<int, double> createPR(const int & key, vector<int> & s UNUSED){
44  return make_pair(key, 1.0 / numNodes);
45 }
46 
47 pair<int, double> createErrors(const int & key, vector<int> & s UNUSED){
48  return make_pair(key, 1);
49 }
50 
51 deque<pair<int, double>> givePageRank(int * keys, void * adjListP, size_t numPresNodes, int * prKeys, void * prP, size_t nPR UNUSED, int * eKeys UNUSED, void * errorsP, size_t nErrors UNUSED){
52  auto adjList = (vector<int>*) adjListP;
53  auto pr = (double*) prP;
54  auto errors = (double*) errorsP;
55  deque<pair<int,double>> msgList;
56  vector<double> givePR(numNodes,0);
57 
58  vector<size_t> prLocation(numNodes+1);
59  vector<bool> present(numNodes+1, false);
60  vector<double> newPR(numPresNodes, (1 - dumpingFactor) / numNodes);
61 
62 
63  if ( (numPresNodes != nPR) || ( nPR != nErrors ) ) {
64  cerr << "Internal unknown error!!!!";
65  exit(11);
66  }
67 
68  #pragma omp parallel for
69  for ( size_t i = 0; i < numPresNodes; ++i){
70  present[keys[i]] = true;
71  prLocation[prKeys[i]] = i;
72  }
73 
74  #pragma omp parallel for schedule(dynamic, 200)
75  //#pragma omp parallel for
76  for ( size_t i = 0; i < numPresNodes; ++i){
77  double contrib = dumpingFactor * pr[prLocation[keys[i]]] / adjList[i].size();
78  for ( size_t j = 0; j < adjList[i].size(); ++j){
79  auto target = adjList[i][j];
80  /*if (target == 1){
81  cerr << "\033[0;36m" << keys[i] << " PR:" << pr[prLocation[keys[i]]] << " D:(" << adjList[i].size() << ") S1 : "<< contrib ;
82  if ( present[target] ){
83  cerr << "[LOCAL]";
84  }
85  cerr << "\033[0m\n";
86  }// */
87 
88  if ( present[target] ){
89  auto l = prLocation[target];
90  #pragma omp atomic
91  newPR[ l ] += contrib;
92  }else{
93  #pragma omp atomic
94  givePR[target] += contrib;
95  }
96  }
97  }
98  // Generate messages
99  for ( size_t i = 0; i < numNodes; ++i){
100  if ( givePR[i] != 0 ){
101  auto p = make_pair(i, givePR[i]);
102 
103  msgList.push_back(p);
104  }
105  }
106 
107  // Calculate Page Rank Error
108  #pragma omp parallel for
109  for ( size_t i = 0; i < numPresNodes; ++i){
110  errors[i] = newPR[i] - pr[i];
111  pr[i] = newPR[i];
112  //if ( prKeys[i] == 1 )
113  //cerr << "\033[0;36m" << " TPR:" << pr[i] << " D:(" << adjList[i].size() << ") S1 : " << newPR[i] << "\033[0m\n";
114  }
115 
116  return msgList;
117 }
118 
119 void getNewPR(int * prKeys, void * prVP, size_t npr, int * contKeys, void * contribVP, size_t numContribs, int * eKeys UNUSED, void * errorVP, size_t nErrors UNUSED){
120  double * pr = (double*) prVP;
121  double * contrib = (double*) contribVP;
122  double * error = (double*) errorVP;
123 
124  vector<double> newPr(npr, 0);
125  vector<size_t> prLocation(numNodes+1);
126 
127  if ( npr != nErrors ) {
128  cerr << "Internal unknown error!!!!!!";
129  exit(11);
130  }
131 
132  #pragma omp parallel for
133  for ( size_t i = 0; i < npr; ++i ){
134  prLocation[prKeys[i]] = i;
135  //eLocation[eKeys[i]] = i;
136  }
137  #pragma omp parallel for
138  for ( size_t i = 0; i < numContribs; ++i){
139  auto target = contKeys[i];
140  size_t targetPrLoc = prLocation[target];
141  double cont = contrib[i];
142 
143  //#pragma omp atomic
144  newPr[targetPrLoc] += cont;
145  //if ( target == 1 )
146  //cerr << "[\033[0;36m1 R:" << cont << "\033[0m] \n";
147  }
148 
149  #pragma omp parallel for
150  for ( size_t i = 0; i < npr; ++i ){
151  // Output PR error
152  error[i] = abs(error[i] + newPr[i]);
153  pr[i] += newPr[i];
154 
155  }
156 }
157 
// Reduction helper used to find the largest node id in the graph.
// Returns (largest id among both keys and every neighbour id in both
// adjacency lists, empty vector). main() adds 1 to get the number of node ids.
pair<int,vector<int>> maxNodeId( const int & ka, vector<int> & a, const int & kb, vector<int> & b){
    int largest = ka > kb ? ka : kb;
    for (const int id : a)
        largest = max(largest, id);
    for (const int id : b)
        largest = max(largest, id);
    return make_pair(largest, vector<int>());
}
171 
// Reduction helper: keeps whichever (key, error) pair has the larger error.
// Ties, and comparisons involving NaN, select the second pair.
pair<int,double> maxError(const int & ka, double & a, const int & kb, double & b){
    const bool firstWins = a > b;
    return firstWins ? pair<int, double>(ka, a) : pair<int, double>(kb, b);
}
178 
179 
180 
181 int main(int argc, char ** argv){
182  // Init Faster Framework
183 
184  auto start = system_clock::now();
185 
186  fastContext fc(argc, argv);
187  fc.registerFunction((void*) &toAList, "toAList");
188  fc.registerFunction((void*) &createPR, "createPR");
189  fc.registerFunction((void*) &createErrors, "createErrors");
190  fc.registerFunction((void*) &givePageRank, "givePageRank");
191  fc.registerFunction((void*) &getNewPR, "getNewPR");
192  fc.registerFunction((void*) &maxNodeId, "maxNodeId");
193  fc.registerFunction((void*) &maxError, "maxError");
194  fc.registerGlobal(&numNodes);
195  fc.startWorkers();
196  if (!fc.isDriver())
197  return 0;
198  cerr << "------------ PageRank -------------\n";
199 
200  fc.printHeader();
201  cerr << " Init Time: " << duration_cast<milliseconds>(system_clock::now() - start).count() << "ms\n";
202 
203  if ( (argc > 2) && (argv[2][0] == '1') ){
204  cerr << "Calibrate Performance\n";
205  fc.calibrate();
206  fc.updateInfo();
207  }
208 
209  cerr << "Import Data" << '\n';
210  auto start2 = system_clock::now();
211  auto data = new fdd<string>(fc, argv[1]);
212  cerr << " Read Time: " << duration_cast<milliseconds>(system_clock::now() - start2).count() << "ms\n";
213 
214  cerr << "Convert to Adjacency List\n";
215  auto structure = data->map<int, vector<int>>(&toAList);
216  cerr << "GBK\n";
217  structure->groupByKey()->cache();
218  fc.updateInfo();
219  cerr << "REDUCE\n";
220 
221  numNodes = structure->reduce(&maxNodeId).first + 1;
222  fc.updateInfo();
223  cerr << numNodes << " node Graph" << '\n';
224 
225  cerr << "Init Pagerank" << '\n';
226  auto pr = structure->map<int, double>(&createPR)->cache();
227  auto errors = structure->map<int, double>(&createErrors)->cache();
228  auto iterationData = structure->cogroup(pr, errors)->cache();
229  fc.updateInfo();
230  double error = 1;
231 
232  cerr << "Process Data" << '\n';
233  int i = 0;
234  while( i < 10 ){
235  cerr << "\033[1;32mIteration " << i++ << "\033[0m\n";
236  auto start2 = system_clock::now();
237  auto contribs = iterationData->bulkFlatMap(&givePageRank);
238  fc.updateInfo();
239  cerr << contribs->getSize() << " (" << contribs->getSize() << ") messages. \n";
240 
241  pr->cogroup(contribs, errors)->bulkUpdate(&getNewPR);
242  error = errors->reduce(&maxError).second;
243  fc.updateInfo();
244  cerr << " Error " << error << " time:" << duration_cast<milliseconds>(system_clock::now() - start2).count() << "ms\n";
245 
246  //auto p = pr->collect();
247  //sort(p.begin(), p.end());
248  //for ( auto i = 0; i < 10 ; i++ ){
249  //fprintf(stderr, "\033[0;32m%d:%.8lf\033[0m\n", p[i].first, p[i].second);
250  //}
251  }
252  start2 = system_clock::now();
253 
254  pr->writeToFile(std::string("/tmp/pr"), std::string(".txt"));
255 
256  auto duration = duration_cast<milliseconds>(system_clock::now() - start).count();
257  cerr << " Write Time: " << duration_cast<milliseconds>(system_clock::now() - start2).count() << "ms\n";
258  cerr << "PageRank in " << structure->getSize() << " node graph in "<< i << " iterations! In " << duration << "ms (error: " << error << ") \n";
259 
260  //cerr << "\033[0;31mPRESS ENTER TO EXIT\033[0m\n";
261  //cin.get();
262 
263  return 0;
264 }
void startWorkers()
Start worker machines computation.
Definition: fastContext.cpp:58
void registerGlobal(T *varP)
Registers a primitive global variable to be used inside user-defined functions in a distributed environment.
Definition: fastContext.h:353
STL namespace.
void registerFunction(void *funcP)
Register a user custom function in the context.
Definition: fastContext.cpp:48
void updateInfo()
Prints information about the tasks run since faster::fastContext::updateInfo() was last called.
bool isDriver()
Checks for the driver process.
Definition: fastContext.cpp:76
Framework context class.
Definition: fastContext.h:66
size_t getSize()
Returns the size of the dataset.
Definition: fddBase.h:24
fdd< U > * map(mapFunctionP< T, U > funcP)
Creates an fdd<U>.
Definition: fdd.h:164
Fast Distributed Dataset (FDD) is like an array distributed across a cluster. This class is the user-side implementation...
Definition: fastContext.h:24
void calibrate()
Performs a microbenchmark to do dynamic load balancing (UNUSED)
Definition: fastContext.cpp:80
libfaster main namespace
Definition: _workerFdd.h:11
void printHeader()
Prints a header for task execution information.