Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: stream manager debugging support #35

Merged
merged 58 commits into from
Oct 2, 2023
Merged
Show file tree
Hide file tree
Changes from 57 commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
116ef68
feat: added support in backend for stream manager
brnaba-aws Sep 12, 2023
a3f3b3c
feat: added support in frontend for stream manager
brnaba-aws Sep 12, 2023
4787062
fix: render based on preferences
brnaba-aws Sep 12, 2023
1d27f03
chore: applied review
brnaba-aws Sep 12, 2023
8b4e1da
chore: applied review
brnaba-aws Sep 12, 2023
c6619a2
chore: added support for stream manager custom port
brnaba-aws Sep 12, 2023
002b516
chore: added support for exportStatuses
brnaba-aws Sep 12, 2023
5560874
chore: added support for delete stream
brnaba-aws Sep 13, 2023
0189ae4
feat: added support for adding message to a stream
brnaba-aws Sep 13, 2023
3105e9f
feat: added support for stream list pagination
brnaba-aws Sep 13, 2023
b09d781
fix: minor change
brnaba-aws Sep 13, 2023
b5520aa
feat: adding support in backend for createMessageStream
brnaba-aws Sep 13, 2023
2e48cf0
fix: ui update
brnaba-aws Sep 13, 2023
46b8ac7
feat: added support for export definition
brnaba-aws Sep 13, 2023
dbf58c1
feat: saving preferences, ui improvements
brnaba-aws Sep 14, 2023
4070450
chore: align UI with greengrass UI expert
brnaba-aws Sep 14, 2023
0c9d657
chore: ui improvements
brnaba-aws Sep 14, 2023
79b37f5
chore: renaming
brnaba-aws Sep 14, 2023
d9ca922
chore: improved create stream ui
brnaba-aws Sep 14, 2023
0b5278e
fix: minor updates
brnaba-aws Sep 14, 2023
64d694e
chore: cleanup
brnaba-aws Sep 14, 2023
6ae12dc
chore: ui cleanup
brnaba-aws Sep 14, 2023
482f41c
feat: added support in backend for updateMessageStream
brnaba-aws Sep 14, 2023
74fdc1a
feat: added support in frontend for updateMessageStream, ui improvements
brnaba-aws Sep 14, 2023
8d0271b
feat: updating backend with error feedback to frontend
brnaba-aws Sep 15, 2023
9f40ff7
feat: updated backend with proper response to streamManager requests
brnaba-aws Sep 15, 2023
2d4b127
feat: added support for update export definition
brnaba-aws Sep 19, 2023
a5b1f92
feat: added support for delete, improved update
brnaba-aws Sep 19, 2023
795e953
feat: added support for http and iot analytics
brnaba-aws Sep 19, 2023
64d6c98
feat: added support for s3 export
brnaba-aws Sep 20, 2023
9772415
chore: removed unsused stuff
brnaba-aws Sep 20, 2023
0bd283f
chore: removed log
brnaba-aws Sep 20, 2023
0ee2d43
chore: fixed backend and front
brnaba-aws Sep 21, 2023
c43659a
chore: updated dependency
brnaba-aws Sep 21, 2023
e486975
chore: using cbor 2.13
brnaba-aws Sep 21, 2023
74fd49e
fix: cleanup exceptions
MikeDombo Sep 21, 2023
aa2f8ba
chore: improve logging of errors
MikeDombo Sep 21, 2023
e9d8123
chore: reformat, move some to async-await
MikeDombo Sep 21, 2023
1218217
fix: typos
MikeDombo Sep 21, 2023
90a6d7d
chore: cleanup StreamDetails
MikeDombo Sep 21, 2023
da35a4a
chore: cleanup StreamManager+StreamExportDefinition
MikeDombo Sep 21, 2023
6f72fbd
fixed label
brnaba-aws Sep 21, 2023
c0490ed
fix: add code to easily disable authentication (code change, not usin…
MikeDombo Sep 21, 2023
d1423b5
chore: exclude dependencies from jar
MikeDombo Sep 21, 2023
ad91a25
fix: use stream manager model for descriptions
MikeDombo Sep 21, 2023
9526f34
feat: added delete stream capability on stream detail page
brnaba-aws Sep 22, 2023
84d41a9
chore: added 4 random digit when creating new resource (stream or exp…
brnaba-aws Sep 22, 2023
21d524d
fix: further cleanup, removing duplicated strings, using template str…
MikeDombo Sep 22, 2023
edda61f
chore: changed stream definition from columns inside tabs to columns …
brnaba-aws Sep 22, 2023
35fa2d4
fix: remove loading state to prevent flashing on refresh
MikeDombo Sep 22, 2023
a70f8a0
fix: typo
MikeDombo Sep 26, 2023
27570d6
fix: typo
MikeDombo Sep 26, 2023
6fc5ddb
feat: support stream ttl setting
MikeDombo Sep 26, 2023
b72aad4
fix: make sidebar links work with cmd/ctrl click
MikeDombo Sep 26, 2023
3c3ad4d
fix: typo
MikeDombo Sep 26, 2023
15a879c
fix: use createPersistedState hook insteaad of localStorage
MikeDombo Sep 26, 2023
4b87d71
fix: get correct SM port, reconnect when needed
MikeDombo Sep 27, 2023
92a8b04
fix: eslint
MikeDombo Sep 29, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,31 @@
<version>5.5.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.amazonaws.greengrass</groupId>
<artifactId>streammanager</artifactId>
<version>1.1.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
<version>2.10.1</version>
<scope>provided</scope>
<artifactId>jackson-dataformat-cbor</artifactId>
<version>2.13.3</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
Expand Down
35 changes: 35 additions & 0 deletions src/main/java/com/aws/greengrass/localdebugconsole/APICalls.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,39 @@ public enum APICalls {
* Utility called by the client to unsubscribe to a local IPC topic.
*/
unsubscribeToPubSubTopic,

/**
* Returns the current Stream Manager streams list
*/
streamManagerListStreams,

/**
* Describes a message stream to get metadata including the stream’s definition, size, and exporter statuses.
*/
streamManagerDescribeStream,

/**
* Deletes a message stream based on its name.
*/
streamManagerDeleteMessageStream,

/**
* Read message(s) from a chosen stream with options. If no options are specified it will try to read 1 message from the stream.
*/
streamManagerReadMessages,

/**
* Append a message into the specified message stream.
*/
streamManagerAppendMessage,

/**
* Create a message stream with a given definition.
*/
streamManagerCreateMessageStream,

/**
* Update a message stream with a given definition.
*/
streamManagerUpdateMessageStream,
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package com.aws.greengrass.localdebugconsole;

import com.amazonaws.greengrass.streammanager.model.MessageStreamDefinition;
import com.aws.greengrass.builtin.services.pubsub.PubSubIPCEventStreamAgent;
import com.aws.greengrass.builtin.services.pubsub.PublishEvent;
import com.aws.greengrass.builtin.services.pubsub.SubscribeRequest;
Expand All @@ -16,6 +17,7 @@
import com.aws.greengrass.localdebugconsole.messageutils.MessageType;
import com.aws.greengrass.localdebugconsole.messageutils.PackedRequest;
import com.aws.greengrass.localdebugconsole.messageutils.Request;
import com.aws.greengrass.localdebugconsole.messageutils.StreamManagerResponseMessage;
import com.aws.greengrass.logging.api.Logger;
import com.aws.greengrass.mqttclient.MqttClient;
import com.aws.greengrass.mqttclient.MqttRequestException;
Expand All @@ -24,6 +26,7 @@
import com.aws.greengrass.mqttclient.v5.Unsubscribe;
import com.aws.greengrass.util.DefaultConcurrentHashMap;
import com.aws.greengrass.util.Pair;
import com.aws.greengrass.util.Utils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -71,20 +74,23 @@ public class DashboardServer extends WebSocketServer implements KernelMessagePus
private final Authenticator authenticator;
private final MqttClient mqttClient;

private final StreamManagerHelper streamManagerHelper;

PubSubIPCEventStreamAgent pubSubIPCAgent;
private final String SERVICE_NAME = "LocalDebugConsole";

public DashboardServer(InetSocketAddress address, Logger logger, Kernel root, DeviceConfiguration deviceConfig,
Authenticator authenticator, Provider<SSLEngine> engineProvider) {
Authenticator authenticator, Provider<SSLEngine> engineProvider, String streamManagerAuthToken) {
this(address, logger, new KernelCommunicator(root, logger, deviceConfig), authenticator, engineProvider,
root.getContext().get(PubSubIPCEventStreamAgent.class),
root.getContext().get(MqttClient.class));
root.getContext().get(MqttClient.class),
new StreamManagerHelper(root, streamManagerAuthToken));
}

// constructor for unit testing
DashboardServer(InetSocketAddress address, Logger logger, DashboardAPI dashboardAPI, Authenticator authenticator,
Provider<SSLEngine> engineProvider, PubSubIPCEventStreamAgent pubSubIPCAgent,
MqttClient mqttClient) {
MqttClient mqttClient, StreamManagerHelper streamManagerHelper) {
super(address);
setReuseAddr(true);
setTcpNoDelay(true);
Expand All @@ -97,6 +103,7 @@ public DashboardServer(InetSocketAddress address, Logger logger, Kernel root, De
this.logger.atInfo().log("Starting dashboard server on address: {}", address);
this.pubSubIPCAgent = pubSubIPCAgent;
this.mqttClient = mqttClient;
this.streamManagerHelper = streamManagerHelper;
}

// links the API impl and starts the socket server
Expand Down Expand Up @@ -251,6 +258,40 @@ public void onMessage(WebSocket conn, String msg) {
unsubscribeFromPubSubTopic(conn, packedRequest, req);
break;
}
case streamManagerListStreams: {
streamManagerListStreams(conn, packedRequest);
break;
}
case streamManagerDescribeStream: {
streamManagerDescribeStream(conn, packedRequest, req);
break;
}

case streamManagerDeleteMessageStream: {
streamManagerDeleteMessageStream(conn, packedRequest, req);
break;
}

case streamManagerReadMessages: {
streamManagerReadMessages(conn, packedRequest, req);
break;
}

case streamManagerAppendMessage:{
streamManagerAppendMessage(conn, packedRequest, req);
break;
}

case streamManagerCreateMessageStream:{
streamManagerCreateMessageStream(conn, packedRequest, req);
break;
}

case streamManagerUpdateMessageStream:{
streamManagerUpdateMessageStream(conn, packedRequest, req);
break;
}

default: { // echo
sendIfOpen(conn, new Message(MessageType.RESPONSE, packedRequest.requestID, req.call));
break;
Expand Down Expand Up @@ -355,6 +396,111 @@ private void publishToPubSubTopic(WebSocket conn, PackedRequest packedRequest, R
}
}

private void streamManagerListStreams(WebSocket conn, PackedRequest packedRequest) {
StreamManagerResponseMessage responseMessage = new StreamManagerResponseMessage();
try {
responseMessage.streamsList = this.streamManagerHelper.listStreams();
responseMessage.successful = true;
}
catch (Exception e){
logger.error("Error while listing streams:", e);
responseMessage.errorMsg = Utils.generateFailureMessage(e);
}
sendIfOpen(conn, new Message(MessageType.RESPONSE, packedRequest.requestID, responseMessage));
}

private void streamManagerDescribeStream(WebSocket conn, PackedRequest packedRequest, Request req) {
StreamManagerResponseMessage responseMessage = new StreamManagerResponseMessage();
try {
responseMessage.messageStreamInfo = this.streamManagerHelper.describeStream(req.args[0]);
responseMessage.successful = true;
}
catch (Exception e){
logger.error("Error while describing stream:",e);
responseMessage.errorMsg = Utils.generateFailureMessage(e);
}
sendIfOpen(conn, new Message(MessageType.RESPONSE, packedRequest.requestID, responseMessage));
}

private void streamManagerDeleteMessageStream(WebSocket conn, PackedRequest packedRequest, Request req) {
StreamManagerResponseMessage responseMessage = new StreamManagerResponseMessage();
try {
this.streamManagerHelper.deleteMessageStream(req.args[0]);
responseMessage.successful = true;
}
catch (Exception e){
logger.error("Error while deleting stream:", e);
responseMessage.errorMsg = Utils.generateFailureMessage(e);
}
sendIfOpen(conn, new Message(MessageType.RESPONSE, packedRequest.requestID, responseMessage));
}

private void streamManagerReadMessages(WebSocket conn, PackedRequest packedRequest, Request req) {
StreamManagerResponseMessage responseMessage = new StreamManagerResponseMessage();
try {
if (req.args.length == 5) {
responseMessage.messagesList = this.streamManagerHelper.readMessages(
req.args[0],
Long.parseLong(req.args[1]),
Long.parseLong(req.args[2]),
Long.parseLong(req.args[3]),
Long.parseLong(req.args[4]));
responseMessage.successful = true;
}
else{
logger.atError().log("StreamManagerReadMessages requires 5 arguments");
responseMessage.errorMsg = "StreamManagerReadMessages requires 5 arguments";
}
}
catch (Exception e){
logger.error("Error while reading messages:", e);
responseMessage.errorMsg = Utils.generateFailureMessage(e);
}
sendIfOpen(conn, new Message(MessageType.RESPONSE, packedRequest.requestID, responseMessage));
}

private void streamManagerAppendMessage(WebSocket conn, PackedRequest packedRequest, Request req) {
StreamManagerResponseMessage responseMessage = new StreamManagerResponseMessage();
try {
this.streamManagerHelper.appendMessage(req.args[0], req.args[1].getBytes());
responseMessage.successful = true;
}
catch (Exception e){
logger.error("Error while appending message to the stream:", e);
responseMessage.errorMsg = Utils.generateFailureMessage(e);
}
sendIfOpen(conn, new Message(MessageType.RESPONSE, packedRequest.requestID, responseMessage));
}

private void streamManagerCreateMessageStream(WebSocket conn, PackedRequest packedRequest, Request req) {
StreamManagerResponseMessage responseMessage = new StreamManagerResponseMessage();
try {
MessageStreamDefinition messageStreamDefinition = jsonMapper.readValue(req.args[0], MessageStreamDefinition.class);
this.streamManagerHelper.createMessageStream(messageStreamDefinition);
responseMessage.successful = true;
}
catch (Exception e){
logger.error("Error while appending message to the stream:", e);
responseMessage.errorMsg = Utils.generateFailureMessage(e);
}
sendIfOpen(conn, new Message(MessageType.RESPONSE, packedRequest.requestID, responseMessage));
}

private void streamManagerUpdateMessageStream(WebSocket conn, PackedRequest packedRequest, Request req) {
StreamManagerResponseMessage responseMessage = new StreamManagerResponseMessage();
try {
MessageStreamDefinition messageStreamDefinition = jsonMapper.readValue(req.args[0], MessageStreamDefinition.class);
this.streamManagerHelper.updateMessageStream(messageStreamDefinition);
responseMessage.successful = true;
}
catch (Exception e){
logger.error("Error while appending message to the stream:", e);
responseMessage.errorMsg = Utils.generateFailureMessage(e);
}
sendIfOpen(conn, new Message(MessageType.RESPONSE, packedRequest.requestID, responseMessage));
}


@Override
public void onClose(WebSocket conn, int code, String reason, boolean remote) {
connections.remove(conn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.aws.greengrass.dependency.ImplementsService;
import com.aws.greengrass.dependency.State;
import com.aws.greengrass.deployment.DeviceConfiguration;
import com.aws.greengrass.ipc.AuthenticationHandler;
import com.aws.greengrass.lifecyclemanager.Kernel;
import com.aws.greengrass.lifecyclemanager.PluginService;
import com.aws.greengrass.logging.api.Logger;
Expand Down Expand Up @@ -96,6 +97,7 @@
import javax.net.ssl.SSLException;

import static com.aws.greengrass.componentmanager.KernelConfigResolver.CONFIGURATION_CONFIG_KEY;
import static com.aws.greengrass.ipc.AuthenticationHandler.SERVICE_UNIQUE_ID_KEY;
import static com.aws.greengrass.util.Utils.isEmpty;
import static io.netty.buffer.Unpooled.copiedBuffer;

Expand Down Expand Up @@ -131,6 +133,7 @@ public class SimpleHttpServer extends PluginService implements Authenticator {
private boolean httpsEnabled = DEFAULT_HTTPS_ENABLED;
private SslContext context;
private Provider<SSLEngine> engineProvider;
private String streamManagerAuthToken;

@Inject
public SimpleHttpServer(Topics t, Kernel kernel, DeviceConfiguration deviceConfiguration) {
Expand All @@ -142,6 +145,10 @@ public SimpleHttpServer(Topics t, Kernel kernel, DeviceConfiguration deviceConfi
@Override
public void postInject() {
super.postInject();
// Does not happen for built-in/plugin services so doing explicitly
AuthenticationHandler.registerAuthenticationToken(this);
streamManagerAuthToken = Coerce.toString(this.getPrivateConfig().findLeafChild(SERVICE_UNIQUE_ID_KEY));

config.lookup(CONFIGURATION_CONFIG_KEY, "port").dflt(port).subscribe((w, n) -> {
int oldPort = port;
port = Coerce.toInt(n);
Expand Down Expand Up @@ -195,7 +202,7 @@ public void startup() throws InterruptedException {

logger.atInfo().log("Starting local dashboard server");
dashboardServer = new DashboardServer(new InetSocketAddress(bindHostname, websocketPort), logger,
kernel, deviceConfig, this, engineProvider);
kernel, deviceConfig, this, engineProvider, streamManagerAuthToken);
dashboardServer.startup();
try {
// We need to wait for the server to startup before grabbing the port because it starts in a separate thread
Expand Down Expand Up @@ -531,6 +538,10 @@ private boolean authenticated(String authHeader) {

@Override
public boolean isUsernameAndPasswordValid(Pair<String, String> usernameAndPassword) {
if (consoleAuthDisabled()) {
return true;
}

Topics passwordTopics = config.getRoot().findTopics(DEBUG_PASSWORD_NAMESPACE);
if (passwordTopics == null || usernameAndPassword == null) {
return false;
Expand Down Expand Up @@ -563,7 +574,15 @@ public boolean isUsernameAndPasswordValid(Pair<String, String> usernameAndPasswo
return Instant.now().isBefore(Instant.ofEpochMilli(Coerce.toLong(expirationTopic)));
}

private static boolean consoleAuthDisabled() {
// Set to true in development so you don't need to authenticate all the time.
return false;
}

private Pair<String, String> getUsernameAndPassword(String authHeader) {
if (consoleAuthDisabled()) {
return new Pair<>("", "");
}
if (Utils.isEmpty(authHeader)) {
return null;
}
Expand Down
Loading
Loading