Comments (9)
Hi, thanks for the detailed issue description.
You can run client and server on the same GrpcContext like you are doing. In fact, most asio-grpc tests do the same.
Note that grpc::Server::Shutdown()
is graceful, it won't cancel active requests. You will have to check for it yourself.
But the actual problem lies in this line:
auto result = co_await(agrpc::read(reader_writer, cmd) ||
alarm.wait(std::chrono::system_clock::now() + std::chrono::milliseconds(duration_msec)));
GRPC does not support cancellation of individual reads/writes and therefore neither does asio-grpc. I am currently working on a new API for clients and server (agrpc::ClientRPC
and agrpc::ServerRPC
) that could support the kind of "cancellation-safety" that you need for that line to work. I can imagine that each call to agrpc::read
should either initiate a new read or wait for the previous one to complete. There are some difficulties here though: What if the request parameter refers to a different object between those calls and should this allow multiple outstanding waits? Additionally, such a functionality is unrelated to gRPC and I would wish to point users of asio-grpc at an asio-based library that provides such functionality instead of having to maintain it within asio-grpc.
With all that being said though, I think your issue is the third one that stumbled across this problem of cancellation and it would seem that it is best to add this functionality to ClientRPC and ServerRPC, probably configurable through a traits type.
In the meantime you can use this experimental and not-quite-bug-free alternative based on agrpc::CancelSafe
/agrpc::BasicStream
:
// bring in the server shutdown functionality
example::ServerShutdown server_shutdown{*server, grpc_context};
// install a request handler for the bidirectional streaming endpoint
agrpc::repeatedly_request(&example::v1::Example::AsyncService::RequestBidirectionalStreaming, service,
asio::bind_executor(grpc_context,
[&](::grpc::ServerContext &server, ::grpc::ServerAsyncReaderWriter<example::v1::Response, example::v1::Request> &reader_writer) -> asio::awaitable<void>
{
std::cout << "server: started request\n";
// set up an alarm that is used to pace our responses to the client
agrpc::Alarm alarm(grpc_context);
int64_t duration_msec = 1000;
agrpc::GrpcStream stream{grpc_context};
example::v1::Request cmd;
stream.initiate(agrpc::read, reader_writer, cmd);
while (true)
{
using namespace asio::experimental::awaitable_operators;
// wait for the first of two events to happen:
//
// 1. we receive a new streaming message from the client
// 2. the timer expires
//
// when the timer expires, we send a message to the client.
std::cout << "server: reading/waiting\n";
auto result = co_await(stream.next() ||
alarm.wait(std::chrono::system_clock::now() + std::chrono::milliseconds(duration_msec)));
if (result.index() == 0)
{
if (std::get<0>(result))
{
std::cout << "server: got streaming message\n";
stream.initiate(agrpc::read, reader_writer, cmd);
}
else
{
std::cout << "server: read failed\n";
break;
}
}
else if (server_shutdown.is_shutdown)
{
break;
}
else
{
std::cout << "server: alarm expired\n";
example::v1::Response resp;
if (!co_await agrpc::write(reader_writer, resp))
{
// client disconnected
co_return;
}
}
}
co_await agrpc::finish(reader_writer, ::grpc::Status::OK);
}
));
from asio-grpc.
Thanks for the detailed answer! I will give that a try. A followup question: can you elaborate on how the approach is "not-quite-bug-free"? Just trying to determine whether there are any showstopping reasons why I wouldn't want to do this.
from asio-grpc.
I believe the main issue is that the completion of a read is not sticky within GrpcStream. If cancellation and successful completion of the read occur at the same time than cancellation takes precedence. The next call to stream.next()
will then never complete because no read has been initiated (code transitioned into the alarm expiration branch and did not call stream.initiate()
). But I think GrpcStream can be modified without breaking existing user and I will consider a hotfix release (2.6.1) if that helps your cause.
from asio-grpc.
That makes sense. If you know a reasonable fix for this, then I would welcome it; I think the GrpcStream
-based model as presented accomplishes what I am looking for.
from asio-grpc.
I re-added a previously flaky example as a unit test but I couldn't find any issues with it. Maybe it is just something specifically wrong with that example and not with GrpcStream/CancelSafe.
One thing that is missing from my code snippet above is the call to cleanup
:
co_await stream.cleanup();
co_await agrpc::finish(reader_writer, ::grpc::Status::OK);
which either waits for a previously initiated read to complete and completes immediately if none is pending. This is helpful because you must await all initiated reads before GrpcStream is being destructed.
from asio-grpc.
Thanks for the info. I have been working on what the process looks like for shutting down the server when a bidirectional streaming connection is open. I extended the example above to implement the shutdown logic using a saf::shared_future
. So my server loop now awaits three things: a read on the GrpcStream
, the Alarm
to indicate that it should send an update to the client, and the shutdown future becoming ready. This works nicely with ASIO's ||
operator, and my code can act accordingly to whichever of the three triggered the coroutine to resume.
I tried adding the call to stream.cleanup()
to my example and now my test hangs on the co_await stream.cleanup();
After thinking about it, this makes sense. The sequence of events is:
- Client initiates the RPC
- Server begins the wait loop above, periodically sending streaming messages. The key, though, is that there is always a
GrpcStream
read in flight on the server side. - At some point, I tell the server to shut down. This fires the shutdown
shared_future
, causing the server to break out of its loop. - It then calls
stream.cleanup()
. But, if I'm understanding your description above properly, this will cause the server to wait indefinitely for a new message from the client, thus thwarting my attempt to shut the server down.
I think this is what you were getting at when you said before that the read cannot be cancelled? Is there a clean way around this, so that the server can shut down without potentially being held hostage by clients that aren't aware of the shutdown? I would be fine with the client seeing an error on the RPC in this case, but I'm not sure what the best practice is for this sort of thing.
Alternatively, is it permissible to move the stream.cleanup()
after the agrpc::finish()
? I suppose that I could finish
the RPC on the server side so the client will know that there are no further streaming messages coming (basically the server-side analog to writes_done
). I assume that would cause the read
on the client side to complete with an error, which would allow the client to close its end of the connection, which would then cause the stream.cleanup()
on the server side to complete and thus finish the RPC. This all seems doable, as long as it would be OK to finish
the RPC from the server while there is still a pending read on its side of the stream.
from asio-grpc.
I have tested it and on Windows it seems to be fine to have an uncompleted read
while calling finish. Even though the official documentations might indicate otherwise:
Finish:
Request notification for when the server has sent the appropriate signals to the client to end the call. Should not be used concurrently with other operations.
Read:
It should not be called concurrently with other streaming APIs on the same stream.
It is true that the client-side read
will complete with false
when the server has called finished, like you said, the equivalent of writes_done
.
Alternatively you can also call ServerContext::TryCancel
before cleanup, this will cause read
to complete immediately with false
. The client will see StatusCode::CANCELLED
even if you call finish
(although this technically depends on what happens first).
I have created a complete example of the code here: https://github.com/Tradias/example-vcpkg-grpc/tree/asio-grpc-81. Might make it easier to refer to.
Did you know that asio also has a promise implementation https://www.boost.org/doc/libs/1_83_0/doc/html/boost_asio/reference/experimental__promise.html ? I don't think it can be shared however.
from asio-grpc.
Using ServerContext::TryCancel()
seems to be the way to go for me. In the event that the server shuts down, having the client see StatusCode::CANCELLED
makes semantic sense to me. I tested that change to my application and everything behaves as I would expect. I think this is a good solution to what I needed!
from asio-grpc.
Great, don't hesitate to open another issue if you have more questions!
from asio-grpc.
Related Issues (20)
- Question: Why is it recommended to create one agrpc::GrpcContext per thread? HOT 3
- feature request: support cancel in agrpc::Alarm HOT 4
- [Question] About bench mark HOT 2
- deadlock when working with 2 event loops, asio-grpc loop never yeilds to foreign loop HOT 4
- [Question] : How to detect client disconnection HOT 18
- Questions about CPU usage and event multiplexing HOT 3
- GRPC_CALL_ERROR_TOO_MANY_OPERATIONS Error on Concurrent Write Operations HOT 2
- Lifetime issue in the docs HOT 2
- Example with small changes crashes HOT 6
- Why I get const correctness problems HOT 16
- Generic server example without manual buffer serializaiton? HOT 4
- Asio-gRPC seems to have TSAN warnings HOT 8
- How to detect client closed connection HOT 6
- Can I call ServerBuilder::BuildAndStart() after GrpcContext::run() HOT 2
- Questions on how to switch from an GrpcContext to io_context and back HOT 6
- How to shutdown grpc clients HOT 1
- Clarification Needed on Thread Context Switch in writer() Function (example streaming-server.cpp) HOT 8
- assertion failed: !started_ HOT 2
- Can I use asio-grpc inside an existing boost::asio application? HOT 7
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from asio-grpc.