VyOS+fluent-plugin-netflowでNetFlowを取得する

NetFlowまわりの諸々を触りたいとき,VyOSだとハードウェアを別途用意しなくても手軽に色々試せて便利.

VyOSの設定

PlixerManageEngineの記事を見つつ,下の設定さえあればとりあえず動く.Timeout値まわりは環境に合わせて要調整.バージョンはVyOS 1.0.4.

set system flow-accounting netflow version 5
set system flow-accounting interface eth0
set system flow-accounting netflow server 192.168.56.1 port 2055

# system flow-accounting netflow timeout以下は必要に応じて設定
set system flow-accounting netflow timeout expiry-interval 60

なお,BGPのOrigin ASやnext-hopも併せて取得したい場合,/opt/vyatta/bin/sudo-users/quagga_gen_as_network.plあたりを実行してやる必要があるように見える (未調査).

現在のフロー情報はshow flow-accountingで確認できる.

vyos@router1:~$ show flow-accounting
flow-accounting for [eth0]
Src Addr        Dst Addr        Sport Dport Proto    Packets      Bytes   Flows
172.16.0.2      10.1.1.100      0     0      icmp        106       8904       1

Total entries: 1
Total flows  : 1
Total pkts   : 106
Total bytes  : 8,904

NetFlowコレクタで確認

ここでは fluent-plugin-netflow をNetFlowコレクタとして使用する.ひとまず動作確認として,受け取ったNetFlowエントリをそのままstdoutに出力させる.

<source>
    type netflow
    tag netflow.event
    
    bind 0.0.0.0
    port 2055
</source>

<match netflow.*>
    type stdout
</match>

このconfigでFluentdを起動し,VyOS側でフローの収集対象にしたインタフェース(上の例ではeth0)に適当なトラフィックを流せば,fluentd側でフロー情報が収集できていることが確認できる.

collector$ fluentd
(snip)
2014-10-14 22:22:22 +0900 netflow.event: {"version":"5","flow_seq_num":"0","engine_type":"0","engine_id":"0","sampling_algorithm":"0","sampling_interval":"0","flow_records":"1","ipv4_src_addr":"172.16.0.2","ipv4_dst_addr":"10.1.1.100","ipv4_next_hop":"0.0.0.0","input_snmp":"2","output_snmp":"0","in_pkts":"92","in_bytes":"7728","first_switched":"2014-10-14T13:21:44.789Z","last_switched":"2014-10-14T13:22:02.789Z","l4_src_port":"0","l4_dst_port":"0","tcp_flags":"0","protocol":"1","src_tos":"0","src_as":"0","dst_as":"0","src_mask":"0","dst_mask":"0","host":"192.168.56.90"}

手軽に試せて便利なので,もう少し色々と触ってみたい.

Client Subnet in DNS Requests (draft-vandergaast-edns-client-subnet) を読んだ

先日googleとakamaiは仲良し?というエントリを読んだとき,恐らくEDNS Client Subnet Optionだろうとは思った*1ものの,EDNS Client Subnet Optionの動作をきちんと把握してはいなかったので,この機会にdraftを読んでみた.

概要

  • DNS権威サーバの中には,クエリ送信元のDNSキャッシュサーバのIPアドレスによって応答を変えるものが存在する
  • クライアントとキャッシュサーバのネットワーク的な距離が離れている場合,権威サーバは適切でない応答を返す恐れがある
  • キャッシュサーバから権威サーバへのクエリ内にクライアントのIPアドレスの一部を含めることで,権威サーバがより適切な応答を返すことを実現する
  • 利用にはキャッシュサーバと権威サーバの対応が必要 (クライアントの対応は不要)

問題: Public DNSと"不適切"な応答の発生

CDN事業者やWebサービス事業者が運用しているDNS権威サーバの中には,問い合わせ元のDNSキャッシュサーバのIPアドレスの応じて応答を変えるものが存在する.例えば,日本のISP Aのキャッシュサーバからの問い合わせに対しては日本に設置されたエッジサーバのIPアドレスを返せば,クライアントからエッジサーバへのレイテンシは抑えられると考えらえる.

ここで問題となるのは,クライアントとキャッシュサーバのネットワーク的な距離が離れている場合に,適切でない応答が発生することである.例えば,日本のISP A内のクライアントが台湾のキャッシュサーバを利用していた場合,権威サーバは台湾に設置されたエッジサーバのIPアドレスを返してしまう.これにより,クライアントにとってより適切なエッジサーバが存在するにも関わらず適切でないサーバに接続してしまい,結果としてパフォーマンス上の問題が発生する.

以前であればISPのキャッシュサーバを利用することが殆どであったため問題は発生しなかった*2.しかし,最近はGoogle Public DNSをはじめとした公開DNSキャッシュサーバの普及によって,クライアントとDNSキャッシュサーバの距離が離れているケースが生じるようになった。その結果、上述したような問題が顕著化するようになった.

解決策: クライアントのIPアドレスをクエリに埋め込む

この問題を解決するために考えらえたのが、キャッシュサーバから権威サーバへのクエリ内にクライアントのIPアドレスを埋め込むことで,権威サーバが適切な応答を返せるようにする,というもの.

EDNS Client Subnet Optionに対応したキャッシュサーバは,クライアントのIPアドレスの一部 (ADDRESS) とプレフィクス長 (SOURCE NETMASK; 標準ではIPv4で/24*3 ) をOPT RRに埋め込んで権威サーバに送信する.

f:id:yunazuno:20140817180611p:plain

これを受け取ったEDNS Client Subnet Option対応権威サーバは,クライアントのIPアドレスに応じた応答を返す.このとき,適切な応答を生成するのに必要だったプレフィクス長 (SCOPE NETMASK) を含める.たとえば,キャッシュサーバからSOURCE NETMASKが24のIPアドレスが与えられたとして,権威サーバは/22の情報しか使用しなかった場合,SCOPE NETMASKは22となる.

f:id:yunazuno:20140817180615p:plain

キャッシュサーバはクライアントに対して応答を返すと共に,権威サーバからの応答を自身のキャッシュに格納する.このときのキーは (FAMILY, ADDRESS, SCOPE NETMASK) の組となる.

ちなみに,EDNS Client Subnet Optionに対応していない権威サーバにオプション付きのクエリを投げたとしても,単純に未対応のオプションとして無視されるだけである.

実際に試してみる

Client Subnet Option付きクエリを投げる機能を追加するpatchを当てたdigで実験.ns1-4.google.comはClient Subnet Option対応権威サーバとして謳われているので,今回はこれを使用.

まずは適当な日本国内のIPアドレス帯でクエリを投げる.

$ dig @ns1.google.com. www.google.com. +norecurse +client=122.102.128.0/24

; <<>> DiG 9.9.3 <<>> @ns1.google.com. www.google.com. +norecurse +client=122.102.128.0/24
; (1 server found)
;; global options: +cmd
;; Got answer:
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 42644
;; flags: qr aa; QUERY: 1, ANSWER: 5, AUTHORITY: 0, ADDITIONAL: 1

;; OPT PSEUDOSECTION:
; EDNS: version: 0, flags:; udp: 512
; CLIENT-SUBNET: 122.102.128.0/24/21
;; QUESTION SECTION:
;www.google.com.                        IN      A

;; ANSWER SECTION:
www.google.com.         300     IN      A       173.194.126.209
www.google.com.         300     IN      A       173.194.126.210
www.google.com.         300     IN      A       173.194.126.211
www.google.com.         300     IN      A       173.194.126.212
www.google.com.         300     IN      A       173.194.126.208

;; Query time: 44 msec
;; SERVER: 216.239.32.10#53(216.239.32.10)
;; WHEN: Sun Aug 17 15:46:53 JST 2014
;; MSG SIZE  rcvd: 134

続いてシンガポールの適当なIPアドレス帯で同様に.

$ ./bin/dig/dig @ns1.google.com. www.google.com. +norecurse +client=118.189.0.0/24

; <<>> DiG 9.9.3 <<>> @ns1.google.com. www.google.com. +norecurse +client=118.189.0.0/24
; (1 server found)
;; global options: +cmd
;; Got answer:
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 57557
;; flags: qr aa; QUERY: 1, ANSWER: 6, AUTHORITY: 0, ADDITIONAL: 1

;; OPT PSEUDOSECTION:
; EDNS: version: 0, flags:; udp: 512
; CLIENT-SUBNET: 118.189.0.0/24/20
;; QUESTION SECTION:
;www.google.com.                        IN      A

