1 #ifndef LIBFASTER_GROUPEDFDD_H
2 #define LIBFASTER_GROUPEDFDD_H
8 #include "fastContext.h"
12 template <
class K,
class T>
15 template <
class K,
class T>
// FDDs that belong to this group. Elsewhere in this listing the fdd0/fdd1/fdd2
// arguments are appended here and the vector is handed to
// context->createFddGroup(); the cached/discard loops also walk it.
59 std::vector<fddBase *> members;
// Store in _kType the result of decodeType() applied to the hash code of
// K's type_info (the enclosing function is not part of this listing).
62 _kType = decodeType(
typeid(K).hash_code());
// Non-template back end shared by map() and mapI().
//   funcP  - type-erased user function pointer
//   op     - operation code to enqueue
//   newFdd - result FDD already allocated by the caller
//   start  - time point taken by the caller before dispatch
// Returns an fddBase pointer that the callers cast to their concrete type.
68 fddBase * _map (
void * funcP, fddOpType op,
fddBase * newFdd, system_clock::time_point & start);
// Typed front end producing an fdd<To>: takes the type-erased function
// pointer and the operation code, returns the newly created fdd<To>.
69 template <
typename To>
70 fdd<To> * map(
void * funcP, fddOpType op);
71 template <
typename Ko,
typename To>
// Runs the co-group step for the members of this group using the shared
// key map; `start` is the time point forwarded to the result receive call.
76 void cogroup(std::shared_ptr<std::unordered_map<K, int>> & keyMap, system_clock::time_point & start);
// Two-member grouping path. NOTE(review): the signature line is missing from
// this listing; presumably this is a groupedFdd constructor taking fdd0,
// fdd1, keyMap and start - confirm against the full header.
78 template <
typename T,
typename U>
// Append both source FDDs to the member list.
80 members.insert(members.end(), fdd0);
81 members.insert(members.end(), fdd1);
// Register the group with the context; the returned value becomes `id`.
82 id = context->createFddGroup(
this, members);
// Co-group the members with the supplied key map.
83 cogroup(keyMap, start);
// Three-member grouping path. NOTE(review): the signature line is missing
// from this listing; presumably this is a groupedFdd constructor taking
// fdd0, fdd1, fdd2, keyMap and start - confirm against the full header.
85 template <
typename T,
typename U,
typename V>
// Append the three source FDDs to the member list.
87 members.insert(members.end(), fdd0);
88 members.insert(members.end(), fdd1);
89 members.insert(members.end(), fdd2);
// Register the group with the context; the returned value becomes `id`.
90 id = context->createFddGroup(
this, members);
// Co-group the members with the supplied key map.
91 cogroup(keyMap, start);
// updateByKey overload for the updateByKeyG2FunctionP<K> callback type.
// The typed function pointer is erased to void* and dispatched through
// update() with OP_UpdateByKey. The rest of the body (including the return
// statement) is not part of this listing.
100 groupedFdd<K> * updateByKey( updateByKeyG2FunctionP<K> funcP){
101 update((
void*) funcP, OP_UpdateByKey);
// updateByKey overload for the updateByKeyG3FunctionP<K> callback type.
// The typed function pointer is erased to void* and dispatched through
// update() with OP_UpdateByKey. The rest of the body (including the return
// statement) is not part of this listing.
105 groupedFdd<K> * updateByKey( updateByKeyG3FunctionP<K> funcP){
106 update((
void*) funcP, OP_UpdateByKey);
// Dispatch the type-erased callback through update() as OP_BulkUpdate.
// NOTE(review): the enclosing signature is missing from this listing;
// presumably a bulkUpdate overload - confirm against the full header.
112 update((
void*) funcP, OP_BulkUpdate);
// Dispatch the type-erased callback through update() as OP_BulkUpdate.
// NOTE(review): the enclosing signature is missing from this listing;
// presumably a second bulkUpdate overload - confirm against the full header.
117 update((
void*) funcP, OP_BulkUpdate);
123 template <
typename Ko,
typename To>
128 template <
typename Ko,
typename To>
// mapByKey overload for the mapByKeyG2FunctionP<K,To> callback type.
// Erases the function pointer to void* and forwards it to map<To>() with
// OP_MapByKey, returning the resulting fdd<To>. The outer cast is to the
// same type map<To>() is declared to return.
132 template <
typename To>
133 fdd<To> * mapByKey( mapByKeyG2FunctionP<K,To> funcP){
134 return (
fdd<To> *) map<To>((
void*) funcP, OP_MapByKey);
// mapByKey overload for the mapByKeyG3FunctionP<K,To> callback type.
// Erases the function pointer to void* and forwards it to map<To>() with
// OP_MapByKey, returning the resulting fdd<To>. The outer cast is to the
// same type map<To>() is declared to return.
137 template <
typename To>
138 fdd<To> * mapByKey( mapByKeyG3FunctionP<K,To> funcP){
139 return (
fdd<To> *) map<To>((
void*) funcP, OP_MapByKey);
144 template <
typename Ko,
typename To>
149 template <
typename Ko,
typename To>
// flatMapByKey overload for the flatMapByKeyG2FunctionP<K,To> callback type.
// Erases the function pointer to void* and forwards it to map<To>() with
// OP_FlatMapByKey, returning the resulting fdd<To>.
153 template <
typename To>
154 fdd<To> * flatMapByKey( flatMapByKeyG2FunctionP<K,To> funcP){
155 return (
fdd<To> *) map<To>((
void*) funcP, OP_FlatMapByKey);
// flatMapByKey overload for the flatMapByKeyG3FunctionP<K,To> callback type.
// Erases the function pointer to void* and forwards it to map<To>() with
// OP_FlatMapByKey, returning the resulting fdd<To>.
158 template <
typename To>
159 fdd<To> * flatMapByKey( flatMapByKeyG3FunctionP<K,To> funcP){
160 return (
fdd<To> *) map<To>((
void*) funcP, OP_FlatMapByKey);
// Ask the context to discard the FDD identified by this object's `id`.
// NOTE(review): the enclosing signature is missing from this listing;
// presumably the discard() member - confirm against the full header.
164 context->discardFDD(
id);
// Indexed-result variant: forwards the type-erased callback to
// mapI<Ko,To>() with OP_BulkFlatMap and returns its result.
// NOTE(review): the signature line is missing from this listing;
// presumably a bulkFlatMap overload - confirm against the full header.
170 template <
typename Ko,
typename To>
172 return mapI<Ko,To>((
void*) funcP, OP_BulkFlatMap);
// Indexed-result variant: forwards the type-erased callback to
// mapI<Ko,To>() with OP_BulkFlatMap and returns its result.
// NOTE(review): the signature line is missing from this listing;
// presumably a second bulkFlatMap overload - confirm against the full header.
174 template <
typename Ko,
typename To>
176 return mapI<Ko,To>((
void*) funcP, OP_BulkFlatMap);
// bulkFlatMap overload for the bulkFlatMapG2FunctionP<K, To> callback type.
// Erases the function pointer to void* and forwards it to map<To>() with
// OP_BulkFlatMap, returning the resulting fdd<To>.
178 template <
typename To>
179 fdd<To> * bulkFlatMap( bulkFlatMapG2FunctionP<K, To> funcP ){
180 return map<To>((
void*) funcP, OP_BulkFlatMap);
// bulkFlatMap overload for the bulkFlatMapG3FunctionP<K, To> callback type.
// Erases the function pointer to void* and forwards it to map<To>() with
// OP_BulkFlatMap, returning the resulting fdd<To>.
182 template <
typename To>
183 fdd<To> * bulkFlatMap( bulkFlatMapG3FunctionP<K, To> funcP ){
184 return map<To>((
void*) funcP, OP_BulkFlatMap);
// Stub: always returns NULL; no key map is stored by this implementation.
188 void * getKeyMap() {
return NULL; }
// Stub: accepts a key map pointer and ignores it (empty body).
189 void setKeyMap(
void * keyMap UNUSED) {}
// Stub: always reports false.
190 bool isGroupedByKey() {
return false; }
// Stub: accepts the flag and ignores it (empty body).
191 void setGroupedByKey(
bool gbk UNUSED) {}
// Body that enqueues a map-style task and finalises the result FDD.
// NOTE(review): the signature line is missing from this listing; the names
// used below (funcP, op, newFdd, start) match the _map declaration, so this
// is presumably groupedFdd<K>::_map - confirm against the full header.
195 template <
typename K>
// NOTE(review): tid and sid are declared without initialisers and, unlike
// update(), the value returned by enqueueTask() is not stored in tid here.
// Presumably recvTaskResult() fills both by reference - confirm.
198 unsigned long int tid, sid;
199 unsigned long int newFddId = newFdd->getId();
// Translate the user function pointer into the id understood by the context.
202 int funcId = context->findFunc(funcP);
// Enqueue the task: operation, this group's id, result FDD id, function id, size.
205 context->enqueueTask(op,
id, newFddId, funcId, this->size);
208 auto result = context->recvTaskResult(tid, sid, start);
// Only for the by-key / flat-map family of operations (low byte of op):
// recompute the result size from the per-process results.
210 if ( (op & 0xff) & (OP_MapByKey | OP_FlatMapByKey | OP_FlatMap | OP_BulkFlatMap) ){
// Index 0 is skipped; each entry with a positive .second contributes the
// size_t read through its .first pointer. (fddSize is declared on a line
// that is not part of this listing.)
213 for (
int i = 1; i < context->numProcs(); ++i){
214 if (result[i].second > 0) fddSize += * (
size_t*) result[i].first;
216 newFdd->setSize(fddSize);
// Discard every member FDD that is not marked as cached.
220 for (
size_t i = 0; i < members.size(); ++i){
221 if ( ! members[i]->isCached() ){
222 members[i]->discard();
// Creates the fdd<To> that will hold the result of `op`, then delegates the
// dispatch to _map().
//   funcP - type-erased user function pointer
//   op    - operation code
// Returns the new fdd<To> (the fddBase* from _map() cast back to fdd<To>*).
232 template <
typename K>
233 template <
typename To>
234 fdd<To> * groupedFdd<K>::map (
void * funcP, fddOpType op){
235 fdd<To> * newFdd = NULL;
// Time point taken before dispatch; forwarded to _map().
236 auto start = system_clock::now();
// By-key / flat-map family (tested on the low byte of op): the result FDD
// is constructed without a size.
238 if ( (op & 0xFF ) & (OP_MapByKey | OP_FlatMapByKey | OP_FlatMap | OP_BulkFlatMap) ){
239 newFdd =
new fdd<To>(*context);
// NOTE(review): the `else` line is missing from this listing; presumably
// this assignment is the other branch, constructing the result with this
// group's size.
241 newFdd =
new fdd<To>(*context, size);
244 return (fdd<To> *) _map(funcP, op, newFdd, start);
// Indexed counterpart of map(): creates the indexedFdd<Ko,To> that will
// hold the result of `op`, then delegates the dispatch to _map().
//   funcP - type-erased user function pointer
//   op    - operation code
// Returns the new indexedFdd<Ko,To> (the fddBase* from _map() cast back).
246 template <
typename K>
247 template <
typename Ko,
typename To>
248 indexedFdd<Ko,To> * groupedFdd<K>::mapI(
void * funcP, fddOpType op){
249 indexedFdd<Ko,To> * newFdd = NULL;
// Time point taken before dispatch; forwarded to _map().
250 auto start = system_clock::now();
// By-key / flat-map family (tested on the low byte of op): the result FDD
// is constructed without a size.
252 if ( (op & 0xFF ) & (OP_MapByKey | OP_FlatMapByKey | OP_FlatMap | OP_BulkFlatMap) ){
253 newFdd =
new indexedFdd<Ko,To>(*context);
// NOTE(review): the `else` line is missing from this listing; presumably
// this assignment is the other branch, constructing the result with this
// group's size.
255 newFdd =
new indexedFdd<Ko,To>(*context, size);
258 return (indexedFdd<Ko,To> *) _map(funcP, op, newFdd, start);
// Enqueues an update-style task for this group and waits for its result.
//   funcP - type-erased user function pointer
//   op    - operation code
// Declared to return groupedFdd<K>*; the return statement is not part of
// this listing.
261 template <
typename K>
262 groupedFdd<K> * groupedFdd<K>::update(
void * funcP, fddOpType op){
263 auto start = system_clock::now();
264 unsigned long int sid;
// Translate the user function pointer into the id understood by the context.
267 int funcId = context->findFunc(funcP);
// Enqueue with 0 in the result-FDD id position.
270 unsigned long int tid = context->enqueueTask(op, this->
id, 0, funcId, this->size);
272 auto result = context->recvTaskResult(tid, sid, start);
// Discard every member FDD that is not marked as cached.
275 for (
size_t i = 0; i < members.size(); ++i){
276 if ( ! members[i]->isCached() ){
277 members[i]->discard();
// Prepares every member after the first for co-grouping on `keyMap`, then
// enqueues an OP_CoGroup task, sends the co-group data and waits for the
// result.
//   keyMap - shared key map; members are handed the address of this
//            shared_ptr through setKeyMap()
//   start  - time point forwarded to recvTaskResult()
286 template <
typename K>
287 void groupedFdd<K>::cogroup(std::shared_ptr<std::unordered_map<K, int>> & keyMap, system_clock::time_point & start){
288 using std::chrono::system_clock;
// One flag per member except member 0, all initially true.
// NOTE(review): members.size()-1 is unsigned and would wrap if members were
// empty; the visible call sites insert members before calling cogroup().
289 std::vector<bool> exchangeData (members.size()-1,
true);
291 unsigned long int sid;
// Member 0 is skipped; flags are indexed by i-1.
293 for (
size_t i = 1; i < members.size(); ++i){
294 if ( members[i]->isGroupedByKey() ){
// The member's key map is returned type-erased and is reinterpreted as a
// pointer to a shared_ptr of the same map type for comparison.
295 void * km = members[i]->getKeyMap();
296 if ( *(std::shared_ptr<std::unordered_map<K, int>>*)km != keyMap ){
297 members[i]->setKeyMap(&keyMap);
// NOTE(review): the `else` lines are missing from this listing; presumably
// the flag is cleared when the member already shares this key map, and the
// two calls after it belong to the "not yet grouped by key" branch.
299 exchangeData[i-1] =
false;
302 members[i]->setKeyMap(&keyMap);
303 members[i]->setGroupedByKey(
true);
// Enqueue the co-group task for this group's id and size.
307 unsigned long int tid = context->enqueueTask(OP_CoGroup,
id, this->size);
// Send the key map contents together with the per-member flags.
309 context->sendCogroupData(tid, *keyMap, exchangeData);
312 auto result = context->recvTaskResult(tid, sid, start);
Definition: fastContext.h:26
Definition: fastContext.h:54
Definition: groupedFdd.h:13
Definition: groupedFdd.h:55
Definition: fastContext.h:23
Definition: _workerFdd.h:11