Faster  0.0.4Alpha
Super fast distributed computing
indexedFdd.h
1 #ifndef LIBFASTER_INDEXEDFDD_H
2 #define LIBFASTER_INDEXEDFDD_H
3 
4 #include <vector>
5 #include <typeinfo>
6 #include <stdio.h>
7 #include <list>
8 #include <tuple>
9 #include <memory>
10 
11 #include "definitions.h"
12 #include "fddBase.h"
13 #include "fastContext.h"
14 #include "misc.h"
15 
16 namespace faster{
17 
// Forward declarations of types that are referred to below before (or
// without) their full definition being visible in this header.
class fastContext;

template <typename K> class groupedFdd;

template <typename K, typename T> class indexedFdd;
25 
26  // Driver side FDD
27  // It just sends commands to the workers.
28  template <typename K, typename T>
29  class iFddCore : public fddBase{
30 
31  protected:
32  bool groupedByKey;
33  std::shared_ptr<std::unordered_map<K, int>> keyMap;
34  fastContext * context;
35 
36  iFddCore() {
37  _kType = decodeType(typeid(K).hash_code());
38  groupedByKey = false;
39  cached = false;
40  }
41 
42  // Create a empty fdd
43  iFddCore(fastContext &c) {
44  _kType = decodeType(typeid(K).hash_code());
45  groupedByKey = false;
46  cached = false;
47  context = &c;
48  }
49 
50  // Create a empty fdd with a pre allocated size
51  iFddCore(fastContext &c, size_t s, const std::vector<size_t> & dataAlloc) {
52  _kType = decodeType(typeid(K).hash_code());
53  groupedByKey = false;
54  cached = false;
55  context = &c;
56  this->size = s;
57  this->dataAlloc = dataAlloc;
58  }
59 
60  virtual ~iFddCore(){
61  }
62 
63  std::shared_ptr<std::unordered_map<K, int>> calculateKeyMap(std::unordered_map<K, std::tuple<size_t, int, size_t>> count);
64 
65  // -------------- Core FDD Functions --------------- //
66  fddBase * _map( void * funcP, fddOpType op, fddBase * newFdd, system_clock::time_point & start);
67  template <typename U>
68  fdd<U> * map( void * funcP, fddOpType op);
69  template <typename L, typename U>
70  indexedFdd<L,U> * mapI( void * funcP, fddOpType op);
71 
72  public:
73  //template<typename... FddTypes, typename... Args>
74  //groupedFdd<K, T, FddTypes...> * cogroup(Args * ... args){
75  //return new groupedFdd<K, T, FddTypes...>(context, this, args...);
76  //}
77  template<typename U>
78  groupedFdd<K> * cogroup(iFddCore<K,U> * fdd1){
79  auto start = system_clock::now();
80 
81  this->groupByKey();
82 
83  return new groupedFdd<K>(context, this, fdd1, keyMap, start);
84  }
85 
86  template<typename U, typename V>
87  groupedFdd<K> * cogroup(iFddCore<K,U> * fdd1, iFddCore<K,V> * fdd2){
88  auto start = system_clock::now();
89 
90  this->groupByKey();
91 
92  return new groupedFdd<K>(context, this, fdd1, fdd2, keyMap, start);
93  }
94 
95  std::unordered_map<K, size_t> countByKey();
96 
97  indexedFdd<K,T> * groupByKey();
98 
99  void discard(){
100  //std::cerr << "\033[0;31mDEL" << id << "\033[0m ";
101  context->discardFDD(id);
102  this->keyMap.reset();
103  }
104 
105  void * getKeyMap(void) {
106  return &this->keyMap;
107  }
108  void setKeyMap(void * keyMap) {
109  this->keyMap = * ( std::shared_ptr<std::unordered_map<K, int>> * ) keyMap;
110  }
111  bool isGroupedByKey() {
112  return groupedByKey;
113  }
114  void setGroupedByKey(bool gbk) {
115  groupedByKey = gbk;
116  }
117 
118  };
119 
120  template <typename K, typename T>
121  class indexedFdd : public iFddCore<K,T>{
122  private:
123  std::pair <K,T> finishReduces(char ** partResult, size_t * pSize, int funcId, fddOpType op);
124  std::pair <K,T> reduce( void * funcP, fddOpType op);
125 
126 
127 
128  public:
129  // -------------- Constructors --------------- //
130 
131  // Create a empty fdd
132  indexedFdd(fastContext &c) : iFddCore<K,T>(c){
133  this->_tType = decodeType(typeid(T).hash_code());
134  this->id = c.createIFDD(this, typeid(K).hash_code(), typeid(T).hash_code());
135  }
136 
137  // Create a empty fdd with a pre allocated size
138  indexedFdd(fastContext &c, size_t s, const std::vector<size_t> & dataAlloc) : iFddCore<K,T>(c, s, dataAlloc){
139  this->_tType = decodeType(typeid(T).hash_code());
140  this->id = c.createIFDD(this, typeid(K).hash_code(), typeid(T).hash_code(), dataAlloc);
141  }
142  indexedFdd(fastContext &c, size_t s) : indexedFdd(c, s, c.getAllocation(s)) { }
143 
144  // Create a fdd from a array in memory
145  indexedFdd(fastContext &c, K * keys, T * data, size_t size) : indexedFdd(c, size){
146  c.parallelizeI(this->id, keys, data, size);
147  }
148 
149  ~indexedFdd(){
150  }
151 
152  // -------------- FDD Functions --------------- //
153 
154  // -------------------- Map ------------------- //
155  // Map
156  template <typename L, typename U>
157  indexedFdd<L,U> * map( ImapIFunctionP<K,T,L,U> funcP ){
158  return iFddCore<K,T>::template mapI<L,U>((void*) funcP, OP_Map);
159  }
160  template <typename L, typename U>
161  indexedFdd<L,U> * map( IPmapIFunctionP<K,T,L,U> funcP ){
162  return iFddCore<K,T>::template mapI<L,U>((void*) funcP, OP_Map);
163  }
164  template <typename L, typename U>
165  fdd<U> * map( mapIFunctionP<K,T,U> funcP ){
166  return iFddCore<K,T>::template map<U>((void*) funcP, OP_Map);
167  }
168  template <typename L, typename U>
169  fdd<U> * map( PmapIFunctionP<K,T,U> funcP ){
170  return iFddCore<K,T>::template map<U>((void*) funcP, OP_Map);
171  }
172 
173  // MapByKey
174  template <typename L, typename U>
175  indexedFdd<L,U> * mapByKey( ImapByKeyIFunctionP<K,T,L,U> funcP ){
176  return iFddCore<K,T>::template mapI<L,U>((void*) funcP, OP_MapByKey);
177  }
178  template <typename L, typename U>
179  indexedFdd<L,U> * mapByKey( IPmapByKeyIFunctionP<K,T,L,U> funcP ){
180  return iFddCore<K,T>::template mapI<L,U>((void*) funcP, OP_MapByKey);
181  }
182  template <typename L, typename U>
183  fdd<U> * mapByKey( mapByKeyIFunctionP<K,T,U> funcP ){
184  return iFddCore<K,T>::template map<U>((void*) funcP, OP_MapByKey);
185  }
186  template <typename L, typename U>
187  fdd<U> * mapByKey( PmapByKeyIFunctionP<K,T,U> funcP ){
188  return iFddCore<K,T>::template map<U>((void*) funcP, OP_MapByKey);
189  }
190 
191 
192  // BulkMap
193  template <typename L, typename U>
194  indexedFdd<L,U> * bulkMap( IbulkMapIFunctionP<K,T,L,U> funcP ){
195  return iFddCore<K,T>::template mapI<L,U>((void*) funcP, OP_BulkMap);
196  }
197  template <typename L, typename U>
198  indexedFdd<L,U> * bulkMap( IPbulkMapIFunctionP<K,T,L,U> funcP ){
199  return iFddCore<K,T>::template mapI<L,U>((void*) funcP, OP_BulkMap);
200  }
201  template <typename L, typename U>
202  fdd<U> * bulkMap( bulkMapIFunctionP<K,T,U> funcP ){
203  return iFddCore<K,T>::template map<U>((void*) funcP, OP_BulkMap);
204  }
205  template <typename L, typename U>
206  fdd<U> * bulkMap( PbulkMapIFunctionP<K,T,U> funcP ){
207  return iFddCore<K,T>::template map<U>((void*) funcP, OP_BulkMap);
208  }
209 
210 
211  // FlatMap
212  template <typename L, typename U>
213  indexedFdd<L,U> * flatMap( IflatMapIFunctionP<K,T,L,U> funcP ){
214  return iFddCore<K,T>::template mapI<L,U>((void*) funcP, OP_FlatMap);
215  }
216  template <typename L, typename U>
217  indexedFdd<L,U> * flatMap( IPflatMapIFunctionP<K,T,L,U> funcP ){
218  return iFddCore<K,T>::template mapI<L,U>((void*) funcP, OP_FlatMap);
219  }
220  template <typename L, typename U>
221  fdd<U> * flatMap( flatMapIFunctionP<K,T,U> funcP ){
222  return iFddCore<K,T>::template map<U>((void*) funcP, OP_FlatMap);
223  }
224  template <typename L, typename U>
225  fdd<U> * flatMap( PflatMapIFunctionP<K,T,U> funcP ){
226  return iFddCore<K,T>::template map<U>((void*) funcP, OP_FlatMap);
227  }
228 
229 
230  template <typename L, typename U>
231  indexedFdd<L,U> * bulkFlatMap( IbulkFlatMapIFunctionP<K,T,L,U> funcP ){
232  return iFddCore<K,T>::template mapI<L,U>((void*) funcP, OP_BulkFlatMap);
233  }
234  template <typename L, typename U>
235  indexedFdd<L,U> * bulkFlatMap( IPbulkFlatMapIFunctionP<K,T,L,U> funcP ){
236  return iFddCore<K,T>::template mapI<L,U>((void*) funcP, OP_BulkFlatMap);
237  }
238  template <typename L, typename U>
239  fdd<U> * bulkFlatMap( bulkFlatMapIFunctionP<K,T,U> funcP ){
240  return iFddCore<K,T>::template map<U>((void*) funcP, OP_BulkFlatMap);
241  }
242  template <typename L, typename U>
243  fdd<U> * bulkFlatMap( PbulkFlatMapIFunctionP<K,T,U> funcP ){
244  return iFddCore<K,T>::template map<U>((void*) funcP, OP_BulkFlatMap);
245  }
246 
247  // ------------------ Reduce ----------------- //
248 
249  // Run a Reduce
250  std::pair<K,T> reduce( IreduceIFunctionP<K,T> funcP ){
251  return reduce((void*) funcP, OP_Reduce);
252  }
253  //indexedFdd<K,T> * std::pair<K,T> reduceByKey( IreduceByKeyIFunctionP<K,T> funcP ){
254  //return reduceByKey((void*) funcP, OP_Reduce);
255  //}
256  std::pair<K,T> bulkReduce( IbulkReduceIFunctionP<K,T> funcP ){
257  return reduce((void*) funcP, OP_BulkReduce);
258  }
259 
260  // --------------- FDD Builtin functions ------------- //
261  // Collect a FDD
262  std::vector<std::pair<K,T>> collect( ){
263  //std::cerr << " \033[0;31mSIZE: " << this->size << "\033[0m";
264  std::vector<std::pair<K,T>> data(this->size);
265  this->context->collectFDD(data, this);
266  return data;
267  }
268 
269  indexedFdd<K,T> * cache(){
270  this->cached = true;
271  return this;
272  }
273  };
274 
275  template <typename K, typename T>
276  class indexedFdd<K,T *> : public iFddCore<K,T*>{
277  private:
278  std::tuple <K,T,size_t> finishReducesP(char ** partResult, size_t * pSize, int funcId, fddOpType op);
279  std::tuple <K,T,size_t> reduceP( void * funcP, fddOpType op);
280 
281  public:
282  // -------------- Constructors --------------- //
283  //
284  // Create a empty fdd
286  this->_tType = POINTER | decodeType(typeid(T).hash_code());
287  this->id = c.createIPFDD(this, typeid(K).hash_code(), typeid(T).hash_code());
288  }
289 
290  // Create a empty fdd with a pre allocated size
291  indexedFdd(fastContext &c, size_t s, const std::vector<size_t> & dataAlloc) : iFddCore<K,T *>(c, s, dataAlloc){
292  this->_tType = POINTER | decodeType(typeid(T).hash_code());
293  this->id = c.createIPFDD(this, typeid(K).hash_code(), typeid(T).hash_code(), c.getAllocation(s));
294  }
295  indexedFdd(fastContext &c, size_t s) : indexedFdd(c, s, c.getAllocation(s)) {
296  }
297 
298  // Create a fdd from a array in memory
299  indexedFdd(fastContext &c, K * keys, T ** data, size_t * dataSizes, size_t size) : indexedFdd(c, size){
300  c.parallelizeI(this->id, keys, data, dataSizes, size);
301  }
302 
303  ~indexedFdd(){
304  }
305 
306 
307  // -------------- FDD Functions Parameter Specification --------------- //
308  // These need to be specialized because they can return a pointer or not
309 
310  // -------------------- Map ------------------- //
311 
312  // Map
313  template <typename L, typename U>
314  indexedFdd<L,U> * map( ImapIPFunctionP<K,T,L,U> funcP ){
315  return iFddCore<K,T>::template map<L,U>((void*) funcP, OP_Map);
316  }
317  template <typename L, typename U>
318  indexedFdd<L,U> * map( IPmapIPFunctionP<K,T,L,U> funcP ){
319  return iFddCore<K,T>::template map<L,U>((void*) funcP, OP_Map);
320  }
321  template <typename L, typename U>
322  fdd<U> * map( mapIPFunctionP<K,T,U> funcP ){
323  return iFddCore<K,T>::template map<U>((void*) funcP, OP_Map);
324  }
325  template <typename L, typename U>
326  fdd<U> * map( PmapIPFunctionP<K,T,U> funcP ){
327  return iFddCore<K,T>::template map<U>((void*) funcP, OP_Map);
328  }
329 
330  // MapByKey
331  template <typename L, typename U>
332  indexedFdd<L,U> * mapByKey( ImapByKeyIPFunctionP<K,T,L,U> funcP ){
333  return iFddCore<K,T>::template map<L,U>((void*) funcP, OP_MapByKey);
334  }
335  template <typename L, typename U>
336  indexedFdd<L,U> * mapByKey( IPmapByKeyIPFunctionP<K,T,L,U> funcP ){
337  return iFddCore<K,T>::template map<L,U>((void*) funcP, OP_MapByKey);
338  }
339  template <typename L, typename U>
340  fdd<U> * mapByKey( mapByKeyIPFunctionP<K,T,U> funcP ){
341  return iFddCore<K,T>::template map<U>((void*) funcP, OP_MapByKey);
342  }
343  template <typename L, typename U>
344  fdd<U> * mapByKey( PmapByKeyIPFunctionP<K,T,U> funcP ){
345  return iFddCore<K,T>::template map<U>((void*) funcP, OP_MapByKey);
346  }
347 
348 
349  template <typename L, typename U>
350  indexedFdd<L,U> * bulkMap( IbulkMapIPFunctionP<K,T,L,U> funcP ){
351  return iFddCore<K,T>::template map<L,U>((void*) funcP, OP_BulkMap);
352  }
353  template <typename L, typename U>
354  indexedFdd<L,U> * bulkMap( IPbulkMapIPFunctionP<K,T,L,U> funcP ){
355  return iFddCore<K,T>::template map<L,U>((void*) funcP, OP_BulkMap);
356  }
357  template <typename L, typename U>
358  fdd<U> * bulkMap( bulkMapIPFunctionP<K,T,U> funcP ){
359  return iFddCore<K,T>::template map<U>((void*) funcP, OP_BulkMap);
360  }
361  template <typename L, typename U>
362  fdd<U> * bulkMap( PbulkMapIPFunctionP<K,T,U> funcP ){
363  return iFddCore<K,T>::template map<U>((void*) funcP, OP_BulkMap);
364  }
365 
366 
367  template <typename L, typename U>
368  indexedFdd<L,U> * flatMap( IflatMapIPFunctionP<K,T,L,U> funcP){
369  return iFddCore<K,T>::template map<L,U>((void*) funcP, OP_FlatMap);
370  }
371  template <typename L, typename U>
372  indexedFdd<L,U> * flatMap( IPflatMapIPFunctionP<K,T,L,U> funcP){
373  return iFddCore<K,T>::template map<L,U>((void*) funcP, OP_FlatMap);
374  }
375  template <typename L, typename U>
376  fdd<U> * flatMap( flatMapIPFunctionP<K,T,U> funcP){
377  return iFddCore<K,T>::template map<U>((void*) funcP, OP_FlatMap);
378  }
379  template <typename L, typename U>
380  fdd<U> * flatMap( PflatMapIPFunctionP<K,T,U> funcP){
381  return iFddCore<K,T>::template map<U>((void*) funcP, OP_FlatMap);
382  }
383 
384 
385  template <typename L, typename U>
386  indexedFdd<L,U> * bulkFlatMap( IbulkFlatMapIPFunctionP<K,T,L,U> funcP){
387  return iFddCore<K,T>::template map<L,U>((void*) funcP, OP_BulkFlatMap);
388  }
389  template <typename L, typename U>
390  indexedFdd<L,U> * bulkFlatMap( IPbulkFlatMapIPFunctionP<K,T,L,U> funcP){
391  return iFddCore<K,T>::template map<L,U>((void*) funcP, OP_BulkFlatMap);
392  }
393  template <typename L, typename U>
394  fdd<U> * bulkFlatMap( bulkFlatMapIPFunctionP<K,T,U> funcP){
395  return iFddCore<K,T>::template map<U>((void*) funcP, OP_BulkFlatMap);
396  }
397  template <typename L, typename U>
398  fdd<U> * bulkFlatMap( PbulkFlatMapIPFunctionP<K,T,U> funcP){
399  return iFddCore<K,T>::template map<U>((void*) funcP, OP_BulkFlatMap);
400  }
401 
402  // ------------------ Reduce ----------------- //
403 
404  // Run a Reduce
405  inline std::vector<std::pair<K,T>> reduce(IPreduceIPFunctionP<K,T> funcP ){
406  return reduce((void*) funcP, OP_Reduce);
407  }
408  //inline indexedFdd<K,T> reduceByKey(IPreduceByKeyIPFunctionP<K,T> funcP ){
409  //return reduceByKey((void*) funcP, OP_Reduce);
410  //}
411  inline std::vector<std::pair<K,T>> bulkReduce(IPbulkReduceIPFunctionP<K,T> funcP ){
412  return reduce((void*) funcP, OP_BulkReduce);
413  }
414 
415  // --------------- FDD Builtin functions ------------- //
416  // Collect a FDD
417  std::vector<std::tuple<K,T*, size_t>> collect( ) {
418  std::vector<std::tuple<K,T*, size_t>> data(this->size);
419  this->context->collectFDD(data, this);
420  return data;
421  }
422 
423  indexedFdd<K,T*> * cache(){
424  this->cached = true;
425  return this;
426  }
427 
428  };
429 
430 
431  template <typename K, typename T>
432  fddBase * iFddCore<K,T>::_map( void * funcP, fddOpType op, fddBase * newFdd, system_clock::time_point & start){
433  //std::cerr << " Map ";
434  unsigned long int tid, sid;
435  unsigned long int newFddId = newFdd->getId();
436 
437  // Decode function pointer
438  int funcId = context->findFunc(funcP);
439 
440  // Send task
441  context->enqueueTask(op, id, newFddId, funcId, this->size);
442 
443  // Receive results
444  auto result = context->recvTaskResult(tid, sid, start);
445 
446  if ( (op & 0xff) & (OP_MapByKey | OP_FlatMapByKey | OP_FlatMap | OP_BulkFlatMap) ) {
447  size_t newSize = 0;
448 
449  for (int i = 1; i < context->numProcs(); ++i){
450  if (result[i].second > 0) newSize += * (size_t*) result[i].first;
451  }
452 
453  newFdd->setSize(newSize);
454  }
455 
456  if (!cached)
457  this->discard();
458 
459  //std::cerr << "\n";
460  return newFdd;
461  }
462  template <typename K, typename T>
463  template <typename L, typename U>
464  indexedFdd<L,U> * iFddCore<K,T>::mapI( void * funcP, fddOpType op){
465  indexedFdd<L,U> * newFdd;
466  auto start = system_clock::now();
467 
468  if ( (op & 0xFF ) & (OP_MapByKey | OP_FlatMapByKey | OP_FlatMap | OP_BulkFlatMap) ){
469  newFdd = new indexedFdd<L,U>(*context);
470  }else{
471  newFdd = new indexedFdd<L,U>(*context, size, dataAlloc);
472  }
473 
474  return (indexedFdd<L,U> *) _map(funcP, op, newFdd, start);
475  }
476 
477  template <typename K, typename T>
478  template <typename U>
479  fdd<U> * iFddCore<K,T>::map( void * funcP, fddOpType op){
480  fdd<U> * newFdd;
481  auto start = system_clock::now();
482 
483  if ( (op & 0xFF ) & (OP_MapByKey | OP_FlatMapByKey | OP_FlatMap | OP_BulkFlatMap) ){
484  newFdd = new fdd<U>(*context);
485  }else{
486  newFdd = new fdd<U>(*context, size, dataAlloc);
487  }
488 
489  return (fdd<U> *) _map(funcP, op, newFdd, start);
490  }
491 
492  template <typename K, typename T>
493  std::unordered_map<K,size_t> iFddCore<K,T>::countByKey(){
494  //std::cerr << " Count By Key";
495  fastCommBuffer decoder(0);
496  unsigned long int tid, sid;
497  std::unordered_map<K,size_t> count;
498 
499  auto start = system_clock::now();
500  context->enqueueTask(OP_CountByKey, id, this->size);
501 
502  auto result = context->recvTaskResult(tid, sid, start);
503 
504  for (int i = 1; i < context->numProcs(); ++i){
505 
506  K key;
507  size_t kCount, numKeys;
508  decoder.setBuffer(result[i].first, result[i].second);
509  decoder >> numKeys;
510 
511  for ( size_t i = 0; i < numKeys; ++i ) {
512  decoder >> key >> kCount;
513  //auto it = count.find(key);
514  //if (it != count.end())
515  // count[key] += kCount;
516  //else
517  // count[key] = kCount;
518  count[key] += kCount;
519  }
520  }
521 
522  //std::cerr << "\n";
523  return count;
524  }
525 
526  template <typename K, typename T>
527  std::shared_ptr<std::unordered_map<K, int>> iFddCore<K,T>::calculateKeyMap(std::unordered_map<K, std::tuple<size_t, int, size_t>> count){
528  size_t size = this->size;
529  std::shared_ptr<std::unordered_map<K, int>> kMap = std::make_shared<std::unordered_map<K, int>>();
530  std::unordered_map<K, bool> done;
531  size_t numProcs = context->numProcs();
532  std::vector<size_t> keyAlloc(numProcs,0);
533  std::vector<size_t> procBudget = context->getAllocation(size);
534 
535  kMap->reserve(count.size());
536 
537  //std::cerr << " [ Budget: ";
538  //for ( int i = 1; i < numProcs; ++i)
539  //std::cerr << procBudget[i] << " ";
540  //std::cerr << "= " << size << "\n";
541 
542  for (auto it = count.begin(); it != count.end(); it++){
543  K key = it->first;
544  size_t kCount = std::get<0>(it->second);
545  int preffered = std::get<1>(it->second);
546 
547  if(keyAlloc[preffered] < procBudget[preffered]){
548  (*kMap)[key] = preffered;
549  keyAlloc [preffered] += kCount;
550  //count.erase(key);
551  done[key] = true;
552  }else{
553  done[key] = false;
554  }
555 
556  }
557 
558  for (auto it = count.begin(); it != count.end(); it++){
559  K key = it->first;
560  if (! done[key]){
561  size_t kCount = std::get<0>(it->second);
562  int preffered = 1 + rand() % (numProcs - 1);
563 
564  while(keyAlloc[preffered] >= procBudget[preffered]){
565  preffered = 1 + rand() % (numProcs - 1);
566  }
567  (*kMap)[key] = preffered;
568  keyAlloc [preffered] += kCount;
569  }
570  }
571  /*std::cerr << " [ Alloc: ";
572  for ( int i = 1; i < numProcs; ++i)
573  std::cerr << keyAlloc[i] << " ";
574  std::cerr << "\n";
575  std::cerr << " [ Map: ";
576  for (auto it = kMap.begin(); it != kMap.end(); it++){
577  std::cerr << it->first << ":" << it->second << " ";
578  }
579  std::cerr << " ]\n"; */
580 
581 
582  return kMap;
583  }
584 
585  template <typename K, typename T>
586  indexedFdd<K,T> * iFddCore<K,T>::groupByKey(){
587  fastCommBuffer decoder(0);
588  unsigned long int tid, sid;
589  // Key -> totalKeycount, maxowner, ownerCount
590 
591  if (! groupedByKey){
592  //std::cerr << " GroupByKey ";
593  auto * count = new std::unordered_map< K, std::tuple<size_t, int, size_t> >();
594  count->reserve(this->size);
595 
596  auto start = system_clock::now();
597  context->enqueueTask(OP_CountByKey, id, this->size);
598 
599  auto result = context->recvTaskResult(tid, sid, start);
600 
601  // Get a count by key with majority owner consideration
602  for (int i = 1; i < context->numProcs(); ++i){
603  K key;
604  size_t kCount, numKeys;
605 
606  if (result[i].second == 0) continue;
607 
608  decoder.setBuffer(result[i].first, result[i].second);
609  decoder >> numKeys;
610 
611  for ( size_t i = 0; i < numKeys; ++i ) {
612 
613  decoder >> key >> kCount;
614  auto it = count->find(key);
615 
616  if (it != count->end()){
617 
618  int &owner = std::get<1>(it->second);
619  size_t &ownerCount = std::get<2>(it->second);
620 
621  std::get<0>(it->second) += kCount;
622 
623  // Fount the new majority owner
624  if (kCount > ownerCount){
625  owner = sid;
626  ownerCount = kCount;
627  }
628 
629  }else{
630  (*count)[key] = std::make_tuple(kCount, sid, kCount);
631  }
632  }
633  }
634  this->keyMap = calculateKeyMap(*count);
635  delete count;
636 
637  // Migrate data according to key ownership
638  unsigned long int tid = context->enqueueTask(OP_GroupByKey, id, this->size);
639  context->sendKeyMap(tid, *keyMap);
640 
641  result = context->recvTaskResult(tid, sid, start);
642  groupedByKey = true;
643  }
644  //std::cerr << ". ";
645  return (indexedFdd<K,T> *)this;
646  }
647 
648  template <typename K, typename T>
649  std::pair <K,T> indexedFdd<K,T>::finishReduces(char ** partResult, size_t * pSize, int funcId, fddOpType op){
650  std::pair <K,T> result;
651 
652  if (op == OP_Reduce){
653  IreduceIFunctionP<K, T> reduceFunc = (IreduceIFunctionP<K, T>) this->context->funcTable[funcId];
654  fastCommBuffer buffer(0);
655 
656  // Get the real object behind the buffer
657  buffer.setBuffer(partResult[0], pSize[0]);
658  buffer >> result;
659 
660  for (int i = 1; i < (this->context->numProcs() - 1); ++i){
661  std::pair <K,T> pr;
662 
663  //std::cerr << " BUFFER: " << size_t(buffer.pos()) << " " << buffer.size() << "\n";
664  buffer.setBuffer(partResult[i], pSize[i]);
665  buffer >> pr;
666 
667  result = reduceFunc(result.first, result.second, pr.first, pr.second);
668  }
669  }else{
670  IbulkReduceIFunctionP<K, T> bulkReduceFunc = (IbulkReduceIFunctionP<K, T>) this->context->funcTable[funcId];
671  T * vals = new T[this->context->numProcs() - 1];
672  K * keys = new K[this->context->numProcs() - 1];
673 
674  //#pragma omp parallel for
675  for (int i = 0; i < (this->context->numProcs() - 1); ++i){
676  fastCommBuffer buffer(0);
677  std::pair <K,T> pr;
678 
679  buffer.setBuffer(partResult[i], pSize[i]);
680  buffer >> pr;
681 
682  keys[i] = pr.first;
683  vals[i] = pr.second;
684  }
685 
686  result = bulkReduceFunc(keys, vals, this->context->numProcs() - 1);
687 
688  delete [] vals;
689  delete [] keys;
690  // TODO do bulkreduce
691  }
692 
693  return result;
694  }
695 
696  template <typename K, typename T>
697  std::pair <K,T> indexedFdd<K,T>::reduce( void * funcP, fddOpType op){
698  //std::cerr << " Reduce \n";
699  std::pair <K,T> result;
700  int funcId = this->context->findFunc(funcP);
701  char ** partResult = new char*[this->context->numProcs() - 1];
702  size_t * rSize = new size_t[this->context->numProcs() - 1];
703  unsigned long int tid, sid;
704 
705  // Send task
706  auto start = system_clock::now();
707  unsigned long int reduceTaskId UNUSED = this->context->enqueueTask(op, this->id, 0, funcId, this->size);
708 
709  // Receive results
710  auto resultV = this->context->recvTaskResult(tid, sid, start);
711 
712  for (int i = 0; i < (this->context->numProcs() - 1); ++i){
713  partResult[i] = (char*) resultV[i + 1].first;
714  rSize[i] = resultV[i + 1].second;
715  }
716 
717  // Finish applying reduces
718  result = finishReduces(partResult, rSize, funcId, op);
719 
720  delete [] partResult;
721  delete [] rSize;
722 
723  if (!this->cached)
724  this->discard();
725 
726 
727  //std::cerr << "\n";
728  return result;
729  }
730 
731 
732  template <typename K, typename T>
733  std::tuple <K,T,size_t> indexedFdd<K,T*>::finishReducesP(char ** partResult, size_t * pSize, int funcId, fddOpType op){
734  std::tuple <K,T,size_t> result;
735 
736  if (op == OP_Reduce){
737  IPreduceIPFunctionP<K,T> reduceFunc = (IreduceIFunctionP<K,T>) this->context->funcTable[funcId];
738  fastCommBuffer buffer(0);
739 
740  buffer.setBuffer(partResult[0], pSize[0]);
741  buffer >> result;
742 
743  //#pragma omp parallel for
744  for (int i = 1; i < (this->context->numProcs() - 1); ++i){
745  std::tuple <K,T,size_t> pr;
746 
747  buffer.setBuffer(partResult[i], pSize[i]);
748  buffer >> pr;
749 
750  result = reduceFunc(
751  std::get<0>(result),
752  std::get<1>(result),
753  std::get<2>(result),
754  std::get<0>(pr),
755  std::get<1>(pr),
756  std::get<2>(pr));
757  }
758  }else{
759  IPbulkReduceIPFunctionP<K,T> bulkReduceFunc = (IPbulkReduceIPFunctionP<K,T>) this->context->funcTable[funcId];
760  T * vals = new T[this->context->numProcs() - 1];
761  K * keys = new K[this->context->numProcs() - 1];
762  size_t * sizes = new size_t[this->context->numProcs() - 1];
763 
764  //#pragma omp parallel for
765  for (int i = 1; i < (this->context->numProcs() - 1); ++i){
766  fastCommBuffer buffer(0);
767  std::tuple <K,T,size_t> pr;
768 
769  buffer.setBuffer(partResult[i], pSize[i]);
770  buffer >> pr;
771 
772  std::tie (keys[i], vals[i], sizes[i]) = pr;
773  }
774 
775  result = bulkReduceFunc(keys, vals, sizes, this->context->numProcs() - 1);
776  // TODO do bulkreduce
777  }
778 
779  return result;
780  }
781 
782  template <typename K, typename T>
783  std::tuple <K,T,size_t> indexedFdd<K,T*>::reduceP( void * funcP, fddOpType op){
784  auto start = system_clock::now();
785  //std::cerr << " Reduce ";
786  std::tuple <K,T,size_t> result;
787  unsigned long int tid, sid;
788  int funcId = this->context->findFunc(funcP);
789  char ** partResult = new char *[this->context->numProcs() - 1];
790  size_t * partrSize = new size_t[this->context->numProcs() - 1];
791 
792  // Send task
793  unsigned long int reduceTaskId = this->context->enqueueTask(op, this->id, 0, funcId, this->size);
794 
795  // Receive results
796  auto resultV = this->context->recvTaskResult(tid, sid, start);
797 
798  for (int i = 0; i < (this->context->numProcs() - 1); ++i){
799  partResult[i] = (T*) resultV[i+1].first;
800  partrSize[i] = resultV[i+1].second /= sizeof(T);
801  }
802 
803  // Finish applying reduces
804  result = finishReducesP(partResult, partrSize, funcId, op);
805 
806  delete [] partResult;
807  delete [] partrSize;
808 
809  if (!this->cached)
810  this->discard();
811 
812  //std::cerr << "\n";
813  return result;
814  }
815 
816 }
817 
818 #endif
Definition: indexedFdd.h:276
Definition: fastContext.h:26
Definition: fastContext.h:54
Definition: fddBase.h:8
Definition: fastContext.h:23
Definition: _workerFdd.h:11