Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
Signed-off-by: Chaitanya Gohel <[email protected]>
  • Loading branch information
gashutos authored Nov 20, 2023
2 parents 882f1ae + 00517eb commit d552d15
Show file tree
Hide file tree
Showing 41 changed files with 431 additions and 188 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix 'org.apache.hc.core5.http.ParseException: Invalid protocol version' under JDK 16+ ([#4827](https://github.com/opensearch-project/OpenSearch/pull/4827))
- Fix compression support for h2c protocol ([#4944](https://github.com/opensearch-project/OpenSearch/pull/4944))
- Don't over-allocate in HeapBufferedAsyncEntityConsumer in order to consume the response ([#9993](https://github.com/opensearch-project/OpenSearch/pull/9993))
- [BUG] Fix the thread context that is not properly cleared and messes up the traces ([#10873](https://github.com/opensearch-project/OpenSearch/pull/10873))
- Handle canMatchSearchAfter for frozen context scenario ([#11249](https://github.com/opensearch-project/OpenSearch/pull/11249))

### Security
Expand Down Expand Up @@ -119,7 +120,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `org.codehaus.woodstox:stax2-api` from 4.2.1 to 4.2.2 ([#10639](https://github.com/opensearch-project/OpenSearch/pull/10639))
- Bump `com.google.http-client:google-http-client` from 1.43.2 to 1.43.3 ([#10635](https://github.com/opensearch-project/OpenSearch/pull/10635))
- Bump `com.squareup.okio:okio` from 3.5.0 to 3.6.0 ([#10637](https://github.com/opensearch-project/OpenSearch/pull/10637))
- Bump `org.apache.logging.log4j:log4j-core` from 2.20.0 to 2.21.1 ([#10858](https://github.com/opensearch-project/OpenSearch/pull/10858), [#11000](https://github.com/opensearch-project/OpenSearch/pull/11000))
- Bump `org.apache.logging.log4j:log4j-core` from 2.20.0 to 2.22.0 ([#10858](https://github.com/opensearch-project/OpenSearch/pull/10858), [#11000](https://github.com/opensearch-project/OpenSearch/pull/11000), [#11270](https://github.com/opensearch-project/OpenSearch/pull/11270))
- Bump `aws-actions/configure-aws-credentials` from 2 to 4 ([#10504](https://github.com/opensearch-project/OpenSearch/pull/10504))
- Bump `stefanzweifel/git-auto-commit-action` from 4 to 5 ([#11171](https://github.com/opensearch-project/OpenSearch/pull/11171))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ repositories {
}

dependencies {
implementation "org.apache.logging.log4j:log4j-core:2.21.1"
implementation "org.apache.logging.log4j:log4j-core:2.22.0"
}

["0.0.1", "0.0.2"].forEach { v ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,23 @@ private DefaultSpanScope(Span span, SpanScope previousSpanScope, TracerContextSt
public static SpanScope create(Span span, TracerContextStorage<String, Span> tracerContextStorage) {
final SpanScope beforeSpanScope = spanScopeThreadLocal.get();
SpanScope newSpanScope = new DefaultSpanScope(span, beforeSpanScope, tracerContextStorage);
spanScopeThreadLocal.set(newSpanScope);
return newSpanScope;
}

@Override
public void close() {
detach();
spanScopeThreadLocal.set(previousSpanScope);
}

@Override
public SpanScope attach() {
spanScopeThreadLocal.set(this);
tracerContextStorage.put(TracerContextStorage.CURRENT_SPAN, this.span);
return this;
}

private void detach() {
spanScopeThreadLocal.set(previousSpanScope);
if (previousSpanScope != null) {
tracerContextStorage.put(TracerContextStorage.CURRENT_SPAN, previousSpanScope.getSpan());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;

Expand Down Expand Up @@ -53,7 +53,6 @@ public Span startSpan(SpanCreationContext context) {
parentSpan = getCurrentSpanInternal();
}
Span span = createSpan(context, parentSpan);
setCurrentSpanInContext(span);
addDefaultAttributes(span);
return span;
}
Expand Down Expand Up @@ -94,10 +93,6 @@ private Span createSpan(SpanCreationContext spanCreationContext, Span parentSpan
return tracingTelemetry.createSpan(spanCreationContext, parentSpan);
}

private void setCurrentSpanInContext(Span span) {
tracerContextStorage.put(TracerContextStorage.CURRENT_SPAN, span);
}

/**
* Adds default attributes in the span
* @param span the current active span
Expand All @@ -107,7 +102,7 @@ protected void addDefaultAttributes(Span span) {
}

@Override
public Span startSpan(SpanCreationContext spanCreationContext, Map<String, List<String>> headers) {
public Span startSpan(SpanCreationContext spanCreationContext, Map<String, Collection<String>> headers) {
Optional<Span> propagatedSpan = tracingTelemetry.getContextPropagator().extractFromHeaders(headers);
return startSpan(spanCreationContext.parent(propagatedSpan.map(SpanContext::new).orElse(null)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
package org.opensearch.telemetry.tracing;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.telemetry.tracing.http.HttpTracer;
import org.opensearch.telemetry.tracing.transport.TransportTracer;

import java.io.Closeable;

Expand All @@ -22,7 +22,7 @@
* @opensearch.experimental
*/
@ExperimentalApi
public interface Tracer extends HttpTracer, Closeable {
public interface Tracer extends TransportTracer, Closeable {
/**
* Starts the {@link Span} with given {@link SpanCreationContext}
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import org.opensearch.common.annotation.ExperimentalApi;

import java.util.List;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
Expand All @@ -36,7 +36,7 @@ public interface TracingContextPropagator {
* @param headers request headers to extract the context from
* @return current span
*/
Optional<Span> extractFromHeaders(Map<String, List<String>> headers);
Optional<Span> extractFromHeaders(Map<String, Collection<String>> headers);

/**
* Injects tracing context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;

import java.util.List;
import java.util.Collection;
import java.util.Map;

/**
Expand Down Expand Up @@ -65,7 +65,7 @@ public void close() {
}

@Override
public Span startSpan(SpanCreationContext spanCreationContext, Map<String, List<String>> header) {
public Span startSpan(SpanCreationContext spanCreationContext, Map<String, Collection<String>> header) {
return NoopSpan.INSTANCE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,31 @@
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.http;
package org.opensearch.telemetry.tracing.transport;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanCreationContext;

import java.util.List;
import java.util.Collection;
import java.util.Map;

/**
* HttpTracer helps in creating a {@link Span} which reads the incoming tracing information
* from the HttpRequest header and propagate the span accordingly.
* TransportTracer helps in creating a {@link Span} which reads the incoming tracing information
* from the HTTP or TCP transport headers and propagate the span accordingly.
* <p>
* All methods on the Tracer object are multi-thread safe.
*
* @opensearch.experimental
*/
@ExperimentalApi
public interface HttpTracer {
public interface TransportTracer {
/**
* Start the span with propagating the tracing info from the HttpRequest header.
*
* @param spanCreationContext span name.
* @param header http request header.
* @return span.
* @param headers transport headers
* @return the span instance
*/
Span startSpan(SpanCreationContext spanCreationContext, Map<String, List<String>> header);
Span startSpan(SpanCreationContext spanCreationContext, Map<String, Collection<String>> headers);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
*/

/**
* Contains No-op implementations
* Contains HTTP or TCP transport related tracer capabilities
*/
package org.opensearch.telemetry.tracing.http;
package org.opensearch.telemetry.tracing.transport;
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand All @@ -35,7 +37,6 @@ public class DefaultTracerTests extends OpenSearchTestCase {
private Span mockSpan;
private Span mockParentSpan;

private SpanScope mockSpanScope;
private ThreadPool threadPool;
private ExecutorService executorService;
private SpanCreationContext spanCreationContext;
Expand Down Expand Up @@ -102,11 +103,11 @@ public void testCreateSpanWithAttributes() {

Span span = defaultTracer.startSpan(spanCreationContext);

assertEquals("span_name", defaultTracer.getCurrentSpan().getSpan().getSpanName());
assertEquals(1.0, ((MockSpan) defaultTracer.getCurrentSpan().getSpan()).getAttribute("key1"));
assertEquals(2l, ((MockSpan) defaultTracer.getCurrentSpan().getSpan()).getAttribute("key2"));
assertEquals(true, ((MockSpan) defaultTracer.getCurrentSpan().getSpan()).getAttribute("key3"));
assertEquals("key4", ((MockSpan) defaultTracer.getCurrentSpan().getSpan()).getAttribute("key4"));
assertThat(defaultTracer.getCurrentSpan(), is(nullValue()));
assertEquals(1.0, ((MockSpan) span).getAttribute("key1"));
assertEquals(2l, ((MockSpan) span).getAttribute("key2"));
assertEquals(true, ((MockSpan) span).getAttribute("key3"));
assertEquals("key4", ((MockSpan) span).getAttribute("key4"));
span.endSpan();
}

Expand All @@ -121,16 +122,18 @@ public void testCreateSpanWithParent() {

Span span = defaultTracer.startSpan(spanCreationContext, null);

SpanContext parentSpan = defaultTracer.getCurrentSpan();

SpanCreationContext spanCreationContext1 = buildSpanCreationContext("span_name_1", Attributes.EMPTY, parentSpan.getSpan());
try (final SpanScope scope = defaultTracer.withSpanInScope(span)) {
SpanContext parentSpan = defaultTracer.getCurrentSpan();

Span span1 = defaultTracer.startSpan(spanCreationContext1);
SpanCreationContext spanCreationContext1 = buildSpanCreationContext("span_name_1", Attributes.EMPTY, parentSpan.getSpan());

assertEquals("span_name_1", defaultTracer.getCurrentSpan().getSpan().getSpanName());
assertEquals(parentSpan.getSpan(), defaultTracer.getCurrentSpan().getSpan().getParentSpan());
span1.endSpan();
span.endSpan();
try (final ScopedSpan span1 = defaultTracer.startScopedSpan(spanCreationContext1)) {
assertEquals("span_name_1", defaultTracer.getCurrentSpan().getSpan().getSpanName());
assertEquals(parentSpan.getSpan(), defaultTracer.getCurrentSpan().getSpan().getParentSpan());
}
} finally {
span.endSpan();
}
}

@SuppressWarnings("unchecked")
Expand All @@ -155,8 +158,7 @@ public void testCreateSpanWithNullParent() {

Span span = defaultTracer.startSpan(spanCreationContext);

assertEquals("span_name", defaultTracer.getCurrentSpan().getSpan().getSpanName());
assertEquals(null, defaultTracer.getCurrentSpan().getSpan().getParentSpan());
assertThat(defaultTracer.getCurrentSpan(), is(nullValue()));
span.endSpan();
}

Expand Down Expand Up @@ -403,7 +405,6 @@ private void setupMocks() {
mockTracingTelemetry = mock(TracingTelemetry.class);
mockSpan = mock(Span.class);
mockParentSpan = mock(Span.class);
mockSpanScope = mock(SpanScope.class);
mockTracerContextStorage = mock(TracerContextStorage.class);
when(mockSpan.getSpanName()).thenReturn("span_name");
when(mockSpan.getSpanId()).thenReturn("span_id");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,10 +504,6 @@ public void testInvalidFieldMember() {
}

public void testSpecialValueVariable() throws Exception {
assumeFalse(
"Concurrent search case muted pending fix: https://github.com/opensearch-project/OpenSearch/issues/10079",
internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
);
// i.e. _value for aggregations
createIndex("test");
ensureGreen("test");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ class ExpressionAggregationScript implements AggregationScript.LeafFactory {
final SimpleBindings bindings;
final DoubleValuesSource source;
final boolean needsScore;
final ReplaceableConstDoubleValueSource specialValue; // _value
final PerThreadReplaceableConstDoubleValueSource specialValue; // _value

ExpressionAggregationScript(Expression e, SimpleBindings b, boolean n, ReplaceableConstDoubleValueSource v) {
ExpressionAggregationScript(Expression e, SimpleBindings b, boolean n, PerThreadReplaceableConstDoubleValueSource v) {
exprScript = e;
bindings = b;
source = exprScript.getDoubleValuesSource(bindings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,14 +316,14 @@ private static AggregationScript.LeafFactory newAggregationScript(
// instead of complicating SimpleBindings (which should stay simple)
SimpleBindings bindings = new SimpleBindings();
boolean needsScores = false;
ReplaceableConstDoubleValueSource specialValue = null;
PerThreadReplaceableConstDoubleValueSource specialValue = null;
for (String variable : expr.variables) {
try {
if (variable.equals("_score")) {
bindings.add("_score", DoubleValuesSource.SCORES);
needsScores = true;
} else if (variable.equals("_value")) {
specialValue = new ReplaceableConstDoubleValueSource();
specialValue = new PerThreadReplaceableConstDoubleValueSource();
bindings.add("_value", specialValue);
// noop: _value is special for aggregations, and is handled in ExpressionScriptBindings
// TODO: if some uses it in a scoring expression, they will get a nasty failure when evaluating...need a
Expand Down Expand Up @@ -388,15 +388,15 @@ private static ScoreScript.LeafFactory newScoreScript(Expression expr, SearchLoo
// NOTE: if we need to do anything complicated with bindings in the future, we can just extend Bindings,
// instead of complicating SimpleBindings (which should stay simple)
SimpleBindings bindings = new SimpleBindings();
ReplaceableConstDoubleValueSource specialValue = null;
PerThreadReplaceableConstDoubleValueSource specialValue = null;
boolean needsScores = false;
for (String variable : expr.variables) {
try {
if (variable.equals("_score")) {
bindings.add("_score", DoubleValuesSource.SCORES);
needsScores = true;
} else if (variable.equals("_value")) {
specialValue = new ReplaceableConstDoubleValueSource();
specialValue = new PerThreadReplaceableConstDoubleValueSource();
bindings.add("_value", specialValue);
// noop: _value is special for aggregations, and is handled in ExpressionScriptBindings
// TODO: if some uses it in a scoring expression, they will get a nasty failure when evaluating...need a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,25 @@
import org.apache.lucene.search.IndexSearcher;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* A {@link DoubleValuesSource} which has a stub {@link DoubleValues} that holds a dynamically replaceable constant double.
* A {@link DoubleValuesSource} which has a stub {@link DoubleValues} that holds a dynamically replaceable constant double. This is made
* thread-safe for concurrent segment search use case by keeping the {@link DoubleValues} per thread. Any update to the value happens in
* thread specific {@link DoubleValuesSource} instance.
*/
final class ReplaceableConstDoubleValueSource extends DoubleValuesSource {
final ReplaceableConstDoubleValues fv;
final class PerThreadReplaceableConstDoubleValueSource extends DoubleValuesSource {
// Multiple slices can be processed by same thread but that will be sequential, so keeping per thread is fine
final Map<Long, ReplaceableConstDoubleValues> perThreadDoubleValues;

ReplaceableConstDoubleValueSource() {
fv = new ReplaceableConstDoubleValues();
PerThreadReplaceableConstDoubleValueSource() {
perThreadDoubleValues = new ConcurrentHashMap<>();
}

@Override
public DoubleValues getValues(LeafReaderContext ctx, DoubleValues scores) throws IOException {
return fv;
return perThreadDoubleValues.computeIfAbsent(Thread.currentThread().getId(), threadId -> new ReplaceableConstDoubleValues());
}

@Override
Expand All @@ -62,7 +67,11 @@ public boolean needsScores() {

@Override
public Explanation explain(LeafReaderContext ctx, int docId, Explanation scoreExplanation) throws IOException {
if (fv.advanceExact(docId)) return Explanation.match((float) fv.doubleValue(), "ReplaceableConstDoubleValues");
final ReplaceableConstDoubleValues currentFv = perThreadDoubleValues.computeIfAbsent(
Thread.currentThread().getId(),
threadId -> new ReplaceableConstDoubleValues()
);
if (currentFv.advanceExact(docId)) return Explanation.match((float) currentFv.doubleValue(), "ReplaceableConstDoubleValues");
else return Explanation.noMatch("ReplaceableConstDoubleValues");
}

Expand All @@ -77,7 +86,11 @@ public int hashCode() {
}

public void setValue(double v) {
fv.setValue(v);
final ReplaceableConstDoubleValues currentFv = perThreadDoubleValues.computeIfAbsent(
Thread.currentThread().getId(),
threadId -> new ReplaceableConstDoubleValues()
);
currentFv.setValue(v);
}

@Override
Expand Down
Loading

0 comments on commit d552d15

Please sign in to comment.