// Damping factor d of the PageRank update: each node starts every iteration with
// the base rank (1 - d) / numNodes and passes d * rank / outDegree to each neighbour
// (see givePageRank).
// NOTE(review): "dumping" is a misspelling of "damping"; renaming it requires
// updating every use in this file.
8 const double dumpingFactor = 0.85;
// Clock helpers used by main() to report per-phase timings in milliseconds.
10 using std::chrono::system_clock;
11 using std::chrono::duration_cast;
12 using std::chrono::milliseconds;
// Map function that turns one input line into a (key, edge list) pair.
// main() applies it via data->map<int, vector<int>>(&toAList).
// Only fragments of the body are visible here. It wraps `input` in a stringstream,
// scans the characters of `input` (presumably to count tokens — TODO confirm),
// reserves numTokens-1 edge slots, appends each parsed edge, and returns the key
// together with the moved edge vector.
// NOTE(review): the first token is presumably the source node id and the remaining
// tokens its out-neighbours; verify against the input file format.
16 pair<int,vector<int>> toAList(
string & input){
18 stringstream ss(input);
25 for (
size_t i = 0; i < input.size(); ++i ){
// One fewer slot than tokens, presumably because one token is the key itself.
31 edges.reserve(numTokens-1);
// Appends at the end; equivalent to edges.push_back(edge).
37 edges.insert(edges.end(), edge);
// Move the edge list into the result instead of copying it.
40 return make_pair(key, std::move(edges));
// Map function giving every node the uniform initial PageRank 1 / numNodes.
// The adjacency list `s` is ignored. main() assigns numNodes (from maxNodeId)
// before running this map, so the division uses the final node count.
43 pair<int, double> createPR(
const int & key, vector<int> & s UNUSED){
44 return make_pair(key, 1.0 / numNodes);
// Map function giving every node an initial error value of 1. The adjacency list
// `s` is ignored. make_pair(key, 1) builds a pair<int,int>, which is implicitly
// converted to the declared pair<int, double>.
// NOTE(review): givePageRank and getNewPR presumably overwrite this value on
// every iteration.
47 pair<int, double> createErrors(
const int & key, vector<int> & s UNUSED){
48 return make_pair(key, 1);
// Bulk flat-map step of one PageRank iteration. main() runs it on the local
// partition of structure->cogroup(pr, errors).
//
// keys, adjListP : numPresNodes node ids; adjList[i] holds the out-neighbours of keys[i].
// prKeys, prP    : node ids and current PageRank values (expected count nPR).
// eKeys, errorsP : error slots (expected count nErrors); eKeys itself is not read.
// Returns          (target node, summed contribution) messages for every non-zero
//                  entry of givePR.
//
// Each local node sends dumpingFactor * pr / outDegree to every neighbour. If the
// neighbour is also in this partition (present[target]), the contribution is added
// directly to newPR. Otherwise it is accumulated in givePR and later sent as a
// message. The else-branch lines are not visible here; this split is presumed —
// TODO confirm.
// Finally errors[i] = newPR[i] - pr[i], the local part of the rank change.
51 deque<pair<int, double>> givePageRank(
int * keys,
void * adjListP,
size_t numPresNodes,
int * prKeys,
void * prP,
size_t nPR UNUSED,
int * eKeys UNUSED,
void * errorsP,
size_t nErrors UNUSED){
52 auto adjList = (vector<int>*) adjListP;
53 auto pr = (
double*) prP;
54 auto errors = (
double*) errorsP;
55 deque<pair<int,double>> msgList;
// Outgoing contributions, indexed by global node id. This is sized numNodes, while
// the tables below use numNodes+1. Node ids are assumed to be < numNodes, which
// holds when main() sets numNodes = max id + 1.
56 vector<double> givePR(numNodes,0);
// prLocation: node id -> index into pr/prKeys. present: node id -> local to this partition.
58 vector<size_t> prLocation(numNodes+1);
// NOTE(review): std::vector<bool> is bit-packed, and it is written concurrently in
// the omp parallel-for below. Writes to neighbouring ids touch the same word, which
// is a data race. vector<char> (or similar) avoids this.
59 vector<bool> present(numNodes+1,
false);
// New ranks in pr order (indexed through prLocation), seeded with the base term (1 - d) / N.
60 vector<double> newPR(numPresNodes, (1 - dumpingFactor) / numNodes);
// The three cogrouped arrays must have the same length; the loops below index
// prKeys and errors with i < numPresNodes. nPR and nErrors are read here despite
// being tagged UNUSED. The handling after the message is not visible.
63 if ( (numPresNodes != nPR) || ( nPR != nErrors ) ) {
64 cerr <<
"Internal unknown error!!!!";
// Build the lookup tables for this partition.
68 #pragma omp parallel for 69 for (
size_t i = 0; i < numPresNodes; ++i){
70 present[keys[i]] =
true;
71 prLocation[prKeys[i]] = i;
// Scatter contributions. The dynamic schedule balances nodes of very different out-degree.
// A node with no out-edges divides by zero here, but the inner loop then does
// not run, so its rank mass is simply not redistributed.
74 #pragma omp parallel for schedule(dynamic, 200) 76 for (
size_t i = 0; i < numPresNodes; ++i){
77 double contrib = dumpingFactor * pr[prLocation[keys[i]]] / adjList[i].size();
78 for (
size_t j = 0; j < adjList[i].size(); ++j){
79 auto target = adjList[i][j];
88 if ( present[target] ){
89 auto l = prLocation[target];
// NOTE(review): several threads can update the same target. Unless a line not
// visible here holds `#pragma omp atomic`, both += updates below are data races.
91 newPR[ l ] += contrib;
94 givePR[target] += contrib;
// Serially collect the non-zero outgoing sums as (target, contribution) messages.
99 for (
size_t i = 0; i < numNodes; ++i){
100 if ( givePR[i] != 0 ){
101 auto p = make_pair(i, givePR[i]);
103 msgList.push_back(p);
// NOTE(review): errors is indexed by i directly (eKeys is not consulted), so this
// assumes the errors array is stored in the same key order as pr — TODO confirm.
108 #pragma omp parallel for 109 for (
size_t i = 0; i < numPresNodes; ++i){
110 errors[i] = newPR[i] - pr[i];
// Bulk update step of one PageRank iteration. main() runs it on
// pr->cogroup(contribs, errors).
//
// prKeys, prVP        : npr node ids and their PageRank values.
// contKeys, contribVP : numContribs (target, contribution) messages from givePageRank.
// errorVP             : per-node error values (expected count nErrors); eKeys is not read.
//
// Sums the incoming contributions per local node into newPr, then sets
// error[i] = |error[i] + newPr[i]|. This combines the local change computed in
// givePageRank with the remote contributions. `pr` is not written on the visible
// lines; it is presumably updated on a line not shown here — TODO confirm.
119 void getNewPR(
int * prKeys,
void * prVP,
size_t npr,
int * contKeys,
void * contribVP,
size_t numContribs,
int * eKeys UNUSED,
void * errorVP,
size_t nErrors UNUSED){
120 double * pr = (
double*) prVP;
121 double * contrib = (
double*) contribVP;
122 double * error = (
double*) errorVP;
// Incoming contribution sum per local node, in pr order.
124 vector<double> newPr(npr, 0);
// node id -> index into pr. Value-initialised to 0, so an id absent from prKeys maps to index 0.
125 vector<size_t> prLocation(numNodes+1);
// nErrors is read here despite being tagged UNUSED. The handling after the message is not visible.
127 if ( npr != nErrors ) {
128 cerr <<
"Internal unknown error!!!!!!";
132 #pragma omp parallel for 133 for (
size_t i = 0; i < npr; ++i ){
134 prLocation[prKeys[i]] = i;
// NOTE(review): messages from several source partitions can target the same node,
// so the += below races unless a line not visible here holds `#pragma omp atomic`.
// A target missing from prKeys would silently be credited to index 0; cogroup
// presumably guarantees every target is local — confirm.
137 #pragma omp parallel for 138 for (
size_t i = 0; i < numContribs; ++i){
139 auto target = contKeys[i];
140 size_t targetPrLoc = prLocation[target];
141 double cont = contrib[i];
144 newPr[targetPrLoc] += cont;
// NOTE(review): make sure this resolves to the double overload (std::abs from
// <cmath>). The C abs(int) would truncate the error to an integer.
149 #pragma omp parallel for 150 for (
size_t i = 0; i < npr; ++i ){
152 error[i] = abs(error[i] + newPr[i]);
// Reduce function over (node id, adjacency list) pairs. main() uses
// reduce(&maxNodeId).first + 1 as numNodes.
// The visible fragments iterate over both neighbour lists a and b. The lines that
// compute `m` and `vout` are not shown. Presumably m is the maximum of ka, kb and
// every neighbour id — TODO confirm.
// (Minor: ++it is the preferred increment for iterators.)
158 pair<int,vector<int>> maxNodeId(
const int & ka, vector<int> & a,
const int & kb, vector<int> & b){
162 for(
auto it = a.begin(); it != a.end(); it++){
165 for(
auto it = b.begin(); it != b.end(); it++){
169 return make_pair(m, vout);
// Reduce function over (node id, error) pairs. main() takes
// reduce(&maxError).second as the iteration's error.
// It returns whichever pair wins the comparison. The comparison line is not
// visible here; it is presumably `a > b`, making this a max-reduce — TODO confirm.
172 pair<int,double> maxError(
const int & ka,
double & a,
const int & kb,
double & b){
174 return make_pair(ka,a);
176 return make_pair(kb,b);
// Driver for the PageRank run. It imports the graph, builds adjacency lists,
// initialises ranks and errors, iterates givePageRank/getNewPR, and writes the
// final ranks. Timings for each phase are reported on stderr in milliseconds.
181 int main(
int argc,
char ** argv){
// Wall-clock start of the whole run; used for "Init Time" and the final total.
184 auto start = system_clock::now();
198 cerr <<
"------------ PageRank -------------\n";
201 cerr <<
" Init Time: " << duration_cast<milliseconds>(system_clock::now() - start).count() <<
"ms\n";
// A second argument starting with '1' selects the performance-calibration path
// (its body is not visible here).
203 if ( (argc > 2) && (argv[2][0] ==
'1') ){
204 cerr <<
"Calibrate Performance\n";
209 cerr <<
"Import Data" <<
'\n';
// Times the data import. The code that creates `data` is on lines not shown here.
210 auto start2 = system_clock::now();
212 cerr <<
" Read Time: " << duration_cast<milliseconds>(system_clock::now() - start2).count() <<
"ms\n";
214 cerr <<
"Convert to Adjacency List\n";
// Parse every input line into (node, out-neighbours).
215 auto structure = data->
map<int, vector<int>>(&toAList);
// NOTE(review): the returned pointer is discarded, so groupByKey()/cache() presumably
// act on `structure` in place — confirm against the fdd API.
217 structure->groupByKey()->cache();
// numNodes = largest node id + 1. It must be set before createPR runs, because createPR divides by it.
221 numNodes = structure->reduce(&maxNodeId).first + 1;
223 cerr << numNodes <<
" node Graph" <<
'\n';
225 cerr <<
"Init Pagerank" <<
'\n';
// Initial rank 1/numNodes and error 1 per node. iterationData groups each node's
// adjacency list with its rank and error for givePageRank.
226 auto pr = structure->
map<int,
double>(&createPR)->cache();
227 auto errors = structure->map<int,
double>(&createErrors)->cache();
228 auto iterationData = structure->cogroup(pr, errors)->cache();
232 cerr <<
"Process Data" <<
'\n';
// Iteration body. The loop header and stop condition are not visible here;
// presumably the loop runs until `error` falls below a threshold — TODO confirm.
235 cerr <<
"\033[1;32mIteration " << i++ <<
"\033[0m\n";
236 auto start2 = system_clock::now();
// Each partition emits (target, contribution) messages for non-local neighbours.
237 auto contribs = iterationData->bulkFlatMap(&givePageRank);
// NOTE(review): both numbers printed here are contribs->getSize(). The parenthesised
// value was probably meant to be a different count — confirm.
239 cerr << contribs->
getSize() <<
" (" << contribs->getSize() <<
") messages. \n";
// Apply the incoming contributions and finish the per-node errors. The largest
// error is reported for this iteration.
241 pr->cogroup(contribs, errors)->bulkUpdate(&getNewPR);
242 error = errors->reduce(&maxError).second;
244 cerr <<
" Error " << error <<
" time:" << duration_cast<milliseconds>(system_clock::now() - start2).count() <<
"ms\n";
// Time the final write of the ranks. The prefix is "/tmp/pr" and the suffix ".txt";
// the exact file naming depends on writeToFile.
252 start2 = system_clock::now();
254 pr->writeToFile(std::string(
"/tmp/pr"), std::string(
".txt"));
// Total run time from `start`; this includes the write above.
256 auto duration = duration_cast<milliseconds>(system_clock::now() - start).count();
257 cerr <<
" Write Time: " << duration_cast<milliseconds>(system_clock::now() - start2).count() <<
"ms\n";
// NOTE(review): this reports structure->getSize() (number of keys in the adjacency
// dataset), which may differ from numNodes if some ids only appear as neighbours.
258 cerr <<
"PageRank in " << structure->getSize() <<
" node graph in "<< i <<
" iterations! In " << duration <<
"ms (error: " << error <<
") \n";
void startWorkers()
Starts the computation on the worker machines.
void registerGlobal(T *varP)
Registers a primitive global variable to be used inside user-defined functions in distributed environ...
void registerFunction(void *funcP)
Registers a user-defined custom function in the context.
void updateInfo()
Prints information about the tasks run since faster::fastContext::updateInfo() was last called.
bool isDriver()
Checks whether the current process is the driver process.
size_t getSize()
Returns the size of the dataset.
fdd< U > * map(mapFunctionP< T, U > funcP)
creates a fdd<U>
Fast Distributed Dataset (FDD) is like an array distributed across a cluster. This class is the user side impl...
void calibrate()
Performs a microbenchmark to do dynamic load balancing (UNUSED)
void printHeader()
Prints a header for task execution information.