Skip to content

Commit

Permalink
Merge pull request #14 from Yolean/many-onupdate
Browse files Browse the repository at this point in the history
Adds support for multiple onupdate hook URLs
  • Loading branch information
solsson authored Mar 1, 2019
2 parents c862cb3 + e17a36a commit dedce3a
Show file tree
Hide file tree
Showing 41 changed files with 1,331 additions and 388 deletions.
13 changes: 10 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,27 @@ The [build-contract](https://github.com/Yolean/build-contract/) can be used as d
alias compose='docker-compose -f build-contracts/docker-compose.yml'
gradle jibDockerBuild --image=yolean/kafka-keyvalue:dev
compose up -d cache1
compose up smoketest
compose up --build example-nodejs-client
compose down
```

Note: Running `build-contract` without warmup doesn't work ATM, due to timing issues at start.
Note: `build-contract` (see [build-and-push.sh](./build-and-push.sh)) sometimes fails due to timing issues. Try re-running.

During development of the cache itself or the example nodejs client
it's more convenient to start only `kafka` and `pixy` through docker.

The main class is `se.yolean.kafka.keyvalue.cli.Main`.

Run the cache service from your IDE with args like: `--port 18081 --streams-props bootstrap.servers=localhost:19092 num.standby.replicas=0 --hostname localhost --topic topic1 --application-id kv-test1-local-001 --onupdate http://127.0.0.1:8081/updated`
Run the cache service from your IDE with args like: `--port 18081 --streams-props bootstrap.servers=localhost:19092 num.standby.replicas=0 --hostname localhost --topic topic1 --application-id kv-test1-local-001 --onupdate http://127.0.0.1:8081/kafka-keyvalue/v1/updates`

Test manually using for example `echo 'mytest={"t":1}' | kafkacat -b localhost:19092 -P -t topic1 -K '='; curl http://localhost:19081/cache/v1/raw/mytest`.

Run the nodejs locally using: `cd example-nodejs-client; npm ci; ./node_modules/.bin/jest --watch`
Run the nodejs locally using: `cd example-nodejs-client; npm ci; ./node_modules/.bin/jest --runInBand --watch `
(Note that the mock server for unupdate calls only exists during Jest runs)

## Logging

The distribution bundles log4j2, which we also use as logging API.
To configure per deployment, provide a file and set `-Dlog4j.configurationFile`.
For reconfigurability at runtime Kubernets configmaps could be used with [monitorInterval](https://logging.apache.org/log4j/2.x/manual/configuration.html#AutomaticReconfiguration).
12 changes: 9 additions & 3 deletions build-and-push.sh
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
#!/bin/sh
set -e
# TODO make jib build run the unit tests

# If all tooling is available locally use
#gradlejibdocker=gradle
gradlejibdocker="docker run --rm -v $(pwd):/workspace -v /var/run/docker.sock:/var/run/docker.sock solsson/gradle-jib-docker:latest gradle --no-daemon --no-parallel --no-build-cache"
#contract=build-contract
gradlejibdocker="docker run --rm -v $(pwd):/workspace -v /var/run/docker.sock:/var/run/docker.sock solsson/gradle-jib-docker@sha256:390f765ba4c8423e30ae1668bfd2e74f026a11b5ec3f0bae23bd36b0ed4c0c75 gradle --no-daemon --no-parallel"
contract="docker run -v /var/run/docker.sock:/var/run/docker.sock -v $(pwd)/:/source --rm --name kafka-keyvalue-build solsson/build-contract@sha256:961624a502c4bf64bdec328e65a911a2096192e7c1a268d7360b9c85ae7a35b8"

$gradlejibdocker --stacktrace test
$gradlejibdocker --stacktrace jibDockerBuild --image=yolean/kafka-keyvalue:dev
build-contract
$gradlejibdocker --stacktrace jibDockerBuild --image=yolean/kafka-keyvalue:dev -Djib.baseImageCache=build/jib-base-image-cache

$contract test

docker tag yolean/kafka-keyvalue:dev yolean/kafka-keyvalue:latest
docker push yolean/kafka-keyvalue:latest
38 changes: 36 additions & 2 deletions build-contracts/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,40 @@ services:
- --topic
- topic1
- --onupdate
- http://example-nodejs-client:8080/updates
- http://example-nodejs-client:8081/kafka-keyvalue/v1/updates
- http://onupdate-logging:8080/
- http://example-nodejs-client:8082/testpost
- --starttimeout
- '10'

onupdate-logging:
image: solsson/httpstatus-random:access-logging@sha256:5330b2e84457e65ae9552acb00b26e8b7b2a878bcafb3baabddc13c1f840cd90
expose:
- '8080'

smoketest:
depends_on:
- onupdate-logging
- pixy
- cache1
image: solsson/curl@sha256:92ebf15ac57bea360484480336ed5d9fa16d38d773fd00f7e9fb2cae94baf25a
labels:
- com.yolean.build-contract
entrypoint:
- /bin/bash
- -cex
- |
curl --ipv4 --retry 5 --retry-connrefused -H 'User-Agent: curl-based-kafka-keyvalue-smoketest' http://onupdate-logging:8080/
curl --ipv4 --retry 5 --retry-connrefused http://pixy:19090 -I
curl --ipv4 --retry 5 --retry-connrefused http://cache1:19081 -I
curl --ipv4 --retry 5 --retry-connrefused http://cache1:19081/metrics -f | grep consumer_metrics | grep incoming_byte_total
curl --ipv4 --retry 5 --retry-connrefused http://cache1:19081/cache/v1/raw/smoketest1 -I
curl --ipv4 --retry 5 --retry-connrefused http://pixy:19090/topics -f | grep topic1
curl --ipv4 -d '{"x":1}' -H 'Content-Type: application/json' 'http://pixy:19090/topics/topic1/messages?key=smoketest1' -f
curl --ipv4 -d '{"x":2}' -H 'Content-Type: application/json' 'http://pixy:19090/topics/topic1/messages?key=smoketest1&sync' -f
curl --ipv4 -d '{"x":3}' -H 'Content-Type: application/json' 'http://pixy:19090/topics/topic1/messages?key=smoketest1&sync' -f
curl --ipv4 --retry 5 --retry-connrefused http://cache1:19081/cache/v1/raw/smoketest1 -f | grep '{"x":3}'
example-nodejs-client:
depends_on:
- pixy
Expand All @@ -95,7 +125,11 @@ services:
labels:
- com.yolean.build-contract
expose:
- "8080"
- "8081"
- "8082"
environment:
- PIXY_HOST=http://pixy:19090
- CACHE1_HOST=http://cache1:19081
command:
# By design the cache service deals with a single topic, meaning that tests probably can't run concurrently
- --runInBand
19 changes: 13 additions & 6 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,15 @@ group 'se.yolean'

sourceCompatibility = 1.11

jib.from.image = 'gcr.io/distroless/java@sha256:a3ec250f951cf33017843691cc54f8dfb43a3727e095a75d149dc2ada5720d30'
jib {
from {
image = 'gcr.io/distroless/java@sha256:31d5dfbf339e09dd31524dc301df29ac066744bad5a89a20ffa582e33d01e998'
}
to {
image = 'yolean/kafka-keyvalue'
tags = ['dev']
}
}

dependencies {
runtime 'com.fasterxml.jackson.core:jackson-databind:2.9.8'
Expand All @@ -43,8 +51,8 @@ dependencies {
compile group: 'javax.inject', name: 'javax.inject', version: '1'
compile group: 'com.google.inject', name: 'guice', version: '4.2.0'

compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.1.0'
compile group: 'org.apache.kafka', name: 'kafka-streams', version: '2.1.0'
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.1.1'
compile group: 'org.apache.kafka', name: 'kafka-streams', version: '2.1.1'

compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.11.2'
compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.11.2'
Expand All @@ -60,7 +68,7 @@ dependencies {
testRuntime("org.junit.platform:junit-platform-launcher:1.4.0")
testCompile group: 'org.mockito', name: 'mockito-core', version: '2.24.0'

testCompile group: 'org.apache.kafka', name: 'kafka-streams-test-utils', version: '2.1.0'
testCompile group: 'org.apache.kafka', name: 'kafka-streams-test-utils', version: '2.1.1'

testCompile group: 'javax.json', name: 'javax.json-api', version: '1.1.2'

Expand All @@ -69,7 +77,7 @@ dependencies {
compile group: 'org.eclipse.jetty', name: 'jetty-server', version: '9.4.14.v20181114'
compile group: 'org.eclipse.jetty', name: 'jetty-servlet', version: '9.4.14.v20181114'
compile group: 'org.glassfish.jersey.core', name: 'jersey-common', version: '2.28'
compile group: 'org.glassfish.jersey.inject', name: 'jersey-hk2', version: '2.28'
compile group: 'org.glassfish.jersey.inject', name: 'jersey-hk2', version: '2.28'
compile group: 'org.glassfish.jersey.containers', name: 'jersey-container-servlet', version: '2.28'
compile group: 'org.glassfish.jersey.media', name: 'jersey-media-json-jackson', version: '2.28'

Expand All @@ -85,4 +93,3 @@ task copyToLib(type: Copy) {
}

build.dependsOn(copyToLib)

113 changes: 41 additions & 72 deletions example-nodejs-client/cache-update-flow.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,13 @@ const {
} = process.env;

const fetch = require('node-fetch');
const { gzip, gunzip } = require('zlib');

// retry on no connection, but not on any status code
// There's node-fetch-retry and node-fetch-plus if we want libs
const fetchRetry = async (url, opts) => {
let retry = opts && opts.retries || 3
while (retry > 0) {
try {
return await fetch(url, opts)
} catch(e) {
if (opts.retryCallback) {
opts.retryCallback(retry)
}
retry = retry - 1
if (retry == 0) {
throw e
}
}
}
};

// we don't use mockserver for any asserts now (onupdate- spec does that) but the access logging is a bit useful for multi-onupdate still
const mockserver = require('./mockserver');

beforeAll(() => {
mockserver.start();
});

afterAll(() => {
mockserver.stop();
});
Expand All @@ -44,52 +25,6 @@ describe("A complete cache update flow", () => {
expect(response.status).toEqual(200);
});

test("Check that pixy is online at " + PIXY_HOST, async () => {
let response = await fetchRetry(PIXY_HOST, {
timeout: 3,
retries: 5,
retryCallback: retry => console.log('Retrying pixy access', retry)
});
expect(response.status).toEqual(404);
});

test("Check existence of test topic " + TOPIC1_NAME, async () => {
let retries = 5;
while (true) {
try {
const response = await fetch(`${PIXY_HOST}/topics`, {
method: 'GET',
headers: {
'Accept': 'application/json'
}
});
expect(response.status).toEqual(200);
expect(await response.json()).toContain(TOPIC1_NAME);
retries = 0;
} catch (e) {
if (retries-- < 1) throw e;
console.log('Retrying topic existence');
}
}
});

test("Check that cache is online at " + CACHE1_HOST, async () => {
//const response = await fetch(`${CACHE1_HOST}/ready`, {
const response = await fetchRetry(`${CACHE1_HOST}/`, {
method: 'GET',
headers: {
'Accept': 'application/json'
},
timeout: 3,
retries: 10,
retryCallback: retry => console.log('Retrying cache access', retry)
});
//expect(response.status).toEqual(204);
// For now we don't have a working readiness check
//expect(response.status).toEqual(500);
expect(response.status).toEqual(404);
});

it("Starts with a produce to Pixy", async () => {
const response = await fetch(`${PIXY_HOST}/topics/${TOPIC1_NAME}/messages?key=testasync`, {
method: 'POST',
Expand Down Expand Up @@ -169,15 +104,49 @@ describe("A complete cache update flow", () => {

it("Can enumerate keys", async () => {
const response = await fetch(`${CACHE1_HOST}/cache/v1/keys`);
expect(await response.json()).toEqual(["test1", "testasync"]);
expect(await response.json()).toEqual(expect.arrayContaining(["test1", "testasync"]));
});

it("Can stream values, newline separated - but note that order isn't guaranteed to match that of /keys", async () => {
const response = await fetch(`${CACHE1_HOST}/cache/v1/values`);
expect(await response.text()).toEqual(
`{"test":"${TEST_ID}","step":"First wait for ack"}` + '\n' +
`{"test":"${TEST_ID}","step":"First async produce"}` + '\n'
);
const body = await response.text();
expect(body).toContain(`{"test":"${TEST_ID}","step":"First wait for ack"}` + '\n');
expect(body).toContain(`{"test":"${TEST_ID}","step":"First async produce"}` + '\n');
});

let testblob, testblobcomingback;

test("We can gzip some test data", done => {
const jsonstring = JSON.stringify({ test: TEST_ID, step: 'No key' });
gzip(jsonstring, (err, gzipped) => {
expect(err).toEqual(null);
testblob = gzipped;
done();
});
});

it('handles gzipped payloads', async () => {
expect(testblob).toBeTruthy();
await fetch(`${PIXY_HOST}/topics/${TOPIC1_NAME}/messages?key=testgzip1&sync`, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: testblob
});
const response = await fetch(`${CACHE1_HOST}/cache/v1/raw/testgzip1`);
expect(response.ok).toEqual(true);
expect(response.status).toEqual(200);
testblobcomingback = await response.buffer();
});

it('is a gunzippable payload with the actual data intact', done => {
expect(testblobcomingback).toBeTruthy();
gunzip(testblobcomingback, (err, gunzipped) => {
expect(err).toEqual(null);
expect(JSON.parse(gunzipped)).toEqual({ test: TEST_ID, step: 'No key' });
done();
});
});

xit("... so if we key+value streaming we should add another endpoint", async () => {
Expand Down
3 changes: 3 additions & 0 deletions example-nodejs-client/jest.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module.exports = {
globalSetup: './stack-setup'
}
6 changes: 5 additions & 1 deletion example-nodejs-client/mockserver.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
const port = 8081,

// Note: this server's port is currently not exposed in the docker-compose test setup,
// because onpudate-flow.spec.js has its own server

const port = 8082,
express = require('express'),
app = express(),
morgan = require('morgan');
Expand Down
Loading

0 comments on commit dedce3a

Please sign in to comment.