====== AlWorker IPC プロセス間通信 ====== require "al_worker_ipc" ---- UNIX socket を利用した IPC プロセス間通信を実現します。 典型的な仕様例は、サーバプログラムが常駐し機器類の監視・制御を行います。そのサーバプログラムに対し、別プロセスとして起動したクライアントプログラムから指示を出したり、状態を取得したりします。 {{:alworker:IPC_explain1.png?nolink|}} ====== サンプル ====== # IPC サーバー require "al_worker_ipc" class IpcServer < AlWorker # イニシャライザでIPCを用意する。 def initialize2() @ipc = Ipc.new() @ipc.chmod = 0666 @ipc.run( self ) end # IPCコマンド idle を定義 # 動作:何もしない def ipc_idle( sock, param ) reply( sock, 200, "OK" ) end # IPCコマンド hello を定義 # 動作:文字列 hello を返す。 def ipc_hello( sock, param ) reply( sock, 200, "OK", reply: "hello" ) end # IPCコマンド upper を定義 # 動作:送られたパラメータの値を大文字にして返す def ipc_upper( sock, param ) param.keys.each { |k| param[k].upcase! } reply( sock, 200, "OK", param ) end end server = IpcServer.new( "ipc_server" ) server.daemon() # IPC クライアント require "al_worker_ipc" AlWorker::Ipc.open( "/tmp/ipc_server" ) { |ipc| puts "IPC command: idle. ret: " + ipc.call( "idle" ).to_s puts "IPC command: hello. ret: " + ipc.call( "hello" ).to_s puts "IPC command: upper with param {'A'=>'dog','B'=>'Cat'} ret: " + ipc.call( "upper", {'A'=>'dog','B'=>'Cat'} ).to_s } ===== 実行結果 ===== IPC command: idle. ret: {} IPC command: hello. ret: {"reply"=>"hello"} IPC command: upper with param {"A"=>"dog", "B"=>"Cat"} ret: {"A"=>"DOG", "B"=>"CAT"} ===== 解説 ===== サーバーは、AlWorkerを継承したクラスに "ipc_" をプレフィックスとして付与したメソッドを定義すると、AloneにはIPCコマンドのイベントハンドラとして認識されます。\\ クライアントは、AlWorker::IPC.openメソッドでサーバーに接続し、call メソッドで IPCコールを行います。 サーバー/クライアント間のパラメータ送受は、Hashを使います。\\ 各ハンドラでは、trueを返すとクライアントとのセッションを継続し、falseだとサーバー側から切断します。上記例ではreply()メソッドが必ずtrueを返すので、セッションが継続します。\\ ====== CGIコントローラから使う ====== ウェブブラウザを使って状態監視等を行う場合には、クライアントプログラムがCGIプログラムになります。 {{:alworker:IPC_explain2.png?nolink|}} ===== サンプル ===== require "al_template" require "al_worker_ipc" SERVER_NODE = "/tmp/ipc_server" class IpcSampleController < AlController def action_index() @result_message = "ここにIPCプログラムからの戻り値が表示されます。" AlTemplate.run("index.rhtml") end def action_idle() ipc = AlWorker::Ipc.open(SERVER_NODE) @result_message = ipc.call("idle").to_s AlTemplate.run("index.rhtml") end def action_hello() ipc = AlWorker::Ipc.open(SERVER_NODE) @result_message = ipc.call("hello").to_s AlTemplate.run("index.rhtml") end def action_upper() ipc = AlWorker::Ipc.open(SERVER_NODE) @result_message = ipc.call("upper", {'A'=>'dog','B'=>'Cat'}).to_s AlTemplate.run("index.rhtml") end end <%= header_section %> Test <%= body_section %> IPC 送信コマンド

