Skip to content

Commit

Permalink
[apache#4562] fixed dual-AZ engine disk abnormal scenario registration center intermittently delete instance problem (apache#4563)
Browse files Browse the repository at this point in the history
  • Loading branch information
chengyouling committed Oct 22, 2024
1 parent 664d314 commit f03a530
Show file tree
Hide file tree
Showing 9 changed files with 36 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public void setQueryConfigurationsRequest(QueryConfigurationsRequest queryConfig

public void startConfigCenterManager() {
this.startTask(new PollConfigurationTask(0));
this.startTask(new CheckConfigCenterAddressTask());
schedulerCheckAddressAvailable("cc-addr-check", new CheckConfigCenterAddressTask(),
configCenterConfiguration.getRefreshIntervalInMillis());
}

class PollConfigurationTask implements Task {
Expand Down Expand Up @@ -96,18 +97,16 @@ public void execute() {
}
}

class CheckConfigCenterAddressTask implements Task {
class CheckConfigCenterAddressTask implements Runnable {
@Override
public void execute() {
public void run() {
List<String> isolationAddresses = configCenterAddressManager.getIsolationAddresses();
if (isolationAddresses.isEmpty()) {
return;
}
for (String address : isolationAddresses) {
configCenterClient.checkAddressAvailable(queryConfigurationsRequest, address);
}
startTask(new BackOffSleepTask(configCenterConfiguration.getRefreshIntervalInMillis(),
new CheckConfigCenterAddressTask()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public void checkAddressAvailable(ConfigurationsRequest request, String address)
addressManager.recoverIsolatedAddress(address);
}
} catch (IOException e) {
LOGGER.error("check kie config isolation address {} available error!", address, e);
LOGGER.error("check kie config isolation address {} available error!", address);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ protected void initTaskPool(String taskName) {
public void startConfigKieManager() {
this.configurationsRequests.forEach((t) ->
this.startTask(new PollConfigurationTask(0, t)));
startTask(new CheckKieConfigAddressTask(configurationsRequests.get(0)));
schedulerCheckAddressAvailable("kie-addr-check", new CheckKieAddressTask(configurationsRequests.get(0)),
kieConfiguration.getRefreshIntervalInMillis());
}

class PollConfigurationTask implements Task {
Expand Down Expand Up @@ -162,24 +163,22 @@ public void execute() {
}
}

class CheckKieConfigAddressTask implements Task {
class CheckKieAddressTask implements Runnable {
ConfigurationsRequest configurationsRequest;

public CheckKieConfigAddressTask(ConfigurationsRequest configurationsRequest) {
public CheckKieAddressTask(ConfigurationsRequest configurationsRequest) {
this.configurationsRequest = configurationsRequest;
}

@Override
public void execute() {
public void run() {
List<String> isolationAddresses = kieAddressManager.getIsolationAddresses();
if (isolationAddresses.isEmpty()) {
return;
}
for (String address : isolationAddresses) {
configKieClient.checkAddressAvailable(this.configurationsRequest, address);
}
startTask(new BackOffSleepTask(kieConfiguration.getRefreshIntervalInMillis(),
new CheckKieConfigAddressTask(this.configurationsRequest)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ public void recoverIsolatedAddress(String address) {
recordSuccessState(address);
if (addressAutoRefreshed) {
if (isolationZoneAddress.remove(address)) {
LOGGER.warn("restore default address [{}]", address);
LOGGER.warn("restore same region address [{}]", address);
if (eventBus != null && availableZone.isEmpty()) {
eventBus.post(new EngineConnectChangedEvent());
}
Expand All @@ -212,7 +212,7 @@ public void recoverIsolatedAddress(String address) {
return;
}
if (defaultIsolationAddress.remove(address)) {
LOGGER.warn("restore same region address [{}]", address);
LOGGER.warn("restore default address [{}]", address);
addresses.add(address);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -66,6 +67,8 @@ public void execute() {

public static AtomicInteger taskCounter = new AtomicInteger(0);

private ScheduledExecutorService addrCheckExecutor;

protected AbstractTask(String taskName) {
initTaskPool(taskName);
Runtime.getRuntime().addShutdownHook(new Thread(AbstractTask.this::stop, taskName + "-shutdown-hook"));
Expand All @@ -76,6 +79,13 @@ protected void initTaskPool(String taskName) {
new Thread(task, taskName + "-" + taskCounter.getAndIncrement()));
}

/**
 * Schedules a periodic address availability check on the shared address-check executor.
 *
 * <p>The executor is created lazily on the first call as a single-threaded scheduled pool whose
 * thread is named {@code taskName}. Later calls reuse that executor, so the {@code taskName}
 * they pass is only used in log messages, not as a thread name.
 *
 * <p>{@code scheduleWithFixedDelay} suppresses every subsequent execution once a run throws.
 * The task is therefore wrapped so that an unchecked exception is logged and swallowed: one
 * failed check must not permanently stop the recovery of isolated addresses.
 *
 * @param taskName name given to the executor thread when the executor is first created
 * @param task the check to run repeatedly
 * @param delayTime both the initial delay and the delay between the end of one run and the
 *     start of the next, in milliseconds
 */
protected void schedulerCheckAddressAvailable(String taskName, Runnable task, long delayTime) {
  if (addrCheckExecutor == null) {
    addrCheckExecutor = Executors.newScheduledThreadPool(1, (t) -> new Thread(t, taskName));
  }
  // Guard the task: an exception escaping run() would cancel all future executions.
  Runnable guardedTask = () -> {
    try {
      task.run();
    } catch (RuntimeException e) {
      LOGGER.error("address check task {} failed, will retry at next interval.", taskName, e);
    }
  };
  addrCheckExecutor.scheduleWithFixedDelay(guardedTask, delayTime, delayTime, TimeUnit.MILLISECONDS);
}

protected void startTask(Task task) {
if (!running) {
return;
Expand All @@ -99,6 +109,10 @@ public void stop() {
running = false;
this.taskPool.shutdown();
this.taskPool.awaitTermination(10, TimeUnit.SECONDS);
if (addrCheckExecutor != null) {
this.addrCheckExecutor.shutdown();
this.addrCheckExecutor.awaitTermination(10, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
LOGGER.warn("tasks not shutdown in time {}", e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,14 +555,13 @@ public boolean updateMicroserviceProperties(String serviceId, Map<String, String
}

@Override
public void checkIsolationAddressAvailable(String serviceId, String instanceId) {
public void checkIsolationAddressAvailable() {
List<String> isolationAddresses = addressManager.getIsolationAddresses();
if (isolationAddresses.isEmpty()) {
return;
}
for (String address : isolationAddresses) {
httpClient.checkServiceCenterAddressAvailable("/registry/microservices/" + serviceId + "/instances/" + instanceId +
"/heartbeat", null, null, address);
httpClient.checkAddressAvailable("/registry/microservices", null, null, address);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,6 @@ boolean updateMicroserviceInstanceStatus(String serviceId, String instanceId,

/**
* Check serviceCenter isolation address available
*
* @param serviceId serviceId
* @param instanceId instanceId
*/
void checkIsolationAddressAvailable(String serviceId, String instanceId);
void checkIsolationAddressAvailable();
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,15 @@ private HttpResponse doHttpRequest(String url, boolean absoluteUrl, Map<String,
}
}

public void checkServiceCenterAddressAvailable(String url, Map<String, String> headers, String content,
public void checkAddressAvailable(String url, Map<String, String> headers, String content,
String address) {
String formatUrl = addressManager.formatUrl(url, false, address);
HttpRequest httpRequest = buildHttpRequest(formatUrl, headers, content, HttpRequest.PUT);
HttpRequest httpRequest = buildHttpRequest(formatUrl, headers, content, HttpRequest.GET);
try {
httpTransport.doRequest(httpRequest);
addressManager.recoverIsolatedAddress(address);
} catch (IOException e) {
LOGGER.error("check service center isolation address {} available error!", address, e);
LOGGER.error("check service center isolation address {} available error!", address);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public List<SchemaInfo> getSchemaInfos() {

/**
 * Starts registration: submits the initial {@code RegisterMicroserviceTask}, then schedules the
 * "sc-addr-check" address check with {@code heartBeatInterval} as its delay.
 */
public void startRegistration() {
  final RegisterMicroserviceTask initialRegistration = new RegisterMicroserviceTask(0);
  startTask(initialRegistration);

  final Runnable addressCheck = new CheckAddressTask();
  schedulerCheckAddressAvailable("sc-addr-check", addressCheck, heartBeatInterval);
}

class RegisterMicroserviceTask implements Task {
Expand Down Expand Up @@ -258,7 +259,6 @@ public void execute() {
microserviceInstance.getInstanceId());
eventBus.post(new MicroserviceInstanceRegistrationEvent(true, microservice, microserviceInstance));
startTask(new SendHeartBeatTask(0));
startTask(new CheckServiceCenterAddressTask());
}
} catch (Exception e) {
LOGGER.error("register microservice instance failed, and will try again.", e);
Expand Down Expand Up @@ -304,13 +304,10 @@ public void execute() {
}
}

class CheckServiceCenterAddressTask implements Task {
class CheckAddressTask implements Runnable {
@Override
public void execute() {
serviceCenterClient.checkIsolationAddressAvailable(microservice.getServiceId(),
microserviceInstance.getInstanceId());
startTask(new BackOffSleepTask(Math.max(heartBeatInterval, heartBeatRequestTimeout),
new CheckServiceCenterAddressTask()));
public void run() {
serviceCenterClient.checkIsolationAddressAvailable();
}
}
}

0 comments on commit f03a530

Please sign in to comment.