// Fragment: include list followed by the pointer-type cases of a type-dispatch switch
// (the enclosing function header and the switch statement itself are not part of this listing).
// Each case returns a `new`-allocated _workerFdd specialised for the matching pointer type,
// constructed from (id, type, size).
// NOTE(review): every `break` below follows a `return` and is therefore unreachable.
3 #include "_workerFdd.h" 4 #include "workerFddBase.h" 5 #include "fddStorageExtern.cpp" 8 #include "workerFddCoreExtern.cpp" 9 #include "workerFddModule.cpp" 13 case CharP:
return new _workerFdd<char *>(id, type, size);
break;
14 case IntP:
return new _workerFdd<int *>(id, type, size);
break;
15 case LongIntP:
return new _workerFdd<long int *>(id, type, size);
break;
16 case FloatP:
return new _workerFdd<float *>(id, type, size);
break;
17 case DoubleP:
return new _workerFdd<double *>(id, type, size);
break;
// Element-wise map (fragment; the function signature is not visible in this listing).
// Calls mapFunc on every (item pointer, line size) pair of localData and stores the returned
// value in dest's data array (viewed as U*) at the same index.
// The loop is annotated with an OpenMP `parallel for`.
27 T ** d = this->localData->getData();
28 size_t s = this->localData->getSize();
29 size_t * ls = this->localData->getLineSizes();
30 U * od = (U*) dest->getData();
34 #pragma omp parallel for 35 for (
size_t i = 0; i < s; ++i){
36 od[i] = mapFunc(d[i], ls[i]);
// Element-wise map whose function returns a std::pair<U, size_t> (fragment; signature not visible).
// mapFunc is called on every (item pointer, line size) pair of localData inside an
// OpenMP `parallel for`.
// NOTE(review): the statements that consume `r` are missing from this listing; presumably
// r.first goes to od[i] and r.second to dls[i] — confirm against the full source.
44 T ** d = this->localData->getData();
45 size_t s = this->localData->getSize();
46 size_t * ls = this->localData->getLineSizes();
47 size_t * dls = dest->getLineSizes();
48 U * od = (U*) dest->getData();
51 #pragma omp parallel for 52 for (
size_t i = 0; i < s; ++i){
53 std::pair<U,size_t> r = mapFunc(d[i], ls[i]);
// Keyed element-wise map (fragment; signature not visible).
// mapFunc is called on every (item pointer, line size) pair of localData inside an
// OpenMP `parallel for` and returns a std::pair<L, U>.
// NOTE(review): the statements that consume `r` are missing from this listing; presumably
// r.first goes to ok[i] (dest keys) and r.second to od[i] (dest data) — confirm.
61 template <
typename L,
typename U>
63 T ** d = this->localData->getData();
64 size_t s = this->localData->getSize();
65 size_t * ls = this->localData->getLineSizes();
66 L * ok = (L*) dest->getKeys();
67 U * od = (U*) dest->getData();
71 #pragma omp parallel for 72 for (
size_t i = 0; i < s; ++i){
73 std::pair<L,U> r = mapFunc(d[i], ls[i]);
// Keyed element-wise map with per-item output sizes (fragment; signature not visible).
// mapFunc is called on every (item pointer, line size) pair of localData inside an
// OpenMP `parallel for` and returns a std::tuple<L, U, size_t>; the tuple is unpacked into
// dest's key array, data array and line-size array at the same index.
82 template <
typename L,
typename U>
84 T ** d = this->localData->getData();
85 size_t s = this->localData->getSize();
86 size_t * ls = this->localData->getLineSizes();
87 size_t * dls = dest->getLineSizes();
88 L * ok = (L*) dest->getKeys();
89 U * od = (U*) dest->getData();
92 #pragma omp parallel for 93 for (
size_t i = 0; i < s; ++i){
94 std::tuple<L,U,size_t> r = mapFunc(d[i], ls[i]);
// Element 0 is the key, element 1 the value, element 2 the output line size.
95 ok[i] = std::get<0>(r);
96 od[i] = std::get<1>(r);
97 dls[i] = std::get<2>(r);
// Bulk map (fragment; signature not visible).
// Hands the whole local partition to bulkMapFunc in one call, passing in order:
// dest's data array (as U*), the source item array, the source line sizes and the item count.
104 template <
typename T>
105 template <
typename U>
107 size_t s = this->localData->getSize();
108 size_t * ls = this->localData->getLineSizes();
110 bulkMapFunc((U*) dest->getData(), (T **)this->localData->getData(), ls, s);
// Bulk map with output line sizes (fragment; signature not visible).
// Hands the whole local partition to bulkMapFunc in one call, passing in order:
// dest's data array (as U*), dest's line-size array, the source item array,
// the source line sizes and the item count.
112 template <
typename T>
113 template <
typename U>
115 size_t s = this->localData->getSize();
116 size_t * ls = this->localData->getLineSizes();
118 bulkMapFunc((U*) dest->getData(), dest->getLineSizes(), (T **)this->localData->getData(), ls, s);
// Keyed bulk map (fragment; signature not visible).
// Hands the whole local partition to bulkMapFunc in one call, passing in order:
// dest's key array (as L*), dest's data array (as U*), the source item array,
// the source line sizes and the item count.
120 template <
typename T>
121 template <
typename L,
typename U>
123 size_t s = this->localData->getSize();
124 size_t * ls = this->localData->getLineSizes();
126 bulkMapFunc((L*) dest->getKeys(), (U*) dest->getData(), (T **)this->localData->getData(), ls, s);
// Keyed bulk map with output line sizes (fragment; signature not visible).
// Hands the whole local partition to bulkMapFunc in one call, passing in order:
// dest's key array (as L*), dest's data array (as U*), dest's line-size array,
// the source item array, the source line sizes and the item count.
128 template <
typename T>
129 template <
typename L,
typename U>
131 size_t s = this->localData->getSize();
132 size_t * ls = this->localData->getLineSizes();
134 bulkMapFunc((L*) dest->getKeys(), (U*) dest->getData(), dest->getLineSizes(), (T **)this->localData->getData(), ls, s);
// Flat map producing plain U values (fragment; signature and several lines are not visible).
// flatMapFunc is called on each (item pointer, line size) pair of localData and returns a
// std::deque<U>. partResultList is appended to resultList, and resultList is finally passed
// by address to dest->insertl().
// NOTE(review): the listing does not show how `r` is moved into partResultList, nor the
// scoping/OpenMP directives around the two resultList.insert calls — check the full source
// before changing this code.
139 template <
typename T>
140 template <
typename U>
142 T ** d = this->localData->getData();
143 size_t s = this->localData->getSize();
144 std::deque<U> resultList;
148 std::deque<U> partResultList;
150 for (
size_t i = 0; i < s; ++i){
151 std::deque<U>r = flatMapFunc(d[i], this->localData->getLineSizes()[i]);
153 resultList.insert(resultList.end(), partResultList.begin(), partResultList.end() );
158 resultList.insert(resultList.end(), partResultList.begin(), partResultList.end() );
160 dest->insertl(&resultList);
// Flat map producing (U, size_t) pairs (fragment; signature and several lines are not visible).
// flatMapFunc is called on each (item pointer, line size) pair of localData and returns a
// std::deque<std::pair<U, size_t>>. partResultList is appended to resultList, and resultList
// is finally passed by address to dest->insertl().
// NOTE(review): the listing does not show how `r` is moved into partResultList, nor the
// scoping/OpenMP directives around the two resultList.insert calls — check the full source.
162 template <
typename T>
163 template <
typename U>
165 T ** d = this->localData->getData();
166 size_t s = this->localData->getSize();
167 std::deque<std::pair<U, size_t>> resultList;
171 std::deque<std::pair<U, size_t>> partResultList;
173 for (
size_t i = 0; i < s; ++i){
174 std::deque<std::pair<U, size_t>>r = flatMapFunc(d[i], this->localData->getLineSizes()[i]);
176 resultList.insert(resultList.end(), partResultList.begin(), partResultList.end() );
181 resultList.insert(resultList.end(), partResultList.begin(), partResultList.end() );
183 dest->insertl(&resultList);
// Keyed flat map producing (L, U) pairs (fragment; signature and several lines are not visible).
// flatMapFunc is called on each (item pointer, line size) pair of localData and returns a
// std::deque<std::pair<L, U>>. partResultList is appended to resultList, and resultList
// is finally passed by address to dest->insertl().
// NOTE(review): the listing does not show how `r` is moved into partResultList, nor the
// scoping/OpenMP directives around the two resultList.insert calls — check the full source.
185 template <
typename T>
186 template <
typename L,
typename U>
188 T ** d = this->localData->getData();
189 size_t s = this->localData->getSize();
190 std::deque<std::pair<L,U>> resultList;
194 std::deque<std::pair<L,U>> partResultList;
196 for (
size_t i = 0; i < s; ++i){
197 std::deque<std::pair<L,U>> r = flatMapFunc(d[i], this->localData->getLineSizes()[i]);
199 resultList.insert(resultList.end(), partResultList.begin(), partResultList.end() );
204 resultList.insert(resultList.end(), partResultList.begin(), partResultList.end() );
206 dest->insertl(&resultList);
// Keyed flat map producing (L, U, size_t) tuples (fragment; signature and several lines are
// not visible). flatMapFunc is called on each (item pointer, line size) pair of localData and
// returns a std::deque<std::tuple<L, U, size_t>>. partResultList is appended to resultList,
// and resultList is finally passed by address to dest->insertl().
// NOTE(review): the listing does not show how `r` is moved into partResultList, nor the
// scoping/OpenMP directives around the two resultList.insert calls — check the full source.
208 template <
typename T>
209 template <
typename L,
typename U>
211 T ** d = this->localData->getData();
212 size_t s = this->localData->getSize();
213 std::deque<std::tuple<L,U, size_t>> resultList;
217 std::deque<std::tuple<L,U, size_t>> partResultList;
219 for (
size_t i = 0; i < s; ++i){
220 std::deque<std::tuple<L,U, size_t>>r = flatMapFunc(d[i], this->localData->getLineSizes()[i]);
222 resultList.insert(resultList.end(), partResultList.begin(), partResultList.end() );
227 resultList.insert(resultList.end(), partResultList.begin(), partResultList.end() );
229 dest->insertl(&resultList);
// Bulk flat map (fragment; the signature and the declarations of `result` and `rSize` are
// not visible). Calls bulkFlatMapFunc once with result, rSize, the source item array, the
// source line sizes and the item count, then stores (result, rSize) in dest via setData.
// NOTE(review): presumably `result` and `rSize` are out-parameters filled by
// bulkFlatMapFunc — confirm against the function-pointer typedef.
232 template <
typename T>
233 template <
typename U>
238 bulkFlatMapFunc( result, rSize, (T**) this->localData->getData(), this->localData->getLineSizes(), this->localData->getSize());
239 dest->setData(result, rSize);
// Bulk flat map with output line sizes (fragment; the signature and the declarations of
// `result` and `rSize` are not visible). Calls bulkFlatMapFunc once with result, rDataSizes,
// rSize, the source item array, the source line sizes and the item count, then stores
// (result as void*, rDataSizes, rSize) in dest via setData.
// NOTE(review): rDataSizes starts as NULL; presumably bulkFlatMapFunc allocates it as an
// out-parameter — confirm against the function-pointer typedef.
241 template <
typename T>
242 template <
typename U>
245 size_t * rDataSizes = NULL;
248 bulkFlatMapFunc( result, rDataSizes, rSize, (T**) this->localData->getData(), this->localData->getLineSizes(), this->localData->getSize());
249 dest->setData((
void*) result, rDataSizes, rSize);
// Keyed bulk flat map (fragment; the signature and the declarations of `keys`, `result` and
// `rSize` are not visible). Calls bulkFlatMapFunc once with keys, result, rSize, the source
// item array, the source line sizes and the item count, then stores (keys, result, rSize)
// in dest via setData.
// NOTE(review): presumably keys/result/rSize are out-parameters filled by bulkFlatMapFunc —
// confirm against the function-pointer typedef.
251 template <
typename T>
252 template <
typename L,
typename U>
258 bulkFlatMapFunc( keys, result, rSize, (T**) this->localData->getData(), this->localData->getLineSizes(), this->localData->getSize());
259 dest->setData(keys, result, rSize);
// Keyed bulk flat map with output line sizes (fragment; the signature and the declarations
// of `keys`, `result` and `rSize` are not visible). Calls bulkFlatMapFunc once with keys,
// result, rDataSizes, rSize, the source item array, the source line sizes and the item
// count, then stores (keys, result as void*, rDataSizes, rSize) in dest via setData.
// NOTE(review): rDataSizes starts as NULL; presumably bulkFlatMapFunc allocates it as an
// out-parameter — confirm against the function-pointer typedef.
261 template <
typename T>
262 template <
typename L,
typename U>
266 size_t * rDataSizes = NULL;
269 bulkFlatMapFunc( keys, result, rDataSizes, rSize, (T**) this->localData->getData(), this->localData->getLineSizes(), this->localData->getSize());
270 dest->setData(keys, (
void*) result, rDataSizes, rSize);
// Pairwise reduction over the local items (fragment; the signature, the OpenMP directives
// and several statements are not visible in this listing).
// Visible behaviour: each thread seeds partResult with item tN and its line size, folds
// further items into it with reduceFunc(a, aSize, b, bSize), and threads other than
// thread 0 then fold their partResult into the shared `result` with the same function.
// NOTE(review): `int i` is compared against `size_t s` (signed/unsigned comparison).
// NOTE(review): d[tN] / ls[tN] are read with no visible check that tN < s; verify the
// behaviour when there are fewer items than threads.
// NOTE(review): the assignments of `a` (second stage) and `b` (loop) and any
// synchronisation around `result` are on lines missing from this listing.
275 template <
typename T>
277 T ** d = this->localData->getData();
278 std::pair<T*,size_t> result;
279 size_t s = this->localData->getSize();
280 size_t * ls = this->localData->getLineSizes();
285 int nT = omp_get_num_threads();
286 int tN = omp_get_thread_num();
287 std::pair<T *, size_t> partResult(d[tN], ls[tN] );
295 for (
int i = nT; i < s; ++i){
296 a = partResult.first;
298 aSize = partResult.second;
301 partResult = reduceFunc(a, aSize, b, bSize);
314 if (omp_get_thread_num() != 0){
316 b = partResult.first;
317 aSize = result.second;
318 bSize = partResult.second;
320 result = reduceFunc(a, aSize, b, bSize);
// Bulk reduction (fragment; signature and return statement not visible).
// Calls bulkReduceFunc once with the local item array, the line sizes and the item count;
// the call yields a std::pair<T*, size_t>.
330 template <
typename T>
332 std::pair<T *, size_t> r = bulkReduceFunc((T**) this->localData->getData(), this->localData->getLineSizes(), this->localData->getSize());
// Operation dispatcher for value-typed output U (fragment; the signature and the selection
// logic between the calls are not visible — presumably a switch on the operation code).
// Each visible call casts the opaque `func` to the function-pointer type matching the
// operation and forwards to map / bulkMap / flatMap / bulkFlatMap with `dest`.
335 template <
typename T>
336 template <
typename U>
340 map(dest, (mapPFunctionP<T,U>) func);
344 bulkMap(dest, ( bulkMapPFunctionP<T,U> ) func);
348 flatMap(dest, ( flatMapPFunctionP<T,U> ) func);
352 bulkFlatMap(dest, ( bulkFlatMapPFunctionP<T,U> ) func);
// Operation dispatcher using the P-prefixed function-pointer types (fragment; the signature
// and the selection logic between the calls are not visible — presumably a switch on the
// operation code). Each visible call casts the opaque `func` to the matching type and
// forwards to map / bulkMap / flatMap / bulkFlatMap with `dest`.
359 template <
typename T>
360 template <
typename U>
364 map(dest, (PmapPFunctionP<T,U>) func);
368 bulkMap(dest, ( PbulkMapPFunctionP<T,U> ) func);
372 flatMap(dest, ( PflatMapPFunctionP<T,U> ) func);
376 bulkFlatMap(dest, ( PbulkFlatMapPFunctionP<T,U> ) func);
// Operation dispatcher using the I-prefixed (key type L, value type U) function-pointer
// types (fragment; the signature and the selection logic between the calls are not visible —
// presumably a switch on the operation code). Each visible call casts the opaque `func` to
// the matching type and forwards to map / bulkMap / flatMap / bulkFlatMap with `dest`.
382 template <
typename T>
383 template <
typename L,
typename U>
387 map(dest, (ImapPFunctionP<T,L,U>) func);
391 bulkMap(dest, ( IbulkMapPFunctionP<T,L,U> ) func);
395 flatMap(dest, ( IflatMapPFunctionP<T,L,U> ) func);
399 bulkFlatMap(dest, ( IbulkFlatMapPFunctionP<T,L,U> ) func);
// Operation dispatcher using the IP-prefixed (key type L, value type U) function-pointer
// types (fragment; the signature and the selection logic between the calls are not visible —
// presumably a switch on the operation code). Each visible call casts the opaque `func` to
// the matching type and forwards to map / bulkMap / flatMap / bulkFlatMap with `dest`.
406 template <
typename T>
407 template <
typename L,
typename U>
411 map(dest, (IPmapPFunctionP<T,L,U>) func);
415 bulkMap(dest, ( IPbulkMapPFunctionP<T,L,U> ) func);
419 flatMap(dest, ( IPflatMapPFunctionP<T,L,U> ) func);
423 bulkFlatMap(dest, ( IPbulkFlatMapPFunctionP<T,L,U> ) func);
// Reduce dispatcher (fragment; the signature and the logic selecting between the two calls
// are not visible — presumably a switch on the operation code).
// Runs either reduce or bulkReduce with `func` cast to the matching function-pointer type,
// then writes the resulting (pointer, size) pair to `buffer`.
429 template <
typename T>
431 std::pair<T*,size_t> r;
434 r = reduce(( PreducePFunctionP<T> ) func);
438 r = bulkReduce(( PbulkReducePFunctionP<T> ) func);
442 buffer.write(r.first, r.second);
// Output-type dispatch for datasets without keys (fragment; signature and closing braces are
// not visible). Switches on dest->getType() and instantiates the matching helper:
//   - scalar, std::string and std::vector element types call _apply<X>
//   - pointer element types (the *P tags) call _applyP<X *>
// All cases forward the same (func, op, dest) arguments.
447 template <
typename T>
449 switch (dest->getType()){
451 case Char: _apply<char>(func, op, dest);
break;
452 case Int: _apply<int>(func, op, dest);
break;
453 case LongInt: _apply<long int>(func, op, dest);
break;
454 case Float: _apply<float>(func, op, dest);
break;
455 case Double: _apply<double>(func, op, dest);
break;
456 case CharP: _applyP<char *>(func, op, dest);
break;
457 case IntP: _applyP<int *>(func, op, dest);
break;
458 case LongIntP: _applyP<long int *>(func, op, dest);
break;
459 case FloatP: _applyP<float *>(func, op, dest);
break;
460 case DoubleP: _applyP<double *>(func, op, dest);
break;
461 case String: _apply<std::string>(func, op, dest);
break;
463 case CharV: _apply<std::vector<char>>(func, op, dest);
break;
464 case IntV: _apply<std::vector<int>>(func, op, dest);
break;
465 case LongIntV: _apply<std::vector<long int>>(func, op, dest);
break;
466 case FloatV: _apply<std::vector<float>>(func, op, dest);
break;
467 case DoubleV: _apply<std::vector<double>>(func, op, dest);
break;
// Output-type dispatch for keyed datasets with key type L (fragment; signature and closing
// braces are not visible). Switches on dest->getType() and instantiates the matching helper:
//   - scalar, std::string and std::vector element types call _applyI<L, X>
//   - pointer element types (the *P tags) call _applyIP<L, X *>
// All cases forward the same (func, op, dest) arguments.
471 template <
typename T>
472 template <
typename L>
474 switch (dest->getType()){
476 case Char: _applyI<L, char> (func, op, dest);
break;
477 case Int: _applyI<L, int> (func, op, dest);
break;
478 case LongInt: _applyI<L, long int> (func, op, dest);
break;
479 case Float: _applyI<L, float> (func, op, dest);
break;
480 case Double: _applyI<L, double> (func, op, dest);
break;
481 case CharP: _applyIP<L, char *> (func, op, dest);
break;
482 case IntP: _applyIP<L, int *> (func, op, dest);
break;
483 case LongIntP: _applyIP<L, long int *> (func, op, dest);
break;
484 case FloatP: _applyIP<L, float *> (func, op, dest);
break;
485 case DoubleP: _applyIP<L, double *> (func, op, dest);
break;
486 case String: _applyI<L, std::string> (func, op, dest);
break;
488 case CharV: _applyI<L, std::vector<char>> (func, op, dest);
break;
489 case IntV: _applyI<L, std::vector<int>> (func, op, dest);
break;
490 case LongIntV: _applyI<L, std::vector<long int>>(func, op, dest);
break;
491 case FloatV: _applyI<L, std::vector<float>> (func, op, dest);
break;
492 case DoubleV: _applyI<L, std::vector<double>> (func, op, dest);
break;
// Top-level operation entry point (fragment; signature, the else-branch punctuation and the
// closing braces are not visible).
// When the OP_GENERICREDUCE bit is set in `op`, the reduce path (_applyReduce) is taken and
// its result goes to `buffer`. The visible switch dispatches on dest's key type:
// Null selects the unkeyed path (_preApply), every other tag selects _preApplyI<KeyType>.
// NOTE(review): presumably the switch is the else-branch of the reduce test — confirm.
498 template <
typename T>
500 if (op & OP_GENERICREDUCE){
501 _applyReduce(func, op, buffer);
503 switch (dest->getKeyType()){
504 case Null: _preApply(func, op, dest);
break;
505 case Char: _preApplyI<char>(func, op, dest);
break;
506 case Int: _preApplyI<int>(func, op, dest);
break;
507 case LongInt: _preApplyI<long int>(func, op, dest);
break;
508 case Float: _preApplyI<float>(func, op, dest);
break;
509 case Double: _preApplyI<double>(func, op, dest);
break;
510 case String: _preApplyI<std::string>(func, op, dest);
break;
// Thin forwarder (fragment; signature not visible): passes (data, lineSizes, size)
// straight through to localData->setData().
515 template <
typename T>
517 this->localData->setData( data, lineSizes, size);
// Thin forwarder (fragment; signature not visible): passes (data, lineSizes, size)
// straight through to localData->setDataRaw().
520 template <
typename T>
522 this->localData->setDataRaw( data, lineSizes, size);
// Accessor (fragment; signature not visible): returns whatever localData->getLineSizes()
// returns, i.e. the per-item size array used throughout this file.
526 template <
typename T>
528 return this->localData->getLineSizes();
// Single-item insert taking an untyped argument (fragment; signature not visible).
// Reinterprets `in` as a reference to T* with a C-style cast and forwards it, together with
// the size `s`, to localData->insert().
// NOTE(review): the declared type of `in` is not visible here; the (T*&) cast is only valid
// if `in` really is an lvalue holding a T* — confirm against the declaration.
531 template <
typename T>
533 this->localData->insert((T*&)in, s);
// List insert taking an untyped pointer (fragment; signature not visible).
// Casts `in` to a pointer to std::deque<std::pair<T*, size_t>>, dereferences it and
// forwards the deque to the typed insert overload.
535 template <
typename T>
537 insert( *(std::deque<std::pair<T*, size_t>>*) in);
// Typed single-item insert (fragment; signature not visible): forwards (in, s) unchanged
// to localData->insert().
540 template <
typename T>
542 this->localData->insert(in, s);
// Bulk insert from a deque of (pointer, size) pairs (fragment; signature not visible).
// If localData currently reports fewer items than the deque holds, it is first grown with
// grow(in.size()); then every pair is inserted individually via localData->insert().
544 template <
typename T>
546 typename std::deque< std::pair<T*, size_t> >::iterator it;
548 if (this->localData->getSize() < in.size())
549 this->localData->grow(in.size());
551 for ( it = in.begin(); it != in.end(); it++)
552 this->localData->insert(it->first, it->second);
// Sends this worker's local partition through the communicator (fragment; signature not
// visible): passes the dataset id, the item array, the line-size array and the item count
// to comm->sendFDDDataCollect().
555 template <
typename T>
557 comm->sendFDDDataCollect(this->
id, this->localData->getData(), this->localData->getLineSizes(), this->localData->getSize());
unsigned int fddOpType
Dataset operation type.