MQTTサーバーを実装しながらGoを学ぶ - その9 channelとselect

前回はgoroutineを使って、複数クライアントと同時に接続できるようにしました。今回は、複数のgoroutine間をchannelをつないでメッセージやりとりし、Publishされたメッセージを別のSubscribeしているクライアントに配送します。channelの所有権やselectを組み合わせてchannelをより便利に使えるようにします。

目次。

サーバーからクライアントへのPUBLISH

MQTTのPUBLISHの仕様を読み直す。

The Client uses a PUBLISH Packet to send an Application Message to the Server, for distribution to Clients with matching subscriptions. The Server uses a PUBLISH Packet to send an Application Message to each Client which has a matching subscription.

クライアントからPUBLISHパケットを受け取ったサーバーは、マッチするサブスクリプションのクライアントへメッセージを配布する。このとき、サーバーからクライアントへのPUBLISHパケットを送信する。

サブスクリプションとはなんだったというと、SUBSCRIBEの仕様を読み直す。

The SUBSCRIBE Packet is sent from the Client to the Server to create one or more Subscriptions.Each Subscription registers a Client’s interest in one or more Topics. The Server sends PUBLISH Packets to the Client in order to forward Application Messages that were published to Topics that match these Subscriptions.

SUBSCRIBEを送ってきたクライアントに対してサーバーが作成するものでTopicとサブスクライブしてるクライアントを結びつけている。

MQTTじゃないけど、GCPの Cloud PubSub の図が分かりやすいので拝借。

https://cloud.google.com/pubsub/images/many-to-many.svg

引用元: https://cloud.google.com/pubsub/docs/overview

いつもの以下の図だとサブスクリプションは登場していないけど、上記の通り接続中のサブスクライバがどのTopicのメッセージを購読しているのかはサブスクリプションという概念で管理している。

https://s3.amazonaws.com/www.appcelerator.com.images/MQTT_1.png

引用元: https://www.appcelerator.com/blog/2018/03/api-builder-and-mqtt-for-iot-part-1/

というわけで、Publishされたメッセージをサブスクリプションへ配布する処理を実装していく。

channel

今のところサーバーの実装では、クライアントごとにgoroutineを生成している。goroutine間でデータをやりとりするには、channelが使える。

Broker という関数を作り、パブリッシャから読み込んだメッセージをサブスクライバに書き込む。パブリッシャを <-chan []byte と表現して、サブスクライバを chan<- []byte と表現する。channelからの値の取り出しには range を使う。 range はchannelが閉じられるまで値を取り出し続ける(もしくは値が来るまでブロックして待つ)。

func Broker(fromPub <-chan []byte, toSub chan<- []byte) {
    defer close(toSub)
    for message := range fromPub {
        toSub <- message
    }
}

channelは読み取り専用もしくは書き込み専用にすることができる。これを利用してchannelの所有権を持つgoroutineを決めよう、ということが 「Go言語による並行処理」に書かれている。

チャネルの所有権を割り振ることです。ここでは所有権を、チャネルを初期化し、書き込み、閉じるゴルーチンとして定義します。

channelの所有権を持つgoroutineと、channelを利用するだけのgoroutineに分けて考える。

どのゴルーチンがチャネルを所有しているかをはっきりさせることが重要です。単方向チャネルを宣言することはチャネルを所有するゴルーチンとチャネルを利用するだけのゴルーチンを区別できるようにするための道具です。チャネルを所有しているゴルーチンにはチャネルに対する書き込み権限(chanまたはchan<-)があり、チャネルを利用するだけのゴルーチンには読み込み専用権限(<-chan)しかありません。

今の実装では、 Broker 関数は toSub に書き込むので所有権を持つことになる。所有権があるので、 deferclose するようにしてる。

所有権を意識すると何が嬉しいのか?channelにはpanicを発生させる操作がある。

  • nilに対してclose
  • closeしたchannelをclose

この2つは、channelの所有者だけが書き込みとcloseを行うことで防ぎやすくなる。

channelの利用者は読み取り時にchannelが閉じられている可能性があるので、 <- で読み取る時は2つ目の返り値の値を確認するか、 range で読み込むようにする。 <- で値を取り出した時の2つ目の返り値はboolで、channelが閉じられてる場合はfalseになる。

