SQL クエリ (on Treasure Workflow) の単体テストを少ない手間で実現している話


Loading...


Integrated Data Service 部の中野 (takamoto) です。現在は主にデータプラットフォーム上のデータパイプラインの開発・運用をメインで担当しています。

私の所属しているチームでは Snowflake を中心に構築されたデータパイプラインを主に管理していますが、 Treasure Data CDP を中心としたものについても管理しています。この Treasure Data CDP を中心としたデータパイプラインは規模がそれほど大きくはないこともあり、現在は素朴に Treasure Workflow から SQL (Presto) クエリを実行する形で構築されています。

今回は、こちらのデータパイプラインで行っている ELT 処理のロジックに対して、どのような構成で単体テストを行っているかについて紹介します。

前置き: ELT 処理の品質担保について

ELT 処理によって生成されるデータは様々な判断に利用されることから、データの品質管理は重要です。また、データの品質にも複数の観点がありますが、その中でも基本的な観点の1つにデータ加工処理のロジックそのものに誤りが無いか? ロジック変更時にデグレーションが発生していないか? といったものがあるかと思います。

ロジックの誤りは、ユニーク制約チェックのような加工後のデータに対するチェックだけでは検知ができないケースも多く、検知にはクエリそのものを対象としたロジックテストを行うことが必要になってきます。

近年 ELT 処理の文脈でデファクトスタンダードとなっている dbt を利用している場合は、 dbt-unit-testing などを導入することで独自の作り込みの必要なく単体テストが実現できますし、今後リリースされる dbt v1.8 では標準で単体テストを行う機能が追加される見込みとなっています*1。ですが、弊チームのような素朴に Treasure Workflow と SQL クエリを組み合わせて ELT 処理を行っているケースでは、単体テストを実現するためには何かしらの別の仕組みを用意する必要が出てきます。

弊チームでの単体テスト構成

上述の背景から、弊チームでは極力追加の仕組みを導入することなく単体テストが行えるような構成を用意しています。

大まかな構成としては以下のようなものとなっています。なお、 Presto の API を利用して、テストケースごとに以下のような処理を行う仕組みを用意する。 の部分は、弊チームでは Python を主に利用しているため Python で実装しています。

  • SQL クエリ中の入力テーブル名は変数の形で記述し、実際のテーブル名は dig ファイル側で記述する。
  • Presto の API を利用して、テストケースごとに以下のような処理を行う仕組みを用意する。
    1. (衝突しないように名前の末尾にランダム文字列が付与された)一時的なテーブルを作成する。
    2. 一時的なテーブルを入力テーブルとするように、 SQL クエリの変数部分を置換する。
    3. テストケースの内容に合わせて、一時的なテーブルにレコードを追加する。
    4. 変数置換後の SQL クエリを実行し、クエリ結果が意図したものかチェックする。
    5. 一時的に作成したテーブルを削除する。

こちらの具体的な実装イメージについて、以下で軽く解説していきます。

実装イメージ: テスト対象(SQL クエリ + dig ファイル)部分

上述のように、入力テーブルは SQL クエリ側では変数の形で記述し、 dig ファイル側で実際のテーブル名を指定する形を取ります (以下の例では、入力テーブル database_name.input_table_name のカラム col1, col2 を連結したカラム col3 を返すクエリを記載しています)。

SQL クエリ query.sql :

SELECT concat(col1, col2) AS col3
FROM ${input.table}

dig ファイル:

_export:
  input:
    table: database_name.input_table_name

elt:
  td>: query.sql
  create_table: database_name.output_table_name

実装イメージ: Presto API のラッパー部分

テストごとに直接 Presto の API を叩く処理を記述するのは面倒なので、以下のようなラッパークラス TdPrestoClient を用意しています。なお、 Presto のクライアントライブラリとしては trino-python-client を利用しています。

import dataclasses
import os
import random
import string
from abc import abstractmethod
from typing import TypeVar

