框架学习RxSwift2.0|框架学习RxSwift2.0 Observable创建

1.常规操作导入RXSwift,参看https://www.jianshu.com/p/b73231a29949
Rx 是 ReactiveX 的缩写,简单来说就是基于异步 Event(事件)序列的响应式编程。
Rx 可以简化异步编程方法,并提供更优雅的数据绑定。让我们可以时刻响应新的数据同时顺序地处理它们。
函数响应式编程 = 函数式编程 + 响应式编程
函数式: (Functional Programming) 把运算过程尽量写成一系列嵌套的函数调用
响应式编程: 简称RP(Reactive Programming),响应式编程是一种面向数据流和变化传播的编程范式。
【框架学习RxSwift2.0|框架学习RxSwift2.0 Observable创建】在命令式编程环境中,a:=b+c表示将表达式的结果赋给a,而之后改变b或c的值不会影响a。但在响应式编程中,a的值会随着b或c的更新而更新。
在iOS开发中我们经常会响应一些事件button、tap、textField、textView、notifaction、KVO、NSTimer等等这些,都需要做响应监听,响应后都需要在对应的响应事件中去做处理,而原生开发中,触发对象与响应方法是分离的,如button的初始化和点击响应方法是分离的。

override func viewDidLoad(){ button = UIButton.init(frame: CGRect(x: 40, y: 100, width: width-80, height: 40)) button.backgroundColor = .gray button.setTitle("按钮", for: .normal) self.view.addSubview(button) button.addTarget(self, action: #selector(clickBtn(button:)), for: .touchUpInside) } //button event @objc func clickBtn(button:UIButton) { print("点击") }

RxSwift按钮点击事件的实现
... let bag = DisposeBag() .... self.button.rx.tap//序列 .subscribe(onNext: { () in//订阅 print("Button clicked!") }, onError: { (error) in print("错误信息") }, onCompleted: { print("订阅完成") }) .disposed(by: bag)//销毁

为什么按钮可以.rx呢?
在RxSwift项目中搜索Reactive.swift文件,协议ReactiveCompatible
有rx属性,extension NSObject: ReactiveCompatible表示NSObject类及其子类实现协议ReactiveCompatible,所以NSObject类及其子类都可以.rx
//Reactive.swift... public protocol ReactiveCompatible { /// Extended type associatedtype CompatibleType/// Reactive extensions. static var rx: Reactive.Type { get set }/// Reactive extensions. var rx: Reactive { get set } }import class Foundation.NSObjectextension NSObject: ReactiveCompatible { }

RxSwift 全称ReactiveX for Swift,是一个简化异步编程的框架,实现了函数响应式编程,事件与对象紧密联系,业务流清晰,便于管理。在RxSwift中,所有异步操作(事件)和数据流均被抽象为可观察序列的概念。
RXSwift核心: 响应式编程的核心就是可监听序列的产生以及如何监听。
名词解释:
Observable:是一个可监听序列 作用是产生事件
可以理解为观察者模式里的被观察者
Observer: 序列的监听(订阅)者 作用是消费事件
可以理解为观察者模式里的观察者
Disposable: 可被清除的资源
简单梳理下上面的流程
可监听序列被订阅了,就会调用Producer的subscribe方法
调用self的run方法,当前我们的self是Producer的子类AnonymousObservable
创建AnonymousObservableSink并将订阅者observer传进去,接着调用AnonymousObservableSink实例的run,参数是AnonymousObservable实例。
AnonymousObservableSink实例的run又调用AnonymousObservable的_subscribeHandler闭包,参数是AnyObserver,AnyObserver保存了事件的回调,而_subscribeHandler闭包也就是可监听序列创建时传的闭包。
执行事件
以next事件为例
如:订阅onNext事件。
代码:
在creat尾随闭包中的onNext具体实现代码是
public func onNext(_ element: E) { self.on(.next(element)) }

内部流程如下:
调用observer的onNext方法,接着调用observer的on(_ event: Event)并将next事件传进去
执行AnyObserver的observer的事件回调
执行AnonymousObservableSink的func on(_ event: Event)方法,接着调用ObserverBase的func on(_ event: Event)让真正的Observer类去响应事件
调用AnonymousObserver的onCore方法去执行真正的事件回调。
序列例子:
1.创建序列 let ob = Observable.create { (obserber) -> Disposable in // 3:发送信号 obserber.onNext("框架班级") obserber.onCompleted() //obserber.onError(NSError.init(domain: "coocieeror", code: 10087, userInfo: nil)) return Disposables.create() }//2.订阅 let _ = ob.subscribe(onNext: { (text) in //4.订阅到 print("订阅到:\(text)") }, onError: { (error) in print("error: \(error)") }, onCompleted: { print("完成") }) { print("销毁") }

代码的执行顺序不按照代码行数一行一行的执行,说明这里含有闭包。
看具体创建序列的流程:
creat方法
extension ObservableType {/** Creates an observable sequence from a specified subscribe method implementation.- seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html)- parameter subscribe: Implementation of the resulting observable sequence's `subscribe` method. - returns: The observable sequence with the specified implementation for the `subscribe` method. */ public static func create(_ subscribe: @escaping (RxSwift.AnyObserver) -> RxSwift.Disposable) -> RxSwift.Observable }

Create.swift文件中的源码实现
extension ObservableType { // MARK: create/** Creates an observable sequence from a specified subscribe method implementation.- seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html)- parameter subscribe: Implementation of the resulting observable sequence's `subscribe` method. - returns: The observable sequence with the specified implementation for the `subscribe` method. */ public static func create(_ subscribe: @escaping (AnyObserver) -> Disposable) -> Observable { return AnonymousObservable(subscribe) } }

在Create.swift文件中实现的方法ObservableType.create,
extension ObservableType { // MARK: createpublic static func create(_ subscribe: @escaping (AnyObserver) -> Disposable) -> Observable { return AnonymousObservable(subscribe) } }

上面代码可以看出creat方法返回了一个AnonymousObservable(匿名序列)
接着看AnonymousObservable类
final private class AnonymousObservable: Producer { typealias SubscribeHandler = (AnyObserver) -> Disposablelet _subscribeHandler: SubscribeHandlerinit(_ subscribeHandler: @escaping SubscribeHandler) { //初始化时候带的参数,这里保存了一个闭包,用于subscribe回调 self._subscribeHandler = subscribeHandler }override func run(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element { let sink = AnonymousObservableSink(observer: observer, cancel: cancel) let subscription = sink.run(self) return (sink: sink, subscription: subscription) } }

这里的AnonymousObservable类并没有subscribe方法,但是父类Producer有
lass Producer : Observable { override init() { super.init() }override func subscribe(_ observer: O) -> Disposable where O.E == Element { if !CurrentThreadScheduler.isScheduleRequired { // The returned disposable needs to release all references once it was disposed. let disposer = SinkDisposer() let sinkAndSubscription = self.run(observer, cancel: disposer) disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)return disposer } else { return CurrentThreadScheduler.instance.schedule(()) { _ in let disposer = SinkDisposer() let sinkAndSubscription = self.run(observer, cancel: disposer) disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)return disposer } } }

创建序列creat方法-生成AnonymousObservable序列-保存subscribeHandler
到此整个创建序列的流程结束。
接下来是订阅流程:
ob.subscribe方法的实现
extension ObservableType { public func subscribe(onNext: ((Self.E) -> Void)? = nil, onError: ((Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> RxSwift.Disposable }

而方法实现的源码在ObservableType+Extensions.swift文件中
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> Disposable {...//创建observer ,event从AnonymousObserver()构建 let observer = AnonymousObserver { event in#if DEBUG synchronizationTracker.register(synchronizationErrorMessage: .default) defer { synchronizationTracker.unregister() } #endifswitch event { //对next,error,complated,闭包进行初始化 //只要观察者observer调用了event的.next事件,这里就会调用订阅事件.onNext case .next(let value): onNext?(value) case .error(let error): if let onError = onError { onError(error) } else { Hooks.defaultErrorHandler(callStack, error) } disposable.dispose() case .completed: onCompleted?() disposable.dispose() } } return Disposables.create( self.asObservable().subscribe(observer), //回调了当前序列(ob)的_subscribeHandler闭包,由于AnonymousObserable类中没有subscribe回调,就使用其父类Produce的subscribe方法,调用子类的实现 disposable ) }

在AnonymousObserver.swift文件中的实现
final class AnonymousObserver : ObserverBase { typealias Element = ElementTypetypealias EventHandler = (Event) -> Voidprivate let _eventHandler : EventHandlerinit(_ eventHandler: @escaping EventHandler) { #if TRACE_RESOURCES _ = Resources.incrementTotal() #endif self._eventHandler = eventHandler //保存了包含事件 的闭包 }override func onCore(_ event: Event) { return self._eventHandler(event) }#if TRACE_RESOURCES deinit { _ = Resources.decrementTotal() } #endif }

实际流程参看下图
框架学习RxSwift2.0|框架学习RxSwift2.0 Observable创建
文章图片
3.png 基本UI事件监听代码实现:https://gitee.com/xgkp/RX2.0login.git
参考来源:
1.https://www.jianshu.com/p/3617ab385060
2.https://blog.csdn.net/qq_18951479/article/details/96832932
3.https://www.jianshu.com/p/c9f854718933

    推荐阅读