Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP #8

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,11 @@
[![build](https://github.com/creek-service/connected-services-demo/actions/workflows/build.yml/badge.svg)](https://github.com/creek-service/connected-services-demo/actions/workflows/build.yml)
[![CodeQL](https://github.com/creek-service/connected-services-demo/actions/workflows/codeql.yml/badge.svg)](https://github.com/creek-service/connected-services-demo/actions/workflows/codeql.yml)

# Basic Kafka Streams Demo
# Connected Services Demo

Repo containing the completed [Basic Kafka Streams demo](https://www.creekservice.org/connected-services-demo)
Repo containing the completed [Connected Services demo](https://www.creekservice.org/connected-services-demo)
and associated [docs](docs/README.md).

This repository is also a template repository to enable later tutorials, that build on this one.
Click the [Use this template](https://github.com/creek-service/connected-services-demo/generate) button at the top to create a new repository from this template.

### Gradle commands

* `./gradlew` should be the go-to local command to run when developing.
Expand Down
8 changes: 4 additions & 4 deletions docs/_data/navigation/demo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
url: /bootstrap
- title: "Add a second microservice"
url: /add-service
#- title: "Write the code"
# children:
# - title: "Service descriptor"
# url: /descriptor
- title: "Write the code"
children:
- title: "Service descriptor"
url: /descriptor
# - title: "Business logic"
# url: /business-logic
#- title: Testing
Expand Down
7 changes: 7 additions & 0 deletions docs/_demo/11-further-reading.md_
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@

[todo]: link to additional demos, e.g.

[todo]: - JSON messages
[todo]: - composing services
[todo]: - defining aggregate api
[todo]: - componsing aggregates
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package io.github.creek.service.connected.services.demo.service.kafka.streams;

import static io.github.creek.service.connected.services.demo.services.HandleOccurrenceServiceDescriptor.TweetHandleUsageTopic;
import static io.github.creek.service.connected.services.demo.services.HandleOccurrenceServiceDescriptor.TweetTextTopic;
import static io.github.creek.service.connected.services.demo.services.HandleOccurrenceServiceDescriptor.TweetHandleUsageStream;
import static io.github.creek.service.connected.services.demo.services.HandleOccurrenceServiceDescriptor.TweetTextStream;
import static java.util.Objects.requireNonNull;
import static org.creekservice.api.kafka.metadata.KafkaTopicDescriptor.DEFAULT_CLUSTER_NAME;

Expand Down Expand Up @@ -50,8 +50,8 @@ public Topology build() {

// Pass a topic descriptor to the Kafka Streams extension to
// obtain a typed `KafkaTopic` instance, which provides access to serde:
final KafkaTopic<Long, String> input = ext.topic(TweetTextTopic);
final KafkaTopic<String, Integer> output = ext.topic(TweetHandleUsageTopic);
final KafkaTopic<Long, String> input = ext.topic(TweetTextStream);
final KafkaTopic<String, Integer> output = ext.topic(TweetHandleUsageStream);

// Build a simple topology:
// Consume input topic:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package io.github.creek.service.connected.services.demo.service.kafka.streams;

// formatting:off
import static io.github.creek.service.connected.services.demo.services.HandleOccurrenceServiceDescriptor.TweetTextTopic;
import static io.github.creek.service.connected.services.demo.services.HandleOccurrenceServiceDescriptor.TweetHandleUsageTopic;
import static io.github.creek.service.connected.services.demo.services.HandleOccurrenceServiceDescriptor.TweetTextStream;
import static io.github.creek.service.connected.services.demo.services.HandleOccurrenceServiceDescriptor.TweetHandleUsageStream;
import static org.creekservice.api.kafka.metadata.KafkaTopicDescriptor.DEFAULT_CLUSTER_NAME;
import static org.creekservice.api.kafka.streams.test.TestTopics.inputTopic;
import static org.creekservice.api.kafka.streams.test.TestTopics.outputTopic;
Expand Down Expand Up @@ -55,8 +55,8 @@ class TopologyBuilderTest {
private Topology topology;
// formatting:off
// begin-snippet: topic-declarations
private TestInputTopic<Long, String> tweetTextTopic;
private TestOutputTopic<String, Integer> handleUsageTopic;
private TestInputTopic<Long, String> tweetTextStream;
private TestOutputTopic<String, Integer> handleUsageStream;
// end-snippet
// formatting:on

Expand All @@ -82,8 +82,8 @@ public void setUp() {
testDriver = new TopologyTestDriver(topology, ext.properties(DEFAULT_CLUSTER_NAME));

// Create the topologies input and output topics"
tweetTextTopic = inputTopic(TweetTextTopic, ctx, testDriver);
handleUsageTopic = outputTopic(TweetHandleUsageTopic, ctx, testDriver);
tweetTextStream = inputTopic(TweetTextStream, ctx, testDriver);
handleUsageStream = outputTopic(TweetHandleUsageStream, ctx, testDriver);
}
// end-snippet
// formatting:on
Expand All @@ -98,11 +98,11 @@ public void tearDown() {
@Test
void shouldOutputHandleOccurrences() {
// When:
tweetTextTopic.pipeInput(1622262145390972929L, "@PepitoTheCat @BillyM2k @PepitoTheCat Responding to feedback, " +
tweetTextStream.pipeInput(1622262145390972929L, "@PepitoTheCat @BillyM2k @PepitoTheCat Responding to feedback, " +
"Twitter will enable a light, write-only API for bots providing good content that is free.");

// Then:
assertThat(handleUsageTopic.readKeyValuesToList(), containsInAnyOrder(
assertThat(handleUsageStream.readKeyValuesToList(), containsInAnyOrder(
pair("@PepitoTheCat", 2),
pair("@BillyM2k", 1)
));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,20 @@

package io.github.creek.service.connected.services.demo.handle.scoreboard.service.kafka.streams;

import static io.github.creek.service.connected.services.demo.services.HandleScoreboardServiceDescriptor.TweetHandleLeaderBoardChangeLog;
import static io.github.creek.service.connected.services.demo.services.HandleScoreboardServiceDescriptor.TweetHandleLeaderBoardTable;
import static io.github.creek.service.connected.services.demo.services.HandleScoreboardServiceDescriptor.TweetHandleUsageStream;
import static java.util.Objects.requireNonNull;
import static org.creekservice.api.kafka.metadata.KafkaTopicDescriptor.DEFAULT_CLUSTER_NAME;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.Stores;
import org.creekservice.api.kafka.extension.resource.KafkaTopic;
import org.creekservice.api.kafka.streams.extension.KafkaStreamsExtension;
import org.creekservice.api.kafka.streams.extension.util.Name;

Expand All @@ -33,9 +42,49 @@ public TopologyBuilder(final KafkaStreamsExtension ext) {
this.ext = requireNonNull(ext, "ext");
}

// begin-snippet: build-method
public Topology build() {
final StreamsBuilder builder = new StreamsBuilder();

// Pass a topic descriptor to the Kafka Streams extension to
// obtain a typed `KafkaTopic` instance, which provides access to serde:
final KafkaTopic<String, Integer> input = ext.topic(TweetHandleUsageStream);
final KafkaTopic<String, Integer> changelog = ext.topic(TweetHandleLeaderBoardChangeLog);
final KafkaTopic<String, Integer> output = ext.topic(TweetHandleLeaderBoardTable);

// Build a topology to build the scoreboard:
// Consume input topic:
builder.stream(
input.name(),
Consumed.with(input.keySerde(), input.valueSerde())
.withName(name.name("ingest-" + input.name())))
// Group the input stream by the key, which is the Twitter handle:
.groupByKey(Grouped.as(name.name("group-by-key" + input.name())))
// Reduce to a table tha tracks the total number of occurrences, by Twitter handle:
.reduce(
Integer::sum,
name.named("handle-scoreboard"),
// Todo: call out design issue with in memory store:
// Todo:Materialized.<String, Integer, KeyValueStore<Bytes,
// byte[]>>as(Stores.inMemoryKeyValueStore(changelog.storeName()))
Materialized.<String, Integer>as(Stores.inMemoryKeyValueStore("todo"))
.withKeySerde(changelog.keySerde())
.withValueSerde(changelog.valueSerde())
// Todo: .withRetention() -> get from output topic config

)
.toStream(name.named("scoreboard-to-stream"))
// Finally, produce to output:
.to(
output.name(),
Produced.with(output.keySerde(), output.valueSerde())
.withName(name.name("egress-" + output.name())));

return builder.build(ext.properties(DEFAULT_CLUSTER_NAME));
}
// end-snippet
}

// Todo: document design issues, i.e.
// ever growing table
// sum number wrapping
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public final class HandleOccurrenceServiceDescriptor implements ServiceDescripto
// formatting:off
// begin-snippet: topic-resources
// Define the tweet-text input topic, conceptually owned by this service:
public static final OwnedKafkaTopicInput<Long, String> TweetTextTopic =

public static final OwnedKafkaTopicInput<Long, String> TweetTextStream =
register(
inputTopic(
"twitter.tweet.text", // Topic name
Expand All @@ -54,7 +55,7 @@ public final class HandleOccurrenceServiceDescriptor implements ServiceDescripto
withPartitions(5))); // Topic config

// Define the output topic, again conceptually owned by this service:
public static final OwnedKafkaTopicOutput<String, Integer> TweetHandleUsageTopic =
public static final OwnedKafkaTopicOutput<String, Integer> TweetHandleUsageStream =
register(outputTopic(
"twitter.handle.usage",
String.class, // (Twitter handle)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,17 @@

package io.github.creek.service.connected.services.demo.services;

import static io.github.creek.service.connected.services.demo.internal.TopicConfigBuilder.withPartitions;
import static io.github.creek.service.connected.services.demo.internal.TopicDescriptors.internalTopic;
import static io.github.creek.service.connected.services.demo.internal.TopicDescriptors.outputTopic;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.creekservice.api.kafka.metadata.KafkaTopicInput;
import org.creekservice.api.kafka.metadata.KafkaTopicInternal;
import org.creekservice.api.kafka.metadata.OwnedKafkaTopicOutput;
import org.creekservice.api.platform.metadata.ComponentInput;
import org.creekservice.api.platform.metadata.ComponentInternal;
import org.creekservice.api.platform.metadata.ComponentOutput;
Expand All @@ -30,6 +38,37 @@ public final class HandleScoreboardServiceDescriptor implements ServiceDescripto
private static final List<ComponentInternal> INTERNALS = new ArrayList<>();
private static final List<ComponentOutput> OUTPUTS = new ArrayList<>();

// formatting:off
// begin-snippet: topic-resources
// Hookup the handle-occurrence-service's output as this services, unowned, input:
public static final KafkaTopicInput<String, Integer> TweetHandleUsageStream =
register(HandleOccurrenceServiceDescriptor.TweetHandleUsageStream.toInput());

public static final KafkaTopicInternal<String, Integer> TweetHandleLeaderBoardChangeLog =
register(internalTopic(
"someting", // Todo: fix this
String.class, // (Twitter handle)
Integer.class // (Summed usage count)
));

// Define the output topic, conceptually owned by this service:
public static final OwnedKafkaTopicOutput<String, Integer> TweetHandleLeaderBoardTable =
register(outputTopic(
"twitter.handle.leaderboard",
String.class, // (Twitter handle)
Integer.class, // (Summed usage count)
withPartitions(6)
// Todo: document:
.withRetentionTime(Duration.ofDays(1))));
// end-snippet
// formatting:on

// Todo: call out design issue with 6 partitions
// Todo: call out same design issue with basic kafa streas demo
// Todo: Test if system tests can access internal changelog topic without it being listed here.
// Todo: Do we want to list the internal changelog topic here?
// Todo: Windowing...

public HandleScoreboardServiceDescriptor() {}

@Override
Expand Down Expand Up @@ -57,11 +96,11 @@ private static <T extends ComponentInput> T register(final T input) {
return input;
}

// Uncomment if needed:
// private static <T extends ComponentInternal> T register(final T internal) {
// INTERNALS.add(internal);
// return internal;
// }
// Todo: document uncommenting this
private static <T extends ComponentInternal> T register(final T internal) {
INTERNALS.add(internal);
return internal;
}

private static <T extends ComponentOutput> T register(final T output) {
OUTPUTS.add(output);
Expand Down
1 change: 1 addition & 0 deletions system-tests/src/system-test/example-suite/suite.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
name: basic suite
services:
- handle-occurrence-service
- handle-scoreboard-service
tests:
- name: test 1
inputs:
Expand Down