大量イベントを記録するWeb APIの設計から実装まで


Loading...

KADOKAWA Connected / ドワンゴの @saka1です。

私の部署ではデータ分析に関する様々なシステムを開発していますが、最近とあるデータ入力APIシステムの開発を行いました。記事執筆時点ではそのシステムへの移行が段階的に行われている最中です。

行ったことはWeb APIを提供するシステムを開発しただけではあるのですが、

  • データ基盤を保守するチームが直接Web APIを開発するのは珍しいかもしれないと思いました
  • 与えられた技術的制約から、シンプルなシステム構成を少し曲げて、いくつか追加の工夫をしなければなりませんでした

この辺りが面白いかなと思って記事を書きました。以降では、データ組織が開発するWeb APIの事例として、今回取り組んだこと・悩んだこと・学んだことを紹介していきます。

全体の考え方から個別の実装まで、様々なトピックに触れるのでやや長い記事になっています。暇な時にでもお読みください。

※ なお、諸般の事情で具体的な値を曖昧にしたまま解説している場所がありますのでご了承ください。

データ入力のためのWeb API

データのないデータ基盤はただの箱です。データ基盤を活用するには、データソースからデータ基盤へデータを送る仕組みも必要になります。一般に、データ基盤にデータを送るためのAPIのことをデータ入力API(Data Ingestion API)などと呼ぶこともあるようです。

今回取り上げるのは、iOSアプリやWebブラウザなどのクライアント上で発生したイベントをサーバサイドで受け取り記録するAPIです。Web APIとして見ると非常に単純で、特定形式のJSONをbodyにとるHTTP POSTをひたすら受け付けるだけの仕組みです1

このAPIは古くから存在しており長年利用されてきたのですが、中身のシステムの老朽化が進んでいました。そこで、API仕様を保ったまま後継システムをAWS上に構築しようとなったのでした。

シンプルなWeb APIの開発は簡単でしょ?

Web API開発は世間的にはたくさんノウハウがあるので、技術的には簡単……と言いたいところですが、新システムの開発にはいくつか考慮が必要な条件・制約がありました。

  • リクエスト数が膨大
    • 月間で数十億リクエストを処理しなければならなさそうでした
    • 下手なWebサービスよりもアクセス量が多そうです
  • 低コストが強く求められていた
    • 予算上の都合等で、どうしてもシステムのランニングコストを低く抑える必要がりました
    • もちろん低コストが良いのは当たり前の話ではあるのですが、このプロジェクトでは特に費用面に気を使ったという意味です

要するに、単純ながら大量のアクセスをさばくAPIをできるだけ安く開発するのが重要課題になっていました。この要件は、現在も頭を悩ませている問題ではある一方、様々な設計上の意思決定において指針になりました。

制約に合う技術スタックの検討

システム構成

AWS上でWeb APIを開発する場合、様々な構成パターンがあります。今回はシンプルなALB + ECSのパターンを採用することにしました。HTTPでのアクセスを受け付けるサーバアプリを書き、受け取ったイベントメッセージをfluent-bit経由でAmazon Kinesis Data Streams(これは既に構成済みのものがあったので繋ぎこみました)に送信する構成です。

イベント入力APIのシステム構成(図では省略していますが3AZ構成を採用しています)

他の選択肢としてサーバレス(API Gateway + Lambda等)にする事も少しだけ検討しましたが、リクエスト数での課金は今回の用途と相性が悪く、大幅にコストがかかりそうなため選択肢から外しました。

移行元システムには、大量アクセスからの保護機構として、単一IPアドレスからの大量アクセスに対して429 Too Many Requestsを返す機能がありました。それをマネージドサービスによって移植しようと、AWS WAFを噛ませています。ここは実は費用面での問題が発生することになり厄介でした(後述)。

プログラミング言語の選定

開発にはGo言語を採用することにしました。部署でよく使われているのはPython、Scalaですが、ここでは新言語を導入するという選択をしました。

  • Pythonは性能面でやや懸念がありました
    • もっとも、FastAPI等のモダンなフレームワーク上でちゃんと書けば何も問題なかった可能性はあります
  • JVMは起動が重く、JITがかかるまで性能が不安定になるなど、特にECSタスクのオートスケーリングと組み合わせた時の扱いの難しさを懸念しました
    • もちろん暖気の導入(例: ZOZOMATの事例)やGraalVMによるAOTコンパイルなど対応策はありそうですが、単純なAPIの開発に対してシステムが過度に複雑になりそうな印象を持ちました

