Faster  0.0.4Alpha
Super fast distributed computing
fdd.h
1 #ifndef LIBFASTER_FDD_H
2 #define LIBFASTER_FDD_H
3 
4 #include <vector>
5 #include <typeinfo>
6 #include <stdio.h>
7 #include <list>
8 #include <omp.h>
9 
10 
11 #include "definitions.h"
12 #include "fddBase.h"
13 #include "fastContext.h"
14 
15 namespace faster{
16 
17  class fastTask;
18 
19  template <class K, class T>
20  class indexedFdd ;
21 
22  template <typename T>
23  class fddCore : public fddBase {
24  protected :
25  fastContext * context;
26 
27  // -------------- Core FDD Functions --------------- //
28 
29  fddCore() {
30  cached = false;
31  }
32 
33  fddCore(fastContext & c);
34 
35  // Create a empty fdd with a pre allocated size
36  fddCore(fastContext &c, size_t s, const std::vector<size_t> & dataAlloc);
37 
38  // 1->1 function (map, bulkmap, flatmap...)
39  fddBase * _map( void * funcP, fddOpType op, fddBase * newFdd);
40  template <typename L, typename U>
41  indexedFdd<L,U> * mapI( void * funcP, fddOpType op);
42  template <typename U>
43  fdd<U> * map( void * funcP, fddOpType op);
44 
45  public:
46 
47  void discard(){
48  context->discardFDD(id);
49  }
50  void * getKeyMap() { return NULL; }
51  void setKeyMap(void * keyMap UNUSED) {}
52  bool isGroupedByKey() { return false; }
53  void setGroupedByKey(bool gbk UNUSED) {}
54  };
55 
56  // Driver side FDD
57  // It just sends commands to the workers.
58  template <class T>
59  class fdd : public fddCore<T>{
60  private:
61  T finishReduces(char ** partResult, size_t * pSize, int funcId, fddOpType op);
62  T reduce( void * funcP, fddOpType op);
63  public:
64  // -------------- Constructors --------------- //
65 
66  // Create a empty fdd
67  fdd(fastContext &c) : fddCore<T>(c){
68  this->id = c.createFDD(this, typeid(T).hash_code());
69  }
70 
71  // Create a empty fdd with a pre allocated size
72  fdd(fastContext &c, size_t s, const std::vector<size_t> & dataAlloc) : fddCore<T>(c, s, dataAlloc){
73  this->id = c.createFDD(this, typeid(T).hash_code(), dataAlloc);
74  }
75 
76  // Create a empty fdd with a pre allocated size
77  fdd(fastContext &c, size_t s) : fdd(c, s, c.getAllocation(s)) {
78  }
79 
80  // Create a fdd from a array in memory
81  fdd(fastContext &c, T * data, size_t size) : fdd(c, size){
82  c.parallelize(fddBase::id, data, size);
83  }
84 
85  // Create a fdd from a vector in memory
86  fdd(fastContext &c, std::vector<T> &dataV) : fdd(c, dataV.data(), dataV.size()){ }
87 
88  // Create a fdd from a file
89  fdd(fastContext &c, const char * fileName) ;
90 
91  ~fdd(){
92  }
93 
94  // -------------- FDD Functions --------------- //
95 
96  // Run a Map
97  template <typename U>
98  fdd<U> * map( mapFunctionP<T,U> funcP ){
99  return fddCore<T>::template map<U>((void*) funcP, OP_Map);
100  }
101  template <typename U>
102  fdd<U> * map( PmapFunctionP<T,U> funcP ){
103  return fddCore<T>::template map<U>((void*) funcP, OP_Map);
104  }
105  template <typename L, typename U>
106  indexedFdd<L,U> * map( ImapFunctionP<T,L,U> funcP ){
107  return fddCore<T>::template mapI<L,U>((void*) funcP, OP_Map);
108  }
109  template <typename L, typename U>
110  indexedFdd<L,U> * map( IPmapFunctionP<T,L,U> funcP ){
111  return fddCore<T>::template mapI<L,U>((void*) funcP, OP_Map);
112  }
113 
114 
115  template <typename U>
116  fdd<U> * bulkMap( bulkMapFunctionP<T,U> funcP ){
117  return fddCore<T>::template map<U>((void*) funcP, OP_BulkMap);
118  }
119  template <typename U>
120  fdd<U> * bulkMap( PbulkMapFunctionP<T,U> funcP ){
121  return fddCore<T>::template map<U>((void*) funcP, OP_BulkMap);
122  }
123  template <typename L, typename U>
124  indexedFdd<L,U> * bulkMap( IbulkMapFunctionP<T,L,U> funcP ){
125  return fddCore<T>::template mapI<L,U>((void*) funcP, OP_BulkMap);
126  }
127  template <typename L, typename U>
128  indexedFdd<L,U> * bulkMap( IPbulkMapFunctionP<T,L,U> funcP ){
129  return fddCore<T>::template mapI<L,U>((void*) funcP, OP_BulkMap);
130  }
131 
132 
133  template <typename U>
134  fdd<U> * flatMap( flatMapFunctionP<T,U> funcP ){
135  return fddCore<T>::template map<U>((void*) funcP, OP_FlatMap);
136  }
137  template <typename U>
138  fdd<U> * flatMap( PflatMapFunctionP<T,U> funcP ){
139  return fddCore<T>::template map<U>((void*) funcP, OP_FlatMap);
140  }
141  template <typename L, typename U>
142  indexedFdd<L,U> * flatMap( IflatMapFunctionP<T,L,U> funcP ){
143  return fddCore<T>::template mapI<L,U>((void*) funcP, OP_FlatMap);
144  }
145  template <typename L, typename U>
146  indexedFdd<L,U> * flatMap( IPflatMapFunctionP<T,L,U> funcP ){
147  return fddCore<T>::template mapI<L,U>((void*) funcP, OP_FlatMap);
148  }
149 
150 
151  template <typename U>
152  fdd<U> * bulkFlatMap( bulkFlatMapFunctionP<T,U> funcP ){
153  return fddCore<T>::template map<U>((void*) funcP, OP_BulkFlatMap);
154  }
155  template <typename U>
156  fdd<U> * bulkFlatMap( PbulkFlatMapFunctionP<T,U> funcP ){
157  return fddCore<T>::template map<U>((void*) funcP, OP_BulkFlatMap);
158  }
159  template <typename L, typename U>
160  indexedFdd<L,U> * bulkFlatMap( IbulkFlatMapFunctionP<T,L,U> funcP ){
161  return fddCore<T>::template mapI<L,U>((void*) funcP, OP_BulkFlatMap);
162  }
163  template <typename L, typename U>
164  indexedFdd<L,U> * bulkFlatMap( IPbulkFlatMapFunctionP<T,L,U> funcP ){
165  return fddCore<T>::template mapI<L,U>((void*) funcP, OP_BulkFlatMap);
166  }
167 
168 
169  // Run a Reduce
170  T reduce( reduceFunctionP<T> funcP ){
171  return reduce((void*) funcP, OP_Reduce);
172  }
173  T bulkReduce( bulkReduceFunctionP<T> funcP ){
174  return reduce((void*) funcP, OP_BulkReduce);
175  }
176 
177  // --------------- FDD Builtin functions ------------- //
178  // Collect a FDD
179  std::vector<T> collect( ){
180  std::vector<T> data(this->size);
181  this->context->collectFDD(data, this);
182  return data;
183  }
184  /*void * _collect( ) override{
185  return fddCore<T>::context->collectFDD(this);
186  }*/
187  fdd<T> * cache(){
188  this->cached = true;
189  return this;
190  }
191 
192  };
193 
194  template <class T>
195  class fdd<T *> : public fddCore<T>{
196  private:
197  std::vector <T> finishPReduces(T ** partResult, size_t * partrSize, int funcId, fddOpType op);
198  std::vector <T> reduceP(void * funcP, fddOpType op);
199  public:
200  // -------------- Constructors --------------- //
201 
202  // Create a empty fdd
203  fdd(fastContext &c) : fddCore<T>(c){
204  this->id = c.createPFDD(this, typeid(T).hash_code());
205  }
206 
207  // Create a empty fdd with a pre allocated size
208  fdd(fastContext &c, size_t s, const std::vector<size_t> & dataAlloc) : fddCore<T>(c, s, dataAlloc){
209  this->id = c.createPFDD(this, typeid(T).hash_code(), dataAlloc);
210  }
211  fdd(fastContext &c, size_t s) :fdd(c, s, c.getAllocation(s)) { }
212 
213  // Create a fdd from a array in memory
214  fdd(fastContext &c, T * data[], size_t dataSizes[], size_t size) : fdd(c, size){
215  c.parallelize(fddBase::id, data, dataSizes, size);
216  }
217 
218  ~fdd(){
219  }
220 
221 
222  // -------------- FDD Functions Parameter Specification --------------- //
223  // These need to be specialized because they can return a pointer or not
224 
225  // Run a Map
226  template <typename U>
227  fdd<U> * map( mapPFunctionP<T,U> funcP ){
228  return fddCore<T>::template map<U>((void*) funcP, OP_Map);
229  }
230  template <typename U>
231  fdd<U> * map( PmapPFunctionP<T,U> funcP ){
232  return fddCore<T>::template map<U>((void*) funcP, OP_Map);
233  }
234  template <typename L, typename U>
235  indexedFdd<L,U> * map( ImapPFunctionP<T,L,U> funcP ){
236  return fddCore<T>::template mapI<L,U>((void*) funcP, OP_Map);
237  }
238  template <typename L, typename U>
239  indexedFdd<L,U> * map( IPmapPFunctionP<T,L,U> funcP ){
240  return fddCore<T>::template mapI<L,U>((void*) funcP, OP_Map);
241  }
242 
243 
244  template <typename U>
245  fdd<U> * bulkMap( bulkMapPFunctionP<T,U> funcP ){
246  return fddCore<T>::template map<U>((void*) funcP, OP_BulkMap);
247  }
248  template <typename U>
249  fdd<U> * bulkMap( PbulkMapPFunctionP<T,U> funcP ){
250  return fddCore<T>::template map<U>((void*) funcP, OP_BulkMap);
251  }
252  template <typename L, typename U>
253  indexedFdd<L,U> * bulkMap( IbulkMapPFunctionP<T,L,U> funcP ){
254  return fddCore<T>::template mapI<L,U>((void*) funcP, OP_BulkMap);
255  }
256  template <typename L, typename U>
257  indexedFdd<L,U> * bulkMap( IPbulkMapPFunctionP<T,L,U> funcP ){
258  return fddCore<T>::template mapI<L,U>((void*) funcP, OP_BulkMap);
259  }
260 
261 
262  template <typename U>
263  fdd<U> * flatMap( flatMapPFunctionP<T,U> funcP){
264  return fddCore<T>::template map<U>((void*) funcP, OP_FlatMap);
265  }
266  template <typename U>
267  fdd<U> * flatMap( PflatMapPFunctionP<T,U> funcP){
268  return fddCore<T>::template map<U>((void*) funcP, OP_FlatMap);
269  }
270  template <typename L, typename U>
271  indexedFdd<L,U> * flatMap( IflatMapPFunctionP<T,L,U> funcP){
272  return fddCore<T>::template mapI<L,U>((void*) funcP, OP_FlatMap);
273  }
274  template <typename L, typename U>
275  indexedFdd<L,U> * flatMap( IPflatMapPFunctionP<T,L,U> funcP){
276  return fddCore<T>::template mapI<L,U>((void*) funcP, OP_FlatMap);
277  }
278 
279 
280  template <typename U>
281  fdd<U> * bulkFlatMap( bulkFlatMapPFunctionP<T,U> funcP){
282  return fddCore<T>::template map<U>((void*) funcP, OP_BulkFlatMap);
283  }
284  template <typename U>
285  fdd<U> * bulkFlatMap( PbulkFlatMapPFunctionP<T,U> funcP){
286  return fddCore<T>::template map<U>((void*) funcP, OP_BulkFlatMap);
287  }
288  template <typename L, typename U>
289  indexedFdd<L,U> * bulkFlatMap( IbulkFlatMapPFunctionP<T,L,U> funcP){
290  return fddCore<T>::template mapI<L,U>((void*) funcP, OP_BulkFlatMap);
291  }
292  template <typename L, typename U>
293  indexedFdd<L,U> * bulkFlatMap( IPbulkFlatMapPFunctionP<T,L,U> funcP){
294  return fddCore<T>::template mapI<L,U>((void*) funcP, OP_BulkFlatMap);
295  }
296 
297  // Run a Reduce
298  inline std::vector<T> reduce(PreducePFunctionP<T> funcP ){
299  return reduceP((void*) funcP, OP_Reduce);
300  }
301  inline std::vector<T> bulkReduce(PbulkReducePFunctionP<T> funcP ){
302  return reduceP((void*) funcP, OP_BulkReduce);
303  }
304 
305  // --------------- FDD Builtin functions ------------- //
306  // Collect a FDD
307  std::vector<std::pair<T*, size_t>> collect( ) {
308  std::vector<std::pair<T*, size_t>> data(this->size);
309  this->context->collectFDD(data, this);
310  return data;
311  }
312  /*void * _collect( ) override{
313  return fddCore<T>::context->collectFDD(this);
314  }*/
315  fdd<T*> * cache(){
316  this->cached = true;
317  return this;
318  }
319 
320  };
321 
322  template <typename T>
324  cached = false;
325  context = &c;
326  }
327 
328  template <typename T>
329  fddCore<T>::fddCore(fastContext &c, size_t s, const std::vector<size_t> & dataAlloc) : fddCore(c){
330  this->size = s;
331  this->dataAlloc = dataAlloc;
332  }
333 
334 
335  template <typename T>
336  fddBase * fddCore<T>::_map( void * funcP, fddOpType op, fddBase * newFdd){
337  unsigned long int tid, sid;
338  //std::cerr << " Map ";
339 
340  unsigned long int newFddId = newFdd->getId();
341 
342  // Decode function pointer
343  int funcId = context->findFunc(funcP);
344  //std::cerr << " " << funcId << ".\n";
345 
346  // Send task
347  auto start = system_clock::now();
348  context->enqueueTask(op, id, newFddId, funcId, this->size);
349 
350 
351  // Receive results
352  auto result = context->recvTaskResult(tid, sid, start);
353 
354  if ( (op & 0xff) & (OP_FlatMap | OP_BulkFlatMap) ) {
355  size_t newSize = 0;
356  for (int i = 1; i < context->numProcs(); ++i){
357  if (result[i].second > 0) newSize += * (size_t *) result[i].first;
358  }
359 
360  newFdd->setSize(newSize);
361  }
362 
363  if (!cached){
364  this->discard();
365  }
366 
367  //std::cerr << "\n";
368  return newFdd;
369  }
370 
371  template <typename T>
372  template <typename U>
373  fdd<U> * fddCore<T>::map( void * funcP, fddOpType op){
374  fddBase * newFdd ;
375 
376  if ( (op & 0xFF ) & (OP_FlatMap | OP_BulkFlatMap) ){
377  newFdd = new fdd<U>(*context);
378  }else{
379  newFdd = new fdd<U>(*context, size, dataAlloc);
380  }
381 
382  return (fdd<U> *) _map(funcP, op, newFdd);
383  }
384  template <typename T>
385  template <typename L, typename U>
386  indexedFdd<L,U> * fddCore<T>::mapI( void * funcP, fddOpType op){
387  fddBase * newFdd ;
388 
389  if ( (op & 0xFF ) & (OP_FlatMap | OP_BulkFlatMap) ){
390  newFdd = new indexedFdd<L,U>(*context);
391  }else{
392  newFdd = new indexedFdd<L,U>(*context, size, dataAlloc);
393  }
394 
395  return (indexedFdd<L,U> *) _map(funcP, op, newFdd);
396  }
397 
398  template <typename T>
399  T fdd<T>::finishReduces(char ** partResult, size_t * pSize, int funcId, fddOpType op){
400  fastCommBuffer buffer(0);
401  T result;
402 
403  if (op == OP_Reduce){
404  reduceFunctionP<T> reduceFunc = (reduceFunctionP<T>) this->context->funcTable[funcId];
405 
406  // Get the real object behind the buffer
407  buffer.setBuffer(partResult[0], pSize[0]);
408  buffer >> result;
409 
410  for (int i = 1; i < (this->context->numProcs() - 1); ++i){
411  T pr;
412 
413  //std::cerr << " Reduce Result Size:" << pSize[i] << "\n";
414  if (pSize[i] == 0) std::cerr << "UNEXPECTED ERROR!!!!";
415 
416  buffer.setBuffer(partResult[i], pSize[i]);
417  buffer >> pr;
418 
419  result = reduceFunc(result, pr);
420  }
421  }else{
422  bulkReduceFunctionP<T> bulkReduceFunc = (bulkReduceFunctionP<T>) this->context->funcTable[funcId];
423  T * vals = new T[this->context->numProcs() - 1];
424 
425  //#pragma omp parallel for
426  for (int i = 1; i < (this->context->numProcs() - 1); ++i){
427  fastCommBuffer buffer(0);
428 
429  buffer.setBuffer(partResult[i], pSize[i]);
430  buffer >> vals[i];
431  }
432 
433  result = bulkReduceFunc(vals, this->context->numProcs() - 1);
434  delete [] vals;
435  // TODO do bulkreduce
436  }
437  return result;
438  }
439 
440  template <typename T>
441  T fdd<T>::reduce( void * funcP, fddOpType op){
442  //std::cerr << " Reduce ";
443  //T fddCore<T>::template reduce( int funcId, fddOpType op){
444  T result;
445  unsigned long int tid, sid;
446  int funcId = this->context->findFunc(funcP);
447  //std::cerr << " " << funcId << ".\n";
448  char ** partResult = new char*[this->context->numProcs() - 1];
449  size_t * rSize = new size_t[this->context->numProcs() - 1];
450 
451  // Send task
452  //unsigned long int reduceTaskId =
453  auto start = system_clock::now();
454  this->context->enqueueTask(op, this->id, 0, funcId, this->size);
455 
456  // Receive results
457  auto resultV = this->context->recvTaskResult(tid, sid, start);
458 
459  for (int i = 0; i < (this->context->numProcs() - 1); ++i){
460  partResult[i] = (char*) resultV[i + 1].first;
461  rSize[i] = resultV[i + 1].second;
462  }
463 
464  // Finish applying reduces
465  result = finishReduces(partResult, rSize, funcId, op);
466 
467  delete [] partResult;
468  delete [] rSize;
469 
470  if (!this->cached)
471  this->discard();
472 
473  //std::cerr << "\n";
474  return result;
475  }
476 
477  template <typename T>
478  std::vector<T> fdd<T*>::finishPReduces(T ** partResult, size_t * partrSize, int funcId, fddOpType op){
479  T * result;
480  size_t rSize;
481 
482  if (op == OP_Reduce){
483  T * pResult;
484  size_t prSize;
485 
486  PreducePFunctionP<T> reduceFunc = ( PreducePFunctionP<T> ) this->context->funcTable[funcId];
487 
488  result = partResult[0];
489  rSize = partrSize[0];
490 
491  // Do the rest of the reduces
492  for (int i = 1; i < (this->context->numProcs() - 1); ++i){
493  pResult = result;
494  prSize = rSize;
495  std::pair<T*,size_t> r = reduceFunc(pResult, prSize, partResult[i], partrSize[i]);
496  result = r.first;
497  rSize = r.second;
498  }
499 
500  }else{
501  PbulkReducePFunctionP<T> bulkReduceFunc = (PbulkReducePFunctionP<T>) this->context->funcTable[funcId];
502  std::pair<T*,size_t> r = bulkReduceFunc(partResult, partrSize, this->context->numProcs() - 1);
503  result = r.first;
504  rSize = r.second;
505  }
506 
507  std::vector<T> vResult(rSize);
508  vResult.assign(result, result + rSize);
509 
510  return vResult;
511  }
512 
513  template <typename T>
514  std::vector <T> fdd<T*>::reduceP(void * funcP, fddOpType op){
515  //std::cerr << " Reduce";
516  //T fddCore<T>::template reduce( int funcId, fddOpType op){
517  // Decode function pointer
518  int funcId = this->context->findFunc(funcP);
519  T ** partResult = new T*[this->context->numProcs() -1];
520  size_t * partrSize = new size_t[this->context->numProcs() - 1];
521  unsigned long int tid, sid;
522  //std::cerr << " " << funcId << ".\n";
523 
524  // Send task
525  //unsigned long int reduceTaskId =
526  auto start = system_clock::now();
527  this->context->enqueueTask(op, this->id, 0, funcId, this->size);
528 
529  // Receive results
530  auto resultV = this->context->recvTaskResult(tid, sid, start);
531 
532  for (int i = 0; i < (this->context->numProcs() - 1); ++i){
533  partResult[i] = (T*) resultV[i + 1].first;
534  partrSize[i] = resultV[i + 1].second /= sizeof(T);
535  }
536 
537 
538  // Finish applying reduces
539  std::vector<T> vResult = finishPReduces(partResult, partrSize, funcId, op);
540 
541  delete [] partResult;
542  delete [] partrSize;
543 
544  if (!this->cached)
545  this->discard();
546 
547  return vResult;
548  }
549 
550 
551  template <typename T>
552  fdd<T>::fdd(fastContext &c, const char * fileName) {
553  fddCore<T>::context = &c;
554  this->id = c.readFDD(this, fileName);
555 
556  // Recover FDD information (size, ? etc )
557  this->context->getFDDInfo(this->size, this->dataAlloc);
558  }
559 
560 
561 }
562 
563 #endif
Definition: fdd.h:23
Definition: fdd.h:195
Definition: fastContext.h:26
Definition: fastContext.h:54
Definition: fddBase.h:8
Definition: fastContext.h:23
Definition: _workerFdd.h:11