また、chanelが閉じられておらず、値もない場合は、読取り時にブロックすることを考慮する。

channelの所有者を意識しながら、 Broker を使うコードを書いてみる。

package mqtt_test
  
import (
    "testing"

    "github.com/bati11/oreno-mqtt/mqtt"
)

func TestBroker(t *testing.T) {
    pub := make(chan []byte)
    sub := make(chan []byte)

    // "broker" goroutine
    go mqtt.Broker(pub, sub)

    // "pub" goroutine
    go func() {
        defer close(pub)
        pub <- []byte("hoge") // クライアントからのPUBLISH
    }()

    // "sub"
    result, ok := <-sub // BrokerからPUBLISHメッセージが配送されてくる
    if !ok {
        t.Fatalf("got %v, want true", ok)
    }
    if string(result) != "hoge" {
        t.Fatalf("got %v, want \"hoge\"", string(result))
    }
}

"broker" goroutineは sub channelに対して書き込みと close を行うので sub channelの所有者であり、 pub channelの利用者である。

"pub" goroutineは pub channelの所有者である。

"sub"以降の処理(main goroutine)は、 sub channelの利用者である。値を読み込むのは1回だけなので range ではなく <- で読み取りをしている。

channelは値

Broker を実行するときに sub channelを渡している。 sub channel、つまり Broker が引数で受け取ってる chan<- []byte は、メッセージの書き込み先なのでMQTTのサブスクリプションに該当する。

しかし、サブスクリプションは、この記事の最初の方で見た通り、サーバーがクライアントからSUBSCRIBEパケットを受け取ったタイミングで作られる。なので Broker goroutine生成時ではなく、後からgoroutineに chan<- []byte を渡したい。

Broker の引数を chan<- []byte のchannel、つまり <-chan chan<- []byte とする。channelはファーストクラスな値なので、channelのchannelや、channelの配列というようなこともできる。

channelのchannelを使うとgoroutine起動後に chan<- []byte を受け取ることができる。

func Broker(fromPub <-chan []byte, subscriptions <-chan chan<- []byte) {
    for sub := range subscriptions {
        ...

Publishされたメッセージを全てのサブスクリプションに配送しないといけないので、サブスクリプションを管理しておく必要がある。新たなサブスクリプションchan<- []byte )をchannelから取得したら、channelの配列に追加するようにする。これはサブスクリプションの配列を表す。

func Broker(fromPub <-chan []byte, subscriptions <-chan chan<- []byte) {
    // サブスクリプションの配列
    var ss []chan<- []byte

    for sub := range subscriptions {
        // channelからサブスクリプション(chan<- []byte)を読み取ったら配列に追加
        ss = apend(ss, sub)
    }
    ...
}

fromPub channelからメッセージを読み取ったら、サブスクリプションの配列を range を使って順にメッセージを送る。以下のようなイメージ。channelに対する range と配列に対する range がごちゃ混ぜにならないように注意。

func Broker(fromPub <-chan []byte, subscriptions <-chan chan<- []byte) {
    // サブスクリプションの配列
    var ss []chan<- []byte

    for sub := range subscriptions {
        // channelからサブスクリプション(chan<- []byte)を読み取ったら配列に追加
        ss = apend(ss, sub)
    }

    for message := range fromPub {
        // fromPub channelからメッセージを読み取ったら全てのサブスクリプションへ配送
        for _, sub := range ss {
            sub <- message
        }
    }
}

しかし、これだと subscriptions channelが閉じられるまで1つ目の for が終わらないのでダメ。

select

複数のチャネルから読み取りたいときは、select を使う。

func Broker(fromPub <-chan []byte, subscriptions <-chan chan<- []byte) {
    // サブスクリプションの配列
    var ss []chan<- []byte
    for {
        select {
        case sub := <- subscriptions:
            // channelからサブスクリプション(chan<- []byte)を読み取ったら配列に追加
            ss = append(ss, sub)
        case message := <- fromPub:
            // fromPub channelからメッセージを読み取ったら全てのサブスクリプションへ配送
            for _, sub := range ss {
                sub <- message
            }
        }
    }
}

TestBroker を書き換えてみる。

func TestBroker(t *testing.T) {
    pub := make(chan []byte)
    subscriptions := make(chan chan<- []byte)
    defer close(subscriptions)

    // "broker" goroutine
    go mqtt.Broker(pub, subscriptions)

    // クライアントからのSUBSCRIBE
    sub := make(chan []byte)
    subscriptions <- sub // 新たなサブスクリプションをBrokerに送る

    // "pub" goroutine
    go func() {
        defer close(pub)
        pub <- []byte("hoge") // クライアントからのPUBLISH
    }()

    // "sub"
    result, ok := <-sub // BrokerからPUBLISHメッセージが配送されてくる
    if !ok {
        t.Fatalf("got %v, want true", ok)
    }
    if string(result) != "hoge" {
        t.Fatalf("got %v, want \"hoge\"", string(result))
    }
}

これでOK!複数クライアントがSubscribeしても問題ないことを確認。

func TestBroker(t *testing.T) {
    pub := make(chan []byte)
    subscriptions := make(chan chan<- []byte)
    defer close(subscriptions)

    // "broker" goroutine
    go mqtt.Broker(pub, subscriptions)

    // クライアントからのSUBSCRIBE(1つ目)
    sub1 := make(chan []byte)
    subscriptions <- sub1
    // クライアントからのSUBSCRIBE(2つ目)
    sub2 := make(chan []byte)
    subscriptions <- sub2

    // "pub" goroutine
    go func() {
        defer close(pub)
        pub <- []byte("hoge") // クライアントからのPUBLISH
    }()

    // "sub"
    result1, ok := <-sub1 // 1つ目のサブスクリプションに対してBrokerからPUBLISHメッセージが配送されてくる
    if !ok {
        t.Fatalf("got %v, want true", ok)
    }
    if string(result1) != "hoge" {
        t.Fatalf("got %v, want \"hoge\"", string(result1))
    }
    result2, ok := <-sub2 // 2つ目のサブスクリプションに対してもBrokerからPUBLISHメッセージが配送されてくる
    if !ok {
        t.Fatalf("got %v, want true", ok)
    }
    if string(result2) != "hoge" {
        t.Fatalf("got %v, want \"hoge\"", string(result2))
    }
}

sync.WaitGroup

ところで、"sub"以降の処理で、 sub1 , sub2 という順番にchannelから読み込んでるが、 Broker 内でサブスクリプションchan<- []byte )にメッセージを流す順番に依存してる。仮に sub2 , sub1 と順番を逆にして読み取るとブロックしてテストが終了しない。

