前回、goroutine, channel, selectを使って Broker を実装しました。この Broker をサーバーで利用し、MQTTでメッセージを配送してみます。
その前に <-chan chan<- []byte という型が読みづらいので type を使って対応します。その後、読み書き可能なchannelを、 chan<- , <-chan という型に変換しgoroutineへ渡しメッセージの配送を実現します。
目次
typeで型をつくる
現在の実装だとサブスクリプションを chan<- []byte で表現している。channelのchannel( <-chan chan<- []byte )やchannelの配列( []chan<- []byte )が登場したので少々読みづらい。
type を使って chan<- []byte という型から Subscription という型を作ることにする。
type Subscription chan<- []byte
Broker の実装はこうなる。 <-chan chan<- []byte が <-chan Subscription 、 []chan<- []byte が []Subscription となって読みやすい。
// broker.go package mqtt type Subscription chan<- []byte func Broker(fromPub <-chan []byte, subscriptions <-chan Subscription) { // サブスクリプションの配列 var ss []Subscription for { select { case sub := <- subscriptions: // channelからサブスクリプション(chan<- []byte)を読み取ったら配列に追加 ss = append(ss, sub) case message := <- fromPub: // fromPub channelからメッセージを読み取ったら全てのサブスクリプションへ配送 for _, sub := range ss { sub <- message } } } }
サーバーに組み込む
[]byteからPublishへ変更
サブスクライバへメッセージを配送する時、サーバーからサブスクライバへMQTTのPUBLISHパケットを送る。 Broker で配送してるメッセージの型を []byte から packet.Publish に変更する。 packet.Publish を生成するための packet.NewPublish() という関数を追加。
func NewPublish(topicName string, message []byte) Publish { variableHeader := NewPublishVariableHeader(topicName) fixedHeader := NewPublishFixedHeader(PUBLISH, variableHeader.Length()+uint(len(message))) return Publish{fixedHeader, variableHeader, message} }
Subscription と Broker の定義を変更する。
-type Subscription chan<- []byte +import "github.com/bati11/oreno-mqtt/mqtt/packet" -func Broker(fromPub <-chan []byte, subscriptions <-chan Subscription) { +type Subscription chan<- *packet.Publish + +func Broker(fromPub <-chan *packet.Publish, subscriptions <-chan Subscription) { // サブスクリプションの配列 var ss []Subscription for {
server.go
実装したBrokerをserver.goに組み込む。途中の状態だけどserver.goのコードは以下のようになる。
package mqtt import ( "bufio" "fmt" "io" "net" "github.com/bati11/oreno-mqtt/mqtt/handler" "github.com/bati11/oreno-mqtt/mqtt/packet" ) func Run() { ln, err := net.Listen("tcp", "localhost:1883") if err != nil { panic(err) } defer ln.Close() pub := make(chan *packet.Publish) defer close(pub) subscriptions := make(chan Subscription) defer close(subscriptions) go Broker(pub, subscriptions) for { conn, err := ln.Accept() if err != nil { panic(err) } go func() { err = handle(conn, pub, subscriptions) if err != nil { panic(err) } }() } } func handle(conn net.Conn, publishToBroker chan<- *packet.Publish, subscriptionToBroker chan<- Subscription) error { defer conn.Close() for { r := bufio.NewReader(conn) mqttReader := packet.NewMQTTReader(r) packetType, err := mqttReader.ReadPacketType() if err != nil { if err == io.EOF { return nil } return err } switch packetType { case packet.PUBLISH: err = handler.HandlePublish(mqttReader) if err != nil { return err } // TODO "publishToBroker" channelにPUBLISHメッセージを書き込む case packet.CONNECT: connack, err := handler.HandleConnect(mqttReader) if err != nil { return err } _, err = conn.Write(connack.ToBytes()) if err != nil { return err } case packet.SUBSCRIBE: suback, err := handler.HandleSubscribe(mqttReader) if err != nil { return err } _, err = conn.Write(suback.ToBytes()) if err != nil { return err } // TODO "subscriptionToBroker" channelに新たにSUBSCRIPTIONを生成し書き込む case packet.PINGREQ: pingresp, err := handler.HandlePingreq(mqttReader) if err != nil { return err } _, err = conn.Write(pingresp.ToBytes()) if err != nil { return err } case packet.DISCONNECT: return nil } } }
後はTODOコメントにある通り、PUBLISHとSUBSCRIBEのハンドリングを実装していく。
PUBLISHのハンドリング
PUBLISHのハンドリングは簡単。publishToBrokerチャネルにクライアントから送信されてきたPUBLISHメッセージを書き込み、 Broker へ配送する。
case packet.PUBLISH: - err = handler.HandlePublish(mqttReader) + publish, err := handler.HandlePublish(mqttReader) if err != nil { return err } + publishToBroker <- publish
SUBSCRIBEのハンドリング
SUBSCRIBEのハンドリングでは、Broker からPUBLISHメッセージを受け取るためのチャネルを読み取るgoroutineをもう1つ作る。このgoroutineは handleSub 関数を実行する。
                case packet.SUBSCRIBE:
                        suback, err := handler.HandleSubscribe(mqttReader)
                        if err != nil {
                            return err
                        }
                        _, err = conn.Write(suback.ToBytes())
                        if err != nil {
                                return err
                        }
+                       sub := make(chan *packet.Publish)
+                       subscriptionToBroker <- sub
+                       go handleSub(conn, sub)
sub というchannelを生成している。このchannelは2つのgoroutineに渡す。
- 1: subscriptionToBrokerchannelに書き込むことでBrokergoroutineに渡す。受け取り側では、型がchan *packet.Publishからchan<- *packet.Publish(つまりSubscription)になる
- 2: handleSub関数に引数でsubを渡す。handleSub関数では<-chan *packet.Publishとして引数の型を定義しておく
これで Broker から配送されてくるメッセージを handleSub goroutineで受け取れる。
channelの所有権の観点から見ると、 Broker goroutineが sub channelの所有者であり、 handleSub goroutineが sub channelの利用者である。生成したchannelは読み書きできるけど、それぞれのgoroutineでは書き込み専用、読み取り専用として型を定義しておくことでchannelの所有者が分かりやすくなる。また 書き込み専用channel( chan<- *packet.Publish )と読み取り専用channel( <-chan *packet.Publish )は型が違うので、誤ったコードを書いた場合にコンパイルエラーになってくれる。
handleSub 関数の実装は以下。
func handleSub(conn net.Conn, fromBroker <-chan *packet.Publish) { for publishMessage := range fromBroker { bs := publishMessage.ToBytes() _, err := conn.Write(bs) if err != nil { // FIXME fmt.Println(err) } } }
Broker から配送されてくるメッセージを channelから読み込んで、 net.Conn に書き込む。 conn に書き込めなかった場合、例えばサブスクライバの接続が切れてる場合などのエラーハンドリングはあとで考える。
動かす
実際に動かす。自作サーバーを起動してから、mosquittoクライアントでサブスクライブする。
$ mosquitto_sub -i custom-client-id -t hoge
mosquittoクライアントでパブリッシュする。
$ mosquitto_pub -i sample-publisher -t hoge -m "Hello"
サブスクライバの方に "Hello" と表示された!
$ mosquitto_sub -i custom-client-id -t hoge Hello
ついにパブリッシャからのメッセージをサブスクライバに配信できました!複数サブスクライバがいる場合も全てに配送されます。
Topicのフィルタを無視していますが、そのうち・・・。
おしまい
ここまでの実装は以下にあります(tagが0.0.2)。
server.goで sub というchannelを生成し Broker goroutineへ渡しました。 Broker goroutineはこのchannelの所有者ですが、channelを close していません。サブスクライバへ送信するために net.Conn に書き込みましたが、このときエラーが発生した場合は Broker goroutineでchannelを close したいです。
次回は、 handleSub goroutineで発生するエラーのハンドリングを実装してみたいと思います。
今回の学び