7 bool faster::fastComm::isDriver(){
14 MPI_Finalized(&mpi_finalized);
22 faster::fastComm::fastComm(
int & argc,
char ** argv){
25 MPI_Initialized(&mpi_initialized);
26 if (!mpi_initialized){
27 std::atexit(exitCallback);
32 MPI_Comm_size (MPI_COMM_WORLD, &numProcs);
33 MPI_Comm_rank (MPI_COMM_WORLD, &procId);
34 std::vector<int> slaveIDs(numProcs-1);
35 iota(slaveIDs.begin(), slaveIDs.end(), 1);
38 MPI_Comm_group(MPI_COMM_WORLD, &origGroup);
39 MPI_Group_incl(origGroup, numProcs-1, slaveIDs.data(), &slaveGroup);
40 MPI_Comm_create(MPI_COMM_WORLD, slaveGroup, &slaveComm);
42 timeStart = MPI_Wtime();
44 status =
new MPI_Status [numProcs];
45 req =
new MPI_Request [numProcs];
46 req2 =
new MPI_Request [numProcs];
47 req3 =
new MPI_Request [numProcs];
48 req4 =
new MPI_Request [numProcs];
49 buffer =
new fastCommBuffer [numProcs];
50 bufferRecv =
new fastCommBuffer [std::max(3, numProcs)];
51 bufferBusy.resize(numProcs);
54 faster::fastComm::~fastComm(){
55 timeEnd = MPI_Wtime();
67 void faster::fastComm::probeMsgs(
int & tag,
int & src){
69 MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &stat);
71 src = stat.MPI_SOURCE;
74 void faster::fastComm::waitForReq(
int numReqs){
75 MPI_Waitall(numReqs, req, status);
78 void faster::fastComm::joinAll(){
79 MPI_Barrier(MPI_COMM_WORLD);
82 void faster::fastComm::joinSlaves(){
83 MPI_Barrier(slaveComm);
86 void faster::fastComm::sendTask(fastTask &task){
88 buffer[0].grow(48 + 8*task.globals.size() );
90 buffer[0] << task.id << task.operationType << task.srcFDD << task.destFDD << task.functionId;
91 buffer[0] << size_t(task.globals.size());
93 for (
size_t i = 0; i < task.globals.size(); i++){
94 size_t s = std::get<1>(task.globals[i]);
95 int type = std::get<2>(task.globals[i]);
99 buffer[0].write(std::get<0>(task.globals[i]), s);
102 for (
int i = 1; i < numProcs; ++i){
103 MPI_Isend(buffer[0].data(), buffer[0].size(), MPI_BYTE, i, MSG_TASK, MPI_COMM_WORLD, &req[i-1]);
106 MPI_Waitall( numProcs - 1, req, status);
109 void faster::fastComm::recvTask(fastTask & task){
111 size_t numGlobals = 0;
113 bufferRecv[0].reset();
115 MPI_Recv(bufferRecv[0].data(), bufferRecv[0].free(), MPI_BYTE, 0, MSG_TASK, MPI_COMM_WORLD, &stat);
117 bufferRecv[0] >> task.id >> task.operationType >> task.srcFDD >> task.destFDD >> task.functionId;
119 bufferRecv[0] >> numGlobals;
120 for (
size_t i = 0; i < numGlobals; i++){
125 bufferRecv[0] >> varType;
126 bufferRecv[0] >> varSize;
130 var =
new char[varSize]();
131 bufferRecv[0].read(var, varSize);
133 task.globals.insert(task.globals.end(), std::make_tuple(var, varSize, varType));
138 void faster::fastComm::sendTaskResult(){
141 MPI_Send(resultBuffer.data(), resultBuffer.size() , MPI_BYTE, 0, MSG_TASKRESULT, MPI_COMM_WORLD);
144 void * faster::fastComm::recvTaskResult(
unsigned long int &
id,
unsigned long int & sid,
size_t & size,
size_t & time, procstat & stat){
147 MPI_Probe(MPI_ANY_SOURCE, MSG_TASKRESULT, MPI_COMM_WORLD, status);
148 MPI_Get_count(status, MPI_BYTE, &rsize);
149 sid = status->MPI_SOURCE;
152 bufferRecv[sid].grow(rsize);
153 bufferRecv[sid].reset();
155 MPI_Irecv(bufferRecv[sid].data(), bufferRecv[sid].free(), MPI_BYTE, sid, MSG_TASKRESULT, MPI_COMM_WORLD, &req[sid-1]);
157 bufferRecv[sid] >>
id >> time >> stat >> size;
159 return bufferRecv[sid].pos();
162 void faster::fastComm::sendCreateFDD(
unsigned long int id,
fddType type,
size_t size,
int dest){
165 buffer[dest].reset();
168 buffer[dest] <<
id << type << size;
172 MPI_Isend(buffer[dest].data(), buffer[dest].size(), MPI_BYTE, dest, MSG_CREATEFDD, MPI_COMM_WORLD, &req[dest-1]);
175 void faster::fastComm::recvCreateFDD(
unsigned long int &
id,
fddType &type,
size_t &size){
179 bufferRecv[0].reset();
181 MPI_Recv(bufferRecv[0].data(), bufferRecv[0].free(), MPI_BYTE, 0, MSG_CREATEFDD, MPI_COMM_WORLD, &stat);
185 bufferRecv[0] >>
id >> type >> size;
190 void faster::fastComm::sendCreateIFDD(
unsigned long int id,
fddType kType,
fddType tType,
size_t size,
int dest){
193 buffer[dest].reset();
194 buffer[dest].grow(32);
197 buffer[dest] <<
id << kType << tType << size;
201 MPI_Isend(buffer[dest].data(), buffer[dest].size(), MPI_BYTE, dest, MSG_CREATEIFDD, MPI_COMM_WORLD, &req[dest-1]);
204 void faster::fastComm::recvCreateIFDD(
unsigned long int &
id,
fddType &kType,
fddType &tType,
size_t &size){
208 bufferRecv[0].reset();
209 bufferRecv[0].grow(32);
211 MPI_Recv(bufferRecv[0].data(), bufferRecv[0].free(), MPI_BYTE, 0, MSG_CREATEIFDD, MPI_COMM_WORLD, &stat);
215 bufferRecv[0] >>
id >> kType >> tType >> size;
220 void faster::fastComm::sendCreateFDDGroup(
unsigned long int id,
fddType keyType, std::vector<unsigned long int> & idV){
221 for (
int dest = 1; dest < numProcs; ++dest){
222 buffer[dest].reset();
224 buffer[dest] <<
id << keyType << size_t(idV.size());
226 for (
size_t i = 0; i < idV.size(); ++i){
227 buffer[dest] << idV[i];
230 MPI_Isend(buffer[dest].data(), buffer[dest].size(), MPI_BYTE, dest, MSG_CREATEGFDD, MPI_COMM_WORLD, &req[dest-1]);
232 MPI_Waitall( numProcs - 1, req, status);
234 void faster::fastComm::recvCreateFDDGroup(
unsigned long int &
id,
fddType & keyType, std::vector<unsigned long int> & idV){
237 bufferRecv[0].reset();
239 MPI_Recv(bufferRecv[0].data(), bufferRecv[0].free(), MPI_BYTE, 0, MSG_CREATEGFDD, MPI_COMM_WORLD, &stat);
241 bufferRecv[0] >>
id >> keyType >> numMembers;
243 idV.resize(numMembers);
245 for (
size_t i = 0; i < numMembers; ++i){
246 bufferRecv[0] >> idV[i];
250 void faster::fastComm::sendDiscardFDD(
unsigned long int id){
251 for (
int i = 1; i < numProcs; ++i){
252 MPI_Isend( &
id,
sizeof(
long unsigned int), MPI_BYTE, i, MSG_DISCARDFDD, MPI_COMM_WORLD, &req[i-1]);
257 void faster::fastComm::recvDiscardFDD(
unsigned long int &
id){
259 MPI_Recv(&
id,
sizeof(
long unsigned int), MPI_BYTE, 0, MSG_DISCARDFDD, MPI_COMM_WORLD, &stat);
262 size_t faster::fastComm::getSize( std::string * data,
size_t * ds UNUSED,
size_t s ){
263 size_t rawDataSize = 0;
264 for(
size_t i = 0; i < s; ++i ){
265 rawDataSize += (
sizeof(size_t) + data[i].size());
271 void faster::fastComm::sendDataUltraPlus(
int dest, std::string * data,
size_t * lineSizes UNUSED,
size_t size,
int tag, fastCommBuffer & b UNUSED, MPI_Request * request){
273 b.grow(getSize(data, lineSizes, size));
275 for (
size_t i = 0; i < size; ++i){
278 MPI_Isend( b.data(), b.size(), MPI_BYTE, dest, tag, MPI_COMM_WORLD, request);
281 void faster::fastComm::recvDataUltraPlus(
int src,
void *& data,
int & size,
int tag, fastCommBuffer & b){
282 MPI_Status stat, stat2;
283 MPI_Probe(src, tag, MPI_COMM_WORLD, &stat);
284 MPI_Get_count(&stat, MPI_BYTE, &size);
287 MPI_Recv(b.data(), b.free(), MPI_BYTE, src, tag, MPI_COMM_WORLD, &stat2);
292 void faster::fastComm::recvDataUltra(
unsigned long int &
id,
int src,
void *& keys,
void *& data,
size_t *& lineSizes,
size_t &size,
int tagID,
int tagDataSize,
int tagKeys,
int tagData){
296 bufferRecv[0].reset();
297 MPI_Recv(bufferRecv[0].data(), bufferRecv[0].free(), MPI_BYTE, src, tagID, MPI_COMM_WORLD, status);
298 bufferRecv[0] >>
id >> size;
303 recvDataUltraPlus(src, (
void*&) lineSizes, rSize, tagDataSize, bufferRecv[2]);
308 recvDataUltraPlus(src, keys, rSize, tagKeys, bufferRecv[1]);
312 recvDataUltraPlus(src, data, rSize, tagData, bufferRecv[0]);
317 void faster::fastComm::recvFDDSetData(
unsigned long int &
id,
void *& data,
size_t &size){
318 void * NULLRef = NULL;
320 recvDataUltra(
id, MPI_ANY_SOURCE, NULLRef, data, (
size_t*&) NULLRef, size, MSG_FDDSETDATAID, 0, 0, MSG_FDDSETDATA);
322 void faster::fastComm::recvFDDSetData(
unsigned long int &
id,
void *& data,
size_t *& lineSizes,
size_t &size){
324 void * NULLRef = NULL;
325 recvDataUltra(
id, 0, NULLRef, data, lineSizes, size, MSG_FDDSET2DDATAID, MSG_FDDSET2DDATASIZES, 0, MSG_FDDSET2DDATA);
328 void faster::fastComm::recvFDDData(
unsigned long int &
id,
void * data,
size_t &size){
329 void * NULLRef = NULL;
331 recvDataUltra(
id, 0, NULLRef, data, (
size_t*&) NULLRef, size, MSG_FDDDATAID, 0, 0, MSG_FDDDATA);
333 void faster::fastComm::recvIFDDData(
unsigned long int &
id,
void * keys,
void * data,
size_t &size){
334 size_t * NULLRef = NULL;
336 recvDataUltra(
id, 0, keys, data, NULLRef, size, MSG_IFDDDATAID, 0, MSG_IFDDDATAKEYS, MSG_IFDDDATA);
341 void faster::fastComm::sendReadFDDFile(
unsigned long int id, std::string filename,
size_t size,
size_t offset,
int dest){
343 buffer[dest].reset();
345 buffer[dest] <<
id << size << offset << filename;
347 MPI_Isend( buffer[dest].data(), buffer[dest].size(), MPI_BYTE, dest, MSG_READFDDFILE, MPI_COMM_WORLD, &req[dest-1]);
350 void faster::fastComm::recvReadFDDFile(
unsigned long int &
id, std::string & filename,
size_t &size,
size_t & offset){
355 bufferRecv[0].reset();
356 MPI_Probe(0, MSG_READFDDFILE, MPI_COMM_WORLD, &stat);
357 MPI_Get_count(&stat, MPI_BYTE, &msgSize);
358 bufferRecv[0].grow(msgSize);
360 MPI_Recv(bufferRecv[0].data(), bufferRecv[0].free(), MPI_BYTE, 0, MSG_READFDDFILE, MPI_COMM_WORLD, &stat);
363 bufferRecv[0] >>
id >> size >> offset >> filename;
367 void faster::fastComm::sendWriteFDDFile(
unsigned long int id,std::string & path, std::string & sufix){
369 for (
int dest = 1; dest < numProcs; ++dest){
370 buffer[dest].reset();
372 buffer[dest] <<
id << path << sufix;
374 MPI_Isend( buffer[dest].data(), buffer[dest].size(), MPI_BYTE, dest, MSG_WRITEFDDFILE, MPI_COMM_WORLD, &req[dest-1]);
376 MPI_Waitall( numProcs - 1, req, status);
379 void faster::fastComm::recvWriteFDDFile(
unsigned long int &
id,std::string & path, std::string & sufix){
384 bufferRecv[0].reset();
385 MPI_Probe(0, MSG_WRITEFDDFILE, MPI_COMM_WORLD, &stat);
386 MPI_Get_count(&stat, MPI_BYTE, &msgSize);
387 bufferRecv[0].grow(msgSize);
389 MPI_Recv(bufferRecv[0].data(), bufferRecv[0].free(), MPI_BYTE, 0, MSG_WRITEFDDFILE, MPI_COMM_WORLD, &stat);
392 bufferRecv[0] >>
id >> path >> sufix;
398 void faster::fastComm::sendFDDInfo(
size_t size){
399 MPI_Send( &size,
sizeof(
size_t), MPI_BYTE, 0, MSG_FDDINFO, MPI_COMM_WORLD);
402 void faster::fastComm::recvFDDInfo(
size_t &size,
int & src){
403 MPI_Recv(&size,
sizeof(
size_t), MPI_BYTE, MPI_ANY_SOURCE, MSG_FDDINFO, MPI_COMM_WORLD, status);
404 src = status[0].MPI_SOURCE;
407 void faster::fastComm::sendFileName(std::string path){
412 for (
int dest = 1; dest < numProcs; ++dest){
424 void faster::fastComm::recvFileName(std::string & path){
428 bufferRecv[0].reset();
429 MPI_Probe(0, MSG_FILENAME, MPI_COMM_WORLD, &stat);
430 MPI_Get_count(&stat, MPI_BYTE, &msgSize);
431 bufferRecv[0].grow(msgSize);
434 bufferRecv[0].data(),
435 bufferRecv[0].free(),
443 bufferRecv[0] >> path;
446 bool faster::fastComm::isSendBufferFree(
int i){
448 int savepoint = i - 1;
456 MPI_Test(&req[savepoint], &flag, MPI_STATUS_IGNORE);
458 bufferBusy[i] =
false;
461 bufferBusy[i] =
true;
468 void faster::fastComm::sendGroupByKeyData(
int i){
469 int savepoint = i - 1;
481 bufferBusy[i] =
true;
484 void * faster::fastComm::recvGroupByKeyData(
int & size){
512 void faster::fastComm::sendCollect(
unsigned long int id){
513 for (
int i = 1; i < numProcs; ++i){
516 sizeof(
long unsigned int),
525 MPI_Waitall( numProcs - 1, req, status);
528 void faster::fastComm::recvCollect(
unsigned long int &
id){
530 MPI_Recv(&
id,
sizeof(
long unsigned int), MPI_BYTE, 0, MSG_COLLECT, MPI_COMM_WORLD, &stat);
533 void faster::fastComm::sendFinish(){
534 for (
int i = 1; i < numProcs; ++i){
535 MPI_Isend( buffer[0].data(), 1, MPI_BYTE, i, MSG_FINISH, MPI_COMM_WORLD, &req[i-1]);
538 MPI_Waitall( numProcs - 1, req, status);
540 void faster::fastComm::recvFinish(){
542 MPI_Recv(bufferRecv[0].data(), 1, MPI_BYTE, 0, MSG_FINISH, MPI_COMM_WORLD, &stat);
545 void faster::fastComm::bcastBuffer(
int src,
int i){
547 MPI_Bcast(buffer[i].data(), buffer[i].size(), MPI_BYTE, src, MPI_COMM_WORLD);
unsigned int fddType
Dataset type.