Futures

FSharp.Control.Futures это экспериментальная F# библиотека асинхронного программирования, вдохновленная Rust трейтом Future.

Концептуально Future являются таким же примитивом асинхронного программирования как C# Task или F# Async, поэтому если вы знакомы с ними, начать работать с Future'ами должно быть максимально просто.

Особенности дизайна Future

  • Future является "холодной" (вычисление начинается только после явного запуска).
  • Возможность отмеы без явной передачи CancellationToken.
  • Всегда явные точки прерывания.
  • Отсутствие блокировок в базовых комбинаторах.
  • Не требует выделения памяти под обратные вызовы, только выделения самих Future.

Сравнение Task, Async, Future

TaskAsyncFuture
ТипГорячиеХолодныеХолодные
ОтменаЯвный CancellationTokenНеявный CancellationTokenВызов метода отмены
Хвостовая рекурсияНетДаДа

Создание Future

Создание Future используя функции-комбинаторы

Используя функции модуля Future можно создать базовые и получить скомбинированные вариации Future. Разберем базовые функции создания.

// Создает Future, которое моментально завершается с переданным значением
let ready = Future.ready "Hello, world!"

// То же что и `Future.ready ()`, только в единственном экземпляре
let unit' = Future.unit'

// Future, которая никогда не завершается
let never = Future.never<_>

// Future, которое выполнит функцию при своем запуске и вернет её результат.
let lazy' = Future.lazy (fun () -> printfn "Hello, world!")

Вышеописанные функции позволяют создать базовые, наиболее простые Future. Они довольно просты и не проявляют свойств асинхронности, и тем не менее, могут быть крайне полезны когда вам необходима Future заглушка или простой способ преобразовать результат или действие в асинхронный примитив.

Все Future можно комбинировать друг с другом используя комбинаторы. Ключевым является понимание комбинатора Future.bind, который позволяет передать результат одного асинхронного вычисления по цепочке в следующее. Рассмотрим его на простом псевдо примере чтения из одного места и записи в другое.

Future.bind имеет сигнатуру (binder: 'a -> Future<'b>) -> fut: Future<'a> -> Future<'b> и создает Future которое передаст результат fut в binder и дождется результата возвращенного из него Future<'b>. Можно привести в качестве аналога .then из мира JS.

В примере ниже readAndWriteFuture будет иметь следующее поведение при запуске: дождется завершения Future, полученным вызовом readFileAsync, которое читает файл "my-file.txt"; затем создаст новое Future записи в файл через writeFileAsync и дождется его завершения.

// readFileAsync: filePath: string -> Future<string>
// writeFileAsync: filePath: string -> content: string -> Future<unit>
let readAndWriteFuture =
    readFileAsync "my-file.txt"
    |> Future.bind (fun content -> writeFileAsync "other-file.txt" content)

Также Future.bind могут объединяться в цепочку друг с другом, например так:

let doManyWork =
    doWork1 ()
    |> Future.bind (fun () -> doWork2 ())
    |> Future.bind (fun () -> doWork3 ())
    |> ...
    |> Future.bind (fun () -> doWorkN ())

let doManyWorkWithResults =
    doWork1 ()
    |> Future.bind (fun val1 -> doWork2 val1)
    |> Future.bind (fun val2 -> doWork3 val2)
    |> ...
    |> Future.bind (fun valPrevN -> doWorkN valPrevN)

Однако, ситуация сильно усложняется, если единицы работы зависят от результатов друг друга.

let doManyWorkWithCrossResults =
    doWork1 ()
    |> Future.bind (fun val1 ->
        doWork2 val1
        |> Future.bind (fun val2 ->
            doWork3 val1 val2
            |> Future.bind (fun val3 -> ...)))

Future.bind позволяет соединять асинхронные вычисления в последовательную цепочку, и выполнять асинхронную операцию за операцией. Этот процесс можно упростить используя F# Computation Expressions, о чем будет описано ниже. Однако перед этим стоит рассмотреть еще несколько комбинаторов.

// Преобразование значения
let map = Future.map (fun n -> n.ToString()) (Future.ready 12)

// Игнорирование значения
let unitFuture = Future.ignore (Future.ready 12)

// Параллельный запуск с ожиданием обоих (ждет 1000 мс)
let merge = Future.merge (Future.sleepMs 1000) (Future.sleepMs 500)

// Параллельный запуск с получением первого выполненного значения и отменой оставшегося
// (Ждет 500 мс)
let first = Future.first (Future.sleepMs 1000) (Future.sleepMs 500)

// Преобразует Future<Future<'a>> в Future<'a>
let join = Future.join (Future.ready (Future.ready 12))

