upload co_context

This commit is contained in:
ouczb 2024-01-29 12:16:21 +08:00
parent 3e95395cb2
commit 1688905f7f
19 changed files with 888 additions and 30 deletions

View File

@ -0,0 +1,15 @@
#include <condition_variable>
#include <mutex>
namespace co_context::detail {
struct io_context_meta_type {
std::mutex mtx;
std::condition_variable cv;
ctx_id_t create_count; // Do not initialize this
ctx_id_t ready_count; // Do not initialize this
};
inline io_context_meta_type io_context_meta;
} // namespace co_context::detail

View File

@ -0,0 +1,29 @@
#include <cstdint>
#include <span>
#include "task_info.h"
namespace co_context::detail {
class lazy_awaiter {
protected:
task_info io_info;
public:
lazy_awaiter() noexcept {
this_thread.worker->forward_lazy();
}
[[nodiscard]]
int32_t result() const noexcept {
return io_info.result;
}
static constexpr bool await_ready() noexcept { return false; }
int32_t await_resume() const noexcept {
return result();
}
void await_suspend(std::coroutine_handle<> current) noexcept {
io_info.handle = current;
}
};
struct lazy_write : lazy_awaiter {
inline
lazy_write(int fd, std::span<const char> buf, uint64_t offset) noexcept {}
};
}

View File

@ -0,0 +1,205 @@
#pragma once
#include "type.h"
namespace co_context {
template<
std::unsigned_integral T,
T capacity,
safety is_thread_safe = safety::safe,
bool is_blocking = is_thread_safe>
struct spsc_cursor {
static_assert(std::has_single_bit(capacity));
static_assert(!is_thread_safe || std::atomic<T>::is_always_lock_free);
static_assert(
is_thread_safe || !is_blocking,
"a thread-unsafe instance "
"can not be blocking"
);
inline static constexpr T mask = capacity - 1;
T m_head = 0;
T m_tail = 0;
[[nodiscard]]
inline T head() const noexcept {
return m_head & mask;
}
[[nodiscard]]
inline T tail() const noexcept {
return m_tail & mask;
}
[[nodiscard]]
inline T raw_head() const noexcept {
return m_head;
}
[[nodiscard]]
inline T raw_tail() const noexcept {
return m_tail;
}
[[nodiscard]]
inline bool is_empty() const noexcept {
return m_head == m_tail;
}
[[nodiscard]]
inline T size() const noexcept {
return m_tail - m_head;
}
[[nodiscard]]
inline T available_number() const noexcept {
return capacity - (m_tail - m_head);
}
[[nodiscard]]
inline bool is_available() const noexcept {
return bool(capacity - (m_tail - m_head));
}
[[nodiscard]]
inline T load_head() const noexcept {
if constexpr (is_thread_safe) {
return as_c_atomic(m_head).load(std::memory_order_acquire) & mask;
}
else {
return head();
}
}
[[nodiscard]]
inline T load_tail() const noexcept {
if constexpr (is_thread_safe) {
return as_c_atomic(m_tail).load(std::memory_order_acquire) & mask;
}
else {
return tail();
}
}
[[nodiscard]]
inline T load_raw_head() const noexcept {
if constexpr (is_thread_safe) {
return as_c_atomic(m_head).load(std::memory_order_acquire);
}
else {
return raw_head();
}
}
[[nodiscard]]
inline T load_raw_tail() const noexcept {
if constexpr (is_thread_safe) {
return as_c_atomic(m_tail).load(std::memory_order_acquire);
}
else {
return raw_tail();
}
}
[[nodiscard]]
inline T load_raw_tail_relaxed() const noexcept {
if constexpr (is_thread_safe) {
return as_c_atomic(m_tail).load(std::memory_order_relaxed);
}
else {
return raw_tail();
}
}
// inline void set_raw_tail(T tail) const noexcept { m_tail = tail; }
inline void store_raw_tail(T tail) noexcept {
if constexpr (is_thread_safe) {
as_atomic(m_tail).store(tail, std::memory_order_release);
}
else {
m_tail = tail;
}
}
[[nodiscard]]
inline bool is_empty_load_head() const noexcept {
return m_tail == load_raw_head();
}
[[nodiscard]]
inline bool is_empty_load_tail() const noexcept {
return m_head == load_raw_tail();
}
[[nodiscard]]
inline bool is_empty_load_tail_relaxed() const noexcept {
return m_head == load_raw_tail_relaxed();
}
[[nodiscard]]
inline bool is_available_load_head() const noexcept {
return bool(capacity - (m_tail - load_raw_head()));
}
inline void wait_for_available() const noexcept {
const T head_full = m_tail - capacity;
if constexpr (!is_thread_safe) {
assert(head_full != load_raw_head());
return;
}
if constexpr (is_blocking) {
as_c_atomic(m_head).wait(head_full, std::memory_order_acquire);
}
else {
while (head_full == load_raw_head()) {}
}
}
inline void wait_for_not_empty() const noexcept {
const T tail_empty = m_head;
if constexpr (!is_thread_safe) {
assert(tail_empty != load_raw_tail());
return;
}
if constexpr (is_blocking) {
as_c_atomic(m_tail).wait(tail_empty, std::memory_order_acquire);
}
else {
while (tail_empty == load_raw_tail()) {}
}
}
inline void push(T num = 1) noexcept {
if constexpr (is_thread_safe) {
as_atomic(m_tail).store(m_tail + num, std::memory_order_release);
}
else {
m_tail += num;
}
}
inline void pop(T num = 1) noexcept {
if constexpr (is_thread_safe) {
as_atomic(m_head).store(m_head + num, std::memory_order_release);
}
else {
m_head += num;
}
}
inline void push_notify(T num = 1) noexcept {
push(num);
if constexpr (is_thread_safe && is_blocking) {
as_atomic(m_tail).notify_one();
}
}
inline void pop_notify(T num = 1) noexcept {
pop(num);
if constexpr (is_thread_safe && is_blocking) {
as_atomic(m_head).notify_one();
}
}
};
} // namespace co_context

