diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6590696 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +.idea +*/logzio-storage +*.iml +*target +build.sh diff --git a/README.md b/README.md index 51b6a64..a506067 100644 --- a/README.md +++ b/README.md @@ -1 +1,70 @@ -# zipkin-logzio +# Ship Zipkin traces + +Zipkin-Logz.io Trace Storage is a storage option for Zipkin distributed traces on your Logz.io account. +It functions as both a collector and a span store. + +**Note**: + This integration requires Logz.io API access. + The Logz.io API is available for all Enterprise accounts. + If you're on a Pro account, reach out to your account manager or the Sales team to request API access. + +### Limitations + +When you use the Zipkin UI to find traces stored in Logz.io, there are a couple limitations. +For most users, these won't be an issue, but they're still good to know: + +* **Lookback** must be 2 days or less +* **Limit** must be 1000 traces or less + +## To integrate Zipkin server and Logz.io + +### 1. Download Zipkin server and Zipkin-Logz.io Trace Storage + +Download [Zipkin server](https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-server&v=LATEST&c=exec). + +```shell +wget -O zipkin.jar 'https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-server&v=LATEST&c=exec' +``` + +Download the [Zipkin-Logz.io Trace Storage](https://github.com/logzio/zipkin-logzio/releases) jar to the same directory. + +### 2. Run Zipkin server with the Logz.io extension + +You can configure the Logz.io extension with shell variables or environment variables. 
+ +For a complete list of options, see the parameters below the code block.👇 + +```bash +STORAGE_TYPE=logzio \ +LOGZIO_ACCOUNT_TOKEN= \ +LOGZIO_LISTENER_HOST= \ +LOGZIO_API_TOKEN= \ +LOGZIO_API_HOST= \ +java -Dloader.path='zipkin-logzio.jar,zipkin-logzio.jar!lib' -Dspring.profiles.active=logzio -cp zipkin.jar org.springframework.boot.loader.PropertiesLauncher +``` + +**Pro tip**: +You can optionally run two discrete Zipkin-Logzio Trace Storage instances if you want to separate shipping and reading of your traces. +If you do, then the required fields change a bit from what's shown in the Parameters list: + +* The **shipping instance** uses `STORAGE_TYPE=logzio`, `LOGZIO_ACCOUNT_TOKEN`, and `LOGZIO_LISTENER_HOST`. +* The **reading instance** uses `STORAGE_TYPE=logzio`, `LOGZIO_API_TOKEN`, and `LOGZIO_API_HOST`. + +**Parameters** + +| Parameter | Description | +|---|---| +| **STORAGE_TYPE=logzio** | **Required**.
We wish there was a way to include this as a default. Alas, Zipkin needs it, so you'll need to include this bit. | +| **LOGZIO_ACCOUNT_TOKEN** | **Required**.
Required when using as a collector to ship logs to Logz.io.
Replace `` with the [token](https://app.logz.io/#/dashboard/settings/general) of the account you want to ship to. | +| **LOGZIO_API_TOKEN** | **Required**.
Required to read back traces from Logz.io.
Replace `` with an [API token](https://app.logz.io/#/dashboard/settings/api-tokens) from the account you want to use. | +| **LOGZIO_LISTENER_HOST** | **Default**: `listener.logz.io`
Replace `` with your region's listener URL. For more information on finding your account's region, see [Account region](https://docs.logz.io/user-guide/accounts/account-region.html). | +| **LOGZIO_API_HOST** | **Default**: `api.logz.io`
Required to read back spans from Logz.io.
Replace `` with your region's base API URL. For more information on finding your account's region, see [Account region](https://docs.logz.io/user-guide/accounts/account-region.html). | +| **STRICT_TRACE_ID** | **Default**: `true`
Use `false` if your version of Zipkin server generates 64-bit trace IDs (version 1.14 or lower). If `false`, spans are grouped by the rightmost 16 characters of the trace ID. For version 1.15 or later, we recommend leaving the default. | +| **SENDER_DRAIN_INTERVAL** | **Default**: `5`
Time interval, in seconds, to send the traces accumulated on the disk. | +| **CLEAN_SENT_TRACES_INTERVAL** | **Default**: `30`
Time interval, in seconds, to clean sent traces from the disk. | + +### 3. Check Logz.io for your traces + +Give your traces some time to get from your system to ours, and then open [Kibana](https://app.logz.io/#/dashboard/kibana). + +If you still don't see your logs, see [log shipping troubleshooting](https://docs.logz.io/user-guide/log-shipping/log-shipping-troubleshooting.html). \ No newline at end of file diff --git a/autoconfigure/pom.xml b/autoconfigure/pom.xml new file mode 100644 index 0000000..9df40d1 --- /dev/null +++ b/autoconfigure/pom.xml @@ -0,0 +1,28 @@ + + + 4.0.0 + + + io.logz.zipkin + zipkin-logzio-parent + + ${zipkin-logzio-version} + + + zipkin-autoconfigure + ${zipkin-logzio-version} + pom + + + ${project.basedir}/.. + + 1.8 + java18 + + + + storage-logzio-autoconfigure + + \ No newline at end of file diff --git a/autoconfigure/storage-logzio-autoconfigure/pom.xml b/autoconfigure/storage-logzio-autoconfigure/pom.xml new file mode 100644 index 0000000..a703bdf --- /dev/null +++ b/autoconfigure/storage-logzio-autoconfigure/pom.xml @@ -0,0 +1,56 @@ + + + 4.0.0 + + + io.logz.zipkin + zipkin-autoconfigure + + ${zipkin-logzio-version} + + + + + + io.logz.zipkin + zipkin-storage-logzio + ${zipkin-logzio-version} + + + + + + + + maven-assembly-plugin + + + + fully.qualified.MainClass + + + + jar-with-dependencies + + false + + + + make-assembly + package + + single + + + + + + + + zipkin-autoconfigure-storage-logzio + + ${project.basedir}/../.. 
+ + \ No newline at end of file diff --git a/autoconfigure/storage-logzio-autoconfigure/src/main/java/zipkin/autoconfigure/storage/logzio/ZipkinLogzioStorageAutoConfiguration.java b/autoconfigure/storage-logzio-autoconfigure/src/main/java/zipkin/autoconfigure/storage/logzio/ZipkinLogzioStorageAutoConfiguration.java new file mode 100644 index 0000000..feff302 --- /dev/null +++ b/autoconfigure/storage-logzio-autoconfigure/src/main/java/zipkin/autoconfigure/storage/logzio/ZipkinLogzioStorageAutoConfiguration.java @@ -0,0 +1,23 @@ +package zipkin.autoconfigure.storage.logzio; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import zipkin2.storage.StorageComponent; + +@Configuration +@EnableConfigurationProperties(ZipkinLogzioStorageProperties.class) +@ConditionalOnProperty(name = "zipkin.storage.type", havingValue = "logzio") +@ConditionalOnMissingBean(StorageComponent.class) +class ZipkinLogzioStorageAutoConfiguration { + + @Bean + @ConditionalOnMissingBean + StorageComponent storage(ZipkinLogzioStorageProperties properties, + @Value("${zipkin.storage.strict-trace-id:true}") boolean strictTraceId) { + return properties.toBuilder().strictTraceId(strictTraceId).build(); + } +} diff --git a/autoconfigure/storage-logzio-autoconfigure/src/main/java/zipkin/autoconfigure/storage/logzio/ZipkinLogzioStorageProperties.java b/autoconfigure/storage-logzio-autoconfigure/src/main/java/zipkin/autoconfigure/storage/logzio/ZipkinLogzioStorageProperties.java new file mode 100644 index 0000000..722b330 --- /dev/null +++ 
b/autoconfigure/storage-logzio-autoconfigure/src/main/java/zipkin/autoconfigure/storage/logzio/ZipkinLogzioStorageProperties.java @@ -0,0 +1,91 @@ +package zipkin.autoconfigure.storage.logzio; + +import ch.qos.logback.classic.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.context.properties.ConfigurationProperties; +import zipkin2.storage.logzio.ConsumerParams; +import zipkin2.storage.logzio.LogzioStorage; +import zipkin2.storage.logzio.LogzioStorageParams; + +import java.io.Serializable; + +@ConfigurationProperties("zipkin.storage.logzio") +class ZipkinLogzioStorageProperties implements Serializable { // for Spark jobs + private static final long serialVersionUID = 0L; + private static final Logger logger = (Logger) LoggerFactory.getLogger(ZipkinLogzioStorageProperties.class.getName()); + private static final String HTTPS_PREFIX = "https://"; + private static final String SEARCH_API_SUFFIX = "/v1/search"; + private static final String LISTENER_HTTPS_PORT = ":8071"; + + private String logzioAccountToken; + private String logzioApiToken; + private String logzioListenerHost; + private String logzioApiHost; + private int senderDrainInterval; + private int cleanSentTracesInterval; + + public void setLogzioAccountToken(String logzioAccountToken) { + this.logzioAccountToken = logzioAccountToken; + } + + public String getLogzioAccountToken() { + return this.logzioAccountToken; + } + + public void setLogzioApiToken(String logzioApiToken) { + this.logzioApiToken = logzioApiToken; + } + + public String getLogzioApiToken() { + return this.logzioApiToken; + } + + public String getLogzioListenerHost() { + return logzioListenerHost; + } + + public void setLogzioListenerHost(String logzioListenerHost) { + this.logzioListenerHost = logzioListenerHost; + } + + public String getLogzioApiHost() { + return logzioApiHost; + } + + public int getSenderDrainInterval() { + return senderDrainInterval; + } + + public void setSenderDrainInterval(int 
senderDrainInterval) { + this.senderDrainInterval = senderDrainInterval; + } + + public int getCleanSentTracesInterval() { + return cleanSentTracesInterval; + } + + public void setCleanSentTracesInterval(int cleanSentTracesInterval) { + this.cleanSentTracesInterval = cleanSentTracesInterval; + } + + public void setLogzioApiHost(String logzioApiHost) { + this.logzioApiHost = logzioApiHost; + } + + public LogzioStorage.Builder toBuilder() { + LogzioStorageParams config = new LogzioStorageParams(); + ConsumerParams consumerParams = new ConsumerParams(); + consumerParams.setListenerUrl(HTTPS_PREFIX + logzioListenerHost + LISTENER_HTTPS_PORT); + consumerParams.setAccountToken(logzioAccountToken); + consumerParams.setSenderDrainInterval(senderDrainInterval); + consumerParams.setCleanSentTracesInterval(cleanSentTracesInterval); + config.setConsumerParams(consumerParams); + config.setApiToken(logzioApiToken); + config.setSearchApiUrl(HTTPS_PREFIX + logzioApiHost + SEARCH_API_SUFFIX); + logger.info(LogzioStorage.ZIPKIN_LOGZIO_STORAGE_MSG + "config " + config.toString()); + return LogzioStorage.newBuilder().config(config); + } + + +} + diff --git a/autoconfigure/storage-logzio-autoconfigure/src/main/resources/META-INF/spring.factories b/autoconfigure/storage-logzio-autoconfigure/src/main/resources/META-INF/spring.factories new file mode 100644 index 0000000..72d88ad --- /dev/null +++ b/autoconfigure/storage-logzio-autoconfigure/src/main/resources/META-INF/spring.factories @@ -0,0 +1,2 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ +zipkin.autoconfigure.storage.logzio.ZipkinLogzioStorageAutoConfiguration diff --git a/autoconfigure/storage-logzio-autoconfigure/src/main/resources/zipkin-server-logzio.yml b/autoconfigure/storage-logzio-autoconfigure/src/main/resources/zipkin-server-logzio.yml new file mode 100644 index 0000000..90c7684 --- /dev/null +++ b/autoconfigure/storage-logzio-autoconfigure/src/main/resources/zipkin-server-logzio.yml @@ -0,0 +1,14 @@ 
+# When enabled, this allows shorter env properties (ex -Dspring.profiles.active=logzio) +zipkin: + storage: + strict-trace-id: ${STRICT_TRACE_ID:true} + logzio: + logzio-listener-host: ${LOGZIO_LISTENER_HOST:listener.logz.io} + logzio-api-token: ${LOGZIO_API_TOKEN:} + logzio-api-host: ${LOGZIO_API_HOST:api.logz.io} + logzio-account-token: ${LOGZIO_ACCOUNT_TOKEN:} + sender-drain-interval: ${SENDER_DRAIN_INTERVAL:5} + clean-sent-traces-interval: ${CLEAN_SENT_TRACES_INTERVAL:30} + + + diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..4121e48 --- /dev/null +++ b/pom.xml @@ -0,0 +1,104 @@ + + + 4.0.0 + + io.logz.zipkin + zipkin-logzio-parent + ${zipkin-logzio-version} + pom + + + 0.0.1 + + + + autoconfigure + storage-logzio + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.8 + 1.8 + + + + maven-jar-plugin + 2.4 + + + + true + true + + true + + + + + + + + + io.zipkin.zipkin2 + zipkin + 2.12.0 + + + + org.springframework.boot + spring-boot-autoconfigure + 2.1.1.RELEASE + provided + + + ch.qos.logback + logback-classic + + + + + org.springframework.boot + spring-boot-test + 2.1.1.RELEASE + + test + + + ch.qos.logback + logback-classic + + + + + + org.slf4j + slf4j-api + 1.7.15 + + + + ch.qos.logback + logback-classic + 1.2.3 + + + + com.fasterxml.jackson.core + jackson-databind + 2.9.8 + + + com.google.guava + guava + 27.1-jre + + + + \ No newline at end of file diff --git a/storage-logzio/pom.xml b/storage-logzio/pom.xml new file mode 100644 index 0000000..33fb568 --- /dev/null +++ b/storage-logzio/pom.xml @@ -0,0 +1,72 @@ + + + 4.0.0 + + io.logz.zipkin + zipkin-logzio-parent + + ${zipkin-logzio-version} + + + zipkin-storage-logzio + + + ${project.basedir}/.. 
+ + + + + io.logz.sender + logzio-sender + 1.0.18 + + + com.googlecode.json-simple + json-simple + 1.1 + + + org.json + json + 20180813 + provided + + + junit + junit + 4.12 + test + + + + com.squareup.okio + okio + 2.1.0 + provided + + + + com.squareup.moshi + moshi + 1.8.0 + + + io.zipkin.zipkin2 + zipkin-storage-elasticsearch + 2.12.0 + + + org.mock-server + mockserver-netty + 5.5.1 + + + com.squareup.okhttp3 + mockwebserver + 3.12.1 + test + + + \ No newline at end of file diff --git a/storage-logzio/src/main/java/zipkin2/storage/logzio/BodyConverters.java b/storage-logzio/src/main/java/zipkin2/storage/logzio/BodyConverters.java new file mode 100644 index 0000000..e9bd032 --- /dev/null +++ b/storage-logzio/src/main/java/zipkin2/storage/logzio/BodyConverters.java @@ -0,0 +1,44 @@ +/* + * Copyright 2015-2018 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package zipkin2.storage.logzio; + +import com.squareup.moshi.JsonReader; +import okio.BufferedSource; +import zipkin2.Span; +import zipkin2.elasticsearch.internal.client.HttpCall; +import zipkin2.elasticsearch.internal.client.SearchResultConverter; + +import java.io.IOException; +import java.util.List; + +import static zipkin2.elasticsearch.internal.JsonReaders.collectValuesNamed; + +public final class BodyConverters { + static final HttpCall.BodyConverter NULL = + new HttpCall.BodyConverter() { + @Override + public Object convert(BufferedSource content) { + return null; + } + }; + static final HttpCall.BodyConverter> KEYS = + new HttpCall.BodyConverter>() { + @Override + public List convert(BufferedSource b) throws IOException { + return collectValuesNamed(JsonReader.of(b), "key"); + } + }; + public static final HttpCall.BodyConverter> SPANS = + SearchResultConverter.create(JsonAdapters.SPAN_ADAPTER); +} diff --git a/storage-logzio/src/main/java/zipkin2/storage/logzio/ConsumerParams.java b/storage-logzio/src/main/java/zipkin2/storage/logzio/ConsumerParams.java new file mode 100644 index 0000000..8d4dec4 --- /dev/null +++ b/storage-logzio/src/main/java/zipkin2/storage/logzio/ConsumerParams.java @@ -0,0 +1,113 @@ +package zipkin2.storage.logzio; + +import io.logz.sender.HttpsRequestConfiguration; +import io.logz.sender.LogzioSender; +import io.logz.sender.SenderStatusReporter; +import io.logz.sender.exceptions.LogzioParameterErrorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +public class ConsumerParams { + + private static final Logger logger = LoggerFactory.getLogger(ConsumerParams.class); + + private String url; + public static final String type = "zipkinSpan"; + private String accountToken; + private final int threadPoolSize = 3; + private final boolean compressRequests = true; + private ScheduledExecutorService 
senderExecutors; + + // Disk queue parameters + private File queueDir; + private final int fileSystemFullPercentThreshold = 98; + private final int diskSpaceCheckInterval = 1000; + private int senderDrainInterval; + private int cleanSentTracesInterval; + + public ConsumerParams() { + String queuePath = System.getProperty("user.dir"); + queuePath += queuePath.endsWith("/") ? "" : "/"; + queuePath += "logzio-storage"; + this.queueDir = new File(queuePath); + } + + public String getUrl() { + return url; + } + + public void setListenerUrl(String url) { + this.url = url; + } + + public String getAccountToken() { + return accountToken; + } + + public void setAccountToken(String accountToken) { + this.accountToken = accountToken; + } + + public ScheduledExecutorService getSenderExecutors() { + return senderExecutors; + } + + public LogzioSender getLogzioSender() { + HttpsRequestConfiguration requestConf; + try { + requestConf = HttpsRequestConfiguration + .builder() + .setLogzioListenerUrl(getUrl()) + .setLogzioType(this.type) + .setLogzioToken(getAccountToken()) + .setCompressRequests(this.compressRequests) + .build(); + } catch (LogzioParameterErrorException e) { + logger.error(LogzioStorage.ZIPKIN_LOGZIO_STORAGE_MSG + "problem in one or more parameters with error {}", e.getMessage()); + return null; + } + senderExecutors = Executors.newScheduledThreadPool(this.threadPoolSize); + SenderStatusReporter statusReporter = StatusReporterFactory.newSenderStatusReporter(LoggerFactory.getLogger(LogzioSender.class)); + LogzioSender.Builder senderBuilder = LogzioSender + .builder() + .setTasksExecutor(senderExecutors) + .setReporter(statusReporter) + .setHttpsRequestConfiguration(requestConf) + .setDebug(true) + .setDrainTimeoutSec(this.senderDrainInterval) + .withDiskQueue() + .setQueueDir(this.queueDir) + .setCheckDiskSpaceInterval(this.diskSpaceCheckInterval) + .setFsPercentThreshold(this.fileSystemFullPercentThreshold) + 
.setGcPersistedQueueFilesIntervalSeconds(this.cleanSentTracesInterval) + .endDiskQueue(); + try { + return senderBuilder.build(); + } catch (LogzioParameterErrorException e) { + logger.error(LogzioStorage.ZIPKIN_LOGZIO_STORAGE_MSG + "problem in one or more parameters with error {}", e.getMessage()); + } + return null; + } + + @Override + public String toString() { + return "SpanConsumerConfig{" + + "listener_url='" + url + '\'' + + " account_token=" + (accountToken.isEmpty() ? "" : "********" + accountToken.substring(accountToken.length() - 4)) + + " cleanSentTracesInterval=" + cleanSentTracesInterval + + " senderDrainInterval=" + senderDrainInterval + + '}'; + } + + public void setSenderDrainInterval(int senderDrainInterval) { + this.senderDrainInterval = senderDrainInterval; + } + + public void setCleanSentTracesInterval(int cleanSentTracesInterval) { + this.cleanSentTracesInterval = cleanSentTracesInterval; + } +} diff --git a/storage-logzio/src/main/java/zipkin2/storage/logzio/JsonAdapters.java b/storage-logzio/src/main/java/zipkin2/storage/logzio/JsonAdapters.java new file mode 100644 index 0000000..c524e6c --- /dev/null +++ b/storage-logzio/src/main/java/zipkin2/storage/logzio/JsonAdapters.java @@ -0,0 +1,218 @@ +/* + * Copyright 2015-2018 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package zipkin2.storage.logzio; + +import com.squareup.moshi.JsonAdapter; +import com.squareup.moshi.JsonReader; +import com.squareup.moshi.JsonWriter; +import zipkin2.Annotation; +import zipkin2.DependencyLink; +import zipkin2.Endpoint; +import zipkin2.Span; +import zipkin2.internal.Nullable; + +import java.io.IOException; + +/** + * Read-only json adapters resurrected from before we switched to Java 6 as storage components can + * be Java 7+ + */ +final class JsonAdapters { + static final JsonAdapter SPAN_ADAPTER = + new JsonAdapter() { + @Override + public Span fromJson(JsonReader reader) throws IOException { + Span.Builder result = Span.newBuilder(); + reader.beginObject(); + while (reader.hasNext()) { + String nextName = reader.nextName(); + if (reader.peek() == JsonReader.Token.NULL) { + reader.skipValue(); + continue; + } + switch (nextName) { + case "traceId": + result.traceId(reader.nextString()); + break; + case "parentId": + result.parentId(reader.nextString()); + break; + case "id": + result.id(reader.nextString()); + break; + case "kind": + result.kind(Span.Kind.valueOf(reader.nextString())); + break; + case "name": + result.name(reader.nextString()); + break; + case "timestamp_micro": + result.timestamp(reader.nextLong()); + break; + case "duration": + result.duration(reader.nextLong()); + break; + case "localEndpoint": + result.localEndpoint(ENDPOINT_ADAPTER.fromJson(reader)); + break; + case "remoteEndpoint": + result.remoteEndpoint(ENDPOINT_ADAPTER.fromJson(reader)); + break; + case "annotations": + reader.beginArray(); + while (reader.hasNext()) { + Annotation a = ANNOTATION_ADAPTER.fromJson(reader); + result.addAnnotation(a.timestamp(), a.value()); + } + reader.endArray(); + break; + case "spanTags": + reader.beginObject(); + while (reader.hasNext()) { + result.putTag(reader.nextName(), reader.nextString()); + } + reader.endObject(); + break; + case "debug": + result.debug(reader.nextBoolean()); + break; + case "shared": + 
result.shared(reader.nextBoolean()); + break; + default: + reader.skipValue(); + } + } + reader.endObject(); + return result.build(); + } + + @Override + public void toJson(JsonWriter writer, @Nullable Span value) { + throw new UnsupportedOperationException(); + } + }; + + static final JsonAdapter ANNOTATION_ADAPTER = + new JsonAdapter() { + @Override + public Annotation fromJson(JsonReader reader) throws IOException { + reader.beginObject(); + Long timestamp = null; + String value = null; + while (reader.hasNext()) { + switch (reader.nextName()) { + case "timestamp": + timestamp = reader.nextLong(); + break; + case "value": + value = reader.nextString(); + break; + default: + reader.skipValue(); + } + } + reader.endObject(); + if (timestamp == null || value == null) { + throw new IllegalStateException("Incomplete annotation at " + reader.getPath()); + } + return Annotation.create(timestamp, value); + } + + @Override + public void toJson(JsonWriter writer, @Nullable Annotation value) { + throw new UnsupportedOperationException(); + } + }; + + static final JsonAdapter ENDPOINT_ADAPTER = + new JsonAdapter() { + @Override + public Endpoint fromJson(JsonReader reader) throws IOException { + reader.beginObject(); + String serviceName = null, ipv4 = null, ipv6 = null; + int port = 0; + while (reader.hasNext()) { + String nextName = reader.nextName(); + if (reader.peek() == JsonReader.Token.NULL) { + reader.skipValue(); + continue; + } + switch (nextName) { + case "serviceName": + serviceName = reader.nextString(); + break; + case "ipv4": + ipv4 = reader.nextString(); + break; + case "ipv6": + ipv6 = reader.nextString(); + break; + case "port": + port = reader.nextInt(); + break; + default: + reader.skipValue(); + } + } + reader.endObject(); + if (serviceName == null && ipv4 == null && ipv6 == null && port == 0) return null; + return Endpoint.newBuilder() + .serviceName(serviceName) + .ip(ipv4) + .ip(ipv6) + .port(port) + .build(); + } + + @Override + public void 
toJson(JsonWriter writer, @Nullable Endpoint value) { + throw new UnsupportedOperationException(); + } + }.nullSafe(); + + static final JsonAdapter DEPENDENCY_LINK_ADAPTER = + new JsonAdapter() { + @Override + public DependencyLink fromJson(JsonReader reader) throws IOException { + DependencyLink.Builder result = DependencyLink.newBuilder(); + reader.beginObject(); + while (reader.hasNext()) { + switch (reader.nextName()) { + case "parent": + result.parent(reader.nextString()); + break; + case "child": + result.child(reader.nextString()); + break; + case "callCount": + result.callCount(reader.nextLong()); + break; + case "errorCount": + result.errorCount(reader.nextLong()); + break; + default: + reader.skipValue(); + } + } + reader.endObject(); + return result.build(); + } + + @Override + public void toJson(JsonWriter writer, @Nullable DependencyLink value) { + throw new UnsupportedOperationException(); + } + }; +} diff --git a/storage-logzio/src/main/java/zipkin2/storage/logzio/LogzioSpanConsumer.java b/storage-logzio/src/main/java/zipkin2/storage/logzio/LogzioSpanConsumer.java new file mode 100644 index 0000000..3c3c042 --- /dev/null +++ b/storage-logzio/src/main/java/zipkin2/storage/logzio/LogzioSpanConsumer.java @@ -0,0 +1,176 @@ +package zipkin2.storage.logzio; + +import com.squareup.moshi.JsonWriter; +import io.logz.sender.LogzioSender; +import okio.Buffer; +import okio.ByteString; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import zipkin2.Annotation; +import zipkin2.Call; +import zipkin2.Callback; +import zipkin2.Span; +import zipkin2.codec.SpanBytesEncoder; +import zipkin2.storage.SpanConsumer; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +public class LogzioSpanConsumer implements SpanConsumer { + + private static final int INDEX_CHARS_LIMIT 
= 256; + private final ByteString EMPTY_JSON = ByteString.of(new byte[]{'{', '}'}); + private volatile LogzioSender logzioSender; + private volatile ExecutorService senderExecutors; + private volatile boolean closeCalled; + private static final Logger logger = LoggerFactory.getLogger(LogzioStorage.class); + + public LogzioSpanConsumer(ConsumerParams params) { + if (logzioSender == null && !params.getAccountToken().isEmpty()) { + synchronized (this) { + if (logzioSender == null) { + logzioSender = params.getLogzioSender(); + logzioSender.start(); + senderExecutors = params.getSenderExecutors(); + } + } + } + } + + public Call accept(List spans) { + if (closeCalled) throw new IllegalStateException("storage is closed"); + if (spans.isEmpty()) { + return Call.create(null); + } + byte[] message = new byte[0]; + try { + message = spansToJsonBytes(spans); + } catch (IOException e) { + logger.error(LogzioStorage.ZIPKIN_LOGZIO_STORAGE_MSG + "failed converting spans to byte array: {}", e.getMessage()); + } + return new LogzioCall(message); + } + + private LogzioSender getSender() { + return logzioSender; + } + + public void close() { + if (closeCalled) { + return; + } + logzioSender.stop(); + if (senderExecutors != null) { + logger.info(LogzioStorage.ZIPKIN_LOGZIO_STORAGE_MSG + "Submitting shutdown request"); + senderExecutors.shutdown(); + boolean shutdownResult = false; + try { + shutdownResult = senderExecutors.awaitTermination(20, TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger.error(LogzioStorage.ZIPKIN_LOGZIO_STORAGE_MSG + "Shutdown was interrupted"); + } + if (!shutdownResult) { + senderExecutors.shutdownNow(); + } + } + this.closeCalled = true; + } + + private byte[] spansToJsonBytes(List spans) throws IOException { + try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) { + + for (Span span : spans) { + long spanTimestamp = span.timestampAsLong(); + if (spanTimestamp != 0L) { + spanTimestamp = TimeUnit.MICROSECONDS.toMillis(spanTimestamp); + 
} + byte[] spanBytes = getSpanBytes(span, spanTimestamp); + bos.write(spanBytes); + bos.write("\n".getBytes(StandardCharsets.UTF_8)); + } + return bos.toByteArray(); + } + } + + private byte[] getSpanBytes(Span span, long timestampMillis) { + return prefixWithTimestampMillisAndQuery(span, timestampMillis); + } + + private byte[] prefixWithTimestampMillisAndQuery(Span span, long timestampMillis) { + Buffer prefix = new Buffer(); + JsonWriter writer = JsonWriter.of(prefix); + try { + writer.beginObject(); + + if (timestampMillis != 0L) writer.name(LogzioStorage.JSON_TIMESTAMP_MILLIS_FIELD).value(timestampMillis); + if (!span.tags().isEmpty() || !span.annotations().isEmpty()) { + writer.name(LogzioStorage.JSON_ANNOTATION); + writer.beginArray(); + for (Annotation a : span.annotations()) { + if (a.value().length() > INDEX_CHARS_LIMIT) continue; + writer.value(a.value()); + } + for (Map.Entry tag : span.tags().entrySet()) { + int length = tag.getKey().length() + tag.getValue().length() + 1; + if (length > INDEX_CHARS_LIMIT) continue; + writer.value(tag.getKey()); // search is possible by key alone + writer.value(tag.getKey() + "=" + tag.getValue()); + } + writer.endArray(); + } + writer.endObject(); + } catch (IOException e) { + // very unexpected to have an IOE for an in-memory write + logger.error(LogzioStorage.ZIPKIN_LOGZIO_STORAGE_MSG + "failed to add timestamp or tags to span: " + span + ":" + e.getMessage()); + + return SpanBytesEncoder.JSON_V2.encode(span); + } + byte[] document = SpanBytesEncoder.JSON_V2.encode(span); + if (prefix.rangeEquals(0L, EMPTY_JSON)) return document; + return mergeJson(prefix.readByteArray(), document); + } + + private byte[] mergeJson(byte[] prefix, byte[] suffix) { + byte[] newSpanBytes = new byte[prefix.length + suffix.length - 1]; + int pos = 0; + System.arraycopy(prefix, 0, newSpanBytes, pos, prefix.length); + pos += prefix.length; + newSpanBytes[pos - 1] = ','; + // starting at position 1 discards the old head of '{' + 
System.arraycopy(suffix, 1, newSpanBytes, pos, suffix.length - 1); + return newSpanBytes; + } + + public class LogzioCall extends Call.Base { + byte[] message; + + public LogzioCall(byte[] message) { + this.message = message; + } + + @Override + protected Void doExecute() { + getSender().send(message); + getSender().drainQueueAndSend(); + return null; + } + + @Override + protected void doEnqueue(Callback callback) { + getSender().send(message); + callback.onSuccess(null); + } + + @Override + public Call clone() { + return new LogzioCall(message.clone()); + } + } + +} + diff --git a/storage-logzio/src/main/java/zipkin2/storage/logzio/LogzioSpanStore.java b/storage-logzio/src/main/java/zipkin2/storage/logzio/LogzioSpanStore.java new file mode 100644 index 0000000..53b361b --- /dev/null +++ b/storage-logzio/src/main/java/zipkin2/storage/logzio/LogzioSpanStore.java @@ -0,0 +1,173 @@ +package zipkin2.storage.logzio; + +import ch.qos.logback.classic.Logger; +import org.slf4j.LoggerFactory; +import zipkin2.Call; +import zipkin2.DependencyLink; +import zipkin2.Span; +import zipkin2.elasticsearch.internal.client.HttpCall; +import zipkin2.storage.GroupByTraceId; +import zipkin2.storage.QueryRequest; +import zipkin2.storage.SpanStore; +import zipkin2.storage.StrictTraceId; +import zipkin2.storage.logzio.client.Aggregation; +import zipkin2.storage.logzio.client.SearchCallFactory; +import zipkin2.storage.logzio.client.SearchRequest; + +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class LogzioSpanStore implements SpanStore { + private static final int MAX_AGGREGATION_SIZE = 1000; + private final long namesLookback; + private final boolean strictTraceId; + private final SearchCallFactory search; + private final Call.Mapper, List>> groupByTraceId; + private static final Logger logger = (Logger) LoggerFactory.getLogger(LogzioSpanStore.class.getName()); + + + public LogzioSpanStore(LogzioStorage storage, 
String apiToken) { + this.search = new SearchCallFactory(storage.http(), apiToken); + this.strictTraceId = storage.isStrictTraceId(); + this.groupByTraceId = GroupByTraceId.create(strictTraceId); + this.namesLookback = 1000*60*60*48L; //48 hours + } + + @Override + public Call>> getTraces(QueryRequest request) { + long endMillis = request.endTs(); + long beginMillis = endMillis - request.lookback(); + SearchRequest.Filters filters = new SearchRequest.Filters(); + filters.addRange(LogzioStorage.JSON_TIMESTAMP_FIELD, beginMillis, endMillis); + if (request.serviceName() != null) { + filters.addTerm(LogzioStorage.JSON_SERVICE_NAME_FIELD, request.serviceName()); + } + + if (request.spanName() != null) { + filters.addTerm(LogzioStorage.JSON_NAME_FIELD, request.spanName()); + } + + for (Map.Entry kv : request.annotationQuery().entrySet()) { + if (kv.getValue().isEmpty()) { + filters.addTerm(LogzioStorage.JSON_ANNOTATION, kv.getKey()); + } else { + filters.addTerm(LogzioStorage.JSON_ANNOTATION, kv.getKey() + "=" + kv.getValue()); + } + } + + if (request.minDuration() != null) { + filters.addRange(LogzioStorage.JSON_DURATION_FIELD, request.minDuration(), request.maxDuration()); + } + + // We need to filter to traces that contain at least one span that matches the request, + // but the zipkin API is supposed to order traces by first span, regardless of if it was + // filtered or not. This is not possible without either multiple, heavyweight queries + // or complex multiple indexing, defeating much of the elegance of using elasticsearch for this. + // So we fudge and order on the first span among the filtered spans - in practice, there should + // be no significant difference in user experience since span start times are usually very + // close to each other in human time. 
+ if (request.limit() > 1000) { + throw new IllegalArgumentException(LogzioStorage.ZIPKIN_LOGZIO_STORAGE_MSG + "max search size is 1000"); + } + Aggregation traceIdTimestamp = + Aggregation.terms(LogzioStorage.JSON_TRACE_ID_FIELD, request.limit()) + .addSubAggregation(Aggregation.min(LogzioStorage.JSON_TIMESTAMP_FIELD)) + .orderBy(LogzioStorage.JSON_TIMESTAMP_FIELD, "desc"); + + SearchRequest esRequest = + SearchRequest.create().filters(filters).addAggregation(traceIdTimestamp); + HttpCall> traceIdsCall = search.newCall(esRequest, BodyConverters.KEYS); + + Call>> result = + traceIdsCall.flatMap(new GetSpansByTraceId(search, beginMillis, endMillis)).map(groupByTraceId); + // Elasticsearch lookup by trace ID is by the full 128-bit length, but there's still a chance of + // clash on lower-64 bit. When strict trace ID is enabled, we only filter client-side on clash. + return strictTraceId ? result.map(StrictTraceId.filterTraces(request)) : result; + } + + @Override + public Call> getTrace(String traceId) { + traceId = Span.normalizeTraceId(traceId); + if (!strictTraceId && traceId.length() == 32) traceId = traceId.substring(16); + SearchRequest.Filters filter = new SearchRequest.Filters().addTerm(LogzioStorage.JSON_TRACE_ID_FIELD, traceId); + SearchRequest request = SearchRequest.create().filters(filter); + return search.newCall(request, BodyConverters.SPANS); + } + + @Override + public Call> getServiceNames() { + long endMillis = System.currentTimeMillis(); + long beginMillis = endMillis - namesLookback; + + // Service name queries include both local and remote endpoints. This is different than + // Span name, as a span name can only be on a local endpoint. 
+ SearchRequest.Filters filters = new SearchRequest.Filters(); + filters.addRange(LogzioStorage.JSON_TIMESTAMP_FIELD, beginMillis, endMillis); + SearchRequest request = + SearchRequest.create() + .filters(filters) + .addAggregation(Aggregation.terms(LogzioStorage.JSON_SERVICE_NAME_FIELD, MAX_AGGREGATION_SIZE)) + .addAggregation(Aggregation.terms(LogzioStorage.JSON_REMOTE_SERVICE_NAME_FIELD, MAX_AGGREGATION_SIZE)); + return search.newCall(request, BodyConverters.KEYS); + } + + @Override + public Call> getSpanNames(String serviceName) { + if (serviceName.isEmpty()) return Call.emptyList(); + + long endMillis = System.currentTimeMillis(); + long beginMillis = endMillis - namesLookback; + + // A span name is only valid on a local endpoint, as a span name is defined locally + SearchRequest.Filters filters = + new SearchRequest.Filters() + .addRange(LogzioStorage.JSON_TIMESTAMP_FIELD, beginMillis, endMillis) + .addTerm(LogzioStorage.JSON_SERVICE_NAME_FIELD, serviceName.toLowerCase(Locale.ROOT)); + + SearchRequest request = + SearchRequest.create() + .filters(filters) + .addAggregation(Aggregation.terms(LogzioStorage.JSON_NAME_FIELD, MAX_AGGREGATION_SIZE)); + + return search.newCall(request, BodyConverters.KEYS); + } + + @Override + public Call> getDependencies(long endTs, long lookback) { + logger.error(LogzioStorage.ZIPKIN_LOGZIO_STORAGE_MSG + "Zipkin-logz.io doesn't support dependencies analysis"); + return Call.emptyList(); + } + + static final class GetSpansByTraceId implements Call.FlatMapper, List> { + final SearchCallFactory search; + private long beginMillis; + private long endMillis; + + public GetSpansByTraceId(SearchCallFactory search, long beginMillis, long endMillis) { + this.search = search; + this.beginMillis = beginMillis; + this.endMillis = endMillis; + long complement = TimeUnit.DAYS.toMillis(1) - (endMillis - beginMillis); + if (complement > 0) { //search trace ID's for longer period (24 hours) to include spans that are out of the time range + 
this.endMillis = Math.min(System.currentTimeMillis(), this.endMillis + Math.round(complement / 2)); + this.beginMillis -= TimeUnit.DAYS.toMillis(1) - (this.endMillis - this.beginMillis); + } + } + + @Override + public Call> map(List inputs) { + if (inputs.isEmpty()) return Call.emptyList(); + SearchRequest.Filters filters = new SearchRequest.Filters(); + + if (beginMillis != 0) { + filters = new SearchRequest.Filters().addRange(LogzioStorage.JSON_TIMESTAMP_FIELD, beginMillis, endMillis); + } + filters.addTerms(LogzioStorage.JSON_TRACE_ID_FIELD, inputs); + SearchRequest getTraces = SearchRequest.create().filters(filters); + return search.newCall(getTraces, BodyConverters.SPANS); + } + + } +} diff --git a/storage-logzio/src/main/java/zipkin2/storage/logzio/LogzioStorage.java b/storage-logzio/src/main/java/zipkin2/storage/logzio/LogzioStorage.java new file mode 100644 index 0000000..208dc16 --- /dev/null +++ b/storage-logzio/src/main/java/zipkin2/storage/logzio/LogzioStorage.java @@ -0,0 +1,134 @@ +package zipkin2.storage.logzio; + +import com.google.auto.value.extension.memoized.Memoized; +import okhttp3.HttpUrl; +import okhttp3.OkHttpClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import zipkin2.elasticsearch.internal.client.HttpCall; +import zipkin2.storage.SpanConsumer; +import zipkin2.storage.SpanStore; +import zipkin2.storage.StorageComponent; + +public final class LogzioStorage extends StorageComponent { + + + private static final int MAX_HTTP_REQUESTS = 64; + public static final String JSON_TIMESTAMP_MILLIS_FIELD = "timestamp_millis"; + public static final String JSON_ANNOTATION = "_q"; + public static final String JSON_TIMESTAMP_FIELD = "@timestamp"; + public static final String JSON_SERVICE_NAME_FIELD = "localEndpoint.serviceName"; + public static final String JSON_NAME_FIELD = "name"; + public static final String JSON_DURATION_FIELD = "duration"; + public static final String JSON_TRACE_ID_FIELD = "traceId"; + public static final String 
JSON_REMOTE_SERVICE_NAME_FIELD = "remoteEndpoint.serviceName"; + public static final String ZIPKIN_LOGZIO_STORAGE_MSG = "[zipkin-logzio-storage] "; + private volatile boolean closeCalled; + private static final Logger logger = LoggerFactory.getLogger(LogzioStorage.class); + private final LogzioSpanConsumer spanConsumer; + private final LogzioSpanStore spanStore; + private boolean strictTraceId; + private String logzioApiHost; + + public static Builder newBuilder() { + return new Builder(); + } + + private LogzioStorage(LogzioStorageParams config) { + this.strictTraceId = config.isStrictTraceId(); + + if (!config.getConsumerParams().getAccountToken().isEmpty()) { + this.spanConsumer = new LogzioSpanConsumer(config.getConsumerParams()); + } else { + logger.warn(LogzioStorage.ZIPKIN_LOGZIO_STORAGE_MSG + "logz.io account token was not supplied, couldn't generate span consumer (traces will not be stored)"); + this.spanConsumer = null; + } + if (!config.getApiToken().isEmpty()) { + if (config.getSearchApiUrl() != null) { + logzioApiHost = config.getSearchApiUrl(); + } + this.spanStore = new LogzioSpanStore(this, config.getApiToken()); + } else { + logger.warn(LogzioStorage.ZIPKIN_LOGZIO_STORAGE_MSG + "logz.io API token was not supplied, couldn't generate span store (traces will be stored but not shown)"); + this.spanStore = null; + } + } + + public SpanStore spanStore() { + if (this.spanStore == null) { + throw new IllegalArgumentException("logz.io API token was not supplied, couldn't generate span store"); + } + return this.spanStore; + } + + public SpanConsumer spanConsumer() { + return this.spanConsumer; + } + + public boolean isStrictTraceId() { + return strictTraceId; + } + + // hosts resolution might imply a network call, and we might make a new okhttp instance + @Memoized + public + HttpCall.Factory http() { + OkHttpClient ok = client(); + ok.dispatcher().setMaxRequests(MAX_HTTP_REQUESTS); + ok.dispatcher().setMaxRequestsPerHost(MAX_HTTP_REQUESTS); + return new 
HttpCall.Factory(ok, HttpUrl.parse(logzioApiHost)); + } + + private OkHttpClient client() { + return new OkHttpClient(); + } + + @Override + public synchronized void close() { + if (closeCalled) return; + if (spanConsumer != null) { + spanConsumer.close(); + } + if (spanStore != null) { + http().close(); + } + closeCalled = true; + + } + + public static final class Builder extends StorageComponent.Builder { + private LogzioStorageParams storageParams; + + @Override + public Builder strictTraceId(boolean strictTraceId) { + this.storageParams.setStrictTraceId(strictTraceId); + return this; + } + + /** + * Ignored since is mandatory + * + * @param searchEnabled + */ + @Override + public Builder searchEnabled(boolean searchEnabled) { + return this; + } + + public Builder config(LogzioStorageParams storageParams) { + this.storageParams = storageParams; + return this; + } + + @Override + public LogzioStorage build() { + if (this.storageParams.getConsumerParams().getAccountToken().isEmpty() && this.storageParams.getApiToken().isEmpty()) { + throw new IllegalArgumentException("At least one of logz.io account token or api-token has to be valid"); + } + return new LogzioStorage(this.storageParams); + } + + Builder() { + } + } +} diff --git a/storage-logzio/src/main/java/zipkin2/storage/logzio/LogzioStorageParams.java b/storage-logzio/src/main/java/zipkin2/storage/logzio/LogzioStorageParams.java new file mode 100644 index 0000000..99311e2 --- /dev/null +++ b/storage-logzio/src/main/java/zipkin2/storage/logzio/LogzioStorageParams.java @@ -0,0 +1,56 @@ +package zipkin2.storage.logzio; + +public class LogzioStorageParams { + + private String apiToken; + + private ConsumerParams consumerParams; + private boolean strictTraceId = true; + private String searchApiUrl; + + public LogzioStorageParams() { + consumerParams = new ConsumerParams(); + } + + public ConsumerParams getConsumerParams() { + return consumerParams; + } + + @Override + public String toString() { + return 
"LogzioStorageConfig{" + + consumerParams.toString() + + " ApiToken=" + (apiToken.isEmpty() ? "" : "********" + apiToken.substring(apiToken.length()-4)) + "," + + " StrictTracId=" + strictTraceId + "," + + " ApiURL=" + searchApiUrl + + "}"; + } + + public String getApiToken() { + return apiToken; + } + + public void setApiToken(String apiToken) { + this.apiToken = apiToken; + } + + public void setConsumerParams(ConsumerParams consumerParams) { + this.consumerParams = consumerParams; + } + + public void setStrictTraceId(boolean strictTraceId) { + this.strictTraceId = strictTraceId; + } + + public boolean isStrictTraceId() { + return strictTraceId; + } + + public void setSearchApiUrl(String url) { + this.searchApiUrl = url; + } + + public String getSearchApiUrl() { + return searchApiUrl; + } +} diff --git a/storage-logzio/src/main/java/zipkin2/storage/logzio/PseudoAddressRecordSet.java b/storage-logzio/src/main/java/zipkin2/storage/logzio/PseudoAddressRecordSet.java new file mode 100644 index 0000000..b0862a5 --- /dev/null +++ b/storage-logzio/src/main/java/zipkin2/storage/logzio/PseudoAddressRecordSet.java @@ -0,0 +1,285 @@ +/* + * Copyright 2015-2018 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package zipkin2.storage.logzio; + +import okhttp3.Dns; +import okhttp3.HttpUrl; +import zipkin2.internal.Nullable; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; + +/** + * This returns a Dns provider that combines the IPv4 or IPv6 addresses from a supplied list of + * urls, provided they are all http and share the same port. + */ +final class PseudoAddressRecordSet { + + static Dns create(List urls, Dns actualDns) { + Set schemes = new LinkedHashSet<>(); + Set hosts = new LinkedHashSet<>(); + Set ipAddresses = new LinkedHashSet<>(); + Set ports = new LinkedHashSet<>(); + + for (String url : urls) { + HttpUrl httpUrl = HttpUrl.parse(url); + schemes.add(httpUrl.scheme()); + + // Kick out if we can't cheaply read the address + byte[] addressBytes = null; + try { + addressBytes = ipStringToBytes(httpUrl.host()); + } catch (RuntimeException e) { + } + + if (addressBytes != null) { + try { + ipAddresses.add(InetAddress.getByAddress(addressBytes)); + } catch (UnknownHostException e) { + hosts.add(httpUrl.host()); + } + } else { + hosts.add(httpUrl.host()); + } + ports.add(httpUrl.port()); + } + + if (ports.size() != 1) { + throw new IllegalArgumentException("Only one port supported with multiple hosts " + urls); + } + if (schemes.size() != 1 || !schemes.iterator().next().equals("http")) { + throw new IllegalArgumentException("Only http supported with multiple hosts " + urls); + } + + if (hosts.isEmpty()) return new StaticDns(ipAddresses); + return new ConcatenatingDns(ipAddresses, hosts, actualDns); + } + + static final class StaticDns implements Dns { + private final List ipAddresses; + + StaticDns(Set ipAddresses) { + this.ipAddresses = new ArrayList<>(ipAddresses); + } + + @Override + public List lookup(String hostname) { + return ipAddresses; + } + + @Override + public String toString() { + return 
"StaticDns(" + ipAddresses + ")"; + } + } + + static final class ConcatenatingDns implements Dns { + final Set ipAddresses; + final Set hosts; + final Dns actualDns; + + ConcatenatingDns(Set ipAddresses, Set hosts, Dns actualDns) { + this.ipAddresses = ipAddresses; + this.hosts = hosts; + this.actualDns = actualDns; + } + + @Override + public List lookup(String hostname) throws UnknownHostException { + List result = new ArrayList<>(ipAddresses.size() + hosts.size()); + result.addAll(ipAddresses); + for (String host : hosts) { + result.addAll(actualDns.lookup(host)); + } + return result; + } + + @Override + public String toString() { + return "ConcatenatingDns(" + ipAddresses + "," + hosts + ")"; + } + } + + // ** Start code from Guava v20 **// + private static final int IPV4_PART_COUNT = 4; + private static final int IPV6_PART_COUNT = 8; + + /** + * Returns the {@link InetAddress#getAddress()} having the given string representation or null if + * unable to parse. + * + *

This deliberately avoids all nameservice lookups (e.g. no DNS). + * + *

This is the same as com.google.common.net.InetAddresses.ipStringToBytes(), except internally + * Splitter isn't used (as that would introduce more dependencies). + * + * @param ipString {@code String} containing an IPv4 or IPv6 string literal, e.g. {@code + * "192.168.0.1"} or {@code "2001:db8::1"} + */ + @Nullable + static byte[] ipStringToBytes(String ipString) { + // PATCHED! adding null/empty escape + if (ipString == null || ipString.isEmpty()) return null; + // Make a first pass to categorize the characters in this string. + boolean hasColon = false; + boolean hasDot = false; + for (int i = 0; i < ipString.length(); i++) { + char c = ipString.charAt(i); + if (c == '.') { + hasDot = true; + } else if (c == ':') { + if (hasDot) { + return null; // Colons must not appear after dots. + } + hasColon = true; + } else if (Character.digit(c, 16) == -1) { + return null; // Everything else must be a decimal or hex digit. + } + } + + // Now decide which address family to parse. + if (hasColon) { + if (hasDot) { + ipString = convertDottedQuadToHex(ipString); + if (ipString == null) { + return null; + } + } + return textToNumericFormatV6(ipString); + } else if (hasDot) { + return textToNumericFormatV4(ipString); + } + return null; + } + + @Nullable + private static byte[] textToNumericFormatV4(String ipString) { + byte[] bytes = new byte[IPV4_PART_COUNT]; + int i = 0; + try { + // PATCHED! for (String octet : IPV4_SPLITTER.split(ipString)) { + for (String octet : ipString.split("\\.", 5)) { + bytes[i++] = parseOctet(octet); + } + } catch (NumberFormatException ex) { + return null; + } + + return i == IPV4_PART_COUNT ? bytes : null; + } + + @Nullable + private static byte[] textToNumericFormatV6(String ipString) { + // An address can have [2..8] colons, and N colons make N+1 parts. 
+ String[] parts = ipString.split(":", IPV6_PART_COUNT + 2); + if (parts.length < 3 || parts.length > IPV6_PART_COUNT + 1) { + return null; + } + + // Disregarding the endpoints, find "::" with nothing in between. + // This indicates that a run of zeroes has been skipped. + int skipIndex = -1; + for (int i = 1; i < parts.length - 1; i++) { + if (parts[i].length() == 0) { + if (skipIndex >= 0) { + return null; // Can't have more than one :: + } + skipIndex = i; + } + } + + int partsHi; // Number of parts to copy from above/before the "::" + int partsLo; // Number of parts to copy from below/after the "::" + if (skipIndex >= 0) { + // If we found a "::", then check if it also covers the endpoints. + partsHi = skipIndex; + partsLo = parts.length - skipIndex - 1; + if (parts[0].length() == 0 && --partsHi != 0) { + return null; // ^: requires ^:: + } + if (parts[parts.length - 1].length() == 0 && --partsLo != 0) { + return null; // :$ requires ::$ + } + } else { + // Otherwise, allocate the entire address to partsHi. The endpoints + // could still be empty, but parseHextet() will check for that. + partsHi = parts.length; + partsLo = 0; + } + + // If we found a ::, then we must have skipped at least one part. + // Otherwise, we must have exactly the right number of parts. + int partsSkipped = IPV6_PART_COUNT - (partsHi + partsLo); + if (!(skipIndex >= 0 ? partsSkipped >= 1 : partsSkipped == 0)) { + return null; + } + + // Now parse the hextets into a byte array. 
+ ByteBuffer rawBytes = ByteBuffer.allocate(2 * IPV6_PART_COUNT); + try { + for (int i = 0; i < partsHi; i++) { + rawBytes.putShort(parseHextet(parts[i])); + } + for (int i = 0; i < partsSkipped; i++) { + rawBytes.putShort((short) 0); + } + for (int i = partsLo; i > 0; i--) { + rawBytes.putShort(parseHextet(parts[parts.length - i])); + } + } catch (NumberFormatException ex) { + return null; + } + return rawBytes.array(); + } + + @Nullable + private static String convertDottedQuadToHex(String ipString) { + int lastColon = ipString.lastIndexOf(':'); + String initialPart = ipString.substring(0, lastColon + 1); + String dottedQuad = ipString.substring(lastColon + 1); + byte[] quad = textToNumericFormatV4(dottedQuad); + if (quad == null) { + return null; + } + String penultimate = Integer.toHexString(((quad[0] & 0xff) << 8) | (quad[1] & 0xff)); + String ultimate = Integer.toHexString(((quad[2] & 0xff) << 8) | (quad[3] & 0xff)); + return initialPart + penultimate + ":" + ultimate; + } + + private static byte parseOctet(String ipPart) { + // Note: we already verified that this string contains only hex digits. + int octet = Integer.parseInt(ipPart); + // Disallow leading zeroes, because no clear standard exists on + // whether these should be interpreted as decimal or octal. + if (octet > 255 || (ipPart.startsWith("0") && ipPart.length() > 1)) { + throw new NumberFormatException(); + } + return (byte) octet; + } + + private static short parseHextet(String ipPart) { + // Note: we already verified that this string contains only hex digits. 
+ int hextet = Integer.parseInt(ipPart, 16); + if (hextet > 0xffff) { + throw new NumberFormatException(); + } + return (short) hextet; + } + // ** End code from Guava v20 **// +} diff --git a/storage-logzio/src/main/java/zipkin2/storage/logzio/StatusReporterFactory.java b/storage-logzio/src/main/java/zipkin2/storage/logzio/StatusReporterFactory.java new file mode 100644 index 0000000..2d59899 --- /dev/null +++ b/storage-logzio/src/main/java/zipkin2/storage/logzio/StatusReporterFactory.java @@ -0,0 +1,38 @@ +package zipkin2.storage.logzio; + +import io.logz.sender.SenderStatusReporter; +import org.slf4j.Logger; + + +public class StatusReporterFactory { + public static SenderStatusReporter newSenderStatusReporter(final Logger logger) { + return new SenderStatusReporter() { + + public void error(String s) { + logger.error(LogzioStorage.ZIPKIN_LOGZIO_STORAGE_MSG + s); + } + + public void error(String s, Throwable throwable) { + logger.error(LogzioStorage.ZIPKIN_LOGZIO_STORAGE_MSG + s + " " + throwable.getMessage()); + } + + public void warning(String s) { + logger.warn(LogzioStorage.ZIPKIN_LOGZIO_STORAGE_MSG + s); + } + + public void warning(String s, Throwable throwable) { + logger.warn(LogzioStorage.ZIPKIN_LOGZIO_STORAGE_MSG + s + " " + throwable.getMessage()); + } + + @Override + public void info(String s) { + logger.debug(LogzioStorage.ZIPKIN_LOGZIO_STORAGE_MSG + s); + } + + @Override + public void info(String s, Throwable throwable) { + logger.debug(LogzioStorage.ZIPKIN_LOGZIO_STORAGE_MSG + s + " " + throwable.getMessage()); + } + }; + } +} diff --git a/storage-logzio/src/main/java/zipkin2/storage/logzio/client/Aggregation.java b/storage-logzio/src/main/java/zipkin2/storage/logzio/client/Aggregation.java new file mode 100644 index 0000000..a146238 --- /dev/null +++ b/storage-logzio/src/main/java/zipkin2/storage/logzio/client/Aggregation.java @@ -0,0 +1,56 @@ + +package zipkin2.storage.logzio.client; + +import java.util.Collections; +import 
/**
 * Models an Elasticsearch aggregation clause. Non-transient field names are serialized
 * by Moshi and are therefore part of the wire format.
 */
public class Aggregation {
  // the owning field name keys the aggregation in the parent request; not serialized itself
  transient final String field;
  AggTerms terms;
  Map<String, String> min;
  Map<String, Aggregation> aggs;

  Aggregation(String field) {
    this.field = field;
  }

  /** A "terms" aggregation over {@code field}, returning at most {@code size} buckets. */
  public static Aggregation terms(String field, int size) {
    Aggregation aggregation = new Aggregation(field);
    aggregation.terms = new AggTerms(field, size);
    return aggregation;
  }

  /** A "min" metric aggregation over {@code field}. */
  public static Aggregation min(String field) {
    Aggregation aggregation = new Aggregation(field);
    aggregation.min = Collections.singletonMap("field", field);
    return aggregation;
  }

  /** Orders the enclosing terms aggregation by the named sub-aggregation. */
  public Aggregation orderBy(String subAgg, String direction) {
    terms.order(subAgg, direction);
    return this;
  }

  /** Registers {@code agg} as a nested aggregation keyed by its field name. */
  public Aggregation addSubAggregation(Aggregation agg) {
    if (aggs == null) {
      aggs = new LinkedHashMap<>();
    }
    aggs.put(agg.field, agg);
    return this;
  }

  static class AggTerms {
    final String field;
    int size;
    Map<String, String> order;

    AggTerms(String field, int size) {
      this.field = field;
      this.size = size;
    }

    AggTerms order(String agg, String direction) {
      order = Collections.singletonMap(agg, direction);
      return this;
    }
  }
}
See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.storage.logzio.client; + +import com.squareup.moshi.JsonAdapter; +import com.squareup.moshi.Moshi; +import okhttp3.HttpUrl; +import okhttp3.MediaType; +import okhttp3.Request; +import okhttp3.RequestBody; +import zipkin2.elasticsearch.internal.client.HttpCall; +import zipkin2.internal.Nullable; + +public class SearchCallFactory { + private static final MediaType APPLICATION_JSON = MediaType.parse("application/json"); + public static final String API_TOKEN_HEADER = "X-API-TOKEN"; + + private final HttpCall.Factory http; + private final String apiToken; + private final JsonAdapter searchRequest = + new Moshi.Builder().build().adapter(SearchRequest.class); + + public SearchCallFactory(HttpCall.Factory http, String apiToken) { + this.http = http; + this.apiToken = apiToken; + } + + public HttpCall newCall(SearchRequest request, HttpCall.BodyConverter bodyConverter) { + Request httpRequest = new Request.Builder().url(lenientSearch(request.type)) + .post(RequestBody.create(APPLICATION_JSON, searchRequest.toJson(request))) + .header("Content-Type", "application/json") + .header(API_TOKEN_HEADER, apiToken) + .tag(request.tag()).build(); + + return http.newCall(httpRequest, bodyConverter); + } + + private HttpUrl lenientSearch(@Nullable String type) { + HttpUrl.Builder builder = http.baseUrl.newBuilder(); + if (type != null) builder.addPathSegment(type); + return builder.build(); + } + +} diff --git a/storage-logzio/src/main/java/zipkin2/storage/logzio/client/SearchRequest.java b/storage-logzio/src/main/java/zipkin2/storage/logzio/client/SearchRequest.java new file mode 100644 index 0000000..30e274f --- /dev/null +++ b/storage-logzio/src/main/java/zipkin2/storage/logzio/client/SearchRequest.java @@ -0,0 +1,129 @@ +package zipkin2.storage.logzio.client; + +import zipkin2.internal.Nullable; +import zipkin2.storage.logzio.ConsumerParams; + +import 
/**
 * Builder-style model of a Logz.io (Elasticsearch) search request. Non-transient field
 * names are serialized by Moshi and are therefore part of the wire format — do not rename.
 */
public final class SearchRequest {

  /** Creates an untyped search request (no extra path segment on the search URL). */
  public static SearchRequest create() {
    return new SearchRequest(null);
  }

  /** Creates a search request against the given type (appended to the search URL path). */
  public static SearchRequest create(String type) {
    return new SearchRequest(type);
  }

  /**
   * The maximum results returned in a query. This only affects non-aggregation requests.
   *
   * <p>Not configurable as it implies adjustments to the index template (index.max_result_window)
   *
   * <p>See https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-from-size.html
   */
  private static final int MAX_RESULT_WINDOW = 10000; // the default logz.io allowed limit

  @Nullable
  transient final String type; // transient: routed into the URL, not the JSON body

  private Integer size = MAX_RESULT_WINDOW; // nulled when aggregating (see addAggregation)
  private Boolean _source;
  private Object query;
  private Map<String, Aggregation> aggs;

  SearchRequest(@Nullable String type) {

    this.type = type;
  }

  /** Mutable list of filter clauses; note {@link SearchRequest#filters} also appends to it. */
  public static class Filters extends ArrayList<Object> {
    public Filters addRange(String field, long from, Long to) {
      add(new Range(field, from, to));
      return this;
    }

    public Filters addTerm(String field, String value) {
      add(new Term(field, value));
      return this;
    }

    public Filters addTerms(String field, List<String> values) {
      add(new Terms(field, values));
      return this;
    }
  }

  /**
   * Applies {@code filters} as a bool/must query.
   *
   * <p>NOTE: mutates the argument by appending the consumer type term.
   */
  public SearchRequest filters(Filters filters) {
    filters.addTerm("type", ConsumerParams.type);
    return query(new BoolQuery("must", filters));
  }

  public SearchRequest term(String field, String value) {
    return query(new Term(field, value));
  }

  public SearchRequest terms(String field, List<String> values) {
    return query(new Terms(field, values));
  }

  public SearchRequest addAggregation(Aggregation agg) {
    size = null; // we return aggs, not source data
    _source = false;
    if (aggs == null) aggs = new LinkedHashMap<>();
    aggs.put(agg.field, agg);
    return this;
  }

  // used as the okhttp request tag (e.g. for logging and cancellation)
  String tag() {
    return aggs != null ? "aggregation" : "search";
  }

  SearchRequest query(Object filter) {
    query = Collections.singletonMap("bool", Collections.singletonMap("filter", filter));
    return this;
  }

  static class Term {
    final Map<String, String> term;

    Term(String field, String value) {
      term = Collections.singletonMap(field, value);
    }
  }

  static class Terms {
    final Map<String, Collection<String>> terms;

    Terms(String field, Collection<String> values) {
      this.terms = Collections.singletonMap(field, values);
    }
  }

  static class Range {
    final Map<String, Bounds> range;

    Range(String field, long from, Long to) {
      range = Collections.singletonMap(field, new Bounds(from, to));
    }

    static class Bounds {
      final long from;
      final Long to; // nullable: open-ended upper bound
      // both endpoints inclusive; serialized field names match the ES range DSL
      final boolean include_lower = true;
      final boolean include_upper = true;

      Bounds(long from, Long to) {
        this.from = from;
        this.to = to;
      }
    }
  }

  static class BoolQuery {
    final Map<String, Object> bool;

    BoolQuery(String op, Object clause) {
      bool = Collections.singletonMap(op, clause);
    }
  }
}
org.mockserver.integration.ClientAndServer.startClientAndServer; +import static org.mockserver.model.HttpRequest.request; +import static org.mockserver.model.HttpResponse.response; + +public class LogzioSpanConsumerTest { + private static final Logger logger = LoggerFactory.getLogger(LogzioSpanConsumerTest.class); + private static final Endpoint LOCAL_ENDPOINT = Endpoint.newBuilder().serviceName("local").build(); + private static MockServerClient mockServerClient = null; + private static ClientAndServer mockServer; + private static LogzioStorageParams storageParams = new LogzioStorageParams(); + + + @BeforeClass + public static void startMockServer() { + logger.debug(LogzioStorage.ZIPKIN_LOGZIO_STORAGE_MSG + "starting mock server"); + mockServer = startClientAndServer(8070); + + mockServerClient = new MockServerClient("localhost", 8070); + mockServerClient + .when(request().withMethod("POST")) + .respond(response().withStatusCode(200)); + } + + @BeforeClass + public static void setup() { + ConsumerParams consumerParams = new ConsumerParams(); + consumerParams.setAccountToken("notARealToken"); + consumerParams.setListenerUrl("http://127.0.0.1:8070"); + consumerParams.setCleanSentTracesInterval(30); + consumerParams.setSenderDrainInterval(3); + storageParams.setConsumerParams(consumerParams); + storageParams.setApiToken(""); + } + + @AfterClass + public static void stopMockServer() { + logger.info(LogzioStorage.ZIPKIN_LOGZIO_STORAGE_MSG + "stoping mock server..."); + mockServer.stop(); + } + + @Test + public void testConsumerAccept() { + int initialRequestsCount = mockServerClient.retrieveRecordedRequests(request().withMethod("POST")).length; + Span sampleSpan = getSampleSpan(); + LogzioStorage logzioStorage = LogzioStorage + .newBuilder() + .config(storageParams) + .build(); + LogzioSpanConsumer consumer = (LogzioSpanConsumer) logzioStorage.spanConsumer(); + try { + consumer.accept(Collections.singletonList(sampleSpan)).execute(); + } catch (IOException e) { + 
Assert.fail(e.getMessage()); + } + + HttpRequest[] recordedRequests = mockServerClient.retrieveRecordedRequests(request().withMethod("POST")); + Assert.assertEquals(initialRequestsCount + 1, recordedRequests.length); + String body = recordedRequests[0].getBodyAsString(); + Assert.assertTrue(body.contains("\"" + "traceId" + "\":\"" + sampleSpan.traceId() + "\"")); + Assert.assertTrue(body.contains("\"" + "kind" + "\":\"" + Span.Kind.CLIENT + "\"")); + Assert.assertTrue(body.contains("\"" + "timestamp" + "\":" + 1)); + } + + @Test + public void closeStorageTest() { + Span sampleSpan = getSampleSpan(); + LogzioStorage logzioStorage = LogzioStorage.newBuilder().config(storageParams).build(); + LogzioSpanConsumer consumer = (LogzioSpanConsumer) logzioStorage.spanConsumer(); + logzioStorage.close(); + try { + consumer.accept(Collections.singletonList(sampleSpan)).execute(); + } catch (IOException e) { + Assert.fail(e.getMessage()); + } catch (IllegalStateException ex) { + return; + } + Assert.fail("Send traces succeeded but storage was closed"); + } + + @Test + public void interruptSenderCloseTest() { + Span sampleSpan = getSampleSpan(); + LogzioStorage logzioStorage = LogzioStorage.newBuilder().config(storageParams).build(); + LogzioSpanConsumer consumer = (LogzioSpanConsumer) logzioStorage.spanConsumer(); + + Thread storageThread = new Thread(() -> { + try { + for (int i = 0 ; i < 1000 ; i++) { + consumer.accept(Collections.singletonList(sampleSpan)); + } + } catch (IllegalStateException ex) { + logger.info("storage is closed"); + return; + } + Assert.fail("sent msgs but storage was closed"); + }); + + Thread closingThread = new Thread(logzioStorage::close); + + storageThread.start(); + closingThread.start(); + closingThread.interrupt(); + } + + @Test + public void interruptMidSendTest() { + int initialRequestsCount = mockServerClient.retrieveRecordedRequests(request().withMethod("POST")).length; + Span sampleSpan = getSampleSpan(); + LogzioStorage logzioStorage = 
LogzioStorage.newBuilder().config(storageParams).build(); + LogzioSpanConsumer consumer = (LogzioSpanConsumer) logzioStorage.spanConsumer(); + + Thread storageThread = new Thread(() -> { + for (int i = 0 ; i < 100 ; i++) { + try { + consumer.accept(Collections.singletonList(sampleSpan)).execute(); + } catch (IOException e) { + Assert.fail(e.getMessage()); + } + } + }); + storageThread.start(); + try { + sleep(1000); + storageThread.interrupt(); + HttpRequest[] recordedRequests = mockServerClient.retrieveRecordedRequests(request().withMethod("POST")); + Assert.assertEquals(initialRequestsCount + 100, recordedRequests.length); + } catch (InterruptedException e) { + Assert.fail(e.getMessage()); + } + } + + private Span getSampleSpan() { + return Span.newBuilder() + .traceId("1234567890abcdef") + .id("2") + .timestamp(1L) + .localEndpoint(LOCAL_ENDPOINT) + .kind(Span.Kind.CLIENT) + .build(); + } +} diff --git a/storage-logzio/src/test/java/LogzioSpanStoreTest.java b/storage-logzio/src/test/java/LogzioSpanStoreTest.java new file mode 100644 index 0000000..ec54aef --- /dev/null +++ b/storage-logzio/src/test/java/LogzioSpanStoreTest.java @@ -0,0 +1,86 @@ +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import zipkin2.Span; +import zipkin2.elasticsearch.internal.client.HttpCall; +import zipkin2.storage.logzio.BodyConverters; +import zipkin2.storage.logzio.LogzioSpanStore; +import zipkin2.storage.logzio.LogzioStorage; +import zipkin2.storage.logzio.LogzioStorageParams; +import zipkin2.storage.logzio.client.SearchCallFactory; +import zipkin2.storage.logzio.client.SearchRequest; + +import java.io.IOException; +import java.util.List; + + +public class LogzioSpanStoreTest { + private static MockWebServer mockWebServer = new 
MockWebServer(); + + private static final Logger logger = LoggerFactory.getLogger(LogzioSpanStoreTest.class); + + private static String apiToken = "not-a-real-api-token"; + private static LogzioStorageParams params = new LogzioStorageParams(); + private static LogzioStorage storage; + private static LogzioSpanStore spanStore; + + @BeforeClass + public static void setup() { + logger.info(LogzioStorage.ZIPKIN_LOGZIO_STORAGE_MSG + "Setting up test environment"); + try { + mockWebServer.start(8123); + } catch (IOException e) { + e.printStackTrace(); + } + params.setApiToken(apiToken); + params.setSearchApiUrl("http://localhost:8123"); + params.getConsumerParams().setAccountToken(""); + } + + @AfterClass + public static void close() throws Exception { + mockWebServer.close(); + storage.close(); + } + + @Test + public void doesntTruncateTraceIdByDefault() throws Exception { + params.setStrictTraceId(true); + storage = LogzioStorage.newBuilder().config(params).build(); + spanStore = new LogzioSpanStore(storage, apiToken); + mockWebServer.enqueue(new MockResponse()); + spanStore.getTrace("48fec942f3e78b893041d36dc43227fd").execute(); + RecordedRequest request = mockWebServer.takeRequest(); + String body = request.getBody().readUtf8(); + Assert.assertTrue(body.contains("\"traceId\":\"48fec942f3e78b893041d36dc43227fd\"")); + } + + @Test + public void truncatesTraceIdTo16CharsWhenNtStrict() throws Exception { + params.setStrictTraceId(false); + storage = LogzioStorage.newBuilder().config(params).build(); + spanStore = new LogzioSpanStore(storage, apiToken); + mockWebServer.enqueue(new MockResponse()); + spanStore.getTrace("48fec942f3e78b893041d36dc43227fd").execute(); + + Assert.assertTrue(mockWebServer.takeRequest().getBody().readUtf8().contains("\"traceId\":\"3041d36dc43227fd\"")); + } + + @Test + public void newHttpCallHeaderTest() { + storage = LogzioStorage.newBuilder().config(params).build(); + spanStore = new LogzioSpanStore(storage, apiToken); + SearchCallFactory 
searchCallFactory = new SearchCallFactory(storage.http(), apiToken); + HttpCall> call = searchCallFactory.newCall(SearchRequest.create(), BodyConverters.SPANS); + Assert.assertEquals(call.call.request().header(SearchCallFactory.API_TOKEN_HEADER), apiToken); + } + +} + +