From 4f96eae49d884f142c116c4e98cbcfa444b5a553 Mon Sep 17 00:00:00 2001 From: Maciej Podkomorzy Date: Tue, 18 Jan 2022 00:53:59 +0000 Subject: [PATCH 1/7] Applied JDK11 compatibility fix based on https://github.com/bulldog2011/bigqueue/issues/39 --- .../bigqueue/page/MappedPageImpl.java | 149 +++++++++--------- 1 file changed, 77 insertions(+), 72 deletions(-) diff --git a/src/main/java/com/leansoft/bigqueue/page/MappedPageImpl.java b/src/main/java/com/leansoft/bigqueue/page/MappedPageImpl.java index 8286ca3..6276df7 100644 --- a/src/main/java/com/leansoft/bigqueue/page/MappedPageImpl.java +++ b/src/main/java/com/leansoft/bigqueue/page/MappedPageImpl.java @@ -2,51 +2,58 @@ import java.io.Closeable; import java.io.IOException; -import java.lang.reflect.Method; +//import java.lang.reflect.Method; +import java.lang.reflect.Field; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; -import org.apache.log4j.Logger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import sun.misc.Unsafe; public class MappedPageImpl implements IMappedPage, Closeable { - - private final static Logger logger = Logger.getLogger(MappedPageImpl.class); - + + private final static Logger logger = LoggerFactory.getLogger(MappedPageImpl.class); + private ThreadLocalByteBuffer threadLocalBuffer; private volatile boolean dirty = false; private volatile boolean closed = false; private String pageFile; private long index; - + + // patch for OpenJDK11 + private static Unsafe unsafe; + public MappedPageImpl(MappedByteBuffer mbb, String pageFile, long index) { this.threadLocalBuffer = new ThreadLocalByteBuffer(mbb); this.pageFile = pageFile; this.index = index; } - + public void close() throws IOException { synchronized(this) { if (closed) return; flush(); - + MappedByteBuffer srcBuf = (MappedByteBuffer)threadLocalBuffer.getSourceBuffer(); unmap(srcBuf); - + this.threadLocalBuffer = null; // hint GC - + closed = true; if (logger.isDebugEnabled()) { logger.debug("Mapped page for " + this.pageFile + " was just unmapped and closed."); } } } - + @Override public void setDirty(boolean dirty) { this.dirty = dirty; } - + @Override public void flush() { synchronized(this) { @@ -68,81 +75,79 @@ public byte[] getLocal(int position, int length) { buf.get(data); return data; } - + @Override public ByteBuffer getLocal(int position) { ByteBuffer buf = this.threadLocalBuffer.get(); buf.position(position); return buf; } - + private static void unmap(MappedByteBuffer buffer) { Cleaner.clean(buffer); } - - /** - * Helper class allowing to clean direct buffers. - */ - private static class Cleaner { - public static final boolean CLEAN_SUPPORTED; - private static final Method directBufferCleaner; - private static final Method directBufferCleanerClean; - - static { - Method directBufferCleanerX = null; - Method directBufferCleanerCleanX = null; - boolean v; - try { - directBufferCleanerX = Class.forName("java.nio.DirectByteBuffer").getMethod("cleaner"); - directBufferCleanerX.setAccessible(true); - directBufferCleanerCleanX = Class.forName("sun.misc.Cleaner").getMethod("clean"); - directBufferCleanerCleanX.setAccessible(true); - v = true; - } catch (Exception e) { - v = false; - } - CLEAN_SUPPORTED = v; - directBufferCleaner = directBufferCleanerX; - directBufferCleanerClean = directBufferCleanerCleanX; - } - - public static void clean(ByteBuffer buffer) { - if (buffer == null) return; - if (CLEAN_SUPPORTED && buffer.isDirect()) { - try { - Object cleaner = directBufferCleaner.invoke(buffer); - directBufferCleanerClean.invoke(cleaner); - } catch (Exception e) { - // silently ignore exception - } - } - } - } - - private static class ThreadLocalByteBuffer extends ThreadLocal { - private ByteBuffer _src; - - public ThreadLocalByteBuffer(ByteBuffer src) { - _src = src; - } - - public ByteBuffer getSourceBuffer() { - return _src; - } - - @Override - protected synchronized ByteBuffer initialValue() { - ByteBuffer dup = _src.duplicate(); - return dup; - } - } + + /** + * Helper class allowing to clean direct buffers. + */ + private static class Cleaner { + //public static final boolean CLEAN_SUPPORTED; + //private static final Method directBufferCleaner; + //private static final Method directBufferCleanerClean; + + static { + try { + Field f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + unsafe = (Unsafe) f.get(null); + } + catch (Exception e) { + logger.warn("Unsafe is not supported on this platform", e.getMessage(), e); + } + } + + public static void clean(ByteBuffer buffer) { + if (buffer == null) return; + if (buffer.isDirect()) { + try { + if(unsafe != null) { + unsafe.invokeCleaner(buffer); + } + else { + logger.warn("Unable to clean the byte buffer!"); + } + } + catch (Exception e) { + // silently ignore exception + } + } + } + } + + private static class ThreadLocalByteBuffer extends ThreadLocal { + private ByteBuffer _src; + + public ThreadLocalByteBuffer(ByteBuffer src) { + _src = src; + } + + public ByteBuffer getSourceBuffer() { + return _src; + } + + @Override + protected synchronized ByteBuffer initialValue() { + ByteBuffer dup = _src.duplicate(); + return dup; + } + } @Override public boolean isClosed() { return closed; } - + public String toString() { return "Mapped page for " + this.pageFile + ", index = " + this.index + "."; } From 35a2ab985344f8c69c7c3b05b4e6fa83744a1fce Mon Sep 17 00:00:00 2001 From: Maciej Podkomorzy Date: Tue, 18 Jan 2022 01:00:56 +0000 Subject: [PATCH 2/7] Update ignore list --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index c708c36..33c89b4 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ /.settings /.classpath /.project +/.idea From fc735b7f92b83aa6d0b91eca5b966ce6aed1ab07 Mon Sep 17 00:00:00 2001 From: Maciej Podkomorzy Date: Tue, 18 Jan 2022 01:22:38 +0000 Subject: [PATCH 3/7] Updated git ignore list again --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 33c89b4..f3b49c2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ /target +/out /.settings /.classpath /.project From 7c6bc2a65f7287a2094defcbf3046bfedce578e5 Mon Sep 17 00:00:00 2001 From: Maciej Podkomorzy Date: Tue, 18 Jan 2022 01:31:05 +0000 Subject: [PATCH 4/7] Removed additional import of SLF4J --- .../java/com/leansoft/bigqueue/page/MappedPageImpl.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/leansoft/bigqueue/page/MappedPageImpl.java b/src/main/java/com/leansoft/bigqueue/page/MappedPageImpl.java index 6276df7..ad45799 100644 --- a/src/main/java/com/leansoft/bigqueue/page/MappedPageImpl.java +++ b/src/main/java/com/leansoft/bigqueue/page/MappedPageImpl.java @@ -2,19 +2,17 @@ import java.io.Closeable; import java.io.IOException; -//import java.lang.reflect.Method; import java.lang.reflect.Field; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.log4j.Logger; import sun.misc.Unsafe; public class MappedPageImpl implements IMappedPage, Closeable { - private final static Logger logger = LoggerFactory.getLogger(MappedPageImpl.class); + private final static Logger logger = Logger.getLogger(MappedPageImpl.class); private ThreadLocalByteBuffer threadLocalBuffer; private volatile boolean dirty = false; @@ -103,7 +101,7 @@ private static class Cleaner { unsafe = (Unsafe) f.get(null); } catch (Exception e) { - logger.warn("Unsafe is not supported on this platform", e.getMessage(), e); + logger.warn("Unsafe is not supported on this platform: " + e.getMessage(), e); } } From aacd04e82016144b75081b2a37b76005164c13ea Mon Sep 17 00:00:00 2001 From: Maciej Podkomorzy Date: Tue, 18 Jan 2022 01:33:17 +0000 Subject: [PATCH 5/7] Bumped compiler Java version to 11 --- pom.xml | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/pom.xml b/pom.xml index 673b204..0ffe3ed 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ com.leansoft bigqueue - 0.7.1-SNAPSHOT + 0.7.2-RC1-JDK11 jar bigqueue @@ -48,8 +48,9 @@ - UTF-8 - UTF-8 + UTF-8 + UTF-8 + 11 @@ -81,8 +82,9 @@ org.apache.maven.plugins maven-compiler-plugin - 1.6 - 1.6 + UTF-8 + 11 + 11 UTF-8 @@ -94,6 +96,7 @@ + maven-jar-plugin 2.3.2 @@ -116,6 +120,7 @@ + org.apache.maven.plugins maven-assembly-plugin @@ -185,5 +191,5 @@ - + From 5d0d271d6f91ec36bcf6d9679ea87e5c78042d49 Mon Sep 17 00:00:00 2001 From: D4rkM4tterz <55996716+d4rkm4tterz@users.noreply.github.com> Date: Tue, 18 Jan 2022 01:34:50 +0000 Subject: [PATCH 6/7] Update README.md --- README.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 1ccf180..52ed772 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ A big, fast and persistent queue based on memory mapped file. ***Notice***, bigqueue is just a standalone library, for a high-throughput, persistent, distributed, publish-subscrible messaging system, please refer to [Luxun](https://github.com/bulldog2011/luxun), Luxun messaging system uses bigqueue internally as fast and persistent queue. -##Feature Highlight: +## Feature Highlight: 1. **Fast**: close to the speed of direct memory access, both enqueue and dequeue are close to O(1) memory access. 2. **Big**: the total size of the queue is only limited by the available disk space. 3. **Persistent**: all data in the queue is persisted on disk, and is crash resistant. @@ -18,12 +18,12 @@ A big, fast and persistent queue based on memory mapped file. ## The Big Picture -###Memory Mapped Sliding Window +### Memory Mapped Sliding Window ![design](http://bulldog2011.github.com/images/luxun/sliding_window.png) -##Performance Highlight: +## Performance Highlight: * In concurrent producing and consuming case, the average throughput is around ***166M bytes per second***. * In sequential producing then consuming case, the average throughput is around ***333M bytes per second***. @@ -32,7 +32,7 @@ Suppose the average message size is 1KB, then big queue can concurrently produci [here is a detailed performance report](https://github.com/bulldog2011/bigqueue/wiki/Performance-Test-Report) -##How to Use +## How to Use 1. Direct jar or source reference Download jar from repository mentioned in version history section below, latest stable release is [0.7.0](https://github.com/bulldog2011/bulldog-repo/tree/master/repo/releases/com/leansoft/bigqueue/0.7.0). ***Note*** : bigqueue depends on log4j, please also added log4j jar reference if you use bigqueue. @@ -51,7 +51,7 @@ Download jar from repository mentioned in version history section below, latest -##Docs +## Docs 1. [a simple design doc](http://bulldog2011.github.com/blog/2013/01/23/big-queue-design/) 2. [big queue tutorial](http://bulldog2011.github.com/blog/2013/01/24/big-queue-tutorial/) From 08b564e3495669ca3e08921bf1f37ee49c18f9c1 Mon Sep 17 00:00:00 2001 From: D4rkM4tterz <55996716+d4rkm4tterz@users.noreply.github.com> Date: Tue, 18 Jan 2022 01:35:30 +0000 Subject: [PATCH 7/7] Update README.md --- README.md | 65 ++++--------------------------------------------------- 1 file changed, 4 insertions(+), 61 deletions(-) diff --git a/README.md b/README.md index 52ed772..4df074c 100644 --- a/README.md +++ b/README.md @@ -1,67 +1,11 @@ # Big Queue - A big, fast and persistent queue based on memory mapped file. -***Notice***, bigqueue is just a standalone library, for a high-throughput, persistent, distributed, publish-subscrible messaging system, please refer to [Luxun](https://github.com/bulldog2011/luxun), Luxun messaging system uses bigqueue internally as fast and persistent queue. - -## Feature Highlight: -1. **Fast**: close to the speed of direct memory access, both enqueue and dequeue are close to O(1) memory access. -2. **Big**: the total size of the queue is only limited by the available disk space. -3. **Persistent**: all data in the queue is persisted on disk, and is crash resistant. -4. **Reliable**: OS will be responsible to presist the produced messages even your process crashes. -5. **Realtime**: messages produced by producer threads will be immediately visible to consumer threads. -6. **Memory-efficient**: automatic paging & swapping algorithm, only most-recently accessed data is kept in memory. -7. **Thread-safe**: multiple threads can concurrently enqueue and dequeue without data corruption. -8. **Simple&Light-weight**: current number of source files is 12 and the library jar is less than 30K. - - -## The Big Picture - -### Memory Mapped Sliding Window - -![design](http://bulldog2011.github.com/images/luxun/sliding_window.png) - - -## Performance Highlight: -* In concurrent producing and consuming case, the average throughput is around ***166M bytes per second***. -* In sequential producing then consuming case, the average throughput is around ***333M bytes per second***. +For more information see https://github.com/bulldog2011/bigqueue -Suppose the average message size is 1KB, then big queue can concurrently producing and consuming -166K message per second. Basically, the throughput is only limited by disk IO bandwidth. - -[here is a detailed performance report](https://github.com/bulldog2011/bigqueue/wiki/Performance-Test-Report) - -## How to Use -1. Direct jar or source reference -Download jar from repository mentioned in version history section below, latest stable release is [0.7.0](https://github.com/bulldog2011/bulldog-repo/tree/master/repo/releases/com/leansoft/bigqueue/0.7.0). -***Note*** : bigqueue depends on log4j, please also added log4j jar reference if you use bigqueue. - -2. Maven reference - - - com.leansoft - bigqueue - 0.7.0 - - - - github.release.repo - https://raw.github.com/bulldog2011/bulldog-repo/master/repo/releases/ - - - -## Docs - -1. [a simple design doc](http://bulldog2011.github.com/blog/2013/01/23/big-queue-design/) -2. [big queue tutorial](http://bulldog2011.github.com/blog/2013/01/24/big-queue-tutorial/) -3. [fanout queue tutorial](http://bulldog2011.github.com/blog/2013/03/25/fanout-queue-tutorial/) -4. [big array tutorial](http://bulldog2011.github.com/blog/2013/01/24/big-array-tutorial/) -5. [how to turn big queue into a thrift based queue service](http://bulldog2011.github.com/blog/2013/01/27/thrift-queue/) -6. [use case : producing and consuming 4TB log daily on one commodity machine](http://bulldog2011.github.com/blog/2013/01/28/log-collecting/) -7. [use case : sort and search 100GB data on a single commodity machine](http://bulldog2011.github.com/blog/2013/01/25/merge-sort-using-big-queue/) -8. [the architecture and design of a pub-sub messaging system tailored for big data collecting and analytics](http://bulldog2011.github.com/blog/2013/03/27/the-architecture-and-design-of-a-pub-sub-messaging-system/) -9. [a big, fast and persistent queue[ppt]](http://www.slideshare.net/yang75108/a-big-fast-and-persistent-queue) +# About this project/fork +The project is a fork of the original repo found under https://github.com/bulldog2011/bigqueue. The intent behind the fork was to make the library work with JDK version 11 for use in homebrew projects. ## Version History @@ -74,7 +18,7 @@ Download jar from repository mentioned in version history section below, latest * Initial version:) -##Copyright and License +## Copyright and License Copyright 2012 Leansoft Technology <51startup@sina.com> Licensed under the Apache License, Version 2.0 (the "License"); you may not use this work except in compliance with the License. You may obtain a copy of the License in the LICENSE file, or at: @@ -82,4 +26,3 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use [http://www.apache.org/licenses/LICENSE-2.0](http://www.apache.org/licenses/LICENSE-2.0) Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -