Comments (9)

Tradias commented on June 8, 2024

Hi, thanks for the detailed issue description.

You can run client and server on the same GrpcContext like you are doing. In fact, most asio-grpc tests do the same.

Note that grpc::Server::Shutdown() is graceful; it does not cancel active requests. You will have to check for shutdown yourself.

But the actual problem lies in this line:

    auto result = co_await (agrpc::read(reader_writer, cmd) ||
        alarm.wait(std::chrono::system_clock::now() + std::chrono::milliseconds(duration_msec)));

gRPC does not support cancellation of individual reads/writes, and therefore neither does asio-grpc. I am currently working on a new API for clients and servers (agrpc::ClientRPC and agrpc::ServerRPC) that could support the kind of "cancellation safety" you need for that line to work. I can imagine that each call to agrpc::read would either initiate a new read or wait for the previous one to complete. There are some difficulties, though: what if the request parameter refers to a different object between those calls, and should multiple outstanding waits be allowed? Additionally, such functionality is unrelated to gRPC, and I would rather point users of asio-grpc at an asio-based library that provides it than maintain it within asio-grpc.
With all that being said, yours is the third issue to stumble across this cancellation problem, so it seems best to add this functionality to ClientRPC and ServerRPC, probably configurable through a traits type.

In the meantime you can use this experimental and not-quite-bug-free alternative based on agrpc::CancelSafe/agrpc::GrpcStream:

    // bring in the server shutdown functionality
    example::ServerShutdown server_shutdown{*server, grpc_context};

    // install a request handler for the bidirectional streaming endpoint
    agrpc::repeatedly_request(&example::v1::Example::AsyncService::RequestBidirectionalStreaming, service,
        asio::bind_executor(grpc_context, 
        [&](::grpc::ServerContext &server, ::grpc::ServerAsyncReaderWriter<example::v1::Response, example::v1::Request> &reader_writer) -> asio::awaitable<void>
        {
            std::cout << "server: started request\n";

            // set up an alarm that is used to pace our responses to the client
            agrpc::Alarm alarm(grpc_context);
            int64_t duration_msec = 1000;

            agrpc::GrpcStream stream{grpc_context};
            example::v1::Request cmd;
            stream.initiate(agrpc::read, reader_writer, cmd);
                
            while (true)
            {
                using namespace asio::experimental::awaitable_operators;

                // wait for the first of two events to happen:
                // 
                // 1. we receive a new streaming message from the client
                // 2. the timer expires
                //
                // when the timer expires, we send a message to the client.
                std::cout << "server: reading/waiting\n";
                auto result = co_await(stream.next() ||
                    alarm.wait(std::chrono::system_clock::now() + std::chrono::milliseconds(duration_msec)));
                if (result.index() == 0)
                {
                    if (std::get<0>(result))
                    {
                        std::cout << "server: got streaming message\n";
                        stream.initiate(agrpc::read, reader_writer, cmd);
                    }
                    else
                    {
                        std::cout << "server: read failed\n";
                        break;
                    }
                }
                else if (server_shutdown.is_shutdown)
                {
                    break;
                }
                else
                {
                    std::cout << "server: alarm expired\n";
                    example::v1::Response resp;
                    if (!co_await agrpc::write(reader_writer, resp))
                    {
                        // client disconnected
                        co_return;
                    }
                }
            }

            co_await agrpc::finish(reader_writer, ::grpc::Status::OK);
        }
    ));

from asio-grpc.

otherjason commented on June 8, 2024

Thanks for the detailed answer! I will give that a try. A follow-up question: can you elaborate on how the approach is "not-quite-bug-free"? I'm just trying to determine whether there are any show-stopping reasons why I wouldn't want to do this.

Tradias commented on June 8, 2024

I believe the main issue is that the completion of a read is not sticky within GrpcStream. If cancellation and successful completion of the read occur at the same time, then cancellation takes precedence. The next call to stream.next() will then never complete, because no read has been initiated (the code transitioned into the alarm-expiration branch and did not call stream.initiate()). But I think GrpcStream can be modified without breaking existing users, and I will consider a hotfix release (2.6.1) if that helps your cause.

otherjason commented on June 8, 2024

That makes sense. If you know of a reasonable fix for this, I would welcome it; I think the GrpcStream-based model as presented accomplishes what I am looking for.

Tradias commented on June 8, 2024

I re-added a previously flaky example as a unit test, but I couldn't find any issues with it. Maybe something is specifically wrong with that example and not with GrpcStream/CancelSafe.

One thing that is missing from my code snippet above is the call to cleanup:

co_await stream.cleanup();
co_await agrpc::finish(reader_writer, ::grpc::Status::OK);

which waits for a previously initiated read to complete, or completes immediately if none is pending. This is important because all initiated reads must be awaited before the GrpcStream is destructed.

otherjason commented on June 8, 2024

Thanks for the info. I have been working out what the process looks like for shutting down the server while a bidirectional streaming connection is open. I extended the example above to implement the shutdown logic using a saf::shared_future. So my server loop now awaits three things: a read on the GrpcStream, the Alarm indicating that it should send an update to the client, and the shutdown future becoming ready. This works nicely with asio's || operator, and my code can act according to whichever of the three triggered the coroutine to resume.
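The three-way wait described above can be sketched roughly like this; `shutdown_requested()` is a stand-in for whatever awaitable the saf::shared_future exposes, and its exact API is an assumption here:

```cpp
// Sketch only: extends the server loop from the earlier snippet with a third
// alternative. shutdown_requested() is a hypothetical awaitable that becomes
// ready when shutdown is requested; the actual saf API is not shown.
using namespace asio::experimental::awaitable_operators;

auto result = co_await (
    stream.next()                                                   // 0: next message from the client
    || alarm.wait(std::chrono::system_clock::now()
                  + std::chrono::milliseconds(duration_msec))       // 1: pacing timer expired
    || shutdown_requested());                                       // 2: server shutting down

switch (result.index())
{
case 0: /* re-initiate the read, or break if it failed */ break;
case 1: /* write the periodic response to the client */   break;
case 2: /* leave the loop and clean up */                 break;
}
```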

I tried adding the call to stream.cleanup() to my example, and now my test hangs on the co_await stream.cleanup() call. After thinking about it, this makes sense. The sequence of events is:

  • Client initiates the RPC
  • Server begins the wait loop above, periodically sending streaming messages. The key, though, is that there is always a GrpcStream read in flight on the server side.
  • At some point, I tell the server to shut down. This fires the shutdown shared_future, causing the server to break out of its loop.
  • It then calls stream.cleanup(). But, if I'm understanding your description above properly, this will cause the server to wait indefinitely for a new message from the client, thus thwarting my attempt to shut the server down.

I think this is what you were getting at when you said before that the read cannot be cancelled? Is there a clean way around this, so that the server can shut down without potentially being held hostage by clients that aren't aware of the shutdown? I would be fine with the client seeing an error on the RPC in this case, but I'm not sure what the best practice is for this sort of thing.

Alternatively, is it permissible to move the stream.cleanup() after the agrpc::finish()? I suppose that I could finish the RPC on the server side so the client will know that there are no further streaming messages coming (basically the server-side analog to writes_done). I assume that would cause the read on the client side to complete with an error, which would allow the client to close its end of the connection, which would then cause the stream.cleanup() on the server side to complete and thus finish the RPC. This all seems doable, as long as it would be OK to finish the RPC from the server while there is still a pending read on its side of the stream.

Tradias commented on June 8, 2024

I have tested it, and on Windows it seems to be fine to have an uncompleted read while calling finish, even though the official documentation might indicate otherwise:

Finish:

Request notification for when the server has sent the appropriate signals to the client to end the call. Should not be used concurrently with other operations.

https://grpc.github.io/grpc/cpp/classgrpc_1_1_server_async_reader_writer_interface.html#ae63d347d413c401b1e976f165efde50b

Read:

It should not be called concurrently with other streaming APIs on the same stream.

https://grpc.github.io/grpc/cpp/classgrpc_1_1internal_1_1_async_reader_interface.html#adf87c602036d69158950b7299d0aae70

It is true that the client-side read will complete with false when the server has called finish; like you said, it is the equivalent of writes_done.

Alternatively, you can call ServerContext::TryCancel before cleanup; this causes the pending read to complete immediately with false. The client will then see StatusCode::CANCELLED even if you call finish (although this technically depends on which happens first).
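Put together, the shutdown path discussed here might look like the following sketch; variable names such as `server_context` are assumptions for illustration, not taken from the snippet above:

```cpp
// Sketch of the shutdown sequence: cancel the call so the in-flight read
// completes, drain the stream, then finish the RPC.
server_context.TryCancel();   // the pending agrpc::read completes with false
co_await stream.cleanup();    // returns promptly now that the read is done
co_await agrpc::finish(reader_writer, grpc::Status::OK);
// The client typically observes StatusCode::CANCELLED rather than OK,
// depending on whether the cancellation or the finish takes effect first.
```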

I have created a complete example of the code here: https://github.com/Tradias/example-vcpkg-grpc/tree/asio-grpc-81. Might make it easier to refer to.

Did you know that asio also has a promise implementation (https://www.boost.org/doc/libs/1_83_0/doc/html/boost_asio/reference/experimental__promise.html)? I don't think it can be shared, however.

otherjason commented on June 8, 2024

Using ServerContext::TryCancel() seems to be the way to go for me. In the event that the server shuts down, having the client see StatusCode::CANCELLED makes semantic sense to me. I tested that change to my application and everything behaves as I would expect. I think this is a good solution to what I needed!

Tradias commented on June 8, 2024

Great, don't hesitate to open another issue if you have more questions!
