6 #include "workerFddGroup.h" 9 #include "indexedFddStorageExtern.cpp" 12 using std::chrono::system_clock;
13 using std::chrono::duration_cast;
14 using std::chrono::milliseconds;
19 this->members = members;
28 char * data = (
char*) wfdd->getData();
29 K * keys = (K *) wfdd->getKeys();
30 size_t fddSize = wfdd->getSize();
31 int baseSize = wfdd->baseSize();
33 std::unordered_map<K, int> keyCount(wfdd->getSize()*1.2);
34 for (
size_t i = 0; i < fddSize; ++i){
41 std::unordered_map<K, std::vector<void*>> keyLocations(keyCount.size()*1.2);
42 for (
auto it = keyCount.cbegin(); it != keyCount.end(); ++it ){
43 keyLocations[it->first].reserve(it->second);
50 for (
size_t i = 0; i < fddSize; ++i){
52 void * d = &data[baseSize * i];
53 auto locs = keyLocations.find(key);
54 if (locs != keyLocations.end()){
55 locs->second.insert(locs->second.end(), d);
62 std::vector<std::vector<void*>> keyLocationsV(uKeys->size());
66 for (
size_t i = 0; i < uKeys->size() ; ++i){
67 auto l = keyLocations.find((*uKeys)[i]);
69 if ( l == keyLocations.end() ){
70 keyLocationsV[i] = {};
74 keyLocationsV[i] = l->second;
87 std::vector< std::vector<void*> > * location;
89 location = members[i]->getKeyLocations();
91 if ( location->size() == 0 ){
93 (*location) = findKeyInterval(i);
107 template <
typename K>
112 std::vector< std::vector<void*> > * keyLocations[3];
114 for (
size_t i = 0; i < members.size(); ++i){
115 keyLocations[i] = getMemberKeyLocations(i);
119 if ( members.size() < 3 ){
120 #pragma omp parallel for 121 for (
size_t i = 0; i < uKeys->size(); ++i){
122 ((updateByKeyG2FunctionP<K>) func)
124 (*keyLocations[0])[i],
125 (*keyLocations[1])[i]);
128 #pragma omp parallel for 129 for (
size_t i = 0; i < uKeys->size(); ++i){
130 ((updateByKeyG3FunctionP<K>) func)
132 (*keyLocations[0])[i],
133 (*keyLocations[1])[i],
134 (*keyLocations[2])[i] );
139 template <
typename K>
141 if ( members.size() < 3 ){
142 ((bulkUpdateG2FunctionP<K>) func)
144 (K*) members[0]->getKeys(), members[0]->getData(), members[0]->getSize(),
145 (K*) members[1]->getKeys(), members[1]->getData(), members[1]->getSize()
148 ((bulkUpdateG3FunctionP<K>) func)
150 (K*) members[0]->getKeys(), members[0]->getData(), members[0]->getSize(),
151 (K*) members[1]->getKeys(), members[1]->getData(), members[1]->getSize(),
152 (K*) members[2]->getKeys(), members[2]->getData(), members[2]->getSize()
158 template <
typename K>
160 template <
typename U>
163 std::vector< std::vector<void*> > * keyLocations[3];
167 for (
size_t i = 0; i < members.size(); ++i){
168 keyLocations[i] = getMemberKeyLocations(i);
170 dest->setSize(uKeys->size());
171 U * od = (U*) dest->getData();
173 if ( members.size() < 3 ){
174 #pragma omp parallel for 175 for (
size_t i = 0; i < uKeys->size(); ++i){
176 od[i] = ((mapByKeyG2FunctionP<K,U>) func)
178 (*keyLocations[0])[i],
179 (*keyLocations[1])[i]);
183 #pragma omp parallel for 184 for (
size_t i = 0; i < uKeys->size(); ++i){
185 od[i] = ((mapByKeyG3FunctionP<K,U>) func)
187 (*keyLocations[0])[i],
188 (*keyLocations[1])[i],
189 (*keyLocations[2])[i] );
196 template <
typename K>
198 template <
typename L,
typename U>
202 std::vector< std::vector<void*> > * keyLocations[3];
206 for (
size_t i = 0; i < members.size(); ++i){
207 keyLocations[i] = getMemberKeyLocations(i);
212 dest->setSize(uKeys->size());
213 L * ok = (L*) dest->getKeys();
214 U * od = (U*) dest->getData();
218 if ( members.size() < 3 ){
219 #pragma omp parallel for 220 for (
size_t i = 0; i < uKeys->size(); ++i){
221 std::pair<L,U> r = ( (ImapByKeyG2FunctionP<K,L,U>) func)
223 (*keyLocations[0])[i],
224 (*keyLocations[1])[i] );
230 #pragma omp parallel for 231 for (
size_t i = 0; i < uKeys->size(); ++i){
232 std::pair<L,U> r = ( (ImapByKeyG3FunctionP<K,L,U>) func )
234 (*keyLocations[0])[i],
235 (*keyLocations[1])[i],
236 (*keyLocations[2])[i] );
247 template <
typename K>
249 template <
typename U>
252 std::vector< std::vector<void*> > * keyLocations[3];
253 std::deque<U> resultList;
255 for (
size_t i = 0; i < members.size(); ++i){
256 keyLocations[i] = getMemberKeyLocations(i);
259 if ( members.size() < 3 ){
262 std::deque<U> pResultList;
265 for (
size_t i = 0; i < uKeys->size(); ++i){
266 std::deque<U> r = ((flatMapByKeyG2FunctionP<K,U>) func)
268 (*keyLocations[0])[i],
269 (*keyLocations[1])[i] );
273 pResultList.insert(pResultList.end(), r.begin(), r.end());
277 resultList.insert(resultList.end(), pResultList.begin(), pResultList.end());
282 std::deque<U> pResultList;
285 for (
size_t i = 0; i < uKeys->size(); ++i){
286 std::deque<U> r = ((flatMapByKeyG3FunctionP<K,U>) func)
288 (*keyLocations[0])[i],
289 (*keyLocations[1])[i],
290 (*keyLocations[2])[i] );
294 pResultList.insert(pResultList.end(), r.begin(), r.end());
299 resultList.insert(resultList.end(), pResultList.begin(), pResultList.end());
302 dest->insertl(&resultList);
306 template <
typename K>
308 template <
typename L,
typename U>
311 std::vector< std::vector<void*> > * keyLocations[3];
312 std::deque<std::pair<L,U>> resultList;
318 for (
size_t i = 0; i < members.size(); ++i){
319 keyLocations[i] = getMemberKeyLocations(i);
322 if ( members.size() <3 ){
325 std::deque<std::pair<L,U>> pResultList;
328 for (
size_t i = 0; i < uKeys->size(); ++i){
329 std::deque<std::pair<L,U>> r = ( (IflatMapByKeyG2FunctionP<K,L,U>) func)
331 (*keyLocations[0])[i],
332 (*keyLocations[1])[i] );
336 pResultList.insert(pResultList.end(), r.begin(), r.end());
340 resultList.insert(resultList.end(), pResultList.begin(), pResultList.end());
345 std::deque<std::pair<L,U>> pResultList;
348 for (
size_t i = 0; i < uKeys->size(); ++i){
349 std::deque<std::pair<L,U>> r = ( (IflatMapByKeyG3FunctionP<K,L,U>) func )
351 (*keyLocations[0])[i],
352 (*keyLocations[1])[i],
353 (*keyLocations[2])[i] );
357 pResultList.insert(pResultList.end(), r.begin(), r.end());
361 resultList.insert(resultList.end(), pResultList.begin(), pResultList.end());
365 dest->insertl(&resultList);
371 template <
typename K>
372 template <
typename U>
374 std::deque<U> resultList;
376 if ( members.size() < 3 ){
377 resultList = ( (bulkFlatMapG2FunctionP<K,U>) func )
379 (K*) members[0]->getKeys(), members[0]->getData(), members[0]->getSize(),
380 (K*) members[1]->getKeys(), members[1]->getData(), members[1]->getSize()
383 resultList = ( (bulkFlatMapG3FunctionP<K,U>) func )
385 (K*) members[0]->getKeys(), members[0]->getData(), members[0]->getSize(),
386 (K*) members[1]->getKeys(), members[1]->getData(), members[1]->getSize(),
387 (K*) members[2]->getKeys(), members[2]->getData(), members[2]->getSize()
391 dest->insertl(&resultList);
394 template <
typename K>
395 template <
typename L,
typename U>
397 std::deque<std::pair<L,U>> resultList;
399 if ( members.size() < 3 ){
400 resultList = ( (IbulkFlatMapG2FunctionP<K,L,U>) func )
402 (K*) members[0]->getKeys(), members[0]->getData(), members[0]->getSize(),
403 (K*) members[1]->getKeys(), members[1]->getData(), members[1]->getSize()
406 resultList = ( (IbulkFlatMapG3FunctionP<K,L,U>) func )
408 (K*) members[0]->getKeys(), members[0]->getData(), members[0]->getSize(),
409 (K*) members[1]->getKeys(), members[1]->getData(), members[1]->getSize(),
410 (K*) members[2]->getKeys(), members[2]->getData(), members[2]->getSize()
414 dest->insertl(&resultList);
418 template <
typename K>
420 template <
typename U>
425 mapByKey<U>(dest, func);
427 case OP_FlatMapByKey:
429 flatMapByKey<U>(dest, func);
433 bulkFlatMap<U>(dest, func);
438 template <
typename K>
440 template <
typename L,
typename U>
445 mapByKeyI<L,U>(dest, func);
447 case OP_FlatMapByKey:
449 flatMapByKeyI<L,U>(dest, func);
453 bulkFlatMapI<L,U>(dest, func);
458 template <
typename K>
462 size_t rSize =
sizeof(r);
483 if (op & OP_GENERICREDUCE) buffer.write(r,rSize);
486 template <
typename K>
488 switch (dest->getType()){
489 case Char: _apply<char>(func, op, dest);
break;
490 case Int: _apply<int>(func, op, dest);
break;
491 case LongInt: _apply<long int>(func, op, dest);
break;
492 case Float: _apply<float>(func, op, dest);
break;
493 case Double: _apply<double>(func, op, dest);
break;
494 case String: _apply<std::string>(func, op, dest);
break;
495 case CharV: _apply<std::vector<char>>(func, op, dest);
break;
496 case IntV: _apply<std::vector<int>>(func, op, dest);
break;
497 case LongIntV: _apply<std::vector<long int>>(func, op, dest);
break;
498 case FloatV: _apply<std::vector<float>>(func, op, dest);
break;
499 case DoubleV: _apply<std::vector<double>>(func, op, dest);
break;
503 template <
typename K>
504 template <
typename L>
506 switch (dest->getType()){
508 case Char: _applyI<L, char> (func, op, dest);
break;
509 case Int: _applyI<L, int> (func, op, dest);
break;
510 case LongInt: _applyI<L, long int> (func, op, dest);
break;
511 case Float: _applyI<L, float> (func, op, dest);
break;
512 case Double: _applyI<L, double> (func, op, dest);
break;
513 case String: _applyI<L, std::string> (func, op, dest);
break;
514 case CharV: _applyI<L, std::vector<char>> (func, op, dest);
break;
515 case IntV: _applyI<L, std::vector<int>> (func, op, dest);
break;
516 case LongIntV: _applyI<L, std::vector<long int>>(func, op, dest);
break;
517 case FloatV: _applyI<L, std::vector<float>> (func, op, dest);
break;
518 case DoubleV: _applyI<L, std::vector<double>> (func, op, dest);
break;
523 template <
typename K>
525 if (op & OP_GENERICMAP){
526 switch (dest->getKeyType()){
527 case Null: _preApply(func, op, dest);
break;
528 case Char: _preApplyI<char>(func, op, dest);
break;
529 case Int: _preApplyI<int>(func, op, dest);
break;
530 case LongInt: _preApplyI<long int>(func, op, dest);
break;
531 case Float: _preApplyI<float>(func, op, dest);
break;
532 case Double: _preApplyI<double>(func, op, dest);
break;
533 case String: _preApplyI<std::string>(func, op, dest);
break;
536 _applyReduce(func, op, buffer);
540 template <
typename K>
544 fastCommBuffer &buffer = comm->getResultBuffer();
552 this->uKeys = *(std::shared_ptr<std::vector<K>>*) members[0]->getUKeys();
553 this->keyMap = *(std::shared_ptr<std::unordered_map<K, int>>*) members[0]->getKeyMap();
570 for (
size_t i = 1; i < members.size(); ++i){
571 auto memberUKeys = *(std::shared_ptr<std::vector<K>>*) members[i]->getUKeys();
572 if ( memberUKeys != uKeys ){
574 members[i]->setUKeys( & this->uKeys );
575 members[i]->setKeyMap( & this->keyMap );
577 members[i]->exchangeDataByKey(comm);
582 buffer << members[i]->getSize();
588 template <
typename K>
590 using std::chrono::system_clock;
591 using std::chrono::duration_cast;
592 using std::chrono::milliseconds;
594 fastCommBuffer &buffer = comm->getResultBuffer();
606 durationP = buffer.size();
607 buffer.advance(
sizeof(
size_t));
609 rStatP = buffer.size();
612 rSizeP = buffer.size();
613 buffer.advance(
sizeof(
size_t));
615 headerSize = buffer.size();
617 auto start = system_clock::now();
618 if (op & (OP_GENERICMAP | OP_GENERICREDUCE | OP_GENERICUPDATE)){
619 apply(func, op, dest, buffer);
622 if (dest) buffer << size_t(dest->getSize());
624 if (op == OP_CoGroup){
629 auto end = system_clock::now();
630 auto duration = duration_cast<milliseconds>(end - start);
633 buffer.writePos(
size_t(duration.count()), durationP);
634 buffer.writePos(getProcStat(), rStatP);
635 buffer.writePos(
size_t(buffer.size() - headerSize), rSizeP);
637 comm->sendTaskResult();
unsigned int fddOpType
Dataset operation type.