MQTTサーバーを実装しながらGoを学ぶ - その11 goroutineのエラーハンドリング, map, goroutineセーフ

前回の続き。別のgoroutineで発生したエラーをerror channelを使ってハンドリングしてみたいと思います。ハンドリングの処理でmapを使ったのですが、goroutineセーフにするため sync.Map を使ってみました。

目次。

handleSub でエラー

handleSub 関数でサブスクライバに対してPUBLISHを送っている。

func handleSub(conn net.Conn, fromBroker <-chan *packet.Publish) {
    for publishMessage := range fromBroker {
        bs := publishMessage.ToBytes()
        _, err := conn.Write(bs)
        if err != nil {
            // FIXME
            fmt.Println(err)
        }
    }
}

conn.Write でエラーだった場合のことを考える。例えば、mosquitto_subでサブスクライブした後、すぐにCtrl-Cで切断しておく。その後にmosquitto_pubでパブリッシュすると conn.Writeerr を返し、サーバーが以下を出力する。

write tcp 127.0.0.1:1883->127.0.0.1:59452: use of closed network connection

Broker がサブスクライバが切断したことに気がつかず、サブスクリプションを削除せずにPUBLISHされたメッセージを配送し続けてしまう。 handleSub goroutineで発生したエラーを Broker goroutineに伝えて、 Broker が不要なサブスクリプションを削除する方法を考える。

別goroutineを受け取るerror channel

PUBLISHメッセージを配送する Broker -> handlerSub という方向のchannelとは別に、 errorhandleSub -> Broker という方向に流すchannelを用意すれば良い。

現在の Broker は以下。

// broker.go

func Broker(fromPub <-chan *packet.Publish, subscriptions <-chan *Subscription, doneSubscriptions <-chan *DoneSubscription) {
    sMap := newSubscriptionMap()
    for {
        select {
        case sub := <-subscriptions:
            sMap.put(sub.clientID, sub)
        case message := <-fromPub:
            sMap.apply(func(sub *Subscription) {
                sub.pubToSub <- message
            })
        }
    }
}

以下のように selectdoneSubscriptions channelも読むようにする。

func Broker(fromPub <-chan *packet.Publish, subscriptions <-chan *Subscription, doneSubscriptions <-chan error) {
    sMap := newSubscriptionMap()
    for {
        select {
        case sub := <-subscriptions:
            sMap.put(sub.clientID, sub)
        case message := <-fromPub:
            sMap.apply(func(sub *Subscription) {
                sub.pubToSub <- message
            })
+       case err := <-doneSubscriptions:
+           fmt.Println(err)
+       }
    }
}

doneSubscriptions channelを使って handleSub goroutine から Broker goroutineに error を流す。

server.goでは3つのchannelを生成することになる。

func Run() {
    ln, err := net.Listen("tcp", "localhost:1883")
    if err != nil {
        panic(err)
    }
    defer ln.Close()

    pub := make(chan *packet.Publish)
    defer close(pub)
    subscriptions := make(chan *Subscription)
    defer close(subscriptions)
+   doneSubscriptions := make(chan *DoneSubscriptionResult)
+   defer close(doneOfSubscription)

    go Broker(pub, subscriptions, doneSubscriptions)

    for {
        conn, err := ln.Accept()
        if err != nil {
            panic(err)
        }

        go func() {
-          err = handle(conn, pub, subscriptions)
+           err = handle(conn, pub, subscriptions, doneSubscriptions)
            if err != nil {
                panic(err)
            }
        }()
    }
}

doneSubscriptions channelが 「Run 関数のgoroutine」 -> 「handle関数を実行するgoroutine」 -> 「handleSub 関数のgoroutine」と渡されていくことになる。

つまり書き込み可能なchannelが孫goroutineまで渡されることになる。channelの所有者がはっきりしなくなってきた...。

書き込み可能なchannelを関数に渡すのではなく、関数内でchannelとgoroutineを生成し、読み取り専用channelを返すようにする。

具体的には、 handleSub 関数を以下のようにする。

// server.go

func handleSub(conn net.Conn, fromBroker <-chan *packet.Publish) <-chan error {
    errCh := make(chan error)
    go func() {
        defer close(errCh)
        for publishMessage := range fromBroker {
            bs := publishMessage.ToBytes()
            _, err := conn.Write(bs)
            if err != nil {
                errCh <- err
            }
        }
    }()
    return errCh
}

errCh channelのライフサイクルが handleSubscription 関数内に閉じている。関数内でchannelを生成、関数内でgoroutineを起動し生成したchanelの所有権を移譲する(goroutine内で書き込みと close をする)。そして関数の返り値は読み取り専用channelとする。

このように1つの関数内にchannelのライフサイクルを閉じ込める書き方が「Go言語による並行処理パターン」では頻出してる。

