MQTTサーバーを実装しながらGoを学ぶ - その9 channelとselect

前回はgoroutineを使って、複数クライアントと同時に接続できるようにしました。今回は、複数のgoroutine間をchannelをつないでメッセージやりとりし、Publishされたメッセージを別のSubscribeしているクライアントに配送します。channelの所有権やselectを組み合わせてchannelをより便利に使えるようにします。

目次。

サーバーからクライアントへのPUBLISH

MQTTのPUBLISHの仕様を読み直す。

The Client uses a PUBLISH Packet to send an Application Message to the Server, for distribution to Clients with matching subscriptions. The Server uses a PUBLISH Packet to send an Application Message to each Client which has a matching subscription.

クライアントからPUBLISHパケットを受け取ったサーバーは、マッチするサブスクリプションのクライアントへメッセージを配布する。このとき、サーバーからクライアントへのPUBLISHパケットを送信する。

サブスクリプションとはなんだったというと、SUBSCRIBEの仕様を読み直す。

The SUBSCRIBE Packet is sent from the Client to the Server to create one or more Subscriptions.Each Subscription registers a Client’s interest in one or more Topics. The Server sends PUBLISH Packets to the Client in order to forward Application Messages that were published to Topics that match these Subscriptions.

SUBSCRIBEを送ってきたクライアントに対してサーバーが作成するものでTopicとサブスクライブしてるクライアントを結びつけている。

MQTTじゃないけど、GCPの Cloud PubSub の図が分かりやすいので拝借。

https://cloud.google.com/pubsub/images/many-to-many.svg

引用元: https://cloud.google.com/pubsub/docs/overview

いつもの以下の図だとサブスクリプションは登場していないけど、上記の通り接続中のサブスクライバがどのTopicのメッセージを購読しているのかはサブスクリプションという概念で管理している。

https://s3.amazonaws.com/www.appcelerator.com.images/MQTT_1.png

引用元: https://www.appcelerator.com/blog/2018/03/api-builder-and-mqtt-for-iot-part-1/

というわけで、Publishされたメッセージをサブスクリプションへ配布する処理を実装していく。

channel

今のところサーバーの実装では、クライアントごとにgoroutineを生成している。goroutine間でデータをやりとりするには、channelが使える。

Broker という関数を作り、パブリッシャから読み込んだメッセージをサブスクライバに書き込む。パブリッシャを <-chan []byte と表現して、サブスクライバを chan<- []byte と表現する。channelからの値の取り出しには range を使う。 range はchannelが閉じられるまで値を取り出し続ける(もしくは値が来るまでブロックして待つ)。

func Broker(fromPub <-chan []byte, toSub chan<- []byte) {
    defer close(toSub)
    for message := range fromPub {
        toSub <- message
    }
}

channelは読み取り専用もしくは書き込み専用にすることができる。これを利用してchannelの所有権を持つgoroutineを決めよう、ということが 「Go言語による並行処理」に書かれている。

チャネルの所有権を割り振ることです。ここでは所有権を、チャネルを初期化し、書き込み、閉じるゴルーチンとして定義します。

channelの所有権を持つgoroutineと、channelを利用するだけのgoroutineに分けて考える。

どのゴルーチンがチャネルを所有しているかをはっきりさせることが重要です。単方向チャネルを宣言することはチャネルを所有するゴルーチンとチャネルを利用するだけのゴルーチンを区別できるようにするための道具です。チャネルを所有しているゴルーチンにはチャネルに対する書き込み権限(chanまたはchan<-)があり、チャネルを利用するだけのゴルーチンには読み込み専用権限(<-chan)しかありません。

今の実装では、 Broker 関数は toSub に書き込むので所有権を持つことになる。所有権があるので、 deferclose するようにしてる。

所有権を意識すると何が嬉しいのか?channelにはpanicを発生させる操作がある。

  • nilに対してclose
  • closeしたchannelをclose

この2つは、channelの所有者だけが書き込みとcloseを行うことで防ぎやすくなる。

channelの利用者は読み取り時にchannelが閉じられている可能性があるので、 <- で読み取る時は2つ目の返り値の値を確認するか、 range で読み込むようにする。 <- で値を取り出した時の2つ目の返り値はboolで、channelが閉じられてる場合はfalseになる。

また、chanelが閉じられておらず、値もない場合は、読取り時にブロックすることを考慮する。

channelの所有者を意識しながら、 Broker を使うコードを書いてみる。

package mqtt_test
  
