libfaster API Documentation  Development Version
Super fast distributed computing
fastContext.cpp
1 #include <chrono>
2 #include <fstream>
3 
4 #include "worker.h"
5 #include "fastContext.h"
6 #include "misc.h"
7 
8 faster::fastContext::fastContext( int argc, char ** argv): fastContext(fastSettings(), argc, argv){
9 }
10 
11 // Create a context with local as master
12 faster::fastContext::fastContext(const fastSettings & s, int argc, char ** argv){
13 
14  settings = new fastSettings(s);
15  comm = new fastComm(argc, argv );
16  scheduler = new fastScheduler( comm->numProcs, & funcName);
17  numFDDs = 0;
18  //numTasks = 0;
19 
20 }
21 
22 
24  // Tell workers to go home!
25  //std::cerr << " S:FINISH! ";
26  if(comm->isDriver()){
27  comm->sendFinish();
28 
29  // Delete FDDs TODO - Find out how to do this!!!!
30  //std::cerr << " DESTROY fastContext\n";
31  for ( size_t i = 0; i < fddList.size(); i++){
32  //std::cerr << i << " - "<< fddList[i]->getId() << " ";
33  if (fddInternal[i]){
34  //std::cerr << " DELETING\n";
35  delete fddList[i];
36  }
37  }
38 
39  // Clean process
40  fddList.clear();
41  }
42  //taskList.clear();
43  delete comm;
44  delete settings;
45  delete scheduler;
46 }
47 
49  funcTable.insert(funcTable.end(), funcP);
50  funcName.insert(funcName.end(), "");
51 }
52 
53 void faster::fastContext::registerFunction(void * funcP, std::string name){
54  funcTable.insert(funcTable.end(), funcP);
55  funcName.insert(funcName.end(), name);
56 }
57 
59  // Create a Worker context and exit after finished
60  if ( ! comm->isDriver() ){
61  // Start worker role
62  worker worker(comm, funcTable.data(), globalTable);
63 
64  worker.run();
65 
66  //auto id = comm->getProcId();
67  // Clean process
68  //delete comm;
69  //comm = NULL;
70  //delete settings;
71  //settings = NULL;
72  //exit(0);
73  }// */
74 
75 }
77  return comm->isDriver();
78 }
79 
81  using std::chrono::system_clock;
82  using std::chrono::duration_cast;
83  using std::chrono::milliseconds;
84 
85  unsigned long int rid;
86  unsigned long int sid = 0;
87  size_t size;
88  std::vector<size_t> time(comm->numProcs, 0);
89 
90  auto start = system_clock::now();
91  unsigned long int tid = enqueueTask(OP_Calibrate, 0, 1000);
92 
93  for (int i = 1; i < comm->numProcs; ++i){
94  size_t t = 0;
95  procstat stat;
96  void * result UNUSED = comm->recvTaskResult(rid, sid, size, t, stat);
97  time[sid] = t;
98  scheduler->taskProgress(tid, sid, t, stat);
99  }
100  comm->waitForReq(comm->numProcs - 1);
101 
102  auto duration = duration_cast<milliseconds>(system_clock::now() - start);
103  scheduler->taskFinished(tid, duration.count());
104 
105  scheduler->setCalibration(time);
106 }
107 
108 int faster::fastContext::findFunc(void * funcP){
109  //std::cerr << " Find Function " << funcP ;
110  for( size_t i = 0; i < funcTable.size(); ++i){
111  if (funcTable[i] == funcP)
112  return i;
113  }
114  return -1;
115 }
116 
117 
118 unsigned long int faster::fastContext::_createFDD(fddBase * ref, fddType type, const std::vector<size_t> * dataAlloc){
119 
120  //std::cerr << " Create FDD\n";
121  for (int i = 1; i < comm->numProcs; ++i){
122  if (dataAlloc){
123  //std::cerr << " S:CreateFdd ID:" << numFDDs << " T:" << type << " S:" << (*dataAlloc)[i] ;
124  comm->sendCreateFDD(numFDDs, type, (*dataAlloc)[i], i);
125  //std::cerr << ".\n";
126  }else{
127  //std::cerr << " S:CreateFdd ID:" << numFDDs << " T:" << type << " S: ?";
128  comm->sendCreateFDD(numFDDs, type, 0, i);
129  //std::cerr << ".\n";
130  }
131 
132  }
133  fddList.push_back(ref);
134  fddInternal.push_back(false);
135  comm->waitForReq(comm->numProcs - 1);
136  //std::cerr << " Done\n";
137 
138  return numFDDs++;
139 }
140 
141 unsigned long int faster::fastContext::_createIFDD(fddBase * ref, fddType kType, fddType tType, const std::vector<size_t> * dataAlloc){
142 
143  //std::cerr << " Create IFDD\n";
144  for (int i = 1; i < comm->numProcs; ++i){
145  //std::cerr << " S:CreateIFdd ID:" << numFDDs << " K:" << kType << " T:" << tType << " S:" << dataPerProc <<'\n';
146  if (dataAlloc != NULL){
147  //std::cerr << " S:CreateIFdd ID:" << numFDDs << " K:" << kType << " T:" << tType << " S:" << (*dataAlloc)[i];
148  comm->sendCreateIFDD(numFDDs, kType, tType, (*dataAlloc)[i], i);
149  //std::cerr << ".\n";
150  }else{
151  //std::cerr << " S:CreateIFdd ID:" << numFDDs << " K:" << kType << " T:" << tType << " S:? ";
152  comm->sendCreateIFDD(numFDDs, kType, tType, 0, i);
153  //std::cerr << ".\n";
154  }
155  }
156  fddList.push_back(ref);
157  fddInternal.push_back(false);
158  comm->waitForReq(comm->numProcs - 1);
159  //std::cerr << " Done\n";
160 
161  return numFDDs++;
162 }
163 
164 // TODO CHANGE THIS!
165 unsigned long int faster::fastContext::createFDD(fddBase * ref, size_t typeCode, const std::vector<size_t> & dataAlloc){
166  return _createFDD(ref, decodeType(typeCode), &dataAlloc);
167 }
168 unsigned long int faster::fastContext::createFDD(fddBase * ref, size_t typeCode){
169  return _createFDD(ref, decodeType(typeCode), NULL);
170 }
171 unsigned long int faster::fastContext::createPFDD(fddBase * ref, size_t typeCode, const std::vector<size_t> & dataAlloc){
172  return _createFDD(ref, POINTER | decodeType(typeCode), &dataAlloc);
173 }
174 unsigned long int faster::fastContext::createPFDD(fddBase * ref, size_t typeCode){
175  return _createFDD(ref, POINTER | decodeType(typeCode), NULL);
176 }
177 unsigned long int faster::fastContext::createIFDD(fddBase * ref, size_t kTypeCode, size_t tTypeCode, const std::vector<size_t> & dataAlloc){
178  return _createIFDD(ref, decodeType(kTypeCode), decodeType(tTypeCode), &dataAlloc);
179 }
180 unsigned long int faster::fastContext::createIFDD(fddBase * ref, size_t kTypeCode, size_t tTypeCode){
181  return _createIFDD(ref, decodeType(kTypeCode), decodeType(tTypeCode), NULL);
182 }
183 unsigned long int faster::fastContext::createIPFDD(fddBase * ref, size_t kTypeCode, size_t tTypeCode, const std::vector<size_t> & dataAlloc){
184  return _createIFDD(ref, decodeType(kTypeCode), POINTER | decodeType(tTypeCode), &dataAlloc);
185 }
186 unsigned long int faster::fastContext::createIPFDD(fddBase * ref, size_t kTypeCode, size_t tTypeCode){
187  return _createIFDD(ref, decodeType(kTypeCode), POINTER | decodeType(tTypeCode), NULL);
188 }
189 
190 unsigned long int faster::fastContext::createFddGroup(fddBase * ref, std::vector<fddBase*> & fddV){
191  std::vector<unsigned long int> members(fddV.size());
192  std::vector<fddType> dataTypeV(fddV.size());
193 
194  fddType kType = fddV[0]->kType();
195 
196  for( size_t i = 0; i < fddV.size(); ++i){
197  members[i] = fddV[i]->getId();
198  }
199 
200  comm->sendCreateFDDGroup( numFDDs, kType, members);
201 
202  //std::cerr << " S:CreateFddGroup ID:" << numFDDs << '\n';
203 
204  fddList.push_back(ref);
205  fddInternal.push_back(false);
206  return numFDDs++;
207 }
208 
209 
210 
// Returns the size in bytes of the file at `filename`.
//
// Returns 0 when `filename` is null, the file cannot be opened, or its end
// position cannot be determined. Without these checks a failed tellg() (-1)
// would be converted to size_t and reported as an enormous file size.
size_t findFileSize(const char* filename)
{
    if (filename == nullptr)
        return 0;

    std::ifstream in(filename, std::ifstream::in | std::ifstream::binary);
    if (!in.is_open())
        return 0;

    // The end-of-file position of a binary stream is the file length.
    in.seekg(0, std::ifstream::end);
    const std::streamoff endPos = in.tellg();
    if (endPos < 0)
        return 0;

    return static_cast<size_t>(endPos);
}
218 
219 unsigned long int faster::fastContext::readFDD(fddBase * ref, const char * fileName){
220  //send read fdd n. numFdds from file fileName
221  size_t fileSize = findFileSize(fileName);
222  size_t offset = 0;
223  std::vector<size_t> dataAlloc = getAllocation(fileSize);
224 
225  for (int i = 1; i < comm->numProcs; ++i){
226  //size_t dataPerProc = fileSize / (comm->numProcs - 1);
227  //int rem = fileSize % (comm->numProcs -1);
228  //if (i <= rem)
229  //dataPerProc += 1;
230  //comm->sendReadFDDFile(ref->getId(), std::string(fileName), dataPerProc, offset, i);
231  //offset += dataPerProc;
232  comm->sendReadFDDFile(ref->getId(), std::string(fileName), dataAlloc[i], offset, i);
233  offset += dataAlloc[i];
234  }
235 
236  //std::cerr << " S:ReadFdd";
237  comm->waitForReq(comm->numProcs - 1);
238  //std::cerr << '\n';
239 
240  return numFDDs++;
241 }
242 
243 void faster::fastContext::getFDDInfo(size_t & s, std::vector<size_t> & dataAlloc){
244  s = 0;
245  dataAlloc = std::vector<size_t>(comm->numProcs,0);
246 
247  for (int i = 1; i < comm->numProcs; ++i){
248  size_t size;
249  int src;
250 
251  //std::cerr << " R:GetFDDInfo ";
252  comm->recvFDDInfo(size, src);
253  dataAlloc[src] = size;
254 
255  //std::cerr << "S:" << size << "\n";
256  s += size;
257  }
258 }
259 
260 void faster::fastContext::writeToFile(unsigned long int id,std::string & path, std::string & sufix){
261  unsigned long int tid, sid;
262  system_clock::time_point start;
263 
264  comm->sendWriteFDDFile(id, path, sufix);
265 
266  for ( int i = 1; i < comm->numProcs; i++ ){
267  size_t s;
268  size_t time;
269  procstat stat;
270  comm->recvTaskResult(tid, sid, s, time, stat);
271  }
272 }
273 
274 unsigned long int faster::fastContext::enqueueTask(fddOpType opT, unsigned long int idSrc, unsigned long int idRes, int funcId, size_t size){
275  fastTask * newTask = scheduler->enqueueTask(opT, idSrc, idRes, funcId, size, globalTable);
276 
277  //if(scheduler->dataMigrationNeeded()){
278  //comm->migrateData(scheduler->getDataMigrationInfo());
279  // scheduler->getDataMigrationInfo();
280  //}
281 
282  // TODO do this later on a shceduler?
283  comm->sendTask(*newTask);
284  //std::cerr << " S:Task ID:" << newTask->id << " FDD:" << idSrc << " F:" << funcId << '\n';
285 
286  return newTask->id;
287 }
288 unsigned long int faster::fastContext::enqueueTask(fddOpType opT, unsigned long int id, size_t size){
289  return enqueueTask(opT, id, 0, -1, size);
290 }
291 
292 std::vector< std::pair<void *, size_t> > faster::fastContext::recvTaskResult(unsigned long int &tid, unsigned long int &sid, system_clock::time_point & start){
293  using std::chrono::duration_cast;
294  using std::chrono::milliseconds;
295 
296  std::vector< std::pair<void*, size_t> > result ( comm->numProcs );
297  //std::cerr << " R:TaskResult \n";
298 
299  //start = system_clock::now();
300  for ( int i = 1; i < comm->numProcs; i++ ){
301  size_t time;
302  size_t size;
303  procstat stat;
304 
305  void * r = comm->recvTaskResult(tid, sid, size, time, stat);
306  result[sid].first = r;
307  result[sid].second = size;
308  //std::cerr << " P: " << sid << "\n";
309 
310  scheduler->taskProgress(tid, sid, time, stat);
311  }
312  comm->waitForReq(comm->numProcs - 1);
313 
314  auto duration = duration_cast<milliseconds>(system_clock::now() - start);
315  scheduler->taskFinished(tid, duration.count());
316 
317 
318  //taskList[id]->workersFinished++;
319 
320  return result;
321 }
322 
323 std::vector<size_t> faster::fastContext::getAllocation(size_t size){
324  return scheduler->getAllocation(size);
325 }
326 
327 void faster::fastContext::discardFDD(unsigned long int id){
328  comm->sendDiscardFDD(id);
329 }
330 
332  scheduler->updateTaskInfo();
333 }
335  scheduler->printHeader();
336 }
338  //comm->getHostnames();
339  scheduler->printTaskInfo();
340 }
341 
342 
unsigned int fddOpType
Dataset operation type.
Definition: definitions.h:41
void startWorkers()
Start worker machines computation.
Definition: fastContext.cpp:58
void printInfo()
Prints task execution information for all tasks executed by the user.
void registerFunction(void *funcP)
Register a user custom function in the context.
Definition: fastContext.cpp:48
int getId()
Returns the identification number of the dataset.
Definition: fddBase.h:27
void updateInfo()
Prints information about the tasks run since the last call to faster::fastContext::updateInfo().
bool isDriver()
Checks for the driver process.
Definition: fastContext.cpp:76
Framework context class.
Definition: fastContext.h:66
~fastContext()
fastContext destructor
Definition: fastContext.cpp:23
void calibrate()
Performs a microbenchmark to do dynamic load balancing (UNUSED)
Definition: fastContext.cpp:80
unsigned int fddType
Dataset type.
Definition: definitions.h:16
Context Configuration Class.
Definition: fastContext.h:38
void printHeader()
Prints a header for task execution information.
fastContext(int argc=0, char **argv=NULL)
fastContext default constructor
Definition: fastContext.cpp:8