Techioz Blog

Kiba ETL とファラデーのリクエスト

概要

私は CDC のコンセプトを持ったプロジェクトに取り組んでいます。データベースから変更を読み取り、イベントを Rabbitmq キューにプッシュします (debezium を使用します)。

その後、KibaETL を使用して、そのキューから消費されるイベント メッセージを処理します。

KibaETL には 3 つの主要なプロセスがあります

奇妙な部分は。ファラデーを呼び出すと、サイレントに kiba etl パイプラインが終了します。エラーは表示されません….

これが私の変換クラスのスニペットです。

#frozen_string_literal: true

require 'faraday'
require 'faraday/net_http'
require 'json'
require 'kiba'

class RetrieveJobDataTransform
  def process(row)
    puts "\t[transform] Retrieve Job Information by UUID - job_uuid: #{row}"
    raise ArgumentError, 'row cannot be nil' if row.nil?

    job_uuid = row.dig(:after_data, 'job_uuid')
    
    api_result = make_api_call(job_uuid)

    transformed_data = transform_data(row, api_result)

    transformed_data
  end

  private

  def make_api_call(job_uuid)    
    
    conn = create_connection('https://url.com')    
    
    **# before conn.get call, it shows output properly**
    response = conn.get("api/jobs/#{job_uuid}")
    **# here it doesnt show any output when I was debugging**

    if response.success?
      JSON.parse(response.body)
    else
      nil
    end
  end

  def transform_data(row, api_result)
    row = api_result
  end

  def create_connection(api_url)
    Faraday.new(api_url) do |f|
      f.headers['x-api-key'] = 'api key'
      f.response :json 
      f.adapter :net_http 
      f.options.timeout = 120
    end
  end
end

ETLを処理するために呼び出されるメインクラス

job = Kiba.parse do
  source KibaSource, bunny_adapter, queue_name

  transform ParseMessageTransform
  transform ValidateMessageTransform

  **#This is the class that silently quit with no error or warnings**
  transform RetrieveJobDataTransform 
  
  destination KibaDestination
end

puts '# job started - Running Kiba job... '
Kiba.run(job)

奇妙な動作は、「irb」と入力してそのクラスコードを追加し、kiba etl パイプライン内に入れずにそれを呼び出すと、エラーなしで動作することです。

誰かアドバイスをいただけますか?私はこの問題に行き詰まっており、それを乗り越える方法がわかりません。

前もって感謝します

乾杯!

解決策

Kiba の仕組みにより、それが実際に Kiba 関連だったとしたら (たとえ IRB で動作させることができたとしても)、私はかなり驚くでしょう。しかし、私はこれを除外したいと思っています。

ソースと宛先を「スタブ」する、可能な限り最小の複製 (おそらく単一ファイルの Bundler 構成 https://bundler.io/guides/bundler_in_a_single_file_ruby_script.html を使用) を作成していただけますか?

すべての行に対してファラデー接続を再作成しているため、潜在的に奇妙なことが発生する可能性があります。

また、おそらく別の HTTP クライアントを使用してみます。発信中の通話の HTTPparty。何らかの理由で、あなたの呼び出しと RabbitMQ 呼び出しの間で競合が発生する可能性があります。

gist.github.com 経由で最小の再現を投稿していただければ、さらにお手伝いできることを嬉しく思います。