Comments (11)
I get a very similar error when I try to do it using the new make_parallel_group:
example::v1::Request request;
example::v1::Response response;
auto read = agrpc::read(reader_writer, request, asio::experimental::deferred);
auto write = agrpc::write(reader_writer, response, asio::experimental::deferred);
auto result = co_await asio::experimental::make_parallel_group(read, write).async_wait(
    asio::experimental::wait_for_one_success(),
    asio::use_awaitable
);
I get the error below, which is exactly the same as the one I got in one of my attempts with use_promise:
.../asio-grpc-1.4.0/src/agrpc/detail/queryGrpcContext.hpp:32:16: error: invalid ‘static_cast’ from type ‘asio::system_context’ to type ‘agrpc::s::GrpcContext&’
[build] 32 | return static_cast<agrpc::GrpcContext&>(asio::query(executor, asio::execution::context));
[build] | ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
from asio-grpc.
If I change asio::use_awaitable in the latest example to asio::use_awaitable_t<agrpc::GrpcContext::executor_type>{}, I see:
note: mismatched types ‘asio::any_io_executor’ and ‘agrpc::s::BasicGrpcExecutor<>’
from asio-grpc.
The completion handler created from the completion token that is provided to the RPC functions must have an associated executor that refers to a GrpcContext (see also documentation). You can achieve that through the use of asio::bind_executor:
#include <asio/bind_executor.hpp>
auto&& read_promise = agrpc::read(reader_writer, request, asio::bind_executor(grpc_context, asio::experimental::use_promise));
and for parallel_group:
auto read = agrpc::read(reader_writer, request, asio::bind_executor(grpc_context, asio::experimental::deferred));
auto write = agrpc::write(reader_writer, response, asio::bind_executor(grpc_context, asio::experimental::deferred));
auto result = co_await asio::experimental::make_parallel_group(std::move(read), std::move(write)).async_wait(
    asio::experimental::wait_for_one_success(),
    asio::use_awaitable
);
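To make the lookup concrete, here is a toy model, written without Asio, of how a handler's associated executor might be resolved. ToyExecutor, BoundHandler, toy_bind_executor and toy_get_associated_executor are hypothetical stand-ins invented for this sketch, not Asio or asio-grpc APIs; the real mechanism is asio::associated_executor. It only illustrates the idea that an unbound handler falls back to a default supplied by the caller (consistent with the asio::system_context in the error above), while a bound handler carries its own executor.

```cpp
#include <string>
#include <utility>

// Toy stand-in for an executor; it only remembers which context it refers to.
struct ToyExecutor {
    std::string context_name;
};

// Toy stand-in for the wrapper that a bind_executor-style function returns.
template <class Handler>
struct BoundHandler {
    ToyExecutor executor;
    Handler handler;
};

// Hypothetical analogue of asio::bind_executor.
template <class Handler>
BoundHandler<Handler> toy_bind_executor(ToyExecutor executor, Handler handler) {
    return {std::move(executor), std::move(handler)};
}

// A plain handler has no executor of its own, so the fallback is returned.
template <class Handler>
ToyExecutor toy_get_associated_executor(const Handler&, const ToyExecutor& fallback) {
    return fallback;
}

// A bound handler reports the executor it was bound to and ignores the fallback.
template <class Handler>
ToyExecutor toy_get_associated_executor(const BoundHandler<Handler>& bound, const ToyExecutor&) {
    return bound.executor;
}
```

In this model, a function that insists on a GrpcContext-backed executor would reject the plain handler and accept the bound one, which is the situation the static_cast error reports.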
I am thinking of adding an IoObject-like wrapper for grpc::AsyncService that would behave similarly to other IoObjects in Asio, which would allow code such as:
asio::awaitable<void> handle_bidirectional_streaming_request(agrpc::GrpcContext& grpc_context, example::v1::Example::AsyncService& service)
{
    agrpc::AsyncService async_service{grpc_context, service};
    auto&& promise = async_service.read(reader_writer, asio::experimental::use_promise);
}
Let me know if such a class would be helpful for you; if so, I can add it to the next release.
If you want to customize the executor of the asio::awaitable then you must use a different awaitable type:
// agrpc::GrpcAwaitable<void> is a type alias for `asio::awaitable<T, agrpc::GrpcExecutor>`
agrpc::GrpcAwaitable<void> example(example::v1::Example::AsyncService& service)
{
    // agrpc::GRPC_USE_AWAITABLE is simply `asio::use_awaitable_t<agrpc::GrpcExecutor>{}`
    auto&& read_promise = agrpc::read(reader_writer, request, agrpc::GRPC_USE_AWAITABLE);
}
Note that this does not work for use_promise_t because its implementation is bugged. It tries to get the initiating function's associated executor, which is just wrong in my opinion. Nowhere else in Asio does any completion token try to do that, and no initiating function in Asio has an associated executor.
from asio-grpc.
It makes sense to me that a CompletionToken would need to be bound to an executor in order to run. However, it seems that the default agrpc::DefaultCompletionToken, which resolves to asio::use_awaitable_t<>, doesn't need to be bound? I'm able to compile all 3 of the lines below:
auto read = co_await agrpc::read(reader_writer, request);
auto read = co_await agrpc::read(reader_writer, request, asio::use_awaitable);
auto read = co_await agrpc::read(reader_writer, request, asio::use_awaitable_t<>{});
I see in the highlighted red section of the docs you linked (https://tradias.github.io/asio-grpc/structagrpc_1_1detail_1_1_request_fn.html#details) that it says it needs to be bound, but the examples that follow on that doc page, such as the one for operator()(), use asio::use_awaitable without binding. The example code also uses the default completion token (https://github.com/Tradias/asio-grpc/blob/master/example/streaming-server.cpp, line 88 for example).
from asio-grpc.
Mmh, maybe the documentation does not make it clear enough, let me know how to adjust the wording:
The completion handler of asio::use_awaitable_t has an implicit associated executor by design - it is asio::this_coro::executor. In other words, the executor that was used to co_spawn the coroutine.
If we co_spawn onto the grpc_context directly then we can use agrpc functions without binding:
agrpc::GrpcContext grpc_context{std::make_unique<grpc::CompletionQueue>()};
asio::co_spawn(grpc_context, [&]() -> asio::awaitable<void>
{
    auto executor = co_await asio::this_coro::executor;
    // executor is an asio::any_io_executor created from grpc_context.get_executor()
    // it can be used for agrpc functions, e.g.:
    grpc::Alarm alarm;
    co_await agrpc::wait(alarm, std::chrono::system_clock::now(), asio::use_awaitable);
    // or equivalent, relying on the default completion token
    co_await agrpc::wait(alarm, std::chrono::system_clock::now());
}, asio::detached);
But if we co_spawn onto a different execution context then we need binding:
asio::io_context io_context;
asio::co_spawn(io_context, [&]() -> asio::awaitable<void>
{
    auto executor = co_await asio::this_coro::executor;
    // executor is an asio::any_io_executor created from io_context.get_executor()
    // it cannot be used for agrpc functions directly, it needs executor binding
    grpc::Alarm alarm;
    // the following line would crash:
    //co_await agrpc::wait(alarm, std::chrono::system_clock::now(), asio::use_awaitable);
    // correct would be:
    co_await agrpc::wait(alarm, std::chrono::system_clock::now(), asio::bind_executor(grpc_context, asio::use_awaitable));
}, asio::detached);
from asio-grpc.
Brilliant. Thank you very much. Being able to spawn and await a result from another executor is definitely on my list of needs for this project.
Open-ended question: What I am trying to build at the moment is a bidirectional streaming server that works like so (after the stream has been initially opened):
(1) On receipt of a new request (result of calling agrpc::read), post work to another thread pool (non-grpc asio::thread_pool) to retrieve and process some data, by calling asio::co_spawn or asio::post to the other thread pool.
(2) On completion of that request, get the result back on the agrpc thread. If no write is pending, initiate a new write. Otherwise, queue the data for handling in (3).
(3) On completion of write, check the queue and initiate a new write if data is waiting, by calling agrpc::write.
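The bookkeeping in steps (2) and (3) can be sketched independently of gRPC and coroutines. WriteSerializer below is a hypothetical helper, not part of asio-grpc, and its start_write callback is an assumed placeholder for whatever actually initiates agrpc::write. The sketch assumes both member functions are only ever called from the single thread that runs the GrpcContext, so it does no locking.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <string>
#include <utility>

// Hypothetical helper: allows at most one write in flight and queues the rest.
// Not thread-safe; assumed to be used from the gRPC thread only.
class WriteSerializer {
public:
    using StartWrite = std::function<void(const std::string&)>;

    explicit WriteSerializer(StartWrite start_write)
        : start_write_(std::move(start_write)) {}

    // Step (2): a processed result has arrived back on the gRPC thread.
    void on_result_ready(std::string data) {
        if (write_in_flight_) {
            waiting_.push(std::move(data));  // a write is pending, so queue it
            return;
        }
        write_in_flight_ = true;
        start_write_(data);
    }

    // Step (3): the in-flight write has completed.
    void on_write_done() {
        assert(write_in_flight_ && "on_write_done called without a pending write");
        if (waiting_.empty()) {
            write_in_flight_ = false;
            return;
        }
        std::string next = std::move(waiting_.front());
        waiting_.pop();
        start_write_(next);  // write_in_flight_ stays true
    }

    bool write_in_flight() const { return write_in_flight_; }
    std::size_t queued() const { return waiting_.size(); }

private:
    StartWrite start_write_;
    std::queue<std::string> waiting_;
    bool write_in_flight_ = false;
};
```

Because the class only tracks state, the same logic could be driven from a CompletionQueue tag handler or from a coroutine that awaits the write.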
I can build this using the native GRPC CompletionQueue API. But I'm struggling to figure out how to build it purely with coroutines. It seems that even for a simple bidi server that doesn't have (2) above, I need the ability to co_await agrpc::read and agrpc::write simultaneously. You have this in your streaming server example by the use of operator&&, however this only returns when both are finished. There is an operator||, but it cancels the second awaitable when the first one returns.
What I'm thinking of is an operator that can tie together two or more awaitables and wait for only one of them to complete, and NOT cancel the others. The one that completed would be processed, then replaced by a new awaitable. The ones that didn't complete would be left alone to continue running async, and the whole group would be re-awaited on the next loop. Pseudocode below:
auto read_task = agrpc::read(reader_writer, request);
auto write_task = agrpc::write(reader_writer, response);
while(true) {
    auto [did_read, read_ok, did_write, write_ok] = co_await either(read_task, write_task);
    if(did_read) {
        if (!read_ok) { co_return; }
        asio::post(other_thread_pool, [request]() { get_write_data_async(); }); // result will be enqueued to waiting_write_data
        read_task = agrpc::read(reader_writer, request);
    }
    if(did_write) {
        if (!write_ok) { co_return; }
        if (!waiting_write_data.empty()) {
            response.data = waiting_write_data.front();
            waiting_write_data.pop();
            write_task = agrpc::write(reader_writer, response);
        }
    }
}
I looked at the implementation of asio::experimental::awaitable_operators::operator|| and operator&& for inspiration. It seems like something similar, but replacing the call to wait_for_one_success() with wait_for_one_success(cancellation_type::none) might prevent it from canceling the unfinished tasks. However, it still seems like make_parallel_group is designed only to spawn new tasks that were deferred. I don't know how this would behave if I passed it one or more tasks that were already executing and one task that is new.
Also, in the case where there's nothing to write, the ability to assign a "null" value to write_task, and then check for null and only await the read_task would be useful. This could also be accomplished using a boolean and if statement, but if there is a more expressive way to do it, then that would be good to know.
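One way to express a "null" write task in standard C++ is std::optional. The sketch below is only an illustration of that idea: PendingWrite, WaitOn and choose_wait are hypothetical names, and PendingWrite is a plain struct standing in for whatever object would represent an in-flight agrpc::write.

```cpp
#include <optional>
#include <string>

// Hypothetical stand-in for an in-flight write; not an asio-grpc type.
struct PendingWrite {
    std::string payload;
};

// What the loop should wait on in the current iteration.
enum class WaitOn { ReadOnly, ReadOrWrite };

// An empty optional plays the role of the "null" write task.
WaitOn choose_wait(const std::optional<PendingWrite>& write_task) {
    return write_task.has_value() ? WaitOn::ReadOrWrite : WaitOn::ReadOnly;
}
```

Resetting the optional after a write completes with nothing queued returns the loop to waiting on the read alone, which avoids a separate boolean flag.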
from asio-grpc.
I see, take a look at https://github.com/Tradias/example-vcpkg-grpc/tree/asio-grpc-16 maybe that helps :)
from asio-grpc.
I also put that code into the bidirectional streaming example: https://github.com/Tradias/asio-grpc/blob/master/example/streaming-server.cpp
from asio-grpc.
@muusbolla Did you find some time to check out the above example code? Does it answer your questions?
from asio-grpc.
@muusbolla since I have not heard from you in a while I hope that your issue has been resolved. Do not hesitate to open another issue if you have any questions or are facing any problems, I am happy to help.
from asio-grpc.
Sorry for not responding. Priorities shifted and I ended up having to table this project. Thanks again for your great library and willingness to help :)
from asio-grpc.