2016年1月16日 星期六

Loopback 妙用

楔子

multi-threading programming 免不了需要實作 Inter-thread communication(ITC),最常見的大概就是 consumer/producer:




大部分人第一個想法就是用 queue:



於是類似下方的 code 大概一百間公司會出現一百次:

//safequeue.h
#ifndef SAFEQUEUE_H
#define SAFEQUEUE_H

#include <QQueue>
#include <QMutex>
#include <QString>

class SafeQueue
{    
    QQueue<QString> m_queue;
    mutable QMutex m_queue_mtx;

public:
    SafeQueue();
    int count();
    QString pop();
    void push(const QString& s);

};

#endif // SAFEQUEUE_H
#include "safequeue.h"

SafeQueue::SafeQueue()
{
}

int SafeQueue::count()
{
    QMutexLocker locker(&m_queue_mtx);
    return m_queue.size();
}

QString SafeQueue::pop()
{
    QMutexLocker locker(&m_queue_mtx);
    if(m_queue.size() > 0)
        return m_queue.dequeue();
    else
        return "";
}

void SafeQueue::push(const QString &s)
{
    QMutexLocker locker(&m_queue_mtx);
    m_queue.enqueue(s);
}

Consumer Thread 則是長得像下面這樣:


void ConsumerThread(void *p)
{
    SafeQueue *queue = (SafeQueue*)p;
    //...
    while(1)
    {
        if(queue->count() > 0)
        {
            QString s = queue->pop();
            //do something
        }
        else
        {
            Sleep(1); //sleep 1ms
        }

    }
    //...
}

這樣寫不能說完全不對,但是有三個缺點:

  1. 以現代 CPU 來講,1ms 可以做很多事(就算 ARM 現在 1 GHz CPU 也很常見),而這裡是完全浪費掉了。
  2. 以 Windows 來說,若沒有特別調整,Sleep() 的精度只有 30ms,也就是你浪費了 30ms。
  3. ConsumerThread 反應不夠靈敏:理想上應該是一有資料就要消化。這邊 1/30ms = 33.33,一秒鐘只能處理 33 次反應也太慢了吧?

方法2: 條件變數(condition variable)

站長最早從 PThread 習得這個方法,Windows 要到 8.0 之後才有相關 API,不過有些 framework(e.g. Qt) 有模擬出來。這個方法的主要精神就是 Thread(Consumer) 發現條件沒有成立就進入睡眠狀態,一旦由其他 Thread(Producer) 促成條件成立就喚醒睡眠中的 Thread(Consumer),實作如下:


//safequeue2.h
#ifndef SAFEQUEUE2_H
#define SAFEQUEUE2_H

#include <QQueue>
#include <QWaitCondition>
#include <QMutex>
#include <QString>

class SafeQueue
{        
    QQueue<QString> m_queue;
    mutable QMutex m_queue_mtx;
    mutable QWaitCondition m_queue_wc; //<--

public:
    SafeQueue();
    int count();
    QString pop();
    void push(const QString& s);

};

#endif // SAFEQUEUE_H
//safequeue2.cpp
#include "safequeue.h"

SafeQueue::SafeQueue()
{
}

int SafeQueue::count()
{
    QMutexLocker locker(&m_queue_mtx);
    return m_queue.size();
}

QString SafeQueue::pop()
{
    m_queue_mtx.lock();
    if(m_queue.size() == 0)
    {
        m_queue_wc.wait(&m_queue_mtx);
        QString s = m_queue.dequeue();
        m_queue_mtx.unlock();
        return s;
    }
    else
    {
        QString s = m_queue.dequeue();
        m_queue_mtx.unlock();
        return s;
    }
}

void SafeQueue::push(const QString &s)
{
    QMutexLocker locker(&m_queue_mtx);
    int old = m_queue.size();
    m_queue.enqueue(s);
    if(old == 0)
        m_queue_wc.wakeOne();
}