View File

@ -0,0 +1,13 @@
namespace co_context::detail {
struct [[nodiscard]] task_info {
std::coroutine_handle<> handle;
int32_t result;
[[nodiscard]]
uint64_t as_user_data() const noexcept {
return static_cast<uint64_t>(reinterpret_cast<uintptr_t>(this));
}
};
}

View File

@ -0,0 +1,221 @@
#include <cassert>
#include <concepts>
#include <coroutine>
#include <exception>
#include <memory>
namespace co_context {
template<typename T>
class task;
namespace detail {
template<typename T>
class task_promise_base;
/**
* @brief When current task<> finishes, resume its parent.
*/
template<typename T>
struct task_final_awaiter {
static constexpr bool await_ready() noexcept { return false; }
template<std::derived_from<task_promise_base<T>> Promise>
std::coroutine_handle<>
await_suspend(std::coroutine_handle<Promise> current) noexcept {
return current.promise().parent_coro;
}
// Won't be resumed anyway
constexpr void await_resume() const noexcept {}
};
/**
* @brief When current task<> finishes, resume its parent.
*/
template<>
struct task_final_awaiter<void> {
static constexpr bool await_ready() noexcept { return false; }
template<std::derived_from<task_promise_base<void>> Promise>
std::coroutine_handle<>
await_suspend(std::coroutine_handle<Promise> current) noexcept {
auto& promise = current.promise();
std::coroutine_handle<> continuation = promise.parent_coro;
if (promise.is_detached_flag == Promise::is_detached) {
current.destroy();
}
return continuation;
}
// Won't be resumed anyway
constexpr void await_resume() const noexcept {}
};
/**
* @brief Define the behavior of all tasks.
*
* final_suspend: yes, and return to parent
*/
template<typename T>
class task_promise_base {
friend struct task_final_awaiter<T>;
public:
task_promise_base() noexcept = default;
inline constexpr std::suspend_always initial_suspend() noexcept {
return {};
}
inline constexpr task_final_awaiter<T> final_suspend() noexcept {
return {};
}
inline void set_parent(std::coroutine_handle<> continuation) noexcept {
parent_coro = continuation;
}
task_promise_base(const task_promise_base&) = delete;
task_promise_base(task_promise_base&&) = delete;
task_promise_base& operator=(const task_promise_base&) = delete;
task_promise_base& operator=(task_promise_base&&) = delete;
private:
std::coroutine_handle<> parent_coro{ std::noop_coroutine() };
};
/**
* @brief task<> with a return value
*
* @tparam T the type of the final result
*/
template<typename T>
class task_promise final : public task_promise_base<T> {
public:
task_promise() noexcept : state(value_state::mono) {};
~task_promise() {
switch (state) {
[[likely]] case value_state::value:
value.~T();
break;
case value_state::exception:
exception_ptr.~exception_ptr();
break;
default: break;
}
};
task<T> get_return_object() noexcept;
void unhandled_exception() noexcept {
exception_ptr = std::current_exception();
state = value_state::exception;
}
template<typename Value>
requires std::convertible_to<Value&&, T>
void return_value(Value&& result
) noexcept(std::is_nothrow_constructible_v<T, Value&&>) {
std::construct_at(
std::addressof(value), std::forward<Value>(result)
);
state = value_state::value;
}
// get the lvalue ref
T& result()& {
if (state == value_state::exception) [[unlikely]] {
std::rethrow_exception(exception_ptr);
}
assert(state == value_state::value);
return value;
}
// get the prvalue
T&& result()&& {
if (state == value_state::exception) [[unlikely]] {
std::rethrow_exception(exception_ptr);
}
assert(state == value_state::value);
return std::move(value);
}
private:
union {
T value;
std::exception_ptr exception_ptr;
};
enum class value_state : uint8_t { mono, value, exception } state;
};
template<>
class task_promise<void> final : public task_promise_base<void> {
friend struct task_final_awaiter<void>;
friend class task<void>;
public:
task_promise() noexcept : is_detached_flag(0) {};
~task_promise() noexcept {
if (is_detached_flag != is_detached) {
exception_ptr.~exception_ptr();
}
}
task<void> get_return_object() noexcept;
constexpr void return_void() noexcept {}
void unhandled_exception() {
if (is_detached_flag == is_detached) {
std::rethrow_exception(std::current_exception());
}
else {
exception_ptr = std::current_exception();
}
}
void result() const {
if (this->exception_ptr) [[unlikely]] {
std::rethrow_exception(this->exception_ptr);
}
}
private:
inline static constexpr uintptr_t is_detached = -1ULL;
union {
uintptr_t is_detached_flag; // set to `is_detached` if is detached.
std::exception_ptr exception_ptr;
};
}; // namespace co_context
template<typename T>
class task_promise<T&> final : public task_promise_base<T&> {
public:
task_promise() noexcept = default;
task<T&> get_return_object() noexcept;
void unhandled_exception() noexcept {
this->exception_ptr = std::current_exception();
}
void return_value(T& result) noexcept {
value = std::addressof(result);
}
T& result() {
if (exception_ptr) [[unlikely]] {
std::rethrow_exception(exception_ptr);
}
return *value;
}
private:
T* value = nullptr;
std::exception_ptr exception_ptr;
};
}
}