// Ловит исключение вложенной Future, возвращает Result<'a, exn>
let catch = Future.catch (Future.lazy (fun () -> failwith "exception"))
Future это уже объект машины состояний. По этой причине множественное использование (комбинирование или запуск) одного экземпляра Future недопустимы. То есть следующий код, запускающий fut дважды недопустим ```fsharp let fut = someAsyncWork () let doubleFut = fut |> Future.bind (fun () -> fut) ``` Вместо этого всегда пересоздавайте Future при необходимости двойного использования: ```fsharp let doubleFut = someAsyncWork () |> Future.bind (fun () -> someAsyncWork ()) ```

Создание Future используя Future CE

Future имеет свой CE, который используется также как async или task CE встроенные в F#. Более подробно о CE вы можете прочитать на сайте.

Например, мы можем заменить базовые функции создания на future CE:

let ready = future { return "Hello, world!" } // ~ Future.ready "Hello, world!"
let lazy' = future { return (foo ()) } // ~ Future.lazy' (fun () -> foo ())

Наиболее важным свойством CE является упрощение работы с bind. Пример чтения-записи можно переписать используя CE так:

// readFileAsync: filePath: string -> Future<string>
// writeFileAsync: filePath: string -> content: string -> Future<unit>
let readAndWriteFuture = futur {
    let! content = readFileAsync "my-file.txt"
    return! writeFileAsync "other-file.txt" content
}

Видимым преимуществом CE является возможность "уплощить" цепочка bind, зависимых между собой. Пример множественно зависимых bind можно переписать так:

let doManyWorkWithCrossResults = future {
    let! val1 = doWork1 ()
    let! val2 = doWork2 val1
    let! val3 = doWork3 val1 val2
    ...
    let! valN = doWorkN val1 val2 ... valPrevN
}

Также CE добавляют синтаксис и для Future.merge или Future.catch комбинаторов.

let parallelCE = future {
    let! val1 = doWork1 ()
    and! val2 = doWork2 ()
    and! val3 = doWork3 ()
}
let catchCE = future {
    try
        do! doWork ()
    with ex ->
        printfn $"{ex}"
}
let tryFinally = future {
    try
        do! doWork ()
    finally
        do finallize ()
}

Создание Future из Async и Task

Существующие Async и Task можно преобразовать в Future и использовать результат их работы. Исходные Async и Task будут запущены на своих родных системах запуска, но их результат будет передан через возвращенную Future.

let asyncToFuture = Future.ofAsync (async { ... })
let taskToFuture = Future.ofTask (task { ... })

Возможны и обратные преобразования. При этом Future будут запущены на механизме запуска соответствующего примитива при запуске этого примитива.

let futureToAsync = Future.ofAsync (async { ... })
let futureToTask = Future.ofTask (task { ... })

Создание Future ручной реализацией Future

Future это всего-лишь интерфейс с методами Poll и Drop. Можно создать свою Future просто реализовав их.

Ручная реализация Future корректным образом не такая тривиальная задача, требующая ручной реализации конечного или не очень автомата. Поэтому не рекомендуется делать это, только если Вы не разрабатываете API для использования механизма асинхронности на низком уровне.

Объяснения и более подробные примеры следует искать в более продвинутых главах.

Запуск Future

Запуск Future без среды исполнения на текущем потоке

Future можно запустить на текущем потоке используя Future.runBlocking. Переданная Future запустится, а вызывающий поток будет заблокирован пока не получится результат.

let fut = future {
    let! name = Future.ready "Alex"
    do! Future.sleepMs 1000
    return $"Hello, {name}!"
}

let phrase = fut |> Future.runBlocking
printfn $"{phrase}"

Запуск Future используя Runtime

Future можно запустить на Runtime. Runtime это планировщик для нескольких параллельно выполняющихся Future, не используя Future.merge и снимая его ограничения (Future скомбинированные используя Future.merge никогда не выполняются по-настоящему параллельно).

Запустить Future на планировщике можно используя его метод Spawn.

let fut = future { ... }
let fTask = ThreadPoolRuntime.Instance.Spawn(fut)

