前回の続き。別のgoroutineで発生したエラーをerror channelを使ってハンドリングしてみたいと思います。ハンドリングの処理でmapを使ったのですが、goroutineセーフにするため sync.Map
を使ってみました。
目次。
handleSub でエラー
handleSub
関数でサブスクライバに対してPUBLISHを送っている。
func handleSub(conn net.Conn, fromBroker <-chan *packet.Publish) { for publishMessage := range fromBroker { bs := publishMessage.ToBytes() _, err := conn.Write(bs) if err != nil { // FIXME fmt.Println(err) } } }
conn.Write
でエラーだった場合のことを考える。例えば、mosquitto_subでサブスクライブした後、すぐにCtrl-Cで切断しておく。その後にmosquitto_pubでパブリッシュすると conn.Write
は err
を返し、サーバーが以下を出力する。
write tcp 127.0.0.1:1883->127.0.0.1:59452: use of closed network connection
Broker
がサブスクライバが切断したことに気がつかず、サブスクリプションを削除せずにPUBLISHされたメッセージを配送し続けてしまう。 handleSub
goroutineで発生したエラーを Broker
goroutineに伝えて、 Broker
が不要なサブスクリプションを削除する方法を考える。
別goroutineを受け取るerror channel
PUBLISHメッセージを配送する Broker
-> handlerSub
という方向のchannelとは別に、 error
を handleSub
-> Broker
という方向に流すchannelを用意すれば良い。
現在の Broker
は以下。
// broker.go func Broker(fromPub <-chan *packet.Publish, subscriptions <-chan *Subscription, doneSubscriptions <-chan *DoneSubscription) { sMap := newSubscriptionMap() for { select { case sub := <-subscriptions: sMap.put(sub.clientID, sub) case message := <-fromPub: sMap.apply(func(sub *Subscription) { sub.pubToSub <- message }) } } }
以下のように select
で doneSubscriptions
channelも読むようにする。
func Broker(fromPub <-chan *packet.Publish, subscriptions <-chan *Subscription, doneSubscriptions <-chan error) { sMap := newSubscriptionMap() for { select { case sub := <-subscriptions: sMap.put(sub.clientID, sub) case message := <-fromPub: sMap.apply(func(sub *Subscription) { sub.pubToSub <- message }) + case err := <-doneSubscriptions: + fmt.Println(err) + } } }
doneSubscriptions
channelを使って handleSub
goroutine から Broker
goroutineに error
を流す。
server.goでは3つのchannelを生成することになる。
func Run() { ln, err := net.Listen("tcp", "localhost:1883") if err != nil { panic(err) } defer ln.Close() pub := make(chan *packet.Publish) defer close(pub) subscriptions := make(chan *Subscription) defer close(subscriptions) + doneSubscriptions := make(chan *DoneSubscriptionResult) + defer close(doneOfSubscription) go Broker(pub, subscriptions, doneSubscriptions) for { conn, err := ln.Accept() if err != nil { panic(err) } go func() { - err = handle(conn, pub, subscriptions) + err = handle(conn, pub, subscriptions, doneSubscriptions) if err != nil { panic(err) } }() } }
doneSubscriptions
channelが 「Run
関数のgoroutine」 -> 「handle
関数を実行するgoroutine」 -> 「handleSub
関数のgoroutine」と渡されていくことになる。
つまり書き込み可能なchannelが孫goroutineまで渡されることになる。channelの所有者がはっきりしなくなってきた...。
書き込み可能なchannelを関数に渡すのではなく、関数内でchannelとgoroutineを生成し、読み取り専用channelを返すようにする。
具体的には、 handleSub
関数を以下のようにする。
// server.go func handleSub(conn net.Conn, fromBroker <-chan *packet.Publish) <-chan error { errCh := make(chan error) go func() { defer close(errCh) for publishMessage := range fromBroker { bs := publishMessage.ToBytes() _, err := conn.Write(bs) if err != nil { errCh <- err } } }() return errCh }
errCh
channelのライフサイクルが handleSubscription
関数内に閉じている。関数内でchannelを生成、関数内でgoroutineを起動し生成したchanelの所有権を移譲する(goroutine内で書き込みと close
をする)。そして関数の返り値は読み取り専用channelとする。
このように1つの関数内にchannelのライフサイクルを閉じ込める書き方が「Go言語による並行処理パターン」では頻出してる。
この handleSub
関数の書き換えによって、 Run
関数で生成した doneSubscriptions
channelを孫goroutine( handleSub
goroutine)まで渡さなくて良くなる。が、 handleSub
関数から返ってきた読み取り専用channelの読み込みでブロックするため新しくgoroutineを作る必要がある。
case packet.SUBSCRIBE: suback, err := handler.HandleSubscribe(mqttReader) if err != nil { return err } _, err = conn.Write(suback.ToBytes()) if err != nil { return err } - sub := make(chan *packet.Publish) - subscriptionToBroker <- sub - go handleSub(conn, sub) + subscription, errCh := handleSub(clientID, conn) + subscriptionToBroker <- subscription + go func() { + err, ok := <-errCh + if !ok { + return + } + doneSubscriptions <- err + }()
サブスクリプションの特定
MQTTのClient ID
これで error
を handleSub
goroutineから Broker
goroutineへ渡すことができそう。 error
を受け取った Broker
はどのSubscriptionを閉じるかをどうやって決めれば良いだろう?
今まで無視してたMQTTのClient IDを使うことにする。Client IDはCONNECT時にペイロードに含まれていた。
- MQTTサーバーを実装しながらGoを学ぶ - その4 テストカバレッジ - yo-kari の 日記
- http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718031
CONNECTパケットのハンドリング時に、Client IDを変数にセットしておく。
func handle(conn net.Conn, publishToBroker chan<- *packet.Publish, subscriptionToBroker chan<- Subscription) error { defer conn.Close() + var clientID string + for { r := bufio.NewReader(conn) mqttReader := packet.NewMQTTReader(r)
case packet.CONNECT: - connack, err := handler.HandleConnect(mqttReader) + connect, connack, err := handler.HandleConnect(mqttReader) if err != nil { return err } _, err = conn.Write(connack.ToBytes()) if err != nil { return err } + clientID = connect.Payload.ClientID case packet.SUBSCRIBE: suback, err := handler.HandleSubscribe(mqttReader) if err != nil {
現在の実装だと以下のように読み取り専用channelを Subscription
としている。
type Subscription chan<- *packet.Publish
これをstructにしてClient IDを持たせるようにする。さらに、 handleSub
goroutineから Broker
goroutineへ伝える error
もsturctで包んでClient IDを関連づける。
// broker.go type Subscription struct { clientID string pubToSub chan<- *packet.Publish } func NewSubscription(clientID string) (*Subscription, <-chan *packet.Publish) { pub := make(chan *packet.Publish) s := &Subscription{ clientID: clientID, pubToSub: pub, } return s, pub } type DoneSubscriptionResult struct { clientID string err error } func NewDoneSubscriptionResult(clientID string, err error) *DoneSubscriptionResult { return &DoneSubscriptionResult{clientID, err} } func Broker(fromPub <-chan *packet.Publish, subscriptions <-chan *Subscription, doneSubscriptions <-chan *DoneSubscriptionResult) { ... }
map
現在の Broker
の実装では Subscription
を配列で管理してる。
func Broker(fromPub <-chan *packet.Publish, subscriptions <-chan *Subscription, doneSubscriptions <-chan *DoneSubscriptionResult) { // サブスクリプションの配列 var ss []Subscription for { select { case sub := <- subscriptions: // channelからサブスクリプションを読み取ったら配列に追加 ss = append(ss, sub) case message := <- fromPub: // fromPub channelからメッセージを読み取ったら全てのサブスクリプションへ配送 for _, sub := range ss { sub <- message } case err := <-doneSubscriptions: fmt.Println(err) } } }
Subscription
を配列で管理してるのをやめて、キーをClientIDとしたmapを使うことにする。
func Broker(fromPub <-chan *packet.Publish, subscriptions <-chan *Subscription, doneSubscriptions <-chan *DoneSubscriptionResult) { // サブスクリプションのmap var sMap map[string]*Subscription sMap = make(map[string]*Subscription) for { select { case sub := <-subscriptions: // channelからサブスクリプションを読み取ったらキーをclientIDとしてmapに追加 sMap[sub.clientID] = sub case message := <-fromPub: for _, sub := range sMap { sub.pubToSub <- message } case err := <-doneSubscriptions: fmt.Println(err) } } }
次は select
で doneSubscriptions
channelから読み取った時の処理。 close
とmapから削除する処理をすれば良い。
func Broker(fromPub <-chan *packet.Publish, subscriptions <-chan *Subscription, doneSubscriptions <-chan *DoneSubscriptionResult) { // サブスクリプションのmap var sMap map[string]*Subscription sMap = make(map[string]*Subscription) for { select { case sub := <-subscriptions: // channelからサブスクリプションを読み取ったらキーをclientIDとしてmapに追加 sMap[sub.clientID] = sub case message := <-fromPub: for _, sub := range sMap { sub.pubToSub <- message } case done := <-doneSubscriptions: fmt.Printf("close subscription: %v\n", done.clientID) if done.err != nil { fmt.Println(done.err) } s, ok := sMap[sub.clientID] if ok { sub := s.(*Subscription) close(s.pubToSub) delete(sMap, done.clientID) } } } }
sync.Mutex
複数goroutineで同じmapを操作しているのが気になる。mapは複数のgoroutineから触れるように設計されていない。スレッドセーフならぬgoroutineセーフではない。
sync.Mutexパッケージを使って同期をとりgoroutineセーフにしておく。
sync.Mutexを使うときは1つの構造体、というか型に閉じておくと良さそう。 type
で subscriptionMap
というstructを作る。
type subscriptionMap struct { mu sync.Mutex value map[string]*Subscription } func newSubscriptionMap() *subscriptionMap { m := make(map[string]*Subscription) return &subscriptionMap{ mu: sync.Mutex{}, value: m, } } func (m *subscriptionMap) get(clientID string) *Subscription { m.mu.Lock() s, ok := sMap[sub.clientID] m.mu.Unlock() if ok { return s.(*Subscription) } return nil } func (m *subscriptionMap) put(clientID string, s *Subscription) { m.mu.Lock() m.value[clientID] = s m.mu.Unlock() } func (m *subscriptionMap) delete(clientID string) { m.mu.Lock() delete(m.value, clientID) m.mu.Unlock() }
と、ここまで書いて、mapに対する range
はどうしたら良いのだろう?という疑問が...
sync.Map
ググってると sync.Map
というのを発見!
- https://golang.org/pkg/sync/#Map
- [PDF] 2017-talks/lightningtalks/BryanCMills-AnOverviewOfSyncMap/An Overview of sync.Map.pdf at master · gophercon/2017-talks · GitHub
- go1.9から追加されたsync.Mapを使う #Go - Qiita
Go Docを見ると sync.Map
で扱う値はinterface{}であることが分かる。 type
を使って自前のstructで包んだ方が良さそう。
結果的にbroker.goは以下のようになった。
type Subscription struct { clientID string pubToSub chan<- *packet.Publish } func NewSubscription(clientID string) (*Subscription, <-chan *packet.Publish) { pub := make(chan *packet.Publish) s := &Subscription{ clientID: clientID, pubToSub: pub, } return s, pub } type DoneSubscriptionResult struct { clientID string err error } func NewDoneSubscriptionResult(clientID string, err error) *DoneSubscriptionResult { return &DoneSubscriptionResult{clientID, err} } type subscriptionMap struct { syncMap sync.Map } func newSubscriptionMap() *subscriptionMap { return &subscriptionMap{} } func (m *subscriptionMap) get(clientID string) *Subscription { s, ok := m.syncMap.Load(clientID) if !ok { return nil } return s.(*Subscription) } func (m *subscriptionMap) put(clientID string, s *Subscription) { m.syncMap.Store(clientID, s) } func (m *subscriptionMap) delete(clientID string) { m.syncMap.Delete(clientID) } func (m *subscriptionMap) apply(f func(s *Subscription)) { m.syncMap.Range(func(k, v interface{}) bool { s := v.(*Subscription) f(s) return true }) } func Broker(fromPub <-chan *packet.Publish, subscriptions <-chan *Subscription, doneSubscriptions <-chan *DoneSubscriptionResult) { // サブスクリプションのmap sMap := newSubscriptionMap() for { select { case sub := <-subscriptions: // channelからサブスクリプションを読み取ったらキーをclientIDとしてmapに追加 sMap.put(sub.clientID, sub) case message := <-fromPub: // 全てのサブスクリプションにメッセージを配送 sMap.apply(func(sub *Subscription) { sub.pubToSub <- message }) case done := <-doneSubscriptions: fmt.Printf("close subscription: %v\n", done.clientID) if done.err != nil { fmt.Println(done.err) } sub := sMap.get(done.clientID) if sub != nil { close(sub.pubToSub) sMap.delete(done.clientID) } } } }
動かす
動かしてみる。
mosquitto_subでサブスクライブした後、すぐにCtrl-Cで切断する。その後にmosquitto_pubでパブリッシュするとサーバーが以下を出力する。
write tcp 127.0.0.1:1883->127.0.0.1:59452: use of closed network connection
ここまでは最初と変わらないけど、もう一度パブリッシュするともう出力されない。これは1回目の Write
でエラーが発生した時に、 Broker
でサブスクリプションの削除ができたから。
これでhandleSubでエラーが発生した場合にBrokerでハンドリングする実装ができた!
おしまい
ただこれだとエラーが発生するまでサブスクリプションが削除されないし、 handleSub
goroutineが残り続ける。本当はクライアントが切断したタイミングでサブスクリプションを削除したい。
次回はContextを使うことでこれを解決しようと思います。
今回の学び。
- MQTTのClient ID
- map
- symc.Map