Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: setup momento-local (wip) #394

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions momento-sdk/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,10 @@ tasks.named("analyzeIntTestClassesDependencies").configure {
tasks.named("analyzeTestClassesDependencies").configure {
enabled = false
}

tasks.register<JavaExec>("run") {
group = "application"
description = "Runs the Program class"
mainClass.set("momento.sdk.Program") // Fully qualified class name
classpath = sourceSets["main"].runtimeClasspath
}
44 changes: 44 additions & 0 deletions momento-sdk/src/intTest/java/momento/sdk/CacheGetBatchTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package momento.sdk;

import java.time.Duration;
import momento.sdk.auth.CredentialProvider;
import momento.sdk.config.Configurations;
import momento.sdk.responses.cache.control.CacheCreateResponse;
import org.junit.jupiter.api.Test;

final class CacheGetBatchTest {
@Test
public void getBatchSetBatchHappyPath() {
CredentialProvider credentialProvider =
CredentialProvider.withMomentoLocal(System.getenv("MOMENTO_API_KEY"));
CacheClient cacheClient =
new CacheClientBuilder(
credentialProvider, Configurations.Laptop.latest(), Duration.ofMinutes(1))
.build();

CacheCreateResponse resp = cacheClient.createCache("cache").join();
System.out.println(resp);
// SetResponse setCacheResponse = cacheClient.set("cache", "key1", "val1",
// Duration.ofMinutes(1)).join();
// System.out.println(setCacheResponse);

// final Map<String, String> items = new HashMap<>();
// items.put("key1", "val1");
// items.put("key2", "val2");
// items.put("key3", "val3");
// final SetBatchResponse setBatchResponse =
// cacheClient.setBatch("cache", items, Duration.ofMinutes(1)).join();
// assertThat(setBatchResponse).isInstanceOf(SetBatchResponse.Success.class);
// for (SetResponse setResponse :
// ((SetBatchResponse.Success) setBatchResponse).results().values()) {
// assertThat(setResponse).isInstanceOf(SetResponse.Success.class);
// }
//
// final GetBatchResponse getBatchResponse = cacheClient.getBatch("cache",
// items.keySet()).join();
//
// assertThat(getBatchResponse).isInstanceOf(GetBatchResponse.Success.class);
// assertThat(((GetBatchResponse.Success) getBatchResponse).valueMapStringString())
// .containsExactlyEntriesOf(items);
}
}
102 changes: 102 additions & 0 deletions momento-sdk/src/main/java/momento/sdk/Program.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package momento.sdk;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import momento.sdk.auth.CredentialProvider;
import momento.sdk.config.Configuration;
import momento.sdk.config.Configurations;
import momento.sdk.responses.cache.GetBatchResponse;
import momento.sdk.responses.cache.GetResponse;
import momento.sdk.responses.cache.SetBatchResponse;
import momento.sdk.responses.cache.SetResponse;
import momento.sdk.responses.cache.control.CacheCreateResponse;

public class Program {
public static void main(String[] args) {
String cacheName = "test-cache";
CredentialProvider credentialProvider =
CredentialProvider.withMomentoLocal(System.getenv("MOMENTO_API_KEY"));

Configuration configuration = Configurations.Laptop.latest();
CacheClient cacheClient =
new CacheClientBuilder(credentialProvider, configuration, Duration.ofMinutes(1)).build();

CacheCreateResponse createResponse = cacheClient.createCache(cacheName).join();
if (createResponse instanceof CacheCreateResponse.Success) {
System.out.println("Cache created successfully");
} else {
System.out.println("Failed to create cache" + createResponse.toString());
CacheCreateResponse.Error error = (CacheCreateResponse.Error) createResponse;
System.out.println("Failed to create cache: " + error.getErrorCode());
}


// Set requests
// for (int i = 0; i < 10; i++) {
// SetResponse setResponse =
// cacheClient
// .set(
// cacheName,
// "key" + i,
// "val" + i,
// Duration.ofMinutes(1))
// .join();
// if (setResponse instanceof SetResponse.Success) {
// System.out.println("Set successful");
// } else {
// System.out.println("Failed to set batch" + setResponse.toString());
// SetResponse.Error error = (SetResponse.Error) setResponse;
// System.out.println("Failed to set: " + error.getErrorCode());
// }
// }
//
// // Get requests
// for (int i = 0; i < 10; i++) {
// final GetResponse getResponse =
// cacheClient.get(cacheName, "key" + i).join();
// if (getResponse instanceof GetResponse.Hit) {
// System.out.println("Get successful");
// String value =
// ((GetResponse.Hit) getResponse).valueString();
// System.out.println("Value: " + value);
// } else if (getResponse instanceof GetResponse.Miss) {
// System.out.println("Get successful");
// System.out.println("Value: null");
// } else {
// System.out.println("Failed to get" + getResponse.toString());
// GetResponse.Error error = (GetResponse.Error) getResponse;
// System.out.println("Failed to get: " + error.getErrorCode());
// }
// }

final Map<String, String> items = new HashMap<>();
for (int i = 0; i < 10; i++) {
items.put("key" + i, "val" + i);
}
final SetBatchResponse setBatchResponse =
cacheClient.setBatch(cacheName, items, Duration.ofMinutes(1)).join();
if (setBatchResponse instanceof SetBatchResponse.Success) {
System.out.println("Set batch successful");
} else {
System.out.println("Failed to set batch" + setBatchResponse.toString());
SetBatchResponse.Error error = (SetBatchResponse.Error) setBatchResponse;
System.out.println("Failed to set batch: " + error.getErrorCode());
}

final GetBatchResponse getBatchResponse =
cacheClient.getBatch(cacheName, items.keySet()).join();
if (getBatchResponse instanceof GetBatchResponse.Success) {
System.out.println("Get batch successful");
Map<String, String> values =
((GetBatchResponse.Success) getBatchResponse).valueMapStringString();
for (Map.Entry<String, String> entry : values.entrySet()) {
System.out.println("Key: " + entry.getKey() + " Value: " + entry.getValue());
}
} else {
System.out.println("Failed to get batch" + getBatchResponse.toString());
GetBatchResponse.Error error = (GetBatchResponse.Error) getBatchResponse;
System.out.println("Failed to get batch: " + error.getErrorCode());
}
}
}
16 changes: 5 additions & 11 deletions momento-sdk/src/main/java/momento/sdk/RetryClientInterceptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
final MethodDescriptor<ReqT, RespT> method,
final CallOptions callOptions,
final Channel channel) {
CallOptions newCallOptions = callOptions.withDeadlineAfter(10, TimeUnit.SECONDS);
// currently the SDK only supports unary operations which we want to retry on
if (!method.getType().clientSendsOneMessage()) {
return channel.newCall(method, callOptions);
Expand All @@ -91,18 +92,9 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
responseListener) {

/**
* At this point, the ClientCall has been closed. Any additional calls to the
* ClientCall will not be processed by the server. The server does not send any
* further messages, acknowledgements, or notifications. This is the point where we
* can safely check the status of the initial request that was made, and determine if
* we want to retry or not.
*
* @param status the result of the remote call.
* @param trailers metadata provided at call completion.
*/
@Override
public void onClose(Status status, Metadata trailers) {
System.out.println("onClose function invoked" + method.getFullMethodName());
// we don't have any more business with the server, and since we either complete the
// call or retry
// later on, we try to cancel the current attempt. If the request was successful,
Expand Down Expand Up @@ -131,7 +123,7 @@ public void onClose(Status status, Metadata trailers) {
}

logger.debug(
"Retrying request {} on error code {} with delay {} millisecodns",
"Retrying request {} on error code {} with delay {} milliseconds",
method.getFullMethodName(),
status.getCode().toString(),
retryDelay.get().toMillis());
Expand Down Expand Up @@ -164,6 +156,8 @@ public void cancel(@Nullable String message, @Nullable Throwable cause) {
private void cancelAttempt() {
if (future != null) {
future.cancel(true);
} else {
System.out.println("Future is null");
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ final class ScsControlGrpcStubsManager implements AutoCloseable {
private static ManagedChannel setupConnection(
CredentialProvider credentialProvider, Configuration configuration) {
final NettyChannelBuilder channelBuilder =
NettyChannelBuilder.forAddress(credentialProvider.getControlEndpoint(), 443);
NettyChannelBuilder.forAddress(credentialProvider.getControlEndpoint(), 8080)
.usePlaintext();

// Override grpc config to disable keepalive for control clients
final GrpcConfiguration controlConfig =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ private static void eagerlyConnect(
private ManagedChannel setupChannel(
CredentialProvider credentialProvider, Configuration configuration) {
final NettyChannelBuilder channelBuilder =
NettyChannelBuilder.forAddress(credentialProvider.getCacheEndpoint(), 443);
NettyChannelBuilder.forAddress(credentialProvider.getCacheEndpoint(), 8080).usePlaintext();

// set additional channel options (message size, keepalive, auth, etc)
GrpcChannelOptions.applyGrpcConfigurationToChannelBuilder(
Expand Down
10 changes: 10 additions & 0 deletions momento-sdk/src/main/java/momento/sdk/auth/CredentialProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ public static CredentialProvider fromEnvVar(@Nonnull String envVar) {
return new EnvVarCredentialProvider(envVar);
}

public static CredentialProvider withMomentoLocal(@Nonnull String authToken) {
String momentoLocalOverride = "127.0.0.1";
return new StringCredentialProvider(
authToken,
momentoLocalOverride,
momentoLocalOverride,
momentoLocalOverride,
momentoLocalOverride);
}

/**
* Gets the token used to authenticate to Momento.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ public class GrpcChannelOptions {

public static void applyGrpcConfigurationToChannelBuilder(
GrpcConfiguration grpcConfig, NettyChannelBuilder channelBuilder) {
channelBuilder.useTransportSecurity();
channelBuilder.disableRetry();
// channelBuilder.useTransportSecurity();
// channelBuilder.disableRetry();
channelBuilder.enableRetry();
channelBuilder.maxRetryAttempts(3);

final Optional<Integer> maxMessageSize = grpcConfig.getMaxMessageSize();
if (maxMessageSize.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class DefaultRetryEligibilityStrategy implements RetryEligibilityStrategy
add("cache_client.Scs/ListLength");
// not idempotent: "/cache_client.Scs/ListConcatenateFront",
// not idempotent: "/cache_client.Scs/ListConcatenateBack"
add("cache_client.Scs/GetBatch");
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,20 @@ public void testCredentialProviderV1MissingApiKey() {
.isThrownBy(() -> new StringCredentialProvider(TEST_V1_MISSING_API_KEY))
.withMessageContaining("parse auth token");
}

// @Test
// public void testCredentialProviderWithMomentoLocal() {
// String apiKey =
// "eyJlbmRwb2ludCI6ImNlbGwtYWxwaGEtZGV2LnByZXByb2QuYS5tb21lbnRvaHEuY29tIiwiYXBpX2tleSI6ImV5SmhiR2NpT2lKSVV6STFOaUo5LmV5SnpkV0lpT2lKeWFYTm9kR2xBYlc5dFpXNTBiMmh4TG1OdmJTSXNJblpsY2lJNk1Td2ljQ0k2SWtOQlFUMGlMQ0psZUhBaU9qRTNNekEwTURZMU9ESjkuTHJmNTNMNEg2bDlVY05XNGJMLXJtRm96elluQm1RUjFSaUUxZzdGZktiWSJ9";
// assertThat(CredentialProvider.withMomentoLocal(apiKey))
// .satisfies(
// provider -> {
//// assertThat(provider.getAuthToken()).isEqualTo(apiKey);
//
// assertThat(provider.getControlEndpoint()).isEqualTo("127.0.0.1:8080");
// assertThat(provider.getCacheEndpoint()).isEqualTo("127.0.0.1:8080");
//
// assertThat(provider.getStorageEndpoint()).isEqualTo("127.0.0.1:8080");
// });
// }
}
Loading