MQTTサーバーを実装しながらGoを学ぶ - その8 goroutine, pprof

前回までで、クライアントがPUBLISH、またはSUBSCRIBEすることができるようになりました。次は、クライアントからPUBLISHされたメッセージを、SUBSCRIBEしている別のクライアントへ送る処理を実装していきます。が、その前にgoroutineを使ってサーバーが複数クライアントと同時に接続できるようにします。

目次。

複数クライアントからの接続

クライアントが接続した状態で、もう1つ別のクライアントから接続してみる。例えば、Subscribeしてるクライアントが存在している状態で、Publishしてみる。

$ mosquitto_pub -i sample-publisher -t hoge -m "Hello"
Error: Unknown error.

すぐには何も出力されず、しばらく放っておくと上記の Error: Unknown error. が出力される。まだサーバーが複数クライアントを受けられるように実装していないから、タイムアウトする。

具体的にコードで確認する。

// server.go
func Run() {
    ln, err := net.Listen("tcp", "localhost:1883")
    if err != nil {
        panic(err)
    }
    fmt.Println("server starts at localhost:1883")

    for {
        conn, err := ln.Accept()
        if err != nil {
            panic(err)
        }

        err = handle(conn)
        if err != nil {
            fmt.Printf("%v", err)
            panic(err)
        }
    }
}

func handle(conn net.Conn) error {
    defer conn.Close()

    for {
        r := bufio.NewReader(conn)
        ...
    }
}

最初のクライアントから接続が来ると、 Accept() が net.Conn を返してきてhandle内で処理する。handle内では for { } でループすることで、クライアントとやりとりする。

この時、2つめのクライアントが接続してきてもサーバーは、handle内の for { } でループしているので、 Accept() が処理されない。そのため、1つのクライアントしか捌けない。

並行処理を使って複数のクライアントを捌くようにすれば良い。Goではgoroutineを使って並行処理を実現する。

1つのクライアントを1つのgoroutineで取り扱うようにする。

--- a/mqtt/server.go
+++ b/mqtt/server.go
@@ -23,11 +23,13 @@ func Run() {
                        panic(err)
                }
 
-               err = handle(conn)
-               if err != nil {
-                       fmt.Printf("%v", err)
-                       panic(err)
-               }
+               go func(conn net.Conn) {
+                       err = handle(conn)
+                       if err != nil {
+                               fmt.Printf("%v", err)
+                               panic(err)
+                       }
+               }(conn)
        }
 }

goroutine

以前、Goでechoサーバーを書いたときにもgoroutineを使った。goroutineは並行処理を実現したいときに生成するが、このとき生成されるのはGoランタイム上のgoroutineでありOSのスレッドではない。

goroutineの特徴を ASCII.jp:Go言語と並列処理(2) から引用させていただく。

スレッドがCPUコアに対してマッピングされるのに対し、goroutineはOSのスレッド(Go製のアプリケーションから見ると1つの仮想CPU)にマッピングされます。 この点が、通常のスレッドとGo言語の軽量スレッドであるgoroutineとの最大の違いです。


両者にはほかにも次のような細かい違いがあります。

・OSスレッドはIDを持つが、goroutineは指定しなければ実際のどのスレッドにマッピングされるかは決まっておらず、IDなども持たない
・OSスレッドの1〜2MBと比べると、初期スタックメモリのサイズが小さく(2KB)、起動処理が軽い
・優先度を持たない
・タイムスライスで強制的に処理が切り替わることがないが、コンパイラが「ここで処理を切り替える」という切り替えポイントを埋め込むことで切り替えを実現している
・(IDで一意にgoroutineで特定できないため)外部から終了のリクエストを送る仕組みがない

Goのランタイムがどのようにgoroutineを切り替えているかは以下の記事が参考になります。

こちらの本の「6章 ゴルーチンとGoランタイム」にも丁寧に書かれていた。

pprof

goroutineがOSのスレッドと異なることは分かった。じゃあ、今goroutineがいくつ作られていてどういう状態なのかはどうやって調べれば良いのだろう?OSのスレッドではないのでpsコマンドなどでは確認できない。

pprofというのがGoには用意されている。これを使うとGoで書いたアプリケーションのプロファイリングができる。

pprofを使うとCPUプロファイリング以外にも色々できる。以下の連載記事が分かりやすかった。

pprofを使ってgoroutineの情報をダンプしてみる。準備は以下のように import _ "net/http/pprof" してHTTPサーバーを起動するだけ。

 package main
 
-import "github.com/bati11/oreno-mqtt/mqtt"
+import (
+       "log"
+       "net/http"
+       _ "net/http/pprof"
+
+       "github.com/bati11/oreno-mqtt/mqtt"
+)
 
 func main() {
+       go func() {
+               log.Println(http.ListenAndServe("localhost:6060", nil))
+       }()
        mqtt.Run()
 }

http://localhost:6060/debug/pprof/goroutine?debug=1 でgoroutineのダンプが取得できる。簡単!

さらに、Go1.10以降ではGUIでグラフを確認することもできる。

