Skip to content

Commit

Permalink
Merge pull request #162 from nats-io/nkeys-jwt
Browse files Browse the repository at this point in the history
Add support for NKEYS and JWT based auth
  • Loading branch information
wallyqs authored Jun 10, 2019
2 parents 7a75b53 + 6c6501a commit b361f9f
Show file tree
Hide file tree
Showing 15 changed files with 317 additions and 106 deletions.
72 changes: 6 additions & 66 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,79 +1,19 @@
language: ruby

rvm:
- 2.4
- 2.5
- ruby-head
- 2.6

cache:
directories:
- $HOME/gnatsd
- $HOME/nats-server

before_install:
- bash ./scripts/install_gnatsd.sh

before_script:
- export PATH=$HOME/gnatsd:$PATH
- export PATH=$HOME/nats-server:$PATH

sudo: required
dist: trusty
dist: xenial
bundler_args: --without server

matrix:
include:
- rvm: jruby
env: JRUBY_OPTS='--server -Xcompile.invokedynamic=false' JAVA_OPTS='-Djava.security.egd=file:///dev/urandom'
script: "bundle exec rake spec:client:jruby"
jdk: openjdk8
- rvm: jruby
env: JRUBY_OPTS='--server -Xcompile.invokedynamic=false' JAVA_OPTS='-Djava.security.egd=file:///dev/urandom'
script: "bundle exec rake spec:client:jruby"
jdk: oraclejdk8
- rvm: jruby
env: JRUBY_OPTS='--server -Xcompile.invokedynamic=false' JAVA_OPTS='-Djava.security.egd=file:///dev/urandom'
script: "bundle exec rake spec:client:jruby"
jdk: oraclejdk9
- rvm: jruby
env: JRUBY_OPTS='--server -Xcompile.invokedynamic=false' JAVA_OPTS='-Djava.security.egd=file:///dev/urandom'
script: "bundle exec rake spec:client:jruby"
jdk: oraclejdk10
- rvm: jruby
env: JRUBY_OPTS='--server -Xcompile.invokedynamic=false' JAVA_OPTS='-Djava.security.egd=file:///dev/urandom'
script: "bundle exec rake spec:client:jruby"
jdk: oraclejdk11
- rvm: jruby
env: JRUBY_OPTS='--server -Xcompile.invokedynamic=false' JAVA_OPTS='-Djava.security.egd=file:///dev/urandom'
script: "bundle exec rake spec:client:jruby"
jdk: openjdk9
- rvm: jruby
env: JRUBY_OPTS='--server -Xcompile.invokedynamic=false' JAVA_OPTS='-Djava.security.egd=file:///dev/urandom'
script: "bundle exec rake spec:client:jruby"
jdk: openjdk10
- rvm: jruby
env: JRUBY_OPTS='--server -Xcompile.invokedynamic=false' JAVA_OPTS='-Djava.security.egd=file:///dev/urandom'
script: "bundle exec rake spec:client:jruby"
jdk: openjdk11
allow_failures:
- rvm: jruby
env: JRUBY_OPTS='--server -Xcompile.invokedynamic=false' JAVA_OPTS='-Djava.security.egd=file:///dev/urandom'
jdk: openjdk8
- rvm: jruby
env: JRUBY_OPTS='--server -Xcompile.invokedynamic=false' JAVA_OPTS='-Djava.security.egd=file:///dev/urandom'
jdk: oraclejdk8
- rvm: jruby
env: JRUBY_OPTS='--server -Xcompile.invokedynamic=false' JAVA_OPTS='-Djava.security.egd=file:///dev/urandom'
jdk: oraclejdk9
- rvm: jruby
env: JRUBY_OPTS='--server -Xcompile.invokedynamic=false' JAVA_OPTS='-Djava.security.egd=file:///dev/urandom'
jdk: oraclejdk10
- rvm: jruby
env: JRUBY_OPTS='--server -Xcompile.invokedynamic=false' JAVA_OPTS='-Djava.security.egd=file:///dev/urandom'
jdk: oraclejdk11
- rvm: ruby-head
- rvm: jruby
env: JRUBY_OPTS='--server -Xcompile.invokedynamic=false' JAVA_OPTS='-Djava.security.egd=file:///dev/urandom'
jdk: openjdk9
- rvm: jruby
env: JRUBY_OPTS='--server -Xcompile.invokedynamic=false' JAVA_OPTS='-Djava.security.egd=file:///dev/urandom'
jdk: openjdk10
- rvm: jruby
env: JRUBY_OPTS='--server -Xcompile.invokedynamic=false' JAVA_OPTS='-Djava.security.egd=file:///dev/urandom'
jdk: openjdk11
4 changes: 4 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,7 @@ group :server do
gem 'thin'
gem 'rack', ">= 2.0.6"
end

