Skip to content

Commit

Permalink
Send ConnectionRequest
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathanl-bq committed Oct 12, 2023
1 parent f5c74d2 commit 7df9805
Show file tree
Hide file tree
Showing 2 changed files with 219 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,76 @@
package javababushka.benchmarks;

import connection_request.ConnectionRequestOuterClass.AddressInfo;
import connection_request.ConnectionRequestOuterClass.AuthenticationInfo;
import connection_request.ConnectionRequestOuterClass.ConnectionRequest;
import connection_request.ConnectionRequestOuterClass.ConnectionRetryStrategy;
import connection_request.ConnectionRequestOuterClass.ReadFromReplicaStrategy;
import connection_request.ConnectionRequestOuterClass.TlsMode;
import java.io.IOException;
import java.net.StandardProtocolFamily;
import java.net.UnixDomainSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import javababushka.client.RedisClient;

public class BenchmarkingApp {

/*
def _varint_encoder():
"""Return an encoder for a basic varint value (does not include tag)."""
local_int2byte = struct.Struct(">B").pack
def encode_varint(write, value, unused_deterministic=None):
bits = value & 0x7F
value >>= 7
while value:
write(local_int2byte(0x80 | bits))
bits = value & 0x7F
value >>= 7
return write(local_int2byte(bits))
return encode_varint
@classmethod
def _varint_bytes(cls, value: int) -> bytes:
"""Encode the given integer as a varint and return the bytes.
TODO: Improve performance
"""
pieces: List[bytes] = []
func = cls._varint_encoder()
func(pieces.append, value, True)
return b"".join(pieces)
*/

private static Byte[] varintBytes(int value) {
ArrayList<Byte> output = new ArrayList();
int bits = value & 0x7F;
value >>= 7;
while (value > 0) {
output.add(new Byte((byte) (0x80 | bits)));
bits = value & 0x7F;
value >>= 7;
}
output.add(new Byte((byte) bits));
Byte[] arr = new Byte[] {};
return output.toArray(arr);
}

private static String readSocketMessage(SocketChannel channel) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(1024);
int bytesRead = channel.read(buffer);
if (bytesRead < 0) return null;

byte[] bytes = new byte[bytesRead];
buffer.flip();
buffer.get(bytes);
String message = new String(bytes);
return message;
}

// main application entrypoint
public static void main(String[] args) throws InterruptedException {
RedisClient client = new RedisClient();
Expand All @@ -26,7 +88,55 @@ public static void main(String[] args) throws InterruptedException {
try {
SocketChannel channel = SocketChannel.open(StandardProtocolFamily.UNIX);
channel.connect(socketAddress);
ConnectionRequest request = ConnectionRequest.newBuilder().build();
ConnectionRequest request =
ConnectionRequest.newBuilder()
.addAddresses(AddressInfo.newBuilder().setHost("localhost").setPort(6379))
.setTlsMode(TlsMode.NoTls)
.setClusterModeEnabled(false)
// In millis
.setResponseTimeout(250)
// In millis
.setClientCreationTimeout(2500)
.setReadFromReplicaStrategy(ReadFromReplicaStrategy.AlwaysFromPrimary)
.setConnectionRetryStrategy(
ConnectionRetryStrategy.newBuilder()
.setNumberOfRetries(1)
.setFactor(1)
.setExponentBase(1))
.setAuthenticationInfo(
AuthenticationInfo.newBuilder().setPassword("").setUsername("default"))
.setDatabaseId(0)
.build();

Byte[] varint = varintBytes(request.toByteArray().length);

System.out.println("Request: \n" + request.toString());
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.clear();
for (Byte b : varint) {
buffer.put(b);
}
buffer.put(request.toByteArray());
buffer.flip();
// System.out.println("Buffer: \n" + StandardCharsets.UTF_8.decode(buffer).toString());
while (buffer.hasRemaining()) {
channel.write(buffer);
}

timeout = 0;
String response = "";
while (response == "" && timeout < maxTimeout) {
timeout++;
System.out.println("iteration");
response = readSocketMessage(channel);
Thread.sleep(250);
}

if (response == null) {
System.out.println("WARNING: response null");
}
System.out.println("Response: " + response);

} catch (IOException e) {
e.printStackTrace();
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 7df9805

Please sign in to comment.