MQTTとMessagePub+ :通知系トピック


トピック設計の主体

通知系トピックは、通知元を主体として設計します。例えばセンサーの値を、データ表示サーバーに表示するようなシステム構成の場合、センサー側の通知情報に基づいてトピックを設計します。センサーの値は、データ表示サーバー以外のサービスからも参照される可能性があり、ひとつのサービス主導でトピック設計を行うと、他のサービスで使いにくくなってしまうことがあるためです。

分割単位

通知系トピックは、意味のある情報のまとまりを単位として分割します。これを見つけるためには、同時に変化する情報に着目するのがひとつのポイントです。例えば人感センサーが各部屋に設置されていた場合を考えてみます。

トピック名を「人感センサー通知」として、データに部屋毎の感知情報を入れるとどうなるでしょう?データの内容を仮にJSON形式で表現するなら、以下のようになります。

6659_11.png

これはあまり良い分割とはいえません。確かに全て人感センサー情報ではありますが、それぞれの部屋で状態は独立して変化します。

この点を考慮すると、以下のように分割すべきです。

6659_21.png

トピックを分割すると、どんなメリットがあるのでしょう。まず、変化した情報だけパブリッシュすれば良いので通信量が少なくなります。トピックが「人感センサー通知」ひとつにまとまっていたとしても、変化した情報だけパブリッシュすることは可能です。しかし、その場合retain機能との相性が悪いという問題があります。retain機能をどう活用するのかということも含めて掘り下げていきます。

retain機能の活用

retain機能とは、デバイスなどの最新状態を、後からサブスクライブしてきたクライアントに知らせる際に便利な機能です。例えば、デバイスなどのMQTTクライアントの現在の状態を、画面などにデータを表示するサービスを考えてみます。もし、データを表示するMQTTクライアントが、デバイスよりも後からMQTTブローカに接続し、サブスクライブを行った場合、現在のデバイスの状態が分からないため、表示することができません。デバイス側で状態が変化して、パブリッシュが行われて初めて状態が分かります。

6659_31.png

retain機能を用いれば、この問題を簡単に解決できます。

retain機能とは、トピックにパブリッシュされたメッセージを、ブローカが記憶し、サブスクライブした直後にその内容を伝える機能です。CONNACKパケット、PUBLISHパケット、SUBSCRIBEパケット、それぞれretain機能に関連する設定がありますので、それも含めて見ていきます。

MQTT v5では、ブローカがretain機能をサポートしているかどうかをクライアントに伝えることができるようになりました。クライアントがCONNECTパケットをブローカに送って接続要求したら、ブローカはCONNACKパケットで接続結果を返信します。そのCONNACKパケットのなかに、Retain Availableというプロパティを設定して返します。この値が1ならばretain機能がサポートされ、0ならサポートされません。プロパティが存在しなかった場合は、retain機能はサポートされます。

パブリッシュしたメッセージをブローカに記憶させるためには、PUBLISHパケットに、RETAINフラグを1に設定します。記録されるのはトピック毎にひとつメッセージだけです。記録されるのは、PUBLISHパケットのQoSとPayload(メッセージの中身)、そしてプロパティです。記録したメッセージを消すには、Payloadの長さ0のメッセージをRETAINフラグを1にしてパブリッシュします。

6659_4.png

なお、RETAINフラグを1に設定したPUBLISHパケットは、通常通りサブスクライブしているクライアントに配信されます。メッセージを配信せずに、ブローカに記憶されるメッセージだけを更新することはできません。

retainされたメッセージのあるトピックをサブスクライブすると、SUBSCRIBEパケットの応答としてのSUBACKパケット受信に続いて、PUBLISHパケットとしてretainされたメッセージが配信されます。QoSはretainメッセージのQoSと、SUBSCRIBEパケットに指定したQoSの低い方になります。これは、通常のPUBLISHパケットの配信と同じ振る舞いです。

SUBSCRIBEパケットには、retainに関するオプションが2つあります。ひとつめのオプションはRetain Handlingです。Retain Handlingは0,1,2の3種類の値を指定することができます。Retain Handlingの値と、その他の条件によって、retainメッセージが配信されるかどうかが決まります。その判定ロジックを以下に示します。

6659_5.png

もうひとつのオプションはRetain As Published(RAP)です。これはクライアントが、RETAINフラグを1に設定してパブリッシュしたメッセージを、そのトピックをサブスクライブしているクライアントにブローカが配信する時に、RETAINフラグを1のまま配信するか、0にして配信するかを決めるオプションです。RAPが0なら、RETAINフラグは0で配信され、RAPが1ならRETAINフラグが1で配信されます。PUBLISHパケットにおけるRETAINフラグの設定に関して整理してみます。

まず、ブローカに記憶されているretainメッセージが、クライアントのサブスクライブによって配信される場合、サブスクライブのRAPの値に関係なく、メッセージ(PUBLISHパケット)のRETAINフラグは常に1になります。ブローカからクライアントへのPUBLISHパケットにおけるRETAINフラグは、ブローカにretainされた(記憶された)メッセージであることを示している、と考えれば理解しやすいでしょう。

6659_6.png

RAPが影響するのは、クライアントがRETAINフラグ1でパブリッシュしたメッセージを、ブローカに記憶するとともに、そのトピックを既にサブスクライブしているクライアントに配信する時です。サブスクライブ時にRAPを0に指定していた場合、配信されるメッセージのRETAINフラグは0となります。これはRAP指定のできないMQTT v3.1.1と同じ振る舞いです。RAPを1に指定していた場合、配信されるメッセージのRETAINフラグは1となります。クライアントがパブリッシュしたRETAINフラグをそのまま配信する、すなわちRetain As Publishedというわけです。

6659_7.png

なお、クライアントがRETAINフラグ0でパブリッシュしたメッセージの配信は、当然ながらRETAINフラグ0で行われます。


ブローカが記憶したretainメッセージの有無を判別するには(MessagePub+ の独自機能)

クライアントがトピックをサブスクライブした時、ブローカがretainメッセージを記憶しており、Retain Handlingの設定がretainメッセージの配信を要求する形になっていれば、retainメッセージが配信されます。しかし、ブローカがretainメッセージを記憶していない場合は、何も配信されません。例えば、データ表示クライアントがブローカに接続直後に、デバイスの現在状態を表示したい場合、ブローカにSUBSCRIBEパケット送信後、SUBACKパケット受信を待ち、さらにretainメッセージのPUBLISHパケットの受信を待つことになります。もしブローカがretainメッセージを記憶していれば速やかにPUBLISHパケットが送られてくるはずです。しかし、記憶していない場合は送られてきません。データ表示クライアントはいつまで待ち続ければ良いのでしょう。現在のMQTT v5には、ブローカがretainメッセージを記憶していないことを確認する手段がありません。よって、一定時間のタイマを設定して、その時間以内にPUBLISHパケットが送られてこなければ、ブローカがretainメッセージを記憶していないと判断する、といった設計が必要となります。その場合でも、ネットワークの遅延などで、後からretainメッセージが送られてくる可能性も考慮しなければなりません。

6659_8.png

現状はタイマ設定が、完全ではないものの現実的な妥協案といえます。MessagePub+ では、この問題を解決するための独自の拡張を、オプションで選択可能としています。それは、クライアントがサブスクライブを行った際、ブローカがSUBACKメッセージを送信後、retainメッセージを記憶していない場合でも、RETAINフラグ1でPayloadが空のPUBLISHメッセージを送信するという拡張です。クライアントは、RETAINフラグが1のPUBLISHパケットを必ず受け取る前提で設計が可能となり、タイムアウトが不要となります。受信時、Payloadを確認し、空ならばretainメッセージが記憶されていないことを知ることができます。MessagePub+ のbrokerの起動オプション--empty-retain-publishをtrueに設定することでこの機能が有効になります。デフォルトではfalseに設定されています。さらに、独自拡張だけではなく、MQTT規格を策定しているOASISにもコメント(外部サイト)を出しています。前述のように、現在、SUBSCRIBEパケットのRetain Handlingは、0,1,2の3種類の値を持つことができます。Retain Handlingは2bitの情報であり、値3は使われていません。そこで、その値3を指定した場合、上記拡張のように振る舞うという提案です。MQTTの将来バージョンで採用されることを願っています。


さて、retain機能を使うことで、デバイスの初期状態をサブスクライブ直後に知ることができることが分かりました。また、retain機能ではトピック単位でひとつのメッセージを保持し、それを上書き更新していくことが分かりました。これらを踏まえると、retain機能を有効に活用するためには、ひとつのトピックに情報をまとめるのではなく、分割する方が望ましいといえるでしょう。retain機能を考慮することで適切な分割単位が見えてきます。

では、分割は何処まで行うべきでしょう。例えば、位置情報をx, y座標で通知することを考えてみます。この場合、トピックは「位置情報」とし、中身に構造化された形式でx, yの値を保持すべきでしょうか。それとも、「位置情報x」「位置情報y」という2つのトピックを用意すべきでしょうか。唯一の正解はありませんが、xとyが通常同時に変化するようなケースに置いては「位置情報」というひとつのトピックにまとめるべきでしょう。 これが冒頭に述べた「意味のある情報のまとまりを単位とする」ということです。

メッセージの内容

トピックの分割について理解したところで、次に、トピックにパブリッシュされるメッセージの内容について考えてみましょう。先ほどの「位置情報」の例では、メッセージの内容を構造化するアプローチを紹介しました。構造化には、様々な方法があります。よく知られている構造化表現としてJSONがあります。JSONで位置情報を表現する例を示します。

{ "x": 10, "y" 20 }

JSONは柔軟な階層表現が可能で、人間が見て理解しやすいという長所があります。その一方で、テキストベースなのでサイズが大きくなりがち、バイナリデータを扱いにくい(エスケープ表現が必要)、必要なデータをプログラム上で取り出すためには構文解析が必要で、ライブラリは整備されているものの計算量はある程度かかる、といった短所があります。

MQTTは、パケットのヘッダが2バイトで、バイナリベースとなっており、例えばHTTPと比較して省サイズを指向しているプロトコルといえます。省サイズを目指すならばメッセージの内容もコンパクトに表現したいものです。JSONと同じ表現力がありながらもバイナリデータを容易に扱え、比較的省サイズなデータ表現方法として、MessagePack(外部サイト)があります。先ほどのJSON表現では、空白を削除すると15バイトとなります。一方、MessagePackフォーマット(外部サイト)で同じ情報を表現すると7バイトで表現できます(外部サイト)。また、多くのライブラリ実装でJSONへの変換もサポートしています。よって、デバッグ時などに人間に分かりやすいJSON形式に変換してログを出力するといったことも可能です。MessagePub+ のSDKでは、PUBLISHパケット送信時に、任意の型をMessagePackに変換して送信する機能をサポートしています。そして、PUBLISHパケット受信時には、MessagePackから元の型に戻すことができます。

パブリッシュのタイミング

では、メッセージをパブリッシュするのはいつでしょう。まずは、デバイスなどが起動した直後に一度パブリッシュが必要でしょう。後は、基本的に、状態に変化があった時にだけパブリッシュすれば良いでしょう。retain機能を使うことで、定期的にパブリッシュしなくても、他のクライアントはデバイスの最新状態を知ることができます。筆者は基本的にこのアプローチを推奨します。ただし、センサーなど通知元の処理をシンプルにしておきたい場合は、定期パブリッシュも現実的な選択といえるでしょう。変化があった時にパブリッシュというアプローチは、イベントドリブンと呼ばれ、多くの現代的アプリケーションサービスとの相性も良いです。さらに、通信量の削減や省電力に寄与するなどメリットも多いです。しかし、センサーのデータが頻繁に変化する場合は、逆にパブリッシュ回数が多くなってしまいます。このような場合、変化を判定する有効数字を調整するなどの、前処理が有効です。

retain機能による最新状態の保持に関しては、注意点もあります。例えばデバイスがダウンしてしまい、状態変化をパブリッシュできない状況になっても、トピックには直近のretainメッセージが記憶されたままです。他のクライアントは、切断前のデバイスの状態を現在の状態と判断してしまうでしょう。ユースケースによってはこれが問題となる場合があります。これを解決するにはwill機能を使います。

will機能

will機能のwillという英単語は「遺言」という意味です。ブローカに接続中のクライアントが切断した時、あらかじめ登録しておいたメッセージを、トピックにパブリッシュするという機能です。

willメッセージは、接続時CONNECTパケットに以下の設定を行うことで登録されます。

  1. Will Flagを1にする。
  2. Will QoSを指定する。PUBLISHパケットと同様0,1,2のいずれかを指定。
  3. Will Retainを指定する。willメッセージ配信時retainメッセージとしても記憶する場合は1、しない場合は0を指定。
  4. Will Topicに、宛先のトピックを指定。
  5. Will Propertiesに、必要に応じてプロパティを指定。
  6. Will Payloadにメッセージの内容を指定。

