Class: AlWorker

Inherits:
Object
  • Object
show all
Defined in:
lib/al_worker.rb

Overview

ワーカースーパークラス

Defined Under Namespace

Modules: Debug, IpcAction Classes: BroadcastMessage, Fd, Ipc, IpcClient, NumberedMessage, Program, Tcp, Timer

Constant Summary

DEFAULT_WORKDIR =
"/tmp"
DEFAULT_NAME =
"al_worker"
LOG_SEVERITY =
{ :fatal=>Logger::FATAL, :error=>Logger::ERROR,
:warn=>Logger::WARN, :info=>Logger::INFO, :debug=>Logger::DEBUG }
@@log =

Returns ロガー

Returns:

  • (Logger)

    ロガー

nil
@@mutex_sync =

Returns 同期実行用mutex

Returns:

  • (Mutex)

    同期実行用mutex

Mutex.new

Instance Attribute Summary (collapse)

Class Method Summary (collapse)

Instance Method Summary (collapse)

Constructor Details

- (AlWorker) initialize(name = nil)

constructor

Parameters:

  • name (String) (defaults to: nil)

    識別名



160
161
162
163
164
165
166
167
168
# File 'lib/al_worker.rb', line 160

def initialize( name = nil )
  @values = {}
  @values_rwlock = Sync.new
  @workdir = DEFAULT_WORKDIR
  @name = name || DEFAULT_NAME
  @state = ""

  Signal::trap( :QUIT, proc{ signal_quit } )
end

Instance Attribute Details

- (String) log_filename

Returns ログファイル名(フルパス)

Returns:

  • (String)

    ログファイル名(フルパス)



136
137
138
# File 'lib/al_worker.rb', line 136

def log_filename
  @log_filename
end

- (String) name (readonly)

Returns ユニークネーム

Returns:

  • (String)

    ユニークネーム



139
140
141
# File 'lib/al_worker.rb', line 139

def name
  @name
end

- (String) pid_filename

Returns pidファイル名(フルパス)

Returns:

  • (String)

    pidファイル名(フルパス)



133
134
135
# File 'lib/al_worker.rb', line 133

def pid_filename
  @pid_filename
end

- (String) privilege

Returns 実行権限ユーザ名

Returns:

  • (String)

    実行権限ユーザ名



145
146
147
# File 'lib/al_worker.rb', line 145

def privilege
  @privilege
end

- (String) program_name

Returns 現在実行中のRubyスクリプトの名前を表す文字列 $PROGRAM_NAME

Returns:

  • (String)

    現在実行中のRubyスクリプトの名前を表す文字列 $PROGRAM_NAME



142
143
144
# File 'lib/al_worker.rb', line 142

def program_name
  @program_name
end

- (Boolean) software_watchdog

Returns ソフトウェアウォッチドッグ機能を使用

Returns:

  • (Boolean)

    ソフトウェアウォッチドッグ機能を使用



151
152
153
# File 'lib/al_worker.rb', line 151

def software_watchdog
  @software_watchdog
end

- (String) state (readonly)

Returns ステート(ステートマシン用)

Returns:

  • (String)

    ステート(ステートマシン用)



148
149
150
# File 'lib/al_worker.rb', line 148

def state
  @state
end

- (Hash) values

Returns 外部提供を目的とする値のHash IPCの関係でキーは文字列のみとする。

Returns:

  • (Hash)

    外部提供を目的とする値のHash IPCの関係でキーは文字列のみとする。



124
125
126
# File 'lib/al_worker.rb', line 124

def values
  @values
end

- (Sync) values_rwlock (readonly)

Returns @values の reader writer lock

Returns:

  • (Sync)

    @values の reader writer lock



127
128
129
# File 'lib/al_worker.rb', line 127

def values_rwlock
  @values_rwlock
end

- (String) workdir

Returns ワークファイルの作成場所

Returns:

  • (String)

    ワークファイルの作成場所



130
131
132
# File 'lib/al_worker.rb', line 130

def workdir
  @workdir
end

Class Method Details

+ (Logger) log(arg = nil, severity = nil, progname = nil)

ログ出力

