本連載ではKubernetesやマイクロサービスを活用するにあたりどんな準備を進めておけばいいか整理します。第4回はDaprを活用したアプリケーションの構築方法を紹介します。
1. はじめに
これまでの連載でKubernetes(AWS EKS)を構築するIaCやアプリケーションのデプロイ手順、ローカルで開発するためのツールなどを紹介してきました。今回はその上で動作するマイクロサービスの連携を考えてみたいと思います。
マイクロサービスは個々のサービスでチームが形成され実装が進められますが、サービス間やシステム全体についても目を向ける必要があります。例えば下記のようなポイントがあるかと思います。
- サービス間通信(同期・非同期)
- サービス間での状態共有
- システム全体の観測
- 部分稼働(障害部分のみ切り離してサービス継続)
- etc…
今回これらのポイントをアプリケーションに組み込まずに実現する手段として、Daprを用いた実現手順を紹介します。
なお、今回の記事のコードはGithubで公開していますので、全コードはそちらを参照してください。
2. Dapr
DaprとはMicrosoftが開発したクラウドやエッジで実行可能なランタイムで、マイクロサービス開発を容易にする機能を提供します。様々な言語や開発Frameworkと併用可能で、ポータビリティの高いアプリケーション開発を実現します。今回の記事ではKubernetes環境でのDaprのアーキテクチャや利用方法について紹介します。各種詳細についてはDaprの公式を参照してください。
2.1 アーキテクチャ
Kubernetes環境でのDaprのアーキテクチャ図は以下のようになります。Daprはサイドカーパターンによってアプリケーションに機能を提供します。アプリケーションはHTTP/gRPCでDaprの機能を利用することができます。HTTP/gRPCによる機能提供のため、言語や実行環境に依存しないポータビリティの高い機能提供方式となっています。

- 上記図中のコンテナの概要
- My svc-a/b
開発者が実装するアプリケーション。 - Dapr Sidecar
Daprのサイドカーです。アプリケーションと同一Podに配置され、アプリケーションにHTTP/gRPCで各種機能を提供します。 - Dapr SidecarInjector
DaprのサイドカーをPodにInejctします。Kubernetesマニフェストのannotationを参照し、サイドカーをInjectします。 - Dapr Placement
アクターズパターンの機能を管理します。 - Dapr Sentry
サービス間通信でmTSLを管理します。 - Dapr Operator
Daprコンポーネントの更新とKubernetesサービスのエンドポイントを管理します。
- My svc-a/b
2.2 主要な機能
Daprが提供する主な機能は以下になります。今回の記事では、マイクロサービスのアプリケーション実装で利用する機会が多いService Invocation,State management,Publish and subscribeについて掘り下げて紹介します。
- Service Invocation
リトライ、分散トレースなどのマイクロサービスに不可欠な機能をサポートするサービス間通信機能を提供します。 - State management
キー/バリュー形式の状態管理を提供します。状態を保管するコンポーネントとしてRedis、Dynamodb、RDBなどから選択することができます。 - Publish and subscribe
publish/Subscribe形式のメッセージング機能を提供します。 - Resource bindings※
データベースやキュー、ファイルシステムなどの外部リソースを抽象化し、イベントを送受信する機能を提供します。 - Actors※
アクターズパターンに関連する機能を提供します。 - Observability※
デバッグやモニタリングのための各種メトリックス、ログ、トレース機能を提供します。 - Secrets※
秘匿情報の管理機能を提供します。AWS Secrets Manager、GCP Secrets Manager、Azure Key Vaultなどのサービスと連携して秘匿情報を管理します。
※今回の記事では掘り下げて紹介しません。興味のある方はDapr公式を参照してください。
3. ServiceInvocation
マイクロサービスではサービス間通信をセキュアに、そして確実に実行するための様々な考慮が必要となります。開発者は、Daprを利用することで通信の暗号化、リトライ、分散トレーシングなどを意識することなくサービス間通信を実現することができます。
3.1 構成図
アプリケーションはDaprのサイドカーにサービスの呼び出しを依頼することでサービス間通信を行います。サンプルのユースケースとして svc-a(サービス呼び出し)がDaprを利用してsvc-b(サービス提供)を実行する構成とします。

