Skip to content

Commit c2453ef

Browse files
committed
Fix the buggy Future and Promise implementations
Fixes #298 ### Motivation Currently the `Future` and `Promise` are implemented manually by managing conditional variables. However, the conditional variable sometimes behaviors incorrectly on macOS, while the existing `future` and `promise` from the C++ standard library works well. ### Modifications Redesign `Future` and `Promise` based on the utilities in the standard `<future>` header. In addition, fix the possible race condition when `addListener` is called after `setValue` or `setFailed`: - Thread 1: call `setValue`, switch existing listeners and call them one by one out of the lock. - Thread 2: call `addListener`, detect `complete_` is true and call the listener directly. Now, the previous listeners and the new listener are called concurrently in thread 1 and 2. This patch fixes the problem by locking the executions of all existing callbacks before modifying `complete_` to `true`. ### Verifications Run the reproduce code in #298 for 10 times and found it never failed or hang.
1 parent 94cf3fc commit c2453ef

File tree

4 files changed

+68
-124
lines changed

4 files changed

+68
-124
lines changed

lib/BinaryProtoLookupService.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ void BinaryProtoLookupService::handlePartitionMetadataLookup(const std::string&
146146
}
147147

148148
uint64_t BinaryProtoLookupService::newRequestId() {
149-
Lock lock(mutex_);
149+
std::lock_guard<std::mutex> lock(mutex_);
150150
return ++requestIdGenerator_;
151151
}
152152

lib/Future.h

Lines changed: 62 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -19,162 +19,106 @@
1919
#ifndef LIB_FUTURE_H_
2020
#define LIB_FUTURE_H_
2121

22-
#include <condition_variable>
22+
#include <atomic>
2323
#include <functional>
24+
#include <future>
2425
#include <list>
2526
#include <memory>
2627
#include <mutex>
27-
28-
using Lock = std::unique_lock<std::mutex>;
28+
#include <utility>
2929

