bchavez commented on June 15, 2024

Thank you very much for the detailed information. Give me a chance to review the information and I'll get back to you.

bchavez commented on June 15, 2024

Hi @EvanSchiewe-Entegra,

So, after some study, I think the issue arises because the CancellationToken is used twice. See below:

async void RunChanges(CancellationToken ct) 
{
   Console.WriteLine("RunChanges: called");
   Cursor<RethinkDb.Driver.Model.Change<JObject>> changes = null;
   try 
   {
      changes = await R.Db("rethinkdb").Table("jobs")
                      .Changes().OptArg("include_initial", "true")
                      .RunChangesAsync<JObject>(conn, ct); // <<<-- ONCE HERE
                  
      while (await changes.MoveNextAsync(ct)) // <<<-- TWICE HERE
      {
         Console.WriteLine("RunChanges: got a change");
      }
   }
   catch(OperationCanceledException) 
   {
      Console.WriteLine("RunChanges: op canceled");
   }
   finally 
   {
      Console.WriteLine("RunChanges: finally");
      changes?.Close();
      Console.WriteLine("RunChanges: changes cursor closed");
   }
   Console.WriteLine("RunChanges: returning");
}
  • Once, here: .RunChangesAsync<JObject>(conn, ct);
  • Twice, here: while (await changes.MoveNextAsync(ct))

As far as I can tell, here's what happens:

  • The Main() thread (TID:M) starts up.
  • A connection to the server is established.
    • Background response pump thread (TID:B) is created and blocks on the receive side of the network socket.
  • TID:M calls RunChanges.
  • TID:M creates and sends the query over the network:
    R.Db("rethinkdb").Table("jobs")
       .Changes().OptArg("include_initial", "true")
       .RunChangesAsync<JObject>(conn, ct);
  • Just before TID:M sends the query, an Awaiter (a TaskCompletionSource), Awaiter:HashCode:52638671, is created to process the response. It also registers ct (this is where the problem might be; more on this later):
    this.registration = this.cancelToken.Register(OnCancellation, false);
  • There's nothing left to do, so TID:M bottoms out, returns from RunChanges(), and then awaits Task.Delay(500).
  • At this point, as the delay is happening, TID:M is returned to the thread pool.

  • TID:B detects network data has arrived. A server response is waiting to be read.

  • TID:B reads the JSON protocol string, a Response object is created.

  • TID:B identifies the Awaiter (TaskCompletionSource) that is currently awaiting the response.

  • TID:B starts a task, which runs on TID:1, to further process the raw Response object.

    var response = this.Read();
    Awaiter awaitingTask;
    if( awaiters.TryRemove(response.Token, out awaitingTask) )
    {
       Task.Run(() =>
       {
          //try, because it's possible
          //the awaiting task was canceled.
          if( !awaitingTask.TrySetResult(response) )
          {
             Log.Debug($"Response Pump: The awaiter waiting for response token {response.Token} could not be set. The task was probably canceled.");
          }
       });
    }

  • TID:1 calls TrySetResult(response) and resolves the awaiting task. The Awaiter:HashCode:52638671 task is now complete with a response object.

  • TID:1 picks up where TID:M left off at the first await continuation here:

    var res = await SendQuery(query, cancelToken, awaitResponse: true).ConfigureAwait(false);
    if( res.IsPartial || res.IsSequence )
    {
       return new Cursor<T>(this, query, res);
    }

  • TID:1 creates a new Cursor<T>.

    • During the construction of the new Cursor<T>() a Query.Continue continuation is sent over the network. This continuation also registers a new Awaiter to process the response from the cursor continuation.
  • TID:1 returns the new Cursor<T>, fully constructed.

  • TID:1 resumes the next await continuation here:

    changes = await R.Db("rethinkdb").Table("jobs")
                      .Changes().OptArg("include_initial", "true")
                      .RunChangesAsync<JObject>(conn, ct);
    // TID:1 resumes here with a newly constructed Cursor<T> in 'changes'
    while (await changes.MoveNextAsync(ct))
    {
       Console.WriteLine("RunChanges: got a change");
    }
  • TID:1 calls MoveNextAsync(ct) for the first time. Data from the initial response is already available, so the call doesn't await the underlying cursor's in-flight continuation and returns the data immediately. "RunChanges: got a change" is printed.

  • TID:1 loops over MoveNextAsync(ct) a second time. However, no data is available; so TID:1 creates a CancellableTask using ct and awaits any event that occurs (either when data arrives from the in-flight continuation or when the cancellation token is canceled):

    using( var cancelTask = new CancellableTask(cancelToken) )
    {
       //now await on either task, pending or the cancellation of the CancellableTask.
       await Task.WhenAny(this.pendingContinue, cancelTask.Task).ConfigureAwait(false);
       //if it was the cancelTask that triggered the continuation... throw if requested.
       cancelToken.ThrowIfCancellationRequested();
       //else, no cancellation was requested.
       //we can proceed by processing the results we awaited for.
    } //ensure the disposal of the cancelToken registration upon exiting scope

  • TID:1 is returned to the thread pool because there's nothing left to do.


  • await Task.Delay(500) completes; the thread resuming execution after the await is now TID:2.
  • TID:2 prints "Cancelling task".
  • TID:2 invokes cts.Cancel(). The .Cancel() call executes every callback registered through cancellationToken.Register. Remember that we registered one of those callbacks for the initial .RunChangesAsync<JObject>(conn, ct); query here:
    this.registration = this.cancelToken.Register(OnCancellation, false);

    This causes TID:2 to execute Awaiter:HashCode:52638671:OnCancellation(). However, the Awaiter:HashCode:52638671 task was already completed by the initial .TrySetResult(response), and invoking .SetCanceled() on an already completed task won't fly:
    private void OnCancellation()
    {
       this.SetCanceled();
    }

    and I think this is the reason for the exception:

System.InvalidOperationException: An attempt was made to transition a task to a final state when it had already completed.

System.AggregateException: One or more errors occurred. ---> System.InvalidOperationException: An attempt was made to transition a task to a final state when it had already completed.
   at System.Threading.Tasks.TaskCompletionSource`1.SetCanceled()
   at RethinkDb.Driver.Utils.CancellableTask.OnCancellation() in C:\Code\Projects\Public\RethinkDb.Driver\Source\RethinkDb.Driver\Utils\CancellableTask.cs:line 26
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
   at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
   at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
   at System.Threading.CancellationCallbackInfo.ExecuteCallback()
   at System.Threading.CancellationTokenSource.ExecuteCallbackHandlers(Boolean throwOnFirstException)
   --- End of inner exception stack trace ---
   at System.Threading.CancellationTokenSource.ExecuteCallbackHandlers(Boolean throwOnFirstException)
   at System.Threading.CancellationTokenSource.NotifyCancellation(Boolean throwOnFirstException)
   at System.Threading.CancellationTokenSource.Cancel()
   at RethinkDb.Driver.Tests.GitHubIssues.Issue146.<Test>d__1.MoveNext() in C:\Code\Projects\Public\RethinkDb.Driver\Source\RethinkDb.Driver.Tests\GitHubIssues\Issue146.cs:line 34
---> (Inner Exception #0) System.InvalidOperationException: An attempt was made to transition a task to a final state when it had already completed.
   at System.Threading.Tasks.TaskCompletionSource`1.SetCanceled()
   at RethinkDb.Driver.Utils.CancellableTask.OnCancellation() in C:\Code\Projects\Public\RethinkDb.Driver\Source\RethinkDb.Driver\Utils\CancellableTask.cs:line 26
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
   at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
   at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
   at System.Threading.CancellationCallbackInfo.ExecuteCallback()
   at System.Threading.CancellationTokenSource.ExecuteCallbackHandlers(Boolean throwOnFirstException)<---
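The failure mode can be reproduced without RethinkDB at all. This is a minimal sketch that assumes only what the analysis describes: a TaskCompletionSource completed by TrySetResult while a token registration that calls SetCanceled is still alive. The variable names are illustrative, not the driver's.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var cts = new CancellationTokenSource();
var awaiter = new TaskCompletionSource<string>();

// Like the Awaiter in the analysis: register a callback that cancels the task,
// and keep the registration alive for the lifetime of the token.
CancellationTokenRegistration registration = cts.Token.Register(() => awaiter.SetCanceled());

// The response pump completes the awaiter first (TID:1 in the walkthrough)...
awaiter.TrySetResult("initial response");

// ...so when the token is canceled later, the stale callback tries to move the
// already-completed task to a second final state. Cancel() gathers callback
// exceptions into an AggregateException.
Exception? thrown = null;
try
{
    cts.Cancel();
}
catch (AggregateException ex)
{
    thrown = ex.InnerException;
}

Console.WriteLine(thrown?.GetType().Name); // InvalidOperationException
Console.WriteLine(awaiter.Task.Status);    // RanToCompletion
```

Swapping SetCanceled() for TrySetCanceled() in the callback makes Cancel() return quietly here, because TrySetCanceled() simply returns false when the task is already complete.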

Suffice it to say, if you use ct only once, the problem should go away. The code below removes the ct parameter from .RunChangesAsync(conn):

changes = await R.Db("rethinkdb").Table("jobs")
                .Changes().OptArg("include_initial", "true")
                .RunChangesAsync<JObject>(conn); // <<<-- REMOVE ct HERE
            
while (await changes.MoveNextAsync(ct)) // <<<-- KEEP ct HERE
{
   Console.WriteLine("RunChanges: got a change");
}
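Why the workaround is safe: with ct passed only to MoveNextAsync(ct), the token's registrations come from the short-lived wait inside MoveNextAsync, which (per the CancellableTask code quoted earlier) is disposed when its using scope exits. The sketch below approximates that wait. WaitForContinuationAsync is a hypothetical helper, and a TaskCompletionSource plus CancellationToken.Register stands in for the driver's CancellableTask, whose full source isn't shown in this thread.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper approximating the wait inside MoveNextAsync(ct): race the
// in-flight continuation against cancellation, then throw if cancellation won.
async Task<string> WaitForContinuationAsync(Task<string> pendingContinue, CancellationToken ct)
{
    var cancelSignal = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

    // The registration lives only for the duration of this wait; leaving the
    // using block unregisters the callback, so a later Cancel() can't reach it.
    using (ct.Register(() => cancelSignal.TrySetResult(true)))
    {
        await Task.WhenAny(pendingContinue, cancelSignal.Task).ConfigureAwait(false);
        ct.ThrowIfCancellationRequested();
    }
    return await pendingContinue.ConfigureAwait(false);
}

// Case 1: the continuation's data arrives before anyone cancels.
var cts1 = new CancellationTokenSource();
var data = new TaskCompletionSource<string>();
Task<string> wait1 = WaitForContinuationAsync(data.Task, cts1.Token);
data.SetResult("change 2");
string got = await wait1;
cts1.Cancel(); // the wait's registration is already disposed, so nothing runs

// Case 2: the token is canceled while the continuation is still in flight.
var cts2 = new CancellationTokenSource();
Task<string> wait2 = WaitForContinuationAsync(new TaskCompletionSource<string>().Task, cts2.Token);
cts2.Cancel();
bool canceled = false;
try
{
    await wait2;
}
catch (OperationCanceledException)
{
    canceled = true;
}

Console.WriteLine($"{got} {canceled}"); // change 2 True
```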

I'm not sure what the best practice should be here, but my gut feeling is you should be able to pass ct around without being signaled in as many places as you need. We'll probably have to figure out how to clean up those already completed tasks in Awaiters and how to unregister the cancelToken.Register(OnCancellation, false) registrations once .TrySetResult(response) has been called.
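One possible shape for that cleanup, sketched under assumptions (this is not necessarily what the driver's eventual fix does): complete the awaiter with the Try* methods so a late callback can't throw, and dispose the registration as soon as the response is delivered. A second, hypothetical awaiter shares the same token to show that the token can still cancel everything else it was passed to.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var cts = new CancellationTokenSource();
CancellationToken ct = cts.Token;

// Awaiter for the initial RunChangesAsync query (hypothetical stand-in for the driver's Awaiter).
var queryAwaiter = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
CancellationTokenRegistration queryRegistration =
    ct.Register(() => queryAwaiter.TrySetCanceled(ct)); // Try*: never throws if already completed

// Awaiter for a later MoveNextAsync(ct) wait that shares the same token.
var moveNextAwaiter = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
using CancellationTokenRegistration moveNextRegistration =
    ct.Register(() => moveNextAwaiter.TrySetCanceled(ct));

// The response pump delivers the query's response, then the awaiter unregisters itself.
queryAwaiter.TrySetResult("initial response");
queryRegistration.Dispose();

// Canceling now only affects the awaiter that is still waiting; no exception escapes Cancel().
cts.Cancel();

Console.WriteLine(queryAwaiter.Task.Status);    // RanToCompletion
Console.WriteLine(moveNextAwaiter.Task.Status); // Canceled
```

Either change on its own would stop the InvalidOperationException; disposing the registration also keeps completed awaiters from accumulating callbacks on a long-lived token.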

If you have any feedback, suggestions or ideas, please let me know your thoughts.

Thanks,
Brian

EvanSchiewe-Entegra commented on June 15, 2024

Thanks for the detailed response! That is a very easy-to-follow analysis.

"my gut feeling is you should be able to pass ct around without being signaled in as many places as you need"

I do agree with you here. It seems like Microsoft does too, at least based on their code samples.

I can't say how often this use case is truly necessary in Rethink client code. In my test case, it doesn't make a big difference if I have to wait for the relatively short RunChangesAsync to finish before the indefinitely looping changefeed receives the cancellation. I don't think this issue will cause any pain now that I know how to avoid it.

I'm afraid I won't be of much help actually coming up with a clean solution for this, but please do let me know if there's anything I can do to help.

Thanks again for your time!

bchavez commented on June 15, 2024

No problem. Thank you for reporting the issue. After some time sleeping on the problem, I think I have a decent solution that will fix the issue.

I should have a new release later today. However, I want to take a little more time today to look at Microsoft's CoreFX sources (dotnet/corefx) and see if I can find better patterns for disposable CancellationToken registrations. I skimmed through a few books yesterday and they weren't much help.

bchavez commented on June 15, 2024

Hi @EvanSchiewe-Entegra,

RethinkDb.Driver v2.3.150, which contains the fix, is now released.

Let me know if it works for you.

Thanks,
Brian

EvanSchiewe-Entegra commented on June 15, 2024

Looks great to me!

Thanks,
Evan
