FalconとFluentdを用いたEvent Webhookロガーの作り方

FalconとFluentdを用いたEvent Webhook ロガー模式図

この記事は Falcon, Fluentd, and Event Webhook の抄訳です。

FalconというWEBフレームワークをご存じですか?使いやすさとHTTP APIのみにフォーカスしたシンプルさが魅力のPythonのWEBフレームワークです。

今回は、このFalconとログ収集ツールであるFluentdを使い、SendGridからのEvent Webhookをロギングする方法を紹介したいと思います。

前提条件

事前に以下がインストールされていることが前提となります。

また、SendGridアカウントも取得しておいてください。

環境構築

まずは、Fluentdをインストールしましょう。FluentdのQuickStartにはいくつかの方法が記載されていますが、ここではruby gemを使います。

$ gem install fluentd --no-ri --no-rdoc

次にフォルダを作成し、その下にvirtualenvを用いてPython実行環境を作成します。

$ mkdir falcon_sample
$ cd falcon_sample
$ touch server.py
$ virtualenv venv
$ source venv/bin/activate
$ pip install cython falcon fluent-logger gunicorn
$ fluentd --setup ./fluent

最後のコマンドで、Fluentdを実行するための設定ファイルのひな形が作られます。生成されたfluent.confに以下の設定を追記してください。

<match events.log.**>
  type stdout
</match>

この設定により、受信したログイベントのうち、タグ文字列が”event.log”で始まる全てのイベントが標準出力(STDOUT)に出力されるようになります。出力先は複数指定することもできます。

以下のコマンドを実行して、Fluentdデーモンを起動してください。

$ fluentd -c ./fluent/fluent.conf -vv

server.py の実装

それでは、server.py にコード(主にFalconに関するコードになります)を書いていきましょう。

次の2行だけで最もシンプルなFalconのAPI Serverが作れます。

import falcon
app = falcon.API()

これだけでも動きますが「何もしない」サーバとなります。これをベースに、API Serverに「フック」と「Resource」を追加して実装を進めていきます。

フックの追加

Falconには「フック(Hook)」と呼ばれる概念があります。これは、HTTPリクエストを後述のResourceを用いて処理する「前」に実行される処理のことを指します。今回は、リクエストのコンテンツタイプがapplication/jsonかどうかを判定する、シンプルなフックを作ってみましょう。


# -*- coding: utf-8 -*-
import falcon

def check_content_type(req, resp, params):
    &amp;quot;&amp;quot;&amp;quot;リクエストが適切にエンコードされているかをチェックする&amp;quot;&amp;quot;&amp;quot;
    if 'application/json' not in req.content_type:
        # Falconでは便利な例外クラスがたくさん用意されています
        raise falcon.HTTPUnsupportedMediaType('データは application/jsonではありません', href='https://sendgrid.com/docs/API_Reference/Webhooks/event.html')

app = falcon.API(before=[check_content_type]) # “before”では、どのハンドラよりも前に実行するhookを指定できます

フックの実体は3つの引数(request, response, params)をとる関数です。この関数を、falcon.APIコンストラクタのbeforeパラメータに指定します。コンテンツタイプがapplication/jsonでない場合は、HTTPUnsupportedMediaType例外を発生させています。この例外が発生すると、Falconは応答コード415を呼び出し元に返します。

Resourceの追加

Resourceを作る前にまず、ログを送信するためのFluentdクライアントの初期化を行いましょう。以下のコードを追加してください。この設定でlocalhostのポート24224にログが送信されるようになります。

from fluent import sender as fluent_sender
from fluent import event as fluent_event

fluent_sender.setup('events.log', host='localhost', port=24224)

それでは、Event Webhookを処理するResourceクラスを作成しましょう。Resourceクラスは、受信したHTTPリクエストの処理を行うための、特別な名前のメソッドを持ったクラスです。「on_」という名前のメソッドで、各HTTP verbs(GET, POSTなど)に対応したリクエストの処理方法を規定します。今回は、HTTP POSTを処理するon_postメソッドを実装します。

class EventResource:
    def on_post(self, req, resp):
        &amp;quot;&amp;quot;&amp;quot;sendgridからのリクエストを処理する&amp;quot;&amp;quot;&amp;quot;
        payload = json.loads(req.stream.read().decode('utf-8')) # RequestsはJSONのペイロードを持つ
        for event in payload: # 複数のイベントが到来
            fluent_event.Event(event['event'], event) # イベントをそのtypeによりタグづけ
        resp.status = falcon.HTTP_204 # SendGridに204を返す

event_resource = EventResource()
app.add_route('/event', event_resource) # /eventへのアクセスをEventResourceにルーティング

on_postでは、POSTされたJSON文字列をデコードしてオブジェクトに変換したのち、含まれるイベントを一つ一つfluent_event.Event関数を用いてFluentdに送信しています。また、fluent.APIクラスのadd_route関数を使って、”/event”へのアクセスを今回作成したResourceクラスにルーティングするよう設定を行っています。

最終的なserver.pyの中身は以下のようになります。


# -*- coding: utf-8 -*-
import json
import falcon
import logging
from fluent import sender as fluent_sender
from fluent import event as fluent_event

