ストリーミング データの処理: Pub/Sub へのストリーミング データのパブリッシュ
ラボ
2時間
universal_currency_alt
クレジット: 5
show_chart
入門
info
このラボでは、学習をサポートする AI ツールが組み込まれている場合があります。
概要
Google Cloud Pub/Sub はフルマネージドのリアルタイム メッセージング サービスで、個別のアプリケーション間でメッセージを送受信することができます。Cloud Pub/Sub を使用して、複数のソースのデータに対してパブリッシュやサブスクライブを行った後、Google Cloud Dataflow を使ってデータをリアルタイムで処理できます。
このラボでは、後で Dataflow パイプラインで処理するためにトラフィック センサーデータを Pub/Sub トピックにシミュレートし、最終的に BigQuery テーブルで詳しく分析します。
注: このドキュメントの作成時点では、Dataflow Python SDK でストリーミング パイプラインを使用することはできません。そのため、ストリーミングのラボでは Java を使用しています。
目標
このラボでは、次のタスクを行います。
- Pub/Sub トピックとサブスクリプションを作成する
- Pub/Sub にトラフィック センサーデータをシミュレートする
設定
各ラボでは、新しい Google Cloud プロジェクトとリソースセットを一定時間無料で利用できます。
-
Qwiklabs にシークレット ウィンドウでログインします。
-
ラボのアクセス時間(例: 1:15:00
)に注意し、時間内に完了できるようにしてください。
一時停止機能はありません。必要な場合はやり直せますが、最初からになります。
-
準備ができたら、[ラボを開始] をクリックします。
-
ラボの認証情報(ユーザー名とパスワード)をメモしておきます。この情報は、Google Cloud Console にログインする際に使用します。
-
[Google Console を開く] をクリックします。
-
[別のアカウントを使用] をクリックし、このラボの認証情報をコピーしてプロンプトに貼り付けます。
他の認証情報を使用すると、エラーが発生したり、料金の請求が発生したりします。
-
利用規約に同意し、再設定用のリソースページをスキップします。
タスク 1. 準備
トレーニング用 VM からセンサー シミュレータを実行します。複数のファイルと環境設定が必要になります。
SSH ターミナルを開いてトレーニング用 VM に接続する
- コンソールのナビゲーション メニュー(
)で、[Compute Engine] > [VM インスタンス] をクリックします。
-
training-vm という名前のインスタンスがある行を見つけます。
- 右端の [接続] の下にある [SSH] をクリックしてターミナル ウィンドウを開きます。
- このラボでは、training-vm 上で CLI コマンドを入力します。
初期化が完了していることを確認する
-
training-vm によってバックグラウンドでソフトウェアのインストールが行われます。新しいディレクトリの内容を調べて設定が完了していることを確認します。
ls /training
list(ls)コマンドの出力結果が次の画像のように表示された場合、設定は完了しています。完全なリストが表示されない場合は、数分待ってからもう一度実行してください。注: バックグラウンドのすべての処理が完了するまで、2~3 分かかることがあります。

