Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup after sending to avoid memory leak #334

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions ocpp-common/src/main/java/eu/chargetime/ocpp/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ public void handleConfirmation(String uniqueId, Confirmation confirmation) {
promiseRepository.getPromise(uniqueId);
if (promiseOptional.isPresent()) {
promiseOptional.get().complete(confirmation);
promiseRepository.removePromise(uniqueId);
} else {
logger.debug("Promise not found for confirmation {}", confirmation);
}
Expand All @@ -105,11 +104,9 @@ public void handleError(
Optional<CompletableFuture<Confirmation>> promiseOptional =
promiseRepository.getPromise(uniqueId);
if (promiseOptional.isPresent()) {
promiseOptional
.get()
promiseOptional.get()
.completeExceptionally(
new CallErrorException(errorCode, errorDescription, payload));
promiseRepository.removePromise(uniqueId);
} else {
logger.debug("Promise not found for error {}", errorDescription);
}
Expand Down Expand Up @@ -158,10 +155,16 @@ public CompletableFuture<Confirmation> send(Request request)
throw new OccurenceConstraintException();
}

String id = session.storeRequest(request);
CompletableFuture<Confirmation> promise = promiseRepository.createPromise(id);
String requestUuid = session.storeRequest(request);
CompletableFuture<Confirmation> promise = promiseRepository.createPromise(requestUuid);

session.sendRequest(featureOptional.get().getAction(), request, id);
// Clean up after the promise has completed, no matter if it was successful or had an error or a timeout.
promise.whenComplete((confirmation, throwable) -> {
session.removeRequest(requestUuid);
promiseRepository.removePromise(requestUuid);
});

session.sendRequest(featureOptional.get().getAction(), request, requestUuid);
return promise;
}

