libfaster API Documentation  Development Version
Super fast distributed computing
workerFddCore.cpp
1 #include <fstream>
2 #include <chrono>
3 #include <iomanip>
4 
5 #include "_workerFdd.h"
6 #include "fastComm.h"
7 #include "fastCommBuffer.h"
8 #include "fddStorageExtern.cpp"
9 #include "misc.h"
10 
11 
12 template <typename T>
13 faster::workerFddCore<T>::workerFddCore(unsigned int ident, fddType t) : workerFddBase(ident, t){
14  keyType = Null;
15  localData = new fddStorage<T>();
16 }
17 
18 template <typename T>
19 faster::workerFddCore<T>::workerFddCore(unsigned int ident, fddType t, size_t size) : workerFddBase(ident, t){
20  keyType = Null;
21  if (size == 0)
22  localData = new fddStorage<T>();
23  else
24  localData = new fddStorage<T>(size);
25 }
26 
27 template <typename T>
29  delete localData;
30 }
31 
32 template <typename T>
34  return type;
35 }
36 template <typename T>
38  return Null;
39 }
40 
41 template <typename T>
42 T & faster::workerFddCore<T>::operator[](size_t address){
43  return localData->getData()[address];
44 }
45 template <typename T>
47  return localData->getData();
48 }
49 template <typename T>
51  return localData->getSize();
52 }
53 template <typename T>
55  return sizeof(T);
56 }
57 template <typename T>
59  return sizeof(T);
60 }
61 template <typename T>
62 void faster::workerFddCore<T>::setSize(size_t s) {
63  localData->setSize(s);
64 }
65 
66 template <typename T>
67 void faster::workerFddCore<T>::deleteItem(void * item) {
68  delete (T*) item;
69 }
70 template <typename T>
72  localData->shrink();
73 }
74 
75 template <typename T>
76 void printData(std::ofstream & outFile, std::vector<T> * data, size_t s){
77  outFile.precision(10);
78  for ( size_t i = 0; i < s; i++){
79  for ( size_t j = 0; j < data[i].size(); j++){
80  outFile << data[i][j] << " ";
81  }
82  outFile << "\n";
83  }
84 }
85 
86 template <typename T>
87 void printData(std::ofstream & outFile, T * data, size_t s){
88  outFile.precision(10);
89  for ( size_t i = 0; i < s; i++){
90  outFile << data[i] << "\n";
91  }
92 }
93 
94 template <typename T>
95 void faster::workerFddCore<T>::writeToFile(void * pathP, size_t procId, void * sufixP){
96  std::string path = * (std::string*) pathP;
97  std::string sufix = * (std::string*) sufixP;
98 
99  T * data = localData->getData();
100  size_t s = localData->getSize();
101 
102  std::string filename(path + std::to_string(procId) + sufix);
103  std::ofstream outFile(filename, std::ofstream::out);
104 
105  printData(outFile, data, s);
106 
107 }
108 
109 
110 template <typename T>
111 void faster::workerFddCore<T>::preapply(long unsigned int id, void * func, fddOpType op, workerFddBase * dest, fastComm * comm){
112  using std::chrono::system_clock;
113  using std::chrono::duration_cast;
114  using std::chrono::milliseconds;
115 
116 
117  fastCommBuffer &buffer = comm->getResultBuffer();
118  size_t durationP;
119  size_t rSizeP;
120  size_t rStatP;
121  size_t headerSize;
122  procstat s;
123 
124  buffer.reset();
125  buffer << id;
126 
127  // Reserve space for the time duration
128  durationP = buffer.size();
129  buffer.advance(sizeof(size_t));
130 
131  rStatP = buffer.size();
132  buffer.advance(s);
133 
134  rSizeP = buffer.size();
135  buffer.advance(sizeof(size_t));
136 
137  headerSize = buffer.size();
138 
139  auto start = system_clock::now();
140  if (op & (OP_GENERICMAP | OP_GENERICREDUCE)){
141  this->apply(func, op, dest, buffer);
142 
143  //if (dest) buffer << size_t(localData->getSize());
144  if (dest) buffer << size_t(dest->getSize());
145  }
146  auto end = system_clock::now();
147  auto duration = duration_cast<milliseconds>(end - start);
148  //std::cerr << " ET:" << duration.count() << " ";
149 
150  buffer.writePos(size_t(duration.count()), durationP);
151  buffer.writePos(getProcStat(), rStatP);
152  buffer.writePos(size_t(buffer.size() - headerSize), rSizeP);
153 
154  comm->sendTaskResult();
155 
156 }
157 
158 
159 template class faster::workerFddCore<char>;
160 template class faster::workerFddCore<int>;
161 template class faster::workerFddCore<long int>;
162 template class faster::workerFddCore<float>;
163 template class faster::workerFddCore<double>;
164 
165 //template class faster::workerFddCore<char*>;
166 //template class faster::workerFddCore<int*>;
167 //template class faster::workerFddCore<long int*>;
168 //template class faster::workerFddCore<float*>;
169 //template class faster::workerFddCore<double*>;
170 
172 
178 
unsigned int fddOpType
Dataset operation type.
Definition: definitions.h:41
unsigned int fddType
Dataset type.
Definition: definitions.h:16