alworker:番号付きメッセージキュー
no way to compare when less than two revisions
差分
このページの2つのバージョン間の差分を表示します。
| — | alworker:番号付きメッセージキュー [2013/08/01 01:05] (現在) – 作成 - 外部編集 127.0.0.1 | ||
|---|---|---|---|
| 行 1: | 行 1: | ||
| + | ====== AlWorker 番号付きメッセージキュー ====== | ||
| + | |||
| + | require " | ||
| + | ---- | ||
| + | |||
| + | 接続が断続的になる場合(httpなど)を想定して、 クライアントがメッセージの取りこぼしがないように、番号付きのキューを実現します。\\ | ||
| + | 番号はトランザクションIDと称し、単調増加させます。\\ | ||
| + | クライアントはトランザクションIDを指定して、どこまでメッセージを得たかを管理します。\\ | ||
| + | メッセージオブジェクトはHashのみで、キー :TID がフレームワークによって追加されます。\\ | ||
| + | |||
| + | ====== 使い方 ====== | ||
| + | |||
| + | 使用頻度が高いと思われる、キューとブロードキャストの組み合わせを先に説明します。 | ||
| + | |||
| + | 送り側 | ||
| + | <code ruby> | ||
| + | @nmsg = NumberedMessage.new() | ||
| + | @nmsg.send( {: | ||
| + | </ | ||
| + | |||
| + | 受け側 | ||
| + | <code ruby> | ||
| + | def tcp_a_recv( sock, param ) | ||
| + | msg = @nmsg.receive( @tid ) # @tid(トランザクションID)は、別の箇所で確定させておく。 | ||
| + | sock.puts msg.to_s | ||
| + | return true | ||
| + | end | ||
| + | </ | ||
| + | |||
| + | ===== 解説 ===== | ||
| + | |||
| + | @nmsg.receive()がコールされると引数(トランザクションID)の値によって以下の動作をします。 | ||
| + | * 既に発生したトランザクションでキューに保存されている場合は、そこから最新までの全ての値を配列で返します。 | ||
| + | * 既に発生したトランザクションで既にキューから削除されている場合は、nilが返ります。 | ||
| + | * まだ指定したIDのトランザクションが発生していない場合は、動作を一旦停止し、メッセージが送られるのを待ちます。 | ||
| + | |||
| + | @nmsg.send()をコールしてメッセージを送ると、@nmsg.receive()の返り値としてメッセージを受け取り、動作を再開します。 | ||
| + | |||
| + | 受け側は、メッセージを受信するごとに何らかの動作を行い、また受信待ちになるというサイクルを繰り返すパターンが多いので、専用のメソッド、cycle() も用意しています。使用例は、サンプルを参照してください。 | ||
| + | |||
| + | ====== 注意点・動作上の制限 ====== | ||
| + | |||
| + | * 内部で BroadcastMessageを使っていますので、使い方・動作上の制限がそのままこちらへも適用されます。BroadcastMessageの説明を併せてご覧ください。 | ||
| + | |||
| + | |||
| + | ====== サンプル ====== | ||
| + | <file ruby nmsg.rb> | ||
| + | require " | ||
| + | require " | ||
| + | |||
| + | class AlWorker1 < AlWorker | ||
| + | def initialize2() | ||
| + | @msg = NumberedMessage.new() | ||
| + | @tcp = Tcp.new.run( self ) | ||
| + | @tid = 1 | ||
| + | end | ||
| + | |||
| + | # メッセージ送信コマンド | ||
| + | def tcp_send( sock, param ) | ||
| + | @tid = @msg.send( {: | ||
| + | return true | ||
| + | end | ||
| + | |||
| + | # メッセージ受信コマンド | ||
| + | # receive() コマンドの使用例 | ||
| + | def tcp_a_recv( sock, param ) | ||
| + | tid = @tid | ||
| + | loop { | ||
| + | m = @msg.receive( tid ) | ||
| + | sock.puts m.to_s | ||
| + | tid += 1 | ||
| + | } | ||
| + | end | ||
| + | |||
| + | # メッセージ受信コマンド | ||
| + | # cycle() メソッドの使用例 | ||
| + | def tcp_a_recv2( sock, param ) | ||
| + | @msg.cycle( @tid ) { |m| | ||
| + | sock.puts m.to_s | ||
| + | } | ||
| + | end | ||
| + | end | ||
| + | |||
| + | AlWorker1.new.run | ||
| + | </ | ||
| + | |||
| + | ===== 解説 ===== | ||
| + | |||
| + | 実行するとデフォルトのTCPポート番号、1944番で接続を待ち受けます。\\ | ||
| + | 接続されたクライアントから、" | ||
| + | 別のクライアントから、" | ||
| + | |||
| + | 送信側クライアント実行例 | ||
| + | < | ||
| + | $telnet localhost 1944 | ||
| + | Trying 127.0.0.1... | ||
| + | Connected to localhost. | ||
| + | Escape character is ' | ||
| + | send MESSAGE ONE | ||
| + | send MESSAGE TWO | ||
| + | | ||
| + | send MESSAGE THREE | ||
| + | </ | ||
| + | |||
| + | 受信側クライアント実行例 | ||
| + | < | ||
| + | $telnet localhost 1944 | ||
| + | Trying 127.0.0.1... | ||
| + | Connected to localhost. | ||
| + | Escape character is ' | ||
| + | recv | ||
| + | [{: | ||
| + | [{: | ||
| + | </ | ||
| + | |||
| + | |||
| + | ====== キューのみの使用 ====== | ||
| + | |||
| + | (メッセージング機能を持たない)キューのみを使用することもできます。 | ||
| + | |||
| + | <code ruby> | ||
| + | @mmsg = AlWorker:: | ||
| + | |||
| + | # enqueue | ||
| + | tid = @nmsg.add( { : | ||
| + | |||
| + | # dequeue | ||
| + | @nmsg.get( 1 ) # [{ : | ||
| + | @nmsg.get( 0 ) # nil | ||
| + | @nmsg.get( 2 ) # [] | ||
| + | </ | ||
| + | |||
| + | TIDを指定して、キューの任意の位置からデータを取り出すことができます。\\ | ||
| + | データを取り出してもデータが自動的に消されることはなく、キューがあふれることによってのみ消されます。\\ | ||
| + | キューのサイズは、コンストラクタで指定できます。\\ | ||
alworker/番号付きメッセージキュー.txt · 最終更新: 2013/08/01 01:05 by hirohito