Merge branch 'apache:master' into miland-db/string-repeat-function
miland-db authored Mar 25, 2024
2 parents 702c90f + 1b55fd3 commit ec66d04
Showing 279 changed files with 9,282 additions and 7,911 deletions.
7 changes: 5 additions & 2 deletions .github/labeler.yml
@@ -129,8 +129,11 @@ DSTREAM:
- any-glob-to-any-file: [
'streaming/**/*',
'data/streaming/**/*',
-      'connector/kinesis*',
-      'connector/kafka*',
+      'connector/kinesis-asl/**/*',
+      'connector/kinesis-asl-assembly/**/*',
+      'connector/kafka-0-10/**/*',
+      'connector/kafka-0-10-assembly/**/*',
+      'connector/kafka-0-10-token-provider/**/*',
'python/pyspark/streaming/**/*'
]
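(The previous single-segment globs `connector/kinesis*` and `connector/kafka*` only match paths directly under `connector/`, not files inside the module directories, so changes within the Kinesis and Kafka modules were not being labeled; the expanded `/**/*` patterns appear to be the fix for that.)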

2 changes: 1 addition & 1 deletion .github/workflows/publish_snapshot.yml
@@ -21,7 +21,7 @@ name: Publish Snapshot

on:
schedule:
-    - cron: '0 0 * * *'
+    - cron: '0 0,12 * * *'
workflow_dispatch:
inputs:
branch:
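(With the new cron entry the snapshot workflow runs twice daily, at 00:00 and 12:00 UTC, instead of once at midnight.)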
1 change: 0 additions & 1 deletion LICENSE-binary
@@ -325,7 +325,6 @@ commons-cli:commons-cli
commons-dbcp:commons-dbcp
commons-io:commons-io
commons-lang:commons-lang
-commons-logging:commons-logging
commons-net:commons-net
commons-pool:commons-pool
io.fabric8:zjsonpatch
8 changes: 0 additions & 8 deletions NOTICE-binary
@@ -271,14 +271,6 @@ benchmarking framework, which can be obtained at:
* HOMEPAGE:
* https://github.com/google/caliper

-This product optionally depends on 'Apache Commons Logging', a logging
-framework, which can be obtained at:
-
-  * LICENSE:
-    * license/LICENSE.commons-logging.txt (Apache License 2.0)
-  * HOMEPAGE:
-    * http://commons.apache.org/logging/

This product optionally depends on 'Apache Log4J', a logging framework, which
can be obtained at:

1 change: 0 additions & 1 deletion R/pkg/NAMESPACE
@@ -363,7 +363,6 @@ exportMethods("%<=>%",
"map_keys",
"map_values",
"map_zip_with",
"map_sort",
"max",
"max_by",
"md5",
17 changes: 0 additions & 17 deletions R/pkg/R/functions.R
@@ -4552,23 +4552,6 @@ setMethod("map_zip_with",
)
})

-#' @details
-#' \code{map_sort}: Sorts the input map in ascending or descending order according to
-#' the natural ordering of the map keys.
-#'
-#' @rdname column_collection_functions
-#' @param asc a logical flag indicating the sorting order.
-#'            TRUE, sorting is in ascending order.
-#'            FALSE, sorting is in descending order.
-#' @aliases map_sort map_sort,Column-method
-#' @note map_sort since 4.0.0
-setMethod("map_sort",
-          signature(x = "Column"),
-          function(x, asc = TRUE) {
-            jc <- callJStatic("org.apache.spark.sql.functions", "map_sort", x@jc, asc)
-            column(jc)
-          })

#' @details
#' \code{element_at}: Returns element of array at given index in \code{extraction} if
#' \code{x} is array. Returns value for the given key in \code{extraction} if \code{x} is map.
4 changes: 0 additions & 4 deletions R/pkg/R/generics.R
@@ -1224,10 +1224,6 @@ setGeneric("map_values", function(x) { standardGeneric("map_values") })
#' @name NULL
setGeneric("map_zip_with", function(x, y, f) { standardGeneric("map_zip_with") })

-#' @rdname column_collection_functions
-#' @name NULL
-setGeneric("map_sort", function(x, asc = TRUE) { standardGeneric("map_sort") })

#' @rdname column_aggregate_functions
#' @name NULL
setGeneric("max_by", function(x, y) { standardGeneric("max_by") })
6 changes: 0 additions & 6 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1648,12 +1648,6 @@ test_that("column functions", {
expected_entries <- list(as.environment(list(x = 1, y = 2, a = 3, b = 4)))
expect_equal(result, expected_entries)

-  # Test map_sort
-  df <- createDataFrame(list(list(map1 = as.environment(list(c = 3, a = 1, b = 2)))))
-  result <- collect(select(df, map_sort(df[[1]])))[[1]]
-  expected_entries <- list(as.environment(list(a = 1, b = 2, c = 3)))
-  expect_equal(result, expected_entries)

# Test map_entries(), map_keys(), map_values() and element_at()
df <- createDataFrame(list(list(map = as.environment(list(x = 1, y = 2)))))
result <- collect(select(df, map_entries(df$map)))[[1]]
2 changes: 1 addition & 1 deletion assembly/README
@@ -9,4 +9,4 @@ This module is off by default. To activate it specify the profile in the command

If you need to build an assembly for a different version of Hadoop the
hadoop-version system property needs to be set as in this example:
-  -Dhadoop.version=3.3.6
+  -Dhadoop.version=3.4.0
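(For reference, with Spark's Maven wrapper that would be invoked roughly as `./build/mvn -Dhadoop.version=3.4.0 -DskipTests clean package`, plus whatever assembly profile the build requires; the exact command is not shown in this excerpt.)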
6 changes: 6 additions & 0 deletions common/unsafe/pom.xml
@@ -47,6 +47,12 @@
<version>${project.version}</version>
</dependency>

+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-variant_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>

<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-parallel-collections_${scala.binary.version}</artifactId>
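(The new `spark-variant` dependency is what lets `common/unsafe` reach the variant library: `VariantVal.toString` in the next hunk delegates to `org.apache.spark.types.variant.Variant`.)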
@@ -18,6 +18,7 @@
package org.apache.spark.unsafe.types;

import org.apache.spark.unsafe.Platform;
+import org.apache.spark.types.variant.Variant;

import java.io.Serializable;
import java.util.Arrays;
@@ -104,8 +105,7 @@ public String debugString() {
*/
@Override
public String toString() {
-    // NOTE: the encoding is not yet implemented, this is not the final implementation.
-    return new String(value);
+    return new Variant(value, metadata).toJson();
}

/**
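With this change, `VariantVal.toString` renders the binary-encoded variant as JSON rather than decoding the raw value bytes as a string. A minimal round-trip sketch, assuming `VariantBuilder.parseJson(String)` is the JSON entry point (only the builder's package rename is visible in this diff, so that name is an assumption):

import org.apache.spark.types.variant.Variant;
import org.apache.spark.types.variant.VariantBuilder;

public class VariantToJsonDemo {
  public static void main(String[] args) throws Exception {
    // Parse JSON text into the binary variant encoding.
    // parseJson is assumed here; this diff only shows the builder's package rename.
    Variant v = VariantBuilder.parseJson("{\"a\":1,\"b\":[2,3]}");
    // toJson() walks the binary value/metadata and rebuilds the JSON text,
    // which is what VariantVal.toString now returns.
    System.out.println(v.toJson()); // expected: {"a":1,"b":[2,3]}
  }
}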
24 changes: 18 additions & 6 deletions common/utils/src/main/resources/error/error-classes.json
@@ -244,7 +244,7 @@
},
"UNRELEASED_THREAD_ERROR" : {
"message" : [
"<loggingId>: RocksDB instance could not be acquired by <newAcquiredThreadInfo> as it was not released by <acquiredThreadInfo> after <timeWaitedMs> ms.",
"<loggingId>: RocksDB instance could not be acquired by <newAcquiredThreadInfo> for operationType=<operationType> as it was not released by <acquiredThreadInfo> after <timeWaitedMs> ms.",
"Thread holding the lock has trace: <stackTraceOutput>"
]
}
@@ -2876,6 +2876,12 @@
},
"sqlState" : "22023"
},
"MALFORMED_VARIANT" : {
"message" : [
"Variant binary is malformed. Please check the data source is valid."
],
"sqlState" : "22023"
},
"MERGE_CARDINALITY_VIOLATION" : {
"message" : [
"The ON search condition of the MERGE statement matched a single row from the target table with multiple rows of the source table.",
@@ -4345,6 +4351,11 @@
"Can't insert into the target."
],
"subClass" : {
"MULTI_PATH" : {
"message" : [
"Can only write data to relations with a single path but given paths are <paths>."
]
},
"NOT_ALLOWED" : {
"message" : [
"The target relation <relationId> does not allow insertion."
@@ -4550,6 +4561,12 @@
],
"sqlState" : "42883"
},
"VARIANT_CONSTRUCTOR_SIZE_LIMIT" : {
"message" : [
"Cannot construct a Variant larger than 16 MiB. The maximum allowed size of a Variant value is 16 MiB."
],
"sqlState" : "22023"
},
"VARIANT_SIZE_LIMIT" : {
"message" : [
"Cannot build variant bigger than <sizeLimit> in <functionName>.",
@@ -5269,11 +5286,6 @@
"The ordering of partition columns is <partColumns>. All partition columns having constant values need to appear before other partition columns that do not have an assigned constant value."
]
},
"_LEGACY_ERROR_TEMP_1148" : {
"message" : [
"Can only write data to relations with a single path."
]
},
"_LEGACY_ERROR_TEMP_1149" : {
"message" : [
"Fail to rebuild expression: missing key <filter> in `translatedFilterToExpr`."
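The two new variant error classes back the `malformedVariant()` and `variantConstructorSizeLimit()` helpers that the new `Variant` class below calls; those helpers live in `VariantUtil`, which is not part of this excerpt. A rough sketch of their shape, with plain `RuntimeException` standing in for Spark's error-class exception types:

// Hypothetical stand-ins for the VariantUtil helpers referenced by Variant.java;
// the real helpers raise Spark exceptions tagged with these error classes.
final class VariantErrors {
  static RuntimeException malformedVariant() {
    return new RuntimeException(
      "MALFORMED_VARIANT: Variant binary is malformed. Please check the data source is valid.");
  }
  static RuntimeException variantConstructorSizeLimit() {
    return new RuntimeException(
      "VARIANT_CONSTRUCTOR_SIZE_LIMIT: Cannot construct a Variant larger than 16 MiB.");
  }
}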
@@ -196,7 +196,7 @@ private[spark] object Logging {
val initLock = new Object()
try {
// We use reflection here to handle the case where users remove the
-    // slf4j-to-jul bridge order to route their logs to JUL.
+    // jul-to-slf4j bridge order to route their logs to JUL.
val bridgeClass = SparkClassUtils.classForName("org.slf4j.bridge.SLF4JBridgeHandler")
bridgeClass.getMethod("removeHandlersForRootLogger").invoke(null)
val installed = bridgeClass.getMethod("isInstalled").invoke(null).asInstanceOf[Boolean]
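The reflective calls in this block target the jul-to-slf4j bridge's real API while avoiding a hard dependency on it. Called directly, the same logic is just (a sketch using the org.slf4j.bridge API, assuming the bridge jar is on the classpath):

import org.slf4j.bridge.SLF4JBridgeHandler;

public class JulBridgeSetup {
  public static void main(String[] args) {
    // Drop any java.util.logging handlers on the root logger, then install the
    // bridge so JUL log records are forwarded to SLF4J.
    SLF4JBridgeHandler.removeHandlersForRootLogger();
    if (!SLF4JBridgeHandler.isInstalled()) {
      SLF4JBridgeHandler.install();
    }
  }
}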
@@ -0,0 +1,131 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.types.variant;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;

import java.io.CharArrayWriter;
import java.io.IOException;

import static org.apache.spark.types.variant.VariantUtil.*;

/**
 * This class is structurally equivalent to {@link org.apache.spark.unsafe.types.VariantVal}. We
 * define a new class to avoid depending on or modifying Spark.
 */
public final class Variant {
  private final byte[] value;
  private final byte[] metadata;

  public Variant(byte[] value, byte[] metadata) {
    this.value = value;
    this.metadata = metadata;
    // There is currently only one allowed version.
    if (metadata.length < 1 || (metadata[0] & VERSION_MASK) != VERSION) {
      throw malformedVariant();
    }
    // Don't attempt to use a Variant larger than 16 MiB. We'll never produce one, and it risks
    // memory instability.
    if (metadata.length > SIZE_LIMIT || value.length > SIZE_LIMIT) {
      throw variantConstructorSizeLimit();
    }
  }

  public byte[] getValue() {
    return value;
  }

  public byte[] getMetadata() {
    return metadata;
  }

  // Stringify the variant in JSON format.
  // Throw `MALFORMED_VARIANT` if the variant is malformed.
  public String toJson() {
    StringBuilder sb = new StringBuilder();
    toJsonImpl(value, metadata, 0, sb);
    return sb.toString();
  }

  // Escape a string so that it can be pasted into JSON structure.
  // For example, if `str` only contains a new-line character, then the result content is "\n"
  // (4 characters).
  static String escapeJson(String str) {
    try (CharArrayWriter writer = new CharArrayWriter();
         JsonGenerator gen = new JsonFactory().createGenerator(writer)) {
      gen.writeString(str);
      gen.flush();
      return writer.toString();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  static void toJsonImpl(byte[] value, byte[] metadata, int pos, StringBuilder sb) {
    switch (VariantUtil.getType(value, pos)) {
      case OBJECT:
        handleObject(value, pos, (size, idSize, offsetSize, idStart, offsetStart, dataStart) -> {
          sb.append('{');
          for (int i = 0; i < size; ++i) {
            int id = readUnsigned(value, idStart + idSize * i, idSize);
            int offset = readUnsigned(value, offsetStart + offsetSize * i, offsetSize);
            int elementPos = dataStart + offset;
            if (i != 0) sb.append(',');
            sb.append(escapeJson(getMetadataKey(metadata, id)));
            sb.append(':');
            toJsonImpl(value, metadata, elementPos, sb);
          }
          sb.append('}');
          return null;
        });
        break;
      case ARRAY:
        handleArray(value, pos, (size, offsetSize, offsetStart, dataStart) -> {
          sb.append('[');
          for (int i = 0; i < size; ++i) {
            int offset = readUnsigned(value, offsetStart + offsetSize * i, offsetSize);
            int elementPos = dataStart + offset;
            if (i != 0) sb.append(',');
            toJsonImpl(value, metadata, elementPos, sb);
          }
          sb.append(']');
          return null;
        });
        break;
      case NULL:
        sb.append("null");
        break;
      case BOOLEAN:
        sb.append(VariantUtil.getBoolean(value, pos));
        break;
      case LONG:
        sb.append(VariantUtil.getLong(value, pos));
        break;
      case STRING:
        sb.append(escapeJson(VariantUtil.getString(value, pos)));
        break;
      case DOUBLE:
        sb.append(VariantUtil.getDouble(value, pos));
        break;
      case DECIMAL:
        sb.append(VariantUtil.getDecimal(value, pos).toPlainString());
        break;
    }
  }
}
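A small check of the escaping behavior documented on `escapeJson` above; since the method is package-private, this sketch would live in the same `org.apache.spark.types.variant` package:

package org.apache.spark.types.variant;

public class EscapeJsonDemo {
  public static void main(String[] args) {
    // A lone newline escapes to the 4-character JSON literal "\n" (quotes included).
    System.out.println(Variant.escapeJson("\n"));   // prints "\n"
    // Embedded quotes are backslash-escaped.
    System.out.println(Variant.escapeJson("a\"b")); // prints "a\"b"
  }
}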
@@ -15,7 +15,7 @@
* limitations under the License.
*/

-package org.apache.spark.variant;
+package org.apache.spark.types.variant;

import java.io.IOException;
import java.math.BigDecimal;
@@ -32,7 +32,7 @@
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.exc.InputCoercionException;

-import static org.apache.spark.variant.VariantUtil.*;
+import static org.apache.spark.types.variant.VariantUtil.*;

/**
* Build variant value and metadata by parsing JSON values.
@@ -15,7 +15,7 @@
* limitations under the License.
*/

-package org.apache.spark.variant;
+package org.apache.spark.types.variant;

/**
* An exception indicating that we are attempting to build a variant with its value or metadata