====== AlWorker 番号付きメッセージキュー ====== require "al_worker_message" ---- 接続が断続的になる場合(httpなど)を想定して、 クライアントがメッセージの取りこぼしがないように、番号付きのキューを実現します。\\ 番号はトランザクションIDと称し、単調増加させます。\\ クライアントはトランザクションIDを指定して、どこまでメッセージを得たかを管理します。\\ メッセージオブジェクトはHashのみで、キー :TID がフレームワークによって追加されます。\\ ====== 使い方 ====== 使用頻度が高いと思われる、キューとブロードキャストの組み合わせを先に説明します。 送り側  @nmsg = NumberedMessage.new() @nmsg.send( {:message=>"message here"} ) 受け側 def tcp_a_recv( sock, param ) msg = @nmsg.receive( @tid ) # @tid(トランザクションID)は、別の箇所で確定させておく。 sock.puts msg.to_s # => [{:message=>"message here", :TID=>1}] return true end ===== 解説 ===== @nmsg.receive()がコールされると引数(トランザクションID)の値によって以下の動作をします。 * 既に発生したトランザクションでキューに保存されている場合は、そこから最新までの全ての値を配列で返します。 * 既に発生したトランザクションで既にキューから削除されている場合は、nilが返ります。 * まだ指定したIDのトランザクションが発生していない場合は、動作を一旦停止し、メッセージが送られるのを待ちます。 @nmsg.send()をコールしてメッセージを送ると、@nmsg.receive()の返り値としてメッセージを受け取り、動作を再開します。 受け側は、メッセージを受信するごとに何らかの動作を行い、また受信待ちになるというサイクルを繰り返すパターンが多いので、専用のメソッド、cycle() も用意しています。使用例は、サンプルを参照してください。 ====== 注意点・動作上の制限 ====== * 内部で BroadcastMessageを使っていますので、使い方・動作上の制限がそのままこちらへも適用されます。BroadcastMessageの説明を併せてご覧ください。 ====== サンプル ====== require "al_worker_message" require "al_worker_tcp" class AlWorker1 < AlWorker def initialize2() @msg = NumberedMessage.new() @tcp = Tcp.new.run( self ) @tid = 1 end # メッセージ送信コマンド def tcp_send( sock, param ) @tid = @msg.send( {:message=>param[""]} ) return true end # メッセージ受信コマンド # receive() コマンドの使用例 def tcp_a_recv( sock, param ) tid = @tid loop { m = @msg.receive( tid ) sock.puts m.to_s tid += 1 } end # メッセージ受信コマンド # cycle() メソッドの使用例 def tcp_a_recv2( sock, param ) @msg.cycle( @tid ) { |m| sock.puts m.to_s } end end AlWorker1.new.run ===== 解説 ===== 実行するとデフォルトのTCPポート番号、1944番で接続を待ち受けます。\\ 接続されたクライアントから、"recv"コマンドを受信するとメッセージ受信モードになり、キューから最後のメッセージを取り出して表示します。\\ 別のクライアントから、"send " コマンドを送信すると、メッセージ待ちモードのクライアントにメッセージを転送します。\\ 送信側クライアント実行例 $telnet localhost 1944 Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. send MESSAGE ONE send MESSAGE TWO (このタイミングで受信側を実行) send MESSAGE THREE 受信側クライアント実行例 $telnet localhost 1944 Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. recv [{:message=>"MESSAGE TWO", :TID=>2}] [{:message=>"MESSAGE THREE", :TID=>3}] ====== キューのみの使用 ====== (メッセージング機能を持たない)キューのみを使用することもできます。 @mmsg = AlWorker::NumberedMessage.new # enqueue tid = @nmsg.add( { :one=>"ichi" } ) # tid == 1 # dequeue @nmsg.get( 1 ) # [{ :one=>"ichi", :TID=>1 }] @nmsg.get( 0 ) # nil @nmsg.get( 2 ) # [] TIDを指定して、キューの任意の位置からデータを取り出すことができます。\\ データを取り出してもデータが自動的に消されることはなく、キューがあふれることによってのみ消されます。\\ キューのサイズは、コンストラクタで指定できます。\\