Каждый разработчик слышал об Rx, будь то на последней конференции разработчиков или во время чтения публикации в блоге вроде этой 😎. Практически невозможно умудриться не услышать о реактивном программировании, но что же оно всё-таки собой представляет? Заглянем в интернет:
Реактивное программирование — парадигма программирования, ориентированная на потоки данных и распространение изменений. Это означает, что должна существовать возможность легко выражать статические и динамические потоки данных с помощью применяемых языков программирования, а также то, что нижележащая модель исполнения должна автоматически распространять изменения благодаря потоку данных. — Википедия
Если честно, я считаю, что большинству людей, как и мне, такое объяснение не даст ни малейшего представления о том, чем, собственно, является реактивное программирование. Поэтому я попытаюсь создать своё, простое и понятное введение в этот современный подход к разработке ПО, применяя версию Rx для языка “Swift” — RxSwift.
- Наблюдаемые последовательности 🎞
Первое, что нужно понять, — всё в RxSwift является либо наблюдаемой последовательностью, либо чем-то, что оперирует распространяемыми наблюдаемой последовательностью событиями или подписывается на них.
Массивы, строки и словари в RxSwift преобразуются в наблюдаемые последовательности. Вы можете создать наблюдаемую последовательность из любого объекта, соответствующего протоколу Sequence стандартной библиотеки языка Swift.
Попробуем создать несколько наблюдаемых последовательностей:
let helloSequence = Observable.just("Hello Rx")
let fibonacciSequence = Observable.from([0,1,1,2,3,5,8])
let dictSequence = Observable.from([1:"Hello",2:"World"])
Подписаться на наблюдаемые последовательности можно, вызвав
subscribe(on:(Event)-> ())
Переданный блок будет принимать уведомления обо всех событиях, распространяемых данной последовательностью.
let helloSequence = Observable.of("Hello Rx")
let subscription = helloSequence.subscribe { event in
print(event)
}
OUTPUT:
next("Hello Rx")
completed
Наблюдаемые последовательности могут распространять ноль или более событий за время своего существования.
В RxSwift событие — это просто перечисление (Enumeration Type) с тремя возможными состояниями:
.next(value: T) — Когда в наблюдаемой последовательности возникает новое значение или набор значений, она отправляет событие “next” своим подписчикам, как вы могли видеть в примере выше. Ассоциированное значение будет содержать актуальное значение из последовательности.
.error(error: Error) — При обнаружении ошибки последовательность распространит событие “error”. Также это приведёт к уничтожению последовательности.
.completed — Если последовательность завершилась нормально, она отправляет подписчикам событие “completed”.
let helloSequence = Observable.from(["H","e","l","l","o"])
let subscription = helloSequence.subscribe { event in
switch event {
case .next(let value):
print(value)
case .error(let error):
print(error)
case .completed:
print("completed")
}
}
OUTPUT:
H e l l o
completed
Если вы хотите отменить подписку, вы можете сделать это, вызвав её метод dispose. Вы также можете добавить подписку в Disposebag (“корзину”), которая автоматически отменит подписку при выполнении метода deinit экземпляра DisposeBag. Кроме того, вы можете подписаться только на конкретное событие. Например, если вы хотите просто получать сообщения об ошибках, распространяемые последовательностью, вы можете использовать subscribe(onError:(Error->())).
Фрагмент кода, иллюстрирующий всё, что мы уже успели обсудить:
// Creating a DisposeBag so subscribtion will be cancelled correctly
let bag = DisposeBag()
// Creating an Observable Sequence that emits a String value
let observable = Observable.just("Hello Rx!")
// Creating a subscription just for next events
let subscription = observable.subscribe (onNext:{
print($0)
})
// Adding the Subscription to a Dispose Bag
subscription.addDisposableTo(bag)
2. Subject’ы 📫
Subject — это особый вид наблюдаемой последовательности, вы можете подписаться и динамически добавлять к нему элементы. В настоящее время существует 4 различных вида Subject’ов в RxSwift
- PublishSubject: Подписавшись на него, вы будете получать уведомления обо всех событиях, которые произойдут после вашей подписки.
- BehaviourSubject: Предоставляет подписчику самый последний элемент и всё, что будет распространяться этой последовательностью после подписки.
- ReplaySubject: Если вы хотите воспроизвести для новых подписчиков больше, чем только последний элемент, вам следует использовать ReplaySubject. С ReplaySubject вы можете задавать, сколько последних элементов следует распространять для новых подписчиков.
- Variable: Это просто обёртка BehaviourSubject, легче воспринимаемая теми, кто мало знаком с реактивным программированием. Может использоваться как обычная переменная.
В рамках данной статьи я рассматриваю только работу PublishSubject. Обратитесь к дополнительному материалу на GitHub’е, если вы хотите узнать больше о других subject-типах. Они отличаются, в основном, только количеством прошлых событий, распространяемых и получаемых подписчиками по первоначальной подписке.
Publish: 0
Behaviour & Variable: 1
Replay: N
Рассмотрим PublishSubject.
Вначале нам нужно создать экземпляр PublishSubject. Делается это очень просто: можем использовать для этого инициализатор по умолчанию.
let bag = DisposeBag()
var publishSubject = PublishSubject()
Мы можем добавлять новые значения к этой последовательности с помощью функции onNext(). onCompleted() завершает последовательность, а onError(error) приводит к распространению события-ошибки. Давайте добавим некоторые значения в наш PublishSubject.
publishSubject.onNext("Hello")
publishSubject.onNext("World")
Если вы подпишетесь на этот subject после добавления “Hello” и “World” с помощью onNext(), вы не получите два этих значения через события. В отличие от BehaviourSubject, который бы получил “World”, поскольку это самое последнее событие.
Теперь давайте создадим подписку и добавим новые значения в Subject.
Ещё мы создадим вторую подписку и добавим ещё значений.
Обратите внимание на комментарии, чтобы понять, что именно происходит в коде.
let subscription1 = publishSubject.subscribe(onNext:{
print($0)
}).addDisposableTo(bag)
// Subscription1 receives these 2 events, Subscription2 won't
publishSubject.onNext("Hello")
publishSubject.onNext("Again")
// Sub2 will not get "Hello" and "Again" because it susbcribed later
let subscription2 = publishSubject.subscribe(onNext:{
print(#line,$0)
})
publishSubject.onNext("Both Subscriptions receive this message")
Мои поздравления 🎉. Если вы дочитали до этого момента, значит вы ознакомились с основами RxSwift. Многое ещё предстоит изучить, но в Rx всё базируется на этих простых принципах. Можете сделать сейчас небольшой перерыв и немного поэкспериментировать с этими принципами, чтобы как следует их понять. Если вы готовы, то давайте продолжим, поскольку впереди ещё очень много интересного.
3. Marble Diagrams 🙌🏼
Если вы работаете с RxSwift или вообще с Rx, вам стоит освоить Marble Diagrams (“шариковые диаграммы”). Эти диаграммы визуализируют изменения в наблюдаемых последовательностях. Диаграмма состоит из входящего потока (input stream) сверху, исходящего потока (output stream) снизу и фактической преобразующей функции посередине.
Например, давайте рассмотрим операцию, которая задерживает распространяемые события наблюдаемой последовательности на 150 миллисекунд. Не обращайте внимания на параметр “scheduler”, о нём я расскажу немного позже:
Достаточно просто для понимания, да?
Существуют замечательные open source проекты как для iOS, так и для Android, позволяющие поэкспериментировать в интерактивном режиме с такими диаграммами на ваших мобильных устройствах. Поиграйтесь с ними, и обещаю, что вы разберётесь в Rx очень быстро.
Веб-приложение:http://rxmarbles.com
iOS-приложение: https://itunes.apple.com/com/app/rxmarbles/id1087272442
Android: https://goo.gl/b5YD8K
4. Преобразования ⚙️
Иногда нам может потребоваться преобразовать, объединить или отфильтровать элементы, распространяемые наблюдаемой последовательностью до того, как их получит подписчик. Я познакомлю вас с базовыми операторами преобразования, расскажу кое-что о фильтрах и о возможностях объединения последовательностей. В заключение я покажу, как производить преобразование, объединение и др. в разных потоках.
Начнём.
4.1 Map
Чтобы преобразовать распространяемые наблюдаемой последовательностью элементы до того, как они попадут к своим подписчикам, необходимо использовать оператор map. Допустим, имеется преобразование, умножающее каждое значение последовательности на 10 перед распространением.
Observable.of(1,2,3,4).map { value in
return value * 10
}.subscribe(onNext:{
print($0)
})
OUTPUT: 10 20 30 40
4.2 FlatMap
Представим наблюдаемую последовательность, состоящую из объектов, которые сами являются наблюдаемыми, и допустим, что мы хотим создать из них новую последовательность. В таких случаях в игру вступает FlatMap. FlatMap объединит распространение итоговых наблюдаемых и распространит эти объединённые результаты как отдельную последовательность.
let sequence1 = Observable.of(1,2)
let sequence2 = Observable.of(1,2)
let sequenceOfSequences = Observable.of(sequence1,sequence2)
sequenceOfSequences.flatMap{ return $0 }.subscribe(onNext:{
print($0)
})
OUTPUT: 1 2 1 2
4.3 Scan
Scan принимает исходное значение и используется для агрегирования значений аналогично reduce в Swift’е.
Observable.of(1,2,3,4,5).scan(0) { seed, value in
return seed + value
}.subscribe(onNext:{
print($0)
})
OUTPUT: 1 3 6 10 15
4.4 Buffer
Оператор Buffer преобразует наблюдаемого, распространяющего элементы, в наблюдаемого, который распространяет буферизированные коллекции этих элементов.
5.Filter 🚬
Если на следующие события необходимо реагировать только исходя из определённого условия, используйте оператор filter.
5.1 Filter
Базовая операция фильтрации работает аналогично своему эквиваленту в Swift’е. Вы просто задаёте условие, которое должно соблюдаться, и если условие удовлетворено, то событие .next будет распространено среди его подписчиков.
Observable.of(2,30,22,5,60,1).filter{$0 > 10}.subscribe(onNext:{
print($0)
})
OUTPUT: 30 22 60
5.2 DistinctUntilChanged
Если вы хотите распространять события next только в случае, если значение изменилось, используйте distinctUntilChanged.
Observable.of(1,2,2,1,3).distinctUntilChanged().subscribe(onNext:{
print($0)
})
OUTPUT: 1 2 1 3
Другие операторы фильтра, которые стоит опробовать:
- Debounce
- TakeDuration
- Skip
6. Combine 💑
Объединение последовательностей — широко распространённая задача. RxSwift предоставляет для этого множество операторов. Вот три из них:
6.1 StartWith
Если вы хотите, чтобы наблюдаемый распространял определённую последовательность элементов до того, как начнёт распространять элементы, которые от него ожидаются в обычном режиме, используйте оператор startWith.
Observable.of(2,3).startWith(1).subscribe(onNext:{
print($0)
})
OUTPUT: 1 2 3
6.2 Merge
Существует возможность объединить выдачу от нескольких наблюдаемых с помощью оператора Merge, чтобы они вели себя как единый наблюдаемый.
let publish1 = PublishSubject()
let publish2 = PublishSubject()
Observable.of(publish1,publish2).merge().subscribe(onNext:{
print($0)
})
publish1.onNext(20)
publish1.onNext(40)
publish1.onNext(60)
publish2.onNext(1)
publish1.onNext(80)
publish2.onNext(2)
publish1.onNext(100)
OUTPUT: 20 40 60 1 80 2 100
6.3 Zip
Используйте метод Zip, если хотите объединить распространяемые разными наблюдаемыми последовательностями элементы в одну наблюдаемую последовательность. Zip срабатывает в строгой последовательности, то есть первыми двумя элементами, распространёнными Zip’ом, будет комбинация первого элемента первой последовательности и первого элемента второй последовательности. Также имейте в виду, что Zip распространит только количество элементов, равное наименьшему числу элементов, распространяемых исходными наблюдаемыми.
let a = Observable.of(1,2,3,4,5)
let b = Observable.of("a","b","c","d")
Observable.zip(a,b){ return ($0,$1) }.subscribe {
print($0)
}
OUTPUT: (1, "a")(2, "b") (3, "c") (4, "d")
Другие операторы комбинирования, которые стоит опробовать:
- Concat
- CombineLatest
- SwitchLatests
7. Сторонние действия 👉
Если вы хотите зарегистрировать callback’и, которые будут исполняться при определённых событиях в наблюдаемой последовательности, используйте оператор doOn. Он не изменяет распространяемые элементы, а просто передаёт их.
Вы можете использовать:
- do(onNext:) - если хотите что-то делать только по событию next
- do(onError:) - если будут распространяться ошибки и
- do(onCompleted:) - для успешного завершения последовательности.
Observable.of(1,2,3,4,5).do(onNext: {
$0 * 10 // This has no effect on the actual subscription
}).subscribe(onNext:{
print($0)
})
8. Schedulers ⏰
Операторы работают в том же потоке, где была создана подписка. В RxSwift используются scheduler’ы (диспетчеры), чтобы заставить операторы производить свою работу в конкретной очереди. Также вы можете заставить подписку происходить в определённой очереди. Используйте subscribeOn и observeOn для таких задач. Если вы знакомы с концепцией operation-queues или dispatch-queue, то здесь для вас не будет ничего нового. Scheduler может быть последовательным (serial) или гоночным (concurrent) аналогично GCD или OperationQueue. В RxSwift есть пять типов scheduler’ов:
- MainScheduler — Применяется для работы, которая должна производиться в MainThread. В случае если schedule-методы вызываются из основного потока, действия будут исполнены сразу же, без диспетчеризации. Этот scheduler обычно применяется для работы с UI.
- CurrentThreadScheduler — Распределяет порции (units) работы в текущем потоке. Это scheduler по умолчанию для операторов, генерирующих элементы.
- SerialDispatchQueueScheduler — Применяется для работы, которая должна быть выполнена в определённом dispatch_queue_t. Проконтролирует преобразование concurrent-очереди, если таковая передана, в serial. Благодаря серийным scheduler’ам возможны определённые оптимизации observeOn. Основной scheduler является экземпляром SerialDispatchQueueScheduler.
- ConcurrentDispatchQueueScheduler — Применяется для работы, которая должна быть выполнена в определённом dispatch_queue_t. Вы также можете передать serial dispatch очередь, это не должно вызвать какие-либо проблемы. Этот диспетчер подходит для случаев, когда некоторая работа должна выполняться в фоновом режиме.
- OperationQueueScheduler — Применяется для работы, которая должна быть выполнена в определённом NSOperationQueue. Этот диспетчер подойдёт для случаев, когда имеется особенно большой кусок работы, которая должна быть выполнена в фоновом режиме, и вы хотите тонко настроить организацию гонки (concurrent processing), используя maxConcurrentOperationCount.
Вот пример кода, демонстрирующий, как можно наблюдать за чем-то в режиме гонки в фоновой очереди и при этом подписаться на главную очередь.
let publish1 = PublishSubject()
let publish2 = PublishSubject()
let concurrentScheduler = ConcurrentDispatchQueueScheduler(qos: .background)
Observable.of(publish1,publish2)
.observeOn(concurrentScheduler)
.merge()
.subscribeOn(MainScheduler())
.subscribe(onNext:{
print($0)
})
publish1.onNext(20)
publish1.onNext(40)
OUTPUT: 20 40
Подытожим 🎁
Поздравляю, вы освоили основы RxSwift. Удачного программирования! 🎉
В скором будущем ожидайте статью “Изучите и освоите RxCocoa”…
* * *
Добавляйте меня в github'е, twitter'е, linkedin'е или xing'е, если у вас есть какие-то вопросы. Если вам нравится электронная музыка, можете также послушать мои треки на SoundCloud ;)
ИСПОЛЬЗОВАННЫЕ ИСТОЧНИКИ:
- https://github.com/ReactiveX/RxSwift/blob/master/Documentation/Schedulers.md
- http://reactivex.io/documentation/operators.html
- http://rxmarbles.com
Автор: Sebastian Boldt
Дата публикации: 04.03.2017
Оригинал статьи
Перевод: Борис Радченко, radchenko.boris@gmail.com
Дата перевода: 23.10.2018