diff --git a/accord-maelstrom/build.gradle b/accord-maelstrom/build.gradle deleted file mode 100644 index 2499217ab..000000000 --- a/accord-maelstrom/build.gradle +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -plugins { - id 'accord.java-conventions' -} - -dependencies { - implementation project(':accord-core') - implementation group: 'com.google.code.findbugs', name: 'jsr305', version: '3.0.2' - implementation 'com.google.code.gson:gson:2.8.7' - - testImplementation project(':accord-core').sourceSets.test.output -} - -jar { - manifest { - attributes( - 'Main-Class': 'accord.maelstrom.Main', - ) - } -} - -task fatJar(type: Jar) { - manifest.from jar.manifest - archiveClassifier = 'all' - from { - configurations.runtimeClasspath.collect { it.isDirectory() ? it : zipTree(it) } - } { - exclude "META-INF/*.SF" - exclude "META-INF/*.DSA" - exclude "META-INF/*.RSA" - } - with jar -} diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Body.java b/accord-maelstrom/src/main/java/accord/maelstrom/Body.java deleted file mode 100644 index 4705e84ec..000000000 --- a/accord-maelstrom/src/main/java/accord/maelstrom/Body.java +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package accord.maelstrom; - -import java.io.IOException; -import java.io.StringReader; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; - -import com.google.gson.JsonArray; -import com.google.gson.TypeAdapter; -import com.google.gson.stream.JsonReader; -import com.google.gson.stream.JsonWriter; -import accord.local.Node.Id; -import accord.primitives.Txn; -import accord.maelstrom.Packet.Type; - -import static accord.utils.Utils.toArray; - -public class Body -{ - public static final long SENTINEL_MSG_ID = Long.MIN_VALUE; - - final Type type; - final long msg_id; - final long in_reply_to; - - public Body(Type type, long msg_id, long in_reply_to) - { - this.type = Objects.requireNonNull(type); - this.msg_id = msg_id; - this.in_reply_to = in_reply_to; - } - - void writeBody(JsonWriter out) throws IOException - { - out.name("type"); - out.value(type.name()); - if (msg_id > SENTINEL_MSG_ID) - { - out.name("msg_id"); - out.value(msg_id); - } - if (in_reply_to > SENTINEL_MSG_ID) - { - out.name("in_reply_to"); - out.value(in_reply_to); - } - } - - public static final TypeAdapter GSON_ADAPTER = new TypeAdapter() - { - @Override - public void write(JsonWriter out, Body value) throws IOException - { - out.beginObject(); - value.writeBody(out); - out.endObject(); - } - - @Override - public Body read(JsonReader in) throws IOException - { - return Body.read(in, null); - } - }; - - public static final TypeAdapter FAIL_READ = new TypeAdapter() - { - @Override - public void write(JsonWriter out, Body value) throws IOException - { - out.beginObject(); - value.writeBody(out); - out.endObject(); - } - - @Override - public Body read(JsonReader in) - { - throw new UnsupportedOperationException(); - } - }; - - public static Body read(JsonReader in, Id from) throws IOException - { - Type type = null; - long msg_id = 0, in_reply_to = 0; - int code = -1; - String text = null; - Txn txn = null; - MaelstromResult txn_ok = null; - Object body = null; - Id node_id = null; - List node_ids = null; - String deferredTxn = null; - - in.beginObject(); - while (in.hasNext()) - { - String field = in.nextName(); - switch (field) - { - case "type": - String v = in.nextString(); - type = Type.valueOf(v); - break; - case "msg_id": - msg_id = in.nextLong(); - break; - case "in_reply_to": - in_reply_to = in.nextLong(); - break; - case "code": - code = in.nextInt(); - break; - case "text": - text = in.nextString(); - break; - case "body": - body = Json.GSON.fromJson(in, type.type); - break; - case "txn": - if (from == null) - throw new IllegalStateException(); - if (msg_id == 0 || type == null) deferredTxn = Json.GSON.fromJson(in, JsonArray.class).toString(); - else if (type == Type.txn) txn = MaelstromRequest.readTxnExternal(in, from, msg_id); - else txn_ok = MaelstromReply.readResultExternal(in, from, msg_id); - break; - case "node_id": - node_id = Json.ID_ADAPTER.read(in); - break; - case "node_ids": - node_ids = new ArrayList<>(); - in.beginArray(); - while (in.hasNext()) - node_ids.add(Json.ID_ADAPTER.read(in)); - in.endArray(); - break; - default: - throw new IllegalStateException("Unexpected field " + field); - } - } - in.endObject(); - - if (deferredTxn != null) - { - JsonReader in2 = new JsonReader(new StringReader(deferredTxn)); - if (type == Type.txn) txn = MaelstromRequest.readTxnExternal(in2, from, msg_id); - else txn_ok = MaelstromReply.readResultExternal(in2, from, msg_id); - } - - switch (type) - { - case init: return new MaelstromInit(msg_id, node_id, toArray(node_ids, Id[]::new)); - case init_ok: return new Body(Type.init_ok, msg_id, in_reply_to); - case txn: return new MaelstromRequest(msg_id, txn); - case txn_ok: return new MaelstromReply(in_reply_to, txn_ok); - case error: return new Error(in_reply_to, code, text); - default: return new Wrapper(type, msg_id, in_reply_to, body); - } - } -} diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java deleted file mode 100644 index f8481b570..000000000 --- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java +++ /dev/null @@ -1,337 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package accord.maelstrom; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.io.PrintWriter; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.LongSupplier; -import java.util.function.Supplier; - -import accord.impl.SizeOfIntersectionSorter; -import accord.impl.SimpleProgressLog; -import accord.impl.InMemoryCommandStores; -import accord.local.AgentExecutor; -import accord.local.Node; -import accord.local.Node.Id; -import accord.api.MessageSink; -import accord.local.ShardDistributor; -import accord.messages.Callback; -import accord.messages.SafeCallback; -import accord.messages.Reply; -import accord.messages.ReplyContext; -import accord.messages.Request; -import accord.api.Scheduler; -import accord.topology.Topology; -import accord.utils.RandomSource; -import accord.utils.async.AsyncChains; -import accord.utils.async.AsyncResult; - -import static java.util.stream.Collectors.toList; - -// TODO (low priority, testing): merge with accord.impl.basic.Cluster -public class Cluster implements Scheduler -{ - public interface Queue - { - void add(T item); - void add(T item, long delay, TimeUnit units); - T poll(); - int size(); - } - - public interface QueueSupplier - { - Queue get(); - } - - public static class InstanceSink implements MessageSink - { - final Id self; - final Function lookup; - final Cluster parent; - final RandomSource random; - - int nextMessageId = 0; - Map callbacks = new LinkedHashMap<>(); - - public InstanceSink(Id self, Function lookup, Cluster parent, RandomSource random) - { - this.self = self; - this.lookup = lookup; - this.parent = parent; - this.random = random; - } - - @Override - public synchronized void send(Id to, Request send) - { - parent.add(self, to, Body.SENTINEL_MSG_ID, send); - } - - @Override - public void send(Id to, Request send, AgentExecutor executor, Callback callback) - { - long messageId = nextMessageId++; - SafeCallback sc = new SafeCallback(executor, callback); - callbacks.put(messageId, sc); - parent.add(self, to, messageId, send); - parent.pending.add((Runnable)() -> { - if (sc == callbacks.remove(messageId)) - sc.timeout(to); - }, 1000 + random.nextInt(10000), TimeUnit.MILLISECONDS); - } - - @Override - public void reply(Id replyToNode, ReplyContext replyContext, Reply reply) - { - long replyToMessage = ((Packet) replyContext).body.msg_id; - parent.add(self, replyToNode, replyToMessage, reply); - } - } - - final Function lookup; - final Queue pending; - final Consumer responseSink; - final Map sinks = new HashMap<>(); - final PrintWriter err; - int clock; - int recurring; - Set partitionSet; - - public Cluster(QueueSupplier queueSupplier, Function lookup, Consumer responseSink, OutputStream stderr) - { - this.pending = queueSupplier.get(); - this.lookup = lookup; - this.responseSink = responseSink; - this.err = new PrintWriter(stderr); - this.partitionSet = new HashSet<>(); - } - - InstanceSink create(Id self, RandomSource random) - { - InstanceSink sink = new InstanceSink(self, lookup, this, random); - sinks.put(self, sink); - return sink; - } - - private void add(Packet packet) - { - if (packet == null) - throw new IllegalArgumentException(); - - err.println(clock++ + " SEND " + packet); - err.flush(); - if (lookup.apply(packet.dest) == null) responseSink.accept(packet); - else pending.add(packet); - } - - void add(Id from, Id to, long messageId, Request send) - { - add(new Packet(from, to, messageId, send)); - } - - void add(Id from, Id to, long replyId, Reply send) - { - add(new Packet(from, to, replyId, send)); - } - - public boolean processPending() - { - if (pending.size() == recurring) - return false; - - Object next = pending.poll(); - if (next == null) - return false; - - if (next instanceof Packet) - { - Packet deliver = (Packet) next; - Node on = lookup.apply(deliver.dest); - switch (deliver.body.type) - { - case init: - throw new IllegalStateException(); - case txn: - err.println(clock++ + " RECV " + deliver); - err.flush(); - on.receive((MaelstromRequest)deliver.body, deliver.src, deliver); - break; - default: - // Drop the message if it goes across the partition - boolean drop = !(partitionSet.contains(deliver.src) && partitionSet.contains(deliver.dest) - || !partitionSet.contains(deliver.src) && !partitionSet.contains(deliver.dest)); - if (drop) - { - err.println(clock++ + " DROP " + deliver); - err.flush(); - break; - } - err.println(clock++ + " RECV " + deliver); - err.flush(); - Object body = ((Wrapper)deliver.body).body; - // for some reason InformOfTxnReply has deliver.body.in_reply_to == Body.SENTINEL_MSG_ID, so is unique - // for all reply types - if (deliver.body.in_reply_to > Body.SENTINEL_MSG_ID || body instanceof Reply) - { - Reply reply = (Reply) body; - SafeCallback callback = sinks.get(deliver.dest).callbacks.remove(deliver.body.in_reply_to); - if (callback != null) - callback.success(deliver.src, reply); - } - else on.receive((Request) body, deliver.src, deliver); - } - } - else - { - ((Runnable) next).run(); - } - return true; - } - - class CancellableRunnable implements Runnable, Scheduled - { - final boolean recurring; - final long delay; - final TimeUnit units; - Runnable run; - - CancellableRunnable(Runnable run, boolean recurring, long delay, TimeUnit units) - { - this.run = run; - this.recurring = recurring; - this.delay = delay; - this.units = units; - } - - @Override - public void run() - { - if (run != null) - { - run.run(); - if (recurring) - pending.add(this, delay, units); - } - } - - @Override - public void cancel() - { - run = null; - } - } - - @Override - public Scheduled recurring(Runnable run, long delay, TimeUnit units) - { - CancellableRunnable result = new CancellableRunnable(run, true, delay, units); - ++recurring; - pending.add(result, delay, units); - return result; - } - - @Override - public Scheduled once(Runnable run, long delay, TimeUnit units) - { - CancellableRunnable result = new CancellableRunnable(run, false, delay, units); - pending.add(result, delay, units); - return result; - } - - @Override - public void now(Runnable run) - { - run.run(); - } - - public static void run(Id[] nodes, QueueSupplier queueSupplier, Consumer responseSink, Supplier randomSupplier, Supplier nowSupplier, TopologyFactory topologyFactory, InputStream stdin, OutputStream stderr) throws IOException - { - try (BufferedReader in = new BufferedReader(new InputStreamReader(stdin))) - { - run(nodes, queueSupplier, responseSink, randomSupplier, nowSupplier, topologyFactory, () -> { - try - { - return Packet.parse(in.readLine()); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - }, stderr); - } - } - - public static void run(Id[] nodes, QueueSupplier queueSupplier, Consumer responseSink, Supplier randomSupplier, Supplier nowSupplier, TopologyFactory topologyFactory, Supplier in, OutputStream stderr) - { - Topology topology = topologyFactory.toTopology(nodes); - Map lookup = new HashMap<>(); - try - { - Cluster sinks = new Cluster(queueSupplier, lookup::get, responseSink, stderr); - for (Id node : nodes) - { - MessageSink messageSink = sinks.create(node, randomSupplier.get()); - lookup.put(node, new Node(node, messageSink, new SimpleConfigService(topology), - nowSupplier.get(), MaelstromStore::new, new ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()), - MaelstromAgent.INSTANCE, - randomSupplier.get(), sinks, SizeOfIntersectionSorter.SUPPLIER, - SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new)); - } - - AsyncResult startup = AsyncChains.reduce(lookup.values().stream().map(Node::start).collect(toList()), (a, b) -> null).beginAsResult(); - while (sinks.processPending()); - if (!startup.isDone()) throw new AssertionError(); - - List nodesList = new ArrayList<>(Arrays.asList(nodes)); - sinks.recurring(() -> - { - Collections.shuffle(nodesList, randomSupplier.get().asJdkRandom()); - int partitionSize = randomSupplier.get().nextInt((topologyFactory.rf+1)/2); - sinks.partitionSet = new HashSet<>(nodesList.subList(0, partitionSize)); - }, 5L, TimeUnit.SECONDS); - - Packet next; - while ((next = in.get()) != null) - sinks.add(next); - - while (sinks.processPending()); - } - finally - { - lookup.values().forEach(Node::shutdown); - } - } -} diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Datum.java b/accord-maelstrom/src/main/java/accord/maelstrom/Datum.java deleted file mode 100644 index 90f93e087..000000000 --- a/accord-maelstrom/src/main/java/accord/maelstrom/Datum.java +++ /dev/null @@ -1,279 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package accord.maelstrom; - -import java.io.IOException; -import java.util.Objects; -import java.util.function.BiFunction; -import java.util.zip.CRC32; - -import com.google.gson.TypeAdapter; -import com.google.gson.stream.JsonReader; -import com.google.gson.stream.JsonWriter; - -public class Datum implements Comparable -{ - public static final boolean COMPARE_BY_HASH = true; - - public static class Hash implements Comparable - { - final int hash; - public Hash(int hash) - { - this.hash = hash; - } - - @Override - public int compareTo(Hash that) - { - return Integer.compare(this.hash, that.hash); - } - - @Override - public boolean equals(Object o) - { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - Hash hash1 = (Hash) o; - return hash == hash1.hash; - } - - @Override - public int hashCode() - { - return Objects.hash(hash); - } - - public String toString() - { - return "#" + hash; - } - } - - public enum Kind - { - STRING, LONG, DOUBLE, HASH; - - public MaelstromKey.Routing[] split(int count) - { - if (count <= 1) - throw new IllegalArgumentException(); - - MaelstromKey.Routing[] result = new MaelstromKey.Routing[count]; - if (this != Kind.HASH) - throw new UnsupportedOperationException(); - - int delta = 2 * (Integer.MAX_VALUE / count); - int start = Integer.MIN_VALUE; - for (int i = 0; i < count; ++i) - result[i] = new MaelstromKey.Routing(start + i * delta); - - return result; - } - - private static final int CHARS = 63; - - private static String toString(long v) - { - if (v == 0) return ""; - --v; - char[] buf = new char[4]; - for (int i = 3 ; i >= 0 ; --i) - { - buf[i] = toChar(v % CHARS); - v /= CHARS; - } - return new String(buf); - } - - private static char toChar(long v) - { - if (v == 0) return ' '; - v -= 1; - if (v < 10) return (char) ('0' + v); - v -= 10; - if (v < 26) return (char) ('A' + v); - v -= 26; - return (char) ('a' + v); - } - - } - - public final Kind kind; - public final Object value; - - Datum(Kind kind, Object value) - { - this.kind = kind; - this.value = value; - } - - public Datum(String value) - { - this(Kind.STRING, value); - } - - public Datum(Long value) - { - this(Kind.LONG, value); - } - - public Datum(Double value) - { - this(Kind.DOUBLE, value); - } - - public Datum(Hash value) - { - this(Kind.HASH, value); - } - - @Override - public int hashCode() - { - return value == null ? 0 : hash(value); - } - - @Override - public boolean equals(Object that) - { - return that == this || (that instanceof Datum && equals((Datum) that)); - } - - public boolean equals(Datum that) - { - return this.kind.equals(that.kind) && this.value.equals(that.value); - } - - @Override - public String toString() - { - return value == null ? kind + ":+Inf" : value.toString(); - } - - @Override - public int compareTo(Datum that) - { - int c = 0; - if (COMPARE_BY_HASH) - c = Integer.compare(hash(this.value), hash(that.value)); - if (c == 0) c = this.kind.compareTo(that.kind); - if (c != 0) return c; - if (this.value == null || that.value == null) - { - if (this.value == null && that.value == null) - return 0; - return this.value == null ? 1 : -1; - } - return ((Comparable)this.value).compareTo(that.value); - } - - static int hash(Object object) - { - if (object == null) - return Integer.MAX_VALUE; - - if (object instanceof Hash) - return ((Hash) object).hash; - - CRC32 crc32c = new CRC32(); - int i = object.hashCode(); - crc32c.update(i); - crc32c.update(i >> 8); - crc32c.update(i >> 16); - crc32c.update(i >> 24); - return (int)crc32c.getValue(); - } - - public static Datum read(JsonReader in) throws IOException - { - return read(in, Datum::new); - } - - public void write(JsonWriter out) throws IOException - { - if (!isSimple()) - { - out.beginArray(); - out.value(kind.toString()); - if (kind == Kind.HASH) - { - out.value(value != null); - if (value != null) - out.value(((Hash)value).hash); - } - out.endArray(); - return; - } - switch (kind) - { - default: throw new IllegalStateException(); - case LONG: out.value((Long) value); break; - case DOUBLE: out.value((Double) value); break; - case STRING: out.value((String) value); break; - } - } - - public boolean isSimple() - { - return value != null && kind != Kind.HASH; - } - - protected static V read(JsonReader in, BiFunction constructor) throws IOException - { - Datum.Kind type; - Object value; - switch (in.peek()) - { - default: - throw new IllegalStateException(); - case BEGIN_ARRAY: - in.beginArray(); - type = Kind.valueOf(in.nextString()); - if (type == Kind.HASH && in.nextBoolean()) value = new Hash(in.nextInt()); - else value = null; - in.endArray(); - break; - case STRING: - value = in.nextString(); - type = Datum.Kind.STRING; - break; - case NUMBER: - try { value = in.nextLong(); type = Datum.Kind.LONG; } - catch (IllegalArgumentException iae) { value = in.nextDouble(); type = Datum.Kind.DOUBLE; } - break; - } - return constructor.apply(type, value); - } - - public static final TypeAdapter GSON_ADAPTER = new TypeAdapter() - { - @Override - public void write(JsonWriter out, Datum value) throws IOException - { - value.write(out); - } - - @Override - public Datum read(JsonReader in) throws IOException - { - return Datum.read(in); - } - }; -} diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Error.java b/accord-maelstrom/src/main/java/accord/maelstrom/Error.java deleted file mode 100644 index d06911807..000000000 --- a/accord-maelstrom/src/main/java/accord/maelstrom/Error.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package accord.maelstrom; - -import java.io.IOException; - -import accord.maelstrom.Packet.Type; -import accord.messages.MessageType; -import com.google.gson.stream.JsonWriter; -import accord.messages.Reply; - -public class Error extends Body implements Reply -{ - final int code; - final String text; - - public Error(long in_reply_to, int code, String text) - { - super(Type.error, SENTINEL_MSG_ID, in_reply_to); - this.code = code; - this.text = text; - } - - @Override - void writeBody(JsonWriter out) throws IOException - { - super.writeBody(out); - out.name("code"); - out.value(code); - out.name("text"); - out.value(text); - } - - @Override - public MessageType type() - { - throw new UnsupportedOperationException(); - } -} diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Json.java b/accord-maelstrom/src/main/java/accord/maelstrom/Json.java deleted file mode 100644 index a74530487..000000000 --- a/accord-maelstrom/src/main/java/accord/maelstrom/Json.java +++ /dev/null @@ -1,556 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package accord.maelstrom; - -import java.io.IOException; -import java.util.*; - -import accord.api.RoutingKey; -import accord.local.Node; -import accord.api.Result; -import accord.messages.ReadData.ReadOk; -import accord.primitives.*; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.TypeAdapter; -import com.google.gson.stream.JsonReader; -import com.google.gson.stream.JsonToken; -import com.google.gson.stream.JsonWriter; -import accord.local.Node.Id; -import accord.api.Key; - -public class Json -{ - public static final Gson GSON; - public static final TypeAdapter DEFAULT_ADAPTER = new TypeAdapter() - { - @Override - public void write(JsonWriter out, Object value) - { - GSON.toJson(value, Object.class, out); - } - - @Override - public Object read(JsonReader in) - { - return GSON.fromJson(in, Object.class); - } - }; - - public static final TypeAdapter ID_ADAPTER = new TypeAdapter() - { - @Override - public void write(JsonWriter out, Id value) throws IOException - { - if (value.id == 0) out.nullValue(); - else out.value(Json.toString(value)); - } - - @Override - public Id read(JsonReader in) throws IOException - { - if (in.peek() == JsonToken.NULL) - { - in.nextNull(); - return Id.NONE; - } - - return parseId(in.nextString()); - } - }; - - public static Id parseId(String id) - { - switch (id.charAt(0)) - { - //TODO(Review) - toString idn't remove the - so doing - parseInt makes the value positive, which changes - // the value - case 'c': return new Id( Integer.parseInt(id.substring(1))); - case 'n': return new Id( Integer.parseInt(id.substring(1))); - default: throw new IllegalStateException(); - } - } - - public static String toString(Id id) - { - if (id.id < 0) return "c" + id.id; - else return "n" + id.id; - } - - public static final TypeAdapter TIMESTAMP_ADAPTER = new TypeAdapter() - { - @Override - public void write(JsonWriter out, Timestamp value) throws IOException - { - if (value == null) out.nullValue(); - else writeTimestamp(out, value); - } - - @Override - public Timestamp read(JsonReader in) throws IOException - { - if (in.peek() == JsonToken.NULL) - { - in.nextNull(); - return null; - } - return readTimestamp(in, Timestamp::fromBits); - } - }; - - private interface TimestampFactory - { - T create(long msb, long lsb, Id node); - } - - private static T readTimestamp(JsonReader in, TimestampFactory factory) throws IOException - { - if (in.peek() == JsonToken.NULL) - { - in.nextNull(); - return null; - } - in.beginArray(); - long msb = in.nextLong(); - long lsb = in.nextLong(); - Id node = ID_ADAPTER.read(in); - in.endArray(); - return factory.create(msb, lsb, node); - } - - private static void writeTimestamp(JsonWriter out, Timestamp timestamp) throws IOException - { - if (timestamp == null) - { - out.nullValue(); - return; - } - out.beginArray(); - out.value(timestamp.msb); - out.value(timestamp.lsb); - ID_ADAPTER.write(out, timestamp.node); - out.endArray(); - } - - public static final TypeAdapter TXNID_ADAPTER = new TypeAdapter() - { - @Override - public void write(JsonWriter out, TxnId value) throws IOException - { - writeTimestamp(out, value); - } - - @Override - public TxnId read(JsonReader in) throws IOException - { - return readTimestamp(in, TxnId::fromBits); - } - }; - - public static final TypeAdapter BALLOT_ADAPTER = new TypeAdapter() - { - @Override - public void write(JsonWriter out, Ballot value) throws IOException - { - writeTimestamp(out, value); - } - - @Override - public Ballot read(JsonReader in) throws IOException - { - return readTimestamp(in, Ballot::fromBits); - } - }; - - - public static final TypeAdapter KEYS_ADAPTER = new TypeAdapter() - { - @Override - public void write(JsonWriter out, Keys value) throws IOException - { - out.beginArray(); - for (Key key : value) - ((MaelstromKey)key).datum.write(out); - out.endArray(); - } - - @Override - public Keys read(JsonReader in) throws IOException - { - List keys = new ArrayList<>(); - in.beginArray(); - while (in.hasNext()) - keys.add(MaelstromKey.readKey(in)); - in.endArray(); - return Keys.of(keys); - } - }; - - public static final TypeAdapter TXN_ADAPTER = new TypeAdapter() - { - @Override - public void write(JsonWriter out, Txn txn) throws IOException - { - if (txn == null) - { - out.nullValue(); - return; - } - - Keys keys = (Keys)txn.keys(); - MaelstromRead read = (MaelstromRead) txn.read(); - MaelstromUpdate update = (MaelstromUpdate) txn.update(); - - out.beginObject(); - out.name("r"); - out.beginArray(); - for (int i = 0 ; i < keys.size() ; ++i) - { - MaelstromKey.Key key = (MaelstromKey.Key) keys.get(i); - if (read.keys.indexOf(key) >= 0) - { - key.datum.write(out); - } - } - out.endArray(); - out.name("append"); - out.beginArray(); - for (int i = 0 ; i < keys.size() ; ++i) - { - MaelstromKey key = (MaelstromKey) keys.get(i); - if (update != null && update.containsKey(key)) - { - out.beginArray(); - key.datum.write(out); - update.get(key).write(out); - out.endArray(); - } - } - out.endArray(); - out.name("client"); - out.value(((MaelstromQuery)txn.query()).client.id); - out.name("requestId"); - out.value(((MaelstromQuery)txn.query()).requestId); - out.endObject(); - } - - @Override - public Txn read(JsonReader in) throws IOException - { - if (in.peek() == JsonToken.NULL) - return null; - - NavigableSet buildReadKeys = new TreeSet<>(); - NavigableSet buildKeys = new TreeSet<>(); - MaelstromUpdate update = new MaelstromUpdate(); - - Node.Id client = null; - long requestId = Long.MIN_VALUE; - in.beginObject(); - while (in.hasNext()) - { - String kind = in.nextName(); - switch (kind) - { - default: throw new IllegalStateException("Invalid kind: " + kind); - case "r": - in.beginArray(); - while (in.hasNext()) - buildReadKeys.add(MaelstromKey.readKey(in)); - in.endArray(); - break; - case "append": - in.beginArray(); - while (in.hasNext()) - { - in.beginArray(); - Key key = MaelstromKey.readKey(in); - buildKeys.add(key); - Value append = Value.read(in); - update.put(key, append); - in.endArray(); - } - in.endArray(); - break; - case "client": - client = ID_ADAPTER.read(in); - break; - case "requestId": - requestId = in.nextLong(); - break; - } - } - in.endObject(); - - if (client == null) - throw new IllegalStateException(); - - buildKeys.addAll(buildReadKeys); - Keys readKeys = new Keys(buildReadKeys); - Keys keys = new Keys(buildKeys); - MaelstromRead read = new MaelstromRead(readKeys, keys); - MaelstromQuery query = new MaelstromQuery(client, requestId); - - return new Txn.InMemory(keys, read, query, update); - } - }; - - public static final TypeAdapter DEPS_ADAPTER = new TypeAdapter() - { - @Override - public void write(JsonWriter out, Deps value) throws IOException - { - out.beginObject(); - out.name("keyDeps"); - out.beginArray(); - for (Map.Entry e : value.keyDeps) - { - out.beginArray(); - ((MaelstromKey)e.getKey()).datum.write(out); - GSON.toJson(e.getValue(), TxnId.class, out); - out.endArray(); - } - out.endArray(); - out.name("rangeDeps"); - out.beginArray(); - for (Map.Entry e : value.rangeDeps) - { - out.beginArray(); - ((MaelstromKey)e.getKey().start()).datum.write(out); - ((MaelstromKey)e.getKey().end()).datum.write(out); - GSON.toJson(e.getValue(), TxnId.class, out); - out.endArray(); - } - out.endArray(); - out.endObject(); - } - - @Override - public Deps read(JsonReader in) throws IOException - { - KeyDeps keyDeps = KeyDeps.NONE; - RangeDeps rangeDeps = RangeDeps.NONE; - in.beginObject(); - while (in.hasNext()) - { - String name; - switch (name = in.nextName()) - { - case "keyDeps": - { - try (KeyDeps.Builder builder = KeyDeps.builder()) - { - in.beginArray(); - while (in.hasNext()) - { - in.beginArray(); - Key key = MaelstromKey.readKey(in); - TxnId txnId = GSON.fromJson(in, TxnId.class); - builder.add(key, txnId); - in.endArray(); - } - in.endArray(); - keyDeps = builder.build(); - } - } - break; - case "rangeDeps": - { - try (RangeDeps.Builder builder = RangeDeps.builder()) - { - in.beginArray(); - while (in.hasNext()) - { - in.beginArray(); - RoutingKey start = MaelstromKey.readRouting(in); - RoutingKey end = MaelstromKey.readRouting(in); - TxnId txnId = GSON.fromJson(in, TxnId.class); - builder.add(new MaelstromKey.Range(start, end), txnId); - in.endArray(); - } - in.endArray(); - rangeDeps = builder.build(); - } - } - break; - default: throw new AssertionError("Unknown name: " + name); - } - } - in.endObject(); - return new Deps(keyDeps, rangeDeps); - } - }; - - public static final TypeAdapter TXN_WRITES_ADAPTER = new TypeAdapter() - { - @Override - public void write(JsonWriter out, Writes value) throws IOException - { - if (value == null) - { - out.nullValue(); - return; - } - out.beginObject(); - out.name("txnId"); - GSON.toJson(value.txnId, TxnId.class, out); - out.name("executeAt"); - GSON.toJson(value.executeAt, Timestamp.class, out); - out.name("keys"); - Keys keys = (Keys) value.keys; - KEYS_ADAPTER.write(out, keys); - out.name("writes"); - MaelstromWrite write = (MaelstromWrite) value.write; - out.beginArray(); - for (Key key : keys) - { - Value append = write.get(key); - if (append == null) out.nullValue(); - else append.write(out); - } - out.endArray(); - out.endObject(); - } - - @Override - public Writes read(JsonReader in) throws IOException - { - if (in.peek() == JsonToken.NULL) - return null; - - in.beginObject(); - TxnId txnId = null; - Timestamp executeAt = null; - Keys keys = null; - List writes = null; - while (in.hasNext()) - { - switch (in.nextName()) - { - default: throw new IllegalStateException(); - case "txnId": - txnId = GSON.fromJson(in, TxnId.class); - break; - case "executeAt": - executeAt = GSON.fromJson(in, Timestamp.class); - break; - case "keys": - keys = KEYS_ADAPTER.read(in); - break; - case "writes": - writes = new ArrayList<>(); - in.beginArray(); - while (in.hasNext()) - writes.add(Value.read(in)); - in.endArray(); - break; - } - } - in.endObject(); - - MaelstromWrite write = new MaelstromWrite(); - if (writes != null) - { - for (int i = 0 ; i < writes.size() ; ++i) - { - if (writes.get(i) != null) - write.put(keys.get(i), writes.get(i)); - } - } - return new Writes(txnId, executeAt, keys, write); - } - }; - - public static final TypeAdapter READ_OK_ADAPTER = new TypeAdapter() - { - @Override - public void write(JsonWriter out, ReadOk value) throws IOException - { - out.beginArray(); - if (value.data != null) - { - for (Map.Entry e : ((MaelstromData)value.data).entrySet()) - { - out.beginArray(); - ((MaelstromKey)e.getKey()).datum.write(out); - e.getValue().write(out); - out.endArray(); - } - } - out.endArray(); - } - - @Override - public ReadOk read(JsonReader in) throws IOException - { - MaelstromData result = new MaelstromData(); - in.beginArray(); - while (in.hasNext()) - { - in.beginArray(); - Key key = MaelstromKey.readKey(in); - Value value = Value.read(in); - result.put(key, value); - in.endArray(); - } - in.endArray(); - return new ReadOk(Ranges.EMPTY, result); - } - }; - - static final TypeAdapter FAIL = new TypeAdapter() - { - @Override - public void write(JsonWriter out, Object value) - { - throw new UnsupportedOperationException(); - } - - @Override - public Object read(JsonReader in) - { - throw new UnsupportedOperationException(); - } - }; - - static - { - GSON = new GsonBuilder().registerTypeAdapter(Packet.class, Packet.GSON_ADAPTER) - .registerTypeAdapter(Id.class, ID_ADAPTER) - .registerTypeAdapter(Txn.class, TXN_ADAPTER) - .registerTypeAdapter(Ballot.class, BALLOT_ADAPTER) - .registerTypeAdapter(TxnId.class, TXNID_ADAPTER) - .registerTypeAdapter(Timestamp.class, TIMESTAMP_ADAPTER) - .registerTypeAdapter(Datum.class, Datum.GSON_ADAPTER) - .registerTypeAdapter(MaelstromKey.Key.class, MaelstromKey.GSON_KEY_ADAPTER) - .registerTypeAdapter(MaelstromKey.Routing.class, MaelstromKey.GSON_ROUTING_ADAPTER) - .registerTypeAdapter(Value.class, Value.GSON_ADAPTER) - .registerTypeAdapter(Writes.class, TXN_WRITES_ADAPTER) - .registerTypeAdapter(MaelstromResult.class, MaelstromResult.GSON_ADAPTER) - .registerTypeAdapter(ReadOk.class, READ_OK_ADAPTER) - .registerTypeAdapter(Deps.class, Json.DEPS_ADAPTER) - .registerTypeAdapter(Keys.class, KEYS_ADAPTER) - .registerTypeAdapter(Body.class, Body.FAIL_READ) - .registerTypeAdapter(Result.class, MaelstromResult.GSON_ADAPTER) - .registerTypeAdapter(MaelstromRequest.class, Body.FAIL_READ) - .registerTypeAdapter(MaelstromReply.class, Body.FAIL_READ) - .create(); - } - -} diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java deleted file mode 100644 index 672703803..000000000 --- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package accord.maelstrom; - -import accord.local.Command; -import accord.local.Node; -import accord.api.Agent; -import accord.api.Result; -import accord.primitives.*; -import accord.primitives.Timestamp; -import accord.primitives.TxnId; - -import java.util.concurrent.TimeUnit; - -public class MaelstromAgent implements Agent -{ - static final MaelstromAgent INSTANCE = new MaelstromAgent(); - - @Override - public void onRecover(Node node, Result success, Throwable fail) - { - if (success != null) - { - MaelstromResult result = (MaelstromResult) success; - node.reply(result.client, MaelstromReplyContext.contextFor(result.requestId), new MaelstromReply(result.requestId, result)); - } - } - - @Override - public void onInconsistentTimestamp(Command command, Timestamp prev, Timestamp next) - { - throw new AssertionError(); - } - - @Override - public void onFailedBootstrap(String phase, Ranges ranges, Runnable retry, Throwable failure) - { - throw new AssertionError(); - } - - @Override - public void onUncaughtException(Throwable t) - { - } - - @Override - public void onHandledException(Throwable t) - { - } - - @Override - public boolean isExpired(TxnId initiated, long now) - { - return TimeUnit.SECONDS.convert(now - initiated.hlc(), TimeUnit.MICROSECONDS) >= 10; - } - - @Override - public Txn emptyTxn(Txn.Kind kind, Seekables keysOrRanges) - { - return new Txn.InMemory(kind, keysOrRanges, new MaelstromRead(Keys.EMPTY, Keys.EMPTY), new MaelstromQuery(Node.Id.NONE, -1), null); - } -} diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromData.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromData.java deleted file mode 100644 index afa614d62..000000000 --- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromData.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package accord.maelstrom; - -import java.util.TreeMap; - -import accord.api.Data; -import accord.api.Key; - -public class MaelstromData extends TreeMap implements Data -{ - @Override - public Data merge(Data data) - { - if (data != null) - this.putAll(((MaelstromData)data)); - return this; - } -} diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromInit.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromInit.java deleted file mode 100644 index 14eba332d..000000000 --- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromInit.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package accord.maelstrom; - -import java.io.IOException; - -import accord.maelstrom.Packet.Type; -import com.google.gson.stream.JsonWriter; -import accord.local.Node.Id; - -public class MaelstromInit extends Body -{ - final Id self; - final Id[] cluster; - - public MaelstromInit(long msg_id, Id self, Id[] cluster) - { - super(Type.init, msg_id, SENTINEL_MSG_ID); - this.self = self; - this.cluster = cluster; - } - - @Override - void writeBody(JsonWriter out) throws IOException - { - super.writeBody(out); - out.name("node_id"); - out.value(self.id); - out.name("node_ids"); - out.beginArray(); - for (Id node : cluster) - out.value(node.id); - out.endArray(); - } -} diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java deleted file mode 100644 index 7dbca6c7d..000000000 --- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java +++ /dev/null @@ -1,247 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package accord.maelstrom; - -import java.io.IOException; -import java.util.Objects; - -import accord.api.RoutingKey; - -import accord.local.ShardDistributor; -import accord.primitives.RoutableKey; -import accord.utils.Invariants; -import com.google.gson.TypeAdapter; -import com.google.gson.stream.JsonReader; -import com.google.gson.stream.JsonWriter; - -import javax.annotation.Nonnull; - -public class MaelstromKey implements RoutableKey -{ - public static class Splitter implements ShardDistributor.EvenSplit.Splitter - { - private static long hash(RoutingKey routingKey) - { - Datum.Hash hash = ((Datum.Hash)((MaelstromKey)routingKey).datum.value); - if (hash == null) - return Integer.MAX_VALUE; - return hash.hash; - } - - @Override - public Long sizeOf(accord.primitives.Range range) - { - return hash(range.end()) - hash(range.start()); - } - - @Override - public accord.primitives.Range subRange(accord.primitives.Range range, Long start, Long end) - { - Invariants.checkState(end - start <= Integer.MAX_VALUE); - long startHash = hash(range.start()); - Invariants.checkArgument(startHash + end <= hash(range.end())); - return range.newRange( - new Routing((int) (startHash + start)), - new Routing((int) (startHash + end)) - ); - } - - @Override - public Long zero() - { - return 0L; - } - - @Override - public Long add(Long a, Long b) - { - return a + b; - } - - @Override - public Long subtract(Long a, Long b) - { - return a - b; - } - - @Override - public Long divide(Long a, int i) - { - return a / i; - } - - @Override - public Long multiply(Long a, int i) - { - return a * i; - } - - @Override - public int min(Long v, int i) - { - return (int)Math.min(v, i); - } - - @Override - public int compare(Long a, Long b) - { - return a.compareTo(b); - } - } - - public static class Key extends MaelstromKey implements accord.api.Key - { - public Key(Datum.Kind kind, Object value) - { - super(kind, value); - } - - public Key(Double value) - { - super(value); - } - } - - public static class Routing extends MaelstromKey implements accord.api.RoutingKey - { - public Routing(Datum.Kind kind, Object hash) - { - super(kind, hash); - Invariants.checkArgument(kind == Datum.Kind.HASH); - } - - public Routing(int hash) - { - super(new Datum(new Datum.Hash(hash))); - } - - @Override - public accord.primitives.Range asRange() - { - return new Range(new Routing(datum.hashCode() - 1), - new Routing(datum.hashCode())); - } - } - - public static class Range extends accord.primitives.Range.EndInclusive - { - public Range(RoutingKey start, RoutingKey end) - { - super(start, end); - } - - @Override - public accord.primitives.Range newRange(RoutingKey start, RoutingKey end) - { - return new Range(start, end); - } - } - - final Datum datum; - - public MaelstromKey(Datum.Kind kind, Object value) - { - datum = new Datum(kind, value); - } - - public MaelstromKey(Double value) - { - datum = new Datum(value); - } - - MaelstromKey(Datum value) - { - datum = value; - } - - @Override - public int compareTo(@Nonnull RoutableKey that) - { - return datum.compareTo(((MaelstromKey) that).datum); - } - - public static Key readKey(JsonReader in) throws IOException - { - return Datum.read(in, Key::new); - } - - public static Routing readRouting(JsonReader in) throws IOException - { - return Datum.read(in, Routing::new); - } - - public static final TypeAdapter GSON_KEY_ADAPTER = new TypeAdapter() - { - @Override - public void write(JsonWriter out, Key value) throws IOException - { - value.datum.write(out); - } - - @Override - public Key read(JsonReader in) throws IOException - { - return MaelstromKey.readKey(in); - } - }; - - public static final TypeAdapter GSON_ROUTING_ADAPTER = new TypeAdapter() - { - @Override - public void write(JsonWriter out, Routing value) throws IOException - { - value.datum.write(out); - } - - @Override - public Routing read(JsonReader in) throws IOException - { - return MaelstromKey.readRouting(in); - } - }; - - @Override - public RoutingKey toUnseekable() - { - if (this instanceof Routing) - return (Routing)this; - return new Routing(datum.value.hashCode()); - } - - @Override - public boolean equals(Object o) - { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - MaelstromKey that = (MaelstromKey) o; - return Objects.equals(datum, that.datum); - } - - @Override - public int hashCode() - { - return Objects.hash(datum); - } - - @Override - public String toString() - { - return datum.toString(); - } -} diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromQuery.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromQuery.java deleted file mode 100644 index cf0621b88..000000000 --- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromQuery.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package accord.maelstrom; - -import java.util.Map; - -import accord.api.Read; -import accord.api.Update; -import accord.local.Node; -import accord.local.Node.Id; -import accord.api.Data; -import accord.api.Key; -import accord.api.Query; -import accord.api.Result; -import accord.primitives.Timestamp; -import accord.primitives.TxnId; - -public class MaelstromQuery implements Query -{ - final Node.Id client; - final long requestId; - - public MaelstromQuery(Id client, long requestId) - { - this.client = client; - this.requestId = requestId; - } - - @Override - public Result compute(TxnId txnId, Timestamp executeAt, Data data, Read untypedRead, Update update) - { - MaelstromRead read = (MaelstromRead) untypedRead; - Value[] values = new Value[read.readKeys.size()]; - for (Map.Entry e : ((MaelstromData)data).entrySet()) - values[read.readKeys.indexOf(e.getKey())] = e.getValue(); - return new MaelstromResult(client, requestId, read.readKeys, values, (MaelstromUpdate) update); - } -} diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java deleted file mode 100644 index 08031512d..000000000 --- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package accord.maelstrom; - -import accord.api.*; -import accord.local.SafeCommandStore; -import accord.primitives.*; -import accord.primitives.Ranges; -import accord.primitives.Keys; -import accord.primitives.Timestamp; -import accord.primitives.Txn; -import accord.utils.async.AsyncChain; -import accord.utils.async.AsyncChains; - -public class MaelstromRead implements Read -{ - final Keys readKeys; - final Keys keys; - - public MaelstromRead(Keys readKeys, Keys keys) - { - this.readKeys = readKeys; - this.keys = keys; - } - - @Override - public Keys keys() - { - return keys; - } - - @Override - public AsyncChain read(Seekable key, Txn.Kind kind, SafeCommandStore commandStore, Timestamp executeAt, DataStore store) - { - MaelstromStore s = (MaelstromStore)store; - MaelstromData result = new MaelstromData(); - result.put((Key)key, s.get((Key)key)); - return AsyncChains.success(result); - } - - @Override - public Read slice(Ranges ranges) - { - return new MaelstromRead(readKeys.slice(ranges), keys.slice(ranges)); - } - - @Override - public Read merge(Read other) - { - MaelstromRead that = (MaelstromRead) other; - Keys readKeys = this.readKeys.with(that.readKeys); - Keys keys = this.keys.with(that.keys); - if (readKeys == this.readKeys && keys == this.keys) - return this; - if (readKeys == that.readKeys && keys == that.keys) - return that; - return new MaelstromRead(readKeys.with(that.readKeys), keys.with(that.keys)); - } -} diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromReply.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromReply.java deleted file mode 100644 index 2cb9fd2a2..000000000 --- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromReply.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package accord.maelstrom; - -import java.io.IOException; -import java.util.NavigableMap; -import java.util.TreeMap; - -import accord.local.Node; -import accord.api.Key; -import accord.messages.MessageType; -import accord.primitives.Keys; -import com.google.gson.stream.JsonReader; -import com.google.gson.stream.JsonToken; -import com.google.gson.stream.JsonWriter; -import accord.maelstrom.Packet.Type; -import accord.messages.Reply; - -public class MaelstromReply extends Body implements Reply -{ - final MaelstromResult result; - - public MaelstromReply(long in_reply_to, MaelstromResult result) - { - super(Type.txn_ok, SENTINEL_MSG_ID, in_reply_to); - this.result = result; - } - - @Override - public MessageType type() - { - throw new UnsupportedOperationException(); - } - - @Override - void writeBody(JsonWriter out) throws IOException - { - super.writeBody(out); - out.name("txn"); - Keys keys = result.keys; - Value[] reads = result.read; - MaelstromUpdate update = result.update; - out.beginArray(); - for (int i = 0 ; i < keys.size() ; ++i) - { - MaelstromKey key = (MaelstromKey) keys.get(i); - if (reads[i] != null) - { - out.beginArray(); - out.value("r"); - key.datum.write(out); - reads[i].writeVerbose(out); - out.endArray(); - } - if (update != null && update.containsKey(key)) - { - for (Datum append : update.get(key).contents) - { - out.beginArray(); - out.value("append"); - key.datum.write(out); - append.write(out); - out.endArray(); - } - } - } - out.endArray(); - } - - public static MaelstromResult readResultExternal(JsonReader in, Node.Id client, long requestId) throws IOException - { - if (in.peek() == JsonToken.NULL) - return null; - - NavigableMap reads = new TreeMap<>(); - MaelstromUpdate update = new MaelstromUpdate(); - in.beginArray(); - while (in.hasNext()) - { - in.beginArray(); - String op = in.nextString(); - Key key = MaelstromKey.readKey(in); - switch (op) - { - default: throw new IllegalStateException("Invalid op: " + op); - case "r": - { - Value value = Value.read(in); - reads.put(key, value); - break; - } - case "append": - Datum value = Datum.read(in); - update.merge(key, new Value(value), Value::append); - } - in.endArray(); - } - in.endArray(); - - for (Key key : update.keySet()) - reads.putIfAbsent(key, null); - - Keys keys = new Keys(reads.navigableKeySet()); - Value[] values = reads.values().toArray(new Value[0]); - - return new MaelstromResult(client, requestId, keys, values, update); - } - -} diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromReplyContext.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromReplyContext.java deleted file mode 100644 index 809f92cab..000000000 --- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromReplyContext.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package accord.maelstrom; - -import accord.messages.ReplyContext; - -public class MaelstromReplyContext implements ReplyContext -{ - public final long messageId; - - public MaelstromReplyContext(long messageId) - { - this.messageId = messageId; - } - - public static ReplyContext contextFor(long messageId) - { - return new MaelstromReplyContext(messageId); - } - - public static long messageIdFor(ReplyContext replyContext) - { - if (replyContext instanceof Packet) - return ((Packet) replyContext).body.msg_id; - return ((MaelstromReplyContext) replyContext).messageId; - } -} diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java deleted file mode 100644 index 95c770952..000000000 --- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package accord.maelstrom; - -import java.io.IOException; -import java.util.NavigableSet; -import java.util.TreeSet; - -import accord.api.Key; -import accord.messages.MessageType; -import accord.messages.ReplyContext; -import accord.primitives.Keys; -import com.google.gson.stream.JsonReader; -import com.google.gson.stream.JsonToken; -import com.google.gson.stream.JsonWriter; -import accord.local.Node; -import accord.local.Node.Id; -import accord.primitives.Txn; -import accord.maelstrom.Packet.Type; -import accord.messages.Request; - -public class MaelstromRequest extends Body implements Request -{ - final Txn txn; - - public MaelstromRequest(long msg_id, Txn txn) - { - super(Type.txn, msg_id, SENTINEL_MSG_ID); - this.txn = txn; - } - - public void process(Node node, Id client, ReplyContext replyContext) - { - node.coordinate(txn).addCallback((success, fail) -> { - if (success != null) node.reply(client, replyContext, new MaelstromReply(MaelstromReplyContext.messageIdFor(replyContext), (MaelstromResult) success)); -// else node.reply(client, messageId, new Error(messageId, 13, fail.getMessage())); - }); - } - - @Override - public MessageType type() - { - throw new UnsupportedOperationException(); - } - - @Override - void writeBody(JsonWriter out) throws IOException - { - super.writeBody(out); - out.name("txn"); - writeTxnExternal(out, txn); - } - - static void writeTxnExternal(JsonWriter out, Txn txn) throws IOException - { - if (txn == null) - { - out.nullValue(); - return; - } - - out.beginArray(); - Keys keys = (Keys)txn.keys(); - MaelstromRead read = (MaelstromRead) txn.read(); - MaelstromUpdate update = (MaelstromUpdate) txn.update(); - for (int i = 0 ; i < keys.size() ; ++i) - { - MaelstromKey.Key key = (MaelstromKey.Key) keys.get(i); - if (read.readKeys.indexOf(key) >= 0) - { - out.beginArray(); - out.value("r"); - key.datum.write(out); - out.nullValue(); - out.endArray(); - } - if (update.containsKey(key)) - { - out.beginArray(); - out.value("append"); - key.datum.write(out); - update.get(key).write(out); - out.endArray(); - } - } - out.endArray(); - } - - public static Txn readTxnExternal(JsonReader in, Node.Id client, long requestId) throws IOException - { - if (in.peek() == JsonToken.NULL) - return null; - - NavigableSet buildReadKeys = new TreeSet<>(); - NavigableSet buildKeys = new TreeSet<>(); - MaelstromUpdate update = new MaelstromUpdate(); - in.beginArray(); - while (in.hasNext()) - { - in.beginArray(); - String op = in.nextString(); - Key key = MaelstromKey.readKey(in); - switch (op) - { - default: throw new IllegalStateException("Invalid op: " + op); - case "r": - in.nextNull(); - buildReadKeys.add(key); - break; - case "append": - Datum value = Datum.read(in); - buildKeys.add(key); - update.merge(key, new Value(value), Value::append); - } - in.endArray(); - } - in.endArray(); - - buildKeys.addAll(buildReadKeys); - Keys readKeys = new Keys(buildReadKeys); - Keys keys = new Keys(buildKeys); - MaelstromRead read = new MaelstromRead(readKeys, keys); - MaelstromQuery query = new MaelstromQuery(client, requestId); - - return new Txn.InMemory(keys, read, query, update); - } - -} diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromResult.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromResult.java deleted file mode 100644 index 2ec35c88b..000000000 --- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromResult.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package accord.maelstrom; - -import java.io.IOException; -import java.util.NavigableMap; -import java.util.TreeMap; - -import accord.local.Node; -import accord.local.Node.Id; -import com.google.gson.TypeAdapter; -import com.google.gson.stream.JsonReader; -import com.google.gson.stream.JsonToken; -import com.google.gson.stream.JsonWriter; -import accord.api.Key; -import accord.api.Result; -import accord.primitives.Keys; - -public class MaelstromResult implements Result -{ - final Node.Id client; - final long requestId; - final Keys keys; - final Value[] read; - final MaelstromUpdate update; - - public MaelstromResult(Id client, long requestId, Keys keys, Value[] read, MaelstromUpdate update) - { - this.client = client; - this.requestId = requestId; - this.keys = keys; - this.read = read; - this.update = update; - } - - public static final TypeAdapter GSON_ADAPTER = new TypeAdapter() - { - @Override - public void write(JsonWriter out, Result value) throws IOException - { - if (value == null) - { - out.nullValue(); - return; - } - - MaelstromResult result = (MaelstromResult) value; - Keys keys = result.keys; - Value[] reads = result.read; - MaelstromUpdate update = result.update; - out.beginObject(); - out.name("r"); - out.beginArray(); - for (int i = 0 ; i < keys.size() ; ++i) - { - MaelstromKey key = (MaelstromKey) keys.get(i); - if (reads[i] != null) - { - out.beginArray(); - key.datum.write(out); - reads[i].write(out); - out.endArray(); - } - } - out.endArray(); - out.name("append"); - out.beginArray(); - for (int i = 0 ; i < keys.size() ; ++i) - { - MaelstromKey key = (MaelstromKey) keys.get(i); - if (update != null && update.containsKey(key)) - { - out.beginArray(); - key.datum.write(out); - update.get(key).write(out); - out.endArray(); - } - } - out.endArray(); - out.name("client"); - out.value(result.client.id); - out.name("requestId"); - out.value(result.requestId); - out.endObject(); - } - - @Override - public Result read(JsonReader in) throws IOException - { - if (in.peek() == JsonToken.NULL) - return null; - - Node.Id client = null; - long requestId = Long.MIN_VALUE; - NavigableMap reads = new TreeMap<>(); - MaelstromUpdate update = new MaelstromUpdate(); - in.beginObject(); - while (in.hasNext()) - { - String kind = in.nextName(); - switch (kind) - { - default: throw new IllegalStateException("Invalid kind: " + kind); - case "r": - in.beginArray(); - while (in.hasNext()) - { - in.beginArray(); - Key key = MaelstromKey.readKey(in); - Value value = Value.read(in); - reads.put(key, value); - in.endArray(); - } - in.endArray(); - break; - case "append": - in.beginArray(); - while (in.hasNext()) - { - in.beginArray(); - Key key = MaelstromKey.readKey(in); - Value append = Value.read(in); - update.put(key, append); - in.endArray(); - } - in.endArray(); - break; - case "client": - client = Json.ID_ADAPTER.read(in); - break; - case "requestId": - requestId = in.nextLong(); - break; - } - } - in.endObject(); - - if (client == null) - throw new IllegalStateException(); - - for (Key key : update.keySet()) - reads.putIfAbsent(key, null); - - Keys keys = new Keys(reads.navigableKeySet()); - Value[] values = reads.values().toArray(new Value[0]); - return new MaelstromResult(client, requestId, keys, values, update); - } - }; -} diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromStore.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromStore.java deleted file mode 100644 index 013db36d4..000000000 --- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromStore.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package accord.maelstrom; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import accord.api.Key; -import accord.api.DataStore; -import accord.local.Node; -import accord.local.SafeCommandStore; -import accord.primitives.Ranges; -import accord.primitives.SyncPoint; -import accord.utils.Timestamped; -import accord.utils.async.AsyncResults.SettableResult; - -public class MaelstromStore implements DataStore -{ - final Map> data = new ConcurrentHashMap<>(); - - public Value read(Key key) - { - Timestamped v = data.get(key); - return v == null ? Value.EMPTY : v.data; - } - - public Value get(Key key) - { - Timestamped v = data.get(key); - return v == null ? Value.EMPTY : v.data; - } - - static class ImmediateFetchResult extends SettableResult implements FetchResult - { - ImmediateFetchResult(Ranges ranges) { setSuccess(ranges); } - @Override public void abort(Ranges abort) { } - } - - @Override - public FetchResult fetch(Node node, SafeCommandStore safeStore, Ranges ranges, SyncPoint syncPoint, FetchRanges callback) - { - return new ImmediateFetchResult(ranges); - } -} diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromUpdate.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromUpdate.java deleted file mode 100644 index a89eacae6..000000000 --- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromUpdate.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package accord.maelstrom; - -import java.util.Map; -import java.util.TreeMap; - -import accord.api.Key; -import accord.api.Data; -import accord.api.Update; -import accord.primitives.Ranges; -import accord.primitives.Keys; -import accord.primitives.Timestamp; - -public class MaelstromUpdate extends TreeMap implements Update -{ - @Override - public Keys keys() - { - return new Keys(navigableKeySet()); - } - - @Override - public MaelstromWrite apply(Timestamp executeAt, Data read) - { - MaelstromWrite write = new MaelstromWrite(); - Map data = (MaelstromData)read; - for (Map.Entry e : entrySet()) - write.put(e.getKey(), data.get(e.getKey()).append(e.getValue())); - return write; - } - - @Override - public Update slice(Ranges ranges) - { - MaelstromUpdate result = new MaelstromUpdate(); - for (Map.Entry e : entrySet()) - { - if (ranges.contains(e.getKey())) - result.put(e.getKey(), e.getValue()); - } - return result; - } - - @Override - public Update merge(Update other) - { - MaelstromUpdate result = new MaelstromUpdate(); - result.putAll(this); - result.putAll((MaelstromUpdate) other); - return result; - } -} diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java deleted file mode 100644 index 55a14f211..000000000 --- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package accord.maelstrom; - -import accord.api.Key; -import accord.api.DataStore; -import accord.api.Write; -import accord.local.SafeCommandStore; -import accord.primitives.Seekable; -import accord.primitives.Timestamp; -import accord.primitives.Writes; -import accord.utils.Timestamped; -import accord.utils.async.AsyncChain; - -import java.util.TreeMap; - -public class MaelstromWrite extends TreeMap implements Write -{ - @Override - public AsyncChain apply(Seekable key, SafeCommandStore commandStore, Timestamp executeAt, DataStore store) - { - MaelstromStore s = (MaelstromStore) store; - if (containsKey(key)) - s.data.merge((Key)key, new Timestamped<>(executeAt, get(key)), Timestamped::merge); - return Writes.SUCCESS; - } -} diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java deleted file mode 100644 index f773987c7..000000000 --- a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package accord.maelstrom; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.PrintStream; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.LongSupplier; -import java.util.function.Supplier; - -import accord.coordinate.Timeout; -import accord.impl.SimpleProgressLog; -import accord.impl.InMemoryCommandStores; -import accord.impl.SizeOfIntersectionSorter; -import accord.local.AgentExecutor; -import accord.local.Node; -import accord.local.Node.Id; -import accord.api.Scheduler; -import accord.local.ShardDistributor; -import accord.messages.ReplyContext; -import accord.topology.Topology; -import accord.utils.DefaultRandom; -import accord.utils.ThreadPoolScheduler; -import accord.maelstrom.Packet.Type; -import accord.api.MessageSink; -import accord.messages.Callback; -import accord.messages.Reply; -import accord.messages.Request; - -import static accord.utils.async.AsyncChains.awaitUninterruptibly; - -public class Main -{ - static class CallbackInfo - { - final Callback callback; - final Id to; - final long timeout; - - CallbackInfo(Callback callback, Id to, long timeout) - { - this.callback = callback; - this.to = to; - this.timeout = timeout; - } - } - - public static class StdoutSink implements MessageSink - { - private final AtomicLong nextMessageId = new AtomicLong(1); - private final Map callbacks = new ConcurrentHashMap<>(); - - final LongSupplier nowSupplier; - final Scheduler scheduler; - final long start; - final Id self; - final PrintStream out, err; - - public StdoutSink(LongSupplier nowSupplier, Scheduler scheduler, long start, Id self, PrintStream stdout, PrintStream stderr) - { - this.nowSupplier = nowSupplier; - this.scheduler = scheduler; - this.start = start; - this.self = self; - this.out = stdout; - this.err = stderr; - this.scheduler.recurring(() -> { - long now = nowSupplier.getAsLong(); - callbacks.forEach((messageId, info) -> { - if (info.timeout < now && callbacks.remove(messageId, info)) - info.callback.onFailure(info.to, new Timeout(null, null)); - }); - }, 1L, TimeUnit.SECONDS); - } - - private void send(Packet packet) - { - err.println("Sending " + (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)) + " " + packet); - err.flush(); - out.println(packet); - out.flush(); - } - - public synchronized void send(Id to, Body body) - { - send(new Packet(self, to, body)); - } - - @Override - public synchronized void send(Id to, Request send) - { - send(new Packet(self, to, Body.SENTINEL_MSG_ID, send)); - } - - @Override - public void send(Id to, Request send, AgentExecutor ignored, Callback callback) - { - // Executor is ignored due to the fact callbacks are applied in a single thread already - long messageId = nextMessageId.incrementAndGet(); - callbacks.put(messageId, new CallbackInfo(callback, to, nowSupplier.getAsLong() + 1000L)); - send(new Packet(self, to, messageId, send)); - } - - @Override - public void reply(Id replyToNode, ReplyContext replyContext, Reply reply) - { - send(new Packet(self, replyToNode, MaelstromReplyContext.messageIdFor(replyContext), reply)); - } - } - - public static void listen(TopologyFactory topologyFactory, InputStream stdin, PrintStream out, PrintStream err) throws IOException - { - try (BufferedReader in = new BufferedReader(new InputStreamReader(stdin))) - { - listen(topologyFactory, () -> { - try - { - return in.readLine(); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - }, out, err); - } - } - - public static void listen(TopologyFactory topologyFactory, Supplier in, PrintStream out, PrintStream err) throws IOException - { - long start = System.nanoTime(); - err.println("Starting..."); - err.flush(); - ThreadPoolScheduler scheduler = new ThreadPoolScheduler(); - Node on; - Topology topology; - StdoutSink sink; - { - String line = in.get(); - err.println("Received " + (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)) + " " + line); - err.flush(); - Packet packet = Json.GSON.fromJson(line, Packet.class); - MaelstromInit init = (MaelstromInit) packet.body; - topology = topologyFactory.toTopology(init.cluster); - sink = new StdoutSink(System::currentTimeMillis, scheduler, start, init.self, out, err); - on = new Node(init.self, sink, new SimpleConfigService(topology), System::currentTimeMillis, - MaelstromStore::new, new ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()), - MaelstromAgent.INSTANCE, new DefaultRandom(), scheduler, SizeOfIntersectionSorter.SUPPLIER, - SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new); - awaitUninterruptibly(on.start()); - err.println("Initialized node " + init.self); - err.flush(); - sink.send(packet.src, new Body(Type.init_ok, Body.SENTINEL_MSG_ID, init.msg_id)); - } - try - { - while (true) - { - String line = in.get(); - if (line == null) - { - err.println("Received EOF; terminating"); - err.flush(); - scheduler.stop(); - err.println("Terminated"); - err.flush(); - return; - } - err.println("Received " + (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)) + " " + line); - err.flush(); - Packet next = Packet.parse(line); - switch (next.body.type) - { - case txn: - on.receive((MaelstromRequest)next.body, next.src, MaelstromReplyContext.contextFor(next.body.msg_id)); - break; - default: - if (next.body.in_reply_to > Body.SENTINEL_MSG_ID) - { - Reply reply = (Reply)((Wrapper)next.body).body; - CallbackInfo callback = sink.callbacks.remove(next.body.in_reply_to); - if (callback != null) - scheduler.now(() -> { - try - { - callback.callback.onSuccess(next.src, reply); - } - catch (Throwable t) - { - callback.callback.onCallbackFailure(next.src, t); - } - }); - } - else on.receive((Request)((Wrapper)next.body).body, next.src, MaelstromReplyContext.contextFor(next.body.msg_id)); - } - } - } - finally - { - on.shutdown(); - } - } - - public static void main(String[] args) throws IOException - { - listen(new TopologyFactory(64, 3), System.in, System.out, System.err); - } -} diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Packet.java b/accord-maelstrom/src/main/java/accord/maelstrom/Packet.java deleted file mode 100644 index 728430a4b..000000000 --- a/accord-maelstrom/src/main/java/accord/maelstrom/Packet.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package accord.maelstrom; - -import java.io.IOException; -import java.io.StringReader; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -import accord.messages.*; -import com.google.gson.JsonObject; -import com.google.gson.TypeAdapter; -import com.google.gson.stream.JsonReader; -import com.google.gson.stream.JsonWriter; -import accord.local.Node.Id; - -public class Packet implements ReplyContext -{ - - public enum Type - { - init(MaelstromInit.class, MaelstromInit.GSON_ADAPTER), - init_ok(Body.class, Body.GSON_ADAPTER), - txn(MaelstromRequest.class, MaelstromRequest.GSON_ADAPTER), - txn_ok(MaelstromReply.class, MaelstromReply.GSON_ADAPTER), - error(Error.class, Error.GSON_ADAPTER), - PreAccept(accord.messages.PreAccept.class), - PreAcceptOk(accord.messages.PreAccept.PreAcceptOk.class), - PreAcceptNack(accord.messages.PreAccept.PreAcceptNack.class), - Accept(accord.messages.Accept.class), - AcceptReply(accord.messages.Accept.AcceptReply.class), - Commit(accord.messages.Commit.class), - Apply(Apply.class), - ApplyReply(Apply.ApplyReply.class), - Read(ReadData.class), - ReadOk(ReadData.ReadOk.class), - ReadNack(ReadData.ReadNack.class), - WaitOnCommit(accord.messages.WaitOnCommit.class), - WaitOnCommitOk(accord.messages.WaitOnCommit.WaitOnCommitOk.class), - Recover(BeginRecovery.class), - RecoverOk(BeginRecovery.RecoverOk.class), - RecoverNack(BeginRecovery.RecoverNack.class), - CheckStatus(CheckStatus.class), - CheckStatusOk(CheckStatus.CheckStatusOk.class), - CheckStatusOkFull(CheckStatus.CheckStatusOkFull.class), - InformOfTxnId(InformOfTxnId.class, Json.DEFAULT_ADAPTER), - SimpleReply(accord.messages.SimpleReply.class, Json.DEFAULT_ADAPTER), - ; - - private static final Map, Type> LOOKUP_MAP = Arrays.stream(Type.values()) - .filter(t -> t.type != null) - ., Type>>collect(HashMap::new, (m, t) -> m.put(t.type, t), Map::putAll); - - public final Class type; - public final TypeAdapter adapter; - - Type(Class type, TypeAdapter adapter) - { - this.type = type; - this.adapter = adapter; - } - - Type(Class type) - { - this(type, Json.DEFAULT_ADAPTER); - } - - public static Type lookup(Class klass) - { - Type value = LOOKUP_MAP.get(klass); - if (value == null) - throw new NullPointerException("Unable to lookup for class " + klass); - return value; - } - } - - final Id src; - final Id dest; - final Body body; - - public Packet(Id src, Id dest, Body body) - { - this.src = src; - this.dest = dest; - this.body = body; - } - - public Packet(Id src, Id dest, long messageId, Request body) - { - this.src = src; - this.dest = dest; - this.body = new Wrapper(Type.lookup(body.getClass()), messageId, Body.SENTINEL_MSG_ID, body); - } - - public Packet(Id src, Id dest, long replyId, Reply body) - { - this.src = src; - this.dest = dest; - this.body = body instanceof Body ? (Body) body : new Wrapper(Type.lookup(body.getClass()), Body.SENTINEL_MSG_ID, replyId, body); - } - - public static Packet parse(String str) - { - return Json.GSON.fromJson(str, Packet.class); - } - - public String toString() - { - return Json.GSON.toJson(this); - } - - public static final TypeAdapter GSON_ADAPTER = new TypeAdapter() - { - @Override - public void write(JsonWriter out, Packet value) throws IOException - { - out.beginObject(); - out.name("src"); - out.value(value.src.id); - out.name("dest"); - out.value(value.dest.id); - out.name("body"); - Json.GSON.toJson(value.body, Body.class, out); - out.endObject(); - } - - @Override - public Packet read(JsonReader in) throws IOException - { - in.beginObject(); - Id src = null, dest = null; - Body body = null; - String deferredBody = null; - while (in.hasNext()) - { - String field = in.nextName(); - switch (field) - { - case "src": src = Json.ID_ADAPTER.read(in); break; - case "dest": dest = Json.ID_ADAPTER.read(in); break; - case "body": - if (src == null) deferredBody = Json.GSON.fromJson(in, JsonObject.class).toString(); - else body = Body.read(in, src); - break; - case "id": in.nextLong(); break; - default: - throw new IllegalStateException("Unexpected field " + field); - } - } - in.endObject(); - if (body == null && deferredBody != null) - body = Body.read(new JsonReader(new StringReader(deferredBody)), src); - return new Packet(src, dest, body); - } - }; - -} diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/SimpleConfigService.java b/accord-maelstrom/src/main/java/accord/maelstrom/SimpleConfigService.java deleted file mode 100644 index 28149c902..000000000 --- a/accord-maelstrom/src/main/java/accord/maelstrom/SimpleConfigService.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package accord.maelstrom; - -import accord.api.ConfigurationService; -import accord.topology.Topology; - -public class SimpleConfigService implements ConfigurationService -{ - private final Topology topology; - - public SimpleConfigService(Topology topology) - { - this.topology = topology; - } - - @Override - public void registerListener(Listener listener) - { - - } - - @Override - public Topology currentTopology() - { - return topology; - } - - @Override - public Topology getTopologyForEpoch(long epoch) - { - assert epoch == topology.epoch(); - return topology; - } - - @Override - public void fetchTopologyForEpoch(long epoch) - { - return; - } - - @Override - public void acknowledgeEpoch(EpochReady ready) - { - - } -} - diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/TopologyFactory.java b/accord-maelstrom/src/main/java/accord/maelstrom/TopologyFactory.java deleted file mode 100644 index 71bef1d99..000000000 --- a/accord-maelstrom/src/main/java/accord/maelstrom/TopologyFactory.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package accord.maelstrom; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import accord.primitives.Range; -import accord.local.Node.Id; -import accord.maelstrom.Datum.Kind; -import accord.topology.Shard; -import accord.topology.Topology; -import accord.utils.WrapAroundList; -import accord.utils.WrapAroundSet; - -import static accord.utils.Utils.toArray; - -public class TopologyFactory -{ - final int shards; - final int rf; - final Kind[] kinds; - final Range[][] ranges; - - public TopologyFactory(int shards, int rf) - { - this.shards = shards; - this.rf = rf; - this.kinds = Datum.COMPARE_BY_HASH ? new Kind[] { Kind.HASH } : new Kind[] { Kind.STRING, Kind.LONG, Kind.DOUBLE }; - this.ranges = new MaelstromKey.Range[kinds.length][shards]; - for (int i = 0 ; i < kinds.length ; ++i) - { - Kind kind = kinds[i]; - MaelstromKey.Routing[] starts = kind.split(shards); - MaelstromKey.Routing[] ends = new MaelstromKey.Routing[shards]; - System.arraycopy(starts, 1, ends, 0, shards - 1); - ends[shards - 1] = new MaelstromKey.Routing(kind, null); - this.ranges[i] = new MaelstromKey.Range[shards]; - for (int j=0; j lookup = new HashMap<>(); - for (int i = 0 ; i < cluster.length ; ++i) - lookup.put(cluster[i], i); - - List> electorates = new ArrayList<>(); - List> fastPathElectorates = new ArrayList<>(); - - for (int i = 0 ; i < cluster.length + rf - 1 ; ++i) - { - WrapAroundList electorate = new WrapAroundList<>(cluster, i % cluster.length, (i + rf) % cluster.length); - Set fastPathElectorate = new WrapAroundSet<>(lookup, electorate); - electorates.add(electorate); - fastPathElectorates.add(fastPathElectorate); - } - - final List shards = new ArrayList<>(); - for (int j = 0 ; j < kinds.length ; ++j) - { - for (int i = 0 ; i < this.shards ; ++i) - shards.add(new Shard(ranges[j][i], electorates.get(i % electorates.size()), fastPathElectorates.get(i % fastPathElectorates.size()))); - } - return new Topology(1, toArray(shards, Shard[]::new)); - } -} diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Value.java b/accord-maelstrom/src/main/java/accord/maelstrom/Value.java deleted file mode 100644 index 47e36a7ff..000000000 --- a/accord-maelstrom/src/main/java/accord/maelstrom/Value.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package accord.maelstrom; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.function.Function; - -import com.google.gson.TypeAdapter; -import com.google.gson.stream.JsonReader; -import com.google.gson.stream.JsonToken; -import com.google.gson.stream.JsonWriter; - -import static accord.utils.Utils.toArray; - -public class Value -{ - public static final Value EMPTY = new Value(); - - final Datum[] contents; - - private Value() - { - this.contents = new Datum[0]; - } - - public Value(Datum contents) - { - this.contents = new Datum[] { contents }; - } - - public Value(Datum[] contents) - { - this.contents = contents; - } - - public Value append(Datum datum) - { - Datum[] contents = Arrays.copyOf(this.contents, this.contents.length + 1); - contents[contents.length - 1] = datum; - return new Value(contents); - } - - public Value append(Value data) - { - Datum[] contents = Arrays.copyOf(this.contents, this.contents.length + data.contents.length); - for (int i = contents.length - data.contents.length ; i < contents.length ; ++i) - contents[i] = data.contents[data.contents.length + i - contents.length]; - return new Value(contents); - } - - public void write(JsonWriter out) throws IOException - { - if (contents.length == 1 && contents[0].isSimple()) - { - contents[0].write(out); - } - else - { - out.beginArray(); - for (Datum datum : contents) - datum.write(out); - out.endArray(); - } - } - - public void writeVerbose(JsonWriter out) throws IOException - { - out.beginArray(); - for (Datum datum : contents) - datum.write(out); - out.endArray(); - } - - public static Value read(JsonReader in) throws IOException - { - return read(in, Value::new); - } - - protected static V read(JsonReader in, Function constructor) throws IOException - { - if (in.peek() == JsonToken.NULL) - { - in.nextNull(); - return null; - } - - if (in.peek() == JsonToken.BEGIN_ARRAY) - { - List buffer = new ArrayList<>(); - in.beginArray(); - while (in.hasNext()) - buffer.add(Datum.read(in)); - in.endArray(); - return constructor.apply(toArray(buffer, Datum[]::new)); - } - - return constructor.apply(new Datum[] { Datum.read(in) }); - } - - public static final TypeAdapter GSON_ADAPTER = new TypeAdapter() - { - @Override - public void write(JsonWriter out, Value value) throws IOException - { - if (value == null) out.nullValue(); - else value.write(out); - } - - @Override - public Value read(JsonReader in) throws IOException - { - return Value.read(in); - } - }; -} diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Wrapper.java b/accord-maelstrom/src/main/java/accord/maelstrom/Wrapper.java deleted file mode 100644 index 9b3571a7c..000000000 --- a/accord-maelstrom/src/main/java/accord/maelstrom/Wrapper.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package accord.maelstrom; - -import java.io.IOException; - -import com.google.gson.stream.JsonWriter; -import accord.maelstrom.Packet.Type; - -public class Wrapper extends Body -{ - final Object body; - - public Wrapper(Type type, long msg_id, long in_reply_to, Object body) - { - super(type, msg_id, in_reply_to); - this.body = body; - } - - @Override - void writeBody(JsonWriter out) throws IOException - { - super.writeBody(out); - out.name("body"); - Json.GSON.toJson(body, type.type, out); - } -} diff --git a/accord-maelstrom/src/test/java/accord/maelstrom/JsonTest.java b/accord-maelstrom/src/test/java/accord/maelstrom/JsonTest.java deleted file mode 100644 index af73e2451..000000000 --- a/accord-maelstrom/src/test/java/accord/maelstrom/JsonTest.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package accord.maelstrom; - -import org.junit.jupiter.api.Test; - -import accord.api.Key; -import accord.primitives.Deps; -import accord.primitives.Range; -import accord.primitives.RangeDeps; -import accord.utils.AccordGens; -import accord.utils.Gen; -import accord.utils.Gens; -import org.assertj.core.api.Assertions; - -import static accord.utils.Property.qt; - -class JsonTest -{ - @Test - void serdeDeps() - { - qt().forAll(depsGen()).check(JsonTest::serde); - } - - private static void serde(T expected) - { - String json = Json.GSON.toJson(expected); - T parsed; - try - { - parsed = Json.GSON.fromJson(json, expected.getClass()); - } - catch (Throwable t) - { - throw new AssertionError("Unable to parse json: " + json, t); - } - Assertions.assertThat(parsed) - .isEqualTo(expected); - } - - static Gen keyGen() - { - Gen kindGen = Gens.enums().all(Datum.Kind.class); - Gen strings = Gens.strings().all().ofLengthBetween(0, 10); - return rs -> { - Datum.Kind next = kindGen.next(rs); - switch (next) - { - case STRING: return new MaelstromKey.Key(Datum.Kind.STRING, strings.next(rs)); - case LONG: return new MaelstromKey.Key(Datum.Kind.LONG, rs.nextLong()); - case HASH: return new MaelstromKey.Key(Datum.Kind.HASH, new Datum.Hash(rs.nextInt())); - case DOUBLE: return new MaelstromKey.Key(Datum.Kind.DOUBLE, rs.nextDouble()); - default: throw new AssertionError("Unknown kind: " + next); - } - }; - } - - static Gen rangeGen() - { - return AccordGens.ranges(keyGen().map(Key::toUnseekable), MaelstromKey.Range::new); - } - - static Gen rangeDepsGen() - { - return AccordGens.rangeDeps(rangeGen()); - } - - static Gen depsGen() - { - return AccordGens.deps(AccordGens.keyDeps(keyGen()), rangeDepsGen()); - } - -} \ No newline at end of file diff --git a/accord-maelstrom/src/test/java/accord/maelstrom/Runner.java b/accord-maelstrom/src/test/java/accord/maelstrom/Runner.java deleted file mode 100644 index ab0053d84..000000000 --- a/accord-maelstrom/src/test/java/accord/maelstrom/Runner.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package accord.maelstrom; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.PriorityQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Supplier; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import accord.local.Node.Id; -import accord.maelstrom.Cluster.Queue; -import accord.maelstrom.Cluster.QueueSupplier; -import accord.utils.DefaultRandom; -import accord.utils.RandomSource; - -import static accord.utils.Utils.toArray; - -public class Runner -{ - private static final Logger logger = LoggerFactory.getLogger(Runner.class); - - static class StandardQueue implements Queue - { - static class Factory implements QueueSupplier - { - final RandomSource seeds; - - Factory(RandomSource seeds) - { - this.seeds = seeds; - } - - @Override - public Queue get() - { - return new StandardQueue<>(seeds.fork()); - } - } - - static class Item implements Comparable> - { - final long time; - final int seq; - final T item; - - Item(long time, int seq, T item) - { - this.time = time; - this.seq = seq; - this.item = item; - } - - @Override - public int compareTo(Item that) - { - int c = Long.compare(this.time, that.time); - if (c == 0) c = Integer.compare(this.seq, that.seq); - return c; - } - } - - final PriorityQueue> queue = new PriorityQueue<>(); - final RandomSource random; - long now; - int seq; - - StandardQueue(RandomSource random) - { - this.random = random; - } - - @Override - public synchronized void add(T item) - { - add(item, random.nextInt(500), TimeUnit.MILLISECONDS); - } - - @Override - public synchronized void add(T item, long delay, TimeUnit units) - { - queue.add(new Item<>(now + units.toMillis(delay), seq++, item)); - } - - @Override - public synchronized T poll() - { - Item item = queue.poll(); - if (item == null) - return null; - now = item.time; - return item.item; - } - - @Override - public synchronized int size() - { - return queue.size(); - } - } - - static void run(int nodeCount, QueueSupplier queueSupplier, Supplier randomSupplier, TopologyFactory factory, String ... commands) throws IOException - { - run(nodeCount, queueSupplier, randomSupplier, factory, new Supplier() - { - int i = 0; - @Override - public Packet get() - { - return i == commands.length ? null : Packet.parse(commands[i++]); - } - }); - } - - static void run(int nodeCount, QueueSupplier queueSupplier, Supplier randomSupplier, TopologyFactory factory, Supplier commands) throws IOException - { - List nodes = new ArrayList<>(); - for (int i = 1 ; i <= nodeCount ; ++i) - nodes.add(new Id(i)); - - Cluster.run(toArray(nodes, Id[]::new), queueSupplier, ignore -> {}, randomSupplier, () -> new AtomicLong()::incrementAndGet, factory, commands, System.err); - } - - public static Builder test() - { - return new Builder(Thread.currentThread().getStackTrace()[2].getMethodName()); - } - - public static class Builder - { - public static final String ACCORD_TEST_SEED = "accord.test.%s.seed"; - - private final String key; - private long seed; - private int nodeCount = 3; - private TopologyFactory factory = new TopologyFactory(4, 3); - - private Builder(String name) - { - key = String.format(ACCORD_TEST_SEED, name); - String userSeed = System.getProperty(key, null); - seed = userSeed != null ? Long.parseLong(userSeed) : System.nanoTime(); - } - - Builder seed(long seed) - { - this.seed = seed; - return this; - } - - Builder nodeCount(int nodeCount) - { - this.nodeCount = nodeCount; - return this; - } - - Builder factory(TopologyFactory factory) - { - this.factory = factory; - return this; - } - - void run(String ... commands) throws IOException - { - logger.info("Seed {}; rerun with -D{}={}", seed, key, seed); - RandomSource randomSource = new DefaultRandom(seed); - try - { - Runner.run(nodeCount, new StandardQueue.Factory(randomSource), randomSource::fork, factory, commands); - } - catch (Throwable t) - { - throw new AssertionError("Failure for seed " + seed, t); - } - } - } -} diff --git a/accord-maelstrom/src/test/java/accord/maelstrom/SimpleRandomTest.java b/accord-maelstrom/src/test/java/accord/maelstrom/SimpleRandomTest.java deleted file mode 100644 index a65fd2348..000000000 --- a/accord-maelstrom/src/test/java/accord/maelstrom/SimpleRandomTest.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package accord.maelstrom; - -import java.io.IOException; - -import org.junit.jupiter.api.Test; - -import static accord.maelstrom.Runner.test; - -// TODO (correctness) : if you run the tests with the same seed, you get different outcomes... this makes it hard to rerun a failure found from CI -public class SimpleRandomTest -{ - @Test - public void testLaunch() throws IOException - { - test().run(); - } - - @Test - public void testEmptyRead() throws IOException - { - test().run("{\"src\":\"c1\",\"dest\":\"n1\",\"body\":{\"type\":\"txn\",\"msg_id\":1,\"txn\":" - + "[[\"r\", 1, null]]}}"); - } - - @Test - public void testReadAndWrite() throws IOException - { - test().run("{\"src\":\"c1\",\"dest\":\"n1\",\"body\":{\"type\":\"txn\",\"msg_id\":1,\"txn\":" - + "[[\"r\", 1, null],[\"append\", 1, 1]]}}", - "{\"src\":\"c1\",\"dest\":\"n1\",\"body\":{\"type\":\"txn\",\"msg_id\":1,\"txn\":" - + "[[\"r\", 1, null],[\"append\", 1, 2]]}}", - "{\"src\":\"c1\",\"dest\":\"n1\",\"body\":{\"type\":\"txn\",\"msg_id\":1,\"txn\":" - + "[[\"r\", 1, null]]}}"); - } - - @Test - public void testReadAndWriteRandomMultiKey() throws IOException - { - test().nodeCount(5).run("{\"src\":\"c1\",\"dest\":\"n1\",\"body\":{\"type\":\"txn\",\"msg_id\":1,\"txn\":" - + "[[\"r\", 1, null],[\"r\", 2, null],[\"r\", 3, null],[\"append\", 1, 1],[\"append\", 2, 3]]}}", - "{\"src\":\"c1\",\"dest\":\"n2\",\"body\":{\"type\":\"txn\",\"msg_id\":2,\"txn\":" - + "[[\"r\", 1, null],[\"r\", 2, null],[\"r\", 3, null],[\"append\", 1, 2],[\"append\", 3, 3]]}}", - "{\"src\":\"c1\",\"dest\":\"n3\",\"body\":{\"type\":\"txn\",\"msg_id\":3,\"txn\":" - + "[[\"r\", 1, null],[\"r\", 2, null],[\"r\", 3, null],[\"append\", 1, 3],[\"append\", 2, 2]]}}", - "{\"src\":\"c1\",\"dest\":\"n1\",\"body\":{\"type\":\"txn\",\"msg_id\":4,\"txn\":" - + "[[\"r\", 1, null],[\"r\", 2, null],[\"r\", 3, null],[\"append\", 1, 4],[\"append\", 3, 2]]}}", - "{\"src\":\"c1\",\"dest\":\"n2\",\"body\":{\"type\":\"txn\",\"msg_id\":5,\"txn\":" - + "[[\"r\", 1, null],[\"r\", 2, null],[\"r\", 3, null],[\"append\", 1, 5],[\"append\", 2, 1]]}}", - "{\"src\":\"c1\",\"dest\":\"n3\",\"body\":{\"type\":\"txn\",\"msg_id\":6,\"txn\":" - + "[[\"r\", 1, null],[\"r\", 2, null],[\"r\", 3, null],[\"append\", 1, 6],[\"append\", 3, 1]]}}"); - } -} diff --git a/settings.gradle b/settings.gradle index 47010e264..ab07489cb 100644 --- a/settings.gradle +++ b/settings.gradle @@ -18,4 +18,3 @@ rootProject.name = 'accord' include 'accord-core' -include 'accord-maelstrom'