Added
Link Here
|
1 |
--- openvdb/openvdb/io/Queue.cc.orig 2021-06-11 21:38:41 UTC |
2 |
+++ openvdb/openvdb/io/Queue.cc |
3 |
@@ -5,21 +5,23 @@ |
4 |
/// @author Peter Cucka |
5 |
|
6 |
#include "Queue.h" |
7 |
- |
8 |
#include "File.h" |
9 |
#include "Stream.h" |
10 |
-#include <openvdb/Exceptions.h> |
11 |
-#include <openvdb/util/logging.h> |
12 |
+#include "openvdb/Exceptions.h" |
13 |
+#include "openvdb/util/logging.h" |
14 |
+ |
15 |
#include <tbb/concurrent_hash_map.h> |
16 |
-#include <tbb/task.h> |
17 |
-#include <tbb/tbb_thread.h> // for tbb::this_tbb_thread::sleep() |
18 |
-#include <tbb/tick_count.h> |
19 |
+#include <tbb/task_arena.h> |
20 |
+ |
21 |
+#include <thread> |
22 |
#include <algorithm> // for std::max() |
23 |
#include <atomic> |
24 |
#include <iostream> |
25 |
#include <map> |
26 |
#include <mutex> |
27 |
+#include <chrono> |
28 |
|
29 |
+ |
30 |
namespace openvdb { |
31 |
OPENVDB_USE_VERSION_NAMESPACE |
32 |
namespace OPENVDB_VERSION_NAME { |
33 |
@@ -28,18 +30,19 @@ namespace io { |
34 |
namespace { |
35 |
|
36 |
// Abstract base class for queuable TBB tasks that adds a task completion callback |
37 |
-class Task: public tbb::task |
38 |
+class Task |
39 |
{ |
40 |
public: |
41 |
Task(Queue::Id id): mId(id) {} |
42 |
- ~Task() override {} |
43 |
+ virtual ~Task() {} |
44 |
|
45 |
Queue::Id id() const { return mId; } |
46 |
|
47 |
void setNotifier(Queue::Notifier& notifier) { mNotify = notifier; } |
48 |
+ virtual void execute() const = 0; |
49 |
|
50 |
protected: |
51 |
- void notify(Queue::Status status) { if (mNotify) mNotify(this->id(), status); } |
52 |
+ void notify(Queue::Status status) const { if (mNotify) mNotify(this->id(), status); } |
53 |
|
54 |
private: |
55 |
Queue::Id mId; |
56 |
@@ -48,7 +51,7 @@ class Task: public tbb::task (private) |
57 |
|
58 |
|
59 |
// Queuable TBB task that writes one or more grids to a .vdb file or an output stream |
60 |
-class OutputTask: public Task |
61 |
+class OutputTask : public Task |
62 |
{ |
63 |
public: |
64 |
OutputTask(Queue::Id id, const GridCPtrVec& grids, const Archive& archive, |
65 |
@@ -56,10 +59,10 @@ class OutputTask: public Task (public) |
66 |
: Task(id) |
67 |
, mGrids(grids) |
68 |
, mArchive(archive.copy()) |
69 |
- , mMetadata(metadata) |
70 |
- {} |
71 |
+ , mMetadata(metadata) {} |
72 |
+ ~OutputTask() override {} |
73 |
|
74 |
- tbb::task* execute() override |
75 |
+ void execute() const override |
76 |
{ |
77 |
Queue::Status status = Queue::FAILED; |
78 |
try { |
79 |
@@ -69,10 +72,8 @@ class OutputTask: public Task (public) |
80 |
if (const char* msg = e.what()) { |
81 |
OPENVDB_LOG_ERROR(msg); |
82 |
} |
83 |
- } catch (...) { |
84 |
- } |
85 |
+ } catch (...) {} |
86 |
this->notify(status); |
87 |
- return nullptr; // no successor to this task |
88 |
} |
89 |
|
90 |
private: |
91 |
@@ -94,7 +95,6 @@ struct Queue::Impl |
92 |
/// @todo Provide more information than just "succeeded" or "failed"? |
93 |
using StatusMap = tbb::concurrent_hash_map<Queue::Id, Queue::Status>; |
94 |
|
95 |
- |
96 |
Impl() |
97 |
: mTimeout(Queue::DEFAULT_TIMEOUT) |
98 |
, mCapacity(Queue::DEFAULT_CAPACITY) |
99 |
@@ -159,12 +159,15 @@ struct Queue::Impl |
100 |
|
101 |
bool canEnqueue() const { return mNumTasks < Int64(mCapacity); } |
102 |
|
103 |
- void enqueue(Task& task) |
104 |
+ void enqueue(OutputTask& task) |
105 |
{ |
106 |
- tbb::tick_count start = tbb::tick_count::now(); |
107 |
+ auto start = std::chrono::steady_clock::now(); |
108 |
while (!canEnqueue()) { |
109 |
- tbb::this_tbb_thread::sleep(tbb::tick_count::interval_t(0.5/*sec*/)); |
110 |
- if ((tbb::tick_count::now() - start).seconds() > double(mTimeout)) { |
111 |
+ std::this_thread::sleep_for(/*0.5s*/std::chrono::milliseconds(500)); |
112 |
+ auto duration = std::chrono::duration_cast<std::chrono::milliseconds>( |
113 |
+ std::chrono::steady_clock::now() - start); |
114 |
+ const double seconds = double(duration.count()) / 1000.0; |
115 |
+ if (seconds > double(mTimeout)) { |
116 |
OPENVDB_THROW(RuntimeError, |
117 |
"unable to queue I/O task; " << mTimeout << "-second time limit expired"); |
118 |
} |
119 |
@@ -173,7 +176,10 @@ struct Queue::Impl |
120 |
std::placeholders::_1, std::placeholders::_2); |
121 |
task.setNotifier(notify); |
122 |
this->setStatus(task.id(), Queue::PENDING); |
123 |
- tbb::task::enqueue(task); |
124 |
+ |
125 |
+ // attach to the task arena currently used by the calling thread |
126 |
+ tbb::task_arena arena(tbb::task_arena::attach{}); |
127 |
+ arena.enqueue([task = std::move(task)] { task.execute(); }); |
128 |
++mNumTasks; |
129 |
} |
130 |
|
131 |
@@ -204,7 +210,7 @@ Queue::~Queue() |
132 |
/// (e.g., by keeping a static registry of queues that also dispatches |
133 |
/// or blocks notifications)? |
134 |
while (mImpl->mNumTasks > 0) { |
135 |
- tbb::this_tbb_thread::sleep(tbb::tick_count::interval_t(0.5/*sec*/)); |
136 |
+ std::this_thread::sleep_for(/*0.5s*/std::chrono::milliseconds(500)); |
137 |
} |
138 |
} |
139 |
|
140 |
@@ -290,16 +296,8 @@ Queue::Id |
141 |
Queue::writeGridVec(const GridCPtrVec& grids, const Archive& archive, const MetaMap& metadata) |
142 |
{ |
143 |
const Queue::Id taskId = mImpl->mNextId++; |
144 |
- // From the "GUI Thread" chapter in the TBB Design Patterns guide |
145 |
- OutputTask* task = |
146 |
- new(tbb::task::allocate_root()) OutputTask(taskId, grids, archive, metadata); |
147 |
- try { |
148 |
- mImpl->enqueue(*task); |
149 |
- } catch (openvdb::RuntimeError&) { |
150 |
- // Destroy the task if it could not be enqueued, then rethrow the exception. |
151 |
- tbb::task::destroy(*task); |
152 |
- throw; |
153 |
- } |
154 |
+ OutputTask task(taskId, grids, archive, metadata); |
155 |
+ mImpl->enqueue(task); |
156 |
return taskId; |
157 |
} |
158 |
|