libfaster API Documentation  Development Version
Super fast distributed computing
workerRun.cpp
1 #include <string>
2 #include <fstream>
3 #include <iostream>
4 #include <unistd.h>
5 
6 #include "fastComm.h"
7 #include "worker.h"
8 
9 void faster::worker::run(){
10  //std::cerr << " Worker Working..." << '\n';
11  while (! finished ){
12  int tag;
13  int msgSrc;
14  fastTask task;
15  unsigned long int id;
16  fddType tType, kType;
17  void * data = NULL;
18  void * keys = NULL;
19  //void ** data2D = NULL;
20  size_t * lineSizes = NULL;
21  size_t size, offset;
22  std::string name, name2;
23  std::vector<unsigned long int> idV;
24 
25  // Wait for a message to arrive
26  comm->probeMsgs(tag, msgSrc);
27 
28  switch(tag){
29  case MSG_TASK:
30  //std::cerr << " R:Task \n";
31  comm->recvTask(task);
32  //std::cerr << "ID:" << task.id << " FDD:" << task.srcFDD << " F:" << task.functionId << " ";
33  solve(task);
34  //std::cerr << ".\n";
35  break;
36 
37  case MSG_CREATEFDD:
38  //std::cerr << " R:CreateFdd\n";
39  comm->recvCreateFDD(id, tType, size);
40  //std::cerr << " ID:" << id << " T:" << (int) tType << " S:" << size << " ";
41  createFDD(id, tType, size);
42  //std::cerr << ".\n";
43  break;
44 
45  case MSG_CREATEIFDD:
46  //std::cerr << " R:CreateIFdd \n";
47  comm->recvCreateIFDD(id, kType, tType, size);
48  //std::cerr << "ID:" << id << " K:" << (int) kType << " T:" << (int) tType << " S:" << size << " ";
49  createIFDD(id, kType, tType, size);
50  //std::cerr << ".\n";
51  break;
52  case MSG_CREATEGFDD:
53  //std::cerr << " R:CreateFDDGroup \n";
54  comm->recvCreateFDDGroup(id, kType, idV);
55  //std::cerr << "ID:" << id << " K:" << (int) kType << " NumMembers:" << (int) idV.size() << " ";
56  createFDDGroup(id, kType, idV );
57  //std::cerr << ".\n";
58  break;
59 
60  case MSG_DISCARDFDD:
61  //std::cerr << " R:DestroyFdd\n";
62  comm->recvDiscardFDD(id);
63  //std::cerr << "ID:" << id << " ";
64  discardFDD(id);
65  //std::cerr << ".\n";
66  break;
67 
68  case MSG_FDDSETDATAID:
69  //std::cerr << " R:SetFddData\n";
70  comm->recvFDDSetData(id, data, size);
71  //std::cerr << "ID:" << id << " S:" << size << " ";
72  setFDDData(id, data, size);
73  //std::cerr << ".\n";
74  break;
75 
76  case MSG_FDDSET2DDATAID:
77  //std::cerr << " R:SetFdd2DData ";
78  //comm->recvFDDSetData(id, data2D, lineSizes, size);
79  comm->recvFDDSetData(id, data, lineSizes, size);
80  //std::cerr << "ID:" << id << " S:" << size << " ";
81  setFDDData(id, data, lineSizes, size);
82  //delete [] data2D;
83  //std::cerr << ".\n";
84  break;
85 
86  case MSG_FDDSETIDATAID:
87  //std::cerr << " R:SetFddIData \n";
88  comm->recvFDDSetIData(id, keys, data, size);
89  //std::cerr << "ID:" << id << " S:" << size << " ";
90  setFDDIData(id, keys, data, size);
91  //std::cerr << ".\n";
92  break;
93 
94  case MSG_FDDSET2DIDATAID:
95  //std::cerr << " R:SetFdd2DIData ";
96  comm->recvFDDSetIData(id, keys, data, lineSizes, size);
97  //std::cerr << "ID:" << id << " S:" << size << " ";
98  setFDDIData(id, keys, data, lineSizes, size);
99  //delete [] data2D;
100  //std::cerr << ".\n";
101  break;
102 
103  case MSG_READFDDFILE:
104  //std::cerr << " R:ReadFddFile \n" ;
105  comm->recvReadFDDFile(id, name, size, offset);
106  //std::cerr << "ID:" << id <<" F:" << name << "(offset:" << offset << ")";
107  readFDDFile(id, name, size, offset);
108  //std::cerr << ".\n";
109  break;
110  case MSG_WRITEFDDFILE:
111  //std::cerr << " R:ReadFddFile \n" ;
112  comm->recvWriteFDDFile(id, name, name2);
113  //std::cerr << "ID:" << id <<" F:" << name << "(offset:" << offset << ")";
114  writeFDDFile(id, name, name2);
115  //std::cerr << ".\n";
116  break;
117 
118  case MSG_COLLECT:
119  //std::cerr << " R:Collect \n";
120  comm->recvCollect(id);
121  //std::cerr << "ID:" << id << " ";
122  //getFDDData(id, data, size);
123  //comm->sendFDDData(id, 0, data, size);
124  collect(id);
125  //std::cerr << ".\n";
126  break;
127 
128  case MSG_FINISH:
129  //std::cerr << " R:FINISH \n";
130  comm->recvFinish();
131  finished = true;
132  //std::cerr << ".\n";
133  break;
134  default:
135  std::cerr << " \033[1;31mR:ERROR UNRECOGNIZED MESSAGE " << tag << " from " << msgSrc << "!!!!!!!!\033[0m\n";
136  usleep(100000);
137  break;
138  }
139  }
140  //std::cerr << " Worker " << comm->getProcId() << " DONE" << '\n';
141 }
142 
143 
unsigned int fddType
Dataset type.
Definition: definitions.h:16