Skip to content

Commit

Permalink
Merge pull request #9 from grookage/optimus
Browse files Browse the repository at this point in the history
Optimizations
  • Loading branch information
koushikr authored Nov 19, 2024
2 parents 36c1df9 + 5d2ac41 commit a7ce743
Show file tree
Hide file tree
Showing 47 changed files with 284 additions and 182 deletions.
9 changes: 8 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@

All notable changes to this project will be documented in this file.

## [0.0.1-RC5]

- Introduced a PermissionValidator and introduced suppliers for versionGenerator, SchemaUpdater and PermissionValidator,
to facilitate clients implement them via Dependency Injection
- Removed the unnecessary `LeiaMessages` data-model and converged to a Map Structure
- A few linting fixes, along the way

## [0.0.1-RC4]

- Periodic Refresh - Added support to enable/disable periodicRefresh. Default value is true
- Event Multiplexing - Added support for event transformations. Can multiplex one event into multiple events.
- Event Multiplexing - Added support for event transformations. Can multiplex one event into multiple events.

## [0.0.1-RC3]

Expand Down
22 changes: 13 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Leia is a governance and metadata framework aimed at meeting compliance requirem
<dependency>
<groupId>com.grookage.leia</groupId>
<artifactId>leia-bom</artifactId>
<versio>0.0.1-RC4</version>
<versio>0.0.1-RC5</version>
</dependency>
```

Expand All @@ -44,14 +44,14 @@ Leia is a governance and metadata framework aimed at meeting compliance requirem

### Using the schema registry

#### Build your own dropwizard schema server by using the `LeiaElastic` bundle.
#### Build your own dropwizard schema server by using the `LeiaElastic` bundle.

```
new LeiaElasticBundle<TestConfiguration, SchemaUpdater>() {
@Override
protected SchemaUpdaterResolver<SchemaUpdater> userResolver(TestConfiguration configuration) {
return null;
protected Supplier<SchemaUpdaterResolver<SchemaUpdater>> userResolver(TestConfiguration configuration) {
return () -> new DefaultResolver();
}
@Override
Expand All @@ -60,8 +60,8 @@ Leia is a governance and metadata framework aimed at meeting compliance requirem
}
@Override
protected VersionIDGenerator getVersionIDGenerator() {
return null;
protected Supplier<VersionIDGenerator> getVersionIDGenerator() {
return () -> new DefaultVersionGenerator();
}
@Override
Expand All @@ -71,9 +71,10 @@ Leia is a governance and metadata framework aimed at meeting compliance requirem
}
```

- **SchemaUpdater** is an RBAC governing class, please extend this SchemaUpdater to implement your own UserProfile
- **CacheConfig** - If the schema will be always resolved from the dataStore (Elasticsearch) or from the in-memory cache with a refreshInterval to refresh the data
- **CacheConfig** - If the schema will be always resolved from the dataStore (Elasticsearch) or from the in-memory cache
with a refreshInterval to refresh the data
- **VersionIdGenerator** - Your own version id generator, to generate a unique versionId for every document
- **ElasticConfig** - Elasticsearch configuration to bring up the schema server

Expand Down Expand Up @@ -126,7 +127,10 @@ A sample schema looks like the following
```

- **AttributeInfo** : There are various type of attributes you can define, please refer to the `SchemaAttribute` class.
- **TransformationTargets** - Helps in event multiplexing, in the above example, when provided with the namespace, `testNamespace` and schemaName, `testSchema` with version `V1234`, during message production the `LeiaMessageProduceClient`, will multiplex the testSchema to both the versions, the transformationTargets ought to be jsonPathRules.
- **TransformationTargets** - Helps in event multiplexing, in the above example, when provided with the
namespace, `testNamespace` and schemaName, `testSchema` with version `V1234`, during message production
the `LeiaMessageProduceClient`, will multiplex the testSchema to both the versions, the transformationTargets ought to
be jsonPathRules.

#### Using the LeiaClientBundle

Expand Down
2 changes: 1 addition & 1 deletion leia-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>com.grookage.leia</groupId>
<artifactId>leia</artifactId>
<version>0.0.1-RC4</version>
<version>0.0.1-RC5</version>
</parent>

<artifactId>leia-bom</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion leia-client-dropwizard/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>com.grookage.leia</groupId>
<artifactId>leia-parent</artifactId>
<version>0.0.1-RC4</version>
<version>0.0.1-RC5</version>
<relativePath>../leia-parent</relativePath>
</parent>

Expand Down
4 changes: 2 additions & 2 deletions leia-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>com.grookage.leia</groupId>
<artifactId>leia-parent</artifactId>
<version>0.0.1-RC4</version>
<version>0.0.1-RC5</version>
<relativePath>../leia-parent</relativePath>
</parent>

Expand Down Expand Up @@ -81,7 +81,7 @@
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<version>2.2.0</version>
<version>[2.9.0,)</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.grookage.leia.client;

import com.grookage.leia.models.mux.LeiaMessage;
import com.grookage.leia.models.mux.LeiaMessages;
import com.grookage.leia.models.schema.SchemaKey;
import com.grookage.leia.models.schema.transformer.TransformationTarget;
import com.jayway.jsonpath.DocumentContext;
Expand All @@ -33,6 +32,7 @@
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.UnaryOperator;


@EqualsAndHashCode(callSuper = true)
Expand Down Expand Up @@ -80,29 +80,35 @@ private Optional<JsonPath> getJsonPath(SchemaKey schemaKey, String attributeName
Optional.ofNullable(compiledPaths.get(schemaKey).get(attributeName)) : Optional.empty();
}

@SneakyThrows
public LeiaMessages getMessages(SchemaKey schemaKey, byte[] sourceMessage) {
private Map<SchemaKey, LeiaMessage> getMessages(SchemaKey schemaKey, byte[] sourceMessage) {
final var messages = new HashMap<SchemaKey, LeiaMessage>();
messages.put(schemaKey, LeiaMessage.builder()
.schemaKey(schemaKey)
.message(sourceMessage)
.build()
);
final var sourceSchemaDetails = super.getSchemaDetails()
.stream().filter(each -> each.getSchemaKey().equals(schemaKey))
.findFirst().orElse(null);
final var messages = new LeiaMessages();
messages.add(
LeiaMessage.builder()
.schemaKey(schemaKey)
.message(sourceMessage)
.build()
);

final var transformationTargets = null == sourceSchemaDetails ? null :
sourceSchemaDetails.getTransformationTargets();
if (null == transformationTargets) {
return messages;
}
final var documentContext = JsonPath.parse(new String(sourceMessage));
transformationTargets.forEach(transformationTarget ->
createMessage(documentContext, transformationTarget).ifPresent(messages::add));
createMessage(documentContext, transformationTarget).ifPresent(message ->
messages.put(message.getSchemaKey(), message)));
return messages;
}

public void processMessages(final SchemaKey schemaKey,
final byte[] sourceMessage,
final UnaryOperator<Map<SchemaKey, LeiaMessage>> messageHandler) {
messageHandler.apply(getMessages(schemaKey, sourceMessage));
}

@Override
public void start() {
super.getSchemaDetails().forEach(schemaDetails -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,10 @@ void testLeiaMessageProduceClient() {
.schemaUnits(List.of(TestSchemaUnit.builder()
.registeredName("testRegisteredName").build()))
.build();
final var messages = schemaClient.getMessages(sourceSchema, mapper.writeValueAsBytes(testSchema));
Assertions.assertFalse(messages.getMessages().isEmpty());
Assertions.assertEquals(2, messages.getMessages().size());
schemaClient.processMessages(sourceSchema, mapper.writeValueAsBytes(testSchema), messages -> {
Assertions.assertFalse(messages.isEmpty());
Assertions.assertEquals(2, messages.size());
return messages;
});
}
}
2 changes: 1 addition & 1 deletion leia-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>com.grookage.leia</groupId>
<artifactId>leia-parent</artifactId>
<version>0.0.1-RC4</version>
<version>0.0.1-RC5</version>
<relativePath>../leia-parent</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

@Slf4j
public class SchemaProcessorHub {

private final Map<SchemaEvent, SchemaProcessor> processors = Maps.newHashMap();
private SchemaRepository schemaRepository;
private VersionIDGenerator versionIDGenerator;
private Supplier<SchemaRepository> repositorySupplier;
private Supplier<VersionIDGenerator> versionSupplier;

private SchemaProcessorHub() {

Expand All @@ -44,19 +45,19 @@ public static SchemaProcessorHub of() {
return new SchemaProcessorHub();
}

public SchemaProcessorHub withSchemaRepository(SchemaRepository schemaRepository) {
this.schemaRepository = schemaRepository;
public SchemaProcessorHub withRepositoryResolver(Supplier<SchemaRepository> repositorySupplier) {
this.repositorySupplier = repositorySupplier;
return this;
}

public SchemaProcessorHub withVersionIDGenerator(VersionIDGenerator versionIDGenerator) {
this.versionIDGenerator = versionIDGenerator;
public SchemaProcessorHub wtihVersionSupplier(Supplier<VersionIDGenerator> versionSupplier) {
this.versionSupplier = versionSupplier;
return this;
}

public SchemaProcessorHub build() {
Preconditions.checkNotNull(schemaRepository, "Schema Repository can't be null");
Preconditions.checkNotNull(versionIDGenerator, "Version ID Generator can't be null");
Preconditions.checkNotNull(repositorySupplier, "Schema Repository can't be null");
Preconditions.checkNotNull(versionSupplier, "Version ID Generator can't be null");
Arrays.stream(SchemaEvent.values()).forEach(this::buildProcessor);
return this;
}
Expand All @@ -66,32 +67,32 @@ public void buildProcessor(final SchemaEvent event) {
@Override
public SchemaProcessor schemaCreate() {
return CreateSchemaProcessor.builder()
.schemaRepository(schemaRepository)
.versionIDGenerator(versionIDGenerator)
.repositorySupplier(repositorySupplier)
.versionSupplier(versionSupplier)
.build();
}

@Override
public SchemaProcessor schemaUpdate() {
return UpdateSchemaProcessor.builder()
.schemaRepository(schemaRepository)
.versionIDGenerator(versionIDGenerator)
.repositorySupplier(repositorySupplier)
.versionSupplier(versionSupplier)
.build();
}

@Override
public SchemaProcessor schemaApprove() {
return ApproveSchemaProcessor.builder()
.schemaRepository(schemaRepository)
.versionIDGenerator(versionIDGenerator)
.repositorySupplier(repositorySupplier)
.versionSupplier(versionSupplier)
.build();
}

@Override
public SchemaProcessor schemaReject() {
return RejectSchemaProcessor.builder()
.schemaRepository(schemaRepository)
.versionIDGenerator(versionIDGenerator)
.repositorySupplier(repositorySupplier)
.versionSupplier(versionSupplier)
.build();
}
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public SchemaEvent name() {
public void process(SchemaContext context) {
final var schemaKey = context.getContext(SchemaKey.class)
.orElseThrow((Supplier<Throwable>) () -> LeiaException.error(LeiaErrorCode.VALUE_NOT_FOUND));
final var storedSchema = getSchemaRepository().get(schemaKey).orElse(null);
final var storedSchema = getRepositorySupplier().get().get(schemaKey).orElse(null);
if (null == storedSchema || storedSchema.getSchemaState() != SchemaState.CREATED) {
log.error("There are no stored schemas present with namespace {}, version {} and schemaName {}. Please try updating them instead",
schemaKey.getNamespace(),
Expand All @@ -62,7 +62,7 @@ public void process(SchemaContext context) {
storedSchema.getSchemaMeta().setUpdatedByEmail(email);
storedSchema.getSchemaMeta().setUpdatedAt(System.currentTimeMillis());
storedSchema.setSchemaState(SchemaState.APPROVED);
getSchemaRepository().update(storedSchema);
getRepositorySupplier().get().update(storedSchema);
context.addContext(SchemaDetails.class.getSimpleName(), storedSchema);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public SchemaEvent name() {
public void process(SchemaContext context) {
final var createSchemaRequest = context.getContext(CreateSchemaRequest.class)
.orElseThrow((Supplier<Throwable>) () -> LeiaException.error(LeiaErrorCode.VALUE_NOT_FOUND));
final var storedSchemas = getSchemaRepository()
final var storedSchemas = getRepositorySupplier()
.get()
.get(createSchemaRequest.getNamespace(), createSchemaRequest.getSchemaName());
if (!storedSchemas.isEmpty() && storedSchemas.stream()
.anyMatch(each -> each.getSchemaState() == SchemaState.CREATED)) {
Expand All @@ -59,8 +60,8 @@ public void process(SchemaContext context) {
}
final var userName = ContextUtils.getUser(context);
final var email = ContextUtils.getEmail(context);
final var schemaDetails = SchemaUtils.toSchemaDetails(createSchemaRequest, userName, email, getVersionIDGenerator());
getSchemaRepository().create(schemaDetails);
final var schemaDetails = SchemaUtils.toSchemaDetails(createSchemaRequest, userName, email, getVersionSupplier());
getRepositorySupplier().get().create(schemaDetails);
context.addContext(SchemaDetails.class.getSimpleName(), schemaDetails);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public SchemaEvent name() {
public void process(SchemaContext context) {
final var schemaKey = context.getContext(SchemaKey.class)
.orElseThrow((Supplier<Throwable>) () -> LeiaException.error(LeiaErrorCode.VALUE_NOT_FOUND));
final var storedSchema = getSchemaRepository().get(schemaKey).orElse(null);
final var storedSchema = getRepositorySupplier().get().get(schemaKey).orElse(null);
if (null == storedSchema || storedSchema.getSchemaState() != SchemaState.CREATED) {
log.error("There are no stored schemas present with namespace {}, version {} and schemaName {}. Please try updating them instead",
schemaKey.getNamespace(),
Expand All @@ -61,7 +61,7 @@ public void process(SchemaContext context) {
storedSchema.getSchemaMeta().setUpdatedByEmail(email);
storedSchema.getSchemaMeta().setUpdatedAt(System.currentTimeMillis());
storedSchema.setSchemaState(SchemaState.REJECTED);
getSchemaRepository().update(storedSchema);
getRepositorySupplier().get().update(storedSchema);
context.addContext(SchemaDetails.class.getSimpleName(), storedSchema);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
import lombok.Data;
import lombok.experimental.SuperBuilder;

import java.util.function.Supplier;

@SuperBuilder
@Data
@AllArgsConstructor
public abstract class SchemaProcessor {

private final SchemaRepository schemaRepository;
private final VersionIDGenerator versionIDGenerator;
private final Supplier<SchemaRepository> repositorySupplier;
private final Supplier<VersionIDGenerator> versionSupplier;

public abstract SchemaEvent name();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public SchemaEvent name() {
public void process(SchemaContext context) {
final var updateSchemaRequest = context.getContext(UpdateSchemaRequest.class)
.orElseThrow((Supplier<Throwable>) () -> LeiaException.error(LeiaErrorCode.VALUE_NOT_FOUND));
final var storedSchema = getSchemaRepository()
final var storedSchema = getRepositorySupplier()
.get()
.get(SchemaKey.builder()
.version(updateSchemaRequest.getVersion())
.schemaName(updateSchemaRequest.getSchemaName())
Expand All @@ -73,7 +74,7 @@ public void process(SchemaContext context) {
if (null != updateSchemaRequest.getTransformationTargets()) {
storedSchema.setTransformationTargets(updateSchemaRequest.getTransformationTargets());
}
getSchemaRepository().update(storedSchema);
getRepositorySupplier().get().update(storedSchema);
context.addContext(SchemaDetails.class.getSimpleName(), storedSchema);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,4 @@ public static String getEmail(final SchemaContext schemaContext) {
return schemaContext.getValue(EMAIL)
.orElseThrow((Supplier<Throwable>) () -> LeiaException.error(LeiaErrorCode.VALUE_NOT_FOUND));
}

}
Loading

0 comments on commit a7ce743

Please sign in to comment.