Skip to content

Commit

Permalink
Merge pull request #15 from grookage/messageProcessor
Browse files Browse the repository at this point in the history
MessageProcessor
  • Loading branch information
koushikr authored Dec 12, 2024
2 parents dcbcefd + 20ff0d0 commit f6cb05e
Show file tree
Hide file tree
Showing 19 changed files with 66 additions and 23 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

All notable changes to this project will be documented in this file.

## [0.0.1-RC11]

- MessageProcessor
- Introduced MessageProcessor in LeiaMessageProduceClient instead of a lambda
- GetMessages from LeiaMessageProducerClient is now public

## [0.0.1-RC10]

Expand Down
2 changes: 1 addition & 1 deletion leia-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>com.grookage.leia</groupId>
<artifactId>leia</artifactId>
<version>0.0.1-RC10</version>
<version>0.0.1-RC11</version>
</parent>

<artifactId>leia-bom</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion leia-client-dropwizard/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>com.grookage.leia</groupId>
<artifactId>leia-parent</artifactId>
<version>0.0.1-RC10</version>
<version>0.0.1-RC11</version>
<relativePath>../leia-parent</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.base.Preconditions;
import com.grookage.leia.client.LeiaMessageProduceClient;
import com.grookage.leia.client.datasource.NamespaceDataSource;
import com.grookage.leia.client.processor.MessageProcessor;
import com.grookage.leia.client.refresher.LeiaClientRefresher;
import com.grookage.leia.client.refresher.LeiaClientSupplier;
import com.grookage.leia.provider.config.LeiaHttpConfiguration;
Expand Down Expand Up @@ -65,6 +66,8 @@ protected LeiaSchemaValidator getSchemaValidator(T configuration,
.build();
}

protected abstract Supplier<MessageProcessor> getMessageProcessor(T configuration);

