Skip to content

Commit 65b5279

Browse files
committed
chore: refactor ExecutorGroup
1 parent 5d78642 commit 65b5279

File tree

3 files changed

+160
-98
lines changed

3 files changed

+160
-98
lines changed

include/mrdox/Support/ExecutorGroup.hpp

+47-98
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@
1212
#define MRDOX_SUPPORT_EXECUTORGROUP_HPP
1313

1414
#include <mrdox/Platform.hpp>
15+
#include <mrdox/Support/any_callable.hpp>
1516
#include <mrdox/Support/ThreadPool.hpp>
16-
#include <mrdox/Support/unlock_guard.hpp>
17-
#include <condition_variable>
1817
#include <deque>
18+
#include <memory>
1919
#include <mutex>
2020
#include <vector>
2121

@@ -25,48 +25,65 @@ namespace mrdox {
2525
class MRDOX_DECL
2626
ExecutorGroupBase
2727
{
28+
class scoped_agent;
29+
2830
protected:
31+
struct Impl;
32+
2933
struct MRDOX_DECL
3034
AnyAgent
3135
{
3236
virtual ~AnyAgent() = 0;
3337
virtual void* get() noexcept = 0;
3438
};
3539

40+
std::unique_ptr<Impl> impl_;
41+
std::vector<std::unique_ptr<AnyAgent>> agents_;
42+
std::deque<any_callable<void(void*)>> work_;
43+
44+
explicit ExecutorGroupBase(ThreadPool&);
45+
void post(any_callable<void(void*)>);
46+
void run(std::unique_lock<std::mutex>);
47+
3648
public:
49+
template<class T>
50+
using arg_t = ThreadPool::arg_t<T>;
51+
52+
~ExecutorGroupBase();
53+
ExecutorGroupBase(ExecutorGroupBase&&) noexcept;
54+
55+
/** Block until all work has completed.
56+
*/
57+
void
58+
wait() noexcept;
3759
};
3860

3961
/** A set of execution agents for performing concurrent work.
4062
*/
4163
template<class Agent>
4264
class ExecutorGroup : public ExecutorGroupBase
4365
{
44-
struct Impl
66+
struct AgentImpl : AnyAgent
4567
{
46-
std::mutex mutex_;
47-
std::condition_variable cv_;
48-
};
68+
Agent agent_;
4969

50-
ThreadPool& threadPool_;
51-
std::unique_ptr<Impl> impl_;
52-
std::vector<std::unique_ptr<Agent>> agents_;
53-
std::deque<any_callable<void(Agent&)>> work_;
54-
std::size_t busy_ = 0;
55-
56-
public:
57-
template<class T>
58-
using arg_t = ThreadPool::arg_t<T>;
70+
template<class... Args>
71+
AgentImpl(Args&&... args)
72+
: agent_(std::forward<Args>(args)...)
73+
{
74+
}
5975

60-
ExecutorGroup(ExecutorGroup const&) = delete;
61-
ExecutorGroup& operator=(ExecutorGroup&&) = delete;
62-
ExecutorGroup& operator=(ExecutorGroup const&) = delete;
63-
ExecutorGroup(ExecutorGroup&&) = default;
76+
void* get() noexcept override
77+
{
78+
return &agent_;
79+
}
80+
};
6481

82+
public:
6583
explicit
6684
ExecutorGroup(
67-
ThreadPool& threadPool) noexcept
68-
: threadPool_(threadPool)
69-
, impl_(std::make_unique<Impl>())
85+
ThreadPool& threadPool)
86+
: ExecutorGroupBase(threadPool)
7087
{
7188
}
7289

@@ -79,8 +96,9 @@ class ExecutorGroup : public ExecutorGroupBase
7996
void
8097
emplace(Args&&... args)
8198
{
82-
agents_.emplace_back(std::make_unique<Agent>(
83-
std::forward<Args>(args)...));
99+
agents_.emplace_back(
100+
std::make_unique<AgentImpl>(
101+
std::forward<Args>(args)...));
84102
}
85103

