diff --git a/tests/mdm/test_management_enrolled_device.py b/tests/mdm/test_management_enrolled_device.py index 07c00e6da4..2601be535f 100644 --- a/tests/mdm/test_management_enrolled_device.py +++ b/tests/mdm/test_management_enrolled_device.py @@ -837,7 +837,7 @@ def test_enrolled_device_no_perms_no_set_recovery_lock_command_link(self): def test_enrolled_device_not_apple_silicon_no_set_recovery_lock_command_link(self): session, _, _ = force_dep_enrollment_session(self.mbu, completed=True) self.assertFalse(session.enrolled_device.apple_silicon) - self._login("mdm.view_enrolleddevice") + self._login("mdm.view_enrolleddevice", "mdm.add_devicecommand") response = self.client.get(reverse("mdm:enrolled_device", args=(session.enrolled_device.pk,))) self.assertEqual(response.status_code, 200) self.assertTemplateUsed(response, "mdm/enrolleddevice_detail.html") @@ -1014,6 +1014,183 @@ def test_create_enrolled_device_set_recovery_lock_command_clear_ok(self): "NewPassword": ""}, ) + # create set firmware password command + + def test_enrolled_device_no_perms_no_set_firmware_password_command_link(self): + session, _, _ = force_dep_enrollment_session(self.mbu, completed=True) + self.assertFalse(session.enrolled_device.apple_silicon) + self._login("mdm.view_enrolleddevice") + response = self.client.get(reverse("mdm:enrolled_device", args=(session.enrolled_device.pk,))) + self.assertEqual(response.status_code, 200) + self.assertTemplateUsed(response, "mdm/enrolleddevice_detail.html") + self.assertNotContains( + response, + reverse("mdm:create_enrolled_device_command", args=(session.enrolled_device.pk, "SetFirmwarePassword")) + ) + + def test_enrolled_device_apple_silicon_no_set_firmware_password_command_link(self): + session, _, _ = force_dep_enrollment_session(self.mbu, completed=True) + session.enrolled_device.apple_silicon = True + session.enrolled_device.save() + self._login("mdm.view_enrolleddevice", "mdm.add_devicecommand") + response = self.client.get(reverse("mdm:enrolled_device", args=(session.enrolled_device.pk,))) + self.assertEqual(response.status_code, 200) + self.assertTemplateUsed(response, "mdm/enrolleddevice_detail.html") + self.assertNotContains( + response, + reverse("mdm:create_enrolled_device_command", args=(session.enrolled_device.pk, "SetFirmwarePassword")) + ) + + def test_enrolled_device_set_firmware_password_command_link(self): + session, _, _ = force_dep_enrollment_session(self.mbu, completed=True) + self._login("mdm.view_enrolleddevice", "mdm.add_devicecommand") + response = self.client.get(reverse("mdm:enrolled_device", args=(session.enrolled_device.pk,))) + self.assertEqual(response.status_code, 200) + self.assertTemplateUsed(response, "mdm/enrolleddevice_detail.html") + self.assertContains( + response, + reverse("mdm:create_enrolled_device_command", args=(session.enrolled_device.pk, "SetFirmwarePassword")) + ) + + def test_create_enrolled_device_set_firmware_password_command_redirect(self): + session, _, _ = force_dep_enrollment_session(self.mbu, completed=True) + self._login_redirect(reverse("mdm:create_enrolled_device_command", + args=(session.enrolled_device.pk, "SetFirmwarePassword"))) + + def test_create_enrolled_device_set_firmware_password_command_permission_denied(self): + session, _, _ = force_dep_enrollment_session(self.mbu, completed=True) + self._login("mdm.view_enrolleddevice") + response = self.client.get( + reverse("mdm:create_enrolled_device_command", + args=(session.enrolled_device.pk, "SetFirmwarePassword")) + ) + self.assertEqual(response.status_code, 403) + + def test_create_enrolled_device_set_firmware_password_command_get(self): + session, _, _ = force_dep_enrollment_session(self.mbu, completed=True) + self._login("mdm.add_devicecommand") + response = self.client.get( + reverse("mdm:create_enrolled_device_command", + args=(session.enrolled_device.pk, "SetFirmwarePassword")) + ) + self.assertEqual(response.status_code, 200) + self.assertTemplateUsed(response, "mdm/enrolleddevice_create_command.html") + + def test_create_enrolled_device_set_firmware_password_command_pwd_too_short(self): + session, _, _ = force_dep_enrollment_session(self.mbu, completed=True) + self._login("mdm.view_enrolleddevice", "mdm.add_devicecommand") + response = self.client.post( + reverse("mdm:create_enrolled_device_command", + args=(session.enrolled_device.pk, "SetFirmwarePassword")), + {"new_password": "1234567"}, + follow=True + ) + self.assertEqual(response.status_code, 200) + self.assertTemplateUsed(response, "mdm/enrolleddevice_create_command.html") + self.assertFormError( + response.context["form"], + "new_password", + "The password must be at least 8 characters long." + ) + + def test_create_enrolled_device_set_firmware_password_command_pwd_too_long(self): + session, _, _ = force_dep_enrollment_session(self.mbu, completed=True) + self._login("mdm.view_enrolleddevice", "mdm.add_devicecommand") + response = self.client.post( + reverse("mdm:create_enrolled_device_command", + args=(session.enrolled_device.pk, "SetFirmwarePassword")), + {"new_password": 33 * "1"}, + follow=True + ) + self.assertEqual(response.status_code, 200) + self.assertTemplateUsed(response, "mdm/enrolleddevice_create_command.html") + self.assertFormError( + response.context["form"], + "new_password", + "The password must be at most 32 characters long." + ) + + def test_create_enrolled_device_set_firmware_password_command_pwd_non_ascii(self): + session, _, _ = force_dep_enrollment_session(self.mbu, completed=True) + self._login("mdm.view_enrolleddevice", "mdm.add_devicecommand") + response = self.client.post( + reverse("mdm:create_enrolled_device_command", + args=(session.enrolled_device.pk, "SetFirmwarePassword")), + {"new_password": 8 * "é"}, + follow=True + ) + self.assertEqual(response.status_code, 200) + self.assertTemplateUsed(response, "mdm/enrolleddevice_create_command.html") + self.assertFormError( + response.context["form"], + "new_password", + "The characters in this value must consist of low-ASCII, printable characters (0x20 through 0x7E) " + "to ensure that all characters are enterable on the EFI login screen." + ) + + def test_create_enrolled_device_set_firmware_password_command_pwd_clear_non_existing(self): + session, _, _ = force_dep_enrollment_session(self.mbu, completed=True) + self._login("mdm.view_enrolleddevice", "mdm.add_devicecommand") + response = self.client.post( + reverse("mdm:create_enrolled_device_command", + args=(session.enrolled_device.pk, "SetFirmwarePassword")), + {"new_password": ""}, + follow=True + ) + self.assertEqual(response.status_code, 200) + self.assertTemplateUsed(response, "mdm/enrolleddevice_create_command.html") + self.assertFormError( + response.context["form"], + "new_password", + "No current firmware password set: this field is required." + ) + + def test_create_enrolled_device_set_firmware_password_command_ok(self): + session, _, _ = force_dep_enrollment_session(self.mbu, completed=True) + session.enrolled_device.set_recovery_password("87654321") + session.enrolled_device.save() + self._login("mdm.view_enrolleddevice", "mdm.add_devicecommand") + response = self.client.post( + reverse("mdm:create_enrolled_device_command", + args=(session.enrolled_device.pk, "SetFirmwarePassword")), + {"new_password": "12345678"}, + follow=True + ) + self.assertEqual(response.status_code, 200) + self.assertTemplateUsed(response, "mdm/enrolleddevice_detail.html") + self.assertContains(response, "Set firmware password command successfully created") + db_cmd = session.enrolled_device.commands.first() + self.assertEqual(db_cmd.name, "SetFirmwarePassword") + cmd = load_command(db_cmd) + self.assertEqual( + cmd.build_command(), + {"CurrentPassword": "87654321", + "NewPassword": "12345678"}, + ) + + def test_create_enrolled_device_set_firmware_password_command_clear_ok(self): + session, _, _ = force_dep_enrollment_session(self.mbu, completed=True) + session.enrolled_device.set_recovery_password("87654321") + session.enrolled_device.save() + self._login("mdm.view_enrolleddevice", "mdm.add_devicecommand") + response = self.client.post( + reverse("mdm:create_enrolled_device_command", + args=(session.enrolled_device.pk, "SetFirmwarePassword")), + {"new_password": ""}, + follow=True + ) + self.assertEqual(response.status_code, 200) + self.assertTemplateUsed(response, "mdm/enrolleddevice_detail.html") + self.assertContains(response, "Set firmware password command successfully created") + db_cmd = session.enrolled_device.commands.first() + self.assertEqual(db_cmd.name, "SetFirmwarePassword") + cmd = load_command(db_cmd) + self.assertEqual( + cmd.build_command(), + {"CurrentPassword": "87654321", + "NewPassword": ""}, + ) + # create rotate filevault key command def test_enrolled_device_no_rotate_filevault_key_command_link(self): diff --git a/tests/mdm/test_security_info_command.py b/tests/mdm/test_security_info_command.py index 65ced523e7..94d3a1cba6 100644 --- a/tests/mdm/test_security_info_command.py +++ b/tests/mdm/test_security_info_command.py @@ -13,7 +13,10 @@ from zentral.contrib.mdm.commands.scheduling import _update_base_inventory from zentral.contrib.mdm.commands.setup_filevault import get_escrow_key_certificate_der_bytes from zentral.contrib.mdm.crypto import encrypt_cms_payload -from zentral.contrib.mdm.events import FileVaultPRKUpdatedEvent +from zentral.contrib.mdm.events import (FileVaultPRKUpdatedEvent, + RecoveryPasswordClearedEvent, + RecoveryPasswordSetEvent, + RecoveryPasswordUpdatedEvent) from zentral.contrib.mdm.models import Blueprint, Channel, Platform, RequestStatus from .utils import force_dep_enrollment_session @@ -203,6 +206,162 @@ def test_process_acknowledged_response_same_filevault_prk(self, post_event): events = list(call_args.args[0] for call_args in post_event.call_args_list) self.assertEqual(len(events), 0) + @patch("zentral.core.queues.backends.kombu.EventQueues.post_event") + def test_process_acknowledged_response_pending_firmware_password_pending(self, post_event): + self.assertIsNone(self.enrolled_device.recovery_password) + self.enrolled_device.set_pending_firmware_password("12345678") + self.enrolled_device.save() + new_db_cmd_qs = self.enrolled_device.commands.filter(name="RestartDevice") + self.assertEqual(new_db_cmd_qs.count(), 0) + cmd = SecurityInfo.create_for_device(self.enrolled_device) + security_info = copy.deepcopy(self.security_info) + security_info["SecurityInfo"]["FirmwarePasswordStatus"] = { + "PasswordExists": False, + "AllowOroms": True, + "ChangePending": True + } + with self.captureOnCommitCallbacks(execute=True): + cmd.process_response(security_info, self.dep_enrollment_session, self.mbu) + self.enrolled_device.refresh_from_db() + self.assertEqual(self.enrolled_device.get_pending_firmware_password(), "12345678") + self.assertIsNone(self.enrolled_device.recovery_password) + events = list(call_args.args[0] for call_args in post_event.call_args_list) + self.assertEqual(len(events), 0) + self.assertEqual(new_db_cmd_qs.count(), 1) + new_db_cmd = new_db_cmd_qs.first() + self.assertEqual(new_db_cmd.kwargs, {"NotifyUser": True}) + + @patch("zentral.core.queues.backends.kombu.EventQueues.post_event") + def test_process_acknowledged_response_pending_firmware_password_set(self, post_event): + self.assertIsNone(self.enrolled_device.recovery_password) + self.enrolled_device.set_pending_firmware_password("12345678") + self.enrolled_device.save() + cmd = SecurityInfo.create_for_device(self.enrolled_device) + security_info = copy.deepcopy(self.security_info) + security_info["SecurityInfo"]["FirmwarePasswordStatus"] = { + "PasswordExists": True, + "AllowOroms": True, + "Mode": "command", + "ChangePending": False, + } + with self.captureOnCommitCallbacks(execute=True): + cmd.process_response(security_info, self.dep_enrollment_session, self.mbu) + self.enrolled_device.refresh_from_db() + self.assertIsNone(self.enrolled_device.get_pending_firmware_password()) + self.assertIsNone(self.enrolled_device.pending_firmware_password_created_at) + self.assertEqual(self.enrolled_device.get_recovery_password(), "12345678") + self.assertEqual(self.enrolled_device.commands.filter(name="RestartDevice").count(), 0) + events = list(call_args.args[0] for call_args in post_event.call_args_list) + self.assertEqual(len(events), 1) + event = events[0] + self.assertIsInstance(event, RecoveryPasswordSetEvent) + self.assertEqual( + event.payload, + {'command': {'request_type': 'SecurityInfo', + 'uuid': str(cmd.uuid)}, + 'password_type': 'firmware_password'} + ) + metadata = event.metadata.serialize() + self.assertEqual(metadata["machine_serial_number"], self.enrolled_device.serial_number) + self.assertEqual(metadata["objects"], {"mdm_command": [str(cmd.uuid)]}) + self.assertEqual(set(metadata["tags"]), {"mdm", "recovery_password"}) + + @patch("zentral.core.queues.backends.kombu.EventQueues.post_event") + def test_process_acknowledged_response_pending_firmware_password_update(self, post_event): + self.assertIsNone(self.enrolled_device.recovery_password) + self.enrolled_device.set_recovery_password("87654321") + self.enrolled_device.set_pending_firmware_password("12345678") + self.enrolled_device.save() + cmd = SecurityInfo.create_for_device(self.enrolled_device) + security_info = copy.deepcopy(self.security_info) + security_info["SecurityInfo"]["FirmwarePasswordStatus"] = { + "PasswordExists": True, + "AllowOroms": True, + "Mode": "command", + "ChangePending": False, + } + with self.captureOnCommitCallbacks(execute=True): + cmd.process_response(security_info, self.dep_enrollment_session, self.mbu) + self.enrolled_device.refresh_from_db() + self.assertIsNone(self.enrolled_device.get_pending_firmware_password()) + self.assertIsNone(self.enrolled_device.pending_firmware_password_created_at) + self.assertEqual(self.enrolled_device.get_recovery_password(), "12345678") + self.assertEqual(self.enrolled_device.commands.filter(name="RestartDevice").count(), 0) + events = list(call_args.args[0] for call_args in post_event.call_args_list) + self.assertEqual(len(events), 1) + event = events[0] + self.assertIsInstance(event, RecoveryPasswordUpdatedEvent) + self.assertEqual( + event.payload, + {'command': {'request_type': 'SecurityInfo', + 'uuid': str(cmd.uuid)}, + 'password_type': 'firmware_password'} + ) + metadata = event.metadata.serialize() + self.assertEqual(metadata["machine_serial_number"], self.enrolled_device.serial_number) + self.assertEqual(metadata["objects"], {"mdm_command": [str(cmd.uuid)]}) + self.assertEqual(set(metadata["tags"]), {"mdm", "recovery_password"}) + + @patch("zentral.core.queues.backends.kombu.EventQueues.post_event") + def test_process_acknowledged_response_pending_firmware_password_clear(self, post_event): + self.assertIsNone(self.enrolled_device.recovery_password) + self.enrolled_device.set_recovery_password("87654321") + self.enrolled_device.set_pending_firmware_password("") + self.enrolled_device.save() + cmd = SecurityInfo.create_for_device(self.enrolled_device) + security_info = copy.deepcopy(self.security_info) + security_info["SecurityInfo"]["FirmwarePasswordStatus"] = { + "PasswordExists": False, + "AllowOroms": True, + "Mode": "command", + "ChangePending": False, + } + with self.captureOnCommitCallbacks(execute=True): + cmd.process_response(security_info, self.dep_enrollment_session, self.mbu) + self.enrolled_device.refresh_from_db() + self.assertIsNone(self.enrolled_device.get_pending_firmware_password()) + self.assertIsNone(self.enrolled_device.pending_firmware_password_created_at) + self.assertIsNone(self.enrolled_device.get_recovery_password()) + self.assertEqual(self.enrolled_device.commands.filter(name="RestartDevice").count(), 0) + events = list(call_args.args[0] for call_args in post_event.call_args_list) + self.assertEqual(len(events), 1) + event = events[0] + self.assertIsInstance(event, RecoveryPasswordClearedEvent) + self.assertEqual( + event.payload, + {'command': {'request_type': 'SecurityInfo', + 'uuid': str(cmd.uuid)}, + 'password_type': 'firmware_password'} + ) + metadata = event.metadata.serialize() + self.assertEqual(metadata["machine_serial_number"], self.enrolled_device.serial_number) + self.assertEqual(metadata["objects"], {"mdm_command": [str(cmd.uuid)]}) + self.assertEqual(set(metadata["tags"]), {"mdm", "recovery_password"}) + + @patch("zentral.core.queues.backends.kombu.EventQueues.post_event") + def test_process_acknowledged_response_pending_firmware_password_clear_error(self, post_event): + self.assertIsNone(self.enrolled_device.recovery_password) + self.enrolled_device.set_recovery_password("87654321") + self.enrolled_device.set_pending_firmware_password("") + self.enrolled_device.save() + cmd = SecurityInfo.create_for_device(self.enrolled_device) + security_info = copy.deepcopy(self.security_info) + security_info["SecurityInfo"]["FirmwarePasswordStatus"] = { + "PasswordExists": True, # the problem + "AllowOroms": True, + "Mode": "command", + "ChangePending": False, + } + with self.captureOnCommitCallbacks(execute=True): + cmd.process_response(security_info, self.dep_enrollment_session, self.mbu) + self.enrolled_device.refresh_from_db() + self.assertEqual(self.enrolled_device.get_recovery_password(), "87654321") + self.assertIsNone(self.enrolled_device.pending_firmware_password) + self.assertIsNone(self.enrolled_device.pending_firmware_password_created_at) + # no events + events = list(call_args.args[0] for call_args in post_event.call_args_list) + self.assertEqual(len(events), 0) + def test_process_acknowledged_ios_response(self): start = datetime.utcnow() self.assertIsNone(self.enrolled_device.security_info) diff --git a/tests/mdm/test_set_firmware_password_command.py b/tests/mdm/test_set_firmware_password_command.py new file mode 100644 index 0000000000..2ca5f212dc --- /dev/null +++ b/tests/mdm/test_set_firmware_password_command.py @@ -0,0 +1,159 @@ +from datetime import datetime, timedelta +from django.test import TestCase +from django.utils.crypto import get_random_string +from zentral.contrib.inventory.models import MetaBusinessUnit +from zentral.contrib.mdm.artifacts import Target +from zentral.contrib.mdm.commands import SetFirmwarePassword +from zentral.contrib.mdm.commands.scheduling import _manage_recovery_password +from zentral.contrib.mdm.models import Channel, Command, Platform, RequestStatus +from .utils import force_blueprint, force_dep_enrollment_session, force_recovery_password_config + + +class SetFirmwarePasswordCommandTestCase(TestCase): + @classmethod + def setUpTestData(cls): + cls.mbu = MetaBusinessUnit.objects.create(name=get_random_string(12)) + cls.mbu.create_enrollment_business_unit() + cls.dep_enrollment_session, cls.device_udid, cls.serial_number = force_dep_enrollment_session( + cls.mbu, + authenticated=True, + completed=True, + ) + cls.enrolled_device = cls.dep_enrollment_session.enrolled_device + + # verify_channel_and_device + + def test_verify_channel_and_device(self): + for channel, platform, user_enrollment, apple_silicon, pending_firmware_password, result in ( + (Channel.USER, Platform.MACOS, False, False, None, False), + (Channel.DEVICE, Platform.IOS, False, False, None, False), + (Channel.DEVICE, Platform.MACOS, True, False, None, False), + (Channel.DEVICE, Platform.MACOS, False, True, None, False), + (Channel.DEVICE, Platform.MACOS, False, False, "012345678", False), + (Channel.DEVICE, Platform.MACOS, False, False, None, True), + ): + self.enrolled_device.platform = platform + self.enrolled_device.user_enrollment = user_enrollment + self.enrolled_device.apple_silicon = apple_silicon + self.enrolled_device.set_pending_firmware_password(pending_firmware_password) + self.assertEqual( + SetFirmwarePassword.verify_channel_and_device(channel, self.enrolled_device), + result + ) + + # process_response + + def test_process_acknowledged_response_password_not_changed(self): + cmd = SetFirmwarePassword.create_for_automatic_scheduling(Target(self.enrolled_device), "12345678") + cmd.process_response( + {"UDID": self.enrolled_device.udid, + "Status": "Acknowledged", + "SetFirmwarePassword": {"PasswordChanged": False}, + "CommandUUID": str(cmd.uuid).upper()}, + self.dep_enrollment_session, + self.mbu + ) + self.assertEqual(cmd.status, Command.Status.ACKNOWLEDGED) + self.assertEqual(cmd.db_command.status, Command.Status.ACKNOWLEDGED) + self.enrolled_device.refresh_from_db() + self.assertIsNone(self.enrolled_device.pending_firmware_password) + self.assertIsNone(self.enrolled_device.pending_firmware_password_created_at) + self.assertEqual(self.enrolled_device.commands.filter(name="RestartDevice").count(), 0) + + def test_process_acknowledged_response_password_changed(self): + cmd = SetFirmwarePassword.create_for_automatic_scheduling(Target(self.enrolled_device), "12345678") + now = datetime.utcnow() + cmd.process_response( + {"UDID": self.enrolled_device.udid, + "Status": "Acknowledged", + "SetFirmwarePassword": {"PasswordChanged": True}, + "CommandUUID": str(cmd.uuid).upper()}, + self.dep_enrollment_session, + self.mbu + ) + self.assertEqual(cmd.status, Command.Status.ACKNOWLEDGED) + self.assertEqual(cmd.db_command.status, Command.Status.ACKNOWLEDGED) + self.enrolled_device.refresh_from_db() + self.assertEqual(self.enrolled_device.get_pending_firmware_password(), "12345678") + self.assertTrue(self.enrolled_device.pending_firmware_password_created_at > now) + new_db_cmd_qs = self.enrolled_device.commands.filter(name="RestartDevice") + self.assertEqual(new_db_cmd_qs.count(), 1) + new_db_cmd = new_db_cmd_qs.first() + self.assertEqual(new_db_cmd.kwargs, {"NotifyUser": True}) + + # _manage_recovery_password + + # see test_set_recovery_lock_command.py too + + def test_manage_recovery_password_pending_firmware_password_noop(self): + self.enrolled_device.blueprint = force_blueprint(recovery_password_config=force_recovery_password_config()) + self.enrolled_device.set_pending_firmware_password("12345678") + self.enrolled_device.save() + self.assertIsNone(_manage_recovery_password( + Target(self.enrolled_device), + self.dep_enrollment_session, + RequestStatus.IDLE, + )) + + def test_manage_recovery_password_first_time_static_ok(self): + self.enrolled_device.blueprint = force_blueprint(recovery_password_config=force_recovery_password_config( + static_password="12345678" + )) + self.enrolled_device.save() + cmd = _manage_recovery_password( + Target(self.enrolled_device), + self.dep_enrollment_session, + RequestStatus.IDLE, + ) + self.assertIsInstance(cmd, SetFirmwarePassword) + self.assertEqual( + cmd.build_command(), + {"NewPassword": "12345678"} + ) + + def test_manage_recovery_password_existing_recent_password_firmware_rotation_noop(self): + self.enrolled_device.blueprint = force_blueprint(recovery_password_config=force_recovery_password_config( + rotation_interval_days=90, + rotate_firmware_password=True, + )) + self.enrolled_device.set_recovery_password("12345678") + self.enrolled_device.save() + self.assertIsNone(_manage_recovery_password( + Target(self.enrolled_device), + self.dep_enrollment_session, + RequestStatus.IDLE, + )) + + def test_manage_recovery_password_existing_old_password_no_firmware_rotation_noop(self): + self.enrolled_device.blueprint = force_blueprint(recovery_password_config=force_recovery_password_config( + rotation_interval_days=90, + rotate_firmware_password=False, + )) + self.enrolled_device.set_recovery_password("12345678") + self.enrolled_device.recovery_password_updated_at -= timedelta(days=91) + self.enrolled_device.save() + self.assertIsNone(_manage_recovery_password( + Target(self.enrolled_device), + self.dep_enrollment_session, + RequestStatus.IDLE, + )) + + def test_manage_recovery_password_existing_old_password_firmware_rotation_ok(self): + self.enrolled_device.blueprint = force_blueprint(recovery_password_config=force_recovery_password_config( + rotation_interval_days=90, + rotate_firmware_password=True, + )) + self.enrolled_device.set_recovery_password("12345678") + self.enrolled_device.recovery_password_updated_at -= timedelta(days=91) + self.enrolled_device.save() + cmd = _manage_recovery_password( + Target(self.enrolled_device), + self.dep_enrollment_session, + RequestStatus.IDLE, + ) + self.assertIsInstance(cmd, SetFirmwarePassword) + self.assertEqual( + cmd.build_command(), + {"CurrentPassword": "12345678", + "NewPassword": cmd.load_new_password()} + ) diff --git a/tests/mdm/utils.py b/tests/mdm/utils.py index abdb61bfa8..bd5eb89e9d 100644 --- a/tests/mdm/utils.py +++ b/tests/mdm/utils.py @@ -427,11 +427,12 @@ def force_filevault_config(prk_rotation_interval_days=0): ) -def force_recovery_password_config(rotation_interval_days=0, static_password=None): +def force_recovery_password_config(rotation_interval_days=0, static_password=None, rotate_firmware_password=False): cfg = RecoveryPasswordConfig.objects.create( name=get_random_string(12), dynamic_password=static_password is None, rotation_interval_days=rotation_interval_days, + rotate_firmware_password=rotate_firmware_password, ) if static_password: cfg.set_static_password(static_password) diff --git a/zentral/contrib/mdm/commands/__init__.py b/zentral/contrib/mdm/commands/__init__.py index 61c77d5fdb..f6aa16ca20 100644 --- a/zentral/contrib/mdm/commands/__init__.py +++ b/zentral/contrib/mdm/commands/__init__.py @@ -17,5 +17,6 @@ from .restart_device import RestartDevice # NOQA from .rotate_filevault_key import RotateFileVaultKey # NOQA from .security_info import SecurityInfo # NOQA +from .set_firmware_password import SetFirmwarePassword # NOQA from .set_recovery_lock import SetRecoveryLock # NOQA from .setup_filevault import SetupFileVault # NOQA diff --git a/zentral/contrib/mdm/commands/scheduling.py b/zentral/contrib/mdm/commands/scheduling.py index 54cbbeb319..712b38812a 100644 --- a/zentral/contrib/mdm/commands/scheduling.py +++ b/zentral/contrib/mdm/commands/scheduling.py @@ -24,6 +24,7 @@ from .remove_profile import RemoveProfile from .rotate_filevault_key import RotateFileVaultKey from .security_info import SecurityInfo +from .set_firmware_password import SetFirmwarePassword from .set_recovery_lock import SetRecoveryLock from .setup_filevault import SetupFileVault @@ -283,7 +284,7 @@ def _manage_recovery_password(target, enrollment_session, status): return if not recovery_password_config: return - for cmd_class in (SetRecoveryLock,): + for cmd_class in (SetRecoveryLock, SetFirmwarePassword): if cmd_class.verify_target(target): break else: @@ -292,7 +293,13 @@ def _manage_recovery_password(target, enrollment_session, status): if ( enrolled_device.recovery_password and ( - not recovery_password_config.rotation_interval_days + not ( + recovery_password_config.rotation_interval_days + and ( + enrolled_device.apple_silicon + or recovery_password_config.rotate_firmware_password + ) + ) or (enrolled_device.recovery_password_updated_at and (datetime.utcnow() - enrolled_device.recovery_password_updated_at < timedelta(days=recovery_password_config.rotation_interval_days)) diff --git a/zentral/contrib/mdm/commands/security_info.py b/zentral/contrib/mdm/commands/security_info.py index 28df2064b3..a98c8f9b8b 100644 --- a/zentral/contrib/mdm/commands/security_info.py +++ b/zentral/contrib/mdm/commands/security_info.py @@ -3,10 +3,11 @@ from cryptography.hazmat.primitives.serialization import load_der_private_key from django.db import transaction from zentral.contrib.mdm.crypto import decrypt_cms_payload -from zentral.contrib.mdm.events import post_filevault_prk_updated_event +from zentral.contrib.mdm.events import post_filevault_prk_updated_event, post_recovery_password_event from zentral.contrib.mdm.models import Channel, Platform from zentral.utils.json import prepare_loaded_plist from .base import register_command, Command, CommandBaseForm +from .restart_device import RestartDevice logger = logging.getLogger("zentral.contrib.mdm.commands.security_info") @@ -54,6 +55,39 @@ def command_acknowledged(self): self.enrolled_device.set_filevault_prk(prk) transaction.on_commit(lambda: post_filevault_prk_updated_event(self)) + # Firmware password + if self.enrolled_device.pending_firmware_password: + firmware_password_status = security_info.get("FirmwarePasswordStatus", {}) + if firmware_password_status.get("ChangePending"): + # schedule a reboot notification for the pending firmware password to be applied + RestartDevice.create_for_target(self.target, kwargs={"NotifyUser": True}, queue=True, delay=0) + else: + pending_firmware_password = self.enrolled_device.get_pending_firmware_password() + operation = None + if pending_firmware_password: + if not self.enrolled_device.recovery_password: + operation = "set" + else: + recovery_password = self.enrolled_device.get_recovery_password() + if recovery_password != pending_firmware_password: + operation = "update" + elif self.enrolled_device.recovery_password: + if firmware_password_status.get("PasswordExists"): + logger.error("Enrolled device %s security info %s: password exists, but pending removal", + self.enrolled_device, self.uuid) + # clear the pending firmware password + self.enrolled_device.set_pending_firmware_password(None) + self.enrolled_device.save() + else: + operation = "clear" + if operation: + self.enrolled_device.set_pending_firmware_password(None) + self.enrolled_device.set_recovery_password(pending_firmware_password) + self.enrolled_device.save() + transaction.on_commit(lambda: post_recovery_password_event( + self, password_type="firmware_password", operation=operation + )) + self.enrolled_device.security_info = prepare_loaded_plist(security_info) self.enrolled_device.security_info_updated_at = datetime.utcnow() # management status diff --git a/zentral/contrib/mdm/commands/set_firmware_password.py b/zentral/contrib/mdm/commands/set_firmware_password.py new file mode 100644 index 0000000000..859b4afe32 --- /dev/null +++ b/zentral/contrib/mdm/commands/set_firmware_password.py @@ -0,0 +1,87 @@ +import logging +from uuid import uuid4 +from django import forms +from zentral.contrib.mdm.models import Channel, Platform +from zentral.core.secret_engines import decrypt_str, encrypt_str +from .base import register_command, Command, CommandBaseForm +from .restart_device import RestartDevice +from .set_recovery_lock import generate_password, get_secret_engine_kwargs, validate_recovery_password + + +logger = logging.getLogger("zentral.contrib.mdm.commands.set_recovery_lock") + + +class SetFirmwarePasswordForm(CommandBaseForm): + new_password = forms.CharField( + label="New password", required=False, strip=True, + validators=[validate_recovery_password] + ) + + def clean_new_password(self): + new_password = self.cleaned_data.get("new_password") + if not new_password and not self.enrolled_device.recovery_password: + raise forms.ValidationError("No current firmware password set: this field is required.") + return new_password + + def get_command_kwargs(self, uuid): + kwargs = {} + new_password = self.cleaned_data.get("new_password") + if new_password: + kwargs["new_password"] = encrypt_str(self.cleaned_data["new_password"], **get_secret_engine_kwargs(uuid)) + return kwargs + + +class SetFirmwarePassword(Command): + request_type = "SetFirmwarePassword" + display_name = "Set firmware password" + reschedule_notnow = True + form_class = SetFirmwarePasswordForm + + @classmethod + def create_for_automatic_scheduling(cls, target, password=None): + if not password: + password = generate_password() + uuid = uuid4() + return super().create_for_target( + target, + kwargs={"new_password": encrypt_str(password, **get_secret_engine_kwargs(uuid))}, + uuid=uuid, + ) + + @staticmethod + def verify_channel_and_device(channel, enrolled_device): + return ( + channel == Channel.DEVICE + and enrolled_device.platform == Platform.MACOS + and not enrolled_device.user_enrollment + and not enrolled_device.apple_silicon + and not enrolled_device.pending_firmware_password # cannot send the command if there is a pending change + ) + + def load_new_password(self): + new_password = self.db_command.kwargs.get("new_password") + if new_password: + return decrypt_str(new_password, **get_secret_engine_kwargs(self.uuid)) + else: + return "" + + def build_command(self): + payload = {"NewPassword": self.load_new_password()} + current_password = self.enrolled_device.get_recovery_password() + if current_password: + payload["CurrentPassword"] = current_password + return payload + + def command_acknowledged(self): + password_changed = self.response.get("SetFirmwarePassword", {}).get("PasswordChanged") + if not password_changed: + logger.error("Enrolled device %s firmware password was not changed by command %s", + self.enrolled_device, self.uuid) + return + self.enrolled_device.set_pending_firmware_password(self.load_new_password()) + self.enrolled_device.save() + # schedule a reboot notification for the pending firmware password to be applied + RestartDevice.create_for_target(self.target, kwargs={"NotifyUser": True}, queue=True, delay=0) + + +register_command(SetFirmwarePassword) diff --git a/zentral/contrib/mdm/migrations/0069_enrolleddevice_pending_firmware_password_and_more.py b/zentral/contrib/mdm/migrations/0069_enrolleddevice_pending_firmware_password_and_more.py new file mode 100644 index 0000000000..7640b6f73a --- /dev/null +++ b/zentral/contrib/mdm/migrations/0069_enrolleddevice_pending_firmware_password_and_more.py @@ -0,0 +1,23 @@ +# Generated by Django 4.1.9 on 2023-08-10 12:12 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('mdm', '0068_reenrollmentsession_first_enrolled_at'), + ] + + operations = [ + migrations.AddField( + model_name='enrolleddevice', + name='pending_firmware_password', + field=models.TextField(null=True), + ), + migrations.AddField( + model_name='enrolleddevice', + name='pending_firmware_password_created_at', + field=models.DateTimeField(null=True), + ), + ] diff --git a/zentral/contrib/mdm/models.py b/zentral/contrib/mdm/models.py index 81a3c9b724..b783a86fac 100644 --- a/zentral/contrib/mdm/models.py +++ b/zentral/contrib/mdm/models.py @@ -761,6 +761,8 @@ class EnrolledDevice(models.Model): # Recovery password recovery_password = models.TextField(null=True) recovery_password_updated_at = models.DateTimeField(null=True) + pending_firmware_password = models.TextField(null=True) + pending_firmware_password_created_at = models.DateTimeField(null=True) # timestamps checkout_at = models.DateTimeField(blank=True, null=True) @@ -847,6 +849,23 @@ def set_recovery_password(self, recovery_password): self.recovery_password = encrypt_str(recovery_password, **self._get_secret_engine_kwargs("recovery_password")) self.recovery_password_updated_at = datetime.utcnow() + def get_pending_firmware_password(self): + if not self.pending_firmware_password: + return + return decrypt_str(self.pending_firmware_password, + **self._get_secret_engine_kwargs("pending_firmware_password")) + + def set_pending_firmware_password(self, pending_firmware_password): + if pending_firmware_password is None: + self.pending_firmware_password = None + self.pending_firmware_password_created_at = None + return + self.pending_firmware_password = encrypt_str( + pending_firmware_password, + **self._get_secret_engine_kwargs("pending_firmware_password") + ) + self.pending_firmware_password_created_at = datetime.utcnow() + def rewrap_secrets(self): if self.bootstrap_token: self.bootstrap_token = rewrap(self.bootstrap_token, **self._get_secret_engine_kwargs("bootstrap_token")) @@ -860,6 +879,9 @@ def rewrap_secrets(self): if self.recovery_password: self.recovery_password = rewrap(self.recovery_password, **self._get_secret_engine_kwargs("recovery_password")) + if self.pending_firmware_password: + self.pending_firmware_password = rewrap(self.pending_firmware_password, + **self._get_secret_engine_kwargs("pending_firmware_password")) def get_urlsafe_serial_number(self): if self.serial_number: