1 #ifndef LIBFASTER_FASTCONTEXT_H 2 #define LIBFASTER_FASTCONTEXT_H 14 #include "definitions.h" 17 #include "fastScheduler.h" 19 using std::chrono::system_clock;
26 template <
typename K,
typename T>
44 _allowDataBalancing =
false;
53 _allowDataBalancing =
true;
57 bool _allowDataBalancing;
68 template <
class T>
friend class fdd;
69 template <
class T>
friend class fddCore;
70 template <
class K,
class T>
friend class iFddCore;
71 template <
class K,
class T>
friend class indexedFdd;
97 void registerFunction(
void * funcP);
106 void registerFunction(
void * funcP,
const std::string name);
115 void registerGlobal(T * varP);
124 void registerGlobal(T ** varP,
size_t s);
132 void registerGlobal(std::vector<T> * varP);
166 template <
typename T>
167 fdd<T> * onlineFullPartRead(std::string path, onlineFullPartFuncP<T> funcP);
168 template <
typename K,
typename T>
169 indexedFdd<K,T> * onlineFullPartRead(std::string path, IonlineFullPartFuncP<K,T> funcP);
170 template <
typename K,
typename T>
171 indexedFdd<K,T> * onlinePartRead(std::string path, IonlineFullPartFuncP<K,T> funcP);
181 template <
typename T>
182 fdd<T> * onlineRead(std::string path, onlineFullPartFuncP<T> funcP);
183 template <
typename K,
typename T>
184 indexedFdd<K,T> * onlineRead(std::string path, IonlineFullPartFuncP<K,T> funcP);
204 unsigned long int numFDDs;
207 std::vector< fddBase * > fddList;
208 std::vector< bool > fddInternal;
209 std::vector< void * > funcTable;
210 std::vector<std::string> funcName;
211 std::vector< std::tuple<void*, size_t, int> > globalTable;
217 int findFunc(
void * funcP);
219 unsigned long int _createFDD(
fddBase * ref,
fddType type,
const std::vector<size_t> * dataAlloc);
220 unsigned long int _createIFDD(
fddBase * ref,
fddType kTypeCode,
fddType tTypeCode,
const std::vector<size_t> * dataAlloc);
221 unsigned long int createFDD(
fddBase * ref,
size_t typeCode);
222 unsigned long int createFDD(
fddBase * ref,
size_t typeCode,
const std::vector<size_t> & dataAlloc);
223 unsigned long int createPFDD(
fddBase * ref,
size_t typeCode);
224 unsigned long int createPFDD(
fddBase * ref,
size_t typeCode,
const std::vector<size_t> & dataAlloc);
225 unsigned long int createIFDD(
fddBase * ref,
size_t kTypeCode,
size_t tTypeCode);
226 unsigned long int createIFDD(
fddBase * ref,
size_t kTypeCode,
size_t tTypeCode,
const std::vector<size_t> & dataAlloc);
227 unsigned long int createIPFDD(
fddBase * ref,
size_t kTypeCode,
size_t tTypeCode);
228 unsigned long int createIPFDD(
fddBase * ref,
size_t kTypeCode,
size_t tTypeCode,
const std::vector<size_t> & dataAlloc);
229 unsigned long int createFddGroup(
fddBase * ref, std::vector<fddBase*> & fdds);
231 unsigned long int readFDD(
fddBase * ref,
const char * fileName);
232 void getFDDInfo(
size_t & size, std::vector<size_t> & dataAlloc);
233 void writeToFile(
unsigned long int id,std::string & path, std::string & sufix);
236 unsigned long int enqueueTask(
fddOpType opT,
unsigned long int idSrc,
unsigned long int idRes,
int funcId,
size_t size);
237 unsigned long int enqueueTask(
fddOpType opT,
unsigned long int id,
size_t size);
240 std::vector< std::pair<void *, size_t> > recvTaskResult(
unsigned long int &tid,
unsigned long int &sid, system_clock::time_point & start);
242 template <
typename K>
243 void sendKeyMap(
unsigned long id, std::unordered_map<K, int> & keyMap);
244 template <
typename K>
245 void sendCogroupData(
unsigned long id, std::unordered_map<K, int> & keyMap, std::vector<bool> & flags);
247 void setInternal(
size_t i,
bool b){
251 template <
typename Z,
typename FDD>
252 void collectFDD(Z & ret, FDD * fddP){
255 comm->sendCollect(fddP->getId());
258 comm->recvFDDDataCollect(ret);
266 template <
typename T>
267 void parallelize(
unsigned long int id, T * data,
size_t size){
271 for (
int i = 1; i < comm->numProcs; ++i){
272 int dataPerProc = size/(comm->numProcs - 1);
273 int rem = size % (comm->numProcs -1);
278 comm->sendFDDSetData(
id, i, &data[offset], dataPerProc);
279 offset += dataPerProc;
282 comm->waitForReq(comm->numProcs - 1);
285 template <
typename K,
typename T>
286 void parallelizeI(
unsigned long int id, K * keys, T * data,
size_t size){
290 for (
int i = 1; i < comm->numProcs; ++i){
291 int dataPerProc = size/(comm->numProcs - 1);
292 int rem = size % (comm->numProcs -1);
297 comm->sendFDDSetIData(
id, i, &keys[offset], &data[offset], dataPerProc);
298 offset += dataPerProc;
301 comm->waitForReq(comm->numProcs - 1);
306 template <
typename T>
307 void parallelize(
unsigned long int id, T ** data,
size_t * dataSizes,
size_t size){
311 for (
int i = 1; i < comm->numProcs; ++i){
312 int dataPerProc = size/ (comm->numProcs - 1);
313 int rem = size % (comm->numProcs -1);
318 comm->sendFDDSetData(
id, i, &data[offset], &dataSizes[offset], dataPerProc);
319 offset += dataPerProc;
322 comm->waitForReq(comm->numProcs - 1);
325 template <
typename K,
typename T>
326 void parallelizeI(
unsigned long int id, K * keys, T ** data,
size_t * dataSizes,
size_t size){
330 for (
int i = 1; i < comm->numProcs; ++i){
331 int dataPerProc = size/ (comm->numProcs - 1);
332 int rem = size % (comm->numProcs -1);
337 comm->sendFDDSetIData(
id, i, &keys[offset], &data[offset], &dataSizes[offset], dataPerProc);
338 offset += dataPerProc;
341 comm->waitForReq(comm->numProcs - 1);
346 std::vector<size_t> getAllocation(
size_t size);
348 void discardFDD(
unsigned long int id);
354 globalTable.insert( globalTable.end(), std::make_tuple(varP,
sizeof(T), 0) );
358 globalTable.insert( globalTable.end(), std::make_tuple(varP, s, POINTER) );
363 globalTable.insert( globalTable.end(), std::make_tuple(varP,
sizeof(T) * varP->size(), VECTOR) );
398 template <
typename K,
typename T>
400 auto start = system_clock::now();
402 int funcId = findFunc((
void*)funcP);
403 std::vector<size_t> alloc(comm->numProcs, 0);
404 unsigned long int sid;
408 unsigned long int tid = enqueueTask(OP_OnFullPRead, newFdd->
getId(), -1, funcId, 0);
409 comm->sendFileName(path);
412 auto result = recvTaskResult(tid, sid, start);
414 for (
int i = 1; i < numProcs(); ++i){
415 if (result[i].second > 0){
416 alloc[i] = * (
size_t*) result[i].first;
420 newFdd->setSize(newSize);
423 scheduler->setAllocation(alloc, newSize);
429 template <
typename K,
typename T>
430 indexedFdd<K,T> * fastContext::onlinePartRead(std::string path, IonlineFullPartFuncP<K,T> funcP){
431 auto start = system_clock::now();
433 int funcId = findFunc((
void*)funcP);
434 std::vector<size_t> alloc(comm->numProcs, 0);
435 unsigned long int sid;
439 unsigned long int tid = enqueueTask(OP_OnPartRead, newFdd->
getId(), -1, funcId, 0);
440 comm->sendFileName(path);
443 auto result = recvTaskResult(tid, sid, start);
445 for (
int i = 1; i < numProcs(); ++i){
446 if (result[i].second > 0){
447 alloc[i] = * (
size_t*) result[i].first;
451 newFdd->setSize(newSize);
454 scheduler->setAllocation(alloc, newSize);
459 template <
typename K>
460 void fastContext::sendKeyMap(
unsigned long tid, std::unordered_map<K, int> & keyMap){
461 comm->sendKeyMap(tid, keyMap);
463 template <
typename K>
464 void fastContext::sendCogroupData(
unsigned long tid, std::unordered_map<K, int> & keyMap, std::vector<bool> &flags){
465 comm->sendCogroupData(tid, keyMap, flags);
unsigned int fddOpType
Dataset operation type.
void registerGlobal(T *varP)
Gegisters a primitive global varible to be used inside used defined functions in distributted environ...
void setGroupedByMap(bool gbm)
(UNUSED)
fastSettings(const fastSettings &s UNUSED)
fastSetting dummy constructor
core class that implements simple operations.
int getId()
Returns the identification number of the dataset.
void setGroupedByKey(bool gbk)
(UNUSED)
Fast Distributted Dataset(FDD) is like a cluster distributted Array. This class is the user side impl...
unsigned int fddType
Dataset type.
Context Configuration Class.
int numProcs()
Return the number of processes running.
fastSettings()
fastSetting default constructor
void allowDataBalancing()
Enables dynamic load balancing.
fdd< T > * onlineFullPartRead(std::string path, onlineFullPartFuncP< T > funcP)
Reads a file with online parsing and partition (NOT IMPLEMENTED)