Skip to content

Commit

Permalink
Keep DropChannelResponse private
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-psaha committed Dec 14, 2023
1 parent 201deb0 commit 062a45d
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import net.snowflake.ingest.utils.Utils;

/** A class that is used to open/create a {@link SnowflakeStreamingIngestChannel} */
/** A class that is used to drop a {@link SnowflakeStreamingIngestChannel} */
public class DropChannelRequest {
// Name of the channel
private final String channelName;
Expand All @@ -20,13 +20,14 @@ public class DropChannelRequest {
// Name of the table that the channel belongs to
private final String tableName;

// Optional client sequencer to verify when dropping the channel.
private final Long clientSequencer;

public static DropChannelRequestBuilder builder(String channelName) {
return new DropChannelRequestBuilder(channelName);
}

/** A builder class to build a OpenChannelRequest */
/** A builder class to build a DropChannelRequest */
public static class DropChannelRequestBuilder {
private final String channelName;
private String dbName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public interface SnowflakeStreamingIngestClient extends AutoCloseable {
*
* @param request the drop channel request
*/
DropChannelResponse dropChannel(DropChannelRequest request);
void dropChannel(DropChannelRequest request);

/**
* Get the client name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ public SnowflakeStreamingIngestChannelInternal<?> openChannel(OpenChannelRequest
}

@Override
public DropChannelResponse dropChannel(DropChannelRequest request) {
public void dropChannel(DropChannelRequest request) {
if (isClosed) {
throw new SFException(ErrorCode.CLOSED_CLIENT);
}
Expand Down Expand Up @@ -404,7 +404,6 @@ public DropChannelResponse dropChannel(DropChannelRequest request) {
request.getFullyQualifiedTableName(),
request.getClientSequencer(),
getName());
return response;

} catch (IOException | IngestResponseException e) {
throw new SFException(e, ErrorCode.OPEN_CHANNEL_FAILURE, e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -849,15 +849,12 @@ public void testDropOnClose() throws Exception {

Assert.assertFalse(channel.isClosed());
DropChannelResponse dropChannelResponse = new DropChannelResponse();
Mockito.doReturn(dropChannelResponse).when(client).dropChannel(Mockito.any());
Mockito.doNothing().when(client).dropChannel(Mockito.any());
channel.close().get();
Assert.assertTrue(channel.isClosed());
Mockito.verify(client, Mockito.times(1))
.dropChannel(
argThat(
(DropChannelRequest req) ->
req.getChannelName().equals(channel.getName())
&& req.getClientSequencer().equals(channel.getChannelSequencer())));
argThat((DropChannelRequest req) -> req.getChannelName().equals(channel.getName()) && req.getClientSequencer().equals(channel.getChannelSequencer())));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,7 @@
package net.snowflake.ingest.streaming.internal;

import static java.time.ZoneOffset.UTC;
import static net.snowflake.ingest.utils.Constants.ACCOUNT_URL;
import static net.snowflake.ingest.utils.Constants.CHANNEL_STATUS_ENDPOINT;
import static net.snowflake.ingest.utils.Constants.MAX_STREAMING_INGEST_API_CHANNEL_RETRY;
import static net.snowflake.ingest.utils.Constants.PRIVATE_KEY;
import static net.snowflake.ingest.utils.Constants.REGISTER_BLOB_ENDPOINT;
import static net.snowflake.ingest.utils.Constants.RESPONSE_ERR_ENQUEUE_TABLE_CHUNK_QUEUE_FULL;
import static net.snowflake.ingest.utils.Constants.RESPONSE_SUCCESS;
import static net.snowflake.ingest.utils.Constants.ROLE;
import static net.snowflake.ingest.utils.Constants.USER;
import static net.snowflake.ingest.utils.Constants.*;
import static net.snowflake.ingest.utils.ParameterProvider.ENABLE_SNOWPIPE_STREAMING_METRICS;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -65,6 +57,7 @@
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

public class SnowflakeStreamingIngestClientTest {
Expand Down Expand Up @@ -412,8 +405,12 @@ public void testDropChannel() throws Exception {
.setTableName("table")
.setSchemaName("schema")
.build();
DropChannelResponse result = client.dropChannel(request);
Assert.assertEquals(response.getMessage(), result.getMessage());
client.dropChannel(request);
Mockito.verify(requestBuilder)
.generateStreamingIngestPostRequest(
ArgumentMatchers.contains("channel"),
ArgumentMatchers.refEq(DROP_CHANNEL_ENDPOINT),
ArgumentMatchers.refEq("drop channel"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ public void testDropChannel() throws Exception {
.generateStreamingIngestPostRequest(
ArgumentMatchers.contains("channel"),
ArgumentMatchers.refEq(DROP_CHANNEL_ENDPOINT),
ArgumentMatchers.refEq("drop_channel"));
ArgumentMatchers.refEq("drop channel"));
}

@Test
Expand Down

0 comments on commit 062a45d

Please sign in to comment.