MQTTサーバーを実装しながらGoを学ぶ - その11 goroutineのエラーハンドリング, map, goroutineセーフ
前回の続き。別のgoroutineで発生したエラーをerror channelを使ってハンドリングしてみたいと思います。ハンドリングの処理でmapを使ったのですが、goroutineセーフにするため sync.Map
を使ってみました。
目次。
handleSub でエラー
handleSub
関数でサブスクライバに対してPUBLISHを送っている。
func handleSub(conn net.Conn, fromBroker <-chan *packet.Publish) { for publishMessage := range fromBroker { bs := publishMessage.ToBytes() _, err := conn.Write(bs) if err != nil { // FIXME fmt.Println(err) } } }
conn.Write
でエラーだった場合のことを考える。例えば、mosquitto_subでサブスクライブした後、すぐにCtrl-Cで切断しておく。その後にmosquitto_pubでパブリッシュすると conn.Write
は err
を返し、サーバーが以下を出力する。
write tcp 127.0.0.1:1883->127.0.0.1:59452: use of closed network connection
Broker
がサブスクライバが切断したことに気がつかず、サブスクリプションを削除せずにPUBLISHされたメッセージを配送し続けてしまう。 handleSub
goroutineで発生したエラーを Broker
goroutineに伝えて、 Broker
が不要なサブスクリプションを削除する方法を考える。
別goroutineを受け取るerror channel
PUBLISHメッセージを配送する Broker
-> handlerSub
という方向のchannelとは別に、 error
を handleSub
-> Broker
という方向に流すchannelを用意すれば良い。
現在の Broker
は以下。
// broker.go func Broker(fromPub <-chan *packet.Publish, subscriptions <-chan *Subscription, doneSubscriptions <-chan *DoneSubscription) { sMap := newSubscriptionMap() for { select { case sub := <-subscriptions: sMap.put(sub.clientID, sub) case message := <-fromPub: sMap.apply(func(sub *Subscription) { sub.pubToSub <- message }) } } }
以下のように select
で doneSubscriptions
channelも読むようにする。
func Broker(fromPub <-chan *packet.Publish, subscriptions <-chan *Subscription, doneSubscriptions <-chan error) { sMap := newSubscriptionMap() for { select { case sub := <-subscriptions: sMap.put(sub.clientID, sub) case message := <-fromPub: sMap.apply(func(sub *Subscription) { sub.pubToSub <- message }) + case err := <-doneSubscriptions: + fmt.Println(err) + } } }
doneSubscriptions
channelを使って handleSub
goroutine から Broker
goroutineに error
を流す。
server.goでは3つのchannelを生成することになる。
func Run() { ln, err := net.Listen("tcp", "localhost:1883") if err != nil { panic(err) } defer ln.Close() pub := make(chan *packet.Publish) defer close(pub) subscriptions := make(chan *Subscription) defer close(subscriptions) + doneSubscriptions := make(chan *DoneSubscriptionResult) + defer close(doneOfSubscription) go Broker(pub, subscriptions, doneSubscriptions) for { conn, err := ln.Accept() if err != nil { panic(err) } go func() { - err = handle(conn, pub, subscriptions) + err = handle(conn, pub, subscriptions, doneSubscriptions) if err != nil { panic(err) } }() } }
doneSubscriptions
channelが 「Run
関数のgoroutine」 -> 「handle
関数を実行するgoroutine」 -> 「handleSub
関数のgoroutine」と渡されていくことになる。
つまり書き込み可能なchannelが孫goroutineまで渡されることになる。channelの所有者がはっきりしなくなってきた...。
書き込み可能なchannelを関数に渡すのではなく、関数内でchannelとgoroutineを生成し、読み取り専用channelを返すようにする。
具体的には、 handleSub
関数を以下のようにする。
// server.go func handleSub(conn net.Conn, fromBroker <-chan *packet.Publish) <-chan error { errCh := make(chan error) go func() { defer close(errCh) for publishMessage := range fromBroker { bs := publishMessage.ToBytes() _, err := conn.Write(bs) if err != nil { errCh <- err } } }() return errCh }
errCh
channelのライフサイクルが handleSubscription
関数内に閉じている。関数内でchannelを生成、関数内でgoroutineを起動し生成したchanelの所有権を移譲する(goroutine内で書き込みと close
をする)。そして関数の返り値は読み取り専用channelとする。
このように1つの関数内にchannelのライフサイクルを閉じ込める書き方が「Go言語による並行処理パターン」では頻出してる。
この handleSub
関数の書き換えによって、 Run
関数で生成した doneSubscriptions
channelを孫goroutine( handleSub
goroutine)まで渡さなくて良くなる。が、 handleSub
関数から返ってきた読み取り専用channelの読み込みでブロックするため新しくgoroutineを作る必要がある。
case packet.SUBSCRIBE: suback, err := handler.HandleSubscribe(mqttReader) if err != nil { return err } _, err = conn.Write(suback.ToBytes()) if err != nil { return err } - sub := make(chan *packet.Publish) - subscriptionToBroker <- sub - go handleSub(conn, sub) + subscription, errCh := handleSub(clientID, conn) + subscriptionToBroker <- subscription + go func() { + err, ok := <-errCh + if !ok { + return + } + doneSubscriptions <- err + }()
サブスクリプションの特定
MQTTのClient ID
これで error
を handleSub
goroutineから Broker
goroutineへ渡すことができそう。 error
を受け取った Broker
はどのSubscriptionを閉じるかをどうやって決めれば良いだろう?
今まで無視してたMQTTのClient IDを使うことにする。Client IDはCONNECT時にペイロードに含まれていた。
- MQTTサーバーを実装しながらGoを学ぶ - その4 テストカバレッジ - yo-kari の 日記
- http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718031
CONNECTパケットのハンドリング時に、Client IDを変数にセットしておく。
func handle(conn net.Conn, publishToBroker chan<- *packet.Publish, subscriptionToBroker chan<- Subscription) error { defer conn.Close() + var clientID string + for { r := bufio.NewReader(conn) mqttReader := packet.NewMQTTReader(r)
case packet.CONNECT: - connack, err := handler.HandleConnect(mqttReader) + connect, connack, err := handler.HandleConnect(mqttReader) if err != nil { return err } _, err = conn.Write(connack.ToBytes()) if err != nil { return err } + clientID = connect.Payload.ClientID case packet.SUBSCRIBE: suback, err := handler.HandleSubscribe(mqttReader) if err != nil {
現在の実装だと以下のように読み取り専用channelを Subscription
としている。
type Subscription chan<- *packet.Publish
これをstructにしてClient IDを持たせるようにする。さらに、 handleSub
goroutineから Broker
goroutineへ伝える error
もsturctで包んでClient IDを関連づける。
// broker.go type Subscription struct { clientID string pubToSub chan<- *packet.Publish } func NewSubscription(clientID string) (*Subscription, <-chan *packet.Publish) { pub := make(chan *packet.Publish) s := &Subscription{ clientID: clientID, pubToSub: pub, } return s, pub } type DoneSubscriptionResult struct { clientID string err error } func NewDoneSubscriptionResult(clientID string, err error) *DoneSubscriptionResult { return &DoneSubscriptionResult{clientID, err} } func Broker(fromPub <-chan *packet.Publish, subscriptions <-chan *Subscription, doneSubscriptions <-chan *DoneSubscriptionResult) { ... }
map
現在の Broker
の実装では Subscription
を配列で管理してる。
func Broker(fromPub <-chan *packet.Publish, subscriptions <-chan *Subscription, doneSubscriptions <-chan *DoneSubscriptionResult) { // サブスクリプションの配列 var ss []Subscription for { select { case sub := <- subscriptions: // channelからサブスクリプションを読み取ったら配列に追加 ss = append(ss, sub) case message := <- fromPub: // fromPub channelからメッセージを読み取ったら全てのサブスクリプションへ配送 for _, sub := range ss { sub <- message } case err := <-doneSubscriptions: fmt.Println(err) } } }
Subscription
を配列で管理してるのをやめて、キーをClientIDとしたmapを使うことにする。
func Broker(fromPub <-chan *packet.Publish, subscriptions <-chan *Subscription, doneSubscriptions <-chan *DoneSubscriptionResult) { // サブスクリプションのmap var sMap map[string]*Subscription sMap = make(map[string]*Subscription) for { select { case sub := <-subscriptions: // channelからサブスクリプションを読み取ったらキーをclientIDとしてmapに追加 sMap[sub.clientID] = sub case message := <-fromPub: for _, sub := range sMap { sub.pubToSub <- message } case err := <-doneSubscriptions: fmt.Println(err) } } }
次は select
で doneSubscriptions
channelから読み取った時の処理。 close
とmapから削除する処理をすれば良い。
func Broker(fromPub <-chan *packet.Publish, subscriptions <-chan *Subscription, doneSubscriptions <-chan *DoneSubscriptionResult) { // サブスクリプションのmap var sMap map[string]*Subscription sMap = make(map[string]*Subscription) for { select { case sub := <-subscriptions: // channelからサブスクリプションを読み取ったらキーをclientIDとしてmapに追加 sMap[sub.clientID] = sub case message := <-fromPub: for _, sub := range sMap { sub.pubToSub <- message } case done := <-doneSubscriptions: fmt.Printf("close subscription: %v\n", done.clientID) if done.err != nil { fmt.Println(done.err) } s, ok := sMap[sub.clientID] if ok { sub := s.(*Subscription) close(s.pubToSub) delete(sMap, done.clientID) } } } }
sync.Mutex
複数goroutineで同じmapを操作しているのが気になる。mapは複数のgoroutineから触れるように設計されていない。スレッドセーフならぬgoroutineセーフではない。
sync.Mutexパッケージを使って同期をとりgoroutineセーフにしておく。
sync.Mutexを使うときは1つの構造体、というか型に閉じておくと良さそう。 type
で subscriptionMap
というstructを作る。
type subscriptionMap struct { mu sync.Mutex value map[string]*Subscription } func newSubscriptionMap() *subscriptionMap { m := make(map[string]*Subscription) return &subscriptionMap{ mu: sync.Mutex{}, value: m, } } func (m *subscriptionMap) get(clientID string) *Subscription { m.mu.Lock() s, ok := sMap[sub.clientID] m.mu.Unlock() if ok { return s.(*Subscription) } return nil } func (m *subscriptionMap) put(clientID string, s *Subscription) { m.mu.Lock() m.value[clientID] = s m.mu.Unlock() } func (m *subscriptionMap) delete(clientID string) { m.mu.Lock() delete(m.value, clientID) m.mu.Unlock() }
と、ここまで書いて、mapに対する range
はどうしたら良いのだろう?という疑問が...
sync.Map
ググってると sync.Map
というのを発見!
- https://golang.org/pkg/sync/#Map
- [PDF] 2017-talks/lightningtalks/BryanCMills-AnOverviewOfSyncMap/An Overview of sync.Map.pdf at master · gophercon/2017-talks · GitHub
- go1.9から追加されたsync.Mapを使う #Go - Qiita
Go Docを見ると sync.Map
で扱う値はinterface{}であることが分かる。 type
を使って自前のstructで包んだ方が良さそう。
結果的にbroker.goは以下のようになった。
type Subscription struct { clientID string pubToSub chan<- *packet.Publish } func NewSubscription(clientID string) (*Subscription, <-chan *packet.Publish) { pub := make(chan *packet.Publish) s := &Subscription{ clientID: clientID, pubToSub: pub, } return s, pub } type DoneSubscriptionResult struct { clientID string err error } func NewDoneSubscriptionResult(clientID string, err error) *DoneSubscriptionResult { return &DoneSubscriptionResult{clientID, err} } type subscriptionMap struct { syncMap sync.Map } func newSubscriptionMap() *subscriptionMap { return &subscriptionMap{} } func (m *subscriptionMap) get(clientID string) *Subscription { s, ok := m.syncMap.Load(clientID) if !ok { return nil } return s.(*Subscription) } func (m *subscriptionMap) put(clientID string, s *Subscription) { m.syncMap.Store(clientID, s) } func (m *subscriptionMap) delete(clientID string) { m.syncMap.Delete(clientID) } func (m *subscriptionMap) apply(f func(s *Subscription)) { m.syncMap.Range(func(k, v interface{}) bool { s := v.(*Subscription) f(s) return true }) } func Broker(fromPub <-chan *packet.Publish, subscriptions <-chan *Subscription, doneSubscriptions <-chan *DoneSubscriptionResult) { // サブスクリプションのmap sMap := newSubscriptionMap() for { select { case sub := <-subscriptions: // channelからサブスクリプションを読み取ったらキーをclientIDとしてmapに追加 sMap.put(sub.clientID, sub) case message := <-fromPub: // 全てのサブスクリプションにメッセージを配送 sMap.apply(func(sub *Subscription) { sub.pubToSub <- message }) case done := <-doneSubscriptions: fmt.Printf("close subscription: %v\n", done.clientID) if done.err != nil { fmt.Println(done.err) } sub := sMap.get(done.clientID) if sub != nil { close(sub.pubToSub) sMap.delete(done.clientID) } } } }
動かす
動かしてみる。
mosquitto_subでサブスクライブした後、すぐにCtrl-Cで切断する。その後にmosquitto_pubでパブリッシュするとサーバーが以下を出力する。
write tcp 127.0.0.1:1883->127.0.0.1:59452: use of closed network connection
ここまでは最初と変わらないけど、もう一度パブリッシュするともう出力されない。これは1回目の Write
でエラーが発生した時に、 Broker
でサブスクリプションの削除ができたから。
これでhandleSubでエラーが発生した場合にBrokerでハンドリングする実装ができた!
おしまい
ただこれだとエラーが発生するまでサブスクリプションが削除されないし、 handleSub
goroutineが残り続ける。本当はクライアントが切断したタイミングでサブスクリプションを削除したい。
次回はContextを使うことでこれを解決しようと思います。
今回の学び。
- MQTTのClient ID
- map
- symc.Map
MQTTサーバーを実装しながらGoを学ぶ - その10 type, "chan<-"と"<-chan"
前回、goroutine, channel, selectを使って Broker
を実装しました。この Broker
をサーバーで利用し、MQTTでメッセージを配送してみます。
その前に <-chan chan<- []byte
という型が読みづらいので type
を使って対応します。その後、読み書き可能なchannelを、 chan<-
, <-chan
という型に変換しgoroutineへ渡しメッセージの配送を実現します。
目次
typeで型をつくる
現在の実装だとサブスクリプションを chan<- []byte
で表現している。channelのchannel( <-chan chan<- []byte
)やchannelの配列( []chan<- []byte
)が登場したので少々読みづらい。
type
を使って chan<- []byte
という型から Subscription
という型を作ることにする。
type Subscription chan<- []byte
Broker
の実装はこうなる。 <-chan chan<- []byte
が <-chan Subscription
、 []chan<- []byte
が []Subscription
となって読みやすい。
// broker.go package mqtt type Subscription chan<- []byte func Broker(fromPub <-chan []byte, subscriptions <-chan Subscription) { // サブスクリプションの配列 var ss []Subscription for { select { case sub := <- subscriptions: // channelからサブスクリプション(chan<- []byte)を読み取ったら配列に追加 ss = append(ss, sub) case message := <- fromPub: // fromPub channelからメッセージを読み取ったら全てのサブスクリプションへ配送 for _, sub := range ss { sub <- message } } } }
サーバーに組み込む
[]byteからPublishへ変更
サブスクライバへメッセージを配送する時、サーバーからサブスクライバへMQTTのPUBLISHパケットを送る。 Broker
で配送してるメッセージの型を []byte
から packet.Publish
に変更する。 packet.Publish を生成するための packet.NewPublish()
という関数を追加。
func NewPublish(topicName string, message []byte) Publish { variableHeader := NewPublishVariableHeader(topicName) fixedHeader := NewPublishFixedHeader(PUBLISH, variableHeader.Length()+uint(len(message))) return Publish{fixedHeader, variableHeader, message} }
Subscription
と Broker
の定義を変更する。
-type Subscription chan<- []byte +import "github.com/bati11/oreno-mqtt/mqtt/packet" -func Broker(fromPub <-chan []byte, subscriptions <-chan Subscription) { +type Subscription chan<- *packet.Publish + +func Broker(fromPub <-chan *packet.Publish, subscriptions <-chan Subscription) { // サブスクリプションの配列 var ss []Subscription for {
server.go
実装したBrokerをserver.goに組み込む。途中の状態だけどserver.goのコードは以下のようになる。
package mqtt import ( "bufio" "fmt" "io" "net" "github.com/bati11/oreno-mqtt/mqtt/handler" "github.com/bati11/oreno-mqtt/mqtt/packet" ) func Run() { ln, err := net.Listen("tcp", "localhost:1883") if err != nil { panic(err) } defer ln.Close() pub := make(chan *packet.Publish) defer close(pub) subscriptions := make(chan Subscription) defer close(subscriptions) go Broker(pub, subscriptions) for { conn, err := ln.Accept() if err != nil { panic(err) } go func() { err = handle(conn, pub, subscriptions) if err != nil { panic(err) } }() } } func handle(conn net.Conn, publishToBroker chan<- *packet.Publish, subscriptionToBroker chan<- Subscription) error { defer conn.Close() for { r := bufio.NewReader(conn) mqttReader := packet.NewMQTTReader(r) packetType, err := mqttReader.ReadPacketType() if err != nil { if err == io.EOF { return nil } return err } switch packetType { case packet.PUBLISH: err = handler.HandlePublish(mqttReader) if err != nil { return err } // TODO "publishToBroker" channelにPUBLISHメッセージを書き込む case packet.CONNECT: connack, err := handler.HandleConnect(mqttReader) if err != nil { return err } _, err = conn.Write(connack.ToBytes()) if err != nil { return err } case packet.SUBSCRIBE: suback, err := handler.HandleSubscribe(mqttReader) if err != nil { return err } _, err = conn.Write(suback.ToBytes()) if err != nil { return err } // TODO "subscriptionToBroker" channelに新たにSUBSCRIPTIONを生成し書き込む case packet.PINGREQ: pingresp, err := handler.HandlePingreq(mqttReader) if err != nil { return err } _, err = conn.Write(pingresp.ToBytes()) if err != nil { return err } case packet.DISCONNECT: return nil } } }
後はTODOコメントにある通り、PUBLISHとSUBSCRIBEのハンドリングを実装していく。
PUBLISHのハンドリング
PUBLISHのハンドリングは簡単。publishToBrokerチャネルにクライアントから送信されてきたPUBLISHメッセージを書き込み、 Broker
へ配送する。
case packet.PUBLISH: - err = handler.HandlePublish(mqttReader) + publish, err := handler.HandlePublish(mqttReader) if err != nil { return err } + publishToBroker <- publish
SUBSCRIBEのハンドリング
SUBSCRIBEのハンドリングでは、Broker
からPUBLISHメッセージを受け取るためのチャネルを読み取るgoroutineをもう1つ作る。このgoroutineは handleSub
関数を実行する。
case packet.SUBSCRIBE: suback, err := handler.HandleSubscribe(mqttReader) if err != nil { return err } _, err = conn.Write(suback.ToBytes()) if err != nil { return err } + sub := make(chan *packet.Publish) + subscriptionToBroker <- sub + go handleSub(conn, sub)
sub
というchannelを生成している。このchannelは2つのgoroutineに渡す。
- 1:
subscriptionToBroker
channelに書き込むことでBroker
goroutineに渡す。受け取り側では、型がchan *packet.Publish
からchan<- *packet.Publish
(つまりSubscription
)になる - 2:
handleSub
関数に引数でsub
を渡す。handleSub
関数では<-chan *packet.Publish
として引数の型を定義しておく
これで Broker
から配送されてくるメッセージを handleSub
goroutineで受け取れる。
channelの所有権の観点から見ると、 Broker
goroutineが sub
channelの所有者であり、 handleSub
goroutineが sub
channelの利用者である。生成したchannelは読み書きできるけど、それぞれのgoroutineでは書き込み専用、読み取り専用として型を定義しておくことでchannelの所有者が分かりやすくなる。また 書き込み専用channel( chan<- *packet.Publish
)と読み取り専用channel( <-chan *packet.Publish
)は型が違うので、誤ったコードを書いた場合にコンパイルエラーになってくれる。
handleSub
関数の実装は以下。
func handleSub(conn net.Conn, fromBroker <-chan *packet.Publish) { for publishMessage := range fromBroker { bs := publishMessage.ToBytes() _, err := conn.Write(bs) if err != nil { // FIXME fmt.Println(err) } } }
Broker
から配送されてくるメッセージを channelから読み込んで、 net.Conn
に書き込む。 conn
に書き込めなかった場合、例えばサブスクライバの接続が切れてる場合などのエラーハンドリングはあとで考える。
動かす
実際に動かす。自作サーバーを起動してから、mosquittoクライアントでサブスクライブする。
$ mosquitto_sub -i custom-client-id -t hoge
mosquittoクライアントでパブリッシュする。
$ mosquitto_pub -i sample-publisher -t hoge -m "Hello"
サブスクライバの方に "Hello" と表示された!
$ mosquitto_sub -i custom-client-id -t hoge Hello
ついにパブリッシャからのメッセージをサブスクライバに配信できました!複数サブスクライバがいる場合も全てに配送されます。
Topicのフィルタを無視していますが、そのうち・・・。
おしまい
ここまでの実装は以下にあります(tagが0.0.2)。
server.goで sub
というchannelを生成し Broker
goroutineへ渡しました。 Broker
goroutineはこのchannelの所有者ですが、channelを close
していません。サブスクライバへ送信するために net.Conn
に書き込みましたが、このときエラーが発生した場合は Broker
goroutineでchannelを close
したいです。
次回は、 handleSub
goroutineで発生するエラーのハンドリングを実装してみたいと思います。
今回の学び
MQTTサーバーを実装しながらGoを学ぶ - その9 channelとselect
前回はgoroutineを使って、複数クライアントと同時に接続できるようにしました。今回は、複数のgoroutine間をchannelをつないでメッセージやりとりし、Publishされたメッセージを別のSubscribeしているクライアントに配送します。channelの所有権やselectを組み合わせてchannelをより便利に使えるようにします。
目次。
サーバーからクライアントへのPUBLISH
MQTTのPUBLISHの仕様を読み直す。
The Client uses a PUBLISH Packet to send an Application Message to the Server, for distribution to Clients with matching subscriptions. The Server uses a PUBLISH Packet to send an Application Message to each Client which has a matching subscription.
クライアントからPUBLISHパケットを受け取ったサーバーは、マッチするサブスクリプションのクライアントへメッセージを配布する。このとき、サーバーからクライアントへのPUBLISHパケットを送信する。
サブスクリプションとはなんだったというと、SUBSCRIBEの仕様を読み直す。
The SUBSCRIBE Packet is sent from the Client to the Server to create one or more Subscriptions.Each Subscription registers a Client’s interest in one or more Topics. The Server sends PUBLISH Packets to the Client in order to forward Application Messages that were published to Topics that match these Subscriptions.
SUBSCRIBEを送ってきたクライアントに対してサーバーが作成するものでTopicとサブスクライブしてるクライアントを結びつけている。
MQTTじゃないけど、GCPの Cloud PubSub の図が分かりやすいので拝借。
引用元: https://cloud.google.com/pubsub/docs/overview
いつもの以下の図だとサブスクリプションは登場していないけど、上記の通り接続中のサブスクライバがどのTopicのメッセージを購読しているのかはサブスクリプションという概念で管理している。
引用元: https://www.appcelerator.com/blog/2018/03/api-builder-and-mqtt-for-iot-part-1/
というわけで、Publishされたメッセージをサブスクリプションへ配布する処理を実装していく。
channel
今のところサーバーの実装では、クライアントごとにgoroutineを生成している。goroutine間でデータをやりとりするには、channelが使える。
Broker
という関数を作り、パブリッシャから読み込んだメッセージをサブスクライバに書き込む。パブリッシャを <-chan []byte
と表現して、サブスクライバを chan<- []byte
と表現する。channelからの値の取り出しには range
を使う。 range
はchannelが閉じられるまで値を取り出し続ける(もしくは値が来るまでブロックして待つ)。
func Broker(fromPub <-chan []byte, toSub chan<- []byte) { defer close(toSub) for message := range fromPub { toSub <- message } }
channelは読み取り専用もしくは書き込み専用にすることができる。これを利用してchannelの所有権を持つgoroutineを決めよう、ということが 「Go言語による並行処理」に書かれている。
チャネルの所有権を割り振ることです。ここでは所有権を、チャネルを初期化し、書き込み、閉じるゴルーチンとして定義します。
channelの所有権を持つgoroutineと、channelを利用するだけのgoroutineに分けて考える。
どのゴルーチンがチャネルを所有しているかをはっきりさせることが重要です。単方向チャネルを宣言することはチャネルを所有するゴルーチンとチャネルを利用するだけのゴルーチンを区別できるようにするための道具です。チャネルを所有しているゴルーチンにはチャネルに対する書き込み権限(chanまたはchan<-)があり、チャネルを利用するだけのゴルーチンには読み込み専用権限(<-chan)しかありません。
今の実装では、 Broker
関数は toSub
に書き込むので所有権を持つことになる。所有権があるので、 defer
で close
するようにしてる。
所有権を意識すると何が嬉しいのか?channelにはpanicを発生させる操作がある。
- nilに対してclose
- closeしたchannelをclose
この2つは、channelの所有者だけが書き込みとcloseを行うことで防ぎやすくなる。
channelの利用者は読み取り時にchannelが閉じられている可能性があるので、 <-
で読み取る時は2つ目の返り値の値を確認するか、 range
で読み込むようにする。 <-
で値を取り出した時の2つ目の返り値はboolで、channelが閉じられてる場合はfalseになる。
また、chanelが閉じられておらず、値もない場合は、読取り時にブロックすることを考慮する。
channelの所有者を意識しながら、 Broker
を使うコードを書いてみる。
package mqtt_test import ( "testing" "github.com/bati11/oreno-mqtt/mqtt" ) func TestBroker(t *testing.T) { pub := make(chan []byte) sub := make(chan []byte) // "broker" goroutine go mqtt.Broker(pub, sub) // "pub" goroutine go func() { defer close(pub) pub <- []byte("hoge") // クライアントからのPUBLISH }() // "sub" result, ok := <-sub // BrokerからPUBLISHメッセージが配送されてくる if !ok { t.Fatalf("got %v, want true", ok) } if string(result) != "hoge" { t.Fatalf("got %v, want \"hoge\"", string(result)) } }
"broker" goroutineは sub
channelに対して書き込みと close
を行うので sub
channelの所有者であり、 pub
channelの利用者である。
"pub" goroutineは pub
channelの所有者である。
"sub"以降の処理(main goroutine)は、 sub
channelの利用者である。値を読み込むのは1回だけなので range
ではなく <-
で読み取りをしている。
channelは値
Broker
を実行するときに sub
channelを渡している。 sub
channel、つまり Broker
が引数で受け取ってる chan<- []byte
は、メッセージの書き込み先なのでMQTTのサブスクリプションに該当する。
しかし、サブスクリプションは、この記事の最初の方で見た通り、サーバーがクライアントからSUBSCRIBEパケットを受け取ったタイミングで作られる。なので Broker
goroutine生成時ではなく、後からgoroutineに chan<- []byte
を渡したい。
Broker
の引数を chan<- []byte
のchannel、つまり <-chan chan<- []byte
とする。channelはファーストクラスな値なので、channelのchannelや、channelの配列というようなこともできる。
channelのchannelを使うとgoroutine起動後に chan<- []byte
を受け取ることができる。
func Broker(fromPub <-chan []byte, subscriptions <-chan chan<- []byte) { for sub := range subscriptions { ...
Publishされたメッセージを全てのサブスクリプションに配送しないといけないので、サブスクリプションを管理しておく必要がある。新たなサブスクリプション( chan<- []byte
)をchannelから取得したら、channelの配列に追加するようにする。これはサブスクリプションの配列を表す。
func Broker(fromPub <-chan []byte, subscriptions <-chan chan<- []byte) { // サブスクリプションの配列 var ss []chan<- []byte for sub := range subscriptions { // channelからサブスクリプション(chan<- []byte)を読み取ったら配列に追加 ss = apend(ss, sub) } ... }
fromPub
channelからメッセージを読み取ったら、サブスクリプションの配列を range
を使って順にメッセージを送る。以下のようなイメージ。channelに対する range
と配列に対する range
がごちゃ混ぜにならないように注意。
func Broker(fromPub <-chan []byte, subscriptions <-chan chan<- []byte) { // サブスクリプションの配列 var ss []chan<- []byte for sub := range subscriptions { // channelからサブスクリプション(chan<- []byte)を読み取ったら配列に追加 ss = apend(ss, sub) } for message := range fromPub { // fromPub channelからメッセージを読み取ったら全てのサブスクリプションへ配送 for _, sub := range ss { sub <- message } } }
しかし、これだと subscriptions
channelが閉じられるまで1つ目の for
が終わらないのでダメ。
select
複数のチャネルから読み取りたいときは、select
を使う。
func Broker(fromPub <-chan []byte, subscriptions <-chan chan<- []byte) { // サブスクリプションの配列 var ss []chan<- []byte for { select { case sub := <- subscriptions: // channelからサブスクリプション(chan<- []byte)を読み取ったら配列に追加 ss = append(ss, sub) case message := <- fromPub: // fromPub channelからメッセージを読み取ったら全てのサブスクリプションへ配送 for _, sub := range ss { sub <- message } } } }
TestBroker
を書き換えてみる。
func TestBroker(t *testing.T) { pub := make(chan []byte) subscriptions := make(chan chan<- []byte) defer close(subscriptions) // "broker" goroutine go mqtt.Broker(pub, subscriptions) // クライアントからのSUBSCRIBE sub := make(chan []byte) subscriptions <- sub // 新たなサブスクリプションをBrokerに送る // "pub" goroutine go func() { defer close(pub) pub <- []byte("hoge") // クライアントからのPUBLISH }() // "sub" result, ok := <-sub // BrokerからPUBLISHメッセージが配送されてくる if !ok { t.Fatalf("got %v, want true", ok) } if string(result) != "hoge" { t.Fatalf("got %v, want \"hoge\"", string(result)) } }
これでOK!複数クライアントがSubscribeしても問題ないことを確認。
func TestBroker(t *testing.T) { pub := make(chan []byte) subscriptions := make(chan chan<- []byte) defer close(subscriptions) // "broker" goroutine go mqtt.Broker(pub, subscriptions) // クライアントからのSUBSCRIBE(1つ目) sub1 := make(chan []byte) subscriptions <- sub1 // クライアントからのSUBSCRIBE(2つ目) sub2 := make(chan []byte) subscriptions <- sub2 // "pub" goroutine go func() { defer close(pub) pub <- []byte("hoge") // クライアントからのPUBLISH }() // "sub" result1, ok := <-sub1 // 1つ目のサブスクリプションに対してBrokerからPUBLISHメッセージが配送されてくる if !ok { t.Fatalf("got %v, want true", ok) } if string(result1) != "hoge" { t.Fatalf("got %v, want \"hoge\"", string(result1)) } result2, ok := <-sub2 // 2つ目のサブスクリプションに対してもBrokerからPUBLISHメッセージが配送されてくる if !ok { t.Fatalf("got %v, want true", ok) } if string(result2) != "hoge" { t.Fatalf("got %v, want \"hoge\"", string(result2)) } }
sync.WaitGroup
ところで、"sub"以降の処理で、 sub1
, sub2
という順番にchannelから読み込んでるが、 Broker
内でサブスクリプション( chan<- []byte
)にメッセージを流す順番に依存してる。仮に sub2
, sub1
と順番を逆にして読み取るとブロックしてテストが終了しない。
sub1
と sub2
を select
で読み取るようにしても良いけど、それぞれのchannelから1回ずつしかメッセージを読まないので、それぞれgoroutineで実行することにする。
sub1
, sub2
からの読み取りをそれぞれ別goroutineで実行するように変更するだけだと、subからの読み取りが並行で行われるためgoroutineが切り替わらずに TestBroker
自体がすぐに終わってしまい、subからの読み込みのテストができない。そこで、 sync.WaitGroup
を使ってそれぞれのgoroutineの終了を待つ。
func TestBroker(t *testing.T) { pub := make(chan []byte) subscriptions := make(chan chan<- []byte) defer close(subscriptions) // "broker" goroutine go mqtt.Broker(pub, subscriptions) // クライアントからのSUBSCRIBE(1つ目) sub1 := make(chan []byte) subscriptions <- sub1 // クライアントからのSUBSCRIBE(2つ目) sub2 := make(chan []byte) subscriptions <- sub2 // "pub" goroutine go func() { defer close(pub) pub <- []byte("hoge") // クライアントからのPUBLISH }() // "sub" wg := sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() result1, ok := <-sub1 // 1つ目のサブスクリプションに対してBrokerからPUBLISHメッセージが配送されてくる if !ok { t.Fatalf("got %v, want true", ok) } if string(result1) != "hoge" { t.Fatalf("got %v, want \"hoge\"", string(result1)) } }() wg.Add(1) go func() { defer wg.Done() result2, ok := <-sub2 // 2つ目のサブスクリプションに対してもBrokerからPUBLISHメッセージが配送されてくる if !ok { t.Fatalf("got %v, want true", ok) } if string(result2) != "hoge" { t.Fatalf("got %v, want \"hoge\"", string(result2)) } }() wg.Wait() }
これで1つのメッセージを複数のサブスクリプションに配送する処理ができた。
おしまい
MQTTブローカーをgoroutine, channel, selectを使って実装してみました。この3つを使って色々なパターンを実装できそうです。しかも、GoのランタイムがgoroutineをOSのスレッドにいい感じに割り当ててくれます。
でも、どっかのgoroutineでエラーが起きた時はどうすればいいの?というところを次回はやっていきたいなぁ。
今回の学び
- MQTTのサブスクリプション
- channel
- select
- sync.WaitGroup