Skip to content

Update library/cpp/threading/future #511

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions library/cpp/testing/unittest/registar.h
Original file line number Diff line number Diff line change
Expand Up @@ -371,12 +371,12 @@ public: \

#define UNIT_FAIL_IMPL(R, M) \
do { \
::NUnitTest::NPrivate::RaiseError(R, ::TStringBuilder() << R << " at " << __LOCATION__ << ", " << __PRETTY_FUNCTION__ << ": " << M, true); \
::NUnitTest::NPrivate::RaiseError(R, ::TStringBuilder() << R << " at " << __LOCATION__ << ", " << std::string{__PRETTY_FUNCTION__} << ": " << M, true); \
} while (false)

#define UNIT_FAIL_NONFATAL_IMPL(R, M) \
do { \
::NUnitTest::NPrivate::RaiseError(R, ::TStringBuilder() << R << " at " << __LOCATION__ << ", " << __PRETTY_FUNCTION__ << ": " << M, false); \
::NUnitTest::NPrivate::RaiseError(R, ::TStringBuilder() << R << " at " << __LOCATION__ << ", " << std::string{__PRETTY_FUNCTION__} << ": " << M, false); \
} while (false)

#define UNIT_FAIL(M) UNIT_FAIL_IMPL("forced failure", M)
Expand Down
35 changes: 35 additions & 0 deletions library/cpp/threading/future/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,42 @@ target_sources(threading-future
async_semaphore.cpp
async.cpp
core/future.cpp
core/fwd.cpp
fwd.cpp
wait/fwd.cpp
wait/wait.cpp
wait/wait_group.cpp
wait/wait_policy.cpp
)

_ydb_sdk_install_targets(TARGETS threading-future)

if (YDB_SDK_TESTS)
add_ydb_test(NAME future-ut
SOURCES
future_mt_ut.cpp
future_ut.cpp
LINK_LIBRARIES
yutil
threading-future
TEST_ARG
--filter-file filter.txt
LABELS
unit
)

file(WRITE ${CMAKE_CURRENT_BINARY_DIR}/filter.txt
"-TFutureMultiThreadedTest::WaitAny\n"
"-TFutureMultiThreadedTest::WaitExceptionOrAll"
)

add_ydb_test(NAME future-coroutine-ut GTEST
SOURCES
ut_gtest/coroutine_traits_ut.cpp
LINK_LIBRARIES
yutil
threading-future
LABELS
unit
)
endif()
82 changes: 82 additions & 0 deletions library/cpp/threading/future/benchmark/coroutine_traits.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#include <library/cpp/threading/future/future.h>
#include <library/cpp/threading/future/core/coroutine_traits.h>

#include <benchmark/benchmark.h>

class TContext {
public:
TContext()
: NextInputPromise_(NThreading::NewPromise<bool>())
{}
~TContext() {
UpdateNextInput(false);
}

NThreading::TFuture<bool> NextInput() {
return NextInputPromise_.GetFuture();
}

void UpdateNextInput(bool hasInput = true) {
auto prevNextInputPromise = NextInputPromise_;
NextInputPromise_ = NThreading::NewPromise<bool>();
prevNextInputPromise.SetValue(hasInput);
}

private:
NThreading::TPromise<bool> NextInputPromise_;
};

static void TestPureFutureChainSubscribe(benchmark::State& state) {
TContext context;
size_t cnt = 0;
std::function<void(const NThreading::TFuture<bool>&)> processInput = [&context, &cnt, &processInput](const NThreading::TFuture<bool>& hasInput) {
if (hasInput.GetValue()) {
benchmark::DoNotOptimize(++cnt);
context.NextInput().Subscribe(processInput);
}
};

processInput(NThreading::MakeFuture<bool>(true));
for (auto _ : state) {
context.UpdateNextInput();
}
context.UpdateNextInput(false);
}

static void TestPureFutureChainApply(benchmark::State& state) {
TContext context;
size_t cnt = 0;
std::function<void(const NThreading::TFuture<bool>&)> processInput = [&context, &cnt, &processInput](const NThreading::TFuture<bool>& hasInput) {
if (hasInput.GetValue()) {
benchmark::DoNotOptimize(++cnt);
context.NextInput().Apply(processInput);
}
};

processInput(NThreading::MakeFuture<bool>(true));
for (auto _ : state) {
context.UpdateNextInput();
}
context.UpdateNextInput(false);
}

