Skip to content

Commit 404e497

Browse files
committed
Supported TFuture in coroutines
1 parent ac206ef commit 404e497

File tree

9 files changed

+497
-50
lines changed

9 files changed

+497
-50
lines changed

library/cpp/threading/future/CMakeLists.txt

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,33 @@ target_sources(threading-future
99
async_semaphore.cpp
1010
async.cpp
1111
core/future.cpp
12+
core/fwd.cpp
13+
fwd.cpp
14+
wait/fwd.cpp
1215
wait/wait.cpp
16+
wait/wait_group.cpp
17+
wait/wait_policy.cpp
1318
)
1419

1520
_ydb_sdk_install_targets(TARGETS threading-future)
21+
22+
if (YDB_SDK_TESTS)
23+
add_ydb_test(NAME future-ut
24+
SOURCES
25+
future_mt_ut.cpp
26+
future_ut.cpp
27+
LINK_LIBRARIES
28+
threading-future
29+
LABELS
30+
unit
31+
)
32+
33+
add_ydb_test(NAME future-coroutine-ut GTEST
34+
SOURCES
35+
ut_gtest/coroutine_traits_ut.cpp
36+
LINK_LIBRARIES
37+
threading-future
38+
LABELS
39+
unit
40+
)
41+
endif()
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
#include <library/cpp/threading/future/future.h>
2+
#include <library/cpp/threading/future/core/coroutine_traits.h>
3+
4+
#include <benchmark/benchmark.h>
5+
6+
class TContext {
7+
public:
8+
TContext()
9+
: NextInputPromise_(NThreading::NewPromise<bool>())
10+
{}
11+
~TContext() {
12+
UpdateNextInput(false);
13+
}
14+
15+
NThreading::TFuture<bool> NextInput() {
16+
return NextInputPromise_.GetFuture();
17+
}
18+
19+
void UpdateNextInput(bool hasInput = true) {
20+
auto prevNextInputPromise = NextInputPromise_;
21+
NextInputPromise_ = NThreading::NewPromise<bool>();
22+
prevNextInputPromise.SetValue(hasInput);
23+
}
24+
25+
private:
26+
NThreading::TPromise<bool> NextInputPromise_;
27+
};
28+
29+
static void TestPureFutureChainSubscribe(benchmark::State& state) {
30+
TContext context;
31+
size_t cnt = 0;
32+
std::function<void(const NThreading::TFuture<bool>&)> processInput = [&context, &cnt, &processInput](const NThreading::TFuture<bool>& hasInput) {
33+
if (hasInput.GetValue()) {
34+
benchmark::DoNotOptimize(++cnt);
35+
context.NextInput().Subscribe(processInput);
36+
}
37+
};
38+
39+
processInput(NThreading::MakeFuture<bool>(true));
40+
for (auto _ : state) {
41+
context.UpdateNextInput();
42+
}
43+
context.UpdateNextInput(false);
44+
}
45+
46+
static void TestPureFutureChainApply(benchmark::State& state) {
47+
TContext context;
48+
size_t cnt = 0;
49+
std::function<void(const NThreading::TFuture<bool>&)> processInput = [&context, &cnt, &processInput](const NThreading::TFuture<bool>& hasInput) {
50+
if (hasInput.GetValue()) {
51+
benchmark::DoNotOptimize(++cnt);
52+
context.NextInput().Apply(processInput);
53+
}
54+
};
55+
56+
processInput(NThreading::MakeFuture<bool>(true));
57+
for (auto _ : state) {
58+
context.UpdateNextInput();
59+
}
60+
context.UpdateNextInput(false);
61+
}
62+
63+
static void TestCoroFutureChain(benchmark::State& state) {
64+
TContext context;
65+
size_t cnt = 0;
66+
auto coroutine = [&context, &cnt]() -> NThreading::TFuture<void> {
67+
while (co_await context.NextInput()) {
68+
benchmark::DoNotOptimize(++cnt);
69+
}
70+
};
71+
72+
auto coroutineFuture = coroutine();
73+
for (auto _ : state) {
74+
context.UpdateNextInput();
75+
}
76+
context.UpdateNextInput(false);
77+
coroutineFuture.GetValueSync();
78+
}
79+
80+
BENCHMARK(TestPureFutureChainSubscribe);
81+
BENCHMARK(TestPureFutureChainApply);
82+
BENCHMARK(TestCoroFutureChain);
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
#pragma once
2+
3+
#include <library/cpp/threading/future/future.h>
4+
5+
#include <coroutine>
6+
7+
template <typename... Args>
8+
struct std::coroutine_traits<NThreading::TFuture<void>, Args...> {
9+
struct promise_type {
10+
11+
NThreading::TFuture<void> get_return_object() {
12+
return Promise_.GetFuture();
13+
}
14+
15+
std::suspend_never initial_suspend() { return {}; }
16+
std::suspend_never final_suspend() noexcept { return {}; }
17+
18+
void unhandled_exception() {
19+
Promise_.SetException(std::current_exception());
20+
}
21+
22+
void return_void() {
23+
Promise_.SetValue();
24+
}
25+
26+
private:
27+
NThreading::TPromise<void> Promise_ = NThreading::NewPromise();
28+
};
29+
};
30+
31+
template <typename T, typename... Args>
32+
struct std::coroutine_traits<NThreading::TFuture<T>, Args...> {
33+
struct promise_type {
34+
NThreading::TFuture<T> get_return_object() {
35+
return Promise_.GetFuture();
36+
}
37+
38+
std::suspend_never initial_suspend() { return {}; }
39+
std::suspend_never final_suspend() noexcept { return {}; }
40+
41+
void unhandled_exception() {
42+
Promise_.SetException(std::current_exception());
43+
}
44+
45+
void return_value(auto&& val) {
46+
Promise_.SetValue(std::forward<decltype(val)>(val));
47+
}
48+
49+
private:
50+
NThreading::TPromise<T> Promise_ = NThreading::NewPromise<T>();
51+
};
52+
};
53+
54+
namespace NThreading {
55+
56+
template <typename T, bool Extracting = false>
57+
struct TFutureAwaitable {
58+
NThreading::TFuture<T> Future;
59+
60+
TFutureAwaitable(const NThreading::TFuture<T>& future) noexcept requires (!Extracting)
61+
: Future{future}
62+
{
63+
}
64+
65+
TFutureAwaitable(NThreading::TFuture<T>&& future) noexcept
66+
: Future{std::move(future)}
67+
{
68+
}
69+
70+
bool await_ready() const noexcept {
71+
return Future.IsReady();
72+
}
73+
74+
void await_suspend(auto h) noexcept {
75+
/*
76+
* This library assumes that resume never throws an exception.
77+
* This assumption is made due to the fact that the users of these library in most cases do not need to write their own coroutine handlers,
78+
* and all coroutine handlers provided by the library do not throw exception from resume.
79+
*
80+
* WARNING: do not change subscribe to apply or something other here, creating an extra future state degrades performance.
81+
*/
82+
Future.NoexceptSubscribe(
83+
[h](auto) mutable noexcept {
84+
h();
85+
}
86+
);
87+
}
88+
89+
decltype(auto) await_resume() {
90+
if constexpr (Extracting && !std::is_same_v<T, void>) { // Future<void> has only GetValue()
91+
return Future.ExtractValue();
92+
} else {
93+
return Future.GetValue();
94+
}
95+
}
96+
};
97+
98+
template <typename T>
99+
using TExtractingFutureAwaitable = TFutureAwaitable<T, true>;
100+
101+
} // namespace NThreading
102+
103+
template <typename T>
104+
auto operator co_await(const NThreading::TFuture<T>& future) noexcept {
105+
return NThreading::TFutureAwaitable{future};
106+
}
107+
108+
template <typename T>
109+
auto operator co_await(NThreading::TFuture<T>&& future) noexcept {
110+
// Not TExtractongFutureAwaitable, because TFuture works like std::shared_future.
111+
// auto value = co_await GetCachedFuture();
112+
// If GetCachedFuture stores a future in some cache and returns its copies,
113+
// then subsequent uses of co_await will return a moved-from value.
114+
return NThreading::TFutureAwaitable{std::move(future)};
115+
}
116+
117+
namespace NThreading {
118+
119+
template <typename T>
120+
auto AsAwaitable(const NThreading::TFuture<T>& fut) noexcept {
121+
return TFutureAwaitable(fut);
122+
}
123+
124+
template <typename T>
125+
auto AsExtractingAwaitable(NThreading::TFuture<T>&& fut) noexcept {
126+
return TExtractingFutureAwaitable<T>(std::move(fut));
127+
}
128+
129+
} // namespace NThreading

