MQTTサーバーを実装しながらGoを学ぶ - その8 goroutine, pprof
前回までで、クライアントがPUBLISH、またはSUBSCRIBEすることができるようになりました。次は、クライアントからPUBLISHされたメッセージを、SUBSCRIBEしている別のクライアントへ送る処理を実装していきます。が、その前にgoroutineを使ってサーバーが複数クライアントと同時に接続できるようにします。
目次。
複数クライアントからの接続
クライアントが接続した状態で、もう1つ別のクライアントから接続してみる。例えば、Subscribeしてるクライアントが存在している状態で、Publishしてみる。
$ mosquitto_pub -i sample-publisher -t hoge -m "Hello" Error: Unknown error.
すぐには何も出力されず、しばらく放っておくと上記の Error: Unknown error.
が出力される。まだサーバーが複数クライアントを受けられるように実装していないから、タイムアウトする。
具体的にコードで確認する。
// server.go func Run() { ln, err := net.Listen("tcp", "localhost:1883") if err != nil { panic(err) } fmt.Println("server starts at localhost:1883") for { conn, err := ln.Accept() if err != nil { panic(err) } err = handle(conn) if err != nil { fmt.Printf("%v", err) panic(err) } } } func handle(conn net.Conn) error { defer conn.Close() for { r := bufio.NewReader(conn) ... } }
最初のクライアントから接続が来ると、 Accept()
が net.Conn を返してきてhandle内で処理する。handle内では for { }
でループすることで、クライアントとやりとりする。
この時、2つめのクライアントが接続してきてもサーバーは、handle内の for { }
でループしているので、 Accept()
が処理されない。そのため、1つのクライアントしか捌けない。
並行処理を使って複数のクライアントを捌くようにすれば良い。Goではgoroutineを使って並行処理を実現する。
1つのクライアントを1つのgoroutineで取り扱うようにする。
--- a/mqtt/server.go +++ b/mqtt/server.go @@ -23,11 +23,13 @@ func Run() { panic(err) } - err = handle(conn) - if err != nil { - fmt.Printf("%v", err) - panic(err) - } + go func(conn net.Conn) { + err = handle(conn) + if err != nil { + fmt.Printf("%v", err) + panic(err) + } + }(conn) } }
goroutine
以前、Goでechoサーバーを書いたときにもgoroutineを使った。goroutineは並行処理を実現したいときに生成するが、このとき生成されるのはGoランタイム上のgoroutineでありOSのスレッドではない。
goroutineの特徴を ASCII.jp:Go言語と並列処理(2) から引用させていただく。
スレッドがCPUコアに対してマッピングされるのに対し、goroutineはOSのスレッド(Go製のアプリケーションから見ると1つの仮想CPU)にマッピングされます。 この点が、通常のスレッドとGo言語の軽量スレッドであるgoroutineとの最大の違いです。
両者にはほかにも次のような細かい違いがあります。
・OSスレッドはIDを持つが、goroutineは指定しなければ実際のどのスレッドにマッピングされるかは決まっておらず、IDなども持たない
・OSスレッドの1〜2MBと比べると、初期スタックメモリのサイズが小さく(2KB)、起動処理が軽い
・優先度を持たない
・タイムスライスで強制的に処理が切り替わることがないが、コンパイラが「ここで処理を切り替える」という切り替えポイントを埋め込むことで切り替えを実現している
・(IDで一意にgoroutineで特定できないため)外部から終了のリクエストを送る仕組みがない
Goのランタイムがどのようにgoroutineを切り替えているかは以下の記事が参考になります。
こちらの本の「6章 ゴルーチンとGoランタイム」にも丁寧に書かれていた。
pprof
goroutineがOSのスレッドと異なることは分かった。じゃあ、今goroutineがいくつ作られていてどういう状態なのかはどうやって調べれば良いのだろう?OSのスレッドではないのでpsコマンドなどでは確認できない。
pprofというのがGoには用意されている。これを使うとGoで書いたアプリケーションのプロファイリングができる。
pprofを使うとCPUプロファイリング以外にも色々できる。以下の連載記事が分かりやすかった。
- Go pprof 入門編 (CPU Profile とコマンドラインツール) : KLabGames Tech Blog
- Go pprof 応用編 (CPU 以外のプロファイル) : KLabGames Tech Blog
- Go pprof マスター編 (pprof の仕組み) : KLabGames Tech Blog
pprofを使ってgoroutineの情報をダンプしてみる。準備は以下のように import _ "net/http/pprof"
してHTTPサーバーを起動するだけ。
package main -import "github.com/bati11/oreno-mqtt/mqtt" +import ( + "log" + "net/http" + _ "net/http/pprof" + + "github.com/bati11/oreno-mqtt/mqtt" +) func main() { + go func() { + log.Println(http.ListenAndServe("localhost:6060", nil)) + }() mqtt.Run() }
http://localhost:6060/debug/pprof/goroutine?debug=1 でgoroutineのダンプが取得できる。簡単!
さらに、Go1.10以降ではGUIでグラフを確認することもできる。
以下のコマンドを実行すると
$ go tool pprof -http=":8888" http://localhost:6060/debug/pprof/goroutine?debug=1
ブラウザが立ち上がり、こんなグラフが確認できる!
おしまい
goroutineを使って複数クライアントと同時に接続することができるようになりました。次回は、パブリッシャと接続してるgoroutineからサブスクライバと接続してるgoroutineへメッセージを送り、パブリッシャからサブスクライバへのメッセージ配信を実現します。goroutine間のメッセージ送信は、Goのchannelを使って実装していきます。
今回の学び
- goroutine
- pprof
MQTTサーバーを実装しながらGoを学ぶ - その7 type switch, interface再考, プライベートな関数のテスト
前回で、クライアントからのPUBLISHを受け取るところまで実装できました。今回はSUBSCRIBEを受け取るところを実装します。いつも引用してる図で言うと、右側の「Computer」や「Mobile device」からMQTT Brokerへの通信です。
引用元:https://www.appcelerator.com/blog/2018/03/api-builder-and-mqtt-for-iot-part-1/
サブスクライブしているクライアントにパブリッシュされたメッセージを送るところまでやりたかったのですが、Goのinterfaceに対する考え方 "Accept interfaces, Return structs" について考えていたらそこまでいきませんでした・・・。
目次。
- SUBSCRIBE
- CONNECTとCONNACK
- SUBSCRIBEとSUBACK
- "Accept interfaces, Return structs"
- 改めてSUBSCRIBEとSUBACK
- SUBSCRIBE, SUBACKを試してみる
- PINGREQ, PINGRESP
- おしまい
SUBSCRIBE
SUBSCRIBEパケットを見てみる
実装に入る前に、mosquittoを使ってSUBSCRIBEパケットを眺めてみる。まずはWiresharkを起動。その後、mosquittoサーバーを起動。
$ /usr/local/sbin/mosquitto -v
mosquittoクライアントでサブスクライブする。-i
で指定しているのは Client Identifier。 -t
で指定しているのはトピック名。
$ mosquitto_sub -i sample-subscriber -t hoge
Wiresharkを見てみる。
ふむふむ。クライアントからサーバーへ Subscribe Requestが送られ、サーバーからクライアントへSubscribe Ackを応答してる。
しばらく放っておくと...pingしてる。
実装する通信の流れ
以下の通信を実現できるようにMQTTサーバーを実装していく。
- クライアント → CONNECT → サーバー
- クライアント ← CONNACK ← サーバー
- クライアント → SUBSCRIBE → サーバー
- クライアント ← SUBACK ← サーバー
以下も実装する。
- クライアント → PINGREQ → サーバー
- クライアント ← PINGRESP ← サーバー
CONNECTとCONNACK
実装済み。
SUBSCRIBEとSUBACK
固定ヘッダーの仕様を勘違いしていたことに気がつく...
SUBSCRIBEパケットを実装する。
SUBSCRIBEパケットは固定ヘッダー、可変ヘッダー、ペイロードで構成される。
固定ヘッダーの仕様を読んでると、大きな勘違いをしてることに気がついた・・・。
Bits 3,2,1 and 0 of the fixed header of the SUBSCRIBE Control Packet are reserved and MUST be set to 0,0,1 and 0 respectively. The Server MUST treat any other value as malformed and close the Network Connection
固定ヘッダーの3,2,1,0ビットは予約されてる。以下のようになってるらしい。
Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|---|
byte1 | MQTT Control Packet type (8) | Reserved | ||||||
1 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | |
byte2... | Remaining Length |
ふむふむ。つまり、固定ヘッダーの Dup
QoS
Retain
フィールドが定数なのかな、と思ったけれども・・・。
あれ?と思って改めて固定ヘッダーの仕様を見てみる。
Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|---|
byte1 | MQTT Control Packet type | Flags specific to each MQTT Control Packet type | ||||||
byte2... | Remaining Length |
3,2,1,0ビット目は Flags specific to each MQTT Control Packet type
って書いてある!!!パケットタイプによって意味が変わるのか!
interfaceとtype switchesを使った修正方針
今の固定ヘッダーの実装は以下のようになっている。 Dup
QoS
Retain
フィールドがあるので、これは PUBLISH
パケット用の固定ヘッダーだ。 PUBLISH
用の固定ヘッダーを全てのパケットタイプで使うような実装をしていた・・・。
type FixedHeader struct { PacketType byte Dup byte QoS1 byte QoS2 byte Retain byte RemainingLength uint }
仕様を改めて読み直すと、 PUBLISH
パケットのみ3,2,1,0ビットを使っており、他のパケットタイプは全てReservedであり実質使われていない。
修正方針としては以下のようにする。
- 既存のFixedHeaderをinterfaceにする
- そして、sturctを
DefaultFixedHeader
とPublishFixedHeader
に分ける - server.goでは以下のコードのようにtype switchesを使って分岐する
fixedHeader, err := packet.ToFixedHeader(r) ... switch fh := fixedHeader.(type) { case PublishFixedHeader: err := handler.HandlePublish(fh, r) ... case packet.DefaultFixedHeader: switch fh.PacketType { case packet.CONNECT: ... }
この書き方は、以前エラーハンドリングの部分でやったError Typeパターンと同じ考え方になる。
と、思ったけどちょっと考え直す。
"Accept interfaces, Return structs"
Goには "Accept interfaces, Return structs" という言葉がある。関数の引数をinterfaceにして返り値はstructにしよう、という考え方。
なぜか?
以下の記事のまとめにこう書いてある。
The primary reasons listed are:
- Remove unneeded abstractions
- Ambiguity of a user’s need on function input
- Simplify function inputs
本文を読んで自分なりにまとめると
- Goでは、interfaceを満たすために明示的にimplementsというような記述をする必要はない。メソッドのシグニチャさえ合っていれば適合するinterfaceとして扱うことができる。そのため、本当に必要になるまでinterfaceを使って抽象化した型を作らなくて良い。呼び出される関数側で抽象化が必要なければstructを返せばいい
- 関数を利用する側の都合を全て把握するのは難しい。そのため引数で受け取るのはinterfaceで抽象化しておくと良い。そうすれば関数を利用する側はinterfaceを満たす色々なstructを渡せるようになる
- 関数は利用しない引数を受け取るべきではない。同様に、利用しない振る舞い(メソッド)はいらないので、最小のinterfaceを受け取るようにするべき(インタフェース分離の法則)
さっき自分で立てた方針は異なっている。 packet.ToFixedHeader()
がinterfaceを返し、 handler.HandlePublish()
がstructを引数にとるという "Accept interfaces, Return structs" とは逆の構造。
fixedHeader, err := packet.ToFixedHeader(r) ... switch fh := fixedHeader.(type) { case PublishFixedHeader: err := handler.HandlePublish(fh, r) ... case packet.DefaultFixedHeader: switch fh.PacketType { case packet.CONNECT: ... }
先ほどの記事の最後にも書いてあるが、関数が複数の型を返す場合はinterfaceを返すしかないし、structの振る舞い(関数)ではなくフィールドの値が必要な場合は、structとして受け取る必要がある。そのため、現在の実装は "Accept interfaces, Return structs" に従っていないが、それでも良い。
しかし、関数が複数の型を返したいという理由でinterfaceを返して、呼び出し元がtype switchで処理を分岐する、というのがGoのinterfaceの使い方として微妙な気がしてきた。interfaceはあくまで振る舞いを定義するためのもので、型をまとめるためのものではないのでは(じゃあなぜtype switchという構文があるの?というのもあるけど)。
抽象化しない
もっと愚直に実装してみよう。
やりたいことはPacketTypeがPUBLISHかどうかで、FixedHeaderの構造を変えたいだけ。
packetType := PacketType(r) if packetType == PUBLISH { publishFixedHeader := ToPublishFixedHeader(r) ... } else { defaultFixedHeader := ToDefaultFixedHeader(r) ... }
こんな感じのコードが書ければいい。ただ、packetTypeを取得するには1バイトreaderから読み取る必要があって、この1バイトにはPacketType以外の情報も含まれている。そのため、readerを1バイト分巻き戻すか、取得した1バイトをFixedHeaderを生成する関数に渡す必要がある。
bufio.Readerを含んだMQTTReaderという独自のstructを用意することにしよう。このstructに読み込んだ1バイトとreaderとを保持させる。
type MQTTReader struct { byte1 *byte r *bufio.Reader } func NewMQTTReader(r io.Reader) *MQTTReader { bufr := bufio.NewReader(r) return &MQTTReader{r: bufr} } func (d *MQTTReader) ReadPacketType() (PacketType, error) { if d.byte1 == nil { byte1, err := d.r.ReadByte() if err != nil { return PacketType(0), err } d.byte1 = &byte1 } return PacketType(*d.byte1 >> 4), nil }
server.goでは以下のように分岐する。
r := bufio.NewReader(conn) mqttReader := packet.NewMQTTReader(r) packetType, err := mqttReader.ReadPacketType() ... if packetType == packet.PUBLISH { fixedHeader, err := packet.ToPublishFixedHeader(mqttReader) ... err = handler.HandlePublish(fixedHeader, r) ... } else { fixedHeader, err := packet.ToFixedHeader(mqttReader) ... switch packetType { case packet.CONNECT: connack, err := handler.HandleConnect(fixedHeader, r) ... }
ゴリゴリ変更。差分はこちら
今の実装だと
- server.goでFixedHeaderを取得
- handlerで
- VariableHeaderとPayloadを取得
- なんらかの処理
- レスポンス生成
という流れになってる。readerから FixedHeader
と VariableHeader
と Palyload
を保持するstruct(例えば Connect
とか)を作成してパケットを表現した方が良さそう。
server.go
r := bufio.NewReader(conn) mqttReader := packet.NewMQTTReader(r) packetType, err := mqttReader.ReadPacketType() if err != nil { if err == io.EOF { return nil } return err } switch packetType { case packet.PUBLISH: err = handler.HandlePublish(mqttReader) if err != nil { return err } case packet.CONNECT: connack, err := handler.HandleConnect(mqttReader) if err != nil { return err } _, err = conn.Write(connack.ToBytes()) if err != nil { return err } case packet.DISCONNECT: return nil }
例えば、CONNECTパケットなら以下のような実装をする。
// packet/connect.go package packet type Connect struct { FixedHeader *FixedHeader VariableHeader *ConnectVariableHeader Payload *ConnectPayload } func (reader *MQTTReader) ReadConnect() (*Connect, error) { fixedHeader, err := reader.readFixedHeader() if err != nil { return nil, err } variableHeader, err := reader.readConnectVariableHeader() if err != nil { return nil, err } payload, err := reader.readConnectPayload() if err != nil { return nil, err } return &Connect{fixedHeader, variableHeader, payload}, nil } // handler/connect_handle.go package handler func HandleConnect(reader *packet.MQTTReader) (packet.Connack, error) { fmt.Printf("HandleConnect\n") connect, err := reader.ReadConnect() if err != nil { if ce, ok := err.(packet.ConnectError); ok { return ce.Connack(), nil } return packet.Connack{}, err } // TODO variableHeaderとpayloadを使って何かしらの処理 fmt.Printf(" %#v\n", connect.VariableHeader) fmt.Printf(" %#v\n", connect.Payload) return packet.NewConnackForAccepted(), nil }
良い感じ、なんだけどテストで困った。
プライベートな関数のテスト
FixedHeaderを生成するのがパブリックな packet.ToPublishFixedHeader()
からプライベートな readFixedHeader()
というプライベートなメソッドに変更したので、 packet_test
という別パッケージから参照することができず、テストできなくなってしまった。
プライベートな関数などのテストをするパターンがある。
まず、packet/export_test.goというファイルを用意して、テストしたいプライベートメソッドをパブリックにする。
package packet var ExportReadPublishFixedHeader = (*MQTTReader).readPublishFixedHeader
structの実態がないメソッドを変数にセットして、あとからレシーバーを渡してあげる。こんなことができたんですねー、Method valuesというらしい。
あとは、fixed_header_test.goからパブリックに ExportReadPublishFixeder
を呼ぶように書き換えてテストすれば良い。
- got, err := packet.ToPublishFixedHeader(tt.args.r) + got, err := packet.ExportReadPublishFixedHeader(tt.args.r)
なるほどー。export_test.goという _test
というファイル名なのでテストの時のみビルド対象に含まれるため、本番ビルドには影響がないのでパブリックにしても良いでしょう、と。
MQTTReader
に実装を寄せていった差分はこちら
改めてSUBSCRIBEとSUBACK
SUBSCRIBEパケットの仕様。
- 固定ヘッダー
- PacketType
8
- Reservedの部分は 0010 である。チェックして違ったら接続を切る
- PacketType
- 可変ヘッダー
- Packet Identifer 2byte
- Packet Identiferは、PUBLISHパケットの可変ヘッダーにもあった。ただし、PUBLISHパケットは省略可能だったのに対して、SUBSCRIBEパケットでは必須
- ペイロード
ペイロードの仕様を読むと、1回のSUBSCRIBEパケットで複数のトピックをサブスクライブでき、なおかつそれぞれQoSの指定ができるようだ。今のところQoSのことは無視して実装してきてるので後で考えることにする。
実装したものはこちら
SUBSCRIBE, SUBACKを試してみる
よし、試すぞー。
実装してるサーバーを起動。
$ go run app/main.go` server starts at localhost:1883
SUBSCRIBEを送信。
$ mosquitto_sub -i custom-client-id -t hoge
Wiresharkで見てみる。
ちゃんと動いてるー!!
これで以下の通信が実現できた。
- クライアント → CONNECT → サーバー
- クライアント ← CONNACK ← サーバー
- クライアント → SUBSCRIBE → サーバー
- クライアント ← SUBACK ← サーバー
PINGREQ, PINGRESP
次は以下の通信。
- クライアント → PINGREQ → サーバー
- クライアント ← PINGRESP ← サーバー
PINGREQ, PINGRESP共に固定ヘッダーのみ。可変ヘッダーとペイロードはなし。
実装したものはこちら
試してみる。SUBSCRIBEを送信。
$ mosquitto_sub -i custom-client-id -t hoge
その後、接続したまま放っておくと・・・
できました!!
おしまい
クライアントがSubscribeするところまでできました。次回こそは、goroutineを使って複数クライアントを捌き、PublishされたメッセージをSubscribeしてるクライアントに送信する実装をします。
今回の学び。
- interfaceとtype switch
- "Accept interfaces, Return structs"
- プライベートな関数などのテスト
MQTTサーバーを実装しながらGoを学ぶ - その6 const, iota
前回の続きです。handlerのエラーハンドリングからやります。その後、mosquitto_clientからPUBLISHパケットを自作サーバーで受け取れるようにしました。最後にhandlerのリファクタリングで Untyped constant declaration というconstの便利な使い方を知りました。
今回学ぶこと。
- handlerのエラーハンドリング
- PUBLISHパケットとDISCONNECTパケット
- Goのconstとiota
handlerでのエラーハンドリング
handlerでのエラーハンドリングを実装する。前々回調べた通りでError TypeもしくはOpaque Patternを使う。
すでに handler → packet
という依存関係ができているので、 packet
パッケージにError Typeを作ることにする。 ConnectError
というインタフェースを用意する。このインタフェースに Error()
を持たせてError Typeとして、さらに Connack
を取得するためのメソッドも追加する。
--- a/study/packet/connack.go +++ b/study/packet/connack.go @@ -28,29 +28,47 @@ func (c Connack) ToBytes() []byte { return result } +func newConnack() Connack { + fixedHeader := FixedHeader{ + PacketType: 2, + RemainingLength: 2, + } + variableHeader := ConnackVariableHeader{SessionPresent: false} + return Connack{fixedHeader, variableHeader} +} + func NewConnackForAccepted() Connack { result := newConnack() result.ReturnCode = 0 return result } -func NewConnackForRefusedByUnacceptableProtocolVersion() Connack { - result := newConnack() - result.ReturnCode = 1 - return result +type ConnectError interface { + Connack() Connack + Error() string } -func NewConnackForRefusedByIdentifierRejected() Connack { - result := newConnack() - result.ReturnCode = 2 - return result +type connectError struct { + connack Connack + msg string } -func newConnack() Connack { - fixedHeader := FixedHeader{ - PacketType: 2, - RemainingLength: 2, - } - variableHeader := ConnackVariableHeader{SessionPresent: false} - return Connack{fixedHeader, variableHeader} +func (e connectError) Connack() Connack { + return e.connack +} + +func (e connectError) Error() string { + return e.msg +} + +func RefusedByUnacceptableProtocolVersion(s string) ConnectError { + connack := newConnack() + connack.ReturnCode = 1 + return connectError{connack, s} +} + +func RefusedByIdentifierRejected(s string) ConnectError { + connack := newConnack() + connack.ReturnCode = 2 + return connectError{connack, s} }
--- a/study/packet/connect_payload.go +++ b/study/packet/connect_payload.go @@ -5,8 +5,6 @@ import ( "encoding/binary" "io" "regexp" - - "github.com/pkg/errors" ) type ConnectPayload struct { @@ -30,10 +28,10 @@ func ToConnectPayload(r *bufio.Reader) (ConnectPayload, error) { } clientID := string(clientIDBytes) if len(clientID) < 1 || len(clientID) > 23 { - return ConnectPayload{}, errors.New("ClientID length is invalid") + return ConnectPayload{}, RefusedByIdentifierRejected("ClientID length is invalid") } if !clientIDRegex.MatchString(clientID) { - return ConnectPayload{}, errors.New("clientId format shoud be \"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ\"") + return ConnectPayload{}, RefusedByIdentifierRejected("ClientId format shoud be \"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ\"") } return ConnectPayload{ClientID: clientID}, nil }
--- a/study/packet/connect_variable_header.go +++ b/study/packet/connect_variable_header.go @@ -30,11 +30,11 @@ func ToConnectVariableHeader(fixedHeader FixedHeader, r *bufio.Reader) (ConnectV protocolName := make([]byte, 6) _, err := io.ReadFull(r, protocolName) if err != nil || !isValidProtocolName(protocolName) { - return ConnectVariableHeader{}, errors.New("protocol name is invalid") + return ConnectVariableHeader{}, RefusedByUnacceptableProtocolVersion("protocol name is invalid") } protocolLevel, err := r.ReadByte() if err != nil || protocolLevel != 4 { - return ConnectVariableHeader{}, errors.New("protocol level must be 4") + return ConnectVariableHeader{}, RefusedByUnacceptableProtocolVersion("protocol level must be 4") } // TODO
handlerパッケージの変更。返された error
が packet.ConnectError
だった場合は、 Connack
を取得して返すように変更する。
--- a/study/handler/connect_handler.go +++ b/study/handler/connect_handler.go @@ -13,14 +13,18 @@ var variableHeaderLength = 10 func HandleConnect(fixedHeader packet.FixedHeader, r *bufio.Reader) (packet.Connack, error) { variableHeader, err := packet.ToConnectVariableHeader(fixedHeader, r) if err != nil { - // TODO err応じたCONNACKを生成して返す - return packet.NewConnackForRefusedByUnacceptableProtocolVersion(), nil + if ce, ok := err.(packet.ConnectError); ok { + return ce.Connack(), nil + } + return packet.Connack{}, err } payload, err := packet.ToConnectPayload(r) if err != nil { - // TODO err応じたCONNACKを生成して返す - return packet.NewConnackForRefusedByIdentifierRejected(), nil + if ce, ok := err.(packet.ConnectError); ok { + return ce.Connack(), nil + } + return packet.Connack{}, err } // TODO variableHeaderとpayloadを使って何かしらの処理
PUBLISHパケットとDISCONNECTパケット
やりたかったことを思い出す。最初の回に書いたように、まず実現したいのは以下のフロー。
クライアント → Connect Command → サーバー
クライアント ← Connect Ack ← サーバー
クライアント → Publish Message → サーバー
クライアント → Disconnect Req → サーバー
ここまでで、1と2のCONNECT(Connect Command)とCONNACK(Connect Ack)はできた。
次は、PUBLISH(Publish Message)とDISSCONNECT(Disconnect Req)に取り掛かる。
- PUBLISH
- DISCONNECT
PUBLISHパケット
PUBLISHパケットの可変ヘッダー
PUBLISHパケットの可変ヘッダーは以下の情報を持つ。
http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718039
- Topic Name
- Packet Identifier
クライアントはPUBLISHパケットの Topic Name
でトピックを指定してメッセージをサーバー(MQTT Broker)に送る。サーバーは、(まだ未実装だけど)PUBLISHで指定されたトピックをサブスクライブしてるクライアントにメッセージを転送する。
例えば以下のようなイメージ。
引用元:https://www.appcelerator.com/blog/2018/03/api-builder-and-mqtt-for-iot-part-1/
今はサブスクライブしてるクライアントのことは考えてないので、上の図でいうと左側の "temp" というトピックに対して "75°F" というメッセージをPUBLISHしてる部分を実装する。
Topic Name
について以下のような記述がある。Topic Nameのワイルドカードというのは #
と +
の2文字。サブスクライブ時にワイルドカードを指定することで複数のTopicをサブスクライブすることができる。Publishの可変ヘッダーにはこれらの文字を含んではいけない。
The Topic Name in the PUBLISH Packet MUST NOT contain wildcard characters
Packet Identifier
はQoS1かQoS2の場合に使う。今はQoS0固定で考えているので後回し。
可変ヘッダーを実装する。
package packet import ( "bufio" "fmt" "io" "strings" ) type PublishVariableHeader struct { TopicName string PacketIdentifier *uint16 } func ToPublishVariableHeader(fixedHeader FixedHeader, r *bufio.Reader) (PublishVariableHeader, error) { if fixedHeader.PacketType != 3 { return PublishVariableHeader{}, fmt.Errorf("packet type is invalid. it got is %v", fixedHeader.PacketType) } _, err := r.ReadByte() if err != nil { return PublishVariableHeader{}, err } lengthLSB, err := r.ReadByte() if err != nil { return PublishVariableHeader{}, err } if lengthLSB == 0 { return PublishVariableHeader{}, fmt.Errorf("length LSB should be > 0") } topicNameBytes := make([]byte, lengthLSB) _, err = io.ReadFull(r, topicNameBytes) if err != nil { return PublishVariableHeader{}, err } topicName := string(topicNameBytes) if strings.ContainsAny(topicName, "# +") { return PublishVariableHeader{}, fmt.Errorf("topic name must not contain wildcard. it got is %v", topicName) } result := PublishVariableHeader{string(topicNameBytes), nil} return result, nil }
PUBLISHパケットのペイロード
ペイロードは、サブスクライバーに対して送信するメッセージそのもの。 bufio.Reader
をそのまま使うことにする。
PUBLISHパケットに対するレスポンス
QoS0の時はレスポンスなし。
DISCONNECTパケット
DISCONNECTパケットの仕様はこちら
http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718090
DISCONNECTパケットには可変ヘッダーもペイロードもない。
serverとhandler実装
前回のserver.goはCONNECTパケットしか想定していなかった。PUBLISHパケットとDISCONNECTパケットも想定した実装にする。
server.goの実装。 Accept()
でクライアントからの接続を待つ。接続が来て net.Conn
を取得できたら handle()
関数へ渡す。 handle()
関数内でループしてクライアントからのMQTTパケットを受け取り FixedHeader
を生成。パケットタイプによるswitchで処理を分岐し、それぞれのパケットに対応したhandlerを呼ぶ。DISCONNECTパケットの場合はコネクションを切るだけなので return nil
だけして defer
で接続を切る。
package study import ( "bufio" "fmt" "io" "net" "github.com/bati11/oreno-mqtt/study/handler" "github.com/bati11/oreno-mqtt/study/packet" ) func Run() { ln, err := net.Listen("tcp", "localhost:1883") if err != nil { panic(err) } fmt.Println("server starts at localhost:1883") for { conn, err := ln.Accept() if err != nil { panic(err) } err = handle(conn) if err != nil { panic(err) } } } func handle(conn net.Conn) error { defer conn.Close() for { r := bufio.NewReader(conn) fixedHeader, err := packet.ToFixedHeader(r) if err != nil { if err == io.EOF { // クライアント側から既に切断してる場合 return nil } return err } fmt.Printf("-----\n%+v\n", fixedHeader) switch fixedHeader.PacketType { // CONNECT case 1: connack, err := handler.HandleConnect(fixedHeader, r) if err != nil { return err } _, err = conn.Write(connack.ToBytes()) if err != nil { return err } // PUBLISH case 3: err := handler.HandlePublish(fixedHeader, r) if err != nil { return err } // DISCONNECT case 14: return nil } } }
HandleConnect()
は前回から変更なし。
HandlePublish()
は以下のように実装。
package handler import ( "bufio" "fmt" "io" "github.com/bati11/oreno-mqtt/study/packet" ) func HandlePublish(fixedHeader packet.FixedHeader, r *bufio.Reader) error { fmt.Printf(" HandlePublish\n") variableHeader, err := packet.ToPublishVariableHeader(fixedHeader, r) if err != nil { return err } fmt.Printf(" %#v\n", variableHeader) payloadLength := fixedHeader.RemainingLength - variableHeader.Length payload := make([]byte, payloadLength) _, err = io.ReadFull(r, payload) if err != nil { return err } fmt.Printf(" Payload: %v\n", string(payload)) // TODO QoS0なのでレスポンスなし return nil }
mosquittoクライアントからメッセージを送ってみる
サーバーを起動する。
$ go run app/main.go server starts at localhost:1883
Wiresharkを起動しておく。
mosquittoクライアントからpublishする。
$ mosquitto_pub -t hoge -m "Hello"
結果を見てみる。
お、できてそう!サーバーの標準出力も確認。
----- {PacketType:1 Dup:0 QoS1:0 QoS2:0 Retain:0 RemainingLength:28} HandleConnect packet.ConnectVariableHeader{ProtocolName:"MQTT", ProtocolLevel:0x4, ConnectFlags:packet.ConnectFlags{CleanSession:true, WillFlag:true, WillQoS:0x1, WillRetain:false, PasswordFlag:true, UserNameFlag:true}, KeepAlive:0xa} packet.ConnectPayload{ClientID:"custom-client-id"} ----- {PacketType:3 Dup:0 QoS1:0 QoS2:0 Retain:0 RemainingLength:11} HandlePublish packet.PublishVariableHeader{TopicName:"hoge", PacketIdentifier:(*uint16)(nil), Length:0x6} Payload: Hello
ちゃんとPUBLISHパケットを解釈できてる!
これで最初の目標の以下の流れが実現できた。
クライアント → Connect Command → サーバー
クライアント ← Connect Ack ← サーバー
クライアント → Publish Message → サーバー
クライアント → Disconnect Req → サーバー
const
ところで、server.goの fixedHeader.PacketType
の値、マジックナンバーでswitchしてるところを分かりやすくしたい。
switch fixedHeader.PacketType { // CONNECT case 1: connack, err := handler.HandleConnect(fixedHeader, r) if err != nil { return err } _, err = conn.Write(connack.ToBytes()) if err != nil { return err } // PUBLISH case 3: err := handler.HandlePublish(fixedHeader, r) if err != nil { return err } // DISCONNECT case 14: return nil }
マジックナンバーをconstで置き換える。定数化することでコードが読みやすくなる。また、定数は実行時ではなくコンパイル時にコンパイラが最適化してくれて、パフォーマンスが良くなる場合もある。
fixed_header.goに定数を定義。
+const ( + CONNECT byte = 1 + PUBLISH byte = 3 + DISCONNECT byte = 14 +) + type FixedHeader struct { PacketType byte Dup byte
server.goのswitchは以下のようになる。
switch fixedHeader.PacketType { case packet.CONNECT: connack, err := handler.HandleConnect(fixedHeader, r) if err != nil { return err } _, err = conn.Write(connack.ToBytes()) if err != nil { return err } case packet.PUBLISH: err := handler.HandlePublish(fixedHeader, r) if err != nil { return err } case packet.DISCONNECT: return nil }
Untyped constant declaration
定数の型を byte
ではなく int
にするとどうなるか?
const ( CONNECT int = 1 PUBLISH byte = 3 DISCONNECT byte = 14 )
server.goのswitch-caseのところで以下のようなコンパイルエラーになる。fixedHeader.PacketType
の型が byte
なのに、 int
型の定数と比較してるためコンパイルエラー。
server.go:49:3: invalid case packet.CONNECT in switch on fixedHeader.PacketType (mismatched types int and byte)
では、定数の型を 書かない 場合はどうなるか?
const ( CONNECT = 1 PUBLISH byte = 3 DISCONNECT byte = 14 )
これだとコンパイルエラーにならない。 しかも、先程と異なり、 int
と比較してる箇所もコンパイルエラーにならない!
コンパイル時に適切な精度の型として埋め込んでくれるらしい。先ほどの記事ではこれを「 Untyped constant declaration 」と呼んでいる。
必要がない限り、型指定なしでconstを定義した方が良い。
const ( CONNECT = 1 PUBLISH = 3 DISCONNECT = 14 )
iota
constsについて、Effective Goも読んでみると以下のように書いてある。
「 In Go, enumerated constants are created using the iota enumerator. 」
enumというとJavaの列挙型を思い出す。けど、それは一旦置いておいてGoでは iota
という演算子を使うことで定数の定義を簡単かつ柔軟にできる。
PacketTypeの値の定数定義は iota
を使って以下のように書ける。
const ( _ = iota CONNECT CONNACK PUBLISH PUBACK PUBREC PUBREL PUBCOMP SUBSCRIBE SUBACK UNSUBSCRIBE UNSUBACK PINGREQ PINGRESP DISCONNECT )
さらに、型を作ってメソッドを定義すれば、Enum(列挙型)のように使うこともできる。
type PacketType byte const ( _ PacketType = iota CONNECT CONNACK PUBLISH PUBACK PUBREC PUBREL PUBCOMP SUBSCRIBE SUBACK UNSUBSCRIBE UNSUBACK PINGREQ PINGRESP DISCONNECT ) func (v PacketType) String() string { names := [...]string{ "CONNECT", "CONNACK", "PUBLISH", "PUBACK", "PUBREC", "PUBREL", "PUBCOMP", "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK", "PINGREQ", "PINGRESP", "DISCONNECT"} if v < CONNECT || v > DISCONNECT { return "Unknown" } return names[v] }
Enumについては以下の記事が詳しい。
どういう時にEnumが必要なのか?が書いてある。
Why do we need enums?
Grouping and expecting only some related values Sharing common behavior Avoids using invalid values To increase the code readability and the maintainability
また、iotaを使うと定義する順番を間違えると値が変わってしまうため、値に意味があると問題になる場合もある。
いまのところ型や共通のメソッドは不要であるので、Enumはいらない気がする。値を定義する順番については、プロトコルで決まっている値で変更される頻度がほとんどないので気にしないでおく。結果、 iota
を使って型指定なしのconstを定義する形にする。
const ( _ = iota CONNECT CONNACK PUBLISH PUBACK PUBREC PUBREL PUBCOMP SUBSCRIBE SUBACK UNSUBSCRIBE UNSUBACK PINGREQ PINGRESP DISCONNECT )
おしまい
ここまでで、MQTTクライアントからメッセージをPublishするところまでができました。次はいよいよ別のクライアントがSubscribeところに着手します。goroutineをどう使ってクライアントを管理するのかを考えていきます。
今回の学び。
- handlerのエラーハンドリング
- 前々回の記事
- PUBLISHパケットとDISCONNECTパケット
- Goのconstとiota