View File

@ -0,0 +1,16 @@
#pragma once
#include "type.h"
namespace co_context {
class io_context;
namespace detail {
struct worker_meta;
struct thread_meta {
// The running io_context on this thread.
io_context *ctx = nullptr;
worker_meta *worker = nullptr; // ctx + offset = worker
ctx_id_t ctx_id = static_cast<ctx_id_t>(-1);
};
extern thread_local thread_meta this_thread; // NOLINT(*global-variables)
} // namespace co_context::detail
}

View File

@ -0,0 +1,18 @@
#pragma once
#define CO_CONTEXT_AWAIT_HINT nodiscard("Did you forget to co_await?")
#if defined(__GNUG__)
#define CO_CONTEXT_NOINLINE gnu::noinline
#elif defined(__clang__)
#define CO_CONTEXT_NOINLINE clang::noinline
#else
#define CO_CONTEXT_NOINLINE gnu::noinline, clang::noinline, msvc::noinline
#endif
namespace co_context {
enum safety : bool { unsafe = false, safe = true };
using cur_t = uint32_t;
using ctx_id_t = uint8_t;
inline constexpr cur_t swap_capacity = 16384;
} // namespace co_context

View File

@ -0,0 +1,50 @@
#include <array>
#include <coroutine>
#include <mutex>
#include "thread_meta.h"
#include "spsc_cursor.h"
namespace co_context::detail {
struct worker_meta final {
// number of I/O tasks running inside io_uring
int32_t requests_to_reap = 0;
// if there is at least one entry to submit to io_uring
uint32_t requests_to_submit = 0;
std::array<std::coroutine_handle<>, swap_capacity> reap_swap;
spsc_cursor<cur_t, swap_capacity, unsafe> reap_cur;
void init();
void deinit();
void forward_lazy() noexcept;
void forward_task(std::coroutine_handle<> handle) noexcept;
[[nodiscard]]
cur_t number_to_schedule() const noexcept {
const auto& cur = this->reap_cur;
return cur.size();
}
// if there is at least one task newly spawned or forwarded
[[nodiscard]]
bool has_task_ready() const noexcept {
return !reap_cur.is_empty();
}
// Get a coroutine to run. Guarantee to be non-null.
[[nodiscard]]
std::coroutine_handle<> schedule() noexcept;
void work_once();
/**
* @brief poll the submission swap zone
*/
void poll_submission() noexcept;
/**
* @brief poll the uring completion queue
*
* @return number of cqes handled by worker
*/
uint32_t poll_completion() noexcept;
};
}

