libfaster API Documentation  Development Version
Super fast distributed computing
workerPFdd.cpp
1 #include <iostream>
2 #include <tuple>
3 #include "_workerFdd.h"
4 #include "workerFddBase.h"
5 #include "fddStorageExtern.cpp"
6 #include "fastComm.h"
7 
8 #include "workerFddCoreExtern.cpp"
9 #include "workerFddModule.cpp"
10 
11 faster::workerFddBase * faster::newWorkerSDL(unsigned long int id, fddType type, size_t size){
12  switch (type){
13  case CharP: return new _workerFdd<char *>(id, type, size); break;
14  case IntP: return new _workerFdd<int *>(id, type, size); break;
15  case LongIntP: return new _workerFdd<long int *>(id, type, size); break;
16  case FloatP: return new _workerFdd<float *>(id, type, size); break;
17  case DoubleP: return new _workerFdd<double *>(id, type, size); break;
18  //case Custom: return new _workerFdd<void *>(id, type, size); break;
19  }
20  return NULL;
21 }
22 
23 // MAP
24 template <typename T>
25 template <typename U>
26 void faster::_workerFdd<T*>::map (workerFddBase * dest, mapPFunctionP<T,U> mapFunc){
27  T ** d = this->localData->getData();
28  size_t s = this->localData->getSize();
29  size_t * ls = this->localData->getLineSizes();
30  U * od = (U*) dest->getData();
31 
32  //std::cerr << "START " << id << " " << s << " ";
33 
34  #pragma omp parallel for
35  for (size_t i = 0; i < s; ++i){
36  od[i] = mapFunc(d[i], ls[i]);
37  }
38  //std::cerr << "END ";
39 }
40 
41 template <typename T>
42 template <typename U>
43 void faster::_workerFdd<T*>::map (workerFddBase * dest, PmapPFunctionP<T,U> mapFunc){
44  T ** d = this->localData->getData();
45  size_t s = this->localData->getSize();
46  size_t * ls = this->localData->getLineSizes();
47  size_t * dls = dest->getLineSizes();
48  U * od = (U*) dest->getData();
49  //std::cerr << "START " << id << " " << s << " ";
50 
51  #pragma omp parallel for
52  for (size_t i = 0; i < s; ++i){
53  std::pair<U,size_t> r = mapFunc(d[i], ls[i]);
54  od[i] = r.first;
55  dls[i] = r.second;
56  }
57  //std::cerr << "END ";
58 }
59 
60 template <typename T>
61 template <typename L, typename U>
62 void faster::_workerFdd<T*>::map (workerFddBase * dest, ImapPFunctionP<T,L,U> mapFunc){
63  T ** d = this->localData->getData();
64  size_t s = this->localData->getSize();
65  size_t * ls = this->localData->getLineSizes();
66  L * ok = (L*) dest->getKeys();
67  U * od = (U*) dest->getData();
68 
69  //std::cerr << "START " << id << " " << s << " ";
70 
71  #pragma omp parallel for
72  for (size_t i = 0; i < s; ++i){
73  std::pair<L,U> r = mapFunc(d[i], ls[i]);
74  ok[i] = r.first;
75  od[i] = r.second;
76 
77  }
78  //std::cerr << "END ";
79 }
80 
81 template <typename T>
82 template <typename L, typename U>
83 void faster::_workerFdd<T*>::map (workerFddBase * dest, IPmapPFunctionP<T,L,U> mapFunc){
84  T ** d = this->localData->getData();
85  size_t s = this->localData->getSize();
86  size_t * ls = this->localData->getLineSizes();
87  size_t * dls = dest->getLineSizes();
88  L * ok = (L*) dest->getKeys();
89  U * od = (U*) dest->getData();
90  //std::cerr << "START " << id << " " << s << " ";
91 
92  #pragma omp parallel for
93  for (size_t i = 0; i < s; ++i){
94  std::tuple<L,U,size_t> r = mapFunc(d[i], ls[i]);
95  ok[i] = std::get<0>(r);
96  od[i] = std::get<1>(r);
97  dls[i] = std::get<2>(r);
98  }
99  //std::cerr << "END ";
100 }
101 
102 
103 // BulkMap
104 template <typename T>
105 template <typename U>
106 void faster::_workerFdd<T*>::bulkMap (workerFddBase * dest, bulkMapPFunctionP<T,U> bulkMapFunc){
107  size_t s = this->localData->getSize();
108  size_t * ls = this->localData->getLineSizes();
109 
110  bulkMapFunc((U*) dest->getData(), (T **)this->localData->getData(), ls, s);
111 }
112 template <typename T>
113 template <typename U>
114 void faster::_workerFdd<T*>::bulkMap (workerFddBase * dest, PbulkMapPFunctionP<T,U> bulkMapFunc){
115  size_t s = this->localData->getSize();
116  size_t * ls = this->localData->getLineSizes();
117 
118  bulkMapFunc((U*) dest->getData(), dest->getLineSizes(), (T **)this->localData->getData(), ls, s);
119 }
120 template <typename T>
121 template <typename L, typename U>
122 void faster::_workerFdd<T*>::bulkMap (workerFddBase * dest, IbulkMapPFunctionP<T,L,U> bulkMapFunc){
123  size_t s = this->localData->getSize();
124  size_t * ls = this->localData->getLineSizes();
125 
126  bulkMapFunc((L*) dest->getKeys(), (U*) dest->getData(), (T **)this->localData->getData(), ls, s);
127 }
128 template <typename T>
129 template <typename L, typename U>
130 void faster::_workerFdd<T*>::bulkMap (workerFddBase * dest, IPbulkMapPFunctionP<T,L,U> bulkMapFunc){
131  size_t s = this->localData->getSize();
132  size_t * ls = this->localData->getLineSizes();
133 
134  bulkMapFunc((L*) dest->getKeys(), (U*) dest->getData(), dest->getLineSizes(), (T **)this->localData->getData(), ls, s);
135 }
136 
137 
138 // FlatMap
139 template <typename T>
140 template <typename U>
141 void faster::_workerFdd<T*>::flatMap(workerFddBase * dest, flatMapPFunctionP<T,U> flatMapFunc ){
142  T ** d = this->localData->getData();
143  size_t s = this->localData->getSize();
144  std::deque<U> resultList;
145 
146  #pragma omp parallel
147  {
148  std::deque<U> partResultList;
149  #pragma omp for
150  for (size_t i = 0; i < s; ++i){
151  std::deque<U>r = flatMapFunc(d[i], this->localData->getLineSizes()[i]);
152 
153  resultList.insert(resultList.end(), partResultList.begin(), partResultList.end() );
154  }
155 
156  //Copy result to the FDD array
157  #pragma omp critical
158  resultList.insert(resultList.end(), partResultList.begin(), partResultList.end() );
159  }
160  dest->insertl(&resultList);
161 }
162 template <typename T>
163 template <typename U>
164 void faster::_workerFdd<T*>::flatMap(workerFddBase * dest, PflatMapPFunctionP<T,U> flatMapFunc ){
165  T ** d = this->localData->getData();
166  size_t s = this->localData->getSize();
167  std::deque<std::pair<U, size_t>> resultList;
168 
169  #pragma omp parallel
170  {
171  std::deque<std::pair<U, size_t>> partResultList;
172  #pragma omp for
173  for (size_t i = 0; i < s; ++i){
174  std::deque<std::pair<U, size_t>>r = flatMapFunc(d[i], this->localData->getLineSizes()[i]);
175 
176  resultList.insert(resultList.end(), partResultList.begin(), partResultList.end() );
177  }
178 
179  //Copy result to the FDD array
180  #pragma omp critical
181  resultList.insert(resultList.end(), partResultList.begin(), partResultList.end() );
182  }
183  dest->insertl(&resultList);
184 }
185 template <typename T>
186 template <typename L, typename U>
187 void faster::_workerFdd<T*>::flatMap(workerFddBase * dest, IflatMapPFunctionP<T,L,U> flatMapFunc ){
188  T ** d = this->localData->getData();
189  size_t s = this->localData->getSize();
190  std::deque<std::pair<L,U>> resultList;
191 
192  #pragma omp parallel
193  {
194  std::deque<std::pair<L,U>> partResultList;
195  #pragma omp for
196  for (size_t i = 0; i < s; ++i){
197  std::deque<std::pair<L,U>> r = flatMapFunc(d[i], this->localData->getLineSizes()[i]);
198 
199  resultList.insert(resultList.end(), partResultList.begin(), partResultList.end() );
200  }
201 
202  //Copy result to the FDD array
203  #pragma omp critical
204  resultList.insert(resultList.end(), partResultList.begin(), partResultList.end() );
205  }
206  dest->insertl(&resultList);
207 }
208 template <typename T>
209 template <typename L, typename U>
210 void faster::_workerFdd<T*>::flatMap(workerFddBase * dest, IPflatMapPFunctionP<T,L,U> flatMapFunc ){
211  T ** d = this->localData->getData();
212  size_t s = this->localData->getSize();
213  std::deque<std::tuple<L,U, size_t>> resultList;
214 
215  #pragma omp parallel
216  {
217  std::deque<std::tuple<L,U, size_t>> partResultList;
218  #pragma omp for
219  for (size_t i = 0; i < s; ++i){
220  std::deque<std::tuple<L,U, size_t>>r = flatMapFunc(d[i], this->localData->getLineSizes()[i]);
221 
222  resultList.insert(resultList.end(), partResultList.begin(), partResultList.end() );
223  }
224 
225  //Copy result to the FDD array
226  #pragma omp critical
227  resultList.insert(resultList.end(), partResultList.begin(), partResultList.end() );
228  }
229  dest->insertl(&resultList);
230 }
231 
232 template <typename T>
233 template <typename U>
234 void faster::_workerFdd<T*>::bulkFlatMap(workerFddBase * dest, bulkFlatMapPFunctionP<T,U> bulkFlatMapFunc ){
235  U * result;
236  size_t rSize;
237 
238  bulkFlatMapFunc( result, rSize, (T**) this->localData->getData(), this->localData->getLineSizes(), this->localData->getSize());
239  dest->setData(result, rSize);
240 }
241 template <typename T>
242 template <typename U>
243 void faster::_workerFdd<T*>::bulkFlatMap(workerFddBase * dest, PbulkFlatMapPFunctionP<T,U> bulkFlatMapFunc ){
244  U * result;
245  size_t * rDataSizes = NULL;
246  size_t rSize;
247 
248  bulkFlatMapFunc( result, rDataSizes, rSize, (T**) this->localData->getData(), this->localData->getLineSizes(), this->localData->getSize());
249  dest->setData((void*) result, rDataSizes, rSize);
250 }
251 template <typename T>
252 template <typename L, typename U>
253 void faster::_workerFdd<T*>::bulkFlatMap(workerFddBase * dest, IbulkFlatMapPFunctionP<T,L,U> bulkFlatMapFunc ){
254  L * keys;
255  U * result;
256  size_t rSize;
257 
258  bulkFlatMapFunc( keys, result, rSize, (T**) this->localData->getData(), this->localData->getLineSizes(), this->localData->getSize());
259  dest->setData(keys, result, rSize);
260 }
261 template <typename T>
262 template <typename L, typename U>
263 void faster::_workerFdd<T*>::bulkFlatMap(workerFddBase * dest, IPbulkFlatMapPFunctionP<T,L,U> bulkFlatMapFunc ){
264  L * keys;
265  U * result;
266  size_t * rDataSizes = NULL;
267  size_t rSize;
268 
269  bulkFlatMapFunc( keys, result, rDataSizes, rSize, (T**) this->localData->getData(), this->localData->getLineSizes(), this->localData->getSize());
270  dest->setData(keys, (void*) result, rDataSizes, rSize);
271 }
272 
273 
274 // REDUCE
275 template <typename T>
276 std::pair<T*,size_t> faster::_workerFdd<T*>::reduce (PreducePFunctionP<T> reduceFunc){
277  T ** d = this->localData->getData();
278  std::pair<T*,size_t> result;
279  size_t s = this->localData->getSize();
280  size_t * ls = this->localData->getLineSizes();
281  //std::cerr << "START " << id << " " << s << " | ";
282 
283  #pragma omp parallel
284  {
285  int nT = omp_get_num_threads();
286  int tN = omp_get_thread_num();
287  std::pair<T *, size_t> partResult(d[tN], ls[tN] );
288  T * a, * b;
289  size_t aSize, bSize;
290 
291  //#pragma omp master
292  //std::cerr << tN << "(" << nT << ")";
293 
294  #pragma omp for
295  for (int i = nT; i < s; ++i){
296  a = partResult.first;
297  b = d[i];
298  aSize = partResult.second;
299  bSize = ls[i];
300 
301  partResult = reduceFunc(a, aSize, b, bSize);
302 
303  delete [] a;
304  delete [] b;
305  }
306  #pragma omp master
307  {
308  result = partResult;
309  }
310 
311  #pragma omp barrier
312 
313  #pragma omp critical
314  if (omp_get_thread_num() != 0){
315  a = result.first;
316  b = partResult.first;
317  aSize = result.second;
318  bSize = partResult.second;
319 
320  result = reduceFunc(a, aSize, b, bSize);
321 
322  delete [] a;
323  delete [] b;
324  }
325  }
326  //std::cerr << "END ";
327  return result;
328 }
329 
330 template <typename T>
331 std::pair<T*,size_t> faster::_workerFdd<T*>::bulkReduce (PbulkReducePFunctionP<T> bulkReduceFunc){
332  std::pair<T *, size_t> r = bulkReduceFunc((T**) this->localData->getData(), this->localData->getLineSizes(), this->localData->getSize());
333  return r;
334 }
335 template <typename T>
336 template <typename U>
337 void faster::_workerFdd<T*>::_apply(void * func, fddOpType op, workerFddBase * dest){
338  switch (op){
339  case OP_Map:
340  map(dest, (mapPFunctionP<T,U>) func);
341  //std::cerr << "Map";
342  break;
343  case OP_BulkMap:
344  bulkMap(dest, ( bulkMapPFunctionP<T,U> ) func);
345  //std::cerr << "BulkMap ";
346  break;
347  case OP_FlatMap:
348  flatMap(dest, ( flatMapPFunctionP<T,U> ) func);
349  //std::cerr << "FlatMap ";
350  break;
351  case OP_BulkFlatMap:
352  bulkFlatMap(dest, ( bulkFlatMapPFunctionP<T,U> ) func);
353  //std::cerr << "BulkFlatMap ";
354  break;
355  }
356 }
357 
358 // Pointer -> Pointer
359 template <typename T>
360 template <typename U>
361 void faster::_workerFdd<T*>::_applyP(void * func, fddOpType op, workerFddBase * dest){
362  switch (op){
363  case OP_Map:
364  map(dest, (PmapPFunctionP<T,U>) func);
365  //std::cerr << "Map";
366  break;
367  case OP_BulkMap:
368  bulkMap(dest, ( PbulkMapPFunctionP<T,U> ) func);
369  //std::cerr << "BulkMap ";
370  break;
371  case OP_FlatMap:
372  flatMap(dest, ( PflatMapPFunctionP<T,U> ) func);
373  //std::cerr << "FlatMap ";
374  break;
375  case OP_BulkFlatMap:
376  bulkFlatMap(dest, ( PbulkFlatMapPFunctionP<T,U> ) func);
377  //std::cerr << "BulkFlatMap ";
378  break;
379  }
380 }
381 
382 template <typename T>
383 template <typename L, typename U>
384 void faster::_workerFdd<T*>::_applyI(void * func, fddOpType op, workerFddBase * dest){
385  switch (op){
386  case OP_Map:
387  map(dest, (ImapPFunctionP<T,L,U>) func);
388  //std::cerr << "Map";
389  break;
390  case OP_BulkMap:
391  bulkMap(dest, ( IbulkMapPFunctionP<T,L,U> ) func);
392  //std::cerr << "BulkMap ";
393  break;
394  case OP_FlatMap:
395  flatMap(dest, ( IflatMapPFunctionP<T,L,U> ) func);
396  //std::cerr << "FlatMap ";
397  break;
398  case OP_BulkFlatMap:
399  bulkFlatMap(dest, ( IbulkFlatMapPFunctionP<T,L,U> ) func);
400  //std::cerr << "BulkFlatMap ";
401  break;
402  }
403 }
404 
405 // Pointer -> Pointer
406 template <typename T>
407 template <typename L, typename U>
408 void faster::_workerFdd<T*>::_applyIP(void * func, fddOpType op, workerFddBase * dest){
409  switch (op){
410  case OP_Map:
411  map(dest, (IPmapPFunctionP<T,L,U>) func);
412  //std::cerr << "Map";
413  break;
414  case OP_BulkMap:
415  bulkMap(dest, ( IPbulkMapPFunctionP<T,L,U> ) func);
416  //std::cerr << "BulkMap ";
417  break;
418  case OP_FlatMap:
419  flatMap(dest, ( IPflatMapPFunctionP<T,L,U> ) func);
420  //std::cerr << "FlatMap ";
421  break;
422  case OP_BulkFlatMap:
423  bulkFlatMap(dest, ( IPbulkFlatMapPFunctionP<T,L,U> ) func);
424  //std::cerr << "BulkFlatMap ";
425  break;
426  }
427 }
428 
429 template <typename T>
430 void faster::_workerFdd<T*>::_applyReduce(void * func, fddOpType op, fastCommBuffer & buffer){
431  std::pair<T*,size_t> r;
432  switch (op){
433  case OP_Reduce:
434  r = reduce(( PreducePFunctionP<T> ) func);
435  //std::cerr << "Reduce " ;
436  break;
437  case OP_BulkReduce:
438  r = bulkReduce(( PbulkReducePFunctionP<T> ) func);
439  //std::cerr << "BulkReduce ";
440  break;
441  }
442  buffer.write(r.first, r.second);
443 
444 }
445 
446 
447 template <typename T>
448 void faster::_workerFdd<T*>::_preApply(void * func, fddOpType op, workerFddBase * dest){
449  switch (dest->getType()){
450  case Null: break;
451  case Char: _apply<char>(func, op, dest); break;
452  case Int: _apply<int>(func, op, dest); break;
453  case LongInt: _apply<long int>(func, op, dest); break;
454  case Float: _apply<float>(func, op, dest); break;
455  case Double: _apply<double>(func, op, dest); break;
456  case CharP: _applyP<char *>(func, op, dest); break;
457  case IntP: _applyP<int *>(func, op, dest); break;
458  case LongIntP: _applyP<long int *>(func, op, dest); break;
459  case FloatP: _applyP<float *>(func, op, dest); break;
460  case DoubleP: _applyP<double *>(func, op, dest); break;
461  case String: _apply<std::string>(func, op, dest); break;
462  //case Custom: _apply<void *>(func, op, (workerFdd *) dest); break;
463  case CharV: _apply<std::vector<char>>(func, op, dest); break;
464  case IntV: _apply<std::vector<int>>(func, op, dest); break;
465  case LongIntV: _apply<std::vector<long int>>(func, op, dest); break;
466  case FloatV: _apply<std::vector<float>>(func, op, dest); break;
467  case DoubleV: _apply<std::vector<double>>(func, op, dest); break;
468  }
469 }
470 
471 template <typename T>
472 template <typename L>
473 void faster::_workerFdd<T*>::_preApplyI(void * func, fddOpType op, workerFddBase * dest){
474  switch (dest->getType()){
475  case Null: break;
476  case Char: _applyI<L, char> (func, op, dest); break;
477  case Int: _applyI<L, int> (func, op, dest); break;
478  case LongInt: _applyI<L, long int> (func, op, dest); break;
479  case Float: _applyI<L, float> (func, op, dest); break;
480  case Double: _applyI<L, double> (func, op, dest); break;
481  case CharP: _applyIP<L, char *> (func, op, dest); break;
482  case IntP: _applyIP<L, int *> (func, op, dest); break;
483  case LongIntP: _applyIP<L, long int *> (func, op, dest); break;
484  case FloatP: _applyIP<L, float *> (func, op, dest); break;
485  case DoubleP: _applyIP<L, double *> (func, op, dest); break;
486  case String: _applyI<L, std::string> (func, op, dest); break;
487  //case Custom: _applyI<L, void *> (func, op, dest); break;
488  case CharV: _applyI<L, std::vector<char>> (func, op, dest); break;
489  case IntV: _applyI<L, std::vector<int>> (func, op, dest); break;
490  case LongIntV: _applyI<L, std::vector<long int>>(func, op, dest); break;
491  case FloatV: _applyI<L, std::vector<float>> (func, op, dest); break;
492  case DoubleV: _applyI<L, std::vector<double>> (func, op, dest); break;
493  }
494 }
495 
496 // -------------------------- Public Functions ------------------------ //
497 
498 template <typename T>
499 void faster::_workerFdd<T*>::apply(void * func, fddOpType op, workerFddBase * dest, fastCommBuffer & buffer){
500  if (op & OP_GENERICREDUCE){
501  _applyReduce(func, op, buffer);
502  }else{
503  switch (dest->getKeyType()){
504  case Null: _preApply(func, op, dest);break;
505  case Char: _preApplyI<char>(func, op, dest); break;
506  case Int: _preApplyI<int>(func, op, dest); break;
507  case LongInt: _preApplyI<long int>(func, op, dest); break;
508  case Float: _preApplyI<float>(func, op, dest); break;
509  case Double: _preApplyI<double>(func, op, dest); break;
510  case String: _preApplyI<std::string>(func, op, dest); break;
511  }
512  }
513 }
514 
515 template <typename T>
516 void faster::_workerFdd<T*>::setData(T ** data, size_t *lineSizes, size_t size) {
517  this->localData->setData( data, lineSizes, size);
518 }
519 
520 template <typename T>
521 void faster::_workerFdd<T*>::setDataRaw(void * data, size_t *lineSizes, size_t size) {
522  this->localData->setDataRaw( data, lineSizes, size);
523 }
524 
525 
526 template <typename T>
528  return this->localData->getLineSizes();
529 }
530 
531 template <typename T>
532 void faster::_workerFdd<T*>::insert(void * k UNUSED, void * in, size_t s){
533  this->localData->insert((T*&)in, s);
534 }
535 template <typename T>
536 void faster::_workerFdd<T*>::insertl(void * in){
537  insert( *(std::deque<std::pair<T*, size_t>>*) in);
538 }
539 
540 template <typename T>
541 void faster::_workerFdd<T*>::insert(T* & in, size_t s){
542  this->localData->insert(in, s);
543 }
544 template <typename T>
545 void faster::_workerFdd<T*>::insert(std::deque< std::pair<T*, size_t> > & in){
546  typename std::deque< std::pair<T*, size_t> >::iterator it;
547 
548  if (this->localData->getSize() < in.size())
549  this->localData->grow(in.size());
550 
551  for ( it = in.begin(); it != in.end(); it++)
552  this->localData->insert(it->first, it->second);
553 }
554 
555 template <typename T>
556 void faster::_workerFdd<T*>::collect(fastComm * comm) {
557  comm->sendFDDDataCollect(this->id, this->localData->getData(), this->localData->getLineSizes(), this->localData->getSize());
558 
559 };
560 
561 
562 
563 template class faster::_workerFdd<char *>;
564 template class faster::_workerFdd<int *>;
565 template class faster::_workerFdd<long int *>;
566 template class faster::_workerFdd<float *>;
567 template class faster::_workerFdd<double *>;
568 //template class workerFdd<void *>;
569 
570 
fddOpType (unsigned int): dataset operation type.
Defined in definitions.h, line 41.