group :v2 do
gem 'nkeys'
end
8 changes: 7 additions & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
PATH
remote: .
specs:
nats (0.10.0)
nats (0.11.0)
eventmachine (~> 1.2, >= 1.2)

GEM
remote: http://rubygems.org/
specs:
base32 (0.3.2)
daemons (1.3.1)
diff-lcs (1.3)
ed25519 (1.2.4)
eventmachine (1.2.7)
json_pure (2.2.0)
nkeys (0.1.0)
base32 (~> 0.3)
ed25519 (~> 1.2)
rack (2.0.7)
rake (12.3.2)
rspec (3.8.0)
Expand Down Expand Up @@ -38,6 +43,7 @@ DEPENDENCIES
daemons
json_pure
nats!
nkeys
rack (>= 2.0.6)
rake
rspec
Expand Down
158 changes: 137 additions & 21 deletions lib/nats/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ module NATS
# Parser
AWAITING_CONTROL_LINE = 1 #:nodoc:
AWAITING_MSG_PAYLOAD = 2 #:nodoc:
AWAITING_INFO_LINE = 3 # :nodoc:

class Error < StandardError; end #:nodoc:

Expand Down Expand Up @@ -150,7 +151,9 @@ class << self
def connect(uri=nil, opts={}, &blk)
case uri
when String
opts[:uri] = process_uri(uri)
# Initialize TLS defaults in case any url is using it.
uris = opts[:uri] = process_uri(uri)
opts[:tls] ||= {} if uris.any? {|u| u.scheme == 'tls'}
when Hash
opts = uri
end
Expand Down Expand Up @@ -482,7 +485,16 @@ def initialize(options)
# Drain mode
@draining = false
@drained_subs = false
send_connect_command

# NKEYS
@user_credentials = options[:user_credentials] if options[:user_credentials]
@nkeys_seed = options[:nkeys_seed] if options[:nkeys_seed]
@user_nkey_cb = nil
@user_jwt_cb = nil
@signature_cb = nil

# NKEYS
setup_nkeys_connect if @user_credentials or @nkeys_seed
end

# Publish a message to a given subject, with optional reply subject and completion block
Expand Down Expand Up @@ -770,7 +782,7 @@ def user_err_cb? # :nodoc:
end

def auth_connection?
!@uri.user.nil? || @options[:token]
!@uri.user.nil? || @options[:token] || @server_info[:auth_required]
end

def connect_command #:nodoc:
Expand All @@ -782,7 +794,16 @@ def connect_command #:nodoc:
:protocol => ::NATS::PROTOCOL_VERSION,
:echo => !@options[:no_echo]
}

case
when @options[:user_credentials]
nonce = @server_info[:nonce]
cs[:jwt] = @user_jwt_cb.call
cs[:sig] = @signature_cb.call(nonce)
when @options[:nkeys_seed]
nonce = @server_info[:nonce]
cs[:nkey] = @user_nkey_cb.call
cs[:sig] = @signature_cb.call(nonce)
when @options[:token]
cs[:auth_token] = @options[:token]
when @uri.password.nil?
Expand Down Expand Up @@ -853,6 +874,15 @@ def receive_data(data) #:nodoc:

while (@buf)
case @parse_state
when AWAITING_INFO_LINE
case @buf
when INFO
@buf = $'
process_connect_init($1)
else
# If we are here we do not have a complete line yet that we understand.
return
end
when AWAITING_CONTROL_LINE
case @buf
when MSG
Expand Down Expand Up @@ -901,7 +931,7 @@ def receive_data(data) #:nodoc:
end
end

def process_info(info) #:nodoc:
def process_connect_init(info) # :nodoc:
# Each JSON parser uses a different key/value pair to use symbol keys
# instead of strings when parsing. Passing all three pairs assures each
# parser gets what it needs. For the json gem :symbolize_name, for yajl
Expand Down Expand Up @@ -931,9 +961,29 @@ def process_info(info) #:nodoc:
close_connection_after_writing
end
end
send_connect_command

# Only initial INFO command is treated specially for auth reasons,
# the rest are processed asynchronously to discover servers.
@parse_state = AWAITING_CONTROL_LINE
process_info(info)
process_connect

