Skip to content

Commit

Permalink
feat: support correlation temporal
Browse files Browse the repository at this point in the history
  • Loading branch information
fukusuket committed Dec 18, 2024
1 parent 2bc3dcd commit 06693b3
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 27 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ yaml-rust2 = "0.9"
rust-embed={version = "8.5.0", features = ["include-exclude", "debug-embed"]}
encoding_rs = "0.8.35"
walkdir = "2.5.0"
uuid = { version = "1.11.0", features = ["v4"] }

[profile.dev]
debug-assertions = false
Expand Down
66 changes: 60 additions & 6 deletions src/detections/detection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
extern crate csv;

use chrono::{TimeZone, Utc};
use chrono::{Duration, TimeZone, Utc};
use compact_str::CompactString;
use hashbrown::HashMap;
use itertools::Itertools;
Expand All @@ -21,8 +21,8 @@ use crate::detections::configs::STORED_EKEY_ALIAS;
use crate::detections::field_data_map::FieldDataMapKey;
use crate::detections::message::{AlertMessage, DetectInfo, ERROR_LOG_STACK, TAGS_CONFIG};
use crate::detections::rule::correlation_parser::parse_correlation_rules;
use crate::detections::rule::count::AggRecordTimeInfo;
use crate::detections::rule::{self, AggResult, RuleNode};
use crate::detections::rule::count::{get_sec_timeframe, AggRecordTimeInfo};
use crate::detections::rule::{self, AggResult, CorrelationType, RuleNode};
use crate::detections::utils::{
create_recordinfos, format_time, get_writable_color, write_color_buffer,
};
Expand Down Expand Up @@ -197,18 +197,72 @@ impl Detection {
rt.block_on(self.add_aggcondition_msg(stored_static))
}

fn detect_within_timeframe(
ids: &[String],
data: &HashMap<String, Vec<AggResult>>,
timeframe: Duration,
) -> Vec<AggResult> {
let mut result = Vec::new();
let key = ids[0].clone();
for y in data.get(key.as_str()).unwrap() {
let mut found = true;
for id in ids.iter().skip(1) {
if !data.get(id.as_str()).unwrap().iter().any(|t| {
(t.start_timedate >= y.start_timedate - timeframe)
&& (t.start_timedate <= y.start_timedate + timeframe)
}) {
found = false;
break;
}
}
if found {
result.push(y.clone());
}
}
result
}

async fn add_aggcondition_msg(&self, stored_static: &StoredStatic) -> Vec<DetectInfo> {
let mut ret = vec![];
let mut detected_temporal_refs: HashMap<String, Vec<AggResult>> = HashMap::new();
for rule in &self.rules {
if !rule.has_agg_condition() {
continue;
}

for value in rule.judge_satisfy_aggcondition(stored_static) {
ret.push(Detection::create_agg_log_record(rule, value, stored_static));
if let CorrelationType::TemporalRef(_, uuid) = &rule.correlation_type {
detected_temporal_refs
.entry(uuid.clone())
.or_insert_with(Vec::new)
.push(value.clone());
} else {
ret.push(Detection::create_agg_log_record(rule, value, stored_static));
}
}
}
// temporalルールは個々ルールの判定がすべて出揃ってから判定できるため、再度rulesをループしてtemporalルールの判定を行う
for rule in self.rules.iter() {
if let CorrelationType::Temporal(ref_ids) = &rule.correlation_type {
if ref_ids
.iter()
.all(|x| detected_temporal_refs.contains_key(x))
{
let mut data = HashMap::new();
for id in ref_ids {
let entry = detected_temporal_refs.get_key_value(id);
data.insert(entry.unwrap().0.clone(), entry.unwrap().1.clone());
}
let timeframe = get_sec_timeframe(rule, stored_static);
if let Some(timeframe) = timeframe {
let duration = Duration::seconds(timeframe);
let values = Detection::detect_within_timeframe(ref_ids, &data, duration);
for v in values {
ret.push(Detection::create_agg_log_record(rule, v, stored_static));
}
}
}
}
}

ret
}

