Skip to content

Commit

Permalink
Change to enable support for Redis Cluster and Sentinel (#23)
Browse files Browse the repository at this point in the history
* change to enable support for Redis Cluster and Sentinel

* Update to unit tests to reflect changes

---------

Co-authored-by: Neil <[email protected]>
  • Loading branch information
macca2317 and Neil authored Sep 13, 2023
1 parent 7f212ec commit b407582
Show file tree
Hide file tree
Showing 11 changed files with 27 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.netflix.conductor.redis.dynoqueue.ConfigurationHostSupplier;
Expand All @@ -40,7 +39,6 @@ public class RedisClusterConfiguration {
// Same as redis.clients.jedis.BinaryJedisCluster
protected static final int DEFAULT_MAX_ATTEMPTS = 5;

@Bean
public JedisCluster getJedisCluster(RedisProperties properties) {
GenericObjectPoolConfig<Jedis> genericObjectPoolConfig = new GenericObjectPoolConfig<>();
genericObjectPoolConfig.setMaxTotal(properties.getMaxConnectionsPerHost());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.netflix.conductor.redis.dynoqueue.ConfigurationHostSupplier;
Expand All @@ -37,7 +36,6 @@ public class RedisSentinelConfiguration {

private static final Logger log = LoggerFactory.getLogger(RedisSentinelConfiguration.class);

@Bean
protected JedisSentinel getJedisSentinel(RedisProperties properties) {
GenericObjectPoolConfig<Jedis> genericObjectPoolConfig = new GenericObjectPoolConfig<>();
genericObjectPoolConfig.setMinIdle(properties.getMinIdleConnections());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.netflix.conductor.redis.dynoqueue.ConfigurationHostSupplier;
Expand All @@ -29,9 +28,8 @@
@ConditionalOnProperty(name = "conductor.db.type", havingValue = "redis_standalone")
public class RedisStandaloneConfiguration {

private static final Logger log = LoggerFactory.getLogger(RedisSentinelConfiguration.class);
private static final Logger log = LoggerFactory.getLogger(RedisStandaloneConfiguration.class);

@Bean
public JedisPool getJedisPool(RedisProperties redisProperties) {
JedisPoolConfig config = new JedisPoolConfig();
config.setMinIdle(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
/** A {@link JedisCommands} implementation that delegates to {@link JedisPool}. */
@Component
@ConditionalOnProperty(name = "conductor.db.type", havingValue = "redis_standalone")
public class JedisStandalone implements JedisCommands {
public class JedisStandalone implements OrkesJedisCommands {

private final JedisPool jedisPool;

Expand Down Expand Up @@ -1154,10 +1154,12 @@ public List<StreamConsumersInfo> xinfoConsumers(String key, String group) {
return executeInJedis(jedis -> jedis.xinfoConsumers(key, group));
}

@Override
public String set(byte[] key, byte[] value) {
return executeInJedis(jedis -> jedis.set(key, value));
}

@Override
public byte[] getBytes(byte[] key) {
return executeInJedis(jedis -> jedis.get(key));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.netflix.conductor.redis.jedis;

import redis.clients.jedis.commands.JedisCommands;

public interface OrkesJedisCommands extends JedisCommands {

String set(byte[] key, byte[] value);

byte[] getBytes(byte[] key);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import com.netflix.conductor.redis.config.AnyRedisCondition;

import redis.clients.jedis.JedisPool;
import redis.clients.jedis.ScanParams;
import redis.clients.jedis.ScanResult;
import redis.clients.jedis.Tuple;
Expand All @@ -37,10 +36,10 @@ public class OrkesJedisProxy {

private static final Logger LOGGER = LoggerFactory.getLogger(OrkesJedisProxy.class);

protected JedisStandalone jedisCommands;
protected OrkesJedisCommands jedisCommands;

public OrkesJedisProxy(JedisPool jedisPool) {
this.jedisCommands = new JedisStandalone(jedisPool);
public OrkesJedisProxy(OrkesJedisCommands jedisCommands) {
this.jedisCommands = jedisCommands;
}

public Set<String> zrange(String key, long start, long end) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.redis.config.RedisProperties;
import com.netflix.conductor.redis.jedis.JedisMock;
import com.netflix.conductor.redis.jedis.JedisStandalone;
import com.netflix.conductor.redis.jedis.OrkesJedisProxy;

import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -56,7 +57,7 @@ public void init() {
when(properties.getTaskDefCacheRefreshInterval()).thenReturn(Duration.ofSeconds(60));
JedisPool jedisPool = mock(JedisPool.class);
when(jedisPool.getResource()).thenReturn(new JedisMock());
OrkesJedisProxy orkesJedisProxy = new OrkesJedisProxy(jedisPool);
OrkesJedisProxy orkesJedisProxy = new OrkesJedisProxy(new JedisStandalone(jedisPool));

redisEventHandlerDAO =
new RedisEventHandlerDAO(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.redis.config.RedisProperties;
import com.netflix.conductor.redis.jedis.JedisMock;
import com.netflix.conductor.redis.jedis.JedisStandalone;
import com.netflix.conductor.redis.jedis.OrkesJedisProxy;

import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -56,7 +57,7 @@ public void init() {
when(properties.getTaskDefCacheRefreshInterval()).thenReturn(Duration.ofSeconds(60));
JedisPool jedisPool = mock(JedisPool.class);
when(jedisPool.getResource()).thenReturn(new JedisMock());
OrkesJedisProxy orkesJedisProxy = new OrkesJedisProxy(jedisPool);
OrkesJedisProxy orkesJedisProxy = new OrkesJedisProxy(new JedisStandalone(jedisPool));

executionDAO =
new RedisExecutionDAO(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.netflix.conductor.dao.EventHandlerDAO;
import com.netflix.conductor.redis.config.RedisProperties;
import com.netflix.conductor.redis.jedis.JedisMock;
import com.netflix.conductor.redis.jedis.JedisStandalone;
import com.netflix.conductor.redis.jedis.OrkesJedisProxy;

import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -58,7 +59,7 @@ public void init() {
when(properties.getTaskDefCacheRefreshInterval()).thenReturn(Duration.ofSeconds(60));
JedisPool jedisPool = mock(JedisPool.class);
when(jedisPool.getResource()).thenReturn(new JedisMock());
OrkesJedisProxy orkesJedisProxy = new OrkesJedisProxy(jedisPool);
OrkesJedisProxy orkesJedisProxy = new OrkesJedisProxy(new JedisStandalone(jedisPool));
EventHandlerDAO eventHandlerDAO =
new RedisEventHandlerDAO(
orkesJedisProxy, objectMapper, conductorProperties, properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.netflix.conductor.dao.PollDataDAOTest;
import com.netflix.conductor.redis.config.RedisProperties;
import com.netflix.conductor.redis.jedis.JedisMock;
import com.netflix.conductor.redis.jedis.JedisStandalone;
import com.netflix.conductor.redis.jedis.OrkesJedisProxy;

import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -49,7 +50,7 @@ public void init() {
when(properties.getTaskDefCacheRefreshInterval()).thenReturn(Duration.ofSeconds(60));
JedisPool jedisPool = mock(JedisPool.class);
when(jedisPool.getResource()).thenReturn(new JedisMock());
OrkesJedisProxy orkesJedisProxy = new OrkesJedisProxy(jedisPool);
OrkesJedisProxy orkesJedisProxy = new OrkesJedisProxy(new JedisStandalone(jedisPool));

redisPollDataDAO =
new RedisPollDataDAO(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.redis.config.RedisProperties;
import com.netflix.conductor.redis.jedis.JedisMock;
import com.netflix.conductor.redis.jedis.JedisStandalone;
import com.netflix.conductor.redis.jedis.OrkesJedisProxy;

import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -53,7 +54,7 @@ public void init() {
when(properties.getTaskDefCacheRefreshInterval()).thenReturn(Duration.ofSeconds(60));
JedisPool jedisPool = mock(JedisPool.class);
when(jedisPool.getResource()).thenReturn(new JedisMock());
OrkesJedisProxy orkesJedisProxy = new OrkesJedisProxy(jedisPool);
OrkesJedisProxy orkesJedisProxy = new OrkesJedisProxy(new JedisStandalone(jedisPool));

rateLimitingDao =
new RedisRateLimitingDAO(
Expand Down

0 comments on commit b407582

Please sign in to comment.