你好,歡迎來到IOS教程網

 Ios教程網 >> IOS編程開發 >> IOS開發基礎 >> Android和iOS開發中的異步處理(四)——異步任務和隊列

Android和iOS開發中的異步處理(四)——異步任務和隊列

編輯:IOS開發基礎

本文是系列文章《Android和iOS開發中的異步處理》的第四篇。在本篇文章中,我們主要討論在客戶端編程中經常使用的隊列結構,它的異步編程方式以及相關的接口設計問題。

前幾天,有位同事跑過來一起討論一個技術問題。情況是這樣的,他最近在開發一款手游,用戶在客戶端上的每次操作都需要向服務器同步數據。本來按照傳統的網絡請求處理方式,用戶發起操作後,需要等待操作完成,這時界面要顯示一個請求等待的過程(比如轉菊花)。當請求完成了,客戶端顯示層才更新,用戶也才能發起下一個操作。但是,這個游戲要求用戶能在短時間內連續做很多操作。如果每個操作都要經歷一個請求等待的過程,無疑體驗是很糟糕的。

其實呢,這裡就需要一個操作任務隊列。用戶不用等待一個操作完成,而是只要把操作放入隊列裡,就可以繼續進行下一步操作了。只是,當隊列中有操作出錯時,需要進入一個統一的錯誤處理流程。當然,服務器也要配合進行一些處理,比如要更加慎重地對待操作去重問題。

本文要討論的就是跟隊列的設計和實現有關的那些問題。

注:本系列文章中出現的代碼已經整理到GitHub上(持續更新),代碼庫地址為:https://github.com/tielei/AsyncProgrammingDemos

其中,當前這篇文章中出現的Java代碼,位於com.zhangtielei.demos.async.programming.queueing這個package中。

概述

在客戶端編程中,使用隊列的場景其實是很多的。這裡我們列舉其中幾個。

  • 發送聊天消息。現在一般的聊天軟件都允許用戶連續輸入多條聊天消息,也就是說,用戶不用等待前一條消息發送成功了,再鍵入第二條消息。系統會保證用戶的消息有序,而且由於網絡狀況不好而發送失敗的消息會經歷若干次重試,從而保證消息盡力送達。這其實背後有一個消息發送隊列,它對消息進行排隊處理,並且在錯誤發生時進行有限的重試。

  • 一次上傳多張照片。如果用戶能夠一次性選中多張照片進行上傳操作,這個上傳過程時間會比較長,一般需要一個或多個隊列。隊列的重試功能還能夠允許文件的斷點續傳(當然這要求服務端要有相應的支持)。

  • 將關鍵的高頻操作異步化,提升體驗。比如前面提到的那個游戲連續操作的例子,再比如在微信朋友圈發照片或者評論別人,都不需要等待本次網絡請求結束,就可以進行後續操作。這背後也隱藏著一個隊列機制。

為了討論方便,我們把這種對一系列操作進行排隊,並具備一定失敗重試能力的隊列稱為“任務隊列”。

下面本文分三個章節來討論異步任務和任務隊列的相關話題。

  1. 介紹傳統的線程安全隊列TSQ(Thread-Safe Queue)。

  2. 適合客戶端編程環境的無鎖隊列。這一部分遵循異步任務的經典回調方式(Callback)來設計接口。關於異步任務的回調相關的詳細討論,請參見這個系列的第二篇。

  3. 基於RxJava響應式編程的思想實現的隊列。在這一部分,我們會看到RxJava對於異步任務的接口設計會產生怎樣的影響。

Thread-Safe Queue

在多線程的環境下,提到隊列就不能不提TSQ。它是一個很經典的工具,在不同的線程之間提供了一條有序傳輸數據的通道。它的結構圖如下所示。

Android和iOS開發中的異步處理(四)——異步任務和隊列 - 鐵蕾的個人博客.png

