プラグイン: 全てのパーティション上でリクエストを処理し、ストレージを操作する新たなコマンドを追加する

チュートリアルのゴール

このチュートリアルでは、各ボリュームでのhadling phaseにおいて分散された処理を実行するプラグインを開発するための方法を学びます。 言い換えると、このチュートリアルでは 新しいコマンドをDroonga Engineに加える方法 を説明します。

前提条件

リクエストのハンドリング

適合フェーズからリクエストが転送されてくると、Droonga Engineは処理フェーズ(processing phase)に入ります。

処理フェーズでは、Droonga Engineはリクエストを「ステップ」ごとに段階的に処理します。 1つの ステップ は、立案フェーズ配布フェーズハンドリング・フェーズ、そして 集約フェーズ という4つのフェーズから成り立っています。

すべてのステップの処理が終了すると、Droonga Engineは結果を後適合フェーズへと転送します。

ハンドリング・フェーズでの操作を定義するクラスは、ハンドラーと呼ばれます。 簡単に言うと、新しいハンドラーを追加するということは、新しいコマンドを追加するということを意味します。

読み取り専用のコマンド countRecords を設計する

このチュートリアルでは、新しい独自のコマンド countRecords を実装することにします。 まず、コマンドの仕様を設計しましょう。

このコマンドは、個々のsingle volumeにおける指定テーブルの全レコードの数を報告します。 これは、クラスタ内でどのようにレコードが分散されているかを調べる助けになるでしょう。 このコマンドはデータベースの内容を何も変更しないので、これは読み取り専用のコマンドと言うことができます。

リクエストは、以下のようにテーブル名を必ず1つ含まなくてはなりません

{
  "dataset" : "Starbucks",
  "type"    : "countRecords",
  "body"    : {
    "table": "Store"
  }
}

上記のような内容のJSON形式のファイル count-records.json を作成します。 以降の検証では、このファイルを使い続けていきましょう。

レスポンスは、各single volumeごとのそのテーブルにあるレコードの数を含んでいなくてはなりません。 これは以下のように、配列として表現できます:

{
  "inReplyTo": "(message id)",
  "statusCode": 200,
  "type": "countRecords.result",
  "body": [10, 10]
}

ボリュームが2つある場合、20個のレコードが均等に保持されているはずなので、配列は上記のように2つの要素を持つことになるでしょう。 この例は、各ボリュームがレコードを10個ずつ保持している事を示しています。

それでは、ここまでで述べたような形式のリクエストを受け付けて上記のようなレスポンスを返す、というプラグインを作っていきましょう。

ディレクトリ構成

プラグインのディレクトリ構成は、適合フェーズ用のプラグインのチュートリアルでの説明と同じ様式に則ります。 count-records.rb というファイルとして、count-records プラグインを作りましょう。ディレクトリツリーは以下のようになります:

lib
└── droonga
    └── plugins
            └── count-records.rb

次に、以下のようにしてプラグインの骨組みを作ります:

lib/droonga/plugins/count-records.rb:

require "droonga/plugin"

module Droonga
  module Plugins
    module CountRecordsPlugin
      extend Plugin
      register("count-records")
    end
  end
end

コマンドのための「ステップ」を定義する

以下のようにして、プラグインの中で新しいコマンド countRecords のための「ステップ」を定義します:

lib/droonga/plugins/count-records.rb:

require "droonga/plugin"

module Droonga
  module Plugins
    module CountRecordsPlugin
      extend Plugin
      register("count-records")

      define_single_step do |step|
        step.name = "countRecords"
      end
    end
  end
end

step.name の値は、コマンド自身の名前と同じです。 今のところは、コマンドの名前を定義しただけです。 それ以上のことはしていません。

ハンドリングの仕方を定義する

このコマンドはハンドラーを持っていないため、まだ何も処理が行われません。 それではコマンドの挙動を定義しましょう。

lib/droonga/plugins/count-records.rb:

require "droonga/plugin"

