ユーザ用ツール

サイト用ツール


alworker:番号付きメッセージキュー
no way to compare when less than two revisions

差分

このページの2つのバージョン間の差分を表示します。


alworker:番号付きメッセージキュー [2013/08/01 01:05] (現在) – 作成 - 外部編集 127.0.0.1
行 1: 行 1:
 +====== AlWorker 番号付きメッセージキュー ======
 +
 +require "al_worker_message"
 +----
 +
 +接続が断続的になる場合(httpなど)を想定して、 クライアントがメッセージの取りこぼしがないように、番号付きのキューを実現します。\\
 +番号はトランザクションIDと称し、単調増加させます。\\
 +クライアントはトランザクションIDを指定して、どこまでメッセージを得たかを管理します。\\
 +メッセージオブジェクトはHashのみで、キー :TID がフレームワークによって追加されます。\\
 +
 +====== 使い方 ======
 +
 +使用頻度が高いと思われる、キューとブロードキャストの組み合わせを先に説明します。
 +
 +送り側
 +<code ruby>
 + @nmsg = NumberedMessage.new()
 +  @nmsg.send( {:message=>"message here"} )
 +</code>
 +
 +受け側
 +<code ruby>
 +  def tcp_a_recv( sock, param )
 +    msg = @nmsg.receive( @tid )    # @tid(トランザクションID)は、別の箇所で確定させておく。
 +    sock.puts msg.to_s             # => [{:message=>"message here", :TID=>1}]
 +    return true
 +  end
 +</code>
 +
 +===== 解説 =====
 +
 +@nmsg.receive()がコールされると引数(トランザクションID)の値によって以下の動作をします。
 +  * 既に発生したトランザクションでキューに保存されている場合は、そこから最新までの全ての値を配列で返します。
 +  * 既に発生したトランザクションで既にキューから削除されている場合は、nilが返ります。
 +  * まだ指定したIDのトランザクションが発生していない場合は、動作を一旦停止し、メッセージが送られるのを待ちます。
 +
 +@nmsg.send()をコールしてメッセージを送ると、@nmsg.receive()の返り値としてメッセージを受け取り、動作を再開します。
 +
 +受け側は、メッセージを受信するごとに何らかの動作を行い、また受信待ちになるというサイクルを繰り返すパターンが多いので、専用のメソッド、cycle() も用意しています。使用例は、サンプルを参照してください。
 +
 +====== 注意点・動作上の制限 ======
 +
 +  * 内部で BroadcastMessageを使っていますので、使い方・動作上の制限がそのままこちらへも適用されます。BroadcastMessageの説明を併せてご覧ください。
 +
 +
 +====== サンプル ======
 +<file ruby nmsg.rb>
 +require "al_worker_message"
 +require "al_worker_tcp"
 +
 +class AlWorker1 < AlWorker
 +  def initialize2()
 +    @msg = NumberedMessage.new()
 +    @tcp = Tcp.new.run( self )
 +    @tid = 1
 +  end
 +
 +  # メッセージ送信コマンド
 +  def tcp_send( sock, param )
 +    @tid = @msg.send( {:message=>param[""]} )
 +    return true
 +  end
 +
 +  # メッセージ受信コマンド
 +  #  receive() コマンドの使用例
 +  def tcp_a_recv( sock, param )
 +    tid = @tid
 +    loop {
 +      m = @msg.receive( tid )
 +      sock.puts m.to_s
 +      tid += 1
 +    }
 +  end
 +
 +  # メッセージ受信コマンド
 +  #  cycle() メソッドの使用例
 +  def tcp_a_recv2( sock, param )
 +    @msg.cycle( @tid ) { |m|
 +      sock.puts m.to_s
 +    }
 +  end
 +end
 +
 +AlWorker1.new.run
 +</file>
 +
 +===== 解説 =====
 +
 +実行するとデフォルトのTCPポート番号、1944番で接続を待ち受けます。\\
 +接続されたクライアントから、"recv"コマンドを受信するとメッセージ受信モードになり、キューから最後のメッセージを取り出して表示します。\\
 +別のクライアントから、"send <message>" コマンドを送信すると、メッセージ待ちモードのクライアントにメッセージを転送します。\\
 +
 +送信側クライアント実行例
 +<code>
 +$telnet localhost 1944
 +Trying 127.0.0.1...
 +Connected to localhost.
 +Escape character is '^]'.
 +send MESSAGE ONE
 +send MESSAGE TWO
 + (このタイミングで受信側を実行)
 +send MESSAGE THREE
 +</code>
 +
 +受信側クライアント実行例
 +<code>
 +$telnet localhost 1944
 +Trying 127.0.0.1...
 +Connected to localhost.
 +Escape character is '^]'.
 +recv
 +[{:message=>"MESSAGE TWO", :TID=>2}]
 +[{:message=>"MESSAGE THREE", :TID=>3}]
 +</code>
 +
 +
 +====== キューのみの使用 ======
 +
 +(メッセージング機能を持たない)キューのみを使用することもできます。
 +
 +<code ruby>
 +  @mmsg = AlWorker::NumberedMessage.new
 +
 +  # enqueue
 +  tid = @nmsg.add( { :one=>"ichi" } )      # tid == 1
 +
 +  # dequeue
 +  @nmsg.get( 1 )      # [{ :one=>"ichi", :TID=>1 }]
 +  @nmsg.get( 0 )      # nil
 +  @nmsg.get( 2 )      # []
 +</code>
 +
 +TIDを指定して、キューの任意の位置からデータを取り出すことができます。\\
 +データを取り出してもデータが自動的に消されることはなく、キューがあふれることによってのみ消されます。\\
 +キューのサイズは、コンストラクタで指定できます。\\
  
alworker/番号付きメッセージキュー.txt · 最終更新: 2013/08/01 01:05 by hirohito