2 sinks with buffer.when_full drop_newest/block doesn't work as expected. #20764

Closed
nmiculinic opened this issue Jul 1, 2024 · 8 comments

Labels
domain: buffers - Anything related to Vector's memory/disk buffers
domain: delivery - Anything related to delivering events within Vector such as end-to-end acknowledgements
type: bug - A code related bug.

Comments

@nmiculinic

nmiculinic commented Jul 1, 2024

A note for the community

  • Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
  • If you are interested in working on this issue or have submitted a pull request, please leave a comment

Problem

I'm running Vector in the unified architecture.

In the aggregator layer I have 2 sinks, A and B. Sink A is the production sink, while B is an experimental one. Any errors in B should be ignored and should not block sending data to A. However, that's not what happens, despite my best configuration efforts (sink B has acknowledgements disabled and data-dropping configured on its buffer).

Configuration

expire_metrics_secs: 3600
acknowledgements:
  enabled: true
sources:
  vector:
    type: vector
    address: 0.0.0.0:6000
sinks:
  A:
    type: http
    inputs:
      - vector
    uri: <>
    compression: gzip
    encoding:
      codec: json
  B:
    type: vector
    acknowledgements:
      enabled: false
    inputs:
      - vector
    buffer:
      when_full: "drop_newest"
      max_events: 5000
    batch:
      max_bytes: 4000111  # < default gRPC message limit
      max_events: 100
    address: <>
    compression: true
    request:
      concurrency: "adaptive"
      adaptive_concurrency:
        initial_concurrency: 5

There are some transforms in the middle, but they're not really relevant here. When sink B fails or becomes unreachable at the network layer, I see an increase in discarded events for the vector source (the vector_component_discarded_events_total{component_id="vector", component_kind="source"} metric).

I would expect any failure of sink B to be ignored and data to keep flowing successfully to sink A. However, it seems to completely stop data from being sent to sink A. The processes have sufficient CPU/memory resources and aren't being throttled.

Version

0.38.0

Debug Output

No response

Example Data

No response

Additional Context

This is running in Kubernetes, with vector-agent as a node-level agent shipping data to an Envoy load balancer, which forwards it to the vector-aggregator layer. I also see elevated 504s returned by vector-aggregator at the Envoy LB layer (the envoy_cluster_upstream_rq metric).

References

No response

@nmiculinic nmiculinic added the type: bug label Jul 1, 2024
@jszwedko
Member

jszwedko commented Jul 3, 2024

Thanks for the report @nmiculinic. That definitely sounds wrong, since you have when_full: "drop_newest" and acknowledgements.enabled: false on sink B. It should be shedding load such that A is unaffected. I wonder if the acknowledgement feature isn't playing nicely with the buffer's load shedding 🤔

@jszwedko jszwedko added the domain: buffers, domain: delivery, domain: config labels and removed the domain: config label Jul 3, 2024
@nmiculinic
Author

@jszwedko btw, I can confirm this is still an issue in version 0.39.0 as well.

@nmiculinic
Author

@jszwedko https://github.com/nmiculinic/vector/blob/6eecda55020214364fda844cf8ed16a9b6cc2a5c/lib/vector-core/src/config/mod.rs#L352

Does this mean that if acknowledgements.enabled is set to true at the global level, it will override any per-sink configuration?

I'm not sure I'm following the code correctly, but in my tests, once I disable the global ack setting and only configure acks on sink A, things work as expected (roughly the sketch below). Please let me know if I misunderstood something.
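For reference, the variant that behaved correctly in my tests looks roughly like this sketch (the uri/address values are placeholders and the remaining sink options are omitted):

    acknowledgements:
      enabled: false        # global acks disabled
    sources:
      vector:
        type: vector
        address: 0.0.0.0:6000
    sinks:
      A:
        type: http
        inputs:
          - vector
        uri: <>
        acknowledgements:
          enabled: true     # acks configured only on the production sink
      B:
        type: vector
        inputs:
          - vector
        address: <>
        buffer:
          when_full: "drop_newest"
          max_events: 5000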

@jszwedko
Member

> @jszwedko https://github.com/nmiculinic/vector/blob/6eecda55020214364fda844cf8ed16a9b6cc2a5c/lib/vector-core/src/config/mod.rs#L352
>
> Does this mean that if acknowledgements.enabled is set to true at the global level, it will override any per-sink configuration?
>
> I'm not sure I'm following the code correctly, but in my tests, once I disable the global ack setting and only configure acks on sink A, things work as expected. Please let me know if I misunderstood something.

That does seem suspicious, but note that here the or is combining Option types rather than booleans. Here is where the merge happens:

vector/src/config/mod.rs

Lines 164 to 169 in ef4f175

    .filter(|(_, sink)| {
        sink.inner
            .acknowledgements()
            .merge_default(&self.global.acknowledgements)
            .enabled()
    })

That should use the sink-level config if it is set, and otherwise fall back to the global config. It certainly seems like something worth verifying is working correctly, though.
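To spell out the intended precedence against the original config from this issue (this is a sketch of the expected semantics, not a claim about what the code currently does):

    acknowledgements:
      enabled: true          # global default
    sinks:
      A:
        type: http
        # no sink-level acknowledgements block, so A should inherit the
        # global default: acks enabled
      B:
        type: vector
        acknowledgements:
          enabled: false     # the explicit sink-level setting should take
                             # precedence for B: acks disabled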

@nmiculinic
Author

nmiculinic commented Jul 18, 2024

@jszwedko actually this was a red herring.

I've tried explicitly disabling acks everywhere, and things kind of worked? But I was still suspicious that something was amiss here... (plus overall less throughput in the service for some reason; about 30% fewer logs passing through despite enough CPU/memory capacity)

So for sink B (the failing sink) I've added the following:

    request:
      concurrency: "adaptive"
      adaptive_concurrency:
        initial_concurrency: 5
      retry_attempts: 3  # default unlimited
      timeout_secs: 2   # default 60s

and this broke the whole thing. So: acks disabled everywhere, one sink (B) failing, but with a buffer configured with this policy:

    buffer:
      when_full: "drop_newest"
      max_events: 5000

and I'd expect its failure not to impact the main sink A, or the whole pipeline... but it did.

This is what I see in the logs for the vector-aggregator pods:

"vector.Vector" grpc_method="PushEvents"}: vector_common::internal_event::component_events_dropped: Internal log [Events dropped] is being suppressed to avoid flooding.
2024-07-18T14:31:27.484963Z  WARN sink{component_kind="sink" component_id=B component_type=vector}: vector::sinks::util::retries: Request timed out. If this happens often while the events are actually reaching their destination, try decreasing `batch.max_bytes` and/or using `compression` if applicable. Alternatively `request.timeout_secs` can be increased. internal_log
2024-07-18T14:31:27.485426Z  WARN sink{component_kind="sink" component_id=B component_type=vector}: vector::sinks::util::retries: Internal log [Retrying after error.] has been suppressed 3 times.
2024-07-18T14:31:27.485441Z  WARN sink{component_kind="sink" component_id=B component_type=vector}: vector::sinks::util::retries: Retrying after error. error=Request failed: status: Unknown, message: "operation was canceled: request has been canceled", details: [], metadata: MetadataMap { headers: {} } internal_log_rate_limit=true
2024-07-18T14:31:27.485663Z  WARN sink{component_kind="sink" component_id=B component_type=vector}: vector::sinks::util::retries: Internal log [Retrying after error.] is being suppressed to avoid flooding.
2024-07-18T14:31:36.642543Z ERROR source{component_kind="source" component_id=vector component_type=vector}:grpc-request{grpc_service="vector.Vector" grpc_method="PushEvents"}: vector_common::internal_event::component_events_dropped: Internal log [Events dropped] has been suppressed 16 times.
2024-07-18T14:31:36.642569Z ERROR source{component_kind="source" component_id=vector component_type=vector}:grpc-request{grpc_service="vector.Vector" grpc_method="PushEvents"}: vector_common::internal_event::component_events_dropped: Events dropped intentional=false count=4 reason="Source send cancelled." internal_log_rate_limit=true
2024-07-18T14:31:40.036284Z ERROR source{component_kind="source" component_id=vector component_type=vector}:grpc-request{grpc_service="vector.Vector" grpc_method="PushEvents"}: vector_common::internal_event::component_events_dropped: Internal log [Events dropped] is being suppressed to avoid flooding.
2024-07-18T14:31:47.850450Z ERROR source{component_kind="source" component_id=vector component_type=vector}:grpc-request{grpc_service="vector.Vector" grpc_method="PushEvents"}: vector_common::internal_event::component_events_dropped: Internal log [Events dropped] has been suppressed 6 times.
2024-07-18T14:31:47.850474Z ERROR source{component_kind="source" component_id=vector component_type=vector}:grpc-request{grpc_service="vector.Vector" grpc_method="PushEvents"}: vector_common::internal_event::component_events_dropped: Events dropped intentional=false count=9 reason="Source send cancelled." internal_log_rate_limit=true
2024-07-18T14:31:47.995622Z ERROR source{component_kind="source" component_id=vector component_type=vector}:grpc-request{grpc_service="vector.Vector" grpc_method="PushEvents"}: vector_common::internal_event::component_events_dropped: Internal log [Events dropped] is being suppressed to avoid flooding.
2024-07-18T14:31:58.082077Z ERROR source{component_kind="source" component_id=vector component_type=vector}:grpc-request{grpc_service="vector.Vector" grpc_method="PushEvents"}: vector_common::internal_event::component_events_dropped: Internal log [Events dropped] has been suppressed 10 times.
2024-07-18T14:31:58.082101Z ERROR source{component_kind="source" component_id=vector component_type=vector}:grpc-request{grpc_service="vector.Vector" grpc_method="PushEvents"}: vector_common::internal_event::component_events_dropped: Events dropped intentional=false count=2 reason="Source send cancelled." internal_log_rate_limit=true
2024-07-18T14:31:59.013847Z ERROR source{component_kind="source" component_id=vector component_type=vector}:grpc-request{grpc_service="vector.Vector" grpc_method="PushEvents"}: vector_common::internal_event::component_events_dropped: Internal log [Events dropped] is being suppressed to avoid flooding.
2024-07-18T14:32:09.271378Z ERROR source{component_kind="source" component_id=vector component_type=vector}:grpc-request{grpc_service="vector.Vector" grpc_method="PushEvents"}: vector_common::internal_event::component_events_dropped: Internal log [Events dropped] has been suppressed 12 times.
2024-07-18T14:32:09.271405Z ERROR source{component_kind="source" component_id=vector component_type=vector}:grpc-request{grpc_service="vector.Vector" grpc_method="PushEvents"}: vector_common::internal_event::component_events_dropped: Events dropped intentional=false count=7 reason="Source send cancelled." internal_log_rate_limit=true
2024-07-18T14:32:11.986565Z ERROR source{component_kind="source" component_id=vector component_type=vector}:grpc-request{grpc_service="vector.Vector" grpc_method="PushEvents"}: vector_common::internal_event::component_events_dropped: Internal log [Events dropped] is being suppressed to avoid flooding.
2024-07-18T14:32:19.501423Z ERROR source{component_kind="source" component_id=vector component_type=vector}:grpc-request{grpc_service="vector.Vector"

Thus I'm confused about what's happening. I'd expect vector-aggregator to start dropping data destined for sink B and keep chugging along, not apply backpressure to the source and sink A.
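For reference, putting the pieces from this thread together, the aggregator config at this point looks roughly like the sketch below (uri/address values are placeholders, transforms omitted):

    expire_metrics_secs: 3600
    acknowledgements:
      enabled: false          # acks explicitly disabled everywhere
    sources:
      vector:
        type: vector
        address: 0.0.0.0:6000
    sinks:
      A:
        type: http
        inputs:
          - vector
        uri: <>
        compression: gzip
        encoding:
          codec: json
      B:
        type: vector
        acknowledgements:
          enabled: false
        inputs:
          - vector
        address: <>
        compression: true
        buffer:
          when_full: "drop_newest"
          max_events: 5000
        batch:
          max_bytes: 4000111  # < default gRPC message limit
          max_events: 100
        request:
          concurrency: "adaptive"
          adaptive_concurrency:
            initial_concurrency: 5
          retry_attempts: 3   # default unlimited
          timeout_secs: 2     # default 60s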

@jszwedko
Member

Thanks for the additional detail @nmiculinic. It seems possible that there is a bug here. I'm wondering if sink B is buffering the data in such a way that it causes the acks to remain open until it has processed the data (by dropping it). Let me ask a teammate who is more familiar with this area.

@jszwedko
Member

@nmiculinic one way to validate my hypothesis would be to check whether the buffer for sink B is actually filling up completely. Could you check the vector_buffer_events_total metric for that sink?
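If you don't already have internal metrics exposed, a minimal sketch for doing so would be something like the following (the component names and listen address here are just placeholders); you should then be able to filter that metric by the component_id label for sink B:

    sources:
      internal:
        type: internal_metrics
    sinks:
      metrics_exporter:
        type: prometheus_exporter
        inputs:
          - internal
        address: 0.0.0.0:9598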

@jszwedko
Member

I got some more info that clarified my understanding of how sink acknowledgements work. The current implementation has sources wait for all configured sinks, even if only a subset has acknowledgements enabled. We intended to change this behavior so that sources only wait for sinks that actually have acks enabled, but apparently never got to it: #7369

I think what you are experiencing is the result of #7369 not being implemented yet, i.e. only sinks with acks enabled should cause the source to wait. I'll close this one in lieu of that one, but please feel free to leave any additional thoughts over there.

@jszwedko jszwedko closed this as not planned Jul 22, 2024