;; ANSWER SECTION:
www.google.com.         300     IN      A       74.125.130.104
www.google.com.         300     IN      A       74.125.130.105
www.google.com.         300     IN      A       74.125.130.147
www.google.com.         300     IN      A       74.125.130.103
www.google.com.         300     IN      A       74.125.130.106
www.google.com.         300     IN      A       74.125.130.99

;; Query time: 52 msec
;; SERVER: 216.239.32.10#53(216.239.32.10)
;; WHEN: Sun Aug 17 15:48:30 JST 2014
;; MSG SIZE  rcvd: 150

最後にイギリスの適当なIPアドレス帯で.

$ ./bin/dig/dig @ns1.google.com. www.google.com. +norecurse +client=212.140.0.0/16

; <<>> DiG 9.9.3 <<>> @ns1.google.com. www.google.com. +norecurse +client=212.140.0.0/16
; (1 server found)
;; global options: +cmd
;; Got answer:
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 47653
;; flags: qr aa; QUERY: 1, ANSWER: 5, AUTHORITY: 0, ADDITIONAL: 1

;; OPT PSEUDOSECTION:
; EDNS: version: 0, flags:; udp: 512
; CLIENT-SUBNET: 212.140.0.0/16/18
;; QUESTION SECTION:
;www.google.com.                        IN      A

;; ANSWER SECTION:
www.google.com.         300     IN      A       74.125.230.112
www.google.com.         300     IN      A       74.125.230.115
www.google.com.         300     IN      A       74.125.230.114
www.google.com.         300     IN      A       74.125.230.113
www.google.com.         300     IN      A       74.125.230.116

;; Query time: 42 msec
;; SERVER: 216.239.32.10#53(216.239.32.10)
;; WHEN: Sun Aug 17 15:49:24 JST 2014
;; MSG SIZE  rcvd: 133

各クエリで得られたIPアドレス宛に東京からpingを打ってみたところ,下のような結果になった.

  • 173.194.126.209 (東京): 12ms
  • 74.125.130.104 (シンガポール): 80ms
  • 74.125.230.112 (イギリス): 230ms

東京-シンガポール,東京-イギリス間の一般的なRTTに近い値であることから,それぞれのクエリにおいてクライアントに近いサーバのアドレスが返されていると考えられる.

まとめ

EDNS Client Subnet Optionのdraftを読み,おおまかな動作の流れをまとめてみた.

現時点での対応状況として,キャッシュサーバ側ではGoogle Public DNSやOpenDNSが,権威サーバ側(コンテンツ提供側)ではAkamaiやCloudFrontが対応しているようである.逆に言うと,普通のキャッシュサーバや権威サーバが対応するメリットは無いので,特に気にする必要は無い.

ただし,Public DNSに限らずISPDNSキャッシュサーバが対応するとメリットが出るような場合もある.たとえば,この資料ではインドネシアのように国土が分散している場合におけるClient Subnetのメリットが述べれらている.また権威サーバ側でも,海外など広い範囲でサービスを提供していて,かつ海外にもPOP (Point of Presence) を設置しているような事業者の場合,対応することでクライアントのレイテンシ改善を実現できるかもしれない.

何にせよ,現状はパブリックなサーバ側実装が無い状態なので,まずはオープンかつ信頼できる実装が出てくると良いのかなぁ,と感じた*4

参考

*1:件のエントリにも同様の指摘のコメントが何件が付いている

*2:広いエリアを1箇所に設置したDNSキャッシュサーバでカバーするような運用を行っていたISPを除く

*3:ちなみにIPv6については未定(!)

*4:DNSサーバ自前実装は避けたいのが正直なところで、特にClient Subnetに関してはキャッシュ管理辛そう&セキュリティ的な地雷が埋まってそうなので尚更

grepcidr: IPアドレスをサブネットで一括grep

あるファイルに含まれるIPアドレスgrepしたいとき,/32なり/128なりのIPアドレスそのものではなく,サブネットでgrepできると便利な場面が多々ある.それらしき物が無いか探したところ,grepcidrというツールがあったのでメモ.

導入

単純にソースコードをダウンロードして make & make install するだけ.

$ wget http://www.pc-tools.net/files/unix/grepcidr-2.0.tar.gz
$ tar xvf grepcidr-2.0.tar.gz
$ cd grepcidr-2.0
$ make
# make install

使い方

たとえば下のようなファイルがあるとする.

$ cat ipaddr.txt
192.0.2.0/24
192.0.2.0/25
192.0.2.128/25
192.0.2.254/32