以下のコマンドを実行すると

$ go tool pprof -http=":8888" http://localhost:6060/debug/pprof/goroutine?debug=1

ブラウザが立ち上がり、こんなグラフが確認できる!

おしまい

goroutineを使って複数クライアントと同時に接続することができるようになりました。次回は、パブリッシャと接続してるgoroutineからサブスクライバと接続してるgoroutineへメッセージを送り、パブリッシャからサブスクライバへのメッセージ配信を実現します。goroutine間のメッセージ送信は、Goのchannelを使って実装していきます。

今回の学び

MQTTサーバーを実装しながらGoを学ぶ - その7 type switch, interface再考, プライベートな関数のテスト

前回で、クライアントからのPUBLISHを受け取るところまで実装できました。今回はSUBSCRIBEを受け取るところを実装します。いつも引用してる図で言うと、右側の「Computer」や「Mobile device」からMQTT Brokerへの通信です。

https://s3.amazonaws.com/www.appcelerator.com.images/MQTT_1.png

引用元:https://www.appcelerator.com/blog/2018/03/api-builder-and-mqtt-for-iot-part-1/

サブスクライブしているクライアントにパブリッシュされたメッセージを送るところまでやりたかったのですが、Goのinterfaceに対する考え方 "Accept interfaces, Return structs" について考えていたらそこまでいきませんでした・・・。

目次。

SUBSCRIBE

SUBSCRIBEパケットを見てみる

実装に入る前に、mosquittoを使ってSUBSCRIBEパケットを眺めてみる。まずはWiresharkを起動。その後、mosquittoサーバーを起動。

$ /usr/local/sbin/mosquitto -v

mosquittoクライアントでサブスクライブする。-iで指定しているのは Client Identifier。 -t で指定しているのはトピック名。

$ mosquitto_sub -i sample-subscriber -t hoge

Wiresharkを見てみる。

https://i.gyazo.com/45685fa968124810406d4a52c0826e5d.png

ふむふむ。クライアントからサーバーへ Subscribe Requestが送られ、サーバーからクライアントへSubscribe Ackを応答してる。

しばらく放っておくと...pingしてる。

https://i.gyazo.com/bc3c0136e9868024ff86543b58b2bbea.png

実装する通信の流れ

以下の通信を実現できるようにMQTTサーバーを実装していく。

  1. クライアント → CONNECT → サーバー
  2. クライアント ← CONNACK ← サーバー
  3. クライアント → SUBSCRIBE → サーバー
  4. クライアント ← SUBACK ← サーバー

以下も実装する。

  1. クライアント → PINGREQ → サーバー
  2. クライアント ← PINGRESP ← サーバー

CONNECTとCONNACK

実装済み。

SUBSCRIBEとSUBACK

固定ヘッダーの仕様を勘違いしていたことに気がつく...

SUBSCRIBEパケットを実装する。

SUBSCRIBEパケットは固定ヘッダー、可変ヘッダー、ペイロードで構成される。

固定ヘッダーの仕様を読んでると、大きな勘違いをしてることに気がついた・・・。

Bits 3,2,1 and 0 of the fixed header of the SUBSCRIBE Control Packet are reserved and MUST be set to 0,0,1 and 0 respectively. The Server MUST treat any other value as malformed and close the Network Connection

固定ヘッダーの3,2,1,0ビットは予約されてる。以下のようになってるらしい。

Bit 7 6 5 4 3 2 1 0
byte1 MQTT Control Packet type (8) Reserved
1 0 0 0 0 0 0 1
byte2... Remaining Length

ふむふむ。つまり、固定ヘッダーの Dup QoS Retain フィールドが定数なのかな、と思ったけれども・・・。

あれ?と思って改めて固定ヘッダーの仕様を見てみる。

Bit 7 6 5 4 3 2 1 0
byte1 MQTT Control Packet type Flags specific to each MQTT Control Packet type
byte2... Remaining Length

3,2,1,0ビット目は Flags specific to each MQTT Control Packet type って書いてある!!!パケットタイプによって意味が変わるのか!

interfaceとtype switchesを使った修正方針

今の固定ヘッダーの実装は以下のようになっている。 Dup QoS Retain フィールドがあるので、これは PUBLISH パケット用の固定ヘッダーだ。 PUBLISH 用の固定ヘッダーを全てのパケットタイプで使うような実装をしていた・・・。

type FixedHeader struct {
    PacketType      byte
    Dup             byte
    QoS1            byte
    QoS2            byte
    Retain          byte
    RemainingLength uint
}

仕様を改めて読み直すと、 PUBLISH パケットのみ3,2,1,0ビットを使っており、他のパケットタイプは全てReservedであり実質使われていない。

修正方針としては以下のようにする。

  1. 既存のFixedHeaderをinterfaceにする
  2. そして、sturctを DefaultFixedHeaderPublishFixedHeader に分ける
  3. server.goでは以下のコードのようにtype switchesを使って分岐する
