Secret Ninja Blog

Customer Experience Senior Directorしてます

私とTreasureData vol.1 ~ bulk_import編 ~

2013年4月に新卒でとある中小SIer会社に入社し、前年に発足したばかりのビッグデータチームに配属された私はそこでTreasure Dataを初めて知ることとなった。 その後、紆余曲折があり、2018年現在Treasure DataのSupport Engineering Managerとしてグローバルサポートチームのマネージャーとして働いている。 このシリーズでは、私が新卒からずっとTreasure Dataに関わってきた5~6年間を通して、Treasure Dataの新古様々な機能について説明をしていきたいと思う。

私とトレジャーデータ

新人研修もそこそこに、ビッグデータチームに所属した私が最初に行なったことは、この見たことも触れたこともないクラウドサービスについて色々試してみることだった。 そして最初に書いたTreasure Dataに関するブログ記事は、CSVからMessagePack形式への変換スクリプトについての記事だった。Ref. http://sstd-bigdata.blogspot.jp/2013/04/csvmessagepack.html

なぜこの記事を書いたかの記憶は定かではないが、当時のtdコマンドのバージョンはtd v0.10.75だし、当時のWebコンソール(Console v1)は下図である。現在のConsole v3を見て振り返るとだいぶ進化しており、感慨深い。

f:id:toru-takahashi:20180824035024p:plain

当時のシステム構成図の概略を示すと、下図のようになるかと思う。 この当時はまだHadoop As a Service, Bigdata As a Serviceとしてのポジショニングをしていた。

f:id:toru-takahashi:20180824035040p:plain

概略でみると、コアなコンポーネント自体は今も変わらず使われていることがわかる。 とはいえ、この時のインポート方法は、

  1. fluentd ( fluent-plugin-td )を使ったストリーミングインポート
  2. tdコマンドに付属するbulk_importコマンドというバッチインポート

の二つしかなかった。

当時の顧客の多くは先進的なWebやアドテク、ソーシャルゲームの企業であり、課題の多くは巨大MySQLクラスタの運用やログのスキーマの増減に対する対応が辛い、といったことを解消することであった。 そのため、この当時の顧客のほとんどはfluentdを難なく導入しており、fluentdがデータインポート方法のメインストリームであった。

昨今、バッチ処理の重要性が当たり前のように認識されているが、当時は”リアルタイム!”という言葉が脚光を浴びており、バッチ処理で安定的に高速にデータをインポートすること自体はあまり重要視されておらず、bulk_importコマンドを使うことがあったが、運用のことまで十二分に考慮したコマンド設計になっていなかったようには思う。その中で様々なサポートが必要となり、右葉曲折があり、embulkを開発することに至り、多くのユーザに利用されている訳である。

さて、そんな古のbulk_importコマンドについて第1回では語っておきたい。

私とbulk_import

はじめに、私自身は世界でもっとも多種多様なデータをトレジャーデータに入れたことがある人間であると言っても過言ではないくらい、様々なお客様のデータインポートトラブルを解決し、データインポートの手助けをしたことがある。

さて、新卒1年目の初案件としてこのTreasure Dataを扱うことになったのだが、その時はとある広告のログを入れて特定の集計を行なってExcelに出すと簡単でアドホックな集計支援であった。

その時、最初にデータのインポートに利用したツールがtd bulk_importコマンドである。

ちなみにbulk_importとアンダースコアをつけている事にお気づきだろうか? トレジャーデータにコマンドラインでバッチ処理としてデータをインポートするには現在のところ下記の方法がある。

  • td table:import
  • td bulk_import
  • td import
  • td connector
  • Embulk

それぞれの違いについては後述するが、2013年の当初はバッチインポート方法としては5つの中の2つだけであった。

  • td table:import
  • td bulk_import

