GAE Go1.11 のdev_appserver.pyでホットリロードが効かないとき

GAE Go1.11 の dev_appserver.py でホットリロードが効かないなーというとき、dev_appserver.py実行時に以下のメッセージが出力されてました。最後の1行が出力されているとホットリロードが効きません。

$ dev_appserver.py dev.yaml
INFO     2019-09-22 10:30:40,611 api_server.py:275] Starting API server at: http://localhost:51877
INFO     2019-09-22 10:30:40,613 instance_factory.py:170] Building with dependencies from GOPATH.
INFO     2019-09-22 10:30:40,618 dispatcher.py:256] Starting module "sample" running at: http://localhost:8080
INFO     2019-09-22 10:30:40,621 admin_server.py:150] Starting admin server at: http://localhost:8000
WARNING  2019-09-22 10:30:40,621 devappserver2.py:373] No default module found. Ignoring.
/Users/hoge/local/google-cloud-sdk/platform/google_appengine/google/appengine/tools/devappserver2/mtime_file_watcher.py:182: UserWarning: There are too many files in your application for changes in all of them to be monitored. You may have to restart the development server to see some changes to your files.

監視ファイル対象が多すぎるのが原因らしい。node_modulesとかホットリロードの対象にしなくて良いファイル/ディレクトリ除外して対応したいです。

結果

以下のように実行すればOKでした。

$ GO111MODULE=on dev_appserver.py dev.yaml --watcher_ignore_re ".*/node_modules/.*"
  • --watcher_ignore_re オプションで正規表現を使ってプロジェクト内で監視対象が除きたいものを指定できます
  • GO111MODULE=on を明示的に指定してないとdev_appserver.pyでは $GOPATH/src 配下のファイルたちも監視対象としてしまうようです( GO111MODULE=auto もダメ)

けど、公式ドキュメントを読むとGo1.11からはローカルで動かすときにdev_appserver.pyの話は出てこなく go run と書いてあるからホットリロードは別の方法を検討しすべきなのかなぁ。

cloud.google.com

結果に行き着くまでの流れ

--watcher_ignore_re

無視するオプションないかなぁと思って確認すると --watcher_ignore_re というオプションがあるようだ。

$ dev_appserver.py --help
...
  --watcher_ignore_re WATCHER_IGNORE_RE
                        Regex string to specify files to be ignored by the
                        filewatcher. (default: None)
...

実行コマンドを以下のようにしてみた。

$ dev_appserver.py --watcher_ignore_re ".*/node_modules/.*"
...
/Users/hoge/local/google-cloud-sdk/platform/google_appengine/google/appengine/tools/devappserver2/mtime_file_watcher.py:182: UserWarning: There are too many files in your application for changes in all of them to be monitored. You may have to restart the development server to see some changes to your files.

まだダメだ...。

GO111MODULE=on

なぜだろうと dev_appserver.py のコードを読み始める。Printデバッグしながら頑張る。

まず、dev_appserver.pyからは <gcloudをインストールしたディレクトリ>/platform/google_appengine/google/appengine/tools/devappserver2/devappserver.py を実行してる。

ここからはdevappserver.py と同じディレクトリのファイルを見ていく。

devappserver.py => dispatcher.py (Dispatcherをインスタンス化) => dispatcher.py (startメソッド)という流れで処理が行われる。

dispatcher.py

dispatcher.pyの252行目。このメッセージは見たことがある。dev_appserver.pyを実行した時に3行目に出力されていたログメッセージだ。

log_message = 'Starting module "%s" running at: http://%s' % (
          module_configuration.module_name, _service.balanced_address)