當 Consumer Thread 呼叫 m_queue.pop() 時若是沒有資料,則會呼叫 m_queue_wc.wait() 進入睡眠,並且 unlock mutex(m_queue_mtx),等到 Producer Thread 呼叫 m_queue.push() 後,發現之前 queue 是空的(表示可能有 Consumer Thread 在等待),就呼叫 m_queue_wc.wakeOne() 叫醒 Consumer Thread,Consumer Thread 會重新取得 lock mutex(m_queue_mtx),取得資料然後返回。

我們解決了 Consumer Thread 傻等的問題,但碰到下面的情形呢?

當然,SafeQueue::push() 已經加上 mutex 保護了,所以可以放心給 A/B/C 三個 threads 同時呼叫。不過若是深度思考,Thread A 在 push 時 Thread B/C 被阻擋在門外,在 CPU 核心越來越多的今天似乎效能被浪費掉了,有沒有讓 A/B/C 同時可以 push 的方法?

直覺的想法就是把 queue 一分為三:


接著可以借助 semaphore(e.g. win32: CreateSemaphore),與 WaitForMultipleObjects 這兩樣武器,不過 WaitForMultipleObjects 很容易誤用,這個方案也會走到越來越複雜的境界。

沒別的方法了嗎?

方法3: Loopback

這個方法的核心就是用 TCP loopback 通訊 (UDP 應該也行,站長沒試過)替代 DIY queue,對 Winsock/BSD socket 有粗淺認識的人都知道一旦 connect()/accept() 成功,就會產生一組全雙工通訊管道:


我們只要把這個 client/server 連結的行為放在本地端,server binding local address (IPv4: 127.0.0.1、IPv6: ::1),就可以讓兩個 thread 互相通訊:


這樣做有幾個優點:

  • Thread A/B 各自使用獨立的 socket handle 進行通訊,不需要 mutex,也就是 lock-free
  • 通訊通道記憶體管理由作業系統負責,程序員無須關心。
  • Thread(e.g. Consumer Thread) 可以使用 select() 掃瞄多個 socket handle,select() 是跨平台函式,同樣的作法可以移植到 Linux,反之亦然。
示意圖:


如果你覺得這樣是旁門左道,微軟在 Windows Server 2012 與 Windows 8 添加了 "TCP Loopback Fast Path",而 "Fast TCP Loopback Performance and Low Latency with Windows Server 2012 TCP Loopback Fast Path" 這篇文章明白指出這個新功能就是替使用 TCP loopback 做 IPC 的軟體加速。

事實上在 UNIX 裡早有個使用多年的 system call: socketpair() 用來產生上述用來通訊用的成對 socket handle,其第一個參數 domain 在大多數的實作裡(e.g. Linux),只能填入 AF_UNIX(UNIX domain socket),表示 socketpair 的用途就是 IPC/ITC。

在 Linux 世界裡頗流行的 D-Bus,也是建築在 UNIX domain socket 之上。

腦筋動的快的人可以發現,這樣的架構不只可以用在 IPC/ITC 上,既然原來就是使用 TCP,那表示也有機會把 Thread 打散到不同的機器上進行分散式運算。如果平台限定在 Linux 上,則可以使用效率更高的 epoll。


如果您讀到這裡躍躍欲試,但不知道怎麼開始,不用辛苦的 k winsock API(當然 Linux user 輕舟已過萬重山),可以參考 libevent 的 evutil_ersatz_socketpair(),他已經在那邊等您了 :)

缺點:

  1. 跟 DIY queue 相比,socket 是有限的資源。
  2. 傳遞資料從 C++ object 變為 byte stream。
關於 1.,Windows 限制單一 process 能使用的 socket 數上限為 32767,這對大部分應用來說都非常足夠,而 select() 在 Windows 上預設也只能輪詢 64 個 socket handles,如果您要使用很多 socket handles + threads,恐怕要先討論的是是不是要把雞蛋全放在同一個籃子,而且如同站長之前的文章解釋過,同時啟用太多 threads 並不會有什麼好下場。甚至於如果您有大量輪詢 socket handles 的需求,可以考慮移師 Linux,因為 Linux epoll() 輪詢上千上萬個 handles 並不罕見。

