Code refactor for Iceberg support #792
* Add parameters & disable interleaving mode
… & configure response (#773)
Co-authored-by: Hitesh Madan <[email protected]>
 * @param <T> The type of chunk data
 * @param <TLocation> the type of location that's being managed (internal stage / external volume)
 */
interface StorageManager<T, TLocation> {
nit: IStorageManager, since it's an interface? (Similarly for all other interfaces?)
I don't think this is something we've followed consistently everywhere else, but check with Toby on whether this is a convention we want to start.
yeah, IStorageManager is better
@@ -228,8 +225,14 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea
    this.setupMetricsForClient();
  }
Please create a JIRA to track passing this snowflakeServiceClient all the way into OAuthClient, which also does RPC with the snowflake service.
Jira created.
}
DropChannelRequestInternal dropChannelRequest =
    new DropChannelRequestInternal(
        this.storageManager.getClientPrefix() + "_" + counter.getAndIncrement(),
nit: should getNextRequestId kind of behavior be in the storage manager perhaps? Can be a future PR, just curious what you think @sfc-gh-tzhang ?
Makes sense to me, then counter can be moved to storageManager as well
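A minimal sketch of the idea discussed in this thread, assuming a hypothetical `getNextRequestId()` method on the storage manager. Only `getClientPrefix()` and the `prefix + "_" + counter` format come from the diff above; the class name `RequestIdSketch` and everything else are illustrative and not the SDK's actual API.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: a storage manager that owns the request-id counter. */
class RequestIdSketch {
  private final String clientPrefix;

  // Moved here from the client so that every caller shares one increasing counter.
  private final AtomicInteger counter = new AtomicInteger(0);

  RequestIdSketch(String clientPrefix) {
    this.clientPrefix = clientPrefix;
  }

  String getClientPrefix() {
    return clientPrefix;
  }

  /** Hypothetical method; the name is taken from the review comment. */
  String getNextRequestId() {
    // Same format as the call site in the diff: <clientPrefix>_<counter>.
    return clientPrefix + "_" + counter.getAndIncrement();
  }
}
```

With this shape, call sites would ask the storage manager for an id instead of concatenating the prefix and the counter themselves.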
@@ -161,10 +157,6 @@ public String getOffsetToken() {
    return this.offsetToken;
  }
- Please undo these changes. You're changing the public API surface of a released SDK.
- @sfc-gh-tzhang do we have CI tools to catch changes to the public API surface? If not, let's add a backlog item to catch accidental API surface changes before a release.
Undo changes.
@@ -26,7 +26,7 @@ public enum ErrorCode {
  INVALID_ENCRYPTED_KEY("0018"),
  INVALID_DATA_IN_CHUNK("0019"),
  IO_ERROR("0020"),
It looks like the error code actually gets exposed to customers via SFException.getErrorCode().
Thus this is a breaking change; please don't rename an error code.
@sfc-gh-tzhang please confirm?
yeah, I tagged you in another thread, please don't modify existing error codes, just add new ones, see #794 (comment)
Reverted change.
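To illustrate the rule from this thread (add new error codes, never rename existing ones), here is a hedged sketch. The three existing constants and their codes come from the diff above; the enum name `ErrorCodeSketch`, the added constant `EXAMPLE_NEW_ERROR`, its code `"9999"`, and the `getMessageCode()` accessor are hypothetical placeholders, not names from the SDK.

```java
/** Hypothetical sketch of an append-only error code enum. */
enum ErrorCodeSketch {
  // Existing constants: names and codes stay untouched because customers may depend on them.
  INVALID_ENCRYPTED_KEY("0018"),
  INVALID_DATA_IN_CHUNK("0019"),
  IO_ERROR("0020"),

  // Hypothetical new constant: appended with a fresh code instead of renaming an old one.
  EXAMPLE_NEW_ERROR("9999");

  private final String messageCode;

  ErrorCodeSketch(String messageCode) {
    this.messageCode = messageCode;
  }

  /** Hypothetical accessor for the numeric code string. */
  String getMessageCode() {
    return messageCode;
  }
}
```

Code that looks constants up by name, for example through `valueOf("IO_ERROR")`, keeps working because no existing name changes.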
Force-pushed from a3148c0 to 28dea75
Left some comments, otherwise LGTM, thanks!
 * The StreamingIngestRequest interface is a marker interface used for type safety in the {@link
 * SnowflakeServiceClient} for streaming ingest API request.
 */
interface StreamingIngestRequest {
IStreamingIngestRequest?
The Iceberg support PR on the feature branch includes some refactoring. This PR merges that refactored code now to reduce conflicts when the feature branch is merged later.
This PR includes the following changes:
* Refactor StreamingIngestStage into StorageManager and StreamingIngestStorage. The StorageManager is responsible for managing storage. This is useful for streaming to Iceberg, where a single StorageManager could manage multiple storages.
* Add SnowflakeServiceClient to handle all API calls to the server. As we are going to add some endpoints for Iceberg table streaming, having a single place for API calls makes development easier.
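As a rough illustration of the single API caller combined with a marker interface for type safety, consider the sketch below. All names here (`StreamingIngestRequestSketch`, `DropChannelRequestSketch`, `ServiceClientSketch`, `send`, and the endpoint string) are hypothetical stand-ins; the actual SnowflakeServiceClient methods are not shown in this conversation.

```java
import java.util.ArrayList;
import java.util.List;

/** Marker interface: declares no methods and exists only to constrain request types. */
interface StreamingIngestRequestSketch {}

/** Hypothetical request type that opts in by implementing the marker. */
final class DropChannelRequestSketch implements StreamingIngestRequestSketch {
  final String requestId;

  DropChannelRequestSketch(String requestId) {
    this.requestId = requestId;
  }
}

/** Hypothetical single entry point for all calls to the server. */
class ServiceClientSketch {
  // Records the endpoints called, standing in for real network traffic.
  final List<String> sentEndpoints = new ArrayList<>();

  /** Only types implementing the marker interface compile as the request argument. */
  <R extends StreamingIngestRequestSketch> void send(String endpoint, R request) {
    // A real client would serialize the request and issue an HTTP call here.
    sentEndpoints.add(endpoint);
  }
}
```

Passing an arbitrary object such as a `String` as the request would fail at compile time, which is the type-safety benefit the marker interface provides. New Iceberg endpoints would then only need a new request type and a call through the same client.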