static void TestCoroFutureChain(benchmark::State& state) {
TContext context;
size_t cnt = 0;
auto coroutine = [&context, &cnt]() -> NThreading::TFuture<void> {
while (co_await context.NextInput()) {
benchmark::DoNotOptimize(++cnt);
}
};

auto coroutineFuture = coroutine();
for (auto _ : state) {
context.UpdateNextInput();
}
context.UpdateNextInput(false);
coroutineFuture.GetValueSync();
}

BENCHMARK(TestPureFutureChainSubscribe);
BENCHMARK(TestPureFutureChainApply);
BENCHMARK(TestCoroFutureChain);
129 changes: 129 additions & 0 deletions library/cpp/threading/future/core/coroutine_traits.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
#pragma once

#include <library/cpp/threading/future/future.h>

#include <coroutine>

template <typename... Args>
struct std::coroutine_traits<NThreading::TFuture<void>, Args...> {
struct promise_type {

NThreading::TFuture<void> get_return_object() {
return Promise_.GetFuture();
}

std::suspend_never initial_suspend() { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }

void unhandled_exception() {
Promise_.SetException(std::current_exception());
}

void return_void() {
Promise_.SetValue();
}

private:
NThreading::TPromise<void> Promise_ = NThreading::NewPromise();
};
};

template <typename T, typename... Args>
struct std::coroutine_traits<NThreading::TFuture<T>, Args...> {
struct promise_type {
NThreading::TFuture<T> get_return_object() {
return Promise_.GetFuture();
}

std::suspend_never initial_suspend() { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }

void unhandled_exception() {
Promise_.SetException(std::current_exception());
}

void return_value(auto&& val) {
Promise_.SetValue(std::forward<decltype(val)>(val));
}

private:
NThreading::TPromise<T> Promise_ = NThreading::NewPromise<T>();
};
};

namespace NThreading {

template <typename T, bool Extracting = false>
struct TFutureAwaitable {
NThreading::TFuture<T> Future;

TFutureAwaitable(const NThreading::TFuture<T>& future) noexcept requires (!Extracting)
: Future{future}
{
}

TFutureAwaitable(NThreading::TFuture<T>&& future) noexcept
: Future{std::move(future)}
{
}

bool await_ready() const noexcept {
return Future.IsReady();
}

void await_suspend(auto h) noexcept {
/*
* This library assumes that resume never throws an exception.
* This assumption is made due to the fact that the users of these library in most cases do not need to write their own coroutine handlers,
* and all coroutine handlers provided by the library do not throw exception from resume.
*
* WARNING: do not change subscribe to apply or something other here, creating an extra future state degrades performance.
*/
Future.NoexceptSubscribe(
[h](auto) mutable noexcept {
h();
}
);
}

decltype(auto) await_resume() {
if constexpr (Extracting && !std::is_same_v<T, void>) { // Future<void> has only GetValue()
return Future.ExtractValue();
} else {
return Future.GetValue();
}
}
};

template <typename T>
using TExtractingFutureAwaitable = TFutureAwaitable<T, true>;

} // namespace NThreading

template <typename T>
auto operator co_await(const NThreading::TFuture<T>& future) noexcept {
return NThreading::TFutureAwaitable{future};
}

template <typename T>
auto operator co_await(NThreading::TFuture<T>&& future) noexcept {
// Not TExtractongFutureAwaitable, because TFuture works like std::shared_future.
// auto value = co_await GetCachedFuture();
// If GetCachedFuture stores a future in some cache and returns its copies,
// then subsequent uses of co_await will return a moved-from value.
return NThreading::TFutureAwaitable{std::move(future)};
}

namespace NThreading {

template <typename T>
auto AsAwaitable(const NThreading::TFuture<T>& fut) noexcept {
return TFutureAwaitable(fut);
}

template <typename T>
auto AsExtractingAwaitable(NThreading::TFuture<T>&& fut) noexcept {
return TExtractingFutureAwaitable<T>(std::move(fut));
}

} // namespace NThreading
23 changes: 23 additions & 0 deletions library/cpp/threading/future/core/future-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#if !defined(INCLUDE_FUTURE_INL_H)
#error "you should never include future-inl.h directly"
#include "future.h" // Fix LSP
#endif // INCLUDE_FUTURE_INL_H

