Skip to content

Commit

Permalink
[INLONG-10728][Audit] Add global memory control for the Audit SDK (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
doleyzi authored Jul 30, 2024
1 parent d0f1688 commit 381cbe5
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -310,33 +310,40 @@ public synchronized void flush(long isolateKey) {
}
long startTime = System.currentTimeMillis();
LOGGER.info("Audit flush isolate key {} ", isolateKey);
manager.checkFailedData();
resetStat();

summaryExpiredStatMap(isolateKey);

Iterator<Map.Entry<Long, ConcurrentHashMap<String, StatInfo>>> iterator = this.preStatMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Long, ConcurrentHashMap<String, StatInfo>> entry = iterator.next();
if (entry.getValue().isEmpty()) {
LOGGER.info("Remove the key of pre stat map: {},isolate key: {} ", entry.getKey(), isolateKey);
iterator.remove();
continue;
}
if (entry.getKey() > isolateKey) {
continue;
try {
manager.checkFailedData();
resetStat();

summaryExpiredStatMap(isolateKey);

Iterator<Map.Entry<Long, ConcurrentHashMap<String, StatInfo>>> iterator =
this.preStatMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Long, ConcurrentHashMap<String, StatInfo>> entry = iterator.next();
if (entry.getValue().isEmpty()) {
LOGGER.info("Remove the key of pre stat map: {},isolate key: {} ", entry.getKey(), isolateKey);
iterator.remove();
continue;
}
if (entry.getKey() > isolateKey) {
continue;
}
summaryPreStatMap(entry.getKey(), entry.getValue());
send(entry.getKey());
}
summaryPreStatMap(entry.getKey(), entry.getValue());
send(entry.getKey());

clearExpiredKey(isolateKey);
} catch (Exception exception) {
LOGGER.error("Flush audit has exception!", exception);
} finally {
manager.closeSocket();
}

clearExpiredKey(isolateKey);

manager.closeSocket();

LOGGER.info("Success report {} package, Failed report {} package, total {} message, cost: {} ms",
LOGGER.info(
"Success report {} package, Failed report {} package, total {} message, memory size {}, cost: {} ms",
auditMetric.getSuccessPack(), auditMetric.getFailedPack(), auditMetric.getTotalMsg(),
auditMetric.getMemorySize(),
System.currentTimeMillis() - startTime);

auditMetric.reset();
Expand Down Expand Up @@ -475,12 +482,25 @@ private void send(long isolateKey) {
for (Map.Entry<String, StatInfo> entry : summaryStatMap.get(isolateKey).entrySet()) {
// Entry key order: logTime inlongGroupID inlongStreamID auditID auditTag auditVersion
String[] keyArray = entry.getKey().split(FIELD_SEPARATORS);
long logTime = Long.parseLong(keyArray[0]) * PERIOD;
if (keyArray.length < 6) {
LOGGER.error("Number of keys {} <6", keyArray.length);
continue;
}

long logTime;
long auditVersion;
try {
logTime = Long.parseLong(keyArray[0]) * PERIOD;
auditVersion = Long.parseLong(keyArray[5]);
} catch (NumberFormatException numberFormatException) {
LOGGER.error("Failed to parse long from string", numberFormatException);
continue;
}

String inlongGroupID = keyArray[1];
String inlongStreamID = keyArray[2];
String auditID = keyArray[3];
String auditTag = keyArray[4];
long auditVersion = Long.parseLong(keyArray[5]);
StatInfo value = entry.getValue();
AuditApi.AuditMessageBody msgBody = AuditApi.AuditMessageBody.newBuilder()
.setLogTs(logTime)
Expand All @@ -495,6 +515,8 @@ private void send(long isolateKey) {
.build();
requestBuild.addMsgBody(msgBody);

auditMetric.addMemorySize(msgBody.toByteArray().length);

if (dataId++ >= BATCH_NUM) {
dataId = 0;
packageId++;
Expand Down Expand Up @@ -615,4 +637,8 @@ public void setAutoUpdateAuditProxy() {
public void setUpdateInterval(int updateInterval) {
ProxyManager.getInstance().setUpdateInterval(updateInterval);
}

public void setMaxGlobalAuditMemory(long maxGlobalAuditMemory) {
SenderManager.setMaxGlobalAuditMemory(maxGlobalAuditMemory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class AuditMetric {
private Long successPack = 0L;
private Long failedPack = 0L;
private Long totalMsg = 0L;
private Long memorySize = 0L;

public void addSuccessPack(long successPack) {
this.successPack += successPack;
Expand All @@ -42,9 +43,14 @@ public void addTotalMsg(long totalMsg) {
this.totalMsg += totalMsg;
}

public void addMemorySize(long memorySize) {
this.memorySize += memorySize;
}

public void reset() {
successPack = 0L;
failedPack = 0L;
totalMsg = 0L;
memorySize = 0L;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/**
* Audit sender manager
Expand All @@ -50,6 +51,12 @@ public class SenderManager {
private Socket socket = new Socket();
private static final int PACKAGE_HEADER_LEN = 4;
private static final int MAX_RESPONSE_LENGTH = 32 * 1024;
private static final AtomicLong globalAuditMemory = new AtomicLong(0);
private static long maxGlobalAuditMemory = 200 * 1024 * 1024;

public static void setMaxGlobalAuditMemory(long maxGlobalAuditMemory) {
SenderManager.maxGlobalAuditMemory = maxGlobalAuditMemory;
}

public SenderManager(AuditConfig config) {
auditConfig = config;
Expand Down Expand Up @@ -179,11 +186,22 @@ public void checkFailedData() {
if (failedDataMap.isEmpty()) {
checkAuditFile();
}
if (failedDataMap.size() > auditConfig.getMaxCacheRow()) {
LOGGER.info("Failed cache size: {} > {}", failedDataMap.size(), auditConfig.getMaxCacheRow());

long failedDataSize = getFailedDataSize();
globalAuditMemory.addAndGet(failedDataSize);

if (failedDataMap.size() > auditConfig.getMaxCacheRow()
|| globalAuditMemory.get() > maxGlobalAuditMemory) {
LOGGER.warn("Failed cache [size: {}, threshold {}], [count {}, threshold: {}]",
globalAuditMemory.get(), maxGlobalAuditMemory,
failedDataMap.size(), auditConfig.getMaxCacheRow());

writeLocalFile();

failedDataMap.clear();
}

globalAuditMemory.addAndGet(-failedDataSize);
}

/**
Expand Down Expand Up @@ -255,10 +273,9 @@ private void checkAuditFile() {
.readObject();

for (Map.Entry<Long, AuditData> entry : fileData.entrySet()) {
if (failedDataMap.size() < (auditConfig.getMaxCacheRow() / 2)) {
failedDataMap.putIfAbsent(entry.getKey(), entry.getValue());
if (!sendData(entry.getValue().getDataByte())) {
LOGGER.error("Local file recovery failed: {}", entry.getValue());
}
sendData(entry.getValue().getDataByte());
sleep();
}
} catch (IOException | ClassNotFoundException e) {
Expand Down Expand Up @@ -294,4 +311,12 @@ private void sleep() {
public void setAuditConfig(AuditConfig config) {
auditConfig = config;
}

private long getFailedDataSize() {
long dataSize = 0;
for (AuditData auditData : failedDataMap.values()) {
dataSize += auditData.getDataByte().length;
}
return dataSize;
}
}

0 comments on commit 381cbe5

Please sign in to comment.