前回、別goroutineで発生したエラーハンドリングをしました。具体的にはサブスクライバへの書き込みでエラーが発生した場合にサブスクリプションの削除処理するようにしました。今回は、 context.Context
を使ってgoroutineを停止することで、クライアントが接続を切ったタイミングでサブスクリプションの削除をするようにしてみます。
目次。
今のgoroutine
現状の実装だと、クライアントからSUBSCRIBEパケットが送信されてくると、いくつかのgoroutineが生成され以下のような親子関係になる。
- main
- サーバーのメインgoroutine
Run
関数内のconn, err := ln.Accept()
でブロックしてる
- Broker
- "main" goroutineによって1つだけ生成される
- パブリッシャからのメッセージをサブスクリプションへ配送するgoroutine
Broker
関数内のfor ... select ...
で無限ループしてる
- handle
- "main" goroutineによってクライアントからのTCP接続がある度に生成される
- MQTTパケットを受け取り処理する
handle
関数内からmqttReader.ReadPacketType()
の呼び出し、最終的にはnet.Conn
に対するRead
でブロックしてる
- handleSub
- "handle" goroutineによってクライアントからのSUBSCRIBEパケットにより生成される
Broker
からchannel経由でPUBLISHを受け取り、net.Conn
に書き込む- channelの読み取りでブロックしてる
- handle内の無名関数
- "handle" goroutineによってクライアントからのSUBSCRIBEパケットにより生成される
handleSub
関数で発生したerror
をerror channel経由で読み取りBroker
に伝える- channelの読み取りでブロックしてる
net.Conn
に対する Read
をしているのは"handle" goroutineなので、サブスクライバが切断したことは"handle" goroutineで処理できるはず。
goroutineリーク
サブスクライバが切断、つまりmosquitto_subをCtrl-Cで終了したとき、handleSubのgoroutineが減らない。
2つmosquitto_subを実行してる状態でpprofを使いgoroutieの状態を見てみる。pprofは導入済みなので http://localhost:6060/debug/pprof/goroutine?debug=1
にアクセスすれば良い。
結果の一部は以下。
2 @ 0x102f20b 0x102a6a9 0x1029d56 0x108e22a 0x108e33d 0x108f0e6 0x118011f 0x11926f8 0x11330ef 0x1133989 0x13239b7 0x1327a6b 0x1328457 0x105ca31 # 0x1029d55 internal/poll.runtime_pollWait+0x65 /Users/bati11/.goenv/versions/1.11.4/src/runtime/netpoll.go:173 # 0x108e229 internal/poll.(*pollDesc).wait+0x99 /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_poll_runtime.go:85 # 0x108e33c internal/poll.(*pollDesc).waitRead+0x3c /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_poll_runtime.go:90 # 0x108f0e5 internal/poll.(*FD).Read+0x1d5 /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_unix.go:169 # 0x118011e net.(*netFD).Read+0x4e /Users/bati11/.goenv/versions/1.11.4/src/net/fd_unix.go:202 # 0x11926f7 net.(*conn).Read+0x67 /Users/bati11/.goenv/versions/1.11.4/src/net/net.go:177 # 0x11330ee bufio.(*Reader).fill+0x10e /Users/bati11/.goenv/versions/1.11.4/src/bufio/bufio.go:100 # 0x1133988 bufio.(*Reader).ReadByte+0x38 /Users/bati11/.goenv/versions/1.11.4/src/bufio/bufio.go:242 # 0x13239b6 github.com/bati11/oreno-mqtt/mqtt/packet.(*MQTTReader).ReadPacketType+0x76 /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/packet/mqtt_reader.go:20 # 0x1327a6a github.com/bati11/oreno-mqtt/mqtt.handle+0x13a /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:54 # 0x1328456 github.com/bati11/oreno-mqtt/mqtt.Run.func1+0x56 /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:38 2 @ 0x102f20b 0x102f2b3 0x100758e 0x10072bb 0x1328501 0x105ca31 # 0x1328500 github.com/bati11/oreno-mqtt/mqtt.handle.func1+0x40 /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:91 2 @ 0x102f20b 0x102f2b3 0x100758e 0x10072bb 0x132865c 0x105ca31 # 0x132865b github.com/bati11/oreno-mqtt/mqtt.handleSub.func1+0x6b /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:118
3つのブロックがあるが、最初のブロックは packet.(*MQTTReader).ReadPacketType+0x76
とあり mqttReader.ReadPacketType()
の呼び出しのことなので "handle" goroutineである。先頭の 2
という数値から "handle" goroutineが2つ存在してることになる。
2つめのブロックは、"handle" goroutineの数は 1
に減っているが、"handle
関数内の無名関数" goroutine、3つめのブロックは "handleSub" goroutineである。2つサブスクライバがいるので、goroutineの数もそれぞれ2つである。
1つCtrl-Cで終了してから再度状態を確認。"handle
内の無名関数" goroutineと"handleSub" goroutineが減ってない。
goroutine profile: total 10 2 @ 0x102f20b 0x102f2b3 0x100758e 0x10072bb 0x1328501 0x105ca31 # 0x1328500 github.com/bati11/oreno-mqtt/mqtt.handle.func1+0x40 /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:91 2 @ 0x102f20b 0x102f2b3 0x100758e 0x10072bb 0x132865c 0x105ca31 # 0x132865b github.com/bati11/oreno-mqtt/mqtt.handleSub.func1+0x6b /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:118 1 @ 0x102f20b 0x102a6a9 0x1029d56 0x108e22a 0x108e33d 0x108f0e6 0x118011f 0x11926f8 0x11330ef 0x1133989 0x13239b7 0x1327a6b 0x1328457 0x105ca31 # 0x1029d55 internal/poll.runtime_pollWait+0x65 /Users/bati11/.goenv/versions/1.11.4/src/runtime/netpoll.go:173 # 0x108e229 internal/poll.(*pollDesc).wait+0x99 /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_poll_runtime.go:85 # 0x108e33c internal/poll.(*pollDesc).waitRead+0x3c /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_poll_runtime.go:90 # 0x108f0e5 internal/poll.(*FD).Read+0x1d5 /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_unix.go:169 # 0x118011e net.(*netFD).Read+0x4e /Users/bati11/.goenv/versions/1.11.4/src/net/fd_unix.go:202 # 0x11926f7 net.(*conn).Read+0x67 /Users/bati11/.goenv/versions/1.11.4/src/net/net.go:177 # 0x11330ee bufio.(*Reader).fill+0x10e /Users/bati11/.goenv/versions/1.11.4/src/bufio/bufio.go:100 # 0x1133988 bufio.(*Reader).ReadByte+0x38 /Users/bati11/.goenv/versions/1.11.4/src/bufio/bufio.go:242 # 0x13239b6 github.com/bati11/oreno-mqtt/mqtt/packet.(*MQTTReader).ReadPacketType+0x76 /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/packet/mqtt_reader.go:20 # 0x1327a6a github.com/bati11/oreno-mqtt/mqtt.handle+0x13a /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:54 # 0x1328456 github.com/bati11/oreno-mqtt/mqtt.Run.func1+0x56 /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:38
goroutineが残り続けてしまうことを、goroutineリークというらしい。
"handleSub" goroutineの親goroutineである "handle" goroutineが終了するときに、子goroutineを終了させたい。
おなじみの「Go言語による並行処理」に以下のように書いてある。

