libfaster API Documentation  Development Version
Super fast distributed computing
fastComm.h
1 #ifndef LIBFASTER_FASTCOMM_H
2 #define LIBFASTER_FASTCOMM_H
3 
4 #include <string>
5 #include <mpi.h>
6 #include <sstream>
7 
8 
9 #include "definitions.h"
10 #include "fastCommBuffer.h"
11 #include "misc.h"
12 
13 namespace faster{
14  class fastComm;
15  class fastTask;
16 
// Communication mode selector; fastComm stores one of these in its `mode` member.
enum commMode {
    Local = 0,
    Mesos = 1
};
21 
22  /*
23  #define MSG_TASK 0x0001
24  #define MSG_CREATEFDD 0x0002
25  #define MSG_CREATEIFDD 0x0003
26  #define MSG_CREATEIFDD 0x0004
27  #define MSG_DISCARDFDD 0x0005
28  #define MSG_FDDSETDATAID 0x0006
29  #define MSG_FDDSETDATA 0x0007
30  #define MSG_FDDSET2DDATAID 0x0008
31  #define MSG_FDDSET2DDATASIZES 0x0009
32  #define MSG_FDDSET2DDATA 0x000A
33  #define MSG_READFDDFILE 0x000B
34  #define MSG_COLLECT 0x000C
35  #define MSG_FDDDATAID 0x000D
36  #define MSG_FDDDATA 0x000E
37  #define MSG_TASKRESULT 0x000F
38  #define MSG_FDDINFO 0x0010
39  #define MSG_FDDSETIDATAID 0x0011
40  #define MSG_FDDSETIDATA 0x0012
41  #define MSG_FDDSETIKEYS 0x0013
42  #define MSG_FDDSET2DIDATAID 0x0014
43  #define MSG_FDDSET2DIDATASIZES 0x0015
44  #define MSG_FDDSET2DIDATA 0x0016
45  #define MSG_FDDSET2DIKEYS 0x0017
46  #define MSG_KEYOWNERSHIPSUGEST 0x0018
47  #define MSG_MYKEYOWNERSHIP 0x0019
48  #define MSG_MYKEYCOUNT 0x001a
49  #define MSG_IFDDDATAID 0x001b
50  #define MSG_IFDDDATAKEYS 0x001c
51  #define MSG_IFDDDATA 0x001d
52  #define MSG_COLLECTDATA 0x001a
53  #define MSG_KEYMAP 0x001b
54  #define MSG_GROUPBYKEYDATA 0x001c
55  // . . .
56  #define MSG_FINISH 0x8000 // */
// Message tags passed as the `tag` argument of the MPI calls in this file.
// Values are assigned implicitly, so the ORDER of the enumerators is part of
// the wire protocol: never insert in the middle without rebuilding every peer.
// Note that MSG_TASK is 0, and sendDataUltra() treats a tag argument of 0 as
// "do not send this part".
enum msgTag : int {
    // Tasks and dataset creation / destruction
    MSG_TASK,
    MSG_CREATEFDD,
    MSG_CREATEIFDD,
    MSG_CREATEGFDD,
    MSG_DISCARDFDD,

    // Set data (non-indexed, 1D and 2D)
    MSG_FDDSETDATAID,
    MSG_FDDSETDATA,
    MSG_FDDSET2DDATAID,
    MSG_FDDSET2DDATASIZES,
    MSG_FDDSET2DDATA,

    // Files
    MSG_READFDDFILE,
    MSG_WRITEFDDFILE,
    MSG_FILENAME,

    // Collect / parallel data / results
    MSG_COLLECT,
    MSG_FDDDATAID,
    MSG_FDDDATA,
    MSG_TASKRESULT,
    MSG_FDDINFO,

    // Set data (indexed, 1D and 2D)
    MSG_FDDSETIDATAID,
    MSG_FDDSETIDATA,
    MSG_FDDSETIKEYS,
    MSG_FDDSET2DIDATAID,
    MSG_FDDSET2DIDATASIZES,
    MSG_FDDSET2DIDATA,
    MSG_FDDSET2DIKEYS,

    // Key ownership
    MSG_KEYOWNERSHIPSUGEST,
    MSG_MYKEYOWNERSHIP,
    MSG_MYKEYCOUNT,

    // Indexed parallel data
    MSG_IFDDDATAID,
    MSG_IFDDDATAKEYS,
    MSG_IFDDDATA,

    // Collect payload and key maps
    MSG_COLLECTDATA,
    MSG_KEYMAP,
    MSG_DISTKEYMAP,
    MSG_GROUPBYKEYDATA,

    MSG_FINISH
};
96 
97 
98 
// Numeric codes identifying dataset element types, listed in ascending value.
// Scalar types
#define FDDTYPE_NULL      0x00
#define FDDTYPE_CHAR      0x01
#define FDDTYPE_INT       0x02
#define FDDTYPE_LONGINT   0x03
#define FDDTYPE_FLOAT     0x04
#define FDDTYPE_DOUBLE    0x05
#define FDDTYPE_OBJECT    0x06
#define FDDTYPE_STRING    0x07
// Pointer ("P") variants of the scalar types
#define FDDTYPE_CHARP     0x08
#define FDDTYPE_INTP      0x09
#define FDDTYPE_LONGINTP  0x0A
#define FDDTYPE_FLOATP    0x0B
#define FDDTYPE_DOUBLEP   0x0C
112 
113 
114 
115 
116 
117  // Communications class
118  // Responsible for process communication
119  class fastComm{
120  friend class fastContext;
121  private:
122  MPI_Status * status;
123  MPI_Request * req;
124  MPI_Request * req2;
125  MPI_Request * req3;
126  MPI_Request * req4;
127 
128  MPI_Group slaveGroup;
129  MPI_Comm slaveComm;
130 
131  commMode mode;
132  int numProcs;
133  int procId;
134  double timeStart, timeEnd;
135 
136  std::vector<bool> bufferBusy;
137 
138  template <typename T>
139  void sendDataUltraPlus(int dest, T * data, size_t * lineSizes, size_t size, int tag, fastCommBuffer & b UNUSED, MPI_Request * request);
140  void sendDataUltraPlus(int dest, std::string * data, size_t * lineSizes, size_t size, int tag, fastCommBuffer & b UNUSED, MPI_Request * request);
141  template <typename T>
142  void sendDataUltraPlus(int dest, std::vector<T> * data, size_t * lineSizes, size_t size, int tag, fastCommBuffer & b UNUSED, MPI_Request * request);
143  template <typename T>
144  void sendDataUltraPlus(int dest, T ** data, size_t * lineSizes, size_t size, int tag, fastCommBuffer & b UNUSED, MPI_Request * request);
145 
146  template <typename K, typename T>
147  void sendDataUltra(unsigned long int id, int dest, K * keys, T * data, size_t * lineSizes, size_t size, int tagID, int tagDataSize, int tagKeys, int tagData);
148 
149 
150  void recvDataUltraPlus(int src, void *& data, int & size, int tag, fastCommBuffer & b UNUSED);
151 
152  void recvDataUltra(unsigned long int &id, int src, void *& keys, void *& data, size_t *& lineSizes, size_t &size, int tagID, int tagDataSize, int tagKeys, int tagData);
153 
154 
155  fastCommBuffer * buffer;
156  fastCommBuffer * bufferRecv;
157  fastCommBuffer resultBuffer;
158 
159  public:
160 
161  const size_t maxMsgSize = 15000;
162  //const size_t maxMsgSize = 3;
163 
164  fastComm(int & argc, char ** argv);
165  ~fastComm();
166 
167  int getProcId(){ return procId; }
168  int getNumProcs(){ return numProcs; }
169  fastCommBuffer & getResultBuffer(){ return resultBuffer; }
170  fastCommBuffer * getSendBuffers(){ return buffer; }
171 
172  bool isDriver();
173 
174  void probeMsgs(int & tag, int & src);
175  void waitForReq(int numReqs);
176  void joinAll();
177  void joinSlaves();
178 
179  template <typename T>
180  size_t getSize( T * data UNUSED, size_t * ds UNUSED, size_t s );
181  template <typename T>
182  size_t getSize( std::vector<T> * data, size_t * ds UNUSED, size_t s );
183  template <typename T>
184  size_t getSize( T ** data UNUSED, size_t * ds, size_t s );
185  size_t getSize( std::string * data, size_t * ds UNUSED, size_t s );
186 
187  // Task
188  void sendTask(fastTask & task);
189  void recvTask(fastTask & task);
190 
191  //void sendTaskResult(unsigned long int id, void * res, size_t size, double time);
192  void sendTaskResult();
193  void * recvTaskResult(unsigned long int &tid, unsigned long int & sid, size_t &size, size_t & time, procstat & stat);
194 
195  // FDD Creation / Destruction
196  void sendCreateFDD(unsigned long int id, fddType type, size_t size, int dest);
197  void recvCreateFDD(unsigned long int &id, fddType &type, size_t & size);
198  void sendCreateIFDD(unsigned long int id, fddType kType, fddType tType, size_t size, int dest);
199  void recvCreateIFDD(unsigned long int &id, fddType &kType, fddType &tType, size_t & size);
200 
201  void sendCreateFDDGroup(unsigned long int id, fddType keyType, std::vector<unsigned long int> & members);
202  void recvCreateFDDGroup(unsigned long int & id, fddType & keyType, std::vector<unsigned long int> & members);
203 
204  void sendDiscardFDD(unsigned long int id);
205  void recvDiscardFDD(unsigned long int &id);
206 
207  // Set Data
208  template <typename T>
209  void sendFDDSetData(unsigned long int id, int dest, T * data, size_t size);
210  template <typename T>
211  void sendFDDSetData(unsigned long int id, int dest, T ** data, size_t * lineSizes, size_t size);
212 
213  template <typename K, typename T>
214  void sendFDDSetIData(unsigned long int id, int dest, K * keys, T * data, size_t size);
215  template <typename K, typename T>
216  void sendFDDSetIData(unsigned long int id, int dest, K * keys, T ** data, size_t * lineSizes, size_t size);
217 
218  void recvFDDSetData(unsigned long int &id, void *& data, size_t &size);
219  void recvFDDSetData(unsigned long int &id, void *& data, size_t *& lineSizes, size_t &size);
220 
221  template <typename K, typename T>
222  void recvFDDSetIData(unsigned long int &id, K *& keys, T *& data, size_t &size);
223  template <typename K, typename T>
224  void recvFDDSetIData(unsigned long int &id, K *& keys, T *& data, size_t *& lineSizes, size_t &size);
225 
226 
227  // Data
228  template <typename T>
229  void sendFDDData(unsigned long int id, int dest, T * data, size_t size);
230  template <typename K, typename T>
231  void sendIFDDData(unsigned long int id, int dest, K * keys, T * data, size_t size);
232  void recvFDDData(unsigned long int &id, void * data, size_t &size);
233  void recvIFDDData(unsigned long int &id, void * keys, void * data, size_t &size);
234 
235 
236  template <typename T>
237  void sendFDDDataCollect(unsigned long int id, T * data, size_t size);
238  template <typename T>
239  void sendFDDDataCollect(unsigned long int id, T ** data, size_t * dataSizes, size_t size);
240  template <typename K, typename T>
241  void sendFDDDataCollect(unsigned long int id, K * keys, T * data, size_t size);
242  template <typename K, typename T>
243  void sendFDDDataCollect(unsigned long int id, K * keys, T ** data, size_t * dataSizes, size_t size);
244 
245 
246  template <typename T>
247  inline void decodeCollect(T & item);
248  template <typename T>
249  inline void decodeCollect(std::pair<T*,size_t> & item);
250  template <typename K, typename T>
251  inline void decodeCollect(std::pair<K, T> & item);
252  template <typename K, typename T>
253  inline void decodeCollect(std::tuple<K, T*, size_t> & item);
254 
255  template <typename T>
256  void recvFDDDataCollect(std::vector<T> & ret);
257 
258 
259 
260 
261  // Read File
262  // THESE FUNCTIONS WILL BE DEPRECATED (THEY WILL BECOME TASKS)
263  void sendReadFDDFile(unsigned long int id, std::string filename, size_t size, size_t offset, int dest);
264  void recvReadFDDFile(unsigned long int &id, std::string & filename, size_t &size, size_t & offset);
265  void sendWriteFDDFile(unsigned long int id,std::string & path, std::string & sufix);
266  void recvWriteFDDFile(unsigned long int & id,std::string & path, std::string & sufix);
267 
268  void sendFDDInfo(size_t size);
269  void recvFDDInfo(size_t &size, int & src);
270 
271  void sendFileName(std::string path);
272  void recvFileName(std::string & filename);
273 
274  void sendCollect(unsigned long int id);
275  void recvCollect(unsigned long int &id);
276 
277  void sendFinish();
278  void recvFinish();
279  void bcastBuffer(int src, int i);
280 
281 
282  // GroupByKey
283  template <typename K>
284  void sendKeyMap(unsigned long tid, std::unordered_map<K, int> & keyMap);
285  template <typename K>
286  void recvKeyMap(unsigned long tid, std::unordered_map<K, int> & keyMap);
287  template <typename K>
288  void distributeKeyMap(std::unordered_map<K, int> & localKeyMap, std::unordered_map<K, int> & keyMap);
289  template <typename K>
290  void sendCogroupData(unsigned long tid, std::unordered_map<K, int> & keyMap, std::vector<bool> & flags);
291  template <typename K>
292  void recvCogroupData(unsigned long tid, std::unordered_map<K, int> & keyMap, std::vector<bool> & flags);
293 
294  bool isSendBufferFree(int i);
295  void sendGroupByKeyData(int i);
296  void * recvGroupByKeyData(int &size);
297 
298  /*
299  template <typename K>
300  void sendKeyOwnershipSugest(int dest, K key);
301  template <typename K>
302  void sendMyKeyOwnership(K key);
303  template <typename K>
304  void recvKeyOwnershipGeneric(K * keys, int tag);
305  template <typename K>
306  void recvKeyOwnershipSugest(K * keys);
307  template <typename K>
308  void recvAllKeyOwnership(K * keys);
309  void sendMyKeyCount(int dest, size_t numKeys);
310  template <typename K>
311  typename std::deque<std::pair<K,size_t>> recvMyKeyCount(int & src);
312  template <typename K>
313  void sendCountByKey(std::unordered_map<K,size_t> & count);
314  template <typename K>
315  void recvCountByKey(std::unordered_map<K,size_t> & count);
316  */
317 
318  };
319 
320 
321 
322  /* ------- DATA Serialization -------- */
323 
324 
325  template <typename T>
326  size_t faster::fastComm::getSize( T * data UNUSED, size_t * ds UNUSED, size_t s ){
327  return s*sizeof(T);
328  }
329 
330  template <typename T>
331  size_t faster::fastComm::getSize( std::vector<T> * data, size_t * ds UNUSED, size_t s ){
332  size_t rawDataSize = 0;
333  for( size_t i = 0; i < s; ++i ){
334  rawDataSize += (sizeof(size_t) + data[i].size()*sizeof(T));
335  }
336  return s*sizeof(T);
337  }
338 
339  template <typename T>
340  size_t faster::fastComm::getSize( T ** data UNUSED, size_t * ds, size_t s ){
341  size_t rawDataSize = 0;
342  for( size_t i = 0; i < s; ++i ){
343  rawDataSize += ds[i]*sizeof(T);
344  }
345  return rawDataSize;
346  }
347 
348  // TODO use serialization?
349  template <typename T>
350  void fastComm::sendDataUltraPlus(int dest, T * data, size_t * lineSizes UNUSED, size_t size, int tag, fastCommBuffer & b UNUSED, MPI_Request * request){
351  MPI_Isend( data, size*sizeof(T), MPI_BYTE, dest, tag, MPI_COMM_WORLD, request);
352  }
353 
354  template <typename T>
355  void fastComm::sendDataUltraPlus(int dest, std::vector<T> * data, size_t * lineSizes UNUSED, size_t size, int tag, fastCommBuffer & b UNUSED, MPI_Request * request){
356  b.reset();
357  b.grow(getSize(data, lineSizes, size));
358 
359  for ( size_t i = 0; i < size; ++i){
360  b << data[i];
361  }
362  MPI_Isend( b.data(), b.size(), MPI_BYTE, dest, tag, MPI_COMM_WORLD, request);
363  }
364 
365  template <typename T>
366  void fastComm::sendDataUltraPlus(int dest, T ** data, size_t * lineSizes, size_t size, int tag, fastCommBuffer & b UNUSED, MPI_Request * request){
367  b.reset();
368  b.grow(getSize(data, lineSizes, size));
369 
370  for ( size_t i = 0; i < size; ++i){
371  b.write(data[i], lineSizes[i]*sizeof(T));
372  }
373  MPI_Isend( b.data(), b.size(), MPI_BYTE, dest, tag, MPI_COMM_WORLD, request);
374  }
375 
376  // Generic Data communication functions
377  // Send 1D Data
378  template <typename K, typename T>
379  void fastComm::sendDataUltra(unsigned long int id, int dest, K * keys, T * data, size_t * lineSizes, size_t size, int tagID, int tagDataSize, int tagKeys, int tagData){
380 
381  // Send data information
382  buffer[dest].reset();
383  buffer[dest] << id << size;
384  MPI_Isend( buffer[dest].data(), buffer[dest].size(), MPI_BYTE, dest, tagID , MPI_COMM_WORLD, &req2[dest-1]);
385 
386  // Send subarrays sizes
387  if (tagDataSize)
388  MPI_Isend( lineSizes, size*sizeof(size_t), MPI_BYTE, dest, tagDataSize, MPI_COMM_WORLD, &req4[dest-1]);
389 
390  // Send Keys
391  if (tagKeys){
392  sendDataUltraPlus(dest, keys, NULL, size, tagKeys, buffer[dest], &req3[dest-1] );
393  MPI_Wait(&req3[dest-1], status);
394  }
395 
396  // Send subarrays
397  sendDataUltraPlus(dest, data, lineSizes, size, tagData, buffer[dest], &req[dest-1] );
398 
399  }
400 
401 
402  // 1D Primitive types
403  template <typename T>
404  void fastComm::sendFDDSetData(unsigned long int id, int dest, T * data, size_t size){
405  int * NULLRef = NULL;
406  sendDataUltra(id, dest, NULLRef, data, (size_t*) NULLRef, size, MSG_FDDSETDATAID, 0, 0, MSG_FDDSETDATA);
407  }
408  template <typename K, typename T>
409  void fastComm::sendFDDSetIData(unsigned long int id, int dest, K * keys, T * data, size_t size){
410  size_t * NULLRef = NULL;
411  sendDataUltra(id, dest, keys, data, NULLRef, size, MSG_FDDSETIDATAID, 0, MSG_FDDSETIKEYS, MSG_FDDSETIDATA);
412  }
413 
414  // 2D Primitive types
415  template <typename T>
416  void fastComm::sendFDDSetData(unsigned long int id, int dest, T ** data, size_t *lineSize, size_t size){
417  int * NULLRef = NULL;
418  sendDataUltra(id, dest, NULLRef, data, lineSize, size, MSG_FDDSET2DDATAID, MSG_FDDSET2DDATASIZES, 0, MSG_FDDSET2DDATA);
419  }
420  template <typename K, typename T>
421  void fastComm::sendFDDSetIData(unsigned long int id, int dest, K * keys, T ** data, size_t *lineSize, size_t size){
422  sendDataUltra(id, dest, keys, data, lineSize, size, MSG_FDDSET2DIDATAID, MSG_FDDSET2DIDATASIZES, MSG_FDDSET2DIKEYS, MSG_FDDSET2DIDATA);
423  }
424 
425  template <typename K, typename T>
426  void fastComm::recvFDDSetIData(unsigned long int &id, K *& keys, T *& data, size_t &size){
427  void * NULLRef = NULL;
428  recvDataUltra(id, 0, keys, data, (size_t*&) NULLRef, size, MSG_FDDSETIDATAID, 0, MSG_FDDSETIKEYS, MSG_FDDSETIDATA);
429  }
430 
431  template <typename K, typename T>
432  void fastComm::recvFDDSetIData(unsigned long int &id, K *& keys, T *& data, size_t *& lineSizes, size_t &size){
433  recvDataUltra(id, 0, keys, data, lineSizes, size, MSG_FDDSET2DIDATAID, MSG_FDDSET2DIDATASIZES, MSG_FDDSET2DIKEYS, MSG_FDDSET2DIDATA);
434  }
435 
436 
437  // Parallel data communication
438  template <typename T>
439  void fastComm::sendFDDData(unsigned long int id, int dest, T * data, size_t size){
440  sendDataUltra<int,T>(id, dest, NULL, data, NULL, size, MSG_FDDDATAID, 0, 0, MSG_FDDDATA);
441  }
442  template <typename K, typename T>
443  void fastComm::sendIFDDData(unsigned long int id, int dest, K * keys, T * data, size_t size){
444  sendDataUltra(id, dest, NULL, data, NULL, size, MSG_IFDDDATAID, 0, MSG_IFDDDATAKEYS, MSG_IFDDDATA);
445  }
446 
447  template <typename T>
448  void fastComm::sendFDDDataCollect(unsigned long int id, T * data, size_t size){
449  buffer[0].reset();
450  buffer[0] << id << size;
451 
452  buffer[0].grow(16 + getSize(data, NULL, size));
453 
454  for( size_t i = 0; i < size; ++i ){
455  buffer[0] << data[i];
456  }
457 
458  MPI_Isend( buffer[0].data(), buffer[0].size(), MPI_BYTE, 0, MSG_COLLECTDATA , MPI_COMM_WORLD, req);
459  }
460  template <typename T>
461  void fastComm::sendFDDDataCollect(unsigned long int id, T ** data, size_t * dataSizes, size_t size){
462 
463  buffer[0].reset();
464  buffer[0] << id << size;
465 
466  buffer[0].grow(16 + (size*sizeof(size_t)) + getSize(data, dataSizes, size));
467 
468  for( size_t i = 0; i < size; ++i ){
469  buffer[0] << dataSizes[i];
470  buffer[0].write(data[i], dataSizes[i]*sizeof(T));
471  }
472 
473  MPI_Isend( buffer[0].data(), buffer[0].size(), MPI_BYTE, 0, MSG_COLLECTDATA , MPI_COMM_WORLD, req);
474  }
475  template <typename K, typename T>
476  void fastComm::sendFDDDataCollect(unsigned long int id, K * keys, T * data, size_t size){
477 
478  buffer[0].reset();
479  buffer[0] << id << size;
480 
481  buffer[0].grow(16 + getSize(keys, NULL, size) + getSize(data, NULL, size));
482 
483  for( size_t i = 0; i < size; ++i ){
484  buffer[0] << keys[i] << data[i];
485  }
486 
487  MPI_Isend( buffer[0].data(), buffer[0].size(), MPI_BYTE, 0, MSG_COLLECTDATA , MPI_COMM_WORLD, req);
488  }
489  template <typename K, typename T>
490  void fastComm::sendFDDDataCollect(unsigned long int id, K * keys, T ** data, size_t * dataSizes, size_t size){
491 
492  buffer[0].reset();
493  buffer[0] << id << size;
494 
495  buffer[0].grow(16 + getSize(keys, NULL, size) + (size*sizeof(size_t)) + getSize(data, dataSizes, size));
496 
497  for( size_t i = 0; i < size; ++i ){
498  buffer[0] << keys[i] << dataSizes[i];
499  buffer[0].write(data[i], dataSizes[i]*sizeof(T));
500  }
501  MPI_Isend( buffer[0].data(), buffer[0].size(), MPI_BYTE, 0, MSG_COLLECTDATA , MPI_COMM_WORLD, req);
502  }
503 
504  template <typename T>
505  inline void fastComm::decodeCollect(T & item){
506  bufferRecv[0] >> item;
507  }
508  template <typename T>
509  inline void fastComm::decodeCollect(std::pair<T*,size_t> & item){
510  bufferRecv[0] >> item.second;
511  item.first = new T[item.second];
512  bufferRecv[0].read(item.first, item.second * sizeof(T) );
513  }
514  template <typename K, typename T>
515  inline void fastComm::decodeCollect(std::pair<K, T> & item){
516  bufferRecv[0] >> item.first >> item.second;
517  }
518  template <typename K, typename T>
519  inline void fastComm::decodeCollect(std::tuple<K, T*, size_t> & item){
520  bufferRecv[0] >> std::get<0>(item) >> std::get<2>(item);
521  std::get<1>(item) = new T[std::get<2>(item)];
522  bufferRecv[0].read(std::get<1>(item), std::get<2>(item) * sizeof(T) );
523  }
524 
525  template <typename T>
526  void fastComm::recvFDDDataCollect(std::vector<T> & ret){
527  size_t count = 0, size;
528  unsigned long int id;
529  for (int i = 1; i < (numProcs); ++i){
530  MPI_Status stat;
531  int msgSize = 0;
532 
533  MPI_Probe(i, MSG_COLLECTDATA, MPI_COMM_WORLD, &stat);
534  MPI_Get_count(&stat, MPI_BYTE, &msgSize);
535  bufferRecv[0].reset();
536  bufferRecv[0].grow(msgSize);
537 
538  MPI_Recv(bufferRecv[0].data(), bufferRecv[0].free(), MPI_BYTE, i, MSG_COLLECTDATA, MPI_COMM_WORLD, &stat);
539 
540  bufferRecv[0] >> id >> size;
541  //std::cerr << "[" << id << ":" << size<< "] " ;
542  for (size_t j = 0; j < size; ++j){
543  decodeCollect(ret[count]);
544  count ++;
545  }
546  }
547  }
548 
549  // GroupByKey
550  template <typename K>
551  void faster::fastComm::sendKeyMap(unsigned long tid, std::unordered_map<K, int> & keyMap){
552  const int itemsPerMsg = (256 * 1024);
553  int num_msgs = ceil( (double) keyMap.size() / itemsPerMsg );
554  int inserted_items = 0;
555  int msg_num = 0;
556  int current_buffer = 0;
557 
558  //MPI_Request * requests = new MPI_Request[ num_msgs * (numProcs - 1) ];
559  //MPI_Status * statuss = new MPI_Status[ num_msgs * (numProcs - 1) ];
560 
561  // Include number of items in first message
562  buffer[current_buffer].reset();
563  buffer[current_buffer] << tid << size_t(keyMap.size());
564 
565  std::cerr << " NM:" << num_msgs << "\n";
566 
567  for ( auto it = keyMap.begin(); it != keyMap.end(); it++){
568  buffer[current_buffer] << it->first << it->second;
569  inserted_items ++;
570 
571  // Send some keys
572  if (inserted_items >= itemsPerMsg){
573  std::cerr << msg_num << " ";
574 
575  if (msg_num > 0)
576  MPI_Waitall( (numProcs - 1), req, status);
577 
578  for ( int i = 1; i < (numProcs); ++i)
579  MPI_Isend(
580  buffer[current_buffer].data(),
581  buffer[current_buffer].size(),
582  MPI_BYTE,
583  i,
584  MSG_KEYMAP,
585  MPI_COMM_WORLD,
586  //&requests[ (i-1) + msg_num * (numProcs-1)]
587  &req[i-1]
588  );
589  buffer[current_buffer].reset();
590  inserted_items = 0;
591  msg_num++;
592 
593  // Next send will use next buffer
594  current_buffer = (current_buffer + 1) % numProcs;
595  }
596  }
597 
598  // Send the rest of the keys
599  if (inserted_items > 0){
600  std::cerr << msg_num << " ";
601 
602  if (msg_num > 0)
603  MPI_Waitall( (numProcs - 1), req, status);
604 
605  for ( int i = 1; i < (numProcs); ++i)
606  MPI_Isend(
607  buffer[current_buffer].data(),
608  buffer[current_buffer].size(),
609  MPI_BYTE,
610  i,
611  MSG_KEYMAP,
612  MPI_COMM_WORLD,
613  //&requests[ (i-1) + msg_num * (numProcs-1)]
614  &req[i-1]
615  );
616  }
617 
618  // Wait for message send conclusion
619  //MPI_Waitall( num_msgs * (numProcs - 1), requests, statuss);
620  MPI_Waitall( (numProcs - 1), req, status);
621 
622  //delete [] requests;
623  //delete [] statuss;
624  }
625  template <typename K>
626  void faster::fastComm::recvKeyMap(unsigned long tid, std::unordered_map<K, int> & keyMap){
627  MPI_Status stat;
628  size_t size;
629  int rsize;
630 
631  // Recv initial data
632  MPI_Probe(0, MSG_KEYMAP, MPI_COMM_WORLD, &stat);
633  MPI_Get_count(&stat, MPI_BYTE, &rsize);
634  bufferRecv[0].grow(rsize);
635  bufferRecv[0].reset();
636  MPI_Recv(bufferRecv[0].data(),
637  bufferRecv[0].free(),
638  MPI_BYTE,
639  0,
640  MSG_KEYMAP,
641  MPI_COMM_WORLD,
642  &stat
643  );
644 
645  bufferRecv[0] >> tid >> size;
646 
647  // Allocate map with pre-known size
648  keyMap.reserve(size);
649 
650  const int itemsPerMsg = (256 * 1024);
651  int num_msgs = ceil( (double) size / itemsPerMsg );
652  int inserted_items = 0;
653  std::cerr << "(NM:" << num_msgs << ")\n";
654 
655  for ( int msg_num = 0; msg_num < num_msgs; ++msg_num){
656  int numRevcItems = std::min(size - inserted_items, size_t(itemsPerMsg));
657  std::cerr << msg_num << " ";
658 
659  // Recv more keys
660  if (msg_num > 0){
661  MPI_Probe(0, MSG_KEYMAP, MPI_COMM_WORLD, &stat);
662  MPI_Get_count(&stat, MPI_BYTE, &rsize);
663  bufferRecv[0].grow(rsize);
664  bufferRecv[0].reset();
665  MPI_Recv(
666  bufferRecv[0].data(),
667  bufferRecv[0].free(),
668  MPI_BYTE,
669  0,
670  MSG_KEYMAP,
671  MPI_COMM_WORLD,
672  &stat
673  );
674  }
675 
676  // Insert recvd keys
677  for ( int i = 0; i < numRevcItems; ++i){
678  K key;
679  int count;
680  bufferRecv[0] >> key >> count;
681  keyMap[key] = count;
682  }
683  inserted_items += numRevcItems;
684  }
685  }
686 
687  template <typename K>
688  void faster::fastComm::distributeKeyMap(std::unordered_map<K, int> & localKeyMap, std::unordered_map<K, int> & keyMap){
689  std::pair<K, int> p;
690  keyMap.reserve(localKeyMap.size() * numProcs);
691 
692  std::cerr << "---------- Join Slaves ----------";
693  joinSlaves();
694  std::cerr << "\n";
695 
696  for (int i = 1; i < (numProcs); ++i){
697  if (procId == i){
698  std::cerr << "S" << localKeyMap.size() << " ";
699  int reqIndex = 0;
700 
701  buffer[i] << size_t(localKeyMap.size());
702  for ( auto it = localKeyMap.begin(); it != localKeyMap.end(); it++){
703  buffer[i] << it->first << it->second;
704  }
705 
706  for (int j = 1; j < (numProcs); ++j){
707  if ( j != i )
708  MPI_Isend(buffer[i].data(), buffer[i].size(), MPI_BYTE, j, MSG_DISTKEYMAP, MPI_COMM_WORLD, &req[reqIndex++]);
709  }
710  MPI_Waitall( numProcs - 2, req, status);
711  }else{
712  int rsize;
713  size_t numItems;
714  MPI_Status stat;
715 
716  MPI_Probe(i, MSG_DISTKEYMAP, MPI_COMM_WORLD, &stat);
717  MPI_Get_count(&stat, MPI_BYTE, &rsize);
718  bufferRecv[i].grow(rsize);
719 
720  bufferRecv[i].reset();
721 
722  MPI_Recv(bufferRecv[i].data(), bufferRecv[i].free(), MPI_BYTE, i, MSG_DISTKEYMAP, MPI_COMM_WORLD, &stat);
723 
724  bufferRecv[i] >> numItems;
725  std::cerr << "R:" << numItems << " ";
726 
727  for (size_t j = 0; j < (numItems); ++j){
728  bufferRecv[i] >> p.first >> p.second;
729  keyMap.insert(std::move(p));
730  }
731  }
732 
733  }
734  keyMap.insert(localKeyMap.begin(), localKeyMap.end());
735  std::cerr << " Final Size: " << keyMap.size() << "\n";
736  }
737 
738  template <typename K>
739  void faster::fastComm::sendCogroupData(unsigned long tid, std::unordered_map<K, int> & keyMap, std::vector<bool> & flags){
740  buffer[0].reset();
741  buffer[0] << tid << size_t(keyMap.size());
742 
743  for ( auto it = keyMap.begin(); it != keyMap.end(); it++){
744  buffer[0] << it->first << it->second;
745  }
746 
747  buffer[0] << int(flags.size());
748  for ( size_t i = 0; i < flags.size(); ++i){
749  buffer[0] << char(flags[i]);
750  }
751 
752  for ( int i = 1; i < (numProcs); ++i)
753  MPI_Isend(buffer[0].data(), buffer[0].size(), MPI_BYTE, i, MSG_KEYMAP, MPI_COMM_WORLD, &req[i-1]);
754  MPI_Waitall( numProcs - 1, req, status);
755 
756  //bcastBuffer(0, 0);
757 
758  }
759  template <typename K>
760  void faster::fastComm::recvCogroupData(unsigned long tid, std::unordered_map<K, int> & keyMap, std::vector<bool> & flags){
761  MPI_Status stat;
762  size_t size;
763  int rsize;
764 
765  MPI_Probe(0, MSG_KEYMAP, MPI_COMM_WORLD, &stat);
766  MPI_Get_count(&stat, MPI_BYTE, &rsize);
767  bufferRecv[0].grow(rsize);
768 
769  bufferRecv[0].reset();
770 
771  MPI_Recv(bufferRecv[0].data(), bufferRecv[0].free(), MPI_BYTE, 0, MSG_KEYMAP, MPI_COMM_WORLD, &stat);
772  //bcastBuffer(0, 0);
773 
774  bufferRecv[0] >> tid >> size;
775 
776  // Allocate map with pre-known size
777  keyMap.reserve(size);
778 
779  for ( size_t i = 0; i < size; ++i){
780  K key;
781  int count;
782  bufferRecv[0] >> key >> count;
783  keyMap[key] = count;
784  }
785 
786  int numFlags = 0;
787  bufferRecv[0] >> numFlags;
788  flags.resize(numFlags);
789 
790  for ( int i = 0; i < numFlags; i++ ){
791  char flag;
792  bufferRecv[0] >> flag;
793  flags[i] = bool(flag);
794  }
795  }
796 
797 
798 }
799 #endif
Framework context class.
Definition: fastContext.h:66
libfaster main namespace
Definition: _workerFdd.h:11
unsigned int fddType
Dataset type.
Definition: definitions.h:16