
Comments (6)

Tradias commented on June 1, 2024

Take a look at this example: #74 (comment). It replaces agrpc::repeatedly_request with a custom version that sets up agrpc::notify_when_done.

I am also working on a new API for servers that will make this significantly more convenient.

from asio-grpc.

kgreenek commented on June 1, 2024

Thank you!

I was able to figure it out from there. It got a little bit ugly with templates, but here is the generic helper function that I ended up with:

#pragma once

#include <functional>
#include <string>

#include "agrpc/asio_grpc.hpp"
#include "boost/asio.hpp"
#include "boost/asio/experimental/promise.hpp"
#include "boost/asio/experimental/use_promise.hpp"
#include "grpc/grpc.h"

namespace my_namespace {

using OnDonePromise = boost::asio::experimental::promise<
    void(),
    boost::asio::basic_system_executor<boost::asio::execution::detail::blocking::possibly_t<0>,
                                       boost::asio::execution::detail::relationship::fork_t<0>,
                                       std::allocator<void>>,
    std::allocator<void>>;

template <class Request, class Response>
using ResponseStreamingRequestHandlerFn = std::function<boost::asio::awaitable<void>(
    grpc::ServerContext&, Request&, grpc::ServerAsyncWriter<Response>&, OnDonePromise)>;

namespace detail {

template <class Request, class Response, class AsyncService, class AsyncRequestFn>
boost::asio::awaitable<void> ResponseStreamingRequestLoop(
    agrpc::GrpcContext& grpc_context, AsyncService& service, AsyncRequestFn async_request_fn,
    ResponseStreamingRequestHandlerFn<Request, Response> request_handler) {
  grpc::ServerContext server_context;
  OnDonePromise on_done =
      agrpc::notify_when_done(grpc_context, server_context, boost::asio::experimental::use_promise);
  Request request;
  grpc::ServerAsyncWriter<Response> response_writer{&server_context};
  const bool request_ok =
      co_await agrpc::request(async_request_fn, service, server_context, request, response_writer,
                              boost::asio::use_awaitable);
  if (!request_ok) {
    // At this point, `agrpc::notify_when_done` will never complete.
    grpc_context.work_finished();
    co_return;
  }
  boost::asio::co_spawn(grpc_context,
                        ResponseStreamingRequestLoop(grpc_context, service,
                                                     std::move(async_request_fn), request_handler),
                        boost::asio::detached);
  co_await request_handler(server_context, request, response_writer, std::move(on_done));
}

}  // namespace detail

// This helper function provides the same functionality as agrpc::repeatedly_request, with the
// addition of passing an on_done promise to the callback. This enables the callback to check if the
// client has closed the connection.
template <class Request, class Response, class AsyncService, class AsyncRequestFn>
void RepeatedRequestResponseStreaming(
    agrpc::GrpcContext& grpc_context, AsyncService& service, AsyncRequestFn async_request_fn,
    ResponseStreamingRequestHandlerFn<Request, Response> request_handler) {
  boost::asio::co_spawn(
      grpc_context,
      detail::ResponseStreamingRequestLoop(grpc_context, service, std::move(async_request_fn),
                                           std::move(request_handler)),
      boost::asio::detached);
}

}  // namespace my_namespace

To use it, I do something like this:

  auto request_handler =
      [this](grpc::ServerContext& context, proto::MyServiceRequest& request,
             grpc::ServerAsyncWriter<proto::MyServiceResponse>& response_writer,
             OnDonePromise on_done) -> asio::awaitable<void> {
    while (true) {
      if (new_data_is_available) {
        if (!co_await agrpc::write(response_writer, proto::MyServiceResponse{})) {
          break;
        }
      }
      grpc::Alarm alarm;
      co_await agrpc::wait(alarm, std::chrono::system_clock::now() + std::chrono::seconds{1});
      if (on_done.completed()) {
        break;
      }
    }
    co_await agrpc::finish(response_writer, grpc::Status::OK);
  };
  RepeatedRequestResponseStreaming<proto::MyServiceRequest, proto::MyServiceResponse>(
      grpc_context, service, &proto::MyService::AsyncService::RequestMyEndpoint,
      std::move(request_handler));


Tradias commented on June 1, 2024

Yep, seems fine. I would probably have written it without the std::function, since it doesn't save on template instantiations anyway. Maybe like so (untested):

#pragma once

#include <functional>
#include <string>

#include "agrpc/asio_grpc.hpp"
#include "boost/asio.hpp"
#include "boost/asio/experimental/promise.hpp"
#include "boost/asio/experimental/use_promise.hpp"
#include "grpc/grpc.h"

namespace my_namespace {

using OnDonePromise = boost::asio::experimental::promise<
    void(), boost::asio::system_executor>;

namespace detail {

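// Note: Request and Response are not declared in this untested sketch. They still have to be
// supplied, for example as additional template parameters.
template <class AsyncService, class AsyncRequestFn, class RequestHandler>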
boost::asio::awaitable<void> ResponseStreamingRequestLoop(
    agrpc::GrpcContext& grpc_context, AsyncService& service, AsyncRequestFn async_request_fn,
    RequestHandler request_handler) {
  grpc::ServerContext server_context;
  OnDonePromise on_done =
      agrpc::notify_when_done(grpc_context, server_context, boost::asio::experimental::use_promise);
  Request request;
  grpc::ServerAsyncWriter<Response> response_writer{&server_context};
  const bool request_ok =
      co_await agrpc::request(async_request_fn, service, server_context, request, response_writer,
                              boost::asio::use_awaitable);
  if (!request_ok) {
    // At this point, `agrpc::notify_when_done` will never complete.
    grpc_context.work_finished();
    co_return;
  }
  boost::asio::co_spawn(grpc_context,
                        ResponseStreamingRequestLoop(grpc_context, service,
                                                     async_request_fn, request_handler),
                        boost::asio::detached);
  co_await request_handler(server_context, request, response_writer, std::move(on_done));
}

}  // namespace detail

// This helper function provides the same functionality as agrpc::repeatedly_request, with the
// addition of passing an on_done promise to the callback. This enables the callback to check if the
// client has closed the connection.
template <class AsyncService, class AsyncRequestFn, class RequestHandler>
void RepeatedRequestResponseStreaming(
    agrpc::GrpcContext& grpc_context, AsyncService& service, AsyncRequestFn async_request_fn,
    RequestHandler request_handler) {
  boost::asio::co_spawn(
      grpc_context,
      detail::ResponseStreamingRequestLoop(grpc_context, service, async_request_fn,
                                           std::move(request_handler)),
      boost::asio::detached);
}

// ...
  auto request_handler =
      [this](grpc::ServerContext& context, proto::MyServiceRequest& request,
             grpc::ServerAsyncWriter<proto::MyServiceResponse>& response_writer,
             OnDonePromise on_done) -> asio::awaitable<void> {
    while (true) {
      if (new_data_is_available) {
        if (!co_await agrpc::write(response_writer, proto::MyServiceResponse{})) {
          break;
        }
      }
      grpc::Alarm alarm;
      co_await agrpc::wait(alarm, std::chrono::system_clock::now() + std::chrono::seconds{1});
      if (on_done.completed()) {
        break;
      }
    }
    co_await agrpc::finish(response_writer, grpc::Status::OK);
  };
  RepeatedRequestResponseStreaming(
      grpc_context, service, &proto::MyService::AsyncService::RequestMyEndpoint,
      std::move(request_handler));

}  // namespace my_namespace
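
The trade-off between std::function and a template parameter can be seen in isolation, without gRPC. The following is a minimal sketch in standard C++ only; the names call_erased, call_templated, demo_copyable, demo_move_only and run_demo are hypothetical and do not come from asio-grpc or from the code above. It shows that a templated handler parameter accepts the lambda's concrete type directly, while std::function wraps it and requires it to be copyable.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>

// Type-erased parameter: std::function requires the stored callable to be copyable.
inline int call_erased(const std::function<int(int)>& handler, int value) {
  return handler(value);
}

// Template parameter: the callable's concrete type is deduced, so no wrapper is built
// and move-only callables are accepted.
template <class Handler>
int call_templated(Handler handler, int value) {
  return handler(value);
}

// A copyable lambda works with both versions.
inline int demo_copyable() {
  auto add_one = [](int x) { return x + 1; };
  return call_erased(add_one, 1) + call_templated(add_one, 1);
}

// A lambda that owns a std::unique_ptr is move-only and works only with the template.
inline int demo_move_only() {
  auto handler = [p = std::make_unique<int>(40)](int x) { return *p + x; };
  // call_erased(std::move(handler), 2);  // would not compile: the lambda is not copyable
  return call_templated(std::move(handler), 2);
}

inline void run_demo() {
  assert(demo_copyable() == 4);
  assert(demo_move_only() == 42);
}
```

In the helper above, the handler is copied into each newly spawned request loop, so it has to be copyable there in either style. The sketch only illustrates the general difference between the two parameter kinds.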


kgreenek commented on June 1, 2024

This is great. Thanks again! Closing this issue.


kgreenek commented on June 1, 2024

I tried getting rid of the Request and Response template args as you suggested, but I still need them in the ResponseStreamingRequestLoop function.
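
One possible way around this, sketched here as an assumption and not something proposed in the thread, is to deduce Request and Response from the type of the member function pointer passed as async_request_fn. The sketch uses stand-in types in a hypothetical mock namespace so that it compiles without gRPC; ServerStreamingTraits, MyAsyncService, MyRequest and MyResponse are illustrative names, and the parameter list of RequestMyEndpoint is assumed to mirror what the gRPC code generator emits for a server-streaming method.

```cpp
#include <type_traits>

// Stand-in types so the sketch compiles without gRPC. Real code would use the
// grpc:: types of the same names instead.
namespace mock {
struct ServerContext {};
struct CompletionQueue {};
struct ServerCompletionQueue {};
template <class Response>
struct ServerAsyncWriter {};
}  // namespace mock

struct MyRequest {};
struct MyResponse {};

// Hypothetical service with a server-streaming request method (assumed signature).
struct MyAsyncService {
  void RequestMyEndpoint(mock::ServerContext*, MyRequest*,
                         mock::ServerAsyncWriter<MyResponse>*, mock::CompletionQueue*,
                         mock::ServerCompletionQueue*, void*) {}
};

// Primary template is left undefined: only server-streaming signatures match below.
template <class AsyncRequestFn>
struct ServerStreamingTraits;

// Partial specialization that extracts Request and Response from the pointer type.
template <class Service, class Req, class Resp>
struct ServerStreamingTraits<void (Service::*)(
    mock::ServerContext*, Req*, mock::ServerAsyncWriter<Resp>*, mock::CompletionQueue*,
    mock::ServerCompletionQueue*, void*)> {
  using Request = Req;
  using Response = Resp;
};

using Traits = ServerStreamingTraits<decltype(&MyAsyncService::RequestMyEndpoint)>;
static_assert(std::is_same<Traits::Request, MyRequest>::value, "Request is deduced");
static_assert(std::is_same<Traits::Response, MyResponse>::value, "Response is deduced");
```

With such a trait, ResponseStreamingRequestLoop could declare its request as typename ServerStreamingTraits<AsyncRequestFn>::Request and build the writer from the deduced Response, so callers would not have to spell out either type. This is untested against real generated code.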
