1 #ifndef LIBFASTER_GROUPEDFDD_H 2 #define LIBFASTER_GROUPEDFDD_H 8 #include "fastContext.h" 12 template <
class K,
class T>
15 template <
class K,
class T>
59 std::vector<fddBase *> members;
62 _kType = decodeType(
typeid(K).hash_code());
69 template <
typename To>
71 template <
typename Ko,
typename To>
76 void cogroup(system_clock::time_point & start);
86 template <
typename T,
typename U>
88 members.insert(members.end(), fdd0);
89 members.insert(members.end(), fdd1);
90 id = context->createFddGroup(
this, members);
103 template <
typename T,
typename U,
typename V>
105 members.insert(members.end(), fdd0);
106 members.insert(members.end(), fdd1);
107 members.insert(members.end(), fdd2);
108 id = context->createFddGroup(
this, members);
125 context->discardFDD(
id);
/// Reports whether this dataset is grouped by key.
/// A groupedFdd unconditionally answers false.
bool isGroupedByKey() {
    const bool grouped = false;
    return grouped;
}
131 void setGroupedByKey(
bool gbk UNUSED) {}
138 groupedFdd<K> * updateByKey( updateByKeyG2FunctionP<K> funcP){
139 update((
void*) funcP, OP_UpdateByKey);
144 groupedFdd<K> * updateByKey( updateByKeyG3FunctionP<K> funcP){
145 update((
void*) funcP, OP_UpdateByKey);
153 update((
void*) funcP, OP_BulkUpdate);
160 update((
void*) funcP, OP_BulkUpdate);
172 template <
typename Ko,
typename To>
179 template <
typename Ko,
typename To>
186 template <
typename To>
187 fdd<To> * mapByKey( mapByKeyG2FunctionP<K,To> funcP){
188 return (
fdd<To> *) map<To>((
void*) funcP, OP_MapByKey);
192 template <
typename To>
193 fdd<To> * mapByKey( mapByKeyG3FunctionP<K,To> funcP){
194 return (
fdd<To> *) map<To>((
void*) funcP, OP_MapByKey);
204 template <
typename Ko,
typename To>
211 template <
typename Ko,
typename To>
218 template <
typename To>
219 fdd<To> * flatMapByKey( flatMapByKeyG2FunctionP<K,To> funcP){
220 return (
fdd<To> *) map<To>((
void*) funcP, OP_FlatMapByKey);
225 template <
typename To>
226 fdd<To> * flatMapByKey( flatMapByKeyG3FunctionP<K,To> funcP){
227 return (
fdd<To> *) map<To>((
void*) funcP, OP_FlatMapByKey);
234 template <
typename Ko,
typename To>
236 return mapI<Ko,To>((
void*) funcP, OP_BulkFlatMap);
240 template <
typename Ko,
typename To>
242 return mapI<Ko,To>((
void*) funcP, OP_BulkFlatMap);
246 template <
typename To>
247 fdd<To> * bulkFlatMap( bulkFlatMapG2FunctionP<K, To> funcP ){
248 return map<To>((
void*) funcP, OP_BulkFlatMap);
252 template <
typename To>
253 fdd<To> * bulkFlatMap( bulkFlatMapG3FunctionP<K, To> funcP ){
254 return map<To>((
void*) funcP, OP_BulkFlatMap);
262 template <
typename K>
265 unsigned long int tid, sid;
266 unsigned long int newFddId = newFdd->
getId();
269 int funcId = context->findFunc(funcP);
272 context->enqueueTask(op,
id, newFddId, funcId, this->size);
275 auto result = context->recvTaskResult(tid, sid, start);
277 if ( (op & 0xff) & (OP_MapByKey | OP_FlatMapByKey | OP_FlatMap | OP_BulkFlatMap) ){
280 for (
int i = 1; i < context->
numProcs(); ++i){
281 if (result[i].second > 0) fddSize += * (
size_t*) result[i].first;
283 newFdd->setSize(fddSize);
287 for (
size_t i = 0; i < members.size(); ++i){
289 members[i]->discard();
299 template <
typename K>
300 template <
typename To>
303 auto start = system_clock::now();
305 if ( (op & 0xFF ) & (OP_MapByKey | OP_FlatMapByKey | OP_FlatMap | OP_BulkFlatMap) ){
306 newFdd =
new fdd<To>(*context);
308 newFdd =
new fdd<To>(*context, size);
311 return (
fdd<To> *) _map(funcP, op, newFdd, start);
313 template <
typename K>
314 template <
typename Ko,
typename To>
317 auto start = system_clock::now();
319 if ( (op & 0xFF ) & (OP_MapByKey | OP_FlatMapByKey | OP_FlatMap | OP_BulkFlatMap) ){
328 template <
typename K>
330 auto start = system_clock::now();
331 unsigned long int sid;
334 int funcId = context->findFunc(funcP);
337 unsigned long int tid = context->enqueueTask(op, this->
id, 0, funcId, this->size);
339 auto result = context->recvTaskResult(tid, sid, start);
342 for (
size_t i = 0; i < members.size(); ++i){
344 members[i]->discard();
353 template <
typename K>
355 using std::chrono::system_clock;
356 using std::chrono::duration_cast;
357 using std::chrono::milliseconds;
360 start = system_clock::now();
362 unsigned long int sid;
365 unsigned long int tid = context->enqueueTask(OP_CoGroup,
id, this->size);
367 for (
size_t i = 1; i < members.size(); ++i){
368 members[i]->setGroupedByKey(
true);
371 auto result = context->recvTaskResult(tid, sid, start);
unsigned int fddOpType
Dataset operation type.
bool isCached()
Returns true if the dataset is cached.
int getId()
Returns the identification number of the dataset.
void discard()
Deallocates a previously cached FDD.
groupedFdd< K > * cache()
Prevents automatic memory deallocation from happening.
groupedFdd(fastContext *c, iFddCore< K, T > *fdd0, iFddCore< K, U > *fdd1, iFddCore< K, V > *fdd2, system_clock::time_point &start)
Creates an indexedFdd group with three members.
Fast Distributed Dataset (FDD) is like a cluster-distributed array. This class is the user side impl...
groupedFdd(fastContext *c, iFddCore< K, T > *fdd0, iFddCore< K, U > *fdd1, system_clock::time_point &start)
Creates an indexedFdd group with two members.
int numProcs()
Returns the number of running processes.