diff --git a/engine/flink/management/periodic/src/main/resources/web/static/assets/custom-actions/batch-instant.svg b/designer/client/src/assets/img/toolbarButtons/perform-single-execution.svg similarity index 100% rename from engine/flink/management/periodic/src/main/resources/web/static/assets/custom-actions/batch-instant.svg rename to designer/client/src/assets/img/toolbarButtons/perform-single-execution.svg diff --git a/designer/client/src/components/Process/ProcessStateUtils.ts b/designer/client/src/components/Process/ProcessStateUtils.ts index 1fe025efb88..ed8433de849 100644 --- a/designer/client/src/components/Process/ProcessStateUtils.ts +++ b/designer/client/src/components/Process/ProcessStateUtils.ts @@ -1,4 +1,4 @@ -import { PredefinedActionName, ProcessStateType, Scenario } from "./types"; +import { ActionName, PredefinedActionName, ProcessStateType, Scenario, ActionTooltip } from "./types"; import { descriptionProcessArchived, descriptionFragment, @@ -18,6 +18,12 @@ class ProcessStateUtils { public canArchive = (state: ProcessStateType): boolean => state?.allowedActions.includes(PredefinedActionName.Archive); + public canSeePerformSingleExecution = (state: ProcessStateType): boolean => + state?.visibleActions.includes(PredefinedActionName.PerformSingleExecution); + + public canPerformSingleExecution = (state: ProcessStateType): boolean => + state?.allowedActions.includes(PredefinedActionName.PerformSingleExecution); + getStateDescription({ isArchived, isFragment }: Scenario, processState: ProcessStateType): string { if (isArchived) { return isFragment ? descriptionFragmentArchived() : descriptionProcessArchived(); @@ -60,6 +66,10 @@ class ProcessStateUtils { } return `${name}-${processState?.icon || state?.icon || unknownIcon}`; } + + getActionCustomTooltip(processState: ProcessStateType, actionName: ActionName): ActionTooltip | undefined { + return processState?.actionTooltips[actionName] || undefined; + } } export default new ProcessStateUtils(); diff --git a/designer/client/src/components/Process/types.ts b/designer/client/src/components/Process/types.ts index 36ddc9dd2e8..bf0a6edc4a6 100644 --- a/designer/client/src/components/Process/types.ts +++ b/designer/client/src/components/Process/types.ts @@ -9,6 +9,7 @@ export enum PredefinedActionName { Archive = "ARCHIVE", UnArchive = "UNARCHIVE", Pause = "PAUSE", + PerformSingleExecution = "PERFORM_SINGLE_EXECUTION", } export type ActionName = string; @@ -66,17 +67,25 @@ export interface Scenario { export type ProcessName = Scenario["name"]; +export enum ActionTooltip { + NotAllowedForDeployedVersion = "NOT_ALLOWED_FOR_DEPLOYED_VERSION", + NotAllowedInCurrentState = "NOT_ALLOWED_IN_CURRENT_STATE", +} + export type ProcessStateType = { status: StatusType; + latestVersionId: number; + deployedVersionId?: number; externalDeploymentId?: string; + visibleActions: Array; allowedActions: Array; + actionTooltips: Record; icon: string; tooltip: string; description: string; startTime?: Date; attributes?: UnknownRecord; errors?: Array; - version?: number | null; }; export type StatusType = { diff --git a/designer/client/src/components/toolbarSettings/buttons/ActionButton.tsx b/designer/client/src/components/toolbarSettings/buttons/ActionButton.tsx index 519d33d4cf8..f274b003036 100644 --- a/designer/client/src/components/toolbarSettings/buttons/ActionButton.tsx +++ b/designer/client/src/components/toolbarSettings/buttons/ActionButton.tsx @@ -17,20 +17,7 @@ export function ActionButton({ name, type }: ActionButtonProps): JSX.Element { const customActions = useSelector(getCustomActions); const action = useMemo(() => customActions.find((a) => a.name === name), [customActions, name]); - // FIXME: This part requires further changes within periodic scenario engine. - // Currently we use experimental api of custom actions for periodic scenarios (an experimental engine). - // Part of this experimental engine allows to run immediately scheduled scenario. This activity will be moved inside core deployment operations and aligned with other deployment engines. - // Here we want to disable that one action button in confusing situation when user looks at scenario version that is not currently deployed. - const isDeployed = useSelector(isDeployedVersion); - const disabledValue = useMemo(() => !isDeployed, [isDeployed, name]); - return action ? ( - + ) : null; } diff --git a/designer/client/src/components/toolbarSettings/buttons/BuiltinButtonTypes.ts b/designer/client/src/components/toolbarSettings/buttons/BuiltinButtonTypes.ts index b1ce9631c62..ec4816131a4 100644 --- a/designer/client/src/components/toolbarSettings/buttons/BuiltinButtonTypes.ts +++ b/designer/client/src/components/toolbarSettings/buttons/BuiltinButtonTypes.ts @@ -2,6 +2,7 @@ export enum BuiltinButtonTypes { processSave = "process-save", processDeploy = "process-deploy", processCancel = "process-cancel", + processPerformSingleExecution = "process-perform-single-execution", editUndo = "edit-undo", editRedo = "edit-redo", editCopy = "edit-copy", diff --git a/designer/client/src/components/toolbarSettings/buttons/TOOLBAR_BUTTONS_MAP.ts b/designer/client/src/components/toolbarSettings/buttons/TOOLBAR_BUTTONS_MAP.ts index 0f95980e408..6f02c96d672 100644 --- a/designer/client/src/components/toolbarSettings/buttons/TOOLBAR_BUTTONS_MAP.ts +++ b/designer/client/src/components/toolbarSettings/buttons/TOOLBAR_BUTTONS_MAP.ts @@ -31,6 +31,7 @@ import { ZoomOutButton } from "../../toolbars/view/buttons/ZoomOutButton"; import { BuiltinButtonTypes } from "./BuiltinButtonTypes"; import { CustomButtonTypes } from "./CustomButtonTypes"; import { ToolbarButton, ToolbarButtonTypes } from "./types"; +import PerformSingleExecutionButton from "../../toolbars/scenarioActions/buttons/PerformSingleExecutionButton"; export type PropsOfButton = ToolbarButton & { type: T; @@ -44,6 +45,7 @@ export const TOOLBAR_BUTTONS_MAP: ToolbarButtonsMap = { [BuiltinButtonTypes.processSave]: SaveButton, [BuiltinButtonTypes.processDeploy]: DeployButton, [BuiltinButtonTypes.processCancel]: CancelDeployButton, + [BuiltinButtonTypes.processPerformSingleExecution]: PerformSingleExecutionButton, [BuiltinButtonTypes.viewZoomIn]: ZoomInButton, [BuiltinButtonTypes.viewZoomOut]: ZoomOutButton, [BuiltinButtonTypes.viewReset]: ResetViewButton, diff --git a/designer/client/src/components/toolbarSettings/defaultToolbarsConfig.ts b/designer/client/src/components/toolbarSettings/defaultToolbarsConfig.ts index b608cd27836..ee30210c9ef 100644 --- a/designer/client/src/components/toolbarSettings/defaultToolbarsConfig.ts +++ b/designer/client/src/components/toolbarSettings/defaultToolbarsConfig.ts @@ -29,6 +29,7 @@ export function defaultToolbarsConfig(isFragment: boolean, isArchived: boolean): { type: BuiltinButtonTypes.processSave }, { type: BuiltinButtonTypes.processDeploy }, { type: BuiltinButtonTypes.processCancel }, + { type: BuiltinButtonTypes.processPerformSingleExecution }, ], }, { diff --git a/designer/client/src/components/toolbars/actions/buttons/CustomActionButton.tsx b/designer/client/src/components/toolbars/actions/buttons/CustomActionButton.tsx deleted file mode 100644 index 98f07725c6c..00000000000 --- a/designer/client/src/components/toolbars/actions/buttons/CustomActionButton.tsx +++ /dev/null @@ -1,50 +0,0 @@ -import React from "react"; -import { useTranslation } from "react-i18next"; -import DefaultIcon from "../../../../assets/img/toolbarButtons/custom_action.svg"; -import { CustomAction } from "../../../../types"; -import { useWindows, WindowKind } from "../../../../windowManager"; -import { StatusType } from "../../../Process/types"; -import { ToolbarButton } from "../../../toolbarComponents/toolbarButtons"; -import { ToolbarButtonProps } from "../../types"; -import UrlIcon from "../../../UrlIcon"; -import { ACTION_DIALOG_WIDTH } from "../../../../stylesheets/variables"; - -type CustomActionProps = { - action: CustomAction; - processName: string; - processStatus: StatusType | null; -} & ToolbarButtonProps; - -export default function CustomActionButton(props: CustomActionProps) { - const { action, processStatus, disabled, type } = props; - - const { t } = useTranslation(); - - const icon = action.icon ? : ; - - const statusName = processStatus?.name; - const available = !disabled && action.allowedStateStatusNames.includes(statusName); - - const toolTip = available - ? null - : t("panels.actions.custom-action.tooltips.disabled", "Disabled for {{statusName}} status.", { statusName }); - - const { open } = useWindows(); - return ( - - open({ - title: action.name, - kind: WindowKind.customAction, - width: ACTION_DIALOG_WIDTH, - meta: action, - }) - } - type={type} - /> - ); -} diff --git a/designer/client/src/components/toolbars/activities/ActivitiesPanelRow.tsx b/designer/client/src/components/toolbars/activities/ActivitiesPanelRow.tsx index 3f956ee9bc6..658248415b2 100644 --- a/designer/client/src/components/toolbars/activities/ActivitiesPanelRow.tsx +++ b/designer/client/src/components/toolbars/activities/ActivitiesPanelRow.tsx @@ -27,7 +27,8 @@ export const ActivitiesPanelRow = memo(({ index, style, setRowHeight, handleShow () => activities.findIndex((activeItem) => activeItem.uiType === "item" && activeItem.type === "SCENARIO_DEPLOYED"), [activities], ); - const isRunning = firstDeployedIndex === index && scenarioState.status.name === "RUNNING"; + const isRunning = + firstDeployedIndex === index && (scenarioState.status.name === "RUNNING" || scenarioState.status.name === "SCHEDULED"); const isFirstDateItem = activities.findIndex((activeItem) => activeItem.uiType === "date") === index; useEffect(() => { diff --git a/designer/client/src/components/toolbars/scenarioActions/buttons/PerformSingleExecutionButton.tsx b/designer/client/src/components/toolbars/scenarioActions/buttons/PerformSingleExecutionButton.tsx new file mode 100644 index 00000000000..39a6c50b6fc --- /dev/null +++ b/designer/client/src/components/toolbars/scenarioActions/buttons/PerformSingleExecutionButton.tsx @@ -0,0 +1,73 @@ +import React from "react"; +import { useTranslation } from "react-i18next"; +import { useDispatch, useSelector } from "react-redux"; +import { loadProcessState } from "../../../../actions/nk"; +import Icon from "../../../../assets/img/toolbarButtons/perform-single-execution.svg"; +import HttpService from "../../../../http/HttpService"; +import { getProcessName, isPerformSingleExecutionPossible, isPerformSingleExecutionVisible } from "../../../../reducers/selectors/graph"; +import { getCapabilities } from "../../../../reducers/selectors/other"; +import { useWindows, WindowKind } from "../../../../windowManager"; +import { ToggleProcessActionModalData } from "../../../modals/DeployProcessDialog"; +import { ToolbarButton } from "../../../toolbarComponents/toolbarButtons"; +import { ToolbarButtonProps } from "../../types"; +import { ACTION_DIALOG_WIDTH } from "../../../../stylesheets/variables"; +import ProcessStateUtils from "../../../Process/ProcessStateUtils"; +import { RootState } from "../../../../reducers"; +import { getProcessState } from "../../../../reducers/selectors/scenarioState"; +import { ActionTooltip, PredefinedActionName } from "../../../Process/types"; + +export default function PerformSingleExecutionButton(props: ToolbarButtonProps) { + const { t } = useTranslation(); + const dispatch = useDispatch(); + const { disabled, type } = props; + const scenarioState = useSelector((state: RootState) => getProcessState(state)); + const isVisible = useSelector(isPerformSingleExecutionVisible); + const isPossible = useSelector(isPerformSingleExecutionPossible); + const processName = useSelector(getProcessName); + const capabilities = useSelector(getCapabilities); + const available = !disabled && isPossible && capabilities.deploy; + + const { open } = useWindows(); + const action = (p, c) => HttpService.performSingleExecution(p, c).finally(() => dispatch(loadProcessState(processName))); + const message = t("panels.actions.perform-single-execution.dialog", "Perform single execution", { name: processName }); + + const actionTooltip = ProcessStateUtils.getActionCustomTooltip(scenarioState, PredefinedActionName.PerformSingleExecution); + + const tooltip = + actionTooltip === ActionTooltip.NotAllowedForDeployedVersion + ? t( + "panels.actions.perform-single-execution.tooltip.not-allowed-for-deployed-version", + "There is new version {{ latestVersion }} available.{{ deployedVersionDescription }}", + { + latestVersion: scenarioState.latestVersionId, + deployedVersionDescription: scenarioState?.deployedVersionId + ? ` (version ${scenarioState.deployedVersionId} is deployed)` + : ``, + }, + ) + : actionTooltip === ActionTooltip.NotAllowedInCurrentState + ? t("panels.actions.perform-single-execution.tooltip.not-allowed-in-current-state", "Disabled for {{ status }} status.", { + status: scenarioState.status.name, + }) + : "run now"; + + if (isVisible) { + return ( + } + onClick={() => + open({ + title: message, + kind: WindowKind.deployProcess, + width: ACTION_DIALOG_WIDTH, + meta: { action }, + }) + } + type={type} + /> + ); + } else return <>; +} diff --git a/designer/client/src/containers/event-tracking/helpers.ts b/designer/client/src/containers/event-tracking/helpers.ts index 8e968a111f6..824538f0f53 100644 --- a/designer/client/src/containers/event-tracking/helpers.ts +++ b/designer/client/src/containers/event-tracking/helpers.ts @@ -92,6 +92,9 @@ export const mapToolbarButtonToStatisticsEvent = ( case BuiltinButtonTypes.processCancel: { return EventTrackingSelector.ScenarioCancel; } + case BuiltinButtonTypes.processPerformSingleExecution: { + return EventTrackingSelector.ScenarioPerformSingleExecution; + } case BuiltinButtonTypes.processArchiveToggle: { return EventTrackingSelector.ScenarioArchiveToggle; } diff --git a/designer/client/src/containers/event-tracking/use-register-tracking-events.ts b/designer/client/src/containers/event-tracking/use-register-tracking-events.ts index 4961763f791..a11d28e25b4 100644 --- a/designer/client/src/containers/event-tracking/use-register-tracking-events.ts +++ b/designer/client/src/containers/event-tracking/use-register-tracking-events.ts @@ -51,6 +51,7 @@ enum ClickEventsSelector { ScenarioSave = "SCENARIO_SAVE", TestCounts = "TEST_COUNTS", ScenarioCancel = "SCENARIO_CANCEL", + ScenarioPerformSingleExecution = "SCENARIO_PERFORM_SINGLE_EXECUTION", ScenarioArchiveToggle = "SCENARIO_ARCHIVE_TOGGLE", ScenarioUnarchive = "SCENARIO_UNARCHIVE", ScenarioCustomAction = "SCENARIO_CUSTOM_ACTION", diff --git a/designer/client/src/http/HttpService.ts b/designer/client/src/http/HttpService.ts index 977af76a486..42612049e10 100644 --- a/designer/client/src/http/HttpService.ts +++ b/designer/client/src/http/HttpService.ts @@ -359,6 +359,31 @@ class HttpService { }); } + performSingleExecution(processName: string, comment?: string) { + const data = { + comment: comment, + }; + return api + .post(`/processManagement/performSingleExecution/${encodeURIComponent(processName)}`, data) + .then((res) => { + const msg = res.data.msg; + this.#addInfo(msg); + return { + isSuccess: res.data.isSuccess, + msg: msg, + }; + }) + .catch((error) => { + const msg = error.response.data.msg || error.response.data; + const result = { + isSuccess: false, + msg: msg, + }; + if (error?.response?.status != 400) return this.#addError(msg, error, false).then(() => result); + return result; + }); + } + customAction(processName: string, actionName: string, params: Record, comment?: string) { const data = { actionName: actionName, diff --git a/designer/client/src/reducers/graph/utils.fixtures.ts b/designer/client/src/reducers/graph/utils.fixtures.ts index fa320120b80..3c42ffd014a 100644 --- a/designer/client/src/reducers/graph/utils.fixtures.ts +++ b/designer/client/src/reducers/graph/utils.fixtures.ts @@ -166,8 +166,11 @@ export const state: GraphState = { status: { name: "NOT_DEPLOYED", }, - version: null, + latestVersionId: 1, + deployedVersionId: 1, + visibleActions: ["DEPLOY", "ARCHIVE", "RENAME"], allowedActions: ["DEPLOY", "ARCHIVE", "RENAME"], + actionTooltips: {}, icon: "/assets/states/not-deployed.svg", tooltip: "The scenario is not deployed.", description: "The scenario is not deployed.", diff --git a/designer/client/src/reducers/selectors/graph.ts b/designer/client/src/reducers/selectors/graph.ts index 41642d30f66..a943c21c682 100644 --- a/designer/client/src/reducers/selectors/graph.ts +++ b/designer/client/src/reducers/selectors/graph.ts @@ -56,11 +56,18 @@ export const isDeployedVersion = createSelector( [getProcessVersionId, createSelector(getScenario, (scenario) => scenario?.lastDeployedAction?.processVersionId)], (visibleVersion, deployedVersion) => visibleVersion === deployedVersion, ); +export const isCancelPossible = createSelector(getProcessState, (state) => ProcessStateUtils.canCancel(state)); +export const isPerformSingleExecutionVisible = createSelector([getProcessState], (state) => + ProcessStateUtils.canSeePerformSingleExecution(state), +); +export const isPerformSingleExecutionPossible = createSelector( + [isSaveDisabled, hasError, getProcessState, isFragment], + (saveDisabled, error, state, fragment) => !fragment && saveDisabled && !error && ProcessStateUtils.canPerformSingleExecution(state), +); export const isMigrationPossible = createSelector( [isSaveDisabled, hasError, getProcessState, isFragment], (saveDisabled, error, state, fragment) => saveDisabled && !error && (fragment || ProcessStateUtils.canDeploy(state)), ); -export const isCancelPossible = createSelector(getProcessState, (state) => ProcessStateUtils.canCancel(state)); export const isArchivePossible = createSelector( [getProcessState, isFragment], (state, isFragment) => isFragment || ProcessStateUtils.canArchive(state), diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DMScenarioCommand.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DMScenarioCommand.scala index 0db44eef5fa..a9a4f1ffc76 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DMScenarioCommand.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DMScenarioCommand.scala @@ -5,13 +5,7 @@ import pl.touk.nussknacker.engine.api.ProcessVersion import pl.touk.nussknacker.engine.api.process.ProcessName import pl.touk.nussknacker.engine.api.test.ScenarioTestData import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess -import pl.touk.nussknacker.engine.deployment.{ - CustomActionResult, - DeploymentData, - DeploymentId, - ExternalDeploymentId, - User -} +import pl.touk.nussknacker.engine.deployment._ import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults // DM Prefix is from Deployment Manager, to distinguish from commands passed into the domain service layer (DeploymentService) @@ -86,3 +80,9 @@ case class DMCancelScenarioCommand(scenarioName: ProcessName, user: User) extend case class DMStopScenarioCommand(scenarioName: ProcessName, savepointDir: Option[String], user: User) extends DMScenarioCommand[SavepointResult] + +case class DMPerformSingleExecutionCommand( + processVersion: ProcessVersion, + canonicalProcess: CanonicalProcess, + user: User, +) extends DMScenarioCommand[SingleExecutionResult] diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala index c56c9b7ea74..32adfa086b5 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala @@ -1,7 +1,7 @@ package pl.touk.nussknacker.engine.api.deployment import pl.touk.nussknacker.engine.api.deployment.inconsistency.InconsistentStateDetector -import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName} +import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId} import pl.touk.nussknacker.engine.deployment.CustomActionDefinition import pl.touk.nussknacker.engine.newdeployment @@ -14,10 +14,14 @@ trait DeploymentManagerInconsistentStateHandlerMixIn { final override def resolve( idWithName: ProcessIdWithName, statusDetails: List[StatusDetails], - lastStateAction: Option[ProcessAction] + lastStateAction: Option[ProcessAction], + latestVersionId: VersionId, + deployedVersionId: Option[VersionId], ): Future[ProcessState] = { val engineStateResolvedWithLastAction = flattenStatus(lastStateAction, statusDetails) - Future.successful(processStateDefinitionManager.processState(engineStateResolvedWithLastAction)) + Future.successful( + processStateDefinitionManager.processState(engineStateResolvedWithLastAction, latestVersionId, deployedVersionId) + ) } // This method is protected to make possible to override it with own logic handling different edge cases like @@ -37,14 +41,23 @@ trait DeploymentManager extends AutoCloseable { def processCommand[Result](command: DMScenarioCommand[Result]): Future[Result] - final def getProcessState(idWithName: ProcessIdWithName, lastStateAction: Option[ProcessAction])( + final def getProcessState( + idWithName: ProcessIdWithName, + lastStateAction: Option[ProcessAction], + latestVersionId: VersionId, + deployedVersionId: Option[VersionId], + )( implicit freshnessPolicy: DataFreshnessPolicy ): Future[WithDataFreshnessStatus[ProcessState]] = { for { statusDetailsWithFreshness <- getProcessStates(idWithName.name) - stateWithFreshness <- resolve(idWithName, statusDetailsWithFreshness.value, lastStateAction).map(state => - statusDetailsWithFreshness.map(_ => state) - ) + stateWithFreshness <- resolve( + idWithName, + statusDetailsWithFreshness.value, + lastStateAction, + latestVersionId, + deployedVersionId + ).map(state => statusDetailsWithFreshness.map(_ => state)) } yield stateWithFreshness } @@ -63,7 +76,9 @@ trait DeploymentManager extends AutoCloseable { def resolve( idWithName: ProcessIdWithName, statusDetails: List[StatusDetails], - lastStateAction: Option[ProcessAction] + lastStateAction: Option[ProcessAction], + latestVersionId: VersionId, + deployedVersionId: Option[VersionId], ): Future[ProcessState] def processStateDefinitionManager: ProcessStateDefinitionManager diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/OverridingProcessStateDefinitionManager.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/OverridingProcessStateDefinitionManager.scala index 741801374e8..fef9b2e4d06 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/OverridingProcessStateDefinitionManager.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/OverridingProcessStateDefinitionManager.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.api.deployment +import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.ProcessStatus import pl.touk.nussknacker.engine.api.deployment.StateStatus.StatusName import java.net.URI @@ -20,15 +21,23 @@ import java.net.URI */ class OverridingProcessStateDefinitionManager( delegate: ProcessStateDefinitionManager, - statusActionsPF: PartialFunction[StateStatus, List[ScenarioActionName]] = PartialFunction.empty, + statusActionsPF: PartialFunction[ProcessStatus, List[ScenarioActionName]] = PartialFunction.empty, statusIconsPF: PartialFunction[StateStatus, URI] = PartialFunction.empty, statusTooltipsPF: PartialFunction[StateStatus, String] = PartialFunction.empty, statusDescriptionsPF: PartialFunction[StateStatus, String] = PartialFunction.empty, - customStateDefinitions: Map[StatusName, StateDefinitionDetails] = Map.empty + customStateDefinitions: Map[StatusName, StateDefinitionDetails] = Map.empty, + customApplicableActions: Option[List[ScenarioActionName]] = None, + customActionTooltips: Option[ProcessStatus => Map[ScenarioActionName, ScenarioActionTooltip]] = None, ) extends ProcessStateDefinitionManager { - override def statusActions(stateStatus: StateStatus): List[ScenarioActionName] = - statusActionsPF.applyOrElse(stateStatus, delegate.statusActions) + override def visibleActions: List[ScenarioActionName] = + customApplicableActions.getOrElse(delegate.visibleActions) + + override def statusActions(processStatus: ProcessStatus): List[ScenarioActionName] = + statusActionsPF.applyOrElse(processStatus, delegate.statusActions) + + override def actionTooltips(processStatus: ProcessStatus): Map[ScenarioActionName, ScenarioActionTooltip] = + customActionTooltips.map(_(processStatus)).getOrElse(delegate.actionTooltips(processStatus)) override def statusIcon(stateStatus: StateStatus): URI = statusIconsPF.orElse(stateDefinitionsPF(_.icon)).applyOrElse(stateStatus, delegate.statusIcon) diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ProcessStateDefinitionManager.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ProcessStateDefinitionManager.scala index 572500c82a7..a3fabddc8b8 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ProcessStateDefinitionManager.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ProcessStateDefinitionManager.scala @@ -1,6 +1,8 @@ package pl.touk.nussknacker.engine.api.deployment +import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.{ProcessStatus, defaultApplicableActions} import pl.touk.nussknacker.engine.api.deployment.StateStatus.StatusName +import pl.touk.nussknacker.engine.api.process.VersionId import java.net.URI @@ -36,20 +38,39 @@ trait ProcessStateDefinitionManager { def statusIcon(stateStatus: StateStatus): URI = stateDefinitions(stateStatus.name).icon + /** + * Actions that are applicable to scenario in general. They may be available only in particular states, as defined by `def statusActions` + */ + def visibleActions: List[ScenarioActionName] = defaultApplicableActions + + /** + * Custom tooltips for actions + */ + def actionTooltips(processStatus: ProcessStatus): Map[ScenarioActionName, ScenarioActionTooltip] = Map.empty + /** * Allowed transitions between states. */ - def statusActions(stateStatus: StateStatus): List[ScenarioActionName] + def statusActions(processStatus: ProcessStatus): List[ScenarioActionName] /** * Enhances raw [[StateStatus]] with scenario properties, including deployment info. */ - def processState(statusDetails: StatusDetails): ProcessState = { + def processState( + statusDetails: StatusDetails, + latestVersionId: VersionId, + deployedVersionId: Option[VersionId] + ): ProcessState = { + val status = ProcessStatus(statusDetails.status, latestVersionId, deployedVersionId) ProcessState( statusDetails.externalDeploymentId, statusDetails.status, statusDetails.version, - statusActions(statusDetails.status), + latestVersionId, + deployedVersionId, + visibleActions, + statusActions(status), + actionTooltips(status), statusIcon(statusDetails.status), statusTooltip(statusDetails.status), statusDescription(statusDetails.status), @@ -60,3 +81,32 @@ trait ProcessStateDefinitionManager { } } + +object ProcessStateDefinitionManager { + + /** + * ProcessStatus contains status of the scenario, it is used as argument of ProcessStateDefinitionManager methods + * + * @param stateStatus current scenario state + * @param latestVersionId latest saved versionId for the scenario + * @param deployedVersionId currently deployed versionId of the scenario + */ + final case class ProcessStatus( + stateStatus: StateStatus, + latestVersionId: VersionId, + deployedVersionId: Option[VersionId] + ) + + /** + * Actions, that are applicable in standard use-cases for most deployment managers. + */ + val defaultApplicableActions: List[ScenarioActionName] = List( + ScenarioActionName.Cancel, + ScenarioActionName.Deploy, + ScenarioActionName.Pause, + ScenarioActionName.Archive, + ScenarioActionName.UnArchive, + ScenarioActionName.Rename, + ) + +} diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManager.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManager.scala index be4135f3cab..a594b114f61 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManager.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManager.scala @@ -4,7 +4,7 @@ import com.github.benmanes.caffeine.cache.{AsyncCache, Caffeine} import com.typesafe.config.Config import com.typesafe.scalalogging.LazyLogging import pl.touk.nussknacker.engine.api.deployment._ -import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName} +import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId} import pl.touk.nussknacker.engine.deployment.CustomActionDefinition import scala.compat.java8.FutureConverters._ @@ -26,9 +26,11 @@ class CachingProcessStateDeploymentManager( override def resolve( idWithName: ProcessIdWithName, statusDetails: List[StatusDetails], - lastStateAction: Option[ProcessAction] + lastStateAction: Option[ProcessAction], + latestVersionId: VersionId, + deployedVersionId: Option[VersionId], ): Future[ProcessState] = - delegate.resolve(idWithName, statusDetails, lastStateAction) + delegate.resolve(idWithName, statusDetails, lastStateAction, latestVersionId, deployedVersionId) override def getProcessStates( name: ProcessName diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/simple/SimpleProcessStateDefinitionManager.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/simple/SimpleProcessStateDefinitionManager.scala index 5f0c2de53f7..808aecad067 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/simple/SimpleProcessStateDefinitionManager.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/simple/SimpleProcessStateDefinitionManager.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.api.deployment.simple +import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.ProcessStatus import pl.touk.nussknacker.engine.api.deployment.ScenarioActionName.DefaultActions import pl.touk.nussknacker.engine.api.deployment.StateStatus.StatusName import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus._ @@ -12,6 +13,7 @@ import pl.touk.nussknacker.engine.api.deployment.{ StateStatus, StatusDetails } +import pl.touk.nussknacker.engine.api.process.VersionId /** * Base [[ProcessStateDefinitionManager]] with basic state definitions and state transitions. @@ -20,8 +22,8 @@ import pl.touk.nussknacker.engine.api.deployment.{ */ object SimpleProcessStateDefinitionManager extends ProcessStateDefinitionManager { - override def statusActions(stateStatus: StateStatus): List[ScenarioActionName] = - statusActionsPF.applyOrElse(stateStatus, (_: StateStatus) => DefaultActions) + override def statusActions(processStatus: ProcessStatus): List[ScenarioActionName] = + statusActionsPF.applyOrElse(processStatus, (_: ProcessStatus) => DefaultActions) override def statusDescription(stateStatus: StateStatus): String = stateStatus match { case _ @ProblemStateStatus(message, _) => message @@ -36,6 +38,7 @@ object SimpleProcessStateDefinitionManager extends ProcessStateDefinitionManager override def stateDefinitions: Map[StatusName, StateDefinitionDetails] = SimpleStateStatus.definitions - val ErrorFailedToGet: ProcessState = processState(StatusDetails(FailedToGet, None)) + def errorFailedToGet(versionId: VersionId): ProcessState = + processState(StatusDetails(FailedToGet, None), versionId, None) } diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/simple/SimpleStateStatus.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/simple/SimpleStateStatus.scala index 1ca7cd99450..f18131e4c8a 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/simple/SimpleStateStatus.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/simple/SimpleStateStatus.scala @@ -1,16 +1,9 @@ package pl.touk.nussknacker.engine.api.deployment.simple +import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.ProcessStatus import pl.touk.nussknacker.engine.api.deployment.StateStatus.StatusName +import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus.defaultActions -import pl.touk.nussknacker.engine.api.deployment.{ - DeploymentStatus, - NoAttributesDeploymentStatus, - NoAttributesStateStatus, - ProblemDeploymentStatus, - ScenarioActionName, - StateDefinitionDetails, - StateStatus -} import pl.touk.nussknacker.engine.api.process.VersionId import java.net.URI @@ -97,7 +90,7 @@ object SimpleStateStatus { status ) - val statusActionsPF: PartialFunction[StateStatus, List[ScenarioActionName]] = { + val statusActionsPF: PartialFunction[ProcessStatus, List[ScenarioActionName]] = _.stateStatus match { case SimpleStateStatus.NotDeployed => List(ScenarioActionName.Deploy, ScenarioActionName.Archive, ScenarioActionName.Rename) case SimpleStateStatus.DuringDeploy => List(ScenarioActionName.Deploy, ScenarioActionName.Cancel) diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/testing/DeploymentManagerStub.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/testing/DeploymentManagerStub.scala index 7a52b24eca2..0857606b57b 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/testing/DeploymentManagerStub.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/testing/DeploymentManagerStub.scala @@ -7,7 +7,7 @@ import pl.touk.nussknacker.engine.api.component.ScenarioPropertyConfig import pl.touk.nussknacker.engine.api.definition._ import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefinitionManager, SimpleStateStatus} -import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName} +import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId} import pl.touk.nussknacker.engine.deployment.CustomActionDefinition import pl.touk.nussknacker.engine.{ BaseModelData, @@ -25,7 +25,9 @@ class DeploymentManagerStub extends BaseDeploymentManager with StubbingCommands override def resolve( idWithName: ProcessIdWithName, statusDetails: List[StatusDetails], - lastStateAction: Option[ProcessAction] + lastStateAction: Option[ProcessAction], + latestVersionId: VersionId, + deployedVersionId: Option[VersionId], ): Future[ProcessState] = { val lastStateActionStatus = lastStateAction match { case Some(action) if action.actionName == ScenarioActionName.Deploy => @@ -35,7 +37,13 @@ class DeploymentManagerStub extends BaseDeploymentManager with StubbingCommands case _ => SimpleStateStatus.NotDeployed } - Future.successful(processStateDefinitionManager.processState(StatusDetails(lastStateActionStatus, None))) + Future.successful( + processStateDefinitionManager.processState( + StatusDetails(lastStateActionStatus, None), + latestVersionId, + deployedVersionId + ) + ) } override def getProcessStates( @@ -59,14 +67,14 @@ class DeploymentManagerStub extends BaseDeploymentManager with StubbingCommands trait StubbingCommands { self: DeploymentManager => override def processCommand[Result](command: DMScenarioCommand[Result]): Future[Result] = command match { - case _: DMValidateScenarioCommand => Future.successful(()) - case _: DMRunDeploymentCommand => Future.successful(None) - case _: DMStopDeploymentCommand => Future.successful(SavepointResult("")) - case _: DMStopScenarioCommand => Future.successful(SavepointResult("")) - case _: DMCancelDeploymentCommand => Future.successful(()) - case _: DMCancelScenarioCommand => Future.successful(()) - case _: DMMakeScenarioSavepointCommand => Future.successful(SavepointResult("")) - case _: DMCustomActionCommand | _: DMTestScenarioCommand => notImplemented + case _: DMValidateScenarioCommand => Future.successful(()) + case _: DMRunDeploymentCommand => Future.successful(None) + case _: DMStopDeploymentCommand => Future.successful(SavepointResult("")) + case _: DMStopScenarioCommand => Future.successful(SavepointResult("")) + case _: DMCancelDeploymentCommand => Future.successful(()) + case _: DMCancelScenarioCommand => Future.successful(()) + case _: DMMakeScenarioSavepointCommand => Future.successful(SavepointResult("")) + case _: DMPerformSingleExecutionCommand | _: DMCustomActionCommand | _: DMTestScenarioCommand => notImplemented } } diff --git a/designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/api/deployment/OverridingProcessStateDefinitionManagerTest.scala b/designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/api/deployment/OverridingProcessStateDefinitionManagerTest.scala index 5550143b693..79f411db975 100644 --- a/designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/api/deployment/OverridingProcessStateDefinitionManagerTest.scala +++ b/designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/api/deployment/OverridingProcessStateDefinitionManagerTest.scala @@ -2,6 +2,7 @@ package pl.touk.nussknacker.engine.api.deployment import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers +import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.ProcessStatus import pl.touk.nussknacker.engine.api.deployment.StateDefinitionDetails.UnknownIcon import pl.touk.nussknacker.engine.api.deployment.StateStatus.StatusName @@ -27,7 +28,7 @@ class OverridingProcessStateDefinitionManagerTest extends AnyFunSuite with Match ) ) - override def statusActions(stateStatus: StateStatus): List[ScenarioActionName] = Nil + override def statusActions(processStatus: ProcessStatus): List[ScenarioActionName] = Nil } test("should combine delegate state definitions with custom overrides") { diff --git a/designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/api/deployment/SimpleProcessStateSpec.scala b/designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/api/deployment/SimpleProcessStateSpec.scala index 089da77d92f..2be15eebb21 100644 --- a/designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/api/deployment/SimpleProcessStateSpec.scala +++ b/designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/api/deployment/SimpleProcessStateSpec.scala @@ -4,6 +4,7 @@ import org.scalatest.Inside import org.scalatest.funspec.AnyFunSpec import org.scalatest.matchers.should.Matchers import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefinitionManager, SimpleStateStatus} +import pl.touk.nussknacker.engine.api.process.VersionId import pl.touk.nussknacker.engine.deployment.ExternalDeploymentId import scala.collection.immutable.List @@ -11,7 +12,11 @@ import scala.collection.immutable.List class SimpleProcessStateSpec extends AnyFunSpec with Matchers with Inside { def createProcessState(stateStatus: StateStatus): ProcessState = - SimpleProcessStateDefinitionManager.processState(StatusDetails(stateStatus, None, Some(ExternalDeploymentId("12")))) + SimpleProcessStateDefinitionManager.processState( + StatusDetails(stateStatus, None, Some(ExternalDeploymentId("12"))), + VersionId(1), + None + ) it("scenario state should be during deploy") { val state = createProcessState(SimpleStateStatus.DuringDeploy) diff --git a/designer/restmodel/src/main/scala/pl/touk/nussknacker/restmodel/PerformSingleExecutionRequest.scala b/designer/restmodel/src/main/scala/pl/touk/nussknacker/restmodel/PerformSingleExecutionRequest.scala new file mode 100644 index 00000000000..1a8659bedcf --- /dev/null +++ b/designer/restmodel/src/main/scala/pl/touk/nussknacker/restmodel/PerformSingleExecutionRequest.scala @@ -0,0 +1,7 @@ +package pl.touk.nussknacker.restmodel + +import io.circe.generic.JsonCodec + +@JsonCodec final case class PerformSingleExecutionRequest( + comment: Option[String] = None, +) diff --git a/designer/restmodel/src/main/scala/pl/touk/nussknacker/restmodel/PerformSingleExecutionResponse.scala b/designer/restmodel/src/main/scala/pl/touk/nussknacker/restmodel/PerformSingleExecutionResponse.scala new file mode 100644 index 00000000000..420e75e6340 --- /dev/null +++ b/designer/restmodel/src/main/scala/pl/touk/nussknacker/restmodel/PerformSingleExecutionResponse.scala @@ -0,0 +1,6 @@ +package pl.touk.nussknacker.restmodel + +import io.circe.generic.JsonCodec + +@JsonCodec +final case class PerformSingleExecutionResponse(isSuccess: Boolean, msg: String) diff --git a/designer/server/src/main/resources/defaultDesignerConfig.conf b/designer/server/src/main/resources/defaultDesignerConfig.conf index 548967a5392..2cd256d61af 100644 --- a/designer/server/src/main/resources/defaultDesignerConfig.conf +++ b/designer/server/src/main/resources/defaultDesignerConfig.conf @@ -133,6 +133,7 @@ processToolbarConfig { { type: "process-deploy", disabled: { fragment: true, archived: true, type: "oneof" } } { type: "process-cancel", disabled: { fragment: true, archived: true, type: "oneof" } } { type: "custom-link", name: "metrics", icon: "/assets/buttons/metrics.svg", url: "/metrics/$processName", disabled: { fragment: true } } + { type: "process-perform-single-execution", disabled: { fragment: true, archived: true, type: "oneof" } } ] } { diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/AppApiHttpService.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/AppApiHttpService.scala index 6fd5089ef48..b9f999a1d74 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/AppApiHttpService.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/AppApiHttpService.scala @@ -172,7 +172,7 @@ class AppApiHttpService( ) statusMap = processes.flatMap(process => process.state.map(process.name -> _)).toMap withProblem = statusMap.collect { - case (name, processStatus @ ProcessState(_, _ @ProblemStateStatus(_, _), _, _, _, _, _, _, _, _)) => + case (name, processStatus @ ProcessState(_, _ @ProblemStateStatus(_, _), _, _, _, _, _, _, _, _, _, _, _, _)) => (name, processStatus) } } yield withProblem diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/ManagementResources.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/ManagementResources.scala index 6b29e1d02e5..b4857bb8597 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/ManagementResources.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/ManagementResources.scala @@ -10,28 +10,25 @@ import io.circe.generic.extras.semiauto.deriveConfiguredEncoder import io.circe.{Decoder, Encoder, Json, parser} import io.dropwizard.metrics5.MetricRegistry import pl.touk.nussknacker.engine.ModelData +import pl.touk.nussknacker.engine.api.Comment import pl.touk.nussknacker.engine.api.component.NodesDeploymentData import pl.touk.nussknacker.engine.api.deployment.DeploymentUpdateStrategy.StateRestoringStrategy import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.graph.ScenarioGraph import pl.touk.nussknacker.engine.testmode.TestProcess._ -import pl.touk.nussknacker.restmodel.{CustomActionRequest, CustomActionResponse} -import pl.touk.nussknacker.ui.api.description.NodesApiEndpoints.Dtos.AdhocTestParametersRequest -import pl.touk.nussknacker.ui.api.utils.ScenarioDetailsOps._ +import pl.touk.nussknacker.restmodel.{ + CustomActionRequest, + CustomActionResponse, + PerformSingleExecutionRequest, + PerformSingleExecutionResponse +} import pl.touk.nussknacker.ui.api.ProcessesResources.ProcessUnmarshallingError +import pl.touk.nussknacker.ui.api.description.NodesApiEndpoints.Dtos.AdhocTestParametersRequest import pl.touk.nussknacker.ui.metrics.TimeMeasuring.measureTime import pl.touk.nussknacker.ui.process.ProcessService import pl.touk.nussknacker.ui.process.deployment.LoggedUserConversions.LoggedUserOps -import pl.touk.nussknacker.ui.process.deployment.{ - CancelScenarioCommand, - CommonCommandData, - CustomActionCommand, - DeploymentManagerDispatcher, - DeploymentService, - RunDeploymentCommand -} +import pl.touk.nussknacker.ui.process.deployment._ import pl.touk.nussknacker.ui.process.processingtype.provider.ProcessingTypeDataProvider -import pl.touk.nussknacker.engine.api.Comment import pl.touk.nussknacker.ui.process.test.{RawScenarioTestData, ResultsWithCounts, ScenarioTestService} import pl.touk.nussknacker.ui.security.api.LoggedUser @@ -278,6 +275,24 @@ class ManagementResources( ) } } + } ~ path("performSingleExecution" / ProcessNameSegment) { processName => + (post & processId(processName) & entity(as[PerformSingleExecutionRequest])) { (processIdWithName, req) => + canDeploy(processIdWithName) { + complete { + measureTime("singleExecution", metricRegistry) { + deploymentService + .processCommand( + PerformSingleExecutionCommand( + commonData = CommonCommandData(processIdWithName, req.comment.flatMap(Comment.from), user), + ) + ) + .flatMap(actionResult => + toHttpResponse(PerformSingleExecutionResponse(isSuccess = true, actionResult.msg))(StatusCodes.OK) + ) + } + } + } + } } } diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/config/scenariotoolbar/ToolbarButtonConfig.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/config/scenariotoolbar/ToolbarButtonConfig.scala index 3fb8e94c823..c8c3ac0899c 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/config/scenariotoolbar/ToolbarButtonConfig.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/config/scenariotoolbar/ToolbarButtonConfig.scala @@ -43,9 +43,10 @@ object ToolbarButtonConfigType extends Enumeration { CustomLink ) - val ProcessSave: Value = Value("process-save") - val ProcessCancel: Value = Value("process-cancel") - val ProcessDeploy: Value = Value("process-deploy") + val ProcessSave: Value = Value("process-save") + val ProcessCancel: Value = Value("process-cancel") + val ProcessDeploy: Value = Value("process-deploy") + val ProcessPerformSingleExecution: Value = Value("process-perform-single-execution") val EditUndo: Value = Value("edit-undo") val EditRedo: Value = Value("edit-redo") diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentService.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentService.scala index 8a6c8a78fc4..fee8ccbce62 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentService.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentService.scala @@ -75,9 +75,10 @@ class DeploymentService( def processCommand[Result](command: ScenarioCommand[Result]): Future[Result] = { command match { - case command: RunDeploymentCommand => runDeployment(command) - case command: CancelScenarioCommand => cancelScenario(command) - case command: CustomActionCommand => processCustomAction(command) + case command: RunDeploymentCommand => runDeployment(command) + case command: CancelScenarioCommand => cancelScenario(command) + case command: PerformSingleExecutionCommand => processSingleExecution(command) + case command: CustomActionCommand => processCustomAction(command) } } @@ -311,7 +312,8 @@ class DeploymentService( )(implicit user: LoggedUser): List[CustomActionDefinition] = { val fixedActionDefinitions = List( CustomActionDefinition(ScenarioActionName.Deploy, Nil, Nil, None), - CustomActionDefinition(ScenarioActionName.Cancel, Nil, Nil, None) + CustomActionDefinition(ScenarioActionName.Cancel, Nil, Nil, None), + CustomActionDefinition(ScenarioActionName.PerformSingleExecution, Nil, Nil, None) ) val actionsDefinedInCustomActions = dispatcher .deploymentManagerUnsafe(processingType) @@ -489,6 +491,8 @@ class DeploymentService( processDetails: ScenarioWithDetailsEntity[_], inProgressActionNames: Set[ScenarioActionName] )(implicit freshnessPolicy: DataFreshnessPolicy, user: LoggedUser): DB[ProcessState] = { + val processVersionId = processDetails.processVersionId + val deployedVersionId = processDetails.lastDeployedAction.map(_.processVersionId) dispatcher .deploymentManager(processDetails.processingType) .map { manager => @@ -499,18 +503,34 @@ class DeploymentService( } else if (inProgressActionNames.contains(ScenarioActionName.Deploy)) { logger.debug(s"Status for: '${processDetails.name}' is: ${SimpleStateStatus.DuringDeploy}") DBIOAction.successful( - manager.processStateDefinitionManager.processState(StatusDetails(SimpleStateStatus.DuringDeploy, None)) + manager.processStateDefinitionManager.processState( + StatusDetails(SimpleStateStatus.DuringDeploy, None), + processVersionId, + deployedVersionId, + ) ) } else if (inProgressActionNames.contains(ScenarioActionName.Cancel)) { logger.debug(s"Status for: '${processDetails.name}' is: ${SimpleStateStatus.DuringCancel}") DBIOAction.successful( - manager.processStateDefinitionManager.processState(StatusDetails(SimpleStateStatus.DuringCancel, None)) + manager.processStateDefinitionManager.processState( + StatusDetails(SimpleStateStatus.DuringCancel, None), + processVersionId, + deployedVersionId, + ) ) } else { processDetails.lastStateAction match { case Some(_) => DBIOAction - .from(getStateFromDeploymentManager(manager, processDetails.idWithName, processDetails.lastStateAction)) + .from( + getStateFromDeploymentManager( + manager, + processDetails.idWithName, + processDetails.lastStateAction, + processVersionId, + deployedVersionId, + ) + ) .map { statusWithFreshness => logger.debug( s"Status for: '${processDetails.name}' is: ${statusWithFreshness.value.status}, cached: ${statusWithFreshness.cached}, last status action: ${processDetails.lastStateAction @@ -521,40 +541,62 @@ class DeploymentService( case _ => // We assume that the process never deployed should have no state at the engine logger.debug(s"Status for never deployed: '${processDetails.name}' is: ${SimpleStateStatus.NotDeployed}") DBIOAction.successful( - manager.processStateDefinitionManager.processState(StatusDetails(SimpleStateStatus.NotDeployed, None)) + manager.processStateDefinitionManager.processState( + StatusDetails(SimpleStateStatus.NotDeployed, None), + processVersionId, + deployedVersionId, + ) ) } } } - .getOrElse(DBIOAction.successful(SimpleProcessStateDefinitionManager.ErrorFailedToGet)) + .getOrElse( + DBIOAction.successful(SimpleProcessStateDefinitionManager.errorFailedToGet(processVersionId)) + ) } // We assume that checking the state for archived doesn't make sense, and we compute the state based on the last state action private def getArchivedProcessState( processDetails: ScenarioWithDetailsEntity[_] )(implicit manager: DeploymentManager) = { + val processVersionId = processDetails.processVersionId + val deployedVersionId = processDetails.lastDeployedAction.map(_.processVersionId) processDetails.lastStateAction.map(a => (a.actionName, a.state)) match { case Some((Cancel, _)) => logger.debug(s"Status for: '${processDetails.name}' is: ${SimpleStateStatus.Canceled}") DBIOAction.successful( - manager.processStateDefinitionManager.processState(StatusDetails(SimpleStateStatus.Canceled, None)) + manager.processStateDefinitionManager.processState( + StatusDetails(SimpleStateStatus.Canceled, None), + processVersionId, + deployedVersionId, + ) ) case Some((Deploy, ProcessActionState.ExecutionFinished)) => logger.debug(s"Status for: '${processDetails.name}' is: ${SimpleStateStatus.Finished} ") DBIOAction.successful( - manager.processStateDefinitionManager.processState(StatusDetails(SimpleStateStatus.Finished, None)) + manager.processStateDefinitionManager.processState( + StatusDetails(SimpleStateStatus.Finished, None), + processVersionId, + deployedVersionId, + ) ) case Some(_) => logger.warn(s"Status for: '${processDetails.name}' is: ${ProblemStateStatus.ArchivedShouldBeCanceled}") DBIOAction.successful( manager.processStateDefinitionManager.processState( - StatusDetails(ProblemStateStatus.ArchivedShouldBeCanceled, None) + StatusDetails(ProblemStateStatus.ArchivedShouldBeCanceled, None), + processVersionId, + deployedVersionId, ) ) case _ => logger.debug(s"Status for: '${processDetails.name}' is: ${SimpleStateStatus.NotDeployed}") DBIOAction.successful( - manager.processStateDefinitionManager.processState(StatusDetails(SimpleStateStatus.NotDeployed, None)) + manager.processStateDefinitionManager.processState( + StatusDetails(SimpleStateStatus.NotDeployed, None), + processVersionId, + deployedVersionId, + ) ) } } @@ -569,19 +611,23 @@ class DeploymentService( private def getStateFromDeploymentManager( deploymentManager: DeploymentManager, processIdWithName: ProcessIdWithName, - lastStateAction: Option[ProcessAction] + lastStateAction: Option[ProcessAction], + latestVersionId: VersionId, + deployedVersionId: Option[VersionId], )( implicit freshnessPolicy: DataFreshnessPolicy ): Future[WithDataFreshnessStatus[ProcessState]] = { - val state = deploymentManager.getProcessState(processIdWithName, lastStateAction).recover { case NonFatal(e) => - logger.warn(s"Failed to get status of ${processIdWithName.name}: ${e.getMessage}", e) - failedToGetProcessState - } + val state = deploymentManager + .getProcessState(processIdWithName, lastStateAction, latestVersionId, deployedVersionId) + .recover { case NonFatal(e) => + logger.warn(s"Failed to get status of ${processIdWithName.name}: ${e.getMessage}", e) + failedToGetProcessState(latestVersionId) + } scenarioStateTimeout .map { timeout => - state.withTimeout(timeout, timeoutResult = failedToGetProcessState).map { + state.withTimeout(timeout, timeoutResult = failedToGetProcessState(latestVersionId)).map { case CompletedNormally(value) => value case CompletedByTimeout(value) => @@ -653,8 +699,8 @@ class DeploymentService( } } - private lazy val failedToGetProcessState = - WithDataFreshnessStatus.fresh(SimpleProcessStateDefinitionManager.ErrorFailedToGet) + private def failedToGetProcessState(versionId: VersionId) = + WithDataFreshnessStatus.fresh(SimpleProcessStateDefinitionManager.errorFailedToGet(versionId)) // It is very naive implementation for situation when designer was restarted after spawning some long running action // like deploy but before marking it as finished. Without this, user will always see "during deploy" status - even @@ -666,13 +712,46 @@ class DeploymentService( Await.result(dbioRunner.run(actionRepository.deleteInProgressActions()), 10 seconds) } + private def processSingleExecution(command: PerformSingleExecutionCommand): Future[SingleExecutionResult] = { + processAction( + command = command, + actionName = ScenarioActionName.PerformSingleExecution, + actionParams = Map.empty, + dmCommandCreator = ctx => + DMPerformSingleExecutionCommand( + ctx.latestScenarioDetails.toEngineProcessVersion, + ctx.latestScenarioDetails.json, + command.commonData.user.toManagerUser, + ) + ) + } + + private def processCustomAction(command: CustomActionCommand): Future[CustomActionResult] = { + processAction( + command = command, + actionName = command.actionName, + actionParams = command.params, + dmCommandCreator = ctx => + DMCustomActionCommand( + command.actionName, + ctx.latestScenarioDetails.toEngineProcessVersion, + ctx.latestScenarioDetails.json, + command.commonData.user.toManagerUser, + command.params + ) + ) + } + // TODO: further changes // - block two concurrent custom actions - see ManagementResourcesConcurrentSpec // - better comment validation - private def processCustomAction(command: CustomActionCommand): Future[CustomActionResult] = { + private def processAction[COMMAND <: ScenarioCommand[RESULT], RESULT]( + command: COMMAND, + actionName: ScenarioActionName, + actionParams: Map[String, String], + dmCommandCreator: CommandContext[CanonicalProcess] => DMScenarioCommand[RESULT], + ): Future[RESULT] = { import command.commonData._ - val actionName: ScenarioActionName = command.actionName - val actionParams: Map[String, String] = command.params for { validatedComment <- validateDeploymentComment(comment) ctx <- prepareCommandContextWithAction[CanonicalProcess]( @@ -682,13 +761,7 @@ class DeploymentService( p => Some(p.processVersionId), _ => None ) - dmCommand = DMCustomActionCommand( - actionName, - ctx.latestScenarioDetails.toEngineProcessVersion, - ctx.latestScenarioDetails.json, - user.toManagerUser, - actionParams - ) + dmCommand = dmCommandCreator(ctx) actionResult <- runActionAndHandleResults( actionName, validatedComment, diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/ScenarioCommand.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/ScenarioCommand.scala index 52a0eff7095..7d0050d7bc4 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/ScenarioCommand.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/ScenarioCommand.scala @@ -1,11 +1,11 @@ package pl.touk.nussknacker.ui.process.deployment +import pl.touk.nussknacker.engine.api.Comment import pl.touk.nussknacker.engine.api.component.NodesDeploymentData import pl.touk.nussknacker.engine.api.deployment.DeploymentUpdateStrategy.StateRestoringStrategy -import pl.touk.nussknacker.engine.api.deployment.{DMScenarioCommand, ScenarioActionName} +import pl.touk.nussknacker.engine.api.deployment.ScenarioActionName import pl.touk.nussknacker.engine.api.process.ProcessIdWithName -import pl.touk.nussknacker.engine.deployment.{CustomActionResult, ExternalDeploymentId} -import pl.touk.nussknacker.engine.api.Comment +import pl.touk.nussknacker.engine.deployment.{CustomActionResult, ExternalDeploymentId, SingleExecutionResult} import pl.touk.nussknacker.ui.security.api.LoggedUser import scala.concurrent.Future @@ -34,6 +34,10 @@ case class CustomActionCommand( params: Map[String, String], ) extends ScenarioCommand[CustomActionResult] +case class PerformSingleExecutionCommand( + commonData: CommonCommandData, +) extends ScenarioCommand[SingleExecutionResult] + // TODO CancelScenarioCommand will be legacy in some future because it operates on the scenario level instead of deployment level - // we should replace it by command operating on deployment case class CancelScenarioCommand(commonData: CommonCommandData) extends ScenarioCommand[Unit] diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/processingtype/InvalidDeploymentManagerStub.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/processingtype/InvalidDeploymentManagerStub.scala index 501dd6ba3ca..a62ad8b1bdc 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/processingtype/InvalidDeploymentManagerStub.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/processingtype/InvalidDeploymentManagerStub.scala @@ -3,7 +3,7 @@ package pl.touk.nussknacker.ui.process.processingtype import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleProcessStateDefinitionManager import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus -import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName} +import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId} import pl.touk.nussknacker.engine.deployment.CustomActionDefinition import pl.touk.nussknacker.ui.process.exception.ProcessIllegalAction @@ -28,9 +28,11 @@ object InvalidDeploymentManagerStub extends DeploymentManager { override def resolve( idWithName: ProcessIdWithName, statusDetails: List[StatusDetails], - lastStateAction: Option[ProcessAction] + lastStateAction: Option[ProcessAction], + latestVersionId: VersionId, + deployedVersionId: Option[VersionId], ): Future[ProcessState] = { - Future.successful(processStateDefinitionManager.processState(stubbedStatus)) + Future.successful(processStateDefinitionManager.processState(stubbedStatus, latestVersionId, deployedVersionId)) } override def processStateDefinitionManager: ProcessStateDefinitionManager = SimpleProcessStateDefinitionManager diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/ScenarioActionRepository.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/ScenarioActionRepository.scala index 287f44bceca..e4afa1efc28 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/ScenarioActionRepository.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/ScenarioActionRepository.scala @@ -7,7 +7,6 @@ import pl.touk.nussknacker.engine.api.Comment import pl.touk.nussknacker.engine.api.deployment.ProcessActionState.ProcessActionState import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName, ProcessingType, VersionId} -import pl.touk.nussknacker.engine.management.periodic.InstantBatchCustomAction import pl.touk.nussknacker.engine.util.Implicits.RichScalaMap import pl.touk.nussknacker.ui.app.BuildInfo import pl.touk.nussknacker.ui.db.entity.{ @@ -291,7 +290,7 @@ class DbScenarioActionRepository private ( ScenarioActivityType.ScenarioPaused case ScenarioActionName.Rename => ScenarioActivityType.ScenarioNameChanged - case InstantBatchCustomAction.name => + case ScenarioActionName.PerformSingleExecution => ScenarioActivityType.PerformedSingleExecution case otherCustomName => ScenarioActivityType.CustomAction(otherCustomName.value) @@ -519,7 +518,7 @@ class DbScenarioActionRepository private ( case ScenarioActivityType.OutgoingMigration => None case ScenarioActivityType.PerformedSingleExecution => - Some(InstantBatchCustomAction.name) + Some(ScenarioActionName.PerformSingleExecution) case ScenarioActivityType.PerformedScheduledExecution => None case ScenarioActivityType.AutomaticUpdate => diff --git a/designer/server/src/test/scala/db/migration/V1_057__MigrateActionsAndCommentsToScenarioActivities.scala b/designer/server/src/test/scala/db/migration/V1_057__MigrateActionsAndCommentsToScenarioActivities.scala index bc9c38f453f..4db2f05e841 100644 --- a/designer/server/src/test/scala/db/migration/V1_057__MigrateActionsAndCommentsToScenarioActivities.scala +++ b/designer/server/src/test/scala/db/migration/V1_057__MigrateActionsAndCommentsToScenarioActivities.scala @@ -13,7 +13,6 @@ import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName, VersionId} import pl.touk.nussknacker.engine.api.{MetaData, ProcessAdditionalFields, RequestResponseMetaData} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess -import pl.touk.nussknacker.engine.management.periodic.InstantBatchCustomAction import pl.touk.nussknacker.restmodel.component.ScenarioComponentsUsages import pl.touk.nussknacker.test.base.db.WithHsqlDbTesting import pl.touk.nussknacker.test.base.it.NuItTest @@ -230,7 +229,7 @@ class V1_057__MigrateActionsAndCommentsToScenarioActivities } "migrate custom action 'run now' with comment to scenario_activities table" in { testMigratingActionWithComment( - scenarioActionName = InstantBatchCustomAction.name, + scenarioActionName = ScenarioActionName.PerformSingleExecution, actionComment = Some("Run now: Deployed at the request of business"), expectedActivity = (sid, sad, user, date, sv) => ScenarioActivity.PerformedSingleExecution( diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/notifications/NotificationServiceTest.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/notifications/NotificationServiceTest.scala index 0c01b8408aa..9e85e9f8acd 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/notifications/NotificationServiceTest.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/notifications/NotificationServiceTest.scala @@ -11,7 +11,7 @@ import pl.touk.nussknacker.engine.api.component.NodesDeploymentData import pl.touk.nussknacker.engine.api.deployment.DeploymentUpdateStrategy.StateRestoringStrategy import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefinitionManager, SimpleStateStatus} -import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName} +import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId} import pl.touk.nussknacker.engine.build.ScenarioBuilder import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.{ @@ -26,13 +26,7 @@ import pl.touk.nussknacker.test.utils.scalas.DBIOActionValues import pl.touk.nussknacker.test.{EitherValuesDetailedMessage, PatientScalaFutures} import pl.touk.nussknacker.ui.listener.ProcessChangeListener import pl.touk.nussknacker.ui.process.deployment.LoggedUserConversions._ -import pl.touk.nussknacker.ui.process.deployment.{ - CommonCommandData, - DeploymentManagerDispatcher, - DeploymentService, - RunDeploymentCommand, - ScenarioResolver -} +import pl.touk.nussknacker.ui.process.deployment._ import pl.touk.nussknacker.ui.process.processingtype.provider.ProcessingTypeDataProvider import pl.touk.nussknacker.ui.process.repository.ProcessRepository.CreateProcessAction import pl.touk.nussknacker.ui.process.repository.{ @@ -178,11 +172,20 @@ class NotificationServiceTest } private val notDeployed = - SimpleProcessStateDefinitionManager.processState(StatusDetails(SimpleStateStatus.NotDeployed, None)) + SimpleProcessStateDefinitionManager.processState( + StatusDetails(SimpleStateStatus.NotDeployed, None), + VersionId(1), + None + ) private def createServices(deploymentManager: DeploymentManager) = { when( - deploymentManager.getProcessState(any[ProcessIdWithName], any[Option[ProcessAction]])(any[DataFreshnessPolicy]) + deploymentManager.getProcessState( + any[ProcessIdWithName], + any[Option[ProcessAction]], + any[VersionId], + any[Option[VersionId]] + )(any[DataFreshnessPolicy]) ) .thenReturn(Future.successful(WithDataFreshnessStatus.fresh(notDeployed))) val managerDispatcher = mock[DeploymentManagerDispatcher] diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/ProcessStateDefinitionServiceSpec.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/ProcessStateDefinitionServiceSpec.scala index 82b6fbe527a..ab0d8f6a72e 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/ProcessStateDefinitionServiceSpec.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/ProcessStateDefinitionServiceSpec.scala @@ -4,12 +4,11 @@ import com.typesafe.config.ConfigFactory import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers import pl.touk.nussknacker.engine.api.component.ComponentDefinition -import pl.touk.nussknacker.engine.api.deployment.ScenarioActionName +import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.ProcessStatus import pl.touk.nussknacker.engine.api.deployment.StateDefinitionDetails.UnknownIcon import pl.touk.nussknacker.engine.api.deployment.StateStatus.StatusName import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.process.{ProcessingType, Source, SourceFactory} -import pl.touk.nussknacker.engine.definition.component.Components.ComponentDefinitionExtractionMode import pl.touk.nussknacker.engine.deployment.EngineSetupName import pl.touk.nussknacker.engine.testing.LocalModelData import pl.touk.nussknacker.security.Permission @@ -158,8 +157,8 @@ class ProcessStateDefinitionServiceSpec extends AnyFunSuite with Matchers { } private val emptyStateDefinitionManager = new ProcessStateDefinitionManager { - override def stateDefinitions: Map[StatusName, StateDefinitionDetails] = Map.empty - override def statusActions(stateStatus: StateStatus): List[ScenarioActionName] = Nil + override def stateDefinitions: Map[StatusName, StateDefinitionDetails] = Map.empty + override def statusActions(processStatus: ProcessStatus): List[ScenarioActionName] = Nil } private def createProcessingTypeDataMap( diff --git a/docs/Changelog.md b/docs/Changelog.md index 3cfb9f12bf7..94fb74a9672 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -15,6 +15,9 @@ * [#7123](https://github.com/TouK/nussknacker/pull/7123) Fix deployments for scenarios with dict editors after model reload * [#7162](https://github.com/TouK/nussknacker/pull/7162) Component API enhancement: ability to access information about expression parts used in SpEL template +* [#7165](https://github.com/TouK/nussknacker/pull/7165) Added PerformSingleExecution scenario action + * Added support for PerformSingleExecution action in DeploymentManager and in GUI + * Improved scenario state management to include information about current and deployed versions and allow more customization ## 1.18 diff --git a/docs/MigrationGuide.md b/docs/MigrationGuide.md index 8c04b9c82d2..2236bf23588 100644 --- a/docs/MigrationGuide.md +++ b/docs/MigrationGuide.md @@ -9,6 +9,18 @@ To see the biggest differences please consult the [changelog](Changelog.md). * [#7116](https://github.com/TouK/nussknacker/pull/7116) Improve missing Flink Kafka Source / Sink TypeInformation * We lost support for old ConsumerRecord constructor supported by Flink 1.14 / 1.15 * If you used Kafka source/sink components in your scenarios then state of these scenarios won't be restored +* [#7165](https://github.com/TouK/nussknacker/pull/7165) + * `pl.touk.nussknacker.engine.api.deployment.DeploymentManager`: + * new command `DMPerformSingleExecutionCommand`, which must be handled in `DeploymentManager.processCommand` method + (handle it the same as `DMCustomActionCommand` with actionName=`run now`) + * added new arguments to `def resolve` and `getProcessState` methods (`latestVersionId: VersionId`, `deployedVersionId: Option[VersionId]`), which will be provided by Nu when invoking this method + * `pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager`: + * added new arguments to `def processState` method (`latestVersionId: VersionId`, `deployedVersionId: Option[VersionId]`) + * added new methods with default implementations: + * `def visibleActions: List[ScenarioActionName]` - allows to specify, which actions are applicable to scenario (and consequently should be visible in Designer), by default all previously available actions + * `def actionTooltips(processStatus: ProcessStatus): Map[ScenarioActionName, ScenarioActionTooltip]` - allows to define custom tooltips for actions (from predefined list of tooltips handled by Designer), if not defined the default is still used + * modified method: + * `def statusActions(processStatus: ProcessStatus): List[ScenarioActionName]` - changed argument, to include information about latest and deployed versions ## In version 1.18.0 diff --git a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala index 967fe9c3c01..c6603dceec5 100644 --- a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala +++ b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala @@ -101,9 +101,10 @@ class DevelopmentDeploymentManager(actorSystem: ActorSystem, modelData: BaseMode case DMCancelDeploymentCommand(name, _, user) => // TODO: cancelling specific deployment cancelScenario(DMCancelScenarioCommand(name, user)) - case command: DMCancelScenarioCommand => cancelScenario(command) - case command: DMCustomActionCommand => invokeCustomAction(command) - case _: DMMakeScenarioSavepointCommand => Future.successful(SavepointResult("")) + case command: DMCancelScenarioCommand => cancelScenario(command) + case command: DMCustomActionCommand => invokeCustomAction(command) + case command: DMPerformSingleExecutionCommand => performSingleExecution(command) + case _: DMMakeScenarioSavepointCommand => Future.successful(SavepointResult("")) case DMTestScenarioCommand(_, canonicalProcess, scenarioTestData) => flinkTestRunner.test(canonicalProcess, scenarioTestData) // it's just for streaming e2e tests from file purposes } @@ -182,6 +183,10 @@ class DevelopmentDeploymentManager(actorSystem: ActorSystem, modelData: BaseMode } } + private def performSingleExecution(command: DMPerformSingleExecutionCommand): Future[SingleExecutionResult] = { + notImplemented + } + override def close(): Unit = {} private def changeState(name: ProcessName, stateStatus: StateStatus): Unit = diff --git a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentProcessStateDefinitionManager.scala b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentProcessStateDefinitionManager.scala index 2ff668319c0..bd0ca90b5b0 100644 --- a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentProcessStateDefinitionManager.scala +++ b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentProcessStateDefinitionManager.scala @@ -1,14 +1,9 @@ package pl.touk.nussknacker.development.manager +import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.ProcessStatus import pl.touk.nussknacker.engine.api.deployment.StateDefinitionDetails.UnknownIcon import pl.touk.nussknacker.engine.api.deployment.StateStatus.StatusName -import pl.touk.nussknacker.engine.api.deployment.{ - OverridingProcessStateDefinitionManager, - ProcessStateDefinitionManager, - ScenarioActionName, - StateDefinitionDetails, - StateStatus -} +import pl.touk.nussknacker.engine.api.deployment._ class DevelopmentProcessStateDefinitionManager(delegate: ProcessStateDefinitionManager) extends OverridingProcessStateDefinitionManager( @@ -27,7 +22,7 @@ object DevelopmentStateStatus { val PreparingResourcesActionName: ScenarioActionName = ScenarioActionName("PREPARING") val TestActionName: ScenarioActionName = ScenarioActionName("TEST") - val statusActionsPF: PartialFunction[StateStatus, List[ScenarioActionName]] = { + val statusActionsPF: PartialFunction[ProcessStatus, List[ScenarioActionName]] = _.stateStatus match { case DevelopmentStateStatus.AfterRunningStatus => List(ScenarioActionName.Cancel) case DevelopmentStateStatus.PreparingResourcesStatus => List(ScenarioActionName.Deploy) case DevelopmentStateStatus.TestStatus => List(ScenarioActionName.Deploy) diff --git a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala index dc2233e024c..ccae3a0a742 100644 --- a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala +++ b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala @@ -11,7 +11,7 @@ import pl.touk.nussknacker.engine.api.component.ScenarioPropertyConfig import pl.touk.nussknacker.engine.api.definition.{NotBlankParameterValidator, StringParameterEditor} import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefinitionManager, SimpleStateStatus} -import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName} +import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId} import pl.touk.nussknacker.engine.deployment.{CustomActionDefinition, CustomActionParameter, ExternalDeploymentId} import pl.touk.nussknacker.engine.management.{FlinkProcessTestRunner, FlinkStreamingPropertiesConfig} import pl.touk.nussknacker.engine.newdeployment.DeploymentId @@ -63,9 +63,13 @@ object MockableDeploymentManagerProvider { override def resolve( idWithName: ProcessIdWithName, statusDetails: List[StatusDetails], - lastStateAction: Option[ProcessAction] + lastStateAction: Option[ProcessAction], + latestVersionId: VersionId, + deployedVersionId: Option[VersionId], ): Future[ProcessState] = { - Future.successful(processStateDefinitionManager.processState(statusDetails.head)) + Future.successful( + processStateDefinitionManager.processState(statusDetails.head, latestVersionId, deployedVersionId) + ) } override def processStateDefinitionManager: ProcessStateDefinitionManager = diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/SpelTemplateLazyParameterTest.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/SpelTemplateLazyParameterTest.scala index c2f3d218673..0ece0cb1173 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/SpelTemplateLazyParameterTest.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/SpelTemplateLazyParameterTest.scala @@ -11,12 +11,25 @@ import pl.touk.nussknacker.engine.api.TemplateRenderedPart.{RenderedLiteral, Ren import pl.touk.nussknacker.engine.api._ import pl.touk.nussknacker.engine.api.component.{BoundedStreamComponent, ComponentDefinition} import pl.touk.nussknacker.engine.api.context.ValidationContext -import pl.touk.nussknacker.engine.api.context.transformation.{DefinedLazyParameter, NodeDependencyValue, SingleInputDynamicComponent} -import pl.touk.nussknacker.engine.api.definition.{NodeDependency, OutputVariableNameDependency, Parameter, SpelTemplateParameterEditor} +import pl.touk.nussknacker.engine.api.context.transformation.{ + DefinedLazyParameter, + NodeDependencyValue, + SingleInputDynamicComponent +} +import pl.touk.nussknacker.engine.api.definition.{ + NodeDependency, + OutputVariableNameDependency, + Parameter, + SpelTemplateParameterEditor +} import pl.touk.nussknacker.engine.api.parameter.ParameterName import pl.touk.nussknacker.engine.api.typed.typing.Typed import pl.touk.nussknacker.engine.build.ScenarioBuilder -import pl.touk.nussknacker.engine.flink.api.process.{AbstractOneParamLazyParameterFunction, FlinkCustomNodeContext, FlinkCustomStreamTransformation} +import pl.touk.nussknacker.engine.flink.api.process.{ + AbstractOneParamLazyParameterFunction, + FlinkCustomNodeContext, + FlinkCustomStreamTransformation +} import pl.touk.nussknacker.engine.flink.test.FlinkSpec import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._ import pl.touk.nussknacker.engine.graph.expression.Expression @@ -25,7 +38,11 @@ import pl.touk.nussknacker.engine.spel.SpelExtension._ import pl.touk.nussknacker.engine.util.test.TestScenarioRunner import pl.touk.nussknacker.test.ValidatedValuesDetailedMessage -class SpelTemplateLazyParameterTest extends AnyFunSuite with FlinkSpec with Matchers with ValidatedValuesDetailedMessage { +class SpelTemplateLazyParameterTest + extends AnyFunSuite + with FlinkSpec + with Matchers + with ValidatedValuesDetailedMessage { private lazy val runner = TestScenarioRunner .flinkBased(ConfigFactory.empty(), flinkMiniCluster) @@ -94,24 +111,26 @@ object SpelTemplatePartsCustomTransformer params.extractUnsafe[LazyParameter[TemplateEvaluationResult]](spelTemplateParameterName) FlinkCustomStreamTransformation { (dataStream: DataStream[Context], flinkCustomNodeContext: FlinkCustomNodeContext) => - dataStream.flatMap( - new AbstractOneParamLazyParameterFunction[TemplateEvaluationResult]( - templateLazyParam, - flinkCustomNodeContext.lazyParameterHelper - ) with FlatMapFunction[Context, ValueWithContext[String]] { - override def flatMap(value: Context, out: Collector[ValueWithContext[String]]): Unit = { - collectHandlingErrors(value, out) { - val templateResult = evaluateParameter(value) - val result = templateResult.renderedParts.map { - case RenderedLiteral(value) => s"[$value]-literal" - case RenderedSubExpression(value) => s"[$value]-subexpression" - }.mkString - ValueWithContext(result, value) + dataStream + .flatMap( + new AbstractOneParamLazyParameterFunction[TemplateEvaluationResult]( + templateLazyParam, + flinkCustomNodeContext.lazyParameterHelper + ) with FlatMapFunction[Context, ValueWithContext[String]] { + override def flatMap(value: Context, out: Collector[ValueWithContext[String]]): Unit = { + collectHandlingErrors(value, out) { + val templateResult = evaluateParameter(value) + val result = templateResult.renderedParts.map { + case RenderedLiteral(value) => s"[$value]-literal" + case RenderedSubExpression(value) => s"[$value]-subexpression" + }.mkString + ValueWithContext(result, value) + } } - } - }, - flinkCustomNodeContext.valueWithContextInfo.forClass[String] - ).asInstanceOf[DataStream[ValueWithContext[AnyRef]]] + }, + flinkCustomNodeContext.valueWithContextInfo.forClass[String] + ) + .asInstanceOf[DataStream[ValueWithContext[AnyRef]]] } } diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/FlinkPeriodicDeploymentManagerProvider.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/FlinkPeriodicDeploymentManagerProvider.scala index 849ce88392b..bd64c7e8bd8 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/FlinkPeriodicDeploymentManagerProvider.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/FlinkPeriodicDeploymentManagerProvider.scala @@ -58,7 +58,6 @@ class FlinkPeriodicDeploymentManagerProvider extends DeploymentManagerProvider w modelData = modelData, EmptyPeriodicProcessListenerFactory, DefaultAdditionalDeploymentDataProvider, - new WithRunNowPeriodicCustomActionsProviderFactory, dependencies ) } diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicCustomActionsProviderFactory.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicCustomActionsProviderFactory.scala deleted file mode 100644 index 648d8cb29b3..00000000000 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicCustomActionsProviderFactory.scala +++ /dev/null @@ -1,40 +0,0 @@ -package pl.touk.nussknacker.engine.management.periodic - -import pl.touk.nussknacker.engine.api.deployment.DMCustomActionCommand -import pl.touk.nussknacker.engine.deployment.{CustomActionDefinition, CustomActionResult} -import pl.touk.nussknacker.engine.management.periodic.db.PeriodicProcessesRepository - -import scala.concurrent.Future - -trait PeriodicCustomActionsProviderFactory { - - def create( - periodicProcessesRepository: PeriodicProcessesRepository, - periodicProcessService: PeriodicProcessService - ): PeriodicCustomActionsProvider - -} - -object PeriodicCustomActionsProviderFactory { - def noOp: PeriodicCustomActionsProviderFactory = (_: PeriodicProcessesRepository, _: PeriodicProcessService) => - EmptyPeriodicCustomActionsProvider -} - -trait PeriodicCustomActionsProvider { - def customActions: List[CustomActionDefinition] - - def invokeCustomAction( - actionRequest: DMCustomActionCommand - ): Future[CustomActionResult] - -} - -object EmptyPeriodicCustomActionsProvider extends PeriodicCustomActionsProvider { - override def customActions: List[CustomActionDefinition] = Nil - - override def invokeCustomAction( - actionRequest: DMCustomActionCommand - ): Future[CustomActionResult] = - Future.failed(new NotImplementedError()) - -} diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManager.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManager.scala index dae199ecf46..e7eedd55b13 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManager.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManager.scala @@ -1,15 +1,20 @@ package pl.touk.nussknacker.engine.management.periodic +import cats.data.OptionT import com.typesafe.config.Config import com.typesafe.scalalogging.LazyLogging import pl.touk.nussknacker.engine.api.deployment._ -import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName} +import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess -import pl.touk.nussknacker.engine.deployment.{CustomActionDefinition, ExternalDeploymentId} +import pl.touk.nussknacker.engine.deployment.{CustomActionDefinition, ExternalDeploymentId, SingleExecutionResult} import pl.touk.nussknacker.engine.management.FlinkConfig import pl.touk.nussknacker.engine.management.periodic.PeriodicProcessService.PeriodicProcessStatus import pl.touk.nussknacker.engine.management.periodic.Utils.{createActorWithRetry, runSafely} -import pl.touk.nussknacker.engine.management.periodic.db.{DbInitializer, SlickPeriodicProcessesRepository} +import pl.touk.nussknacker.engine.management.periodic.db.{ + DbInitializer, + PeriodicProcessesRepository, + SlickPeriodicProcessesRepository +} import pl.touk.nussknacker.engine.management.periodic.flink.FlinkJarManager import pl.touk.nussknacker.engine.management.periodic.service.{ AdditionalDeploymentDataProvider, @@ -35,7 +40,6 @@ object PeriodicDeploymentManager { modelData: BaseModelData, listenerFactory: PeriodicProcessListenerFactory, additionalDeploymentDataProvider: AdditionalDeploymentDataProvider, - customActionsProviderFactory: PeriodicCustomActionsProviderFactory, dependencies: DeploymentManagerDependencies ): PeriodicDeploymentManager = { import dependencies._ @@ -75,8 +79,6 @@ object PeriodicDeploymentManager { dependencies.actorSystem ) - val customActionsProvider = customActionsProviderFactory.create(scheduledProcessesRepository, service) - val toClose = () => { runSafely(listener.close()) // deploymentActor and rescheduleFinishedActor just call methods from PeriodicProcessService on interval, @@ -88,8 +90,8 @@ object PeriodicDeploymentManager { new PeriodicDeploymentManager( delegate, service, + scheduledProcessesRepository, schedulePropertyExtractorFactory(originalConfig), - customActionsProvider, toClose ) } @@ -99,21 +101,23 @@ object PeriodicDeploymentManager { class PeriodicDeploymentManager private[periodic] ( val delegate: DeploymentManager, service: PeriodicProcessService, + repository: PeriodicProcessesRepository, schedulePropertyExtractor: SchedulePropertyExtractor, - customActionsProvider: PeriodicCustomActionsProvider, toClose: () => Unit )(implicit val ec: ExecutionContext) extends DeploymentManager with ManagerSpecificScenarioActivitiesStoredByManager with LazyLogging { + import repository._ + override def processCommand[Result](command: DMScenarioCommand[Result]): Future[Result] = command match { - case command: DMValidateScenarioCommand => validate(command) - case command: DMRunDeploymentCommand => runDeployment(command) - case command: DMCancelScenarioCommand => cancelScenario(command) - case command: DMStopScenarioCommand => stopScenario(command) - case command: DMCustomActionCommand => customActionsProvider.invokeCustomAction(command) + case command: DMValidateScenarioCommand => validate(command) + case command: DMRunDeploymentCommand => runDeployment(command) + case command: DMCancelScenarioCommand => cancelScenario(command) + case command: DMStopScenarioCommand => stopScenario(command) + case command: DMPerformSingleExecutionCommand => actionInstantBatch(command) case _: DMTestScenarioCommand | _: DMCancelDeploymentCommand | _: DMStopDeploymentCommand | _: DMMakeScenarioSavepointCommand | _: DMCustomActionCommand => delegate.processCommand(command) @@ -197,13 +201,19 @@ class PeriodicDeploymentManager private[periodic] ( override def resolve( idWithName: ProcessIdWithName, statusDetailsList: List[StatusDetails], - lastStateAction: Option[ProcessAction] + lastStateAction: Option[ProcessAction], + latestVersionId: VersionId, + deployedVersionId: Option[VersionId], ): Future[ProcessState] = { val statusDetails = statusDetailsList.head // TODO: add "real" presentation of deployments in GUI val mergedStatus = processStateDefinitionManager .processState( - statusDetails.copy(status = statusDetails.status.asInstanceOf[PeriodicProcessStatus].mergedStatusDetails.status) + statusDetails.copy(status = + statusDetails.status.asInstanceOf[PeriodicProcessStatus].mergedStatusDetails.status + ), + latestVersionId, + deployedVersionId ) Future.successful(mergedStatus.copy(tooltip = processStateDefinitionManager.statusTooltip(statusDetails.status))) } @@ -217,7 +227,7 @@ class PeriodicDeploymentManager private[periodic] ( delegate.close() } - override def customActionsDefinitions: List[CustomActionDefinition] = customActionsProvider.customActions + override def customActionsDefinitions: List[CustomActionDefinition] = List.empty // TODO We don't handle deployment synchronization on periodic DM because it currently uses it's own deployments and // its statuses synchronization mechanism (see PeriodicProcessService.synchronizeDeploymentsStates) @@ -244,4 +254,26 @@ class PeriodicDeploymentManager private[periodic] ( ): Future[List[ScenarioActivity]] = service.getScenarioActivitiesSpecificToPeriodicProcess(processIdWithName) + private def actionInstantBatch(command: DMPerformSingleExecutionCommand): Future[SingleExecutionResult] = { + val processName = command.processVersion.processName + val instantScheduleResult = instantSchedule(processName) + instantScheduleResult + .map(_ => SingleExecutionResult(s"Scenario ${processName.value} scheduled for immediate start")) + .getOrElse(SingleExecutionResult(s"Failed to schedule $processName to run as instant batch")) + } + + // TODO: Why we don't allow running not scheduled scenario? Maybe we can try to schedule it? + private def instantSchedule(processName: ProcessName): OptionT[Future, Unit] = for { + // schedule for immediate run + processDeployment <- OptionT( + service + .getLatestDeploymentsForActiveSchedules(processName) + .map(_.groupedByPeriodicProcess.headOption.flatMap(_.deployments.headOption)) + ) + processDeploymentWithProcessJson <- OptionT.liftF( + repository.findProcessData(processDeployment.id).run + ) + _ <- OptionT.liftF(service.deploy(processDeploymentWithProcessJson)) + } yield () + } diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessStateDefinitionManager.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessStateDefinitionManager.scala index 30d913f0d3a..23959f36da4 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessStateDefinitionManager.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessStateDefinitionManager.scala @@ -1,8 +1,10 @@ package pl.touk.nussknacker.engine.management.periodic +import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.defaultApplicableActions import pl.touk.nussknacker.engine.api.deployment.{ OverridingProcessStateDefinitionManager, ProcessStateDefinitionManager, + ScenarioActionName, StateStatus } import pl.touk.nussknacker.engine.management.periodic.PeriodicProcessService.{DeploymentStatus, PeriodicProcessStatus} @@ -13,6 +15,8 @@ class PeriodicProcessStateDefinitionManager(delegate: ProcessStateDefinitionMana statusTooltipsPF = PeriodicStateStatus.statusTooltipsPF, statusDescriptionsPF = PeriodicStateStatus.statusDescriptionsPF, customStateDefinitions = PeriodicStateStatus.customStateDefinitions, + customApplicableActions = Some(defaultApplicableActions ::: ScenarioActionName.PerformSingleExecution :: Nil), + customActionTooltips = Some(PeriodicStateStatus.customActionTooltips), delegate = delegate ) { diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicStateStatus.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicStateStatus.scala index ba14646c577..4fe9f84b705 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicStateStatus.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicStateStatus.scala @@ -1,9 +1,15 @@ package pl.touk.nussknacker.engine.management.periodic +import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.ProcessStatus import pl.touk.nussknacker.engine.api.deployment.StateStatus.StatusName import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus -import pl.touk.nussknacker.engine.api.deployment.{ScenarioActionName, StateDefinitionDetails, StateStatus} +import pl.touk.nussknacker.engine.api.deployment.{ + ScenarioActionName, + ScenarioActionTooltip, + StateDefinitionDetails, + StateStatus +} import java.net.URI import java.time.LocalDateTime @@ -33,12 +39,18 @@ object PeriodicStateStatus { val WaitingForScheduleStatus: StateStatus = StateStatus("WAITING_FOR_SCHEDULE") - val statusActionsPF: PartialFunction[StateStatus, List[ScenarioActionName]] = { - case SimpleStateStatus.Running => + val statusActionsPF: PartialFunction[ProcessStatus, List[ScenarioActionName]] = { + case ProcessStatus(SimpleStateStatus.Running, _, _) => List(ScenarioActionName.Cancel) // periodic processes cannot be redeployed from GUI - case _: ScheduledStatus => List(ScenarioActionName.Cancel, ScenarioActionName.Deploy) - case WaitingForScheduleStatus => List(ScenarioActionName.Cancel) // or maybe should it be empty?? - case _: ProblemStateStatus => List(ScenarioActionName.Cancel) // redeploy is not allowed + case ProcessStatus(_: ScheduledStatus, latestVersionId, deployedVersionId) + if deployedVersionId.contains(latestVersionId) => + List(ScenarioActionName.Cancel, ScenarioActionName.Deploy, ScenarioActionName.PerformSingleExecution) + case ProcessStatus(_: ScheduledStatus, _, _) => + List(ScenarioActionName.Cancel, ScenarioActionName.Deploy) + case ProcessStatus(WaitingForScheduleStatus, _, _) => + List(ScenarioActionName.Cancel) // or maybe should it be empty?? + case ProcessStatus(_: ProblemStateStatus, _, _) => + List(ScenarioActionName.Cancel) // redeploy is not allowed } val statusTooltipsPF: PartialFunction[StateStatus, String] = { case ScheduledStatus(nextRunAt) => @@ -64,4 +76,16 @@ object PeriodicStateStatus { ), ) + def customActionTooltips(processStatus: ProcessStatus): Map[ScenarioActionName, ScenarioActionTooltip] = { + processStatus match { + case ProcessStatus(_: ScheduledStatus, latestVersionId, deployedVersionId) + if deployedVersionId.contains(latestVersionId) => + Map.empty + case ProcessStatus(_: ScheduledStatus, _, _) => + Map(ScenarioActionName.PerformSingleExecution -> ScenarioActionTooltip.NotAllowedForDeployedVersion) + case ProcessStatus(_, _, _) => + Map(ScenarioActionName.PerformSingleExecution -> ScenarioActionTooltip.NotAllowedInCurrentState) + } + } + } diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/WithRunNowPeriodicCustomActionsProviderFactory.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/WithRunNowPeriodicCustomActionsProviderFactory.scala deleted file mode 100644 index 9501316c622..00000000000 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/WithRunNowPeriodicCustomActionsProviderFactory.scala +++ /dev/null @@ -1,73 +0,0 @@ -package pl.touk.nussknacker.engine.management.periodic - -import cats.data.OptionT -import com.typesafe.scalalogging.LazyLogging -import pl.touk.nussknacker.engine.api.deployment.ScenarioActionName -import pl.touk.nussknacker.engine.api.process.ProcessName -import pl.touk.nussknacker.engine.deployment.{CustomActionDefinition, CustomActionResult} -import pl.touk.nussknacker.engine.management.periodic.db.PeriodicProcessesRepository -import pl.touk.nussknacker.engine.api.deployment.DMCustomActionCommand - -import java.net.URI -import scala.concurrent.{ExecutionContext, Future} - -class WithRunNowPeriodicCustomActionsProviderFactory extends PeriodicCustomActionsProviderFactory { - - override def create( - periodicProcessesRepository: PeriodicProcessesRepository, - service: PeriodicProcessService - ): PeriodicCustomActionsProvider = new PeriodicCustomActionsProvider with LazyLogging { - implicit val ec: ExecutionContext = ExecutionContext.global - import periodicProcessesRepository._ - - override def customActions: List[CustomActionDefinition] = List(InstantBatchCustomAction()) - - override def invokeCustomAction(actionRequest: DMCustomActionCommand): Future[CustomActionResult] = { - actionRequest.actionName match { - case InstantBatchCustomAction.name => actionInstantBatch(actionRequest) - case _ => Future.failed(new NotImplementedError()) - } - } - - private def actionInstantBatch(actionRequest: DMCustomActionCommand): Future[CustomActionResult] = { - val processName = actionRequest.processVersion.processName - val instantScheduleResult = instantSchedule(processName) - instantScheduleResult - .map(_ => CustomActionResult(s"Scenario ${processName.value} scheduled for immediate start")) - .getOrElse(CustomActionResult(s"Failed to schedule $processName to run as instant batch")) - } - - // TODO: Why we don't allow running not scheduled scenario? Maybe we can try to schedule it? - private def instantSchedule(processName: ProcessName): OptionT[Future, Unit] = for { - // schedule for immediate run - processDeployment <- OptionT( - service - .getLatestDeploymentsForActiveSchedules(processName) - .map(_.groupedByPeriodicProcess.headOption.flatMap(_.deployments.headOption)) - ) - processDeploymentWithProcessJson <- OptionT.liftF( - periodicProcessesRepository.findProcessData(processDeployment.id).run - ) - _ <- OptionT.liftF(service.deploy(processDeploymentWithProcessJson)) - } yield () - - } - -} - -//TODO: replace custom action with dedicated command in core services -case object InstantBatchCustomAction { - - // name is displayed as label under the button - val name: ScenarioActionName = ScenarioActionName("run now") - - def apply(): CustomActionDefinition = { - CustomActionDefinition( - actionName = name, - allowedStateStatusNames = List("SCHEDULED"), - icon = Some(new URI("/assets/custom-actions/batch-instant.svg")), - parameters = Nil - ) - } - -} diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/DeploymentManagerStub.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/DeploymentManagerStub.scala index 840b82342dc..08fc6865f63 100644 --- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/DeploymentManagerStub.scala +++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/DeploymentManagerStub.scala @@ -2,7 +2,7 @@ package pl.touk.nussknacker.engine.management.periodic import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus -import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName} +import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId} import pl.touk.nussknacker.engine.deployment.{DeploymentId, ExternalDeploymentId} import pl.touk.nussknacker.engine.management.periodic.model.PeriodicProcessDeploymentId import pl.touk.nussknacker.engine.testing.StubbingCommands @@ -30,11 +30,15 @@ class DeploymentManagerStub extends BaseDeploymentManager with StubbingCommands override def resolve( idWithName: ProcessIdWithName, statusDetails: List[StatusDetails], - lastStateAction: Option[ProcessAction] + lastStateAction: Option[ProcessAction], + latestVersionId: VersionId, + deployedVersionId: Option[VersionId], ): Future[ProcessState] = Future.successful( processStateDefinitionManager.processState( - statusDetails.headOption.getOrElse(StatusDetails(SimpleStateStatus.NotDeployed, None)) + statusDetails.headOption.getOrElse(StatusDetails(SimpleStateStatus.NotDeployed, None)), + latestVersionId, + deployedVersionId ) ) diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManagerTest.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManagerTest.scala index 37f365c1d4d..e9c16c6cf78 100644 --- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManagerTest.scala +++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManagerTest.scala @@ -81,13 +81,19 @@ class PeriodicDeploymentManagerTest val periodicDeploymentManager = new PeriodicDeploymentManager( delegate = delegateDeploymentManagerStub, service = periodicProcessService, + repository = repository, schedulePropertyExtractor = CronSchedulePropertyExtractor(), - EmptyPeriodicCustomActionsProvider, toClose = () => () ) - def getAllowedActions(statusDetails: StatusDetails): List[ScenarioActionName] = { - periodicDeploymentManager.processStateDefinitionManager.processState(statusDetails).allowedActions + def getAllowedActions( + statusDetails: StatusDetails, + latestVersionId: VersionId, + deployedVersionId: Option[VersionId] + ): List[ScenarioActionName] = { + periodicDeploymentManager.processStateDefinitionManager + .processState(statusDetails, latestVersionId, deployedVersionId) + .allowedActions } def getMergedStatusDetails: StatusDetails = @@ -124,9 +130,12 @@ class PeriodicDeploymentManagerTest val statusDetails = f.getMergedStatusDetails statusDetails.status shouldBe a[ScheduledStatus] - f.getAllowedActions(statusDetails) shouldBe List(ScenarioActionName.Cancel, ScenarioActionName.Deploy) + f.getAllowedActions(statusDetails, processVersion.versionId, None) shouldBe List( + ScenarioActionName.Cancel, + ScenarioActionName.Deploy + ) f.periodicDeploymentManager - .getProcessState(idWithName, None) + .getProcessState(idWithName, None, processVersion.versionId, Some(processVersion.versionId)) .futureValue .value .status shouldBe a[ScheduledStatus] @@ -139,7 +148,10 @@ class PeriodicDeploymentManagerTest val statusDetails = f.getMergedStatusDetails statusDetails.status shouldBe a[ScheduledStatus] - f.getAllowedActions(statusDetails) shouldBe List(ScenarioActionName.Cancel, ScenarioActionName.Deploy) + f.getAllowedActions(statusDetails, processVersion.versionId, None) shouldBe List( + ScenarioActionName.Cancel, + ScenarioActionName.Deploy + ) } test("getProcessState - should be finished when scenario finished and job finished on Flink") { @@ -153,7 +165,8 @@ class PeriodicDeploymentManagerTest f.delegateDeploymentManagerStub.setStateStatus(SimpleStateStatus.Finished, Some(deploymentId)) f.periodicProcessService.deactivate(processName).futureValue - val state = f.periodicDeploymentManager.getProcessState(idWithName, None).futureValue.value + val state = + f.periodicDeploymentManager.getProcessState(idWithName, None, processVersion.versionId, None).futureValue.value state.status shouldBe SimpleStateStatus.Finished state.allowedActions shouldBe List(ScenarioActionName.Deploy, ScenarioActionName.Archive, ScenarioActionName.Rename) @@ -166,7 +179,7 @@ class PeriodicDeploymentManagerTest val statusDetails = f.getMergedStatusDetails statusDetails.status shouldBe SimpleStateStatus.Running - f.getAllowedActions(statusDetails) shouldBe List(ScenarioActionName.Cancel) + f.getAllowedActions(statusDetails, processVersion.versionId, None) shouldBe List(ScenarioActionName.Cancel) } test("getProcessState - should be waiting for reschedule if job finished on Flink but scenario is still deployed") { @@ -176,7 +189,7 @@ class PeriodicDeploymentManagerTest val statusDetails = f.getMergedStatusDetails statusDetails.status shouldBe WaitingForScheduleStatus - f.getAllowedActions(statusDetails) shouldBe List(ScenarioActionName.Cancel) + f.getAllowedActions(statusDetails, processVersion.versionId, None) shouldBe List(ScenarioActionName.Cancel) } test("getProcessState - should be failed after unsuccessful deployment") { @@ -185,7 +198,7 @@ class PeriodicDeploymentManagerTest val statusDetails = f.getMergedStatusDetails statusDetails.status shouldBe ProblemStateStatus.Failed - f.getAllowedActions(statusDetails) shouldBe List(ScenarioActionName.Cancel) + f.getAllowedActions(statusDetails, processVersion.versionId, None) shouldBe List(ScenarioActionName.Cancel) } test("deploy - should fail for invalid periodic property") { @@ -270,7 +283,7 @@ class PeriodicDeploymentManagerTest val statusDetails = f.getMergedStatusDetails statusDetails.status shouldBe ProblemStateStatus.Failed - f.getAllowedActions(statusDetails) shouldBe List(ScenarioActionName.Cancel) + f.getAllowedActions(statusDetails, processVersion.versionId, None) shouldBe List(ScenarioActionName.Cancel) } test("should redeploy failed scenario") { @@ -279,7 +292,7 @@ class PeriodicDeploymentManagerTest f.delegateDeploymentManagerStub.setStateStatus(ProblemStateStatus.Failed, Some(deploymentId)) val statusDetailsBeforeRedeploy = f.getMergedStatusDetails statusDetailsBeforeRedeploy.status shouldBe ProblemStateStatus.Failed - f.getAllowedActions(statusDetailsBeforeRedeploy) shouldBe List( + f.getAllowedActions(statusDetailsBeforeRedeploy, processVersion.versionId, None) shouldBe List( ScenarioActionName.Cancel ) // redeploy is blocked in GUI but API allows it @@ -302,7 +315,10 @@ class PeriodicDeploymentManagerTest val statusDetailsAfterRedeploy = f.getMergedStatusDetails // Previous job is still visible as Failed. statusDetailsAfterRedeploy.status shouldBe a[ScheduledStatus] - f.getAllowedActions(statusDetailsAfterRedeploy) shouldBe List(ScenarioActionName.Cancel, ScenarioActionName.Deploy) + f.getAllowedActions(statusDetailsAfterRedeploy, processVersion.versionId, None) shouldBe List( + ScenarioActionName.Cancel, + ScenarioActionName.Deploy + ) } test("should redeploy scheduled scenario") { @@ -332,7 +348,7 @@ class PeriodicDeploymentManagerTest val deploymentId = f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Deployed) f.delegateDeploymentManagerStub.setStateStatus(SimpleStateStatus.Running, Some(deploymentId)) val statusDetails = f.getMergedStatusDetails - f.getAllowedActions(statusDetails) shouldBe List( + f.getAllowedActions(statusDetails, processVersion.versionId, None) shouldBe List( ScenarioActionName.Cancel ) // redeploy is blocked in GUI but API allows it @@ -359,7 +375,7 @@ class PeriodicDeploymentManagerTest val deploymentId = f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Deployed) f.delegateDeploymentManagerStub.setStateStatus(SimpleStateStatus.Finished, Some(deploymentId)) val statusDetails = f.getMergedStatusDetails - f.getAllowedActions(statusDetails) shouldBe List( + f.getAllowedActions(statusDetails, processVersion.versionId, None) shouldBe List( ScenarioActionName.Cancel ) // redeploy is blocked in GUI but API allows it diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessStateDefinitionManagerTest.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessStateDefinitionManagerTest.scala index 05002ea839e..5d02d9df763 100644 --- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessStateDefinitionManagerTest.scala +++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessStateDefinitionManagerTest.scala @@ -2,16 +2,14 @@ package pl.touk.nussknacker.engine.management.periodic import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers +import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.ProcessStatus +import pl.touk.nussknacker.engine.api.deployment.{ScenarioActionName, ScenarioActionTooltip} +import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus +import pl.touk.nussknacker.engine.api.process.VersionId import pl.touk.nussknacker.engine.management.periodic.PeriodicProcessService.{DeploymentStatus, PeriodicProcessStatus} import pl.touk.nussknacker.engine.management.periodic.PeriodicProcessStateDefinitionManager.statusTooltip -import pl.touk.nussknacker.engine.management.periodic.model.{ - PeriodicProcessDeploymentId, - PeriodicProcessDeploymentState, - PeriodicProcessDeploymentStatus, - PeriodicProcessId, - ScheduleId, - ScheduleName -} +import pl.touk.nussknacker.engine.management.periodic.PeriodicStateStatus.ScheduledStatus +import pl.touk.nussknacker.engine.management.periodic.model._ import java.time.LocalDateTime import java.util.concurrent.atomic.AtomicLong @@ -71,6 +69,40 @@ class PeriodicProcessStateDefinitionManagerTest extends AnyFunSuite with Matcher |Schedule ${firstScheduleId.scheduleName.display} scheduled at: 2023-01-01 10:00 status: Deployed""".stripMargin } + test("not display custom tooltip for perform single execution when latest version is deployed") { + PeriodicStateStatus.customActionTooltips( + ProcessStatus( + stateStatus = ScheduledStatus(nextRunAt = LocalDateTime.now()), + latestVersionId = VersionId(5), + deployedVersionId = Some(VersionId(5)) + ) + ) shouldEqual Map.empty + } + + test("display custom tooltip for perform single execution when older version is deployed") { + PeriodicStateStatus.customActionTooltips( + ProcessStatus( + stateStatus = ScheduledStatus(nextRunAt = LocalDateTime.now()), + latestVersionId = VersionId(5), + deployedVersionId = Some(VersionId(4)) + ) + ) shouldEqual Map( + ScenarioActionName.PerformSingleExecution -> ScenarioActionTooltip.NotAllowedForDeployedVersion + ) + } + + test("display custom tooltip for perform single execution in CANCELED state") { + PeriodicStateStatus.customActionTooltips( + ProcessStatus( + stateStatus = SimpleStateStatus.Canceled, + latestVersionId = VersionId(5), + deployedVersionId = Some(VersionId(4)) + ) + ) shouldEqual Map( + ScenarioActionName.PerformSingleExecution -> ScenarioActionTooltip.NotAllowedInCurrentState + ) + } + private def generateDeploymentId = PeriodicProcessDeploymentId(nextDeploymentId.getAndIncrement()) private def generateScheduleId = ScheduleId(fooProcessId, generateScheduleName) diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala index b991cd32cba..3cf9a4370ba 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala @@ -9,7 +9,7 @@ import pl.touk.nussknacker.engine.api.deployment.DeploymentUpdateStrategy.StateR import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.inconsistency.InconsistentStateDetector import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus -import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName} +import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.{ CustomActionDefinition, @@ -42,7 +42,9 @@ abstract class FlinkDeploymentManager( override def resolve( idWithName: ProcessIdWithName, statusDetails: List[StatusDetails], - lastStateAction: Option[ProcessAction] + lastStateAction: Option[ProcessAction], + latestVersionId: VersionId, + deployedVersionId: Option[VersionId], ): Future[ProcessState] = { for { actionAfterPostprocessOpt <- postprocess(idWithName, statusDetails) @@ -50,7 +52,11 @@ abstract class FlinkDeploymentManager( statusDetails, actionAfterPostprocessOpt.orElse(lastStateAction) ) - } yield processStateDefinitionManager.processState(engineStateResolvedWithLastAction) + } yield processStateDefinitionManager.processState( + engineStateResolvedWithLastAction, + latestVersionId, + deployedVersionId + ) } // Flink has a retention for job overviews so we can't rely on this to distinguish between statuses: @@ -113,7 +119,8 @@ abstract class FlinkDeploymentManager( } case DMTestScenarioCommand(_, canonicalProcess, scenarioTestData) => testRunner.test(canonicalProcess, scenarioTestData) - case command: DMCustomActionCommand => processCustomAction(command) + case command: DMCustomActionCommand => processCustomAction(command) + case _: DMPerformSingleExecutionCommand => notImplemented } private def validate(command: DMValidateScenarioCommand): Future[Unit] = { diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStateStatus.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStateStatus.scala index d970936b631..b7453dc58a7 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStateStatus.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStateStatus.scala @@ -1,7 +1,7 @@ package pl.touk.nussknacker.engine.management +import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.ProcessStatus import pl.touk.nussknacker.engine.api.deployment.ScenarioActionName -import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus /** @@ -10,7 +10,7 @@ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus */ object FlinkStateStatus { - val statusActionsPF: PartialFunction[StateStatus, List[ScenarioActionName]] = { + val statusActionsPF: PartialFunction[ProcessStatus, List[ScenarioActionName]] = _.stateStatus match { case SimpleStateStatus.DuringDeploy => List(ScenarioActionName.Cancel) case SimpleStateStatus.Restarting => List(ScenarioActionName.Cancel) } diff --git a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkProcessStateSpec.scala b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkProcessStateSpec.scala index 6db72d530f0..53035271f97 100644 --- a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkProcessStateSpec.scala +++ b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkProcessStateSpec.scala @@ -1,18 +1,21 @@ package pl.touk.nussknacker.engine.management +import org.scalatest.Inside import org.scalatest.funspec.AnyFunSpec import org.scalatest.matchers.should.Matchers -import org.scalatest.Inside import pl.touk.nussknacker.engine.api.ProcessVersion -import pl.touk.nussknacker.engine.api.deployment.{ProcessState, ScenarioActionName, StateStatus, StatusDetails} import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus +import pl.touk.nussknacker.engine.api.deployment.{ProcessState, ScenarioActionName, StateStatus, StatusDetails} +import pl.touk.nussknacker.engine.api.process.VersionId import pl.touk.nussknacker.engine.deployment.ExternalDeploymentId class FlinkProcessStateSpec extends AnyFunSpec with Matchers with Inside { def createProcessState(stateStatus: StateStatus): ProcessState = FlinkProcessStateDefinitionManager.processState( - StatusDetails(stateStatus, None, Some(ExternalDeploymentId("12")), Some(ProcessVersion.empty)) + StatusDetails(stateStatus, None, Some(ExternalDeploymentId("12")), Some(ProcessVersion.empty)), + VersionId(1), + None, ) it("scenario state should be during deploy") { diff --git a/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala b/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala index 7a7242a36d4..0b9debc2d26 100644 --- a/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala +++ b/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala @@ -112,7 +112,7 @@ class EmbeddedDeploymentManager( case command: DMCancelScenarioCommand => cancelScenario(command) case command: DMTestScenarioCommand => testScenario(command) case _: DMStopDeploymentCommand | _: DMStopScenarioCommand | _: DMMakeScenarioSavepointCommand | - _: DMCustomActionCommand => + _: DMCustomActionCommand | _: DMPerformSingleExecutionCommand => notImplemented } diff --git a/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedProcessStateDefinitionManager.scala b/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedProcessStateDefinitionManager.scala index 8191d11852c..6a875941123 100644 --- a/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedProcessStateDefinitionManager.scala +++ b/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedProcessStateDefinitionManager.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.embedded +import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.ProcessStatus import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefinitionManager, SimpleStateStatus} import pl.touk.nussknacker.engine.api.deployment.{OverridingProcessStateDefinitionManager, ScenarioActionName} @@ -10,7 +11,7 @@ import pl.touk.nussknacker.engine.api.deployment.{OverridingProcessStateDefiniti object EmbeddedProcessStateDefinitionManager extends OverridingProcessStateDefinitionManager( delegate = SimpleProcessStateDefinitionManager, - statusActionsPF = { case SimpleStateStatus.Restarting => + statusActionsPF = { case ProcessStatus(SimpleStateStatus.Restarting, _, _) => List(ScenarioActionName.Cancel) } ) diff --git a/engine/lite/k8sDeploymentManager/src/main/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManager.scala b/engine/lite/k8sDeploymentManager/src/main/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManager.scala index 5845a998dbc..030fb1f6e81 100644 --- a/engine/lite/k8sDeploymentManager/src/main/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManager.scala +++ b/engine/lite/k8sDeploymentManager/src/main/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManager.scala @@ -11,7 +11,7 @@ import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.ExternalDeploymentId import pl.touk.nussknacker.engine.util.Implicits.RichScalaMap -import pl.touk.nussknacker.engine.{BaseModelData, DeploymentManagerDependencies, newdeployment} +import pl.touk.nussknacker.engine.{BaseModelData, DeploymentManagerDependencies} import pl.touk.nussknacker.k8s.manager.K8sDeploymentManager._ import pl.touk.nussknacker.k8s.manager.K8sUtils.{sanitizeLabel, sanitizeObjectName, shortHash} import pl.touk.nussknacker.k8s.manager.deployment.K8sScalingConfig.DividingParallelismConfig @@ -114,7 +114,7 @@ class K8sDeploymentManager( case command: DMCancelScenarioCommand => cancelScenario(command) case command: DMTestScenarioCommand => testScenario(command) case _: DMCancelDeploymentCommand | _: DMStopDeploymentCommand | _: DMStopScenarioCommand | - _: DMMakeScenarioSavepointCommand | _: DMCustomActionCommand => + _: DMMakeScenarioSavepointCommand | _: DMCustomActionCommand | _: DMPerformSingleExecutionCommand => notImplemented } diff --git a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ProcessAction.scala b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ProcessAction.scala index ebb91e628c3..9e56e3e8426 100644 --- a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ProcessAction.scala +++ b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ProcessAction.scala @@ -1,8 +1,8 @@ package pl.touk.nussknacker.engine.api.deployment -import io.circe.{Decoder, Encoder} import io.circe.generic.JsonCodec import io.circe.generic.extras.semiauto.{deriveUnwrappedDecoder, deriveUnwrappedEncoder} +import io.circe.{Decoder, Encoder} import pl.touk.nussknacker.engine.api.deployment.ProcessActionState.ProcessActionState import pl.touk.nussknacker.engine.api.process.{ProcessId, VersionId} @@ -74,4 +74,20 @@ object ScenarioActionName { val DefaultActions: List[ScenarioActionName] = Nil val StateActions: Set[ScenarioActionName] = Set(Cancel, Deploy, Pause) + + // TODO: We kept the old name of "run now" CustomAction for compatibility reasons. + // In the future it can be changed to better name, according to convention, but that would require database migration + // In the meantime, there are methods serialize and deserialize, which operate on name PERFORM_SINGLE_EXECUTION instead. + val PerformSingleExecution: ScenarioActionName = ScenarioActionName("run now") + + def serialize(name: ScenarioActionName): String = name match { + case ScenarioActionName.PerformSingleExecution => "PERFORM_SINGLE_EXECUTION" + case other => other.value + } + + def deserialize(str: String): ScenarioActionName = str match { + case "PERFORM_SINGLE_EXECUTION" => ScenarioActionName.PerformSingleExecution + case other => ScenarioActionName(other) + } + } diff --git a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ProcessState.scala b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ProcessState.scala index 210fde024be..3ad7511aecc 100644 --- a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ProcessState.scala +++ b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ProcessState.scala @@ -4,6 +4,7 @@ import io.circe._ import io.circe.generic.JsonCodec import pl.touk.nussknacker.engine.api.ProcessVersion import pl.touk.nussknacker.engine.api.deployment.StateStatus.StatusName +import pl.touk.nussknacker.engine.api.process.VersionId import pl.touk.nussknacker.engine.deployment.{DeploymentId, ExternalDeploymentId} import java.net.URI @@ -25,8 +26,13 @@ import java.net.URI @JsonCodec case class ProcessState( externalDeploymentId: Option[ExternalDeploymentId], status: StateStatus, + // TODO: Designed does not need the `version` field, it can be removed after confirming no other service depends on it version: Option[ProcessVersion], + latestVersionId: VersionId, + deployedVersionId: Option[VersionId], + visibleActions: List[ScenarioActionName], allowedActions: List[ScenarioActionName], + actionTooltips: Map[ScenarioActionName, ScenarioActionTooltip], icon: URI, tooltip: String, description: String, @@ -36,8 +42,21 @@ import java.net.URI ) object ProcessState { - implicit val uriEncoder: Encoder[URI] = Encoder.encodeString.contramap(_.toString) - implicit val uriDecoder: Decoder[URI] = Decoder.decodeString.map(URI.create) + implicit val uriEncoder: Encoder[URI] = Encoder.encodeString.contramap(_.toString) + implicit val uriDecoder: Decoder[URI] = Decoder.decodeString.map(URI.create) + implicit val scenarioVersionIdEncoder: Encoder[ScenarioVersionId] = Encoder.encodeLong.contramap(_.value) + implicit val scenarioVersionIdDecoder: Decoder[ScenarioVersionId] = Decoder.decodeLong.map(ScenarioVersionId.apply) + + implicit val scenarioActionNameEncoder: Encoder[ScenarioActionName] = + Encoder.encodeString.contramap(ScenarioActionName.serialize) + implicit val scenarioActionNameDecoder: Decoder[ScenarioActionName] = + Decoder.decodeString.map(ScenarioActionName.deserialize) + + implicit val scenarioActionNameKeyDecoder: KeyDecoder[ScenarioActionName] = + (key: String) => Some(ScenarioActionName.deserialize(key)) + implicit val scenarioActionNameKeyEncoder: KeyEncoder[ScenarioActionName] = (name: ScenarioActionName) => + ScenarioActionName.serialize(name) + } object StateStatus { @@ -76,3 +95,23 @@ case class StatusDetails( attributes: Option[Json] = None, errors: List[String] = List.empty ) + +sealed trait ScenarioActionTooltip + +object ScenarioActionTooltip { + case object NotAllowedInCurrentState extends ScenarioActionTooltip + case object NotAllowedForDeployedVersion extends ScenarioActionTooltip + + implicit val encoder: Encoder[ScenarioActionTooltip] = + Encoder.encodeString.contramap { + case NotAllowedInCurrentState => "NOT_ALLOWED_IN_CURRENT_STATE" + case NotAllowedForDeployedVersion => "NOT_ALLOWED_FOR_DEPLOYED_VERSION" + } + + implicit val decoder: Decoder[ScenarioActionTooltip] = + Decoder.decodeString.map { + case "NOT_ALLOWED_IN_CURRENT_STATE" => NotAllowedInCurrentState + case "NOT_ALLOWED_FOR_DEPLOYED_VERSION" => NotAllowedForDeployedVersion + } + +} diff --git a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/deployment/CustomActionDefinition.scala b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/deployment/CustomActionDefinition.scala index 986521592d2..aacbb1101ba 100644 --- a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/deployment/CustomActionDefinition.scala +++ b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/deployment/CustomActionDefinition.scala @@ -32,3 +32,5 @@ case class CustomActionParameter( ) case class CustomActionResult(msg: String) + +case class SingleExecutionResult(msg: String)