1 #ifndef LIBFASTER_FASTCOMM_H
2 #define LIBFASTER_FASTCOMM_H
9 #include "definitions.h"
10 #include "fastCommBuffer.h"
66 MSG_FDDSET2DDATASIZES,
78 MSG_FDDSET2DIDATASIZES,
81 MSG_KEYOWNERSHIPSUGEST,
// Numeric codes identifying the element type of a dataset.
// Layout visible here: 0x00 = no type, 0x01-0x05 = scalar types,
// 0x07 = string, 0x08-0x0C = pointer ("P") variants of the scalar types.
// NOTE(review): presumably these are the values carried by the fddType
// parameters of the create/recv calls in this header - confirm in definitions.h.
96 #define FDDTYPE_NULL 0x00
97 #define FDDTYPE_CHAR 0x01
98 #define FDDTYPE_INT 0x02
99 #define FDDTYPE_LONGINT 0x03
100 #define FDDTYPE_FLOAT 0x04
101 #define FDDTYPE_DOUBLE 0x05
102 #define FDDTYPE_STRING 0x07
103 #define FDDTYPE_CHARP 0x08
104 #define FDDTYPE_INTP 0x09
105 #define FDDTYPE_LONGINTP 0x0A
106 #define FDDTYPE_FLOATP 0x0B
107 #define FDDTYPE_DOUBLEP 0x0C
// OBJECT is declared last but its value (0x06) sits between DOUBLE and STRING.
108 #define FDDTYPE_OBJECT 0x06
125 MPI_Group slaveGroup;
131 double timeStart, timeEnd;
// sendDataUltraPlus: payload-level send used by sendDataUltra. All overloads
// post an MPI_Isend of raw bytes to rank `dest` with tag `tag` and hand the
// pending request back through `request`.
//
// Flat array overload: the out-of-line definition sends `data` directly as
// size*sizeof(T) bytes; lineSizes and b are not touched there.
133 template <
typename T>
134 void sendDataUltraPlus(
int dest, T * data,
size_t * lineSizes,
size_t size,
int tag,
fastCommBuffer & b UNUSED, MPI_Request * request);
// std::string overload. Its definition is not part of this listing.
135 void sendDataUltraPlus(
int dest, std::string * data,
size_t * lineSizes,
size_t size,
int tag,
fastCommBuffer & b UNUSED, MPI_Request * request);
// std::vector<T> overload: the definition stages the data in `b` and sends
// b.data()/b.size(). Note that `b` is used there despite the UNUSED marker.
136 template <
typename T>
137 void sendDataUltraPlus(
int dest, std::vector<T> * data,
size_t * lineSizes,
size_t size,
int tag,
fastCommBuffer & b UNUSED, MPI_Request * request);
// Array-of-arrays overload: row i holds lineSizes[i] elements; the definition
// copies every row into `b` and sends the staged buffer (again, `b` is used).
138 template <
typename T>
139 void sendDataUltraPlus(
int dest, T ** data,
size_t * lineSizes,
size_t size,
int tag,
fastCommBuffer & b UNUSED, MPI_Request * request);
// Sends one dataset to rank `dest` as a sequence of separately tagged
// messages: a header carrying (id, size) under tagID, the lineSizes array
// under tagDataSize, the keys under tagKeys and the values under tagData.
// The typed send*Data wrappers pass 0 for the tags of parts they do not have.
141 template <
typename K,
typename T>
142 void sendDataUltra(
unsigned long int id,
int dest, K * keys, T * data,
size_t * lineSizes,
size_t size,
int tagID,
int tagDataSize,
int tagKeys,
int tagData);
// Receive-side counterpart of sendDataUltraPlus for messages from rank `src`
// with tag `tag`. `data` and `size` are non-const references.
// NOTE(review): presumably they are outputs set to the received payload and
// its size; the definition is not in this listing - confirm in the .cpp.
145 void recvDataUltraPlus(
int src,
void *& data,
int & size,
int tag,
fastCommBuffer & b UNUSED);
// Receive-side counterpart of sendDataUltra: takes the same four tags and the
// source rank, with id, keys, data, lineSizes and size passed by non-const
// reference. keys and data are untyped (void *&).
// NOTE(review): the definition is not in this listing; buffer ownership and
// what happens for a tag of 0 must be confirmed in the .cpp.
147 void recvDataUltra(
unsigned long int &
id,
int src,
void *& keys,
void *& data,
size_t *& lineSizes,
size_t &size,
int tagID,
int tagDataSize,
int tagKeys,
int tagData);
// Constructor. Takes the program's argc/argv by non-const reference.
// NOTE(review): presumably forwarded to MPI initialisation, which may consume
// MPI-specific arguments - the definition is not visible here; confirm.
155 fastComm(
int & argc,
char **& argv);
// Returns the procId member.
// NOTE(review): presumably this process's MPI rank - confirm where procId is set.
158 int getProcId(){
return procId; }
// Returns the numProcs member. The broadcast/collect loops in this header
// iterate peer ranks as 1 .. numProcs-1.
159 int getNumProcs(){
return numProcs; }
// probeMsgs: `tag` and `src` are non-const references.
// NOTE(review): presumably outputs describing the next pending message; the
// definition is not visible here - confirm.
165 void probeMsgs(
int & tag,
int & src);
// waitForReq: takes a count of requests.
// NOTE(review): presumably blocks until the first numReqs outstanding MPI
// requests complete; which request array is waited on is not visible here.
166 void waitForReq(
int numReqs);
// getSize: number of bytes needed to serialise `s` items; used to size the
// send buffers before writing into them.
//
// Flat array overload: data and ds are unused by the definition.
169 template <
typename T>
170 size_t getSize( T * data UNUSED,
size_t * ds UNUSED,
size_t s );
// Vector overload: the definition adds sizeof(size_t) plus
// data[i].size()*sizeof(T) for each of the s vectors.
171 template <
typename T>
172 size_t getSize( std::vector<T> * data,
size_t * ds UNUSED,
size_t s );
// Array-of-arrays overload: the definition adds ds[i]*sizeof(T) per row.
173 template <
typename T>
174 size_t getSize( T ** data UNUSED,
size_t * ds,
size_t s );
// String overload. Its definition is not part of this listing.
175 size_t getSize( std::string * data,
size_t * ds UNUSED,
size_t s );
// sendTaskResult: takes no arguments.
// NOTE(review): presumably the result was staged in a member buffer
// beforehand; the definition is not visible here - confirm.
182 void sendTaskResult();
// recvTaskResult: returns an untyped pointer and reports tid, sid, size, time
// and stat through non-const references.
// NOTE(review): ownership/lifetime of the returned pointer is not visible
// here - confirm before freeing or retaining it.
183 void * recvTaskResult(
unsigned long int &tid,
unsigned long int & sid,
size_t &size,
size_t & time,
procstat & stat);
// sendCreateFDD / recvCreateFDD: matching pair. The sender passes (id, type,
// size) by value plus a destination rank; the receiver takes the same three
// fields by non-const reference.
// NOTE(review): definitions are not in this listing - wire format unverified.
186 void sendCreateFDD(
unsigned long int id, fddType type,
size_t size,
int dest);
187 void recvCreateFDD(
unsigned long int &
id, fddType &type,
size_t & size);
// sendCreateIFDD / recvCreateIFDD: same shape as the CreateFDD pair, but with
// two type codes (kType and tType) instead of one.
// NOTE(review): presumably key type and value type of an indexed dataset;
// definitions are not in this listing - confirm.
188 void sendCreateIFDD(
unsigned long int id, fddType kType, fddType tType,
size_t size,
int dest);
189 void recvCreateIFDD(
unsigned long int &
id, fddType &kType, fddType &tType,
size_t & size);
// sendCreateFDDGroup / recvCreateFDDGroup: matching pair carrying a group id,
// a key type code and a vector of member ids. Unlike the other create calls
// the sender has no `dest` parameter.
// NOTE(review): presumably sent to every peer rank; definitions are not in
// this listing - confirm.
191 void sendCreateFDDGroup(
unsigned long int id, fddType keyType, std::vector<unsigned long int> & members);
192 void recvCreateFDDGroup(
unsigned long int &
id, fddType & keyType, std::vector<unsigned long int> & members);
// sendDiscardFDD / recvDiscardFDD: matching pair carrying only a dataset id
// (by value on the sending side, by non-const reference on the receiving side).
// NOTE(review): definitions are not in this listing.
194 void sendDiscardFDD(
unsigned long int id);
195 void recvDiscardFDD(
unsigned long int &
id);
// sendFDDSetData: key-less dataset upload to rank `dest`; both overloads
// forward to sendDataUltra with a null int* in the key position.
//
// Flat overload: uses tags MSG_FDDSETDATAID / MSG_FDDSETDATA.
198 template <
typename T>
199 void sendFDDSetData(
unsigned long int id,
int dest, T * data,
size_t size);
// Array-of-arrays overload: row i has lineSizes[i] elements; uses tags
// MSG_FDDSET2DDATAID / MSG_FDDSET2DDATASIZES / MSG_FDDSET2DDATA.
200 template <
typename T>
201 void sendFDDSetData(
unsigned long int id,
int dest, T ** data,
size_t * lineSizes,
size_t size);
// sendFDDSetIData: keyed dataset upload to rank `dest`; both overloads forward
// keys and data to sendDataUltra.
//
// Flat overload: uses tags MSG_FDDSETIDATAID / MSG_FDDSETIKEYS / MSG_FDDSETIDATA.
203 template <
typename K,
typename T>
204 void sendFDDSetIData(
unsigned long int id,
int dest, K * keys, T * data,
size_t size);
// Array-of-arrays overload: adds the per-row lengths; uses tags
// MSG_FDDSET2DIDATAID / MSG_FDDSET2DIDATASIZES / MSG_FDDSET2DIKEYS /
// MSG_FDDSET2DIDATA.
205 template <
typename K,
typename T>
206 void sendFDDSetIData(
unsigned long int id,
int dest, K * keys, T ** data,
size_t * lineSizes,
size_t size);
// recvFDDSetData: non-template receive side of sendFDDSetData. id, data and
// size (and lineSizes in the second overload) are non-const references; the
// payload is untyped (void *&).
// NOTE(review): definitions are not in this listing - who allocates and who
// frees `data` / `lineSizes` must be confirmed in the .cpp.
208 void recvFDDSetData(
unsigned long int &
id,
void *& data,
size_t &size);
209 void recvFDDSetData(
unsigned long int &
id,
void *& data,
size_t *& lineSizes,
size_t &size);
// recvFDDSetIData: typed receive side of sendFDDSetIData. Both overloads
// forward to recvDataUltra with source rank 0 and the same tag sets the
// senders use; keys, data (and lineSizes) are passed through by reference.
211 template <
typename K,
typename T>
212 void recvFDDSetIData(
unsigned long int &
id, K *& keys, T *& data,
size_t &size);
// NOTE(review): data is T *& here although the matching sender takes T ** -
// confirm the intended element type with callers.
213 template <
typename K,
typename T>
214 void recvFDDSetIData(
unsigned long int &
id, K *& keys, T *& data,
size_t *& lineSizes,
size_t &size);
// Sends a key-less flat array to rank `dest`; the definition forwards to
// sendDataUltra<int,T> with null keys and null line sizes, using tags
// MSG_FDDDATAID / MSG_FDDDATA.
218 template <
typename T>
219 void sendFDDData(
unsigned long int id,
int dest, T * data,
size_t size);
// Keyed variant of sendFDDData; the definition forwards to sendDataUltra with
// tags MSG_IFDDDATAID / MSG_IFDDDATAKEYS / MSG_IFDDDATA and no line sizes.
220 template <
typename K,
typename T>
221 void sendIFDDData(
unsigned long int id,
int dest, K * keys, T * data,
size_t size);
// recvFDDData / recvIFDDData: receive side of sendFDDData / sendIFDDData.
// Unlike recvFDDSetData, the payload pointers are passed BY VALUE (void *),
// so these calls cannot hand a newly allocated buffer back to the caller.
// NOTE(review): presumably the caller supplies pre-allocated storage that is
// filled in place; definitions are not in this listing - confirm.
222 void recvFDDData(
unsigned long int &
id,
void * data,
size_t &size);
223 void recvIFDDData(
unsigned long int &
id,
void * keys,
void * data,
size_t &size);
// sendFDDDataCollect: serialises a local partition into buffer[0] and posts it
// to rank 0 with tag MSG_COLLECTDATA. Every overload starts the message with
// (id, size); the per-item encoding differs:
//
// Flat values: each data[i] streamed with operator<<.
226 template <
typename T>
227 void sendFDDDataCollect(
unsigned long int id, T * data,
size_t size);
// Array-of-arrays values: dataSizes[i] followed by the raw bytes of row i.
228 template <
typename T>
229 void sendFDDDataCollect(
unsigned long int id, T ** data,
size_t * dataSizes,
size_t size);
// Keyed flat values: keys[i] then data[i].
230 template <
typename K,
typename T>
231 void sendFDDDataCollect(
unsigned long int id, K * keys, T * data,
size_t size);
// Keyed array-of-arrays values: keys[i], dataSizes[i], then the raw row bytes.
232 template <
typename K,
typename T>
233 void sendFDDDataCollect(
unsigned long int id, K * keys, T ** data,
size_t * dataSizes,
size_t size);
// decodeCollect: reads one collected item from bufferRecv[0]; one overload per
// encoding produced by sendFDDDataCollect.
//
// Plain value.
236 template <
typename T>
237 inline void decodeCollect(T & item);
// (row pointer, row length): the definition allocates the row with new T[].
238 template <
typename T>
239 inline void decodeCollect(std::pair<T*,size_t> & item);
// (key, value).
240 template <
typename K,
typename T>
241 inline void decodeCollect(std::pair<K, T> & item);
// (key, row pointer, row length): the definition allocates the row with new T[].
242 template <
typename K,
typename T>
243 inline void decodeCollect(std::tuple<K, T*, size_t> & item);
// Receives the MSG_COLLECTDATA message of every rank 1 .. numProcs-1, in rank
// order, and decodes the items into `ret` via decodeCollect.
245 template <
typename T>
246 void recvFDDDataCollect(std::vector<T> & ret);
// sendReadFDDFile / recvReadFDDFile: matching pair carrying (id, filename,
// size, offset); the sender also names a destination rank and the receiver
// takes the four fields by non-const reference.
// NOTE(review): presumably size/offset describe the byte range of the file the
// destination should read; definitions are not in this listing - confirm.
252 void sendReadFDDFile(
unsigned long int id, std::string filename,
size_t size,
size_t offset,
int dest);
253 void recvReadFDDFile(
unsigned long int &
id, std::string & filename,
size_t &size,
size_t & offset);
// sendFDDInfo / recvFDDInfo: matching pair carrying a single size value; the
// receiver additionally reports a rank through `src` (non-const reference).
// NOTE(review): presumably `src` is set to the sender's rank; definitions are
// not in this listing - confirm.
255 void sendFDDInfo(
size_t size);
256 void recvFDDInfo(
size_t &size,
int & src);
// sendCollect / recvCollect: matching pair carrying only a dataset id.
// NOTE(review): presumably the request that triggers sendFDDDataCollect on the
// receiving ranks; definitions are not in this listing - confirm.
258 void sendCollect(
unsigned long int id);
259 void recvCollect(
unsigned long int &
id);
// sendKeyMap: serialises (tid, entry count, then every key/int pair) into
// buffer[0] and sends it to ranks 1 .. numProcs-1 with tag MSG_KEYMAP,
// waiting for all sends to complete.
266 template <
typename K>
267 void sendKeyMap(
unsigned long tid, std::unordered_map<K, int> & keyMap);
// recvKeyMap: receives a MSG_KEYMAP message from rank 0 and decodes it.
// `tid` is passed by value, so the task id read from the message is not
// visible to the caller.
268 template <
typename K>
269 void recvKeyMap(
unsigned long tid, std::unordered_map<K, int> & keyMap);
// sendCogroupData: same message as sendKeyMap followed by int(flags.size())
// and one char per flag; also sent with tag MSG_KEYMAP to ranks 1 .. numProcs-1.
270 template <
typename K>
271 void sendCogroupData(
unsigned long tid, std::unordered_map<K, int> & keyMap, std::vector<bool> & flags);
// recvCogroupData: receives that message from rank 0; `flags` is resized to
// the transmitted count. `tid` is by value, so the received id is not
// visible to the caller.
272 template <
typename K>
273 void recvCogroupData(
unsigned long tid, std::unordered_map<K, int> & keyMap, std::vector<bool> & flags);
// sendGroupByKeyData / recvGroupByKeyData: definitions are not in this listing.
// NOTE(review): presumably `i` is the destination rank (or buffer index) and
// the returned pointer refers to `size` received bytes - confirm, including
// who owns the returned memory.
275 void sendGroupByKeyData(
int i);
276 void * recvGroupByKeyData(
int &size);
// Serialised byte size of a flat array of `s` elements of T.
// Neither the data pointer nor the per-line sizes are consulted.
// NOTE(review): the body is elided in this listing; presumably it returns
// s * sizeof(T) - confirm.
305 template <
typename T>
306 size_t faster::fastComm::getSize( T * data UNUSED,
size_t * ds UNUSED,
size_t s ){
// Serialised byte size of `s` vectors: each contributes a size_t length
// prefix plus its elements as raw bytes. `ds` is not consulted.
// NOTE(review): the return statement is elided in this listing; presumably
// rawDataSize is returned.
310 template <
typename T>
311 size_t faster::fastComm::getSize( std::vector<T> * data,
size_t * ds UNUSED,
size_t s ){
312 size_t rawDataSize = 0;
313 for(
size_t i = 0; i < s; ++i ){
// length prefix + payload of vector i
314 rawDataSize += (
sizeof(size_t) + data[i].size()*
sizeof(T));
// Serialised byte size of `s` rows where row i holds ds[i] elements of T.
// Only the row lengths are read; no per-row length prefix is counted here.
// NOTE(review): the return statement is elided in this listing; presumably
// rawDataSize is returned.
319 template <
typename T>
320 size_t faster::fastComm::getSize( T ** data UNUSED,
size_t * ds,
size_t s ){
321 size_t rawDataSize = 0;
322 for(
size_t i = 0; i < s; ++i ){
323 rawDataSize += ds[i]*
sizeof(T);
// Flat-array send: posts a non-blocking send of the caller's array itself
// (no staging copy), as size*sizeof(T) bytes.
// Because the send is non-blocking, `data` must stay valid and unmodified
// until `request` completes.
// NOTE(review): MPI_Isend takes an int count; size*sizeof(T) is a size_t and
// is narrowed here - payloads above INT_MAX bytes would be truncated.
329 template <
typename T>
330 void fastComm::sendDataUltraPlus(
int dest, T * data,
size_t * lineSizes UNUSED,
size_t size,
int tag, fastCommBuffer & b UNUSED, MPI_Request * request){
331 MPI_Isend( data, size*
sizeof(T), MPI_BYTE, dest, tag, MPI_COMM_WORLD, request);
// Vector send: stages the vectors in `b`, then posts a non-blocking send of
// the staged bytes. `b` IS used here despite its UNUSED marker, and must not
// be modified until `request` completes.
334 template <
typename T>
335 void fastComm::sendDataUltraPlus(
int dest, std::vector<T> * data,
size_t * lineSizes UNUSED,
size_t size,
int tag, fastCommBuffer & b UNUSED, MPI_Request * request){
// Make room for every vector (length prefix + elements, see getSize).
337 b.grow(getSize(data, lineSizes, size));
// NOTE(review): the loop body is elided in this listing; presumably it writes
// each data[i] into b - confirm.
339 for (
size_t i = 0; i < size; ++i){
342 MPI_Isend( b.data(), b.size(), MPI_BYTE, dest, tag, MPI_COMM_WORLD, request);
// Array-of-arrays send: copies every row back to back into `b` (row i is
// lineSizes[i] elements) and posts a non-blocking send of the staged bytes.
// The row lengths themselves are not part of this message.
// `b` IS used here despite its UNUSED marker, and must not be modified until
// `request` completes.
345 template <
typename T>
346 void fastComm::sendDataUltraPlus(
int dest, T ** data,
size_t * lineSizes,
size_t size,
int tag, fastCommBuffer & b UNUSED, MPI_Request * request){
348 b.grow(getSize(data, lineSizes, size));
350 for (
size_t i = 0; i < size; ++i){
351 b.write(data[i], lineSizes[i]*
sizeof(T));
353 MPI_Isend( b.data(), b.size(), MPI_BYTE, dest, tag, MPI_COMM_WORLD, request);
// Sends one dataset to rank `dest` as up to four separately tagged messages:
// header (id, size), line sizes, keys, values.
// Request slots are indexed with dest-1, so `dest` must be >= 1.
// NOTE(review): several lines between the visible statements are elided in
// this listing; presumably they guard the optional parts (line sizes / keys)
// - confirm against the full source before changing anything here.
358 template <
typename K,
typename T>
359 void fastComm::sendDataUltra(
unsigned long int id,
int dest, K * keys, T * data,
size_t * lineSizes,
size_t size,
int tagID,
int tagDataSize,
int tagKeys,
int tagData){
// Header message: dataset id and element count, staged in buffer[dest].
362 buffer[dest].reset();
363 buffer[dest] <<
id << size;
// NOTE(review): this send reads from buffer[dest], which is handed to
// sendDataUltraPlus below as scratch space; no wait on req2 is visible before
// that reuse - confirm one exists in the elided lines.
364 MPI_Isend( buffer[dest].data(), buffer[dest].size(), MPI_BYTE, dest, tagID , MPI_COMM_WORLD, &req2[dest-1]);
// Per-row lengths, sent straight from the caller's array.
368 MPI_Isend( lineSizes, size*
sizeof(
size_t), MPI_BYTE, dest, tagDataSize, MPI_COMM_WORLD, &req4[dest-1]);
// Keys; the wait lets buffer[dest] be reused for the values below.
372 sendDataUltraPlus(dest, keys, NULL, size, tagKeys, buffer[dest], &req3[dest-1] );
373 MPI_Wait(&req3[dest-1], status);
// Values; left pending on req[dest-1] for the caller to complete.
377 sendDataUltraPlus(dest, data, lineSizes, size, tagData, buffer[dest], &req[dest-1] );
// Key-less flat upload: forwards to sendDataUltra with no keys and no line
// sizes, passing 0 for the two corresponding tags.
383 template <
typename T>
384 void fastComm::sendFDDSetData(
unsigned long int id,
int dest, T * data,
size_t size){
// A typed null pointer is needed so the key type K can be deduced (as int).
385 int * NULLRef = NULL;
386 sendDataUltra(
id, dest, NULLRef, data, (
size_t*) NULLRef, size, MSG_FDDSETDATAID, 0, 0, MSG_FDDSETDATA);
// Keyed flat upload: forwards keys and data to sendDataUltra with no line
// sizes (tagDataSize is 0).
388 template <
typename K,
typename T>
389 void fastComm::sendFDDSetIData(
unsigned long int id,
int dest, K * keys, T * data,
size_t size){
// Null stand-in for the absent lineSizes array.
390 size_t * NULLRef = NULL;
391 sendDataUltra(
id, dest, keys, data, NULLRef, size, MSG_FDDSETIDATAID, 0, MSG_FDDSETIKEYS, MSG_FDDSETIDATA);
// Key-less array-of-arrays upload: forwards data and per-row lengths to
// sendDataUltra; tagKeys is 0 because there are no keys.
// The parameter is spelled lineSize here and lineSizes in the declaration.
395 template <
typename T>
396 void fastComm::sendFDDSetData(
unsigned long int id,
int dest, T ** data,
size_t *lineSize,
size_t size){
// A typed null pointer is needed so the key type K can be deduced (as int).
397 int * NULLRef = NULL;
398 sendDataUltra(
id, dest, NULLRef, data, lineSize, size, MSG_FDDSET2DDATAID, MSG_FDDSET2DDATASIZES, 0, MSG_FDDSET2DDATA);
// Keyed array-of-arrays upload: forwards keys, data and per-row lengths to
// sendDataUltra, using all four message tags.
// The parameter is spelled lineSize here and lineSizes in the declaration.
400 template <
typename K,
typename T>
401 void fastComm::sendFDDSetIData(
unsigned long int id,
int dest, K * keys, T ** data,
size_t *lineSize,
size_t size){
402 sendDataUltra(
id, dest, keys, data, lineSize, size, MSG_FDDSET2DIDATAID, MSG_FDDSET2DIDATASIZES, MSG_FDDSET2DIKEYS, MSG_FDDSET2DIDATA);
// Keyed flat receive: forwards to recvDataUltra with source rank 0, no line
// sizes (tagDataSize is 0) and the tags used by the flat sendFDDSetIData.
// NOTE(review): recvDataUltra as declared in this listing takes void *& for
// keys and data; K *& / T *& only bind to that when K and T are void -
// confirm which overload this call resolves to.
405 template <
typename K,
typename T>
406 void fastComm::recvFDDSetIData(
unsigned long int &
id, K *& keys, T *& data,
size_t &size){
// Throw-away pointer bound to the lineSizes reference parameter.
407 void * NULLRef = NULL;
408 recvDataUltra(
id, 0, keys, data, (
size_t*&) NULLRef, size, MSG_FDDSETIDATAID, 0, MSG_FDDSETIKEYS, MSG_FDDSETIDATA);
// Keyed array-of-arrays receive: forwards to recvDataUltra with source rank 0
// and the four tags used by the 2D sendFDDSetIData.
// NOTE(review): recvDataUltra as declared in this listing takes void *& for
// keys and data; K *& / T *& only bind to that when K and T are void -
// confirm which overload this call resolves to.
411 template <
typename K,
typename T>
412 void fastComm::recvFDDSetIData(
unsigned long int &
id, K *& keys, T *& data,
size_t *& lineSizes,
size_t &size){
413 recvDataUltra(
id, 0, keys, data, lineSizes, size, MSG_FDDSET2DIDATAID, MSG_FDDSET2DIDATASIZES, MSG_FDDSET2DIKEYS, MSG_FDDSET2DIDATA);
// Key-less flat send: forwards to sendDataUltra with null keys and null line
// sizes and 0 for the corresponding tags.
// The template arguments are spelled out (<int,T>) because the key type
// cannot be deduced from a literal NULL.
418 template <
typename T>
419 void fastComm::sendFDDData(
unsigned long int id,
int dest, T * data,
size_t size){
420 sendDataUltra<int,T>(id, dest, NULL, data, NULL, size, MSG_FDDDATAID, 0, 0, MSG_FDDDATA);
// Keyed flat send: forwards keys and data to sendDataUltra with no line sizes
// (tagDataSize is 0), using tags MSG_IFDDDATAID / MSG_IFDDDATAKEYS /
// MSG_IFDDDATA.
//   id    dataset identifier written into the header message
//   dest  destination rank
//   keys  `size` keys, sent under MSG_IFDDDATAKEYS
//   data  `size` values, sent under MSG_IFDDDATA
// `keys` must be passed through (not a literal NULL): it carries the payload
// for the key message and is what lets the compiler deduce K.
422 template <
typename K,
typename T>
423 void fastComm::sendIFDDData(
unsigned long int id,
int dest, K * keys, T * data,
size_t size){
424 sendDataUltra(
id, dest, keys, data, NULL, size, MSG_IFDDDATAID, 0, MSG_IFDDDATAKEYS, MSG_IFDDDATA);
// Collect reply, flat values: message is (id, size, data[0..size-1]) built in
// buffer[0] and posted to rank 0 with tag MSG_COLLECTDATA.
// NOTE(review): some lines are elided in this listing (presumably a
// buffer[0].reset() before the first write) - confirm.
427 template <
typename T>
428 void fastComm::sendFDDDataCollect(
unsigned long int id, T * data,
size_t size){
430 buffer[0] <<
id << size;
// NOTE(review): 16 presumably covers the id + size header (8 + 8 bytes on
// LP64) - confirm.
432 buffer[0].grow(16 + getSize(data, NULL, size));
434 for(
size_t i = 0; i < size; ++i ){
435 buffer[0] << data[i];
// Non-blocking: buffer[0] must stay untouched until `req` completes.
438 MPI_Isend( buffer[0].data(), buffer[0].size(), MPI_BYTE, 0, MSG_COLLECTDATA , MPI_COMM_WORLD, req);
// Collect reply, array-of-arrays values: message is (id, size) followed, per
// row, by dataSizes[i] and the row's raw bytes. Built in buffer[0] and posted
// to rank 0 with tag MSG_COLLECTDATA.
// NOTE(review): some lines are elided in this listing (presumably a
// buffer[0].reset() before the first write) - confirm.
440 template <
typename T>
441 void fastComm::sendFDDDataCollect(
unsigned long int id, T ** data,
size_t * dataSizes,
size_t size){
444 buffer[0] <<
id << size;
// Header allowance + one size_t length prefix per row + row payloads.
446 buffer[0].grow(16 + (size*
sizeof(
size_t)) + getSize(data, dataSizes, size));
448 for(
size_t i = 0; i < size; ++i ){
449 buffer[0] << dataSizes[i];
450 buffer[0].write(data[i], dataSizes[i]*
sizeof(T));
// Non-blocking: buffer[0] must stay untouched until `req` completes.
453 MPI_Isend( buffer[0].data(), buffer[0].size(), MPI_BYTE, 0, MSG_COLLECTDATA , MPI_COMM_WORLD, req);
// Collect reply, keyed flat values: message is (id, size) followed by
// interleaved keys[i], data[i]. Built in buffer[0] and posted to rank 0 with
// tag MSG_COLLECTDATA.
// NOTE(review): some lines are elided in this listing (presumably a
// buffer[0].reset() before the first write) - confirm.
455 template <
typename K,
typename T>
456 void fastComm::sendFDDDataCollect(
unsigned long int id, K * keys, T * data,
size_t size){
459 buffer[0] <<
id << size;
461 buffer[0].grow(16 + getSize(keys, NULL, size) + getSize(data, NULL, size));
463 for(
size_t i = 0; i < size; ++i ){
464 buffer[0] << keys[i] << data[i];
// Non-blocking: buffer[0] must stay untouched until `req` completes.
467 MPI_Isend( buffer[0].data(), buffer[0].size(), MPI_BYTE, 0, MSG_COLLECTDATA , MPI_COMM_WORLD, req);
// Collect reply, keyed array-of-arrays values: message is (id, size)
// followed, per item, by keys[i], dataSizes[i] and the row's raw bytes.
// Built in buffer[0] and posted to rank 0 with tag MSG_COLLECTDATA.
// NOTE(review): some lines are elided in this listing (presumably a
// buffer[0].reset() before the first write) - confirm.
469 template <
typename K,
typename T>
470 void fastComm::sendFDDDataCollect(
unsigned long int id, K * keys, T ** data,
size_t * dataSizes,
size_t size){
473 buffer[0] <<
id << size;
// Header allowance + keys + one size_t length prefix per row + row payloads.
475 buffer[0].grow(16 + getSize(keys, NULL, size) + (size*
sizeof(
size_t)) + getSize(data, dataSizes, size));
477 for(
size_t i = 0; i < size; ++i ){
478 buffer[0] << keys[i] << dataSizes[i];
479 buffer[0].write(data[i], dataSizes[i]*
sizeof(T));
// Non-blocking: buffer[0] must stay untouched until `req` completes.
481 MPI_Isend( buffer[0].data(), buffer[0].size(), MPI_BYTE, 0, MSG_COLLECTDATA , MPI_COMM_WORLD, req);
// Reads one plain value from bufferRecv[0] into `item` using operator>>.
484 template <
typename T>
485 inline void fastComm::decodeCollect(T & item){
486 bufferRecv[0] >> item;
// Reads one variable-length row from bufferRecv[0]: first the element count
// (into item.second), then that many elements of T as raw bytes.
// The row is allocated with new T[] and stored in item.first; nothing here
// releases it, so the caller is left holding the allocation (delete[]).
488 template <
typename T>
489 inline void fastComm::decodeCollect(std::pair<T*,size_t> & item){
490 bufferRecv[0] >> item.second;
491 item.first =
new T[item.second];
492 bufferRecv[0].read(item.first, item.second *
sizeof(T) );
// Reads one (key, value) pair from bufferRecv[0]: key first, then value.
494 template <
typename K,
typename T>
495 inline void fastComm::decodeCollect(std::pair<K, T> & item){
496 bufferRecv[0] >> item.first >> item.second;
// Reads one keyed variable-length row from bufferRecv[0]: key (element 0),
// element count (element 2), then that many elements of T as raw bytes.
// The row is allocated with new T[] and stored in element 1; nothing here
// releases it, so the caller is left holding the allocation (delete[]).
498 template <
typename K,
typename T>
499 inline void fastComm::decodeCollect(std::tuple<K, T*, size_t> & item){
500 bufferRecv[0] >> std::get<0>(item) >> std::get<2>(item);
501 std::get<1>(item) =
new T[std::get<2>(item)];
502 bufferRecv[0].read(std::get<1>(item), std::get<2>(item) *
sizeof(T) );
// Gathers the MSG_COLLECTDATA reply of every rank 1 .. numProcs-1, strictly
// in rank order, and decodes the items into `ret`.
// Items are stored through ret[count], i.e. by index.
// NOTE(review): some lines are elided in this listing (local declarations
// and, presumably, the increment of `count`). No resize of `ret` is visible,
// so `ret` presumably must already be large enough - confirm.
505 template <
typename T>
506 void fastComm::recvFDDDataCollect(std::vector<T> & ret){
507 size_t count = 0, size;
508 unsigned long int id;
509 for (
int i = 1; i < (numProcs); ++i){
// Probe first so the receive buffer can be sized to the incoming message.
513 MPI_Probe(i, MSG_COLLECTDATA, MPI_COMM_WORLD, &stat);
514 MPI_Get_count(&stat, MPI_BYTE, &msgSize);
515 bufferRecv[0].reset();
516 bufferRecv[0].grow(msgSize);
518 MPI_Recv(bufferRecv[0].data(), bufferRecv[0].free(), MPI_BYTE, i, MSG_COLLECTDATA, MPI_COMM_WORLD, &stat);
// Message header: dataset id and number of items from this rank.
520 bufferRecv[0] >>
id >> size;
522 for (
size_t j = 0; j < size; ++j){
523 decodeCollect(ret[count]);
// Broadcasts a key map to ranks 1 .. numProcs-1 with tag MSG_KEYMAP.
// Message layout: tid, entry count (as size_t), then every (key, int) pair.
// The same buffer[0] contents are sent to every peer; the call returns only
// after all numProcs-1 sends have completed.
// NOTE(review): a line before the first write is elided in this listing
// (presumably buffer[0].reset()) - confirm.
530 template <
typename K>
531 void faster::fastComm::sendKeyMap(
unsigned long tid, std::unordered_map<K, int> & keyMap){
533 buffer[0] << tid << size_t(keyMap.size());
535 for (
auto it = keyMap.begin(); it != keyMap.end(); it++){
536 buffer[0] << it->first << it->second;
// One request slot per peer: rank i uses req[i-1].
539 for (
int i = 1; i < (numProcs); ++i)
540 MPI_Isend(buffer[0].data(), buffer[0].size(), MPI_BYTE, i, MSG_KEYMAP, MPI_COMM_WORLD, &req[i-1]);
541 MPI_Waitall( numProcs - 1, req, status);
// Receives the key map broadcast by rank 0 (tag MSG_KEYMAP) and decodes it.
// `tid` is a by-value parameter: the task id read from the message is stored
// in the local copy only and is not visible to the caller.
// NOTE(review): local declarations and the statement that stores each
// decoded (key, count) into keyMap are elided in this listing - confirm.
543 template <
typename K>
544 void faster::fastComm::recvKeyMap(
unsigned long tid, std::unordered_map<K, int> & keyMap){
// Probe first so the receive buffer can be sized to the incoming message.
549 MPI_Probe(0, MSG_KEYMAP, MPI_COMM_WORLD, &stat);
550 MPI_Get_count(&stat, MPI_BYTE, &rsize);
551 bufferRecv[0].grow(rsize);
553 bufferRecv[0].reset();
555 MPI_Recv(bufferRecv[0].data(), bufferRecv[0].free(), MPI_BYTE, 0, MSG_KEYMAP, MPI_COMM_WORLD, &stat);
// Header: task id and number of map entries.
557 bufferRecv[0] >> tid >> size;
560 keyMap.reserve(size);
562 for (
size_t i = 0; i < size; ++i){
565 bufferRecv[0] >> key >> count;
// Broadcasts a key map plus a flag vector to ranks 1 .. numProcs-1.
// Message layout: tid, entry count (size_t), every (key, int) pair, flag
// count (int), then one char per flag.
// The message is sent with tag MSG_KEYMAP, the same tag sendKeyMap uses.
// The call returns only after all numProcs-1 sends have completed.
// NOTE(review): a line before the first write is elided in this listing
// (presumably buffer[0].reset()) - confirm.
569 template <
typename K>
570 void faster::fastComm::sendCogroupData(
unsigned long tid, std::unordered_map<K, int> & keyMap, std::vector<bool> & flags){
572 buffer[0] << tid << size_t(keyMap.size());
574 for (
auto it = keyMap.begin(); it != keyMap.end(); it++){
575 buffer[0] << it->first << it->second;
// Flags are widened to one char each; vector<bool> has no contiguous storage.
578 buffer[0] << int(flags.size());
579 for (
size_t i = 0; i < flags.size(); ++i){
580 buffer[0] << char(flags[i]);
// One request slot per peer: rank i uses req[i-1].
583 for (
int i = 1; i < (numProcs); ++i)
584 MPI_Isend(buffer[0].data(), buffer[0].size(), MPI_BYTE, i, MSG_KEYMAP, MPI_COMM_WORLD, &req[i-1]);
585 MPI_Waitall( numProcs - 1, req, status);
// Receives the message produced by sendCogroupData from rank 0 (tag
// MSG_KEYMAP): key map entries first, then the flag vector.
// `tid` is a by-value parameter: the task id read from the message is stored
// in the local copy only and is not visible to the caller.
// NOTE(review): local declarations and the statement that stores each
// decoded (key, count) into keyMap are elided in this listing - confirm.
587 template <
typename K>
588 void faster::fastComm::recvCogroupData(
unsigned long tid, std::unordered_map<K, int> & keyMap, std::vector<bool> & flags){
// Probe first so the receive buffer can be sized to the incoming message.
593 MPI_Probe(0, MSG_KEYMAP, MPI_COMM_WORLD, &stat);
594 MPI_Get_count(&stat, MPI_BYTE, &rsize);
595 bufferRecv[0].grow(rsize);
597 bufferRecv[0].reset();
599 MPI_Recv(bufferRecv[0].data(), bufferRecv[0].free(), MPI_BYTE, 0, MSG_KEYMAP, MPI_COMM_WORLD, &stat);
// Header: task id and number of map entries.
601 bufferRecv[0] >> tid >> size;
604 keyMap.reserve(size);
606 for (
size_t i = 0; i < size; ++i){
609 bufferRecv[0] >> key >> count;
// Flag section: count (int) followed by one char per flag.
614 bufferRecv[0] >> numFlags;
615 flags.resize(numFlags);
617 for (
int i = 0; i < numFlags; i++ ){
619 bufferRecv[0] >> flag;
620 flags[i] = bool(flag);
Definition: fastComm.h:116
Definition: fastCommBuffer.h:15
Definition: fastContext.h:54
Definition: _workerFdd.h:11
Definition: fastTask.h:11