From 3293a54d28b52f89f56d83906c107e0d4a4cbe2c Mon Sep 17 00:00:00 2001
From: Chris Richardson <chris@chrisrichardson.net>
Date: Sat, 10 Sep 2022 09:47:24 -0700
Subject: [PATCH] #118 Implement inserting into sharded message table - pass
 outbox table suffix to DatabaseIdGenerator

---
 .../common/id/ApplicationIdGenerator.java     |  4 +--
 .../common/id/DatabaseIdGenerator.java        |  4 +--
 .../io/eventuate/common/id/IdGenerator.java   | 14 +++++++++-
 .../common/id/ApplicationIdGeneratorTest.java | 21 +++++++++------
 .../common/id/DatabaseIdGeneratorTest.java    | 17 +++++++++---
 .../jdbc/EventuateCommonJdbcOperations.java   | 12 ++++-----
 .../common/jdbc/OutboxPartitionValues.java    | 12 ++++-----
 .../common/jdbc/OutboxPartitioningSpec.java   | 11 ++++----
 .../common/jdbc/OutboxTableSuffix.java        | 27 +++++++++++++++++++
 .../jdbc/OutboxPartitioningSpecTest.java      | 25 ++++++++---------
 ...EventuateCommonReactiveJdbcOperations.java | 12 ++++-----
 .../EventuateSpringJdbcStatementExecutor.java |  6 +++++
 12 files changed, 114 insertions(+), 51 deletions(-)
 create mode 100644 eventuate-common-jdbc/src/main/java/io/eventuate/common/jdbc/OutboxTableSuffix.java

diff --git a/eventuate-common-id/src/main/java/io/eventuate/common/id/ApplicationIdGenerator.java b/eventuate-common-id/src/main/java/io/eventuate/common/id/ApplicationIdGenerator.java
index a208c08e..1cc28d82 100644
--- a/eventuate-common-id/src/main/java/io/eventuate/common/id/ApplicationIdGenerator.java
+++ b/eventuate-common-id/src/main/java/io/eventuate/common/id/ApplicationIdGenerator.java
@@ -79,12 +79,12 @@ public Int128 genIdInternal() {
   }
 
   @Override
-  public synchronized Int128 genId(Long databaseId) {
+  public synchronized Int128 genId(Long databaseId, Integer partitionOffset) {
     return genIdInternal();
   }
 
   @Override
   public Optional<Int128> incrementIdIfPossible(Int128 anchorId) {
-    return Optional.of(genId(null));
+    return Optional.of(genId(null, null));
   }
 }
diff --git a/eventuate-common-id/src/main/java/io/eventuate/common/id/DatabaseIdGenerator.java b/eventuate-common-id/src/main/java/io/eventuate/common/id/DatabaseIdGenerator.java
index b71141e1..53c6b990 100644
--- a/eventuate-common-id/src/main/java/io/eventuate/common/id/DatabaseIdGenerator.java
+++ b/eventuate-common-id/src/main/java/io/eventuate/common/id/DatabaseIdGenerator.java
@@ -23,13 +23,13 @@ public DatabaseIdGenerator(long serviceId) {
   }
 
   @Override
