ユーザ用ツール

サイト用ツール


alworker:番号付きメッセージキュー

AlWorker 番号付きメッセージキュー

require "al_worker_message"


接続が断続的になる場合(httpなど)を想定して、 クライアントがメッセージの取りこぼしがないように、番号付きのキューを実現します。
番号はトランザクションIDと称し、単調増加させます。
クライアントはトランザクションIDを指定して、どこまでメッセージを得たかを管理します。
メッセージオブジェクトはHashのみで、キー :TID がフレームワークによって追加されます。

使い方

使用頻度が高いと思われる、キューとブロードキャストの組み合わせを先に説明します。

送り側

 @nmsg = NumberedMessage.new()
  @nmsg.send( {:message=>"message here"} )

受け側

  def tcp_a_recv( sock, param )
    msg = @nmsg.receive( @tid )    # @tid(トランザクションID)は、別の箇所で確定させておく。
    sock.puts msg.to_s             # => [{:message=>"message here", :TID=>1}]
    return true
  end

解説

@nmsg.receive()がコールされると引数(トランザクションID)の値によって以下の動作をします。

  • 既に発生したトランザクションでキューに保存されている場合は、そこから最新までの全ての値を配列で返します。
  • 既に発生したトランザクションで既にキューから削除されている場合は、nilが返ります。
  • まだ指定したIDのトランザクションが発生していない場合は、動作を一旦停止し、メッセージが送られるのを待ちます。

@nmsg.send()をコールしてメッセージを送ると、@nmsg.receive()の返り値としてメッセージを受け取り、動作を再開します。

受け側は、メッセージを受信するごとに何らかの動作を行い、また受信待ちになるというサイクルを繰り返すパターンが多いので、専用のメソッド、cycle() も用意しています。使用例は、サンプルを参照してください。

注意点・動作上の制限

  • 内部で BroadcastMessageを使っていますので、使い方・動作上の制限がそのままこちらへも適用されます。BroadcastMessageの説明を併せてご覧ください。

サンプル

nmsg.rb
require "al_worker_message"
require "al_worker_tcp"
 
class AlWorker1 < AlWorker
  def initialize2()
    @msg = NumberedMessage.new()
    @tcp = Tcp.new.run( self )
    @tid = 1
  end
 
  # メッセージ送信コマンド
  def tcp_send( sock, param )
    @tid = @msg.send( {:message=>param[""]} )
    return true
  end
 
  # メッセージ受信コマンド
  #  receive() コマンドの使用例
  def tcp_a_recv( sock, param )
    tid = @tid
    loop {
      m = @msg.receive( tid )
      sock.puts m.to_s
      tid += 1
    }
  end
 
  # メッセージ受信コマンド
  #  cycle() メソッドの使用例
  def tcp_a_recv2( sock, param )
    @msg.cycle( @tid ) { |m|
      sock.puts m.to_s
    }
  end
end
 
AlWorker1.new.run

解説

実行するとデフォルトのTCPポート番号、1944番で接続を待ち受けます。
接続されたクライアントから、"recv"コマンドを受信するとメッセージ受信モードになり、キューから最後のメッセージを取り出して表示します。
別のクライアントから、"send <message>" コマンドを送信すると、メッセージ待ちモードのクライアントにメッセージを転送します。

送信側クライアント実行例

$telnet localhost 1944
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
send MESSAGE ONE
send MESSAGE TWO
 (このタイミングで受信側を実行)
send MESSAGE THREE

受信側クライアント実行例

$telnet localhost 1944
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
recv
[{:message=>"MESSAGE TWO", :TID=>2}]
[{:message=>"MESSAGE THREE", :TID=>3}]

キューのみの使用

(メッセージング機能を持たない)キューのみを使用することもできます。

  @mmsg = AlWorker::NumberedMessage.new
 
  # enqueue
  tid = @nmsg.add( { :one=>"ichi" } )      # tid == 1
 
  # dequeue
  @nmsg.get( 1 )      # [{ :one=>"ichi", :TID=>1 }]
  @nmsg.get( 0 )      # nil
  @nmsg.get( 2 )      # []

TIDを指定して、キューの任意の位置からデータを取り出すことができます。
データを取り出してもデータが自動的に消されることはなく、キューがあふれることによってのみ消されます。
キューのサイズは、コンストラクタで指定できます。

alworker/番号付きメッセージキュー.txt · 最終更新: 2013/08/01 01:05 by hirohito