Comments (13)
You're welcome, I am glad you figured it out. Using asio::detached really is an anti-pattern. I highly recommend using something like https://github.com/Tradias/asio-grpc/blob/master/test/utils/utils/asio_utils.hpp#L45 instead; otherwise, exceptions from co_spawned coroutines just disappear unhandled.
from asio-grpc.
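For readers following along, the idea behind such a helper can be sketched without Asio at all. The block below is a minimal sketch, assuming the completion signature that co_spawn uses for an awaitable<void>, i.e. void(std::exception_ptr). The name RethrowFirstArg and the complete_with function are illustrative and are not claimed to match the linked file.

```cpp
#include <exception>
#include <stdexcept>
#include <string>

// Completion handler that rethrows the exception a coroutine finished with,
// instead of silently dropping it the way asio::detached does.
struct RethrowFirstArg
{
    template <class... Args>
    void operator()(std::exception_ptr ep, Args&&...) const
    {
        if (ep)
        {
            std::rethrow_exception(ep);
        }
    }
};

// Illustrative stand-in for the moment a coroutine completes: the completion
// handler is invoked with the stored exception (or a null pointer on success).
inline std::string complete_with(std::exception_ptr ep)
{
    try
    {
        RethrowFirstArg{}(ep);
        return "no exception";
    }
    catch (const std::exception& e)
    {
        return e.what();
    }
}
```

Passed in place of asio::detached, e.g. asio::co_spawn(grpc_context, my_coroutine(), RethrowFirstArg{}) with a hypothetical my_coroutine, the handler should rethrow the coroutine's exception from the run() call that invokes it, where it can be caught or at least fail visibly.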
If you are using the GrpcContext for a server then you must call ServerShutdown::shutdown before destructing it. There should also be an assertion triggered in gRPC if you forget.
from asio-grpc.
Yes, I do call this function and can see in my logs that Shutdown() returns:
if (!m_is_shutdown.exchange(true)) {
    m_shutdown_thread = std::thread([this] {
        m_signals.cancel();
        m_server->Shutdown();
        std::cout << "I see this line" << std::endl;
    });
}
and I also see this line:
m_grpc_context->run();
std::cout << "I see this as well" << std::endl;
AFAICS, there is nothing else remaining that could block the destructor, but when I call m_grpc_context.reset() it gets blocked. I am using a std::unique_ptr just to have the ability to explicitly call the destructor and check if it is blocking.
I could also find this link to a similar problem: https://groups.google.com/g/grpc-io/c/ly-JwdzS1EM
from asio-grpc.
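In an automated test, a blocking destructor can also be detected with a timeout rather than by watching logs. The following is a minimal sketch using only the standard library; destroys_within and SlowToDestroy are hypothetical names introduced here, and a GrpcContext held in a std::unique_ptr would take the place of the stand-in type.

```cpp
#include <chrono>
#include <future>
#include <memory>
#include <thread>
#include <utility>

// Destroys the object on a helper thread and reports whether the destructor
// returned before the timeout. Takes ownership so that a hung helper thread
// never touches state owned by the caller.
template <class T>
bool destroys_within(std::unique_ptr<T> object, std::chrono::milliseconds timeout)
{
    auto done = std::make_shared<std::promise<void>>();
    std::future<void> finished = done->get_future();
    std::thread worker(
        [object = std::move(object), done]() mutable
        {
            object.reset();  // runs ~T(), possibly blocking
            done->set_value();
        });
    const bool ok = finished.wait_for(timeout) == std::future_status::ready;
    if (ok)
    {
        worker.join();
    }
    else
    {
        worker.detach();  // diagnostics only: the thread stays stuck in ~T()
    }
    return ok;
}

// Hypothetical stand-in whose destructor blocks for a configurable time.
struct SlowToDestroy
{
    explicit SlowToDestroy(std::chrono::milliseconds d) : delay(d) {}
    ~SlowToDestroy() { std::this_thread::sleep_for(delay); }
    std::chrono::milliseconds delay;
};
```

Detaching the stuck thread is only acceptable for diagnostics: the test can then fail with a clear message instead of hanging the whole run.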
The GrpcContext destructor is blocked here:
detail::drain_completion_queue(*this);
I have modified this code to print the number of iterations:
inline void drain_completion_queue(agrpc::GrpcContext& grpc_context)
{
    while (detail::GrpcContextImplementation::do_one(grpc_context, detail::GrpcContextImplementation::INFINITE_FUTURE,
                                                     detail::InvokeHandler::NO, detail::AlwaysFalsePredicate{}))
    {
        std::cout << "------> in the loop" << std::endl;
    }
}
and get the following output
------> in the loop
------> in the loop
------> in the loop
------> in the loop
------> in the loop
------> in the loop
------> in the loop
------> in the loop
------> in the loop
------> in the loop
------> in the loop
and then it hangs. I will try to determine where exactly it gets blocked in that function.
from asio-grpc.
Ok thanks, I don't think it helps to dig deeper into asio-grpc code. You should try to compare the examples with your own code. The order of destruction (GrpcContext first, then grpc::Server) is also important.
from asio-grpc.
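The destruction order mentioned above follows from a plain C++ rule: non-static data members are destroyed in reverse order of declaration. A minimal sketch with hypothetical stand-in types (FakeServer and FakeGrpcContext are not real gRPC or asio-grpc classes) shows that declaring the server before the context makes the context go away first:

```cpp
#include <string>
#include <vector>

// Records the order in which the stand-in objects are destroyed.
inline std::vector<std::string>& destruction_log()
{
    static std::vector<std::string> log;
    return log;
}

// Hypothetical stand-ins for grpc::Server and agrpc::GrpcContext.
struct FakeServer
{
    ~FakeServer() { destruction_log().push_back("server"); }
};

struct FakeGrpcContext
{
    ~FakeGrpcContext() { destruction_log().push_back("grpc_context"); }
};

struct Services
{
    FakeServer server;             // declared first, destroyed last
    FakeGrpcContext grpc_context;  // declared last, destroyed first
};

// Creates and destroys a Services object and returns the observed order.
inline std::vector<std::string> observed_destruction_order()
{
    destruction_log().clear();
    {
        Services services;
    }
    return destruction_log();
}
```

Note that anything done explicitly in a destructor body, such as resetting a std::unique_ptr member, happens before this implicit member destruction.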
I am pretty sure I am doing the shutdown exactly as you do in your examples, but it still blocks. Could this be caused by having two GrpcContexts in the same program, where one is the server and the other is a client? This is needed here to implement an automated integration test.
from asio-grpc.
Most asio-grpc tests also use two GrpcContexts, just as you described. The shutdown order is rather tricky; you can see it here: https://github.com/Tradias/asio-grpc/blob/master/test/utils/utils/grpc_client_server_test.cpp
from asio-grpc.
This is the code I have:
class MockServices {
public:
    MockServices(char const* address = mock::default_server_address)
        : m_grpc_context { m_builder.AddCompletionQueue() }
    {
        m_builder.AddListeningPort(address, grpc::InsecureServerCredentials());
        // Calls m_builder.RegisterService for many services.
        m_server = m_builder.BuildAndStart();
    }
    ~MockServices()
    {
        if (m_shutdown_thread.joinable()) {
            m_shutdown_thread.join();
        } else if (!m_is_shutdown.exchange(true)) {
            m_server->Shutdown();
        }
        m_server.reset();
    }
    void run()
    {
        m_grpc_context.run();
    }
    void stop()
    {
        if (!m_is_shutdown.exchange(true)) {
            m_shutdown_thread = std::thread([this] {
                m_server->Shutdown();
            });
        }
    }
    ...
private:
    std::unique_ptr<grpc::Server> m_server;
    grpc::ServerBuilder m_builder;
    agrpc::GrpcContext m_grpc_context;
    // Here are our services which we use in the constructor.
    std::atomic_bool m_is_shutdown { false };
    std::thread m_shutdown_thread;
};
The run() function runs in one thread while stop() is called from another thread. Does this look correct?
from asio-grpc.
I still cannot reproduce the issue. Here is the code I tried:
// Copyright 2023 Dennis Hezel
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "awaitable_client_rpc.hpp"
#include "awaitable_server_rpc.hpp"
#include "helloworld/helloworld.grpc.pb.h"
#include "helper.hpp"
#include <agrpc/asio_grpc.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/signal_set.hpp>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <atomic>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
namespace asio = boost::asio;
class MockServices
{
  public:
    MockServices(char const* address) : m_grpc_context{m_builder.AddCompletionQueue()}
    {
        m_builder.AddListeningPort(address, grpc::InsecureServerCredentials());
        m_builder.RegisterService(&service);
        m_server = m_builder.BuildAndStart();
    }
    ~MockServices()
    {
        if (m_shutdown_thread.joinable())
        {
            m_shutdown_thread.join();
        }
        else if (!m_is_shutdown.exchange(true))
        {
            m_server->Shutdown();
        }
        m_server.reset();
    }
    void run() { m_grpc_context.run(); }
    void stop()
    {
        if (!m_is_shutdown.exchange(true))
        {
            m_shutdown_thread = std::thread(
                [this]
                {
                    m_server->Shutdown();
                });
        }
    }
    std::unique_ptr<grpc::Server> m_server;
    grpc::ServerBuilder m_builder;
    agrpc::GrpcContext m_grpc_context;
    helloworld::Greeter::AsyncService service;
    std::atomic_bool m_is_shutdown{false};
    std::thread m_shutdown_thread;
};
int main(int argc, const char** argv)
{
    const auto port = argc >= 2 ? argv[1] : "50051";
    const auto host = std::string("localhost:") + port;
    grpc::Status status;
    helloworld::Greeter::Stub stub{grpc::CreateChannel(host, grpc::InsecureChannelCredentials())};
    agrpc::GrpcContext grpc_context;
    MockServices s{"0.0.0.0:50051"};
    using RPC = example::AwaitableServerRPC<&helloworld::Greeter::AsyncService::RequestSayHello>;
    agrpc::register_awaitable_rpc_handler<RPC>(
        s.m_grpc_context, s.service,
        [&](RPC& rpc, RPC::Request& request) -> asio::awaitable<void>
        {
            helloworld::HelloReply response;
            response.set_message("Hello " + request.name());
            co_await rpc.finish(response, grpc::Status::OK);
        },
        asio::detached);
    std::thread t{[&]
                  {
                      s.run();
                  }};
    asio::co_spawn(
        grpc_context,
        [&]() -> asio::awaitable<void>
        {
            using CRPC = example::AwaitableClientRPC<&helloworld::Greeter::Stub::PrepareAsyncSayHello>;
            grpc::ClientContext client_context;
            helloworld::HelloRequest request;
            request.set_name("world");
            helloworld::HelloReply response;
            status = co_await CRPC::request(grpc_context, stub, client_context, request, response);
            std::cout << status.ok() << " response: " << response.message() << std::endl;
            s.stop();
        },
        asio::detached);
    grpc_context.run();
    abort_if_not(status.ok());
    t.join();
}
from asio-grpc.
My question was more about whether the shutdown in the code I posted looks correct. It occurs only under certain conditions so it is hard to reproduce. Is there any other way we could use to force it to exit? This is causing major headaches here.
from asio-grpc.
Your code seems fine; from my reproducer you can see that it shuts down cleanly.
I have some more hints:
- Unfinished Alarms cause the GrpcContext to hang during shutdown (until the Alarm fires).
- Calling m_grpc_context.stop() while there are unfinished RPCs causes m_server->Shutdown(); to never complete. This can be resolved by calling m_grpc_context.run() again or by creating sufficient asio::work_guards.
from asio-grpc.
If I store the grpc-context in an optional to force destruction I get the following output on reset
=================================================================
==248854==ERROR: AddressSanitizer: heap-use-after-free on address 0x612000005bc0 at pc 0x560c77614e3f bp 0x7ffda6cd2090 sp 0x7ffda6cd2080
READ of size 8 at 0x612000005bc0 thread T0
#0 0x560c77614e3e in grpc::ServerInterface::RegisteredAsyncRequest::FinalizeResult(void**, bool*) /opt/cinemo/bifrost/bif-device-sdk/_deps/grpc-x86_64-linux-gnu-src/include/grpcpp/server_interface.h:212
#1 0x560c77c3dd6b in grpc::ServerInterface::PayloadAsyncRequest<bif::device::v1::PlayRequest>::FinalizeResult(void**, bool*) /opt/cinemo/bifrost/bif-device-sdk/_deps/grpc-x86_64-linux-gnu-src/include/grpcpp/server_interface.h:289
#2 0x560c78cc5642 in grpc::CompletionQueue::AsyncNextInternal(void**, bool*, gpr_timespec) (/opt/cinemo/bifrost/bif-device-sdk/src/bifrost/tests/bifrost_tests+0x1c15642)
#3 0x560c7759c6d0 in grpc::CompletionQueue::NextStatus grpc::CompletionQueue::AsyncNext<gpr_timespec>(void**, bool*, gpr_timespec const&) /opt/cinemo/bifrost/bif-device-sdk/_deps/grpc-x86_64-linux-gnu-src/include/grpcpp/completion_queue.h:200
#4 0x560c77596d9e in agrpc::s::detail::get_next_event(grpc::CompletionQueue*, agrpc::s::detail::GrpcCompletionQueueEvent&, gpr_timespec) (/opt/cinemo/bifrost/bif-device-sdk/src/bifrost/tests/bifrost_tests+0x4e6d9e)
#5 0x560c77596f00 in agrpc::s::detail::GrpcContextImplementation::handle_next_completion_queue_event(agrpc::s::GrpcContext&, gpr_timespec, agrpc::s::detail::InvokeHandler) (/opt/cinemo/bifrost/bif-device-sdk/src/bifrost/tests/bifrost_tests+0x4e6f00)
#6 0x560c7759b04b in bool agrpc::s::detail::GrpcContextImplementation::do_one<agrpc::s::detail::AlwaysFalsePredicate>(agrpc::s::GrpcContext&, gpr_timespec, agrpc::s::detail::InvokeHandler, agrpc::s::detail::AlwaysFalsePredicate) /opt/cinemo/bifrost/bif-device-sdk/_deps/asio_grpc-src/src/agrpc/detail/grpc_context_implementation.ipp:210
#7 0x560c77595cbc in agrpc::s::detail::drain_completion_queue(agrpc::s::GrpcContext&) /opt/cinemo/bifrost/bif-device-sdk/_deps/asio_grpc-src/src/agrpc/detail/grpc_context.ipp:45
#8 0x560c775960b9 in agrpc::s::GrpcContext::~GrpcContext() /opt/cinemo/bifrost/bif-device-sdk/_deps/asio_grpc-src/src/agrpc/detail/grpc_context.ipp:74
#9 0x560c77aa8ad9 in std::_Optional_payload_base<agrpc::s::GrpcContext>::_M_destroy() /usr/include/c++/11/optional:260
#10 0x560c77aa02c5 in std::_Optional_payload_base<agrpc::s::GrpcContext>::_M_reset() /usr/include/c++/11/optional:280
#11 0x560c77a94d4b in std::_Optional_payload<agrpc::s::GrpcContext, false, false, false>::~_Optional_payload() /usr/include/c++/11/optional:401
#12 0x560c77a8ec91 in std::_Optional_base<agrpc::s::GrpcContext, false, false>::~_Optional_base() /usr/include/c++/11/optional:472
#13 0x560c77a8ecb1 in std::optional<agrpc::s::GrpcContext>::~optional() /usr/include/c++/11/optional:662
from asio-grpc.
Calling m_grpc_context.stop() while there are unfinished RPCs causes m_server->Shutdown(); to never complete. This can be resolved by calling m_grpc_context.run() again or by creating sufficient asio::work_guards.
This is gold. I first added stop() because nothing else seemed to work. After removing it yesterday I happened to find there was an uncaught exception being thrown from a co_spawn'ed task that somehow blocked the shutdown in the code above. It happens if you use asio::detached as the token in co_spawn.
Thanks for your help.
from asio-grpc.