AlWorker 番号付きメッセージキュー
require "al_worker_message"
接続が断続的になる場合(httpなど)を想定して、 クライアントがメッセージの取りこぼしがないように、番号付きのキューを実現します。
番号はトランザクションIDと称し、単調増加させます。
クライアントはトランザクションIDを指定して、どこまでメッセージを得たかを管理します。
メッセージオブジェクトはHashのみで、キー :TID がフレームワークによって追加されます。
使い方
使用頻度が高いと思われる、キューとブロードキャストの組み合わせを先に説明します。
送り側
@nmsg = NumberedMessage.new() @nmsg.send( {:message=>"message here"} )
受け側
def tcp_a_recv( sock, param ) msg = @nmsg.receive( @tid ) # @tid(トランザクションID)は、別の箇所で確定させておく。 sock.puts msg.to_s # => [{:message=>"message here", :TID=>1}] return true end
解説
@nmsg.receive()がコールされると引数(トランザクションID)の値によって以下の動作をします。
- 既に発生したトランザクションでキューに保存されている場合は、そこから最新までの全ての値を配列で返します。
- 既に発生したトランザクションで既にキューから削除されている場合は、nilが返ります。
- まだ指定したIDのトランザクションが発生していない場合は、動作を一旦停止し、メッセージが送られるのを待ちます。
@nmsg.send()をコールしてメッセージを送ると、@nmsg.receive()の返り値としてメッセージを受け取り、動作を再開します。
受け側は、メッセージを受信するごとに何らかの動作を行い、また受信待ちになるというサイクルを繰り返すパターンが多いので、専用のメソッド、cycle() も用意しています。使用例は、サンプルを参照してください。
注意点・動作上の制限
- 内部で BroadcastMessageを使っていますので、使い方・動作上の制限がそのままこちらへも適用されます。BroadcastMessageの説明を併せてご覧ください。
サンプル
- nmsg.rb
require "al_worker_message" require "al_worker_tcp" class AlWorker1 < AlWorker def initialize2() @msg = NumberedMessage.new() @tcp = Tcp.new.run( self ) @tid = 1 end # メッセージ送信コマンド def tcp_send( sock, param ) @tid = @msg.send( {:message=>param[""]} ) return true end # メッセージ受信コマンド # receive() コマンドの使用例 def tcp_a_recv( sock, param ) tid = @tid loop { m = @msg.receive( tid ) sock.puts m.to_s tid += 1 } end # メッセージ受信コマンド # cycle() メソッドの使用例 def tcp_a_recv2( sock, param ) @msg.cycle( @tid ) { |m| sock.puts m.to_s } end end AlWorker1.new.run
解説
実行するとデフォルトのTCPポート番号、1944番で接続を待ち受けます。
接続されたクライアントから、"recv"コマンドを受信するとメッセージ受信モードになり、キューから最後のメッセージを取り出して表示します。
別のクライアントから、"send <message>" コマンドを送信すると、メッセージ待ちモードのクライアントにメッセージを転送します。
送信側クライアント実行例
$telnet localhost 1944 Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. send MESSAGE ONE send MESSAGE TWO (このタイミングで受信側を実行) send MESSAGE THREE
受信側クライアント実行例
$telnet localhost 1944 Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. recv [{:message=>"MESSAGE TWO", :TID=>2}] [{:message=>"MESSAGE THREE", :TID=>3}]
キューのみの使用
(メッセージング機能を持たない)キューのみを使用することもできます。
@mmsg = AlWorker::NumberedMessage.new # enqueue tid = @nmsg.add( { :one=>"ichi" } ) # tid == 1 # dequeue @nmsg.get( 1 ) # [{ :one=>"ichi", :TID=>1 }] @nmsg.get( 0 ) # nil @nmsg.get( 2 ) # []
TIDを指定して、キューの任意の位置からデータを取り出すことができます。
データを取り出してもデータが自動的に消されることはなく、キューがあふれることによってのみ消されます。
キューのサイズは、コンストラクタで指定できます。