Secret Ninja Blog

プロダクトマネージャーしてます

私とトレジャーデータ vol.3:Bulk Importの話

気付けばこのシリーズも3回目になった。 Vol.1を書いたのが2018年、Vol.2が2023年。そして2025年。 相変わらずTreasure Dataにいるし、まだ書くことがあるというのは、ある意味ありがたいことだと思う。

私とTreasureData カテゴリーの記事一覧 - Secret Ninja Blog

今回は、Bulk Importの話を書く。

ストリーミング処理が先進的だった時代(2013年-2015年ごろ)

当時、Treasure Dataといえば、Fluentd → Treasure Dataという流れが代名詞だった。 ゲームログやアクセスログをリアルタイムにストリーミングで送る仕組みは、当時のデータ基盤としてはかなり先進的だった。 もちろんそういう基盤もあったが、ストリーミングでデータを入れる場合はコストが高めで、割高だったり、種々の制約があった記憶がある。

一方で、

  1. MySQLのようなRDB
  2. 業務システムから吐き出される定期CSV
  3. 社内バッチ処理のアウトプット

こうしたデータを分析基盤に定期的に取り込む運用は、今も昔も変わらずバッチが主流だった。

そして、その役割を担っていたのが、Vol.1でも触れたRubyで実装された td bulk_import コマンドだった。

Ruby版 Bulk Import の“助かる”挙動

Ruby版のBulk Importは、単なる「データを送る」だけではなく、 CSV、TSV、JSONなどをローカルでMessagePackに変換してから送る仕組みとなっており、現場のデータにある“揺らぎ”を自然に吸収してくれる存在だった。

  • 型推論が柔らかい
  • 日付フォーマットの違いに寛容
  • 空白・NULL・文字列数値をそれっぽく扱う

実運用では、ログやCSVが毎回統一されていることの方が珍しい。 多様な顧客環境でも、Rubyで書かれたパーサーはいい感じに処理してくれたので、型変換に関しては問題が少なかった。

Java版 td import の開発とその課題

Treasure Dataを使う顧客が増え、データ量が大きくなるにつれ、1ファイルが数GBのファイルを扱うと、この変換処理のパフォーマンスが出ない問題が出てきてしまっていた。 そこで、Java版の td import コマンドが開発された。

2013年当時、私はまだ新卒で、TDにはパートナーとして関わっていた。 そのため、Java版 td import の実装を担当したmugaさんから連絡をもらい、検証を手伝った記憶がある。

Ruby版(1.8か1.9系だった気がする)はマルチスレッドで処理を行うものの、マルチコアでの処理ができなかったため、 Java版ではマルチコアで処理が行えるように並列処理が実装され、ローカルの変換処理がかなり高速になった記憶がある。

ただ実際に、顧客のデータで使い始めると、Ruby版では普通に通っていたファイルが、Java版ではエラーになることが多く、 Javaの型変換の厳密さを考慮しないといけないことが多く、大変だった。

Treasure Dataの場合、入れてからSQLでなんとかすればいいやというところもあったので、 苦肉の策として、すべてとりあえず文字列として扱ってしまってTDに突っ込むというオプション(td import --all-string)が爆誕したりもした。

td importで一番変わったのは、ローカルの処理部分だけで、TDにアップロードしてMPC1フォーマットに変換する部分のAPIは、 Ruby版のときから今も現役で使われているので、息の長い、よくできた仕組みだなと思う(内部的に変わっている部分はあるが)。

実際、embulk-output-tdではこのBulk Import APIがアップロード時にまだ使われている。

そしてEmbulkへ

さて、パフォーマンスについてはJava版で解決したものの、 多様なデータソースから定期的にデータを取り込みたいというニーズに対してはtd importコマンドでは解決できなかった。

また、Treasure Dataから外部にクエリ結果を出す仕組みについては、Result Outputと呼ばれ、 (細かい実装は覚えていないが)Hadoopからクエリ結果をS3やSalesforceに書き出す機構が取られていた。 これはHadoopの基盤と密結合しており、なかなか柔軟性に欠けていたため、どんどん連携先を増やすことが難しかった。

これらの問題を解決するために、Fluentdのような柔軟なPlugin機構を持ち、td importコマンドなどのように大規模なデータであっても高速に処理できるEmbulkが開発された。

今でもEmbulkのFolk版はTreasure Dataで動いているが、OSSとしてのEmbulkはメンテナンスモードになった。

これについてのEmbulkは、Embulkをサポートとしてサポートしてきた観点、 そして今はプロダクトマネージャーとして見た時の観点から、別の機会に思い出を書いていこうと思う。