上一篇解讀了ReactiveCocoa
的三個重要的類的底層實現,本篇繼續。
RACMulticastConnection
: 當一個信號被多次訂閱時,創建信號的block會被重複調用,可能造成副作用;使用該類可以保證創建信號的block只執行一次。
// Build the source signal; the log line makes every run of the creation block visible.
RACSignal *signal = [RACSignal createSignal:^RACDisposable *(id subscriber) {
    NSLog(@"發送請求");
    [subscriber sendNext:@1];
    return nil;
}];

// Wrap the source signal in a multicast connection.
RACMulticastConnection *connect = [signal publish];

// Subscribe twice to the connection's signal (not to `signal` itself).
RACSignal *sharedSignal = connect.signal;
[sharedSignal subscribeNext:^(id value) {
    NSLog(@"connect 第一次訂閱信號: %@", value);
}];
[sharedSignal subscribeNext:^(id value) {
    NSLog(@"connect 第二次訂閱信號: %@", value);
}];

// Connect last, after both subscriptions are in place.
[connect connect];
1.創建connect,connect.sourceSignal -> RACSignal(原始信號) connect.signal -> RACSubject
2.訂閱connect.signal,會調用RACSubject的subscribeNext,創建訂閱者,而且把訂閱者保存起來,不會執行block。
3.[connect connect]內部會訂閱RACSignal(原始信號),並且訂閱者是RACSubject
3.1.訂閱原始信號,就會調用原始信號中的didSubscribe
3.2 didSubscribe,拿到訂閱者調用sendNext,其實是調用RACSubject的sendNext
4.RACSubject的sendNext,會遍歷RACSubject所有訂閱者發送信號。
4.1 因為剛剛第二步,都是在訂閱RACSubject,因此會拿到第二步所有的訂閱者,調用他們的nextBlock
+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe
// RACDynamicSignal.m
// Creates a signal that remembers `didSubscribe` for later use.
//
// didSubscribe - Block receiving the subscriber; it returns a disposable
//                (or nil). The parameter is typed `id<RACSubscriber>` so the
//                definition matches the declared signature of this method.
//
// Returns the new signal, named "+createSignal:".
+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
    RACDynamicSignal *signal = [[self alloc] init];
    // Only a copy of the block is stored here; it is not invoked at creation
    // time, which is why the article calls the signal "cold" at this point.
    signal->_didSubscribe = [didSubscribe copy];
    return [signal setNameWithFormat:@"+createSignal:"];
}
[signal publish]
// RACSignal+Operations.m
// Returns a multicast connection for the receiver, backed by a newly created
// RACSubject whose name is derived from the receiver's name.
- (RACMulticastConnection *)publish {
    RACSubject *subject = [[RACSubject subject] setNameWithFormat:@"[%@] -publish", self.name];
    // Delegate to -multicast:, handing it the subject created above.
    return [self multicast:subject];
}
// Renames `subject` after the receiver and returns a connection that pairs the
// receiver (as source signal) with that subject.
- (RACMulticastConnection *)multicast:(RACSubject *)subject {
    [subject setNameWithFormat:@"[%@] -multicast: %@", self.name, subject.name];
    return [[RACMulticastConnection alloc] initWithSourceSignal:self subject:subject];
}
// RACMulticastConnection.m
// Initializes a connection between `source` and `subject`.
// Both arguments are asserted to be non-nil.
- (id)initWithSourceSignal:(RACSignal *)source subject:(RACSubject *)subject {
    NSCParameterAssert(source != nil);
    NSCParameterAssert(subject != nil);

    self = [super init];
    if (self != nil) {
        // Keep the original signal so -connect can subscribe to it later.
        _sourceSignal = source;
        // The subject is stored as `_signal`, i.e. _signal is the RACSubject.
        _signal = subject;
        _serialDisposable = [[RACSerialDisposable alloc] init];
    }
    return self;
}
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock
// RACSignal.m
// Subscribes to the receiver with a subscriber that only has a `next` handler.
//
// nextBlock - Handler for each value; asserted to be non-NULL.
//
// Returns whatever -subscribe: returns for the new subscriber.
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock {
    NSCParameterAssert(nextBlock != NULL);

    // No error or completed handlers are supplied.
    RACSubscriber *nextOnlySubscriber = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
    RACDisposable *subscription = [self subscribe:nextOnlySubscriber];
    return subscription;
}
// RACSubscriber.m
// Creates a subscriber that stores copies of the three event handlers.
// Nothing is invoked here; the blocks are only saved on the new instance.
+ (instancetype)subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError *error))error completed:(void (^)(void))completed {
    RACSubscriber *instance = [[self alloc] init];
    instance->_completed = [completed copy];
    instance->_error = [error copy];
    instance->_next = [next copy];
    return instance;
}
// RACSubject.m
// Registers `subscriber` with this subject and returns a disposable whose
// block unregisters it again. No event is sent to the subscriber here; it is
// only stored in `self.subscribers`.
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
// Replace the caller's subscriber with a passthrough wrapper tied to this
// subject and to `disposable`.
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
NSMutableArray *subscribers = self.subscribers;
// All access to the subscriber list in this method is guarded by the list itself.
@synchronized (subscribers) {
[subscribers addObject:subscriber];
}
// NOTE(review): as quoted, the returned disposable is a separate RACDisposable
// rather than `disposable` — confirm against the library version in use.
return [RACDisposable disposableWithBlock:^{
@synchronized (subscribers) {
// Since newer subscribers are generally shorter-lived, search
// starting from the end of the list.
NSUInteger index = [subscribers indexOfObjectWithOptions:NSEnumerationReverse passingTest:^ BOOL (id<RACSubscriber> obj, NSUInteger index, BOOL *stop) {
// Match by pointer identity against the wrapped subscriber added above.
return obj == subscriber;
}];
if (index != NSNotFound) [subscribers removeObjectAtIndex:index];
}
}];
}
[connect connect];
// RACMulticastConnection.m
// Subscribes the stored subject (`_signal`) to the source signal the first
// time it is called, and returns the connection's serial disposable.
- (RACDisposable *)connect {
// Atomically flip `_hasConnected` from 0 to 1; the swap succeeds only for the
// first caller, so the subscription below happens at most once.
BOOL shouldConnect = OSAtomicCompareAndSwap32Barrier(0, 1, &_hasConnected);
if (shouldConnect) {
// Subscribe to the original (source) signal, using the subject as the subscriber.
self.serialDisposable.disposable = [self.sourceSignal subscribe:_signal];
}
return self.serialDisposable;
}
// RACDynamicSignal.m
// Subscribes `subscriber` to this signal by scheduling the stored
// `didSubscribe` block, and returns a compound disposable covering both the
// scheduled work and whatever disposable the block returns.
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
// Wrap the subscriber in a passthrough tied to this signal and `disposable`.
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
if (self.didSubscribe != NULL) {
// The creation block is invoked here, on the subscription scheduler, with
// the wrapped subscriber as its argument.
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];
}];
[disposable addDisposable:schedulingDisposable];
}
return disposable;
}
// RACSubject.m
// Forwards `value` to every subscriber visited by
// -enumerateSubscribersUsingBlock:.
- (void)sendNext:(id)value {
    void (^forwardValue)(id) = ^(id currentSubscriber) {
        [currentSubscriber sendNext:value];
    };
    [self enumerateSubscribersUsingBlock:forwardValue];
}
RACMulticastConnection
利用RACSubject
實現了創建信號的block
只執行一次的功能。對於需要多次訂閱信號,但是不希望多次創建信號的應用場合,可以使用RACMulticastConnection
解決。
RACCommand
類用來表示動作的執行, 是對動作觸發後的連鎖事件的封裝。常用在封裝網絡請求,按鈕點擊等等場合。
// Build a command whose signal block returns a fresh signal per execution.
RACCommand *command = [[RACCommand alloc] initWithSignalBlock:^RACSignal *(id input) {
    return [RACSignal createSignal:^RACDisposable *(id subscriber) {
        // Hard-wired to YES, so this demo always takes the success path.
        BOOL requestSucceeded = YES;
        if (!requestSucceeded) {
            // Failure path: send an error event.
            NSError *failure = [NSError errorWithDomain:@"Network failed" code:5 userInfo:nil];
            [subscriber sendError:failure];
        } else {
            // Success path: the author notes that the completed event must
            // be sent after the data.
            [subscriber sendNext:@"Smile"];
            [subscriber sendCompleted];
        }
        // Cleanup hook run when the signal is disposed; the author notes that
        // returning nil is fine when no cleanup is needed.
        return [RACDisposable disposableWithBlock:^{
            NSLog(@"信號被銷毀了");
        }];
    }];
}];