Parameters:

  • arg (String, Object) (defaults to: nil)

    エラーメッセージ

  • severity (Symbol) (defaults to: nil)

    ログレベル :fatal, :error …

  • progname (String) (defaults to: nil)

    プログラム名

Returns:

  • (Logger)

    Loggerオブジェクト



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/al_worker.rb', line 50

def self.log( arg = nil, severity = nil, progname = nil )
  return nil  if ! @@log

  s = LOG_SEVERITY[ severity ]
  if arg.class == String
    @@log.add( s || Logger::INFO, arg, progname )

  elsif arg.is_a?( Exception )
    @@log.add( s || Logger::ERROR, "#{arg.class} / #{arg.message}", progname )
    @@log.add( s || Logger::ERROR, "BACKTRACE: \n  " + arg.backtrace.join( "\n  " ) + "\n", progname )

  elsif arg != nil
    @@log.add( s || Logger::INFO, arg.to_s, progname )
  end
  return @@log
end

+ (Object) mutex_sync

同期実行用mutexのアクセッサ



36
37
38
# File 'lib/al_worker.rb', line 36

def self.mutex_sync()
  return @@mutex_sync
end

+ (Object) na(method_name)

Note:

ステートマシンで無視するイベントの記述

クラス定義中に、na :state_XXX_event_YYY の様に記述する。



117
118
119
# File 'lib/al_worker.rb', line 117

def self.na( method_name )
  define_method( method_name ) { |*args| }
end

+ (String, Hash) parse_request(req)

IPC定形リクエストからコマンドとパラメータを解析・取り出し

Parameters:

  • req (String)

    リクエスト

Returns:

  • (String)

    コマンド

  • (Hash)

    パラメータ



75
76
77
78
79
80
81
# File 'lib/al_worker.rb', line 75

def self.parse_request( req )
  (cmd,param) = req.split( " ", 2 )
  return cmd,{}  if param == nil
  param.strip!
  return cmd,{}  if param.empty?
  return cmd,( JSON.parse( param ) rescue { ""=>param } )
end

+ (True) reply(sock, st_code, st_msg, val = nil)

Note:

IPC定形リプライ

定形リプライフォーマット

 (ステータスコード) "200. Message"
 (JSONデータ)       { .... }
JSONデータは付与されない場合がある。
その判断は、ステータスコードの数字直後のピリオドの有無で行う。

Parameters:

  • sock (Socket)

    返信先ソケット

  • st_code (Integer)

    ステータスコード

  • st_msg (String)

    ステータスメッセージ

  • val (Hash) (defaults to: nil)

    リプライデータ

Returns:

  • (True)


99
100
101
102
103
104
105
106
107
108
# File 'lib/al_worker.rb', line 99

def self.reply( sock, st_code, st_msg, val = nil )
  sock.puts ("%03d" % st_code) + (val ? ". " : " ") + st_msg
  if val
    sock.puts val.to_json, ""
  end
  return true

rescue Errno::EPIPE
  Thread.exit
end

Instance Method Details

- (Object) daemon

デーモンになって実行



430
431
432
433
434
435
436
# File 'lib/al_worker.rb', line 430

def daemon()
  if @flag_debug
    run()
  else
    run( :daemon )
  end
end

- (Object) get_value(key)

Note:

valueのゲッター タイムアウトなし(単一値)

値はdupして返す。

Parameters:

  • key (String)

    キー

Returns:

  • (Object)



239
240
241
242
243
# File 'lib/al_worker.rb', line 239

def get_value( key )
  @values_rwlock.synchronize( Sync::SH ) {
    return @values[ key.to_s ].dup rescue @values[ key.to_s ]
  }
end

- (Object, Boolean) get_value_wt(key, timeout = 1)

Note:

valueのゲッター タイムアウト付き(単一値)

値はdupして返す。

Parameters:

  • key (String)

    キー

  • timeout (Numeric) (defaults to: 1)

    タイムアウト時間

Returns:

  • (Object)

  • (Boolean)

    ロック状態



276
277
278
279
280
281
282
283
284
285
286
287
288
# File 'lib/al_worker.rb', line 276

