ZEALS TECH BLOG

チャットボットでネットにおもてなし革命を起こす、チャットコマース『Zeals』を開発する株式会社ZEALSのテックブログです。技術やエンジニア文化について情報を発信します。

行動ログをRailsからBigQueryに流す仕組みの導入

f:id:zeals-engineer:20190820120026p:plain

はじめに

こんにちは、分析基盤を担当している鍵本です。
本日は DB に保存されている行動ログを BigQuery に流すように修正 したお話をしようと思います。

背景

これまでチャットボットに流入させる「モーダルを開いた」とか「クリックした」といった行動ログは 直接DB に保存されていました。

レコード数が少ないサービス開始直後はそれでもよかったのですが、だんだんお客様の数が増えてくると、それに応じてモーダルにアクセスしてくださるユーザーさんの数も増え、DB への書き込み頻度が高まり、次第にアプリへの負荷が高くなってきました。
このままサービスを続けていくと、いずれDBへの書き込み処理が足枷になってしまうだろうということで、 このログを DB に書き込むのではなく、ログデータとして外部出力 することにしました。

非RDB化の方針

ZEALS では GKE でアプリケーションサーバを運用しているので、Rails アプリでは単に必要なデータを標準出力するだけとなります。
標準出力されたログは、同一クラスタ内で稼働している fluentd によって Stackdriver Logging に送られますので、それを Cloud Storage を経由させて BigQuery にロードするようにしました。

流れ図は以下の通りです。

f:id:zeals-engineer:20190819194127p:plain (ログ出力構成図)

Stackdriver Logging から直接 BigQuery にインポートすることはできますが、データとスキーマとの間の不一致によるインポートエラーが発生した場合に再取り込みが面倒なこと、リアルタイム性がそれほど必要ではなかったことから Cloud Storage 経由にしています。

なお Stackdriver Logging から Cloud Storage へのエクスポートは一時間に一回行われます。
したがって、BigQuery では約一時間遅れでデータの閲覧が可能となります。

Rails 側の設定

Rails 側では次の3つの対応をしました。

  • logger の再定義
  • 出力データ用の Struct の定義
  • 上記 logger による出力処理の追加

logger の再定義

Simma さんの GKE上RailsのアプリケーションログをStackdriver Loggingで運用する方法 で紹介されている Custom log を参考にさせていただきました。
修正した点は progname が定義されてなかった場合に rails_app を代入するという点のみです。

当初は Rails.logger を上書きして利用することを想定していたため、production.log が progname = rails_app で送られるようにして、行動ログと分離できるようにしたかったからです。

出力データ用の Struct の定義

ログ出力部分に勝手に項目を追加されないように、 出力データを構造体で縛る ことにしました。
以下のようなものを lib 以下に配置しています。

FooStruct = Struct.new(:foo, :bar, :baz) do
  def to_emptify
    to_h.map { |k, v| [k.to_s, v.to_s] }.to_h
  end
end

null を空文字にするためのメソッドを内部で定義しています。
ZEALS ではまだ ruby-2.5 系なので上ような書き方をしていますが、2.6 以降では前後の to_h が不要となります。

出力処理の追加

実際にDBに書き出している部分にログ出力を追加します。

JsonStdOutLogger.instance.logger.info(:foo_log) do
    FooStruct.new(
        foo_val,
        bar_val,
        baz_val
    ).to_emptify
end

上記のように Rails.logger を上書きするのではなく、JsonStdOutLogger.instance.logger を直接呼ぶようにしています。
なので「当初は…」なんて言い方を先ほどいたしました。

Rails.logger = JsonStdOutLogger.instance.logger

を記述して上書きしたところ ActiveSupport::TaggedLogger が継承されてしまいました。
その結果 ActiveSupport::TaggedLogger::Formatter#call が先に呼ばれ、その内部で Hash データが文字列化され想定通りの出力にならなかったため JsonStdOutLogger.instance.logger を直接呼ぶ方法を採用しました。

Stackdriver Logging のシンク設定