sub1sub2select で読み取るようにしても良いけど、それぞれのchannelから1回ずつしかメッセージを読まないので、それぞれgoroutineで実行することにする。

sub1 , sub2 からの読み取りをそれぞれ別goroutineで実行するように変更するだけだと、subからの読み取りが並行で行われるためgoroutineが切り替わらずに TestBroker 自体がすぐに終わってしまい、subからの読み込みのテストができない。そこで、 sync.WaitGroup を使ってそれぞれのgoroutineの終了を待つ。

func TestBroker(t *testing.T) {
    pub := make(chan []byte)
    subscriptions := make(chan chan<- []byte)
    defer close(subscriptions)

    // "broker" goroutine
    go mqtt.Broker(pub, subscriptions)

    // クライアントからのSUBSCRIBE(1つ目)
    sub1 := make(chan []byte)
    subscriptions <- sub1
    // クライアントからのSUBSCRIBE(2つ目)
    sub2 := make(chan []byte)
    subscriptions <- sub2

    // "pub" goroutine
    go func() {
        defer close(pub)
        pub <- []byte("hoge") // クライアントからのPUBLISH
    }()

    // "sub"
    wg := sync.WaitGroup{}
    wg.Add(1)
    go func() {
        defer wg.Done()
        result1, ok := <-sub1 // 1つ目のサブスクリプションに対してBrokerからPUBLISHメッセージが配送されてくる
        if !ok {
            t.Fatalf("got %v, want true", ok)
        }
        if string(result1) != "hoge" {
            t.Fatalf("got %v, want \"hoge\"", string(result1))
        }
    }()

    wg.Add(1)
    go func() {
        defer wg.Done()
        result2, ok := <-sub2 // 2つ目のサブスクリプションに対してもBrokerからPUBLISHメッセージが配送されてくる
        if !ok {
            t.Fatalf("got %v, want true", ok)
        }
        if string(result2) != "hoge" {
            t.Fatalf("got %v, want \"hoge\"", string(result2))
        }
    }()

    wg.Wait()
}