この handleSub 関数の書き換えによって、 Run 関数で生成した doneSubscriptions channelを孫goroutine( handleSub goroutine)まで渡さなくて良くなる。が、 handleSub 関数から返ってきた読み取り専用channelの読み込みでブロックするため新しくgoroutineを作る必要がある。

       case packet.SUBSCRIBE:
            suback, err := handler.HandleSubscribe(mqttReader)
            if err != nil {
                return err
            }
            _, err = conn.Write(suback.ToBytes())
                        if err != nil {
                                return err
                        }
-                       sub := make(chan *packet.Publish)
-                       subscriptionToBroker <- sub
-                       go handleSub(conn, sub)
+                       subscription, errCh := handleSub(clientID, conn)
+                       subscriptionToBroker <- subscription
+                       go func() {
+                               err, ok := <-errCh
+                               if !ok {
+                                       return
+                               }
+                               doneSubscriptions <- err
+                       }()

サブスクリプションの特定

MQTTのClient ID

これで errorhandleSub goroutineから Broker goroutineへ渡すことができそう。 error を受け取った Broker はどのSubscriptionを閉じるかをどうやって決めれば良いだろう?

今まで無視してたMQTTのClient IDを使うことにする。Client IDはCONNECT時にペイロードに含まれていた。

CONNECTパケットのハンドリング時に、Client IDを変数にセットしておく。

