From e19626662b49cd2fc0c70b84ce2d43e0c672104e Mon Sep 17 00:00:00 2001
From: parit-27
Date: Sat, 9 Dec 2023 14:55:19 +0530
Subject: [PATCH] AMBARI-26030: Implement Handling for DB Records Purge with
 Cluster ID -1

---
 .../ambari/server/cleanup/CleanupDriver.java  |  15 ++-
 .../cleanup/TimeBasedCleanupPolicy.java       |  14 ++-
 .../ambari/server/orm/dao/RequestDAO.java     | 115 ++++++++++--------
 .../src/main/python/ambari-server.py          |   1 +
 .../main/python/ambari_server/dbCleanup.py    |  10 +-
 5 files changed, 94 insertions(+), 61 deletions(-)

diff --git a/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupDriver.java b/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupDriver.java
index 27bdc90159d..e6d2517dd01 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupDriver.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupDriver.java
@@ -45,11 +45,13 @@ public class CleanupDriver {
 
   private static final String DATE_PATTERN = "yyyy-MM-dd";
   private static final String CLUSTER_NAME_ARG = "cluster-name";
+  private static final String CLEAN_COMMON_INFO = "purge-common-info";
   private static final String FROM_DATE_ARG = "from-date";
 
   private static Options getOptions() {
     Options options = new Options();
     options.addOption(Option.builder().longOpt(CLUSTER_NAME_ARG).desc("The cluster name").required().type(String.class).hasArg().valueSeparator(' ').build());
+    options.addOption(Option.builder().longOpt(CLEAN_COMMON_INFO).desc("Whether to also purge records with cluster id -1").required().type(String.class).hasArg().valueSeparator(' ').build());
     options.addOption(Option.builder().longOpt(FROM_DATE_ARG).desc("Date up until data will be purged.").required().type(String.class).hasArg().valueSeparator(' ').build());
     return options;
   }
@@ -63,8 +65,9 @@ private static CleanupContext processArguments(String... args) {
     try {
       CommandLine line = cmdLineParser.parse(getOptions(), args);
       String clusterName = (String) line.getParsedOptionValue(CLUSTER_NAME_ARG);
+      String cleanCommonInfo = (String) line.getParsedOptionValue(CLEAN_COMMON_INFO);
       Date fromDate = df.parse(line.getOptionValue(FROM_DATE_ARG));
-      ctx = new CleanupContext(clusterName, fromDate.getTime());
+      ctx = new CleanupContext(clusterName, fromDate.getTime(), cleanCommonInfo);
     } catch (Exception exp) {
       System.err.println("Parsing failed. Reason: " + exp.getMessage());
       LOGGER.error("Parsing failed. Reason: ", exp);
@@ -87,7 +90,7 @@ public static void main(String... args) throws Exception {
     injector.getInstance(AmbariJpaPersistService.class).start();
 
     CleanupServiceImpl cleanupService = injector.getInstance(CleanupServiceImpl.class);
-    CleanupService.CleanupResult result = cleanupService.cleanup(new TimeBasedCleanupPolicy(cleanupContext.getClusterName(), cleanupContext.getFromDayTimestamp()));
+    CleanupService.CleanupResult result = cleanupService.cleanup(new TimeBasedCleanupPolicy(cleanupContext.getClusterName(), cleanupContext.getFromDayTimestamp(), cleanupContext.getCleanCommonInfo()));
 
     // explicitly stopping the persist service
     injector.getInstance(AmbariJpaPersistService.class).stop();
@@ -107,16 +110,22 @@ public static void main(String... args) throws Exception {
   private static class CleanupContext {
     private String clusterName;
     private Long fromDayTimestamp;
+    private String cleanCommonInfo;
 
-    public CleanupContext(String clusterName, Long fromDayTimestamp) {
+    public CleanupContext(String clusterName, Long fromDayTimestamp, String cleanCommonInfo) {
       this.clusterName = clusterName;
       this.fromDayTimestamp = fromDayTimestamp;
+      this.cleanCommonInfo = cleanCommonInfo;
     }
 
     public String getClusterName() {
       return clusterName;
     }
 
+    public boolean getCleanCommonInfo() {
+      // Boolean.parseBoolean is null-safe and case-insensitive, unlike equals("true")
+      return Boolean.parseBoolean(cleanCommonInfo);
+    }
+
     public Long getFromDayTimestamp() {
       return fromDayTimestamp;
     }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/cleanup/TimeBasedCleanupPolicy.java b/ambari-server/src/main/java/org/apache/ambari/server/cleanup/TimeBasedCleanupPolicy.java
index 4fd5ca3a2a3..bacfe33699f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/cleanup/TimeBasedCleanupPolicy.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/cleanup/TimeBasedCleanupPolicy.java
@@ -23,6 +23,7 @@ public class TimeBasedCleanupPolicy {
 
   private String clusterName;
+  private boolean cleanCommonInfo;
   private Long toDateInMillis;
 
   /**
@@ -31,11 +32,16 @@ public class TimeBasedCleanupPolicy {
    * @param clusterName the cluster name
    * @param toDateInMillis timestamp before that entities are purged.
    */
-  public TimeBasedCleanupPolicy(String clusterName, Long toDateInMillis) {
+  public TimeBasedCleanupPolicy(String clusterName, Long toDateInMillis, boolean cleanCommonInfo) {
     this.clusterName = clusterName;
+    this.cleanCommonInfo = cleanCommonInfo;
     this.toDateInMillis = toDateInMillis;
   }
 
+  public TimeBasedCleanupPolicy(String clusterName, Long toDateInMillis) {
+    this(clusterName, toDateInMillis, false); // default: do not purge records with cluster_id -1
+  }
+
   /**
    * @return the cluster name
    */
@@ -43,6 +49,10 @@ public String getClusterName() {
     return clusterName;
   }
 
+  public boolean getCleanCommonInfo() {
+    return cleanCommonInfo;
+  }
+
   /**
    *
    * @return the timestamp before that entities are purged
@@ -58,4 +68,4 @@ public Long getToDateInMillis() {
   public PurgePolicy getPurgePolicy() {
     return PurgePolicy.DELETE;
   }
-}
+}
\ No newline at end of file
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
index 42e5ca669fa..d2ffa395eb0 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
@@ -371,61 +371,70 @@ protected int cleanTableByStageEntityPK(List<StageEntityPK> ids, LinkedList<String> params,
   public long cleanup(TimeBasedCleanupPolicy policy) {
     long affectedRows = 0;
     try {
-      Long clusterId = m_clusters.get().getCluster(policy.getClusterName()).getClusterId();
-      // find request and stage ids that were created before date populated by user.
-      List<StageEntityPK> requestStageIds = findRequestAndStageIdsInClusterBeforeDate(clusterId, policy.getToDateInMillis());
-
-      // find request ids from Upgrade table and exclude these ids from
-      // request ids set that we already have. We don't want to make any changes for upgrade
-      Set<Long> requestIdsFromUpgrade = findAllRequestIdsFromUpgrade();
-      Iterator<StageEntityPK> requestStageIdsIterator = requestStageIds.iterator();
-      Set<Long> requestIds = new HashSet<>();
-      while (requestStageIdsIterator.hasNext()) {
-        StageEntityPK nextRequestStageIds = requestStageIdsIterator.next();
-        if (requestIdsFromUpgrade.contains(nextRequestStageIds.getRequestId())) {
-          requestStageIdsIterator.remove();
-          continue;
-        }
-        requestIds.add(nextRequestStageIds.getRequestId());
+      List<Long> clusterIdList = new ArrayList<>();
+      Long clusterIdAfterCreation = m_clusters.get().getCluster(policy.getClusterName()).getClusterId();
+      clusterIdList.add(clusterIdAfterCreation);
+      if (policy.getCleanCommonInfo()) {
+        LOG.info(String.format("Also purging records with cluster_id -1 created before %s", policy.getToDateInMillis()));
+        clusterIdList.add(-1L);
       }
+      for (Long clusterId : clusterIdList) {
+        LOG.info(String.format("Purging records with cluster_id %s created before %s", clusterId, policy.getToDateInMillis()));
+        // find request and stage ids that were created before date populated by user.
+        List<StageEntityPK> requestStageIds = findRequestAndStageIdsInClusterBeforeDate(clusterId, policy.getToDateInMillis());
+
+        // find request ids from Upgrade table and exclude these ids from
+        // request ids set that we already have. We don't want to make any changes for upgrade
+        Set<Long> requestIdsFromUpgrade = findAllRequestIdsFromUpgrade();
+        Iterator<StageEntityPK> requestStageIdsIterator = requestStageIds.iterator();
+        Set<Long> requestIds = new HashSet<>();
+        while (requestStageIdsIterator.hasNext()) {
+          StageEntityPK nextRequestStageIds = requestStageIdsIterator.next();
+          if (requestIdsFromUpgrade.contains(nextRequestStageIds.getRequestId())) {
+            requestStageIdsIterator.remove();
+            continue;
+          }
+          requestIds.add(nextRequestStageIds.getRequestId());
+        }
 
-      // find task ids using request stage ids
-      Set<Long> taskIds = hostRoleCommandDAO.findTaskIdsByRequestStageIds(requestStageIds);
-      LinkedList<String> params = new LinkedList<>();
-      params.add("stageId");
-      params.add("requestId");
-
-      // find host task ids, to find related host requests and also to remove needed host tasks
-      Set<Long> hostTaskIds = topologyLogicalTaskDAO.findHostTaskIdsByPhysicalTaskIds(taskIds);
-
-      // find host request ids by host task ids to remove later needed host requests
-      Set<Long> hostRequestIds = topologyHostTaskDAO.findHostRequestIdsByHostTaskIds(hostTaskIds);
-      Set<Long> topologyRequestIds = topologyLogicalRequestDAO.findRequestIdsByIds(hostRequestIds);
-
-      //removing all entities one by one according to their relations using stage, task and request ids
-      affectedRows += cleanTableByIds(taskIds, "taskIds", "ExecutionCommand", policy.getToDateInMillis(),
-        "ExecutionCommandEntity.removeByTaskIds", ExecutionCommandEntity.class);
-      affectedRows += cleanTableByIds(taskIds, "taskIds", "TopologyLogicalTask", policy.getToDateInMillis(),
-        "TopologyLogicalTaskEntity.removeByPhysicalTaskIds", TopologyLogicalTaskEntity.class);
-      affectedRows += cleanTableByIds(hostTaskIds, "hostTaskIds", "TopologyHostTask", policy.getToDateInMillis(),
-        "TopologyHostTaskEntity.removeByTaskIds", TopologyHostTaskEntity.class);
-      affectedRows += cleanTableByIds(hostRequestIds, "hostRequestIds", "TopologyHostRequest", policy.getToDateInMillis(),
-        "TopologyHostRequestEntity.removeByIds", TopologyHostRequestEntity.class);
-      for (Long topologyRequestId : topologyRequestIds) {
-        topologyRequestDAO.removeByPK(topologyRequestId);
+        // find task ids using request stage ids
+        Set<Long> taskIds = hostRoleCommandDAO.findTaskIdsByRequestStageIds(requestStageIds);
+        LinkedList<String> params = new LinkedList<>();
+        params.add("stageId");
+        params.add("requestId");
+
+        // find host task ids, to find related host requests and also to remove needed host tasks
+        Set<Long> hostTaskIds = topologyLogicalTaskDAO.findHostTaskIdsByPhysicalTaskIds(taskIds);
+
+        // find host request ids by host task ids to remove later needed host requests
+        Set<Long> hostRequestIds = topologyHostTaskDAO.findHostRequestIdsByHostTaskIds(hostTaskIds);
+        Set<Long> topologyRequestIds = topologyLogicalRequestDAO.findRequestIdsByIds(hostRequestIds);
+
+        //removing all entities one by one according to their relations using stage, task and request ids
+        affectedRows += cleanTableByIds(taskIds, "taskIds", "ExecutionCommand", policy.getToDateInMillis(),
+          "ExecutionCommandEntity.removeByTaskIds", ExecutionCommandEntity.class);
+        affectedRows += cleanTableByIds(taskIds, "taskIds", "TopologyLogicalTask", policy.getToDateInMillis(),
+          "TopologyLogicalTaskEntity.removeByPhysicalTaskIds", TopologyLogicalTaskEntity.class);
+        affectedRows += cleanTableByIds(hostTaskIds, "hostTaskIds", "TopologyHostTask", policy.getToDateInMillis(),
+          "TopologyHostTaskEntity.removeByTaskIds", TopologyHostTaskEntity.class);
+        affectedRows += cleanTableByIds(hostRequestIds, "hostRequestIds", "TopologyHostRequest", policy.getToDateInMillis(),
+          "TopologyHostRequestEntity.removeByIds", TopologyHostRequestEntity.class);
+        for (Long topologyRequestId : topologyRequestIds) {
+          topologyRequestDAO.removeByPK(topologyRequestId);
+        }
+        affectedRows += cleanTableByIds(taskIds, "taskIds", "HostRoleCommand", policy.getToDateInMillis(),
+          "HostRoleCommandEntity.removeByTaskIds", HostRoleCommandEntity.class);
+        affectedRows += cleanTableByStageEntityPK(requestStageIds, params, "RoleSuccessCriteria", policy.getToDateInMillis(),
+          "RoleSuccessCriteriaEntity.removeByRequestStageIds", RoleSuccessCriteriaEntity.class);
+        affectedRows += cleanTableByStageEntityPK(requestStageIds, params, "Stage", policy.getToDateInMillis(),
+          "StageEntity.removeByRequestStageIds", StageEntity.class);
+        affectedRows += cleanTableByIds(requestIds, "requestIds", "RequestResourceFilter", policy.getToDateInMillis(),
+          "RequestResourceFilterEntity.removeByRequestIds", RequestResourceFilterEntity.class);
+        affectedRows += cleanTableByIds(requestIds, "requestIds", "RequestOperationLevel", policy.getToDateInMillis(),
+          "RequestOperationLevelEntity.removeByRequestIds", RequestOperationLevelEntity.class);
+        affectedRows += cleanTableByIds(requestIds, "requestIds", "Request", policy.getToDateInMillis(),
+          "RequestEntity.removeByRequestIds", RequestEntity.class);
       }
-      affectedRows += cleanTableByIds(taskIds, "taskIds", "HostRoleCommand", policy.getToDateInMillis(),
-        "HostRoleCommandEntity.removeByTaskIds", HostRoleCommandEntity.class);
-      affectedRows += cleanTableByStageEntityPK(requestStageIds, params, "RoleSuccessCriteria", policy.getToDateInMillis(),
-        "RoleSuccessCriteriaEntity.removeByRequestStageIds", RoleSuccessCriteriaEntity.class);
-      affectedRows += cleanTableByStageEntityPK(requestStageIds, params, "Stage", policy.getToDateInMillis(),
-        "StageEntity.removeByRequestStageIds", StageEntity.class);
-      affectedRows += cleanTableByIds(requestIds, "requestIds", "RequestResourceFilter", policy.getToDateInMillis(),
-        "RequestResourceFilterEntity.removeByRequestIds", RequestResourceFilterEntity.class);
-      affectedRows += cleanTableByIds(requestIds, "requestIds", "RequestOperationLevel", policy.getToDateInMillis(),
-        "RequestOperationLevelEntity.removeByRequestIds", RequestOperationLevelEntity.class);
"RequestOperationLevelEntity.removeByRequestIds", RequestOperationLevelEntity.class); - affectedRows += cleanTableByIds(requestIds, "requestIds", "Request", policy.getToDateInMillis(), - "RequestEntity.removeByRequestIds", RequestEntity.class); } catch (AmbariException e) { LOG.error("Error while looking up cluster with name: {}", policy.getClusterName(), e); @@ -434,4 +443,4 @@ public long cleanup(TimeBasedCleanupPolicy policy) { return affectedRows; } -} +} \ No newline at end of file diff --git a/ambari-server/src/main/python/ambari-server.py b/ambari-server/src/main/python/ambari-server.py index 601f4d3dda2..e63640b3142 100755 --- a/ambari-server/src/main/python/ambari-server.py +++ b/ambari-server/src/main/python/ambari-server.py @@ -674,6 +674,7 @@ def init_enable_stack_parser_options(parser): @OsFamilyFuncImpl(OsFamilyImpl.DEFAULT) def init_db_purge_parser_options(parser): parser.add_option('--cluster-name', default=None, help="Cluster name", dest="cluster_name") + parser.add_option('--purge-common-info', default="false", type="string", help="Want to purge records for cluster_id = -1",dest="clean_common_info") parser.add_option("-d", "--from-date", dest="purge_from_date", default=None, type="string", help="Specify date for the database purge process in 'yyyy-MM-dd' format") @OsFamilyFuncImpl(OsFamilyImpl.DEFAULT) diff --git a/ambari-server/src/main/python/ambari_server/dbCleanup.py b/ambari-server/src/main/python/ambari_server/dbCleanup.py index 860d144e74d..384c0420d86 100644 --- a/ambari-server/src/main/python/ambari_server/dbCleanup.py +++ b/ambari-server/src/main/python/ambari_server/dbCleanup.py @@ -40,12 +40,14 @@ DB_CLEANUP_CMD = "{0} " \ "-cp {1} org.apache.ambari.server.cleanup.CleanupDriver " \ "--cluster-name {2} " \ - "--from-date {3}> " + configDefaults.SERVER_OUT_FILE + " 2>&1" + "--purge-common-info {3} " \ + "--from-date {4}> " + configDefaults.SERVER_OUT_FILE + " 2>&1" -DB_DEBUG_CLEANUP_CMD = "{0} -agentlib:jdwp=transport=dt_socket,server=y,suspend={4},address={5} " \ +DB_DEBUG_CLEANUP_CMD = "{0} -agentlib:jdwp=transport=dt_socket,server=y,suspend={5},address={6} " \ "-cp {1} org.apache.ambari.server.cleanup.CleanupDriver " \ "--cluster-name {2} " \ - "--from-date {3}> " + configDefaults.SERVER_OUT_FILE + " 2>&1" + "--purge-common-info {3} " \ + "--from-date {4}> " + configDefaults.SERVER_OUT_FILE + " 2>&1" def run_db_purge(options): @@ -98,6 +100,7 @@ def run_db_purge(options): jdk_path, class_path, options.cluster_name, + options.clean_common_info, options.purge_from_date, DEBUG_SUSPEND_AT_START, DEBUG_PORT @@ -107,6 +110,7 @@ def run_db_purge(options): jdk_path, class_path, options.cluster_name, + options.clean_common_info, options.purge_from_date )