Skip to content

Commit

Permalink
Merge pull request #517 from CloudSlang/oracle_memory_management
Browse files Browse the repository at this point in the history
ORA-04031 fix
  • Loading branch information
Akash-Anand0744 authored Jan 20, 2025
2 parents bfbab09 + a0ade43 commit c6c8f80
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;

import java.util.List;

Expand Down Expand Up @@ -56,4 +58,8 @@ public interface WorkerNodeRepository extends JpaRepository<WorkerNode,Long> {

@Modifying @Query("update WorkerNode w set w.uuid = w.uuid where w.uuid = ?1")
void lockByUuid(String uuid);

@Query(value = "SELECT w FROM WorkerNode w",
countQuery = "SELECT count(w) FROM WorkerNode w")
Page<WorkerNode> findAllWithPagination(Pageable pageable);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
Expand All @@ -52,6 +54,7 @@ public class WorkerNodeServiceImpl implements WorkerNodeService {
private static final long MAX_VERSION_GAP_ALLOWED = Long.getLong("max.allowed.version.gap.worker.recovery", 2);
private static boolean disableMonitoring = Boolean.getBoolean("global.worker.monitoring.disable");
private static final String MSG_RECOVERY_VERSION_NAME = "MSG_RECOVERY_VERSION";
private static final int WORKER_GROUPS_PAGE_SIZE = Integer.getInteger("worker.groups.page.size", 50);

@Autowired
private WorkerNodeRepository workerNodeRepository;
Expand Down Expand Up @@ -271,7 +274,24 @@ public List<WorkerNode> readAllWorkers() {
@Override
@Transactional(readOnly = true)
public Map<String, Set<String>> readWorkerGroupsMap() {
List<WorkerNode> all = workerNodeRepository.findAll();
/**
* Reduces memory usage by loading data in chunks of WORKER_GROUPS_PAGE_SIZE
* Prevent ORA-04031.
*/
int currentPage = 0;
List<WorkerNode> all = new ArrayList<>();

Page<WorkerNode> page;
do {
page = workerNodeRepository.findAllWithPagination(
PageRequest.of(currentPage, WORKER_GROUPS_PAGE_SIZE)
);
if (page != null && page.hasContent()) {
all.addAll(page.getContent());
}
currentPage++;
} while (page.hasNext());

Map<String, Set<String>> workerGroupsMap = newHashMapWithExpectedSize(all.size());
for (WorkerNode workerNode : all) {
workerGroupsMap.put(workerNode.getUuid(), new HashSet<>(workerNode.getGroups()));
Expand Down Expand Up @@ -507,5 +527,4 @@ public void updateWRV(String workerUuid, String wrv) {
WorkerNode worker = workerNodeRepository.findByUuid(workerUuid);
worker.setWorkerRecoveryVersion(wrv);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.Collections;

import static java.lang.Long.parseLong;
import static org.apache.commons.io.IOUtils.toByteArray;
Expand Down Expand Up @@ -334,8 +335,7 @@ public class ExecutionQueueRepositoryImpl implements ExecutionQueueRepository {
final private String BUSY_WORKERS_SQL =
"SELECT ASSIGNED_WORKER " +
" FROM OO_EXECUTION_QUEUES q " +
" WHERE " +
" (q.STATUS IN (:status)) AND " +
" WHERE q.STATUS IN (%s) AND " +
" (NOT EXISTS (SELECT qq.MSG_SEQ_ID " +
" FROM OO_EXECUTION_QUEUES qq " +
" WHERE (qq.EXEC_STATE_ID = q.EXEC_STATE_ID) AND qq.MSG_SEQ_ID > q.MSG_SEQ_ID)) " +
Expand Down Expand Up @@ -897,15 +897,19 @@ public List<ExecutionMessage> findByStatuses(int maxSize, ExecStatus... statuses

@Override
public List<String> getBusyWorkers(ExecStatus... statuses) {
/**
* Uses bind parameters to avoid memory issues. This way the database uses fewer unique statements and thus reduces memory usage
* This way the database can reuse the same statement for multiple executions and thus prevent ORA-04031.
*/
String bindParams = String.join(",", Collections.nCopies(statuses.length, "?"));
// prepare the sql statement
String sqlStat = BUSY_WORKERS_SQL
.replaceAll(":status", StringUtils.repeat("?", ",", statuses.length));
String sqlStat = String.format(BUSY_WORKERS_SQL, bindParams);

// prepare the argument
Object[] values = new Object[statuses.length];
int i = 0;
for (ExecStatus status : statuses) {
values[i] = status.getNumber();
}
Object[] values = Arrays.stream(statuses)
.map(ExecStatus::getNumber)
.toArray();

return doSelectWithTemplate(getBusyWorkersJdbcTemplate, sqlStat, new BusyWorkerRowMapper(), values);
}

Expand Down

0 comments on commit c6c8f80

Please sign in to comment.