その前の357行目。

    module_instance = module_class(

この部分でGAEのmodule(現在はserviceと呼ばれる)を表すオブジェクトを生成しているようだ。Printデバッグしたところ module_classmodule.AutoScalingModule だったのでコードを読み進める。

module.py

module.py の1251行目。 Module クラスを継承してる。

class AutoScalingModule(Module):

1309行目。 __init__ の中。

    super(AutoScalingModule, self).__init__(**kwargs)

親クラスを見る。フィールドを初期化してるだけのコンストラクタだった。

dispatcher.pyに戻る。

      _service = self._create_module(module_configuration, service_port,
                                     ssl_port)
      _service.start()

インスタンスを生成して start メソッドを呼んでるので AutScalingModuleの start を確認。

  def start(self):
    """Start background management of the Module."""
    self._balanced_module.start()
    self._port_registry.add(self.balanced_port, self, None)
    if self._ssl_port:
      self._port_registry.add(self._ssl_port, self, None)
    if self._watcher:
      self._watcher.start()
    self.report_start_metrics()
    self._instance_adjustment_thread.start()

self._balanced_module.start() を呼んでる。あと self._watcher.start() も呼んでる。

ファイルが多すぎるとメッセージを出力していたのは mtime_file_watcher.py だった。 self._watcher は怪しい。self._watcher はなんだったか。

再びmodule.pyへ。

module.pyの600行目。ここで self._watcher をセットしてる。

    if self._automatic_restarts:
      self._watcher = file_watcher.get_file_watcher(
          [self._module_configuration.application_root] +
          self._instance_factory.get_restart_directories(),
          self._use_mtime_file_watcher)
      if hasattr(self._watcher, 'set_watcher_ignore_re'):
        self._watcher.set_watcher_ignore_re(self._watcher_ignore_re)
      if hasattr(self._watcher, 'set_skip_files_re'):
        self._watcher.set_skip_files_re(self._module_configuration.skip_files)
    else:
      self._watcher = None

file_watcher.get_file_watcher( ここだ!

file_watcher.py

file_watcher.pyの151行目

def get_file_watcher(directories, use_mtime_file_watcher):

この directories をprintしてみると

['/Users/hoge/work/sample-app', '/Users/hoge/go/src']

むむ、思ってるのと違うディレクトリが混ざっている。 /Users/hoge/go/src ...。これは GOPATH/src だな。これが監視対象ならそりゃ監視対象となるファイル数が多くなってしまう。どこで紛れ込んだのだろう。

またまたmodule.pyへ

さっきのmodule.pyに戻る。 get_file_watcherdirecotories 引数を指定している部分を見てみる。

[self._module_configuration.application_root] + self._instance_factory.get_restart_directories()

1つ目のリストが '/Users/hoge/work/sample-app' だから、おそらく self._instance_factory.get_restart_directories() これが怪しい。これが '/Users/hoge/go/src' を生成してるのではないか。

self._instance_factory は何か。 get_file_watcher を呼び出し箇所のすぐ上、597行目。

    self._instance_factory = self._create_instance_factory(
        self._module_configuration)

_create_instance_factory をチェック。193行目。

  def _create_instance_factory(self,
                               module_configuration):

追っていくと、runtime_factories.goの FACTORIES という辞書でruntimeごとに定義されている!

FACTORIES = {
    'go': go_factory.GoRuntimeInstanceFactory,
    'go111': go_factory.GoRuntimeInstanceFactory,
    'php55': php_factory.PHPRuntimeInstanceFactory,
    'php72': php_factory.PHPRuntimeInstanceFactory,
    'python': python_factory.PythonRuntimeInstanceFactory,
    'python37': python_factory.PythonRuntimeInstanceFactory,
    'python27': python_factory.PythonRuntimeInstanceFactory,
    'python-compat': python_factory.PythonRuntimeInstanceFactory,
    'custom': custom_factory.CustomRuntimeInstanceFactory,
}

go/instance_factory.py

今回のruntimeはgo111なので、 go_factory.GoRuntimeInstanceFactory_instance_factory の実体だ。知りたかったのは self._instance_factory.get_restart_directories() なので確認。

...devappserver2/go/instance_factory.pyに get_restart_directories() を発見。

    # Go < 1.11 should always watch GOPATH
    # Go == 1.11 should only watch GOPATH if GO111MODULE != on
    # Go > 1.11 should only watch go.mod dir
    go_mod_dir = self._find_go_mod_dir(
        self._module_configuration.application_root)
    if os.getenv('GO111MODULE', '').lower() == 'on' and go_mod_dir:
      logging.info('Building with dependencies from go.mod.')
      return [go_mod_dir]
    logging.info('Building with dependencies from GOPATH.')

あー、この 'Building with dependencies from GOPATH.' というログもdev_appserver.pyを起動する時に見るやつ。

ん、ということはGO111MODULE=autoに対応してないのでは?とよく読むとコメントに書いてるー!

# Go == 1.11 should only watch GOPATH if GO111MODULE != on

ということで、以下のようなコマンドでdev_appserver.pyを実行する。

$ GO111MODULE=on dev_appserver.py local.yaml --watcher_ignore_re ".*/node_modules/.*"
INFO     2019-09-22 11:08:14,068 api_server.py:275] Starting API server at: http://localhost:52385
INFO     2019-09-22 11:08:14,070 instance_factory.py:168] Building with dependencies from go.mod.
INFO     2019-09-22 11:08:14,075 dispatcher.py:256] Starting module "sample" running at: http://localhost:8080
INFO     2019-09-22 11:08:14,079 admin_server.py:150] Starting admin server at: http://localhost:8000
WARNING  2019-09-22 11:08:14,079 devappserver2.py:373] No default module found. Ignoring.

