Skip to content

Commit

Permalink
NIFI-12330: When synchronizing flow, pause component scheduling until…
Browse files Browse the repository at this point in the history
… after property migration has completed (apache#7994)
  • Loading branch information
markap14 authored Nov 7, 2023
1 parent 0fe9d2b commit 8b0abd4
Showing 1 changed file with 29 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -235,31 +235,40 @@ public void synchronize(final ProcessGroup group, final VersionedExternalFlow ve
}
}

context.getFlowManager().withParameterContextResolution(() -> {
try {
final Map<String, ParameterProviderReference> parameterProviderReferences = versionedExternalFlow.getParameterProviders() == null
// Pause component scheduling until after all properties have been migrated. This will ensure that we are able to migrate them
// before enabling any Controller Services or starting any properties.
context.getComponentScheduler().pause();
try {
context.getFlowManager().withParameterContextResolution(() -> {
try {
final Map<String, ParameterProviderReference> parameterProviderReferences = versionedExternalFlow.getParameterProviders() == null
? new HashMap<>() : versionedExternalFlow.getParameterProviders();
final ProcessGroup topLevelGroup = syncOptions.getTopLevelGroupId() != null ? context.getFlowManager().getGroup(syncOptions.getTopLevelGroupId()) : group;
synchronize(group, versionedExternalFlow.getFlowContents(), versionedExternalFlow.getParameterContexts(), parameterProviderReferences, topLevelGroup, syncOptions.isUpdateSettings());
} catch (final ProcessorInstantiationException pie) {
throw new RuntimeException(pie);
}
});
final ProcessGroup topLevelGroup = syncOptions.getTopLevelGroupId() != null ? context.getFlowManager().getGroup(syncOptions.getTopLevelGroupId()) : group;
synchronize(group, versionedExternalFlow.getFlowContents(), versionedExternalFlow.getParameterContexts(),
parameterProviderReferences, topLevelGroup, syncOptions.isUpdateSettings());
} catch (final ProcessorInstantiationException pie) {
throw new RuntimeException(pie);
}
});

for (final CreatedExtension createdExtension : createdExtensions) {
final ComponentNode extension = createdExtension.extension();
final Map<String, String> originalPropertyValues = createdExtension.propertyValues();
for (final CreatedExtension createdExtension : createdExtensions) {
final ComponentNode extension = createdExtension.extension();
final Map<String, String> originalPropertyValues = createdExtension.propertyValues();

final ControllerServiceFactory serviceFactory = new StandardControllerServiceFactory(context.getExtensionManager(), context.getFlowManager(),
context.getControllerServiceProvider(), extension);
final ControllerServiceFactory serviceFactory = new StandardControllerServiceFactory(context.getExtensionManager(), context.getFlowManager(),
context.getControllerServiceProvider(), extension);

if (extension instanceof final ProcessorNode processor) {
processor.migrateConfiguration(originalPropertyValues, serviceFactory);
} else if (extension instanceof final ControllerServiceNode service) {
service.migrateConfiguration(originalPropertyValues, serviceFactory);
} else if (extension instanceof final ReportingTaskNode task) {
task.migrateConfiguration(originalPropertyValues, serviceFactory);
if (extension instanceof final ProcessorNode processor) {
processor.migrateConfiguration(originalPropertyValues, serviceFactory);
} else if (extension instanceof final ControllerServiceNode service) {
service.migrateConfiguration(originalPropertyValues, serviceFactory);
} else if (extension instanceof final ReportingTaskNode task) {
task.migrateConfiguration(originalPropertyValues, serviceFactory);
}
}
} finally {
// Resume component scheduler, now that properties have been migrated, so that any components that are intended to be scheduled are.
context.getComponentScheduler().resume();
}

group.onComponentModified();
Expand Down

0 comments on commit 8b0abd4

Please sign in to comment.