Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1757822: Allow JDBC to handle ZSTD decompression #1932

Merged
merged 8 commits into from
Oct 28, 2024
4 changes: 2 additions & 2 deletions FIPS/scripts/check_content.sh
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#!/bin/bash -e

# scripts used to check if all dependency is shaded into snowflake internal path
# scripts used to check if all dependencies are shaded into snowflake internal path

set -o pipefail

DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )"

if jar tvf $DIR/../target/snowflake-jdbc-fips.jar | awk '{print $8}' | grep -v -E "^(net|com)/snowflake" | grep -v -E "(com|net)/\$" | grep -v -E "^META-INF" | grep -v -E "^mozilla" | grep -v -E "^com/sun/jna" | grep -v com/sun/ | grep -v mime.types; then
if jar tvf $DIR/../target/snowflake-jdbc-fips.jar | awk '{print $8}' | grep -v -E "^(net|com)/snowflake" | grep -v -E "(com|net)/\$" | grep -v -E "^META-INF" | grep -v -E "^mozilla" | grep -v -E "^com/sun/jna" | grep -v com/sun/ | grep -v mime.types | grep -v -E "^com/github/" | grep -v -E "^aix/" | grep -v -E "^darwin/" | grep -v -E "^freebsd/" | grep -v -E "^linux/" | grep -v -E "^win/"; then
echo "[ERROR] JDBC jar includes class not under the snowflake namespace"
exit 1
fi
6 changes: 3 additions & 3 deletions ci/scripts/check_content.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ set -o pipefail

DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )"

if jar tvf $DIR/../../target/snowflake-jdbc${package_modifier}.jar | awk '{print $8}' | grep -v -E "^(net|com)/snowflake" | grep -v -E "(com|net)/\$" | grep -v -E "^META-INF" | grep -v -E "^mozilla" | grep -v -E "^com/sun/jna" | grep -v com/sun/ | grep -v mime.types; then
if jar tvf $DIR/../../target/snowflake-jdbc${package_modifier}.jar | awk '{print $8}' | grep -v -E "^(net|com)/snowflake" | grep -v -E "(com|net)/\$" | grep -v -E "^META-INF" | grep -v -E "^mozilla" | grep -v -E "^com/sun/jna" | grep -v com/sun/ | grep -v mime.types | grep -v -E "^com/github/" | grep -v -E "^aix/" | grep -v -E "^darwin/" | grep -v -E "^freebsd/" | grep -v -E "^linux/" | grep -v -E "^win/"; then
echo "[ERROR] JDBC jar includes class not under the snowflake namespace"
exit 1
fi

if jar tvf $DIR/../../target/snowflake-jdbc${package_modifier}.jar | awk '{print $8}' | grep -E "^META-INF/versions/.*.class" | grep -v -E "^META-INF/versions/.*/(net|com)/snowflake"; then
echo "[ERROR] JDBC jar includes multi release classes not under the snowflake namespace"
if jar tvf $DIR/../../target/snowflake-jdbc${package_modifier}.jar | awk '{print $8}' | grep -E "^META-INF/versions/.*.class" | grep -v -E "^META-INF/versions/.*/(net|com)/snowflake" | grep -v -E "^META-INF/versions/.*/com/github" | grep -v -E "^aix/" | grep -v -E "^darwin/" | grep -v -E "^freebsd/" | grep -v -E "^linux/" | grep -v -E "^win/"; then
echo "[ERROR] JDBC jar includes multi-release classes not under the snowflake namespace"
exit 1
fi
5 changes: 0 additions & 5 deletions linkage-checker-exclusion-rules.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,6 @@
<Source><Package name="org.apache.commons.compress.compressors"/></Source>
<Reason>Optional</Reason>
</LinkageError>
<LinkageError>
<Target><Package name="com.github.luben.zstd"/></Target>
<Source><Package name="org.apache.commons.compress.compressors"/></Source>
<Reason>Optional</Reason>
</LinkageError>
<LinkageError>
<Target><Package name="com.google.appengine.api.urlfetch"/></Target>
<Source><Package name="com.google.api.client.extensions.appengine"/></Source>
Expand Down
10 changes: 10 additions & 0 deletions parent-pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
<apache.commons.text.version>1.10.0</apache.commons.text.version>
<apache.httpclient.version>4.5.14</apache.httpclient.version>
<apache.httpcore.version>4.4.16</apache.httpcore.version>
<zstd-jni.version>1.5.6-5</zstd-jni.version>
<arrow.version>17.0.0</arrow.version>
<asm.version>9.3</asm.version>
<avro.version>1.8.1</avro.version>
Expand Down Expand Up @@ -327,6 +328,11 @@
<artifactId>httpcore</artifactId>
<version>${apache.httpcore.version}</version>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<version>${zstd-jni.version}</version>
</dependency>
<dependency>
<groupId>org.apache.tika</groupId>
<artifactId>tika-core</artifactId>
Expand Down Expand Up @@ -650,6 +656,10 @@
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
</dependency>
<dependency>
<groupId>org.apache.tika</groupId>
<artifactId>tika-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package net.snowflake.client.jdbc;