// Execute the command and subscribe to the signal it returns.
RACSignal *executionSignal = [command execute:nil];
[executionSignal subscribeNext:^(id value) {
    NSLog(@"receive data: %@", value);
}];
RACCommand底層實現
1. 創建命令,保存signalBlock
2. 執行命令
* 2.1 調用signalBlock
* 2.2 創建connect,傳入RACReplaySubject對象,然後連接信號
3. 訂閱信號
* 3.1 創建訂閱者,保存到RACReplaySubject對象的_subscribers數組中
* 3.2 遍歷valuesReceived數組,調用訂閱者發送數據
- (id)initWithSignalBlock:(RACSignal * (^)(id input))signalBlock
// RACCommand.m
// Convenience initializer: forwards to -initWithEnabled:signalBlock: with a
// nil enabled signal.
- (id)initWithSignalBlock:(RACSignal * (^)(id input))signalBlock {
    self = [self initWithEnabled:nil signalBlock:signalBlock];
    return self;
}
// Initializes the command with an optional enabled signal and the block that
// creates a signal for each execution. `signalBlock` is asserted to be non-nil.
// NOTE(review): the remainder of the body is elided in this excerpt ("......"),
// including any use of `enabledSignal` and the final return.
- (id)initWithEnabled:(RACSignal *)enabledSignal signalBlock:(RACSignal * (^)(id input))signalBlock {
NSCParameterAssert(signalBlock != nil);
self = [super init];
if (self == nil) return nil;
_activeExecutionSignals = [[NSMutableArray alloc] init];
// Store a copy of the signal-creating block; it is not invoked here.
_signalBlock = [signalBlock copy];
......
}
- (RACSignal *)execute:(id)input
// RACCommand.m
// Executes the command with `input`.
//
// If the command is not enabled, returns a signal that errors with
// RACCommandErrorNotEnabled. Otherwise calls the stored signal block,
// multicasts the resulting signal to a RACReplaySubject, connects right away,
// and returns the connection's signal.
// NOTE(review): parts of the body are elided in this excerpt ("......").
- (RACSignal *)execute:(id)input {
// `immediateEnabled` is guaranteed to send a value upon subscription, so
// -first is acceptable here.
BOOL enabled = [[self.immediateEnabled first] boolValue];
if (!enabled) {
NSError *error = [NSError errorWithDomain:RACCommandErrorDomain code:RACCommandErrorNotEnabled userInfo:@{
NSLocalizedDescriptionKey: NSLocalizedString(@"The command is disabled and cannot be executed", nil),
RACUnderlyingCommandErrorKey: self
}];
return [RACSignal error:error];
}
// Invoke the block saved at init time to obtain the signal for this execution.
RACSignal *signal = self.signalBlock(input);
......
// Create the connection, using a RACReplaySubject as the subscriber.
RACMulticastConnection *connection = [[signal
subscribeOn:RACScheduler.mainThreadScheduler]
multicast:[RACReplaySubject subject]];
......
// Connect to the signal.
[connection connect];
return [connection.signal setNameWithFormat:@"%@ -execute: %@", self, [input rac_description]];
}
// RACMulticastConnection.m
// Subscribes the stored subject (`_signal`) to the source signal the first
// time it is called, and returns the connection's serial disposable.
- (RACDisposable *)connect {
// The compare-and-swap succeeds only when `_hasConnected` is still 0, so the
// subscription below is performed at most once.
BOOL shouldConnect = OSAtomicCompareAndSwap32Barrier(0, 1, &_hasConnected);
if (shouldConnect) {
// Subscribe to the source signal; per the article, this is the step that
// runs the signal-creation block.
self.serialDisposable.disposable = [self.sourceSignal subscribe:_signal];
}
return self.serialDisposable;
}
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock
// RACSignal.m
// Subscribes to the receiver with a subscriber that only has a `next` handler.
//
// nextBlock - Handler for each value; asserted to be non-NULL.
//
// Returns whatever -subscribe: returns for the new subscriber.
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock {
    NSCParameterAssert(nextBlock != NULL);

    // No error or completed handlers are supplied.
    RACSubscriber *nextOnlySubscriber = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
    RACDisposable *subscription = [self subscribe:nextOnlySubscriber];
    return subscription;
}
// RACReplaySubject.m
// Subscribes `subscriber` to this replay subject. On the subscription
// scheduler it first replays every value in `valuesReceived`, then either
// forwards a stored completion/error or registers the subscriber with the
// superclass for future events. Returns a compound disposable for the whole
// operation.
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
@synchronized (self) {
for (id value in self.valuesReceived) {
// Stop replaying as soon as the subscription has been disposed.
if (compoundDisposable.disposed) return;
// Send each stored value to the subscriber (in the article's demo, "Smile");
// the RACTupleNil placeholder is converted back to nil.
[subscriber sendNext:(value == RACTupleNil.tupleNil ? nil : value)];
}
if (compoundDisposable.disposed) return;
if (self.hasCompleted) {
[subscriber sendCompleted];
} else if (self.hasError) {
[subscriber sendError:self.error];
} else {
// Call the superclass implementation, which (per the article) saves the
// subscriber in the _subscribers array.
RACDisposable *subscriptionDisposable = [super subscribe:subscriber];
[compoundDisposable addDisposable:subscriptionDisposable];
}
}
}];
[compoundDisposable addDisposable:schedulingDisposable];
return compoundDisposable;
}
RACCommand
用來封裝事件時,還可以訂閱信號(executionSignals
)、訂閱最新信號(switchToLatest
)、跳過幾次信號(skip
)或信號是否正在執行(executing
),在執行信號時,還可以監聽錯誤信號和完成信號,請參考demo例子。
ReactiveCocoa
框架的源碼分析暫告一段落,如有分析不足之處,歡迎互相交流。
Demo地址:
RACDemo