Futures
FSharp.Control.Futures это экспериментальная F# библиотека асинхронного программирования, вдохновленная Rust трейтом Future.
Концептуально Future являются таким же примитивом асинхронного программирования как C# Task или F# Async, поэтому если вы знакомы с ними, начать работать с Future'ами должно быть максимально просто.
Особенности дизайна Future
- Future является "холодной" (вычисление начинается только после явного запуска).
- Возможность отмеы без явной передачи CancellationToken.
- Всегда явные точки прерывания.
- Отсутствие блокировок в базовых комбинаторах.
- Не требует выделения памяти под обратные вызовы, только выделения самих Future.
Сравнение Task, Async, Future
Task | Async | Future | |
---|---|---|---|
Тип | Горячие | Холодные | Холодные |
Отмена | Явный CancellationToken | Неявный CancellationToken | Вызов метода отмены |
Хвостовая рекурсия | Нет | Да | Да |
Создание Future
Создание Future используя функции-комбинаторы
Используя функции модуля Future можно создать базовые и получить скомбинированные вариации Future. Разберем базовые функции создания.
// Создает Future, которое моментально завершается с переданным значением
let ready = Future.ready "Hello, world!"
// То же что и `Future.ready ()`, только в единственном экземпляре
let unit' = Future.unit'
// Future, которая никогда не завершается
let never = Future.never<_>
// Future, которое выполнит функцию при своем запуске и вернет её результат.
let lazy' = Future.lazy (fun () -> printfn "Hello, world!")
Вышеописанные функции позволяют создать базовые, наиболее простые Future. Они довольно просты и не проявляют свойств асинхронности, и тем не менее, могут быть крайне полезны когда вам необходима Future заглушка или простой способ преобразовать результат или действие в асинхронный примитив.
Все Future можно комбинировать друг с другом используя комбинаторы. Ключевым является понимание комбинатора Future.bind, который позволяет передать результат одного асинхронного вычисления по цепочке в следующее. Рассмотрим его на простом псевдо примере чтения из одного места и записи в другое.
Future.bind имеет сигнатуру (binder: 'a -> Future<'b>) -> fut: Future<'a> -> Future<'b>
и
создает Future которое передаст результат fut в binder
и дождется результата возвращенного из него Future<'b>.
Можно привести в качестве аналога .then из мира JS.
В примере ниже readAndWriteFuture будет иметь следующее поведение при запуске: дождется завершения Future, полученным вызовом readFileAsync, которое читает файл "my-file.txt"; затем создаст новое Future записи в файл через writeFileAsync и дождется его завершения.
// readFileAsync: filePath: string -> Future<string>
// writeFileAsync: filePath: string -> content: string -> Future<unit>
let readAndWriteFuture =
readFileAsync "my-file.txt"
|> Future.bind (fun content -> writeFileAsync "other-file.txt" content)
Также Future.bind могут объединяться в цепочку друг с другом, например так:
let doManyWork =
doWork1 ()
|> Future.bind (fun () -> doWork2 ())
|> Future.bind (fun () -> doWork3 ())
|> ...
|> Future.bind (fun () -> doWorkN ())
let doManyWorkWithResults =
doWork1 ()
|> Future.bind (fun val1 -> doWork2 val1)
|> Future.bind (fun val2 -> doWork3 val2)
|> ...
|> Future.bind (fun valPrevN -> doWorkN valPrevN)
Однако, ситуация сильно усложняется, если единицы работы зависят от результатов друг друга.
let doManyWorkWithCrossResults =
doWork1 ()
|> Future.bind (fun val1 ->
doWork2 val1
|> Future.bind (fun val2 ->
doWork3 val1 val2
|> Future.bind (fun val3 -> ...)))
Future.bind позволяет соединять асинхронные вычисления в последовательную цепочку, и выполнять асинхронную операцию за операцией. Этот процесс можно упростить используя F# Computation Expressions, о чем будет описано ниже. Однако перед этим стоит рассмотреть еще несколько комбинаторов.
// Преобразование значения
let map = Future.map (fun n -> n.ToString()) (Future.ready 12)
// Игнорирование значения
let unitFuture = Future.ignore (Future.ready 12)
// Параллельный запуск с ожиданием обоих (ждет 1000 мс)
let merge = Future.merge (Future.sleepMs 1000) (Future.sleepMs 500)
// Параллельный запуск с получением первого выполненного значения и отменой оставшегося
// (Ждет 500 мс)
let first = Future.first (Future.sleepMs 1000) (Future.sleepMs 500)
// Преобразует Future<Future<'a>> в Future<'a>
let join = Future.join (Future.ready (Future.ready 12))
// Ловит исключение вложенной Future, возвращает Result<'a, exn>
let catch = Future.catch (Future.lazy (fun () -> failwith "exception"))
Создание Future используя Future CE
Future имеет свой CE, который используется также как async или task CE встроенные в F#. Более подробно о CE вы можете прочитать на сайте.
Например, мы можем заменить базовые функции создания на future CE:
let ready = future { return "Hello, world!" } // ~ Future.ready "Hello, world!"
let lazy' = future { return (foo ()) } // ~ Future.lazy' (fun () -> foo ())
Наиболее важным свойством CE является упрощение работы с bind. Пример чтения-записи можно переписать используя CE так:
// readFileAsync: filePath: string -> Future<string>
// writeFileAsync: filePath: string -> content: string -> Future<unit>
let readAndWriteFuture = futur {
let! content = readFileAsync "my-file.txt"
return! writeFileAsync "other-file.txt" content
}
Видимым преимуществом CE является возможность "уплощить" цепочка bind, зависимых между собой. Пример множественно зависимых bind можно переписать так:
let doManyWorkWithCrossResults = future {
let! val1 = doWork1 ()
let! val2 = doWork2 val1
let! val3 = doWork3 val1 val2
...
let! valN = doWorkN val1 val2 ... valPrevN
}
Также CE добавляют синтаксис и для Future.merge или Future.catch комбинаторов.
let parallelCE = future {
let! val1 = doWork1 ()
and! val2 = doWork2 ()
and! val3 = doWork3 ()
}
let catchCE = future {
try
do! doWork ()
with ex ->
printfn $"{ex}"
}
let tryFinally = future {
try
do! doWork ()
finally
do finallize ()
}
Создание Future из Async и Task
Существующие Async и Task можно преобразовать в Future и использовать результат их работы. Исходные Async и Task будут запущены на своих родных системах запуска, но их результат будет передан через возвращенную Future.
let asyncToFuture = Future.ofAsync (async { ... })
let taskToFuture = Future.ofTask (task { ... })
Возможны и обратные преобразования. При этом Future будут запущены на механизме запуска соответствующего примитива при запуске этого примитива.
let futureToAsync = Future.ofAsync (async { ... })
let futureToTask = Future.ofTask (task { ... })
Создание Future ручной реализацией Future
Future это всего-лишь интерфейс с методами Poll и Drop. Можно создать свою Future просто реализовав их.
Ручная реализация Future корректным образом не такая тривиальная задача, требующая ручной реализации конечного или не очень автомата. Поэтому не рекомендуется делать это, только если Вы не разрабатываете API для использования механизма асинхронности на низком уровне.
Объяснения и более подробные примеры следует искать в более продвинутых главах.
Запуск Future
Запуск Future без среды исполнения на текущем потоке
Future можно запустить на текущем потоке используя Future.runBlocking
.
Переданная Future запустится, а вызывающий поток будет заблокирован
пока не получится результат.
let fut = future {
let! name = Future.ready "Alex"
do! Future.sleepMs 1000
return $"Hello, {name}!"
}
let phrase = fut |> Future.runBlocking
printfn $"{phrase}"
Запуск Future используя Runtime
Future можно запустить на Runtime. Runtime это планировщик для нескольких параллельно выполняющихся Future, не используя Future.merge и снимая его ограничения (Future скомбинированные используя Future.merge никогда не выполняются по-настоящему параллельно).
Запустить Future на планировщике можно используя его метод Spawn.
let fut = future { ... }
let fTask = ThreadPoolRuntime.Instance.Spawn(fut)
Spawn возвращает объект запущенной задачи (IFutureTask<'a>). Используя экземпляр запущенной задачи можно преобразовать её в ожидающую выполнения Future используя Await, или прервать её выполнение через Abort. Если задача была прервана, ожидающая Future выбросит исключение при своем запуске.
future {
let fTask = ThreadPoolRuntime.Instance.Spawn(future { ... })
do! doOtherWork ()
let! fTaskResult = fTask.Await()
}
По-умолчанию Await создает Future, вызывающую Abort при своем Drop.
Это можно переопределить вызвав Await с флагом background=true (fTask.Await(true)
).
Детали реализации
Дизайн данной библиотеки вдохновлен дизайном Future
из языка Rust.
Однако сейчас содержит некоторые значимые изменения.
Принцип работы типа Future
Весь мeханизм организации асинхронного вычисления на основе Future
сводится к опросу
самой Future
путём вызова Poll: IContext -> Poll<'a>
до завершения её вычисления.
Future
может быть отброшена вызовом Future.Drop: unit -> unit
Результатом опроса является текущее состояние вычисления Future
:
Pending
- вычислениеFuture
находится в процессе и еще не завершено.Ready x
- значение вычислено и опрос больше не требуется.Transit fut
- говорит о подмене текущейFuture
другой такого же типа. Уменьшает глубину опроса и предназначен для оптимизации bind выражаемых конструкций.
Если Future
не совершает асинхронную работу и может сразу вернуть
результат(возможно с некоторой работой), то при первом опросе должен возвращаться Ready
.
В противном случае Future
должна запомнить переданный Context
и вызвать его метод Wake
в случае завершения асинхронной работы, чтобы планировщик пробудил её от ожидания этой работы.
При написании Future стоит учитывать, что пока в ней выполняются блокирующие операции -- блокируется поток планировщика, на котором она запушена. Для долгих блокирующих операций (когда это возможно, например, в циклах которые сами не возвращают управление) можно применить
Future.yieldWorkflow
, который возвращает поток планировщику одновременно с пробуждениемFuture
.
Дизайн отбрасывания Future
прокидывает отбрасывание ко всем дочерним Future
независимо от способа их создания.
Это позволяет отбрасывать запущенную на планировщие Future
через IFutureTask
(который может быть преобразован в Future
).
А также отбрасывать любую Future
в альтернативах (например внутри Future.first
).
Ожидаемые инварианты Future
-
Future
полностью синхронна извне: методыPoll
иDrop
взаимно не потоко-безопасны и не должны вызываться одновременно (в т.ч. нельзя вызывать у однойFuture
методPoll
илиDrop
несколько раз одновременно). -
Future
может опрашиваться (Poll
) повторно, даже если она не пробудилась. -
Future
может пробуждаться несколько раз до повторного опроса. -
Future
считается терминальной (т.е. выполневшейся) после:- Вызова
Poll
с результатомReady x
, - Вызова
Poll
с результатомTransit fut
- Вызова
Poll
с броском исключения (логический эквиваленReady (Error exn)
) - Вызова
Drop
(в т.ч. с броском исключения, хотя бросать исключения вDrop
нельзя)
- Вызова
-
Когда
Future
переходит в терминальное состояние, она должна остановить (или отвязать от себя) её запущенные фоновые процессы и освободить занятые резурсы. -
После достижения терминального состояния,
Future
не должна больше использоваться (т.е. не должны вызыватьсяPoll
иDrop
) -
Future
должна быть доведена до терминального состояния, либо посредством окончания опроса, либо отменой, чтобы предотвратить утечку ресурсов и не привести к вечному блокированию. -
Контекст для
Future
должен быть одним и тем же от первого опроса, до терминального состояния.Future
достаточно сохранить контекст при первом опросе и не перезаписывать его. (Можно сказать, что каждый контекст соответствует одному запуску.) -
Контекст каждой запущенной
Future
должен быть уникальным. Мотивация этого поведения в том чтобы предотвратить пробуждение новыхFuture
в планировщике при утечке контекста старыми. -
При достижении терминального состояния
Future
не следует пробуждаться, т.е. следует позаботиться о том, чтобыWake
её контекста не мог больше использоваться. Однако повторное пробуждение после достижения терминального состояния допустимо по вышеописанному правилу. -
Drop
не должен бросать исключения и должен отработать корректно, чтобы произвести освобождение ресурсов. (РеализациямFuture
НЕ нужно заботиться об этом, но если рансай)
Примечания
Для реализаций
Future
любое поведение не следующее инвариантам -- UB, и может как обрабатываться особым способом, так и игнорироваться.
IStream
по сути является расширением моделиFuture
, поэтому инвариантыFuture
как правило верны и дляIStream
В отличие от реализации в Rust, я не заставляю Future обновлять последний сохраненный ею IContext при каждом опросе. Преимущества на обоих платформах мне не очевидны, а это ощутимо усложняет реализацию. Поэтому это может измениться в будущем.
Методы
Future
не предназначены для ручного вызова, т.к. требуют с собой деликатного обращения. По этой причинеIFutureTask
не наследуетFuture
, а имеет отображение в неё
Каждая
Future
может быть использована как-либо (забайнджена, запихнула в комбинатор, запущена) только ОДИН раз
Есть некоторая коллизия с тем, что Future может выкидывать исключения исключения как предметные, так и свои. Например, неявным результатом IVar может быть любое исключение. И одно из них это FutureTerminatedException. В этой точке будет не так прозрачно, исключение произошло в читающей Future или в записывающей. Но 0. Исключениям можно, у них сложный переезд из места появление в абсолютно случайное
- Возможные решения слишком муторные и не стоят того из-за исключения, которое означает проблему в библиотечном, а не пользовательском коде