Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CASSANDRA-18221: Add AccordConfig to track accord specific configs #39

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions accord-core/src/main/java/accord/impl/SimpleProgressLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import javax.annotation.Nullable;

import accord.utils.AccordConfig;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel we should have accord.config package, this is likely to expand over time

import accord.utils.IntrusiveLinkedList;
import accord.utils.IntrusiveLinkedListNode;
import accord.coordinate.*;
Expand Down Expand Up @@ -88,11 +89,13 @@ boolean isAtLeastCommitted()
enum DisseminateStatus { NotExecuted, Durable, Done }

final Node node;
final AccordConfig config;
final List<Instance> instances = new CopyOnWriteArrayList<>();

public SimpleProgressLog(Node node)
public SimpleProgressLog(Node node, AccordConfig config)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should Node expose config? That way you don't need to pass all over?

{
this.node = node;
this.config = config;
}

class Instance extends IntrusiveLinkedList<Instance.State.Monitoring> implements ProgressLog, Runnable
Expand Down Expand Up @@ -822,7 +825,7 @@ void ensureScheduled()
return;

isScheduled = true;
node.scheduler().once(() -> commandStore.execute(PreLoadContext.empty(), ignore -> run()).begin(commandStore.agent()), 200L, TimeUnit.MILLISECONDS);
node.scheduler().once(() -> commandStore.execute(PreLoadContext.empty(), ignore -> run()).begin(commandStore.agent()), config.progress_log_scheduler_delay_in_ms, TimeUnit.MILLISECONDS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we are trying to move away from _in_ms postfix... this may call out that we should likely move some of the config classes to accord?

}

@Override
Expand Down
8 changes: 5 additions & 3 deletions accord-core/src/main/java/accord/local/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
Expand All @@ -34,6 +35,7 @@
import accord.messages.*;
import accord.primitives.*;
import accord.primitives.Routable.Domain;
import accord.utils.AccordConfig;
import accord.utils.MapReduceConsume;
import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncResult;
Expand Down Expand Up @@ -131,9 +133,9 @@ public boolean isCoordinating(TxnId txnId, Ballot promised)
// TODO (expected, liveness): monitor the contents of this collection for stalled coordination, and excise them
private final Map<TxnId, AsyncResult<? extends Outcome>> coordinating = new ConcurrentHashMap<>();

