From 580adecb93b251acdfaf83febc4afadc1d46bddb Mon Sep 17 00:00:00 2001 From: Maciej Cichanowicz <30436981+Elmacioro@users.noreply.github.com> Date: Tue, 19 Nov 2024 11:08:13 +0100 Subject: [PATCH] Fix deployments for scenarios with dict editors after model reload (#7123) --- .../engine/api/parameter/ParameterName.scala | 11 +++ .../AdditionalUIConfigProvider.scala | 3 + .../component/DesignerWideComponentId.scala | 6 +- .../DeploymentManagerDependencies.scala | 2 + ...lComponentConfigsForRuntimeExtractor.scala | 32 ++++++ ...ponentConfigsForRuntimeExtractorTest.scala | 99 +++++++++++++++++++ ...cessingTypeDeployedScenariosProvider.scala | 5 +- .../deployment/DeploymentService.scala | 22 ++++- .../newdeployment/DeploymentService.scala | 59 ++++++++--- .../server/AkkaHttpBasedRouteProvider.scala | 16 ++- .../test/base/it/NuResourcesTest.scala | 5 +- .../test/utils/domain/TestFactory.scala | 6 +- .../NotificationServiceTest.scala | 11 ++- .../deployment/DeploymentServiceSpec.scala | 3 +- .../newdeployment/DeploymentServiceTest.scala | 3 +- docs/Changelog.md | 1 + .../FlinkProcessCompilerDataFactory.scala | 7 +- ...ubbedFlinkProcessCompilerDataFactory.scala | 6 +- .../TestFlinkProcessCompilerDataFactory.scala | 3 +- ...ationFlinkProcessCompilerDataFactory.scala | 3 +- .../process/runner/FlinkProcessMain.scala | 4 +- .../engine/process/runner/FlinkTestMain.scala | 13 ++- .../process/runner/FlinkTestMainSpec.scala | 73 +++++++++++++- .../periodic/PeriodicDeploymentManager.scala | 3 +- .../periodic/PeriodicProcessService.scala | 17 +++- .../PeriodicDeploymentManagerTest.scala | 3 +- ...eriodicProcessServiceIntegrationTest.scala | 3 +- .../periodic/PeriodicProcessServiceTest.scala | 3 +- .../management/FlinkRestManagerSpec.scala | 11 ++- .../embedded/EmbeddedDeploymentManager.scala | 13 +++ .../RunnableScenarioInterpreterFactory.scala | 4 +- .../deployment/AdditionalModelConfigs.scala | 15 +++ .../engine/deployment/DeploymentData.scala | 14 ++- .../touk/nussknacker/engine/ModelData.scala | 27 +++-- ...ompilerDataFactoryWithTestComponents.scala | 7 +- 35 files changed, 447 insertions(+), 66 deletions(-) create mode 100644 designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/util/AdditionalComponentConfigsForRuntimeExtractor.scala create mode 100644 designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/util/AdditionalComponentConfigsForRuntimeExtractorTest.scala create mode 100644 extensions-api/src/main/scala/pl/touk/nussknacker/engine/deployment/AdditionalModelConfigs.scala diff --git a/common-api/src/main/scala/pl/touk/nussknacker/engine/api/parameter/ParameterName.scala b/common-api/src/main/scala/pl/touk/nussknacker/engine/api/parameter/ParameterName.scala index 72076f2fb67..532d9453763 100644 --- a/common-api/src/main/scala/pl/touk/nussknacker/engine/api/parameter/ParameterName.scala +++ b/common-api/src/main/scala/pl/touk/nussknacker/engine/api/parameter/ParameterName.scala @@ -1,5 +1,16 @@ package pl.touk.nussknacker.engine.api.parameter +import io.circe.generic.extras.semiauto.{deriveUnwrappedDecoder, deriveUnwrappedEncoder} +import io.circe.{Decoder, Encoder, KeyDecoder, KeyEncoder} + final case class ParameterName(value: String) { def withBranchId(branchId: String): ParameterName = ParameterName(s"$value for branch $branchId") } + +object ParameterName { + implicit val encoder: Encoder[ParameterName] = deriveUnwrappedEncoder + implicit val decoder: Decoder[ParameterName] = deriveUnwrappedDecoder + + implicit val keyEncoder: KeyEncoder[ParameterName] = KeyEncoder.encodeKeyString.contramap(_.value) + implicit val keyDecoder: KeyDecoder[ParameterName] = KeyDecoder.decodeKeyString.map(ParameterName(_)) +} diff --git a/components-api/src/main/scala/pl/touk/nussknacker/engine/api/component/AdditionalUIConfigProvider.scala b/components-api/src/main/scala/pl/touk/nussknacker/engine/api/component/AdditionalUIConfigProvider.scala index 7be6b67a99f..6e38258c60f 100644 --- a/components-api/src/main/scala/pl/touk/nussknacker/engine/api/component/AdditionalUIConfigProvider.scala +++ b/components-api/src/main/scala/pl/touk/nussknacker/engine/api/component/AdditionalUIConfigProvider.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.api.component +import io.circe.generic.JsonCodec import pl.touk.nussknacker.engine.api.definition.FixedExpressionValue import pl.touk.nussknacker.engine.api.parameter.{ ParameterName, @@ -24,6 +25,7 @@ object AdditionalUIConfigProvider { val empty = new DefaultAdditionalUIConfigProvider(Map.empty, Map.empty) } +@JsonCodec case class ComponentAdditionalConfig( parameterConfigs: Map[ParameterName, ParameterAdditionalUIConfig], icon: Option[String] = None, @@ -32,6 +34,7 @@ case class ComponentAdditionalConfig( disabled: Boolean = false ) +@JsonCodec case class ParameterAdditionalUIConfig( required: Boolean, initialValue: Option[FixedExpressionValue], diff --git a/components-api/src/main/scala/pl/touk/nussknacker/engine/api/component/DesignerWideComponentId.scala b/components-api/src/main/scala/pl/touk/nussknacker/engine/api/component/DesignerWideComponentId.scala index d6a35fdd628..99b0ddcbec4 100644 --- a/components-api/src/main/scala/pl/touk/nussknacker/engine/api/component/DesignerWideComponentId.scala +++ b/components-api/src/main/scala/pl/touk/nussknacker/engine/api/component/DesignerWideComponentId.scala @@ -1,7 +1,7 @@ package pl.touk.nussknacker.engine.api.component import io.circe.generic.extras.semiauto.{deriveUnwrappedDecoder, deriveUnwrappedEncoder} -import io.circe.{Decoder, Encoder} +import io.circe.{Decoder, Encoder, KeyDecoder, KeyEncoder} // TODO This class is used as a work around for the problem that the components are duplicated across processing types. // We plan to get rid of this. After that, we could replace usages of this class by usage of ComponentId @@ -14,6 +14,10 @@ object DesignerWideComponentId { implicit val encoder: Encoder[DesignerWideComponentId] = deriveUnwrappedEncoder implicit val decoder: Decoder[DesignerWideComponentId] = deriveUnwrappedDecoder + implicit val keyEncoder: KeyEncoder[DesignerWideComponentId] = KeyEncoder.encodeKeyString.contramap(_.value) + implicit val keyDecoder: KeyDecoder[DesignerWideComponentId] = + KeyDecoder.decodeKeyString.map(DesignerWideComponentId(_)) + def apply(value: String): DesignerWideComponentId = new DesignerWideComponentId(value.toLowerCase) def forBuiltInComponent(componentId: ComponentId): DesignerWideComponentId = { diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/DeploymentManagerDependencies.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/DeploymentManagerDependencies.scala index 27f402a0384..bf8fed6e669 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/DeploymentManagerDependencies.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/DeploymentManagerDependencies.scala @@ -6,6 +6,7 @@ import pl.touk.nussknacker.engine.api.deployment.{ ProcessingTypeDeployedScenariosProvider, ScenarioActivityManager } +import pl.touk.nussknacker.engine.api.component.{ComponentAdditionalConfig, DesignerWideComponentId} import sttp.client3.SttpBackend import scala.concurrent.{ExecutionContext, Future} @@ -17,6 +18,7 @@ case class DeploymentManagerDependencies( executionContext: ExecutionContext, actorSystem: ActorSystem, sttpBackend: SttpBackend[Future, Any], + configsFromProvider: Map[DesignerWideComponentId, ComponentAdditionalConfig] = Map.empty ) { implicit def implicitExecutionContext: ExecutionContext = executionContext implicit def implicitActorSystem: ActorSystem = actorSystem diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/util/AdditionalComponentConfigsForRuntimeExtractor.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/util/AdditionalComponentConfigsForRuntimeExtractor.scala new file mode 100644 index 00000000000..6490977cf9a --- /dev/null +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/util/AdditionalComponentConfigsForRuntimeExtractor.scala @@ -0,0 +1,32 @@ +package pl.touk.nussknacker.engine.util + +import pl.touk.nussknacker.engine.api.component.{ComponentAdditionalConfig, DesignerWideComponentId} +import pl.touk.nussknacker.engine.api.parameter.ValueInputWithDictEditor + +object AdditionalComponentConfigsForRuntimeExtractor { + + // This is done to reduce data sent to Flink + def getRequiredAdditionalConfigsForRuntime( + additionalComponentConfigs: Map[DesignerWideComponentId, ComponentAdditionalConfig] + ): Map[DesignerWideComponentId, ComponentAdditionalConfig] = { + getAdditionalConfigsWithDictParametersEditors(additionalComponentConfigs) + } + + // This function filters additional configs provided by AdditionalUIConfigProvider + // to include only component and parameter configs with Dictionary editors. + private def getAdditionalConfigsWithDictParametersEditors( + additionalComponentConfigs: Map[DesignerWideComponentId, ComponentAdditionalConfig] + ): Map[DesignerWideComponentId, ComponentAdditionalConfig] = additionalComponentConfigs + .map { case (componentId, componentAdditionalConfig) => + val parametersWithDictEditors = componentAdditionalConfig.parameterConfigs.filter { + case (_, additionalUiConfig) => + additionalUiConfig.valueEditor match { + case Some(_: ValueInputWithDictEditor) => true + case _ => false + } + } + componentId -> componentAdditionalConfig.copy(parameterConfigs = parametersWithDictEditors) + } + .filter(_._2.parameterConfigs.nonEmpty) + +} diff --git a/designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/util/AdditionalComponentConfigsForRuntimeExtractorTest.scala b/designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/util/AdditionalComponentConfigsForRuntimeExtractorTest.scala new file mode 100644 index 00000000000..568c72d6304 --- /dev/null +++ b/designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/util/AdditionalComponentConfigsForRuntimeExtractorTest.scala @@ -0,0 +1,99 @@ +package pl.touk.nussknacker.engine.util + +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers +import pl.touk.nussknacker.engine.api.component.{ + ComponentAdditionalConfig, + ComponentGroupName, + DesignerWideComponentId, + ParameterAdditionalUIConfig +} +import pl.touk.nussknacker.engine.api.definition.FixedExpressionValue +import pl.touk.nussknacker.engine.api.parameter.{ParameterName, ValueInputWithDictEditor} +import pl.touk.nussknacker.engine.util.AdditionalComponentConfigsForRuntimeExtractorTest.{ + componentConfigWithDictionaryEditorInParameter, + componentConfigWithOnlyDictEditorParameters, + componentConfigWithoutDictionaryEditorInParameter +} + +class AdditionalComponentConfigsForRuntimeExtractorTest extends AnyFunSuite with Matchers { + + test("should filter only components and parameters with dictionary editors") { + val additionalConfig = Map( + DesignerWideComponentId("componentA") -> componentConfigWithDictionaryEditorInParameter, + DesignerWideComponentId("componentB") -> componentConfigWithoutDictionaryEditorInParameter, + ) + val filteredResult = + AdditionalComponentConfigsForRuntimeExtractor.getRequiredAdditionalConfigsForRuntime(additionalConfig) + + filteredResult shouldBe Map( + DesignerWideComponentId("componentA") -> componentConfigWithOnlyDictEditorParameters + ) + } + +} + +object AdditionalComponentConfigsForRuntimeExtractorTest { + + private val parameterAWithDictEditor = ( + ParameterName("parameterA"), + ParameterAdditionalUIConfig( + required = true, + initialValue = Some(FixedExpressionValue("'someInitialValueExpression'", "someInitialValueLabel")), + hintText = None, + valueEditor = Some(ValueInputWithDictEditor("someDictA", allowOtherValue = true)), + valueCompileTimeValidation = None + ) + ) + + private val parameterBWithDictEditor = ( + ParameterName("parameterB"), + ParameterAdditionalUIConfig( + required = false, + initialValue = None, + hintText = Some("someHint"), + valueEditor = Some(ValueInputWithDictEditor("someDictB", allowOtherValue = false)), + valueCompileTimeValidation = None + ) + ) + + private val parameterWithoutDictEditor = ( + ParameterName("parameterC"), + ParameterAdditionalUIConfig( + required = true, + initialValue = None, + hintText = None, + valueEditor = None, + valueCompileTimeValidation = None + ) + ) + + private val componentConfigWithDictionaryEditorInParameter = ComponentAdditionalConfig( + parameterConfigs = Map( + parameterAWithDictEditor, + parameterBWithDictEditor, + parameterWithoutDictEditor + ), + icon = Some("someIcon"), + docsUrl = Some("someDocUrl"), + componentGroup = Some(ComponentGroupName("Service")) + ) + + private val componentConfigWithoutDictionaryEditorInParameter = ComponentAdditionalConfig( + parameterConfigs = Map(parameterWithoutDictEditor), + icon = Some("someOtherIcon"), + docsUrl = Some("someOtherDocUrl"), + componentGroup = Some(ComponentGroupName("Service")) + ) + + private val componentConfigWithOnlyDictEditorParameters = ComponentAdditionalConfig( + parameterConfigs = Map( + parameterAWithDictEditor, + parameterBWithDictEditor + ), + icon = Some("someIcon"), + docsUrl = Some("someDocUrl"), + componentGroup = Some(ComponentGroupName("Service")) + ) + +} diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DefaultProcessingTypeDeployedScenariosProvider.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DefaultProcessingTypeDeployedScenariosProvider.scala index 838787de980..fd0a3063793 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DefaultProcessingTypeDeployedScenariosProvider.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DefaultProcessingTypeDeployedScenariosProvider.scala @@ -6,7 +6,7 @@ import pl.touk.nussknacker.engine.api.component.NodesDeploymentData import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.process._ import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess -import pl.touk.nussknacker.engine.deployment.{DeploymentData, DeploymentId, User} +import pl.touk.nussknacker.engine.deployment.{AdditionalModelConfigs, DeploymentData, DeploymentId, User} import pl.touk.nussknacker.ui.db.DbRef import pl.touk.nussknacker.ui.process.ScenarioQuery import pl.touk.nussknacker.ui.process.fragment.{DefaultFragmentRepository, FragmentResolver} @@ -51,7 +51,8 @@ class DefaultProcessingTypeDeployedScenariosProvider( DeploymentId.fromActionId(lastDeployAction.id), deployingUser, Map.empty, - NodesDeploymentData.empty + NodesDeploymentData.empty, + AdditionalModelConfigs.empty ) val deployedScenarioDataTry = scenarioResolver.resolveScenario(details.json).map { resolvedScenario => diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentService.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentService.scala index 5ae7f7bab7b..8a6c8a78fc4 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentService.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentService.scala @@ -8,7 +8,11 @@ import cats.syntax.functor._ import com.typesafe.scalalogging.LazyLogging import db.util.DBIOActionInstances._ import pl.touk.nussknacker.engine.api.Comment -import pl.touk.nussknacker.engine.api.component.NodesDeploymentData +import pl.touk.nussknacker.engine.api.component.{ + ComponentAdditionalConfig, + DesignerWideComponentId, + NodesDeploymentData +} import pl.touk.nussknacker.engine.api.deployment.ScenarioActionName.{Cancel, Deploy} import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus @@ -16,6 +20,7 @@ import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefin import pl.touk.nussknacker.engine.api.process._ import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment._ +import pl.touk.nussknacker.engine.util.AdditionalComponentConfigsForRuntimeExtractor import pl.touk.nussknacker.restmodel.scenariodetails.ScenarioWithDetails import pl.touk.nussknacker.ui.api.{DeploymentCommentSettings, ListenerApiUser} import pl.touk.nussknacker.ui.listener.ProcessChangeEvent.{OnActionExecutionFinished, OnActionFailed, OnActionSuccess} @@ -55,6 +60,10 @@ class DeploymentService( processChangeListener: ProcessChangeListener, scenarioStateTimeout: Option[FiniteDuration], deploymentCommentSettings: Option[DeploymentCommentSettings], + additionalComponentConfigs: ProcessingTypeDataProvider[ + Map[DesignerWideComponentId, ComponentAdditionalConfig], + _ + ], clock: Clock = Clock.systemUTC() )(implicit system: ActorSystem) extends ActionService @@ -324,7 +333,8 @@ class DeploymentService( DeploymentId.fromActionId(actionId), user.toManagerUser, additionalDeploymentData, - nodesDeploymentData + nodesDeploymentData, + getAdditionalModelConfigsRequiredForRuntime(processDetails.processingType) ) } yield DeployedScenarioData(processDetails.toEngineProcessVersion, deploymentData, resolvedCanonicalProcess) } @@ -408,6 +418,14 @@ class DeploymentService( ) } + private def getAdditionalModelConfigsRequiredForRuntime(processingType: ProcessingType)(implicit user: LoggedUser) = { + AdditionalModelConfigs( + AdditionalComponentConfigsForRuntimeExtractor.getRequiredAdditionalConfigsForRuntime( + additionalComponentConfigs.forProcessingType(processingType).getOrElse(Map.empty) + ) + ) + } + // TODO: check deployment id to be sure that returned status is for given deployment override def getProcessState( processIdWithName: ProcessIdWithName diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/newdeployment/DeploymentService.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/newdeployment/DeploymentService.scala index 55a48256f50..f30004cb49e 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/newdeployment/DeploymentService.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/newdeployment/DeploymentService.scala @@ -4,12 +4,21 @@ import cats.Applicative import cats.data.{EitherT, NonEmptyList} import com.typesafe.scalalogging.LazyLogging import db.util.DBIOActionInstances._ -import pl.touk.nussknacker.engine.api.component.NodesDeploymentData +import pl.touk.nussknacker.engine.api.component.{ + ComponentAdditionalConfig, + DesignerWideComponentId, + NodesDeploymentData +} import pl.touk.nussknacker.engine.api.deployment._ -import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName, VersionId} +import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName, ProcessingType, VersionId} import pl.touk.nussknacker.engine.api.{ProcessVersion => RuntimeVersionData} -import pl.touk.nussknacker.engine.deployment.{DeploymentData, DeploymentId => LegacyDeploymentId} +import pl.touk.nussknacker.engine.deployment.{ + AdditionalModelConfigs, + DeploymentData, + DeploymentId => LegacyDeploymentId +} import pl.touk.nussknacker.engine.newdeployment.DeploymentId +import pl.touk.nussknacker.engine.util.AdditionalComponentConfigsForRuntimeExtractor import pl.touk.nussknacker.restmodel.validation.ValidationResults.ValidationErrors import pl.touk.nussknacker.security.Permission import pl.touk.nussknacker.security.Permission.Permission @@ -19,6 +28,7 @@ import pl.touk.nussknacker.ui.process.deployment.DeploymentManagerDispatcher import pl.touk.nussknacker.ui.process.deployment.LoggedUserConversions.LoggedUserOps import pl.touk.nussknacker.ui.process.newdeployment.DeploymentEntityFactory.{DeploymentEntityData, WithModifiedAt} import pl.touk.nussknacker.ui.process.newdeployment.DeploymentService._ +import pl.touk.nussknacker.ui.process.processingtype.provider.ProcessingTypeDataProvider import pl.touk.nussknacker.ui.process.repository.{DBIOActionRunner, ScenarioMetadataRepository} import pl.touk.nussknacker.ui.process.version.ScenarioGraphVersionService import pl.touk.nussknacker.ui.security.api.LoggedUser @@ -42,7 +52,11 @@ class DeploymentService( deploymentRepository: DeploymentRepository, dmDispatcher: DeploymentManagerDispatcher, dbioRunner: DBIOActionRunner, - clock: Clock + clock: Clock, + additionalComponentConfigs: ProcessingTypeDataProvider[ + Map[DesignerWideComponentId, ComponentAdditionalConfig], + _ + ], )(implicit ec: ExecutionContext) extends LazyLogging { @@ -152,11 +166,11 @@ class DeploymentService( ): EitherT[Future, RunDeploymentError, Unit] = { val runtimeVersionData = processVersionFor(scenarioMetadata, scenarioGraphVersion) // TODO: It shouldn't be needed - val dumbDeploymentData = DeploymentData( + val dumbDeploymentData = createDeploymentData( LegacyDeploymentId(""), - user.toManagerUser, - Map.empty, - NodesDeploymentData.empty + user, + NodesDeploymentData.empty, + scenarioMetadata.processingType ) for { result <- EitherT[Future, RunDeploymentError, Unit]( @@ -185,11 +199,11 @@ class DeploymentService( command: RunDeploymentCommand ): EitherT[Future, RunDeploymentError, Unit] = { val runtimeVersionData = processVersionFor(scenarioMetadata, scenarioGraphVersion) - val deploymentData = DeploymentData( + val deploymentData = createDeploymentData( toLegacyDeploymentId(command.id), - command.user.toManagerUser, - additionalDeploymentData = Map.empty, - command.nodesDeploymentData + command.user, + command.nodesDeploymentData, + scenarioMetadata.processingType ) dmDispatcher .deploymentManagerUnsafe(scenarioMetadata.processingType)(command.user) @@ -211,6 +225,19 @@ class DeploymentService( EitherT.pure(()) } + private def createDeploymentData( + deploymentId: LegacyDeploymentId, + loggedUser: LoggedUser, + nodesData: NodesDeploymentData, + processingType: ProcessingType + ) = DeploymentData( + deploymentId = deploymentId, + user = loggedUser.toManagerUser, + additionalDeploymentData = Map.empty, + nodesData = nodesData, + additionalModelConfigs = getAdditionalModelConfigsRequiredForRuntime(processingType, loggedUser) + ) + private def handleFailureDuringDeploymentRequesting( deploymentId: DeploymentId, ex: Throwable @@ -256,6 +283,14 @@ class DeploymentService( ) } + private def getAdditionalModelConfigsRequiredForRuntime(processingType: ProcessingType, loggedUser: LoggedUser) = { + AdditionalModelConfigs( + AdditionalComponentConfigsForRuntimeExtractor.getRequiredAdditionalConfigsForRuntime( + additionalComponentConfigs.forProcessingType(processingType)(loggedUser).getOrElse(Map.empty) + ) + ) + } + } object DeploymentService { diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/server/AkkaHttpBasedRouteProvider.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/server/AkkaHttpBasedRouteProvider.scala index de65f02d053..822727dc6d4 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/server/AkkaHttpBasedRouteProvider.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/server/AkkaHttpBasedRouteProvider.scala @@ -232,6 +232,10 @@ class AkkaHttpBasedRouteProvider( futureProcessRepository ) + val additionalComponentConfigs = processingTypeDataProvider.mapValues { processingTypeData => + processingTypeData.designerModelData.modelData.additionalConfigsFromProvider + } + val legacyDeploymentService = new LegacyDeploymentService( dmDispatcher, processRepository, @@ -241,7 +245,8 @@ class AkkaHttpBasedRouteProvider( scenarioResolver, processChangeListener, featureTogglesConfig.scenarioStateTimeout, - featureTogglesConfig.deploymentCommentSettings + featureTogglesConfig.deploymentCommentSettings, + additionalComponentConfigs ) legacyDeploymentService.invalidateInProgressActions() @@ -438,7 +443,8 @@ class AkkaHttpBasedRouteProvider( deploymentRepository, dmDispatcher, dbioRunner, - Clock.systemDefaultZone() + Clock.systemDefaultZone(), + additionalComponentConfigs ) val activityService = new ActivityService( @@ -707,6 +713,7 @@ class AkkaHttpBasedRouteProvider( featureTogglesConfig.componentDefinitionExtractionMode ), getDeploymentManagerDependencies( + additionalUIConfigProvider, actionServiceProvider, scenarioActivityRepository, dbioActionRunner, @@ -722,12 +729,14 @@ class AkkaHttpBasedRouteProvider( } private def getDeploymentManagerDependencies( + additionalUIConfigProvider: AdditionalUIConfigProvider, actionServiceProvider: Supplier[ActionService], scenarioActivityRepository: ScenarioActivityRepository, dbioActionRunner: DBIOActionRunner, sttpBackend: SttpBackend[Future, Any], processingType: ProcessingType )(implicit executionContext: ExecutionContext) = { + val additionalConfigsFromProvider = additionalUIConfigProvider.getAllForProcessingType(processingType) DeploymentManagerDependencies( DefaultProcessingTypeDeployedScenariosProvider(dbRef, processingType), new DefaultProcessingTypeActionService( @@ -740,7 +749,8 @@ class AkkaHttpBasedRouteProvider( ), system.dispatcher, system, - sttpBackend + sttpBackend, + additionalConfigsFromProvider ) } diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/test/base/it/NuResourcesTest.scala b/designer/server/src/test/scala/pl/touk/nussknacker/test/base/it/NuResourcesTest.scala index 4e7e42fc74d..cde22f36654 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/test/base/it/NuResourcesTest.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/test/base/it/NuResourcesTest.scala @@ -18,13 +18,11 @@ import org.scalatest.matchers.should.Matchers import org.scalatest.{Assertion, BeforeAndAfterEach, OptionValues, Suite} import pl.touk.nussknacker.engine._ import pl.touk.nussknacker.engine.api.CirceUtil.humanReadablePrinter -import pl.touk.nussknacker.engine.api.Comment import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.graph.ScenarioGraph import pl.touk.nussknacker.engine.api.process.VersionId.initialVersionId import pl.touk.nussknacker.engine.api.process._ import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess -import pl.touk.nussknacker.engine.definition.component.Components.ComponentDefinitionExtractionMode import pl.touk.nussknacker.engine.definition.test.{ModelDataTestInfoProvider, TestInfoProvider} import pl.touk.nussknacker.restmodel.CustomActionRequest import pl.touk.nussknacker.restmodel.scenariodetails.ScenarioWithDetails @@ -120,7 +118,8 @@ trait NuResourcesTest scenarioResolverByProcessingType, processChangeListener, None, - deploymentCommentSettings + deploymentCommentSettings, + mapProcessingTypeDataProvider() ) protected val processingTypeConfig: ProcessingTypeConfig = diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/test/utils/domain/TestFactory.scala b/designer/server/src/test/scala/pl/touk/nussknacker/test/utils/domain/TestFactory.scala index 6cb0419e829..c95746e8536 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/test/utils/domain/TestFactory.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/test/utils/domain/TestFactory.scala @@ -6,7 +6,7 @@ import cats.effect.unsafe.IORuntime import cats.instances.future._ import com.typesafe.config.ConfigFactory import db.util.DBIOActionInstances._ -import pl.touk.nussknacker.engine.api.component.{DesignerWideComponentId, ProcessingMode} +import pl.touk.nussknacker.engine.api.component.{ComponentAdditionalConfig, DesignerWideComponentId, ProcessingMode} import pl.touk.nussknacker.engine.api.definition.FixedExpressionValue import pl.touk.nussknacker.engine.api.deployment.{ NoOpScenarioActivityManager, @@ -120,6 +120,10 @@ object TestFactory { Streaming.stringify -> new ScenarioResolver(sampleResolver, Streaming.stringify) ) + def additionalComponentConfigsByProcessingType + : ProcessingTypeDataProvider[Map[DesignerWideComponentId, ComponentAdditionalConfig], _] = + mapProcessingTypeDataProvider() + val modelDependencies: ModelDependencies = ModelDependencies( TestAdditionalUIConfigProvider.componentAdditionalConfigMap, diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/notifications/NotificationServiceTest.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/notifications/NotificationServiceTest.scala index 931bcdf91c3..0c01b8408aa 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/notifications/NotificationServiceTest.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/notifications/NotificationServiceTest.scala @@ -14,7 +14,12 @@ import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefin import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName} import pl.touk.nussknacker.engine.build.ScenarioBuilder import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess -import pl.touk.nussknacker.engine.deployment.{DeploymentData, DeploymentId, ExternalDeploymentId} +import pl.touk.nussknacker.engine.deployment.{ + AdditionalModelConfigs, + DeploymentData, + DeploymentId, + ExternalDeploymentId +} import pl.touk.nussknacker.test.base.db.WithHsqlDbTesting import pl.touk.nussknacker.test.utils.domain.{ProcessTestData, TestFactory} import pl.touk.nussknacker.test.utils.scalas.DBIOActionValues @@ -195,6 +200,7 @@ class NotificationServiceTest mock[ProcessChangeListener], None, None, + TestFactory.additionalComponentConfigsByProcessingType, clock ) { override protected def validateBeforeDeploy( @@ -216,7 +222,8 @@ class NotificationServiceTest DeploymentId.fromActionId(actionId), user.toManagerUser, additionalDeploymentData, - nodesDeploymentData + nodesDeploymentData, + AdditionalModelConfigs.empty ), processDetails.json ) diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentServiceSpec.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentServiceSpec.scala index 88c0bb1cab4..fe926036f87 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentServiceSpec.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentServiceSpec.scala @@ -114,7 +114,8 @@ class DeploymentServiceSpec TestFactory.scenarioResolverByProcessingType, listener, scenarioStateTimeout, - deploymentCommentSettings + deploymentCommentSettings, + additionalComponentConfigsByProcessingType ) } diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/newdeployment/DeploymentServiceTest.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/newdeployment/DeploymentServiceTest.scala index e8259434b44..5b4b8d8bae9 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/newdeployment/DeploymentServiceTest.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/newdeployment/DeploymentServiceTest.scala @@ -53,7 +53,8 @@ class DeploymentServiceTest TestFactory.newFutureFetchingScenarioRepository(testDbRef) ), dbioRunner, - clock + clock, + TestFactory.additionalComponentConfigsByProcessingType ) } diff --git a/docs/Changelog.md b/docs/Changelog.md index cd3132a2435..9e45ab6c331 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -12,6 +12,7 @@ * [#7145](https://github.com/TouK/nussknacker/pull/7145) Lift TypingResult information for dictionaries * [#7116](https://github.com/TouK/nussknacker/pull/7116) Improve missing Flink Kafka Source / Sink TypeInformation +* [#7123](https://github.com/TouK/nussknacker/pull/7123) Fix deployments for scenarios with dict editors after model reload ## 1.18 diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkProcessCompilerDataFactory.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkProcessCompilerDataFactory.scala index f2b0214f758..dd56b848c8f 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkProcessCompilerDataFactory.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkProcessCompilerDataFactory.scala @@ -2,7 +2,7 @@ package pl.touk.nussknacker.engine.process.compiler import com.typesafe.config.Config import pl.touk.nussknacker.engine.ModelData.ExtractDefinitionFun -import pl.touk.nussknacker.engine.api.component.DesignerWideComponentId +import pl.touk.nussknacker.engine.api.component.{ComponentAdditionalConfig, DesignerWideComponentId} import pl.touk.nussknacker.engine.api.dict.EngineDictRegistry import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy import pl.touk.nussknacker.engine.api.process.{ComponentUseCase, ProcessConfigCreator, ProcessObjectDependencies} @@ -36,6 +36,7 @@ class FlinkProcessCompilerDataFactory( modelConfig: Config, namingStrategy: NamingStrategy, componentUseCase: ComponentUseCase, + configsFromProviderWithDictionaryEditor: Map[DesignerWideComponentId, ComponentAdditionalConfig] ) extends Serializable { import net.ceedubs.ficus.Ficus._ @@ -47,6 +48,7 @@ class FlinkProcessCompilerDataFactory( modelData.modelConfig, modelData.namingStrategy, componentUseCase = ComponentUseCase.EngineRuntime, + modelData.additionalConfigsFromProvider ) def prepareCompilerData( @@ -119,12 +121,11 @@ class FlinkProcessCompilerDataFactory( ): (ModelDefinitionWithClasses, EngineDictRegistry) = { val dictRegistryFactory = loadDictRegistry(userCodeClassLoader) val modelDefinitionWithTypes = ModelDefinitionWithClasses( - // additionalConfigsFromProvider aren't provided, as it's not needed to run the process on flink extractModelDefinition( userCodeClassLoader, modelDependencies, id => DesignerWideComponentId(id.toString), - Map.empty + configsFromProviderWithDictionaryEditor ) ) val dictRegistry = dictRegistryFactory.createEngineDictRegistry( diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/StubbedFlinkProcessCompilerDataFactory.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/StubbedFlinkProcessCompilerDataFactory.scala index 04e6f402438..6cd907bc57c 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/StubbedFlinkProcessCompilerDataFactory.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/StubbedFlinkProcessCompilerDataFactory.scala @@ -3,7 +3,7 @@ package pl.touk.nussknacker.engine.process.compiler import com.typesafe.config.Config import pl.touk.nussknacker.engine.ModelData.ExtractDefinitionFun import pl.touk.nussknacker.engine.api.{NodeId, Params} -import pl.touk.nussknacker.engine.api.component.ComponentType +import pl.touk.nussknacker.engine.api.component.{ComponentAdditionalConfig, ComponentType, DesignerWideComponentId} import pl.touk.nussknacker.engine.api.context.ContextTransformation import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy import pl.touk.nussknacker.engine.api.process.{ComponentUseCase, ProcessConfigCreator} @@ -29,13 +29,15 @@ abstract class StubbedFlinkProcessCompilerDataFactory( extractModelDefinition: ExtractDefinitionFun, modelConfig: Config, namingStrategy: NamingStrategy, - componentUseCase: ComponentUseCase + componentUseCase: ComponentUseCase, + configsFromProviderWithDictionaryEditor: Map[DesignerWideComponentId, ComponentAdditionalConfig] ) extends FlinkProcessCompilerDataFactory( creator, extractModelDefinition, modelConfig, namingStrategy, componentUseCase, + configsFromProviderWithDictionaryEditor ) { override protected def adjustDefinitions( diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/TestFlinkProcessCompilerDataFactory.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/TestFlinkProcessCompilerDataFactory.scala index fa82e64d0b8..c2697567e2a 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/TestFlinkProcessCompilerDataFactory.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/TestFlinkProcessCompilerDataFactory.scala @@ -35,7 +35,8 @@ object TestFlinkProcessCompilerDataFactory { modelData.extractModelDefinitionFun, modelData.modelConfig, modelData.namingStrategy, - ComponentUseCase.TestRuntime + ComponentUseCase.TestRuntime, + modelData.additionalConfigsFromProvider ) { override protected def adjustListeners( diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/VerificationFlinkProcessCompilerDataFactory.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/VerificationFlinkProcessCompilerDataFactory.scala index ae29cccc21a..5f5b4e50151 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/VerificationFlinkProcessCompilerDataFactory.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/VerificationFlinkProcessCompilerDataFactory.scala @@ -17,7 +17,8 @@ object VerificationFlinkProcessCompilerDataFactory { modelData.extractModelDefinitionFun, modelData.modelConfig, modelData.namingStrategy, - componentUseCase = ComponentUseCase.Validation + componentUseCase = ComponentUseCase.Validation, + modelData.additionalConfigsFromProvider ) { override protected def adjustListeners( diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkProcessMain.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkProcessMain.scala index f2ade445799..da869ea2d97 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkProcessMain.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkProcessMain.scala @@ -4,7 +4,7 @@ import java.io.File import com.typesafe.config.{Config, ConfigFactory} import com.typesafe.scalalogging.LazyLogging import org.apache.flink.api.common.ExecutionConfig -import pl.touk.nussknacker.engine.ModelData +import pl.touk.nussknacker.engine.{ModelConfigs, ModelData} import pl.touk.nussknacker.engine.api.{CirceUtil, ProcessVersion} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.DeploymentData @@ -27,7 +27,7 @@ trait FlinkProcessMain[Env] extends FlinkRunner with LazyLogging { s"Model version ${processVersion.modelVersion}. Deploying user [id=${deploymentData.user.id}, name=${deploymentData.user.name}]" ) val config: Config = readConfigFromArgs(args) - val modelData = ModelData.duringFlinkExecution(config) + val modelData = ModelData.duringFlinkExecution(ModelConfigs(config, deploymentData.additionalModelConfigs)) val env = getExecutionEnvironment runProcess( env, diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala index bdbce7f3d83..62e2efce685 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala @@ -8,7 +8,7 @@ import pl.touk.nussknacker.engine.api.{JobData, ProcessVersion} import pl.touk.nussknacker.engine.api.process.ProcessName import pl.touk.nussknacker.engine.api.test.ScenarioTestData import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess -import pl.touk.nussknacker.engine.deployment.DeploymentData +import pl.touk.nussknacker.engine.deployment.{AdditionalModelConfigs, DeploymentData} import pl.touk.nussknacker.engine.process.compiler.TestFlinkProcessCompilerDataFactory import pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar import pl.touk.nussknacker.engine.process.{ExecutionConfigPreparer, FlinkJobConfig} @@ -32,7 +32,16 @@ object FlinkTestMain extends FlinkRunner { val processVersion = ProcessVersion.empty.copy(processName = ProcessName("snapshot version") ) // testing process may be unreleased, so it has no version - new FlinkTestMain(modelData, process, scenarioTestData, processVersion, DeploymentData.empty, configuration).runTest + new FlinkTestMain( + modelData, + process, + scenarioTestData, + processVersion, + DeploymentData.empty.copy(additionalModelConfigs = + AdditionalModelConfigs(modelData.additionalConfigsFromProvider) + ), + configuration + ).runTest } } diff --git a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala index dea96b429cc..ab8ce3c2f26 100644 --- a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala +++ b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala @@ -20,7 +20,15 @@ import pl.touk.nussknacker.engine.graph.node.Case import pl.touk.nussknacker.engine.process.helpers.SampleNodes._ import pl.touk.nussknacker.engine.testmode.TestProcess._ import pl.touk.nussknacker.engine.util.ThreadUtils -import pl.touk.nussknacker.engine.ModelData +import pl.touk.nussknacker.engine.{ModelConfigs, ModelData} +import pl.touk.nussknacker.engine.api.component.{ + ComponentAdditionalConfig, + DesignerWideComponentId, + ParameterAdditionalUIConfig +} +import pl.touk.nussknacker.engine.api.parameter.{ParameterName, ValueInputWithDictEditor} +import pl.touk.nussknacker.engine.deployment.AdditionalModelConfigs +import pl.touk.nussknacker.engine.graph.expression.Expression import java.util.{Date, UUID} import scala.concurrent.ExecutionContext.Implicits.global @@ -654,6 +662,62 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor variable(List(ComponentUseCase.TestRuntime, ComponentUseCase.TestRuntime)) ) } + + "should throw exception when parameter was modified by AdditionalUiConfigProvider with dict editor and flink wasn't provided with additional config" in { + val process = + ScenarioBuilder + .streaming(scenarioName) + .source(sourceNodeId, "input") + .processor( + "eager1", + "collectingEager", + "static" -> Expression.dictKeyWithLabel("'s'", Some("s")), + "dynamic" -> "#input.id".spel + ) + .emptySink("out", "valueMonitor", "Value" -> "#input.value1".spel) + + val run = Future { + runFlinkTest(process, ScenarioTestData(List(createTestRecord(id = "2", value1 = 2))), useIOMonadInInterpreter) + } + val dictEditorException = intercept[IllegalStateException](Await.result(run, 10 seconds)) + dictEditorException.getMessage shouldBe "DictKeyWithLabel expression can only be used with DictParameterEditor, got Some(DualParameterEditor(StringParameterEditor,RAW))" + } + + "should run correctly when parameter was modified by AdditionalUiConfigProvider with dict editor and flink was provided with additional config" in { + val modifiedComponentName = "collectingEager" + val modifiedParameterName = "static" + val process = + ScenarioBuilder + .streaming(scenarioName) + .source(sourceNodeId, "input") + .processor( + "eager1", + modifiedComponentName, + modifiedParameterName -> Expression.dictKeyWithLabel("'s'", Some("s")), + "dynamic" -> "#input.id".spel + ) + .emptySink("out", "valueMonitor", "Value" -> "#input.value1".spel) + + val results = runFlinkTest( + process, + ScenarioTestData(List(createTestRecord(id = "2", value1 = 2))), + useIOMonadInInterpreter, + additionalConfigsFromProvider = Map( + DesignerWideComponentId("service-" + modifiedComponentName) -> ComponentAdditionalConfig( + parameterConfigs = Map( + ParameterName(modifiedParameterName) -> ParameterAdditionalUIConfig( + required = false, + initialValue = None, + hintText = None, + valueEditor = Some(ValueInputWithDictEditor("someDictId", allowOtherValue = false)), + valueCompileTimeValidation = None + ) + ) + ) + ) + ) + results.exceptions should have length 0 + } } private def createTestRecord( @@ -667,13 +731,16 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor process: CanonicalProcess, scenarioTestData: ScenarioTestData, useIOMonadInInterpreter: Boolean, - enrichDefaultConfig: Config => Config = identity + enrichDefaultConfig: Config => Config = identity, + additionalConfigsFromProvider: Map[DesignerWideComponentId, ComponentAdditionalConfig] = Map.empty ): TestResults[_] = { val config = enrichDefaultConfig(ConfigFactory.load("application.conf")) .withValue("globalParameters.useIOMonadInInterpreter", ConfigValueFactory.fromAnyRef(useIOMonadInInterpreter)) // We need to set context loader to avoid forking in sbt - val modelData = ModelData.duringFlinkExecution(config) + val modelData = ModelData.duringFlinkExecution( + ModelConfigs(config, AdditionalModelConfigs(additionalConfigsFromProvider)) + ) ThreadUtils.withThisAsContextClassLoader(getClass.getClassLoader) { FlinkTestMain.run(modelData, process, scenarioTestData, FlinkTestConfiguration.configuration()) } diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManager.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManager.scala index 02e47fe9eaa..dae199ecf46 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManager.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManager.scala @@ -58,7 +58,8 @@ object PeriodicDeploymentManager { periodicBatchConfig.executionConfig, processConfigEnricher, clock, - dependencies.actionService + dependencies.actionService, + dependencies.configsFromProvider ) // These actors have to be created with retries because they can initially fail to create due to taken names, diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala index ebec77566a5..e346c92ec08 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala @@ -3,14 +3,18 @@ package pl.touk.nussknacker.engine.management.periodic import cats.implicits._ import com.typesafe.scalalogging.LazyLogging import pl.touk.nussknacker.engine.api.ProcessVersion -import pl.touk.nussknacker.engine.api.component.NodesDeploymentData +import pl.touk.nussknacker.engine.api.component.{ + ComponentAdditionalConfig, + DesignerWideComponentId, + NodesDeploymentData +} import pl.touk.nussknacker.engine.api.deployment.StateStatus.StatusName import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess -import pl.touk.nussknacker.engine.deployment.{DeploymentData, DeploymentId} +import pl.touk.nussknacker.engine.deployment.{AdditionalModelConfigs, DeploymentData, DeploymentId} import pl.touk.nussknacker.engine.management.periodic.PeriodicProcessService.{ DeploymentStatus, EngineStatusesToReschedule, @@ -23,6 +27,7 @@ import pl.touk.nussknacker.engine.management.periodic.db.PeriodicProcessesReposi import pl.touk.nussknacker.engine.management.periodic.model.PeriodicProcessDeploymentStatus.PeriodicProcessDeploymentStatus import pl.touk.nussknacker.engine.management.periodic.model._ import pl.touk.nussknacker.engine.management.periodic.service._ +import pl.touk.nussknacker.engine.util.AdditionalComponentConfigsForRuntimeExtractor import java.time.chrono.ChronoLocalDateTime import java.time.temporal.ChronoUnit @@ -40,7 +45,8 @@ class PeriodicProcessService( executionConfig: PeriodicExecutionConfig, processConfigEnricher: ProcessConfigEnricher, clock: Clock, - actionService: ProcessingTypeActionService + actionService: ProcessingTypeActionService, + configsFromProvider: Map[DesignerWideComponentId, ComponentAdditionalConfig] )(implicit ec: ExecutionContext) extends LazyLogging { @@ -401,7 +407,10 @@ class PeriodicProcessService( DeploymentData.systemUser, additionalDeploymentDataProvider.prepareAdditionalData(deployment), // TODO: in the future we could allow users to specify nodes data during schedule requesting - NodesDeploymentData.empty + NodesDeploymentData.empty, + AdditionalModelConfigs( + AdditionalComponentConfigsForRuntimeExtractor.getRequiredAdditionalConfigsForRuntime(configsFromProvider) + ) ) val deploymentWithJarData = deployment.periodicProcess.deploymentData val deploymentAction = for { diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManagerTest.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManagerTest.scala index 8c0401c3339..37f365c1d4d 100644 --- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManagerTest.scala +++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManagerTest.scala @@ -74,7 +74,8 @@ class PeriodicDeploymentManagerTest executionConfig = executionConfig, processConfigEnricher = ProcessConfigEnricher.identity, clock = Clock.systemDefaultZone(), - new ProcessingTypeActionServiceStub + new ProcessingTypeActionServiceStub, + Map.empty ) val periodicDeploymentManager = new PeriodicDeploymentManager( diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala index 8b6bcd8a564..0fd79cd3c06 100644 --- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala +++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala @@ -145,7 +145,8 @@ class PeriodicProcessServiceIntegrationTest executionConfig = executionConfig, processConfigEnricher = ProcessConfigEnricher.identity, clock = fixedClock(currentTime), - new ProcessingTypeActionServiceStub + new ProcessingTypeActionServiceStub, + Map.empty ) } diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceTest.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceTest.scala index 89c41213d2f..7da7f65421a 100644 --- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceTest.scala +++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceTest.scala @@ -120,7 +120,8 @@ class PeriodicProcessServiceTest }, Clock.systemDefaultZone(), - actionService + actionService, + Map.empty ) } diff --git a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala index ea47b28fd3f..f379f73891b 100644 --- a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala +++ b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala @@ -19,7 +19,13 @@ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.Proble import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName, VersionId} import pl.touk.nussknacker.engine.api.{MetaData, ProcessVersion, StreamMetaData} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess -import pl.touk.nussknacker.engine.deployment.{DeploymentData, DeploymentId, ExternalDeploymentId, User} +import pl.touk.nussknacker.engine.deployment.{ + AdditionalModelConfigs, + DeploymentData, + DeploymentId, + ExternalDeploymentId, + User +} import pl.touk.nussknacker.engine.management.rest.HttpFlinkClient import pl.touk.nussknacker.engine.management.rest.flinkRestModel._ import pl.touk.nussknacker.engine.testing.LocalModelData @@ -58,7 +64,8 @@ class FlinkRestManagerSpec extends AnyFunSuite with Matchers with PatientScalaFu DeploymentId(""), User("user1", "User 1"), Map.empty, - NodesDeploymentData.empty + NodesDeploymentData.empty, + AdditionalModelConfigs.empty ) private val returnedJobId = "jobId" diff --git a/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala b/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala index 178f5ab5cbf..7a7242a36d4 100644 --- a/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala +++ b/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala @@ -9,6 +9,7 @@ import pl.touk.nussknacker.engine.api._ import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus +import pl.touk.nussknacker.engine.api.parameter.ValueInputWithDictEditor import pl.touk.nussknacker.engine.api.process.ProcessName import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.{DeploymentData, DeploymentId, ExternalDeploymentId} @@ -19,6 +20,7 @@ import pl.touk.nussknacker.engine.lite.metrics.dropwizard.{DropwizardMetricsProv import pl.touk.nussknacker.engine.{BaseModelData, CustomProcessValidator, DeploymentManagerDependencies, ModelData} import pl.touk.nussknacker.lite.manager.{LiteDeploymentManager, LiteDeploymentManagerProvider} import pl.touk.nussknacker.engine.newdeployment +import pl.touk.nussknacker.engine.util.AdditionalComponentConfigsForRuntimeExtractor import scala.concurrent.duration.{DurationInt, FiniteDuration} import scala.concurrent.{Await, ExecutionContext, Future} @@ -98,6 +100,7 @@ class EmbeddedDeploymentManager( case DMRunDeploymentCommand(processVersion, deploymentData, canonicalProcess, updateStrategy) => Future { ensureReplaceDeploymentUpdateStrategy(updateStrategy) + ensureAdditionalComponentsConfigsAreEmpty(deploymentData) deployScenarioClosingOldIfNeeded( processVersion, deploymentData, @@ -121,6 +124,16 @@ class EmbeddedDeploymentManager( } } + // We make sure that we don't let deploy a scenario when any component was modified by AdditionalUIConfigProvider + // as it could potentially result in failure during compilation before execution + private def ensureAdditionalComponentsConfigsAreEmpty(deploymentData: DeploymentData): Unit = { + if (deploymentData.additionalModelConfigs.additionalConfigsFromProvider.nonEmpty) { + throw new IllegalArgumentException( + "Component config modification by AdditionalUIConfigProvider is not supported for Lite engine" + ) + } + } + private def deployScenarioClosingOldIfNeeded( processVersion: ProcessVersion, deploymentData: DeploymentData, diff --git a/engine/lite/runtime-app/src/main/scala/pl/touk/nussknacker/engine/lite/app/RunnableScenarioInterpreterFactory.scala b/engine/lite/runtime-app/src/main/scala/pl/touk/nussknacker/engine/lite/app/RunnableScenarioInterpreterFactory.scala index a81ab729573..d4cd99dc792 100644 --- a/engine/lite/runtime-app/src/main/scala/pl/touk/nussknacker/engine/lite/app/RunnableScenarioInterpreterFactory.scala +++ b/engine/lite/runtime-app/src/main/scala/pl/touk/nussknacker/engine/lite/app/RunnableScenarioInterpreterFactory.scala @@ -4,7 +4,7 @@ import akka.actor.ActorSystem import com.typesafe.config.Config import com.typesafe.scalalogging.LazyLogging import net.ceedubs.ficus.readers.ArbitraryTypeReader.arbitraryTypeValueReader -import pl.touk.nussknacker.engine.ModelData +import pl.touk.nussknacker.engine.{ModelConfigs, ModelData} import pl.touk.nussknacker.engine.api.{JobData, LiteStreamMetaData, ProcessVersion, RequestResponseMetaData} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.lite.RunnableScenarioInterpreter @@ -25,7 +25,7 @@ object RunnableScenarioInterpreterFactory extends LazyLogging { ): RunnableScenarioInterpreter = { val modelConfig: Config = runtimeConfig.getConfig("modelConfig") val modelData = ModelData.duringExecution( - modelConfig, + ModelConfigs(modelConfig), ModelClassLoader(modelConfig.as[List[String]]("classPath"), workingDirectoryOpt = None), resolveConfigs = true ) diff --git a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/deployment/AdditionalModelConfigs.scala b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/deployment/AdditionalModelConfigs.scala new file mode 100644 index 00000000000..65fbdea3716 --- /dev/null +++ b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/deployment/AdditionalModelConfigs.scala @@ -0,0 +1,15 @@ +package pl.touk.nussknacker.engine.deployment + +import io.circe.generic.JsonCodec +import pl.touk.nussknacker.engine.api.component.{ComponentAdditionalConfig, DesignerWideComponentId} + +@JsonCodec +final case class AdditionalModelConfigs( + additionalConfigsFromProvider: Map[DesignerWideComponentId, ComponentAdditionalConfig] +) + +object AdditionalModelConfigs { + + def empty: AdditionalModelConfigs = AdditionalModelConfigs(Map.empty) + +} diff --git a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/deployment/DeploymentData.scala b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/deployment/DeploymentData.scala index 09fc28ab902..3e59150483a 100644 --- a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/deployment/DeploymentData.scala +++ b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/deployment/DeploymentData.scala @@ -7,7 +7,8 @@ import pl.touk.nussknacker.engine.api.component.NodesDeploymentData deploymentId: DeploymentId, user: User, additionalDeploymentData: Map[String, String], - nodesData: NodesDeploymentData + nodesData: NodesDeploymentData, + additionalModelConfigs: AdditionalModelConfigs ) object DeploymentData { @@ -15,14 +16,21 @@ object DeploymentData { val systemUser: User = User("system", "system") val empty: DeploymentData = - DeploymentData(DeploymentId(""), systemUser, Map.empty, NodesDeploymentData.empty) + DeploymentData( + DeploymentId(""), + systemUser, + Map.empty, + NodesDeploymentData.empty, + AdditionalModelConfigs.empty + ) def withDeploymentId(deploymentIdString: String) = DeploymentData( DeploymentId(deploymentIdString), systemUser, Map.empty, - NodesDeploymentData.empty + NodesDeploymentData.empty, + AdditionalModelConfigs.empty ) } diff --git a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/ModelData.scala b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/ModelData.scala index 3bee3bdfb54..43f00f94bd9 100644 --- a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/ModelData.scala +++ b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/ModelData.scala @@ -14,12 +14,12 @@ import pl.touk.nussknacker.engine.api.dict.{DictServicesFactory, EngineDictRegis import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy import pl.touk.nussknacker.engine.api.process.{ProcessConfigCreator, ProcessObjectDependencies} import pl.touk.nussknacker.engine.definition.component.Components.ComponentDefinitionExtractionMode -import pl.touk.nussknacker.engine.definition.component.Components.ComponentDefinitionExtractionMode.FinalDefinition import pl.touk.nussknacker.engine.definition.model.{ ModelDefinition, ModelDefinitionExtractor, ModelDefinitionWithClasses } +import pl.touk.nussknacker.engine.deployment.AdditionalModelConfigs import pl.touk.nussknacker.engine.dict.DictServicesFactoryLoader import pl.touk.nussknacker.engine.migration.ProcessMigrations import pl.touk.nussknacker.engine.modelconfig._ @@ -58,8 +58,12 @@ object ModelData extends LazyLogging { // Also a classloader is correct so we don't need to build the new one // This tiny method is Flink specific so probably the interpreter module is not the best one // but it is very convenient to keep in near normal, duringExecution method - def duringFlinkExecution(inputConfig: Config): ModelData = { - duringExecution(inputConfig, ModelClassLoader.empty, resolveConfigs = false) + def duringFlinkExecution(modelConfigs: ModelConfigs): ModelData = { + duringExecution( + modelConfigs, + ModelClassLoader.empty, + resolveConfigs = false, + ) } // On the runtime side, we get only model config, not the whole processing type config, @@ -67,15 +71,19 @@ object ModelData extends LazyLogging { // But it is not a big deal, because scenario was already validated before deploy, so we already check that // we don't use not allowed components for a given category // and that the scenario doesn't violate validators introduced by additionalConfigsFromProvider - def duringExecution(inputConfig: Config, modelClassLoader: ModelClassLoader, resolveConfigs: Boolean): ModelData = { + def duringExecution( + modelConfigs: ModelConfigs, + modelClassLoader: ModelClassLoader, + resolveConfigs: Boolean, + ): ModelData = { def resolveInputConfigDuringExecution(modelConfigLoader: ModelConfigLoader): InputConfigDuringExecution = { if (resolveConfigs) { modelConfigLoader.resolveInputConfigDuringExecution( - ConfigWithUnresolvedVersion(modelClassLoader.classLoader, inputConfig), + ConfigWithUnresolvedVersion(modelClassLoader.classLoader, modelConfigs.modelInputConfig), modelClassLoader.classLoader ) } else { - InputConfigDuringExecution(inputConfig) + InputConfigDuringExecution(modelConfigs.modelInputConfig) } } ClassLoaderModelData( @@ -83,7 +91,7 @@ object ModelData extends LazyLogging { modelClassLoader = modelClassLoader, category = None, determineDesignerWideId = id => DesignerWideComponentId(id.toString), - additionalConfigsFromProvider = Map.empty, + additionalConfigsFromProvider = modelConfigs.additionalModelConfigs.additionalConfigsFromProvider, shouldIncludeConfigCreator = _ => true, shouldIncludeComponentProvider = _ => true, componentDefinitionExtractionMode = ComponentDefinitionExtractionMode.FinalDefinition @@ -96,6 +104,11 @@ object ModelData extends LazyLogging { } +final case class ModelConfigs( + modelInputConfig: Config, + additionalModelConfigs: AdditionalModelConfigs = AdditionalModelConfigs.empty +) + final case class ModelDependencies( additionalConfigsFromProvider: Map[DesignerWideComponentId, ComponentAdditionalConfig], determineDesignerWideId: ComponentId => DesignerWideComponentId, diff --git a/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkProcessCompilerDataFactoryWithTestComponents.scala b/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkProcessCompilerDataFactoryWithTestComponents.scala index f5dd8d0c133..15fcd878c7b 100644 --- a/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkProcessCompilerDataFactoryWithTestComponents.scala +++ b/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkProcessCompilerDataFactoryWithTestComponents.scala @@ -5,7 +5,7 @@ import io.circe.Json import pl.touk.nussknacker.engine.ModelData import pl.touk.nussknacker.engine.ModelData.ExtractDefinitionFun import pl.touk.nussknacker.engine.api._ -import pl.touk.nussknacker.engine.api.component.DesignerWideComponentId +import pl.touk.nussknacker.engine.api.component.{ComponentAdditionalConfig, DesignerWideComponentId} import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy import pl.touk.nussknacker.engine.api.process._ import pl.touk.nussknacker.engine.definition.component.ComponentDefinitionWithImplementation @@ -36,7 +36,8 @@ object FlinkProcessCompilerDataFactoryWithTestComponents { modelData.namingStrategy, componentUseCase, testExtensionsHolder, - resultsCollectingListener + resultsCollectingListener, + modelData.additionalConfigsFromProvider ) def apply( @@ -47,6 +48,7 @@ object FlinkProcessCompilerDataFactoryWithTestComponents { componentUseCase: ComponentUseCase, testExtensionsHolder: TestExtensionsHolder, resultsCollectingListener: ResultsCollectingListener[Any], + configsFromProviderWithDictionaryEditor: Map[DesignerWideComponentId, ComponentAdditionalConfig] ): FlinkProcessCompilerDataFactory = { new FlinkProcessCompilerDataFactory( creator, @@ -54,6 +56,7 @@ object FlinkProcessCompilerDataFactoryWithTestComponents { modelConfig, namingStrategy, componentUseCase, + configsFromProviderWithDictionaryEditor ) { override protected def adjustDefinitions(