Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test-logs (#17) #68

Draft
wants to merge 1 commit into
base: 2.4.x
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,10 @@ public void startPartition(final String tableName, final TopicPartition topicPar
pipeName,
conn,
topicPartition.partition(),
cleanerServiceExecutor));
cleanerServiceExecutor,
nameIndex
)
);
}
}

Expand Down Expand Up @@ -376,6 +379,7 @@ private class ServiceContext {
private final String pipeName;
private final SnowflakeConnectionService conn;
private final SnowflakeIngestionService ingestionService;
private final String nameIndex;
private List<String> fileNames;

// Includes a list of files:
Expand Down Expand Up @@ -421,7 +425,10 @@ private ServiceContext(
String pipeName,
SnowflakeConnectionService conn,
int partition,
ScheduledExecutorService v2CleanerExecutor) {
ScheduledExecutorService v2CleanerExecutor,
String nameIndex
) {
this.nameIndex = nameIndex;
this.pipeName = pipeName;
this.tableName = tableName;
this.stageName = stageName;
Expand Down Expand Up @@ -470,7 +477,9 @@ private ServiceContext(
ingestionService,
pipeStatus,
telemetryService,
v2CleanerExecutor);
v2CleanerExecutor,
nameIndex
);
this.stageFileProcessorClient = processor.trackFilesAsync();
this.cleanerExecutor = null;
this.reprocessCleanerExecutor = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class StageFilesProcessor {
// can
// save us from hitting "too many requests - 429 status code"
private static final long CLEANUP_PERIOD_SECONDS = 61;
private final String nameIndex;

/**
* Client interface for the StageFileProcessor - allows thread safe registration of new files and
Expand Down Expand Up @@ -103,7 +104,9 @@ public StageFilesProcessor(
SnowflakeIngestionService ingestionService,
SnowflakeTelemetryPipeStatus pipeTelemetry,
SnowflakeTelemetryService telemetryService,
ScheduledExecutorService schedulingExecutor) {
ScheduledExecutorService schedulingExecutor,
String nameIndex
) {
this(
pipeName,
tableName,
Expand All @@ -114,7 +117,9 @@ public StageFilesProcessor(
pipeTelemetry,
telemetryService,
schedulingExecutor,
System::currentTimeMillis);
System::currentTimeMillis,
nameIndex
);
}

@VisibleForTesting
Expand All @@ -128,7 +133,9 @@ public StageFilesProcessor(
SnowflakeTelemetryPipeStatus pipeTelemetry,
SnowflakeTelemetryService telemetryService,
ScheduledExecutorService schedulingExecutor,
TimeSupplier currentTimeSupplier) {
TimeSupplier currentTimeSupplier,
String nameIndex
) {
this.pipeName = pipeName;
this.tableName = tableName;
this.stageName = stageName;
Expand All @@ -140,6 +147,7 @@ public StageFilesProcessor(
this.pipeTelemetry = pipeTelemetry;
this.schedulingExecutor = schedulingExecutor;
this.filters = new FilteringPredicates(currentTimeSupplier, prefix);
this.nameIndex = nameIndex;
}

/**
Expand Down Expand Up @@ -172,7 +180,7 @@ public ProgressRegister trackFilesAsync() {
*/
@VisibleForTesting
void trackFiles(ProgressRegisterImpl register, PipeProgressRegistryTelemetry progressTelemetry) {
LOGGER.info("Starting file cleaner for pipe {} ...", pipeName);
LOGGER.info("Starting file cleaner for pipe {} ...{}", pipeName, nameIndex);

AtomicBoolean shouldFetchInitialStageFiles = new AtomicBoolean(true);
AtomicBoolean isFirstRun = new AtomicBoolean(true);
Expand All @@ -197,16 +205,17 @@ void trackFiles(ProgressRegisterImpl register, PipeProgressRegistryTelemetry pro
// initialize state based on the remote stage state (do it on first call or after
// error)
if (shouldFetchInitialStageFiles.getAndSet(false)) {
LOGGER.info("Getting in first time nameIndex {}", nameIndex);
initializeCleanStartState(ctx, isFirstRun.get());
isFirstRun.set(false);
}

LOGGER.debug(
"cleanup cycle {} for pipe {} with {} files and history with {} entries",
"cleanup cycle {} for pipe {} with {} files and history with {} entries {}",
ctx.cleanupCycle.incrementAndGet(),
pipeName,
ctx.files.size(),
ctx.ingestHistory.size());
ctx.ingestHistory.size(), nameIndex);

// process the files, store the spillover ones for the next cycle. in case of an
// error - we
Expand All @@ -217,11 +226,11 @@ void trackFiles(ProgressRegisterImpl register, PipeProgressRegistryTelemetry pro
} catch (Exception e) {
progressTelemetry.reportKafkaConnectFatalError(e.getMessage());
LOGGER.warn(
"Cleaner encountered an exception {} in cycle {}:\n{}\n{}",
"Cleaner encountered an exception {} in cycle {}:\n{}\n{}, {}",
e.getClass(),
ctx.cleanupCycle.get(),
e.getMessage(),
e.getStackTrace());
e.getStackTrace(), nameIndex);

shouldFetchInitialStageFiles.set(true);
hadError.set(true);
Expand Down Expand Up @@ -257,13 +266,18 @@ private void initializeCleanStartState(ProcessorContext ctx, boolean firstRun) {
// state
ctx.ingestHistory.clear();
ctx.historyMarker.set(null);
LOGGER.debug("for pipe {} found {} file(s) on remote stage", pipeName, remoteStageFiles.size());
LOGGER.debug("for pipe {} found {} file(s) on remote stage {}", pipeName, remoteStageFiles.size(), nameIndex);
}

private void nextCheck(ProcessorContext ctx, ProgressRegisterImpl register, boolean hadErrors) {

// make first categorization - split files into these with start offset higher than current
FileCategorizer fileCategories = FileCategorizer.build(ctx.files, register.offset.get());
LOGGER.info("DirtyFiles : {}, Staging Files : {}, {}, {}",
String.join(", ", fileCategories.dirtyFiles),
String.join(", ", fileCategories.stageFiles.keySet()),
nameIndex, pipeName, register.offset.get()
);

if (hadErrors) {
ctx.progressTelemetry.updateStatsAfterError(
Expand Down Expand Up @@ -300,10 +314,10 @@ private void nextCheck(ProcessorContext ctx, ProgressRegisterImpl register, bool
fileCategories.query(filters.trackableFilesPredicate).collect(Collectors.toList());
cleanOldHistory(ctx);
LOGGER.debug(
"keep {} files and {} history entries for next cycle for pipe {}",
"keep {} files and {} history entries for next cycle for pipe {}, {}",
filesToTrack.size(),
ctx.ingestHistory.size(),
pipeName);
pipeName, nameIndex);
ctx.files.clear();
ctx.files.addAll(filesToTrack);
ctx.files.addAll(fileCategories.dirtyFiles);
Expand Down Expand Up @@ -346,10 +360,10 @@ private void checkAndRefreshStaleFiles(FileCategorizer fileCategorizer, Processo
// new or modified
// readOneHourHistory method though)
LOGGER.debug(
"Checking stale file history for pipe: {}, staleFileCount: {}, staleFiles:{}",
"Checking stale file history for pipe: {}, staleFileCount: {}, staleFiles:{}, {}",
pipeName,
staleFiles.size(),
String.join(", ", staleFiles));
String.join(", ", staleFiles), nameIndex);

Map<String, InternalUtils.IngestedFileStatus> report =
ingestionService.readOneHourHistory(staleFiles, historyWindow);
Expand Down Expand Up @@ -377,10 +391,10 @@ private void purgeLoadedFiles(

if (!loadedFiles.isEmpty()) {
LOGGER.debug(
"Purging loaded files for pipe: {}, loadedFileCount: {}, loadedFiles:{}",
"Purging loaded files for pipe: {}, loadedFileCount: {}, loadedFiles:{}, {}",
pipeName,
loadedFiles.size(),
String.join(", ", loadedFiles));
String.join(", ", loadedFiles), nameIndex);
conn.purgeStage(stageName, loadedFiles);
stopTrackingFiles(loadedFiles, fileCategorizer, ctx);

Expand All @@ -397,10 +411,10 @@ private void moveFailedFiles(
fileCategorizer.query(filters.failedFilesPredicate).collect(Collectors.toList());
if (!failedFiles.isEmpty()) {
LOGGER.debug(
"Moving failed files for pipe:{} to tableStage failedFileCount:{}, failedFiles:{}",
"Moving failed files for pipe:{} to tableStage failedFileCount:{}, failedFiles:{}, {}",
pipeName,
failedFiles.size(),
String.join(", ", failedFiles));
String.join(", ", failedFiles), nameIndex);
conn.moveToTableStage(tableName, stageName, failedFiles);
stopTrackingFiles(failedFiles, fileCategorizer, ctx);
onMoveFiles.accept(failedFiles.size());
Expand Down Expand Up @@ -440,18 +454,18 @@ private void purgeDirtyFiles(Set<String> files) {
try {
LOGGER.info(
"Purging files already present on the stage for pipe{} before start."
+ " reprocessFileSize:{}, files: {}",
+ " reprocessFileSize:{}, files: {}, {}",
pipeName,
files.size(),
String.join(", ", files));
String.join(", ", files), nameIndex);
conn.purgeStage(stageName, new ArrayList<>(files));
files.clear();
} catch (Exception e) {
LOGGER.error(
"Reprocess cleaner encountered an exception {}:\n{}\n{}",
"Reprocess cleaner encountered an exception {}:\n{}\n{}, {}",
e.getClass(),
e.getMessage(),
e.getStackTrace());
e.getStackTrace(), nameIndex);
}
}

Expand Down Expand Up @@ -540,13 +554,13 @@ public ProgressRegisterImpl(StageFilesProcessor filesProcessor) {

@Override
public void registerNewStageFile(String fileName) {
  // Queue a freshly written stage file so the cleaner thread starts tracking it.
  // FIX: owner is dereferenced without a null check here, while the sibling
  // transferFilesToContext guards against owner.get() == null — guard the same
  // way so a collected/cleared owner cannot NPE inside a debug log call.
  StageFilesProcessor processor = owner.get();
  LOGGER.debug(
      "Start tracking new file {}, {}",
      fileName,
      processor == null ? "n/a" : processor.nameIndex);
  files.add(fileName);
}

@Override
public void newOffset(long offset) {
  // Record the most recent offset; the cleanup cycle reads it when
  // categorizing stage files against the current progress.
  // FIX: guard owner.get() against null (same pattern as
  // transferFilesToContext) — a cleared weak owner must not NPE a trace log.
  StageFilesProcessor processor = owner.get();
  LOGGER.trace(
      "New offset: {}, {}",
      offset,
      processor == null ? "n/a" : processor.nameIndex);
  this.offset.set(offset);
}

Expand All @@ -564,9 +578,9 @@ private void transferFilesToContext(ProcessorContext ctx) {
files.drainTo(freshFiles);
StageFilesProcessor processor = owner.get();
LOGGER.debug(
"collected {} files for processing for pipe {}",
"collected {} files for processing for pipe {}, files {}, {}",
freshFiles.size(),
processor == null ? "n/a" : processor.pipeName);
processor == null ? "n/a" : processor.pipeName, owner.get().nameIndex);
ctx.files.addAll(freshFiles);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ private void createFileProcessor(int ticks) {
pipeTelemetry,
telemetryService,
createTestScheduler(ticks, currentTime, nextTickCallback, scheduledFuture),
currentTime::get);
currentTime::get,
"nameIndex"
);
register = new StageFilesProcessor.ProgressRegisterImpl(victim);
}

Expand Down