開発メンバーが「Goを書いてみたい」と言っていたのも重要な判断材料でした。技術的なチャレンジは常に少しずつ取り入れたいものですが、今回のような小規模なシステムの方が技術スタックを操作しやすく、リスクを取りやすいからです。

数ヶ月ほどGoを学習しながら書いてみた感想ですが、Goを選択したのは悪くなかったと感じています。基本的にはとても素直で良い開発体験ができました。ただし、負荷試験で問題が起きた際に解決に手間取るなど、経験値不足が露呈した局面もありました(後述)。

発生した課題とその解決

特に何も問題なく完成すればよかったのですが、いざ開発を進めてみるといくつかの問題が出てきたので、それらについて対応を迫られました。ここからは、特に対処が大変だった箇所について触れていきます。

AWS WAFが高価すぎる問題

AWS WAFの料金体系には、リクエスト数に比例する部分が存在します。記事執筆時点で東京リージョンにおける料金はリクエスト 0.60USD 100 万リクエストあたりとなっています。

aws.amazon.com

今回のAPIはアクセス数が大量であるため、WAFに必要な料金が高額になってしまう問題が予想されました。

回避方法をあれこれ検討はしたのですが、結論としては、自前でアプリケーション上にIP単位のアクセス数を数えレート制限する機能を実装することにしました。このアプローチを用いるとWAFを使用せずに済みますが欠点も多く、かなり悩ましく感じています。

  • AWS WAFを用いた設計は、AWS的には「自然」なので、そこから離れるのはベストプラクティスを外している感がある
  • AWS WAFであれば、アプリケーションサーバの外側からの保護が受けられるので例えばDDoS等のリスクは下げられるが、大量アクセスをアプリケーションで受ける構成はシステムに高負荷がかかる可能性がある
  • IP単位のアクセス数カウントと簡単に言うが、実装はそれほど自明ではない

しかし、前述のようにコスト削減は重要なので、この部分はソフトウェアの重要な価値の一部とみなして、頑張って実装することにしました。

ちなみに、AWSのSolution Architectの方からは、AWS Shied Advancedを適用する案も提示いただきました。そうできるとAWS WAFのリクエスト数課金は発生しない上にShield Advancedの様々な恩恵を受けられそうでした。ただ、今回利用するAWSアカウントの契約の都合上、この案は採用しませんでした。

IPアドレス単位のアクセス数カウント

カウントのやり方として最初に思いつくのが、tollboothなどのOSSミドルウェアを入れてしまうことです。 ただ、我々が望むようなアクセス制限が実現できるかや、メモリ消費量なども含めて期待したコントロールができるかは、あまり自信を持てませんでした。それに、どのOSSが良いものかは経験値が不足していると目利きが難しいです。

Goの良いところは、標準ライブラリだけで大抵のことができ依存ライブラリの保守コスト2を下げられる点もあると感じていたので、外部ライブラリを入れすぎるとその良さが削がれる懸念もありました(とはいえzapなど有用なものは採用しています)。

しばらく悩みましたが、今回は自前で実装してしまうことにしました。アクセス数のカウントはよく研究されている分野なので、その世界の知見をそのまま使います。

christina04.hatenablog.com

dev.to

求められてるのは一定時間内にアクセス数が多いIPアドレスを検出することです。採用したSliding Window(Sliding Window Counters)アルゴリズムは、現在の区間とその直前の区間において出現数をカウントし、現在時刻に応じて重み付き平均をとる手法です。

実装例としてはこんな感じでしょうか。新しく来た値でカウントを更新するUpdateと、それが区間内に何回出現したかを返すCountをインタフェースとして持つデータ構造です。区間内でのカウントについてはHitCounterという別のデータ構造に移譲しています。

// イベント列からの出現数を数える機能を持つもの、を表す。
// - Updateは新しく来たものを引数にとり状態を更新する
// - Countは数えたいものを引数にとり、その数を返す
type HitCounter interface {
    Update([]byte)
    Count([]byte) uint32
}

type SlidingWindowCounter struct {
    prev                 HitCounter
    current              HitCounter
    currentWindowStartAt time.Time
    windowInterval       time.Duration
    now                  func() time.Time
}

func NewSlidingWindowCounter(windowInterval time.Duration) *SlidingWindowCounter {
    return &SlidingWindowCounter{
        prev:                 NewHitCounter(),
        current:              NewHitCounter(),
        currentWindowStartAt: time.Now(),
        windowInterval:       windowInterval,
        now:                  time.Now,
    }
}

