nazolabo

フリーランスのWebエンジニアが近況や思ったことを発信しています。

gcloud-aio-bigqueryでasync/awaitを使ってBigQueryのクエリを叩く

PythonからBigQueryを叩くには公式の google-cloud-bigquery があるのですが、asyncioに対応していません。

一部のライブラリではv2側でasyncioに対応しているのですが、BigQueryはv2でも対応していないように見えます。ちなみにv1はほとんどの実装でREST APIを叩いて、v2はほとんどの実装でRPC APIを叩いています(どちらも全部確認したわけではありません)。一部のライブラリでv2のほうを叩くとサーバーエラーが多発したこともあり、今のところなるべくv1でasyncioを使用したいです。

GCPPythonライブラリをasyncioで動くように書き換えたものが gcloud-aio で用意されており、その中に gcloud-aio-bigquery があります。あまり使われているのを見かけませんが、AirflowのGCP周りの一部で使われているようです。

ただしREST APIそのままの実装になっているため、REST APIの知識が必要です。手軽に使えるとは言い難いです。

REST APIでの流れ

まずクエリを実行するには jobs.query を呼びます。

代表的なパラメーターは以下の通りです。

  • query : 必須。SQLクエリを指定する。
  • maxResults : 1レスポンスに含まれる行数。指定しなくてもバイト数でページングされるが、指定しておいたほうが安全。
  • timeoutMs : タイムアウトの秒数
  • useLegacySql : BigQueryのレガシーSQLを使用するかどうか。デフォルトtrueなのでfalseを指定しておいたほうがいい。

実行するとジョブが作成されます。

レスポンスを確認してデータが完結しているか確認します。 jobComplete が true の場合、ジョブが完了して結果の行を取得できる状態です。この場合、 rows に結果の行が含まれています。また、 schemaスキーマが含まれています。 rows だけ見てもどの列の何のデータがの判別が難しいので、 schema と合わせて確認します。

jobComplete が true で、 pageToken が含まれている場合、まだ読み込むべきデータが残っているので続きを読み込みます。 pageToken が無ければ終了です。

データが完結していない場合は、レスポンスの jobReferencejobId を拾って、( pageToken があればそれも一緒に) jobs.getQueryResults を実行します。結果は jobs.query と同じです。

全体の流れをまとめると以下のようになります。

  • jobs.query でジョブを作成
  • jobComplete が true なら
    • rows がない(totalRows=0)→終了
    • rows がある→そのデータを使う
    • pageToken がない→終了
  • jobComplete が false 、または pageToken がある→ jobs.getQueryResults を実行し、上の判定ロジックに戻る

実際に使ってみる

実際に動くものが以下になります。

import asyncio
from collections.abc import AsyncGenerator

import aiohttp
from gcloud.aio.bigquery import Job
from google.cloud import bigquery
from google.cloud.bigquery._helpers import _rows_from_json
from google.cloud.bigquery.table import _parse_schema_resource  # type:ignore


async def query(
    sql: str, max_results: int = 100
) -> AsyncGenerator[list[bigquery.Row], None]:
    loop = asyncio.get_running_loop()
    async with aiohttp.ClientSession(loop=loop) as session:
        job = Job(session=session)
        query_request = {
            "query": sql,
            "maxResults": max_results,
            "useLegacySql": "false",
        }
        response = await job.query(query_request=query_request)
        while True:
            errors = response.get("errors")
            page_token = response.get("pageToken")
            if errors is not None:
                raise RuntimeError(errors)
            if response.get("jobComplete"):
                if int(response.get("totalRows", 0)) == 0:
                    return
                schema = _parse_schema_resource(response.get("schema", {}))
                yield _rows_from_json(response.get("rows", ()), schema)
                if page_token is None:
                    return
            else:
                await asyncio.sleep(1.0)  # 終わってなさそうなので少し待つ
            # ジョブの結果を確認
            job = Job(
                job_id=response["jobReference"]["jobId"],
                project=response["jobReference"]["projectId"],
                session=session,
            )
            params = {
                "location": response["jobReference"]["location"],
                "maxResults": max_results,
                "pageToken": page_token,
            }
            response = await job.get_query_results(params=params)


async def main() -> None:
    async for rows in query("SELECT * FROM `project_id.dataset_id.table`"):
        for row in rows:
            # row["key"] で値を取得可能
            for key, value in row.items():
                print(f"{key}={value}")