これで1つのメッセージを複数のサブスクリプションに配送する処理ができた。

おしまい

MQTTブローカーをgoroutine, channel, selectを使って実装してみました。この3つを使って色々なパターンを実装できそうです。しかも、GoのランタイムがgoroutineをOSのスレッドにいい感じに割り当ててくれます。

でも、どっかのgoroutineでエラーが起きた時はどうすればいいの?というところを次回はやっていきたいなぁ。

今回の学び

MQTTサーバーを実装しながらGoを学ぶ - その8 goroutine, pprof

前回までで、クライアントがPUBLISH、またはSUBSCRIBEすることができるようになりました。次は、クライアントからPUBLISHされたメッセージを、SUBSCRIBEしている別のクライアントへ送る処理を実装していきます。が、その前にgoroutineを使ってサーバーが複数クライアントと同時に接続できるようにします。

目次。

複数クライアントからの接続

クライアントが接続した状態で、もう1つ別のクライアントから接続してみる。例えば、Subscribeしてるクライアントが存在している状態で、Publishしてみる。

$ mosquitto_pub -i sample-publisher -t hoge -m "Hello"
Error: Unknown error.

すぐには何も出力されず、しばらく放っておくと上記の Error: Unknown error. が出力される。まだサーバーが複数クライアントを受けられるように実装していないから、タイムアウトする。

具体的にコードで確認する。

// server.go
func Run() {
    ln, err := net.Listen("tcp", "localhost:1883")
    if err != nil {
        panic(err)
    }
    fmt.Println("server starts at localhost:1883")

    for {
        conn, err := ln.Accept()
        if err != nil {
            panic(err)
        }

        err = handle(conn)
        if err != nil {
            fmt.Printf("%v", err)
            panic(err)
        }
    }
}

func handle(conn net.Conn) error {
    defer conn.Close()

    for {
        r := bufio.NewReader(conn)
        ...
    }
}

最初のクライアントから接続が来ると、 Accept() が net.Conn を返してきてhandle内で処理する。handle内では for { } でループすることで、クライアントとやりとりする。

この時、2つめのクライアントが接続してきてもサーバーは、handle内の for { } でループしているので、 Accept() が処理されない。そのため、1つのクライアントしか捌けない。

並行処理を使って複数のクライアントを捌くようにすれば良い。Goではgoroutineを使って並行処理を実現する。

1つのクライアントを1つのgoroutineで取り扱うようにする。

--- a/mqtt/server.go
+++ b/mqtt/server.go
@@ -23,11 +23,13 @@ func Run() {
                        panic(err)
                }
 
-               err = handle(conn)
-               if err != nil {
-                       fmt.Printf("%v", err)
-                       panic(err)
-               }
+               go func(conn net.Conn) {
+                       err = handle(conn)
+                       if err != nil {
+                               fmt.Printf("%v", err)
+                               panic(err)
+                       }
+               }(conn)
        }
 }

goroutine

以前、Goでechoサーバーを書いたときにもgoroutineを使った。goroutineは並行処理を実現したいときに生成するが、このとき生成されるのはGoランタイム上のgoroutineでありOSのスレッドではない。

goroutineの特徴を ASCII.jp:Go言語と並列処理(2) から引用させていただく。

スレッドがCPUコアに対してマッピングされるのに対し、goroutineはOSのスレッド(Go製のアプリケーションから見ると1つの仮想CPU)にマッピングされます。 この点が、通常のスレッドとGo言語の軽量スレッドであるgoroutineとの最大の違いです。


両者にはほかにも次のような細かい違いがあります。

・OSスレッドはIDを持つが、goroutineは指定しなければ実際のどのスレッドにマッピングされるかは決まっておらず、IDなども持たない
・OSスレッドの1〜2MBと比べると、初期スタックメモリのサイズが小さく(2KB)、起動処理が軽い
・優先度を持たない
・タイムスライスで強制的に処理が切り替わることがないが、コンパイラが「ここで処理を切り替える」という切り替えポイントを埋め込むことで切り替えを実現している
・(IDで一意にgoroutineで特定できないため)外部から終了のリクエストを送る仕組みがない

Goのランタイムがどのようにgoroutineを切り替えているかは以下の記事が参考になります。