module Droonga
  module Plugins
    module CountRecordsPlugin
      extend Plugin
      register("count-records")

      define_single_step do |step|
        step.name = "countRecords"
        step.handler = :Handler
      end

      class Handler < Droonga::Handler
        def handle(message)
          [0]
        end
      end
    end
  end
end

Handler というクラスは、新しいコマンドのためのハンドラークラスです。

現時点で、このハンドラーは何も処理を行わず、単に数値1つからなる配列を含む処理結果を返すだけです。 戻り値はレスポンスのbodyを組み立てるのに使われます。

ハンドラーはstep.handler設定でステップに紐付けられます。 ここではHandlerクラスをdefine_single_stepの後で定義しているため、:Handlerというシンボルでハンドラークラスを指定しています。 もしハンドラークラスをdefine_single_stepよりも前で定義していれば、単にstep.handler = Handlerと書くことができます。 更に、"OtherPlugin::Handler"のようなクラスパスの文字列も使用できます。

次に、step.collector設定を使ってコレクターをステップに紐付ける必要があります。

lib/droonga/plugins/count-records.rb:

# (snip)
      define_single_step do |step|
        step.name = "countRecords"
        step.handler = :Handler
        step.collector = Collectors::Sum
      end
# (snip)

Collectors::Sumは組み込みコレクターの一つです。 これは、各ボリュームのハンドラーインスタンスから返って来た結果を結合して一つの結果にします。

catalog.jsonでプラグインを有効化する

Update catalog.json to activate this plugin. Add "count-records" to "plugins".

