Skip to content

Commit

Permalink
fix: STREAMP-10186: Fixing schema delete cascade for stream delete ev…
Browse files Browse the repository at this point in the history
…ent (#350)
  • Loading branch information
ojhagaurov authored Feb 7, 2024
1 parent 76d61ad commit 4617305
Show file tree
Hide file tree
Showing 12 changed files with 180 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import static com.expediagroup.streamplatform.streamregistry.graphql.StateHelper.maintainState;

import java.util.Optional;

import lombok.RequiredArgsConstructor;

import org.springframework.beans.factory.annotation.Value;
Expand Down Expand Up @@ -63,12 +65,12 @@ public ConsumerBinding upsert(ConsumerBindingKeyInput key, SpecificationInput sp

@Override
public Boolean delete(ConsumerBindingKeyInput key) {
if (checkExistEnabled) {
consumerBindingView.get(key.asConsumerBindingKey()).ifPresent(consumerBindingService::delete);
} else {
ConsumerBinding consumerBinding = new ConsumerBinding(key.asConsumerBindingKey(), StateHelper.specification(), StateHelper.status());
consumerBindingService.delete(consumerBinding);
}
Optional<ConsumerBinding> consumerBinding = consumerBindingView.get(key.asConsumerBindingKey());
consumerBinding.ifPresentOrElse(consumerBindingService::delete, () -> {
if (!checkExistEnabled) {
consumerBindingService.delete(new ConsumerBinding(key.asConsumerBindingKey(), StateHelper.specification(), StateHelper.status()));
}
});
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import static com.expediagroup.streamplatform.streamregistry.graphql.StateHelper.maintainState;

import java.util.Optional;

import lombok.RequiredArgsConstructor;

import org.springframework.beans.factory.annotation.Value;
Expand Down Expand Up @@ -63,12 +65,12 @@ public Consumer upsert(ConsumerKeyInput key, SpecificationInput specification) {

@Override
public Boolean delete(ConsumerKeyInput key) {
if (checkExistEnabled) {
consumerView.get(key.asConsumerKey()).ifPresent(consumerService::delete);
} else {
Consumer consumer = new Consumer(key.asConsumerKey(), StateHelper.specification(), StateHelper.status());
consumerService.delete(consumer);
}
Optional<Consumer> consumer = consumerView.get(key.asConsumerKey());
consumer.ifPresentOrElse(consumerService::delete, () -> {
if (!checkExistEnabled) {
consumerService.delete(new Consumer(key.asConsumerKey(), StateHelper.specification(), StateHelper.status()));
}
});
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import static com.expediagroup.streamplatform.streamregistry.graphql.StateHelper.maintainState;

import java.util.Optional;

import lombok.RequiredArgsConstructor;

import org.springframework.beans.factory.annotation.Value;
Expand Down Expand Up @@ -63,12 +65,12 @@ public ProducerBinding upsert(ProducerBindingKeyInput key, SpecificationInput sp

@Override
public Boolean delete(ProducerBindingKeyInput key) {
if (checkExistEnabled) {
producerBindingView.get(key.asProducerBindingKey()).ifPresent(producerBindingService::delete);
} else {
ProducerBinding producerBinding = new ProducerBinding(key.asProducerBindingKey(), StateHelper.specification(), StateHelper.status());
producerBindingService.delete(producerBinding);
}
Optional<ProducerBinding> producerBinding = producerBindingView.get(key.asProducerBindingKey());
producerBinding.ifPresentOrElse(producerBindingService::delete, () -> {
if (!checkExistEnabled) {
producerBindingService.delete(new ProducerBinding(key.asProducerBindingKey(), StateHelper.specification(), StateHelper.status()));
}
});
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import static com.expediagroup.streamplatform.streamregistry.graphql.StateHelper.maintainState;

import java.util.Optional;

import lombok.RequiredArgsConstructor;

import org.springframework.beans.factory.annotation.Value;
Expand Down Expand Up @@ -63,12 +65,12 @@ public Producer upsert(ProducerKeyInput key, SpecificationInput specification) {

@Override
public Boolean delete(ProducerKeyInput key) {
if (checkExistEnabled) {
producerView.get(key.asProducerKey()).ifPresent(producerService::delete);
} else {
Producer producer = new Producer(key.asProducerKey(), StateHelper.specification(), StateHelper.status());
producerService.delete(producer);
}
Optional<Producer> producer = producerView.get(key.asProducerKey());
producer.ifPresentOrElse(producerService::delete, () -> {
if (!checkExistEnabled) {
producerService.delete(new Producer(key.asProducerKey(), StateHelper.specification(), StateHelper.status()));
}
});
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import static com.expediagroup.streamplatform.streamregistry.graphql.StateHelper.maintainState;

import java.util.Optional;

import lombok.RequiredArgsConstructor;

import org.springframework.beans.factory.annotation.Value;
Expand Down Expand Up @@ -63,12 +65,12 @@ public StreamBinding upsert(StreamBindingKeyInput key, SpecificationInput specif

@Override
public Boolean delete(StreamBindingKeyInput key) {
if (checkExistEnabled) {
streamBindingView.get(key.asStreamBindingKey()).ifPresent(streamBindingService::delete);
} else {
StreamBinding streamBinding = new StreamBinding(key.asStreamBindingKey(), StateHelper.specification(), StateHelper.status());
streamBindingService.delete(streamBinding);
}
Optional<StreamBinding> streamBinding = streamBindingView.get(key.asStreamBindingKey());
streamBinding.ifPresentOrElse(streamBindingService::delete, () -> {
if (!checkExistEnabled) {
streamBindingService.delete(new StreamBinding(key.asStreamBindingKey(), StateHelper.specification(), StateHelper.status()));
}
});
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ public Stream upsert(StreamKeyInput key, SpecificationInput specification, Schem

@Override
public Boolean delete(StreamKeyInput key) {
if (checkExistEnabled) {
streamView.get(key.asStreamKey()).ifPresent(streamService::delete);
} else {
Stream stream = new Stream(key.asStreamKey(), StateHelper.schemaKey(), StateHelper.specification(), StateHelper.status());
streamService.delete(stream);
}
Optional<Stream> stream = streamView.get(key.asStreamKey());
stream.ifPresentOrElse(streamService::delete, () -> {
if (!checkExistEnabled) {
streamService.delete(new Stream(key.asStreamKey(), StateHelper.schemaKey(), StateHelper.specification(), StateHelper.status()));
}
});
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,31 +56,47 @@ public void before() throws Exception {
public void deleteWithCheckExistEnabledWhenEntityExists() {
ReflectionTestUtils.setField(consumerBindingMutation, "checkExistEnabled", true);
ConsumerBindingKeyInput key = getConsumerBindingInputKey();
when(consumerBindingView.get(any())).thenReturn(Optional.of(getConsumer(key)));
Optional<ConsumerBinding> consumerBinding = Optional.of(getConsumer(key));
when(consumerBindingView.get(any())).thenReturn(consumerBinding);
Boolean result = consumerBindingMutation.delete(key);
verify(consumerBindingService, times(1)).delete(any());
verify(consumerBindingView, times(1)).get(any());
verify(consumerBindingView, times(1)).get(key.asConsumerBindingKey());
verify(consumerBindingService, times(1)).delete(consumerBinding.get());
assertTrue(result);
}

@Test
public void deleteWithCheckExistEnabledWhenEntityDoesNotExist() {
ReflectionTestUtils.setField(consumerBindingMutation, "checkExistEnabled", true);
ConsumerBindingKeyInput key = getConsumerBindingInputKey();
Optional<ConsumerBinding> consumerBinding = Optional.of(getConsumer(key));
when(consumerBindingView.get(any())).thenReturn(Optional.empty());
Boolean result = consumerBindingMutation.delete(key);
verify(consumerBindingView, times(1)).get(key.asConsumerBindingKey());
verify(consumerBindingService, times(0)).delete(any());
verify(consumerBindingView, times(1)).get(any());
assertTrue(result);
}

@Test
public void deleteWithCheckExistDisabled() {
public void deleteWithCheckExistDisabledWhenEntityExists() {
ReflectionTestUtils.setField(consumerBindingMutation, "checkExistEnabled", false);
ConsumerBindingKeyInput key = getConsumerBindingInputKey();
Optional<ConsumerBinding> consumerBinding = Optional.of(getConsumer(key));
when(consumerBindingView.get(any())).thenReturn(consumerBinding);
Boolean result = consumerBindingMutation.delete(key);
verify(consumerBindingView, times(1)).get(key.asConsumerBindingKey());
verify(consumerBindingService, times(1)).delete(consumerBinding.get());
assertTrue(result);
}

@Test
public void deleteWithCheckExistDisabledWhenEntityDoesNotExist() {
ReflectionTestUtils.setField(consumerBindingMutation, "checkExistEnabled", false);
ConsumerBindingKeyInput key = getConsumerBindingInputKey();
Optional<ConsumerBinding> consumerBinding = Optional.of(getConsumer(key));
when(consumerBindingView.get(any())).thenReturn(Optional.empty());
Boolean result = consumerBindingMutation.delete(key);
verify(consumerBindingService, times(1)).delete(getConsumer(key));
verify(consumerBindingView, times(0)).get(any());
verify(consumerBindingView, times(1)).get(key.asConsumerBindingKey());
verify(consumerBindingService, times(1)).delete(any());
assertTrue(result);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ public void deleteWithCheckExistEnabledWhenEntityExists() {
ConsumerKeyInput key = getConsumerInputKey();
when(consumerView.get(any())).thenReturn(Optional.of(getConsumer(key)));
Boolean result = consumerMutation.delete(key);
verify(consumerService, times(1)).delete(any());
verify(consumerView, times(1)).get(any());
verify(consumerView, times(1)).get(key.asConsumerKey());
verify(consumerService, times(1)).delete(getConsumer(key));
assertTrue(result);
}

Expand All @@ -69,18 +69,30 @@ public void deleteWithCheckExistEnabledWhenEntityDoesNotExist() {
ConsumerKeyInput key = getConsumerInputKey();
when(consumerView.get(any())).thenReturn(Optional.empty());
Boolean result = consumerMutation.delete(key);
verify(consumerView, times(1)).get(key.asConsumerKey());
verify(consumerService, times(0)).delete(any());
verify(consumerView, times(1)).get(any());
assertTrue(result);
}

@Test
public void deleteWithCheckExistDisabled() {
public void deleteWithCheckExistDisabledWhenEntityExists() {
ReflectionTestUtils.setField(consumerMutation, "checkExistEnabled", false);
ConsumerKeyInput key = getConsumerInputKey();
when(consumerView.get(any())).thenReturn(Optional.of(getConsumer(key)));
Boolean result = consumerMutation.delete(key);
verify(consumerView, times(1)).get(key.asConsumerKey());
verify(consumerService, times(1)).delete(getConsumer(key));
assertTrue(result);
}

@Test
public void deleteWithCheckExistDisabledWhenEntityDoesNotExist() {
ReflectionTestUtils.setField(consumerMutation, "checkExistEnabled", false);
ConsumerKeyInput key = getConsumerInputKey();
when(consumerView.get(any())).thenReturn(Optional.of(getConsumer(key)));
Boolean result = consumerMutation.delete(key);
verify(consumerView, times(1)).get(key.asConsumerKey());
verify(consumerService, times(1)).delete(getConsumer(key));
verify(consumerView, times(0)).get(any());
assertTrue(result);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,72 +30,84 @@
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.test.util.ReflectionTestUtils;

import com.expediagroup.streamplatform.streamregistry.core.services.ConsumerBindingService;
import com.expediagroup.streamplatform.streamregistry.core.views.ConsumerBindingView;
import com.expediagroup.streamplatform.streamregistry.core.services.ProducerBindingService;
import com.expediagroup.streamplatform.streamregistry.core.views.ProducerBindingView;
import com.expediagroup.streamplatform.streamregistry.graphql.StateHelper;
import com.expediagroup.streamplatform.streamregistry.graphql.model.inputs.ConsumerBindingKeyInput;
import com.expediagroup.streamplatform.streamregistry.model.ConsumerBinding;
import com.expediagroup.streamplatform.streamregistry.graphql.model.inputs.ProducerBindingKeyInput;
import com.expediagroup.streamplatform.streamregistry.model.ProducerBinding;

@RunWith(MockitoJUnitRunner.class)
public class ProducerBindingMutationImplTest {

@Mock
private ConsumerBindingService consumerBindingService;
private ProducerBindingService producerBindingService;

@Mock
private ConsumerBindingView consumerBindingView;
private ProducerBindingView producerBindingView;

private ConsumerBindingMutationImpl consumerBindingMutation;
private ProducerBindingMutationImpl producerBindingMutation;

@Before
public void before() throws Exception {
consumerBindingMutation = new ConsumerBindingMutationImpl(consumerBindingService, consumerBindingView);
producerBindingMutation = new ProducerBindingMutationImpl(producerBindingService, producerBindingView);
}

@Test
public void deleteWithCheckExistEnabledWhenEntityExists() {
ReflectionTestUtils.setField(consumerBindingMutation, "checkExistEnabled", true);
ConsumerBindingKeyInput key = getConsumerBindingInputKey();
when(consumerBindingView.get(any())).thenReturn(Optional.of(getConsumer(key)));
Boolean result = consumerBindingMutation.delete(key);
verify(consumerBindingService, times(1)).delete(any());
verify(consumerBindingView, times(1)).get(any());
ReflectionTestUtils.setField(producerBindingMutation, "checkExistEnabled", true);
ProducerBindingKeyInput key = getproducerBindingInputKey();
when(producerBindingView.get(any())).thenReturn(Optional.of(getProducer(key)));
Boolean result = producerBindingMutation.delete(key);
verify(producerBindingView, times(1)).get(key.asProducerBindingKey());
verify(producerBindingService, times(1)).delete(any());
assertTrue(result);
}

@Test
public void deleteWithCheckExistEnabledWhenEntityDoesNotExist() {
ReflectionTestUtils.setField(consumerBindingMutation, "checkExistEnabled", true);
ConsumerBindingKeyInput key = getConsumerBindingInputKey();
when(consumerBindingView.get(any())).thenReturn(Optional.empty());
Boolean result = consumerBindingMutation.delete(key);
verify(consumerBindingService, times(0)).delete(any());
verify(consumerBindingView, times(1)).get(any());
ReflectionTestUtils.setField(producerBindingMutation, "checkExistEnabled", true);
ProducerBindingKeyInput key = getproducerBindingInputKey();
when(producerBindingView.get(any())).thenReturn(Optional.empty());
Boolean result = producerBindingMutation.delete(key);
verify(producerBindingView, times(1)).get(key.asProducerBindingKey());
verify(producerBindingService, times(0)).delete(any());
assertTrue(result);
}

@Test
public void deleteWithCheckExistDisabled() {
ReflectionTestUtils.setField(consumerBindingMutation, "checkExistEnabled", false);
ConsumerBindingKeyInput key = getConsumerBindingInputKey();
Boolean result = consumerBindingMutation.delete(key);
verify(consumerBindingService, times(1)).delete(getConsumer(key));
verify(consumerBindingView, times(0)).get(any());
public void deleteWithCheckExistDisabledWhenEntityExists() {
ReflectionTestUtils.setField(producerBindingMutation, "checkExistEnabled", false);
ProducerBindingKeyInput key = getproducerBindingInputKey();
when(producerBindingView.get(any())).thenReturn(Optional.of(getProducer(key)));
Boolean result = producerBindingMutation.delete(key);
verify(producerBindingView, times(1)).get(key.asProducerBindingKey());
verify(producerBindingService, times(1)).delete(getProducer(key));
assertTrue(result);
}

private ConsumerBindingKeyInput getConsumerBindingInputKey() {
return ConsumerBindingKeyInput.builder()
@Test
public void deleteWithCheckExistDisabledWhenEntityDoesNotExist() {
ReflectionTestUtils.setField(producerBindingMutation, "checkExistEnabled", false);
ProducerBindingKeyInput key = getproducerBindingInputKey();
when(producerBindingView.get(any())).thenReturn(Optional.empty());
Boolean result = producerBindingMutation.delete(key);
verify(producerBindingView, times(1)).get(key.asProducerBindingKey());
verify(producerBindingService, times(1)).delete(getProducer(key));
assertTrue(result);
}

private ProducerBindingKeyInput getproducerBindingInputKey() {
return ProducerBindingKeyInput.builder()
.streamDomain("domain")
.streamName("stream")
.streamVersion(1)
.infrastructureZone("zone")
.infrastructureName("infrastructure")
.consumerName("consumer")
.producerName("producer")
.build();
}

private ConsumerBinding getConsumer(ConsumerBindingKeyInput key) {
return new ConsumerBinding(key.asConsumerBindingKey(), StateHelper.specification(), StateHelper.status());
private ProducerBinding getProducer(ProducerBindingKeyInput key) {
return new ProducerBinding(key.asProducerBindingKey(), StateHelper.specification(), StateHelper.status());
}
}
Loading

0 comments on commit 4617305

Please sign in to comment.