こんにちは。
新設されたデータソリューションセンター所属の須賀です。
先日、しんゆうさん主催の第3回 データアーキテクト(データ整備人)を”前向きに”考える会に参加してきました。
オンラインにも拘わらず600人近くが申込み、同じ悩みを抱えている方の多さに驚く共に、登壇者の発表に共感ばかりしていました。
チーム内でも盛り上がった実況スレ!
その中で、吉田康久さん(株式会社はてな)の登壇時が、実況スレでも盛り上がりました。
カスタマーサクセスのためのデータ整備人の活動記録
理由としてはいくつかありますが、3つ目のデータカタログの推進をまさに進めていたためです。
- スプレッドシートの依存関係があるワークフローへの既視感
- 意思決定者へのヒアリングなど、泥臭く足で稼ぐことの重要性を体感していたこと
- メタデータ活用として、データカタログの推進
データカタログというのは、BQ上に存在するテーブルやスキーマを検索できるサービスです。
BQのメタデータを管理すれば、クエリを書く際にテーブル定義書を探して、該当テーブルを探して、対象のカラムを探して、、という作業がGCP上で完結します。
データカタログでの検索画面
この画面から遷移すると、対象のテーブルに飛ぶことができます。
なぜメタデータを管理し始めたのか?
もう少し前置きが続くのですが、そもそもの始まりについて書いていきます。
チームの運用業務として、事業部の方から○○の数字を出してほしい、という依頼があったりします。
ETLを利用してBQをDWHとして利用していますが、いくつもサービスが存在するため、テーブルの数が多岐に渡ります。
そのため、そもそも依頼された数字の集計方法が分からない、という問題が生じました。
ExcelやConfluenceで管理されているテーブル定義書を探したり、
過去の作成クエリを探したり、サービス担当のエンジニアの方に確認したり。。
そういった業務を無くすため
1. 利用頻度が高いクエリのview化
2. スキーマ内容をviewの中身に記載する
ということをコード管理し始めました。
SQLに詳しい方だと処理スピードを考慮し、テーブル化しすればいいのでは?と思われるかもしれません。
viewにした理由として、データの持たせ方が変わるかも、ということを想定していました。
この形で当分問題なさそう、という段階になればテーブル化は検討しようと思っています。
前置きが非常に長くなりましたが、本題に入っていきます!
開発環境
- python 3.8.2
事前準備
- スキーマ更新を行うため、サービスアカウントを発行し必要な権限を付与
BigQueryのデータ編集者、閲覧者、ユーザー権限を付与しています。 - モジュールのinstallが完了していない場合、下記を参照ください
クイックスタート: クライアント ライブラリの使用
ファイル構成
・
┣ update_view.py
┣ upadte_schema.py
┣ key
┃ ┗ jsonキーファイル
┣ description
┃ ┣ import_yml.py
┃ ┗ schema_sample.yml
┗ sql
┗ create_view.sql
実装コード
ファイル構成に記載した通り、スキーマの記載をymlファイル内で行っています。
pythonで実装しても良かったのですが、運用保守面で平易に修正できることから採用しました。
create_view.sql
# StandardSQL
# 作成したいテーブルやビューの実行クエリを書いていただければ問題ないです
SELECT
*
FROM
test
update_view.py
実行コマンドサンプル
python create_view.py #1.sql/実行したいSQLファイル #2.tableもしくはview #3.テーブルもしくはビュー名
from google.cloud import bigquery
from google.oauth2 import service_account
import sys
import codecs
# 環境変数の設定を避けるため、パスを指定して実行
credentials = service_account.Credentials.from_service_account_file(
filename= './key/発行したサービスアカウントのJSONキーファイル',
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
args = sys.argv
# 読み込み用に開き、utfに変換しエラーの場合は文字型で渡す
file = codecs.open(args[1], mode='r', encoding='utf-8', errors='strict')
query = file.read()
# TABLEはREPLACE可能だがVIEWは不可能なので、既存のものがある場合は削除する
drop_data ='DROP '+args[2]+' IF EXISTS `作成先のプロジェクト名.データセット名.'+args[3]+'`'+';'+'\n'
create_data ='CREATE '+args[2]+' IF NOT EXISTS'+'\n'+'`作成先のプロジェクト名.データセット名.'+args[3]+'`'+'\n' + 'AS' + '\n'+ query
def exec_query():
client = bigquery.Client(
credentials=credentials,
project=credentials.project_id,
)
exec_job = client.query(
drop_data + create_data
)
exec_job.result()
if __name__ == '__main__':
exec_query()
file.close()
schema_sample.yml
サンプルとして作成した、説明欄の内容を記載した ymlファイル
デフォルトでNULLABLEが設定されているため、modeの部分は除外しています
dataset: データセットの名称
name: テーブル・ビューの名称。view作成用のsqlと同じファイル名した方が管理が楽だと思います
desc: テーブル・ビューの説明を記載してください
desc: | 文章を途中で改行したい場合は、先頭に | を追加したこちらを利用
https://www.task-notes.com/entry/20150922/1442890800
columns:
- name: カラムの名称。
クエリを書くときに使用するため、名前から分かりやすい名称が好ましいです。
type: 型を記載してください。以下サンプルの対応表です。他の型を利用したい場合は下記を参照。
https://cloud.google.com/bigquery/docs/reference/standard-sql/conversion_rules?hl=ja
整数:INTEGER
数値:FLOAT
文字列:STRING
日付:DATE
時間:DATETIME
description: カラムの中身に関する説明を記載してください。
特定の条件だと、どんな数値が取れるのか?など記載いただけると良いと思います。
- name: test
type: STRING
description: こんな感じで書けば大丈夫です
import_yml.py
description配下のスキーマ説明ファイルを読み込みます。
import yaml
import sys
import codecs
args = sys.argv
file_path = r'.\description'+'\\'+args[1]+'.yml'
with codecs.open(file_path, 'r', 'utf-8') as file:
obj = yaml.safe_load(file)
schema_list = obj["columns"]
def schema():
from google.cloud.bigquery import SchemaField
res = []
for i in range(len(schema_list)):
input_schema = SchemaField(name=schema_list[i]['name'],
field_type=schema_list[i]['type'],
description=schema_list[i]['description'])
res.append(input_schema)
return res
upadte_schema.py
スキーマ更新用の実行コマンドサンプル
python upadte_schema.py #1.作成したymlファイル名
from google.cloud import bigquery
from google.oauth2 import service_account
from description import import_yaml
# サービスアカウントのJSONキーファイルを利用する
credentials = service_account.Credentials.from_service_account_file(
filename= './key/発行したサービスアカウントのJSONキーファイル',
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
project_id = "更新したいテーブル・ビューのProjectを指定"
client = bigquery.Client(
credentials=credentials,
project=credentials.project_id,
)
# 対象のテーブル・ビュー名
table_name = import_yaml.obj.get("name")
dataset_id = import_yaml.obj.get("dataset")
table_id = "{}.{}.{}".format(client.project, dataset_id, table_name)
# 以下、スキーマの説明更新
schema = import_yaml.schema()
view_schema = bigquery.Table(table_id, schema=schema)
client.update_table(view_schema, ["schema"])
# 以下、テーブル・ビュー本体の説明更新
table_ref = client.dataset(dataset_id).table(table_name)
view = client.get_table(table_ref)
# 説明内容を読み込んで更新
view.description = import_yaml.obj.get("desc")
client.update_table(view, ["description"])
実際のBQ上の結果
データセット説明欄
スキーマ説明欄
今後の展望
今は手動で更新を行っていますが、今後はCI/CDの仕組みも取り入れていきたいです。
スキーマ更新ですが、対象のテーブルやビューのカラムを全て記載する必要があります。
そのためテーブル自体の更新後、スキーマ更新ファイルが実行される、というフローを構築していきたいです。
スキーマの充実によってデータカタログでの検索が容易になれば、分析者も自主的に作業ができます。
同時にエンジニアの方への確認作業が減り、お互いに本来やりたい業務に時間を割けるようになります。
データは生ものだと思いますので、分析をしたい方々が必要な情報を簡単に探せる環境づくりを頑張っていきます!
私個人はデータアナリストを目指しているのですが、データエンジニアとして分析しやすい環境づくりも楽しいなと感じているこの頃です。
自分の課題感として、エンジニアリング力が圧倒的に足りないなと思っていた部分もあります。
まだまだ社内には分析方面で課題が転がっていて、ある意味チームとしてもチャンスだなと思っています。
興味を持たれた方はぜひ一緒に解決していきましょう!
中途採用 | istyle 株式会社アイスタイル
長文になりましたが、最後までお付き合いいただきありがとうございました。