消費者和生產者分屬不同的線程,這樣消費者和生產者才能解耦,生產不至於被消費所阻塞。如果把TSQ用於任務隊列,那麼生產相當於用戶的操作產生了任務,消費相當於任務的啟動和執行。

消費者線程運行在一個循環當中,它不停地嘗試從隊列裡取數據,如果沒有數據,則阻塞在隊列頭上。這種阻塞操作需要依賴操作系統的一些原語。

利用隊列進行解耦,是一個很重要的思想。說遠一點,TSQ的思想推廣到進程之間,就相當於在分布式系統裡經常使用的Message Queue。它對於異構服務之間的解耦,以及屏蔽不同服務之間的性能差異,可以起到關鍵作用。

而TSQ在客戶端編程中比較少見,原因包括:

  • 它需要額外啟動一個單獨的線程作為消費者。

  • 更適合客戶端環境的“主線程->異步線程->主線程”的編程模式(參見這個系列的第一篇中Run Loop那一章節的相關描述),使得生產者和消費者可以都運行在主線程中,這樣就不需要一個Thread-Safe的隊列,而是只需要一個普通隊列就行了(下一章要講到)。

我們在這裡提到TSQ,主要是因為它比較經典,也能夠和其它方式做一個對比。我們在這裡就不給出它的源碼演示了,想了解細節的同學可以參見GitHub。GitHub上的演示代碼使用了JDK中現成的TSQ的實現:LinkedBlockingQueue。

基於Callback的任務隊列

01.png

如上圖所示,生產者和消費者都運行在一個線程,即主線程。按照這種思路來實現任務隊列,我們需要執行的任務本身必須是異步的,否則整個隊列的任務就沒法異步化。

我們定義要執行的異步任務的接口如下:

public interface Task {
    /**
     * 唯一標識當前任務的ID
     * @return
     */
    String getTaskId();

    /**
     * 由於任務是異步任務, 那麼start方法被調用只是啟動任務;
     * 任務完成後會回調TaskListener.
     *
     * 注: start方法需在主線程上執行.
     */
    void start();

    /**
     * 設置回調監聽.
     * @param listener
     */
    void setListener(TaskListener listener);

    /**
     * 異步任務回調接口.
     */
    interface TaskListener {
        /**
         * 當前任務完成的回調.
         * @param task
         */
        void taskComplete(Task task);
        /**
         * 當前任務執行失敗的回調.
         * @param task
         * @param cause 失敗原因
         */
        void taskFailed(Task task, Throwable cause);
    }
}

由於Task是一個異步任務,所以我們為它定義了一個回調接口TaskListener。

getTaskId是為了得到一個能唯一標識當前任務的ID,便於對不同任務進行精確區分。

另外,為了更通用的表達失敗原因,我們這裡選用一個Throwable對象來表達(注:在實際編程中這未必是一個值得效仿的做法,具體情況請具體分析)。

有人可能會說:這裡把Task接口定義成異步的,那如果想執行一個同步的任務該怎麼辦?這其實很好辦。把同步任務改造成異步任務是很簡單的,有很多種方法(反過來卻很難)。

任務隊列的接口,定義如下:

public interface TaskQueue {
    /**
     * 向隊列中添加一個任務.
     * @param task
     */
    void addTask(Task task);

    /**
     * 設置監聽器.
     * @param listener
     */
    void setListener(TaskQueueListener listener);

    /**
     * 銷毀隊列.
     * 注: 隊列在最後不用的時候, 應該主動銷毀它.
     */
    void destroy();

    /**
     * 任務隊列對外監聽接口.
     */
    interface TaskQueueListener {
        /**
         * 任務完成的回調.
         * @param task
         */
        void taskComplete(Task task);
        /**
         * 任務最終失敗的回調.
         * @param task
         * @param cause 失敗原因
         */
        void taskFailed(Task task, Throwable cause);
    }
}

