libfaster API Documentation  Development Version
Super fast distributed computing
worker.cpp
1 #include <string>
2 #include <fstream>
3 #include <iostream>
4 #include <chrono>
5 
6 #include "fastComm.h"
7 #include "fastCommBuffer.h"
8 #include "workerFdd.h"
9 #include "worker.h"
10 
11 faster::worker::worker(fastComm * c, void ** ft, std::vector< std::tuple<void*, size_t, int> > & globalTable){
12  //std::cerr << " Starting Worker " << c->getProcId() << '\n';
13  funcTable = ft;
14  comm = c;
15  finished = false;
16  fddList.reserve(1024);
17  this->globalTable = &globalTable;
18 }
19 
20 faster::worker::~worker(){
21 }
22 
23 
24 void faster::worker::discardFDD(unsigned long int id){
25  delete fddList[id];
26  fddList[id] = NULL;
27 }
28 
29 void faster::worker::setFDDData(unsigned long int id, void * data, size_t size){
30  workerFddBase * fdd = fddList[id];
31 
32  if (fdd == NULL) { std::cerr << "\nERROR: Could not find FDD!"; exit(201); }
33 
34  fdd->setDataRaw( data, size );
35 }
36 void faster::worker::setFDDIData(unsigned long int id, void * keys, void * data, size_t size){
37  workerFddBase * fdd = fddList[id];
38 
39  if (fdd == NULL) { std::cerr << "\nERROR: Could not find FDD!"; exit(201); }
40 
41  fdd->setDataRaw( keys, data, size );
42 }
43 
44 void faster::worker::setFDDData(unsigned long int id, void * data, size_t * lineSizes, size_t size){
45  workerFddBase * fdd = fddList[id];
46 
47  if (fdd == NULL) { std::cerr << "\nERROR: Could not find FDD!"; exit(201); }
48 
49  fdd->setDataRaw( data, lineSizes, size );
50 }
51 
52 void faster::worker::setFDDIData(unsigned long int id, void * keys, void * data, size_t * lineSizes, size_t size){
53  workerFddBase * fdd = fddList[id];
54 
55  if (fdd == NULL) { std::cerr << "\nERROR: Could not find FDD!"; exit(201); }
56 
57  fdd->setDataRaw( keys, data, lineSizes, size );
58 }
59 
60 /*void faster::worker::getFDDData(unsigned long int id, void *& data, size_t &size){
61  workerFddBase * fdd = fddList[id];
62 
63  if (fdd == NULL) { std::cerr << "\nERROR: Could not find FDD!"; exit(201); }
64 
65  data = fdd->getData();
66  size = fdd->getSize();
67 }*/
68 
69 
70 void faster::worker::calibrate(){
71  using std::chrono::system_clock;
72  using std::chrono::duration_cast;
73  using std::chrono::milliseconds;
74 
75  fastCommBuffer &buffer = comm->getResultBuffer();
76  const int TESTVECSIZE = 1000000;
77  char ret = 0;
78 
79  buffer.reset();
80  buffer << size_t(0);
81 
82  auto start = system_clock::now();
83  auto end = system_clock::now();
84 
85  #pragma omp parallel
86  {
87  std::vector<double> v(TESTVECSIZE, 0);
88  double a = 0;
89 
90 
91  #pragma omp master
92  start = system_clock::now();
93 
94  #pragma omp barrier
95 
96  #pragma omp for schedule(static,100)
97  for ( size_t i = 0; i < 4000000; ++i){
98  for ( size_t j = 1; j < 5; ++j){
99  a += i + j;
100  a -= i - j;
101  a *= i * j;
102  a /= 1 + i / j;
103  }
104  a += v[ size_t( i * 20011 ) % TESTVECSIZE ];
105  v [ size_t( i * 4099 ) % TESTVECSIZE ] = a;
106  }
107 
108  #pragma omp master
109  end = system_clock::now();
110  }
111  auto duration = duration_cast<milliseconds>(end - start);
112 
113  buffer << size_t(duration.count());
114  buffer << getProcStat();
115  buffer << size_t(1);
116  buffer << ret;
117 
118  comm->sendTaskResult();
119 }
120 
121 void faster::worker::writeFDDFile(unsigned long int id, std::string &path, std::string &sufix){
122  using std::chrono::system_clock;
123  using std::chrono::duration_cast;
124  using std::chrono::milliseconds;
125 
126  char ret = 0;
127  fastCommBuffer &buffer = comm->getResultBuffer();
128  workerFddBase * src = fddList[id];
129 
130  buffer.reset();
131  buffer << size_t(0);
132 
133  auto start = system_clock::now();
134  auto end = system_clock::now();
135 
136  src->writeToFile(&path, comm->getProcId(), &sufix);
137 
138  auto duration = duration_cast<milliseconds>(end - start);
139 
140  buffer << size_t(duration.count());
141  buffer << getProcStat();
142  buffer << size_t(1);
143  buffer << ret;
144 
145  comm->sendTaskResult();
146 }
147 
148 void resizeVector(std::tuple<void *, size_t, int> & var UNUSED){
149 }
150 
151 void faster::worker::updateGlobals(fastTask &task){
152  for ( size_t i = 0; i < task.globals.size(); i++){
153  int type = std::get<2>(task.globals[i]);
154  size_t s = std::get<1>(task.globals[i]);
155 
156  if (type & VECTOR){
157  resizeVector((*globalTable)[i]);
158  }
159  if (type & POINTER){
160  char ** v = (char **) std::get<0>((*globalTable)[i]);
161  if (*v == NULL)
162  (*v) = new char[s];
163  std::memcpy(
164  *v,
165  std::get<0>(task.globals[i]),
166  s
167  );
168  }else{
169  std::memcpy(
170  std::get<0>((*globalTable)[i]),
171  std::get<0>(task.globals[i]),
172  s
173  );
174  }
175 
176  delete [] (char*) std::get<0>(task.globals[i]);
177  }
178 }
179 
180 void faster::worker::solve(fastTask &task){
181  updateGlobals(task);
182 
183  if (task.operationType == OP_Calibrate){
184  calibrate();
185  return;
186  }
187 
188  workerFddBase * src = fddList[task.srcFDD];
189  workerFddBase * dest = NULL;
190 
191  if (src == NULL) { std::cerr << "\nERROR: Could not find FDD " << task.srcFDD << " !"; exit(201); }
192 
193  if ( task.operationType & OP_GENERICMAP)
194  dest = fddList[task.destFDD];
195 
196  if ( task.functionId != -1 )
197  src->preapply(task.id, funcTable[task.functionId], task.operationType, dest, comm);
198  else
199  src->preapply(task.id, NULL, task.operationType, dest, comm);
200 }
201 
202 void faster::worker::collect(unsigned long int id){
203  workerFddBase * fdd = fddList[id];
204 
205  if (fdd == NULL) { std::cerr << "\nERROR: Could not find FDD " << id << " !"; exit(202); }
206 
207  fdd->collect(comm);
208 }