3.2 アプリケーションの準備
KubernetesのマニフェストとPythonのアプリケーションコードのサンプルを紹介します。DockerfileなどDaprの利用方法に直接的に関係ないものについてはGithubを参考にしてください。
svc-a(サービス呼び出し)のKubernetesマニフェスト
Daprを有効化するためのアノテーション、アプリケーションをユニークに識別するためのID、アプリケーションのリッスンポートを定義しています。
apiVersion: apps/v1
kind: Deployment
metadata:
name: svc-a
labels:
app: svc-a
spec:
replicas: 1
selector:
matchLabels:
app: svc-a
template:
metadata:
labels:
app: svc-a
annotations:
dapr.io/enabled: "true" #daprのサイドカーを有効化
dapr.io/app-id: "svc-a" #アプリケーションIDをsvc-aで定義
dapr.io/app-port: "5000" #アプリケーションのリッスンポート5000を定義
spec:
containers:
- name: svc-a
image: svc-a:1.0.0
ports:
- containerPort: 5000
svc-b(サービス提供)のKubernetesマニフェスト
Daprを有効化するためのアノテーション、アプリケーションをユニークに識別するためのID、アプリケーションのリッスンポートを定義しています。
apiVersion: apps/v1
kind: Deployment
metadata:
name: svc-b
labels:
app: svc-b
spec:
replicas: 1
selector:
matchLabels:
app: svc-b
template:
metadata:
labels:
app: svc-b
annotations:
dapr.io/enabled: "true" #daprのサイドカーを有効化
dapr.io/app-id: "svc-b" #アプリケーションIDをsvc-bで定義
dapr.io/app-port: "5000" #アプリケーションのリッスンポート5000を定義
spec:
containers:
- name: svc-b
image: svc-b:1.0.0
ports:
- containerPort: 5000
svc-a(サービス呼び出し)のアプリケーションコード(Python)
Daprのサイドカーが以下のエンドポイントでサービス間通信機能を提供しています。パス中の{app-id}は前述したマニフェストファイルで定義したアプリケーションIDを指定します。パス中の{api}は実行したいアプリケーションのAPIのパスを記載します。
エンドポイント
http://localhost:3500/v1.0/invoke/{app-id}/method/{api}
以下のサンプルではアプリケーションID svc-b の、パス api-b1 を実行し、その応答をそのまま返却しています。エンドポイントのホストがlocalhostである点に注目してください。同一Podに配置されるDaprのサイドカーへHTTPリクエストする実装です。svc-bがkubernetesクラスタ上のどこに配置されているかはDaprが把握して適切にルーティングします。svc-aのソースコードからルーティングにまつわる処理を排除できていることがわかります。
@app.route('/api-a1', methods=['GET'])
def apia1():
response = requests.get('http://localhost:3500/v1.0/invoke/svc-b/method/api-b1', timeout=5)
if not response.ok:
print("HTTP %d => %s" % (response.status_code, response.content.decode("utf-8")), flush=True)
data = response.text
return data
svc-b(サービス提供)のアプリケーションコード(Python)
サービス提供側は特別な実装は不要です。以下のサンプルはパス /api-b1 でリクストを受け付けて svc-b api-b1 called! を応答します。
@app.route('/api-b1', methods=['GET'])
def apib1():
return 'svc-b api-b1 called!'
4. StateManagement
多くのアプリケーションではセッション情報などの状態管理が必要になることがあります。DaprはKey/Value形式の状態を管理する機能を提供します。Daprはデータストアのサービスやミドルウェアを隠蔽し、アプリケーションに状態管理機能を提供します。利用できるデータストアは多いですが、ステータスがGAのものは少ないので注意が必要です。
4.1 構成図
Daprのサイドカーが状態管理機能を提供します。アプリケーションはサイドカーを呼び出すことで機能を利用することができます。サンプルのユースケースとして
svc-cがDaprを利用して状態の保存と、保存した状態の取得を行う構成とします。今回は状態の保存先にkubernetesにデプロイしたRedisを利用します。

