Skip to content

Commit

Permalink
[INLONG-9948][Agent] Optimize the instance class to decrease the comp…
Browse files Browse the repository at this point in the history
…lexity of usage (apache#9952)

Co-authored-by: AloysZhang <[email protected]>
  • Loading branch information
justinwwhuang and aloyszhang authored Apr 10, 2024
1 parent 98c8491 commit ec6ee22
Show file tree
Hide file tree
Showing 4 changed files with 214 additions and 495 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.agent.plugin.instance;

import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.core.instance.ActionType;
import org.apache.inlong.agent.core.instance.InstanceAction;
import org.apache.inlong.agent.core.instance.InstanceManager;
import org.apache.inlong.agent.core.task.OffsetManager;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Instance;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.file.Sink;
import org.apache.inlong.agent.plugin.file.Source;
import org.apache.inlong.agent.state.State;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.enums.InstanceStateEnum;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
* common instance contains source and sink.
* main job is to read from source and write to sink
*/
public abstract class CommonInstance extends Instance {

private static final Logger LOGGER = LoggerFactory.getLogger(CommonInstance.class);
public static final int HEARTBEAT_CHECK_GAP = 10;
private Source source;
private Sink sink;
private InstanceProfile profile;
public static final int CORE_THREAD_SLEEP_TIME = 10;
private static final int DESTROY_LOOP_WAIT_TIME_MS = 10;
private static final int CHECK_FINISH_AT_LEAST_COUNT = 5;
private final int WRITE_FAILED_WAIT_TIME_MS = 10;
private InstanceManager instanceManager;
private volatile boolean running = false;
private volatile boolean inited = false;
private volatile int checkFinishCount = 0;
private int heartbeatcheckCount = 0;
private long heartBeatStartTime = AgentUtils.getCurrentTime();

@Override
public boolean init(Object srcManager, InstanceProfile srcProfile) {
try {
instanceManager = (InstanceManager) srcManager;
profile = srcProfile;
setInodeInfo(profile);
LOGGER.info("task id: {} submit new instance {} profile detail {}.", profile.getTaskId(),
profile.getInstanceId(), profile.toJsonStr());
source = (Source) Class.forName(profile.getSourceClass()).newInstance();
source.init(profile);
sink = (Sink) Class.forName(profile.getSinkClass()).newInstance();
sink.init(profile);
inited = true;
return true;
} catch (Throwable e) {
handleSourceDeleted();
doChangeState(State.FATAL);
LOGGER.error("init instance {} for task {} failed", profile.getInstanceId(), profile.getInstanceId(), e);
ThreadUtils.threadThrowableHandler(Thread.currentThread(), e);
return false;
}
}

/**
* @throws IOException
*/
public abstract void setInodeInfo(InstanceProfile profile) throws IOException;

@Override
public void destroy() {
if (!inited) {
return;
}
doChangeState(State.SUCCEEDED);
while (running) {
AgentUtils.silenceSleepInMs(DESTROY_LOOP_WAIT_TIME_MS);
}
this.source.destroy();
this.sink.destroy();
}

@Override
public void run() {
Thread.currentThread().setName("file-instance-core-" + getTaskId() + "-" + getInstanceId());
running = true;
try {
doRun();
} catch (Throwable e) {
LOGGER.error("do run error: ", e);
}
running = false;
}

private void doRun() {
while (!isFinished()) {
if (!source.sourceExist()) {
handleSourceDeleted();
break;
}
Message msg = source.read();
if (msg == null) {
if (source.sourceFinish() && sink.sinkFinish()) {
checkFinishCount++;
if (checkFinishCount > CHECK_FINISH_AT_LEAST_COUNT) {
handleReadEnd();
break;
}
} else {
checkFinishCount = 0;
}
heartbeatStatic();
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
} else {
boolean suc = false;
while (!isFinished() && !suc) {
suc = sink.write(msg);
if (!suc) {
heartbeatStatic();
AgentUtils.silenceSleepInMs(WRITE_FAILED_WAIT_TIME_MS);
}
}
heartbeatcheckCount++;
if (heartbeatcheckCount > HEARTBEAT_CHECK_GAP) {
heartbeatStatic();
}
}
}
}

private void heartbeatStatic() {
if (AgentUtils.getCurrentTime() - heartBeatStartTime > TimeUnit.SECONDS.toMillis(1)) {
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT, profile.getInlongGroupId(),
profile.getInlongStreamId(), AgentUtils.getCurrentTime(), 1, 1);
heartbeatcheckCount = 0;
heartBeatStartTime = AgentUtils.getCurrentTime();
}
}

private void handleReadEnd() {
while (!isFinished() && !instanceManager.submitAction(new InstanceAction(ActionType.FINISH, profile))) {
LOGGER.error("instance manager action queue is full: taskId {}",
instanceManager.getTaskId());
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
}
}

private void handleSourceDeleted() {
OffsetManager.getInstance().deleteOffset(getTaskId(), getInstanceId());
profile.setState(InstanceStateEnum.DELETE);
profile.setModifyTime(AgentUtils.getCurrentTime());
InstanceAction action = new InstanceAction(ActionType.DELETE, profile);
while (!isFinished() && !instanceManager.submitAction(action)) {
LOGGER.error("instance manager action queue is full: taskId {}",
instanceManager.getTaskId());
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
}
}

@Override
public void addCallbacks() {

}

@Override
public String getTaskId() {
return profile.getTaskId();
}

@Override
public String getInstanceId() {
return profile.getInstanceId();
}

public Sink getSink() {
return sink;
}

public InstanceProfile getProfile() {
return profile;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,184 +19,18 @@

import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.core.instance.ActionType;
import org.apache.inlong.agent.core.instance.InstanceAction;
import org.apache.inlong.agent.core.instance.InstanceManager;
import org.apache.inlong.agent.core.task.OffsetManager;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Instance;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.file.Sink;
import org.apache.inlong.agent.plugin.file.Source;
import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
import org.apache.inlong.agent.state.State;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.enums.InstanceStateEnum;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;
import java.io.IOException;

/**
* file instance contains source and sink.
* main job is to read from source and write to sink
*/
public class FileInstance extends Instance {

private static final Logger LOGGER = LoggerFactory.getLogger(FileInstance.class);
public static final int HEARTBEAT_CHECK_GAP = 10;
private Source source;
private Sink sink;
private InstanceProfile profile;
public static final int CORE_THREAD_SLEEP_TIME = 10;
private static final int DESTROY_LOOP_WAIT_TIME_MS = 10;
private static final int CHECK_FINISH_AT_LEAST_COUNT = 5;
private final int WRITE_FAILED_WAIT_TIME_MS = 10;
private InstanceManager instanceManager;
private volatile boolean running = false;
private volatile boolean inited = false;
private volatile int checkFinishCount = 0;
private int heartbeatcheckCount = 0;
private long heartBeatStartTime = AgentUtils.getCurrentTime();

@Override
public boolean init(Object srcManager, InstanceProfile srcProfile) {
try {
instanceManager = (InstanceManager) srcManager;
profile = srcProfile;
profile.set(TaskConstants.INODE_INFO, FileDataUtils.getInodeInfo(profile.getInstanceId()));
LOGGER.info("task id: {} submit new instance {} profile detail {}.", profile.getTaskId(),
profile.getInstanceId(), profile.toJsonStr());
source = (Source) Class.forName(profile.getSourceClass()).newInstance();
source.init(profile);
sink = (Sink) Class.forName(profile.getSinkClass()).newInstance();
sink.init(profile);
inited = true;
return true;
} catch (Throwable e) {
handleSourceDeleted();
doChangeState(State.FATAL);
LOGGER.error("init instance {} for task {} failed", profile.getInstanceId(), profile.getInstanceId(), e);
ThreadUtils.threadThrowableHandler(Thread.currentThread(), e);
return false;
}
}

@Override
public void destroy() {
if (!inited) {
return;
}
doChangeState(State.SUCCEEDED);
while (running) {
AgentUtils.silenceSleepInMs(DESTROY_LOOP_WAIT_TIME_MS);
}
this.source.destroy();
this.sink.destroy();
}

@Override
public void run() {
Thread.currentThread().setName("file-instance-core-" + getTaskId() + "-" + getInstanceId());
running = true;
try {
doRun();
} catch (Throwable e) {
LOGGER.error("do run error: ", e);
}
running = false;
}

private void doRun() {
while (!isFinished()) {
if (!source.sourceExist()) {
handleSourceDeleted();
break;
}
Message msg = source.read();
if (msg == null) {
if (source.sourceFinish() && sink.sinkFinish()) {
checkFinishCount++;
if (checkFinishCount > CHECK_FINISH_AT_LEAST_COUNT) {
handleReadEnd();
break;
}
} else {
checkFinishCount = 0;
}
heartbeatStatic();
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
} else {
boolean suc = false;
while (!isFinished() && !suc) {
suc = sink.write(msg);
if (!suc) {
heartbeatStatic();
AgentUtils.silenceSleepInMs(WRITE_FAILED_WAIT_TIME_MS);
}
}
heartbeatcheckCount++;
if (heartbeatcheckCount > HEARTBEAT_CHECK_GAP) {
heartbeatStatic();
}
}
}
}

private void heartbeatStatic() {
String inlongGroupId = profile.getInlongGroupId();
String inlongStreamId = profile.getInlongStreamId();
if (AgentUtils.getCurrentTime() - heartBeatStartTime > TimeUnit.SECONDS.toMillis(1)) {
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT, inlongGroupId, inlongStreamId,
AgentUtils.getCurrentTime(), 1, 1);
heartbeatcheckCount = 0;
heartBeatStartTime = AgentUtils.getCurrentTime();
}
}

private void handleReadEnd() {
InstanceAction action = new InstanceAction(ActionType.FINISH, profile);
while (!isFinished() && !instanceManager.submitAction(action)) {
LOGGER.error("instance manager action queue is full: taskId {}",
instanceManager.getTaskId());
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
}
}

private void handleSourceDeleted() {
OffsetManager.getInstance().deleteOffset(getTaskId(), getInstanceId());
profile.setState(InstanceStateEnum.DELETE);
profile.setModifyTime(AgentUtils.getCurrentTime());
InstanceAction action = new InstanceAction(ActionType.DELETE, profile);
while (!isFinished() && !instanceManager.submitAction(action)) {
LOGGER.error("instance manager action queue is full: taskId {}",
instanceManager.getTaskId());
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
}
}

@Override
public void addCallbacks() {

}
public class FileInstance extends CommonInstance {

@Override
public String getTaskId() {
return profile.getTaskId();
}

@Override
public String getInstanceId() {
return profile.getInstanceId();
}

public Sink getSink() {
return sink;
}

public InstanceProfile getProfile() {
return profile;
public void setInodeInfo(InstanceProfile profile) throws IOException {
profile.set(TaskConstants.INODE_INFO, FileDataUtils.getInodeInfo(profile.getInstanceId()));
}
}
Loading

0 comments on commit ec6ee22

Please sign in to comment.