Skip to content

Commit

Permalink
#118 Implement inserting into sharded message table - pass outbox tab…
Browse files Browse the repository at this point in the history
…le suffix to DatabaseIdGenerator
  • Loading branch information
cer committed Sep 10, 2022
1 parent 098eee8 commit 3293a54
Show file tree
Hide file tree
Showing 12 changed files with 114 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,12 @@ public Int128 genIdInternal() {
}

@Override
public synchronized Int128 genId(Long databaseId) {
public synchronized Int128 genId(Long databaseId, Integer partitionOffset) {
return genIdInternal();
}

@Override
public Optional<Int128> incrementIdIfPossible(Int128 anchorId) {
return Optional.of(genId(null));
return Optional.of(genId(null, null));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ public DatabaseIdGenerator(long serviceId) {
}

@Override
public Int128 genId(Long databaseId) {
public Int128 genId(Long databaseId, Integer partitionOffset) {

if (databaseId == null) {
throw new IllegalArgumentException("database id is required");
}

return new Int128(databaseId, serviceId);
return new Int128(databaseId, serviceId + (partitionOffset == null ? 0 : partitionOffset));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,19 @@ public interface IdGenerator {

boolean databaseIdRequired();

Int128 genId(Long databaseId);
Int128 genId(Long databaseId, Integer partitionOffset);

default String genIdAsString(Long databaseId, Integer partitionOffset) {
return genId(databaseId, partitionOffset).asString();
}

default Int128 genId() {
return genId(null, null);
}

default String genIdAsString() {
return genId().asString();
}

Optional<Int128> incrementIdIfPossible(Int128 anchorId);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.eventuate.common.id;

import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.TimeUnit;
Expand All @@ -9,27 +10,31 @@

public class ApplicationIdGeneratorTest {

private ApplicationIdGenerator idGen;

@Before
public void setUp() {
idGen = new ApplicationIdGenerator();
}

@Test
public void shouldGenerateId() {
ApplicationIdGenerator idGen = new ApplicationIdGenerator();
Int128 id = idGen.genId(null);
Int128 id = idGen.genId();
assertNotNull(id);
}

@Test
public void shouldGenerateMonotonicId() {
ApplicationIdGenerator idGen = new ApplicationIdGenerator();
Int128 id1 = idGen.genId(null);
Int128 id2 = idGen.genId(null);
Int128 id1 = idGen.genId();
Int128 id2 = idGen.genId();
assertTrue(id1.compareTo(id2) < 0);
}

@Test
public void shouldGenerateLotsOfIds() throws InterruptedException {
ApplicationIdGenerator idGen = new ApplicationIdGenerator();
IntStream.range(1, 1000000).forEach(x -> idGen.genId(null));
IntStream.range(1, 1000000).forEach(x -> idGen.genId());
TimeUnit.SECONDS.sleep(1);
IntStream.range(1, 1000000).forEach(x -> idGen.genId(null));
IntStream.range(1, 1000000).forEach(x -> idGen.genId());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

public class DatabaseIdGeneratorTest {

private final long SERVICE_ID = DatabaseIdGenerator.SERVICE_ID_MAX_VALUE / 2;

@Test(expected = IllegalArgumentException.class)
public void shouldThrowAnExceptionOnNegativeInstanceId() {
new DatabaseIdGenerator(-1L);
Expand All @@ -19,11 +21,20 @@ public void shouldThrowAnExceptionOnTooBigInstanceId() {

@Test
public void shouldGenerateAnId() {
IdGenerator idGenerator = new DatabaseIdGenerator(DatabaseIdGenerator.SERVICE_ID_MAX_VALUE / 2);
IdGenerator idGenerator = new DatabaseIdGenerator(SERVICE_ID);

Int128 id = idGenerator.genId(Long.MAX_VALUE);
Int128 id = idGenerator.genId(Long.MAX_VALUE, null);
Assert.assertEquals(SERVICE_ID, id.getLo());
Assert.assertEquals(Long.MAX_VALUE, id.getHi());
}

@Test
public void shouldGenerateAnIdWithPartitionOffset() {
IdGenerator idGenerator = new DatabaseIdGenerator(SERVICE_ID);
int partitionOffset = 1;

Assert.assertEquals(DatabaseIdGenerator.SERVICE_ID_MAX_VALUE / 2, id.getLo());
Int128 id = idGenerator.genId(Long.MAX_VALUE, partitionOffset);
Assert.assertEquals(SERVICE_ID + partitionOffset, id.getLo());
Assert.assertEquals(Long.MAX_VALUE, id.getHi());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,10 @@ private String insertIntoEventsTable(IdGenerator idGenerator,
metadata.orElse(null),
eventuateJdbcOperationsUtils.booleanToInt(published));

return idGenerator.genId(databaseId).asString();
return idGenerator.genId(databaseId, null).asString();
}
else {
String eventId = idGenerator.genId(null).asString();
String eventId = idGenerator.genId().asString();

eventuateJdbcStatementExecutor
.update(eventuateJdbcOperationsUtils.insertIntoEventsTableApplicationIdSql(eventuateSchema),
Expand Down Expand Up @@ -144,13 +144,13 @@ private String insertIntoMessageTableApplicationId(IdGenerator idGenerator,

headers = new HashMap<>(headers);

String messageId = idGenerator.genId(null).asString();
String messageId = idGenerator.genId(null, outboxPartitionValues.outboxTableSuffix.suffix).asString();

headers.put("ID", messageId);

String serializedHeaders = JSonMapper.toJson(headers);

eventuateJdbcStatementExecutor.update(eventuateJdbcOperationsUtils.insertIntoMessageTableApplicationIdSql(eventuateSchema, this::columnToJson, outboxPartitionValues.outboxTableSuffix),
eventuateJdbcStatementExecutor.update(eventuateJdbcOperationsUtils.insertIntoMessageTableApplicationIdSql(eventuateSchema, this::columnToJson, outboxPartitionValues.outboxTableSuffix.suffixAsString),
messageId, destination, serializedHeaders, payload, eventuateJdbcOperationsUtils.booleanToInt(published), outboxPartitionValues.messagePartition);

return messageId;
Expand All @@ -166,10 +166,10 @@ private String insertIntoMessageTableDatabaseId(IdGenerator idGenerator,
String serializedHeaders = JSonMapper.toJson(headers);


long databaseId = eventuateJdbcStatementExecutor.insertAndReturnGeneratedId(eventuateJdbcOperationsUtils.insertIntoMessageTableDbIdSql(eventuateSchema, this::columnToJson, outboxPartitionValues.outboxTableSuffix),
long databaseId = eventuateJdbcStatementExecutor.insertAndReturnGeneratedId(eventuateJdbcOperationsUtils.insertIntoMessageTableDbIdSql(eventuateSchema, this::columnToJson, outboxPartitionValues.outboxTableSuffix.suffixAsString),
MESSAGE_AUTO_GENERATED_ID_COLUMN, destination, serializedHeaders, payload, eventuateJdbcOperationsUtils.booleanToInt(published), outboxPartitionValues.messagePartition);

return idGenerator.genId(databaseId).asString();
return idGenerator.genId(databaseId, outboxPartitionValues.outboxTableSuffix.suffix).asString();
}

protected String columnToJson(EventuateSchema eventuateSchema, String column) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import org.apache.commons.lang.builder.HashCodeBuilder;

public class OutboxPartitionValues {
public final String outboxTableSuffix;
public final OutboxTableSuffix outboxTableSuffix;
public final Integer messagePartition;

@Override
Expand All @@ -15,24 +15,24 @@ public boolean equals(Object o) {

OutboxPartitionValues that = (OutboxPartitionValues) o;

return new EqualsBuilder().append(outboxTableSuffix, that.outboxTableSuffix).append(messagePartition, that.messagePartition).isEquals();
return new EqualsBuilder().append(outboxTableSuffix.suffix, that.outboxTableSuffix.suffix).append(messagePartition, that.messagePartition).isEquals();
}

@Override
public String toString() {
return "OutboxPartitionValues{" +
"outboxTableSuffix='" + outboxTableSuffix + '\'' +
"outboxTableSuffix='" + outboxTableSuffix.suffix + '\'' +
", messagePartition=" + messagePartition +
'}';
}

@Override
public int hashCode() {
return new HashCodeBuilder(17, 37).append(outboxTableSuffix).append(messagePartition).toHashCode();
return new HashCodeBuilder(17, 37).append(outboxTableSuffix.suffix).append(messagePartition).toHashCode();
}

public OutboxPartitionValues(String outboxTableSuffix, Integer messagePartition) {
this.outboxTableSuffix = outboxTableSuffix;
public OutboxPartitionValues(Integer outboxTableSuffix, Integer messagePartition) {
this.outboxTableSuffix = new OutboxTableSuffix(outboxTableSuffix);
this.messagePartition = messagePartition;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public OutboxPartitioningSpec(Integer outboxTables, Integer outboxTablePartition
public OutboxPartitionValues outboxTableValues(String destination, String messageKey) {
Integer hash = abs(Objects.hash(destination, messageKey));

String outboxTableSuffix = nullOrOne(outboxTables) || messageKey == null ? "" : Integer.toString(hash % outboxTables);
Integer outboxTableSuffix = nullOrOne(outboxTables) || messageKey == null ? null : hash % outboxTables;
Integer messagePartition = nullOrOne(outboxTablePartitions) || messageKey == null ? null : hash % outboxTablePartitions;

return new OutboxPartitionValues(outboxTableSuffix, messagePartition);
Expand All @@ -33,11 +33,12 @@ private boolean nullOrOne(Integer x) {
return x == null || x == 1;
}

public List<String> outboxTableSuffixes() {
public List<OutboxTableSuffix> outboxTableSuffixes() {
if (nullOrOne(outboxTables))
return Collections.singletonList("");
else
return IntStream.range(0, outboxTables).mapToObj(Integer::toString).collect(Collectors.toList());
return Collections.singletonList(new OutboxTableSuffix(null));
else {
return IntStream.range(0, outboxTables).mapToObj(OutboxTableSuffix::new).collect(Collectors.toList());
}
}

public OutboxPartitioningSpec withOutboxTables(int outboxTables) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.eventuate.common.jdbc;

import java.util.Objects;

public class OutboxTableSuffix {
public final Integer suffix;
public final String suffixAsString;

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
OutboxTableSuffix that = (OutboxTableSuffix) o;
return Objects.equals(suffix, that.suffix) && suffixAsString.equals(that.suffixAsString);
}

@Override
public int hashCode() {
return Objects.hash(suffix, suffixAsString);
}

public OutboxTableSuffix(Integer suffix) {
this.suffix = suffix;
this.suffixAsString = suffix == null ? "" : this.suffix.toString();

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,41 @@ public class OutboxPartitioningSpecTest {

@Test
public void shouldCalculateDefault() {
assertEquals(new OutboxPartitionValues("", null), values(OutboxPartitioningSpec.DEFAULT));
assertEquals(new OutboxPartitionValues(null, null), values(OutboxPartitioningSpec.DEFAULT));
}

@Test
public void shouldCalculateOneOutboxTable() {
assertEquals(new OutboxPartitionValues("", null), values(new OutboxPartitioningSpec(1, null)));
assertEquals(new OutboxPartitionValues(null, null), values(new OutboxPartitioningSpec(1, null)));
}

@Test
public void shouldCalculateOneMessagePartition() {
assertEquals(new OutboxPartitionValues("", null), values(new OutboxPartitioningSpec(null, 1)));
assertEquals(new OutboxPartitionValues(null, null), values(new OutboxPartitioningSpec(null, 1)));
}

@Test
public void shouldCalculateBothOne() {
assertEquals(new OutboxPartitionValues("", null), values(new OutboxPartitioningSpec(1, 1)));
assertEquals(new OutboxPartitionValues(null, null), values(new OutboxPartitioningSpec(1, 1)));
}

@Test
public void shouldCalculateWithMultipleOutboxTables() {
OutboxPartitioningSpec spec = new OutboxPartitioningSpec(2, null);
assertOneOfExpectedValues(spec, Arrays.asList("0", "1"), null);
assertOneOfExpectedValues(spec, Arrays.asList(0, 1), null);
}

@Test
public void shouldCalculateWithMessagePartition() {
OutboxPartitioningSpec spec = new OutboxPartitioningSpec(null, 2);
assertOneOfExpectedValues(spec, Arrays.asList(""), Arrays.asList(0, 1));
assertOneOfExpectedValues(spec, singletonList(null), Arrays.asList(0, 1));
}

@Test
public void shouldCalculateWithBoth() {
OutboxPartitioningSpec spec = new OutboxPartitioningSpec(2, 2);
assertEquals(new OutboxPartitionValues("0", 0), values(spec));
assertOneOfExpectedValues(spec, Arrays.asList("0", "1"), Arrays.asList(0, 1));
assertEquals(new OutboxPartitionValues(0, 0), values(spec));
assertOneOfExpectedValues(spec, Arrays.asList(0, 1), Arrays.asList(0, 1));
}

private OutboxPartitionValues values(OutboxPartitioningSpec spec) {
Expand All @@ -63,10 +63,10 @@ private OutboxPartitionValues values(OutboxPartitioningSpec spec, int i) {
return spec.outboxTableValues(destination, Integer.toString(i));
}

private void assertOneOfExpectedValues(OutboxPartitioningSpec spec, List<String> expectedSuffixes, List<Integer> expectedPartitions) {
private void assertOneOfExpectedValues(OutboxPartitioningSpec spec, List<Integer> expectedSuffixes, List<Integer> expectedPartitions) {
IntStream.range(0, 100).forEach(i -> {
OutboxPartitionValues actual = values(spec, i);
assertThat(actual.outboxTableSuffix).isIn(expectedSuffixes);
assertThat(actual.outboxTableSuffix.suffix).isIn(expectedSuffixes);

if (expectedPartitions == null)
assertThat(actual.messagePartition).isNull();
Expand All @@ -77,11 +77,12 @@ private void assertOneOfExpectedValues(OutboxPartitioningSpec spec, List<String>

@Test
public void shouldReturnDefaultSuffix() {
assertEquals(singletonList(""), OutboxPartitioningSpec.DEFAULT.outboxTableSuffixes());
assertEquals(singletonList(new OutboxTableSuffix(null)), OutboxPartitioningSpec.DEFAULT.outboxTableSuffixes());
}

@Test
public void shouldReturnSuffixesForMultipleOutboxTables() {
assertEquals(Arrays.asList("0", "1"), new OutboxPartitioningSpec(2, 2).outboxTableSuffixes());
assertEquals(Arrays.asList(new OutboxTableSuffix(0), new OutboxTableSuffix(1)),
new OutboxPartitioningSpec(2, 2).outboxTableSuffixes());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ private Mono<String> insertIntoEventsTable(IdGenerator idGenerator,
triggeringEvent.orElse(null),
metadata.orElse(null),
eventuateJdbcOperationsUtils.booleanToInt(published))
.map(id -> idGenerator.genId(id).asString());
.map(id -> idGenerator.genId(id, null).asString());
}
else {
String eventId = idGenerator.genId(null).asString();
String eventId = idGenerator.genIdAsString();

return reactiveJdbcStatementExecutor
.update(eventuateJdbcOperationsUtils.insertIntoEventsTableApplicationIdSql(eventuateSchema),
Expand Down Expand Up @@ -156,14 +156,14 @@ private Mono<String> insertIntoMessageTableApplicationId(IdGenerator idGenerator

headers = new HashMap<>(headers);

String messageId = idGenerator.genId(null).asString();
String messageId = idGenerator.genIdAsString();

headers.put("ID", messageId);

String serializedHeaders = JSonMapper.toJson(headers);

return reactiveJdbcStatementExecutor
.update(eventuateJdbcOperationsUtils.insertIntoMessageTableApplicationIdSql(eventuateSchema, this::columnToJson, outboxPartitionValues.outboxTableSuffix),
.update(eventuateJdbcOperationsUtils.insertIntoMessageTableApplicationIdSql(eventuateSchema, this::columnToJson, outboxPartitionValues.outboxTableSuffix.suffixAsString),
messageId, destination, serializedHeaders, payload, eventuateJdbcOperationsUtils.booleanToInt(published), outboxPartitionValues.messagePartition)
.map(rowsUpdated -> messageId);
}
Expand All @@ -177,9 +177,9 @@ private Mono<String> insertIntoMessageTableDatabaseId(IdGenerator idGenerator,
String serializedHeaders = JSonMapper.toJson(headers);

return reactiveJdbcStatementExecutor
.insertAndReturnId(eventuateJdbcOperationsUtils.insertIntoMessageTableDbIdSql(eventuateSchema, this::columnToJson, outboxPartitionValues.outboxTableSuffix),
.insertAndReturnId(eventuateJdbcOperationsUtils.insertIntoMessageTableDbIdSql(eventuateSchema, this::columnToJson, outboxPartitionValues.outboxTableSuffix.suffixAsString),
MESSAGE_AUTO_GENERATED_ID_COLUMN, destination, serializedHeaders, payload, eventuateJdbcOperationsUtils.booleanToInt(published), outboxPartitionValues.messagePartition)
.map(id -> idGenerator.genId(id).asString());
.map(id -> idGenerator.genIdAsString(id, outboxPartitionValues.outboxTableSuffix.suffix));
}

public String columnToJson(EventuateSchema eventuateSchema, String column) {
Expand Down
Loading

0 comments on commit 3293a54

Please sign in to comment.