fixedHeader, err := packet.ToFixedHeader(r)
...
switch fh := fixedHeader.(type) {
case PublishFixedHeader:
    err := handler.HandlePublish(fh, r)
    ...
case packet.DefaultFixedHeader:
    switch fh.PacketType {
    case packet.CONNECT:
        ...
}

この書き方は、以前エラーハンドリングの部分でやったError Typeパターンと同じ考え方になる。

と、思ったけどちょっと考え直す。

"Accept interfaces, Return structs"

Goには "Accept interfaces, Return structs" という言葉がある。関数の引数をinterfaceにして返り値はstructにしよう、という考え方。

なぜか?

以下の記事のまとめにこう書いてある。

The primary reasons listed are:

  • Remove unneeded abstractions
  • Ambiguity of a user’s need on function input
  • Simplify function inputs

本文を読んで自分なりにまとめると

  • Goでは、interfaceを満たすために明示的にimplementsというような記述をする必要はない。メソッドのシグニチャさえ合っていれば適合するinterfaceとして扱うことができる。そのため、本当に必要になるまでinterfaceを使って抽象化した型を作らなくて良い。呼び出される関数側で抽象化が必要なければstructを返せばいい
  • 関数を利用する側の都合を全て把握するのは難しい。そのため引数で受け取るのはinterfaceで抽象化しておくと良い。そうすれば関数を利用する側はinterfaceを満たす色々なstructを渡せるようになる
  • 関数は利用しない引数を受け取るべきではない。同様に、利用しない振る舞い(メソッド)はいらないので、最小のinterfaceを受け取るようにするべき(インタフェース分離の法則)

さっき自分で立てた方針は異なっている。 packet.ToFixedHeader() がinterfaceを返し、 handler.HandlePublish() がstructを引数にとるという "Accept interfaces, Return structs" とは逆の構造。

fixedHeader, err := packet.ToFixedHeader(r)
...
switch fh := fixedHeader.(type) {
case PublishFixedHeader:
    err := handler.HandlePublish(fh, r)
    ...
case packet.DefaultFixedHeader:
    switch fh.PacketType {
    case packet.CONNECT:
        ...
}

先ほどの記事の最後にも書いてあるが、関数が複数の型を返す場合はinterfaceを返すしかないし、structの振る舞い(関数)ではなくフィールドの値が必要な場合は、structとして受け取る必要がある。そのため、現在の実装は "Accept interfaces, Return structs" に従っていないが、それでも良い。

しかし、関数が複数の型を返したいという理由でinterfaceを返して、呼び出し元がtype switchで処理を分岐する、というのがGoのinterfaceの使い方として微妙な気がしてきた。interfaceはあくまで振る舞いを定義するためのもので、型をまとめるためのものではないのでは(じゃあなぜtype switchという構文があるの?というのもあるけど)。

抽象化しない

もっと愚直に実装してみよう。

やりたいことはPacketTypeがPUBLISHかどうかで、FixedHeaderの構造を変えたいだけ。

packetType := PacketType(r)
if packetType == PUBLISH {
    publishFixedHeader := ToPublishFixedHeader(r)
    ...
} else {
    defaultFixedHeader := ToDefaultFixedHeader(r)
    ...
}

こんな感じのコードが書ければいい。ただ、packetTypeを取得するには1バイトreaderから読み取る必要があって、この1バイトにはPacketType以外の情報も含まれている。そのため、readerを1バイト分巻き戻すか、取得した1バイトをFixedHeaderを生成する関数に渡す必要がある。

bufio.Readerを含んだMQTTReaderという独自のstructを用意することにしよう。このstructに読み込んだ1バイトとreaderとを保持させる。

type MQTTReader struct {
    byte1 *byte
    r     *bufio.Reader
}

func NewMQTTReader(r io.Reader) *MQTTReader {
    bufr := bufio.NewReader(r)
    return &MQTTReader{r: bufr}
}

func (d *MQTTReader) ReadPacketType() (PacketType, error) {
    if d.byte1 == nil {
        byte1, err := d.r.ReadByte()
        if err != nil {
            return PacketType(0), err
        }
        d.byte1 = &byte1
    }
    return PacketType(*d.byte1 >> 4), nil
}

server.goでは以下のように分岐する。

r := bufio.NewReader(conn)
mqttReader := packet.NewMQTTReader(r)
packetType, err := mqttReader.ReadPacketType()
...
if packetType == packet.PUBLISH {
    fixedHeader, err := packet.ToPublishFixedHeader(mqttReader)
    ...
    err = handler.HandlePublish(fixedHeader, r)
    ...
} else {
    fixedHeader, err := packet.ToFixedHeader(mqttReader)
    ...
    switch packetType {
    case packet.CONNECT:
        connack, err := handler.HandleConnect(fixedHeader, r)
        ...
}

ゴリゴリ変更。差分はこちら

今の実装だと

  • server.goでFixedHeaderを取得
  • handlerで
    • VariableHeaderとPayloadを取得
    • なんらかの処理
    • レスポンス生成

という流れになってる。readerから FixedHeaderVariableHeaderPalyload を保持するstruct(例えば Connect とか)を作成してパケットを表現した方が良さそう。

server.go

r := bufio.NewReader(conn)
mqttReader := packet.NewMQTTReader(r)
packetType, err := mqttReader.ReadPacketType()
if err != nil {
    if err == io.EOF {
        return nil
    }
    return err
}
switch packetType {
case packet.PUBLISH:
    err = handler.HandlePublish(mqttReader)
    if err != nil {
        return err
    }
case packet.CONNECT:
    connack, err := handler.HandleConnect(mqttReader)
    if err != nil {
        return err
    }
    _, err = conn.Write(connack.ToBytes())
    if err != nil {
        return err
    }
case packet.DISCONNECT:
    return nil
}

例えば、CONNECTパケットなら以下のような実装をする。

// packet/connect.go
package packet

type Connect struct {
    FixedHeader    *FixedHeader
    VariableHeader *ConnectVariableHeader
    Payload        *ConnectPayload
}

func (reader *MQTTReader) ReadConnect() (*Connect, error) {
    fixedHeader, err := reader.readFixedHeader()
    if err != nil {
        return nil, err
    }
    variableHeader, err := reader.readConnectVariableHeader()
    if err != nil {
        return nil, err
    }
    payload, err := reader.readConnectPayload()
    if err != nil {
        return nil, err
    }
    return &Connect{fixedHeader, variableHeader, payload}, nil
}

// handler/connect_handle.go
package handler

func HandleConnect(reader *packet.MQTTReader) (packet.Connack, error) {
    fmt.Printf("HandleConnect\n")
    connect, err := reader.ReadConnect()
    if err != nil {
        if ce, ok := err.(packet.ConnectError); ok {
            return ce.Connack(), nil
        }
        return packet.Connack{}, err
    }

    // TODO variableHeaderとpayloadを使って何かしらの処理
    fmt.Printf("  %#v\n", connect.VariableHeader)
    fmt.Printf("  %#v\n", connect.Payload)

    return packet.NewConnackForAccepted(), nil
}

良い感じ、なんだけどテストで困った。

プライベートな関数のテスト

FixedHeaderを生成するのがパブリックな packet.ToPublishFixedHeader() からプライベートな readFixedHeader() というプライベートなメソッドに変更したので、 packet_test という別パッケージから参照することができず、テストできなくなってしまった。

プライベートな関数などのテストをするパターンがある。

まず、packet/export_test.goというファイルを用意して、テストしたいプライベートメソッドをパブリックにする。

package packet

var ExportReadPublishFixedHeader = (*MQTTReader).readPublishFixedHeader

structの実態がないメソッドを変数にセットして、あとからレシーバーを渡してあげる。こんなことができたんですねー、Method valuesというらしい。

あとは、fixed_header_test.goからパブリックに ExportReadPublishFixeder を呼ぶように書き換えてテストすれば良い。

-                       got, err := packet.ToPublishFixedHeader(tt.args.r)
+                       got, err := packet.ExportReadPublishFixedHeader(tt.args.r)

なるほどー。export_test.goという _test というファイル名なのでテストの時のみビルド対象に含まれるため、本番ビルドには影響がないのでパブリックにしても良いでしょう、と。

MQTTReader に実装を寄せていった差分はこちら

改めてSUBSCRIBEとSUBACK

SUBSCRIBEパケットの仕様。

  • 固定ヘッダー
    • PacketType 8
    • Reservedの部分は 0010 である。チェックして違ったら接続を切る
  • 可変ヘッダー
    • Packet Identifer 2byte
    • Packet Identiferは、PUBLISHパケットの可変ヘッダーにもあった。ただし、PUBLISHパケットは省略可能だったのに対して、SUBSCRIBEパケットでは必須
  • ペイロード
    • Packet FilterとQoSのペアのリスト
    • Packet Fileterとは、サブスクライブするトピックを指定する文字列のこと。ワイルドカードを指定することができる。今の実装ではワイルドカードはサポートしないことにする
    • ワイルドカードをサポートしない場合は、ワイルドカード文字を拒否しないといけない
    • PacketFilterとQoSのペアが1つもない場合は拒否しないといけない

ペイロードの仕様を読むと、1回のSUBSCRIBEパケットで複数のトピックをサブスクライブでき、なおかつそれぞれQoSの指定ができるようだ。今のところQoSのことは無視して実装してきてるので後で考えることにする。

実装したものはこちら

SUBSCRIBE, SUBACKを試してみる

よし、試すぞー。

実装してるサーバーを起動。

$ go run app/main.go`
  server starts at localhost:1883

SUBSCRIBEを送信。

$ mosquitto_sub -i custom-client-id -t hoge

Wiresharkで見てみる。

https://i.gyazo.com/0c25a55702a02e1f94d651bd3dc64402.png

ちゃんと動いてるー!!

これで以下の通信が実現できた。

  1. クライアント → CONNECT → サーバー
  2. クライアント ← CONNACK ← サーバー
  3. クライアント → SUBSCRIBE → サーバー
  4. クライアント ← SUBACK ← サーバー

PINGREQ, PINGRESP

次は以下の通信。

  1. クライアント → PINGREQ → サーバー
  2. クライアント ← PINGRESP ← サーバー

PINGREQ, PINGRESP共に固定ヘッダーのみ。可変ヘッダーとペイロードはなし。

実装したものはこちら

試してみる。SUBSCRIBEを送信。

$ mosquitto_sub -i custom-client-id -t hoge

その後、接続したまま放っておくと・・・

https://i.gyazo.com/1585f3d8b58d8cfd84d53fa4248652f7.png

できました!!

おしまい

クライアントがSubscribeするところまでできました。次回こそは、goroutineを使って複数クライアントを捌き、PublishされたメッセージをSubscribeしてるクライアントに送信する実装をします。

今回の学び。

MQTTサーバーを実装しながらGoを学ぶ - その6 const, iota

前回の続きです。handlerのエラーハンドリングからやります。その後、mosquitto_clientからPUBLISHパケットを自作サーバーで受け取れるようにしました。最後にhandlerのリファクタリングで Untyped constant declaration というconstの便利な使い方を知りました。

今回学ぶこと。

  • handlerのエラーハンドリング
  • PUBLISHパケットとDISCONNECTパケット
  • Goのconstとiota

handlerでのエラーハンドリング

handlerでのエラーハンドリングを実装する。前々回調べた通りでError TypeもしくはOpaque Patternを使う。

すでに handler → packet という依存関係ができているので、 packet パッケージにError Typeを作ることにする。 ConnectError というインタフェースを用意する。このインタフェースに Error() を持たせてError Typeとして、さらに Connack を取得するためのメソッドも追加する。

--- a/study/packet/connack.go
+++ b/study/packet/connack.go
@@ -28,29 +28,47 @@ func (c Connack) ToBytes() []byte {
        return result
 }
 
+func newConnack() Connack {
+       fixedHeader := FixedHeader{
+               PacketType:      2,
+               RemainingLength: 2,
+       }
+       variableHeader := ConnackVariableHeader{SessionPresent: false}
+       return Connack{fixedHeader, variableHeader}
+}
+
 func NewConnackForAccepted() Connack {
        result := newConnack()
        result.ReturnCode = 0
        return result
 }
 
-func NewConnackForRefusedByUnacceptableProtocolVersion() Connack {
-       result := newConnack()
-       result.ReturnCode = 1
-       return result
+type ConnectError interface {
+       Connack() Connack
+       Error() string
 }
 
-func NewConnackForRefusedByIdentifierRejected() Connack {
-       result := newConnack()
-       result.ReturnCode = 2
-       return result
+type connectError struct {
+       connack Connack
+       msg     string
 }
 
-func newConnack() Connack {
-       fixedHeader := FixedHeader{
-               PacketType:      2,
-               RemainingLength: 2,
-       }
-       variableHeader := ConnackVariableHeader{SessionPresent: false}
-       return Connack{fixedHeader, variableHeader}
+func (e connectError) Connack() Connack {
+       return e.connack
+}
+
+func (e connectError) Error() string {
+       return e.msg
+}
+
+func RefusedByUnacceptableProtocolVersion(s string) ConnectError {
+       connack := newConnack()
+       connack.ReturnCode = 1
+       return connectError{connack, s}
+}
+
+func RefusedByIdentifierRejected(s string) ConnectError {
+       connack := newConnack()
+       connack.ReturnCode = 2
+       return connectError{connack, s}
 }
--- a/study/packet/connect_payload.go
+++ b/study/packet/connect_payload.go
@@ -5,8 +5,6 @@ import (
        "encoding/binary"
        "io"
        "regexp"
-
-       "github.com/pkg/errors"
 )
 
 type ConnectPayload struct {
@@ -30,10 +28,10 @@ func ToConnectPayload(r *bufio.Reader) (ConnectPayload, error) {
        }
        clientID := string(clientIDBytes)
        if len(clientID) < 1 || len(clientID) > 23 {
-               return ConnectPayload{}, errors.New("ClientID length is invalid")
+               return ConnectPayload{}, RefusedByIdentifierRejected("ClientID length is invalid")
        }
        if !clientIDRegex.MatchString(clientID) {
-               return ConnectPayload{}, errors.New("clientId format shoud be \"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ\"")
+               return ConnectPayload{}, RefusedByIdentifierRejected("ClientId format shoud be \"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ\"")
        }
        return ConnectPayload{ClientID: clientID}, nil
 }
--- a/study/packet/connect_variable_header.go
+++ b/study/packet/connect_variable_header.go
@@ -30,11 +30,11 @@ func ToConnectVariableHeader(fixedHeader FixedHeader, r *bufio.Reader) (ConnectV
        protocolName := make([]byte, 6)
        _, err := io.ReadFull(r, protocolName)
        if err != nil || !isValidProtocolName(protocolName) {
-               return ConnectVariableHeader{}, errors.New("protocol name is invalid")
+               return ConnectVariableHeader{}, RefusedByUnacceptableProtocolVersion("protocol name is invalid")
        }
        protocolLevel, err := r.ReadByte()
        if err != nil || protocolLevel != 4 {
-               return ConnectVariableHeader{}, errors.New("protocol level must be 4")
+               return ConnectVariableHeader{}, RefusedByUnacceptableProtocolVersion("protocol level must be 4")
        }
 
        // TODO

handlerパッケージの変更。返された errorpacket.ConnectError だった場合は、 Connack を取得して返すように変更する。

--- a/study/handler/connect_handler.go
+++ b/study/handler/connect_handler.go
@@ -13,14 +13,18 @@ var variableHeaderLength = 10
 func HandleConnect(fixedHeader packet.FixedHeader, r *bufio.Reader) (packet.Connack, error) {
        variableHeader, err := packet.ToConnectVariableHeader(fixedHeader, r)
        if err != nil {
-               // TODO err応じたCONNACKを生成して返す
-               return packet.NewConnackForRefusedByUnacceptableProtocolVersion(), nil
+               if ce, ok := err.(packet.ConnectError); ok {
+                       return ce.Connack(), nil
+               }
+               return packet.Connack{}, err
        }

        payload, err := packet.ToConnectPayload(r)
        if err != nil {
-               // TODO err応じたCONNACKを生成して返す
-               return packet.NewConnackForRefusedByIdentifierRejected(), nil
+               if ce, ok := err.(packet.ConnectError); ok {
+                       return ce.Connack(), nil
+               }
+               return packet.Connack{}, err
        }

        // TODO variableHeaderとpayloadを使って何かしらの処理

PUBLISHパケットとDISCONNECTパケット

やりたかったことを思い出す。最初の回に書いたように、まず実現したいのは以下のフロー。

  1. クライアント → Connect Command → サーバー
  2. クライアント ← Connect Ack ← サーバー
  3. クライアント → Publish Message → サーバー
  4. クライアント → Disconnect Req → サーバー

ここまでで、1と2のCONNECT(Connect Command)とCONNACK(Connect Ack)はできた。

次は、PUBLISH(Publish Message)とDISSCONNECT(Disconnect Req)に取り掛かる。

PUBLISHパケット

PUBLISHパケットの可変ヘッダー

PUBLISHパケットの可変ヘッダーは以下の情報を持つ。

http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718039

  • Topic Name
  • Packet Identifier

クライアントはPUBLISHパケットの Topic Name でトピックを指定してメッセージをサーバー(MQTT Broker)に送る。サーバーは、(まだ未実装だけど)PUBLISHで指定されたトピックをサブスクライブしてるクライアントにメッセージを転送する。

例えば以下のようなイメージ。

https://s3.amazonaws.com/www.appcelerator.com.images/MQTT_1.png

引用元:https://www.appcelerator.com/blog/2018/03/api-builder-and-mqtt-for-iot-part-1/

今はサブスクライブしてるクライアントのことは考えてないので、上の図でいうと左側の "temp" というトピックに対して "75°F" というメッセージをPUBLISHしてる部分を実装する。

Topic Name について以下のような記述がある。Topic Nameのワイルドカードというのは #+ の2文字。サブスクライブ時にワイルドカードを指定することで複数のTopicをサブスクライブすることができる。Publishの可変ヘッダーにはこれらの文字を含んではいけない。

The Topic Name in the PUBLISH Packet MUST NOT contain wildcard characters

Packet Identifier はQoS1かQoS2の場合に使う。今はQoS0固定で考えているので後回し。

可変ヘッダーを実装する。

package packet

import (
    "bufio"
    "fmt"
    "io"
    "strings"
)

type PublishVariableHeader struct {
    TopicName        string
    PacketIdentifier *uint16
}

func ToPublishVariableHeader(fixedHeader FixedHeader, r *bufio.Reader) (PublishVariableHeader, error) {
    if fixedHeader.PacketType != 3 {
        return PublishVariableHeader{}, fmt.Errorf("packet type is invalid. it got is %v", fixedHeader.PacketType)
    }

    _, err := r.ReadByte()
    if err != nil {
        return PublishVariableHeader{}, err
    }
    lengthLSB, err := r.ReadByte()
    if err != nil {
        return PublishVariableHeader{}, err
    }
    if lengthLSB == 0 {
        return PublishVariableHeader{}, fmt.Errorf("length LSB should be > 0")
    }
    topicNameBytes := make([]byte, lengthLSB)
    _, err = io.ReadFull(r, topicNameBytes)
    if err != nil {
        return PublishVariableHeader{}, err
    }
    topicName := string(topicNameBytes)
    if strings.ContainsAny(topicName, "# +") {
        return PublishVariableHeader{}, fmt.Errorf("topic name must not contain wildcard. it got is %v", topicName)
    }

    result := PublishVariableHeader{string(topicNameBytes), nil}
    return result, nil
}

PUBLISHパケットのペイロード

ペイロードは、サブスクライバーに対して送信するメッセージそのもの。 bufio.Reader をそのまま使うことにする。

PUBLISHパケットに対するレスポンス

QoS0の時はレスポンスなし。

DISCONNECTパケット

DISCONNECTパケットの仕様はこちら

http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718090

DISCONNECTパケットには可変ヘッダーもペイロードもない。

serverとhandler実装

前回のserver.goはCONNECTパケットしか想定していなかった。PUBLISHパケットとDISCONNECTパケットも想定した実装にする。

server.goの実装。 Accept() でクライアントからの接続を待つ。接続が来て net.Conn を取得できたら handle() 関数へ渡す。 handle() 関数内でループしてクライアントからのMQTTパケットを受け取り FixedHeader を生成。パケットタイプによるswitchで処理を分岐し、それぞれのパケットに対応したhandlerを呼ぶ。DISCONNECTパケットの場合はコネクションを切るだけなので return nil だけして defer で接続を切る。

package study

import (
    "bufio"
    "fmt"
    "io"
    "net"

    "github.com/bati11/oreno-mqtt/study/handler"
    "github.com/bati11/oreno-mqtt/study/packet"
)

func Run() {
    ln, err := net.Listen("tcp", "localhost:1883")
    if err != nil {
        panic(err)
    }
    fmt.Println("server starts at localhost:1883")

    for {
        conn, err := ln.Accept()
        if err != nil {
            panic(err)
        }

        err = handle(conn)
        if err != nil {
            panic(err)
        }
    }
}

func handle(conn net.Conn) error {
    defer conn.Close()

    for {
        r := bufio.NewReader(conn)
        fixedHeader, err := packet.ToFixedHeader(r)
        if err != nil {
            if err == io.EOF {
                // クライアント側から既に切断してる場合
                return nil
            }
            return err
        }
        fmt.Printf("-----\n%+v\n", fixedHeader)

        switch fixedHeader.PacketType {
        // CONNECT
        case 1:
            connack, err := handler.HandleConnect(fixedHeader, r)
            if err != nil {
                return err
            }
            _, err = conn.Write(connack.ToBytes())
            if err != nil {
                return err
            }
        // PUBLISH
        case 3:
            err := handler.HandlePublish(fixedHeader, r)
            if err != nil {
                return err
            }
        // DISCONNECT
        case 14:
            return nil
        }
    }
}

HandleConnect() は前回から変更なし。

HandlePublish() は以下のように実装。

package handler

import (
    "bufio"
    "fmt"
    "io"

    "github.com/bati11/oreno-mqtt/study/packet"
)

func HandlePublish(fixedHeader packet.FixedHeader, r *bufio.Reader) error {
    fmt.Printf("  HandlePublish\n")
    variableHeader, err := packet.ToPublishVariableHeader(fixedHeader, r)
    if err != nil {
        return err
    }
    fmt.Printf("  %#v\n", variableHeader)

    payloadLength := fixedHeader.RemainingLength - variableHeader.Length
    payload := make([]byte, payloadLength)
    _, err = io.ReadFull(r, payload)
    if err != nil {
        return err
    }
    fmt.Printf("  Payload: %v\n", string(payload))

    // TODO QoS0なのでレスポンスなし
    return nil
}

mosquittoクライアントからメッセージを送ってみる

サーバーを起動する。

$ go run app/main.go 
server starts at localhost:1883

Wiresharkを起動しておく。

mosquittoクライアントからpublishする。

$ mosquitto_pub -t hoge -m "Hello"

結果を見てみる。

https://i.gyazo.com/38af18d974d97c8694564f9eb8f2320a.png

お、できてそう!サーバーの標準出力も確認。

-----
{PacketType:1 Dup:0 QoS1:0 QoS2:0 Retain:0 RemainingLength:28}
HandleConnect
  packet.ConnectVariableHeader{ProtocolName:"MQTT", ProtocolLevel:0x4, ConnectFlags:packet.ConnectFlags{CleanSession:true, WillFlag:true, WillQoS:0x1, WillRetain:false, PasswordFlag:true, UserNameFlag:true}, KeepAlive:0xa}
  packet.ConnectPayload{ClientID:"custom-client-id"}
-----
{PacketType:3 Dup:0 QoS1:0 QoS2:0 Retain:0 RemainingLength:11}
  HandlePublish
  packet.PublishVariableHeader{TopicName:"hoge", PacketIdentifier:(*uint16)(nil), Length:0x6}
  Payload: Hello

ちゃんとPUBLISHパケットを解釈できてる!

これで最初の目標の以下の流れが実現できた。

  1. クライアント → Connect Command → サーバー
  2. クライアント ← Connect Ack ← サーバー
  3. クライアント → Publish Message → サーバー
  4. クライアント → Disconnect Req → サーバー

const

ところで、server.goの fixedHeader.PacketType の値、マジックナンバーでswitchしてるところを分かりやすくしたい。

switch fixedHeader.PacketType {
// CONNECT
case 1:
    connack, err := handler.HandleConnect(fixedHeader, r)
    if err != nil {
        return err
    }
    _, err = conn.Write(connack.ToBytes())
    if err != nil {
        return err
    }
// PUBLISH
case 3:
    err := handler.HandlePublish(fixedHeader, r)
    if err != nil {
        return err
    }
// DISCONNECT
case 14:
    return nil
}

マジックナンバーをconstで置き換える。定数化することでコードが読みやすくなる。また、定数は実行時ではなくコンパイル時にコンパイラが最適化してくれて、パフォーマンスが良くなる場合もある。

fixed_header.goに定数を定義。

+const (
+       CONNECT    byte = 1
+       PUBLISH    byte = 3
+       DISCONNECT byte = 14
+)
+
 type FixedHeader struct {
        PacketType      byte
        Dup             byte

server.goのswitchは以下のようになる。

switch fixedHeader.PacketType {
case packet.CONNECT:
    connack, err := handler.HandleConnect(fixedHeader, r)
    if err != nil {
        return err
    }
    _, err = conn.Write(connack.ToBytes())
    if err != nil {
        return err
    }
case packet.PUBLISH:
    err := handler.HandlePublish(fixedHeader, r)
    if err != nil {
        return err
    }
case packet.DISCONNECT:
    return nil
}

Untyped constant declaration

定数の型を byte ではなく int にするとどうなるか?

const (
       CONNECT    int = 1
       PUBLISH    byte = 3
       DISCONNECT byte = 14
)

server.goのswitch-caseのところで以下のようなコンパイルエラーになる。fixedHeader.PacketType の型が byte なのに、 int 型の定数と比較してるためコンパイルエラー。

server.go:49:3: invalid case packet.CONNECT in switch on fixedHeader.PacketType (mismatched types int and byte)

では、定数の型を 書かない 場合はどうなるか?

const (
    CONNECT         = 1
    PUBLISH    byte = 3
    DISCONNECT byte = 14
)

これだとコンパイルエラーにならない。 しかも、先程と異なり、 int と比較してる箇所もコンパイルエラーにならない!

コンパイル時に適切な精度の型として埋め込んでくれるらしい。先ほどの記事ではこれを「 Untyped constant declaration 」と呼んでいる。

必要がない限り、型指定なしでconstを定義した方が良い。

const (
    CONNECT    = 1
    PUBLISH    = 3
    DISCONNECT = 14
)

iota

constsについて、Effective Goも読んでみると以下のように書いてある。

In Go, enumerated constants are created using the iota enumerator.

enumというとJavaの列挙型を思い出す。けど、それは一旦置いておいてGoでは iota という演算子を使うことで定数の定義を簡単かつ柔軟にできる。

PacketTypeの値の定数定義は iota を使って以下のように書ける。

const (
    _ = iota
    CONNECT
    CONNACK
    PUBLISH
    PUBACK
    PUBREC
    PUBREL
    PUBCOMP
    SUBSCRIBE
    SUBACK
    UNSUBSCRIBE
    UNSUBACK
    PINGREQ
    PINGRESP
    DISCONNECT
)

さらに、型を作ってメソッドを定義すれば、Enum(列挙型)のように使うこともできる。

type PacketType byte

const (
    _ PacketType = iota
    CONNECT
    CONNACK
    PUBLISH
    PUBACK
    PUBREC
    PUBREL
    PUBCOMP
    SUBSCRIBE
    SUBACK
    UNSUBSCRIBE
    UNSUBACK
    PINGREQ
    PINGRESP
    DISCONNECT
)

func (v PacketType) String() string {
    names := [...]string{
        "CONNECT",
        "CONNACK",
        "PUBLISH",
        "PUBACK",
        "PUBREC",
        "PUBREL",
        "PUBCOMP",
        "SUBSCRIBE",
        "SUBACK",
        "UNSUBSCRIBE",
        "UNSUBACK",
        "PINGREQ",
        "PINGRESP",
        "DISCONNECT"}
    if v < CONNECT || v > DISCONNECT {
        return "Unknown"
    }
    return names[v]
}

Enumについては以下の記事が詳しい。

どういう時にEnumが必要なのか?が書いてある。

Why do we need enums?

Grouping and expecting only some related values
Sharing common behavior
Avoids using invalid values
To increase the code readability and the maintainability

また、iotaを使うと定義する順番を間違えると値が変わってしまうため、値に意味があると問題になる場合もある。

いまのところ型や共通のメソッドは不要であるので、Enumはいらない気がする。値を定義する順番については、プロトコルで決まっている値で変更される頻度がほとんどないので気にしないでおく。結果、 iota を使って型指定なしのconstを定義する形にする。

const (
    _ = iota
    CONNECT
    CONNACK
    PUBLISH
    PUBACK
    PUBREC
    PUBREL
    PUBCOMP
    SUBSCRIBE
    SUBACK
    UNSUBSCRIBE
    UNSUBACK
    PINGREQ
    PINGRESP
    DISCONNECT
)

おしまい

ここまでで、MQTTクライアントからメッセージをPublishするところまでができました。次はいよいよ別のクライアントがSubscribeところに着手します。goroutineをどう使ってクライアントを管理するのかを考えていきます。

今回の学び。