- 作者: Katherine Cox-Buday,山口能迪
- 出版社/メーカー: オライリージャパン
- 発売日: 2018/10/26
- メディア: 単行本(ソフトカバー)
- この商品を含むブログを見る
4.3 ゴルーチンリークを避ける
もしあるゴルーチンがゴルーチンの生成の責任を持っているのであれば、そのゴルーチンを停止できるようにする責任もあります。
書籍では done
チャネルを使った方式と、それがGo1.7からは context
パッケージとして標準化されたことが書いてある。
contextパッケージ
context.Contextを使う。
context.Contextを使うことで、親goroutineから子goroutineを停止することができる。
diff --git a/mqtt/server.go b/mqtt/server.go index 5f53e6f..a050e0a 100644 --- a/mqtt/server.go +++ b/mqtt/server.go @@ -2,6 +2,7 @@ package mqtt import ( "bufio" + "context" "fmt" "io" "net" @@ -48,6 +49,9 @@ func handle(conn net.Conn, publishToBroker chan<- *packet.Publish, subscriptionT var clientID string + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for { r := bufio.NewReader(conn) mqttReader := packet.NewMQTTReader(r) @@ -85,16 +89,21 @@ func handle(conn net.Conn, publishToBroker chan<- *packet.Publish, subscriptionT if err != nil { return err } - subscription, errCh := handleSub(clientID, conn) + subscription, errCh := handleSub(ctx, clientID, conn) subscriptionToBroker <- subscription - go func() { - err, ok := <-errCh - if !ok { - return + go func(ctx context.Context) { + var result *DoneSubscriptionResult + select { + case <-ctx.Done(): + result = NewDoneSubscriptionResult(subscription.clientID, nil) + case err, ok := <-errCh: + if !ok { + return + } + result = NewDoneSubscriptionResult(subscription.clientID, err) } - done := NewDoneSubscriptionResult(subscription.clientID, err) - doneSubscriptions <- done - }() + doneSubscriptions <- result + }(ctx) case packet.PINGREQ: pingresp, err := handler.HandlePingreq(mqttReader) if err != nil { @@ -110,16 +119,24 @@ func handle(conn net.Conn, publishToBroker chan<- *packet.Publish, subscriptionT } } -func handleSub(clientID string, conn net.Conn) (*Subscription, <-chan error) { +func handleSub(ctx context.Context, clientID string, conn net.Conn) (*Subscription, <-chan error) { errCh := make(chan error) subscription, pubFromBroker := NewSubscription(clientID) go func() { defer close(errCh) - for publishMessage := range pubFromBroker { - bs := publishMessage.ToBytes() - _, err := conn.Write(bs) - if err != nil { - errCh <- err + for { + select { + case <-ctx.Done(): + return + case publishedMessage, ok := <-pubFromBroker: + if !ok { + return + } + bs := publishedMessage.ToBytes() + _, err := conn.Write(bs) + if err != nil { + errCh <- err + } } } }()
試してみる。
2つsub
goroutine profile: total 12 2 @ 0x102f20b 0x102a6a9 0x1029d56 0x108e22a 0x108e33d 0x108f0e6 0x118011f 0x11926f8 0x11330ef 0x1133989 0x13239b7 0x1327abc 0x1328527 0x105ca31 # 0x1029d55 internal/poll.runtime_pollWait+0x65 /Users/bati11/.goenv/versions/1.11.4/src/runtime/netpoll.go:173 # 0x108e229 internal/poll.(*pollDesc).wait+0x99 /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_poll_runtime.go:85 # 0x108e33c internal/poll.(*pollDesc).waitRead+0x3c /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_poll_runtime.go:90 # 0x108f0e5 internal/poll.(*FD).Read+0x1d5 /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_unix.go:169 # 0x118011e net.(*netFD).Read+0x4e /Users/bati11/.goenv/versions/1.11.4/src/net/fd_unix.go:202 # 0x11926f7 net.(*conn).Read+0x67 /Users/bati11/.goenv/versions/1.11.4/src/net/net.go:177 # 0x11330ee bufio.(*Reader).fill+0x10e /Users/bati11/.goenv/versions/1.11.4/src/bufio/bufio.go:100 # 0x1133988 bufio.(*Reader).ReadByte+0x38 /Users/bati11/.goenv/versions/1.11.4/src/bufio/bufio.go:242 # 0x13239b6 github.com/bati11/oreno-mqtt/mqtt/packet.(*MQTTReader).ReadPacketType+0x76 /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/packet/mqtt_reader.go:20 # 0x1327abb github.com/bati11/oreno-mqtt/mqtt.handle+0x18b /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:58 # 0x1328526 github.com/bati11/oreno-mqtt/mqtt.Run.func1+0x56 /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:39 2 @ 0x102f20b 0x103ef16 0x132865f 0x105ca31 # 0x132865e github.com/bati11/oreno-mqtt/mqtt.handle.func1+0xce /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:95 2 @ 0x102f20b 0x103ef16 0x1328890 0x105ca31 # 0x132888f github.com/bati11/oreno-mqtt/mqtt.handleSub.func1+0xff /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:127
1つをCtrl-Cで閉じる
goroutine profile: total 8 1 @ 0x102f20b 0x102a6a9 0x1029d56 0x108e22a 0x108e33d 0x108f0e6 0x118011f 0x11926f8 0x11330ef 0x1133989 0x13239b7 0x1327abc 0x1328527 0x105ca31 # 0x1029d55 internal/poll.runtime_pollWait+0x65 /Users/bati11/.goenv/versions/1.11.4/src/runtime/netpoll.go:173 # 0x108e229 internal/poll.(*pollDesc).wait+0x99 /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_poll_runtime.go:85 # 0x108e33c internal/poll.(*pollDesc).waitRead+0x3c /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_poll_runtime.go:90 # 0x108f0e5 internal/poll.(*FD).Read+0x1d5 /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_unix.go:169 # 0x118011e net.(*netFD).Read+0x4e /Users/bati11/.goenv/versions/1.11.4/src/net/fd_unix.go:202 # 0x11926f7 net.(*conn).Read+0x67 /Users/bati11/.goenv/versions/1.11.4/src/net/net.go:177 # 0x11330ee bufio.(*Reader).fill+0x10e /Users/bati11/.goenv/versions/1.11.4/src/bufio/bufio.go:100 # 0x1133988 bufio.(*Reader).ReadByte+0x38 /Users/bati11/.goenv/versions/1.11.4/src/bufio/bufio.go:242 # 0x13239b6 github.com/bati11/oreno-mqtt/mqtt/packet.(*MQTTReader).ReadPacketType+0x76 /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/packet/mqtt_reader.go:20 # 0x1327abb github.com/bati11/oreno-mqtt/mqtt.handle+0x18b /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:58 # 0x1328526 github.com/bati11/oreno-mqtt/mqtt.Run.func1+0x56 /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:39 1 @ 0x102f20b 0x103ef16 0x132865f 0x105ca31 # 0x132865e github.com/bati11/oreno-mqtt/mqtt.handle.func1+0xce /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:95 1 @ 0x102f20b 0x103ef16 0x1328890 0x105ca31 # 0x132888f github.com/bati11/oreno-mqtt/mqtt.handleSub.func1+0xff /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:127
ちゃんと"handle
内の無名関数" goroutineと"handleSub" goroutineが減ってる。
おしまい
context.Context
を使って、子goroutineを停止させることができました。 ctx
をどんどん渡していけば孫goroutineやひ孫goroutineなども同じように停止させることができます。Goでは複数の並行処理を協調させる仕組みが色々あって良いですね!
コードはこちら。
今回の学び。