feat(sources): initial Pulsar source implementation #15180
Conversation
WORK IN PROGRESS!
- add initial Pulsar source config
- add initial Pulsar consumer implementation
Tested:
- tested a simple scenario locally, receiving a message from a topic: it works!
Force-pushed from 321a78a to c36e512
Regression Test Results
Baseline: 616958b
A regression test is an integrated performance test for vector in a repeatable rig. The table below, if present, lists those experiments that have experienced a statistically significant change in their performance between baseline and comparison SHAs. No interesting changes were detected.
Fine details of change detection per experiment.
Regression Test Results (Baseline: a1b2590): no interesting changes.
Soak Test Results
Baseline: 616958b
A soak test is an integrated performance test for vector in a repeatable rig, with varying configuration for vector. What follows is a statistical summary of a brief vector run for each configuration across the SHAs given above. The goal of these tests is to determine, quickly, if vector performance is changed and to what degree by a pull request. Where appropriate, units are scaled per-core.
The table below, if present, lists those experiments that have experienced a statistically significant change in their throughput performance between baseline and comparison SHAs, with 90.0% confidence, OR have been detected as newly erratic. Negative values mean that baseline is faster, positive that comparison is. Results that do not exhibit more than a ±8.87% change in mean throughput are discarded. An experiment is erratic if its coefficient of variation is greater than 0.3. The abbreviated table will be omitted if no interesting changes are observed.
No interesting changes in throughput with confidence ≥ 90.00% and absolute Δ mean ≥ ±8.87%.
Fine details of change detection per experiment.
Soak Test Results (Baseline: a1b2590): changes in throughput with confidence ≥ 90.00% and absolute Δ mean ≥ ±8.87% were reported. Fine details of change detection per experiment.
- implement message acknowledgment
- implement proper shutdown
- make consumer_name and subscription_name optional in config
Tested:
- Local run
Well, I have tested it locally (tip: Apache Pulsar works on M1 with Docker), and now I would say it works fine. The most interesting question here is whether this is the right place to acknowledge a received message according to the Vector policy. Honestly, I do not know; I need some advice from the Vector dev team. Still, it could easily be tweaked. Also, I need to review the existing possibilities of …
- move Pulsar events to the dedicated file
Tested:
- Local build
- enable compression algorithms in the pulsar-rs crate, so the pulsar source is now able to decompress payloads
- add batch size configuration
Tested:
- Local build
Regression Test Results (Baseline: a1b2590): no interesting changes.
Soak Test Results (Baseline: a1b2590): no interesting changes in throughput.
Regression Test Results (Baseline: a1b2590): no interesting changes.
- add priority_level support
- add dead letter queue policy support
- add auth support
Tested:
- Local build
Regression Test Results (Baseline: a1b2590): no interesting changes.
- add pulsar source test (similar to the pulsar sink test)
Tested:
- Local run
- add initial version of the Pulsar source documentation; I am almost sure it's incomplete
Soak Test Results (Baseline: a1b2590): no interesting changes in throughput.
Regression Test Results (Baseline: a1b2590): no interesting changes.
Regression Test Results (Baseline: a1b2590): no interesting changes.
Soak Test Results (Baseline: a1b2590): no interesting changes in throughput.
In order to handle acknowledgements, you would need to optionally create a notifier and receiver with …
I think it would be adequate to build up the minimum functionality first and add features in subsequent PRs, so long as the implementation is correct. Refactoring would be welcome, yes. We have frequently included the deduplication in the PR where the second usage is added, so that would fit here, but it isn't strictly necessary.
- update Cargo.lock after merge
Another issue is with the Cue stuff: CI complains about extra whitespace, but I cannot find what is wrong with the current formatting. Local …
A note for future work: do not forget to add Pulsar statistics as metrics (similar to the Kafka implementation in Vector, I suppose). It should be doable since …
Soak Test Results (Baseline: ac28e1a): no interesting changes in throughput.
Can the consumer be cloned for use in both places, to avoid the …
Regression Test Results (Run ID: aeffe53b-ac52-480b-98bf-06d147eff111): no interesting changes.
Force-pushed from 6453c3f to e1818bb
The PR is blocked by streamnative/pulsar-rs#245
/// Authentication configuration.
#[configurable_component]
#[derive(Clone, Debug)]
struct AuthConfig {
    /// Basic authentication name/username.
    ///
    /// This can be used either for basic authentication (username/password) or JWT authentication.
    /// When used for JWT, the value should be `token`.
    name: Option<String>,

    /// Basic authentication password/token.
    ///
    /// This can be used either for basic authentication (username/password) or JWT authentication.
    /// When used for JWT, the value should be the signed JWT, in the compact representation.
    token: Option<SensitiveString>,

    #[configurable(derived)]
    oauth2: Option<OAuth2Config>,
}
As this is shared with the source, it would be good to move bits like this into a common shared module.
After reviewing the sources, I think you may be able to avoid making a mutex-locked Arc copy of the consumer by just passing a reference into all the functions that need it. If I read right, you await the results of each call in the main loop, so there is never more than one outstanding mutable reference, and the compiler should be able to work that out. There are a couple of other changes needed below, and some issues identified by make check and make check-clippy.
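A minimal sketch of that suggestion, using hypothetical stand-in types rather than the real pulsar-rs API: pass `&mut Consumer` into each helper instead of sharing an Arc<Mutex<...>>. Because each call completes before the next one starts, only one mutable borrow is ever live.

```rust
// Stand-in types; not the real pulsar-rs Consumer/Message.
struct Message {
    id: u64,
}

struct Consumer {
    acked: Vec<u64>,
}

impl Consumer {
    // Stand-in for pulsar's ack: records the acknowledged message id.
    fn ack(&mut self, msg: &Message) {
        self.acked.push(msg.id);
    }
}

// Each helper borrows the consumer mutably only for the duration of one call.
fn handle_message(consumer: &mut Consumer, msg: &Message) {
    consumer.ack(msg);
}

fn main() {
    let mut consumer = Consumer { acked: Vec::new() };
    // Sequential loop: each mutable borrow ends before the next begins,
    // so no Mutex is needed.
    for id in 0..3 {
        let msg = Message { id };
        handle_message(&mut consumer, &msg);
    }
    assert_eq!(consumer.acked, vec![0, 1, 2]);
}
```

In the real source the helpers are async, but the same reasoning applies as long as each call is awaited before the next one is issued.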
impl std::fmt::Debug for FinalizerEntry {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("FinalizerEntry")
            .field("message", &self.message.payload)
            .finish()
    }
}
Is this struct formatted anywhere that requires this implementation?
async fn create_consumer(
    config: &PulsarSourceConfig,
) -> crate::Result<pulsar::consumer::Consumer<String, TokioExecutor>> {
Move this into impl PulsarSourceConfig so config can be &self.
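A sketch of that refactor with simplified stand-in types (not the real Vector config struct): the free function taking `config: &PulsarSourceConfig` becomes a method, so callers write `config.create_consumer()`.

```rust
// Stand-in types for illustration only.
struct Consumer {
    topic: String,
}

struct PulsarSourceConfig {
    endpoint: String,
    topic: String,
}

impl PulsarSourceConfig {
    // `config` is now `&self`; the body can read fields directly.
    fn create_consumer(&self) -> Consumer {
        Consumer {
            topic: self.topic.clone(),
        }
    }
}

fn main() {
    let config = PulsarSourceConfig {
        endpoint: "pulsar://127.0.0.1:6650".to_string(),
        topic: "persistent://public/default/demo".to_string(),
    };
    let consumer = config.create_consumer();
    assert_eq!(consumer.topic, "persistent://public/default/demo");
    assert!(config.endpoint.starts_with("pulsar://"));
}
```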
if let Some(auth) = &config.auth {
    builder = match (
        auth.name.as_ref(),
        auth.token.as_ref(),
        auth.oauth2.as_ref(),
    ) {
        (Some(name), Some(token), None) => builder.with_auth(Authentication {
            name: name.clone(),
            data: token.inner().as_bytes().to_vec(),
        }),
        (None, None, Some(oauth2)) => {
            builder.with_auth_provider(OAuth2Authentication::client_credentials(OAuth2Params {
                issuer_url: oauth2.issuer_url.clone(),
                credentials_url: oauth2.credentials_url.clone(),
                audience: oauth2.audience.clone(),
                scope: oauth2.scope.clone(),
            }))
        }
        _ => return Err(Box::new(pulsar::error::Error::Authentication(
            AuthenticationError::Custom(
                "Invalid auth config: can only specify name and token or oauth2 configuration"
                    .to_string(),
            ),
        ))),
    };
}
I believe a bunch of this is shared with the sink as well. This particular bit could probably actually be a method on the auth struct.
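One way the suggested method could look, sketched with simplified stand-in types (the real code works with pulsar-rs builder/auth types, and `oauth2` here is just a `String` placeholder for OAuth2Config):

```rust
// Stand-in config; the real AuthConfig holds SensitiveString/OAuth2Config.
struct AuthConfig {
    name: Option<String>,
    token: Option<String>,
    oauth2: Option<String>, // placeholder for OAuth2Config
}

#[derive(Debug)]
enum Auth {
    Basic { name: String, token: String },
    OAuth2(String),
}

impl AuthConfig {
    // Shared validation for sink and source: exactly one of
    // (name + token) or oauth2 may be specified.
    fn build(&self) -> Result<Auth, String> {
        match (&self.name, &self.token, &self.oauth2) {
            (Some(name), Some(token), None) => Ok(Auth::Basic {
                name: name.clone(),
                token: token.clone(),
            }),
            (None, None, Some(params)) => Ok(Auth::OAuth2(params.clone())),
            _ => Err(
                "Invalid auth config: can only specify name and token or oauth2 configuration"
                    .to_string(),
            ),
        }
    }
}

fn main() {
    let jwt = AuthConfig {
        name: Some("token".into()),
        token: Some("<signed-jwt>".into()),
        oauth2: None,
    };
    assert!(matches!(jwt.build(), Ok(Auth::Basic { .. })));

    let invalid = AuthConfig {
        name: Some("user".into()),
        token: None,
        oauth2: None,
    };
    assert!(invalid.build().is_err());
}
```

With something like this, both the sink and the source could call auth.build() (or a similar method applying the result to the client builder) instead of duplicating the match.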
match finalizer {
    Some(finalizer) => {
        let (batch, receiver) = BatchNotifier::new_with_receiver();
        let mut stream = stream.map(|event| event.with_batch_notifier(&batch));

        match out.send_event_stream(&mut stream).await {
            Err(error) => {
                emit!(StreamClosedError { error, count: 1 });
            }
            Ok(_) => {
                finalizer.add(FinalizerEntry { consumer, message }, receiver);
            }
        }
    }
    None => match out.send_event_stream(&mut stream).await {
        Err(error) => {
            emit!(StreamClosedError { error, count: 1 });
        }
        Ok(_) => {
            if let Err(error) = consumer.lock().await.ack(&message).await {
                emit!(PulsarAcknowledgmentError { error });
            }
        }
    },
}
This could have the duplication reduced with map adapters, not sure if it's simpler:
if let Err(error) = match finalizer {
    Some(finalizer) => {
        let (batch, receiver) = BatchNotifier::new_with_receiver();
        let mut stream = stream.map(|event| event.with_batch_notifier(&batch));
        out.send_event_stream(&mut stream).await.map(|_| {
            finalizer.add(FinalizerEntry { consumer, message }, receiver)
        })
    }
    None => out.send_event_stream(&mut stream).await.map(|_| {
        if let Err(error) = consumer.lock().await.ack(&message).await {
            emit!(PulsarAcknowledgmentError { error });
        }
    }),
} {
    emit!(StreamClosedError { error, count: 1 });
}
    }
}

async fn handle_ack(status: BatchStatus, entry: FinalizerEntry) {
Could you pass a reference to the consumer into here instead of storing it in the FinalizerEntry?
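A sketch of that change with stand-in types (not the real vector/pulsar-rs types): FinalizerEntry keeps only the message, and handle_ack receives the consumer by reference.

```rust
// Stand-in types for illustration only.
struct Message {
    id: u64,
}

struct Consumer {
    acked: Vec<u64>,
}

enum BatchStatus {
    Delivered,
    Rejected,
}

// Only the message is stored; no consumer field.
struct FinalizerEntry {
    message: Message,
}

// The consumer is borrowed per call instead of being carried in every entry.
fn handle_ack(consumer: &mut Consumer, status: BatchStatus, entry: FinalizerEntry) {
    if matches!(status, BatchStatus::Delivered) {
        consumer.acked.push(entry.message.id);
    }
}

fn main() {
    let mut consumer = Consumer { acked: Vec::new() };
    handle_ack(
        &mut consumer,
        BatchStatus::Delivered,
        FinalizerEntry { message: Message { id: 1 } },
    );
    handle_ack(
        &mut consumer,
        BatchStatus::Rejected,
        FinalizerEntry { message: Message { id: 2 } },
    );
    // Only the delivered message was acknowledged.
    assert_eq!(consumer.acked, vec![1]);
}
```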
decoding: {
    description: "Configuration for building a `Deserializer`."
    required:    false
    type: object: options: codec: {
        required: false
        type: string: {
            default: "bytes"
            enum: {
                bytes:       "Configures the `BytesDeserializer`."
                gelf:        "Configures the `GelfDeserializer`."
                json:        "Configures the `JsonDeserializer`."
                native:      "Configures the `NativeDeserializer`."
                native_json: "Configures the `NativeJsonDeserializer`."
                syslog:      "Configures the `SyslogDeserializer`."
            }
        }
    }
}
framing: {
    description: "Configuration for building a `Framer`."
    required:    false
See the description of these in other sources.
let decoder = DecodingConfig::new(
    self.framing.clone(),
    self.decoding.clone(),
    LogNamespace::Legacy,
We'll want to make sure log namespacing is handled correctly as per this doc: https://github.com/vectordotdev/vector/blob/master/rfcs/2022-04-20-12187-log-namespacing.md
Here is a tutorial: https://github.com/vectordotdev/vector/blob/master/docs/tutorials/lognamespacing.md
let events = events.into_iter().map(|mut event| {
    if let Event::Log(ref mut log) = event {
        log.try_insert(
This will change with the log namespace changes.
subscription_name: None,
priority_level: None,
batch_size: None,
auth: None,
Is it possible to test all of the different auth types?
Is this PR still being actively developed? Can I contribute to getting this into shape in some way?
@bmcadams-datastax Yes, you definitely can! Right now I am busy with other PRs and other work, so feel free to continue working on this PR. I suggest you fork this branch and start by fixing the review comments from the Vector dev team. Thanks in advance for your contribution!
@zamazan4ik Sounds good, and I'll do just that. We've got some Pulsar experts on our side (https://www.datastax.com/products/astra-streaming) that should be able to assist with any corner cases / questions on Pulsar itself.
Closing in favor of #18475