Skip to content
This repository has been archived by the owner on Sep 7, 2022. It is now read-only.

Applied JDK11 compatibility fix #1

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
/target
/out
/.settings
/.classpath
/.project
/.idea
65 changes: 4 additions & 61 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,67 +1,11 @@
# Big Queue


A big, fast and persistent queue based on memory mapped file.

***Notice***, bigqueue is just a standalone library, for a high-throughput, persistent, distributed, publish-subscrible messaging system, please refer to [Luxun](https://github.com/bulldog2011/luxun), Luxun messaging system uses bigqueue internally as fast and persistent queue.

##Feature Highlight:
1. **Fast**: close to the speed of direct memory access, both enqueue and dequeue are close to O(1) memory access.
2. **Big**: the total size of the queue is only limited by the available disk space.
3. **Persistent**: all data in the queue is persisted on disk, and is crash resistant.
4. **Reliable**: OS will be responsible to presist the produced messages even your process crashes.
5. **Realtime**: messages produced by producer threads will be immediately visible to consumer threads.
6. **Memory-efficient**: automatic paging & swapping algorithm, only most-recently accessed data is kept in memory.
7. **Thread-safe**: multiple threads can concurrently enqueue and dequeue without data corruption.
8. **Simple&Light-weight**: current number of source files is 12 and the library jar is less than 30K.


## The Big Picture

###Memory Mapped Sliding Window

![design](http://bulldog2011.github.com/images/luxun/sliding_window.png)


##Performance Highlight:
* In concurrent producing and consuming case, the average throughput is around ***166M bytes per second***.
* In sequential producing then consuming case, the average throughput is around ***333M bytes per second***.
For more information see https://github.com/bulldog2011/bigqueue

Suppose the average message size is 1KB, then big queue can concurrently producing and consuming
166K message per second. Basically, the throughput is only limited by disk IO bandwidth.

[here is a detailed performance report](https://github.com/bulldog2011/bigqueue/wiki/Performance-Test-Report)

##How to Use
1. Direct jar or source reference
Download jar from repository mentioned in version history section below, latest stable release is [0.7.0](https://github.com/bulldog2011/bulldog-repo/tree/master/repo/releases/com/leansoft/bigqueue/0.7.0).
***Note*** : bigqueue depends on log4j, please also added log4j jar reference if you use bigqueue.

2. Maven reference

<dependency>
<groupId>com.leansoft</groupId>
<artifactId>bigqueue</artifactId>
<version>0.7.0</version>
</dependency>

<repository>
<id>github.release.repo</id>
<url>https://raw.github.com/bulldog2011/bulldog-repo/master/repo/releases/</url>
</repository>


##Docs

1. [a simple design doc](http://bulldog2011.github.com/blog/2013/01/23/big-queue-design/)
2. [big queue tutorial](http://bulldog2011.github.com/blog/2013/01/24/big-queue-tutorial/)
3. [fanout queue tutorial](http://bulldog2011.github.com/blog/2013/03/25/fanout-queue-tutorial/)
4. [big array tutorial](http://bulldog2011.github.com/blog/2013/01/24/big-array-tutorial/)
5. [how to turn big queue into a thrift based queue service](http://bulldog2011.github.com/blog/2013/01/27/thrift-queue/)
6. [use case : producing and consuming 4TB log daily on one commodity machine](http://bulldog2011.github.com/blog/2013/01/28/log-collecting/)
7. [use case : sort and search 100GB data on a single commodity machine](http://bulldog2011.github.com/blog/2013/01/25/merge-sort-using-big-queue/)
8. [the architecture and design of a pub-sub messaging system tailored for big data collecting and analytics](http://bulldog2011.github.com/blog/2013/03/27/the-architecture-and-design-of-a-pub-sub-messaging-system/)
9. [a big, fast and persistent queue[ppt]](http://www.slideshare.net/yang75108/a-big-fast-and-persistent-queue)
# About this project/fork
The project is a fork of the original repo found under https://github.com/bulldog2011/bigqueue. The intent behind the fork was to make the library work with JDK version 11 for use in homebrew projects.

## Version History

Expand All @@ -74,12 +18,11 @@ Download jar from repository mentioned in version history section below, latest
* Initial version:)


##Copyright and License
## Copyright and License
Copyright 2012 Leansoft Technology <[email protected]>

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this work except in compliance with the License. You may obtain a copy of the License in the LICENSE file, or at:

[http://www.apache.org/licenses/LICENSE-2.0](http://www.apache.org/licenses/LICENSE-2.0)

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

18 changes: 12 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

<groupId>com.leansoft</groupId>
<artifactId>bigqueue</artifactId>
<version>0.7.1-SNAPSHOT</version>
<version>0.7.2-RC1-JDK11</version>
<packaging>jar</packaging>

<name>bigqueue</name>
Expand Down Expand Up @@ -48,8 +48,9 @@
</distributionManagement>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.release>11</maven.compiler.release>
</properties>

<dependencies>
Expand Down Expand Up @@ -81,8 +82,9 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
<encoding>UTF-8</encoding>
<source>11</source>
<target>11</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
Expand All @@ -94,6 +96,7 @@
<resources />
</configuration>
</plugin>
<!-- removed for now
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
Expand All @@ -107,6 +110,7 @@
</execution>
</executions>
</plugin>
-->
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>2.3.2</version>
Expand All @@ -116,6 +120,7 @@
</archive>
</configuration>
</plugin>
<!-- removed for now
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
Expand All @@ -137,6 +142,7 @@
</execution>
</executions>
</plugin>
-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
Expand Down Expand Up @@ -185,5 +191,5 @@
</plugins>
</pluginManagement>
</build>

</project>
143 changes: 73 additions & 70 deletions src/main/java/com/leansoft/bigqueue/page/MappedPageImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,51 +2,56 @@

import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Method;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;

import org.apache.log4j.Logger;

import sun.misc.Unsafe;

public class MappedPageImpl implements IMappedPage, Closeable {

private final static Logger logger = Logger.getLogger(MappedPageImpl.class);

private ThreadLocalByteBuffer threadLocalBuffer;
private volatile boolean dirty = false;
private volatile boolean closed = false;
private String pageFile;
private long index;


// patch for OpenJDK11
private static Unsafe unsafe;

public MappedPageImpl(MappedByteBuffer mbb, String pageFile, long index) {
this.threadLocalBuffer = new ThreadLocalByteBuffer(mbb);
this.pageFile = pageFile;
this.index = index;
}

public void close() throws IOException {
synchronized(this) {
if (closed) return;

flush();

MappedByteBuffer srcBuf = (MappedByteBuffer)threadLocalBuffer.getSourceBuffer();
unmap(srcBuf);

this.threadLocalBuffer = null; // hint GC

closed = true;
if (logger.isDebugEnabled()) {
logger.debug("Mapped page for " + this.pageFile + " was just unmapped and closed.");
}
}
}

@Override
public void setDirty(boolean dirty) {
this.dirty = dirty;
}

@Override
public void flush() {
synchronized(this) {
Expand All @@ -68,81 +73,79 @@ public byte[] getLocal(int position, int length) {
buf.get(data);
return data;
}

@Override
public ByteBuffer getLocal(int position) {
ByteBuffer buf = this.threadLocalBuffer.get();
buf.position(position);
return buf;
}

private static void unmap(MappedByteBuffer buffer)
{
Cleaner.clean(buffer);
}

/**
* Helper class allowing to clean direct buffers.
*/
private static class Cleaner {
public static final boolean CLEAN_SUPPORTED;
private static final Method directBufferCleaner;
private static final Method directBufferCleanerClean;

static {
Method directBufferCleanerX = null;
Method directBufferCleanerCleanX = null;
boolean v;
try {
directBufferCleanerX = Class.forName("java.nio.DirectByteBuffer").getMethod("cleaner");
directBufferCleanerX.setAccessible(true);
directBufferCleanerCleanX = Class.forName("sun.misc.Cleaner").getMethod("clean");
directBufferCleanerCleanX.setAccessible(true);
v = true;
} catch (Exception e) {
v = false;
}
CLEAN_SUPPORTED = v;
directBufferCleaner = directBufferCleanerX;
directBufferCleanerClean = directBufferCleanerCleanX;
}

public static void clean(ByteBuffer buffer) {
if (buffer == null) return;
if (CLEAN_SUPPORTED && buffer.isDirect()) {
try {
Object cleaner = directBufferCleaner.invoke(buffer);
directBufferCleanerClean.invoke(cleaner);
} catch (Exception e) {
// silently ignore exception
}
}
}
}

private static class ThreadLocalByteBuffer extends ThreadLocal<ByteBuffer> {
private ByteBuffer _src;

public ThreadLocalByteBuffer(ByteBuffer src) {
_src = src;
}

public ByteBuffer getSourceBuffer() {
return _src;
}

@Override
protected synchronized ByteBuffer initialValue() {
ByteBuffer dup = _src.duplicate();
return dup;
}
}

/**
* Helper class allowing to clean direct buffers.
*/
private static class Cleaner {
//public static final boolean CLEAN_SUPPORTED;
//private static final Method directBufferCleaner;
//private static final Method directBufferCleanerClean;

static {
try {
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
unsafe = (Unsafe) f.get(null);
}
catch (Exception e) {
logger.warn("Unsafe is not supported on this platform: " + e.getMessage(), e);
}
}

public static void clean(ByteBuffer buffer) {
if (buffer == null) return;
if (buffer.isDirect()) {
try {
if(unsafe != null) {
unsafe.invokeCleaner(buffer);
}
else {
logger.warn("Unable to clean the byte buffer!");
}
}
catch (Exception e) {
// silently ignore exception
}
}
}
}

private static class ThreadLocalByteBuffer extends ThreadLocal<ByteBuffer> {
private ByteBuffer _src;

public ThreadLocalByteBuffer(ByteBuffer src) {
_src = src;
}

public ByteBuffer getSourceBuffer() {
return _src;
}

@Override
protected synchronized ByteBuffer initialValue() {
ByteBuffer dup = _src.duplicate();
return dup;
}
}

@Override
public boolean isClosed() {
return closed;
}

public String toString() {
return "Mapped page for " + this.pageFile + ", index = " + this.index + ".";
}
Expand Down