Kiba + postgres-copyでETL処理の実現(ShanonAdventCalendar2016 - 23日目)

このエントリーをはてなブックマークに追加

こんにちは、技術部のou.gです。リリースしたのは去年だけど、弊社BIサービスETL処理流れを簡単に紹介したいと思います。

テックスタック:

ETLのE(抽出):

SMPのDBから、必要となるデータを検索してCSVファイルとして落とします。

SMP::SourceModel.select(target_columns)
                .where('updated_at >= timestamp ?', last_succeeded_timestamp).copy_to do |line|
 source_csv_file.write line
end

ETLのT(変換・加工):

上記抽出したCSVファイルを使って、業務ロジックに合わせて、対象CSVファイルを作成します。

source Common::SMPSourceCSV, source_csv_file.path
destination Common::SMPDestinationCSV, destination_csv_file.path

transform do |row|
  row[:column1] = row.delete(:column1)
  row[:column2] = row.delete(:column2) || default_column2
  row
end

transform Common::TransformColumn3

Common::SMPSourceCSV、Common::SMPDestinationCSVとCommon::TransformColumn3の書き方はKibaのREADMEに書いてありますので、省略します。

ETLのL(ロード)

Kibaのpost_processに加工されたCSVファイルをDWHに反映します。

post_process do
  target_table_name = DWH::TragetModel.table_name
  temp_table_name = "#{target_table_name}_temp"
  # 同時実行の制御
  DWH::TragetModel.connection
    .execute("LOCK TABLE #{target_table_name} IN SHARE ROW EXCLUSIVE MODE;")
  
  # temp tableのindex/制約等を削除すると、もっと速くなる。
  DWH::TragetModel.connection
    .execute("CREATE TEMP TABLE #{temp_table_name} (LIKE #{target_table_name} INCLUDING ALL) ON COMMIT DROP;")
  DWH::TragetModel.copy_from(destination_csv_file.path, table: temp_table_name)
  DWH::TragetModel.connection.execute(upsert_sql)
end

upsert_sqlの返却SQLは下記のようなものです。

WITH updated AS (
  UPDATE target_table t
  SET    column1 = p.column1, column2 = p.column2, ..., columnN = p.columnN
  FROM   temp_copy_target_table p
  WHERE  t.smp_pk = p.smp_pk
  RETURNING p.smp_pk
)
INSERT INTO target_table(smp_pk, column1, column2, ..., columnN)
SELECT smp_pk, column1, column2, ..., columnN
FROM temp_copy_target_table p
WHERE p.smp_pk NOT IN (select smp_pk from updated)

これで、one sqlでupsertの実現できました。

説明しやすいために、実際ソースコードの共通処理を展開しました。

以上

次の記事
« Prev Post
前の記事
Next Post »
Related Posts Plugin for WordPress, Blogger...