Skip to content

Latest commit

 

History

History
206 lines (164 loc) · 9.34 KB

File metadata and controls

206 lines (164 loc) · 9.34 KB

Index Stage SDK

Java SDK framework for developing custom Fusion Index Stage plugins.

Basic concepts

Plugin

Plugin is a zip file that contains one or more stages implementation. It consists of one or more jar files (the plugin jar with Stage definition and additional dependencies jar files) and a manifest file that contains metadata used by Fusion to run the plugin.

In provided examples plugin zip file is generated by assemblePlugin gradle task. Alternatively it can be assembled by any other build tool as long as the file and directory structure is correct.

Example of custom META-INF/MANIFEST.MF file:

Manifest-Version: 1.0
Plugin-Id: sample-plugin
Plugin-SDK-Version: 1.1.0
Plugin-Base-Package: com.lucidworks.sample
Plugin-Version: 0.0.1

Index Stage class contains implementation of custom processing logic that is applied to each document passing through the index stage.

Plugin stage class must implement com.lucidworks.indexing.api.IndexStage interface and be annotated with com.lucidworks.indexing.api.Stage annotation. For additional convenience stage implementation can extend com.lucidworks.indexing.api.IndexStageBase class, which already contains initialization logic and some helpful methods.

Example of a custom index stage implementation:

@Stage(type = "simple", configClass = SimpleStageConfig.class)
public class SimpleStage extends IndexStageBase<SimpleStageConfig> {

  private static final Logger logger = LoggerFactory.getLogger(SimpleStage.class);

  @Override
  public Document process(Document document, Context context) {
    String text = StringUtils.trim(config.text());

    document.field(config.newField(), String.class).set(text).type(Types.STRING);

    return document;
  }
}

Lifecycle

First Fusion will create IndexStage instance and initialize it. Once the stage created and initialized, Fusion will start calling process method for each document being processed through the index pipeline. One stage instance can be used by Fusion for processing multiple documents and process method may be called from multiple concurrently running threads, additionally Fusion can initialize and maintain multiple stage instances with the same configuration in separate indexing service nodes. Therefore it is important to make sure that plugin stage implementation is thread-safe and processing logic is stateless.

Stage may throw an exception while processing a document. This will not cause any other side effect except the fact, that current document will not be processed anymore. Whole indexing pipeline will be still in use and fusion will call
process method for other documents. Information about thrown exception will be visible in logs.

Initialization

After creation each stage object will be initialized using the init(T config, Fusion fusion) method. This allows to create some internal stage structures and validate configuration.

Note that initialization occurs immediately after stage configuration is saved in Fusion. After this the stage instance can be maintained and re-used by Fusion for extensive periods of time even if no documents are being processed through the stage. It is strongly advised to be mindful of this fact when making decisions on resource allocation.

Document processing

After the initialization is finished, Fusion will start sending documents for processing to the stage by invoking process(Document document, Context context, Consumer<Document> output) for every incoming document. Since in the majority of cases index stages process and emit single document it is often easier to implement the alternative process(Document document, Context context) method.

  • If your stage is transforming single document (or filtering), you need to implement process(Document document, Context context) method only.
  • If your stage is intending to emit multiple documents for a single input document, you need to implement process(Document document, Context context, Consumer<Document> output) method and send each output document by calling output.accept(doc).

Note that multiple threads can call process method concurrently, therefore its implementation must be thread-safe.

Logging

Index Stage SDK is using slf4j logging API. Following example demonstrates logging inside a plugin stage.

public class MyStage extends IndexStageBase<MyStageConfig> {
  
  private static final Logger logger = LoggerFactory.getLogger(MyStage.class);

  @Override
  public Document process(Document document, Context context) {
    // ......
    logger.info("Processing document with id '{}'", document.getId());
    // ......
  }
}

Metrics

Fusion is using Prometheus and Grafana for enhanced metrics collection and querying. Index Stage plugins can add their own custom metrics to the list of default metrics already generated by Fusion indexing service.

To be able to publish custom metrics prometheus client library must be added to the plugin dependencies

provided 'io.prometheus:simpleclient_dropwizard:0.7.0'

After that, prometheus client can be used to record custom metrics. More information on prometheus Java client API can be found at https://github.com/prometheus/client_java#instrumenting

Following example code demonstrates capturing external request time metrics using prometheus client API.

public class SampleStage {

  private static final Histogram EXTERNAL_REQUEST_TIME = Histogram.build()
      .help("Time to execute external query request.")
      .name("external_query_request_time")
      .labelNames("request_url")
      .register();
  
    @Override
    public Document process(Document document, Context context) {
      // ......
      Histogram.Timer externalQueryTimer = EXTERNAL_REQUEST_TIME
          .labels(requestUrl)
          .startTimer();
      try {
        // perform external request...
      } finally {
        externalQueryTimer.observeDuration();
      }
      // ......
    }
}

Index Stage config defines configuration options specific to particular index stage instance. These options will be available to the end user via Fusion UI and API. The plugin config class must extend com.lucidworks.indexing.config.IndexStageConfig and annotated with @RootSchema.

By adding @Property and type annotations to your stage configuration interface methods, you can define metadata and type constraints for your plugin configuration fields. This is very similar to Fusion's connector configuration schema More detailed information on the configuration and schema capabilities can be found here.

Here is an example of simple stage configuration schema definition:

@RootSchema(
    title = "Simple",
    description = "Simple Index Stage"
)
public interface SimpleStageConfig extends IndexStageConfig {

  @Property(
      title = "Field",
      description = "Name of field to add to document.",
      required = true
  )
  @StringSchema(minLength = 1)
  String newField();

  @Property(
      title = "Text",
      description = "Sample text to put into the field."
  )
  @StringSchema
  String text();
}

Exposed Fusion APIs

SDK based plugins are capable of communication with other Fusion's components via Fusion object. This object is passed to stage on initialization phase.

This API provides access to Fusion REST API. You can find an example of its usage in ExternalQueryStage class.

This is specialized API for interactions with Fusion Blob Store API

This API provides methods for creation of new Document instances. It is useful for those stages that need to create new documents instead of or in addition to modifying the original input document passed to the stage.

Data structures

This is a representation of a Fusion document - base data structure stage is operating with. On a high level this can be thought of as a list of named fields containing one or multiple values.

New documents can be created using the Documents API.

Field

Field is a part of document with its own unique name and one or more values.