libfaster API Documentation  Development Version
Super fast distributed computing
groupedFdd.h
1 #ifndef LIBFASTER_GROUPEDFDD_H
2 #define LIBFASTER_GROUPEDFDD_H
3 
4 #include <memory>
5 
6 //#include "indexedFdd.h"
7 #include "misc.h"
8 #include "fastContext.h"
9 
10 namespace faster{
11 
12  template <class K, class T>
13  class iFddCore ;
14 
15  template <class K, class T>
16  class indexedFdd ;
17 
18  /*template<typename K, typename... Types>
19  class groupedFdd : fddBase{
20  private:
21  static const unsigned short int dimension = sizeof...(Types);
22  fastContext * context;
23  std::vector<fddBase *> members;
24 
25  template <typename T, typename... iFdds>
26  void addFdds(iFddCore<K,T> * fdd, iFdds ... otherFdds){
27  members.insert(members.end(), fdd);
28 
29  addFdds(otherFdds... );
30  }
31 
32  template <typename T, typename... OtherArgs, typename... OtherSizes, typename U>
33  void unpackMap( std::pair<K,U> (*funcP) (const K &, T *, OtherArgs ... otherArgs, size_t, OtherSizes ... otherSizes), fddOpType op){
34  unpackMap<K, OtherArgs..., U, OtherSizes...>(funcP, op);
35  }
36 
37  public:
38  template <typename... FddTypes>
39  groupedFdd(fastContext * c, FddTypes * ... fddTypes){
40  context = c;
41 
42  addFdds(fddTypes...);
43  //context->coPartition(members);
44  id = context->createFddGroup(this, members);
45  }
46 
47  template <typename... Sizes, typename U>
48  indexedFdd<K,U> * mapByKey( std::pair<K,U> (*funcP) (const K & key, Types * ... args, Sizes ... sizes) ){
49  unpackMap<K,U>(funcP, OP_Map);
50  return NULL; //map<K,U>((void*) funcP, OP_Map);
51  }
52 
53  };*/
54  template<typename K>
56  private:
57  int numMembers;
58  fastContext * context;
59  std::vector<fddBase *> members;
60 
62  _kType = decodeType(typeid(K).hash_code());
63  _tType = Null;
64  context = c;
65  members.reserve(4);
66  cached = false;
67  }
68  fddBase * _map (void * funcP, fddOpType op, fddBase * newFdd, system_clock::time_point & start);
69  template <typename To>
70  fdd<To> * map(void * funcP, fddOpType op);
71  template <typename Ko, typename To>
72  indexedFdd<Ko,To> * mapI(void * funcP, fddOpType op);
73 
74  groupedFdd<K> * update(void * funcP, fddOpType op);
75 
76  void cogroup(system_clock::time_point & start);
77  public:
86  template <typename T, typename U>
87  groupedFdd(fastContext * c, iFddCore<K,T> * fdd0, iFddCore<K,U> * fdd1, system_clock::time_point & start) : groupedFdd(c) {
88  members.insert(members.end(), fdd0);
89  members.insert(members.end(), fdd1);
90  id = context->createFddGroup(this, members);
91  cogroup(start);
92  }
103  template <typename T, typename U, typename V>
104  groupedFdd(fastContext * c, iFddCore<K,T> * fdd0, iFddCore<K,U> * fdd1, iFddCore<K,V> * fdd2, system_clock::time_point & start) : groupedFdd(c){
105  members.insert(members.end(), fdd0);
106  members.insert(members.end(), fdd1);
107  members.insert(members.end(), fdd2);
108  id = context->createFddGroup(this, members);
109  cogroup(start);
110  }
111 
114 
119  this->cached = true;
120  return this;
121  }
122 
124  void discard(){
125  context->discardFDD(id);
126  dataAlloc.clear();
127  }
129 
/// A groupedFdd never reports itself as grouped by key.
/// @return always false
bool isGroupedByKey() {
    return false;
}
131  void setGroupedByKey(bool gbk UNUSED) {}
132 
135 
136  // UpdateByKey
138  groupedFdd<K> * updateByKey( updateByKeyG2FunctionP<K> funcP){
139  update((void*) funcP, OP_UpdateByKey);
140  return this;
141  }
142 
144  groupedFdd<K> * updateByKey( updateByKeyG3FunctionP<K> funcP){
145  update((void*) funcP, OP_UpdateByKey);
146  return this;
147  }
148 
151  // BulkUpdateByKey
152  groupedFdd<K> * bulkUpdate( bulkUpdateG2FunctionP<K> funcP){
153  update((void*) funcP, OP_BulkUpdate);
154  return this;
155  }
156 
159  groupedFdd<K> * bulkUpdate( bulkUpdateG3FunctionP<K> funcP){
160  update((void*) funcP, OP_BulkUpdate);
161  return this;
162  }
164 
165 
166  // MapByKey
169 
172  template <typename Ko, typename To>
173  indexedFdd<Ko,To> * mapByKey( ImapByKeyG2FunctionP<K,Ko,To> funcP){
174  return (indexedFdd<Ko,To> *) mapI<Ko,To>((void*) funcP, OP_MapByKey);
175  }
176 
179  template <typename Ko, typename To>
180  indexedFdd<Ko,To> * mapByKey( ImapByKeyG3FunctionP<K,Ko,To> funcP){
181  return (indexedFdd<Ko,To> *) mapI<Ko,To>((void*) funcP, OP_MapByKey);
182  }
183 
186  template <typename To>
187  fdd<To> * mapByKey( mapByKeyG2FunctionP<K,To> funcP){
188  return (fdd<To> *) map<To>((void*) funcP, OP_MapByKey);
189  }
190 
192  template <typename To>
193  fdd<To> * mapByKey( mapByKeyG3FunctionP<K,To> funcP){
194  return (fdd<To> *) map<To>((void*) funcP, OP_MapByKey);
195  }
197 
198  // FlatMapByKey
201 
204  template <typename Ko, typename To>
205  indexedFdd<Ko,To> * flatMapByKey( IflatMapByKeyG2FunctionP<K,Ko,To> funcP){
206  return (indexedFdd<Ko,To> *) mapI<Ko,To>((void*) funcP, OP_FlatMapByKey);
207  }
208 
211  template <typename Ko, typename To>
212  indexedFdd<Ko,To> * flatMapByKey( IflatMapByKeyG3FunctionP<K,Ko,To> funcP){
213  return (indexedFdd<Ko,To> *) mapI<Ko,To>((void*) funcP, OP_FlatMapByKey);
214  }
215 
218  template <typename To>
219  fdd<To> * flatMapByKey( flatMapByKeyG2FunctionP<K,To> funcP){
220  return (fdd<To> *) map<To>((void*) funcP, OP_FlatMapByKey);
221  }
222 
225  template <typename To>
226  fdd<To> * flatMapByKey( flatMapByKeyG3FunctionP<K,To> funcP){
227  return (fdd<To> *) map<To>((void*) funcP, OP_FlatMapByKey);
228  }
229 
230 
231  // Bulk Flat Map
234  template <typename Ko, typename To>
235  indexedFdd<Ko,To> * bulkFlatMap( IbulkFlatMapG2FunctionP<K, Ko,To> funcP ){
236  return mapI<Ko,To>((void*) funcP, OP_BulkFlatMap);
237  }
240  template <typename Ko, typename To>
241  indexedFdd<Ko,To> * bulkFlatMap( IbulkFlatMapG3FunctionP<K, Ko,To> funcP ){
242  return mapI<Ko,To>((void*) funcP, OP_BulkFlatMap);
243  }
246  template <typename To>
247  fdd<To> * bulkFlatMap( bulkFlatMapG2FunctionP<K, To> funcP ){
248  return map<To>((void*) funcP, OP_BulkFlatMap);
249  }
252  template <typename To>
253  fdd<To> * bulkFlatMap( bulkFlatMapG3FunctionP<K, To> funcP ){
254  return map<To>((void*) funcP, OP_BulkFlatMap);
255  }
257 
258 
259 
260  };
261 
262  template <typename K>
263  fddBase * groupedFdd<K>::_map (void * funcP, fddOpType op, fddBase * newFdd, system_clock::time_point & start){
264  //std::cerr << " Map ";
265  unsigned long int tid, sid;
266  unsigned long int newFddId = newFdd->getId();
267 
268  // Decode function pointer
269  int funcId = context->findFunc(funcP);
270 
271  // Send task
272  context->enqueueTask(op, id, newFddId, funcId, this->size);
273 
274  // Receive results
275  auto result = context->recvTaskResult(tid, sid, start);
276 
277  if ( (op & 0xff) & (OP_MapByKey | OP_FlatMapByKey | OP_FlatMap | OP_BulkFlatMap) ){
278  size_t fddSize = 0;
279 
280  for (int i = 1; i < context->numProcs(); ++i){
281  if (result[i].second > 0) fddSize += * (size_t*) result[i].first;
282  }
283  newFdd->setSize(fddSize);
284  }
285 
286  if ( ! cached ){
287  for ( size_t i = 0; i < members.size(); ++i){
288  if ( ! members[i]->isCached() ){
289  members[i]->discard();
290  //std::cerr << "GD" << id << " ";
291  }
292  }
293  discard();
294  }
295 
296  //std::cerr << "\n";
297  return newFdd;
298  }
299  template <typename K>
300  template <typename To>
301  fdd<To> * groupedFdd<K>::map (void * funcP, fddOpType op){
302  fdd<To> * newFdd = NULL;
303  auto start = system_clock::now();
304 
305  if ( (op & 0xFF ) & (OP_MapByKey | OP_FlatMapByKey | OP_FlatMap | OP_BulkFlatMap) ){
306  newFdd = new fdd<To>(*context);
307  }else{
308  newFdd = new fdd<To>(*context, size);
309  }
310 
311  return (fdd<To> *) _map(funcP, op, newFdd, start);
312  }
313  template <typename K>
314  template <typename Ko, typename To>
315  indexedFdd<Ko,To> * groupedFdd<K>::mapI(void * funcP, fddOpType op){
316  indexedFdd<Ko,To> * newFdd = NULL;
317  auto start = system_clock::now();
318 
319  if ( (op & 0xFF ) & (OP_MapByKey | OP_FlatMapByKey | OP_FlatMap | OP_BulkFlatMap) ){
320  newFdd = new indexedFdd<Ko,To>(*context);
321  }else{
322  newFdd = new indexedFdd<Ko,To>(*context, size);
323  }
324 
325  return (indexedFdd<Ko,To> *) _map(funcP, op, newFdd, start);
326  }
327 
328  template <typename K>
329  groupedFdd<K> * groupedFdd<K>::update(void * funcP, fddOpType op){
330  auto start = system_clock::now();
331  unsigned long int sid;
332 
333  // Decode function pointer
334  int funcId = context->findFunc(funcP);
335 
336  // Send task
337  unsigned long int tid = context->enqueueTask(op, this->id, 0, funcId, this->size);
338 
339  auto result = context->recvTaskResult(tid, sid, start);
340 
341  if ( ! cached ){
342  for ( size_t i = 0; i < members.size(); ++i){
343  if ( ! members[i]->isCached() ){
344  members[i]->discard();
345  }
346  }
347  discard();
348  }
349 
350  return this;
351  }
352 
353  template <typename K>
354  void groupedFdd<K>::cogroup(system_clock::time_point & start){
355  using std::chrono::system_clock;
356  using std::chrono::duration_cast;
357  using std::chrono::milliseconds;
358 
359  //std::cerr << " DCogroup";
360  start = system_clock::now();
361 
362  unsigned long int sid;
363  //std::cerr << " Init:" << duration_cast<milliseconds>(system_clock::now() - start).count() << "\n";
364  //start = system_clock::now();
365  unsigned long int tid = context->enqueueTask(OP_CoGroup, id, this->size);
366 
367  for (size_t i = 1; i < members.size(); ++i){
368  members[i]->setGroupedByKey(true);
369  }
370 
371  auto result = context->recvTaskResult(tid, sid, start);
372  //std::cerr << " Process:" << duration_cast<milliseconds>(system_clock::now() - start).count() << "\n";
373  }
374 
375 
376 }
377 
378 #endif
unsigned int fddOpType
Dataset operation type.
Definition: definitions.h:41
bool isCached()
Returns true if the dataset is cached.
Definition: fddBase.h:36
int getId()
Returns the identification number of the dataset.
Definition: fddBase.h:27
void discard()
Deallocates a previously cached FDD.
Definition: groupedFdd.h:124
Framework context class.
Definition: fastContext.h:66
groupedFdd< K > * cache()
Prevents automatic memory deallocation from happening.
Definition: groupedFdd.h:118
groupedFdd(fastContext *c, iFddCore< K, T > *fdd0, iFddCore< K, U > *fdd1, iFddCore< K, V > *fdd2, system_clock::time_point &start)
Creates an indexedFdd group with three members.
Definition: groupedFdd.h:104
Fast Distributed Dataset (FDD) is like a cluster-distributed array. This class is the user side impl...
Definition: fastContext.h:24
libfaster main namespace
Definition: _workerFdd.h:11
groupedFdd(fastContext *c, iFddCore< K, T > *fdd0, iFddCore< K, U > *fdd1, system_clock::time_point &start)
Creates an indexedFdd group with two members.
Definition: groupedFdd.h:87
int numProcs()
Return the number of processes running.
Definition: fastContext.h:150