2019年3月31日 星期日

從 lwIP 學習 Inter-Thread Communication

Multithreading 程設最大的障礙是看懂了 Semaphore, Mutex, Reader/Writer Lock API...仍然不知如何「正確」完成一個專案。

大部分人都有能力讓程式動起來,但若是不允許疊床架屋,把 mutex 當成萬靈丹到處亂貼,必須做出一個易於修改擴充,可保證執行效率的 code base,那就不是每個人都辦得到了。

筆者也掙扎了很久,直到最近與 lwIP 相處了一年,才發現「眾裡尋他千百度,驀然回首,那人卻在燈火闌珊處」。原來 lwIP 早就為我們示範了一套易於實現、容易理解的實作模式(design pattern)。

看完本篇,您搞不好馬上就可以應用到手上的案子,而不需要煩惱是否要引入 ZeroMQ 用大砲轟小鳥。

場景

假設我們有 3 個 threads,分別為 Web Server、Modbus TCP Slave、Modbus RTU Master,Web Server 與 Modbus TCP Slave 都想存取下圖最下方的 Modbus RTU Slave:


在只要讀取的場合,可以由 Modbus RTU Master 代為讀回到一塊 cache 中(這個 cache 只需要實行簡單的 mutex 保護),Web Server 與 Modbus TCP Slave 可以由 cache 共享資料,也就避免了為了搶同一個 Serial Port 造成的 race condition。但除非要讀取的 Modbus RTU Slave 資料位址(如 Holding Register)固定不變,Modbus TCP Slave 與 Web Server 還是需要一個通訊機制告訴 Modbus RTU Master 要新增哪些資料位址,或者取消取某些位址。


更不用說如果 Web Server 與 Modbus TCP Slave 想對 Modbus RTU Slave 寫值,也需要一套通訊機制把欲寫入的值與資料位址告訴 Modbus RTU Master。這有很多種方法,自製的 Message Queue,或是之前提過的 Loopback socket,但最終都會導向一個問題:「該如何設計通訊協定」?

這往往朝複雜化的方向走,這時候有人搞不好就想請出 ProtoBuf,筆者以前也有這種想法,不過後來看了 lwIP 的設計,發現這也是用大砲轟小鳥,沒那個必要。接下來就來看 lwIP 示範什麼叫做「精緻」的程式設計。

庖丁解牛: getaddrinfo()

getaddrinfo() 將 domain name 轉成 IP address,效果類似 nslookup:


同樣,若以參數呼叫 getaddrinfo("yahoo.com.tw", 0, 0, &res),會得到下方的結果:


不過這裡要關注的不是結果,而是過程
/* netdb.c */
int
lwip_getaddrinfo(const char *nodename, const char *servname,
       const struct addrinfo *hints, struct addrinfo **res)
{
  /* ... */  

  if (nodename != NULL) {
    /* service location specified, try to resolve */
    err = netconn_gethostbyname(nodename, &addr);
    if (err != ERR_OK) {
      return EAI_FAIL;
    }
  } else {
    /* ... */
  }
  
  /* ... */
}
/* api_lib.c */
err_t
netconn_gethostbyname(const char *name, ip_addr_t *addr)
{
  struct dns_api_msg msg;
  err_t err;
  sys_sem_t sem;

  err = sys_sem_new(&sem, 0);
  if (err != ERR_OK) {
    return err;
  }

  msg.name = name;
  msg.addr = addr;
  msg.err = &err;
  msg.sem = &sem;

  tcpip_callback(do_gethostbyname, &msg);
  sys_sem_wait(&sem);
  sys_sem_free(&sem);

  return err;
}
/* tcpip.h */
#define tcpip_callback(f, ctx) tcpip_callback_with_block(f, ctx, 1)

/* tcpip.c */
err_t
tcpip_callback_with_block(tcpip_callback_fn function, void *ctx, u8_t block)
{
  struct tcpip_msg *msg;

  if (sys_mbox_valid(&mbox)) {
    msg = (struct tcpip_msg *)memp_malloc(MEMP_TCPIP_MSG_API);
    if (msg == NULL) {
      return ERR_MEM;
    }

    msg->type = TCPIP_MSG_CALLBACK;
    msg->msg.cb.function = function;
    msg->msg.cb.ctx = ctx;
    if (block) {
      sys_mbox_post(&mbox, msg);
    } else {
      if (sys_mbox_trypost(&mbox, msg) != ERR_OK) {
        memp_free(MEMP_TCPIP_MSG_API, msg);
        return ERR_MEM;
      }
    }
    return ERR_OK;
  }
  return ERR_VAL;
}
由上面的程式片段(取自 lwIP 1.4.1)可以看到,lwip_getaddrinfo() 採用間接的方式查詢 IP address,netconn_gethostbyname 把參數位址、error code、semaphore(from sys_new_sem) 打包成 dns_api_msg,接著 tcpip.c line:20 再把 dns_api_msg 與真正負責查詢 IP adderss 的 function - do_gethostbyname() 再打包成 tcpip_msg 成放進 mbox 傳給 lwIP task。

