From e2e34cebc3dcd72d29178c79474ee0e99cc5c6b1 Mon Sep 17 00:00:00 2001 From: gpotter2 <10530980+gpotter2@users.noreply.github.com> Date: Sun, 28 Jun 2026 21:15:26 +0200 Subject: [PATCH 1/2] Ticketer, DNS and Kerberos fixes - Ticketer: bad timezone conversions, improve handling of PAC_CLIENT_INFO when editing a ticket - kerberos: support new error type when a password change is necessary - DNS: handle case where a weird payload is returned AI-Assisted: no --- scapy/layers/dns.py | 2 +- scapy/layers/kerberos.py | 3 +- scapy/layers/ldap.py | 2 +- scapy/layers/msrpce/mspac.py | 31 ++++++----- scapy/layers/ntlm.py | 14 ++++- scapy/layers/smbclient.py | 3 +- scapy/layers/windows/erref.py | 1 + scapy/modules/ticketer.py | 94 ++++++++++++++++++++++++++++------ test/scapy/layers/kerberos.uts | 13 +++++ 9 files changed, 126 insertions(+), 37 deletions(-) diff --git a/scapy/layers/dns.py b/scapy/layers/dns.py index e222b8cfeb6..432b0a06066 100644 --- a/scapy/layers/dns.py +++ b/scapy/layers/dns.py @@ -1505,7 +1505,7 @@ def dns_resolve(qname, qtype="A", raw=False, tcp=False, verbose=1, timeout=3, ** result = [ x for x in itertools.chain(res.an, res.ns, res.ar) - if x.type == qtype + if getattr(x, "type", None) == qtype ] if result: # Cache it diff --git a/scapy/layers/kerberos.py b/scapy/layers/kerberos.py index f02eda70227..7037545db8f 100644 --- a/scapy/layers/kerberos.py +++ b/scapy/layers/kerberos.py @@ -2136,12 +2136,13 @@ def m2i(self, pkt, s): # 36: KRB_AP_ERR_BADMATCH # 80: KDC_ERR_DIGEST_IN_SIGNED_DATA_NOT_ACCEPTED return MethodData(val[0].val, _underlayer=pkt), val[1] - elif pkt.errorCode.val in [6, 7, 12, 13, 18, 29, 32, 41, 60, 62]: + elif pkt.errorCode.val in [6, 7, 12, 13, 18, 23, 29, 32, 41, 60, 62]: # 6: KDC_ERR_C_PRINCIPAL_UNKNOWN # 7: KDC_ERR_S_PRINCIPAL_UNKNOWN # 12: KDC_ERR_POLICY # 13: KDC_ERR_BADOPTION # 18: KDC_ERR_CLIENT_REVOKED + # 23: KDC_ERR_KEY_EXPIRED # 29: KDC_ERR_SVC_UNAVAILABLE # 32: KRB_AP_ERR_TKT_EXPIRED # 41: KRB_AP_ERR_MODIFIED diff --git a/scapy/layers/ldap.py b/scapy/layers/ldap.py index 74ce0244f65..b715da2d72b 100644 --- a/scapy/layers/ldap.py +++ b/scapy/layers/ldap.py @@ -1633,7 +1633,7 @@ def dclocator( response = next( NETLOGON(x.values[0].value.val) for x in pkt.protocolOp.attributes - if x.type.val == b"Netlogon" + if x.type.val.lower() == b"netlogon" ) except StopIteration: pass diff --git a/scapy/layers/msrpce/mspac.py b/scapy/layers/msrpce/mspac.py index 1a96a9afd13..be0767e5c9e 100644 --- a/scapy/layers/msrpce/mspac.py +++ b/scapy/layers/msrpce/mspac.py @@ -66,6 +66,8 @@ ) from scapy.layers.ntlm import ( _NTLMPayloadField, + _NTLM_ENUM, + _NTLM_post_build, _NTLMPayloadPacket, ) from scapy.layers.windows.security import WINNT_SID @@ -332,21 +334,6 @@ class S4U_DELEGATION_INFO(NDRPacket): # sect 2.10 -def _pac_post_build(self, p, pay_offset, fields): - """Util function to build the offset and populate the lengths""" - for field_name, value in self.fields["Payload"]: - length = self.get_field("Payload").fields_map[field_name].i2len(self, value) - offset = fields[field_name] - # Length - if self.getfieldval(field_name + "Len") is None: - p = p[:offset] + struct.pack(" bytes + # The offsets are different depending on whether we're in S mode or not offset = 12 fields = { "Upn": 0, @@ -422,12 +411,22 @@ def post_build(self, pkt, pay): offset = 20 fields["SamName"] = 12 fields["Sid"] = 16 + + # Apply padding + if len(pkt + pay) % 8 != 0: + pkt += b"\x00" * (-len(pkt) % 8) + + # Post-build to update the sizes return ( - _pac_post_build( + _NTLM_post_build( self, pkt, offset, fields, + config=[ + ("Len", _NTLM_ENUM.LEN), + ("BufferOffset", _NTLM_ENUM.OFFSET | _NTLM_ENUM.PAD8), + ], ) + pay ) diff --git a/scapy/layers/ntlm.py b/scapy/layers/ntlm.py index fd94cc6f2ec..1af1139b9b4 100644 --- a/scapy/layers/ntlm.py +++ b/scapy/layers/ntlm.py @@ -114,6 +114,7 @@ class _NTLMPayloadField(_StrField[List[Tuple[str, Any]]]): __slots__ = [ "fields", "fields_map", + "fields_pad", "offset", "length_from", "force_order", @@ -126,6 +127,7 @@ def __init__( name, # type: str offset, # type: Union[int, Callable[[Packet], int]] fields, # type: List[Field[Any, Any]] + fields_pad=0, # type: int length_from=None, # type: Optional[Callable[[Packet], int]] force_order=None, # type: Optional[List[str]] offset_name="BufferOffset", # type: str @@ -137,6 +139,7 @@ def __init__( self.length_from = length_from self.force_order = force_order # whether the order of fields is fixed self.offset_name = offset_name + self.fields_pad = fields_pad super(_NTLMPayloadField, self).__init__( name, [ @@ -196,6 +199,10 @@ def addfield(self, pkt, s, val): if offset is None: # No offset specified: calc offset = len(buf) + if self.fields_pad: + pad = (-offset) % self.fields_pad + offset += pad + buf.append(pad * b"\x00", len(buf)) else: # Calc relative offset offset -= r_off @@ -370,8 +377,9 @@ class _NTLM_ENUM(IntEnum): def _NTLM_post_build(self, p, pay_offset, fields, config=_NTLM_CONFIG): """Util function to build the offset and populate the lengths""" + gl_fld = self.get_field(self._NTLM_PAYLOAD_FIELD_NAME) for field_name, value in self.fields[self._NTLM_PAYLOAD_FIELD_NAME]: - fld = self.get_field(self._NTLM_PAYLOAD_FIELD_NAME).fields_map[field_name] + fld = gl_fld.fields_map[field_name] length = fld.i2len(self, value) count = fld.i2count(self, value) offset = fields[field_name] @@ -391,7 +399,9 @@ def _NTLM_post_build(self, p, pay_offset, fields, config=_NTLM_CONFIG): else: raise ValueError if ftype & _NTLM_ENUM.PAD8: - fval += (-fval) % 8 + pad = (-fval) % 8 + fval += pad + pay_offset += pad sz = self.get_field(field_name + fname).sz if self.getfieldval(field_name + fname) is None: p = ( diff --git a/scapy/layers/smbclient.py b/scapy/layers/smbclient.py index 975b670a6eb..137350b409c 100644 --- a/scapy/layers/smbclient.py +++ b/scapy/layers/smbclient.py @@ -443,7 +443,8 @@ def receive_negotiate_response(self, pkt): if self.REQUIRE_ENCRYPTION and not self.session.SupportsEncryption: self.ErrorStatus = "NEGOTIATE FAILURE: encryption." raise self.NEGO_FAILED() - self.update_smbheader(pkt) + if self.SMB2: + self.update_smbheader(pkt) raise self.NEGOTIATED(ssp_blob) elif SMBNegotiate_Response_Security in pkt: # Non-extended SMB1 diff --git a/scapy/layers/windows/erref.py b/scapy/layers/windows/erref.py index f3a2440198f..9adbd46fc93 100644 --- a/scapy/layers/windows/erref.py +++ b/scapy/layers/windows/erref.py @@ -49,6 +49,7 @@ 0xC0000022: "STATUS_ACCESS_DENIED", 0xC0000033: "STATUS_OBJECT_NAME_INVALID", 0xC0000034: "STATUS_OBJECT_NAME_NOT_FOUND", + 0xC0000224: "STATUS_PASSWORD_MUST_CHANGE", 0xC0000043: "STATUS_SHARING_VIOLATION", 0xC0000061: "STATUS_PRIVILEGE_NOT_HELD", 0xC0000064: "STATUS_NO_SUCH_USER", diff --git a/scapy/modules/ticketer.py b/scapy/modules/ticketer.py index 4bc469a78f9..27cb98a642f 100644 --- a/scapy/modules/ticketer.py +++ b/scapy/modules/ticketer.py @@ -48,7 +48,11 @@ from scapy.packet import Packet from scapy.utils import pretty_list -from scapy.layers.dcerpc import NDRUnion +from scapy.layers.dcerpc import ( + NDRUnion, + NDRConformantArray, + NDRVaryingArray, +) from scapy.layers.kerberos import ( AuthorizationData, AuthorizationDataItem, @@ -1442,10 +1446,39 @@ def _build_ticket(self, store): ] ), LogonServer=RPC_UNICODE_STRING( - Buffer=store["VI.LogonServer"], + MaximumLength=( + len(store["VI.LogonServer"]) + 1 + ) + * 2, + Buffer=NDRConformantArray( + max_count=len( + store["VI.LogonServer"] + ) + + 1, + value=NDRVaryingArray( + value=store[ + "VI.LogonServer" + ], + ), + ), ), LogonDomainName=RPC_UNICODE_STRING( - Buffer=store["VI.LogonDomainName"], + MaximumLength=( + len(store["VI.LogonDomainName"]) + + 1 + ) + * 2, + Buffer=NDRConformantArray( + max_count=len( + store["VI.LogonDomainName"] + ) + + 1, + value=NDRVaryingArray( + value=store[ + "VI.LogonDomainName" + ], + ), + ), ), LogonCount=store["VI.LogonCount"], BadPasswordCount=store[ @@ -1773,21 +1806,40 @@ def delete(): strf="%Y-%m-%d %H:%M:%S", ) - def _pretty_time(self, x): - return self._TIME_FIELD.i2repr(None, x).rsplit(" ", 1)[0] - def _utc_to_mstime(self, x): + """ + Convert a linux epoch into MS time (from 1601) + """ return int((x - self._TIME_FIELD.delta) * 1e7) def _time_to_int(self, x): + """ + Non ASN.1 strptime => MS time + """ return self._utc_to_mstime( - datetime.strptime(x, self._TIME_FIELD.strf).timestamp() + datetime.strptime(x, self._TIME_FIELD.strf) + .replace(tzinfo=timezone.utc) + .timestamp() ) + def _pretty_time(self, x): + """ + Non ASN.1 strftime + """ + return self._TIME_FIELD.i2repr(None, x).rsplit(" ", 1)[0] + def _time_to_asn1(self, x): - return ASN1_GENERALIZED_TIME(datetime.strptime(x, self._TIME_FIELD.strf)) + """ + Epoch to ASN.1 time + """ + return ASN1_GENERALIZED_TIME( + datetime.strptime(x, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc) + ) def _time_to_filetime(self, x): + """ + Non ASN.1 strptime => FILETIME + """ if isinstance(x, str) and x.strip() == "NEVER": return FILETIME(dwHighDateTime=0x7FFFFFFF, dwLowDateTime=0xFFFFFFFF) if isinstance(x, str): @@ -1799,7 +1851,10 @@ def _time_to_filetime(self, x): dwLowDateTime=x & 0xFFFFFFFF, ) - def _filetime_totime(self, x): + def _filetime_pretty_time(self, x): + """ + Non ASN.1 strftime + """ if x.dwHighDateTime == 0x7FFFFFFF and x.dwLowDateTime == 0xFFFFFFFF: return "NEVER" return self._pretty_time((x.dwHighDateTime << 32) + x.dwLowDateTime) @@ -1819,20 +1874,20 @@ def _getLogonInformation(self, pac, element): self._make_fields( element, [ - ("LogonTime", self._filetime_totime(logonInfo.LogonTime)), - ("LogoffTime", self._filetime_totime(logonInfo.LogoffTime)), - ("KickOffTime", self._filetime_totime(logonInfo.KickOffTime)), + ("LogonTime", self._filetime_pretty_time(logonInfo.LogonTime)), + ("LogoffTime", self._filetime_pretty_time(logonInfo.LogoffTime)), + ("KickOffTime", self._filetime_pretty_time(logonInfo.KickOffTime)), ( "PasswordLastSet", - self._filetime_totime(logonInfo.PasswordLastSet), + self._filetime_pretty_time(logonInfo.PasswordLastSet), ), ( "PasswordCanChange", - self._filetime_totime(logonInfo.PasswordCanChange), + self._filetime_pretty_time(logonInfo.PasswordCanChange), ), ( "PasswordMustChange", - self._filetime_totime(logonInfo.PasswordMustChange), + self._filetime_pretty_time(logonInfo.PasswordMustChange), ), ( "EffectiveName", @@ -2503,12 +2558,14 @@ def _resign_ticket(self, tkt, spn, hash=None, kdc_hash=None): rpac = tkt.authorizationData.seq[0].adData.seq[0].adData # real pac tmp_tkt = tkt.copy() # fake ticket and pac used for computation pac = tmp_tkt.authorizationData.seq[0].adData.seq[0].adData + # Variables for Signatures, indexed by ulType sig_i = {} sig_type = {} # Read PAC buffers to find all signatures, and set them to 0 for k, buf in enumerate(pac.Buffers): if buf.ulType in [0x00000006, 0x00000007, 0x00000010, 0x00000013]: + # Signatures sig_i[buf.ulType] = k sig_type[buf.ulType] = pac.Payloads[k].SignatureType try: @@ -2519,6 +2576,13 @@ def _resign_ticket(self, tkt, spn, hash=None, kdc_hash=None): raise ValueError("Unknown/Unsupported signatureType") rpac.Buffers[k].cbBufferSize = None rpac.Buffers[k].Offset = None + elif buf.ulType == 0x0000000A: + # CLIENT_INFO + # The timestamp in the PAC for CLIENT_INFO must match the one from + # the outer Ticket, else it will count as an invalid signature. + rpac.Payloads[k].ClientId = self._utc_to_mstime( + tkt.authtime.datetime.timestamp() + ) # There must at least be Server Signature and KDC Signature if any(x not in sig_i for x in [0x00000006, 0x00000007]): diff --git a/test/scapy/layers/kerberos.uts b/test/scapy/layers/kerberos.uts index e7805b35c99..b388f3989c1 100644 --- a/test/scapy/layers/kerberos.uts +++ b/test/scapy/layers/kerberos.uts @@ -924,6 +924,19 @@ assert isinstance(pkt.payload, Raw) and pkt.load == b"\x00\x00\x00\x00" pkt.clear_cache() assert bytes(pkt) == data += MSPAC - Build UPN_DNS_INFO from scratch + +from scapy.layers.msrpce.mspac import UPN_DNS_INFO + +pkt = UPN_DNS_INFO( + Upn="Administrator@DOMAIN.LOCAL", + DnsDomainName="DOMAIN.LOCAL", + Sid=WINNT_SID(), + SamName="Administrator", + Flags="U+S", +) + +assert bytes(pkt) == b'4\x00\x18\x00\x18\x00P\x00\x03\x00\x00\x00\x1a\x00x\x00\x0c\x00h\x00\x00\x00\x00\x00A\x00d\x00m\x00i\x00n\x00i\x00s\x00t\x00r\x00a\x00t\x00o\x00r\x00@\x00D\x00O\x00M\x00A\x00I\x00N\x00.\x00L\x00O\x00C\x00A\x00L\x00\x00\x00\x00\x00D\x00O\x00M\x00A\x00I\x00N\x00.\x00L\x00O\x00C\x00A\x00L\x00\x01\x01\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00A\x00d\x00m\x00i\x00n\x00i\x00s\x00t\x00r\x00a\x00t\x00o\x00r\x00\x00\x00\x00\x00\x00\x00' + Build a CLAIMS_SET to test size_of From 563dcb4ac55994cbce6a36ce18236ff30e51e5b7 Mon Sep 17 00:00:00 2001 From: gpotter2 <10530980+gpotter2@users.noreply.github.com> Date: Mon, 29 Jun 2026 21:27:22 +0200 Subject: [PATCH 2/2] Add S4U2Self+U2U example AI-Assisted: no --- doc/scapy/layers/kerberos.rst | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/doc/scapy/layers/kerberos.rst b/doc/scapy/layers/kerberos.rst index f27577da5fd..7c0b8bfd85e 100644 --- a/doc/scapy/layers/kerberos.rst +++ b/doc/scapy/layers/kerberos.rst @@ -159,6 +159,20 @@ The ``armor_with`` keyword allows to select a ticket to armor the request with. Start time End time Renew until Auth time 15/04/25 20:15:20 16/04/25 06:10:22 16/04/25 06:10:22 15/04/25 20:15:17 +- **Perform S4U2Self+U2U to get the PAC** + +.. code:: pycon + + >>> load_module("ticketer") + >>> t = Ticketer() + >>> t.request_tgt("MyUser@domain.local", password="password") + >>> tgt, key, _ = t.export_krb(0) + >>> # Get PAC of Administrator@domain.local + >>> t.request_st(0, "MyUser@domain.local", + ... for_user="Administrator@domain.local", + ... u2u=True, additional_tickets=[tgt]) + >>> t.export_krb(1)[0].encPart.decrypt(key).show() + - **Request a ticket for a DMSA** For more information about DMSAs and how to create them, consult the `relevant Microsoft documentation `_. In this example we allowed ``SERVER1$`` to retrieve the managed password of ``dmsa_user$``.