MQTTサーバーを実装しながらGoを学ぶ - その10 type, "chan<-"と"<-chan"
前回、goroutine, channel, selectを使って Broker
を実装しました。この Broker
をサーバーで利用し、MQTTでメッセージを配送してみます。
その前に <-chan chan<- []byte
という型が読みづらいので type
を使って対応します。その後、読み書き可能なchannelを、 chan<-
, <-chan
という型に変換しgoroutineへ渡しメッセージの配送を実現します。
目次
typeで型をつくる
現在の実装だとサブスクリプションを chan<- []byte
で表現している。channelのchannel( <-chan chan<- []byte
)やchannelの配列( []chan<- []byte
)が登場したので少々読みづらい。
type
を使って chan<- []byte
という型から Subscription
という型を作ることにする。
type Subscription chan<- []byte
Broker
の実装はこうなる。 <-chan chan<- []byte
が <-chan Subscription
、 []chan<- []byte
が []Subscription
となって読みやすい。
// broker.go package mqtt type Subscription chan<- []byte func Broker(fromPub <-chan []byte, subscriptions <-chan Subscription) { // サブスクリプションの配列 var ss []Subscription for { select { case sub := <- subscriptions: // channelからサブスクリプション(chan<- []byte)を読み取ったら配列に追加 ss = append(ss, sub) case message := <- fromPub: // fromPub channelからメッセージを読み取ったら全てのサブスクリプションへ配送 for _, sub := range ss { sub <- message } } } }
サーバーに組み込む
[]byteからPublishへ変更
サブスクライバへメッセージを配送する時、サーバーからサブスクライバへMQTTのPUBLISHパケットを送る。 Broker
で配送してるメッセージの型を []byte
から packet.Publish
に変更する。 packet.Publish を生成するための packet.NewPublish()
という関数を追加。
func NewPublish(topicName string, message []byte) Publish { variableHeader := NewPublishVariableHeader(topicName) fixedHeader := NewPublishFixedHeader(PUBLISH, variableHeader.Length()+uint(len(message))) return Publish{fixedHeader, variableHeader, message} }
Subscription
と Broker
の定義を変更する。
-type Subscription chan<- []byte +import "github.com/bati11/oreno-mqtt/mqtt/packet" -func Broker(fromPub <-chan []byte, subscriptions <-chan Subscription) { +type Subscription chan<- *packet.Publish + +func Broker(fromPub <-chan *packet.Publish, subscriptions <-chan Subscription) { // サブスクリプションの配列 var ss []Subscription for {
server.go
実装したBrokerをserver.goに組み込む。途中の状態だけどserver.goのコードは以下のようになる。
package mqtt import ( "bufio" "fmt" "io" "net" "github.com/bati11/oreno-mqtt/mqtt/handler" "github.com/bati11/oreno-mqtt/mqtt/packet" ) func Run() { ln, err := net.Listen("tcp", "localhost:1883") if err != nil { panic(err) } defer ln.Close() pub := make(chan *packet.Publish) defer close(pub) subscriptions := make(chan Subscription) defer close(subscriptions) go Broker(pub, subscriptions) for { conn, err := ln.Accept() if err != nil { panic(err) } go func() { err = handle(conn, pub, subscriptions) if err != nil { panic(err) } }() } } func handle(conn net.Conn, publishToBroker chan<- *packet.Publish, subscriptionToBroker chan<- Subscription) error { defer conn.Close() for { r := bufio.NewReader(conn) mqttReader := packet.NewMQTTReader(r) packetType, err := mqttReader.ReadPacketType() if err != nil { if err == io.EOF { return nil } return err } switch packetType { case packet.PUBLISH: err = handler.HandlePublish(mqttReader) if err != nil { return err } // TODO "publishToBroker" channelにPUBLISHメッセージを書き込む case packet.CONNECT: connack, err := handler.HandleConnect(mqttReader) if err != nil { return err } _, err = conn.Write(connack.ToBytes()) if err != nil { return err } case packet.SUBSCRIBE: suback, err := handler.HandleSubscribe(mqttReader) if err != nil { return err } _, err = conn.Write(suback.ToBytes()) if err != nil { return err } // TODO "subscriptionToBroker" channelに新たにSUBSCRIPTIONを生成し書き込む case packet.PINGREQ: pingresp, err := handler.HandlePingreq(mqttReader) if err != nil { return err } _, err = conn.Write(pingresp.ToBytes()) if err != nil { return err } case packet.DISCONNECT: return nil } } }
後はTODOコメントにある通り、PUBLISHとSUBSCRIBEのハンドリングを実装していく。
PUBLISHのハンドリング
PUBLISHのハンドリングは簡単。publishToBrokerチャネルにクライアントから送信されてきたPUBLISHメッセージを書き込み、 Broker
へ配送する。
case packet.PUBLISH: - err = handler.HandlePublish(mqttReader) + publish, err := handler.HandlePublish(mqttReader) if err != nil { return err } + publishToBroker <- publish
SUBSCRIBEのハンドリング
SUBSCRIBEのハンドリングでは、Broker
からPUBLISHメッセージを受け取るためのチャネルを読み取るgoroutineをもう1つ作る。このgoroutineは handleSub
関数を実行する。
case packet.SUBSCRIBE: suback, err := handler.HandleSubscribe(mqttReader) if err != nil { return err } _, err = conn.Write(suback.ToBytes()) if err != nil { return err } + sub := make(chan *packet.Publish) + subscriptionToBroker <- sub + go handleSub(conn, sub)
sub
というchannelを生成している。このchannelは2つのgoroutineに渡す。
- 1:
subscriptionToBroker
channelに書き込むことでBroker
goroutineに渡す。受け取り側では、型がchan *packet.Publish
からchan<- *packet.Publish
(つまりSubscription
)になる - 2:
handleSub
関数に引数でsub
を渡す。handleSub
関数では<-chan *packet.Publish
として引数の型を定義しておく
これで Broker
から配送されてくるメッセージを handleSub
goroutineで受け取れる。
channelの所有権の観点から見ると、 Broker
goroutineが sub
channelの所有者であり、 handleSub
goroutineが sub
channelの利用者である。生成したchannelは読み書きできるけど、それぞれのgoroutineでは書き込み専用、読み取り専用として型を定義しておくことでchannelの所有者が分かりやすくなる。また 書き込み専用channel( chan<- *packet.Publish
)と読み取り専用channel( <-chan *packet.Publish
)は型が違うので、誤ったコードを書いた場合にコンパイルエラーになってくれる。
handleSub
関数の実装は以下。
func handleSub(conn net.Conn, fromBroker <-chan *packet.Publish) { for publishMessage := range fromBroker { bs := publishMessage.ToBytes() _, err := conn.Write(bs) if err != nil { // FIXME fmt.Println(err) } } }
Broker
から配送されてくるメッセージを channelから読み込んで、 net.Conn
に書き込む。 conn
に書き込めなかった場合、例えばサブスクライバの接続が切れてる場合などのエラーハンドリングはあとで考える。
動かす
実際に動かす。自作サーバーを起動してから、mosquittoクライアントでサブスクライブする。
$ mosquitto_sub -i custom-client-id -t hoge
mosquittoクライアントでパブリッシュする。
$ mosquitto_pub -i sample-publisher -t hoge -m "Hello"
サブスクライバの方に "Hello" と表示された!
$ mosquitto_sub -i custom-client-id -t hoge Hello
ついにパブリッシャからのメッセージをサブスクライバに配信できました!複数サブスクライバがいる場合も全てに配送されます。
Topicのフィルタを無視していますが、そのうち・・・。
おしまい
ここまでの実装は以下にあります(tagが0.0.2)。
server.goで sub
というchannelを生成し Broker
goroutineへ渡しました。 Broker
goroutineはこのchannelの所有者ですが、channelを close
していません。サブスクライバへ送信するために net.Conn
に書き込みましたが、このときエラーが発生した場合は Broker
goroutineでchannelを close
したいです。
次回は、 handleSub
goroutineで発生するエラーのハンドリングを実装してみたいと思います。
今回の学び
MQTTサーバーを実装しながらGoを学ぶ - その9 channelとselect
前回はgoroutineを使って、複数クライアントと同時に接続できるようにしました。今回は、複数のgoroutine間をchannelをつないでメッセージやりとりし、Publishされたメッセージを別のSubscribeしているクライアントに配送します。channelの所有権やselectを組み合わせてchannelをより便利に使えるようにします。
目次。
サーバーからクライアントへのPUBLISH
MQTTのPUBLISHの仕様を読み直す。
The Client uses a PUBLISH Packet to send an Application Message to the Server, for distribution to Clients with matching subscriptions. The Server uses a PUBLISH Packet to send an Application Message to each Client which has a matching subscription.
クライアントからPUBLISHパケットを受け取ったサーバーは、マッチするサブスクリプションのクライアントへメッセージを配布する。このとき、サーバーからクライアントへのPUBLISHパケットを送信する。
サブスクリプションとはなんだったというと、SUBSCRIBEの仕様を読み直す。
The SUBSCRIBE Packet is sent from the Client to the Server to create one or more Subscriptions.Each Subscription registers a Client’s interest in one or more Topics. The Server sends PUBLISH Packets to the Client in order to forward Application Messages that were published to Topics that match these Subscriptions.
SUBSCRIBEを送ってきたクライアントに対してサーバーが作成するものでTopicとサブスクライブしてるクライアントを結びつけている。
MQTTじゃないけど、GCPの Cloud PubSub の図が分かりやすいので拝借。
引用元: https://cloud.google.com/pubsub/docs/overview
いつもの以下の図だとサブスクリプションは登場していないけど、上記の通り接続中のサブスクライバがどのTopicのメッセージを購読しているのかはサブスクリプションという概念で管理している。
引用元: https://www.appcelerator.com/blog/2018/03/api-builder-and-mqtt-for-iot-part-1/
というわけで、Publishされたメッセージをサブスクリプションへ配布する処理を実装していく。
channel
今のところサーバーの実装では、クライアントごとにgoroutineを生成している。goroutine間でデータをやりとりするには、channelが使える。
Broker
という関数を作り、パブリッシャから読み込んだメッセージをサブスクライバに書き込む。パブリッシャを <-chan []byte
と表現して、サブスクライバを chan<- []byte
と表現する。channelからの値の取り出しには range
を使う。 range
はchannelが閉じられるまで値を取り出し続ける(もしくは値が来るまでブロックして待つ)。
func Broker(fromPub <-chan []byte, toSub chan<- []byte) { defer close(toSub) for message := range fromPub { toSub <- message } }
channelは読み取り専用もしくは書き込み専用にすることができる。これを利用してchannelの所有権を持つgoroutineを決めよう、ということが 「Go言語による並行処理」に書かれている。
チャネルの所有権を割り振ることです。ここでは所有権を、チャネルを初期化し、書き込み、閉じるゴルーチンとして定義します。
channelの所有権を持つgoroutineと、channelを利用するだけのgoroutineに分けて考える。
どのゴルーチンがチャネルを所有しているかをはっきりさせることが重要です。単方向チャネルを宣言することはチャネルを所有するゴルーチンとチャネルを利用するだけのゴルーチンを区別できるようにするための道具です。チャネルを所有しているゴルーチンにはチャネルに対する書き込み権限(chanまたはchan<-)があり、チャネルを利用するだけのゴルーチンには読み込み専用権限(<-chan)しかありません。
今の実装では、 Broker
関数は toSub
に書き込むので所有権を持つことになる。所有権があるので、 defer
で close
するようにしてる。
所有権を意識すると何が嬉しいのか?channelにはpanicを発生させる操作がある。
- nilに対してclose
- closeしたchannelをclose
この2つは、channelの所有者だけが書き込みとcloseを行うことで防ぎやすくなる。
channelの利用者は読み取り時にchannelが閉じられている可能性があるので、 <-
で読み取る時は2つ目の返り値の値を確認するか、 range
で読み込むようにする。 <-
で値を取り出した時の2つ目の返り値はboolで、channelが閉じられてる場合はfalseになる。
また、chanelが閉じられておらず、値もない場合は、読取り時にブロックすることを考慮する。
channelの所有者を意識しながら、 Broker
を使うコードを書いてみる。
package mqtt_test import ( "testing" "github.com/bati11/oreno-mqtt/mqtt" ) func TestBroker(t *testing.T) { pub := make(chan []byte) sub := make(chan []byte) // "broker" goroutine go mqtt.Broker(pub, sub) // "pub" goroutine go func() { defer close(pub) pub <- []byte("hoge") // クライアントからのPUBLISH }() // "sub" result, ok := <-sub // BrokerからPUBLISHメッセージが配送されてくる if !ok { t.Fatalf("got %v, want true", ok) } if string(result) != "hoge" { t.Fatalf("got %v, want \"hoge\"", string(result)) } }
"broker" goroutineは sub
channelに対して書き込みと close
を行うので sub
channelの所有者であり、 pub
channelの利用者である。
"pub" goroutineは pub
channelの所有者である。
"sub"以降の処理(main goroutine)は、 sub
channelの利用者である。値を読み込むのは1回だけなので range
ではなく <-
で読み取りをしている。
channelは値
Broker
を実行するときに sub
channelを渡している。 sub
channel、つまり Broker
が引数で受け取ってる chan<- []byte
は、メッセージの書き込み先なのでMQTTのサブスクリプションに該当する。
しかし、サブスクリプションは、この記事の最初の方で見た通り、サーバーがクライアントからSUBSCRIBEパケットを受け取ったタイミングで作られる。なので Broker
goroutine生成時ではなく、後からgoroutineに chan<- []byte
を渡したい。
Broker
の引数を chan<- []byte
のchannel、つまり <-chan chan<- []byte
とする。channelはファーストクラスな値なので、channelのchannelや、channelの配列というようなこともできる。
channelのchannelを使うとgoroutine起動後に chan<- []byte
を受け取ることができる。
func Broker(fromPub <-chan []byte, subscriptions <-chan chan<- []byte) { for sub := range subscriptions { ...
Publishされたメッセージを全てのサブスクリプションに配送しないといけないので、サブスクリプションを管理しておく必要がある。新たなサブスクリプション( chan<- []byte
)をchannelから取得したら、channelの配列に追加するようにする。これはサブスクリプションの配列を表す。
func Broker(fromPub <-chan []byte, subscriptions <-chan chan<- []byte) { // サブスクリプションの配列 var ss []chan<- []byte for sub := range subscriptions { // channelからサブスクリプション(chan<- []byte)を読み取ったら配列に追加 ss = apend(ss, sub) } ... }
fromPub
channelからメッセージを読み取ったら、サブスクリプションの配列を range
を使って順にメッセージを送る。以下のようなイメージ。channelに対する range
と配列に対する range
がごちゃ混ぜにならないように注意。
func Broker(fromPub <-chan []byte, subscriptions <-chan chan<- []byte) { // サブスクリプションの配列 var ss []chan<- []byte for sub := range subscriptions { // channelからサブスクリプション(chan<- []byte)を読み取ったら配列に追加 ss = apend(ss, sub) } for message := range fromPub { // fromPub channelからメッセージを読み取ったら全てのサブスクリプションへ配送 for _, sub := range ss { sub <- message } } }
しかし、これだと subscriptions
channelが閉じられるまで1つ目の for
が終わらないのでダメ。
select
複数のチャネルから読み取りたいときは、select
を使う。
func Broker(fromPub <-chan []byte, subscriptions <-chan chan<- []byte) { // サブスクリプションの配列 var ss []chan<- []byte for { select { case sub := <- subscriptions: // channelからサブスクリプション(chan<- []byte)を読み取ったら配列に追加 ss = append(ss, sub) case message := <- fromPub: // fromPub channelからメッセージを読み取ったら全てのサブスクリプションへ配送 for _, sub := range ss { sub <- message } } } }
TestBroker
を書き換えてみる。
func TestBroker(t *testing.T) { pub := make(chan []byte) subscriptions := make(chan chan<- []byte) defer close(subscriptions) // "broker" goroutine go mqtt.Broker(pub, subscriptions) // クライアントからのSUBSCRIBE sub := make(chan []byte) subscriptions <- sub // 新たなサブスクリプションをBrokerに送る // "pub" goroutine go func() { defer close(pub) pub <- []byte("hoge") // クライアントからのPUBLISH }() // "sub" result, ok := <-sub // BrokerからPUBLISHメッセージが配送されてくる if !ok { t.Fatalf("got %v, want true", ok) } if string(result) != "hoge" { t.Fatalf("got %v, want \"hoge\"", string(result)) } }
これでOK!複数クライアントがSubscribeしても問題ないことを確認。
func TestBroker(t *testing.T) { pub := make(chan []byte) subscriptions := make(chan chan<- []byte) defer close(subscriptions) // "broker" goroutine go mqtt.Broker(pub, subscriptions) // クライアントからのSUBSCRIBE(1つ目) sub1 := make(chan []byte) subscriptions <- sub1 // クライアントからのSUBSCRIBE(2つ目) sub2 := make(chan []byte) subscriptions <- sub2 // "pub" goroutine go func() { defer close(pub) pub <- []byte("hoge") // クライアントからのPUBLISH }() // "sub" result1, ok := <-sub1 // 1つ目のサブスクリプションに対してBrokerからPUBLISHメッセージが配送されてくる if !ok { t.Fatalf("got %v, want true", ok) } if string(result1) != "hoge" { t.Fatalf("got %v, want \"hoge\"", string(result1)) } result2, ok := <-sub2 // 2つ目のサブスクリプションに対してもBrokerからPUBLISHメッセージが配送されてくる if !ok { t.Fatalf("got %v, want true", ok) } if string(result2) != "hoge" { t.Fatalf("got %v, want \"hoge\"", string(result2)) } }
sync.WaitGroup
ところで、"sub"以降の処理で、 sub1
, sub2
という順番にchannelから読み込んでるが、 Broker
内でサブスクリプション( chan<- []byte
)にメッセージを流す順番に依存してる。仮に sub2
, sub1
と順番を逆にして読み取るとブロックしてテストが終了しない。
sub1
と sub2
を select
で読み取るようにしても良いけど、それぞれのchannelから1回ずつしかメッセージを読まないので、それぞれgoroutineで実行することにする。
sub1
, sub2
からの読み取りをそれぞれ別goroutineで実行するように変更するだけだと、subからの読み取りが並行で行われるためgoroutineが切り替わらずに TestBroker
自体がすぐに終わってしまい、subからの読み込みのテストができない。そこで、 sync.WaitGroup
を使ってそれぞれのgoroutineの終了を待つ。
func TestBroker(t *testing.T) { pub := make(chan []byte) subscriptions := make(chan chan<- []byte) defer close(subscriptions) // "broker" goroutine go mqtt.Broker(pub, subscriptions) // クライアントからのSUBSCRIBE(1つ目) sub1 := make(chan []byte) subscriptions <- sub1 // クライアントからのSUBSCRIBE(2つ目) sub2 := make(chan []byte) subscriptions <- sub2 // "pub" goroutine go func() { defer close(pub) pub <- []byte("hoge") // クライアントからのPUBLISH }() // "sub" wg := sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() result1, ok := <-sub1 // 1つ目のサブスクリプションに対してBrokerからPUBLISHメッセージが配送されてくる if !ok { t.Fatalf("got %v, want true", ok) } if string(result1) != "hoge" { t.Fatalf("got %v, want \"hoge\"", string(result1)) } }() wg.Add(1) go func() { defer wg.Done() result2, ok := <-sub2 // 2つ目のサブスクリプションに対してもBrokerからPUBLISHメッセージが配送されてくる if !ok { t.Fatalf("got %v, want true", ok) } if string(result2) != "hoge" { t.Fatalf("got %v, want \"hoge\"", string(result2)) } }() wg.Wait() }
これで1つのメッセージを複数のサブスクリプションに配送する処理ができた。
おしまい
MQTTブローカーをgoroutine, channel, selectを使って実装してみました。この3つを使って色々なパターンを実装できそうです。しかも、GoのランタイムがgoroutineをOSのスレッドにいい感じに割り当ててくれます。
でも、どっかのgoroutineでエラーが起きた時はどうすればいいの?というところを次回はやっていきたいなぁ。
今回の学び
- MQTTのサブスクリプション
- channel
- select
- sync.WaitGroup
MQTTサーバーを実装しながらGoを学ぶ - その8 goroutine, pprof
前回までで、クライアントがPUBLISH、またはSUBSCRIBEすることができるようになりました。次は、クライアントからPUBLISHされたメッセージを、SUBSCRIBEしている別のクライアントへ送る処理を実装していきます。が、その前にgoroutineを使ってサーバーが複数クライアントと同時に接続できるようにします。
目次。
複数クライアントからの接続
クライアントが接続した状態で、もう1つ別のクライアントから接続してみる。例えば、Subscribeしてるクライアントが存在している状態で、Publishしてみる。
$ mosquitto_pub -i sample-publisher -t hoge -m "Hello" Error: Unknown error.
すぐには何も出力されず、しばらく放っておくと上記の Error: Unknown error.
が出力される。まだサーバーが複数クライアントを受けられるように実装していないから、タイムアウトする。
具体的にコードで確認する。
// server.go func Run() { ln, err := net.Listen("tcp", "localhost:1883") if err != nil { panic(err) } fmt.Println("server starts at localhost:1883") for { conn, err := ln.Accept() if err != nil { panic(err) } err = handle(conn) if err != nil { fmt.Printf("%v", err) panic(err) } } } func handle(conn net.Conn) error { defer conn.Close() for { r := bufio.NewReader(conn) ... } }
最初のクライアントから接続が来ると、 Accept()
が net.Conn を返してきてhandle内で処理する。handle内では for { }
でループすることで、クライアントとやりとりする。
この時、2つめのクライアントが接続してきてもサーバーは、handle内の for { }
でループしているので、 Accept()
が処理されない。そのため、1つのクライアントしか捌けない。
並行処理を使って複数のクライアントを捌くようにすれば良い。Goではgoroutineを使って並行処理を実現する。
1つのクライアントを1つのgoroutineで取り扱うようにする。
--- a/mqtt/server.go +++ b/mqtt/server.go @@ -23,11 +23,13 @@ func Run() { panic(err) } - err = handle(conn) - if err != nil { - fmt.Printf("%v", err) - panic(err) - } + go func(conn net.Conn) { + err = handle(conn) + if err != nil { + fmt.Printf("%v", err) + panic(err) + } + }(conn) } }
goroutine
以前、Goでechoサーバーを書いたときにもgoroutineを使った。goroutineは並行処理を実現したいときに生成するが、このとき生成されるのはGoランタイム上のgoroutineでありOSのスレッドではない。
goroutineの特徴を ASCII.jp:Go言語と並列処理(2) から引用させていただく。
スレッドがCPUコアに対してマッピングされるのに対し、goroutineはOSのスレッド(Go製のアプリケーションから見ると1つの仮想CPU)にマッピングされます。 この点が、通常のスレッドとGo言語の軽量スレッドであるgoroutineとの最大の違いです。
両者にはほかにも次のような細かい違いがあります。
・OSスレッドはIDを持つが、goroutineは指定しなければ実際のどのスレッドにマッピングされるかは決まっておらず、IDなども持たない
・OSスレッドの1〜2MBと比べると、初期スタックメモリのサイズが小さく(2KB)、起動処理が軽い
・優先度を持たない
・タイムスライスで強制的に処理が切り替わることがないが、コンパイラが「ここで処理を切り替える」という切り替えポイントを埋め込むことで切り替えを実現している
・(IDで一意にgoroutineで特定できないため)外部から終了のリクエストを送る仕組みがない
Goのランタイムがどのようにgoroutineを切り替えているかは以下の記事が参考になります。
こちらの本の「6章 ゴルーチンとGoランタイム」にも丁寧に書かれていた。
pprof
goroutineがOSのスレッドと異なることは分かった。じゃあ、今goroutineがいくつ作られていてどういう状態なのかはどうやって調べれば良いのだろう?OSのスレッドではないのでpsコマンドなどでは確認できない。
pprofというのがGoには用意されている。これを使うとGoで書いたアプリケーションのプロファイリングができる。
pprofを使うとCPUプロファイリング以外にも色々できる。以下の連載記事が分かりやすかった。
- Go pprof 入門編 (CPU Profile とコマンドラインツール) : KLabGames Tech Blog
- Go pprof 応用編 (CPU 以外のプロファイル) : KLabGames Tech Blog
- Go pprof マスター編 (pprof の仕組み) : KLabGames Tech Blog
pprofを使ってgoroutineの情報をダンプしてみる。準備は以下のように import _ "net/http/pprof"
してHTTPサーバーを起動するだけ。
package main -import "github.com/bati11/oreno-mqtt/mqtt" +import ( + "log" + "net/http" + _ "net/http/pprof" + + "github.com/bati11/oreno-mqtt/mqtt" +) func main() { + go func() { + log.Println(http.ListenAndServe("localhost:6060", nil)) + }() mqtt.Run() }
http://localhost:6060/debug/pprof/goroutine?debug=1 でgoroutineのダンプが取得できる。簡単!
さらに、Go1.10以降ではGUIでグラフを確認することもできる。
以下のコマンドを実行すると
$ go tool pprof -http=":8888" http://localhost:6060/debug/pprof/goroutine?debug=1
ブラウザが立ち上がり、こんなグラフが確認できる!
おしまい
goroutineを使って複数クライアントと同時に接続することができるようになりました。次回は、パブリッシャと接続してるgoroutineからサブスクライバと接続してるgoroutineへメッセージを送り、パブリッシャからサブスクライバへのメッセージ配信を実現します。goroutine間のメッセージ送信は、Goのchannelを使って実装していきます。
今回の学び
- goroutine
- pprof