Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Improve serialization for TaskResourceInfo #16700

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

ansjcy
Copy link
Member

@ansjcy ansjcy commented Nov 21, 2024

Description

Use binary serialization to avoid the JSON parsing overhead when piggybacking task resource usage info from data nodes to coordinator node.

Related Issues

Resolves #16635

Tests

I ran the big5 benchmark tests on a cluster with 3 master nodes (c5.xlarge) and 2 data nodes (r5.4xlarge) and did CPU profiling for term queries like mentioned in #16635. The parsing overhead is less than 1% in my tests.
image

Also validated the functionalities of query insights is not impacted.

curl -X GET "localhost:9200/_insights/top_queries?pretty"
{
  "top_queries" : [
    {
      "timestamp" : 1732222066991,
      "total_shards" : 2,
      "indices" : [
        "my-index-*"
      ],
      "node_id" : "MhvRcvgYSH2-AAThxmjosQ",
      "source" : {
        "size" : 1000
      },
      "task_resource_usages" : [
        {
          "action" : "indices:data/read/search[phase/query]",
          "taskId" : 241,
          "parentTaskId" : 146,
          "nodeId" : "dfYZt3ZRQXadtHY4XPRoMg",
          "taskResourceUsage" : {
            "cpu_time_in_nanos" : 7437000,
            "memory_in_bytes" : 807984
          }
        },
        {
          "action" : "indices:data/read/search[phase/query]",
          "taskId" : 127,
          "parentTaskId" : 146,
          "nodeId" : "Hek0j1IZQ4qfNsw6ftlbTQ",
          "taskResourceUsage" : {
            "cpu_time_in_nanos" : 8863000,
            "memory_in_bytes" : 934232
          }
        },
        {
          "action" : "indices:data/read/search[phase/fetch/id]",
          "taskId" : 242,
          "parentTaskId" : 146,
          "nodeId" : "dfYZt3ZRQXadtHY4XPRoMg",
          "taskResourceUsage" : {
            "cpu_time_in_nanos" : 5919000,
            "memory_in_bytes" : 852568
          }
        },
        {
          "action" : "indices:data/read/search[phase/fetch/id]",
          "taskId" : 128,
          "parentTaskId" : 146,
          "nodeId" : "Hek0j1IZQ4qfNsw6ftlbTQ",
          "taskResourceUsage" : {
            "cpu_time_in_nanos" : 5759000,
            "memory_in_bytes" : 867528
          }
        },
        {
          "action" : "indices:data/read/search",
          "taskId" : 146,
          "parentTaskId" : -1,
          "nodeId" : "MhvRcvgYSH2-AAThxmjosQ",
          "taskResourceUsage" : {
            "cpu_time_in_nanos" : 2200000,
            "memory_in_bytes" : 270560
          }
        }
      ],
      "search_type" : "query_then_fetch",
      "phase_latency_map" : {
        "expand" : 0,
        "query" : 50,
        "fetch" : 19
      },
      "labels" : {
        "X-Opaque-Id" : "cyji-id"
      },
      "measurements" : {
        "latency" : {
          "number" : 84,
          "count" : 1,
          "aggregationType" : "NONE"
        },
        "cpu" : {
          "number" : 30178000,
          "count" : 1,
          "aggregationType" : "NONE"
        },
        "memory" : {
          "number" : 3732872,
          "count" : 1,
          "aggregationType" : "NONE"
        }
      }
    },
    {
      "timestamp" : 1732222067165,
      "total_shards" : 1,
      "indices" : [
        "my-index-0"
      ],
      "node_id" : "MhvRcvgYSH2-AAThxmjosQ",
      "source" : {
        "size" : 20,
        "query" : {
          "bool" : {
            "must" : [
              {
                "match_phrase" : {
                  "message" : {
                    "query" : "document",
                    "slop" : 0,
                    "zero_terms_query" : "NONE",
                    "boost" : 1.0
                  }
                }
              },
              {
                "match" : {
                  "user.id" : {
                    "query" : "cyji",
                    "operator" : "OR",
                    "prefix_length" : 0,
                    "max_expansions" : 50,
                    "fuzzy_transpositions" : true,
                    "lenient" : false,
                    "zero_terms_query" : "NONE",
                    "auto_generate_synonyms_phrase_query" : true,
                    "boost" : 1.0
                  }
                }
              }
            ],
            "adjust_pure_negative" : true,
            "boost" : 1.0
          }
        }
      },
      "task_resource_usages" : [
        {
          "action" : "indices:data/read/search[phase/query]",
          "taskId" : 252,
          "parentTaskId" : 149,
          "nodeId" : "dfYZt3ZRQXadtHY4XPRoMg",
          "taskResourceUsage" : {
            "cpu_time_in_nanos" : 4085000,
            "memory_in_bytes" : 483992
          }
        },
        {
          "action" : "indices:data/read/search",
          "taskId" : 149,
          "parentTaskId" : -1,
          "nodeId" : "MhvRcvgYSH2-AAThxmjosQ",
          "taskResourceUsage" : {
            "cpu_time_in_nanos" : 656000,
            "memory_in_bytes" : 70880
          }
        }
      ],
      "search_type" : "query_then_fetch",
      "phase_latency_map" : {
        "expand" : 0,
        "query" : 16,
        "fetch" : 0
      },
      "labels" : { },
      "measurements" : {
        "latency" : {
          "number" : 17,
          "count" : 1,
          "aggregationType" : "NONE"
        },
        "cpu" : {
          "number" : 4741000,
          "count" : 1,
          "aggregationType" : "NONE"
        },
        "memory" : {
          "number" : 554872,
          "count" : 1,
          "aggregationType" : "NONE"
        }
      }
    },
    {
      "timestamp" : 1732222067129,
      "total_shards" : 1,
      "indices" : [
        "my-index-0"
      ],
      "node_id" : "MhvRcvgYSH2-AAThxmjosQ",
      "source" : {
        "size" : 20,
        "query" : {
          "term" : {
            "user.id" : {
              "value" : "cyji",
              "boost" : 1.0
            }
          }
        }
      },
      "task_resource_usages" : [
        {
          "action" : "indices:data/read/search[phase/query]",
          "taskId" : 250,
          "parentTaskId" : 148,
          "nodeId" : "dfYZt3ZRQXadtHY4XPRoMg",
          "taskResourceUsage" : {
            "cpu_time_in_nanos" : 3152000,
            "memory_in_bytes" : 278544
          }
        },
        {
          "action" : "indices:data/read/search",
          "taskId" : 148,
          "parentTaskId" : -1,
          "nodeId" : "MhvRcvgYSH2-AAThxmjosQ",
          "taskResourceUsage" : {
            "cpu_time_in_nanos" : 599000,
            "memory_in_bytes" : 61736
          }
        }
      ],
      "search_type" : "query_then_fetch",
      "phase_latency_map" : {
        "expand" : 0,
        "query" : 15,
        "fetch" : 0
      },
      "labels" : { },
      "measurements" : {
        "latency" : {
          "number" : 17,
          "count" : 1,
          "aggregationType" : "NONE"
        },
        "cpu" : {
          "number" : 3751000,
          "count" : 1,
          "aggregationType" : "NONE"
        },
        "memory" : {
          "number" : 340280,
          "count" : 1,
          "aggregationType" : "NONE"
        }
      }
    },
    {
      "timestamp" : 1732222067088,
      "indices" : [
        "my-index-*"
      ],
      "total_shards" : 2,
      "node_id" : "Hek0j1IZQ4qfNsw6ftlbTQ",
      "source" : {
        "size" : 1000
      },
      "task_resource_usages" : [
        {
          "action" : "indices:data/read/search[phase/query]",
          "taskId" : 134,
          "parentTaskId" : 133,
          "nodeId" : "Hek0j1IZQ4qfNsw6ftlbTQ",
          "taskResourceUsage" : {
            "cpu_time_in_nanos" : 304000,
            "memory_in_bytes" : 8400
          }
        },
        {
          "action" : "indices:data/read/search[phase/query]",
          "taskId" : 248,
          "parentTaskId" : 133,
          "nodeId" : "dfYZt3ZRQXadtHY4XPRoMg",
          "taskResourceUsage" : {
            "cpu_time_in_nanos" : 309000,
            "memory_in_bytes" : 8400
          }
        },
        {
          "action" : "indices:data/read/search[phase/fetch/id]",
          "taskId" : 136,
          "parentTaskId" : 133,
          "nodeId" : "Hek0j1IZQ4qfNsw6ftlbTQ",
          "taskResourceUsage" : {
            "cpu_time_in_nanos" : 354000,
            "memory_in_bytes" : 21088
          }
        },
        {
          "action" : "indices:data/read/search[phase/fetch/id]",
          "taskId" : 249,
          "parentTaskId" : 133,
          "nodeId" : "dfYZt3ZRQXadtHY4XPRoMg",
          "taskResourceUsage" : {
            "cpu_time_in_nanos" : 359000,
            "memory_in_bytes" : 21088
          }
        },
        {
          "action" : "indices:data/read/search",
          "taskId" : 133,
          "parentTaskId" : -1,
          "nodeId" : "Hek0j1IZQ4qfNsw6ftlbTQ",
          "taskResourceUsage" : {
            "cpu_time_in_nanos" : 1484000,
            "memory_in_bytes" : 193568
          }
        }
      ],
      "search_type" : "query_then_fetch",
      "phase_latency_map" : {
        "expand" : 0,
        "query" : 3,
        "fetch" : 3
      },
      "labels" : { },
      "measurements" : {
        "latency" : {
          "number" : 15,
          "count" : 1,
          "aggregationType" : "NONE"
        },
        "cpu" : {
          "number" : 2810000,
          "count" : 1,
          "aggregationType" : "NONE"
        },
        "memory" : {
          "number" : 252544,
          "count" : 1,
          "aggregationType" : "NONE"
        }
      }
    },
...

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@github-actions github-actions bot added bug Something isn't working Search:Performance labels Nov 21, 2024
@ansjcy ansjcy changed the title Improve serialization for TaskResourceInfo [WIP] Improve serialization for TaskResourceInfo Nov 21, 2024
Copy link
Contributor

❌ Gradle check result for 8fc3f92: ABORTED

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@rishabh6788
Copy link
Contributor

@ansjcy you can also run performance benchmarks on your PR as well to see how it compares against nightly baseline runs.
See https://github.com/opensearch-project/OpenSearch/blob/main/PERFORMANCE_BENCHMARKS.md.

@andrross
Copy link
Member

andrross commented Nov 27, 2024

@ansjcy Why are we string-ifying and stashing this object into the ThreadContext at all? If we need to send data between nodes, we should be using the transport protocol and be serializing this thing using the Writable abstraction. This response headers thing seems to be intended for the REST API. Do you really want this thing to be sent back to rest clients as an HTTP header?

Update: this appears to be where the idea to use the thread context came from: #13172 (comment)

@rishabhmaurya What do you think here? I can see how using the thread context can be easier, but serializing this object to/from a string any any hot path code can be expensive (and doing a binary format into a base64 string feels hacky). Also the "response headers" concept seems like it is meant to be propagated ultimately as an HTTP header and that's not the case here.

@rishabhmaurya
Copy link
Contributor

rishabhmaurya commented Dec 2, 2024

@andrross thanks for opening #16635. Features like these should be well benchmarked and profiled before merging them in.
I think task resource consumption across cluster belongs to distributed telemetry feature category and usually context propagation both in OpenTelemetry and in our version of Otel in OpenSearch happens using thread context.

Do you really want this thing to be sent back to rest clients as an HTTP header?

Ideally, we should not expose any header on 9200 port exposed to the end clients and should be only limited to 9300 transport. @ansjcy could you please confirm?

I'm still of the opinion that using thread context and attaching header to transport actions is probably fine here if we sample the requests especially when they are of hot code path like search and overhead is high like more than 1% of overall cpu time.
We need to build a robust and trustworthy benchmark around cluster insights to profile and validate the overhead across releases to avoid running into situation like these. @ansjcy what checks do we have in place currently?
Features like cluster insights can do more bad than good if overhead is high and we have to be extra careful enabling them hot paths like search without sampling in place.

@rishabhmaurya
Copy link
Contributor

rishabhmaurya commented Dec 2, 2024

#13172 (comment) this comments kind of contradicts what we are seeing in #16635

@andrross
Copy link
Member

andrross commented Dec 2, 2024

#13172 (comment) this comments kind of contradicts what we are seeing in #16635

@rishabhmaurya To be fair, I'm not surprised this overhead wouldn't be noticeable in most normal workloads. I was deliberately profiling a query that was as simple as possible looking for other overhead. The macro benchmarks have enough variation that the addition of small overhead is hard to spot.

Ideally, we should not expose any header on 9200 port exposed to the end clients and should be only limited to 9300 transport

The round tripping through a string is what ends up being costly here. I'm guessing the concept of these headers comes originally from HTTP where they were just string values? Now that we're stashing complex objects it would be better just to keep them as binary, since they end up being written and read as binary via StreamInput/StreamOutput anyway.

@rishabhmaurya
Copy link
Contributor

rishabhmaurya commented Dec 3, 2024

The round tripping through a string is what ends up being costly here. I'm guessing the concept of these headers comes originally from HTTP where they were just string values? Now that we're stashing complex objects it would be better just to keep them as binary, since they end up being written and read as binary via StreamInput/StreamOutput anyway.

Ah! I missed your point earlier. Ideally, in any telemetry solution, we inject trace or some form of id and key, value pair baggage to it.
I'm thinking if we have a scope of reducing all information we are sending over wire and also into thread context to some form of id? like task id/trace id and just stash the measurements associated with them, which is minimal?

If i understand this logic correct, below is the what we are injecting -

    out.writeString(action);
    out.writeLong(taskId);
    out.writeLong(parentTaskId);
    out.writeString(nodeId);
    out.writeVLong(cpuTimeInNanos);
    out.writeVLong(memoryInBytes);

@andrross I think we might be able to get rid of some of these fields like action, parentTaskId and cordinator node can be get the parentTaskId and action corrensponding to a given task. Do you still consider it as a complex object?

I agree that we can probably read and write them as binary via StreamInput/StreamOutput. I think reason why we are converting it to string here is because the ThreadContext doesn't expose such API where we can directly pass byte values -


Maybe we can explore exposing such API, it definitely looks a little complicated at first look.

@andrross
Copy link
Member

andrross commented Dec 3, 2024

Do you still consider it as a complex object?

Our transport protocol defines these "headers" at a very low layer and they are typed as a simple map of string key-value pairs (or a string key to a set of strings in the case of response headers). If we have use cases to write data that doesn't conform to those types then it is "complex" in that it doesn't seem to match the original intent of the headers data structure. Round-tripping non-string data through a string here is a shortcut to avoid the (admittedly very complex!) work of changing the transport protocol to natively accept different data types.

I think the options to consider here are:

  1. Do nothing. Maybe my workload showing 7% CPU time on the serde is so unrepresentative to not matter in practice.
  2. Continue roundtripping the data through strings, but do it more efficiently than writing a whole JSON object (this PR is one way to do this option)
  3. Refactor the transport protocol to allow for writing binary header values

I honestly don't love option 2 because it will add complexity (particularly to handle backward compatibility) and seems like only a modest improvement over the status quo. And we probably need more evidence than my one profiling exercise to say that it is worth it to do option 3. @rishabhmaurya what do you think?

@dblock
Copy link
Member

dblock commented Dec 4, 2024

I imagine a protobuf-based implementation doesn't suffer from this problem? So there's 4 which is aggressively replace the binary protocol with that? I am not saying not to do 3, it's worth it as a short term solution, too.

@andrross
Copy link
Member

andrross commented Dec 4, 2024

I imagine a protobuf-based implementation doesn't suffer from this problem? So there's 4 which is aggressively replace the binary protocol with that? I am not saying not to do 3, it's worth it as a short term solution, too.

@dblock The binary serialization isn't really the issue here. We've built this whole ThreadContext abstraction layer on top of the binary protocol, and that is where the complex work comes in to make changes. Whether it is protobuf or StreamInput/StreamOutput at the bottom doesn't really change a whole lot here.

@rishabhmaurya
Copy link
Contributor

@andrross we should explore option 3 to allow binary values to be injected directly. @ansjcy it would be nice if we can evaluate it if not implement it right away.

I'm also concerned about its future usage, if we add more measurement and their computation logic into hot code paths, how do ensure that they don't run into unwanted cpu time increase. it is not an easy problem to solve, so looking for ideas on adding safety nets here.

@ansjcy
Copy link
Member Author

ansjcy commented Dec 5, 2024

Thansk for all the comments! Going through them I agree

Refactor the transport protocol to allow for writing binary header values

should be something we can explore here. Since as .andrross mentioned, the 2nd approach (this PR) can cause certain backward compatibility problems.

Another approach to totally avoid adding complexity to the hot search path is, instead of piggybacking the data with each shard search response, we can consider the "alternative approach" discussed here #12399: we can write an asynchronous post-processor job as part of the query insights data consumption pipeline. This post-processor would periodically gather data from data nodes and correlate it with queries to calculate the final resource usage accurately. What do you think about this approach? :) @rishabh6788 @andrross @dblock

I actually had a POC to explore this early this year: #12473

@andrross
Copy link
Member

This post-processor would periodically gather data from data nodes

@ansjcy Does this introduce a new bottleneck for very large clusters where the single cluster manager node has to aggregate data from all data nodes (assuming the post-processor runs on the cluster manager)?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working Search:Performance
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[BUG] TaskResourceTrackingService consuming more CPU than expected
5 participants