86104
/** Submit work to be executed.
@@ -96,86 +114,17 @@ class ExecutorGroup : public ExecutorGroupBase
96114
async(F&& f, Args&&... args)
97115
{
98116
static_assert(std::is_invocable_v<F, Agent&, arg_t<Args>...>);
99-
std::unique_lock<std::mutex> lock(impl_->mutex_);
100-
work_.emplace_back(
117+
post(
101118
[
102119
f = std::forward<F>(f),
103120
args = std::tuple<arg_t<Args>...>(args...)
104-
](Agent& agent)
121+
](void* agent)
105122
{
106-
std::apply(f, std::tuple_cat(
107-
std::tuple<Agent&>(agent),
123+
std::apply(f,
124+
std::tuple_cat(std::tuple<Agent&>(
125+
*reinterpret_cast<Agent*>(agent)),
108126
std::move(args)));
109127
});
110-
if(agents_.empty())
111-
return;
112-
run(std::move(lock));
113-
}
114-
115-
/** Block until all work has completed.
116-
*/
117-
void
118-
wait()
119-
{
120-
std::unique_lock<std::mutex> lock(impl_->mutex_);
121-
impl_->cv_.wait(lock,
122-
[&]
123-
{
124-
return work_.empty() && busy_ == 0;
125-
});
126-
}
127-
128-
private:
129-
class scoped_agent
130-
{
131-
ExecutorGroup& group_;
132-
std::unique_ptr<Agent> agent_;
133-
134-
public:
135-
scoped_agent(
136-
ExecutorGroup& group,
137-
std::unique_ptr<Agent> agent) noexcept
138-
: group_(group)
139-
, agent_(std::move(agent))
140-
{
141-
}
142-
143-
~scoped_agent()
144-
{
145-
--group_.busy_;
146-
group_.agents_.emplace_back(std::move(agent_));
147-
group_.impl_->cv_.notify_all();
148-
}
149-
150-
Agent& operator*() const noexcept
151-
{
152-
return *agent_;
153-
}
154-
};
155-
156-
void
157-
run(std::unique_lock<std::mutex> lock)
158-
{
159-
std::unique_ptr<Agent> agent(std::move(agents_.back()));
160-
agents_.pop_back();
161-
++busy_;
162-
163-
threadPool_.async(
164-
[this, agent = std::move(agent)]() mutable
165-
{
166-
std::unique_lock<std::mutex> lock(impl_->mutex_);
167-
scoped_agent scope(*this, std::move(agent));
168-
for(;;)
169-
{
170-
if(work_.empty())
171-
break;
172-
any_callable<void(Agent&)> work(
173-
std::move(work_.front()));
174-
work_.pop_front();
175-
unlock_guard unlock(impl_->mutex_);
176-
work(*scope);
177-
}
178-
});
179128
}
180129
};
181130

source/-adoc/SinglePageVisitor.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
//
1010

1111
#include "SinglePageVisitor.hpp"
12+
#include <mrdox/Support/unlock_guard.hpp>
1213

1314
namespace clang {
1415
namespace mrdox {

source/Support/ExecutorGroup.cpp

+112
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,121 @@
99
//
1010

1111
#include <mrdox/Support/ExecutorGroup.hpp>
12+
#include <mrdox/Support/unlock_guard.hpp>
13+
#include <condition_variable>
1214

1315
namespace clang {
1416
namespace mrdox {
1517

18+
struct ExecutorGroupBase::
19+
Impl
20+
{
21+
ThreadPool& threadPool;
22+
std::mutex mutex;
23+
std::condition_variable cv;
24+
std::size_t busy = 0;
25+
26+
explicit
27+
Impl(ThreadPool& threadPool_)
28+
: threadPool(threadPool_)
29+
{
30+
}
31+
};
32+
33+
class ExecutorGroupBase::
34+
scoped_agent
35+
{
36+
ExecutorGroupBase& group_;
37+
std::unique_ptr<AnyAgent> agent_;
38+
39+
public:
40+
scoped_agent(
41+
ExecutorGroupBase& group,
42+
std::unique_ptr<AnyAgent> agent) noexcept
43+
: group_(group)
44+
, agent_(std::move(agent))
45+
{
46+
}
47+
48+
~scoped_agent()
49+
{
50+
--group_.impl_->busy;
51+
group_.agents_.emplace_back(std::move(agent_));
52+
group_.impl_->cv.notify_all();
53+
}
54+
55+
void* get() const noexcept
56+
{
57+
return agent_->get();
58+
}
59+
};
60+
61+
ExecutorGroupBase::
62+
AnyAgent::
63+
~AnyAgent() = default;
64+
65+
ExecutorGroupBase::
66+
ExecutorGroupBase(
67+
ThreadPool& threadPool)
68+
: impl_(std::make_unique<Impl>(threadPool))
69+
{
70+
}
71+
72+
ExecutorGroupBase::
73+
~ExecutorGroupBase() = default;
74+
75+
ExecutorGroupBase::
76+
ExecutorGroupBase(
77+
ExecutorGroupBase&&) noexcept = default;
78+
79+
void
80+
ExecutorGroupBase::
81+
post(any_callable<void(void*)> work)
82+
{
83+
std::unique_lock<std::mutex> lock(impl_->mutex);
84+
work_.emplace_back(std::move(work));
85+
if(agents_.empty())
86+
return;
87+
run(std::move(lock));
88+
}
89+
90+
void
91+
ExecutorGroupBase::
92+
run(std::unique_lock<std::mutex> lock)
93+
{
94+
std::unique_ptr<AnyAgent> agent(std::move(agents_.back()));
95+
agents_.pop_back();
96+
++impl_->busy;
97+
98+
impl_->threadPool.async(
99+
[this, agent = std::move(agent)]() mutable
100+
{
101+
std::unique_lock<std::mutex> lock(impl_->mutex);
102+
scoped_agent scope(*this, std::move(agent));
103+
for(;;)
104+
{
105+
if(work_.empty())
106+
break;
107+
any_callable<void(void*)> work(
108+
std::move(work_.front()));
109+
work_.pop_front();
110+
unlock_guard unlock(impl_->mutex);
111+
work(scope.get());
112+
}
113+
});
114+
}
115+
116+
void
117+
ExecutorGroupBase::
118+
wait() noexcept
119+
{
120+
std::unique_lock<std::mutex> lock(impl_->mutex);
121+
impl_->cv.wait(lock,
122+
[&]
123+
{
124+
return work_.empty() && impl_->busy == 0;
125+
});
126+
}
127+
16128
} // mrdox
17129
} // clang

0 commit comments

Comments
 (0)