libfaster API Documentation  Development Version
Super fast distributed computing
pagerank.cpp
#include <algorithm>
#include <cmath>
#include <iostream>
#include "libfaster.h"
4 
// Item count constant (100 000). The expansion is parenthesized so the macro
// behaves as a single value inside larger expressions such as `x / NUMITEMS`.
#define NUMITEMS (100*1000)
6 
7 using namespace std;
8 using namespace faster;
9 
10 const double dumpingFactor = 0.85;
11 
12 using std::chrono::system_clock;
13 using std::chrono::duration_cast;
14 using std::chrono::milliseconds;
15 
16 size_t numNodes = 0;
17 
/**
 * @brief Parses one text line into an adjacency-list entry.
 *
 * The line is read as whitespace-separated integers: the first one is the
 * node id, every following one is an edge target.
 *
 * @param input  Line to parse (not modified).
 * @return (node id, edge targets). When the line is empty or does not begin
 *         with an integer, the node id is 0 and the edge list is empty.
 */
std::pair<int, std::vector<int>> toAList(std::string & input){
	std::stringstream ss(input);
	std::vector<int> edges;
	int key = 0;	// stays 0 if nothing can be extracted, instead of being indeterminate
	int edge = 0;

	// Every edge token is preceded by at least one space, so the number of
	// spaces is an upper bound on the number of edges.
	edges.reserve(static_cast<std::size_t>(std::count(input.begin(), input.end(), ' ')));

	ss >> key;

	while (ss >> edge){
		edges.push_back(edge);
	}

	return {key, std::move(edges)};
}
44 
45 pair<int, double> createPR(const int & key, vector<int> & s UNUSED){
46  return make_pair(key, 1.0 / numNodes);
47 }
48 
49 deque<pair<int, double>> givePageRank(const int & key UNUSED, vector<void *> & sl, vector<void *> & prl){
50  auto & s = * (vector<int>*) * sl.begin();
51  auto & pr = * (double*) * prl.begin();
52  deque<pair<int,double>> msgList;
53  double contrib = dumpingFactor * pr / s.size();
54 
55 
56  for ( size_t i = 0; i < s.size(); ++i){
57  //if (s[i] == 1)
58  //cerr << "\033[0;36m" << key << " PR:" << pr << " D:(" << s.size()<< ") S1 : "<< contrib << "\033[0m\n";
59  msgList.push_back(make_pair(s[i], contrib));
60  }
61 
62  return msgList;
63 }
/**
 * @brief Sums all contributions addressed to one node.
 *
 * @param key  Node id, copied into the result.
 * @param prl  Pointers to the individual contributions; every pointer is
 *             dereferenced, so none may be null.
 * @return (key, sum of the pointed-to values); the sum is 0 for an empty list.
 */
std::pair<int, double> combine(const int & key, std::vector<double *> & prl){
	double total = 0.0;

	for (const double * contribution : prl){
		total += *contribution;
	}

	return {key, total};
}
77 
78 double getNewPR(const int & key UNUSED, vector<void *> & prL, vector<void *> & contribL){
79  //cerr << key << " ";
80  double & pr = * (double*) * prL.begin();
81  double oldPR = pr;
82  double sum = 0;
83 
84  //if ( key == 1 )
85  //cerr << "\n[\033[0;36m1 R:\033[0m";
86  for( auto it = contribL.begin(); it != contribL.end(); it++){
87  double contrib = *(double*) *it;
88  sum += contrib;
89  //if ( key == 1 )
90  //cerr<< "\033[0;36m" << contrib << "\033[0m ";
91  }
92  //pr = (1 - dumpingFactor) + dumpingFactor * sum;
93  pr = sum + ( (1.0 - dumpingFactor) / numNodes );
94  //if ( key == 1 )
95  //cerr << "]\n";
96 
97  return abs(oldPR - pr);
98 }
99 
/**
 * @brief Reduction step that finds the largest node id seen so far.
 *
 * @param ka  Key of the first entry.
 * @param a   Edge targets of the first entry.
 * @param kb  Key of the second entry.
 * @param b   Edge targets of the second entry.
 * @return (largest value among ka, kb and all elements of a and b, empty vector).
 */
