英文原文:The GCD Handbook,譯者:walkingway(微博)
Grand Central Dispatch 大中樞派發或俗稱 GCD 是一件極其強大的武器。他為你提供了很多底層工具,比如隊列和信號量,你可以組合這些工具來達成自己想要的多線程效果。不幸的是,這些基於 C 的 API 晦澀難懂,並且想要將低級工具組合起來實現高抽象層級 API 的效果(譯者注:類似於 NSOperation)也不是一件容易的事。這篇文章,我來教大家如何利用 GCD 提供的工具來達成高抽象層級的行為。
在後台工作
這或許是 GCD 提供的最簡單的工具了,他能讓你在後台線程處理一些工作,然後返回主線程繼續,就如同 UIKit 的操作只能在主線程中進行。
在本指南中,我將使用 doSomeExpensiveWork() 函數來表示一些長期運行的任務,並最終返回一個結果。
根據這一思路,我們的套路一般是:
let defaultPriority = DISPATCH_QUEUE_PRIORITY_DEFAULT let backgroundQueue = dispatch_get_global_queue(defaultPriority, 0) dispatch_async(backgroundQueue, { let result = doSomeExpensiveWork() dispatch_async(dispatch_get_main_queue(), { //use `result` somehow }) })
在實際項目中,除了 DISPATCH_QUEUE_PRIORITY_DEFAULT,我們幾乎不使用其他的優先級選項。dispatch_get_global_queue() 將返回一個隊列,支持數百個線程的執行。如果你總是需要在一個後台隊列上執行開銷龐大的操作,那麼可以使用 dispatch_queue_create 創建自己的隊列,dispatch_queue_create 帶兩個參數,第一個是需要指定的隊列名,第二個說明是串行隊列還是並發隊列。
注意每次調用使用的是 dispatch_async 而不是 dispatch_sync。dispatch_async 將在 block 執行前立即返回,而 dispatch_sync 則會等到 block 執行完畢後才返回。內部的調用可以使用 dispatch_sync(因為不在乎什麼時候返回),但是外部的調用必須是 dispatch_async(否則主線程會被阻塞)。
創建單例
dispatch_once 這個 API 可以用來創建單例。不過這種方式在 Swift 已不再重要,Swift 有更簡單的方法來創建單例。我這裡就只貼 OC 的實現:
+ (instancetype) sharedInstance { static dispatch_once_t onceToken; static id sharedInstance; dispatch_once(&onceToken, ^{ sharedInstance = [[self alloc] init]; }); return sharedInstance; }
將 completion block 扁平化
至此我們的 GCD 之旅開始變得有趣起來。我們可以使用信號量來阻塞一個線程任意時長,直到一個信號從另一個線程發出。信號量和 GCD 的其他部分一樣也是線程安全的,而且能夠從任意位置被觸發。
信號量適用於當你有一個異步 API 需要同步執行的情形,但你不能修改它
// on a background queue dispatch_semaphore_t semaphore = dispatch_semaphore_create(0) doSomeExpensiveWorkAsynchronously(completionBlock: { dispatch_semaphore_signal(semaphore) }) dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER) //the expensive asynchronous work is now done
調用 dispatch_semaphore_wait 將會阻塞線程直到 dispatch_semaphore_signal 被調用。這就意味著 signal 必須從不同的線程被調用,因為當前線程是被完全阻塞的。更進一步,你從來都不該在主線程中調用 wait,而是只從後台線程中調用。
你可以在調用 dispatch_semaphore_wait 時設置一個超時時間,但是我總是趨向使用 DISPATCH_TIME_FOREVER
為什麼在已經有 completion block 的情況下,還想要攤平代碼?因為方便呀,我能想到的一種場景是執行一組異步程序,但他們必須串行執行(即只有前一個任務執行完成,才會繼續執行下一個任務)。現在把上述想法簡單地抽象成一個 AsyncSerialWorker 示例:
typealias DoneBlock = () -> () typealias WorkBlock = (DoneBlock) -> () class AsyncSerialWorker { private let serialQueue = dispatch_queue_create("com.khanlou.serial.queue", DISPATCH_QUEUE_SERIAL) func enqueueWork(work: WorkBlock) { dispatch_async(serialQueue) { let semaphore = dispatch_semaphore_create(0) work({ dispatch_semaphore_signal(semaphore) }) dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER) } } }
上面這個簡短的類創建了一個串行隊列,允許你將 work 的入隊列操作放進 block 中。在 WorkBlock 需要一個 DoneBlock 作為參數,而 DoneBlock 會在當前工作結束時被執行,我們通過將 DoneBlock 設置為 {dispatch_semaphore_signal(semaphore)} 來調整信號量,從而讓串行隊列繼續執行下去。
譯者注:既然已經使用了 DISPATCHQUEUESERIAL,那麼隊列中 work 的執行順序不應該是先進先出的嗎?確實是這樣,但如果我們把 work 看成是一個耗時的網絡操作,其內部是提交到其他線程並發去執行(dispatch_async),也就是每次執行到 work 就立刻返回了,即使最終結果可能還未返回。那麼我們想要保證隊列中的 work 等到前一個 work 執行返回結果後才執行,就需要 semaphore。說了這麼多還是舉個例子吧,打開 Playground:
import UIKit import XCPlayground typealias DoneBlock = () -> () typealias WorkBlock = (DoneBlock) -> () class AsyncSerialWorker { private let serialQueue = dispatch_queue_create("com.khanlou.serial.queue", DISPATCH_QUEUE_SERIAL) func enqueueWork(work: WorkBlock) { dispatch_async(serialQueue) { let semaphore = dispatch_semaphore_create(0) work({ dispatch_semaphore_signal(semaphore) }) dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER) } } } let a = AsyncSerialWorker() for i in 1...5 { a.enqueueWork { doneBlock in dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)) { sleep(arc4random_uniform(4)+1) print(i) doneBlock() } } } XCPlaygroundPage.currentPage.needsIndefiniteExecution = true
此時的輸出結果為:1,2,3,4,5,如果將關於 semaphore 的代碼都注釋掉,結果就不會是按順序輸出了。
dispatch_semaphore_create(0) 當兩個線程需要協調處理某個事件時,我們在這裡傳入 0;內部其實是維護了一個計數器,我們下面會說到。
限制並發的數量
在上面的例子中,信號量被用來當做是一個簡單的標志,但它也可以當成一個有限資源的計數器。如果你想針對某些特定的資源限制連接數,你可以這樣做:
class LimitedWorker { private let concurrentQueue = dispatch_queue_create("com.khanlou.concurrent.queue", DISPATCH_QUEUE_CONCURRENT) private let semaphore: dispatch_semaphore_t init(limit: Int) { semaphore = dispatch_semaphore_create(limit) } func enqueueWork(work: () -> ()) { dispatch_async(concurrentQueue) { dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER) work() dispatch_semaphore_signal(semaphore) } } }
這個例子來自於蘋果官方的多線程編程指南,官方給出的解釋如下:
在創建信號量的時候,可以限定資源的可用數。這個可用數(long 類型)會隨信號量初始化時作為參數傳入。每次等待信號量時,dispatch_semaphore_wait 都會消耗一次這個可用數,如果結果為負,函數會告訴內核阻斷你的線程。另一方面,dispatch_semaphore_signal 函數每次執行都會將該可用計數 + 1,以此來表明已經釋放了資源。如果此刻有因為等待可用資源而被阻隔的任務,系統會從等待的隊列中解鎖一個任務來執行。
這個效果類似於 NSOperationQueue 的 maxConcurrentOperationCount。如果你使用原生的 GCD 隊列而不是 NSOperationQueue,你就能使用信號量來限制並發任務的數量。
值得注意是:每次調用 enqueueWork 都會將 work 提交到一個並發隊列,而該並發隊列收到任務就會丟出去執行,直到觸碰到信號量數量耗盡的天花板(work 入隊列的速度太快,dispatch_semaphore_wait 已經消耗完了所有的數量,而之前的 work 還未執行完畢,dispatch_semaphore_signal 不能增加信號量的可用數量)
等待許多並發任務去完成
如果你有許多 blocks 任務要去執行,你需要在他們全部完成時得到通知,你可以使用 group。dispatch_group_async 允許你在隊列中添加任務(這些任務應該是同步執行的),而且你會追蹤有多少被添加的任務。注意:同一個 dispatch group 能夠添加不同隊列上的任務,並且能保持對所有組內任務的追蹤。當所有被追蹤的任務完成時,一個傳遞給 dispatch_group_notify 的 block 會被觸發執行,有點類似於 completion block
dispatch_group_t group = dispatch_group_create() for item in someArray { dispatch_group_async(group, backgroundQueue, { performExpensiveWork(item: item) }) } dispatch_group_notify(group, dispatch_get_main_queue(), { // all the work is complete }
對於攤平一個擁有 completion block 的函數來說,下面是一個絕佳的示例。分發組(The dispatch group)認為整個 block 將在返回時完成,所以 block 會一直等到任務完成時才會返回
還有更多手動使用 dispatch groups 的姿勢,尤其有很多異步執行的大開銷任務:
// must be on a background thread dispatch_group_t group = dispatch_group_create() for item in someArray { dispatch_group_enter(group) performExpensiveAsyncWork(item: item, completionBlock: { dispatch_group_leave(group) }) } dispatch_group_wait(group, DISPATCH_TIME_FOREVER) // all the work is complete
這段代碼更加復雜一些,但是我們逐行去看還是可以理解的。如同信號量,groups 同樣保持著一個線程安全的、可以操控的內部計數器。你可以使用這個計數器來確保在 completion block 執行前,多個大開銷任務都已執行完畢。使用 enter 來增加計數器,使用 leave 來減少計數器。dispatch_group_async 已為你處理了這些細節,所以盡情地享受即可。
代碼片段的最後一行是 wait 調用:他會阻斷當前線程並且等待計數器達到 0 才會繼續執行。注意盡管你使用了 enter/leave API,但你還是能夠通過 dispatch_group_notify 將 block 提交到隊列中。反過來也是成立的:如果你用了 dispatch_group_async API,也是能夠使用 dispatch_group_wait 的。
dispatch_group_wait 和 dispatch_semaphore_wait 一樣都接收一個超時參數。再次重申,我更喜歡 DISPATCH_TIME_FOREVER,同樣的不要在主線程中調用 dispatch_group_wait。
上面兩段代碼最大的不同在於使用 notify 可以完全從主線程上被調用,而使用 wait 的只能在後台線程上使用(至少是 wait 部分,因為他會完全阻塞當前線程)
隔離隊列
Swift 中的字典(和數組)都是值類型,當他們被修改時,他們的引用會被一個新的副本所替代。但是,因為更新 Swift 對象的實例變量操作並不是原子性的,所以這些操作也不是線程安全的。如果兩個線程同一時間更新一個字典(比如都添加一個值),而且這兩個操作都嘗試寫同一塊內存,這就會導致內存崩壞,我們可以使用隔離隊列來達成線程安全的目的。
先來構建一個標識映射Identity Map,一個標識映射是一個字典,代表著從 ID 到 model 對象的映射
Identity Map(標識映射)模式是通過將所有已加載對象放在一個映射中確保所有對象只被加載一次,並且在引用這些對象時使用該映射來查找對象。在處理數據並發訪問時,要有一種策略讓多個用戶共同影響同一個業務實體,這個固然很重要。同樣重要的是,單個用戶在一個長運行事務或復雜事務中始終使用業務實體的一致版本。Identity Map模式提供的功能;為事務中使用所有的業務對象均保存一個版本,如果一個實體被請求兩次,返回同一個實體。
class IdentityMap { var dictionary = Dictionary() func object(forID ID: String) -> T? { return dictionary[ID] as T? } func addObject(object: T) { dictionary[object.ID] = object } }
這個對象基本就是一個字典封裝器,如果有多個線程在同一時刻調用函數 addObject,內存將會崩壞,因為線程會操作相同的引用。這也是操作系統中的經典的讀者-寫者問題,簡而言之,我們可以在同一時刻有多個讀者,但同一時刻只能有一個線程可以寫入。
幸運的是 GCD 針對在該場景下同樣擁有強力武器,有如下四種 APIs 供我們選用:
dispatch_sync
dispatch_async
dispatch_barrier_sync
dispatch_barrier_async
我們的想法是讀操作可以支持同步和異步,而寫操作也能支持異步寫入,並且必須確保是寫入的是同一個引用。GCD 的 barrier 集合 APIs 提供了解決方案:他們將會一直等到隊列中的任務清空,才會繼續執行 block。使用 barrier APIs 可以用來限制我們對字典對象的寫入,並且確保我們不會在同一時刻進行多個寫操作,以及正在寫操作時同時進行讀操作。
class IdentityMap { var dictionary = Dictionary() let accessQueue = dispatch_queue_create("com.khanlou.isolation.queue", DISPATCH_QUEUE_CONCURRENT) func object(withID ID: String) -> T? { var result: T? = nil dispatch_sync(accessQueue, { result = dictionary[ID] as T? }) return result } func addObject(object: T) { dispatch_barrier_async(accessQueue, { dictionary[object.ID] = object }) } }
dispatch_sync 將會分發 block 到我們的隔離隊列上然後等待其執行完畢。通過這種方式,我們就實現了同步讀操作(如果我們搞成異步讀取,getter 方法就需要一個 completion block)。因為 accessQueue 是並發隊列,這些同步讀取操作可以並發執行,也就是允許同時讀。
dispatch_barrier_async 將分發 block 到隔離隊列上,async 異步部分意味著會立即返回,並不會等待 block 執行完畢。這對性能是有好處的,但是在一個寫操作後立即執行一個讀操作會導致讀到一個半成品的數據(因為可能寫操作還未完成就開始讀了)。
dispatch_barrier_async 中的 barrier 部分意味著:只要 barrier block 進入隊列,並不會立即執行,而是會等待該隊列其他 block 執行完畢後再執行。所以在這點上就保證了我們的 barrier block 每次都只有它自己在執行。而所有在他之後提交的 block 也會一直等待這個 barrier block 執行完再執行。
傳入 dispatch_barrier_async() 函數的 queue,必須是用 dispatch_queue_create 創建的並發 queue。如果是串行 queue 或者是 global concurrent queues,這個函數就會變成 dispatch_async() 了
完結
GCD 是一個具備底層特性的框架,通過他們,我們可以構建高層級的抽象行為。如果還有一些我沒提到的高層級的行為可以用 GCD 來構建,歡迎來交流。