Batched PUT requests #205

Open · wants to merge 7 commits into base: dev/1.0.0
Conversation

@robagar commented Sep 3, 2024

This adds the ability to send multiple data points in a single PUT to InfluxDB, giving a huge performance gain for high frequency data.

Volume configuration:

  • put_batch_size - Maximum number of data points per PUT request
  • put_batch_timeout_ms - Milliseconds before sending a batch if not full

If put_batch_size is not set, the plugin reverts to the original behaviour of sending each data point in its own PUT request.
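For illustration, the options might be set on the InfluxDB volume roughly like this (a hedged sketch: the nesting and the url key follow the usual zenoh storage_manager volume configuration, the values are arbitrary, and only put_batch_size and put_batch_timeout_ms come from this PR):

{
  plugins: {
    storage_manager: {
      volumes: {
        influxdb: {
          url: "http://localhost:8086",  // assumed: standard volume setting
          put_batch_size: 100,           // from this PR: max points per PUT
          put_batch_timeout_ms: 250      // from this PR: flush a partial batch after 250 ms
        }
      }
    }
  }
}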

@gabrik (Contributor) commented Sep 3, 2024

Hi @robagar, thank you for your contribution.

In order to accept it we need you to sign the Eclipse Contributor Agreement (ECA).

@robagar (Author) commented Sep 3, 2024

ECA signed

@robagar marked this pull request as ready for review September 3, 2024 10:59
@Mallets (Member) commented Sep 4, 2024

@Charles-Schleich could you review the PR?
@robagar I see you are only targeting v1; before merging this kind of change it would be good to ensure that it is also available for v2.

@robagar (Author) commented Sep 4, 2024

We don't use InfluxDB v2, and have no intention to. It looks like a dead end, with Flux being abandoned for v3.

@Charles-Schleich (Member) commented

Hi @robagar, thank you for the contribution!
A couple of things to note:
We strive to keep the different versions of the InfluxDB plugin aligned in terms of features and behavior, so to consider merging PRs we would need the same batching feature in version 2 as well.
The configuration options would also need to be added to the README for both versions, so that users know they are available.
PRs are generally acceptable if they cover config, documentation, and tests, and ideally some performance tests/setup that we can reproduce on our side.

@robagar (Author) commented Sep 5, 2024

Skimming the code it looks fairly simple to add the batching to v2 - the client write method takes a stream as input - but frankly that would be better done by someone who is going to use & test it properly.
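For reference, a hedged sketch of what that could look like with the influxdb2 crate, whose Client::write takes a Stream of DataPoints, so a whole batch can go in one request (the batching plumbing around it is left out; treat the exact signatures as an assumption to verify against the crate docs):

use futures::stream;
use influxdb2::models::DataPoint;
use influxdb2::Client;

// Write a whole batch of points in a single request by passing
// them to the client as one stream.
async fn write_batch(
    client: &Client,
    bucket: &str,
    points: Vec<DataPoint>,
) -> Result<(), Box<dyn std::error::Error>> {
    client.write(bucket, stream::iter(points)).await?;
    Ok(())
}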

v1/src/lib.rs Outdated

let client_clone = client.clone();
let name_clone = config.name.clone();
TOKIO_RUNTIME.spawn(async move {
Review comment (Member):

The plugins can be linked statically or dynamically to an instance of zenohd.
In the case that we link statically, we want to reuse the Tokio runtime of zenohd and not use the TOKIO_RUNTIME stored in the lazy static.
This can be achieved by checking for a current Tokio handle and spawning on the correct executor:

let batch_future = async move { /* ... */ };
match tokio::runtime::Handle::try_current() {
    Ok(handle) => handle.spawn(batch_future),
    Err(_) => TOKIO_RUNTIME.spawn(batch_future),
};
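For context, here is a minimal, hypothetical sketch of what the spawned batch future could look like, assuming points arrive over a tokio mpsc channel as line-protocol strings; the names, the channel wiring, and the flush body are illustrative, not the PR's actual code:

use tokio::sync::mpsc;
use tokio::time::{sleep_until, Duration, Instant};

// Hypothetical batch loop: flushes when the batch is full or when
// `batch_timeout` has elapsed since the first point of the batch arrived.
async fn run_batcher(mut rx: mpsc::Receiver<String>, batch_size: usize, batch_timeout: Duration) {
    let mut batch: Vec<String> = Vec::with_capacity(batch_size);
    loop {
        // Block until the first point of the next batch arrives.
        match rx.recv().await {
            Some(point) => batch.push(point),
            None => break, // channel closed, nothing more to do
        }
        let deadline = Instant::now() + batch_timeout;
        // Keep filling until full, timed out, or the channel closes.
        while batch.len() < batch_size {
            tokio::select! {
                _ = sleep_until(deadline) => break,
                next = rx.recv() => match next {
                    Some(point) => batch.push(point),
                    None => break,
                },
            }
        }
        flush(&mut batch).await;
    }
}

async fn flush(batch: &mut Vec<String>) {
    if batch.is_empty() {
        return;
    }
    // A single PUT to InfluxDB would go here, e.g. joining the points
    // into one line-protocol body.
    let _body = batch.join("\n");
    batch.clear();
}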

Comment on lines +83 to +84
pub const PROP_STORAGE_PUT_BATCH_SIZE: &str = "put_batch_size";
pub const PROP_STORAGE_PUT_BATCH_TIMEOUT_MS: &str = "put_batch_timeout_ms";
Review comment (Member):

These config options must be documented in the README (there is a pending PR for an example config file).

Comment on lines +606 to +607
// TODO - add pending status
Ok(StorageInsertionResult::Inserted)
Review comment (Member):

I'm not 100% sure about returning a StorageInsertionResult::Inserted here, as the insertion could fail inside the batch after it has already been reported to zenoh as inserted. This will likely have implications for replication and storage alignment.

The suggestion to add a Pending status could work; however, we would then also need a Completed status to signal to zenoh that all of the Puts in a batch have succeeded as a group.
This may require an additional ID per Put when batching, and upon a successful write, the IDs of the batch would have to be signaled back to zenoh.

@JEnoch, what do we think about adding a Pending and Completed system?
It adds a little bit of complication, and I'd like to make sure we don't break alignment due to batching.
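To make the idea concrete, a rough sketch of what such a scheme might look like (entirely hypothetical: zenoh's actual StorageInsertionResult has other variants not shown here, and none of the Pending/Completed machinery below exists today):

// Hypothetical batch identifier assigned to each Put when batching.
pub type BatchId = u64;

// Sketch of an extended insertion result; existing variants elided.
pub enum StorageInsertionResult {
    Inserted,         // existing: written immediately, outcome known
    Pending(BatchId), // hypothetical: queued in a batch, outcome not yet known
}

// Hypothetical signal sent back to zenoh once a batched PUT resolves,
// so replication/alignment can account for every Put in the batch.
pub enum BatchOutcome {
    Completed(Vec<BatchId>), // all Puts in the batch succeeded as a group
    Failed(Vec<BatchId>),    // these Puts were lost and may need re-alignment
}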

v1/src/lib.rs Outdated
Comment on lines 410 to 413
Some(tx)
} else {
None
};
@Charles-Schleich (Member) commented Sep 5, 2024

Suggestion / nit: declare let mut put_batch_tx = None; above 322 and just overwrite it with a Some() value on 410.
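In shape, the suggestion amounts to something like this (illustrative only: the name mirrors the PR, the surrounding logic is elided, and the channel type is a stand-in):

use tokio::sync::mpsc;

// Declare once up front, overwrite only when batching is configured,
// instead of building the Option through an if/else expression.
fn batch_sender(batching_enabled: bool) -> Option<mpsc::Sender<String>> {
    let mut put_batch_tx = None;
    if batching_enabled {
        let (tx, _rx) = mpsc::channel::<String>(1024);
        put_batch_tx = Some(tx);
    }
    put_batch_tx
}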

@robagar (Author) commented Sep 26, 2024

Hiya, any chance of this making it into the 1.0.0 release?

@Mallets (Member) commented Oct 17, 2024

After some discussion, proper support of batching would require some work on the backend API.
There are internal mechanisms in Zenoh, like storage alignment, that require knowing whether the put of a message succeeded or not. With the current backend API, it's not possible to bubble up this information when batching is in place. This would lead to incorrect behaviour when replication is activated.

So I'm afraid some more work is required in Zenoh to properly support this use case.

@robagar (Author) commented Oct 17, 2024

OK, but please be aware that as it stands, writing to the InfluxDB backend is just too slow: it falls behind here even with ~100 Hz updates, which is hardly excessive.