3030
namespace pulsar {
3131

3232
template <typename Result, typename Type>
33-
struct InternalState {
34-
std::mutex mutex;
35-
std::condition_variable condition;
36-
Result result;
37-
Type value;
38-
bool complete;
39-
40-
std::list<typename std::function<void(Result, const Type&)> > listeners;
41-
};
42-
43-
template <typename Result, typename Type>
44-
class Future {
33+
class InternalState {
4534
public:
46-
typedef std::function<void(Result, const Type&)> ListenerCallback;
47-
48-
Future& addListener(ListenerCallback callback) {
49-
InternalState<Result, Type>* state = state_.get();
50-
Lock lock(state->mutex);
35+
using Listener = std::function<void(Result, const Type &)>;
5136

52-
if (state->complete) {
53-
lock.unlock();
54-
callback(state->result, state->value);
37+
void addListener(Listener listener) {
38+
if (completed()) {
39+
listener(future_.get().first, future_.get().second);
5540
} else {
56-
state->listeners.push_back(callback);
41+
std::lock_guard<std::mutex> lock{mutex_};
42+
listeners_.emplace_back(listener);
5743
}
58-
59-
return *this;
6044
}
6145

62-
Result get(Type& result) {
63-
InternalState<Result, Type>* state = state_.get();
64-
Lock lock(state->mutex);
65-
66-
if (!state->complete) {
67-
// Wait for result
68-
while (!state->complete) {
69-
state->condition.wait(lock);
70-
}
46+
bool complete(Result result, const Type &value) {
47+
std::unique_lock<std::mutex> lock{mutex_};
48+
if (completed_) {
49+
return false;
7150
}
72-
73-
result = state->value;
74-
return state->result;
75-
}
76-
77-
template <typename Duration>
78-
bool get(Result& res, Type& value, Duration d) {
79-
InternalState<Result, Type>* state = state_.get();
80-
Lock lock(state->mutex);
81-
82-
if (!state->complete) {
83-
// Wait for result
84-
while (!state->complete) {
85-
if (!state->condition.wait_for(lock, d, [&state] { return state->complete; })) {
86-
// Timeout while waiting for the future to complete
87-
return false;
88-
}
89-
}
51+
decltype(listeners_) listeners;
52+
listeners.swap(listeners_);
53+
for (auto &&listener : listeners) {
54+
listener(result, value);
9055
}
91-
92-
value = state->value;
93-
res = state->result;
56+
// Set completed_ with true after all listeners are called to ensure the order that any listener that
57+
// is added after complete() is called after all existing listeners.
58+
completed_.store(true, std::memory_order_release);
59+
listeners_.clear();
60+
lock.unlock();
61+
promise_.set_value(std::make_pair(result, value));
9462
return true;
9563
}
9664

97-
private:
98-
typedef std::shared_ptr<InternalState<Result, Type> > InternalStatePtr;
99-
Future(InternalStatePtr state) : state_(state) {}
65+
bool completed() const noexcept { return completed_.load(std::memory_order_acquire); }
10066

101-
std::shared_ptr<InternalState<Result, Type> > state_;
67+
Result get(Type &result) {
68+
auto pair = future_.get();
69+
result = std::move(pair.second);
70+
return pair.first;
71+
}
10272

103-
template <typename U, typename V>
104-
friend class Promise;
73+
private:
74+
std::atomic_bool completed_{false};
75+
std::promise<std::pair<Result, Type>> promise_;
76+
std::shared_future<std::pair<Result, Type>> future_{promise_.get_future()};
77+
std::list<Listener> listeners_;
78+
mutable std::mutex mutex_;
10579
};
10680

10781
template <typename Result, typename Type>
108-
class Promise {
109-
public:
110-
Promise() : state_(std::make_shared<InternalState<Result, Type> >()) {}
111-
112-
bool setValue(const Type& value) const {
113-
static Result DEFAULT_RESULT;
114-
InternalState<Result, Type>* state = state_.get();
115-
Lock lock(state->mutex);
116-
117-
if (state->complete) {
118-
return false;
119-
}
82+
using InternalStatePtr = std::shared_ptr<InternalState<Result, Type>>;
12083

121-
state->value = value;
122-
state->result = DEFAULT_RESULT;
123-
state->complete = true;
124-
125-
decltype(state->listeners) listeners;
126-
listeners.swap(state->listeners);
127-
128-
lock.unlock();
129-
130-
for (auto& callback : listeners) {
131-
callback(DEFAULT_RESULT, value);
132-
}
84+
template <typename Result, typename Type>
85+
class Future {
86+
public:
87+
using Listener = typename InternalState<Result, Type>::Listener;
13388

134-
state->condition.notify_all();
135-
return true;
89+
Future &addListener(Listener listener) {
90+
state_->addListener(listener);
91+
return *this;
13692
}
13793

138-
bool setFailed(Result result) const {
139-
static Type DEFAULT_VALUE;
140-
InternalState<Result, Type>* state = state_.get();
141-
Lock lock(state->mutex);
94+
Result get(Type &result) { return state_->get(result); }
14295

143-
if (state->complete) {
144-
return false;
145-
}
96+
private:
97+
InternalStatePtr<Result, Type> state_;
14698

147-
state->result = result;
148-
state->complete = true;
99+
Future(InternalStatePtr<Result, Type> state) : state_(state) {}
149100

150-
decltype(state->listeners) listeners;
151-
listeners.swap(state->listeners);
101+
template <typename U, typename V>
102+
friend class Promise;
103+
};
152104

153-
lock.unlock();
105+
template <typename Result, typename Type>
106+
class Promise {
107+
public:
108+
Promise() : state_(std::make_shared<InternalState<Result, Type>>()) {}
154109

155-
for (auto& callback : listeners) {
156-
callback(result, DEFAULT_VALUE);
157-
}
110+
bool setValue(const Type &value) const { return state_->complete({}, value); }
158111

159-
state->condition.notify_all();
160-
return true;
161-
}
112+
bool setFailed(Result result) const { return state_->complete(result, {}); }
162113

163-
bool isComplete() const {
164-
InternalState<Result, Type>* state = state_.get();
165-
Lock lock(state->mutex);
166-
return state->complete;
167-
}
114+
bool isComplete() const { return state_->completed(); }
168115

169-
Future<Result, Type> getFuture() const { return Future<Result, Type>(state_); }
116+
Future<Result, Type> getFuture() const { return Future<Result, Type>{state_}; }
170117

171118
private:
172-
typedef std::function<void(Result, const Type&)> ListenerCallback;
173-
std::shared_ptr<InternalState<Result, Type> > state_;
119+
const InternalStatePtr<Result, Type> state_;
174120
};
175121

176-
class Void {};
177-
178-
} /* namespace pulsar */
122+
} // namespace pulsar
179123

180-
#endif /* LIB_FUTURE_H_ */
124+
#endif

lib/stats/ProducerStatsImpl.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ void ProducerStatsImpl::flushAndReset(const boost::system::error_code& ec) {
7171
return;
7272
}
7373

74-
Lock lock(mutex_);
74+
std::unique_lock<std::mutex> lock(mutex_);
7575
std::ostringstream oss;
7676
oss << *this;
7777
numMsgsSent_ = 0;
@@ -86,7 +86,7 @@ void ProducerStatsImpl::flushAndReset(const boost::system::error_code& ec) {
8686
}
8787

8888
void ProducerStatsImpl::messageSent(const Message& msg) {
89-
Lock lock(mutex_);
89+
std::lock_guard<std::mutex> lock(mutex_);
9090
numMsgsSent_++;
9191
totalMsgsSent_++;
9292
numBytesSent_ += msg.getLength();
@@ -96,7 +96,7 @@ void ProducerStatsImpl::messageSent(const Message& msg) {
9696
void ProducerStatsImpl::messageReceived(Result res, const boost::posix_time::ptime& publishTime) {
9797
boost::posix_time::ptime currentTime = boost::posix_time::microsec_clock::universal_time();
9898
double diffInMicros = (currentTime - publishTime).total_microseconds();
99-
Lock lock(mutex_);
99+
std::lock_guard<std::mutex> lock(mutex_);
100100
totalLatencyAccumulator_(diffInMicros);
101101
latencyAccumulator_(diffInMicros);
102102
sendMap_[res] += 1; // Value will automatically be initialized to 0 in the constructor

tests/BasicEndToEndTest.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ TEST(BasicEndToEndTest, testBatchMessages) {
191191
}
192192

193193
void resendMessage(Result r, const MessageId msgId, Producer producer) {
194-
Lock lock(mutex_);
194+
std::unique_lock<std::mutex> lock(mutex_);
195195
if (r != ResultOk) {
196196
LOG_DEBUG("globalResendMessageCount" << globalResendMessageCount);
197197
if (++globalResendMessageCount >= 3) {
@@ -993,7 +993,7 @@ TEST(BasicEndToEndTest, testResendViaSendCallback) {
993993
// 3 seconds
994994
std::this_thread::sleep_for(std::chrono::microseconds(3 * 1000 * 1000));
995995
producer.close();
996-
Lock lock(mutex_);
996+
std::lock_guard<std::mutex> lock(mutex_);
997997
ASSERT_GE(globalResendMessageCount, 3);
998998
}
999999

0 commit comments

Comments
 (0)