前回はgoroutineを使って、複数クライアントと同時に接続できるようにしました。今回は、複数のgoroutine間をchannelをつないでメッセージやりとりし、Publishされたメッセージを別のSubscribeしているクライアントに配送します。channelの所有権やselectを組み合わせてchannelをより便利に使えるようにします。
目次。
サーバーからクライアントへのPUBLISH
MQTTのPUBLISHの仕様を読み直す。
The Client uses a PUBLISH Packet to send an Application Message to the Server, for distribution to Clients with matching subscriptions. The Server uses a PUBLISH Packet to send an Application Message to each Client which has a matching subscription.
クライアントからPUBLISHパケットを受け取ったサーバーは、マッチするサブスクリプションのクライアントへメッセージを配布する。このとき、サーバーからクライアントへのPUBLISHパケットを送信する。
サブスクリプションとはなんだったというと、SUBSCRIBEの仕様を読み直す。
The SUBSCRIBE Packet is sent from the Client to the Server to create one or more Subscriptions.Each Subscription registers a Client’s interest in one or more Topics. The Server sends PUBLISH Packets to the Client in order to forward Application Messages that were published to Topics that match these Subscriptions.
SUBSCRIBEを送ってきたクライアントに対してサーバーが作成するものでTopicとサブスクライブしてるクライアントを結びつけている。
MQTTじゃないけど、GCPの Cloud PubSub の図が分かりやすいので拝借。
引用元: https://cloud.google.com/pubsub/docs/overview
いつもの以下の図だとサブスクリプションは登場していないけど、上記の通り接続中のサブスクライバがどのTopicのメッセージを購読しているのかはサブスクリプションという概念で管理している。
引用元: https://www.appcelerator.com/blog/2018/03/api-builder-and-mqtt-for-iot-part-1/
というわけで、Publishされたメッセージをサブスクリプションへ配布する処理を実装していく。
channel
今のところサーバーの実装では、クライアントごとにgoroutineを生成している。goroutine間でデータをやりとりするには、channelが使える。
Broker
という関数を作り、パブリッシャから読み込んだメッセージをサブスクライバに書き込む。パブリッシャを <-chan []byte
と表現して、サブスクライバを chan<- []byte
と表現する。channelからの値の取り出しには range
を使う。 range
はchannelが閉じられるまで値を取り出し続ける(もしくは値が来るまでブロックして待つ)。
func Broker(fromPub <-chan []byte, toSub chan<- []byte) { defer close(toSub) for message := range fromPub { toSub <- message } }
channelは読み取り専用もしくは書き込み専用にすることができる。これを利用してchannelの所有権を持つgoroutineを決めよう、ということが 「Go言語による並行処理」に書かれている。
チャネルの所有権を割り振ることです。ここでは所有権を、チャネルを初期化し、書き込み、閉じるゴルーチンとして定義します。
channelの所有権を持つgoroutineと、channelを利用するだけのgoroutineに分けて考える。
どのゴルーチンがチャネルを所有しているかをはっきりさせることが重要です。単方向チャネルを宣言することはチャネルを所有するゴルーチンとチャネルを利用するだけのゴルーチンを区別できるようにするための道具です。チャネルを所有しているゴルーチンにはチャネルに対する書き込み権限(chanまたはchan<-)があり、チャネルを利用するだけのゴルーチンには読み込み専用権限(<-chan)しかありません。
今の実装では、 Broker
関数は toSub
に書き込むので所有権を持つことになる。所有権があるので、 defer
で close
するようにしてる。
所有権を意識すると何が嬉しいのか?channelにはpanicを発生させる操作がある。
- nilに対してclose
- closeしたchannelをclose
この2つは、channelの所有者だけが書き込みとcloseを行うことで防ぎやすくなる。
channelの利用者は読み取り時にchannelが閉じられている可能性があるので、 <-
で読み取る時は2つ目の返り値の値を確認するか、 range
で読み込むようにする。 <-
で値を取り出した時の2つ目の返り値はboolで、channelが閉じられてる場合はfalseになる。
また、chanelが閉じられておらず、値もない場合は、読取り時にブロックすることを考慮する。
channelの所有者を意識しながら、 Broker
を使うコードを書いてみる。
package mqtt_test import ( "testing" "github.com/bati11/oreno-mqtt/mqtt" ) func TestBroker(t *testing.T) { pub := make(chan []byte) sub := make(chan []byte) // "broker" goroutine go mqtt.Broker(pub, sub) // "pub" goroutine go func() { defer close(pub) pub <- []byte("hoge") // クライアントからのPUBLISH }() // "sub" result, ok := <-sub // BrokerからPUBLISHメッセージが配送されてくる if !ok { t.Fatalf("got %v, want true", ok) } if string(result) != "hoge" { t.Fatalf("got %v, want \"hoge\"", string(result)) } }
"broker" goroutineは sub
channelに対して書き込みと close
を行うので sub
channelの所有者であり、 pub
channelの利用者である。
"pub" goroutineは pub
channelの所有者である。
"sub"以降の処理(main goroutine)は、 sub
channelの利用者である。値を読み込むのは1回だけなので range
ではなく <-
で読み取りをしている。
channelは値
Broker
を実行するときに sub
channelを渡している。 sub
channel、つまり Broker
が引数で受け取ってる chan<- []byte
は、メッセージの書き込み先なのでMQTTのサブスクリプションに該当する。
しかし、サブスクリプションは、この記事の最初の方で見た通り、サーバーがクライアントからSUBSCRIBEパケットを受け取ったタイミングで作られる。なので Broker
goroutine生成時ではなく、後からgoroutineに chan<- []byte
を渡したい。
Broker
の引数を chan<- []byte
のchannel、つまり <-chan chan<- []byte
とする。channelはファーストクラスな値なので、channelのchannelや、channelの配列というようなこともできる。
channelのchannelを使うとgoroutine起動後に chan<- []byte
を受け取ることができる。
func Broker(fromPub <-chan []byte, subscriptions <-chan chan<- []byte) { for sub := range subscriptions { ...
Publishされたメッセージを全てのサブスクリプションに配送しないといけないので、サブスクリプションを管理しておく必要がある。新たなサブスクリプション( chan<- []byte
)をchannelから取得したら、channelの配列に追加するようにする。これはサブスクリプションの配列を表す。
func Broker(fromPub <-chan []byte, subscriptions <-chan chan<- []byte) { // サブスクリプションの配列 var ss []chan<- []byte for sub := range subscriptions { // channelからサブスクリプション(chan<- []byte)を読み取ったら配列に追加 ss = apend(ss, sub) } ... }
fromPub
channelからメッセージを読み取ったら、サブスクリプションの配列を range
を使って順にメッセージを送る。以下のようなイメージ。channelに対する range
と配列に対する range
がごちゃ混ぜにならないように注意。
func Broker(fromPub <-chan []byte, subscriptions <-chan chan<- []byte) { // サブスクリプションの配列 var ss []chan<- []byte for sub := range subscriptions { // channelからサブスクリプション(chan<- []byte)を読み取ったら配列に追加 ss = apend(ss, sub) } for message := range fromPub { // fromPub channelからメッセージを読み取ったら全てのサブスクリプションへ配送 for _, sub := range ss { sub <- message } } }
しかし、これだと subscriptions
channelが閉じられるまで1つ目の for
が終わらないのでダメ。
select
複数のチャネルから読み取りたいときは、select
を使う。
func Broker(fromPub <-chan []byte, subscriptions <-chan chan<- []byte) { // サブスクリプションの配列 var ss []chan<- []byte for { select { case sub := <- subscriptions: // channelからサブスクリプション(chan<- []byte)を読み取ったら配列に追加 ss = append(ss, sub) case message := <- fromPub: // fromPub channelからメッセージを読み取ったら全てのサブスクリプションへ配送 for _, sub := range ss { sub <- message } } } }
TestBroker
を書き換えてみる。
func TestBroker(t *testing.T) { pub := make(chan []byte) subscriptions := make(chan chan<- []byte) defer close(subscriptions) // "broker" goroutine go mqtt.Broker(pub, subscriptions) // クライアントからのSUBSCRIBE sub := make(chan []byte) subscriptions <- sub // 新たなサブスクリプションをBrokerに送る // "pub" goroutine go func() { defer close(pub) pub <- []byte("hoge") // クライアントからのPUBLISH }() // "sub" result, ok := <-sub // BrokerからPUBLISHメッセージが配送されてくる if !ok { t.Fatalf("got %v, want true", ok) } if string(result) != "hoge" { t.Fatalf("got %v, want \"hoge\"", string(result)) } }
これでOK!複数クライアントがSubscribeしても問題ないことを確認。
func TestBroker(t *testing.T) { pub := make(chan []byte) subscriptions := make(chan chan<- []byte) defer close(subscriptions) // "broker" goroutine go mqtt.Broker(pub, subscriptions) // クライアントからのSUBSCRIBE(1つ目) sub1 := make(chan []byte) subscriptions <- sub1 // クライアントからのSUBSCRIBE(2つ目) sub2 := make(chan []byte) subscriptions <- sub2 // "pub" goroutine go func() { defer close(pub) pub <- []byte("hoge") // クライアントからのPUBLISH }() // "sub" result1, ok := <-sub1 // 1つ目のサブスクリプションに対してBrokerからPUBLISHメッセージが配送されてくる if !ok { t.Fatalf("got %v, want true", ok) } if string(result1) != "hoge" { t.Fatalf("got %v, want \"hoge\"", string(result1)) } result2, ok := <-sub2 // 2つ目のサブスクリプションに対してもBrokerからPUBLISHメッセージが配送されてくる if !ok { t.Fatalf("got %v, want true", ok) } if string(result2) != "hoge" { t.Fatalf("got %v, want \"hoge\"", string(result2)) } }
sync.WaitGroup
ところで、"sub"以降の処理で、 sub1
, sub2
という順番にchannelから読み込んでるが、 Broker
内でサブスクリプション( chan<- []byte
)にメッセージを流す順番に依存してる。仮に sub2
, sub1
と順番を逆にして読み取るとブロックしてテストが終了しない。
sub1
と sub2
を select
で読み取るようにしても良いけど、それぞれのchannelから1回ずつしかメッセージを読まないので、それぞれgoroutineで実行することにする。
sub1
, sub2
からの読み取りをそれぞれ別goroutineで実行するように変更するだけだと、subからの読み取りが並行で行われるためgoroutineが切り替わらずに TestBroker
自体がすぐに終わってしまい、subからの読み込みのテストができない。そこで、 sync.WaitGroup
を使ってそれぞれのgoroutineの終了を待つ。
func TestBroker(t *testing.T) { pub := make(chan []byte) subscriptions := make(chan chan<- []byte) defer close(subscriptions) // "broker" goroutine go mqtt.Broker(pub, subscriptions) // クライアントからのSUBSCRIBE(1つ目) sub1 := make(chan []byte) subscriptions <- sub1 // クライアントからのSUBSCRIBE(2つ目) sub2 := make(chan []byte) subscriptions <- sub2 // "pub" goroutine go func() { defer close(pub) pub <- []byte("hoge") // クライアントからのPUBLISH }() // "sub" wg := sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() result1, ok := <-sub1 // 1つ目のサブスクリプションに対してBrokerからPUBLISHメッセージが配送されてくる if !ok { t.Fatalf("got %v, want true", ok) } if string(result1) != "hoge" { t.Fatalf("got %v, want \"hoge\"", string(result1)) } }() wg.Add(1) go func() { defer wg.Done() result2, ok := <-sub2 // 2つ目のサブスクリプションに対してもBrokerからPUBLISHメッセージが配送されてくる if !ok { t.Fatalf("got %v, want true", ok) } if string(result2) != "hoge" { t.Fatalf("got %v, want \"hoge\"", string(result2)) } }() wg.Wait() }
これで1つのメッセージを複数のサブスクリプションに配送する処理ができた。
おしまい
MQTTブローカーをgoroutine, channel, selectを使って実装してみました。この3つを使って色々なパターンを実装できそうです。しかも、GoのランタイムがgoroutineをOSのスレッドにいい感じに割り当ててくれます。
でも、どっかのgoroutineでエラーが起きた時はどうすればいいの?というところを次回はやっていきたいなぁ。
今回の学び
- MQTTのサブスクリプション
- channel
- select
- sync.WaitGroup