4.2 アプリケーションの準備
KubernetesのマニフェストとPythonのアプリケーションコードのサンプルを紹介します。DockerfileなどDaprの利用方法に直接的に関係ないものについてはGithubを参考にしてください。
状態を保存するRedisのマニフェスト
RedisをデプロイするためのKubernetesマニフェストにはDaprを利用するための固有の設定は不要です。標準的なRedisのデプロイ定義のため説明は省きます。Githubを参照してください。
StateStoreのマニフェスト
Daprが状態を保存するデータストアへの接続情報を定義します。以下のサンプルではRedisへの接続先を定義し、cartという名称を付けています。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: cart # StateStoreの名称を指定します。
labels:
app: cart
spec: #状態の保存先であるRedisへの接続情報の定義
type: state.redis
version: v1
metadata:
- name: redisHost
value: redis:6379
- name: redisPassword
value: password
※サンプルのため接続情報のパスワードを直接記載していますが。機密情報を直接記述しない方法で定義することも可能です。
状態を保存するアプリケーションコード(Python)
Daprのサイドカーが以下のエンドポイントで状態保存機能を提供しています。パス中の{statestore}は前述したマニフェストファイルで定義したStateStoreの名称を指定します。jsonの key が保存する状態のキーになり、value が保存する状態の値になります。
状態保存のエンドポイント
http://localhost:3500/v1.0/state/{statestore}
以下のサンプルでは、cart というStateStoreにログインユーザーIDをキーに状態を保存しています。
@app.route('/save', methods=['POST'])
def save():
r_data = request.json
product_id = r_data.get('product_id')
quantity = r_data.get('quantity')
# 受信した状態をDaprに連携するためjson形式にする
data = [{'key': login_user_id, 'value': [{'product_id': product_id, 'quantity': quantity }]}]
# Daprサイドカーの状態保存用URLを呼び出す
response = requests.post('http://localhost:3500/v1.0/state/{statestore}'.format(statestore = 'cart'), json = data, timeout=5)
if not response.ok:
print('HTTP %d => %s' % (response.status_code, response.content.decode('utf-8')), flush=True)
return 'OK'
状態を取得するアプリケーションコード(Python)
Daprのサイドカーが以下のエンドポイントで状態取得機能を提供しています。パス中の{statestore}は前述したマニフェストファイルで定義したStateStoreの名称を指定します。パス中の{key}には取得したい状態のキーを指定します。
状態取得のエンドポイント
http://localhost:3500/v1.0/state/{statestore}/{key}
以下のサンプルでは、cartというStateStoreからログインユーザーIDをキーに状態を取得しています。
@app.route('/get', methods=['GET'])
def get():
# Daprサイドカーの状態取得用URLを呼び出す
response = requests.get('http://localhost:3500/v1.0/state/{statestore}/{key}'.format(statestore = 'cart', key = login_user_id), timeout=5)
if not response.ok:
print('HTTP %d => %s' % (response.status_code, response.content.decode('utf-8')), flush=True)
data = response.text
return jsonify(data)
5. Publish and Subscribe
次はパブリッシュ/サブスクライブパターンです。
マイクロサービス間の通信は障害の分離や処理の並行実行などを目的として非同期通信が用いられるケースがあります。非同期通信ではサービス間にメッセージブローカーやキューが入ることでメッセージを送信するパブリッシャーと受信するサブスクライバーに分離します。一般的に、同期通信と非同期通信ではアプリケーション実装が異なりますが、Daprを用いると同期通信とほぼ同じ実装で非同期通信も可能になります。また、パブリッシュ/サブスクライブを実現するメッセージブローカーもプラガブルになっており簡単に切り替えできる特徴があります。
本章ではDaprのPub/Subコンポーネントを用いた非同期通信の動作を検証します。またその際にローカル環境ではOSSのNATS、AWSではマネージドサービスのSNS,SQSと、メッセージブローカーの実装も切り替えてみます。
※Pub/Subコンポーネントが対応しているメッセージブローカーについてはこちらを参照して下さい。
5.1 構成図(プログラム的な記述版)
ここではサンプルのユースケースとして、注文リクエストを受けたときに非同期で在庫サービスと決済サービスにリクエストを投げる構成を考えてみます。(本来は単純な並行ではなく在庫を確保してから決済のようなフローの検討が必要ですが、サンプルのため簡素化しています)
構成を図に表すと下記のようになります。