import (
    "testing"

    "github.com/bati11/oreno-mqtt/mqtt"
)

func TestBroker(t *testing.T) {
    pub := make(chan []byte)
    sub := make(chan []byte)

    // "broker" goroutine
    go mqtt.Broker(pub, sub)

    // "pub" goroutine
    go func() {
        defer close(pub)
        pub <- []byte("hoge") // クライアントからのPUBLISH
    }()

    // "sub"
    result, ok := <-sub // BrokerからPUBLISHメッセージが配送されてくる
    if !ok {
        t.Fatalf("got %v, want true", ok)
    }
    if string(result) != "hoge" {
        t.Fatalf("got %v, want \"hoge\"", string(result))
    }
}

"broker" goroutineは sub channelに対して書き込みと close を行うので sub channelの所有者であり、 pub channelの利用者である。

"pub" goroutineは pub channelの所有者である。

"sub"以降の処理(main goroutine)は、 sub channelの利用者である。値を読み込むのは1回だけなので range ではなく <- で読み取りをしている。

channelは値

Broker を実行するときに sub channelを渡している。 sub channel、つまり Broker が引数で受け取ってる chan<- []byte は、メッセージの書き込み先なのでMQTTのサブスクリプションに該当する。

しかし、サブスクリプションは、この記事の最初の方で見た通り、サーバーがクライアントからSUBSCRIBEパケットを受け取ったタイミングで作られる。なので Broker goroutine生成時ではなく、後からgoroutineに chan<- []byte を渡したい。

Broker の引数を chan<- []byte のchannel、つまり <-chan chan<- []byte とする。channelはファーストクラスな値なので、channelのchannelや、channelの配列というようなこともできる。

channelのchannelを使うとgoroutine起動後に chan<- []byte を受け取ることができる。

func Broker(fromPub <-chan []byte, subscriptions <-chan chan<- []byte) {
    for sub := range subscriptions {
        ...

Publishされたメッセージを全てのサブスクリプションに配送しないといけないので、サブスクリプションを管理しておく必要がある。新たなサブスクリプションchan<- []byte )をchannelから取得したら、channelの配列に追加するようにする。これはサブスクリプションの配列を表す。

func Broker(fromPub <-chan []byte, subscriptions <-chan chan<- []byte) {
    // サブスクリプションの配列
    var ss []chan<- []byte

    for sub := range subscriptions {
        // channelからサブスクリプション(chan<- []byte)を読み取ったら配列に追加
        ss = apend(ss, sub)
    }
    ...
}

fromPub channelからメッセージを読み取ったら、サブスクリプションの配列を range を使って順にメッセージを送る。以下のようなイメージ。channelに対する range と配列に対する range がごちゃ混ぜにならないように注意。

func Broker(fromPub <-chan []byte, subscriptions <-chan chan<- []byte) {
    // サブスクリプションの配列
    var ss []chan<- []byte

    for sub := range subscriptions {
        // channelからサブスクリプション(chan<- []byte)を読み取ったら配列に追加
        ss = apend(ss, sub)
    }

    for message := range fromPub {
        // fromPub channelからメッセージを読み取ったら全てのサブスクリプションへ配送
        for _, sub := range ss {
            sub <- message
        }
    }
}

しかし、これだと subscriptions channelが閉じられるまで1つ目の for が終わらないのでダメ。

select

複数のチャネルから読み取りたいときは、select を使う。

func Broker(fromPub <-chan []byte, subscriptions <-chan chan<- []byte) {
    // サブスクリプションの配列
    var ss []chan<- []byte
    for {
        select {
        case sub := <- subscriptions:
            // channelからサブスクリプション(chan<- []byte)を読み取ったら配列に追加
            ss = append(ss, sub)
        case message := <- fromPub:
            // fromPub channelからメッセージを読み取ったら全てのサブスクリプションへ配送
            for _, sub := range ss {
                sub <- message
            }
        }
    }
}

TestBroker を書き換えてみる。

func TestBroker(t *testing.T) {
    pub := make(chan []byte)
    subscriptions := make(chan chan<- []byte)
    defer close(subscriptions)

    // "broker" goroutine
    go mqtt.Broker(pub, subscriptions)

    // クライアントからのSUBSCRIBE
    sub := make(chan []byte)
    subscriptions <- sub // 新たなサブスクリプションをBrokerに送る

    // "pub" goroutine
    go func() {
        defer close(pub)
        pub <- []byte("hoge") // クライアントからのPUBLISH
    }()

    // "sub"
    result, ok := <-sub // BrokerからPUBLISHメッセージが配送されてくる
    if !ok {
        t.Fatalf("got %v, want true", ok)
    }
    if string(result) != "hoge" {
        t.Fatalf("got %v, want \"hoge\"", string(result))
    }
}

これでOK!複数クライアントがSubscribeしても問題ないことを確認。

func TestBroker(t *testing.T) {
    pub := make(chan []byte)
    subscriptions := make(chan chan<- []byte)
    defer close(subscriptions)

    // "broker" goroutine
    go mqtt.Broker(pub, subscriptions)

    // クライアントからのSUBSCRIBE(1つ目)
    sub1 := make(chan []byte)
    subscriptions <- sub1
    // クライアントからのSUBSCRIBE(2つ目)
    sub2 := make(chan []byte)
    subscriptions <- sub2

    // "pub" goroutine
    go func() {
        defer close(pub)
        pub <- []byte("hoge") // クライアントからのPUBLISH
    }()

    // "sub"
    result1, ok := <-sub1 // 1つ目のサブスクリプションに対してBrokerからPUBLISHメッセージが配送されてくる
    if !ok {
        t.Fatalf("got %v, want true", ok)
    }
    if string(result1) != "hoge" {
        t.Fatalf("got %v, want \"hoge\"", string(result1))
    }
    result2, ok := <-sub2 // 2つ目のサブスクリプションに対してもBrokerからPUBLISHメッセージが配送されてくる
    if !ok {
        t.Fatalf("got %v, want true", ok)
    }
    if string(result2) != "hoge" {
        t.Fatalf("got %v, want \"hoge\"", string(result2))
    }
}

sync.WaitGroup

ところで、"sub"以降の処理で、 sub1 , sub2 という順番にchannelから読み込んでるが、 Broker 内でサブスクリプションchan<- []byte )にメッセージを流す順番に依存してる。仮に sub2 , sub1 と順番を逆にして読み取るとブロックしてテストが終了しない。

