fsharp.control.taskseq's Introduction

TaskSeq

An implementation of IAsyncEnumerable<'T> as a computation expression: taskSeq { ... } with an accompanying TaskSeq module and functions, that allow seamless use of asynchronous sequences similar to F#'s native seq and task CE's.

Release notes

See Releases for an extensive version history of TaskSeq. See Status overview below for a progress report.


Overview

The IAsyncEnumerable interface was added to .NET in .NET Core 3.0 and is part of .NET Standard 2.1. Its main use case is iterative, asynchronous, sequential enumeration over some resource: for instance an event stream, a paginated REST API, or asynchronously reading a list of files and accumulating the results. Each step can be modeled as a MoveNextAsync call on the IAsyncEnumerator<'T> returned by a call to GetAsyncEnumerator().

Since the introduction of task in F# the call for a native implementation of task sequences has grown, in particular because proper iteration over an IAsyncEnumerable has proven challenging, especially if one wants to avoid mutable variables. This library is an answer to that call and applies the same resumable state machine approach with taskSeq.
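To make the difficulty concrete, here is a minimal sketch that sums a sequence twice: once by driving the enumerator by hand, once with TaskSeq.fold. The names numbers, sumManually and sumWithTaskSeq are illustrative only and are not part of the library.

```fsharp
#r "nuget: FSharp.Control.TaskSeq"

open System.Collections.Generic
open System.Threading
open System.Threading.Tasks
open FSharp.Control

// illustrative source; any IAsyncEnumerable<'T> would do
let numbers: IAsyncEnumerable<int> = taskSeq { yield! [ 1; 2; 3 ] }

// manual iteration: explicit enumerator plus mutable state
let sumManually (source: IAsyncEnumerable<int>) : Task<int> =
    task {
        // IAsyncEnumerator<'T> is IAsyncDisposable, so `use` disposes it asynchronously
        use enumerator = source.GetAsyncEnumerator(CancellationToken.None)
        let mutable total = 0
        let mutable hasNext = true

        while hasNext do
            let! moved = enumerator.MoveNextAsync() // ValueTask<bool>
            hasNext <- moved

            if moved then
                total <- total + enumerator.Current

        return total
    }

// the same computation through the TaskSeq module, without mutable variables
let sumWithTaskSeq (source: IAsyncEnumerable<int>) : Task<int> =
    source |> TaskSeq.fold (fun acc x -> acc + x) 0

// blocking on .Result is only for demonstration in a script
let manual = (sumManually numbers).Result
let folded = (sumWithTaskSeq numbers).Result
printfn "manual: %i, folded: %i" manual folded
```

Both functions enumerate the source once and should produce the same total.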

Module functions

As with seq and Seq, this library comes with a set of well-known collection functions, like TaskSeq.empty, isEmpty or TaskSeq.map, iter, collect, fold and TaskSeq.find, pick, choose, filter, takeWhile. Where applicable, these come with async variants, like TaskSeq.mapAsync, iterAsync, collectAsync, foldAsync and TaskSeq.findAsync, pickAsync, chooseAsync, filterAsync, takeWhileAsync, which allow the applied function to be asynchronous.

See below for a full list of currently implemented functions and their variants.
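As a small sketch of how the synchronous and async variants combine in one pipeline (squareAsync is a hypothetical stand-in for real asynchronous work, not a library function):

```fsharp
#r "nuget: FSharp.Control.TaskSeq"

open System.Threading.Tasks
open FSharp.Control

// hypothetical asynchronous operation, standing in for real I/O
let squareAsync (x: int) : Task<int> =
    task {
        do! Task.Delay(10)
        return x * x
    }

let sumOfEvenSquares: Task<int> =
    taskSeq { yield! [ 1..10 ] }
    |> TaskSeq.filter (fun x -> x % 2 = 0) // synchronous predicate
    |> TaskSeq.mapAsync squareAsync // asynchronous mapper
    |> TaskSeq.fold (fun acc x -> acc + x) 0 // consumes the sequence, returns Task<int>

// blocking on .Result is only for demonstration in a script
let total = sumOfEvenSquares.Result
printfn "sum of even squares: %i" total
```

Functions that return a TaskSeq stay lazy; only the final fold, which returns a Task, actually drives the enumeration.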

taskSeq computation expressions

The taskSeq computation expression can be used just like seq. Additionally, it adds support for working with Tasks through let! and for looping over both normal and asynchronous sequences (ones that implement IAsyncEnumerable<'T>). You can use yield! and yield, and there's support for use and use!, try-with and try-finally, and while loops within the task sequence expression.
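A minimal sketch of several of these constructs in one expression. The Connection type and its FetchAsync method are hypothetical and exist only to give use and let! something to work with.

```fsharp
#r "nuget: FSharp.Control.TaskSeq"

open System
open System.Threading.Tasks
open FSharp.Control

// hypothetical resource, only here to demonstrate `use` and `let!`
type Connection() =
    member _.FetchAsync(id: int) : Task<string> = task { return sprintf "item %i" id }

    interface IDisposable with
        member _.Dispose() = printfn "connection closed"

let inner = taskSeq { yield "from another task sequence" }

let items =
    taskSeq {
        use conn = new Connection() // disposed when iteration ends

        try
            for id in 1..3 do
                let! item = conn.FetchAsync(id) // await a Task<'T>
                yield item

            for x in [ "a"; "b" ] do // loop over a normal sequence
                yield x

            for x in inner do // loop over an IAsyncEnumerable<'T>
                yield x

            yield! inner // splice in a whole task sequence
        finally
            printfn "iteration finished"
    }

// blocking on .Result is only for demonstration in a script
let collected = (TaskSeq.toListAsync items).Result
printfn "%A" collected
```

Nothing in items runs until it is enumerated, here by TaskSeq.toListAsync.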

Installation

Dotnet Nuget

dotnet add package FSharp.Control.TaskSeq

For a specific project:

dotnet add myproject.fsproj package FSharp.Control.TaskSeq

F# Interactive (FSI):

// latest version
> #r "nuget: FSharp.Control.TaskSeq"

// or with specific version
> #r "nuget: FSharp.Control.TaskSeq, 0.4.0"

Paket:

dotnet paket add FSharp.Control.TaskSeq --project <project>

Package Manager:

PM> NuGet\Install-Package FSharp.Control.TaskSeq

As package reference in fsproj or csproj file:

<!-- replace version with most recent version -->
<PackageReference Include="FSharp.Control.TaskSeq" Version="0.4.0" />

Examples

open System.IO
open FSharp.Control

// singleton is fine
let helloTs = taskSeq { yield "Hello, World!" }

// cold-started, that is, delay-executed
let f() = task {
    // using toListAsync forces execution of the whole sequence
    let! hello = TaskSeq.toListAsync helloTs  // toListAsync returns a Task<'T list>
    return List.head hello
}

// can be mixed with normal sequences
let oneToTen = taskSeq { yield! [1..10] }

// can be used with F#'s task and async in a for-loop
let f() = task { for x in oneToTen do printfn "Number %i" x }
let g() = async { for x in oneToTen do printfn "Number %i" x }

// returns a delayed sequence of IAsyncEnumerable<string>
let allFilesAsLines() = taskSeq {
    let files = Directory.EnumerateFiles(@"c:\temp")
    for file in files do
        // await
        let! contents = File.ReadAllLinesAsync file
        // return all lines
        yield! contents
}

let write (fileName: string) =
    allFilesAsLines()

    // synchronous map function on asynchronous task sequence
    |> TaskSeq.map (fun x -> x.Replace("a", "b"))

    // asynchronous map
    |> TaskSeq.mapAsync (fun x -> task { return "hello: " + x })

    // asynchronous iter; File.WriteAllTextAsync returns a plain Task, so wrap it to get a Task<unit>
    |> TaskSeq.iterAsync (fun data -> task { do! File.WriteAllTextAsync(fileName, data) })


// infinite sequence
// (loginToTwitterAsync and getNextTwitterMessageAsync are placeholders, not defined here)
let feedFromTwitter user pwd = taskSeq {
    do! loginToTwitterAsync(user, pwd)
    while true do
       let! message = getNextTwitterMessageAsync()
       yield message
}

Choosing between AsyncSeq and TaskSeq

The AsyncSeq and TaskSeq libraries both operate on asynchronous sequences, but there are a few fundamental differences. The most notable being that the former does not implement IAsyncEnumerable<'T>, though it does have a type of that name with different semantics (not surprising; it predates the definition of the modern one). Another key difference is that TaskSeq uses ValueTasks for the asynchronous computations, whereas AsyncSeq uses F#'s Async<'T>.

There are more differences:

| | TaskSeq | AsyncSeq |
|---|---|---|
| Frameworks | .NET 5.0+, NetStandard 2.1 | .NET 5.0+, NetStandard 2.0 and 2.1, .NET Framework 4.6.1+ |
| F# concept of | task | async |
| Underlying type | Generic.IAsyncEnumerable<'T> (note #1) | Its own type, also called IAsyncEnumerable<'T> (note #1) |
| Implementation | State machine (statically compiled) | No state machine, continuation style |
| Semantics | seq-like: on-demand | seq-like: on-demand |
| Disposability | Asynchronous, through IAsyncDisposable | Synchronous, through IDisposable |
| Support let! | All task-like: Async<'T>, Task<'T>, ValueTask<'T> or any GetAwaiter() | Async<'T> only |
| Support do! | Async<unit>, Task<unit> and Task, ValueTask<unit> and ValueTask | Async<unit> only |
| Support yield! | IAsyncEnumerable<'T> (= TaskSeq), AsyncSeq, any sequence | AsyncSeq |
| Support for | IAsyncEnumerable<'T> (= TaskSeq), AsyncSeq, any sequence | AsyncSeq, any sequence |
| Behavior with yield | Zero allocations; no Task or even ValueTask created | Allocates an F# Async wrapped in a singleton AsyncSeq |
| Conversion to other | TaskSeq.toAsyncSeq | AsyncSeq.toAsyncEnum |
| Conversion from other | Implicit (yield!) or TaskSeq.ofAsyncSeq | AsyncSeq.ofAsyncEnum |
| Recursion in yield! | No (requires F# support, upcoming) | Yes |
| Iteration semantics | Two operations, 'Next' is a value task, 'Current' must be called separately | One operation, 'Next' is Async, returns option with 'Current' |
| MoveNextAsync | Returns ValueTask<bool> | Returns Async<'T option> |
| Current | Returns 'T | n/a |
| Cancellation | See #133, until 0.3.0: use GetAsyncEnumerator(cancelToken) | Implicit token flows to all subtasks per async semantics |
| Performance | Very high, negligible allocations | Slower, more allocations, due to using async and cont style |
| Parallelism | Unclear, interface is meant for sequential/async processing | Supported by extension functions |

¹⁾ Both AsyncSeq and TaskSeq use a type called IAsyncEnumerable<'T>, but only TaskSeq uses the type from the BCL Generic Collections. AsyncSeq supports .NET Framework 4.6.x and NetStandard 2.0 as well, which do not have this type in the BCL.

Status & planning

The TaskSeq project already has a wide array of functions and functionalities, see the overview below. The current status is: STABLE. However, there are certain features we'd still really like to add:

  • Take existing taskSeq resumable code from F# and fix it. DONE
  • Add almost all functions from Seq that could apply to TaskSeq (full overview below). MOSTLY DONE, STILL TODO
  • Add remaining relevant functions from Seq. PLANNED FOR 0.4.x
    • min / max / minBy / maxBy & async variant (see #221)
    • insertAt / updateAt and related (see #236)
    • average / averageBy, sum and related
    • forall / forallAsync (see #240)
    • skip / drop / truncate / take (see #209)
    • chunkBySize / windowed
    • compareWith
    • distinct
    • exists2 / map2 / fold2 / iter2 and related '2'-functions
    • mapFold
    • pairwise / allpairs / permute / distinct / distinctBy
    • replicate
    • reduce / scan
    • unfold
  • Publish package on Nuget, DONE, PUBLISHED SINCE: 7 November 2022. See https://www.nuget.org/packages/FSharp.Control.TaskSeq
  • Make TaskSeq interoperable with Task by expanding the latter with a for .. in .. do that accepts task sequences
  • Add to/from functions to seq, list, array
  • Add applicable functions from AsyncSeq. PLANNED FOR 0.5-alpha
  • (Better) support for cancellations
    • Make the tasks cancellable with token (see #133). PLANNED FOR 0.5-alpha
    • Support ConfiguredCancelableAsyncEnumerable (see #167). PLANNED FOR 0.5-alpha
    • Interop with cancellableTask and valueTask from IcedTasks
  • Interop with AsyncSeq.
  • (maybe) Support any awaitable type in the function lib (that is: where a Task is required, accept a ValueTask and Async as well)
  • Add TaskEx functionality (separate lib). DISCUSSION
  • Move documentation to https://fsprojects.github.io

Implementation progress

Progress taskSeq CE

The resumable state machine backing the taskSeq CE is now finished, and restartability (not to be confused with resumability) has been implemented and stabilized. Full support for empty task sequences is done. The focus is now on adding functionality there, like more useful overloads for yield and let!. Suggestions are welcome!

Progress and implemented TaskSeq module functions

We are working hard on getting a full set of module functions on TaskSeq that can be used with IAsyncEnumerable sequences. Our guide is the set of F# Seq functions in F# Core and, where applicable, the functions provided by AsyncSeq. Each implemented function is documented through XML doc comments to provide the necessary context-sensitive help.

This is what has been implemented so far, is planned or skipped:

Done Seq TaskSeq Variants Remarks
allPairs allPairs note #1
#81 append append
#81 appendSeq
#81 prependSeq
average average
averageBy averageBy averageByAsync
cache cache note #1
#67 cast cast
#67 box
#67 unbox
#23 choose choose chooseAsync
chunkBySize chunkBySize
#11 collect collect collectAsync
#11 collectSeq collectSeqAsync
compareWith compareWith compareWithAsync
#69 concat concat
#237 concat (list) concat (list)
#237 concat (array) concat (array)
#237 concat (r-array) concat (r-array)
#237 concat (seq) concat (seq)
#70 contains contains
#82 delay delay
distinct distinct
distinctBy distinctBy distinctByAsync
#209 drop
#2 empty empty
#23 exactlyOne exactlyOne
#83 except except
#83 exceptOfSeq
#70 exists exists existsAsync
exists2 exists2
#23 filter filter filterAsync
#23 find find findAsync
🚫 findBack note #2
#68 findIndex findIndex findIndexAsync
🚫 findIndexBack n/a n/a note #2
#2 fold fold foldAsync
fold2 fold2 fold2Async
🚫 foldBack note #2
🚫 foldBack2 note #2
#240 forall forall forallAsync
forall2 forall2 forall2Async
groupBy groupBy groupByAsync note #1
#23 head head
#68 indexed indexed
#69 init init initAsync
#69 initInfinite initInfinite initInfiniteAsync
#236 insertAt insertAt
#236 insertManyAt insertManyAt
#23 isEmpty isEmpty
#23 item item
#2 iter iter iterAsync
iter2 iter2 iter2Async
#2 iteri iteri iteriAsync
iteri2 iteri2 iteri2Async
#23 last last
#53 length length
#53 lengthBy lengthByAsync
#2 map map mapAsync
map2 map2 map2Async
map3 map3 map3Async
mapFold mapFold mapFoldAsync
🚫 mapFoldBack note #2
#2 mapi mapi mapiAsync
mapi2 mapi2 mapi2Async
#221 max max
#221 maxBy maxBy maxByAsync
#221 min min
#221 minBy minBy minByAsync
#2 ofArray ofArray
#2 ofAsyncArray
#2 ofAsyncList
#2 ofAsyncSeq
#2 ofList ofList
#2 ofTaskList
#2 ofResizeArray
#2 ofSeq
#2 ofTaskArray
#2 ofTaskList
#2 ofTaskSeq
pairwise pairwise
permute permute permuteAsync
#23 pick pick pickAsync
🚫 readOnly note #3
reduce reduce reduceAsync
🚫 reduceBack note #2
#236 removeAt removeAt
#236 removeManyAt removeManyAt
replicate replicate
rev note #1
scan scan scanAsync
🚫 scanBack note #2
#90 singleton singleton
#209 skip skip
#219 skipWhile skipWhile skipWhileAsync
#219 skipWhileInclusive skipWhileInclusiveAsync
sort note #1
sortBy note #1
sortByAscending note #1
sortByDescending note #1
sortWith note #1
splitInto splitInto
sum sum
sumBy sumBy sumByAsync
#76 tail tail
#209 take take
#126 takeWhile takeWhile takeWhileAsync
#126 takeWhileInclusive takeWhileInclusiveAsync
#2 toArray toArray toArrayAsync
#2 toIList toIListAsync
#2 toList toList toListAsync
#2 toResizeArray toResizeArrayAsync
#2 toSeq toSeqAsync
[…]
transpose note #1
#209 truncate truncate
#23 tryExactlyOne tryExactlyOne tryExactlyOneAsync
#23 tryFind tryFind tryFindAsync
🚫 tryFindBack note #2
#68 tryFindIndex tryFindIndex tryFindIndexAsync
🚫 tryFindIndexBack note #2
#23 tryHead tryHead
#23 tryItem tryItem
#23 tryLast tryLast
#23 tryPick tryPick tryPickAsync
#76 tryTail
unfold unfold unfoldAsync
#236 updateAt updateAt
#217 where where whereAsync
windowed windowed
#2 zip zip
zip3 zip3
zip4

¹⁾ These functions require a form of pre-materializing through TaskSeq.cache, similar to the approach taken in the corresponding Seq functions. It doesn't make much sense to have a cached async sequence. However, AsyncSeq does implement these, so we'll probably do so eventually as well.

²⁾ Because of the async nature of TaskSeq sequences, iterating from the back would be bad practice. Instead, materialize the sequence to a list or array and then apply the xxxBack iterators.

³⁾ The motivation for readOnly in Seq is that a cast from a mutable array or list to a seq<_> is valid and can be cast back, leading to a mutable sequence. Since TaskSeq doesn't implement IEnumerable<_>, such casts are not possible.
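The advice in note 2 can be sketched as follows: materialize with TaskSeq.toListAsync first, then use the ordinary List function. The names numbers and lastEven are illustrative.

```fsharp
#r "nuget: FSharp.Control.TaskSeq"

open FSharp.Control

let numbers = taskSeq { yield! [ 1..10 ] }

let lastEven =
    task {
        // materialize first; toListAsync returns a Task<'T list>
        let! all = TaskSeq.toListAsync numbers
        // then apply the regular xxxBack function from the List module
        return all |> List.findBack (fun x -> x % 2 = 0)
    }

// blocking on .Result is only for demonstration in a script
let result = lastEven.Result
printfn "last even number: %i" result
```

This only works for finite sequences, since the whole sequence must be read before the search can start.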

More information

Further reading on IAsyncEnumerable

  • A good C#-based introduction can be found in this blog.
  • An MSDN article written shortly after it was introduced.
  • Converting a seq to an IAsyncEnumerable demo gist as an example, though TaskSeq contains many more utility functions and uses a slightly different approach.

Further reading on resumable state machines

Further reading on computation expressions

Building & testing

TLDR: just run build. Or load the sln file in Visual Studio or VS Code and compile.

Prerequisites

At the very least, to get the source to compile, you'll need:

  • .NET 6 or .NET 7 Preview
  • F# 6.0 or 7.0 compiler
  • To use build.cmd, the dotnet command must be accessible from your path.

Just check-out this repo locally. Then, from the root of the repo, you can do:

Build the solution

build [build] [release|debug]

With no arguments, defaults to release.

Run the tests

build test [release|debug]

With no arguments, defaults to release. By default, all tests are output to the console. If you don't want that, you can use --logger console;verbosity=summary. Furthermore, no TRX file is generated and the --blame-xxx flags aren't set.

Run the CI command

build ci [release|debug]

With no arguments, defaults to release. This will run dotnet test with the --blame-xxx settings enabled to prevent hanging tests caused by an xUnit runner bug.

There are no special CI environment variables that need to be set for running this locally.

Advanced

You can pass any additional options that are valid for dotnet test and dotnet build respectively. However, these cannot be the very first argument, so you should either use build build --myadditionalOptions fizz buzz, or just specify the build-kind, i.e. this is fine:

build debug --verbosity detailed
build test --logger console;verbosity=summary

At this moment, additional options cannot have quotes in them.

Command modifiers, like release and debug, can be specified with - or / if you so prefer: dotnet build /release.

Get help

build help

For more info, see this PR: #29.

Work in progress

The taskSeq CE, which uses the statically compilable resumable state machine approach, is based on, and draws heavily from, Don Syme's taskSeq.fs as used to test the resumable state machine in the F# core compiler.

On top of that, this library adds a set of TaskSeq module functions, with their Async variants, on par with Seq and AsyncSeq.

Current set of TaskSeq utility functions

The following is the current surface area of the TaskSeq utility functions, ordered alphabetically.

module TaskSeq =
    val append: source1: TaskSeq<'T> -> source2: TaskSeq<'T> -> TaskSeq<'T>
    val appendSeq: source1: TaskSeq<'T> -> source2: seq<'T> -> TaskSeq<'T>
    val box: source: TaskSeq<'T> -> TaskSeq<obj>
    val cast: source: TaskSeq<obj> -> TaskSeq<'T>
    val choose: chooser: ('T -> 'U option) -> source: TaskSeq<'T> -> TaskSeq<'U>
    val chooseAsync: chooser: ('T -> #Task<'U option>) -> source: TaskSeq<'T> -> TaskSeq<'U>
    val collect: binder: ('T -> #TaskSeq<'U>) -> source: TaskSeq<'T> -> TaskSeq<'U>
    val collectAsync: binder: ('T -> #Task<'TSeqU>) -> source: TaskSeq<'T> -> TaskSeq<'U> when 'TSeqU :> TaskSeq<'U>
    val collectSeq: binder: ('T -> #seq<'U>) -> source: TaskSeq<'T> -> TaskSeq<'U>
    val collectSeqAsync: binder: ('T -> #Task<'SeqU>) -> source: TaskSeq<'T> -> TaskSeq<'U> when 'SeqU :> seq<'U>
    val concat: sources: TaskSeq<#TaskSeq<'T>> -> TaskSeq<'T>
    val concat: sources: TaskSeq<'T seq> -> TaskSeq<'T>
    val concat: sources: TaskSeq<'T list> -> TaskSeq<'T>
    val concat: sources: TaskSeq<'T array> -> TaskSeq<'T>
    val concat: sources: TaskSeq<ResizeArray<'T>> -> TaskSeq<'T>
    val contains<'T when 'T: equality> : value: 'T -> source: TaskSeq<'T> -> Task<bool>
    val delay: generator: (unit -> TaskSeq<'T>) -> TaskSeq<'T>
    val drop: count: int -> source: TaskSeq<'T> -> TaskSeq<'T>
    val empty<'T> : TaskSeq<'T>
    val exactlyOne: source: TaskSeq<'T> -> Task<'T>
    val except<'T when 'T: equality> : itemsToExclude: TaskSeq<'T> -> source: TaskSeq<'T> -> TaskSeq<'T>
    val exceptOfSeq<'T when 'T: equality> : itemsToExclude: seq<'T> -> source: TaskSeq<'T> -> TaskSeq<'T>
    val exists: predicate: ('T -> bool) -> source: TaskSeq<'T> -> Task<bool>
    val existsAsync: predicate: ('T -> #Task<bool>) -> source: TaskSeq<'T> -> Task<bool>
    val filter: predicate: ('T -> bool) -> source: TaskSeq<'T> -> TaskSeq<'T>
    val filterAsync: predicate: ('T -> #Task<bool>) -> source: TaskSeq<'T> -> TaskSeq<'T>
    val find: predicate: ('T -> bool) -> source: TaskSeq<'T> -> Task<'T>
    val findAsync: predicate: ('T -> #Task<bool>) -> source: TaskSeq<'T> -> Task<'T>
    val findIndex: predicate: ('T -> bool) -> source: TaskSeq<'T> -> Task<int>
    val findIndexAsync: predicate: ('T -> #Task<bool>) -> source: TaskSeq<'T> -> Task<int>
    val fold: folder: ('State -> 'T -> 'State) -> state: 'State -> source: TaskSeq<'T> -> Task<'State>
    val foldAsync: folder: ('State -> 'T -> #Task<'State>) -> state: 'State -> source: TaskSeq<'T> -> Task<'State>
    val forall: predicate: ('T -> bool) -> source: TaskSeq<'T> -> Task<bool>
    val forallAsync: predicate: ('T -> #Task<bool>) -> source: TaskSeq<'T> -> Task<bool>
    val head: source: TaskSeq<'T> -> Task<'T>
    val indexed: source: TaskSeq<'T> -> TaskSeq<int * 'T>
    val init: count: int -> initializer: (int -> 'T) -> TaskSeq<'T>
    val initAsync: count: int -> initializer: (int -> #Task<'T>) -> TaskSeq<'T>
    val initInfinite: initializer: (int -> 'T) -> TaskSeq<'T>
    val initInfiniteAsync: initializer: (int -> #Task<'T>) -> TaskSeq<'T>
    val insertAt: position:int -> value:'T -> source: TaskSeq<'T> -> TaskSeq<'T>
    val insertManyAt: position:int -> values:TaskSeq<'T> -> source: TaskSeq<'T> -> TaskSeq<'T>
    val isEmpty: source: TaskSeq<'T> -> Task<bool>
    val item: index: int -> source: TaskSeq<'T> -> Task<'T>
    val iter: action: ('T -> unit) -> source: TaskSeq<'T> -> Task<unit>
    val iterAsync: action: ('T -> #Task<unit>) -> source: TaskSeq<'T> -> Task<unit>
    val iteri: action: (int -> 'T -> unit) -> source: TaskSeq<'T> -> Task<unit>
    val iteriAsync: action: (int -> 'T -> #Task<unit>) -> source: TaskSeq<'T> -> Task<unit>
    val last: source: TaskSeq<'T> -> Task<'T>
    val length: source: TaskSeq<'T> -> Task<int>
    val lengthBy: predicate: ('T -> bool) -> source: TaskSeq<'T> -> Task<int>
    val lengthByAsync: predicate: ('T -> #Task<bool>) -> source: TaskSeq<'T> -> Task<int>
    val lengthOrMax: max: int -> source: TaskSeq<'T> -> Task<int>
    val map: mapper: ('T -> 'U) -> source: TaskSeq<'T> -> TaskSeq<'U>
    val mapAsync: mapper: ('T -> #Task<'U>) -> source: TaskSeq<'T> -> TaskSeq<'U>
    val mapi: mapper: (int -> 'T -> 'U) -> source: TaskSeq<'T> -> TaskSeq<'U>
    val mapiAsync: mapper: (int -> 'T -> #Task<'U>) -> source: TaskSeq<'T> -> TaskSeq<'U>
    val max: source: TaskSeq<'T> -> Task<'T> when 'T: comparison
    val maxBy: projection: ('T -> 'U) -> source: TaskSeq<'T> -> Task<'T> when 'U: comparison
    val maxByAsync: projection: ('T -> #Task<'U>) -> source: TaskSeq<'T> -> Task<'T> when 'U: comparison
    val min: source: TaskSeq<'T> -> Task<'T> when 'T: comparison
    val minBy: projection: ('T -> 'U) -> source: TaskSeq<'T> -> Task<'T> when 'U: comparison
    val minByAsync: projection: ('T -> #Task<'U>) -> source: TaskSeq<'T> -> Task<'T> when 'U: comparison
    val ofArray: source: 'T[] -> TaskSeq<'T>
    val ofAsyncArray: source: Async<'T> array -> TaskSeq<'T>
    val ofAsyncList: source: Async<'T> list -> TaskSeq<'T>
    val ofAsyncSeq: source: seq<Async<'T>> -> TaskSeq<'T>
    val ofList: source: 'T list -> TaskSeq<'T>
    val ofResizeArray: source: ResizeArray<'T> -> TaskSeq<'T>
    val ofSeq: source: seq<'T> -> TaskSeq<'T>
    val ofTaskArray: source: #Task<'T> array -> TaskSeq<'T>
    val ofTaskList: source: #Task<'T> list -> TaskSeq<'T>
    val ofTaskSeq: source: seq<#Task<'T>> -> TaskSeq<'T>
    val pick: chooser: ('T -> 'U option) -> source: TaskSeq<'T> -> Task<'U>
    val pickAsync: chooser: ('T -> #Task<'U option>) -> source: TaskSeq<'T> -> Task<'U>
    val prependSeq: source1: seq<'T> -> source2: TaskSeq<'T> -> TaskSeq<'T>
    val removeAt: position:int -> source: TaskSeq<'T> -> TaskSeq<'T>
    val removeManyAt: position:int -> count:int -> source: TaskSeq<'T> -> TaskSeq<'T>
    val singleton: source: 'T -> TaskSeq<'T>
    val skip: count: int -> source: TaskSeq<'T> -> TaskSeq<'T>
    val tail: source: TaskSeq<'T> -> Task<TaskSeq<'T>>
    val take: count: int -> source: TaskSeq<'T> -> TaskSeq<'T>
    val takeWhile: predicate: ('T -> bool) -> source: TaskSeq<'T> -> Task<TaskSeq<'T>>
    val takeWhileAsync: predicate: ('T -> #Task<bool>) -> source: TaskSeq<'T> -> Task<TaskSeq<'T>>
    val takeWhileInclusive: predicate: ('T -> bool) -> source: TaskSeq<'T> -> Task<TaskSeq<'T>>
    val takeWhileInclusiveAsync: predicate: ('T -> #Task<bool>) -> source: TaskSeq<'T> -> Task<TaskSeq<'T>>
    val toArray: source: TaskSeq<'T> -> 'T[]
    val toArrayAsync: source: TaskSeq<'T> -> Task<'T[]>
    val toIListAsync: source: TaskSeq<'T> -> Task<IList<'T>>
    val toList: source: TaskSeq<'T> -> 'T list
    val toListAsync: source: TaskSeq<'T> -> Task<'T list>
    val toResizeArrayAsync: source: TaskSeq<'T> -> Task<ResizeArray<'T>>
    val toSeq: source: TaskSeq<'T> -> seq<'T>
    val truncate: count: int -> source: TaskSeq<'T> -> TaskSeq<'T>
    val tryExactlyOne: source: TaskSeq<'T> -> Task<'T option>
    val tryFind: predicate: ('T -> bool) -> source: TaskSeq<'T> -> Task<'T option>
    val tryFindAsync: predicate: ('T -> #Task<bool>) -> source: TaskSeq<'T> -> Task<'T option>
    val tryFindIndex: predicate: ('T -> bool) -> source: TaskSeq<'T> -> Task<int option>
    val tryFindIndexAsync: predicate: ('T -> #Task<bool>) -> source: TaskSeq<'T> -> Task<int option>
    val tryHead: source: TaskSeq<'T> -> Task<'T option>
    val tryItem: index: int -> source: TaskSeq<'T> -> Task<'T option>
    val tryLast: source: TaskSeq<'T> -> Task<'T option>
    val tryPick: chooser: ('T -> 'U option) -> source: TaskSeq<'T> -> Task<'U option>
    val tryPickAsync: chooser: ('T -> #Task<'U option>) -> source: TaskSeq<'T> -> Task<'U option>
    val tryTail: source: TaskSeq<'T> -> Task<TaskSeq<'T> option>
    val where: predicate: ('T -> bool) -> source: TaskSeq<'T> -> TaskSeq<'T>
    val whereAsync: predicate: ('T -> #Task<bool>) -> source: TaskSeq<'T> -> TaskSeq<'T>
    val unbox<'U when 'U: struct> : source: TaskSeq<obj> -> TaskSeq<'U>
    val updateAt: position:int -> value:'T -> source: TaskSeq<'T> -> TaskSeq<'T>
    val zip: source1: TaskSeq<'T> -> source2: TaskSeq<'U> -> TaskSeq<'T * 'U>

fsharp.control.taskseq's People

Contributors

abelbraaksma, bartelink, dependabot[bot], dsyme, gusty, peterfaria-lula, theangrybyrd

fsharp.control.taskseq's Issues

Consider `StartImmediateAsTask` instead of `StartAsTask` to prevent a thread hop

In PR #114 (adding support for Async in Bind), @bartelink raised the question whether it would be better to use StartImmediateAsTask instead of StartAsTask.

Specifically, it is about this part:

let mutable awaiter =
    Async
        .StartAsTask(asyncSource, cancellationToken = sm.Data.cancellationToken)
        .GetAwaiter()

The current approach was taken to mimic the same approach taken for task in F# Core: https://github.com/dotnet/fsharp/blob/d91b6c5c97accf363d135d1f99816410da4ec341/src/FSharp.Core/tasks.fs#L462, basically:

member inline this.Bind(computation: Async<'TResult1>, continuation: ('TResult1 -> TaskCode<'TOverall, 'TResult2>)) : TaskCode<'TOverall, 'TResult2> =
    this.Bind(Async.StartAsTask computation, continuation)

For comparison, see what was done in async2, which was poised to be the new async based on resumable code, like task is, but unlike task cold-started like the current async.

So, we have two scenarios:

  • use StartAsTask to get similar semantics as the task CE, but this causes a thread hop
  • use StartImmediateAsTask, which uses the current context

I'm not aware of any downside of using the latter. It definitely saves a thread switch in the general case (note that the thread pool may not always cause a physical thread switch).
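The difference can be observed with a small sketch that uses only FSharp.Core. It records on which managed thread the body of an async starts running; whereAmI and the other names are illustrative. The exact thread ids will vary per run and per host.

```fsharp
open System.Threading

// reports the managed thread on which the async body executes
let whereAmI = async { return Thread.CurrentThread.ManagedThreadId }

let callerThread = Thread.CurrentThread.ManagedThreadId

// StartAsTask queues the computation to the thread pool
let viaStartAsTask = (whereAmI |> Async.StartAsTask).Result

// StartImmediateAsTask begins executing on the calling thread
let viaStartImmediate = (whereAmI |> Async.StartImmediateAsTask).Result

printfn
    "caller: %i, StartAsTask: %i, StartImmediateAsTask: %i"
    callerThread
    viaStartAsTask
    viaStartImmediate
```

Because the body of whereAmI never suspends, the StartImmediateAsTask variant runs to completion on the caller's thread. An async that does suspend may still continue elsewhere afterwards.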

Please implement `TaskSeq.tryItem`, `tryLast`, `tryHead`, `tryPick`, `tryFind` and the non-try counterparts

Signatures should be similar to those for Seq, with overloads for async versions so that the picker or chooser function can return a Task.

Consider:

TaskSeq.isEmpty
TaskSeq.head
TaskSeq.tryHead
TaskSeq.last
TaskSeq.tryLast
TaskSeq.tryExactlyOne
TaskSeq.exactlyOne
TaskSeq.item
TaskSeq.tryItem
TaskSeq.find + async version
TaskSeq.tryFind + async version
TaskSeq.pick + async version
TaskSeq.tryPick + async version
TaskSeq.filter + async version
TaskSeq.choose + async version

As with Seq, the non-try variants will raise an exception if the precondition is not met.
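A short sketch of the intended difference, using an empty sequence. The names nothing and demo are illustrative, and the handler deliberately catches any exception rather than assuming a specific exception type.

```fsharp
#r "nuget: FSharp.Control.TaskSeq"

open FSharp.Control

let nothing = TaskSeq.empty<int>

let demo =
    task {
        // the try-variant reports absence as None
        let! first = TaskSeq.tryHead nothing

        // the non-try variant raises, because the precondition (non-empty) is not met
        let! threw =
            task {
                try
                    let! _ = TaskSeq.head nothing
                    return false
                with _ ->
                    return true
            }

        return first, threw
    }

// blocking on .Result is only for demonstration in a script
let first, threw = demo.Result
printfn "tryHead: %A, head raised: %b" first threw
```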

Why is 6.0.2 the min FSharp.Core version requirement?

Great to see this taking shape. Was able to port Equinox to use it in one impl without issues. Am intending to spread that to the other integrations in due course.

However, it does force a dep on FSharp.Core v 6.0.2 - is there an upstream requirement that triggers this ? Is this likely to remain stable?

(Reason I ask is that the Equinox deps are 6.0.0 and I was trying to hold the line on that. Should I up the requirement, Equinox will likely shift to depending on 6.0.6, as there's an ugly shim required for a StackOverflow in Async.Parallel that's not fixed until then)

Implement For on the built-in TaskBuilder

It would be useful to add an extension method For on TaskBuilder to be able to do for loops over IAsyncEnumerable in tasks.

let mySeq =
    taskSeq {
        for i in 0..9 do yield i
    }

let myTask =
    task {
        for x in mySeq do // <-- This is currently not possible.
            printfn $"{x}"
    }
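Independent of such an extension, the same iteration can be expressed inside a task through the module functions, since TaskSeq.iter returns a Task<unit> that can be awaited with do!. A minimal sketch, where seen is only there to make the effect observable:

```fsharp
#r "nuget: FSharp.Control.TaskSeq"

open FSharp.Control

let mySeq =
    taskSeq {
        for i in 0..9 do
            yield i
    }

// collects what the loop body has seen; illustrative only
let seen = ResizeArray<int>()

let myTask =
    task {
        // TaskSeq.iter drives the sequence and completes when it is exhausted
        do! mySeq |> TaskSeq.iter (fun x -> seen.Add x)
    }

// blocking with Wait() is only for demonstration in a script
myTask.Wait()
printfn "items seen: %i" seen.Count
```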

Naming notes

I do notice the name "TaskSeq" is confusing some people - people thinking this is about "sequences of tasks". I'm not sure what to do about this.

While thinking about this I included some general notes on naming in this space, see below

  • F# IEnumerator

    • cold start
    • run once
    • no implicit cancellation token
    • no asynchronous waits
    • many results
    • state machines
    • = HotSynchronousFastEnumerator
    • = IEnumerable<T>
  • F# IEnumerable = Seq

    • cold start
    • run multiple
    • no implicit cancellation token
    • no asynchronous waits
    • many results
    • state machines
    • = ColdSynchronousFastEnumerable
    • ~= unit -> IEnumerable<T>
  • .NET/F#/C# Task = C# async/await

    • hot start
    • run once
    • no implicit cancellation token
    • asynchronous waits
    • one result
    • state machines
    • = HotAsynchronousFastValue
    • = Task<T>
  • IcedTask ColdTask

    • cold start
    • run many times
    • no implicit cancellation token
    • asynchronous waits
    • one result
    • state machines
    • = ColdAsynchronousFastValueFactory
    • ~= unit -> Task<T>
  • IcedTask CancellableTask

    • cold start
    • run many
    • implicit cancellation token
    • asynchronous waits
    • one result
    • state machines
    • = ColdAsynchronousFastCancellableValueFactory
    • ~= CancellationToken -> Task<T>
  • Async = F# async

    • cold start
    • run multiple
    • implicit cancellation token
    • asynchronous waits
    • one result
    • no state machines
    • = ColdAsynchronousCancellableValueFactory
    • ~= CancellationToken -> Task<T>
  • Current F# AsyncSeq

    • cold start
    • run multiple
    • implicit cancellation token
    • asynchronous waits
    • many results
    • no state machines
    • = ColdAsynchronousCancellableEnumerable
    • ~= CancellationToken -> IAsyncEnumerator<T>
  • Current F# TaskSeq

    • cold start
    • run multiple
    • implicit cancellation token governing iteration but not passed to each task along the way
    • asynchronous waits
    • many results
    • state machines
    • = ColdAsynchronousHalfCancellableEnumerable
    • ~= CancellationToken -> IAsyncEnumerator<T>
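The "cold start" and "run multiple" properties listed for TaskSeq can be observed with a small sketch: a side effect inside the expression does not run at definition time, and runs once per enumeration. The names starts and numbers are illustrative.

```fsharp
#r "nuget: FSharp.Control.TaskSeq"

open FSharp.Control

// counts how often the body of the sequence has been started
let mutable starts = 0

let numbers =
    taskSeq {
        starts <- starts + 1 // side effect, runs once per enumeration
        yield 1
        yield 2
    }

// cold start: defining the sequence has not run anything yet
let afterDefinition = starts

// run multiple: each enumeration restarts the body
// (blocking on .Result is only for demonstration in a script)
let first = (TaskSeq.toListAsync numbers).Result
let second = (TaskSeq.toListAsync numbers).Result

printfn "starts after definition: %i, after two enumerations: %i" afterDefinition starts
```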

I'm leaving the question of tailcalls off the list, as much as I'd like to address that.

It's worth noting that at a high level there's no real logical difference between CancellableTask and F# Async<_>. Nor between F# TaskSeq and F# AsyncSeq.

The sweet spot for F# is really Cold+RunMany+Asynchronous+Fast+Cancellable+Tailcalls, which is what TaskSeq is close to being technically (except tailcalls, sadly).

Implement `^TaskLike` similar to F#'s `task`

To support the same types F# Core supports with task, specifically in let! and do!, we'll need to implement the ^TaskLike SRTP approach. This prevents ambiguities caused by Task<'T> :> Task. It also limits the needed overloads for ValueTask<'T> and ValueTask.

Related to #43, which can only be implemented by using this approach.

This change will unify the types supported between task and taskSeq, except for async, which should be added next (see #79).

Implement `TaskSeq.length`, `allPairs`, `cache`, `cast`, `concat`, `findIndex`, `contains`, `exists`, `init`, `initInfinite`, `indexed` etc

While most of the above have trivial semantics and should sometimes take an Async overload (i.e., findIndexAsync and initAsync for the lambda), I'm not entirely sure of the usefulness of TaskSeq.cache (and we already have TaskSeq.toSeqCached), but perhaps for parity it should be included.

A note on TaskSeq.cast vs Seq.cast

While the docs on Seq.cast say that it is meant for a "loosely typed sequence", to be cast to a generically strongly typed sequence, it can be used just fine to cast an F# seq, because it also implements IEnumerable (i.e., the non-generic variant).

// this is fine
let x: seq<uint> = seq { 1 } |> Seq.cast

But F# also allows you to perform an illegal cast:

// this won't give any warnings, but will throw an exception
let x: seq<Guid> = seq { 1 } |> Seq.cast

Since IAsyncEnumerable<'T> only exists as a typed sequence, we should honor that and only allow valid casts. However, F# doesn't allow a constraint like 'T :> 'U, the rh-side must be a concrete type. This means that in the end, it'll effectively work the same way as for Seq.cast through boxing. Alternatively, if the type-relation is known ahead of time, users should probably just write TaskSeq.map (fun x -> x :> _) just as they would for seq<_>.

// this is fine
let x: taskSeq<uint> = taskSeq { 1 } |> TaskSeq.cast
// this *should* raise a compile-time error, but there's no way to enforce that
let x: taskSeq<Guid> = taskSeq { 1 } |> TaskSeq.cast  // incompatible

For comparison, Linq.Enumerable.Cast works through the untyped Enumerable as well.
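To make the Seq.cast behavior described above concrete, here is a minimal sketch using only FSharp.Core. It shows that the invalid cast compiles cleanly and only fails once the sequence is enumerated, because Seq.cast is lazy. The names `bad` and `failedOnEnumeration` are made up for illustration.

```fsharp
open System

// Compiles without warnings: the element type is only checked at run time.
let bad: seq<Guid> = seq { 1 } |> Seq.cast

// Seq.cast is lazy, so nothing fails until the sequence is enumerated.
let failedOnEnumeration =
    try
        bad |> Seq.toList |> ignore
        false
    with :? InvalidCastException ->
        true

printfn "cast failed on enumeration: %b" failedOnEnumeration
```

A TaskSeq.cast built on boxing would presumably fail at the same point, that is, on the first MoveNextAsync rather than when the cast is applied.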


EDIT: to get better parity with Seq.cast and Linq.Enumerable.Cast, I decided to only allow it to work with untyped task sequences (that is: IAsyncEnumerable<obj>). I've updated the signature below. In addition, we'll be adding a box and unbox helper as well, the latter only for value types. If users want a reinterpret_cast style cast, they can use TaskSeq.box >> TaskSeq.cast. Due to the static way this is compiled, this won't add overhead.

TODO list:

  • length, see #53
  • lengthBy, see #53
  • lengthByAsync, see #53
  • allPairs // later, requires cache to be implemented
  • indexed, see #68
  • cache // later, see MBP approach of AsyncSeq
  • cast, see #67
  • box, see #67
  • unbox, see #67
  • concat, see #69
  • findIndex, see #68
  • findIndexAsync, see #68
  • tryFindIndex, see #68
  • tryFindIndexAsync, see #68
  • contains
  • exists
  • existsAsync
  • init, see #69
  • initAsync, see #69
  • initInfinite, see #69
  • initInfiniteAsync, see #69

Proposals of signatures:

module TaskSeq =
    val length: source: taskSeq<'T> -> Task<int>
    val lengthBy: predicate: ('T -> bool) -> source: taskSeq<'T> -> Task<int>
    val lengthByAsync: predicate: ('T -> #Task<bool>) -> source: taskSeq<'T> -> Task<int>

    val allPairs: source1: taskSeq<'T> -> source2: taskSeq<'U> -> taskSeq<'T * 'U>
    val indexed: source: taskSeq<'T> -> taskSeq<int * 'T>
    val cache: source: taskSeq<'T> -> taskSeq<'T> // later, see MBP approach of AsyncSeq
    val cast: source: taskSeq<obj> -> taskSeq<'U>
    val box: source: taskSeq<'T> -> taskSeq<obj>
    val unbox<'U when 'U: struct> : source: taskSeq<obj> -> taskSeq<'U>
    val concat: source1: taskSeq<'T> -> source2: taskSeq<'T> -> taskSeq<'T>

    val findIndex: predicate: ('T -> bool) -> source: taskSeq<'T> -> Task<int>
    val findIndexAsync: predicate: ('T -> Task<bool>) -> source: taskSeq<'T> -> Task<int>
    val tryFindIndex: predicate: ('T -> bool) -> source: taskSeq<'T> -> Task<int option>
    val tryFindIndexAsync: predicate: ('T -> Task<bool>) -> source: taskSeq<'T> -> Task<int option>
    val contains: value: 'T -> source: taskSeq<'T> -> Task<bool> (requires comparison)
    val exists: predicate: ('T -> bool) -> source: taskSeq<'T> -> Task<bool>
    val existsAsync: predicate: ('T -> Task<bool>) -> source: taskSeq<'T> -> Task<bool>

    val init: count: int -> initializer: (int -> 'T) -> taskSeq<'T>
    val initAsync: count: int -> initializer: (int -> Task<'T>) -> taskSeq<'T>
    val initInfinite: initializer: (int -> 'T) -> taskSeq<'T>
    val initInfiniteAsync: initializer: (int -> Task<'T>) -> taskSeq<'T>
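As an illustration of how one of the proposed signatures could be satisfied, the following is a rough sketch of tryFindIndex written directly against IAsyncEnumerable<'T>. The name `tryFindIndexSketch` is hypothetical and this is not the library's implementation; it only assumes that TaskSeq.ofList is available to build a test source.

```fsharp
#r "nuget: FSharp.Control.TaskSeq"

open System.Collections.Generic
open System.Threading
open System.Threading.Tasks
open FSharp.Control

// Hypothetical helper for illustration; not the library's implementation.
let tryFindIndexSketch (predicate: 'T -> bool) (source: IAsyncEnumerable<'T>) : Task<int option> =
    task {
        let mutable index = 0
        let mutable found: int option = None
        use e = source.GetAsyncEnumerator(CancellationToken.None)
        let! first = e.MoveNextAsync()
        let mutable hasNext = first

        // Stop at the first match, so later items are never requested.
        while hasNext && Option.isNone found do
            if predicate e.Current then
                found <- Some index
            else
                index <- index + 1
                let! next = e.MoveNextAsync()
                hasNext <- next

        return found
    }

let result =
    (TaskSeq.ofList [ 10; 20; 30 ] |> tryFindIndexSketch (fun x -> x = 20)).Result

printfn "%A" result // Some 1
```

A findIndex variant would follow the same loop and raise when `found` is still None at the end.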

Warnings NU1605 when building test project

You may see the following warnings when running build.cmd or building from VS:

d:\Projects\OpenSource\Abel\TaskSeq\src\FSharp.Control.TaskSeq.Test\FSharp.Control.TaskSeq.Test.fsproj : warning NU1605: Detected package downgrade: FSharp.Core from 6.0.7 to 6.0.3. Reference the package directly from the project to select a different version.  [d:\Projects\OpenSource\Abel\TaskSeq\src\FSharp.Control.TaskSeq.sln]

Release version 0.4.0-alpha.1

Releasing an alpha version as some of the features I want completed in 0.4.0 are not completed yet. These are the relevant release notes:

0.4.0-alpha.1

Some highlights, but see Release 0.4.0-alpha.1 for details

  • fixes not calling Dispose for 'use!', 'use', or finally blocks #157 (by @bartelink)
  • BREAKING CHANGE: null args now raise ArgumentNullException instead of NullReferenceException, #127
  • adds TaskSeq.takeWhile, takeWhileAsync, takeWhileInclusive, takeWhileInclusiveAsync, #126 (by @bartelink)
  • adds AsyncSeq vs TaskSeq comparison chart, #131
  • removes release-notes.txt from file dependencies, but keeps it in the package, #138

CancellationToken not used/registered

Hi.

Thanks for working on this. I have been exploring this repository since I wanted to do something similar, and I noticed that even when manually passing in a cancellation token, it is not directly acted upon. So for example

let consumeManually (enumerable: IAsyncEnumerable<int>) (token: CancellationToken) = task {
    do! Task.Yield()
    let mutable hasRead = false
    use enumerator = enumerable.GetAsyncEnumerator(token)        
    let! canRead = enumerator.MoveNextAsync()
    hasRead <- canRead
    while hasRead do
        let _here = enumerator.Current
        let! canRead = enumerator.MoveNextAsync()
        hasRead <- canRead

    return ()
}

let infinite () = taskSeq {        
    while true do
        yield 1
}

[<Fact>]
let usesCancellationToken () = task {
    use src = new CancellationTokenSource()
    let readToEnd = consumeManually (infinite()) src.Token
    do! Task.Delay(100)
    src.Cancel()
    do! readToEnd 
}

will run forever. However, if something like Async.Sleep() is bound inside the taskSeq, it will end with an exception, since the token is actually passed to the async when it is bound.

Do you think it would make sense to set the promiseOfValueOrEnd to a cancelled exception (which seems to be the idiomatic dotnet way) or complete(false) when the token is cancelled? I.e., using the CancellationTokenRegistration to abort MoveNextAsync on cancellation?
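Until something like that exists in the library, a consumer-side workaround is possible. The sketch below is an assumption-laden illustration, not a proposal for the library's internals: `cancelBetweenItems` is a hypothetical wrapper that checks the token before passing each item on, so even a purely synchronous infinite sequence stops once the token fires.

```fsharp
#r "nuget: FSharp.Control.TaskSeq"

open System
open System.Collections.Generic
open System.Threading
open FSharp.Control

// Hypothetical wrapper: checks the token before each item is passed on.
let cancelBetweenItems (token: CancellationToken) (source: IAsyncEnumerable<'T>) = taskSeq {
    for item in source do
        token.ThrowIfCancellationRequested()
        yield item
}

let infinite () = taskSeq {
    while true do
        yield 1
}

let stoppedByToken =
    // The source cancels itself after roughly 100 ms.
    use src = new CancellationTokenSource(100)

    try
        (infinite () |> cancelBetweenItems src.Token |> TaskSeq.iter ignore).Wait()
        false
    with :? AggregateException as e ->
        e.InnerException :? OperationCanceledException

printfn "stopped by token: %b" stoppedByToken
```

This only observes cancellation between items; a MoveNextAsync call that is itself stuck would still need the registration-based approach asked about above.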

Productize Task/ValueTask/Async.`ignore`

I've long felt icky about using Async.Ignore. Seeing the Async.ignore that's presently hiding in plain sight here makes me realise the absence of a Task.ignore and ValueTask.ignore from FSharp.Core is something I've also been grinning and bearing (sprinkling :> Task and all sorts of other such mindless hacks). Utils.fs presently has helpers that feel very tempting to use more broadly. However, binding to a set of support helpers on the fringe of this library's core feature set is obviously debatable.

(The other pair from here that I feel deserves a canonical implementation in a library that doesn't include a jungle of other less-related helpers is Async.ofTask/Task.toAsync, but with corrected AggregateException semantics, and honoring of Async.Cancellation when compared to Async.AwaitTask as per FSharp.Core. Will likely calve off a separate issue from this if placeholder FSharp.Core.TaskShims package/repo is deemed to be the best home for a set of ignore helpers)

TaskEx: parallelLimit

Replaces #129. TaskEx top level issue: #139

Async.Parallel's optional degree of parallelism parameter was added late in the game, but is critical - dumping an arbitrary unbounded number of work items onto the threadpool is not something that should be easy and/or the default thing to do without due consideration for how that will work under stress.

There are some other shortcomings, which frequently lead to various bespoke helpers proliferating:

  • pipelining is painful, necessitating an explicit argument name (e.g. fun computations -> Async.Parallel(computations, maxDegreeOfParallelism=dop) etc) (note this is not the case for Async.Sequential)
  • before FSharp.Core v6.0.6, there was a stack overflow bug that can tear down the process (dotnet/fsharp#13165) if >1200 items are started with a throttle and cancellation is triggered quickly (so having a layer between Async.Parallel and direct consumption within an app might be useful)

Current proposed APIs (will be updated inline based on any discussion below):

module Async =
    let parallelLimit maxDegreeOfParallelism computations =
        Async.Parallel(computations, maxDegreeOfParallelism = maxDegreeOfParallelism)
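For illustration, this is how the proposed helper would read in a pipeline. The helper is repeated so the snippet stands alone; the `squares` workload is made up.

```fsharp
module Async =
    let parallelLimit maxDegreeOfParallelism computations =
        Async.Parallel(computations, maxDegreeOfParallelism = maxDegreeOfParallelism)

// At most two computations run at once; results keep their input order.
let squares =
    [ for i in 1..5 -> async { return i * i } ]
    |> Async.parallelLimit 2
    |> Async.RunSynchronously

printfn "%A" squares // [|1; 4; 9; 16; 25|]
```

Compared with calling Async.Parallel directly, no lambda or named argument is needed at the call site.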

NOTES:

  • the naming aligns with that used in Node https://www.npmjs.com/package/run-parallel-limit
  • A common case is to use this to run but await failure of multiple Async<unit> tasks. Having to use |> Async.Ignore<unit[]> is ugly for that (and most people probably do |> Async.Ignore, which prevents the compiler from helping you if your computations start to return values where they previously returned unit)
  • How do you swap back/forth from that to Task, considering cancellation tokens (see #142) and unwrapping AggregateExceptions (see #141). Providing an equivalent of this that works well with task expressions should likely be prototyped alongside any permanent API for this. Example impl. Also, perhaps a Task.sequential might make sense
  • having Throttled in the name is pretty well established in multiple internal library suites, and in posts such as https://www.compositional-it.com/news-blog/improved-asynchronous-support-in-f-4-7

Add better support for cancellation tokens passing

Currently, it is not very clear how users can pass a CancellationToken through. While the CE has support for cancellation tokens, and the token is passed on to GetAsyncEnumerator(ct), unless users access the interface directly, there is currently no way to pass a cancellation token.

There are four, not necessarily mutually exclusive, ways to implement this.

  1. Add TaskSeq.setCancellationToken, which will return a taskSeq with the given cancellation token
  2. Add a do! overload that allows writing taskSeq { do! myCancelToken } in your CE, otherwise behaving the same as (1)
  3. Add a custom operation, such that you can write taskSeq { cancellationToken myCancelToken } in your CE
  4. Add a parameter to the taskSeq CE constructor. However, if possible, this may not be easily discoverable

There are upsides and downsides to each of these approaches. I think the first option, together with a getCancellationToken should at least be supported.

However, there are other challenges as well. The helper functions in the TaskSeq module all use the CancellationToken() constructor (that is: no cancellation support). The taskSeq builder in that respect is a bit of a mixed beast. Yes, you can pass in a cancellation token, but you have to do it manually, and then, using any of the helpers will basically ignore this token.

That's not a good position to be in. Probably, code like source.GetAsyncEnumerator(CancellationToken()) should become something like source.GetAsyncEnumerator(TaskSeq.getCancellationToken source). Which runs into another issue: while the state machine has a property for the cancellation token, any other implementation of IAsyncEnumerable<'T> does not, which requires a type check to detect.

Adding all this magic comes at a cost. In AsyncSeq and Async this was resolved by adding overloads that take an optional cancellation token. Not ideal either.

Implement dynamic versions of the resumable code

In rare scenarios, for instance, where the resumable code is invoked in a top-level function, the compiler may not be able to compile the resumable code statically. For these cases, we need a dynamic version of the resumable code (which exists, but currently simply raises an exception).

This issue serves as a placeholder for this.

CI tests appear to run indefinitely when parallelized, but only on Github, not locally

Attempt at fixing and more analysis: #27.

While working on PR #23, it turned out that the test runner never finished, leading to empty logs (while running or after canceling). The bad behavior of the Github reporting is explained here: actions/runner#1326 (and it was closed with no fix).

The cause appears to be a (potential) race condition. It is yet unclear whether that is caused by the resumable code or something else. The temporary solution in #23 is to disable parallel runs in CI and to improve reporting.

Unfortunately, xUnit reporting is quite useless for cases like this. It only reports when a test has finished, not when it started. However, dotnet test has a --blame-hang-timeout option, which at least solves the issue with GH action reporting, since it results in a failed test run.

Opening this issue to investigate further, so that we can close #23 meanwhile, as the implementations there have nothing to do with this behavior (removing enough tests removes the issue, but it doesn't matter which ones you remove, ultimately, the race condition comes back).

Example of a run that went on for over 5 hours (!): https://github.com/abelbraaksma/TaskSeq/actions/runs/3250819154/jobs/5334999458 (note: after retention period is over, this log will be gone).

TaskEx index

Replaces #128 #129

Within the F# ecosystem, there are a number of commonly used augmentations that typically get maintained as internal helpers within libraries and applications, manually copied around in various ways. Examples of such helpers:

While it can be argued that many of these concerns should be addressed in FSharp.Core, these are being logged here first in an attempt to narrow down the number of suggestions, with a view to getting stable naming (and potentially a shims library of some kind as a stopgap). NOTE that building a one stop shop for lots of arbitrary helpers is a non-goal of this effort.

NOTES:

F# fails to determine which overload of `TaskSeqBuilder.Using` to use.

F# fails to determine which overload of TaskSeqBuilder.Using to use when a type implements both IDisposable and IAsyncDisposable.

A real-world example with the NpgsqlDataReader type from Npgsql:

#r "nuget: Npgsql"
#r "nuget: FSharp.Control.TaskSeq"

open System.Threading
open Npgsql
open FSharp.Control


let readRows (command: NpgsqlCommand) (ct: CancellationToken) f = taskSeq {
    use! reader = command.ExecuteReaderAsync ct // Fails to compile.
    
    let! reader = command.ExecuteReaderAsync ct
    use reader = reader // Also fails to compile.
}

Below is a set of test cases replicating the issue.

#r "nuget: FSharp.Control.TaskSeq"
#r "nuget: Xunit"
#r "nuget: FsUnit"

open System
open System.Threading.Tasks
open FSharp.Control
open FsUnit
open Xunit


type private OneGetter() =
    member _.Get1() = 1

type private Disposable() =
    inherit OneGetter()

    interface IDisposable with
        member _.Dispose() = ()

type private AsyncDisposable() =
    inherit OneGetter()

    interface IAsyncDisposable with
        member _.DisposeAsync() = ValueTask()

type private MultiDispose() =
    inherit OneGetter()

    interface IDisposable with
        member _.Dispose() =
            ()

    interface IAsyncDisposable with
        member _.DisposeAsync() =
            ValueTask()

let private check ts = task {
    let! length = ts |> TaskSeq.length
    length |> should equal 1
}

[<Fact>]
let ``CE task: Using when type implements IDisposable``() =
    let ts = taskSeq {
        use x = new Disposable()

        yield x.Get1()
    }

    check ts

[<Fact>]
let ``CE task: Using when type implements IAsyncDisposable``() =
    let ts = taskSeq {
        use x = AsyncDisposable()
        yield x.Get1()
    }

    check ts


[<Fact>]
let ``CE task: Using when type implements IDisposable and IAsyncDisposable``() =
    let ts = taskSeq {
        use x = new MultiDispose() // Fails to compile
        yield x.Get1()
    }

    check ts

[<Fact>]
let ``CE task: Using! when type implements IDisposable``() =
    let ts = taskSeq {
        use! x = task { return new Disposable() }
        yield x.Get1()
    }

    check ts


[<Fact>]
let ``CE task: Using! when type implements IAsyncDisposable``() =
    let ts = taskSeq {
        use! x = task { return AsyncDisposable() }
        yield x.Get1()
    }

    check ts


[<Fact>]
let ``CE task: Using! when type implements IDisposable and IAsyncDisposable``() =
    let ts = taskSeq {
        use! x = task { return new MultiDispose() } // Fails to compile
        yield x.Get1()
    }

    check ts

Performance smoke-tests seem to be broken

Running ``CE taskSeq with nested deeply yield!`` (permalink, goes to original from the time of writing this) takes forever, but based on the non-delaying performance, it should be "just fine" and take an average time.

This test creates nested IAsyncEnumerable<'T>, themselves based on unit -> task<'T> functions to prevent having hot-started tasks. However, the test takes over 30s to finish (with the commented code enabled). Note that all of these tests deliberately use a randomly generated delay through Task.Delay.

Similar tests with many tasks that are not delayed do not have an issue.

Move namespaces from FSharpy.TaskSeq to FSharp.Control

We should either target Microsoft.FSharp.Control or FSharp.Control. I noticed that these namespaces, or at least the first one, are auto-opened in F# projects. According to @dsyme that shouldn't happen, so that behavior may not stick, but FusionTasks does exhibit that behavior, see for instance this source file on Async extensions, which get auto-opened as soon as your project has a reference to that lib.

I'll have to toy with the options a bit. Maybe the above behavior is only that way because Async is a type in F# Core already, but TaskSeq isn't.

Just opening this to keep track of options. Suggestions welcome.

TaskEx: ignore

Replaces #128. TaskEx top level issue: #139

Async.Ignore has always been ugly and undiscoverable. While I tend to make ignoring explicit by using let! _ = <async stuff I want to ignore result of>, it's commonly the last expression in a function, and having to do let! _ = <thing I'm wrapping> in () is too much.

It is proposed that the module associated with any given builder should by convention have an ignore function that correctly observes completion of the work, dropping the result, but propagating exceptions, if any.

Current proposed APIs that are not currently in FSharp.Core (will be updated inline based on discussion below):

module Async =
    let inline ignore (a : Async<'t>) = Async.Ignore a
module Task =
    let inline ignore (t : Task<'t>) = ...
module ValueTask =
    let inline ignore (t : ValueTask<'t>) = ...
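The Task and ValueTask bodies are left open in the proposal. Purely as a hypothetical sketch of what they might look like (the return types chosen here are an assumption, not something the issue settles), both can be written with the task builder so that completion is observed and exceptions propagate:

```fsharp
open System
open System.Threading.Tasks

// Hypothetical bodies for illustration only; the proposal leaves them open.
module Task =
    let ignore (t: Task<'t>) : Task<unit> = task {
        let! _ = t
        return ()
    }

module ValueTask =
    let ignore (t: ValueTask<'t>) : ValueTask<unit> =
        let ignored: Task<unit> = task {
            let! _ = t
            return ()
        }

        ValueTask<unit>(ignored)

// The result is dropped, but a failure in the source still surfaces.
let ok = System.Threading.Tasks.Task.FromResult 42 |> Task.ignore

let failed =
    System.Threading.Tasks.Task.FromException<int>(InvalidOperationException "boom")
    |> Task.ignore

printfn "ok completed: %b, failed faulted: %b" ok.IsCompletedSuccessfully failed.IsFaulted
```

Unlike a bare `:> Task` upcast, the annotated input type means the compiler still knows what result is being discarded.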

NOTES:

  1. Currently, TaskSeq (and other libs such as IcedTasks) contain various bespoke implementations
  2. While this could live in a TaskEx lib, it could be argued that FSharp.Core is the obvious home for them. (However that would raise the issue of whether they need to go into the 6.x release line in order to align with minimal dependencies for various common libraries)

Remove AggregateException wrapping in Async CE `for` extension and prevent threadpool transition

At present the current impl of for within async { ... } expressions raises two concerns for me:

  • AwaitTaskCorrect semantics would be preferable to promulgating usage of Async.AwaitTask in a place that most people will not necessarily even infer that it's in play.
  • AIUI Async.StartAsTask includes an unnecessary transition to the Thread Pool (with a potential context switch?) which could instead safely be replaced with Async.StartImmediateAsTask in this context. EDIT: moved to separate thread #135
  • the starting of the child Async does not propagate the continuation token (not sure whether fixing that is possible/required) EDIT: separate issue, see #133

Remove irrelevant internals from the public surface area

Certain functions, like Debug.xxx or the module TaskSeqInternals are publicly visible (though most of the actual functions are not). Likewise, types like TaskSeqResumptionFunc should probably be hidden.

Adding proper fsi files (currently only for the TaskSeq module), should do the trick.

Should we throw an exception for TaskSeq.zip when the sequences are of unequal length?

While writing tests for #31, I started pondering this ☝️.

For comparison, F# throws an ArgumentException for Array.zip and List.zip, but not for Seq.zip. Most likely, this is because "getting the next item" can have an unwanted side effect, so instead, they choose to stop processing as soon as one of the sequences is exhausted.

Arguably, such side effect can still happen. I.e., for zip we have to call MoveNextAsync, which will give us a bool that tells us whether we reached the end of the sequence. But, if the first sequence still has items, but the second sequence is exhausted, the side effect of the first sequence for "one item past the last" has already happened.

In fact, I think the Seq.zip in F# should probably come with a warning. What if each item is "getting a web page of several megabytes"? AFAIK, there's no peek function that would potentially tell me, without side effects, that a sequence is exhausted.
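The "one item past the last" effect can be observed with plain Seq.zip. In this small sketch, `pulled` is a made-up recorder for the elements actually requested from the first sequence:

```fsharp
let pulled = ResizeArray<int>()

// Records every element that is actually requested from the source.
let first = seq {
    for i in 1..5 do
        pulled.Add i
        yield i
}

let second = seq { 10; 20 }

let pairs = Seq.zip first second |> Seq.toList

printfn "pairs: %A" pairs
printfn "pulled from first: %A" (List.ofSeq pulled)
```

Only two pairs come out, yet the first sequence is advanced once more before the shorter sequence is found to be exhausted. For a task sequence where each item is an expensive request, that extra MoveNextAsync is the side effect in question.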

Edit: linking the discussion in the F# repo: dotnet/fsharp#14121

Allow use of `Task`, as opposed to `Task<'T>` in `do!` statements

Currently, this is not possible:

taskSeq {
   do! Task.Delay 220  // fails, because this returns a Task, not a Task<unit>
}

Add necessary overloads for allowing this, similarly to how the task CE works.

Of note is that allowing Task can lead to unexpected behavior, as Task<'T> inherits from Task. However, we should strive to have parity with task in F#, to avoid surprises for users. Just like with normal task, we would still disallow do! task { return 10 }. This should be OK, as the F# compiler does not (yet) allow that kind of conversion.
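Until such overloads exist, one possible workaround is to wrap the non-generic Task with the regular task builder, which already accepts it in do!. The helper name `asUnitTask` is hypothetical:

```fsharp
open System.Threading.Tasks

// Hypothetical helper: wraps a non-generic Task as Task<unit>.
let asUnitTask (t: Task) : Task<unit> = task { do! t }

let wrapped = asUnitTask (Task.Delay 10)
wrapped.Wait()
printfn "completed: %b" wrapped.IsCompletedSuccessfully
```

Inside a task sequence this would then read as do! asUnitTask (Task.Delay 220), at the cost of one extra task allocation per call.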

Consider tail recursion

We've removed the tail recursion because it was hard to consolidate it into yield! (it was instead done with return!, which has no place in taskSeq).

However, today I helped someone with some code that he considered for taskSeq which had the following approach:

let getPoliciesAsync policyid =
    asyncSeq{
        use connection = new NpgsqlConnection(npgsqlConnectionStringBuilder.ConnectionString)
        use command = new NpgsqlCommand($"SELECT policy_data FROM policy.tbl_policy where policy_id = {Sql.uuid policyid};", connection)
        let! reader = command.ExecuteReaderAsync() |> Async.AwaitTask
        let rec someRec() = asyncSeq{
            let! rowExists = reader.ReadAsync() |> Async.AwaitTask
            if rowExists then
                yield {| dt1 = reader.GetString(0) |}
                yield! someRec()
        }
        yield! someRec()
    } |> AsyncSeq.toAsyncEnum

As you can see, it uses asyncSeq, but also: it is recursive. The same approach with taskSeq would likely be more performant, however, if there are a lot of rows, this becomes problematic. This code can be rewritten with a loop, though.
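A loop-based version might look roughly like the sketch below. To keep it self-contained it takes any DbDataReader as a stand-in for the Npgsql reader, and the function name `readFirstColumn` is made up; connection and command handling are omitted.

```fsharp
#r "nuget: FSharp.Control.TaskSeq"

open System.Data.Common
open FSharp.Control

// Stand-in for the Npgsql reader above: any DbDataReader works for this sketch.
let readFirstColumn (reader: DbDataReader) = taskSeq {
    let mutable hasRow = true

    // A plain loop replaces the recursive yield!, so the depth stays constant.
    while hasRow do
        let! rowExists = reader.ReadAsync()
        hasRow <- rowExists

        if rowExists then
            yield {| dt1 = reader.GetString(0) |}
}
```

Because there is no nested yield! per row, the number of rows no longer affects how deep the enumerators are chained.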

@dsyme, sharing this with you in case we want to revisit this at some point.

Interoperating with CancellableTasks

While writing #77 I realised it would be logical to allow this TaskSeq thing to interoperate with a standard CancellableTask, that is

taskSeq {
    let! res = cancellableTask { ... }
    ...
}

which would pass the CancellationToken governing the iteration of the TaskSeq to the Cancellable task. This would give robust cancellation token passing through the overall computation.

Under the alternative long term strategy in #77 this would be the same as

asyncSeq {
    let! res = async2 { ... } // re-implemented F# async
    ...
}

because F# cancellableTask and async2 are more or less the same thing.

This would mean bringing a CancellableTask into this library from IcedTasks, which may be a good thing.

TaskEx: AwaitTaskCorrect / Task.toAsync / Async.ofTask

Replaces #129. TaskEx top level issue: #139

The default implementation of the Async.AwaitTask methods in FSharp.Core has some key shortcomings:

  1. when the Task faults, yielding an exception, that exception is typically (always?) wrapped in an egregious AggregateException
  2. the default implementation does not abort/cancel when the ambient CancellationToken of the async expr within which Async.AwaitTask is triggered is cancelled
  3. cancelling an Async computation should not just abort the processing, it should also propagate a TaskCanceledException to align with the behavior of Task

While it can be argued that the current behavior is 'wrong', it's also obvious that breaking it would be untenable, and the semantic differences are beyond what one might cover with subtle overloads and/or adding optional arguments etc.

Current proposed APIs (will be updated inline based on any discussion below):

module Async =
    let inline ofTask (t : Task<'t>) : Async<'t> = AwaitTaskCorrect t
    let inline ofUnitTask (t : Task) : Async<unit> = AwaitTaskCorrect t
module Task =
    let inline toAsync (t : Task<'t>) : Async<'t> = Async.ofTask t

NOTES:

  • the above naming is taken from the helpers within TaskSeq, which are exposed in the FSharp.Control namespace. NOTE the current implementations use Async.AwaitTask, but that was not a conscious choice, and there is a desire to fix at least some of the shortcomings noted
  • given the fact that task and async are now first class citizens of FSharp.Core, having an of/to pairing would seem to make sense. This is open to debate; not sure the degree to which the prior art is consistent wrt this beyond collection types/modules
  • There is an fslang-suggestion regarding this. The purpose of this issue is as a placeholder for potentially filling the gap until this issue can be more thoroughly resolved in FSharp.Core proper.
  • Equinox, Propulsion and FSharp.AWS.DynamoDB all have copies of the canonical impl in Eirik's fssnip. Not ruling out tweaks to the semantics, but ideally those libraries, and others, would all share the same semantics
  • if this is handled as a module, one also frequently needs an ofUnitTask alongside as above (Async.AwaitTask is a pair of overloaded methods, which often brings its own issues with intellisense and error messages etc)

Disposal is not called with `use!` and the like

It appears that Dispose() or DisposeAsync() is not being called when use! is used. First mentioned offline by @bartelink. Logging here to link the PR to a tracking issue and for the changelog/release notes.

Turned out that this was inadvertently commented out in 27486f3, which shouldn't have happened. In a follow up I will create some tests to ensure this won't happen again.

Add `skipWhile`, `skipWhileInclusive` with async variants, and `takeUntil` etc

Similar to #122. The naming should be skipWhile, even though skipUntil was proposed as well in that thread. F# Core has skipWhile, so we should stick with that. Also, I think they are semantically different:

Skip while

  • skipWhile: as long as predicate is true, keep skipping, then stop
  • skipWhileAsync: same, but async
  • skipWhileInclusive: same, but yield first, then test predicate on yielded value
  • skipWhileInclusiveAsync: same, but async

Skip until

  • skipUntil: skip until predicate becomes true
  • skipUntilAsync: same, but async
  • skipUntilInclusive: same, but skip first, then test predicate
  • skipUntilInclusiveAsync: same, but async

And the missing Take Until

We should include these if we do the skipUntilXXX functions:

  • takeUntil: as long as predicate is false, keep taking, then stop
  • takeUntilAsync: same, but async
  • takeUntilInclusive: same, but yield first, then test predicate on yielded value
  • takeUntilInclusiveAsync: same, but async

Not sure we should have both until and while, as they are logically each other's opposites.
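The "logical opposites" point can be shown on plain seq, used here as a stand-in for the TaskSeq variants. The `skipUntil` helper is hypothetical and simply negates the predicate:

```fsharp
// Hypothetical helper on plain seq, as a stand-in for the TaskSeq variants.
let skipUntil predicate source =
    Seq.skipWhile (predicate >> not) source

let viaWhile = [ 1; 2; 3; 4; 1 ] |> Seq.skipWhile (fun x -> x < 3) |> Seq.toList
let viaUntil = [ 1; 2; 3; 4; 1 ] |> skipUntil (fun x -> x >= 3) |> Seq.toList

printfn "%A %A" viaWhile viaUntil // [3; 4; 1] [3; 4; 1]
```

Since one is a one-line wrapper over the other, the question is mostly one of naming and discoverability rather than functionality.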

Implement functions `TaskSeq.skip`, `skipWhile`, `where`, `truncate`, `take`, `insertAt`, `updateAt`, `forall`, `concat` (seq)

Please add the following from the wish list of the front page:

TODO list:

  • skip, take (#209)
  • truncate and drop (as with Seq.truncate, these are the counterparts of take and skip that do not throw) (see #209)
  • skipWhile, skipWhileAsync, skipWhileInclusive, skipWhileInclusiveAsync (#219)
  • where and whereAsync (to mimic Seq, but this is just an alias for filter) (#217)
  • (takeWhile etc, see #122 / #126, already implemented)
    • improvements: #235
  • insertAt, removeAt and updateAt, plus insertManyAt, removeManyAt and updateManyAt (#236)
  • forall and forallAsync (#240)
  • max, maxBy, min, minBy, plus maxByAsync and minByAsync (#221)
  • concat overloads (for concatenating a taskseq of sequences/lists etc) (#237)

Full list of signatures:

val take: count: int -> source: TaskSeq<'T> -> TaskSeq<'T>
val skip: count: int -> source: TaskSeq<'T> -> TaskSeq<'T>
val drop: count: int -> source: TaskSeq<'T> -> TaskSeq<'T>
val truncate: count: int -> source: TaskSeq<'T> -> TaskSeq<'T>

val skipWhile: predicate: ('T -> bool) -> source: TaskSeq<'T> -> TaskSeq<'T>
val skipWhileAsync: predicate: ('T -> #Task<bool>) -> source: TaskSeq<'T> -> TaskSeq<'T>
val skipWhileInclusive: predicate: ('T -> bool) -> source: TaskSeq<'T> -> TaskSeq<'T>
val skipWhileInclusiveAsync: predicate: ('T -> #Task<bool>) -> source: TaskSeq<'T> -> TaskSeq<'T>

val where: predicate: ('T -> bool) -> source: TaskSeq<'T> -> TaskSeq<'T>
val whereAsync: predicate: ('T -> #Task<bool>) -> source: TaskSeq<'T> -> TaskSeq<'T>

// these four already implemented:
val takeWhile: predicate: ('T -> bool) -> source: TaskSeq<'T> -> TaskSeq<'T>
val takeWhileAsync: predicate: ('T -> #Task<bool>) -> source: TaskSeq<'T> -> TaskSeq<'T>
val takeWhileInclusive: predicate: ('T -> bool) -> source: TaskSeq<'T> -> TaskSeq<'T>
val takeWhileInclusiveAsync: predicate: ('T -> #Task<bool>) -> source: TaskSeq<'T> -> TaskSeq<'T>

val insertAt: position:int -> value:'T -> source: TaskSeq<'T> -> TaskSeq<'T>
val removeAt: position:int -> source: TaskSeq<'T> -> TaskSeq<'T>
val updateAt: position:int -> value:'T -> source: TaskSeq<'T> -> TaskSeq<'T>

val insertManyAt: position:int -> values:TaskSeq<'T> -> source: TaskSeq<'T> -> TaskSeq<'T>
val removeManyAt: position:int -> count:int -> source: TaskSeq<'T> -> TaskSeq<'T>

val forall: predicate: ('T -> bool) -> source: TaskSeq<'T> -> Task<bool>
val forallAsync: predicate: ('T -> #Task<bool>) -> source: TaskSeq<'T> -> Task<bool>

val max: source:TaskSeq<'T> -> Task<'T> (requires comparison)
val min: source:TaskSeq<'T> -> Task<'T> (requires comparison)
val maxBy: projection: ('T -> 'U) -> source:TaskSeq<'T> -> Task<'T> (requires comparison)
val minBy: projection: ('T -> 'U) -> source:TaskSeq<'T> -> Task<'T> (requires comparison)
val maxByAsync: projection: ('T -> #Task<'U>) -> source:TaskSeq<'T> -> Task<'T> (requires comparison)
val minByAsync: projection: ('T -> #Task<'U>) -> source:TaskSeq<'T> -> Task<'T> (requires comparison)

// overloads for existing concat (which takes nested task sequences)
val concat: sources: TaskSeq<'T seq> -> TaskSeq<'T>
val concat: sources: TaskSeq<'T list> -> TaskSeq<'T>
val concat: sources: TaskSeq<'T array> -> TaskSeq<'T>
val concat: sources: TaskSeq<ResizeArray<'T>> -> TaskSeq<'T>

Move to static members and overloads, to support cancellation token passing, `ValueTask` and `Async` in the module members

It is currently quite a pain to implement cancellation passing explicitly. As has been said in several issues already, we need to support this on each member to do it properly. Plus we will need overloads:

  • Each method taking a TaskSeq should also seamlessly take a ConfiguredCancelableAsyncEnumerable<_>, which is available in .NET Standard 2.1. This would solve #167, reported by @xperiandri (PS: to prevent the surface area from blowing up, I will check if ofCancelable/toCancelable functions would serve the same purpose).
  • Each method currently with the Async prefix should be able to take a Task (as they can, currently), an Async and a ValueTask. Writing functions like iterAsync, iterWithTask, iterWithValueTask etc is going to be too painful long-run.
  • Each method should have an optional cancellation token argument as their last argument (?), so that piping into the functions is still possible, but it will default to no cancellation token. Only members that consume the task sequence should get a cancellation token argument.

TaskEx: Async.startImmediateAsTask

Replaces #129. TaskShims top level issue: #139

In contrast to its sibling Async.StartAsTask, Async.StartImmediateAsTask does not have a taskCreationOptions optional parameter (because it does not start a thread), so people often use it in pipeline expressions.

However, that makes it easy to gloss over the fact that the computation will then not have an ambient cancellation token.

In order to reconcile those forces (wanting to be able to execute an async via piping, without the risk of forgetting to consider cancellation), it is proposed to have a common helper function (with a lower-case name) that

  • takes a cancellation token (as its first argument)
  • passes that and the Async onto Async.StartImmediateAsTask

Current proposed APIs (will be updated inline based on any discussion below):

module Async =
    let inline startImmediateAsTask ct (a : Async<'t>) = Async.StartImmediateAsTask(a, cancellationToken = ct)
    // ALTERNATELY
    let inline executeAsTask ct (a : Async<'t>) = Async.StartImmediateAsTask(a, cancellationToken = ct)
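
As a minimal sketch of how the proposed helper would read in a pipeline, the block below repeats the first variant with explicit type annotations and then pipes an async into it. The function `answerAsTask` is a hypothetical name used only for illustration; the helper itself is still a proposal, not a shipped API.

```fsharp
open System.Threading
open System.Threading.Tasks

module Async =
    // Proposed helper: token first, so the async can be piped in last.
    let inline startImmediateAsTask (ct: CancellationToken) (a: Async<'t>) : Task<'t> =
        Async.StartImmediateAsTask(a, cancellationToken = ct)

// Hypothetical caller: the token has to be supplied explicitly,
// so it cannot be silently left out of the pipeline.
let answerAsTask (ct: CancellationToken) : Task<int> =
    async { return 42 }
    |> Async.startImmediateAsTask ct
```

Taking the token as the first argument means there is no way to pipe into the helper without deciding which token to pass, even if that decision is `CancellationToken.None`.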

NOTES:

Support `ConfiguredCancelableAsyncEnumerable<'T>`

When you call .WithCancellation(cancellationToken) on IAsyncEnumerable<'T> you get a System.Runtime.CompilerServices.ConfiguredCancelableAsyncEnumerable<'T>, which is not currently supported.

As a result, this code does not work:

    static member ToFlatListAsync<'Source>(source: IQueryable<'Source>, [<Optional>] cancellationToken: CancellationToken) = task {
        let builder = ImmutableArray.CreateBuilder<'Source>()
        do!
            // WithCancellation returns ConfiguredCancelableAsyncEnumerable<'Source>,
            // which the TaskSeq functions do not accept
            source.AsAsyncEnumerable().WithCancellation(cancellationToken)
            |> TaskSeq.iter (fun x -> builder.Add(x))

        return builder.ToImmutable()
    }
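
Until such support exists, one possible workaround is to skip .WithCancellation and hand the token to GetAsyncEnumerator directly. The sketch below uses only BCL types and the F# task builder; `iterWithCancellation` is a hypothetical helper written for illustration and is not part of the TaskSeq module.

```fsharp
open System.Collections.Generic
open System.Threading

/// Hypothetical helper (not a TaskSeq function): runs `action` on every item,
/// passing `ct` to the enumerator instead of calling .WithCancellation.
let iterWithCancellation (ct: CancellationToken) (action: 'T -> unit) (source: IAsyncEnumerable<'T>) =
    task {
        // `use` in a task expression disposes the enumerator via DisposeAsync
        use enumerator = source.GetAsyncEnumerator(ct)
        let mutable go = true

        while go do
            let! moved = enumerator.MoveNextAsync()

            if moved then
                action enumerator.Current
            else
                go <- false
    }
```

Under that assumption, the failing example would pipe source.AsAsyncEnumerable() into iterWithCancellation cancellationToken builder.Add instead of going through TaskSeq.iter. Whether the token is actually observed still depends on the source's enumerator honoring it.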

Iterating multiple times over a taskSeq { ... } raises InvalidOperationException: An attempt was made to transition a task to a final state when it had already completed.

As in the title. These errors only happen when multiple iterations are attempted and the source is a taskSeq CE, not when the source is a user-defined or library-defined IAsyncEnumerator<_> used with the TaskSeq library functions.

I already started investigating this issue and it has to do with properly resetting state when "reaching the end" and when "getting an enumerator over the same resource". See #36 continued: #42.

Operation not valid error, MoveNextAsync()

// this throws: InvalidOperationException: Operation is not valid due to the current state of the object.
task {
    let tskSeq = taskSeq { yield 1; yield 2 }
    let enum = tskSeq.GetAsyncEnumerator()
    let! isNext = enum.MoveNextAsync()  // true
    let! isNext = enum.MoveNextAsync()  // true
    let! isNext = enum.MoveNextAsync()  // false
    let! isNext = enum.MoveNextAsync()  // error here
    ()
}

Operation not valid error, multiple GetAsyncEnumerator() with MoveNextAsync()

// throws: InvalidOperationException: Operation is not valid due to the current state of the object.
task {
    let tskSeq = getEmptyVariant variant
    use enumerator = tskSeq.GetAsyncEnumerator()
    let! isNext = enumerator.MoveNextAsync()
    use enumerator = tskSeq.GetAsyncEnumerator()
    let! isNext = enumerator.MoveNextAsync()  // throws here
    ()
}

Transition State error

// this throws: 
// InvalidOperationException: 
// An attempt was made to transition a task to a final state when it had already completed.
task {
    let tskSeq = taskSeq { yield 1; yield 2 }
    let ts1 = tskSeq |> TaskSeq.map (fun i -> i + 1)
    let result1 = TaskSeq.toArray ts1
    let ts2 = ts1 |> TaskSeq.map (fun i -> i + 1)
    let result2 = TaskSeq.toArray ts2 // error here
    ()
}