<%= footer_section %>
===== 解説 ===== このサンプルではCGIモジュールのコントローラからIPCを発行しています。 IPC通信内容は、先に説明したサンプルと同様です。\\ プログラムの動作フローを以下に示します。 - テンプレートファイル index.rhtml で [idle] [hello] [upper] の3つのボタンが作られます。 - ボタンをクリックすると、action_idel, action_hello, action_upper の各アクションが呼ばれます。 - 各アクションでは、AlWorker::IPC.open() にてIPC通信路を確保し、ipc.call() でコマンドを発行します。 - IPCの結果は、@result_message → index.rhtml 経由で画面に表示されます。 ====== IPCソケット ====== 接続用ソケットは、デフォルトで /tmp以下に AlWorkerに付けた名前で作成されます。 変更するには、イニシャライザでフルパスを指定します。\\ ソケットファイルのパーミッションは、システムのデフォルトとなります。変更するには、chmodアトリビュートを設定することで変更できます。 @ipc = Ipc.new( "/PATH/TO/socketfile" ) @ipc.chmod = 0666 @ipc.run( self ) ====== 同期/非同期 ====== イベントハンドラはデフォルトで同期的に呼び出され、たとえば2つのクライアントから同時にコマンドを受信しても、一つずつ実行します。\\ 非同期にしたい場合は、"ipc_a_" をプレフィックスとして付けます。\\ def ipc_a_idle( sock, param ) reply( sock, 200, "OK" ) end ====== IPCのエラー ====== httpステータスコードに倣った、IPCステータスコードを採用しています。\\ IPCコマンドメソッドでエラーを検出し、それをクライアントへ返したい場合、reply()メソッドの引数で指示します。\\ ステータスコード(数字)と、ステータス文字列は、httpステータスコードに近い状態がある場合はそれを使うように努力してください。\\ クライアント側では、ipcオブジェクトのstatus_codeアトリビュートを使って参照します。 ipc.call( "idle" ) puts ipc.status_code ipc.call( "noexist" ) puts ipc.status_code ===== 実行結果 ===== 200 OK 501 Error Command not implemented. ====== 標準で用意してあるIPCコマンド ====== クライアント(IpcClient)に用意してあるメソッド ^メソッド^動作^返り値^ |call|任意のIPCコマンドを呼び出す|Hash| |call_json|任意のIPCコマンドを呼び出す|JsonString| |set_value|@valuesのセッター| | |get_value|@valuesのゲッター|Object or Hash| |get_value_wt|@valuesのゲッター タイムアウト付き|Object or Hash| サーバー側 ^IPCコマンド^動作^同期^タイムアウト^備考^ |quit|終了|async|--| | |get_values|値取得 全データ|async|しない| | |get_values "key1"|値取得 指定データ|async|しない|標準プロトコルではないが、利便性のために用意してある| |get_values {"key":["key1"...]}|値取得 指定データ|async|しない| | |get_values_wt|値取得 全データ|async|1s| | |get_values_wt "key1"|値取得 指定データ|async|1s|標準プロトコルではないが、利便性のために用意してある| |get_values_wt {"key":["key1"...],"timeout":5}|値取得 指定データ|async|指定| | |set_values|値設定|async|しない| | ====== IPCプロトコル詳細 ====== IPCは、UNIX Socket + テキストベースで設計し、データ形式はJSONで統一してあります。これによって、対Rubyだけでなく、C言語など様々な言語からAlWorkerのIPCをコールすることができます。 リクエスト command { parameter, encoded by json } リプライ(ステータスのみの場合) 200 OK [any message if given.] リプライ(データを含む場合) 200. OK [any message if given.] { data, encoded by json } (LF) リクエストは、コマンドに続いて、JSONエンコードしたパラメータを渡すことができます。\\ リプライは、httpプロトコルに倣ったステータス行を返し、必要に応じてJSONエンコードしたデータを返す事ができます。\\ データの有無は、ステータスコードの数字直後のピリオドの有無で判断します。 ====== IPCパススルー ====== IPCプロトコルに準拠しない、自由なテキストデータの送受信も可能です。これを、IPCパススルーと呼んでいます。 コマンドごとに、以下のように実装します。 ===== サーバサイド ===== def ipc_passthrough1( sock, param ) # sock.gets / sock.puts を使って送受信する。 while txt = sock.gets sock.puts txt break if txt == "END\n" end # コネクションを切る return false end ===== クライアント ===== require "al_worker_ipc" AlWorker::Ipc.open("/tmp/ipc_server") {|ipc| # ipcは、AlWorekr::IpcClient < UNIXSocket のインスタンス ipc.puts("passthrough1") 5.times { ipc.puts("hello world.") p ipc.gets() } } ===== 解説 ===== クライアントは、IPCソケットに接続後、"passthrough1" コマンドを発行します。\\ フレームワークによって上記メソッドが呼び出され、以後 sock を使って自由フォーマットで送受信できます。\\ 終了時は、falseを返すと、サーバー側からコネクションを切断します。trueを返すと、切断はされず、再びコマンド受付に戻ります。