func handle(conn net.Conn, publishToBroker chan<- *packet.Publish, subscriptionToBroker chan<- Subscription) error {
        defer conn.Close()
 
+       var clientID string
+
        for {
                r := bufio.NewReader(conn)
                mqttReader := packet.NewMQTTReader(r)
                case packet.CONNECT:
-                       connack, err := handler.HandleConnect(mqttReader)
+                       connect, connack, err := handler.HandleConnect(mqttReader)
                        if err != nil {
                                return err
                        }
                        _, err = conn.Write(connack.ToBytes())
                        if err != nil {
                                return err
                        }
+                       clientID = connect.Payload.ClientID
                case packet.SUBSCRIBE:
                        suback, err := handler.HandleSubscribe(mqttReader)
                        if err != nil {

現在の実装だと以下のように読み取り専用channelを Subscription としている。

type Subscription chan<- *packet.Publish

これをstructにしてClient IDを持たせるようにする。さらに、 handleSub goroutineから Broker goroutineへ伝える error もsturctで包んでClient IDを関連づける。

// broker.go

type Subscription struct {
    clientID string
    pubToSub chan<- *packet.Publish
}

func NewSubscription(clientID string) (*Subscription, <-chan *packet.Publish) {
    pub := make(chan *packet.Publish)
    s := &Subscription{
        clientID: clientID,
        pubToSub: pub,
    }
    return s, pub
}

type DoneSubscriptionResult struct {
    clientID string
    err      error
}

func NewDoneSubscriptionResult(clientID string, err error) *DoneSubscriptionResult {
    return &DoneSubscriptionResult{clientID, err}
}

func Broker(fromPub <-chan *packet.Publish, subscriptions <-chan *Subscription, doneSubscriptions <-chan *DoneSubscriptionResult) {
    ...
}

map

現在の Broker の実装では Subscription を配列で管理してる。

func Broker(fromPub <-chan *packet.Publish, subscriptions <-chan *Subscription, doneSubscriptions <-chan *DoneSubscriptionResult) {
    // サブスクリプションの配列
    var ss []Subscription
    for {
        select {
        case sub := <- subscriptions:
            // channelからサブスクリプションを読み取ったら配列に追加
            ss = append(ss, sub)
        case message := <- fromPub:
            // fromPub channelからメッセージを読み取ったら全てのサブスクリプションへ配送
            for _, sub := range ss {
                sub <- message
            }
        case err := <-doneSubscriptions:
            fmt.Println(err)
        }
    }
}

Subscription を配列で管理してるのをやめて、キーをClientIDとしたmapを使うことにする。

func Broker(fromPub <-chan *packet.Publish, subscriptions <-chan *Subscription, doneSubscriptions <-chan *DoneSubscriptionResult) {
    // サブスクリプションのmap
    var sMap map[string]*Subscription
    sMap = make(map[string]*Subscription)
    for {
        select {
        case sub := <-subscriptions:
            // channelからサブスクリプションを読み取ったらキーをclientIDとしてmapに追加
            sMap[sub.clientID] = sub
        case message := <-fromPub:
            for _, sub := range sMap {
                sub.pubToSub <- message
            }
        case err := <-doneSubscriptions:
            fmt.Println(err)
        }
    }
}

次は selectdoneSubscriptions channelから読み取った時の処理。 close とmapから削除する処理をすれば良い。

func Broker(fromPub <-chan *packet.Publish, subscriptions <-chan *Subscription, doneSubscriptions <-chan *DoneSubscriptionResult) {
    // サブスクリプションのmap
    var sMap map[string]*Subscription
    sMap = make(map[string]*Subscription)
    for {
        select {
        case sub := <-subscriptions:
            // channelからサブスクリプションを読み取ったらキーをclientIDとしてmapに追加
            sMap[sub.clientID] = sub
        case message := <-fromPub:
            for _, sub := range sMap {
                sub.pubToSub <- message
            }
        case done := <-doneSubscriptions:
            fmt.Printf("close subscription: %v\n", done.clientID)
            if done.err != nil {
                fmt.Println(done.err)
            }
            s, ok := sMap[sub.clientID]
            if ok {
                sub := s.(*Subscription)
                close(s.pubToSub)
                delete(sMap, done.clientID)
            }
        }
    }
}

sync.Mutex

複数goroutineで同じmapを操作しているのが気になる。mapは複数のgoroutineから触れるように設計されていない。スレッドセーフならぬgoroutineセーフではない。

sync.Mutexパッケージを使って同期をとりgoroutineセーフにしておく。

sync.Mutexを使うときは1つの構造体、というか型に閉じておくと良さそう。 typesubscriptionMap というstructを作る。

type subscriptionMap struct {
    mu    sync.Mutex
    value map[string]*Subscription
}

func newSubscriptionMap() *subscriptionMap {
    m := make(map[string]*Subscription)
    return &subscriptionMap{
        mu:    sync.Mutex{},
        value: m,
    }
}

func (m *subscriptionMap) get(clientID string) *Subscription {
    m.mu.Lock()
    s, ok := sMap[sub.clientID]
    m.mu.Unlock()
    if ok {
        return s.(*Subscription)
    }
    return nil
}

func (m *subscriptionMap) put(clientID string, s *Subscription) {
    m.mu.Lock()
    m.value[clientID] = s
    m.mu.Unlock()
}

func (m *subscriptionMap) delete(clientID string) {
    m.mu.Lock()
    delete(m.value, clientID)
    m.mu.Unlock()
}

と、ここまで書いて、mapに対する range はどうしたら良いのだろう?という疑問が...

sync.Map

ググってると sync.Map というのを発見!

Go Docを見ると sync.Map で扱う値はinterface{}であることが分かる。 type を使って自前のstructで包んだ方が良さそう。

結果的にbroker.goは以下のようになった。

type Subscription struct {
    clientID string
    pubToSub chan<- *packet.Publish
}

func NewSubscription(clientID string) (*Subscription, <-chan *packet.Publish) {
    pub := make(chan *packet.Publish)
    s := &Subscription{
        clientID: clientID,
        pubToSub: pub,
    }
    return s, pub
}

type DoneSubscriptionResult struct {
    clientID string
    err      error
}

func NewDoneSubscriptionResult(clientID string, err error) *DoneSubscriptionResult {
    return &DoneSubscriptionResult{clientID, err}
}

type subscriptionMap struct {
    syncMap sync.Map
}

func newSubscriptionMap() *subscriptionMap {
    return &subscriptionMap{}
}

func (m *subscriptionMap) get(clientID string) *Subscription {
    s, ok := m.syncMap.Load(clientID)
    if !ok {
        return nil
    }
    return s.(*Subscription)
}

func (m *subscriptionMap) put(clientID string, s *Subscription) {
    m.syncMap.Store(clientID, s)
}

func (m *subscriptionMap) delete(clientID string) {
    m.syncMap.Delete(clientID)
}

func (m *subscriptionMap) apply(f func(s *Subscription)) {
    m.syncMap.Range(func(k, v interface{}) bool {
        s := v.(*Subscription)
        f(s)
        return true
    })
}

func Broker(fromPub <-chan *packet.Publish, subscriptions <-chan *Subscription, doneSubscriptions <-chan *DoneSubscriptionResult) {
    // サブスクリプションのmap
    sMap := newSubscriptionMap()
    for {
        select {
        case sub := <-subscriptions:
            // channelからサブスクリプションを読み取ったらキーをclientIDとしてmapに追加
            sMap.put(sub.clientID, sub)
        case message := <-fromPub:
            // 全てのサブスクリプションにメッセージを配送
            sMap.apply(func(sub *Subscription) {
                sub.pubToSub <- message
            })
        case done := <-doneSubscriptions:
            fmt.Printf("close subscription: %v\n", done.clientID)
            if done.err != nil {
                fmt.Println(done.err)
            }
            sub := sMap.get(done.clientID)
            if sub != nil {
                close(sub.pubToSub)
                sMap.delete(done.clientID)
            }
        }
    }
}

動かす

動かしてみる。

mosquitto_subでサブスクライブした後、すぐにCtrl-Cで切断する。その後にmosquitto_pubでパブリッシュするとサーバーが以下を出力する。

write tcp 127.0.0.1:1883->127.0.0.1:59452: use of closed network connection

ここまでは最初と変わらないけど、もう一度パブリッシュするともう出力されない。これは1回目の Write でエラーが発生した時に、 Brokerサブスクリプションの削除ができたから。

これでhandleSubでエラーが発生した場合にBrokerでハンドリングする実装ができた!

おしまい

ただこれだとエラーが発生するまでサブスクリプションが削除されないし、 handleSub goroutineが残り続ける。本当はクライアントが切断したタイミングでサブスクリプションを削除したい。

次回はContextを使うことでこれを解決しようと思います。

今回の学び。