-
Notifications
You must be signed in to change notification settings - Fork 0
Ideas for a reactive store framework (not really boon per se)
Base persistence in key/value
import java.io.Closeable;
import java.io.Flushable;
import java.util.Map;
/**
 * Base persistence contract for a key/value store.
 *
 * <p>The interface extends {@link Closeable} and {@link Flushable} but
 * redeclares {@code close()} and {@code flush()} without a {@code throws}
 * clause, so callers do not have to handle {@code IOException}.
 *
 * @param <K> key type
 * @param <V> value type
 */
public interface KeyValueStore<K, V> extends Closeable, Flushable {

    /**
     * Stores a single value under a key.
     *
     * @param key   key to store under
     * @param value value to store
     */
    void put(K key, V value);

    /**
     * Stores every key/value pair of the given map.
     *
     * @param values pairs to store
     */
    void putAll(Map<K, V> values);

    /**
     * Removes every given key.
     *
     * @param keys keys to remove
     */
    void removeAll(Iterable<K> keys);

    /**
     * Looks up the value stored under a key.
     *
     * @param key key to look up
     * @return the value for {@code key}; what is returned for a missing key
     *         is up to the implementation
     */
    V get(K key);

    /** Closes the store; declared without checked exceptions. */
    @Override
    void close();

    /** Flushes the store; declared without checked exceptions. */
    @Override
    void flush();
}
Base LevelDB implementation. It could also be implemented on top of MySQL, Berkeley DB, etc.
import org.boon.Exceptions;
import org.boon.Logger;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.Options;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.WriteBatch;
import org.iq80.leveldb.impl.Iq80DBFactory;
import static org.boon.Boon.configurableLogger;
/**
 * {@link KeyValueStore} over raw byte arrays backed by a LevelDB database.
 *
 * <p>On open, the JNI factory is tried first; if that fails with an
 * {@code IOException} the pure-Java factory is used instead.
 *
 * Created by Richard on 3/30/14.
 */
public class LevelDBKeyValueStore implements KeyValueStore<byte[], byte[]>{

    /** Path of the database, kept so {@link #flush()} can reopen it. */
    private final String fileName;

    /** {@code true} when the database was opened through the JNI factory in the constructor. */
    private final boolean usingJNI;

    /** Options the database was opened with; reused when reopening. */
    private final Options options;

    Logger logger = configurableLogger(LevelDBKeyValueStore.class);

    DB database;

    /**
     * Opens the database with default options and no database-level logging.
     *
     * @param fileName path of the database
     */
    public LevelDBKeyValueStore( String fileName ) {
        this (fileName, null, false);
    }

    /**
     * Opens the database.
     *
     * @param fileName path of the database
     * @param options  LevelDB options, or {@code null} to use {@link #defaultOptions()}
     * @param log      when {@code true}, database log messages are forwarded to this
     *                 class's logger (this registers a logger on {@code options})
     */
    public LevelDBKeyValueStore(String fileName, Options options, boolean log) {
        this.fileName = fileName;
        File file = new File(fileName);

        if (options==null) {
            logger.info("Using default options");
            options =defaultOptions();
        }
        this.options = options;

        if (log) {
            options.logger(new org.iq80.leveldb.Logger() {
                @Override
                public void log(String message) {
                    logger.info("FROM DATABASE LOG", message);
                }
            });
        }

        usingJNI = openDB(file, options);
    }

    /**
     * Builds the options used when the caller supplies none:
     * create-if-missing, 32K block size, 64MB cache.
     */
    private Options defaultOptions() {
        Options options = new Options();
        options.createIfMissing(true);
        options.blockSize(32_768); //32K
        options.cacheSize(67_108_864);//64MB
        return options;
    }

    /**
     * Opens the database, preferring the JNI factory and falling back to the Java one.
     *
     * @param file    database location
     * @param options options to open with
     * @return {@code true} if the JNI factory was used, {@code false} for the Java factory
     */
    private boolean openDB(File file, Options options) {
        try {
            database = JniDBFactory.factory.open(file, options);
            logger.info("Using JNI Level DB");
            return true;
        } catch (IOException ex1) {
            // Do not lose the reason the JNI open failed: record it before falling back.
            logger.info("Unable to open JNI Level DB, falling back to Java Level DB", ex1.getMessage());
            try {
                database = Iq80DBFactory.factory.open(file, options);
                logger.info("Using Java Level DB");
                return false;
            } catch (IOException ex2) {
                return Exceptions.handle(Boolean.class, ex2);
            }
        }
    }

    /** Writes a single key/value pair directly to the database. */
    @Override
    public void put(byte[] key, byte[] value) {
        database.put(key, value);
    }

    /** Writes all pairs of {@code values} through one write batch. */
    @Override
    public void putAll(Map<byte[], byte[]> values) {
        WriteBatch batch = database.createWriteBatch();
        try {
            for (Map.Entry<byte[], byte[]> entry : values.entrySet()) {
                batch.put(entry.getKey(), entry.getValue());
            }
            database.write(batch);
        } finally {
            // The batch is always closed, whether or not the write succeeded.
            try {
                batch.close();
            } catch (IOException e) {
                Exceptions.handle(e);
            }
        }
    }

    /** Deletes all given keys through one write batch. */
    @Override
    public void removeAll(Iterable<byte[]> keys) {
        WriteBatch batch = database.createWriteBatch();
        try {
            for (byte[] key : keys) {
                batch.delete(key);
            }
            database.write(batch);
        } finally {
            // The batch is always closed, whether or not the write succeeded.
            try {
                batch.close();
            } catch (IOException e) {
                Exceptions.handle(e);
            }
        }
    }

    /** Returns whatever the underlying database returns for {@code key}. */
    @Override
    public byte[] get(byte[] key) {
        return database.get(key);
    }

    /** Closes the database, routing any {@code IOException} through {@code Exceptions.handle}. */
    @Override
    public void close() {
        try {
            database.close();
        } catch (IOException e) {
            Exceptions.handle(e);
        }
    }

    /**
     * Flushes by closing the database and reopening it with the original
     * file name and options.
     */
    @Override
    public void flush() {
        this.close();
        openDB(new File(fileName), this.options);
    }
}
One option is just to use JSON as the store.
Define String interface.
/**
 * A {@link KeyValueStore} whose keys and values are both {@code String}s.
 * Declares no methods of its own.
 *
 * Created by Richard on 3/30/14.
 */
public interface StringStringKeyValueStore extends KeyValueStore<String, String> {
}
import org.boon.cache.SimpleCache;
import org.boon.primitive.Byt;
import org.iq80.leveldb.Options;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * String-to-String store layered over a byte-array {@link KeyValueStore}
 * (a {@code LevelDBKeyValueStore}). Every key and value is encoded and
 * decoded as UTF-8, so data written by any method of this class can be read
 * back by any other.
 *
 * Created by Richard on 3/30/14.
 */
public class SimpleStringLevelDBStore implements StringStringKeyValueStore {

    /** Underlying byte-array store. */
    KeyValueStore<byte[], byte[]> store;

    /** Cache of already-encoded lookup keys, used by {@link #get(String)}. */
    SimpleCache<String, byte[]> keyCache = new SimpleCache<>(1_000);

    /**
     * @param fileName path of the database
     * @param options  LevelDB options passed through to the underlying store
     */
    public SimpleStringLevelDBStore(String fileName, Options options) {
        this(fileName, options, false);
    }

    /**
     * @param fileName path of the database
     * @param options  LevelDB options passed through to the underlying store
     * @param log      passed through to the underlying store's logging switch
     */
    public SimpleStringLevelDBStore(String fileName, Options options, boolean log) {
        store = new LevelDBKeyValueStore(fileName, options, log);
    }

    /**
     * @param fileName path of the database
     */
    public SimpleStringLevelDBStore(String fileName) {
        store = new LevelDBKeyValueStore(fileName);
    }

