過去に蓄積されたデータの集計にNorikraを使う

最近Norikraを触っていて,ある程度使い方が分かってきたのと,丁度v1.0.0もリリースされたので,忘れないうちにこのタイミングでメモしておく.

本来Norikraはリアルタイムなログ等に対して処理をするものであるけども,今回は過去に蓄積されたログに対して集計処理を行ってみることにする*1

今回のキーポイントは,

  • win:ext_timed_batchを使って,ログの発生時刻を指定する
  • LOOPBACKを使って,あるクエリで発生したeventを別のtargetに直接流し込む

の2点.

やりたいこと

下のような,2台のホストで1分毎に生成されたデータがあるとする.

# host1.csv
timestamp, metric1, metric2, ...
1396278000, 0, 0
1396278060, 1, 2
1396278120, 2, 4
1396278180, 3, 6
1396278240, 4, 8
1396278300, 5, 10
1396278360, 6, 12
1396278420, 7, 14
1396278480, 8, 16
1396278540, 9, 18
1396278600, 10, 20
# host2.csv
timestamp, metric1, metric2, ...
1396278000, 0, 0
1396278060, 2, 4
1396278120, 4, 8
1396278180, 6, 12
1396278240, 8, 16
1396278300, 10, 20
1396278360, 12, 24
1396278420, 14, 28
1396278480, 16, 32
1396278540, 18, 36
1396278600, 20, 40

これらのデータに対し,同一時刻に発生したメトリックどうしを加算したい.

timestamp, metric1_sum, metric2_sum, ...
1396278000, 0, 0
1396278060, 3, 6
1396278120, 6, 12
1396278180, 9, 18
1396278240, 12, 24
1396278300, 15, 30
1396278360, 18, 36
1396278420, 21, 42
1396278480, 24, 48
1396278540, 27, 54
1396278600, 30, 60

さらに,5分間の平均値と最大値を調べたい.

timestamp, metric1_avg, metric1_max
1396278000, 6.0, 12
1396278300, 21.0, 27

実際には,ホスト数もメトリックももっとたくさんある,という想定. こういった用途であれば,データベースに突っ込んで集計するなり,自前で集計スクリプトを書くなりすれば対応できるような気がする.ただ,集計対象のメトリックが頻繁に変更されるような状況で,かつデータベースや集計スクリプトのメンテナンスにあまり重きを置けない場合*2,そのあたりを簡単に扱えるものが欲しくなってくる.そこでNorikra,という流れ.

過去のログを扱う::ext_timed_batchでログの発生時刻を指定する

通常,ログがnorikraに入った時のシステムの実時間=ログの発生時刻として扱われる.それだと過去のデータを処理対象とする場合困るので,発生時刻を外部から与えてやる必要がある.そこで使うのがext_timed_batch

例えばこんな感じのクエリを登録する.

select
  min(timestamp) as timestamp,
  sum(metric1) as metric_sum
from
  host_data.win:ext_timed_batch(timestamp * 1000, 1 min, 1396278000000L)
where hostname in ("host1", "host2")

このクエリを登録した状態で,ターゲットhost_data

[{"timestamp": 1396278000, "hostname": "host1", "metric1": 0}]

というようなデータを送ると,2014-04-01 00:00(JST) (=unix epochで1396278000)に発生したログとして扱ってもらえる.ext_timed_batchの第3引数はtime windowの開始点を与える.設定しない場合,最初のイベントが発生した時刻を基準にbatchが実行されるようになる.外部から与えるtimestampのタイムゾーム周りがややこしい場合,明示的に指定しておいたほうが無難に思える.

注意点として,流し込むデータは時間順にソートされている必要がある.ext_timed_batchに限らず*_batchでは,あるtime windowの境界を跨ぐデータが到着した時点で,前のtime windowに対するeventが発行される.なので,データが時間順にソートされていないと,正しい結果が得られない.

複数のクエリで処理する::LOOPBACK()でeventを別のtargetに送る

あるクエリを実行した後,その結果得られたeventに対して更にクエリを投げたい場合,以前であればeventをfetchした上で対象targetに送り直す必要があった*3.Norikra v1.0.0からLOOPBACK()が導入され,この処理が自動化された.

使い方は簡単で,クエリ登録時のGroupをLOOPBACK(target名)としてやればよい. 例えば,上で紹介したクエリのGroupをLOOPBACK(metric_aggregated)とした上で,下のクエリを登録する.

select
  min(timestamp) as timestamp,
  avg(metric_sum) as metric_avg,
  max(metric_sum) as matric_max
from
  metric_aggregated.win:ext_timed_batch(timestamp * 1000, 10 min, 1396278000000L)

この状態でターゲットhost_dataにeventを送ると,最初のクエリを実行した上でその結果がターゲットmetric_aggregatedに送られ,2番目のクエリが実行される. あとはこのクエリのeventをfetchすれば,当初のお目当てのデータが得られる.

まとめ

今回は,過去に蓄積されたログデータの集計にNorikraを使ってみた.本来想定されている使い方ではない上に,ext_timed_batch使うのはオススメしないと@tagomorisさんが言っていたりするので,気付いていない落とし穴があるのかもしれない.

ただ,データ集計の条件が簡単に書ける&データストア類が必須でないという点は,コードを書いたりデータベースを運用することが日常的ではない場所で使う上で,意外とメリットになりそうな気はする.

サンプルコード

gist466004c500c6576c3644

*1:このスライドの表で言うところのschema-less dataに対するretrospection.

*2:具体的にはopsな現場

*3:Fluentdで自動化できるとはいえ,面倒臭い