1 #ifndef LIBFASTER_FDD_H 2 #define LIBFASTER_FDD_H 11 #include "definitions.h" 13 #include "fastContext.h" 19 template <
class K,
class T>
45 template <
typename L,
typename U>
55 context->discardFDD(
id);
65 void writeToFile(std::string & path, std::string & sufix);
87 T finishReduces(
char ** partResult,
size_t * pSize,
int funcId,
fddOpType op);
94 this->
id = c.createFDD(
this,
typeid(T).hash_code());
99 this->
id = c.createFDD(
this,
typeid(T).hash_code(), dataAlloc);
119 assign(data.data(), data.size());
137 std::vector<T> data(this->size);
138 this->context->collectFDD(data,
this);
163 template <
typename U>
168 template <
typename U>
173 template <
typename L,
typename U>
178 template <
typename L,
typename U>
185 template <
typename U>
191 template <
typename U>
197 template <
typename L,
typename U>
203 template <
typename L,
typename U>
214 template <
typename U>
219 template <
typename U>
224 template <
typename L,
typename U>
228 template <
typename L,
typename U>
237 template <
typename U>
243 template <
typename U>
249 template <
typename L,
typename U>
255 template <
typename L,
typename U>
266 return reduce((
void*) funcP, OP_Reduce);
272 return reduce((
void*) funcP, OP_BulkReduce);
282 std::vector <T> finishPReduces(T ** partResult,
size_t * partrSize,
int funcId,
fddOpType op);
283 std::vector <T> reduceP(
void * funcP,
fddOpType op);
289 this->
id = c.createPFDD(
this,
typeid(T).hash_code());
294 this->
id = c.createPFDD(
this,
typeid(T).hash_code(), dataAlloc);
300 c.parallelize(fddBase::id, data, dataSizes, size);
310 template <
typename U>
311 fdd<U> * map( mapPFunctionP<T,U> funcP ){
314 template <
typename U>
315 fdd<U> * map( PmapPFunctionP<T,U> funcP ){
318 template <
typename L,
typename U>
322 template <
typename L,
typename U>
328 template <
typename U>
329 fdd<U> * bulkMap( bulkMapPFunctionP<T,U> funcP ){
332 template <
typename U>
333 fdd<U> * bulkMap( PbulkMapPFunctionP<T,U> funcP ){
336 template <
typename L,
typename U>
340 template <
typename L,
typename U>
346 template <
typename U>
347 fdd<U> * flatMap( flatMapPFunctionP<T,U> funcP){
350 template <
typename U>
351 fdd<U> * flatMap( PflatMapPFunctionP<T,U> funcP){
354 template <
typename L,
typename U>
358 template <
typename L,
typename U>
364 template <
typename U>
365 fdd<U> * bulkFlatMap( bulkFlatMapPFunctionP<T,U> funcP){
368 template <
typename U>
369 fdd<U> * bulkFlatMap( PbulkFlatMapPFunctionP<T,U> funcP){
372 template <
typename L,
typename U>
376 template <
typename L,
typename U>
382 inline std::vector<T> reduce(PreducePFunctionP<T> funcP ){
383 return reduceP((
void*) funcP, OP_Reduce);
385 inline std::vector<T> bulkReduce(PbulkReducePFunctionP<T> funcP ){
386 return reduceP((
void*) funcP, OP_BulkReduce);
391 std::vector<std::pair<T*, size_t>> collect( ) {
392 std::vector<std::pair<T*, size_t>> data(this->size);
393 this->context->collectFDD(data,
this);
406 template <
typename T>
412 template <
typename T>
415 this->dataAlloc = dataAlloc;
419 template <
typename T>
421 unsigned long int tid, sid;
423 context->setInternal(newFdd->
getId(),
true);
425 unsigned long int newFddId = newFdd->
getId();
428 int funcId = context->findFunc(funcP);
432 auto start = system_clock::now();
433 context->enqueueTask(op,
id, newFddId, funcId, this->size);
437 auto result = context->recvTaskResult(tid, sid, start);
439 if ( (op & 0xff) & (OP_FlatMap | OP_BulkFlatMap) ) {
441 for (
int i = 1; i < context->numProcs(); ++i){
442 if (result[i].second > 0) newSize += * (
size_t *) result[i].first;
445 newFdd->setSize(newSize);
456 template <
typename T>
457 template <
typename U>
461 if ( (op & 0xFF ) & (OP_FlatMap | OP_BulkFlatMap) ){
462 newFdd =
new fdd<U>(*context);
464 if (dataAlloc.empty()) dataAlloc = context->getAllocation(size);
465 newFdd =
new fdd<U>(*context, size, dataAlloc);
468 return (
fdd<U> *) _map(funcP, op, newFdd);
471 template <
typename T>
473 context->writeToFile(
id, path, sufix);
476 template <
typename T>
477 template <
typename L,
typename U>
481 if ( (op & 0xFF ) & (OP_FlatMap | OP_BulkFlatMap) ){
484 if (dataAlloc.empty()) dataAlloc = context->getAllocation(size);
491 template <
typename T>
496 if (op == OP_Reduce){
497 reduceFunctionP<T> reduceFunc = (reduceFunctionP<T>) this->context->funcTable[funcId];
500 buffer.setBuffer(partResult[0], pSize[0]);
503 for (
int i = 1; i < (this->context->numProcs() - 1); ++i){
507 if (pSize[i] == 0) std::cerr <<
"UNEXPECTED ERROR!!!!";
509 buffer.setBuffer(partResult[i], pSize[i]);
512 result = reduceFunc(result, pr);
515 bulkReduceFunctionP<T> bulkReduceFunc = (bulkReduceFunctionP<T>) this->context->funcTable[funcId];
516 T * vals =
new T[this->context->numProcs() - 1];
519 for (
int i = 1; i < (this->context->numProcs() - 1); ++i){
522 buffer.setBuffer(partResult[i], pSize[i]);
526 result = bulkReduceFunc(vals, this->context->numProcs() - 1);
533 template <
typename T>
538 unsigned long int tid, sid;
539 int funcId = this->context->findFunc(funcP);
541 char ** partResult =
new char*[this->context->numProcs() - 1];
542 size_t * rSize =
new size_t[this->context->numProcs() - 1];
546 auto start = system_clock::now();
547 this->context->enqueueTask(op, this->
id, 0, funcId, this->size);
550 auto resultV = this->context->recvTaskResult(tid, sid, start);
552 for (
int i = 0; i < (this->context->numProcs() - 1); ++i){
553 partResult[i] = (
char*) resultV[i + 1].first;
554 rSize[i] = resultV[i + 1].second;
558 result = finishReduces(partResult, rSize, funcId, op);
560 delete [] partResult;
570 template <
typename T>
575 if (op == OP_Reduce){
579 PreducePFunctionP<T> reduceFunc = ( PreducePFunctionP<T> ) this->context->funcTable[funcId];
581 result = partResult[0];
582 rSize = partrSize[0];
585 for (
int i = 1; i < (this->context->numProcs() - 1); ++i){
588 std::pair<T*,size_t> r = reduceFunc(pResult, prSize, partResult[i], partrSize[i]);
594 PbulkReducePFunctionP<T> bulkReduceFunc = (PbulkReducePFunctionP<T>) this->context->funcTable[funcId];
595 std::pair<T*,size_t> r = bulkReduceFunc(partResult, partrSize, this->context->numProcs() - 1);
600 std::vector<T> vResult(rSize);
601 vResult.assign(result, result + rSize);
606 template <
typename T>
611 int funcId = this->context->findFunc(funcP);
612 T ** partResult =
new T*[this->context->numProcs() -1];
613 size_t * partrSize =
new size_t[this->context->numProcs() - 1];
614 unsigned long int tid, sid;
619 auto start = system_clock::now();
620 this->context->enqueueTask(op, this->
id, 0, funcId, this->size);
623 auto resultV = this->context->recvTaskResult(tid, sid, start);
625 for (
int i = 0; i < (this->context->numProcs() - 1); ++i){
626 partResult[i] = (T*) resultV[i + 1].first;
627 partrSize[i] = resultV[i + 1].second /=
sizeof(T);
632 std::vector<T> vResult = finishPReduces(partResult, partrSize, funcId, op);
634 delete [] partResult;
644 template <
typename T>
647 this->
id = c.readFDD(
this, fileName);
650 this->context->getFDDInfo(this->size, this->dataAlloc);
fdd< U > * bulkFlatMap(PbulkFlatMapFunctionP< T, U > funcP)
creates a fdd<U *>
fdd< U > * flatMap(PflatMapFunctionP< T, U > funcP)
creates a fdd<U *>
unsigned int fddOpType
Dataset operation type.
fdd(fastContext &c, T *data, size_t size)
Create a fdd from an array in memory.
indexedFdd< L, U > * map(IPmapFunctionP< T, L, U > funcP)
creates an indexedFdd<L,U*>
indexedFdd< L, U > * bulkFlatMap(IbulkFlatMapFunctionP< T, L, U > funcP)
creates an indexedFdd<L,U>
void writeToFile(std::string &path, std::string &sufix)
Writes FDD content to file.
T bulkReduce(bulkReduceFunctionP< T > funcP)
summarizes a fdd<T> into a single value of type T using a bulk function that receives an array of T values and the number of elements in it
~fdd()
Class destructor. WARNING: It will deallocate distributed memory.
void setKeyMap(void *keyMap UNUSED)
(UNUSED)
core class that implements simple operations.
indexedFdd< L, U > * bulkFlatMap(IPbulkFlatMapFunctionP< T, L, U > funcP)
creates an indexedFdd<L,U*>
int getId()
Returns the identification number of the dataset.
bool isGroupedByKey()
(UNUSED)
fdd(fastContext &c, std::vector< T > &dataV)
Create a fdd from a vector in memory.
void discard()
Deallocates a previously cached fdd.
std::vector< T > collect()
Brings the distributed data from a FDD to the driver memory.
void setGroupedByKey(bool gbk UNUSED)
(UNUSED)
fdd< U > * bulkFlatMap(bulkFlatMapFunctionP< T, U > funcP)
creates a fdd<U>
void assign(T *data, size_t size)
Assign the fdd content from an array.
T reduce(reduceFunctionP< T > funcP)
summarizes a fdd<T> into a single value of type T
fdd< U > * map(mapFunctionP< T, U > funcP)
creates a fdd<U>
indexedFdd< L, U > * flatMap(IPflatMapFunctionP< T, L, U > funcP)
creates an indexedFdd<L,U*>
fdd(fastContext &c)
Create an empty fdd.
indexedFdd< L, U > * bulkMap(IbulkMapFunctionP< T, L, U > funcP)
creates an indexedFdd<L,U>
fdd(fastContext &c, size_t s)
Create an empty fdd with a pre-allocated size.
fdd(fastContext &c, size_t s, const std::vector< size_t > &dataAlloc)
Create an empty fdd with a pre-allocated size.
void assign(std::vector< T > &data)
Assign the fdd content from a vector.
fdd< U > * bulkMap(bulkMapFunctionP< T, U > funcP)
creates a fdd<U>
Fast Distributed Dataset (FDD) is like a cluster-distributed array. This class is the user side impl...
indexedFdd< L, U > * flatMap(IflatMapFunctionP< T, L, U > funcP)
creates an indexedFdd<L,U>
fdd< U > * flatMap(flatMapFunctionP< T, U > funcP)
creates a fdd<U>
fdd< U > * map(PmapFunctionP< T, U > funcP)
creates a fdd<U *>
indexedFdd< L, U > * map(ImapFunctionP< T, L, U > funcP)
creates an indexedFdd<L,U>
fdd< U > * bulkMap(PbulkMapFunctionP< T, U > funcP)
creates a fdd<U *>
void * getKeyMap()
(UNUSED)
fdd< T > * cache()
Prevents automatic memory deallocation from happening.
indexedFdd< L, U > * bulkMap(IPbulkMapFunctionP< T, L, U > funcP)
creates an indexedFdd<L,U*>