Skip to content

Commit

Permalink
fix race condition at startup (#142)
Browse files Browse the repository at this point in the history
currently the socket was being opened during `register` but the queue is only set during `run`, which mean that connections+data could arrive before the a queue was configured, causing the error seen in the linked issue.

This commit changes the socket opening to also happen during `run`.
Also fixes testing now that bind happens during `run`.

solves #132
  • Loading branch information
jsvd authored Mar 9, 2019
1 parent 45d6cfb commit 58d804b
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 26 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 6.0.2
- Fixed race condition where data would be accepted before queue was configured

## 6.0.1
- Support multiple certificates per file [#140](https://github.com/logstash-plugins/logstash-input-tcp/pull/140)

Expand Down
6 changes: 1 addition & 5 deletions lib/logstash/inputs/tcp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,6 @@ def initialize(*args)
def register
fix_streaming_codecs

# note that since we are opening a socket in register, we must also make sure we close it
# in the close method even if we also close it in the stop method since we could have
# a situation where register is called but not run & stop.

@logger.info("Starting tcp input listener", :address => "#{@host}:#{@port}", :ssl_enable => "#{@ssl_enable}")
if server?
ssl_context = get_ssl_context(SslOptions)

Expand All @@ -153,6 +148,7 @@ def register
def run(output_queue)
@output_queue = output_queue
if server?
@logger.info("Starting tcp input listener", :address => "#{@host}:#{@port}", :ssl_enable => "#{@ssl_enable}")
@loop.run
else
run_client()
Expand Down
19 changes: 4 additions & 15 deletions spec/inputs/tcp_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -399,12 +399,7 @@ def get_port
chain_of_certificates = TcpHelpers.new.chain_of_certificates

let(:tcp) do
begin
socket = TCPSocket.new("127.0.0.1", port)
rescue Errno::ECONNREFUSED
sleep 1
socket = TCPSocket.new("127.0.0.1", port)
end
Stud::try(5.times) { TCPSocket.new("127.0.0.1", port) }
end
let(:sslcontext) do
sslcontext = OpenSSL::SSL::SSLContext.new
Expand Down Expand Up @@ -502,14 +497,8 @@ def get_port

context "that disconnects before doing TLS handshake" do
before do
begin
client = TCPSocket.new("127.0.0.1", port)
client.close
rescue Errno::ECONNREFUSED
sleep 1
client = TCPSocket.new("127.0.0.1", port)
client.close
end
client = Stud::try(5.times) { TCPSocket.new("127.0.0.1", port) }
client.close
end

it "should not negatively impact the plugin" do
Expand Down Expand Up @@ -540,7 +529,7 @@ def get_port
# Assertion to verify this test is actually sending something.
expect(garbage.length).to be > 0

client = TCPSocket.new("127.0.0.1", port)
client = Stud::try(5.times) { TCPSocket.new("127.0.0.1", port) }
client.write(garbage)
client.flush
Thread.new { sleep(1); client.close }
Expand Down
22 changes: 17 additions & 5 deletions src/main/java/org/logstash/tcp/InputLoop.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ public final class InputLoop implements Runnable, Closeable {
private final EventLoopGroup worker;

/**
* The Server Socket's {@link ChannelFuture}.
* The Server Bootstrap
*/
private final ChannelFuture future;
private final ServerBootstrap serverBootstrap;

/**
* Reference to the logger.
Expand All @@ -48,6 +48,16 @@ public final class InputLoop implements Runnable, Closeable {
*/
private final SslContext sslContext;

/**
* TCP Port.
*/
private final int port;

/**
* TCP Host.
*/
private final String host;

/**
* Ctor.
* @param host Host to bind the listen to
Expand All @@ -60,19 +70,21 @@ public InputLoop(final String host, final int port, final Decoder decoder, final

this.logger = logger;
this.sslContext = sslContext;
this.host = host;
this.port = port;
worker = new NioEventLoopGroup();
boss = new NioEventLoopGroup(1);
future = new ServerBootstrap().group(boss, worker)
serverBootstrap = new ServerBootstrap().group(boss, worker)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, keepAlive)
.childHandler(new InputLoop.InputHandler(decoder, sslContext, logger)).bind(host, port);
.childHandler(new InputLoop.InputHandler(decoder, sslContext, logger));
}

@Override
public void run() {
try {
future.sync().channel().closeFuture().sync();
serverBootstrap.bind(host, port).sync().channel().closeFuture().sync();
} catch (final InterruptedException ex) {
throw new IllegalStateException(ex);
}
Expand Down
2 changes: 1 addition & 1 deletion version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
6.0.1
6.0.2

0 comments on commit 58d804b

Please sign in to comment.