高度なフィルタに変更し、以下を設定します。

resource.type="container"
resource.labels.cluster_name="クラスタ名"
resource.labels.namespace_id="ネームスペース名"
logName="projects/プロジェクト名/logs/サービス名"
jsonPayload.progname="foo_log"

これによって抽出されたログに対してシンクの設定をします。

冒頭でご説明しましたように、シンクサービスには Cloud Storage を選択します。
同時に必要ならばバケットも作成しますが、その際の注意点は以下の二点です。

  • アクセス制御モデルとして オブジェクトレベルを選択 する
  • ストレージの場所はアプリケーション、BigQuery とで 全て統一 する

BigQuery でのテーブル作成

Stackdriver Logging のログには timestamp が存在するので、これをログが生成された日時として BigQuery にも取り込むことにしました。

また、この値を用いてテーブルパーティションすることにしました。
これは常に全件検索をしていると、有名な BigQuery で破産しました みたいなことになってしまうからです。

qiita.com

テーブルは以下のようなクエリで生成しています。

CREATE TABLE app_logs.foo_logs (
  foo int64,
  bar string,
  baz string,
  timestamp timestamp
) PARTITION BY DATE(timestamp);

Cloud Functions の設定

以下のような Python スクリプトを作成して、google.storage.object.finalize をトリガーとして実行されるようにデプロイします。

import os
import sys
import json

import gcsfs
from google.cloud import bigquery

MAX_REC_LENGTH = 1000
bigquery_client = bigquery.Client()

def bq_insert(bq_client, table, data):
    errors = bq_client.insert_rows(table, data)
    if errors != []:
        print("BQインポートエラー({})".format(errors))
        sys.exit(1)
    return len(data)

def gcs_placement_handler(data, context):
    gcs_fs = gcsfs.GCSFileSystem(project = os.environ['PROJECT_ID'])
    table_ref = bigquery_client.dataset(os.environ['DATASET_ID']) \
                               .table(os.environ['TABLE_ID'])
    table = bigquery_client.get_table(table_ref)

    total_length = 0
    rows_to_insert = []
    with gcs_fs.open('/'.join([data['bucket'], data['name']]), 'r') as fd:
        for line in fd:
            line_dict = json.loads(line.rstrip('\n'))
            payload = line_dict['jsonPayload']
            # foo が空の場合に NULL に変換する
            if payload['foo'] == '':
                payload['foo'] = None
            payload.update({'timestamp': line_dict['timestamp']})
            rows_to_insert.append(payload)

            # rows_to_insert のサイズが MAX_REC_LENGTH に達したら BigQuery にインサートする
            if len(rows_to_insert) >= MAX_REC_LENGTH:
                total_length += bq_insert(bigquery_client, table, rows_to_insert)
                rows_to_insert = []

    # バッファリングした残りを BigQuery にインサートする
    if len(rows_to_insert) > 0:
        total_length += bq_insert(bigquery_client, table, rows_to_insert)

    return 'Successfully insert {} records.'.format(total_length)

Rails から出力されたログが jsonPayload に入っており、BigQuery に取り込みたいものがjsonPayload の中身と timestamp の値なので、それらをマージしてインポートするのが上記スクリプトの役割です。

また整数値 foo が null となる可能性があるのですが、Rails 側から空文字で送るようにしているので、BigQuery にインポートできるよう null に設定する処理も入れています。
(ここは Rails から直接 null で出力しても良かったのかもしれません)

また一時間に一回とはいえ、レコード数が多かった場合にメモリが足りなくなる可能性もあるので、1000件ずつインポートするようにしています。

まとめ

今回は Rails アプリから行動ログを BigQuery に流し込む仕組みについてご紹介しました。
フルマネージドな環境を用いているので、とても簡単に導入できました。

まだ分析基盤と呼べるようなものを構築できたわけではありませんので、このような仕組みを一つ一つ広げていって、ログを収集し分析できる環境を引き続き整えていくつもりです。
ご清覧いただきありがとうございました。

hrmos.co