キター!ホットリロードもできました!

おしまい

ちなみに、GO111MODULE=on だけでもいけるんじゃないかと思って試しましたが、今回は --watcher_ignore_re ".*/node_modules/.*" も必要でした。

SQLアンチパターン・ジェイウォークのクエリをシェルでやる

SQLアンチパターンという本があります。 その本の1章がジェイウォーク(信号無視)。ジェイウォークで紹介されているようなデータがtsvファイルとして手元にある場合に、SQLではなくシェルでなんとかするお話です。

SQLアンチパターン

SQLアンチパターン

試しに使ったMySQLのバージョンは5.7です。

ジェイウォーク

製品テーブルとアカウントテーブルがあり、製品ごとに複数人の担当者(アカウント)がいる、とする

  CREATE TABLE `accounts` (
    `account_id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
    `account_name` varchar(20) COLLATE utf8mb4_bin DEFAULT NULL,
    PRIMARY KEY (`account_id`),
    UNIQUE KEY `account_id` (`account_id`)
  )
  
  CREATE TABLE `products` (
    `product_id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
    `product_name` varchar(1000) COLLATE utf8mb4_bin DEFAULT NULL,
    `account_id` varchar(100) COLLATE utf8mb4_bin DEFAULT NULL,
    PRIMARY KEY (`product_id`),
    UNIQUE KEY `product_id` (`product_id`)
  )
  mysql> SELECT * FROM products;
  +------------+---------------------+------------+
  | product_id | product_name        | account_id |
  +------------+---------------------+------------+
  |          1 | Visual TurboBuilder | 12,34      |
  |          2 | hoge fuga           | 555,666    |
  +------------+---------------------+------------+
  
  mysql> SELECT * FROM accounts;
  +------------+--------------+
  | account_id | account_name |
  +------------+--------------+
  |         12 | taro         |
  |         34 | hanako       |
  |        555 | goro         |
  +------------+--------------+

製品テーブルのaccount_idカラムに、複数のアカウントIDをカンマ区切りの文字列として保持するアンチパターン。

tsvファイル

同じようなtsvファイルがあるとする。

$ cat products.tsv
product_id  product_name    account_id
1  Visual TurboBuilder 12,34
2  hoge fuga   555,666,777

$ cat accounts.tsv
account_id  account_name
12 taro
34 hanako
555    goro

SQL vs シェル

特定のアカウントに関連する製品の検索

account_id=12に関連するproductsを検索する。

SQLアンチパターンで紹介されているSQLは以下。正規表現を使った WHERE 句の指定がなかなか強烈...。

SELECT * FROM products;
+------------+---------------------+-------------+
| product_id | product_name        | account_id  |
+------------+---------------------+-------------+
|          1 | Visual TurboBuilder | 12,34       |
|          2 | hoge fuga           | 555,666,777 |
+------------+---------------------+-------------+
SELECT * FROM products WHERE account_id REGEXP '[[:<:]]12[[:>:]]';
+------------+---------------------+------------+
| product_id | product_name        | account_id |
+------------+---------------------+------------+
|          1 | Visual TurboBuilder | 12,34      |
+------------+---------------------+------------+

対して、シェルでやってみる。 NR で行番号を取得してヘッダー行かそれ以外の行かで分岐、ヘッダー行以外の場合は $3 (3カラム目の値)と正規表現を使って行を絞り込む。