こちらの本の「6章 ゴルーチンとGoランタイム」にも丁寧に書かれていた。

pprof

goroutineがOSのスレッドと異なることは分かった。じゃあ、今goroutineがいくつ作られていてどういう状態なのかはどうやって調べれば良いのだろう?OSのスレッドではないのでpsコマンドなどでは確認できない。

pprofというのがGoには用意されている。これを使うとGoで書いたアプリケーションのプロファイリングができる。

pprofを使うとCPUプロファイリング以外にも色々できる。以下の連載記事が分かりやすかった。

pprofを使ってgoroutineの情報をダンプしてみる。準備は以下のように import _ "net/http/pprof" してHTTPサーバーを起動するだけ。

 package main
 
-import "github.com/bati11/oreno-mqtt/mqtt"
+import (
+       "log"
+       "net/http"
+       _ "net/http/pprof"
+
+       "github.com/bati11/oreno-mqtt/mqtt"
+)
 
 func main() {
+       go func() {
+               log.Println(http.ListenAndServe("localhost:6060", nil))
+       }()
        mqtt.Run()
 }

http://localhost:6060/debug/pprof/goroutine?debug=1 でgoroutineのダンプが取得できる。簡単!

さらに、Go1.10以降ではGUIでグラフを確認することもできる。

以下のコマンドを実行すると

$ go tool pprof -http=":8888" http://localhost:6060/debug/pprof/goroutine?debug=1

ブラウザが立ち上がり、こんなグラフが確認できる!

おしまい

goroutineを使って複数クライアントと同時に接続することができるようになりました。次回は、パブリッシャと接続してるgoroutineからサブスクライバと接続してるgoroutineへメッセージを送り、パブリッシャからサブスクライバへのメッセージ配信を実現します。goroutine間のメッセージ送信は、Goのchannelを使って実装していきます。

今回の学び

MQTTサーバーを実装しながらGoを学ぶ - その7 type switch, interface再考, プライベートな関数のテスト

前回で、クライアントからのPUBLISHを受け取るところまで実装できました。今回はSUBSCRIBEを受け取るところを実装します。いつも引用してる図で言うと、右側の「Computer」や「Mobile device」からMQTT Brokerへの通信です。

https://s3.amazonaws.com/www.appcelerator.com.images/MQTT_1.png

引用元:https://www.appcelerator.com/blog/2018/03/api-builder-and-mqtt-for-iot-part-1/

サブスクライブしているクライアントにパブリッシュされたメッセージを送るところまでやりたかったのですが、Goのinterfaceに対する考え方 "Accept interfaces, Return structs" について考えていたらそこまでいきませんでした・・・。

目次。

SUBSCRIBE

SUBSCRIBEパケットを見てみる

実装に入る前に、mosquittoを使ってSUBSCRIBEパケットを眺めてみる。まずはWiresharkを起動。その後、mosquittoサーバーを起動。

$ /usr/local/sbin/mosquitto -v

mosquittoクライアントでサブスクライブする。-iで指定しているのは Client Identifier。 -t で指定しているのはトピック名。

$ mosquitto_sub -i sample-subscriber -t hoge

Wiresharkを見てみる。

https://i.gyazo.com/45685fa968124810406d4a52c0826e5d.png

ふむふむ。クライアントからサーバーへ Subscribe Requestが送られ、サーバーからクライアントへSubscribe Ackを応答してる。

しばらく放っておくと...pingしてる。

https://i.gyazo.com/bc3c0136e9868024ff86543b58b2bbea.png

実装する通信の流れ

以下の通信を実現できるようにMQTTサーバーを実装していく。

  1. クライアント → CONNECT → サーバー
  2. クライアント ← CONNACK ← サーバー
  3. クライアント → SUBSCRIBE → サーバー
  4. クライアント ← SUBACK ← サーバー

以下も実装する。

  1. クライアント → PINGREQ → サーバー
  2. クライアント ← PINGRESP ← サーバー

CONNECTとCONNACK

実装済み。

SUBSCRIBEとSUBACK

固定ヘッダーの仕様を勘違いしていたことに気がつく...

SUBSCRIBEパケットを実装する。

SUBSCRIBEパケットは固定ヘッダー、可変ヘッダー、ペイロードで構成される。

固定ヘッダーの仕様を読んでると、大きな勘違いをしてることに気がついた・・・。

