AsyncCloseを使うな
どういうこと?
AsyncProducerには
- AsyncClose
- Close
の2種類がある。
なんの違いが?
それぞれのコメント読みましょ。
AsyncClose
AsyncClose triggers a shutdown of the producer. The shutdown has completed when both the Errors and Successes channels have been closed. When calling AsyncClose, you must continue to read from those channels in order to drain the results of any messages in flight.
AsyncCloseがプロデューサのシャットダウンをトリガします。 エラーと成功の両方のチャネルが閉じられたときにシャットダウンが完了しました。 AsyncCloseを呼び出すときは、飛行中のメッセージの結果を排除するために、それらのチャネルからの読み込みを続ける必要があります。
Close
Close shuts down the producer and waits for any buffered messages to be flushed. You must call this function before a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close on the underlying client.
Closeはプロデューサをシャットダウンし、バッファされたメッセージがフラッシュされるのを待ちます。 プロデューサオブジェクトがスコープの外に出る前に、この関数を呼び出す必要があります。 基になるクライアントでCloseを呼び出す前にこれを呼び出す必要があります。
よくわからん🤔
コードを読もう
AsyncClose
func (p *asyncProducer) AsyncClose() { go withRecover(p.shutdown) }
withRecover()
を shutdown
というメソッドを引数にとって実行しています。
func withRecover(fn func()) { defer func() { handler := PanicHandler if handler != nil { if err := recover(); err != nil { handler(err) } } }() fn() }
単純にpanicしたときにrecoverしてerrorを吐き出してる感じのdeferをつけて、引数のメソッドを実行しているだけですね。
func (p *asyncProducer) shutdown() { Logger.Println("Producer shutting down.") p.inFlight.Add(1) p.input <- &ProducerMessage{flags: shutdown} p.inFlight.Wait() if p.ownClient { err := p.client.Close() if err != nil { Logger.Println("producer/shutdown failed to close the embedded client:", err) } } close(p.input) close(p.retries) close(p.errors) close(p.successes) }
で、肝心のshutdown処理なのですが、一度シャットダウンのための値を p.input
に渡して、Producer上で実行中のメッセージの受け渡しが終わり次第、Producerをcloseしようとしています。
Close
func (p *asyncProducer) Close() error { p.AsyncClose() if p.conf.Producer.Return.Successes { go withRecover(func() { for range p.successes { } }) } var errors ProducerErrors if p.conf.Producer.Return.Errors { for event := range p.errors { errors = append(errors, event) } } else { <-p.errors } if len(errors) > 0 { return errors } return nil }
1行目から AsyncClose
呼んでますね・・・
その後は、brokerへのメッセージ送信の成否を示す Successes
と Errors
をClose側でhandlingしています。
p.successes
も p.errors
で値を受け取り続けているのは、最初の説明で書いてあった "AsyncCloseを呼び出すときは、飛行中のメッセージの結果を排除するために、それらのチャネルからの読み込みを続ける必要があります。" に従っているわけですね。
この飛行中ってのはイコール、メッセージの送信中って意味で、それを表しているのが inFlight
というわけです。
inFlight
自体は sync.WaitGroup
で、メッセージを送信する際に Add(1)
, 送信し終わったら Done()
します。
今回で言うと、 shutdown
処理の際に、安全にシャットダウンするために inFlight
を使ってますね。
まとめ
というわけで、 AsyncClose
のみを使用するときには結局 Close
でやっているような処理を実装しなくてはならないので、それなら Close
使った方がいいよ。という結論に至りました。
なら何故 AsyncClose
は外部からも読めるようになってるんだ?という感じもするかもしれませんが、 p.successes
とかは処理内容がNoopなので、自分で色々やりたい人はどうぞ、っていうキャパシティを残してるんですかね?🤔