目次

AlWorker IPC プロセス間通信

require "al_worker_ipc"


UNIX socket を利用した IPC プロセス間通信を実現します。 典型的な仕様例は、サーバプログラムが常駐し機器類の監視・制御を行います。そのサーバプログラムに対し、別プロセスとして起動したクライアントプログラムから指示を出したり、状態を取得したりします。

サンプル

ipc_server.rb
# IPC サーバー
require "al_worker_ipc"
 
class IpcServer < AlWorker
 
  # イニシャライザでIPCを用意する。
  def initialize2()
    @ipc = Ipc.new()
    @ipc.chmod = 0666
    @ipc.run( self )
  end
 
  # IPCコマンド idle を定義
  # 動作:何もしない
  def ipc_idle( sock, param )
    reply( sock, 200, "OK" )
  end
 
  # IPCコマンド hello を定義
  # 動作:文字列 hello を返す。
  def ipc_hello( sock, param )
    reply( sock, 200, "OK", reply: "hello" )
  end
 
  # IPCコマンド upper を定義
  # 動作:送られたパラメータの値を大文字にして返す
  def ipc_upper( sock, param )
    param.keys.each { |k| param[k].upcase! }
    reply( sock, 200, "OK", param )
  end
 
end
 
server = IpcServer.new( "ipc_server" )
server.daemon()
ipc_client.rb
# IPC クライアント
require "al_worker_ipc"
 
AlWorker::Ipc.open( "/tmp/ipc_server" ) { |ipc|
  puts "IPC command: idle. ret: " + ipc.call( "idle" ).to_s
  puts "IPC command: hello. ret: " + ipc.call( "hello" ).to_s
  puts "IPC command: upper with param {'A'=>'dog','B'=>'Cat'} ret: " + ipc.call( "upper", {'A'=>'dog','B'=>'Cat'} ).to_s
}

実行結果

IPC command: idle. ret: {}
IPC command: hello. ret: {"reply"=>"hello"}
IPC command: upper with param {"A"=>"dog", "B"=>"Cat"} ret: {"A"=>"DOG", "B"=>"CAT"}

解説

サーバーは、AlWorkerを継承したクラスに "ipc_" をプレフィックスとして付与したメソッドを定義すると、AloneにはIPCコマンドのイベントハンドラとして認識されます。
クライアントは、AlWorker::IPC.openメソッドでサーバーに接続し、call メソッドで IPCコールを行います。 サーバー/クライアント間のパラメータ送受は、Hashを使います。
各ハンドラでは、trueを返すとクライアントとのセッションを継続し、falseだとサーバー側から切断します。上記例ではreply()メソッドが必ずtrueを返すので、セッションが継続します。

CGIコントローラから使う

ウェブブラウザを使って状態監視等を行う場合には、クライアントプログラムがCGIプログラムになります。

サンプル

main.rb
require "al_template"
require "al_worker_ipc"
 
SERVER_NODE = "/tmp/ipc_server"
 
class IpcSampleController < AlController
 
  def action_index()
    @result_message = "ここにIPCプログラムからの戻り値が表示されます。"
    AlTemplate.run("index.rhtml")
  end
 
  def action_idle()
    ipc = AlWorker::Ipc.open(SERVER_NODE)
    @result_message = ipc.call("idle").to_s
    AlTemplate.run("index.rhtml")
  end
 
  def action_hello()
    ipc = AlWorker::Ipc.open(SERVER_NODE)
    @result_message = ipc.call("hello").to_s
    AlTemplate.run("index.rhtml")
  end
 
  def action_upper()
    ipc = AlWorker::Ipc.open(SERVER_NODE)
    @result_message = ipc.call("upper", {'A'=>'dog','B'=>'Cat'}).to_s
    AlTemplate.run("index.rhtml")
  end
 
end
index.rhtml
<%= header_section %>
<title>Test</title>
 
<%= body_section %>
IPC 送信コマンド
 
<p>
  <button onclick="location.href='<%=h Alone.make_uri(:action=>"idle") %>'">idle</button>
  <button onclick="location.href='<%=h Alone.make_uri(:action=>"hello") %>'">hello</button>
  <button onclick="location.href='<%=h Alone.make_uri(:action=>"upper") %>'">upper</button>
</p>
 
<p>
  <textarea rows="3" cols="50"><%=h @result_message %></textarea>
</p>
 
<%= footer_section %>

解説