fluent_sender.setup('events.log', host='localhost', port=24224)

def check_content_type(req, resp, params):
    &amp;quot;&amp;quot;&amp;quot;リクエストが適切にエンコードされているかをチェックする&amp;quot;&amp;quot;&amp;quot;
    if 'application/json' not in req.content_type:
        raise falcon.HTTPUnsupportedMediaType('データがapplication/json形式ではありません', href='https://sendgrid.com/docs/API_Reference/Webhooks/event.html')

class EventResource:
    def on_post(self, req, resp):
        &amp;quot;&amp;quot;&amp;quot;sendgridからのリクエストを処理する&amp;quot;&amp;quot;&amp;quot;
        payload = json.loads(req.stream.read().decode('utf-8')) # RequestsはJSONのペイロードを持つ
        for event in payload: # 複数のイベントが到来
            fluent_event.Event(event['event'], event) # イベントをそのtypeによりタグづけ
        resp.status = falcon.HTTP_204 # SendGridに204を返す

app = falcon.API(before=[check_content_type])
event_resource = EventResource()
app.add_route('/event', event_resource) # /eventへのアクセスをEventResourceにルーティング

実行例

それでは実行してみましょう。以下のコマンドでserver.pyを起動します。

$ gunicorn server:app
[2015-05-26 20:17:17 +0000] [2365] [INFO] Starting gunicorn 19.3.0
[2015-05-26 20:17:17 +0000] [2365] [INFO] Listening at: http://127.0.0.1:8000 (2365)
[2015-05-26 20:17:17 +0000] [2365] [INFO] Using worker: sync
[2015-05-26 20:17:17 +0000] [2370] [INFO] Booting worker with pid: 2370
[2015-05-26 20:17:56 +0000] [2365] [INFO] Handling signal: winch

SendGridのEvent Webhookをローカル環境に転送するために、ngrokを使ってトンネルを作ります。gunicornで起動したサーバはポート8000で待ち受けを行いますので、localhost:8000へトンネルさせます。

$ ngrok http 8000
Tunnel Status                 online
Version                       2.0.17/2.0.17
Web Interface                 http://127.0.0.1:4040
Forwarding                    http://cf8ab791.ngrok.io -&amp;gt; localhost:8000
Forwarding                    https://cf8ab791.ngrok.io -&amp;gt; localhost:8000

表示されたngrok URLに”/event” を付加したURLをEvent Webhookの送信先とします。Notification AppのPOST先URLに設定してください。

Webhook送信テスト画面

Event Webhookをテスト実行すると、Fluentdの標準出力にEventの内容が表示されます。

2015-05-27 09:43:04 +0900 events.log.processed: {&amp;quot;category&amp;quot;:[&amp;quot;category1&amp;quot;,&amp;quot;category2&amp;quot;,&amp;quot;category3&amp;quot;],&amp;quot;smtp-id&amp;quot;:&amp;quot;&amp;lt;142d9f3f351.7618.254f56@sendgrid.com&amp;gt;&amp;quot;,&amp;quot;sg_event_id&amp;quot;:&amp;quot;VzcPxPv7SdWvUugt-xKymw&amp;quot;,&amp;quot;uid&amp;quot;:&amp;quot;123456&amp;quot;,&amp;quot;purchase&amp;quot;:&amp;quot;PO1452297845&amp;quot;,&amp;quot;timestamp&amp;quot;:1386636112,&amp;quot;email&amp;quot;:&amp;quot;john.doe@sendgrid.com&amp;quot;,&amp;quot;sg_message_id&amp;quot;:&amp;quot;142d9f3f351.7618.254f56.filter-147.22649.52A663508.0&amp;quot;,&amp;quot;id&amp;quot;:&amp;quot;001&amp;quot;,&amp;quot;event&amp;quot;:&amp;quot;processed&amp;quot;}
2015-05-27 09:43:04 +0900 events.log.dropped: {&amp;quot;category&amp;quot;:[&amp;quot;category1&amp;quot;,&amp;quot;category2&amp;quot;,&amp;quot;category3&amp;quot;],&amp;quot;smtp-id&amp;quot;:&amp;quot;&amp;lt;4FB29F5D.5080404@sendgrid.com&amp;gt;&amp;quot;,&amp;quot;uid&amp;quot;:&amp;quot;123456&amp;quot;,&amp;quot;purchase&amp;quot;:&amp;quot;PO1452297845&amp;quot;,&amp;quot;timestamp&amp;quot;:1386636115,&amp;quot;email&amp;quot;:&amp;quot;not an email address&amp;quot;,&amp;quot;reason&amp;quot;:&amp;quot;Invalid&amp;quot;,&amp;quot;id&amp;quot;:&amp;quot;001&amp;quot;,&amp;quot;event&amp;quot;:&amp;quot;dropped&amp;quot;}
… 略 ...

さいごに

いかがでしたか?今回はログを標準出力に出しただけですが、fluentdの設定次第でファイルやデータベースへの保存のほか、外部のモニタリングツールや検索ツールなどと連携させることも可能です。みなさんの工夫次第で多くの応用が考えられますので、是非お試しください!

参考記事