public Node(Id id, MessageSink messageSink, ConfigurationService configService, LongSupplier nowSupplier,
public Node(Id id, AccordConfig config, MessageSink messageSink, ConfigurationService configService, LongSupplier nowSupplier,
Supplier<DataStore> dataSupplier, ShardDistributor shardDistributor, Agent agent, RandomSource random, Scheduler scheduler, TopologySorter.Supplier topologySorter,
Function<Node, ProgressLog.Factory> progressLogFactory, CommandStores.Factory factory)
BiFunction<Node, AccordConfig, ProgressLog.Factory> progressLogFactory, CommandStores.Factory factory)
{
this.id = id;
this.messageSink = messageSink;
Expand All @@ -145,7 +147,7 @@ public Node(Id id, MessageSink messageSink, ConfigurationService configService,
this.agent = agent;
this.random = random;
this.scheduler = scheduler;
this.commandStores = factory.create(this, agent, dataSupplier.get(), shardDistributor, progressLogFactory.apply(this));
this.commandStores = factory.create(this, agent, dataSupplier.get(), shardDistributor, progressLogFactory.apply(this, config));

configService.registerListener(this);
onTopologyUpdate(topology, false);
Expand Down
11 changes: 11 additions & 0 deletions accord-core/src/main/java/accord/utils/AccordConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package accord.utils;

public class AccordConfig {
public long progress_log_scheduler_delay_in_ms = 200L;

public AccordConfig(long progress_log_scheduler_delay_in_ms) {
this.progress_log_scheduler_delay_in_ms = progress_log_scheduler_delay_in_ms;
}
Comment on lines +6 to +8
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this constructor isn't something we can maintain, as we add more and more configs this will become an issue


public AccordConfig() {}
}
3 changes: 2 additions & 1 deletion accord-core/src/test/java/accord/impl/basic/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import accord.messages.Request;
import accord.topology.TopologyRandomizer;
import accord.topology.Topology;
import accord.utils.AccordConfig;
import accord.utils.RandomSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -219,7 +220,7 @@ public static void run(Id[] nodes, Supplier<PendingQueue> queueSupplier, Consume
{
MessageSink messageSink = sinks.create(node, randomSupplier.get());
BurnTestConfigurationService configService = new BurnTestConfigurationService(node, messageSink, randomSupplier, topology, lookup::get, topologyUpdates);
lookup.put(node, new Node(node, messageSink, configService, nowSupplier.get(),
lookup.put(node, new Node(node, new AccordConfig(), messageSink, configService, nowSupplier.get(),
() -> new ListStore(node), new ShardDistributor.EvenSplit<>(8, ignore -> new IntHashKey.Splitter()),
new ListAgent(30L, onFailure),
randomSupplier.get(), sinks, SizeOfIntersectionSorter.SUPPLIER,
Expand Down
2 changes: 2 additions & 0 deletions accord-core/src/test/java/accord/impl/mock/MockCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import accord.local.Node.Id;
import accord.local.ShardDistributor;
import accord.primitives.Ranges;
import accord.utils.AccordConfig;
import accord.utils.DefaultRandom;
import accord.utils.EpochFunction;
import accord.utils.RandomSource;
Expand Down Expand Up @@ -105,6 +106,7 @@ private Node createNode(Id id, Topology topology)
MessageSink messageSink = messageSinkFactory.apply(id, this);
MockConfigurationService configurationService = new MockConfigurationService(messageSink, onFetchTopology, topology);
return new Node(id,
new AccordConfig(),
messageSink,
configurationService,
nowSupplier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import accord.primitives.*;
import accord.topology.Topology;
import com.google.common.collect.Lists;

import accord.utils.AccordConfig;
import accord.utils.DefaultRandom;

import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -95,9 +97,9 @@ private static class NoOpProgressLog implements ProgressLog

private static Node createNode(Id id, CommandStoreSupport storeSupport)
{
return new Node(id, null, new MockConfigurationService(null, (epoch, service) -> { }, storeSupport.local.get()),
return new Node(id, new AccordConfig(), null, new MockConfigurationService(null, (epoch, service) -> { }, storeSupport.local.get()),
new MockCluster.Clock(100), () -> storeSupport.data, new ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()), new TestAgent(), new DefaultRandom(), null,
SizeOfIntersectionSorter.SUPPLIER, ignore -> ignore2 -> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new);
SizeOfIntersectionSorter.SUPPLIER, (ignore, ignore2) -> ignore3 -> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new);
}

@Test
Expand Down
2 changes: 2 additions & 0 deletions accord-core/src/test/java/accord/messages/PreAcceptTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import accord.impl.mock.MockCluster.Clock;
import accord.primitives.*;
import accord.topology.Topology;
import accord.utils.AccordConfig;
import accord.utils.DefaultRandom;
import accord.utils.EpochFunction;
import accord.utils.ThreadPoolScheduler;
Expand Down Expand Up @@ -69,6 +70,7 @@ private static Node createNode(Id nodeId, MessageSink messageSink, Clock clock)
MockStore store = new MockStore();
Scheduler scheduler = new ThreadPoolScheduler();
return new Node(nodeId,
new AccordConfig(),
messageSink,
new MockConfigurationService(messageSink, EpochFunction.noop(), TOPOLOGY),
clock,
Expand Down
3 changes: 2 additions & 1 deletion accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import accord.messages.Request;
import accord.api.Scheduler;
import accord.topology.Topology;
import accord.utils.AccordConfig;
import accord.utils.RandomSource;

// TODO (low priority, testing): merge with accord.impl.basic.Cluster
Expand Down Expand Up @@ -307,7 +308,7 @@ public static void run(Id[] nodes, QueueSupplier queueSupplier, Consumer<Packet>
for (Id node : nodes)
{
MessageSink messageSink = sinks.create(node, randomSupplier.get());
lookup.put(node, new Node(node, messageSink, new SimpleConfigService(topology),
lookup.put(node, new Node(node, new AccordConfig(), messageSink, new SimpleConfigService(topology),
nowSupplier.get(), MaelstromStore::new, new ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()),
MaelstromAgent.INSTANCE,
randomSupplier.get(), sinks, SizeOfIntersectionSorter.SUPPLIER,
Expand Down
3 changes: 2 additions & 1 deletion accord-maelstrom/src/main/java/accord/maelstrom/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import accord.local.ShardDistributor;
import accord.messages.ReplyContext;
import accord.topology.Topology;
import accord.utils.AccordConfig;
import accord.utils.DefaultRandom;
import accord.utils.ThreadPoolScheduler;
import accord.maelstrom.Packet.Type;
Expand Down Expand Up @@ -160,7 +161,7 @@ public static void listen(TopologyFactory topologyFactory, Supplier<String> in,
MaelstromInit init = (MaelstromInit) packet.body;
topology = topologyFactory.toTopology(init.cluster);
sink = new StdoutSink(System::currentTimeMillis, scheduler, start, init.self, out, err);
on = new Node(init.self, sink, new SimpleConfigService(topology), System::currentTimeMillis,
on = new Node(init.self, new AccordConfig(), sink, new SimpleConfigService(topology), System::currentTimeMillis,
MaelstromStore::new, new ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()),
MaelstromAgent.INSTANCE, new DefaultRandom(), scheduler, SizeOfIntersectionSorter.SUPPLIER,
SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new);
Expand Down