Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make pubsub topic configurable #12

Merged
merged 1 commit into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public class Constants {
public static final String AWS_GREENGRASS_TELEMETRY_NUCLEUS_EMITTER = "aws.greengrass.telemetry.NucleusEmitter";

public static final String PUBSUB_PUBLISH_CONFIG_NAME = "pubSubPublish";
public static final String PUBSUB_TOPIC_CONFIG_NAME = "pubSubTopic";
public static final String MQTT_TOPIC_CONFIG_NAME = "mqttTopic";
public static final String TELEMETRY_PUBLISH_INTERVAL_CONFIG_NAME = "telemetryPublishIntervalMs";
public static final String DEFAULT_TELEMETRY_PUBSUB_TOPIC = "$local/greengrass/telemetry";
Expand All @@ -20,9 +21,9 @@ public class Constants {
public static final long MIN_TELEMETRY_PUBLISH_INTERVAL_MS = 500;

public static final String PUBSUB_PUBLISH_SUCCESS_LOG = "Published local pub/sub message on topic "
+ "'$local/greengrass/telemetry'";
+ "'{}'";
public static final String PUBSUB_PUBLISH_FAILURE_LOG = "Failed to publish local pub/sub message on topic "
+ "'$local/greengrass/telemetry'";
+ "'{}'";
public static final String TELEMETRY_PUBLISH_SCHEDULED = "Scheduling telemetry publish";
public static final String TELEMETRY_PUBLISH_STOPPING = "Stopping telemetry publish";
public static final String PUBSUB_PUBLISH_STARTING = "Starting local pub/sub publishing";
Expand All @@ -40,6 +41,9 @@ public class Constants {
+ " configuration.";
public static final String PUBSUB_PUBLISH_CONFIG_PARSE_ERROR_LOG = "Could not parse the pubSubPublish config option"
+ " {}. Please make sure this is set to a valid boolean value";

public static final String PUBSUB_TOPIC_CONFIG_PARSE_ERROR_LOG = "Could not parse the pubSubTopic config option {}."
+ " Please make sure this is set to a valid topic string value";
public static final String MQTT_TOPIC_CONFIG_PARSE_ERROR_LOG = "Could not parse the mqttTopic config option {}."
+ " Please make sure this is set to a valid topic string value";
public static final String TELEMETRY_PUBLISH_INTERVAL_CONFIG_PARSE_ERROR_LOG = "Could not parse the "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import static com.aws.greengrass.componentmanager.KernelConfigResolver.CONFIGURATION_CONFIG_KEY;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.AWS_GREENGRASS_TELEMETRY_NUCLEUS_EMITTER;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.CONFIG_UPDATE_ERROR_LOG;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.DEFAULT_TELEMETRY_PUBSUB_TOPIC;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.INVALID_PUBLISH_THRESHOLD_LOG;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.JSON_PARSE_ERROR_LOG;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.MIN_TELEMETRY_PUBLISH_INTERVAL_MS;
Expand Down Expand Up @@ -132,10 +131,10 @@ public void startup() {
scheduleTelemetryPublish();
}

private void publishTelemetry(boolean pubSubPublish, boolean mqttPublish, String mqttTopic) {
private void publishTelemetry(boolean pubSubPublish, String pubSubTopic, boolean mqttPublish, String mqttTopic) {
String jsonString = retrieveMetricsJson(jsonMapper);
if (pubSubPublish) {
this.pubSubPublisher.publishMessage(jsonString, DEFAULT_TELEMETRY_PUBSUB_TOPIC);
this.pubSubPublisher.publishMessage(jsonString, pubSubTopic);
}
if (mqttPublish) {
this.mqttPublisher.publishMessage(jsonString, mqttTopic);
Expand All @@ -162,12 +161,14 @@ private void scheduleTelemetryPublish() {
}
logger.debug(TELEMETRY_PUBLISH_SCHEDULED);
telemetryPublishFuture = ses.scheduleAtFixedRate(
() -> publishTelemetry(newPubPublish,!Utils.isEmpty(newMqttTopic), newMqttTopic), 0,
() -> publishTelemetry(newPubPublish, configuration.getPubsubTopic(),
!Utils.isEmpty(newMqttTopic), newMqttTopic),
0,
newTelemetryPublishIntervalMs, TimeUnit.MILLISECONDS);
}

logger.info(STARTUP_CONFIGURATION_LOG, newPubPublish,
DEFAULT_TELEMETRY_PUBSUB_TOPIC, newMqttTopic, newTelemetryPublishIntervalMs);
configuration.getPubsubTopic(), newMqttTopic, newTelemetryPublishIntervalMs);
}

protected String retrieveMetricsJson(ObjectMapper jsonMapper) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@

import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.CONFIG_INVALID_OPTION_ERROR_LOG;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.DEFAULT_TELEMETRY_PUBLISH_INTERVAL_MS;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.DEFAULT_TELEMETRY_PUBSUB_TOPIC;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.MQTT_TOPIC_CONFIG_NAME;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.MQTT_TOPIC_CONFIG_PARSE_ERROR_LOG;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.PUBSUB_PUBLISH_CONFIG_NAME;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.PUBSUB_PUBLISH_CONFIG_PARSE_ERROR_LOG;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.PUBSUB_TOPIC_CONFIG_NAME;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.PUBSUB_TOPIC_CONFIG_PARSE_ERROR_LOG;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.TELEMETRY_PUBLISH_INTERVAL_CONFIG_NAME;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.TELEMETRY_PUBLISH_INTERVAL_CONFIG_PARSE_ERROR_LOG;

Expand All @@ -30,6 +33,9 @@ public class NucleusEmitterConfiguration {
//Only local pub/sub is enabled by default
@Builder.Default
boolean pubsubPublish = true;
@Builder.Default
String pubsubTopic = DEFAULT_TELEMETRY_PUBSUB_TOPIC;

@Builder.Default
String mqttTopic = "";
@Builder.Default
Expand All @@ -39,15 +45,14 @@ public class NucleusEmitterConfiguration {
* Get the Nucleus Emitter configuration from the POJO map.
* @param pojo POJO Topics object.
* @param logger Greengrass logger.
* @return the Nucleus Emitter configuration.
* @return the Nucleus Emitter configuration.
*/
public static NucleusEmitterConfiguration fromPojo(Map<String, Object> pojo, Logger logger) {
if (pojo.isEmpty()) {
return null;
}
long telemetryPublishIntervalMs = DEFAULT_TELEMETRY_PUBLISH_INTERVAL_MS;
boolean pubsubPublish = true;
String mqttTopic = "";
NucleusEmitterConfigurationBuilder config = NucleusEmitterConfiguration.builder();

for (Map.Entry<String, Object> entry : pojo.entrySet()) {
switch (entry.getKey()) {
case PUBSUB_PUBLISH_CONFIG_NAME:
Expand All @@ -58,42 +63,47 @@ public static NucleusEmitterConfiguration fromPojo(Map<String, Object> pojo, Log
logger.error(PUBSUB_PUBLISH_CONFIG_PARSE_ERROR_LOG, entry.getValue());
return null;
}
pubsubPublish = parsedBoolean;
config.pubsubPublish(parsedBoolean);
break;
} else {
logger.error(PUBSUB_PUBLISH_CONFIG_PARSE_ERROR_LOG, entry.getValue());
return null;
}
case TELEMETRY_PUBLISH_INTERVAL_CONFIG_NAME:
if (entry.getValue() instanceof Number || entry.getValue() instanceof String) {
telemetryPublishIntervalMs = Coerce.toLong(entry.getValue());
long telemetryPublishIntervalMs = Coerce.toLong(entry.getValue());
if (telemetryPublishIntervalMs == 0L) { //If value is 0 or non-numeric String
logger.error(TELEMETRY_PUBLISH_INTERVAL_CONFIG_PARSE_ERROR_LOG, entry.getValue());
return null;
}
config.telemetryPublishIntervalMs(telemetryPublishIntervalMs);
break;
} else { //If not a Number or String
logger.error(TELEMETRY_PUBLISH_INTERVAL_CONFIG_PARSE_ERROR_LOG, entry.getValue());
return null;
}
case MQTT_TOPIC_CONFIG_NAME:
if (entry.getValue() instanceof String) {
mqttTopic = Coerce.toString(entry.getValue());
config.mqttTopic(Coerce.toString(entry.getValue()));
break;
} else {
logger.error(MQTT_TOPIC_CONFIG_PARSE_ERROR_LOG, entry.getValue());
return null;
}
case PUBSUB_TOPIC_CONFIG_NAME:
if (entry.getValue() instanceof String) {
config.pubsubTopic(Coerce.toString(entry.getValue()));
break;
} else {
logger.error(PUBSUB_TOPIC_CONFIG_PARSE_ERROR_LOG, entry.getValue());
return null;
}
default:
logger.error(CONFIG_INVALID_OPTION_ERROR_LOG, entry.getKey());
return null;
}
}

return NucleusEmitterConfiguration.builder()
.pubsubPublish(pubsubPublish)
.mqttTopic(mqttTopic)
.telemetryPublishIntervalMs(telemetryPublishIntervalMs)
.build();
return config.build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ public void publishMessage(String message, String topic) {
try {
this.pubSubIPCEventStreamAgent.publish(topic, message.getBytes(StandardCharsets.UTF_8),
AWS_GREENGRASS_TELEMETRY_NUCLEUS_EMITTER);
logger.trace(PUBSUB_PUBLISH_SUCCESS_LOG);
logger.trace(PUBSUB_PUBLISH_SUCCESS_LOG, topic);
} catch (InvalidArgumentsError e) {
logger.error(PUBSUB_PUBLISH_FAILURE_LOG, e);
logger.error(PUBSUB_PUBLISH_FAILURE_LOG, topic, e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.MQTT_TOPIC_CONFIG_PARSE_ERROR_LOG;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.PUBSUB_PUBLISH_CONFIG_NAME;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.PUBSUB_PUBLISH_CONFIG_PARSE_ERROR_LOG;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.PUBSUB_TOPIC_CONFIG_NAME;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.TELEMETRY_PUBLISH_INTERVAL_CONFIG_NAME;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.TELEMETRY_PUBLISH_INTERVAL_CONFIG_PARSE_ERROR_LOG;
import static com.aws.greengrass.telemetry.nucleus.emitter.NucleusEmitterConfiguration.fromPojo;
Expand Down Expand Up @@ -58,6 +59,14 @@ void GIVEN_valid_string_config_options_THEN_parses_correctly() {
assertEquals(defaultConfiguration, generatedConfiguration);
}

@Test
void GIVEN_valid_nondefault_string_config_options_THEN_parses_correctly() {
Map<String, Object> pojo = new TreeMap<>();
pojo.put(PUBSUB_TOPIC_CONFIG_NAME,"pubsub");
NucleusEmitterConfiguration generatedConfiguration = fromPojo(pojo, logger);
assertEquals("pubsub", generatedConfiguration.getPubsubTopic());
}

@Test
void GIVEN_invalid_pubSubPublish_option_THEN_fails() {
Map<String, Object> pojo = new TreeMap<>();
Expand Down
Loading