MQTTサーバーを実装しながらGoを学ぶ - その10 type, "chan<-"と"<-chan"

前回、goroutine, channel, selectを使って Broker を実装しました。この Broker をサーバーで利用し、MQTTでメッセージを配送してみます。

その前に <-chan chan<- []byte という型が読みづらいので type を使って対応します。その後、読み書き可能なchannelを、 chan<- , <-chan という型に変換しgoroutineへ渡しメッセージの配送を実現します。

目次

typeで型をつくる

現在の実装だとサブスクリプションchan<- []byte で表現している。channelのchannel( <-chan chan<- []byte )やchannelの配列( []chan<- []byte )が登場したので少々読みづらい。

type を使って chan<- []byte という型から Subscription という型を作ることにする。

type Subscription chan<- []byte

Broker の実装はこうなる。 <-chan chan<- []byte<-chan Subscription[]chan<- []byte[]Subscription となって読みやすい。

// broker.go
package mqtt

type Subscription chan<- []byte

func Broker(fromPub <-chan []byte, subscriptions <-chan Subscription) {
    // サブスクリプションの配列
    var ss []Subscription
    for {
        select {
        case sub := <- subscriptions:
            // channelからサブスクリプション(chan<- []byte)を読み取ったら配列に追加
            ss = append(ss, sub)
        case message := <- fromPub:
            // fromPub channelからメッセージを読み取ったら全てのサブスクリプションへ配送
            for _, sub := range ss {
                sub <- message
            }
        }
    }
}

サーバーに組み込む

[]byteからPublishへ変更

サブスクライバへメッセージを配送する時、サーバーからサブスクライバへMQTTのPUBLISHパケットを送る。 Broker で配送してるメッセージの型を []byte から packet.Publish に変更する。 packet.Publish を生成するための packet.NewPublish() という関数を追加。

func NewPublish(topicName string, message []byte) Publish {
    variableHeader := NewPublishVariableHeader(topicName)
    fixedHeader := NewPublishFixedHeader(PUBLISH, variableHeader.Length()+uint(len(message)))
    return Publish{fixedHeader, variableHeader, message}
}

SubscriptionBroker の定義を変更する。

-type Subscription chan<- []byte
+import "github.com/bati11/oreno-mqtt/mqtt/packet"
 