2.  的問題變成要設計通訊協議,在過去也是可能一百家公司有一百種作法,好在 Google 很大方的開放了自家設計的 ProtoBuf 供大眾使用,只要先定義好 schema 就會自動產生相關讀寫程式碼,免除各家自造輪子不相容的問題。

參考資料

Python Cookbook, 3rd Edition >12.13. Polling Multiple Thread Queues

希望這篇文章對您有幫助 ^^

13 則留言:

  1. 請問有什麼可以實際應用的場合下能夠用上的呢?

    看起來是個很實用的技術應用,謝謝你的分享!

    回覆刪除
    回覆
    1. 感謝留言

      光是「process 結束時,乾淨的結束內在所有 thread(s)」就很好用了,我會再寫一篇介紹

      刪除
  2. Qt 或 cpp 如何達到 像是
    Python Cookbook, 3rd Edition >12.13. Polling Multiple Thread Queues
    所介紹的 使用方式嗎 ?
    網路上找不太到sample code..

    回覆刪除
    回覆
    1. Qt 要用 QLocalSocket,或者乾脆用 QTcpServer + QTcpSocket。QLocalSocket 在 UNIX 就是用 UNIX Domain socket,在 Windows 是用 named pipe。Qt 在 Linux 也是用 select() or epoll() 做訊息處理,Qt Windows 就沒仔細看了

      刪除
    2. class PollableQueue(queue.Queue):
      def __init__(self):
      super().__init__()
      # Create a pair of connected sockets if os.name == 'posix':
      self._putsocket, self._getsocket = socket.socketpair()
      else:
      # Compatibility on non-POSIX systems
      server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.bind(('127.0.0.1', 0))
      server.listen(1)
      self._putsocket = socket.socket(socket.AF_INET,socket.SOCK_STREAM) self._putsocket.connect(server.getsockname())
      self._getsocket, _ = server.accept()
      server.close()

      不懂在 Queue裡 建socket通道後 又close , 整個流程是怎麼work的?

      還有
      def consumer(queues): '''
      Consumer that reads data on multiple queues simultaneously
      '''
      while True:
      can_read, _, _ = select.select(queues,[],[]) for r in can_read:
      for r in can_read:
      item = r.get();

      cpp的select 是不是無法做成類似這樣?

      刪除
    3. 不好意思 程式貼的有點亂 跑去參考python cookbook 3 發現有些理解

      刪除
    4. 您找一下evutil_ersatz_socketpair原始碼就知道了,另外把blocking,non-blocking弄清楚就好了,select用法請看UNP

      刪除
    5. 大約有些懂了, 就是用socket來做類似lock的動作, queue 放入item後 thread 中的select才有return
      但是 去翻了網路 也有說這種方式跟 mutex 比起來效能不彰 約有十倍慢
      連結: http://sidekick.windforwings.com/2012/05/inter-thread-communication-socketpairs.html

      刪除
    6. 那麼好處只有使用thread時, 好debug 跟 程式碼乾淨 但還多了一個壞處: byte stream 要自己解碼
      cpp select 回來大約還要使用FD_ISSET 區分出哪個 queue object 再從queue取出..

      刪除
    7. 當然不可能比內部共享記憶體快,如果要講,Windows上的CRITICAL_SECITON 還比 Mutex更快,前者不需要進入 kernel,Python cook book也講了這會有一些性能上的損失

      個人見解是您的程式需要多快的反應時間?多少throughput才夠?才去考慮最激進的方案

      應該不是說用socket替代lock,而是這樣的程式本就是lock free,直接達到thread-safe

      您貼的網址並沒有使用select,也沒有把socket設定成non-blocking,這是使用select的關鍵,甚至在 Windows 上 blocking socket 是效能最差的(官方資料)

      您覺得FD_ISSET要掃描整個fd array很不便,這是因為Windows沒有提供類似Linux epoll的界面,如果是Linux就該考慮使用epoll,這是Kernel 2.6後就有的 api,epoll回傳的都是已經read to read/write的fd.

      至於 byte stream 要解碼個人是覺得還好,畢竟Google 都給你 ProtoBuf了(Pokemon Go也採用ProtoBuf)

      而且與共享記憶體相比.只有這種架構可以擴展成網路分散式運算,除非您的程式不需要scalability的能力

      而且好debug難道不該是最優先被考慮的選項?說真的,我剛畢業時也寫了一堆mutex+queue的東西,但這樣的架構寫一般寫小型程式尚可,當你的程式成長成一個大怪物時,還要跟一群人co-work時,就會演變成可怕的災難,各種mutex像狗皮藥膏一樣到處出現,但沒有人有能力一眼看出來為何要加這些mutex,甚至在做效能調校時這些mutex還會成為阻礙...

      所以,為何MS要幫TCP loopback解開封印?Python Cookbook把這樣的用法寫到書上?不是沒有原因的

      刪除
    8. 謝謝指教 以前做工控時 控制加影像檢測 自己跟同事常用polling的方式 polling一些設備
      本人又不喜歡使用 waitformultiobject
      看到站長提供的做法 真心覺得不錯
      但似乎有效能衝擊的問題 所以應該用在thread A B C丟資料量大但次數少 consumer thread 要複雜運算的情景下

      請問站長做過的案例 在何種情況下使用 ? 輕運算也用嗎?

      刪除
    9. 老實說 thread多的時候 同步挺麻煩的
      我也是能免用則免用
      但是站長提倡的做法 在台灣還算少見
      所以在分析好處壞處 可以replace all嗎 之類的

      刪除
    10. WaitForMultipleObject要用的好不容易,請看本站另一篇文章,簡單來說WaitForMultipleObject比select還要容易誤用:

      http://goodspeedlee.blogspot.tw/2015/09/waitformultipleobjects-windows-io.html

      您在設計工控程式時一定碰過好幾個 thread 同時想要讀寫某個設備的料吧?

      最糟的作法就是在 WriteFile(),ReadFile() 前加上 mutex,這個不用我多說,一定不會有好下場。

      比較正常一點的作法就是讓一個 thread A負責 polling device,其他的 thread B,C,D把想要傳送的資料用 DIY queue 丟給他。

      問題來了,當 thread A 等待設備回應時,在不用WaitForMultipleObject+overlapped io的情形下,是無法反應對B,C,D丟出的訊息的。

      另外一個問題是如果今天B,C,D沒有來要求read,write,A無事可作又要隨時待命,那就會變成本文開頭的例子,用sleep(1)的方式不斷檢查。

      所以不用WaitForMultipleObject+overlapped io幾乎是不可能的,就算你不想用 socket 替代 DIY queue。

      還有一個問題是,B,C,D有時候無法射後不理,他要確認A真的寫入才能繼續往下走,如果不用本文的方法,那B,C,D就是要提供一個queue或是semaphore之類的,使用WaitForSingleObject 去等A發訊號過來。

      綜合以上的issue,你會發現本文提供的方法還比較簡單不容易誤用。而如果您覺得每次都叫A去讀很慢,另外一篇文章提供一個加上cache的作法:

      http://goodspeedlee.blogspot.tw/2016/01/loopback_20.html


      所以我也沒說要全盤拋棄 mutex,這個作法可以提昇不少效能,讓B,C,D不用每次都用socket叫A做事。

      所以個人推薦的方案是

      網路類設備:
      select()+non-blocking

      非網路類設備:
      WaitForMultipleObject+overlapped io

      當然如果您碰上效能不夠,那就是要另外的訂製方案,不在本文之內,本文也不是仙丹。

      在我的case中我主要是為了服務腳本引擎搭配閘道服務,用上這個作法後bug只出現在解析封包這一段,除了剛開始解析封包碰到一些小bug,幾乎沒再碰過當機等問題,。

      刪除