// Constructor: stores the process count and the function-name table pointer,
// resets the per-process weights and clears the data-migration flag.
//
// numProcs: number of processes; it sizes the weight vector built by
//           resetProcessWeights().
// funcName: pointer to a vector of strings; only the pointer is stored here.
//
// NOTE(review): the line numbers embedded in this listing skip 19 and 23, so
// further statements may exist that are not visible here - confirm against the file.
10 #include "sys/types.h" 11 #include "sys/sysinfo.h" 13 #include "fastScheduler.h" 17 faster::fastScheduler::fastScheduler(
unsigned int numProcs, std::vector<std::string> * funcName){
18 this->numProcs = numProcs;
// Must run after numProcs is set: resetProcessWeights() reads the member numProcs.
20 resetProcessWeights();
21 _dataMigrationNeeded =
false;
22 this->funcName = funcName;
// Destructor: walks every task in taskList and frees the buffer stored in
// slot 0 of each entry of that task's globals table.
//
// NOTE(review): no release of the task objects themselves is visible in this
// listing (enqueueTask creates them with new) - confirm against the full file.
25 faster::fastScheduler::~fastScheduler(){
26 for (
auto it = taskList.begin(); it != taskList.end() ; it++ ){
27 for (
size_t i = 0; i < (*it)->globals.size(); i++){
// The buffer is released with delete[] through a char* cast, matching the
// new char[s] allocation done in enqueueTask.
28 delete [] (
char*) std::get<0>((*it)->globals[i]);
35 void faster::fastScheduler::resetProcessWeights(){
36 currentWeights = std::vector<double>( numProcs, 1/(double)(numProcs-1) );
37 currentWeights[0] = 0;
41 bool faster::fastScheduler::dataMigrationNeeded(){
42 return ( _dataMigrationNeeded && ( taskList.size() > 1 ) );
// Builds, for each process, a deque of (peer process, amount) pairs derived
// from the difference between the allocations of the two most recent tasks.
//
// The result container r holds numProcs deques and is only filled when
// taskList contains more than one task.
//
// NOTE(review): many original lines (loop headers, branch conditions, the
// return statement) are not visible in this listing; the comments below
// describe only the statements that are visible.
45 std::vector<std::deque< std::pair<int,long int> >> faster::fastScheduler::getDataMigrationInfo(){
46 std::vector<std::deque< std::pair<int,long int> >> r (numProcs, std::deque<std::pair<int,long int>>());
47 if (taskList.size() > 1){
// delta[i] is a (int, long int) pair; .second receives the per-process change
// computed below. How .first is filled is not visible here.
48 std::vector<std::pair<int,long int>> delta(numProcs,std::pair<int,size_t>(0,0));
50 fastTask * thisTask = taskList.back();
51 fastTask * lastTask = taskList[taskList.size() - 2];
56 for( i = 1; i < numProcs; ++i){
// Change for process i: previous task size times the difference between the
// new and the old allocation entries.
58 delta[i].second = lastTask->size*(thisTask->allocation.get()[0][i] - lastTask->allocation.get()[0][i]);
// Sort ascending by .second.
// NOTE(review): the lambda takes pair<int,size_t> while delta stores
// pair<int,long int>, so each element is converted and a negative .second is
// compared as a very large unsigned value - confirm this ordering is intended.
60 std::sort(delta.begin(), delta.end(), [](std::pair<int,size_t> a, std::pair<int,size_t> b){
return a.second < b.second;});
// Diagnostic dump of the sorted deltas to stderr.
62 std::cerr <<
" [ Migration info: ";
63 for(
size_t i = 0; i < numProcs; ++i){
64 std::cerr << delta[i].first <<
":" << delta[i].second <<
" ";
72 carry = delta[i].second + delta[j].second;
// Each pair of push_back calls below records one transfer on both processes
// involved, with opposite signs. The conditions that select between the three
// variants are not visible in this listing.
76 r[delta[i].first].push_back(std::make_pair(delta[j].first, delta[i].second));
78 r[delta[j].first].push_back(std::make_pair(delta[i].first, -delta[i].second));
84 r[delta[i].first].push_back(std::make_pair(delta[j].first, -delta[j].second));
86 r[delta[j].first].push_back(std::make_pair(delta[i].first, delta[j].second));
90 r[delta[i].first].push_back(std::make_pair(delta[j].first, delta[i].second));
92 r[delta[j].first].push_back(std::make_pair(delta[i].first, -delta[i].second));
// Splits size items among the processes according to currentWeights.
//
// size: total number of items to distribute.
// The result vector r has numProcs entries; entry 0 stays 0 in the visible code
// because both loops start at index 1.
//
// NOTE(review): the definition of sum, the body of the if below and the
// return statement are not visible in this listing.
102 std::vector<size_t> faster::fastScheduler::getAllocation(
size_t size){
103 std::vector<size_t> r(numProcs, 0);
105 for(
unsigned int i = 1; i < numProcs; ++i){
// The floating-point product is truncated when stored into the size_t element.
106 r[i] = size * currentWeights[i];
111 for(
unsigned int i = 1; i < numProcs; ++i){
// Presumably hands out the items lost to truncation, one per process - TODO confirm.
112 if ( i <= (size - sum) )
119 void faster::fastScheduler::setAllocation(std::vector<size_t> & alloc,
size_t size){
120 for (
size_t i = 0; i < alloc.size(); i++ ){
121 currentWeights[i] = (double) alloc[i] / size;
// Recomputes currentWeights from the most recent task.
//
// For each process from index 1 on, a rate is computed as that process's
// allocation entry divided by its recorded time; the rates are summed and each
// weight becomes rate / sum.
//
// NOTE(review): declarations of i and powersum, closing braces and the branch
// that leads to the resetProcessWeights() call are not visible in this listing.
125 void faster::fastScheduler::updateWeights(){
127 if (taskList.size() > 0){
128 fastTask * lastTask = taskList.back();
129 std::vector<double> rate(numProcs, 0);
131 std::cerr <<
" [ Exec.Times: ";
132 for( i = 1; i < numProcs; ++i){
133 std::cerr << lastTask->times[i] <<
" ";
// NOTE(review): no guard against times[i] == 0 is visible before this division.
134 rate[i] = lastTask->allocation.get()[0][i]/(double)lastTask->times[i];
139 for( i = 1; i < numProcs; ++i){
140 powersum += rate[i] ;
// Normalize so that the weights of indices 1..numProcs-1 add up to 1.
144 for( i = 1; i < numProcs; ++i){
145 currentWeights[i] = rate[i] / powersum;
// Presumably the branch taken when there is no previous task - TODO confirm.
148 resetProcessWeights();
// resetProcessWeights() already zeroes index 0; this assignment repeats it.
149 currentWeights[0] = 0;
// Chooses the allocation vector for the next task and updates the
// data-migration flag.
//
// The flag is cleared first, then set when the visible test on the last task's
// times succeeds (m > 300 and sd/m > 1).
//
// NOTE(review): the definition of m, the else branches, the statement guarded
// by the if on original line 178 and the return statement are not visible in
// this listing.
154 std::shared_ptr <std::vector<double>> faster::fastScheduler::getNewAllocation(){
155 std::shared_ptr<std::vector<double>> r;
156 _dataMigrationNeeded =
false;
159 if (taskList.size() > 0) {
160 auto &t = taskList.back()->times;
162 double sd = stdDev(t, m);
// sd/m is presumably the coefficient of variation of the times - TODO confirm.
165 if ( (m > 300) && ( sd/m > 1 ) ){
166 _dataMigrationNeeded =
true;
170 if ( (taskList.size() > 0) && ( ! _dataMigrationNeeded) ){
// No migration: the same shared allocation object as the previous task is reused.
172 r = taskList.back()->allocation;
// A fresh vector of numProcs doubles; presumably the alternative branch - TODO confirm.
175 r = std::make_shared<std::vector<double>>(numProcs);
178 if ( (taskList.size() > 0) && ( _dataMigrationNeeded) )
// Copy the current weights into the allocation, leaving index 0 untouched.
183 for(
size_t i = 1; i < numProcs; ++i){
184 r.get()[0][i] = currentWeights[i];
// Creates a new task, fills in its fields, snapshots the global variables and
// appends the task to taskList.
//
// opT:         operation type stored in the task.
// idSrc:       stored as the task's srcFDD.
// idRes:       stored as the task's destFDD.
// funcId:      stored as the task's functionId.
// size:        stored as the task's size.
// globalTable: tuples of (pointer, byte count, type); each one is copied into
//              a newly allocated buffer owned by the task.
//
// NOTE(review): the condition choosing between the two memcpy calls and the
// return statement are not visible in this listing.
193 faster::fastTask * faster::fastScheduler::enqueueTask(
fddOpType opT,
unsigned long int idSrc,
unsigned long int idRes,
int funcId,
size_t size, std::vector< std::tuple<void*, size_t, int> > & globalTable){
194 fastTask * newTask =
new fastTask();
// The task id is the running task counter, which is then incremented.
195 newTask->id = numTasks++;
196 newTask->srcFDD = idSrc;
197 newTask->destFDD = idRes;
198 newTask->size = size;
199 newTask->operationType = opT;
200 newTask->functionId = funcId;
201 newTask->workersFinished = 0;
// getNewAllocation() reads taskList, so it sees the tasks enqueued before this one.
202 newTask->allocation = getNewAllocation();
203 newTask->duration = 0;
204 newTask->times = std::vector<size_t>(numProcs, 0);
205 newTask->procstats = std::vector<procstat>(numProcs);
207 for (
size_t i = 0; i < globalTable.size(); i++){
208 size_t s = std::get<1>(globalTable[i]);
// Private copy of the global; allocated as a char array of s bytes.
209 void * var =
new char[s];
210 int type = std::get<2>(globalTable[i]);
// Variant 1: the table entry is treated as a pointer to a pointer, and the
// pointed-to data is copied.
215 std::memcpy(var, *(
char**)std::get<0>(globalTable[i]), s);
// Variant 2: the table entry points directly at the data to copy.
218 std::memcpy(var, std::get<0>(globalTable[i]), s);
221 newTask->globals.insert(newTask->globals.end(), std::make_tuple(var, std::get<1>(globalTable[i]), type) );
224 taskList.insert(taskList.end(), newTask);
229 faster::fastTask * faster::fastScheduler::enqueueTask(
fddOpType opT,
unsigned long int sid,
size_t size, std::vector< std::tuple<void*, size_t, int> > & globalTable){
230 return enqueueTask(opT, sid, 0, -1, size, globalTable );
233 void faster::fastScheduler::taskProgress(
unsigned long int tid,
unsigned long int pid,
size_t time, procstat & stat){
234 taskList[tid]->workersFinished++;
235 taskList[tid]->times[pid] = time;
236 taskList[tid]->procstats[pid] = stat;
239 void faster::fastScheduler::taskFinished(
unsigned long int tid,
size_t time){
240 taskList[tid]->duration = time;
241 taskList[tid]->procstats[0] = getProcStat();
// Sets currentWeights from a vector of calibration times.
//
// time: one entry per process; entries from index 1 on are used.
//
// Visible steps: rate[i] = sum / time[i], powersum = sum of the rates, and
// currentWeights[i] = rate[i] / powersum.
//
// NOTE(review): the declarations of sum and powersum and the body of the first
// loop are not visible in this listing.
244 void faster::fastScheduler::setCalibration(std::vector<size_t> time){
247 for (
size_t i = 1; i < time.size(); ++i){
251 std::vector<double> rate(numProcs, 0);
// NOTE(review): rate has numProcs elements but this loop runs to time.size();
// no visible check that time.size() <= numProcs, nor that time[i] != 0.
253 for (
size_t i = 1; i < time.size(); ++i){
254 rate[i] = (double) sum / time[i];
259 for(
size_t i = 1; i < numProcs; ++i){
260 powersum += rate[i] ;
263 for (
size_t i = 1; i < time.size(); ++i){
264 currentWeights[i] = rate[i] / powersum;
// Prints process statistics of a task to stderr: first the mean ram, utime and
// stime over the entries from index 1 on, then the ram value of every entry.
//
// task: task whose procstats vector is printed.
//
// NOTE(review): some original lines of this function are not visible in this listing.
268 void faster::fastScheduler::printProcstats(fastTask * task){
269 double mram=0, mutime=0, mstime=0;
// Entry 0 is left out of the averages.
270 for (
size_t i = 1; i < task->procstats.size(); i++ ){
271 mram += task->procstats[i].ram;
272 mutime += task->procstats[i].utime;
273 mstime += task->procstats[i].stime;
// NOTE(review): the divisor is procstats.size()-1, which is zero when the
// vector holds a single entry.
275 mram /= task->procstats.size()-1;
276 mutime /= task->procstats.size()-1;
277 mstime /= task->procstats.size()-1;
279 fprintf(stderr,
"\033[1;34m%4.1lf %3.1lf %3.1lf ", mram, mutime, mstime);
281 std::cerr <<
"\033[0m| " ;
// This loop starts at 0, so the ram of entry 0 is printed as well.
282 for (
size_t i = 0; i < task->procstats.size(); i++ ){
283 fprintf(stderr,
"%4.1lf ", task->procstats[i].ram);
// Prints one report row for a task to stderr: id, operation (and function
// name when available), function id, source and destination ids, duration,
// mean time, sd/m, process statistics and the individual times.
//
// taskID: index of the task in taskList (not range-checked in the visible code).
//
// NOTE(review): the definition of m, the else keyword and the end of the
// function are not visible in this listing.
288 void faster::fastScheduler::printTaskInfo(
size_t taskID){
289 auto task = taskList[taskID];
// Local copy of the task's times vector.
290 std::vector<size_t> t = task->times;
293 double sd = stdDev(t, m);
296 std::cerr <<
"\033[1;34m " ;
297 fprintf(stderr,
"%2ld ", task->id);
// For the generic reduce/map/update operations with a non-empty registered
// name, print the abbreviated operation followed by the function name.
298 if ((task->operationType & (OP_GENERICREDUCE | OP_GENERICMAP | OP_GENERICUPDATE) ) && ((*funcName)[task->functionId].size() > 0)){
299 std::cerr << decodeOptypeAb(task->operationType) <<
" " ;
300 std::cerr << std::setw(9) << (*funcName)[task->functionId] <<
"\t";
// Presumably the alternative branch: print the full operation name - TODO confirm.
302 std::cerr << std::setw(15) << decodeOptype(task->operationType) <<
"\t";
305 std::cerr <<
"\033[0m " ;
306 fprintf(stderr,
"%2d %2ld %2ld ", task->functionId, task->srcFDD, task->destFDD );
308 std::cerr <<
"| \033[1;31m " ;
309 fprintf(stderr,
"%5ld %6.1lf %3.1lf ", task->duration, m, sd/m);
311 std::cerr <<
"\033[0m| " ;
312 printProcstats(task);
// One column per entry of the times vector.
315 for (
auto it2 = t.begin() ; it2 != t.end(); it2++){
316 fprintf(stderr,
"%5ld ", *it2);
321 void faster::fastScheduler::printHeader(){
322 std::cerr <<
"\033[1;34mID# Task\033[0m Func Src Dest | \033[1;31mDuration(ms) Avg_Proc_Time PT_CV \033[0m | RAM | Individual_Processing_Times\n";
// Walks over all tasks in taskList and finally prints two summary figures to
// stderr: av_Time (mm divided by the number of tasks) and av_CV (cvsum / count).
//
// NOTE(review): the definitions and updates of m, mm, cvsum and count, and any
// per-task output, are not visible in this listing.
325 void faster::fastScheduler::printTaskInfo(){
334 for (
size_t i = 0; i < taskList.size(); ++i ){
// Local copy of this task's times vector.
335 std::vector<size_t> t = (taskList[i])->times;
338 double sd = stdDev(t, m);
// NOTE(review): no visible guard for an empty taskList or count == 0 before
// these divisions.
348 std::cerr <<
"\n av_Time:" << mm/taskList.size() <<
" av_CV:" << cvsum/count <<
"\n";
352 void faster::fastScheduler::updateTaskInfo(){
353 while ( infoPos < taskList.size() ){
354 printTaskInfo(infoPos++);
unsigned int fddOpType
Dataset operation type.