import static net.snowflake.client.core.Constants.MB;
import static net.snowflake.common.core.FileCompressionType.GZIP;
import static net.snowflake.common.core.FileCompressionType.ZSTD;

import com.github.luben.zstd.ZstdInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.util.zip.GZIPInputStream;
import net.snowflake.client.core.SnowflakeJdbcInternalApi;
import net.snowflake.common.core.SqlState;
import org.apache.http.Header;

@SnowflakeJdbcInternalApi
public class CompressedStreamFactory {
sfc-gh-dheyman marked this conversation as resolved.
Show resolved Hide resolved

private static final int STREAM_BUFFER_SIZE = MB;

public InputStream createBasedOnEncodingHeader(InputStream is, Header encoding)
throws IOException, SnowflakeSQLException {
InputStream inputStream = is; // Determine the format of the response, if it is not
sfc-gh-dheyman marked this conversation as resolved.
Show resolved Hide resolved
// either plain text or gzip, raise an error.
if (encoding != null) {
if (GZIP.name().equalsIgnoreCase(encoding.getValue())) {
/* specify buffer size for GZIPInputStream */
inputStream = new GZIPInputStream(is, STREAM_BUFFER_SIZE);
} else if (ZSTD.name().equalsIgnoreCase(encoding.getValue())) {
inputStream = new ZstdInputStream(is);
} else {
throw new SnowflakeSQLException(
SqlState.INTERNAL_ERROR,
ErrorCode.INTERNAL_ERROR.getMessageCode(),
"Exception: unexpected compression got " + encoding.getValue());
}
} else {
inputStream = detectGzipAndGetStream(is);
}

return inputStream;
}

private InputStream detectGzipAndGetStream(InputStream is) throws IOException {
PushbackInputStream pb = new PushbackInputStream(is, 2);
byte[] signature = new byte[2];
int len = pb.read(signature);
pb.unread(signature, 0, len);
// https://tools.ietf.org/html/rfc1952
if (signature[0] == (byte) 0x1f && signature[1] == (byte) 0x8b) {
return new GZIPInputStream(pb);
} else {
return pb;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
package net.snowflake.client.jdbc;

import static net.snowflake.client.core.Constants.MB;

import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.zip.GZIPInputStream;
import net.snowflake.client.core.ExecTimeTelemetryData;
import net.snowflake.client.core.HttpUtil;
import net.snowflake.client.log.ArgSupplier;
Expand All @@ -34,7 +30,11 @@ public class DefaultResultStreamProvider implements ResultStreamProvider {
// SSE-C algorithm value
private static final String SSE_C_AES = "AES256";

private static final int STREAM_BUFFER_SIZE = MB;
private CompressedStreamFactory compressedStreamFactory;

/**
 * Creates a provider with its own {@link CompressedStreamFactory}, which {@code getInputStream}
 * uses to wrap the response body according to its Content-Encoding header.
 */
public DefaultResultStreamProvider() {
this.compressedStreamFactory = new CompressedStreamFactory();
}

@Override
public InputStream getInputStream(ChunkDownloadContext context) throws Exception {
Expand Down Expand Up @@ -71,9 +71,11 @@ public InputStream getInputStream(ChunkDownloadContext context) throws Exception

InputStream inputStream;
final HttpEntity entity = response.getEntity();
Header encoding = response.getFirstHeader("Content-Encoding");
try {
// read the chunk data
inputStream = detectContentEncodingAndGetInputStream(response, entity.getContent());
// create stream based on compression type
inputStream =
compressedStreamFactory.createBasedOnEncodingHeader(entity.getContent(), encoding);
} catch (Exception ex) {
logger.error("Failed to decompress data: {}", response);

Expand Down Expand Up @@ -143,39 +145,4 @@ else if (context.getQrmk() != null) {
response);
return response;
}

/**
 * Chooses how to read the response body from its Content-Encoding header.
 *
 * <p>With no header the body is sniffed for gzip; with a "gzip" header (any case) it is wrapped
 * in a buffered {@link GZIPInputStream}; any other header value is rejected.
 *
 * @param response the HTTP response whose Content-Encoding header is consulted
 * @param is the response body stream
 * @return the stream to read the (decompressed) body from
 * @throws IOException if the gzip stream cannot be created or the body cannot be read
 * @throws SnowflakeSQLException if the header names an encoding other than gzip
 */
private InputStream detectContentEncodingAndGetInputStream(HttpResponse response, InputStream is)
    throws IOException, SnowflakeSQLException {
  final Header contentEncoding = response.getFirstHeader("Content-Encoding");

  // No declared encoding: decide from the stream content itself.
  if (contentEncoding == null) {
    return detectGzipAndGetStream(is);
  }

  // Only gzip is accepted as a declared encoding.
  if (!"gzip".equalsIgnoreCase(contentEncoding.getValue())) {
    throw new SnowflakeSQLException(
        SqlState.INTERNAL_ERROR,
        ErrorCode.INTERNAL_ERROR.getMessageCode(),
        "Exception: unexpected compression got " + contentEncoding.getValue());
  }

  /* specify buffer size for GZIPInputStream */
  return new GZIPInputStream(is, STREAM_BUFFER_SIZE);
}

/**
 * Peeks at the first two bytes of {@code is} and wraps it in a {@link GZIPInputStream} when they
 * are the gzip magic number (0x1f 0x8b); otherwise returns the stream with the peeked bytes
 * restored.
 *
 * <p>An empty or one-byte stream is returned as-is (it cannot be gzip) instead of failing.
 *
 * @param is the raw stream to inspect
 * @return a stream positioned at the first byte of {@code is}, decompressing if gzip was found
 * @throws IOException if reading the signature or creating the gzip stream fails
 */
public static InputStream detectGzipAndGetStream(InputStream is) throws IOException {
  PushbackInputStream pb = new PushbackInputStream(is, 2);
  byte[] signature = new byte[2];

  // A single read() may legally return fewer bytes than requested, so keep reading until the
  // whole signature is available or the stream ends.
  int len = 0;
  while (len < signature.length) {
    int count = pb.read(signature, len, signature.length - len);
    if (count < 0) {
      break;
    }
    len += count;
  }

  // read() reports end of stream as -1; passing that on to unread() would throw, so only push
  // back what was actually read.
  if (len > 0) {
    pb.unread(signature, 0, len);
  }

  // https://tools.ietf.org/html/rfc1952
  if (len == signature.length && signature[0] == (byte) 0x1f && signature[1] == (byte) 0x8b) {
    return new GZIPInputStream(pb);
  }
  return pb;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package net.snowflake.client.jdbc;

import static org.junit.Assert.assertEquals;

import com.github.luben.zstd.ZstdInputStream;
import com.github.luben.zstd.ZstdOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.apache.http.Header;
import org.apache.http.message.BasicHeader;
import org.junit.Test;

public class CompressedStreamFactoryTest {

private final CompressedStreamFactory factory = new CompressedStreamFactory();

@Test
public void testDetectContentEncodingAndGetInputStream_Gzip() throws Exception {
// Original data to compress and validate
String originalData = "Some data in GZIP";

// Creating encoding header
Header encodingHeader = new BasicHeader("Content-Encoding", "gzip");

// Creating a gzip byte array using GZIPOutputStream
byte[] gzipData;
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream)) {
gzipOutputStream.write(originalData.getBytes(StandardCharsets.UTF_8));
gzipOutputStream.close(); // close to flush and finish the compression
gzipData = byteArrayOutputStream.toByteArray();
}

// Mocking input stream with the gzip data
InputStream gzipStream = new ByteArrayInputStream(gzipData);

// Call the private method using reflection
InputStream resultStream = factory.createBasedOnEncodingHeader(gzipStream, encodingHeader);

// Decompress and validate the data matches original
ByteArrayOutputStream decompressedOutput = new ByteArrayOutputStream();
byte[] buffer = new byte[1024];
int bytesRead;
try (GZIPInputStream gzipInputStream = (GZIPInputStream) resultStream) {
sfc-gh-dheyman marked this conversation as resolved.
Show resolved Hide resolved
while ((bytesRead = gzipInputStream.read(buffer)) != -1) {
decompressedOutput.write(buffer, 0, bytesRead);
sfc-gh-dheyman marked this conversation as resolved.
Show resolved Hide resolved
}
}
String decompressedData = new String(decompressedOutput.toByteArray(), StandardCharsets.UTF_8);

assertEquals(originalData, decompressedData);
}

@Test
public void testDetectContentEncodingAndGetInputStream_Zstd() throws Exception {
// Original data to compress and validate
String originalData = "Some data in ZSTD";

// Creating encoding header
Header encodingHeader = new BasicHeader("Content-Encoding", "zstd");

// Creating a zstd byte array using ZstdOutputStream
byte[] zstdData;
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ZstdOutputStream zstdOutputStream = new ZstdOutputStream(byteArrayOutputStream)) {
zstdOutputStream.write(originalData.getBytes(StandardCharsets.UTF_8));
zstdOutputStream.close(); // close to flush and finish the compression
zstdData = byteArrayOutputStream.toByteArray();
}

// Mocking input stream with the zstd data
InputStream zstdStream = new ByteArrayInputStream(zstdData);

// Call the private method using reflection
InputStream resultStream = factory.createBasedOnEncodingHeader(zstdStream, encodingHeader);

// Decompress and validate the data matches original
ByteArrayOutputStream decompressedOutput = new ByteArrayOutputStream();
byte[] buffer = new byte[1024];
int bytesRead;
try (ZstdInputStream zstdInputStream = (ZstdInputStream) resultStream) {
sfc-gh-dheyman marked this conversation as resolved.
Show resolved Hide resolved
while ((bytesRead = zstdInputStream.read(buffer)) != -1) {
decompressedOutput.write(buffer, 0, bytesRead);
sfc-gh-dheyman marked this conversation as resolved.
Show resolved Hide resolved
}
}
String decompressedData = new String(decompressedOutput.toByteArray(), StandardCharsets.UTF_8);

assertEquals(originalData, decompressedData);
}
}
6 changes: 6 additions & 0 deletions thin_public_pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<slf4j.version>2.0.13</slf4j.version>
<threeten.version>1.6.9</threeten.version>
<zstd-jni.version>1.5.6-5</zstd-jni.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -267,6 +268,11 @@
<artifactId>jsoup</artifactId>
<version>${jsoup.version}</version>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<version>${zstd-jni.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down
Loading