なお、複数言語間でのサービス連携を確認するため、注文サービスはNode.js、在庫サービス、決済サービスはPythonで実装しています。
5.2 アプリケーションの準備
Kubernetesのマニフェストとアプリケーションコードのサンプルを紹介します。DockerfileなどDaprの利用方法に直接的に関係ないものについてはGithubを参考にしてください。
NATSインストール
ローカル環境上のMinikubeなどkubernetesクラスタにNATSを公式ドキュメントに従い導入します。
$ kubectl apply -f https://raw.githubusercontent.com/nats-io/k8s/master/nats-server/single-server-nats.yml $ kubectl apply -f https://raw.githubusercontent.com/nats-io/k8s/master/nats-streaming-server/single-server-stan.yml
typeにNATSを指定してpub/subコンポーネントをデプロイ
NATS用のpub/subコンポーネントの記載例は下記のようになります。今回公式のYAMLをそのままデプロイしているため、サービス名やクラスタIDが固定値になっていいます。他の名前で導入する場合はNATSのYAMLを一度ダウンロードして適宜変更の上適用してください。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: order-pubsub # コンポーネント名
namespace: default
spec:
type: pubsub.natsstreaming
version: v1
metadata:
- name: natsURL
value: "nats://nats:4222" # 公式が用意しているYAMLでNATSを導入した場合、サービス名が"nats"となっている
- name: natsStreamingClusterID
value: "stan" # 公式が用意しているYAMLでNATSを導入した場合、ClusterIDが"stan"となっている
# below are subscription configuration.
- name: subscriptionType
value: topic
Orderアプリケーションコード(Node.js)
パブリッシュ側はDaprサイドカーのpub/subエンドポイントにトピックを指定してPOSTします。
pub/subエンドポイント
http://localhost:${daprPort}/v1.0/publish/${pubsubName}/${topic}
コードサンプル
const server = http.createServer((request, response) => {
response.writeHead(200, {
"Content-Type": "text/html"
});
const publishUrl = `http://localhost:3500/v1.0/publish/order-pubsub/order`;
axios
.post(publishUrl, {
id: "0001",
name: "sample"
})
.then((response) => console.log(response.data))
.catch(console.log);
response.end("Hello Hiroba");
});
Stock、Paymentアプリケーションコード(Python)
サブスクライブ側の実装は主に2つあります。
- DaprにTopicと呼び出しメソッドの関連を提示するためのメソッド
- トピックにメッセージが流れてきたときに呼び出されるメソッド
1. DaprにTopicと実装メソッドの関連を提示するためのメソッド
Daprはアプリケーション起動時に自動的にこのメソッドを呼び出すことで、どのPub/Subコンポーネントのどのトピックをサブスクライブしておけばいいか、またメッセージが送られてきた際にどのメソッドを呼び出せばよいかを解決します。この処理は固定で/dapr/subscribeをGET呼び出しすることで実現しています。
@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
subscriptions = [{'pubsubname': 'order-pubsub',
'topic': 'order',
'route': 'order'}]
return jsonify(subscriptions)
2.Orderからメッセージが送られてきた際、実際に呼び出されるメソッド
Broker(ここではNATS)にサブスクライブするのはDaprサイドカーの責務であり、アプリケーションはDaprサイドカーからPOSTで呼び出されます。そのため、Stock, Payment側で必要な実装は通常のPOST通信と同様となります。(非同期のためレスポンスでデータを返さないなど、通常のPOST通信と実装面で異なるところはあります)
@app.route('/order', methods=['POST'])
def subscriber():
print(request.json, flush=True)
return json.dumps({'success':True, 'message': 'accept order'}), 200, {'ContentType':'application/json'}
ここまでで準備は整いましたので、実際にアプリケーションをローカルのMinikubeなどにデプロイして動作確認すると、Orderへリクエストを投げるとStock, Paymentが呼び出されることが確認できます。
5.3 宣言的な記述への変更
先ほどの例ではSubscribeの定義をプログラム内に記述していました。
この記述方式だとプログラマが自由にSubscribeを定義できる反面、実装コードにPub/Sub独自の記述が入り込むデメリットがあります。宣言的な記述を用いることで実装コードからアーキテクチャ固有の記載は排除してロジックだけにしてみます。
5.4 構成図(宣言的な記述版)
宣言的な記述にする場合はSubscriptionリソースを追加で作成します。その代わりアプリケーション側は/dapr/subscribeで待ち受けしていた処理が不要となります。
構成を図に表すと下記のようになります。