2001:db8::/32
2001:db8:a:1::/64
2001:db8:a:2::/64
2001:db8:a:1::1/128

これが色々なpatternでgrepできる.

$ grepcidr 192.0.2.0/24 ipaddr.txt
192.0.2.0/24
192.0.2.0/25
192.0.2.128/25
192.0.2.254/32
$ grepcidr 192.0.2.1/25 ipaddr.txt
192.0.2.0/24
192.0.2.0/25
$ grepcidr 192.0.2.128/25 ipaddr.txt
192.0.2.128/25
192.0.2.254/32
$ grepcidr 2001:db8:a::/48 ipaddr.txt
2001:db8:a:1::/64
2001:db8:a:2::/64
2001:db8:a:1::1/128

かなり便利.

lxmlのxpathで正規表現を使う方法,あるいはrrdtool-dumpのバージョン差異

PythonXMLパーサモジュールであるlxmlxpathにおいて,正規表現を使って検索をする方法のメモ.

いま,下のようなXMLがあるとする.

<rrd>
  <rra>
    <cf>AVERAGE</cf>
    <pdp_per_row>1</pdp_per_row>
  </rra>
  <rra>
    <cf> AVERAGE </cf>
    <pdp_per_row>5</pdp_per_row>
  </rra>
  <rra>
    <cf>MAX</cf>
    <pdp_per_row>1</pdp_per_row>
  </rra>
  <rra>
    <cf> MAX </cf>
    <pdp_per_row>5</pdp_per_row>
  </rra>
</rrd>

このとき,cfの中身が"AVERAGE"あるいは" AVERAGE "(前後にスペースあり)であるツリーのpdp_per_rowの値を取得したい場合,正規表現で検索すると簡単に扱うことができる.

pdp_per_row_values = list(
    map(
        lambda e: int(e.text),
        tree.xpath(
            './rra/cf[re:match(text(), "[ ]*AVERAGE[ ]*")]/parent::node()/pdp_per_row',
            namespaces={"re": "http://exslt.org/regular-expressions"})))
                                            
print(pdp_per_row_values) # [1, 5]

何故こういうことをやりたかったかというと,rrdtool dumpコマンドでRRDファイルをXMLでダンプしたときに出力されるXMLrrdtoolのバージョンによって異なるから.

rrdtool 1.3.8 ("AVERAGE"の前後にスペースあり):

$ rrdtool dump traffic.rrd | xpath '//rra/cf'
Found 5 nodes:
-- NODE --
<cf> AVERAGE </cf>-- NODE --
<cf> AVERAGE </cf>-- NODE --
<cf> AVERAGE </cf>-- NODE --
<cf> AVERAGE </cf>-- NODE --
<cf> AVERAGE </cf>

rrdtool 1.4.8 ("AVERAGE"の前後にスペースなし):

$ rrdtool dump traffic.rrd | xpath '//rra/cf'
Found 5 nodes:
-- NODE --
<cf>AVERAGE</cf>-- NODE --
<cf>AVERAGE</cf>-- NODE --
<cf>AVERAGE</cf>-- NODE --
<cf>AVERAGE</cf>-- NODE --
<cf>AVERAGE</cf>

rrdtool 1.4.8が入っている手元の環境からrrdtool 1.3.8が入っている別の環境にスクリプトを持っていったところ動かなくなったので,この罠に気付いた*1

参考

*1:どのバージョンから挙動が変わったのかまでは調べていない

dnspythonでDNSサーバのzone fileを扱う

BINDやNSDといったDNSサーバのゾーンファイルをpythonで扱うライブラリは幾つかある.その中でもdnspythonはある程度メンテされていて,かつ使いやすそう.

インストール

pipでインストール可能.ただしPython 2.xと3.xとでパッケージが分かれているので注意が必要.

Python 2.xの場合

$ pip install dnspython

Python 3.xの場合

$ pip install dnspython3

Zone fileの読み込み & レコードの参照

下のようなzone fileが/path/to/example.com.zoneに置いてある場合を考える.

$ORIGIN example.com.
$TTL 3600
@       IN  SOA ns1.example.com. root.example.com. (
                    2014060801  ; Serial
                    28800       ; Refresh
                    14400       ; Retry
                    3600000     ; Expire
                    3600        ; Minimum TTL
                )

        IN  NS  ns1.example.com.

ns1     IN  A   192.168.1.2

