libfaster API Documentation  Development Version
Super fast distributed computing
fastScheduler.cpp
#include <vector>
#include <cstring>
#include <iostream>
#include <algorithm>
#include <utility>
#include <stdio.h>
#include <iomanip>

// Get RAM information
#include "sys/types.h"
#include "sys/sysinfo.h"

#include "fastScheduler.h"
#include "fastTask.h"
#include "misc.h"

faster::fastScheduler::fastScheduler(unsigned int numProcs, std::vector<std::string> * funcName){
	this->numProcs = numProcs;
	numTasks = 0;
	resetProcessWeights();
	_dataMigrationNeeded = false;
	this->funcName = funcName;
	infoPos = 0;
}
faster::fastScheduler::~fastScheduler(){
	for ( auto it = taskList.begin(); it != taskList.end(); it++ ){
		// Free the deep copies of the global variables made by enqueueTask
		for ( size_t i = 0; i < (*it)->globals.size(); i++ ){
			delete [] (char*) std::get<0>((*it)->globals[i]);
		}
		delete (*it);
	}
	taskList.clear();
}

void faster::fastScheduler::resetProcessWeights(){
	// Process 0 is the driver and holds no data, so the remaining
	// numProcs-1 worker processes start with equal weights
	currentWeights = std::vector<double>( numProcs, 1/(double)(numProcs-1) );
	currentWeights[0] = 0;
}
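// ---------------------------------------------------------------------------
// Illustration (not part of the original source): with numProcs = 4 the reset
// weights are { 0, 1/3, 1/3, 1/3 }. The driver (process 0) never holds data;
// each of the three workers is expected to hold an equal share of a dataset.
// ---------------------------------------------------------------------------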
bool faster::fastScheduler::dataMigrationNeeded(){
	return ( _dataMigrationNeeded && ( taskList.size() > 1 ) );
}

std::vector<std::deque< std::pair<int,long int> >> faster::fastScheduler::getDataMigrationInfo(){
	std::vector<std::deque< std::pair<int,long int> >> r (numProcs, std::deque<std::pair<int,long int>>());
	if (taskList.size() > 1){
		std::vector<std::pair<int,long int>> delta(numProcs, std::pair<int,long int>(0,0));
		unsigned int i, j;
		fastTask * thisTask = taskList.back();
		fastTask * lastTask = taskList[taskList.size() - 2];

		// Calculate the data delta of each process: negative means the
		// process must send items away, positive means it receives items
		delta[0].first = 0;
		delta[0].second = 0;
		for( i = 1; i < numProcs; ++i){
			delta[i].first = i;
			delta[i].second = lastTask->size * ((*thisTask->allocation)[i] - (*lastTask->allocation)[i]);
		}
		// Sort by delta (signed comparison), so the biggest senders come first
		std::sort(delta.begin(), delta.end(), [](const std::pair<int,long int> & a, const std::pair<int,long int> & b){return a.second < b.second;});

		std::cerr << " [ Migration info: ";
		for( size_t k = 0; k < numProcs; ++k){
			std::cerr << delta[k].first << ":" << delta[k].second << " ";
		}
		std::cerr << "]\n";

		// Pair senders (front) with receivers (back) until the two cursors meet
		long int carry = 0;
		i = 0;
		j = numProcs-1;
		while (i < j){
			carry = delta[i].second + delta[j].second;
			if (carry < 0){
				// Less than enough to satisfy the receiver
				// To
				r[delta[i].first].push_back(std::make_pair(delta[j].first, delta[i].second));
				// From
				r[delta[j].first].push_back(std::make_pair(delta[i].first, -delta[i].second));
				i++;
			}else{
				if (carry > 0){
					// More than enough to satisfy the receiver
					// To
					r[delta[i].first].push_back(std::make_pair(delta[j].first, -delta[j].second));
					// From
					r[delta[j].first].push_back(std::make_pair(delta[i].first, delta[j].second));
					j--;
				}else{
					// Exact match
					// To
					r[delta[i].first].push_back(std::make_pair(delta[j].first, delta[i].second));
					// From
					r[delta[j].first].push_back(std::make_pair(delta[i].first, -delta[i].second));
					i++;
					j--;
				}
			}
		}
	}
	return r;
}
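// ---------------------------------------------------------------------------
// Illustration (not part of the original source): say the previous task held
// 1000 items with allocation { 0, 0.5, 0.3, 0.2 } and the new allocation is
// { 0, 0.3, 0.3, 0.4 }. The deltas are { 0, -200, 0, +200 }; after sorting,
// the pass above pairs process 1 (sender) with process 3 (receiver) and
// schedules one transfer of 200 items (recorded as -200 on the sender's deque
// and +200 on the receiver's).
// ---------------------------------------------------------------------------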

std::vector<size_t> faster::fastScheduler::getAllocation(size_t size){
	std::vector<size_t> r(numProcs, 0);
	size_t sum = 0;
	for(unsigned int i = 1; i < numProcs; ++i){
		r[i] = size * currentWeights[i];
		sum += r[i];
	}

	// The truncation above can leave a few items unassigned; give one
	// extra item to each of the first (size - sum) workers
	if ( sum < size ){
		for(unsigned int i = 1; i < numProcs; ++i){
			if ( i <= (size - sum) )
				r[i]++;
		}
	}

	return r;
}
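// ---------------------------------------------------------------------------
// Illustration (not part of the original source): for size = 10 and weights
// { 0, 1/3, 1/3, 1/3 } the truncated shares are { 0, 3, 3, 3 } (sum = 9), and
// the one leftover item goes to process 1, yielding { 0, 4, 3, 3 }.
// ---------------------------------------------------------------------------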
void faster::fastScheduler::setAllocation(std::vector<size_t> & alloc, size_t size){
	for ( size_t i = 0; i < alloc.size(); i++ ){
		currentWeights[i] = (double) alloc[i] / size;
	}
}

void faster::fastScheduler::updateWeights(){
	unsigned int i;
	if (taskList.size() > 0){
		fastTask * lastTask = taskList.back();
		std::vector<double> rate(numProcs, 0);

		// A process's rate is the data fraction it held divided by the
		// time it took, i.e. its observed throughput
		std::cerr << " [ Exec.Times: ";
		for( i = 1; i < numProcs; ++i){
			std::cerr << lastTask->times[i] << " ";
			rate[i] = (*lastTask->allocation)[i] / (double)lastTask->times[i];
		}
		std::cerr << " ]\n";

		double powersum = 0;
		for( i = 1; i < numProcs; ++i){
			powersum += rate[i];
		}

		// Normalize the rates into new weights; fall back to uniform
		// weights when there is no usable timing information
		if (powersum > 0){
			for( i = 1; i < numProcs; ++i){
				currentWeights[i] = rate[i] / powersum;
			}
		}else{
			resetProcessWeights();
		}
		currentWeights[0] = 0;
	}
}
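// ---------------------------------------------------------------------------
// Illustration (not part of the original source): two workers holding
// { 0.5, 0.5 } of the data finish in 200 ms and 100 ms. Their rates are
// { 0.0025, 0.005 }, so the normalized weights become { 1/3, 2/3 }: the
// faster worker receives twice as much data in the next allocation.
// ---------------------------------------------------------------------------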


std::shared_ptr<std::vector<double>> faster::fastScheduler::getNewAllocation(){
	std::shared_ptr<std::vector<double>> r;
	_dataMigrationNeeded = false;

	// See if the response times of the last task were too unbalanced:
	// migration is triggered when the mean time exceeds 300 ms and the
	// coefficient of variation (stdDev/mean) exceeds 1
	if (taskList.size() > 0) {
		auto & t = taskList.back()->times;
		double m = mean(t);
		double sd = stdDev(t, m);
		if ( (m > 300) && ( sd/m > 1 ) ){
			_dataMigrationNeeded = true;
		}
	}

	if ( (taskList.size() > 0) && ( ! _dataMigrationNeeded) ){
		// Balanced enough: reuse the previous allocation
		r = taskList.back()->allocation;
	}else{
		r = std::make_shared<std::vector<double>>(numProcs);
	}

	if ( (taskList.size() > 0) && ( _dataMigrationNeeded) )
		updateWeights();

	(*r)[0] = 0;
	for( size_t i = 1; i < numProcs; ++i){
		(*r)[i] = currentWeights[i];
	}

	return r;
}
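// ---------------------------------------------------------------------------
// Illustration (not part of the original source, assuming mean() and stdDev()
// from misc.h compute population statistics): times of { 0, 0, 1200 } ms give
// a mean of 400 ms and a standard deviation of ~566 ms, so sd/m ~ 1.41 > 1
// and m > 300 ms: the weights are recomputed and data migration is flagged.
// ---------------------------------------------------------------------------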

faster::fastTask * faster::fastScheduler::enqueueTask(fddOpType opT, unsigned long int idSrc, unsigned long int idRes, int funcId, size_t size, std::vector< std::tuple<void*, size_t, int> > & globalTable){
	fastTask * newTask = new fastTask();
	newTask->id = numTasks++;
	newTask->srcFDD = idSrc;
	newTask->destFDD = idRes;
	newTask->size = size;
	newTask->operationType = opT;
	newTask->functionId = funcId;
	newTask->workersFinished = 0;
	newTask->allocation = getNewAllocation();
	newTask->duration = 0;
	newTask->times = std::vector<size_t>(numProcs, 0);
	newTask->procstats = std::vector<procstat>(numProcs);

	// Deep-copy the user's global variables into the task so they outlive
	// the caller's scope (freed in the destructor)
	for ( size_t i = 0; i < globalTable.size(); i++){
		size_t s = std::get<1>(globalTable[i]);
		void * var = new char[s];
		int type = std::get<2>(globalTable[i]);

		if (type & POINTER){
			// Pointer global: copy the pointed-to buffer
			std::memcpy(var, *(char**)std::get<0>(globalTable[i]), s);
		}else{
			// Plain global: copy the variable itself
			std::memcpy(var, std::get<0>(globalTable[i]), s);
		}

		newTask->globals.push_back(std::make_tuple(var, s, type));
	}

	taskList.push_back(newTask);

	return newTask;
}
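// ---------------------------------------------------------------------------
// Illustration (not part of the original source, with hypothetical flag
// values): a global table entry for "int threshold" would look like
// ( &threshold, sizeof(int), 0 ), while an n-byte heap buffer "char * buf"
// would be ( &buf, n, POINTER ); in the latter case the stored char** is
// dereferenced and the buffer contents are copied.
// ---------------------------------------------------------------------------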

faster::fastTask * faster::fastScheduler::enqueueTask(fddOpType opT, unsigned long int sid, size_t size, std::vector< std::tuple<void*, size_t, int> > & globalTable){
	return enqueueTask(opT, sid, 0, -1, size, globalTable );
}

void faster::fastScheduler::taskProgress(unsigned long int tid, unsigned long int pid, size_t time, procstat & stat){
	taskList[tid]->workersFinished++;
	taskList[tid]->times[pid] = time;
	taskList[tid]->procstats[pid] = stat;
}

void faster::fastScheduler::taskFinished(unsigned long int tid, size_t time){
	taskList[tid]->duration = time;
	taskList[tid]->procstats[0] = getProcStat();
}

void faster::fastScheduler::setCalibration(std::vector<size_t> time){
	size_t sum = 0;

	for ( size_t i = 1; i < time.size(); ++i){
		sum += time[i];
	}

	// A process's rate is inversely proportional to its benchmark time
	std::vector<double> rate(numProcs, 0);

	for ( size_t i = 1; i < time.size(); ++i){
		rate[i] = (double) sum / time[i];
	}

	double powersum = 0;
	for( size_t i = 1; i < numProcs; ++i){
		powersum += rate[i];
	}

	for ( size_t i = 1; i < time.size(); ++i){
		currentWeights[i] = rate[i] / powersum;
	}
}
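// ---------------------------------------------------------------------------
// Illustration (not part of the original source): worker calibration times of
// 100 ms and 200 ms give sum = 300, rates { 3.0, 1.5 } and weights
// { 2/3, 1/3 }, so the worker that ran the benchmark twice as fast is
// assigned twice as much data.
// ---------------------------------------------------------------------------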

void faster::fastScheduler::printProcstats(fastTask * task){
	// Average RAM, user time, and system time over the worker processes
	double mram = 0, mutime = 0, mstime = 0;
	for ( size_t i = 1; i < task->procstats.size(); i++ ){
		mram += task->procstats[i].ram;
		mutime += task->procstats[i].utime;
		mstime += task->procstats[i].stime;
	}
	mram /= task->procstats.size()-1;
	mutime /= task->procstats.size()-1;
	mstime /= task->procstats.size()-1;

	fprintf(stderr, "\033[1;34m%4.1lf %3.1lf %3.1lf ", mram, mutime, mstime);

	std::cerr << "\033[0m| ";
	for ( size_t i = 0; i < task->procstats.size(); i++ ){
		fprintf(stderr, "%4.1lf ", task->procstats[i].ram);
	}
}

void faster::fastScheduler::printTaskInfo(size_t taskID){
	auto task = taskList[taskID];
	// Skip the driver's entry (index 0) when computing the statistics
	std::vector<size_t> t = task->times;
	t.erase(t.begin());
	double m = mean(t);
	double sd = stdDev(t, m);

	std::cerr << "\033[1;34m ";
	fprintf(stderr, "%2ld ", task->id);
	if ((task->operationType & (OP_GENERICREDUCE | OP_GENERICMAP | OP_GENERICUPDATE)) && ((*funcName)[task->functionId].size() > 0)){
		std::cerr << decodeOptypeAb(task->operationType) << " ";
		std::cerr << std::setw(9) << (*funcName)[task->functionId] << "\t";
	}else{
		std::cerr << std::setw(15) << decodeOptype(task->operationType) << "\t";
	}

	std::cerr << "\033[0m ";
	fprintf(stderr, "%2d %2ld %2ld ", task->functionId, task->srcFDD, task->destFDD );

	std::cerr << "| \033[1;31m ";
	fprintf(stderr, "%5ld %6.1lf %3.1lf ", task->duration, m, sd/m);

	std::cerr << "\033[0m| ";
	printProcstats(task);
	std::cerr << "| ";

	for ( auto it2 = t.begin(); it2 != t.end(); it2++){
		fprintf(stderr, "%5ld ", *it2);
	}

	std::cerr << "\n";
}
void faster::fastScheduler::printHeader(){
	std::cerr << "\033[1;34mID# Task\033[0m Func Src Dest | \033[1;31mDuration(ms) Avg_Proc_Time PT_CV \033[0m | RAM | Individual_Processing_Times\n";
}

void faster::fastScheduler::printTaskInfo(){
	double cvsum = 0;
	double mm = 0;
	int count = 0;

	printHeader();

	for ( size_t i = 0; i < taskList.size(); ++i ){
		std::vector<size_t> t = taskList[i]->times;
		t.erase(t.begin());
		double m = mean(t);
		double sd = stdDev(t, m);

		mm += m;
		// Only tasks long enough to be meaningful count towards the
		// average coefficient of variation
		if( m > 100 ){
			cvsum += sd/m;
			count++;
		}

		printTaskInfo(i);
	}
	std::cerr << "\n av_Time:" << mm/taskList.size() << " av_CV:" << cvsum/count << "\n";
}


void faster::fastScheduler::updateTaskInfo(){
	// Print the entries of every task finished since the last call
	while ( infoPos < taskList.size() ){
		printTaskInfo(infoPos++);
	}
}