1 #ifndef LIBFASTER_FDD_H
2 #define LIBFASTER_FDD_H
11 #include "definitions.h"
13 #include "fastContext.h"
19 template <
class K,
class T>
40 template <
typename L,
typename U>
43 fdd<U> * map(
void * funcP, fddOpType op);
48 context->discardFDD(
id);
/// Key-map accessor for this dataset type.
///
/// This implementation does not store a key map: it returns a null pointer
/// unconditionally, so callers must be prepared for a null result.
///
/// @return Always a null pointer.
void * getKeyMap() {
	// nullptr instead of the NULL macro: same value, but a real pointer
	// literal that cannot be mistaken for an integer.
	return nullptr;
}
51 void setKeyMap(
void * keyMap UNUSED) {}
/// Reports whether this dataset is grouped by key.
///
/// This implementation answers with a constant; it keeps no state to consult.
///
/// @return Always false.
bool isGroupedByKey()
{
	return false;
}
53 void setGroupedByKey(
bool gbk UNUSED) {}
61 T finishReduces(
char ** partResult,
size_t * pSize,
int funcId, fddOpType op);
62 T reduce(
void * funcP, fddOpType op);
68 this->
id = c.createFDD(
this,
typeid(T).hash_code());
72 fdd(fastContext &c,
size_t s,
const std::vector<size_t> & dataAlloc) : fddCore<T>(c, s, dataAlloc){
73 this->
id = c.createFDD(
this,
typeid(T).hash_code(), dataAlloc);
77 fdd(fastContext &c,
size_t s) : fdd(c, s, c.getAllocation(s)) {
81 fdd(fastContext &c, T * data,
size_t size) : fdd(c, size){
82 c.parallelize(fddBase::id, data, size);
86 fdd(fastContext &c, std::vector<T> &dataV) : fdd(c, dataV.data(), dataV.size()){ }
89 fdd(fastContext &c,
const char * fileName) ;
98 fdd<U> * map( mapFunctionP<T,U> funcP ){
99 return fddCore<T>::template map<U>((
void*) funcP, OP_Map);
101 template <
typename U>
102 fdd<U> * map( PmapFunctionP<T,U> funcP ){
103 return fddCore<T>::template map<U>((
void*) funcP, OP_Map);
105 template <
typename L,
typename U>
106 indexedFdd<L,U> * map( ImapFunctionP<T,L,U> funcP ){
107 return fddCore<T>::template mapI<L,U>((
void*) funcP, OP_Map);
109 template <
typename L,
typename U>
110 indexedFdd<L,U> * map( IPmapFunctionP<T,L,U> funcP ){
111 return fddCore<T>::template mapI<L,U>((
void*) funcP, OP_Map);
115 template <
typename U>
116 fdd<U> * bulkMap( bulkMapFunctionP<T,U> funcP ){
117 return fddCore<T>::template map<U>((
void*) funcP, OP_BulkMap);
119 template <
typename U>
120 fdd<U> * bulkMap( PbulkMapFunctionP<T,U> funcP ){
121 return fddCore<T>::template map<U>((
void*) funcP, OP_BulkMap);
123 template <
typename L,
typename U>
124 indexedFdd<L,U> * bulkMap( IbulkMapFunctionP<T,L,U> funcP ){
125 return fddCore<T>::template mapI<L,U>((
void*) funcP, OP_BulkMap);
127 template <
typename L,
typename U>
128 indexedFdd<L,U> * bulkMap( IPbulkMapFunctionP<T,L,U> funcP ){
129 return fddCore<T>::template mapI<L,U>((
void*) funcP, OP_BulkMap);
133 template <
typename U>
134 fdd<U> * flatMap( flatMapFunctionP<T,U> funcP ){
135 return fddCore<T>::template map<U>((
void*) funcP, OP_FlatMap);
137 template <
typename U>
138 fdd<U> * flatMap( PflatMapFunctionP<T,U> funcP ){
139 return fddCore<T>::template map<U>((
void*) funcP, OP_FlatMap);
141 template <
typename L,
typename U>
142 indexedFdd<L,U> * flatMap( IflatMapFunctionP<T,L,U> funcP ){
143 return fddCore<T>::template mapI<L,U>((
void*) funcP, OP_FlatMap);
145 template <
typename L,
typename U>
146 indexedFdd<L,U> * flatMap( IPflatMapFunctionP<T,L,U> funcP ){
147 return fddCore<T>::template mapI<L,U>((
void*) funcP, OP_FlatMap);
151 template <
typename U>
152 fdd<U> * bulkFlatMap( bulkFlatMapFunctionP<T,U> funcP ){
153 return fddCore<T>::template map<U>((
void*) funcP, OP_BulkFlatMap);
155 template <
typename U>
156 fdd<U> * bulkFlatMap( PbulkFlatMapFunctionP<T,U> funcP ){
157 return fddCore<T>::template map<U>((
void*) funcP, OP_BulkFlatMap);
159 template <
typename L,
typename U>
160 indexedFdd<L,U> * bulkFlatMap( IbulkFlatMapFunctionP<T,L,U> funcP ){
161 return fddCore<T>::template mapI<L,U>((
void*) funcP, OP_BulkFlatMap);
163 template <
typename L,
typename U>
164 indexedFdd<L,U> * bulkFlatMap( IPbulkFlatMapFunctionP<T,L,U> funcP ){
165 return fddCore<T>::template mapI<L,U>((
void*) funcP, OP_BulkFlatMap);
170 T reduce( reduceFunctionP<T> funcP ){
171 return reduce((
void*) funcP, OP_Reduce);
173 T bulkReduce( bulkReduceFunctionP<T> funcP ){
174 return reduce((
void*) funcP, OP_BulkReduce);
179 std::vector<T> collect( ){
180 std::vector<T> data(this->size);
181 this->context->collectFDD(data,
this);
197 std::vector <T> finishPReduces(T ** partResult,
size_t * partrSize,
int funcId, fddOpType op);
198 std::vector <T> reduceP(
void * funcP, fddOpType op);
204 this->
id = c.createPFDD(
this,
typeid(T).hash_code());
209 this->
id = c.createPFDD(
this,
typeid(T).hash_code(), dataAlloc);
215 c.parallelize(fddBase::id, data, dataSizes, size);
226 template <
typename U>
227 fdd<U> * map( mapPFunctionP<T,U> funcP ){
230 template <
typename U>
231 fdd<U> * map( PmapPFunctionP<T,U> funcP ){
234 template <
typename L,
typename U>
238 template <
typename L,
typename U>
244 template <
typename U>
245 fdd<U> * bulkMap( bulkMapPFunctionP<T,U> funcP ){
248 template <
typename U>
249 fdd<U> * bulkMap( PbulkMapPFunctionP<T,U> funcP ){
252 template <
typename L,
typename U>
256 template <
typename L,
typename U>
262 template <
typename U>
263 fdd<U> * flatMap( flatMapPFunctionP<T,U> funcP){
266 template <
typename U>
267 fdd<U> * flatMap( PflatMapPFunctionP<T,U> funcP){
270 template <
typename L,
typename U>
274 template <
typename L,
typename U>
280 template <
typename U>
281 fdd<U> * bulkFlatMap( bulkFlatMapPFunctionP<T,U> funcP){
284 template <
typename U>
285 fdd<U> * bulkFlatMap( PbulkFlatMapPFunctionP<T,U> funcP){
288 template <
typename L,
typename U>
292 template <
typename L,
typename U>
298 inline std::vector<T> reduce(PreducePFunctionP<T> funcP ){
299 return reduceP((
void*) funcP, OP_Reduce);
301 inline std::vector<T> bulkReduce(PbulkReducePFunctionP<T> funcP ){
302 return reduceP((
void*) funcP, OP_BulkReduce);
307 std::vector<std::pair<T*, size_t>> collect( ) {
308 std::vector<std::pair<T*, size_t>> data(this->size);
309 this->context->collectFDD(data,
this);
322 template <
typename T>
328 template <
typename T>
329 fddCore<T>::fddCore(fastContext &c,
size_t s,
const std::vector<size_t> & dataAlloc) : fddCore(c){
331 this->dataAlloc = dataAlloc;
335 template <
typename T>
336 fddBase * fddCore<T>::_map(
void * funcP, fddOpType op, fddBase * newFdd){
337 unsigned long int tid, sid;
340 unsigned long int newFddId = newFdd->getId();
343 int funcId = context->findFunc(funcP);
347 auto start = system_clock::now();
348 context->enqueueTask(op,
id, newFddId, funcId, this->size);
352 auto result = context->recvTaskResult(tid, sid, start);
354 if ( (op & 0xff) & (OP_FlatMap | OP_BulkFlatMap) ) {
356 for (
int i = 1; i < context->numProcs(); ++i){
357 if (result[i].second > 0) newSize += * (
size_t *) result[i].first;
360 newFdd->setSize(newSize);
371 template <
typename T>
372 template <
typename U>
373 fdd<U> * fddCore<T>::map(
void * funcP, fddOpType op){
376 if ( (op & 0xFF ) & (OP_FlatMap | OP_BulkFlatMap) ){
377 newFdd =
new fdd<U>(*context);
379 newFdd =
new fdd<U>(*context, size, dataAlloc);
382 return (fdd<U> *) _map(funcP, op, newFdd);
384 template <
typename T>
385 template <
typename L,
typename U>
386 indexedFdd<L,U> * fddCore<T>::mapI(
void * funcP, fddOpType op){
389 if ( (op & 0xFF ) & (OP_FlatMap | OP_BulkFlatMap) ){
390 newFdd =
new indexedFdd<L,U>(*context);
392 newFdd =
new indexedFdd<L,U>(*context, size, dataAlloc);
395 return (indexedFdd<L,U> *) _map(funcP, op, newFdd);
398 template <
typename T>
399 T fdd<T>::finishReduces(
char ** partResult,
size_t * pSize,
int funcId, fddOpType op){
400 fastCommBuffer buffer(0);
403 if (op == OP_Reduce){
404 reduceFunctionP<T> reduceFunc = (reduceFunctionP<T>) this->context->funcTable[funcId];
407 buffer.setBuffer(partResult[0], pSize[0]);
410 for (
int i = 1; i < (this->context->numProcs() - 1); ++i){
414 if (pSize[i] == 0) std::cerr <<
"UNEXPECTED ERROR!!!!";
416 buffer.setBuffer(partResult[i], pSize[i]);
419 result = reduceFunc(result, pr);
422 bulkReduceFunctionP<T> bulkReduceFunc = (bulkReduceFunctionP<T>) this->context->funcTable[funcId];
423 T * vals =
new T[this->context->numProcs() - 1];
426 for (
int i = 1; i < (this->context->numProcs() - 1); ++i){
427 fastCommBuffer buffer(0);
429 buffer.setBuffer(partResult[i], pSize[i]);
433 result = bulkReduceFunc(vals, this->context->numProcs() - 1);
440 template <
typename T>
441 T fdd<T>::reduce(
void * funcP, fddOpType op){
445 unsigned long int tid, sid;
446 int funcId = this->context->findFunc(funcP);
448 char ** partResult =
new char*[this->context->numProcs() - 1];
449 size_t * rSize =
new size_t[this->context->numProcs() - 1];
453 auto start = system_clock::now();
454 this->context->enqueueTask(op, this->
id, 0, funcId, this->size);
457 auto resultV = this->context->recvTaskResult(tid, sid, start);
459 for (
int i = 0; i < (this->context->numProcs() - 1); ++i){
460 partResult[i] = (
char*) resultV[i + 1].first;
461 rSize[i] = resultV[i + 1].second;
465 result = finishReduces(partResult, rSize, funcId, op);
467 delete [] partResult;
477 template <
typename T>
478 std::vector<T> fdd<T*>::finishPReduces(T ** partResult,
size_t * partrSize,
int funcId, fddOpType op){
482 if (op == OP_Reduce){
486 PreducePFunctionP<T> reduceFunc = ( PreducePFunctionP<T> ) this->context->funcTable[funcId];
488 result = partResult[0];
489 rSize = partrSize[0];
492 for (
int i = 1; i < (this->context->numProcs() - 1); ++i){
495 std::pair<T*,size_t> r = reduceFunc(pResult, prSize, partResult[i], partrSize[i]);
501 PbulkReducePFunctionP<T> bulkReduceFunc = (PbulkReducePFunctionP<T>) this->context->funcTable[funcId];
502 std::pair<T*,size_t> r = bulkReduceFunc(partResult, partrSize, this->context->numProcs() - 1);
507 std::vector<T> vResult(rSize);
508 vResult.assign(result, result + rSize);
513 template <
typename T>
514 std::vector <T> fdd<T*>::reduceP(
void * funcP, fddOpType op){
518 int funcId = this->context->findFunc(funcP);
519 T ** partResult =
new T*[this->context->numProcs() -1];
520 size_t * partrSize =
new size_t[this->context->numProcs() - 1];
521 unsigned long int tid, sid;
526 auto start = system_clock::now();
527 this->context->enqueueTask(op, this->
id, 0, funcId, this->size);
530 auto resultV = this->context->recvTaskResult(tid, sid, start);
532 for (
int i = 0; i < (this->context->numProcs() - 1); ++i){
533 partResult[i] = (T*) resultV[i + 1].first;
534 partrSize[i] = resultV[i + 1].second /=
sizeof(T);
539 std::vector<T> vResult = finishPReduces(partResult, partrSize, funcId, op);
541 delete [] partResult;
551 template <
typename T>
552 fdd<T>::fdd(fastContext &c,
const char * fileName) {
553 fddCore<T>::context = &c;
554 this->
id = c.readFDD(
this, fileName);
557 this->context->getFDDInfo(this->size, this->dataAlloc);
Definition: fastContext.h:26
Definition: fastContext.h:54
Definition: fastContext.h:23
Definition: _workerFdd.h:11