MQTTサーバーを実装しながらGoを学ぶ - その9 channelとselect
前回はgoroutineを使って、複数クライアントと同時に接続できるようにしました。今回は、複数のgoroutine間をchannelをつないでメッセージやりとりし、Publishされたメッセージを別のSubscribeしているクライアントに配送します。channelの所有権やselectを組み合わせてchannelをより便利に使えるようにします。
目次。
サーバーからクライアントへのPUBLISH
MQTTのPUBLISHの仕様を読み直す。
The Client uses a PUBLISH Packet to send an Application Message to the Server, for distribution to Clients with matching subscriptions. The Server uses a PUBLISH Packet to send an Application Message to each Client which has a matching subscription.
クライアントからPUBLISHパケットを受け取ったサーバーは、マッチするサブスクリプションのクライアントへメッセージを配布する。このとき、サーバーからクライアントへのPUBLISHパケットを送信する。
サブスクリプションとはなんだったというと、SUBSCRIBEの仕様を読み直す。
The SUBSCRIBE Packet is sent from the Client to the Server to create one or more Subscriptions.Each Subscription registers a Client’s interest in one or more Topics. The Server sends PUBLISH Packets to the Client in order to forward Application Messages that were published to Topics that match these Subscriptions.
SUBSCRIBEを送ってきたクライアントに対してサーバーが作成するものでTopicとサブスクライブしてるクライアントを結びつけている。
MQTTじゃないけど、GCPの Cloud PubSub の図が分かりやすいので拝借。
引用元: https://cloud.google.com/pubsub/docs/overview
いつもの以下の図だとサブスクリプションは登場していないけど、上記の通り接続中のサブスクライバがどのTopicのメッセージを購読しているのかはサブスクリプションという概念で管理している。
引用元: https://www.appcelerator.com/blog/2018/03/api-builder-and-mqtt-for-iot-part-1/
というわけで、Publishされたメッセージをサブスクリプションへ配布する処理を実装していく。
channel
今のところサーバーの実装では、クライアントごとにgoroutineを生成している。goroutine間でデータをやりとりするには、channelが使える。
Broker
という関数を作り、パブリッシャから読み込んだメッセージをサブスクライバに書き込む。パブリッシャを <-chan []byte
と表現して、サブスクライバを chan<- []byte
と表現する。channelからの値の取り出しには range
を使う。 range
はchannelが閉じられるまで値を取り出し続ける(もしくは値が来るまでブロックして待つ)。
func Broker(fromPub <-chan []byte, toSub chan<- []byte) { defer close(toSub) for message := range fromPub { toSub <- message } }
channelは読み取り専用もしくは書き込み専用にすることができる。これを利用してchannelの所有権を持つgoroutineを決めよう、ということが 「Go言語による並行処理」に書かれている。
チャネルの所有権を割り振ることです。ここでは所有権を、チャネルを初期化し、書き込み、閉じるゴルーチンとして定義します。
channelの所有権を持つgoroutineと、channelを利用するだけのgoroutineに分けて考える。
どのゴルーチンがチャネルを所有しているかをはっきりさせることが重要です。単方向チャネルを宣言することはチャネルを所有するゴルーチンとチャネルを利用するだけのゴルーチンを区別できるようにするための道具です。チャネルを所有しているゴルーチンにはチャネルに対する書き込み権限(chanまたはchan<-)があり、チャネルを利用するだけのゴルーチンには読み込み専用権限(<-chan)しかありません。
今の実装では、 Broker
関数は toSub
に書き込むので所有権を持つことになる。所有権があるので、 defer
で close
するようにしてる。
所有権を意識すると何が嬉しいのか?channelにはpanicを発生させる操作がある。
- nilに対してclose
- closeしたchannelをclose
この2つは、channelの所有者だけが書き込みとcloseを行うことで防ぎやすくなる。
channelの利用者は読み取り時にchannelが閉じられている可能性があるので、 <-
で読み取る時は2つ目の返り値の値を確認するか、 range
で読み込むようにする。 <-
で値を取り出した時の2つ目の返り値はboolで、channelが閉じられてる場合はfalseになる。
また、chanelが閉じられておらず、値もない場合は、読取り時にブロックすることを考慮する。
channelの所有者を意識しながら、 Broker
を使うコードを書いてみる。
package mqtt_test import ( "testing" "github.com/bati11/oreno-mqtt/mqtt" ) func TestBroker(t *testing.T) { pub := make(chan []byte) sub := make(chan []byte) // "broker" goroutine go mqtt.Broker(pub, sub) // "pub" goroutine go func() { defer close(pub) pub <- []byte("hoge") // クライアントからのPUBLISH }() // "sub" result, ok := <-sub // BrokerからPUBLISHメッセージが配送されてくる if !ok { t.Fatalf("got %v, want true", ok) } if string(result) != "hoge" { t.Fatalf("got %v, want \"hoge\"", string(result)) } }
"broker" goroutineは sub
channelに対して書き込みと close
を行うので sub
channelの所有者であり、 pub
channelの利用者である。
"pub" goroutineは pub
channelの所有者である。
"sub"以降の処理(main goroutine)は、 sub
channelの利用者である。値を読み込むのは1回だけなので range
ではなく <-
で読み取りをしている。
channelは値
Broker
を実行するときに sub
channelを渡している。 sub
channel、つまり Broker
が引数で受け取ってる chan<- []byte
は、メッセージの書き込み先なのでMQTTのサブスクリプションに該当する。
しかし、サブスクリプションは、この記事の最初の方で見た通り、サーバーがクライアントからSUBSCRIBEパケットを受け取ったタイミングで作られる。なので Broker
goroutine生成時ではなく、後からgoroutineに chan<- []byte
を渡したい。
Broker
の引数を chan<- []byte
のchannel、つまり <-chan chan<- []byte
とする。channelはファーストクラスな値なので、channelのchannelや、channelの配列というようなこともできる。
channelのchannelを使うとgoroutine起動後に chan<- []byte
を受け取ることができる。
func Broker(fromPub <-chan []byte, subscriptions <-chan chan<- []byte) { for sub := range subscriptions { ...
Publishされたメッセージを全てのサブスクリプションに配送しないといけないので、サブスクリプションを管理しておく必要がある。新たなサブスクリプション( chan<- []byte
)をchannelから取得したら、channelの配列に追加するようにする。これはサブスクリプションの配列を表す。
func Broker(fromPub <-chan []byte, subscriptions <-chan chan<- []byte) { // サブスクリプションの配列 var ss []chan<- []byte for sub := range subscriptions { // channelからサブスクリプション(chan<- []byte)を読み取ったら配列に追加 ss = apend(ss, sub) } ... }
fromPub
channelからメッセージを読み取ったら、サブスクリプションの配列を range
を使って順にメッセージを送る。以下のようなイメージ。channelに対する range
と配列に対する range
がごちゃ混ぜにならないように注意。
func Broker(fromPub <-chan []byte, subscriptions <-chan chan<- []byte) { // サブスクリプションの配列 var ss []chan<- []byte for sub := range subscriptions { // channelからサブスクリプション(chan<- []byte)を読み取ったら配列に追加 ss = apend(ss, sub) } for message := range fromPub { // fromPub channelからメッセージを読み取ったら全てのサブスクリプションへ配送 for _, sub := range ss { sub <- message } } }
しかし、これだと subscriptions
channelが閉じられるまで1つ目の for
が終わらないのでダメ。
select
複数のチャネルから読み取りたいときは、select
を使う。
func Broker(fromPub <-chan []byte, subscriptions <-chan chan<- []byte) { // サブスクリプションの配列 var ss []chan<- []byte for { select { case sub := <- subscriptions: // channelからサブスクリプション(chan<- []byte)を読み取ったら配列に追加 ss = append(ss, sub) case message := <- fromPub: // fromPub channelからメッセージを読み取ったら全てのサブスクリプションへ配送 for _, sub := range ss { sub <- message } } } }
TestBroker
を書き換えてみる。
func TestBroker(t *testing.T) { pub := make(chan []byte) subscriptions := make(chan chan<- []byte) defer close(subscriptions) // "broker" goroutine go mqtt.Broker(pub, subscriptions) // クライアントからのSUBSCRIBE sub := make(chan []byte) subscriptions <- sub // 新たなサブスクリプションをBrokerに送る // "pub" goroutine go func() { defer close(pub) pub <- []byte("hoge") // クライアントからのPUBLISH }() // "sub" result, ok := <-sub // BrokerからPUBLISHメッセージが配送されてくる if !ok { t.Fatalf("got %v, want true", ok) } if string(result) != "hoge" { t.Fatalf("got %v, want \"hoge\"", string(result)) } }
これでOK!複数クライアントがSubscribeしても問題ないことを確認。
func TestBroker(t *testing.T) { pub := make(chan []byte) subscriptions := make(chan chan<- []byte) defer close(subscriptions) // "broker" goroutine go mqtt.Broker(pub, subscriptions) // クライアントからのSUBSCRIBE(1つ目) sub1 := make(chan []byte) subscriptions <- sub1 // クライアントからのSUBSCRIBE(2つ目) sub2 := make(chan []byte) subscriptions <- sub2 // "pub" goroutine go func() { defer close(pub) pub <- []byte("hoge") // クライアントからのPUBLISH }() // "sub" result1, ok := <-sub1 // 1つ目のサブスクリプションに対してBrokerからPUBLISHメッセージが配送されてくる if !ok { t.Fatalf("got %v, want true", ok) } if string(result1) != "hoge" { t.Fatalf("got %v, want \"hoge\"", string(result1)) } result2, ok := <-sub2 // 2つ目のサブスクリプションに対してもBrokerからPUBLISHメッセージが配送されてくる if !ok { t.Fatalf("got %v, want true", ok) } if string(result2) != "hoge" { t.Fatalf("got %v, want \"hoge\"", string(result2)) } }
sync.WaitGroup
ところで、"sub"以降の処理で、 sub1
, sub2
という順番にchannelから読み込んでるが、 Broker
内でサブスクリプション( chan<- []byte
)にメッセージを流す順番に依存してる。仮に sub2
, sub1
と順番を逆にして読み取るとブロックしてテストが終了しない。
sub1
と sub2
を select
で読み取るようにしても良いけど、それぞれのchannelから1回ずつしかメッセージを読まないので、それぞれgoroutineで実行することにする。
sub1
, sub2
からの読み取りをそれぞれ別goroutineで実行するように変更するだけだと、subからの読み取りが並行で行われるためgoroutineが切り替わらずに TestBroker
自体がすぐに終わってしまい、subからの読み込みのテストができない。そこで、 sync.WaitGroup
を使ってそれぞれのgoroutineの終了を待つ。
func TestBroker(t *testing.T) { pub := make(chan []byte) subscriptions := make(chan chan<- []byte) defer close(subscriptions) // "broker" goroutine go mqtt.Broker(pub, subscriptions) // クライアントからのSUBSCRIBE(1つ目) sub1 := make(chan []byte) subscriptions <- sub1 // クライアントからのSUBSCRIBE(2つ目) sub2 := make(chan []byte) subscriptions <- sub2 // "pub" goroutine go func() { defer close(pub) pub <- []byte("hoge") // クライアントからのPUBLISH }() // "sub" wg := sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() result1, ok := <-sub1 // 1つ目のサブスクリプションに対してBrokerからPUBLISHメッセージが配送されてくる if !ok { t.Fatalf("got %v, want true", ok) } if string(result1) != "hoge" { t.Fatalf("got %v, want \"hoge\"", string(result1)) } }() wg.Add(1) go func() { defer wg.Done() result2, ok := <-sub2 // 2つ目のサブスクリプションに対してもBrokerからPUBLISHメッセージが配送されてくる if !ok { t.Fatalf("got %v, want true", ok) } if string(result2) != "hoge" { t.Fatalf("got %v, want \"hoge\"", string(result2)) } }() wg.Wait() }
これで1つのメッセージを複数のサブスクリプションに配送する処理ができた。
おしまい
MQTTブローカーをgoroutine, channel, selectを使って実装してみました。この3つを使って色々なパターンを実装できそうです。しかも、GoのランタイムがgoroutineをOSのスレッドにいい感じに割り当ててくれます。
でも、どっかのgoroutineでエラーが起きた時はどうすればいいの?というところを次回はやっていきたいなぁ。
今回の学び
- MQTTのサブスクリプション
- channel
- select
- sync.WaitGroup
MQTTサーバーを実装しながらGoを学ぶ - その8 goroutine, pprof
前回までで、クライアントがPUBLISH、またはSUBSCRIBEすることができるようになりました。次は、クライアントからPUBLISHされたメッセージを、SUBSCRIBEしている別のクライアントへ送る処理を実装していきます。が、その前にgoroutineを使ってサーバーが複数クライアントと同時に接続できるようにします。
目次。
複数クライアントからの接続
クライアントが接続した状態で、もう1つ別のクライアントから接続してみる。例えば、Subscribeしてるクライアントが存在している状態で、Publishしてみる。
$ mosquitto_pub -i sample-publisher -t hoge -m "Hello" Error: Unknown error.
すぐには何も出力されず、しばらく放っておくと上記の Error: Unknown error.
が出力される。まだサーバーが複数クライアントを受けられるように実装していないから、タイムアウトする。
具体的にコードで確認する。
// server.go func Run() { ln, err := net.Listen("tcp", "localhost:1883") if err != nil { panic(err) } fmt.Println("server starts at localhost:1883") for { conn, err := ln.Accept() if err != nil { panic(err) } err = handle(conn) if err != nil { fmt.Printf("%v", err) panic(err) } } } func handle(conn net.Conn) error { defer conn.Close() for { r := bufio.NewReader(conn) ... } }
最初のクライアントから接続が来ると、 Accept()
が net.Conn を返してきてhandle内で処理する。handle内では for { }
でループすることで、クライアントとやりとりする。
この時、2つめのクライアントが接続してきてもサーバーは、handle内の for { }
でループしているので、 Accept()
が処理されない。そのため、1つのクライアントしか捌けない。
並行処理を使って複数のクライアントを捌くようにすれば良い。Goではgoroutineを使って並行処理を実現する。
1つのクライアントを1つのgoroutineで取り扱うようにする。
--- a/mqtt/server.go +++ b/mqtt/server.go @@ -23,11 +23,13 @@ func Run() { panic(err) } - err = handle(conn) - if err != nil { - fmt.Printf("%v", err) - panic(err) - } + go func(conn net.Conn) { + err = handle(conn) + if err != nil { + fmt.Printf("%v", err) + panic(err) + } + }(conn) } }
goroutine
以前、Goでechoサーバーを書いたときにもgoroutineを使った。goroutineは並行処理を実現したいときに生成するが、このとき生成されるのはGoランタイム上のgoroutineでありOSのスレッドではない。
goroutineの特徴を ASCII.jp:Go言語と並列処理(2) から引用させていただく。
スレッドがCPUコアに対してマッピングされるのに対し、goroutineはOSのスレッド(Go製のアプリケーションから見ると1つの仮想CPU)にマッピングされます。 この点が、通常のスレッドとGo言語の軽量スレッドであるgoroutineとの最大の違いです。
両者にはほかにも次のような細かい違いがあります。
・OSスレッドはIDを持つが、goroutineは指定しなければ実際のどのスレッドにマッピングされるかは決まっておらず、IDなども持たない
・OSスレッドの1〜2MBと比べると、初期スタックメモリのサイズが小さく(2KB)、起動処理が軽い
・優先度を持たない
・タイムスライスで強制的に処理が切り替わることがないが、コンパイラが「ここで処理を切り替える」という切り替えポイントを埋め込むことで切り替えを実現している
・(IDで一意にgoroutineで特定できないため)外部から終了のリクエストを送る仕組みがない
Goのランタイムがどのようにgoroutineを切り替えているかは以下の記事が参考になります。
こちらの本の「6章 ゴルーチンとGoランタイム」にも丁寧に書かれていた。
pprof
goroutineがOSのスレッドと異なることは分かった。じゃあ、今goroutineがいくつ作られていてどういう状態なのかはどうやって調べれば良いのだろう?OSのスレッドではないのでpsコマンドなどでは確認できない。
pprofというのがGoには用意されている。これを使うとGoで書いたアプリケーションのプロファイリングができる。
pprofを使うとCPUプロファイリング以外にも色々できる。以下の連載記事が分かりやすかった。
- Go pprof 入門編 (CPU Profile とコマンドラインツール) : KLabGames Tech Blog
- Go pprof 応用編 (CPU 以外のプロファイル) : KLabGames Tech Blog
- Go pprof マスター編 (pprof の仕組み) : KLabGames Tech Blog
pprofを使ってgoroutineの情報をダンプしてみる。準備は以下のように import _ "net/http/pprof"
してHTTPサーバーを起動するだけ。
package main -import "github.com/bati11/oreno-mqtt/mqtt" +import ( + "log" + "net/http" + _ "net/http/pprof" + + "github.com/bati11/oreno-mqtt/mqtt" +) func main() { + go func() { + log.Println(http.ListenAndServe("localhost:6060", nil)) + }() mqtt.Run() }
http://localhost:6060/debug/pprof/goroutine?debug=1 でgoroutineのダンプが取得できる。簡単!
さらに、Go1.10以降ではGUIでグラフを確認することもできる。
以下のコマンドを実行すると
$ go tool pprof -http=":8888" http://localhost:6060/debug/pprof/goroutine?debug=1
ブラウザが立ち上がり、こんなグラフが確認できる!
おしまい
goroutineを使って複数クライアントと同時に接続することができるようになりました。次回は、パブリッシャと接続してるgoroutineからサブスクライバと接続してるgoroutineへメッセージを送り、パブリッシャからサブスクライバへのメッセージ配信を実現します。goroutine間のメッセージ送信は、Goのchannelを使って実装していきます。
今回の学び
- goroutine
- pprof
MQTTサーバーを実装しながらGoを学ぶ - その7 type switch, interface再考, プライベートな関数のテスト
前回で、クライアントからのPUBLISHを受け取るところまで実装できました。今回はSUBSCRIBEを受け取るところを実装します。いつも引用してる図で言うと、右側の「Computer」や「Mobile device」からMQTT Brokerへの通信です。
引用元:https://www.appcelerator.com/blog/2018/03/api-builder-and-mqtt-for-iot-part-1/
サブスクライブしているクライアントにパブリッシュされたメッセージを送るところまでやりたかったのですが、Goのinterfaceに対する考え方 "Accept interfaces, Return structs" について考えていたらそこまでいきませんでした・・・。
目次。
- SUBSCRIBE
- CONNECTとCONNACK
- SUBSCRIBEとSUBACK
- "Accept interfaces, Return structs"
- 改めてSUBSCRIBEとSUBACK
- SUBSCRIBE, SUBACKを試してみる
- PINGREQ, PINGRESP
- おしまい
SUBSCRIBE
SUBSCRIBEパケットを見てみる
実装に入る前に、mosquittoを使ってSUBSCRIBEパケットを眺めてみる。まずはWiresharkを起動。その後、mosquittoサーバーを起動。
$ /usr/local/sbin/mosquitto -v
mosquittoクライアントでサブスクライブする。-i
で指定しているのは Client Identifier。 -t
で指定しているのはトピック名。
$ mosquitto_sub -i sample-subscriber -t hoge
Wiresharkを見てみる。
ふむふむ。クライアントからサーバーへ Subscribe Requestが送られ、サーバーからクライアントへSubscribe Ackを応答してる。
しばらく放っておくと...pingしてる。
実装する通信の流れ
以下の通信を実現できるようにMQTTサーバーを実装していく。
- クライアント → CONNECT → サーバー
- クライアント ← CONNACK ← サーバー
- クライアント → SUBSCRIBE → サーバー
- クライアント ← SUBACK ← サーバー
以下も実装する。
- クライアント → PINGREQ → サーバー
- クライアント ← PINGRESP ← サーバー
CONNECTとCONNACK
実装済み。
SUBSCRIBEとSUBACK
固定ヘッダーの仕様を勘違いしていたことに気がつく...
SUBSCRIBEパケットを実装する。
SUBSCRIBEパケットは固定ヘッダー、可変ヘッダー、ペイロードで構成される。
固定ヘッダーの仕様を読んでると、大きな勘違いをしてることに気がついた・・・。
Bits 3,2,1 and 0 of the fixed header of the SUBSCRIBE Control Packet are reserved and MUST be set to 0,0,1 and 0 respectively. The Server MUST treat any other value as malformed and close the Network Connection
固定ヘッダーの3,2,1,0ビットは予約されてる。以下のようになってるらしい。
Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|---|
byte1 | MQTT Control Packet type (8) | Reserved | ||||||
1 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | |
byte2... | Remaining Length |
ふむふむ。つまり、固定ヘッダーの Dup
QoS
Retain
フィールドが定数なのかな、と思ったけれども・・・。
あれ?と思って改めて固定ヘッダーの仕様を見てみる。
Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|---|
byte1 | MQTT Control Packet type | Flags specific to each MQTT Control Packet type | ||||||
byte2... | Remaining Length |
3,2,1,0ビット目は Flags specific to each MQTT Control Packet type
って書いてある!!!パケットタイプによって意味が変わるのか!
interfaceとtype switchesを使った修正方針
今の固定ヘッダーの実装は以下のようになっている。 Dup
QoS
Retain
フィールドがあるので、これは PUBLISH
パケット用の固定ヘッダーだ。 PUBLISH
用の固定ヘッダーを全てのパケットタイプで使うような実装をしていた・・・。
type FixedHeader struct { PacketType byte Dup byte QoS1 byte QoS2 byte Retain byte RemainingLength uint }
仕様を改めて読み直すと、 PUBLISH
パケットのみ3,2,1,0ビットを使っており、他のパケットタイプは全てReservedであり実質使われていない。
修正方針としては以下のようにする。
- 既存のFixedHeaderをinterfaceにする
- そして、sturctを
DefaultFixedHeader
とPublishFixedHeader
に分ける - server.goでは以下のコードのようにtype switchesを使って分岐する
fixedHeader, err := packet.ToFixedHeader(r) ... switch fh := fixedHeader.(type) { case PublishFixedHeader: err := handler.HandlePublish(fh, r) ... case packet.DefaultFixedHeader: switch fh.PacketType { case packet.CONNECT: ... }
この書き方は、以前エラーハンドリングの部分でやったError Typeパターンと同じ考え方になる。
と、思ったけどちょっと考え直す。
"Accept interfaces, Return structs"
Goには "Accept interfaces, Return structs" という言葉がある。関数の引数をinterfaceにして返り値はstructにしよう、という考え方。
なぜか?
以下の記事のまとめにこう書いてある。
The primary reasons listed are:
- Remove unneeded abstractions
- Ambiguity of a user’s need on function input
- Simplify function inputs
本文を読んで自分なりにまとめると
- Goでは、interfaceを満たすために明示的にimplementsというような記述をする必要はない。メソッドのシグニチャさえ合っていれば適合するinterfaceとして扱うことができる。そのため、本当に必要になるまでinterfaceを使って抽象化した型を作らなくて良い。呼び出される関数側で抽象化が必要なければstructを返せばいい
- 関数を利用する側の都合を全て把握するのは難しい。そのため引数で受け取るのはinterfaceで抽象化しておくと良い。そうすれば関数を利用する側はinterfaceを満たす色々なstructを渡せるようになる
- 関数は利用しない引数を受け取るべきではない。同様に、利用しない振る舞い(メソッド)はいらないので、最小のinterfaceを受け取るようにするべき(インタフェース分離の法則)
さっき自分で立てた方針は異なっている。 packet.ToFixedHeader()
がinterfaceを返し、 handler.HandlePublish()
がstructを引数にとるという "Accept interfaces, Return structs" とは逆の構造。
fixedHeader, err := packet.ToFixedHeader(r) ... switch fh := fixedHeader.(type) { case PublishFixedHeader: err := handler.HandlePublish(fh, r) ... case packet.DefaultFixedHeader: switch fh.PacketType { case packet.CONNECT: ... }
先ほどの記事の最後にも書いてあるが、関数が複数の型を返す場合はinterfaceを返すしかないし、structの振る舞い(関数)ではなくフィールドの値が必要な場合は、structとして受け取る必要がある。そのため、現在の実装は "Accept interfaces, Return structs" に従っていないが、それでも良い。
しかし、関数が複数の型を返したいという理由でinterfaceを返して、呼び出し元がtype switchで処理を分岐する、というのがGoのinterfaceの使い方として微妙な気がしてきた。interfaceはあくまで振る舞いを定義するためのもので、型をまとめるためのものではないのでは(じゃあなぜtype switchという構文があるの?というのもあるけど)。
抽象化しない
もっと愚直に実装してみよう。
やりたいことはPacketTypeがPUBLISHかどうかで、FixedHeaderの構造を変えたいだけ。
packetType := PacketType(r) if packetType == PUBLISH { publishFixedHeader := ToPublishFixedHeader(r) ... } else { defaultFixedHeader := ToDefaultFixedHeader(r) ... }
こんな感じのコードが書ければいい。ただ、packetTypeを取得するには1バイトreaderから読み取る必要があって、この1バイトにはPacketType以外の情報も含まれている。そのため、readerを1バイト分巻き戻すか、取得した1バイトをFixedHeaderを生成する関数に渡す必要がある。
bufio.Readerを含んだMQTTReaderという独自のstructを用意することにしよう。このstructに読み込んだ1バイトとreaderとを保持させる。
type MQTTReader struct { byte1 *byte r *bufio.Reader } func NewMQTTReader(r io.Reader) *MQTTReader { bufr := bufio.NewReader(r) return &MQTTReader{r: bufr} } func (d *MQTTReader) ReadPacketType() (PacketType, error) { if d.byte1 == nil { byte1, err := d.r.ReadByte() if err != nil { return PacketType(0), err } d.byte1 = &byte1 } return PacketType(*d.byte1 >> 4), nil }
server.goでは以下のように分岐する。
r := bufio.NewReader(conn) mqttReader := packet.NewMQTTReader(r) packetType, err := mqttReader.ReadPacketType() ... if packetType == packet.PUBLISH { fixedHeader, err := packet.ToPublishFixedHeader(mqttReader) ... err = handler.HandlePublish(fixedHeader, r) ... } else { fixedHeader, err := packet.ToFixedHeader(mqttReader) ... switch packetType { case packet.CONNECT: connack, err := handler.HandleConnect(fixedHeader, r) ... }
ゴリゴリ変更。差分はこちら
今の実装だと
- server.goでFixedHeaderを取得
- handlerで
- VariableHeaderとPayloadを取得
- なんらかの処理
- レスポンス生成
という流れになってる。readerから FixedHeader
と VariableHeader
と Palyload
を保持するstruct(例えば Connect
とか)を作成してパケットを表現した方が良さそう。
server.go
r := bufio.NewReader(conn) mqttReader := packet.NewMQTTReader(r) packetType, err := mqttReader.ReadPacketType() if err != nil { if err == io.EOF { return nil } return err } switch packetType { case packet.PUBLISH: err = handler.HandlePublish(mqttReader) if err != nil { return err } case packet.CONNECT: connack, err := handler.HandleConnect(mqttReader) if err != nil { return err } _, err = conn.Write(connack.ToBytes()) if err != nil { return err } case packet.DISCONNECT: return nil }
例えば、CONNECTパケットなら以下のような実装をする。
// packet/connect.go package packet type Connect struct { FixedHeader *FixedHeader VariableHeader *ConnectVariableHeader Payload *ConnectPayload } func (reader *MQTTReader) ReadConnect() (*Connect, error) { fixedHeader, err := reader.readFixedHeader() if err != nil { return nil, err } variableHeader, err := reader.readConnectVariableHeader() if err != nil { return nil, err } payload, err := reader.readConnectPayload() if err != nil { return nil, err } return &Connect{fixedHeader, variableHeader, payload}, nil } // handler/connect_handle.go package handler func HandleConnect(reader *packet.MQTTReader) (packet.Connack, error) { fmt.Printf("HandleConnect\n") connect, err := reader.ReadConnect() if err != nil { if ce, ok := err.(packet.ConnectError); ok { return ce.Connack(), nil } return packet.Connack{}, err } // TODO variableHeaderとpayloadを使って何かしらの処理 fmt.Printf(" %#v\n", connect.VariableHeader) fmt.Printf(" %#v\n", connect.Payload) return packet.NewConnackForAccepted(), nil }
良い感じ、なんだけどテストで困った。
プライベートな関数のテスト
FixedHeaderを生成するのがパブリックな packet.ToPublishFixedHeader()
からプライベートな readFixedHeader()
というプライベートなメソッドに変更したので、 packet_test
という別パッケージから参照することができず、テストできなくなってしまった。
プライベートな関数などのテストをするパターンがある。
まず、packet/export_test.goというファイルを用意して、テストしたいプライベートメソッドをパブリックにする。
package packet var ExportReadPublishFixedHeader = (*MQTTReader).readPublishFixedHeader
structの実態がないメソッドを変数にセットして、あとからレシーバーを渡してあげる。こんなことができたんですねー、Method valuesというらしい。
あとは、fixed_header_test.goからパブリックに ExportReadPublishFixeder
を呼ぶように書き換えてテストすれば良い。
- got, err := packet.ToPublishFixedHeader(tt.args.r) + got, err := packet.ExportReadPublishFixedHeader(tt.args.r)
なるほどー。export_test.goという _test
というファイル名なのでテストの時のみビルド対象に含まれるため、本番ビルドには影響がないのでパブリックにしても良いでしょう、と。
MQTTReader
に実装を寄せていった差分はこちら
改めてSUBSCRIBEとSUBACK
SUBSCRIBEパケットの仕様。
- 固定ヘッダー
- PacketType
8
- Reservedの部分は 0010 である。チェックして違ったら接続を切る
- PacketType
- 可変ヘッダー
- Packet Identifer 2byte
- Packet Identiferは、PUBLISHパケットの可変ヘッダーにもあった。ただし、PUBLISHパケットは省略可能だったのに対して、SUBSCRIBEパケットでは必須
- ペイロード
ペイロードの仕様を読むと、1回のSUBSCRIBEパケットで複数のトピックをサブスクライブでき、なおかつそれぞれQoSの指定ができるようだ。今のところQoSのことは無視して実装してきてるので後で考えることにする。
実装したものはこちら
SUBSCRIBE, SUBACKを試してみる
よし、試すぞー。
実装してるサーバーを起動。
$ go run app/main.go` server starts at localhost:1883
SUBSCRIBEを送信。
$ mosquitto_sub -i custom-client-id -t hoge
Wiresharkで見てみる。
ちゃんと動いてるー!!
これで以下の通信が実現できた。
- クライアント → CONNECT → サーバー
- クライアント ← CONNACK ← サーバー
- クライアント → SUBSCRIBE → サーバー
- クライアント ← SUBACK ← サーバー
PINGREQ, PINGRESP
次は以下の通信。
- クライアント → PINGREQ → サーバー
- クライアント ← PINGRESP ← サーバー
PINGREQ, PINGRESP共に固定ヘッダーのみ。可変ヘッダーとペイロードはなし。
実装したものはこちら
試してみる。SUBSCRIBEを送信。
$ mosquitto_sub -i custom-client-id -t hoge
その後、接続したまま放っておくと・・・
できました!!
おしまい
クライアントがSubscribeするところまでできました。次回こそは、goroutineを使って複数クライアントを捌き、PublishされたメッセージをSubscribeしてるクライアントに送信する実装をします。
今回の学び。
- interfaceとtype switch
- "Accept interfaces, Return structs"
- プライベートな関数などのテスト