Skip to content

Commit

Permalink
Add testing tools.
Browse files Browse the repository at this point in the history
Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand committed Dec 19, 2023
1 parent 12dcaba commit 4399388
Show file tree
Hide file tree
Showing 7 changed files with 536 additions and 0 deletions.
2 changes: 2 additions & 0 deletions java/client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ dependencies {
// junit
testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.12.4'

testImplementation group: 'io.netty', name: 'netty-codec-redis', version: '4.1.100.Final'
}

tasks.register('protobuf', Exec) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,16 @@
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollDomainSocketChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerDomainSocketChannel;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueDomainSocketChannel;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.kqueue.KQueueServerDomainSocketChannel;
import io.netty.channel.kqueue.KQueueServerSocketChannel;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.unix.DomainSocketChannel;
import io.netty.channel.unix.ServerDomainSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.Map;
import java.util.OptionalInt;
Expand Down Expand Up @@ -121,6 +127,36 @@ public static Class<? extends DomainSocketChannel> getClientUdsNettyChannelType(
throw new RuntimeException("Current platform supports no known socket types");
}

/**
* Get a channel class required by Netty to open a server UDS channel.
*
* @return Return a class, supported by the current native platform.
*/
public static Class<? extends ServerDomainSocketChannel> getServerUdsNettyChannelType() {
if (capabilities.isKQueueAvailable()) {
return KQueueServerDomainSocketChannel.class;
}
if (capabilities.isEPollAvailable()) {
return EpollServerDomainSocketChannel.class;
}
throw new RuntimeException("Current platform supports no known socket types");
}

/**
* Get a channel class required by Netty to open a server TCP channel.
*
* @return Return a class, supported by the current native platform.
*/
public static Class<? extends ServerSocketChannel> getServerTcpNettyChannelType() {
if (capabilities.isKQueueAvailable()) {
return KQueueServerSocketChannel.class;
}
if (capabilities.isEPollAvailable()) {
return EpollServerSocketChannel.class;
}
throw new RuntimeException("Current platform supports no known socket types");
}

/**
* A JVM shutdown hook to be registered. It is responsible for closing connection and freeing
* resources. It is recommended to use a class instead of lambda to ensure that it is called.<br>
Expand Down
35 changes: 35 additions & 0 deletions java/client/src/test/java/babushka/utils/Awaiter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package babushka.utils;

import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Awaiter {
private static final long DEFAULT_TIMEOUT_MILLISECONDS = 30000;

/** Get the future result with default timeout. */
public static <T> T await(Future<T> future) {
return await(future, DEFAULT_TIMEOUT_MILLISECONDS);
}

/** Get the future result with given timeout in ms. */
public static <T> T await(Future<T> future, long timeout) {
try {
return future.get(timeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
throw new RuntimeException("Request timed out", e);
} catch (ExecutionException e) {
throw new RuntimeException(e.getMessage(), e.getCause());
} catch (InterruptedException e) {
if (Thread.currentThread().isInterrupted()) {
// restore interrupt
Thread.interrupted();
}
throw new RuntimeException("The thread was interrupted", e);
} catch (CancellationException e) {
throw new RuntimeException("Request was cancelled", e);
}
}
}
42 changes: 42 additions & 0 deletions java/client/src/test/java/babushka/utils/RedisMockTestBase.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package babushka.utils;

import lombok.SneakyThrows;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

public class RedisMockTestBase {

public static boolean started = false;

@SneakyThrows
public static void startRedisMock(RedisServerMock.ServerMock serverMock) {
assert !started
: "Previous `RedisMock` wasn't stopped, probably your test class does not inherit"
+ " `RedisMockTestBase`.";
RedisServerMock.start(serverMock);
started = true;
}

@BeforeEach
public void preTestCheck() {
assert started
: "You missed to call `startRustCoreLibMock` in a `@BeforeAll` method of your test class"
+ " inherited from `RedisMockTestBase`.";
}

@AfterEach
public void afterTestCheck() {
assert !RedisServerMock.failed() : "Error occurred in `RedisMock`";
}

@AfterAll
@SneakyThrows
public static void stopRedisMock() {
assert started
: "You missed to call `startRustCoreLibMock` in a `@BeforeAll` method of your test class"
+ " inherited from `RedisMockTestBase`.";
RedisServerMock.stop();
started = false;
}
}
219 changes: 219 additions & 0 deletions java/client/src/test/java/babushka/utils/RedisServerMock.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
package babushka.utils;

import babushka.connectors.resources.Platform;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.CodecException;
import io.netty.handler.codec.redis.ArrayRedisMessage;
import io.netty.handler.codec.redis.ErrorRedisMessage;
import io.netty.handler.codec.redis.FullBulkStringRedisMessage;
import io.netty.handler.codec.redis.IntegerRedisMessage;
import io.netty.handler.codec.redis.RedisArrayAggregator;
import io.netty.handler.codec.redis.RedisBulkStringAggregator;
import io.netty.handler.codec.redis.RedisDecoder;
import io.netty.handler.codec.redis.RedisEncoder;
import io.netty.handler.codec.redis.RedisMessage;
import io.netty.handler.codec.redis.SimpleStringRedisMessage;
import io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
import java.util.OptionalInt;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import lombok.Setter;

