diff --git a/README.md b/README.md index f306432..cd5ccec 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ Features Juncture implements connectivity to the following trading venues: + - [**Hotspot**](juncture-hotspot) - [**NASDAQ**](juncture-nasdaq) diff --git a/juncture-hotspot/README.md b/juncture-hotspot/README.md new file mode 100644 index 0000000..ef97351 --- /dev/null +++ b/juncture-hotspot/README.md @@ -0,0 +1,34 @@ +Juncture Hotspot +================ + +Juncture Hotspot implements connectivity to Hotspot on the JVM. + + +Features +-------- + +Juncture Hotspot implements the following protocols: + + - **Hotspot ITCH Protocol 1.59** + +See the [Wiki][] for links to the protocol specifications. + + [Wiki]: https://github.com/paritytrading/juncture/wiki/ + + +Download +-------- + +Add a Maven dependency to Juncture Hotspot: + + + com.paritytrading.juncture + juncture-hotspot + + + + +License +------- + +Juncture Hotspot is released under the Apache License, Version 2.0. diff --git a/juncture-hotspot/pom.xml b/juncture-hotspot/pom.xml new file mode 100644 index 0000000..6c8d814 --- /dev/null +++ b/juncture-hotspot/pom.xml @@ -0,0 +1,33 @@ + + + + 4.0.0 + + + com.paritytrading.juncture + juncture-parent + 0.2.1-SNAPSHOT + + + juncture-hotspot + + Juncture Hotspot + + + + com.paritytrading.foundation + foundation + + + junit + junit + test + + + org.jvirtanen.value + value + test + + + + diff --git a/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/Clock.java b/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/Clock.java new file mode 100644 index 0000000..360bf90 --- /dev/null +++ b/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/Clock.java @@ -0,0 +1,7 @@ +package com.paritytrading.juncture.hotspot.itch; + +interface Clock { + + long currentTimeMillis(); + +} diff --git a/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/HotspotBook.java b/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/HotspotBook.java new file mode 100644 index 0000000..efec263 --- /dev/null +++ b/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/HotspotBook.java @@ -0,0 +1,234 @@ +package com.paritytrading.juncture.hotspot.itch; + +import java.nio.BufferOverflowException; +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.nio.ReadOnlyBufferException; + +/** + * Common definitions for Hotspot Book Protocol. + */ +public class HotspotBook { + + private HotspotBook() { + } + + public static final byte BUY = 'B'; + public static final byte SELL = 'S'; + + public static final byte MESSAGE_TYPE_NEW_ORDER = 'N'; + public static final byte MESSAGE_TYPE_MODIFY_ORDER = 'M'; + public static final byte MESSAGE_TYPE_CANCEL_ORDER = 'X'; + public static final byte MESSAGE_TYPE_MARKET_SNAPSHOT = 'S'; + public static final byte MESSAGE_TYPE_TICKER = 'T'; + + /** + * A message. + */ + public interface Message { + + /** + * Read this message from the buffer. + * + * @param buffer a buffer + * @throws BufferUnderflowException if there are fewer bytes remaining + * in the buffer than what this message consists of + */ + void get(ByteBuffer buffer); + + /** + * Write this message to the buffer. + * + * @param buffer a buffer + * @throws BufferUnderflowException if there are fewer bytes remaining + * in the buffer than what this message consists of + * @throws ReadOnlyBufferException if the buffer is read-only + */ + void put(ByteBuffer buffer); + + } + + /** + * A New Order message (2.2.1). + */ + public static class NewOrder implements Message { + public byte buyOrSellIndicator; + public byte[] currencyPair; + public byte[] orderId; + public byte[] price; + public byte[] amount; + public byte[] minqty; + public byte[] lotsize; + + /** + * Construct an instance. + */ + public NewOrder() { + currencyPair = new byte[7]; + orderId = new byte[15]; + price = new byte[10]; + amount = new byte[16]; + minqty = new byte[16]; + lotsize = new byte[16]; + } + + @Override + public void get(ByteBuffer buffer) { + buyOrSellIndicator = buffer.get(); + buffer.get(currencyPair); + buffer.get(orderId); + buffer.get(price); + buffer.get(amount); + buffer.get(minqty); + buffer.get(lotsize); + } + + @Override + public void put(ByteBuffer buffer) { + buffer.put(MESSAGE_TYPE_NEW_ORDER); + buffer.put(buyOrSellIndicator); + buffer.put(currencyPair); + buffer.put(orderId); + buffer.put(price); + buffer.put(amount); + buffer.put(minqty); + buffer.put(lotsize); + } + } + + /** + * A Modify Order message (2.2.2). + */ + public static class ModifyOrder implements Message { + public byte[] currencyPair; + public byte[] orderId; + public byte[] amount; + public byte[] minqty; + public byte[] lotsize; + + /** + * Construct an instance. + */ + public ModifyOrder() { + currencyPair = new byte[7]; + orderId = new byte[15]; + amount = new byte[16]; + minqty = new byte[16]; + lotsize = new byte[16]; + } + + @Override + public void get(ByteBuffer buffer) { + buffer.get(currencyPair); + buffer.get(orderId); + buffer.get(amount); + buffer.get(minqty); + buffer.get(lotsize); + } + + @Override + public void put(ByteBuffer buffer) { + buffer.put(MESSAGE_TYPE_MODIFY_ORDER); + buffer.put(currencyPair); + buffer.put(orderId); + buffer.put(amount); + buffer.put(minqty); + buffer.put(lotsize); + } + } + + /** + * A Cancel Order message (2.2.3). + */ + public static class CancelOrder implements Message { + public byte[] currencyPair; + public byte[] orderId; + + /** + * Construct an instance. + */ + public CancelOrder() { + currencyPair = new byte[7]; + orderId = new byte[15]; + } + + @Override + public void get(ByteBuffer buffer) { + buffer.get(currencyPair); + buffer.get(orderId); + } + + @Override + public void put(ByteBuffer buffer) { + buffer.put(MESSAGE_TYPE_CANCEL_ORDER); + buffer.put(currencyPair); + buffer.put(orderId); + } + } + + /** + * An entry in a Market Snapshot message (2.2.4). + */ + public static class MarketSnapshotEntry { + public byte[] currencyPair; + public byte buyOrSellIndicator; + public byte[] price; + public byte[] amount; + public byte[] minqty; + public byte[] lotsize; + public byte[] orderId; + + /** + * Construct an instance. + */ + public MarketSnapshotEntry() { + currencyPair = new byte[7]; + price = new byte[10]; + amount = new byte[16]; + minqty = new byte[16]; + lotsize = new byte[16]; + orderId = new byte[15]; + } + } + + /** + * A Ticker message (2.2.5). + */ + public static class Ticker implements Message { + public byte aggressorBuyOrSellIndicator; + public byte[] currencyPair; + public byte[] price; + public byte[] transactionDate; + public byte[] transactionTime; + + /** + * Construct an instance. + */ + public Ticker() { + currencyPair = new byte[7]; + price = new byte[10]; + transactionDate = new byte[8]; + transactionTime = new byte[6]; + } + + @Override + public void get(ByteBuffer buffer) { + aggressorBuyOrSellIndicator = buffer.get(); + buffer.get(currencyPair); + buffer.get(price); + buffer.get(transactionDate); + buffer.get(transactionTime); + } + + @Override + public void put(ByteBuffer buffer) { + buffer.put(MESSAGE_TYPE_TICKER); + buffer.put(aggressorBuyOrSellIndicator); + buffer.put(currencyPair); + buffer.put(price); + buffer.put(transactionDate); + buffer.put(transactionTime); + } + } + +} diff --git a/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/HotspotBookException.java b/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/HotspotBookException.java new file mode 100644 index 0000000..4c19cb8 --- /dev/null +++ b/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/HotspotBookException.java @@ -0,0 +1,19 @@ +package com.paritytrading.juncture.hotspot.itch; + +import java.io.IOException; + +/** + * Indicates a protocol error while handling Hotspot Book Protocol. + */ +public class HotspotBookException extends IOException { + + /** + * Create an instance with the specified detail message. + * + * @param message the detail message + */ + public HotspotBookException(String message) { + super(message); + } + +} diff --git a/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/HotspotBookFormatter.java b/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/HotspotBookFormatter.java new file mode 100644 index 0000000..5f997f1 --- /dev/null +++ b/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/HotspotBookFormatter.java @@ -0,0 +1,252 @@ +package com.paritytrading.juncture.hotspot.itch; + +import static com.paritytrading.foundation.ByteBuffers.*; +import static com.paritytrading.juncture.hotspot.itch.HotspotBook.*; + +import com.paritytrading.foundation.ASCII; +import java.nio.ByteBuffer; +import java.util.Arrays; + +/** + * A formatter for outbound Hotspot Book Protocol messages. + */ +public class HotspotBookFormatter { + + private State state; + + /** + * Create a formatter for outbound Hotspot Book Protocol messages. + */ + public HotspotBookFormatter() { + state = new State(); + } + + /** + * Start a Market Snapshot message. + * + * @param buffer a buffer + */ + public void marketSnapshotStart(ByteBuffer buffer) { + state.reset(); + + // Message Type + buffer.put(MESSAGE_TYPE_MARKET_SNAPSHOT); + + state.lengthOfMessagePosition = buffer.position(); + + // Length of Message + ASCII.putLongRight(state.lengthOfMessage, 0); + buffer.put(state.lengthOfMessage); + + state.numberOfCurrencyPairsPosition = buffer.position(); + } + + /** + *

Add an entry to a Market Snapshot message. Subsequent entries must be + * grouped by the following fields:

+ * + *
    + *
  1. currency pair
  2. + *
  3. buy or sell indicator
  4. + *
  5. price
  6. + *
