// NOTE(review): this file is a source-browser listing. The leading numbers are the
// original line numbers, and many original lines (signatures, braces, whole bodies)
// are elided, so the fragments below do not compile as-is.
11 #include "_workerIFdd.h" 15 #include "indexedFddStorageExtern.cpp" 19 template <
typename K,
typename T>
26 template <
typename K,
typename T>
33 template <
typename K,
typename T>
39 template <
typename K,
typename T>
44 template <
typename K,
typename T>
// Element access: returns the value at position `address` of the local data array.
50 template <
typename K,
typename T>
52 return this->localData->getData()[address];
// Returns the pointer to the local data array.
55 template <
typename K,
typename T>
57 return this->localData->getData();
// Returns the pointer to the local key array.
60 template <
typename K,
typename T>
62 return this->localData->getKeys();
// Returns the number of items currently stored locally.
65 template <
typename K,
typename T>
67 return this->localData->getSize();
70 template <
typename K,
typename T>
75 template <
typename K,
typename T>
// Resizes the local storage to `s` items (delegates to localData->setSize).
79 template <
typename K,
typename T>
81 localData->setSize(s);
85 template <
typename K,
typename T>
90 template <
typename K,
typename T>
// Groups pointers to the local data items by key.
// Pass 1 counts the items per key; pass 2 reserves each key's pointer vector to that
// count; pass 3 appends a pointer to every item under its key.
// If uKeys has not been created yet, it is built from the keys found locally.
// Returns one vector<T*> per entry of uKeys, in uKeys order. A key with no local
// items gets an empty vector.
96 template <
typename K,
typename T>
103 std::unordered_map<K, size_t> keyCount(fddSize);
104 for (
size_t i = 0; i < fddSize; i++){
105 keyCount[keys[i]] ++;
108 std::unordered_map<K, std::vector<T*>> keyLocations(fddSize);
// Reserve exact capacity so pass 3 never reallocates.
109 for (
auto it = keyCount.cbegin(); it != keyCount.end(); ++it ){
110 keyLocations[it->first].reserve(it->second);
114 for (
size_t i = 0; i < fddSize; i++){
117 auto l = keyLocations.find(key);
118 if ( l != keyLocations.end() )
119 l->second.insert(l->second.end(), d);
// No key list yet: use the locally present keys (unordered_map iteration order).
130 if (this->uKeys.use_count() == 0){
131 this->uKeys = std::make_shared<std::vector<K>>();
132 this->uKeys->reserve( fddSize );
133 for (
auto it = keyLocations.begin(); it != keyLocations.end(); it++){
134 this->uKeys->insert(this->uKeys->end(), it->first);
138 std::vector< std::vector<T*> > keyLocationsV(this->uKeys->size());
// Re-index the per-key vectors by uKeys position; move to avoid copying them.
140 for (
size_t i = 0; i < this->uKeys->size(); i++ ){
141 K & key = (*this->uKeys)[i];
142 auto l = keyLocations.find(key);
144 if ( l == keyLocations.end() ){
145 keyLocationsV[i] = {};
147 keyLocationsV[i] = std::move(l->second);
154 return keyLocationsV;
// Counts how many local items carry each key and serializes the result into `buffer`:
// first the number of distinct keys (as size_t), then one (key, count) pair per key.
// The lines that insert/increment counts are elided in this listing.
160 template <
typename K,
typename T>
163 K * keys = localData->getKeys();
164 size_t size = localData->getSize();
169 std::unordered_map<K, size_t> count;
171 for (
size_t i = 0; i < size; ++i){
172 auto it = count.find(keys[i]);
173 if (it == count.end())
178 buffer << size_t(count.size());
180 for (
auto it = count.begin(); it != count.end(); it++){
182 buffer << it->first << it->second;
// For every peer rank i in 1..numProcs-1 except this process, writes dataSize[i] at
// offset 0 of buffer[i] and sends the buffer via sendGroupByKeyData.
// What `tryShrink` controls is not visible in this listing.
194 template <
typename K,
typename T>
196 bool tryShrink =
false;
200 for (
int i = 1; i < (comm->getNumProcs()); ++i){
201 if (i == comm->getProcId())
206 buffer[i].writePos(dataSize[i], 0);
208 comm->sendGroupByKeyData(i);
// Receives group-by-key data from peers.
// Each received (key, value) pair is written into the next slot marked `deleted`
// (scanning forward from pos), and that slot is then marked live again. Pairs that are
// not stored in place are queued in recvData; the deciding condition is elided,
// presumably "no deleted slot left" (TODO confirm).
// The peersFinished check compares against numProcs - 2, i.e. the number of other
// workers if rank 0 is not a worker (TODO confirm).
// msgContinued == 0 marks a peer's final message: senders append char(1) to a
// continued message and char(0) to the final flush.
221 template <
typename K,
typename T>
226 std::vector<bool> & deleted,
227 std::deque< std::pair<K,T> > & recvData,
231 K * keys = localData->getKeys();
232 T * data = localData->getData();
237 char msgContinued = 0;
239 if (peersFinished >= (comm->getNumProcs() - 2)){
244 void * rData = comm->recvGroupByKeyData(rSize);
248 rb.setBuffer(rData, rSize);
262 for (
size_t i = 0; i < numItems; ++i){
// Skip live slots; stop at the next deleted one.
264 while ( ( pos < deleted.size() ) && ( ! deleted[pos] ) )
270 rb >> keys[pos] >> data[pos];
272 deleted[pos] =
false;
277 rb >> p.first >> p.second;
279 recvData.push_back(std::move(p));
284 if (msgContinued == 0){
// Compacts local storage after an exchange.
// Live items (key and data) at later positions j are moved into earlier deleted
// positions i. The storage is then truncated to j + 1 items when that is smaller than
// the current size.
// Most of the index bookkeeping between these lines is elided.
293 template <
typename K,
typename T>
295 K * keys = localData->getKeys();
296 T * data = localData->getData();
297 size_t size = localData->getSize();
301 if ( size == deleted.size() ){
313 keys[i] = std::move(keys[j]);
314 data[i] = std::move(data[j]);
325 if (i >= deleted.size())
break;
327 while ( deleted[j] ){
335 if ( (j + 1) < localData->getSize() ){
337 localData->setSize(j + 1);
// Places items still waiting in recvData into local storage.
// First it reuses positions below the current size and marks them live; elided lines
// presumably skip non-deleted slots (TODO confirm). Any remainder is appended after
// growing the storage by recvData.size().
345 template <
typename K,
typename T>
347 std::vector<bool> & deleted,
348 std::deque< std::pair<K,T> > & recvData,
350 K * keys = localData->getKeys();
351 T * data = localData->getData();
352 size_t size = localData->getSize();
354 if ( recvData.size() > 0 ){
357 while ( (pos < size) && (recvData.size() > 0) ) {
360 auto & item = recvData.front();
362 keys[pos] = std::move(item.first);
363 data[pos] = std::move(item.second);
364 recvData.pop_front();
365 deleted[pos] =
false;
370 if ( recvData.size() > 0 ){
372 localData->setSize(localData->getSize() + recvData.size());
// Re-read the pointers: the ones taken before setSize may be stale if the resize
// reallocates (presumably it can — confirm in the storage class).
373 data = localData->getData();
374 keys = localData->getKeys();
377 for (
auto it = recvData.begin(); it != recvData.end(); it++){
379 keys[pos] = std::move(it->first);
380 data[pos] = std::move(it->second);
// The body of the definition introduced by this template header (original lines
// 389-473) is not part of this listing.
388 template <
typename K,
typename T>
// Appends one (key, data) pair to the outgoing buffer for `owner`.
// A new buffer (dataSize[owner] == 0) is reset, and sizeof(size_t) bytes are reserved
// at its front for a header written later at offset 0.
// Once the buffer reaches comm->maxMsgSize, a continuation flag char(1) is appended,
// dataSize[owner] is written into the header and the buffer is sent.
// Callers use the (elided) return value as a bool.
474 template <
typename K,
typename T>
480 std::vector<size_t> & dataSize
486 if ( dataSize[owner] == 0 ){
487 buffer[owner].reset();
488 buffer[owner].advance(
sizeof(
size_t));
492 buffer[owner] << key << data;
496 if ( buffer[owner].size() >= comm->maxMsgSize ){
499 buffer[owner] << char(1);
500 buffer[owner].writePos(dataSize[owner], 0);
503 comm->sendGroupByKeyData(owner);
// Sends the final message to every other worker rank (1..numProcs-1, skipping self).
// A buffer that is still empty gets a fresh header slot. Each buffer is terminated
// with char(0), meaning "last message" as opposed to char(1) for a continued message.
// Its dataSize header is written at offset 0 before sending.
512 template <
typename K,
typename T>
515 std::vector<size_t> & dataSize
523 for (
int owner = 1; owner < comm->getNumProcs(); owner++){
524 if ( owner == comm->getProcId() ) {
529 if ( dataSize[owner] == 0 ){
530 buffer[owner].reset();
531 buffer[owner].advance(
sizeof(
size_t));
533 buffer[owner] << char(0);
534 buffer[owner].writePos(dataSize[owner], 0);
542 comm->sendGroupByKeyData(owner);
// Retries items queued in pendingSend.
// For every owner whose send buffer is free, drains pendingSend[owner] through
// EDBKsendDataAsync. The returned bool is used by callers as "nothing left pending";
// how `finished` is updated is elided.
548 template <
typename K,
typename T>
551 std::vector< std::deque< std::pair<K,T> > > & pendingSend,
552 std::vector<size_t> & dataSize
554 bool finished =
true;
556 for (
int owner = 1; owner < comm->getNumProcs(); owner++){
557 if (pendingSend[owner].size() == 0) {
560 if ( comm->isSendBufferFree(owner) ){
562 while (pendingSend[owner].size() > 0){
563 std::pair<K,T> & p = pendingSend[owner].front();
564 if ( EDBKsendDataAsync(comm, owner, p.first, p.second, dataSize) ){
567 pendingSend[owner].pop_front();
// Sends local items to the rank that owns their key: owner = 1 + hash(key). The +1
// offset presumably skips rank 0, which is not a worker (TODO confirm).
// Items owned by this process stay where they are. For other items:
//   - if the owner's send buffer is busy, the item is queued in pendingSend;
//   - otherwise it is sent with EDBKsendDataAsync.
// A vacated slot is refilled from recvData when received items are waiting.
// After the scan, once nothing is pending (and, per the elided lines, send buffers are
// free), flushDataSend sends the terminating messages.
577 template <
typename K,
typename T>
581 std::vector<bool> & deleted,
582 std::vector<size_t> & dataSize,
583 std::deque< std::pair<K,T> > & recvData,
584 std::vector< std::deque< std::pair<K,T> > > & pendingSend,
587 K * keys = localData->getKeys();
588 T * data = localData->getData();
589 size_t size = localData->getSize();
591 bool release =
false;
593 sendPending(comm, pendingSend, dataSize);
598 int owner = 1 + hash.get(key);
602 if (owner == comm->getProcId()){
609 if ( ! comm->isSendBufferFree(owner) ){
611 pendingSend[owner].push_back( std::make_pair( key, std::move(data[pos]) ) );
616 release = EDBKsendDataAsync(comm, owner, key, data[pos], dataSize);
// Reuse the slot just vacated for an item already received from a peer.
621 if (recvData.size() > 0){
623 keys[pos] = std::move(recvData.front().first);
624 data[pos] = std::move(recvData.front().second);
625 recvData.pop_front();
638 if ( ! sendPending(comm, pendingSend, dataSize) ){
643 for (
int owner = 1; owner < comm->getNumProcs(); owner++){
644 if ( owner == comm->getProcId() ) {
647 if ( ! comm->isSendBufferFree(owner) ){
653 flushDataSend(comm, dataSize);
// Redistributes all local items among workers by key hash.
// Sending (EDBKSendDataHashed) and receiving (EDBKRecvData) are interleaved until both
// report completion. Afterwards the code
//   - places leftover received items (EDBKFinishDataInsert),
//   - compacts the storage (EDBKShrinkData);
// when each of these runs depends on elided conditions.
// Cached keyLocations are cleared because item positions have changed.
// The bool result is used by the caller as a `dirty` flag.
658 template <
typename K,
typename T>
666 size_t size = localData->getSize();
// deleted[i] marks local slot i as vacated (its item was sent away).
668 std::vector<bool> deleted(size,
false);
669 std::deque< std::pair<K,T> > recvData;
670 std::vector<size_t> dataSize(comm->getNumProcs(), 0);
671 std::vector< std::deque< std::pair<K,T> > > pendingSend(comm->getNumProcs());
675 bool sendFinished =
false;
676 bool recvFinished =
false;
677 int peersFinished = 0;
686 while ( ! (recvFinished & sendFinished) ){
688 if ( ! sendFinished )
689 sendFinished |= EDBKSendDataHashed(comm, sendPos, deleted, dataSize, recvData, pendingSend, dirty);
690 if ( ! recvFinished )
691 recvFinished |= EDBKRecvData(comm, recvPos, sendPos, deleted, recvData, peersFinished, dirty);
699 size = localData->getSize();
708 EDBKFinishDataInsert(deleted, recvData, recvPos);
718 EDBKShrinkData(deleted, recvPos);
725 if (keyLocations.size() > 0){
727 keyLocations.clear();
748 size = localData->getSize();
// Exchanges data by key: uses the explicit key->process map when one has been set
// (keyMap non-empty shared_ptr), otherwise falls back to key hashing.
764 template <
typename K,
typename T>
767 if (keyMap.use_count() > 0)
768 exchangeDataByKeyMapped(comm);
770 exchangeDataByKeyHashed(comm);
// Rebuilds uKeys with the keys that keyMap assigns to process `id`.
// The reserve is an even-split estimate across numProcs - 2 processes; std::max keeps
// the divisor at least 1 when numProcs <= 2.
774 template <
typename K,
typename T>
776 uKeys = std::make_shared<std::vector<K>>();
777 uKeys->reserve( keyMap->size() / std::max(1, numProcs - 2) );
779 for(
auto it = keyMap->begin(); it != keyMap->end(); it++ ){
780 if ( it->second ==
id ){
782 uKeys->insert(uKeys->end(), it->first);
// Rebuilds uKeys as the distinct keys currently stored locally, in unordered_map
// iteration order. It is called after a hashed exchange, so these are the keys routed
// to this process.
// NOTE(review): the bool mapped value is unused; an unordered_set<K> would express the
// same de-duplication.
789 template <
typename K,
typename T>
791 K * keys = localData->getKeys();
792 size_t size = localData->getSize();
793 std::unordered_map<K, bool> h(size / std::max(1, numProcs - 2));
799 for (
size_t i = 0; i < size; ++i){
800 auto loc = h.find(keys[i]);
803 h.insert(h.end(), std::make_pair(keys[i],
true));
806 uKeys = std::make_shared<std::vector<K>>(h.size());
808 for (
auto it = h.begin(); it != h.end(); it++ ){
809 (*uKeys)[i++] = it->first;
// Group-by-key driven by an explicit key map. It
//   - receives the key->process map from comm,
//   - selects the keys assigned to this process,
//   - moves data accordingly,
//   - appends the resulting local item count to resultBuffer.
814 template <
typename K,
typename T>
817 unsigned long tid = 0;
828 keyMap = std::make_shared<std::unordered_map<K, int>>();
829 comm->recvKeyMap(tid, *keyMap);
840 findMyKeys(comm->getNumProcs(), comm->getProcId());
844 exchangeDataByKeyMapped(comm);
848 resultBuffer << size_t(localData->getSize());
// Group-by-key driven by key hashing. It redistributes items, rebuilds the local
// unique-key list, and appends the resulting local item count to resultBuffer.
// Whether findMyKeysByHash runs only when `dirty` is set is not visible here.
852 template <
typename K,
typename T>
857 bool dirty = exchangeDataByKeyHashed(comm);
861 findMyKeysByHash(comm->getNumProcs());
865 resultBuffer << size_t(localData->getSize());
// Token -> value conversions used by the line parser.
// NOTE(review): atof/atol/atoi return 0 for unparsable text and report no error, so a
// malformed field silently becomes 0. The float overload narrows atof's double result.
868 inline void parseData(std::string & item, std::string & d){
871 inline void parseData(std::string & item,
double & d){
872 d = std::atof(item.c_str());
874 inline void parseData(std::string & item,
float & d){
875 d = std::atof(item.c_str());
877 inline void parseData(std::string & item,
long int & d){
878 d = std::atol(item.c_str());
880 inline void parseData(std::string & item,
int & d){
881 d = std::atoi(item.c_str());
883 inline void parseData(std::string & item,
char & d){
// Scalar overload: reads space-separated tokens from `ss` into `d`. The loop body is
// elided; presumably it converts the first non-empty token and stops (TODO confirm).
887 template <
typename T>
888 inline void parseData(std::stringstream & ss, T & d){
890 while(std::getline(ss, item,
' ')){
// Vector overload: converts each space-separated token of `ss` into a T (conversion
// lines elided) and appends it to `vec`.
898 template <
typename T>
899 inline void parseData(std::stringstream & ss, std::vector<T> & vec){
904 while(std::getline(ss, item,
' ')){
907 vec.insert(vec.end(), d);
// Pipeline helper: waits while queue `q` is empty. Its result is used as "this stage is
// done" when the previous stage has finished and nothing is left to consume.
// NOTE(review): lastStageDone is passed by value. If the elided loop body only
// re-checks this parameter, a producer that finishes while this call is already
// waiting will never be observed — confirm against the elided lines.
// NOTE(review): q.size() is read without the queue's lock while another OpenMP section
// pushes to it, which is a data race.
912 template <
typename T>
913 bool waitForLastStage(std::deque<T> & q,
bool lastStageDone){
914 while(q.size() == 0){
// Pipeline stage 1 (whole file): reads up to `blocksize` lines into one block and
// pushes it onto q1 under q1lock. Slots left unread remain empty strings, which stage 2
// skips.
// When the stream is no longer good, a red "F" marker is printed to stderr (the return
// statement is elided).
// Back-pressure: waits while q1 holds 100 or more blocks.
// NOTE(review): that q1.size() check runs without q1lock while stage 2 pops from q1.
923 bool onlineReadStage1Full(std::ifstream & inFile, std::deque<std::vector<std::string>> & q1, omp_lock_t & q1lock,
const int blocksize){
924 std::vector<std::string> lines(blocksize,
"");
926 for (
int i = 0; i < blocksize; i++){
928 std::getline( inFile, lines[i] );
934 omp_set_lock(&q1lock);
935 q1.push_back(std::move(lines));
936 omp_unset_lock(&q1lock);
938 if ( ! inFile.good() ){
939 std::cerr <<
"\033[0;31mF\033[0m";
943 while(q1.size() >= 100)
// Pipeline stage 1 (byte range): like onlineReadStage1Full, but a line is read only
// while the stream is good and tellg() <= endOffset. Each worker therefore reads only
// its own slice of the file.
// NOTE(review): same unlocked q1.size() back-pressure check as onlineReadStage1Full.
948 bool onlineReadStage1(std::ifstream & inFile, std::deque<std::vector<std::string>> & q1, omp_lock_t & q1lock,
size_t endOffset,
const int blocksize){
949 std::vector<std::string> lines(blocksize,
"");
951 for (
int i = 0; i < blocksize; i++){
952 if( inFile.good() && ( size_t(inFile.tellg()) <= endOffset ) ){
953 std::getline( inFile, lines[i] );
959 omp_set_lock(&q1lock);
960 q1.push_back(std::move(lines));
961 omp_unset_lock(&q1lock);
963 if ( (! inFile.good()) || (
size_t(inFile.tellg()) > endOffset ) )
966 while(q1.size() >= 100)
// Pipeline stage 2: takes one block of raw lines from q1 (under q1lock) and parses each
// non-empty line into a (key, value) pair, key first, then value. The parsed block is
// pushed onto q2 under q2lock.
// NOTE(review): the q2.size() back-pressure check is done without q2lock.
971 template <
typename K,
typename T>
972 bool onlineReadStage2(std::deque<std::vector<std::string>> & q1, omp_lock_t & q1lock, std::deque<std::vector<std::pair<K,T>>> & q2, omp_lock_t & q2lock){
973 std::vector<std::string> lines;
976 omp_set_lock(&q1lock);
977 lines = std::move(q1.front());
979 omp_unset_lock(&q1lock);
981 std::vector<std::pair<K,T>> items;
982 items.reserve(lines.size());
986 for (
size_t i = 0; i < lines.size(); i++){
987 if (lines[i].length() > 0 ){
989 std::stringstream ss(lines[i]);
990 parseData(ss, item.first);
991 parseData(ss, item.second);
992 items.insert(items.end(), item);
996 omp_set_lock(&q2lock);
997 q2.push_back(std::move(items));
998 omp_unset_lock(&q2lock);
1000 while(q2.size() >= 100)
// Pipeline stage 3 (partitioned): takes one parsed block from q2 (under q2lock). For
// each (key, value) it calls the user partition function funcP to get the target
// process.
// Items assigned to this process are inserted into localData. The key -> partition
// assignment is recorded in localKeyMap: new keys are also appended to uKeys, and a
// later assignment for a known key overwrites the earlier one.
// NOTE(review): funcP is converted with an unchecked C-style cast. Whether the keymap
// update applies to every item or only local ones depends on elided braces.
1005 template <
typename K,
typename T>
1007 std::vector<std::pair<K,T>> items;
1009 omp_set_lock(&q2lock);
1010 items = std::move(q2.front());
1012 omp_unset_lock(&q2lock);
1014 for (
size_t i = 0; i < items.size(); i++){
1015 K & key = items[i].first;
1016 T & data = items[i].second;
1018 int myPart = ( (IonlineFullPartFuncP<K,T>) funcP ) ( key, data ) ;
1019 if ( myPart == comm->getProcId() ) {
1020 this->localData->insert(key, data);
1023 auto location = localKeyMap.find(key);
1024 if (location == localKeyMap.end()){
1025 uKeys->insert(uKeys->end(), key);
1026 localKeyMap.insert(std::make_pair(key, myPart));
1028 location->second = myPart;
// Pipeline stage 3 (unpartitioned): takes one parsed block from q2 (under q2lock) and
// inserts every (key, value) into this dataset, one item per insert call.
1033 template <
typename K,
typename T>
1035 std::vector<std::pair<K,T>> items;
1037 omp_set_lock(&q2lock);
1038 items = std::move(q2.front());
1040 omp_unset_lock(&q2lock);
1042 for (
size_t i = 0; i < items.size(); i++){
1043 K & key = items[i].first;
1044 T & data = items[i].second;
1046 this->insert(&key, &data, 1);
// Worker side of a partitioned read of the whole file (presumably onlineFullPartRead).
// Receives the file name from comm, then runs a three-stage OpenMP pipeline:
//   stage 1: read blocks of `blocksize` lines into q1,
//   stage 2: parse them into (key, value) blocks in q2,
//   stage 3: partition with funcP, keeping local items and recording key assignments
//            in keyMap.
// Each stage stops when waitForLastStage reports that its predecessor is done and its
// input queue is drained. Progress letters are printed to stderr.
// NOTE(review): the stageNDone flags are plain bools shared between OpenMP sections
// with no atomic access or flush.
1051 template <
typename K,
typename T>
1053 std::string filename;
1054 keyMap = std::make_shared<std::unordered_map<K, int>>();
1055 uKeys = std::make_shared<std::vector<K>>();
1058 comm->recvFileName(filename);
1061 std::ifstream inFile(filename, std::ifstream::in);
1094 std::deque<std::vector<std::string>> q1;
1095 std::deque<std::vector<std::pair<K,T>>> q2;
1098 omp_init_lock(&q1lock);
1099 omp_init_lock(&q2lock);
1100 bool stage1Done =
false;
1101 bool stage2Done =
false;
1102 bool stage3Done =
false;
1103 const int blocksize = 1000;
1104 #pragma omp parallel 1106 #pragma omp sections 1110 while( ! stage1Done ){
1111 std::cerr <<
"\033[1;32mR";
1112 stage1Done = onlineReadStage1Full(inFile, q1, q1lock, blocksize);
1117 while( ! stage2Done ){
1118 std::cerr <<
"\033[1;33mP";
1120 stage2Done = waitForLastStage(q1, stage1Done);
1121 if ( stage2Done )
break;
1123 stage2Done = onlineReadStage2(q1, q1lock, q2, q2lock);
1129 while( ! stage3Done ){
1130 std::cerr <<
"\033[1;34mI";
1132 stage3Done = waitForLastStage(q2, stage2Done);
1133 if ( stage3Done )
break;
// Stage 3 must report into its own flag.
// The previous code assigned this result to stage2Done. If onlinePartReadStage3
// returns false (its return is not visible here), that could erase stage 2's
// completion signal after stage 2 had already exited, and waitForLastStage(q2,
// stage2Done) would then never see stage 2 as done.
// The byte-range pipeline (onlinePartRead) already uses stage3Done here.
1135 stage3Done = onlinePartReadStage3(*this->keyMap, comm, funcP, q2, q2lock);
1139 std::cerr <<
" \033[mDONE\n";
1140 omp_destroy_lock(&q1lock);
1141 omp_destroy_lock(&q2lock);
// Splits the file into numProcs - 1 equal byte ranges, one per worker rank (workers
// are ranks 1..numProcs-1), and seeks to this rank's start. endOffset receives the end
// of the range.
// The line at the start offset is read with getline and pushed onto q1 as a one-line
// block. How the preceding get() result decides the handling of a line that straddles
// the boundary is elided (TODO confirm each line is read by exactly one worker).
// NOTE(review): assumes at least one worker (numProcs >= 2), otherwise the size
// computation divides by zero. The integer division can leave up to numProcs - 2 bytes
// past the last worker's endOffset — confirm those are still read.
1144 void findFileOffset(
fastComm *comm, std::ifstream & inFile,
size_t & endOffset, std::deque<std::vector<std::string>> & q1){
1145 size_t offset, size;
1146 std::vector<std::string> line(1);
1148 inFile.seekg(0, std::ifstream::end);
1149 size = inFile.tellg() / (comm->getNumProcs() - 1);
1150 offset = (comm->getProcId() - 1) * size;
1151 endOffset = offset + size;
1153 inFile.seekg(offset, inFile.beg);
1156 char c = inFile.get();
1157 std::getline( inFile, line[0] );
1159 q1.push_back(std::move(line));
// Worker side of a partitioned read of this worker's byte range of the file.
// A three-stage OpenMP pipeline runs: read lines up to endOffset, parse into
// (key, value), then partition with funcP, collecting key assignments in localKeyMap.
// Afterwards comm->distributeKeyMap merges the per-worker maps into keyMap.
// Read and key-distribution times (ms) are printed to stderr.
1165 template <
typename K,
typename T>
1167 using std::chrono::system_clock;
1168 using std::chrono::duration_cast;
1169 using std::chrono::milliseconds;
1171 auto start = system_clock::now();
1172 std::string filename;
1173 std::deque<std::vector<std::string>> q1;
1174 std::deque<std::vector<std::pair<K,T>>> q2;
1177 omp_init_lock(&q1lock);
1178 omp_init_lock(&q2lock);
1179 bool stage1Done =
false;
1180 bool stage2Done =
false;
1181 bool stage3Done =
false;
1182 const int blocksize = 200;
1186 keyMap = std::make_shared<std::unordered_map<K, int>>();
1187 std::unordered_map<K, int> localKeyMap;
1188 uKeys = std::make_shared<std::vector<K>>();
1191 comm->recvFileName(filename);
1194 std::ifstream inFile(filename, std::ifstream::in);
// Position the stream at this worker's slice and compute where the slice ends.
1197 findFileOffset(comm, inFile, endOffset, q1);
1199 #pragma omp parallel 1201 #pragma omp sections 1205 while( ! stage1Done ){
1206 stage1Done = onlineReadStage1(inFile, q1, q1lock, endOffset, blocksize);
1211 while( ! stage2Done ){
1213 stage2Done = waitForLastStage(q1, stage1Done);
1214 if ( stage2Done )
break;
1216 stage2Done = onlineReadStage2(q1, q1lock, q2, q2lock);
1221 while( ! stage3Done ){
1223 stage3Done = waitForLastStage(q2, stage2Done);
1224 if ( stage3Done )
break;
1226 stage3Done = onlinePartReadStage3(localKeyMap, comm, funcP, q2, q2lock);
1230 omp_destroy_lock(&q1lock);
1231 omp_destroy_lock(&q2lock);
1233 std::cerr <<
" Read:" << duration_cast<milliseconds>(system_clock::now() - start).count() <<
"\n";
1234 start = system_clock::now();
1236 comm->distributeKeyMap(localKeyMap, *keyMap);
1237 std::cerr <<
" DistKeys:" << duration_cast<milliseconds>(system_clock::now() - start).count() <<
"\n";
// Worker side of an unpartitioned read of this worker's byte range of the file.
// A three-stage OpenMP pipeline runs (read -> parse -> insert every item locally), with
// progress letters and the read time (ms) printed to stderr.
// NOTE(review): blocksize is 4 lines here, versus 200 in the partitioned range read
// and 1000 in the full-file read. Blocks this small make per-block lock and queue
// overhead dominant — confirm it is intentional.
1240 template <
typename K,
typename T>
1242 using std::chrono::system_clock;
1243 using std::chrono::duration_cast;
1244 using std::chrono::milliseconds;
1246 auto start = system_clock::now();
1247 std::string filename;
1248 std::deque<std::vector<std::string>> q1;
1249 std::deque<std::vector<std::pair<K,T>>> q2;
1252 omp_init_lock(&q1lock);
1253 omp_init_lock(&q2lock);
1254 bool stage1Done =
false;
1255 bool stage2Done =
false;
1256 bool stage3Done =
false;
1257 const int blocksize = 4;
1261 keyMap = std::make_shared<std::unordered_map<K, int>>();
1262 uKeys = std::make_shared<std::vector<K>>();
1265 comm->recvFileName(filename);
1268 std::ifstream inFile(filename, std::ifstream::in);
1271 findFileOffset(comm, inFile, endOffset, q1);
1273 #pragma omp parallel 1275 #pragma omp sections 1279 while( ! stage1Done ){
1280 std::cerr <<
"\033[1;32mR";
1282 stage1Done = onlineReadStage1(inFile, q1, q1lock, endOffset, blocksize);
1287 while( ! stage2Done ){
1288 std::cerr <<
"\033[1;33mP";
1290 stage2Done = waitForLastStage(q1, stage1Done);
1291 if ( stage2Done )
break;
1293 stage2Done = onlineReadStage2(q1, q1lock, q2, q2lock);
1298 while( ! stage3Done ){
1299 std::cerr <<
"\033[1;34mI";
1301 stage3Done = waitForLastStage(q2, stage2Done);
1302 if ( stage3Done )
break;
1304 stage3Done = onlineReadStage3(q2, q2lock);
1308 omp_destroy_lock(&q1lock);
1309 omp_destroy_lock(&q2lock);
1311 std::cerr <<
" Read:" << duration_cast<milliseconds>(system_clock::now() - start).count() <<
"\n";
// Vector-valued items: writes each key followed by its vector elements, all separated
// by spaces, using 10 significant digits. The line terminator is elided.
1314 template <
typename K,
typename T>
1315 void printData(std::ofstream & outFile, K * keys, std::vector<T> * data,
size_t s){
1316 outFile.precision(10);
1317 for (
size_t i = 0; i < s; i++){
1318 outFile << keys[i] <<
" ";
1319 for (
size_t j = 0; j < data[i].size(); j++){
1320 outFile << data[i][j] <<
" ";
// Scalar items: writes one "key value" line per item, using 10 significant digits.
1326 template <
typename K,
typename T>
1327 void printData(std::ofstream & outFile, K * keys, T * data,
size_t s){
1328 outFile.precision(10);
1329 for (
size_t i = 0; i < s; i++){
1330 outFile << keys[i] <<
" " << data[i] <<
"\n";
// double items: writes one "key value" line per item in fixed notation with 16 digits
// after the decimal point.
// NOTE(review): leaves outFile in std::fixed mode, which also affects floating-point
// keys.
1335 template <
typename K>
1336 void printData(std::ofstream & outFile, K * keys,
double * data,
size_t s){
1337 outFile << std::fixed << std::setprecision(16);
1338 for (
size_t i = 0; i < s; i++){
1339 outFile << keys[i] <<
" " << data[i] <<
"\n";
// Writes this worker's local items to the file named path + procId + sufix, using the
// printData overload selected by T.
// pathP and sufixP are type-erased pointers that must point to std::string; the
// C-style casts are unchecked.
// NOTE(review): a failed open of outFile is not checked in the visible lines.
1343 template <
typename K,
typename T>
1345 std::string path = * (std::string*) pathP;
1346 std::string sufix = * (std::string*) sufixP;
1348 K * keys = localData->getKeys();
1349 T * data = localData->getData();
1350 size_t s = localData->getSize();
1352 std::string filename(path + std::to_string(procId) + sufix);
1354 std::ofstream outFile(filename, std::ofstream::out);
1356 printData(outFile, keys, data, s);
// Executes one task on this dataset and reports the result through `buffer`.
// Header slots are reserved for three values:
//   - the elapsed time in milliseconds,
//   - a process statistic from getProcStat(),
//   - the payload size in bytes after the header.
// Generic map/reduce/update operations go through apply(); the other operations go
// through the switch. Each appends the resulting item count.
// Finally the header slots are filled in and the result is sent with sendTaskResult().
1359 template <
typename K,
typename T>
1361 using std::chrono::system_clock;
1362 using std::chrono::duration_cast;
1363 using std::chrono::milliseconds;
1365 auto start = system_clock::now();
1378 durationP = buffer.size();
1379 buffer.advance(
sizeof(
size_t));
1381 rStatP = buffer.size();
1384 rSizeP = buffer.size();
1385 buffer.advance(
sizeof(
size_t));
1387 headerSize = buffer.size();
1389 if (op & (OP_GENERICMAP | OP_GENERICREDUCE | OP_GENERICUPDATE)){
1390 this->apply(func, op, dest, buffer);
1391 if (dest) buffer << size_t(dest->getSize());
1400 case OP_GroupByKeyH:
1401 groupByKeyHashed(comm);
1405 buffer << size_t(this->getSize());
1408 onlinePartRead(comm, func);
1409 buffer << size_t(this->getSize());
1411 case OP_OnFullPRead:
1413 buffer << size_t(this->getSize());
1418 auto end = system_clock::now();
1419 auto duration = duration_cast<milliseconds>(end - start);
// Back-fill the header slots reserved above.
1422 buffer.writePos(
size_t(duration.count()), durationP);
1423 buffer.writePos(getProcStat(), rStatP);
1424 buffer.writePos(
size_t(buffer.size() - headerSize), rSizeP);
1426 comm->sendTaskResult();
unsigned int fddOpType
Dataset operation type.
fdd< T > * onlineRead(std::string path, onlineFullPartFuncP< T > funcP)
Reads a file with online parsing and mapping (?)
unsigned int fddType
Dataset type.
int numProcs()
Returns the number of running processes.
fdd< T > * onlineFullPartRead(std::string path, onlineFullPartFuncP< T > funcP)
Reads a file with online parsing and partitioning (NOT IMPLEMENTED).