
Commit

[NSE-1189] prepare for 1.5.0 release (#1194)
* fix jni unload (#1180)

fix the seg fault on spark-shell exit

Signed-off-by: Yuan Zhou <[email protected]>

* [NSE-1191] fix AQE exchange reuse in Spark3.2 (#1192)

Fix AQE exchange reuse in Spark 3.2

Note that with this patch, the jars for Spark 3.2.1 and Spark 3.2.2 are built separately (example commands below):
- to build the jar for Spark 3.2.2, use -Pspark-3.2.2
- to build the jar for Spark 3.2.1, use -Pspark-3.2.1
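
For example, a minimal sketch of the two builds (any additional flags, such as -DskipTests, depend on your environment and the project's build docs):

  mvn clean package -Pspark-3.2.2   # produces the Spark 3.2.2 jar
  mvn clean package -Pspark-3.2.1   # produces the Spark 3.2.1 jar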

Signed-off-by: Yuan Zhou <[email protected]>

Signed-off-by: Yuan Zhou <[email protected]>
Signed-off-by: Yuan Zhou <[email protected]>
zhouyuan authored Dec 16, 2022
1 parent a24906f commit 6ce6a55
Showing 12 changed files with 1,903 additions and 3 deletions.
30 changes: 30 additions & 0 deletions gazelle-dist/pom.xml
@@ -55,6 +55,36 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>spark-3.2.1</id>
<dependencies>
<dependency>
<groupId>com.intel.oap</groupId>
<artifactId>spark-sql-columnar-shims-spark32</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.intel.oap</groupId>
<artifactId>spark-sql-columnar-shims-spark321</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>spark-3.2.2</id>
<dependencies>
<dependency>
<groupId>com.intel.oap</groupId>
<artifactId>spark-sql-columnar-shims-spark32</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.intel.oap</groupId>
<artifactId>spark-sql-columnar-shims-spark322</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
</profiles>

<dependencies>
34 changes: 34 additions & 0 deletions native-sql-engine/core/pom.xml
@@ -78,6 +78,40 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>spark-3.2.1</id>
<dependencies>
<dependency>
<groupId>com.intel.oap</groupId>
<artifactId>spark-sql-columnar-shims-spark32</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.intel.oap</groupId>
<artifactId>spark-sql-columnar-shims-spark321</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</profile>
<profile>
<id>spark-3.2.2</id>
<dependencies>
<dependency>
<groupId>com.intel.oap</groupId>
<artifactId>spark-sql-columnar-shims-spark32</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.intel.oap</groupId>
<artifactId>spark-sql-columnar-shims-spark322</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</profile>
</profiles>

<dependencies>

com/intel/oap/vectorized/JniUtils.java
@@ -17,11 +17,19 @@

package com.intel.oap.vectorized;

import org.apache.spark.util.GazelleShutdownManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function0;
import scala.runtime.BoxedUnit;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.JarURLConnection;
import java.net.URL;
import java.net.URLConnection;
@@ -30,9 +30,15 @@
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Vector;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
@@ -53,6 +67,23 @@ public class JniUtils {
private static List<String> codegenJarsLoadedCache = new ArrayList<>();
private static volatile JniUtils INSTANCE;
private static String tmp_dir;
private static final Logger LOG =
LoggerFactory.getLogger(JniUtils.class);

public static Set<String> LOADED_LIBRARY_PATHS = new HashSet<>();
public static Set<String> REQUIRE_UNLOAD_LIBRARY_PATHS = new LinkedHashSet<>();

static {
GazelleShutdownManager.registerUnloadLibShutdownHook(new Function0<BoxedUnit>() {
@Override
public BoxedUnit apply() {
List<String> loaded = new ArrayList<>(REQUIRE_UNLOAD_LIBRARY_PATHS);
Collections.reverse(loaded); // use reversed order to unload
loaded.forEach(JniUtils::unloadFromPath);
return BoxedUnit.UNIT;
}
});
}

public static JniUtils getInstance() throws IOException {
String tmp_dir = System.getProperty("java.io.tmpdir");
@@ -118,6 +149,19 @@ public void setJars(List<String> list_jars) throws IOException, IllegalAccessExc
}
}

/**
 * Loads the native library at libPath at most once per JVM; when requireUnload is
 * true, the path is also recorded so the shutdown hook registered above can unload
 * it (in reverse load order) on JVM exit.
 */
private static synchronized void loadFromPath0(String libPath, boolean requireUnload) {
if (LOADED_LIBRARY_PATHS.contains(libPath)) {
LOG.debug("Library in path {} has already been loaded, skipping", libPath);
} else {
System.load(libPath);
LOADED_LIBRARY_PATHS.add(libPath);
LOG.info("Library {} has been loaded using path-loading method", libPath);
}
if (requireUnload) {
REQUIRE_UNLOAD_LIBRARY_PATHS.add(libPath);
}
}

static void loadLibraryFromJar(String tmp_dir) throws IOException, IllegalAccessException {
synchronized (JniUtils.class) {
if (tmp_dir == null) {
@@ -127,15 +171,18 @@ static void loadLibraryFromJar(String tmp_dir) throws IOException, IllegalAccess
Path arrowMiddleLink = createSoftLink(arrowlibraryFile, ARROW_PARENT_LIBRARY_NAME);
Path arrowShortLink = createSoftLink(new File(arrowMiddleLink.toString()), ARROW_PARENT_LIBRARY_SHORT);
System.load(arrowShortLink.toAbsolutePath().toString());
loadFromPath0(arrowShortLink.toAbsolutePath().toString(), true);

final File gandivalibraryFile = moveFileFromJarToTemp(tmp_dir, GANDIVA_LIBRARY_NAME);
Path gandivaMiddleLink = createSoftLink(gandivalibraryFile, GANDIVA_PARENT_LIBRARY_NAME);
Path gandivaShortLink = createSoftLink(new File(gandivaMiddleLink.toString()), GANDIVA_PARENT_LIBRARY_SHORT);
System.load(gandivaShortLink.toAbsolutePath().toString());
loadFromPath0(gandivaShortLink.toAbsolutePath().toString(), true);

final String libraryToLoad = System.mapLibraryName(LIBRARY_NAME);
final File libraryFile = moveFileFromJarToTemp(tmp_dir, libraryToLoad);
System.load(libraryFile.getAbsolutePath());
loadFromPath0(libraryFile.getAbsolutePath(), true);
}
}

@@ -282,4 +329,41 @@ public static void copyResourcesToDirectory(URLConnection urlConnection,
}
}
}

/**
 * Forgets libPath and forces the JVM to unload the corresponding native library.
 * Invoked by the shutdown hook, in reverse load order.
 */
public static synchronized void unloadFromPath(String libPath) {
if (!LOADED_LIBRARY_PATHS.remove(libPath)) {
throw new IllegalStateException("Library does not exist: " + libPath);
}

REQUIRE_UNLOAD_LIBRARY_PATHS.remove(libPath);

try {
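// Resolve any chain of symbolic links first, so libPath names the actual file the JVM recorded when loading.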
while (Files.isSymbolicLink(Paths.get(libPath))) {
libPath = Files.readSymbolicLink(Paths.get(libPath)).toString();
}

// Reflect into the JDK-internal ClassLoader.nativeLibraries vector and invoke each
// matching entry's finalize(), which makes the JVM unload the library (JDK 8
// behavior; newer JDKs restrict this kind of reflective access).
ClassLoader classLoader = JniUtils.class.getClassLoader();
Field field = ClassLoader.class.getDeclaredField("nativeLibraries");
field.setAccessible(true);
Vector<Object> libs = (Vector<Object>) field.get(classLoader);
Iterator it = libs.iterator();
while (it.hasNext()) {
Object object = it.next();
Field[] fs = object.getClass().getDeclaredFields();
for (int k = 0; k < fs.length; k++) {
if (fs[k].getName().equals("name")) {
fs[k].setAccessible(true);
String verbosePath = fs[k].get(object).toString();
if (verbosePath.endsWith(libPath)) {
Method finalize = object.getClass().getDeclaredMethod("finalize");
finalize.setAccessible(true);
finalize.invoke(object);
}
}
}
}
} catch (Throwable th) {
LOG.error("Unload native library error: ", th);
}
}
}

org/apache/spark/util/GazelleShutdownManager.scala (new file)
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.util

object GazelleShutdownManager {

def registerUnloadLibShutdownHook(hook: () => Unit): AnyRef = {
ShutdownHookManager.addShutdownHook(hook)
}
}
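
As a usage illustration (a hypothetical caller, not part of this commit), any JVM-side component can register cleanup work through this object; Spark's ShutdownHookManager then runs the hook on JVM exit:

// Hypothetical usage sketch; CleanupExample is not part of this commit.
import org.apache.spark.util.GazelleShutdownManager

object CleanupExample {
  def install(): Unit = {
    GazelleShutdownManager.registerUnloadLibShutdownHook { () =>
      println("running native-library cleanup before JVM exit")
    }
  }
}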

21 changes: 21 additions & 0 deletions pom.xml
@@ -58,6 +58,26 @@
<maven.test.skip>true</maven.test.skip>
</properties>
</profile>
<profile>
<id>spark-3.2.1</id>
<properties>
<spark.version>${spark321.version}</spark.version>
<scala.version>2.12.15</scala.version>
<!--Jackson may be directly used in future UT. Align with the version in spark 3.2.-->
<jackson.version>2.12.0</jackson.version>
<maven.test.skip>true</maven.test.skip>
</properties>
</profile>
<profile>
<id>spark-3.2.2</id>
<properties>
<spark.version>${spark322.version}</spark.version>
<scala.version>2.12.15</scala.version>
<!--Jackson may be directly used in future UT. Align with the version in spark 3.2.-->
<jackson.version>2.12.0</jackson.version>
<maven.test.skip>true</maven.test.skip>
</properties>
</profile>
<profile>
<id>hadoop-2.7.4</id>
<properties>
@@ -128,6 +148,7 @@
<spark.version>3.1.1</spark.version>
<spark311.version>3.1.1</spark311.version>
<spark321.version>3.2.1</spark321.version>
<spark322.version>3.2.2</spark322.version>
<!-- Scala 2.12.10 is the version for default spark 3.1 -->
<scala.version>2.12.10</scala.version>
<java.version>1.8</java.version>
20 changes: 20 additions & 0 deletions shims/pom.xml
@@ -80,6 +80,26 @@
<module>spark321</module>
</modules>
</profile>
<profile>
<id>spark-3.2.1</id>
<properties>
</properties>
<modules>
<module>common</module>
<module>spark32</module>
<module>spark321</module>
</modules>
</profile>
<profile>
<id>spark-3.2.2</id>
<properties>
</properties>
<modules>
<module>common</module>
<module>spark32</module>
<module>spark322</module>
</modules>
</profile>
</profiles>

<build>

SparkShimProvider.scala (spark321 shim)
@@ -20,9 +20,7 @@ import com.intel.oap.sql.shims.{SparkShims, SparkShimDescriptor}

object SparkShimProvider {
val DESCRIPTOR = SparkShimDescriptor(3, 2, 1)
- val DESCRIPTOR_SPARK321 = SparkShimDescriptor(3, 2, 1)
- val DESCRIPTOR_SPARK322 = SparkShimDescriptor(3, 2, 2)
- val DESCRIPTOR_STRINGS = Seq(s"$DESCRIPTOR_SPARK321", s"$DESCRIPTOR_SPARK322")
+ val DESCRIPTOR_STRINGS = Seq(s"$DESCRIPTOR")
}

class SparkShimProvider extends com.intel.oap.sql.shims.SparkShimProvider {
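
With this change the spark321 shim advertises only Spark 3.2.1; the new spark322 module (its files are not shown above) presumably carries the matching provider. A hedged sketch of what that counterpart would look like:

// Assumption: sketch of the spark322 counterpart; the actual file is not visible in this diff.
import com.intel.oap.sql.shims.{SparkShims, SparkShimDescriptor}

object SparkShimProvider {
  val DESCRIPTOR = SparkShimDescriptor(3, 2, 2)
  val DESCRIPTOR_STRINGS = Seq(s"$DESCRIPTOR")
}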
(diff for the remaining changed files is not shown)
