diff --git a/LICENSES.txt b/LICENSES.txt index 81d5c8d0c..44e82361c 100644 --- a/LICENSES.txt +++ b/LICENSES.txt @@ -36,7 +36,7 @@ Apache-2.0 commons-configuration2-2.9.0.jar commons-csv-1.9.0.jar commons-daemon-1.0.13.jar - commons-io-2.13.0.jar + commons-io-2.14.0.jar commons-lang3-3.13.0.jar commons-logging-1.2.jar commons-math3-3.6.1.jar @@ -446,7 +446,7 @@ BSD-2-Clause jline-3.9.0.jar jsch-0.1.55.jar stax2-api-4.2.1.jar - zstd-jni-1.5.5-5.jar + zstd-jni-1.5.5-6.jar ------------------------------------------------------------------------------ Copyright @@ -475,11 +475,11 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. ------------------------------------------------------------------------------ BSD-3-Clause asm-9.3.jar - asm-9.5.jar - asm-analysis-9.5.jar + asm-9.6.jar + asm-analysis-9.6.jar asm-commons-5.0.3.jar - asm-tree-9.5.jar - asm-util-9.5.jar + asm-tree-9.6.jar + asm-util-9.6.jar hamcrest-2.2.jar hamcrest-core-2.2.jar hamcrest-library-2.2.jar diff --git a/NOTICE.txt b/NOTICE.txt index 2d4efe22c..e9d939a35 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -66,7 +66,7 @@ Apache-2.0 commons-configuration2-2.9.0.jar commons-csv-1.9.0.jar commons-daemon-1.0.13.jar - commons-io-2.13.0.jar + commons-io-2.14.0.jar commons-lang3-3.13.0.jar commons-logging-1.2.jar commons-math3-3.6.1.jar @@ -278,15 +278,15 @@ BSD-2-Clause jline-3.9.0.jar jsch-0.1.55.jar stax2-api-4.2.1.jar - zstd-jni-1.5.5-5.jar + zstd-jni-1.5.5-6.jar BSD-3-Clause asm-9.3.jar - asm-9.5.jar - asm-analysis-9.5.jar + asm-9.6.jar + asm-analysis-9.6.jar asm-commons-5.0.3.jar - asm-tree-9.5.jar - asm-util-9.5.jar + asm-tree-9.6.jar + asm-util-9.6.jar hamcrest-2.2.jar hamcrest-core-2.2.jar hamcrest-library-2.2.jar diff --git a/core/build.gradle b/core/build.gradle index 48e9a3d25..1dcbc0518 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -55,15 +55,11 @@ dependencies { exclude group: 'com.fasterxml.jackson.core', module: 'jackson-annotations' exclude group: 'com.fasterxml.jackson.core', module: 'jackson-databind' exclude group: 'io.netty', module: 'netty-common' + exclude group: 'io.netty', module: 'netty-buffer' } implementation group: 'org.apache.arrow', name: 'arrow-vector', version: '13.0.0', arrowExclusions implementation group: 'org.apache.arrow', name: 'arrow-memory-netty', version: '13.0.0', arrowExclusions - implementation group: 'io.netty', name: 'netty-buffer', { - version { - strictly '4.1.99.Final' - } - } // These will be dependencies not packaged with the .jar // They need to be provided either through the database or in an extra .jar diff --git a/core/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java b/core/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java deleted file mode 100644 index 7efaca04f..000000000 --- a/core/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java +++ /dev/null @@ -1,278 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.buffer; - -import static org.apache.arrow.memory.util.AssertionUtil.ASSERT_ENABLED; - -import java.lang.reflect.Field; -import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.arrow.memory.OutOfMemoryException; -import org.apache.arrow.memory.util.LargeMemoryUtil; - -import io.netty.util.internal.OutOfDirectMemoryError; -import io.netty.util.internal.StringUtil; - -/** - * The base allocator that we use for all of Arrow's memory management. Returns - * UnsafeDirectLittleEndian buffers. - */ -public class PooledByteBufAllocatorL { - - private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("arrow.allocator"); - - private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60; - public final UnsafeDirectLittleEndian empty; - private final AtomicLong hugeBufferSize = new AtomicLong(0); - private final AtomicLong hugeBufferCount = new AtomicLong(0); - private final AtomicLong normalBufferSize = new AtomicLong(0); - private final AtomicLong normalBufferCount = new AtomicLong(0); - private final InnerAllocator allocator; - - public PooledByteBufAllocatorL() { - allocator = new InnerAllocator(); - empty = new UnsafeDirectLittleEndian(new DuplicatedByteBuf(Unpooled.EMPTY_BUFFER)); - } - - /** - * Returns a {@linkplain io.netty.buffer.UnsafeDirectLittleEndian} of the given size. - */ - public UnsafeDirectLittleEndian allocate(long size) { - try { - return allocator.directBuffer(LargeMemoryUtil.checkedCastToInt(size), Integer.MAX_VALUE); - } catch (OutOfMemoryError e) { - /* - * OutOfDirectMemoryError is thrown by Netty when we exceed the direct memory limit defined by - * -XX:MaxDirectMemorySize. OutOfMemoryError with "Direct buffer memory" message is thrown by - * java.nio.Bits when we exceed the direct memory limit. This should never be hit in practice - * as Netty is expected to throw an OutOfDirectMemoryError first. - */ - if (e instanceof OutOfDirectMemoryError || "Direct buffer memory".equals(e.getMessage())) { - throw new OutOfMemoryException("Failure allocating buffer.", e); - } - throw e; - } - } - - public int getChunkSize() { - return allocator.chunkSize; - } - - public long getHugeBufferSize() { - return hugeBufferSize.get(); - } - - public long getHugeBufferCount() { - return hugeBufferCount.get(); - } - - public long getNormalBufferSize() { - return normalBufferSize.get(); - } - - public long getNormalBufferCount() { - return normalBufferSize.get(); - } - - private static class AccountedUnsafeDirectLittleEndian extends UnsafeDirectLittleEndian { - - private final long initialCapacity; - private final AtomicLong count; - private final AtomicLong size; - - private AccountedUnsafeDirectLittleEndian(LargeBuffer buf, AtomicLong count, AtomicLong size) { - super(buf); - this.initialCapacity = buf.capacity(); - this.count = count; - this.size = size; - } - - private AccountedUnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong count, - AtomicLong size) { - super(buf); - this.initialCapacity = buf.capacity(); - this.count = count; - this.size = size; - } - - @Override - public ByteBuf copy() { - throw new UnsupportedOperationException("copy method is not supported"); - } - - @Override - public ByteBuf copy(int index, int length) { - throw new UnsupportedOperationException("copy method is not supported"); - } - - @Override - public boolean release(int decrement) { - boolean released = super.release(decrement); - if (released) { - count.decrementAndGet(); - size.addAndGet(-initialCapacity); - } - return released; - } - - } - - private class InnerAllocator extends PooledByteBufAllocator { - - private final PoolArena[] directArenas; - private final MemoryStatusThread statusThread; - private final int chunkSize; - - public InnerAllocator() { - super(true); - - try { - Field f = PooledByteBufAllocator.class.getDeclaredField("directArenas"); - f.setAccessible(true); - this.directArenas = (PoolArena[]) f.get(this); - } catch (Exception e) { - throw new RuntimeException("Failure while initializing allocator. Unable to retrieve direct arenas field.", e); - } - - this.chunkSize = directArenas[0].chunkSize; - - if (memoryLogger.isTraceEnabled()) { - statusThread = new MemoryStatusThread(); - statusThread.start(); - } else { - statusThread = null; - } - } - - private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCapacity) { - PoolThreadCache cache = threadCache(); - PoolArena directArena = cache.directArena; - - if (directArena != null) { - - if (initialCapacity > directArena.chunkSize) { - // This is beyond chunk size so we'll allocate separately. - ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity, maxCapacity); - - hugeBufferSize.addAndGet(buf.capacity()); - hugeBufferCount.incrementAndGet(); - - // logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception()); - return new AccountedUnsafeDirectLittleEndian(new LargeBuffer(buf), hugeBufferCount, - hugeBufferSize); - } else { - // within chunk, use arena. - ByteBuf buf = directArena.allocate(cache, initialCapacity, maxCapacity); - if (!(buf instanceof PooledUnsafeDirectByteBuf)) { - fail(); - } - - if (!ASSERT_ENABLED) { - return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf); - } - - normalBufferSize.addAndGet(buf.capacity()); - normalBufferCount.incrementAndGet(); - - return new AccountedUnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf, - normalBufferCount, normalBufferSize); - } - - } else { - throw fail(); - } - } - - private UnsupportedOperationException fail() { - return new UnsupportedOperationException( - "Arrow requires that the JVM used supports access sun.misc.Unsafe. This platform " + - "didn't provide that functionality."); - } - - @Override - public UnsafeDirectLittleEndian directBuffer(int initialCapacity, int maxCapacity) { - if (initialCapacity == 0 && maxCapacity == 0) { - newDirectBuffer(initialCapacity, maxCapacity); - } - validate(initialCapacity, maxCapacity); - return newDirectBufferL(initialCapacity, maxCapacity); - } - - @Override - public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) { - throw new UnsupportedOperationException("Arrow doesn't support using heap buffers."); - } - - - private void validate(int initialCapacity, int maxCapacity) { - if (initialCapacity < 0) { - throw new IllegalArgumentException("initialCapacity: " + initialCapacity + " (expected: 0+)"); - } - if (initialCapacity > maxCapacity) { - throw new IllegalArgumentException(String.format( - "initialCapacity: %d (expected: not greater than maxCapacity(%d)", - initialCapacity, maxCapacity)); - } - } - - @Override - public String toString() { - StringBuilder buf = new StringBuilder(); - buf.append(directArenas.length); - buf.append(" direct arena(s):"); - buf.append(StringUtil.NEWLINE); - for (PoolArena a : directArenas) { - buf.append(a); - } - - buf.append("Large buffers outstanding: "); - buf.append(hugeBufferCount.get()); - buf.append(" totaling "); - buf.append(hugeBufferSize.get()); - buf.append(" bytes."); - buf.append('\n'); - buf.append("Normal buffers outstanding: "); - buf.append(normalBufferCount.get()); - buf.append(" totaling "); - buf.append(normalBufferSize.get()); - buf.append(" bytes."); - return buf.toString(); - } - - private class MemoryStatusThread extends Thread { - - public MemoryStatusThread() { - super("allocation.logger"); - this.setDaemon(true); - } - - @Override - public void run() { - while (true) { - memoryLogger.trace("Memory Usage: \n{}", PooledByteBufAllocatorL.this); - try { - Thread.sleep(MEMORY_LOGGER_FREQUENCY_SECONDS * 1000); - } catch (InterruptedException e) { - return; - } - } - } - } - - - } -} diff --git a/licenses-source-header.gradle b/licenses-source-header.gradle index 95781a4c6..cc81663b5 100644 --- a/licenses-source-header.gradle +++ b/licenses-source-header.gradle @@ -11,7 +11,6 @@ allprojects { proj -> java = 'SLASHSTAR_STYLE' } // exclude 'test/resources/**' - exclude '**/PooledByteBufAllocatorL.java' include '**/*.java' } tasks.check.dependsOn tasks.license