Bits 3,2,1 and 0 of the fixed header of the SUBSCRIBE Control Packet are reserved and MUST be set to 0,0,1 and 0 respectively. The Server MUST treat any other value as malformed and close the Network Connection

固定ヘッダーの3,2,1,0ビットは予約されてる。以下のようになってるらしい。

Bit 7 6 5 4 3 2 1 0
byte1 MQTT Control Packet type (8) Reserved
1 0 0 0 0 0 0 1
byte2... Remaining Length

ふむふむ。つまり、固定ヘッダーの Dup QoS Retain フィールドが定数なのかな、と思ったけれども・・・。

あれ?と思って改めて固定ヘッダーの仕様を見てみる。

Bit 7 6 5 4 3 2 1 0
byte1 MQTT Control Packet type Flags specific to each MQTT Control Packet type
byte2... Remaining Length

3,2,1,0ビット目は Flags specific to each MQTT Control Packet type って書いてある!!!パケットタイプによって意味が変わるのか!

interfaceとtype switchesを使った修正方針

今の固定ヘッダーの実装は以下のようになっている。 Dup QoS Retain フィールドがあるので、これは PUBLISH パケット用の固定ヘッダーだ。 PUBLISH 用の固定ヘッダーを全てのパケットタイプで使うような実装をしていた・・・。

type FixedHeader struct {
    PacketType      byte
    Dup             byte
    QoS1            byte
    QoS2            byte
    Retain          byte
    RemainingLength uint
}

仕様を改めて読み直すと、 PUBLISH パケットのみ3,2,1,0ビットを使っており、他のパケットタイプは全てReservedであり実質使われていない。

修正方針としては以下のようにする。

  1. 既存のFixedHeaderをinterfaceにする
  2. そして、sturctを DefaultFixedHeaderPublishFixedHeader に分ける
  3. server.goでは以下のコードのようにtype switchesを使って分岐する
fixedHeader, err := packet.ToFixedHeader(r)
...
switch fh := fixedHeader.(type) {
case PublishFixedHeader:
    err := handler.HandlePublish(fh, r)
    ...
case packet.DefaultFixedHeader:
    switch fh.PacketType {
    case packet.CONNECT:
        ...
}

この書き方は、以前エラーハンドリングの部分でやったError Typeパターンと同じ考え方になる。

と、思ったけどちょっと考え直す。

"Accept interfaces, Return structs"

Goには "Accept interfaces, Return structs" という言葉がある。関数の引数をinterfaceにして返り値はstructにしよう、という考え方。

なぜか?

以下の記事のまとめにこう書いてある。

The primary reasons listed are:

  • Remove unneeded abstractions
  • Ambiguity of a user’s need on function input
  • Simplify function inputs

本文を読んで自分なりにまとめると

  • Goでは、interfaceを満たすために明示的にimplementsというような記述をする必要はない。メソッドのシグニチャさえ合っていれば適合するinterfaceとして扱うことができる。そのため、本当に必要になるまでinterfaceを使って抽象化した型を作らなくて良い。呼び出される関数側で抽象化が必要なければstructを返せばいい
  • 関数を利用する側の都合を全て把握するのは難しい。そのため引数で受け取るのはinterfaceで抽象化しておくと良い。そうすれば関数を利用する側はinterfaceを満たす色々なstructを渡せるようになる
  • 関数は利用しない引数を受け取るべきではない。同様に、利用しない振る舞い(メソッド)はいらないので、最小のinterfaceを受け取るようにするべき(インタフェース分離の法則)

さっき自分で立てた方針は異なっている。 packet.ToFixedHeader() がinterfaceを返し、 handler.HandlePublish() がstructを引数にとるという "Accept interfaces, Return structs" とは逆の構造。

fixedHeader, err := packet.ToFixedHeader(r)
...
switch fh := fixedHeader.(type) {
case PublishFixedHeader:
    err := handler.HandlePublish(fh, r)
    ...
case packet.DefaultFixedHeader:
    switch fh.PacketType {
    case packet.CONNECT:
        ...
}

先ほどの記事の最後にも書いてあるが、関数が複数の型を返す場合はinterfaceを返すしかないし、structの振る舞い(関数)ではなくフィールドの値が必要な場合は、structとして受け取る必要がある。そのため、現在の実装は "Accept interfaces, Return structs" に従っていないが、それでも良い。

しかし、関数が複数の型を返したいという理由でinterfaceを返して、呼び出し元がtype switchで処理を分岐する、というのがGoのinterfaceの使い方として微妙な気がしてきた。interfaceはあくまで振る舞いを定義するためのもので、型をまとめるためのものではないのでは(じゃあなぜtype switchという構文があるの?というのもあるけど)。

抽象化しない

もっと愚直に実装してみよう。

やりたいことはPacketTypeがPUBLISHかどうかで、FixedHeaderの構造を変えたいだけ。

packetType := PacketType(r)
if packetType == PUBLISH {
    publishFixedHeader := ToPublishFixedHeader(r)
    ...
} else {
    defaultFixedHeader := ToDefaultFixedHeader(r)
    ...
}

こんな感じのコードが書ければいい。ただ、packetTypeを取得するには1バイトreaderから読み取る必要があって、この1バイトにはPacketType以外の情報も含まれている。そのため、readerを1バイト分巻き戻すか、取得した1バイトをFixedHeaderを生成する関数に渡す必要がある。

bufio.Readerを含んだMQTTReaderという独自のstructを用意することにしよう。このstructに読み込んだ1バイトとreaderとを保持させる。

type MQTTReader struct {
    byte1 *byte
    r     *bufio.Reader
}

func NewMQTTReader(r io.Reader) *MQTTReader {
    bufr := bufio.NewReader(r)
    return &MQTTReader{r: bufr}
}

func (d *MQTTReader) ReadPacketType() (PacketType, error) {
    if d.byte1 == nil {
        byte1, err := d.r.ReadByte()
        if err != nil {
            return PacketType(0), err
        }
        d.byte1 = &byte1
    }
    return PacketType(*d.byte1 >> 4), nil
}

server.goでは以下のように分岐する。

r := bufio.NewReader(conn)
mqttReader := packet.NewMQTTReader(r)
packetType, err := mqttReader.ReadPacketType()
...
if packetType == packet.PUBLISH {
    fixedHeader, err := packet.ToPublishFixedHeader(mqttReader)
    ...
    err = handler.HandlePublish(fixedHeader, r)
    ...
} else {
    fixedHeader, err := packet.ToFixedHeader(mqttReader)
    ...
    switch packetType {
    case packet.CONNECT:
        connack, err := handler.HandleConnect(fixedHeader, r)
        ...
}

ゴリゴリ変更。差分はこちら

今の実装だと

  • server.goでFixedHeaderを取得
  • handlerで
    • VariableHeaderとPayloadを取得
    • なんらかの処理
    • レスポンス生成

という流れになってる。readerから FixedHeaderVariableHeaderPalyload を保持するstruct(例えば Connect とか)を作成してパケットを表現した方が良さそう。

server.go

r := bufio.NewReader(conn)
mqttReader := packet.NewMQTTReader(r)
packetType, err := mqttReader.ReadPacketType()
if err != nil {
    if err == io.EOF {
        return nil
    }
    return err
}
switch packetType {
case packet.PUBLISH:
    err = handler.HandlePublish(mqttReader)
    if err != nil {
        return err
    }
case packet.CONNECT:
    connack, err := handler.HandleConnect(mqttReader)
    if err != nil {
        return err
    }
    _, err = conn.Write(connack.ToBytes())
    if err != nil {
        return err
    }
case packet.DISCONNECT:
    return nil
}

例えば、CONNECTパケットなら以下のような実装をする。

// packet/connect.go
package packet

type Connect struct {
    FixedHeader    *FixedHeader
    VariableHeader *ConnectVariableHeader
    Payload        *ConnectPayload
}

func (reader *MQTTReader) ReadConnect() (*Connect, error) {
    fixedHeader, err := reader.readFixedHeader()
    if err != nil {
        return nil, err
    }
    variableHeader, err := reader.readConnectVariableHeader()
    if err != nil {
        return nil, err
    }
    payload, err := reader.readConnectPayload()
    if err != nil {
        return nil, err
    }
    return &Connect{fixedHeader, variableHeader, payload}, nil
}

// handler/connect_handle.go
package handler

func HandleConnect(reader *packet.MQTTReader) (packet.Connack, error) {
    fmt.Printf("HandleConnect\n")
    connect, err := reader.ReadConnect()
    if err != nil {
        if ce, ok := err.(packet.ConnectError); ok {
            return ce.Connack(), nil
        }
        return packet.Connack{}, err
    }

    // TODO variableHeaderとpayloadを使って何かしらの処理
    fmt.Printf("  %#v\n", connect.VariableHeader)
    fmt.Printf("  %#v\n", connect.Payload)

    return packet.NewConnackForAccepted(), nil
}

良い感じ、なんだけどテストで困った。

プライベートな関数のテスト

FixedHeaderを生成するのがパブリックな packet.ToPublishFixedHeader() からプライベートな readFixedHeader() というプライベートなメソッドに変更したので、 packet_test という別パッケージから参照することができず、テストできなくなってしまった。

プライベートな関数などのテストをするパターンがある。

まず、packet/export_test.goというファイルを用意して、テストしたいプライベートメソッドをパブリックにする。

package packet

var ExportReadPublishFixedHeader = (*MQTTReader).readPublishFixedHeader

structの実態がないメソッドを変数にセットして、あとからレシーバーを渡してあげる。こんなことができたんですねー、Method valuesというらしい。

あとは、fixed_header_test.goからパブリックに ExportReadPublishFixeder を呼ぶように書き換えてテストすれば良い。

-                       got, err := packet.ToPublishFixedHeader(tt.args.r)
+                       got, err := packet.ExportReadPublishFixedHeader(tt.args.r)

なるほどー。export_test.goという _test というファイル名なのでテストの時のみビルド対象に含まれるため、本番ビルドには影響がないのでパブリックにしても良いでしょう、と。

MQTTReader に実装を寄せていった差分はこちら

改めてSUBSCRIBEとSUBACK

SUBSCRIBEパケットの仕様。

  • 固定ヘッダー
    • PacketType 8
    • Reservedの部分は 0010 である。チェックして違ったら接続を切る
  • 可変ヘッダー
    • Packet Identifer 2byte
    • Packet Identiferは、PUBLISHパケットの可変ヘッダーにもあった。ただし、PUBLISHパケットは省略可能だったのに対して、SUBSCRIBEパケットでは必須
  • ペイロード
    • Packet FilterとQoSのペアのリスト
    • Packet Fileterとは、サブスクライブするトピックを指定する文字列のこと。ワイルドカードを指定することができる。今の実装ではワイルドカードはサポートしないことにする
    • ワイルドカードをサポートしない場合は、ワイルドカード文字を拒否しないといけない
    • PacketFilterとQoSのペアが1つもない場合は拒否しないといけない

ペイロードの仕様を読むと、1回のSUBSCRIBEパケットで複数のトピックをサブスクライブでき、なおかつそれぞれQoSの指定ができるようだ。今のところQoSのことは無視して実装してきてるので後で考えることにする。

実装したものはこちら

SUBSCRIBE, SUBACKを試してみる

よし、試すぞー。

実装してるサーバーを起動。

$ go run app/main.go`
  server starts at localhost:1883

SUBSCRIBEを送信。

$ mosquitto_sub -i custom-client-id -t hoge

Wiresharkで見てみる。

https://i.gyazo.com/0c25a55702a02e1f94d651bd3dc64402.png

ちゃんと動いてるー!!

これで以下の通信が実現できた。

  1. クライアント → CONNECT → サーバー
  2. クライアント ← CONNACK ← サーバー
  3. クライアント → SUBSCRIBE → サーバー
  4. クライアント ← SUBACK ← サーバー

PINGREQ, PINGRESP

次は以下の通信。

  1. クライアント → PINGREQ → サーバー
  2. クライアント ← PINGRESP ← サーバー

PINGREQ, PINGRESP共に固定ヘッダーのみ。可変ヘッダーとペイロードはなし。

実装したものはこちら

試してみる。SUBSCRIBEを送信。

$ mosquitto_sub -i custom-client-id -t hoge

その後、接続したまま放っておくと・・・

https://i.gyazo.com/1585f3d8b58d8cfd84d53fa4248652f7.png

できました!!

おしまい

クライアントがSubscribeするところまでできました。次回こそは、goroutineを使って複数クライアントを捌き、PublishされたメッセージをSubscribeしてるクライアントに送信する実装をします。

今回の学び。