$ cat products.tsv
product_id  product_name    account_id
1   Visual TurboBuilder 12,34
2   hoge fuga   555,666,777
$ cat products.tsv | awk -F '\t' '{
>     if (NR == 1) { print }         #ヘッダー行を出力
>     else if ($3 ~ /12/) { print }  #ヘッダー行以外は account_id=12 で絞り込み
> }'
product_id  product_name    account_id
1   Visual TurboBuilder 12,34

ヘッダー行の出力をなんとかして if をなくしたいなぁ、いい方法ないだろうか?

特定の製品に関連するアカウントの検索

product_id=1に関連するアカウント情報の一覧を取得する。

SQLアンチパターンで紹介されているSQLは以下。

SELECT * FROM products;
+------------+---------------------+-------------+
| product_id | product_name        | account_id  |
+------------+---------------------+-------------+
|          1 | Visual TurboBuilder | 12,34       |
|          2 | hoge fuga           | 555,666,777 |
+------------+---------------------+-------------+

SELECT * FROM accounts;
+------------+--------------+
| account_id | account_name |
+------------+--------------+
|         12 | taro         |
|         34 | hanako       |
|        555 | goro         |
+------------+--------------+
SELECT *
  FROM products AS p
 INNER JOIN accounts AS a
         ON p.account_id REGEXP CONCAT('[[:<:]]', a.account_id, '[[:>:]]')
 WHERE p.product_id = 1;
+------------+---------------------+------------+------------+--------------+
| product_id | product_name        | account_id | account_id | account_name |
+------------+---------------------+------------+------------+--------------+
|          1 | Visual TurboBuilder | 12,34      |         12 | taro         |
|          1 | Visual TurboBuilder | 12,34      |         34 | hanako       |
+------------+---------------------+------------+------------+--------------+

シェルでやってみる。

$ cat products.tsv
product_id  product_name    account_id
1   Visual TurboBuilder 12,34
2   hoge fuga   555,666,777

$ cat accounts.tsv
account_id  account_name
12  taro
34  hanako
555 goro

ヘッダー行の取り扱いはさっきと同じ。 split を使ってカンマ区切りの文字列を配列にセット、配列に対するループ内で print することで、account_id毎の行に展開する。結果は、tmp_product.tsvというファイルに書き込んでおく。

$ cat products.tsv | awk -F '\t' '{
>     if (NR == 1) { print }                 #ヘッダー行を出力
>     else if ($1 == 1) {                    #ヘッダー行以外は product_id=1 で絞り込み
>         split($3, arr, ",");               #splitでカンマで分割
>         for (i in arr) {
>             print $1 "\t" $2 "\t" arr[i]   #複数行に展開
>         }
>     }
> }' > tmp_product.tsv

$ cat tmp_product.tsv
product_id  product_name    account_id
1   Visual TurboBuilder 12
1   Visual TurboBuilder 34

作成したtmp_product.tsvとaccounts.tsvとを、account_id列でjoinする。タブ文字区切りにしたいので -t オプションを使うが "\t" という指定をしてもタブ文字として扱ってくれないので、bashの ${string}の記法を使って指定する(この記法は何か呼び名はあるのかな?)。

$ join --header -t $'\t' -1 3 -2 1 tmp_product.tsv accounts.tsv 
account_id  product_id  product_name    account_name
12  1   Visual TurboBuilder taro
34  1   Visual TurboBuilder hanako

できた!

ワンライナーで書きたい場合は、bashだとプロセス置換を使ってjoinコマンドで読む。

$ join --header -t $'\t' -1 3 -2 1 <(cat products.tsv | awk -F '\t' '{ if (NR == 1) { print } else if ($1 == 1) { split($3, arr, ","); for (i in arr) { print $1 "\t" $2 "\t" arr[i] }}}') accounts.tsv 
account_id  product_id  product_name    account_name
12  1   Visual TurboBuilder taro
34  1   Visual TurboBuilder hanako

集約クエリ

product毎に関連するaccountの数を取得する。

SQLアンチパターンで紹介されてるクエリは以下。文字列長からカンマ以外の文字列長を引くことでカンマの数を取得して +1 する。これもなかなか強烈。

