libfaster API Documentation  Development Version
Super fast distributed computing
workerIFddCore.cpp
1 #include <iostream>
2 #include <fstream>
3 #include <sstream>
4 #include <memory>
5 #include <string>
6 #include <iomanip>
7 #include <unistd.h>
8 #include <ctime>
9 #include <chrono>
10 
11 #include "_workerIFdd.h"
12 #include "fastComm.h"
13 #include "misc.h"
14 #include "hasher.h"
15 #include "indexedFddStorageExtern.cpp"
16 
17 using namespace faster;
18 
19 template <typename K, typename T>
20 faster::workerIFddCore<K,T>::workerIFddCore(unsigned int ident, fddType kt, fddType t) : workerFddBase(ident, t){
21  keyType = kt;
22  localData = new indexedFddStorage<K,T>();
23 }
24 
25 
26 template <typename K, typename T>
27 faster::workerIFddCore<K,T>::workerIFddCore(unsigned int ident, fddType kt, fddType t, size_t size) : workerFddBase(ident, t){
28  keyType = kt;
29  localData = new indexedFddStorage<K,T>(size);
30 }
31 
32 
33 template <typename K, typename T>
35  keyLocations.clear();
36  delete localData;
37 }
38 
39 template <typename K, typename T>
41  return type;
42 }
43 
44 template <typename K, typename T>
46  return keyType;
47 }
48 
49 
50 template <typename K, typename T>
51 T & faster::workerIFddCore<K,T>::operator[](size_t address){
52  return this->localData->getData()[address];
53 }
54 
55 template <typename K, typename T>
57  return this->localData->getData();
58 }
59 
60 template <typename K, typename T>
62  return this->localData->getKeys();
63 }
64 
65 template <typename K, typename T>
67  return this->localData->getSize();
68 }
69 
70 template <typename K, typename T>
72  return sizeof(T);
73 }
74 
75 template <typename K, typename T>
77  return sizeof(T);
78 }
79 template <typename K, typename T>
81  localData->setSize(s);
82 }
83 
84 
85 template <typename K, typename T>
87  delete (T*) item;
88 }
89 
90 template <typename K, typename T>
92  localData->shrink();
93 }
94 
95 
96 template <typename K, typename T>
97 std::vector< std::vector<T*> > faster::workerIFddCore<K,T>::findKeyInterval(K * keys, T * data, size_t fddSize){
98  //auto start = system_clock::now();
99 
100  //std::cerr << "IFDDDependent ";
101  //std::cerr << "FindKeyInterval\n";
102 
103  std::unordered_map<K, size_t> keyCount(fddSize);
104  for ( size_t i = 0; i < fddSize; i++){
105  keyCount[keys[i]] ++;
106  }
107 
108  std::unordered_map<K, std::vector<T*>> keyLocations(fddSize);
109  for ( auto it = keyCount.cbegin(); it != keyCount.end(); ++it ){
110  keyLocations[it->first].reserve(it->second);
111  }
112  keyCount.clear();
113 
114  for ( size_t i = 0; i < fddSize; i++){
115  K & key = keys[i];
116  T * d = &data[i];
117  auto l = keyLocations.find(key);
118  if ( l != keyLocations.end() )
119  l->second.insert(l->second.end(), d);
120  }
121  //auto t1 = duration_cast<milliseconds>(system_clock::now() - start2).count();
122  //start2 = system_clock::now();
123 
124  //auto t2 = duration_cast<milliseconds>(system_clock::now() - start2).count();
125  //start2 = system_clock::now();
126  //std::cerr << " T0:" << t0 << " T1:" << t1 << " T2:" << t2 << "\n";
127 
128  //exit(231);
129 
130  if (this->uKeys.use_count() == 0){
131  this->uKeys = std::make_shared<std::vector<K>>();
132  this->uKeys->reserve( fddSize );
133  for ( auto it = keyLocations.begin(); it != keyLocations.end(); it++){
134  this->uKeys->insert(this->uKeys->end(), it->first);
135  }
136  }
137 
138  std::vector< std::vector<T*> > keyLocationsV(this->uKeys->size());
139 
140  for ( size_t i = 0; i < this->uKeys->size(); i++ ){
141  K & key = (*this->uKeys)[i];
142  auto l = keyLocations.find(key);
143 
144  if ( l == keyLocations.end() ){
145  keyLocationsV[i] = {};
146  }else{
147  keyLocationsV[i] = std::move(l->second);
148  }
149  }
150  //std::cerr << "FindKeyIntervalDONE\n";
151 
152  //std::cerr << "\nT0:" << t0 << " T1:" << t1 << " T2:" << t2 << " TOT:" << duration_cast<milliseconds>(system_clock::now() - start).count() << "\n";
153 
154  return keyLocationsV;
155 }
156 
157 
158 
159 
160 template <typename K, typename T>
162  //std::cerr << " CountByKey\n";
163  K * keys = localData->getKeys();
164  size_t size = localData->getSize();
165  fastCommBuffer &buffer = comm->getResultBuffer();
166 
167  // TODO PARALELIZE!
168  // This needs to be ordered to send in order
169  std::unordered_map<K, size_t> count;
170  // Count keys
171  for ( size_t i = 0; i < size; ++i){
172  auto it = count.find(keys[i]);
173  if (it == count.end())
174  count[keys[i]] = 1;
175  else
176  count[keys[i]] += 1;
177  }
178  buffer << size_t(count.size());
179  int sum = 0;
180  for ( auto it = count.begin(); it != count.end(); it++){
181  sum += it->second;
182  buffer << it->first << it->second;
183  }
184 
185 }
186 
187 //void p(double & v){
188  //fprintf(stderr, "%.2lf ", v);
189 //}
190 
191 //template <typename T>
192 //void p(T & v UNUSED){
193 //}
194 template <typename K, typename T>
195 bool faster::workerIFddCore<K,T>::EDBKSendData(fastComm *comm, std::vector<size_t> & dataSize){
196  bool tryShrink = false;
197  fastCommBuffer * buffer = comm->getSendBuffers();
198 
199  // Include the data size in the message header
200  for ( int i = 1; i < (comm->getNumProcs()); ++i){
201  if (i == comm->getProcId())
202  continue;
203  if (dataSize[i] > 0)
204  tryShrink = true;
205 
206  buffer[i].writePos(dataSize[i], 0);
207  //buffer[i].writePos(0, 1, sizeof(size_t));
208  comm->sendGroupByKeyData(i);
209  //std::cerr << "\033[0;31m"<< dataSize[i] << " \033[0m> " << i << " ";
210  }
211  //std::cerr << "]\n";
212 
213  //Ti[1] = duration_cast<milliseconds>(system_clock::now() - start).count();
214  //start = system_clock::now();
215 
216  return tryShrink;
217 
218 }
219 
220 // Checks for arriving data
221 template <typename K, typename T>
223  fastComm *comm,
224  size_t & pos,
225  size_t & posLimit,
226  std::vector<bool> & deleted,
227  std::deque< std::pair<K,T> > & recvData,
228  int & peersFinished,
229  bool & dirty
230  ){
231  K * keys = localData->getKeys();
232  T * data = localData->getData();
233 
234  int rSize = 0;
235  size_t numItems;
236  fastCommBuffer rb(0);
237  char msgContinued = 0;
238 
239  if (peersFinished >= (comm->getNumProcs() - 2)){
240  //std::cerr << "\033[0;31mRECV FINISHED \033[0m";
241  return true;
242  }
243 
244  void * rData = comm->recvGroupByKeyData(rSize);
245 
246  // Insert message into dataset or queue
247  if ( rSize > 0 ){
248  rb.setBuffer(rData, rSize);
249 
250  rb >> numItems;
251  if (numItems > 0){
252  //std::cerr << "R";
253  dirty = true;
254  }
255 
256 
257  //for (int i = 0; i < rSize; ++i){
258  //fprintf(stderr, "%02x", ((char*) rData)[i]);
259  //}
260  //std::cerr << "\033[0;32mRECV" << numItems << "(" << rSize << ")\033[0m ";
261 
262  for (size_t i = 0; i < numItems; ++i){
263  // Find a empty space in the local data
264  while ( ( pos < deleted.size() ) && ( ! deleted[pos] ) )
265  pos++;
266 
267  // Insert Recv Data
268  if (pos < posLimit){
269  // Insert inplace
270  rb >> keys[pos] >> data[pos];
271  //std::cerr << "\033[0;35m" << keys[pos] << "\033[0m ";
272  deleted[pos] = false;
273  pos++;
274  }else{
275  // Put in a recv list
276  std::pair<K,T> p;
277  rb >> p.first >> p.second;
278  //std::cerr << "\033[0;35m" << p.first << "\033[0m ";
279  recvData.push_back(std::move(p));
280  }
281  }
282  // Check for continuation
283  rb >> msgContinued;
284  if (msgContinued == 0){
285  peersFinished++;
286  //std::cerr << " \033[0;31m F:" << peersFinished << "\033[0m";
287  }
288  }
289  return false;
290 }
291 
292 // Get rid of blank spaces in fdd data storage
293 template <typename K, typename T>
294 void faster::workerIFddCore<K,T>::EDBKShrinkData(std::vector<bool> & deleted, size_t & pos ){
295  K * keys = localData->getKeys();
296  T * data = localData->getData();
297  size_t size = localData->getSize();
298 
299 
300  // If there are elements that are still sparse in the memory
301  if ( size == deleted.size() ){
302  // Resume where last procedure left off
303  // i searches for first empty space
304  size_t i = pos;
305  // j searches for last NON-empty space
306  size_t j = size - 1;
307  // Bring them forward and correct size
308  while(i < j){
309  if ( deleted[i] ){
310  // Found empty space
311  if ( !deleted[j] ){
312  //std::cerr << keys[j] << ">" << i << " ";
313  keys[i] = std::move(keys[j]);
314  data[i] = std::move(data[j]);
315  deleted[i] = false;
316  i++;
317  }
318  j--;
319  }else{
320  if ( deleted[j] ){
321  j--;
322  }
323  i++;
324  }
325  if (i >= deleted.size()) break;
326  }
327  while ( deleted[j] ){
328  if ( j == 0){
329  j--;
330  break;
331  }
332  j--;
333 
334  }
335  if ( (j + 1) < localData->getSize() ){
336  //std::cerr << " \033[0;31mSD SETSIZE:" << j+1 << "\033[0m\n";
337  localData->setSize(j + 1);
338  }
339 
340  }
341 
342 }
343 
344 // Insert the rest of the received data inplace
345 template <typename K, typename T>
347  std::vector<bool> & deleted,
348  std::deque< std::pair<K,T> > & recvData,
349  size_t & pos ){
350  K * keys = localData->getKeys();
351  T * data = localData->getData();
352  size_t size = localData->getSize();
353 
354  if ( recvData.size() > 0 ){
355  //std::cerr << "\nEDBK finish insert data (" << recvData.size() << ")\n";
356 
357  while ( (pos < size) && (recvData.size() > 0) ) {
358  if ( deleted[pos] ){
359  // Insert inplace
360  auto & item = recvData.front();
361  //std::cerr << " \033[0;36m(" << item.first << ">" << pos << ")\033[0m";
362  keys[pos] = std::move(item.first);
363  data[pos] = std::move(item.second);
364  recvData.pop_front();
365  deleted[pos] = false;
366  }
367  pos++;
368  }
369 
370  if ( recvData.size() > 0 ){
371  //std::cerr << " \033[0;31mFI SETSIZE:" << localData->getSize() + recvData.size() << "\033[0m\n";
372  localData->setSize(localData->getSize() + recvData.size());
373  data = localData->getData();
374  keys = localData->getKeys();
375 
376  // Insert the rest at the end
377  for (auto it = recvData.begin(); it != recvData.end(); it++){
378  //std::cerr << " \033[0;36m(" << it->first << ">" << pos << ")\033[0m";
379  keys[pos] = std::move(it->first);
380  data[pos] = std::move(it->second);
381  pos++;
382  }
383  }
384  }
385  recvData.clear();
386 }
387 
388 template <typename K, typename T>
390  /*
391  //using std::chrono::system_clock;
392  //using std::chrono::duration_cast;
393  //using std::chrono::milliseconds;
394 
395  //int Ti[5];
396  //auto start = system_clock::now();
397  //std::cerr << " \033[0;33mExchange Data By Key\033[0m\n";
398  //K * keys = localData->getKeys();
399  //T * data = localData->getData();
400  size_t size = localData->getSize();
401  //fastCommBuffer * buffer = comm->getSendBuffers();
402  std::vector<size_t> dataSize(comm->getNumProcs(), 0);
403  std::vector<bool> deleted(size, false);
404  size_t pos;
405  bool dirty = false;
406  bool tryShrink = false;
407 
408 
409 
410  //Ti[0] = duration_cast<milliseconds>(system_clock::now() - start).count();
411  //start = system_clock::now();
412 
413  //std::cerr << comm->getProcId() << " Write Buffers";
414  // Reserve space in the message for the header
415  / *for ( int i = 1; i < (comm->getNumProcs()); ++i){
416  if (i == comm->getProcId())
417  continue;
418  buffer[i].reset();
419  buffer[i].advance(sizeof(size_t));
420  //buffer[i].advance(1);
421  }
422  //std::cerr << "\n";
423 
424  //std::cerr << comm->getProcId() << " [ ";
425  // Insert Data that dont belong to me in the message
426  for ( size_t i = 0; i < size; ++i){
427  K & key = keys[i];
428  int owner = (*keyMap)[key];
429 
430  //if (owner != comm->getProcId())
431  //std::cerr << "\033[0;31m";
432  //std::cerr << " " << key << "\033[0m";
433 
434  if (owner == comm->getProcId())
435  continue;
436  buffer[owner] << key << data[i];
437  dataSize[owner]++;
438  deleted[i] = true;
439  //p(data[i]);
440  //std::cerr << ":" << i;
441  //std::cerr << i << ":" << key << ">" << owner << " ";
442  //std::cerr << i << ":" << (size_t) data[i] << " ";
443  }// * /
444 
445  //std::cerr << " - ";
446 
447 
448  //omm->joinSlaves();
449 
450  // Send Data
451  //ryShrink = EDBKSendData(comm, dataSize);
452 
453  // Recv Data
454  //dirty = EDBKRecvData(comm, deleted, pos, tryShrink);
455 
456  // Fit data do memmory
457  //EDBKShrinkData(deleted, pos);
458  //std::cerr << " (new size: " << localData->getSize() << ")\n";
459 
460  // Clear Key location saved by last ByKey function
461  if ( dirty | tryShrink ){
462  if (keyLocations.size() > 0){
463  keyLocations.clear();
464  //std::cerr << " CLEAR KeyLocations\n" ;
465  }
466  }
467  //Ti[3] = duration_cast<milliseconds>(system_clock::now() - start).count();
468  //std::cerr << " TIn:" << Ti[0] << " TSn:" << Ti[1] << " TRv:" << Ti[2] << " TSh:" << Ti[3] << "\n";
469 
470  // */
471 }
472 
473 // Insert data into departure buffer and send if buffer is full
474 template <typename K, typename T>
476  fastComm *comm,
477  int owner,
478  K & key,
479  T & data,
480  std::vector<size_t> & dataSize
481  ){
482  fastCommBuffer * buffer = comm->getSendBuffers();
483 
484  // If it is the beginin of the message save space for the msg
485  // size
486  if ( dataSize[owner] == 0 ){
487  buffer[owner].reset();
488  buffer[owner].advance(sizeof(size_t));
489  }
490 
491  // Insert data Into buffer
492  buffer[owner] << key << data;
493  dataSize[owner]++;
494 
495  // Check to see if we reached the maximum message size
496  if ( buffer[owner].size() >= comm->maxMsgSize ){
497  //std::cerr << "Send Async" << owner << "("<< dataSize[owner] << "," << buffer[owner].size() << ") ";
498  //Send partial data
499  buffer[owner] << char(1);
500  buffer[owner].writePos(dataSize[owner], 0);
501 
502  // Send data
503  comm->sendGroupByKeyData(owner);
504  dataSize[owner] = 0;
505  return true;
506  }
507 
508  return false;
509 }
510 
511 // Sends all buffers
512 template <typename K, typename T>
514  fastComm *comm,
515  std::vector<size_t> & dataSize
516  ){
517  fastCommBuffer * buffer = comm->getSendBuffers();
518 
519 
520 
521  //std::cerr << "\033[0;31mSF\033[0m ";
522  // Send last pice of data
523  for ( int owner = 1; owner < comm->getNumProcs(); owner++){
524  if ( owner == comm->getProcId() ) {
525  continue;
526  }
527  // If it is the beginning of the message save space for the msg
528  // size
529  if ( dataSize[owner] == 0 ){
530  buffer[owner].reset();
531  buffer[owner].advance(sizeof(size_t));
532  }
533  buffer[owner] << char(0);
534  buffer[owner].writePos(dataSize[owner], 0);
535  //for (size_t i = 0; i < buffer[owner].size(); ++i){
536  // fprintf(stderr, "%02x ", ((char*) buffer[owner].data())[i]);
537  //}
538 
539  //std::cerr << "flushDataSend-" << comm->getProcId() << ">" << owner << " ";
540 
541  // Send data
542  comm->sendGroupByKeyData(owner);
543  }
544  //std::cerr << "\033[0;33mSEND FINISHED\033[0m\n";
545 }
546 
547 // Sends pending data
548 template <typename K, typename T>
550  fastComm *comm,
551  std::vector< std::deque< std::pair<K,T> > > & pendingSend,
552  std::vector<size_t> & dataSize
553  ){
554  bool finished = true;
555 
556  for ( int owner = 1; owner < comm->getNumProcs(); owner++){
557  if (pendingSend[owner].size() == 0) {
558  continue;
559  }
560  if ( comm->isSendBufferFree(owner) ){
561  //std::cerr << "sending " << pendingSend[owner].size() << " pending > " << owner << "\n";
562  while (pendingSend[owner].size() > 0){
563  std::pair<K,T> & p = pendingSend[owner].front();
564  if ( EDBKsendDataAsync(comm, owner, p.first, p.second, dataSize) ){
565  return false;
566  }
567  pendingSend[owner].pop_front();
568  }
569  }else{
570  finished = false;
571  }
572  }
573  return finished;
574 }
575 
576 // Send data that belong to other machines
577 template <typename K, typename T>
579  fastComm *comm,
580  size_t & pos,
581  std::vector<bool> & deleted,
582  std::vector<size_t> & dataSize,
583  std::deque< std::pair<K,T> > & recvData,
584  std::vector< std::deque< std::pair<K,T> > > & pendingSend,
585  bool & dirty) {
586 
587  K * keys = localData->getKeys();
588  T * data = localData->getData();
589  size_t size = localData->getSize();
590  hasher<K> hash(comm->getNumProcs() - 1);
591  bool release = false;
592 
593  sendPending(comm, pendingSend, dataSize);
594 
595  // Insert Data that dont belong to me in the message
596  while ( pos < size){
597  K & key = keys[pos];
598  int owner = 1 + hash.get(key);
599  //std::cerr << key;
600 
601  // If it is my item dont send it
602  if (owner == comm->getProcId()){
603  pos++;
604  continue;
605  }
606 
607  dirty = true;
608  // Enqueue for sending later if buffer occupied
609  if ( ! comm->isSendBufferFree(owner) ){
610  //usleep(1);
611  pendingSend[owner].push_back( std::make_pair( key, std::move(data[pos]) ) );
612  //return false;
613  }else{
614  //std::cerr << "\033[0;34m" << key << "\033[0m:" << owner << " ";
615 
616  release = EDBKsendDataAsync(comm, owner, key, data[pos], dataSize);
617  }
618  //std::cerr << "S";
619 
620  // Place a received pair inplace
621  if (recvData.size() > 0){
622  // Replace deleted item data
623  keys[pos] = std::move(recvData.front().first);
624  data[pos] = std::move(recvData.front().second);
625  recvData.pop_front();
626  }else{
627  // Just delete item;
628  deleted[pos] = true;
629  }
630  pos++;
631 
632  if (release)
633  return false;
634 
635  }
636  //std::cerr << "no more data to send\n ";
637 
638  if ( ! sendPending(comm, pendingSend, dataSize) ){
639  return false;
640  }
641 
642  // Wait for all buffers to be freed
643  for ( int owner = 1; owner < comm->getNumProcs(); owner++){
644  if ( owner == comm->getProcId() ) {
645  continue;
646  }
647  if ( ! comm->isSendBufferFree(owner) ){
648  //std::cerr << "buffer "<< owner <<" not free\n ";
649  return false;
650  }
651  }
652 
653  flushDataSend(comm, dataSize);
654 
655  return true;
656 }
657 
658 template <typename K, typename T>
660  //using std::chrono::system_clock;
661  //using std::chrono::duration_cast;
662  //using std::chrono::milliseconds;
663  //int Ti[5];
664  //auto start = system_clock::now();
665 
666  size_t size = localData->getSize();
667  //std::cerr << " \033[0;33mExchange Data By Key ID:" << id << "(" << size << ")\033[0m\n";
668  std::vector<bool> deleted(size, false);
669  std::deque< std::pair<K,T> > recvData;
670  std::vector<size_t> dataSize(comm->getNumProcs(), 0);
671  std::vector< std::deque< std::pair<K,T> > > pendingSend(comm->getNumProcs());
672  size_t sendPos = 0;
673  size_t recvPos = 0;
674  bool dirty = false;
675  bool sendFinished = false;
676  bool recvFinished = false;
677  int peersFinished = 0;
678 
679 
680  comm->joinSlaves();
681  //std::cerr << " JOINED\n"; std::cerr.flush();
682 
683  //Ti[0] = duration_cast<milliseconds>(system_clock::now() - start).count();
684  //start = system_clock::now();
685 
686  while ( ! (recvFinished & sendFinished) ){
687  //std::cerr << "."; std::cerr.flush();
688  if ( ! sendFinished )
689  sendFinished |= EDBKSendDataHashed(comm, sendPos, deleted, dataSize, recvData, pendingSend, dirty);
690  if ( ! recvFinished )
691  recvFinished |= EDBKRecvData(comm, recvPos, sendPos, deleted, recvData, peersFinished, dirty);
692  }
693  //std::cerr << " PF:" << peersFinished << " CONDITIONS:" << sendFinished << " " << recvFinished << "\n";
694 
695  //Ti[1] = duration_cast<milliseconds>(system_clock::now() - start).count();
696  //start = system_clock::now();
697 
698  //K * keys = localData->getKeys();
699  size = localData->getSize();
700  /*for ( size_t i = 0; i < size; i++){
701  if ( deleted[i] )
702  std::cerr << "\033[0;31m ";
703  std::cerr << i << ":" << keys[i] << " ";
704  if ( deleted[i] )
705  std::cerr << "\033[0m ";
706  }// */
707  recvPos = 0;
708  EDBKFinishDataInsert(deleted, recvData, recvPos);
709  //keys = localData->getKeys();
710  //size = localData->getSize();
711  //for ( size_t i = 0; i < size; i++){
712  //std::cerr << keys[i] << " ";
713  //}
714  //std::cerr << "\n-------------------------------\n";
715 
716  if (dirty){
717  //std::cerr << "\033[0;33mSHRINK\033[0m\n";
718  EDBKShrinkData(deleted, recvPos);
719  }
720  //std::cerr << " (new size: " << localData->getSize() << ")\n";
721 
722  // Clear Key location saved by last ByKey function
723  if ( dirty ){
724  //std::cerr << " \033[0;32mDIRTY..." ;
725  if (keyLocations.size() > 0){
726  //std::cerr << " CLEAR KeyLocations" ;
727  keyLocations.clear();
728  }
729  //std::cerr << "\033[0m\n" ;
730  }else{
731  //std::cerr << " \033[0;34mNOT DIRTY!!!!!\n\033[0m" ;
732  }
733 
734  //Ti[2] = duration_cast<milliseconds>(system_clock::now() - start).count();
735  //start = system_clock::now();
736 
737  //std::cerr << " \n";
738 
739  //groupByKeyHashed = true;
740 
741  //keys = localData->getKeys();
742  //size = localData->getSize();
743  //std::cerr << "\n";
744  //for ( size_t i = 0; i < size; i++){
745  //std::cerr << i << " ";
746  //}
747  //std::cerr << "\n";// */
748  size = localData->getSize();
749 
750  //std::cerr << " Join:" ;
751  comm->joinSlaves();
752 
753  //Ti[3] = duration_cast<milliseconds>(system_clock::now() - start).count();
754  //std::cerr << "\033[34mFINISHED ("<< size <<")\033[0m";
755  /*std::cerr << " \033[0;33mEDBK\033[0m Join:" << Ti[0] <<
756  " Exchange:" << Ti[1] << " (" << sum(dataSize) << ")" <<
757  " finish:" << Ti[2] <<
758  " Join:" << Ti[3] <<
759  "\n"; // */
760 
761  return dirty;
762 }
763 
764 template <typename K, typename T>
766 
767  if (keyMap.use_count() > 0)
768  exchangeDataByKeyMapped(comm);
769  else
770  exchangeDataByKeyHashed(comm);
771 
772 }
773 
774 template <typename K, typename T>
776  uKeys = std::make_shared<std::vector<K>>();
777  uKeys->reserve( keyMap->size() / std::max(1, numProcs - 2) );
778 
779  for( auto it = keyMap->begin(); it != keyMap->end(); it++ ){
780  if ( it->second == id ){
781  //std::cerr << "\033[0;31m";
782  uKeys->insert(uKeys->end(), it->first);
783  }
784  //std::cerr << it->first << "\033[0m ";
785  }
786 }
787 
788 
789 template <typename K, typename T>
791  K * keys = localData->getKeys();
792  size_t size = localData->getSize();
793  std::unordered_map<K, bool> h(size / std::max(1, numProcs - 2));
794  int i = 0;
795 
796  //h.reserve( size / std::max(1, numProcs - 2) );
797  //uKeys->reserve( size / std::max(1, numProcs - 2) );
798 
799  for ( size_t i = 0; i < size; ++i){
800  auto loc = h.find(keys[i]);
801 
802  if (loc == h.end())
803  h.insert(h.end(), std::make_pair(keys[i], true));
804  }
805 
806  uKeys = std::make_shared<std::vector<K>>(h.size());
807 
808  for ( auto it = h.begin(); it != h.end(); it++ ){
809  (*uKeys)[i++] = it->first;
810  }
811 
812 }
813 
814 template <typename K, typename T>
816  //size_t numMyKeys = 0;
817  unsigned long tid = 0;
818  fastCommBuffer &resultBuffer = comm->getResultBuffer();
819  //std::cerr << " groupByKey\n";
820 
821  //using std::chrono::system_clock;
822  //using std::chrono::duration_cast;
823  //using std::chrono::milliseconds;
824  //int Ti[5];
825  //auto start = system_clock::now();
826 
827  //std::cerr << " RecvKeyMap\n";
828  keyMap = std::make_shared<std::unordered_map<K, int>>();
829  comm->recvKeyMap(tid, *keyMap);
830 
831  // Find out how many keys I own
832  //for ( auto it = keyMap->begin(); it != keyMap->end(); it++)
833  //if (it->second == comm->getProcId())
834  //numMyKeys++;
835  //localData->setNumKeys(numMyKeys);
836  //std::cerr << " NumKeys: " << numMyKeys << "\n";
837  //Ti[0] = duration_cast<milliseconds>(system_clock::now() - start).count();
838  //start = system_clock::now();
839 
840  findMyKeys(comm->getNumProcs(), comm->getProcId());
841  //Ti[1] = duration_cast<milliseconds>(system_clock::now() - start).count();
842  //start = system_clock::now();
843 
844  exchangeDataByKeyMapped(comm);
845  //Ti[2] = duration_cast<milliseconds>(system_clock::now() - start).count();
846 
847  //comm->waitForReq(comm->getNumProcs()-1);
848  resultBuffer << size_t(localData->getSize());
849 
850  //std::cerr << " DONE rKM:" << Ti[0] << " fKM:" << Ti[1] << " eDBK:" << Ti[2] << "\n";
851 }
852 template <typename K, typename T>
854  fastCommBuffer &resultBuffer = comm->getResultBuffer();
855  //std::cerr << "S groupByKeyHashed\n";
856 
857  bool dirty = exchangeDataByKeyHashed(comm);
858 
859  if (dirty){
860  //std::cerr << "S findMyKeysByHash\n";
861  findMyKeysByHash(comm->getNumProcs());
862  }
863 
864 
865  resultBuffer << size_t(localData->getSize());
866 }
867 
// Token parsers used by the online readers: each one converts a single
// whitespace-free token into a value. The token is taken by const reference,
// so temporaries can be parsed as well.
// The strto* functions are used instead of ato*: ato* has undefined behaviour
// when the value is out of range, while strto* clamps (HUGE_VAL, LONG_MAX, ...).
inline void parseData(const std::string & item, std::string & d){
    d = item;
}
inline void parseData(const std::string & item, double & d){
    d = std::strtod(item.c_str(), nullptr);
}
inline void parseData(const std::string & item, float & d){
    // strtof parses straight to float: it is correctly rounded and avoids an
    // out-of-range double -> float conversion.
    d = std::strtof(item.c_str(), nullptr);
}
inline void parseData(const std::string & item, long int & d){
    d = std::strtol(item.c_str(), nullptr, 10);
}
inline void parseData(const std::string & item, int & d){
    // NOTE(review): values outside int's range wrap on the narrowing cast
    // (well-defined, unlike atoi's undefined behaviour); clamp if that matters.
    d = static_cast<int>(std::strtol(item.c_str(), nullptr, 10));
}
inline void parseData(const std::string & item, char & d){
    // For an empty token, item[0] is the terminating '\0'.
    d = item[0];
}

