Skip to content

Commit

Permalink
reject not supported identity tokens
Browse files Browse the repository at this point in the history
prevents unsupported identity tokens to create a session.
For example reject anonymous connection if not set via set_security_IDs, currently this was possible and was a security issue.
  • Loading branch information
schroeder- authored and oroulet committed May 20, 2022
1 parent 67f1555 commit 1b85c61
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 0 deletions.
5 changes: 5 additions & 0 deletions opcua/server/internal_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def __init__(self, shelffile=None, user_manager=None, session_cls=None):
self.current_time_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
self._address_space_fixes()
self.setup_nodes()
self.supported_tokens = []

@property
def thread_loop(self):
Expand Down Expand Up @@ -346,6 +347,10 @@ def activate_session(self, params):
result.Results.append(ua.StatusCode())
self.state = SessionState.Activated
id_token = params.UserIdentityToken
# Check if security policy is supported
if not isinstance(id_token, self.iserver.supported_tokens):
self.logger.error('Rejected active session UserIdentityToken not supported')
raise utils.ServiceError(ua.StatusCodes.BadIdentityTokenRejected)
if isinstance(id_token, ua.UserNameIdentityToken):
if self.user_manager.check_user_token(self, id_token) == False:
raise utils.ServiceError(ua.StatusCodes.BadUserAccessDenied)
Expand Down
5 changes: 5 additions & 0 deletions opcua/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,23 +263,27 @@ def _setup_server_nodes(self):

def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
idtokens = []
supported_token_classes = []
if "Anonymous" in self._policyIDs:
idtoken = ua.UserTokenPolicy()
idtoken.PolicyId = 'anonymous'
idtoken.TokenType = ua.UserTokenType.Anonymous
idtokens.append(idtoken)
supported_token_classes.append(ua.AnonymousIdentityToken)

if "Basic256Sha256" in self._policyIDs:
idtoken = ua.UserTokenPolicy()
idtoken.PolicyId = 'certificate_basic256sha256'
idtoken.TokenType = ua.UserTokenType.Certificate
idtokens.append(idtoken)
supported_token_classes.append(ua.X509IdentityToken)

if "Username" in self._policyIDs:
idtoken = ua.UserTokenPolicy()
idtoken.PolicyId = 'username'
idtoken.TokenType = ua.UserTokenType.UserName
idtokens.append(idtoken)
supported_token_classes.append(ua.UserNameIdentityToken)

appdesc = ua.ApplicationDescription()
appdesc.ApplicationName = ua.LocalizedText(self.name)
Expand All @@ -299,6 +303,7 @@ def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.N
edp.TransportProfileUri = 'http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary'
edp.SecurityLevel = 0
self.iserver.add_endpoint(edp)
self.iserver.supported_tokens = tuple(supported_token_classes)

def set_server_name(self, name):
self.name = name
Expand Down
17 changes: 17 additions & 0 deletions tests/tests_crypto_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
port_num1 = 48515
port_num2 = 48512
port_num3 = 48510
port_num4 = 48533

@unittest.skipIf(disable_crypto_tests, "crypto not available")
class TestCryptoConnect(unittest.TestCase):
Expand Down Expand Up @@ -51,6 +52,14 @@ def setUpClass(cls):
cls.srv_crypto2.load_private_key("examples/private-key-3072-example.pem")
cls.srv_crypto2.start()

cls.srv_crypto_no_anoymous = Server()
cls.uri_crypto_no_anoymous = 'opc.tcp://127.0.0.1:{0:d}'.format(port_num4)
cls.srv_crypto_no_anoymous.set_endpoint(cls.uri_crypto_no_anoymous)
cls.srv_crypto_no_anoymous.load_certificate("examples/certificate-3072-example.der")
cls.srv_crypto_no_anoymous.load_private_key("examples/private-key-3072-example.pem")
cls.srv_crypto_no_anoymous.set_security_IDs(["Username", "Basic256Sha256"])
cls.srv_crypto_no_anoymous.start()

@classmethod
def tearDownClass(cls):
# stop the server
Expand Down Expand Up @@ -131,3 +140,11 @@ def test_basic256sha56_encrypt_fail(self):
None,
ua.MessageSecurityMode.None_
)

def test_anonymous_rejection(self):
# FIXME: how to make it fail???
clt = Client(self.uri_crypto_no_anoymous)
with self.assertRaises(ua.UaError) as exc_info:
clt.connect()
clt.disconnect()
assert ua.StatusCodes.BadIdentityTokenRejected == exc_info.type.code

0 comments on commit 1b85c61

Please sign in to comment.