import sqlalchemy
import sqlalchemy.exc
from sqlalchemy.engine import create_engine
from sqlalchemy.sql import text

class PrestoUserError(RuntimeError):
    pass

@dataclasses.dataclass(frozen=True)
class PrestoTableFqn:
    schema_name: str
    table_name: str

    def __str__(self) -> str:
        return f"{self.schema_name}.{self.table_name}"

OutputSchema = TypeVar("OutputSchema")

class PrestoClient:
    @property
    @abstractmethod
    def engine(self) -> sqlalchemy.Engine:
        return NotImplemented

    @abstractmethod
    def generate_table_fqn(self, id: str) -> PrestoTableFqn:
        return NotImplemented

    def __init__(self) -> None:
        self._tables: dict[str, tuple[PrestoTableFqn, sqlalchemy.Table]] = {}

    # 渡されたクエリを実行し、結果を受け取るためのメソッド
    def execute(self, output_schema: type[OutputSchema], query: str) -> list[OutputSchema]:
        with self.engine.connect() as connection:
            rows = connection.execute(text(query))
            return [output_schema(**row._asdict()) for row in rows]

    # 一時的な入力テーブルを作成するためのメソッド
    def register_table(
        self,
        column_definitions: list[str],
        id: str,
        records: list[dict[str, str]] | None = None
    ) -> None:
        table_fqn = self.generate_table_fqn(id)
        table = sqlalchemy.Table(
            table_fqn.table_name,
            sqlalchemy.MetaData(),
            schema=table_fqn.schema_name
        )
        self._tables[id] = (table_fqn, table)
        with self.engine.connect() as connection:
            connection.execute(text(
                f"CREATE TABLE {table_fqn} ({', '.join(column_definitions)})"
            ))
            if records:
                columns = records[0].keys()
                assert all([record.keys() == columns for record in records])
                values = [
                    f"({', '.join([record[column] for column in columns])})"
                    for record in records
                ]
                # 単体テスト時のレコード数は知れているので、素朴に INSERT INTO する。
                connection.execute(text(
                    f"INSERT INTO {table_fqn} ({', '.join(columns)}) VALUES {', '.join(values)}"
                ))

    # 作成した一時的な入力テーブルをすべて削除するためのメソッド
    def unregister_all_tables(self) -> None:
        for table in self._tables.values():
            table[1].drop(self.engine)
        self._tables = {}

    @property
    def tables(self) -> dict[str, PrestoTableFqn]:
        return {id: table_fqn for id, (table_fqn, _) in self._tables.items()}

# 環境変数 TD_API_KEY に接続情報が記載されていることを仮定している
class TdPrestoClient(PrestoClient):
    # 環境変数 TD_DB_NAME が指定されていない場合に利用されるデータベースの名前
    DEFAULT_TD_DB_NAME = "database_name"

    def __init__(self) -> None:
        super().__init__()
        self._engine = create_engine(
            "trino://api-presto.treasuredata.com/td-presto",
            connect_args={
                "user": os.environ.get("TD_API_KEY"),
                "port": 443,
                "http_scheme": "https"
            },
        )
        self.random_suffix = "".join(
            random.choices(string.ascii_lowercase + string.digits, k=15)
        )
        self.td_db_name = os.environ.get("TD_DB_NAME", self.DEFAULT_TD_DB_NAME)

    @property
    def engine(self) -> sqlalchemy.Engine:
        return self._engine

    # 一時的な入力テーブルの名前を返すメソッド
    def generate_table_fqn(self, id: str) -> PrestoTableFqn:
        return PrestoTableFqn(
            schema_name=self.td_db_name,
            table_name=f"{id}_{self.random_suffix}"
        )

また、テストケースの実行ごとに一時的なテーブルは自動的に削除されると便利なので、以下のようなテスト用のベースクラス TdPrestoTestBase を用意する一手間を加えています。なお、テスト用ライブラリには pytest を利用しています。

from types import TracebackType
from typing import Generator

import pytest
from contextlib import AbstractContextManager