;;; Network
gateway     IN  A       192.168.1.1
            IN  AAAA    2001:db8::1

;;; Server
svr1        IN  A       192.168.1.101
svr2        IN  A       192.168.1.102
svr3        IN  A       192.168.2.201
wiki        IN  CNAME   svr3

これを読み込んだ上で,全てのレコードを表示する.

import dns
from dns import (zone, rdataclass, rdatatype)

zone_example = zone.from_file("/path/to/example.com.zone")

print("Origin: {}".format(zone_example.origin))

for name, node in zone_example.nodes.items():
    for rdataset in node.rdatasets:
        for rdata in rdataset:
            print("{}: {} {} {}".format(name,
                                        rdataclass.to_text(rdataset.rdclass),
                                        rdatatype.to_text(rdataset.rdtype),
                                        rdataset.ttl))

            rdata_property_names = set(dir(rdata)) - set(dir(dns.rdata.Rdata))

            for property_name in rdata_property_names:
                if not property_name.startswith("_"):
                    print("    {}: {}".format(property_name,
                                              getattr(rdata, property_name)))

この場合の出力はこのような感じ:

Origin: example.com.
@: IN SOA 3600
    mname: ns1
    minimum: 3600
    serial: 2014060800
    rname: root
    expire: 3600000
    refresh: 28800
    retry: 14400
@: IN NS 3600
    target: ns1
svr1: IN A 3600
    address: 192.168.1.101
svr2: IN A 3600
    address: 192.168.1.102
svr3: IN A 3600
    address: 192.168.2.201
wiki: IN CNAME 3600
    target: svr3
ns1: IN A 3600
    address: 192.168.1.2
gateway: IN A 3600
    address: 192.168.1.1
gateway: IN AAAA 3600
    address: 2001:db8::1

取得したいレコードのドメイン名が既知である場合,find_node()/find_rdataset()を使う.似た名前のメソッドfind_rrset()があるが,こちらは返ってくるオブジェクトにドメイン名の情報が含まれているかの違いがある.

node_gateway = zone_example.find_node("gateway")
rdataset_gateway_a = zone_example.find_rdataset("gateway", "A")
rrset_gateway_a = zone_example.find_rrset("gateway", "A")

print(node_gateway)
print(node_gateway.rdatasets)
print(rdataset_gateway_a.__repr__(),
      rdataset_gateway_a,
      hasattr(rdataset_gateway_a, "name"))
print(rrset_gateway_a.__repr__(),
      rrset_gateway_a,
      hasattr(rrset_gateway_a, "name"))

この場合の出力はこうなる:

<DNS node 140259498981032>
[<DNS IN A rdataset>, <DNS IN AAAA rdataset>]
<DNS IN A rdataset> 3600 IN A 192.168.1.1 False
<DNS gateway IN A RRset> gateway 3600 IN A 192.168.1.1 True

指定した名前のレコードが存在しない場合,KeyErrorが投げられる.Noneを返してほしい場合,get_node()/get_rdataset()/get_rrset()を使う. 読み込んだzone fileはレコードの新規追加や編集・削除もできる.このあたりは参考元のページにサンプルコード付きでかなり詳しく書かれているので割愛.

Zone fileの編集の半自動化に使ったり,テストのようなものを書くのに使えそう.

参考

ISC DHCP serverでIPアドレスのリース時に外部コマンドを実行する

ISC DHCP serverDHCPサーバを運用するとき,IPアドレスをリースするタイミングで任意のコマンドを実行したいことがある.例えば,クライアントのMACアドレスとリースしたIPアドレスのペアをデータベースに入れておきたい,といった場合.

こういった場合,on commit/on release/on expiryが使える.dhcpd.confに次のような設定を書いておくと,それぞれのタイミングで外部のコマンドが実行される.

on commit {
    execute("/path/to/script", "arg1", "arg2", "arg3");
}

on release {
    execute("/path/to/script", "arg1", "arg2", "arg3");
}

on expiry {
    execute("/path/to/script", "arg1", "arg2", "arg3");
}

各コマンドはそれぞれ次のタイミングで実行される:

  • on commit
    • サーバがクライアントにIPアドレスをリースしたタイミング
  • on release
    • クライアントがIPアドレスをリリースしたタイミング
  • on expiry
    • サーバがクライアントにリースしたIPアドレスのリース期限が切れたタイミング

実行する外部コマンドにIPアドレスMACアドレスを渡す

