Comments (13)

Tradias commented on July 24, 2024

You're welcome, I am glad you figured it out. Using asio::detached really is an anti-pattern. I highly recommend using something like https://github.com/Tradias/asio-grpc/blob/master/test/utils/utils/asio_utils.hpp#L45 instead; otherwise, exceptions thrown from a co_spawned coroutine just disappear unhandled.
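
For illustration, here is a minimal sketch of the kind of handler meant here, using only the standard library so that it compiles on its own. The names RethrowFirstArg, complete_with and make_error are made up for this sketch and are not taken from asio-grpc. complete_with merely stands in for the coroutine machinery, which invokes the completion handler with a std::exception_ptr that is null on success.

```cpp
#include <exception>
#include <stdexcept>
#include <string>

// Hypothetical completion handler: rethrows the exception it receives
// instead of silently discarding it the way asio::detached does.
struct RethrowFirstArg
{
    template <class... Args>
    void operator()(std::exception_ptr ep, Args&&...) const
    {
        if (ep)
        {
            std::rethrow_exception(ep);
        }
    }
};

// Stand-in for the coroutine machinery (an assumption of this sketch):
// it calls the handler with the captured exception, or null on success,
// and reports what came out of the handler.
template <class Handler>
std::string complete_with(Handler handler, std::exception_ptr ep)
{
    try
    {
        handler(ep);
        return "no error";
    }
    catch (const std::exception& e)
    {
        return e.what();
    }
}

// Helper that builds an exception_ptr holding a std::runtime_error.
inline std::exception_ptr make_error(const char* message)
{
    return std::make_exception_ptr(std::runtime_error(message));
}
```

With Boost.Asio, an object like this would be passed as the last argument to asio::co_spawn in place of asio::detached, so that an exception escaping the coroutine is rethrown from the handler and should surface from the call to run() instead of being dropped.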

from asio-grpc.

Tradias commented on July 24, 2024

If you are using the GrpcContext for a server then you must call ServerShutdown::shutdown before destructing it. There should also be an assertion triggered in gRPC if you forget.

criatura2 commented on July 24, 2024

Yes, I do call this function and can see in my logs that Shutdown() returns:

    if (!m_is_shutdown.exchange(true)) {                                                              
        m_shutdown_thread = std::thread([this] {                                                                                  
            m_signals.cancel();                                                                       
            m_server->Shutdown();                                                                     
            std::cout << "I see this line" << std::endl;                                  
        });                                                                                           
    }

and I also see this line

    m_grpc_context->run();                                                                            
    std::cout << "I see this as well" << std::endl;

AFAICS, there is nothing else remaining that could block the destructor, but when I call m_grpc_context.reset() it gets blocked. I am using a std::unique_ptr just to have the ability to explicitly call the destructor and check if it is blocking.

I could also find this link to a similar problem: https://groups.google.com/g/grpc-io/c/ly-JwdzS1EM

criatura2 commented on July 24, 2024

The GrpcContext destructor is blocked here

detail::drain_completion_queue(*this);

I have modified this code to print the number of iterations:

inline void drain_completion_queue(agrpc::GrpcContext& grpc_context)
{
    while (detail::GrpcContextImplementation::do_one(grpc_context, detail::GrpcContextImplementation::INFINITE_FUTURE,
                                                     detail::InvokeHandler::NO, detail::AlwaysFalsePredicate{}))
    {
        std::cout << "------> in the loop" << std::endl;
    }
}

and get the following output

------> in the loop
------> in the loop
------> in the loop
------> in the loop
------> in the loop
------> in the loop
------> in the loop
------> in the loop
------> in the loop
------> in the loop
------> in the loop

and then it hangs. I will try to determine where exactly it gets blocked in that function.

Tradias commented on July 24, 2024

Ok thanks, I don't think it helps to dig deeper into asio-grpc code. You should try to compare the examples with your own code. The order of destruction (GrpcContext first, then grpc::Server) is also important.
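
To illustrate why the destruction order depends on how the class is written, here is a standard-library-only sketch. FakeServer and FakeGrpcContext are hypothetical stand-ins that only record when they are destroyed; they are not gRPC or asio-grpc types. C++ destroys members in reverse declaration order, and a destructor body runs before any member is destroyed, so an explicit reset() there changes the effective order.

```cpp
#include <memory>
#include <string>
#include <vector>

// Stand-in for grpc::Server: logs its own destruction.
struct FakeServer
{
    explicit FakeServer(std::vector<std::string>& l) : log(l) {}
    ~FakeServer() { log.push_back("server"); }
    std::vector<std::string>& log;
};

// Stand-in for agrpc::GrpcContext: logs its own destruction.
struct FakeGrpcContext
{
    explicit FakeGrpcContext(std::vector<std::string>& l) : log(l) {}
    ~FakeGrpcContext() { log.push_back("grpc_context"); }
    std::vector<std::string>& log;
};

// Members are destroyed in reverse declaration order, so the context
// (declared last) is destroyed before the server.
struct ContextFirst
{
    explicit ContextFirst(std::vector<std::string>& log)
        : server{std::make_unique<FakeServer>(log)}, context{log} {}
    std::unique_ptr<FakeServer> server;
    FakeGrpcContext context;
};

// Same members, but the destructor body resets the server explicitly.
// The body runs before member destruction, so the server goes first.
struct ServerFirst
{
    explicit ServerFirst(std::vector<std::string>& log)
        : server{std::make_unique<FakeServer>(log)}, context{log} {}
    ~ServerFirst() { server.reset(); }
    std::unique_ptr<FakeServer> server;
    FakeGrpcContext context;
};

// Creates and destroys an Owner, returning the recorded destruction order.
template <class Owner>
std::vector<std::string> destruction_order()
{
    std::vector<std::string> log;
    {
        Owner owner{log};
    }
    return log;
}
```

destruction_order<ContextFirst>() records the context before the server, which matches the order recommended above, while destruction_order<ServerFirst>() records the opposite. It may be worth checking which of the two shapes a given class actually has.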

criatura2 commented on July 24, 2024

I am pretty sure I am doing the shutdown exactly as you do in your examples, but it still blocks. Could this be caused by having two GrpcContexts in the same program, where one is the server and the other is a client? This is needed here to implement an automated integration test.

Tradias commented on July 24, 2024

Most asio-grpc tests also use two GrpcContexts, just as you described. The shutdown order is rather tricky; you can see it here: https://github.com/Tradias/asio-grpc/blob/master/test/utils/utils/grpc_client_server_test.cpp

criatura2 commented on July 24, 2024

This is the code I have:

class MockServices {
public:
    MockServices(char const* address = mock::default_server_address)
        : m_grpc_context { m_builder.AddCompletionQueue() }
    {
        m_builder.AddListeningPort(address, grpc::InsecureServerCredentials());
        // Calls m_builder.RegisterService for many services.
        m_server = m_builder.BuildAndStart();
    }

    ~MockServices()
    {
        if (m_shutdown_thread.joinable()) {
            m_shutdown_thread.join();
        } else if (!m_is_shutdown.exchange(true)) {
            m_server->Shutdown();
        }

        m_server.reset();
    }

    void run()
    {
        m_grpc_context.run();
    }

    void stop()
    {
        if (!m_is_shutdown.exchange(true)) {
            m_shutdown_thread = std::thread([this] {
                m_server->Shutdown();
            });
        }
    }
    ...

private:
    std::unique_ptr<grpc::Server> m_server;
    grpc::ServerBuilder m_builder;
    agrpc::GrpcContext m_grpc_context;
    // Here are our services which we use in the constructor.
    std::atomic_bool m_is_shutdown { false };
    std::thread m_shutdown_thread;
};

The run() function runs in one thread while stop() is called from another thread. Does this look correct?
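
The guard in stop() and in the destructor above relies on std::atomic<bool>::exchange returning the previous value, so only the first caller performs the shutdown. Below is a minimal sketch of that idiom in isolation, assuming a hypothetical ShutdownOnce class that counts how often the guarded step runs instead of calling grpc::Server::Shutdown.

```cpp
#include <atomic>

// Hypothetical stand-in for a resource whose shutdown step must run at most once.
class ShutdownOnce
{
  public:
    // Returns true only for the caller that actually performed the shutdown.
    bool shutdown()
    {
        // exchange(true) atomically sets the flag and returns the previous
        // value, so only the first caller (even among concurrent ones) sees false.
        if (!m_is_shutdown.exchange(true))
        {
            ++m_shutdown_calls;  // the real code would call m_server->Shutdown() here
            return true;
        }
        return false;
    }

    int shutdown_calls() const { return m_shutdown_calls.load(); }

  private:
    std::atomic_bool m_is_shutdown{false};
    std::atomic<int> m_shutdown_calls{0};
};

// Mimics stop() followed by the destructor path: both attempt the shutdown,
// and the function reports how many times the guarded step ran.
inline int shutdown_twice()
{
    ShutdownOnce resource;
    resource.shutdown();  // first attempt, e.g. from stop()
    resource.shutdown();  // second attempt, e.g. from the destructor; a no-op
    return resource.shutdown_calls();
}
```

The sketch only covers the run-once property of the flag. It says nothing about whether the shutdown itself completes, which depends on the state of the completion queue.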

Tradias commented on July 24, 2024

I still cannot reproduce the issue. Here is the code I tried:

// Copyright 2023 Dennis Hezel
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "awaitable_client_rpc.hpp"
#include "awaitable_server_rpc.hpp"
#include "helloworld/helloworld.grpc.pb.h"
#include "helper.hpp"

#include <agrpc/asio_grpc.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/signal_set.hpp>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>

#include <atomic>
#include <iostream>
#include <memory>
#include <string>
#include <thread>

namespace asio = boost::asio;

class MockServices
{
  public:
    MockServices(char const* address) : m_grpc_context{m_builder.AddCompletionQueue()}
    {
        m_builder.AddListeningPort(address, grpc::InsecureServerCredentials());
        m_builder.RegisterService(&service);
        m_server = m_builder.BuildAndStart();
    }

    ~MockServices()
    {
        if (m_shutdown_thread.joinable())
        {
            m_shutdown_thread.join();
        }
        else if (!m_is_shutdown.exchange(true))
        {
            m_server->Shutdown();
        }

        m_server.reset();
    }

    void run() { m_grpc_context.run(); }

    void stop()
    {
        if (!m_is_shutdown.exchange(true))
        {
            m_shutdown_thread = std::thread(
                [this]
                {
                    m_server->Shutdown();
                });
        }
    }

    std::unique_ptr<grpc::Server> m_server;
    grpc::ServerBuilder m_builder;
    agrpc::GrpcContext m_grpc_context;
    helloworld::Greeter::AsyncService service;
    std::atomic_bool m_is_shutdown{false};
    std::thread m_shutdown_thread;
};

int main(int argc, const char** argv)
{
    const auto port = argc >= 2 ? argv[1] : "50051";
    const auto host = std::string("localhost:") + port;

    grpc::Status status;

    helloworld::Greeter::Stub stub{grpc::CreateChannel(host, grpc::InsecureChannelCredentials())};
    agrpc::GrpcContext grpc_context;

    MockServices s{"0.0.0.0:50051"};

    using RPC = example::AwaitableServerRPC<&helloworld::Greeter::AsyncService::RequestSayHello>;
    agrpc::register_awaitable_rpc_handler<RPC>(
        s.m_grpc_context, s.service,
        [&](RPC& rpc, RPC::Request& request) -> asio::awaitable<void>
        {
            helloworld::HelloReply response;
            response.set_message("Hello " + request.name());
            co_await rpc.finish(response, grpc::Status::OK);
        },
        asio::detached);

    std::thread t{[&]
                  {
                      s.run();
                  }};

    asio::co_spawn(
        grpc_context,
        [&]() -> asio::awaitable<void>
        {
            using CRPC = example::AwaitableClientRPC<&helloworld::Greeter::Stub::PrepareAsyncSayHello>;
            grpc::ClientContext client_context;
            helloworld::HelloRequest request;
            request.set_name("world");
            helloworld::HelloReply response;
            status = co_await CRPC::request(grpc_context, stub, client_context, request, response);
            std::cout << status.ok() << " response: " << response.message() << std::endl;
            s.stop();
        },
        asio::detached);

    grpc_context.run();

    abort_if_not(status.ok());
    t.join();
}

criatura2 commented on July 24, 2024

My question was more about whether the shutdown in the code I posted looks correct. The hang occurs only under certain conditions, so it is hard to reproduce. Is there any other way we could force it to exit? This is causing major headaches here.

Tradias commented on July 24, 2024

Your code seems fine; my reproducer shows that it shuts down cleanly.

I have some more hints:

  • Unfinished Alarms cause the GrpcContext to hang during shutdown (until the Alarm fires).
  • Calling m_grpc_context.stop() while there are unfinished RPCs causes m_server->Shutdown() to never complete. This can be resolved by calling m_grpc_context.run() again or by creating sufficient asio::work_guards.

criatura2 commented on July 24, 2024

If I store the GrpcContext in a std::optional to force destruction, I get the following output on reset:

=================================================================
==248854==ERROR: AddressSanitizer: heap-use-after-free on address 0x612000005bc0 at pc 0x560c77614e3f bp 0x7ffda6cd2090 sp 0x7ffda6cd2080
READ of size 8 at 0x612000005bc0 thread T0
    #0 0x560c77614e3e in grpc::ServerInterface::RegisteredAsyncRequest::FinalizeResult(void**, bool*) /opt/cinemo/bifrost/bif-device-sdk/_deps/grpc-x86_64-linux-gnu-src/include/grpcpp/server_interface.h:212
    #1 0x560c77c3dd6b in grpc::ServerInterface::PayloadAsyncRequest<bif::device::v1::PlayRequest>::FinalizeResult(void**, bool*) /opt/cinemo/bifrost/bif-device-sdk/_deps/grpc-x86_64-linux-gnu-src/include/grpcpp/server_interface.h:289
    #2 0x560c78cc5642 in grpc::CompletionQueue::AsyncNextInternal(void**, bool*, gpr_timespec) (/opt/cinemo/bifrost/bif-device-sdk/src/bifrost/tests/bifrost_tests+0x1c15642)
    #3 0x560c7759c6d0 in grpc::CompletionQueue::NextStatus grpc::CompletionQueue::AsyncNext<gpr_timespec>(void**, bool*, gpr_timespec const&) /opt/cinemo/bifrost/bif-device-sdk/_deps/grpc-x86_64-linux-gnu-src/include/grpcpp/completion_queue.h:200
    #4 0x560c77596d9e in agrpc::s::detail::get_next_event(grpc::CompletionQueue*, agrpc::s::detail::GrpcCompletionQueueEvent&, gpr_timespec) (/opt/cinemo/bifrost/bif-device-sdk/src/bifrost/tests/bifrost_tests+0x4e6d9e)
    #5 0x560c77596f00 in agrpc::s::detail::GrpcContextImplementation::handle_next_completion_queue_event(agrpc::s::GrpcContext&, gpr_timespec, agrpc::s::detail::InvokeHandler) (/opt/cinemo/bifrost/bif-device-sdk/src/bifrost/tests/bifrost_tests+0x4e6f00)
    #6 0x560c7759b04b in bool agrpc::s::detail::GrpcContextImplementation::do_one<agrpc::s::detail::AlwaysFalsePredicate>(agrpc::s::GrpcContext&, gpr_timespec, agrpc::s::detail::InvokeHandler, agrpc::s::detail::AlwaysFalsePredicate) /opt/cinemo/bifrost/bif-device-sdk/_deps/asio_grpc-src/src/agrpc/detail/grpc_context_implementation.ipp:210
    #7 0x560c77595cbc in agrpc::s::detail::drain_completion_queue(agrpc::s::GrpcContext&) /opt/cinemo/bifrost/bif-device-sdk/_deps/asio_grpc-src/src/agrpc/detail/grpc_context.ipp:45
    #8 0x560c775960b9 in agrpc::s::GrpcContext::~GrpcContext() /opt/cinemo/bifrost/bif-device-sdk/_deps/asio_grpc-src/src/agrpc/detail/grpc_context.ipp:74
    #9 0x560c77aa8ad9 in std::_Optional_payload_base<agrpc::s::GrpcContext>::_M_destroy() /usr/include/c++/11/optional:260
    #10 0x560c77aa02c5 in std::_Optional_payload_base<agrpc::s::GrpcContext>::_M_reset() /usr/include/c++/11/optional:280
    #11 0x560c77a94d4b in std::_Optional_payload<agrpc::s::GrpcContext, false, false, false>::~_Optional_payload() /usr/include/c++/11/optional:401
    #12 0x560c77a8ec91 in std::_Optional_base<agrpc::s::GrpcContext, false, false>::~_Optional_base() /usr/include/c++/11/optional:472
    #13 0x560c77a8ecb1 in std::optional<agrpc::s::GrpcContext>::~optional() /usr/include/c++/11/optional:662

criatura2 commented on July 24, 2024

"Calling m_grpc_context.stop() while there are unfinished RPCs causes m_server->Shutdown() to never complete. This can be resolved by calling m_grpc_context.run() again or by creating sufficient asio::work_guards."

This is gold. I first added stop() because nothing else seemed to work. After removing it yesterday, I found that an uncaught exception thrown from a co_spawn'ed task was somehow blocking the shutdown in the code above. This happens if you use asio::detached as the completion token in co_spawn.

Thanks for your help.