class PrestoClientContextManager(AbstractContextManager[PrestoClient]):
    def __init__(self, client: PrestoClient) -> None:
        self.client = client

    def __enter__(self) -> PrestoClient:
        return self.client

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        self.client.unregister_all_tables()


class TdPrestoTestBase:
    @pytest.fixture(autouse=True, scope="function")
    def fixture_client(self) -> Generator[None, None, None]:
        with PrestoClientContextManager(TdPrestoClient()) as client:
            self.client: PrestoClient = client
            yield None

実装イメージ: SQL クエリ内の変数置換処理部分

Python 標準ライブラリの string.Template は digdag による変数置換程度であれば実現できるので、 こちらを継承した以下のようなクラス SqlTemplate を用意しています。

import re
import string

# NOTE: 本来は JavaScript 形式で関数の利用や演算を行う事が可能だが、そこまでは対応していない。
#       あくまで ${x.y} のような変数の単純な埋め込みのみに互換のある仕様にしている。
class SqlTemplate(string.Template):
    pattern = rf"""
    {re.escape('${')}(?:
        (?P<escaped>{re.escape('${')})  |
        (?P<named>(?a:[_a-z][_.a-z0-9]*)) |
        (?P<braced>(?a:[_a-z][_.a-z0-9]*)) |
        (?P<invalid>)
    ){re.escape('}')}
    """

実装イメージ: テスト本体

ここまでで用意したクラス群を利用して、以下のようにテストケースを記述します(今回の例では、 SQL クエリファイル query.sql で行っている文字列結合処理が、意図した結果になるかを確認するテストを記述しています)。

import dataclasses

# 入力テーブルのスキーマ情報
InputColumnSpec = ["col1 varchar", "col2 varchar"]

# 出力レコードに対応するデータクラス
@dataclasses.dataclass
class OutputRecord:
    col3: str

class TestLogicOnTd(TdPrestoTestBase):
    # 一時的な入力テーブルの作成〜クエリの実行 は定型処理なので、メソッドにまとめている。
    def __execute(self, input_records: list[dict[str, str]]) -> list[OutputRecord]:
        def query(tables: dict[str, PrestoTableFqn]) -> str:
            with open("query.sql") as f:
                 template = SqlTemplate(f.read())
            return template.substitute(
                {f"input.{id}": str(table_fqn) for id, table_fqn in tables.items()}
            )

        self.client.register_table(InputColumnSpec, "table", input_records)
        actual = self.client.execute(OutputRecord, query(self.client.tables))
        return actual

    # テストケース本体
    def test_case1(self) -> None:
        input_records = [
            {"col1": "'dummy'", "col2": "'text'"}
        ]
        actual = self.__execute(input_records=input_records)
        assert [x.col3 for x in actual] == ["dummytext"]

このように実装することで、あまり複雑な仕組みを用意することなく、テストケース本体は本質的な記述のみで済む形で単体テストを実現することができます。

まとめ

以前は dbt と Treasure Workflow を組み合わせた事例はほとんど見当たりませんでしたが、半年ほど前に Orchestrate dbt with Treasure Workflow といった記事が公開されたりと徐々に知見がたまりつつあります。なので、単体テスト以外も行いたい場合*2は dbt のエコシステムに載せ替える選択肢も考慮すべきかと思いますが、今回紹介した構成にも以下のような特徴があり、これは小規模な環境において依然として大きなメリットであると感じています。

  • 既存のワークフローにあまり手を入れずに実現できる点
  • 本番構成のシンプルさを維持できる点

この記事が Treasure Workflow を利用してパイプラインを構築されている方の参考になれば幸いです。

*1:https://docs.getdbt.com/docs/build/unit-tests

*2:今回は取り上げませんでしたが、例えば弊チームでは単体テストに加えて「Treasure Data 上ではなくローカルの Presto サーバ上でテストを実行する仕組み」や「テスト結果のドキュメントを自動生成する仕組み」等も用意しています。