任務隊列TaskQueue本身的操作也是異步的,addTask只是將任務放入隊列,至於它什麼時候完成(或失敗),調用者需要監聽TaskQueueListener接口。

需要注意的一點是,TaskQueueListener的taskFailed,與前面TaskListener的taskFailed不同,它表示任務在經過一定次數的失敗後,最終放棄重試從而最終失敗。而後者只表示那個任務一次執行失敗。

我們重點討論TaskQueue的實現,而Task的實現我們這裡不關心,我們只關心它的接口。TaskQueue的實現代碼如下:

public class CallbackBasedTaskQueue implements TaskQueue, Task.TaskListener {
    private static final String TAG = "TaskQueue";

    /**
     * Task排隊的隊列. 不需要thread-safe
     */
    private Queue taskQueue = new LinkedList();

    private TaskQueueListener listener;
    private boolean stopped;

    /**
     * 一個任務最多重試次數.
     * 重試次數超過MAX_RETRIES, 任務則最終失敗.
     */
    private static final int MAX_RETRIES = 3;
    /**
     * 當前任務的執行次數記錄(當嘗試超過MAX_RETRIES時就最終失敗)
     */
    private int runCount;

    @Override
    public void addTask(Task task) {
        //新任務加入隊列
        taskQueue.offer(task);
        task.setListener(this);

        if (taskQueue.size() == 1 && !stopped) {
            //當前是第一個排隊任務, 立即執行它
            launchNextTask();
        }
    }

    @Override
    public void setListener(TaskQueueListener listener) {
        this.listener = listener;
    }

    @Override
    public void destroy() {
        stopped = true;
    }

    private void launchNextTask() {
        //取當前隊列頭的任務, 但不出隊列
        Task task = taskQueue.peek();
        if (task == null) {
            //impossible case
            Log.e(TAG, "impossible: NO task in queue, unexpected!");
            return;
        }
        Log.d(TAG, "start task (" + task.getTaskId() + ")");
        task.start();
        runCount = 1;
    }

    @Override
    public void taskComplete(Task task) {
        Log.d(TAG, "task (" + task.getTaskId() + ") complete");
        finishTask(task, null);
    }

    @Override
    public void taskFailed(Task task, Throwable error) {
        if (runCount < MAX_RETRIES && !stopped) {
            //可以繼續嘗試
            Log.d(TAG, "task (" + task.getTaskId() + ") failed, try again. runCount: " + runCount);
            task.start();
            runCount++;
        }
        else {
            //最終失敗
            Log.d(TAG, "task (" + task.getTaskId() + ") failed, final failed! runCount: " + runCount);
            finishTask(task, error);
        }
    }

    /**
     * 一個任務最終結束(成功或最終失敗)後的處理
     * @param task
     * @param error
     */
    private void finishTask(Task task, Throwable error) {
        //回調
        if (listener != null && !stopped) {
            try {
                if (error == null) {
                    listener.taskComplete(task);
                }
                else {
                    listener.taskFailed(task, error);
                }
            }
            catch (Throwable e) {
                Log.e(TAG, "", e);
            }
        }
        task.setListener(null);

        //出隊列
        taskQueue.poll();

        //啟動隊列下一個任務
        if (taskQueue.size() > 0 && !stopped) {
            launchNextTask();
        }
    }
}

在這個實現中,我們需要注意的幾點是:

  • 進出隊列的所有操作(offer, peek, take)都運行在主線程,所以隊列數據結構不再需要線程安全。我們選擇了LinkedList的實現。

  • 任務的啟動執行,依賴兩個機會:

1.任務進隊列addTask的時候,如果原來隊列為空(當前任務是第一個任務),那麼啟動它;

2.一個任務執行完成(成功了,或者最終失敗了)後,如果隊列裡有排隊的其它任務,那麼取下一個任務啟動執行。

  • 任務一次執行失敗,並不算失敗,還要經過若干次重試。如果重試次數超過MAX_RETRIES,才算最終失敗。runCount記錄了當前任務的累計執行次數。