    /** Single place where strings become bytes, so all methods agree on the encoding. */
    private static byte[] utf8(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Stores {@code value} under {@code key}, both UTF-8 encoded. */
    @Override
    public void put(String key, String value) {
        store.put(utf8(key), utf8(value));
    }

    /** Encodes every pair as UTF-8 and hands them to the underlying store in one call. */
    @Override
    public void putAll(Map<String, String> values) {
        // byte[] keys hash by identity; the map is only used as a list of pairs here.
        Map<byte[], byte[]> map = new HashMap<>();
        for (Map.Entry<String, String> entry : values.entrySet()) {
            map.put(utf8(entry.getKey()), utf8(entry.getValue()));
        }
        store.putAll(map);
    }

    /** Encodes every key as UTF-8 and removes them from the underlying store in one call. */
    @Override
    public void removeAll(Iterable<String> keys) {
        List<byte[]> keyBytes = new ArrayList<>();
        for (String key : keys) {
            keyBytes.add(utf8(key));
        }
        store.removeAll(keyBytes);
    }

    /**
     * @return the stored value decoded as UTF-8, or {@code null} when the
     *         underlying store returns {@code null}
     */
    @Override
    public String get(String key) {
        byte[] bytes = store.get( keyToBytes(key) );
        if (bytes==null) {
            return null;
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /** Encodes a lookup key, reusing the cached encoding when there is one. */
    private byte[] keyToBytes(String key) {
        byte[] value = keyCache.get(key);
        if (value == null) {
            value = utf8(key);
            keyCache.put(key, value);
        }
        return value;
    }

    @Override
    public void close() {
        store.close();
    }

    @Override
    public void flush() {
        store.flush();
    }
}
Then have a JSON-specific API (really more of a marker interface). I don't want to preclude Jackson and GSON.
/**
 * Marker for key/value stores that persist their values as JSON.
 * Declares no methods of its own.
 *
 * Created by Richard on 3/30/14.
 *
 * @param <K> key type
 * @param <V> value type
 */
public interface JSONKeyValueStore<K, V> extends KeyValueStore<K, V> {
}
BoonJSON store.
import org.boon.json.JsonParserAndMapper;
import org.boon.json.JsonParserFactory;
import org.boon.json.serializers.impl.JsonSimpleSerializerImpl;
import java.util.HashMap;
import java.util.Map;
/**
 * JSON key/value store that uses Boon to turn values into JSON text and
 * keeps that text in a {@link StringStringKeyValueStore}.
 * Keys are always {@code String}s; the type parameter {@code K} is not
 * used by this class.
 *
 * Created by Richard on 3/30/14.
 *
 * @param <K> unused
 * @param <V> value type that is serialized to and parsed from JSON
 */
public class BoonJsonKeyValueStore<K, V> implements JSONKeyValueStore<String, V>{

    /** Target class handed to the parser when reading values back. */
    private final Class<V> type;

    JsonSimpleSerializerImpl serializer = new JsonSimpleSerializerImpl();

    JsonParserAndMapper deserializer = new JsonParserFactory().create();

    /** String store that holds the JSON text. */
    StringStringKeyValueStore store;

    /**
     * @param fileName path handed to the backing {@code SimpleStringLevelDBStore}
     * @param cls      class that stored JSON is parsed into
     */
    public BoonJsonKeyValueStore(String fileName, Class<V> cls) {
        type = cls;
        store = new SimpleStringLevelDBStore(fileName);
    }

    /** Serializes one value to its JSON text. */
    private String toJson(V value) {
        return serializer.serialize(value).toString();
    }

    /** Serializes {@code value} and stores the JSON under {@code key}. */
    @Override
    public void put(String key, V value) {
        final String json = toJson(value);
        store.put(key, json);
    }

    /** Serializes every value and stores all resulting pairs in one call. */
    @Override
    public void putAll(Map<String, V> values) {
        final Map<String, String> jsonByKey = new HashMap<>();
        for (Map.Entry<String, V> pair : values.entrySet()) {
            jsonByKey.put(pair.getKey(), toJson(pair.getValue()));
        }
        store.putAll(jsonByKey);
    }

    /** Removes the given keys from the backing store. */
    @Override
    public void removeAll(Iterable<String> keys) {
        store.removeAll(keys);
    }

    /**
     * @return the value parsed from the stored JSON, or {@code null} when
     *         the backing store has nothing for {@code key}
     */
    @Override
    public V get(String key) {
        final String json = store.get(key);
        if (json != null) {
            return deserializer.parse(type, json);
        }
        return null;
    }

    @Override
    public void close() {
        store.close();
    }

    @Override
    public void flush() {
        store.flush();
    }
}
It just stores directly to LevelDB, but the backing store could be MySQL (char, text db).
Now for the reactive part... add ideas from the batcher project.
import org.boon.primitive.ByteBuf;
import java.util.List;
/**
 * All threading and queueing logic go here.
 *
 * <p>Design notes kept from the prototype:
 * <ul>
 *   <li>two channels, a back channel and a forward channel (see the
 *       prototype called DataTracker);</li>
 *   <li>write to disk, see blog post/email and use the best combo to save
 *       from too much GC; for the Friday prototype use a RAM disk;</li>
 *   <li>contains a BatchFileWriter, which gets called by the post receiver;</li>
 *   <li>uses a scheduler to manage threads and manages the queue.</li>
 * </ul>
 */
public interface CollectorManager {

    /**
     * Used by the HTTP post receiver to hand over a batch for writing.
     * (Note from the prototype: change byte array to ByteBuf in boon.)
     *
     * @param batch buffer to be written
     */
    void sendPostToBeWritten(ByteBuf batch);

    /**
     * Used by the HTTP post receiver to obtain a buffer.
     * Prototype note: {@code list = backChannel.poll ();} can return null.
     *
     * @param size requested buffer size
     * @return a buffer
     */
    ByteBuf allocateBuffer(int size);

    /**
     * Used by the HTTP post receiver.
     * Prototype note: {@code channel.hasWaitingConsumer}.
     *
     * @return whether the writer is waiting
     */
    boolean isWriterWaiting();

    /**
     * Starts the manager.
     *
     * @param postReceiver receiver to be driven by this manager's clock
     */
    void start(TimeAware postReceiver);

    /** Stops the manager. */
    void stop();
}
tick, tick, tick
/**
 * Implemented by components that want to be told about the passage of time.
 */
public interface TimeAware {

    /**
     * Called on every clock tick.
     *
     * @param time time value supplied by the caller
     */
    void tick(long time);
}
File writer. Of course, instead of files, we will write to databases.
import boon.batcher.BatchFileWriter;
import boon.batcher.ExceptionHolder;
import boon.batcher.TimeAware;
import org.boon.Exceptions;
import org.boon.IO;
import org.boon.Lists;
import org.boon.primitive.ByteBuf;
import java.io.*;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static org.boon.Boon.puts;
import static org.boon.Boon.sputs;
import static org.boon.Exceptions.die;
import static org.boon.Exceptions.handle;
import static org.boon.primitive.Lng.str;
/**
 * Writes batches of bytes to a rolling sequence of output files. A new file
 * is started once the current one reaches {@link #FILE_SIZE_BYTES} or has
 * been open longer than {@link #FILE_TIMEOUT_MINUTES}.
 * Optimized for high-speed batching by following principles of
 * mechanical sympathy.
 */
public class BatchFileWriterImpl implements BatchFileWriter, TimeAware {

    /** Bytes written to the current file. */
    private long bytesTransferred = 0;

    /** Bytes of earlier files; the current file's count is added when the next file is opened. */
    private long totalBytesTransferred = 0;

    /** Number of attempts to open an output file (incremented even when the open fails). */
    private long numFiles = 0;

    /** Stream of the current file, or {@code null} when no file is open. */
    private OutputStream outputStream;

    public final static String FORMAT_PATTERN =
            System.getProperty ( "........FILE_NAME_FORMAT_PATTERN",
                    "%s/user_data_collection_%s_%s_%s.json" );

    public final static String OUTPUT_DIR =
            System.getProperty ( "........OUTPUT_DIR",
                    "/var/..../........." );

    public final static String SERVER_NAME =
            System.getProperty ( "........SERVER_NAME",
                    "server1" );

    private Path outputDir = IO.path ( OUTPUT_DIR );

    private AtomicReference<String> fileName = new AtomicReference<> ( String.format ( FORMAT_PATTERN,
            outputDir, numFiles, System.currentTimeMillis (), SERVER_NAME )
    );

    private volatile AtomicBoolean error = new AtomicBoolean ( false );

    /**
     * Default file size 20,000,000, if beyond 20M create a new file *
     */
    public static int FILE_SIZE_BYTES = Integer.parseInt ( System.getProperty ( "...........FILE_SIZE_BYTES", "20000000" ) );

    /**
     * Default file batch time 10 minutes, if beyond ten, then create a new file. *
     */
    public static int FILE_TIMEOUT_MINUTES = Integer.parseInt ( System.getProperty ( "......FILE_TIMEOUT_MINUTES", "10" ) );

    /** Last time value received through {@link #tick(long)}. */
    private AtomicLong time = new AtomicLong ();

    /** Set by {@link #tick(long)} once the current file has been open too long. */
    private AtomicBoolean fileTimeOut = new AtomicBoolean ();

    /** Time value recorded when the current file was opened. */
    private AtomicLong fileStartTime = new AtomicLong ();

    /** {@code true} when data has been written since the last successful flush. */
    private boolean dirty;

    /**
     * Records the current time and flags the file as timed out when it has
     * been open longer than {@link #FILE_TIMEOUT_MINUTES}.
     *
     * @param time current time in milliseconds
     */
    @Override
    public void tick ( long time ) {
        this.time.set ( time );
        long startTime = fileStartTime.get ();

        // Multiply as long: the int product overflows for large timeout settings.
        if ( time - startTime > ( FILE_TIMEOUT_MINUTES * 60L * 1_000L ) ) {
            fileTimeOut.set ( true );
        }
    }

    @Override
    public long numFiles () {
        return numFiles;
    }

    @Override
    public long totalBytesTransferred () {
        return totalBytesTransferred;
    }

    /**
     * flush to disk.
     *
     * @return {@code true} only when there was unflushed data and the flush
     *         succeeded; on a failed flush the stream is closed and dropped
     */
    @Override
    public boolean syncToDisk () {
        /* if we have a stream and we are dirty then flush. */
        if ( outputStream != null && dirty ) {
            try {
                outputStream.flush ();
                dirty = false;
                return true;
            } catch ( Exception ex ) {
                cleanupOutputStream ();
                return false;
            }
        } else {
            return false;
        }
    }

    /** Closes the current stream if there is one and always leaves it {@code null}. */
    private void cleanupOutputStream() {
        if ( outputStream!=null ) {
            try {
                outputStream.close ();
            } catch ( IOException e ) {
                e.printStackTrace (System.err);
            } finally {
                outputStream = null;
            }
        }
    }

    @Override
    public long bytesTransferred () {
        return bytesTransferred;
    }

    /**
     * Writes the content of {@code bufferOut} to the current file and closes
     * the file when it is full or has timed out, so the next write opens a
     * new one.
     *
     * @param bufferOut buffer whose bytes are written; it is read with
     *                  {@code readForRecycle}
     */
    @Override
    public void nextBufferToWrite ( final ByteBuf bufferOut ) {
        dirty = true;
        final int size = bufferOut.len ();
        final byte[] bytes = bufferOut.readForRecycle ();

        write ( bytes, size );

        //only increment after successful write.
        if ( !error.get () ) {
            bytesTransferred += size;
        }

        if ( this.bytesTransferred >= FILE_SIZE_BYTES || fileTimeOut.get () ) {
            // The stream can be null here (the open failed and only the error
            // flag was set), so use the null-safe close instead of calling
            // outputStream.close() directly.
            cleanupOutputStream ();
        }
    }

    /**
     * Makes sure a stream is open and writes {@code size} bytes to it.
     * Sets the error flag when no stream could be opened or the write fails.
     */
    private void write ( final byte[] bytes, int size ) {
        initOutputStream ();

        try {
            if (outputStream!=null) {
                outputStream.write ( bytes, 0, size );
            } else {
                error.set ( true );
            }
        } catch ( Exception e ) {
            cleanupOutputStream();
            error.set ( true );
            e.printStackTrace (System.err);
            diagnose ();
            Exceptions.handle ( e );
        }
    }

    /**
     * Prints diagnostics about the current file, the output directory and
     * its file store, and sets the error flag when the output directory is
     * missing or not writable.
     */
    public void diagnose () {
        Objects.requireNonNull ( fileName.get (), "the filename should not be null, " +
                "you have misconfigured this service, fatal error" );

        final Path path =
                IO.path ( fileName.get () );

        puts ("in diagnose");
        puts ( "Filename :", path.toAbsolutePath () );
        puts ( "File exists? :", Files.exists ( path ) );
        puts ( "File writeable? :", Files.isWritable ( path ) );

        Path outputDir = IO.path ( OUTPUT_DIR );

        puts ( "Output dir :", outputDir.toAbsolutePath () );
        puts ( "Output dir exists? :", Files.exists ( outputDir ) );
        puts ( "Output dir writeable? :", Files.isWritable ( outputDir ) );

        if (!Files.isWritable ( outputDir ) || !Files.exists ( outputDir )) {
            error.set ( true );
        }

        try {
            FileStore fileStore = Files.getFileStore ( path.getParent () );
            puts ( "Total space :", str ( fileStore.getTotalSpace () ) );
            puts ( "Use-able space :", str ( fileStore.getUsableSpace () ) );
            puts ( "Free Space :", str ( fileStore.getUnallocatedSpace () ) );
            puts ( "type :", fileStore.type () );
            puts ( "name :", fileStore.name () );
            puts ( "read-only :", fileStore.isReadOnly () );
        } catch ( IOException e ) {
            e.printStackTrace ();
        }
    }

    public String outputDir () {
        return outputDir.toString ();
    }

    /**
     * Opens a new output file unless one is already open. After an error, or
     * while no bytes have been rolled into the total yet, any open stream is
     * closed first and a fresh clock reading is used for the file name.
     */
    private void initOutputStream () {
        long time = this.time.get ();

        if ( error.get () || this.totalBytesTransferred == 0) {
            cleanupOutputStream ();
            error.set ( false );
            time = System.nanoTime () / 1_000_000;
        }

        if ( outputStream != null ) {
            return;
        }

        fileName.set ( String.format ( FORMAT_PATTERN,
                this.outputDirPath ().toString (), numFiles, time , SERVER_NAME ) );

        try {
            fileTimeOut.set ( false );
            outputStream = streamCreator ();
            fileStartTime.set ( time );
            // Roll the finished file's byte count into the total.
            totalBytesTransferred += bytesTransferred;
            bytesTransferred = 0;
        } catch ( Exception ex ) {
            cleanupOutputStream ();
            error.set ( true );
            Exceptions.handle ( ex );
        } finally {
            numFiles++;
        }
    }

    /** Opens the stream for the current file name; protected so it can be overridden. */
    protected OutputStream streamCreator () throws Exception {
        return Files.newOutputStream ( IO.path ( fileName.get () ) ) ;
    }

    @Override
    public String fileName () {
        return fileName.get ();
    }

    protected Path outputDirPath () {
        return IO.path(this.outputDir);
    }

    @Override
    public void setError() {
        this.error.set ( true );
    }
}
Class that does the monitoring....
import org.boon.Exceptions;
import org.boon.IO;
import org.boon.primitive.ByteBuf;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import static org.boon.Boon.putl;
import static org.boon.Boon.puts;
import static org.boon.Boon.sputs;
/**
 * Manages incoming channel (queue) and byte buffer recycle channel.
 *
 * <p>{@link #start(TimeAware)} schedules the writer loop, the health
 * monitor and a 20 ms clock; {@link #stop()} cancels all three.
 */
public class CollectionManagerImpl implements CollectorManager, TimeAware {

    /**
     * Byte Buffers that are done are put back on this queue.
     */
    private final TransferQueue<ByteBuf> recycleChannel = new LinkedTransferQueue<> ();

    /**
     * Byte buffers that have been received from HTTP post, but not written to disk.
     */
    private final TransferQueue<ByteBuf> inputChannel = new LinkedTransferQueue<> ();

    /**
     * BatchFileWriter is used to write out batches of data at once.
     */
    private final BatchFileWriter writer = new BatchFileWriterImpl ();

    /**
     * Main thread scheduler.
     */
    private ScheduledExecutorService scheduledExecutorService;

    /**
     * Scheduler created by {@link #startMonitor()}; kept so that
     * {@link #stop()} can shut its threads down.
     */
    private ScheduledExecutorService monitorExecutor;

    /**
     * How many times have we done a flush.
     */
    private AtomicLong numberOfFlushesTotal = new AtomicLong ();

    /**
     * current time, which we get every 20 mili-seconds.
     */
    private AtomicLong time = new AtomicLong ();

    /**
     * Request main thread to stop.
     */
    private AtomicBoolean stop = new AtomicBoolean ();

    /**
     * Holds the writerFuture for shutdown.
     */
    private ScheduledFuture<?> writerFuture;

    /**
     * The last time we forced a sync to disk.
     */
    private long lastFlushTime = 0;

    /**
     * Set when the last output-directory check failed; while set, the
     * directory is re-checked on every monitor run.
     */
    private AtomicBoolean recoverMode = new AtomicBoolean ();

    /**
     * Force flush if queue is empty after this many mili-seconds.
     */
    private final static long FORCE_FLUSH_AFTER_THIS_MANY_MILI_SECONDS
            = Long.parseLong ( System.getProperty (
            ".......FORCE_FLUSH_AFTER_THIS_MANY_MILI_SECONDS", "40" ) );

    /**
     * Periodic force flush. We can turn off periodic flushing and allow the OS
     * to decide best time to sync to disk for speed.
     * (Not much difference in speed on OSX).
     */
    private final static boolean PERIODIC_FORCE_FLUSH
            = Boolean.parseBoolean ( System.getProperty (
            ".....PERIODIC_FORCE_FLUSH", "true" ) );

    /**
     * Determines if we should see if the writer is busy before batching up a lot of
     * data. Turning this off helps with throughput at the expense of data safety.
     */
    private final static boolean TRANSFER_QUEUE_WRITER_WAITING_CHECK
            = Boolean.parseBoolean ( System.getProperty (
            "........WRITER_WAITING_CHECK", "true" ) );

    /**
     * Health monitor Future.
     */
    private ScheduledFuture<?> monitorFuture;

    /**
     * How often should we report status?
     */
    protected static int MONITOR_INTERVAL_SECONDS
            = Integer.parseInt ( System.getProperty (
            ".....MONITOR_INTERVAL_SECONDS", "20" ) );

    /**
     * 20 ms timer to reduce expensive calls to System.nanoTime.
     */
    private ScheduledFuture<?> tickTock;

    //Create with only six threads max.
    public CollectionManagerImpl() {
        scheduledExecutorService = Executors.newScheduledThreadPool ( 6 );
    }

    /**
     * @param service scheduler used for the writer loop and the clock
     */
    public CollectionManagerImpl( ScheduledExecutorService service ) {
        scheduledExecutorService = service;
    }

    /**
     * This gets called by the http post handler.
     *
     * @param batch buffer to hand to the writer
     */
    @Override
    public final void sendPostToBeWritten( ByteBuf batch ) {
        //Can skip the queue altogether
        if ( !inputChannel.tryTransfer ( batch ) ) {
            inputChannel.offer ( batch );
        }
    }

    /**
     * This gets called from the http post handler.
     *
     * @param size the size of the buffer that you would like.
     * @return a recycled buffer when one is available, otherwise a newly
     *         created buffer of the requested size
     */
    @Override
    public final ByteBuf allocateBuffer( int size ) {
        ByteBuf spentBuffer = recycleChannel.poll ();
        if ( spentBuffer == null ) {
            spentBuffer = ByteBuf.create ( size );
        }
        return spentBuffer;
    }

    /**
     * This checks to see if the output queue is waiting.
     * We don't want the output queue to wait, but we also
     * don't want it to thread sync too much either.
     */
    @Override
    public final boolean isWriterWaiting() {
        // This call causes us to lose about 5% write throughput
        // it has the advantage of reducing loss of buffered input data
        // in the very rare occurrence of an outage.
        return TRANSFER_QUEUE_WRITER_WAITING_CHECK &&
                inputChannel.hasWaitingConsumer ();
    }

    /**
     * This is the main processing loop for the batch writer processing.
     * It only ends when the thread is interrupted after {@link #stop()}
     * has been requested.
     */
    private void processWrites() {
        while ( true ) {
            try {
                manageInputWriterChannel ();
            } catch ( InterruptedException e ) {
                if ( determineIfWeShouldExit () ) {
                    break;
                }
            }
        }
    }

    /**
     * See if it is time to stop
     * We have been interrupted. Should we ignore it or break out of the loop.
     *
     * @return {@code true} when a stop was requested; otherwise the
     *         interrupt flag is cleared and {@code false} is returned
     */
    private boolean determineIfWeShouldExit() {
        boolean shouldStop = stop.get ();
        if ( !shouldStop ) {
            Thread.interrupted ();
        } else {
            System.out.println ( "Exiting processing loop as requested" );
            return true;
        }
        return false;
    }

    /**
     * Queue and batch writer main logic.
     * This is where the magic happens.
     *
     * @throws InterruptedException when the thread is interrupted while
     *                              waiting for data; the caller decides
     *                              whether to leave the loop
     */
    private void manageInputWriterChannel() throws InterruptedException {
        try {
            ByteBuf dataToWriteToFile;
            dataToWriteToFile = inputChannel.poll (); //no wait

            //If it is null, it means the inputChannel is empty and we need to flush.
            if ( dataToWriteToFile == null ) {
                queueEmptyMaybeFlush ();
                dataToWriteToFile = inputChannel.poll ();
            }

            //If it is still null, this means that we need to wait
            //for more items to show up in the inputChannel.
            if ( dataToWriteToFile == null ) {
                dataToWriteToFile = waitForNextDataToWrite ();
            }

            //We have to check for null again because the timed poll can time out.
            if ( dataToWriteToFile != null ) {
                //Write it
                writer.nextBufferToWrite ( dataToWriteToFile );
                //Then give it back
                recycleChannel.offer ( dataToWriteToFile );
            }
        } catch ( InterruptedException interrupted ) {
            // Must reach processWrites(): swallowing it here would make the
            // loop impossible to stop.
            throw interrupted;
        } catch ( Exception ex ) {
            // Any other failure is reported and the loop keeps going.
            ex.printStackTrace ( System.err );
        }
    }

    /**
     * If we detect that the in-coming transfer queue channel is empty
     * then it could be an excellent time to sync to disk.
     */
    private void queueEmptyMaybeFlush() {
        if ( PERIODIC_FORCE_FLUSH ) {
            long currentTime = time.get ();
            /* Try not to flush more than once every x times per mili-seconds time period. */
            if ( ( currentTime - lastFlushTime ) > FORCE_FLUSH_AFTER_THIS_MANY_MILI_SECONDS ) {
                /* If the writer had things to flush, and we flushed then
                   increment the number of flushes.
                 */
                if ( writer.syncToDisk () ) { //could take 100 ms to 1 second
                    this.numberOfFlushesTotal.incrementAndGet ();
                }
                /* We update the flush time no matter what. */
                lastFlushTime = time.get ();
            }
        }
    }

    /**
     * If we don't have any data, and we have flushed,
     * then we can wait on the queue. There is no sense spin-locking.
     * The poll(time, timeunit) call will block until there is something to do
     * or until the timeout.
     *
     * @return the next byte buffer, or {@code null} on timeout.
     * @throws InterruptedException when interrupted while waiting
     */
    private ByteBuf waitForNextDataToWrite() throws InterruptedException {
        ByteBuf dataToWriteToFile;
        dataToWriteToFile =
                inputChannel.poll ( FORCE_FLUSH_AFTER_THIS_MANY_MILI_SECONDS,
                        TimeUnit.MILLISECONDS );
        return dataToWriteToFile;
    }

    /**
     * Start up the health monitor: a dedicated two-thread scheduler that
     * runs {@link #monitor()} periodically, plus a JVM shutdown hook that
     * runs it one last time.
     */
    private void startMonitor() {
        monitorExecutor = Executors.newScheduledThreadPool ( 2,
                new ThreadFactory () {
                    @Override
                    public Thread newThread( Runnable runnable ) {
                        Thread thread = new Thread ( runnable );
                        thread.setPriority ( Thread.NORM_PRIORITY + 1 );
                        return thread;
                    }
                } );

        monitorFuture = monitorExecutor.scheduleAtFixedRate ( new Runnable () {
            @Override
            public void run() {
                monitor ();
            }
        }, MONITOR_INTERVAL_SECONDS, MONITOR_INTERVAL_SECONDS, TimeUnit.SECONDS );

        Runtime.getRuntime ().addShutdownHook ( new Thread ( new Runnable () {
            @Override
            public void run() {
                System.err.println ( "shutting down...." );
                monitor ();
            }
        } ) );
    }

    /** Number of monitor runs so far; every fifth run re-checks the output directory. */
    int monitorCount = 0;

    /** Prints a status line and checks that the output directory is usable. */
    private void monitor() {
        if ( recoverMode.get () ) {
            check ( "outputDir", this.writer.outputDir () );
        }

        //monitor runtime state... TBD
        //Health check of system.
        putl ( "Monitor:",
                sputs ( " total bytes transferred: ", String.format ( "%,d", this.writer.totalBytesTransferred () ) ),
                sputs ( " numberOfFlushesTotal: ", this.numberOfFlushesTotal.get () ),
                sputs ( " input inputChannel size: ", this.inputChannel.size () ),
                sputs ( " recycle inputChannel size: ", this.recycleChannel.size () ),
                sputs ( " bytes transferred for file: ", String.format ( "%,d", this.writer.bytesTransferred () ) ),
                sputs ( " current file ", this.writer.fileName ())
        );

        monitorCount ++;
        if ( monitorCount % 5 == 0 ) {
            String outputDir = this.writer.outputDir ();
            check("output directory", outputDir);
        }
    }

    /**
     * Verifies that {@code fileName} exists and is writable. On failure the
     * writer is put into error state, diagnostics are printed and recover
     * mode is switched on; on success recover mode is switched off.
     *
     * @param description human-readable name of what is being checked
     * @param fileName    path to check
     */
    private void check( String description, String fileName ) {
        try {
            Path path = IO.path ( fileName );
            if ( !Files.isWritable ( path ) || !Files.exists ( path )) {
                writer.setError ( );
                Exception ex = new IOException ( );
                ex.fillInStackTrace ();
                ex.printStackTrace ( System.err );
                puts ( "Unable to write to ", fileName, "which is the", description );
                this.writer.diagnose ();
                System.out.flush ();
                System.err.flush ();
                recoverMode.set ( true );
            } else {
                recoverMode.set ( false );
            }
        } catch (Exception ex) {
            ex.printStackTrace (System.err);
        }
    }

    /**
     * Requests the writer loop to stop, cancels all scheduled work and shuts
     * down the monitor's scheduler. Safe to call when {@link #start(TimeAware)}
     * was never called.
     */
    @Override
    public void stop() {
        stop.set ( true );
        cancelIfScheduled ( writerFuture );
        cancelIfScheduled ( monitorFuture );
        cancelIfScheduled ( tickTock );

        // The monitor scheduler is created by this class, so it is ours to shut down.
        if ( monitorExecutor != null ) {
            monitorExecutor.shutdownNow ();
            monitorExecutor = null;
        }
    }

    /** Cancels (with interruption) a future that may not have been scheduled yet. */
    private static void cancelIfScheduled( Future<?> future ) {
        if ( future != null ) {
            future.cancel ( true );
        }
    }

    /**
     * Starts up the batch writer.
     *
     * @param postReceiver receiver that is ticked every 20 ms together with
     *                     this manager; may be {@code null}
     */
    @Override
    public void start( final TimeAware postReceiver ) {
        //This starts itself up again every 1/2 second if something really bad
        //happens like disk full. As soon as the problem gets corrected
        //then things start working again...happy day. Only
        // one is running per instance of CollectionManagerImpl.
        writerFuture =
                scheduledExecutorService.scheduleAtFixedRate ( new Runnable () {
                    @Override
                    public void run() {
                        processWrites ();
                    }
                }, 0, 500, TimeUnit.MILLISECONDS );

        startMonitor ();

        tickTock =
                this.scheduledExecutorService.scheduleAtFixedRate ( new Runnable () {
                    @Override
                    public void run() {
                        if ( postReceiver != null ) {
                            postReceiver.tick ( -1 );
                        }
                        tick ( -1 );
                    }
                }, 0, 20, TimeUnit.MILLISECONDS );
    }

    /**
     * nano time cost 100 nano seconds to call.
     * System.currentTimeMilis is not accurate (day light saving time shift)
     * We want the speed of System.currentTimeMilis and the accuracy of
     * System.nanoTime w/o the overhead so we call nano time
     * every 20 miliseconds. and store the results in an atomic.
     *
     * @param t ignored; the time is always read from {@code System.nanoTime}
     */
    @Override
    public final void tick( long t ) {
        long time = System.nanoTime () / 1_000_000;
        this.time.set ( time );
        this.writer.tick ( time );
    }
}
****
import org.boon.core.Dates;
import org.boon.primitive.ByteBuf;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Receives HTTP post bodies and appends each one to an output buffer as a
 * line that starts with a sequence number, a timestamp and the sender
 * address. Full buffers are handed to the {@link CollectorManager}.
 */
public class PostReceiverImpl implements PostReceiver, TimeAware {

    /** Queue manager for the batch writer. */
    private final CollectorManager collector;

    /** Buffer size; once the buffer holds this many bytes it is sent for writing. */
    public final static int BUFFER_OUT_SIZE_MAX
            = Integer.parseInt ( System.getProperty ( ".....BUFFER_OUT_SIZE_MAX", "100000" ) );

    /** Buffer currently being filled. */
    private ByteBuf buffer = ByteBuf.create ( BUFFER_OUT_SIZE_MAX );

    /** Sequence number given to the next line. */
    private long index = 0;

    /**
     * UTC time as last refreshed by {@link #tick(long)}.
     */
    final private AtomicLong approxTime = new AtomicLong ( Dates.utcNow () );

    /**
     * Creates a receiver that forwards its buffers to the given collector.
     *
     * @param collector collector that receives full buffers and supplies new ones
     */
    public PostReceiverImpl(CollectorManager collector) {
        this.collector = collector;
    }

    /**
     * Appends one post to the current buffer in the form
     * {@code [sequence,timestamp,"address",body]} followed by a newline, and
     * ships the buffer when it is full or the writer is waiting.
     *
     * @param bodyOfPost raw body of the post, appended as-is
     * @param address    address string, wrapped in double quotes without escaping
     */
    @Override
    public void receivePost ( byte[] bodyOfPost, String address ) {
        final ByteBuf out = buffer;
        final String sequence = Long.toString ( index++ );
        final String timestamp = Long.toString ( approxTime.get () );
        final String quotedAddress = "\"" + address + "\"";

        // Header first, then the original body, all inside one JSON array.
        out.add ( (byte) '[' );
        out.add ( sequence );
        out.add ( (byte) ',' );
        out.add ( timestamp );
        out.add ( (byte) ',' );
        out.add ( quotedAddress );
        out.add ( (byte) ',' );
        out.add ( bodyOfPost );
        out.add ( "]\n" );

        // Ship the buffer when it reached its maximum size or the writer is idle.
        final boolean bufferFull = out.len () >= BUFFER_OUT_SIZE_MAX;
        if ( bufferFull || collector.isWriterWaiting () ) {
            collector.sendPostToBeWritten ( out );
            buffer = collector.allocateBuffer ( BUFFER_OUT_SIZE_MAX );
        }
    }

    /**
     * Refreshes the cached UTC time. The argument is not used.
     *
     * @param time ignored
     */
    @Override
    public void tick ( long time ) {
        // Invoked from another thread every 20 ms or so, so that receivePost
        // does not have to compute the UTC time for every post.
        approxTime.set ( Dates.utcNow () );
    }
}
Implementing search
/**
 * Opens a database iterator positioned at {@code startKey} and exposes it
 * as a closeable iterable of key/value entries.
 *
 * <p>{@code stopKey} is not consulted here: iteration continues for as long
 * as the database iterator has entries. Every call to {@code iterator()} on
 * the result wraps the same underlying database iterator.
 *
 * @param startKey key to seek to before iterating
 * @param stopKey  not used by this method
 * @return iterable over the entries from {@code startKey} on; the caller
 *         should close it to release the database iterator
 */
@Override
public KeyValueIterable<byte[], byte[]> fromTo(final byte[] startKey, final byte[] stopKey) {
    final DBIterator cursor = database.iterator();
    cursor.seek( startKey );

    return new KeyValueIterable<byte[], byte[]>() {

        @Override
        public Iterator<Entry<byte[], byte[]>> iterator() {
            return new Iterator<Entry<byte[], byte[]>>() {

                @Override
                public boolean hasNext() {
                    return cursor.hasNext();
                }

                @Override
                public Entry<byte[], byte[]> next() {
                    // Re-wrap the database's map entry in the store's own Entry type.
                    final Map.Entry<byte[], byte[]> raw = cursor.next();
                    return new Entry<>(raw.getKey(), raw.getValue());
                }

                @Override
                public void remove() {
                    cursor.remove();
                }
            };
        }

        @Override
        public void close() {
            try {
                cursor.close();
            } catch (IOException ioFailure) {
                Exceptions.handle(ioFailure);
            }
        }
    };
}
/**
 * Iterates the entries whose keys lie between {@code startKey} and
 * {@code stopKey} (both inclusive, compared with {@code String.compareTo}),
 * decoding keys and values of the underlying byte store to strings.
 *
 * <p>The bound is checked on the entry about to be returned, using a
 * one-entry read-ahead, so no entry beyond {@code stopKey} is handed out
 * even when {@code stopKey} itself is not present in the store.
 *
 * @param startKey first key of the range
 * @param stopKey  last key of the range
 * @return iterable over the matching entries; the caller should close it
 */
@Override
public KeyValueIterable<String, String> fromTo(final String startKey, final String stopKey) {
    final KeyValueIterable<byte[], byte[]> iterable = store.fromTo(bytes(startKey),
            bytes(stopKey));

    return new KeyValueIterable<String, String>(){

        @Override
        public void close() {
            iterable.close();
        }

        @Override
        public Iterator<Entry<String, String>> iterator() {
            final Iterator<Entry<byte[], byte[]>> iterator = iterable.iterator();

            return new Iterator<Entry<String, String>>() {

                /* Entry already read from the underlying iterator but not yet returned. */
                Entry<String, String> lookahead;

                /* True once the underlying iterator is exhausted or a key past stopKey was seen. */
                boolean finished;

                /* True only directly after next(), while the underlying iterator
                   still points at the entry that was handed out. */
                boolean removable;

                @Override
                public boolean hasNext() {
                    if (lookahead != null) {
                        return true;
                    }
                    if (finished) {
                        return false;
                    }
                    if (!iterator.hasNext()) {
                        finished = true;
                        return false;
                    }

                    // Reading ahead moves the underlying iterator, so remove()
                    // can no longer be forwarded safely.
                    removable = false;
                    Entry<byte[], byte[]> raw = iterator.next();
                    String key = str(raw.key());

                    if (key.compareTo(stopKey) > 0) {
                        finished = true;
                        return false;
                    }
                    lookahead = new Entry<>(key, str(raw.value()));
                    return true;
                }

                @Override
                public Entry<String, String> next() {
                    if (!hasNext()) {
                        throw new java.util.NoSuchElementException();
                    }
                    Entry<String, String> entry = lookahead;
                    lookahead = null;
                    removable = true;
                    return entry;
                }

                @Override
                public void remove() {
                    if (!removable) {
                        throw new IllegalStateException(
                                "remove() must be called directly after next()");
                    }
                    removable = false;
                    iterator.remove();
                }
            };
        }
    } ;
}
YourKit supports Boon open source project with its full-featured Java Profiler. YourKit, LLC is the creator of innovative and intelligent tools for profiling Java and .NET applications. Take a look at YourKit's leading software products: YourKit Java Profiler and YourKit .Net profiler.