====== AlWorker 番号付きメッセージキュー ======
require "al_worker_message"
----
接続が断続的になる場合(httpなど)を想定して、 クライアントがメッセージの取りこぼしがないように、番号付きのキューを実現します。\\
番号はトランザクションIDと称し、単調増加させます。\\
クライアントはトランザクションIDを指定して、どこまでメッセージを得たかを管理します。\\
メッセージオブジェクトはHashのみで、キー :TID がフレームワークによって追加されます。\\
====== 使い方 ======
使用頻度が高いと思われる、キューとブロードキャストの組み合わせを先に説明します。
送り側
@nmsg = NumberedMessage.new()
@nmsg.send( {:message=>"message here"} )
受け側
def tcp_a_recv( sock, param )
msg = @nmsg.receive( @tid ) # @tid(トランザクションID)は、別の箇所で確定させておく。
sock.puts msg.to_s # => [{:message=>"message here", :TID=>1}]
return true
end
===== 解説 =====
@nmsg.receive()がコールされると引数(トランザクションID)の値によって以下の動作をします。
* 既に発生したトランザクションでキューに保存されている場合は、そこから最新までの全ての値を配列で返します。
* 既に発生したトランザクションで既にキューから削除されている場合は、nilが返ります。
* まだ指定したIDのトランザクションが発生していない場合は、動作を一旦停止し、メッセージが送られるのを待ちます。
@nmsg.send()をコールしてメッセージを送ると、@nmsg.receive()の返り値としてメッセージを受け取り、動作を再開します。
受け側は、メッセージを受信するごとに何らかの動作を行い、また受信待ちになるというサイクルを繰り返すパターンが多いので、専用のメソッド、cycle() も用意しています。使用例は、サンプルを参照してください。
====== 注意点・動作上の制限 ======
* 内部で BroadcastMessageを使っていますので、使い方・動作上の制限がそのままこちらへも適用されます。BroadcastMessageの説明を併せてご覧ください。
====== サンプル ======
require "al_worker_message"
require "al_worker_tcp"
class AlWorker1 < AlWorker
def initialize2()
@msg = NumberedMessage.new()
@tcp = Tcp.new.run( self )
@tid = 1
end
# メッセージ送信コマンド
def tcp_send( sock, param )
@tid = @msg.send( {:message=>param[""]} )
return true
end
# メッセージ受信コマンド
# receive() コマンドの使用例
def tcp_a_recv( sock, param )
tid = @tid
loop {
m = @msg.receive( tid )
sock.puts m.to_s
tid += 1
}
end
# メッセージ受信コマンド
# cycle() メソッドの使用例
def tcp_a_recv2( sock, param )
@msg.cycle( @tid ) { |m|
sock.puts m.to_s
}
end
end
AlWorker1.new.run
===== 解説 =====
実行するとデフォルトのTCPポート番号、1944番で接続を待ち受けます。\\
接続されたクライアントから、"recv"コマンドを受信するとメッセージ受信モードになり、キューから最後のメッセージを取り出して表示します。\\
別のクライアントから、"send " コマンドを送信すると、メッセージ待ちモードのクライアントにメッセージを転送します。\\
送信側クライアント実行例
$telnet localhost 1944
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
send MESSAGE ONE
send MESSAGE TWO
(このタイミングで受信側を実行)
send MESSAGE THREE
受信側クライアント実行例
$telnet localhost 1944
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
recv
[{:message=>"MESSAGE TWO", :TID=>2}]
[{:message=>"MESSAGE THREE", :TID=>3}]
====== キューのみの使用 ======
(メッセージング機能を持たない)キューのみを使用することもできます。
@mmsg = AlWorker::NumberedMessage.new
# enqueue
tid = @nmsg.add( { :one=>"ichi" } ) # tid == 1
# dequeue
@nmsg.get( 1 ) # [{ :one=>"ichi", :TID=>1 }]
@nmsg.get( 0 ) # nil
@nmsg.get( 2 ) # []
TIDを指定して、キューの任意の位置からデータを取り出すことができます。\\
データを取り出してもデータが自動的に消されることはなく、キューがあふれることによってのみ消されます。\\
キューのサイズは、コンストラクタで指定できます。\\