MQTTサーバーを実装しながらGoを学ぶ - その11 goroutineのエラーハンドリング, map, goroutineセーフ

前回の続き。別のgoroutineで発生したエラーをerror channelを使ってハンドリングしてみたいと思います。ハンドリングの処理でmapを使ったのですが、goroutineセーフにするため sync.Map を使ってみました。

目次。

handleSub でエラー

handleSub 関数でサブスクライバに対してPUBLISHを送っている。

func handleSub(conn net.Conn, fromBroker <-chan *packet.Publish) {
    for publishMessage := range fromBroker {
        bs := publishMessage.ToBytes()
        _, err := conn.Write(bs)
        if err != nil {
            // FIXME
            fmt.Println(err)
        }
    }
}

conn.Write でエラーだった場合のことを考える。例えば、mosquitto_subでサブスクライブした後、すぐにCtrl-Cで切断しておく。その後にmosquitto_pubでパブリッシュすると conn.Writeerr を返し、サーバーが以下を出力する。

write tcp 127.0.0.1:1883->127.0.0.1:59452: use of closed network connection

Broker がサブスクライバが切断したことに気がつかず、サブスクリプションを削除せずにPUBLISHされたメッセージを配送し続けてしまう。 handleSub goroutineで発生したエラーを Broker goroutineに伝えて、 Broker が不要なサブスクリプションを削除する方法を考える。

別goroutineを受け取るerror channel

PUBLISHメッセージを配送する Broker -> handlerSub という方向のchannelとは別に、 errorhandleSub -> Broker という方向に流すchannelを用意すれば良い。

現在の Broker は以下。

// broker.go

func Broker(fromPub <-chan *packet.Publish, subscriptions <-chan *Subscription, doneSubscriptions <-chan *DoneSubscription) {
    sMap := newSubscriptionMap()
    for {
        select {
        case sub := <-subscriptions:
            sMap.put(sub.clientID, sub)
        case message := <-fromPub:
            sMap.apply(func(sub *Subscription) {
                sub.pubToSub <- message
            })
        }
    }
}

以下のように selectdoneSubscriptions channelも読むようにする。

func Broker(fromPub <-chan *packet.Publish, subscriptions <-chan *Subscription, doneSubscriptions <-chan error) {
    sMap := newSubscriptionMap()
    for {
        select {
        case sub := <-subscriptions:
            sMap.put(sub.clientID, sub)
        case message := <-fromPub:
            sMap.apply(func(sub *Subscription) {
                sub.pubToSub <- message
            })
+       case err := <-doneSubscriptions:
+           fmt.Println(err)
+       }
    }
}

doneSubscriptions channelを使って handleSub goroutine から Broker goroutineに error を流す。

server.goでは3つのchannelを生成することになる。

func Run() {
    ln, err := net.Listen("tcp", "localhost:1883")
    if err != nil {
        panic(err)
    }
    defer ln.Close()

    pub := make(chan *packet.Publish)
    defer close(pub)
    subscriptions := make(chan *Subscription)
    defer close(subscriptions)
+   doneSubscriptions := make(chan *DoneSubscriptionResult)
+   defer close(doneOfSubscription)

    go Broker(pub, subscriptions, doneSubscriptions)

    for {
        conn, err := ln.Accept()
        if err != nil {
            panic(err)
        }

        go func() {
-          err = handle(conn, pub, subscriptions)
+           err = handle(conn, pub, subscriptions, doneSubscriptions)
            if err != nil {
                panic(err)
            }
        }()
    }
}

doneSubscriptions channelが 「Run 関数のgoroutine」 -> 「handle関数を実行するgoroutine」 -> 「handleSub 関数のgoroutine」と渡されていくことになる。

つまり書き込み可能なchannelが孫goroutineまで渡されることになる。channelの所有者がはっきりしなくなってきた...。

書き込み可能なchannelを関数に渡すのではなく、関数内でchannelとgoroutineを生成し、読み取り専用channelを返すようにする。

具体的には、 handleSub 関数を以下のようにする。

// server.go

func handleSub(conn net.Conn, fromBroker <-chan *packet.Publish) <-chan error {
    errCh := make(chan error)
    go func() {
        defer close(errCh)
        for publishMessage := range fromBroker {
            bs := publishMessage.ToBytes()
            _, err := conn.Write(bs)
            if err != nil {
                errCh <- err
            }
        }
    }()
    return errCh
}

errCh channelのライフサイクルが handleSubscription 関数内に閉じている。関数内でchannelを生成、関数内でgoroutineを起動し生成したchanelの所有権を移譲する(goroutine内で書き込みと close をする)。そして関数の返り値は読み取り専用channelとする。