sub1sub2select で読み取るようにしても良いけど、それぞれのchannelから1回ずつしかメッセージを読まないので、それぞれgoroutineで実行することにする。

sub1 , sub2 からの読み取りをそれぞれ別goroutineで実行するように変更するだけだと、subからの読み取りが並行で行われるためgoroutineが切り替わらずに TestBroker 自体がすぐに終わってしまい、subからの読み込みのテストができない。そこで、 sync.WaitGroup を使ってそれぞれのgoroutineの終了を待つ。

func TestBroker(t *testing.T) {
    pub := make(chan []byte)
    subscriptions := make(chan chan<- []byte)
    defer close(subscriptions)

    // "broker" goroutine
    go mqtt.Broker(pub, subscriptions)

    // クライアントからのSUBSCRIBE(1つ目)
    sub1 := make(chan []byte)
    subscriptions <- sub1
    // クライアントからのSUBSCRIBE(2つ目)
    sub2 := make(chan []byte)
    subscriptions <- sub2

    // "pub" goroutine
    go func() {
        defer close(pub)
        pub <- []byte("hoge") // クライアントからのPUBLISH
    }()

    // "sub"
    wg := sync.WaitGroup{}
    wg.Add(1)
    go func() {
        defer wg.Done()
        result1, ok := <-sub1 // 1つ目のサブスクリプションに対してBrokerからPUBLISHメッセージが配送されてくる
        if !ok {
            t.Fatalf("got %v, want true", ok)
        }
        if string(result1) != "hoge" {
            t.Fatalf("got %v, want \"hoge\"", string(result1))
        }
    }()

    wg.Add(1)
    go func() {
        defer wg.Done()
        result2, ok := <-sub2 // 2つ目のサブスクリプションに対してもBrokerからPUBLISHメッセージが配送されてくる
        if !ok {
            t.Fatalf("got %v, want true", ok)
        }
        if string(result2) != "hoge" {
            t.Fatalf("got %v, want \"hoge\"", string(result2))
        }
    }()

    wg.Wait()
}

これで1つのメッセージを複数のサブスクリプションに配送する処理ができた。

おしまい

MQTTブローカーをgoroutine, channel, selectを使って実装してみました。この3つを使って色々なパターンを実装できそうです。しかも、GoのランタイムがgoroutineをOSのスレッドにいい感じに割り当ててくれます。

でも、どっかのgoroutineでエラーが起きた時はどうすればいいの?というところを次回はやっていきたいなぁ。

今回の学び