Faster  0.0.4Alpha
Superfastdistributtedcomputing
fastComm.h
1 #ifndef LIBFASTER_FASTCOMM_H
2 #define LIBFASTER_FASTCOMM_H
3 
4 #include <string>
5 #include <mpi.h>
6 #include <sstream>
7 
8 
9 #include "definitions.h"
10 #include "fastCommBuffer.h"
11 #include "misc.h"
12 
13 namespace faster{
14  class fastComm;
15  class fastTask;
16 
17  enum commMode {
18  Local,
19  Mesos
20  };
21 
22  /*
23  #define MSG_TASK 0x0001
24  #define MSG_CREATEFDD 0x0002
25  #define MSG_CREATEIFDD 0x0003
26  #define MSG_CREATEIFDD 0x0004
27  #define MSG_DISCARDFDD 0x0005
28  #define MSG_FDDSETDATAID 0x0006
29  #define MSG_FDDSETDATA 0x0007
30  #define MSG_FDDSET2DDATAID 0x0008
31  #define MSG_FDDSET2DDATASIZES 0x0009
32  #define MSG_FDDSET2DDATA 0x000A
33  #define MSG_READFDDFILE 0x000B
34  #define MSG_COLLECT 0x000C
35  #define MSG_FDDDATAID 0x000D
36  #define MSG_FDDDATA 0x000E
37  #define MSG_TASKRESULT 0x000F
38  #define MSG_FDDINFO 0x0010
39  #define MSG_FDDSETIDATAID 0x0011
40  #define MSG_FDDSETIDATA 0x0012
41  #define MSG_FDDSETIKEYS 0x0013
42  #define MSG_FDDSET2DIDATAID 0x0014
43  #define MSG_FDDSET2DIDATASIZES 0x0015
44  #define MSG_FDDSET2DIDATA 0x0016
45  #define MSG_FDDSET2DIKEYS 0x0017
46  #define MSG_KEYOWNERSHIPSUGEST 0x0018
47  #define MSG_MYKEYOWNERSHIP 0x0019
48  #define MSG_MYKEYCOUNT 0x001a
49  #define MSG_IFDDDATAID 0x001b
50  #define MSG_IFDDDATAKEYS 0x001c
51  #define MSG_IFDDDATA 0x001d
52  #define MSG_COLLECTDATA 0x001a
53  #define MSG_KEYMAP 0x001b
54  #define MSG_GROUPBYKEYDATA 0x001c
55  // . . .
56  #define MSG_FINISH 0x8000 // */
57  enum msgTag : int {
58  MSG_TASK ,
59  MSG_CREATEFDD ,
60  MSG_CREATEIFDD ,
61  MSG_CREATEGFDD ,
62  MSG_DISCARDFDD ,
63  MSG_FDDSETDATAID ,
64  MSG_FDDSETDATA ,
65  MSG_FDDSET2DDATAID ,
66  MSG_FDDSET2DDATASIZES,
67  MSG_FDDSET2DDATA ,
68  MSG_READFDDFILE ,
69  MSG_COLLECT ,
70  MSG_FDDDATAID ,
71  MSG_FDDDATA ,
72  MSG_TASKRESULT ,
73  MSG_FDDINFO ,
74  MSG_FDDSETIDATAID ,
75  MSG_FDDSETIDATA ,
76  MSG_FDDSETIKEYS ,
77  MSG_FDDSET2DIDATAID ,
78  MSG_FDDSET2DIDATASIZES,
79  MSG_FDDSET2DIDATA ,
80  MSG_FDDSET2DIKEYS ,
81  MSG_KEYOWNERSHIPSUGEST,
82  MSG_MYKEYOWNERSHIP,
83  MSG_MYKEYCOUNT ,
84  MSG_IFDDDATAID ,
85  MSG_IFDDDATAKEYS,
86  MSG_IFDDDATA ,
87  MSG_COLLECTDATA ,
88  MSG_KEYMAP ,
89  MSG_GROUPBYKEYDATA,
90 
91  MSG_FINISH
92  };
93 
94 
95 
96  #define FDDTYPE_NULL 0x00
97  #define FDDTYPE_CHAR 0x01
98  #define FDDTYPE_INT 0x02
99  #define FDDTYPE_LONGINT 0x03
100  #define FDDTYPE_FLOAT 0x04
101  #define FDDTYPE_DOUBLE 0x05
102  #define FDDTYPE_STRING 0x07
103  #define FDDTYPE_CHARP 0x08
104  #define FDDTYPE_INTP 0x09
105  #define FDDTYPE_LONGINTP 0x0A
106  #define FDDTYPE_FLOATP 0x0B
107  #define FDDTYPE_DOUBLEP 0x0C
108  #define FDDTYPE_OBJECT 0x06
109 
110 
111 
112 
113 
114  // Communications class
115  // Responsible for process communication
116  class fastComm{
117  friend class fastContext;
118  private:
119  MPI_Status * status;
120  MPI_Request * req;
121  MPI_Request * req2;
122  MPI_Request * req3;
123  MPI_Request * req4;
124 
125  MPI_Group slaveGroup;
126  MPI_Comm slaveComm;
127 
128  commMode mode;
129  int numProcs;
130  int procId;
131  double timeStart, timeEnd;
132 
133  template <typename T>
134  void sendDataUltraPlus(int dest, T * data, size_t * lineSizes, size_t size, int tag, fastCommBuffer & b UNUSED, MPI_Request * request);
135  void sendDataUltraPlus(int dest, std::string * data, size_t * lineSizes, size_t size, int tag, fastCommBuffer & b UNUSED, MPI_Request * request);
136  template <typename T>
137  void sendDataUltraPlus(int dest, std::vector<T> * data, size_t * lineSizes, size_t size, int tag, fastCommBuffer & b UNUSED, MPI_Request * request);
138  template <typename T>
139  void sendDataUltraPlus(int dest, T ** data, size_t * lineSizes, size_t size, int tag, fastCommBuffer & b UNUSED, MPI_Request * request);
140 
141  template <typename K, typename T>
142  void sendDataUltra(unsigned long int id, int dest, K * keys, T * data, size_t * lineSizes, size_t size, int tagID, int tagDataSize, int tagKeys, int tagData);
143 
144 
145  void recvDataUltraPlus(int src, void *& data, int & size, int tag, fastCommBuffer & b UNUSED);
146 
147  void recvDataUltra(unsigned long int &id, int src, void *& keys, void *& data, size_t *& lineSizes, size_t &size, int tagID, int tagDataSize, int tagKeys, int tagData);
148 
149 
150  fastCommBuffer * buffer;
151  fastCommBuffer * bufferRecv;
152  fastCommBuffer resultBuffer;
153  public:
154 
155  fastComm(int & argc, char **& argv);
156  ~fastComm();
157 
158  int getProcId(){ return procId; }
159  int getNumProcs(){ return numProcs; }
160  fastCommBuffer & getResultBuffer(){ return resultBuffer; }
161  fastCommBuffer * getSendBuffers(){ return buffer; }
162 
163  bool isDriver();
164 
165  void probeMsgs(int & tag, int & src);
166  void waitForReq(int numReqs);
167  void join();
168 
169  template <typename T>
170  size_t getSize( T * data UNUSED, size_t * ds UNUSED, size_t s );
171  template <typename T>
172  size_t getSize( std::vector<T> * data, size_t * ds UNUSED, size_t s );
173  template <typename T>
174  size_t getSize( T ** data UNUSED, size_t * ds, size_t s );
175  size_t getSize( std::string * data, size_t * ds UNUSED, size_t s );
176 
177  // Task
178  void sendTask(fastTask & task);
179  void recvTask(fastTask & task);
180 
181  //void sendTaskResult(unsigned long int id, void * res, size_t size, double time);
182  void sendTaskResult();
183  void * recvTaskResult(unsigned long int &tid, unsigned long int & sid, size_t &size, size_t & time, procstat & stat);
184 
185  // FDD Creation / Destruction
186  void sendCreateFDD(unsigned long int id, fddType type, size_t size, int dest);
187  void recvCreateFDD(unsigned long int &id, fddType &type, size_t & size);
188  void sendCreateIFDD(unsigned long int id, fddType kType, fddType tType, size_t size, int dest);
189  void recvCreateIFDD(unsigned long int &id, fddType &kType, fddType &tType, size_t & size);
190 
191  void sendCreateFDDGroup(unsigned long int id, fddType keyType, std::vector<unsigned long int> & members);
192  void recvCreateFDDGroup(unsigned long int & id, fddType & keyType, std::vector<unsigned long int> & members);
193 
194  void sendDiscardFDD(unsigned long int id);
195  void recvDiscardFDD(unsigned long int &id);
196 
197  // Set Data
198  template <typename T>
199  void sendFDDSetData(unsigned long int id, int dest, T * data, size_t size);
200  template <typename T>
201  void sendFDDSetData(unsigned long int id, int dest, T ** data, size_t * lineSizes, size_t size);
202 
203  template <typename K, typename T>
204  void sendFDDSetIData(unsigned long int id, int dest, K * keys, T * data, size_t size);
205  template <typename K, typename T>
206  void sendFDDSetIData(unsigned long int id, int dest, K * keys, T ** data, size_t * lineSizes, size_t size);
207 
208  void recvFDDSetData(unsigned long int &id, void *& data, size_t &size);
209  void recvFDDSetData(unsigned long int &id, void *& data, size_t *& lineSizes, size_t &size);
210 
211  template <typename K, typename T>
212  void recvFDDSetIData(unsigned long int &id, K *& keys, T *& data, size_t &size);
213  template <typename K, typename T>
214  void recvFDDSetIData(unsigned long int &id, K *& keys, T *& data, size_t *& lineSizes, size_t &size);
215 
216 
217  // Data
218  template <typename T>
219  void sendFDDData(unsigned long int id, int dest, T * data, size_t size);
220  template <typename K, typename T>
221  void sendIFDDData(unsigned long int id, int dest, K * keys, T * data, size_t size);
222  void recvFDDData(unsigned long int &id, void * data, size_t &size);
223  void recvIFDDData(unsigned long int &id, void * keys, void * data, size_t &size);
224 
225 
226  template <typename T>
227  void sendFDDDataCollect(unsigned long int id, T * data, size_t size);
228  template <typename T>
229  void sendFDDDataCollect(unsigned long int id, T ** data, size_t * dataSizes, size_t size);
230  template <typename K, typename T>
231  void sendFDDDataCollect(unsigned long int id, K * keys, T * data, size_t size);
232  template <typename K, typename T>
233  void sendFDDDataCollect(unsigned long int id, K * keys, T ** data, size_t * dataSizes, size_t size);
234 
235 
236  template <typename T>
237  inline void decodeCollect(T & item);
238  template <typename T>
239  inline void decodeCollect(std::pair<T*,size_t> & item);
240  template <typename K, typename T>
241  inline void decodeCollect(std::pair<K, T> & item);
242  template <typename K, typename T>
243  inline void decodeCollect(std::tuple<K, T*, size_t> & item);
244 
245  template <typename T>
246  void recvFDDDataCollect(std::vector<T> & ret);
247 
248 
249 
250 
251  // Read File
252  void sendReadFDDFile(unsigned long int id, std::string filename, size_t size, size_t offset, int dest);
253  void recvReadFDDFile(unsigned long int &id, std::string & filename, size_t &size, size_t & offset);
254 
255  void sendFDDInfo(size_t size);
256  void recvFDDInfo(size_t &size, int & src);
257 
258  void sendCollect(unsigned long int id);
259  void recvCollect(unsigned long int &id);
260 
261  void sendFinish();
262  void recvFinish();
263 
264 
265  // GroupByKey
266  template <typename K>
267  void sendKeyMap(unsigned long tid, std::unordered_map<K, int> & keyMap);
268  template <typename K>
269  void recvKeyMap(unsigned long tid, std::unordered_map<K, int> & keyMap);
270  template <typename K>
271  void sendCogroupData(unsigned long tid, std::unordered_map<K, int> & keyMap, std::vector<bool> & flags);
272  template <typename K>
273  void recvCogroupData(unsigned long tid, std::unordered_map<K, int> & keyMap, std::vector<bool> & flags);
274 
275  void sendGroupByKeyData(int i);
276  void * recvGroupByKeyData(int &size);
277 
278  /*
279  template <typename K>
280  void sendKeyOwnershipSugest(int dest, K key);
281  template <typename K>
282  void sendMyKeyOwnership(K key);
283  template <typename K>
284  void recvKeyOwnershipGeneric(K * keys, int tag);
285  template <typename K>
286  void recvKeyOwnershipSugest(K * keys);
287  template <typename K>
288  void recvAllKeyOwnership(K * keys);
289  void sendMyKeyCount(int dest, size_t numKeys);
290  template <typename K>
291  typename std::deque<std::pair<K,size_t>> recvMyKeyCount(int & src);
292  template <typename K>
293  void sendCountByKey(std::unordered_map<K,size_t> & count);
294  template <typename K>
295  void recvCountByKey(std::unordered_map<K,size_t> & count);
296  */
297 
298  };
299 
300 
301 
302  /* ------- DATA Serialization -------- */
303 
304 
305  template <typename T>
306  size_t faster::fastComm::getSize( T * data UNUSED, size_t * ds UNUSED, size_t s ){
307  return s*sizeof(T);
308  }
309 
310  template <typename T>
311  size_t faster::fastComm::getSize( std::vector<T> * data, size_t * ds UNUSED, size_t s ){
312  size_t rawDataSize = 0;
313  for( size_t i = 0; i < s; ++i ){
314  rawDataSize += (sizeof(size_t) + data[i].size()*sizeof(T));
315  }
316  return s*sizeof(T);
317  }
318 
319  template <typename T>
320  size_t faster::fastComm::getSize( T ** data UNUSED, size_t * ds, size_t s ){
321  size_t rawDataSize = 0;
322  for( size_t i = 0; i < s; ++i ){
323  rawDataSize += ds[i]*sizeof(T);
324  }
325  return rawDataSize;
326  }
327 
328  // TODO use serialization?
329  template <typename T>
330  void fastComm::sendDataUltraPlus(int dest, T * data, size_t * lineSizes UNUSED, size_t size, int tag, fastCommBuffer & b UNUSED, MPI_Request * request){
331  MPI_Isend( data, size*sizeof(T), MPI_BYTE, dest, tag, MPI_COMM_WORLD, request);
332  }
333 
334  template <typename T>
335  void fastComm::sendDataUltraPlus(int dest, std::vector<T> * data, size_t * lineSizes UNUSED, size_t size, int tag, fastCommBuffer & b UNUSED, MPI_Request * request){
336  b.reset();
337  b.grow(getSize(data, lineSizes, size));
338 
339  for ( size_t i = 0; i < size; ++i){
340  b << data[i];
341  }
342  MPI_Isend( b.data(), b.size(), MPI_BYTE, dest, tag, MPI_COMM_WORLD, request);
343  }
344 
345  template <typename T>
346  void fastComm::sendDataUltraPlus(int dest, T ** data, size_t * lineSizes, size_t size, int tag, fastCommBuffer & b UNUSED, MPI_Request * request){
347  b.reset();
348  b.grow(getSize(data, lineSizes, size));
349 
350  for ( size_t i = 0; i < size; ++i){
351  b.write(data[i], lineSizes[i]*sizeof(T));
352  }
353  MPI_Isend( b.data(), b.size(), MPI_BYTE, dest, tag, MPI_COMM_WORLD, request);
354  }
355 
356  // Generic Data communication functions
357  // Send 1D Data
358  template <typename K, typename T>
359  void fastComm::sendDataUltra(unsigned long int id, int dest, K * keys, T * data, size_t * lineSizes, size_t size, int tagID, int tagDataSize, int tagKeys, int tagData){
360 
361  // Send data information
362  buffer[dest].reset();
363  buffer[dest] << id << size;
364  MPI_Isend( buffer[dest].data(), buffer[dest].size(), MPI_BYTE, dest, tagID , MPI_COMM_WORLD, &req2[dest-1]);
365 
366  // Send subarrays sizes
367  if (tagDataSize)
368  MPI_Isend( lineSizes, size*sizeof(size_t), MPI_BYTE, dest, tagDataSize, MPI_COMM_WORLD, &req4[dest-1]);
369 
370  // Send Keys
371  if (tagKeys){
372  sendDataUltraPlus(dest, keys, NULL, size, tagKeys, buffer[dest], &req3[dest-1] );
373  MPI_Wait(&req3[dest-1], status);
374  }
375 
376  // Send subarrays
377  sendDataUltraPlus(dest, data, lineSizes, size, tagData, buffer[dest], &req[dest-1] );
378 
379  }
380 
381 
382  // 1D Primitive types
383  template <typename T>
384  void fastComm::sendFDDSetData(unsigned long int id, int dest, T * data, size_t size){
385  int * NULLRef = NULL;
386  sendDataUltra(id, dest, NULLRef, data, (size_t*) NULLRef, size, MSG_FDDSETDATAID, 0, 0, MSG_FDDSETDATA);
387  }
388  template <typename K, typename T>
389  void fastComm::sendFDDSetIData(unsigned long int id, int dest, K * keys, T * data, size_t size){
390  size_t * NULLRef = NULL;
391  sendDataUltra(id, dest, keys, data, NULLRef, size, MSG_FDDSETIDATAID, 0, MSG_FDDSETIKEYS, MSG_FDDSETIDATA);
392  }
393 
394  // 2D Primitive types
395  template <typename T>
396  void fastComm::sendFDDSetData(unsigned long int id, int dest, T ** data, size_t *lineSize, size_t size){
397  int * NULLRef = NULL;
398  sendDataUltra(id, dest, NULLRef, data, lineSize, size, MSG_FDDSET2DDATAID, MSG_FDDSET2DDATASIZES, 0, MSG_FDDSET2DDATA);
399  }
400  template <typename K, typename T>
401  void fastComm::sendFDDSetIData(unsigned long int id, int dest, K * keys, T ** data, size_t *lineSize, size_t size){
402  sendDataUltra(id, dest, keys, data, lineSize, size, MSG_FDDSET2DIDATAID, MSG_FDDSET2DIDATASIZES, MSG_FDDSET2DIKEYS, MSG_FDDSET2DIDATA);
403  }
404 
405  template <typename K, typename T>
406  void fastComm::recvFDDSetIData(unsigned long int &id, K *& keys, T *& data, size_t &size){
407  void * NULLRef = NULL;
408  recvDataUltra(id, 0, keys, data, (size_t*&) NULLRef, size, MSG_FDDSETIDATAID, 0, MSG_FDDSETIKEYS, MSG_FDDSETIDATA);
409  }
410 
411  template <typename K, typename T>
412  void fastComm::recvFDDSetIData(unsigned long int &id, K *& keys, T *& data, size_t *& lineSizes, size_t &size){
413  recvDataUltra(id, 0, keys, data, lineSizes, size, MSG_FDDSET2DIDATAID, MSG_FDDSET2DIDATASIZES, MSG_FDDSET2DIKEYS, MSG_FDDSET2DIDATA);
414  }
415 
416 
417  // Parallel data communication
418  template <typename T>
419  void fastComm::sendFDDData(unsigned long int id, int dest, T * data, size_t size){
420  sendDataUltra<int,T>(id, dest, NULL, data, NULL, size, MSG_FDDDATAID, 0, 0, MSG_FDDDATA);
421  }
422  template <typename K, typename T>
423  void fastComm::sendIFDDData(unsigned long int id, int dest, K * keys, T * data, size_t size){
424  sendDataUltra(id, dest, NULL, data, NULL, size, MSG_IFDDDATAID, 0, MSG_IFDDDATAKEYS, MSG_IFDDDATA);
425  }
426 
427  template <typename T>
428  void fastComm::sendFDDDataCollect(unsigned long int id, T * data, size_t size){
429  buffer[0].reset();
430  buffer[0] << id << size;
431 
432  buffer[0].grow(16 + getSize(data, NULL, size));
433 
434  for( size_t i = 0; i < size; ++i ){
435  buffer[0] << data[i];
436  }
437 
438  MPI_Isend( buffer[0].data(), buffer[0].size(), MPI_BYTE, 0, MSG_COLLECTDATA , MPI_COMM_WORLD, req);
439  }
440  template <typename T>
441  void fastComm::sendFDDDataCollect(unsigned long int id, T ** data, size_t * dataSizes, size_t size){
442 
443  buffer[0].reset();
444  buffer[0] << id << size;
445 
446  buffer[0].grow(16 + (size*sizeof(size_t)) + getSize(data, dataSizes, size));
447 
448  for( size_t i = 0; i < size; ++i ){
449  buffer[0] << dataSizes[i];
450  buffer[0].write(data[i], dataSizes[i]*sizeof(T));
451  }
452 
453  MPI_Isend( buffer[0].data(), buffer[0].size(), MPI_BYTE, 0, MSG_COLLECTDATA , MPI_COMM_WORLD, req);
454  }
455  template <typename K, typename T>
456  void fastComm::sendFDDDataCollect(unsigned long int id, K * keys, T * data, size_t size){
457 
458  buffer[0].reset();
459  buffer[0] << id << size;
460 
461  buffer[0].grow(16 + getSize(keys, NULL, size) + getSize(data, NULL, size));
462 
463  for( size_t i = 0; i < size; ++i ){
464  buffer[0] << keys[i] << data[i];
465  }
466 
467  MPI_Isend( buffer[0].data(), buffer[0].size(), MPI_BYTE, 0, MSG_COLLECTDATA , MPI_COMM_WORLD, req);
468  }
469  template <typename K, typename T>
470  void fastComm::sendFDDDataCollect(unsigned long int id, K * keys, T ** data, size_t * dataSizes, size_t size){
471 
472  buffer[0].reset();
473  buffer[0] << id << size;
474 
475  buffer[0].grow(16 + getSize(keys, NULL, size) + (size*sizeof(size_t)) + getSize(data, dataSizes, size));
476 
477  for( size_t i = 0; i < size; ++i ){
478  buffer[0] << keys[i] << dataSizes[i];
479  buffer[0].write(data[i], dataSizes[i]*sizeof(T));
480  }
481  MPI_Isend( buffer[0].data(), buffer[0].size(), MPI_BYTE, 0, MSG_COLLECTDATA , MPI_COMM_WORLD, req);
482  }
483 
484  template <typename T>
485  inline void fastComm::decodeCollect(T & item){
486  bufferRecv[0] >> item;
487  }
488  template <typename T>
489  inline void fastComm::decodeCollect(std::pair<T*,size_t> & item){
490  bufferRecv[0] >> item.second;
491  item.first = new T[item.second];
492  bufferRecv[0].read(item.first, item.second * sizeof(T) );
493  }
494  template <typename K, typename T>
495  inline void fastComm::decodeCollect(std::pair<K, T> & item){
496  bufferRecv[0] >> item.first >> item.second;
497  }
498  template <typename K, typename T>
499  inline void fastComm::decodeCollect(std::tuple<K, T*, size_t> & item){
500  bufferRecv[0] >> std::get<0>(item) >> std::get<2>(item);
501  std::get<1>(item) = new T[std::get<2>(item)];
502  bufferRecv[0].read(std::get<1>(item), std::get<2>(item) * sizeof(T) );
503  }
504 
505  template <typename T>
506  void fastComm::recvFDDDataCollect(std::vector<T> & ret){
507  size_t count = 0, size;
508  unsigned long int id;
509  for (int i = 1; i < (numProcs); ++i){
510  MPI_Status stat;
511  int msgSize = 0;
512 
513  MPI_Probe(i, MSG_COLLECTDATA, MPI_COMM_WORLD, &stat);
514  MPI_Get_count(&stat, MPI_BYTE, &msgSize);
515  bufferRecv[0].reset();
516  bufferRecv[0].grow(msgSize);
517 
518  MPI_Recv(bufferRecv[0].data(), bufferRecv[0].free(), MPI_BYTE, i, MSG_COLLECTDATA, MPI_COMM_WORLD, &stat);
519 
520  bufferRecv[0] >> id >> size;
521  //std::cerr << "[" << id << ":" << size<< "] " ;
522  for (size_t j = 0; j < size; ++j){
523  decodeCollect(ret[count]);
524  count ++;
525  }
526  }
527  }
528 
529  // GroupByKey
530  template <typename K>
531  void faster::fastComm::sendKeyMap(unsigned long tid, std::unordered_map<K, int> & keyMap){
532  buffer[0].reset();
533  buffer[0] << tid << size_t(keyMap.size());
534 
535  for ( auto it = keyMap.begin(); it != keyMap.end(); it++){
536  buffer[0] << it->first << it->second;
537  }
538 
539  for ( int i = 1; i < (numProcs); ++i)
540  MPI_Isend(buffer[0].data(), buffer[0].size(), MPI_BYTE, i, MSG_KEYMAP, MPI_COMM_WORLD, &req[i-1]);
541  MPI_Waitall( numProcs - 1, req, status);
542  }
543  template <typename K>
544  void faster::fastComm::recvKeyMap(unsigned long tid, std::unordered_map<K, int> & keyMap){
545  MPI_Status stat;
546  size_t size;
547  int rsize;
548 
549  MPI_Probe(0, MSG_KEYMAP, MPI_COMM_WORLD, &stat);
550  MPI_Get_count(&stat, MPI_BYTE, &rsize);
551  bufferRecv[0].grow(rsize);
552 
553  bufferRecv[0].reset();
554 
555  MPI_Recv(bufferRecv[0].data(), bufferRecv[0].free(), MPI_BYTE, 0, MSG_KEYMAP, MPI_COMM_WORLD, &stat);
556 
557  bufferRecv[0] >> tid >> size;
558 
559  // Allocate map with pre-known size
560  keyMap.reserve(size);
561 
562  for ( size_t i = 0; i < size; ++i){
563  K key;
564  int count;
565  bufferRecv[0] >> key >> count;
566  keyMap[key] = count;
567  }
568  }
569  template <typename K>
570  void faster::fastComm::sendCogroupData(unsigned long tid, std::unordered_map<K, int> & keyMap, std::vector<bool> & flags){
571  buffer[0].reset();
572  buffer[0] << tid << size_t(keyMap.size());
573 
574  for ( auto it = keyMap.begin(); it != keyMap.end(); it++){
575  buffer[0] << it->first << it->second;
576  }
577 
578  buffer[0] << int(flags.size());
579  for ( size_t i = 0; i < flags.size(); ++i){
580  buffer[0] << char(flags[i]);
581  }
582 
583  for ( int i = 1; i < (numProcs); ++i)
584  MPI_Isend(buffer[0].data(), buffer[0].size(), MPI_BYTE, i, MSG_KEYMAP, MPI_COMM_WORLD, &req[i-1]);
585  MPI_Waitall( numProcs - 1, req, status);
586  }
587  template <typename K>
588  void faster::fastComm::recvCogroupData(unsigned long tid, std::unordered_map<K, int> & keyMap, std::vector<bool> & flags){
589  MPI_Status stat;
590  size_t size;
591  int rsize;
592 
593  MPI_Probe(0, MSG_KEYMAP, MPI_COMM_WORLD, &stat);
594  MPI_Get_count(&stat, MPI_BYTE, &rsize);
595  bufferRecv[0].grow(rsize);
596 
597  bufferRecv[0].reset();
598 
599  MPI_Recv(bufferRecv[0].data(), bufferRecv[0].free(), MPI_BYTE, 0, MSG_KEYMAP, MPI_COMM_WORLD, &stat);
600 
601  bufferRecv[0] >> tid >> size;
602 
603  // Allocate map with pre-known size
604  keyMap.reserve(size);
605 
606  for ( size_t i = 0; i < size; ++i){
607  K key;
608  int count;
609  bufferRecv[0] >> key >> count;
610  keyMap[key] = count;
611  }
612 
613  int numFlags = 0;
614  bufferRecv[0] >> numFlags;
615  flags.resize(numFlags);
616 
617  for ( int i = 0; i < numFlags; i++ ){
618  char flag;
619  bufferRecv[0] >> flag;
620  flags[i] = bool(flag);
621  }
622  }
623 
624 
625 }
626 #endif
Definition: misc.h:15
Definition: fastComm.h:116
Definition: fastCommBuffer.h:15
Definition: fastContext.h:54
Definition: _workerFdd.h:11
Definition: fastTask.h:11