Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions PyMemoryEditor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@
# Package-wide logger. Silent by default (NullHandler) — embedding apps opt in
# with `logging.basicConfig(level=logging.DEBUG)` or by attaching a handler to
# the "PyMemoryEditor" logger. Backends emit DEBUG for transient skips (pages
# vanished mid-scan) and WARNING for surprising-but-recovered conditions
# (partial reads, mach_vm_protect restore failure).
# vanished mid-scan, unreadable chunks) and WARNING for surprising-but-recovered
# conditions (the macOS mach_vm_protect restore failure). A partial read raises
# OSError rather than logging.
logger = logging.getLogger("PyMemoryEditor")
logger.addHandler(logging.NullHandler())

Expand Down
38 changes: 38 additions & 0 deletions PyMemoryEditor/linux/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,3 +541,41 @@ def get_threads(pid: int) -> Generator[ThreadInfo, None, None]:
priority=priority,
raw=entry,
)


def get_processes() -> Generator[Tuple[int, str], None, None]:
"""
Yield ``(pid, name)`` for every process by listing ``/proc`` — each numeric
subdirectory is a live pid.

``name`` comes from ``/proc/<pid>/comm`` (the kernel truncates it to 15
characters via ``TASK_COMM_LEN``). Processes that vanish mid-scan are
skipped silently (logged at DEBUG).
"""
try:
entries = os.listdir("/proc")
except OSError as exc:
_logger.debug("get_processes: could not list /proc: %s", exc)
return

for entry in entries:
if not entry.isdigit():
continue
pid = int(entry)

try:
with open("/proc/{}/comm".format(entry), "r") as fh:
name = fh.readline().rstrip("\n")
except OSError as exc:
# Race (process exited) or permission issue — skip it.
_logger.debug("get_processes: could not read comm for pid=%s: %s", entry, exc)
continue

yield pid, name


def process_exists(pid: int) -> bool:
"""Return whether a process with ``pid`` currently exists (``/proc/<pid>``)."""
if pid < 0:
return False
return os.path.isdir("/proc/{}".format(pid))
74 changes: 73 additions & 1 deletion PyMemoryEditor/macos/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
)
from ..util.pattern import PatternLike, compile_pattern