このように1つの関数内にchannelのライフサイクルを閉じ込める書き方が「Go言語による並行処理パターン」では頻出してる。

この handleSub 関数の書き換えによって、 Run 関数で生成した doneSubscriptions channelを孫goroutine( handleSub goroutine)まで渡さなくて良くなる。が、 handleSub 関数から返ってきた読み取り専用channelの読み込みでブロックするため新しくgoroutineを作る必要がある。

       case packet.SUBSCRIBE:
            suback, err := handler.HandleSubscribe(mqttReader)
            if err != nil {
                return err
            }
            _, err = conn.Write(suback.ToBytes())
                        if err != nil {
                                return err
                        }
-                       sub := make(chan *packet.Publish)
-                       subscriptionToBroker <- sub
-                       go handleSub(conn, sub)
+                       subscription, errCh := handleSub(clientID, conn)
+                       subscriptionToBroker <- subscription
+                       go func() {
+                               err, ok := <-errCh
+                               if !ok {
+                                       return
+                               }
+                               doneSubscriptions <- err
+                       }()

サブスクリプションの特定

MQTTのClient ID

これで errorhandleSub goroutineから Broker goroutineへ渡すことができそう。 error を受け取った Broker はどのSubscriptionを閉じるかをどうやって決めれば良いだろう?

今まで無視してたMQTTのClient IDを使うことにする。Client IDはCONNECT時にペイロードに含まれていた。

CONNECTパケットのハンドリング時に、Client IDを変数にセットしておく。