コード リポジトリをダウンロードする
- このラボで使用するコード リポジトリをダウンロードします。
git clone https://github.com/GoogleCloudPlatform/training-data-analyst
プロジェクトを特定する
$DEVSHELL_PROJECT_ID という環境変数を設定します。この変数には、課金対象リソースにアクセスするために必要な Google Cloud プロジェクト ID が格納されます。
- コンソールのナビゲーション メニュー(
)で、[Home] をクリックします。プロジェクト情報を記載したパネルに、[プロジェクト ID] が表示されています。この情報は、[Qwiklabs] タブの [接続の詳細] にある [GCP プロジェクト ID] というラベルの項目でも確認できます。
-
training-vm の SSH ターミナルで、DEVSHELL_PROJECT_ID 環境変数を設定し、他のシェルで使用できるようにエクスポートします。 次のコマンドは Google Cloud 環境からアクティブなプロジェクト ID を取得します。
export DEVSHELL_PROJECT_ID=$(gcloud config get-value project)
タスク 2. Pub/Sub トピックとサブスクリプションを作成する
-
training-vm の SSH ターミナルで、このラボのディレクトリに移動します。
cd ~/training-data-analyst/courses/streaming/publish
gcloud コマンドを使用して、Pub/Sub サービスがアクセス可能で、かつ動作していることを確認します。
- トピックを作成して、簡単なメッセージをパブリッシュします。
gcloud pubsub topics create sandiego
- 簡単なメッセージをパブリッシュします。
gcloud pubsub topics publish sandiego --message "hello"
- トピックのサブスクリプションを作成します。
gcloud pubsub subscriptions create --topic sandiego mySub1
- トピックにパブリッシュされた最初のメッセージを pull します。
gcloud pubsub subscriptions pull --auto-ack mySub1
結果は表示されましたか。表示されない場合、それはなぜですか。
- 別のメッセージをパブリッシュし、サブスクリプションを使用してそれを pull してみます。
gcloud pubsub topics publish sandiego --message "hello again"
gcloud pubsub subscriptions pull --auto-ack mySub1
今度はレスポンスがありましたか。
出力:

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
Pub/Sub トピックとサブスクリプションを作成する
-
training-vm の SSH ターミナルでサブスクリプションをキャンセルします。
gcloud pubsub subscriptions delete mySub1
タスク 3. Pub/Sub にトラフィック センサーデータをシミュレートする
- python スクリプトを調べて、San Diego トラフィック センサーデータをシミュレートします。コードは変更しないでください。
cd ~/training-data-analyst/courses/streaming/publish
nano send_sensor_data.py
このシミュレータによって、トラフィック センサーが Pub/Sub にリアルタイムでデータを送っているかのようにスクリプトが動作します。speedFactor パラメータによってシミュレートの速度が決まります。Ctrl+X キーを押してファイルを閉じます。
- トラフィック シミュレーション データセットをダウンロードします。
./download_data.sh
ストリーミング センサーデータをシミュレートする
-
send_sensor_data.py を実行します。
./send_sensor_data.py --speedFactor=60 --project $DEVSHELL_PROJECT_ID
このコマンドは、記録されたセンサーデータを Pub/Sub メッセージで送信することにより、センサーデータをシミュレートします。センサーデータの元の時刻が抽出され、センサーデータの実際のタイミングをシミュレートするため、間隔を置いて各メッセージが送信されます。speedFactor の値に応じてメッセージの間隔が変更されます。speedFactor を 60 にすると、記録されたタイミングより「60 倍高速」になり、およそ 1 時間分のデータが 60 秒ごとに送信されます。
ターミナルを開いたまま、シミュレータの実行を継続します。
タスク 4: メッセージが受信されたことを確認する
SSH ターミナルをもう 1 つ開いてトレーニング用 VM に接続する
- コンソールのナビゲーション メニュー(
)で、[Compute Engine] > [VM インスタンス] をクリックします。
-
training-vm という名前のインスタンスがある行を見つけます。
- 右端の [接続] で、[SSH] をクリックして 2 つ目のターミナル ウィンドウを開きます。
- さきほど作業していたディレクトリに移動します。
cd ~/training-data-analyst/courses/streaming/publish
- トピックのサブスクリプションを作成して pull を実行し、メッセージが取得されることを確認します(注: pull コマンドを何度か発行しないとメッセージが表示されない場合があります)。
gcloud pubsub subscriptions create --topic sandiego mySub2
gcloud pubsub subscriptions pull --auto-ack mySub2
- トラフィック センサー情報を含むメッセージが表示されることを確認してください。

- このサブスクリプションをキャンセルします。
gcloud pubsub subscriptions delete mySub2
- 2 つ目のターミナルを終了します。
exit
センサー シミュレータを停止する
- 最初のターミナルに戻ります。
-
Ctrl+C キーを押してパブリッシャーを停止します。
- 最初のターミナルを終了します。
exit
ラボを終了する
ラボが完了したら、[ラボを終了] をクリックします。ラボで使用したリソースが Google Cloud Skills Boost から削除され、アカウントの情報も消去されます。
ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信] をクリックします。
星の数は、それぞれ次の評価を表します。
- 星 1 つ = 非常に不満
- 星 2 つ = 不満
- 星 3 つ = どちらともいえない
- 星 4 つ = 満足
- 星 5 つ = 非常に満足
フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。
フィードバックやご提案の送信、修正が必要な箇所をご報告いただく際は、[サポート] タブをご利用ください。
Copyright 2020 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。