Secret Ninja Blog

プロダクトマネージャーしてます

Treasure Workflow for ビギナー ~digファイル書き方編~

前回は、Treasure Workflowを扱う前のPC環境のセットアップをしました。 secret-ninja.hatenablog.com

Treasure Workflowでは処理のワークフローを設定するために、.digという拡張子をつけたファイルを作成します。 今回はこのdigファイルの書き方を説明します。

タスクとオペレータについて

digファイルには大きく分けて、タスクオペレータの2種類とパラメータを使って記述をします。 下記の例でいうと、

  • +XXXX:と記載をしているのがタスクです。タスクは、1つの処理単位を表します。このタスクを直列に並べたり、階段的に配置することでタスクに依存関係を持たせることができます。
  • XXXX>:と書いてあるのがオペレータです。オペレータは処理内容を表します。オペレータの一覧はこちらにて参照可能です。また一つのタスクには1つのオペレータが基本的には必要となります。
  • export: さっそくタスクでもオペレータでもない例外がでてしまっているのですが、export:は変数定義になります。ここではtd及びtd_runオペレータのデフォルトのデータベースを指定しています。 (クエリエディタでも最初にデータベースを選択しますが、それと思ってください。)

例:

_export:
  td:
     database: sample_datasets
+task1:
  td_run>: クエリA
+task2:
  td_run>: クエリB

上記の例ではtask1とtask2という二つのタスクがあります。 それぞれのタスクには、td_runオペレータを利用した処理が行われます。 td_runオペレータは、Treasure Data上のSaved Query名またはSaved Query IDを設定することで、そのクエリを実行させることができます。 つまりここでは、クエリAを最初に実行し、その次にクエリBというSaved Queryを実行する。ということになります。

またタスクは、+XXXX:のXXXXは任意の英数字の名前を利用可能です。 タスクを書いた後のオペレータは、インデントとして半角スペースを使うひつようがあります。半角スペースを2つまたは4つ入れておくと見やすくなります。 つまり、

+task1:
td_run>: クエリA

は、エラーになります。これはtask1と同じインデントにあるため、task1のオペレータということを識別できないためです。

+task1:
  td_run>: クエリA

上記のように半角スペースを入れる必要があります。

実際に動かしてみる。

1 - Saved Queryの準備

まずはConsoleにログインして、実行させるクエリAとクエリBを作成して見ましょう。 すでにクエリがあるようであればそちらでも大丈夫です。

TreasureDataにはデフォルトのサンプルデータとしてsample_datasetsデータベースがあります。 このデータを使ってクエリを作ってみます。またsample_datasetsは書き出し不可のため、適当な名前でデータベースを作成しておきましょう。

クエリエディタ: https://console.treasuredata.com/app/queries/editor

クエリAではsample_datasetsデータベースの中のnasdaqテーブルから一部のデータの抽出を行い、そのデータをsupportデータベースのtestingテーブルへ書き出しを行なっています。 クエリBではsupportというデータベースのtestingテーブル(事前に作成しておきます)に対して、集計を行います。 そしてそれぞれをSaveします。

クエリA :

INSERT INTO support.testing
SELECT
  TD_SCHEDULED_TIME() as time,
  symbol,
  count(1) as cnt
FROM nasdaq
GROUP BY symbol

クエリB:

SELECT
  TD_TIME_FORMAT(time, 'yyyy-MM-dd', 'JST') as d,
  COUNT(1) as cnt
FROM testing
GROUP BY 1

2 - Treasure Workflowの準備

さて、上記の二つのクエリをワークフローで実行してみましょう。 下記のワークフローの設定をコピペして、new.digファイルを作ります。

_export:
  td:
    database: sample_datasets
+task1:
  td_run>: クエリA
+task2:
  td_run>: クエリB

補足: クエリ名を頻繁に変える可能性がある場合は、クエリ名ではなくURLに記載されているqueryID=XXXXXXの数値をクエリAやクエリBの代わり使うのが良いです。

3 - Treasure Workflowの実行

さて設定ファイルも準備できたので、これを実行してみましょう。 実行するためのコマンドは、td wf runとなります。これを実行することにより、new.digにて定義されたワークフローを自分のパソコン上で実行することができます。 (保存したファイルと同じディレクトリにする必要があるため、cd Desktopなどと実行してファイルのある場所を目ざしましょう。)

td wf run new.dig

また、一度成功したワークフローを再度実行したいときは td wf run new.dig --rerun と実行します。

td wf run new.dig --rerun 

上記の結果は下記のようになります。

C:\Users\td\Desktop>td wf run new.dig --rerun
2017-07-31 10:09:48 -0700: Digdag v0.9.13
2017-07-31 10:09:51 -0700 [WARN] (main): Reusing the last session time 2017-07-31T00:00:00+00:00.
2017-07-31 10:09:51 -0700 [INFO] (main): Using session C:\Users\td\Desktop\.digdag\status\20170731T000000+0000.
2017-07-31 10:09:52 -0700 [INFO] (main): Starting a new session project id=1 workflow name=new session_time=2017-07-31T00:00:00+00:00
2017-07-31 10:09:53 -0700 [INFO] (0017@[0:default]+new+task1): td_run>: ???A
2017-07-31 10:09:53 -0700 [INFO] (0017@[0:default]+new+task1): td-client version: 0.7.36
2017-07-31 10:09:53 -0700 [INFO] (0017@[0:default]+new+task1): Logging initialized @5938ms
2017-07-31 10:09:54 -0700 [INFO] (0017@[0:default]+new+task1): td_run>: ???A
2017-07-31 10:09:56 -0700 [INFO] (0017@[0:default]+new+task1): Started a saved query name=???A with time=2017-07-31T00:00:00Z, job id= 160491966
2017-07-31 10:09:58 -0700 [INFO] (0017@[0:default]+new+task1): td_run>: ???A
2017-07-31 10:10:01 -0700 [INFO] (0017@[0:default]+new+task1): td_run>: ???A
2017-07-31 10:10:06 -0700 [INFO] (0017@[0:default]+new+task1): td_run>: ???A
2017-07-31 10:15:49 -0700 [INFO] (0017@[0:default]+new+task2): td_run>: ???B
2017-07-31 10:15:50 -0700 [INFO] (0017@[0:default]+new+task2): td_run>: ???B
2017-07-31 10:15:51 -0700 [INFO] (0017@[0:default]+new+task2): Started a saved query name=???B with time=2017-07-31T00:00:00Z, job id= 160493053
2017-07-31 10:15:53 -0700 [INFO] (0017@[0:default]+new+task2): td_run>: ???B
Success. Task state is saved at C:\Users\td\Desktop\.digdag\status\20170731T000000+0000 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.

これでログに記載されているJOB ID を参照することで集計結果を取得することができます。

2017-07-31 10:15:51 -0700 [INFO] (0017@[0:default]+new+task2): Started a saved query name=???B with time=2017-07-31T00:00:00Z, job id= 160493053

自分のパソコン上でワークフローを実行させるには現状CLIを利用する方法しかありません。しかし、Treasure Dataに上記のワークフローをアップロードすることで、Treasure Data上でワークフローの処理を行うことができ、GUIでも一部ワークフローの作成が可能です。

それについてはまた次の記事で説明します。