from .libsystem import libsystem, mach_error_message, mach_task_self_
from .libsystem import PROC_ALL_PIDS, libsystem, mach_error_message, mach_task_self_
from .types import (
KERN_INVALID_ADDRESS,
KERN_INVALID_ARGUMENT,
Expand Down Expand Up @@ -905,3 +905,75 @@ def search_values_by_addresses(
raise_error=raise_error,
transient_error_check=_is_transient,
)


def get_processes() -> Generator[Tuple[int, str], None, None]:
"""
Yield ``(pid, name)`` for every process via libproc's ``proc_listpids`` +
``proc_name`` — no ``task_for_pid`` (and thus no debugger entitlement)
required, since both operate on the BSD process table.

``name`` is the executable name ``proc_name`` reports, falling back to the
basename of ``proc_pidpath``; for the rare process that denies both it is an
empty string. Needs no special privileges.
"""
# First call sizes the pid array: proc_listpids(.., NULL, 0) returns the
# number of bytes that would be written.
nbytes = libsystem.proc_listpids(PROC_ALL_PIDS, 0, None, 0)
if nbytes <= 0:
_logger.debug("get_processes: proc_listpids sizing returned %d", nbytes)
return

count = nbytes // ctypes.sizeof(ctypes.c_int)
# Over-allocate slightly: the table can grow between the two calls.
pids = (ctypes.c_int * (count + 16))()
written = libsystem.proc_listpids(
PROC_ALL_PIDS, 0, pids, ctypes.sizeof(pids)
)
if written <= 0:
_logger.debug("get_processes: proc_listpids returned %d", written)
return

name_buffer = ctypes.create_string_buffer(256)
path_buffer = ctypes.create_string_buffer(4096)
for pid in pids[: written // ctypes.sizeof(ctypes.c_int)]:
# pid 0 is the kernel and shows up as a zero-filled slot — skip it.
if pid <= 0:
continue

length = libsystem.proc_name(pid, name_buffer, ctypes.sizeof(name_buffer))
if length > 0:
name = name_buffer.raw[:length].decode("utf-8", errors="replace")
else:
# proc_name is denied for many root-owned processes; recover the
# name from the executable path's basename instead.
plen = libsystem.proc_pidpath(pid, path_buffer, ctypes.sizeof(path_buffer))
if plen > 0:
path = path_buffer.raw[:plen].decode("utf-8", errors="replace")
name = os.path.basename(path)
else:
name = ""

yield int(pid), name


def process_exists(pid: int) -> bool:
"""
Return whether a process with ``pid`` currently exists.

Uses ``os.kill(pid, 0)``: it sends no signal but performs the existence /
permission check — ``ESRCH`` means no such process, ``EPERM`` means it
exists but is owned by another user (still True).
"""
# ``pid <= 0`` is rejected outright: os.kill(0, 0) targets the *caller's
# process group* (not "pid 0"), which would spuriously report True and
# diverge from the Linux/Windows backends.
if pid <= 0:
return False
try:
os.kill(pid, 0)
except ProcessLookupError:
return False
except PermissionError:
return True
return True
36 changes: 36 additions & 0 deletions PyMemoryEditor/macos/libsystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,42 @@ class rusage_info_v0(ctypes.Structure):
)
libsystem.proc_pid_rusage.restype = ctypes.c_int

# libproc process enumeration (in <libproc.h>, exported by libSystem).
# PROC_ALL_PIDS lists every pid; the buffer is an array of pid_t (c_int).
PROC_ALL_PIDS = 1

# int proc_listpids(uint32_t type, uint32_t typeinfo, void *buffer, int buffersize);
# With buffer=NULL/buffersize=0 it returns the number of bytes that would be
# written, letting the caller size the pid array. Returns bytes written (or -1).
libsystem.proc_listpids.argtypes = (
ctypes.c_uint32,
ctypes.c_uint32,
ctypes.c_void_p,
ctypes.c_int,
)
libsystem.proc_listpids.restype = ctypes.c_int

# int proc_name(int pid, void *buffer, uint32_t buffersize);
# Fills buffer with the executable name and returns its length. Returns 0 for
# processes the caller can't query (e.g. root-owned daemons as a normal user).
libsystem.proc_name.argtypes = (
ctypes.c_int,
ctypes.c_void_p,
ctypes.c_uint32,
)
libsystem.proc_name.restype = ctypes.c_int

# int proc_pidpath(int pid, void *buffer, uint32_t buffersize);
# Full executable path; its basename is the fallback name source when proc_name
# is denied — proc_pidpath succeeds for many processes proc_name refuses,
# including root-owned ones.
libsystem.proc_pidpath.argtypes = (
ctypes.c_int,
ctypes.c_void_p,
ctypes.c_uint32,
)
libsystem.proc_pidpath.restype = ctypes.c_int

# char *mach_error_string(mach_error_t error_value);
libsystem.mach_error_string.argtypes = (ctypes.c_int,)
libsystem.mach_error_string.restype = ctypes.c_char_p
Expand Down
12 changes: 7 additions & 5 deletions PyMemoryEditor/process/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -554,14 +554,16 @@ def write_bool(self, address: int, value: bool) -> bool:

def read_string(self, address: int, byte_count: int) -> str:
"""
Read up to ``byte_count`` bytes, decode them as UTF-8 and return the
Read exactly ``byte_count`` bytes, decode them as UTF-8 and return the
text up to the first NUL terminator (C-string semantics).

Goes through the ``str`` read path, so invalid UTF-8 becomes ``U+FFFD``
(``errors="replace"``). ``byte_count`` is the maximum field width to
read; the NUL terminator and everything after it are dropped. To make a
shorter :meth:`write_string` read back cleanly here, write it with
``null_terminator=True`` (or into an already-zeroed field).
(``errors="replace"``). ``byte_count`` is the field width to read, not an
upper bound — those bytes must all be readable or an ``OSError`` is
raised; the NUL terminator and everything after it are then dropped from
the returned text. To make a shorter :meth:`write_string` read back
cleanly here, write it with ``null_terminator=True`` (or into an
already-zeroed field).
"""
return self.read_process_memory(address, str, byte_count).split("\x00", 1)[0]

Expand Down
6 changes: 4 additions & 2 deletions PyMemoryEditor/process/module_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ class ModuleInfo:
"""A single module (executable or shared library) loaded in a process.

:param name: file name of the module (e.g. ``"game.exe"``, ``"libc.so.6"``).
:param path: full path of the backing file on disk when the OS exposes it;
falls back to ``name`` when only the name is available.
May be empty on macOS when the image path can't be resolved.
:param path: full path of the backing file on disk. On Windows it falls back
to ``name`` when only the name is available; on macOS ``name`` derives
from ``path``, so an unresolvable image yields both as empty strings.
:param base_address: address where the module is loaded for this run — the
value to add static offsets to. Defeats ASLR for ``base + offset``
addressing.
Expand Down
6 changes: 4 additions & 2 deletions PyMemoryEditor/process/remote_pointer.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ class RemotePointer:
:param pytype: how to interpret the bytes at the resolved address (bool,
int, float, str or bytes). Defaults to ``int``.
:param bufflength: value size in bytes. May be ``None`` for numeric types
(defaults: int→4, float→8, bool→1); ``str`` and ``bytes`` require an
explicit size.
(defaults: int→4, float→8, bool→1). For ``str`` / ``bytes`` it is
required only to **read** (``.value`` / :meth:`read`) — there is no
value to infer the width from; **writing** accepts ``None`` and stores
the whole value (a set ``bufflength`` then caps the width, truncating).
:param ptr_size: pointer width used when walking ``offsets`` — 8 for 64-bit
targets, 4 for 32-bit. Leave ``None`` (the default) to use the target
process's :attr:`~PyMemoryEditor.process.abstract.AbstractProcess.pointer_size`,
Expand Down
5 changes: 3 additions & 2 deletions PyMemoryEditor/process/thread_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ class ThreadInfo:
"""A single thread inside a target process.

:param tid: thread identifier (see module docstring — meaning is platform-dependent).
:param start_address: entry point of the thread, when the OS exposes it
cheaply. ``None`` when not available.
:param start_address: reserved for the thread's entry point. Currently
always ``None`` on every platform (no backend fetches it); kept as a
stable field for forward compatibility.
:param state: short human-readable state — e.g. ``"R"`` / ``"S"`` on Linux.
``None`` when not available.
:param priority: scheduling priority value as reported by the OS. The scale
Expand Down
43 changes: 32 additions & 11 deletions PyMemoryEditor/process/util.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,37 @@
# -*- coding: utf-8 -*-

from typing import List, Optional

import psutil
import sys
from typing import Iterator, List, Optional, Tuple

from .errors import AmbiguousProcessNameError


# Native, dependency-free process enumeration. Each backend exposes:
# iter_processes() -> Iterator[(pid, name)] (name = executable name only)
# backend_process_exists(pid) -> bool
# Windows uses CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS); Linux walks /proc;
# macOS uses libproc's proc_listpids/proc_name. This is the same per-platform
# dispatch PyMemoryEditor already does for OpenProcess.
if sys.platform == "win32":
from ..win32.functions import GetProcesses as _iter_processes
from ..win32.functions import ProcessExists as _backend_process_exists

elif sys.platform.startswith("linux"):
from ..linux.functions import get_processes as _iter_processes
from ..linux.functions import process_exists as _backend_process_exists

elif sys.platform == "darwin":
from ..macos.functions import get_processes as _iter_processes
from ..macos.functions import process_exists as _backend_process_exists

else: # pragma: no cover - importing the package already raises on these.
def _iter_processes() -> Iterator[Tuple[int, str]]:
return iter(())

def _backend_process_exists(pid: int) -> bool:
return False


def get_process_ids_by_process_name(
process_name: str,
*,
Expand All @@ -30,12 +55,8 @@ def get_process_ids_by_process_name(

matches: List[int] = []

for process in psutil.process_iter(["name", "pid"]):
try:
name = process.info["name"] or ""
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue

for pid, name in _iter_processes():
name = name or ""
name_cmp = name if case_sensitive else name.casefold()

if exact_match:
Expand All @@ -44,7 +65,7 @@ def get_process_ids_by_process_name(
hit = process_name_cmp in name_cmp

if hit:
matches.append(process.info["pid"])
matches.append(pid)

return matches

Expand Down Expand Up @@ -77,4 +98,4 @@ def pid_exists(pid: int) -> bool:
"""
Check if the process ID exists.
"""
return psutil.pid_exists(pid)
return _backend_process_exists(pid)
Loading
Loading