diff --git a/test/client/test_protected_services.py b/test/client/test_protected_services.py new file mode 100644 index 0000000..35b8f52 --- /dev/null +++ b/test/client/test_protected_services.py @@ -0,0 +1,456 @@ +from udsoncan import services, CommunicationType +from udsoncan.exceptions import * +from udsoncan import DidCodec +import struct + +from test.ClientServerTest import ClientServerTest + + +class StubbedDidCodec(DidCodec): + def encode(self, did_value): + return struct.pack('B', did_value + 1) + + def decode(self, did_payload): + return struct.unpack('B', did_payload)[0] - 1 + + def __len__(self): + return 1 + + +class TestProtectedServicesBase(ClientServerTest): + def __init__(self, *args, **kwargs): + ClientServerTest.__init__(self, *args, **kwargs) + + def dummy_algo(self, level, seed, params=None): + key = bytearray(seed) + for i in range(len(key)): + key[i] = (params + level + i + key[i]) + return bytes(key) + + def postClientSetUp(self): + self.udsclient.config["data_identifiers"] = { + 0x1234: '>H', + 0x5678: '>H', + } + + +class TestProtectedServicesConfiguration(TestProtectedServicesBase): + + def test_protected_service_access_denied_without_unlock(self): + pass + + def _test_protected_service_access_denied_without_unlock(self): + self.udsclient.config['protected_services'] = { + services.WriteDataByIdentifier._sid: 0x05 + } + + with self.assertRaises(SecurityAccessDeniedException) as context: + self.udsclient.write_data_by_identifier(did=0x1234, value=0x1234) + + self.assertEqual(context.exception.required_level, 0x05) + self.assertEqual(context.exception.resource_type, 'service') + self.assertEqual(context.exception.resource_id, services.WriteDataByIdentifier._sid) + + def test_protected_service_access_granted_after_unlock(self): + self.conn.fromuserqueue.put(b"\x67\x05\x11\x22\x33\x44") + request = self.conn.touserqueue.get(timeout=0.2) + self.assertEqual(request, b"\x27\x05") + key = bytearray([(0x10 + 0x05 + 0 + 0x11), (0x10 + 0x05 + 1 + 0x22), (0x10 + 0x05 + 2 + 0x33), (0x10 + 0x05 + 3 + 0x44)]) + request = self.conn.touserqueue.get(timeout=0.2) + self.assertEqual(request, b"\x27\x06" + bytes(key)) + self.conn.fromuserqueue.put(b"\x67\x06") + + request = self.conn.touserqueue.get(timeout=0.2) + self.assertEqual(request, b"\x2E\x12\x34\x12\x34") + self.conn.fromuserqueue.put(b"\x6E\x12\x34") + + def _test_protected_service_access_granted_after_unlock(self): + self.udsclient.config['security_algo'] = self.dummy_algo + self.udsclient.config['security_algo_params'] = 0x10 + self.udsclient.config['protected_services'] = { + services.WriteDataByIdentifier._sid: 0x05 + } + + self.udsclient.unlock_security_access(0x05) + self.assertIn(0x05, self.udsclient.get_unlocked_security_levels()) + + response = self.udsclient.write_data_by_identifier(did=0x1234, value=0x1234) + self.assertTrue(response.positive) + + def test_protected_did_access_denied_without_unlock(self): + pass + + def _test_protected_did_access_denied_without_unlock(self): + self.udsclient.config['protected_dids'] = { + 0x1234: 0x05 + } + + with self.assertRaises(SecurityAccessDeniedException) as context: + self.udsclient.write_data_by_identifier(did=0x1234, value=0x1234) + + self.assertEqual(context.exception.required_level, 0x05) + self.assertEqual(context.exception.resource_type, 'DID') + self.assertEqual(context.exception.resource_id, 0x1234) + + def test_protected_did_access_granted_after_unlock(self): + self.conn.fromuserqueue.put(b"\x67\x05\x11\x22\x33\x44") + request = self.conn.touserqueue.get(timeout=0.2) + self.assertEqual(request, b"\x27\x05") + key = bytearray([(0x10 + 0x05 + 0 + 0x11), (0x10 + 0x05 + 1 + 0x22), (0x10 + 0x05 + 2 + 0x33), (0x10 + 0x05 + 3 + 0x44)]) + request = self.conn.touserqueue.get(timeout=0.2) + self.assertEqual(request, b"\x27\x06" + bytes(key)) + self.conn.fromuserqueue.put(b"\x67\x06") + + request = self.conn.touserqueue.get(timeout=0.2) + self.assertEqual(request, b"\x2E\x12\x34\x12\x34") + self.conn.fromuserqueue.put(b"\x6E\x12\x34") + + def _test_protected_did_access_granted_after_unlock(self): + self.udsclient.config['security_algo'] = self.dummy_algo + self.udsclient.config['security_algo_params'] = 0x10 + self.udsclient.config['protected_dids'] = { + 0x1234: 0x05 + } + + self.udsclient.unlock_security_access(0x05) + self.assertIn(0x05, self.udsclient.get_unlocked_security_levels()) + + response = self.udsclient.write_data_by_identifier(did=0x1234, value=0x1234) + self.assertTrue(response.positive) + + def test_unprotected_did_access_allowed(self): + self.conn.fromuserqueue.put(b"\x6E\x56\x78") + + def _test_unprotected_did_access_allowed(self): + self.udsclient.config['protected_dids'] = { + 0x1234: 0x05 + } + + response = self.udsclient.write_data_by_identifier(did=0x5678, value=0x1234) + self.assertTrue(response.positive) + + def test_protected_routine_access_denied_without_unlock(self): + pass + + def _test_protected_routine_access_denied_without_unlock(self): + self.udsclient.config['protected_routines'] = { + 0xFF00: 0x07 + } + + with self.assertRaises(SecurityAccessDeniedException) as context: + self.udsclient.start_routine(0xFF00) + + self.assertEqual(context.exception.required_level, 0x07) + self.assertEqual(context.exception.resource_type, 'routine') + self.assertEqual(context.exception.resource_id, 0xFF00) + + def test_protected_routine_access_granted_after_unlock(self): + self.conn.fromuserqueue.put(b"\x67\x07\x11\x22\x33\x44") + request = self.conn.touserqueue.get(timeout=0.2) + self.assertEqual(request, b"\x27\x07") + key = bytearray([(0x10 + 0x07 + 0 + 0x11), (0x10 + 0x07 + 1 + 0x22), (0x10 + 0x07 + 2 + 0x33), (0x10 + 0x07 + 3 + 0x44)]) + request = self.conn.touserqueue.get(timeout=0.2) + self.assertEqual(request, b"\x27\x08" + bytes(key)) + self.conn.fromuserqueue.put(b"\x67\x08") + + request = self.conn.touserqueue.get(timeout=0.2) + self.assertEqual(request, b"\x31\x01\xFF\x00") + self.conn.fromuserqueue.put(b"\x71\x01\xFF\x00") + + def _test_protected_routine_access_granted_after_unlock(self): + self.udsclient.config['security_algo'] = self.dummy_algo + self.udsclient.config['security_algo_params'] = 0x10 + self.udsclient.config['protected_routines'] = { + 0xFF00: 0x07 + } + + self.udsclient.unlock_security_access(0x07) + self.assertIn(0x07, self.udsclient.get_unlocked_security_levels()) + + response = self.udsclient.start_routine(0xFF00) + self.assertTrue(response.positive) + + def test_no_config_means_no_protection(self): + self.conn.fromuserqueue.put(b"\x6E\x12\x34") + + def _test_no_config_means_no_protection(self): + response = self.udsclient.write_data_by_identifier(did=0x1234, value=0x1234) + self.assertTrue(response.positive) + + +class TestSecurityStateReset(TestProtectedServicesBase): + + def test_session_change_resets_security_access(self): + self.conn.fromuserqueue.put(b"\x50\x03\x00\x0A\x00\x14") + + def _test_session_change_resets_security_access(self): + self.udsclient.config['standard_version'] = 2013 + self.udsclient.config['protected_services'] = { + services.WriteDataByIdentifier._sid: 0x05 + } + + self.udsclient._unlocked_security_levels.add(0x05) + self.assertIn(0x05, self.udsclient.get_unlocked_security_levels()) + + self.udsclient.change_session(0x03) + self.assertEqual(self.udsclient.get_unlocked_security_levels(), set()) + + with self.assertRaises(SecurityAccessDeniedException): + self.udsclient.write_data_by_identifier(did=0x1234, value=0x1234) + + def test_ecu_reset_resets_security_access(self): + self.conn.fromuserqueue.put(b"\x51\x01") + + def _test_ecu_reset_resets_security_access(self): + self.udsclient.config['protected_services'] = { + services.WriteDataByIdentifier._sid: 0x05 + } + + self.udsclient._unlocked_security_levels.add(0x05) + self.assertIn(0x05, self.udsclient.get_unlocked_security_levels()) + + self.udsclient.ecu_reset(0x01) + self.assertEqual(self.udsclient.get_unlocked_security_levels(), set()) + + with self.assertRaises(SecurityAccessDeniedException): + self.udsclient.write_data_by_identifier(did=0x1234, value=0x1234) + + def test_connection_close_resets_security_access(self): + pass + + def _test_connection_close_resets_security_access(self): + self.udsclient.config['protected_services'] = { + services.WriteDataByIdentifier._sid: 0x05 + } + + self.udsclient._unlocked_security_levels.add(0x05) + self.assertIn(0x05, self.udsclient.get_unlocked_security_levels()) + + self.udsclient.close() + self.assertEqual(self.udsclient.get_unlocked_security_levels(), set()) + + +class TestCombinedProtection(TestProtectedServicesBase): + + def test_both_service_and_did_protected(self): + self.conn.fromuserqueue.put(b"\x6E\x12\x34") + + def _test_both_service_and_did_protected(self): + self.udsclient.config['protected_services'] = { + services.WriteDataByIdentifier._sid: 0x05 + } + self.udsclient.config['protected_dids'] = { + 0x1234: 0x05 + } + + self.udsclient._unlocked_security_levels.add(0x05) + self.assertIn(0x05, self.udsclient.get_unlocked_security_levels()) + + response = self.udsclient.write_data_by_identifier(did=0x1234, value=0x1234) + self.assertTrue(response.positive) + + def test_service_and_did_require_different_levels(self): + self.conn.fromuserqueue.put(b"\x6E\x12\x34") + + def _test_service_and_did_require_different_levels(self): + self.udsclient.config['protected_services'] = { + services.WriteDataByIdentifier._sid: 0x05 + } + self.udsclient.config['protected_dids'] = { + 0x1234: 0x07 + } + + self.udsclient._unlocked_security_levels.add(0x05) + self.assertIn(0x05, self.udsclient.get_unlocked_security_levels()) + + with self.assertRaises(SecurityAccessDeniedException) as context: + self.udsclient.write_data_by_identifier(did=0x1234, value=0x1234) + self.assertEqual(context.exception.required_level, 0x07) + + self.udsclient._unlocked_security_levels.add(0x07) + self.assertIn(0x07, self.udsclient.get_unlocked_security_levels()) + + response = self.udsclient.write_data_by_identifier(did=0x1234, value=0x1234) + self.assertTrue(response.positive) + + +class TestTimeoutSecurityReset(TestProtectedServicesBase): + + def test_timeout_resets_security_access(self): + pass + + def _test_timeout_resets_security_access(self): + self.udsclient.config['protected_services'] = { + services.WriteDataByIdentifier._sid: 0x05 + } + + self.udsclient._unlocked_security_levels.add(0x05) + self.assertIn(0x05, self.udsclient.get_unlocked_security_levels()) + + with self.assertRaises(TimeoutException): + self.udsclient.write_data_by_identifier(did=0x1234, value=0x1234) + + self.assertEqual(self.udsclient.get_unlocked_security_levels(), set()) + + with self.assertRaises(SecurityAccessDeniedException): + self.udsclient.write_data_by_identifier(did=0x1234, value=0x1234) + + def test_timeout_requires_re_unlock(self): + pass + + def _test_timeout_requires_re_unlock(self): + self.udsclient.config['security_algo'] = self.dummy_algo + self.udsclient.config['security_algo_params'] = 0x10 + self.udsclient.config['protected_services'] = { + services.WriteDataByIdentifier._sid: 0x05 + } + + self.udsclient._unlocked_security_levels.add(0x05) + self.assertIn(0x05, self.udsclient.get_unlocked_security_levels()) + + with self.assertRaises(TimeoutException): + self.udsclient.write_data_by_identifier(did=0x1234, value=0x1234) + + self.assertEqual(self.udsclient.get_unlocked_security_levels(), set()) + + +class TestGenericServiceProtection(TestProtectedServicesBase): + + def test_clear_dtc_protected_without_unlock(self): + pass + + def _test_clear_dtc_protected_without_unlock(self): + self.udsclient.config['protected_services'] = { + services.ClearDiagnosticInformation._sid: 0x05 + } + + with self.assertRaises(SecurityAccessDeniedException) as context: + self.udsclient.clear_dtc() + + self.assertEqual(context.exception.required_level, 0x05) + self.assertEqual(context.exception.resource_type, 'service') + self.assertEqual(context.exception.resource_id, services.ClearDiagnosticInformation._sid) + + def test_clear_dtc_access_granted_after_unlock(self): + self.conn.fromuserqueue.put(b"\x54\xFF\xFF\xFF") + + def _test_clear_dtc_access_granted_after_unlock(self): + self.udsclient.config['protected_services'] = { + services.ClearDiagnosticInformation._sid: 0x05 + } + + self.udsclient._unlocked_security_levels.add(0x05) + self.assertIn(0x05, self.udsclient.get_unlocked_security_levels()) + + response = self.udsclient.clear_dtc() + self.assertTrue(response.positive) + + def test_io_control_protected_without_unlock(self): + pass + + def _test_io_control_protected_without_unlock(self): + self.udsclient.config['protected_services'] = { + services.InputOutputControlByIdentifier._sid: 0x05 + } + self.udsclient.config['input_output'] = { + 0x1234: { + 'codec': '>H', + 'mask': { + 'test1': 0x01, + 'test2': 0x02, + } + } + } + + with self.assertRaises(SecurityAccessDeniedException) as context: + self.udsclient.io_control(did=0x1234) + + self.assertEqual(context.exception.required_level, 0x05) + self.assertEqual(context.exception.resource_type, 'service') + self.assertEqual(context.exception.resource_id, services.InputOutputControlByIdentifier._sid) + + def test_write_memory_by_address_protected_without_unlock(self): + pass + + def _test_write_memory_by_address_protected_without_unlock(self): + from udsoncan.common.MemoryLocation import MemoryLocation + + self.udsclient.config['protected_services'] = { + services.WriteMemoryByAddress._sid: 0x05 + } + + with self.assertRaises(SecurityAccessDeniedException) as context: + memloc = MemoryLocation(address=0x12345678, memorysize=4) + self.udsclient.write_memory_by_address(memloc, b'\x11\x22\x33\x44') + + self.assertEqual(context.exception.required_level, 0x05) + self.assertEqual(context.exception.resource_type, 'service') + self.assertEqual(context.exception.resource_id, services.WriteMemoryByAddress._sid) + + def test_read_memory_by_address_protected_without_unlock(self): + pass + + def _test_read_memory_by_address_protected_without_unlock(self): + from udsoncan.common.MemoryLocation import MemoryLocation + + self.udsclient.config['protected_services'] = { + services.ReadMemoryByAddress._sid: 0x05 + } + + with self.assertRaises(SecurityAccessDeniedException) as context: + memloc = MemoryLocation(address=0x12345678, memorysize=4) + self.udsclient.read_memory_by_address(memloc) + + self.assertEqual(context.exception.required_level, 0x05) + self.assertEqual(context.exception.resource_type, 'service') + self.assertEqual(context.exception.resource_id, services.ReadMemoryByAddress._sid) + + def test_communication_control_protected_without_unlock(self): + pass + + def _test_communication_control_protected_without_unlock(self): + self.udsclient.config['protected_services'] = { + services.CommunicationControl._sid: 0x05 + } + + with self.assertRaises(SecurityAccessDeniedException) as context: + self.udsclient.communication_control( + control_type=services.CommunicationControl.ControlType.enableRxAndDisableTx, + communication_type=CommunicationType(subnet=CommunicationType.Subnet.node, normal_msg=True) + ) + + self.assertEqual(context.exception.required_level, 0x05) + self.assertEqual(context.exception.resource_type, 'service') + self.assertEqual(context.exception.resource_id, services.CommunicationControl._sid) + + def test_control_dtc_setting_protected_without_unlock(self): + pass + + def _test_control_dtc_setting_protected_without_unlock(self): + self.udsclient.config['protected_services'] = { + services.ControlDTCSetting._sid: 0x05 + } + + with self.assertRaises(SecurityAccessDeniedException) as context: + self.udsclient.control_dtc_setting( + setting_type=services.ControlDTCSetting.SettingType.off + ) + + self.assertEqual(context.exception.required_level, 0x05) + self.assertEqual(context.exception.resource_type, 'service') + self.assertEqual(context.exception.resource_id, services.ControlDTCSetting._sid) + + def test_authentication_protected_without_unlock(self): + pass + + def _test_authentication_protected_without_unlock(self): + self.udsclient.config['protected_services'] = { + services.Authentication._sid: 0x05 + } + + with self.assertRaises(SecurityAccessDeniedException) as context: + self.udsclient.deauthenticate() + + self.assertEqual(context.exception.required_level, 0x05) + self.assertEqual(context.exception.resource_type, 'service') + self.assertEqual(context.exception.resource_id, services.Authentication._sid) diff --git a/test/client/test_security_access.py b/test/client/test_security_access.py index 55fb892..7460601 100755 --- a/test/client/test_security_access.py +++ b/test/client/test_security_access.py @@ -322,3 +322,125 @@ def test_no_algo_set(self): def _test_no_algo_set(self): with self.assertRaises(NotImplementedError): self.udsclient.unlock_security_access(0x07) + + +class TestSecurityAccessStateLifecycle(ClientServerTest): + def __init__(self, *args, **kwargs): + ClientServerTest.__init__(self, *args, **kwargs) + + def dummy_algo(self, level, seed, params=None): + key = bytearray(seed) + for i in range(len(key)): + key[i] = (params + level + i + key[i]) + return bytes(key) + + def test_security_level_tracked_after_unlock(self): + request = self.conn.touserqueue.get(timeout=0.2) + self.assertEqual(request, b"\x27\x07") + self.conn.fromuserqueue.put(b"\x67\x07\x11\x22\x33\x44") + request = self.conn.touserqueue.get(timeout=0.2) + key = bytearray([(0x10 + 0x07 + 0 + 0x11), (0x10 + 0x07 + 1 + 0x22), (0x10 + 0x07 + 2 + 0x33), (0x10 + 0x07 + 3 + 0x44)]) + self.assertEqual(request, b"\x27\x08" + bytes(key)) + self.conn.fromuserqueue.put(b"\x67\x08") + + def _test_security_level_tracked_after_unlock(self): + self.udsclient.config['security_algo'] = self.dummy_algo + self.udsclient.config['security_algo_params'] = 0x10 + + self.assertEqual(self.udsclient.get_unlocked_security_levels(), set()) + + response = self.udsclient.unlock_security_access(0x07) + self.assertTrue(response.positive) + self.assertIn(0x07, self.udsclient.get_unlocked_security_levels()) + + def test_security_level_tracked_after_send_key(self): + request = self.conn.touserqueue.get(timeout=0.2) + self.assertEqual(request, b"\x27\x06\x11\x22\x33\x44") + self.conn.fromuserqueue.put(b"\x67\x06") + + def _test_security_level_tracked_after_send_key(self): + self.assertEqual(self.udsclient.get_unlocked_security_levels(), set()) + + response = self.udsclient.send_key(0x06, b"\x11\x22\x33\x44") + self.assertTrue(response.positive) + self.assertIn(0x05, self.udsclient.get_unlocked_security_levels()) + + def test_security_level_tracked_after_zero_seed(self): + request = self.conn.touserqueue.get(timeout=0.2) + self.assertEqual(request, b"\x27\x05") + self.conn.fromuserqueue.put(b"\x67\x05\x00\x00\x00\x00") + + def _test_security_level_tracked_after_zero_seed(self): + self.udsclient.config['security_algo'] = self.dummy_algo + self.udsclient.config['security_algo_params'] = 0x10 + + self.assertEqual(self.udsclient.get_unlocked_security_levels(), set()) + + response = self.udsclient.unlock_security_access(0x05) + self.assertTrue(response.positive) + self.assertIn(0x05, self.udsclient.get_unlocked_security_levels()) + + def test_reset_after_session_change(self): + self.conn.fromuserqueue.put(b"\x50\x03\x00\x0A\x00\x14") + self.conn.fromuserqueue.put(b"\x27\x05") + self.conn.fromuserqueue.put(b"\x67\x05\x00\x00\x00\x00") + + def _test_reset_after_session_change(self): + self.udsclient.config['security_algo'] = self.dummy_algo + self.udsclient.config['security_algo_params'] = 0x10 + self.udsclient.config['standard_version'] = 2013 + + self.udsclient._unlocked_security_levels.add(0x05) + self.udsclient._unlocked_security_levels.add(0x01) + self.assertEqual(len(self.udsclient.get_unlocked_security_levels()), 2) + + self.udsclient.change_session(0x03) + self.assertEqual(self.udsclient.get_unlocked_security_levels(), set()) + + def test_reset_after_ecu_reset(self): + self.conn.fromuserqueue.put(b"\x51\x55") + + def _test_reset_after_ecu_reset(self): + self.udsclient._unlocked_security_levels.add(0x05) + self.udsclient._unlocked_security_levels.add(0x01) + self.assertEqual(len(self.udsclient.get_unlocked_security_levels()), 2) + + self.udsclient.ecu_reset(0x55) + self.assertEqual(self.udsclient.get_unlocked_security_levels(), set()) + + def test_reset_after_invalid_key(self): + self.wait_request_and_respond(b"\x7F\x27\x35") + + def _test_reset_after_invalid_key(self): + self.udsclient._unlocked_security_levels.add(0x05) + self.assertIn(0x05, self.udsclient.get_unlocked_security_levels()) + + with self.assertRaises(NegativeResponseException): + self.udsclient.send_key(0x06, b"\x11\x22\x33\x44") + + self.assertNotIn(0x05, self.udsclient.get_unlocked_security_levels()) + + def test_reset_after_invalid_key_no_exception(self): + self.wait_request_and_respond(b"\x7F\x27\x35") + + def _test_reset_after_invalid_key_no_exception(self): + self.udsclient._unlocked_security_levels.add(0x05) + self.assertIn(0x05, self.udsclient.get_unlocked_security_levels()) + + self.udsclient.config['exception_on_negative_response'] = False + response = self.udsclient.send_key(0x06, b"\x11\x22\x33\x44") + + self.assertFalse(response.positive) + self.assertEqual(response.code, 0x35) + + def test_reset_after_required_time_delay(self): + self.wait_request_and_respond(b"\x7F\x27\x37") + + def _test_reset_after_required_time_delay(self): + self.udsclient._unlocked_security_levels.add(0x05) + self.assertIn(0x05, self.udsclient.get_unlocked_security_levels()) + + with self.assertRaises(NegativeResponseException): + self.udsclient.send_key(0x06, b"\x11\x22\x33\x44") + + self.assertNotIn(0x05, self.udsclient.get_unlocked_security_levels()) diff --git a/udsoncan/client.py b/udsoncan/client.py index 489d11e..4e21a0a 100755 --- a/udsoncan/client.py +++ b/udsoncan/client.py @@ -21,7 +21,7 @@ import functools import time -from typing import Callable, Optional, Union, Dict, List, Any, cast, Type +from typing import Callable, Optional, Union, Dict, List, Any, cast, Type, Set class SessionTiming: @@ -108,6 +108,7 @@ def get_overrided_payload(self, original_payload: bytes) -> bytes: last_response: Optional[Response] session_timing: SessionTiming logger: logging.Logger + _unlocked_security_levels: Set[int] def __init__(self, conn: BaseConnection, config: ClientConfig = default_client_config, request_timeout: Optional[float] = None): self.conn = conn @@ -121,6 +122,7 @@ def __init__(self, conn: BaseConnection, config: ClientConfig = default_client_c self.last_response = None self.session_timing = SessionTiming(p2_server_max=None, p2_star_server_max=None) + self._unlocked_security_levels = set() self.refresh_config() @@ -134,9 +136,75 @@ def __exit__(self, type, value, traceback): def open(self) -> None: if not self.conn.is_open(): self.conn.open() + self._reset_security_access_state() def close(self) -> None: self.conn.close() + self._reset_security_access_state() + + def _reset_security_access_state(self, level: Optional[int] = None) -> None: + if level is None: + if len(self._unlocked_security_levels) > 0: + self.logger.info('Resetting all security access levels') + self._unlocked_security_levels.clear() + else: + normalized_level = services.SecurityAccess.normalize_level( + mode=services.SecurityAccess.Mode.RequestSeed, level=level + ) + if normalized_level in self._unlocked_security_levels: + self.logger.info('Resetting security access level 0x%02x' % normalized_level) + self._unlocked_security_levels.remove(normalized_level) + + def get_unlocked_security_levels(self) -> Set[int]: + return set(self._unlocked_security_levels) + + def _check_security_access_for_service(self, service_id: int) -> None: + protected_services = self.config.get('protected_services') + if protected_services is None: + return + if service_id in protected_services: + required_level = protected_services[service_id] + normalized_level = services.SecurityAccess.normalize_level( + mode=services.SecurityAccess.Mode.RequestSeed, level=required_level + ) + if normalized_level not in self._unlocked_security_levels: + raise SecurityAccessDeniedException( + required_level=normalized_level, + resource_type='service', + resource_id=service_id + ) + + def _check_security_access_for_did(self, did: int) -> None: + protected_dids = self.config.get('protected_dids') + if protected_dids is None: + return + if did in protected_dids: + required_level = protected_dids[did] + normalized_level = services.SecurityAccess.normalize_level( + mode=services.SecurityAccess.Mode.RequestSeed, level=required_level + ) + if normalized_level not in self._unlocked_security_levels: + raise SecurityAccessDeniedException( + required_level=normalized_level, + resource_type='DID', + resource_id=did + ) + + def _check_security_access_for_routine(self, routine_id: int) -> None: + protected_routines = self.config.get('protected_routines') + if protected_routines is None: + return + if routine_id in protected_routines: + required_level = protected_routines[routine_id] + normalized_level = services.SecurityAccess.normalize_level( + mode=services.SecurityAccess.Mode.RequestSeed, level=required_level + ) + if normalized_level not in self._unlocked_security_levels: + raise SecurityAccessDeniedException( + required_level=normalized_level, + resource_type='routine', + resource_id=routine_id + ) def configure_logger(self) -> None: logger_name = 'UdsClient' @@ -259,6 +327,7 @@ def change_session(self, newsession: int) -> Optional[services.DiagnosticSession self.session_timing.p2_server_max = response.service_data.p2_server_max self.session_timing.p2_star_server_max = response.service_data.p2_star_server_max + self._reset_security_access_state() return response @standard_error_management @@ -323,19 +392,30 @@ def send_key(self, level: int, key: bytes) -> Optional[services.SecurityAccess.I (self.service_log_prefix(services.SecurityAccess), req.subfunction)) self.logger.debug('\tKey to send [%s]' % (binascii.hexlify(key).decode('ascii'))) - response = self.send_request(req) - if response is None: - return None + normalized_level = services.SecurityAccess.normalize_level( + mode=services.SecurityAccess.Mode.RequestSeed, level=level + ) - response = services.SecurityAccess.interpret_response(response, mode=services.SecurityAccess.Mode.SendKey) + try: + response = self.send_request(req) + if response is None: + return None - expected_level = services.SecurityAccess.normalize_level(mode=services.SecurityAccess.Mode.SendKey, level=level) - received_level = response.service_data.security_level_echo - if expected_level != received_level: - raise UnexpectedResponseException( - response, "Response subfunction received from server (0x%02x) does not match the requested subfunction (0x%02x)" % (received_level, expected_level)) + response = services.SecurityAccess.interpret_response(response, mode=services.SecurityAccess.Mode.SendKey) - return response + expected_level = services.SecurityAccess.normalize_level(mode=services.SecurityAccess.Mode.SendKey, level=level) + received_level = response.service_data.security_level_echo + if expected_level != received_level: + raise UnexpectedResponseException( + response, "Response subfunction received from server (0x%02x) does not match the requested subfunction (0x%02x)" % (received_level, expected_level)) + + if response.positive: + self._unlocked_security_levels.add(normalized_level) + + return response + except NegativeResponseException: + self._reset_security_access_state(level) + raise @standard_error_management def unlock_security_access(self, level, seed_params=bytes()) -> Optional[services.SecurityAccess.InterpretedResponse]: @@ -360,9 +440,14 @@ def unlock_security_access(self, level, seed_params=bytes()) -> Optional[service response = self.request_seed._func_no_error_management(self, level, data=seed_params) seed = response.service_data.seed + normalized_level = services.SecurityAccess.normalize_level( + mode=services.SecurityAccess.Mode.RequestSeed, level=level + ) + if len(seed) > 0 and seed == b'\x00' * len(seed): self.logger.info('%s - Security access level 0x%02x is already unlocked, no key will be sent.' % (self.service_log_prefix(services.SecurityAccess), level)) + self._unlocked_security_levels.add(normalized_level) return response params = self.config['security_algo_params'] if 'security_algo_params' in self.config else None @@ -383,7 +468,14 @@ def unlock_security_access(self, level, seed_params=bytes()) -> Optional[service algo_params = {'seed': seed, 'params': params, 'level': level} key = self.config['security_algo'].__call__(**algo_params) # type: ignore - return self.send_key._func_no_error_management(self, level, key) + try: + result = self.send_key._func_no_error_management(self, level, key) + if result is not None and result.positive: + self._unlocked_security_levels.add(normalized_level) + return result + except NegativeResponseException: + self._reset_security_access_state(level) + raise @standard_error_management def tester_present(self) -> Optional[services.TesterPresent.InterpretedResponse]: @@ -520,7 +612,7 @@ def write_data_by_identifier(self, did: int, value: Any) -> Optional[services.Wr """ Requests to write a value associated with a data identifier (DID) through the :ref:`WriteDataByIdentifier` service. - :Effective configuration: ``exception_on__response`` ``data_identifiers`` + :Effective configuration: ``exception_on__response`` ``data_identifiers`` ``protected_services`` ``protected_dids`` :param did: The DID to write its value :type did: int @@ -531,7 +623,10 @@ def write_data_by_identifier(self, did: int, value: Any) -> Optional[services.Wr :return: The server response parsed by :meth:`WriteDataByIdentifier.interpret_response` :rtype: :ref:`Response` + :raises SecurityAccessDeniedException: If the service or DID is protected and the required security level is not unlocked """ + self._check_security_access_for_service(services.WriteDataByIdentifier._sid) + self._check_security_access_for_did(did) req = services.WriteDataByIdentifier.make_request(did, value, didconfig=self.config['data_identifiers']) self.logger.info("%s - Writing data identifier 0x%04x (%s)" % (self.service_log_prefix(services.WriteDataByIdentifier), did, DataIdentifier.name_from_id(did))) @@ -578,6 +673,7 @@ def ecu_reset(self, reset_type: int) -> Optional[services.ECUReset.InterpretedRe assert response.service_data.powerdown_time is not None self.logger.info('Server will shutdown in %d seconds.' % (response.service_data.powerdown_time)) + self._reset_security_access_state() return response @standard_error_management @@ -684,7 +780,7 @@ def routine_control(self, routine_id: int, control_type: int, data: Optional[byt """ Sends a generic request for the :ref:`RoutineControl` service with custom subfunction (control_type). - :Effective configuration: ``exception_on__response`` + :Effective configuration: ``exception_on__response`` ``protected_services`` ``protected_routines`` :param control_type: The service subfunction. See :class:`RoutineControl.ControlType` :type group: int @@ -697,7 +793,11 @@ def routine_control(self, routine_id: int, control_type: int, data: Optional[byt :return: The server response parsed by :meth:`RoutineControl.interpret_response` :rtype: :ref:`Response` + + :raises SecurityAccessDeniedException: If the service or routine is protected and the required security level is not unlocked """ + self._check_security_access_for_service(services.RoutineControl._sid) + self._check_security_access_for_routine(routine_id) request = services.RoutineControl.make_request(routine_id, control_type, data=data) payload_length = 0 if data is None else len(data) action = "ISOSAEReserved action for routine ID" @@ -928,6 +1028,14 @@ def request_upload(self, # Common code for both RequestDownload and RequestUpload services @standard_error_management def request_upload_download(self, service_cls, memory_location, dfi=None): + """ + Common code for both RequestDownload and RequestUpload services. + + :Effective configuration: ``exception_on__response`` ``server_address_format`` ``server_memorysize_format`` ``protected_services`` + + :raises SecurityAccessDeniedException: If the service is protected and the required security level is not unlocked + """ + self._check_security_access_for_service(service_cls._sid) dfi = service_cls.normalize_data_format_identifier(dfi) if service_cls not in [services.RequestDownload, services.RequestUpload]: @@ -971,7 +1079,7 @@ def transfer_data(self, """ Transfer a block of data to/from the client to/from the server by sending a :ref:`TransferData` service request and returning the server response. - :Effective configuration: ``exception_on__response`` + :Effective configuration: ``exception_on__response`` ``protected_services`` :param sequence_number: Corresponds to an 8bit counter that should increment for each new block transferred. Allowed values are from 0 to 0xFF @@ -982,7 +1090,10 @@ def transfer_data(self, :return: The server response parsed by :meth:`TransferData.interpret_response` :rtype: :ref:`Response` + + :raises SecurityAccessDeniedException: If the service is protected and the required security level is not unlocked """ + self._check_security_access_for_service(services.TransferData._sid) request = services.TransferData.make_request(sequence_number, data) data_len = 0 if data is None else len(data) @@ -1007,14 +1118,17 @@ def request_transfer_exit(self, data: Optional[bytes] = None) -> Optional[servic """ Informs the server that the client wants to stop the data transfer by sending a :ref:`RequestTransferExit` service request. - :Effective configuration: ``exception_on__response`` + :Effective configuration: ``exception_on__response`` ``protected_services`` :param data: Optional additional data to send to the server :type data: bytes :return: The server response parsed by :meth:`RequestTransferExit.interpret_response` :rtype: :ref:`Response` + + :raises SecurityAccessDeniedException: If the service is protected and the required security level is not unlocked """ + self._check_security_access_for_service(services.RequestTransferExit._sid) request = services.RequestTransferExit.make_request(data) self.logger.info('%s - Sending exit request' % (self.service_log_prefix(services.RequestTransferExit))) @@ -2217,6 +2331,8 @@ def send_request(self, request: Request, timeout: int = -1) -> Optional[Response if request.service is None: raise ValueError("Request has no service") + self._check_security_access_for_service(request.service.request_id()) + if timeout < 0: # Timeout not provided by user: defaults to Client request_timeout value overall_timeout = self.config['request_timeout'] @@ -2292,6 +2408,7 @@ def send_request(self, request: Request, timeout: int = -1) -> Optional[Response timeout_name_to_report = 'Timeout' timeout_value_to_report = timeout_value + self._reset_security_access_state() raise TimeoutException('Did not receive response in time. %s time has expired (timeout=%.3f sec)' % (timeout_name_to_report, float(timeout_value_to_report))) diff --git a/udsoncan/configs.py b/udsoncan/configs.py index 10a45b2..5f81cad 100755 --- a/udsoncan/configs.py +++ b/udsoncan/configs.py @@ -21,5 +21,8 @@ 'standard_version': latest_standard, # 2006, 2013, 2020 'use_server_timing': True, 'extended_data_size': None, - 'nrc78_callback':None + 'nrc78_callback': None, + 'protected_services': None, + 'protected_dids': None, + 'protected_routines': None }) diff --git a/udsoncan/exceptions.py b/udsoncan/exceptions.py index 239488c..c4d337f 100755 --- a/udsoncan/exceptions.py +++ b/udsoncan/exceptions.py @@ -3,7 +3,8 @@ 'NegativeResponseException', 'InvalidResponseException', 'UnexpectedResponseException', - 'ConfigError' + 'ConfigError', + 'SecurityAccessDeniedException' ] from udsoncan.Response import Response from typing import Any @@ -107,3 +108,35 @@ class ConfigError(Exception): def __init__(self, key: Any, msg="", *args, **kwargs): self.key = key super().__init__(msg, *args, **kwargs) + + +class SecurityAccessDeniedException(Exception): + """ + Raised when the client attempts to access a protected service, DID, or routine + without having unlocked the required security level. + + :param required_level: The security level that is required but not unlocked + :type required_level: int + + :param resource_type: The type of resource being accessed (service, did, routine) + :type resource_type: str + + :param resource_id: The identifier of the resource being accessed + :type resource_id: int + + """ + required_level: int + resource_type: str + resource_id: int + + def __init__(self, required_level: int, resource_type: str, resource_id: int, *args, **kwargs): + self.required_level = required_level + self.resource_type = resource_type + self.resource_id = resource_id + msg = "Security access denied: Required level 0x%02x not unlocked for %s 0x%04x" % ( + required_level, resource_type, resource_id + ) + if len(args) > 0: + msg += " " + str(args[0]) + args = tuple(list(args)[1:]) + super().__init__(msg, *args, **kwargs) diff --git a/udsoncan/typing.py b/udsoncan/typing.py index b79ce07..3f29584 100644 --- a/udsoncan/typing.py +++ b/udsoncan/typing.py @@ -26,6 +26,11 @@ class IOConfigEntry(TypedDict, total=False): IOConfig = Dict[Union[int, str], Union[IOConfigEntry, CodecDefinition]] +ProtectedServicesConfig = Dict[int, int] +ProtectedDidsConfig = Dict[int, int] +ProtectedRoutinesConfig = Dict[int, int] + + class ClientConfig(TypedDict, total=False): exception_on_negative_response: bool exception_on_invalid_response: bool @@ -47,3 +52,6 @@ class ClientConfig(TypedDict, total=False): logger_name: str extended_data_size: Optional[Union[int, Dict[int, int]]] nrc78_callback:Optional[Nrc78CallbackType] + protected_services: Optional[ProtectedServicesConfig] + protected_dids: Optional[ProtectedDidsConfig] + protected_routines: Optional[ProtectedRoutinesConfig]