dxfile.h
Go to the documentation of this file.
1 // Copyright (C) 2013-2016 DNAnexus, Inc.
2 //
3 // This file is part of dx-toolkit (DNAnexus platform client libraries).
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License"); you may
6 // not use this file except in compliance with the License. You may obtain a
7 // copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 // License for the specific language governing permissions and limitations
15 // under the License.
16 
22 #ifndef DXCPP_BINDINGS_DXFILE_H
23 #define DXCPP_BINDINGS_DXFILE_H
24 
25 #include <fstream>
26 #include <sstream>
27 #include <boost/thread.hpp>
28 #include "../bqueue.h"
29 #include "../bindings.h"
30 #include "../utils.h"
31 
32 namespace dx {
34 
// A remote file handler (a DXDataObject subclass). Besides the constructors,
// assignment, destructor and small accessors defined inline below, the class
// declares read/seek/write/uploadPart/close operations and a "linear query"
// interface (startLinearQuery / getNextChunk / stopLinearQuery); those are
// only declared here and are defined out of line (dxfile.cc).
80  class DXFile: public DXDataObject {
81  private:
    // Thin forwarding hooks: each passes its string argument `s`, together
    // with this handler's dxid_, to the matching file* API wrapper and returns
    // that wrapper's result (if any).
    // NOTE(review): presumably these implement hooks declared by DXDataObject
    // -- confirm against bindings.h.
82  dx::JSON describe_(const std::string &s)const{return fileDescribe(dxid_,s);}
83  void addTypes_(const std::string &s)const{fileAddTypes(dxid_,s);}
84  void removeTypes_(const std::string &s)const{fileRemoveTypes(dxid_,s);}
85  dx::JSON getDetails_(const std::string &s)const{return fileGetDetails(dxid_,s);}
86  void setDetails_(const std::string &s)const{fileSetDetails(dxid_,s);}
87  void setVisibility_(const std::string &s)const{fileSetVisibility(dxid_,s);}
88  void rename_(const std::string &s)const{fileRename(dxid_,s);}
89  void setProperties_(const std::string &s)const{fileSetProperties(dxid_,s);}
90  void addTags_(const std::string &s)const{fileAddTags(dxid_,s);}
91  void removeTags_(const std::string &s)const{fileRemoveTags(dxid_,s);}
92  void close_(const std::string &s)const{fileClose(dxid_,s);}
93  dx::JSON listProjects_(const std::string &s)const{return fileListProjects(dxid_,s);}
94 
95  // For async write() ///////////////////
    // Helpers for the write-thread pool (see writeThreads below); declared
    // here only, bodies are not visible in this header.
96  void joinAllWriteThreads_();
97  void writeChunk_();
98  void createWriteThreads_();
100 
101  // For linear query ///////////////////////////////////////////////
    // Both helpers are const, which is why every lq_* member below is
    // declared mutable. getChunkHttp_ receives a [start, end] byte range and
    // an output string; NOTE(review): whether `end` is inclusive is decided
    // in dxfile.cc -- confirm.
102  void readChunk_() const;
103  void getChunkHttp_(int64_t start, int64_t end, std::string& result) const;
105 
    // NOTE(review): presumably records whether at least one part has been
    // uploaded since the handler was last reset -- confirm in dxfile.cc.
110  bool hasAnyPartBeenUploaded;
111 
    // NOTE(review): presumably the current byte offset used by read()/seek()
    // -- confirm.
117  int64_t pos_;
118 
    // NOTE(review): presumably the value reported by gcount() -- confirm.
122  int64_t gcount_;
123 
    // NOTE(review): presumably a cached length of the remote file -- confirm
    // how/when it is populated.
128  int64_t file_length_;
129 
    // In-memory stream buffer. NOTE(review): presumably accumulates data
    // passed to write() until it is handed off for upload -- confirm.
134  std::stringstream buffer_;
135 
    // NOTE(review): presumably the index of the part currently being
    // written -- confirm.
140  int cur_part_;
141 
    // NOTE(review): presumably the flag reported by eof() -- confirm.
146  bool eof_;
147 
    // Declared mutable so that const member functions may update it.
    // NOTE(review): presumably caches the result of the closed-state check
    // done by is_closed() -- confirm.
158  mutable bool is_closed_;
159 
    // State (re)initialisation helpers, defined out of line. The comment in
    // operator= below states that setIDs() calls reset_data_processing_()
    // and init_internals_(); every constructor calls reset_everything_().
160  void reset_data_processing_();
161  void reset_config_variables_();
162  void reset_everything_();
    // Copies the user-tunable configuration from another handler; used by
    // the copy constructor and operator=.
163  void copy_config_variables_(const DXFile &to_copy);
164  void init_internals_();
165 
    // User-tunable settings exposed through getMaxBufferSize()/
    // setMaxBufferSize() and getNumWriteThreads()/setNumWriteThreads().
166  int64_t max_buf_size_;
167  int max_write_threads_;
168 
169  // To allow interleaving (without compiler optimization possibly changing order)
170  // we use std::atomic (a c++11 feature)
171  // Ref https://parasol.tamu.edu/bjarnefest/program/boehm-slides.pdf (page 7)
172  // Update: Since CLang does not support atomics yet, we are using locking
173  // mechanism along with volatile
    // NOTE(review): volatile by itself provides no inter-thread ordering, so
    // correctness depends on countThreadsMutex being held around every access
    // to these two counters -- confirm in dxfile.cc.
174  volatile int countThreadsWaitingOnConsume, countThreadsNotWaitingOnConsume;
175  boost::mutex countThreadsMutex;
    // Worker threads for asynchronous write().
176  std::vector<boost::thread> writeThreads;
    // Compile-time defaults: 5 threads and a 100 MB buffer.
    // NOTE(review): presumably the initial values of max_write_threads_ and
    // max_buf_size_ -- they are assigned outside this header; confirm.
177  static const int DEFAULT_WRITE_THREADS = 5;
178  static const int64_t DEFAULT_BUFFER_MAXSIZE = 100 * 1024 * 1024; // 100 MB
    // Queue of pending upload requests.
    // NOTE(review): the pair is presumably (part data, part index) -- confirm.
179  BlockingQueue<std::pair<std::string, int> > uploadPartRequestsQueue;
180 
181  // For linear query
    // All linear-query state is mutable because startLinearQuery(),
    // stopLinearQuery() and getNextChunk() are const member functions.
    // Two mutexes are declared for it: lq_results_mutex_ and
    // lq_query_start_mutex_ (names suggest they guard lq_results_ and
    // lq_query_start_ respectively -- TODO confirm).
182  mutable std::map<int64_t, std::string> lq_results_;
183  mutable int64_t lq_chunk_limit_;
184  mutable int64_t lq_query_start_;
185  mutable int64_t lq_query_end_;
186  mutable unsigned lq_max_chunks_;
187  mutable int64_t lq_next_result_;
188  mutable std::string lq_url;
189  mutable dx::JSON lq_headers;
190  mutable std::vector<boost::thread> lq_readThreads_;
191  mutable boost::mutex lq_results_mutex_, lq_query_start_mutex_;
192 
193  public:
194 
    // Creates a handler that is not bound to any object ID: only
    // reset_everything_() is run.
195  DXFile(): DXDataObject() {
196  reset_everything_();
197  }
198 
    // Copy constructor. The base sub-object is default-constructed (not
    // copy-constructed); the new handler is reset, bound to the same
    // dxid_/proj_ via setIDs(), and then receives to_copy's configuration
    // variables. No other state of to_copy is copied here.
202  DXFile(const DXFile& to_copy) : DXDataObject() {
203  reset_everything_();
204  setIDs(to_copy.dxid_, to_copy.proj_);
205  copy_config_variables_(to_copy);
206  }
207 
    // Binds the handler to the object `dxid`. When `proj` is NULL the value
    // of config::CURRENT_PROJECT() is used instead. `dxid` must not be NULL:
    // it is passed straight to the std::string constructor.
214  DXFile(const char *dxid, const char *proj=NULL): DXDataObject() {
215  reset_everything_();
216  setIDs(std::string(dxid), (proj == NULL) ? config::CURRENT_PROJECT() : std::string(proj));
217  }
218 
    // Binds the handler to the object `dxid` in project `proj`
    // (default: config::CURRENT_PROJECT()).
225  DXFile(const std::string &dxid, const std::string &proj=config::CURRENT_PROJECT()): DXDataObject() {
226  reset_everything_();
227  setIDs(dxid, proj);
228  }
229 
    // Binds the handler using a JSON value forwarded to setIDs(const JSON&).
    // The constructor is not `explicit`, so a dx::JSON converts implicitly
    // to a DXFile.
237  DXFile(const dx::JSON &dxlink): DXDataObject() {
238  reset_everything_();
239  setIDs(dxlink);
240  }
241 
    // Copy assignment: self-assignment is a no-op; otherwise the handler is
    // re-bound to to_copy's dxid_/proj_ and takes over its configuration
    // variables. Returns *this.
248  DXFile& operator=(const DXFile& to_copy) {
249  if (this == &to_copy)
250  return *this;
251 
252  this->setIDs(to_copy.dxid_, to_copy.proj_); // setIDs() will call reset_data_processing_() & init_internals_()
253  this->copy_config_variables_(to_copy);
254  return *this;
255  }
256 
    // Calls flush() and then stopLinearQuery() before the members are
    // destroyed.
    // NOTE(review): if flush() can throw, the exception escapes the
    // destructor and stopLinearQuery() is skipped -- confirm flush() is
    // non-throwing or guard it.
257  ~DXFile() {
258  flush();
259  stopLinearQuery();
260  }
261  // File-specific functions
262 
    // Re-binds the handler to another object; three overloads mirroring the
    // constructors above (string IDs, C-string IDs with optional NULL
    // project, JSON link). Defined out of line.
270  void setIDs(const std::string &dxid, const std::string &proj=config::CURRENT_PROJECT());
271 
279  void setIDs(const char *dxid, const char *proj = NULL);
280 
289  void setIDs(const dx::JSON &dxlink);
290 
    // Defined in dxfile.cc. Both arguments are optional: media_type defaults
    // to the empty string and data_obj_fields to an empty JSON object.
301  void create(const std::string &media_type="",
302  const dx::JSON &data_obj_fields=dx::JSON(dx::JSON_OBJECT));
303 
    // Returns the currently configured maximum buffer size
    // (max_buf_size_).
310  int64_t getMaxBufferSize() const {
311  return max_buf_size_;
312  }
313 
    // Sets the maximum buffer size. Throws DXFileError when buf_size is
    // smaller than 5 * 1024 * 1024 (5242880); otherwise stores it.
321  void setMaxBufferSize(const int64_t buf_size) {
322  if (buf_size < (5 * 1024 * 1024)) {
323  throw DXFileError("Maximum buffer size for DXFile must be >= 5242880 (5MB)");
324  }
325  max_buf_size_ = buf_size;
326  }
327 
    // Returns the configured number of write threads (max_write_threads_).
334  int getNumWriteThreads() const {
335  return max_write_threads_;
336  }
337 
    // Stores numThreads as the number of write threads. No validation is
    // performed here. NOTE(review): zero or negative values are accepted --
    // confirm how createWriteThreads_() treats them.
344  void setNumWriteThreads(const int numThreads) {
345  max_write_threads_ = numThreads;
346  }
347 
    // Stream-like read interface, defined in dxfile.cc.
    // NOTE(review): presumably read() copies up to n bytes into ptr, gcount()
    // reports how many bytes the last read() produced, and eof() reports
    // whether the end of the file was reached -- confirm in dxfile.cc.
359  void read(char* ptr, int64_t n);
360 
364  int64_t gcount() const;
365 
372  bool eof() const;
373 
    // NOTE(review): presumably repositions the read cursor to byte offset
    // `pos` -- confirm.
385  void seek(const int64_t pos);
386 
    // Also invoked by the destructor. NOTE(review): presumably uploads any
    // buffered data and waits for outstanding write threads -- confirm.
399  void flush();
400 
    // Write interface (raw buffer of n bytes, or a std::string); defined in
    // dxfile.cc.
419  void write(const char* ptr, int64_t n);
420 
432  void write(const std::string &data);
433 
    // Uploads one part, given as a string or as a raw buffer of n bytes.
    // `index` defaults to -1. NOTE(review): the meaning of -1 is decided in
    // the out-of-line definition -- confirm before relying on it.
444  void uploadPart(const std::string &data, const int index=-1);
445 
462  void uploadPart(const char* ptr, int64_t n, const int index=-1);
463 
    // State queries, defined in dxfile.cc.
467  bool is_open() const;
468 
472  bool is_closed() const;
473 
    // Closes the remote file; `block` defaults to false.
    // NOTE(review): presumably block=true waits until the file has reached
    // the closed state (cf. waitOnClose()) -- confirm. Also note the
    // signature differs from the base-class `virtual void close() const`
    // listed for bindings.cc, so this appears to hide rather than override
    // it -- confirm intended.
484  void close(const bool block=false);
485 
489  void waitOnClose() const;
490 
    // Starts a linear query. Defaults visible in the signature: start at
    // byte 0, num_bytes = -1, 10*1024*1024-byte chunks, at most 20 chunks
    // (max_chunks), 5 threads.
    // NOTE(review): num_bytes = -1 presumably means "through end of file" and
    // max_chunks presumably bounds the number of chunks held in memory --
    // confirm in dxfile.cc.
508  void startLinearQuery(const int64_t start_byte=0,
509  const int64_t num_bytes=-1,
510  const int64_t chunk_size=10*1024*1024,
511  const unsigned max_chunks=20,
512  const unsigned thread_count=5) const;
513 
    // Also invoked by the destructor.
522  void stopLinearQuery() const;
523 
    // Writes the next chunk into `chunk`.
    // NOTE(review): the bool result presumably signals whether a chunk was
    // produced -- confirm.
541  bool getNextChunk(std::string &chunk) const;
542 
    // Static convenience factories/helpers, all defined in dxfile.cc.
    // openDXFile: returns a DXFile for an existing object ID.
549  static DXFile openDXFile(const std::string &dxid);
550 
    // newDXFile: same optional arguments as create().
562  static DXFile newDXFile(const std::string &media_type="",
563  const dx::JSON &data_obj_fields=
564  dx::JSON(dx::JSON_OBJECT));
565 
    // downloadDXFile: takes an object ID and a local filename; chunksize
    // defaults to 1048576 bytes.
579  static void downloadDXFile(const std::string &dxid,
580  const std::string &filename,
581  int64_t chunksize=1048576);
582 
    // uploadLocalFile: takes a local filename plus the same optional
    // arguments as create(); waitForClose defaults to false.
600  static DXFile uploadLocalFile(const std::string &filename,
601  const std::string &media_type="",
602  const dx::JSON &data_obj_fields=
603  dx::JSON(dx::JSON_OBJECT),
604  bool waitForClose=false);
605 
    // Returns a DXFile for the clone placed in project dest_proj_id;
    // dest_folder defaults to "/". Defined in dxfile.cc.
614  DXFile clone(const std::string &dest_proj_id,
615  const std::string &dest_folder="/") const;
616 
621  };
622 }
623 #endif
void setIDs(const std::string &dxid, const std::string &proj=config::CURRENT_PROJECT())
void write(const char *ptr, int64_t n)
Definition: dxfile.cc:443
void setMaxBufferSize(const int64_t buf_size)
Definition: dxfile.h:321
DXFile clone(const std::string &dest_proj_id, const std::string &dest_folder="/") const
Definition: dxfile.cc:622
bool is_closed() const
Definition: dxfile.cc:540
bool is_open() const
Definition: dxfile.cc:530
A remote file handler.
Definition: dxfile.h:80
void setNumWriteThreads(const int numThreads)
Definition: dxfile.h:344
static DXFile uploadLocalFile(const std::string &filename, const std::string &media_type="", const dx::JSON &data_obj_fields=dx::JSON(dx::JSON_OBJECT), bool waitForClose=false)
Definition: dxfile.cc:596
void uploadPart(const std::string &data, const int index=-1)
void read(char *ptr, int64_t n)
Definition: dxfile.cc:145
DXFile & operator=(const DXFile &to_copy)
Definition: dxfile.h:248
Definition: bqueue.h:35
void stopLinearQuery() const
Definition: dxfile.cc:287
static DXFile openDXFile(const std::string &dxid)
Definition: dxfile.cc:565
void create(const std::string &media_type="", const dx::JSON &data_obj_fields=dx::JSON(dx::JSON_OBJECT))
Definition: dxfile.cc:133
static DXFile newDXFile(const std::string &media_type="", const dx::JSON &data_obj_fields=dx::JSON(dx::JSON_OBJECT))
Definition: dxfile.cc:569
DXFile(const dx::JSON &dxlink)
Definition: dxfile.h:237
static void downloadDXFile(const std::string &dxid, const std::string &filename, int64_t chunksize=1048576)
Definition: dxfile.cc:576
void startLinearQuery(const int64_t start_byte=0, const int64_t num_bytes=-1, const int64_t chunk_size=10*1024*1024, const unsigned max_chunks=20, const unsigned thread_count=5) const
Definition: dxfile.cc:184
bool eof() const
Definition: dxfile.cc:303
DXFile(const DXFile &to_copy)
Definition: dxfile.h:202
An executable object that can be published for others to discover.
Definition: api.cc:7
The abstract base class for all data object remote handlers.
Definition: bindings.h:42
void seek(const int64_t pos)
Definition: dxfile.cc:307
void waitOnClose() const
Definition: dxfile.cc:561
DXFile(const std::string &dxid, const std::string &proj=config::CURRENT_PROJECT())
Definition: dxfile.h:225
virtual void close() const
Definition: bindings.cc:152
int64_t gcount() const
Definition: dxfile.cc:299
DXFile(const char *dxid, const char *proj=NULL)
Definition: dxfile.h:214
bool getNextChunk(std::string &chunk) const
Definition: dxfile.cc:263
int64_t getMaxBufferSize() const
Definition: dxfile.h:310
Represents errors relating to the DXFile class.
Definition: exceptions.h:95
void flush()
Definition: dxfile.cc:468
int getNumWriteThreads() const
Definition: dxfile.h:334