libfaster API Documentation  Development Version
Super fast distributted computing
workerIPFddDependent.cpp
1 #include <tuple>
2 #include <list>
3 #include <iostream>
4 
5 #include "_workerIFdd.h"
6 #include "indexedFddStorageExtern.cpp"
7 
8 // MAP
9 template <typename K, typename T>
10 template <typename L, typename U>
11 void faster::_workerIFdd<K,T*>::map (workerFddBase * dest, ImapIPFunctionP<K,T,L,U> mapFunc){
12  T ** d = this->localData->getData();
13  size_t s = this->localData->getSize();
14  size_t * ls = this->localData->getLineSizes();
15  K * ik = this->localData->getKeys();
16  L * ok = (L*) dest->getKeys();
17  U * od = (U*) dest->getData();
18 
19  //std::cerr << "START " << id << " " << s << " ";
20 
21  #pragma omp parallel for
22  for (size_t i = 0; i < s; ++i){
23  std::pair<L,U> r = mapFunc(ik[i], d[i], ls[i]);
24  ok[i] = r.first;
25  od[i] = r.second;
26  }
27  //std::cerr << "END ";
28 }
29 template <typename K, typename T>
30 template <typename L, typename U>
31 void faster::_workerIFdd<K,T*>::map (workerFddBase * dest, IPmapIPFunctionP<K,T,L,U> mapFunc){
32  T ** d = this->localData->getData();
33  size_t s = this->localData->getSize();
34  size_t * ls = this->localData->getLineSizes();
35  size_t * dls = dest->getLineSizes();
36  K * ik = this->localData->getKeys();
37  L * ok = (L*) dest->getKeys();
38  U * od = (U*) dest->getData();
39  //std::cerr << "START " << id << " " << s << " ";
40 
41  #pragma omp parallel for
42  for (size_t i = 0; i < s; ++i){
43  std::tuple<L,U,size_t> r = mapFunc(ik[i], d[i], ls[i]);
44  ok[i] = std::get<0>(r);
45  od[i] = std::get<1>(r);
46  dls[i] = std::get<2>(r);
47  }
48  //std::cerr << "END ";
49 }
50 template <typename K, typename T>
51 template <typename U>
52 void faster::_workerIFdd<K,T*>::map (workerFddBase * dest, mapIPFunctionP<K,T,U> mapFunc){
53  T ** d = this->localData->getData();
54  size_t s = this->localData->getSize();
55  K * ik = this->localData->getKeys();
56  size_t * ls = this->localData->getLineSizes();
57  U * od = (U*) dest->getData();
58 
59  //std::cerr << "START " << id << " " << s << " ";
60 
61  #pragma omp parallel for
62  for (size_t i = 0; i < s; ++i){
63  od[i] = mapFunc(ik[i], d[i], ls[i]);
64  }
65  //std::cerr << "END ";
66 }
67 template <typename K, typename T>
68 template <typename U>
69 void faster::_workerIFdd<K,T*>::map (workerFddBase * dest, PmapIPFunctionP<K,T,U> mapFunc){
70  T ** d = this->localData->getData();
71  size_t s = this->localData->getSize();
72  K * ik = this->localData->getKeys();
73  size_t * ls = this->localData->getLineSizes();
74  size_t * dls = dest->getLineSizes();
75  U * od = (U*) dest->getData();
76  //std::cerr << "START " << id << " " << s << " ";
77 
78  #pragma omp parallel for
79  for (size_t i = 0; i < s; ++i){
80  std::pair<U,size_t> r = mapFunc(ik[i], d[i], ls[i]);
81  od[i] = r.first;
82  dls[i] = r.second;
83  }
84  //std::cerr << "END ";
85 }
86 
87 
88 
89 // Bulk MAP
90 
91 template <typename K, typename T>
92 template <typename L, typename U>
93 void faster::_workerIFdd<K,T*>::bulkMap (workerFddBase * dest, IbulkMapIPFunctionP<K,T,L,U> bulkMapFunc){
94  size_t s = this->localData->getSize();
95  size_t * ls = this->localData->getLineSizes();
96  K * ik = this->localData->getKeys();
97  L * ok = (L*) dest->getKeys();
98 
99  bulkMapFunc(ok, (U*) dest->getData(), ik, (T **)this->localData->getData(), ls, s);
100 }
101 template <typename K, typename T>
102 template <typename L, typename U>
103 void faster::_workerIFdd<K,T*>::bulkMap (workerFddBase * dest, IPbulkMapIPFunctionP<K,T,L,U> bulkMapFunc){
104  size_t s = this->localData->getSize();
105  size_t * ls = this->localData->getLineSizes();
106  K * ik = this->localData->getKeys();
107  L * ok = (L*) dest->getKeys();
108 
109  bulkMapFunc(ok, (U*) dest->getData(), dest->getLineSizes(), ik, (T **)this->localData->getData(), ls, s);
110 }
111 template <typename K, typename T>
112 template <typename U>
113 void faster::_workerIFdd<K,T*>::bulkMap (workerFddBase * dest, bulkMapIPFunctionP<K,T,U> bulkMapFunc){
114  size_t s = this->localData->getSize();
115  size_t * ls = this->localData->getLineSizes();
116  K * ik = this->localData->getKeys();
117 
118  bulkMapFunc((U*) dest->getData(), ik, (T **)this->localData->getData(), ls, s);
119 }
120 template <typename K, typename T>
121 template <typename U>
122 void faster::_workerIFdd<K,T*>::bulkMap (workerFddBase * dest, PbulkMapIPFunctionP<K,T,U> bulkMapFunc){
123  size_t s = this->localData->getSize();
124  size_t * ls = this->localData->getLineSizes();
125  K * ik = this->localData->getKeys();
126 
127  bulkMapFunc((U*) dest->getData(), dest->getLineSizes(), ik, (T **)this->localData->getData(), ls, s);
128 }
129 
130 
131 template <typename K, typename T>
132 template <typename L, typename U>
133 void faster::_workerIFdd<K,T*>::flatMap(workerFddBase * dest, IflatMapIPFunctionP<K,T,L,U> flatMapFunc ){
134  T ** d = this->localData->getData();
135  size_t s = this->localData->getSize();
136  std::deque<std::pair<L,U>> resultList;
137 
138  #pragma omp parallel
139  {
140  std::deque<std::pair<L,U>> partResultList;
141  #pragma omp for
142  for (size_t i = 0; i < s; ++i){
143  std::deque<std::pair<L,U>> r = flatMapFunc(d[i], this->localData->getLineSizes()[i]);
144 
145  partResultList.insert(partResultList.end(), r.begin(), r.end() );
146  }
147 
148  //Copy result to the FDD array
149  #pragma omp critical
150  resultList.insert(resultList.end(), partResultList.begin(), partResultList.end() );
151  }
152  dest->insertl(&resultList);
153 }
154 template <typename K, typename T>
155 template <typename L, typename U>
156 void faster::_workerIFdd<K,T*>::flatMap(workerFddBase * dest, IPflatMapIPFunctionP<K,T,L,U> flatMapFunc ){
157  T ** d = this->localData->getData();
158  size_t s = this->localData->getSize();
159  std::deque<std::tuple<L, U, size_t>> resultList;
160 
161  #pragma omp parallel
162  {
163  std::deque<std::tuple<L, U, size_t>> partResultList;
164  #pragma omp for
165  for (size_t i = 0; i < s; ++i){
166  std::deque<std::tuple<L, U, size_t>>r = flatMapFunc(d[i], this->localData->getLineSizes()[i]);
167 
168  partResultList.insert(partResultList.end(), r.begin(), r.end() );
169  }
170 
171  //Copy result to the FDD array
172  #pragma omp critical
173  resultList.insert(resultList.end(), partResultList.begin(), partResultList.end() );
174  }
175  dest->insertl(&resultList);
176 }
177 template <typename K, typename T>
178 template <typename U>
179 void faster::_workerIFdd<K,T*>::flatMap(workerFddBase * dest, flatMapIPFunctionP<K,T,U> flatMapFunc ){
180  T ** d = this->localData->getData();
181  size_t s = this->localData->getSize();
182  std::deque<U> resultList;
183 
184  #pragma omp parallel
185  {
186  std::deque<U> partResultList;
187  #pragma omp for
188  for (size_t i = 0; i < s; ++i){
189  std::deque<U> r = flatMapFunc(d[i], this->localData->getLineSizes()[i]);
190 
191  partResultList.insert(partResultList.end(), r.begin(), r.end() );
192  }
193 
194  //Copy result to the FDD array
195  #pragma omp critical
196  resultList.insert(resultList.end(), partResultList.begin(), partResultList.end() );
197  }
198  dest->insertl(&resultList);
199 }
200 template <typename K, typename T>
201 template <typename U>
202 void faster::_workerIFdd<K,T*>::flatMap(workerFddBase * dest, PflatMapIPFunctionP<K,T,U> flatMapFunc ){
203  T ** d = this->localData->getData();
204  size_t s = this->localData->getSize();
205  std::deque<std::pair<U, size_t>> resultList;
206 
207  #pragma omp parallel
208  {
209  std::deque<std::pair<U, size_t>> partResultList;
210  #pragma omp for
211  for (size_t i = 0; i < s; ++i){
212  std::deque<std::pair<U, size_t>>r = flatMapFunc(d[i], this->localData->getLineSizes()[i]);
213 
214  partResultList.insert(partResultList.end(), r.begin(), r.end() );
215  }
216 
217  //Copy result to the FDD array
218  #pragma omp critical
219  resultList.insert(resultList.end(), partResultList.begin(), partResultList.end() );
220  }
221  dest->insertl(&resultList);
222 }
223 
224 
225 
226 
227 template <typename K, typename T>
228 template <typename L, typename U>
229 void faster::_workerIFdd<K,T*>::bulkFlatMap(workerFddBase * dest, IbulkFlatMapIPFunctionP<K,T,L,U> bulkFlatMapFunc ){
230  L * ok;
231  U * result;
232  size_t rSize;
233  K * ik = this->localData->getKeys();
234 
235  bulkFlatMapFunc( ok, result, rSize, ik, (T**) this->localData->getData(), this->localData->getLineSizes(), this->localData->getSize());
236  dest->setData(ok, result, rSize);
237 }
238 template <typename K, typename T>
239 template <typename L, typename U>
240 void faster::_workerIFdd<K,T*>::bulkFlatMap(workerFddBase * dest, IPbulkFlatMapIPFunctionP<K,T,L,U> bulkFlatMapFunc ){
241  L * ok;
242  U * result;
243  size_t * rDataSizes = NULL;
244  size_t rSize;
245  K * ik = this->localData->getKeys();
246 
247  bulkFlatMapFunc( ok, result, rDataSizes, rSize, ik, (T**) this->localData->getData(), this->localData->getLineSizes(), this->localData->getSize());
248  dest->setData(ok, result, rDataSizes, rSize);
249 }
250 template <typename K, typename T>
251 template <typename U>
252 void faster::_workerIFdd<K,T*>::bulkFlatMap(workerFddBase * dest, bulkFlatMapIPFunctionP<K,T,U> bulkFlatMapFunc ){
253  U * result;
254  size_t rSize;
255  K * ik = this->localData->getKeys();
256 
257  bulkFlatMapFunc( result, rSize, ik, (T**) this->localData->getData(), this->localData->getLineSizes(), this->localData->getSize());
258  dest->setData(result, rSize);
259 }
260 template <typename K, typename T>
261 template <typename U>
262 void faster::_workerIFdd<K,T*>::bulkFlatMap(workerFddBase * dest, PbulkFlatMapIPFunctionP<K,T,U> bulkFlatMapFunc ){
263  U * result;
264  size_t * rDataSizes = NULL;
265  size_t rSize;
266  K * ik = this->localData->getKeys();
267 
268  bulkFlatMapFunc( result, rDataSizes, rSize, ik, (T**) this->localData->getData(), this->localData->getLineSizes(), this->localData->getSize());
269  dest->setData(result, rDataSizes, rSize);
270 }
271 
272 
273 
274 
275 // Pointer -> Not Pointer
276 template <typename K, typename T>
277 template <typename L, typename U>
278 void faster::_workerIFdd<K,T*>::_applyI(void * func, fddOpType op, workerFddBase * dest){
279  switch (op){
280  case OP_Map:
281  map<L,U>(dest, (ImapIPFunctionP<K,T,L,U>) func);
282  //std::cerr << "Map";
283  break;
284  case OP_BulkMap:
285  bulkMap<L,U>(dest, ( IbulkMapIPFunctionP<K,T,L,U> ) func);
286  //std::cerr << "BulkMap ";
287  break;
288  case OP_FlatMap:
289  flatMap<L,U>(dest, ( IflatMapIPFunctionP<K,T,L,U> ) func);
290  //std::cerr << "FlatMap ";
291  break;
292  case OP_BulkFlatMap:
293  bulkFlatMap<L,U>(dest, ( IbulkFlatMapIPFunctionP<K,T,L,U> ) func);
294  //std::cerr << "BulkFlatMap ";
295  break;
296  }
297 }
298 
299 // Pointer -> Pointer
300 template <typename K, typename T>
301 template <typename L, typename U>
302 void faster::_workerIFdd<K,T*>::_applyIP(void * func, fddOpType op, workerFddBase * dest){
303  switch (op){
304  case OP_Map:
305  map<L,U>(dest, (IPmapIPFunctionP<K,T,L,U>) func);
306  //std::cerr << "Map";
307  break;
308  case OP_BulkMap:
309  bulkMap<L,U>(dest, ( IPbulkMapIPFunctionP<K,T,L,U> ) func);
310  //std::cerr << "BulkMap ";
311  break;
312  case OP_FlatMap:
313  flatMap<L,U>(dest, ( IPflatMapIPFunctionP<K,T,L,U> ) func);
314  //std::cerr << "FlatMap ";
315  break;
316  case OP_BulkFlatMap:
317  bulkFlatMap<L,U>(dest, ( IPbulkFlatMapIPFunctionP<K,T,L,U> ) func);
318  //std::cerr << "BulkFlatMap ";
319  break;
320  }
321 }
322 
323 template <typename K, typename T>
324 template <typename U>
325 void faster::_workerIFdd<K,T*>::_apply(void * func, fddOpType op, workerFddBase * dest){
326  switch (op){
327  case OP_Map:
328  map<U>(dest, (mapIPFunctionP<K,T,U>) func);
329  //std::cerr << "Map";
330  break;
331  case OP_BulkMap:
332  bulkMap<U>(dest, ( bulkMapIPFunctionP<K,T,U> ) func);
333  //std::cerr << "BulkMap ";
334  break;
335  case OP_FlatMap:
336  flatMap<U>(dest, ( flatMapIPFunctionP<K,T,U> ) func);
337  //std::cerr << "FlatMap ";
338  break;
339  case OP_BulkFlatMap:
340  bulkFlatMap<U>(dest, ( bulkFlatMapIPFunctionP<K,T,U> ) func);
341  //std::cerr << "BulkFlatMap ";
342  break;
343  }
344 }
345 
346 // Pointer -> Pointer
347 template <typename K, typename T>
348 template <typename U>
349 void faster::_workerIFdd<K,T*>::_applyP(void * func, fddOpType op, workerFddBase * dest){
350  switch (op){
351  case OP_Map:
352  map<U>(dest, (PmapIPFunctionP<K,T,U>) func);
353  //std::cerr << "Map";
354  break;
355  case OP_BulkMap:
356  bulkMap<U>(dest, ( PbulkMapIPFunctionP<K,T,U> ) func);
357  //std::cerr << "BulkMap ";
358  break;
359  case OP_FlatMap:
360  flatMap<U>(dest, ( PflatMapIPFunctionP<K,T,U> ) func);
361  //std::cerr << "FlatMap ";
362  break;
363  case OP_BulkFlatMap:
364  bulkFlatMap<U>(dest, ( PbulkFlatMapIPFunctionP<K,T,U> ) func);
365  //std::cerr << "BulkFlatMap ";
366  break;
367  }
368 }
369 
370 
371 template <typename K, typename T>
372 template <typename L>
373 void faster::_workerIFdd<K,T*>::_preApplyI(void * func, fddOpType op, workerFddBase * dest){
374  switch (dest->getType()){
375  case Null: break;
376  case Char: _applyI<L,char> (func, op, dest); break;
377  case Int: _applyI<L,int> (func, op, dest); break;
378  case LongInt: _applyI<L,long int>(func, op, dest); break;
379  case Float: _applyI<L,float> (func, op, dest); break;
380  case Double: _applyI<L,double> (func, op, dest); break;
381  case CharP: _applyIP<L,char *> (func, op, dest); break;
382  case IntP: _applyIP<L,int *> (func, op, dest); break;
383  case LongIntP: _applyIP<L,long int *> (func, op, dest); break;
384  case FloatP: _applyIP<L,float *> (func, op, dest); break;
385  case DoubleP: _applyIP<L,double *> (func, op, dest); break;
386  case String: _applyI<L,std::string>(func, op, dest); break;
387  //case Custom: _applyI(func, op, dest); break;
388  case CharV: _applyI<L,std::vector<char>> (func, op, dest); break;
389  case IntV: _applyI<L,std::vector<int>> (func, op, dest); break;
390  case LongIntV: _applyI<L,std::vector<long int>>(func, op, dest); break;
391  case FloatV: _applyI<L,std::vector<float>> (func, op, dest); break;
392  case DoubleV: _applyI<L,std::vector<double>> (func, op, dest); break;
393 
394  }
395 
396 }
397 
398 template <typename K, typename T>
399 void faster::_workerIFdd<K,T*>::_preApply(void * func, fddOpType op, workerFddBase * dest){
400  switch (dest->getType()){
401  case Null: break;
402 
403  case Char: _apply<char> (func, op, dest); break;
404  case Int: _apply<int> (func, op, dest); break;
405  case LongInt: _apply<long int> (func, op, dest); break;
406  case Float: _apply<float> (func, op, dest); break;
407  case Double: _apply<double> (func, op, dest); break;
408  case CharP: _applyP<char *> (func, op, dest); break;
409  case IntP: _applyP<int *> (func, op, dest); break;
410  case LongIntP: _applyP<long int *> (func, op, dest); break;
411  case FloatP: _applyP<float *> (func, op, dest); break;
412  case DoubleP: _applyP<double *> (func, op, dest); break;
413  case String: _apply<std::string>(func, op, dest); break;
414  //case Custom: _apply<void *>(func, op, dest); break;
415  case CharV: _apply<std::vector<char>> (func, op, dest); break;
416  case IntV: _apply<std::vector<int>> (func, op, dest); break;
417  case LongIntV:_apply<std::vector<long int>>(func, op, dest); break;
418  case FloatV: _apply<std::vector<float>> (func, op, dest); break;
419  case DoubleV: _apply<std::vector<double>> (func, op, dest); break;
420  }
421 
422 }
423 
424 
425 
426 template <typename K, typename T>
427 void faster::_workerIFdd<K,T*>::applyDependent(void * func, fddOpType op, workerFddBase * dest){
428  switch (dest->getKeyType()){
429  case Null: _preApply(func, op, dest);break;
430  case Char: _preApplyI<char>(func, op, dest); break;
431  case Int: _preApplyI<int>(func, op, dest); break;
432  case LongInt: _preApplyI<long int>(func, op, dest); break;
433  case Float: _preApplyI<float>(func, op, dest); break;
434  case Double: _preApplyI<double>(func, op, dest); break;
435  case String: _preApplyI<std::string>(func, op, dest); break;
436  }
437 }
438 
439 
unsigned int fddOpType
Dataset operation type.
Definition: definitions.h:41