diff --git a/java/client/src/main/java/javababushka/Client.java b/java/client/src/main/java/javababushka/Client.java index 2f55bd1712..73bc8f613d 100644 --- a/java/client/src/main/java/javababushka/Client.java +++ b/java/client/src/main/java/javababushka/Client.java @@ -227,39 +227,40 @@ public void run() { } public void closeConnection() { - try { - channel.flush(); + // flush and close the channel + channel.flush(); + channel.close(); + // TODO: check that the channel is closed + + // shutdown the event loop group gracefully by waiting for the remaining response + // and then shutting down the connection + try { long waitStarted = System.nanoTime(); long waitUntil = waitStarted + PENDING_RESPONSES_ON_CLOSE_TIMEOUT_MILLIS * 100_000; // in nanos - for (var future : responses) { - if (future == null || future.isDone()) { + for (var responseFuture : responses) { + if (responseFuture == null || responseFuture.isDone()) { continue; } try { - future.get(waitUntil - System.nanoTime(), TimeUnit.NANOSECONDS); + responseFuture.get(waitUntil - System.nanoTime(), TimeUnit.NANOSECONDS); } catch (InterruptedException | ExecutionException ignored) { + // TODO: print warning } catch (TimeoutException e) { - future.cancel(true); - // TODO cancel the rest + responseFuture.cancel(true); + // TODO: cancel the rest break; } } } finally { - // channel.closeFuture().sync() - channel.close(); var shuttingDown = group.shutdownGracefully(); try { shuttingDown.get(); - } catch (Exception e) { + } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } - if (group.isShutdown()) { - System.out.println("Done shutdownGracefully"); - } else { - System.out.println("Something went wrong"); - } + assert group.isShutdown() : "Redis connection did not shutdown gracefully"; } }