diff --git a/src/main/java/org/graylog/plugins/netflow/codecs/NetFlowCodec.java b/src/main/java/org/graylog/plugins/netflow/codecs/NetFlowCodec.java index 297103e..22bf1e6 100644 --- a/src/main/java/org/graylog/plugins/netflow/codecs/NetFlowCodec.java +++ b/src/main/java/org/graylog/plugins/netflow/codecs/NetFlowCodec.java @@ -106,7 +106,7 @@ public Collection decodeMessages(@Nonnull RawMessage rawMessage) { try { return NetFlowParser.parse(rawMessage, templateCache, typeRegistry); } catch (FlowException e) { - LOG.error("Error parsing NetFlow packet", e); + LOG.error("Error parsing NetFlow packet <{}> received from <{}>", rawMessage.getId(), rawMessage.getRemoteAddress(), e); return null; } } diff --git a/src/main/java/org/graylog/plugins/netflow/flows/EmptyTemplateException.java b/src/main/java/org/graylog/plugins/netflow/flows/EmptyTemplateException.java index bf8a33b..7d193ee 100644 --- a/src/main/java/org/graylog/plugins/netflow/flows/EmptyTemplateException.java +++ b/src/main/java/org/graylog/plugins/netflow/flows/EmptyTemplateException.java @@ -22,4 +22,7 @@ package org.graylog.plugins.netflow.flows; public class EmptyTemplateException extends FlowException { + public EmptyTemplateException(String message) { + super(message); + } } diff --git a/src/main/java/org/graylog/plugins/netflow/v9/NetFlowV9Parser.java b/src/main/java/org/graylog/plugins/netflow/v9/NetFlowV9Parser.java index a51661c..d9a0961 100644 --- a/src/main/java/org/graylog/plugins/netflow/v9/NetFlowV9Parser.java +++ b/src/main/java/org/graylog/plugins/netflow/v9/NetFlowV9Parser.java @@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.netty.buffer.ByteBuf; +import org.graylog.plugins.netflow.flows.EmptyTemplateException; import org.graylog.plugins.netflow.flows.InvalidFlowVersionException; import java.util.ArrayList; @@ -28,13 +29,13 @@ public class NetFlowV9Parser { - private static AtomicReference optionTemplateReference = new AtomicReference<>(); + private static final AtomicReference optionTemplateReference = new AtomicReference<>(); public static NetFlowV9Packet parsePacket(ByteBuf bb, NetFlowV9TemplateCache cache, NetFlowV9FieldTypeRegistry typeRegistry) { final int dataLength = bb.readableBytes(); final NetFlowV9Header header = parseHeader(bb); - final ImmutableList.Builder allTemplates = ImmutableList.builder(); + final List allTemplates = new ArrayList<>(); NetFlowV9OptionTemplate optTemplate = null; List records = Collections.emptyList(); while (bb.isReadable()) { @@ -51,13 +52,16 @@ public static NetFlowV9Packet parsePacket(ByteBuf bb, NetFlowV9TemplateCache cac optionTemplateReference.set(optTemplate); } else { bb.resetReaderIndex(); + if (cache.isEmpty()) { + throw new EmptyTemplateException("Unable to parse NetFlow 9 records without template. Discarding packet."); + } records = parseRecords(bb, cache); } } return NetFlowV9Packet.create( header, - allTemplates.build(), + allTemplates, optTemplate, records, dataLength); diff --git a/src/main/java/org/graylog/plugins/netflow/v9/NetFlowV9TemplateCache.java b/src/main/java/org/graylog/plugins/netflow/v9/NetFlowV9TemplateCache.java index d5dea78..35f03f6 100644 --- a/src/main/java/org/graylog/plugins/netflow/v9/NetFlowV9TemplateCache.java +++ b/src/main/java/org/graylog/plugins/netflow/v9/NetFlowV9TemplateCache.java @@ -87,6 +87,10 @@ public NetFlowV9Template get(int id) { return cache.getIfPresent(id); } + public boolean isEmpty() { + return cache.size() == 0L; + } + @Override public void run() { if (cache.size() != 0) { diff --git a/src/test/java/org/graylog/plugins/netflow/codecs/NetFlowCodecTest.java b/src/test/java/org/graylog/plugins/netflow/codecs/NetFlowCodecTest.java index 6db3848..a078c83 100644 --- a/src/test/java/org/graylog/plugins/netflow/codecs/NetFlowCodecTest.java +++ b/src/test/java/org/graylog/plugins/netflow/codecs/NetFlowCodecTest.java @@ -220,6 +220,14 @@ public void decodeMessagesSuccessfullyDecodesNetFlowV9() throws Exception { .containsEntry("nf_snmp_output", 0); } + @Test + public void decodeMessagesThrowsEmptyTemplateExceptionWithIncompleteNetFlowV9() throws Exception { + final byte[] b = Resources.toByteArray(Resources.getResource("netflow-data/netflow-v9-3_incomplete.dat")); + final InetSocketAddress source = new InetSocketAddress(InetAddress.getLocalHost(), 12345); + + assertThat(codec.decodeMessages(new RawMessage(b, source))).isNull(); + } + @Test public void pcap_softflowd_NetFlowV5() throws Exception { final List allMessages = new ArrayList<>(); @@ -380,6 +388,29 @@ public void pcap_nprobe_NetFlowV9_2() throws Exception { } assertThat(allMessages) .hasSize(6) - .allSatisfy(message -> assertThat(message.getField("nf_version")).isEqualTo(9)); + .allSatisfy(message -> assertThat(message.getField("nf_version")).isEqualTo(9)); + } + + @Test + public void pcap_nprobe_NetFlowV9_4() throws Exception { + final List allMessages = new ArrayList<>(); + try (InputStream inputStream = Resources.getResource("netflow-data/nprobe-netflow9-4.pcap").openStream()) { + final Pcap pcap = Pcap.openStream(inputStream); + pcap.loop(packet -> { + if (packet.hasProtocol(Protocol.UDP)) { + final UDPPacket udp = (UDPPacket) packet.getPacket(Protocol.UDP); + final InetSocketAddress source = new InetSocketAddress(udp.getSourceIP(), udp.getSourcePort()); + final Collection messages = codec.decodeMessages(new RawMessage(udp.getPayload().getArray(), source)); + if (messages != null) { + allMessages.addAll(messages); + } + } + return true; + } + ); + } + assertThat(allMessages) + .hasSize(1) + .allSatisfy(message -> assertThat(message.getField("nf_version")).isEqualTo(9)); } } \ No newline at end of file diff --git a/src/test/java/org/graylog/plugins/netflow/v9/NetFlowV9ParserTest.java b/src/test/java/org/graylog/plugins/netflow/v9/NetFlowV9ParserTest.java index 123423a..bfe368b 100644 --- a/src/test/java/org/graylog/plugins/netflow/v9/NetFlowV9ParserTest.java +++ b/src/test/java/org/graylog/plugins/netflow/v9/NetFlowV9ParserTest.java @@ -24,6 +24,7 @@ import io.pkts.Pcap; import io.pkts.packet.UDPPacket; import io.pkts.protocol.Protocol; +import org.graylog.plugins.netflow.flows.EmptyTemplateException; import org.graylog2.shared.bindings.providers.ObjectMapperProvider; import org.junit.Before; import org.junit.Rule; @@ -38,6 +39,7 @@ import java.util.concurrent.Executors; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -124,6 +126,13 @@ public void testParse() throws IOException { assertEquals(1, p3.records().size()); } + @Test + public void testParseIncomplete() throws Exception { + final byte[] b = Resources.toByteArray(Resources.getResource("netflow-data/netflow-v9-3_incomplete.dat")); + assertThatExceptionOfType(EmptyTemplateException.class) + .isThrownBy(() -> NetFlowV9Parser.parsePacket(Unpooled.wrappedBuffer(b), cache, typeRegistry)); + } + @Test public void pcap_softflowd_NetFlowV9() throws Exception { final List allRecords = new ArrayList<>(); @@ -428,10 +437,14 @@ public void pcap_nprobe_NetFlowV9_4() throws Exception { if (packet.hasProtocol(Protocol.UDP)) { final UDPPacket udp = (UDPPacket) packet.getPacket(Protocol.UDP); final ByteBuf byteBuf = Unpooled.wrappedBuffer(udp.getPayload().getArray()); - final NetFlowV9Packet netFlowV9Packet = NetFlowV9Parser.parsePacket(byteBuf, cache, typeRegistry); - assertThat(netFlowV9Packet).isNotNull(); - allTemplates.addAll(netFlowV9Packet.templates()); - allRecords.addAll(netFlowV9Packet.records()); + try { + final NetFlowV9Packet netFlowV9Packet = NetFlowV9Parser.parsePacket(byteBuf, cache, typeRegistry); + assertThat(netFlowV9Packet).isNotNull(); + allTemplates.addAll(netFlowV9Packet.templates()); + allRecords.addAll(netFlowV9Packet.records()); + } catch (EmptyTemplateException e) { + // ignore + } } return true; } @@ -461,7 +474,7 @@ public void pcap_nprobe_NetFlowV9_4() throws Exception { ).build() ) ); - assertThat(allRecords).hasSize(898); + assertThat(allRecords).hasSize(2); } private String name(NetFlowV9FieldDef def) { diff --git a/src/test/java/org/graylog/plugins/netflow/v9/NetFlowV9TemplateCacheTest.java b/src/test/java/org/graylog/plugins/netflow/v9/NetFlowV9TemplateCacheTest.java index 205e51d..a4098e6 100644 --- a/src/test/java/org/graylog/plugins/netflow/v9/NetFlowV9TemplateCacheTest.java +++ b/src/test/java/org/graylog/plugins/netflow/v9/NetFlowV9TemplateCacheTest.java @@ -178,4 +178,31 @@ public void loadsCacheFileOnStart() throws Exception { assertThat(templateCache.get(0)).isEqualTo(template1); assertThat(templateCache.get(1)).isEqualTo(template2); } + + @Test + public void isEmptyReturnsFalseForNonEmptyCache() throws Exception { + final byte[] json = ("{" + + "\"0\":{" + + "\"template_id\":0," + + "\"field_count\":1," + + "\"definitions\":[{\"type\":{\"id\":0,\"value_type\":\"UINT64\",\"name\":\"foobar\"},\"length\":8}]" + + "}," + + "\"1\":{" + + "\"template_id\":1," + + "\"field_count\":1," + + "\"definitions\":[{\"type\":{\"id\":0,\"value_type\":\"IPV4\",\"name\":\"covfefe\"},\"length\":4}]}}") + .getBytes(StandardCharsets.UTF_8); + assertThat(Files.write(cachePath, json)).isEqualTo(cachePath); + assertThat(Files.size(cachePath)).isEqualTo(json.length); + + final NetFlowV9TemplateCache templateCache = new NetFlowV9TemplateCache(100L, cachePath, 300, executorService, objectMapper); + + assertThat(templateCache.isEmpty()).isFalse(); + } + + @Test + public void isEmptyReturnsTrueForNonEmptyCache() throws Exception { + final NetFlowV9TemplateCache templateCache = new NetFlowV9TemplateCache(100L, cachePath, 300, executorService, objectMapper); + assertThat(templateCache.isEmpty()).isTrue(); + } } \ No newline at end of file diff --git a/src/test/resources/netflow-data/netflow-v9-3_incomplete.dat b/src/test/resources/netflow-data/netflow-v9-3_incomplete.dat new file mode 100644 index 0000000..ae50e5e Binary files /dev/null and b/src/test/resources/netflow-data/netflow-v9-3_incomplete.dat differ