Integrated Data Service 部の中野 (takamoto) です。現在は主にデータプラットフォーム上のデータパイプラインの開発・運用をメインで担当しています。
私の所属しているチームでは Snowflake を中心に構築されたデータパイプラインを主に管理していますが、 Treasure Data CDP を中心としたものについても管理しています。この Treasure Data CDP を中心としたデータパイプラインは規模がそれほど大きくはないこともあり、現在は素朴に Treasure Workflow から SQL (Presto) クエリを実行する形で構築されています。
今回は、こちらのデータパイプラインで行っている ELT 処理のロジックに対して、どのような構成で単体テストを行っているかについて紹介します。
前置き: ELT 処理の品質担保について
ELT 処理によって生成されるデータは様々な判断に利用されることから、データの品質管理は重要です。また、データの品質にも複数の観点がありますが、その中でも基本的な観点の1つにデータ加工処理のロジックそのものに誤りが無いか? ロジック変更時にデグレーションが発生していないか? といったものがあるかと思います。
ロジックの誤りは、ユニーク制約チェックのような加工後のデータに対するチェックだけでは検知ができないケースも多く、検知にはクエリそのものを対象としたロジックテストを行うことが必要になってきます。
近年 ELT 処理の文脈でデファクトスタンダードとなっている dbt を利用している場合は、 dbt-unit-testing などを導入することで独自の作り込みの必要なく単体テストが実現できますし、今後リリースされる dbt v1.8 では標準で単体テストを行う機能が追加される見込みとなっています*1。ですが、弊チームのような素朴に Treasure Workflow と SQL クエリを組み合わせて ELT 処理を行っているケースでは、単体テストを実現するためには何かしらの別の仕組みを用意する必要が出てきます。
弊チームでの単体テスト構成
上述の背景から、弊チームでは極力追加の仕組みを導入することなく単体テストが行えるような構成を用意しています。
大まかな構成としては以下のようなものとなっています。なお、 Presto の API を利用して、テストケースごとに以下のような処理を行う仕組みを用意する。
の部分は、弊チームでは Python を主に利用しているため Python で実装しています。
- SQL クエリ中の入力テーブル名は変数の形で記述し、実際のテーブル名は dig ファイル側で記述する。
- Presto の API を利用して、テストケースごとに以下のような処理を行う仕組みを用意する。
- (衝突しないように名前の末尾にランダム文字列が付与された)一時的なテーブルを作成する。
- 一時的なテーブルを入力テーブルとするように、 SQL クエリの変数部分を置換する。
- テストケースの内容に合わせて、一時的なテーブルにレコードを追加する。
- 変数置換後の SQL クエリを実行し、クエリ結果が意図したものかチェックする。
- 一時的に作成したテーブルを削除する。
こちらの具体的な実装イメージについて、以下で軽く解説していきます。
実装イメージ: テスト対象(SQL クエリ + dig ファイル)部分
上述のように、入力テーブルは SQL クエリ側では変数の形で記述し、 dig ファイル側で実際のテーブル名を指定する形を取ります
(以下の例では、入力テーブル database_name.input_table_name
のカラム col1
, col2
を連結したカラム col3
を返すクエリを記載しています)。
SQL クエリ query.sql
:
SELECT concat(col1, col2) AS col3 FROM ${input.table}
dig ファイル:
_export: input: table: database_name.input_table_name elt: td>: query.sql create_table: database_name.output_table_name
実装イメージ: Presto API のラッパー部分
テストごとに直接 Presto の API を叩く処理を記述するのは面倒なので、以下のようなラッパークラス TdPrestoClient
を用意しています。なお、 Presto のクライアントライブラリとしては trino-python-client を利用しています。
import dataclasses import os import random import string from abc import abstractmethod from typing import TypeVar import sqlalchemy import sqlalchemy.exc from sqlalchemy.engine import create_engine from sqlalchemy.sql import text class PrestoUserError(RuntimeError): pass @dataclasses.dataclass(frozen=True) class PrestoTableFqn: schema_name: str table_name: str def __str__(self) -> str: return f"{self.schema_name}.{self.table_name}" OutputSchema = TypeVar("OutputSchema") class PrestoClient: @property @abstractmethod def engine(self) -> sqlalchemy.Engine: return NotImplemented @abstractmethod def generate_table_fqn(self, id: str) -> PrestoTableFqn: return NotImplemented def __init__(self) -> None: self._tables: dict[str, tuple[PrestoTableFqn, sqlalchemy.Table]] = {} # 渡されたクエリを実行し、結果を受け取るためのメソッド def execute(self, output_schema: type[OutputSchema], query: str) -> list[OutputSchema]: with self.engine.connect() as connection: rows = connection.execute(text(query)) return [output_schema(**row._asdict()) for row in rows] # 一時的な入力テーブルを作成するためのメソッド def register_table( self, column_definitions: list[str], id: str, records: list[dict[str, str]] | None = None ) -> None: table_fqn = self.generate_table_fqn(id) table = sqlalchemy.Table( table_fqn.table_name, sqlalchemy.MetaData(), schema=table_fqn.schema_name ) self._tables[id] = (table_fqn, table) with self.engine.connect() as connection: connection.execute(text( f"CREATE TABLE {table_fqn} ({', '.join(column_definitions)})" )) if records: columns = records[0].keys() assert all([record.keys() == columns for record in records]) values = [ f"({', '.join([record[column] for column in columns])})" for record in records ] # 単体テスト時のレコード数は知れているので、素朴に INSERT INTO する。 connection.execute(text( f"INSERT INTO {table_fqn} ({', '.join(columns)}) VALUES {', '.join(values)}" )) # 作成した一時的な入力テーブルをすべて削除するためのメソッド def unregister_all_tables(self) -> None: for table in self._tables.values(): table[1].drop(self.engine) self._tables = {} @property def tables(self) -> dict[str, PrestoTableFqn]: return {id: table_fqn for id, (table_fqn, _) in self._tables.items()} # 環境変数 TD_API_KEY に接続情報が記載されていることを仮定している class TdPrestoClient(PrestoClient): # 環境変数 TD_DB_NAME が指定されていない場合に利用されるデータベースの名前 DEFAULT_TD_DB_NAME = "database_name" def __init__(self) -> None: super().__init__() self._engine = create_engine( "trino://api-presto.treasuredata.com/td-presto", connect_args={ "user": os.environ.get("TD_API_KEY"), "port": 443, "http_scheme": "https" }, ) self.random_suffix = "".join( random.choices(string.ascii_lowercase + string.digits, k=15) ) self.td_db_name = os.environ.get("TD_DB_NAME", self.DEFAULT_TD_DB_NAME) @property def engine(self) -> sqlalchemy.Engine: return self._engine # 一時的な入力テーブルの名前を返すメソッド def generate_table_fqn(self, id: str) -> PrestoTableFqn: return PrestoTableFqn( schema_name=self.td_db_name, table_name=f"{id}_{self.random_suffix}" )
また、テストケースの実行ごとに一時的なテーブルは自動的に削除されると便利なので、以下のようなテスト用のベースクラス TdPrestoTestBase
を用意する一手間を加えています。なお、テスト用ライブラリには pytest を利用しています。
from types import TracebackType from typing import Generator import pytest from contextlib import AbstractContextManager class PrestoClientContextManager(AbstractContextManager[PrestoClient]): def __init__(self, client: PrestoClient) -> None: self.client = client def __enter__(self) -> PrestoClient: return self.client def __exit__( self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None, ) -> None: self.client.unregister_all_tables() class TdPrestoTestBase: @pytest.fixture(autouse=True, scope="function") def fixture_client(self) -> Generator[None, None, None]: with PrestoClientContextManager(TdPrestoClient()) as client: self.client: PrestoClient = client yield None
実装イメージ: SQL クエリ内の変数置換処理部分
Python 標準ライブラリの string.Template
は digdag による変数置換程度であれば実現できるので、
こちらを継承した以下のようなクラス SqlTemplate
を用意しています。
import re import string # NOTE: 本来は JavaScript 形式で関数の利用や演算を行う事が可能だが、そこまでは対応していない。 # あくまで ${x.y} のような変数の単純な埋め込みのみに互換のある仕様にしている。 class SqlTemplate(string.Template): pattern = rf""" {re.escape('${')}(?: (?P<escaped>{re.escape('${')}) | (?P<named>(?a:[_a-z][_.a-z0-9]*)) | (?P<braced>(?a:[_a-z][_.a-z0-9]*)) | (?P<invalid>) ){re.escape('}')} """
実装イメージ: テスト本体
ここまでで用意したクラス群を利用して、以下のようにテストケースを記述します(今回の例では、 SQL クエリファイル query.sql
で行っている文字列結合処理が、意図した結果になるかを確認するテストを記述しています)。
import dataclasses # 入力テーブルのスキーマ情報 InputColumnSpec = ["col1 varchar", "col2 varchar"] # 出力レコードに対応するデータクラス @dataclasses.dataclass class OutputRecord: col3: str class TestLogicOnTd(TdPrestoTestBase): # 一時的な入力テーブルの作成〜クエリの実行 は定型処理なので、メソッドにまとめている。 def __execute(self, input_records: list[dict[str, str]]) -> list[OutputRecord]: def query(tables: dict[str, PrestoTableFqn]) -> str: with open("query.sql") as f: template = SqlTemplate(f.read()) return template.substitute( {f"input.{id}": str(table_fqn) for id, table_fqn in tables.items()} ) self.client.register_table(InputColumnSpec, "table", input_records) actual = self.client.execute(OutputRecord, query(self.client.tables)) return actual # テストケース本体 def test_case1(self) -> None: input_records = [ {"col1": "'dummy'", "col2": "'text'"} ] actual = self.__execute(input_records=input_records) assert [x.col3 for x in actual] == ["dummytext"]
このように実装することで、あまり複雑な仕組みを用意することなく、テストケース本体は本質的な記述のみで済む形で単体テストを実現することができます。
まとめ
以前は dbt と Treasure Workflow を組み合わせた事例はほとんど見当たりませんでしたが、半年ほど前に Orchestrate dbt with Treasure Workflow といった記事が公開されたりと徐々に知見がたまりつつあります。なので、単体テスト以外も行いたい場合*2は dbt のエコシステムに載せ替える選択肢も考慮すべきかと思いますが、今回紹介した構成にも以下のような特徴があり、これは小規模な環境において依然として大きなメリットであると感じています。
- 既存のワークフローにあまり手を入れずに実現できる点
- 本番構成のシンプルさを維持できる点
この記事が Treasure Workflow を利用してパイプラインを構築されている方の参考になれば幸いです。
*1:https://docs.getdbt.com/docs/build/unit-tests
*2:今回は取り上げませんでしたが、例えば弊チームでは単体テストに加えて「Treasure Data 上ではなくローカルの Presto サーバ上でテストを実行する仕組み」や「テスト結果のドキュメントを自動生成する仕組み」等も用意しています。