SELECT product_id, LENGTH(account_id) - LENGTH(REPLACE(account_id, ',', '')) + 1 AS contracts_per_product
  FROM products;
+------------+-----------------------+
| product_id | contracts_per_product |
+------------+-----------------------+
|          1 |                     2 |
|          2 |                     3 |
+------------+-----------------------+

これはシェルなら簡単。awkのsplitが配列の要素の数を返してくれるので、それを出力すればOK!

$ cat products.tsv | awk -F '\t' '
> BEGIN { print "product_id" "\t" "contracts_per_product"} #BEGINでヘッダー行を出力する
> NR > 1 {
>     n = split($3, arr, ",");
>     print $1 "\t" n
> }'
product_id  contracts_per_product
1   2
2   3

おしまい

シェルはおもしろいですね!

MQTTサーバーを実装しながらGoを学ぶ - その12 Contextを使ったgoroutineの停止

前回、別goroutineで発生したエラーハンドリングをしました。具体的にはサブスクライバへの書き込みでエラーが発生した場合にサブスクリプションの削除処理するようにしました。今回は、 context.Context を使ってgoroutineを停止することで、クライアントが接続を切ったタイミングでサブスクリプションの削除をするようにしてみます。

目次。

今のgoroutine

現状の実装だと、クライアントからSUBSCRIBEパケットが送信されてくると、いくつかのgoroutineが生成され以下のような親子関係になる。

https://i.gyazo.com/6bf617a5406e91b099915ce86e0aff09.png

  • main
    • サーバーのメインgoroutine
    • Run 関数内の conn, err := ln.Accept() でブロックしてる
  • Broker
    • "main" goroutineによって1つだけ生成される
    • パブリッシャからのメッセージをサブスクリプションへ配送するgoroutine
    • Broker 関数内の for ... select ... で無限ループしてる
  • handle
    • "main" goroutineによってクライアントからのTCP接続がある度に生成される
    • MQTTパケットを受け取り処理する
    • handle 関数内から mqttReader.ReadPacketType() の呼び出し、最終的には net.Conn に対する Read でブロックしてる
  • handleSub
    • "handle" goroutineによってクライアントからのSUBSCRIBEパケットにより生成される
    • Broker からchannel経由でPUBLISHを受け取り、 net.Conn に書き込む
    • channelの読み取りでブロックしてる
  • handle内の無名関数
    • "handle" goroutineによってクライアントからのSUBSCRIBEパケットにより生成される
    • handleSub 関数で発生した error をerror channel経由で読み取り Broker に伝える
    • channelの読み取りでブロックしてる

net.Conn に対する Read をしているのは"handle" goroutineなので、サブスクライバが切断したことは"handle" goroutineで処理できるはず。

goroutineリーク

サブスクライバが切断、つまりmosquitto_subをCtrl-Cで終了したとき、handleSubのgoroutineが減らない。

2つmosquitto_subを実行してる状態でpprofを使いgoroutieの状態を見てみる。pprofは導入済みなので http://localhost:6060/debug/pprof/goroutine?debug=1 にアクセスすれば良い。

結果の一部は以下。

2 @ 0x102f20b 0x102a6a9 0x1029d56 0x108e22a 0x108e33d 0x108f0e6 0x118011f 0x11926f8 0x11330ef 0x1133989 0x13239b7 0x1327a6b 0x1328457 0x105ca31
#   0x1029d55   internal/poll.runtime_pollWait+0x65                     /Users/bati11/.goenv/versions/1.11.4/src/runtime/netpoll.go:173
#   0x108e229   internal/poll.(*pollDesc).wait+0x99                     /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_poll_runtime.go:85
#   0x108e33c   internal/poll.(*pollDesc).waitRead+0x3c                     /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_poll_runtime.go:90
#   0x108f0e5   internal/poll.(*FD).Read+0x1d5                          /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_unix.go:169
#   0x118011e   net.(*netFD).Read+0x4e                              /Users/bati11/.goenv/versions/1.11.4/src/net/fd_unix.go:202
#   0x11926f7   net.(*conn).Read+0x67                               /Users/bati11/.goenv/versions/1.11.4/src/net/net.go:177
#   0x11330ee   bufio.(*Reader).fill+0x10e                          /Users/bati11/.goenv/versions/1.11.4/src/bufio/bufio.go:100
#   0x1133988   bufio.(*Reader).ReadByte+0x38                           /Users/bati11/.goenv/versions/1.11.4/src/bufio/bufio.go:242
#   0x13239b6   github.com/bati11/oreno-mqtt/mqtt/packet.(*MQTTReader).ReadPacketType+0x76  /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/packet/mqtt_reader.go:20
#   0x1327a6a   github.com/bati11/oreno-mqtt/mqtt.handle+0x13a                  /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:54
#   0x1328456   github.com/bati11/oreno-mqtt/mqtt.Run.func1+0x56                /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:38

