BulkProcessor
Warning: The BulkProcessor is still considered experimental. The API might change considerably. Feedback welcome.
A Bulk Processor is a service that can be started to receive bulk requests and commit them to Elasticsearch in the background. It is similar to the BulkProcessor in the Java Client API but has some conceptual differences.
The Bulk Processor is created by a service, just like any other service in Elastic.
client, err := elastic.NewClient()
if err != nil { ... }
// Setup a bulk processor
service := client.BulkProcessor().Name("MyBackgroundWorker-1")
The service is responsible for setting up and starting the bulk processor. In the example above, we set just one property: the name of the bulk processor to be created. There are other parameters you can use, e.g. the number of workers or the thresholds that describe when a list of bulk requests will be committed (see below for a complete list). The service essentially follows the builder pattern.
To finally get a started bulk processor that you can send bulk requests to, use the Do method of the service. With other services, the Do method executes a request. With the Bulk Processor service, it starts a Bulk Processor. Here's a typical example:
client, err := elastic.NewClient()
if err != nil { ... }
// Setup a bulk processor
p, err := client.BulkProcessor().Name("MyBackgroundWorker-1").Workers(2).Do()
if err != nil { ... }
Notice that the Do method actually spins up some goroutines. If you want to clean up safely, you need to call Close on the bulk processor. The bulk processor implements the io.Closer interface, so you can wrap it together with other resources that need cleanup when your application stops. Here's an example of starting and stopping:
client, err := elastic.NewClient()
if err != nil { ... }
// Setup a bulk processor
p, err := client.BulkProcessor().Name("MyBackgroundWorker-1").Workers(2).Do()
if err != nil { ... }
// ... Do some work here
// Stop the bulk processor and do some cleanup
err = p.Close()
if err != nil { ... }
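Because the bulk processor satisfies io.Closer, it can be shut down together with other resources. The following is a minimal sketch that uses only the standard library; closeAll, fakeCloser, and errDiskFull are hypothetical names made up for illustration and are not part of Elastic. In a real program, the bulk processor p would simply be one of the closers passed in.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// errDiskFull is a made-up error used to demonstrate a failing Close.
var errDiskFull = errors.New("disk full")

// closeAll closes the given resources in reverse order (like deferred calls).
// It keeps closing after a failure and returns the first error it saw.
func closeAll(closers ...io.Closer) error {
	var first error
	for i := len(closers) - 1; i >= 0; i-- {
		if err := closers[i].Close(); err != nil && first == nil {
			first = err
		}
	}
	return first
}

// fakeCloser is a hypothetical stand-in for a resource such as a bulk processor.
type fakeCloser struct {
	err    error // error to return from Close, if any
	closed bool  // set once Close has been called
}

func (f *fakeCloser) Close() error {
	f.closed = true
	return f.err
}

func main() {
	processor := &fakeCloser{}              // stands in for the bulk processor
	logFile := &fakeCloser{err: errDiskFull} // stands in for some other resource
	if err := closeAll(processor, logFile); err != nil {
		fmt.Println("cleanup error:", err)
	}
}
```

Closing in reverse order mirrors how deferred calls unwind, so resources opened last are released first.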
You can add bulk requests to the bulk processor by using its Add function. Add accepts a BulkableRequest, so it's either a BulkIndexRequest (for indexing), a BulkUpdateRequest (for updating), or a BulkDeleteRequest (for deleting).
// Say we want to index a tweet
t := Tweet{User: "telexico", Message: "Elasticsearch is great."}
// Create a bulk index request
// NOTICE: It is important to set the index and type here!
r := elastic.NewBulkIndexRequest().Index("twitter").Type("tweet").Id("12345").Doc(t)
// Add the request r to the processor p
p.Add(r)
Notice how we set the index and type on the request level. We need to do this with every request sent to a bulk processor; otherwise it won't be able to tell Elasticsearch how to index the document.
When you add a new request via Add, the request is not automatically committed to Elasticsearch. The whole idea of a bulk processor is to gather requests and finally send them to Elasticsearch in a batch.
Now, when does the bulk processor send these batches? There are three thresholds that you can control:
- When the batch reaches a certain number of requests.
- When the batch reaches a certain size (in bytes).
- When a certain time interval has elapsed.
To specify the threshold for "number of requests", use the BulkActions(int) function on the bulk processor service.
To specify the threshold for the "size of the requests", use the BulkSize(int) function.
To specify an automatic flush, use the FlushInterval(time.Duration) function.
You can combine all three options:
client, err := elastic.NewClient()
if err != nil { ... }
// Setup a bulk processor
p, err := client.BulkProcessor().
Name("MyBackgroundWorker-1").
Workers(2).
BulkActions(1000). // commit if # requests >= 1000
BulkSize(2 << 20). // commit if size of requests >= 2 MB
FlushInterval(30*time.Second). // commit every 30s
Do()
if err != nil { ... }
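How the three thresholds interact can be sketched as a single decision function. This is an illustration of the policy described above, not the library's actual implementation; the policy type and the shouldCommit method are hypothetical names, and treating a zero value as "disabled" is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"time"
)

// policy holds the three thresholds discussed above (hypothetical type).
// In this sketch, a zero value disables the corresponding check.
type policy struct {
	bulkActions   int           // commit when this many requests are queued
	bulkSize      int           // commit when the queued requests reach this many bytes
	flushInterval time.Duration // commit when this much time has elapsed
}

// shouldCommit reports whether a batch should be sent, given the number of
// queued requests, their estimated size in bytes, and the time elapsed since
// the last commit. Any single threshold being reached is enough.
func (p policy) shouldCommit(numRequests, sizeInBytes int, sinceLast time.Duration) bool {
	if p.bulkActions > 0 && numRequests >= p.bulkActions {
		return true
	}
	if p.bulkSize > 0 && sizeInBytes >= p.bulkSize {
		return true
	}
	if p.flushInterval > 0 && sinceLast >= p.flushInterval {
		return true
	}
	return false
}

func main() {
	p := policy{bulkActions: 1000, bulkSize: 2 << 20, flushInterval: 30 * time.Second}
	fmt.Println(p.shouldCommit(10, 4096, time.Second))   // false: no threshold reached
	fmt.Println(p.shouldCommit(1000, 4096, time.Second)) // true: request count reached
	fmt.Println(p.shouldCommit(10, 3<<20, time.Second))  // true: size reached
	fmt.Println(p.shouldCommit(10, 4096, time.Minute))   // true: interval elapsed
}
```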
Automatically committing bulk requests based on a policy works well, especially for long-running background tasks. However, sometimes you need to write e.g. a migration process that must commit all requests before the program exits. While the Close call on the bulk processor ensures that, there's a second way to make sure all data is sent to Elasticsearch: Flush.
Here's an example that manually asks all workers of the bulk processor to flush their data to Elasticsearch:
client, err := elastic.NewClient()
if err != nil { ... }
// Setup a bulk processor
p, err := client.BulkProcessor().
Name("MyBackgroundWorker-1").
Workers(2).
Do()
if err != nil { ... }
// ... Do some work here ...
// Ask workers to commit all requests
err = p.Flush()
if err != nil { ... }
Notice that Flush is synchronous: it waits until all workers have acknowledged that their requests have been written.
To get more control over how the bulk processor sends requests to Elasticsearch and processes its responses, you can use the Before and After callbacks.
The Before callback is a function that gets a sequential execution identifier and the list of bulk requests that will be sent to Elasticsearch. As its name implies, it gets called before sending the requests to Elasticsearch.
The After callback is a function that gets a sequential execution identifier, the list of bulk requests that we've tried to send, the response from Elasticsearch, and an error (which can be nil). The After callback is called after the requests have been sent to Elasticsearch.
What is the purpose of the Before and After callbacks? First, you can use them to collect statistics on throughput or for logging. More importantly, you can use them to react when things go wrong. For example, if your Elasticsearch cluster goes down, the bulk processor will still try to commit all your requests. If they cannot be processed and you do nothing, requests pile up and the system might eventually crash. So a good way to find out that Elasticsearch has a problem is to set up an After callback and watch the error parameter. If it is not nil, something has gone wrong, and you should probably throttle or even stop passing more requests to the bulk processor.
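The idea of watching the error parameter can be sketched with a small gate that producers consult before adding more requests. This example deliberately avoids the Elastic types so that it stays self-contained: gate, after, open, and errUnavailable are hypothetical names, and the after method only keeps the execution identifier and the error, whereas the real After callback also receives the requests and the response.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// errUnavailable is a made-up error standing in for a failed commit.
var errUnavailable = errors.New("elasticsearch unavailable")

// gate remembers whether the most recent commit failed (hypothetical helper).
// It uses atomic operations because callbacks may run on worker goroutines.
type gate struct {
	failed int32 // 1 if the last commit reported an error, 0 otherwise
}

// after is shaped like an After callback reduced to the parts needed here.
// A non-nil error closes the gate; a successful commit opens it again.
func (g *gate) after(executionID int64, err error) {
	if err != nil {
		atomic.StoreInt32(&g.failed, 1)
		return
	}
	atomic.StoreInt32(&g.failed, 0)
}

// open reports whether producers may keep adding requests.
func (g *gate) open() bool {
	return atomic.LoadInt32(&g.failed) == 0
}

func main() {
	g := &gate{}
	g.after(1, nil)
	fmt.Println("may add requests:", g.open()) // true
	g.after(2, errUnavailable)
	fmt.Println("may add requests:", g.open()) // false
}
```

A producer would check open() before each call to Add and pause or drop work while the gate is closed; what to do in that case remains an application decision.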
Starting a background process like a bulk processor is straightforward as long as things go smoothly. It gets much harder when errors occur.
As we saw in the previous section, error handling and throttling can be done with the After callback. This section describes what happens when errors occur and what options you have to build resilient systems.
First of all, the bulk processor retries on failure using exponential backoff. This means that when a worker fails to commit a list of requests to Elasticsearch, it will automatically retry. First the worker will choose a small retry interval of, say, 500ms. The retry interval then increases exponentially with every failed attempt, hence the name exponential backoff.
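To make the growth of the retry interval concrete, here is a minimal sketch of exponential backoff. It is not the library's implementation: the backoff function is a hypothetical helper, and the doubling factor and the 8s cap are assumptions chosen for illustration (the text above only mentions a small initial interval of about 500ms).

```go
package main

import (
	"fmt"
	"time"
)

// backoff returns the wait time before retry number attempt (0-based).
// The interval starts at initial, doubles with every failed attempt, and
// never exceeds max. Factor and cap are assumptions of this sketch.
func backoff(attempt int, initial, max time.Duration) time.Duration {
	d := initial
	for i := 0; i < attempt && d < max; i++ {
		d *= 2
	}
	if d > max {
		d = max
	}
	return d
}

func main() {
	// Prints 500ms, 1s, 2s, 4s, 8s, 8s for attempts 0 through 5.
	for attempt := 0; attempt <= 5; attempt++ {
		fmt.Printf("attempt %d: wait %v\n", attempt, backoff(attempt, 500*time.Millisecond, 8*time.Second))
	}
}
```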
If the bulk processor fails many times in a row, it will eventually give up. This is when the After callback gets triggered. If the After callback receives a non-nil error parameter, take it as a warning that something is going wrong. However, the bulk processor has no way of knowing the correct way to handle the problem. It needs the caller to decide on an appropriate solution. For example, one application is indexing log files and could, without further ado, stop processing and simply start a new bulk processor later. Another application has critical data and might decide to stop passing requests to the bulk processor until the Elasticsearch cluster is up again. There is no promise from the bulk processor other than "we try our best to put your data into Elasticsearch".
If you're the logging, tracing, and statistics person, we've got you covered. If you ask the bulk processor to collect statistics, you can retrieve them later.
// Create a client
client, err := elastic.NewClient()
if err != nil { ... }
// Setup a bulk processor
p, err := client.BulkProcessor().
Name("MyBackgroundWorker-1").
Stats(true). // enable collecting stats
Do()
if err != nil { ... }
// ... Do some work here ...
// Get a snapshot of stats (always blank if not enabled--see above)
stats := p.Stats()
fmt.Printf("Number of times flush has been invoked: %d\n", stats.Flushed)
fmt.Printf("Number of times workers committed reqs: %d\n", stats.Committed)
fmt.Printf("Number of requests indexed : %d\n", stats.Indexed)
fmt.Printf("Number of requests reported as created: %d\n", stats.Created)
fmt.Printf("Number of requests reported as updated: %d\n", stats.Updated)
fmt.Printf("Number of requests reported as success: %d\n", stats.Succeeded)
fmt.Printf("Number of requests reported as failed : %d\n", stats.Failed)
for i, w := range stats.Workers {
fmt.Printf("Worker %d: Number of requests queued: %d\n", i, w.Queued)
fmt.Printf(" Last response time : %v\n", w.LastDuration)
}
Here's the list of options that can be passed into BulkProcessorService to create a BulkProcessor:
- Before: A callback that will be invoked before a commit to Elasticsearch.
- After: A callback that will be invoked after a commit to Elasticsearch.
- Name: An (optional) name you can give this bulk processor. This is helpful if you set up different bulk processors in your application and you want to e.g. print statistics.
- Workers: The number of workers that are able to receive bulk requests and eventually commit them to Elasticsearch. The default is 1.
- BulkActions: The number of requests that can be enqueued before the bulk processor decides to commit. The default is 1000.
- BulkSize: The number of bytes that the bulk requests can take up before the bulk processor decides to commit. The default is 5MB.
- FlushInterval: A time.Duration after which bulk requests are committed, regardless of other thresholds. Set to zero to disable. By default, this is disabled.
- Stats: A flag that indicates whether the bulk processor should collect stats while running. You need to set this to true if you want to get a snapshot of stats later. By default, this is disabled.
Here's an example of all settings:
// Setup a bulk processor
p, err := client.BulkProcessor().
Name("MyBackgroundWorker-1").
Before(beforeCallback). // func to call before commits
After(afterCallback). // func to call after commits
Workers(4). // number of workers
BulkActions(1000). // commit if # requests >= 1000
BulkSize(2 << 20). // commit if size of requests >= 2 MB
FlushInterval(30*time.Second). // commit every 30s
Stats(true). // collect stats
Do()
if err != nil { ... }
You are responsible for flushing/closing the bulk processor before your application quits. Otherwise you might lose a batch of documents.
Use e.g. the following pattern to ensure that the bulk processor flushes all documents before the application quits.
p, err := client.BulkProcessor()...Do()
if err != nil {
return err
}
defer p.Close() // Close flushes documents
The Failed property of the bulk processor stats indicates the number of bulk requests returned as "failed" from Elasticsearch. This is not specific to the bulk processor; it can also happen when you use the Bulk API outside of the bulk processor. You must handle these cases in your client. Some applications might want to re-add the requests to the bulk processor, others might want to trigger an event: that's application-specific.
In short: the application is responsible for handling those cases.
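One possible way to handle failed requests is sketched below. It does not use the Elastic response types: item, splitFailed, and retryable are hypothetical names, and the rule that status 429 and 5xx responses are worth re-adding is an assumed policy for illustration only. Your application may need a different one.

```go
package main

import "fmt"

// item is a hypothetical, reduced view of one entry in a bulk response.
type item struct {
	ID     string
	Status int // HTTP-style status code reported for this request
}

// splitFailed separates items into succeeded and failed ones. In this
// sketch, any status outside the 2xx range counts as a failure.
func splitFailed(items []item) (succeeded, failed []item) {
	for _, it := range items {
		if it.Status >= 200 && it.Status <= 299 {
			succeeded = append(succeeded, it)
		} else {
			failed = append(failed, it)
		}
	}
	return succeeded, failed
}

// retryable is an assumed policy: overload (429) and server errors (5xx)
// may succeed later, while other failures are reported instead.
func retryable(it item) bool {
	return it.Status == 429 || it.Status >= 500
}

func main() {
	items := []item{
		{ID: "1", Status: 201},
		{ID: "2", Status: 429},
		{ID: "3", Status: 400},
	}
	succeeded, failed := splitFailed(items)
	fmt.Println("succeeded:", len(succeeded))
	for _, it := range failed {
		if retryable(it) {
			fmt.Println("re-add request", it.ID) // e.g. pass it to the bulk processor again
		} else {
			fmt.Println("report request", it.ID) // e.g. log it or trigger an event
		}
	}
}
```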
See this gist for a working example of a failure resilient process that uses BulkProcessor under the hood.