(snip)
      "datasets": {
        "Starbucks": {
          (snip)
          "plugins": ["count-records", "groonga", "crud", "search", "dump", "status"],
(snip)

実行と動作を確認する

Let’s get Droonga started. Note that you need to specify ./lib directory in RUBYLIB environment variable in order to make ruby possible to find your plugin.

# kill $(cat fluentd.pid)
# RUBYLIB=./lib fluentd --config fluentd.conf --log fluentd.log --daemon fluentd.pid

Then, send a request message for the countRecords command to the Droonga Engine.

# droonga-request --tag starbucks count-records.json
Elapsed time: 0.01494
[
  "droonga.message",
  1392621168,
  {
    "inReplyTo": "1392621168.0119512",
    "statusCode": 200,
    "type": "countRecords.result",
    "body": [
      0,
      0,
      0
    ]
  }
]

You’ll get a response message like above. Look at these points:

There are three elements in the array. Why?

As the result, just one array with three elements appears in the final response.

Read-only access to the storage

Now, each instance of the handler class always returns 0 as its result. Let’s implement codes to count up the number of records from the actual storage.

lib/droonga/plugins/count-records.rb:

# (snip)
      class Handler < Droonga::Handler
        def handle(message)
          request = message.request
          table_name = request["table"]
          table = @context[table_name]
          count = table.size
          [count]
        end
      end
# (snip)

Look at the argument of the handle method. It is different from the one an adapter receives. A handler receives a message meaning a distributed task. So you have to extract the request message from the distributed task by the code request = message.request.

The instance variable @context is an instance of Groonga::Context for the storage of the corresponding single volume. See the class reference of Rroonga. You can use any feature of Rroonga via @context. For now, we simply access to the table itself by its name and read the value of its size method - it returns the number of records.

Then, test it. Restart the Droonga Engine and send the request again.

# kill $(cat fluentd.pid)
# RUBYLIB=./lib fluentd --config fluentd.conf --log fluentd.log --daemon fluentd.pid
# droonga-request --tag starbucks count-records.json
Elapsed time: 0.01494
[
  "droonga.message",
  1392621168,
  {
    "inReplyTo": "1392621168.0119512",
    "statusCode": 200,
    "type": "countRecords.result",
    "body": [
      14,
      15,
      11
    ]
  }
]

Because there are totally 40 records, they are stored evenly like above.

Design a read-write command deleteStores

Next, let’s add another new custom command deleteStores.

The command deletes records of the Store table, from the storage. Because it modifies something in existing storage, it is a read-write command.

The request must have the condition to select records to be deleted, like:

{
  "dataset" : "Starbucks",
  "type"    : "deleteStores",
  "body"    : {
    "keyword": "Broadway"
  }
}

Any record including the given keyword "Broadway" in its "key" is deleted from the storage of all volumes.

Create a JSON file delete-stores-broadway.json with the content above. We’ll use it for testing.

The response must have a boolean value to indicate “success” or “fail”, like:

{
  "inReplyTo": "(message id)",
  "statusCode": 200,
  "type": "deleteStores.result",
  "body": true
}

If the request is successfully processed, the body becomes true. Otherwise false. The body is just one boolean value, because we don’t have to receive multiple results from volumes.

ディレクトリの構造

Now let’s create the delete-stores plugin, as the file delete-stores.rb. The directory tree will be:

lib
└── droonga
    └── plugins
            └── delete-stores.rb

次に、以下のようにしてプラグインの骨組みを作ります:

lib/droonga/plugins/delete-stores.rb:

require "droonga/plugin"

module Droonga
  module Plugins
    module DeleteStoresPlugin
      extend Plugin
      register("delete-stores")
    end
  end
end

コマンドのための「ステップ」を定義する

Define a “step” for the new deleteStores command, in your plugin. Like:

lib/droonga/plugins/delete-stores.rb:

require "droonga/plugin"

module Droonga
  module Plugins
    module DeleteStoresPlugin
      extend Plugin
      register("delete-stores")

      define_single_step do |step|
        step.name = "deleteStores"
        step.write = true
      end
    end
  end
end

Look at a new configuration step.write. Because this command modifies the storage, we must indicate it clearly.

ハンドリングの仕方を定義する

Let’s define the handler.

lib/droonga/plugins/delete-stores.rb:

require "droonga/plugin"

module Droonga
  module Plugins
    module DeleteStoresPlugin
      extend Plugin
      register("delete-stores")

      define_single_step do |step|
        step.name = "deleteStores"
        step.write = true
        step.handler = :Handler
        step.collector = Collectors::And
      end

      class Handler < Droonga::Handler
        def handle(message)
          request = message.request
          keyword = request["keyword"]
          table = @context["Store"]
          table.delete do |record|
            record.key =~ keyword
          end
          true
        end
      end
    end
  end
end

Remember, you have to extract the request message from the received task message.

The handler finds and deletes existing records which have the given keyword in its “key”, by the API of Rroonga.

And, the Collectors::And is bound to the step by the configuration step.collector. It is is also one of built-in collectors, and merges boolean values returned from handler instances for each volume, to one boolean value.

catalog.jsonでプラグインを有効化する

Update catalog.json to activate this plugin. Add "delete-stores" to "plugins".

(snip)
      "datasets": {
        "Starbucks": {
          (snip)
          "plugins": ["delete-stores", "count-records", "groonga", "crud", "search", "dump", "status"],
(snip)

実行と動作を確認する

Restart the Droonga Engine and send the request.

# kill $(cat fluentd.pid)
# RUBYLIB=./lib fluentd --config fluentd.conf --log fluentd.log --daemon fluentd.pid
# droonga-request --tag starbucks count-records.json
Elapsed time: 0.01494
[
  "droonga.message",
  1392621168,
  {
    "inReplyTo": "1392621168.0119512",
    "statusCode": 200,
    "type": "deleteStores.result",
    "body": true
  }
]

Because results from volumes are unified to just one boolean value, the response’s body is a true. As the verification, send the request of countRecords command.

# droonga-request --tag starbucks count-records.json
Elapsed time: 0.01494
[
  "droonga.message",
  1392621168,
  {
    "inReplyTo": "1392621168.0119512",
    "statusCode": 200,
    "type": "countRecords.result",
    "body": [
      7,
      13,
      6
    ]
  }
]

Note, the number of records are smaller than the previous result. This means that four or some records are deleted from each volume.

まとめ

We have learned how to add a new simple command working around the data. In the process, we also have learned how to create plugins working in the handling phrase.