Cold to Hot変換が必要になる場面と落とし穴
Cold to Hot変換とは何か。Cold to Hot変換は「ストリームを複数箇所で共有する」際に必要であり、Publish+Connect(またはRefCount)の組み合わせで実現できます。変換が必要な場面を見極め、適切に適用することでRxの意図しない挙動を防げるのです。
この記事で分かること
- Hot ObservableとCold Observableの違いと動作原理
- Publish、Connect、RefCount、shareオペレータの使い分け
- Hot変換が必要な場面を判断するためのチェックリスト
- よくあるバグパターンとその回避方法
Reactive Extensions(Rx)を使い始めたエンジニアが最初につまずきやすいのが、Hot/Coldの概念です。「同じObservableを2回subscribeしたら、APIが2回呼ばれてしまった」「なぜか処理が重複実行される」といった問題に直面した経験はないでしょうか。
Cold Observableとは、購読(subscribe)されて初めてデータを流し始める受動的なObservableです。各購読者に対して独立したストリームが生成されるという特性を持っています。一方、Hot Observableは、購読の有無に関わらず自発的にデータを発行し続けるObservableで、全購読者が同じデータストリームを共有します。
この違いを理解せずにコードを書くと、意図しない重複実行やメモリリークの原因になります。本記事では、Hot/Coldの概念を正しく理解し、適切な場面でHot変換を適用できるようになるための判断基準を解説します。
Hot ObservableとCold Observableの違いを正しく理解する
Hot ObservableとCold Observableの違いは、「データの流れ始めるタイミング」と「ストリームの共有方法」にあります。この違いを理解することが、Hot変換を適切に使いこなす第一歩です。
よく使われるアナロジーとして、Cold Observableは「録画動画」、Hot Observableは「ライブ配信」に例えられます。録画動画は再生ボタンを押した人それぞれが最初から視聴できます。一方、ライブ配信は配信開始時刻から始まっており、途中参加した人は参加した時点以降の内容しか見られません。
この概念はRxJS、UniRx、RxSwift、RxJavaなど、Rxファミリー全体で共通しています。各ライブラリでAPIの詳細は異なりますが、基本的な考え方は同じです。
Cold Observableの特性と動作
Cold Observableの最大の特徴は、購読するたびに処理が実行されることです。subscribeが呼ばれた瞬間にデータの生成が始まり、その購読者専用のストリームが作られます。
Cold Observableの代表例として、以下のようなものがあります。
- HTTPリクエスト(API呼び出し)
- タイマー(interval、timerオペレータ)
- ファイル読み込み
- データベースクエリ
これらは「購読されるまで何も起きない」という特性を持っています。2人が同じCold Observableをsubscribeすれば、2回分の処理が独立して実行されます。
Hot Observableの特性と動作
Hot Observableは、購読の有無に関わらずデータを発行し続けます。購読者は「途中から参加」することになり、参加以降のデータだけを受け取ります。
Hot Observableの代表例には、以下のようなものがあります。
- マウスイベント(クリック、移動)
- キーボード入力
- WebSocketからのメッセージ
- UIイベント全般
これらは「イベントソース自体が独立して動いている」という特性を持っています。マウスのクリックは、誰かがsubscribeしているかどうかに関係なく発生します。
Cold to Hot変換オペレータの仕組みと使い方
ColdなObservableをHotに変換するには、Publishオペレータとその関連オペレータを使用します。Publishオペレータは内部的にmulticast + Subjectで実装されており、ColdなObservableをsubscribe時にSubjectで包むことでHot化する仕組みになっています。
Publishオペレータは、ColdなObservableをConnectableObservableに変換し、connect()呼び出し時にHot化して値を流し始めるオペレータです。ConnectableObservableは、Publishで生成されるObservableの型で、connect()を呼ぶまでデータを流さず、呼んだタイミングでHot化します。
PublishとConnectの基本パターン
Publish + Connectの基本的な流れは以下の通りです。
- ColdなObservableに対してpublish()を呼び、ConnectableObservableを取得
- 必要な数のsubscribeを設定
- connect()を呼んでデータの流れを開始
- 全購読者が同じストリームを共有
よくある失敗パターンとして、「とりあえずPublishしておけばよい」と考えてconnect()を呼び忘れるケースがあります。Publishだけでは不十分で、connect()を呼ばないとデータは流れ始めません。この点を理解していないと、「subscribeしたのに何も起きない」という状況に陥ります。
擬似コードで示すと以下のようになります。
// ColdなObservable(例:API呼び出し)
coldObservable = createApiRequest()
// Publishで変換
connectable = coldObservable.publish()
// 複数箇所でsubscribe
connectable.subscribe(handleResultA)
connectable.subscribe(handleResultB)
// connect()で開始(これを忘れるとデータが流れない)
connectable.connect()
RefCountとshareオペレータ
手動でconnect()を管理するのは煩雑です。そこで便利なのがRefCountの仕組みです。RefCountは購読者数を参照カウントし、最初の購読で自動的にconnect、購読者が0になったら自動で切断します。
shareオペレータはpublish().refCount()相当として機能し、購読者数が0になったら自動で切断する特性を持っています。多くの場面では、shareオペレータを使うことで手動Connect管理が不要になり、コードがシンプルになります。
// shareを使った簡潔な書き方
sharedObservable = coldObservable.share()
// 最初のsubscribeで自動的にconnect
sharedObservable.subscribe(handleResultA)
// 2番目の購読者も同じストリームを共有
sharedObservable.subscribe(handleResultB)
// 全購読者がdisposeされると自動切断
Hot変換オペレータの比較と使い分け
各オペレータには特性の違いがあり、使用場面に応じて適切なものを選ぶ必要があります。以下の比較表で違いを整理します。
【比較表】Hot変換オペレータ比較表
| オペレータ | 接続タイミング | 切断タイミング | 主な用途 | 注意点 |
|---|---|---|---|---|
| publish() + connect() | 手動(connect呼び出し時) | 手動(dispose呼び出し時) | 接続タイミングを厳密に制御したい場合 | connect忘れに注意 |
| publish() + refCount() | 自動(最初のsubscribe時) | 自動(購読者が0になった時) | 購読者がいる間だけ接続したい場合 | 再購読時は再接続される |
| share() | 自動(最初のsubscribe時) | 自動(購読者が0になった時) | 最も一般的なHot変換 | publish().refCount()と同等 |
| shareReplay(n) | 自動(最初のsubscribe時) | 自動(購読者が0になった時) | 直近n件の値を新規購読者にも配信したい場合 | メモリ使用量に注意 |
| publishLast() | 手動(connect呼び出し時) | 完了時 | 最後の1件だけを全購読者に配信したい場合 | 完了しないストリームでは使えない |
使い分けの判断基準
- 単純にストリームを共有したいだけなら、shareが最も簡潔
- 購読タイミングに関わらず直近の値を受け取りたいなら、shareReplay
- 接続タイミングを完全に制御したいなら、publish + connect
- 購読者がいる間だけリソースを使いたいなら、publish + refCount(またはshare)
Hot変換が必要な場面とよくあるバグパターン
「とりあえずPublishしておけばよい」という考え方も、「Hot変換を全く使わない」という考え方も、どちらも問題を引き起こします。重要なのは、Hot変換が必要な場面を正しく見極めることです。
以下のチェックリストを使って、Hot変換が必要かどうかを判断してください。
【チェックリスト】Hot変換が必要な場面チェックリスト
- 同じObservableを複数箇所でsubscribeしている
- APIリクエストの結果を複数のコンポーネントで使用している
- subscribeのたびにリソース(ネットワーク、ファイルI/O等)が消費される
- 購読者間でデータの一貫性を保ちたい(全員が同じ値を受け取るべき)
- タイマーやインターバルを複数箇所で共有している
- 処理の重複実行がパフォーマンス問題を引き起こす可能性がある
- 副作用のある処理(ログ出力、カウントアップ等)が含まれている
- WebSocketやイベントソースを複数のリスナーで共有したい
- キャッシュ的な挙動(一度取得したら共有)を実現したい
- デバッグ時に「同じ処理が複数回実行されている」ことに気づいた
上記のいずれかに該当する場合は、Hot変換を検討すべきです。
Hot変換を忘れた場合のバグ例
Hot変換を忘れた場合の典型的なバグパターンを紹介します。
パターン1: APIリクエストの重複実行
同じAPIレスポンスを画面の複数箇所で使用するケースを考えます。Cold ObservableのままでUIコンポーネントAとBがそれぞれsubscribeすると、同じAPIが2回呼ばれてしまいます。これはサーバー負荷の増大や、レスポンス内容の不整合(タイミング差による)を引き起こす可能性があります。
パターン2: 意図しないリソース生成
タイマーやインターバルをHot変換せずに複数subscribeすると、購読者の数だけタイマーが生成されます。これはメモリリークやパフォーマンス劣化の原因になります。
過剰なHot変換のデメリット
一方で、全てのObservableをHot変換すればよいわけではありません。不要なHot変換には以下のデメリットがあります。
コードの複雑化: Hot変換を追加すると、ストリームのライフサイクル管理が必要になります。単一の購読者しかいない場合、この複雑さは無駄です。
予期せぬ挙動のリスク: RefCountを使ったHot変換では、購読者が0になると切断され、再購読時に再接続されます。この挙動を理解せずに使うと、「データが最初から流れてこない」といった問題を引き起こすことがあります。
デバッグの困難さ: Hot変換されたストリームでは、データの流れが複数の購読者間で共有されるため、問題発生時のデバッグが複雑になる場合があります。
まとめ|ストリーム共有の判断基準を身につける
本記事では、Reactive ExtensionsにおけるCold to Hot変換について、概念の理解から実践的な使い方まで解説しました。
重要なポイントを整理します。
- Cold Observableは購読されて初めてデータを流し始め、各購読者に独立したストリームが生成される
- Hot Observableは購読の有無に関わらずデータを発行し続け、全購読者が同じストリームを共有する
- Publishオペレータは内部的にmulticast + Subjectで実装されており、connect()呼び出しでHot化する
- shareオペレータはpublish().refCount()相当として機能し、最も使いやすいHot変換方法
- Hot変換が必要かどうかは、チェックリストを使って判断する
Hot変換で最も避けるべき失敗パターンは2つあります。1つは「とりあえずPublishしておけばよい」と考えて全てのストリームをHot変換すること。もう1つはHot変換を忘れて同じ処理が複数回実行されるバグを起こすことです。どちらも問題を引き起こします。
Cold to Hot変換は「ストリームを複数箇所で共有する」際に必要であり、Publish+Connect(またはRefCount)の組み合わせで実現できます。変換が必要な場面を見極め、適切に適用することでRxの意図しない挙動を防げるのです。本記事のチェックリストと比較表を活用し、判断基準を身につけてください。