クライアントのMACアドレスおよびIPアドレスは,それぞれhardware変数およびleased-address変数に格納されている.ただしこれらの変数の値は整数で表現されているので,通常そのままでは扱い辛い.そこで文字列に変換した上で外部コマンドに渡す.

on commit {
    set clip = binary-to-ascii(10, 8, ".", leased-address);
    set clhw = concat (
        suffix (concat ("0", binary-to-ascii (16, 8, "", substring(hardware,1,1))),2), ":",
        suffix (concat ("0", binary-to-ascii (16, 8, "", substring(hardware,2,1))),2), ":",
        suffix (concat ("0", binary-to-ascii (16, 8, "", substring(hardware,3,1))),2), ":",
        suffix (concat ("0", binary-to-ascii (16, 8, "", substring(hardware,4,1))),2), ":",
        suffix (concat ("0", binary-to-ascii (16, 8, "", substring(hardware,5,1))),2), ":",
        suffix (concat ("0", binary-to-ascii (16, 8, "", substring(hardware,6,1))),2)
    );
    execute("/path/to/script", clip, clhw);
}

たまにMACアドレスの取得方法として次のような方法を紹介している記事がある.この方法だと"02:01:23:0a:bc:de"が"2:1:23:a:bc:de"と変換されてしまう.

set clhw = binary-to-ascii(16, 8, ":", substring(hardware, 1, 6));

外部コマンド側の実装

on commit時にIPアドレスMACアドレスのペアをRedisに突っ込み,on expiryで対応するエントリを削除するスクリプトだと,下のような感じになる.

#!/usr/bin/env python
"""Manage bindings between an IP address and a MAC address"""

import redis

import sys
import argparse

kvs = redis.Redis(host="127.0.0.1", port=6379, db=1)


def on_commit(ip_address, mac_address):
    """Insert a newly-leased IP address and its corresponding MAC address to the redis server"""
    kvs.set(ip_address, mac_address)

    return


def on_expiry(ip_address):
    """Remove the expired IP address from the redis server"""
    kvs.delete(ip_address)

    return


def main():
    """Parse command-line arguments and call a corresponding function"""
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers()

    subparser_commit = subparsers.add_parser("commit")
    subparser_commit.set_defaults(
        func=lambda command_args: on_commit(command_args.ip_address, command_args.mac_address))
    subparser_commit.add_argument("ip_address")
    subparser_commit.add_argument("mac_address")

    subparser_expiry = subparsers.add_parser("expiry")
    subparser_expiry.set_defaults(
        func=lambda command_args: on_expiry(command_args.ip_address))
    subparser_expiry.add_argument("ip_address")

    args = parser.parse_args()

    try:
        subcommand_func = args.func
    except AttributeError:
        parser.print_usage()
        sys.exit(2)

    subcommand_func(args)

if __name__ == '__main__':
    main()

以前,この仕組みを使ってCaptive Portalのようなものを書いて運用していた.WebページにアクセスしてきたクライアントのIPアドレスからMACアドレスが引けるようになるので,アカウント(人)とMACアドレス(デバイス)の紐付けが簡単に実現できる.

参考

過去に蓄積されたデータの集計にNorikraを使う

最近Norikraを触っていて,ある程度使い方が分かってきたのと,丁度v1.0.0もリリースされたので,忘れないうちにこのタイミングでメモしておく.

本来Norikraはリアルタイムなログ等に対して処理をするものであるけども,今回は過去に蓄積されたログに対して集計処理を行ってみることにする*1

今回のキーポイントは,

  • win:ext_timed_batchを使って,ログの発生時刻を指定する
  • LOOPBACKを使って,あるクエリで発生したeventを別のtargetに直接流し込む

の2点.

やりたいこと

下のような,2台のホストで1分毎に生成されたデータがあるとする.

# host1.csv
timestamp, metric1, metric2, ...
1396278000, 0, 0
1396278060, 1, 2
1396278120, 2, 4
1396278180, 3, 6
1396278240, 4, 8
1396278300, 5, 10
1396278360, 6, 12
1396278420, 7, 14
1396278480, 8, 16
1396278540, 9, 18
1396278600, 10, 20
# host2.csv
timestamp, metric1, metric2, ...
1396278000, 0, 0
1396278060, 2, 4
1396278120, 4, 8
1396278180, 6, 12
1396278240, 8, 16
1396278300, 10, 20
1396278360, 12, 24
1396278420, 14, 28
1396278480, 16, 32
1396278540, 18, 36
1396278600, 20, 40

