Kiba は 1 つずつではなく「一括」宛先をサポートできますか?
概要
Kiba のコアは行を 1 つずつ処理することであると理解しています。そして、目的のステップまでこれが必要です。 変換されたデータを Kafka トピックにプッシュしたいのですが、個別ではなく一括で行うことをお勧めします。これは可能でしょうか?
次のクラスがあると仮定すると、
class TransactionProducer
def intialize(data: [])
@data = data
end
def push_to_kafka
$kafka.push(data)
end
end
post_process を使用し、変換されたデータを配列に保存することでこれが可能だと思います。
data = []
job = Kiba.parse do
source MySource, source_config
transform do |row|
row = Transform...
data << row
end
post_process do
TransactionProducer.new(data).push_to_kafka
end
end
しかし、別の方法があるのではないかと考えています。
解決策
これには post_process を使用することもできますが、代わりに宛先が close メソッドを実装できるという事実を活用することをお勧めします (https://github.com/thbar/kiba/wiki/Implementing-ETL-destinations を参照)。これをターゲットに「バッファアウト」するために使用します (https://thibautbarrere.com/2020/03/05/new-in-kiba-etl-v3 の集約変換で示されているものと少し似ています)。
行数が多い場合は、write と close の両方を使用して、指定された行数に達したらすぐにバッファーをフラッシュすることもできます (ただし、close 呼び出しに残っているすべての行を必ずフラッシュしてください)。