このサンプルではCGIモジュールのコントローラからIPCを発行しています。 IPC通信内容は、先に説明したサンプルと同様です。
プログラムの動作フローを以下に示します。

  1. テンプレートファイル index.rhtml で [idle] [hello] [upper] の3つのボタンが作られます。
  2. ボタンをクリックすると、action_idel, action_hello, action_upper の各アクションが呼ばれます。
  3. 各アクションでは、AlWorker::IPC.open() にてIPC通信路を確保し、ipc.call() でコマンドを発行します。
  4. IPCの結果は、@result_message → index.rhtml 経由で画面に表示されます。

IPCソケット

接続用ソケットは、デフォルトで /tmp以下に AlWorkerに付けた名前で作成されます。 変更するには、イニシャライザでフルパスを指定します。
ソケットファイルのパーミッションは、システムのデフォルトとなります。変更するには、chmodアトリビュートを設定することで変更できます。

    @ipc = Ipc.new( "/PATH/TO/socketfile" )
    @ipc.chmod = 0666
    @ipc.run( self )

同期/非同期

イベントハンドラはデフォルトで同期的に呼び出され、たとえば2つのクライアントから同時にコマンドを受信しても、一つずつ実行します。
非同期にしたい場合は、"ipc_a_" をプレフィックスとして付けます。

  def ipc_a_idle( sock, param )
    reply( sock, 200, "OK" )
  end

IPCのエラー

httpステータスコードに倣った、IPCステータスコードを採用しています。
IPCコマンドメソッドでエラーを検出し、それをクライアントへ返したい場合、reply()メソッドの引数で指示します。
ステータスコード(数字)と、ステータス文字列は、httpステータスコードに近い状態がある場合はそれを使うように努力してください。
クライアント側では、ipcオブジェクトのstatus_codeアトリビュートを使って参照します。

  ipc.call( "idle" )
  puts ipc.status_code
 
  ipc.call( "noexist" )
  puts ipc.status_code

実行結果

200 OK
501 Error Command not implemented.

標準で用意してあるIPCコマンド

クライアント(IpcClient)に用意してあるメソッド

メソッド動作返り値
call任意のIPCコマンドを呼び出すHash
call_json任意のIPCコマンドを呼び出すJsonString
set_value@valuesのセッター
get_value@valuesのゲッターObject or Hash
get_value_wt@valuesのゲッター タイムアウト付きObject or Hash

サーバー側

IPCコマンド動作同期タイムアウト備考
quit終了async
get_values値取得 全データasyncしない
get_values "key1"値取得 指定データasyncしない標準プロトコルではないが、利便性のために用意してある
get_values {"key":["key1"…]}値取得 指定データasyncしない
get_values_wt値取得 全データasync1s
get_values_wt "key1"値取得 指定データasync1s標準プロトコルではないが、利便性のために用意してある
get_values_wt {"key":["key1"…],"timeout":5}値取得 指定データasync指定
set_values値設定asyncしない

IPCプロトコル詳細

IPCは、UNIX Socket + テキストベースで設計し、データ形式はJSONで統一してあります。これによって、対Rubyだけでなく、C言語など様々な言語からAlWorkerのIPCをコールすることができます。

リクエスト

command { parameter, encoded by json }

リプライ(ステータスのみの場合)

200 OK [any message if given.]

リプライ(データを含む場合)

200. OK [any message if given.]
{ data, encoded by json }
(LF)

リクエストは、コマンドに続いて、JSONエンコードしたパラメータを渡すことができます。
リプライは、httpプロトコルに倣ったステータス行を返し、必要に応じてJSONエンコードしたデータを返す事ができます。
データの有無は、ステータスコードの数字直後のピリオドの有無で判断します。

IPCパススルー

IPCプロトコルに準拠しない、自由なテキストデータの送受信も可能です。これを、IPCパススルーと呼んでいます。

コマンドごとに、以下のように実装します。

サーバサイド

  def ipc_passthrough1( sock, param )
    # sock.gets / sock.puts を使って送受信する。
    while txt = sock.gets
      sock.puts txt
      break if txt == "END\n"
    end
 
    # コネクションを切る
    return false
  end

クライアント

require "al_worker_ipc"
 
AlWorker::Ipc.open("/tmp/ipc_server") {|ipc|
  # ipcは、AlWorekr::IpcClient < UNIXSocket のインスタンス
  ipc.puts("passthrough1")
  5.times {
    ipc.puts("hello world.")
    p ipc.gets()
  }
}

解説

クライアントは、IPCソケットに接続後、"passthrough1" コマンドを発行します。
フレームワークによって上記メソッドが呼び出され、以後 sock を使って自由フォーマットで送受信できます。
終了時は、falseを返すと、サーバー側からコネクションを切断します。trueを返すと、切断はされず、再びコマンド受付に戻ります。