std::pair<int, std::vector<int>> maxNodeId( const int & ka, std::vector<int> & a, const int & kb, std::vector<int> & b){
	int highest = std::max(ka, kb);

	for (const int id : a){
		highest = std::max(highest, id);
	}
	for (const int id : b){
		highest = std::max(highest, id);
	}

	// Only the key carries information; the edge list of the result is left empty.
	return {highest, std::vector<int>()};
}
/// Reduction step for the per-node errors: returns the larger of the two values.
double maxError( double & a, double & b){
	return (a < b) ? b : a;
}
116 
117 
118 
119 int main(int argc, char ** argv){
120  // Init Faster Framework
121 
122  auto start = system_clock::now();
123 
124  fastContext fc(argc, argv);
125  fc.registerFunction((void*) &toAList, "toAList");
126  fc.registerFunction((void*) &createPR, "createPR");
127  fc.registerFunction((void*) &givePageRank, "givePageRank");
128  fc.registerFunction((void*) &combine, "combine");
129  fc.registerFunction((void*) &getNewPR, "getNewPR");
130  fc.registerFunction((void*) &maxNodeId, "maxNodeId");
131  fc.registerFunction((void*) &maxError, "maxError");
132  fc.registerGlobal(&numNodes);
133  fc.startWorkers();
134  if (!fc.isDriver())
135  return 0;
136  cerr << "------------ PageRank -------------";
137 
138  fc.printHeader();
139  cerr << " Init Time: " << duration_cast<milliseconds>(system_clock::now() - start).count() << "ms\n";
140 
141  if ( (argc > 2) && (argv[2][0] == '1') ){
142  cerr << "Calibrate Performance\n";
143  fc.calibrate();
144  fc.updateInfo();
145  }
146 
147 
148  cerr << "Import Data\n";
149  auto start2 = system_clock::now();
150  auto data = new fdd<string>(fc, argv[1]);
151  cerr << " Read Time: " << duration_cast<milliseconds>(system_clock::now() - start2).count() << "ms\n";
152 
153  cerr << "Convert to Adjacency List\n";
154  auto structure = data->map<int, vector<int>>(&toAList)->groupByKey()->cache();
155  fc.updateInfo();
156 
157  //numNodes = structure->getSize();
158  numNodes = structure->reduce(&maxNodeId).first + 1;
159  fc.updateInfo();
160  cerr << numNodes << " node Graph" << '\n';
161 
162  cerr << "Init Pagerank\n";
163  auto pr = structure->map<int, double>(&createPR)->cache();
164  auto iterationData = structure->cogroup(pr)->cache();
165  double error = 1000;
166  fc.updateInfo();
167 
168  cerr << "Process Data\n";
169  int i = 0;
170  //while( (error >= 1E-4) && (i < 10)){
171  while( i < 10 ){
172  cerr << "\033[1;32mIteration " << i++ << "\033[0m\n" ;
173  start2 = system_clock::now();
174  auto contribs = iterationData->flatMapByKey(&givePageRank);
175  fc.updateInfo();
176 
177  auto combContribs = contribs->mapByKey(&combine);
178  fc.updateInfo();
179 
180  cerr << " " << contribs->getSize() << " (" << combContribs->getSize() << ") messages.\n";
181 
182  error = pr->cogroup(combContribs)->mapByKey(&getNewPR)->reduce(&maxError);
183  fc.updateInfo();
184  cerr << " Error " << error << " time:" << duration_cast<milliseconds>(system_clock::now() - start2).count() << "ms\n";
185 
186  //auto p = pr->collect();
187  //sort(p.begin(), p.end());
188  //for ( auto i = 0; i < 10 ; i++ ){
189  //fprintf(stderr, "\033[0;32m%d:%.8lf\033[0m\n", p[i].first, p[i].second);
190  //}
191  }
192  start2 = system_clock::now();
193 
194  pr->writeToFile(std::string("/tmp/pr"), std::string(".txt"));
195 
196  auto duration = duration_cast<milliseconds>(system_clock::now() - start).count();
197  cerr << " Write Time: " << duration_cast<milliseconds>(system_clock::now() - start2).count() << "ms\n";
198  cerr << "PageRank in " << structure->getSize() << " node graph in "<< i << " iterations! In " << duration << "ms (error: " << error << ") \n";
199 
200  //cerr << "PRESS ENTER TO CONTINUE\n";
201  //cin.get();
202  return 0;
203 }
void startWorkers()
Start worker machines computation.
Definition: fastContext.cpp:58
void registerGlobal(T *varP)
Registers a primitive global variable to be used inside user-defined functions in distributed environ...
Definition: fastContext.h:353
STL namespace.
void registerFunction(void *funcP)
Register a user custom function in the context.
Definition: fastContext.cpp:48
void updateInfo()
Prints information about the tasks run since the last call to faster::fastContext::updateInfo().
bool isDriver()
Checks for the driver process.
Definition: fastContext.cpp:76
Framework context class.
Definition: fastContext.h:66
size_t getSize()
Returns the size of the dataset.
Definition: fddBase.h:24
fdd< U > * map(mapFunctionP< T, U > funcP)
Creates an fdd<U>.
Definition: fdd.h:164
Fast Distributed Dataset (FDD) is like a cluster-distributed array. This class is the user side impl...
Definition: fastContext.h:24
void calibrate()
Performs a microbenchmark to do dynamic load balancing (UNUSED)
Definition: fastContext.cpp:80
libfaster main namespace
Definition: _workerFdd.h:11
void printHeader()
Prints a header for task execution information.