2 @ 0x102f20b 0x102f2b3 0x100758e 0x10072bb 0x1328501 0x105ca31
#   0x1328500   github.com/bati11/oreno-mqtt/mqtt.handle.func1+0x40 /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:91

2 @ 0x102f20b 0x102f2b3 0x100758e 0x10072bb 0x132865c 0x105ca31
#   0x132865b   github.com/bati11/oreno-mqtt/mqtt.handleSub.func1+0x6b  /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:118

3つのブロックがあるが、最初のブロックは packet.(*MQTTReader).ReadPacketType+0x76 とあり mqttReader.ReadPacketType() の呼び出しのことなので "handle" goroutineである。先頭の 2 という数値から "handle" goroutineが2つ存在してることになる。

2つめのブロックは、"handle" goroutineの数は 1 に減っているが、"handle 関数内の無名関数" goroutine、3つめのブロックは "handleSub" goroutineである。2つサブスクライバがいるので、goroutineの数もそれぞれ2つである。

1つCtrl-Cで終了してから再度状態を確認。"handle内の無名関数" goroutineと"handleSub" goroutineが減ってない。

goroutine profile: total 10
2 @ 0x102f20b 0x102f2b3 0x100758e 0x10072bb 0x1328501 0x105ca31
#   0x1328500   github.com/bati11/oreno-mqtt/mqtt.handle.func1+0x40 /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:91

2 @ 0x102f20b 0x102f2b3 0x100758e 0x10072bb 0x132865c 0x105ca31
#   0x132865b   github.com/bati11/oreno-mqtt/mqtt.handleSub.func1+0x6b  /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:118

1 @ 0x102f20b 0x102a6a9 0x1029d56 0x108e22a 0x108e33d 0x108f0e6 0x118011f 0x11926f8 0x11330ef 0x1133989 0x13239b7 0x1327a6b 0x1328457 0x105ca31
#   0x1029d55   internal/poll.runtime_pollWait+0x65                     /Users/bati11/.goenv/versions/1.11.4/src/runtime/netpoll.go:173
#   0x108e229   internal/poll.(*pollDesc).wait+0x99                     /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_poll_runtime.go:85
#   0x108e33c   internal/poll.(*pollDesc).waitRead+0x3c                     /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_poll_runtime.go:90
#   0x108f0e5   internal/poll.(*FD).Read+0x1d5                          /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_unix.go:169
#   0x118011e   net.(*netFD).Read+0x4e                              /Users/bati11/.goenv/versions/1.11.4/src/net/fd_unix.go:202
#   0x11926f7   net.(*conn).Read+0x67                               /Users/bati11/.goenv/versions/1.11.4/src/net/net.go:177
#   0x11330ee   bufio.(*Reader).fill+0x10e                          /Users/bati11/.goenv/versions/1.11.4/src/bufio/bufio.go:100
#   0x1133988   bufio.(*Reader).ReadByte+0x38                           /Users/bati11/.goenv/versions/1.11.4/src/bufio/bufio.go:242
#   0x13239b6   github.com/bati11/oreno-mqtt/mqtt/packet.(*MQTTReader).ReadPacketType+0x76  /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/packet/mqtt_reader.go:20
#   0x1327a6a   github.com/bati11/oreno-mqtt/mqtt.handle+0x13a                  /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:54
#   0x1328456   github.com/bati11/oreno-mqtt/mqtt.Run.func1+0x56                /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:38

goroutineが残り続けてしまうことを、goroutineリークというらしい。

"handleSub" goroutineの親goroutineである "handle" goroutineが終了するときに、子goroutineを終了させたい。

