Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: setup momento-local (wip) #394

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft

Conversation

rishtigupta
Copy link
Contributor

@rishtigupta rishtigupta commented Oct 2, 2024

PR Description:

  1. Tweaked gRPC Configuration: This commit adjusts the gRPC configuration to allow testing against Momento Local.
  2. Added Test Program: Introduces a simple test program (Program.java) that performs batch get/set operations, designed to trigger the RetryInterceptor logic for testing retry behavior.

Details

1. Momento Local Update:
- Modified the get_batch logic inside Momento Local to simulate a "SERVER_UNAVAILABLE" error on the first request and return the actual getBatch response on subsequent attempts.
- This allows us to verify the RetryInterceptor functionality by reproducing failure scenarios locally.


#[derive(Debug)]
pub struct LocalCacheService<C>
where
    C: Cache + Control,
{
    cache: C,
    request_counter: Arc<Mutex<u32>>,
}

impl<C> LocalCacheService<C>
where
    C: Cache + Control + Clone + Send + Sync + 'static,
{
    pub fn new(cache: C) -> Self {
        Self { cache, request_counter: Arc::new(Mutex::new(0)) }
    }
}

async fn get_batch(
        &self,
        request: tonic::Request<cache_client::GetBatchRequest>,
    ) -> Result<tonic::Response<Self::GetBatchStream>, tonic::Status> {

        let mut counter = self.request_counter.lock().unwrap();
        *counter += 1;

        // Check the counter for the first request
        println!("Counter: {}", *counter);
        if *counter == 1 {
            println!("Returning unavailable status for the first request.");
            return Err(tonic::Status::unavailable("Service is unavailable"));
        }

        let cache_name = utils::read_cache_name(&request)?;
        let get_requests = request.into_inner();

        let mut results: Vec<Result<cache_client::GetResponse, tonic::Status>> = vec![];
        for get_request in get_requests.items {
            match self
                .cache
                .scalar_get(ACCOUNT_ID, cache_name.as_str(), get_request.cache_key)
            {
                Ok(Some(value)) => results.push(Ok(cache_client::GetResponse {
                    result: cache_client::ECacheResult::Hit as i32,
                    cache_body: value,
                    message: "".into(), // unused field
                })),
                Ok(None) => results.push(Ok(cache_client::GetResponse {
                    result: cache_client::ECacheResult::Miss as i32,
                    cache_body: vec![],
                    message: "".into(), // unused field
                })),
                Err(e) => results.push(Err(tonic::Status::internal(e.to_string()))),
            }
        }

        let response_stream = futures::StreamExt::boxed(async_stream::stream! {
            for result in results {
                println!("Yielding result: {:?}", result);
                yield result
            }
            println!("All results yielded, stream will now close.");
        });
        Ok(tonic::Response::new(response_stream))
    }

2. Testing Workflow:

  • Step 1: In one terminal, run cargo run to start Momento Local on port 8080.
  • Step 2: In a second terminal, compile and run the Java test program (javac momento/sdk/Program.java) to observe the retry behavior after the "SERVER_UNAVAILABLE" error is triggered and resolved.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant