1 #ifndef LIBFASTER_INDEXEDFDD_H 2 #define LIBFASTER_INDEXEDFDD_H 11 #include "definitions.h" 13 #include "fastContext.h" 23 template <
typename K,
typename T>
28 template <
typename K,
typename T>
29 class iFddCore :
public fddBase{
34 fastContext * context;
37 _kType = decodeType(
typeid(K).hash_code());
44 iFddCore(fastContext &c) {
45 _kType = decodeType(
typeid(K).hash_code());
53 iFddCore(fastContext &c,
size_t s,
const std::vector<size_t> & dataAlloc) {
54 _kType = decodeType(
typeid(K).hash_code());
60 this->dataAlloc = dataAlloc;
66 std::unordered_map<K, std::tuple<size_t, int, size_t>> * calculateKeyCount(std::vector< std::pair<void *, size_t> > & result);
67 std::unordered_map<K, int> calculateKeyMap(std::unordered_map<K, std::tuple<size_t, int, size_t>> & count);
71 fddBase * _map(
void * funcP,
fddOpType op, fddBase * newFdd, system_clock::time_point & start);
73 fdd<U> * map(
void * funcP,
fddOpType op);
74 template <
typename L,
typename U>
75 indexedFdd<L,U> * mapI(
void * funcP,
fddOpType op);
77 indexedFdd<K,T> * groupByKeyMapped();
78 indexedFdd<K,T> * groupByKeyHashed();
98 auto start = system_clock::now();
113 template<
typename U,
typename V>
117 auto start = system_clock::now();
137 context->discardFDD(
id);
147 void writeToFile(std::string path, std::string sufix);
166 template <
typename K,
typename T>
169 std::pair <K,T> finishReduces(
char ** partResult,
size_t * pSize,
int funcId,
fddOpType op);
170 std::pair <K,T> reduce(
void * funcP,
fddOpType op);
179 this->_tType = decodeType(
typeid(T).hash_code());
180 this->
id = c.createIFDD(
this,
typeid(K).hash_code(),
typeid(T).hash_code());
185 this->_tType = decodeType(
typeid(T).hash_code());
186 this->
id = c.createIFDD(
this,
typeid(K).hash_code(),
typeid(T).hash_code(), dataAlloc);
194 c.parallelizeI(this->
id, keys, data, size);
213 std::vector<std::pair<K,T>> data(this->size);
214 this->context->collectFDD(data,
this);
242 template <
typename L,
typename U>
247 template <
typename L,
typename U>
252 template <
typename U>
257 template <
typename U>
265 template <
typename L,
typename U>
271 template <
typename L,
typename U>
277 template <
typename L,
typename U>
283 template <
typename L,
typename U>
292 template <
typename L,
typename U>
298 template <
typename L,
typename U>
304 template <
typename L,
typename U>
310 template <
typename L,
typename U>
322 template <
typename L,
typename U>
327 template <
typename L,
typename U>
332 template <
typename L,
typename U>
337 template <
typename L,
typename U>
347 template <
typename L,
typename U>
353 template <
typename L,
typename U>
359 template <
typename L,
typename U>
365 template <
typename L,
typename U>
377 std::pair<K,T>
reduce( IreduceIFunctionP<K,T> funcP ){
378 return reduce((
void*) funcP, OP_Reduce);
387 std::pair<K,T>
bulkReduce( IbulkReduceIFunctionP<K,T> funcP ){
388 return reduce((
void*) funcP, OP_BulkReduce);
394 template <
typename K,
typename T>
397 std::tuple <K,T,size_t> finishReducesP(
char ** partResult,
size_t * pSize,
int funcId,
fddOpType op);
398 std::tuple <K,T,size_t> reduceP(
void * funcP,
fddOpType op);
405 this->_tType = POINTER | decodeType(
typeid(T).hash_code());
406 this->
id = c.createIPFDD(
this,
typeid(K).hash_code(),
typeid(T).hash_code());
411 this->_tType = POINTER | decodeType(
typeid(T).hash_code());
412 this->
id = c.createIPFDD(
this,
typeid(K).hash_code(),
typeid(T).hash_code(), c.getAllocation(s));
419 c.parallelizeI(this->
id, keys, data, dataSizes, size);
432 template <
typename L,
typename U>
436 template <
typename L,
typename U>
440 template <
typename L,
typename U>
441 fdd<U> * map( mapIPFunctionP<K,T,U> funcP ){
444 template <
typename L,
typename U>
445 fdd<U> * map( PmapIPFunctionP<K,T,U> funcP ){
450 template <
typename L,
typename U>
454 template <
typename L,
typename U>
458 template <
typename L,
typename U>
459 fdd<U> * mapByKey( mapByKeyIPFunctionP<K,T,U> funcP ){
462 template <
typename L,
typename U>
463 fdd<U> * mapByKey( PmapByKeyIPFunctionP<K,T,U> funcP ){
468 template <
typename L,
typename U>
472 template <
typename L,
typename U>
476 template <
typename L,
typename U>
477 fdd<U> * bulkMap( bulkMapIPFunctionP<K,T,U> funcP ){
480 template <
typename L,
typename U>
481 fdd<U> * bulkMap( PbulkMapIPFunctionP<K,T,U> funcP ){
486 template <
typename L,
typename U>
490 template <
typename L,
typename U>
494 template <
typename L,
typename U>
495 fdd<U> * flatMap( flatMapIPFunctionP<K,T,U> funcP){
498 template <
typename L,
typename U>
499 fdd<U> * flatMap( PflatMapIPFunctionP<K,T,U> funcP){
504 template <
typename L,
typename U>
505 indexedFdd<L,U> * bulkFlatMap( IbulkFlatMapIPFunctionP<K,T,L,U> funcP){
508 template <
typename L,
typename U>
509 indexedFdd<L,U> * bulkFlatMap( IPbulkFlatMapIPFunctionP<K,T,L,U> funcP){
512 template <
typename L,
typename U>
513 fdd<U> * bulkFlatMap( bulkFlatMapIPFunctionP<K,T,U> funcP){
516 template <
typename L,
typename U>
517 fdd<U> * bulkFlatMap( PbulkFlatMapIPFunctionP<K,T,U> funcP){
524 inline std::vector<std::pair<K,T>> reduce(IPreduceIPFunctionP<K,T> funcP ){
525 return reduce((
void*) funcP, OP_Reduce);
530 inline std::vector<std::pair<K,T>> bulkReduce(IPbulkReduceIPFunctionP<K,T> funcP ){
531 return reduce((
void*) funcP, OP_BulkReduce);
536 std::vector<std::tuple<K,T*, size_t>> collect( ) {
537 std::vector<std::tuple<K,T*, size_t>> data(this->size);
538 this->context->collectFDD(data,
this);
550 template <
typename K,
typename T>
553 unsigned long int tid, sid;
554 auto start = system_clock::now();
557 int funcId = context->findFunc(funcP);
560 context->enqueueTask(op,
id, 0, funcId, this->size);
563 auto result = context->recvTaskResult(tid, sid, start);
571 template <
typename K,
typename T>
574 unsigned long int tid, sid;
575 unsigned long int newFddId = newFdd->
getId();
577 context->setInternal(newFdd->
getId(),
true);
580 int funcId = context->findFunc(funcP);
583 context->enqueueTask(op,
id, newFddId, funcId, this->size);
586 auto result = context->recvTaskResult(tid, sid, start);
588 if ( (op & 0xff) & (OP_MapByKey | OP_FlatMapByKey | OP_FlatMap | OP_BulkFlatMap) ) {
591 for (
int i = 1; i < context->numProcs(); ++i){
592 if (result[i].second > 0) newSize += * (
size_t*) result[i].first;
595 newFdd->setSize(newSize);
604 template <
typename K,
typename T>
605 template <
typename L,
typename U>
608 auto start = system_clock::now();
610 if ( (op & 0xFF ) & (OP_MapByKey | OP_FlatMapByKey | OP_FlatMap | OP_BulkFlatMap) ){
613 if (dataAlloc.empty()) dataAlloc = context->getAllocation(size);
620 template <
typename K,
typename T>
621 template <
typename U>
624 auto start = system_clock::now();
626 if ( (op & 0xFF ) & (OP_MapByKey | OP_FlatMapByKey | OP_FlatMap | OP_BulkFlatMap) ){
627 newFdd =
new fdd<U>(*context);
629 if (dataAlloc.empty()) dataAlloc = context->getAllocation(size);
630 newFdd =
new fdd<U>(*context, size, dataAlloc);
633 return (
fdd<U> *) _map(funcP, op, newFdd, start);
636 template <
typename K,
typename T>
640 unsigned long int tid, sid;
641 std::unordered_map<K,size_t> count;
643 auto start = system_clock::now();
644 context->enqueueTask(OP_CountByKey,
id, this->size);
646 auto result = context->recvTaskResult(tid, sid, start);
648 for (
int i = 1; i < context->numProcs(); ++i){
651 size_t kCount, numKeys;
652 decoder.setBuffer(result[i].first, result[i].second);
655 for (
size_t i = 0; i < numKeys; ++i ) {
656 decoder >> key >> kCount;
662 count[key] += kCount;
672 template <
typename K,
typename T>
676 auto * count =
new std::unordered_map< K, std::tuple<size_t, int, size_t> >();
677 count->reserve(this->size);
679 for (
int i = 1; i < context->numProcs(); ++i){
681 size_t kCount, numKeys;
683 if (result[i].second == 0)
continue;
685 decoder.setBuffer(result[i].first, result[i].second);
688 for (
size_t j = 0; j < numKeys; ++j ) {
690 decoder >> key >> kCount;
691 auto it = count->find(key);
693 if (it != count->end()){
695 int &owner = std::get<1>(it->second);
696 size_t &ownerCount = std::get<2>(it->second);
698 std::get<0>(it->second) += kCount;
701 if (kCount > ownerCount){
707 (*count)[key] = std::make_tuple(kCount, i, kCount);
714 template <
typename K,
typename T>
716 size_t size = this->size;
717 std::unordered_map<K, int> kMap(count.size());
718 std::unordered_map<K, bool> done;
719 size_t numProcs = context->numProcs();
720 std::vector<size_t> keyAlloc(numProcs,0);
721 std::vector<size_t> procBudget = context->getAllocation(size);
729 for (
auto it = count.begin(); it != count.end(); it++){
731 size_t kCount = std::get<0>(it->second);
732 int preffered = std::get<1>(it->second);
734 if(keyAlloc[preffered] < procBudget[preffered]){
735 kMap[key] = preffered;
736 keyAlloc [preffered] += kCount;
745 for (
auto it = count.begin(); it != count.end(); it++){
748 size_t kCount = std::get<0>(it->second);
749 int preffered = 1 + rand() % (numProcs - 1);
751 while(keyAlloc[preffered] >= (procBudget[preffered] + 1)){
753 preffered = (preffered + 1) % numProcs;
755 kMap[key] = preffered;
756 keyAlloc [preffered] += kCount;
773 template <
typename K,
typename T>
775 unsigned long int tid, sid;
778 using std::chrono::system_clock;
779 using std::chrono::duration_cast;
780 using std::chrono::milliseconds;
781 std::cerr <<
" GroupByKey ";
782 auto start = system_clock::now();
784 context->enqueueTask(OP_CountByKey,
id, this->size);
786 auto result = context->recvTaskResult(tid, sid, start);
787 std::cerr <<
" CBK:" << duration_cast<milliseconds>(system_clock::now() - start).count();
788 start = system_clock::now();
791 auto * count = calculateKeyCount(result);
792 std::cerr <<
" proc.Keys:" << duration_cast<milliseconds>(system_clock::now() - start).count();
793 auto start2 = system_clock::now();
795 std::unordered_map<K, int> keyMap = calculateKeyMap(*count);
797 std::cerr <<
" calc.KeyMap:" << duration_cast<milliseconds>(system_clock::now() - start2).count();
798 start2 = system_clock::now();
801 unsigned long int tid = context->enqueueTask(OP_GroupByKey,
id, this->size);
802 std::cerr <<
" enq.Task:" << duration_cast<milliseconds>(system_clock::now() - start2).count();
803 start2 = system_clock::now();
804 context->sendKeyMap(tid, keyMap);
806 std::cerr <<
" snd.KeyMap:" << duration_cast<milliseconds>(system_clock::now() - start2).count();
808 dataAlloc.resize(context->numProcs());
809 result = context->recvTaskResult(tid, sid, start);
811 for (
int i = 1; i < context->numProcs(); ++i){
812 if (result[i].second > 0){
813 dataAlloc[i] = * (
size_t*) result[i].first;
814 newSize += dataAlloc[i];
823 template <
typename K,
typename T>
825 unsigned long int tid, sid;
828 auto start = system_clock::now();
833 tid = context->enqueueTask(OP_GroupByKeyH,
id, this->size);
835 auto result = context->recvTaskResult(tid, sid, start);
837 dataAlloc.resize(context->numProcs());
838 for (
int i = 1; i < context->numProcs(); ++i){
839 if (result[i].second > 0){
840 dataAlloc[i] = * (
size_t*) result[i].first;
841 newSize += dataAlloc[i];
852 template <
typename K,
typename T>
855 return groupByKeyMapped();
857 return groupByKeyHashed();
860 template <
typename K,
typename T>
862 context->writeToFile(
id, path, sufix);
866 template <
typename K,
typename T>
868 std::pair <K,T> result;
870 if (op == OP_Reduce){
871 IreduceIFunctionP<K, T> reduceFunc = (IreduceIFunctionP<K, T>) this->context->funcTable[funcId];
875 buffer.setBuffer(partResult[0], pSize[0]);
878 for (
int i = 1; i < (this->context->numProcs() - 1); ++i){
882 buffer.setBuffer(partResult[i], pSize[i]);
885 result = reduceFunc(result.first, result.second, pr.first, pr.second);
888 IbulkReduceIFunctionP<K, T> bulkReduceFunc = (IbulkReduceIFunctionP<K, T>) this->context->funcTable[funcId];
889 T * vals =
new T[this->context->numProcs() - 1];
890 K * keys =
new K[this->context->numProcs() - 1];
893 for (
int i = 0; i < (this->context->numProcs() - 1); ++i){
897 buffer.setBuffer(partResult[i], pSize[i]);
904 result = bulkReduceFunc(keys, vals, this->context->numProcs() - 1);
914 template <
typename K,
typename T>
917 std::pair <K,T> result;
918 int funcId = this->context->findFunc(funcP);
919 char ** partResult =
new char*[this->context->numProcs() - 1];
920 size_t * rSize =
new size_t[this->context->numProcs() - 1];
921 unsigned long int tid, sid;
924 auto start = system_clock::now();
925 unsigned long int reduceTaskId UNUSED = this->context->enqueueTask(op, this->
id, 0, funcId, this->size);
928 auto resultV = this->context->recvTaskResult(tid, sid, start);
930 for (
int i = 0; i < (this->context->numProcs() - 1); ++i){
931 partResult[i] = (
char*) resultV[i + 1].first;
932 rSize[i] = resultV[i + 1].second;
936 result = finishReduces(partResult, rSize, funcId, op);
938 delete [] partResult;
950 template <
typename K,
typename T>
952 std::tuple <K,T,size_t> result;
954 if (op == OP_Reduce){
955 IPreduceIPFunctionP<K,T> reduceFunc = (IreduceIFunctionP<K,T>) this->context->funcTable[funcId];
958 buffer.setBuffer(partResult[0], pSize[0]);
962 for (
int i = 1; i < (this->context->numProcs() - 1); ++i){
963 std::tuple <K,T,size_t> pr;
965 buffer.setBuffer(partResult[i], pSize[i]);
977 IPbulkReduceIPFunctionP<K,T> bulkReduceFunc = (IPbulkReduceIPFunctionP<K,T>) this->context->funcTable[funcId];
978 T * vals =
new T[this->context->numProcs() - 1];
979 K * keys =
new K[this->context->numProcs() - 1];
980 size_t * sizes =
new size_t[this->context->numProcs() - 1];
983 for (
int i = 1; i < (this->context->numProcs() - 1); ++i){
985 std::tuple <K,T,size_t> pr;
987 buffer.setBuffer(partResult[i], pSize[i]);
990 std::tie (keys[i], vals[i], sizes[i]) = pr;
993 result = bulkReduceFunc(keys, vals, sizes, this->context->numProcs() - 1);
1000 template <
typename K,
typename T>
1002 auto start = system_clock::now();
1004 std::tuple <K,T,size_t> result;
1005 unsigned long int tid, sid;
1006 int funcId = this->context->findFunc(funcP);
1007 char ** partResult =
new char *[this->context->numProcs() - 1];
1008 size_t * partrSize =
new size_t[this->context->numProcs() - 1];
1011 unsigned long int reduceTaskId = this->context->enqueueTask(op, this->
id, 0, funcId, this->size);
1014 auto resultV = this->context->recvTaskResult(tid, sid, start);
1016 for (
int i = 0; i < (this->context->numProcs() - 1); ++i){
1017 partResult[i] = (T*) resultV[i+1].first;
1018 partrSize[i] = resultV[i+1].second /=
sizeof(T);
1022 result = finishReducesP(partResult, partrSize, funcId, op);
1024 delete [] partResult;
1025 delete [] partrSize;
unsigned int fddOpType
Dataset operation type.
fdd< U > * map(mapIFunctionP< K, T, U > funcP)
creates a fdd<U>
indexedFdd< L, U > * mapByKey(IPmapByKeyIFunctionP< K, T, L, U > funcP)
creates an indexedFdd<L,U*>
indexedFdd(fastContext &c, std::string)
Creates an indexedFdd from a file.
indexedFdd< L, U > * flatMap(IflatMapIFunctionP< K, T, L, U > funcP)
creates an indexedFdd<L,U>
fdd< U > * mapByKey(PmapByKeyIFunctionP< K, T, U > funcP)
creates a fdd<U *>
void setGroupedByMap(bool gbm)
(UNUSED)
fdd< U > * bulkMap(bulkMapIFunctionP< K, T, U > funcP)
creates a fdd<U>
indexedFdd(fastContext &c)
Creates an empty indexedFdd.
std::unordered_map< K, size_t > countByKey()
Counts how many times each key occurs in the dataset.
indexedFdd< K, T > * update(updateIFunctionP< K, T > funcP)
Updates the contents of an indexedFdd.
int getId()
Returns the identification number of the dataset.
void writeToFile(std::string path, std::string sufix)
Writes FDD content to file.
indexedFdd(fastContext &c, size_t s)
Creates an empty indexedFdd with a preallocated size.
std::vector< std::pair< K, T > > collect()
Brings the distributed data of an indexedFdd into driver memory.
indexedFdd< L, U > * mapByKey(ImapByKeyIFunctionP< K, T, L, U > funcP)
creates an indexedFdd<L,U>
std::pair< K, T > bulkReduce(IbulkReduceIFunctionP< K, T > funcP)
summarizes an indexedFdd<K,T> into a single pair<K,T> using a bulk function pair<K,T> F(K...
void setGroupedByKey(bool gbk)
(UNUSED)
indexedFdd< K, T > * groupByKey()
Groups distributed dataset by key.
bool isGroupedByKey()
Determines if a dataset is grouped by key.
fdd< U > * map(PmapIFunctionP< K, T, U > funcP)
creates a fdd<U *>
std::pair< K, T > reduce(IreduceIFunctionP< K, T > funcP)
summarizes an indexedFdd<K,T> into a single pair<K,T>
groupedFdd< K > * cogroup(iFddCore< K, U > *fdd1, iFddCore< K, V > *fdd2)
Groups three datasets together according to the keys of the first dataset.
indexedFdd< L, U > * map(ImapIFunctionP< K, T, L, U > funcP)
creates an indexedFdd<L,U>
fdd< U > * bulkFlatMap(PbulkFlatMapIFunctionP< K, T, U > funcP)
creates a fdd<U *>
indexedFdd< L, U > * flatMap(IPflatMapIFunctionP< K, T, L, U > funcP)
creates an indexedFdd<L,U*>
void discard()
Deallocates a previously cached FDD.
fdd< U > * bulkMap(PbulkMapIFunctionP< K, T, U > funcP)
creates a fdd<U *>
indexedFdd< L, U > * bulkFlatMap(IbulkFlatMapIFunctionP< K, T, L, U > funcP)
creates an indexedFdd<L,U>
indexedFdd< L, U > * bulkMap(IPbulkMapIFunctionP< K, T, L, U > funcP)
creates an indexedFdd<L,U*>
groupedFdd< K > * cogroup(iFddCore< K, U > *fdd1)
Groups two datasets together according to the keys of the first dataset.
fdd< U > * flatMap(flatMapIFunctionP< K, T, U > funcP)
creates a fdd<U>
~indexedFdd()
Class Destructor. WARNING: It will deallocate distributed memory.
Fast Distributed Dataset (FDD) is like an array distributed across a cluster. This class is the user-side impl...
fdd< U > * mapByKey(mapByKeyIFunctionP< K, T, U > funcP)
creates a fdd<U>
indexedFdd< L, U > * bulkMap(IbulkMapIFunctionP< K, T, L, U > funcP)
creates an indexedFdd<L,U>
indexedFdd(fastContext &c, K *keys, T *data, size_t size)
Creates an indexedFdd from key and data arrays in memory.
indexedFdd< L, U > * bulkFlatMap(IPbulkFlatMapIFunctionP< K, T, L, U > funcP)
creates an indexedFdd<L,U*>
fdd< U > * bulkFlatMap(bulkFlatMapIFunctionP< K, T, U > funcP)
creates a fdd<U>
indexedFdd(fastContext &c, size_t s, const std::vector< size_t > &dataAlloc)
Creates an empty indexedFdd with a preallocated size.
indexedFdd< L, U > * map(IPmapIFunctionP< K, T, L, U > funcP)
creates an indexedFdd<L,U*>
indexedFdd< K, T > * cache()
Prevents automatic memory deallocation from happening.
fdd< U > * flatMap(PflatMapIFunctionP< K, T, U > funcP)
creates a fdd<U *>