Skip to content

Commit

Permalink
PS-1575: Daily cleanup for in-memory H2 DB to avoid OOM
Browse files Browse the repository at this point in the history
  • Loading branch information
galovics committed Sep 28, 2023
1 parent 28f90ff commit 292b4e0
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 45 deletions.
16 changes: 11 additions & 5 deletions src/main/java/org/test/consumer/domain/EventMessage.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
package org.test.consumer.domain;

import java.time.LocalDateTime;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Lob;
import javax.persistence.Table;
import lombok.Getter;
import lombok.NoArgsConstructor;

import javax.persistence.*;

@Entity
@Table(name="event_message")
@Getter
@NoArgsConstructor
public class EventMessage {
@Id
@GeneratedValue(strategy = GenerationType.AUTO)
@GeneratedValue(strategy = GenerationType.IDENTITY)
private int id;
@Column(name="event_id")
private int eventId;
Expand All @@ -24,14 +30,14 @@ public class EventMessage {
@Column(name="tenant_id")
private String tenantId;
@Column(name="created_at")
private String createdAt;
private LocalDateTime createdAt;
@Lob
@Column(name="payload", columnDefinition="BLOB")
private byte[] payload;
@Column(name="business_date")
private String businessDate;

public EventMessage(int eventId, String type, String category, String schema, String tenantId, String createdAt, byte[] payload, String businessDate) {
public EventMessage(int eventId, String type, String category, String schema, String tenantId, LocalDateTime createdAt, byte[] payload, String businessDate) {
this.eventId = eventId;
this.type = type;
this.category = category;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package org.test.consumer.handler;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import lombok.extern.slf4j.Slf4j;
import org.apache.fineract.avro.MessageV1;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
Expand All @@ -12,36 +15,32 @@
import org.test.consumer.repository.EventMessageRepository;
import org.test.consumer.utility.ByteBufferConvertor;

import java.io.IOException;
import java.nio.ByteBuffer;


@Component
@Slf4j
public class JMSMessageConsumerHandler implements MessageHandler {
private static final Logger logger = LoggerFactory.getLogger(JMSMessageConsumerHandler.class);

@Autowired
private ByteBufferConvertor byteBufferConvertor;

@Autowired
private EventMessageRepository repository;

@Override
public void handleMessage(Message<?> message) throws MessagingException{
ByteBuffer messageByteBuffer = byteBufferConvertor.convert((byte [] )message.getPayload());
MessageV1 messagePayload = null;
public void handleMessage(Message<?> message) throws MessagingException {
ByteBuffer messageByteBuffer = byteBufferConvertor.convert((byte[]) message.getPayload());
try {
messagePayload = MessageV1.fromByteBuffer(messageByteBuffer);
MessageV1 messagePayload = MessageV1.fromByteBuffer(messageByteBuffer);
log.info("Received message for event of Category = {}, Type = {}", messagePayload.getCategory(), messagePayload.getType());
saveMessage(messagePayload);
} catch (IOException e) {
logger.error("Unable to read message {}",e);
log.error("Unable to read message", e);
}
logger.info("Received message for event of Category = {}, Type = {}",messagePayload.getCategory(), messagePayload.getType());
saveMessage(messagePayload);
}

private void saveMessage(MessageV1 messagePayload){
EventMessage message = new EventMessage(messagePayload.getId(),messagePayload.getType(),messagePayload.getCategory(),messagePayload.getDataschema(),messagePayload.getTenantId(),messagePayload.getCreatedAt(),byteBufferConvertor.convert(messagePayload.getData()),
messagePayload.getBusinessDate());
private void saveMessage(MessageV1 messagePayload) {
LocalDateTime createdAt = LocalDateTime.parse(messagePayload.getCreatedAt(), DateTimeFormatter.ISO_DATE_TIME);
EventMessage message = new EventMessage(messagePayload.getId(), messagePayload.getType(), messagePayload.getCategory(), messagePayload.getDataschema(), messagePayload.getTenantId(), createdAt, byteBufferConvertor.convert(messagePayload.getData()),
messagePayload.getBusinessDate());
repository.save(message);
}
}
36 changes: 36 additions & 0 deletions src/main/java/org/test/consumer/job/PurgeOldEventsJob.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package org.test.consumer.job;

import java.time.Clock;
import java.time.LocalDate;
import java.time.LocalDateTime;
import javax.persistence.EntityManager;
import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.CriteriaDelete;
import javax.persistence.criteria.Root;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.test.consumer.domain.EventMessage;

@Component
@RequiredArgsConstructor
@Slf4j
public class PurgeOldEventsJob {
private final EntityManager entityManager;

@Scheduled(cron = "0 0 * * *")
@Transactional
public void purge() {
log.info("Starting the purging");
LocalDateTime todayStartOfDay = LocalDate.now(Clock.systemUTC()).atStartOfDay();
CriteriaBuilder cb = entityManager.getCriteriaBuilder();
CriteriaDelete<EventMessage> deleteQ = cb.createCriteriaDelete(EventMessage.class);
Root<EventMessage> from = deleteQ.from(EventMessage.class);
deleteQ.where(cb.lessThan(from.get("createdAt"), todayStartOfDay));

entityManager.createQuery(deleteQ).executeUpdate();
log.info("Purging finished");
}
}
Original file line number Diff line number Diff line change
@@ -1,27 +1,24 @@
package org.test.consumer.service;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.fineract.avro.BulkMessageItemV1;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.test.consumer.data.EventMessageDTO;
import org.test.consumer.domain.EventMessage;
import org.test.consumer.repository.EventMessageRepository;
import org.test.consumer.utility.ByteBufferConvertor;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

@Service
@AllArgsConstructor
@Slf4j
public class MessageConsumerServiceImpl implements MessageConsumerService {

private static final Logger logger = LoggerFactory.getLogger(MessageConsumerServiceImpl.class);

private final EventMessageRepository repository;
private final ByteBufferConvertor byteBufferConvertor;

Expand All @@ -31,8 +28,9 @@ public List<EventMessageDTO> getMessages() {
List<EventMessage> messages = repository.findAll();
try {
eventMessages = convertToReadableFormat(messages);
} catch (ClassNotFoundException | NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
logger.error("Unable to read message {}", e);
} catch (ClassNotFoundException | NoSuchMethodException | InvocationTargetException |
IllegalAccessException e) {
log.error("Unable to read message", e);
}
return eventMessages;
}
Expand All @@ -43,8 +41,9 @@ public List<EventMessageDTO> getMessagesByType(String eventType) {
List<EventMessage> messages = repository.findByType(eventType);
try {
eventMessages = convertToReadableFormat(messages);
} catch (ClassNotFoundException | NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
logger.error("Unable to read message {}", e);
} catch (ClassNotFoundException | NoSuchMethodException | InvocationTargetException |
IllegalAccessException e) {
log.error("Unable to read message", e);
}
return eventMessages;
}
Expand All @@ -55,7 +54,7 @@ public void deleteMessages() {
}

private List<EventMessageDTO> convertToReadableFormat(List<EventMessage> messages)
throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
List<EventMessageDTO> eventMessages = new ArrayList<>();
for (EventMessage message : messages) {
Class payLoadClass = Class.forName(message.getSchema());
Expand All @@ -71,22 +70,26 @@ private List<EventMessageDTO> convertToReadableFormat(List<EventMessage> message
bulkMessagePayload.append(bulkMessageData);
bulkMessagePayload.append(System.lineSeparator());
}
eventMessages.add(new EventMessageDTO(message.getEventId(), message.getType(), message.getCategory(), message.getTenantId(), message.getCreatedAt(),
bulkMessagePayload.toString(),
message.getBusinessDate()));
eventMessages.add(new EventMessageDTO(message.getEventId(), message.getType(), message.getCategory(), message.getTenantId(), getCreatedAt(message),
bulkMessagePayload.toString(),
message.getBusinessDate()));

} else {
eventMessages.add(
new EventMessageDTO(message.getEventId(), message.getType(), message.getCategory(), message.getTenantId(), message.getCreatedAt(), payLoad.toString(),
message.getBusinessDate()));
new EventMessageDTO(message.getEventId(), message.getType(), message.getCategory(), message.getTenantId(), getCreatedAt(message), payLoad.toString(),
message.getBusinessDate()));
}
}

return eventMessages;
}

private String getCreatedAt(EventMessage message) {
return DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(message.getCreatedAt());
}

private EventMessageDTO retrieveBulkMessage(BulkMessageItemV1 messageItem)
throws ClassNotFoundException, InvocationTargetException, IllegalAccessException, NoSuchMethodException {
throws ClassNotFoundException, InvocationTargetException, IllegalAccessException, NoSuchMethodException {
Class messageBulkMessagePayLoad = Class.forName(messageItem.getDataschema());
Method methodForPayLoad = messageBulkMessagePayLoad.getMethod("fromByteBuffer", ByteBuffer.class);
Object payLoadBulkItem = methodForPayLoad.invoke(null, messageItem.getData());
Expand Down
2 changes: 2 additions & 0 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@ spring.datasource.username=sa
spring.datasource.password=
spring.datasource.driverClassName=org.h2.Driver
spring.jpa.database-platform=org.hibernate.dialect.H2Dialect
spring.jpa.hibernate.ddl-auto=validate

2 changes: 1 addition & 1 deletion src/main/resources/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ CREATE TABLE event_message (
event_id INT NOT NULL,
type VARCHAR(100) NOT NULL,
data_schema VARCHAR(200) NOT NULL,
created_at VARCHAR(100),
created_at TIMESTAMP WITHOUT TIME ZONE,
tenant_id VARCHAR(100),
payload BLOB NOT NULL,
category VARCHAR(100) NOT NULL,
Expand Down

0 comments on commit 292b4e0

Please sign in to comment.