library/cpp/threading/future/core/future-inl.h

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#if !defined(INCLUDE_FUTURE_INL_H)
44
#error "you should never include future-inl.h directly"
5+
#include "future.h" // Fix LSP
56
#endif // INCLUDE_FUTURE_INL_H
67

78
namespace NThreading {
@@ -116,6 +117,9 @@ namespace NThreading {
116117
bool HasException() const {
117118
return AtomicGet(State) == ExceptionSet;
118119
}
120+
bool IsReady() const {
121+
return AtomicGet(State) != NotReady;
122+
}
119123

120124
const T& GetValue(TDuration timeout = TDuration::Zero()) const {
121125
AccessValue(timeout, ValueRead);
@@ -297,6 +301,9 @@ namespace NThreading {
297301
bool HasException() const {
298302
return AtomicGet(State) == ExceptionSet;
299303
}
304+
bool IsReady() const {
305+
return AtomicGet(State) != NotReady;
306+
}
300307

301308
void GetValue(TDuration timeout = TDuration::Zero()) const {
302309
TAtomicBase state = AtomicGet(State);
@@ -583,6 +590,10 @@ namespace NThreading {
583590
inline bool TFuture<T>::HasException() const {
584591
return State && State->HasException();
585592
}
593+
template <typename T>
594+
inline bool TFuture<T>::IsReady() const {
595+
return State && State->IsReady();
596+
}
586597

587598
template <typename T>
588599
inline void TFuture<T>::Wait() const {
@@ -688,6 +699,9 @@ namespace NThreading {
688699
inline bool TFuture<void>::HasException() const {
689700
return State && State->HasException();
690701
}
702+
inline bool TFuture<void>::IsReady() const {
703+
return State && State->IsReady();
704+
}
691705

692706
inline void TFuture<void>::Wait() const {
693707
EnsureInitialized();
@@ -823,6 +837,11 @@ namespace NThreading {
823837
return State && State->HasException();
824838
}
825839

840+
template <typename T>
841+
inline bool TPromise<T>::IsReady() const {
842+
return State && State->IsReady();
843+
}
844+
826845
template <typename T>
827846
inline void TPromise<T>::SetException(const TString& e) {
828847
EnsureInitialized();
@@ -904,6 +923,10 @@ namespace NThreading {
904923
return State && State->HasException();
905924
}
906925

926+
inline bool TPromise<void>::IsReady() const {
927+
return State && State->IsReady();
928+
}
929+
907930
inline void TPromise<void>::SetException(const TString& e) {
908931
EnsureInitialized();
909932
State->SetException(std::make_exception_ptr(yexception() << e));

library/cpp/threading/future/core/future.h

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,12 @@ namespace NThreading {
9898
void TryRethrow() const;
9999
bool HasException() const;
100100

101+
// returns true if exception or value was set.
102+
// allows to check readiness without locking cheker-thread
103+
// NOTE: returns true even if value was extracted from promise
104+
// good replace for HasValue() || HasException()
105+
bool IsReady() const;
106+
101107
void Wait() const;
102108
bool Wait(TDuration timeout) const;
103109
bool Wait(TInstant deadline) const;
@@ -153,6 +159,11 @@ namespace NThreading {
153159
void TryRethrow() const;
154160
bool HasException() const;
155161

162+
// returns true if exception or value was set.
163+
// allows to check readiness without locking cheker-thread
164+
// good replace for HasValue() || HasException()
165+
bool IsReady() const;
166+
156167
void Wait() const;
157168
bool Wait(TDuration timeout) const;
158169
bool Wait(TInstant deadline) const;
@@ -216,6 +227,12 @@ namespace NThreading {
216227

217228
void TryRethrow() const;
218229
bool HasException() const;
230+
231+
// returns true if exception or value was set.
232+
// allows to check readiness without locking cheker-thread
233+
// NOTE: returns true even if value was extracted from promise
234+
// good replace for HasValue() || HasException()
235+
bool IsReady() const;
219236
void SetException(const TString& e);
220237
void SetException(std::exception_ptr e);
221238
bool TrySetException(std::exception_ptr e);
@@ -256,6 +273,11 @@ namespace NThreading {
256273

257274
void TryRethrow() const;
258275
bool HasException() const;
276+
277+
// returns true if exception or value was set.
278+
// allows to check readiness without locking cheker-thread
279+
// good replace for HasValue() || HasException()
280+
bool IsReady() const;
259281
void SetException(const TString& e);
260282
void SetException(std::exception_ptr e);
261283
bool TrySetException(std::exception_ptr e);

library/cpp/threading/future/future.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
#pragma once
22

3+
// IWYU pragma: begin_exports
34
#include "core/future.h"
45
#include "wait/wait.h"
6+
// IWYU pragma: end_exports

0 commit comments

Comments
 (0)