Class: AlWorker::NumberedMessage

Inherits:
Object
  • Object
show all
Includes:
Sync_m
Defined in:
lib/al_worker_message.rb

Overview

Numbered message queue

(note) 接続が断続的になる場合(httpなど)を想定して、 クライアントが メッセージの取りこぼしがないように、番号付きでキューを構成する。 番号はトランザクションIDと称し、単調増加させる。 クライアントはトランザクションIDを指定して、どこまでメッセージを 得たかを管理する。 メッセージオブジェクトは、Hashのみ。 スレッドセーフに実装してある。

Instance Attribute Summary (collapse)

Instance Method Summary (collapse)

Constructor Details

- (NumberedMessage) initialize(size = 10)

constructor

Parameters:

  • size (Integer) (defaults to: 10)

    キューサイズ



132
133
134
135
136
137
138
# File 'lib/al_worker_message.rb', line 132

def initialize( size = 10 )
  super()     # Sync_m needs.
  @tid = 0
  @max_queue_size = size
  @queue = []
  @bc = AlWorker::BroadcastMessage.new()
end

Instance Attribute Details

- (BroadcastMessage) bc (readonly)

Returns send/receiveのためのBroadcastMessage

Returns:



124
125
126
# File 'lib/al_worker_message.rb', line 124

def bc
  @bc
end

- (Integer) max_queue_size (readonly)

Returns キューサイズ

Returns:

  • (Integer)

    キューサイズ



118
119
120
# File 'lib/al_worker_message.rb', line 118

def max_queue_size
  @max_queue_size
end

- (Array<Hash>) queue (readonly)

Returns メッセージキュー

Returns:

  • (Array<Hash>)

    メッセージキュー



121
122
123
# File 'lib/al_worker_message.rb', line 121

def queue
  @queue
end

- (Integer) tid (readonly)

Returns トランザクションID 1から単調増加する。

Returns:

  • (Integer)

    トランザクションID 1から単調増加する。



115
116
117
# File 'lib/al_worker_message.rb', line 115

def tid
  @tid
end

Instance Method Details

- (Integer) add(msg)

Note:

:TIDが予約語としてメッセージ内に追加される。

キューへメッセージ追加

Parameters:

  • msg (Hash)

    メッセージ

Returns:

  • (Integer)

    トランザクションID



150
151
152
153
154
155
156
157
158
159
# File 'lib/al_worker_message.rb', line 150

def add( msg )
  synchronize( Sync::EX ) {
    msg[:TID] = (@tid += 1)
    @queue << msg
    while @queue.size > @max_queue_size
      @queue.shift
    end
    return @tid
  }
end

- (Object) cycle(tid, timeout = nil)

Note:

メッセージ受信繰り返し動作

メッセージを待ち、指示された動作を実行することを繰り返す。 タイムアウト以外では帰らない。

Parameters:

  • tid (Integer)

    初期トランザクションID

  • timeout (Integer) (defaults to: nil)

    待ち時間タイムアウト



242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
# File 'lib/al_worker_message.rb', line 242

def cycle( tid, timeout = nil )
  @bc.attach()
  loop do
    # get messages.
    msg = get( tid )
    tid = @tid  if msg == []
    msg = @queue.dup()  if msg == nil         # message has gone.

    # process each messages.
    msg.each do |m|
      yield( m )
      tid = m[:TID]
    end
    tid += 1

    # wait next messages.
    Timeout::timeout( timeout ) {
      @bc.receive()     # waiting message.
    }
  end

rescue Timeout::Error
  return nil

ensure
  @bc.detach()
end

- (Array<Hash>, NilClass) get(tid)

キューからメッセージを取り出し

Parameters:

  • tid (Integer)

    トランザクションID

Returns:

  • (Array<Hash>)

    メッセージの配列。まだ指定された番号のトランザクションが発生していない場合は、空配列を返す。

  • (NilClass)

    メッセージキューからすでに消えている場合



169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/al_worker_message.rb', line 169

def get( tid )
  synchronize( Sync::SH ) {
    i = @queue.size - 1
    return []  if i < 0 || @queue[i][:TID] < tid

    while i >= 0
      if @queue[i][:TID] <= tid
        return @queue[i..-1]
      end
      i -= 1
    end
  }
  return nil
end

- (Array<Hash>, NilClass) receive(tid)

Note:

キューからメッセージを取り出し

トランザクションがまだ発生していない場合、次メッセージがsendされるまで待つ。

Parameters:

  • tid (Integer)

    トランザクションID

Returns:

  • (Array<Hash>)

    メッセージの配列。

  • (NilClass)

    メッセージキューからすでに消えている場合



208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/al_worker_message.rb', line 208

def receive( tid )
  @bc.attach()
  ret = get( tid )
  if ret == []
    tid = @bc.receive()
    ret = get( tid )
  end
  return ret

ensure
  @bc.detach()
end

- (Integer) send(msg)

キューへメッセージ追加するとともに receive待ちをしているスレッドを起こす

Parameters:

  • msg (Hash)

    メッセージ

Returns:

  • (Integer)

    トランザクションID



191
192
193
194
195
196
# File 'lib/al_worker_message.rb', line 191

def send( msg )
  tid = add( msg )
  @bc.send( tid )
  Thread.pass
  return tid
end

- (Integer) tid_min

キュー内の最小TIDを返す

Returns:

  • (Integer)

    最小TID



227
228
229
# File 'lib/al_worker_message.rb', line 227

def tid_min()
  return @queue[0] == nil ? 0 : @queue[0][:TID]
end