5.5 アプリケーションの変更
Subscriptionリソースの作成
実装コードからSubscriptionの定義を排除する代わりにSubscriptionリソースを作成します。
apiVersion: dapr.io/v1alpha1 kind: Subscription metadata: name: order-subscription spec: pubsubname: order-pubsub topic: order route: /order scopes: # サブスクライブするアプリケーションのdapr.io/app-idを記載 - stock - payment
アプリケーション(Stock、Paymentアプリケーションコード)の修正
アプリケーション側はsubscribeメソッドを削除するだけです。
#@app.route('/dapr/subscribe', methods=['GET'])
#def subscribe():
# subscriptions = [{'pubsubname': 'pubsub',
# 'topic': 'order',
# 'route': 'order'}]
# return jsonify(subscriptions)
このようにしても、先ほどと同様の動作となります。こちらはアプリケーションコードがシンプルになることと、サブスクライブの定義をまとめて管理できるなどがメリットになるかと思います。
5.6 AWS(EKS)にアプリケーションをデプロイ
次にEKSへデプロイしてみます。ここでは構成をNATSからSNS,SQSへ変更します。
5.7 構成図(AWS版)

5.8 EKSにpub/subリソースをデプロイ
ローカルで検証していた時と同じようにComponentおよびSubscriptionをデプロイします。
※EKS環境の構築については第1回の記事をご覧ください
pub/subコンポーネントリソース
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: order-pubsub # NATSの時と同じpub/sub名を指定
namespace: default
spec:
type: pubsub.snssqs # snssqsを指定
version: v1
metadata:
- name: region
value: "ap-northeast-1"
Subscriptionリソース
ローカルの時と変更点はありません。
apiVersion: dapr.io/v1alpha1 kind: Subscription metadata: name: order-subscription spec: pubsubname: order-pubsub topic: order route: /order scopes: # サブスクライブするアプリケーションのdapr.io/app-idを記載 - stock - payment
アプリケーションも特に変更は不要で、そのままEKSにデプロイするとSQSやSNSが作成されpub/subが動作します。Daprを活用することでアプリケーションのポータビリティが高いことが確認できますね。
5.9 その他の機能について
他にもPub/Subコンポーネントの機能として、パブリッシュ・サブスクライブ可能なアプリケーションの制限や、ルーティング機能(プレビュー版)、Dapr非対応アプリケーションとの連携などがありますので、気になる方はぜひ試してみてください。
(参考)Kubernetes・マイクロサービス技術のノウハウを集約したnautible™
最後にひとつご紹介です。
オージス総研では今回ご紹介したDaprや、これまで本連載で紹介したTerraformやArgoCDを使った環境構築をはじめ、SkaffoldやTelepresenceを活用した開発方法などのコードやドキュメントをプロダクト「nautible」としてGithubで一般公開しています。Kubernetesを活用してマイクロサービスを開発するための環境準備からアプリケーションの実装手順、運用環境までの一通りをテンプレート化して用意することで、開発・導入スピードの向上やナレッジの蓄積・共有を目指しています。本連載では記載していないことなども含まれていますので、こちらもぜひご覧ください。
6. おわりに
今回はマイクロサービス間の同期通信や非同期通信、状態の管理などを「Dapr」を活用して実装する例をご紹介しました。DaprのインターフェースはすべてHTTP通信もしくはgRPC通信で統一されているため、アプリケーションにアーキテクチャ固有の処理やミドルウェア固有の処理が入りにくく、実装の変更や環境の変更に強いアプリケーションを構築することが可能です。マイクロサービスの場合サービス間通信が増えますが、Daprを用いることでアプリケーションはロジックに集中した実装が可能となりますので、お試しいただければと思います。
次回はモニタリングやロギング等運用周りの機能について見ていきたいと思います。
