Skip to content

Commit

Permalink
Implement topic subscriptions & bump version
Browse files Browse the repository at this point in the history
  • Loading branch information
Alemiz112 committed Nov 5, 2023
1 parent b8554ac commit 623e1f8
Show file tree
Hide file tree
Showing 23 changed files with 266 additions and 39 deletions.
2 changes: 1 addition & 1 deletion common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>vortex-parent</artifactId>
<groupId>alemiz.stargate.vortex</groupId>
<version>1.0-SNAPSHOT</version>
<version>1.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2023 Alemiz
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/

package alemiz.stargate.vortex.common.node;

import alemiz.stargate.StarGateSession;
import alemiz.stargate.vortex.common.protocol.packet.VortexTopicSubscribePacket;

public abstract class VortexAbstractClientNode extends VortexNode implements ClientSideNode {

public VortexAbstractClientNode(StarGateSession session, VortexNodeOwner vortexParent) {
super(session, vortexParent);
}

@Override
protected void subscribe0(String topic) {
VortexTopicSubscribePacket packet = new VortexTopicSubscribePacket();
packet.setTopic(topic.trim());
this.sendPacket(packet);
}

@Override
protected void unsubscribe0(String topic) {
VortexTopicSubscribePacket packet = new VortexTopicSubscribePacket();
packet.setTopic(topic.trim());
packet.setUnsubscribe(true);
this.sendPacket(packet);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,20 @@ protected void deinitialize0() {

@Override
protected boolean onMessagePacket(VortexMessagePacket packet) {
if (!packet.getTopic().isEmpty()) {
Collection<VortexNode> nodes = this.getVortexParent().getVortexNodes(packet.getTopic());
if (nodes == null) {
return false;
}

for (VortexNode node : nodes) {
if (node != this) {
node.sendPacket(packet);
}
}
return true;
}

if (packet.getTargetNode().isEmpty()) {
for (VortexMasterNode masterNode : this.masterNodes.values()) {
masterNode.sendPacket(packet);
Expand Down Expand Up @@ -118,4 +132,14 @@ public void setPrimaryMasterNode(String primaryMasterNode) {
public VortexServerNodeOwner getVortexParent() {
return (VortexServerNodeOwner) super.getVortexParent();
}

@Override
protected void subscribe0(String topic) {
this.getVortexParent().onNodeSubscribe(this, topic);
}

@Override
protected void unsubscribe0(String topic) {
this.getVortexParent().onNodeUnsubscribe(this, topic);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import alemiz.stargate.vortex.common.protocol.packet.VortexMessagePacket;
import alemiz.stargate.vortex.common.protocol.packet.VortexPacket;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

Expand All @@ -42,7 +43,21 @@ protected void deinitialize0() {

@Override
protected boolean onMessagePacket(VortexMessagePacket packet) {
// Broadcast any message recived from master to all child nodes
if (!packet.getTopic().isEmpty()) {
Collection<VortexNode> nodes = this.getVortexParent().getVortexNodes(packet.getTopic());
if (nodes == null) {
return false;
}

for (VortexNode node : nodes) {
if (node != this) {
node.sendPacket(packet);
}
}
return true;
}

// Broadcast any message received from master to all child nodes
for (VortexNode vortexNode : this.childNodes.values()) {
vortexNode.sendPacket(packet);
}
Expand Down Expand Up @@ -80,6 +95,16 @@ public void unregisterChildNode(VortexNode node) {
}
}

@Override
protected void subscribe0(String topic) {
this.getVortexParent().onNodeSubscribe(this, topic);
}

@Override
protected void unsubscribe0(String topic) {
this.getVortexParent().onNodeUnsubscribe(this, topic);
}

public VortexNode getChildNode(String name) {
return this.childNodes.get(name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,8 @@
import alemiz.stargate.vortex.common.pipeline.VortexPacketDecoder;
import alemiz.stargate.vortex.common.pipeline.VortexPacketEncoder;
import alemiz.stargate.vortex.common.pipeline.VortexPipelineTail;
import alemiz.stargate.vortex.common.protocol.packet.VortexLatencyPacket;
import alemiz.stargate.vortex.common.protocol.packet.VortexMessagePacket;
import alemiz.stargate.vortex.common.protocol.packet.VortexPacket;
import alemiz.stargate.vortex.common.protocol.packet.*;
import alemiz.stargate.vortex.common.protocol.VortexPacketListener;
import alemiz.stargate.vortex.common.protocol.packet.VortexResponse;
import io.netty.channel.*;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.DefaultPromise;
Expand All @@ -37,7 +34,10 @@
import lombok.extern.log4j.Log4j2;

import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -64,6 +64,8 @@ public abstract class VortexNode extends SimpleChannelInboundHandler<VortexPacke
private final AtomicInteger responseIdAllocator = new AtomicInteger(0);
private final Long2ObjectMap<ResponseHandle> pendingResponses = new Long2ObjectOpenHashMap<>();

private final Set<String> subscribedTopics = Collections.newSetFromMap(new ConcurrentHashMap<>());

private volatile boolean closed = false;

public VortexNode(StarGateSession session, VortexNodeOwner vortexParent) {
Expand Down Expand Up @@ -144,6 +146,14 @@ protected boolean handleInternal(VortexPacket packet) {
if (packet instanceof VortexLatencyPacket) {
this.onPing((VortexLatencyPacket) packet);
return true;
} else if (packet instanceof VortexTopicSubscribePacket && this instanceof ServerSideNode) {
VortexTopicSubscribePacket subscribe = (VortexTopicSubscribePacket) packet;
if (subscribe.isUnsubscribe()) {
this.unsubscribe(subscribe.getTopic());
} else {
this.subscribe(subscribe.getTopic());
}
return true;
}
return false;
}
Expand Down Expand Up @@ -213,6 +223,9 @@ public Promise<VortexResponse> sendResponsePacket(VortexResponse packet) {

public void sendPacket(VortexPacket packet) {
if (!this.closed && this.session.getChannel().isActive()) {
if (packet instanceof VortexMessagePacket && ((VortexMessagePacket) packet).getSenderNode() == null) {
((VortexMessagePacket) packet).setSenderNode(this.getNodeName());
}
this.session.getChannel().writeAndFlush(packet).addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
}
}
Expand Down Expand Up @@ -240,6 +253,25 @@ public Promise<VortexResponse> getResponsePromise(int responseId) {
return null;
}

public final void subscribe(String topic) {
boolean success = this.subscribedTopics.add(topic);
if (success) {
this.subscribe0(topic);
}

}

protected abstract void subscribe0(String topic);

public final void unsubscribe(String topic) {
boolean success = this.subscribedTopics.remove(topic);
if (success) {
this.unsubscribe0(topic);
}
}

protected abstract void unsubscribe0(String topic);

public boolean isClosed() {
return this.closed;
}
Expand Down Expand Up @@ -269,4 +301,8 @@ public InetSocketAddress getAddress() {
public VortexNodeOwner getVortexParent() {
return this.vortexParent;
}

public Set<String> getSubscribedTopics() {
return Collections.unmodifiableSet(this.subscribedTopics);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,19 @@
package alemiz.stargate.vortex.common.node;

import java.net.InetSocketAddress;
import java.util.Collection;

public interface VortexServerNodeOwner extends VortexNodeOwner {

VortexNode getVortexNode(String nodeName);

VortexNode getVortexNode(InetSocketAddress address);

Collection<VortexNode> getVortexNodes(String topic);

void onNodeSubscribe(VortexNode node, String topic);

void onNodeUnsubscribe(VortexNode node, String topic);

VortexNodeListener getNodeListener();
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class VortexPacketPool {

public static final short VORTEX_LATENCY_PACKET = 0;
public static final short VORTEX_CHILD_INFO_PACKET = 1;
public static final short VORTEX_TOPIC_SUBSCRIBE_PACKET = 3;

private final Short2ObjectMap<PacketFactory> packetFactoryMap = new Short2ObjectOpenHashMap<>();
private final Object2ShortMap<Class<? extends VortexPacket>> packetIdMap = new Object2ShortOpenHashMap<>();
Expand All @@ -37,6 +38,7 @@ public VortexPacketPool() {
// Register default packets
this.registerPacket(VortexLatencyPacket.class, VORTEX_LATENCY_PACKET, VortexLatencyPacket::new);
this.registerPacket(VortexChildInfoPacket.class, VORTEX_CHILD_INFO_PACKET, VortexChildInfoPacket::new);
this.registerPacket(VortexTopicSubscribePacket.class, VORTEX_TOPIC_SUBSCRIBE_PACKET, VortexTopicSubscribePacket::new);
}

public <T extends VortexPacket> VortexPacket constructPacket(Class<T> packetClass) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ public abstract class VortexMessagePacket implements VortexPacket {
*/
private String targetNode = "";

/**
* Name of the topic which should receive the message.
* This will be used only if targetNode is empty.
*/
private String topic = "";

/**
* Name of the node which sends the message.
*/
Expand All @@ -43,6 +49,7 @@ public abstract class VortexMessagePacket implements VortexPacket {
@Override
public final void encodePayload(ByteBuf buffer) {
PacketHelper.writeString(buffer, this.targetNode);
PacketHelper.writeString(buffer, this.topic);
PacketHelper.writeString(buffer, this.senderNode);
this.encode(buffer);
}
Expand All @@ -52,6 +59,7 @@ public final void encodePayload(ByteBuf buffer) {
@Override
public final void decodePayload(ByteBuf buffer) {
this.targetNode = PacketHelper.readString(buffer);
this.topic = PacketHelper.readString(buffer);
this.senderNode = PacketHelper.readString(buffer);
this.decode(buffer);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2021 Alemiz
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/

package alemiz.stargate.vortex.common.protocol.packet;

import alemiz.stargate.protocol.types.PacketHelper;
import alemiz.stargate.vortex.common.protocol.VortexPacketListener;
import alemiz.stargate.vortex.common.protocol.VortexPacketPool;
import io.netty.buffer.ByteBuf;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;

@Data
@ToString
@EqualsAndHashCode(doNotUseGetters = true, callSuper = false)
public class VortexTopicSubscribePacket implements VortexPacket {

private String topic;
private boolean unsubscribe;

@Override
public void encodePayload(ByteBuf buffer) {
PacketHelper.writeString(buffer, this.topic);
buffer.writeBoolean(this.unsubscribe);
}

@Override
public void decodePayload(ByteBuf buffer) {
this.topic = PacketHelper.readString(buffer);
this.unsubscribe = buffer.readBoolean();
}

@Override
public boolean handle(VortexPacketListener listener) {
return false;
}

@Override
public short getPacketId() {
return VortexPacketPool.VORTEX_TOPIC_SUBSCRIBE_PACKET;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,22 @@ public class VortexClientHandshakePacket extends StarGatePacket {
private String vortexType;
private String primaryMasterNode = "";
private List<String> masterNodes = new ObjectArrayList<>();
private List<String> topics = new ObjectArrayList<>();

@Override
public void encodePayload(ByteBuf buffer) {
PacketHelper.writeString(buffer, this.vortexType);
PacketHelper.writeString(buffer, this.primaryMasterNode);
PacketHelper.writeArray(buffer, this.masterNodes, PacketHelper::writeString);
PacketHelper.writeArray(buffer, this.topics, PacketHelper::writeString);
}

@Override
public void decodePayload(ByteBuf buffer) {
this.vortexType = PacketHelper.readString(buffer);
this.primaryMasterNode = PacketHelper.readString(buffer);
PacketHelper.readArray(buffer, this.masterNodes, PacketHelper::readString);
PacketHelper.readArray(buffer, this.topics, PacketHelper::readString);
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion minecraft/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>vortex-parent</artifactId>
<groupId>alemiz.stargate.vortex</groupId>
<version>1.0-SNAPSHOT</version>
<version>1.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,11 @@
package alemiz.stargate.vortex.minecraft.node;

import alemiz.stargate.StarGateSession;
import alemiz.stargate.vortex.common.node.ClientSideNode;
import alemiz.stargate.vortex.common.node.VortexNode;
import alemiz.stargate.vortex.common.node.VortexNodeOwner;
import alemiz.stargate.vortex.common.node.VortexNodeType;
import alemiz.stargate.vortex.common.node.*;

import static alemiz.stargate.vortex.minecraft.Minecraft.MINECRAFT_CLIENT_CHILD;

public class MinecraftClientChildNode extends VortexNode implements ClientSideNode {
public class MinecraftClientChildNode extends VortexAbstractClientNode {

public MinecraftClientChildNode(StarGateSession session, VortexNodeOwner vortexParent) {
super(session, vortexParent);
Expand Down
Loading

0 comments on commit 623e1f8

Please sign in to comment.