1 #ifndef LIBFASTER_FASTCOMM_H 2 #define LIBFASTER_FASTCOMM_H 9 #include "definitions.h" 10 #include "fastCommBuffer.h" 66 MSG_FDDSET2DDATASIZES,
80 MSG_FDDSET2DIDATASIZES,
83 MSG_KEYOWNERSHIPSUGEST,
99 #define FDDTYPE_NULL 0x00 100 #define FDDTYPE_CHAR 0x01 101 #define FDDTYPE_INT 0x02 102 #define FDDTYPE_LONGINT 0x03 103 #define FDDTYPE_FLOAT 0x04 104 #define FDDTYPE_DOUBLE 0x05 105 #define FDDTYPE_STRING 0x07 106 #define FDDTYPE_CHARP 0x08 107 #define FDDTYPE_INTP 0x09 108 #define FDDTYPE_LONGINTP 0x0A 109 #define FDDTYPE_FLOATP 0x0B 110 #define FDDTYPE_DOUBLEP 0x0C 111 #define FDDTYPE_OBJECT 0x06 128 MPI_Group slaveGroup;
134 double timeStart, timeEnd;
136 std::vector<bool> bufferBusy;
138 template <
typename T>
139 void sendDataUltraPlus(
int dest, T * data,
size_t * lineSizes,
size_t size,
int tag,
fastCommBuffer & b UNUSED, MPI_Request * request);
140 void sendDataUltraPlus(
int dest, std::string * data,
size_t * lineSizes,
size_t size,
int tag,
fastCommBuffer & b UNUSED, MPI_Request * request);
141 template <
typename T>
142 void sendDataUltraPlus(
int dest, std::vector<T> * data,
size_t * lineSizes,
size_t size,
int tag,
fastCommBuffer & b UNUSED, MPI_Request * request);
143 template <
typename T>
144 void sendDataUltraPlus(
int dest, T ** data,
size_t * lineSizes,
size_t size,
int tag,
fastCommBuffer & b UNUSED, MPI_Request * request);
146 template <
typename K,
typename T>
147 void sendDataUltra(
unsigned long int id,
int dest, K * keys, T * data,
size_t * lineSizes,
size_t size,
int tagID,
int tagDataSize,
int tagKeys,
int tagData);
150 void recvDataUltraPlus(
int src,
void *& data,
int & size,
int tag,
fastCommBuffer & b UNUSED);
152 void recvDataUltra(
unsigned long int &
id,
int src,
void *& keys,
void *& data,
size_t *& lineSizes,
size_t &size,
int tagID,
int tagDataSize,
int tagKeys,
int tagData);
161 const size_t maxMsgSize = 15000;
167 int getProcId(){
return procId; }
168 int getNumProcs(){
return numProcs; }
174 void probeMsgs(
int & tag,
int & src);
175 void waitForReq(
int numReqs);
179 template <
typename T>
180 size_t getSize( T * data UNUSED,
size_t * ds UNUSED,
size_t s );
181 template <
typename T>
182 size_t getSize( std::vector<T> * data,
size_t * ds UNUSED,
size_t s );
183 template <
typename T>
184 size_t getSize( T ** data UNUSED,
size_t * ds,
size_t s );
185 size_t getSize( std::string * data,
size_t * ds UNUSED,
size_t s );
192 void sendTaskResult();
193 void * recvTaskResult(
unsigned long int &tid,
unsigned long int & sid,
size_t &size,
size_t & time,
procstat & stat);
196 void sendCreateFDD(
unsigned long int id,
fddType type,
size_t size,
int dest);
197 void recvCreateFDD(
unsigned long int &
id,
fddType &type,
size_t & size);
198 void sendCreateIFDD(
unsigned long int id,
fddType kType,
fddType tType,
size_t size,
int dest);
199 void recvCreateIFDD(
unsigned long int &
id,
fddType &kType,
fddType &tType,
size_t & size);
201 void sendCreateFDDGroup(
unsigned long int id,
fddType keyType, std::vector<unsigned long int> & members);
202 void recvCreateFDDGroup(
unsigned long int &
id,
fddType & keyType, std::vector<unsigned long int> & members);
204 void sendDiscardFDD(
unsigned long int id);
205 void recvDiscardFDD(
unsigned long int &
id);
208 template <
typename T>
209 void sendFDDSetData(
unsigned long int id,
int dest, T * data,
size_t size);
210 template <
typename T>
211 void sendFDDSetData(
unsigned long int id,
int dest, T ** data,
size_t * lineSizes,
size_t size);
213 template <
typename K,
typename T>
214 void sendFDDSetIData(
unsigned long int id,
int dest, K * keys, T * data,
size_t size);
215 template <
typename K,
typename T>
216 void sendFDDSetIData(
unsigned long int id,
int dest, K * keys, T ** data,
size_t * lineSizes,
size_t size);
218 void recvFDDSetData(
unsigned long int &
id,
void *& data,
size_t &size);
219 void recvFDDSetData(
unsigned long int &
id,
void *& data,
size_t *& lineSizes,
size_t &size);
221 template <
typename K,
typename T>
222 void recvFDDSetIData(
unsigned long int &
id, K *& keys, T *& data,
size_t &size);
223 template <
typename K,
typename T>
224 void recvFDDSetIData(
unsigned long int &
id, K *& keys, T *& data,
size_t *& lineSizes,
size_t &size);
228 template <
typename T>
229 void sendFDDData(
unsigned long int id,
int dest, T * data,
size_t size);
230 template <
typename K,
typename T>
231 void sendIFDDData(
unsigned long int id,
int dest, K * keys, T * data,
size_t size);
232 void recvFDDData(
unsigned long int &
id,
void * data,
size_t &size);
233 void recvIFDDData(
unsigned long int &
id,
void * keys,
void * data,
size_t &size);
236 template <
typename T>
237 void sendFDDDataCollect(
unsigned long int id, T * data,
size_t size);
238 template <
typename T>
239 void sendFDDDataCollect(
unsigned long int id, T ** data,
size_t * dataSizes,
size_t size);
240 template <
typename K,
typename T>
241 void sendFDDDataCollect(
unsigned long int id, K * keys, T * data,
size_t size);
242 template <
typename K,
typename T>
243 void sendFDDDataCollect(
unsigned long int id, K * keys, T ** data,
size_t * dataSizes,
size_t size);
246 template <
typename T>
247 inline void decodeCollect(T & item);
248 template <
typename T>
249 inline void decodeCollect(std::pair<T*,size_t> & item);
250 template <
typename K,
typename T>
251 inline void decodeCollect(std::pair<K, T> & item);
252 template <
typename K,
typename T>
253 inline void decodeCollect(std::tuple<K, T*, size_t> & item);
255 template <
typename T>
256 void recvFDDDataCollect(std::vector<T> & ret);
263 void sendReadFDDFile(
unsigned long int id, std::string filename,
size_t size,
size_t offset,
int dest);
264 void recvReadFDDFile(
unsigned long int &
id, std::string & filename,
size_t &size,
size_t & offset);
265 void sendWriteFDDFile(
unsigned long int id,std::string & path, std::string & sufix);
266 void recvWriteFDDFile(
unsigned long int &
id,std::string & path, std::string & sufix);
268 void sendFDDInfo(
size_t size);
269 void recvFDDInfo(
size_t &size,
int & src);
271 void sendFileName(std::string path);
272 void recvFileName(std::string & filename);
274 void sendCollect(
unsigned long int id);
275 void recvCollect(
unsigned long int &
id);
279 void bcastBuffer(
int src,
int i);
283 template <
typename K>
284 void sendKeyMap(
unsigned long tid, std::unordered_map<K, int> & keyMap);
285 template <
typename K>
286 void recvKeyMap(
unsigned long tid, std::unordered_map<K, int> & keyMap);
287 template <
typename K>
288 void distributeKeyMap(std::unordered_map<K, int> & localKeyMap, std::unordered_map<K, int> & keyMap);
289 template <
typename K>
290 void sendCogroupData(
unsigned long tid, std::unordered_map<K, int> & keyMap, std::vector<bool> & flags);
291 template <
typename K>
292 void recvCogroupData(
unsigned long tid, std::unordered_map<K, int> & keyMap, std::vector<bool> & flags);
294 bool isSendBufferFree(
int i);
295 void sendGroupByKeyData(
int i);
296 void * recvGroupByKeyData(
int &size);
325 template <
typename T>
326 size_t faster::fastComm::getSize( T * data UNUSED,
size_t * ds UNUSED,
size_t s ){
330 template <
typename T>
331 size_t faster::fastComm::getSize( std::vector<T> * data,
size_t * ds UNUSED,
size_t s ){
332 size_t rawDataSize = 0;
333 for(
size_t i = 0; i < s; ++i ){
334 rawDataSize += (
sizeof(size_t) + data[i].size()*
sizeof(T));
339 template <
typename T>
340 size_t faster::fastComm::getSize( T ** data UNUSED,
size_t * ds,
size_t s ){
341 size_t rawDataSize = 0;
342 for(
size_t i = 0; i < s; ++i ){
343 rawDataSize += ds[i]*
sizeof(T);
349 template <
typename T>
350 void fastComm::sendDataUltraPlus(
int dest, T * data,
size_t * lineSizes UNUSED,
size_t size,
int tag,
fastCommBuffer & b UNUSED, MPI_Request * request){
351 MPI_Isend( data, size*
sizeof(T), MPI_BYTE, dest, tag, MPI_COMM_WORLD, request);
354 template <
typename T>
355 void fastComm::sendDataUltraPlus(
int dest, std::vector<T> * data,
size_t * lineSizes UNUSED,
size_t size,
int tag,
fastCommBuffer & b UNUSED, MPI_Request * request){
357 b.grow(getSize(data, lineSizes, size));
359 for (
size_t i = 0; i < size; ++i){
362 MPI_Isend( b.data(), b.size(), MPI_BYTE, dest, tag, MPI_COMM_WORLD, request);
365 template <
typename T>
366 void fastComm::sendDataUltraPlus(
int dest, T ** data,
size_t * lineSizes,
size_t size,
int tag,
fastCommBuffer & b UNUSED, MPI_Request * request){
368 b.grow(getSize(data, lineSizes, size));
370 for (
size_t i = 0; i < size; ++i){
371 b.write(data[i], lineSizes[i]*
sizeof(T));
373 MPI_Isend( b.data(), b.size(), MPI_BYTE, dest, tag, MPI_COMM_WORLD, request);
378 template <
typename K,
typename T>
379 void fastComm::sendDataUltra(
unsigned long int id,
int dest, K * keys, T * data,
size_t * lineSizes,
size_t size,
int tagID,
int tagDataSize,
int tagKeys,
int tagData){
382 buffer[dest].reset();
383 buffer[dest] <<
id << size;
384 MPI_Isend( buffer[dest].data(), buffer[dest].size(), MPI_BYTE, dest, tagID , MPI_COMM_WORLD, &req2[dest-1]);
388 MPI_Isend( lineSizes, size*
sizeof(
size_t), MPI_BYTE, dest, tagDataSize, MPI_COMM_WORLD, &req4[dest-1]);
392 sendDataUltraPlus(dest, keys, NULL, size, tagKeys, buffer[dest], &req3[dest-1] );
393 MPI_Wait(&req3[dest-1], status);
397 sendDataUltraPlus(dest, data, lineSizes, size, tagData, buffer[dest], &req[dest-1] );
403 template <
typename T>
404 void fastComm::sendFDDSetData(
unsigned long int id,
int dest, T * data,
size_t size){
405 int * NULLRef = NULL;
406 sendDataUltra(
id, dest, NULLRef, data, (
size_t*) NULLRef, size, MSG_FDDSETDATAID, 0, 0, MSG_FDDSETDATA);
408 template <
typename K,
typename T>
409 void fastComm::sendFDDSetIData(
unsigned long int id,
int dest, K * keys, T * data,
size_t size){
410 size_t * NULLRef = NULL;
411 sendDataUltra(
id, dest, keys, data, NULLRef, size, MSG_FDDSETIDATAID, 0, MSG_FDDSETIKEYS, MSG_FDDSETIDATA);
415 template <
typename T>
416 void fastComm::sendFDDSetData(
unsigned long int id,
int dest, T ** data,
size_t *lineSize,
size_t size){
417 int * NULLRef = NULL;
418 sendDataUltra(
id, dest, NULLRef, data, lineSize, size, MSG_FDDSET2DDATAID, MSG_FDDSET2DDATASIZES, 0, MSG_FDDSET2DDATA);
420 template <
typename K,
typename T>
421 void fastComm::sendFDDSetIData(
unsigned long int id,
int dest, K * keys, T ** data,
size_t *lineSize,
size_t size){
422 sendDataUltra(
id, dest, keys, data, lineSize, size, MSG_FDDSET2DIDATAID, MSG_FDDSET2DIDATASIZES, MSG_FDDSET2DIKEYS, MSG_FDDSET2DIDATA);
425 template <
typename K,
typename T>
426 void fastComm::recvFDDSetIData(
unsigned long int &
id, K *& keys, T *& data,
size_t &size){
427 void * NULLRef = NULL;
428 recvDataUltra(
id, 0, keys, data, (
size_t*&) NULLRef, size, MSG_FDDSETIDATAID, 0, MSG_FDDSETIKEYS, MSG_FDDSETIDATA);
431 template <
typename K,
typename T>
432 void fastComm::recvFDDSetIData(
unsigned long int &
id, K *& keys, T *& data,
size_t *& lineSizes,
size_t &size){
433 recvDataUltra(
id, 0, keys, data, lineSizes, size, MSG_FDDSET2DIDATAID, MSG_FDDSET2DIDATASIZES, MSG_FDDSET2DIKEYS, MSG_FDDSET2DIDATA);
438 template <
typename T>
439 void fastComm::sendFDDData(
unsigned long int id,
int dest, T * data,
size_t size){
440 sendDataUltra<int,T>(id, dest, NULL, data, NULL, size, MSG_FDDDATAID, 0, 0, MSG_FDDDATA);
442 template <
typename K,
typename T>
443 void fastComm::sendIFDDData(
unsigned long int id,
int dest, K * keys, T * data,
size_t size){
444 sendDataUltra(
id, dest, NULL, data, NULL, size, MSG_IFDDDATAID, 0, MSG_IFDDDATAKEYS, MSG_IFDDDATA);
447 template <
typename T>
448 void fastComm::sendFDDDataCollect(
unsigned long int id, T * data,
size_t size){
450 buffer[0] <<
id << size;
452 buffer[0].grow(16 + getSize(data, NULL, size));
454 for(
size_t i = 0; i < size; ++i ){
455 buffer[0] << data[i];
458 MPI_Isend( buffer[0].data(), buffer[0].size(), MPI_BYTE, 0, MSG_COLLECTDATA , MPI_COMM_WORLD, req);
460 template <
typename T>
461 void fastComm::sendFDDDataCollect(
unsigned long int id, T ** data,
size_t * dataSizes,
size_t size){
464 buffer[0] <<
id << size;
466 buffer[0].grow(16 + (size*
sizeof(
size_t)) + getSize(data, dataSizes, size));
468 for(
size_t i = 0; i < size; ++i ){
469 buffer[0] << dataSizes[i];
470 buffer[0].write(data[i], dataSizes[i]*
sizeof(T));
473 MPI_Isend( buffer[0].data(), buffer[0].size(), MPI_BYTE, 0, MSG_COLLECTDATA , MPI_COMM_WORLD, req);
475 template <
typename K,
typename T>
476 void fastComm::sendFDDDataCollect(
unsigned long int id, K * keys, T * data,
size_t size){
479 buffer[0] <<
id << size;
481 buffer[0].grow(16 + getSize(keys, NULL, size) + getSize(data, NULL, size));
483 for(
size_t i = 0; i < size; ++i ){
484 buffer[0] << keys[i] << data[i];
487 MPI_Isend( buffer[0].data(), buffer[0].size(), MPI_BYTE, 0, MSG_COLLECTDATA , MPI_COMM_WORLD, req);
489 template <
typename K,
typename T>
490 void fastComm::sendFDDDataCollect(
unsigned long int id, K * keys, T ** data,
size_t * dataSizes,
size_t size){
493 buffer[0] <<
id << size;
495 buffer[0].grow(16 + getSize(keys, NULL, size) + (size*
sizeof(
size_t)) + getSize(data, dataSizes, size));
497 for(
size_t i = 0; i < size; ++i ){
498 buffer[0] << keys[i] << dataSizes[i];
499 buffer[0].write(data[i], dataSizes[i]*
sizeof(T));
501 MPI_Isend( buffer[0].data(), buffer[0].size(), MPI_BYTE, 0, MSG_COLLECTDATA , MPI_COMM_WORLD, req);
504 template <
typename T>
505 inline void fastComm::decodeCollect(T & item){
506 bufferRecv[0] >> item;
508 template <
typename T>
509 inline void fastComm::decodeCollect(std::pair<T*,size_t> & item){
510 bufferRecv[0] >> item.second;
511 item.first =
new T[item.second];
512 bufferRecv[0].read(item.first, item.second *
sizeof(T) );
514 template <
typename K,
typename T>
515 inline void fastComm::decodeCollect(std::pair<K, T> & item){
516 bufferRecv[0] >> item.first >> item.second;
518 template <
typename K,
typename T>
519 inline void fastComm::decodeCollect(std::tuple<K, T*, size_t> & item){
520 bufferRecv[0] >> std::get<0>(item) >> std::get<2>(item);
521 std::get<1>(item) =
new T[std::get<2>(item)];
522 bufferRecv[0].read(std::get<1>(item), std::get<2>(item) *
sizeof(T) );
525 template <
typename T>
526 void fastComm::recvFDDDataCollect(std::vector<T> & ret){
527 size_t count = 0, size;
528 unsigned long int id;
529 for (
int i = 1; i < (numProcs); ++i){
533 MPI_Probe(i, MSG_COLLECTDATA, MPI_COMM_WORLD, &stat);
534 MPI_Get_count(&stat, MPI_BYTE, &msgSize);
535 bufferRecv[0].reset();
536 bufferRecv[0].grow(msgSize);
538 MPI_Recv(bufferRecv[0].data(), bufferRecv[0].free(), MPI_BYTE, i, MSG_COLLECTDATA, MPI_COMM_WORLD, &stat);
540 bufferRecv[0] >>
id >> size;
542 for (
size_t j = 0; j < size; ++j){
543 decodeCollect(ret[count]);
550 template <
typename K>
551 void faster::fastComm::sendKeyMap(
unsigned long tid, std::unordered_map<K, int> & keyMap){
552 const int itemsPerMsg = (256 * 1024);
553 int num_msgs = ceil( (
double) keyMap.size() / itemsPerMsg );
554 int inserted_items = 0;
556 int current_buffer = 0;
562 buffer[current_buffer].reset();
563 buffer[current_buffer] << tid << size_t(keyMap.size());
565 std::cerr <<
" NM:" << num_msgs <<
"\n";
567 for (
auto it = keyMap.begin(); it != keyMap.end(); it++){
568 buffer[current_buffer] << it->first << it->second;
572 if (inserted_items >= itemsPerMsg){
573 std::cerr << msg_num <<
" ";
576 MPI_Waitall( (numProcs - 1), req, status);
578 for (
int i = 1; i < (numProcs); ++i)
580 buffer[current_buffer].data(),
581 buffer[current_buffer].size(),
589 buffer[current_buffer].reset();
594 current_buffer = (current_buffer + 1) % numProcs;
599 if (inserted_items > 0){
600 std::cerr << msg_num <<
" ";
603 MPI_Waitall( (numProcs - 1), req, status);
605 for (
int i = 1; i < (numProcs); ++i)
607 buffer[current_buffer].data(),
608 buffer[current_buffer].size(),
620 MPI_Waitall( (numProcs - 1), req, status);
625 template <
typename K>
626 void faster::fastComm::recvKeyMap(
unsigned long tid, std::unordered_map<K, int> & keyMap){
632 MPI_Probe(0, MSG_KEYMAP, MPI_COMM_WORLD, &stat);
633 MPI_Get_count(&stat, MPI_BYTE, &rsize);
634 bufferRecv[0].grow(rsize);
635 bufferRecv[0].reset();
636 MPI_Recv(bufferRecv[0].data(),
637 bufferRecv[0].free(),
645 bufferRecv[0] >> tid >> size;
648 keyMap.reserve(size);
650 const int itemsPerMsg = (256 * 1024);
651 int num_msgs = ceil( (
double) size / itemsPerMsg );
652 int inserted_items = 0;
653 std::cerr <<
"(NM:" << num_msgs <<
")\n";
655 for (
int msg_num = 0; msg_num < num_msgs; ++msg_num){
656 int numRevcItems = std::min(size - inserted_items,
size_t(itemsPerMsg));
657 std::cerr << msg_num <<
" ";
661 MPI_Probe(0, MSG_KEYMAP, MPI_COMM_WORLD, &stat);
662 MPI_Get_count(&stat, MPI_BYTE, &rsize);
663 bufferRecv[0].grow(rsize);
664 bufferRecv[0].reset();
666 bufferRecv[0].data(),
667 bufferRecv[0].free(),
677 for (
int i = 0; i < numRevcItems; ++i){
680 bufferRecv[0] >> key >> count;
683 inserted_items += numRevcItems;
687 template <
typename K>
688 void faster::fastComm::distributeKeyMap(std::unordered_map<K, int> & localKeyMap, std::unordered_map<K, int> & keyMap){
690 keyMap.reserve(localKeyMap.size() * numProcs);
692 std::cerr <<
"---------- Join Slaves ----------";
696 for (
int i = 1; i < (numProcs); ++i){
698 std::cerr <<
"S" << localKeyMap.size() <<
" ";
701 buffer[i] << size_t(localKeyMap.size());
702 for (
auto it = localKeyMap.begin(); it != localKeyMap.end(); it++){
703 buffer[i] << it->first << it->second;
706 for (
int j = 1; j < (numProcs); ++j){
708 MPI_Isend(buffer[i].data(), buffer[i].size(), MPI_BYTE, j, MSG_DISTKEYMAP, MPI_COMM_WORLD, &req[reqIndex++]);
710 MPI_Waitall( numProcs - 2, req, status);
716 MPI_Probe(i, MSG_DISTKEYMAP, MPI_COMM_WORLD, &stat);
717 MPI_Get_count(&stat, MPI_BYTE, &rsize);
718 bufferRecv[i].grow(rsize);
720 bufferRecv[i].reset();
722 MPI_Recv(bufferRecv[i].data(), bufferRecv[i].free(), MPI_BYTE, i, MSG_DISTKEYMAP, MPI_COMM_WORLD, &stat);
724 bufferRecv[i] >> numItems;
725 std::cerr <<
"R:" << numItems <<
" ";
727 for (
size_t j = 0; j < (numItems); ++j){
728 bufferRecv[i] >> p.first >> p.second;
729 keyMap.insert(std::move(p));
734 keyMap.insert(localKeyMap.begin(), localKeyMap.end());
735 std::cerr <<
" Final Size: " << keyMap.size() <<
"\n";
738 template <
typename K>
739 void faster::fastComm::sendCogroupData(
unsigned long tid, std::unordered_map<K, int> & keyMap, std::vector<bool> & flags){
741 buffer[0] << tid << size_t(keyMap.size());
743 for (
auto it = keyMap.begin(); it != keyMap.end(); it++){
744 buffer[0] << it->first << it->second;
747 buffer[0] << int(flags.size());
748 for (
size_t i = 0; i < flags.size(); ++i){
749 buffer[0] << char(flags[i]);
752 for (
int i = 1; i < (numProcs); ++i)
753 MPI_Isend(buffer[0].data(), buffer[0].size(), MPI_BYTE, i, MSG_KEYMAP, MPI_COMM_WORLD, &req[i-1]);
754 MPI_Waitall( numProcs - 1, req, status);
759 template <
typename K>
760 void faster::fastComm::recvCogroupData(
unsigned long tid, std::unordered_map<K, int> & keyMap, std::vector<bool> & flags){
765 MPI_Probe(0, MSG_KEYMAP, MPI_COMM_WORLD, &stat);
766 MPI_Get_count(&stat, MPI_BYTE, &rsize);
767 bufferRecv[0].grow(rsize);
769 bufferRecv[0].reset();
771 MPI_Recv(bufferRecv[0].data(), bufferRecv[0].free(), MPI_BYTE, 0, MSG_KEYMAP, MPI_COMM_WORLD, &stat);
774 bufferRecv[0] >> tid >> size;
777 keyMap.reserve(size);
779 for (
size_t i = 0; i < size; ++i){
782 bufferRecv[0] >> key >> count;
787 bufferRecv[0] >> numFlags;
788 flags.resize(numFlags);
790 for (
int i = 0; i < numFlags; i++ ){
792 bufferRecv[0] >> flag;
793 flags[i] = bool(flag);
unsigned int fddType
Dataset type.