Skip to content

Commit

Permalink
[TH2-5026] Improve logging. (#54)
Browse files Browse the repository at this point in the history
  • Loading branch information
isengrims authored Aug 14, 2023
1 parent f97ed1e commit 0fb3114
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 36 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# th2-conn-dirty-fix (0.3.0)
# th2-conn-dirty-fix (0.3.1)

This microservice allows sending and receiving messages via FIX protocol

Expand Down Expand Up @@ -325,6 +325,9 @@ spec:
```

# Changelog
### 0.3.1
* Improve logging: log session group and session alias for each log message.

## 0.3.0
* Ability to recover messages from cradle.

Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
release_version=0.3.0
release_version=0.3.1
jackson_version=2.11.2
86 changes: 52 additions & 34 deletions src/main/java/com/exactpro/th2/FixHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ public void onStart() {
channel = context.createChannel(address, settings.getSecurity(), Map.of(), true, settings.getReconnectDelay() * 1000L, Integer.MAX_VALUE);
if(settings.isLoadSequencesFromCradle()) {
SequenceHolder sequences = messageLoader.loadInitialSequences(channel.getSessionAlias());
LOGGER.info("Loaded sequences are: client - {}, server - {}", sequences.getClientSeq(), sequences.getServerSeq());
info("Loaded sequences are: client - %d, server - %d", sequences.getClientSeq(), sequences.getServerSeq());
msgSeqNum.set(sequences.getClientSeq());
serverMsgSeqNum.set(sequences.getServerSeq());
}
Expand All @@ -256,11 +256,11 @@ public CompletableFuture<MessageID> send(@NotNull RawMessage rawMessage) {
}

while (channel.isOpen() && !enabled.get()) {
if (LOGGER.isWarnEnabled()) LOGGER.warn("Session is not yet logged in: {}", channel.getSessionAlias());
warn("Session is not yet logged in");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
LOGGER.error("Error while sleeping.");
error("Error while sleeping.", null);
}
}

Expand Down Expand Up @@ -325,14 +325,14 @@ public Map<String, String> onIncoming(@NotNull IChannel channel, @NotNull ByteBu
FixField msgSeqNumValue = findField(message, MSG_SEQ_NUM_TAG);
if (msgSeqNumValue == null) {
metadata.put(REJECT_REASON, "No msgSeqNum Field");
if (LOGGER.isErrorEnabled()) LOGGER.error("Invalid message. No MsgSeqNum in message: {}", message.toString(US_ASCII));
if(LOGGER.isErrorEnabled()) error("Invalid message. No MsgSeqNum in message: %s", null, message.toString(US_ASCII));
return metadata;
}

FixField msgType = findField(message, MSG_TYPE_TAG);
if (msgType == null) {
metadata.put(REJECT_REASON, "No msgType Field");
if (LOGGER.isErrorEnabled()) LOGGER.error("Invalid message. No MsgType in message: {}", message.toString(US_ASCII));
if(LOGGER.isErrorEnabled()) error("Invalid message. No MsgType in message: %s", null, message.toString(US_ASCII));
return metadata;
}

Expand All @@ -355,7 +355,7 @@ public Map<String, String> onIncoming(@NotNull IChannel channel, @NotNull ByteBu
sendLogout();
reconnectRequestTimer = executorService.schedule(this::sendLogon, settings.getReconnectDelay(), TimeUnit.SECONDS);
metadata.put(REJECT_REASON, "SeqNum is less than expected.");
if (LOGGER.isErrorEnabled()) LOGGER.error("Invalid message. SeqNum is less than expected {}: {}", serverMsgSeqNum.get(), message.toString(US_ASCII));
error("Invalid message. SeqNum is less than expected %d: %s", null, serverMsgSeqNum.get(), message.toString(US_ASCII));
return metadata;
}

Expand All @@ -368,18 +368,18 @@ public Map<String, String> onIncoming(@NotNull IChannel channel, @NotNull ByteBu

switch (msgTypeValue) {
case MSG_TYPE_HEARTBEAT:
if (LOGGER.isInfoEnabled()) LOGGER.info("Heartbeat received - {}", message.toString(US_ASCII));
if(LOGGER.isInfoEnabled()) info("Heartbeat received - %s", message.toString(US_ASCII));
checkHeartbeat(message);
break;
case MSG_TYPE_LOGON:
if (LOGGER.isInfoEnabled()) LOGGER.info("Logon received - {}", message.toString(US_ASCII));
if(LOGGER.isInfoEnabled()) info("Logon received - %s", message.toString(US_ASCII));
boolean connectionSuccessful = checkLogon(message);
if (connectionSuccessful) {
if(settings.useNextExpectedSeqNum()) {
FixField nextExpectedSeqField = findField(message, NEXT_EXPECTED_SEQ_NUMBER_TAG);
if(nextExpectedSeqField == null) {
metadata.put(REJECT_REASON, "No NextExpectedSeqNum field");
if (LOGGER.isErrorEnabled()) LOGGER.error("Invalid message. No NextExpectedSeqNum in message: {}", message.toString(US_ASCII));
if(LOGGER.isErrorEnabled()) error("Invalid message. No NextExpectedSeqNum in message: %s", null, message.toString(US_ASCII));
return metadata;
}

Expand Down Expand Up @@ -413,15 +413,13 @@ public Map<String, String> onIncoming(@NotNull IChannel channel, @NotNull ByteBu
reconnectRequestTimer = executorService.schedule(this::sendLogon, settings.getReconnectDelay(), TimeUnit.SECONDS);
}
break;
case MSG_TYPE_LOGOUT: //extract logout reason
handleLogout(message);
break;
//extract logout reason
case MSG_TYPE_RESEND_REQUEST:
if (LOGGER.isInfoEnabled()) LOGGER.info("Resend request received - {}", message.toString(US_ASCII));
if(LOGGER.isInfoEnabled()) info("Resend request received - %s", message.toString(US_ASCII));
handleResendRequest(message);
break;
case MSG_TYPE_SEQUENCE_RESET: //gap fill
if (LOGGER.isInfoEnabled()) LOGGER.info("Sequence reset received - {}", message.toString(US_ASCII));
if(LOGGER.isInfoEnabled()) info("Sequence reset received - %s", message.toString(US_ASCII));
resetSequence(message);
break;
}
Expand All @@ -434,15 +432,15 @@ public Map<String, String> onIncoming(@NotNull IChannel channel, @NotNull ByteBu
}

private void handleLogout(@NotNull ByteBuf message) {
if (LOGGER.isInfoEnabled()) LOGGER.info("Logout received - {}", message.toString(US_ASCII));
if(LOGGER.isInfoEnabled()) info("Logout received - %s", message.toString(US_ASCII));
FixField sessionStatus = findField(message, SESSION_STATUS_TAG);
boolean isSequenceChanged = false;
if(sessionStatus != null) {
int statusCode = Integer.parseInt(Objects.requireNonNull(sessionStatus.getValue()));
if(statusCode != SUCCESSFUL_LOGOUT_CODE) {
FixField text = findField(message, TEXT_TAG);
if (text != null) {
LOGGER.warn("Received Logout has text (58) tag: {}", text.getValue());
warn("Received Logout has text (58) tag: %s", text.getValue());
String wrongClientSequence = StringUtils.substringBetween(text.getValue(), "expecting ", " but received");
if (wrongClientSequence != null) {
msgSeqNum.set(Integer.parseInt(wrongClientSequence) - 1);
Expand Down Expand Up @@ -477,7 +475,7 @@ private void resetSequence(ByteBuf message) {
serverMsgSeqNum.set(Integer.parseInt(requireNonNull(seqNumValue.getValue())) - 1);
}
} else {
LOGGER.trace("Failed to reset servers MsgSeqNum. No such tag in message: {}", message.toString(US_ASCII));
if(LOGGER.isWarnEnabled()) warn("Failed to reset servers MsgSeqNum. No such tag in message: %s", message.toString(US_ASCII));
}
}

Expand Down Expand Up @@ -542,7 +540,7 @@ private void recovery(int beginSeqNo, int endSeqNo) {
}

int endSeq = endSeqNo;
LOGGER.info("Loading messages from {} to {}", beginSeqNo, endSeqNo);
info("Loading messages from %d to %d", beginSeqNo, endSeqNo);
if(settings.isLoadMissedMessagesFromCradle()) {
Function1<ByteBuf, Boolean> processMessage = (buf) -> {
FixField seqNum = findField(buf, MSG_SEQ_NUM_TAG);
Expand Down Expand Up @@ -606,7 +604,7 @@ private void recovery(int beginSeqNo, int endSeqNo) {
resetHeartbeatTask();

} catch (Exception e) {
LOGGER.error("Error while loading messages for recovery", e);
error("Error while loading messages for recovery", e);
String seqReset =
createSequenceReset(Math.max(beginSeqNo, lastProcessedSequence.get() + 1), msgSeqNum.get() + 1).toString();
channel.send(
Expand Down Expand Up @@ -663,9 +661,7 @@ private boolean checkLogon(ByteBuf message) {
public void onOutgoing(@NotNull IChannel channel, @NotNull ByteBuf message, @NotNull Map<String, String> metadata) {
onOutgoingUpdateTag(message, metadata);

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Outgoing message: {}", message.toString(US_ASCII));
}
if(LOGGER.isDebugEnabled()) debug("Outgoing message: %s", message.toString(US_ASCII));
}

public void onOutgoingUpdateTag(@NotNull ByteBuf message, @NotNull Map<String, String> metadata) {
Expand All @@ -688,9 +684,7 @@ public void onOutgoingUpdateTag(@NotNull ByteBuf message, @NotNull Map<String, S
FixField msgType = findField(message, MSG_TYPE_TAG, US_ASCII, bodyLength);

if (msgType == null) { //should we interrupt sending message?
if (LOGGER.isErrorEnabled()) {
LOGGER.error("No msgType in message {}", message.toString(US_ASCII));
}
if(LOGGER.isErrorEnabled()) error("No msgType in message %s", null, message.toString(US_ASCII));

if (metadata.get("MsgType") != null) {
msgType = bodyLength.insertNext(MSG_TYPE_TAG, metadata.get("MsgType"));
Expand Down Expand Up @@ -785,7 +779,7 @@ public void sendHeartbeat() {
setChecksumAndBodyLength(heartbeat);

if (enabled.get()) {
LOGGER.info("Send Heartbeat to server - {}", heartbeat);
info("Send Heartbeat to server - %s", heartbeat);
channel.send(Unpooled.wrappedBuffer(heartbeat.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, IChannel.SendMode.MANGLE);
resetHeartbeatTask();

Expand All @@ -801,7 +795,7 @@ public void sendTestRequest() { //do private
setChecksumAndBodyLength(testRequest);
if (enabled.get()) {
channel.send(Unpooled.wrappedBuffer(testRequest.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, IChannel.SendMode.MANGLE);
LOGGER.info("Send TestRequest to server - {}", testRequest);
info("Send TestRequest to server - %s", testRequest);
resetTestRequestTask();
resetHeartbeatTask();
} else {
Expand All @@ -812,7 +806,7 @@ public void sendTestRequest() { //do private

public void sendLogon() {
if(!sessionActive.get() || !channel.isOpen()) {
LOGGER.info("Logon is not sent to server because session is not active.");
info("Logon is not sent to server because session is not active.");
return;
}
StringBuilder logon = new StringBuilder();
Expand Down Expand Up @@ -844,7 +838,7 @@ public void sendLogon() {
}

setChecksumAndBodyLength(logon);
LOGGER.info("Send logon - {}", logon);
info("Send logon - %s", logon);
channel.send(Unpooled.wrappedBuffer(logon.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, IChannel.SendMode.MANGLE);
}

Expand All @@ -854,7 +848,7 @@ private void sendLogout() {
setHeader(logout, MSG_TYPE_LOGOUT, msgSeqNum.incrementAndGet(), null);
setChecksumAndBodyLength(logout);

LOGGER.debug("Sending logout - {}", logout);
debug("Sending logout - %s", logout);

try {
channel.send(
Expand All @@ -864,9 +858,9 @@ private void sendLogout() {
IChannel.SendMode.MANGLE
).get();

LOGGER.info("Sent logout - {}", logout);
info("Sent logout - %s", logout);
} catch (Exception e) {
LOGGER.error("Failed to send logout - {}", logout, e);
error("Failed to send logout - %s", e, logout);
}
}
}
Expand Down Expand Up @@ -896,11 +890,11 @@ public void close() {
private void waitLogoutResponse() {
long start = System.currentTimeMillis();
while(System.currentTimeMillis() - start < settings.getDisconnectRequestDelay() && enabled.get()) {
if (LOGGER.isWarnEnabled()) LOGGER.warn("Waiting session logout: {}", channel.getSessionAlias());
warn("Waiting session logout");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
LOGGER.error("Error while sleeping.");
error("Error while sleeping.", null);
}
}
}
Expand Down Expand Up @@ -1036,4 +1030,28 @@ private void resetTestRequestTask() {
private void cancelFuture(AtomicReference<Future<?>> future) {
future.get().cancel(false);
}

private void info(String message, Object... args) {
if(LOGGER.isInfoEnabled()) {
LOGGER.info("{} - {}: {}", channel.getSessionGroup(), channel.getSessionAlias(), String.format(message, args));
}
}

private void error(String message, Throwable throwable, Object... args) {
if(LOGGER.isErrorEnabled()) {
LOGGER.error("{} - {}: {}", channel.getSessionGroup(), channel.getSessionAlias(), String.format(message, args), throwable);
}
}

private void warn(String message, Object... args) {
if(LOGGER.isWarnEnabled()) {
LOGGER.warn("{} - {}: {}", channel.getSessionGroup(), channel.getSessionAlias(), String.format(message, args));
}
}

private void debug(String message, Object... args) {
if(LOGGER.isDebugEnabled()) {
LOGGER.debug("{} - {}: {}", channel.getSessionGroup(), channel.getSessionAlias(), String.format(message, args));
}
}
}

0 comments on commit 0fb3114

Please sign in to comment.