Spawn возвращает объект запущенной задачи (IFutureTask<'a>). Используя экземпляр запущенной задачи можно преобразовать её в ожидающую выполнения Future используя Await, или прервать её выполнение через Abort. Если задача была прервана, ожидающая Future выбросит исключение при своем запуске.

future {
    let fTask = ThreadPoolRuntime.Instance.Spawn(future { ... })
    do! doOtherWork ()
    let! fTaskResult = fTask.Await()
}
IFutureTask.Await может быть вызван только один раз.

По-умолчанию Await создает Future, вызывающую Abort при своем Drop. Это можно переопределить вызвав Await с флагом background=true (fTask.Await(true)).

Детали реализации

Дизайн данной библиотеки вдохновлен дизайном Future из языка Rust. Однако сейчас содержит некоторые значимые изменения.

Принцип работы типа Future

Весь мeханизм организации асинхронного вычисления на основе Future сводится к опросу самой Future путём вызова Poll: IContext -> Poll<'a> до завершения её вычисления. Future может быть отброшена вызовом Future.Drop: unit -> unit

Результатом опроса является текущее состояние вычисления Future:

  1. Pending - вычисление Future находится в процессе и еще не завершено.
  2. Ready x - значение вычислено и опрос больше не требуется.
  3. Transit fut - говорит о подмене текущей Future другой такого же типа. Уменьшает глубину опроса и предназначен для оптимизации bind выражаемых конструкций.

Если Future не совершает асинхронную работу и может сразу вернуть результат(возможно с некоторой работой), то при первом опросе должен возвращаться Ready. В противном случае Future должна запомнить переданный Context и вызвать его метод Wake в случае завершения асинхронной работы, чтобы планировщик пробудил её от ожидания этой работы.

При написании Future стоит учитывать, что пока в ней выполняются блокирующие операции -- блокируется поток планировщика, на котором она запушена. Для долгих блокирующих операций (когда это возможно, например, в циклах которые сами не возвращают управление) можно применить Future.yieldWorkflow, который возвращает поток планировщику одновременно с пробуждением Future.

Дизайн отбрасывания Future прокидывает отбрасывание ко всем дочерним Future независимо от способа их создания. Это позволяет отбрасывать запущенную на планировщие Future через IFutureTask (который может быть преобразован в Future). А также отбрасывать любую Future в альтернативах (например внутри Future.first).

Ожидаемые инварианты Future

  • Future полностью синхронна извне: методы Poll и Drop взаимно не потоко-безопасны и не должны вызываться одновременно (в т.ч. нельзя вызывать у одной Future метод Poll или Drop несколько раз одновременно).

  • Future может опрашиваться (Poll) повторно, даже если она не пробудилась.

  • Future может пробуждаться несколько раз до повторного опроса.

  • Future считается терминальной (т.е. выполневшейся) после:

    • Вызова Poll с результатом Ready x,
    • Вызова Poll с результатом Transit fut
    • Вызова Poll с броском исключения (логический эквивален Ready (Error exn))
    • Вызова Drop (в т.ч. с броском исключения, хотя бросать исключения в Drop нельзя)
  • Когда Future переходит в терминальное состояние, она должна остановить (или отвязать от себя) её запущенные фоновые процессы и освободить занятые резурсы.

  • После достижения терминального состояния, Future не должна больше использоваться (т.е. не должны вызываться Poll и Drop)

  • Future должна быть доведена до терминального состояния, либо посредством окончания опроса, либо отменой, чтобы предотвратить утечку ресурсов и не привести к вечному блокированию.

  • Контекст для Future должен быть одним и тем же от первого опроса, до терминального состояния. Future достаточно сохранить контекст при первом опросе и не перезаписывать его. (Можно сказать, что каждый контекст соответствует одному запуску.)

  • Контекст каждой запущенной Future должен быть уникальным. Мотивация этого поведения в том чтобы предотвратить пробуждение новых Future в планировщике при утечке контекста старыми.

  • При достижении терминального состояния Future не следует пробуждаться, т.е. следует позаботиться о том, чтобы Wake её контекста не мог больше использоваться. Однако повторное пробуждение после достижения терминального состояния допустимо по вышеописанному правилу.

  • Drop не должен бросать исключения и должен отработать корректно, чтобы произвести освобождение ресурсов. (Реализациям Future НЕ нужно заботиться об этом, но если рансай)

Примечания

Для реализаций Future любое поведение не следующее инвариантам -- UB, и может как обрабатываться особым способом, так и игнорироваться.

IStream по сути является расширением модели Future, поэтому инварианты Future как правило верны и для IStream

В отличие от реализации в Rust, я не заставляю Future обновлять последний сохраненный ею IContext при каждом опросе. Преимущества на обоих платформах мне не очевидны, а это ощутимо усложняет реализацию. Поэтому это может измениться в будущем.

Методы Future не предназначены для ручного вызова, т.к. требуют с собой деликатного обращения. По этой причине IFutureTask не наследует Future, а имеет отображение в неё

Каждая Future может быть использована как-либо (забайнджена, запихнула в комбинатор, запущена) только ОДИН раз

Есть некоторая коллизия с тем, что Future может выкидывать исключения исключения как предметные, так и свои. Например, неявным результатом IVar может быть любое исключение. И одно из них это FutureTerminatedException. В этой точке будет не так прозрачно, исключение произошло в читающей Future или в записывающей. Но 0. Исключениям можно, у них сложный переезд из места появление в абсолютно случайное

  1. Возможные решения слишком муторные и не стоят того из-за исключения, которое означает проблему в библиотечном, а не пользовательском коде