public class RedisServerMock {

public abstract static class ServerMock {
/** Return `null` to do not reply. */
public abstract RedisMessage reply(String cmd);

protected RedisMessage reply0(String cmd) {
return reply(cmd);
}

public static RedisMessage error(String text) {
return new ErrorRedisMessage(text);
}

public static RedisMessage error(String prefix, String text) {
// https://redis.io/docs/reference/protocol-spec/#simple-errors
if (prefix.contains(" ") || prefix.contains("\r") || prefix.contains("\n")) {
throw new IllegalArgumentException();
}
return new ErrorRedisMessage(prefix.toUpperCase() + " " + text);
}

public static RedisMessage simpleString(String text) {
return new SimpleStringRedisMessage(text);
}

public static RedisMessage OK() {
return simpleString("OK");
}

public static RedisMessage number(long value) {
return new IntegerRedisMessage(value);
}

/** A multi-line message. */
public static RedisMessage multiString(String text) {
return new FullBulkStringRedisMessage(Unpooled.copiedBuffer(text.getBytes()));
}
}

public abstract static class ServerMockConnectAll extends ServerMock {
@Override
protected RedisMessage reply0(String cmd) {
if (cmd.startsWith("CLIENT SETINFO")) {
return OK();
} else if (cmd.startsWith("INFO REPLICATION")) {
var response =
"# Replication\r\n"
+ "role:master\r\n"
+ "connected_slaves:0\r\n"
+ "master_failover_state:no-failover\r\n"
+ "master_replid:d7646c8d14901de9347f1f675c70bcf269a503eb\r\n"
+ "master_replid2:0000000000000000000000000000000000000000\r\n"
+ "master_repl_offset:0\r\n"
+ "second_repl_offset:-1\r\n"
+ "repl_backlog_active:0\r\n"
+ "repl_backlog_size:1048576\r\n"
+ "repl_backlog_first_byte_offset:0\r\n"
+ "repl_backlog_histlen:0\r\n";
return multiString(response);
}
return reply(cmd);
}
}

// TODO support configurable port to test cluster mode
public static final int PORT = 6380;

/** Thread pool supplied to <em>Netty</em> to perform all async IO. */
private EventLoopGroup group;

private Channel channel;

private static RedisServerMock instance;

private ServerMock messageProcessor;

/** Update {@link ServerMock} into a running {@link RedisServerMock}. */
public static void updateServerMock(ServerMock newMock) {
instance.messageProcessor = newMock;
}

private final AtomicBoolean failed = new AtomicBoolean(false);

/** Get and clear failure status. */
public static boolean failed() {
return instance.failed.compareAndSet(true, false);
}

@Setter private static boolean debugLogging = false;

private RedisServerMock() {
try {
channel =
new ServerBootstrap()
.group(group = Platform.createNettyThreadPool("RedisMock", OptionalInt.empty()))
.channel(Platform.getServerTcpNettyChannelType())
.childHandler(
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
// https://github.com/netty/netty/blob/4.1/example/src/main/java/io/netty/example/redis/RedisClient.java
.addLast(new RedisDecoder())
.addLast(new RedisBulkStringAggregator())
.addLast(new RedisArrayAggregator())
.addLast(new RedisEncoder())
.addLast(
new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
RedisMessage redisMessage = (RedisMessage) msg;
var str = RedisMessageToString(redisMessage);
if (debugLogging) {
System.out.printf("-- Received%n %s%n", str);
}
var response = messageProcessor.reply0(str);
if (response != null) {
if (debugLogging) {
System.out.printf(
"-- Replying with%n %s%n",
RedisMessageToString(response));
}
ctx.writeAndFlush(response);
} else if (debugLogging) {
System.out.printf("-- Ignoring%n");
}
}

@Override
public void exceptionCaught(
ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
failed.setPlain(true);
}
});
}
})
.bind(new InetSocketAddress(PORT))
// .sync()
.channel();
} catch (Exception e) {
System.err.printf(
"Failed to create a channel %s: %s%n", e.getClass().getSimpleName(), e.getMessage());
e.printStackTrace(System.err);
}
}

public static void start(ServerMock messageProcessor) {
if (instance != null) {
stop();
}
instance = new RedisServerMock();
instance.messageProcessor = messageProcessor;
}

public static void stop() {
instance.channel.close();
instance.group.shutdownGracefully();
instance = null;
}

private static String RedisMessageToString(RedisMessage msg) {
if (msg instanceof SimpleStringRedisMessage) {
return ((SimpleStringRedisMessage) msg).content();
} else if (msg instanceof ErrorRedisMessage) {
return ((ErrorRedisMessage) msg).content();
} else if (msg instanceof IntegerRedisMessage) {
return String.valueOf(((IntegerRedisMessage) msg).value());
} else if (msg instanceof FullBulkStringRedisMessage) {
return getString((FullBulkStringRedisMessage) msg);
} else if (msg instanceof ArrayRedisMessage) {
return ((ArrayRedisMessage) msg)
.children().stream()
.map(RedisServerMock::RedisMessageToString)
.collect(Collectors.joining(" "));
} else {
throw new CodecException("unknown message type: " + msg);
}
}

private static String getString(FullBulkStringRedisMessage msg) {
if (msg.isNull()) {
return "(null)";
}
return msg.content().toString(CharsetUtil.UTF_8);
}
}
Loading

0 comments on commit 4399388

Please sign in to comment.