From bfe95318a1fccc407825558b2e0db687cd18c99d Mon Sep 17 00:00:00 2001 From: David Zane Date: Wed, 7 Feb 2024 12:22:50 -0800 Subject: [PATCH] Tracing for deep search path Signed-off-by: David Zane --- CHANGELOG.md | 1 + .../core/listener/QueryInsightsListener.java | 2 +- .../search/AbstractSearchAsyncAction.java | 3 +- .../action/search/SearchRequestContext.java | 2 +- .../SearchRequestOperationsListener.java | 6 +- .../action/search/SearchRequestSlowLog.java | 2 +- .../action/search/SearchRequestStats.java | 2 +- .../action/search/TransportSearchAction.java | 106 +++++++++++------- .../telemetry/tracing/AttributeNames.java | 15 +++ .../telemetry/tracing/SpanBuilder.java | 40 +++++++ ...ceableSearchRequestOperationsListener.java | 73 ++++++++++++ ...erationsCompositeListenerFactoryTests.java | 2 +- .../SearchRequestOperationsListenerTests.java | 2 +- .../search/SearchRequestStatsTests.java | 4 +- .../snapshots/SnapshotResiliencyTests.java | 3 +- .../telemetry/tracing/SpanBuilderTests.java | 11 ++ 16 files changed, 222 insertions(+), 52 deletions(-) create mode 100644 server/src/main/java/org/opensearch/telemetry/tracing/listener/TraceableSearchRequestOperationsListener.java diff --git a/CHANGELOG.md b/CHANGELOG.md index da03b2e4430a6..4a908d1b27007 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Allow to pass the list settings through environment variables (like [], ["a", "b", "c"], ...) ([#10625](https://github.com/opensearch-project/OpenSearch/pull/10625)) - [Admission Control] Integrate CPU AC with ResourceUsageCollector and add CPU AC stats to nodes/stats ([#10887](https://github.com/opensearch-project/OpenSearch/pull/10887)) - [S3 Repository] Add setting to control connection count for sync client ([#12028](https://github.com/opensearch-project/OpenSearch/pull/12028)) +- Tracing for deep search path ([#12103](https://github.com/opensearch-project/OpenSearch/pull/12103)) ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java index 705273f52a567..9ec8673147c38 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java @@ -116,7 +116,7 @@ public void onPhaseStart(SearchPhaseContext context) {} public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} @Override - public void onPhaseFailure(SearchPhaseContext context) {} + public void onPhaseFailure(SearchPhaseContext context, Throwable cause) {} @Override public void onRequestStart(SearchRequestContext searchRequestContext) {} diff --git a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java index 3c27d3ce59e4c..88f9dac401a56 100644 --- a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java @@ -220,6 +220,7 @@ public final void start() { null ) ); + onRequestEnd(searchRequestContext); return; } executePhase(this); @@ -717,7 +718,7 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At @Override public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) { if (SearchPhaseName.isValidName(phase.getName())) { - this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this); + this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this, cause); } raisePhaseFailure(new SearchPhaseExecutionException(phase.getName(), msg, cause, buildShardFailures())); } diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java b/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java index 383d9b5e82fe2..b8bbde65ca6bc 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java @@ -78,7 +78,7 @@ void setTotalHits(TotalHits totalHits) { this.totalHits = totalHits; } - TotalHits totalHits() { + public TotalHits totalHits() { return totalHits; } diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java index 2acb35af667f0..d00acd572d939 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java @@ -35,7 +35,7 @@ protected SearchRequestOperationsListener(final boolean enabled) { protected abstract void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext); - protected abstract void onPhaseFailure(SearchPhaseContext context); + protected abstract void onPhaseFailure(SearchPhaseContext context, Throwable cause); protected void onRequestStart(SearchRequestContext searchRequestContext) {} @@ -91,10 +91,10 @@ protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searc } @Override - protected void onPhaseFailure(SearchPhaseContext context) { + protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) { for (SearchRequestOperationsListener listener : listeners) { try { - listener.onPhaseFailure(context); + listener.onPhaseFailure(context, cause); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("onPhaseFailure listener [{}] failed", listener), e); } diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java b/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java index 74e04d976cb1c..a9a07c6aca7f4 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java @@ -140,7 +140,7 @@ protected void onPhaseStart(SearchPhaseContext context) {} protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} @Override - protected void onPhaseFailure(SearchPhaseContext context) {} + protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {} @Override protected void onRequestStart(SearchRequestContext searchRequestContext) {} diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java b/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java index ac32b08afb7f6..97ef94055faf7 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java @@ -71,7 +71,7 @@ protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searc } @Override - protected void onPhaseFailure(SearchPhaseContext context) { + protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) { phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec(); } diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 79e599ec9387b..da19f01c079a4 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -88,6 +88,12 @@ import org.opensearch.tasks.CancellableTask; import org.opensearch.tasks.Task; import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.tracing.Span; +import org.opensearch.telemetry.tracing.SpanBuilder; +import org.opensearch.telemetry.tracing.SpanScope; +import org.opensearch.telemetry.tracing.Tracer; +import org.opensearch.telemetry.tracing.listener.TraceableActionListener; +import org.opensearch.telemetry.tracing.listener.TraceableSearchRequestOperationsListener; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.RemoteClusterAware; import org.opensearch.transport.RemoteClusterService; @@ -173,6 +179,7 @@ public class TransportSearchAction extends HandledTransportAction) SearchRequest::new); this.client = client; @@ -215,6 +223,7 @@ public TransportSearchAction( this.searchRequestOperationsCompositeListenerFactory = searchRequestOperationsCompositeListenerFactory; clusterService.getClusterSettings() .addSettingsUpdateConsumer(SEARCH_QUERY_METRICS_ENABLED_SETTING, this::setSearchQueryMetricsEnabled); + this.tracer = tracer; } private void setSearchQueryMetricsEnabled(boolean searchQueryMetricsEnabled) { @@ -431,49 +440,68 @@ private void executeRequest( if (originalSearchRequest.isPhaseTook() == null) { originalSearchRequest.setPhaseTook(clusterService.getClusterSettings().get(SEARCH_PHASE_TOOK_ENABLED)); } - SearchRequestOperationsListener.CompositeListener requestOperationsListeners = searchRequestOperationsCompositeListenerFactory - .buildCompositeListener(originalSearchRequest, logger); - SearchRequestContext searchRequestContext = new SearchRequestContext(requestOperationsListeners, originalSearchRequest); - searchRequestContext.getSearchRequestOperationsListener().onRequestStart(searchRequestContext); - - PipelinedRequest searchRequest; - ActionListener listener; - try { - searchRequest = searchPipelineService.resolvePipeline(originalSearchRequest); - listener = searchRequest.transformResponseListener(originalListener); - } catch (Exception e) { - originalListener.onFailure(e); - return; - } - ActionListener requestTransformListener = ActionListener.wrap(sr -> { - if (searchQueryMetricsEnabled) { - try { - searchQueryCategorizer.categorize(sr.source()); - } catch (Exception e) { - logger.error("Error while trying to categorize the query.", e); - } - } + final Span requestSpan = tracer.startSpan(SpanBuilder.from(task, actionName)); + try (final SpanScope spanScope = tracer.withSpanInScope(requestSpan)) { - ActionListener rewriteListener = buildRewriteListener( - sr, - task, - timeProvider, - searchAsyncActionProvider, - listener, - searchRequestContext - ); - if (sr.source() == null) { - rewriteListener.onResponse(sr.source()); + SearchRequestOperationsListener.CompositeListener requestOperationsListeners; + if (tracer.isRecording()) { + originalListener = TraceableActionListener.create(originalListener, requestSpan, tracer); + TraceableSearchRequestOperationsListener traceableSearchRequestOperationsListener = + new TraceableSearchRequestOperationsListener(tracer, requestSpan); + requestOperationsListeners = searchRequestOperationsCompositeListenerFactory.buildCompositeListener( + originalSearchRequest, + logger, + traceableSearchRequestOperationsListener + ); } else { - Rewriteable.rewriteAndFetch( - sr.source(), - searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis), - rewriteListener + requestOperationsListeners = searchRequestOperationsCompositeListenerFactory.buildCompositeListener( + originalSearchRequest, + logger ); } - }, listener::onFailure); - searchRequest.transformRequest(requestTransformListener); + SearchRequestContext searchRequestContext = new SearchRequestContext(requestOperationsListeners, originalSearchRequest); + searchRequestContext.getSearchRequestOperationsListener().onRequestStart(searchRequestContext); + + PipelinedRequest searchRequest; + ActionListener listener; + try { + searchRequest = searchPipelineService.resolvePipeline(originalSearchRequest); + listener = searchRequest.transformResponseListener(originalListener); + } catch (Exception e) { + originalListener.onFailure(e); + return; + } + + ActionListener requestTransformListener = ActionListener.wrap(sr -> { + if (searchQueryMetricsEnabled) { + try { + searchQueryCategorizer.categorize(sr.source()); + } catch (Exception e) { + logger.error("Error while trying to categorize the query.", e); + } + } + + ActionListener rewriteListener = buildRewriteListener( + sr, + task, + timeProvider, + searchAsyncActionProvider, + listener, + searchRequestContext + ); + if (sr.source() == null) { + rewriteListener.onResponse(sr.source()); + } else { + Rewriteable.rewriteAndFetch( + sr.source(), + searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis), + rewriteListener + ); + } + }, listener::onFailure); + searchRequest.transformRequest(requestTransformListener); + } } private ActionListener buildRewriteListener( diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java b/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java index b6b2cf360d1c5..70ce212eea582 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java @@ -70,6 +70,16 @@ private AttributeNames() { */ public static final String TRANSPORT_ACTION = "action"; + /** + * Task id + */ + public static final String TASK_ID = "task_id"; + + /** + * Parent task id + */ + public static final String PARENT_TASK_ID = "parent_task_id"; + /** * Index Name */ @@ -94,4 +104,9 @@ private AttributeNames() { * Refresh Policy */ public static final String REFRESH_POLICY = "refresh_policy"; + + /** + * Search Response Total Hits + */ + public static final String TOTAL_HITS = "total_hits"; } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java b/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java index 1dce422943b7a..5c76c4327da59 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java @@ -14,6 +14,7 @@ import org.opensearch.core.common.Strings; import org.opensearch.http.HttpRequest; import org.opensearch.rest.RestRequest; +import org.opensearch.tasks.Task; import org.opensearch.telemetry.tracing.attributes.Attributes; import org.opensearch.transport.TcpChannel; import org.opensearch.transport.Transport; @@ -170,4 +171,43 @@ private static Attributes buildSpanAttributes(String nodeId, ReplicatedWriteRequ return attributes; } + /** + * Creates {@link SpanCreationContext} with parent set to specified SpanContext. + * @param spanName name of span. + * @param parentSpan target parent span. + * @return context + */ + public static SpanCreationContext from(String spanName, SpanContext parentSpan) { + return SpanCreationContext.server().name(spanName).parent(parentSpan); + } + + /** + * Creates {@link SpanCreationContext} with parent set to specified SpanContext. + * @param task search task. + * @param actionName action. + * @return context + */ + public static SpanCreationContext from(Task task, String actionName) { + return SpanCreationContext.server().name(createSpanName(task, actionName)).attributes(buildSpanAttributes(task, actionName)); + } + + private static Attributes buildSpanAttributes(Task task, String actionName) { + Attributes attributes = Attributes.create().addAttribute(AttributeNames.TRANSPORT_ACTION, actionName); + if (task != null) { + attributes.addAttribute(AttributeNames.TASK_ID, task.getId()); + if (task.getParentTaskId() != null && task.getParentTaskId().isSet()) { + attributes.addAttribute(AttributeNames.PARENT_TASK_ID, task.getParentTaskId().getId()); + } + } + return attributes; + + } + + private static String createSpanName(Task task, String actionName) { + if (task != null) { + return task.getType() + SEPARATOR + task.getAction(); + } else { + return actionName; + } + } } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/listener/TraceableSearchRequestOperationsListener.java b/server/src/main/java/org/opensearch/telemetry/tracing/listener/TraceableSearchRequestOperationsListener.java new file mode 100644 index 0000000000000..0acc7d9067eb8 --- /dev/null +++ b/server/src/main/java/org/opensearch/telemetry/tracing/listener/TraceableSearchRequestOperationsListener.java @@ -0,0 +1,73 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.tracing.listener; + +import org.opensearch.action.search.SearchPhaseContext; +import org.opensearch.action.search.SearchRequestContext; +import org.opensearch.action.search.SearchRequestOperationsListener; +import org.opensearch.telemetry.tracing.AttributeNames; +import org.opensearch.telemetry.tracing.ScopedSpan; +import org.opensearch.telemetry.tracing.Span; +import org.opensearch.telemetry.tracing.SpanBuilder; +import org.opensearch.telemetry.tracing.SpanContext; +import org.opensearch.telemetry.tracing.Tracer; + +import static org.opensearch.core.common.Strings.capitalize; + +/** + * SearchRequestOperationsListener subscriber for search request tracing + * + * @opensearch.internal + */ +public final class TraceableSearchRequestOperationsListener extends SearchRequestOperationsListener { + private final Tracer tracer; + private final Span requestSpan; + private ScopedSpan phaseScopedSpan; + + public TraceableSearchRequestOperationsListener(final Tracer tracer, final Span requestSpan) { + this.tracer = tracer; + this.requestSpan = requestSpan; + this.phaseScopedSpan = null; + } + + @Override + protected void onPhaseStart(SearchPhaseContext context) { + assert phaseScopedSpan == null : "There should be only one search phase active at a time"; + phaseScopedSpan = tracer.startScopedSpan( + SpanBuilder.from(capitalize(context.getCurrentPhase().getName()), new SpanContext(requestSpan)) + ); + } + + @Override + protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { + assert phaseScopedSpan != null : "There should be a search phase active at that time"; + phaseScopedSpan.close(); + phaseScopedSpan = null; + } + + @Override + protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) { + assert phaseScopedSpan != null : "There should be a search phase active at that time"; + phaseScopedSpan.setError(new Exception(cause)); + phaseScopedSpan.close(); + phaseScopedSpan = null; + } + + @Override + public void onRequestStart(SearchRequestContext searchRequestContext) {} + + @Override + public void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { + // add response-related attributes on request end + requestSpan.addAttribute( + AttributeNames.TOTAL_HITS, + searchRequestContext.totalHits() == null ? 0 : searchRequestContext.totalHits().value + ); + } +} diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactoryTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactoryTests.java index 1cb336e18b12c..845543fbd9f57 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactoryTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactoryTests.java @@ -125,7 +125,7 @@ protected void onPhaseStart(SearchPhaseContext context) {} protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} @Override - protected void onPhaseFailure(SearchPhaseContext context) {} + protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {} }; } } diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerTests.java index a53c35a8401b3..990ed95f1aebc 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerTests.java @@ -40,7 +40,7 @@ public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRe } @Override - public void onPhaseFailure(SearchPhaseContext context) { + public void onPhaseFailure(SearchPhaseContext context, Throwable cause) { searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec(); } }; diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java index 377ccebbfd418..fb9b26e3f3ad1 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java @@ -36,7 +36,7 @@ public void testSearchRequestPhaseFailure() { when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); testRequestStats.onPhaseStart(ctx); assertEquals(1, testRequestStats.getPhaseCurrent(searchPhaseName)); - testRequestStats.onPhaseFailure(ctx); + testRequestStats.onPhaseFailure(ctx, new Throwable()); assertEquals(0, testRequestStats.getPhaseCurrent(searchPhaseName)); } } @@ -156,7 +156,7 @@ public void testSearchRequestStatsOnPhaseFailureConcurrently() throws Interrupte threads[i] = new Thread(() -> { phaser.arriveAndAwaitAdvance(); testRequestStats.onPhaseStart(ctx); - testRequestStats.onPhaseFailure(ctx); + testRequestStats.onPhaseFailure(ctx, new Throwable()); countDownLatch.countDown(); }); threads[i].start(); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 7c50e961853b5..4b43b5f9c3ff5 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -2310,7 +2310,8 @@ public void onFailure(final Exception e) { client ), NoopMetricsRegistry.INSTANCE, - searchRequestOperationsCompositeListenerFactory + searchRequestOperationsCompositeListenerFactory, + NoopTracer.INSTANCE ) ); actions.put( diff --git a/server/src/test/java/org/opensearch/telemetry/tracing/SpanBuilderTests.java b/server/src/test/java/org/opensearch/telemetry/tracing/SpanBuilderTests.java index b4183412cdf02..9cc0ea03db25e 100644 --- a/server/src/test/java/org/opensearch/telemetry/tracing/SpanBuilderTests.java +++ b/server/src/test/java/org/opensearch/telemetry/tracing/SpanBuilderTests.java @@ -19,6 +19,7 @@ import org.opensearch.http.HttpResponse; import org.opensearch.rest.RestRequest; import org.opensearch.telemetry.tracing.attributes.Attributes; +import org.opensearch.telemetry.tracing.noop.NoopSpan; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.transport.Transport; import org.opensearch.transport.TransportException; @@ -67,6 +68,16 @@ public void testTransportContext() { assertEquals(connection.getNode().getHostAddress(), attributes.getAttributesMap().get(AttributeNames.TRANSPORT_TARGET_HOST)); } + public void testParentSpan() { + String spanName = "test-name"; + SpanContext parentSpanContext = new SpanContext(NoopSpan.INSTANCE); + SpanCreationContext context = SpanBuilder.from(spanName, parentSpanContext); + Attributes attributes = context.getAttributes(); + assertNull(attributes); + assertEquals(spanName, context.getSpanName()); + assertEquals(parentSpanContext, context.getParent()); + } + private static Transport.Connection createTransportConnection() { return new Transport.Connection() { @Override