Expand Down
2 changes: 2 additions & 0 deletions ocpp-common/src/main/java/eu/chargetime/ocpp/ISession.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public interface ISession {

String storeRequest(Request payload);

void removeRequest(String ticket);

void sendRequest(String action, Request payload, String uuid);

boolean completePendingPromise(String id, Confirmation confirmation) throws UnsupportedFeatureException, OccurenceConstraintException;
Expand Down
10 changes: 10 additions & 0 deletions ocpp-common/src/main/java/eu/chargetime/ocpp/Queue.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,16 @@ public Optional<Request> restoreRequest(String ticket) {
return Optional.empty();
}

/**
* Remove a stored {@link Request} using a unique identifier.
* If no request is found for the identifier this method has no effect.
*
* @param ticket unique identifier returned when {@link Request} was initially stored.
*/
public void removeRequest(String ticket) {
requestQueue.remove(ticket);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("requestQueue", requestQueue).toString();
Expand Down
18 changes: 11 additions & 7 deletions ocpp-common/src/main/java/eu/chargetime/ocpp/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ public void handleConfirmation(String uniqueId, Confirmation confirmation) {
promiseRepository.getPromise(uniqueId);
if (promiseOptional.isPresent()) {
promiseOptional.get().complete(confirmation);
promiseRepository.removePromise(uniqueId);
} else {
logger.debug("Promise not found for confirmation {}", confirmation);
}
Expand Down Expand Up @@ -135,11 +134,9 @@ public void handleError(
Optional<CompletableFuture<Confirmation>> promiseOptional =
promiseRepository.getPromise(uniqueId);
if (promiseOptional.isPresent()) {
promiseOptional
.get()
promiseOptional.get()
.completeExceptionally(
new CallErrorException(errorCode, errorDescription, payload));
promiseRepository.removePromise(uniqueId);
} else {
logger.debug("Promise not found for error {}", errorDescription);
}
Expand Down Expand Up @@ -216,9 +213,16 @@ public CompletableFuture<Confirmation> send(UUID sessionIndex, Request request)
throw new OccurenceConstraintException();
}

String id = session.storeRequest(request);
CompletableFuture<Confirmation> promise = promiseRepository.createPromise(id);
session.sendRequest(featureOptional.get().getAction(), request, id);
String requestUuid = session.storeRequest(request);
CompletableFuture<Confirmation> promise = promiseRepository.createPromise(requestUuid);

// Clean up after the promise has completed, no matter if it was successful or had an error or a timeout.
promise.whenComplete((confirmation, throwable) -> {
session.removeRequest(requestUuid);
promiseRepository.removePromise(requestUuid);
});

session.sendRequest(featureOptional.get().getAction(), request, requestUuid);
return promise;
}

Expand Down
10 changes: 10 additions & 0 deletions ocpp-common/src/main/java/eu/chargetime/ocpp/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,16 @@ public String storeRequest(Request payload) {
return queue.store(payload);
}

/**
* Remove a stored {@link Request} using a unique identifier.
* If no request is found for the identifier this method has no effect.
*
* @param ticket unique identifier returned when {@link Request} was initially stored.
*/
public void removeRequest(String ticket) {
queue.removeRequest(ticket);
}

/**
* Send a {@link Confirmation} to a {@link Request}
*
Expand Down
84 changes: 78 additions & 6 deletions ocpp-common/src/test/java/eu/chargetime/ocpp/test/ClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,29 @@ of this software and associated documentation files (the "Software"), to deal
SOFTWARE.
*/

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.*;

import eu.chargetime.ocpp.*;
import eu.chargetime.ocpp.feature.Feature;
import eu.chargetime.ocpp.model.Confirmation;
import eu.chargetime.ocpp.model.Request;
import eu.chargetime.ocpp.model.TestConfirmation;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import static org.hamcrest.CoreMatchers.*;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.*;

@RunWith(MockitoJUnitRunner.class)
public class ClientTest {
private Client client;
Expand All @@ -63,8 +69,10 @@ public void setup() {
.when(session)
.open(any(), any());

when(promiseRepository.createPromise(any())).then(invocation -> new CompletableFuture<Confirmation>());
when(featureRepository.findFeature(any())).thenReturn(Optional.of(feature));
when(session.getFeatureRepository()).thenReturn(featureRepository);
when(session.storeRequest(any())).then(invocation -> UUID.randomUUID().toString());
client = new Client(session, promiseRepository);
}

Expand Down Expand Up @@ -164,4 +172,68 @@ public void send_aMessage_validatesMessage() throws Exception {
// Then
verify(request, times(1)).validate();
}

@Test
public void send_aMessage_promiseCompletes() throws Exception {
// Given
CompletableFuture<Confirmation> internalFuture = new CompletableFuture<>();
when(promiseRepository.createPromise(any())).thenReturn(internalFuture);

// When
CompletableFuture<Confirmation> returnedFuture = client.send(request);
TestConfirmation confirmation = new TestConfirmation();
internalFuture.complete(confirmation);

// Then
assertThat(returnedFuture, is(internalFuture));
assertThat(returnedFuture.isDone(), is(true));
assertThat(returnedFuture.get(), is(confirmation));
verify(session, times(1)).removeRequest(any());
verify(promiseRepository, times(1)).removePromise(any());
}

@Test
public void send_aMessage_promiseCompletesExceptionally() throws Exception {
// Given
CompletableFuture<Confirmation> internalFuture = new CompletableFuture<>();
when(promiseRepository.createPromise(any())).thenReturn(internalFuture);

// When
CompletableFuture<Confirmation> returnedFuture = client.send(request);
internalFuture.completeExceptionally(new IllegalStateException());

// Then
assertThat(returnedFuture, is(internalFuture));
assertThat(returnedFuture.isDone(), is(true));
assertThat(returnedFuture.isCompletedExceptionally(), is(true));
ExecutionException executionException = assertThrows(ExecutionException.class, returnedFuture::get);
assertThat(executionException.getCause().getClass(), is(equalTo(IllegalStateException.class)));
verify(session, times(1)).removeRequest(any());
verify(promiseRepository, times(1)).removePromise(any());
}

@Test
public void send_aMessage_promiseCompletesWithTimeout() throws Exception {
// Given
CompletableFuture<Confirmation> internalFuture = new CompletableFuture<>();
when(promiseRepository.createPromise(any())).thenReturn(internalFuture);

// When
CompletableFuture<Confirmation> returnedFuture = client.send(request);
// If the client uses at least Java 9, it could use CompletableFuture::orTimeout(..) ..
// returnedFuture.orTimeout(50, TimeUnit.MILLISECONDS);
assertThat(returnedFuture.isDone(), is(false));
Thread.sleep(100);
// .. alternatively, it can be implemented manually
returnedFuture.completeExceptionally(new TimeoutException());

// Then
assertThat(returnedFuture, is(internalFuture));
assertThat(returnedFuture.isDone(), is(true));
assertThat(returnedFuture.isCompletedExceptionally(), is(true));
ExecutionException executionException = assertThrows(ExecutionException.class, returnedFuture::get);
assertThat(executionException.getCause().getClass(), is(equalTo(TimeoutException.class)));
verify(session, times(1)).removeRequest(any());
verify(promiseRepository, times(1)).removePromise(any());
}
}
92 changes: 87 additions & 5 deletions ocpp-common/src/test/java/eu/chargetime/ocpp/test/ServerTest.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,29 @@
package eu.chargetime.ocpp.test;

import static org.mockito.Mockito.*;

import eu.chargetime.ocpp.*;
import eu.chargetime.ocpp.feature.Feature;
import eu.chargetime.ocpp.model.Confirmation;
import eu.chargetime.ocpp.model.Request;
import eu.chargetime.ocpp.model.SessionInformation;
import java.util.Optional;
import java.util.UUID;
import eu.chargetime.ocpp.model.TestConfirmation;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.*;

/*
ChargeTime.eu - Java-OCA-OCPP

Expand Down Expand Up @@ -58,7 +68,7 @@ public class ServerTest {
@Mock private Request request;
@Mock private SessionInformation information;
@Mock private IFeatureRepository featureRepository;
@Mock private IPromiseRepository promiseRepository;
@Mock IPromiseRepository promiseRepository;

@Before
public void setup() {
Expand All @@ -75,8 +85,10 @@ public void setup() {
.when(serverEvents)
.newSession(any(), any());

when(promiseRepository.createPromise(any())).then(invocation -> new CompletableFuture<Confirmation>());
when(featureRepository.findFeature(any())).thenReturn(Optional.of(feature));
when(session.getFeatureRepository()).thenReturn(featureRepository);
when(session.storeRequest(any())).thenAnswer(invocation -> UUID.randomUUID().toString());
server = new Server(listener, promiseRepository);
}

Expand Down Expand Up @@ -143,4 +155,74 @@ public void send_aMessage_validatesMessage() throws Exception {
// Then
verify(request, times(1)).validate();
}

@Test
public void send_aMessage_promiseCompletes() throws Exception {
// Given
server.open(LOCALHOST, PORT, serverEvents);
listenerEvents.newSession(session, information);
CompletableFuture<Confirmation> internalFuture = new CompletableFuture<>();
when(promiseRepository.createPromise(any())).thenReturn(internalFuture);

// When
CompletableFuture<Confirmation> returnedFuture = server.send(sessionIndex, request);
TestConfirmation confirmation = new TestConfirmation();
internalFuture.complete(confirmation);

// Then
assertThat(returnedFuture, is(internalFuture));
assertThat(returnedFuture.isDone(), is(true));
assertThat(returnedFuture.get(), is(confirmation));
verify(session, times(1)).removeRequest(any());
verify(promiseRepository, times(1)).removePromise(any());
}

@Test
public void send_aMessage_promiseCompletesExceptionally() throws Exception {
// Given
server.open(LOCALHOST, PORT, serverEvents);
listenerEvents.newSession(session, information);
CompletableFuture<Confirmation> internalFuture = new CompletableFuture<>();
when(promiseRepository.createPromise(any())).thenReturn(internalFuture);

// When
CompletableFuture<Confirmation> returnedFuture = server.send(sessionIndex, request);
internalFuture.completeExceptionally(new IllegalStateException());

// Then
assertThat(returnedFuture, is(internalFuture));
assertThat(returnedFuture.isDone(), is(true));
assertThat(returnedFuture.isCompletedExceptionally(), is(true));
ExecutionException executionException = assertThrows(ExecutionException.class, returnedFuture::get);
assertThat(executionException.getCause().getClass(), is(equalTo(IllegalStateException.class)));
verify(session, times(1)).removeRequest(any());
verify(promiseRepository, times(1)).removePromise(any());
}

@Test
public void send_aMessage_promiseCompletesWithTimeout() throws Exception {
// Given
server.open(LOCALHOST, PORT, serverEvents);
listenerEvents.newSession(session, information);
CompletableFuture<Confirmation> internalFuture = new CompletableFuture<>();
when(promiseRepository.createPromise(any())).thenReturn(internalFuture);

// When
CompletableFuture<Confirmation> returnedFuture = server.send(sessionIndex, request);
// If the client uses at least Java 9, it could use CompletableFuture::orTimeout(..).
// returnedFuture.orTimeout(50, TimeUnit.MILLISECONDS);
assertThat(returnedFuture.isDone(), is(false));
Thread.sleep(100);
// .. alternatively, it can be implemented manually
returnedFuture.completeExceptionally(new TimeoutException());

// Then
assertThat(returnedFuture, is(internalFuture));
assertThat(returnedFuture.isDone(), is(true));
assertThat(returnedFuture.isCompletedExceptionally(), is(true));
ExecutionException executionException = assertThrows(ExecutionException.class, returnedFuture::get);
assertThat(executionException.getCause().getClass(), is(equalTo(TimeoutException.class)));
verify(session, times(1)).removeRequest(any());
verify(promiseRepository, times(1)).removePromise(any());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ public void sendRequest(String action, Request payload, String uuid) {
this.session.sendRequest(action, payload, uuid);
}

@Override
public void removeRequest(String ticket) {
this.session.removeRequest(ticket);
}

@Override
public boolean completePendingPromise(String id, Confirmation confirmation) throws UnsupportedFeatureException, OccurenceConstraintException {
return this.session.completePendingPromise(id, confirmation);
Expand Down