@Override
public void run(T configuration, Environment environment) {
final var namespaceDataSource = getNamespaceDataSource(configuration);
Expand Down Expand Up @@ -94,6 +97,7 @@ public void run(T configuration, Environment environment) {
.refresher(clientRefresher)
.schemaValidator(validator)
.mapper(environment.getObjectMapper())
.messageProcessor(getMessageProcessor(configuration))
.build();
producerClient.start();
}
Expand Down
2 changes: 1 addition & 1 deletion leia-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>com.grookage.leia</groupId>
<artifactId>leia-parent</artifactId>
<version>0.0.1-RC10</version>
<version>0.0.1-RC11</version>
<relativePath>../leia-parent</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.grookage.leia.client;

import com.grookage.leia.client.processor.MessageProcessor;
import com.grookage.leia.models.mux.LeiaMessage;
import com.grookage.leia.models.schema.SchemaKey;
import com.grookage.leia.models.schema.transformer.TransformationTarget;
Expand All @@ -32,6 +33,7 @@
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;


Expand All @@ -43,6 +45,8 @@ public class LeiaMessageProduceClient extends AbstractSchemaClient {

private final Map<SchemaKey, Map<String, JsonPath>> compiledPaths = new HashMap<>();

private final Supplier<MessageProcessor> messageProcessor;

/*
Multiplexes from source and generates the list of messages as applicable
a) Fetches the schemaDetails from schemaKey of source
Expand Down Expand Up @@ -80,7 +84,7 @@ private Optional<JsonPath> getJsonPath(SchemaKey schemaKey, String attributeName
Optional.ofNullable(compiledPaths.get(schemaKey).get(attributeName)) : Optional.empty();
}

private Map<SchemaKey, LeiaMessage> getMessages(SchemaKey schemaKey, byte[] sourceMessage) {
public Map<SchemaKey, LeiaMessage> getMessages(SchemaKey schemaKey, byte[] sourceMessage) {
final var messages = new HashMap<SchemaKey, LeiaMessage>();
messages.put(schemaKey, LeiaMessage.builder()
.schemaKey(schemaKey)
Expand All @@ -104,9 +108,8 @@ private Map<SchemaKey, LeiaMessage> getMessages(SchemaKey schemaKey, byte[] sour
}

public void processMessages(final SchemaKey schemaKey,
final byte[] sourceMessage,
final UnaryOperator<Map<SchemaKey, LeiaMessage>> messageHandler) {
messageHandler.apply(getMessages(schemaKey, sourceMessage));
final byte[] sourceMessage) {
messageProcessor.get().processMessages(getMessages(schemaKey, sourceMessage));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2024. Koushik R <[email protected]>.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.grookage.leia.client.processor;

import com.grookage.leia.models.mux.LeiaMessage;
import com.grookage.leia.models.schema.SchemaKey;

import java.util.Map;

public interface MessageProcessor {

void processMessages(Map<SchemaKey, LeiaMessage> messages);

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
package com.grookage.leia.client;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.grookage.leia.client.processor.MessageProcessor;
import com.grookage.leia.client.refresher.LeiaClientRefresher;
import com.grookage.leia.client.stubs.TargetSchema;
import com.grookage.leia.client.stubs.TestSchema;
import com.grookage.leia.client.stubs.TestSchemaUnit;
import com.grookage.leia.models.ResourceHelper;
import com.grookage.leia.models.mux.LeiaMessage;
import com.grookage.leia.models.schema.SchemaDetails;
import com.grookage.leia.models.schema.SchemaKey;
import com.grookage.leia.validator.LeiaSchemaValidator;
Expand All @@ -31,6 +33,7 @@
import org.mockito.Mockito;

import java.util.List;
import java.util.Map;
import java.util.Optional;

class LeiaMessageProduceClientTest {
Expand Down Expand Up @@ -63,17 +66,17 @@ void testLeiaMessageProduceClient() {
.mapper(new ObjectMapper())
.refresher(clientRefresher)
.schemaValidator(schemaValidator)
.messageProcessor(() -> messages -> {
Assertions.assertFalse(messages.isEmpty());
Assertions.assertEquals(2, messages.size());
})
.build();
schemaClient.start();
final var testSchema = TestSchema.builder()
.userName("testUser")
.schemaUnits(List.of(TestSchemaUnit.builder()
.registeredName("testRegisteredName").build()))
.build();
schemaClient.processMessages(sourceSchema, mapper.writeValueAsBytes(testSchema), messages -> {
Assertions.assertFalse(messages.isEmpty());
Assertions.assertEquals(2, messages.size());
return messages;
});
schemaClient.processMessages(sourceSchema, mapper.writeValueAsBytes(testSchema));
}
}
2 changes: 1 addition & 1 deletion leia-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<parent>
<groupId>com.grookage.leia</groupId>
<artifactId>leia-parent</artifactId>
<version>0.0.1-RC10</version>
<version>0.0.1-RC11</version>
<relativePath>../leia-parent</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion leia-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>com.grookage.leia</groupId>
<artifactId>leia-parent</artifactId>
<version>0.0.1-RC10</version>
<version>0.0.1-RC11</version>
<relativePath>../leia-parent</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion leia-dropwizard-es/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>com.grookage.leia</groupId>
<artifactId>leia-parent</artifactId>
<version>0.0.1-RC10</version>
<version>0.0.1-RC11</version>
<relativePath>../leia-parent</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion leia-dropwizard/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>com.grookage.leia</groupId>
<artifactId>leia-parent</artifactId>
<version>0.0.1-RC10</version>
<version>0.0.1-RC11</version>
<relativePath>../leia-parent</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion leia-elastic/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>com.grookage.leia</groupId>
<artifactId>leia-parent</artifactId>
<version>0.0.1-RC10</version>
<version>0.0.1-RC11</version>
<relativePath>../leia-parent</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion leia-models/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>com.grookage.leia</groupId>
<artifactId>leia-parent</artifactId>
<version>0.0.1-RC10</version>
<version>0.0.1-RC11</version>
<relativePath>../leia-parent</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion leia-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>com.grookage.leia</groupId>
<artifactId>leia-bom</artifactId>
<version>0.0.1-RC10</version>
<version>0.0.1-RC11</version>
<relativePath>../leia-bom</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion leia-refresher/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>com.grookage.leia</groupId>
<artifactId>leia-parent</artifactId>
<version>0.0.1-RC10</version>
<version>0.0.1-RC11</version>
<relativePath>../leia-parent</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion leia-repository/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>com.grookage.leia</groupId>
<artifactId>leia-parent</artifactId>
<version>0.0.1-RC10</version>
<version>0.0.1-RC11</version>
<relativePath>../leia-parent</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion leia-schema-validator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>com.grookage.leia</groupId>
<artifactId>leia-parent</artifactId>
<version>0.0.1-RC10</version>
<version>0.0.1-RC11</version>
<relativePath>../leia-parent</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.grookage.leia</groupId>
<artifactId>leia</artifactId>
<version>0.0.1-RC10</version>
<version>0.0.1-RC11</version>
<packaging>pom</packaging>

<name>Leia</name>
Expand Down

0 comments on commit f6cb05e

Please sign in to comment.