Comments (6)
Thank you very much for the detailed information. Give me a chance to review the information and I'll get back to you.
from rethinkdb.driver.
So, after some study, I think the issue arises because the CancellationToken is used twice. See below:
    async void RunChanges(CancellationToken ct)
    {
        Console.WriteLine("RunChanges: called");
        Cursor<RethinkDb.Driver.Model.Change<JObject>> changes = null;
        try
        {
            changes = await R.Db("rethinkdb").Table("jobs")
                .Changes().OptArg("include_initial", "true")
                .RunChangesAsync<JObject>(conn, ct);   // <<<-- ONCE HERE
            while (await changes.MoveNextAsync(ct))    // <<<-- TWICE HERE
            {
                Console.WriteLine("RunChanges: got a change");
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("RunChanges: op canceled");
        }
        finally
        {
            Console.WriteLine("RunChanges: finally");
            changes?.Close();
            Console.WriteLine("RunChanges: changes cursor closed");
        }
        Console.WriteLine("RunChanges: returning");
    }
- Once, here:
    .RunChangesAsync<JObject>(conn, ct);
- A second time, here:
    while (await changes.MoveNextAsync(ct))
As far as I can tell, here's what happens:
- The Main() thread (TID:M) starts up.
- A connection to the server is established.
- A background response pump thread (TID:B) is created and blocks on the receive side of the network socket.
- TID:M calls RunChanges.
- TID:M creates and sends the query over the network:
    R.Db("rethinkdb").Table("jobs")
        .Changes().OptArg("include_initial", "true")
        .RunChangesAsync<JObject>(conn, ct);
- Just before TID:M sends the query, an Awaiter (TaskCompletionSource), Awaiter:HashCode:52638671, is created to process the response. It also registers ct (this is where the problem might be, more on this later).
- There's nothing left to do, so TID:M bottoms out, returns from RunChanges(), and then awaits Task.Delay(500).
- At this point, as the delay is happening, TID:M is returned to the thread pool.
- TID:B detects that network data has arrived. A server response is waiting to be read.
- TID:B reads the JSON protocol string, and a Response object is created.
- TID:B identifies the Awaiter (TaskCompletionSource) that is currently awaiting the response.
- TID:B creates a task, TID:1, to further process the raw Response object (RethinkDb.Driver/Source/RethinkDb.Driver/Net/SocketWrapper.cs, lines 230 to 243 in 0447498).
- TID:1 calls TrySetResult(response) and resolves the awaiting task. The Awaiter:HashCode:52638671 task is now complete with a response object.
- TID:1 picks up where TID:M left off at the first await continuation (RethinkDb.Driver/Source/RethinkDb.Driver/Net/Connection.cs, lines 269 to 273 in 0447498).
- TID:1 creates a new Cursor<T>.
    - During the construction of the new Cursor<T>(), a Query.Continue continuation is sent over the network. This continuation also registers a new Awaiter to process the response from the cursor continuation.
- TID:1 returns the new Cursor<T> fully constructed.
- TID:1 resumes the next await continuation here:
    changes = await R.Db("rethinkdb").Table("jobs")
        .Changes().OptArg("include_initial", "true")
        .RunChangesAsync<JObject>(conn, ct);
    // TID:1 resumes here with a newly constructed Cursor<T> in 'changes'
    while (await changes.MoveNextAsync(ct))
    {
        Console.WriteLine("RunChanges: got a change");
    }
- TID:1 loops over MoveNextAsync(ct) the first time. Data already exists from the initial response, so the call doesn't await the underlying cursor's in-flight continuation and immediately returns data. "RunChanges: got a change" is printed.
- TID:1 loops over MoveNextAsync(ct) a second time. However, no data is available, so TID:1 creates a CancellableTask using ct and awaits whichever event occurs first: either data arrives from the in-flight continuation or the cancellation token is canceled (RethinkDb.Driver/Source/RethinkDb.Driver/Net/Cursor.cs, lines 160 to 168 in 0447498).
- TID:1 is returned to the thread pool because there's nothing left to do.
await Task.Delay(500) wakes up; the thread resuming execution after the await is now TID:2. TID:2 prints "Cancelling task". TID:2 invokes cts.Cancel(). The .Cancel() call causes the cancellationToken.Register registrations to execute. Remember, we registered a cleanup callback back in the initial .RunChangesAsync<JObject>(conn, ct) query.
This causes TID:2 to invoke Awaiter:HashCode:52638671:OnCancellation(). However, the Awaiter:HashCode:52638671 task was already completed by the initial .TrySetResult(response), and invoking .SetCanceled() on the already completed task from .RunChangesAsync<JObject>(conn, ct) won't fly (RethinkDb.Driver/Source/RethinkDb.Driver/Utils/CancellableTask.cs, lines 22 to 25 in 0447498). I think this is the reason for the exception:
System.InvalidOperationException: An attempt was made to transition a task to a final state when it had already completed.
System.AggregateException: One or more errors occurred. ---> System.InvalidOperationException: An attempt was made to transition a task to a final state when it had already completed.
at System.Threading.Tasks.TaskCompletionSource`1.SetCanceled()
at RethinkDb.Driver.Utils.CancellableTask.OnCancellation() in C:\Code\Projects\Public\RethinkDb.Driver\Source\RethinkDb.Driver\Utils\CancellableTask.cs:line 26
at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
at System.Threading.CancellationCallbackInfo.ExecuteCallback()
at System.Threading.CancellationTokenSource.ExecuteCallbackHandlers(Boolean throwOnFirstException)
--- End of inner exception stack trace ---
at System.Threading.CancellationTokenSource.ExecuteCallbackHandlers(Boolean throwOnFirstException)
at System.Threading.CancellationTokenSource.NotifyCancellation(Boolean throwOnFirstException)
at System.Threading.CancellationTokenSource.Cancel()
at RethinkDb.Driver.Tests.GitHubIssues.Issue146.<Test>d__1.MoveNext() in C:\Code\Projects\Public\RethinkDb.Driver\Source\RethinkDb.Driver.Tests\GitHubIssues\Issue146.cs:line 34
---> (Inner Exception #0) System.InvalidOperationException: An attempt was made to transition a task to a final state when it had already completed.
at System.Threading.Tasks.TaskCompletionSource`1.SetCanceled()
at RethinkDb.Driver.Utils.CancellableTask.OnCancellation() in C:\Code\Projects\Public\RethinkDb.Driver\Source\RethinkDb.Driver\Utils\CancellableTask.cs:line 26
at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
at System.Threading.CancellationCallbackInfo.ExecuteCallback()
at System.Threading.CancellationTokenSource.ExecuteCallbackHandlers(Boolean throwOnFirstException)<---
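The failure mode in the trace above can be reproduced without the driver. The following is a minimal sketch that uses only the .NET base class library; StaleRegistrationDemo and CancelAfterCompletion are hypothetical names made up for illustration and are not part of RethinkDb.Driver. It registers a cancellation callback on a TaskCompletionSource, completes the task first, and then cancels the token. The TrySetCanceled() variant is included only for contrast and is not necessarily how the driver handles it.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class StaleRegistrationDemo
{
    // Returns the type name of the exception surfaced by Cancel(), or "none".
    public static string CancelAfterCompletion(bool useTrySetCanceled)
    {
        using (var cts = new CancellationTokenSource())
        {
            var tcs = new TaskCompletionSource<string>();

            // Stand-in for the awaiter's OnCancellation callback.
            cts.Token.Register(() =>
            {
                if (useTrySetCanceled)
                    tcs.TrySetCanceled(); // returns false on a completed task
                else
                    tcs.SetCanceled();    // throws on a completed task
            });

            // The response arrives before cancellation, so the task completes.
            tcs.TrySetResult("response");

            try
            {
                // The registration is still live, so the stale callback runs now.
                cts.Cancel();
                return "none";
            }
            catch (AggregateException ex)
            {
                return ex.InnerExceptions[0].GetType().Name;
            }
        }
    }

    public static void Main()
    {
        Console.WriteLine(CancelAfterCompletion(useTrySetCanceled: false)); // InvalidOperationException
        Console.WriteLine(CancelAfterCompletion(useTrySetCanceled: true));  // none
    }
}
```

Cancel() collects callback exceptions into an AggregateException, which matches the outer exception in the trace.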
Suffice to say, if you just use ct once, the problem should go away. The code below removes the ct parameter from .RunChangesAsync(conn):
    changes = await R.Db("rethinkdb").Table("jobs")
        .Changes().OptArg("include_initial", "true")
        .RunChangesAsync<JObject>(conn);        // <<<-- REMOVE ct HERE
    while (await changes.MoveNextAsync(ct))     // <<<-- KEEP ct HERE
    {
        Console.WriteLine("RunChanges: got a change");
    }
I'm not sure what the best practice should be here, but my gut feeling is you should be able to pass ct around without being signaled in as many places as you need. We'll probably have to figure out how to clean up those already completed tasks in the Awaiters and how to unregister the cancelToken.Register(OnCancellation) registrations once .TrySetResult(response) has been called.
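As a rough illustration of the unregistering idea, here is a minimal sketch built only on the .NET base class library. DisposableAwaiter, Completion, and DisposableAwaiterDemo are hypothetical names invented for this example; this is not the driver's Awaiter class and not necessarily the fix that ends up in the driver. The sketch keeps the CancellationTokenRegistration returned by Register and disposes it as soon as a result is set, so a later Cancel() has no stale callback to run.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for an awaiter that waits on a server response.
public sealed class DisposableAwaiter<T>
{
    private readonly TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
    private readonly CancellationTokenRegistration registration;

    public DisposableAwaiter(CancellationToken ct)
    {
        // TrySetCanceled is a no-op if the task has already completed.
        registration = ct.Register(() => tcs.TrySetCanceled());
    }

    public Task<T> Completion => tcs.Task;

    public bool TrySetResult(T result)
    {
        bool completed = tcs.TrySetResult(result);
        // Unregister the callback so the token no longer references this awaiter.
        registration.Dispose();
        return completed;
    }
}

public static class DisposableAwaiterDemo
{
    public static void Main()
    {
        using (var cts = new CancellationTokenSource())
        {
            var awaiter = new DisposableAwaiter<string>(cts.Token);
            awaiter.TrySetResult("response");

            // The registration is gone, so this does not throw.
            cts.Cancel();

            Console.WriteLine(awaiter.Completion.Status); // RanToCompletion
        }
    }
}
```

With this shape the same ct can still be handed to a later call such as MoveNextAsync(ct), since canceling it only affects awaiters that are still pending.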
If you have any feedback, suggestions or ideas, please let me know your thoughts.
Thanks,
Brian
from rethinkdb.driver.
Thanks for the detailed response! That is a very easy to follow analysis.
> my gut feeling is you should be able to pass ct around without being signaled in as many places as you need
I do agree with you here. It seems like Microsoft does too, at least based on their code samples.
I can't say how often this use case is truly necessary in Rethink client code. In my test case, it doesn't make a big difference if I have to wait for the relatively short RunChangesAsync to finish before the indefinitely looping changefeed receives the cancellation. I don't think this issue will cause any pain now that I know how to avoid it.
I'm afraid I won't be of much help actually coming up with a clean solution for this, but please do let me know if there's anything I can do to help.
Thanks again for your time!
from rethinkdb.driver.
No problem. Thank you for reporting the issue. After some time sleeping on the problem, I think I have a decent solution that will fix the issue.
I should have a new release later today. However, I want to take a little more time today to look at Microsoft's CoreFX sources (dotnet/corefx) and see if I can find better patterns for disposing CancellationToken registrations. I skimmed through a few books yesterday and they weren't much help.
from rethinkdb.driver.
A new version, RethinkDb.Driver v2.3.150, is now released and contains the fix.
Let me know if it works for you.
Thanks,
Brian
from rethinkdb.driver.
Looks great to me!
Thanks,
Evan
from rethinkdb.driver.