CallbackBasedTaskQueue的代碼揭示了任務隊列的基本實現模式。

任務隊列對於失敗任務的重試策略,大大提高了最終成功的概率。在GitHub上的演示程序中,我把Task的失敗概率設置得很高(高達80%),在重試3次的配置下,當任務執行的時候仍然有比較大的概率能最終執行成功。

基於RxJava的任務隊列

關於RxJava到底有什麼用?網上有很多討論。

有人說,RxJava就是為了異步。這個當然沒錯,但說得不具體。

也有人說,RxJava的真正好處就是它提供的各種lift變換。還有人說,RxJava最大的用處是它的Schedulers機制,能夠方便地切換線程。其實這些都不是革命性的關鍵因素。

那關鍵的是什麼呢?我個人認為,是它對於回調接口設計產生的根本性的影響:它消除了為每個異步接口單獨定義回調接口的必要性。

這裡馬上就有一個例子。我們使用RxJava對TaskQueue接口重新進行改寫。

public interface TaskQueue {
    /**
     * 向隊列中添加一個任務.
     *
     * @param task
     * @param (R) 異步任務執行完要返回的數據類型.(此處圓括號代替尖括號)
     * @return 一個Observable. 調用者通過這個Observable獲取異步任務執行結果.
     */
    (R) Observable(R) addTask(Task(R) task);(此處圓括號代替尖括號)

    /**
     * 銷毀隊列.
     * 注: 隊列在最後不用的時候, 應該主動銷毀它.
     */
    void destroy();
}

我們仔細看一看這個修改後的TaskQueue接口定義。

  • 原來的回調接口TaskQueueListener沒有了。

  • 異步接口addTask原來沒有返回值,現在返回了一個Observable。調用者拿到這個Observable,然後去訂閱它(subscribe),就能獲得任務執行結果(成功或失敗)。這裡的改動很關鍵。本來addTask什麼也不返回,要想獲得結果必須監聽一個回調接口,這是典型的異步任務的運作方式。但這裡返回一個Observable之後,讓它感覺上非常類似一個同步接口了。說得再抽象一點,這個Observable是我們站在當下對於未來的一個指代,本來還沒有運行的、發生在未來的虛無缥缈的任務,這時候有一個實實在在的東西被我們抓在手裡了。而且我們還能對它在當下就進行很多操作,並可以和其它Observable結合。這是這一思想真正的強大之處。

相應地,Task接口本來也是一個異步接口,自然也可以用這種方式進行修改:

/**
 * 異步任務接口定義.
 *
 * 不再使用TaskListener傳遞回調, 而是使用Observable.
 *
 * @param (R) 異步任務執行完要返回的數據類型.(此處圓括號代替尖括號)
 */
public interface Task (R) {  (此處圓括號代替尖括號)
    /**
     * 唯一標識當前任務的ID
     * @return
     */
    String getTaskId();
    /**
     *
     * 啟動任務.
     *
     * 注: start方法需在主線程上執行.
     *
     * @return 一個Observable. 調用者通過這個Observable獲取異步任務執行結果.
     */
    Observable(R) start(); (此處圓括號代替尖括號)
}

這裡把改為RxJava的接口討論清楚了,具體的隊列實現反而不重要了。具體實現代碼就不在這裡討論了,想了解詳情的同學還是參見GitHub。注意GitHub的實現中用到了一個小技巧:把一個異步的任務封裝成Observable,我們可以使用AsyncOnSubscribe。

總結

再說一下TSQ

我們在文章開頭講述了TSQ,並指出它在客戶端編程中很少被使用。但並不是說在客戶端環境中TSQ就沒有存在的意義。

