Kiba ETL とファラデーのリクエスト
概要
私は CDC のコンセプトを持ったプロジェクトに取り組んでいます。データベースから変更を読み取り、イベントを Rabbitmq キューにプッシュします (debezium を使用します)。
その後、KibaETL を使用して、そのキューから消費されるイベント メッセージを処理します。
KibaETL には 3 つの主要なプロセスがあります
奇妙な部分は。ファラデーを呼び出すと、サイレントに kiba etl パイプラインが終了します。エラーは表示されません….
これが私の変換クラスのスニペットです。
#frozen_string_literal: true
require 'faraday'
require 'faraday/net_http'
require 'json'
require 'kiba'
class RetrieveJobDataTransform
def process(row)
puts "\t[transform] Retrieve Job Information by UUID - job_uuid: #{row}"
raise ArgumentError, 'row cannot be nil' if row.nil?
job_uuid = row.dig(:after_data, 'job_uuid')
api_result = make_api_call(job_uuid)
transformed_data = transform_data(row, api_result)
transformed_data
end
private
def make_api_call(job_uuid)
conn = create_connection('https://url.com')
**# before conn.get call, it shows output properly**
response = conn.get("api/jobs/#{job_uuid}")
**# here it doesnt show any output when I was debugging**
if response.success?
JSON.parse(response.body)
else
nil
end
end
def transform_data(row, api_result)
row = api_result
end
def create_connection(api_url)
Faraday.new(api_url) do |f|
f.headers['x-api-key'] = 'api key'
f.response :json
f.adapter :net_http
f.options.timeout = 120
end
end
end
ETLを処理するために呼び出されるメインクラス
job = Kiba.parse do
source KibaSource, bunny_adapter, queue_name
transform ParseMessageTransform
transform ValidateMessageTransform
**#This is the class that silently quit with no error or warnings**
transform RetrieveJobDataTransform
destination KibaDestination
end
puts '# job started - Running Kiba job... '
Kiba.run(job)
奇妙な動作は、「irb」と入力してそのクラスコードを追加し、kiba etl パイプライン内に入れずにそれを呼び出すと、エラーなしで動作することです。
誰かアドバイスをいただけますか?私はこの問題に行き詰まっており、それを乗り越える方法がわかりません。
前もって感謝します
乾杯!
解決策
Kiba の仕組みにより、それが実際に Kiba 関連だったとしたら (たとえ IRB で動作させることができたとしても)、私はかなり驚くでしょう。しかし、私はこれを除外したいと思っています。
ソースと宛先を「スタブ」する、可能な限り最小の複製 (おそらく単一ファイルの Bundler 構成 https://bundler.io/guides/bundler_in_a_single_file_ruby_script.html を使用) を作成していただけますか?
すべての行に対してファラデー接続を再作成しているため、潜在的に奇妙なことが発生する可能性があります。
また、おそらく別の HTTP クライアントを使用してみます。発信中の通話の HTTPparty。何らかの理由で、あなたの呼び出しと RabbitMQ 呼び出しの間で競合が発生する可能性があります。
gist.github.com 経由で最小の再現を投稿していただければ、さらにお手伝いできることを嬉しく思います。