Skip to content

Commit

Permalink
SNOW-163247 remove flusher (#152)
Browse files Browse the repository at this point in the history
* Removed flusher

* Refactored connector SNOW-163247

* Modified pressure test and reduced calls to list stage
  • Loading branch information
sfc-gh-zli authored Jun 9, 2020
1 parent cb20c25 commit a9ab61f
Show file tree
Hide file tree
Showing 13 changed files with 255 additions and 301 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public void put(final Collection<SinkRecord> records)
LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:put {} records",
this.id, records.size()));
//log more info may impact performance
records.forEach(getSink()::insert);
getSink().insert(records);
}

/**
Expand All @@ -220,6 +220,7 @@ public Map<TopicPartition, OffsetAndMetadata> preCommit(
Map<TopicPartition, OffsetAndMetadata> offsets)
throws RetriableException
{
LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:preCommit", this.id));

if (sink == null || sink.isClosed())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ static String fileName(String appName, String table, int partition,
return fileName(filePrefix(appName, table, partition), start, end);
}

// Used for testing only
static String fileName(String appName, String table, int partition,
long start, long end, long time)
{
return filePrefix(appName, table, partition) + start + "_" + end + "_" + time + ".json.gz";
}

/**
* generate file name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,10 +640,15 @@ public void put(final String stageName, final String fileName,
InputStream input = new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));
try
{
sfconn.uploadStream(stageName,
FileNameUtils.getPrefixFromFileName(fileName), input,
FileNameUtils.removePrefixAndGZFromFileName(fileName), true);
} catch (SQLException e)
InternalUtils.backoffAndRetry(telemetry,
() ->
{
sfconn.uploadStream(stageName,
FileNameUtils.getPrefixFromFileName(fileName), input,
FileNameUtils.removePrefixAndGZFromFileName(fileName), true);
return true;
});
} catch (Exception e)
{
throw SnowflakeErrors.ERROR_2003.getException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ public void ingestFile(final String fileName)
@Override
public void ingestFiles(final Set<String> fileNames)
{
if (fileNames.isEmpty())
{
return;
}
try
{
InternalUtils.backoffAndRetry(telemetry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,19 @@ public interface SnowflakeSinkService
*/
void startTask(String tableName, String topic, int partition);

/**
* call pipe to insert a collections of JSON records
* will trigger time based flush
* @param records record content
*/
void insert(final Collection<SinkRecord> records);

/**
* call pipe to insert a JSON record
* will not trigger time based flush
* @param record record content
*/
void insert(SinkRecord record);
void insert(final SinkRecord record);

/**
* retrieve offset of last loaded record for given pipe name
Expand All @@ -36,6 +44,11 @@ public interface SnowflakeSinkService
*/
long getOffset(TopicPartition topicPartition);

/**
* used for testing only
*/
void callAllGetOffset();

/**
* terminate all tasks and close this service instance
*/
Expand Down
Loading

0 comments on commit a9ab61f

Please sign in to comment.