if __name__ == "__main__":
    asyncio.run(main())

雑に書いているので必要に応じて書き換えてください。タイムアウトの設定や例外の処理などが別途必要になります。

結果行の整形に公式の google-cloud-bigquery も使っています。また、 aiohttp パッケージも必要です。

プロジェクトIDを指定したい場合は Job のコンストラクタで指定してもいいですし、環境変数 GOOGLE_CLOUD_PROJECT などで指定しても反映されます。

まとめ

公式で対応してほしいです。

nazoさんの2023年とこれから(お仕事募集中です!!)

何してたの?

一応、仕事しています。

あまり細かいことは言えないのですが、PythonでGraphQLでBigQuery x BigTableでPub/Subで…とかしつつ開発組織のコンサルティング的なこともしていました。キューでタスクを分割する仕組みが思ったより快適すぎるので今後もっと使っていきたいと思いました。

最近のPythonは型検査にasync/awaitとか使えて快適ですね。開発環境はryeで、black+mypy+ruff で各種検査を回しまくるとVSCodeで自動修正やら補完やらが効きまくって今までのPythonとは別世界です。強いて言えば各種ライブラリ(特にGoogle系)が最新のあれこれに対応していなかったりするのが難点ですね。

GitHub Copliotが優秀すぎてコードの書き方より指示コメントの書き方がうまくなってきました。テストケースなんかも半自動生成できて、ちまちま作ってたようなものがどんどん楽になりますね。CodeじゃないVIsual Studioだと標準で入っているので、Unityを書くのとかでもものすごく楽になりました。知ってるか知らないか、知っていたら勝ちみたいな領域で勝負していた方々は今後辛いんじゃないでしょうか。

SNS断ち(特にX)

もう存在を忘れられているかと思いますが、結構前からSNSをほぼ引退しました。X(旧Twitter)もFacebookも過去の投稿をほぼ全て削除してありますし、特殊な事情がない限り告知以外の投稿をすることはもうありません。この記事の通知は流れているはずなので、https://x.com/nazo は告知用アカウントだと思ってください。

理由としては、コロナ禍あたりからの空気の変化により、少なくともXを使い続けるのは自分にとってメリットよりデメリットのほうが上回ってきたからです。X社関係の騒動とはあまり関係ありません。

このブログはSNSではないので、基本的にこのブログは更新し続けるつもりです。

