AMBARI-26030: Implement Handling for DB Records Purge with Cluster ID -1 #3763

Open · wants to merge 1 commit into trunk
@@ -45,11 +45,13 @@ public class CleanupDriver {

private static final String DATE_PATTERN = "yyyy-MM-dd";
private static final String CLUSTER_NAME_ARG = "cluster-name";
private static final String CLEAN_COMMON_INFO = "purge-common-info";
private static final String FROM_DATE_ARG = "from-date";

private static Options getOptions() {
Options options = new Options();
options.addOption(Option.builder().longOpt(CLUSTER_NAME_ARG).desc("The cluster name").required().type(String.class).hasArg().valueSeparator(' ').build());
options.addOption(Option.builder().longOpt(CLEAN_COMMON_INFO).desc("Clean Common Info").required().type(String.class).hasArg().valueSeparator(' ').build());
options.addOption(Option.builder().longOpt(FROM_DATE_ARG).desc("Date up until data will be purged.").required().type(String.class).hasArg().valueSeparator(' ').build());
return options;
}
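Note: the new purge-common-info option is declared exactly like the existing ones, a required long option carrying a single string value. Below is a minimal, self-contained Commons CLI sketch of how such an option is parsed; the class name and sample argv are illustrative only and not part of this patch.

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;

public class PurgeCommonInfoOptionDemo {
  public static void main(String[] args) throws Exception {
    Options options = new Options();
    // Mirrors the option added in this diff: a required long option with a single string argument.
    options.addOption(Option.builder().longOpt("purge-common-info")
        .desc("Clean Common Info").required().type(String.class).hasArg().valueSeparator(' ').build());

    // Simulated argv, e.g. what dbCleanup.py would pass through to the driver.
    String[] argv = {"--purge-common-info", "true"};
    CommandLine line = new DefaultParser().parse(options, argv);

    // getOptionValue returns the raw string; the driver later compares it against "true".
    System.out.println(line.getOptionValue("purge-common-info")); // prints: true
  }
}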
@@ -63,8 +65,9 @@ private static CleanupContext processArguments(String... args) {
try {
CommandLine line = cmdLineParser.parse(getOptions(), args);
String clusterName = (String) line.getParsedOptionValue(CLUSTER_NAME_ARG);
String cleanCommonInfo = (String) line.getParsedOptionValue(CLEAN_COMMON_INFO);
Date fromDate = df.parse(line.getOptionValue(FROM_DATE_ARG));
ctx = new CleanupContext(clusterName, fromDate.getTime());
ctx = new CleanupContext(clusterName, fromDate.getTime(), cleanCommonInfo);
} catch (Exception exp) {
System.err.println("Parsing failed. Reason: " + exp.getMessage());
LOGGER.error("Parsing failed. Reason: ", exp);
@@ -87,7 +90,7 @@ public static void main(String... args) throws Exception {
injector.getInstance(AmbariJpaPersistService.class).start();

CleanupServiceImpl cleanupService = injector.getInstance(CleanupServiceImpl.class);
CleanupService.CleanupResult result = cleanupService.cleanup(new TimeBasedCleanupPolicy(cleanupContext.getClusterName(), cleanupContext.getFromDayTimestamp()));
CleanupService.CleanupResult result = cleanupService.cleanup(new TimeBasedCleanupPolicy(cleanupContext.getClusterName(), cleanupContext.getFromDayTimestamp(), cleanupContext.getcleanCommonInfo()));

// explicitly stopping the persist service
injector.getInstance(AmbariJpaPersistService.class).stop();
@@ -107,16 +110,22 @@ public static void main(String... args) throws Exception {
private static class CleanupContext {
private String clusterName;
private Long fromDayTimestamp;
private String cleanCommonInfo;

public CleanupContext(String clusterName, Long fromDayTimestamp) {
public CleanupContext(String clusterName, Long fromDayTimestamp, String cleanCommonInfo) {
this.clusterName = clusterName;
this.fromDayTimestamp = fromDayTimestamp;
this.cleanCommonInfo = cleanCommonInfo;
}

public String getClusterName() {
return clusterName;
}

public boolean getcleanCommonInfo() {
return cleanCommonInfo.equals("true");
}

public Long getFromDayTimestamp() {
return fromDayTimestamp;
}
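Note: as written, getcleanCommonInfo() enables the flag only for the exact lowercase string "true", whereas Boolean.parseBoolean accepts any casing. A small standalone comparison of the two behaviors; the class name and inputs are illustrative only.

public class CleanCommonInfoFlagCheck {
  public static void main(String[] args) {
    String[] inputs = {"true", "TRUE", "True", "false", "yes"};
    for (String value : inputs) {
      boolean viaEquals = value.equals("true");          // matches only the exact lowercase literal
      boolean viaParse  = Boolean.parseBoolean(value);   // case-insensitive "true"; everything else is false
      System.out.printf("%-5s  equals(\"true\")=%-5b  parseBoolean=%-5b%n", value, viaEquals, viaParse);
    }
  }
}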
@@ -23,6 +23,7 @@
public class TimeBasedCleanupPolicy {

private String clusterName;
private boolean cleanCommonInfo;
private Long toDateInMillis;

/**
@@ -31,18 +32,27 @@ public class TimeBasedCleanupPolicy {
* @param clusterName the cluster name
* @param toDateInMillis timestamp before that entities are purged.
*/
public TimeBasedCleanupPolicy(String clusterName, Long toDateInMillis) {
public TimeBasedCleanupPolicy(String clusterName, Long toDateInMillis, boolean cleanCommonInfo) {
this.clusterName = clusterName;
this.cleanCommonInfo = cleanCommonInfo;
this.toDateInMillis = toDateInMillis;
}

public TimeBasedCleanupPolicy(String clusterName, Long toDateInMillis) {
this(clusterName, toDateInMillis, false); // Set a default value for cleanCommonInfo
}

/**
* @return the cluster name
*/
public String getClusterName() {
return clusterName;
}

public boolean getcleanCommonInfo() {
return cleanCommonInfo;
}

/**
*
* @return the timestamp before that entities are purged
@@ -58,4 +68,4 @@ public Long getToDateInMillis() {
public PurgePolicy getPurgePolicy() {
return PurgePolicy.DELETE;
}
}
}
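Note: the retained two-argument constructor keeps existing callers source-compatible by delegating to the new constructor with cleanCommonInfo defaulted to false. A minimal usage sketch follows; it assumes TimeBasedCleanupPolicy lives in the same org.apache.ambari.server.cleanup package as CleanupDriver and that ambari-server is on the classpath, and the cluster name and timestamp are illustrative.

// Assumption: same package as CleanupDriver; values below are purely illustrative.
import org.apache.ambari.server.cleanup.TimeBasedCleanupPolicy;

public class PolicyConstructorSketch {
  public static void main(String[] args) {
    long cutoff = 1700000000000L; // purge entities created before this timestamp

    // Existing callers keep working: two-argument form defaults cleanCommonInfo to false.
    TimeBasedCleanupPolicy legacy = new TimeBasedCleanupPolicy("c1", cutoff);

    // New callers can opt in to also purging records stored under cluster_id -1.
    TimeBasedCleanupPolicy withCommonInfo = new TimeBasedCleanupPolicy("c1", cutoff, true);

    System.out.println(legacy.getcleanCommonInfo());         // false
    System.out.println(withCommonInfo.getcleanCommonInfo()); // true
  }
}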
@@ -371,61 +371,70 @@ protected <T> int cleanTableByStageEntityPK(List<StageEntityPK> ids, LinkedList<
public long cleanup(TimeBasedCleanupPolicy policy) {
long affectedRows = 0;
try {
Long clusterId = m_clusters.get().getCluster(policy.getClusterName()).getClusterId();
// find request and stage ids that were created before date populated by user.
List<StageEntityPK> requestStageIds = findRequestAndStageIdsInClusterBeforeDate(clusterId, policy.getToDateInMillis());

// find request ids from Upgrade table and exclude these ids from
// request ids set that we already have. We don't want to make any changes for upgrade
Set<Long> requestIdsFromUpgrade = findAllRequestIdsFromUpgrade();
Iterator<StageEntityPK> requestStageIdsIterator = requestStageIds.iterator();
Set<Long> requestIds = new HashSet<>();
while (requestStageIdsIterator.hasNext()) {
StageEntityPK nextRequestStageIds = requestStageIdsIterator.next();
if (requestIdsFromUpgrade.contains(nextRequestStageIds.getRequestId())) {
requestStageIdsIterator.remove();
continue;
}
requestIds.add(nextRequestStageIds.getRequestId());
List<Long> clusterIdList = new ArrayList<>();
Long clusterId_after_creation = m_clusters.get().getCluster(policy.getClusterName()).getClusterId();
clusterIdList.add(clusterId_after_creation);
if(policy.getcleanCommonInfo()){
LOG.info(String.format("Including records before date with cluster_id -1 before %s",policy.getToDateInMillis()));
clusterIdList.add(-1L);
}
for(Long clusterId : clusterIdList){
LOG.info(String.format("Purging records before date with cluster_id %s before %s", clusterId, policy.getToDateInMillis()));
// find request and stage ids that were created before date populated by user.
List<StageEntityPK> requestStageIds = findRequestAndStageIdsInClusterBeforeDate(clusterId, policy.getToDateInMillis());

// find request ids from Upgrade table and exclude these ids from
// request ids set that we already have. We don't want to make any changes for upgrade
Set<Long> requestIdsFromUpgrade = findAllRequestIdsFromUpgrade();
Iterator<StageEntityPK> requestStageIdsIterator = requestStageIds.iterator();
Set<Long> requestIds = new HashSet<>();
while (requestStageIdsIterator.hasNext()) {
StageEntityPK nextRequestStageIds = requestStageIdsIterator.next();
if (requestIdsFromUpgrade.contains(nextRequestStageIds.getRequestId())) {
requestStageIdsIterator.remove();
continue;
}
requestIds.add(nextRequestStageIds.getRequestId());
}

// find task ids using request stage ids
Set<Long> taskIds = hostRoleCommandDAO.findTaskIdsByRequestStageIds(requestStageIds);
LinkedList<String> params = new LinkedList<>();
params.add("stageId");
params.add("requestId");

// find host task ids, to find related host requests and also to remove needed host tasks
Set<Long> hostTaskIds = topologyLogicalTaskDAO.findHostTaskIdsByPhysicalTaskIds(taskIds);

// find host request ids by host task ids to remove later needed host requests
Set<Long> hostRequestIds = topologyHostTaskDAO.findHostRequestIdsByHostTaskIds(hostTaskIds);
Set<Long> topologyRequestIds = topologyLogicalRequestDAO.findRequestIdsByIds(hostRequestIds);

//removing all entities one by one according to their relations using stage, task and request ids
affectedRows += cleanTableByIds(taskIds, "taskIds", "ExecutionCommand", policy.getToDateInMillis(),
"ExecutionCommandEntity.removeByTaskIds", ExecutionCommandEntity.class);
affectedRows += cleanTableByIds(taskIds, "taskIds", "TopologyLogicalTask", policy.getToDateInMillis(),
"TopologyLogicalTaskEntity.removeByPhysicalTaskIds", TopologyLogicalTaskEntity.class);
affectedRows += cleanTableByIds(hostTaskIds, "hostTaskIds", "TopologyHostTask", policy.getToDateInMillis(),
"TopologyHostTaskEntity.removeByTaskIds", TopologyHostTaskEntity.class);
affectedRows += cleanTableByIds(hostRequestIds, "hostRequestIds", "TopologyHostRequest", policy.getToDateInMillis(),
"TopologyHostRequestEntity.removeByIds", TopologyHostRequestEntity.class);
for (Long topologyRequestId : topologyRequestIds) {
topologyRequestDAO.removeByPK(topologyRequestId);
// find task ids using request stage ids
Set<Long> taskIds = hostRoleCommandDAO.findTaskIdsByRequestStageIds(requestStageIds);
LinkedList<String> params = new LinkedList<>();
params.add("stageId");
params.add("requestId");

// find host task ids, to find related host requests and also to remove needed host tasks
Set<Long> hostTaskIds = topologyLogicalTaskDAO.findHostTaskIdsByPhysicalTaskIds(taskIds);

// find host request ids by host task ids to remove later needed host requests
Set<Long> hostRequestIds = topologyHostTaskDAO.findHostRequestIdsByHostTaskIds(hostTaskIds);
Set<Long> topologyRequestIds = topologyLogicalRequestDAO.findRequestIdsByIds(hostRequestIds);

//removing all entities one by one according to their relations using stage, task and request ids
affectedRows += cleanTableByIds(taskIds, "taskIds", "ExecutionCommand", policy.getToDateInMillis(),
"ExecutionCommandEntity.removeByTaskIds", ExecutionCommandEntity.class);
affectedRows += cleanTableByIds(taskIds, "taskIds", "TopologyLogicalTask", policy.getToDateInMillis(),
"TopologyLogicalTaskEntity.removeByPhysicalTaskIds", TopologyLogicalTaskEntity.class);
affectedRows += cleanTableByIds(hostTaskIds, "hostTaskIds", "TopologyHostTask", policy.getToDateInMillis(),
"TopologyHostTaskEntity.removeByTaskIds", TopologyHostTaskEntity.class);
affectedRows += cleanTableByIds(hostRequestIds, "hostRequestIds", "TopologyHostRequest", policy.getToDateInMillis(),
"TopologyHostRequestEntity.removeByIds", TopologyHostRequestEntity.class);
for (Long topologyRequestId : topologyRequestIds) {
topologyRequestDAO.removeByPK(topologyRequestId);
}
affectedRows += cleanTableByIds(taskIds, "taskIds", "HostRoleCommand", policy.getToDateInMillis(),
"HostRoleCommandEntity.removeByTaskIds", HostRoleCommandEntity.class);
affectedRows += cleanTableByStageEntityPK(requestStageIds, params, "RoleSuccessCriteria", policy.getToDateInMillis(),
"RoleSuccessCriteriaEntity.removeByRequestStageIds", RoleSuccessCriteriaEntity.class);
affectedRows += cleanTableByStageEntityPK(requestStageIds, params, "Stage", policy.getToDateInMillis(),
"StageEntity.removeByRequestStageIds", StageEntity.class);
affectedRows += cleanTableByIds(requestIds, "requestIds", "RequestResourceFilter", policy.getToDateInMillis(),
"RequestResourceFilterEntity.removeByRequestIds", RequestResourceFilterEntity.class);
affectedRows += cleanTableByIds(requestIds, "requestIds", "RequestOperationLevel", policy.getToDateInMillis(),
"RequestOperationLevelEntity.removeByRequestIds", RequestOperationLevelEntity.class);
affectedRows += cleanTableByIds(requestIds, "requestIds", "Request", policy.getToDateInMillis(),
"RequestEntity.removeByRequestIds", RequestEntity.class);
}
affectedRows += cleanTableByIds(taskIds, "taskIds", "HostRoleCommand", policy.getToDateInMillis(),
"HostRoleCommandEntity.removeByTaskIds", HostRoleCommandEntity.class);
affectedRows += cleanTableByStageEntityPK(requestStageIds, params, "RoleSuccessCriteria", policy.getToDateInMillis(),
"RoleSuccessCriteriaEntity.removeByRequestStageIds", RoleSuccessCriteriaEntity.class);
affectedRows += cleanTableByStageEntityPK(requestStageIds, params, "Stage", policy.getToDateInMillis(),
"StageEntity.removeByRequestStageIds", StageEntity.class);
affectedRows += cleanTableByIds(requestIds, "requestIds", "RequestResourceFilter", policy.getToDateInMillis(),
"RequestResourceFilterEntity.removeByRequestIds", RequestResourceFilterEntity.class);
affectedRows += cleanTableByIds(requestIds, "requestIds", "RequestOperationLevel", policy.getToDateInMillis(),
"RequestOperationLevelEntity.removeByRequestIds", RequestOperationLevelEntity.class);
affectedRows += cleanTableByIds(requestIds, "requestIds", "Request", policy.getToDateInMillis(),
"RequestEntity.removeByRequestIds", RequestEntity.class);

} catch (AmbariException e) {
LOG.error("Error while looking up cluster with name: {}", policy.getClusterName(), e);
@@ -434,4 +443,4 @@ public long cleanup(TimeBasedCleanupPolicy policy) {

return affectedRows;
}
}
}
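Note: the heart of this change is that the purge now iterates over a small list of cluster ids, the resolved cluster's id plus -1 when the flag is set, and runs the same delete sequence for each. A condensed, self-contained sketch of that control flow is shown below; purgeForCluster is a hypothetical stand-in for the DAO lookups and cleanTableBy* calls in the real method, and the values are illustrative.

import java.util.ArrayList;
import java.util.List;

public class CommonInfoPurgeSketch {
  public static void main(String[] args) {
    long resolvedClusterId = 2L;      // would come from m_clusters.get().getCluster(name).getClusterId()
    long toDateInMillis = 1700000000000L;
    boolean cleanCommonInfo = true;   // driven by the new --purge-common-info argument

    List<Long> clusterIds = new ArrayList<>();
    clusterIds.add(resolvedClusterId);
    if (cleanCommonInfo) {
      // Records written without a concrete cluster are stored with cluster_id -1; include them on request.
      clusterIds.add(-1L);
    }

    long affectedRows = 0;
    for (Long clusterId : clusterIds) {
      affectedRows += purgeForCluster(clusterId, toDateInMillis);
    }
    System.out.println("affected rows: " + affectedRows);
  }

  // Hypothetical stand-in for the per-cluster purge body (find stage/request/task ids, then delete by ids).
  private static long purgeForCluster(long clusterId, long toDateInMillis) {
    System.out.printf("purging records for cluster_id %d before %d%n", clusterId, toDateInMillis);
    return 0L;
  }
}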
1 change: 1 addition & 0 deletions ambari-server/src/main/python/ambari-server.py
@@ -674,6 +674,7 @@ def init_enable_stack_parser_options(parser):
@OsFamilyFuncImpl(OsFamilyImpl.DEFAULT)
def init_db_purge_parser_options(parser):
parser.add_option('--cluster-name', default=None, help="Cluster name", dest="cluster_name")
parser.add_option('--purge-common-info', default="false", type="string", help="Want to purge records for cluster_id = -1",dest="clean_common_info")
parser.add_option("-d", "--from-date", dest="purge_from_date", default=None, type="string", help="Specify date for the database purge process in 'yyyy-MM-dd' format")

@OsFamilyFuncImpl(OsFamilyImpl.DEFAULT)
10 changes: 7 additions & 3 deletions ambari-server/src/main/python/ambari_server/dbCleanup.py
@@ -40,12 +40,14 @@
DB_CLEANUP_CMD = "{0} " \
"-cp {1} org.apache.ambari.server.cleanup.CleanupDriver " \
"--cluster-name {2} " \
"--from-date {3}> " + configDefaults.SERVER_OUT_FILE + " 2>&1"
"--purge-common-info {3} " \
"--from-date {4}> " + configDefaults.SERVER_OUT_FILE + " 2>&1"

DB_DEBUG_CLEANUP_CMD = "{0} -agentlib:jdwp=transport=dt_socket,server=y,suspend={4},address={5} " \
DB_DEBUG_CLEANUP_CMD = "{0} -agentlib:jdwp=transport=dt_socket,server=y,suspend={5},address={6} " \
"-cp {1} org.apache.ambari.server.cleanup.CleanupDriver " \
"--cluster-name {2} " \
"--from-date {3}> " + configDefaults.SERVER_OUT_FILE + " 2>&1"
"--purge-common-info {3} " \
"--from-date {4}> " + configDefaults.SERVER_OUT_FILE + " 2>&1"


def run_db_purge(options):
@@ -98,6 +100,7 @@ def run_db_purge(options):
jdk_path,
class_path,
options.cluster_name,
options.clean_common_info,
options.purge_from_date,
DEBUG_SUSPEND_AT_START,
DEBUG_PORT
@@ -107,6 +110,7 @@ def run_db_purge(options):
jdk_path,
class_path,
options.cluster_name,
options.clean_common_info,
options.purge_from_date
)