これらのデータに対し,同一時刻に発生したメトリックどうしを加算したい.

timestamp, metric1_sum, metric2_sum, ...
1396278000, 0, 0
1396278060, 3, 6
1396278120, 6, 12
1396278180, 9, 18
1396278240, 12, 24
1396278300, 15, 30
1396278360, 18, 36
1396278420, 21, 42
1396278480, 24, 48
1396278540, 27, 54
1396278600, 30, 60

さらに,5分間の平均値と最大値を調べたい.

timestamp, metric1_avg, metric1_max
1396278000, 6.0, 12
1396278300, 21.0, 27

実際には,ホスト数もメトリックももっとたくさんある,という想定. こういった用途であれば,データベースに突っ込んで集計するなり,自前で集計スクリプトを書くなりすれば対応できるような気がする.ただ,集計対象のメトリックが頻繁に変更されるような状況で,かつデータベースや集計スクリプトのメンテナンスにあまり重きを置けない場合*2,そのあたりを簡単に扱えるものが欲しくなってくる.そこでNorikra,という流れ.

過去のログを扱う::ext_timed_batchでログの発生時刻を指定する

通常,ログがnorikraに入った時のシステムの実時間=ログの発生時刻として扱われる.それだと過去のデータを処理対象とする場合困るので,発生時刻を外部から与えてやる必要がある.そこで使うのがext_timed_batch

例えばこんな感じのクエリを登録する.

select
  min(timestamp) as timestamp,
  sum(metric1) as metric_sum
from
  host_data.win:ext_timed_batch(timestamp * 1000, 1 min, 1396278000000L)
where hostname in ("host1", "host2")

このクエリを登録した状態で,ターゲットhost_data

[{"timestamp": 1396278000, "hostname": "host1", "metric1": 0}]

というようなデータを送ると,2014-04-01 00:00(JST) (=unix epochで1396278000)に発生したログとして扱ってもらえる.ext_timed_batchの第3引数はtime windowの開始点を与える.設定しない場合,最初のイベントが発生した時刻を基準にbatchが実行されるようになる.外部から与えるtimestampのタイムゾーム周りがややこしい場合,明示的に指定しておいたほうが無難に思える.

注意点として,流し込むデータは時間順にソートされている必要がある.ext_timed_batchに限らず*_batchでは,あるtime windowの境界を跨ぐデータが到着した時点で,前のtime windowに対するeventが発行される.なので,データが時間順にソートされていないと,正しい結果が得られない.

複数のクエリで処理する::LOOPBACK()でeventを別のtargetに送る

あるクエリを実行した後,その結果得られたeventに対して更にクエリを投げたい場合,以前であればeventをfetchした上で対象targetに送り直す必要があった*3.Norikra v1.0.0からLOOPBACK()が導入され,この処理が自動化された.

使い方は簡単で,クエリ登録時のGroupをLOOPBACK(target名)としてやればよい. 例えば,上で紹介したクエリのGroupをLOOPBACK(metric_aggregated)とした上で,下のクエリを登録する.

select
  min(timestamp) as timestamp,
  avg(metric_sum) as metric_avg,
  max(metric_sum) as matric_max
from
  metric_aggregated.win:ext_timed_batch(timestamp * 1000, 10 min, 1396278000000L)

この状態でターゲットhost_dataにeventを送ると,最初のクエリを実行した上でその結果がターゲットmetric_aggregatedに送られ,2番目のクエリが実行される. あとはこのクエリのeventをfetchすれば,当初のお目当てのデータが得られる.

まとめ

今回は,過去に蓄積されたログデータの集計にNorikraを使ってみた.本来想定されている使い方ではない上に,ext_timed_batch使うのはオススメしないと@tagomorisさんが言っていたりするので,気付いていない落とし穴があるのかもしれない.

ただ,データ集計の条件が簡単に書ける&データストア類が必須でないという点は,コードを書いたりデータベースを運用することが日常的ではない場所で使う上で,意外とメリットになりそうな気はする.

サンプルコード

gist466004c500c6576c3644

*1:このスライドの表で言うところのschema-less dataに対するretrospection.

*2:具体的にはopsな現場

*3:Fluentdで自動化できるとはいえ,面倒臭い