def get_value_wt( key, timeout = 1 )
  locked = false
  (timeout * 10).times {
    locked = @values_rwlock.try_lock( Sync::SH )
    break if locked
    sleep 0.1
  }

  return (@values[ key.to_s ].dup rescue @values[ key.to_s ]), locked

ensure
  @values_rwlock.unlock( Sync::SH ) if locked
end

- (Hash) get_values(keys)

Note:

valueのゲッター タイムアウトなし(複数値)

値はdupするが、簡素化のためにディープコピーは行っていない。 文字列では問題ないが、配列などが格納されている場合は注意が必要。

Parameters:

  • keys (Array)

    キーの配列

Returns:

  • (Hash)



255
256
257
258
259
260
261
262
263
# File 'lib/al_worker.rb', line 255

def get_values( keys )
  ret = {}
  @values_rwlock.synchronize( Sync::SH ) {
    keys.each do |k|
      ret[ k.to_s ] = @values[ k.to_s ].dup rescue @values[ k.to_s ]
    end
  }
  return ret
end

- (String) get_values_json(key = nil)

valueのゲッター JSON版 タイムアウトなし

Parameters:

  • key (String, Array) (defaults to: nil)

    取得する値のキー文字列

Returns:

  • (String)

    保存されている値のJSON文字列



327
328
329
330
331
332
333
334
335
336
# File 'lib/al_worker.rb', line 327

def get_values_json( key = nil )
  @values_rwlock.synchronize( Sync::SH ) {
    if key.class == Array
      ret = {}
      key.each { |k| ret[ k ] = @values[ k ] }
      return ret.to_json
    end
    return ( key ? { key => @values[key] } : @values ).to_json
  }
end

- (String, Boolean) get_values_json_wt(key = nil, timeout = nil)

valuesのゲッター JSON版 タイムアウト付き

Parameters:

  • key (String, Array) (defaults to: nil)

    取得する値のキー文字列

  • timeout (Numeric) (defaults to: nil)

    タイムアウト時間

Returns:

  • (String)

    保存されている値のJSON文字列

  • (Boolean)

    ロック状態



347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
# File 'lib/al_worker.rb', line 347

def get_values_json_wt( key = nil, timeout = nil )
  locked = false
  timeout ||= 1       # can't change. see AlWorker::Ipc#ipc_a_get_values_wt()
  (timeout * 10).times {
    locked = @values_rwlock.try_lock( Sync::SH )
    break if locked
    sleep 0.1
  }
  if key.class == Array
    ret = {}
    key.each { |k| ret[ k ] = @values[ k ] }
    return ret.to_json, locked
  end
  return ( key ? { key => @values[key] } : @values ).to_json, locked

ensure
  @values_rwlock.unlock( Sync::SH ) if locked
end

- (Object, Boolean) get_values_wt(keys, timeout = 1)

Note:

valueのゲッター タイムアウト付き(複数値)

値はdupするが、簡素化のためにディープコピーは行っていない。 文字列では問題ないが、配列などが格納されている場合は注意が必要。

Parameters:

  • keys (Array)

    キーの配列

  • timeout (Numeric) (defaults to: 1)

    タイムアウト時間

Returns:

  • (Object)

  • (Boolean)

    ロック状態



302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
# File 'lib/al_worker.rb', line 302

def get_values_wt( keys, timeout = 1 )
  locked = false
  (timeout * 10).times {
    locked = @values_rwlock.try_lock( Sync::SH )
    break if locked
    sleep 0.1
  }

  ret = {}
  keys.each do |k|
    ret[ k.to_s ] = @values[ k.to_s ].dup rescue @values[ k.to_s ]
  end
  return ret, locked

ensure
  @values_rwlock.unlock( Sync::SH ) if locked
end

- (Object) initialize2

Note:

初期化2

常駐後に処理をさせるには、これをオーバライドする。



545
546
# File 'lib/al_worker.rb', line 545

def initialize2()
end

- (Object) load_values(filename = nil)

値(@values)読み込み



395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
# File 'lib/al_worker.rb', line 395

