Skip to content

Commit

Permalink
feat: add a local registry synchronizer
Browse files Browse the repository at this point in the history
  • Loading branch information
phiz71 committed Jan 3, 2025
1 parent 0699f88 commit 11dd35d
Show file tree
Hide file tree
Showing 5 changed files with 511 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Copyright © 2015 The Gravitee team (http://gravitee.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.gravitee.gateway.services.sync.process.local;

import io.gravitee.common.service.AbstractService;
import io.gravitee.common.utils.RxHelper;
import io.gravitee.gateway.services.sync.SyncManager;
import io.gravitee.gateway.services.sync.process.distributed.service.DistributedSyncService;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.disposables.Disposable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchService;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/**
* @author GraviteeSource Team
*/
@RequiredArgsConstructor
@Slf4j
/*
* INTERNAL USE ONLY: this synchronizer has been introduced to facilitate the tests of the gateway. It's not production ready.
*/
public class LocalSyncManager extends AbstractService<SyncManager> implements SyncManager {

private static final int EXPONENTIAL_BACKOFF_RETRY_INITIAL_DELAY_MS = 1_000;
private static final int EXPONENTIAL_BACKOFF_RETRY_MAX_DELAY_MS = 10_000;
private static final double EXPONENTIAL_BACKOFF_RETRY_FACTOR = 1.5;

private final List<LocalSynchronizer> synchronizers;
private final DistributedSyncService distributedSyncService;
private final String localRegistryPath;

private Disposable watcherDisposable;

private final AtomicBoolean synced = new AtomicBoolean();

public LocalSyncManager(
final String localRegistryPath,
final List<LocalSynchronizer> synchronizers,
final DistributedSyncService distributedSyncService
) {
this.synchronizers = synchronizers;
this.distributedSyncService = distributedSyncService;
this.localRegistryPath = localRegistryPath;
}

@Override
protected void doStart() {
log.debug("Starting local synchronization process");
if (!distributedSyncService.isEnabled() || distributedSyncService.isPrimaryNode()) {
if (this.localRegistryPath != null && !this.localRegistryPath.isEmpty()) {
File localRegistryDir = new File(this.localRegistryPath);
if (localRegistryDir.isDirectory()) {
Path registry = Paths.get(localRegistryPath);
try {
WatchService watcher = registry.getFileSystem().newWatchService();
registry.register(
watcher,
StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_DELETE,
StandardWatchEventKinds.ENTRY_MODIFY
);

watcherDisposable =
sync(localRegistryDir)
.doOnComplete(() -> {
log.debug("Moving to ready state as all resources have been synchronized");
synced.set(true);
})
.andThen(watch(registry, watcher))
.doOnError(throwable ->
log.error("An error occurred during local synchronization. Restarting ...", throwable)
)
.retryWhen(
RxHelper.retryExponentialBackoff(
EXPONENTIAL_BACKOFF_RETRY_INITIAL_DELAY_MS,
EXPONENTIAL_BACKOFF_RETRY_MAX_DELAY_MS,
TimeUnit.MILLISECONDS,
EXPONENTIAL_BACKOFF_RETRY_FACTOR
)
)
.subscribe();
} catch (IOException ex) {
log.error("An error occurred during local synchronization", ex);
}
} else {
log.error("Invalid API definitions registry directory, {} is not a directory.", localRegistryDir.getAbsolutePath());
}
} else {
log.error("Local API definitions registry path is not specified.");
}
} else {
log.warn("Local synchronization is disabled as distributed sync is enabled, and current node is secondary.");
}
}

@Override
protected void doStop() {
if (watcherDisposable != null && !watcherDisposable.isDisposed()) {
watcherDisposable.dispose();
}
}

@Override
public boolean syncDone() {
return synced.get();
}

private Completable sync(File localRegistryDir) {
return Flowable
.fromIterable(synchronizers)
.doOnNext(sync -> log.debug("{} will synchronize all resources ...", sync))
.concatMapCompletable(sync -> sync.synchronize(localRegistryDir));
}

private Completable watch(Path localRegistryPath, WatchService watcher) {
return Flowable
.fromIterable(synchronizers)
.doOnNext(sync -> log.debug("{} will start watching on resources ...", sync))
.concatMapCompletable(sync -> sync.watch(localRegistryPath, watcher));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright © 2015 The Gravitee team (http://gravitee.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.gravitee.gateway.services.sync.process.local;

import io.reactivex.rxjava3.core.Completable;
import java.io.File;
import java.nio.file.Path;
import java.nio.file.WatchService;

/**
* @author GraviteeSource Team
*/
public interface LocalSynchronizer {
Completable synchronize(final File localRegistryDir);
Completable watch(final Path localRegistryPath, final WatchService watcher);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright © 2015 The Gravitee team (http://gravitee.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.gravitee.gateway.services.sync.process.local.spring;

import org.springframework.context.annotation.ConditionContext;
import org.springframework.context.annotation.ConfigurationCondition;
import org.springframework.core.type.AnnotatedTypeMetadata;

public class LocalSyncCondition implements ConfigurationCondition {

@Override
/*
* INTERNAL USE ONLY: this synchronizer has been introduced to facilitate the tests of the gateway. It's not production ready.
*/
public boolean matches(ConditionContext conditionContext, AnnotatedTypeMetadata annotatedTypeMetadata) {
return conditionContext.getEnvironment().getProperty("services.sync.local.enabled", Boolean.class, false);
}

@Override
public ConfigurationPhase getConfigurationPhase() {
return ConfigurationPhase.REGISTER_BEAN;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright © 2015 The Gravitee team (http://gravitee.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.gravitee.gateway.services.sync.process.local.spring;

import static io.gravitee.gateway.services.sync.SyncConfiguration.POOL_SIZE;
import static io.gravitee.gateway.services.sync.SyncConfiguration.newThreadFactory;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.gravitee.gateway.handlers.api.manager.ApiManager;
import io.gravitee.gateway.services.sync.process.common.deployer.DeployerFactory;
import io.gravitee.gateway.services.sync.process.distributed.service.DistributedSyncService;
import io.gravitee.gateway.services.sync.process.local.LocalSyncManager;
import io.gravitee.gateway.services.sync.process.local.LocalSynchronizer;
import io.gravitee.gateway.services.sync.process.local.synchronizer.LocalApiSynchronizer;
import io.gravitee.gateway.services.sync.process.repository.mapper.ApiMapper;
import io.gravitee.gateway.services.sync.process.repository.service.EnvironmentService;
import io.gravitee.gateway.services.sync.process.repository.synchronizer.api.ApiKeyAppender;
import io.gravitee.gateway.services.sync.process.repository.synchronizer.api.PlanAppender;
import io.gravitee.gateway.services.sync.process.repository.synchronizer.api.SubscriptionAppender;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;

/**
* @author GraviteeSource Team
*/
@Configuration
@Conditional(LocalSyncCondition.class)
public class LocalSyncConfiguration {

/*
* Local Synchronization
*/
@Bean("syncLocalExecutor")
public ThreadPoolExecutor syncLocalExecutor(@Value("${services.sync.local.threads:-1}") int syncLocal) {
int poolSize = syncLocal != -1 ? syncLocal : POOL_SIZE;
final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
1,
poolSize,
15L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
newThreadFactory("gio.sync-local-")
);

threadPoolExecutor.allowCoreThreadTimeOut(true);

return threadPoolExecutor;
}

@Bean
public LocalApiSynchronizer localApiSynchronizer(
ObjectMapper objectMapper,
EnvironmentService environmentService,
ApiManager apiManager,
ApiMapper apiMapper,
PlanAppender planAppender,
SubscriptionAppender subscriptionAppender,
ApiKeyAppender apiKeyAppender,
DeployerFactory deployerFactory,
@Qualifier("syncLocalExecutor") ThreadPoolExecutor syncLocalExecutor,
@Qualifier("syncDeployerExecutor") ThreadPoolExecutor syncDeployerExecutor
) {
return new LocalApiSynchronizer(
objectMapper,
environmentService,
apiManager,
apiMapper,
planAppender,
subscriptionAppender,
apiKeyAppender,
deployerFactory,
syncLocalExecutor,
syncDeployerExecutor
);
}

@Bean
public LocalSyncManager localSyncManager(
@Value("${services.sync.local.path:${gravitee.home}/apis}") final String localRegistryPath,
final List<LocalSynchronizer> localSynchronizers,
final DistributedSyncService distributedSyncService
) {
return new LocalSyncManager(localRegistryPath, localSynchronizers, distributedSyncService);
}
}
Loading

0 comments on commit 11dd35d

Please sign in to comment.