Comments (6)
Take a look at this example: #74 (comment) . It replaces agrpc::repeatedly_request
with a custom version that sets up agrpc::notify_when_done
.
I am also working on a new API for servers that will make this significantly more convenient.
from asio-grpc.
Thank you!
I was able to figure it out from there. It got a little bit ugly with templates, but here is the generic helper function that I ended up with:
#pragma once
#include <functional>
#include <string>
#include "agrpc/asio_grpc.hpp"
#include "boost/asio.hpp"
#include "boost/asio/experimental/promise.hpp"
#include "boost/asio/experimental/use_promise.hpp"
#include "grpc/grpc.h"
namespace my_namespace {
using OnDonePromise = boost::asio::experimental::promise<
void(),
boost::asio::basic_system_executor<boost::asio::execution::detail::blocking::possibly_t<0>,
boost::asio::execution::detail::relationship::fork_t<0>,
std::allocator<void>>,
std::allocator<void>>;
template <class Request, class Response>
using ResponseStreamingRequestHandlerFn = std::function<boost::asio::awaitable<void>(
grpc::ServerContext&, Request&, grpc::ServerAsyncWriter<Response>&, OnDonePromise)>;
namespace detail {
template <class Request, class Response, class AsyncService, class AsyncRequestFn>
boost::asio::awaitable<void> ResponseStreamingRequestLoop(
agrpc::GrpcContext& grpc_context, AsyncService& service, AsyncRequestFn async_request_fn,
ResponseStreamingRequestHandlerFn<Request, Response> request_handler) {
grpc::ServerContext server_context;
OnDonePromise on_done =
agrpc::notify_when_done(grpc_context, server_context, boost::asio::experimental::use_promise);
Request request;
grpc::ServerAsyncWriter<Response> response_writer{&server_context};
const bool request_ok =
co_await agrpc::request(async_request_fn, service, server_context, request, response_writer,
boost::asio::use_awaitable);
if (!request_ok) {
// At this point, `agrpc::notify_when_done` will never complete.
grpc_context.work_finished();
co_return;
}
boost::asio::co_spawn(grpc_context,
ResponseStreamingRequestLoop(grpc_context, service,
std::move(async_request_fn), request_handler),
boost::asio::detached);
co_await request_handler(server_context, request, response_writer, std::move(on_done));
}
} // namespace detail
// This helper function provides the same functionality as agrpc::repeatedly_request, with the
// addition of passing an on_done promise to the callback. This enables the callback to check if the
// client has closed the connection.
template <class Request, class Response, class AsyncService, class AsyncRequestFn>
void RepeatedRequestResponseStreaming(
agrpc::GrpcContext& grpc_context, AsyncService& service, AsyncRequestFn async_request_fn,
ResponseStreamingRequestHandlerFn<Request, Response> request_handler) {
boost::asio::co_spawn(
grpc_context,
detail::ResponseStreamingRequestLoop(grpc_context, service, std::move(async_request_fn),
std::move(request_handler)),
boost::asio::detached);
}
} // namespace my_namespace
To use it, I do something like this:
auto request_handler =
[this](grpc::ServerContext& context, proto::MyServiceRequest& request,
grpc::ServerAsyncWriter<proto::MyServiceResponse>& response_writer,
OnDonePromise on_done) -> asio::awaitable<void> {
while (true) {
if (new_data_is_available) {
if (!co_await agrpc::write(response_writer, proto::MyServiceResponse{})) {
break;
}
}
grpc::Alarm alarm;
co_await agrpc::wait(alarm, std::chrono::system_clock::now() + std::chrono::seconds{1});
if (on_done.completed()) {
break;
}
}
co_await agrpc::finish(response_writer, grpc::Status::OK);
};
RepeatedRequestResponseStreaming<proto::MyServiceRequest, proto::MyServiceResponse>(
grpc_context, service, &proto::MyService::AsyncService::RequestMyEndpoint,
std::move(request_handler));
from asio-grpc.
Yep seems fine, I would have probably written it without the std::function
since it doesn't save on template instantiations anyways. Maybe like so (untested):
#pragma once
#include <functional>
#include <string>
#include "agrpc/asio_grpc.hpp"
#include "boost/asio.hpp"
#include "boost/asio/experimental/promise.hpp"
#include "boost/asio/experimental/use_promise.hpp"
#include "grpc/grpc.h"
namespace my_namespace {
using OnDonePromise = boost::asio::experimental::promise<
void(), asio::system_executor>;
namespace detail {
template <class AsyncService, class AsyncRequestFn>
boost::asio::awaitable<void> ResponseStreamingRequestLoop(
agrpc::GrpcContext& grpc_context, AsyncService& service, AsyncRequestFn async_request_fn,
RequestHandler request_handler) {
grpc::ServerContext server_context;
OnDonePromise on_done =
agrpc::notify_when_done(grpc_context, server_context, boost::asio::experimental::use_promise);
Request request;
grpc::ServerAsyncWriter<Response> response_writer{&server_context};
const bool request_ok =
co_await agrpc::request(async_request_fn, service, server_context, request, response_writer,
boost::asio::use_awaitable);
if (!request_ok) {
// At this point, `agrpc::notify_when_done` will never complete.
grpc_context.work_finished();
co_return;
}
boost::asio::co_spawn(grpc_context,
ResponseStreamingRequestLoop(grpc_context, service,
async_request_fn, request_handler),
boost::asio::detached);
co_await request_handler(server_context, request, response_writer, std::move(on_done));
}
} // namespace detail
// This helper function provides the same functionality as agrpc::repeatedly_request, with the
// addition of passing an on_done promise to the callback. This enables the callback to check if the
// client has closed the connection.
template <class AsyncService, class AsyncRequestFn, class RequestHandler>
void RepeatedRequestResponseStreaming(
agrpc::GrpcContext& grpc_context, AsyncService& service, AsyncRequestFn async_request_fn,
RequestHandler request_handler) {
boost::asio::co_spawn(
grpc_context,
detail::ResponseStreamingRequestLoop(grpc_context, service, async_request_fn,
std::move(request_handler)),
boost::asio::detached);
}
// ...
auto request_handler =
[this](grpc::ServerContext& context, proto::MyServiceRequest& request,
grpc::ServerAsyncWriter<proto::MyServiceResponse>& response_writer,
OnDonePromise on_done) -> asio::awaitable<void> {
while (true) {
if (new_data_is_available) {
if (!co_await agrpc::write(response_writer, proto::MyServiceResponse{})) {
break;
}
}
grpc::Alarm alarm;
co_await agrpc::wait(alarm, std::chrono::system_clock::now() + std::chrono::seconds{1});
if (on_done.completed()) {
break;
}
}
co_await agrpc::finish(response_writer, grpc::Status::OK);
};
RepeatedRequestResponseStreaming(
grpc_context, service, &proto::MyService::AsyncService::RequestMyEndpoint,
std::move(request_handler));
} // namespace my_namespace
from asio-grpc.
This is great. Thanks again! Closing this issue.
from asio-grpc.
I tried getting rid of the Request and Response template args as you suggested, but I need them in the ResponseStreamingRequestLoop function still.
from asio-grpc.
Related Issues (20)
- Question: Why is it recommended to create one agrpc::GrpcContext per thread? HOT 3
- feature request: support cancel in agrpc::Alarm HOT 4
- [Question] About bench mark HOT 2
- deadlock when working with 2 event loops, asio-grpc loop never yeilds to foreign loop HOT 4
- [Question] : How to detect client disconnection HOT 18
- Questions about CPU usage and event multiplexing HOT 3
- GRPC_CALL_ERROR_TOO_MANY_OPERATIONS Error on Concurrent Write Operations HOT 2
- Lifetime issue in the docs HOT 2
- Example with small changes crashes HOT 6
- Why I get const correctness problems HOT 16
- Generic server example without manual buffer serializaiton? HOT 4
- Using asynchronous gRPC server and client on same context HOT 9
- Asio-gRPC seems to have TSAN warnings HOT 8
- Can I call ServerBuilder::BuildAndStart() after GrpcContext::run() HOT 2
- Questions on how to switch from an GrpcContext to io_context and back HOT 6
- How to shutdown grpc clients HOT 1
- Clarification Needed on Thread Context Switch in writer() Function (example streaming-server.cpp) HOT 8
- assertion failed: !started_ HOT 2
- Can I use asio-grpc inside an existing boost::asio application? HOT 7
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from asio-grpc.