From a1be60c73cd44605a75743cd3310786a0caac92b Mon Sep 17 00:00:00 2001 From: Florent CHAMFROY Date: Fri, 3 Jan 2025 14:29:40 +0100 Subject: [PATCH] feat: handle subscriptions and apikeys in local sync --- .../process/local/mapper/ApiKeyMapper.java | 56 ++++++ .../sync/process/local/mapper/ApiMapper.java | 85 +++++++++ .../local/mapper/SubscriptionMapper.java | 63 ++++++ .../local/model/LocalSyncFileDefinition.java | 36 ++++ .../local/spring/LocalSyncConfiguration.java | 55 ++++-- .../synchronizer/LocalApiSynchronizer.java | 179 ++++++++---------- 6 files changed, 352 insertions(+), 122 deletions(-) create mode 100644 gravitee-apim-gateway/gravitee-apim-gateway-services/gravitee-apim-gateway-services-sync/src/main/java/io/gravitee/gateway/services/sync/process/local/mapper/ApiKeyMapper.java create mode 100644 gravitee-apim-gateway/gravitee-apim-gateway-services/gravitee-apim-gateway-services-sync/src/main/java/io/gravitee/gateway/services/sync/process/local/mapper/ApiMapper.java create mode 100644 gravitee-apim-gateway/gravitee-apim-gateway-services/gravitee-apim-gateway-services-sync/src/main/java/io/gravitee/gateway/services/sync/process/local/mapper/SubscriptionMapper.java create mode 100644 gravitee-apim-gateway/gravitee-apim-gateway-services/gravitee-apim-gateway-services-sync/src/main/java/io/gravitee/gateway/services/sync/process/local/model/LocalSyncFileDefinition.java diff --git a/gravitee-apim-gateway/gravitee-apim-gateway-services/gravitee-apim-gateway-services-sync/src/main/java/io/gravitee/gateway/services/sync/process/local/mapper/ApiKeyMapper.java b/gravitee-apim-gateway/gravitee-apim-gateway-services/gravitee-apim-gateway-services-sync/src/main/java/io/gravitee/gateway/services/sync/process/local/mapper/ApiKeyMapper.java new file mode 100644 index 00000000000..9014b3139b9 --- /dev/null +++ b/gravitee-apim-gateway/gravitee-apim-gateway-services/gravitee-apim-gateway-services-sync/src/main/java/io/gravitee/gateway/services/sync/process/local/mapper/ApiKeyMapper.java @@ -0,0 +1,56 @@ +/* + * Copyright © 2015 The Gravitee team (http://gravitee.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.gravitee.gateway.services.sync.process.local.mapper; + +import io.gravitee.gateway.api.service.ApiKey; +import io.gravitee.gateway.api.service.Subscription; +import java.util.Set; + +public class ApiKeyMapper { + + public ApiKey to(io.gravitee.repository.management.model.ApiKey apiKeyModel, Set subscriptions) { + ApiKey.ApiKeyBuilder apiKeyBuilder = ApiKey + .builder() + .id(apiKeyModel.getId()) + .key(apiKeyModel.getKey()) + .application(apiKeyModel.getApplication()) + .expireAt(apiKeyModel.getExpireAt()) + .revoked(apiKeyModel.isRevoked()) + .paused(apiKeyModel.isPaused()) + .active(false); + + if (subscriptions != null) { + subscriptions + .stream() + .filter(subscription -> apiKeyModel.getSubscriptions().contains(subscription.getId())) + .findFirst() + .ifPresent(apiKeySubscription -> + apiKeyBuilder + .api(apiKeySubscription.getApi()) + .plan(apiKeySubscription.getPlan()) + .subscription(apiKeySubscription.getId()) + .active( + !apiKeyModel.isPaused() && + !apiKeyModel.isRevoked() && + io.gravitee.repository.management.model.Subscription.Status.ACCEPTED + .name() + .equals(apiKeySubscription.getStatus()) + ) + ); + } + return apiKeyBuilder.build(); + } +} diff --git a/gravitee-apim-gateway/gravitee-apim-gateway-services/gravitee-apim-gateway-services-sync/src/main/java/io/gravitee/gateway/services/sync/process/local/mapper/ApiMapper.java b/gravitee-apim-gateway/gravitee-apim-gateway-services/gravitee-apim-gateway-services-sync/src/main/java/io/gravitee/gateway/services/sync/process/local/mapper/ApiMapper.java new file mode 100644 index 00000000000..6e4877438e8 --- /dev/null +++ b/gravitee-apim-gateway/gravitee-apim-gateway-services/gravitee-apim-gateway-services-sync/src/main/java/io/gravitee/gateway/services/sync/process/local/mapper/ApiMapper.java @@ -0,0 +1,85 @@ +/* + * Copyright © 2015 The Gravitee team (http://gravitee.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.gravitee.gateway.services.sync.process.local.mapper; + +import static io.gravitee.repository.management.model.Event.EventProperties.API_ID; +import static io.gravitee.repository.management.model.Event.EventProperties.DEPLOYMENT_NUMBER; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.gravitee.definition.model.DefinitionVersion; +import io.gravitee.definition.model.v4.ApiType; +import io.gravitee.definition.model.v4.nativeapi.NativeApi; +import io.gravitee.gateway.reactor.ReactableApi; +import io.gravitee.gateway.services.sync.process.repository.service.EnvironmentService; +import io.gravitee.repository.management.model.Event; +import io.gravitee.repository.management.model.LifecycleState; +import io.reactivex.rxjava3.core.Maybe; +import java.util.Optional; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@RequiredArgsConstructor +@Slf4j +public class ApiMapper { + + private final ObjectMapper objectMapper; + private final EnvironmentService environmentService; + + public ReactableApi to(Event apiEvent) { + try { + // Read API definition from event + var api = objectMapper.readValue(apiEvent.getPayload(), io.gravitee.repository.management.model.Api.class); + + ReactableApi reactableApi; + + // Check the version of the API definition to read the right model entity + if (DefinitionVersion.V4 != api.getDefinitionVersion()) { + var eventApiDefinition = objectMapper.readValue(api.getDefinition(), io.gravitee.definition.model.Api.class); + + // Update definition with required information for deployment phase + reactableApi = new io.gravitee.gateway.handlers.api.definition.Api(eventApiDefinition); + } else { + if (api.getType() == ApiType.NATIVE) { + var eventApiDefinition = objectMapper.readValue(api.getDefinition(), NativeApi.class); + + // Update definition with required information for deployment phase + reactableApi = new io.gravitee.gateway.reactive.handlers.api.v4.NativeApi(eventApiDefinition); + } else if (api.getType() == ApiType.PROXY || api.getType() == ApiType.MESSAGE) { + var eventApiDefinition = objectMapper.readValue(api.getDefinition(), io.gravitee.definition.model.v4.Api.class); + + // Update definition with required information for deployment phase + reactableApi = new io.gravitee.gateway.reactive.handlers.api.v4.Api(eventApiDefinition); + } else { + throw new IllegalArgumentException("Unsupported ApiType [" + api.getType() + "] for api: " + api.getId()); + } + } + + reactableApi.setEnabled(api.getLifecycleState() == LifecycleState.STARTED); + reactableApi.setDeployedAt(apiEvent.getCreatedAt()); + reactableApi.setRevision( + Optional.ofNullable(apiEvent.getProperties()).map(props -> props.get(DEPLOYMENT_NUMBER.getValue())).orElse(null) + ); + + environmentService.fill(api.getEnvironmentId(), reactableApi); + + return reactableApi; + } catch (Exception e) { + // Log the error and ignore this event. + log.error("Unable to extract api definition from event [{}].", apiEvent.getId(), e); + return null; + } + } +} diff --git a/gravitee-apim-gateway/gravitee-apim-gateway-services/gravitee-apim-gateway-services-sync/src/main/java/io/gravitee/gateway/services/sync/process/local/mapper/SubscriptionMapper.java b/gravitee-apim-gateway/gravitee-apim-gateway-services/gravitee-apim-gateway-services-sync/src/main/java/io/gravitee/gateway/services/sync/process/local/mapper/SubscriptionMapper.java new file mode 100644 index 00000000000..088b291303e --- /dev/null +++ b/gravitee-apim-gateway/gravitee-apim-gateway-services/gravitee-apim-gateway-services-sync/src/main/java/io/gravitee/gateway/services/sync/process/local/mapper/SubscriptionMapper.java @@ -0,0 +1,63 @@ +/* + * Copyright © 2015 The Gravitee team (http://gravitee.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.gravitee.gateway.services.sync.process.local.mapper; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.gravitee.gateway.api.service.Subscription; +import io.gravitee.gateway.api.service.SubscriptionConfiguration; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@RequiredArgsConstructor +@Slf4j +public class SubscriptionMapper { + + private final ObjectMapper objectMapper; + + public Subscription to(io.gravitee.repository.management.model.Subscription subscriptionModel) { + try { + Subscription subscription = new Subscription(); + subscription.setApi(subscriptionModel.getApi()); + subscription.setApplication(subscriptionModel.getApplication()); + subscription.setClientId(subscriptionModel.getClientId()); + subscription.setClientCertificate(subscriptionModel.getClientCertificate()); + subscription.setStartingAt(subscriptionModel.getStartingAt()); + subscription.setEndingAt(subscriptionModel.getEndingAt()); + subscription.setId(subscriptionModel.getId()); + subscription.setPlan(subscriptionModel.getPlan()); + if (subscriptionModel.getStatus() != null) { + subscription.setStatus(subscriptionModel.getStatus().name()); + } + if (subscriptionModel.getConsumerStatus() != null) { + subscription.setConsumerStatus(Subscription.ConsumerStatus.valueOf(subscriptionModel.getConsumerStatus().name())); + } + if (subscriptionModel.getType() != null) { + subscription.setType(Subscription.Type.valueOf(subscriptionModel.getType().name().toUpperCase())); + } + if (subscriptionModel.getConfiguration() != null) { + subscription.setConfiguration( + objectMapper.readValue(subscriptionModel.getConfiguration(), SubscriptionConfiguration.class) + ); + } + subscription.setMetadata(subscriptionModel.getMetadata()); + subscription.setEnvironmentId(subscriptionModel.getEnvironmentId()); + return subscription; + } catch (Exception e) { + log.error("Unable to map subscription from model [{}].", subscriptionModel.getId(), e); + return null; + } + } +} diff --git a/gravitee-apim-gateway/gravitee-apim-gateway-services/gravitee-apim-gateway-services-sync/src/main/java/io/gravitee/gateway/services/sync/process/local/model/LocalSyncFileDefinition.java b/gravitee-apim-gateway/gravitee-apim-gateway-services/gravitee-apim-gateway-services-sync/src/main/java/io/gravitee/gateway/services/sync/process/local/model/LocalSyncFileDefinition.java new file mode 100644 index 00000000000..95c5410b23f --- /dev/null +++ b/gravitee-apim-gateway/gravitee-apim-gateway-services/gravitee-apim-gateway-services-sync/src/main/java/io/gravitee/gateway/services/sync/process/local/model/LocalSyncFileDefinition.java @@ -0,0 +1,36 @@ +/* + * Copyright © 2015 The Gravitee team (http://gravitee.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.gravitee.gateway.services.sync.process.local.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.gravitee.repository.management.model.ApiKey; +import io.gravitee.repository.management.model.Event; +import io.gravitee.repository.management.model.Subscription; +import java.util.List; +import lombok.Data; + +@Data +public class LocalSyncFileDefinition { + + @JsonProperty("apiEvent") + Event repositoryApiEvent; + + @JsonProperty("subscriptions") + List repositorySubscriptionList; + + @JsonProperty("apiKeys") + List repositoryApiKeyList; +} diff --git a/gravitee-apim-gateway/gravitee-apim-gateway-services/gravitee-apim-gateway-services-sync/src/main/java/io/gravitee/gateway/services/sync/process/local/spring/LocalSyncConfiguration.java b/gravitee-apim-gateway/gravitee-apim-gateway-services/gravitee-apim-gateway-services-sync/src/main/java/io/gravitee/gateway/services/sync/process/local/spring/LocalSyncConfiguration.java index 40ec8a948a1..bcd974e0ed0 100644 --- a/gravitee-apim-gateway/gravitee-apim-gateway-services/gravitee-apim-gateway-services-sync/src/main/java/io/gravitee/gateway/services/sync/process/local/spring/LocalSyncConfiguration.java +++ b/gravitee-apim-gateway/gravitee-apim-gateway-services/gravitee-apim-gateway-services-sync/src/main/java/io/gravitee/gateway/services/sync/process/local/spring/LocalSyncConfiguration.java @@ -19,17 +19,17 @@ import static io.gravitee.gateway.services.sync.SyncConfiguration.newThreadFactory; import com.fasterxml.jackson.databind.ObjectMapper; +import io.gravitee.gateway.api.service.ApiKeyService; +import io.gravitee.gateway.api.service.SubscriptionService; import io.gravitee.gateway.handlers.api.manager.ApiManager; -import io.gravitee.gateway.services.sync.process.common.deployer.DeployerFactory; import io.gravitee.gateway.services.sync.process.distributed.service.DistributedSyncService; import io.gravitee.gateway.services.sync.process.local.LocalSyncManager; import io.gravitee.gateway.services.sync.process.local.LocalSynchronizer; +import io.gravitee.gateway.services.sync.process.local.mapper.ApiKeyMapper; +import io.gravitee.gateway.services.sync.process.local.mapper.ApiMapper; +import io.gravitee.gateway.services.sync.process.local.mapper.SubscriptionMapper; import io.gravitee.gateway.services.sync.process.local.synchronizer.LocalApiSynchronizer; -import io.gravitee.gateway.services.sync.process.repository.mapper.ApiMapper; import io.gravitee.gateway.services.sync.process.repository.service.EnvironmentService; -import io.gravitee.gateway.services.sync.process.repository.synchronizer.api.ApiKeyAppender; -import io.gravitee.gateway.services.sync.process.repository.synchronizer.api.PlanAppender; -import io.gravitee.gateway.services.sync.process.repository.synchronizer.api.SubscriptionAppender; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -67,30 +67,43 @@ public ThreadPoolExecutor syncLocalExecutor(@Value("${services.sync.local.thread return threadPoolExecutor; } + @Bean + public ApiMapper localApiMapper(ObjectMapper objectMapper, EnvironmentService environmentService) { + return new ApiMapper(objectMapper, environmentService); + } + + @Bean + public SubscriptionMapper localSubscriptionMapper(ObjectMapper objectMapper) { + return new SubscriptionMapper(objectMapper); + } + + @Bean + public ApiKeyMapper localApiKeyMapper() { + return new ApiKeyMapper(); + } + @Bean public LocalApiSynchronizer localApiSynchronizer( - ObjectMapper objectMapper, - EnvironmentService environmentService, + ApiKeyMapper apiKeyMapper, + ApiKeyService apiKeyService, ApiManager apiManager, ApiMapper apiMapper, - PlanAppender planAppender, - SubscriptionAppender subscriptionAppender, - ApiKeyAppender apiKeyAppender, - DeployerFactory deployerFactory, - @Qualifier("syncLocalExecutor") ThreadPoolExecutor syncLocalExecutor, - @Qualifier("syncDeployerExecutor") ThreadPoolExecutor syncDeployerExecutor + EnvironmentService environmentService, + ObjectMapper objectMapper, + SubscriptionMapper subscriptionMapper, + SubscriptionService subscriptionService, + @Qualifier("syncLocalExecutor") ThreadPoolExecutor syncLocalExecutor ) { return new LocalApiSynchronizer( - objectMapper, - environmentService, + apiKeyMapper, + apiKeyService, apiManager, apiMapper, - planAppender, - subscriptionAppender, - apiKeyAppender, - deployerFactory, - syncLocalExecutor, - syncDeployerExecutor + environmentService, + objectMapper, + subscriptionMapper, + subscriptionService, + syncLocalExecutor ); } diff --git a/gravitee-apim-gateway/gravitee-apim-gateway-services/gravitee-apim-gateway-services-sync/src/main/java/io/gravitee/gateway/services/sync/process/local/synchronizer/LocalApiSynchronizer.java b/gravitee-apim-gateway/gravitee-apim-gateway-services/gravitee-apim-gateway-services-sync/src/main/java/io/gravitee/gateway/services/sync/process/local/synchronizer/LocalApiSynchronizer.java index a0e59d2d969..8ac642ec1c7 100644 --- a/gravitee-apim-gateway/gravitee-apim-gateway-services/gravitee-apim-gateway-services-sync/src/main/java/io/gravitee/gateway/services/sync/process/local/synchronizer/LocalApiSynchronizer.java +++ b/gravitee-apim-gateway/gravitee-apim-gateway-services/gravitee-apim-gateway-services-sync/src/main/java/io/gravitee/gateway/services/sync/process/local/synchronizer/LocalApiSynchronizer.java @@ -16,25 +16,23 @@ package io.gravitee.gateway.services.sync.process.local.synchronizer; import com.fasterxml.jackson.databind.ObjectMapper; -import io.gravitee.definition.model.DefinitionVersion; -import io.gravitee.definition.model.v4.ApiType; -import io.gravitee.definition.model.v4.nativeapi.NativeApi; +import io.gravitee.gateway.api.service.ApiKeyService; +import io.gravitee.gateway.api.service.Subscription; +import io.gravitee.gateway.api.service.SubscriptionService; import io.gravitee.gateway.handlers.api.definition.Api; import io.gravitee.gateway.handlers.api.manager.ApiManager; import io.gravitee.gateway.reactor.ReactableApi; -import io.gravitee.gateway.services.sync.process.common.deployer.DeployerFactory; import io.gravitee.gateway.services.sync.process.local.LocalSynchronizer; -import io.gravitee.gateway.services.sync.process.repository.mapper.ApiMapper; +import io.gravitee.gateway.services.sync.process.local.mapper.ApiKeyMapper; +import io.gravitee.gateway.services.sync.process.local.mapper.ApiMapper; +import io.gravitee.gateway.services.sync.process.local.mapper.SubscriptionMapper; +import io.gravitee.gateway.services.sync.process.local.model.LocalSyncFileDefinition; import io.gravitee.gateway.services.sync.process.repository.service.EnvironmentService; -import io.gravitee.gateway.services.sync.process.repository.synchronizer.api.AbstractApiSynchronizer; -import io.gravitee.gateway.services.sync.process.repository.synchronizer.api.ApiKeyAppender; -import io.gravitee.gateway.services.sync.process.repository.synchronizer.api.PlanAppender; -import io.gravitee.gateway.services.sync.process.repository.synchronizer.api.SubscriptionAppender; -import io.gravitee.repository.management.model.LifecycleState; import io.reactivex.rxjava3.core.Completable; import io.reactivex.rxjava3.core.Flowable; import io.reactivex.rxjava3.schedulers.Schedulers; import java.io.File; +import java.io.IOException; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardWatchEventKinds; @@ -43,66 +41,98 @@ import java.nio.file.WatchService; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; /** * @author GraviteeSource Team */ @Slf4j -public class LocalApiSynchronizer extends AbstractApiSynchronizer implements LocalSynchronizer { +public class LocalApiSynchronizer implements LocalSynchronizer { private final Map definitions = new HashMap<>(); - private final ObjectMapper objectMapper; + + private final ApiKeyMapper apiKeyMapper; + private final ApiKeyService apiKeyService; + private final ApiManager apiManager; + private final ApiMapper apiMapper; private final EnvironmentService environmentService; + private final ObjectMapper objectMapper; + private final SubscriptionMapper subscriptionMapper; + private final SubscriptionService subscriptionService; + private final ThreadPoolExecutor syncLocalExecutor; public LocalApiSynchronizer( - final ObjectMapper objectMapper, - final EnvironmentService environmentService, + final ApiKeyMapper apiKeyMapper, + final ApiKeyService apiKeyService, final ApiManager apiManager, final ApiMapper apiMapper, - final PlanAppender planAppender, - final SubscriptionAppender subscriptionAppender, - final ApiKeyAppender apiKeyAppender, - final DeployerFactory deployerFactory, - final ThreadPoolExecutor syncFetcherExecutor, - final ThreadPoolExecutor syncDeployerExecutor + final EnvironmentService environmentService, + final ObjectMapper objectMapper, + final SubscriptionMapper subscriptionMapper, + final SubscriptionService subscriptionService, + final ThreadPoolExecutor syncLocalExecutor ) { - super( - apiManager, - apiMapper, - planAppender, - subscriptionAppender, - apiKeyAppender, - deployerFactory, - syncFetcherExecutor, - syncDeployerExecutor - ); - this.objectMapper = objectMapper; + this.apiKeyMapper = apiKeyMapper; + this.apiKeyService = apiKeyService; + this.apiManager = apiManager; + this.apiMapper = apiMapper; this.environmentService = environmentService; + this.objectMapper = objectMapper; + this.subscriptionMapper = subscriptionMapper; + this.subscriptionService = subscriptionService; + this.syncLocalExecutor = syncLocalExecutor; } public Completable synchronize(final File localRegistryDir) { return Flowable .fromArray(localRegistryDir.listFiles((dir, name) -> name.endsWith(".json"))) - .map(apiDefinitionFile -> { - ReactableApi api = toReactableApi(apiDefinitionFile); - - this.apiManager.register(api); - this.definitions.put(Paths.get(apiDefinitionFile.toURI()), api); - - return api; - }) + .map(this::deployApi) + .doOnError(throwable -> log.error("Error synchronizing API", throwable)) .doOnNext(api -> log.debug("api {} synchronized from local registry", api.getId())) .ignoreElements(); } + private ReactableApi deployApi(File apiDefinitionFile) throws IOException { + LocalSyncFileDefinition fileDefinition = objectMapper.readValue(apiDefinitionFile, LocalSyncFileDefinition.class); + ReactableApi apiToDeploy; + if (fileDefinition.getRepositoryApiEvent() != null) { + apiToDeploy = apiMapper.to(fileDefinition.getRepositoryApiEvent()); + var successfulRegistration = this.apiManager.register(apiToDeploy); + if (successfulRegistration) { + this.definitions.put(Paths.get(apiDefinitionFile.toURI()), apiToDeploy); + + if (fileDefinition.getRepositorySubscriptionList() != null && !fileDefinition.getRepositorySubscriptionList().isEmpty()) { + Set subscriptionsToDeploy = fileDefinition + .getRepositorySubscriptionList() + .stream() + .map(subscriptionMapper::to) + .collect(Collectors.toSet()); + subscriptionsToDeploy.forEach(subscriptionService::register); + + if (fileDefinition.getRepositoryApiKeyList() != null && !fileDefinition.getRepositoryApiKeyList().isEmpty()) { + for (io.gravitee.repository.management.model.ApiKey apiKeyModel : fileDefinition.getRepositoryApiKeyList()) { + apiKeyService.register(apiKeyMapper.to(apiKeyModel, subscriptionsToDeploy)); + } + } + } + } else { + throw new IllegalStateException("Error during registration"); + } + } else { + throw new IllegalStateException("File to be synced cannot be empty"); + } + return apiToDeploy; + } + @Override public Completable watch(final Path localRegistryPath, final WatchService watchService) { return Flowable .interval(5, TimeUnit.SECONDS) - .subscribeOn(Schedulers.from(this.syncFetcherExecutor)) + .subscribeOn(Schedulers.from(this.syncLocalExecutor)) .map(t -> { WatchKey key = watchService.poll(); if (key != null) { @@ -111,27 +141,22 @@ public Completable watch(final Path localRegistryPath, final WatchService watchS Path fileName = localRegistryPath.resolve(((Path) event.context()).getFileName()); log.debug("An event occurs for file {}: {}", fileName, kind.name()); if (kind == StandardWatchEventKinds.ENTRY_MODIFY) { - ReactableApi loadedDefinition = this.toReactableApi(fileName.toFile()); Api existingDefinition = (Api) this.definitions.get(fileName); if (existingDefinition != null) { - if (this.apiManager.get(existingDefinition.getId()) != null) { - this.apiManager.register(loadedDefinition); - } else { - this.apiManager.unregister(existingDefinition.getId()); - this.definitions.remove(fileName); - this.definitions.put(fileName, loadedDefinition); - } + this.apiManager.unregister(existingDefinition.getId()); + this.subscriptionService.unregisterByApiId(existingDefinition.getId()); + this.apiKeyService.unregisterByApiId(existingDefinition.getId()); + this.definitions.remove(fileName); + deployApi(fileName.toFile()); } } else if (kind == StandardWatchEventKinds.ENTRY_CREATE) { - ReactableApi loadedDefinition = this.toReactableApi(fileName.toFile()); - boolean registered = this.apiManager.register(loadedDefinition); - if (registered) { - this.definitions.put(fileName, loadedDefinition); - } + deployApi(fileName.toFile()); } else if (kind == StandardWatchEventKinds.ENTRY_DELETE) { Api existingDefinition = (Api) this.definitions.get(fileName); if (existingDefinition != null && this.apiManager.get(existingDefinition.getId()) != null) { this.apiManager.unregister(existingDefinition.getId()); + this.subscriptionService.unregisterByApiId(existingDefinition.getId()); + this.apiKeyService.unregisterByApiId(existingDefinition.getId()); this.definitions.remove(fileName); } } @@ -146,52 +171,4 @@ public Completable watch(final Path localRegistryPath, final WatchService watchS }) .ignoreElements(); } - - @Override - protected int bulkEvents() { - return 1; - } - - private ReactableApi toReactableApi(File apiDefinitionFile) { - try { - // Read API definition from event - var api = objectMapper.readValue(apiDefinitionFile, io.gravitee.repository.management.model.Api.class); - - ReactableApi reactableApi; - - // Check the version of the API definition to read the right model entity - if (DefinitionVersion.V4 != api.getDefinitionVersion()) { - var eventApiDefinition = objectMapper.readValue(api.getDefinition(), io.gravitee.definition.model.Api.class); - - // Update definition with required information for deployment phase - reactableApi = new io.gravitee.gateway.handlers.api.definition.Api(eventApiDefinition); - } else { - if (api.getType() == ApiType.NATIVE) { - var eventApiDefinition = objectMapper.readValue(api.getDefinition(), NativeApi.class); - - // Update definition with required information for deployment phase - reactableApi = new io.gravitee.gateway.reactive.handlers.api.v4.NativeApi(eventApiDefinition); - } else if (api.getType() == ApiType.PROXY || api.getType() == ApiType.MESSAGE) { - var eventApiDefinition = objectMapper.readValue(api.getDefinition(), io.gravitee.definition.model.v4.Api.class); - - // Update definition with required information for deployment phase - reactableApi = new io.gravitee.gateway.reactive.handlers.api.v4.Api(eventApiDefinition); - } else { - throw new IllegalArgumentException("Unsupported ApiType [" + api.getType() + "] for api: " + api.getId()); - } - } - - reactableApi.setEnabled(api.getLifecycleState() == LifecycleState.STARTED); - reactableApi.setDeployedAt(api.getCreatedAt()); - reactableApi.setRevision("1"); - - environmentService.fill(api.getEnvironmentId(), reactableApi); - - return reactableApi; - } catch (Exception e) { - // Log the error and ignore this event. - log.error("Unable to extract api definition from file [{}].", apiDefinitionFile.getName(), e); - return null; - } - } }