5 #define NUMITEMS 100*1000 10 const double dumpingFactor = 0.85;
12 using std::chrono::system_clock;
13 using std::chrono::duration_cast;
14 using std::chrono::milliseconds;
18 pair<int,vector<int>> toAList(
string & input){
20 stringstream ss(input);
27 for (
size_t i = 0; i < input.size(); ++i ){
33 edges.reserve(numTokens-1);
39 edges.insert(edges.end(), edge);
42 return make_pair(key, std::move(edges));
45 pair<int, double> createPR(
const int & key, vector<int> & s UNUSED){
46 return make_pair(key, 1.0 / numNodes);
49 deque<pair<int, double>> givePageRank(
const int & key UNUSED, vector<void *> & sl, vector<void *> & prl){
50 auto & s = * (vector<int>*) * sl.begin();
51 auto & pr = * (
double*) * prl.begin();
52 deque<pair<int,double>> msgList;
53 double contrib = dumpingFactor * pr / s.size();
56 for (
size_t i = 0; i < s.size(); ++i){
59 msgList.push_back(make_pair(s[i], contrib));
64 pair<int, double> combine(
const int & key, vector<double *> & prl){
71 for (
auto it = prl.begin(); it != prl.end(); it++){
78 double getNewPR(
const int & key UNUSED, vector<void *> & prL, vector<void *> & contribL){
80 double & pr = * (
double*) * prL.begin();
86 for(
auto it = contribL.begin(); it != contribL.end(); it++){
87 double contrib = *(
double*) *it;
93 pr = sum + ( (1.0 - dumpingFactor) / numNodes );
97 return abs(oldPR - pr);
100 pair<int,vector<int>> maxNodeId(
const int & ka, vector<int> & a,
const int & kb, vector<int> & b){
104 for(
auto it = a.begin(); it != a.end(); it++){
107 for(
auto it = b.begin(); it != b.end(); it++){
111 return make_pair(m, vout);
113 double maxError(
double & a,
double & b){
119 int main(
int argc,
char ** argv){
122 auto start = system_clock::now();
136 cerr <<
"------------ PageRank -------------";
139 cerr <<
" Init Time: " << duration_cast<milliseconds>(system_clock::now() - start).count() <<
"ms\n";
141 if ( (argc > 2) && (argv[2][0] ==
'1') ){
142 cerr <<
"Calibrate Performance\n";
148 cerr <<
"Import Data\n";
149 auto start2 = system_clock::now();
151 cerr <<
" Read Time: " << duration_cast<milliseconds>(system_clock::now() - start2).count() <<
"ms\n";
153 cerr <<
"Convert to Adjacency List\n";
154 auto structure = data->
map<int, vector<int>>(&toAList)->groupByKey()->cache();
158 numNodes = structure->reduce(&maxNodeId).first + 1;
160 cerr << numNodes <<
" node Graph" <<
'\n';
162 cerr <<
"Init Pagerank\n";
163 auto pr = structure->map<int,
double>(&createPR)->cache();
164 auto iterationData = structure->cogroup(pr)->cache();
168 cerr <<
"Process Data\n";
172 cerr <<
"\033[1;32mIteration " << i++ <<
"\033[0m\n" ;
173 start2 = system_clock::now();
174 auto contribs = iterationData->flatMapByKey(&givePageRank);
177 auto combContribs = contribs->mapByKey(&combine);
180 cerr <<
" " << contribs->
getSize() <<
" (" << combContribs->getSize() <<
") messages.\n";
182 error = pr->cogroup(combContribs)->mapByKey(&getNewPR)->reduce(&maxError);
184 cerr <<
" Error " << error <<
" time:" << duration_cast<milliseconds>(system_clock::now() - start2).count() <<
"ms\n";
192 start2 = system_clock::now();
194 pr->writeToFile(std::string(
"/tmp/pr"), std::string(
".txt"));
196 auto duration = duration_cast<milliseconds>(system_clock::now() - start).count();
197 cerr <<
" Write Time: " << duration_cast<milliseconds>(system_clock::now() - start2).count() <<
"ms\n";
198 cerr <<
"PageRank in " << structure->getSize() <<
" node graph in "<< i <<
" iterations! In " << duration <<
"ms (error: " << error <<
") \n";
void startWorkers()
Starts the computation on the worker machines.
void registerGlobal(T *varP)
Registers a primitive global variable to be used inside user-defined functions in distributed environ...
void registerFunction(void *funcP)
Register a user custom function in the context.
void updateInfo()
Prints information about the tasks run since the last call to faster::fastContext::updateInfo().
bool isDriver()
Checks for the driver process.
size_t getSize()
Returns the size of the dataset.
fdd< U > * map(mapFunctionP< T, U > funcP)
Creates an fdd<U>.
Fast Distributed Dataset (FDD) is like a cluster-distributed array. This class is the user side impl...
void calibrate()
Performs a microbenchmark to do dynamic load balancing (UNUSED)
void printHeader()
Prints a header for task execution information.