Skip to content
This repository has been archived by the owner on Mar 21, 2023. It is now read-only.

Commit

Permalink
Properly handle empty or missing templates in NetFlow 9 (#17)
Browse files Browse the repository at this point in the history
Fixes #16

(cherry picked from commit f00cf72)
  • Loading branch information
joschi authored and Jochen Schalanda committed Aug 8, 2017
1 parent 6adae4b commit 6b7422f
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public Collection<Message> decodeMessages(@Nonnull RawMessage rawMessage) {
try {
return NetFlowParser.parse(rawMessage, templateCache, typeRegistry);
} catch (FlowException e) {
LOG.error("Error parsing NetFlow packet", e);
LOG.error("Error parsing NetFlow packet <{}> received from <{}>", rawMessage.getId(), rawMessage.getRemoteAddress(), e);
return null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,7 @@
package org.graylog.plugins.netflow.flows;

public class EmptyTemplateException extends FlowException {
public EmptyTemplateException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.netty.buffer.ByteBuf;
import org.graylog.plugins.netflow.flows.EmptyTemplateException;
import org.graylog.plugins.netflow.flows.InvalidFlowVersionException;

import java.util.ArrayList;
Expand All @@ -28,13 +29,13 @@


public class NetFlowV9Parser {
private static AtomicReference<NetFlowV9OptionTemplate> optionTemplateReference = new AtomicReference<>();
private static final AtomicReference<NetFlowV9OptionTemplate> optionTemplateReference = new AtomicReference<>();

public static NetFlowV9Packet parsePacket(ByteBuf bb, NetFlowV9TemplateCache cache, NetFlowV9FieldTypeRegistry typeRegistry) {
final int dataLength = bb.readableBytes();
final NetFlowV9Header header = parseHeader(bb);

final ImmutableList.Builder<NetFlowV9Template> allTemplates = ImmutableList.builder();
final List<NetFlowV9Template> allTemplates = new ArrayList<>();
NetFlowV9OptionTemplate optTemplate = null;
List<NetFlowV9BaseRecord> records = Collections.emptyList();
while (bb.isReadable()) {
Expand All @@ -51,13 +52,16 @@ public static NetFlowV9Packet parsePacket(ByteBuf bb, NetFlowV9TemplateCache cac
optionTemplateReference.set(optTemplate);
} else {
bb.resetReaderIndex();
if (cache.isEmpty()) {
throw new EmptyTemplateException("Unable to parse NetFlow 9 records without template. Discarding packet.");
}
records = parseRecords(bb, cache);
}
}

return NetFlowV9Packet.create(
header,
allTemplates.build(),
allTemplates,
optTemplate,
records,
dataLength);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ public NetFlowV9Template get(int id) {
return cache.getIfPresent(id);
}

public boolean isEmpty() {
return cache.size() == 0L;
}

@Override
public void run() {
if (cache.size() != 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,14 @@ public void decodeMessagesSuccessfullyDecodesNetFlowV9() throws Exception {
.containsEntry("nf_snmp_output", 0);
}

@Test
public void decodeMessagesThrowsEmptyTemplateExceptionWithIncompleteNetFlowV9() throws Exception {
final byte[] b = Resources.toByteArray(Resources.getResource("netflow-data/netflow-v9-3_incomplete.dat"));
final InetSocketAddress source = new InetSocketAddress(InetAddress.getLocalHost(), 12345);

assertThat(codec.decodeMessages(new RawMessage(b, source))).isNull();
}

@Test
public void pcap_softflowd_NetFlowV5() throws Exception {
final List<Message> allMessages = new ArrayList<>();
Expand Down Expand Up @@ -380,6 +388,29 @@ public void pcap_nprobe_NetFlowV9_2() throws Exception {
}
assertThat(allMessages)
.hasSize(6)
.allSatisfy(message -> assertThat(message.getField("nf_version")).isEqualTo(9));
.allSatisfy(message -> assertThat(message.getField("nf_version")).isEqualTo(9));
}

@Test
public void pcap_nprobe_NetFlowV9_4() throws Exception {
final List<Message> allMessages = new ArrayList<>();
try (InputStream inputStream = Resources.getResource("netflow-data/nprobe-netflow9-4.pcap").openStream()) {
final Pcap pcap = Pcap.openStream(inputStream);
pcap.loop(packet -> {
if (packet.hasProtocol(Protocol.UDP)) {
final UDPPacket udp = (UDPPacket) packet.getPacket(Protocol.UDP);
final InetSocketAddress source = new InetSocketAddress(udp.getSourceIP(), udp.getSourcePort());
final Collection<Message> messages = codec.decodeMessages(new RawMessage(udp.getPayload().getArray(), source));
if (messages != null) {
allMessages.addAll(messages);
}
}
return true;
}
);
}
assertThat(allMessages)
.hasSize(1)
.allSatisfy(message -> assertThat(message.getField("nf_version")).isEqualTo(9));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.pkts.Pcap;
import io.pkts.packet.UDPPacket;
import io.pkts.protocol.Protocol;
import org.graylog.plugins.netflow.flows.EmptyTemplateException;
import org.graylog2.shared.bindings.providers.ObjectMapperProvider;
import org.junit.Before;
import org.junit.Rule;
Expand All @@ -38,6 +39,7 @@
import java.util.concurrent.Executors;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

Expand Down Expand Up @@ -124,6 +126,13 @@ public void testParse() throws IOException {
assertEquals(1, p3.records().size());
}

@Test
public void testParseIncomplete() throws Exception {
final byte[] b = Resources.toByteArray(Resources.getResource("netflow-data/netflow-v9-3_incomplete.dat"));
assertThatExceptionOfType(EmptyTemplateException.class)
.isThrownBy(() -> NetFlowV9Parser.parsePacket(Unpooled.wrappedBuffer(b), cache, typeRegistry));
}

@Test
public void pcap_softflowd_NetFlowV9() throws Exception {
final List<NetFlowV9BaseRecord> allRecords = new ArrayList<>();
Expand Down Expand Up @@ -428,10 +437,14 @@ public void pcap_nprobe_NetFlowV9_4() throws Exception {
if (packet.hasProtocol(Protocol.UDP)) {
final UDPPacket udp = (UDPPacket) packet.getPacket(Protocol.UDP);
final ByteBuf byteBuf = Unpooled.wrappedBuffer(udp.getPayload().getArray());
final NetFlowV9Packet netFlowV9Packet = NetFlowV9Parser.parsePacket(byteBuf, cache, typeRegistry);
assertThat(netFlowV9Packet).isNotNull();
allTemplates.addAll(netFlowV9Packet.templates());
allRecords.addAll(netFlowV9Packet.records());
try {
final NetFlowV9Packet netFlowV9Packet = NetFlowV9Parser.parsePacket(byteBuf, cache, typeRegistry);
assertThat(netFlowV9Packet).isNotNull();
allTemplates.addAll(netFlowV9Packet.templates());
allRecords.addAll(netFlowV9Packet.records());
} catch (EmptyTemplateException e) {
// ignore
}
}
return true;
}
Expand Down Expand Up @@ -461,7 +474,7 @@ public void pcap_nprobe_NetFlowV9_4() throws Exception {
).build()
)
);
assertThat(allRecords).hasSize(898);
assertThat(allRecords).hasSize(2);
}

private String name(NetFlowV9FieldDef def) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,31 @@ public void loadsCacheFileOnStart() throws Exception {
assertThat(templateCache.get(0)).isEqualTo(template1);
assertThat(templateCache.get(1)).isEqualTo(template2);
}

@Test
public void isEmptyReturnsFalseForNonEmptyCache() throws Exception {
final byte[] json = ("{" +
"\"0\":{" +
"\"template_id\":0," +
"\"field_count\":1," +
"\"definitions\":[{\"type\":{\"id\":0,\"value_type\":\"UINT64\",\"name\":\"foobar\"},\"length\":8}]" +
"}," +
"\"1\":{" +
"\"template_id\":1," +
"\"field_count\":1," +
"\"definitions\":[{\"type\":{\"id\":0,\"value_type\":\"IPV4\",\"name\":\"covfefe\"},\"length\":4}]}}")
.getBytes(StandardCharsets.UTF_8);
assertThat(Files.write(cachePath, json)).isEqualTo(cachePath);
assertThat(Files.size(cachePath)).isEqualTo(json.length);

final NetFlowV9TemplateCache templateCache = new NetFlowV9TemplateCache(100L, cachePath, 300, executorService, objectMapper);

assertThat(templateCache.isEmpty()).isFalse();
}

@Test
public void isEmptyReturnsTrueForNonEmptyCache() throws Exception {
final NetFlowV9TemplateCache templateCache = new NetFlowV9TemplateCache(100L, cachePath, 300, executorService, objectMapper);
assertThat(templateCache.isEmpty()).isTrue();
}
}
Binary file not shown.

0 comments on commit 6b7422f

Please sign in to comment.