From 381cbe5285deaa71ef4eaea5b5a2abed9eaa6bd6 Mon Sep 17 00:00:00 2001 From: doleyzi <43397300+doleyzi@users.noreply.github.com> Date: Tue, 30 Jul 2024 09:44:28 +0800 Subject: [PATCH] [INLONG-10728][Audit] Add global memory control for the Audit SDK (#10733) --- .../inlong/audit/AuditReporterImpl.java | 72 +++++++++++++------ .../inlong/audit/entity/AuditMetric.java | 6 ++ .../inlong/audit/send/SenderManager.java | 35 +++++++-- 3 files changed, 85 insertions(+), 28 deletions(-) diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java index b4cc8028f03..e8a38da28f9 100644 --- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java +++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java @@ -310,33 +310,40 @@ public synchronized void flush(long isolateKey) { } long startTime = System.currentTimeMillis(); LOGGER.info("Audit flush isolate key {} ", isolateKey); - manager.checkFailedData(); - resetStat(); - summaryExpiredStatMap(isolateKey); - - Iterator>> iterator = this.preStatMap.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry> entry = iterator.next(); - if (entry.getValue().isEmpty()) { - LOGGER.info("Remove the key of pre stat map: {},isolate key: {} ", entry.getKey(), isolateKey); - iterator.remove(); - continue; - } - if (entry.getKey() > isolateKey) { - continue; + try { + manager.checkFailedData(); + resetStat(); + + summaryExpiredStatMap(isolateKey); + + Iterator>> iterator = + this.preStatMap.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry> entry = iterator.next(); + if (entry.getValue().isEmpty()) { + LOGGER.info("Remove the key of pre stat map: {},isolate key: {} ", entry.getKey(), isolateKey); + iterator.remove(); + continue; + } + if (entry.getKey() > isolateKey) { + continue; + } + summaryPreStatMap(entry.getKey(), entry.getValue()); + send(entry.getKey()); } - summaryPreStatMap(entry.getKey(), entry.getValue()); - send(entry.getKey()); + clearExpiredKey(isolateKey); + } catch (Exception exception) { + LOGGER.error("Flush audit has exception!", exception); + } finally { + manager.closeSocket(); } - clearExpiredKey(isolateKey); - - manager.closeSocket(); - - LOGGER.info("Success report {} package, Failed report {} package, total {} message, cost: {} ms", + LOGGER.info( + "Success report {} package, Failed report {} package, total {} message, memory size {}, cost: {} ms", auditMetric.getSuccessPack(), auditMetric.getFailedPack(), auditMetric.getTotalMsg(), + auditMetric.getMemorySize(), System.currentTimeMillis() - startTime); auditMetric.reset(); @@ -475,12 +482,25 @@ private void send(long isolateKey) { for (Map.Entry entry : summaryStatMap.get(isolateKey).entrySet()) { // Entry key order: logTime inlongGroupID inlongStreamID auditID auditTag auditVersion String[] keyArray = entry.getKey().split(FIELD_SEPARATORS); - long logTime = Long.parseLong(keyArray[0]) * PERIOD; + if (keyArray.length < 6) { + LOGGER.error("Number of keys {} <6", keyArray.length); + continue; + } + + long logTime; + long auditVersion; + try { + logTime = Long.parseLong(keyArray[0]) * PERIOD; + auditVersion = Long.parseLong(keyArray[5]); + } catch (NumberFormatException numberFormatException) { + LOGGER.error("Failed to parse long from string", numberFormatException); + continue; + } + String inlongGroupID = keyArray[1]; String inlongStreamID = keyArray[2]; String auditID = keyArray[3]; String auditTag = keyArray[4]; - long auditVersion = Long.parseLong(keyArray[5]); StatInfo value = entry.getValue(); AuditApi.AuditMessageBody msgBody = AuditApi.AuditMessageBody.newBuilder() .setLogTs(logTime) @@ -495,6 +515,8 @@ private void send(long isolateKey) { .build(); requestBuild.addMsgBody(msgBody); + auditMetric.addMemorySize(msgBody.toByteArray().length); + if (dataId++ >= BATCH_NUM) { dataId = 0; packageId++; @@ -615,4 +637,8 @@ public void setAutoUpdateAuditProxy() { public void setUpdateInterval(int updateInterval) { ProxyManager.getInstance().setUpdateInterval(updateInterval); } + + public void setMaxGlobalAuditMemory(long maxGlobalAuditMemory) { + SenderManager.setMaxGlobalAuditMemory(maxGlobalAuditMemory); + } } diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditMetric.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditMetric.java index 000642bd640..05ac91a0029 100644 --- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditMetric.java +++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditMetric.java @@ -29,6 +29,7 @@ public class AuditMetric { private Long successPack = 0L; private Long failedPack = 0L; private Long totalMsg = 0L; + private Long memorySize = 0L; public void addSuccessPack(long successPack) { this.successPack += successPack; @@ -42,9 +43,14 @@ public void addTotalMsg(long totalMsg) { this.totalMsg += totalMsg; } + public void addMemorySize(long memorySize) { + this.memorySize += memorySize; + } + public void reset() { successPack = 0L; failedPack = 0L; totalMsg = 0L; + memorySize = 0L; } } diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java index 4da90f9f0d6..f941cbae6de 100644 --- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java +++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java @@ -37,6 +37,7 @@ import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; /** * Audit sender manager @@ -50,6 +51,12 @@ public class SenderManager { private Socket socket = new Socket(); private static final int PACKAGE_HEADER_LEN = 4; private static final int MAX_RESPONSE_LENGTH = 32 * 1024; + private static final AtomicLong globalAuditMemory = new AtomicLong(0); + private static long maxGlobalAuditMemory = 200 * 1024 * 1024; + + public static void setMaxGlobalAuditMemory(long maxGlobalAuditMemory) { + SenderManager.maxGlobalAuditMemory = maxGlobalAuditMemory; + } public SenderManager(AuditConfig config) { auditConfig = config; @@ -179,11 +186,22 @@ public void checkFailedData() { if (failedDataMap.isEmpty()) { checkAuditFile(); } - if (failedDataMap.size() > auditConfig.getMaxCacheRow()) { - LOGGER.info("Failed cache size: {} > {}", failedDataMap.size(), auditConfig.getMaxCacheRow()); + + long failedDataSize = getFailedDataSize(); + globalAuditMemory.addAndGet(failedDataSize); + + if (failedDataMap.size() > auditConfig.getMaxCacheRow() + || globalAuditMemory.get() > maxGlobalAuditMemory) { + LOGGER.warn("Failed cache [size: {}, threshold {}], [count {}, threshold: {}]", + globalAuditMemory.get(), maxGlobalAuditMemory, + failedDataMap.size(), auditConfig.getMaxCacheRow()); + writeLocalFile(); + failedDataMap.clear(); } + + globalAuditMemory.addAndGet(-failedDataSize); } /** @@ -255,10 +273,9 @@ private void checkAuditFile() { .readObject(); for (Map.Entry entry : fileData.entrySet()) { - if (failedDataMap.size() < (auditConfig.getMaxCacheRow() / 2)) { - failedDataMap.putIfAbsent(entry.getKey(), entry.getValue()); + if (!sendData(entry.getValue().getDataByte())) { + LOGGER.error("Local file recovery failed: {}", entry.getValue()); } - sendData(entry.getValue().getDataByte()); sleep(); } } catch (IOException | ClassNotFoundException e) { @@ -294,4 +311,12 @@ private void sleep() { public void setAuditConfig(AuditConfig config) { auditConfig = config; } + + private long getFailedDataSize() { + long dataSize = 0; + for (AuditData auditData : failedDataMap.values()) { + dataSize += auditData.getDataByte().length; + } + return dataSize; + } }