-func Broker(fromPub <-chan []byte, subscriptions <-chan Subscription) {
+type Subscription chan<- *packet.Publish
+
+func Broker(fromPub <-chan *packet.Publish, subscriptions <-chan Subscription) {
     // サブスクリプションの配列
     var ss []Subscription
     for {

server.go

実装したBrokerをserver.goに組み込む。途中の状態だけどserver.goのコードは以下のようになる。

package mqtt

import (
    "bufio"
    "fmt"
    "io"
    "net"

    "github.com/bati11/oreno-mqtt/mqtt/handler"
    "github.com/bati11/oreno-mqtt/mqtt/packet"
)

func Run() {
    ln, err := net.Listen("tcp", "localhost:1883")
    if err != nil {
        panic(err)
    }
    defer ln.Close()

    pub := make(chan *packet.Publish)
    defer close(pub)
    subscriptions := make(chan Subscription)
    defer close(subscriptions)

    go Broker(pub, subscriptions)

    for {
        conn, err := ln.Accept()
        if err != nil {
            panic(err)
        }

        go func() {
            err = handle(conn, pub, subscriptions)
            if err != nil {
                panic(err)
            }
        }()
    }
}

func handle(conn net.Conn, publishToBroker chan<- *packet.Publish, subscriptionToBroker chan<- Subscription) error {
    defer conn.Close()

    for {
        r := bufio.NewReader(conn)
        mqttReader := packet.NewMQTTReader(r)
        packetType, err := mqttReader.ReadPacketType()
        if err != nil {
            if err == io.EOF {
                return nil
            }
            return err
        }
        switch packetType {
        case packet.PUBLISH:
            err = handler.HandlePublish(mqttReader)
            if err != nil {
                return err
            }
            // TODO "publishToBroker" channelにPUBLISHメッセージを書き込む
        case packet.CONNECT:
            connack, err := handler.HandleConnect(mqttReader)
            if err != nil {
                return err
            }
            _, err = conn.Write(connack.ToBytes())
            if err != nil {
                return err
            }
        case packet.SUBSCRIBE:
            suback, err := handler.HandleSubscribe(mqttReader)
            if err != nil {
                return err
            }
            _, err = conn.Write(suback.ToBytes())
            if err != nil {
                return err
            }
            // TODO "subscriptionToBroker" channelに新たにSUBSCRIPTIONを生成し書き込む
        case packet.PINGREQ:
            pingresp, err := handler.HandlePingreq(mqttReader)
            if err != nil {
                return err
            }
            _, err = conn.Write(pingresp.ToBytes())
            if err != nil {
                return err
            }
        case packet.DISCONNECT:
            return nil
        }
    }
}

後はTODOコメントにある通り、PUBLISHとSUBSCRIBEのハンドリングを実装していく。

PUBLISHのハンドリング

PUBLISHのハンドリングは簡単。publishToBrokerチャネルにクライアントから送信されてきたPUBLISHメッセージを書き込み、 Broker へ配送する。

                case packet.PUBLISH:
-                       err = handler.HandlePublish(mqttReader)
+                       publish, err := handler.HandlePublish(mqttReader)
                        if err != nil {
                                return err
                        }
+                       publishToBroker <- publish

SUBSCRIBEのハンドリング

SUBSCRIBEのハンドリングでは、Broker からPUBLISHメッセージを受け取るためのチャネルを読み取るgoroutineをもう1つ作る。このgoroutineは handleSub 関数を実行する。

                case packet.SUBSCRIBE:
                        suback, err := handler.HandleSubscribe(mqttReader)
                        if err != nil {
                            return err
                        }
                        _, err = conn.Write(suback.ToBytes())
                        if err != nil {
                                return err
                        }
+                       sub := make(chan *packet.Publish)
+                       subscriptionToBroker <- sub
+                       go handleSub(conn, sub)

sub というchannelを生成している。このchannelは2つのgoroutineに渡す。

  • 1: subscriptionToBroker channelに書き込むことで Broker goroutineに渡す。受け取り側では、型が chan *packet.Publish から chan<- *packet.Publish (つまり Subscription )になる
  • 2: handleSub 関数に引数で sub を渡す。 handleSub 関数では <-chan *packet.Publish として引数の型を定義しておく

これで Broker から配送されてくるメッセージを handleSub goroutineで受け取れる。

channelの所有権の観点から見ると、 Broker goroutineが sub channelの所有者であり、 handleSub goroutineが sub channelの利用者である。生成したchannelは読み書きできるけど、それぞれのgoroutineでは書き込み専用、読み取り専用として型を定義しておくことでchannelの所有者が分かりやすくなる。また 書き込み専用channel( chan<- *packet.Publish )と読み取り専用channel( <-chan *packet.Publish )は型が違うので、誤ったコードを書いた場合にコンパイルエラーになってくれる。

handleSub 関数の実装は以下。

func handleSub(conn net.Conn, fromBroker <-chan *packet.Publish) {
    for publishMessage := range fromBroker {
        bs := publishMessage.ToBytes()
        _, err := conn.Write(bs)
        if err != nil {
            // FIXME
            fmt.Println(err)
        }
    }
}

Broker から配送されてくるメッセージを channelから読み込んで、 net.Conn に書き込む。 conn に書き込めなかった場合、例えばサブスクライバの接続が切れてる場合などのエラーハンドリングはあとで考える。

動かす

実際に動かす。自作サーバーを起動してから、mosquittoクライアントでサブスクライブする。

$ mosquitto_sub -i custom-client-id -t hoge

mosquittoクライアントでパブリッシュする。

$ mosquitto_pub -i sample-publisher -t hoge -m "Hello"

サブスクライバの方に "Hello" と表示された!

$ mosquitto_sub -i custom-client-id -t hoge
Hello

ついにパブリッシャからのメッセージをサブスクライバに配信できました!複数サブスクライバがいる場合も全てに配送されます。

Topicのフィルタを無視していますが、そのうち・・・。

おしまい

ここまでの実装は以下にあります(tagが0.0.2)。

github.com

server.goで sub というchannelを生成し Broker goroutineへ渡しました。 Broker goroutineはこのchannelの所有者ですが、channelを close していません。サブスクライバへ送信するために net.Conn に書き込みましたが、このときエラーが発生した場合は Broker goroutineでchannelを close したいです。

次回は、 handleSub goroutineで発生するエラーのハンドリングを実装してみたいと思います。

今回の学び

MQTTサーバーを実装しながらGoを学ぶ - その9 channelとselect

前回はgoroutineを使って、複数クライアントと同時に接続できるようにしました。今回は、複数のgoroutine間をchannelをつないでメッセージやりとりし、Publishされたメッセージを別のSubscribeしているクライアントに配送します。channelの所有権やselectを組み合わせてchannelをより便利に使えるようにします。

目次。

サーバーからクライアントへのPUBLISH

MQTTのPUBLISHの仕様を読み直す。

The Client uses a PUBLISH Packet to send an Application Message to the Server, for distribution to Clients with matching subscriptions. The Server uses a PUBLISH Packet to send an Application Message to each Client which has a matching subscription.

クライアントからPUBLISHパケットを受け取ったサーバーは、マッチするサブスクリプションのクライアントへメッセージを配布する。このとき、サーバーからクライアントへのPUBLISHパケットを送信する。

サブスクリプションとはなんだったというと、SUBSCRIBEの仕様を読み直す。

The SUBSCRIBE Packet is sent from the Client to the Server to create one or more Subscriptions.Each Subscription registers a Client’s interest in one or more Topics. The Server sends PUBLISH Packets to the Client in order to forward Application Messages that were published to Topics that match these Subscriptions.

SUBSCRIBEを送ってきたクライアントに対してサーバーが作成するものでTopicとサブスクライブしてるクライアントを結びつけている。

MQTTじゃないけど、GCPの Cloud PubSub の図が分かりやすいので拝借。

https://cloud.google.com/pubsub/images/many-to-many.svg

引用元: https://cloud.google.com/pubsub/docs/overview

いつもの以下の図だとサブスクリプションは登場していないけど、上記の通り接続中のサブスクライバがどのTopicのメッセージを購読しているのかはサブスクリプションという概念で管理している。

https://s3.amazonaws.com/www.appcelerator.com.images/MQTT_1.png

引用元: https://www.appcelerator.com/blog/2018/03/api-builder-and-mqtt-for-iot-part-1/

というわけで、Publishされたメッセージをサブスクリプションへ配布する処理を実装していく。

channel

今のところサーバーの実装では、クライアントごとにgoroutineを生成している。goroutine間でデータをやりとりするには、channelが使える。

Broker という関数を作り、パブリッシャから読み込んだメッセージをサブスクライバに書き込む。パブリッシャを <-chan []byte と表現して、サブスクライバを chan<- []byte と表現する。channelからの値の取り出しには range を使う。 range はchannelが閉じられるまで値を取り出し続ける(もしくは値が来るまでブロックして待つ)。

func Broker(fromPub <-chan []byte, toSub chan<- []byte) {
    defer close(toSub)
    for message := range fromPub {
        toSub <- message
    }
}

channelは読み取り専用もしくは書き込み専用にすることができる。これを利用してchannelの所有権を持つgoroutineを決めよう、ということが 「Go言語による並行処理」に書かれている。

チャネルの所有権を割り振ることです。ここでは所有権を、チャネルを初期化し、書き込み、閉じるゴルーチンとして定義します。

channelの所有権を持つgoroutineと、channelを利用するだけのgoroutineに分けて考える。

どのゴルーチンがチャネルを所有しているかをはっきりさせることが重要です。単方向チャネルを宣言することはチャネルを所有するゴルーチンとチャネルを利用するだけのゴルーチンを区別できるようにするための道具です。チャネルを所有しているゴルーチンにはチャネルに対する書き込み権限(chanまたはchan<-)があり、チャネルを利用するだけのゴルーチンには読み込み専用権限(<-chan)しかありません。

今の実装では、 Broker 関数は toSub に書き込むので所有権を持つことになる。所有権があるので、 deferclose するようにしてる。

所有権を意識すると何が嬉しいのか?channelにはpanicを発生させる操作がある。

  • nilに対してclose
  • closeしたchannelをclose

この2つは、channelの所有者だけが書き込みとcloseを行うことで防ぎやすくなる。

channelの利用者は読み取り時にchannelが閉じられている可能性があるので、 <- で読み取る時は2つ目の返り値の値を確認するか、 range で読み込むようにする。 <- で値を取り出した時の2つ目の返り値はboolで、channelが閉じられてる場合はfalseになる。

また、chanelが閉じられておらず、値もない場合は、読取り時にブロックすることを考慮する。

channelの所有者を意識しながら、 Broker を使うコードを書いてみる。

package mqtt_test
  
import (
    "testing"

    "github.com/bati11/oreno-mqtt/mqtt"
)

func TestBroker(t *testing.T) {
    pub := make(chan []byte)
    sub := make(chan []byte)

    // "broker" goroutine
    go mqtt.Broker(pub, sub)

    // "pub" goroutine
    go func() {
        defer close(pub)
        pub <- []byte("hoge") // クライアントからのPUBLISH
    }()

    // "sub"
    result, ok := <-sub // BrokerからPUBLISHメッセージが配送されてくる
    if !ok {
        t.Fatalf("got %v, want true", ok)
    }
    if string(result) != "hoge" {
        t.Fatalf("got %v, want \"hoge\"", string(result))
    }
}

"broker" goroutineは sub channelに対して書き込みと close を行うので sub channelの所有者であり、 pub channelの利用者である。

"pub" goroutineは pub channelの所有者である。

"sub"以降の処理(main goroutine)は、 sub channelの利用者である。値を読み込むのは1回だけなので range ではなく <- で読み取りをしている。

channelは値

Broker を実行するときに sub channelを渡している。 sub channel、つまり Broker が引数で受け取ってる chan<- []byte は、メッセージの書き込み先なのでMQTTのサブスクリプションに該当する。

しかし、サブスクリプションは、この記事の最初の方で見た通り、サーバーがクライアントからSUBSCRIBEパケットを受け取ったタイミングで作られる。なので Broker goroutine生成時ではなく、後からgoroutineに chan<- []byte を渡したい。

Broker の引数を chan<- []byte のchannel、つまり <-chan chan<- []byte とする。channelはファーストクラスな値なので、channelのchannelや、channelの配列というようなこともできる。

channelのchannelを使うとgoroutine起動後に chan<- []byte を受け取ることができる。

func Broker(fromPub <-chan []byte, subscriptions <-chan chan<- []byte) {
    for sub := range subscriptions {
        ...

Publishされたメッセージを全てのサブスクリプションに配送しないといけないので、サブスクリプションを管理しておく必要がある。新たなサブスクリプションchan<- []byte )をchannelから取得したら、channelの配列に追加するようにする。これはサブスクリプションの配列を表す。

func Broker(fromPub <-chan []byte, subscriptions <-chan chan<- []byte) {
    // サブスクリプションの配列
    var ss []chan<- []byte

    for sub := range subscriptions {
        // channelからサブスクリプション(chan<- []byte)を読み取ったら配列に追加
        ss = apend(ss, sub)
    }
    ...
}

fromPub channelからメッセージを読み取ったら、サブスクリプションの配列を range を使って順にメッセージを送る。以下のようなイメージ。channelに対する range と配列に対する range がごちゃ混ぜにならないように注意。

func Broker(fromPub <-chan []byte, subscriptions <-chan chan<- []byte) {
    // サブスクリプションの配列
    var ss []chan<- []byte

    for sub := range subscriptions {
        // channelからサブスクリプション(chan<- []byte)を読み取ったら配列に追加
        ss = apend(ss, sub)
    }

    for message := range fromPub {
        // fromPub channelからメッセージを読み取ったら全てのサブスクリプションへ配送
        for _, sub := range ss {
            sub <- message
        }
    }
}

しかし、これだと subscriptions channelが閉じられるまで1つ目の for が終わらないのでダメ。

select

複数のチャネルから読み取りたいときは、select を使う。

func Broker(fromPub <-chan []byte, subscriptions <-chan chan<- []byte) {
    // サブスクリプションの配列
    var ss []chan<- []byte
    for {
        select {
        case sub := <- subscriptions:
            // channelからサブスクリプション(chan<- []byte)を読み取ったら配列に追加
            ss = append(ss, sub)
        case message := <- fromPub:
            // fromPub channelからメッセージを読み取ったら全てのサブスクリプションへ配送
            for _, sub := range ss {
                sub <- message
            }
        }
    }
}

TestBroker を書き換えてみる。

func TestBroker(t *testing.T) {
    pub := make(chan []byte)
    subscriptions := make(chan chan<- []byte)
    defer close(subscriptions)

    // "broker" goroutine
    go mqtt.Broker(pub, subscriptions)

    // クライアントからのSUBSCRIBE
    sub := make(chan []byte)
    subscriptions <- sub // 新たなサブスクリプションをBrokerに送る

    // "pub" goroutine
    go func() {
        defer close(pub)
        pub <- []byte("hoge") // クライアントからのPUBLISH
    }()

    // "sub"
    result, ok := <-sub // BrokerからPUBLISHメッセージが配送されてくる
    if !ok {
        t.Fatalf("got %v, want true", ok)
    }
    if string(result) != "hoge" {
        t.Fatalf("got %v, want \"hoge\"", string(result))
    }
}

これでOK!複数クライアントがSubscribeしても問題ないことを確認。

func TestBroker(t *testing.T) {
    pub := make(chan []byte)
    subscriptions := make(chan chan<- []byte)
    defer close(subscriptions)

    // "broker" goroutine
    go mqtt.Broker(pub, subscriptions)

    // クライアントからのSUBSCRIBE(1つ目)
    sub1 := make(chan []byte)
    subscriptions <- sub1
    // クライアントからのSUBSCRIBE(2つ目)
    sub2 := make(chan []byte)
    subscriptions <- sub2

    // "pub" goroutine
    go func() {
        defer close(pub)
        pub <- []byte("hoge") // クライアントからのPUBLISH
    }()

    // "sub"
    result1, ok := <-sub1 // 1つ目のサブスクリプションに対してBrokerからPUBLISHメッセージが配送されてくる
    if !ok {
        t.Fatalf("got %v, want true", ok)
    }
    if string(result1) != "hoge" {
        t.Fatalf("got %v, want \"hoge\"", string(result1))
    }
    result2, ok := <-sub2 // 2つ目のサブスクリプションに対してもBrokerからPUBLISHメッセージが配送されてくる
    if !ok {
        t.Fatalf("got %v, want true", ok)
    }
    if string(result2) != "hoge" {
        t.Fatalf("got %v, want \"hoge\"", string(result2))
    }
}

sync.WaitGroup

ところで、"sub"以降の処理で、 sub1 , sub2 という順番にchannelから読み込んでるが、 Broker 内でサブスクリプションchan<- []byte )にメッセージを流す順番に依存してる。仮に sub2 , sub1 と順番を逆にして読み取るとブロックしてテストが終了しない。

sub1sub2select で読み取るようにしても良いけど、それぞれのchannelから1回ずつしかメッセージを読まないので、それぞれgoroutineで実行することにする。

sub1 , sub2 からの読み取りをそれぞれ別goroutineで実行するように変更するだけだと、subからの読み取りが並行で行われるためgoroutineが切り替わらずに TestBroker 自体がすぐに終わってしまい、subからの読み込みのテストができない。そこで、 sync.WaitGroup を使ってそれぞれのgoroutineの終了を待つ。

func TestBroker(t *testing.T) {
    pub := make(chan []byte)
    subscriptions := make(chan chan<- []byte)
    defer close(subscriptions)

    // "broker" goroutine
    go mqtt.Broker(pub, subscriptions)

    // クライアントからのSUBSCRIBE(1つ目)
    sub1 := make(chan []byte)
    subscriptions <- sub1
    // クライアントからのSUBSCRIBE(2つ目)
    sub2 := make(chan []byte)
    subscriptions <- sub2

    // "pub" goroutine
    go func() {
        defer close(pub)
        pub <- []byte("hoge") // クライアントからのPUBLISH
    }()

    // "sub"
    wg := sync.WaitGroup{}
    wg.Add(1)
    go func() {
        defer wg.Done()
        result1, ok := <-sub1 // 1つ目のサブスクリプションに対してBrokerからPUBLISHメッセージが配送されてくる
        if !ok {
            t.Fatalf("got %v, want true", ok)
        }
        if string(result1) != "hoge" {
            t.Fatalf("got %v, want \"hoge\"", string(result1))
        }
    }()

    wg.Add(1)
    go func() {
        defer wg.Done()
        result2, ok := <-sub2 // 2つ目のサブスクリプションに対してもBrokerからPUBLISHメッセージが配送されてくる
        if !ok {
            t.Fatalf("got %v, want true", ok)
        }
        if string(result2) != "hoge" {
            t.Fatalf("got %v, want \"hoge\"", string(result2))
        }
    }()

    wg.Wait()
}

これで1つのメッセージを複数のサブスクリプションに配送する処理ができた。

おしまい

MQTTブローカーをgoroutine, channel, selectを使って実装してみました。この3つを使って色々なパターンを実装できそうです。しかも、GoのランタイムがgoroutineをOSのスレッドにいい感じに割り当ててくれます。

でも、どっかのgoroutineでエラーが起きた時はどうすればいいの?というところを次回はやっていきたいなぁ。

今回の学び

MQTTサーバーを実装しながらGoを学ぶ - その8 goroutine, pprof

前回までで、クライアントがPUBLISH、またはSUBSCRIBEすることができるようになりました。次は、クライアントからPUBLISHされたメッセージを、SUBSCRIBEしている別のクライアントへ送る処理を実装していきます。が、その前にgoroutineを使ってサーバーが複数クライアントと同時に接続できるようにします。

目次。

複数クライアントからの接続

クライアントが接続した状態で、もう1つ別のクライアントから接続してみる。例えば、Subscribeしてるクライアントが存在している状態で、Publishしてみる。

$ mosquitto_pub -i sample-publisher -t hoge -m "Hello"
Error: Unknown error.

すぐには何も出力されず、しばらく放っておくと上記の Error: Unknown error. が出力される。まだサーバーが複数クライアントを受けられるように実装していないから、タイムアウトする。

具体的にコードで確認する。

// server.go
func Run() {
    ln, err := net.Listen("tcp", "localhost:1883")
    if err != nil {
        panic(err)
    }
    fmt.Println("server starts at localhost:1883")

    for {
        conn, err := ln.Accept()
        if err != nil {
            panic(err)
        }

        err = handle(conn)
        if err != nil {
            fmt.Printf("%v", err)
            panic(err)
        }
    }
}

func handle(conn net.Conn) error {
    defer conn.Close()

    for {
        r := bufio.NewReader(conn)
        ...
    }
}

最初のクライアントから接続が来ると、 Accept() が net.Conn を返してきてhandle内で処理する。handle内では for { } でループすることで、クライアントとやりとりする。

この時、2つめのクライアントが接続してきてもサーバーは、handle内の for { } でループしているので、 Accept() が処理されない。そのため、1つのクライアントしか捌けない。

並行処理を使って複数のクライアントを捌くようにすれば良い。Goではgoroutineを使って並行処理を実現する。

1つのクライアントを1つのgoroutineで取り扱うようにする。

--- a/mqtt/server.go
+++ b/mqtt/server.go
@@ -23,11 +23,13 @@ func Run() {
                        panic(err)
                }
 
-               err = handle(conn)
-               if err != nil {
-                       fmt.Printf("%v", err)
-                       panic(err)
-               }
+               go func(conn net.Conn) {
+                       err = handle(conn)
+                       if err != nil {
+                               fmt.Printf("%v", err)
+                               panic(err)
+                       }
+               }(conn)
        }
 }

goroutine

以前、Goでechoサーバーを書いたときにもgoroutineを使った。goroutineは並行処理を実現したいときに生成するが、このとき生成されるのはGoランタイム上のgoroutineでありOSのスレッドではない。

goroutineの特徴を ASCII.jp:Go言語と並列処理(2) から引用させていただく。

スレッドがCPUコアに対してマッピングされるのに対し、goroutineはOSのスレッド(Go製のアプリケーションから見ると1つの仮想CPU)にマッピングされます。 この点が、通常のスレッドとGo言語の軽量スレッドであるgoroutineとの最大の違いです。


両者にはほかにも次のような細かい違いがあります。

・OSスレッドはIDを持つが、goroutineは指定しなければ実際のどのスレッドにマッピングされるかは決まっておらず、IDなども持たない
・OSスレッドの1〜2MBと比べると、初期スタックメモリのサイズが小さく(2KB)、起動処理が軽い
・優先度を持たない
・タイムスライスで強制的に処理が切り替わることがないが、コンパイラが「ここで処理を切り替える」という切り替えポイントを埋め込むことで切り替えを実現している
・(IDで一意にgoroutineで特定できないため)外部から終了のリクエストを送る仕組みがない

Goのランタイムがどのようにgoroutineを切り替えているかは以下の記事が参考になります。

こちらの本の「6章 ゴルーチンとGoランタイム」にも丁寧に書かれていた。

pprof

goroutineがOSのスレッドと異なることは分かった。じゃあ、今goroutineがいくつ作られていてどういう状態なのかはどうやって調べれば良いのだろう?OSのスレッドではないのでpsコマンドなどでは確認できない。

pprofというのがGoには用意されている。これを使うとGoで書いたアプリケーションのプロファイリングができる。

pprofを使うとCPUプロファイリング以外にも色々できる。以下の連載記事が分かりやすかった。

pprofを使ってgoroutineの情報をダンプしてみる。準備は以下のように import _ "net/http/pprof" してHTTPサーバーを起動するだけ。

 package main
 
-import "github.com/bati11/oreno-mqtt/mqtt"
+import (
+       "log"
+       "net/http"
+       _ "net/http/pprof"
+
+       "github.com/bati11/oreno-mqtt/mqtt"
+)
 
 func main() {
+       go func() {
+               log.Println(http.ListenAndServe("localhost:6060", nil))
+       }()
        mqtt.Run()
 }

http://localhost:6060/debug/pprof/goroutine?debug=1 でgoroutineのダンプが取得できる。簡単!

さらに、Go1.10以降ではGUIでグラフを確認することもできる。

以下のコマンドを実行すると

$ go tool pprof -http=":8888" http://localhost:6060/debug/pprof/goroutine?debug=1

ブラウザが立ち上がり、こんなグラフが確認できる!

おしまい

goroutineを使って複数クライアントと同時に接続することができるようになりました。次回は、パブリッシャと接続してるgoroutineからサブスクライバと接続してるgoroutineへメッセージを送り、パブリッシャからサブスクライバへのメッセージ配信を実現します。goroutine間のメッセージ送信は、Goのchannelを使って実装していきます。

今回の学び