PythonからBigQueryを叩くには公式の google-cloud-bigquery があるのですが、asyncioに対応していません。
一部のライブラリではv2側でasyncioに対応しているのですが、BigQueryはv2でも対応していないように見えます。ちなみにv1はほとんどの実装でREST APIを叩いて、v2はほとんどの実装でRPC APIを叩いています(どちらも全部確認したわけではありません)。一部のライブラリでv2のほうを叩くとサーバーエラーが多発したこともあり、今のところなるべくv1でasyncioを使用したいです。
GCPのPythonライブラリをasyncioで動くように書き換えたものが gcloud-aio で用意されており、その中に gcloud-aio-bigquery があります。あまり使われているのを見かけませんが、AirflowのGCP周りの一部で使われているようです。
ただしREST APIそのままの実装になっているため、REST APIの知識が必要です。手軽に使えるとは言い難いです。
REST APIでの流れ
まずクエリを実行するには jobs.query を呼びます。
代表的なパラメーターは以下の通りです。
- query : 必須。SQLクエリを指定する。
- maxResults : 1レスポンスに含まれる行数。指定しなくてもバイト数でページングされるが、指定しておいたほうが安全。
- timeoutMs : タイムアウトの秒数
- useLegacySql : BigQueryのレガシーSQLを使用するかどうか。デフォルトtrueなのでfalseを指定しておいたほうがいい。
実行するとジョブが作成されます。
レスポンスを確認してデータが完結しているか確認します。 jobComplete
が true の場合、ジョブが完了して結果の行を取得できる状態です。この場合、 rows
に結果の行が含まれています。また、 schema
にスキーマが含まれています。 rows
だけ見てもどの列の何のデータがの判別が難しいので、 schema
と合わせて確認します。
jobComplete
が true で、 pageToken
が含まれている場合、まだ読み込むべきデータが残っているので続きを読み込みます。 pageToken
が無ければ終了です。
データが完結していない場合は、レスポンスの jobReference
の jobId
を拾って、( pageToken
があればそれも一緒に) jobs.getQueryResults を実行します。結果は jobs.query と同じです。
全体の流れをまとめると以下のようになります。
- jobs.query でジョブを作成
- jobComplete が true なら
- rows がない(totalRows=0)→終了
- rows がある→そのデータを使う
- pageToken がない→終了
- jobComplete が false 、または pageToken がある→ jobs.getQueryResults を実行し、上の判定ロジックに戻る
実際に使ってみる
実際に動くものが以下になります。
import asyncio from collections.abc import AsyncGenerator import aiohttp from gcloud.aio.bigquery import Job from google.cloud import bigquery from google.cloud.bigquery._helpers import _rows_from_json from google.cloud.bigquery.table import _parse_schema_resource # type:ignore async def query( sql: str, max_results: int = 100 ) -> AsyncGenerator[list[bigquery.Row], None]: loop = asyncio.get_running_loop() async with aiohttp.ClientSession(loop=loop) as session: job = Job(session=session) query_request = { "query": sql, "maxResults": max_results, "useLegacySql": "false", } response = await job.query(query_request=query_request) while True: errors = response.get("errors") page_token = response.get("pageToken") if errors is not None: raise RuntimeError(errors) if response.get("jobComplete"): if int(response.get("totalRows", 0)) == 0: return schema = _parse_schema_resource(response.get("schema", {})) yield _rows_from_json(response.get("rows", ()), schema) if page_token is None: return else: await asyncio.sleep(1.0) # 終わってなさそうなので少し待つ # ジョブの結果を確認 job = Job( job_id=response["jobReference"]["jobId"], project=response["jobReference"]["projectId"], session=session, ) params = { "location": response["jobReference"]["location"], "maxResults": max_results, "pageToken": page_token, } response = await job.get_query_results(params=params) async def main() -> None: async for rows in query("SELECT * FROM `project_id.dataset_id.table`"): for row in rows: # row["key"] で値を取得可能 for key, value in row.items(): print(f"{key}={value}") if __name__ == "__main__": asyncio.run(main())
雑に書いているので必要に応じて書き換えてください。タイムアウトの設定や例外の処理などが別途必要になります。
結果行の整形に公式の google-cloud-bigquery
も使っています。また、 aiohttp
パッケージも必要です。
プロジェクトIDを指定したい場合は Job のコンストラクタで指定してもいいですし、環境変数 GOOGLE_CLOUD_PROJECT
などで指定しても反映されます。
まとめ
公式で対応してほしいです。