bqueue.h
1 // Copyright (C) 2013-2016 DNAnexus, Inc.
2 //
3 // This file is part of dx-toolkit (DNAnexus platform client libraries).
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License"); you may
6 // not use this file except in compliance with the License. You may obtain a
7 // copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 // License for the specific language governing permissions and limitations
15 // under the License.
16 
17 #ifndef UA_BQUEUE_H
18 #define UA_BQUEUE_H
19 
20 #include <queue>
21 #include <boost/thread.hpp>
22 
23 namespace dx {
34  template<typename T>
35  class BlockingQueue {
36  public:
37 
38  BlockingQueue(int capacity_ = -1) : capacity(capacity_) {
39  }
40 
41  void setCapacity(int capacity_);
42  int getCapacity() const;
43  void produce(T chunk);
44  T consume();
45 
46  size_t size() const;
47  bool empty() const;
48 
49  private:
50 
51  /* The capacity of the queue, or -1 if the capacity is unbounded. */
52  int capacity;
53 
54  /* The underlying queue. */
55  std::queue<T> chunks;
56 
57  boost::mutex mut;
58  boost::condition_variable canProduce;
59  boost::condition_variable canConsume;
60  };
61 
62  template<typename T> void BlockingQueue<T>::setCapacity(int capacity_) {
63  capacity = capacity_;
64  }
65 
66  template<typename T> int BlockingQueue<T>::getCapacity() const {
67  return capacity;
68  }
69 
70  template<typename T> void BlockingQueue<T>::produce(T chunk) {
71  {
72  boost::unique_lock<boost::mutex> lock(mut);
73  if (capacity != -1) {
74  while (chunks.size() == (size_t) capacity) {
75  canProduce.wait(lock);
76  }
77  }
78  chunks.push(chunk);
79  }
80  canConsume.notify_all();
81  }
82 
83  template<typename T> T BlockingQueue<T>::consume() {
84  T chunk;
85  {
86  boost::unique_lock<boost::mutex> lock(mut);
87  while (chunks.empty()) {
88  canConsume.wait(lock);
89  }
90  chunk = chunks.front();
91  chunks.pop();
92  }
93  canProduce.notify_all();
94  return chunk;
95  }
96 
97  template<typename T> size_t BlockingQueue<T>::size() const {
98  return chunks.size();
99  }
100 
101  template<typename T> bool BlockingQueue<T>::empty() const {
102  return chunks.empty();
103  }
104 }
105 
106 #endif
Definition: bqueue.h:35
An executable object that can be published for others to discover.
Definition: api.cc:7