nazolabo

フリーランスのWebエンジニアが近況や思ったことを発信しています。

gcloud-aio-bigqueryでasync/awaitを使ってBigQueryのクエリを叩く

PythonからBigQueryを叩くには公式の google-cloud-bigquery があるのですが、asyncioに対応していません。

一部のライブラリではv2側でasyncioに対応しているのですが、BigQueryはv2でも対応していないように見えます。ちなみにv1はほとんどの実装でREST APIを叩いて、v2はほとんどの実装でRPC APIを叩いています(どちらも全部確認したわけではありません)。一部のライブラリでv2のほうを叩くとサーバーエラーが多発したこともあり、今のところなるべくv1でasyncioを使用したいです。

GCPPythonライブラリをasyncioで動くように書き換えたものが gcloud-aio で用意されており、その中に gcloud-aio-bigquery があります。あまり使われているのを見かけませんが、AirflowのGCP周りの一部で使われているようです。

ただしREST APIそのままの実装になっているため、REST APIの知識が必要です。手軽に使えるとは言い難いです。

REST APIでの流れ

まずクエリを実行するには jobs.query を呼びます。

代表的なパラメーターは以下の通りです。

  • query : 必須。SQLクエリを指定する。
  • maxResults : 1レスポンスに含まれる行数。指定しなくてもバイト数でページングされるが、指定しておいたほうが安全。
  • timeoutMs : タイムアウトの秒数
  • useLegacySql : BigQueryのレガシーSQLを使用するかどうか。デフォルトtrueなのでfalseを指定しておいたほうがいい。

実行するとジョブが作成されます。

レスポンスを確認してデータが完結しているか確認します。 jobComplete が true の場合、ジョブが完了して結果の行を取得できる状態です。この場合、 rows に結果の行が含まれています。また、 schemaスキーマが含まれています。 rows だけ見てもどの列の何のデータがの判別が難しいので、 schema と合わせて確認します。

jobComplete が true で、 pageToken が含まれている場合、まだ読み込むべきデータが残っているので続きを読み込みます。 pageToken が無ければ終了です。

データが完結していない場合は、レスポンスの jobReferencejobId を拾って、( pageToken があればそれも一緒に) jobs.getQueryResults を実行します。結果は jobs.query と同じです。

全体の流れをまとめると以下のようになります。

  • jobs.query でジョブを作成
  • jobComplete が true なら
    • rows がない(totalRows=0)→終了
    • rows がある→そのデータを使う
    • pageToken がない→終了
  • jobComplete が false 、または pageToken がある→ jobs.getQueryResults を実行し、上の判定ロジックに戻る

実際に使ってみる

実際に動くものが以下になります。

import asyncio
from collections.abc import AsyncGenerator

import aiohttp
from gcloud.aio.bigquery import Job
from google.cloud import bigquery
from google.cloud.bigquery._helpers import _rows_from_json
from google.cloud.bigquery.table import _parse_schema_resource  # type:ignore


async def query(
    sql: str, max_results: int = 100
) -> AsyncGenerator[list[bigquery.Row], None]:
    loop = asyncio.get_running_loop()
    async with aiohttp.ClientSession(loop=loop) as session:
        job = Job(session=session)
        query_request = {
            "query": sql,
            "maxResults": max_results,
            "useLegacySql": "false",
        }
        response = await job.query(query_request=query_request)
        while True:
            errors = response.get("errors")
            page_token = response.get("pageToken")
            if errors is not None:
                raise RuntimeError(errors)
            if response.get("jobComplete"):
                if int(response.get("totalRows", 0)) == 0:
                    return
                schema = _parse_schema_resource(response.get("schema", {}))
                yield _rows_from_json(response.get("rows", ()), schema)
                if page_token is None:
                    return
            else:
                await asyncio.sleep(1.0)  # 終わってなさそうなので少し待つ
            # ジョブの結果を確認
            job = Job(
                job_id=response["jobReference"]["jobId"],
                project=response["jobReference"]["projectId"],
                session=session,
            )
            params = {
                "location": response["jobReference"]["location"],
                "maxResults": max_results,
                "pageToken": page_token,
            }
            response = await job.get_query_results(params=params)


async def main() -> None:
    async for rows in query("SELECT * FROM `project_id.dataset_id.table`"):
        for row in rows:
            # row["key"] で値を取得可能
            for key, value in row.items():
                print(f"{key}={value}")


if __name__ == "__main__":
    asyncio.run(main())

雑に書いているので必要に応じて書き換えてください。タイムアウトの設定や例外の処理などが別途必要になります。

結果行の整形に公式の google-cloud-bigquery も使っています。また、 aiohttp パッケージも必要です。

プロジェクトIDを指定したい場合は Job のコンストラクタで指定してもいいですし、環境変数 GOOGLE_CLOUD_PROJECT などで指定しても反映されます。

まとめ

公式で対応してほしいです。