Skip to content

Commit

Permalink
Use instance of LockService from Guice that is instantiated by Job Sc…
Browse files Browse the repository at this point in the history
…heduler

Signed-off-by: Craig Perkins <[email protected]>
  • Loading branch information
cwperks committed Aug 27, 2024
1 parent 6095e0c commit 6a5d01b
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class Ip2GeoLockService {
public static final long LOCK_DURATION_IN_SECONDS = 300l;
public static final long RENEW_AFTER_IN_SECONDS = 120l;
private final ClusterService clusterService;
private final LockService lockService;
private LockService lockService;

/**
* Constructor
Expand All @@ -43,6 +43,19 @@ public Ip2GeoLockService(final ClusterService clusterService, final Client clien
this.lockService = new LockService(client, clusterService);
}

/**
* Constructor
*
* @param clusterService the cluster service
*/
public Ip2GeoLockService(final ClusterService clusterService) {
this.clusterService = clusterService;
}

public void initialize(final LockService lockService) {
this.lockService = lockService;
}

/**
* Wrapper method of LockService#acquireLockWithId
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,11 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.search.PitService;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNode;
Expand All @@ -37,7 +34,6 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.extensions.ExtensionsManager;
import org.opensearch.geospatial.action.upload.geojson.UploadGeoJSONAction;
import org.opensearch.geospatial.action.upload.geojson.UploadGeoJSONTransportAction;
import org.opensearch.geospatial.index.mapper.xypoint.XYPointFieldMapper;
Expand Down Expand Up @@ -79,7 +75,6 @@
import org.opensearch.geospatial.stats.upload.UploadStatsTransportAction;
import org.opensearch.index.IndexModule;
import org.opensearch.index.mapper.Mapper;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.SystemIndexDescriptor;
import org.opensearch.ingest.Processor;
import org.opensearch.jobscheduler.spi.utils.LockService;
Expand All @@ -96,8 +91,6 @@
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.RemoteClusterService;
import org.opensearch.transport.TransportService;
import org.opensearch.watcher.ResourceWatcherService;

import lombok.extern.log4j.Log4j2;
Expand All @@ -107,11 +100,22 @@
* to interact with Cluster.
*/
@Log4j2
public class GeospatialPlugin extends Plugin implements IngestPlugin, ActionPlugin, MapperPlugin, SearchPlugin, SystemIndexPlugin, ClusterPlugin {
public class GeospatialPlugin extends Plugin
implements
IngestPlugin,
ActionPlugin,
MapperPlugin,
SearchPlugin,
SystemIndexPlugin,
ClusterPlugin {
private Ip2GeoCachedDao ip2GeoCachedDao;
private DatasourceDao datasourceDao;
private GeoIpDataDao geoIpDataDao;
private URLDenyListChecker urlDenyListChecker;
private ClusterService clusterService;
private Ip2GeoLockService ip2GeoLockService;
private Ip2GeoExecutor ip2GeoExecutor;
private DatasourceUpdateService datasourceUpdateService;

@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
Expand Down Expand Up @@ -172,22 +176,10 @@ public Collection<Object> createComponents(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
DatasourceUpdateService datasourceUpdateService = new DatasourceUpdateService(
clusterService,
datasourceDao,
geoIpDataDao,
urlDenyListChecker
);
Ip2GeoExecutor ip2GeoExecutor = new Ip2GeoExecutor(threadPool);
Ip2GeoLockService ip2GeoLockService = new Ip2GeoLockService(clusterService, client);
System.out.println("createComponents");
System.out.println("LockService: " + GuiceHolder.getLockService());
/**
* We don't need to return datasource runner because it is used only by job scheduler and job scheduler
* does not use DI but it calls DatasourceExtension#getJobRunner to get DatasourceRunner instance.
*/
DatasourceRunner.getJobRunnerInstance()
.initialize(clusterService, datasourceUpdateService, ip2GeoExecutor, datasourceDao, ip2GeoLockService);
this.clusterService = clusterService;
this.datasourceUpdateService = new DatasourceUpdateService(clusterService, datasourceDao, geoIpDataDao, urlDenyListChecker);
this.ip2GeoExecutor = new Ip2GeoExecutor(threadPool);
this.ip2GeoLockService = new Ip2GeoLockService(clusterService);

return List.of(
UploadStats.getInstance(),
Expand Down Expand Up @@ -278,18 +270,23 @@ public List<AggregationSpec> getAggregations() {

@Override
public void onNodeStarted(DiscoveryNode localNode) {
System.out.println("onNodeStarted");
System.out.println("LockService: " + GuiceHolder.getLockService());
LockService lockService = GuiceHolder.getLockService();
ip2GeoLockService.initialize(lockService);

/**
* We don't need to return datasource runner because it is used only by job scheduler and job scheduler
* does not use DI but it calls DatasourceExtension#getJobRunner to get DatasourceRunner instance.
*/
DatasourceRunner.getJobRunnerInstance()
.initialize(this.clusterService, this.datasourceUpdateService, this.ip2GeoExecutor, this.datasourceDao, this.ip2GeoLockService);
}

public static class GuiceHolder implements LifecycleComponent {

private static LockService lockService;

@Inject
public GuiceHolder(
final LockService lockService
) {
public GuiceHolder(final LockService lockService) {
GuiceHolder.lockService = lockService;
}

Expand Down

0 comments on commit 6a5d01b

Please sign in to comment.