Comments (11)

muusbolla avatar muusbolla commented on May 20, 2024

I get a very similar error when I try to do it using the new make_parallel_group:

example::v1::Request request;
example::v1::Response response;
auto read = agrpc::read(reader_writer, request, asio::experimental::deferred);
auto write = agrpc::write(reader_writer, response, asio::experimental::deferred);
auto result = co_await asio::experimental::make_parallel_group(read, write).async_wait(
        asio::experimental::wait_for_one_success(),
        asio::use_awaitable
);

I get the below error which is exactly the same as the error I got with one of the attempts to use use_promise:

.../asio-grpc-1.4.0/src/agrpc/detail/queryGrpcContext.hpp:32:16: error: invalid ‘static_cast’ from type ‘asio::system_context’ to type ‘agrpc::s::GrpcContext&’
[build]    32 |         return static_cast<agrpc::GrpcContext&>(asio::query(executor, asio::execution::context));
[build]       |                ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

from asio-grpc.

muusbolla avatar muusbolla commented on May 20, 2024

If I change asio::use_awaitable in the latest example to asio::use_awaitable_t<agrpc::GrpcContext::executor_type>{}, I see:

note: mismatched types ‘asio::any_io_executor’ and ‘agrpc::s::BasicGrpcExecutor<>’

Tradias avatar Tradias commented on May 20, 2024

The completion handler created from the completion token that is provided to the RPC functions must have an associated executor that refers to a GrpcContext (see also documentation). You can achieve that through the use of asio::bind_executor:

#include <asio/bind_executor.hpp>

auto&& read_promise = agrpc::read(reader_writer, request, asio::bind_executor(grpc_context, asio::experimental::use_promise));

and for parallel_group:

auto read = agrpc::read(reader_writer, request, asio::bind_executor(grpc_context, asio::experimental::deferred));
auto write = agrpc::write(reader_writer, response, asio::bind_executor(grpc_context, asio::experimental::deferred));
auto result = co_await asio::experimental::make_parallel_group(std::move(read), std::move(write)).async_wait(
        asio::experimental::wait_for_one_success(),
        asio::use_awaitable
);

I am thinking of adding an IoObject-like wrapper for grpc::AsyncService that would behave similar to other IoObjects in Asio, which would allow code such as:

asio::awaitable<void> handle_bidirectional_streaming_request(agrpc::GrpcContext& grpc_context, example::v1::Example::AsyncService& service)
{
  agrpc::AsyncService async_service{grpc_context, service};
  auto&& promise = async_service.read(reader_writer, asio::experimental::use_promise);
}

Let me know if such a class would be helpful for you; if so, I can add it to the next release.


If you want to customize the executor of the asio::awaitable then you must use a different awaitable type:

// agrpc::GrpcAwaitable<T> is a type alias for `asio::awaitable<T, agrpc::GrpcExecutor>`
agrpc::GrpcAwaitable<void> example(example::v1::Example::AsyncService& service)
{
  // agrpc::GRPC_USE_AWAITABLE is simply `asio::use_awaitable_t<agrpc::GrpcExecutor>{}`
  // reader_writer and request are assumed to be set up earlier, as in the snippets above
  bool read_ok = co_await agrpc::read(reader_writer, request, agrpc::GRPC_USE_AWAITABLE);
}

Note that this does not work for use_promise_t because its implementation is bugged. It tries to get the initiating function's associated executor which is just wrong in my opinion. Nowhere else in Asio does any completion token try to do that and no initiating function in Asio has an associated executor.

muusbolla avatar muusbolla commented on May 20, 2024

It makes sense to me that a CompletionToken would need to be bound to an executor in order to run. However, it seems that the default agrpc::DefaultCompletionToken, which resolves to asio::use_awaitable_t<>, doesn't need to be bound? I'm able to compile all 3 of the lines below:

auto read = co_await agrpc::read(reader_writer, request);
auto read = co_await agrpc::read(reader_writer, request, asio::use_awaitable);
auto read = co_await agrpc::read(reader_writer, request, asio::use_awaitable_t<>{});

I see in the highlighted red section of the docs you linked (https://tradias.github.io/asio-grpc/structagrpc_1_1detail_1_1_request_fn.html#details) that it says it needs to be bound, but all the following examples on that doc page, such as the example for operator()(), show asio::use_awaitable used without bind. The example code also uses the default completion token (https://github.com/Tradias/asio-grpc/blob/master/example/streaming-server.cpp, line 88 for example).

Tradias avatar Tradias commented on May 20, 2024

Mmh, maybe the documentation does not make it clear enough, let me know how to adjust the wording:

The completion handler of asio::use_awaitable_t has an implicit associated executor by design - it is asio::this_coro::executor. In other words, the executor that was used to co_spawn the coroutine.

If we co_spawn onto the grpc_context directly then we can use agrpc functions without binding:

agrpc::GrpcContext grpc_context{std::make_unique<grpc::CompletionQueue>()};
asio::co_spawn(grpc_context, [&]() -> asio::awaitable<void>
{
  auto executor = co_await asio::this_coro::executor;
  // executor is an asio::any_io_executor created from grpc_context.get_executor()
  // it can be used for agrpc functions, e.g.:
  grpc::Alarm alarm;
  co_await agrpc::wait(alarm, std::chrono::system_clock::now(), asio::use_awaitable);
  // or equivalent, relying on the default completion token
  co_await agrpc::wait(alarm, std::chrono::system_clock::now());
}, asio::detached);

But if we co_spawn onto a different execution context then we need binding:

asio::io_context io_context;
asio::co_spawn(io_context, [&]() -> asio::awaitable<void>
{
  auto executor = co_await asio::this_coro::executor;
  // executor is an asio::any_io_executor created from io_context.get_executor()
  // it cannot be used for agrpc functions directly, it needs executor binding
  grpc::Alarm alarm;
  // the following line would crash:
  //co_await agrpc::wait(alarm, std::chrono::system_clock::now(), asio::use_awaitable);
  // correct would be:
  co_await agrpc::wait(alarm, std::chrono::system_clock::now(), asio::bind_executor(grpc_context, asio::use_awaitable));
}, asio::detached);
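
The rule behind the two snippets above can be modelled without Asio. The sketch below is only a toy: `Executor`, `PlainHandler`, `BoundHandler`, `can_initiate_rpc` and the local `bind_executor` / `get_associated_executor` are hypothetical stand-ins written for illustration, not Asio or asio-grpc types. It assumes the real lookup behaves like "use the handler's own executor if it has one, otherwise fall back to the coroutine's executor", and it reports a mismatch by returning false where the real library would fail to compile or crash.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Toy stand-in for an executor: it only remembers which context it refers to.
struct Executor { std::string context_name; };

// A handler with no executor of its own (like a plain use_awaitable handler).
struct PlainHandler {};

// What the toy bind_executor returns: a handler carrying its own executor.
template <class Handler>
struct BoundHandler { Executor executor; Handler handler; };

template <class Handler>
BoundHandler<Handler> bind_executor(Executor ex, Handler h)
{
    assert(!ex.context_name.empty());  // a bound executor must name a context
    return {std::move(ex), std::move(h)};
}

// Unbound handlers fall back to the default (the coroutine's executor).
template <class Handler>
Executor get_associated_executor(const Handler&, const Executor& fallback)
{
    return fallback;
}

// Bound handlers always report the executor they were bound to.
template <class Handler>
Executor get_associated_executor(const BoundHandler<Handler>& h, const Executor&)
{
    return h.executor;
}

// Toy RPC initiating function: it only accepts handlers whose associated
// executor refers to the gRPC context.
template <class Handler>
bool can_initiate_rpc(const Handler& h, const Executor& coroutine_executor)
{
    return get_associated_executor(h, coroutine_executor).context_name == "grpc_context";
}
```

In this model, `can_initiate_rpc(PlainHandler{}, Executor{"io_context"})` is false, while wrapping the same handler with `bind_executor(Executor{"grpc_context"}, PlainHandler{})` makes it true regardless of where the coroutine runs.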

muusbolla avatar muusbolla commented on May 20, 2024

Brilliant. Thank you very much. Being able to spawn and await a result from another executor is definitely on my list of needs for this project.

Open-ended question: What I am trying to build at the moment is a bidirectional streaming server that works like so (after the stream has been initially opened):
(1) On receipt of a new request (result of calling agrpc::read), post work to another thread pool (non-grpc asio::thread_pool) to retrieve and process some data, by calling asio::co_spawn or asio::post to the other thread pool.
(2) On completion of that request, get the result back on the agrpc thread. If no write is pending, initiate a new write. Else, queue the data for handling in (3)
(3) On completion of write, check the queue and initiate a new write if data is waiting, by calling agrpc::write.
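
Steps (2) and (3) amount to a single-writer queue: at most one write is in flight, and everything else waits its turn. A minimal sketch of that bookkeeping follows, using only the standard library. `WriteQueue`, `push`, `on_write_done` and the `start_write` callback are hypothetical names for illustration; `start_write` stands in for whatever initiates agrpc::write on the stream. The class is not thread-safe and assumes every call happens on the gRPC thread.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <string>
#include <utility>

// Hypothetical helper, not part of asio-grpc: allows at most one write in flight.
class WriteQueue
{
public:
    // start_write is a placeholder for initiating the real write on the stream.
    explicit WriteQueue(std::function<void(const std::string&)> start_write)
        : start_write_(std::move(start_write))
    {
    }

    // Step (2): a result came back from the worker pool.
    void push(std::string data)
    {
        if (in_flight_)
        {
            waiting_.push(std::move(data));  // a write is pending, so queue it
            return;
        }
        in_flight_ = true;
        start_write_(data);
    }

    // Step (3): the pending write completed; start the next one if data is waiting.
    void on_write_done()
    {
        assert(in_flight_);  // only valid while a write is pending
        if (waiting_.empty())
        {
            in_flight_ = false;
            return;
        }
        std::string next = std::move(waiting_.front());
        waiting_.pop();
        start_write_(next);
    }

    bool write_in_flight() const { return in_flight_; }
    std::size_t queued() const { return waiting_.size(); }

private:
    std::function<void(const std::string&)> start_write_;
    std::queue<std::string> waiting_;
    bool in_flight_ = false;
};
```

The `in_flight_` flag also covers the "nothing to write" case: when it is false, only the read needs to be awaited.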

I can build this using the native GRPC CompletionQueue API. But I'm struggling to figure out how to build it purely with coroutines. It seems that even for a simple bidi server that doesn't have (2) above, I need the ability to co_await agrpc::read and agrpc::write simultaneously. You have this in your streaming server example by the use of operator&&, however this only returns when both are finished. There is an operator||, but it cancels the second awaitable when the first one returns.

What I'm thinking of is an operator that can tie together two or more awaitables and wait for only one of them to complete, and NOT cancel the others. The one that completed would be processed, then replaced by a new awaitable. The ones that didn't complete would be left alone to continue running async, and the whole group would be re-awaited on the next loop. Pseudocode below:

auto read_task = agrpc::read(reader_writer, request);
auto write_task = agrpc::write(reader_writer, response);
while(true) {
  auto [did_read, read_ok, did_write, write_ok] = co_await either(read_task, write_task);
  if(did_read) {
    if (!read_ok) { co_return; }
    asio::post(other_thread_pool, [request]() { get_write_data_async(); }); // result will be enqueued to waiting_write_data
    read_task = agrpc::read(reader_writer, request);
  }
  if(did_write) {
    if (!write_ok) { co_return; }
    if (!waiting_write_data.empty()) {
      response.data = waiting_write_data.front();
      waiting_write_data.pop();
      write_task = agrpc::write(reader_writer, response);
    }
  }
}

I looked at the implementation of asio::experimental::awaitable_operators::operator|| and operator&& for inspiration. It seems like something similar, but replacing the call to wait_for_one_success() with wait_for_one_success(cancellation_type::none) might prevent it from canceling the unfinished tasks. However, it still seems like make_parallel_group is designed only to spawn new tasks that were deferred. I don't know how this would behave if I passed it one or more tasks that were already executing and one task that is new.

Also, in the case where there's nothing to write, the ability to assign a "null" value to write_task, and then check for null and only await the read_task would be useful. This could also be accomplished using a boolean and if statement, but if there is a more expressive way to do it, then that would be good to know.

Tradias avatar Tradias commented on May 20, 2024

I see, take a look at https://github.com/Tradias/example-vcpkg-grpc/tree/asio-grpc-16 maybe that helps :)

Tradias avatar Tradias commented on May 20, 2024

I also put that code into the bidirectional streaming example: https://github.com/Tradias/asio-grpc/blob/master/example/streaming-server.cpp

Tradias avatar Tradias commented on May 20, 2024

@muusbolla Did you find some time to check out the above example code? Does it answer your questions?

Tradias avatar Tradias commented on May 20, 2024

@muusbolla Since I have not heard from you in a while, I hope that your issue has been resolved. Do not hesitate to open another issue if you have any questions or are facing any problems; I am happy to help.

muusbolla avatar muusbolla commented on May 20, 2024

Sorry for not responding. Priorities shifted and I ended up having to table this project. Thanks again for your great library and willingness to help :)
