5 #include "fastContext.h" 31 for (
size_t i = 0; i < fddList.size(); i++){
49 funcTable.insert(funcTable.end(), funcP);
50 funcName.insert(funcName.end(),
"");
54 funcTable.insert(funcTable.end(), funcP);
55 funcName.insert(funcName.end(), name);
60 if ( ! comm->isDriver() ){
77 return comm->isDriver();
81 using std::chrono::system_clock;
82 using std::chrono::duration_cast;
83 using std::chrono::milliseconds;
85 unsigned long int rid;
86 unsigned long int sid = 0;
88 std::vector<size_t> time(comm->numProcs, 0);
90 auto start = system_clock::now();
91 unsigned long int tid = enqueueTask(OP_Calibrate, 0, 1000);
93 for (
int i = 1; i < comm->numProcs; ++i){
96 void * result UNUSED = comm->recvTaskResult(rid, sid, size, t, stat);
98 scheduler->taskProgress(tid, sid, t, stat);
100 comm->waitForReq(comm->numProcs - 1);
102 auto duration = duration_cast<milliseconds>(system_clock::now() - start);
103 scheduler->taskFinished(tid, duration.count());
105 scheduler->setCalibration(time);
108 int faster::fastContext::findFunc(
void * funcP){
110 for(
size_t i = 0; i < funcTable.size(); ++i){
111 if (funcTable[i] == funcP)
118 unsigned long int faster::fastContext::_createFDD(
fddBase * ref,
fddType type,
const std::vector<size_t> * dataAlloc){
121 for (
int i = 1; i < comm->numProcs; ++i){
124 comm->sendCreateFDD(numFDDs, type, (*dataAlloc)[i], i);
128 comm->sendCreateFDD(numFDDs, type, 0, i);
133 fddList.push_back(ref);
134 fddInternal.push_back(
false);
135 comm->waitForReq(comm->numProcs - 1);
141 unsigned long int faster::fastContext::_createIFDD(
fddBase * ref,
fddType kType,
fddType tType,
const std::vector<size_t> * dataAlloc){
144 for (
int i = 1; i < comm->numProcs; ++i){
146 if (dataAlloc != NULL){
148 comm->sendCreateIFDD(numFDDs, kType, tType, (*dataAlloc)[i], i);
152 comm->sendCreateIFDD(numFDDs, kType, tType, 0, i);
156 fddList.push_back(ref);
157 fddInternal.push_back(
false);
158 comm->waitForReq(comm->numProcs - 1);
165 unsigned long int faster::fastContext::createFDD(
fddBase * ref,
size_t typeCode,
const std::vector<size_t> & dataAlloc){
166 return _createFDD(ref, decodeType(typeCode), &dataAlloc);
168 unsigned long int faster::fastContext::createFDD(
fddBase * ref,
size_t typeCode){
169 return _createFDD(ref, decodeType(typeCode), NULL);
171 unsigned long int faster::fastContext::createPFDD(
fddBase * ref,
size_t typeCode,
const std::vector<size_t> & dataAlloc){
172 return _createFDD(ref, POINTER | decodeType(typeCode), &dataAlloc);
174 unsigned long int faster::fastContext::createPFDD(
fddBase * ref,
size_t typeCode){
175 return _createFDD(ref, POINTER | decodeType(typeCode), NULL);
177 unsigned long int faster::fastContext::createIFDD(
fddBase * ref,
size_t kTypeCode,
size_t tTypeCode,
const std::vector<size_t> & dataAlloc){
178 return _createIFDD(ref, decodeType(kTypeCode), decodeType(tTypeCode), &dataAlloc);
180 unsigned long int faster::fastContext::createIFDD(
fddBase * ref,
size_t kTypeCode,
size_t tTypeCode){
181 return _createIFDD(ref, decodeType(kTypeCode), decodeType(tTypeCode), NULL);
183 unsigned long int faster::fastContext::createIPFDD(
fddBase * ref,
size_t kTypeCode,
size_t tTypeCode,
const std::vector<size_t> & dataAlloc){
184 return _createIFDD(ref, decodeType(kTypeCode), POINTER | decodeType(tTypeCode), &dataAlloc);
186 unsigned long int faster::fastContext::createIPFDD(
fddBase * ref,
size_t kTypeCode,
size_t tTypeCode){
187 return _createIFDD(ref, decodeType(kTypeCode), POINTER | decodeType(tTypeCode), NULL);
190 unsigned long int faster::fastContext::createFddGroup(
fddBase * ref, std::vector<fddBase*> & fddV){
191 std::vector<unsigned long int> members(fddV.size());
192 std::vector<fddType> dataTypeV(fddV.size());
194 fddType kType = fddV[0]->kType();
196 for(
size_t i = 0; i < fddV.size(); ++i){
197 members[i] = fddV[i]->getId();
200 comm->sendCreateFDDGroup( numFDDs, kType, members);
204 fddList.push_back(ref);
205 fddInternal.push_back(
false);
212 size_t findFileSize(
const char* filename)
214 std::ifstream in(filename, std::ifstream::in | std::ifstream::binary);
215 in.seekg(0, std::ifstream::end);
219 unsigned long int faster::fastContext::readFDD(
fddBase * ref,
const char * fileName){
221 size_t fileSize = findFileSize(fileName);
223 std::vector<size_t> dataAlloc = getAllocation(fileSize);
225 for (
int i = 1; i < comm->numProcs; ++i){
232 comm->sendReadFDDFile(ref->
getId(), std::string(fileName), dataAlloc[i], offset, i);
233 offset += dataAlloc[i];
237 comm->waitForReq(comm->numProcs - 1);
243 void faster::fastContext::getFDDInfo(
size_t & s, std::vector<size_t> & dataAlloc){
245 dataAlloc = std::vector<size_t>(comm->numProcs,0);
247 for (
int i = 1; i < comm->numProcs; ++i){
252 comm->recvFDDInfo(size, src);
253 dataAlloc[src] = size;
260 void faster::fastContext::writeToFile(
unsigned long int id,std::string & path, std::string & sufix){
261 unsigned long int tid, sid;
262 system_clock::time_point start;
264 comm->sendWriteFDDFile(
id, path, sufix);
266 for (
int i = 1; i < comm->numProcs; i++ ){
270 comm->recvTaskResult(tid, sid, s, time, stat);
274 unsigned long int faster::fastContext::enqueueTask(
fddOpType opT,
unsigned long int idSrc,
unsigned long int idRes,
int funcId,
size_t size){
275 fastTask * newTask = scheduler->enqueueTask(opT, idSrc, idRes, funcId, size, globalTable);
283 comm->sendTask(*newTask);
288 unsigned long int faster::fastContext::enqueueTask(
fddOpType opT,
unsigned long int id,
size_t size){
289 return enqueueTask(opT,
id, 0, -1, size);
292 std::vector< std::pair<void *, size_t> > faster::fastContext::recvTaskResult(
unsigned long int &tid,
unsigned long int &sid, system_clock::time_point & start){
293 using std::chrono::duration_cast;
294 using std::chrono::milliseconds;
296 std::vector< std::pair<void*, size_t> > result ( comm->numProcs );
300 for (
int i = 1; i < comm->numProcs; i++ ){
305 void * r = comm->recvTaskResult(tid, sid, size, time, stat);
306 result[sid].first = r;
307 result[sid].second = size;
310 scheduler->taskProgress(tid, sid, time, stat);
312 comm->waitForReq(comm->numProcs - 1);
314 auto duration = duration_cast<milliseconds>(system_clock::now() - start);
315 scheduler->taskFinished(tid, duration.count());
323 std::vector<size_t> faster::fastContext::getAllocation(
size_t size){
324 return scheduler->getAllocation(size);
327 void faster::fastContext::discardFDD(
unsigned long int id){
328 comm->sendDiscardFDD(
id);
332 scheduler->updateTaskInfo();
335 scheduler->printHeader();
339 scheduler->printTaskInfo();
unsigned int fddOpType
Dataset operation type.
void startWorkers()
Start worker machines computation.
void printInfo()
Prints task execution information for all tasks executed by the user.
void registerFunction(void *funcP)
Register a user custom function in the context.
int getId()
Returns the identification number of the dataset.
void updateInfo()
Prints information from tesk ran since last faster::fastContext::updateInfo() called.
bool isDriver()
Checks for the driver process.
~fastContext()
fastContext destructor
void calibrate()
Performs a microbenchmark to do dynamic load balancing (UNUSED)
unsigned int fddType
Dataset type.
Context Configuration Class.
void printHeader()
Prints a header for task execution information.
fastContext(int argc=0, char **argv=NULL)
fastContext default constructor