libfaster API Documentation  Development Version
Super fast distributed computing
fastContext.h
1 #ifndef LIBFASTER_FASTCONTEXT_H
2 #define LIBFASTER_FASTCONTEXT_H
3 
#include <chrono>
#include <math.h>
#include <memory>
#include <queue>
#include <string>
#include <tuple>
#include <typeinfo>
#include <unordered_map>
#include <utility>
#include <vector>

#include "definitions.h"
#include "fddBase.h"
#include "fastComm.h"
#include "fastScheduler.h"
18 
19 using std::chrono::system_clock;
20 
21 namespace faster{
22 
// Forward declarations of types this header names before (or without)
// seeing their full definitions.
template <class T> class fdd;

template <class K, class T> class indexedFdd;

class fastTask;
class fastContext;
31 
32 
/// @brief Context Configuration Class.
///
/// Options a fastContext is created with. fastContext is a friend so it can
/// read the private flags directly.
class fastSettings{
	friend class fastContext;
	public:

		/// @brief fastSettings default constructor: dynamic load balancing disabled.
		fastSettings(){
			_allowDataBalancing = false;
		}

		/// @brief fastSettings copy constructor.
		///
		/// Copies every option from @p s. Previously this constructor ignored its
		/// argument and left _allowDataBalancing uninitialized. A copied settings
		/// object could then report an indeterminate value, which is undefined
		/// behaviour to read.
		fastSettings(const fastSettings & s) : _allowDataBalancing(s._allowDataBalancing){
		}

		/// @brief Enables dynamic load balancing.
		void allowDataBalancing(){
			_allowDataBalancing = true;
		}

		/// @brief Returns true if allowDataBalancing() was called on these settings
		///        (or on the settings they were copied from).
		bool dataBalancingAllowed() const{
			return _allowDataBalancing;
		}

	private:
		bool _allowDataBalancing;

};
60 
66  class fastContext{
67 
68  template <class T> friend class fdd;
69  template <class T> friend class fddCore;
70  template <class K, class T> friend class iFddCore;
71  template <class K, class T> friend class indexedFdd;
72  template <class K> friend class groupedFdd;
73  friend class worker;
74 
75  public:
80  fastContext( int argc=0, char ** argv=NULL);
81 
83  fastContext( const fastSettings & s, int argc, char ** argv);
84 
86  ~fastContext();
87 
90 
97  void registerFunction(void * funcP);
98 
106  void registerFunction(void * funcP, const std::string name);
107 
108 
114  template <class T>
115  void registerGlobal(T * varP);
116 
123  template <class T>
124  void registerGlobal(T ** varP, size_t s);
125 
131  template <class T>
132  void registerGlobal(std::vector<T> * varP);
134 
141  void startWorkers();
142 
146  bool isDriver();
150  int numProcs(){ return comm->numProcs; }
151 
152 
154  void calibrate();
155 
158 
166  template <typename T>
167  fdd<T> * onlineFullPartRead(std::string path, onlineFullPartFuncP<T> funcP);
168  template <typename K, typename T>
169  indexedFdd<K,T> * onlineFullPartRead(std::string path, IonlineFullPartFuncP<K,T> funcP);
170  template <typename K, typename T>
171  indexedFdd<K,T> * onlinePartRead(std::string path, IonlineFullPartFuncP<K,T> funcP);
172 
181  template <typename T>
182  fdd<T> * onlineRead(std::string path, onlineFullPartFuncP<T> funcP);
183  template <typename K, typename T>
184  indexedFdd<K,T> * onlineRead(std::string path, IonlineFullPartFuncP<K,T> funcP);
186 
189 
191  void printInfo();
195  void printHeader();
199  void updateInfo();
201 
202  private:
203  int id;
204  unsigned long int numFDDs;
205  //unsigned long int numTasks;
206  fastSettings * settings;
207  std::vector< fddBase * > fddList;
208  std::vector< bool > fddInternal;
209  std::vector< void * > funcTable;
210  std::vector<std::string> funcName;
211  std::vector< std::tuple<void*, size_t, int> > globalTable;
212  fastComm * comm;
213  fastScheduler * scheduler;
214 
215  //std::vector<fastTask *> taskList;
216 
217  int findFunc(void * funcP);
218 
219  unsigned long int _createFDD(fddBase * ref, fddType type, const std::vector<size_t> * dataAlloc);
220  unsigned long int _createIFDD(fddBase * ref, fddType kTypeCode, fddType tTypeCode, const std::vector<size_t> * dataAlloc);
221  unsigned long int createFDD(fddBase * ref, size_t typeCode);
222  unsigned long int createFDD(fddBase * ref, size_t typeCode, const std::vector<size_t> & dataAlloc);
223  unsigned long int createPFDD(fddBase * ref, size_t typeCode);
224  unsigned long int createPFDD(fddBase * ref, size_t typeCode, const std::vector<size_t> & dataAlloc);
225  unsigned long int createIFDD(fddBase * ref, size_t kTypeCode, size_t tTypeCode);
226  unsigned long int createIFDD(fddBase * ref, size_t kTypeCode, size_t tTypeCode, const std::vector<size_t> & dataAlloc);
227  unsigned long int createIPFDD(fddBase * ref, size_t kTypeCode, size_t tTypeCode);
228  unsigned long int createIPFDD(fddBase * ref, size_t kTypeCode, size_t tTypeCode, const std::vector<size_t> & dataAlloc);
229  unsigned long int createFddGroup(fddBase * ref, std::vector<fddBase*> & fdds);
230 
231  unsigned long int readFDD(fddBase * ref, const char * fileName);
232  void getFDDInfo(size_t & size, std::vector<size_t> & dataAlloc);
233  void writeToFile(unsigned long int id,std::string & path, std::string & sufix);
234 
235 
236  unsigned long int enqueueTask(fddOpType opT, unsigned long int idSrc, unsigned long int idRes, int funcId, size_t size);
237  unsigned long int enqueueTask(fddOpType opT, unsigned long int id, size_t size);
238 
239  //void * recvTaskResult(unsigned long int &tid, unsigned long int &sid, size_t & size);
240  std::vector< std::pair<void *, size_t> > recvTaskResult(unsigned long int &tid, unsigned long int &sid, system_clock::time_point & start);
241 
242  template <typename K>
243  void sendKeyMap(unsigned long id, std::unordered_map<K, int> & keyMap);
244  template <typename K>
245  void sendCogroupData(unsigned long id, std::unordered_map<K, int> & keyMap, std::vector<bool> & flags);
246 
247  void setInternal(size_t i, bool b){
248  fddInternal[i] = b;
249  }
250 
251  template <typename Z, typename FDD>
252  void collectFDD(Z & ret, FDD * fddP){
253  //std::cerr << " S:SendCollect " ;
254 
255  comm->sendCollect(fddP->getId());
256  //std::cerr << "ID:" << fddP->getId() << " ";
257 
258  comm->recvFDDDataCollect(ret);
259 
260  //std::cerr << ".\n";
261  }
262 
263 
264  // Propagate FDD data to other machines
265  // Primitive types
266  template <typename T>
267  void parallelize(unsigned long int id, T * data, size_t size){
268  size_t offset = 0;
269  //std::cerr << " Parallelize Data\n";
270 
271  for (int i = 1; i < comm->numProcs; ++i){
272  int dataPerProc = size/(comm->numProcs - 1);
273  int rem = size % (comm->numProcs -1);
274  if (i <= rem)
275  dataPerProc += 1;
276  //std::cerr << " S:FDDSetData P" << i << " ID:" << id << " S:" << dataPerProc << "";
277 
278  comm->sendFDDSetData(id, i, &data[offset], dataPerProc);
279  offset += dataPerProc;
280  //std::cerr << ".\n";
281  }
282  comm->waitForReq(comm->numProcs - 1);
283  //std::cerr << " Done\n";
284  }
285  template <typename K, typename T>
286  void parallelizeI(unsigned long int id, K * keys, T * data, size_t size){
287  size_t offset = 0;
288  //std::cerr << " Parallelize Data\n";
289 
290  for (int i = 1; i < comm->numProcs; ++i){
291  int dataPerProc = size/(comm->numProcs - 1);
292  int rem = size % (comm->numProcs -1);
293  if (i <= rem)
294  dataPerProc += 1;
295  //std::cerr << " S:FDDSetDataI P" << i << " ID:" << id << " S:" << dataPerProc << "";
296 
297  comm->sendFDDSetIData(id, i, &keys[offset], &data[offset], dataPerProc);
298  offset += dataPerProc;
299  //std::cerr << ".\n";
300  }
301  comm->waitForReq(comm->numProcs - 1);
302  //std::cerr << " Done\n";
303  }
304 
305  //Pointers
306  template <typename T>
307  void parallelize(unsigned long int id, T ** data, size_t * dataSizes, size_t size){
308  size_t offset = 0;
309 
310  //std::cerr << " Parallelize Data\n";
311  for (int i = 1; i < comm->numProcs; ++i){
312  int dataPerProc = size/ (comm->numProcs - 1);
313  int rem = size % (comm->numProcs -1);
314  if (i <= rem)
315  dataPerProc += 1;
316  //std::cerr << " S:FDDSetPData P" << i << " ID:" << id << " S:" << dataPerProc << "";
317 
318  comm->sendFDDSetData(id, i, &data[offset], &dataSizes[offset], dataPerProc);
319  offset += dataPerProc;
320  //std::cerr << ".\n";
321  }
322  comm->waitForReq(comm->numProcs - 1);
323  //std::cerr << " Done\n";
324  }
325  template <typename K, typename T>
326  void parallelizeI(unsigned long int id, K * keys, T ** data, size_t * dataSizes, size_t size){
327  size_t offset = 0;
328 
329  //std::cerr << " Parallelize Data\n";
330  for (int i = 1; i < comm->numProcs; ++i){
331  int dataPerProc = size/ (comm->numProcs - 1);
332  int rem = size % (comm->numProcs -1);
333  if (i <= rem)
334  dataPerProc += 1;
335  //std::cerr << " S:FDDSetPDataI P" << i << " ID:" << id << " S:" << dataPerProc << "";
336 
337  comm->sendFDDSetIData(id, i, &keys[offset], &data[offset], &dataSizes[offset], dataPerProc);
338  offset += dataPerProc;
339  //std::cerr << ".\n";
340  }
341  comm->waitForReq(comm->numProcs - 1);
342  //std::cerr << " Done\n";
343  }
344 
345 
346  std::vector<size_t> getAllocation(size_t size);
347 
348  void discardFDD(unsigned long int id);
349 
350  };
351 
352  template <class T>
354  globalTable.insert( globalTable.end(), std::make_tuple(varP, sizeof(T), 0) );
355  }
356  template <class T>
357  void fastContext::registerGlobal(T ** varP, size_t s){
358  globalTable.insert( globalTable.end(), std::make_tuple(varP, s, POINTER) );
359  }
360  // Still unimplemented
361  template <class T>
362  void fastContext::registerGlobal(std::vector<T> * varP){
363  globalTable.insert( globalTable.end(), std::make_tuple(varP, sizeof(T) * varP->size(), VECTOR) );
364  }
365 
366  /*template <typename T>
367  fdd<T> * fastContext::onlineFullPartRead(std::string path, onlineFullPartFuncP<T> funcP){
368  auto start = system_clock::now();
369  fdd<T> * newFdd = new fdd<T>(*this);
370  int funcId = findFunc((void*)funcP);
371  std::vector<size_t> alloc(comm->numProcs, 0);
372  unsigned long int sid;
373  size_t newSize = 0;
374 
375  // Send task
376  unsigned long int tid = enqueueTask(OP_OnFullPRead, newFdd->getId(), -1, funcId, 0);
377  for (int i = 1; i < numProcs(); ++i){
378  comm->sendFileName(path, i, 0, 0);
379  }
380 
381  // Receive results
382  auto result = recvTaskResult(tid, sid, start);
383 
384  for (int i = 1; i < numProcs(); ++i){
385  if (result[i].second > 0){
386  alloc[i] = * (size_t*) result[i].first;
387  newSize += alloc[i];
388  }
389  }
390  newFdd->setSize(newSize);
391  scheduler->setAllocation(alloc, newSize);
392 
393  fddList.insert(fddList.begin(), newFdd);
394 
395  return newFdd;
396  }*/
397 
398  template <typename K, typename T>
399  indexedFdd<K,T> * fastContext::onlineFullPartRead(std::string path, IonlineFullPartFuncP<K,T> funcP){
400  auto start = system_clock::now();
401  indexedFdd<K,T> * newFdd = new indexedFdd<K,T>(*this);
402  int funcId = findFunc((void*)funcP);
403  std::vector<size_t> alloc(comm->numProcs, 0);
404  unsigned long int sid;
405  size_t newSize = 0;
406 
407  // Send task
408  unsigned long int tid = enqueueTask(OP_OnFullPRead, newFdd->getId(), -1, funcId, 0);
409  comm->sendFileName(path);
410 
411  // Receive results
412  auto result = recvTaskResult(tid, sid, start);
413 
414  for (int i = 1; i < numProcs(); ++i){
415  if (result[i].second > 0){
416  alloc[i] = * (size_t*) result[i].first;
417  newSize += alloc[i];
418  }
419  }
420  newFdd->setSize(newSize);
421  newFdd->setGroupedByKey(true);
422  newFdd->setGroupedByMap(true);
423  scheduler->setAllocation(alloc, newSize);
424 
425  return newFdd;
426  }
427 
428 
429  template <typename K, typename T>
430  indexedFdd<K,T> * fastContext::onlinePartRead(std::string path, IonlineFullPartFuncP<K,T> funcP){
431  auto start = system_clock::now();
432  indexedFdd<K,T> * newFdd = new indexedFdd<K,T>(*this);
433  int funcId = findFunc((void*)funcP);
434  std::vector<size_t> alloc(comm->numProcs, 0);
435  unsigned long int sid;
436  size_t newSize = 0;
437 
438  // Send task
439  unsigned long int tid = enqueueTask(OP_OnPartRead, newFdd->getId(), -1, funcId, 0);
440  comm->sendFileName(path);
441 
442  // Receive results
443  auto result = recvTaskResult(tid, sid, start);
444 
445  for (int i = 1; i < numProcs(); ++i){
446  if (result[i].second > 0){
447  alloc[i] = * (size_t*) result[i].first;
448  newSize += alloc[i];
449  }
450  }
451  newFdd->setSize(newSize);
452  newFdd->setGroupedByKey(true);
453  newFdd->setGroupedByMap(true);
454  scheduler->setAllocation(alloc, newSize);
455 
456  return newFdd;
457  }
458 
459  template <typename K>
460  void fastContext::sendKeyMap(unsigned long tid, std::unordered_map<K, int> & keyMap){
461  comm->sendKeyMap(tid, keyMap);
462  }
463  template <typename K>
464  void fastContext::sendCogroupData(unsigned long tid, std::unordered_map<K, int> & keyMap, std::vector<bool> &flags){
465  comm->sendCogroupData(tid, keyMap, flags);
466  }
467 
468 
469 }
470 
471 #endif
unsigned int fddOpType
Dataset operation type.
Definition: definitions.h:41
void registerGlobal(T *varP)
Registers a primitive global variable to be used inside user-defined functions in a distributed environment.
Definition: fastContext.h:353
void setGroupedByMap(bool gbm)
(UNUSED)
Definition: indexedFdd.h:160
fastSettings(const fastSettings &s UNUSED)
fastSettings copy constructor
Definition: fastContext.h:48
Core class that implements simple operations.
Definition: fdd.h:26
int getId()
Returns the identification number of the dataset.
Definition: fddBase.h:27
Framework context class.
Definition: fastContext.h:66
void setGroupedByKey(bool gbk)
(UNUSED)
Definition: indexedFdd.h:156
Fast Distributed Dataset (FDD) is like a cluster-distributed array. This class is the user-side implementation...
Definition: fastContext.h:24
libfaster main namespace
Definition: _workerFdd.h:11
unsigned int fddType
Dataset type.
Definition: definitions.h:16
Context Configuration Class.
Definition: fastContext.h:38
int numProcs()
Return the number of processes running.
Definition: fastContext.h:150
fastSettings()
fastSettings default constructor
Definition: fastContext.h:43
void allowDataBalancing()
Enables dynamic load balancing.
Definition: fastContext.h:52
fdd< T > * onlineFullPartRead(std::string path, onlineFullPartFuncP< T > funcP)
Reads a file with online parsing and partitioning (NOT IMPLEMENTED)