willメッセージは、willメッセージを登録したクライアントがブローカから切断した場合に配信されます。ただし、ブローカがクライアントから、切断前にDISCONNECTパケットをReason Code 0x00 (Success)で受信した場合、配信されません。なお、Reason Codeを省略した場合、0x00と等価と判断されます。これは、will機能が、クライアントの意図せぬ切断を知らせる目的で導入されており、正常切断時には配信すべきではないためです。

このwill機能を用い、あらかじめ接続時にWill Retainを1、Will Payloadを空に指定したメッセージを登録しておけば、意図せぬ切断が発生した場合に、その旨を通知するとともに、retainメッセージを削除することができます。ただし、MQTT規格では、Will Topicはひとつしか設定できません。そのため、切断時に削除したいretainメッセージを記憶しているトピックが複数存在する場合、うまくいきません。解決のためには次のようなアプローチをとることができます。

  1. 通知系トピック毎に接続を分ける。ただし、接続を分けると接続を跨いだメッセージ順序の保証ができなくなる点に注意。
  2. デバイスそのものの状態を表す通知系トピック「デバイス状態」を導入し、接続中/切断中という内容をretain機能で記憶させる。will機能で、retainメッセージを切断中に更新する。そのデバイスの他の通知系トピックの情報は、「デバイス状態」トピックが接続中の時だけ有効とみなすようにアプリケーションを設計する。
  3. メッセージにタイムスタンプを含め、受信側で古いメッセージは破棄するなどの処理を行う。ただし、本当に状態変化がないのか、デバイスが切断されて変化が検知できていないのか区別できない点に注意。

willで複数のトピックに通知するMessagePub+ の拡張機能

MQTT規格では、Will Topicがひとつしか指定できないため、切断時に直接的に複数のトピックに通知する方法がありません。また、willの内容はCONNECTパケット送信時に決めておく必要があります。MessagePub+ では、トピックにラベルというタグを付けることができるラベル機能と呼ばれる拡張機能があります。トピックトラベルは多対多の関係が設定できるので、例えば「温度1」「湿度1」というトピックに「センサー1」というラベルを付けることができます。ラベルはトピックと同じようにパブリッシュやwillの宛先として使用できるため「センサー1」を指定すれば、「温度1」「湿度1」の両方のトピックに通知するとともに、retainメッセージ更新することができます。全てはブローカのなかで完結しているので高速です。さらに、SDKにはupdate_willと呼ばれるAPIがあり、接続後任意のタイミングでwillの内容を追加、更新、削除することができます。willはクライアントに不意の切断が発生した時に、brokerが代わりにパブリッシュするメッセージですが、willの内容を最新に更新しておくことで、なんらかのアプリケーション処理前に切断が発生したのか、処理後に切断が発生したのか、を伝えることができます。


メッセージの有効期限

トピックにパブリッシュされたメッセージに有効期限を持たせたい場合があります。このような場合、メッセージにタイムスタンプを含め、受信側でそれを現在時刻と比較することで有効性を判断することもできますが、MQTT v5のMessage Expiry Intervalというプロパティを用いることで、よりシンプルに解決できることがあります。Message Expiry Intervalはメッセージの有効期限を、現在時刻を起点とする秒単位で指定することが可能です。その時間を超えると、ブローカはメッセージを破棄します。といっても、ブローカは通常、すぐにサブスクライブしているクライアントにメッセージを配信します。Message Expiry Intervalが機能するのは、retainメッセージとして記憶されたメッセージや、切断後もセッションが維持されているクライアントに配信するためにブローカが記憶しているメッセージとなります。なお、Will PropertyとしてMessage Expiry Intervalが指定された場合、接続時ではなく、willの配信が行われた時刻を起点として有効期限が判定されます。

単にメッセージの有効期限を指定した運用を行いたい場合は、タイムスタンプよりも、Message Expiry Intervalを使う方が、アプリケーションはシンプルになるでしょう。データ分析や可視化のため、そのデータが発生した時刻を知りたい場合は、タイムスタンプを用いるべきでしょう。


MQTTとMessagePub+

MQTTとMessagePub+ 目次に戻る

※この記事に掲載されている内容、および製品仕様、所属情報(会社名・部署名)は公開当時のものです。予告なく変更される場合がありますので、あらかじめご了承ください。

関連サービス