Expand Down
123 changes: 111 additions & 12 deletions src/detections/rule/correlation_parser.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
use std::error::Error;
use std::sync::Arc;

use hashbrown::HashMap;
use yaml_rust2::yaml::Hash;
use yaml_rust2::Yaml;

use crate::detections::configs::StoredStatic;
use crate::detections::message::{AlertMessage, ERROR_LOG_STACK};
use crate::detections::rule::aggregation_parser::{
AggregationConditionToken, AggregationParseInfo,
};
use crate::detections::rule::count::TimeFrameInfo;
use crate::detections::rule::selectionnodes::{OrSelectionNode, SelectionNode};
use crate::detections::rule::{DetectionNode, RuleNode};
use crate::detections::rule::{CorrelationType, DetectionNode, RuleNode};
use hashbrown::{HashMap, HashSet};
use uuid::Uuid;
use yaml_rust2::yaml::Hash;
use yaml_rust2::Yaml;

type Name2Selection = HashMap<String, Arc<Box<dyn SelectionNode>>>;

Expand Down Expand Up @@ -202,7 +202,7 @@ fn create_detection(
None => Err("Failed to get 'timespan'".into()),
Some(timespan) => {
let time_frame = parse_tframe(timespan.to_string())?;
let nodes = to_or_selection_node(related_rule_nodes);
let node = to_or_selection_node(related_rule_nodes);
let agg_info = AggregationParseInfo {
_field_name: condition.2,
_by_field_name: group_by,
Expand All @@ -211,7 +211,7 @@ fn create_detection(
};
Ok(DetectionNode::new_with_data(
name_to_selection,
Some(Box::new(nodes)),
Some(Box::new(node)),
Some(agg_info),
Some(time_frame),
))
Expand Down Expand Up @@ -248,8 +248,11 @@ fn merge_referenced_rule(
parse_error_count: &mut u128,
) -> RuleNode {
let rule_type = rule.yaml["correlation"]["type"].as_str();
if rule_type != Some("event_count") && rule_type != Some("value_count") {
let m = "The type of correlation rule only supports event_count/value_count.";
if rule_type != Some("event_count")
&& rule_type != Some("value_count")
&& rule_type != Some("temporal")
{
let m = "The type of correlation rule only supports event_count/value_count/temporal.";
error_log(&rule.rulepath, m, stored_static, parse_error_count);
return rule;
}
Expand All @@ -266,6 +269,19 @@ fn merge_referenced_rule(
error_log(&rule.rulepath, m, stored_static, parse_error_count);
return rule;
}
if rule.yaml["correlation"]["timespan"].as_str().is_none() {
let m = "key timespan not found.";
error_log(&rule.rulepath, m, stored_static, parse_error_count);
return rule;
}
if rule.yaml["correlation"]["group-by"].as_vec().is_none() {
let m = "key group-by not found.";
error_log(&rule.rulepath, m, stored_static, parse_error_count);
return rule;
}
if rule_type == Some("temporal") {
return rule;
}
let (referenced_rules, name_to_selection) =
match create_related_rule_nodes(&referenced_ids, other_rules, stored_static) {
Ok(result) => result,
Expand Down Expand Up @@ -316,6 +332,83 @@ fn merge_referenced_rule(
RuleNode::new_with_detection(rule.rulepath, Yaml::Hash(merged_yaml), detection)
}

fn parse_temporal_rules(
temporal_rules: Vec<RuleNode>,
other_rules: &mut Vec<RuleNode>,
stored_static: &StoredStatic,
) -> Vec<RuleNode> {
let mut parsed_temporal_rules: Vec<RuleNode> = Vec::new();
let mut temporal_ref_rules: Vec<RuleNode> = Vec::new();
let mut referenced_del_ids: HashSet<String> = HashSet::new();
for temporal in temporal_rules.iter() {
let temporal_yaml = &temporal.yaml;
let mut temporal_ref_ids: Vec<Yaml> = Vec::new();
if let Some(ref_ids) = temporal_yaml["correlation"]["rules"].as_vec() {
for ref_id in ref_ids {
for other_rule in other_rules.iter() {
if is_referenced_rule(other_rule, ref_id.as_str().unwrap_or_default()) {
let new_id = Uuid::new_v4();
temporal_ref_ids.push(Yaml::String(new_id.to_string()));
let mut new_yaml = other_rule.yaml.clone();
new_yaml["id"] = Yaml::String(new_id.to_string());
let generate = temporal_yaml["correlation"]["generate"]
.as_bool()
.unwrap_or_default();
if !generate {
referenced_del_ids
.insert(ref_id.as_str().unwrap_or_default().to_string());
}
let mut node = RuleNode::new(other_rule.rulepath.clone(), new_yaml);
let _ = node.init(stored_static);
node.correlation_type =
CorrelationType::TemporalRef(generate, new_id.to_string());
let group_by = get_group_by_from_yaml(&temporal.yaml);
let timespan = &temporal.yaml["correlation"]["timespan"].as_str().unwrap();
let time_frame = parse_tframe(timespan.to_string());
let agg_info = AggregationParseInfo {
_field_name: None,
_by_field_name: group_by.unwrap(),
_cmp_op: AggregationConditionToken::GE,
_cmp_num: 1,
};
let mut detection = DetectionNode::new();
detection.name_to_selection = node.detection.name_to_selection;
detection.condition = node.detection.condition;
detection.timeframe = Some(time_frame.unwrap());
detection.aggregation_condition = Some(agg_info);
node.detection = detection;
temporal_ref_rules.push(node);
}
}
}
let mut new_yaml = temporal_yaml.clone();
new_yaml["correlation"]["rules"] = Yaml::Array(temporal_ref_ids);
let mut node = RuleNode::new(temporal.rulepath.clone(), new_yaml);
let group_by = get_group_by_from_yaml(&temporal.yaml);
let timespan = &temporal.yaml["correlation"]["timespan"].as_str().unwrap();
let time_frame = parse_tframe(timespan.to_string());
node.detection.aggregation_condition = Some(AggregationParseInfo {
_field_name: None,
_by_field_name: group_by.unwrap(),
_cmp_op: AggregationConditionToken::GE,
_cmp_num: 1,
});
node.detection.timeframe = Some(time_frame.unwrap());
parsed_temporal_rules.push(node);
}
}
other_rules.retain(|rule| {
let id = rule.yaml["id"].as_str().unwrap_or_default();
let title = rule.yaml["title"].as_str().unwrap_or_default();
let name = rule.yaml["name"].as_str().unwrap_or_default();
!referenced_del_ids.contains(id)
&& !referenced_del_ids.contains(title)
&& !referenced_del_ids.contains(name)
});
other_rules.extend(temporal_ref_rules);
parsed_temporal_rules
}

pub fn parse_correlation_rules(
rule_nodes: Vec<RuleNode>,
stored_static: &StoredStatic,
Expand All @@ -324,7 +417,10 @@ pub fn parse_correlation_rules(
let (correlation_rules, mut not_correlation_rules): (Vec<RuleNode>, Vec<RuleNode>) = rule_nodes
.into_iter()
.partition(|rule_node| !rule_node.yaml["correlation"].is_badvalue());
let mut parsed_rules: Vec<RuleNode> = correlation_rules
let (temporal_rules, not_temporal_rules): (Vec<RuleNode>, Vec<RuleNode>) = correlation_rules
.into_iter()
.partition(|rule_node| rule_node.yaml["correlation"]["type"].as_str() == Some("temporal"));
let mut correlation_parsed_rules: Vec<RuleNode> = not_temporal_rules
.into_iter()
.map(|correlation_rule_node| {
merge_referenced_rule(
Expand All @@ -335,8 +431,11 @@ pub fn parse_correlation_rules(
)
})
.collect();
parsed_rules.extend(not_correlation_rules);
parsed_rules
let parsed_temporal_rules =
parse_temporal_rules(temporal_rules, &mut not_correlation_rules, stored_static);
correlation_parsed_rules.extend(not_correlation_rules);
correlation_parsed_rules.extend(parsed_temporal_rules);
correlation_parsed_rules
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit 06693b3

Please sign in to comment.