// Reads the first non-empty space-separated token of ss into d.
// If ss has no further token, d is left unchanged.
template <typename T>
inline void parseData(std::stringstream & ss, T & d){
    std::string item;
    while ( std::getline(ss, item, ' ') ){
        if ( ! item.empty() ){
            parseData(item, d);
            return;
        }
    }
}

// Reads every remaining non-empty space-separated token of ss into vec.
// vec is cleared first.
template <typename T>
inline void parseData(std::stringstream & ss, std::vector<T> & vec){
    vec.clear();
    std::string item;

    while ( std::getline(ss, item, ' ') ){
        if ( item.empty() )
            continue;
        T d{};
        parseData(item, d);
        vec.push_back(d);
    }
}
911 
/// Blocks until queue @p q has an element or the producing stage is done.
///
/// @param q             queue fed by the previous pipeline stage
/// @param lastStageDone completion flag of the previous stage. It is taken by
///                      reference so that a flag set by the producer thread
///                      while we wait is actually seen. A by-value copy freezes
///                      the flag at call time and can spin forever once the
///                      producer has finished.
/// @return true if the queue is empty and the previous stage is done (no more
///         input is expected); false if an element is available.
///
/// NOTE(review): q.size() and lastStageDone are read without a lock or
/// atomics, which is formally a data race. Consider std::atomic<bool> and
/// checking the size under the queue's lock.
template <typename T>
bool waitForLastStage(std::deque<T> & q, const bool & lastStageDone){
    while ( q.empty() ){
        if ( lastStageDone ){
            // The producer may have pushed its final block just before
            // raising the flag, so look at the queue once more.
            return q.empty();
        }
        usleep(10);
    }
    return false;
}
923 bool onlineReadStage1Full(std::ifstream & inFile, std::deque<std::vector<std::string>> & q1, omp_lock_t & q1lock, const int blocksize){
924  std::vector<std::string> lines(blocksize, "");
925 
926  for ( int i = 0; i < blocksize; i++){
927  if( inFile.good() ){
928  std::getline( inFile, lines[i] );
929  }else{
930  break;
931  }
932  }
933 
934  omp_set_lock(&q1lock);
935  q1.push_back(std::move(lines));
936  omp_unset_lock(&q1lock);
937 
938  if ( ! inFile.good() ){
939  std::cerr << "\033[0;31mF\033[0m";
940  return true;
941  }
942 
943  while(q1.size() >= 100)
944  usleep(10);
945 
946  return false;
947 }
948 bool onlineReadStage1(std::ifstream & inFile, std::deque<std::vector<std::string>> & q1, omp_lock_t & q1lock, size_t endOffset, const int blocksize){
949  std::vector<std::string> lines(blocksize, "");
950 
951  for ( int i = 0; i < blocksize; i++){
952  if( inFile.good() && ( size_t(inFile.tellg()) <= endOffset ) ){
953  std::getline( inFile, lines[i] );
954  }else{
955  break;
956  }
957  }
958 
959  omp_set_lock(&q1lock);
960  q1.push_back(std::move(lines));
961  omp_unset_lock(&q1lock);
962 
963  if ( (! inFile.good()) || ( size_t(inFile.tellg()) > endOffset ) )
964  return true;
965 
966  while(q1.size() >= 100)
967  usleep(10);
968 
969  return false;
970 }
971 template <typename K, typename T>
972 bool onlineReadStage2(std::deque<std::vector<std::string>> & q1, omp_lock_t & q1lock, std::deque<std::vector<std::pair<K,T>>> & q2, omp_lock_t & q2lock){
973  std::vector<std::string> lines;
974  std::pair<K,T> item;
975 
976  omp_set_lock(&q1lock);
977  lines = std::move(q1.front());
978  q1.pop_front();
979  omp_unset_lock(&q1lock);
980 
981  std::vector<std::pair<K,T>> items;
982  items.reserve(lines.size());
983 
984  //omp_set_num_threads(10);
985  //#pragma omp parallel for schedule(dynamic,10) private(ss)
986  for ( size_t i = 0; i < lines.size(); i++){
987  if (lines[i].length() > 0 ){
988  //std::cerr << lines[i] << "\n";
989  std::stringstream ss(lines[i]);
990  parseData(ss, item.first);
991  parseData(ss, item.second);
992  items.insert(items.end(), item);
993  }
994  }
995 
996  omp_set_lock(&q2lock);
997  q2.push_back(std::move(items));
998  omp_unset_lock(&q2lock);
999 
1000  while(q2.size() >= 100)
1001  usleep(10);
1002 
1003  return false;
1004 }
1005 template <typename K, typename T>
1006 bool faster::workerIFddCore<K,T>::onlinePartReadStage3(std::unordered_map<K, int> & localKeyMap, fastComm *comm, void * funcP, std::deque<std::vector<std::pair<K,T>>> & q2, omp_lock_t & q2lock){
1007  std::vector<std::pair<K,T>> items;
1008 
1009  omp_set_lock(&q2lock);
1010  items = std::move(q2.front());
1011  q2.pop_front();
1012  omp_unset_lock(&q2lock);
1013 
1014  for ( size_t i = 0; i < items.size(); i++){
1015  K & key = items[i].first;
1016  T & data = items[i].second;
1017 
1018  int myPart = ( (IonlineFullPartFuncP<K,T>) funcP ) ( key, data ) ;
1019  if ( myPart == comm->getProcId() ) {
1020  this->localData->insert(key, data);
1021  }
1022 
1023  auto location = localKeyMap.find(key);
1024  if (location == localKeyMap.end()){
1025  uKeys->insert(uKeys->end(), key);
1026  localKeyMap.insert(std::make_pair(key, myPart));
1027  }else{
1028  location->second = myPart;
1029  }
1030  }
1031  return false;
1032 }
1033 template <typename K, typename T>
1034 bool faster::workerIFddCore<K,T>::onlineReadStage3(std::deque<std::vector<std::pair<K,T>>> & q2, omp_lock_t & q2lock){
1035  std::vector<std::pair<K,T>> items;
1036 
1037  omp_set_lock(&q2lock);
1038  items = std::move(q2.front());
1039  q2.pop_front();
1040  omp_unset_lock(&q2lock);
1041 
1042  for ( size_t i = 0; i < items.size(); i++){
1043  K & key = items[i].first;
1044  T & data = items[i].second;
1045 
1046  this->insert(&key, &data, 1);
1047  }
1048  return false;
1049 }
1050 
1051 template <typename K, typename T>
1053  std::string filename;
1054  keyMap = std::make_shared<std::unordered_map<K, int>>();
1055  uKeys = std::make_shared<std::vector<K>>();
1056 
1057  // Get file path
1058  comm->recvFileName(filename);
1059 
1060  // Open File
1061  std::ifstream inFile(filename, std::ifstream::in);
1062 
1063  // Start reading lines
1064  /*std::string line;
1065  while( inFile.good() ){
1066  K key;
1067  T d;
1068  std::getline( inFile, line );
1069 
1070  if (line.length() == 0)
1071  continue;
1072 
1073  ss.clear();
1074  ss.str(line);
1075  parseData(ss, key);
1076  parseData(ss, d);
1077 
1078  int myPart = ( (IonlineFullPartFuncP<K,T>) funcP ) ( key, d) ;
1079  if ( myPart == comm->getProcId() ) {
1080  this->localData->insert(key, d);
1081  }
1082 
1083  auto location = keyMap->find(key);
1084  if (location == keyMap->end()){
1085  uKeys->insert(uKeys->end(), key);
1086  keyMap->insert(std::make_pair(key, myPart));
1087  }else{
1088  location->second = myPart;
1089  }
1090 
1091 
1092  }// */
1093 
1094  std::deque<std::vector<std::string>> q1;
1095  std::deque<std::vector<std::pair<K,T>>> q2;
1096  omp_lock_t q1lock;
1097  omp_lock_t q2lock;
1098  omp_init_lock(&q1lock);
1099  omp_init_lock(&q2lock);
1100  bool stage1Done = false;
1101  bool stage2Done = false;
1102  bool stage3Done = false;
1103  const int blocksize = 1000;
1104  #pragma omp parallel
1105  {
1106  #pragma omp sections
1107  {
1108  // Read fron disk
1109  #pragma omp section
1110  while( ! stage1Done ){
1111  std::cerr << "\033[1;32mR";
1112  stage1Done = onlineReadStage1Full(inFile, q1, q1lock, blocksize);
1113  }
1114 
1115  // Parse lines
1116  #pragma omp section
1117  while( ! stage2Done ){
1118  std::cerr << "\033[1;33mP";
1119 
1120  stage2Done = waitForLastStage(q1, stage1Done);
1121  if ( stage2Done ) break;
1122 
1123  stage2Done = onlineReadStage2(q1, q1lock, q2, q2lock);
1124 
1125  }
1126 
1127  // Insert items into the dataset
1128  #pragma omp section
1129  while( ! stage3Done ){
1130  std::cerr << "\033[1;34mI";
1131 
1132  stage3Done = waitForLastStage(q2, stage2Done);
1133  if ( stage3Done ) break;
1134 
1135  stage2Done = onlinePartReadStage3(*this->keyMap, comm, funcP, q2, q2lock);
1136  }
1137  }
1138  }
1139  std::cerr << " \033[mDONE\n";
1140  omp_destroy_lock(&q1lock);
1141  omp_destroy_lock(&q2lock);// */
1142 }
1143 
1144 void findFileOffset(fastComm *comm, std::ifstream & inFile, size_t & endOffset, std::deque<std::vector<std::string>> & q1){
1145  size_t offset, size;
1146  std::vector<std::string> line(1);
1147 
1148  inFile.seekg(0, std::ifstream::end);
1149  size = inFile.tellg() / (comm->getNumProcs() - 1);
1150  offset = (comm->getProcId() - 1) * size;
1151  endOffset = offset + size;
1152 
1153  inFile.seekg(offset, inFile.beg);
1154 
1155  if ( offset > 0 ){
1156  char c = inFile.get();
1157  std::getline( inFile, line[0] );
1158  if ( c == '\n' ){
1159  q1.push_back(std::move(line));
1160  }
1161 
1162  }
1163 }
1164 
1165 template <typename K, typename T>
1166 void faster::workerIFddCore<K,T>::onlinePartRead(fastComm *comm, void * funcP){
1167  using std::chrono::system_clock;
1168  using std::chrono::duration_cast;
1169  using std::chrono::milliseconds;
1170 
1171  auto start = system_clock::now();
1172  std::string filename;
1173  std::deque<std::vector<std::string>> q1;
1174  std::deque<std::vector<std::pair<K,T>>> q2;
1175  omp_lock_t q1lock;
1176  omp_lock_t q2lock;
1177  omp_init_lock(&q1lock);
1178  omp_init_lock(&q2lock);
1179  bool stage1Done = false;
1180  bool stage2Done = false;
1181  bool stage3Done = false;
1182  const int blocksize = 200;
1183  size_t endOffset;
1184 
1185  // Init
1186  keyMap = std::make_shared<std::unordered_map<K, int>>();
1187  std::unordered_map<K, int> localKeyMap;
1188  uKeys = std::make_shared<std::vector<K>>();
1189 
1190  // Get file path
1191  comm->recvFileName(filename);
1192 
1193  // Open File
1194  std::ifstream inFile(filename, std::ifstream::in);
1195 
1196  // Find file offset
1197  findFileOffset(comm, inFile, endOffset, q1);
1198 
1199  #pragma omp parallel
1200  {
1201  #pragma omp sections
1202  {
1203  // Read fron disk
1204  #pragma omp section
1205  while( ! stage1Done ){
1206  stage1Done = onlineReadStage1(inFile, q1, q1lock, endOffset, blocksize);
1207  }
1208 
1209  // Parse line
1210  #pragma omp section
1211  while( ! stage2Done ){
1212 
1213  stage2Done = waitForLastStage(q1, stage1Done);
1214  if ( stage2Done ) break;
1215 
1216  stage2Done = onlineReadStage2(q1, q1lock, q2, q2lock);
1217 
1218  }
1219 
1220  #pragma omp section
1221  while( ! stage3Done ){
1222 
1223  stage3Done = waitForLastStage(q2, stage2Done);
1224  if ( stage3Done ) break;
1225 
1226  stage3Done = onlinePartReadStage3(localKeyMap, comm, funcP, q2, q2lock);
1227  }
1228  }
1229  }
1230  omp_destroy_lock(&q1lock);
1231  omp_destroy_lock(&q2lock);// */
1232 
1233  std::cerr << " Read:" << duration_cast<milliseconds>(system_clock::now() - start).count() << "\n";
1234  start = system_clock::now();
1235 
1236  comm->distributeKeyMap(localKeyMap, *keyMap);
1237  std::cerr << " DistKeys:" << duration_cast<milliseconds>(system_clock::now() - start).count() << "\n";
1238 }
1239 
1240 template <typename K, typename T>
1242  using std::chrono::system_clock;
1243  using std::chrono::duration_cast;
1244  using std::chrono::milliseconds;
1245 
1246  auto start = system_clock::now();
1247  std::string filename;
1248  std::deque<std::vector<std::string>> q1;
1249  std::deque<std::vector<std::pair<K,T>>> q2;
1250  omp_lock_t q1lock;
1251  omp_lock_t q2lock;
1252  omp_init_lock(&q1lock);
1253  omp_init_lock(&q2lock);
1254  bool stage1Done = false;
1255  bool stage2Done = false;
1256  bool stage3Done = false;
1257  const int blocksize = 4;
1258  size_t endOffset;
1259 
1260  // Init
1261  keyMap = std::make_shared<std::unordered_map<K, int>>();
1262  uKeys = std::make_shared<std::vector<K>>();
1263 
1264  // Get file path
1265  comm->recvFileName(filename);
1266 
1267  // Open File
1268  std::ifstream inFile(filename, std::ifstream::in);
1269 
1270  // Find file offset
1271  findFileOffset(comm, inFile, endOffset, q1);
1272 
1273  #pragma omp parallel
1274  {
1275  #pragma omp sections
1276  {
1277  // Read fron disk
1278  #pragma omp section
1279  while( ! stage1Done ){
1280  std::cerr << "\033[1;32mR";
1281 
1282  stage1Done = onlineReadStage1(inFile, q1, q1lock, endOffset, blocksize);
1283  }
1284 
1285  // Parse line
1286  #pragma omp section
1287  while( ! stage2Done ){
1288  std::cerr << "\033[1;33mP";
1289 
1290  stage2Done = waitForLastStage(q1, stage1Done);
1291  if ( stage2Done ) break;
1292 
1293  stage2Done = onlineReadStage2(q1, q1lock, q2, q2lock);
1294 
1295  }
1296 
1297  #pragma omp section
1298  while( ! stage3Done ){
1299  std::cerr << "\033[1;34mI";
1300 
1301  stage3Done = waitForLastStage(q2, stage2Done);
1302  if ( stage3Done ) break;
1303 
1304  stage3Done = onlineReadStage3(q2, q2lock);
1305  }
1306  }
1307  }
1308  omp_destroy_lock(&q1lock);
1309  omp_destroy_lock(&q2lock);// */
1310 
1311  std::cerr << " Read:" << duration_cast<milliseconds>(system_clock::now() - start).count() << "\n";
1312 }
1313 
// Writes one text line per key: the key followed by every element of its vector.
// Every token, including the last element, is followed by a single space.
// The stream precision is set to 10 before writing.
template <typename K, typename T>
void printData(std::ofstream & outFile, K * keys, std::vector<T> * data, size_t s){
	outFile.precision(10);
	for ( size_t row = 0; row < s; ++row ){
		outFile << keys[row] << " ";
		for ( auto && item : data[row] ){
			outFile << item << " ";
		}
		outFile << "\n";
	}
}
1325 
// Writes s "<key> <value>" lines, pairing keys[i] with data[i].
// The stream precision is set to 10 before writing.
template <typename K, typename T>
void printData(std::ofstream & outFile, K * keys, T * data, size_t s){
	outFile.precision(10);
	const K * key = keys;
	const T * value = data;
	const T * const last = data + s;
	while ( value != last ){
		outFile << *key << " " << *value << "\n";
		++key;
		++value;
	}
}
1333 
1334 
// Writes s "<key> <value>" lines for double values, pairing keys[i] with data[i].
// Values are written in fixed notation with 16 digits after the decimal point
// (the stream is left in that mode afterwards).
template <typename K>
void printData(std::ofstream & outFile, K * keys, double * data, size_t s){
	outFile << std::fixed << std::setprecision(16);
	size_t idx = 0;
	while ( idx < s ){
		outFile << keys[idx] << " " << data[idx] << "\n";
		++idx;
	}
}
1342 
1343 template <typename K, typename T>
1344 void faster::workerIFddCore<K, T>::writeToFile(void * pathP, size_t procId, void * sufixP){
1345  std::string path = * (std::string*) pathP;
1346  std::string sufix = * (std::string*) sufixP;
1347 
1348  K * keys = localData->getKeys();
1349  T * data = localData->getData();
1350  size_t s = localData->getSize();
1351 
1352  std::string filename(path + std::to_string(procId) + sufix);
1353 
1354  std::ofstream outFile(filename, std::ofstream::out);
1355  //std::cerr << "Write file w:" << s << " lines\n";
1356  printData(outFile, keys, data, s);
1357 }
1358 
1359 template <typename K, typename T>
1360 void faster::workerIFddCore<K, T>::preapply(long unsigned int id, void * func, fddOpType op, workerFddBase * dest, fastComm * comm){
1361  using std::chrono::system_clock;
1362  using std::chrono::duration_cast;
1363  using std::chrono::milliseconds;
1364 
1365  auto start = system_clock::now();
1366  fastCommBuffer &buffer = comm->getResultBuffer();
1367  size_t durationP;
1368  size_t rSizeP;
1369  size_t rStatP;
1370  size_t headerSize;
1371  char ret = 0;
1372  procstat s;
1373 
1374  buffer.reset();
1375  buffer << id;
1376 
1377  // Reserve space for the time duration
1378  durationP = buffer.size();
1379  buffer.advance(sizeof(size_t));
1380 
1381  rStatP = buffer.size();
1382  buffer.advance(s);
1383 
1384  rSizeP = buffer.size();
1385  buffer.advance(sizeof(size_t));
1386 
1387  headerSize = buffer.size();
1388 
1389  if (op & (OP_GENERICMAP | OP_GENERICREDUCE | OP_GENERICUPDATE)){
1390  this->apply(func, op, dest, buffer);
1391  if (dest) buffer << size_t(dest->getSize());
1392  }else{
1393  switch(op){
1394  case OP_CountByKey:
1395  countByKey(comm);
1396  break;
1397  case OP_GroupByKey:
1398  groupByKey(comm);
1399  break;
1400  case OP_GroupByKeyH:
1401  groupByKeyHashed(comm);
1402  break;
1403  case OP_OnlineRead:
1404  onlineRead(comm);
1405  buffer << size_t(this->getSize());
1406  break;
1407  case OP_OnPartRead:
1408  onlinePartRead(comm, func);
1409  buffer << size_t(this->getSize());
1410  break;
1411  case OP_OnFullPRead:
1412  onlineFullPartRead(comm, func);
1413  buffer << size_t(this->getSize());
1414  break;
1415  }
1416  buffer << ret;
1417  }
1418  auto end = system_clock::now();
1419  auto duration = duration_cast<milliseconds>(end - start);
1420  //std::cerr << " ET:" << duration.count() << " ";
1421 
1422  buffer.writePos(size_t(duration.count()), durationP);
1423  buffer.writePos(getProcStat(), rStatP);
1424  buffer.writePos(size_t(buffer.size() - headerSize), rSizeP);
1425 
1426  comm->sendTaskResult();
1427 
1428 }
1429 
1430 
// Explicit instantiations of workerIFddCore for the supported <key, value> type pairs.
// The member templates are defined in this .cpp file, so only the combinations listed
// here are emitted for other translation units to link against.
// Pointer-valued (array) value types are currently disabled (commented out).
// NOTE(review): the line numbers in this listing skip (e.g. 1432 -> 1436, 1449 -> 1451,
// 1483 -> 1487), so some instantiation lines appear to be missing from this copy —
// confirm against the original workerIFddCore.cpp.
1431 template class faster::workerIFddCore<char, char>;
1432 template class faster::workerIFddCore<char, int>;
1436 //template class faster::workerIFddCore<char, char*>;
1437 //template class faster::workerIFddCore<char, int*>;
1438 //template class faster::workerIFddCore<char, long int*>;
1439 //template class faster::workerIFddCore<char, float*>;
1440 //template class faster::workerIFddCore<char, double*>;
1447 
1448 template class faster::workerIFddCore<int, char>;
1449 template class faster::workerIFddCore<int, int>;
1451 template class faster::workerIFddCore<int, float>;
1453 //template class faster::workerIFddCore<int, char*>;
1454 //template class faster::workerIFddCore<int, int*>;
1455 //template class faster::workerIFddCore<int, long int*>;
1456 //template class faster::workerIFddCore<int, float*>;
1457 //template class faster::workerIFddCore<int, double*>;
1464 
1470 //template class faster::workerIFddCore<long int, char*>;
1471 //template class faster::workerIFddCore<long int, int*>;
1472 //template class faster::workerIFddCore<long int, long int*>;
1473 //template class faster::workerIFddCore<long int, float*>;
1474 //template class faster::workerIFddCore<long int, double*>;
1481 
1483 template class faster::workerIFddCore<float, int>;
1487 //template class faster::workerIFddCore<float, char*>;
1488 //template class faster::workerIFddCore<float, int*>;
1489 //template class faster::workerIFddCore<float, long int*>;
1490 //template class faster::workerIFddCore<float, float*>;
1491 //template class faster::workerIFddCore<float, double*>;
1498 
1504 //template class faster::workerIFddCore<double, char*>;
1505 //template class faster::workerIFddCore<double, int*>;
1506 //template class faster::workerIFddCore<double, long int*>;
1507 //template class faster::workerIFddCore<double, float*>;
1508 //template class faster::workerIFddCore<double, double*>;
1515 
1521 //template class faster::workerIFddCore<std::string, char*>;
1522 //template class faster::workerIFddCore<std::string, int*>;
1523 //template class faster::workerIFddCore<std::string, long int*>;
1524 //template class faster::workerIFddCore<std::string, float*>;
1525 //template class faster::workerIFddCore<std::string, double*>;
1532 
unsigned int fddOpType
Dataset operation type.
Definition: definitions.h:41
fdd< T > * onlineRead(std::string path, onlineFullPartFuncP< T > funcP)
Reads a file with online parsing and mapping (?)
libfaster main namespace
Definition: _workerFdd.h:11
unsigned int fddType
Dataset type.
Definition: definitions.h:16
int numProcs()
Return the number of processes running.
Definition: fastContext.h:150
fdd< T > * onlineFullPartRead(std::string path, onlineFullPartFuncP< T > funcP)
Reads a file with online parsing and partition (NOT IMPLEMENTED)