Что такое Reactive Extensions
Итак, продолжая тему Reactive, пробежимся по библиотеке Reactive Extensions. Как следует из первой же строки описания проекта, Reactive Extensions (Rx) - это библиотека для создания асинхронных, основанных на событиях, программ с помощью observable-последовательностей и LINQ-style операторов. Используя Rx, мы представляем асинхронные потоки данных абстракцией Observables, делаем к ним запросы посредством LINQ-операторов и параметризуем конкурентный доступ к асинхронных потокам данных с помощью Планировщиков (Schedulers). Итого, Rx = Observables + LINQ + Schedulers.
В каких случаях мне использовать Rx?
Далее я буду использовать довольно много текста из книги Lee Campbell “Introduction to Rx”, онлайн-версия которой доступна по ссылке.
Если вы работаете с предметной областью, где приходится оперировать с последовательностью событий, то Rx - то, о чем неплохо бы вспомнить.
Нужно использовать Rx
Rx создана для управления такими видами событий как:
- UI-события, то есть при разработке клиентских приложений
- Доменные события, такие как “Заказ оформлен”, “Заказ утвержден”, “Списание средств выполнено” и т.п. К этому приходят, например, применяя практику проектирования по предметной области (Domain-Driven Design, DDD)
- Всяческие инфраструктурные события, источником которых может быть filewatcher, WMI, ETW и т.п.
- События из broadcast-источников, таких как шины сообщений, всевозможные датчики (тут можно вспомнить про модные сейчас IoT-устройства) и прочее
- События из Complex event processing (CEP) сервисов
Можно использовать Rx для асинхронных вызовов методов, возвращающих Task
или Task<T>
, т.к. это есть ничто иное, как последовательность из одного элемента. Возможно также использование и с APM-методами вида BeginXXX
/EndXXX
.
Не стоит использовать Rx для выполнения задач, которые успешно решаются с помощью IEnumerable<T>
, тем самым заменяя pull-модель взаимодействия push-моделью.
Основы Rx
Как я отмечал в предыдущем посте, в основе Rx лежит паттерн Observer. В Rx есть два базовых интерфейса:
public interface IObservable<out T>
{
IDisposable Subscribe(IObserver<T> observer);
}
и
public interface IObserver<in T>
{
void OnNext(T value);
void OnError(Exception error);
void OnCompleted();
}
Все, что реализует IObservable<T>
стоит воспринимать как поток объектов типа T. Не нужно путать IObservable<T>
с также существующей в .NET абстракцией Stream
из неймспейса System.IO
, т.к. последняя поддерживает возможность чтения и записи, а также “просмотра” потока (seek). Тем не менее, в Rx присутствуют концепции следования по потоку (push), освобождения ресурсов (закрытия) и завершения работы с потоком (EOF). Однако, кроме этого Rx предоставляет возможность конкурентного доступа, трансформации потока, слияния потоков, агрегации и расширения (expanding).
Важно также, что observables - это не коллекции в привычном понимании. Значения из IObservable<T>
обычно не материализованы, как этого можно ожидать от коллекции. Таким образом, observables - это последовательности. Чтобы не путать их с последовательностями IEnumerable<T>
Lee Campbell предлагает воспринимать экземпляры IEnumerable<T>
как данные в статичном состоянии, в то время как экземпляры IObservable<T>
- данные в движении.
Также важно упомянуть, что IObservable<T>
является функционально сопряженным (functional dual) к IEnumerable<T>
. Напомню, как выглядят IEnumerable<T>
public interface IEnumerable
{
IEnumerator GetEnumerator();
}
public interface IEnumerable<out T> : IEnumerable
{
new IEnumerator<T> GetEnumerator();
}
и IEnumerator<out T>
public interface IEnumerator
{
bool MoveNext();
Object Current { get; }
void Reset();
}
public interface IEnumerator<out T> : IDisposable, IEnumerator
{
new T Current { get; }
}
Для тех, кто помнит функциональный анализ и хочет поглубже разобраться, рекомендую посмотреть это видео на Channel 9.
Если вкратце, то это значит, что также как IEnumerable<T>
позволяет реализовать три свойства - следующее значение (MoveNext()
и Current
), обработку исключений (обычный try/catch/finally и Dispose()
) и окончание последовательности (MoveNext()
, возвращающий false
) с помощью IEnumerator<T>
, так же и IObservable<T>
позволяет сделать то же самое с помощью IObserver<T>
- методы OnNext(T)
, OnError(Exception)
и OnCompleted()
соотвественно.
Rx предоставляет четкий контракт, которому необходимо следовать, если вы решите реализовывать базовые интерфейсы самостоятельно. В такой реализации стоит учитывать, что опционально может быть вызван метод OnNext(T)
и далее возможен (но не обязятелен) вызов одного из OnError(Exception)
или OnCompleted()
. То есть, если последовательность завершается, то обязательно вызовом либо OnError(Exception)
, либо OnCompleted()
. Однако, ни один из этих трех методов может быть и не вызван вовсе, что дает возможность реализации пустых или бесконечных последовательностей.
Subject
Rx конечно же предоставляет реализацию этих базовых интерфейсов, тем самым избавляя нас от необходимости делать это самостоятельно - это тип Subject<T>
. Этот тип реализует сразу оба интерфейса, предоставляя вызывающему коду целостно использовать концепцию publisher/subscriber (или publisher/consumer). Вот пример:
var subject = new Subject<string>();
subject.Subscribe(Console.WriteLine);
subject.OnNext("a");
subject.OnNext("b");
subject.OnNext("c");
Это рабочий код, так как в Rx есть множество extension-методов, в том числе метод Subscribe(Action<T>)
. В результате работы кода на консоль будет выведено abc
.
Но Subject<T>
- это самая простая имплементация. Существуют также еще три, существенно меняющие поведение при использовании каждой из них.
ReplaySubject
ReplaySubject<T>
- реализация, позволяющая “повторить” события для подписчиков, которые начали слушать последовательность не с начала. ReplaySubject<T>
, созданный при помощи конструктора по-умолчанию кэширует все значения и воспроизводит их для всех подписчиков. Однако, это может привести к memory pressure, поэтому сущеуствует конструктор, позволяющий задать размер буфера, то есть количество кэшируемых значений. Код ниже выведет на консоль bc
. Даже, если до вызова Subscribe(Action<T>)
вызвать OnCompleted()
.
var subject = new ReplaySubject<string>(2);
subject.OnNext("a");
subject.OnNext("b");
subject.OnNext("c");
subject.Subscribe(Console.WriteLine);
Существуют также конструкторы, позволяющие задать временно окно для кэширования, а также и то, и другое.
BehaviorSubject
BehaviorSubject<T>
- реализация, запоминающяя последнее опубликованное значение. Для BehaviorSubject<T>
требуется задать значение T
по-умолчанию. Отличие от ReplaySubject<T>
в том, что, во-первых, размер буфера равен 1 и, во-вторых, ничего не будет получено, если подписка была выполнена после завершения последовательности, то есть, например, вызова OnCompleted()
. Код ниже выведет на консоль bc
var subject = new BehaviorSubject<string>("x");
subject.OnNext("a");
subject.OnNext("b");
subject.Subscribe(Console.WriteLine);
subject.OnNext("c");
А этот код ничего не выведет
var subject = new BehaviorSubject<string>("x");
subject.OnNext("a");
subject.OnNext("b");
subject.OnCompleted();
subject.Subscribe(Console.WriteLine);
subject.OnNext("c");
AsyncSubject
AsyncSubject<T>
- реализация, очень похожая на BehaviorSubject<T>
. Она также кэширует последнее значение, однако публикует его только по окончанию последовательности. То есть этот код ничего не выведет
var subject = new AsyncSubject<string>();
subject.OnNext("a");
subject.OnNext("b");
subject.Subscribe(Console.WriteLine);
subject.OnNext("c");
а этот код выведет c
var subject = new AsyncSubject<string>();
subject.OnNext("a");
subject.OnNext("b");
subject.Subscribe(Console.WriteLine);
subject.OnNext("c");
subject.OnCompleted();
Интерфейс ISubject
Описанные выше типы реализуют IObservable<T>
и IObserver<T>
через специальный интерфейс
public interface ISubject<in TSource, out TResult> : IObserver<TSource>, IObservable<TResult>
{
}
Точнее даже через интерфейс с одинаковыми типами TSource
и TResult
public interface ISubject<T> : ISubject<T, T>, IObserver<T>, IObservable<T>
{
}
Важно заметить, что все четыре реализации, описанные выше реализуют лишь контракт ISubject<T>
и не имеют общих базовых классов.
Также стоит упомянуть, что существует фабричный метод
public static ISubject<TSource, TResult> Create<TSource, TResult>(
IObserver<TSource> observer,
IObservable<TResult> observable)
{...}
позволяющий создать ISubject<TSource, TResult>
из имеющихся IObserver<TSource>
и IObservable<TResult>
.
Summary
В этом посте мы пробежались по самой сути Rx, а также посмотрели на имеющиеся реализации базовых контрактов. Однако, в продакшен коде очень редко придется использовать интерфейс IObserver<T>
и subject-типы. Тем не менее, их знание и понимание принципов работы очень важно для правильного использования Rx. Напротив, интерфейс IObservable<T>
очень широко используется в Rx в качестве возвращаемого значения, представляя тем самым последовательность данных в движении. Для этого интерфейса реализовано огромное количество extension-методов для возможности использования Rx в LINQ-стиле для оперирования этими последовательностями. Что это за возможности - рассмотрим одном из следующих постов.
Stay tuned!