読者です 読者をやめる 読者になる 読者になる

PythonでTreasureDataにBulkImportした話

Python TreasureData 技術

こんにちは、g0eです。

mats氏によって「中の人」の紹介記事が勝手に書かれていることに最近気づきました。javaはもう忘れました。

 

弊社では各種ローデータの収集や一次加工にTreasureData(以下:TD)を活用しています。

tresuredata

そして、アプリケーションは主にpythonで書いていまして、tdコマンドの中を覗く(lessする)とruby派っぽい雰囲気が伝わってくるんですが、pythonでも結構戦えたので紹介したいと思います。

今回のお題は、pythonでTDにtsvとかcsvのファイルをbulk_importした、という話です。 男らしくtdコマンドをsubprocessで叩けばいいんじゃないか、と思わなくもないですが、ちょっとややこしい環境で動かそうとしてうまくいかなかったので、td-client-pythonを使うことにしました。

このライブラリ、コードとコメントを読むと実は結構色々なことが出来るみたいで、そもそも最初はbulk_importの機能も実装されている事に気づきませんでした。

 

TDの環境設定とかtdコマンドの設定とかは既に終わっている前提で、まずはさくっとpipでインストールします。

(今回の記事は td-client==0.2.0 をベースに書いています)

$ pip install td-client

 

サンプルコード

pythonのコード的には↓みたいな感じで動くはず (既に動いているコードから切り貼りしてきただけなんで、エラー出たら教えて下さい)

# -*- coding: utf-8 -*-
import tdclient
import time

TD_APIKEY = "XXXX" # TDのAPIキー
TD_DB = "hoge" # TDのデータベース名
TD_TB = "piyo" # TDのテーブル名
TSV_FILE ="/path/to/hoge.tsv" # bulk_importしたいtsvファイル

cl = tdclient.Client(TD_APIKEY)
ts = int(time.time()) # timestamp

# bulk_importするtsvファイル(tdコマンドでimport:auto出来るフォーマットで)
f = open(TSV_FILE,"r")

# bulk_import用のセッション名
# ユニークになればOKっぽいのでファイル名とtimestampを使ってみる
# ただ、/か.かを含んでいるとエラーになったっぽいので除去
session_name = f.name.split("/")[-1].replace(".","") + "_" + str(ts)

# bulk_import用のsessionを生成
bulk = cl.create_bulk_import(session_name,TD_DB,TD_TB)

# ファイルをアップロード
bulk.upload_file(session_name,"tsv",f)

# ファイルの追加・変更がないことを通知するっぽい
# freezeしないと先に進めないのでおまじない的に
bulk.freeze()

# TDのUIからJobsを確認するとこの段階でMap-Reduceが走っている様子
# wait=Trueにして処理が完了するまで待たないとcommitでエラーが出ます
bulk.perform(wait=True)

# この処理が終わって、やっとUI上で追加されたデータを確認できる
bulk.commit(wait=True)

# 処理に成功した件数とエラーになった件数はこんな感じで取得できる
print("valid_records:" + str(bulk.valid_records))
print("error_records:" + str(bulk.error_records))

# 後処理
bulk.delete()
f.close()

 

freezeとかperformとかcommitとかの概念はBulk Import Internalsに書いているみたいなんで、気になる方は見てみて下さい。

以上