Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker][PIP-384] Decouple Bookkeeper client from ManagedLedgerStorage and enable multiple ManagedLedgerFactory instances #23313

Merged
merged 6 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ public class ManagedLedgerConfig {
private int minimumBacklogEntriesForCaching = 1000;
private int maxBacklogBetweenCursorsForCaching = 1000;
private boolean triggerOffloadOnTopicLoad = false;

@Getter
@Setter
private String storageClassName;
@Getter
@Setter
private String shadowSourceName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import io.netty.channel.EventLoopGroup;
import io.opentelemetry.api.OpenTelemetry;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand All @@ -39,16 +41,18 @@
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.commons.configuration.Configuration;
import org.apache.pulsar.broker.stats.prometheus.metrics.PrometheusMetricsProvider;
import org.apache.pulsar.broker.storage.BookkeeperManagedLedgerStorageClass;
import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
import org.apache.pulsar.broker.storage.ManagedLedgerStorageClass;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ManagedLedgerClientFactory implements ManagedLedgerStorage {

private static final Logger log = LoggerFactory.getLogger(ManagedLedgerClientFactory.class);

private static final String DEFAULT_STORAGE_CLASS_NAME = "bookkeeper";
private BookkeeperManagedLedgerStorageClass defaultStorageClass;
private ManagedLedgerFactory managedLedgerFactory;
private BookKeeper defaultBkClient;
private final AsyncCache<EnsemblePlacementPolicyConfig, BookKeeper>
Expand Down Expand Up @@ -119,20 +123,50 @@ public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadata
defaultBkClient.close();
throw e;
}

defaultStorageClass = new BookkeeperManagedLedgerStorageClass() {
@Override
public String getName() {
return DEFAULT_STORAGE_CLASS_NAME;
}

@Override
public ManagedLedgerFactory getManagedLedgerFactory() {
return managedLedgerFactory;
}

@Override
public StatsProvider getStatsProvider() {
return statsProvider;
}

@Override
public BookKeeper getBookKeeperClient() {
return defaultBkClient;
}
};
}

