Techioz Blog

Kiba は 1 つずつではなく「一括」宛先をサポートできますか?

概要

Kiba のコアは行を 1 つずつ処理することであると理解しています。そして、目的のステップまでこれが必要です。 変換されたデータを Kafka トピックにプッシュしたいのですが、個別ではなく一括で行うことをお勧めします。これは可能でしょうか?

次のクラスがあると仮定すると、

class TransactionProducer
  def intialize(data: [])
    @data = data
  end

  def push_to_kafka
    $kafka.push(data)
  end
end

post_process を使用し、変換されたデータを配列に保存することでこれが可能だと思います。

data = []

job = Kiba.parse do
  source MySource, source_config

  transform do |row|
    row = Transform...
    data << row
  end

  post_process do
    TransactionProducer.new(data).push_to_kafka
  end
end

しかし、別の方法があるのではないかと考えています。

解決策

これには post_process を使用することもできますが、代わりに宛先が close メソッドを実装できるという事実を活用することをお勧めします (https://github.com/thbar/kiba/wiki/Implementing-ETL-destinations を参照)。これをターゲットに「バッファアウト」するために使用します (https://thibautbarrere.com/2020/03/05/new-in-kiba-etl-v3 の集約変換で示されているものと少し似ています)。

行数が多い場合は、write と close の両方を使用して、指定された行数に達したらすぐにバッファーをフラッシュすることもできます (ただし、close 呼び出しに残っているすべての行を必ずフラッシュしてください)。