libfaster API Documentation  Development Version
Super fast distributed computing
fastComm.cpp
1 #include <numeric>
2 #include <algorithm>
3 
4 #include "fastComm.h"
5 #include "fastTask.h"
6 
7 bool faster::fastComm::isDriver(){
8  return (procId == 0);
9 }
10 
11 void exitCallback(){
12  int mpi_finalized;
13 
14  MPI_Finalized(&mpi_finalized);
15  //std::cerr<< "FINALIZING MPI";
16  if (!mpi_finalized){
17  //std::cerr<< "!!!!!!!";
18  MPI_Finalize ();
19  }
20 }
21 
22 faster::fastComm::fastComm(int & argc, char ** argv){
23  int mpi_initialized;
24 
25  MPI_Initialized(&mpi_initialized);
26  if (!mpi_initialized){
27  std::atexit(exitCallback);
28  //std::cerr<< "INITIALIZING MPI!!!!!!!";
29  MPI_Init (0, NULL);
30  }
31 
32  MPI_Comm_size (MPI_COMM_WORLD, &numProcs);
33  MPI_Comm_rank (MPI_COMM_WORLD, &procId);
34  std::vector<int> slaveIDs(numProcs-1);
35  iota(slaveIDs.begin(), slaveIDs.end(), 1);
36 
37  MPI_Group origGroup;
38  MPI_Comm_group(MPI_COMM_WORLD, &origGroup);
39  MPI_Group_incl(origGroup, numProcs-1, slaveIDs.data(), &slaveGroup);
40  MPI_Comm_create(MPI_COMM_WORLD, slaveGroup, &slaveComm);
41 
42  timeStart = MPI_Wtime();
43 
44  status = new MPI_Status [numProcs];
45  req = new MPI_Request [numProcs];
46  req2 = new MPI_Request [numProcs];
47  req3 = new MPI_Request [numProcs];
48  req4 = new MPI_Request [numProcs];
49  buffer = new fastCommBuffer [numProcs];
50  bufferRecv = new fastCommBuffer [std::max(3, numProcs)];
51  bufferBusy.resize(numProcs);
52 }
53 
54 faster::fastComm::~fastComm(){
55  timeEnd = MPI_Wtime();
56 
57 
58  delete [] status;
59  delete [] req;
60  delete [] req2;
61  delete [] req3;
62  delete [] req4;
63  delete [] buffer;
64  delete [] bufferRecv;
65 }
66 
67 void faster::fastComm::probeMsgs(int & tag, int & src){
68  MPI_Status stat;
69  MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &stat);
70  tag = stat.MPI_TAG;
71  src = stat.MPI_SOURCE;
72 }
73 
74 void faster::fastComm::waitForReq(int numReqs){
75  MPI_Waitall(numReqs, req, status);
76 }
77 
78 void faster::fastComm::joinAll(){
79  MPI_Barrier(MPI_COMM_WORLD);
80 }
81 
82 void faster::fastComm::joinSlaves(){
83  MPI_Barrier(slaveComm);
84 }
85 
86 void faster::fastComm::sendTask(fastTask &task){
87  buffer[0].reset();
88  buffer[0].grow(48 + 8*task.globals.size() );
89 
90  buffer[0] << task.id << task.operationType << task.srcFDD << task.destFDD << task.functionId;
91  buffer[0] << size_t(task.globals.size());
92 
93  for ( size_t i = 0; i < task.globals.size(); i++){
94  size_t s = std::get<1>(task.globals[i]);
95  int type = std::get<2>(task.globals[i]);
96  //std::cerr << "SEND Glob.: "<< i <<"T:" << type << " S:" << s << "\n";
97  buffer[0] << type;
98  buffer[0] << s;
99  buffer[0].write(std::get<0>(task.globals[i]), s);
100  }
101 
102  for (int i = 1; i < numProcs; ++i){
103  MPI_Isend(buffer[0].data(), buffer[0].size(), MPI_BYTE, i, MSG_TASK, MPI_COMM_WORLD, &req[i-1]);
104  }
105  //MPI_Waitall( numProcs - 1, req, status);
106  MPI_Waitall( numProcs - 1, req, status);
107 }
108 
109 void faster::fastComm::recvTask(fastTask & task){
110  MPI_Status stat;
111  size_t numGlobals = 0;
112 
113  bufferRecv[0].reset();
114 
115  MPI_Recv(bufferRecv[0].data(), bufferRecv[0].free(), MPI_BYTE, 0, MSG_TASK, MPI_COMM_WORLD, &stat);
116 
117  bufferRecv[0] >> task.id >> task.operationType >> task.srcFDD >> task.destFDD >> task.functionId;
118 
119  bufferRecv[0] >> numGlobals;
120  for ( size_t i = 0; i < numGlobals; i++){
121  size_t varSize = 0;
122  char * var;
123  int varType;
124 
125  bufferRecv[0] >> varType;
126  bufferRecv[0] >> varSize;
127 
128  //std::cerr << "RGlob.: T:" << varType << " S:" << varSize << "\n";
129 
130  var = new char[varSize]();
131  bufferRecv[0].read(var, varSize);
132 
133  task.globals.insert(task.globals.end(), std::make_tuple(var, varSize, varType));
134  }
135 }
136 
137 //void faster::fastComm::sendTaskResult(unsigned long int id, void * res, size_t size, double time){
138 void faster::fastComm::sendTaskResult(){
139 
140  //std::cerr << " Result Buffer Size: " << resultBuffer.size() << "\n";
141  MPI_Send(resultBuffer.data(), resultBuffer.size() , MPI_BYTE, 0, MSG_TASKRESULT, MPI_COMM_WORLD);
142 }
143 
144 void * faster::fastComm::recvTaskResult(unsigned long int & id, unsigned long int & sid, size_t & size, size_t & time, procstat & stat){
145  int rsize;
146 
147  MPI_Probe(MPI_ANY_SOURCE, MSG_TASKRESULT, MPI_COMM_WORLD, status);
148  MPI_Get_count(status, MPI_BYTE, &rsize);
149  sid = status->MPI_SOURCE;
150  //std::cerr << " Recv Msg Size: " << rsize << " from " << sid << "\n";
151 
152  bufferRecv[sid].grow(rsize);
153  bufferRecv[sid].reset();
154 
155  MPI_Irecv(bufferRecv[sid].data(), bufferRecv[sid].free(), MPI_BYTE, sid, MSG_TASKRESULT, MPI_COMM_WORLD, &req[sid-1]);
156 
157  bufferRecv[sid] >> id >> time >> stat >> size;
158 
159  return bufferRecv[sid].pos();
160 }
161 
162 void faster::fastComm::sendCreateFDD(unsigned long int id, fddType type, size_t size, int dest){
163  //char typeC = encodeFDDType(type);
164 
165  buffer[dest].reset();
166 
167  //buffer[dest] << id << typeC << size;
168  buffer[dest] << id << type << size;
169 
170  //std::cerr << '(' << id << ' ' << (int) typeC << ":" << buffer[dest].size() << ')';
171 
172  MPI_Isend(buffer[dest].data(), buffer[dest].size(), MPI_BYTE, dest, MSG_CREATEFDD, MPI_COMM_WORLD, &req[dest-1]);
173 }
174 
175 void faster::fastComm::recvCreateFDD(unsigned long int &id, fddType &type, size_t &size){
176  //char t;
177  MPI_Status stat;
178 
179  bufferRecv[0].reset();
180 
181  MPI_Recv(bufferRecv[0].data(), bufferRecv[0].free(), MPI_BYTE, 0, MSG_CREATEFDD, MPI_COMM_WORLD, &stat);
182 
183  //bufferRecv[0] >> id >> t >> size;
184  //type = decodeFDDType(t);
185  bufferRecv[0] >> id >> type >> size;
186 
187  //std::cerr << '(' << id << ' ' << (int) t << ":" << buffer[dest].size() << ')';
188 }
189 
190 void faster::fastComm::sendCreateIFDD(unsigned long int id, fddType kType, fddType tType, size_t size, int dest){
191  //char typeC = encodeFDDType(type);
192 
193  buffer[dest].reset();
194  buffer[dest].grow(32);
195 
196  //buffer[dest] << id << typeC << size;
197  buffer[dest] << id << kType << tType << size;
198 
199  //std::cerr << '(' << id << ' ' << (int) typeC << ":" << buffer[dest].size() << ')';
200 
201  MPI_Isend(buffer[dest].data(), buffer[dest].size(), MPI_BYTE, dest, MSG_CREATEIFDD, MPI_COMM_WORLD, &req[dest-1]);
202 }
203 
204 void faster::fastComm::recvCreateIFDD(unsigned long int &id, fddType &kType, fddType &tType, size_t &size){
205  //char t;
206  MPI_Status stat;
207 
208  bufferRecv[0].reset();
209  bufferRecv[0].grow(32);
210 
211  MPI_Recv(bufferRecv[0].data(), bufferRecv[0].free(), MPI_BYTE, 0, MSG_CREATEIFDD, MPI_COMM_WORLD, &stat);
212 
213  //bufferRecv[0] >> id >> t >> size;
214  //type = decodeFDDType(t);
215  bufferRecv[0] >> id >> kType >> tType >> size;
216 
217  //std::cerr << '(' << id << ' ' << (int) t << ":" << buffer[dest].size() << ')';
218 }
219 
220 void faster::fastComm::sendCreateFDDGroup(unsigned long int id, fddType keyType, std::vector<unsigned long int> & idV){
221  for (int dest = 1; dest < numProcs; ++dest){
222  buffer[dest].reset();
223 
224  buffer[dest] << id << keyType << size_t(idV.size());
225 
226  for ( size_t i = 0; i < idV.size(); ++i){
227  buffer[dest] << idV[i];
228  }
229 
230  MPI_Isend(buffer[dest].data(), buffer[dest].size(), MPI_BYTE, dest, MSG_CREATEGFDD, MPI_COMM_WORLD, &req[dest-1]);
231  }
232  MPI_Waitall( numProcs - 1, req, status);
233 }
234 void faster::fastComm::recvCreateFDDGroup(unsigned long int & id, fddType & keyType, std::vector<unsigned long int> & idV){
235  size_t numMembers;
236  MPI_Status stat;
237  bufferRecv[0].reset();
238 
239  MPI_Recv(bufferRecv[0].data(), bufferRecv[0].free(), MPI_BYTE, 0, MSG_CREATEGFDD, MPI_COMM_WORLD, &stat);
240 
241  bufferRecv[0] >> id >> keyType >> numMembers;
242 
243  idV.resize(numMembers);
244 
245  for ( size_t i = 0; i < numMembers; ++i){
246  bufferRecv[0] >> idV[i];
247  }
248 }
249 
250 void faster::fastComm::sendDiscardFDD(unsigned long int id){
251  for (int i = 1; i < numProcs; ++i){
252  MPI_Isend( &id, sizeof(long unsigned int), MPI_BYTE, i, MSG_DISCARDFDD, MPI_COMM_WORLD, &req[i-1]);
253  }
254 
255  //MPI_Waitall( numProcs - 1, req, status);
256 }
257 void faster::fastComm::recvDiscardFDD(unsigned long int &id){
258  MPI_Status stat;
259  MPI_Recv(&id, sizeof(long unsigned int), MPI_BYTE, 0, MSG_DISCARDFDD, MPI_COMM_WORLD, &stat);
260 }
261 
262 size_t faster::fastComm::getSize( std::string * data, size_t * ds UNUSED, size_t s ){
263  size_t rawDataSize = 0;
264  for( size_t i = 0; i < s; ++i ){
265  rawDataSize += (sizeof(size_t) + data[i].size());
266  }
267  return rawDataSize;
268 }
269 
270 
271 void faster::fastComm::sendDataUltraPlus(int dest, std::string * data, size_t * lineSizes UNUSED, size_t size, int tag, fastCommBuffer & b UNUSED, MPI_Request * request){
272  b.reset();
273  b.grow(getSize(data, lineSizes, size));
274 
275  for ( size_t i = 0; i < size; ++i){
276  b << data[i];
277  }
278  MPI_Isend( b.data(), b.size(), MPI_BYTE, dest, tag, MPI_COMM_WORLD, request);
279 }
280 
281 void faster::fastComm::recvDataUltraPlus(int src, void *& data, int & size, int tag, fastCommBuffer & b){
282  MPI_Status stat, stat2;
283  MPI_Probe(src, tag, MPI_COMM_WORLD, &stat);
284  MPI_Get_count(&stat, MPI_BYTE, &size);
285  b.grow(size);
286  b.reset();
287  MPI_Recv(b.data(), b.free(), MPI_BYTE, src, tag, MPI_COMM_WORLD, &stat2);
288  data = b.data();
289 }
290 
291 // Receive 2D data
292 void faster::fastComm::recvDataUltra(unsigned long int &id, int src, void *& keys, void *& data, size_t *& lineSizes, size_t &size, int tagID, int tagDataSize, int tagKeys, int tagData){
293  int rSize = 0;
294 
295  // Receive the FDD ID, size and item size
296  bufferRecv[0].reset();
297  MPI_Recv(bufferRecv[0].data(), bufferRecv[0].free(), MPI_BYTE, src, tagID, MPI_COMM_WORLD, status);
298  bufferRecv[0] >> id >> size;
299  //std::cerr << ".";
300 
301  // Receive the size of every line to be received
302  if (tagDataSize)
303  recvDataUltraPlus(src, (void*&) lineSizes, rSize, tagDataSize, bufferRecv[2]);
304  //std::cerr << ".";
305 
306  // Receive Keys
307  if (tagKeys)
308  recvDataUltraPlus(src, keys, rSize, tagKeys, bufferRecv[1]);
309  //std::cerr << ".";
310 
311 
312  recvDataUltraPlus(src, data, rSize, tagData, bufferRecv[0]);
313  //std::cerr << ".";
314 }
315 
316 
317 void faster::fastComm::recvFDDSetData(unsigned long int &id, void *& data, size_t &size){
318  void * NULLRef = NULL;
319  //recvDataGeneric(id, 0, data, size, MSG_FDDSETDATAID, MSG_FDDSETDATA);
320  recvDataUltra(id, MPI_ANY_SOURCE, NULLRef, data, (size_t*&) NULLRef, size, MSG_FDDSETDATAID, 0, 0, MSG_FDDSETDATA);
321 }
322 void faster::fastComm::recvFDDSetData(unsigned long int &id, void *& data, size_t *& lineSizes, size_t &size){
323  //recvDataGeneric(id, 0, data, lineSizes, size, MSG_FDDSET2DDATAID, MSG_FDDSET2DDATASIZES, MSG_FDDSET2DDATA);
324  void * NULLRef = NULL;
325  recvDataUltra(id, 0, NULLRef, data, lineSizes, size, MSG_FDDSET2DDATAID, MSG_FDDSET2DDATASIZES, 0, MSG_FDDSET2DDATA);
326 }
327 
328 void faster::fastComm::recvFDDData(unsigned long int &id, void * data, size_t &size){
329  void * NULLRef = NULL;
330  //recvDataGeneric(id, 0, data, size, MSG_FDDDATAID, MSG_FDDDATA);
331  recvDataUltra(id, 0, NULLRef, data, (size_t*&) NULLRef, size, MSG_FDDDATAID, 0, 0, MSG_FDDDATA);
332 }
333 void faster::fastComm::recvIFDDData(unsigned long int &id, void * keys, void * data, size_t &size){
334  size_t * NULLRef = NULL;
335  //recvIDataGeneric(id, 0, keys, data, size, MSG_IFDDDATAID, MSG_IFDDDATAKEYS, MSG_IFDDDATA);
336  recvDataUltra(id, 0, keys, data, NULLRef, size, MSG_IFDDDATAID, 0, MSG_IFDDDATAKEYS, MSG_IFDDDATA);
337 }
338 
339 
340 
341 void faster::fastComm::sendReadFDDFile(unsigned long int id, std::string filename, size_t size, size_t offset, int dest){
342 
343  buffer[dest].reset();
344 
345  buffer[dest] << id << size << offset << filename;
346 
347  MPI_Isend( buffer[dest].data(), buffer[dest].size(), MPI_BYTE, dest, MSG_READFDDFILE, MPI_COMM_WORLD, &req[dest-1]);
348 }
349 
350 void faster::fastComm::recvReadFDDFile(unsigned long int &id, std::string & filename, size_t &size, size_t & offset){
351  MPI_Status stat;
352  //size_t filenameSize;
353  int msgSize = 0;
354 
355  bufferRecv[0].reset();
356  MPI_Probe(0, MSG_READFDDFILE, MPI_COMM_WORLD, &stat);
357  MPI_Get_count(&stat, MPI_BYTE, &msgSize);
358  bufferRecv[0].grow(msgSize);
359 
360  MPI_Recv(bufferRecv[0].data(), bufferRecv[0].free(), MPI_BYTE, 0, MSG_READFDDFILE, MPI_COMM_WORLD, &stat);
361  //buffer[0] >> id >> size >> offset >> filenameSize;
362  //buffer[0].read(filename, filenameSize);
363  bufferRecv[0] >> id >> size >> offset >> filename;
364  //std::cerr << filename << "\n";
365 }
366 
367 void faster::fastComm::sendWriteFDDFile(unsigned long int id,std::string & path, std::string & sufix){
368 
369  for (int dest = 1; dest < numProcs; ++dest){
370  buffer[dest].reset();
371 
372  buffer[dest] << id << path << sufix;
373 
374  MPI_Isend( buffer[dest].data(), buffer[dest].size(), MPI_BYTE, dest, MSG_WRITEFDDFILE, MPI_COMM_WORLD, &req[dest-1]);
375  }
376  MPI_Waitall( numProcs - 1, req, status);
377 }
378 
379 void faster::fastComm::recvWriteFDDFile(unsigned long int & id,std::string & path, std::string & sufix){
380  MPI_Status stat;
381  //size_t filenameSize;
382 
383  int msgSize = 0;
384  bufferRecv[0].reset();
385  MPI_Probe(0, MSG_WRITEFDDFILE, MPI_COMM_WORLD, &stat);
386  MPI_Get_count(&stat, MPI_BYTE, &msgSize);
387  bufferRecv[0].grow(msgSize);
388 
389  MPI_Recv(bufferRecv[0].data(), bufferRecv[0].free(), MPI_BYTE, 0, MSG_WRITEFDDFILE, MPI_COMM_WORLD, &stat);
390  //buffer[0] >> id >> size >> offset >> filenameSize;
391  //buffer[0].read(filename, filenameSize);
392  bufferRecv[0] >> id >> path >> sufix;
393  //std::cerr << filename << "\n";
394 }
395 
396 
397 
398 void faster::fastComm::sendFDDInfo(size_t size){
399  MPI_Send( &size, sizeof(size_t), MPI_BYTE, 0, MSG_FDDINFO, MPI_COMM_WORLD);
400 }
401 
402 void faster::fastComm::recvFDDInfo(size_t &size, int & src){
403  MPI_Recv(&size, sizeof(size_t), MPI_BYTE, MPI_ANY_SOURCE, MSG_FDDINFO, MPI_COMM_WORLD, status);
404  src = status[0].MPI_SOURCE;
405 }
406 
407 void faster::fastComm::sendFileName(std::string path){
408  buffer[0].reset();
409 
410  buffer[0] << path;
411 
412  for (int dest = 1; dest < numProcs; ++dest){
413  MPI_Isend(
414  buffer[0].data(),
415  buffer[0].size(),
416  MPI_BYTE,
417  dest,
418  MSG_FILENAME,
419  MPI_COMM_WORLD,
420  &req[dest-1]);
421  }
422 }
423 
424 void faster::fastComm::recvFileName(std::string & path){
425  MPI_Status stat;
426 
427  int msgSize = 0;
428  bufferRecv[0].reset();
429  MPI_Probe(0, MSG_FILENAME, MPI_COMM_WORLD, &stat);
430  MPI_Get_count(&stat, MPI_BYTE, &msgSize);
431  bufferRecv[0].grow(msgSize);
432 
433  MPI_Recv(
434  bufferRecv[0].data(),
435  bufferRecv[0].free(),
436  MPI_BYTE,
437  0,
438  MSG_FILENAME,
439  MPI_COMM_WORLD,
440  &stat);
441  //buffer[0] >> id >> size >> offset >> filenameSize;
442  //buffer[0].read(filename, filenameSize);
443  bufferRecv[0] >> path;
444 }
445 
446 bool faster::fastComm::isSendBufferFree(int i){
447  if (bufferBusy[i]){
448  int savepoint = i - 1;
449  int flag;
450 
451  if ( i > procId ){
452  savepoint -= 1;
453  }
454 
455  // check if buffer has been freed
456  MPI_Test(&req[savepoint], &flag, MPI_STATUS_IGNORE);
457  if (flag){
458  bufferBusy[i] = false;
459  return true;
460  }else{
461  bufferBusy[i] = true;
462  return false;
463  }
464  }
465  return true;
466 }
467 
468 void faster::fastComm::sendGroupByKeyData(int i){
469  int savepoint = i - 1;
470  if ( i > procId ){
471  savepoint -= 1;
472  }
473  MPI_Isend(
474  buffer[i].data(),
475  buffer[i].size(),
476  MPI_BYTE,
477  i,
478  MSG_GROUPBYKEYDATA,
479  MPI_COMM_WORLD,
480  &req[savepoint]);
481  bufferBusy[i] = true;
482 }
483 
484 void * faster::fastComm::recvGroupByKeyData(int & size){
485  MPI_Status stat;
486  size = 0;
487  int flag = 0;
488  void * data;
489 
490  MPI_Iprobe(
491  MPI_ANY_SOURCE,
492  MSG_GROUPBYKEYDATA,
493  MPI_COMM_WORLD,
494  &flag,
495  &stat
496  );
497  if (flag == true){
498  recvDataUltraPlus(
499  MPI_ANY_SOURCE,
500  data,
501  size,
502  MSG_GROUPBYKEYDATA,
503  bufferRecv[0]
504  );
505  }
506 
507  return data;
508 }
509 
510 
511 
512 void faster::fastComm::sendCollect(unsigned long int id){
513  for (int i = 1; i < numProcs; ++i){
514  MPI_Isend(
515  &id,
516  sizeof(long unsigned int),
517  MPI_BYTE,
518  i,
519  MSG_COLLECT,
520  MPI_COMM_WORLD,
521  &req[i-1]
522  );
523  }
524 
525  MPI_Waitall( numProcs - 1, req, status);
526 }
527 
528 void faster::fastComm::recvCollect(unsigned long int &id){
529  MPI_Status stat;
530  MPI_Recv(&id, sizeof(long unsigned int), MPI_BYTE, 0, MSG_COLLECT, MPI_COMM_WORLD, &stat);
531 }
532 
533 void faster::fastComm::sendFinish(){
534  for (int i = 1; i < numProcs; ++i){
535  MPI_Isend( buffer[0].data(), 1, MPI_BYTE, i, MSG_FINISH, MPI_COMM_WORLD, &req[i-1]);
536  }
537 
538  MPI_Waitall( numProcs - 1, req, status);
539 }
540 void faster::fastComm::recvFinish(){
541  MPI_Status stat;
542  MPI_Recv(bufferRecv[0].data(), 1, MPI_BYTE, 0, MSG_FINISH, MPI_COMM_WORLD, &stat);
543 }
544 
545 void faster::fastComm::bcastBuffer(int src, int i){
546  joinAll();
547  MPI_Bcast(buffer[i].data(), buffer[i].size(), MPI_BYTE, src, MPI_COMM_WORLD);
548 }
549 
unsigned int fddType
Dataset type.
Definition: definitions.h:16