public ManagedLedgerFactory getManagedLedgerFactory() {
return managedLedgerFactory;
@Override
public Collection<ManagedLedgerStorageClass> getStorageClasses() {
return List.of(getDefaultStorageClass());
}

public BookKeeper getBookKeeperClient() {
return defaultBkClient;
@Override
public Optional<ManagedLedgerStorageClass> getManagedLedgerStorageClass(String name) {
if (name == null || DEFAULT_STORAGE_CLASS_NAME.equals(name)) {
return Optional.of(getDefaultStorageClass());
} else {
return Optional.empty();
}
}

public StatsProvider getStatsProvider() {
return statsProvider;
@Override
public ManagedLedgerStorageClass getDefaultStorageClass() {
return defaultStorageClass;
}


@VisibleForTesting
public Map<EnsemblePlacementPolicyConfig, BookKeeper> getBkEnsemblePolicyToBookKeeperMap() {
return bkEnsemblePolicyToBkClientMap.synchronous().asMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,9 @@
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import org.apache.pulsar.broker.stats.prometheus.PulsarPrometheusMetricsServlet;
import org.apache.pulsar.broker.storage.BookkeeperManagedLedgerStorageClass;
import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
import org.apache.pulsar.broker.storage.ManagedLedgerStorageClass;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
Expand Down Expand Up @@ -210,7 +212,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private static final int DEFAULT_MONOTONIC_CLOCK_GRANULARITY_MILLIS = 8;
private final ServiceConfiguration config;
private NamespaceService nsService = null;
private ManagedLedgerStorage managedLedgerClientFactory = null;
private ManagedLedgerStorage managedLedgerStorage = null;
private LeaderElectionService leaderElectionService = null;
private BrokerService brokerService = null;
private WebService webService = null;
Expand Down Expand Up @@ -606,13 +608,13 @@ public CompletableFuture<Void> closeAsync() {
this.brokerService = null;
}

if (this.managedLedgerClientFactory != null) {
if (this.managedLedgerStorage != null) {
try {
this.managedLedgerClientFactory.close();
this.managedLedgerStorage.close();
} catch (Exception e) {
LOG.warn("ManagedLedgerClientFactory closing failed {}", e.getMessage());
}
this.managedLedgerClientFactory = null;
this.managedLedgerStorage = null;
}

if (bkClientFactory != null) {
Expand Down Expand Up @@ -899,7 +901,7 @@ public void start() throws PulsarServerException {
// Now we are ready to start services
this.bkClientFactory = newBookKeeperClientFactory();

managedLedgerClientFactory = newManagedLedgerClientFactory();
managedLedgerStorage = newManagedLedgerStorage();

this.brokerService = newBrokerService(this);

Expand Down Expand Up @@ -1122,7 +1124,7 @@ protected OrderedExecutor newOrderedExecutor() {
}

@VisibleForTesting
protected ManagedLedgerStorage newManagedLedgerClientFactory() throws Exception {
protected ManagedLedgerStorage newManagedLedgerStorage() throws Exception {
return ManagedLedgerStorage.create(
config, localMetadataStore,
bkClientFactory, ioEventLoopGroup, openTelemetry.getOpenTelemetryService().getOpenTelemetry()
Expand Down Expand Up @@ -1348,7 +1350,7 @@ private synchronized void startLoadBalancerTasks() {
long resourceQuotaUpdateInterval = TimeUnit.MINUTES
.toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());
loadSheddingTask = new LoadSheddingTask(loadManager, loadManagerExecutor,
config, getManagedLedgerFactory());
config, getDefaultManagedLedgerFactory());
loadSheddingTask.start();
loadResourceQuotaTask = loadManagerExecutor.scheduleAtFixedRate(
new LoadResourceQuotaUpdaterTask(loadManager), resourceQuotaUpdateInterval,
Expand Down Expand Up @@ -1535,11 +1537,17 @@ public WorkerService getWorkerService() throws UnsupportedOperationException {
}

public BookKeeper getBookKeeperClient() {
return getManagedLedgerClientFactory().getBookKeeperClient();
ManagedLedgerStorageClass defaultStorageClass = getManagedLedgerStorage().getDefaultStorageClass();
if (defaultStorageClass instanceof BookkeeperManagedLedgerStorageClass bkStorageClass) {
return bkStorageClass.getBookKeeperClient();
} else {
// TODO: Refactor code to support other than default bookkeeper based storage class
throw new UnsupportedOperationException("BookKeeper client is not available");
}
}

public ManagedLedgerFactory getManagedLedgerFactory() {
return getManagedLedgerClientFactory().getManagedLedgerFactory();
public ManagedLedgerFactory getDefaultManagedLedgerFactory() {
return getManagedLedgerStorage().getDefaultStorageClass().getManagedLedgerFactory();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,9 @@ public CompletableFuture<Void> handleTcClientConnect(TransactionCoordinatorID tc
.setBatchedWriteMaxDelayInMillis(serviceConfiguration.getTransactionLogBatchedWriteMaxDelayInMillis());

return pulsarService.getBrokerService().getManagedLedgerConfig(getMLTransactionLogName(tcId)).thenCompose(
v -> transactionMetadataStoreProvider.openStore(tcId, pulsarService.getManagedLedgerFactory(), v,
v -> transactionMetadataStoreProvider.openStore(tcId,
pulsarService.getManagedLedgerStorage().getManagedLedgerStorageClass(v.getStorageClassName())
.get().getManagedLedgerFactory(), v,
timeoutTracker, recoverTracker,
pulsarService.getConfig().getMaxActiveTransactionsPerCoordinator(), txnLogBufferedWriterConfig,
brokerClientSharedTimer));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2540,7 +2540,7 @@ protected void internalScanOffloadedLedgers(OffloaderObjectsScannerUtils.Scanner
String localClusterName = pulsar().getConfiguration().getClusterName();

OffloaderObjectsScannerUtils.scanOffloadedLedgers(managedLedgerOffloader,
localClusterName, pulsar().getManagedLedgerFactory(), sink);
localClusterName, pulsar().getDefaultManagedLedgerFactory(), sink);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1405,19 +1405,27 @@ protected void internalGetManagedLedgerInfoForNonPartitionedTopic(AsyncResponse
validateTopicOperationAsync(topicName, TopicOperation.GET_STATS)
.thenAccept(__ -> {
String managedLedger = topicName.getPersistenceNamingEncoding();
pulsar().getManagedLedgerFactory()
.asyncGetManagedLedgerInfo(managedLedger, new ManagedLedgerInfoCallback() {
@Override
public void getInfoComplete(ManagedLedgerInfo info, Object ctx) {
asyncResponse.resume((StreamingOutput) output -> {
objectWriter().writeValue(output, info);
pulsar().getBrokerService().getManagedLedgerFactoryForTopic(topicName)
.thenAccept(managedLedgerFactory -> {
managedLedgerFactory.asyncGetManagedLedgerInfo(managedLedger,
new ManagedLedgerInfoCallback() {
@Override
public void getInfoComplete(ManagedLedgerInfo info, Object ctx) {
asyncResponse.resume((StreamingOutput) output -> {
objectWriter().writeValue(output, info);
});
}

@Override
public void getInfoFailed(ManagedLedgerException exception, Object ctx) {
asyncResponse.resume(exception);
}
}, null);
})
.exceptionally(ex -> {
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
@Override
public void getInfoFailed(ManagedLedgerException exception, Object ctx) {
asyncResponse.resume(exception);
}
}, null);
}).exceptionally(ex -> {
log.error("[{}] Failed to get managed info for {}", clientAppId(), topicName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
Expand Down Expand Up @@ -3174,7 +3182,9 @@ protected CompletableFuture<PersistentOfflineTopicStats> internalGetBacklogAsync
try {
PersistentOfflineTopicStats estimateOfflineTopicStats =
offlineTopicBacklog.estimateUnloadedTopicBacklog(
pulsar().getManagedLedgerFactory(),
pulsar().getBrokerService()
.getManagedLedgerFactoryForTopic(topicName,
config.getStorageClassName()),
topicName);
pulsar().getBrokerService()
.cacheOfflineTopicStats(topicName, estimateOfflineTopicStats);
Expand Down
Loading
Loading