おなじみの「Go言語による並行処理」に以下のように書いてある。

Go言語による並行処理

Go言語による並行処理

  • 作者: Katherine Cox-Buday,山口能迪
  • 出版社/メーカー: オライリージャパン
  • 発売日: 2018/10/26
  • メディア: 単行本(ソフトカバー)
  • この商品を含むブログを見る

4.3 ゴルーチンリークを避ける

もしあるゴルーチンがゴルーチンの生成の責任を持っているのであれば、そのゴルーチンを停止できるようにする責任もあります。

書籍では done チャネルを使った方式と、それがGo1.7からは context パッケージとして標準化されたことが書いてある。

contextパッケージ

context.Contextを使う。

context.Contextを使うことで、親goroutineから子goroutineを停止することができる。

diff --git a/mqtt/server.go b/mqtt/server.go
index 5f53e6f..a050e0a 100644
--- a/mqtt/server.go
+++ b/mqtt/server.go
@@ -2,6 +2,7 @@ package mqtt
 
 import (
        "bufio"
+       "context"
        "fmt"
        "io"
        "net"
@@ -48,6 +49,9 @@ func handle(conn net.Conn, publishToBroker chan<- *packet.Publish, subscriptionT
 
        var clientID string
 
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+
        for {
                r := bufio.NewReader(conn)
                mqttReader := packet.NewMQTTReader(r)
@@ -85,16 +89,21 @@ func handle(conn net.Conn, publishToBroker chan<- *packet.Publish, subscriptionT
                        if err != nil {
                                return err
                        }
-                       subscription, errCh := handleSub(clientID, conn)
+                       subscription, errCh := handleSub(ctx, clientID, conn)
                        subscriptionToBroker <- subscription
-                       go func() {
-                               err, ok := <-errCh
-                               if !ok {
-                                       return
+                       go func(ctx context.Context) {
+                               var result *DoneSubscriptionResult
+                               select {
+                               case <-ctx.Done():
+                                       result = NewDoneSubscriptionResult(subscription.clientID, nil)
+                               case err, ok := <-errCh:
+                                       if !ok {
+                                               return
+                                       }
+                                       result = NewDoneSubscriptionResult(subscription.clientID, err)
                                }
-                               done := NewDoneSubscriptionResult(subscription.clientID, err)
-                               doneSubscriptions <- done
-                       }()
+                               doneSubscriptions <- result
+                       }(ctx)
                case packet.PINGREQ:
                        pingresp, err := handler.HandlePingreq(mqttReader)
                        if err != nil {
@@ -110,16 +119,24 @@ func handle(conn net.Conn, publishToBroker chan<- *packet.Publish, subscriptionT
        }
 }
 
-func handleSub(clientID string, conn net.Conn) (*Subscription, <-chan error) {
+func handleSub(ctx context.Context, clientID string, conn net.Conn) (*Subscription, <-chan error) {
        errCh := make(chan error)
        subscription, pubFromBroker := NewSubscription(clientID)
        go func() {
                defer close(errCh)
-               for publishMessage := range pubFromBroker {
-                       bs := publishMessage.ToBytes()
-                       _, err := conn.Write(bs)
-                       if err != nil {
-                               errCh <- err
+               for {
+                       select {
+                       case <-ctx.Done():
+                               return
+                       case publishedMessage, ok := <-pubFromBroker:
+                               if !ok {
+                                       return
+                               }
+                               bs := publishedMessage.ToBytes()
+                               _, err := conn.Write(bs)
+                               if err != nil {
+                                       errCh <- err
+                               }
                        }
                }
        }()

試してみる。

2つsub

goroutine profile: total 12
2 @ 0x102f20b 0x102a6a9 0x1029d56 0x108e22a 0x108e33d 0x108f0e6 0x118011f 0x11926f8 0x11330ef 0x1133989 0x13239b7 0x1327abc 0x1328527 0x105ca31
#   0x1029d55   internal/poll.runtime_pollWait+0x65                     /Users/bati11/.goenv/versions/1.11.4/src/runtime/netpoll.go:173
#   0x108e229   internal/poll.(*pollDesc).wait+0x99                     /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_poll_runtime.go:85
#   0x108e33c   internal/poll.(*pollDesc).waitRead+0x3c                     /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_poll_runtime.go:90
#   0x108f0e5   internal/poll.(*FD).Read+0x1d5                          /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_unix.go:169
#   0x118011e   net.(*netFD).Read+0x4e                              /Users/bati11/.goenv/versions/1.11.4/src/net/fd_unix.go:202
#   0x11926f7   net.(*conn).Read+0x67                               /Users/bati11/.goenv/versions/1.11.4/src/net/net.go:177
#   0x11330ee   bufio.(*Reader).fill+0x10e                          /Users/bati11/.goenv/versions/1.11.4/src/bufio/bufio.go:100
#   0x1133988   bufio.(*Reader).ReadByte+0x38                           /Users/bati11/.goenv/versions/1.11.4/src/bufio/bufio.go:242
#   0x13239b6   github.com/bati11/oreno-mqtt/mqtt/packet.(*MQTTReader).ReadPacketType+0x76  /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/packet/mqtt_reader.go:20
#   0x1327abb   github.com/bati11/oreno-mqtt/mqtt.handle+0x18b                  /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:58
#   0x1328526   github.com/bati11/oreno-mqtt/mqtt.Run.func1+0x56                /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:39

2 @ 0x102f20b 0x103ef16 0x132865f 0x105ca31
#   0x132865e   github.com/bati11/oreno-mqtt/mqtt.handle.func1+0xce /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:95

2 @ 0x102f20b 0x103ef16 0x1328890 0x105ca31
#   0x132888f   github.com/bati11/oreno-mqtt/mqtt.handleSub.func1+0xff  /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:127

1つをCtrl-Cで閉じる

goroutine profile: total 8
1 @ 0x102f20b 0x102a6a9 0x1029d56 0x108e22a 0x108e33d 0x108f0e6 0x118011f 0x11926f8 0x11330ef 0x1133989 0x13239b7 0x1327abc 0x1328527 0x105ca31
#   0x1029d55   internal/poll.runtime_pollWait+0x65                     /Users/bati11/.goenv/versions/1.11.4/src/runtime/netpoll.go:173
#   0x108e229   internal/poll.(*pollDesc).wait+0x99                     /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_poll_runtime.go:85
#   0x108e33c   internal/poll.(*pollDesc).waitRead+0x3c                     /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_poll_runtime.go:90
#   0x108f0e5   internal/poll.(*FD).Read+0x1d5                          /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_unix.go:169
#   0x118011e   net.(*netFD).Read+0x4e                              /Users/bati11/.goenv/versions/1.11.4/src/net/fd_unix.go:202
#   0x11926f7   net.(*conn).Read+0x67                               /Users/bati11/.goenv/versions/1.11.4/src/net/net.go:177
#   0x11330ee   bufio.(*Reader).fill+0x10e                          /Users/bati11/.goenv/versions/1.11.4/src/bufio/bufio.go:100
#   0x1133988   bufio.(*Reader).ReadByte+0x38                           /Users/bati11/.goenv/versions/1.11.4/src/bufio/bufio.go:242
#   0x13239b6   github.com/bati11/oreno-mqtt/mqtt/packet.(*MQTTReader).ReadPacketType+0x76  /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/packet/mqtt_reader.go:20
#   0x1327abb   github.com/bati11/oreno-mqtt/mqtt.handle+0x18b                  /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:58
#   0x1328526   github.com/bati11/oreno-mqtt/mqtt.Run.func1+0x56                /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:39

1 @ 0x102f20b 0x103ef16 0x132865f 0x105ca31
#   0x132865e   github.com/bati11/oreno-mqtt/mqtt.handle.func1+0xce /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:95

1 @ 0x102f20b 0x103ef16 0x1328890 0x105ca31
#   0x132888f   github.com/bati11/oreno-mqtt/mqtt.handleSub.func1+0xff  /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:127

ちゃんと"handle内の無名関数" goroutineと"handleSub" goroutineが減ってる。

おしまい

context.Context を使って、子goroutineを停止させることができました。 ctx をどんどん渡していけば孫goroutineやひ孫goroutineなども同じように停止させることができます。Goでは複数の並行処理を協調させる仕組みが色々あって良いですね!

コードはこちら。

github.com

今回の学び。