func (swc *SlidingWindowCounter) rorateWindowIfNeeded() {
    // 現在時刻がcurrentのウインドウ開始時点と大きくズレている場合は、currentとprevに意味がないので捨てて作り直す
    if diff := swc.now().Sub(swc.currentWindowStartAt); diff > swc.windowInterval*2 {
        swc.current = NewHitCounter()
        swc.prev = NewHitCounter()
        swc.currentWindowStartAt = swc.now()
        return
    }
    // 現在時刻が現在のウインドウを超えているなら、次のウインドウが必要なのでローテートする
    if swc.currentWindowStartAt.Add(swc.windowInterval).Before(swc.now()) {
        swc.prev = swc.current
        swc.current = NewHitCounter()
        swc.currentWindowStartAt = swc.currentWindowStartAt.Add(swc.windowInterval)
    }
}

func (swc *SlidingWindowCounter) Update(input []byte) {
    swc.rorateWindowIfNeeded()
    swc.current.Update(input)
}

func (swc *SlidingWindowCounter) Count(input []byte) uint32 {
    swc.rorateWindowIfNeeded()
    currentWeight := float64(swc.now().UnixMilli()-swc.currentWindowStartAt.UnixMilli()) / float64(swc.windowInterval.Milliseconds())
    prevWeight := 1.0 - currentWeight
    currentCount, prevCount := float64(swc.current.Count(input)), float64(swc.prev.Count(input))
    return uint32(math.Round(currentCount*currentWeight + prevCount*prevWeight))
}

各区間において、IPアドレス単位で出現数を数えるには、HitCounterの実装としてカウンタを用意する必要がありそうです……が、ここでmapを使うとメモリ消費量の予測が難しくなってしまいます。具体的な設定値を元に試算すると、まず問題なさそうではあったものの、一抹の不安がありました3

そもそも、おおむね正確な値が知りたいのは、大量アクセスをしてくるごく一部のIPアドレスだけです。なぜなら大部分のIPアドレスからのアクセスは制限に引っかかることはないはずだからです。その点を踏まえて一種の近似カウントを行うアルゴリズムもよく知られています。

tech.plaid.co.jp

今回はCount-Min Sketch(CMS)アルゴリズムを実装しました。出現数の近似カウントを行うアルゴリズムの1つです。ナイーブな実装はとても簡単なのと、許容する誤差とその確率を固定したときメモリ消費量が(=内部的に使う配列のサイズが)予め固定される特徴があります。

これが実装例です。

type CMSketchHitCounter [][]uint32

func NewCMSketchHitCounter(epsilon float64, delta float64) CMSketchHitCounter {
    if !((0 < epsilon && epsilon < 1) && (0 < delta && delta < 1)) {
        panic("invalid argument to create CMSketchHitCounter")
    }
    w := int(math.Ceil(math.E / epsilon))
    d := int(math.Ceil(math.Log(1.0 / delta)))
    sketch := make([][]uint32, d)
    for i := 0; i < d; i++ {
        sketch[i] = make([]uint32, w)
    }
    return sketch
}

func calcKHashValues(input []byte, k int) (values []uint32) {
    buff := make([]byte, k*4)
    h := sha3.NewShake256()
    h.Write(input)
    h.Read(buff)
    values = make([]uint32, k)
    for i := 0; i < k; i++ {
        values[i] = binary.LittleEndian.Uint32(buff[4*i : 4*i+4])
    }
    return
}

func (sketch CMSketchHitCounter) Update(input []byte) {
    w := uint32(len(sketch[0]))
    h := calcKHashValues(input, len(sketch))
    for i := range sketch {
        // オーバーフローはまずないと思うが念のため飽和カウントにしておく
        if sketch[i][h[i]%w] < math.MaxUint32 {
            sketch[i][h[i]%w] += 1
        }
    }
}

func (sketch CMSketchHitCounter) Count(input []byte) uint32 {
    w := uint32(len(sketch[0]))
    h := calcKHashValues(input, len(sketch))
    var min uint32 = math.MaxUint32
    for i := range sketch {
        if v := sketch[i][h[i]%w]; v < min {
            min = v
        }
    }
    return min
}

CMSketchHitCounterHitCounterの実装になっています。これらの実装をアプリケーションに組み込むことで効率的なIPアドレス単位のアクセス数カウントを実現しました。

