Faster  0.0.4Alpha
Super fast distributed computing
groupedFdd.h
1 #ifndef LIBFASTER_GROUPEDFDD_H
2 #define LIBFASTER_GROUPEDFDD_H
3 
4 #include <memory>
5 
6 //#include "indexedFdd.h"
7 #include "misc.h"
8 #include "fastContext.h"
9 
10 namespace faster{
11 
// Forward declarations only: this header uses these templates through
// pointers and template arguments (their definitions are presumably provided
// by other headers of the library -- TODO confirm).
template <typename K, typename T>
class iFddCore;

template <typename K, typename T>
class indexedFdd;
17 
18  /*template<typename K, typename... Types>
19  class groupedFdd : fddBase{
20  private:
21  static const unsigned short int dimension = sizeof...(Types);
22  fastContext * context;
23  std::vector<fddBase *> members;
24 
25  template <typename T, typename... iFdds>
26  void addFdds(iFddCore<K,T> * fdd, iFdds ... otherFdds){
27  members.insert(members.end(), fdd);
28 
29  addFdds(otherFdds... );
30  }
31 
32  template <typename T, typename... OtherArgs, typename... OtherSizes, typename U>
33  void unpackMap( std::pair<K,U> (*funcP) (const K &, T *, OtherArgs ... otherArgs, size_t, OtherSizes ... otherSizes), fddOpType op){
34  unpackMap<K, OtherArgs..., U, OtherSizes...>(funcP, op);
35  }
36 
37  public:
38  template <typename... FddTypes>
39  groupedFdd(fastContext * c, FddTypes * ... fddTypes){
40  context = c;
41 
42  addFdds(fddTypes...);
43  //context->coPartition(members);
44  id = context->createFddGroup(this, members);
45  }
46 
47  template <typename... Sizes, typename U>
48  indexedFdd<K,U> * mapByKey( std::pair<K,U> (*funcP) (const K & key, Types * ... args, Sizes ... sizes) ){
49  unpackMap<K,U>(funcP, OP_Map);
50  return NULL; //map<K,U>((void*) funcP, OP_Map);
51  }
52 
53  };*/
54  template<typename K>
56  private:
57  int numMembers;
58  fastContext * context;
59  std::vector<fddBase *> members;
60 
62  _kType = decodeType(typeid(K).hash_code());
63  _tType = Null;
64  context = c;
65  members.reserve(4);
66  cached = false;
67  }
68  fddBase * _map (void * funcP, fddOpType op, fddBase * newFdd, system_clock::time_point & start);
69  template <typename To>
70  fdd<To> * map(void * funcP, fddOpType op);
71  template <typename Ko, typename To>
72  indexedFdd<Ko,To> * mapI(void * funcP, fddOpType op);
73 
74  groupedFdd<K> * update(void * funcP, fddOpType op);
75 
76  void cogroup(std::shared_ptr<std::unordered_map<K, int>> & keyMap, system_clock::time_point & start);
77  public:
78  template <typename T, typename U>
79  groupedFdd(fastContext * c, iFddCore<K,T> * fdd0, iFddCore<K,U> * fdd1, std::shared_ptr<std::unordered_map<K, int>> & keyMap, system_clock::time_point & start) : groupedFdd(c) {
80  members.insert(members.end(), fdd0);
81  members.insert(members.end(), fdd1);
82  id = context->createFddGroup(this, members);
83  cogroup(keyMap, start);
84  }
85  template <typename T, typename U, typename V>
86  groupedFdd(fastContext * c, iFddCore<K,T> * fdd0, iFddCore<K,U> * fdd1, iFddCore<K,V> * fdd2, std::shared_ptr<std::unordered_map<K, int>> & keyMap, system_clock::time_point & start) : groupedFdd(c){
87  members.insert(members.end(), fdd0);
88  members.insert(members.end(), fdd1);
89  members.insert(members.end(), fdd2);
90  id = context->createFddGroup(this, members);
91  cogroup(keyMap, start);
92  }
93 
94  groupedFdd<K> * cache(){
95  this->cached = true;
96  return this;
97  }
98 
99  // UpdateByKey
100  groupedFdd<K> * updateByKey( updateByKeyG2FunctionP<K> funcP){
101  update((void*) funcP, OP_UpdateByKey);
102  return this;
103  }
104 
105  groupedFdd<K> * updateByKey( updateByKeyG3FunctionP<K> funcP){
106  update((void*) funcP, OP_UpdateByKey);
107  return this;
108  }
109 
110  // BulkUpdateByKey
111  groupedFdd<K> * bulkUpdate( bulkUpdateG2FunctionP<K> funcP){
112  update((void*) funcP, OP_BulkUpdate);
113  return this;
114  }
115 
116  groupedFdd<K> * bulkUpdate( bulkUpdateG3FunctionP<K> funcP){
117  update((void*) funcP, OP_BulkUpdate);
118  return this;
119  }
120 
121 
122  // MapByKey
123  template <typename Ko, typename To>
124  indexedFdd<Ko,To> * mapByKey( ImapByKeyG2FunctionP<K,Ko,To> funcP){
125  return (indexedFdd<Ko,To> *) mapI<Ko,To>((void*) funcP, OP_MapByKey);
126  }
127 
128  template <typename Ko, typename To>
129  indexedFdd<Ko,To> * mapByKey( ImapByKeyG3FunctionP<K,Ko,To> funcP){
130  return (indexedFdd<Ko,To> *) mapI<Ko,To>((void*) funcP, OP_MapByKey);
131  }
132  template <typename To>
133  fdd<To> * mapByKey( mapByKeyG2FunctionP<K,To> funcP){
134  return (fdd<To> *) map<To>((void*) funcP, OP_MapByKey);
135  }
136 
137  template <typename To>
138  fdd<To> * mapByKey( mapByKeyG3FunctionP<K,To> funcP){
139  return (fdd<To> *) map<To>((void*) funcP, OP_MapByKey);
140  }
141 
142  // FlatMapByKey
143 
144  template <typename Ko, typename To>
145  indexedFdd<Ko,To> * flatMapByKey( IflatMapByKeyG2FunctionP<K,Ko,To> funcP){
146  return (indexedFdd<Ko,To> *) mapI<Ko,To>((void*) funcP, OP_FlatMapByKey);
147  }
148 
149  template <typename Ko, typename To>
150  indexedFdd<Ko,To> * flatMapByKey( IflatMapByKeyG3FunctionP<K,Ko,To> funcP){
151  return (indexedFdd<Ko,To> *) mapI<Ko,To>((void*) funcP, OP_FlatMapByKey);
152  }
153  template <typename To>
154  fdd<To> * flatMapByKey( flatMapByKeyG2FunctionP<K,To> funcP){
155  return (fdd<To> *) map<To>((void*) funcP, OP_FlatMapByKey);
156  }
157 
158  template <typename To>
159  fdd<To> * flatMapByKey( flatMapByKeyG3FunctionP<K,To> funcP){
160  return (fdd<To> *) map<To>((void*) funcP, OP_FlatMapByKey);
161  }
162 
163  void discard(){
164  context->discardFDD(id);
165  dataAlloc.clear();
166  }
167 
168 
169  // Bulk Flat Map
170  template <typename Ko, typename To>
171  indexedFdd<Ko,To> * bulkFlatMap( IbulkFlatMapG2FunctionP<K, Ko,To> funcP ){
172  return mapI<Ko,To>((void*) funcP, OP_BulkFlatMap);
173  }
174  template <typename Ko, typename To>
175  indexedFdd<Ko,To> * bulkFlatMap( IbulkFlatMapG3FunctionP<K, Ko,To> funcP ){
176  return mapI<Ko,To>((void*) funcP, OP_BulkFlatMap);
177  }
178  template <typename To>
179  fdd<To> * bulkFlatMap( bulkFlatMapG2FunctionP<K, To> funcP ){
180  return map<To>((void*) funcP, OP_BulkFlatMap);
181  }
182  template <typename To>
183  fdd<To> * bulkFlatMap( bulkFlatMapG3FunctionP<K, To> funcP ){
184  return map<To>((void*) funcP, OP_BulkFlatMap);
185  }
186 
187 
188  void * getKeyMap() { return NULL; }
189  void setKeyMap(void * keyMap UNUSED) {}
190  bool isGroupedByKey() { return false; }
191  void setGroupedByKey(bool gbk UNUSED) {}
192 
193  };
194 
195  template <typename K>
196  fddBase * groupedFdd<K>::_map (void * funcP, fddOpType op, fddBase * newFdd, system_clock::time_point & start){
197  //std::cerr << " Map ";
198  unsigned long int tid, sid;
199  unsigned long int newFddId = newFdd->getId();
200 
201  // Decode function pointer
202  int funcId = context->findFunc(funcP);
203 
204  // Send task
205  context->enqueueTask(op, id, newFddId, funcId, this->size);
206 
207  // Receive results
208  auto result = context->recvTaskResult(tid, sid, start);
209 
210  if ( (op & 0xff) & (OP_MapByKey | OP_FlatMapByKey | OP_FlatMap | OP_BulkFlatMap) ){
211  size_t fddSize = 0;
212 
213  for (int i = 1; i < context->numProcs(); ++i){
214  if (result[i].second > 0) fddSize += * (size_t*) result[i].first;
215  }
216  newFdd->setSize(fddSize);
217  }
218 
219  if ( ! cached ){
220  for ( size_t i = 0; i < members.size(); ++i){
221  if ( ! members[i]->isCached() ){
222  members[i]->discard();
223  //std::cerr << "GD" << id << " ";
224  }
225  }
226  discard();
227  }
228 
229  //std::cerr << "\n";
230  return newFdd;
231  }
232  template <typename K>
233  template <typename To>
234  fdd<To> * groupedFdd<K>::map (void * funcP, fddOpType op){
235  fdd<To> * newFdd = NULL;
236  auto start = system_clock::now();
237 
238  if ( (op & 0xFF ) & (OP_MapByKey | OP_FlatMapByKey | OP_FlatMap | OP_BulkFlatMap) ){
239  newFdd = new fdd<To>(*context);
240  }else{
241  newFdd = new fdd<To>(*context, size);
242  }
243 
244  return (fdd<To> *) _map(funcP, op, newFdd, start);
245  }
246  template <typename K>
247  template <typename Ko, typename To>
248  indexedFdd<Ko,To> * groupedFdd<K>::mapI(void * funcP, fddOpType op){
249  indexedFdd<Ko,To> * newFdd = NULL;
250  auto start = system_clock::now();
251 
252  if ( (op & 0xFF ) & (OP_MapByKey | OP_FlatMapByKey | OP_FlatMap | OP_BulkFlatMap) ){
253  newFdd = new indexedFdd<Ko,To>(*context);
254  }else{
255  newFdd = new indexedFdd<Ko,To>(*context, size);
256  }
257 
258  return (indexedFdd<Ko,To> *) _map(funcP, op, newFdd, start);
259  }
260 
261  template <typename K>
262  groupedFdd<K> * groupedFdd<K>::update(void * funcP, fddOpType op){
263  auto start = system_clock::now();
264  unsigned long int sid;
265 
266  // Decode function pointer
267  int funcId = context->findFunc(funcP);
268 
269  // Send task
270  unsigned long int tid = context->enqueueTask(op, this->id, 0, funcId, this->size);
271 
272  auto result = context->recvTaskResult(tid, sid, start);
273 
274  if ( ! cached ){
275  for ( size_t i = 0; i < members.size(); ++i){
276  if ( ! members[i]->isCached() ){
277  members[i]->discard();
278  }
279  }
280  discard();
281  }
282 
283  return this;
284  }
285 
286  template <typename K>
287  void groupedFdd<K>::cogroup(std::shared_ptr<std::unordered_map<K, int>> & keyMap, system_clock::time_point & start){
288  using std::chrono::system_clock;
289  std::vector<bool> exchangeData (members.size()-1, true);
290 
291  unsigned long int sid;
292 
293  for (size_t i = 1; i < members.size(); ++i){
294  if ( members[i]->isGroupedByKey() ){
295  void * km = members[i]->getKeyMap();
296  if ( *(std::shared_ptr<std::unordered_map<K, int>>*)km != keyMap ){
297  members[i]->setKeyMap(&keyMap);
298  }else{
299  exchangeData[i-1] = false;
300  }
301  }else{
302  members[i]->setKeyMap(&keyMap);
303  members[i]->setGroupedByKey(true);
304  }
305  }
306 
307  unsigned long int tid = context->enqueueTask(OP_CoGroup, id, this->size);
308 
309  context->sendCogroupData(tid, *keyMap, exchangeData);
310 
311 
312  auto result = context->recvTaskResult(tid, sid, start);
313  }
314 
315 
316 }
317 
318 #endif
Definition: fastContext.h:26
Definition: fastContext.h:54
Definition: groupedFdd.h:13
Definition: groupedFdd.h:55
Definition: fddBase.h:8
Definition: fastContext.h:23
Definition: _workerFdd.h:11