namespace NThreading {
Expand Down Expand Up @@ -116,6 +117,9 @@ namespace NThreading {
bool HasException() const {
return AtomicGet(State) == ExceptionSet;
}
bool IsReady() const {
return AtomicGet(State) != NotReady;
}

const T& GetValue(TDuration timeout = TDuration::Zero()) const {
AccessValue(timeout, ValueRead);
Expand Down Expand Up @@ -297,6 +301,9 @@ namespace NThreading {
bool HasException() const {
return AtomicGet(State) == ExceptionSet;
}
bool IsReady() const {
return AtomicGet(State) != NotReady;
}

void GetValue(TDuration timeout = TDuration::Zero()) const {
TAtomicBase state = AtomicGet(State);
Expand Down Expand Up @@ -583,6 +590,10 @@ namespace NThreading {
inline bool TFuture<T>::HasException() const {
return State && State->HasException();
}
template <typename T>
inline bool TFuture<T>::IsReady() const {
return State && State->IsReady();
}

template <typename T>
inline void TFuture<T>::Wait() const {
Expand Down Expand Up @@ -688,6 +699,9 @@ namespace NThreading {
inline bool TFuture<void>::HasException() const {
return State && State->HasException();
}
inline bool TFuture<void>::IsReady() const {
return State && State->IsReady();
}

inline void TFuture<void>::Wait() const {
EnsureInitialized();
Expand Down Expand Up @@ -823,6 +837,11 @@ namespace NThreading {
return State && State->HasException();
}

template <typename T>
inline bool TPromise<T>::IsReady() const {
return State && State->IsReady();
}

template <typename T>
inline void TPromise<T>::SetException(const TString& e) {
EnsureInitialized();
Expand Down Expand Up @@ -904,6 +923,10 @@ namespace NThreading {
return State && State->HasException();
}

inline bool TPromise<void>::IsReady() const {
return State && State->IsReady();
}

inline void TPromise<void>::SetException(const TString& e) {
EnsureInitialized();
State->SetException(std::make_exception_ptr(yexception() << e));
Expand Down
22 changes: 22 additions & 0 deletions library/cpp/threading/future/core/future.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ namespace NThreading {
void TryRethrow() const;
bool HasException() const;

// returns true if exception or value was set.
// allows to check readiness without locking cheker-thread
// NOTE: returns true even if value was extracted from promise
// good replace for HasValue() || HasException()
bool IsReady() const;

void Wait() const;
bool Wait(TDuration timeout) const;
bool Wait(TInstant deadline) const;
Expand Down Expand Up @@ -153,6 +159,11 @@ namespace NThreading {
void TryRethrow() const;
bool HasException() const;

// returns true if exception or value was set.
// allows to check readiness without locking cheker-thread
// good replace for HasValue() || HasException()
bool IsReady() const;

void Wait() const;
bool Wait(TDuration timeout) const;
bool Wait(TInstant deadline) const;
Expand Down Expand Up @@ -216,6 +227,12 @@ namespace NThreading {

void TryRethrow() const;
bool HasException() const;

// returns true if exception or value was set.
// allows to check readiness without locking cheker-thread
// NOTE: returns true even if value was extracted from promise
// good replace for HasValue() || HasException()
bool IsReady() const;
void SetException(const TString& e);
void SetException(std::exception_ptr e);
bool TrySetException(std::exception_ptr e);
Expand Down Expand Up @@ -256,6 +273,11 @@ namespace NThreading {

void TryRethrow() const;
bool HasException() const;

// returns true if exception or value was set.
// allows to check readiness without locking cheker-thread
// good replace for HasValue() || HasException()
bool IsReady() const;
void SetException(const TString& e);
void SetException(std::exception_ptr e);
bool TrySetException(std::exception_ptr e);
Expand Down
2 changes: 2 additions & 0 deletions library/cpp/threading/future/future.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#pragma once

// IWYU pragma: begin_exports
#include "core/future.h"
#include "wait/wait.h"
// IWYU pragma: end_exports
Loading
Loading