From 20cf635b8fb017fdd4da316b32e74db5add2c3ba Mon Sep 17 00:00:00 2001 From: ozgen Date: Thu, 11 Jun 2026 13:34:42 +0200 Subject: [PATCH 1/3] add: add Host Discovery IPv6 alive test support for targets --- gvm/protocols/gmp/_gmpnext.py | 186 +++++++++ gvm/protocols/gmp/requests/next/__init__.py | 3 +- gvm/protocols/gmp/requests/next/_targets.py | 372 ++++++++++++++++++ .../gmpnext/entities/targets/__init__.py | 20 + .../entities/targets/test_clone_target.py | 26 ++ .../entities/targets/test_create_target.py | 255 ++++++++++++ .../entities/targets/test_delete_target.py | 29 ++ .../entities/targets/test_get_target.py | 41 ++ .../entities/targets/test_get_targets.py | 43 ++ .../entities/targets/test_modify_target.py | 233 +++++++++++ .../gmpnext/entities/test_targets.py | 4 +- .../gmpnext/enums/test_alive_test.py | 9 +- 12 files changed, 1215 insertions(+), 6 deletions(-) create mode 100644 gvm/protocols/gmp/requests/next/_targets.py create mode 100644 tests/protocols/gmpnext/entities/targets/__init__.py create mode 100644 tests/protocols/gmpnext/entities/targets/test_clone_target.py create mode 100644 tests/protocols/gmpnext/entities/targets/test_create_target.py create mode 100644 tests/protocols/gmpnext/entities/targets/test_delete_target.py create mode 100644 tests/protocols/gmpnext/entities/targets/test_get_target.py create mode 100644 tests/protocols/gmpnext/entities/targets/test_get_targets.py create mode 100644 tests/protocols/gmpnext/entities/targets/test_modify_target.py diff --git a/gvm/protocols/gmp/_gmpnext.py b/gvm/protocols/gmp/_gmpnext.py index 9a9a87f70..27edaf498 100644 --- a/gvm/protocols/gmp/_gmpnext.py +++ b/gvm/protocols/gmp/_gmpnext.py @@ -15,6 +15,7 @@ AgentInstallerInstructionLanguageType, AgentInstallerInstructions, Agents, + AliveTest, Credentials, CredentialStoreCredentialType, CredentialStores, @@ -29,6 +30,7 @@ ReportPorts, ReportTlsCertificates, ReportVulnerabilities, + Targets, Tasks, WebApplicationTargets, ) @@ -1382,3 +1384,187 @@ def get_web_application_targets( tasks=tasks, ) ) + + def create_target( + self, + name: str, + *, + asset_hosts_filter: str | None = None, + hosts: list[str] | None = None, + comment: str | None = None, + exclude_hosts: list[str] | None = None, + ssh_credential_id: EntityID | None = None, + ssh_credential_port: int | str | None = None, + smb_credential_id: EntityID | None = None, + esxi_credential_id: EntityID | None = None, + snmp_credential_id: EntityID | None = None, + alive_test: str | AliveTest | None = None, + allow_simultaneous_ips: bool | None = None, + reverse_lookup_only: bool | None = None, + reverse_lookup_unify: bool | None = None, + port_range: str | None = None, + port_list_id: EntityID | None = None, + ) -> T: + """Create a new target + + Args: + name: Name of the target + asset_hosts_filter: Filter to select target host from assets hosts + hosts: List of hosts addresses to scan + exclude_hosts: List of hosts addresses to exclude from scan + comment: Comment for the target + ssh_credential_id: UUID of a ssh credential to use on target + ssh_credential_port: The port to use for ssh credential + smb_credential_id: UUID of a smb credential to use on target + snmp_credential_id: UUID of a snmp credential to use on target + esxi_credential_id: UUID of a esxi credential to use on target + alive_test: Which alive test to use + allow_simultaneous_ips: Whether to scan multiple IPs of the + same host simultaneously + reverse_lookup_only: Whether to scan only hosts that have names + reverse_lookup_unify: Whether to scan only one IP when multiple IPs + have the same name. + port_range: Port range for the target + port_list_id: UUID of the port list to use on target + """ + return self._send_request_and_transform_response( + Targets.create_target( + name, + asset_hosts_filter=asset_hosts_filter, + hosts=hosts, + comment=comment, + exclude_hosts=exclude_hosts, + ssh_credential_id=ssh_credential_id, + ssh_credential_port=ssh_credential_port, + smb_credential_id=smb_credential_id, + esxi_credential_id=esxi_credential_id, + snmp_credential_id=snmp_credential_id, + alive_test=alive_test, + allow_simultaneous_ips=allow_simultaneous_ips, + reverse_lookup_only=reverse_lookup_only, + reverse_lookup_unify=reverse_lookup_unify, + port_range=port_range, + port_list_id=port_list_id, + ) + ) + + def modify_target( + self, + target_id: EntityID, + *, + name: str | None = None, + comment: str | None = None, + hosts: list[str] | None = None, + exclude_hosts: list[str] | None = None, + ssh_credential_id: EntityID | None = None, + ssh_credential_port: str | int | None = None, + smb_credential_id: EntityID | None = None, + esxi_credential_id: EntityID | None = None, + snmp_credential_id: EntityID | None = None, + alive_test: AliveTest | str | None = None, + allow_simultaneous_ips: bool | None = None, + reverse_lookup_only: bool | None = None, + reverse_lookup_unify: bool | None = None, + port_list_id: EntityID | None = None, + ) -> T: + """Modify an existing target. + + Args: + target_id: UUID of target to modify. + comment: Comment on target. + name: Name of target. + hosts: List of target hosts. + exclude_hosts: A list of hosts to exclude. + ssh_credential_id: UUID of SSH credential to use on target. + ssh_credential_port: The port to use for ssh credential + smb_credential_id: UUID of SMB credential to use on target. + esxi_credential_id: UUID of ESXi credential to use on target. + snmp_credential_id: UUID of SNMP credential to use on target. + port_list_id: UUID of port list describing ports to scan. + alive_test: Which alive tests to use. + allow_simultaneous_ips: Whether to scan multiple IPs of the + same host simultaneously + reverse_lookup_only: Whether to scan only hosts that have names. + reverse_lookup_unify: Whether to scan only one IP when multiple IPs + have the same name. + """ + return self._send_request_and_transform_response( + Targets.modify_target( + target_id, + name=name, + comment=comment, + hosts=hosts, + exclude_hosts=exclude_hosts, + ssh_credential_id=ssh_credential_id, + ssh_credential_port=ssh_credential_port, + smb_credential_id=smb_credential_id, + esxi_credential_id=esxi_credential_id, + snmp_credential_id=snmp_credential_id, + alive_test=alive_test, + allow_simultaneous_ips=allow_simultaneous_ips, + reverse_lookup_only=reverse_lookup_only, + reverse_lookup_unify=reverse_lookup_unify, + port_list_id=port_list_id, + ) + ) + + def clone_target(self, target_id: EntityID) -> T: + """Clone an existing target. + + Args: + target_id: UUID of an existing target to clone. + """ + return self._send_request_and_transform_response( + Targets.clone_target(target_id) + ) + + def delete_target( + self, target_id: EntityID, *, ultimate: bool | None = False + ) -> T: + """Delete an existing target. + + Args: + target_id: UUID of an existing target to delete. + ultimate: Whether to remove entirely or to the trashcan. + """ + return self._send_request_and_transform_response( + Targets.delete_target(target_id, ultimate=ultimate) + ) + + def get_target( + self, target_id: EntityID, *, tasks: bool | None = None + ) -> T: + """Request a single target. + + Args: + target_id: UUID of the target to request. + tasks: Whether to include list of tasks that use the target + """ + return self._send_request_and_transform_response( + Targets.get_target(target_id, tasks=tasks) + ) + + def get_targets( + self, + *, + filter_string: str | None = None, + filter_id: EntityID | None = None, + trash: bool | None = None, + tasks: bool | None = None, + ) -> T: + """Request a list of targets. + + Args: + filter_string: Filter term to use for the query. + filter_id: UUID of an existing filter to use for the query. + trash: Whether to include targets in the trashcan. + tasks: Whether to include list of tasks that use the target. + """ + return self._send_request_and_transform_response( + Targets.get_targets( + filter_string=filter_string, + filter_id=filter_id, + trash=trash, + tasks=tasks, + ) + ) diff --git a/gvm/protocols/gmp/requests/next/__init__.py b/gvm/protocols/gmp/requests/next/__init__.py index 664536938..570f20f6b 100644 --- a/gvm/protocols/gmp/requests/next/__init__.py +++ b/gvm/protocols/gmp/requests/next/__init__.py @@ -44,6 +44,7 @@ from gvm.protocols.gmp.requests.next._report_vulnerabilities import ( ReportVulnerabilities, ) +from gvm.protocols.gmp.requests.next._targets import AliveTest, Targets from gvm.protocols.gmp.requests.next._tasks import Tasks from gvm.protocols.gmp.requests.next._web_application_targets import ( WebApplicationTargets, @@ -58,7 +59,6 @@ AlertEvent, AlertMethod, Alerts, - AliveTest, AuditReports, Audits, Authentication, @@ -108,7 +108,6 @@ SortOrder, SystemReports, Tags, - Targets, Tickets, TicketStatus, TLSCertificates, diff --git a/gvm/protocols/gmp/requests/next/_targets.py b/gvm/protocols/gmp/requests/next/_targets.py new file mode 100644 index 000000000..c598e3c36 --- /dev/null +++ b/gvm/protocols/gmp/requests/next/_targets.py @@ -0,0 +1,372 @@ +# SPDX-FileCopyrightText: 2025 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +from typing import Optional + +from gvm._enum import Enum +from gvm.errors import InvalidArgument, RequiredArgument +from gvm.protocols.core import Request +from gvm.utils import to_bool, to_comma_list +from gvm.xml import XmlCommand + +from .._entity_id import EntityID + + +class AliveTest(Enum): + """Enum for choosing an alive test""" + + SCAN_CONFIG_DEFAULT = "Scan Config Default" + ICMP_PING = "ICMP Ping" + TCP_ACK_SERVICE_PING = "TCP-ACK Service Ping" + TCP_SYN_SERVICE_PING = "TCP-SYN Service Ping" + ARP_PING = "ARP Ping" + APR_PING = "ARP Ping" # Alias for ARP_PING + ICMP_AND_TCP_ACK_SERVICE_PING = "ICMP & TCP-ACK Service Ping" + ICMP_AND_ARP_PING = "ICMP & ARP Ping" + TCP_ACK_SERVICE_AND_ARP_PING = "TCP-ACK Service & ARP Ping" + ICMP_TCP_ACK_SERVICE_AND_ARP_PING = ( # pylint: disable=invalid-name + "ICMP, TCP-ACK Service & ARP Ping" + ) + CONSIDER_ALIVE = "Consider Alive" + HOST_DISCOVERY_IPV6 = "Host Discovery IPv6" + + @classmethod + def from_string( + cls, + alive_test: str | None, + ) -> Optional["AliveTest"]: + """Convert an alive test string into an AliveTest instance""" + if not alive_test: + return None + + alive_test = alive_test.lower() + + try: + return cls[ + alive_test.replace(",", "") + .replace(" ", "_") + .replace("-", "_") + .replace("&", "and") + .upper() + ] + except KeyError: + raise InvalidArgument( + argument="alive_test", + function=cls.from_string.__name__, + ) from None + + +class Targets: + @classmethod + def create_target( + cls, + name: str, + *, + asset_hosts_filter: str | None = None, + hosts: list[str] | None = None, + comment: str | None = None, + exclude_hosts: list[str] | None = None, + ssh_credential_id: EntityID | None = None, + ssh_credential_port: int | str | None = None, + smb_credential_id: EntityID | None = None, + esxi_credential_id: EntityID | None = None, + snmp_credential_id: EntityID | None = None, + alive_test: str | AliveTest | None = None, + allow_simultaneous_ips: bool | None = None, + reverse_lookup_only: bool | None = None, + reverse_lookup_unify: bool | None = None, + port_range: str | None = None, + port_list_id: EntityID | None = None, + ) -> Request: + """Create a new target + + Args: + name: Name of the target + asset_hosts_filter: Filter to select target host from assets hosts + hosts: List of hosts addresses to scan + exclude_hosts: List of hosts addresses to exclude from scan + comment: Comment for the target + ssh_credential_id: UUID of a ssh credential to use on target + ssh_credential_port: The port to use for ssh credential + smb_credential_id: UUID of a smb credential to use on target + snmp_credential_id: UUID of a snmp credential to use on target + esxi_credential_id: UUID of a esxi credential to use on target + alive_test: Which alive test to use + allow_simultaneous_ips: Whether to scan multiple IPs of the + same host simultaneously + reverse_lookup_only: Whether to scan only hosts that have names + reverse_lookup_unify: Whether to scan only one IP when multiple IPs + have the same name. + port_range: Port range for the target + port_list_id: UUID of the port list to use on target + """ + if not name: + raise RequiredArgument( + function=cls.create_target.__name__, argument="name" + ) + + cmd = XmlCommand("create_target") + cmd.add_element("name", name) + + if asset_hosts_filter: + cmd.add_element( + "asset_hosts", attrs={"filter": str(asset_hosts_filter)} + ) + elif hosts: + cmd.add_element("hosts", to_comma_list(hosts)) + else: + raise RequiredArgument( + function=cls.create_target.__name__, + argument="hosts or asset_hosts_filter", + ) + + if comment: + cmd.add_element("comment", comment) + + if exclude_hosts: + cmd.add_element("exclude_hosts", to_comma_list(exclude_hosts)) + + if ssh_credential_id: + xml_ssh = cmd.add_element( + "ssh_credential", attrs={"id": str(ssh_credential_id)} + ) + if ssh_credential_port: + xml_ssh.add_element("port", str(ssh_credential_port)) + + if smb_credential_id: + cmd.add_element( + "smb_credential", attrs={"id": str(smb_credential_id)} + ) + + if esxi_credential_id: + cmd.add_element( + "esxi_credential", attrs={"id": str(esxi_credential_id)} + ) + + if snmp_credential_id: + cmd.add_element( + "snmp_credential", attrs={"id": str(snmp_credential_id)} + ) + + if alive_test: + if not isinstance(alive_test, AliveTest): + alive_test = AliveTest(alive_test) + cmd.add_element("alive_tests", alive_test.value) + + if allow_simultaneous_ips is not None: + cmd.add_element( + "allow_simultaneous_ips", to_bool(allow_simultaneous_ips) + ) + + if reverse_lookup_only is not None: + cmd.add_element("reverse_lookup_only", to_bool(reverse_lookup_only)) + + if reverse_lookup_unify is not None: + cmd.add_element( + "reverse_lookup_unify", to_bool(reverse_lookup_unify) + ) + + if port_range: + cmd.add_element("port_range", port_range) + + if port_list_id: + cmd.add_element("port_list", attrs={"id": str(port_list_id)}) + + return cmd + + @classmethod + def modify_target( + cls, + target_id: EntityID, + *, + name: str | None = None, + comment: str | None = None, + hosts: list[str] | None = None, + exclude_hosts: list[str] | None = None, + ssh_credential_id: EntityID | None = None, + ssh_credential_port: str | int | None = None, + smb_credential_id: EntityID | None = None, + esxi_credential_id: EntityID | None = None, + snmp_credential_id: EntityID | None = None, + alive_test: AliveTest | str | None = None, + allow_simultaneous_ips: bool | None = None, + reverse_lookup_only: bool | None = None, + reverse_lookup_unify: bool | None = None, + port_list_id: EntityID | None = None, + ) -> Request: + """Modify an existing target. + + Args: + target_id: UUID of target to modify. + comment: Comment on target. + name: Name of target. + hosts: List of target hosts. + exclude_hosts: A list of hosts to exclude. + ssh_credential_id: UUID of SSH credential to use on target. + ssh_credential_port: The port to use for ssh credential + smb_credential_id: UUID of SMB credential to use on target. + esxi_credential_id: UUID of ESXi credential to use on target. + snmp_credential_id: UUID of SNMP credential to use on target. + port_list_id: UUID of port list describing ports to scan. + alive_test: Which alive tests to use. + allow_simultaneous_ips: Whether to scan multiple IPs of the + same host simultaneously + reverse_lookup_only: Whether to scan only hosts that have names. + reverse_lookup_unify: Whether to scan only one IP when multiple IPs + have the same name. + """ + if not target_id: + raise RequiredArgument( + function=cls.modify_target.__name__, argument="target_id" + ) + + cmd = XmlCommand("modify_target") + cmd.set_attribute("target_id", str(target_id)) + + if comment: + cmd.add_element("comment", comment) + + if name: + cmd.add_element("name", name) + + if hosts: + cmd.add_element("hosts", to_comma_list(hosts)) + if exclude_hosts is None: + exclude_hosts = [""] + + if exclude_hosts: + cmd.add_element("exclude_hosts", to_comma_list(exclude_hosts)) + + if alive_test: + if not isinstance(alive_test, AliveTest): + alive_test = AliveTest(alive_test) + cmd.add_element("alive_tests", alive_test.value) + + if ssh_credential_id: + xml_ssh = cmd.add_element( + "ssh_credential", attrs={"id": str(ssh_credential_id)} + ) + + if ssh_credential_port: + xml_ssh.add_element("port", str(ssh_credential_port)) + + if smb_credential_id: + cmd.add_element( + "smb_credential", attrs={"id": str(smb_credential_id)} + ) + + if esxi_credential_id: + cmd.add_element( + "esxi_credential", attrs={"id": str(esxi_credential_id)} + ) + + if snmp_credential_id: + cmd.add_element( + "snmp_credential", attrs={"id": str(snmp_credential_id)} + ) + + if allow_simultaneous_ips is not None: + cmd.add_element( + "allow_simultaneous_ips", to_bool(allow_simultaneous_ips) + ) + + if reverse_lookup_only is not None: + cmd.add_element("reverse_lookup_only", to_bool(reverse_lookup_only)) + + if reverse_lookup_unify is not None: + cmd.add_element( + "reverse_lookup_unify", to_bool(reverse_lookup_unify) + ) + + if port_list_id: + cmd.add_element("port_list", attrs={"id": str(port_list_id)}) + + return cmd + + @classmethod + def clone_target(cls, target_id: EntityID) -> Request: + """Clone an existing target. + + Args: + target_id: UUID of an existing target to clone. + """ + if not target_id: + raise RequiredArgument( + function=cls.clone_target.__name__, argument="target_id" + ) + + cmd = XmlCommand("create_target") + cmd.add_element("copy", str(target_id)) + return cmd + + @classmethod + def delete_target( + cls, target_id: EntityID, *, ultimate: bool | None = False + ) -> Request: + """Delete an existing target. + + Args: + target_id: UUID of an existing target to delete. + ultimate: Whether to remove entirely or to the trashcan. + """ + if not target_id: + raise RequiredArgument( + function=cls.delete_target.__name__, argument="target_id" + ) + + cmd = XmlCommand("delete_target") + cmd.set_attribute("target_id", str(target_id)) + cmd.set_attribute("ultimate", to_bool(ultimate)) + return cmd + + @classmethod + def get_target( + cls, target_id: EntityID, *, tasks: bool | None = None + ) -> Request: + """Request a single target. + + Args: + target_id: UUID of the target to request. + tasks: Whether to include list of tasks that use the target + """ + if not target_id: + raise RequiredArgument( + function=cls.get_target.__name__, argument="target_id" + ) + + cmd = XmlCommand("get_targets") + cmd.set_attribute("target_id", str(target_id)) + + if tasks is not None: + cmd.set_attribute("tasks", to_bool(tasks)) + + return cmd + + @classmethod + def get_targets( + cls, + *, + filter_string: str | None = None, + filter_id: EntityID | None = None, + trash: bool | None = None, + tasks: bool | None = None, + ) -> Request: + """Request a list of targets. + + Args: + filter_string: Filter term to use for the query. + filter_id: UUID of an existing filter to use for the query. + trash: Whether to include targets in the trashcan. + tasks: Whether to include list of tasks that use the target. + """ + cmd = XmlCommand("get_targets") + cmd.add_filter(filter_string, filter_id) + + if trash is not None: + cmd.set_attribute("trash", to_bool(trash)) + + if tasks is not None: + cmd.set_attribute("tasks", to_bool(tasks)) + + return cmd diff --git a/tests/protocols/gmpnext/entities/targets/__init__.py b/tests/protocols/gmpnext/entities/targets/__init__.py new file mode 100644 index 000000000..9e7a4c15e --- /dev/null +++ b/tests/protocols/gmpnext/entities/targets/__init__.py @@ -0,0 +1,20 @@ +# SPDX-FileCopyrightText: 2026 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later +# + +from .test_clone_target import GmpCloneTargetTestMixin +from .test_create_target import GmpCreateTargetTestMixin +from .test_delete_target import GmpDeleteTargetTestMixin +from .test_get_target import GmpGetTargetTestMixin +from .test_get_targets import GmpGetTargetsTestMixin +from .test_modify_target import GmpModifyTargetTestMixin + +__all__ = ( + "GmpCloneTargetTestMixin", + "GmpCreateTargetTestMixin", + "GmpDeleteTargetTestMixin", + "GmpGetTargetTestMixin", + "GmpGetTargetsTestMixin", + "GmpModifyTargetTestMixin", +) diff --git a/tests/protocols/gmpnext/entities/targets/test_clone_target.py b/tests/protocols/gmpnext/entities/targets/test_clone_target.py new file mode 100644 index 000000000..787a81ca7 --- /dev/null +++ b/tests/protocols/gmpnext/entities/targets/test_clone_target.py @@ -0,0 +1,26 @@ +# SPDX-FileCopyrightText: 2026 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later +# + +from gvm.errors import RequiredArgument + + +class GmpCloneTargetTestMixin: + TARGET_ID = "00000000-0000-0000-0000-000000000000" + + def test_clone(self): + self.gmp.clone_target(self.TARGET_ID) + + self.connection.send.has_been_called_with( + "" + f"{self.TARGET_ID}" + "".encode() + ) + + def test_missing_id(self): + with self.assertRaises(RequiredArgument): + self.gmp.clone_target("") + + with self.assertRaises(RequiredArgument): + self.gmp.clone_target(None) diff --git a/tests/protocols/gmpnext/entities/targets/test_create_target.py b/tests/protocols/gmpnext/entities/targets/test_create_target.py new file mode 100644 index 000000000..3988a0be8 --- /dev/null +++ b/tests/protocols/gmpnext/entities/targets/test_create_target.py @@ -0,0 +1,255 @@ +# SPDX-FileCopyrightText: 2026 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later +# + +from gvm.errors import InvalidArgument, RequiredArgument +from gvm.protocols.gmp.requests.next import AliveTest + + +class GmpCreateTargetTestMixin: + def test_create_target_missing_name(self): + with self.assertRaises(RequiredArgument): + self.gmp.create_target(None, hosts=["foo"]) + + with self.assertRaises(RequiredArgument): + self.gmp.create_target(name=None, hosts=["foo"]) + + with self.assertRaises(RequiredArgument): + self.gmp.create_target("", hosts=["foo"]) + + def test_create_target_with_asset_hosts_filter(self): + self.gmp.create_target("foo", asset_hosts_filter="name=foo") + + self.connection.send.has_been_called_with( + b"" + b"foo" + b'' + b"" + ) + + def test_create_target_missing_hosts(self): + with self.assertRaises(RequiredArgument): + self.gmp.create_target(name="foo") + + def test_create_target_with_comment(self): + self.gmp.create_target("foo", hosts=["foo"], comment="bar") + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"foo" + b"bar" + b"" + ) + + def test_create_target_with_exclude_hosts(self): + self.gmp.create_target( + "foo", hosts=["foo", "bar"], exclude_hosts=["bar", "ipsum"] + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"foo,bar" + b"bar,ipsum" + b"" + ) + + def test_create_target_with_ssh_credential(self): + self.gmp.create_target("foo", hosts=["foo"], ssh_credential_id="c1") + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"foo" + b'' + b"" + ) + + def test_create_target_with_ssh_credential_port(self): + self.gmp.create_target( + "foo", + hosts=["foo"], + ssh_credential_id="c1", + ssh_credential_port=123, + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"foo" + b'' + b"123" + b"" + b"" + ) + + def test_create_target_with_smb_credential_id(self): + self.gmp.create_target("foo", hosts=["foo"], smb_credential_id="c1") + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"foo" + b'' + b"" + ) + + def test_create_target_with_esxi_credential_id(self): + self.gmp.create_target("foo", hosts=["foo"], esxi_credential_id="c1") + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"foo" + b'' + b"" + ) + + def test_create_target_with_snmp_credential_id(self): + self.gmp.create_target("foo", hosts=["foo"], snmp_credential_id="c1") + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"foo" + b'' + b"" + ) + + def test_create_target_with_alive_tests(self): + self.gmp.create_target( + "foo", hosts=["foo"], alive_test=AliveTest.ICMP_PING + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"foo" + b"ICMP Ping" + b"" + ) + + def test_create_target_invalid_alive_tests(self): + with self.assertRaises(InvalidArgument): + self.gmp.create_target("foo", hosts=["foo"], alive_test="foo") + + def test_create_target_with_allow_simultaneous_ips(self): + self.gmp.create_target( + "foo", hosts=["foo"], allow_simultaneous_ips=True + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"foo" + b"1" + b"" + ) + + self.gmp.create_target( + "foo", hosts=["foo"], allow_simultaneous_ips=False + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"foo" + b"0" + b"" + ) + + def test_create_target_with_reverse_lookup_only(self): + self.gmp.create_target("foo", hosts=["foo"], reverse_lookup_only=True) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"foo" + b"1" + b"" + ) + + self.gmp.create_target("foo", hosts=["foo"], reverse_lookup_only=False) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"foo" + b"0" + b"" + ) + + def test_create_target_with_reverse_lookup_unify(self): + self.gmp.create_target("foo", hosts=["foo"], reverse_lookup_unify=True) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"foo" + b"1" + b"" + ) + + self.gmp.create_target("foo", hosts=["foo"], reverse_lookup_unify=False) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"foo" + b"0" + b"" + ) + + def test_create_target_with_port_range(self): + self.gmp.create_target("foo", hosts=["foo"], port_range="bar") + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"foo" + b"bar" + b"" + ) + + def test_create_target_with_port_list_id(self): + self.gmp.create_target("foo", hosts=["foo"], port_list_id="pl1") + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"foo" + b'' + b"" + ) + + def test_create_target_with_host_discovery_ipv6_alive_test_as_string(self): + self.gmp.create_target( + "foo", + hosts=["foo"], + alive_test="Host Discovery IPv6", + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"foo" + b"Host Discovery IPv6" + b"" + ) + + def test_create_target_with_host_discovery_ipv6_alive_test(self): + self.gmp.create_target( + "foo", + hosts=["foo"], + alive_test=AliveTest.HOST_DISCOVERY_IPV6, + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"foo" + b"Host Discovery IPv6" + b"" + ) diff --git a/tests/protocols/gmpnext/entities/targets/test_delete_target.py b/tests/protocols/gmpnext/entities/targets/test_delete_target.py new file mode 100644 index 000000000..3f5fa35b5 --- /dev/null +++ b/tests/protocols/gmpnext/entities/targets/test_delete_target.py @@ -0,0 +1,29 @@ +# SPDX-FileCopyrightText: 2026 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later +# + +from gvm.errors import GvmError + + +class GmpDeleteTargetTestMixin: + def test_delete(self): + self.gmp.delete_target("a1") + + self.connection.send.has_been_called_with( + b'' + ) + + def test_delete_ultimate(self): + self.gmp.delete_target("a1", ultimate=True) + + self.connection.send.has_been_called_with( + b'' + ) + + def test_missing_id(self): + with self.assertRaises(GvmError): + self.gmp.delete_target(None) + + with self.assertRaises(GvmError): + self.gmp.delete_target("") diff --git a/tests/protocols/gmpnext/entities/targets/test_get_target.py b/tests/protocols/gmpnext/entities/targets/test_get_target.py new file mode 100644 index 000000000..83c5d77d0 --- /dev/null +++ b/tests/protocols/gmpnext/entities/targets/test_get_target.py @@ -0,0 +1,41 @@ +# SPDX-FileCopyrightText: 2026 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later +# + +from gvm.errors import RequiredArgument + + +class GmpGetTargetTestMixin: + def test_get_target(self): + self.gmp.get_target("t1") + + self.connection.send.has_been_called_with( + b'' + ) + + self.gmp.get_target(target_id="t1") + + self.connection.send.has_been_called_with( + b'' + ) + + def test_get_target_missing_target_id(self): + with self.assertRaises(RequiredArgument): + self.gmp.get_target(target_id=None) + + with self.assertRaises(RequiredArgument): + self.gmp.get_target("") + + def test_get_target_with_tasks(self): + self.gmp.get_target(target_id="t1", tasks=True) + + self.connection.send.has_been_called_with( + b'' + ) + + self.gmp.get_target(target_id="t1", tasks=False) + + self.connection.send.has_been_called_with( + b'' + ) diff --git a/tests/protocols/gmpnext/entities/targets/test_get_targets.py b/tests/protocols/gmpnext/entities/targets/test_get_targets.py new file mode 100644 index 000000000..f00c6d912 --- /dev/null +++ b/tests/protocols/gmpnext/entities/targets/test_get_targets.py @@ -0,0 +1,43 @@ +# SPDX-FileCopyrightText: 2026 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later +# + + +class GmpGetTargetsTestMixin: + def test_get_targets(self): + self.gmp.get_targets() + + self.connection.send.has_been_called_with(b"") + + def test_get_targets_with_filter_string(self): + self.gmp.get_targets(filter_string="foo=bar") + + self.connection.send.has_been_called_with( + b'' + ) + + def test_get_targets_with_filter_id(self): + self.gmp.get_targets(filter_id="f1") + + self.connection.send.has_been_called_with( + b'' + ) + + def test_get_targets_with_trash(self): + self.gmp.get_targets(trash=True) + + self.connection.send.has_been_called_with(b'') + + self.gmp.get_targets(trash=False) + + self.connection.send.has_been_called_with(b'') + + def test_get_targets_with_tasks(self): + self.gmp.get_targets(tasks=True) + + self.connection.send.has_been_called_with(b'') + + self.gmp.get_targets(tasks=False) + + self.connection.send.has_been_called_with(b'') diff --git a/tests/protocols/gmpnext/entities/targets/test_modify_target.py b/tests/protocols/gmpnext/entities/targets/test_modify_target.py new file mode 100644 index 000000000..aae550fe3 --- /dev/null +++ b/tests/protocols/gmpnext/entities/targets/test_modify_target.py @@ -0,0 +1,233 @@ +# SPDX-FileCopyrightText: 2026 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later +# + +from gvm.errors import InvalidArgument, RequiredArgument +from gvm.protocols.gmp.requests.next import AliveTest + + +class GmpModifyTargetTestMixin: + def test_modify_target(self): + self.gmp.modify_target(target_id="t1") + + self.connection.send.has_been_called_with( + b'' + ) + + def test_modify_target_missing_target_id(self): + with self.assertRaises(RequiredArgument): + self.gmp.modify_target(target_id=None) + + with self.assertRaises(RequiredArgument): + self.gmp.modify_target(target_id="") + + def test_modify_target_with_comment(self): + self.gmp.modify_target(target_id="t1", comment="foo") + + self.connection.send.has_been_called_with( + b'' + b"foo" + b"" + ) + + def test_modify_target_with_hosts(self): + self.gmp.modify_target(target_id="t1", hosts=["foo"]) + + self.connection.send.has_been_called_with( + b'' + b"foo" + b"" + b"" + ) + + self.gmp.modify_target(target_id="t1", hosts=["foo", "bar"]) + + self.connection.send.has_been_called_with( + b'' + b"foo,bar" + b"" + b"" + ) + + def test_modify_target_with_hosts_and_exclude_hosts(self): + self.gmp.modify_target( + target_id="t1", hosts=["foo", "bar"], exclude_hosts=["foo"] + ) + + self.connection.send.has_been_called_with( + b'' + b"foo,bar" + b"foo" + b"" + ) + + def test_modify_target_with_name(self): + self.gmp.modify_target(target_id="t1", name="foo") + + self.connection.send.has_been_called_with( + b'foo' + ) + + def test_modify_target_with_exclude_hosts(self): + self.gmp.modify_target(target_id="t1", exclude_hosts=["foo"]) + + self.connection.send.has_been_called_with( + b'' + b"foo" + b"" + ) + + self.gmp.modify_target(target_id="t1", exclude_hosts=["foo", "bar"]) + + self.connection.send.has_been_called_with( + b'' + b"foo,bar" + b"" + ) + + def test_modify_target_with_ssh_credential(self): + self.gmp.modify_target(target_id="t1", ssh_credential_id="c1") + + self.connection.send.has_been_called_with( + b'' + b'' + b"" + ) + + def test_modify_target_with_ssh_credential_port(self): + self.gmp.modify_target( + target_id="t1", ssh_credential_id="c1", ssh_credential_port=123 + ) + + self.connection.send.has_been_called_with( + b'' + b'' + b"123" + b"" + b"" + ) + + def test_modify_target_with_smb_credential_id(self): + self.gmp.modify_target(target_id="t1", smb_credential_id="c1") + + self.connection.send.has_been_called_with( + b'' + b'' + b"" + ) + + def test_modify_target_with_esxi_credential_id(self): + self.gmp.modify_target(target_id="t1", esxi_credential_id="c1") + + self.connection.send.has_been_called_with( + b'' + b'' + b"" + ) + + def test_modify_target_with_snmp_credential_id(self): + self.gmp.modify_target(target_id="t1", snmp_credential_id="c1") + + self.connection.send.has_been_called_with( + b'' + b'' + b"" + ) + + def test_modify_target_with_alive_tests(self): + self.gmp.modify_target(target_id="t1", alive_test=AliveTest.ICMP_PING) + + self.connection.send.has_been_called_with( + b'' + b"ICMP Ping" + b"" + ) + + def test_modify_target_invalid_alive_tests(self): + with self.assertRaises(InvalidArgument): + self.gmp.modify_target(target_id="t1", alive_test="foo") + + def test_modify_target_with_allow_simultaneous_ips(self): + self.gmp.modify_target(target_id="t1", allow_simultaneous_ips=True) + + self.connection.send.has_been_called_with( + b'' + b"1" + b"" + ) + + self.gmp.modify_target(target_id="t1", allow_simultaneous_ips=False) + + self.connection.send.has_been_called_with( + b'' + b"0" + b"" + ) + + def test_modify_target_with_reverse_lookup_only(self): + self.gmp.modify_target(target_id="t1", reverse_lookup_only=True) + + self.connection.send.has_been_called_with( + b'' + b"1" + b"" + ) + + self.gmp.modify_target(target_id="t1", reverse_lookup_only=False) + + self.connection.send.has_been_called_with( + b'' + b"0" + b"" + ) + + def test_modify_target_with_reverse_lookup_unify(self): + self.gmp.modify_target(target_id="t1", reverse_lookup_unify=True) + + self.connection.send.has_been_called_with( + b'' + b"1" + b"" + ) + + self.gmp.modify_target(target_id="t1", reverse_lookup_unify=False) + + self.connection.send.has_been_called_with( + b'' + b"0" + b"" + ) + + def test_modify_target_with_port_list_id(self): + self.gmp.modify_target(target_id="t1", port_list_id="pl1") + + self.connection.send.has_been_called_with( + b'' + b'' + b"" + ) + + def test_modify_target_with_host_discovery_ipv6_alive_test(self): + self.gmp.modify_target( + target_id="t1", + alive_test=AliveTest.HOST_DISCOVERY_IPV6, + ) + + self.connection.send.has_been_called_with( + b'' + b"Host Discovery IPv6" + b"" + ) + + def test_modify_target_with_host_discovery_ipv6_alive_test_as_string(self): + self.gmp.modify_target( + target_id="t1", + alive_test="Host Discovery IPv6", + ) + + self.connection.send.has_been_called_with( + b'' + b"Host Discovery IPv6" + b"" + ) diff --git a/tests/protocols/gmpnext/entities/test_targets.py b/tests/protocols/gmpnext/entities/test_targets.py index f8c3408fb..4a975af04 100644 --- a/tests/protocols/gmpnext/entities/test_targets.py +++ b/tests/protocols/gmpnext/entities/test_targets.py @@ -3,7 +3,8 @@ # SPDX-License-Identifier: GPL-3.0-or-later # -from ...gmpv224.entities.targets import ( +from ...gmpnext import GMPTestCase +from ...gmpnext.entities.targets import ( GmpCloneTargetTestMixin, GmpCreateTargetTestMixin, GmpDeleteTargetTestMixin, @@ -11,7 +12,6 @@ GmpGetTargetTestMixin, GmpModifyTargetTestMixin, ) -from ...gmpv227 import GMPTestCase class GMPCloneTargetTestCase(GmpCloneTargetTestMixin, GMPTestCase): diff --git a/tests/protocols/gmpnext/enums/test_alive_test.py b/tests/protocols/gmpnext/enums/test_alive_test.py index ad15c7e1c..04c74d06b 100644 --- a/tests/protocols/gmpnext/enums/test_alive_test.py +++ b/tests/protocols/gmpnext/enums/test_alive_test.py @@ -6,7 +6,7 @@ import unittest from gvm.errors import InvalidArgument -from gvm.protocols.gmp.requests.v227 import AliveTest +from gvm.protocols.gmp.requests.next import AliveTest class GetAliveTestFromStringTestCase(unittest.TestCase): @@ -17,6 +17,7 @@ def test_invalid(self): def test_none_or_empty(self): ct = AliveTest.from_string(None) self.assertIsNone(ct) + ct = AliveTest.from_string("") self.assertIsNone(ct) @@ -32,7 +33,7 @@ def test_tcp_ack_service_ping(self): ct = AliveTest.from_string("TCP-ACK Service Ping") self.assertEqual(ct, AliveTest.TCP_ACK_SERVICE_PING) - def test_tcp_sync_service_ping(self): + def test_tcp_syn_service_ping(self): ct = AliveTest.from_string("TCP-SYN Service Ping") self.assertEqual(ct, AliveTest.TCP_SYN_SERVICE_PING) @@ -59,3 +60,7 @@ def test_icmp_tcp_ack_service_and_arp_ping(self): def test_consider_alive(self): ct = AliveTest.from_string("Consider Alive") self.assertEqual(ct, AliveTest.CONSIDER_ALIVE) + + def test_host_discovery_ipv6(self): + ct = AliveTest.from_string("Host Discovery IPv6") + self.assertEqual(ct, AliveTest.HOST_DISCOVERY_IPV6) From e97a061152d467c1ba7a815c4730d1bc984839d1 Mon Sep 17 00:00:00 2001 From: ozgen Date: Thu, 11 Jun 2026 13:44:32 +0200 Subject: [PATCH 2/3] fix: allow v224 alive tests in next target requests --- gvm/protocols/gmp/_gmpnext.py | 5 +++-- gvm/protocols/gmp/requests/next/_targets.py | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/gvm/protocols/gmp/_gmpnext.py b/gvm/protocols/gmp/_gmpnext.py index 27edaf498..4b560de7b 100644 --- a/gvm/protocols/gmp/_gmpnext.py +++ b/gvm/protocols/gmp/_gmpnext.py @@ -34,6 +34,7 @@ Tasks, WebApplicationTargets, ) +from .requests.v224 import AliveTest as AliveTestV224 from .requests.v224 import HostsOrdering @@ -1398,7 +1399,7 @@ def create_target( smb_credential_id: EntityID | None = None, esxi_credential_id: EntityID | None = None, snmp_credential_id: EntityID | None = None, - alive_test: str | AliveTest | None = None, + alive_test: str | AliveTest | AliveTestV224 | None = None, allow_simultaneous_ips: bool | None = None, reverse_lookup_only: bool | None = None, reverse_lookup_unify: bool | None = None, @@ -1461,7 +1462,7 @@ def modify_target( smb_credential_id: EntityID | None = None, esxi_credential_id: EntityID | None = None, snmp_credential_id: EntityID | None = None, - alive_test: AliveTest | str | None = None, + alive_test: str | AliveTest | AliveTestV224 | None = None, allow_simultaneous_ips: bool | None = None, reverse_lookup_only: bool | None = None, reverse_lookup_unify: bool | None = None, diff --git a/gvm/protocols/gmp/requests/next/_targets.py b/gvm/protocols/gmp/requests/next/_targets.py index c598e3c36..d3b5a006a 100644 --- a/gvm/protocols/gmp/requests/next/_targets.py +++ b/gvm/protocols/gmp/requests/next/_targets.py @@ -11,6 +11,7 @@ from gvm.xml import XmlCommand from .._entity_id import EntityID +from ..v224 import AliveTest as AliveTestV224 class AliveTest(Enum): @@ -72,7 +73,7 @@ def create_target( smb_credential_id: EntityID | None = None, esxi_credential_id: EntityID | None = None, snmp_credential_id: EntityID | None = None, - alive_test: str | AliveTest | None = None, + alive_test: str | AliveTest | AliveTestV224 | None = None, allow_simultaneous_ips: bool | None = None, reverse_lookup_only: bool | None = None, reverse_lookup_unify: bool | None = None, @@ -189,7 +190,7 @@ def modify_target( smb_credential_id: EntityID | None = None, esxi_credential_id: EntityID | None = None, snmp_credential_id: EntityID | None = None, - alive_test: AliveTest | str | None = None, + alive_test: str | AliveTest | AliveTestV224 | None = None, allow_simultaneous_ips: bool | None = None, reverse_lookup_only: bool | None = None, reverse_lookup_unify: bool | None = None, From 10546773b4d2a3456fe1a98745335c0ca98c49cc Mon Sep 17 00:00:00 2001 From: ozgen mehmet Date: Thu, 11 Jun 2026 14:25:41 +0200 Subject: [PATCH 3/3] Update gvm/protocols/gmp/requests/next/_targets.py Co-authored-by: Ahmed <144101267+a-h-abdelsalam@users.noreply.github.com> --- gvm/protocols/gmp/requests/next/_targets.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gvm/protocols/gmp/requests/next/_targets.py b/gvm/protocols/gmp/requests/next/_targets.py index d3b5a006a..16f843f21 100644 --- a/gvm/protocols/gmp/requests/next/_targets.py +++ b/gvm/protocols/gmp/requests/next/_targets.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: 2025 Greenbone AG +# SPDX-FileCopyrightText: 2026 Greenbone AG # # SPDX-License-Identifier: GPL-3.0-or-later