def load_values( filename = nil )
  filename ||= File.join( @workdir, @name ) + ".values"
  digest = Digest::SHA1.file( filename ) rescue nil
  return nil if ! digest      # same as file not found.

  digestfile = File.join( File.dirname(filename), File.basename(filename,".*") ) + ".sha1"
  digestfile_value = File.read( digestfile ) rescue nil
  if digestfile_value
    return nil  if digest != digestfile_value
  end

  json = ""
  File.open( filename, "r" ) { |f|
    while txt = f.gets
      break if txt == "VALUES: \n"
    end
    if txt == "VALUES: \n"
      while txt = f.gets
        json << txt
      end
    end
  }
  return nil  if json == ""
  begin
    @values = JSON.parse( json )
    return true
  rescue
    return false
  end
end

- (Object) log(arg = nil, severity = nil, progname = nil)

ログ出力

See Also:

  • log()


554
555
556
# File 'lib/al_worker.rb', line 554

def log( arg = nil, severity = nil, progname = nil )
  AlWorker.log( arg, severity, progname )
end

- (Object) no_method_error(event)

メソッドエラーの場合のエラーハンドラ



609
610
611
# File 'lib/al_worker.rb', line 609

def no_method_error( event )
  raise "No action defined. state: #{@state}, event: #{event}"
end

- (Object) parse_option(argv = ARGV)

基本的なオプションの解析



174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/al_worker.rb', line 174

def parse_option( argv = ARGV )
  i = 0
  while i < argv.size
    case argv[i]
    when "-d"
      @flag_debug = true
    when "-p"
      @pid_filename = argv[i += 1]
    when "-l"
      @log_filename = argv[i += 1]
    end
    i += 1
  end
end

- (Object) reply(sock, st_code, st_msg, val = nil)

IPC定形リプライ

See Also:

  • reply()


564
565
566
# File 'lib/al_worker.rb', line 564

def reply( sock, st_code, st_msg, val = nil )
  AlWorker.reply( sock, st_code, st_msg, val )
end

- (Object) run(*modes)

実行開始

Parameters:

  • modes (Symbol)

    動作モード nul デーモンにならずに実行 :daemon デーモンで実行 :nostop デーモンにならずスリープもしない :nopid プロセスIDファイルを作らない :nolog ログファイルを作らない :exit_idle_task アイドルタスクが終了したら

    プロセスも終了する


450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
# File 'lib/al_worker.rb', line 450

def run( *modes )
  # 実効権限変更(放棄)
  if @privilege
    uid = Etc.getpwnam( @privilege ).uid
    Process.uid = uid
    Process.euid = uid
  end

  # ログ準備
  if modes.include?( :nolog )
    @@log == nil
  elsif @@log == nil
    @log_filename ||= File.join( @workdir, @name ) + ".log"
    @@log = Logger.new( @log_filename, 3 )
    @@log.level = @flag_debug ? Logger::DEBUG : Logger::INFO
  end

  if ! modes.include?( :nopid )
    @pid_filename ||= File.join( @workdir, @name ) + ".pid"
    # 実行可/不可確認
    if File.directory?( @pid_filename )
      puts "ERROR: @pid_filename is directory."
      exit( 64 )
    end
    if File.exist?( @pid_filename )
      puts "ERROR: Still work."
      exit( 64 )
    end

    # プロセスIDファイル作成
    # (note) pid作成エラーの場合は、daemonになる前にここで検出される。
    File.open( @pid_filename, "w" ) { |file| file.write( Process.pid ) }
  end

  # 常駐処理
  if modes.include?( :daemon )
    Process.daemon()
    # プロセスIDファイル再作成
    if ! modes.include?( :nopid )
      File.open( @pid_filename, "w" ) { |file| file.write( Process.pid ) }
    end
  end
  if @program_name
    $PROGRAM_NAME = @program_name
  end

  # 終了時処理
  at_exit {
    if ! modes.include?( :nopid )
      File.unlink( @pid_filename ) rescue 0
    end
    AlWorker.log( "finish", :info, @name )
  }

  # 初期化2
  AlWorker.log( "start", :info, @name )
  begin
    initialize2()
  rescue Exception => ex
    raise ex  if ex.class == SystemExit
    AlWorker.log( ex )
    raise ex  if STDERR.isatty
    exit( 64 )
  end

  # アイドルタスク
  if respond_to?( :idle_task, true )
    Thread.start {
      Thread.current.priority -= 1
      begin
        idle_task()
      rescue Exception => ex
        raise ex  if ex.class == SystemExit
        AlWorker.log( ex )
        if STDERR.isatty
          STDERR.puts ex.to_s
          STDERR.puts ex.backtrace.join("\n") + "\n"
        end
      end
      exit  if modes.include?( :exit_idle_task )
    }
  end

  # メインスレッド停止
  return  if modes.include?( :nostop )
  sleep
