WIP finish PscWriterITCase
jeffxiang committed Sep 26, 2024
1 parent a2cfd2e commit bd41598
Showing 4 changed files with 51 additions and 41 deletions.
File 1 of 4: PscWriter
@@ -97,6 +97,7 @@ class PscWriter<IN>
private final SinkWriterMetricGroup metricGroup;
private final boolean disabledMetrics;
private final Counter numRecordsOutCounter;
+ private final Counter numRecordsSendCounter;
private final Counter numBytesOutCounter;
private final Counter numRecordsOutErrorsCounter;
private final ProcessingTimeService timeService;
@@ -158,7 +159,8 @@ class PscWriter<IN>
this.timeService = sinkInitContext.getProcessingTimeService();
this.metricGroup = sinkInitContext.metricGroup();
this.numBytesOutCounter = metricGroup.getIOMetricGroup().getNumBytesOutCounter();
- this.numRecordsOutCounter = metricGroup.getNumRecordsSendCounter();
+ this.numRecordsOutCounter = metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+ this.numRecordsSendCounter = metricGroup.getNumRecordsSendCounter();
this.numRecordsOutErrorsCounter = metricGroup.getNumRecordsOutErrorsCounter();
this.pscSinkContext =
new DefaultPscSinkContext(
@@ -204,6 +206,7 @@ public void write(IN element, Context context) throws IOException {
throw new RuntimeException(e);
}
numRecordsOutCounter.inc();
+ numRecordsSendCounter.inc();
}

@Override
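Note on the change above: numRecordsOutCounter had been wired to metricGroup.getNumRecordsSendCounter(), aliasing the task-level numRecordsOut metric to the sink-level numRecordsSend metric. The fix gives each counter its own source and increments both per record. A minimal sketch of the distinction, using only the SinkWriterMetricGroup accessors already visible in the diff:

    // numRecordsOut: generic operator I/O metric, shared with the task scope.
    Counter numRecordsOut = metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
    // numRecordsSend: sink-scoped metric counting records handed to the producer.
    Counter numRecordsSend = metricGroup.getNumRecordsSendCounter();

    // write() increments both, so the two names report the same count for this
    // writer while remaining independently queryable.
    numRecordsOut.inc();
    numRecordsSend.inc();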
File 2 of 4: PscTransactionLog tests
@@ -85,7 +85,7 @@ public void testGetTransactions() throws ConfigurationException, ConsumerException
lingeringTransaction(4);

final PscTransactionLog transactionLog =
- new PscTransactionLog(PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER_URI.getTopicUriAsString(), getKafkaClientConfiguration());
+ new PscTransactionLog(PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER_URI.getTopicUriAsString(), getPscClientConfiguration());
final List<PscTransactionLog.TransactionRecord> transactions = transactionLog.getTransactions();
assertThat(
transactions,
@@ -162,7 +162,7 @@ private static String buildTransactionalId(long id) {
}

private static PscProducer<byte[], Integer> createProducer(String transactionalId) throws ConfigurationException, ProducerException {
- final Properties producerProperties = getKafkaClientConfiguration();
+ final Properties producerProperties = getPscClientConfiguration();
producerProperties.put(
PscConfiguration.PSC_PRODUCER_KEY_SERIALIZER, ByteArraySerializer.class.getName());
producerProperties.put(
Expand All @@ -172,7 +172,7 @@ private static PscProducer<byte[], Integer> createProducer(String transactionalI
return new PscProducer<>(PscConfigurationUtils.propertiesToPscConfiguration(producerProperties));
}

- private static Properties getKafkaClientConfiguration() {
+ private static Properties getPscClientConfiguration() {
final Properties standardProps = new Properties();
// standardProps.put("bootstrap.servers", KAFKA_CONTAINER.getBootstrapServers());
standardProps.put(PscConfiguration.PSC_CONSUMER_GROUP_ID, "flink-tests");
File 3 of 4: PscWriterITCase
@@ -17,7 +17,10 @@

package com.pinterest.flink.connector.psc.sink;

+ import com.pinterest.flink.connector.psc.PscFlinkConfiguration;
+ import com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSub;
import com.pinterest.psc.common.MessageId;
+ import com.pinterest.psc.config.PscConfiguration;
import com.pinterest.psc.consumer.PscConsumerMessage;
import com.pinterest.psc.exception.ClientException;
import com.pinterest.psc.exception.startup.ConfigurationException;
@@ -73,6 +76,7 @@
import java.util.function.Consumer;
import java.util.stream.IntStream;

+ import static com.pinterest.flink.connector.psc.sink.testutils.PscTestUtils.injectDiscoveryConfigs;
import static com.pinterest.flink.connector.psc.testutils.PscUtil.createKafkaContainer;
import static com.pinterest.flink.connector.psc.testutils.PscUtil.drainAllRecordsFromTopic;
import static org.apache.flink.util.DockerImageVersions.KAFKA;
Expand All @@ -85,9 +89,10 @@ public class PscWriterITCase {
private static final Logger LOG = LoggerFactory.getLogger(PscWriterITCase.class);
private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
private static final Network NETWORK = Network.newNetwork();
- private static final String KAFKA_METRIC_WITH_GROUP_NAME = "KafkaProducer.incoming-byte-total";
+ private static final String PSC_METRIC_WITH_GROUP_NAME = "PscProducer.incoming-byte-total";
private static final SinkWriter.Context SINK_WRITER_CONTEXT = new DummySinkWriterContext();
- private String topic;
+ private String topicUriStr;

private MetricListener metricListener;
private TriggerTimeService timeService;
Expand All @@ -113,14 +118,16 @@ public void setUp(TestInfo testInfo) {
metricListener = new MetricListener();
timeService = new TriggerTimeService();
topic = testInfo.getDisplayName().replaceAll("\\W", "");
+ topicUriStr = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + topic;
}

@ParameterizedTest
@EnumSource(DeliveryGuarantee.class)
public void testRegisterMetrics(DeliveryGuarantee guarantee) throws Exception {
try (final PscWriter<Integer> ignored =
- createWriterWithConfiguration(getKafkaClientConfiguration(), guarantee)) {
- assertThat(metricListener.getGauge(KAFKA_METRIC_WITH_GROUP_NAME).isPresent()).isTrue();
+ createWriterWithConfiguration(getPscClientConfiguration(), guarantee)) {
+ ignored.write(1, SINK_WRITER_CONTEXT); // write one record to trigger backendProducer creation
+ assertThat(metricListener.getGauge(PSC_METRIC_WITH_GROUP_NAME).isPresent()).isTrue();
}
}
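The added write() call makes an assumption explicit in its comment: the writer appears to create its backend producer lazily, so producer-scoped gauges such as PscProducer.incoming-byte-total are registered only after the first record is dispatched. A hypothetical illustration of that lazy-init pattern; the class and field names are illustrative, not PSC's actual internals:

    import java.util.function.Supplier;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Hypothetical sketch: the backend producer (and its metrics) does not
    // exist until the first send.
    final class LazyProducerHolder<K, V> {
        private Producer<K, V> backend; // stays null until the first send
        private final Supplier<Producer<K, V>> factory;

        LazyProducerHolder(Supplier<Producer<K, V>> factory) {
            this.factory = factory;
        }

        void send(ProducerRecord<K, V> record) {
            if (backend == null) {
                backend = factory.get(); // producer metrics register only here
            }
            backend.send(record);
        }
    }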

Expand All @@ -140,7 +147,7 @@ public void testIncreasingRecordBasedCounters() throws Exception {
metricListener.getMetricGroup(), operatorIOMetricGroup);
try (final PscWriter<Integer> writer =
createWriterWithConfiguration(
- getKafkaClientConfiguration(), DeliveryGuarantee.NONE, metricGroup)) {
+ getPscClientConfiguration(), DeliveryGuarantee.NONE, metricGroup)) {
final Counter numBytesOut = metricGroup.getIOMetricGroup().getNumBytesOutCounter();
final Counter numRecordsOut = metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter();
Expand All @@ -165,13 +172,9 @@ public void testCurrentSendTimeMetric() throws Exception {
InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup());
try (final PscWriter<Integer> writer =
createWriterWithConfiguration(
- getKafkaClientConfiguration(),
+ getPscClientConfiguration(),
DeliveryGuarantee.AT_LEAST_ONCE,
metricGroup)) {
- final Optional<Gauge<Long>> currentSendTime =
- metricListener.getGauge("currentSendTime");
- assertThat(currentSendTime.isPresent()).isTrue();
- assertThat(currentSendTime.get().getValue()).isEqualTo(0L);
IntStream.range(0, 100)
.forEach(
(run) -> {
Expand All @@ -185,13 +188,17 @@ public void testCurrentSendTimeMetric() throws Exception {
throw new RuntimeException("Failed writing Kafka record.");
}
});
+ Thread.sleep(500L);
+ final Optional<Gauge<Long>> currentSendTime =
+ metricListener.getGauge("currentSendTime");
+ assertThat(currentSendTime.isPresent()).isTrue();
+ assertThat(currentSendTime.get().getValue()).isGreaterThan(0L);
}
}
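The reordering in this test is deliberate: currentSendTime only carries a meaningful measurement after at least one record has been sent, so the gauge is now read after the 100 writes (with a 500 ms sleep to let the asynchronous sends complete) and asserted to be strictly positive, instead of being asserted to equal zero up front.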

@Test
void testNumRecordsOutErrorsCounterMetric() throws Exception {
- Properties properties = getKafkaClientConfiguration();
+ Properties properties = getPscClientConfiguration();
final InternalSinkWriterMetricGroup metricGroup =
InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup());

Expand All @@ -210,8 +217,8 @@ void testNumRecordsOutErrorsCounterMetric() throws Exception {
new FlinkPscInternalProducer<>(properties, transactionalId)) {

producer.initTransactions();
- // producer.beginTransaction();
- producer.send(new PscProducerMessage<>(topic, "2".getBytes()));
+ producer.beginTransaction();
+ producer.send(new PscProducerMessage<>(topicUriStr, "2".getBytes()));
producer.commitTransaction();
}

Expand All @@ -227,14 +234,14 @@ public void testMetadataPublisher() throws Exception {
List<String> metadataList = new ArrayList<>();
try (final PscWriter<Integer> writer =
createWriterWithConfiguration(
- getKafkaClientConfiguration(),
+ getPscClientConfiguration(),
DeliveryGuarantee.AT_LEAST_ONCE,
InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()),
- meta -> metadataList.add(meta.toString()))) {
+ meta -> metadataList.add(meta.getTopicUriPartition().getTopicUriAsString() + "-" + meta.getTopicUriPartition().getPartition() + "@" + meta.getOffset()))) {
List<String> expected = new ArrayList<>();
for (int i = 0; i < 100; i++) {
writer.write(1, SINK_WRITER_CONTEXT);
expected.add("testMetadataPublisher-0@" + i);
expected.add(topicUriStr + "-0@" + i);
}
writer.flush(false);
assertThat(metadataList).usingRecursiveComparison().isEqualTo(expected);
Expand All @@ -246,7 +253,7 @@ public void testMetadataPublisher() throws Exception {
void testLingeringTransaction() throws Exception {
final PscWriter<Integer> failedWriter =
createWriterWithConfiguration(
- getKafkaClientConfiguration(), DeliveryGuarantee.EXACTLY_ONCE);
+ getPscClientConfiguration(), DeliveryGuarantee.EXACTLY_ONCE);

// create two lingering transactions
failedWriter.flush(false);
Expand All @@ -258,7 +265,7 @@ void testLingeringTransaction() throws Exception {

try (final PscWriter<Integer> recoveredWriter =
createWriterWithConfiguration(
- getKafkaClientConfiguration(), DeliveryGuarantee.EXACTLY_ONCE)) {
+ getPscClientConfiguration(), DeliveryGuarantee.EXACTLY_ONCE)) {
recoveredWriter.write(1, SINK_WRITER_CONTEXT);

recoveredWriter.flush(false);
Expand All @@ -271,7 +278,7 @@ void testLingeringTransaction() throws Exception {
committable.getProducer().get().getObject().commitTransaction();

List<PscConsumerMessage<byte[], byte[]>> records =
- drainAllRecordsFromTopic(topic, getKafkaClientConfiguration(), true);
+ drainAllRecordsFromTopic(topicUriStr, getPscClientConfiguration(), true);
assertThat(records).hasSize(1);
}

Expand All @@ -286,7 +293,7 @@ void testLingeringTransaction() throws Exception {
mode = EnumSource.Mode.EXCLUDE)
void useSameProducerForNonTransactional(DeliveryGuarantee guarantee) throws Exception {
try (final PscWriter<Integer> writer =
- createWriterWithConfiguration(getKafkaClientConfiguration(), guarantee)) {
+ createWriterWithConfiguration(getPscClientConfiguration(), guarantee)) {
assertThat(writer.getProducerPool()).hasSize(0);

FlinkPscInternalProducer<byte[], byte[]> firstProducer = writer.getCurrentProducer();
Expand All @@ -307,7 +314,7 @@ void useSameProducerForNonTransactional(DeliveryGuarantee guarantee) throws Exce
void usePoolForTransactional() throws Exception {
try (final PscWriter<Integer> writer =
createWriterWithConfiguration(
- getKafkaClientConfiguration(), DeliveryGuarantee.EXACTLY_ONCE)) {
+ getPscClientConfiguration(), DeliveryGuarantee.EXACTLY_ONCE)) {
assertThat(writer.getProducerPool()).hasSize(0);

writer.flush(false);
@@ -348,11 +355,11 @@ void usePoolForTransactional() throws Exception {
*/
@Test
void testAbortOnClose() throws Exception {
- Properties properties = getKafkaClientConfiguration();
+ Properties properties = getPscClientConfiguration();
try (final PscWriter<Integer> writer =
createWriterWithConfiguration(properties, DeliveryGuarantee.EXACTLY_ONCE)) {
writer.write(1, SINK_WRITER_CONTEXT);
- assertThat(drainAllRecordsFromTopic(topic, properties, true)).hasSize(0);
+ assertThat(drainAllRecordsFromTopic(topicUriStr, properties, true)).hasSize(0);
}

try (final PscWriter<Integer> writer =
Expand All @@ -372,18 +379,18 @@ void testAbortOnClose() throws Exception {
producer.commitTransaction();
}

- assertThat(drainAllRecordsFromTopic(topic, properties, true)).hasSize(1);
+ assertThat(drainAllRecordsFromTopic(topicUriStr, properties, true)).hasSize(1);
}
}

private void assertKafkaMetricNotPresent(
DeliveryGuarantee guarantee, String configKey, String configValue) throws Exception {
- final Properties config = getKafkaClientConfiguration();
+ final Properties config = getPscClientConfiguration();
config.put(configKey, configValue);
try (final PscWriter<Integer> ignored =
createWriterWithConfiguration(config, guarantee)) {
Assertions.assertFalse(
- metricListener.getGauge(KAFKA_METRIC_WITH_GROUP_NAME).isPresent());
+ metricListener.getGauge(PSC_METRIC_WITH_GROUP_NAME).isPresent());
}
}

@@ -421,14 +428,17 @@ private PscWriter<Integer> createWriterWithConfiguration(
}
}

- private static Properties getKafkaClientConfiguration() {
+ private static Properties getPscClientConfiguration() {
final Properties standardProps = new Properties();
standardProps.put("bootstrap.servers", KAFKA_CONTAINER.getBootstrapServers());
standardProps.put("group.id", "kafkaWriter-tests");
standardProps.put("enable.auto.commit", false);
standardProps.put("key.serializer", ByteArraySerializer.class.getName());
standardProps.put("value.serializer", ByteArraySerializer.class.getName());
standardProps.put("auto.offset.reset", "earliest");
standardProps.put(PscConfiguration.PSC_CONSUMER_GROUP_ID, "pscWriter-tests");
standardProps.put(PscConfiguration.PSC_CONSUMER_COMMIT_AUTO_ENABLED, false);
standardProps.put(PscConfiguration.PSC_PRODUCER_KEY_SERIALIZER, ByteArraySerializer.class.getName());
standardProps.put(PscConfiguration.PSC_PRODUCER_VALUE_SERIALIZER, ByteArraySerializer.class.getName());
standardProps.put(PscConfiguration.PSC_PRODUCER_CLIENT_ID, "pscWriter-tests");
standardProps.put(PscConfiguration.PSC_AUTO_RESOLUTION_ENABLED, false);
standardProps.put(PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET, PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET_EARLIEST);
standardProps.put(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);
injectDiscoveryConfigs(standardProps, KAFKA_CONTAINER.getBootstrapServers(), PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);
return standardProps;
}
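For orientation, the rewritten helper swaps raw Kafka client keys for PSC-native ones roughly one-for-one, with bootstrap.servers subsumed by the injected discovery configs plus a cluster URI. The mapping below is inferred from this diff rather than from PSC documentation, and the usage sketch reuses only calls that appear elsewhere in the commit:

    // Inferred key mapping (Kafka client -> PSC):
    //   group.id           -> PscConfiguration.PSC_CONSUMER_GROUP_ID
    //   enable.auto.commit -> PscConfiguration.PSC_CONSUMER_COMMIT_AUTO_ENABLED
    //   key.serializer     -> PscConfiguration.PSC_PRODUCER_KEY_SERIALIZER
    //   value.serializer   -> PscConfiguration.PSC_PRODUCER_VALUE_SERIALIZER
    //   auto.offset.reset  -> PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET
    //   bootstrap.servers  -> injectDiscoveryConfigs(...) + CLUSTER_URI_CONFIG
    // Usage sketch; checked exceptions omitted for brevity.
    Properties props = getPscClientConfiguration();
    PscProducer<byte[], byte[]> producer =
            new PscProducer<>(PscConfigurationUtils.propertiesToPscConfiguration(props));
    producer.send(new PscProducerMessage<>(topicUriStr, "1".getBytes()));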

@@ -498,7 +508,7 @@ private class DummyRecordSerializer implements PscRecordSerializationSchema<Integer> {
@Override
public PscProducerMessage<byte[], byte[]> serialize(
Integer element, PscSinkContext context, Long timestamp) {
- return new PscProducerMessage<>(topic, ByteBuffer.allocate(4).putInt(element).array());
+ return new PscProducerMessage<>(topicUriStr, ByteBuffer.allocate(4).putInt(element).array());
}
}

File 4 of 4: PSC producer metrics()
@@ -780,10 +780,7 @@ private void handleNullTransactionManager() throws ProducerException {
public Map<MetricName, ? extends Metric> metrics() throws ProducerException {
if (kafkaProducer == null)
handleUninitializedKafkaProducer("metrics()");

- if (metricValueProvider.getMetrics().isEmpty()) {
- reportProducerMetrics();
- }
+ reportProducerMetrics();
return metricValueProvider.getMetrics();
}
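The simplification above changes metrics() from report-once (the old guard ran reportProducerMetrics() only while the cached map was empty) to report-on-every-call, so callers no longer see a stale first snapshot. A hedged sketch of the observable difference; only metrics() itself is taken from the diff:

    // Before: after the first non-empty snapshot, subsequent calls could return
    // the cached values. After: every call re-runs reportProducerMetrics() first.
    Map<MetricName, ? extends Metric> before = producer.metrics();
    // ... send more records ...
    Map<MetricName, ? extends Metric> after = producer.metrics(); // refreshed values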