View File

@ -1,24 +1,29 @@
/*
* A coroutine framework aimed at high-concurrency io with reasonable latency,
* based on liburingcxx.
*
* Copyright 2022 Zifeng Deng
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <thread>
#include "task.h"
#include "detail/worker_meta.h"
namespace co_context {
class [[nodiscard]] io_context final {
private:
friend struct detail::worker_meta;
using worker_meta = detail::worker_meta;
private:
std::thread host_thread;
worker_meta worker;
// should io_context stop
bool will_stop = false;
ctx_id_t id;
public:
explicit io_context() noexcept;
void init();
void deinit();
void start();
void run();
void co_spawn(task<void>&& entrance) noexcept;
public:
void do_submission_part() noexcept;
void do_completion_part() noexcept;
void do_worker_part();
};
} // namespace co_context

View File

@ -0,0 +1,12 @@
#pragma once
#include "detail/type.h"
#include "detail/lazy_io_awaiter.h"
namespace co_context {
inline namespace lazy {
inline detail::lazy_write
write(int fd, std::span<const char> buf, uint64_t offset) noexcept {
return detail::lazy_write{fd, buf , offset};
}
}
}

View File

@ -0,0 +1,45 @@
#pragma once
#include "detail/type.h"
#include "detail/task_promise.h"
namespace co_context {
#define CO_CONTEXT_AWAIT_HINT nodiscard("Did you forget to co_await?")
template<typename T = void>
class [[CO_CONTEXT_AWAIT_HINT]] task {
public:
using promise_type = detail::task_promise<T>;
private:
std::coroutine_handle<promise_type> handle;
public:
explicit task(std::coroutine_handle<promise_type> current) noexcept
: handle(current) {}
std::coroutine_handle<promise_type> get_handle() noexcept { return handle; }
void detach() noexcept {
if constexpr (std::is_void_v<T>) {
handle.promise().is_detached_flag = promise_type::is_detached;
}
handle = nullptr;
}
};
namespace detail {
template<typename T>
inline task<T> task_promise<T>::get_return_object() noexcept {
return task<T>{
std::coroutine_handle<task_promise>::from_promise(*this)
};
}
inline task<void> task_promise<void>::get_return_object() noexcept {
return task<void>{
std::coroutine_handle<task_promise>::from_promise(*this)
};
}
template<typename T>
inline task<T&> task_promise<T&>::get_return_object() noexcept {
return task<T&>{
std::coroutine_handle<task_promise>::from_promise(*this)
};
}
} // namespace detail
} // namespace co_context

View File

@ -0,0 +1,54 @@
#include "co_context/detail/worker_meta.h"
namespace co_context::detail {
thread_local thread_meta this_thread; // NOLINT(*global-variables)
void worker_meta::init()
{
this_thread.worker = this;
}
void worker_meta::deinit()
{
this_thread.worker = nullptr;
}
void worker_meta::forward_lazy() noexcept
{
++requests_to_reap; // NOTE may required no reap or required multi-reap.
++requests_to_submit;
}
void worker_meta::forward_task(std::coroutine_handle<> handle) noexcept
{
auto& cur = reap_cur;
reap_swap[cur.tail()] = handle;
cur.push();
}
std::coroutine_handle<> worker_meta::schedule() noexcept
{
auto& cur = this->reap_cur;
std::coroutine_handle<> chosen_coro = reap_swap[cur.head()];
cur.pop();
return chosen_coro;
}
void worker_meta::work_once()
{
const auto coro = this->schedule();
coro.resume();
}
void worker_meta::poll_submission() noexcept
{
// submit sqes
if (requests_to_submit) [[likely]] {
bool will_wait = !has_task_ready();
requests_to_submit = 0;
}
}
uint32_t worker_meta::poll_completion() noexcept
{
return 0;
}
}

View File

@ -1,8 +1,70 @@
#pragma once
#include <thread>
#include "co_context/io_context.h"
#include "co_context/detail/io_context_meta.h"
namespace co_context {
class [[nodiscard]] io_context final {
io_context::io_context() noexcept
{
auto& meta = detail::io_context_meta;
std::lock_guard lg{meta.mtx};
id = meta.create_count++;
}
void io_context::init()
{
detail::this_thread.ctx = this;
detail::this_thread.ctx_id = id;
worker.init();
}
void io_context::deinit()
{
detail::this_thread.ctx = nullptr;
detail::this_thread.ctx_id = static_cast<ctx_id_t>(-1);
};
worker.deinit();
auto& meta = detail::io_context_meta;
std::lock_guard lg{meta.mtx};
--meta.ready_count;
--meta.create_count;
}
void io_context::start()
{
init();
run();
}
void io_context::co_spawn(task<void>&& entrance) noexcept {
auto handle = entrance.get_handle();
entrance.detach();
worker.forward_task(handle);
}
void io_context::run()
{
while (!will_stop) [[likely]] {
do_worker_part();
do_submission_part();
do_completion_part();
}
}
void io_context::do_submission_part() noexcept
{
worker.poll_submission();
}
void io_context::do_completion_part() noexcept
{
const uint32_t handled_num = worker.poll_completion();
bool is_not_over = handled_num | worker.requests_to_reap;
if (!is_not_over) [[unlikely]] {
will_stop = true;
}
}
void io_context::do_worker_part()
{
auto num = worker.number_to_schedule();
for (; num > 0; --num) {
worker.work_once();
}
}
} // namespace co_context

20
engine/3rdparty/co_context/src/main.cpp vendored Normal file
View File

@ -0,0 +1,20 @@
#include<iostream>
#include "co_context/io_context.h"
#include "co_context/lazy_io.h"
using namespace std;
using namespace co_context;
task<> the_answer(int t) {
const char* name = "hello";
cout << "lazyout" << endl;
co_await lazy::write(1, { name, 5 }, 10);
cout << name << t << endl;
co_return;
}
int main() {
io_context context;
int a = 3;
context.co_spawn(the_answer(a));
context.co_spawn(the_answer(5));
context.start();
return 0;
}

View File

@ -0,0 +1,93 @@
#include<iostream>
#include<coroutine>
using namespace std;
struct future_type_int {
struct promise_type;
using co_handle_type = std::coroutine_handle<promise_type>;
struct promise_type {
int ret_val;
promise_type() {
std::cout << "promise_type constructor" << std::endl;
}
~promise_type() {
std::cout << "promise_type destructor" << std::endl;
}
auto get_return_object() {
std::cout << "get_return_object" << std::endl;
return co_handle_type::from_promise(*this);
}
auto initial_suspend() {
std::cout << "initial_suspend" << std::endl;
return std::suspend_always();
}
auto final_suspend() noexcept(true) {
std::cout << "final_suspend" << std::endl;
return std::suspend_never();
}
void return_value(int val) {
std::cout << "return_value : " << val << std::endl;
ret_val = val;
}
void unhandled_exception() {
std::cout << "unhandled_exception" << std::endl;
std::terminate();
}
auto yield_value(int val) {
std::cout << "yield_value : " << val << std::endl;
ret_val = val;
return std::suspend_always();
}
};
future_type_int(co_handle_type co_handle) {
std::cout << "future_type_int constructor" << std::endl;
co_handle_ = co_handle;
}
~future_type_int() {
std::cout << "future_type_int destructor" << std::endl;
co_handle_.destroy();
}
future_type_int(const future_type_int&) = delete;
future_type_int(future_type_int&&) = delete;
bool resume() {
if (!co_handle_.done()) {
co_handle_.resume();
}
return !co_handle_.done();
}
co_handle_type co_handle_;
};
future_type_int three_step_coroutine() {
std::cout << "three_step_coroutine begin" << std::endl;
co_yield 222;
std::cout << "three_step_coroutine running" << std::endl;
co_yield 333;
std::cout << "three_step_coroutine end" << std::endl;
co_return 444;
}
void three_step_coroutine2(future_type_int& future_obj) {
std::cout << "three_step_coroutine begin" << std::endl;
future_obj.co_handle_.promise().yield_value(222);
}
int main() {
int a = 11;
future_type_int future_obj = three_step_coroutine();
std::cout << "=======calling first resume======" << std::endl;
future_obj.resume();
std::cout << "ret_val = " << future_obj.co_handle_.promise().ret_val << std::endl;
std::cout << "=======calling second resume=====" << std::endl;
future_obj.resume();
std::cout << "ret_val = " << future_obj.co_handle_.promise().ret_val << std::endl;
std::cout << "=======calling third resume======" << std::endl;
future_obj.resume();
std::cout << "ret_val = " << future_obj.co_handle_.promise().ret_val << std::endl;
std::cout << "=======main end======" << std::endl;
return 0;
}

View File

@ -1,5 +1,5 @@
target("co_context")
set_kind("static")
add_includedirs("include")
add_files("src/**.cpp")
set_kind("binary")
add_includedirs("include", {public = true})
add_files("src/**.cpp","src/main.cpp")
add_headerfiles("include/**.h")

View File

@ -3,7 +3,7 @@ includes("test/**xmake.lua")
target("zengine")
set_kind("binary")
add_deps("co_context")
--add_deps("co_context")
add_includedirs("src")
add_files("src/*.cpp", "src/**.cpp")
add_headerfiles("src/**.h", "src/**.inl")

View File

@ -1,6 +1,6 @@
add_rules("mode.debug", "mode.release")
set_arch("x64")
set_languages("cxx17")
set_languages("cxx20")
set_project("zengine")
set_toolchains("clang")
includes("engine")