+ * + * @param buffer a buffer + * @param entry an entry + */ + public void marketSnapshotEntry(ByteBuffer buffer, MarketSnapshotEntry entry) { + if (!Arrays.equals(entry.currencyPair, state.currencyPair)) { + if (state.currencyPairs == 0) { + + // Number of Currency Pairs + ASCII.putLongRight(state.numberOfItems, 0); + buffer.put(state.numberOfItems); + } + + state.currencyPairs++; + + System.arraycopy(entry.currencyPair, 0, state.currencyPair, 0, state.currencyPair.length); + + // Currency Pair + buffer.put(entry.currencyPair); + + state.buyOrSellIndicator = entry.buyOrSellIndicator; + + state.numberOfPricesPosition = buffer.position(); + + // Number of Bid Prices + ASCII.putLongRight(state.numberOfItems, 0); + buffer.put(state.numberOfItems); + + if (entry.buyOrSellIndicator == SELL) { + state.numberOfPricesPosition = buffer.position(); + + // Number of Offer Prices + ASCII.putLongRight(state.numberOfItems, 0); + buffer.put(state.numberOfItems); + } + + System.arraycopy(entry.price, 0, state.price, 0, state.price.length); + + // Bid/Offer Price + buffer.put(entry.price); + + state.numberOfOrdersPosition = buffer.position(); + + // Number of Bid/Offer Orders + ASCII.putLongRight(state.numberOfItems, 0); + buffer.put(state.numberOfItems); + + state.prices = 1; + state.orders = 0; + } + else if (entry.buyOrSellIndicator != state.buyOrSellIndicator) { + + // Number of Bid Prices + ASCII.putLongRight(state.numberOfItems, state.prices); + put(buffer, state.numberOfItems, state.numberOfPricesPosition); + + // Number of Bid Orders + ASCII.putLongRight(state.numberOfItems, state.orders); + put(buffer, state.numberOfItems, state.numberOfOrdersPosition); + + state.numberOfPricesPosition = buffer.position(); + + state.buyOrSellIndicator = entry.buyOrSellIndicator; + + // Number of Offer Prices + ASCII.putLongRight(state.numberOfItems, 0); + buffer.put(state.numberOfItems); + + System.arraycopy(entry.price, 0, state.price, 0, state.price.length); + + // Offer Price + buffer.put(entry.price); + + state.numberOfOrdersPosition = buffer.position(); + + // Number of Offer Orders + ASCII.putLongRight(state.numberOfItems, 0); + buffer.put(state.numberOfItems); + + state.prices = 1; + state.orders = 0; + } + else if (!Arrays.equals(entry.price, state.price)) { + + // Number of Bid/Offer Orders + ASCII.putLongRight(state.numberOfItems, state.orders); + put(buffer, state.numberOfItems, state.numberOfOrdersPosition); + + // Bid/Offer Price + buffer.put(entry.price); + + state.numberOfOrdersPosition = buffer.position(); + + // Number of Bid/Offer Orders + ASCII.putLongRight(state.numberOfItems, 0); + buffer.put(state.numberOfItems); + + state.orders = 0; + + state.prices++; + } + + buffer.put(entry.amount); + buffer.put(entry.minqty); + buffer.put(entry.lotsize); + buffer.put(entry.orderId); + + state.orders++; + } + + /** + * End a Market Snapshot message. + * + * @param buffer a buffer + */ + public void marketSnapshotEnd(ByteBuffer buffer) { + if (state.currencyPairs > 0) { + + // Number of Currency Pairs + ASCII.putLongRight(state.numberOfItems, state.currencyPairs); + put(buffer, state.numberOfItems, state.numberOfCurrencyPairsPosition); + } + + if (state.buyOrSellIndicator == BUY) { + + // Number Of Offer Prices + ASCII.putLongRight(state.numberOfItems, 0); + buffer.put(state.numberOfItems); + } + + if (state.prices > 0) { + + // Number of Bid/Offer Prices + ASCII.putLongRight(state.numberOfItems, state.prices); + put(buffer, state.numberOfItems, state.numberOfPricesPosition); + } + + if (state.orders > 0) { + + // Number of Bid/Offer Orders + ASCII.putLongRight(state.numberOfItems, state.orders); + put(buffer, state.numberOfItems, state.numberOfOrdersPosition); + } + + int endPosition = buffer.position(); + + int lengthOfMessage = endPosition - state.numberOfCurrencyPairsPosition; + + // Length of Message + ASCII.putLongRight(state.lengthOfMessage, lengthOfMessage); + put(buffer, state.lengthOfMessage, state.lengthOfMessagePosition); + } + + private static class State { + public byte[] lengthOfMessage; + public byte[] numberOfItems; + + public int lengthOfMessagePosition; + public int numberOfCurrencyPairsPosition; + public int numberOfPricesPosition; + public int numberOfOrdersPosition; + + public int currencyPairs; + public int prices; + public int orders; + + public byte[] currencyPair; + public byte buyOrSellIndicator; + public byte[] price; + + public State() { + lengthOfMessage = new byte[6]; + numberOfItems = new byte[4]; + + currencyPair = new byte[7]; + price = new byte[10]; + + reset(); + } + + public void reset() { + ASCII.putLongRight(lengthOfMessage, 0); + ASCII.putLongRight(numberOfItems, 0); + + lengthOfMessagePosition = -1; + numberOfCurrencyPairsPosition = -1; + numberOfPricesPosition = -1; + numberOfOrdersPosition = -1; + + currencyPairs = 0; + prices = 0; + orders = 0; + + ASCII.putLeft(currencyPair, " "); + buyOrSellIndicator = ' '; + ASCII.putLeft(price, " "); + } + } + +} diff --git a/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/HotspotBookListener.java b/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/HotspotBookListener.java new file mode 100644 index 0000000..3e2a5ef --- /dev/null +++ b/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/HotspotBookListener.java @@ -0,0 +1,68 @@ +package com.paritytrading.juncture.hotspot.itch; + +import static com.paritytrading.juncture.hotspot.itch.HotspotBook.*; + +import java.io.IOException; + +/** + * The interface for inbound Hotspot Book Protocol messages. + */ +public interface HotspotBookListener { + + /** + * Receive a New Order message (2.2.1). + * + * @param message the message + * @throws IOException if an I/O error occurs + */ + void newOrder(NewOrder message) throws IOException; + + /** + * Receive a Modify Order message (2.2.2). + * + * @param message the message + * @throws IOException if an I/O error occurs + */ + void modifyOrder(ModifyOrder message) throws IOException; + + /** + * Receive a Cancel Order message (2.2.3). + * + * @param message the message + * @throws IOException if an I/O error occurs + */ + void cancelOrder(CancelOrder message) throws IOException; + + /** + * Receive an indication of the start of a Market Snapshot message + * (2.2.4). + * + * @throws IOException if an I/O error occurs + */ + void marketSnapshotStart() throws IOException; + + /** + * Receive an indication of an entry in a Market Snapshot message + * (2.2.4). + * + * @param entry the entry + * @throws IOException if an I/O error occurs + */ + void marketSnapshotEntry(MarketSnapshotEntry entry) throws IOException; + + /** + * Receive an indication of the end of a Market Snapshot message (2.2.4). + * + * @throws IOException if an I/O error occurs + */ + void marketSnapshotEnd() throws IOException; + + /** + * Receive a Ticker message (2.2.5). + * + * @param message the message + * @throws IOException if an I/O error occurs + */ + void ticker(Ticker message) throws IOException; + +} diff --git a/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/HotspotBookParser.java b/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/HotspotBookParser.java new file mode 100644 index 0000000..4145d23 --- /dev/null +++ b/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/HotspotBookParser.java @@ -0,0 +1,167 @@ +package com.paritytrading.juncture.hotspot.itch; + +import static com.paritytrading.juncture.hotspot.itch.HotspotBook.*; + +import com.paritytrading.foundation.ASCII; +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * A parser for inbound Hotspot Book Protocol messages. + */ +public class HotspotBookParser { + + private NewOrder newOrder; + private ModifyOrder modifyOrder; + private CancelOrder cancelOrder; + private MarketSnapshotEntry marketSnapshotEntry; + private Ticker ticker; + + private State state; + + private HotspotBookListener listener; + + /** + * Create a parser for inbound Hotspot Book Protocol messages. + * + * @param listener the listener + */ + public HotspotBookParser(HotspotBookListener listener) { + this.newOrder = new NewOrder(); + this.modifyOrder = new ModifyOrder(); + this.cancelOrder = new CancelOrder(); + this.marketSnapshotEntry = new MarketSnapshotEntry(); + this.ticker = new Ticker(); + + this.state = new State(); + + this.listener = listener; + } + + /** + * Parse a Hotspot Book Protocol message. + * + * @param buffer a buffer containing a Hotspot Book Protocol message + * @throws IOException if an I/O error occurs + */ + public void parse(ByteBuffer buffer) throws IOException { + byte messageType = buffer.get(); + + switch (messageType) { + case MESSAGE_TYPE_NEW_ORDER: + newOrder.get(buffer); + listener.newOrder(newOrder); + break; + case MESSAGE_TYPE_MODIFY_ORDER: + modifyOrder.get(buffer); + listener.modifyOrder(modifyOrder); + break; + case MESSAGE_TYPE_CANCEL_ORDER: + cancelOrder.get(buffer); + listener.cancelOrder(cancelOrder); + break; + case MESSAGE_TYPE_MARKET_SNAPSHOT: + marketSnapshot(buffer); + break; + case MESSAGE_TYPE_TICKER: + ticker.get(buffer); + listener.ticker(ticker); + break; + default: + throw new HotspotBookException("Unknown message type: " + (char)messageType); + } + } + + private void marketSnapshot(ByteBuffer buffer) throws IOException { + listener.marketSnapshotStart(); + + // Length of Message + buffer.get(state.lengthOfMessage); + + long lengthOfMessage = ASCII.getLong(state.lengthOfMessage); + + if (lengthOfMessage < state.numberOfItems.length) { + listener.marketSnapshotEnd(); + + return; + } + + // Number of Currency Pairs + buffer.get(state.numberOfItems); + + long numberOfCurrencyPairs = ASCII.getLong(state.numberOfItems); + + for (int i = 0; i < numberOfCurrencyPairs; i++) { + + // Currency Pair + buffer.get(marketSnapshotEntry.currencyPair); + + marketSnapshotEntry.buyOrSellIndicator = BUY; + + // Number of Bid Prices + buffer.get(state.numberOfItems); + + long numberOfBidPrices = ASCII.getLong(state.numberOfItems); + + for (int j = 0; j < numberOfBidPrices; j++) { + + // Bid Price + buffer.get(marketSnapshotEntry.price); + + // Number of Bid Orders + buffer.get(state.numberOfItems); + + long numberOfBidOrders = ASCII.getLong(state.numberOfItems); + + for (int k = 0; k < numberOfBidOrders; k++) { + buffer.get(marketSnapshotEntry.amount); + buffer.get(marketSnapshotEntry.minqty); + buffer.get(marketSnapshotEntry.lotsize); + buffer.get(marketSnapshotEntry.orderId); + + listener.marketSnapshotEntry(marketSnapshotEntry); + } + } + + marketSnapshotEntry.buyOrSellIndicator = SELL; + + // Number of Offer Prices + buffer.get(state.numberOfItems); + + long numberOfOfferPrices = ASCII.getLong(state.numberOfItems); + + for (int j = 0; j < numberOfOfferPrices; j++) { + + // Offer Price + buffer.get(marketSnapshotEntry.price); + + // Number of Offer Orders + buffer.get(state.numberOfItems); + + long numberOfOfferOrders = ASCII.getLong(state.numberOfItems); + + for (int k = 0; k < numberOfOfferOrders; k++) { + buffer.get(marketSnapshotEntry.amount); + buffer.get(marketSnapshotEntry.minqty); + buffer.get(marketSnapshotEntry.lotsize); + buffer.get(marketSnapshotEntry.orderId); + + listener.marketSnapshotEntry(marketSnapshotEntry); + } + } + } + + listener.marketSnapshotEnd(); + } + + private static class State { + public byte[] lengthOfMessage; + public byte[] numberOfItems; + + public State() { + lengthOfMessage = new byte[6]; + numberOfItems = new byte[4]; + } + } + +} diff --git a/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/ITCH.java b/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/ITCH.java new file mode 100644 index 0000000..4a46c61 --- /dev/null +++ b/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/ITCH.java @@ -0,0 +1,300 @@ +package com.paritytrading.juncture.hotspot.itch; + +import com.paritytrading.foundation.ASCII; +import java.nio.ByteBuffer; + +/** + * Common definitions for ITCH Session Management Protocol. + */ +public class ITCH { + + public static final int MAX_NUMBER_OF_CURRENCY_PAIRS = 256; + + /* + * These messages are sent by the server to the client. + */ + static final byte MESSAGE_TYPE_LOGIN_ACCEPTED = 'A'; + static final byte MESSAGE_TYPE_LOGIN_REJECTED = 'J'; + static final byte MESSAGE_TYPE_SEQUENCED_DATA = 'S'; + static final byte MESSAGE_TYPE_SERVER_HEARTBEAT = 'H'; + static final byte MESSAGE_TYPE_ERROR_NOTIFICATION = 'E'; + static final byte MESSAGE_TYPE_INSTRUMENT_DIRECTORY = 'R'; + + /* + * These messages are sent by the client to the server. + */ + static final byte MESSAGE_TYPE_LOGIN_REQUEST = 'L'; + static final byte MESSAGE_TYPE_LOGOUT_REQUEST = 'O'; + static final byte MESSAGE_TYPE_CLIENT_HEARTBEAT = 'R'; + static final byte MESSAGE_TYPE_MARKET_SNAPSHOT_REQUEST = 'M'; + static final byte MESSAGE_TYPE_TICKER_SUBSCRIBE_REQUEST = 'T'; + static final byte MESSAGE_TYPE_TICKER_UNSUBSCRIBE_REQUEST = 'U'; + static final byte MESSAGE_TYPE_MARKET_DATA_SUBSCRIBE_REQUEST = 'A'; + static final byte MESSAGE_TYPE_MARKET_DATA_UNSUBSCRIBE_REQUEST = 'B'; + static final byte MESSAGE_TYPE_INSTRUMENT_DIRECTORY_REQUEST = 'I'; + + public static final byte TRUE = 'T'; + public static final byte FALSE = 'F'; + + /** + * A Login Accepted packet (1.2.1). + */ + public static class LoginAccepted { + public byte[] sequenceNumber; + + /** + * Create a new instance. + */ + public LoginAccepted() { + sequenceNumber = new byte[10]; + } + + void get(ByteBuffer buffer) { + buffer.get(sequenceNumber); + } + + void put(ByteBuffer buffer) { + buffer.put(sequenceNumber); + } + } + + /** + * A Login Rejected packet (1.2.2). + */ + public static class LoginRejected { + public byte[] reason; + + /** + * Create a new instance. + */ + public LoginRejected() { + reason = new byte[20]; + } + + void get(ByteBuffer buffer) { + buffer.get(reason); + } + + void put(ByteBuffer buffer) { + buffer.put(reason); + } + } + + /** + * A Sequenced Data packet (1.2.3). + */ + public static class SequencedData { + public byte[] time; + + /** + * Create a new instance. + */ + public SequencedData() { + time = new byte[9]; + } + + void get(ByteBuffer buffer) { + buffer.get(time); + } + + void put(ByteBuffer buffer) { + buffer.put(time); + } + } + + /** + * An Error Notification packet (1.2.6). + */ + public static class ErrorNotification { + public byte[] errorExplanation; + + /** + * Create a new instance. + */ + public ErrorNotification() { + errorExplanation = new byte[100]; + } + + void get(ByteBuffer buffer) { + buffer.get(errorExplanation); + } + + void put(ByteBuffer buffer) { + buffer.put(errorExplanation); + } + } + + /** + * An Instrument Directory packet (1.2.7). + */ + public static class InstrumentDirectory { + public byte[] numberOfCurrencyPairs; + public byte[][] currencyPair; + + public InstrumentDirectory() { + numberOfCurrencyPairs = new byte[4]; + currencyPair = new byte[MAX_NUMBER_OF_CURRENCY_PAIRS][7]; + } + + void get(ByteBuffer buffer) { + buffer.get(numberOfCurrencyPairs); + + for (int i = 0; i < ASCII.getLong(numberOfCurrencyPairs); i++) + buffer.get(currencyPair[i]); + } + + void put(ByteBuffer buffer) { + buffer.put(numberOfCurrencyPairs); + + for (int i = 0; i < ASCII.getLong(numberOfCurrencyPairs); i++) + buffer.put(currencyPair[i]); + } + } + + /** + * A Login Request packet (1.3.1). + */ + public static class LoginRequest { + public byte[] loginName; + public byte[] password; + public byte marketDataUnsubscribe; + public byte[] reserved; + + /** + * Create an instance. + */ + public LoginRequest() { + loginName = new byte[40]; + password = new byte[40]; + reserved = new byte[9]; + } + + void get(ByteBuffer buffer) { + buffer.get(loginName); + buffer.get(password); + marketDataUnsubscribe = buffer.get(); + buffer.get(reserved); + } + + void put(ByteBuffer buffer) { + buffer.put(loginName); + buffer.put(password); + buffer.put(marketDataUnsubscribe); + buffer.put(reserved); + } + } + + /** + * A Market Snapshot Request packet (1.3.4). + */ + public static class MarketSnapshotRequest { + public byte[] currencyPair; + + /** + * Construct an instance. + */ + public MarketSnapshotRequest() { + currencyPair = new byte[7]; + } + + void get(ByteBuffer buffer) { + buffer.get(currencyPair); + } + + void put(ByteBuffer buffer) { + buffer.put(currencyPair); + } + } + + /** + * A Ticker Subscribe Request packet (1.3.5). + */ + public static class TickerSubscribeRequest { + public byte[] currencyPair; + + /** + * Construct an instance. + */ + public TickerSubscribeRequest() { + currencyPair = new byte[7]; + } + + void get(ByteBuffer buffer) { + buffer.get(currencyPair); + } + + void put(ByteBuffer buffer) { + buffer.put(currencyPair); + } + } + + /** + * A Ticker Unsubscribe Request packet (1.3.6). + */ + public static class TickerUnsubscribeRequest { + public byte[] currencyPair; + + /** + * Construct an instance. + */ + public TickerUnsubscribeRequest() { + currencyPair = new byte[7]; + } + + void get(ByteBuffer buffer) { + buffer.get(currencyPair); + } + + void put(ByteBuffer buffer) { + buffer.put(currencyPair); + } + } + + /** + * A Market Data Subscribe Request packet (1.3.7). + */ + public static class MarketDataSubscribeRequest { + public byte[] currencyPair; + + /** + * Construct an instance. + */ + public MarketDataSubscribeRequest() { + currencyPair = new byte[7]; + } + + void get(ByteBuffer buffer) { + buffer.get(currencyPair); + } + + void put(ByteBuffer buffer) { + buffer.put(currencyPair); + } + } + + /** + * A Market Data Unsubscribe Request packet (1.3.8). + */ + public static class MarketDataUnsubscribeRequest { + public byte[] currencyPair; + + /** + * Construct an instance. + */ + public MarketDataUnsubscribeRequest() { + currencyPair = new byte[7]; + } + + void get(ByteBuffer buffer) { + buffer.get(currencyPair); + } + + void put(ByteBuffer buffer) { + buffer.put(currencyPair); + } + } + + private ITCH() { + } + +} diff --git a/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/ITCHClient.java b/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/ITCHClient.java new file mode 100644 index 0000000..b546c24 --- /dev/null +++ b/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/ITCHClient.java @@ -0,0 +1,191 @@ +package com.paritytrading.juncture.hotspot.itch; + +import static com.paritytrading.juncture.hotspot.itch.ITCH.*; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +/** + * An implementation of the client side of ITCH Session Management Protocol. + */ +public class ITCHClient extends ITCHSession { + + private LoginAccepted loginAccepted; + private LoginRejected loginRejected; + private SequencedData sequencedData; + private ErrorNotification errorNotification; + private InstrumentDirectory instrumentDirectory; + + private ByteBuffer txPayload; + + private ITCHClientListener listener; + + /** + * Create a client. The underlying socket channel can be either blocking + * or non-blocking. + * + * @param channel the underlying socket channel + * @param rxBufferCapacity the receive buffer capacity + * @param listener the inbound packet listener + */ + public ITCHClient(SocketChannel channel, int rxBufferCapacity, + ITCHClientListener listener) { + this(SystemClock.INSTANCE, channel, rxBufferCapacity, listener); + } + + ITCHClient(Clock clock, SocketChannel channel, int rxBufferCapacity, + ITCHClientListener listener) { + super(clock, channel, rxBufferCapacity, MESSAGE_TYPE_CLIENT_HEARTBEAT); + + this.loginAccepted = new LoginAccepted(); + this.loginRejected = new LoginRejected(); + this.sequencedData = new SequencedData(); + this.errorNotification = new ErrorNotification(); + this.instrumentDirectory = new InstrumentDirectory(); + + this.txPayload = ByteBuffer.allocate(90); + + this.listener = listener; + } + + /** + * Send a Login Request packet (1.3.1). + * + * @param packet the packet + * @throws IOException if an I/O error occurs + */ + public void login(LoginRequest packet) throws IOException { + txPayload.clear(); + packet.put(txPayload); + txPayload.flip(); + + send(MESSAGE_TYPE_LOGIN_REQUEST, txPayload); + } + + /** + * Send a Logout Request packet (1.3.2). + * + * @throws IOException if an I/O error occurs + */ + public void logout() throws IOException { + send(MESSAGE_TYPE_LOGOUT_REQUEST); + } + + /** + * Send a Market Snapshot Request packet (1.3.4). + * + * @param packet the packet + * @throws IOException if an I/O error occurs + */ + public void request(MarketSnapshotRequest packet) throws IOException { + txPayload.clear(); + packet.put(txPayload); + txPayload.flip(); + + send(MESSAGE_TYPE_MARKET_SNAPSHOT_REQUEST, txPayload); + } + + /** + * Send a Ticker Subscribe Request packet (1.3.5). + * + * @param packet the packet + * @throws IOException if an I/O error occurs + */ + public void request(TickerSubscribeRequest packet) throws IOException { + txPayload.clear(); + packet.put(txPayload); + txPayload.flip(); + + send(MESSAGE_TYPE_TICKER_SUBSCRIBE_REQUEST, txPayload); + } + + /** + * Send a Ticker Unsubscribe Request packet (1.3.6). + * + * @param packet the packet + * @throws IOException if an I/O error occurs + */ + public void request(TickerUnsubscribeRequest packet) throws IOException { + txPayload.clear(); + packet.put(txPayload); + txPayload.flip(); + + send(MESSAGE_TYPE_TICKER_UNSUBSCRIBE_REQUEST, txPayload); + } + + /** + * Send a Market Data Subscribe Request packet (1.3.7). + * + * @param packet the packet + * @throws IOException if an I/O error occurs + */ + public void request(MarketDataSubscribeRequest packet) throws IOException { + txPayload.clear(); + packet.put(txPayload); + txPayload.flip(); + + send(MESSAGE_TYPE_MARKET_DATA_SUBSCRIBE_REQUEST, txPayload); + } + + /** + * Send a Market Data Unsubscribe Request packet (1.3.8). + * + * @param packet the packet + * @throws IOException if an I/O error occurs + */ + public void request(MarketDataUnsubscribeRequest packet) throws IOException { + txPayload.clear(); + packet.put(txPayload); + txPayload.flip(); + + send(MESSAGE_TYPE_MARKET_DATA_UNSUBSCRIBE_REQUEST, txPayload); + } + + /** + * Send an Instrument Directory Request packet (1.3.9). + * + * @throws IOException if an I/O error occurs + */ + public void requestInstrumentDirectory() throws IOException { + send(MESSAGE_TYPE_INSTRUMENT_DIRECTORY_REQUEST); + } + + @Override + protected void heartbeatTimeout() throws IOException { + listener.heartbeatTimeout(this); + } + + @Override + protected void packet(byte messageType, ByteBuffer packet) throws IOException { + switch (messageType) { + case MESSAGE_TYPE_LOGIN_ACCEPTED: + loginAccepted.get(packet); + listener.loginAccepted(this, loginAccepted); + break; + case MESSAGE_TYPE_LOGIN_REJECTED: + loginRejected.get(packet); + listener.loginRejected(this, loginRejected); + break; + case MESSAGE_TYPE_SEQUENCED_DATA: + if (packet.hasRemaining()) { + sequencedData.get(packet); + listener.sequencedData(this, sequencedData, packet); + } else { + listener.endOfSession(this); + } + break; + case MESSAGE_TYPE_SERVER_HEARTBEAT: + break; + case MESSAGE_TYPE_ERROR_NOTIFICATION: + errorNotification.get(packet); + listener.errorNotification(this, errorNotification); + break; + case MESSAGE_TYPE_INSTRUMENT_DIRECTORY: + instrumentDirectory.get(packet); + listener.instrumentDirectory(this, instrumentDirectory); + break; + } + } + +} diff --git a/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/ITCHClientListener.java b/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/ITCHClientListener.java new file mode 100644 index 0000000..868c49b --- /dev/null +++ b/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/ITCHClientListener.java @@ -0,0 +1,75 @@ +package com.paritytrading.juncture.hotspot.itch; + +import static com.paritytrading.juncture.hotspot.itch.ITCH.*; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * The interface for inbound events on the client side. + */ +public interface ITCHClientListener { + + /** + * Receive an indication of a heartbeat timeout. + * + * @param session the session + * @throws IOException if an I/O error occurs + */ + void heartbeatTimeout(ITCHClient session) throws IOException; + + /** + * Receive a Login Accepted packet (1.2.1). + * + * @param session the session + * @param packet the packet + * @throws IOException if an I/O error occurs + */ + void loginAccepted(ITCHClient session, LoginAccepted packet) throws IOException; + + /** + * Receive a Login Rejected packet (1.2.2). + * + * @param session the session + * @param packet the packet + * @throws IOException if an I/O error occurs + */ + void loginRejected(ITCHClient session, LoginRejected packet) throws IOException; + + /** + * Receive a Sequenced Data packet (1.2.3). + * + * @param session the session + * @param header the header + * @param payload the payload + * @throws IOException if an I/O error occurs + */ + void sequencedData(ITCHClient session, SequencedData header, ByteBuffer payload) throws IOException; + + /** + * Receive an indication of the end of session (1.2.5). + * + * @param session the session + * @throws IOException if an I/O error occurs + */ + void endOfSession(ITCHClient session) throws IOException; + + /** + * Receive an Error Notification packet (1.2.6). + * + * @param session the session + * @param packet the packet + * @throws IOException if an I/O error occurs + */ + void errorNotification(ITCHClient session, ErrorNotification packet) throws IOException; + + /** + * Receive an Instrument Directory packet (1.2.7). + * + * @param session the session + * @param packet the packet + * @throws IOException if an I/O error occurs + */ + void instrumentDirectory(ITCHClient session, InstrumentDirectory packet) throws IOException; + +} diff --git a/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/ITCHException.java b/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/ITCHException.java new file mode 100644 index 0000000..882f86d --- /dev/null +++ b/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/ITCHException.java @@ -0,0 +1,19 @@ +package com.paritytrading.juncture.hotspot.itch; + +import java.io.IOException; + +/** + * Indicates a protocol error while handling ITCH Session Management Protocol. + */ +public class ITCHException extends IOException { + + /** + * Create an instance with the specified detail message. + * + * @param message the detail message + */ + public ITCHException(String message) { + super(message); + } + +} diff --git a/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/ITCHServer.java b/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/ITCHServer.java new file mode 100644 index 0000000..17f519e --- /dev/null +++ b/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/ITCHServer.java @@ -0,0 +1,162 @@ +package com.paritytrading.juncture.hotspot.itch; + +import static com.paritytrading.juncture.hotspot.itch.ITCH.*; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +/** + * An implementation of the server side of ITCH Session Management Protocol. + */ +public class ITCHServer extends ITCHSession { + + private LoginRequest loginRequest; + private MarketSnapshotRequest marketSnapshotRequest; + private TickerSubscribeRequest tickerSubscribeRequest; + private TickerUnsubscribeRequest tickerUnsubscribeRequest; + private MarketDataSubscribeRequest marketDataSubscribeRequest; + private MarketDataUnsubscribeRequest marketDataUnsubscribeRequest; + + private ByteBuffer txPayload; + + private ITCHServerListener listener; + + /** + * Create a server. The underlying socket channel can be either blocking + * or non-blocking. + * + * @param channel the underlying socket channel + * @param rxBufferCapacity the receive buffer capacity + * @param listener the inbound packet listener + */ + public ITCHServer(SocketChannel channel, int rxBufferCapacity, + ITCHServerListener listener) { + this(SystemClock.INSTANCE, channel, rxBufferCapacity, listener); + } + + ITCHServer(Clock clock, SocketChannel channel, int rxBufferCapacity, + ITCHServerListener listener) { + super(clock, channel, rxBufferCapacity, MESSAGE_TYPE_SERVER_HEARTBEAT); + + this.loginRequest = new LoginRequest(); + this.marketSnapshotRequest = new MarketSnapshotRequest(); + this.tickerSubscribeRequest = new TickerSubscribeRequest(); + this.tickerUnsubscribeRequest = new TickerUnsubscribeRequest(); + this.marketDataSubscribeRequest = new MarketDataSubscribeRequest(); + this.marketDataUnsubscribeRequest = new MarketDataUnsubscribeRequest(); + + this.txPayload = ByteBuffer.allocate(8192); + + this.listener = listener; + } + + /** + * Send a Login Accepted packet (1.2.1). + * + * @param packet the packet + * @throws IOException if an I/O error occurs + */ + public void accept(LoginAccepted packet) throws IOException { + txPayload.clear(); + packet.put(txPayload); + txPayload.flip(); + + send(MESSAGE_TYPE_LOGIN_ACCEPTED, txPayload); + } + + /** + * Send a Login Rejected packet (1.2.2). + * + * @param packet the packet + * @throws IOException if an I/O error occurs + */ + public void reject(LoginRejected packet) throws IOException { + txPayload.clear(); + packet.put(txPayload); + txPayload.flip(); + + send(MESSAGE_TYPE_LOGIN_REJECTED, txPayload); + } + + /** + * Send an indication of the end of session (1.2.5). + * + * @throws IOException if an I/O error occurs + */ + public void endSession() throws IOException { + send(MESSAGE_TYPE_SEQUENCED_DATA); + } + + /** + * Send an Error Notification packet (1.2.6). + * + * @param packet the packet + * @throws IOException if an I/O error occurs + * + */ + public void notifyError(ErrorNotification packet) throws IOException { + txPayload.clear(); + packet.put(txPayload); + txPayload.flip(); + + send(MESSAGE_TYPE_ERROR_NOTIFICATION, txPayload); + } + + /** + * Send an Instrument Directory packet (1.2.7). + * + * @param packet the packet + * @throws IOException if an I/O error occurs + */ + public void instrumentDirectory(InstrumentDirectory packet) throws IOException { + txPayload.clear(); + packet.put(txPayload); + txPayload.flip(); + + send(MESSAGE_TYPE_INSTRUMENT_DIRECTORY, txPayload); + } + + @Override + protected void heartbeatTimeout() throws IOException { + listener.heartbeatTimeout(this); + } + + @Override + protected void packet(byte messageType, ByteBuffer packet) throws IOException { + switch (messageType) { + case MESSAGE_TYPE_LOGIN_REQUEST: + loginRequest.get(packet); + listener.loginRequest(this, loginRequest); + break; + case MESSAGE_TYPE_LOGOUT_REQUEST: + listener.logoutRequest(this); + break; + case MESSAGE_TYPE_CLIENT_HEARTBEAT: + break; + case MESSAGE_TYPE_MARKET_SNAPSHOT_REQUEST: + marketSnapshotRequest.get(packet); + listener.marketSnapshotRequest(this, marketSnapshotRequest); + break; + case MESSAGE_TYPE_TICKER_SUBSCRIBE_REQUEST: + tickerSubscribeRequest.get(packet); + listener.tickerSubscribeRequest(this, tickerSubscribeRequest); + break; + case MESSAGE_TYPE_TICKER_UNSUBSCRIBE_REQUEST: + tickerUnsubscribeRequest.get(packet); + listener.tickerUnsubscribeRequest(this, tickerUnsubscribeRequest); + break; + case MESSAGE_TYPE_MARKET_DATA_SUBSCRIBE_REQUEST: + marketDataSubscribeRequest.get(packet); + listener.marketDataSubscribeRequest(this, marketDataSubscribeRequest); + break; + case MESSAGE_TYPE_MARKET_DATA_UNSUBSCRIBE_REQUEST: + marketDataUnsubscribeRequest.get(packet); + listener.marketDataUnsubscribeRequest(this, marketDataUnsubscribeRequest); + break; + case MESSAGE_TYPE_INSTRUMENT_DIRECTORY_REQUEST: + listener.instrumentDirectoryRequest(this); + } + } + +} diff --git a/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/ITCHServerListener.java b/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/ITCHServerListener.java new file mode 100644 index 0000000..0296cfa --- /dev/null +++ b/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/ITCHServerListener.java @@ -0,0 +1,90 @@ +package com.paritytrading.juncture.hotspot.itch; + +import static com.paritytrading.juncture.hotspot.itch.ITCH.*; + +import java.io.IOException; + +/** + * The interface for inbound events on the server side. + */ +public interface ITCHServerListener { + + /** + * Receive an indication of a heartbeat timeout. + * + * @param session the session + * @throws IOException if an I/O error occurs + */ + void heartbeatTimeout(ITCHServer session) throws IOException; + + /** + * Receive a Login Request packet (1.3.1). + * + * @param session the session + * @param packet the packet + * @throws IOException if an I/O error occurs + */ + void loginRequest(ITCHServer session, LoginRequest packet) throws IOException; + + /** + * Receive a LogoutRequest packet (1.3.2). + * + * @param session the session + * @throws IOException if an I/O error occurs + */ + void logoutRequest(ITCHServer session) throws IOException; + + /** + * Receive a Market Snapshot Request packet (1.3.4). + * + * @param session the session + * @param packet the packet + * @throws IOException if an I/O error occurs + */ + void marketSnapshotRequest(ITCHServer session, MarketSnapshotRequest packet) throws IOException; + + /** + * Receive a Ticker Subscribe Request packet (1.3.5). + * + * @param session the session + * @param packet the packet + * @throws IOException if an I/O error occurs + */ + void tickerSubscribeRequest(ITCHServer session, TickerSubscribeRequest packet) throws IOException; + + /** + * Receive an Ticker Unsubscribe Request packet (1.3.6). + * + * @param session the session + * @param packet the packet + * @throws IOException if an I/O error occurs + */ + void tickerUnsubscribeRequest(ITCHServer session, TickerUnsubscribeRequest packet) throws IOException; + + /** + * Receive a Market Data Subscribe Request packet (1.3.7). + * + * @param session the session + * @param packet the packet + * @throws IOException if an I/O error occurs + */ + void marketDataSubscribeRequest(ITCHServer session, MarketDataSubscribeRequest packet) throws IOException; + + /** + * Receive a Market Data Unsubscribe Request packet (1.3.8). + * + * @param session the session + * @param packet the packet + * @throws IOException if an I/O error occurs + */ + void marketDataUnsubscribeRequest(ITCHServer session, MarketDataUnsubscribeRequest packet) throws IOException; + + /** + * Receive an Instrument Directory Request packet (1.3.9). + * + * @param session the session + * @throws IOException if an I/O error occurs + */ + void instrumentDirectoryRequest(ITCHServer session) throws IOException; + +} diff --git a/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/ITCHSession.java b/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/ITCHSession.java new file mode 100644 index 0000000..92159be --- /dev/null +++ b/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/ITCHSession.java @@ -0,0 +1,235 @@ +package com.paritytrading.juncture.hotspot.itch; + +import static com.paritytrading.juncture.hotspot.itch.ITCH.*; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +/** + * The base for both the client and server side of ITCH Session Management + * Protocol. + */ +public abstract class ITCHSession implements Closeable { + + private static final long RX_HEARTBEAT_TIMEOUT_MILLIS = 15000; + private static final long TX_HEARTBEAT_INTERVAL_MILLIS = 1000; + + private static final byte TLF = 0x0A; + + private Clock clock; + + private SocketChannel channel; + + /* + * This variable is written on data reception and read on session + * keep-alive. These two functions can run on different threads + * without locking. + */ + private volatile long lastRxMillis; + + /* + * This variable is written on data transmission and read on session + * keep-alive. These two functions can run on different threads but + * require locking. + */ + private long lastTxMillis; + + private ByteBuffer rxBuffer; + + private ByteBuffer txHeader; + private ByteBuffer txTrailer; + + private ByteBuffer[] txBuffers; + + private byte heartbeatMessageType; + + protected ITCHSession(Clock clock, SocketChannel channel, int rxBufferCapacity, + byte heartbeatMessageType) { + this.clock = clock; + this.channel = channel; + + this.lastRxMillis = clock.currentTimeMillis(); + this.lastTxMillis = clock.currentTimeMillis(); + + this.rxBuffer = ByteBuffer.allocate(rxBufferCapacity); + + this.txHeader = ByteBuffer.allocate(1); + this.txTrailer = ByteBuffer.allocate(1); + + this.txTrailer.put(TLF); + + this.txBuffers = new ByteBuffer[3]; + + this.txBuffers[0] = txHeader; + this.txBuffers[2] = txTrailer; + + this.heartbeatMessageType = heartbeatMessageType; + } + + /** + * Get the underlying socket channel. + * + * @return the underlying socket channel + */ + public SocketChannel getChannel() { + return channel; + } + + /** + * Receive data from the underlying socket channel. For each packet + * received, invoke the corresponding listener if applicable. + * + * @return The number of bytes read, possibly zero, or -1 + * if the channel has reached end-of-stream + * @throws IOException if an I/O error occurs + */ + public int receive() throws IOException { + int bytes = channel.read(rxBuffer); + + if (bytes <= 0) + return bytes; + + rxBuffer.flip(); + + while (parse()); + + if (rxBuffer.limit() == rxBuffer.capacity()) + throw new ITCHException("Packet length exceeds buffer capacity"); + + rxBuffer.compact(); + + receivedData(); + + return bytes; + } + + private boolean parse() throws IOException { + if (rxBuffer.remaining() < 2) + return false; + + rxBuffer.mark(); + + byte messageType = rxBuffer.get(); + + int trailerIndex = trailerIndex(); + if (trailerIndex < 0) { + rxBuffer.reset(); + return false; + } + + int packetLength = trailerIndex - rxBuffer.position(); + + int limit = rxBuffer.limit(); + + rxBuffer.limit(rxBuffer.position() + packetLength); + + int position = rxBuffer.limit() + 1; + + packet(messageType, rxBuffer); + + rxBuffer.limit(limit); + rxBuffer.position(position); + + return true; + } + + private int trailerIndex() { + for (int i = rxBuffer.position(); i < rxBuffer.limit(); i++) { + if (rxBuffer.get(i) == TLF) + return i; + } + + return -1; + } + + /** + * Keep the session alive. + * + *

If the heartbeat interval duration has passed since the last packet + * was sent, send a Heartbeat packet. If the heartbeat timeout duration + * has passed since the last packet was received, invoke the corresponding + * method on the listener.

+ * + * @throws IOException if an I/O error occurs + */ + public void keepAlive() throws IOException { + long currentTimeMillis = clock.currentTimeMillis(); + + if (currentTimeMillis - lastTxMillis > TX_HEARTBEAT_INTERVAL_MILLIS) + send(heartbeatMessageType); + + if (currentTimeMillis - lastRxMillis > RX_HEARTBEAT_TIMEOUT_MILLIS) + handleHeartbeatTimeout(); + } + + /** + * Close the underlying socket channel. + * + * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + channel.close(); + } + + protected abstract void heartbeatTimeout() throws IOException; + + protected abstract void packet(byte messageType, ByteBuffer payload) throws IOException; + + protected void send(byte messageType) throws IOException { + txHeader.clear(); + txHeader.put(messageType); + txHeader.flip(); + + txBuffers[1] = txTrailer; + + txTrailer.flip(); + + int remaining = txHeader.remaining() + txTrailer.remaining(); + + do { + remaining -= channel.write(txBuffers, 0, 2); + } while (remaining > 0); + + sentData(); + } + + protected void send(byte messageType, ByteBuffer payload) throws IOException { + txHeader.clear(); + txHeader.put(messageType); + txHeader.flip(); + + txBuffers[1] = payload; + + txTrailer.flip(); + + int remaining = txHeader.remaining() + payload.remaining() + txTrailer.remaining(); + + do { + remaining -= channel.write(txBuffers); + } while (remaining > 0); + + sentData(); + } + + protected void unexpectedMessageType(byte messageType) throws ITCHException { + throw new ITCHException("Unexpected message type: " + (char)messageType); + } + + private void handleHeartbeatTimeout() throws IOException { + heartbeatTimeout(); + + receivedData(); + } + + private void receivedData() { + lastRxMillis = clock.currentTimeMillis(); + } + + private void sentData() { + lastTxMillis = clock.currentTimeMillis(); + } + +} diff --git a/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/SystemClock.java b/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/SystemClock.java new file mode 100644 index 0000000..abd455d --- /dev/null +++ b/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/SystemClock.java @@ -0,0 +1,15 @@ +package com.paritytrading.juncture.hotspot.itch; + +class SystemClock implements Clock { + + public static final SystemClock INSTANCE = new SystemClock(); + + private SystemClock() { + } + + @Override + public long currentTimeMillis() { + return System.currentTimeMillis(); + } + +} diff --git a/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/package-info.java b/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/package-info.java new file mode 100644 index 0000000..8f7b0ae --- /dev/null +++ b/juncture-hotspot/src/main/java/com/paritytrading/juncture/hotspot/itch/package-info.java @@ -0,0 +1,30 @@ +/** + * This package contains an implementation of Hotspot ITCH Protocol 1.59. + * + *

The implementation is based on the Java NIO API and consists of two + * parts:

+ * + * + *

The implementation of ITCH Session Management Protocol consists of + * three primary functions:

+ * + * + *

Data reception can run on one thread and data transmission and session + * keep-alive on another without locking. Data transmission and session + * keep-alive can run on different threads but require locking.

+ * + *

The underlying socket channels can be either blocking or non-blocking. + * In both cases, data transmission always blocks.

+ */ +package com.paritytrading.juncture.hotspot.itch; diff --git a/juncture-hotspot/src/test/java/com/paritytrading/juncture/hotspot/itch/FixedClock.java b/juncture-hotspot/src/test/java/com/paritytrading/juncture/hotspot/itch/FixedClock.java new file mode 100644 index 0000000..fa8aecc --- /dev/null +++ b/juncture-hotspot/src/test/java/com/paritytrading/juncture/hotspot/itch/FixedClock.java @@ -0,0 +1,16 @@ +package com.paritytrading.juncture.hotspot.itch; + +class FixedClock implements Clock { + + private long currentTimeMillis; + + @Override + public long currentTimeMillis() { + return currentTimeMillis; + } + + public void setCurrentTimeMillis(long currentTimeMillis) { + this.currentTimeMillis = currentTimeMillis; + } + +} diff --git a/juncture-hotspot/src/test/java/com/paritytrading/juncture/hotspot/itch/HotspotBookEvents.java b/juncture-hotspot/src/test/java/com/paritytrading/juncture/hotspot/itch/HotspotBookEvents.java new file mode 100644 index 0000000..3b81a2a --- /dev/null +++ b/juncture-hotspot/src/test/java/com/paritytrading/juncture/hotspot/itch/HotspotBookEvents.java @@ -0,0 +1,190 @@ +package com.paritytrading.juncture.hotspot.itch; + +import com.paritytrading.foundation.ASCII; +import java.util.ArrayList; +import java.util.List; +import org.jvirtanen.value.Value; + +class HotspotBookEvents implements HotspotBookListener { + + public static final int PRICE_DECIMALS = 4; + + private List events; + + public HotspotBookEvents() { + events = new ArrayList(); + } + + public List collect() { + return events; + } + + @Override + public void newOrder(HotspotBook.NewOrder message) { + byte buyOrSellIndicator = message.buyOrSellIndicator; + String currencyPair = ASCII.get(message.currencyPair); + String orderId = ASCII.get(message.orderId); + long price = ASCII.getFixed(message.price, PRICE_DECIMALS); + long amount = ASCII.getLong(message.amount); + long minqty = ASCII.getLong(message.minqty); + long lotsize = ASCII.getLong(message.lotsize); + + events.add(new NewOrder(buyOrSellIndicator, currencyPair, orderId, + price, amount, minqty, lotsize)); + } + + @Override + public void modifyOrder(HotspotBook.ModifyOrder message) { + String currencyPair = ASCII.get(message.currencyPair); + String orderId = ASCII.get(message.orderId); + long amount = ASCII.getLong(message.amount); + long minqty = ASCII.getLong(message.minqty); + long lotsize = ASCII.getLong(message.lotsize); + + events.add(new ModifyOrder(currencyPair, orderId, amount, + minqty, lotsize)); + } + + @Override + public void cancelOrder(HotspotBook.CancelOrder message) { + String currencyPair = ASCII.get(message.currencyPair); + String orderId = ASCII.get(message.orderId); + + events.add(new CancelOrder(currencyPair, orderId)); + } + + @Override + public void marketSnapshotStart() { + events.add(new MarketSnapshotStart()); + } + + @Override + public void marketSnapshotEntry(HotspotBook.MarketSnapshotEntry entry) { + String currencyPair = ASCII.get(entry.currencyPair); + byte buyOrSellIndicator = entry.buyOrSellIndicator; + long price = ASCII.getFixed(entry.price, PRICE_DECIMALS); + long amount = ASCII.getLong(entry.amount); + long minqty = ASCII.getLong(entry.minqty); + long lotsize = ASCII.getLong(entry.lotsize); + String orderId = ASCII.get(entry.orderId); + + events.add(new MarketSnapshotEntry(currencyPair, buyOrSellIndicator, + price, amount, minqty, lotsize, orderId)); + } + + @Override + public void marketSnapshotEnd() { + events.add(new MarketSnapshotEnd()); + } + + @Override + public void ticker(HotspotBook.Ticker message) { + byte aggressorBuyOrSellIndicator = message.aggressorBuyOrSellIndicator; + String currencyPair = ASCII.get(message.currencyPair); + long price = ASCII.getFixed(message.price, PRICE_DECIMALS); + long transactionDate = ASCII.getLong(message.transactionDate); + long transactionTime = ASCII.getLong(message.transactionTime); + + events.add(new Ticker(aggressorBuyOrSellIndicator, currencyPair, + price, transactionDate, transactionTime)); + } + + public interface Event { + } + + public static class NewOrder extends Value implements Event { + public byte buyOrSellIndicator; + public String currencyPair; + public String orderId; + public long price; + public long amount; + public long minqty; + public long lotsize; + + public NewOrder(byte buyOrSellIndicator, String currencyPair, + String orderId, long price, long amount, long minqty, + long lotsize) { + this.buyOrSellIndicator = buyOrSellIndicator; + this.currencyPair = currencyPair; + this.orderId = orderId; + this.price = price; + this.amount = amount; + this.minqty = minqty; + this.lotsize = lotsize; + } + } + + public static class ModifyOrder extends Value implements Event { + public String currencyPair; + public String orderId; + public long amount; + public long minqty; + public long lotsize; + + public ModifyOrder(String currencyPair, String orderId, long amount, + long minqty, long lotsize) { + this.currencyPair = currencyPair; + this.orderId = orderId; + this.amount = amount; + this.minqty = minqty; + this.lotsize = lotsize; + } + } + + public static class CancelOrder extends Value implements Event { + public String currencyPair; + public String orderId; + + public CancelOrder(String currencyPair, String orderId) { + this.currencyPair = currencyPair; + this.orderId = orderId; + } + } + + public static class MarketSnapshotStart extends Value implements Event { + } + + public static class MarketSnapshotEntry extends Value implements Event { + public String currencyPair; + public byte buyOrSellIndicator; + public long price; + public long amount; + public long minqty; + public long lotsize; + public String orderId; + + public MarketSnapshotEntry(String currencyPair, byte buyOrSellIndicator, + long price, long amount, long minqty, long lotsize, + String orderId) { + this.currencyPair = currencyPair; + this.buyOrSellIndicator = buyOrSellIndicator; + this.price = price; + this.amount = amount; + this.minqty = minqty; + this.lotsize = lotsize; + this.orderId = orderId; + } + } + + public static class MarketSnapshotEnd extends Value implements Event { + } + + public static class Ticker extends Value implements Event { + public final byte aggressorBuyOrSellIndicator; + public final String currencyPair; + public final long price; + public final long transactionDate; + public final long transactionTime; + + public Ticker(byte aggressorBuyOrSellIndicator, String currencyPair, + long price, long transactionDate, long transactionTime) { + this.aggressorBuyOrSellIndicator = aggressorBuyOrSellIndicator; + this.currencyPair = currencyPair; + this.price = price; + this.transactionDate = transactionDate; + this.transactionTime = transactionTime; + } + } + + +} diff --git a/juncture-hotspot/src/test/java/com/paritytrading/juncture/hotspot/itch/HotspotBookTest.java b/juncture-hotspot/src/test/java/com/paritytrading/juncture/hotspot/itch/HotspotBookTest.java new file mode 100644 index 0000000..0c6626a --- /dev/null +++ b/juncture-hotspot/src/test/java/com/paritytrading/juncture/hotspot/itch/HotspotBookTest.java @@ -0,0 +1,301 @@ +package com.paritytrading.juncture.hotspot.itch; + +import static com.paritytrading.juncture.hotspot.itch.HotspotBookEvents.*; +import static java.util.Arrays.*; +import static org.junit.Assert.*; + +import com.paritytrading.foundation.ASCII; +import java.nio.ByteBuffer; +import org.junit.Before; +import org.junit.Test; + +public class HotspotBookTest { + + private static final String SNAPSHOT_EMPTY = "" + + "S" + // Message Type + " 0"; // Length of Message + + private static final String SNAPSHOT_BID = "" + + "S" + // Message Type + " 96" + // Length of Message + " 1" + // Number of Currency Pairs + "FOO/BAR" + // Currency Pair + " 1" + // Number of Bid Prices + "0.9500 " + // Bid Price + " 1" + // Number of Bid Orders + "100 " + // Amount + "0 " + // Minqty + "0 " + // Lotsize + "100 " + // Order ID + " 0"; // Number of Offer Prices + + private static final String SNAPSHOT_OFFER = "" + + "S" + // Message Type + " 96" + // Length of Message + " 1" + // Number of Currency Pairs + "FOO/BAR" + // Currency Pair + " 0" + // Number of Bid Prices + " 1" + // Number of Offer Prices + "1.0500 " + // Offer Price + " 1" + // Number of Offer Orders + "100 " + // Amount + "0 " + // Minqty + "0 " + // Lotsize + "200 "; // Order ID + + private static final String SNAPSHOT_BID_OFFER = "" + + "S" + // Message Type + " 173" + // Length of Message + " 1" + // Number of Currency Pairs + "FOO/BAR" + // Currency Pair + " 1" + // Number of Bid Prices + "0.9500 " + // Bid Price + " 1" + // Number Of Bid Orders + "100 " + // Amount + "0 " + // Minqty + "0 " + // Lotsize + "100 " + // Order ID + " 1" + // Number of Offer Prices + "1.0500 " + // Offer Price + " 1" + // Number of Offer Orders + "100 " + // Amount + "0 " + // Minqty + "0 " + // Lotsize + "200 "; // Order ID + + private static final String SNAPSHOT_BID_BID_2 = "" + + "S" + // Message Type + " 159" + // Length of Message + " 1" + // Number of Currency Pairs + "FOO/BAR" + // Currency Pair + " 1" + // Number of Bid Prices + "0.9500 " + // Bid Price + " 2" + // Number of Bid Orders + "100 " + // Amount + "0 " + // Minqty + "0 " + // Lotsize + "100 " + // Order ID + "50 " + // Amount + "0 " + // Minqty + "0 " + // Lotsize + "101 " + // Order ID + " 0"; // Number of Offer Prices + + private static final String SNAPSHOT_OFFER_OFFER_2 = "" + + "S" + // Message Type + " 173" + // Length of Message + " 1" + // Number of Currency Pairs + "FOO/BAR" + // Currency Pair + " 0" + // Number of Bid Prices + " 2" + // Number of Offer Prices + "1.0500 " + // Offer Price + " 1" + // Number of Offer Orders + "100 " + // Amount + "0 " + // Minqty + "0 " + // Lotsize + "200 " + // Order ID + "1.1000 " + // Offer Price + " 1" + // Number of Offer Orders + "50 " + // Amount + "0 " + // Minqty + "0 " + // Lotsize + "201 "; // Order ID + + private static final Event START = new MarketSnapshotStart(); + + private static final Event END = new MarketSnapshotEnd(); + + private static final MarketSnapshotEntry BID = new MarketSnapshotEntry( + "FOO/BAR", + HotspotBook.BUY, + 9500, + 100, + 0, + 0, + "100 " + ); + + private static final MarketSnapshotEntry BID_2 = new MarketSnapshotEntry( + "FOO/BAR", + HotspotBook.BUY, + 9500, + 50, + 0, + 0, + "101 " + ); + + private static final MarketSnapshotEntry OFFER = new MarketSnapshotEntry( + "FOO/BAR", + HotspotBook.SELL, + 10500, + 100, + 0, + 0, + "200 " + ); + + private static final MarketSnapshotEntry OFFER_2 = new MarketSnapshotEntry( + "FOO/BAR", + HotspotBook.SELL, + 11000, + 50, + 0, + 0, + "201 " + ); + + private ByteBuffer buffer; + + private HotspotBookEvents events; + + private HotspotBookFormatter formatter; + private HotspotBookParser parser; + + @Before + public void setUp() { + buffer = ByteBuffer.allocate(1024); + + events = new HotspotBookEvents(); + + formatter = new HotspotBookFormatter(); + parser = new HotspotBookParser(events); + } + + @Test + public void formatEmptySnapshot() { + formatter.marketSnapshotStart(buffer); + formatter.marketSnapshotEnd(buffer); + + buffer.flip(); + + assertEquals(SNAPSHOT_EMPTY, remaining(buffer)); + } + + @Test + public void parseEmptySnapshot() throws Exception { + parser.parse(wrap(SNAPSHOT_EMPTY)); + + assertEquals(asList(START, END), events.collect()); + } + + @Test + public void formatSnapshotWithBid() { + formatter.marketSnapshotStart(buffer); + formatter.marketSnapshotEntry(buffer, entry(BID)); + formatter.marketSnapshotEnd(buffer); + + buffer.flip(); + + assertEquals(SNAPSHOT_BID, remaining(buffer)); + } + + @Test + public void parseSnapshotWithBid() throws Exception { + parser.parse(wrap(SNAPSHOT_BID)); + + assertEquals(asList(START, BID, END), events.collect()); + } + + @Test + public void formatSnapshotWithOffer() { + formatter.marketSnapshotStart(buffer); + formatter.marketSnapshotEntry(buffer, entry(OFFER)); + formatter.marketSnapshotEnd(buffer); + + buffer.flip(); + + assertEquals(SNAPSHOT_OFFER, remaining(buffer)); + } + + @Test + public void parseSnapshotWithOffer() throws Exception { + parser.parse(wrap(SNAPSHOT_OFFER)); + + assertEquals(asList(START, OFFER, END), events.collect()); + } + + @Test + public void formatSnapshotWithBidAndOffer() { + formatter.marketSnapshotStart(buffer); + formatter.marketSnapshotEntry(buffer, entry(BID)); + formatter.marketSnapshotEntry(buffer, entry(OFFER)); + formatter.marketSnapshotEnd(buffer); + + buffer.flip(); + + assertEquals(SNAPSHOT_BID_OFFER, remaining(buffer)); + } + + @Test + public void parseSnapshotWithBidAndOffer() throws Exception { + parser.parse(wrap(SNAPSHOT_BID_OFFER)); + + assertEquals(asList(START, BID, OFFER, END), events.collect()); + } + + @Test + public void formatSnapshotWithBidAndBid2() { + formatter.marketSnapshotStart(buffer); + formatter.marketSnapshotEntry(buffer, entry(BID)); + formatter.marketSnapshotEntry(buffer, entry(BID_2)); + formatter.marketSnapshotEnd(buffer); + + buffer.flip(); + + assertEquals(SNAPSHOT_BID_BID_2, remaining(buffer)); + } + + @Test + public void parseSnapshotWithBidAndBid2() throws Exception { + parser.parse(wrap(SNAPSHOT_BID_BID_2)); + + assertEquals(asList(START, BID, BID_2, END), events.collect()); + } + + @Test + public void formatSnapshotWithOfferAndOffer2() { + formatter.marketSnapshotStart(buffer); + formatter.marketSnapshotEntry(buffer, entry(OFFER)); + formatter.marketSnapshotEntry(buffer, entry(OFFER_2)); + formatter.marketSnapshotEnd(buffer); + + buffer.flip(); + + assertEquals(SNAPSHOT_OFFER_OFFER_2, remaining(buffer)); + } + + @Test + public void parseSnapshotWithOfferAndOffer2() throws Exception { + parser.parse(wrap(SNAPSHOT_OFFER_OFFER_2)); + + assertEquals(asList(START, OFFER, OFFER_2, END), events.collect()); + } + + private ByteBuffer wrap(String string) { + return ByteBuffer.wrap(ASCII.put(string)); + } + + private String remaining(ByteBuffer buffer) { + byte[] bytes = new byte[buffer.remaining()]; + + buffer.get(bytes); + + return ASCII.get(bytes); + } + + private HotspotBook.MarketSnapshotEntry entry(MarketSnapshotEntry event) { + HotspotBook.MarketSnapshotEntry entry = new HotspotBook.MarketSnapshotEntry(); + + ASCII.putLeft(entry.currencyPair, event.currencyPair); + entry.buyOrSellIndicator = event.buyOrSellIndicator; + ASCII.putFixedLeft(entry.price, event.price, PRICE_DECIMALS); + ASCII.putLongLeft(entry.amount, event.amount); + ASCII.putLongLeft(entry.minqty, event.minqty); + ASCII.putLongLeft(entry.lotsize, event.lotsize); + ASCII.putLeft(entry.orderId, event.orderId); + + return entry; + } + +} diff --git a/juncture-hotspot/src/test/java/com/paritytrading/juncture/hotspot/itch/ITCHClientEvents.java b/juncture-hotspot/src/test/java/com/paritytrading/juncture/hotspot/itch/ITCHClientEvents.java new file mode 100644 index 0000000..0bc0a24 --- /dev/null +++ b/juncture-hotspot/src/test/java/com/paritytrading/juncture/hotspot/itch/ITCHClientEvents.java @@ -0,0 +1,119 @@ +package com.paritytrading.juncture.hotspot.itch; + +import com.paritytrading.juncture.hotspot.itch.ITCHSessionEvents.*; + +import com.paritytrading.foundation.ASCII; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import org.jvirtanen.value.Value; + +class ITCHClientEvents implements ITCHClientListener { + + private List events; + + public ITCHClientEvents() { + this.events = new ArrayList<>(); + } + + public List collect() { + return events; + } + + @Override + public void heartbeatTimeout(ITCHClient session) { + events.add(new HeartbeatTimeout()); + } + + @Override + public void loginAccepted(ITCHClient session, ITCH.LoginAccepted packet) { + long sequenceNumber = ASCII.getLong(packet.sequenceNumber); + + events.add(new LoginAccepted(sequenceNumber)); + } + + @Override + public void loginRejected(ITCHClient session, ITCH.LoginRejected packet) { + String reason = ASCII.get(packet.reason); + + events.add(new LoginRejected(reason)); + } + + @Override + public void sequencedData(ITCHClient session, ITCH.SequencedData header, ByteBuffer payload) { + String time = ASCII.get(header.time); + + events.add(new SequencedData(time)); + } + + @Override + public void endOfSession(ITCHClient session) { + events.add(new EndOfSession()); + } + + @Override + public void errorNotification(ITCHClient session, ITCH.ErrorNotification packet) { + String errorExplanation = ASCII.get(packet.errorExplanation); + + events.add(new ErrorNotification(errorExplanation)); + } + + @Override + public void instrumentDirectory(ITCHClient session, ITCH.InstrumentDirectory packet) { + long numberOfCurrencyPairs = ASCII.getLong(packet.numberOfCurrencyPairs); + + List currencyPairs = new ArrayList<>(); + + for (int i = 0; i < numberOfCurrencyPairs; i++) + currencyPairs.add(ASCII.get(packet.currencyPair[i])); + + events.add(new InstrumentDirectory(currencyPairs)); + } + + public interface Event { + } + + public static class LoginAccepted extends Value implements Event { + public final long sequenceNumber; + + public LoginAccepted(long sequenceNumber) { + this.sequenceNumber = sequenceNumber; + } + } + + public static class LoginRejected extends Value implements Event { + public final String reason; + + public LoginRejected(String reason) { + this.reason = reason; + } + } + + public static class SequencedData extends Value implements Event { + public final String time; + + public SequencedData(String time) { + this.time = time; + } + } + + public static class EndOfSession extends Value implements Event { + } + + public static class ErrorNotification extends Value implements Event { + public final String errorExplanation; + + public ErrorNotification(String errorExplanation) { + this.errorExplanation = errorExplanation; + } + } + + public static class InstrumentDirectory extends Value implements Event { + public final List currencyPairs; + + public InstrumentDirectory(List currencyPairs) { + this.currencyPairs = currencyPairs; + } + } + +} diff --git a/juncture-hotspot/src/test/java/com/paritytrading/juncture/hotspot/itch/ITCHServerEvents.java b/juncture-hotspot/src/test/java/com/paritytrading/juncture/hotspot/itch/ITCHServerEvents.java new file mode 100644 index 0000000..8c0c2d1 --- /dev/null +++ b/juncture-hotspot/src/test/java/com/paritytrading/juncture/hotspot/itch/ITCHServerEvents.java @@ -0,0 +1,147 @@ +package com.paritytrading.juncture.hotspot.itch; + +import com.paritytrading.juncture.hotspot.itch.ITCHSessionEvents.*; + +import com.paritytrading.foundation.ASCII; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import org.jvirtanen.value.Value; + +class ITCHServerEvents implements ITCHServerListener { + + private List events; + + public ITCHServerEvents() { + this.events = new ArrayList<>(); + } + + public List collect() { + return events; + } + + @Override + public void heartbeatTimeout(ITCHServer session) { + events.add(new HeartbeatTimeout()); + } + + @Override + public void loginRequest(ITCHServer session, ITCH.LoginRequest packet) { + String loginName = ASCII.get(packet.loginName); + String password = ASCII.get(packet.password); + byte marketDataUnsubscribe = packet.marketDataUnsubscribe; + long reserved = ASCII.getLong(packet.reserved); + + events.add(new LoginRequest(loginName, password, marketDataUnsubscribe, reserved)); + } + + @Override + public void logoutRequest(ITCHServer session) { + events.add(new LogoutRequest()); + } + + @Override + public void marketSnapshotRequest(ITCHServer session, ITCH.MarketSnapshotRequest packet) { + String currencyPair = ASCII.get(packet.currencyPair); + + events.add(new MarketSnapshotRequest(currencyPair)); + } + + @Override + public void tickerSubscribeRequest(ITCHServer session, ITCH.TickerSubscribeRequest packet) { + String currencyPair = ASCII.get(packet.currencyPair); + + events.add(new TickerSubscribeRequest(currencyPair)); + } + + @Override + public void tickerUnsubscribeRequest(ITCHServer session, ITCH.TickerUnsubscribeRequest packet) { + String currencyPair = ASCII.get(packet.currencyPair); + + events.add(new TickerUnsubscribeRequest(currencyPair)); + } + + @Override + public void marketDataSubscribeRequest(ITCHServer session, ITCH.MarketDataSubscribeRequest packet) { + String currencyPair = ASCII.get(packet.currencyPair); + + events.add(new MarketDataSubscribeRequest(currencyPair)); + } + + @Override + public void marketDataUnsubscribeRequest(ITCHServer session, ITCH.MarketDataUnsubscribeRequest packet) { + String currencyPair = ASCII.get(packet.currencyPair); + + events.add(new MarketDataUnsubscribeRequest(currencyPair)); + } + + @Override + public void instrumentDirectoryRequest(ITCHServer session) { + events.add(new InstrumentDirectoryRequest()); + } + + public interface Event { + } + + public static class LoginRequest extends Value implements Event { + public final String loginName; + public final String password; + public final byte marketDataUnsubscribe; + public final long reserved; + + public LoginRequest(String loginName, String password, byte marketDataUnsubscribe, + long reserved) { + this.loginName = loginName; + this.password = password; + this.marketDataUnsubscribe = marketDataUnsubscribe; + this.reserved = reserved; + } + } + + public static class LogoutRequest extends Value implements Event { + } + + public static class MarketSnapshotRequest extends Value implements Event { + public final String currencyPair; + + public MarketSnapshotRequest(String currencyPair) { + this.currencyPair = currencyPair; + } + } + + public static class TickerSubscribeRequest extends Value implements Event { + public final String currencyPair; + + public TickerSubscribeRequest(String currencyPair) { + this.currencyPair = currencyPair; + } + } + + public static class TickerUnsubscribeRequest extends Value implements Event { + public final String currencyPair; + + public TickerUnsubscribeRequest(String currencyPair) { + this.currencyPair = currencyPair; + } + } + + public static class MarketDataSubscribeRequest extends Value implements Event { + public final String currencyPair; + + public MarketDataSubscribeRequest(String currencyPair) { + this.currencyPair = currencyPair; + } + } + + public static class MarketDataUnsubscribeRequest extends Value implements Event { + public final String currencyPair; + + public MarketDataUnsubscribeRequest(String currencyPair) { + this.currencyPair = currencyPair; + } + } + + public static class InstrumentDirectoryRequest extends Value implements Event { + } + +} diff --git a/juncture-hotspot/src/test/java/com/paritytrading/juncture/hotspot/itch/ITCHSessionEvents.java b/juncture-hotspot/src/test/java/com/paritytrading/juncture/hotspot/itch/ITCHSessionEvents.java new file mode 100644 index 0000000..6696d93 --- /dev/null +++ b/juncture-hotspot/src/test/java/com/paritytrading/juncture/hotspot/itch/ITCHSessionEvents.java @@ -0,0 +1,11 @@ +package com.paritytrading.juncture.hotspot.itch; + +import org.jvirtanen.value.Value; + +class ITCHSessionEvents { + + public static class HeartbeatTimeout extends Value + implements ITCHClientEvents.Event, ITCHServerEvents.Event { + } + +} diff --git a/juncture-hotspot/src/test/java/com/paritytrading/juncture/hotspot/itch/ITCHSessionTest.java b/juncture-hotspot/src/test/java/com/paritytrading/juncture/hotspot/itch/ITCHSessionTest.java new file mode 100644 index 0000000..99c01fc --- /dev/null +++ b/juncture-hotspot/src/test/java/com/paritytrading/juncture/hotspot/itch/ITCHSessionTest.java @@ -0,0 +1,275 @@ +package com.paritytrading.juncture.hotspot.itch; + +import static com.paritytrading.juncture.hotspot.itch.ITCHClientEvents.*; +import static com.paritytrading.juncture.hotspot.itch.ITCHServerEvents.*; +import static com.paritytrading.juncture.hotspot.itch.ITCHSessionEvents.*; +import static java.util.Arrays.*; +import static org.junit.Assert.*; + +import com.paritytrading.foundation.ASCII; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +public class ITCHSessionTest { + + private static final int RX_BUFFER_CAPACITY = 1024; + + @Rule + public Timeout timeout = new Timeout(1000); + + private ITCH.LoginAccepted loginAccepted; + private ITCH.LoginRejected loginRejected; + private ITCH.ErrorNotification errorNotification; + private ITCH.InstrumentDirectory instrumentDirectory; + + private ITCH.LoginRequest loginRequest; + private ITCH.MarketSnapshotRequest marketSnapshotRequest; + private ITCH.TickerSubscribeRequest tickerSubscribeRequest; + private ITCH.TickerUnsubscribeRequest tickerUnsubscribeRequest; + private ITCH.MarketDataSubscribeRequest marketDataSubscribeRequest; + private ITCH.MarketDataUnsubscribeRequest marketDataUnsubscribeRequest; + + private FixedClock clock; + + private ITCHClientEvents clientEvents; + private ITCHServerEvents serverEvents; + + private ITCHClient client; + private ITCHServer server; + + @Before + public void setUp() throws Exception { + loginAccepted = new ITCH.LoginAccepted(); + loginRejected = new ITCH.LoginRejected(); + errorNotification = new ITCH.ErrorNotification(); + instrumentDirectory = new ITCH.InstrumentDirectory(); + + loginRequest = new ITCH.LoginRequest(); + marketSnapshotRequest = new ITCH.MarketSnapshotRequest(); + tickerSubscribeRequest = new ITCH.TickerSubscribeRequest(); + tickerUnsubscribeRequest = new ITCH.TickerUnsubscribeRequest(); + marketDataSubscribeRequest = new ITCH.MarketDataSubscribeRequest(); + marketDataUnsubscribeRequest = new ITCH.MarketDataUnsubscribeRequest(); + + clock = new FixedClock(); + + ServerSocketChannel acceptor = ServerSocketChannel.open(); + acceptor.bind(null); + + SocketChannel clientChannel = SocketChannel.open(); + clientChannel.connect(acceptor.getLocalAddress()); + + SocketChannel serverChannel = acceptor.accept(); + acceptor.close(); + + clientEvents = new ITCHClientEvents(); + serverEvents = new ITCHServerEvents(); + + client = new ITCHClient(clock, clientChannel, RX_BUFFER_CAPACITY, clientEvents); + server = new ITCHServer(clock, serverChannel, RX_BUFFER_CAPACITY, serverEvents); + } + + @After + public void tearDown() throws Exception { + client.close(); + server.close(); + } + + @Test + public void loginAccepted() throws Exception { + ASCII.putLongRight(loginAccepted.sequenceNumber, 1); + + server.accept(loginAccepted); + + while (clientEvents.collect().size() != 1) + client.receive(); + + assertEquals(asList(new LoginAccepted(1)), clientEvents.collect()); + } + + @Test + public void loginRejected() throws Exception { + ASCII.putLeft(loginRejected.reason, "foo"); + + server.reject(loginRejected); + + while (clientEvents.collect().size() != 1) + client.receive(); + + assertEquals(asList(new LoginRejected("foo ")), + clientEvents.collect()); + } + + /* Sequenced Data */ + + @Test + public void endOfSession() throws Exception { + server.endSession(); + + while (clientEvents.collect().size() != 1) + client.receive(); + + assertEquals(asList(new EndOfSession()), clientEvents.collect()); + } + + @Test + public void errorNotification() throws Exception { + ASCII.putLeft(errorNotification.errorExplanation, "foo"); + + server.notifyError(errorNotification); + + while (clientEvents.collect().size() != 1) + client.receive(); + + assertEquals(asList(new ErrorNotification( + "foo " + + " ")), + clientEvents.collect()); + } + + @Test + public void instrumentDirectory() throws Exception { + ASCII.putLongRight(instrumentDirectory.numberOfCurrencyPairs, 3); + ASCII.putLeft(instrumentDirectory.currencyPair[0], "FOO/BAR"); + ASCII.putLeft(instrumentDirectory.currencyPair[1], "BAR/BAZ"); + ASCII.putLeft(instrumentDirectory.currencyPair[2], "BAZ/FOO"); + + server.instrumentDirectory(instrumentDirectory); + + while (clientEvents.collect().size() != 1) + client.receive(); + + assertEquals(asList(new InstrumentDirectory(asList("FOO/BAR", "BAR/BAZ", "BAZ/FOO"))), + clientEvents.collect()); + } + + @Test + public void loginRequest() throws Exception { + ASCII.putLeft(loginRequest.loginName, "foo"); + ASCII.putLeft(loginRequest.password, "bar"); + loginRequest.marketDataUnsubscribe = ITCH.TRUE; + ASCII.putLongRight(loginRequest.reserved, 0); + + client.login(loginRequest); + + while (serverEvents.collect().size() != 1) + server.receive(); + + assertEquals(asList(new LoginRequest( + "foo ", + "bar ", + ITCH.TRUE, 0)), + serverEvents.collect()); + } + + @Test + public void logoutRequest() throws Exception { + client.logout(); + + while (serverEvents.collect().size() != 1) + server.receive(); + + assertEquals(asList(new LogoutRequest()), serverEvents.collect()); + } + + @Test + public void marketSnapshotRequest() throws Exception { + ASCII.putLeft(marketSnapshotRequest.currencyPair, "FOO/BAR"); + + client.request(marketSnapshotRequest); + + while (serverEvents.collect().size() != 1) + server.receive(); + + assertEquals(asList(new MarketSnapshotRequest("FOO/BAR")), + serverEvents.collect()); + } + + @Test + public void tickerSubscription() throws Exception { + ASCII.putLeft(tickerSubscribeRequest.currencyPair, "FOO/BAR"); + ASCII.putLeft(tickerUnsubscribeRequest.currencyPair, "FOO/BAR"); + + client.request(tickerSubscribeRequest); + client.request(tickerUnsubscribeRequest); + + while (serverEvents.collect().size() != 2) + server.receive(); + + assertEquals(asList(new TickerSubscribeRequest("FOO/BAR"), + new TickerUnsubscribeRequest("FOO/BAR")), + serverEvents.collect()); + } + + @Test + public void marketDataSubscription() throws Exception { + ASCII.putLeft(marketDataSubscribeRequest.currencyPair, "FOO/BAR"); + ASCII.putLeft(marketDataUnsubscribeRequest.currencyPair, "FOO/BAR"); + + client.request(marketDataSubscribeRequest); + client.request(marketDataUnsubscribeRequest); + + while (serverEvents.collect().size() != 2) + server.receive(); + + assertEquals(asList(new MarketDataSubscribeRequest("FOO/BAR"), + new MarketDataUnsubscribeRequest("FOO/BAR")), + serverEvents.collect()); + } + + @Test + public void instrumentDirectoryRequest() throws Exception { + client.requestInstrumentDirectory(); + + while (serverEvents.collect().size() != 1) + server.receive(); + + assertEquals(asList(new InstrumentDirectoryRequest()), serverEvents.collect()); + } + + @Test + public void serverKeepAlive() throws Exception { + clock.setCurrentTimeMillis(1500); + + client.keepAlive(); + server.keepAlive(); + + server.receive(); + + clock.setCurrentTimeMillis(15500); + + server.keepAlive(); + + clock.setCurrentTimeMillis(16750); + + server.keepAlive(); + + assertEquals(asList(new HeartbeatTimeout()), serverEvents.collect()); + } + + @Test + public void clientKeepAlive() throws Exception { + clock.setCurrentTimeMillis(1500); + + client.keepAlive(); + server.keepAlive(); + + client.receive(); + + clock.setCurrentTimeMillis(15500); + + client.keepAlive(); + + clock.setCurrentTimeMillis(16750); + + client.keepAlive(); + + assertEquals(asList(new HeartbeatTimeout()), clientEvents.collect()); + } + +} diff --git a/pom.xml b/pom.xml index ffd2722..62a3644 100644 --- a/pom.xml +++ b/pom.xml @@ -45,6 +45,7 @@ + juncture-hotspot juncture-nasdaq @@ -60,6 +61,16 @@ nassau 0.9.0 + + junit + junit + 4.12 + + + org.jvirtanen.value + value + 0.1.0 +