1 #ifndef LIBFASTER_INDEXEDFDD_H
2 #define LIBFASTER_INDEXEDFDD_H
11 #include "definitions.h"
13 #include "fastContext.h"
23 template <
typename K,
typename T>
// iFddCore<K,T>: common base of the indexed (key/value) FDD classes, derived
// from fddBase. K is the key type and T the value type.
// NOTE(review): the embedded line numbers in this listing skip, so some
// original lines (access specifiers, closing braces, parts of bodies) are not
// shown; the comments below describe only what is visible.
28 template <
typename K,
typename T>
29 class iFddCore :
public fddBase{
// Shared key -> int map. calculateKeyMap() fills such a map with a process
// index per key and groupByKey() stores the result in this member.
33 std::shared_ptr<std::unordered_map<K, int>> keyMap;
// Context through which tasks are enqueued and this FDD is discarded.
34 fastContext * context;
// Records the decoded type code of the key type K.
// NOTE(review): the enclosing function header is not visible (presumably a
// default constructor) -- confirm.
37 _kType = decodeType(
typeid(K).hash_code());
// Constructor taking a context: records the decoded key type.
// (Any further initialisation is on lines not shown in this listing.)
43 iFddCore(fastContext &c) {
44 _kType = decodeType(
typeid(K).hash_code());
// Constructor taking a context, a size `s` and a per-process allocation
// vector: records the decoded key type and copies `dataAlloc` into the member.
51 iFddCore(fastContext &c,
size_t s,
const std::vector<size_t> & dataAlloc) {
52 _kType = decodeType(
typeid(K).hash_code());
57 this->dataAlloc = dataAlloc;
// Builds the key -> process map from per-key counts. Each tuple is read as
// (total count, preferred process, <third field unused by calculateKeyMap>).
// The map argument is taken by value.
63 std::shared_ptr<std::unordered_map<K, int>> calculateKeyMap(std::unordered_map<K, std::tuple<size_t, int, size_t>> count);
// Type-erased worker shared by map()/mapI(): runs `op` with user function
// `funcP`, targeting `newFdd`; `start` is the timestamp taken by the caller.
66 fddBase * _map(
void * funcP, fddOpType op, fddBase * newFdd, system_clock::time_point & start);
// Runs `op` and returns a (non-indexed) fdd<U>.
// NOTE(review): the `template <typename U>` line for this declaration is not
// visible in the listing.
68 fdd<U> * map(
void * funcP, fddOpType op);
// Runs `op` and returns an indexedFdd<L,U> (new key type L, new value type U).
69 template <
typename L,
typename U>
70 indexedFdd<L,U> * mapI(
void * funcP, fddOpType op);
// Creates, with `new`, a groupedFdd<K> from this FDD and one other, passing
// the shared keyMap and the start timestamp.
// NOTE(review): the template header declaring U is not visible here.
78 groupedFdd<K> * cogroup(iFddCore<K,U> * fdd1){
79 auto start = system_clock::now();
83 return new groupedFdd<K>(context,
this, fdd1, keyMap, start);
// Same as above for two other FDDs with value types U and V.
86 template<
typename U,
typename V>
87 groupedFdd<K> * cogroup(iFddCore<K,U> * fdd1, iFddCore<K,V> * fdd2){
88 auto start = system_clock::now();
92 return new groupedFdd<K>(context,
this, fdd1, fdd2, keyMap, start);
// Returns a per-key element count aggregated over the worker processes.
95 std::unordered_map<K, size_t> countByKey();
// Computes a key map, asks the workers to regroup by key, and returns this
// object cast to indexedFdd<K,T>*.
97 indexedFdd<K,T> * groupByKey();
// Discards this FDD in the context and drops this object's reference to the
// key map.
// NOTE(review): enclosing function header not visible (presumably discard()).
101 context->discardFDD(
id);
102 this->keyMap.reset();
// Returns the address of the keyMap shared_ptr member as an untyped pointer.
105 void * getKeyMap(
void) {
106 return &this->keyMap;
// Treats `keyMap` as a pointer to a shared_ptr of the same map type and
// copies that shared_ptr into the member (the parameter shadows the member).
108 void setKeyMap(
void * keyMap) {
109 this->keyMap = * ( std::shared_ptr<std::unordered_map<K, int>> * ) keyMap;
// Getter/setter for the grouped-by-key flag; bodies are not visible here.
111 bool isGroupedByKey() {
114 void setGroupedByKey(
bool gbk) {
// indexedFdd<K,T>: indexed FDD holding (key, value) pairs with plain values.
// Public operations are thin typed wrappers that erase the user function to
// void* and forward to iFddCore<K,T>::mapI / ::map or to the private reduce().
// NOTE(review): the embedded line numbers skip, so access specifiers, closing
// braces and some statements are not shown in this listing.
120 template <
typename K,
typename T>
121 class indexedFdd :
public iFddCore<K,T>{
// Combines the per-process partial results of a reduce into one pair.
123 std::pair <K,T> finishReduces(
char ** partResult,
size_t * pSize,
int funcId, fddOpType op);
// Type-erased reduce driver used by reduce()/bulkReduce() below.
124 std::pair <K,T> reduce(
void * funcP, fddOpType op);
// Creates an FDD with no size given: records the decoded value type and
// obtains an id from the context via createIFDD.
132 indexedFdd(fastContext &c) : iFddCore<K,T>(c){
133 this->_tType = decodeType(
typeid(T).hash_code());
134 this->
id = c.createIFDD(
this,
typeid(K).hash_code(),
typeid(T).hash_code());
// Creates an FDD of size `s` with an explicit per-process allocation, which is
// also passed on to createIFDD.
138 indexedFdd(fastContext &c,
size_t s,
const std::vector<size_t> & dataAlloc) : iFddCore<K,T>(c, s, dataAlloc){
139 this->_tType = decodeType(
typeid(T).hash_code());
140 this->
id = c.createIFDD(
this,
typeid(K).hash_code(),
typeid(T).hash_code(), dataAlloc);
// Creates an FDD of size `s` using the allocation returned by
// c.getAllocation(s).
142 indexedFdd(fastContext &c,
size_t s) : indexedFdd(c, s, c.getAllocation(s)) { }
// Creates an FDD of `size` elements and hands the key and data arrays to
// c.parallelizeI under this FDD's id.
145 indexedFdd(fastContext &c, K * keys, T * data,
size_t size) : indexedFdd(c, size){
146 c.parallelizeI(this->
id, keys, data, size);
// ---- map: four overloads selected by the function-pointer type; all use
// OP_Map. The first two return indexedFdd<L,U> via mapI, the last two return
// fdd<U> via map.
156 template <
typename L,
typename U>
157 indexedFdd<L,U> * map( ImapIFunctionP<K,T,L,U> funcP ){
158 return iFddCore<K,T>::template mapI<L,U>((
void*) funcP, OP_Map);
160 template <
typename L,
typename U>
161 indexedFdd<L,U> * map( IPmapIFunctionP<K,T,L,U> funcP ){
162 return iFddCore<K,T>::template mapI<L,U>((
void*) funcP, OP_Map);
// NOTE(review): in the fdd<U>-returning overloads (here and in the groups
// below) template parameter L appears in neither the parameter nor the return
// type, so it presumably cannot be deduced and must be given explicitly.
164 template <
typename L,
typename U>
165 fdd<U> * map( mapIFunctionP<K,T,U> funcP ){
166 return iFddCore<K,T>::template map<U>((
void*) funcP, OP_Map);
168 template <
typename L,
typename U>
169 fdd<U> * map( PmapIFunctionP<K,T,U> funcP ){
170 return iFddCore<K,T>::template map<U>((
void*) funcP, OP_Map);
// ---- mapByKey: same four-overload layout, using OP_MapByKey.
174 template <
typename L,
typename U>
175 indexedFdd<L,U> * mapByKey( ImapByKeyIFunctionP<K,T,L,U> funcP ){
176 return iFddCore<K,T>::template mapI<L,U>((
void*) funcP, OP_MapByKey);
178 template <
typename L,
typename U>
179 indexedFdd<L,U> * mapByKey( IPmapByKeyIFunctionP<K,T,L,U> funcP ){
180 return iFddCore<K,T>::template mapI<L,U>((
void*) funcP, OP_MapByKey);
182 template <
typename L,
typename U>
183 fdd<U> * mapByKey( mapByKeyIFunctionP<K,T,U> funcP ){
184 return iFddCore<K,T>::template map<U>((
void*) funcP, OP_MapByKey);
186 template <
typename L,
typename U>
187 fdd<U> * mapByKey( PmapByKeyIFunctionP<K,T,U> funcP ){
188 return iFddCore<K,T>::template map<U>((
void*) funcP, OP_MapByKey);
// ---- bulkMap: same layout, using OP_BulkMap.
193 template <
typename L,
typename U>
194 indexedFdd<L,U> * bulkMap( IbulkMapIFunctionP<K,T,L,U> funcP ){
195 return iFddCore<K,T>::template mapI<L,U>((
void*) funcP, OP_BulkMap);
197 template <
typename L,
typename U>
198 indexedFdd<L,U> * bulkMap( IPbulkMapIFunctionP<K,T,L,U> funcP ){
199 return iFddCore<K,T>::template mapI<L,U>((
void*) funcP, OP_BulkMap);
201 template <
typename L,
typename U>
202 fdd<U> * bulkMap( bulkMapIFunctionP<K,T,U> funcP ){
203 return iFddCore<K,T>::template map<U>((
void*) funcP, OP_BulkMap);
205 template <
typename L,
typename U>
206 fdd<U> * bulkMap( PbulkMapIFunctionP<K,T,U> funcP ){
207 return iFddCore<K,T>::template map<U>((
void*) funcP, OP_BulkMap);
// ---- flatMap: same layout, using OP_FlatMap.
212 template <
typename L,
typename U>
213 indexedFdd<L,U> * flatMap( IflatMapIFunctionP<K,T,L,U> funcP ){
214 return iFddCore<K,T>::template mapI<L,U>((
void*) funcP, OP_FlatMap);
216 template <
typename L,
typename U>
217 indexedFdd<L,U> * flatMap( IPflatMapIFunctionP<K,T,L,U> funcP ){
218 return iFddCore<K,T>::template mapI<L,U>((
void*) funcP, OP_FlatMap);
220 template <
typename L,
typename U>
221 fdd<U> * flatMap( flatMapIFunctionP<K,T,U> funcP ){
222 return iFddCore<K,T>::template map<U>((
void*) funcP, OP_FlatMap);
224 template <
typename L,
typename U>
225 fdd<U> * flatMap( PflatMapIFunctionP<K,T,U> funcP ){
226 return iFddCore<K,T>::template map<U>((
void*) funcP, OP_FlatMap);
// ---- bulkFlatMap: same layout, using OP_BulkFlatMap.
230 template <
typename L,
typename U>
231 indexedFdd<L,U> * bulkFlatMap( IbulkFlatMapIFunctionP<K,T,L,U> funcP ){
232 return iFddCore<K,T>::template mapI<L,U>((
void*) funcP, OP_BulkFlatMap);
234 template <
typename L,
typename U>
235 indexedFdd<L,U> * bulkFlatMap( IPbulkFlatMapIFunctionP<K,T,L,U> funcP ){
236 return iFddCore<K,T>::template mapI<L,U>((
void*) funcP, OP_BulkFlatMap);
238 template <
typename L,
typename U>
239 fdd<U> * bulkFlatMap( bulkFlatMapIFunctionP<K,T,U> funcP ){
240 return iFddCore<K,T>::template map<U>((
void*) funcP, OP_BulkFlatMap);
242 template <
typename L,
typename U>
243 fdd<U> * bulkFlatMap( PbulkFlatMapIFunctionP<K,T,U> funcP ){
244 return iFddCore<K,T>::template map<U>((
void*) funcP, OP_BulkFlatMap);
// Reduces the whole FDD to a single (key, value) pair with a pairwise function.
250 std::pair<K,T> reduce( IreduceIFunctionP<K,T> funcP ){
251 return reduce((
void*) funcP, OP_Reduce);
// Reduces the whole FDD to a single pair with a function that receives arrays
// of keys and values.
256 std::pair<K,T> bulkReduce( IbulkReduceIFunctionP<K,T> funcP ){
257 return reduce((
void*) funcP, OP_BulkReduce);
// Allocates a vector of this->size pairs and passes it, with this FDD, to
// context->collectFDD. (The return statement is on a line not shown.)
262 std::vector<std::pair<K,T>> collect( ){
264 std::vector<std::pair<K,T>> data(this->size);
265 this->context->collectFDD(data,
this);
// Body not visible in this listing.
269 indexedFdd<K,T> * cache(){
// Pointer-valued variant of the indexed FDD (the out-of-class definitions
// further down name it indexedFdd<K,T*>).
// NOTE(review): the class header line itself, the constructor signatures, and
// most bodies are on lines this listing does not show; comments describe only
// the visible fragments.
275 template <
typename K,
typename T>
// Combines per-process partial results into one (key, value, size) tuple.
278 std::tuple <K,T,size_t> finishReducesP(
char ** partResult,
size_t * pSize,
int funcId, fddOpType op);
// Type-erased reduce driver for the pointer-valued variant.
279 std::tuple <K,T,size_t> reduceP(
void * funcP, fddOpType op);
// Constructor fragment: value type code is the decoded T with the POINTER
// flag set; the id comes from c.createIPFDD.
286 this->_tType = POINTER | decodeType(
typeid(T).hash_code());
287 this->
id = c.createIPFDD(
this,
typeid(K).hash_code(),
typeid(T).hash_code());
// Constructor fragment: as above, additionally passing c.getAllocation(s).
292 this->_tType = POINTER | decodeType(
typeid(T).hash_code());
293 this->
id = c.createIPFDD(
this,
typeid(K).hash_code(),
typeid(T).hash_code(), c.getAllocation(s));
// Constructor fragment: hands keys, data and per-item sizes to the context.
300 c.parallelizeI(this->
id, keys, data, dataSizes, size);
// ---- map overloads (only some signatures are visible; bodies are not shown).
// NOTE(review): in the fdd<U>-returning overloads below, template parameter L
// appears in neither the parameter nor the return type, so it presumably
// cannot be deduced and must be given explicitly.
313 template <
typename L,
typename U>
317 template <
typename L,
typename U>
321 template <
typename L,
typename U>
322 fdd<U> * map( mapIPFunctionP<K,T,U> funcP ){
325 template <
typename L,
typename U>
326 fdd<U> * map( PmapIPFunctionP<K,T,U> funcP ){
// ---- mapByKey overloads.
331 template <
typename L,
typename U>
335 template <
typename L,
typename U>
339 template <
typename L,
typename U>
340 fdd<U> * mapByKey( mapByKeyIPFunctionP<K,T,U> funcP ){
343 template <
typename L,
typename U>
344 fdd<U> * mapByKey( PmapByKeyIPFunctionP<K,T,U> funcP ){
// ---- bulkMap overloads.
349 template <
typename L,
typename U>
353 template <
typename L,
typename U>
357 template <
typename L,
typename U>
358 fdd<U> * bulkMap( bulkMapIPFunctionP<K,T,U> funcP ){
361 template <
typename L,
typename U>
362 fdd<U> * bulkMap( PbulkMapIPFunctionP<K,T,U> funcP ){
// ---- flatMap overloads.
367 template <
typename L,
typename U>
371 template <
typename L,
typename U>
375 template <
typename L,
typename U>
376 fdd<U> * flatMap( flatMapIPFunctionP<K,T,U> funcP){
379 template <
typename L,
typename U>
380 fdd<U> * flatMap( PflatMapIPFunctionP<K,T,U> funcP){
// ---- bulkFlatMap overloads.
385 template <
typename L,
typename U>
386 indexedFdd<L,U> * bulkFlatMap( IbulkFlatMapIPFunctionP<K,T,L,U> funcP){
389 template <
typename L,
typename U>
390 indexedFdd<L,U> * bulkFlatMap( IPbulkFlatMapIPFunctionP<K,T,L,U> funcP){
393 template <
typename L,
typename U>
394 fdd<U> * bulkFlatMap( bulkFlatMapIPFunctionP<K,T,U> funcP){
397 template <
typename L,
typename U>
398 fdd<U> * bulkFlatMap( PbulkFlatMapIPFunctionP<K,T,U> funcP){
// Public reduce entry points.
// NOTE(review): these call a two-argument reduce(void*, fddOpType), but the
// only type-erased driver declared in the visible part of this class is
// reduceP, which returns std::tuple<K,T,size_t> rather than the
// std::vector<std::pair<K,T>> declared here -- confirm these compile when
// instantiated and call the intended function.
405 inline std::vector<std::pair<K,T>> reduce(IPreduceIPFunctionP<K,T> funcP ){
406 return reduce((
void*) funcP, OP_Reduce);
411 inline std::vector<std::pair<K,T>> bulkReduce(IPbulkReduceIPFunctionP<K,T> funcP ){
412 return reduce((
void*) funcP, OP_BulkReduce);
// Allocates a vector of this->size (key, pointer, size) tuples and passes it,
// with this FDD, to context->collectFDD. (The return is on a line not shown.)
417 std::vector<std::tuple<K,T*, size_t>> collect( ) {
418 std::vector<std::tuple<K,T*, size_t>> data(this->size);
419 this->context->collectFDD(data,
this);
// Type-erased map driver.
// NOTE(review): the signature line is not shown in this listing; the names
// used (funcP, op, newFdd, start) match the _map declaration in iFddCore.
// Visible steps: look up the function's id in the context, enqueue the task
// for this FDD targeting newFdd, wait for the result, and -- for operations
// whose output size can differ from the input -- recompute the new FDD's size.
431 template <
typename K,
typename T>
434 unsigned long int tid, sid;
435 unsigned long int newFddId = newFdd->getId();
438 int funcId = context->findFunc(funcP);
441 context->enqueueTask(op,
id, newFddId, funcId, this->size);
444 auto result = context->recvTaskResult(tid, sid, start);
// Only the low byte of op is tested against the size-changing operations.
446 if ( (op & 0xff) & (OP_MapByKey | OP_FlatMapByKey | OP_FlatMap | OP_BulkFlatMap) ) {
// Index 0 is skipped; each non-empty result buffer is read as starting with a
// size_t, and those values are summed.
449 for (
int i = 1; i < context->numProcs(); ++i){
450 if (result[i].second > 0) newSize += * (
size_t*) result[i].first;
453 newFdd->setSize(newSize);
// Creates the destination indexedFdd<L,U> with `new` and delegates the work
// to _map.
// @param funcP  user function, type-erased to void*
// @param op     operation code
// @return the new FDD, i.e. _map's fddBase* result cast back to
//         indexedFdd<L,U>*
462 template <
typename K,
typename T>
463 template <
typename L,
typename U>
464 indexedFdd<L,U> * iFddCore<K,T>::mapI(
void * funcP, fddOpType op){
465 indexedFdd<L,U> * newFdd;
466 auto start = system_clock::now();
// For operations that can change the element count the destination is created
// without a size; the other construction below reuses this FDD's size and
// allocation (the `else` line itself is not shown in this listing).
468 if ( (op & 0xFF ) & (OP_MapByKey | OP_FlatMapByKey | OP_FlatMap | OP_BulkFlatMap) ){
469 newFdd =
new indexedFdd<L,U>(*context);
471 newFdd =
new indexedFdd<L,U>(*context, size, dataAlloc);
474 return (indexedFdd<L,U> *) _map(funcP, op, newFdd, start);
// Creates the destination fdd<U> with `new` and delegates the work to _map.
// @param funcP  user function, type-erased to void*
// @param op     operation code
// @return the new FDD, i.e. _map's fddBase* result cast back to fdd<U>*
// (The declaration of newFdd is on a line not shown in this listing.)
477 template <
typename K,
typename T>
478 template <
typename U>
479 fdd<U> * iFddCore<K,T>::map(
void * funcP, fddOpType op){
481 auto start = system_clock::now();
// Size-changing operations get a destination created without a size; the
// other construction reuses this FDD's size and allocation (the `else` line
// itself is not shown).
483 if ( (op & 0xFF ) & (OP_MapByKey | OP_FlatMapByKey | OP_FlatMap | OP_BulkFlatMap) ){
484 newFdd =
new fdd<U>(*context);
486 newFdd =
new fdd<U>(*context, size, dataAlloc);
489 return (fdd<U> *) _map(funcP, op, newFdd, start);
// Enqueues an OP_CountByKey task for this FDD and merges the per-process
// (key, count) lists into a single map by summing counts per key.
// @return map from key to the summed count (the return statement and the
//         declarations of `key` and the read of `numKeys` are on lines not
//         shown in this listing)
492 template <
typename K,
typename T>
493 std::unordered_map<K,size_t> iFddCore<K,T>::countByKey(){
495 fastCommBuffer decoder(0);
496 unsigned long int tid, sid;
497 std::unordered_map<K,size_t> count;
499 auto start = system_clock::now();
500 context->enqueueTask(OP_CountByKey,
id, this->size);
502 auto result = context->recvTaskResult(tid, sid, start);
// Result index 0 is skipped; each remaining entry is decoded from its buffer.
504 for (
int i = 1; i < context->numProcs(); ++i){
507 size_t kCount, numKeys;
508 decoder.setBuffer(result[i].first, result[i].second);
// NOTE(review): this inner index shadows the outer loop's `i`.
511 for (
size_t i = 0; i < numKeys; ++i ) {
512 decoder >> key >> kCount;
518 count[key] += kCount;
// Assigns every key in `count` to a process index and returns the mapping.
// @param count  per-key tuple; element 0 is read as the key's element count
//               and element 1 as its preferred process
// @return shared key -> process-index map (the return statement and the
//         declarations of `key` are on lines not shown in this listing)
// NOTE(review): `count` is taken by value, so the whole map is copied on each
// call; a const reference would avoid that if the declaration is changed too.
526 template <
typename K,
typename T>
527 std::shared_ptr<std::unordered_map<K, int>> iFddCore<K,T>::calculateKeyMap(std::unordered_map<K, std::tuple<size_t, int, size_t>> count){
528 size_t size = this->size;
529 std::shared_ptr<std::unordered_map<K, int>> kMap = std::make_shared<std::unordered_map<K, int>>();
530 std::unordered_map<K, bool> done;
531 size_t numProcs = context->numProcs();
// keyAlloc[p] accumulates the element count assigned to process p so far;
// procBudget[p] is the allocation the context reports for `size` elements.
532 std::vector<size_t> keyAlloc(numProcs,0);
533 std::vector<size_t> procBudget = context->getAllocation(size);
535 kMap->reserve(count.size());
// First pass: give a key to its preferred process while that process is still
// under budget.
542 for (
auto it = count.begin(); it != count.end(); it++){
544 size_t kCount = std::get<0>(it->second);
545 int preffered = std::get<1>(it->second);
547 if(keyAlloc[preffered] < procBudget[preffered]){
548 (*kMap)[key] = preffered;
549 keyAlloc [preffered] += kCount;
// Second pass: pick a random process in [1, numProcs-1], retrying until one
// under budget is found.
// NOTE(review): `rand() % (numProcs - 1)` divides by zero when numProcs == 1,
// and the retry loop has no visible exit if every process is at or over
// budget -- confirm both cannot happen.
558 for (
auto it = count.begin(); it != count.end(); it++){
561 size_t kCount = std::get<0>(it->second);
562 int preffered = 1 + rand() % (numProcs - 1);
564 while(keyAlloc[preffered] >= procBudget[preffered]){
565 preffered = 1 + rand() % (numProcs - 1);
567 (*kMap)[key] = preffered;
568 keyAlloc [preffered] += kCount;
// Regroups this FDD by key: gathers per-process key counts, derives a
// key -> process map from them, sends that map to the workers with an
// OP_GroupByKey task, and returns this object.
// @return `this`, cast to indexedFdd<K,T>*
585 template <
typename K,
typename T>
586 indexedFdd<K,T> * iFddCore<K,T>::groupByKey(){
587 fastCommBuffer decoder(0);
588 unsigned long int tid, sid;
// Per key: (total count, owner process, owner's count).
// NOTE(review): allocated with raw `new`; no matching delete is visible in
// this listing -- confirm it is released.
593 auto * count =
new std::unordered_map< K, std::tuple<size_t, int, size_t> >();
594 count->reserve(this->size);
596 auto start = system_clock::now();
597 context->enqueueTask(OP_CountByKey,
id, this->size);
599 auto result = context->recvTaskResult(tid, sid, start);
// Result index 0 is skipped, as are processes that returned nothing.
602 for (
int i = 1; i < context->numProcs(); ++i){
604 size_t kCount, numKeys;
606 if (result[i].second == 0)
continue;
608 decoder.setBuffer(result[i].first, result[i].second);
// NOTE(review): this inner index shadows the outer process index `i`.
611 for (
size_t i = 0; i < numKeys; ++i ) {
613 decoder >> key >> kCount;
614 auto it = count->find(key);
// Known key: add to the total and compare against the current owner's count
// (the statements that update the owner are on lines not shown).
616 if (it != count->end()){
618 int &owner = std::get<1>(it->second);
619 size_t &ownerCount = std::get<2>(it->second);
621 std::get<0>(it->second) += kCount;
624 if (kCount > ownerCount){
// New key: the owner is recorded as `sid`.
// NOTE(review): `sid` is the value written by recvTaskResult, not the index of
// the process whose buffer is being decoded -- confirm this is intended.
630 (*count)[key] = std::make_tuple(kCount, sid, kCount);
// calculateKeyMap takes its argument by value, so the map is copied here.
634 this->keyMap = calculateKeyMap(*count);
// NOTE(review): `tid` is declared again here; presumably an inner scope opens
// on a line not shown.
638 unsigned long int tid = context->enqueueTask(OP_GroupByKey,
id, this->size);
639 context->sendKeyMap(tid, *keyMap);
641 result = context->recvTaskResult(tid, sid, start);
645 return (indexedFdd<K,T> *)
this;
// Combines the per-process partial results of a reduce into one pair.
// @param partResult  array of numProcs()-1 result buffers
// @param pSize       size of each buffer
// @param funcId      index of the user function in context->funcTable
// @param op          OP_Reduce selects the pairwise path; the bulk path below
//                    is presumably the `else` branch (that line is not shown)
// @return the combined (key, value) pair (return statement not shown)
648 template <
typename K,
typename T>
649 std::pair <K,T> indexedFdd<K,T>::finishReduces(
char ** partResult,
size_t * pSize,
int funcId, fddOpType op){
650 std::pair <K,T> result;
652 if (op == OP_Reduce){
653 IreduceIFunctionP<K, T> reduceFunc = (IreduceIFunctionP<K, T>) this->context->funcTable[funcId];
654 fastCommBuffer buffer(0);
// The first partial result seeds `result`; the loop folds in the rest.
657 buffer.setBuffer(partResult[0], pSize[0]);
660 for (
int i = 1; i < (this->context->numProcs() - 1); ++i){
664 buffer.setBuffer(partResult[i], pSize[i]);
667 result = reduceFunc(result.first, result.second, pr.first, pr.second);
// Bulk path: gather all partial keys/values into arrays and call the user
// function once.
// NOTE(review): vals and keys are raw new[] allocations; their release is not
// visible in this listing -- confirm they are deleted.
670 IbulkReduceIFunctionP<K, T> bulkReduceFunc = (IbulkReduceIFunctionP<K, T>) this->context->funcTable[funcId];
671 T * vals =
new T[this->context->numProcs() - 1];
672 K * keys =
new K[this->context->numProcs() - 1];
675 for (
int i = 0; i < (this->context->numProcs() - 1); ++i){
676 fastCommBuffer buffer(0);
679 buffer.setBuffer(partResult[i], pSize[i]);
686 result = bulkReduceFunc(keys, vals, this->context->numProcs() - 1);
// Type-erased reduce driver: enqueues the reduce task, collects each
// process's partial result, and combines them with finishReduces.
// @param funcP  user reduce function, type-erased to void*
// @param op     operation code, forwarded to the task and to finishReduces
// @return the combined (key, value) pair (return statement not shown)
696 template <
typename K,
typename T>
697 std::pair <K,T> indexedFdd<K,T>::reduce(
void * funcP, fddOpType op){
699 std::pair <K,T> result;
700 int funcId = this->context->findFunc(funcP);
// One slot per process other than index 0.
701 char ** partResult =
new char*[this->context->numProcs() - 1];
702 size_t * rSize =
new size_t[this->context->numProcs() - 1];
703 unsigned long int tid, sid;
706 auto start = system_clock::now();
707 unsigned long int reduceTaskId UNUSED = this->context->enqueueTask(op, this->
id, 0, funcId, this->size);
710 auto resultV = this->context->recvTaskResult(tid, sid, start);
// Result entry i+1 feeds slot i, i.e. entry 0 of resultV is skipped.
712 for (
int i = 0; i < (this->context->numProcs() - 1); ++i){
713 partResult[i] = (
char*) resultV[i + 1].first;
714 rSize[i] = resultV[i + 1].second;
718 result = finishReduces(partResult, rSize, funcId, op);
// NOTE(review): only partResult's release is visible here; confirm rSize is
// also deleted on a line not shown.
720 delete [] partResult;
// Combines the per-process partial results of a reduce over the
// pointer-valued indexed FDD into one (key, value, size) tuple.
// @param partResult  array of numProcs()-1 result buffers
// @param pSize       size of each buffer
// @param funcId      index of the user function in context->funcTable
// @param op          OP_Reduce selects the pairwise path; the bulk path below
//                    is presumably the `else` branch (that line is not shown)
// @return the combined tuple (return statement not shown in this listing)
// NOTE(review): the embedded line numbers skip, so some statements of the
// original body are not shown here.
732 template <
typename K,
typename T>
733 std::tuple <K,T,size_t> indexedFdd<K,T*>::finishReducesP(
char ** partResult,
size_t * pSize,
int funcId, fddOpType op){
734 std::tuple <K,T,size_t> result;
736 if (op == OP_Reduce){
// Cast to the same function-pointer type as the variable being initialised
// (the previous cast named IreduceIFunctionP, a different pointer type).
737 IPreduceIPFunctionP<K,T> reduceFunc = (IPreduceIPFunctionP<K,T>) this->context->funcTable[funcId];
738 fastCommBuffer buffer(0);
// The first partial result seeds `result`; the loop folds in the rest.
740 buffer.setBuffer(partResult[0], pSize[0]);
744 for (
int i = 1; i < (this->context->numProcs() - 1); ++i){
745 std::tuple <K,T,size_t> pr;
747 buffer.setBuffer(partResult[i], pSize[i]);
// Bulk path: gather every partial (key, value, size) into arrays and call the
// user function once.
// NOTE(review): vals, keys and sizes are raw new[] allocations; their release
// is not visible in this listing -- confirm they are deleted.
759 IPbulkReduceIPFunctionP<K,T> bulkReduceFunc = (IPbulkReduceIPFunctionP<K,T>) this->context->funcTable[funcId];
760 T * vals =
new T[this->context->numProcs() - 1];
761 K * keys =
new K[this->context->numProcs() - 1];
762 size_t * sizes =
new size_t[this->context->numProcs() - 1];
// Start at 0: all numProcs()-1 slots are handed to bulkReduceFunc below, so
// slot 0 must be filled as well (matches indexedFdd<K,T>::finishReduces).
765 for (
int i = 0; i < (this->context->numProcs() - 1); ++i){
766 fastCommBuffer buffer(0);
767 std::tuple <K,T,size_t> pr;
769 buffer.setBuffer(partResult[i], pSize[i]);
772 std::tie (keys[i], vals[i], sizes[i]) = pr;
775 result = bulkReduceFunc(keys, vals, sizes, this->context->numProcs() - 1);
// Type-erased reduce driver for the pointer-valued indexed FDD: enqueues the
// reduce task, collects each process's partial result, and combines them with
// finishReducesP.
// @param funcP  user reduce function, type-erased to void*
// @param op     operation code, forwarded to the task and to finishReducesP
// @return the combined (key, value, size) tuple (return statement not shown)
// NOTE(review): the embedded line numbers skip, so some statements of the
// original body are not shown here.
782 template <
typename K,
typename T>
783 std::tuple <K,T,size_t> indexedFdd<K,T*>::reduceP(
void * funcP, fddOpType op){
784 auto start = system_clock::now();
786 std::tuple <K,T,size_t> result;
787 unsigned long int tid, sid;
788 int funcId = this->context->findFunc(funcP);
// One slot per process other than index 0.
789 char ** partResult =
new char *[this->context->numProcs() - 1];
790 size_t * partrSize =
new size_t[this->context->numProcs() - 1];
793 unsigned long int reduceTaskId = this->context->enqueueTask(op, this->
id, 0, funcId, this->size);
796 auto resultV = this->context->recvTaskResult(tid, sid, start);
// Result entry i+1 feeds slot i, i.e. entry 0 of resultV is skipped.
798 for (
int i = 0; i < (this->context->numProcs() - 1); ++i){
// partResult holds char* slots, so cast to char* (a T* is not convertible to
// char*); this matches indexedFdd<K,T>::reduce.
799 partResult[i] = (char*) resultV[i+1].first;
// NOTE(review): this divides the stored size in place by sizeof(T), whereas
// indexedFdd<K,T>::reduce forwards the size unchanged and finishReducesP
// passes it to fastCommBuffer::setBuffer -- confirm which unit is expected.
800 partrSize[i] = resultV[i+1].second /=
sizeof(T);
804 result = finishReducesP(partResult, partrSize, funcId, op);
// NOTE(review): only partResult's release is visible here; confirm partrSize
// is also deleted on a line not shown.
806 delete [] partResult;
Definition: indexedFdd.h:276
Definition: fastContext.h:26
Definition: fastContext.h:54
Definition: fastContext.h:23
Definition: _workerFdd.h:11