ユーザ用ツール

サイト用ツール


alworker:ipc_プロセス間通信

文書の過去の版を表示しています。


AlWorker IPC プロセス間通信

require "al_worker_ipc"


UNIX socket を利用した IPC プロセス間通信を実現します。

サンプル

ipc_server.rb
# IPC サーバー
require "al_worker_ipc"
 
class IpcServer < AlWorker
 
  # イニシャライザでIPCを用意する。
  def initialize2()
    @ipc = Ipc.new()
    @ipc.run( self )
  end
 
  # IPCコマンド idle を定義
  # 動作:何もしない
  def ipc_idle( sock, param )
    reply( sock, 200, "OK" )
  end
 
  # IPCコマンド hello を定義
  # 動作:文字列 hello を返す。
  def ipc_hello( sock, param )
    reply( sock, 200, "OK", reply: "hello" )
  end
 
  # IPCコマンド upper を定義
  # 動作:送られたパラメータの値を大文字にして返す
  def ipc_upper( sock, param )
    param.keys.each { |k| param[k].upcase! }
    reply( sock, 200, "OK", param )
  end
 
end
 
server = IpcServer.new( "ipc_server" )
server.daemon()
ipc_client.rb
# IPC クライアント
require "al_worker_ipc"
 
AlWorker::Ipc.open( "/tmp/ipc_server" ) { |ipc|
  puts "IPC command: idle. ret: " + ipc.call( "idle" ).to_s
  puts "IPC command: hello. ret: " + ipc.call( "hello" ).to_s
  puts "IPC command: upper with param {'A'=>'dog','B'=>'Cat'} ret: " + ipc.call( "upper", {'A'=>'dog','B'=>'Cat'} ).to_s
}

実行結果

IPC command: idle. ret: {}
IPC command: hello. ret: {"reply"=>"hello"}
IPC command: upper with param {"A"=>"dog", "B"=>"Cat"} ret: {"A"=>"DOG", "B"=>"CAT"}

解説

サーバーは、AlWorkerを継承したクラスに "ipc_" をプレフィックスとして付与したメソッドを定義すると、フレームワークにIPCコマンドのイベントハンドラとして認識されます。
クライアントは、AlWorker::IPC.openメソッドでサーバーに接続し、call メソッドで IPCコールを行います。 サーバー/クライアント間のパラメータ送受は、Hashを使います。
各ハンドラでは、trueを返すとクライアントとのセッションを継続し、falseだとサーバー側から切断します。上記例ではreply()メソッドが必ずtrueを返すので、セッションが継続します。

IPCソケット

接続用ソケットは、デフォルトで /tmp以下に AlWorkerに付けた名前で作成されます。 変更するには、イニシャライザでフルパスを指定します。
ソケットファイルのパーミッションは、システムのデフォルトとなります。変更するには、chmodアトリビュートを設定することで変更できます。

    @ipc = Ipc.new( "/PATH/TO/socketfile" )
    @ipc.chmod = 0666
    @ipc.run( self )

同期/非同期

イベントハンドラはデフォルトで同期的に呼び出され、たとえば2つのクライアントから同時にコマンドを受信しても、一つずつ実行します。
非同期にしたい場合は、"ipc_a_" をプレフィックスとして付けます。

  def ipc_a_idle( sock, param )
    reply( sock, 200, "OK" )
  end

IPCのエラー

httpステータスコードに倣った、IPCステータスコードを採用しています。
IPCコマンドメソッドでエラーを検出し、それをクライアントへ返したい場合、reply()メソッドの引数で指示します。
ステータスコード(数字)と、ステータス文字列は、httpステータスコードに近い状態がある場合はそれを使うように努力してください。
クライアント側では、ipcオブジェクトのstatus_codeアトリビュートを使って参照します。

  ipc.call( "idle" )
  puts ipc.status_code
 
  ipc.call( "noexist" )
  puts ipc.status_code

実行結果

200 OK
501 Error Command not implemented.

標準で用意してあるIPCコマンド

クライアント(IpcClient)に用意してあるメソッド

メソッド動作返り値
call任意のIPCコマンドを呼び出すHash
call_json任意のIPCコマンドを呼び出すJsonString
set_value@valuesのセッター
get_value@valuesのゲッターObject or Hash
get_value_wt@valuesのゲッター タイムアウト付きObject or Hash

サーバー側

IPCコマンド動作同期タイムアウト備考
quit終了async
get_values値取得 全データasyncしない
get_values "key1"値取得 指定データasyncしない標準プロトコルではないが、利便性のために用意してある
get_values {"key":["key1"…]}値取得 指定データasyncしない
get_values_wt値取得 全データasync1s
get_values_wt "key1"値取得 指定データasync1s標準プロトコルではないが、利便性のために用意してある
get_values_wt {"key":["key1"…],"timeout":5}値取得 指定データasync指定
set_values値設定asyncしない

IPCプロトコル詳細

IPCは、UNIX Socket + テキストベースで設計し、データ形式はJSONで統一してあります。これによって、対Rubyだけでなく、C言語など様々な言語からAlWorkerのIPCをコールすることができます。

リクエスト

command { parameter, encoded by json }

リプライ(ステータスのみの場合)

200 OK [any message if given.]

リプライ(データを含む場合)

200. OK [any message if given.]
{ data, encoded by json }
(LF)

リクエストは、コマンドに続いて、JSONエンコードしたパラメータを渡すことができます。
リプライは、httpプロトコルに倣ったステータス行を返し、必要に応じてJSONエンコードしたデータを返す事ができます。
データの有無は、ステータスコードの数字直後のピリオドの有無で判断します。

IPCパススルー

IPCプロトコルに準拠しない、自由なテキストデータの送受信も可能です。これを、IPCパススルーと呼んでいます。

コマンドごとに、以下のように実装します。

  def ipc_passthrough1( sock, param )
    # sock.gets / sock.puts を使って送受信する。
    while txt = sock.gets
      sock.puts txt
      break if txt == "END\n"
    end
 
    # コネクションを切る
    return false
  end

解説

クライアントは、IPCソケットに接続後、"passthrough1" コマンドを発行します。
フレームワークによって上記メソッドが呼び出され、以後 sock を使って自由フォーマットで送受信できます。
終了時は、falseを返すと、サーバー側からコネクションを切断します。trueを返すと、切断はされず、再びコマンド受付に戻ります。

alworker/ipc_プロセス間通信.1404182470.txt.gz · 最終更新: 2014/07/01 11:41 by 127.0.0.1