気付けばこのシリーズも3回目になった。 Vol.1を書いたのが2018年、Vol.2が2023年。そして2025年。 相変わらずTreasure Dataにいるし、まだ書くことがあるというのは、ある意味ありがたいことだと思う。
私とTreasureData カテゴリーの記事一覧 - Secret Ninja Blog
今回は、Bulk Importの話を書く。
ストリーミング処理が先進的だった時代(2013年-2015年ごろ)
当時、Treasure Dataといえば、Fluentd → Treasure Dataという流れが代名詞だった。 ゲームログやアクセスログをリアルタイムにストリーミングで送る仕組みは、当時のデータ基盤としてはかなり先進的だった。 もちろんそういう基盤もあったが、ストリーミングでデータを入れる場合はコストが高めで、割高だったり、種々の制約があった記憶がある。
一方で、
- MySQLのようなRDB
- 業務システムから吐き出される定期CSV
- 社内バッチ処理のアウトプット
こうしたデータを分析基盤に定期的に取り込む運用は、今も昔も変わらずバッチが主流だった。
そして、その役割を担っていたのが、Vol.1でも触れたRubyで実装された td bulk_import コマンドだった。
Ruby版 Bulk Import の“助かる”挙動
Ruby版のBulk Importは、単なる「データを送る」だけではなく、 CSV、TSV、JSONなどをローカルでMessagePackに変換してから送る仕組みとなっており、現場のデータにある“揺らぎ”を自然に吸収してくれる存在だった。
- 型推論が柔らかい
- 日付フォーマットの違いに寛容
- 空白・NULL・文字列数値をそれっぽく扱う
実運用では、ログやCSVが毎回統一されていることの方が珍しい。 多様な顧客環境でも、Rubyで書かれたパーサーはいい感じに処理してくれたので、型変換に関しては問題が少なかった。
Java版 td import の開発とその課題
Treasure Dataを使う顧客が増え、データ量が大きくなるにつれ、1ファイルが数GBのファイルを扱うと、この変換処理のパフォーマンスが出ない問題が出てきてしまっていた。 そこで、Java版の td import コマンドが開発された。
2013年当時、私はまだ新卒で、TDにはパートナーとして関わっていた。 そのため、Java版 td import の実装を担当したmugaさんから連絡をもらい、検証を手伝った記憶がある。
Ruby版(1.8か1.9系だった気がする)はマルチスレッドで処理を行うものの、マルチコアでの処理ができなかったため、 Java版ではマルチコアで処理が行えるように並列処理が実装され、ローカルの変換処理がかなり高速になった記憶がある。
ただ実際に、顧客のデータで使い始めると、Ruby版では普通に通っていたファイルが、Java版ではエラーになることが多く、 Javaの型変換の厳密さを考慮しないといけないことが多く、大変だった。
Treasure Dataの場合、入れてからSQLでなんとかすればいいやというところもあったので、 苦肉の策として、すべてとりあえず文字列として扱ってしまってTDに突っ込むというオプション(td import --all-string)が爆誕したりもした。
td importで一番変わったのは、ローカルの処理部分だけで、TDにアップロードしてMPC1フォーマットに変換する部分のAPIは、 Ruby版のときから今も現役で使われているので、息の長い、よくできた仕組みだなと思う(内部的に変わっている部分はあるが)。
実際、embulk-output-tdではこのBulk Import APIがアップロード時にまだ使われている。
そしてEmbulkへ
さて、パフォーマンスについてはJava版で解決したものの、 多様なデータソースから定期的にデータを取り込みたいというニーズに対してはtd importコマンドでは解決できなかった。
また、Treasure Dataから外部にクエリ結果を出す仕組みについては、Result Outputと呼ばれ、 (細かい実装は覚えていないが)Hadoopからクエリ結果をS3やSalesforceに書き出す機構が取られていた。 これはHadoopの基盤と密結合しており、なかなか柔軟性に欠けていたため、どんどん連携先を増やすことが難しかった。
これらの問題を解決するために、Fluentdのような柔軟なPlugin機構を持ち、td importコマンドなどのように大規模なデータであっても高速に処理できるEmbulkが開発された。
今でもEmbulkのFolk版はTreasure Dataで動いているが、OSSとしてのEmbulkはメンテナンスモードになった。
これについてのEmbulkは、Embulkをサポートとしてサポートしてきた観点、 そして今はプロダクトマネージャーとして見た時の観点から、別の機会に思い出を書いていこうと思う。