
V. Schema Definitions

Chris Heald edited this page Oct 26, 2017 · 6 revisions

Schema Definitions

Kraken transports data over the network in an efficient binary-serialized format called Avro. One consequence of this is that all messages must conform to a pre-defined schema. While this introduces a little overhead up front - we have to tell Kraken about the shape of the data we want to emit - it ensures that all data emitted conforms to expectations, and can catch malformed or missing data before it enters downstream parts of the system.

In this section, we'll learn how to define a schema for a worker to use when emitting messages.

Defining a Schema

Schemas are defined in workers with the class method schema(String namespace, &block), where namespace is a unique, dot-separated identifier that follows Java package naming conventions (for example, com.mashable.kraken.tutorial.test).

The given block exposes a Domain Specific Language (DSL) which allows for the definition of schema fields with Ruby code. The DSL exposes two methods:

required(Symbol field_name, Symbol type[, Hash options])
optional(Symbol field_name, Symbol type[, Hash options])

required defines a field that must be present for the message to pass validation. optional defines a nullable field, whose value may be null.
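
In Avro itself, a nullable field is usually written as a union of "null" and the value type, with a default of null. The following is a minimal sketch of that mapping, not Kraken's or Avro-Builder's actual implementation; the FieldSketch class and its doc_entry helper are hypothetical names used only for illustration.

```ruby
require "json"

# Hypothetical stand-in for the schema DSL, for illustration only.
# It records each field as an Avro-style field definition.
class FieldSketch
  attr_reader :fields

  def initialize
    @fields = []
  end

  # A required field carries its type directly.
  def required(name, type, options = {})
    @fields << { "name" => name.to_s, "type" => type.to_s }.merge(doc_entry(options))
  end

  # An optional field becomes a union with "null" and defaults to null.
  def optional(name, type, options = {})
    @fields << {
      "name" => name.to_s,
      "type" => ["null", type.to_s],
      "default" => nil
    }.merge(doc_entry(options))
  end

  private

  # Only include a "doc" key when documentation was supplied.
  def doc_entry(options)
    options[:doc] ? { "doc" => options[:doc] } : {}
  end
end

sketch = FieldSketch.new
sketch.required(:first_name, :string)
sketch.optional(:middle_name, :string)

puts JSON.pretty_generate(sketch.fields)
```

Placing "null" first in the union matters in Avro, because a field's default value must match the first branch of its union.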

Avro supports a number of primitive types, as well as Map, Enum, and Array types. We can also define custom types, but we try to avoid doing so where possible.

Valid primitive types are:

  • :null - no value
  • :boolean - true or false
  • :int - 32-bit signed integer
  • :long - 64-bit signed integer
  • :float - single precision (32-bit) IEEE 754 floating-point number
  • :double - double precision (64-bit) IEEE 754 floating-point number
  • :bytes - sequence of 8-bit unsigned bytes
  • :string - unicode character sequence
  • :timestamp - expects a Ruby date object, which will be represented as a long in transit.
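
The difference between :int and :long is only the range of values each can hold. As a rough illustration, the helper below (a hypothetical name, not part of Kraken) picks the narrowest of the two types that fits a given Ruby Integer.

```ruby
# Bounds of Avro's signed integer types.
INT_RANGE  = (-2**31..2**31 - 1)
LONG_RANGE = (-2**63..2**63 - 1)

# Returns the narrowest Avro integer type that can hold +value+,
# or raises if the value does not fit in 64 bits.
def smallest_integer_type(value)
  raise ArgumentError, "not an Integer: #{value.inspect}" unless value.is_a?(Integer)
  return :int if INT_RANGE.cover?(value)
  return :long if LONG_RANGE.cover?(value)

  raise RangeError, "#{value} does not fit in a 64-bit signed integer"
end

puts smallest_integer_type(2_147_483_647) # largest value that fits :int
puts smallest_integer_type(2_147_483_648) # one past the :int range
```

Counters and identifiers that can grow past roughly two billion are safer declared as :long from the start, since changing a field's type later means changing the schema.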

Every schema implicitly gets a _timestamp field, whose value is set to the time at which the record is emitted. You do not need to add a timestamp field of your own unless you want to represent a different date (such as the publication date of a page or post).
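
The text above says a timestamp travels as a long but does not state the unit. The sketch below assumes milliseconds since the Unix epoch, as in Avro's timestamp-millis logical type, and assumes the value is a Ruby Time; Kraken's actual conversion may differ. The helper names are hypothetical.

```ruby
require "time"

# Assumption: milliseconds since the Unix epoch, as in Avro's
# timestamp-millis logical type. Kraken's actual unit may differ.
def to_epoch_millis(time)
  (time.to_r * 1000).to_i
end

# Inverse of to_epoch_millis; returns a UTC Time.
def from_epoch_millis(millis)
  Time.at(Rational(millis, 1000)).utc
end

published_at = Time.utc(2017, 10, 26, 12, 0, 0)
millis = to_epoch_millis(published_at)

puts millis
puts from_epoch_millis(millis).iso8601
```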

To define a hash or dict of values, you will use the :map type. It requires a values option, which enumerates the valid types for the value of each entry. Keys are always strings.

required(:dict_of_ints, :map, {values: :int})

If a map should accept multiple value types, you can use union to express that:

required(:dict_of_floats_or_ints, :map, {values: union(:int, :float)})
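
To make the rule concrete, here is a minimal sketch of the check such a map declaration implies: every key is a String and every value matches one of the union's types. It is not Kraken's validator; MAP_VALUE_CLASSES and valid_map? are hypothetical names, and numeric range checks are left out for brevity.

```ruby
# Hypothetical mapping from type names to the Ruby classes accepted for them.
MAP_VALUE_CLASSES = {
  int: [Integer],
  float: [Float]
}.freeze

# True when +value+ is a Hash whose keys are all Strings and whose
# values each match at least one of the allowed types.
def valid_map?(value, allowed_types)
  return false unless value.is_a?(Hash)

  classes = allowed_types.flat_map { |type| MAP_VALUE_CLASSES.fetch(type) }
  value.all? do |key, entry|
    key.is_a?(String) && classes.any? { |klass| entry.is_a?(klass) }
  end
end

puts valid_map?({ "a" => 1, "b" => 2.5 }, [:int, :float]) # string keys, numeric values
puts valid_map?({ a: 1 }, [:int, :float])                 # symbol key
puts valid_map?({ "a" => "1" }, [:int, :float])           # string value
```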

To define a list of values, you will use the :array type. It requires an items option which specifies the types that may be in the list:

required(:array_of_ints, :array, {items: :int})
required(:array_of_ints_or_floats, :array, {items: union(:int, :float)})
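
For reference, the Avro specification writes an array type in JSON with an "items" entry, and writes a union as a JSON array of its member types. The sketch below builds that JSON form for the two cases above; avro_array_type is a hypothetical helper, and whether the DSL emits exactly this structure is an assumption.

```ruby
require "json"

# Builds Avro's JSON form of an array type. A single item type is written
# as a type name; a union is written as a JSON array of the member names.
def avro_array_type(*item_types)
  names = item_types.map(&:to_s)
  { "type" => "array", "items" => names.length == 1 ? names.first : names }
end

puts JSON.generate(avro_array_type(:int))
puts JSON.generate(avro_array_type(:int, :float))
```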

For a more in-depth look at how to represent data, see the Avro documentation on types and the full documentation for the Avro-Builder Ruby DSL.

Continuing with our tutorial worker, we can start the definition of a schema as follows:

module Tutorial
  class TutorialWorker < ::BaseWorker

    schema "com.mashable.kraken.tutorial.test" do
      # We'll define our schema with the schema DSL here
    end

    produce({every: 1.minute}) do
    end
 
    def perform
    end
  end
end

Suppose we want first_name and last_name to be required strings and middle_name to be an optional string. The schema would then be defined as follows:

schema "com.mashable.kraken.tutorial.test" do
  required :first_name, :string, doc: "The user's first name"
  optional :middle_name, :string, doc: "The user's middle name"
  required :last_name, :string, doc: "The user's last name"
end
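
To illustrate what this schema accepts and rejects, the following sketch checks sample messages for missing required fields. It is a simplified stand-in for the validation Kraken performs, not its implementation; TUTORIAL_FIELDS and missing_required_fields are hypothetical names, and the sample values are invented.

```ruby
# Hypothetical description of the tutorial schema: field name => required?
TUTORIAL_FIELDS = {
  first_name: true,
  middle_name: false,
  last_name: true
}.freeze

# Returns the names of required fields that are missing or nil in +message+.
def missing_required_fields(message)
  TUTORIAL_FIELDS.select { |name, required| required && message[name].nil? }.keys
end

complete = { first_name: "Ada", last_name: "Lovelace" }
incomplete = { first_name: "Ada", middle_name: "King" }

p missing_required_fields(complete)   # no middle_name, which is allowed
p missing_required_fields(incomplete) # last_name is absent
```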

The doc option is optional, but can be used to register the schema with human-readable documentation, so that another person consuming this schema could have some context for a given field.

Registering Connectors

We have to tell Kraken what to do with the data we write to the tutorial.test topic. This is done with the register_connector class method. Here we use the predefined s3_json connector, which instructs Kraken to transport data collected from this worker to S3 as JSON.

The method signature is register_connector(Symbol connector_name, (Array<String>|String) topics).

module Tutorial
  class TutorialWorker < ::BaseWorker

    register_connector :s3_json, "tutorial.test"

    schema "com.mashable.kraken.tutorial.test" do
      required :first_name, :string
      optional :middle_name, :string
      required :last_name, :string
    end

    produce({every: 1.minute}) do
      # todo
    end

    def perform
      # todo
    end
  end
end
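
Because the topics argument may be either a single String or an Array of Strings, a method with this signature has to normalize its input. The sketch below shows one way to do that with Ruby's Array() conversion. ConnectorRegistrySketch is a hypothetical class and "tutorial.other" a made-up topic; this is not how Kraken stores connectors.

```ruby
# Hypothetical registry, for illustration only; not Kraken's implementation.
class ConnectorRegistrySketch
  attr_reader :connectors

  def initialize
    # Each connector name maps to a list of topics, created on first use.
    @connectors = Hash.new { |hash, name| hash[name] = [] }
  end

  # Accepts one topic or a list of topics and stores them as a flat,
  # duplicate-free list under the connector's name.
  def register_connector(connector_name, topics)
    @connectors[connector_name.to_sym].concat(Array(topics)).uniq!
    self
  end
end

registry = ConnectorRegistrySketch.new
registry.register_connector(:s3_json, "tutorial.test")
registry.register_connector(:s3_json, ["tutorial.test", "tutorial.other"])

p registry.connectors
```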