From bff9c2cfb58fba9ae60dd4a47db748dc908cece0 Mon Sep 17 00:00:00 2001 From: fuweng11 <76141879+fuweng11@users.noreply.github.com> Date: Wed, 10 Apr 2024 21:40:39 +0800 Subject: [PATCH] [INLONG-9968][Manager] Support pulsar multi cluster when creating pulsar consumption groups (#9970) --- .../apply/ApproveConsumeProcessListener.java | 38 ++++++++++--------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java index 5b39e14bf2b..eb54f4ba230 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java @@ -127,24 +127,28 @@ private void createPulsarSubscription(InlongConsumeEntity entity) { "mq resource cannot empty for groupId=" + groupId); String clusterTag = groupEntity.getInlongClusterTag(); - ClusterInfo clusterInfo = clusterService.getOne(clusterTag, null, ClusterType.PULSAR); - PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo; - try { - InlongPulsarDTO pulsarDTO = InlongPulsarDTO.getFromJson(groupEntity.getExtParams()); - String tenant = pulsarDTO.getPulsarTenant(); - if (StringUtils.isBlank(tenant)) { - tenant = pulsarCluster.getPulsarTenant(); + List clusterInfos = clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR); + for (ClusterInfo clusterInfo : clusterInfos) { + PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo; + try { + InlongPulsarDTO pulsarDTO = InlongPulsarDTO.getFromJson(groupEntity.getExtParams()); + String tenant = pulsarDTO.getPulsarTenant(); + if (StringUtils.isBlank(tenant)) { + tenant = pulsarCluster.getPulsarTenant(); + } + PulsarTopicInfo topicMessage = new PulsarTopicInfo(); + topicMessage.setPulsarTenant(tenant); + topicMessage.setNamespace(mqResource); + + List topics = Arrays.asList(entity.getTopic().split(InlongConstants.COMMA)); + + this.createPulsarSubscription(pulsarCluster, entity.getConsumerGroup(), topicMessage, topics); + } catch (Exception e) { + log.error("create pulsar topic failed", e); + throw new WorkflowListenerException( + "failed to create pulsar topic for groupId=" + groupId + ", reason: " + + e.getMessage()); } - PulsarTopicInfo topicMessage = new PulsarTopicInfo(); - topicMessage.setPulsarTenant(tenant); - topicMessage.setNamespace(mqResource); - - List topics = Arrays.asList(entity.getTopic().split(InlongConstants.COMMA)); - this.createPulsarSubscription(pulsarCluster, entity.getConsumerGroup(), topicMessage, topics); - } catch (Exception e) { - log.error("create pulsar topic failed", e); - throw new WorkflowListenerException("failed to create pulsar topic for groupId=" + groupId + ", reason: " - + e.getMessage()); } }