if @server_info[:auth_required]
current = server_pool.first
current[:auth_required] = true

# Send pending connect followed by ping/pong to ensure we're authorized.
queue_server_rt { current[:auth_ok] = true }
end
flush_pending
end

def process_info(info_line) #:nodoc:
info = JSON.parse(info_line, :symbolize_keys => true, :symbolize_names => true, :symbol_keys => true)

# Detect any announced server that we might not be aware of...
connect_urls = @server_info[:connect_urls]
connect_urls = info[:connect_urls]
if connect_urls
srvs = []

Expand Down Expand Up @@ -963,15 +1013,7 @@ def process_info(info) #:nodoc:
server_pool.push(*srvs)
end

if @server_info[:auth_required]
current = server_pool.first
current[:auth_required] = true
# Send pending connect followed by ping/pong to ensure we're authorized.
queue_server_rt { current[:auth_ok] = true }
flush_pending
end

@server_info
info
end

def client_using_secure_connection?
Expand Down Expand Up @@ -1002,22 +1044,19 @@ def cancel_ping_timer
end

def connection_completed #:nodoc:
@parse_state = AWAITING_CONTROL_LINE
@parse_state = AWAITING_INFO_LINE

# Delay sending CONNECT or any other command here until we are sure
# that we have a valid established secure connection.
return if (@ssl or @tls)

# Mark that we established already TCP connection to the server. In case of TLS,
# prepare commands which will be dispatched to server and delay flushing until
# we have processed the INFO line sent by the server and done the handshake.
# Mark that we established already TCP connection to the server,
# when using TLS we only do so after handshake has been completed.
@connected = true
process_connect
end

def ssl_handshake_completed
@connected = true
process_connect
end

def process_connect #:nodoc:
Expand Down Expand Up @@ -1174,7 +1213,6 @@ def attempt_reconnect #:nodoc:
current[:reconnect_attempts] ||= 0
current[:reconnect_attempts] += 1

send_connect_command
begin
EM.reconnect(@uri.host, @uri.port, self)
rescue
Expand All @@ -1201,6 +1239,84 @@ def send_command(command, priority = false) #:nodoc:
true
end

def setup_nkeys_connect
begin
require 'nkeys'
require 'base64'
rescue LoadError
raise(Error, "nkeys is not installed")
end

case
when @nkeys_seed
@user_nkey_cb = proc {
seed = File.read(@nkeys_seed).chomp
kp = NKEYS::from_seed(seed)

# Take a copy since original will be gone with the wipe.
pub_key = kp.public_key.dup
kp.wipe!

pub_key
}

@signature_cb = proc { |nonce|
seed = File.read(@nkeys_seed).chomp
kp = NKEYS::from_seed(seed)
raw_signed = kp.sign(nonce)
kp.wipe!
encoded = Base64.urlsafe_encode64(raw_signed)
encoded.gsub('=', '')
}
when @user_credentials
# When the credentials are within a single decorated file.
@user_jwt_cb = proc {
jwt_start = "BEGIN NATS USER JWT".freeze
found = false
jwt = nil
File.readlines(@user_credentials).each do |line|
case
when found
jwt = line.chomp
break
when line.include?(jwt_start)
found = true
end
end
raise(Error, "No JWT found in #{@user_credentials}") if not found

jwt
}

@signature_cb = proc { |nonce|
seed_start = "BEGIN USER NKEY SEED".freeze
found = false
seed = nil
File.readlines(@user_credentials).each do |line|
case
when found
seed = line.chomp
break
when line.include?(seed_start)
found = true
end
end
raise(Error, "No nkey user seed found in #{@user_credentials}") if not found

kp = NKEYS::from_seed(seed)
raw_signed = kp.sign(nonce)

# seed is a reference so also cleared when doing wipe,
# which can be done since Ruby strings are mutable.
kp.wipe
encoded = Base64.urlsafe_encode64(raw_signed)

# Remove padding
encoded.gsub('=', '')
}
end
end

# Parse out URIs which can now be an array of server choices
# The server pool will contain both explicit and implicit members.
def process_uri_options #:nodoc
Expand Down
2 changes: 1 addition & 1 deletion lib/nats/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

module NATS
# NOTE: These are all announced to the server on CONNECT
VERSION = "0.10.0".freeze
VERSION = "0.11.0".freeze
LANG = RUBY_ENGINE
PROTOCOL_VERSION = 1
end
Loading

0 comments on commit b361f9f

Please sign in to comment.