libfaster API Documentation  Development Version
Super fast distributted computing
workerIFdd.cpp
1 #include <iostream>
2 #include <tuple>
3 #include <set>
4 
5 #include "indexedFddStorageExtern.cpp"
6 #include "fastComm.h"
7 #include "_workerIFdd.h"
8 
9 // UPDATE
10 template <typename K, typename T>
11 char faster::_workerIFdd<K,T>::update(updateIFunctionP<K,T> updateFunc){
12  T * d = this->localData->getData();
13  K * k = this->localData->getKeys();
14  size_t s = this->localData->getSize();
15  char ret = 0;
16 
17  #pragma omp parallel for
18  for ( size_t i = 0; i < s; ++i){
19  updateFunc(k[i], d[i]);
20  }
21  return ret;
22 }
23 
24 // REDUCE
25 template <typename K, typename T>
26 std::pair<K,T> faster::_workerIFdd<K,T>::reduce(IreduceIFunctionP<K,T> reduceFunc){
27  T * d = this->localData->getData();
28  std::pair<K,T> result;
29  size_t s = this->localData->getSize();
30  K * ik = this->localData->getKeys();
31  //std::cerr << "START " << id << " " << s << " | ";
32 
33  #pragma omp parallel
34  {
35  int nT = omp_get_num_threads();
36  size_t tN = omp_get_thread_num();
37  std::pair<K,T> partResult = {};
38 
39  if ( tN < s ) {
40  partResult = {ik[tN], d[tN] };
41  }
42 
43  #pragma omp for
44  for (size_t i = nT; i < s; ++i){
45  partResult = reduceFunc(partResult.first, partResult.second, ik[i], d[i]);
46  }
47  #pragma omp master
48  result = partResult;
49 
50  #pragma omp barrier
51 
52  if ( (omp_get_thread_num() != 0) && ( tN < s ) ){
53  #pragma omp critical
54  result = reduceFunc(result.first, result.second, partResult.first, partResult.second);
55  }
56  }
57  return result;
58 }
59 
60 
61 template <typename K, typename T>
62 std::pair<K,T> faster::_workerIFdd<K,T>::bulkReduce (IbulkReduceIFunctionP<K,T> bulkReduceFunc){
63  K * ik = this->localData->getKeys();
64  return bulkReduceFunc(ik, (T*) this->localData->getData(), this->localData->getSize());
65 }
66 
67 
68 
69 template <typename K, typename T>
70 void faster::_workerIFdd<K,T>::applyIndependent(void * func, fddOpType op, fastCommBuffer & buffer){
71  char rc;
72  std::pair<K,T> rp;
73 
74  switch (op){
75  case OP_Update:
76  //std::cerr << " Update IFDD\n";
77  rc = update( ( updateIFunctionP<K,T> ) func );
78  buffer << rc;
79  break;
80  case OP_Reduce:
81  //std::cerr << " Reduce IFDD\n";
82  rp = reduce( ( IreduceIFunctionP<K,T> ) func );
83  buffer << rp;
84  break;
85  case OP_BulkReduce:
86  //std::cerr << " BulkReduce IFDD\n";
87  rp = bulkReduce( ( IbulkReduceIFunctionP<K,T> ) func );
88  buffer << rp;
89  break;
90  }
91 
92 }
93 
94 
95 
96 // -------------------------- Public Functions ------------------------ //
97 
98 
99 
100 template <typename K, typename T>
101 void faster::_workerIFdd<K,T>::setData(K * keys, T * data, size_t size) {
102  this->localData->setData( keys, data, size);
103 }
104 
105 template <typename K, typename T>
106 void faster::_workerIFdd<K,T>::setDataRaw(void * keys, void * data, size_t size){
107  this->localData->setDataRaw(keys, data, size);
108 }
109 
110 
111 template <typename K, typename T>
112 void faster::_workerIFdd<K,T>::insert(void * k, void * in, size_t s UNUSED){
113  this->localData->insert(*(K *) k, *(T*) in);
114 }
115 template <typename K, typename T>
116 void faster::_workerIFdd<K,T>::insertl(void * in){
117  insert( *(std::deque<std::pair<K,T>>*) in);
118 }
119 
120 
121 
122 
123 template <typename K, typename T>
124 void faster::_workerIFdd<K,T>::insert(K & key, T & in){
125  this->localData->insert(key, in);
126 }
127 
128 template <typename K, typename T>
129 void faster::_workerIFdd<K,T>::insert(std::deque< std::pair<K, T> > & in){
130 
131  if (this->localData->getSize() < in.size())
132  this->localData->grow(in.size());
133 
134  for ( auto it = in.begin(); it != in.end(); it++){
135  this->localData->insert(it->first, it->second);
136  }
137 }
138 
139 
140 
141 template <typename K, typename T>
142 void faster::_workerIFdd<K,T>::apply(void * func, fddOpType op, workerFddBase * dest, fastCommBuffer & buffer){
143  if (op & (OP_GENERICMAP)){
144  applyDependent(func, op, dest);
145  }else{
146  applyIndependent(func, op, buffer);
147  }
148 }
149 
150 
151 
152 template <typename K, typename T>
153 void faster::_workerIFdd<K,T>::collect(fastComm * comm) {
154  comm->sendFDDDataCollect(this->id, this->localData->getKeys(), this->localData->getData(), this->localData->getSize());
155 };
156 
157 
158