前回は、Treasure Workflowを扱う前のPC環境のセットアップをしました。 secret-ninja.hatenablog.com
Treasure Workflowでは処理のワークフローを設定するために、.digという拡張子をつけたファイルを作成します。 今回はこのdigファイルの書き方を説明します。
タスクとオペレータについて
digファイルには大きく分けて、タスクとオペレータの2種類とパラメータを使って記述をします。 下記の例でいうと、
- +XXXX:と記載をしているのがタスクです。タスクは、1つの処理単位を表します。このタスクを直列に並べたり、階段的に配置することでタスクに依存関係を持たせることができます。
- XXXX>:と書いてあるのがオペレータです。オペレータは処理内容を表します。オペレータの一覧はこちらにて参照可能です。また一つのタスクには1つのオペレータが基本的には必要となります。
- export: さっそくタスクでもオペレータでもない例外がでてしまっているのですが、export:は変数定義になります。ここではtd及びtd_runオペレータのデフォルトのデータベースを指定しています。 (クエリエディタでも最初にデータベースを選択しますが、それと思ってください。)
例:
_export: td: database: sample_datasets +task1: td_run>: クエリA +task2: td_run>: クエリB
上記の例ではtask1とtask2という二つのタスクがあります。 それぞれのタスクには、td_runオペレータを利用した処理が行われます。 td_runオペレータは、Treasure Data上のSaved Query名またはSaved Query IDを設定することで、そのクエリを実行させることができます。 つまりここでは、クエリAを最初に実行し、その次にクエリBというSaved Queryを実行する。ということになります。
またタスクは、+XXXX:のXXXXは任意の英数字の名前を利用可能です。 タスクを書いた後のオペレータは、インデントとして半角スペースを使うひつようがあります。半角スペースを2つまたは4つ入れておくと見やすくなります。 つまり、
+task1: td_run>: クエリA
は、エラーになります。これはtask1と同じインデントにあるため、task1のオペレータということを識別できないためです。
+task1: td_run>: クエリA
上記のように半角スペースを入れる必要があります。
実際に動かしてみる。
1 - Saved Queryの準備
まずはConsoleにログインして、実行させるクエリAとクエリBを作成して見ましょう。 すでにクエリがあるようであればそちらでも大丈夫です。
TreasureDataにはデフォルトのサンプルデータとしてsample_datasetsデータベースがあります。 このデータを使ってクエリを作ってみます。またsample_datasetsは書き出し不可のため、適当な名前でデータベースを作成しておきましょう。
クエリエディタ: https://console.treasuredata.com/app/queries/editor
クエリAではsample_datasetsデータベースの中のnasdaqテーブルから一部のデータの抽出を行い、そのデータをsupportデータベースのtestingテーブルへ書き出しを行なっています。 クエリBではsupportというデータベースのtestingテーブル(事前に作成しておきます)に対して、集計を行います。 そしてそれぞれをSaveします。
クエリA :
INSERT INTO support.testing SELECT TD_SCHEDULED_TIME() as time, symbol, count(1) as cnt FROM nasdaq GROUP BY symbol
クエリB:
SELECT TD_TIME_FORMAT(time, 'yyyy-MM-dd', 'JST') as d, COUNT(1) as cnt FROM testing GROUP BY 1
2 - Treasure Workflowの準備
さて、上記の二つのクエリをワークフローで実行してみましょう。 下記のワークフローの設定をコピペして、new.digファイルを作ります。
_export: td: database: sample_datasets +task1: td_run>: クエリA +task2: td_run>: クエリB
補足: クエリ名を頻繁に変える可能性がある場合は、クエリ名ではなくURLに記載されているqueryID=XXXXXXの数値をクエリAやクエリBの代わり使うのが良いです。
3 - Treasure Workflowの実行
さて設定ファイルも準備できたので、これを実行してみましょう。 実行するためのコマンドは、td wf runとなります。これを実行することにより、new.digにて定義されたワークフローを自分のパソコン上で実行することができます。 (保存したファイルと同じディレクトリにする必要があるため、cd Desktopなどと実行してファイルのある場所を目ざしましょう。)
td wf run new.dig
また、一度成功したワークフローを再度実行したいときは td wf run new.dig --rerun と実行します。
td wf run new.dig --rerun
上記の結果は下記のようになります。
C:\Users\td\Desktop>td wf run new.dig --rerun 2017-07-31 10:09:48 -0700: Digdag v0.9.13 2017-07-31 10:09:51 -0700 [WARN] (main): Reusing the last session time 2017-07-31T00:00:00+00:00. 2017-07-31 10:09:51 -0700 [INFO] (main): Using session C:\Users\td\Desktop\.digdag\status\20170731T000000+0000. 2017-07-31 10:09:52 -0700 [INFO] (main): Starting a new session project id=1 workflow name=new session_time=2017-07-31T00:00:00+00:00 2017-07-31 10:09:53 -0700 [INFO] (0017@[0:default]+new+task1): td_run>: ???A 2017-07-31 10:09:53 -0700 [INFO] (0017@[0:default]+new+task1): td-client version: 0.7.36 2017-07-31 10:09:53 -0700 [INFO] (0017@[0:default]+new+task1): Logging initialized @5938ms 2017-07-31 10:09:54 -0700 [INFO] (0017@[0:default]+new+task1): td_run>: ???A 2017-07-31 10:09:56 -0700 [INFO] (0017@[0:default]+new+task1): Started a saved query name=???A with time=2017-07-31T00:00:00Z, job id= 160491966 2017-07-31 10:09:58 -0700 [INFO] (0017@[0:default]+new+task1): td_run>: ???A 2017-07-31 10:10:01 -0700 [INFO] (0017@[0:default]+new+task1): td_run>: ???A 2017-07-31 10:10:06 -0700 [INFO] (0017@[0:default]+new+task1): td_run>: ???A 2017-07-31 10:15:49 -0700 [INFO] (0017@[0:default]+new+task2): td_run>: ???B 2017-07-31 10:15:50 -0700 [INFO] (0017@[0:default]+new+task2): td_run>: ???B 2017-07-31 10:15:51 -0700 [INFO] (0017@[0:default]+new+task2): Started a saved query name=???B with time=2017-07-31T00:00:00Z, job id= 160493053 2017-07-31 10:15:53 -0700 [INFO] (0017@[0:default]+new+task2): td_run>: ???B Success. Task state is saved at C:\Users\td\Desktop\.digdag\status\20170731T000000+0000 directory. * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time. * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
これでログに記載されているJOB ID を参照することで集計結果を取得することができます。
2017-07-31 10:15:51 -0700 [INFO] (0017@[0:default]+new+task2): Started a saved query name=???B with time=2017-07-31T00:00:00Z, job id= 160493053
自分のパソコン上でワークフローを実行させるには現状CLIを利用する方法しかありません。しかし、Treasure Dataに上記のワークフローをアップロードすることで、Treasure Data上でワークフローの処理を行うことができ、GUIでも一部ワークフローの作成が可能です。
それについてはまた次の記事で説明します。