func handle(conn net.Conn, publishToBroker chan<- *packet.Publish, subscriptionToBroker chan<- Subscription) error {
        defer conn.Close()
 
+       var clientID string
+
        for {
                r := bufio.NewReader(conn)
                mqttReader := packet.NewMQTTReader(r)
                case packet.CONNECT:
-                       connack, err := handler.HandleConnect(mqttReader)
+                       connect, connack, err := handler.HandleConnect(mqttReader)
                        if err != nil {
                                return err
                        }
                        _, err = conn.Write(connack.ToBytes())
                        if err != nil {
                                return err
                        }
+                       clientID = connect.Payload.ClientID
                case packet.SUBSCRIBE:
                        suback, err := handler.HandleSubscribe(mqttReader)
                        if err != nil {

現在の実装だと以下のように読み取り専用channelを Subscription としている。

type Subscription chan<- *packet.Publish

これをstructにしてClient IDを持たせるようにする。さらに、 handleSub goroutineから Broker goroutineへ伝える error もsturctで包んでClient IDを関連づける。

// broker.go

type Subscription struct {
    clientID string
    pubToSub chan<- *packet.Publish
}

func NewSubscription(clientID string) (*Subscription, <-chan *packet.Publish) {
    pub := make(chan *packet.Publish)
    s := &Subscription{
        clientID: clientID,
        pubToSub: pub,
    }
    return s, pub
}

type DoneSubscriptionResult struct {
    clientID string
    err      error
}

func NewDoneSubscriptionResult(clientID string, err error) *DoneSubscriptionResult {
    return &DoneSubscriptionResult{clientID, err}
}

func Broker(fromPub <-chan *packet.Publish, subscriptions <-chan *Subscription, doneSubscriptions <-chan *DoneSubscriptionResult) {
    ...
}

map

現在の Broker の実装では Subscription を配列で管理してる。

func Broker(fromPub <-chan *packet.Publish, subscriptions <-chan *Subscription, doneSubscriptions <-chan *DoneSubscriptionResult) {
    // サブスクリプションの配列
    var ss []Subscription
    for {
        select {
        case sub := <- subscriptions:
            // channelからサブスクリプションを読み取ったら配列に追加
            ss = append(ss, sub)
        case message := <- fromPub:
            // fromPub channelからメッセージを読み取ったら全てのサブスクリプションへ配送
            for _, sub := range ss {
                sub <- message
            }
        case err := <-doneSubscriptions:
            fmt.Println(err)
        }
    }
}

Subscription を配列で管理してるのをやめて、キーをClientIDとしたmapを使うことにする。

func Broker(fromPub <-chan *packet.Publish, subscriptions <-chan *Subscription, doneSubscriptions <-chan *DoneSubscriptionResult) {
    // サブスクリプションのmap
    var sMap map[string]*Subscription
    sMap = make(map[string]*Subscription)
    for {
        select {
        case sub := <-subscriptions:
            // channelからサブスクリプションを読み取ったらキーをclientIDとしてmapに追加
            sMap[sub.clientID] = sub
        case message := <-fromPub:
            for _, sub := range sMap {
                sub.pubToSub <- message
            }
        case err := <-doneSubscriptions:
            fmt.Println(err)
        }
    }
}

次は selectdoneSubscriptions channelから読み取った時の処理。 close とmapから削除する処理をすれば良い。

func Broker(fromPub <-chan *packet.Publish, subscriptions <-chan *Subscription, doneSubscriptions <-chan *DoneSubscriptionResult) {
    // サブスクリプションのmap
    var sMap map[string]*Subscription
    sMap = make(map[string]*Subscription)
    for {
        select {
        case sub := <-subscriptions:
            // channelからサブスクリプションを読み取ったらキーをclientIDとしてmapに追加
            sMap[sub.clientID] = sub
        case message := <-fromPub:
            for _, sub := range sMap {
                sub.pubToSub <- message
            }
        case done := <-doneSubscriptions:
            fmt.Printf("close subscription: %v\n", done.clientID)
            if done.err != nil {
                fmt.Println(done.err)
            }
            s, ok := sMap[sub.clientID]
            if ok {
                sub := s.(*Subscription)
                close(s.pubToSub)
                delete(sMap, done.clientID)
            }
        }
    }
}

sync.Mutex

複数goroutineで同じmapを操作しているのが気になる。mapは複数のgoroutineから触れるように設計されていない。スレッドセーフならぬgoroutineセーフではない。

sync.Mutexパッケージを使って同期をとりgoroutineセーフにしておく。

sync.Mutexを使うときは1つの構造体、というか型に閉じておくと良さそう。 typesubscriptionMap というstructを作る。

type subscriptionMap struct {
    mu    sync.Mutex
    value map[string]*Subscription
}

func newSubscriptionMap() *subscriptionMap {
    m := make(map[string]*Subscription)
    return &subscriptionMap{
        mu:    sync.Mutex{},
        value: m,
    }
}

func (m *subscriptionMap) get(clientID string) *Subscription {
    m.mu.Lock()
    s, ok := sMap[sub.clientID]
    m.mu.Unlock()
    if ok {
        return s.(*Subscription)
    }
    return nil
}

func (m *subscriptionMap) put(clientID string, s *Subscription) {
    m.mu.Lock()
    m.value[clientID] = s
    m.mu.Unlock()
}

func (m *subscriptionMap) delete(clientID string) {
    m.mu.Lock()
    delete(m.value, clientID)
    m.mu.Unlock()
}

と、ここまで書いて、mapに対する range はどうしたら良いのだろう?という疑問が...

sync.Map

ググってると sync.Map というのを発見!

Go Docを見ると sync.Map で扱う値はinterface{}であることが分かる。 type を使って自前のstructで包んだ方が良さそう。

結果的にbroker.goは以下のようになった。

type Subscription struct {
    clientID string
    pubToSub chan<- *packet.Publish
}

func NewSubscription(clientID string) (*Subscription, <-chan *packet.Publish) {
    pub := make(chan *packet.Publish)
    s := &Subscription{
        clientID: clientID,
        pubToSub: pub,
    }
    return s, pub
}

type DoneSubscriptionResult struct {
    clientID string
    err      error
}

func NewDoneSubscriptionResult(clientID string, err error) *DoneSubscriptionResult {
    return &DoneSubscriptionResult{clientID, err}
}

type subscriptionMap struct {
    syncMap sync.Map
}

func newSubscriptionMap() *subscriptionMap {
    return &subscriptionMap{}
}

func (m *subscriptionMap) get(clientID string) *Subscription {
    s, ok := m.syncMap.Load(clientID)
    if !ok {
        return nil
    }
    return s.(*Subscription)
}

func (m *subscriptionMap) put(clientID string, s *Subscription) {
    m.syncMap.Store(clientID, s)
}

func (m *subscriptionMap) delete(clientID string) {
    m.syncMap.Delete(clientID)
}

func (m *subscriptionMap) apply(f func(s *Subscription)) {
    m.syncMap.Range(func(k, v interface{}) bool {
        s := v.(*Subscription)
        f(s)
        return true
    })
}

func Broker(fromPub <-chan *packet.Publish, subscriptions <-chan *Subscription, doneSubscriptions <-chan *DoneSubscriptionResult) {
    // サブスクリプションのmap
    sMap := newSubscriptionMap()
    for {
        select {
        case sub := <-subscriptions:
            // channelからサブスクリプションを読み取ったらキーをclientIDとしてmapに追加
            sMap.put(sub.clientID, sub)
        case message := <-fromPub:
            // 全てのサブスクリプションにメッセージを配送
            sMap.apply(func(sub *Subscription) {
                sub.pubToSub <- message
            })
        case done := <-doneSubscriptions:
            fmt.Printf("close subscription: %v\n", done.clientID)
            if done.err != nil {
                fmt.Println(done.err)
            }
            sub := sMap.get(done.clientID)
            if sub != nil {
                close(sub.pubToSub)
                sMap.delete(done.clientID)
            }
        }
    }
}

動かす

動かしてみる。

mosquitto_subでサブスクライブした後、すぐにCtrl-Cで切断する。その後にmosquitto_pubでパブリッシュするとサーバーが以下を出力する。

write tcp 127.0.0.1:1883->127.0.0.1:59452: use of closed network connection

ここまでは最初と変わらないけど、もう一度パブリッシュするともう出力されない。これは1回目の Write でエラーが発生した時に、 Brokerサブスクリプションの削除ができたから。

これでhandleSubでエラーが発生した場合にBrokerでハンドリングする実装ができた!

おしまい

ただこれだとエラーが発生するまでサブスクリプションが削除されないし、 handleSub goroutineが残り続ける。本当はクライアントが切断したタイミングでサブスクリプションを削除したい。

次回はContextを使うことでこれを解決しようと思います。

今回の学び。

MQTTサーバーを実装しながらGoを学ぶ - その10 type, "chan<-"と"<-chan"

前回、goroutine, channel, selectを使って Broker を実装しました。この Broker をサーバーで利用し、MQTTでメッセージを配送してみます。

その前に <-chan chan<- []byte という型が読みづらいので type を使って対応します。その後、読み書き可能なchannelを、 chan<- , <-chan という型に変換しgoroutineへ渡しメッセージの配送を実現します。

目次

typeで型をつくる

現在の実装だとサブスクリプションchan<- []byte で表現している。channelのchannel( <-chan chan<- []byte )やchannelの配列( []chan<- []byte )が登場したので少々読みづらい。

type を使って chan<- []byte という型から Subscription という型を作ることにする。

type Subscription chan<- []byte

Broker の実装はこうなる。 <-chan chan<- []byte<-chan Subscription[]chan<- []byte[]Subscription となって読みやすい。

// broker.go
package mqtt

type Subscription chan<- []byte

func Broker(fromPub <-chan []byte, subscriptions <-chan Subscription) {
    // サブスクリプションの配列
    var ss []Subscription
    for {
        select {
        case sub := <- subscriptions:
            // channelからサブスクリプション(chan<- []byte)を読み取ったら配列に追加
            ss = append(ss, sub)
        case message := <- fromPub:
            // fromPub channelからメッセージを読み取ったら全てのサブスクリプションへ配送
            for _, sub := range ss {
                sub <- message
            }
        }
    }
}

サーバーに組み込む

[]byteからPublishへ変更

サブスクライバへメッセージを配送する時、サーバーからサブスクライバへMQTTのPUBLISHパケットを送る。 Broker で配送してるメッセージの型を []byte から packet.Publish に変更する。 packet.Publish を生成するための packet.NewPublish() という関数を追加。

func NewPublish(topicName string, message []byte) Publish {
    variableHeader := NewPublishVariableHeader(topicName)
    fixedHeader := NewPublishFixedHeader(PUBLISH, variableHeader.Length()+uint(len(message)))
    return Publish{fixedHeader, variableHeader, message}
}

SubscriptionBroker の定義を変更する。

-type Subscription chan<- []byte
+import "github.com/bati11/oreno-mqtt/mqtt/packet"
 
-func Broker(fromPub <-chan []byte, subscriptions <-chan Subscription) {
+type Subscription chan<- *packet.Publish
+
+func Broker(fromPub <-chan *packet.Publish, subscriptions <-chan Subscription) {
     // サブスクリプションの配列
     var ss []Subscription
     for {

server.go

実装したBrokerをserver.goに組み込む。途中の状態だけどserver.goのコードは以下のようになる。

package mqtt

import (
    "bufio"
    "fmt"
    "io"
    "net"

    "github.com/bati11/oreno-mqtt/mqtt/handler"
    "github.com/bati11/oreno-mqtt/mqtt/packet"
)

func Run() {
    ln, err := net.Listen("tcp", "localhost:1883")
    if err != nil {
        panic(err)
    }
    defer ln.Close()

    pub := make(chan *packet.Publish)
    defer close(pub)
    subscriptions := make(chan Subscription)
    defer close(subscriptions)

    go Broker(pub, subscriptions)

    for {
        conn, err := ln.Accept()
        if err != nil {
            panic(err)
        }

        go func() {
            err = handle(conn, pub, subscriptions)
            if err != nil {
                panic(err)
            }
        }()
    }
}

func handle(conn net.Conn, publishToBroker chan<- *packet.Publish, subscriptionToBroker chan<- Subscription) error {
    defer conn.Close()

    for {
        r := bufio.NewReader(conn)
        mqttReader := packet.NewMQTTReader(r)
        packetType, err := mqttReader.ReadPacketType()
        if err != nil {
            if err == io.EOF {
                return nil
            }
            return err
        }
        switch packetType {
        case packet.PUBLISH:
            err = handler.HandlePublish(mqttReader)
            if err != nil {
                return err
            }
            // TODO "publishToBroker" channelにPUBLISHメッセージを書き込む
        case packet.CONNECT:
            connack, err := handler.HandleConnect(mqttReader)
            if err != nil {
                return err
            }
            _, err = conn.Write(connack.ToBytes())
            if err != nil {
                return err
            }
        case packet.SUBSCRIBE:
            suback, err := handler.HandleSubscribe(mqttReader)
            if err != nil {
                return err
            }
            _, err = conn.Write(suback.ToBytes())
            if err != nil {
                return err
            }
            // TODO "subscriptionToBroker" channelに新たにSUBSCRIPTIONを生成し書き込む
        case packet.PINGREQ:
            pingresp, err := handler.HandlePingreq(mqttReader)
            if err != nil {
                return err
            }
            _, err = conn.Write(pingresp.ToBytes())
            if err != nil {
                return err
            }
        case packet.DISCONNECT:
            return nil
        }
    }
}

後はTODOコメントにある通り、PUBLISHとSUBSCRIBEのハンドリングを実装していく。

PUBLISHのハンドリング

PUBLISHのハンドリングは簡単。publishToBrokerチャネルにクライアントから送信されてきたPUBLISHメッセージを書き込み、 Broker へ配送する。

                case packet.PUBLISH:
-                       err = handler.HandlePublish(mqttReader)
+                       publish, err := handler.HandlePublish(mqttReader)
                        if err != nil {
                                return err
                        }
+                       publishToBroker <- publish

SUBSCRIBEのハンドリング

SUBSCRIBEのハンドリングでは、Broker からPUBLISHメッセージを受け取るためのチャネルを読み取るgoroutineをもう1つ作る。このgoroutineは handleSub 関数を実行する。

                case packet.SUBSCRIBE:
                        suback, err := handler.HandleSubscribe(mqttReader)
                        if err != nil {
                            return err
                        }
                        _, err = conn.Write(suback.ToBytes())
                        if err != nil {
                                return err
                        }
+                       sub := make(chan *packet.Publish)
+                       subscriptionToBroker <- sub
+                       go handleSub(conn, sub)

sub というchannelを生成している。このchannelは2つのgoroutineに渡す。

  • 1: subscriptionToBroker channelに書き込むことで Broker goroutineに渡す。受け取り側では、型が chan *packet.Publish から chan<- *packet.Publish (つまり Subscription )になる
  • 2: handleSub 関数に引数で sub を渡す。 handleSub 関数では <-chan *packet.Publish として引数の型を定義しておく

これで Broker から配送されてくるメッセージを handleSub goroutineで受け取れる。

channelの所有権の観点から見ると、 Broker goroutineが sub channelの所有者であり、 handleSub goroutineが sub channelの利用者である。生成したchannelは読み書きできるけど、それぞれのgoroutineでは書き込み専用、読み取り専用として型を定義しておくことでchannelの所有者が分かりやすくなる。また 書き込み専用channel( chan<- *packet.Publish )と読み取り専用channel( <-chan *packet.Publish )は型が違うので、誤ったコードを書いた場合にコンパイルエラーになってくれる。

handleSub 関数の実装は以下。

func handleSub(conn net.Conn, fromBroker <-chan *packet.Publish) {
    for publishMessage := range fromBroker {
        bs := publishMessage.ToBytes()
        _, err := conn.Write(bs)
        if err != nil {
            // FIXME
            fmt.Println(err)
        }
    }
}

Broker から配送されてくるメッセージを channelから読み込んで、 net.Conn に書き込む。 conn に書き込めなかった場合、例えばサブスクライバの接続が切れてる場合などのエラーハンドリングはあとで考える。

動かす

実際に動かす。自作サーバーを起動してから、mosquittoクライアントでサブスクライブする。

$ mosquitto_sub -i custom-client-id -t hoge

mosquittoクライアントでパブリッシュする。

$ mosquitto_pub -i sample-publisher -t hoge -m "Hello"

サブスクライバの方に "Hello" と表示された!

$ mosquitto_sub -i custom-client-id -t hoge
Hello

ついにパブリッシャからのメッセージをサブスクライバに配信できました!複数サブスクライバがいる場合も全てに配送されます。

Topicのフィルタを無視していますが、そのうち・・・。

おしまい

ここまでの実装は以下にあります(tagが0.0.2)。

github.com

server.goで sub というchannelを生成し Broker goroutineへ渡しました。 Broker goroutineはこのchannelの所有者ですが、channelを close していません。サブスクライバへ送信するために net.Conn に書き込みましたが、このときエラーが発生した場合は Broker goroutineでchannelを close したいです。

次回は、 handleSub goroutineで発生するエラーのハンドリングを実装してみたいと思います。

今回の学び

MQTTサーバーを実装しながらGoを学ぶ - その9 channelとselect

前回はgoroutineを使って、複数クライアントと同時に接続できるようにしました。今回は、複数のgoroutine間をchannelをつないでメッセージやりとりし、Publishされたメッセージを別のSubscribeしているクライアントに配送します。channelの所有権やselectを組み合わせてchannelをより便利に使えるようにします。

目次。

サーバーからクライアントへのPUBLISH

MQTTのPUBLISHの仕様を読み直す。

The Client uses a PUBLISH Packet to send an Application Message to the Server, for distribution to Clients with matching subscriptions. The Server uses a PUBLISH Packet to send an Application Message to each Client which has a matching subscription.

クライアントからPUBLISHパケットを受け取ったサーバーは、マッチするサブスクリプションのクライアントへメッセージを配布する。このとき、サーバーからクライアントへのPUBLISHパケットを送信する。

サブスクリプションとはなんだったというと、SUBSCRIBEの仕様を読み直す。

The SUBSCRIBE Packet is sent from the Client to the Server to create one or more Subscriptions.Each Subscription registers a Client’s interest in one or more Topics. The Server sends PUBLISH Packets to the Client in order to forward Application Messages that were published to Topics that match these Subscriptions.

SUBSCRIBEを送ってきたクライアントに対してサーバーが作成するものでTopicとサブスクライブしてるクライアントを結びつけている。

MQTTじゃないけど、GCPの Cloud PubSub の図が分かりやすいので拝借。

https://cloud.google.com/pubsub/images/many-to-many.svg

引用元: https://cloud.google.com/pubsub/docs/overview

いつもの以下の図だとサブスクリプションは登場していないけど、上記の通り接続中のサブスクライバがどのTopicのメッセージを購読しているのかはサブスクリプションという概念で管理している。

https://s3.amazonaws.com/www.appcelerator.com.images/MQTT_1.png

引用元: https://www.appcelerator.com/blog/2018/03/api-builder-and-mqtt-for-iot-part-1/

というわけで、Publishされたメッセージをサブスクリプションへ配布する処理を実装していく。

channel

今のところサーバーの実装では、クライアントごとにgoroutineを生成している。goroutine間でデータをやりとりするには、channelが使える。

Broker という関数を作り、パブリッシャから読み込んだメッセージをサブスクライバに書き込む。パブリッシャを <-chan []byte と表現して、サブスクライバを chan<- []byte と表現する。channelからの値の取り出しには range を使う。 range はchannelが閉じられるまで値を取り出し続ける(もしくは値が来るまでブロックして待つ)。

func Broker(fromPub <-chan []byte, toSub chan<- []byte) {
    defer close(toSub)
    for message := range fromPub {
        toSub <- message
    }
}

channelは読み取り専用もしくは書き込み専用にすることができる。これを利用してchannelの所有権を持つgoroutineを決めよう、ということが 「Go言語による並行処理」に書かれている。

チャネルの所有権を割り振ることです。ここでは所有権を、チャネルを初期化し、書き込み、閉じるゴルーチンとして定義します。

channelの所有権を持つgoroutineと、channelを利用するだけのgoroutineに分けて考える。

どのゴルーチンがチャネルを所有しているかをはっきりさせることが重要です。単方向チャネルを宣言することはチャネルを所有するゴルーチンとチャネルを利用するだけのゴルーチンを区別できるようにするための道具です。チャネルを所有しているゴルーチンにはチャネルに対する書き込み権限(chanまたはchan<-)があり、チャネルを利用するだけのゴルーチンには読み込み専用権限(<-chan)しかありません。

今の実装では、 Broker 関数は toSub に書き込むので所有権を持つことになる。所有権があるので、 deferclose するようにしてる。

所有権を意識すると何が嬉しいのか?channelにはpanicを発生させる操作がある。

  • nilに対してclose
  • closeしたchannelをclose

この2つは、channelの所有者だけが書き込みとcloseを行うことで防ぎやすくなる。

channelの利用者は読み取り時にchannelが閉じられている可能性があるので、 <- で読み取る時は2つ目の返り値の値を確認するか、 range で読み込むようにする。 <- で値を取り出した時の2つ目の返り値はboolで、channelが閉じられてる場合はfalseになる。

また、chanelが閉じられておらず、値もない場合は、読取り時にブロックすることを考慮する。

channelの所有者を意識しながら、 Broker を使うコードを書いてみる。

package mqtt_test
  
import (
    "testing"

    "github.com/bati11/oreno-mqtt/mqtt"
)

func TestBroker(t *testing.T) {
    pub := make(chan []byte)
    sub := make(chan []byte)

    // "broker" goroutine
    go mqtt.Broker(pub, sub)

    // "pub" goroutine
    go func() {
        defer close(pub)
        pub <- []byte("hoge") // クライアントからのPUBLISH
    }()

    // "sub"
    result, ok := <-sub // BrokerからPUBLISHメッセージが配送されてくる
    if !ok {
        t.Fatalf("got %v, want true", ok)
    }
    if string(result) != "hoge" {
        t.Fatalf("got %v, want \"hoge\"", string(result))
    }
}

"broker" goroutineは sub channelに対して書き込みと close を行うので sub channelの所有者であり、 pub channelの利用者である。

"pub" goroutineは pub channelの所有者である。

"sub"以降の処理(main goroutine)は、 sub channelの利用者である。値を読み込むのは1回だけなので range ではなく <- で読み取りをしている。

channelは値

Broker を実行するときに sub channelを渡している。 sub channel、つまり Broker が引数で受け取ってる chan<- []byte は、メッセージの書き込み先なのでMQTTのサブスクリプションに該当する。

しかし、サブスクリプションは、この記事の最初の方で見た通り、サーバーがクライアントからSUBSCRIBEパケットを受け取ったタイミングで作られる。なので Broker goroutine生成時ではなく、後からgoroutineに chan<- []byte を渡したい。

Broker の引数を chan<- []byte のchannel、つまり <-chan chan<- []byte とする。channelはファーストクラスな値なので、channelのchannelや、channelの配列というようなこともできる。

channelのchannelを使うとgoroutine起動後に chan<- []byte を受け取ることができる。

func Broker(fromPub <-chan []byte, subscriptions <-chan chan<- []byte) {
    for sub := range subscriptions {
        ...

Publishされたメッセージを全てのサブスクリプションに配送しないといけないので、サブスクリプションを管理しておく必要がある。新たなサブスクリプションchan<- []byte )をchannelから取得したら、channelの配列に追加するようにする。これはサブスクリプションの配列を表す。

func Broker(fromPub <-chan []byte, subscriptions <-chan chan<- []byte) {
    // サブスクリプションの配列
    var ss []chan<- []byte

    for sub := range subscriptions {
        // channelからサブスクリプション(chan<- []byte)を読み取ったら配列に追加
        ss = apend(ss, sub)
    }
    ...
}

fromPub channelからメッセージを読み取ったら、サブスクリプションの配列を range を使って順にメッセージを送る。以下のようなイメージ。channelに対する range と配列に対する range がごちゃ混ぜにならないように注意。

func Broker(fromPub <-chan []byte, subscriptions <-chan chan<- []byte) {
    // サブスクリプションの配列
    var ss []chan<- []byte

    for sub := range subscriptions {
        // channelからサブスクリプション(chan<- []byte)を読み取ったら配列に追加
        ss = apend(ss, sub)
    }

    for message := range fromPub {
        // fromPub channelからメッセージを読み取ったら全てのサブスクリプションへ配送
        for _, sub := range ss {
            sub <- message
        }
    }
}

しかし、これだと subscriptions channelが閉じられるまで1つ目の for が終わらないのでダメ。

select

複数のチャネルから読み取りたいときは、select を使う。

func Broker(fromPub <-chan []byte, subscriptions <-chan chan<- []byte) {
    // サブスクリプションの配列
    var ss []chan<- []byte
    for {
        select {
        case sub := <- subscriptions:
            // channelからサブスクリプション(chan<- []byte)を読み取ったら配列に追加
            ss = append(ss, sub)
        case message := <- fromPub:
            // fromPub channelからメッセージを読み取ったら全てのサブスクリプションへ配送
            for _, sub := range ss {
                sub <- message
            }
        }
    }
}

TestBroker を書き換えてみる。

func TestBroker(t *testing.T) {
    pub := make(chan []byte)
    subscriptions := make(chan chan<- []byte)
    defer close(subscriptions)

    // "broker" goroutine
    go mqtt.Broker(pub, subscriptions)

    // クライアントからのSUBSCRIBE
    sub := make(chan []byte)
    subscriptions <- sub // 新たなサブスクリプションをBrokerに送る

    // "pub" goroutine
    go func() {
        defer close(pub)
        pub <- []byte("hoge") // クライアントからのPUBLISH
    }()

    // "sub"
    result, ok := <-sub // BrokerからPUBLISHメッセージが配送されてくる
    if !ok {
        t.Fatalf("got %v, want true", ok)
    }
    if string(result) != "hoge" {
        t.Fatalf("got %v, want \"hoge\"", string(result))
    }
}

これでOK!複数クライアントがSubscribeしても問題ないことを確認。

func TestBroker(t *testing.T) {
    pub := make(chan []byte)
    subscriptions := make(chan chan<- []byte)
    defer close(subscriptions)

    // "broker" goroutine
    go mqtt.Broker(pub, subscriptions)

    // クライアントからのSUBSCRIBE(1つ目)
    sub1 := make(chan []byte)
    subscriptions <- sub1
    // クライアントからのSUBSCRIBE(2つ目)
    sub2 := make(chan []byte)
    subscriptions <- sub2

    // "pub" goroutine
    go func() {
        defer close(pub)
        pub <- []byte("hoge") // クライアントからのPUBLISH
    }()

    // "sub"
    result1, ok := <-sub1 // 1つ目のサブスクリプションに対してBrokerからPUBLISHメッセージが配送されてくる
    if !ok {
        t.Fatalf("got %v, want true", ok)
    }
    if string(result1) != "hoge" {
        t.Fatalf("got %v, want \"hoge\"", string(result1))
    }
    result2, ok := <-sub2 // 2つ目のサブスクリプションに対してもBrokerからPUBLISHメッセージが配送されてくる
    if !ok {
        t.Fatalf("got %v, want true", ok)
    }
    if string(result2) != "hoge" {
        t.Fatalf("got %v, want \"hoge\"", string(result2))
    }
}

sync.WaitGroup

ところで、"sub"以降の処理で、 sub1 , sub2 という順番にchannelから読み込んでるが、 Broker 内でサブスクリプションchan<- []byte )にメッセージを流す順番に依存してる。仮に sub2 , sub1 と順番を逆にして読み取るとブロックしてテストが終了しない。

sub1sub2select で読み取るようにしても良いけど、それぞれのchannelから1回ずつしかメッセージを読まないので、それぞれgoroutineで実行することにする。

sub1 , sub2 からの読み取りをそれぞれ別goroutineで実行するように変更するだけだと、subからの読み取りが並行で行われるためgoroutineが切り替わらずに TestBroker 自体がすぐに終わってしまい、subからの読み込みのテストができない。そこで、 sync.WaitGroup を使ってそれぞれのgoroutineの終了を待つ。

func TestBroker(t *testing.T) {
    pub := make(chan []byte)
    subscriptions := make(chan chan<- []byte)
    defer close(subscriptions)

    // "broker" goroutine
    go mqtt.Broker(pub, subscriptions)

    // クライアントからのSUBSCRIBE(1つ目)
    sub1 := make(chan []byte)
    subscriptions <- sub1
    // クライアントからのSUBSCRIBE(2つ目)
    sub2 := make(chan []byte)
    subscriptions <- sub2

    // "pub" goroutine
    go func() {
        defer close(pub)
        pub <- []byte("hoge") // クライアントからのPUBLISH
    }()

    // "sub"
    wg := sync.WaitGroup{}
    wg.Add(1)
    go func() {
        defer wg.Done()
        result1, ok := <-sub1 // 1つ目のサブスクリプションに対してBrokerからPUBLISHメッセージが配送されてくる
        if !ok {
            t.Fatalf("got %v, want true", ok)
        }
        if string(result1) != "hoge" {
            t.Fatalf("got %v, want \"hoge\"", string(result1))
        }
    }()

    wg.Add(1)
    go func() {
        defer wg.Done()
        result2, ok := <-sub2 // 2つ目のサブスクリプションに対してもBrokerからPUBLISHメッセージが配送されてくる
        if !ok {
            t.Fatalf("got %v, want true", ok)
        }
        if string(result2) != "hoge" {
            t.Fatalf("got %v, want \"hoge\"", string(result2))
        }
    }()

    wg.Wait()
}

これで1つのメッセージを複数のサブスクリプションに配送する処理ができた。

おしまい

MQTTブローカーをgoroutine, channel, selectを使って実装してみました。この3つを使って色々なパターンを実装できそうです。しかも、GoのランタイムがgoroutineをOSのスレッドにいい感じに割り当ててくれます。

でも、どっかのgoroutineでエラーが起きた時はどうすればいいの?というところを次回はやっていきたいなぁ。

今回の学び