libfaster API Documentation  Development Version
Super fast distributed computing
hdfsEngine.cpp
1 #include <iostream>
2 #include <algorithm>
3 #include <cmath>
4 
5 #include <string>
6 #include <unordered_map>
7 
8 #include "hdfs.h"
9 #include "hdfsEngine.h"
10 
11 faster::hdfsEngine::hdfsEngine(){
12  _ready = false;
13  _fs = hdfsConnect("default", 0);
14  _ready = true;
15 }
16 faster::hdfsEngine::~hdfsEngine(){
17  hdfsDisconnect((hdfsFS) _fs);
18 }
19 
20 bool faster::hdfsEngine::isReady(){
21  return _ready;
22 }
23 
24 bool faster::hdfsEngine::isConnected(){
25  return (_fs != NULL);
26 }
27 
28 faster::hdfsFile faster::hdfsEngine::open(std::string path, fileMode mode){
29  return hdfsFile((hdfsFS) _fs, path, mode);
30 }
31 
32 void faster::hdfsEngine::close(faster::hdfsFile & f){
33  f.close();
34 }
35 
36 void faster::hdfsEngine::del(std::string path){
37  hdfsDelete((::hdfsFS)_fs, path.data(), 1);
38 }
39 
40 bool faster::hdfsEngine::exists(std::string path){
41  int r = hdfsExists((::hdfsFS) _fs, path.data());
42  return (r == 0);
43 }
44 
45 faster::hdfsFile::hdfsFile(void * fs, std::string & path, fileMode mode){
46  _fs = fs;
47  _path = path;
48  _f = hdfsOpenFile((::hdfsFS) fs, path.data(), mode,0,0,0);
49  _open = true;
50  _buffer.resize(128*1024);
51  _path = path;
52 }
53 faster::hdfsFile::~hdfsFile(){
54  close();
55 }
56 void faster::hdfsFile::close(){
57  if (_open){
58  hdfsCloseFile((::hdfsFS) _fs, (::hdfsFile) _f);
59  _open = false;
60  }
61 }
62 
63 size_t faster::hdfsFile::read(char * v, size_t n){
64  return hdfsRead((::hdfsFS)_fs, (::hdfsFile)_f, (void*) v, n);
65 }
66 
67 size_t faster::hdfsFile::write(char * v, size_t n){
68  return hdfsWrite((::hdfsFS)_fs, (::hdfsFile)_f, (void*) v, n);
69 }
70 
71 size_t faster::hdfsFile::seek(size_t offset){
72  return hdfsSeek((::hdfsFS)_fs, (::hdfsFile)_f, (tOffset) offset);
73 }
74 
75 size_t faster::hdfsFile::readLine(char * v, size_t n, char sep){
76  char buffer[1024];
77  int count = 0, m, k;
78  char c ;
79 
80  do{
81  k = 0;
82  m = read(buffer, std::min(1024, (int) n - count));
83  c = buffer[k];
84  // Find the separator in this chunk
85  while ((c != sep) && (k < m)){
86  k++;
87  c = buffer[k];
88  }
89 
90  std::copy_n(buffer, k, &v[count]);
91  count += k;
92  }while (c != sep);
93 
94  // Go back to the end of the line
95  int rest = m - k;
96  seek(count - rest);
97 
98  return count;
99 }
100 
101 std:: vector<std::deque<int>> faster::hdfsFile::getBlocksLocations(){
102  std::vector<std::deque<int>> loc;
103  std::unordered_map<std::string,int> hostMap;
104  //int i = 0;
105  int hostsMapped = 0;
106 
107  hdfsFileInfo * info = hdfsGetPathInfo((::hdfsFS)_fs, _path.data());
108 
109  std::cerr << "Getting Blocks Locations for: " << _path << " (" << info->mSize << " bytes)";
110  char *** blocksLoc = hdfsGetHosts((::hdfsFS)_fs, _path.data(), 0, info->mSize);
111  int numBlocks = std::ceil((float)info->mSize/info->mBlockSize);
112  loc.resize(numBlocks);
113 
114  std::cerr << ".\n";
115  for ( int i = 0; i < numBlocks ; i++){
116  //while ( blocksLoc[i] != NULL){
117  int j = 0;
118  char ** hostList = blocksLoc[i];
119  std::cerr << " (" << i << ") ";
120  while ( hostList[j] != NULL ){
121  std::cerr << hostList[j] << " ";
122  auto search = hostMap.find(hostList[j]);
123  if ( search == hostMap.end()){
124  hostMap[hostList[j]] = hostsMapped ++;
125  }
126  auto & locList = loc[i];
127  locList.insert(locList.end(), hostMap[hostList[j]]);
128  j++;
129  }
130  //i++;
131  std::cerr << "\n";
132  }
133  for ( int i = 0; i < numBlocks ; i++){
134  std::cerr << " (" << i << ") ";
135  for (auto it = loc[i].begin() ; it != loc[i].end(); it++){
136  std::cerr << *it << " ";
137  }
138  std::cerr << "\n";
139  }
140 
141 
142  hdfsFreeFileInfo(info,1);
143  hdfsFreeHosts(blocksLoc);
144 
145  return loc;
146 }
147 
148 void faster::hdfsFile::del(){
149  close();
150  hdfsDelete((::hdfsFS)_fs, _path.data(), 1);
151 }