Skip to content

Commit

Permalink
Batched ingest files (#175)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-zli authored Jun 22, 2020
1 parent f3bdd10 commit 81f3554
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ public interface SnowflakeIngestionService
/**
* Ingest a list of files
*
* @param fileNames file name set
* @param fileNames file name List
*/
void ingestFiles(Set<String> fileNames);
void ingestFiles(List<String> fileNames);

/**
* @return corresponding stage name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,7 @@
import net.snowflake.ingest.utils.StagedFileWrapper;

import java.security.PrivateKey;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;

import static com.snowflake.kafka.connector.internal.InternalUtils.convertIngestStatus;
import static com.snowflake.kafka.connector.internal.InternalUtils.timestampToDate;
Expand Down Expand Up @@ -71,7 +67,7 @@ public void ingestFile(final String fileName)
}

@Override
public void ingestFiles(final Set<String> fileNames)
public void ingestFiles(final List<String> fileNames)
{
if (fileNames.isEmpty())
{
Expand All @@ -80,7 +76,20 @@ public void ingestFiles(final Set<String> fileNames)
try
{
InternalUtils.backoffAndRetry(telemetry,
() -> ingestManager.ingestFiles(SimpleIngestManager.wrapFilepaths(fileNames), null)
() ->
{
while (fileNames.size() > 0)
{
// Can not send more than 5000 files in one request,
// so batch 4000 as one request
int toIndex = Math.min(4000, fileNames.size());
List<String> fileNamesBatch = fileNames.subList(0, toIndex);
Set<String> fileNamesSet = new HashSet<>(fileNamesBatch);
ingestManager.ingestFiles(SimpleIngestManager.wrapFilepaths(fileNamesSet), null);
fileNamesBatch.clear();
}
return true;
}
);
} catch (Exception e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ private long getOffset()
return committedOffset.get();
}

Set<String> fileNamesCopy = new HashSet<>();
List<String> fileNamesCopy = new ArrayList<>();
fileListLock.lock();
try
{
Expand Down

0 comments on commit 81f3554

Please sign in to comment.