實際上,客戶端的Run Loop(即Android的Looper)本身就是一個TSQ,要不然它也沒法在不同線程之間安全地傳遞消息和調度任務。正是因為客戶端有了一個Run Loop,我們才有可能使用無鎖的方式來實現任務隊列。所以說,我們在客戶端的編程,總是與TSQ有著千絲萬縷的聯系。

順便說一句,Android中的android.os.Looper,最終會依賴Linux內核中大名鼎鼎的epoll事件機制。

本文的任務隊列設計中所忽略的

本文的核心任務是要講解任務隊列的異步編程方式,所以忽略了一些設計細節。如果你要實現一個生產環境能使用的任務隊列,可能還需要考慮以下這些點:

  1. 本文只設計了任務的成功和失敗回調,沒有執行進度回調。

  2. 本文沒有涉及到任務取消和暫停的問題(我們下一篇文章會涉及這個話題)。

  3. 任務隊列的一些細節參數應該是可以由使用者設置的,比如最大重試次數。

  4. 長生命周期的隊列和短生命周期的頁面之間的交互,本文沒有考慮。在GitHub實現的演示代碼中,為了簡單起見,演示頁面關閉後,任務隊列也銷毀了。但實際中不應該是這樣的。關於“長短生命周期的交互”,我後來發現也是一個比較重要的問題,也許後面我們有機會再討論。

  5. 在Android中,類似任務隊列這種可能長時間後台運行的組件,一般外層會使用Service進行封裝。

  6. 任務隊列對於失敗重試的處理,要求服務器慎重地對待去重問題。

  7. 監聽到任務隊列失敗發生之後,錯誤處理變得復雜。

RxJava的優缺點

本文最後運用了RxJava對任務隊列進行了重寫。我們確實將接口簡化了許多,省去了回調接口的設計,也讓調用者能用統一的方式來處理異步任務。

但是,我們也需要注意到RxJava帶來的一些問題:

  • RxJava是個比較重的框架,它非常抽象,難以理解。它對於接口的調用者簡單,而對於接口的實現者來說,是個難題。在實現一個異步接口的時候,如何返回一個恰當的Observable實例,有時候並不是那麼顯而易見。

  • Observable依賴subscribe去驅動它的上游開始運行。也就是說,你如果只是添加一個任務,但不去觀察它,它就不會執行!如果你只是想運行一個任務,但並不關心結果,那麼,這辦不到。舉個不恰當的例子,這有點像量子力學,觀察對結果造成影響……

  • 受前一點影響,在本文給出的GitHub代碼的實現中,第一個任務的真正啟動運行,並不是在addTask中,而是有所延遲,延遲到調用者的subscribe開始執行後。而且其執行線程環境有可能受到調用者對於Schedulers的設置的影響(比如通過subscribeOn),有不在主線程執行的風險。

RxJava在調試時會出現奇怪的、讓人難以理解的調用棧。

考慮到RxJava帶來的這些問題,如果我要實現一個完整功能的任務隊列或者其它復雜的異步任務,特別是要把它開源出來的的時候,我有可能不會讓它對RxJava產生絕對的依賴。而是有可能像Retrofit那樣,同時支持自己的輕量的異步機制和RxJava。

在本文結束之前,我再提出一個有趣的開放性問題。本文GitHub上給出的代碼大量使用了匿名類(相當於Java 8的lambda表達式),這會導致對象之間的引用關系變得復雜。那麼,對於這些對象的引用關系的分析,會是一個很有趣的話題。比如,這些引用關系開始是如何隨著程序執行建立起來的,最終銷毀的時候又是如何解除的?有沒有內存洩露呢?歡迎留言討論。

在下一篇,我們將討論有關異步任務更復雜的一個問題:異步任務的取消。

my_weixin_sign_sf_840.jpg

  1. 上一頁:
  2. 下一頁:
蘋果刷機越獄教程| IOS教程問題解答| IOS技巧綜合| IOS7技巧| IOS8教程
Copyright © Ios教程網 All Rights Reserved