【TreasureData】CSVファイルをS3からインポートするワークフロー
処理概要
S3に配置したCSVファイルをTreasureDataにインポートする
処理内容
- CSVファイルを一時テーブルにインポートする
- 一時テーブルに対してSQLを実行する
- SQL実行結果を目的のテーブルに格納する
- 一時テーブルを削除する
CSVファイル
- sample.csv
- AWSのS3バケット配下に配置
digファイル
- csv_import_test.dig
- ワークフローの設計書となるファイル
ymlファイル
- csv_import_test.yml
- CSVファイルの入出力設定を記載するファイル
- configディレクトリ配下に配置する
sqlファイル
- csv_import_test.sql
- 一時テーブルに対する処理内容を記載する
補足
time列
- time列がないのでインポート時に追加する設定をymlファイルに記載する
レコードを絞る必要がない場合
- sqlのwhere句を記載しない
- td_loadの対象テーブルを一時テーブルから目的のテーブルに変更する
- 後者の場合sqlファイルとdigの+cast: 以下の処理は不要になる
digファイルの内容
timezone: Asia/Tokyo schedule: daily>: 10:00:00 _export: td: dest_db: test dest_table: csv_import_test tmp_table: tmp_csv_import_test +drop_if_exist_table: td_ddl>: database: ${td.dest_db} drop_tables: ["${td.dest_table}", "${td.tmp_table}"] +prepare_table: td_ddl>: database: ${td.dest_db} create_databases: ["${td.dest_db}"] create_tables: ["${td.dest_table}", "${td.tmp_table}"] +load: td_load>: config/csv_import_test.yml database: ${td.dest_db} table: ${td.tmp_table} +cast: td>: csv_import_test.sql database: ${td.dest_db} engine: presto insert_into: ${td.dest_table} +drop_tmp_table: td_ddl>: database: ${td.dest_db} drop_tables: ["${td.tmp_table}"]
ymlファイルの内容
in: type: s3 access_key_id: ${secret:s3.access_key_id} secret_access_key: ${secret:s3.secret_access_key} bucket: bucket_name path_prefix: path/to/sample.csv use_modified_time: true incremental: true parser: charset: UTF-8 newline: LF type: csv skip_header_lines: 1 columns: - name: area type: string - name: tel type: string - name: mail type: string filters: - type: add_time to_column: name: time type: timestamp from_value: mode: upload_time out: {} exec: {}
CSVファイルの内容
area,tel,mail Miyagi,080-2075-2274,cea53M@sample.com Saga,090-7180-5522,vGkwV@test.com Tokyo,090-9144-1409,a9x5H@test.jp Okayama,090-6590-3913,j2aE9BU@sample.co.jp Iwate,090-6462-4110,kuLrh@example.com
SQLファイルの内容
select * from ${tmp_table} where area = 'Tokyo' ;