diff --git a/front50-api/src/main/java/com/netflix/spinnaker/front50/api/model/pipeline/Pipeline.java b/front50-api/src/main/java/com/netflix/spinnaker/front50/api/model/pipeline/Pipeline.java
index 8f57741f4..2c249dea8 100644
--- a/front50-api/src/main/java/com/netflix/spinnaker/front50/api/model/pipeline/Pipeline.java
+++ b/front50-api/src/main/java/com/netflix/spinnaker/front50/api/model/pipeline/Pipeline.java
@@ -21,9 +21,11 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.Setter;
 
+@EqualsAndHashCode(of = {"id", "name", "application"})
 public class Pipeline implements Timestamped {
   public static final String TYPE_TEMPLATED = "templatedPipeline";
 
diff --git a/front50-sql/src/main/kotlin/com/netflix/spinnaker/front50/model/SqlStorageService.kt b/front50-sql/src/main/kotlin/com/netflix/spinnaker/front50/model/SqlStorageService.kt
index df32b09a5..10293250a 100644
--- a/front50-sql/src/main/kotlin/com/netflix/spinnaker/front50/model/SqlStorageService.kt
+++ b/front50-sql/src/main/kotlin/com/netflix/spinnaker/front50/model/SqlStorageService.kt
@@ -258,63 +258,58 @@ class SqlStorageService(
   }
 
   override fun <T : Timestamped> storeObjects(objectType: ObjectType, allItems: Collection<T>) {
-    // using a lower `chunkSize` to avoid exceeding default packet size limits.
-    allItems.chunked(100).forEach { items ->
-      try {
-        withPool(poolName) {
-          jooq.transactional(sqlRetryProperties.transactions) { ctx ->
+    withPool(poolName) {
+      jooq.transactional(sqlRetryProperties.transactions) { ctx ->
+        // using a lower `chunkSize` to avoid exceeding default packet size limits.
+        allItems.chunked(100).forEach { items ->
+          try {
+            ctx.batch(
+              items.map { item ->
+                val insertPairs = definitionsByType[objectType]!!.getInsertPairs(
+                  objectMapper, item.id.toLowerCase(), item
+                )
+                val updatePairs = definitionsByType[objectType]!!.getUpdatePairs(insertPairs)
+
+                ctx.insertInto(
+                  table(definitionsByType[objectType]!!.tableName),
+                  *insertPairs.keys.map { field(it) }.toTypedArray()
+                )
+                  .values(insertPairs.values)
+                  .onConflict(field("id", String::class.java))
+                  .doUpdate()
+                  .set(updatePairs.mapKeys { field(it.key) })
+              }
+            ).execute()
+          } catch (e: SQLDialectNotSupportedException) {
+            for (item in items) {
+              storeSingleObject(objectType, item.id.toLowerCase(), item)
+            }
+          }
+
+          if (definitionsByType[objectType]!!.supportsHistory) {
             try {
               ctx.batch(
                 items.map { item ->
-                  val insertPairs = definitionsByType[objectType]!!.getInsertPairs(
-                    objectMapper, item.id.toLowerCase(), item
+                  val historyPairs = definitionsByType[objectType]!!.getHistoryPairs(
+                    objectMapper, clock, item.id.toLowerCase(), item
                   )
-                  val updatePairs = definitionsByType[objectType]!!.getUpdatePairs(insertPairs)
 
-                  ctx.insertInto(
-                    table(definitionsByType[objectType]!!.tableName),
-                    *insertPairs.keys.map { field(it) }.toTypedArray()
-                  )
-                    .values(insertPairs.values)
-                    .onConflict(field("id", String::class.java))
-                    .doUpdate()
-                    .set(updatePairs.mapKeys { field(it.key) })
+                  ctx
+                    .insertInto(
+                      table(definitionsByType[objectType]!!.historyTableName),
+                      *historyPairs.keys.map { field(it) }.toTypedArray()
+                    )
+                    .values(historyPairs.values)
+                    .onDuplicateKeyIgnore()
                 }
               ).execute()
             } catch (e: SQLDialectNotSupportedException) {
               for (item in items) {
-                storeSingleObject(objectType, item.id.toLowerCase(), item)
-              }
-            }
-
-            if (definitionsByType[objectType]!!.supportsHistory) {
-              try {
-                ctx.batch(
-                  items.map { item ->
-                    val historyPairs = definitionsByType[objectType]!!.getHistoryPairs(
-                      objectMapper, clock, item.id.toLowerCase(), item
-                    )
-
-                    ctx
-                      .insertInto(
-                        table(definitionsByType[objectType]!!.historyTableName),
-                        *historyPairs.keys.map { field(it) }.toTypedArray()
-                      )
-                      .values(historyPairs.values)
-                      .onDuplicateKeyIgnore()
-                  }
-                ).execute()
-              } catch (e: SQLDialectNotSupportedException) {
-                for (item in items) {
-                  storeSingleObjectHistory(objectType, item.id.toLowerCase(), item)
-                }
+                storeSingleObjectHistory(objectType, item.id.toLowerCase(), item)
               }
             }
           }
         }
-      } catch (e: Exception) {
-        log.error("Unable to store objects (objectType: {}, objectKeys: {})", objectType, items.map { it.id })
-        throw e
       }
     }
   }
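The `storeObjects` change above inverts the original nesting: the transaction used to be opened per 100-item chunk, so a failure partway through a bulk import left earlier chunks committed. Moving `withPool`/`jooq.transactional` outside the chunk loop makes the whole bulk write atomic. A minimal Java sketch of the resulting pattern, assuming jOOQ and illustrative `Row`/table/column names (the real implementation is the Kotlin above):

```java
import java.util.List;
import org.jooq.DSLContext;
import org.jooq.Query;
import org.jooq.impl.DSL;

/** Illustrative row shape; the real code builds column/value pairs per object type. */
record Row(String id, String body) {}

class AtomicBulkWriter {
  private final DSLContext jooq;

  AtomicBulkWriter(DSLContext jooq) {
    this.jooq = jooq;
  }

  void storeAll(List<Row> rows) {
    // One transaction wraps every chunk: a failure in any batch rolls the
    // whole bulk write back, so readers never observe a partial import.
    jooq.transaction(cfg -> {
      DSLContext ctx = DSL.using(cfg);
      for (int i = 0; i < rows.size(); i += 100) {
        // Small batches keep each statement under default packet-size limits.
        List<Query> batch =
            rows.subList(i, Math.min(i + 100, rows.size())).stream()
                .map(row ->
                    (Query)
                        ctx.insertInto(DSL.table("pipelines"))
                            .columns(DSL.field("id"), DSL.field("body"))
                            .values(row.id(), row.body())
                            .onConflict(DSL.field("id"))
                            .doUpdate()
                            .set(DSL.field("body"), row.body()))
                .toList();
        ctx.batch(batch).execute();
      }
    });
  }
}
```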
diff --git a/front50-sql/src/test/kotlin/com/netflix/spinnaker/front50/model/SqlStorageServiceTests.kt b/front50-sql/src/test/kotlin/com/netflix/spinnaker/front50/model/SqlStorageServiceTests.kt
index dbd7db8c4..4d6c90ae1 100644
--- a/front50-sql/src/test/kotlin/com/netflix/spinnaker/front50/model/SqlStorageServiceTests.kt
+++ b/front50-sql/src/test/kotlin/com/netflix/spinnaker/front50/model/SqlStorageServiceTests.kt
@@ -30,6 +30,7 @@ import dev.minutest.junit.JUnit5Minutests
 import dev.minutest.rootContext
 import java.time.Clock
 import org.jooq.SQLDialect
+import org.jooq.exception.DataAccessException
 import org.jooq.impl.DSL
 import org.jooq.impl.DSL.field
 import org.jooq.impl.DSL.table
@@ -195,6 +196,37 @@
       }
     }
 
+    test("bulk create pipelines atomically") {
+      // verify that pipelines can be bulk created
+      val pipelines = (1..500).map { idx ->
+        Pipeline().apply {
+          id = "pipeline${idx}"
+          name = "pipeline${idx}"
+          lastModified = 100 + idx.toLong()
+          lastModifiedBy = "test"
+          setApplication("application")
+        }
+      }
+
+      // set lastModifiedBy of one of the pipelines to null in order to force an error
+      // and make sure no pipelines are added since additions are done in a single transaction
+      pipelines[250].lastModifiedBy = null
+      expectThrows<DataAccessException> {
+        sqlStorageService.storeObjects(ObjectType.PIPELINE, pipelines)
+      }
+      expectThat(
+        jooq.selectCount().from("pipelines").fetchOne(0, Int::class.java)
+      ).isEqualTo(0)
+
+      // Reset lastModifiedBy to ensure successful bulk creation
+      pipelines[250].lastModifiedBy = "test"
+      sqlStorageService.storeObjects(ObjectType.PIPELINE, pipelines)
+
+      val storedPipelines = sqlStorageService.loadObjects<Pipeline>(ObjectType.PIPELINE, pipelines.map { it.id })
+      expectThat(storedPipelines.size).isEqualTo(500)
+      expectThat(storedPipelines.map { it.id }).isEqualTo(pipelines.map { it.id })
+    }
+
     var lastModifiedMs : Long = 100
     test("loadObjects basic behavior") {
       val objectKeys = mutableSetOf<String>()
diff --git a/front50-web/src/main/java/com/netflix/spinnaker/front50/config/Front50WebConfig.java b/front50-web/src/main/java/com/netflix/spinnaker/front50/config/Front50WebConfig.java
index 9f7e1cb1d..142cdb949 100644
--- a/front50-web/src/main/java/com/netflix/spinnaker/front50/config/Front50WebConfig.java
+++ b/front50-web/src/main/java/com/netflix/spinnaker/front50/config/Front50WebConfig.java
@@ -24,6 +24,7 @@
 import com.netflix.spinnaker.filters.AuthenticatedRequestFilter;
 import com.netflix.spinnaker.front50.ItemDAOHealthIndicator;
 import com.netflix.spinnaker.front50.api.validator.PipelineValidator;
+import com.netflix.spinnaker.front50.config.controllers.PipelineControllerConfig;
 import com.netflix.spinnaker.front50.model.application.ApplicationDAO;
 import com.netflix.spinnaker.front50.model.application.ApplicationPermissionDAO;
 import com.netflix.spinnaker.front50.model.delivery.DeliveryRepository;
@@ -58,7 +59,10 @@
 @EnableFiatAutoConfig
 @EnableScheduling
 @Import({PluginsAutoConfiguration.class})
-@EnableConfigurationProperties(StorageServiceConfigurationProperties.class)
+@EnableConfigurationProperties({
+  StorageServiceConfigurationProperties.class,
+  PipelineControllerConfig.class
+})
 public class Front50WebConfig extends WebMvcConfigurerAdapter {
 
   @Autowired private Registry registry;
diff --git a/front50-web/src/main/java/com/netflix/spinnaker/front50/config/controllers/PipelineControllerConfig.java b/front50-web/src/main/java/com/netflix/spinnaker/front50/config/controllers/PipelineControllerConfig.java
new file mode 100644
index 000000000..4a4a09e19
--- /dev/null
+++ b/front50-web/src/main/java/com/netflix/spinnaker/front50/config/controllers/PipelineControllerConfig.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2024 Salesforce, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.netflix.spinnaker.front50.config.controllers;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@Data
+@ConfigurationProperties(prefix = "controller.pipeline")
+public class PipelineControllerConfig {
+
+  /** Holds the configurations to be used for save/update controller mappings */
+  private SavePipelineConfiguration save = new SavePipelineConfiguration();
+
+  @Data
+  public static class SavePipelineConfiguration {
+    /** Controls whether the cache should be refreshed while checking for duplicate pipelines */
+    boolean refreshCacheOnDuplicatesCheck = true;
+  }
+}
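`PipelineControllerConfig` binds under the `controller.pipeline` prefix, so the new flag is reachable as `controller.pipeline.save.refreshCacheOnDuplicatesCheck` in front50's configuration. A hedged consumption sketch; the component below is illustrative only, the real consumer being `checkForDuplicatePipeline` in the controller diff further down:

```java
// Illustrative consumer of the new property.
//
// Equivalent YAML (assumption: standard Spring Boot relaxed binding):
//   controller:
//     pipeline:
//       save:
//         refreshCacheOnDuplicatesCheck: false

import com.netflix.spinnaker.front50.config.controllers.PipelineControllerConfig;
import org.springframework.stereotype.Component;

@Component
class DuplicateCheckPolicy {
  private final PipelineControllerConfig config;

  DuplicateCheckPolicy(PipelineControllerConfig config) {
    this.config = config;
  }

  /** True when duplicate checks should bypass the possibly-stale pipeline cache. */
  boolean shouldRefreshCache() {
    // getSave()/isRefreshCacheOnDuplicatesCheck() are Lombok @Data accessors.
    return config.getSave().isRefreshCacheOnDuplicatesCheck();
  }
}
```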
diff --git a/front50-web/src/main/java/com/netflix/spinnaker/front50/controllers/PipelineController.java b/front50-web/src/main/java/com/netflix/spinnaker/front50/controllers/PipelineController.java
index 5282e1303..f47a09683 100644
--- a/front50-web/src/main/java/com/netflix/spinnaker/front50/controllers/PipelineController.java
+++ b/front50-web/src/main/java/com/netflix/spinnaker/front50/controllers/PipelineController.java
@@ -19,13 +19,16 @@
 import static com.netflix.spinnaker.front50.model.pipeline.TemplateConfiguration.TemplateSource.SPINNAKER_PREFIX;
 import static java.lang.String.format;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Strings;
+import com.netflix.spinnaker.fiat.shared.FiatPermissionEvaluator;
 import com.netflix.spinnaker.front50.ServiceAccountsService;
 import com.netflix.spinnaker.front50.api.model.pipeline.Pipeline;
 import com.netflix.spinnaker.front50.api.model.pipeline.Trigger;
 import com.netflix.spinnaker.front50.api.validator.PipelineValidator;
 import com.netflix.spinnaker.front50.api.validator.ValidatorErrors;
+import com.netflix.spinnaker.front50.config.controllers.PipelineControllerConfig;
 import com.netflix.spinnaker.front50.exception.BadRequestException;
 import com.netflix.spinnaker.front50.exceptions.DuplicateEntityException;
 import com.netflix.spinnaker.front50.exceptions.InvalidEntityException;
@@ -34,23 +37,32 @@
 import com.netflix.spinnaker.front50.model.pipeline.PipelineTemplateDAO;
 import com.netflix.spinnaker.front50.model.pipeline.TemplateConfiguration;
 import com.netflix.spinnaker.front50.model.pipeline.V2TemplateConfiguration;
+import com.netflix.spinnaker.kork.annotations.VisibleForTesting;
 import com.netflix.spinnaker.kork.web.exceptions.NotFoundException;
 import com.netflix.spinnaker.kork.web.exceptions.ValidationException;
+import com.netflix.spinnaker.security.AuthenticatedRequest;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.UUID;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.security.access.prepost.PostAuthorize;
 import org.springframework.security.access.prepost.PostFilter;
 import org.springframework.security.access.prepost.PreAuthorize;
+import org.springframework.security.core.Authentication;
+import org.springframework.security.core.context.SecurityContextHolder;
 import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
@@ -69,18 +81,27 @@ public class PipelineController {
 
   private final Optional<ServiceAccountsService> serviceAccountsService;
   private final List<PipelineValidator> pipelineValidators;
   private final Optional<PipelineTemplateDAO> pipelineTemplateDAO;
+  private final PipelineControllerConfig pipelineControllerConfig;
+  private final FiatPermissionEvaluator fiatPermissionEvaluator;
+  private final AuthorizationSupport authorizationSupport;
 
   public PipelineController(
       PipelineDAO pipelineDAO,
       ObjectMapper objectMapper,
       Optional<ServiceAccountsService> serviceAccountsService,
       List<PipelineValidator> pipelineValidators,
-      Optional<PipelineTemplateDAO> pipelineTemplateDAO) {
+      Optional<PipelineTemplateDAO> pipelineTemplateDAO,
+      PipelineControllerConfig pipelineControllerConfig,
+      FiatPermissionEvaluator fiatPermissionEvaluator,
+      AuthorizationSupport authorizationSupport) {
     this.pipelineDAO = pipelineDAO;
     this.objectMapper = objectMapper;
     this.serviceAccountsService = serviceAccountsService;
     this.pipelineValidators = pipelineValidators;
     this.pipelineTemplateDAO = pipelineTemplateDAO;
+    this.pipelineControllerConfig = pipelineControllerConfig;
+    this.fiatPermissionEvaluator = fiatPermissionEvaluator;
+    this.authorizationSupport = authorizationSupport;
   }
   @PreAuthorize("#restricted ? @fiatPermissionEvaluator.storeWholePermission() : true")
@@ -237,33 +258,105 @@ public Pipeline getByApplicationAndName(
   }
 
   @PreAuthorize(
-      "@fiatPermissionEvaluator.storeWholePermission() and hasPermission(#pipeline.application, 'APPLICATION', 'WRITE') and @authorizationSupport.hasRunAsUserPermission(#pipeline)")
+      "@fiatPermissionEvaluator.storeWholePermission() "
+          + "and hasPermission(#pipeline.application, 'APPLICATION', 'WRITE') "
+          + "and @authorizationSupport.hasRunAsUserPermission(#pipeline)")
   @RequestMapping(value = "", method = RequestMethod.POST)
   public synchronized Pipeline save(
       @RequestBody Pipeline pipeline,
       @RequestParam(value = "staleCheck", required = false, defaultValue = "false")
           Boolean staleCheck) {
-    validatePipeline(pipeline, staleCheck);
-
-    pipeline.setName(pipeline.getName().trim());
-    ensureCronTriggersHaveIdentifier(pipeline);
+    long saveStartTime = System.currentTimeMillis();
+    log.info(
+        "Received request to save pipeline {} in application {}",
+        pipeline.getName(),
+        pipeline.getApplication());
 
-    if (Strings.isNullOrEmpty(pipeline.getId())
-        || (boolean) pipeline.getAny().getOrDefault("regenerateCronTriggerIds", false)) {
-      // ensure that cron triggers are assigned a unique identifier for new pipelines
-      pipeline.getTriggers().stream()
-          .filter(it -> "cron".equals(it.getType()))
-          .forEach(it -> it.put("id", UUID.randomUUID().toString()));
-    }
+    log.debug("Running validation before saving pipeline {}", pipeline.getName());
+    long validationStartTime = System.currentTimeMillis();
+    validatePipeline(pipeline, staleCheck);
+    checkForDuplicatePipeline(
+        pipeline.getApplication(), pipeline.getName().trim(), pipeline.getId());
+    log.debug(
+        "Successfully validated pipeline {} in {}ms",
+        pipeline.getName(),
+        System.currentTimeMillis() - validationStartTime);
 
-    return pipelineDAO.create(pipeline.getId(), pipeline);
+    Pipeline savedPipeline = pipelineDAO.create(pipeline.getId(), pipeline);
+    log.info(
+        "Successfully saved pipeline {} in application {} in {}ms",
+        savedPipeline.getName(),
+        savedPipeline.getApplication(),
+        System.currentTimeMillis() - saveStartTime);
+    return savedPipeline;
   }
 
-  @PreAuthorize("@fiatPermissionEvaluator.isAdmin()")
+  @PreAuthorize("@fiatPermissionEvaluator.storeWholePermission()")
   @RequestMapping(value = "batchUpdate", method = RequestMethod.POST)
-  public void batchUpdate(@RequestBody List<Pipeline> pipelines) {
-    pipelineDAO.bulkImport(pipelines);
+  public Map<String, Object> batchUpdate(
+      @RequestBody List<Map<String, Object>> pipelinesJson,
+      @RequestParam(value = "staleCheck", required = false, defaultValue = "false")
+          Boolean staleCheck) {
+
+    long batchUpdateStartTime = System.currentTimeMillis();
+
+    log.debug(
+        "Deserializing the provided map of {} pipelines into a list of pipeline objects.",
+        pipelinesJson.size());
+
+    // The right side of the pair holds failed pipelines. This needs to be
+    // List<Map<String, Object>> as opposed to List<Pipeline> since some
+    // elements of pipelinesJson may fail to deserialize into Pipeline objects.
+    ImmutablePair<List<Pipeline>, List<Map<String, Object>>> deserializedPipelines =
+        deSerializePipelines(pipelinesJson);
+    List<Pipeline> pipelines = deserializedPipelines.getLeft();
+    List<Map<String, Object>> failedPipelines = deserializedPipelines.getRight();
+
+    log.info(
+        "Batch upserting the following pipelines: {}",
+        pipelines.stream().map(Pipeline::getName).collect(Collectors.toList()));
+
+    List<Pipeline> pipelinesToSave = new ArrayList<>();
+    Map<String, Object> returnData = new HashMap<>();
+
+    // List of pipelines in the provided request body which don't adhere to the schema
+    List<Pipeline> invalidPipelines = new ArrayList<>();
+    validatePipelines(pipelines, staleCheck, pipelinesToSave, invalidPipelines);
+
+    TypeReference<Map<String, Object>> mapType = new TypeReference<>() {};
+    failedPipelines.addAll(
+        invalidPipelines.stream()
+            .map((pipeline) -> objectMapper.convertValue(pipeline, mapType))
+            .collect(Collectors.toList()));
+
+    long bulkImportStartTime = System.currentTimeMillis();
+    log.debug("Bulk importing the following pipelines: {}", pipelinesToSave);
+    pipelineDAO.bulkImport(pipelinesToSave);
+    log.debug(
+        "Bulk imported {} pipelines successfully in {}ms",
+        pipelinesToSave.size(),
+        System.currentTimeMillis() - bulkImportStartTime);
+
+    List<String> savedPipelines =
+        pipelinesToSave.stream().map(Pipeline::getName).collect(Collectors.toList());
+    returnData.put("successful_pipelines_count", savedPipelines.size());
+    returnData.put("successful_pipelines", savedPipelines);
+    returnData.put("failed_pipelines_count", failedPipelines.size());
+    returnData.put("failed_pipelines", failedPipelines);
+
+    if (!failedPipelines.isEmpty()) {
+      log.warn(
+          "Following pipelines were skipped during the bulk import since they had errors: {}",
+          failedPipelines.stream().map(p -> p.get("name")).collect(Collectors.toList()));
+    }
+    log.info(
+        "Batch updated the following {} pipelines in {}ms: {}",
+        savedPipelines.size(),
+        System.currentTimeMillis() - batchUpdateStartTime,
+        savedPipelines);
+
+    return returnData;
   }
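Note the contract change: `batchUpdate` previously required admin rights and returned nothing; it now requires per-application WRITE permission (checked inside `validatePipelines`) and reports per-pipeline outcomes. A hedged client sketch using Java's built-in HttpClient; the host/port and the direct, unauthenticated call are assumptions for illustration, since in a real deployment the request normally goes through Gate with auth headers:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class BatchUpdateClient {
  public static void main(String[] args) throws Exception {
    // Two minimal pipeline definitions; "name" and "application" are required fields.
    String body =
        "[{\"id\":\"id1\",\"name\":\"pipeline-a\",\"application\":\"myapp\"},"
            + "{\"id\":\"id2\",\"name\":\"pipeline-b\",\"application\":\"myapp\"}]";

    HttpRequest request =
        HttpRequest.newBuilder()
            // Assumed local Front50 endpoint; staleCheck is optional, defaulting to false.
            .uri(URI.create("http://localhost:8080/pipelines/batchUpdate?staleCheck=true"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();

    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());

    // The response body is a map with successful_pipelines[_count] and
    // failed_pipelines[_count]; each failed entry carries an "errorMsg".
    System.out.println(response.body());
  }
}
```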
"errorMsg", + String.format( + "Failed to deserialize the pipeline json into a valid pipeline: %s", e)); + failedPipelines.add(pipelineMap); + } + }); + + return ImmutablePair.of(pipelines, failedPipelines); + } + /** * Ensure basic validity of the pipeline. Invalid pipelines will raise runtime exceptions. * @@ -323,6 +458,7 @@ private void validatePipeline(final Pipeline pipeline, Boolean staleCheck) { if (StringUtils.isAnyBlank(pipeline.getApplication(), pipeline.getName())) { throw new InvalidEntityException("A pipeline requires name and application fields"); } + pipeline.setName(pipeline.getName().trim()); // Check if pipeline type is templated if (TYPE_TEMPLATED.equals(pipeline.getType())) { @@ -355,18 +491,29 @@ private void validatePipeline(final Pipeline pipeline, Boolean staleCheck) { } } - checkForDuplicatePipeline( - pipeline.getApplication(), pipeline.getName().trim(), pipeline.getId()); + ensureCronTriggersHaveIdentifier(pipeline); + + // Ensure cron trigger ids are regenerated if needed + if (Strings.isNullOrEmpty(pipeline.getId()) + || (boolean) pipeline.getAny().getOrDefault("regenerateCronTriggerIds", false)) { + // ensure that cron triggers are assigned a unique identifier for new pipelines + pipeline.getTriggers().stream() + .filter(it -> "cron".equals(it.getType())) + .forEach(it -> it.put("id", UUID.randomUUID().toString())); + } final ValidatorErrors errors = new ValidatorErrors(); - pipelineValidators.forEach(it -> it.validate(pipeline, errors)); + // Run stale pipeline definition check if (staleCheck && !Strings.isNullOrEmpty(pipeline.getId()) && pipeline.getLastModified() != null) { checkForStalePipeline(pipeline, errors); } + // Run other pre-configured validators + pipelineValidators.forEach(it -> it.validate(pipeline, errors)); + if (errors.hasErrors()) { String message = errors.getAllErrorsMessage(); throw new ValidationException(message, errors.getAllErrors()); @@ -400,9 +547,16 @@ private void checkForStalePipeline(Pipeline pipeline, ValidatorErrors errors) { } } - private void checkForDuplicatePipeline(String application, String name, String id) { + @VisibleForTesting + void checkForDuplicatePipeline(String application, String name, String id) { + log.debug( + "Cache refresh enabled when checking for duplicates: {}", + pipelineControllerConfig.getSave().isRefreshCacheOnDuplicatesCheck()); boolean any = - pipelineDAO.getPipelinesByApplication(application).stream() + pipelineDAO + .getPipelinesByApplication( + application, pipelineControllerConfig.getSave().isRefreshCacheOnDuplicatesCheck()) + .stream() .anyMatch(it -> it.getName().equalsIgnoreCase(name) && !it.getId().equals(id)); if (any) { throw new DuplicateEntityException( @@ -410,10 +564,6 @@ private void checkForDuplicatePipeline(String application, String name, String i } } - private void checkForDuplicatePipeline(String application, String name) { - checkForDuplicatePipeline(application, name, null); - } - /** * Ensure that cron triggers have an identifier * @@ -426,4 +576,112 @@ private static void ensureCronTriggersHaveIdentifier(Pipeline pipeline) { .filter(it -> Strings.isNullOrEmpty((String) it.get("id"))) .forEach(it -> it.put("id", UUID.randomUUID().toString())); } + + /** * Fetches all the pipelines and groups then into a map indexed by applications */ + private Map> getAllPipelinesByApplication() { + Map> appToPipelines = new HashMap<>(); + pipelineDAO + .all(false) + .forEach( + pipeline -> + appToPipelines + .computeIfAbsent(pipeline.getApplication(), k -> new ArrayList<>()) + 
+
+  /**
+   * Validates the provided list of pipelines and populates the provided valid and invalid pipeline
+   * lists accordingly. The following validations are performed: check that the user has permission
+   * to write the pipeline; check that no duplicate pipeline exists in the same app; validate the
+   * pipeline id.
+   *
+   * @param pipelines List of {@link Pipeline} to be validated
+   * @param staleCheck whether to check the submitted pipelines for staleness
+   * @param validPipelines Result list of {@link Pipeline} that passed validations
+   * @param invalidPipelines Result list of {@link Pipeline} that failed validations
+   */
+  private void validatePipelines(
+      List<Pipeline> pipelines,
+      Boolean staleCheck,
+      List<Pipeline> validPipelines,
+      List<Pipeline> invalidPipelines) {
+
+    Map<String, List<Pipeline>> pipelinesByApp = getAllPipelinesByApplication();
+    Map<String, Boolean> appPermissionForUser = new HashMap<>();
+    Set<String> uniqueIdSet = new HashSet<>();
+
+    final Authentication auth = SecurityContextHolder.getContext().getAuthentication();
+    String user = AuthenticatedRequest.getSpinnakerUser().orElse("anonymous");
+
+    long validationStartTime = System.currentTimeMillis();
+    log.debug("Running validations before saving");
+    for (Pipeline pipeline : pipelines) {
+      try {
+        validatePipeline(pipeline, staleCheck);
+
+        String app = pipeline.getApplication();
+        String pipelineName = pipeline.getName();
+
+        // Check if the user has permission to write the pipeline
+        if (!appPermissionForUser.computeIfAbsent(
+            app,
+            key ->
+                fiatPermissionEvaluator.hasPermission(
+                    auth, pipeline.getApplication(), "APPLICATION", "WRITE"))) {
+          String errorMessage =
+              String.format(
+                  "User %s does not have WRITE permission to save the pipeline %s in the application %s.",
+                  user, pipeline.getName(), pipeline.getApplication());
+          log.error(errorMessage);
+          pipeline.setAny("errorMsg", errorMessage);
+          invalidPipelines.add(pipeline);
+          continue;
+        }
+
+        // Check if a duplicate pipeline exists in the same app
+        List<Pipeline> appPipelines = pipelinesByApp.getOrDefault(app, new ArrayList<>());
+        if (appPipelines.stream()
+            .anyMatch(
+                existingPipeline ->
+                    existingPipeline.getName().equalsIgnoreCase(pipeline.getName())
+                        && !existingPipeline.getId().equals(pipeline.getId()))) {
+          String errorMessage =
+              String.format(
+                  "A pipeline with name %s already exists in the application %s",
+                  pipelineName, app);
+          log.error(errorMessage);
+          pipeline.setAny("errorMsg", errorMessage);
+          invalidPipelines.add(pipeline);
+          continue;
+        }
+
+        // Validate the pipeline id
+        String id = pipeline.getId();
+        if (Strings.isNullOrEmpty(id)) {
+          pipeline.setId(UUID.randomUUID().toString());
+        } else if (!uniqueIdSet.add(id)) {
+          String errorMessage =
+              String.format(
+                  "Duplicate pipeline id %s found when processing pipeline %s in the application %s",
+                  id, pipeline.getName(), pipeline.getApplication());
+          log.error(errorMessage);
+          invalidPipelines.add(pipeline);
+          pipeline.setAny("errorMsg", errorMessage);
+          continue;
+        }
+        validPipelines.add(pipeline);
+      } catch (Exception e) {
+        String errorMessage =
+            String.format(
+                "Encountered the following error when validating pipeline %s in the application %s: %s",
+                pipeline.getName(), pipeline.getApplication(), e.getMessage());
+        log.error(errorMessage, e);
+        pipeline.setAny("errorMsg", errorMessage);
+        invalidPipelines.add(pipeline);
+      }
+    }
+    log.debug(
+        "Validated {} pipelines in {}ms",
+        pipelines.size(),
+        System.currentTimeMillis() - validationStartTime);
+  }
 }
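One detail worth calling out in `validatePipelines`: Fiat is consulted at most once per application via `computeIfAbsent`, rather than once per pipeline, which matters when importing hundreds of pipelines spread across a handful of applications. The same pattern in isolation, with a hypothetical `PermissionChecker` standing in for `FiatPermissionEvaluator`:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Stand-in for the Fiat call; an assumed interface for illustration only. */
interface PermissionChecker {
  boolean canWrite(String application);
}

class MemoizedPermissionCheck {
  /** Resolves write access per application, hitting the checker once per app. */
  static Map<String, Boolean> checkAll(List<String> applications, PermissionChecker checker) {
    Map<String, Boolean> cache = new HashMap<>();
    for (String app : applications) {
      // computeIfAbsent evaluates the permission lookup only on the first
      // occurrence of each application; repeats are answered from the map.
      cache.computeIfAbsent(app, checker::canWrite);
    }
    return cache;
  }
}
```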
diff --git a/front50-web/src/test/groovy/com/netflix/spinnaker/front50/controllers/PipelineControllerSpec.groovy b/front50-web/src/test/groovy/com/netflix/spinnaker/front50/controllers/PipelineControllerSpec.groovy
index b90c78216..68bf5f067 100644
--- a/front50-web/src/test/groovy/com/netflix/spinnaker/front50/controllers/PipelineControllerSpec.groovy
+++ b/front50-web/src/test/groovy/com/netflix/spinnaker/front50/controllers/PipelineControllerSpec.groovy
@@ -1,9 +1,12 @@
 package com.netflix.spinnaker.front50.controllers
 
 import com.fasterxml.jackson.databind.ObjectMapper
+import com.netflix.spinnaker.fiat.shared.FiatPermissionEvaluator
 import com.netflix.spinnaker.front50.api.model.pipeline.Pipeline
 import com.netflix.spinnaker.front50.api.validator.PipelineValidator
 import com.netflix.spinnaker.front50.api.validator.ValidatorErrors
+import com.netflix.spinnaker.front50.config.controllers.PipelineControllerConfig
+import com.netflix.spinnaker.front50.exceptions.DuplicateEntityException
 import com.netflix.spinnaker.front50.model.pipeline.PipelineDAO
 import com.netflix.spinnaker.kork.web.exceptions.ExceptionMessageDecorator
 import com.netflix.spinnaker.kork.web.exceptions.GenericExceptionHandlers
@@ -32,12 +35,129 @@
 
 @AutoConfigureMockMvc(addFilters = false)
 @WebMvcTest(controllers = [PipelineController])
-@ContextConfiguration(classes = [TestConfiguration, PipelineController])
+@ContextConfiguration(classes = [TestConfiguration, AuthorizationSupport, PipelineController, PipelineControllerConfig])
 class PipelineControllerSpec extends Specification {
 
   @Autowired
   private MockMvc mockMvc
 
+  @Autowired
+  PipelineControllerConfig pipelineControllerConfig
+
+  @Autowired
+  FiatPermissionEvaluator fiatPermissionEvaluator
+
+  @Autowired
+  private AuthorizationSupport authorizationSupport
+
+  @Unroll
+  def "should fail the pipeline when staleCheck is true and conditions are met"() {
+    given:
+    def staleCheck_true = true
+    def staleCheck_false = false
+    def localFiatPermissionEvaluator = Mock(FiatPermissionEvaluator)
+    def pipelinesBatch1 = [
+      [ id          : "1",
+        name        : "test-pipeline",
+        application : "test-application",
+        lastModified: 1662644108666
+      ]
+    ]
+
+    def pipelinesBatch2 = [
+      [ id          : "1",
+        name        : "test-pipeline",
+        application : "test-application",
+      ]
+    ]
+    def pipelineDAO = new InMemoryPipelineDAO() {
+      @Override
+      Pipeline findById(String id) throws NotFoundException {
+        return new Pipeline([
+          id          : "1",
+          name        : "test-pipeline",
+          application : "test-application",
+          lastModified: 1772644108777
+        ])
+      }
+
+      @Override
+      void bulkImport(Collection<Pipeline> items) {}
+    }
+
+    _ * localFiatPermissionEvaluator.hasPermission(_, "test-application", "APPLICATION", "WRITE") >> true
+
+    def pipelineController = new PipelineController(
+      pipelineDAO, new ObjectMapper(), Optional.empty(), [], Optional.empty(), pipelineControllerConfig,
+      localFiatPermissionEvaluator, authorizationSupport)
+
+    when: "staleCheck is true and conditions are met"
+    def response = pipelineController.batchUpdate(pipelinesBatch1, staleCheck_true)
+
+    then: "pipeline save should fail"
+    response.failed_pipelines_count == 1
+    response.successful_pipelines_count == 0
+    response.successful_pipelines == []
+    response.failed_pipelines[0].any.errorMsg ==
+      "Encountered the following error when validating pipeline test-pipeline in the application test-application: " +
      "The submitted pipeline is stale. submitted updateTs 1662644108666 does not match stored updateTs 1772644108777"
+
+    when: "staleCheck is false"
+    response = pipelineController.batchUpdate(pipelinesBatch1, staleCheck_false)
+
+    then: "pipeline save should be successful"
+    response.failed_pipelines_count == 0
+    response.successful_pipelines_count == 1
+
+    when: "staleCheck is true but the submitted pipeline doesn't have a lastModified timestamp"
+    response = pipelineController.batchUpdate(pipelinesBatch2, staleCheck_true)
+
+    then: "pipeline save should be successful"
+    response.failed_pipelines_count == 0
+    response.successful_pipelines_count == 1
+  }
+
+  @Unroll
+  def "test cache refresh enabled flag when checking for duplicate pipelines"() {
+    given:
+    def existingPipelineIdNotInCache = "123"
+    def newPipelineId = "456"
+    def application = "test-application"
+    def pipelineName = "test-pipeline"
+    def pipeline = new Pipeline([
+      id          : existingPipelineIdNotInCache,
+      name        : pipelineName,
+      application : application,
+    ])
+
+    def pipelineDAO = new InMemoryPipelineDAO() {
+      @Override
+      Collection<Pipeline> getPipelinesByApplication(String app, boolean refresh) {
+        return refresh ? [pipeline] : []
+      }
+    }
+
+    def pipelineController = new PipelineController(
+      pipelineDAO, new ObjectMapper(), Optional.empty(), [], Optional.empty(), pipelineControllerConfig,
+      fiatPermissionEvaluator, authorizationSupport)
+
+    when:
+    pipelineControllerConfig.getSave().refreshCacheOnDuplicatesCheck = true
+    pipelineController.checkForDuplicatePipeline(application, pipelineName, newPipelineId)
+
+    then:
+    thrown DuplicateEntityException
+
+    when:
+    pipelineControllerConfig.getSave().refreshCacheOnDuplicatesCheck = false
+    pipelineController.checkForDuplicatePipeline(application, pipelineName, newPipelineId)
+
+    then:
+    noExceptionThrown()
+  }
+
   @Unroll
   def "should fail to save if application is missing, empty or blank"() {
     given:
@@ -113,7 +233,10 @@
           new ObjectMapper(),
           Optional.empty(),
           [new MockValidator()] as List,
-          Optional.empty()
+          Optional.empty(),
+          pipelineControllerConfig,
+          fiatPermissionEvaluator,
+          authorizationSupport
         )
       )
       .setControllerAdvice(
@@ -155,7 +278,8 @@
     pipelineDAO.history(testPipelineId, 20) >> pipelineList
 
     def mockMvcWithController = MockMvcBuilders.standaloneSetup(new PipelineController(
-      pipelineDAO, new ObjectMapper(), Optional.empty(), [], Optional.empty()
+      pipelineDAO, new ObjectMapper(), Optional.empty(), [], Optional.empty(), pipelineControllerConfig,
+      fiatPermissionEvaluator, authorizationSupport
     )).build()
 
     when:
@@ -193,7 +317,8 @@
     ]))
 
     def mockMvcWithController = MockMvcBuilders.standaloneSetup(new PipelineController(
-      pipelineDAO, new ObjectMapper(), Optional.empty(), [], Optional.empty()
+      pipelineDAO, new ObjectMapper(), Optional.empty(), [], Optional.empty(), pipelineControllerConfig,
+      fiatPermissionEvaluator, authorizationSupport
     )).build()
 
     when:
@@ -214,6 +339,11 @@
     PipelineDAO pipelineDAO() {
       detachedMockFactory.Stub(PipelineDAO)
     }
+
+    @Bean
+    FiatPermissionEvaluator fiatPermissionEvaluator() {
+      detachedMockFactory.Stub(FiatPermissionEvaluator)
+    }
   }
 
   private class MockValidator implements PipelineValidator {
@@ -288,6 +418,7 @@
     @Override
     Pipeline create(String id, Pipeline item) {
       map.put(id, item)
+      item
     }
 
     @Override
diff --git a/front50-web/src/test/groovy/com/netflix/spinnaker/front50/controllers/PipelineControllerTck.groovy b/front50-web/src/test/groovy/com/netflix/spinnaker/front50/controllers/PipelineControllerTck.groovy
index b9d6ecd20..8ccceefe6 100644
--- a/front50-web/src/test/groovy/com/netflix/spinnaker/front50/controllers/PipelineControllerTck.groovy
+++ b/front50-web/src/test/groovy/com/netflix/spinnaker/front50/controllers/PipelineControllerTck.groovy
@@ -18,10 +18,13 @@
 
 import com.netflix.spectator.api.NoopRegistry
 import com.netflix.spinnaker.config.Front50SqlProperties
-import com.netflix.spinnaker.front50.api.model.pipeline.Pipeline
+import com.netflix.spinnaker.fiat.shared.FiatPermissionEvaluator
 import com.netflix.spinnaker.front50.ServiceAccountsService
+import com.netflix.spinnaker.front50.api.model.pipeline.Pipeline
 import com.netflix.spinnaker.front50.api.model.pipeline.Trigger
 import com.netflix.spinnaker.front50.config.StorageServiceConfigurationProperties
+import com.netflix.spinnaker.front50.config.controllers.PipelineControllerConfig
+import com.netflix.spinnaker.front50.jackson.Front50ApiModule
 import com.netflix.spinnaker.front50.model.DefaultObjectKeyLoader
 import com.netflix.spinnaker.front50.model.SqlStorageService
 import com.netflix.spinnaker.front50.model.pipeline.DefaultPipelineDAO
@@ -29,12 +32,13 @@
 import com.netflix.spinnaker.kork.sql.config.SqlRetryProperties
 import com.netflix.spinnaker.kork.sql.test.SqlTestUtil
 import com.netflix.spinnaker.kork.web.exceptions.ExceptionMessageDecorator
 import com.netflix.spinnaker.kork.web.exceptions.GenericExceptionHandlers
-import com.netflix.spinnaker.kork.web.exceptions.NotFoundException
 import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry
 import org.hamcrest.Matchers
 import org.springframework.beans.factory.ObjectProvider
+import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter
 import org.springframework.web.util.UriComponentsBuilder
 
+import java.nio.charset.StandardCharsets
 import java.time.Clock
 import java.util.concurrent.Callable
 import java.util.concurrent.Executors
@@ -53,6 +57,7 @@ import static org.springframework.test.web.servlet.request.MockMvcRequestBuilder
 import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get
 import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post
 import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put
+import static org.springframework.test.web.servlet.result.MockMvcResultHandlers.print
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status
@@ -71,23 +76,40 @@ abstract class PipelineControllerTck extends Specification {
 
   ServiceAccountsService serviceAccountsService
   StorageServiceConfigurationProperties.PerObjectType pipelineDAOConfigProperties =
     new StorageServiceConfigurationProperties().getPipeline()
+  FiatPermissionEvaluator fiatPermissionEvaluator
+  AuthorizationSupport authorizationSupport
+  ObjectMapper objectMapper
+  PipelineControllerConfig pipelineControllerConfig
 
   void setup() {
     println "--------------- Test " + specificationContext.currentIteration.name
 
+    this.objectMapper = new ObjectMapper()
+    this.objectMapper.registerModule(new Front50ApiModule())
+
     this.pipelineDAO = Spy(createPipelineDAO())
     this.serviceAccountsService = Mock(ServiceAccountsService)
+    this.pipelineControllerConfig = new PipelineControllerConfig()
+    this.fiatPermissionEvaluator = Mock(FiatPermissionEvaluator)
+    this.authorizationSupport = Spy(new AuthorizationSupport(fiatPermissionEvaluator))
+
+    MappingJackson2HttpMessageConverter mappingJackson2HttpMessageConverter =
+      new MappingJackson2HttpMessageConverter()
+    mappingJackson2HttpMessageConverter.setObjectMapper(objectMapper)
 
     mockMvc = MockMvcBuilders
       .standaloneSetup(
         new PipelineController(
           pipelineDAO,
-          new ObjectMapper(),
+          objectMapper,
           Optional.of(serviceAccountsService),
           Collections.emptyList(),
-          Optional.empty()
+          Optional.empty(),
+          pipelineControllerConfig,
+          fiatPermissionEvaluator,
+          authorizationSupport
         )
       )
+      .setMessageConverters(mappingJackson2HttpMessageConverter)
       .setControllerAdvice(
         new GenericExceptionHandlers(
           new ExceptionMessageDecorator(Mock(ObjectProvider))
@@ -109,7 +131,7 @@
       .perform(
         post("/pipelines")
           .contentType(MediaType.APPLICATION_JSON)
-          .content(new ObjectMapper().writeValueAsString(command))
+          .content(objectMapper.writeValueAsString(command))
       )
       .andReturn()
       .response
@@ -152,7 +174,7 @@
     when:
     pipeline.name = "Updated Name"
     def response = mockMvc.perform(put("/pipelines/${pipeline.id}").contentType(MediaType.APPLICATION_JSON)
-      .content(new ObjectMapper().writeValueAsString(pipeline))).andReturn().response
+      .content(objectMapper.writeValueAsString(pipeline))).andReturn().response
 
     then:
     response.status == OK
@@ -172,7 +194,7 @@
     when:
     def response = mockMvc.perform(put("/pipelines/${pipeline1.id}").contentType(MediaType.APPLICATION_JSON)
-      .content(new ObjectMapper().writeValueAsString(pipeline1))).andReturn().response
+      .content(objectMapper.writeValueAsString(pipeline1))).andReturn().response
 
     then:
     response.status == BAD_REQUEST
@@ -180,7 +202,7 @@
     when:
     response = mockMvc.perform(put("/pipelines/${pipeline2.id}").contentType(MediaType.APPLICATION_JSON)
-      .content(new ObjectMapper().writeValueAsString(pipeline1))).andReturn().response
+      .content(objectMapper.writeValueAsString(pipeline1))).andReturn().response
 
     then:
     response.status == BAD_REQUEST
@@ -209,7 +231,7 @@
     when:
     def response = mockMvc.perform(post('/pipelines').
-      contentType(MediaType.APPLICATION_JSON).content(new ObjectMapper().writeValueAsString(pipeline)))
+      contentType(MediaType.APPLICATION_JSON).content(objectMapper.writeValueAsString(pipeline)))
       .andReturn().response
 
     def updatedPipeline = pipelineDAO.findById(
@@ -247,7 +269,7 @@
     when:
     def response = mockMvc.perform(post('/pipelines').
-      contentType(MediaType.APPLICATION_JSON).content(new ObjectMapper().writeValueAsString(pipeline)))
+      contentType(MediaType.APPLICATION_JSON).content(objectMapper.writeValueAsString(pipeline)))
       .andReturn().response
 
     def updatedPipeline = pipelineDAO.findById(
@@ -307,7 +329,7 @@
     when:
     def response = mockMvc.perform(post('/pipelines')
       .contentType(MediaType.APPLICATION_JSON)
-      .content(new ObjectMapper().writeValueAsString([name: "pipeline1", application: "test"])))
+      .content(objectMapper.writeValueAsString([name: "pipeline1", application: "test"])))
       .andReturn().response
 
     then:
@@ -315,6 +337,159 @@
     response.errorMessage == "A pipeline with name pipeline1 already exists in application test"
   }
 
+  void 'should not refresh cache when checking for duplicates when saving'() {
+    given:
+    def pipeline = [name: "My Pipeline", application: "test"]
+    pipelineControllerConfig.save.refreshCacheOnDuplicatesCheck = false
+
+    when:
+    def response = mockMvc.perform(post('/pipelines')
+      .contentType(MediaType.APPLICATION_JSON)
+      .content(objectMapper.writeValueAsString(pipeline)))
+      .andReturn()
+      .response
+
+    then:
+    response.status == OK
+    1 * pipelineDAO.getPipelinesByApplication("test", false)
+
+    when:
+    pipeline.name = "My Second Pipeline"
+    pipelineControllerConfig.save.refreshCacheOnDuplicatesCheck = true
+    response = mockMvc.perform(post('/pipelines')
+      .contentType(MediaType.APPLICATION_JSON)
+      .content(objectMapper.writeValueAsString(pipeline)))
+      .andReturn()
+      .response
+
+    then:
+    response.status == OK
+    1 * pipelineDAO.getPipelinesByApplication("test", true)
+  }
+
+  def "should perform batch update"() {
+    given:
+    def pipelines = [
+      new Pipeline([name: "My Pipeline1", application: "test1", id: "id1", triggers: []]),
+      new Pipeline([name: "My Pipeline2", application: "test1", id: "id2", triggers: []]),
+      new Pipeline([name: "My Pipeline3", application: "test2", id: "id3", triggers: []]),
+      new Pipeline([name: "My Pipeline4", application: "test2", id: "id4", triggers: []])
+    ]
+
+    when:
+    def response = mockMvc.perform(post('/pipelines/batchUpdate')
+      .contentType(MediaType.APPLICATION_JSON)
+      .content(objectMapper.writeValueAsString(pipelines)))
+      .andReturn()
+      .response
+
+    then:
+    response.status == OK
+    1 * fiatPermissionEvaluator.hasPermission(_, "test1", "APPLICATION", "WRITE") >> true
+    1 * fiatPermissionEvaluator.hasPermission(_, "test2", "APPLICATION", "WRITE") >> true
+    1 * pipelineDAO.bulkImport(pipelines) >> null
+    new JsonSlurper().parseText(response.getContentAsString()) == [
+      successful_pipelines_count: 4,
+      successful_pipelines      : ["My Pipeline1", "My Pipeline2", "My Pipeline3", "My Pipeline4"],
+      failed_pipelines_count    : 0,
+      failed_pipelines          : []
+    ]
+  }
+
+  def "should perform batch updates with failures"() {
+    given:
+    def pipelines = [
+      new Pipeline([name: "Successful Pipeline 1", application: "test_app", id: "id1", triggers: []]),
+      new Pipeline([id: "id2", triggers: []]),
+      new Pipeline([name: "Failed Pipeline 3", application: "test_app_without_permission", id: "id3", triggers: []]),
+      new Pipeline([name: "Failed Pipeline 4", application: "test_app", id: "id4", triggers: []]),
+      new Pipeline([name: "Failed Pipeline 5", application: "test_app", id: "id1", triggers: []]),
+      [name: "Failed Pipeline 6", application: "test_app", id: "id6", triggers: [:]],
+      new Pipeline([name: "Failed Pipeline 7", application: "test_app", id: "id7",
[[runAsUser: "not_accessible"]]]) + ] + + // Success case + when: + def response = mockMvc.perform(post('/pipelines/batchUpdate') + .contentType(MediaType.APPLICATION_JSON) + .characterEncoding(StandardCharsets.UTF_8.toString()) + .content(objectMapper.writeValueAsString(pipelines))) + .andDo(print()) + .andReturn() + .response + + then: + 1 * pipelineDAO.all(false) >> [ + [name: "Failed Pipeline 4", application: "test_app", id: "existing_pipeline_id"] as Pipeline + ] + 1 * fiatPermissionEvaluator.hasPermission(_, "test_app", "APPLICATION", "WRITE") >> true + 1 * fiatPermissionEvaluator.hasPermission(_, "test_app_without_permission", "APPLICATION", "WRITE") >> false + 1 * pipelineDAO.bulkImport(pipelines[0..0]) >> null + 1 * authorizationSupport.hasRunAsUserPermission(pipelines[6]) >> false + response.status == OK + new JsonSlurper().parseText(response.getContentAsString()) == [ + successful_pipelines_count: 1, + successful_pipelines: ["Successful Pipeline 1"], + failed_pipelines_count : 6, + failed_pipelines : [ + [ + id : "id6", + name : "Failed Pipeline 6", + application : "test_app", + triggers : [:], + errorMsg : "Failed to deserialize the pipeline json into a valid pipeline: " + + "java.lang.IllegalArgumentException: Cannot deserialize value of type " + + "`java.util.ArrayList` " + + "from Object value (token `JsonToken.START_OBJECT`)\n at [Source: UNKNOWN; byte offset: #UNKNOWN] " + + "(through reference chain: com.netflix.spinnaker.front50.api.model.pipeline.Pipeline[\"triggers\"])" + ], + [ + id : "id7", + name : "Failed Pipeline 7", + application : "test_app", + schema : "1", + triggers: [[runAsUser: "not_accessible"]], + errorMsg : "Validation of runAsUser permissions for pipeline Failed Pipeline 7 " + + "in the application test_app failed." + ], + [ + id : "id2", + schema : "1", + triggers : [], + errorMsg : "Encountered the following error when validating pipeline null in the application null: " + + "A pipeline requires name and application fields" + ], + [ + id : "id3", + name : "Failed Pipeline 3", + application : "test_app_without_permission", + schema : "1", + triggers : [], + errorMsg : "User anonymous does not have WRITE permission " + + "to save the pipeline Failed Pipeline 3 in the application test_app_without_permission." 
+        ],
+        [
+          id         : "id4",
+          name       : "Failed Pipeline 4",
+          application: "test_app",
+          schema     : "1",
+          triggers   : [],
+          errorMsg   : "A pipeline with name Failed Pipeline 4 already exists in the application test_app"
+        ],
+        [
+          id         : "id1",
+          name       : "Failed Pipeline 5",
+          application: "test_app",
+          schema     : "1",
+          triggers   : [],
+          errorMsg   : "Duplicate pipeline id id1 found when processing pipeline Failed Pipeline 5 " +
            "in the application test_app"
+        ]
+      ]
+    ]
+  }
+
   @Unroll
   void "pipeline with limitConcurrent = #limitConcurrent and maxConcurrentExecutions = #maxConcurrentExecutions"() {
     def appName = "test"
@@ -340,7 +515,7 @@
     def postResponse = mockMvc.perform(
       post("/pipelines")
         .contentType(MediaType.APPLICATION_JSON)
-        .content(new ObjectMapper().writeValueAsString(pipelineData))
+        .content(objectMapper.writeValueAsString(pipelineData))
       )
       .andReturn()
       .response
@@ -447,7 +622,7 @@
       if (it % 2 == 0) {
         mockMvc.perform(post('/pipelines')
           .contentType(MediaType.APPLICATION_JSON)
-          .content(new ObjectMapper().writeValueAsString([
+          .content(objectMapper.writeValueAsString([
             name: "My Pipeline" + it,
             application: "test" + it,
            id: "id" + it,
@@ -743,7 +918,7 @@
     // Update Pipeline 2
     mockMvc.perform(put('/pipelines/id2')
       .contentType(MediaType.APPLICATION_JSON)
-      .content(new ObjectMapper().writeValueAsString(pipelines[1])))
+      .content(objectMapper.writeValueAsString(pipelines[1])))
       .andExpect(status().isOk())
 
     response = mockMvc.perform(get('/pipelines/test'))
@@ -811,7 +986,7 @@ class SqlPipelineControllerTck extends PipelineControllerTck {
     def registry = new NoopRegistry()
 
     def storageService = new SqlStorageService(
-      new ObjectMapper(),
+      objectMapper,
      registry,
      currentDatabase.context,
      Clock.systemDefaultZone(),
@@ -831,6 +1006,9 @@
       new NoopRegistry(),
       CircuitBreakerRegistry.ofDefaults())
 
+    // refreshing to initialize the cache with an empty set
+    pipelineDAO.all(true)
+
     return pipelineDAO
   }
 }