mbox 在 lwIP 中稱作 mailbox,lwIP 並未強制規定 mailbox 要實作成哪種資料結構,不過筆者看過的實作中幾乎都是 queue(first in first out),接著我們把上面的程式與描述整理成圖形就一目了然了:

lwIP 收到這個 message 就只要呼叫 msg.cb.function 就好:
/* tcpip.c */

static void
tcpip_thread(void *arg)
{
  /* ... */
  while (1) {                          /* MAIN Loop */
    
    /* wait for a message, timeouts are processed while waiting */
    sys_timeouts_mbox_fetch(&mbox, (void **)&msg);
    LOCK_TCPIP_CORE();
    switch (msg->type) {
    /* ... */

    case TCPIP_MSG_CALLBACK:
      msg->msg.cb.function(msg->msg.cb.ctx);
      memp_free(MEMP_TCPIP_MSG_API, msg);
      break;

    /* ... */
    }
  }
}
/* api_msg.c */
void
do_gethostbyname(void *arg)
{
  struct dns_api_msg *msg = (struct dns_api_msg*)arg;

  *msg->err = dns_gethostbyname(msg->name, msg->addr, do_dns_found, msg);
  if (*msg->err != ERR_INPROGRESS) {
    /* on error or immediate success, wake up the application
     * task waiting in netconn_gethostbyname */
    sys_sem_signal(msg->sem);
  }
}
由 api_msg.c line:11 sys_sem_signal() 可以看到,如果查詢成功,就會 signal api_lib.c line:9 的 semaphore,結束等待。

由上面的描述可以發現,這種方法大幅減少記憶體拷貝,不用設計複雜的 protocol,可以輕易套用到 Windows 與 Linux 上,不過有幾點要注意:
  1. lwIP 大部分的使用環境(RTOS)沒有區分 user mode or kernel mode,所以產生 semaphore 並不是耗時的運算,如果 Windows/Linux 要這麼做的話可以事先產生好一組 semaphore pool 備用。
  2. 如果採用 loopback socket,則不需要使用 semaphore,只需要把 error code 透過 socket 寫回即可。
  3. 每次從 memp_malloc() 配置的記憶體區塊大小相同,這個可以簡單的用 linked list 實作,lwIP 為了效率使用 array  模擬,如果是 Windows or Linux 這就不是必要的作法(通常這時候我會用 C++)
當然還要考慮錯誤處理(error handling),有時候呼叫端無法永遠等下去(blocking),如果呼叫端 timeout 而被呼叫端(如上面的 tcpip_thread)不知道,還傻傻的把 error code 從 socket 寫回,呼叫端接下來收到此 error code 可能不知道如何處理,或是與其他呼叫搞混,簡單的作法就是加上一個流水號:


既然無法永遠等下去,那就不能像 netconn_gethostbyname() 把 local variable(msg)的位址傳給 tcpip_thread,這樣會造成無法挽回的災難。例如上圖中的 tcpip_msg, dns_api_msg 我們可以加入一個旗標識別這塊記憶體是哪來的?是 local or static variable,還是動態配置而來,該由呼叫端釋放記憶體還是被呼叫端(如上面的 tcpip_thread)釋放記憶體?如果採用 C++,可以利用 smart pointer or reference counting 減輕負擔。

另外,timeout 機制可以改為呼叫時就把 timeout value 傳給被呼叫端,被呼叫端就可以知道呼叫端可以忍耐多久,在 timeout 時通知呼叫端,應可減輕記憶體管理的成本。

筆者以前做 HMI 時也處理過相關問題,現在看起來可以做得更好,如果老同事看到覺得不錯就拿去用吧!對於開發 IoT or protocol gateway 的人來說,這應該也是有用的技巧,在這裡分享給大家。

沒有留言:

張貼留言