libfaster API Documentation  Development Version
Super fast distributed computing
workerFddGroup.cpp
1 #include <chrono>
2 #include <omp.h>
3 #include <algorithm>
4 #include <chrono>
5 
6 #include "workerFddGroup.h"
7 #include "workerFdd.h"
8 #include "fastComm.h"
9 #include "indexedFddStorageExtern.cpp"
10 #include "misc.h"
11 
12  using std::chrono::system_clock;
13  using std::chrono::duration_cast;
14  using std::chrono::milliseconds;
15 
16 template <typename K>
17 faster::workerFddGroup<K>::workerFddGroup(unsigned long int id, fddType keyT, std::vector< workerFddBase* > & members): workerFddBase(id, Null){
18  keyType = keyT;
19  this->members = members;
20 }
21 
22 
23 template <typename K>
24 std::vector< std::vector<void*>> faster::workerFddGroup<K>::findKeyInterval(size_t i){
25  //std::cerr << "FindKeyIntervalG\n";
26  //auto start = system_clock::now();
27  faster::workerFddBase * wfdd = members[i];
28  char * data = (char*) wfdd->getData();
29  K * keys = (K *) wfdd->getKeys();
30  size_t fddSize = wfdd->getSize();
31  int baseSize = wfdd->baseSize();
32 
33  std::unordered_map<K, int> keyCount(wfdd->getSize()*1.2);
34  for ( size_t i = 0; i < fddSize; ++i){
35  keyCount[keys[i]] ++;
36  }
37 
38  //auto t0 = duration_cast<milliseconds>(system_clock::now() - start).count();
39  //auto start2 = system_clock::now();
40 
41  std::unordered_map<K, std::vector<void*>> keyLocations(keyCount.size()*1.2);
42  for ( auto it = keyCount.cbegin(); it != keyCount.end(); ++it ){
43  keyLocations[it->first].reserve(it->second);
44  }
45  keyCount.clear();
46 
47  //auto t1 = duration_cast<milliseconds>(system_clock::now() - start2).count();
48  //start2 = system_clock::now();
49 
50  for ( size_t i = 0; i < fddSize; ++i){
51  K & key = keys[i];
52  void * d = &data[baseSize * i];
53  auto locs = keyLocations.find(key);
54  if (locs != keyLocations.end()){
55  locs->second.insert(locs->second.end(), d);
56  }
57  }
58 
59  //auto t3 = duration_cast<milliseconds>(system_clock::now() - start2).count();
60  //start2 = system_clock::now();
61 
62  std::vector<std::vector<void*>> keyLocationsV(uKeys->size());
63 
64  //std::cerr << "FKI[" ;
65  // Move Data locations to the result vector
66  for ( size_t i = 0; i < uKeys->size() ; ++i){
67  auto l = keyLocations.find((*uKeys)[i]);
68 
69  if ( l == keyLocations.end() ){
70  keyLocationsV[i] = {};
71  }else{
72  //std::cerr << (*uKeys)[i] << ":" << l->second.size() << " ";
73  //keyLocationsV[i] = std::move(l->second);
74  keyLocationsV[i] = l->second;
75  }
76  }
77  //std::cerr << "]FKI\n" ;
78  //std::cerr << "T1:" << t1 << " TOT:" << duration_cast<milliseconds>(system_clock::now() - start).count() << "\n";
79  //std::cerr << "FindKeyInterval T0:" << t0 << " t1:" << t1 << " t2:" << t2 << " t3:" << t3 << " " << duration_cast<milliseconds>(system_clock::now() - start).count() << "ms\n";
80  //std::cerr << "FindKeyIntervalG DONE\n";
81 
82  return keyLocationsV;
83 }
84 
85 template <typename K>
86 std::vector< std::vector<void*> > * faster::workerFddGroup<K>::getMemberKeyLocations(size_t i){
87  std::vector< std::vector<void*> > * location;
88 
89  location = members[i]->getKeyLocations();
90 
91  if ( location->size() == 0 ){
92  //std::cerr << " getKeyLocations ("<<i<<")\n";
93  (*location) = findKeyInterval(i);
94  //auto loc = findKeyInterval(i);
95  //location->resize(loc.size());
96 
97  //for ( size_t j = 0; j < loc.size(); j++ ){
98  //(*location)[j] = std::move(loc[j]);
99  //loc[j].clear();
100  //}
101  //loc.clear();
102  }
103 
104  return location;
105 }
106 
107 template <typename K>
109 
110  //std::cerr << "updateByKey KeyLocations\n";
111  //std::unordered_map<K, std::pair<void*, size_t>> keyLocations[3];
112  std::vector< std::vector<void*> > * keyLocations[3];
113 
114  for ( size_t i = 0; i < members.size(); ++i){
115  keyLocations[i] = getMemberKeyLocations(i);
116  }
117  //std::cerr << "updateByKey Run\n";
118 
119  if ( members.size() < 3 ){
120  #pragma omp parallel for
121  for ( size_t i = 0; i < uKeys->size(); ++i){
122  ((updateByKeyG2FunctionP<K>) func)
123  ((*uKeys)[i],
124  (*keyLocations[0])[i],
125  (*keyLocations[1])[i]);
126  }
127  }else{
128  #pragma omp parallel for
129  for ( size_t i = 0; i < uKeys->size(); ++i){
130  ((updateByKeyG3FunctionP<K>) func)
131  ((*uKeys)[i],
132  (*keyLocations[0])[i],
133  (*keyLocations[1])[i],
134  (*keyLocations[2])[i] );
135  }
136  }
137 }
138 
139 template <typename K>
140 void faster::workerFddGroup<K>::bulkUpdate(void * func){
141  if ( members.size() < 3 ){
142  ((bulkUpdateG2FunctionP<K>) func)
143  (
144  (K*) members[0]->getKeys(), members[0]->getData(), members[0]->getSize(),
145  (K*) members[1]->getKeys(), members[1]->getData(), members[1]->getSize()
146  );
147  }else{
148  ((bulkUpdateG3FunctionP<K>) func)
149  (
150  (K*) members[0]->getKeys(), members[0]->getData(), members[0]->getSize(),
151  (K*) members[1]->getKeys(), members[1]->getData(), members[1]->getSize(),
152  (K*) members[2]->getKeys(), members[2]->getData(), members[2]->getSize()
153  );
154  }
155 }
156 
157 
158 template <typename K>
159 //template <typename U, typename T0, typename T1, typename T2>
160 template <typename U>
161 void faster::workerFddGroup<K>::mapByKey(workerFddBase * dest, void * func){
162  //std::unordered_map<K, std::pair<void*, size_t>> keyLocations[3];
163  std::vector< std::vector<void*> > * keyLocations[3];
164 
165  //std::cerr << "START " << id << " " << s << " ";
166 
167  for ( size_t i = 0; i < members.size(); ++i){
168  keyLocations[i] = getMemberKeyLocations(i);
169  }
170  dest->setSize(uKeys->size());
171  U * od = (U*) dest->getData();
172 
173  if ( members.size() < 3 ){
174  #pragma omp parallel for
175  for ( size_t i = 0; i < uKeys->size(); ++i){
176  od[i] = ((mapByKeyG2FunctionP<K,U>) func)
177  ( (*uKeys)[i],
178  (*keyLocations[0])[i],
179  (*keyLocations[1])[i]);
180 
181  }
182  }else{
183  #pragma omp parallel for
184  for ( size_t i = 0; i < uKeys->size(); ++i){
185  od[i] = ((mapByKeyG3FunctionP<K,U>) func)
186  ( (*uKeys)[i],
187  (*keyLocations[0])[i],
188  (*keyLocations[1])[i],
189  (*keyLocations[2])[i] );
190 
191  }
192  }
193  //std::cerr << "END ";
194 }
195 
196 template <typename K>
197 //template <typename L, typename U, typename T0, typename T1, typename T2>
198 template <typename L, typename U>
199 void faster::workerFddGroup<K>::mapByKeyI(workerFddBase * dest, void * func){
200  //std::cerr << " \033[0;33mMap By Key\033[0m\n";
201  //std::unordered_map<K, std::pair<void*, size_t>> keyLocations[3];
202  std::vector< std::vector<void*> > * keyLocations[3];
203 
204 
205  //std::cerr << " FindKeyInterval ";
206  for ( size_t i = 0; i < members.size(); ++i){
207  keyLocations[i] = getMemberKeyLocations(i);
208  }
209  //std::cerr << "\n";
210  //std::cerr << " SetSize: " << uKeys->size() << "\n";
211 
212  dest->setSize(uKeys->size());
213  L * ok = (L*) dest->getKeys();
214  U * od = (U*) dest->getData();
215 
216 
217  //std::cerr << " Run\n";
218  if ( members.size() < 3 ){
219  #pragma omp parallel for
220  for ( size_t i = 0; i < uKeys->size(); ++i){
221  std::pair<L,U> r = ( (ImapByKeyG2FunctionP<K,L,U>) func)
222  ( (*uKeys)[i],
223  (*keyLocations[0])[i],
224  (*keyLocations[1])[i] );
225  ok[i] = r.first;
226  od[i] = r.second;
227 
228  }
229  }else{
230  #pragma omp parallel for
231  for ( size_t i = 0; i < uKeys->size(); ++i){
232  std::pair<L,U> r = ( (ImapByKeyG3FunctionP<K,L,U>) func )
233  ( (*uKeys)[i],
234  (*keyLocations[0])[i],
235  (*keyLocations[1])[i],
236  (*keyLocations[2])[i] );
237  ok[i] = r.first;
238  od[i] = r.second;
239 
240  }
241  }
242 
243  //std::cerr << " DONE\n";
244  //std::cerr << "END ";
245 }
246 
247 template <typename K>
248 //template <typename U, typename T0, typename T1, typename T2>
249 template <typename U>
250 void faster::workerFddGroup<K>::flatMapByKey(workerFddBase * dest, void * func){
251  //std::unordered_map<K, std::pair<void*, size_t>> keyLocations[3];
252  std::vector< std::vector<void*> > * keyLocations[3];
253  std::deque<U> resultList;
254 
255  for ( size_t i = 0; i < members.size(); ++i){
256  keyLocations[i] = getMemberKeyLocations(i);
257  }
258 
259  if ( members.size() < 3 ){
260  #pragma omp parallel
261  {
262  std::deque<U> pResultList;
263 
264  #pragma omp for
265  for ( size_t i = 0; i < uKeys->size(); ++i){
266  std::deque<U> r = ((flatMapByKeyG2FunctionP<K,U>) func)
267  ( (*uKeys)[i],
268  (*keyLocations[0])[i],
269  (*keyLocations[1])[i] );
270 
271 
272  if (r.size() > 0)
273  pResultList.insert(pResultList.end(), r.begin(), r.end());
274  }
275 
276  #pragma omp critical
277  resultList.insert(resultList.end(), pResultList.begin(), pResultList.end());
278  }
279  }else{
280  #pragma omp parallel
281  {
282  std::deque<U> pResultList;
283 
284  #pragma omp for
285  for ( size_t i = 0; i < uKeys->size(); ++i){
286  std::deque<U> r = ((flatMapByKeyG3FunctionP<K,U>) func)
287  ( (*uKeys)[i],
288  (*keyLocations[0])[i],
289  (*keyLocations[1])[i],
290  (*keyLocations[2])[i] );
291 
292 
293  if (r.size() > 0)
294  pResultList.insert(pResultList.end(), r.begin(), r.end());
295 
296  }
297 
298  #pragma omp critical
299  resultList.insert(resultList.end(), pResultList.begin(), pResultList.end());
300  }
301  }
302  dest->insertl(&resultList);
303  //std::cerr << "END ";
304 }
305 
306 template <typename K>
307 //template <typename L, typename U, typename T0, typename T1, typename T2>
308 template <typename L, typename U>
309 void faster::workerFddGroup<K>::flatMapByKeyI(workerFddBase * dest, void * func){
310  //std::unordered_map<K, std::pair<void*, size_t>> keyLocations[3];
311  std::vector< std::vector<void*> > * keyLocations[3];
312  std::deque<std::pair<L,U>> resultList;
313 
314  //std::cerr << "START " << id << " S:" << uKeys->size() << " \n";
315 
316 
317  //std::cerr << "KL ";
318  for ( size_t i = 0; i < members.size(); ++i){
319  keyLocations[i] = getMemberKeyLocations(i);
320  }
321 
322  if ( members.size() <3 ){
323  #pragma omp parallel
324  {
325  std::deque<std::pair<L,U>> pResultList;
326 
327  #pragma omp for
328  for ( size_t i = 0; i < uKeys->size(); ++i){
329  std::deque<std::pair<L,U>> r = ( (IflatMapByKeyG2FunctionP<K,L,U>) func)
330  ( (*uKeys)[i],
331  (*keyLocations[0])[i],
332  (*keyLocations[1])[i] );
333 
334 
335  if (r.size() > 0)
336  pResultList.insert(pResultList.end(), r.begin(), r.end());
337  }
338 
339  #pragma omp critical
340  resultList.insert(resultList.end(), pResultList.begin(), pResultList.end());
341  }
342  }else{
343  #pragma omp parallel
344  {
345  std::deque<std::pair<L,U>> pResultList;
346 
347  #pragma omp for
348  for ( size_t i = 0; i < uKeys->size(); ++i){
349  std::deque<std::pair<L,U>> r = ( (IflatMapByKeyG3FunctionP<K,L,U>) func )
350  ( (*uKeys)[i],
351  (*keyLocations[0])[i],
352  (*keyLocations[1])[i],
353  (*keyLocations[2])[i] );
354 
355 
356  if (r.size() > 0)
357  pResultList.insert(pResultList.end(), r.begin(), r.end());
358  }
359 
360  #pragma omp critical
361  resultList.insert(resultList.end(), pResultList.begin(), pResultList.end());
362  }
363 
364  }
365  dest->insertl(&resultList);
366 
367 
368  //std::cerr << "END ";
369 }
370 
371 template <typename K>
372 template <typename U>
373 void faster::workerFddGroup<K>::bulkFlatMap(workerFddBase * dest, void * func){
374  std::deque<U> resultList;
375 
376  if ( members.size() < 3 ){
377  resultList = ( (bulkFlatMapG2FunctionP<K,U>) func )
378  (
379  (K*) members[0]->getKeys(), members[0]->getData(), members[0]->getSize(),
380  (K*) members[1]->getKeys(), members[1]->getData(), members[1]->getSize()
381  );
382  }else{
383  resultList = ( (bulkFlatMapG3FunctionP<K,U>) func )
384  (
385  (K*) members[0]->getKeys(), members[0]->getData(), members[0]->getSize(),
386  (K*) members[1]->getKeys(), members[1]->getData(), members[1]->getSize(),
387  (K*) members[2]->getKeys(), members[2]->getData(), members[2]->getSize()
388  );
389  }
390 
391  dest->insertl(&resultList);
392 }
393 
394 template <typename K>
395 template <typename L, typename U>
396 void faster::workerFddGroup<K>::bulkFlatMapI(workerFddBase * dest, void * func){
397  std::deque<std::pair<L,U>> resultList;
398 
399  if ( members.size() < 3 ){
400  resultList = ( (IbulkFlatMapG2FunctionP<K,L,U>) func )
401  (
402  (K*) members[0]->getKeys(), members[0]->getData(), members[0]->getSize(),
403  (K*) members[1]->getKeys(), members[1]->getData(), members[1]->getSize()
404  );
405  }else{
406  resultList = ( (IbulkFlatMapG3FunctionP<K,L,U>) func )
407  (
408  (K*) members[0]->getKeys(), members[0]->getData(), members[0]->getSize(),
409  (K*) members[1]->getKeys(), members[1]->getData(), members[1]->getSize(),
410  (K*) members[2]->getKeys(), members[2]->getData(), members[2]->getSize()
411  );
412  }
413 
414  dest->insertl(&resultList);
415  //std::cerr << " NewSize = " << dest->getSize() << "\n";
416 }
417 
418 template <typename K>
419 //template <typename U, typename T0, typename T1, typename T2>
420 template <typename U>
421 void faster::workerFddGroup<K>::_apply(void * func, fddOpType op, workerFddBase * dest){
422  switch (op){
423  case OP_MapByKey:
424  //std::cerr << " MapByKey \n";
425  mapByKey<U>(dest, func);
426  break;
427  case OP_FlatMapByKey:
428  //std::cerr << " FlatMapByKey \n";
429  flatMapByKey<U>(dest, func);
430  break;
431  case OP_BulkFlatMap:
432  //std::cerr << " BulkFlatMap \n";
433  bulkFlatMap<U>(dest, func);
434  break;
435  }
436 }
437 
438 template <typename K>
439 //template <typename L, typename U, typename T0, typename T1, typename T2>
440 template <typename L, typename U>
441 void faster::workerFddGroup<K>::_applyI(void * func, fddOpType op, workerFddBase * dest){
442  switch (op){
443  case OP_MapByKey:
444  //std::cerr << " MapByKeyI \n";
445  mapByKeyI<L,U>(dest, func);
446  break;
447  case OP_FlatMapByKey:
448  //std::cerr << " FlatMapByKeyI \n";
449  flatMapByKeyI<L,U>(dest, func);
450  break;
451  case OP_BulkFlatMap:
452  //std::cerr << " BulkFlatMapI \n";
453  bulkFlatMapI<L,U>(dest, func);
454  break;
455  }
456 }
457 
458 template <typename K>
459 //template <typename T0, typename T1, typename T2>
460 void faster::workerFddGroup<K>::_applyReduce(void * func UNUSED, fddOpType op UNUSED, fastCommBuffer & buffer){
461  void * r = NULL;
462  size_t rSize = sizeof(r);
463 
464  switch (op){
465  case OP_UpdateByKey:
466  //std::cerr << " Update \n";
467  updateByKey(func);
468  break;
469  case OP_BulkUpdate:
470  //std::cerr << " BulkUpdate \n";
471  bulkUpdate(func);
472  break;
473  /*case OP_Reduce:
474  r = reduce( ( reduceGFunctionP<T> ) func);
475  //std::cerr << "Reduce ";
476  break;
477  case OP_BulkReduce:
478  r = bulkReduce( ( bulkReduceGFunctionP<T> ) func);
479  //std::cerr << "BulkReduce ";
480  break;*/
481  }
482 
483  if (op & OP_GENERICREDUCE) buffer.write(r,rSize);
484 }
485 
486 template <typename K>
487 void faster::workerFddGroup<K>::_preApply(void * func, fddOpType op, workerFddBase * dest){
488  switch (dest->getType()){
489  case Char: _apply<char>(func, op, dest); break;
490  case Int: _apply<int>(func, op, dest); break;
491  case LongInt: _apply<long int>(func, op, dest); break;
492  case Float: _apply<float>(func, op, dest); break;
493  case Double: _apply<double>(func, op, dest); break;
494  case String: _apply<std::string>(func, op, dest); break;
495  case CharV: _apply<std::vector<char>>(func, op, dest); break;
496  case IntV: _apply<std::vector<int>>(func, op, dest); break;
497  case LongIntV: _apply<std::vector<long int>>(func, op, dest); break;
498  case FloatV: _apply<std::vector<float>>(func, op, dest); break;
499  case DoubleV: _apply<std::vector<double>>(func, op, dest); break;
500  }
501 }
502 
503 template <typename K>
504 template <typename L>
505 void faster::workerFddGroup<K>::_preApplyI(void * func, fddOpType op, workerFddBase * dest){
506  switch (dest->getType()){
507  case Null: break;
508  case Char: _applyI<L, char> (func, op, dest); break;
509  case Int: _applyI<L, int> (func, op, dest); break;
510  case LongInt: _applyI<L, long int> (func, op, dest); break;
511  case Float: _applyI<L, float> (func, op, dest); break;
512  case Double: _applyI<L, double> (func, op, dest); break;
513  case String: _applyI<L, std::string> (func, op, dest); break;
514  case CharV: _applyI<L, std::vector<char>> (func, op, dest); break;
515  case IntV: _applyI<L, std::vector<int>> (func, op, dest); break;
516  case LongIntV: _applyI<L, std::vector<long int>>(func, op, dest); break;
517  case FloatV: _applyI<L, std::vector<float>> (func, op, dest); break;
518  case DoubleV: _applyI<L, std::vector<double>> (func, op, dest); break;
519  }
520 }
521 
522 
523 template <typename K>
524 inline void faster::workerFddGroup<K>::apply(void * func, fddOpType op, workerFddBase * dest, fastCommBuffer & buffer){
525  if (op & OP_GENERICMAP){
526  switch (dest->getKeyType()){
527  case Null: _preApply(func, op, dest); break;
528  case Char: _preApplyI<char>(func, op, dest); break;
529  case Int: _preApplyI<int>(func, op, dest); break;
530  case LongInt: _preApplyI<long int>(func, op, dest); break;
531  case Float: _preApplyI<float>(func, op, dest); break;
532  case Double: _preApplyI<double>(func, op, dest); break;
533  case String: _preApplyI<std::string>(func, op, dest); break;
534  }
535  }else{
536  _applyReduce(func, op, buffer);
537  }
538 }
539 
540 template <typename K>
541 void faster::workerFddGroup<K>::cogroup(fastComm *comm){
542  //std::cerr << " Cogroup";
543  //auto start = system_clock::now();
544  fastCommBuffer &buffer = comm->getResultBuffer();
545 
546  //std::cerr << " RecvKeyMap\n";
547  //comm->recvCogroupData(tid, keyMap, regroup);
548  //std::cerr << " RcvUK:" << duration_cast<milliseconds>(system_clock::now() - start).count() << " ";
549  //start = system_clock::now();
550 
551  //std::cerr << " My Keys: ";
552  this->uKeys = *(std::shared_ptr<std::vector<K>>*) members[0]->getUKeys();
553  this->keyMap = *(std::shared_ptr<std::unordered_map<K, int>>*) members[0]->getKeyMap();
554  //for( auto it = keyMap.begin(); it != keyMap.end(); it++ ){
555  // if ( it->second == comm->getProcId() ){
556  // //std::cerr << "\033[0;31m";
557  // uKeys->insert(uKeys->end(), it->first);
558  // }
559  // //std::cerr << it->first << "\033[0m ";
560  //}
561  //std::cerr << " SaveUK:" << duration_cast<milliseconds>(system_clock::now() - start).count() << "\n";
562  //start = system_clock::now();
563  //std::cerr << "\n";
564 
565  //uKeys->shrink_to_fit();
566 
567  //std::sort(uKeys->begin(), uKeys->end());
568 
569  //std::cerr << " Exchange Data By Key: ";
570  for ( size_t i = 1; i < members.size(); ++i){
571  auto memberUKeys = *(std::shared_ptr<std::vector<K>>*) members[i]->getUKeys();
572  if ( memberUKeys != uKeys ){
573  //std::cerr << i << " ";
574  members[i]->setUKeys( & this->uKeys );
575  members[i]->setKeyMap( & this->keyMap );
576 
577  members[i]->exchangeDataByKey(comm);
578  //std::cerr << " REGROUP " << i << "\n";
579  //std::cerr << "Tx:" << duration_cast<milliseconds>(system_clock::now() - start).count() << " ";
580  //start = system_clock::now();
581  }
582  buffer << members[i]->getSize();
583  }
584  //std::cerr << "\n";
585  //std::cerr << " EDBK:" << duration_cast<milliseconds>(system_clock::now() - start).count() << "\n";
586 }
587 
588 template <typename K>
589 void faster::workerFddGroup<K>::preapply(unsigned long int id, void * func, fddOpType op, workerFddBase * dest, fastComm * comm){
590  using std::chrono::system_clock;
591  using std::chrono::duration_cast;
592  using std::chrono::milliseconds;
593 
594  fastCommBuffer &buffer = comm->getResultBuffer();
595  size_t durationP;
596  size_t rSizeP;
597  size_t rStatP;
598  size_t headerSize;
599  size_t ret = 0;
600  procstat s;
601 
602  buffer.reset();
603  buffer << id;
604 
605  // Reserve space for the time duration
606  durationP = buffer.size();
607  buffer.advance(sizeof(size_t));
608 
609  rStatP = buffer.size();
610  buffer.advance(s);
611 
612  rSizeP = buffer.size();
613  buffer.advance(sizeof(size_t));
614 
615  headerSize = buffer.size();
616 
617  auto start = system_clock::now();
618  if (op & (OP_GENERICMAP | OP_GENERICREDUCE | OP_GENERICUPDATE)){
619  apply(func, op, dest, buffer);
620 
621  //std::cerr << " RSIZE:" << size_t(dest->getSize());
622  if (dest) buffer << size_t(dest->getSize());
623  }else{
624  if (op == OP_CoGroup){
625  cogroup(comm);
626  }
627  buffer << ret;
628  }
629  auto end = system_clock::now();
630  auto duration = duration_cast<milliseconds>(end - start);
631  //std::cerr << " ET:" << duration.count();
632 
633  buffer.writePos(size_t(duration.count()), durationP);
634  buffer.writePos(getProcStat(), rStatP);
635  buffer.writePos(size_t(buffer.size() - headerSize), rSizeP);
636 
637  comm->sendTaskResult();
638 }
639 
640 
unsigned int fddOpType
Dataset operation type.
Definition: definitions.h:41