2015年3月16日月曜日

ネストされたJSONデータをFluentdでGoogle BigQueryに投入してクエリを実行する


ネストされたJSONデータ(https://cloud.google.com/bigquery/docs/personsData.json)を、fluentdを使ってGoogle BigQueryに投入します。
BigQueryは、こういった構造化されたデータに対応する WITHIN、FLATTEN といったSQL関数が準備されています。
これらを利用したSQLクエリを構築してデータを取り出してみます。



プロジェクトを作成
https://console.developers.google.com/project
Google Developers Consoleを開き、プロジェクトを作成して、『プロジェクト ID』『メールアドレス』をメモしておいてください。
また、P12キーファイルを保存しておいてください。
認証で必要になります。

サーバ
AMI: CentOS 6 (x86_64) - with Updates HVM
Amazon Linuxは、td-agentが非サポートのため。CentOSを選びました。
Treasure AgentとAmazon Linux(http://qiita.com/repeatedly/items/a07f34c1be7f7309b521
追記:td-agent - Treasure Agent 2.2.0のリリース - Qiita『Amazon Linuxのサポート』(http://qiita.com/repeatedly/items/5904e81e1856e0671684

以下、サーバでの作業です。

Google Cloud SDKをインストール
Google Cloud SDKのために、Pythonのバージョンを2.7にアップしておきます。
# yum -y install gcc zlib-devel openssl-devel
# curl -O https://www.python.org/ftp/python/2.7.9/Python-2.7.9.tgz
# tar zxvf Python-2.7.9.tgz
# cd Python-2.7.9
# ./configure --prefix=/usr/local
# make
# make install
# /usr/local/bin/python --version
Python 2.7.9
Google Cloud SDKをインストール。
$ curl https://sdk.cloud.google.com | bash
$ source ~/.bashrc
$ gcloud components list
$ gcloud auth login --no-launch-browser
Go to the following link in your browser:

    https://accounts.google.com/o/oauth2/auth?xxxxxx

Enter verification code:
URLが表示されるので、それを手元のブラウザで開く。
許可リクエストを承認すると、認証用コードが表示されるので、ターミナルに貼り付けて認証します。

認証を行ったアカウントを確認。
$ gcloud auth list
BigQuery用のコマンドが有効になっているか確認。
$ bq
既存プロジェクトを確認。プロジェクト名は、my-project-id です。
$ bq ls
       projectId          friendlyName
 --------------------- ------------------
  my-project-id   My First Project

データセットを作成
データセットとは、MySQLに例えると、データベースのようなものです。
BigQueryコンソール(https://bigquery.cloud.google.com/)から作成できます。My First Project -> プルダウン -> Create new dataset
コマンドラインからデータセットを作成する場合は以下のようにします。
データセット名は、”my_dataset”です。
# bq mk your-project-id:my_dataset

テーブルを作成
テーブルのスキーマファイルを用意します。
こちらのサンプルを利用します。
Nested and Repeated Data(https://cloud.google.com/bigquery/docs/data#nested
# curl -o /tmp/personsDataSchema.json  https://cloud.google.com/bigquery/docs/personsDataSchema.json
テーブルを作成。テーブル名は、 my_table です。
$ bq mk -t my-project-id:my_dataset.my_table ./personsDataSchema.json
Table 'my-project-id:my_dataset.my_table' successfully created.
テーブルの情報を表示。
$ bq --project_id my-project-id show my_dataset.my_table
Table my-project-id:my_dataset.my_table

   Last modified                   Schema                  Total Rows   Total Bytes   Expiration
 ----------------- -------------------------------------- ------------ ------------- ------------
  16 Mar 14:33:01   |- kind: string
                    |- fullName: string (required)
                    |- age: integer
                    |- gender: string
                    +- phoneNumber: record
                    |  |- areaCode: integer
                    |  |- number: integer
                    +- children: record (repeated)
                    |  |- name: string
                    |  |- gender: string
                    |  |- age: integer
                    +- citiesLived: record (repeated)
                    |  |- place: string
                    |  |- yearsLived: integer (repeated)
BigQueryコンソールから確認してみましょう。


fluentdをインストール
td-agent2をインストール。
# curl -L http://toolbelt.treasuredata.com/sh/install-redhat-td-agent2.sh | sh
fluent-plugin-bigqueryプラグインをインストール。
# td-agent-gem install fluent-plugin-bigquery
td-agentの設定ファイルを編集します。
データセット名、テーブル名には、先ほど作った名前を設定します。
Google Developers Console(https://console.developers.google.com/project)に認証情報があるので、それをコピーして利用します。
プロジェクトを選択 -> APIと認証 -> 認証情報
emailにメールアドレスを、private_key_pathにダウンロードしたP12キーファイルのパスを設定します。
# emacs /etc/td-agent/td-agent.conf
<source>
  type tail
  format json
  path /tmp/fluentd_data_source.json
  tag tail.json
</source>

<match tail.json>
  type bigquery
  method insert
  auth_method private_key
  email メールアドレス
  private_key_path P12キーファイルのパス
  project my-project-id
  dataset my_dataset
  table my_table
  time_format %s
  time_field time
  schema_path /tmp/personsDataSchema.json
  flush_interval 1s
  try_flush_interval 0.05
  queued_chunk_flush_interval 0.01
  buffer_chunk_limit 512k
  buffer_chunk_records_limit 250
  buffer_queue_limit 1024
  retry_limit 5
  retry_wait 1s
  num_threads 10
</match>

td-agentをスタート。
# /etc/init.d/td-agent start
# cat /var/log/td-agent/td-agent.log

サンプルデータをBigQueryに投入
サンプルデータをダウンロード。
# curl -o /tmp/personsData.json  https://cloud.google.com/bigquery/docs/personsData.json
サンプルデータをfluentdに投入。fluentdは、すぐにデータをBigQueryに転送するはずです。
# cat personsData.json >> /tmp/fluentd_data_source.json

BigQueryにデータは投入されている?
データは投入されているでしょうか?
『Details』タブから確認できます。


データが投入されていることがわかりました。

SQLを発行してデータを取得
SQLを発行してデータを取得してみます。
SELECT * FROM [my_dataset.my_table] LIMIT 1000
Query Failed
Error: Cannot output multiple independently repeated fields at the same time. Found children_age and citiesLived_place
ワイルドカードを使うと、複数の繰り返しのあるフィールドにアクセスすることになるため。エラーになってしまいます。 代わりに、以下ようにフィールドを指定してクエリを実行します。
SELECT
  fullName AS name,
  age,
  gender,
  citiesLived.place,
  citiesLived.yearsLived
FROM [my_dataset.my_table]


コマンドラインからも同じ結果を取得できます。
# bq query --project_id my-project-id "SELECT fullName AS name, age, gender, citiesLived.place, citiesLived.yearsLived FROM [my_dataset.my_table]"
自動的にネストされたデータがフラット化されて取得できたことがわかります。

FLATTENについて
明示的に、複数の繰り返しフィールドを扱うときには、FLATTENを使う必要があります。
例えば、次のクエリを実行します。
SELECT fullName, age
FROM [my_dataset.my_table]
WHERE
  (citiesLived.yearsLived > 1995 ) AND
  (children.age > 3)

BigQuery error in query operation: Error processing job 'xxxx:bqjob_xxxxx': Cannot query the
cross product of repeated fields children.age and citiesLived.yearsLived.
エラーになります。
複数の繰り返しフィールド全体に問い合せるには、フィールドのいずれかをフラット化する必要があります。
この例では、childrenをフラットにしています。
SELECT
  fullName,
  age,
  gender,
  citiesLived.place
FROM (FLATTEN([my_dataset.my_table], children))
WHERE
  (citiesLived.yearsLived > 1995) AND
  (children.age > 3)
GROUP BY fullName, age, gender, citiesLived.place
+------------+-----+--------+-------------------+
|  fullName  | age | gender | citiesLived_place |
+------------+-----+--------+-------------------+
| John Doe   |  22 | Male   | Stockholm         |
| Mike Jones |  35 | Male   | Los Angeles       |
| Mike Jones |  35 | Male   | Washington DC     |
| Mike Jones |  35 | Male   | Portland          |
| Mike Jones |  35 | Male   | Austin            |
+------------+-----+--------+-------------------+

ほしい情報を出力することが出来ました。

WITHINについて
WITHINは、ネストされたフィールド内の繰り返しフィールドを集計するときに役立ちます。
以下の2つの使用方法があります。
WITHIN RECORD: レコード内の繰り返しの集合体データ。
WITHIN node_name: レコード内の親ノードで指定した、繰り返しの集合体データ。

例えば、それぞれの子供の数をカウントする場合には、WITHIN RECORD使って、レコード内の繰り返しの集合体データを計算します。
SELECT
  fullName,
  COUNT(children.name) WITHIN RECORD AS numberOfChildren
FROM [my_dataset.my_table]
+---------------+------------------+
|   fullName    | numberOfChildren |
+---------------+------------------+
| Anna Karenina |                0 |
| John Doe      |                2 |
| Mike Jones    |                3 |
+---------------+------------------+
全ての子供のリストです。上記の結果が間違いないことがわかります。
SELECT
  fullName, children.name
FROM [my_dataset.my_table]
+---------------+---------------+
|   fullName    | children_name |
+---------------+---------------+
| Anna Karenina | NULL          |
| John Doe      | Jane          |
| John Doe      | John          |
| Mike Jones    | Earl          |
| Mike Jones    | Sam           |
| Mike Jones    | Kit           |
+---------------+---------------+

別のSQLを発行してみましょう。
citiesLived.place対してWITHIN RECORDを実行して、それぞれの人が住んでいる街の数をカウントします。
citiesLived.yearsLived対してWITHINを実行して、それぞれの人がにその街に住んでいた年をカウント(citiesLivedの数)します。WITHINで親ノードを指定して、繰り返しの集合体データを計算しています。
SELECT
  fullName,
  COUNT(citiesLived.place) WITHIN RECORD AS numberOfPlacesLived,
  citiesLived.place,
  COUNT(citiesLived.yearsLived) WITHIN citiesLived AS numberOfTimesInEachCity,
FROM [my_dataset.my_table]
+---------------+---------------------+-------------------+-------------------------+
|   fullName    | numberOfPlacesLived | citiesLived_place | numberOfTimesInEachCity |
+---------------+---------------------+-------------------+-------------------------+
| Anna Karenina |                   3 | Stockholm         |                       4 |
| Anna Karenina |                   3 | Russia            |                       3 |
| Anna Karenina |                   3 | Austin            |                       2 |
| John Doe      |                   2 | Seattle           |                       1 |
| John Doe      |                   2 | Stockholm         |                       1 |
| Mike Jones    |                   4 | Los Angeles       |                       4 |
| Mike Jones    |                   4 | Washington DC     |                       4 |
| Mike Jones    |                   4 | Portland          |                       4 |
| Mike Jones    |                   4 | Austin            |                       4 |
+---------------+---------------------+-------------------+-------------------------+

まとめ。

ネストされたJSONデータをBigQueryにインポートして、データを取得するようにしました。
また、FLATTEN関数、WITHIN関数を利用してシンプルに計算をすることができました。

繰り返しやネストされたJSONデータが扱えるようになったことで、データのや計算方法の幅が広くなったと思います。
しかしながら、こういったデータを扱うとすれば、BigQueryは、RDBとは別物であることを念頭にデータの持ち方を考える必要があるので、慎重な設計と十分にテストをする必要があるでしょう。
Query Reference(https://cloud.google.com/bigquery/query-reference
ほかにも、使えそうな関数がありますので、試してみたいところです。

Fluentdのように連続してデータをBigQueryにインポートする場合は、リトライ時に重複データが投入されることがあります。それを防ぐためにインポートするデータに insertId を付けることを推奨されています。fluent-plugin-bigqueryでも insertId がサポートされているので導入は難しくありません。(https://github.com/kaizenplatform/fluent-plugin-bigquery#specifying-insertid-property
ほかにも、いくつか制限がありますのでドキュメントを熟読すると良いでしょう。

Streaming Data Into BigQuery - Google BigQuery — Google Cloud Platform(https://cloud.google.com/bigquery/streaming-data-into-bigquery