-  public Int128 genId(Long databaseId) {
+  public Int128 genId(Long databaseId, Integer partitionOffset) {
 
     if (databaseId == null) {
       throw new IllegalArgumentException("database id is required");
     }
 
-    return new Int128(databaseId, serviceId);
+    return new Int128(databaseId, serviceId + (partitionOffset == null ? 0 : partitionOffset));
   }
 
   @Override
diff --git a/eventuate-common-id/src/main/java/io/eventuate/common/id/IdGenerator.java b/eventuate-common-id/src/main/java/io/eventuate/common/id/IdGenerator.java
index 662d1190..fe98c39c 100644
--- a/eventuate-common-id/src/main/java/io/eventuate/common/id/IdGenerator.java
+++ b/eventuate-common-id/src/main/java/io/eventuate/common/id/IdGenerator.java
@@ -6,7 +6,19 @@ public interface IdGenerator {
 
   boolean databaseIdRequired();
 
-  Int128 genId(Long databaseId);
+  Int128 genId(Long databaseId, Integer partitionOffset);
+
+  default String genIdAsString(Long databaseId, Integer partitionOffset) {
+    return genId(databaseId, partitionOffset).asString();
+  }
+
+  default Int128 genId() {
+    return genId(null, null);
+  }
+
+  default String genIdAsString() {
+    return genId().asString();
+  }
 
   Optional<Int128> incrementIdIfPossible(Int128 anchorId);
 }
diff --git a/eventuate-common-id/src/test/java/io/eventuate/common/id/ApplicationIdGeneratorTest.java b/eventuate-common-id/src/test/java/io/eventuate/common/id/ApplicationIdGeneratorTest.java
index 5d8ac7d5..f5e1b771 100644
--- a/eventuate-common-id/src/test/java/io/eventuate/common/id/ApplicationIdGeneratorTest.java
+++ b/eventuate-common-id/src/test/java/io/eventuate/common/id/ApplicationIdGeneratorTest.java
@@ -1,5 +1,6 @@
 package io.eventuate.common.id;
 
+import org.junit.Before;
 import org.junit.Test;
 
 import java.util.concurrent.TimeUnit;
@@ -9,27 +10,31 @@
 
 public class ApplicationIdGeneratorTest {
 
+  private ApplicationIdGenerator idGen;
+
+  @Before
+  public void setUp() {
+    idGen = new ApplicationIdGenerator();
+  }
+
   @Test
   public void shouldGenerateId() {
-    ApplicationIdGenerator idGen = new ApplicationIdGenerator();
-    Int128 id = idGen.genId(null);
+    Int128 id = idGen.genId();
     assertNotNull(id);
   }
 
   @Test
   public void shouldGenerateMonotonicId() {
-    ApplicationIdGenerator idGen = new ApplicationIdGenerator();
-    Int128 id1 = idGen.genId(null);
-    Int128 id2 = idGen.genId(null);
+    Int128 id1 = idGen.genId();
+    Int128 id2 = idGen.genId();
     assertTrue(id1.compareTo(id2) < 0);
   }
 
   @Test
   public void shouldGenerateLotsOfIds() throws InterruptedException {
-    ApplicationIdGenerator idGen = new ApplicationIdGenerator();
-    IntStream.range(1, 1000000).forEach(x -> idGen.genId(null));
+    IntStream.range(1, 1000000).forEach(x -> idGen.genId());
     TimeUnit.SECONDS.sleep(1);
-    IntStream.range(1, 1000000).forEach(x -> idGen.genId(null));
+    IntStream.range(1, 1000000).forEach(x -> idGen.genId());
   }
 
 }
\ No newline at end of file
diff --git a/eventuate-common-id/src/test/java/io/eventuate/common/id/DatabaseIdGeneratorTest.java b/eventuate-common-id/src/test/java/io/eventuate/common/id/DatabaseIdGeneratorTest.java
index 1f0e9fad..cf746fb0 100644
--- a/eventuate-common-id/src/test/java/io/eventuate/common/id/DatabaseIdGeneratorTest.java
+++ b/eventuate-common-id/src/test/java/io/eventuate/common/id/DatabaseIdGeneratorTest.java
@@ -7,6 +7,8 @@
 
 public class DatabaseIdGeneratorTest {
 
+  private final long SERVICE_ID = DatabaseIdGenerator.SERVICE_ID_MAX_VALUE / 2;
+
   @Test(expected = IllegalArgumentException.class)
   public void shouldThrowAnExceptionOnNegativeInstanceId() {
     new DatabaseIdGenerator(-1L);
@@ -19,11 +21,20 @@ public void shouldThrowAnExceptionOnTooBigInstanceId() {
 
   @Test
   public void shouldGenerateAnId() {
-    IdGenerator idGenerator = new DatabaseIdGenerator(DatabaseIdGenerator.SERVICE_ID_MAX_VALUE / 2);
+    IdGenerator idGenerator = new DatabaseIdGenerator(SERVICE_ID);
 
-    Int128 id = idGenerator.genId(Long.MAX_VALUE);
+    Int128 id = idGenerator.genId(Long.MAX_VALUE, null);
+    Assert.assertEquals(SERVICE_ID, id.getLo());
+    Assert.assertEquals(Long.MAX_VALUE, id.getHi());
+  }
+
+  @Test
+  public void shouldGenerateAnIdWithPartitionOffset() {
+    IdGenerator idGenerator = new DatabaseIdGenerator(SERVICE_ID);
+    int partitionOffset = 1;
 
-    Assert.assertEquals(DatabaseIdGenerator.SERVICE_ID_MAX_VALUE / 2, id.getLo());
+    Int128 id = idGenerator.genId(Long.MAX_VALUE, partitionOffset);
+    Assert.assertEquals(SERVICE_ID + partitionOffset, id.getLo());
     Assert.assertEquals(Long.MAX_VALUE, id.getHi());
   }
 
diff --git a/eventuate-common-jdbc/src/main/java/io/eventuate/common/jdbc/EventuateCommonJdbcOperations.java b/eventuate-common-jdbc/src/main/java/io/eventuate/common/jdbc/EventuateCommonJdbcOperations.java
index 67e59293..18f83436 100644
--- a/eventuate-common-jdbc/src/main/java/io/eventuate/common/jdbc/EventuateCommonJdbcOperations.java
+++ b/eventuate-common-jdbc/src/main/java/io/eventuate/common/jdbc/EventuateCommonJdbcOperations.java
@@ -79,10 +79,10 @@ private String insertIntoEventsTable(IdGenerator idGenerator,
                       metadata.orElse(null),
                       eventuateJdbcOperationsUtils.booleanToInt(published));
 
-      return idGenerator.genId(databaseId).asString();
+      return idGenerator.genId(databaseId, null).asString();
     }
     else {
-      String eventId = idGenerator.genId(null).asString();
+      String eventId = idGenerator.genId().asString();
 
       eventuateJdbcStatementExecutor
               .update(eventuateJdbcOperationsUtils.insertIntoEventsTableApplicationIdSql(eventuateSchema),
@@ -144,13 +144,13 @@ private String insertIntoMessageTableApplicationId(IdGenerator idGenerator,
 
     headers = new HashMap<>(headers);
 
-    String messageId = idGenerator.genId(null).asString();
+    String messageId = idGenerator.genId(null, outboxPartitionValues.outboxTableSuffix.suffix).asString();
 
     headers.put("ID", messageId);
 
     String serializedHeaders = JSonMapper.toJson(headers);
 
-    eventuateJdbcStatementExecutor.update(eventuateJdbcOperationsUtils.insertIntoMessageTableApplicationIdSql(eventuateSchema, this::columnToJson, outboxPartitionValues.outboxTableSuffix),
+    eventuateJdbcStatementExecutor.update(eventuateJdbcOperationsUtils.insertIntoMessageTableApplicationIdSql(eventuateSchema, this::columnToJson, outboxPartitionValues.outboxTableSuffix.suffixAsString),
             messageId, destination, serializedHeaders, payload, eventuateJdbcOperationsUtils.booleanToInt(published), outboxPartitionValues.messagePartition);
 
     return messageId;
@@ -166,10 +166,10 @@ private String insertIntoMessageTableDatabaseId(IdGenerator idGenerator,
     String serializedHeaders = JSonMapper.toJson(headers);
 
 
-    long databaseId = eventuateJdbcStatementExecutor.insertAndReturnGeneratedId(eventuateJdbcOperationsUtils.insertIntoMessageTableDbIdSql(eventuateSchema, this::columnToJson, outboxPartitionValues.outboxTableSuffix),
+    long databaseId = eventuateJdbcStatementExecutor.insertAndReturnGeneratedId(eventuateJdbcOperationsUtils.insertIntoMessageTableDbIdSql(eventuateSchema, this::columnToJson, outboxPartitionValues.outboxTableSuffix.suffixAsString),
             MESSAGE_AUTO_GENERATED_ID_COLUMN, destination, serializedHeaders, payload, eventuateJdbcOperationsUtils.booleanToInt(published), outboxPartitionValues.messagePartition);
 
-    return idGenerator.genId(databaseId).asString();
+    return idGenerator.genId(databaseId, outboxPartitionValues.outboxTableSuffix.suffix).asString();
   }
 
   protected String columnToJson(EventuateSchema eventuateSchema, String column) {
diff --git a/eventuate-common-jdbc/src/main/java/io/eventuate/common/jdbc/OutboxPartitionValues.java b/eventuate-common-jdbc/src/main/java/io/eventuate/common/jdbc/OutboxPartitionValues.java
index 5ab8626a..293b7102 100644
--- a/eventuate-common-jdbc/src/main/java/io/eventuate/common/jdbc/OutboxPartitionValues.java
+++ b/eventuate-common-jdbc/src/main/java/io/eventuate/common/jdbc/OutboxPartitionValues.java
@@ -4,7 +4,7 @@
 import org.apache.commons.lang.builder.HashCodeBuilder;
 
 public class OutboxPartitionValues {
-    public final String outboxTableSuffix;
+    public final OutboxTableSuffix outboxTableSuffix;
     public final Integer messagePartition;
 
     @Override
@@ -15,24 +15,24 @@ public boolean equals(Object o) {
 
         OutboxPartitionValues that = (OutboxPartitionValues) o;
 
-        return new EqualsBuilder().append(outboxTableSuffix, that.outboxTableSuffix).append(messagePartition, that.messagePartition).isEquals();
+        return new EqualsBuilder().append(outboxTableSuffix.suffix, that.outboxTableSuffix.suffix).append(messagePartition, that.messagePartition).isEquals();
     }
 
     @Override
     public String toString() {
         return "OutboxPartitionValues{" +
-                "outboxTableSuffix='" + outboxTableSuffix + '\'' +
+                "outboxTableSuffix='" + outboxTableSuffix.suffix + '\'' +
                 ", messagePartition=" + messagePartition +
                 '}';
     }
 
     @Override
     public int hashCode() {
-        return new HashCodeBuilder(17, 37).append(outboxTableSuffix).append(messagePartition).toHashCode();
+        return new HashCodeBuilder(17, 37).append(outboxTableSuffix.suffix).append(messagePartition).toHashCode();
     }
 
-    public OutboxPartitionValues(String outboxTableSuffix, Integer messagePartition) {
-        this.outboxTableSuffix = outboxTableSuffix;
+    public OutboxPartitionValues(Integer outboxTableSuffix, Integer messagePartition) {
+        this.outboxTableSuffix = new OutboxTableSuffix(outboxTableSuffix);
         this.messagePartition = messagePartition;
     }
 }
diff --git a/eventuate-common-jdbc/src/main/java/io/eventuate/common/jdbc/OutboxPartitioningSpec.java b/eventuate-common-jdbc/src/main/java/io/eventuate/common/jdbc/OutboxPartitioningSpec.java
index a5479099..1998720d 100644
--- a/eventuate-common-jdbc/src/main/java/io/eventuate/common/jdbc/OutboxPartitioningSpec.java
+++ b/eventuate-common-jdbc/src/main/java/io/eventuate/common/jdbc/OutboxPartitioningSpec.java
@@ -23,7 +23,7 @@ public OutboxPartitioningSpec(Integer outboxTables, Integer outboxTablePartition
     public OutboxPartitionValues outboxTableValues(String destination, String messageKey) {
         Integer hash = abs(Objects.hash(destination, messageKey));
 
-        String outboxTableSuffix = nullOrOne(outboxTables) || messageKey == null ? "" : Integer.toString(hash % outboxTables);
+        Integer outboxTableSuffix = nullOrOne(outboxTables) || messageKey == null ? null : hash % outboxTables;
         Integer messagePartition = nullOrOne(outboxTablePartitions) || messageKey == null ? null : hash % outboxTablePartitions;
 
         return new OutboxPartitionValues(outboxTableSuffix, messagePartition);
@@ -33,11 +33,12 @@ private boolean nullOrOne(Integer x) {
         return x == null || x == 1;
     }
 
-    public List<String> outboxTableSuffixes() {
+    public List<OutboxTableSuffix> outboxTableSuffixes() {
         if (nullOrOne(outboxTables))
-            return Collections.singletonList("");
-        else
-            return IntStream.range(0, outboxTables).mapToObj(Integer::toString).collect(Collectors.toList());
+            return Collections.singletonList(new OutboxTableSuffix(null));
+        else {
+            return IntStream.range(0, outboxTables).mapToObj(OutboxTableSuffix::new).collect(Collectors.toList());
+        }
     }
 
     public OutboxPartitioningSpec withOutboxTables(int outboxTables) {
diff --git a/eventuate-common-jdbc/src/main/java/io/eventuate/common/jdbc/OutboxTableSuffix.java b/eventuate-common-jdbc/src/main/java/io/eventuate/common/jdbc/OutboxTableSuffix.java
new file mode 100644
index 00000000..777b6566
--- /dev/null
+++ b/eventuate-common-jdbc/src/main/java/io/eventuate/common/jdbc/OutboxTableSuffix.java
@@ -0,0 +1,27 @@
+package io.eventuate.common.jdbc;
+
+import java.util.Objects;
+
+public class OutboxTableSuffix {
+    public final Integer suffix;
+    public final String suffixAsString;
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        OutboxTableSuffix that = (OutboxTableSuffix) o;
+        return Objects.equals(suffix, that.suffix) && suffixAsString.equals(that.suffixAsString);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(suffix, suffixAsString);
+    }
+
+    public OutboxTableSuffix(Integer suffix) {
+        this.suffix = suffix;
+        this.suffixAsString = suffix == null ? "" : this.suffix.toString();
+
+    }
+}
\ No newline at end of file
diff --git a/eventuate-common-jdbc/src/test/java/io/eventuate/common/jdbc/OutboxPartitioningSpecTest.java b/eventuate-common-jdbc/src/test/java/io/eventuate/common/jdbc/OutboxPartitioningSpecTest.java
index c019baa5..c62dc9dc 100644
--- a/eventuate-common-jdbc/src/test/java/io/eventuate/common/jdbc/OutboxPartitioningSpecTest.java
+++ b/eventuate-common-jdbc/src/test/java/io/eventuate/common/jdbc/OutboxPartitioningSpecTest.java
@@ -18,41 +18,41 @@ public class OutboxPartitioningSpecTest {
 
     @Test
     public void shouldCalculateDefault() {
-        assertEquals(new OutboxPartitionValues("", null), values(OutboxPartitioningSpec.DEFAULT));
+        assertEquals(new OutboxPartitionValues(null, null), values(OutboxPartitioningSpec.DEFAULT));
     }
 
     @Test
     public void shouldCalculateOneOutboxTable() {
-        assertEquals(new OutboxPartitionValues("", null), values(new OutboxPartitioningSpec(1, null)));
+        assertEquals(new OutboxPartitionValues(null, null), values(new OutboxPartitioningSpec(1, null)));
     }
 
     @Test
     public void shouldCalculateOneMessagePartition() {
-        assertEquals(new OutboxPartitionValues("", null), values(new OutboxPartitioningSpec(null, 1)));
+        assertEquals(new OutboxPartitionValues(null, null), values(new OutboxPartitioningSpec(null, 1)));
     }
 
     @Test
     public void shouldCalculateBothOne() {
-        assertEquals(new OutboxPartitionValues("", null), values(new OutboxPartitioningSpec(1, 1)));
+        assertEquals(new OutboxPartitionValues(null, null), values(new OutboxPartitioningSpec(1, 1)));
     }
 
     @Test
     public void shouldCalculateWithMultipleOutboxTables() {
         OutboxPartitioningSpec spec = new OutboxPartitioningSpec(2, null);
-        assertOneOfExpectedValues(spec, Arrays.asList("0", "1"), null);
+        assertOneOfExpectedValues(spec, Arrays.asList(0, 1), null);
     }
 
     @Test
     public void shouldCalculateWithMessagePartition() {
         OutboxPartitioningSpec spec = new OutboxPartitioningSpec(null, 2);
-        assertOneOfExpectedValues(spec, Arrays.asList(""), Arrays.asList(0, 1));
+        assertOneOfExpectedValues(spec, singletonList(null), Arrays.asList(0, 1));
     }
 
     @Test
     public void shouldCalculateWithBoth() {
         OutboxPartitioningSpec spec = new OutboxPartitioningSpec(2, 2);
-        assertEquals(new OutboxPartitionValues("0", 0), values(spec));
-        assertOneOfExpectedValues(spec, Arrays.asList("0", "1"), Arrays.asList(0, 1));
+        assertEquals(new OutboxPartitionValues(0, 0), values(spec));
+        assertOneOfExpectedValues(spec, Arrays.asList(0, 1), Arrays.asList(0, 1));
     }
 
     private OutboxPartitionValues values(OutboxPartitioningSpec spec) {
@@ -63,10 +63,10 @@ private OutboxPartitionValues values(OutboxPartitioningSpec spec, int i) {
         return spec.outboxTableValues(destination, Integer.toString(i));
     }
 
-    private void assertOneOfExpectedValues(OutboxPartitioningSpec spec, List<String> expectedSuffixes, List<Integer> expectedPartitions) {
+    private void assertOneOfExpectedValues(OutboxPartitioningSpec spec, List<Integer> expectedSuffixes, List<Integer> expectedPartitions) {
         IntStream.range(0, 100).forEach(i -> {
             OutboxPartitionValues actual = values(spec, i);
-            assertThat(actual.outboxTableSuffix).isIn(expectedSuffixes);
+            assertThat(actual.outboxTableSuffix.suffix).isIn(expectedSuffixes);
 
             if (expectedPartitions == null)
                 assertThat(actual.messagePartition).isNull();
@@ -77,11 +77,12 @@ private void assertOneOfExpectedValues(OutboxPartitioningSpec spec, List<String>
 
     @Test
     public void shouldReturnDefaultSuffix() {
-        assertEquals(singletonList(""), OutboxPartitioningSpec.DEFAULT.outboxTableSuffixes());
+        assertEquals(singletonList(new OutboxTableSuffix(null)), OutboxPartitioningSpec.DEFAULT.outboxTableSuffixes());
     }
 
     @Test
     public void shouldReturnSuffixesForMultipleOutboxTables() {
-        assertEquals(Arrays.asList("0", "1"), new OutboxPartitioningSpec(2, 2).outboxTableSuffixes());
+        assertEquals(Arrays.asList(new OutboxTableSuffix(0), new OutboxTableSuffix(1)),
+                new OutboxPartitioningSpec(2, 2).outboxTableSuffixes());
     }
 }
\ No newline at end of file
diff --git a/eventuate-common-reactive-jdbc/src/main/java/io/eventuate/common/reactive/jdbc/EventuateCommonReactiveJdbcOperations.java b/eventuate-common-reactive-jdbc/src/main/java/io/eventuate/common/reactive/jdbc/EventuateCommonReactiveJdbcOperations.java
index edd55cef..95921e7e 100644
--- a/eventuate-common-reactive-jdbc/src/main/java/io/eventuate/common/reactive/jdbc/EventuateCommonReactiveJdbcOperations.java
+++ b/eventuate-common-reactive-jdbc/src/main/java/io/eventuate/common/reactive/jdbc/EventuateCommonReactiveJdbcOperations.java
@@ -90,10 +90,10 @@ private Mono<String> insertIntoEventsTable(IdGenerator idGenerator,
                       triggeringEvent.orElse(null),
                       metadata.orElse(null),
                       eventuateJdbcOperationsUtils.booleanToInt(published))
-              .map(id -> idGenerator.genId(id).asString());
+              .map(id -> idGenerator.genId(id, null).asString());
     }
     else {
-      String eventId = idGenerator.genId(null).asString();
+      String eventId = idGenerator.genIdAsString();
 
       return reactiveJdbcStatementExecutor
               .update(eventuateJdbcOperationsUtils.insertIntoEventsTableApplicationIdSql(eventuateSchema),
@@ -156,14 +156,14 @@ private Mono<String> insertIntoMessageTableApplicationId(IdGenerator idGenerator
 
     headers = new HashMap<>(headers);
 
-    String messageId = idGenerator.genId(null).asString();
+    String messageId = idGenerator.genIdAsString();
 
     headers.put("ID", messageId);
 
     String serializedHeaders = JSonMapper.toJson(headers);
 
     return reactiveJdbcStatementExecutor
-            .update(eventuateJdbcOperationsUtils.insertIntoMessageTableApplicationIdSql(eventuateSchema, this::columnToJson, outboxPartitionValues.outboxTableSuffix),
+            .update(eventuateJdbcOperationsUtils.insertIntoMessageTableApplicationIdSql(eventuateSchema, this::columnToJson, outboxPartitionValues.outboxTableSuffix.suffixAsString),
                     messageId, destination, serializedHeaders, payload, eventuateJdbcOperationsUtils.booleanToInt(published), outboxPartitionValues.messagePartition)
             .map(rowsUpdated -> messageId);
   }
@@ -177,9 +177,9 @@ private Mono<String> insertIntoMessageTableDatabaseId(IdGenerator idGenerator,
     String serializedHeaders = JSonMapper.toJson(headers);
 
     return reactiveJdbcStatementExecutor
-            .insertAndReturnId(eventuateJdbcOperationsUtils.insertIntoMessageTableDbIdSql(eventuateSchema, this::columnToJson, outboxPartitionValues.outboxTableSuffix),
+            .insertAndReturnId(eventuateJdbcOperationsUtils.insertIntoMessageTableDbIdSql(eventuateSchema, this::columnToJson, outboxPartitionValues.outboxTableSuffix.suffixAsString),
                     MESSAGE_AUTO_GENERATED_ID_COLUMN, destination, serializedHeaders, payload, eventuateJdbcOperationsUtils.booleanToInt(published), outboxPartitionValues.messagePartition)
-            .map(id -> idGenerator.genId(id).asString());
+            .map(id -> idGenerator.genIdAsString(id, outboxPartitionValues.outboxTableSuffix.suffix));
   }
 
   public String columnToJson(EventuateSchema eventuateSchema, String column) {
diff --git a/eventuate-common-spring-jdbc/src/main/java/io/eventuate/common/spring/jdbc/EventuateSpringJdbcStatementExecutor.java b/eventuate-common-spring-jdbc/src/main/java/io/eventuate/common/spring/jdbc/EventuateSpringJdbcStatementExecutor.java
index dfb1b1ba..8eac5851 100644
--- a/eventuate-common-spring-jdbc/src/main/java/io/eventuate/common/spring/jdbc/EventuateSpringJdbcStatementExecutor.java
+++ b/eventuate-common-spring-jdbc/src/main/java/io/eventuate/common/spring/jdbc/EventuateSpringJdbcStatementExecutor.java
@@ -3,6 +3,8 @@
 import io.eventuate.common.jdbc.EventuateDuplicateKeyException;
 import io.eventuate.common.jdbc.EventuateJdbcStatementExecutor;
 import io.eventuate.common.jdbc.EventuateRowMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.dao.DuplicateKeyException;
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.jdbc.support.GeneratedKeyHolder;
@@ -10,6 +12,7 @@
 
 import java.sql.PreparedStatement;
 import java.sql.Statement;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
@@ -20,9 +23,12 @@ public class EventuateSpringJdbcStatementExecutor implements EventuateJdbcStatem
   public EventuateSpringJdbcStatementExecutor(JdbcTemplate jdbcTemplate) {
     this.jdbcTemplate = jdbcTemplate;
   }
+    private Logger logger = LoggerFactory.getLogger(getClass());
 
   @Override
   public long insertAndReturnGeneratedId(String sql, String idColumn, Object... params) {
+    if (logger.isDebugEnabled())
+      logger.debug("insertAndReturnGeneratedId {} {} {}", sql, idColumn, Arrays.asList(params));
     try {
       KeyHolder holder = new GeneratedKeyHolder();
       jdbcTemplate.update(connection -> {