Count-Min Sketchの実装は簡単とは書いたが

少しだけ実装について補足しておきます。CMSについて「ナイーブな実装は簡単」なのは本当ですが、実際に実装してみると考慮すべき点が多く、検討はそれなりに大変でした。理論通りに現実が動かない、という表現が妥当なのかもしれないですが。

まず、Goの標準ライブラリにある普通のハッシュ関数(non-cryptographic hash function)はfnv-1/fnv-1aなのですが、品質があまり理想的ではないらしく、実装してテストしてみると理論通りのエラー率にならなかったです。

また、CMSの実装にはk個(予め指定されたとある個数)のハッシュ関数が必要になります。よく用いられるのはDouble Hashing(Kirsch-Mitzenmacher optimization)と呼ばれるテクニックで、2つのハッシュ関数から生成式 hi = h1 + i * h2 (i = 0, 1, ...)によって複数のハッシュ関数を作ります。世の中にはGoによる実装例などもたくさん転がっています。

例: GolangでBloomFilterを実装してみた - 逆さまにした

ところが、少し調べてみると

  • 論文には素数pで剰余するように記述されているが、pを具体的にどう選べばよいのか
  • 現実の実装だとpによる剰余が省略されている場合があるように見える
  • 実験的には理論ほど上手く動かない等の指摘がある

などなど、ちょっと簡単には追えないような込み入った議論がありそうでした。例えば下記のissueに長い解説があります。

github.com

Double Hashingは性能をギリギリまで詰めたい場合に使うものであって、単純に高品質なハッシュ値がk個得られるなら今回の用途ではそれでよいはずです。そこで、今回の実装では、準標準パッケージから入手可能という点を踏まえて、golang.org/x/crypto/sha3のSHAKE256を使うことにしました。要するに、可変長のハッシュ関数を使ってそこから切り出せばよいという発想をしました。

負荷試験で性能が出ない問題

アプリケーションは基本的には受け取ったJSONをKinesis Data Streamsに流し込むだけのものなのですが、いくつかの部分で別のAPIに問い合わせを行い、JSONを加工するなど追加の処理をかける機能がありました。

問い合わせは、標準ライブラリのnet/httpのHTTPクライアントを使ってHTTP 1.1通信するだけなので実装は簡単だったのですが、負荷試験をしてみると期待したほどの性能が出せずに苦戦しました。HTTPクライアントがボトルネックになっていました。

プロファイラを当てる等の調査をした結果、チューニングとして重要な点は以下の通りだと分かりました。

  • MaxIdleConnsPerHostを十分に大きくすること
    • この値はデフォルト値が2と、大量のHTTPリクエストを外部に飛ばす場合においては、小さすぎるようです
  • MaxConnsPerHostをある程度の有限な値すること
    • この値はデフォルトで0(無制限)になっています。瞬間的に大量のHTTPリクエストを試みる場合、Goroutineを内部的に大量生成するなどリソースを消費し、スループットが下がってしまうという現象が見られました
      • 負荷試験特有の問題であって現実のワークロードでは稀かもしれません

他の選択肢としてfasthttpの調査も行いました。性能的にはチューニング後のnet/httpをいくらか上回る性能を示したため、有力な選択肢でした。ただし、net/httpのチューニングに成功し目標性能を出すことができるようになったため、現段階では採用しませんでした。

まとめ

ごく一般的なWeb APIの開発であっても、データ分析用途のエンドポイントは流量が多い場合など、少し変わった要件が課せられている事があると思います。今回の事例では、システムに求められる要件を踏まえた工夫を、システム構成から具体的なコードに至るまで、様々な粒度での判断の積み上げとして説明してみました。

今回開発したシステムへの移行作業等は完了していません。現段階でできることはしたつもりですが、記事でも述べたようにややオーバーエンジニアリングと取れる箇所もあり、すべての箇所の判断に絶対の自信があるわけではありません。

行った設計判断のうち、どれぐらいが妥当だったかは今後の実績を見ていかなければならないと考えています。


  1. 本当はもう少しややこしい機能を持っているのですが、本筋から外れるため触れません。
  2. 依存ライブラリのアップデートがあるたびに右往左往した事のある開発者は多いんじゃないでしょうか。
  3. 特に昨今はコンテナ技術を使うので、よりコンパクトな実装を作れるならば、例えばより小さいfargateのインスタンスを使え、コストを下げることができるかもしれません。