Faster  0.0.4Alpha
Super fast distributed computing
fastContext.h
1 #ifndef LIBFASTER_FASTCONTEXT_H
2 #define LIBFASTER_FASTCONTEXT_H
3 
4 #include <string>
5 #include <vector>
6 #include <queue>
7 #include <typeinfo>
8 #include <tuple>
9 #include <math.h>
10 #include <chrono>
11 #include <string>
12 
13 #include "definitions.h"
14 #include "fddBase.h"
15 #include "fastComm.h"
16 #include "fastScheduler.h"
17 
18 using std::chrono::system_clock;
19 
20 namespace faster{
21 
22  template <typename T>
23  class fdd;
24 
25  template <typename K, typename T>
26  class indexedFdd;
27 
28  class fastTask;
29  class fastContext;
30 
31 
// Options handed to a fastContext when it is constructed.
//
// The only option at present is whether data balancing is allowed. It is
// disabled by default and can be switched on with allowDataBalancing().
// The flag itself is private; fastContext is a friend and reads it directly.
class fastSettings{
    friend class fastContext;
    public:

        // Default settings: data balancing disabled (see the member initializer).
        fastSettings() = default;

        // Copy every option from s.
        // The previous hand-written copy constructor had an empty body, which
        // left _allowDataBalancing uninitialized in the copy; the compiler
        // generated member-wise copy is what is wanted here.
        fastSettings(const fastSettings & s) = default;

        // Allow data balancing.
        void allowDataBalancing(){
            _allowDataBalancing = true;
        }

    private:
        // True once allowDataBalancing() has been called on this object
        // (or on the object it was copied from).
        bool _allowDataBalancing = false;

};
51 
52  // General context
53  // Manages the driver and the Workers
54  class fastContext{
55 
56  template <class T> friend class fdd;
57  template <class T> friend class fddCore;
58  template <class K, class T> friend class iFddCore;
59  template <class K, class T> friend class indexedFdd;
60  template <class K> friend class groupedFdd;
61  friend class worker;
62 
63  public:
64  fastContext( int & argc, char **& argv): fastContext(fastSettings(), argc, argv){}
65  fastContext( const fastSettings & s, int & argc, char **& argv);
66  ~fastContext();
67 
68  void registerFunction(void * funcP);
69  void registerFunction(void * funcP, const std::string name);
70  template <class T>
71  void registerGlobal(T * varP);
72 
73  void startWorkers();
74 
75  void calibrate();
76 
77  void printInfo();
78  void printHeader();
79  void updateInfo();
80 
81  private:
82  int id;
83  unsigned long int numFDDs;
84  //unsigned long int numTasks;
85  fastSettings * settings;
86  std::vector< fddBase * > fddList;
87  std::vector<void*> funcTable;
88  std::vector<std::string> funcName;
89  std::vector< std::pair<void*, size_t> > globalTable;
90  fastComm * comm;
91  fastScheduler * scheduler;
92 
93  //std::vector<fastTask *> taskList;
94 
95  int findFunc(void * funcP);
96 
97  unsigned long int _createFDD(fddBase * ref, fddType type, const std::vector<size_t> * dataAlloc);
98  unsigned long int _createIFDD(fddBase * ref, fddType kTypeCode, fddType tTypeCode, const std::vector<size_t> * dataAlloc);
99  unsigned long int createFDD(fddBase * ref, size_t typeCode);
100  unsigned long int createFDD(fddBase * ref, size_t typeCode, const std::vector<size_t> & dataAlloc);
101  unsigned long int createPFDD(fddBase * ref, size_t typeCode);
102  unsigned long int createPFDD(fddBase * ref, size_t typeCode, const std::vector<size_t> & dataAlloc);
103  unsigned long int createIFDD(fddBase * ref, size_t kTypeCode, size_t tTypeCode);
104  unsigned long int createIFDD(fddBase * ref, size_t kTypeCode, size_t tTypeCode, const std::vector<size_t> & dataAlloc);
105  unsigned long int createIPFDD(fddBase * ref, size_t kTypeCode, size_t tTypeCode);
106  unsigned long int createIPFDD(fddBase * ref, size_t kTypeCode, size_t tTypeCode, const std::vector<size_t> & dataAlloc);
107  unsigned long int createFddGroup(fddBase * ref, std::vector<fddBase*> & fdds);
108 
109  unsigned long int readFDD(fddBase * ref, const char * fileName);
110  void getFDDInfo(size_t & size, std::vector<size_t> & dataAlloc);
111 
112  int numProcs(){ return comm->numProcs; }
113 
114 
115  unsigned long int enqueueTask(fddOpType opT, unsigned long int idSrc, unsigned long int idRes, int funcId, size_t size);
116  unsigned long int enqueueTask(fddOpType opT, unsigned long int id, size_t size);
117 
118  //void * recvTaskResult(unsigned long int &tid, unsigned long int &sid, size_t & size);
119  std::vector< std::pair<void *, size_t> > recvTaskResult(unsigned long int &tid, unsigned long int &sid, system_clock::time_point & start);
120 
121  template <typename K>
122  void sendKeyMap(unsigned long id, std::unordered_map<K, int> & keyMap);
123  template <typename K>
124  void sendCogroupData(unsigned long id, std::unordered_map<K, int> & keyMap, std::vector<bool> & flags);
125 
126 
127  template <typename Z, typename FDD>
128  void collectFDD(Z & ret, FDD * fddP){
129  //std::cerr << " S:SendCollect " ;
130 
131  comm->sendCollect(fddP->getId());
132  //std::cerr << "ID:" << fddP->getId() << " ";
133 
134  comm->recvFDDDataCollect(ret);
135 
136  //std::cerr << ".\n";
137  }
138 
139 
140  // Propagate FDD data to other machines
141  // Primitive types
142  template <typename T>
143  void parallelize(unsigned long int id, T * data, size_t size){
144  size_t offset = 0;
145  //std::cerr << " Parallelize Data\n";
146 
147  for (int i = 1; i < comm->numProcs; ++i){
148  int dataPerProc = size/(comm->numProcs - 1);
149  int rem = size % (comm->numProcs -1);
150  if (i <= rem)
151  dataPerProc += 1;
152  //std::cerr << " S:FDDSetData P" << i << " ID:" << id << " S:" << dataPerProc << "";
153 
154  comm->sendFDDSetData(id, i, &data[offset], dataPerProc);
155  offset += dataPerProc;
156  //std::cerr << ".\n";
157  }
158  comm->waitForReq(comm->numProcs - 1);
159  //std::cerr << " Done\n";
160  }
161  template <typename K, typename T>
162  void parallelizeI(unsigned long int id, K * keys, T * data, size_t size){
163  size_t offset = 0;
164  //std::cerr << " Parallelize Data\n";
165 
166  for (int i = 1; i < comm->numProcs; ++i){
167  int dataPerProc = size/(comm->numProcs - 1);
168  int rem = size % (comm->numProcs -1);
169  if (i <= rem)
170  dataPerProc += 1;
171  //std::cerr << " S:FDDSetDataI P" << i << " ID:" << id << " S:" << dataPerProc << "";
172 
173  comm->sendFDDSetIData(id, i, &keys[offset], &data[offset], dataPerProc);
174  offset += dataPerProc;
175  //std::cerr << ".\n";
176  }
177  comm->waitForReq(comm->numProcs - 1);
178  //std::cerr << " Done\n";
179  }
180 
181  //Pointers
182  template <typename T>
183  void parallelize(unsigned long int id, T ** data, size_t * dataSizes, size_t size){
184  size_t offset = 0;
185 
186  //std::cerr << " Parallelize Data\n";
187  for (int i = 1; i < comm->numProcs; ++i){
188  int dataPerProc = size/ (comm->numProcs - 1);
189  int rem = size % (comm->numProcs -1);
190  if (i <= rem)
191  dataPerProc += 1;
192  //std::cerr << " S:FDDSetPData P" << i << " ID:" << id << " S:" << dataPerProc << "";
193 
194  comm->sendFDDSetData(id, i, &data[offset], &dataSizes[offset], dataPerProc);
195  offset += dataPerProc;
196  //std::cerr << ".\n";
197  }
198  comm->waitForReq(comm->numProcs - 1);
199  //std::cerr << " Done\n";
200  }
201  template <typename K, typename T>
202  void parallelizeI(unsigned long int id, K * keys, T ** data, size_t * dataSizes, size_t size){
203  size_t offset = 0;
204 
205  //std::cerr << " Parallelize Data\n";
206  for (int i = 1; i < comm->numProcs; ++i){
207  int dataPerProc = size/ (comm->numProcs - 1);
208  int rem = size % (comm->numProcs -1);
209  if (i <= rem)
210  dataPerProc += 1;
211  //std::cerr << " S:FDDSetPDataI P" << i << " ID:" << id << " S:" << dataPerProc << "";
212 
213  comm->sendFDDSetIData(id, i, &keys[offset], &data[offset], &dataSizes[offset], dataPerProc);
214  offset += dataPerProc;
215  //std::cerr << ".\n";
216  }
217  comm->waitForReq(comm->numProcs - 1);
218  //std::cerr << " Done\n";
219  }
220 
221 
222  std::vector<size_t> getAllocation(size_t size);
223 
224  void discardFDD(unsigned long int id);
225 
226  };
227 
228  template <class T>
229  void fastContext::registerGlobal(T * varP){
230  std::pair<void*, size_t> globalReg ;
231  globalReg.first = varP;
232  globalReg.second = sizeof(T);
233  globalTable.insert( globalTable.end(), globalReg );
234  }
235 
236  template <typename K>
237  void fastContext::sendKeyMap(unsigned long tid, std::unordered_map<K, int> & keyMap){
238  comm->sendKeyMap(tid, keyMap);
239  }
240  template <typename K>
241  void fastContext::sendCogroupData(unsigned long tid, std::unordered_map<K, int> & keyMap, std::vector<bool> &flags){
242  comm->sendCogroupData(tid, keyMap, flags);
243  }
244 
245 
246 }
247 
248 #endif
Definition: fastComm.h:116
Definition: fdd.h:23
Definition: fastContext.h:26
Definition: fastContext.h:54
Definition: groupedFdd.h:13
Definition: fastScheduler.h:14
Definition: groupedFdd.h:55
Definition: fddBase.h:8
Definition: fastContext.h:23
Definition: _workerFdd.h:11
Definition: fastTask.h:11
Definition: fastContext.h:32
Definition: worker.h:19