Class: AlWorker::NumberedMessage
- Inherits:
-
Object
- Object
- AlWorker::NumberedMessage
- Includes:
- Sync_m
- Defined in:
- lib/al_worker_message.rb
Overview
Numbered message queue
(note) 接続が断続的になる場合(httpなど)を想定して、 クライアントが メッセージの取りこぼしがないように、番号付きでキューを構成する。 番号はトランザクションIDと称し、単調増加させる。 クライアントはトランザクションIDを指定して、どこまでメッセージを 得たかを管理する。 メッセージオブジェクトは、Hashのみ。 スレッドセーフに実装してある。
Instance Attribute Summary collapse
-
#bc ⇒ BroadcastMessage
readonly
Send/receiveのためのBroadcastMessage.
-
#max_queue_size ⇒ Integer
readonly
キューサイズ.
-
#queue ⇒ Array<Hash>
readonly
メッセージキュー.
-
#tid ⇒ Integer
readonly
トランザクションID 1から単調増加する。.
Instance Method Summary collapse
-
#add(msg) ⇒ Integer
キューへメッセージ追加.
-
#cycle(tid, timeout = nil) ⇒ Object
メッセージ受信繰り返し動作.
-
#get(tid) ⇒ Array<Hash>, NilClass
キューからメッセージを取り出し.
-
#initialize(size = 10) ⇒ NumberedMessage
constructor
constructor.
-
#receive(tid) ⇒ Array<Hash>, NilClass
キューからメッセージを取り出し.
-
#send(msg) ⇒ Integer
キューへメッセージ追加するとともに receive待ちをしているスレッドを起こす.
-
#tid_min ⇒ Integer
キュー内の最小TIDを返す.
Constructor Details
#initialize(size = 10) ⇒ NumberedMessage
constructor
132 133 134 135 136 137 138 |
# File 'lib/al_worker_message.rb', line 132 def initialize( size = 10 ) super() # Sync_m needs. @tid = 0 @max_queue_size = size @queue = [] @bc = AlWorker::BroadcastMessage.new() end |
Instance Attribute Details
#bc ⇒ BroadcastMessage (readonly)
Returns send/receiveのためのBroadcastMessage.
124 125 126 |
# File 'lib/al_worker_message.rb', line 124 def bc @bc end |
#max_queue_size ⇒ Integer (readonly)
Returns キューサイズ.
118 119 120 |
# File 'lib/al_worker_message.rb', line 118 def max_queue_size @max_queue_size end |
#queue ⇒ Array<Hash> (readonly)
Returns メッセージキュー.
121 122 123 |
# File 'lib/al_worker_message.rb', line 121 def queue @queue end |
#tid ⇒ Integer (readonly)
Returns トランザクションID 1から単調増加する。.
115 116 117 |
# File 'lib/al_worker_message.rb', line 115 def tid @tid end |
Instance Method Details
#add(msg) ⇒ Integer
Note:
:TIDが予約語としてメッセージ内に追加される。
キューへメッセージ追加
150 151 152 153 154 155 156 157 158 159 |
# File 'lib/al_worker_message.rb', line 150 def add( msg ) synchronize( Sync::EX ) { msg[:TID] = (@tid += 1) @queue << msg while @queue.size > @max_queue_size @queue.shift end return @tid } end |
#cycle(tid, timeout = nil) ⇒ Object
Note:
メッセージ受信繰り返し動作
メッセージを待ち、指示された動作を実行することを繰り返す。 タイムアウト以外では帰らない。
242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 |
# File 'lib/al_worker_message.rb', line 242 def cycle( tid, timeout = nil ) @bc.attach() loop do # get messages. msg = get( tid ) tid = @tid if msg == [] msg = @queue.dup() if msg == nil # message has gone. # process each messages. msg.each do |m| yield( m ) tid = m[:TID] end tid += 1 # wait next messages. Timeout::timeout( timeout ) { @bc.receive() # waiting message. } end rescue Timeout::Error return nil ensure @bc.detach() end |
#get(tid) ⇒ Array<Hash>, NilClass
キューからメッセージを取り出し
169 170 171 172 173 174 175 176 177 178 179 180 181 182 |
# File 'lib/al_worker_message.rb', line 169 def get( tid ) synchronize( Sync::SH ) { i = @queue.size - 1 return [] if i < 0 || @queue[i][:TID] < tid while i >= 0 if @queue[i][:TID] <= tid return @queue[i..-1] end i -= 1 end } return nil end |
#receive(tid) ⇒ Array<Hash>, NilClass
Note:
キューからメッセージを取り出し
トランザクションがまだ発生していない場合、次メッセージがsendされるまで待つ。
208 209 210 211 212 213 214 215 216 217 218 219 |
# File 'lib/al_worker_message.rb', line 208 def receive( tid ) @bc.attach() ret = get( tid ) if ret == [] tid = @bc.receive() ret = get( tid ) end return ret ensure @bc.detach() end |
#send(msg) ⇒ Integer
キューへメッセージ追加するとともに receive待ちをしているスレッドを起こす
191 192 193 194 195 196 |
# File 'lib/al_worker_message.rb', line 191 def send( msg ) tid = add( msg ) @bc.send( tid ) Thread.pass return tid end |
#tid_min ⇒ Integer
キュー内の最小TIDを返す
227 228 229 |
# File 'lib/al_worker_message.rb', line 227 def tid_min() return @queue[0] == nil ? 0 : @queue[0][:TID] end |