td table:importは、bulk_importと同様にコマンドで実行可能なインポート処理だが、バッチ処理には向かない仕組みになっていた。内部的には、fluentd(fluent-plugin-td)のエンドポイントとして利用されているStreaming API (api-import.treasuredata.com、当時はapi.treasure-data.com)に対してデータを送るようになっており、また数千件のデータをちょろっとアップロードすることを目的に実装されていたため、大量のデータを送るには処理が遅く、また途中で処理が中断した場合に中途半端にデータが入ってしまう可能性もコマンドとなっていた。また、ストリーミング処理用のAPIでは、データを受け取ってからインポート処理を行うまでに別の内部的な非同期処理があり、コマンド側でアップロード完了したタイミングと、実際にインポート処理が完了しクエリ可能になるまでにタイムラグがあり、連続的に処理を行う必要があるバッチ処理には向かないものとなっていた。

さて、上記のような特性がtd table:importがあるため、ローカルからCSVやJSONといったデータをTreasureDataに送りたい場合には、td bulk_importが推奨されていた。

td bulk_importコマンドを見ると、prepare / upload / perform / commit というサブコマンドがあることがわかる。

$ td bulk_import
Additional commands, type "td help COMMAND" for more details:

  bulk_import:list                           # List bulk import sessions
  bulk_import:show <name>                    # Show list of uploaded parts
  bulk_import:create <name> <db> <table>     # Create a new bulk import session to the table
  bulk_import:prepare_parts <files...>       # Convert files into part file format
  bulk_import:upload_parts <name> <files...>   # Upload or re-upload files into a bulk import session
  bulk_import:delete_parts <name> <ids...>   # Delete uploaded files from a bulk import session
  bulk_import:perform <name>                 # Start to validate and convert uploaded files
  bulk_import:error_records <name>           # Show records which did not pass validations
  bulk_import:commit <name>                  # Start to commit a performed bulk import session
  bulk_import:delete <name>                  # Delete a bulk import session
  bulk_import:freeze <name>                  # Reject succeeding uploadings to a bulk import session
  bulk_import:unfreeze <name>                # Unfreeze a frozen bulk import session

データアップロードするだけだったらuploadだけでいいじゃんと思ってしまうのだが、トレジャーデータはファイルストレージでもオブジェクトストレージでもないので、CSVをそのまま配置してもクエリ可能な状態にはならない。これは、クエリの高速化やストレージの省スペース化などなどのために必要不可欠なことである。 そのため、以前は、ユーザ側でトレジャーデータに最適なデータ構造(mpc1形式)に変換するまでの処理の実行開始をユーザ側に実施してもらうような仕組みになっていた。

つまり、お手元にCSVファイルがあったときには下記のようなステップが求められていた。

CSVファイル

-- prepare_partsコマンド --> msgpack形式にローカルで一旦変換。ファイル分割なども行う。

-- upload_partsコマンド --> TreasureDataのbulkimport用一時ストレージにmsgpackをアップロード

-- performコマンド --> Hadoopジョブを実行して、msgpackからmpc1形式への変換

-- commitコマンド --> mpc1形式のファイルをストレージに格納

この一連の流れを通して初めてクエリ可能な状態になる。 パッと見てかなり手間なのだけれども、個々のコマンドでデータに過不足がないかのバリデーションが可能な点はメリットではあった。

しかし、prepare_partsの箇所でファイル分割やmsgpack形式への変換処理に時間がかかるという問題をかかえており、データ量が数億件の場合には数時間かかってしまうこともあった。 そのため、この処理の高速化・並列化が求められていた。 そこで、bulkimportが開発されることになるが、この話は次章で説明したいと思う。

そんなわけで2013年前半に苦楽を共にしたトレジャーデータでのデータバルクアップロード方法であったbulk_importについて簡単に紹介した。 ここまでは、トレジャーデータのサポートエンジニアになる前の話だけれども、書いてる途中で手が進まなくなり、期間が空いてしまったので一旦記事公開をする。 次は、私とTreasureData vol.1 ~ bulk_importじゃなくてbulkimport編 ~ です。