end

- (Object) save_values

Note:

値(@values)保存

排他処理なし。 バックアップファイルを3つまで作成する。



374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
# File 'lib/al_worker.rb', line 374

def save_values()
  filename = File.join( @workdir, @name ) + ".values"
  File.rename( filename + ".bak2", filename + ".bak3" ) rescue 0
  File.rename( filename + ".bak1", filename + ".bak2" ) rescue 0
  File.rename( filename,           filename + ".bak1" ) rescue 0

  File.open( filename, "w" ) { |f|
    f.puts "DATE: #{Time.now}"
    f.puts "NAME: #{@name}"
    f.puts "SELF: #{self.inspect}"
    f.puts "VALUES: \n#{@values.to_json}"
  }
  File.open( File.join( @workdir, @name ) + ".sha1", "w" ) { |file|
    file.write( Digest::SHA1.file( filename ) )
  }
end

- (Object) set_state(state) Also known as: state=, next_state

現在のステートを宣言する

Parameters:

  • state (String)

    ステート文字列



619
620
621
622
# File 'lib/al_worker.rb', line 619

def set_state( state )
  @state = state.to_s
  AlWorker.log( "change state to #{@state}", :debug, @name )
end

- (Object) set_value(key, val)

valueのセッター(単一値)

Parameters:

  • key (String)

    キー

  • val (Object)



216
217
218
# File 'lib/al_worker.rb', line 216

def set_value( key, val )
  @values_rwlock.synchronize( Sync::EX ) { @values[ key.to_s ] = val }
end

- (Object) set_values(values)

valueのセッター(複数値)

Parameters:

  • values (Hash)

    セットする値



226
227
228
# File 'lib/al_worker.rb', line 226

def set_values( values )
  @values_rwlock.synchronize( Sync::EX ) { @values.merge!( values ) }
end

- (Object) signal_quit

Note:

シグナルハンドラ SIGQUIT

デバグ用

状態をファイルに書き出す。
画面があれば、表示する。


198
199
200
201
202
203
204
205
206
207
# File 'lib/al_worker.rb', line 198

def signal_quit()
  save_values()

  if STDOUT.isatty
    puts "\n===== @values ====="
    @values.keys.sort.each do |k|
      puts "#{k}=> #{@values[k]}"
    end
  end
end

- (Object) trigger_event(event, *args)

ステートマシン 実行メソッド割り当て

Parameters:

  • event (String)

    イベント名

  • args (Array)

    引数



575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
# File 'lib/al_worker.rb', line 575

def trigger_event( event, *args )
  @respond_to = "from_#{@state}_event_#{event}"
  if respond_to?( @respond_to )
    AlWorker.log( "st:#{@state} ev:#{event} call:#{@respond_to}", :debug, @name )
    return __send__( @respond_to, *args )
  end

  @respond_to = "state_#{@state}_event_#{event}"
  if respond_to?( @respond_to )
    AlWorker.log( "st:#{@state} ev:#{event} call:#{@respond_to}", :debug, @name )
    return __send__( @respond_to, *args )
  end

  @respond_to = "event_#{event}"
  if respond_to?( @respond_to )
    AlWorker.log( "st:#{@state} ev:#{event} call:#{@respond_to}", :debug, @name )
    return __send__( @respond_to, *args )
  end

  @respond_to = "state_#{@state}"
  if respond_to?( @respond_to )
    AlWorker.log( "st:#{@state} ev:#{event} call:#{@respond_to}", :debug, @name )
    return __send__( @respond_to, *args )
  end

  # 実行すべきメソッドが見つからない場合
  @respond_to = ""
  no_method_error( event )
end