一応Blueskyのアカウント( https://bsky.app/profile/nazo.dev )も取得してあり、万が一普段遣いとして復帰したくなった場合はこちらを使うかもしれませんが、まあそれも今のところはなさそうです。

SNS断ちをして思ったことは、思ったよりSNSを見てメールをくれたり連絡をしてくれたりする人というのはいたんだなということです。SNSを動かしている間はそれなりに反応があったのですが、断ってからはスパム的なもの以外は全く来なくなってしまいました。まあブログくらい書いていればもう少し違うのでしょうが…。

交友関係もほぼ断ってしまったので、すっかりお仕事の話も途絶えてしまいました。難しい話です。

今後について

今の生活はとても自分に合っているので、できれば今の生活を続けていきたいのですが、特にこの先の見通しもないし収入面の不安もあるので難しそうです。

自分ももうすぐ42歳になり、日に日に体力も衰え、絶対にこれをやるんだ!みたいなモチベーションはもうないのですが、成功事例も増えてきましたし、自分にしかできないことも少なくないはずなので、淡々とできることをこなしていこうと思います。

お仕事募集中

引き続きお仕事を募集しております。週2〜4くらいまで・フルリモート・単価多分そこそこ高め?です。解決できる課題などは https://nazo.dev/profile をご覧下さい。フルタイム正社員でないからこそできる内容をメインにしています。

各種SNSのDMでご連絡をお待ちしております。

webtransport-goでチャットを作ってみた

github.com

突然興味が出てきて書きましたが特に実用性はありません。webtransport-goがそれなりに使えるのかどうか調べるために書いたものです。

webtransport-goは紆余曲折あって(?)quic-goに取り込まれたようですが、中のファイルがまだ github.com/marten-seemann/webtransport-go のままなので(2023年1月27日現在)、こちらの名前でimportするのが良いです。修正を投げようとも思いましたが関連する他リポジトリも全て変わっていないので、そのうち修正されるのを待とうと思います。

切断周りの対応はちゃんと書いていません。

チャットメッセージはUnidirectional Streamでやりとりしています。一部で話題のTwitchのWarpもUnidirectionalで実装されているようです。webtransport-goはDatagramはまだ未対応のようです。

WebTransport自体はコネクションさえ確立されれば後はQUICなので、どうしてもブラウザ上でやりたいことがなければ普通にQUICについて調査することをフォーカスしたほうが良さそうです。ブラウザならではで想定できそうなのはやはりWarpのような動画配信でしょうか。WebCodecsと組み合わせて実装するようです。

参考記事

qiita.com

2022年の仕事と近況

2022年の仕事

去年からの続きの仕事のみで、大きく話せるようなことは特にありません。仕事をしていないわけではないですが、一度区切りをつけて来年からは大幅縮小になる予定です。

近況

2022年は引っ越したりしていました。一応首都圏ではあるものの、通勤することを完全に捨てた場所にいます。その代わり仕事部屋を手に入れたので、リモートワークで会議が頻繁に発生することにも対応できることになりました。

去年今年と辛いことが続いていますが、来年は本厄らしいのでもうだめかもしれません。がんばります。

まとめ

今後はブログを中心に情報発信を行っていこうと思います。技術的な話は Zenn にも転載していますので、技術的な話のみ読みたい方はそちらをフォローしてください。

来年からの予定は今のところ未定なので、週2〜3くらいでの案件を引き続き募集しております。詳しくは https://nazo.dev/ をご覧下さい。

最近の Terraform のディレクトリ構造の分け方

概要

modules / envs / apps でディレクトリを切っています。

基礎知識: modules と envs

誰が言い出したのかはわかりませんが、modules と envs に分けるパターンはよく見かけます。

modules は文字通りモジュールを配置するところで、 envs はそれらを使う環境が入っているという感じです。

.
├── modules
│   ├── vpc
│   ├── app
│   └── db
└── envs
    ├── production
    └── staging

modules の抽象度は、 12 factor app の バックエンドサービス くらいのイメージを持っていればいいのではないかと思います。VPC みたいな基盤は特殊として、それ以外は「外しても使える程度」を意識するのが良いと思います。

一般的に envs で書かれる tf は、modules を呼び出すだけにしておくのが良いと思います。直接リソースを書くのは最終手段です。

modules と envs はわかるけど apps って何?

全環境で VPC も DB も分けるなら必要ないですが、「複数のアプリで同一の DB を参照したい」というケースは多々あります。進捗が別々の環境を同時に作りたい場合などです。また、費用を節約するために同一の DB リソースを共有するという場合もあります。

そのような状況を考えた時、「土台のバックエンドサービスと、自分で書いたアプリケーションは別の扱いにしたほうがいいのでは」となりました。この「土台のバックエンドサービス」が「envs」で、「自分で書いたアプリケーション」が「apps」になります。

「envs」は、VPC や DB などがまとまったもので、「apps」はデプロイ単位で分けることになります。これにより、1つの VPC や DB に、複数のアプリケーションを同居させることが簡単になります。

.
├── modules
│   ├── vpc
│   ├── app
│   └── db
├── apps
│   ├── staging
│   ├── production
│   └── featurexxxx
└── envs
    ├── production
    └── staging

「envs」以下より「apps」以下のほうが必ず数が多くなります。また、「apps」は「envs」の設定に依存しますが、逆の依存は必ず存在しないようにします。

git の pre-commit hook はなるべく使わないほうがいいのでは

git に pre-commit hook という、コミット直前に何かのプログラムを実行する機能があります。これを用いて lint や test を実行したりすることがありますが、この利用は極力避けたほうが良いのではと考えています。

なぜ?

  • 個人の環境で実行した結果は信用できない
  • ローカルコンピューターで待たせてはいけない
  • pre-commit の挙動や中断などで動作が怪しくなることがある

個人の環境で実行した結果は信用できない

全員が完全に同じ環境で pre-commit hook を実行しているかは誰にもわかりません。言語やライブラリのバージョンに差がある可能性もあります。また、hook をスキップする方法もあります。

それだけを信用してチェックを行ったと判断するのは危険であるため、CI で実行することを優先すべきです。

ローカルコンピューターで待たせてはいけない

チェック項目が増えると、hook の実行時間も増えます。

あくまで hook で行う処理の大半はガードレールなので、ちゃんと書いていれば一発通過するはずで、そのために毎回各自が待たされるのは無駄です。

チェック内容に自信がなければコミット前に手動実行すると思いますので、それでチェックが通っても pre-commit hook は実行され、さらに時間をかけてしまいます。

チェックに時間がかかりすぎるようになると、チェックを迂回するための策を考えてしまう可能性があります。

チェックが通る前提でコードを書き、素早く push して開発を効率化しましょう。

pre-commit の挙動や中断などで動作が怪しくなることがある

ライブラリの更新などで pre-commit hook がアップデートされた場合などや、極端にリビジョンに差があって pre-commit の挙動に差があるブランチに切り替えた場合などに、pre-commit スクリプトとライブラリが一致していない等でおかしな挙動をすることがあります。他にも様々な理由で予期せぬ結果を起こすことがあります。

git commit に機能を追加してしまうことで、 git commit 時に git commit 以外のことを考える必要が出てきます。

そんなこと言っても変なコードが push されたらどうするの?

CI で防げばいいです。そのための CI です。

CI と同等の動作を「任意で」ローカルで実行できることは必要ですが、最終判断はあくまで CI にやらせましょう。

CI で必ずチェックすることで、変なコードがマージされるのを確実に防ぐことができます。また、常に同じ環境である CI に実行させることによって、環境ごとで結果に違いが出るといった現象も防ぐことができます。

その割には有名なライブラリたくさんあるし使っている人もいっぱいいるよ!

Ruby の pre-commit gem や nodejs の Husky + lint-staged などがあり、特に nodejs 界隈ではボイラープレートに組み込まれていたりします。

どれも有名かつ人気があるので使うべきかと思いますが、「出来がよく実用しても問題ないから人気である」と「必ず使う必要がある」は全く関係がありません。これまでの理由により、基本的には使う必要はないものです。

どうしても忘れっぽい人やチェックする習慣がない初学者は任意で有効化して利用する、というのはありかもしれませんが、強制するものではないと思います。

リモートに push されること自体が望ましくないものを防ぐ場合は使って良いと思います。例えば git-secrets のようなものです。一般的な lint エラーやテストの失敗といったものはリモートに push されても何の問題もありません。誰もマージできないだけです。

まとめ

よほどの理由がない限り、pre-commit hookは使わないほうがいいと思ったほうが良いでしょう。各自の判断で手動実行 + 必ず CI でチェック、でほとんどの場合は十分です。絶対に使うなというわけではないですが、なぜ pre-commit hook を使う必要があるのか、何のために使うのかをよく考えて利用しましょう。

参考

www.thoughtworks.com

2021年の仕事まとめ

Go による API サーバーと React による管理画面開発

Go で gRPC サーバーを作りつつ、 React と gqlgen で 管理画面を作るというような内容でした。

都合により短期でしか関われなかったのですが、あまりお目にかかれない環境での開発で非常に楽しい経験をさせて頂きました。

株式会社ハグカム 様

GLOBAL CROWN という子ども向け英会話サービスを提供しております。今年初めに資金調達も行っており 、絶賛成長中のサービスになっております。

私も少しだけお世話になっており、主にコアドメインからは少し距離を置いたものを中心に、以下のような内容を行っておりました。

  • GCE で動いていたインフラを Cloud Run に移行
  • インフラの Terraform 化
  • linter を整備してコード品質を上げる
  • CI / CD の整備
  • 負荷チューニング
  • Docker 開発環境の整備
  • 言語やフレームワークのアップグレード

サーバーサイドの言語としては PHPRuby を利用しており、久々に長期で PHP を触っていました。

また、コーポレートサイト を Next.js + Netlify で SSG 化したり、未公開の内容についての対応なども行っておりました。

今年は仕事以外のところでいろいろあったのですが、ハグカム様にはかなり融通を利かせて頂いて助かりました。

Rails エンジニアや Flutter エンジニアなどを積極的に採用中ですので、興味があれば以下から応募するか私までご連絡下さい。

https://www.hugcome.co.jp/engineerwww.hugcome.co.jp

www.green-japan.com

まとめ

今年はプライベートで良くないことが続く年でしたが、来年後半からはまた良い状況が生まれてくるはずなので、そこから活動を増やしていきたいと考えています。

引き続きお仕事を募集しております。詳細は https://nazo.dev/profile をご覧下さい。