Skip to content

Commit

Permalink
fix issue #358 : detect rule cyclic in DAG (#359)
Browse files Browse the repository at this point in the history
  • Loading branch information
puremilkfan authored and saintping committed Nov 15, 2019
1 parent bd66562 commit c738f53
Show file tree
Hide file tree
Showing 12 changed files with 357 additions and 46 deletions.
Original file line number Diff line number Diff line change
@@ -1,48 +1,45 @@
package com.webank.weevent.governance.controller;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import com.webank.weevent.governance.exception.GovernanceException;
import com.webank.weevent.governance.service.CommonService;

import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.util.EntityUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.bind.annotation.RestController;

@CrossOrigin
@Slf4j
@RestController
public class ForwardController {


@Autowired
private RestTemplate restTemplate;
private CommonService commonService;

@Value("${weevent.url}")
private String url;

@RequestMapping(value = "/weevent/{path1}/{path2}", method = RequestMethod.GET)
public Object forward(@PathVariable(name = "path1") String path1, @PathVariable(name = "path2") String path2) {
public Object forward(HttpServletRequest request, HttpServletResponse response, @PathVariable(name = "path1") String path1, @PathVariable(name = "path2") String path2)throws GovernanceException {
log.info("weevent url: /wevent/ {} \"/\" {}", path1, path2);
String forwarUrl = new StringBuffer(this.url).append("/").append(path1).append("/").append(path2).toString();
Object result = restTemplate.getForEntity(forwarUrl, Object.class).getBody();
return result;
try {
CloseableHttpResponse closeResponse = commonService.getCloseResponse(request, forwarUrl);
return EntityUtils.toString(closeResponse.getEntity());
} catch (Exception e) {
throw new GovernanceException(e.getMessage());
}
}

@RequestMapping(value = "/weevent/admin/deploy_topic_control", method = RequestMethod.GET)
public Object forward() {
log.info("wevent url: /weevent/admin/deploy_topic_control");
String forwarUrl = new StringBuffer(this.url).append("/admin/deploy_topic_control").toString();
String result = restTemplate.getForEntity(forwarUrl, String.class).getBody();
return result;
}

@RequestMapping(value = "/weevent/{path1}/{path2}")
public Object forward(@PathVariable(name = "path1") String path1, @PathVariable(name = "path2") String path2,
@RequestParam String topic) {
log.info("weevent url: /wevent/ {} \"/\" {}", path1, path2);

String forwarUrl = new StringBuffer(this.url).append("/").append(path1).append("/").append(path2).append("?topic=").append(topic).toString();
Object result = restTemplate.getForEntity(forwarUrl, Object.class).getBody();
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@ public interface RuleEngineMapper {
// update RuleEngineEntity
Boolean updateRuleEngine(RuleEngineEntity ruleEngineEntity);

List<RuleEngineEntity> getRuleTopicList(RuleEngineEntity ruleEngineEntity);

Boolean updateRuleEngineStatus(RuleEngineEntity ruleEngineEntity);

int countRuleEngine(RuleEngineEntity ruleEngineEntity);

RuleEngineEntity getRuleById(Integer id);

List<RuleEngineEntity> checkRuleNameRepeat(RuleEngineEntity ruleEngineEntity);

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
Expand Down Expand Up @@ -85,7 +87,6 @@ public CloseableHttpResponse getCloseResponse(HttpServletRequest req, String new
public CloseableHttpResponse getCloseResponse(HttpServletRequest req, String newUrl, String jsonString) throws ServletException {
CloseableHttpResponse closeResponse;
try {

log.info("url {}", newUrl);
CloseableHttpClient client = this.generateHttpClient(newUrl);
if (req.getMethod().equals(METHOD_TYPE)) {
Expand Down Expand Up @@ -276,6 +277,13 @@ private static String truncateUrlPage(String strURL) {
return strAllParam;
}

public Set<String> mergeSet(Set<String> list1, Set<String> list2) {
Set<String> set = new HashSet<>();
set.addAll(list1);
set.addAll(list2);
return set;
}


@Override
public void close() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.net.URLEncoder;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -29,6 +30,7 @@
import com.webank.weevent.governance.mapper.RuleEngineMapper;
import com.webank.weevent.governance.properties.ConstantProperties;
import com.webank.weevent.governance.utils.CookiesTools;
import com.webank.weevent.governance.utils.DAGDetectUtil;
import com.webank.weevent.governance.utils.NumberValidationUtils;

import com.alibaba.fastjson.JSONObject;
Expand Down Expand Up @@ -67,6 +69,10 @@ public class RuleEngineService {
@Autowired
private BrokerMapper brokerMapper;

@Autowired
private DAGDetectUtil dagDetectUtil;


@Value("${weevent.processor.url:http://127.0.0.1:7008}")
private String processorUrl;

Expand Down Expand Up @@ -271,6 +277,10 @@ public boolean updateRuleEngine(RuleEngineEntity ruleEngineEntity, HttpServletRe
if (!flag) {
throw new GovernanceException("conditional is illegal");
}
flag = verifyInfiniteLoop(ruleEngineEntity);
if (!flag) {
throw new GovernanceException("update rule failed, detected DAG loop at topic [" + ruleEngineEntity.getFromDestination() + "]");
}
RuleDatabaseEntity ruleDataBase = getRuleDataBase(ruleEngineEntity.getRuleDataBaseId());
if (ruleDataBase != null) {
ruleEngineEntity.setDatabaseUrl(ruleDataBase.getDatabaseUrl() + "&tableName=" + ruleDataBase.getTableName());
Expand Down Expand Up @@ -661,8 +671,8 @@ private void checkRule(RuleEngineEntity ruleEngineEntity) throws GovernanceExcep
if (ruleEngineEntity.getPayloadMap().isEmpty()) {
throw new GovernanceException("rule description is empty");
}
if (ruleEngineEntity.getPayload() != null && ruleEngineEntity.getPayload().length() > 100) {
throw new GovernanceException("rule description length cannot exceed 100");
if (ruleEngineEntity.getPayload() != null && ruleEngineEntity.getPayload().length() > 4096) {
throw new GovernanceException("rule description length cannot exceed 4096");
}

}
Expand All @@ -677,12 +687,9 @@ private boolean checkRuleName(String ruleName, String regex) {
//check name repeat
private boolean checkRuleNameRepeat(RuleEngineEntity ruleEngineEntity) {
RuleEngineEntity rule = new RuleEngineEntity();
rule.setGroupId(ruleEngineEntity.getGroupId());
rule.setUserId(ruleEngineEntity.getUserId());
rule.setBrokerId(ruleEngineEntity.getBrokerId());
rule.setRuleName(ruleEngineEntity.getRuleName());
rule.setSystemTag("2");
List<RuleEngineEntity> ruleEngines = ruleEngineMapper.getRuleEngines(rule);
List<RuleEngineEntity> ruleEngines = ruleEngineMapper.checkRuleNameRepeat(rule);
if (CollectionUtils.isEmpty(ruleEngines)) {
return true;
}
Expand Down Expand Up @@ -789,4 +796,28 @@ private RuleDatabaseEntity getRuleDataBase(Integer id) {
return ruleDatabaseMapper.getRuleDataBaseById(id);
}

private boolean verifyInfiniteLoop(RuleEngineEntity ruleEngineEntity) {
if (!ConditionTypeEnum.TOPIC.getCode().equals(ruleEngineEntity.getConditionType())) {
return true;
}
RuleEngineEntity rule = new RuleEngineEntity();
rule.setGroupId(ruleEngineEntity.getGroupId());
rule.setBrokerId(ruleEngineEntity.getBrokerId());

//query all historical rules according to brokerId groupId
List<RuleEngineEntity> ruleTopicList = ruleEngineMapper.getRuleTopicList(rule);
if (CollectionUtils.isEmpty(ruleTopicList)) {
return true;
}
ruleTopicList.add(ruleEngineEntity);
Map<String, Set<String>> map = new HashMap<>();

for (RuleEngineEntity engineEntity : ruleTopicList) {
map.merge(engineEntity.getFromDestination(), new HashSet<>(Collections.singleton(engineEntity.getToDestination())), (a, b) -> commonService.mergeSet(a, b));
}
Set<String> set = map.keySet();
return dagDetectUtil.checkLoop(map, set);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import java.io.FileInputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
Expand Down Expand Up @@ -249,8 +248,8 @@ private RuleEngineEntity initializationRule(String ruleName, BrokerEntity broker
return ruleEngineEntity;
}

private List<String> getGroupList(HttpServletRequest request, BrokerEntity brokerEntity) throws GovernanceException {
List<String> groupList;
private List getGroupList(HttpServletRequest request, BrokerEntity brokerEntity) throws GovernanceException {
List groupList;
String url = new StringBuffer(brokerEntity.getBrokerUrl()).append(ConstantProperties.REST_LIST_SUBSCRIPTION).toString();
try {
log.info("url:{}", url);
Expand All @@ -259,8 +258,14 @@ private List<String> getGroupList(HttpServletRequest request, BrokerEntity broke
if (StringUtil.isBlank(mes)) {
throw new GovernanceException("group is empty");
}
String[] split = mes.replace("[", "").replace("]", "").split(",");
groupList = Arrays.asList(split);
JSONObject jsonObject = JSONObject.parseObject(mes);
Object data = jsonObject.get("data");
if ("0".equals(jsonObject.get("code").toString()) && data instanceof List) {
groupList = (List) data;
} else {
throw new GovernanceException(jsonObject.get("message").toString());
}

return groupList;
} catch (Exception e) {
log.error("get group list fail", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.webank.weevent.governance.utils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Stack;

import org.springframework.stereotype.Component;

@SuppressWarnings("unchecked")
@Component
public class DAGDetectUtil {
private Stack<String> stack = new Stack<>();

public boolean checkLoop(Map<String, Set<String>> map, Set<String> topicSet) {
Map start = createNode("start");
Map<String, Map> nodeMap = new HashMap<>();
topicSet.forEach(it -> {
Map node = createNode(it);
((List) start.get("child")).add(node);
nodeMap.put(it, node);
});
map.forEach((k, v) -> {
v.forEach(it -> {
if (nodeMap.containsKey(it)) {
((List) nodeMap.get(k).get("child")).add(nodeMap.get(it));
}
});
});
return checkChild(start);
}

private Map createNode(String name) {
HashMap node = new HashMap();
node.put("topic", name);
node.put("child", new ArrayList());
return node;
}


private boolean checkChild(Map cursor) {
if (stack.contains(cursor.get("topic"))) {
stack = new Stack<>();
return false;
}
stack.push((String) cursor.get("topic"));
List childs = (List) cursor.get("child");
if (childs != null) {
for (Object child : childs) {
if (!checkChild((Map) child)) {
return false;
}
}
}
stack.pop();
return true;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@ logging.config=classpath:log4j2.xml
spring.pid.file=./logs/governance.pid
spring.pid.fail-on-write-error=true

weevent.processor.url=http://localhost:7008
weevent.processor.url=http://127.0.0.1:8080

weevent.url=http://127.0.0.1:8080/weevent
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,5 @@ logging.config=classpath:log4j2.xml
spring.pid.file=./logs/governance.pid
spring.pid.fail-on-write-error=true
weevent.processor.url=http://127.0.0.1:7008

weevent.url=http://127.0.0.1:8080/weevent
29 changes: 22 additions & 7 deletions weevent-governance/src/main/resources/mappers/RuleEngineMapper.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
<result column="payload" property="payload" jdbcType="VARCHAR"/>
<result column="user_id" property="userId" jdbcType="INTEGER"/>
<result column="broker_id" property="brokerId" jdbcType="INTEGER"/>
<result column="cep_id" property="cepId" jdbcType="VARCHAR"/>
<result column="broker_url" property="brokerUrl" jdbcType="VARCHAR"/>
<result column="from_destination" property="fromDestination" jdbcType="VARCHAR"/>
<result column="to_destination" property="toDestination" jdbcType="VARCHAR"/>
Expand All @@ -24,23 +23,24 @@
<result column="error_message" property="errorMessage" jdbcType="VARCHAR"/>
<result column="status" property="status" jdbcType="INTEGER"/>
<result column="group_id" property="groupId" jdbcType="VARCHAR"/>
<result column="cep_id" property="cepId" jdbcType="VARCHAR"/>
<result column="system_tag" property="systemTag" jdbcType="VARCHAR"/>
</resultMap>

<sql id="Base_Column_List">
id,create_date,last_update, rule_name,payload_type,payload,broker_id,cep_id,broker_url,user_id,
id,create_date,last_update, rule_name,payload_type,payload,broker_id,broker_url,user_id,cep_id,
from_destination,to_destination,select_field,condition_field,condition_type,
rule_database_id,error_destination,error_message,status,group_id,system_tag
</sql>

<insert id="addRuleEngine" parameterType="com.webank.weevent.governance.entity.RuleEngineEntity"
useGeneratedKeys="true" keyProperty="id">
insert into t_rule_engine(rule_name, payload_type, payload, broker_id, broker_url, user_id, cep_id,
insert into t_rule_engine(rule_name, payload_type, payload, broker_id, broker_url, user_id,
from_destination, to_destination, select_field,
condition_field, condition_type, status, group_id, error_message,rule_database_id,system_tag)
values (#{ruleName}, #{payloadType}, #{payload}, #{brokerId}, #{brokerUrl}, #{userId}, #{cepId},
condition_field, condition_type, status, group_id, error_message,rule_database_id,system_tag,cep_id)
values (#{ruleName}, #{payloadType}, #{payload}, #{brokerId}, #{brokerUrl}, #{userId},
#{fromDestination}, #{toDestination}, #{selectField},
#{conditionField}, #{conditionType}, #{status}, #{groupId}, #{errorMessage},#{ruleDataBaseId},#{systemTag})
#{conditionField}, #{conditionType}, #{status}, #{groupId}, #{errorMessage},#{ruleDataBaseId},#{systemTag},#{cepId})
</insert>

<select id="getRuleEngines" parameterType="com.webank.weevent.governance.entity.RuleEngineEntity"
Expand Down Expand Up @@ -100,6 +100,20 @@
</if>
</sql>

<select id="getRuleTopicList" resultMap="BaseResultMap">
select distinct from_destination,to_destination
from t_rule_engine
where status != 2 and condition_type=1
<include refid="ruleEngineWhere"/>
</select>

<select id="checkRuleNameRepeat" parameterType="com.webank.weevent.governance.entity.RuleEngineEntity"
resultMap="BaseResultMap">
select
<include refid="Base_Column_List"/>
from t_rule_engine where rule_name=#{ruleName}
</select>

<update id="updateRuleEngine" parameterType="com.webank.weevent.governance.entity.RuleEngineEntity">
update t_rule_engine
set rule_name=#{ruleName},
Expand All @@ -111,7 +125,8 @@
to_destination=#{toDestination},
rule_database_id=#{ruleDataBaseId},
error_destination=#{errorDestination},
error_message=#{errorMessage}
error_message=#{errorMessage},
cep_id=#{cepId}
where id = #{id,jdbcType=INTEGER}
and status != 2
</update>
Expand Down
Loading

0 comments on commit c738f53

Please sign in to comment.