生涯未熟

生涯未熟

プログラミングをちょこちょこと。

saramaでAsyncProducerを使う時に気を付けなければいけないたったひとつのこと

AsyncCloseを使うな


どういうこと?

AsyncProducerには

  • AsyncClose
  • Close

の2種類がある。

なんの違いが?

それぞれのコメント読みましょ。

AsyncClose

AsyncClose triggers a shutdown of the producer. The shutdown has completed when both the Errors and Successes channels have been closed. When calling AsyncClose, you must continue to read from those channels in order to drain the results of any messages in flight.


AsyncCloseがプロデューサのシャットダウンをトリガします。 エラーと成功の両方のチャネルが閉じられたときにシャットダウンが完了しました。 AsyncCloseを呼び出すときは、飛行中のメッセージの結果を排除するために、それらのチャネルからの読み込みを続ける必要があります。

Close

Close shuts down the producer and waits for any buffered messages to be flushed. You must call this function before a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close on the underlying client.


Closeはプロデューサをシャットダウンし、バッファされたメッセージがフラッシュされるのを待ちます。 プロデューサオブジェクトがスコープの外に出る前に、この関数を呼び出す必要があります。 基になるクライアントでCloseを呼び出す前にこれを呼び出す必要があります。

よくわからん🤔

コードを読もう

AsyncClose

func (p *asyncProducer) AsyncClose() {
    go withRecover(p.shutdown)
}

withRecover()shutdown というメソッドを引数にとって実行しています。

func withRecover(fn func()) {
    defer func() {
        handler := PanicHandler
        if handler != nil {
            if err := recover(); err != nil {
                handler(err)
            }
        }
    }()

    fn()
}

単純にpanicしたときにrecoverしてerrorを吐き出してる感じのdeferをつけて、引数のメソッドを実行しているだけですね。

func (p *asyncProducer) shutdown() {
    Logger.Println("Producer shutting down.")
    p.inFlight.Add(1)
    p.input <- &ProducerMessage{flags: shutdown}

    p.inFlight.Wait()

    if p.ownClient {
        err := p.client.Close()
        if err != nil {
            Logger.Println("producer/shutdown failed to close the embedded client:", err)
        }
    }

    close(p.input)
    close(p.retries)
    close(p.errors)
    close(p.successes)
}

で、肝心のshutdown処理なのですが、一度シャットダウンのための値を p.input に渡して、Producer上で実行中のメッセージの受け渡しが終わり次第、Producerをcloseしようとしています。

Close

func (p *asyncProducer) Close() error {
    p.AsyncClose()

    if p.conf.Producer.Return.Successes {
        go withRecover(func() {
            for range p.successes {
            }
        })
    }

    var errors ProducerErrors
    if p.conf.Producer.Return.Errors {
        for event := range p.errors {
            errors = append(errors, event)
        }
    } else {
        <-p.errors
    }

    if len(errors) > 0 {
        return errors
    }
    return nil
}

1行目から AsyncClose 呼んでますね・・・
その後は、brokerへのメッセージ送信の成否を示す SuccessesErrors をClose側でhandlingしています。
p.successesp.errors で値を受け取り続けているのは、最初の説明で書いてあった "AsyncCloseを呼び出すときは、飛行中のメッセージの結果を排除するために、それらのチャネルからの読み込みを続ける必要があります。" に従っているわけですね。
この飛行中ってのはイコール、メッセージの送信中って意味で、それを表しているのが inFlight というわけです。

inFlight 自体は sync.WaitGroup で、メッセージを送信する際に Add(1) , 送信し終わったら Done() します。
今回で言うと、 shutdown 処理の際に、安全にシャットダウンするために inFlight を使ってますね。

まとめ

というわけで、 AsyncClose のみを使用するときには結局 Close でやっているような処理を実装しなくてはならないので、それなら Close 使った方がいいよ。という結論に至りました。
なら何故 AsyncClose は外部からも読めるようになってるんだ?という感じもするかもしれませんが、 p.successes とかは処理内容がNoopなので、自分で色々やりたい人はどうぞ、っていうキャパシティを残してるんですかね?🤔