libfaster API Documentation  Development Version
Super fast distributed computing
indexedFdd.h
1 #ifndef LIBFASTER_INDEXEDFDD_H
2 #define LIBFASTER_INDEXEDFDD_H
3 
4 #include <vector>
5 #include <typeinfo>
6 #include <stdio.h>
7 #include <list>
8 #include <tuple>
9 #include <memory>
10 
11 #include "definitions.h"
12 #include "fddBase.h"
13 #include "fastContext.h"
14 #include "misc.h"
15 
16 namespace faster{
17 
18  class fastContext;
19 
20  template<typename K>
21  class groupedFdd;
22 
23  template <typename K, typename T>
24  class indexedFdd ;
25 
26  // Driver side FDD
27  // It just sends commands to the workers.
28  template <typename K, typename T>
29  class iFddCore : public fddBase{
30 
31  protected:
32  bool groupedByKey;
33  bool groupedByMap;
34  fastContext * context;
35 
36  iFddCore() {
37  _kType = decodeType(typeid(K).hash_code());
38  groupedByKey = false;
39  groupedByMap = false;
40  cached = false;
41  }
42 
43  // Create a empty fdd
44  iFddCore(fastContext &c) {
45  _kType = decodeType(typeid(K).hash_code());
46  groupedByKey = false;
47  groupedByMap = false;
48  cached = false;
49  context = &c;
50  }
51 
52  // Create a empty fdd with a pre allocated size
53  iFddCore(fastContext &c, size_t s, const std::vector<size_t> & dataAlloc) {
54  _kType = decodeType(typeid(K).hash_code());
55  groupedByKey = false;
56  groupedByMap = false;
57  cached = false;
58  context = &c;
59  this->size = s;
60  this->dataAlloc = dataAlloc;
61  }
62 
63  virtual ~iFddCore(){
64  }
65 
66  std::unordered_map<K, std::tuple<size_t, int, size_t>> * calculateKeyCount(std::vector< std::pair<void *, size_t> > & result);
67  std::unordered_map<K, int> calculateKeyMap(std::unordered_map<K, std::tuple<size_t, int, size_t>> & count);
68 
69  // -------------- Core FDD Functions --------------- //
70  void update(void * funcP, fddOpType op);
71  fddBase * _map(void * funcP, fddOpType op, fddBase * newFdd, system_clock::time_point & start);
72  template <typename U>
73  fdd<U> * map( void * funcP, fddOpType op);
74  template <typename L, typename U>
75  indexedFdd<L,U> * mapI( void * funcP, fddOpType op);
76 
77  indexedFdd<K,T> * groupByKeyMapped();
78  indexedFdd<K,T> * groupByKeyHashed();
79 
80  public:
81  //template<typename... FddTypes, typename... Args>
82  //groupedFdd<K, T, FddTypes...> * cogroup(Args * ... args){
83  //return new groupedFdd<K, T, FddTypes...>(context, this, args...);
84  //}
85 
94  template<typename U>
96 
97  this->groupByKey();
98  auto start = system_clock::now();
99 
100  return new groupedFdd<K>(context, this, fdd1, start);
101  }
102 
113  template<typename U, typename V>
115 
116  this->groupByKey();
117  auto start = system_clock::now();
118 
119  return new groupedFdd<K>(context, this, fdd1, fdd2, start);
120  }
121 
125  std::unordered_map<K, size_t> countByKey();
126 
132 
135  void discard(){
136  //std::cerr << "\033[0;31mDEL" << id << "\033[0m ";
137  context->discardFDD(id);
138  }
139 
147  void writeToFile(std::string path, std::string sufix);
148 
152  bool isGroupedByKey() {
153  return groupedByKey;
154  }
156  void setGroupedByKey(bool gbk) {
157  groupedByKey = gbk;
158  }
160  void setGroupedByMap(bool gbm) {
161  groupedByMap = gbm;
162  }
163 
164  };
165 
166  template <typename K, typename T>
167  class indexedFdd : public iFddCore<K,T>{
168  private:
169  std::pair <K,T> finishReduces(char ** partResult, size_t * pSize, int funcId, fddOpType op);
170  std::pair <K,T> reduce( void * funcP, fddOpType op);
171 
172 
173 
174  public:
175  // -------------- Constructors --------------- //
176 
179  this->_tType = decodeType(typeid(T).hash_code());
180  this->id = c.createIFDD(this, typeid(K).hash_code(), typeid(T).hash_code());
181  }
182 
184  indexedFdd(fastContext &c, size_t s, const std::vector<size_t> & dataAlloc) : iFddCore<K,T>(c, s, dataAlloc){
185  this->_tType = decodeType(typeid(T).hash_code());
186  this->id = c.createIFDD(this, typeid(K).hash_code(), typeid(T).hash_code(), dataAlloc);
187  }
188 
190  indexedFdd(fastContext &c, size_t s) : indexedFdd(c, s, c.getAllocation(s)) { }
191 
193  indexedFdd(fastContext &c, K * keys, T * data, size_t size) : indexedFdd(c, size){
194  c.parallelizeI(this->id, keys, data, size);
195  }
196 
198  indexedFdd(fastContext &c, std::string) { //: indexedFdd(c, size){
199  //this->onlineRead();
200  }
201 
205  }
206 
207  // --------------- FDD Builtin functions ------------- //
211  std::vector<std::pair<K,T>> collect( ){
212  //std::cerr << " \033[0;31mSIZE: " << this->size << "\033[0m";
213  std::vector<std::pair<K,T>> data(this->size);
214  this->context->collectFDD(data, this);
215  return data;
216  }
217 
223  this->cached = true;
224  return this;
225  }
226 
227  // Update
230  indexedFdd<K,T> * update(updateIFunctionP<K,T> funcP){
231  iFddCore<K,T>::update((void*) funcP, OP_Update);
232  return this;
233  }
234 
235  // -------------- FDD Functions --------------- //
236 
239 
240  // Map
242  template <typename L, typename U>
243  indexedFdd<L,U> * map( ImapIFunctionP<K,T,L,U> funcP ){
244  return iFddCore<K,T>::template mapI<L,U>((void*) funcP, OP_Map);
245  }
247  template <typename L, typename U>
248  indexedFdd<L,U> * map( IPmapIFunctionP<K,T,L,U> funcP ){
249  return iFddCore<K,T>::template mapI<L,U>((void*) funcP, OP_Map);
250  }
252  template <typename U>
253  fdd<U> * map( mapIFunctionP<K,T,U> funcP ){
254  return iFddCore<K,T>::template map<U>((void*) funcP, OP_Map);
255  }
257  template <typename U>
258  fdd<U> * map( PmapIFunctionP<K,T,U> funcP ){
259  return iFddCore<K,T>::template map<U>((void*) funcP, OP_Map);
260  }
261 
262  // MapByKey
265  template <typename L, typename U>
266  indexedFdd<L,U> * mapByKey( ImapByKeyIFunctionP<K,T,L,U> funcP ){
267  return iFddCore<K,T>::template mapI<L,U>((void*) funcP, OP_MapByKey);
268  }
271  template <typename L, typename U>
272  indexedFdd<L,U> * mapByKey( IPmapByKeyIFunctionP<K,T,L,U> funcP ){
273  return iFddCore<K,T>::template mapI<L,U>((void*) funcP, OP_MapByKey);
274  }
277  template <typename L, typename U>
278  fdd<U> * mapByKey( mapByKeyIFunctionP<K,T,U> funcP ){
279  return iFddCore<K,T>::template map<U>((void*) funcP, OP_MapByKey);
280  }
283  template <typename L, typename U>
284  fdd<U> * mapByKey( PmapByKeyIFunctionP<K,T,U> funcP ){
285  return iFddCore<K,T>::template map<U>((void*) funcP, OP_MapByKey);
286  }
287 
288 
289  // BulkMap
292  template <typename L, typename U>
293  indexedFdd<L,U> * bulkMap( IbulkMapIFunctionP<K,T,L,U> funcP ){
294  return iFddCore<K,T>::template mapI<L,U>((void*) funcP, OP_BulkMap);
295  }
298  template <typename L, typename U>
299  indexedFdd<L,U> * bulkMap( IPbulkMapIFunctionP<K,T,L,U> funcP ){
300  return iFddCore<K,T>::template mapI<L,U>((void*) funcP, OP_BulkMap);
301  }
304  template <typename L, typename U>
305  fdd<U> * bulkMap( bulkMapIFunctionP<K,T,U> funcP ){
306  return iFddCore<K,T>::template map<U>((void*) funcP, OP_BulkMap);
307  }
310  template <typename L, typename U>
311  fdd<U> * bulkMap( PbulkMapIFunctionP<K,T,U> funcP ){
312  return iFddCore<K,T>::template map<U>((void*) funcP, OP_BulkMap);
313  }
315 
316 
319 
320  // FlatMap
322  template <typename L, typename U>
323  indexedFdd<L,U> * flatMap( IflatMapIFunctionP<K,T,L,U> funcP ){
324  return iFddCore<K,T>::template mapI<L,U>((void*) funcP, OP_FlatMap);
325  }
327  template <typename L, typename U>
328  indexedFdd<L,U> * flatMap( IPflatMapIFunctionP<K,T,L,U> funcP ){
329  return iFddCore<K,T>::template mapI<L,U>((void*) funcP, OP_FlatMap);
330  }
332  template <typename L, typename U>
333  fdd<U> * flatMap( flatMapIFunctionP<K,T,U> funcP ){
334  return iFddCore<K,T>::template map<U>((void*) funcP, OP_FlatMap);
335  }
337  template <typename L, typename U>
338  fdd<U> * flatMap( PflatMapIFunctionP<K,T,U> funcP ){
339  return iFddCore<K,T>::template map<U>((void*) funcP, OP_FlatMap);
340  }
341 
342  // TODO add flatMapByKey
343 
344 
347  template <typename L, typename U>
348  indexedFdd<L,U> * bulkFlatMap( IbulkFlatMapIFunctionP<K,T,L,U> funcP ){
349  return iFddCore<K,T>::template mapI<L,U>((void*) funcP, OP_BulkFlatMap);
350  }
353  template <typename L, typename U>
354  indexedFdd<L,U> * bulkFlatMap( IPbulkFlatMapIFunctionP<K,T,L,U> funcP ){
355  return iFddCore<K,T>::template mapI<L,U>((void*) funcP, OP_BulkFlatMap);
356  }
359  template <typename L, typename U>
360  fdd<U> * bulkFlatMap( bulkFlatMapIFunctionP<K,T,U> funcP ){
361  return iFddCore<K,T>::template map<U>((void*) funcP, OP_BulkFlatMap);
362  }
365  template <typename L, typename U>
366  fdd<U> * bulkFlatMap( PbulkFlatMapIFunctionP<K,T,U> funcP ){
367  return iFddCore<K,T>::template map<U>((void*) funcP, OP_BulkFlatMap);
368  }
370 
371  // ------------------ Reduce ----------------- //
374 
375  // Run a Reduce
377  std::pair<K,T> reduce( IreduceIFunctionP<K,T> funcP ){
378  return reduce((void*) funcP, OP_Reduce);
379  }
380  //indexedFdd<K,T> * std::pair<K,T> reduceByKey( IreduceByKeyIFunctionP<K,T> funcP ){
381  //return reduceByKey((void*) funcP, OP_Reduce);
382  //}
383 
387  std::pair<K,T> bulkReduce( IbulkReduceIFunctionP<K,T> funcP ){
388  return reduce((void*) funcP, OP_BulkReduce);
389  }
391 
392  };
393 
394  template <typename K, typename T>
395  class indexedFdd<K,T *> : public iFddCore<K,T*>{
396  private:
397  std::tuple <K,T,size_t> finishReducesP(char ** partResult, size_t * pSize, int funcId, fddOpType op);
398  std::tuple <K,T,size_t> reduceP( void * funcP, fddOpType op);
399 
400  public:
401  // -------------- Constructors --------------- //
402  //
403  // Create a empty fdd
405  this->_tType = POINTER | decodeType(typeid(T).hash_code());
406  this->id = c.createIPFDD(this, typeid(K).hash_code(), typeid(T).hash_code());
407  }
408 
409  // Create a empty fdd with a pre allocated size
410  indexedFdd(fastContext &c, size_t s, const std::vector<size_t> & dataAlloc) : iFddCore<K,T *>(c, s, dataAlloc){
411  this->_tType = POINTER | decodeType(typeid(T).hash_code());
412  this->id = c.createIPFDD(this, typeid(K).hash_code(), typeid(T).hash_code(), c.getAllocation(s));
413  }
414  indexedFdd(fastContext &c, size_t s) : indexedFdd(c, s, c.getAllocation(s)) {
415  }
416 
417  // Create a fdd from a array in memory
418  indexedFdd(fastContext &c, K * keys, T ** data, size_t * dataSizes, size_t size) : indexedFdd(c, size){
419  c.parallelizeI(this->id, keys, data, dataSizes, size);
420  }
421 
422  ~indexedFdd(){
423  }
424 
425 
426  // -------------- FDD Functions Parameter Specification --------------- //
427  // These need to be specialized because they can return a pointer or not
428 
429  // -------------------- Map ------------------- //
430 
431  // Map
432  template <typename L, typename U>
433  indexedFdd<L,U> * map( ImapIPFunctionP<K,T,L,U> funcP ){
434  return iFddCore<K,T>::template map<L,U>((void*) funcP, OP_Map);
435  }
436  template <typename L, typename U>
437  indexedFdd<L,U> * map( IPmapIPFunctionP<K,T,L,U> funcP ){
438  return iFddCore<K,T>::template map<L,U>((void*) funcP, OP_Map);
439  }
440  template <typename L, typename U>
441  fdd<U> * map( mapIPFunctionP<K,T,U> funcP ){
442  return iFddCore<K,T>::template map<U>((void*) funcP, OP_Map);
443  }
444  template <typename L, typename U>
445  fdd<U> * map( PmapIPFunctionP<K,T,U> funcP ){
446  return iFddCore<K,T>::template map<U>((void*) funcP, OP_Map);
447  }
448 
449  // MapByKey
450  template <typename L, typename U>
451  indexedFdd<L,U> * mapByKey( ImapByKeyIPFunctionP<K,T,L,U> funcP ){
452  return iFddCore<K,T>::template map<L,U>((void*) funcP, OP_MapByKey);
453  }
454  template <typename L, typename U>
455  indexedFdd<L,U> * mapByKey( IPmapByKeyIPFunctionP<K,T,L,U> funcP ){
456  return iFddCore<K,T>::template map<L,U>((void*) funcP, OP_MapByKey);
457  }
458  template <typename L, typename U>
459  fdd<U> * mapByKey( mapByKeyIPFunctionP<K,T,U> funcP ){
460  return iFddCore<K,T>::template map<U>((void*) funcP, OP_MapByKey);
461  }
462  template <typename L, typename U>
463  fdd<U> * mapByKey( PmapByKeyIPFunctionP<K,T,U> funcP ){
464  return iFddCore<K,T>::template map<U>((void*) funcP, OP_MapByKey);
465  }
466 
467 
468  template <typename L, typename U>
469  indexedFdd<L,U> * bulkMap( IbulkMapIPFunctionP<K,T,L,U> funcP ){
470  return iFddCore<K,T>::template map<L,U>((void*) funcP, OP_BulkMap);
471  }
472  template <typename L, typename U>
473  indexedFdd<L,U> * bulkMap( IPbulkMapIPFunctionP<K,T,L,U> funcP ){
474  return iFddCore<K,T>::template map<L,U>((void*) funcP, OP_BulkMap);
475  }
476  template <typename L, typename U>
477  fdd<U> * bulkMap( bulkMapIPFunctionP<K,T,U> funcP ){
478  return iFddCore<K,T>::template map<U>((void*) funcP, OP_BulkMap);
479  }
480  template <typename L, typename U>
481  fdd<U> * bulkMap( PbulkMapIPFunctionP<K,T,U> funcP ){
482  return iFddCore<K,T>::template map<U>((void*) funcP, OP_BulkMap);
483  }
484 
485 
486  template <typename L, typename U>
487  indexedFdd<L,U> * flatMap( IflatMapIPFunctionP<K,T,L,U> funcP){
488  return iFddCore<K,T>::template map<L,U>((void*) funcP, OP_FlatMap);
489  }
490  template <typename L, typename U>
491  indexedFdd<L,U> * flatMap( IPflatMapIPFunctionP<K,T,L,U> funcP){
492  return iFddCore<K,T>::template map<L,U>((void*) funcP, OP_FlatMap);
493  }
494  template <typename L, typename U>
495  fdd<U> * flatMap( flatMapIPFunctionP<K,T,U> funcP){
496  return iFddCore<K,T>::template map<U>((void*) funcP, OP_FlatMap);
497  }
498  template <typename L, typename U>
499  fdd<U> * flatMap( PflatMapIPFunctionP<K,T,U> funcP){
500  return iFddCore<K,T>::template map<U>((void*) funcP, OP_FlatMap);
501  }
502 
503 
504  template <typename L, typename U>
505  indexedFdd<L,U> * bulkFlatMap( IbulkFlatMapIPFunctionP<K,T,L,U> funcP){
506  return iFddCore<K,T>::template map<L,U>((void*) funcP, OP_BulkFlatMap);
507  }
508  template <typename L, typename U>
509  indexedFdd<L,U> * bulkFlatMap( IPbulkFlatMapIPFunctionP<K,T,L,U> funcP){
510  return iFddCore<K,T>::template map<L,U>((void*) funcP, OP_BulkFlatMap);
511  }
512  template <typename L, typename U>
513  fdd<U> * bulkFlatMap( bulkFlatMapIPFunctionP<K,T,U> funcP){
514  return iFddCore<K,T>::template map<U>((void*) funcP, OP_BulkFlatMap);
515  }
516  template <typename L, typename U>
517  fdd<U> * bulkFlatMap( PbulkFlatMapIPFunctionP<K,T,U> funcP){
518  return iFddCore<K,T>::template map<U>((void*) funcP, OP_BulkFlatMap);
519  }
520 
521  // ------------------ Reduce ----------------- //
522 
523  // Run a Reduce
524  inline std::vector<std::pair<K,T>> reduce(IPreduceIPFunctionP<K,T> funcP ){
525  return reduce((void*) funcP, OP_Reduce);
526  }
527  //inline indexedFdd<K,T> reduceByKey(IPreduceByKeyIPFunctionP<K,T> funcP ){
528  //return reduceByKey((void*) funcP, OP_Reduce);
529  //}
530  inline std::vector<std::pair<K,T>> bulkReduce(IPbulkReduceIPFunctionP<K,T> funcP ){
531  return reduce((void*) funcP, OP_BulkReduce);
532  }
533 
534  // --------------- FDD Builtin functions ------------- //
535  // Collect a FDD
536  std::vector<std::tuple<K,T*, size_t>> collect( ) {
537  std::vector<std::tuple<K,T*, size_t>> data(this->size);
538  this->context->collectFDD(data, this);
539  return data;
540  }
541 
542  indexedFdd<K,T*> * cache(){
543  this->cached = true;
544  return this;
545  }
546 
547  };
548 
549 
550  template <typename K, typename T>
551  void iFddCore<K,T>::update( void * funcP, fddOpType op){
552  //std::cerr << " Send Update Req\n";
553  unsigned long int tid, sid;
554  auto start = system_clock::now();
555 
556  // Decode function pointer
557  int funcId = context->findFunc(funcP);
558 
559  // Send task
560  context->enqueueTask(op, id, 0, funcId, this->size);
561 
562  // Receive results
563  auto result = context->recvTaskResult(tid, sid, start);
564 
565  if (!cached)
566  this->discard();
567 
568  //std::cerr << "\n";
569  }
570 
571  template <typename K, typename T>
572  fddBase * iFddCore<K,T>::_map( void * funcP, fddOpType op, fddBase * newFdd, system_clock::time_point & start){
573  //std::cerr << " Map ";
574  unsigned long int tid, sid;
575  unsigned long int newFddId = newFdd->getId();
576 
577  context->setInternal(newFdd->getId(), true);
578 
579  // Decode function pointer
580  int funcId = context->findFunc(funcP);
581 
582  // Send task
583  context->enqueueTask(op, id, newFddId, funcId, this->size);
584 
585  // Receive results
586  auto result = context->recvTaskResult(tid, sid, start);
587 
588  if ( (op & 0xff) & (OP_MapByKey | OP_FlatMapByKey | OP_FlatMap | OP_BulkFlatMap) ) {
589  size_t newSize = 0;
590 
591  for (int i = 1; i < context->numProcs(); ++i){
592  if (result[i].second > 0) newSize += * (size_t*) result[i].first;
593  }
594 
595  newFdd->setSize(newSize);
596  }
597 
598  if (!cached)
599  this->discard();
600 
601  //std::cerr << "\n";
602  return newFdd;
603  }
604  template <typename K, typename T>
605  template <typename L, typename U>
606  indexedFdd<L,U> * iFddCore<K,T>::mapI( void * funcP, fddOpType op){
607  indexedFdd<L,U> * newFdd;
608  auto start = system_clock::now();
609 
610  if ( (op & 0xFF ) & (OP_MapByKey | OP_FlatMapByKey | OP_FlatMap | OP_BulkFlatMap) ){
611  newFdd = new indexedFdd<L,U>(*context);
612  }else{
613  if (dataAlloc.empty()) dataAlloc = context->getAllocation(size);
614  newFdd = new indexedFdd<L,U>(*context, size, dataAlloc);
615  }
616 
617  return (indexedFdd<L,U> *) _map(funcP, op, newFdd, start);
618  }
619 
620  template <typename K, typename T>
621  template <typename U>
622  fdd<U> * iFddCore<K,T>::map( void * funcP, fddOpType op){
623  fdd<U> * newFdd;
624  auto start = system_clock::now();
625 
626  if ( (op & 0xFF ) & (OP_MapByKey | OP_FlatMapByKey | OP_FlatMap | OP_BulkFlatMap) ){
627  newFdd = new fdd<U>(*context);
628  }else{
629  if (dataAlloc.empty()) dataAlloc = context->getAllocation(size);
630  newFdd = new fdd<U>(*context, size, dataAlloc);
631  }
632 
633  return (fdd<U> *) _map(funcP, op, newFdd, start);
634  }
635 
636  template <typename K, typename T>
637  std::unordered_map<K,size_t> iFddCore<K,T>::countByKey(){
638  //std::cerr << " Count By Key";
639  fastCommBuffer decoder(0);
640  unsigned long int tid, sid;
641  std::unordered_map<K,size_t> count;
642 
643  auto start = system_clock::now();
644  context->enqueueTask(OP_CountByKey, id, this->size);
645 
646  auto result = context->recvTaskResult(tid, sid, start);
647 
648  for (int i = 1; i < context->numProcs(); ++i){
649 
650  K key;
651  size_t kCount, numKeys;
652  decoder.setBuffer(result[i].first, result[i].second);
653  decoder >> numKeys;
654 
655  for ( size_t i = 0; i < numKeys; ++i ) {
656  decoder >> key >> kCount;
657  //auto it = count.find(key);
658  //if (it != count.end())
659  // count[key] += kCount;
660  //else
661  // count[key] = kCount;
662  count[key] += kCount;
663  }
664  }
665 
666  //std::cerr << "\n";
667  return count;
668  }
669 
670 
671 
672  template <typename K, typename T>
673  std::unordered_map<K, std::tuple<size_t, int, size_t>> * iFddCore<K,T>::calculateKeyCount(std::vector< std::pair<void *, size_t> > & result){
674  fastCommBuffer decoder(0);
675 
676  auto * count = new std::unordered_map< K, std::tuple<size_t, int, size_t> >();
677  count->reserve(this->size);
678 
679  for (int i = 1; i < context->numProcs(); ++i){
680  K key;
681  size_t kCount, numKeys;
682 
683  if (result[i].second == 0) continue;
684 
685  decoder.setBuffer(result[i].first, result[i].second);
686  decoder >> numKeys;
687 
688  for ( size_t j = 0; j < numKeys; ++j ) {
689 
690  decoder >> key >> kCount;
691  auto it = count->find(key);
692 
693  if (it != count->end()){
694 
695  int &owner = std::get<1>(it->second);
696  size_t &ownerCount = std::get<2>(it->second);
697 
698  std::get<0>(it->second) += kCount;
699 
700  // Fount the new majority owner
701  if (kCount > ownerCount){
702  owner = i;
703  ownerCount = kCount;
704  }
705 
706  }else{
707  (*count)[key] = std::make_tuple(kCount, i, kCount);
708  }
709  }
710  }
711 
712  return count;
713  }
714  template <typename K, typename T>
715  std::unordered_map<K, int> iFddCore<K,T>::calculateKeyMap(std::unordered_map<K, std::tuple<size_t, int, size_t>> & count){
716  size_t size = this->size;
717  std::unordered_map<K, int> kMap(count.size());
718  std::unordered_map<K, bool> done;
719  size_t numProcs = context->numProcs();
720  std::vector<size_t> keyAlloc(numProcs,0);
721  std::vector<size_t> procBudget = context->getAllocation(size);
722 
723 
724  //std::cerr << " [ Budget: ";
725  //for ( int i = 1; i < numProcs; ++i)
726  //std::cerr << procBudget[i] << " ";
727  //std::cerr << "= " << size << "\n";
728 
729  for (auto it = count.begin(); it != count.end(); it++){
730  K key = it->first;
731  size_t kCount = std::get<0>(it->second);
732  int preffered = std::get<1>(it->second);
733 
734  if(keyAlloc[preffered] < procBudget[preffered]){
735  kMap[key] = preffered;
736  keyAlloc [preffered] += kCount;
737  //count.erase(key);
738  done[key] = true;
739  }else{
740  done[key] = false;
741  }
742 
743  }
744 
745  for (auto it = count.begin(); it != count.end(); it++){
746  K key = it->first;
747  if (! done[key]){
748  size_t kCount = std::get<0>(it->second);
749  int preffered = 1 + rand() % (numProcs - 1);
750 
751  while(keyAlloc[preffered] >= (procBudget[preffered] + 1)){
752  //preffered = 1 + rand() % (numProcs - 1);
753  preffered = (preffered + 1) % numProcs;
754  }
755  kMap[key] = preffered;
756  keyAlloc [preffered] += kCount;
757  }
758  }
759  /*std::cerr << " [ Alloc: ";
760  for ( int i = 1; i < numProcs; ++i)
761  std::cerr << keyAlloc[i] << " ";
762  std::cerr << "\n";
763  std::cerr << " [ Map: ";
764  for (auto it = kMap.begin(); it != kMap.end(); it++){
765  std::cerr << it->first << ":" << it->second << " ";
766  }
767  std::cerr << " ]\n"; */
768 
769 
770  return kMap;
771  }
772 
773  template <typename K, typename T>
775  unsigned long int tid, sid;
776 
777  if (! groupedByKey){
778  using std::chrono::system_clock;
779  using std::chrono::duration_cast;
780  using std::chrono::milliseconds;
781  std::cerr << " GroupByKey ";
782  auto start = system_clock::now();
783 
784  context->enqueueTask(OP_CountByKey, id, this->size);
785 
786  auto result = context->recvTaskResult(tid, sid, start);
787  std::cerr << " CBK:" << duration_cast<milliseconds>(system_clock::now() - start).count();
788  start = system_clock::now();
789 
790  // Get a count by key with majority owner consideration
791  auto * count = calculateKeyCount(result);
792  std::cerr << " proc.Keys:" << duration_cast<milliseconds>(system_clock::now() - start).count();
793  auto start2 = system_clock::now();
794 
795  std::unordered_map<K, int> keyMap = calculateKeyMap(*count);
796  delete count;
797  std::cerr << " calc.KeyMap:" << duration_cast<milliseconds>(system_clock::now() - start2).count();
798  start2 = system_clock::now();
799 
800  // Migrate data according to key ownership
801  unsigned long int tid = context->enqueueTask(OP_GroupByKey, id, this->size);
802  std::cerr << " enq.Task:" << duration_cast<milliseconds>(system_clock::now() - start2).count();
803  start2 = system_clock::now();
804  context->sendKeyMap(tid, keyMap);
805  keyMap.clear();
806  std::cerr << " snd.KeyMap:" << duration_cast<milliseconds>(system_clock::now() - start2).count();
807 
808  dataAlloc.resize(context->numProcs());
809  result = context->recvTaskResult(tid, sid, start);
810  size_t newSize = 0;
811  for (int i = 1; i < context->numProcs(); ++i){
812  if (result[i].second > 0){
813  dataAlloc[i] = * (size_t*) result[i].first;
814  newSize += dataAlloc[i];
815  }
816  }
817  size = newSize;
818  groupedByKey = true;
819  }
820  //std::cerr << ". ";
821  return (indexedFdd<K,T> *)this;
822  }
823  template <typename K, typename T>
825  unsigned long int tid, sid;
826 
827  if (! groupedByKey){
828  auto start = system_clock::now();
829  //std::cerr << " GroupByKeyHashed \n";
830 
831 
832  // Migrate data according to key ownership
833  tid = context->enqueueTask(OP_GroupByKeyH, id, this->size);
834 
835  auto result = context->recvTaskResult(tid, sid, start);
836  size_t newSize = 0;
837  dataAlloc.resize(context->numProcs());
838  for (int i = 1; i < context->numProcs(); ++i){
839  if (result[i].second > 0){
840  dataAlloc[i] = * (size_t*) result[i].first;
841  newSize += dataAlloc[i];
842  //std::cerr << dataAlloc[i] << " ";
843  }
844  }
845  size = newSize;
846  groupedByKey = true;
847  }
848  //std::cerr << ". ";
849 
850  return (indexedFdd<K,T> *)this;
851  }
852  template <typename K, typename T>
854  if (groupedByMap)
855  return groupByKeyMapped();
856  else
857  return groupByKeyHashed();
858  }
859 
860  template <typename K, typename T>
861  void iFddCore<K,T>::writeToFile(std::string path, std::string sufix){
862  context->writeToFile(id, path, sufix);
863  }
864 
865 
866  template <typename K, typename T>
867  std::pair <K,T> indexedFdd<K,T>::finishReduces(char ** partResult, size_t * pSize, int funcId, fddOpType op){
868  std::pair <K,T> result;
869 
870  if (op == OP_Reduce){
871  IreduceIFunctionP<K, T> reduceFunc = (IreduceIFunctionP<K, T>) this->context->funcTable[funcId];
872  fastCommBuffer buffer(0);
873 
874  // Get the real object behind the buffer
875  buffer.setBuffer(partResult[0], pSize[0]);
876  buffer >> result;
877 
878  for (int i = 1; i < (this->context->numProcs() - 1); ++i){
879  std::pair <K,T> pr;
880 
881  //std::cerr << " BUFFER: " << size_t(buffer.pos()) << " " << buffer.size() << "\n";
882  buffer.setBuffer(partResult[i], pSize[i]);
883  buffer >> pr;
884 
885  result = reduceFunc(result.first, result.second, pr.first, pr.second);
886  }
887  }else{
888  IbulkReduceIFunctionP<K, T> bulkReduceFunc = (IbulkReduceIFunctionP<K, T>) this->context->funcTable[funcId];
889  T * vals = new T[this->context->numProcs() - 1];
890  K * keys = new K[this->context->numProcs() - 1];
891 
892  //#pragma omp parallel for
893  for (int i = 0; i < (this->context->numProcs() - 1); ++i){
894  fastCommBuffer buffer(0);
895  std::pair <K,T> pr;
896 
897  buffer.setBuffer(partResult[i], pSize[i]);
898  buffer >> pr;
899 
900  keys[i] = pr.first;
901  vals[i] = pr.second;
902  }
903 
904  result = bulkReduceFunc(keys, vals, this->context->numProcs() - 1);
905 
906  delete [] vals;
907  delete [] keys;
908  // TODO do bulkreduce
909  }
910 
911  return result;
912  }
913 
914  template <typename K, typename T>
915  std::pair <K,T> indexedFdd<K,T>::reduce( void * funcP, fddOpType op){
916  //std::cerr << " Reduce \n";
917  std::pair <K,T> result;
918  int funcId = this->context->findFunc(funcP);
919  char ** partResult = new char*[this->context->numProcs() - 1];
920  size_t * rSize = new size_t[this->context->numProcs() - 1];
921  unsigned long int tid, sid;
922 
923  // Send task
924  auto start = system_clock::now();
925  unsigned long int reduceTaskId UNUSED = this->context->enqueueTask(op, this->id, 0, funcId, this->size);
926 
927  // Receive results
928  auto resultV = this->context->recvTaskResult(tid, sid, start);
929 
930  for (int i = 0; i < (this->context->numProcs() - 1); ++i){
931  partResult[i] = (char*) resultV[i + 1].first;
932  rSize[i] = resultV[i + 1].second;
933  }
934 
935  // Finish applying reduces
936  result = finishReduces(partResult, rSize, funcId, op);
937 
938  delete [] partResult;
939  delete [] rSize;
940 
941  if (!this->cached)
942  this->discard();
943 
944 
945  //std::cerr << "\n";
946  return result;
947  }
948 
949 
950  template <typename K, typename T>
951  std::tuple <K,T,size_t> indexedFdd<K,T*>::finishReducesP(char ** partResult, size_t * pSize, int funcId, fddOpType op){
952  std::tuple <K,T,size_t> result;
953 
954  if (op == OP_Reduce){
955  IPreduceIPFunctionP<K,T> reduceFunc = (IreduceIFunctionP<K,T>) this->context->funcTable[funcId];
956  fastCommBuffer buffer(0);
957 
958  buffer.setBuffer(partResult[0], pSize[0]);
959  buffer >> result;
960 
961  //#pragma omp parallel for
962  for (int i = 1; i < (this->context->numProcs() - 1); ++i){
963  std::tuple <K,T,size_t> pr;
964 
965  buffer.setBuffer(partResult[i], pSize[i]);
966  buffer >> pr;
967 
968  result = reduceFunc(
969  std::get<0>(result),
970  std::get<1>(result),
971  std::get<2>(result),
972  std::get<0>(pr),
973  std::get<1>(pr),
974  std::get<2>(pr));
975  }
976  }else{
977  IPbulkReduceIPFunctionP<K,T> bulkReduceFunc = (IPbulkReduceIPFunctionP<K,T>) this->context->funcTable[funcId];
978  T * vals = new T[this->context->numProcs() - 1];
979  K * keys = new K[this->context->numProcs() - 1];
980  size_t * sizes = new size_t[this->context->numProcs() - 1];
981 
982  //#pragma omp parallel for
983  for (int i = 1; i < (this->context->numProcs() - 1); ++i){
984  fastCommBuffer buffer(0);
985  std::tuple <K,T,size_t> pr;
986 
987  buffer.setBuffer(partResult[i], pSize[i]);
988  buffer >> pr;
989 
990  std::tie (keys[i], vals[i], sizes[i]) = pr;
991  }
992 
993  result = bulkReduceFunc(keys, vals, sizes, this->context->numProcs() - 1);
994  // TODO do bulkreduce
995  }
996 
997  return result;
998  }
999 
1000  template <typename K, typename T>
1001  std::tuple <K,T,size_t> indexedFdd<K,T*>::reduceP( void * funcP, fddOpType op){
1002  auto start = system_clock::now();
1003  //std::cerr << " Reduce ";
1004  std::tuple <K,T,size_t> result;
1005  unsigned long int tid, sid;
1006  int funcId = this->context->findFunc(funcP);
1007  char ** partResult = new char *[this->context->numProcs() - 1];
1008  size_t * partrSize = new size_t[this->context->numProcs() - 1];
1009 
1010  // Send task
1011  unsigned long int reduceTaskId = this->context->enqueueTask(op, this->id, 0, funcId, this->size);
1012 
1013  // Receive results
1014  auto resultV = this->context->recvTaskResult(tid, sid, start);
1015 
1016  for (int i = 0; i < (this->context->numProcs() - 1); ++i){
1017  partResult[i] = (T*) resultV[i+1].first;
1018  partrSize[i] = resultV[i+1].second /= sizeof(T);
1019  }
1020 
1021  // Finish applying reduces
1022  result = finishReducesP(partResult, partrSize, funcId, op);
1023 
1024  delete [] partResult;
1025  delete [] partrSize;
1026 
1027  if (!this->cached)
1028  this->discard();
1029 
1030  //std::cerr << "\n";
1031  return result;
1032  }
1033 
1034 }
1035 
1036 #endif
unsigned int fddOpType
Dataset operation type.
Definition: definitions.h:41
fdd< U > * map(mapIFunctionP< K, T, U > funcP)
creates a fdd<U>
Definition: indexedFdd.h:253
indexedFdd< L, U > * mapByKey(IPmapByKeyIFunctionP< K, T, L, U > funcP)
creates an indexedFdd<L,U*>
Definition: indexedFdd.h:272
indexedFdd(fastContext &c, std::string)
Create an indexedFdd from a file.
Definition: indexedFdd.h:198
indexedFdd< L, U > * flatMap(IflatMapIFunctionP< K, T, L, U > funcP)
creates an indexedFdd<L,U>
Definition: indexedFdd.h:323
fdd< U > * mapByKey(PmapByKeyIFunctionP< K, T, U > funcP)
creates a fdd<U *>
Definition: indexedFdd.h:284
void setGroupedByMap(bool gbm)
(UNUSED)
Definition: indexedFdd.h:160
fdd< U > * bulkMap(bulkMapIFunctionP< K, T, U > funcP)
creates a fdd<U>
Definition: indexedFdd.h:305
indexedFdd(fastContext &c)
Create an empty indexedFdd.
Definition: indexedFdd.h:178
std::unordered_map< K, size_t > countByKey()
Counts how many times each key occurs in the dataset.
Definition: indexedFdd.h:637
indexedFdd< K, T > * update(updateIFunctionP< K, T > funcP)
updates the content of an indexedFDD
Definition: indexedFdd.h:230
int getId()
Returns the identification number of the dataset.
Definition: fddBase.h:27
void writeToFile(std::string path, std::string sufix)
Writes FDD content to file.
Definition: indexedFdd.h:861
indexedFdd(fastContext &c, size_t s)
Create an empty indexedFdd with a pre-allocated size.
Definition: indexedFdd.h:190
std::vector< std::pair< K, T > > collect()
Brings the distributed data from an indexedFDD into driver memory.
Definition: indexedFdd.h:211
Framework context class.
Definition: fastContext.h:66
indexedFdd< L, U > * mapByKey(ImapByKeyIFunctionP< K, T, L, U > funcP)
creates an indexedFdd<L,U>
Definition: indexedFdd.h:266
std::pair< K, T > bulkReduce(IbulkReduceIFunctionP< K, T > funcP)
summarizes an indexedFdd<K,T> into a single pair<K,T> using a bulk function pair<K,T> F(K...
Definition: indexedFdd.h:387
void setGroupedByKey(bool gbk)
(UNUSED)
Definition: indexedFdd.h:156
indexedFdd< K, T > * groupByKey()
Groups distributed dataset by key.
Definition: indexedFdd.h:853
bool isGroupedByKey()
Determines if a dataset is grouped by key.
Definition: indexedFdd.h:152
fdd< U > * map(PmapIFunctionP< K, T, U > funcP)
creates a fdd<U *>
Definition: indexedFdd.h:258
std::pair< K, T > reduce(IreduceIFunctionP< K, T > funcP)
summarizes an indexedFdd<K,T> into a single key/value pair<K,T>
Definition: indexedFdd.h:377
groupedFdd< K > * cogroup(iFddCore< K, U > *fdd1, iFddCore< K, V > *fdd2)
Groups three datasets together according to the keys of the first dataset.
Definition: indexedFdd.h:114
indexedFdd< L, U > * map(ImapIFunctionP< K, T, L, U > funcP)
creates an indexedFdd<L,U>
Definition: indexedFdd.h:243
fdd< U > * bulkFlatMap(PbulkFlatMapIFunctionP< K, T, U > funcP)
creates a fdd<U *>
Definition: indexedFdd.h:366
indexedFdd< L, U > * flatMap(IPflatMapIFunctionP< K, T, L, U > funcP)
creates an indexedFdd<L,U*>
Definition: indexedFdd.h:328
void discard()
deallocates a previously cached FDD
Definition: indexedFdd.h:135
fdd< U > * bulkMap(PbulkMapIFunctionP< K, T, U > funcP)
creates a fdd<U *>
Definition: indexedFdd.h:311
indexedFdd< L, U > * bulkFlatMap(IbulkFlatMapIFunctionP< K, T, L, U > funcP)
creates an indexedFdd<L,U>
Definition: indexedFdd.h:348
indexedFdd< L, U > * bulkMap(IPbulkMapIFunctionP< K, T, L, U > funcP)
creates an indexedFdd<L,U*>
Definition: indexedFdd.h:299
groupedFdd< K > * cogroup(iFddCore< K, U > *fdd1)
Groups two datasets together according to the keys of the first dataset.
Definition: indexedFdd.h:95
fdd< U > * flatMap(flatMapIFunctionP< K, T, U > funcP)
creates a fdd<U>
Definition: indexedFdd.h:333
~indexedFdd()
Class Destructor. WARNING: It will deallocate distributed memory.
Definition: indexedFdd.h:204
Fast Distributed Dataset (FDD) is like a cluster-distributed array. This class is the user side impl...
Definition: fastContext.h:24
libfaster main namespace
Definition: _workerFdd.h:11
fdd< U > * mapByKey(mapByKeyIFunctionP< K, T, U > funcP)
creates a fdd<U>
Definition: indexedFdd.h:278
indexedFdd< L, U > * bulkMap(IbulkMapIFunctionP< K, T, L, U > funcP)
creates an indexedFdd<L,U>
Definition: indexedFdd.h:293
indexedFdd(fastContext &c, K *keys, T *data, size_t size)
Create an indexedFdd from an array in memory.
Definition: indexedFdd.h:193
indexedFdd< L, U > * bulkFlatMap(IPbulkFlatMapIFunctionP< K, T, L, U > funcP)
creates an indexedFdd<L,U*>
Definition: indexedFdd.h:354
fdd< U > * bulkFlatMap(bulkFlatMapIFunctionP< K, T, U > funcP)
creates a fdd<U>
Definition: indexedFdd.h:360
indexedFdd(fastContext &c, size_t s, const std::vector< size_t > &dataAlloc)
Create an empty indexedFdd with a pre-allocated size.
Definition: indexedFdd.h:184
indexedFdd< L, U > * map(IPmapIFunctionP< K, T, L, U > funcP)
creates an indexedFdd<L,U*>
Definition: indexedFdd.h:248
indexedFdd< K, T > * cache()
Prevents automatic memory deallocation from happening.
Definition: indexedFdd.h:222
fdd< U > * flatMap(PflatMapIFunctionP< K, T, U > funcP)
creates a fdd<U *>
Definition: indexedFdd.h:338