import os
import re
import json
import hashlib
import subprocess
import time
from pathlib import Path
from datetime import datetime, timedelta
import requests
import base64
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives import serialization, hashes

# yara-python is optional; when it is missing the built-in keyword fallback is used.
try:
    import yara
except Exception:
    yara = None

SCANNER_VERSION = "0.1.7"
RULE_VERSION = "2026.05.22"
SERVER_URL = os.getenv("SERVER_URL", "http://127.0.0.1:8080/api/scan").strip()
SCAN_API_KEY = os.getenv("SCAN_API_KEY", "").strip()
UPLOAD_PUBLIC_KEY_B64 = os.getenv("UPLOAD_PUBLIC_KEY_B64", "").strip()
CLIENT_ID_PATH = Path("scanner_client_id.txt")
THREAT_NAMES_PATH = Path("threat_names.json")
HARDWARE_BASELINE_PATH = Path("hardware_baseline.json")
HWID_BASELINE_PATH = Path("hwid_baseline.json")

# Keyword tables. All entries are lowercase because callers lowercase the
# haystack before matching.
HIGH_KEYWORDS = [
    "kdmapper", "pcileech", "screamer", "captaindma", "leechcore", "memprocfs",
    "usb3380", "ft601", "ft600", "xilinx", "altera", "lattice",
    "kmbox", "km box", "kmbox net", "kmbox nb", "kmbox nvideo",
    "cheatengine", "cheat engine", "aimbot", "wallhack", "esp", "spoofer",
    "hwidspoofer", "bypass", "injector", "speedhack", "speed hack",
    "fakevgc", "fake vgc", "vgc bypass", "ace bypass", "anti-cheat bypass",
    "antiaim", "triggerbot", "silentaim", "radarhack",
    "柿子external", "柿子云external", "afg-cheat-sjz", "unicorn-cheat-sjz",
    "外挂", "过ace", "过检测", "过vanguard", "过eac", "过be", "绕过",
    "自瞄", "透视", "方框", "无后座", "锁头"
]

MEDIUM_KEYWORDS = [
    "loader", "mapper", "inject", "dma", "radar", "unlocker", "trainer", "fling",
    "hook", "overlay", "cheat", "hack", "memory", "driver", "modmenu",
    "external", "internal",
    "ch340", "ch341", "cp210", "cp210x", "ftdi", "usb serial", "serial converter",
    "hid-compliant vendor-defined", "usb composite device",
    "上号器", "辅助", "修改器", "驱动", "注入", "内存", "读写", "劫持"
]

SCAN_EXTENSIONS = {
    ".exe", ".dll", ".sys", ".bat", ".cmd", ".ps1", ".vbs", ".js", ".zip", ".rar", ".7z"
}

KNOWN_GOOD_PATH_PARTS = [
    r"\appdata\local\microsoft\onedrive",
    r"\appdata\roaming\zoom",
    r"\appdata\local\temp\roslyn",
    r"\appdata\local\temp\vbcscompiler",
    r"\programdata\lghub",
    r"\program files\lghub",
    r"\program files\nvidia corporation",
    r"\program files (x86)\steam",
    r"\program files\microsoft visual studio",
    r"\program files (x86)\microsoft visual studio",
    r"\windows\system32",
    r"\windows\syswow64",
    r"\windows\winsxs",
    r"\windows\servicing",
    r"\appdata\local\discord",
    r"\appdata\local\nvidia corporation",
    r"\appdata\local\epic games",
    r"\appdata\local\microsoft\windowsapps",
    r"\appdata\local\packages\microsoft.xboxgamingoverlay",
    r"\appdata\local\packages\microsoft.xboxspeechtotextoverlay",
    r"\appdata\local\connecteddevicesplatform",
    r"\appdata\roaming\eaanticheat.installer.tool",
    r"\appdata\local\python",
    r"\appdata\local\microsoft\visualstudio",
    r"\appdata\local\temp\cmuatiex",
    r"\appdata\local\temp\4mcwnkul",
    "\\programdata\\asus\\",
    "\\program files\\asus\\",
    "\\program files (x86)\\asus\\",
    "\\appdata\\local\\gameppsdk\\",
    "\\appdata\\local\\qiyou\\",
    "\\appdata\\local\\temp\\oopz\\"
]

KNOWN_GOOD_FILENAMES = [
    "webview2loader.dll",
    "api-ms-win-core-memory-l1-1-0.dll",
    "api-ms-win-core-libraryloader-l1-1-0.dll",
    "mdnsresponder.dll",
    "loguploader.dll",
    "microsoft.codeanalysis",
    "microsoft.extensions.dependencyinjection",
    "fsharp.editor.dll",
    "smapi.modbuildconfig.analyzer.dll"
]

PREFETCH_EXE_WHITELIST = {
    "discord.exe", "discordhookhelper.exe", "discordhookhelper64.exe",
    "discordcrashhandler.exe", "discordvoice.exe",
    "steam.exe", "steamwebhelper.exe", "gameoverlayui.exe", "gameoverlayui64.exe",
    "nvidia overlay.exe", "nvidia share.exe", "nvidia container.exe", "nvcontainer.exe",
    "nvsphelper64.exe", "nvidia app.exe", "nvidia web helper.exe",
    "radeonsoftware.exe", "amdow.exe", "amddvr.exe",
    "intelgraphicscommandcenter.exe", "igfxem.exe", "igfxtray.exe",
    "gldriverquery.exe", "gldriverquery64.exe",
    "vulkandriverquery.exe", "vulkandriverquery64.exe",
    "gamebar.exe", "gamebarftserver.exe", "gamebarpresencewriter.exe",
    "gamingservices.exe", "xboxgamebar.exe",
    "epicgameslauncher.exe", "eosoverlayrenderer-win64-shipping.exe",
    "eadesktop.exe", "ealauncher.exe", "eabackgroundservice.exe",
    "eaaanticheat.gameservice.exe",
    "riotclientservices.exe", "riotclientux.exe",
    "battle.net.exe", "agent.exe",
    "oopz.exe", "oopz-overlay2.exe", "oopz_installer.exe",
    "obs64.exe", "obs32.exe", "streamlabs obs.exe", "streamlabs.exe",
    "chrome.exe", "msedge.exe", "firefox.exe",
    "code.exe", "cursor.exe", "devenv.exe", "msbuild.exe",
    "vbcscompiler.exe", "vstest.console.exe",
    "wegame.exe", "wegameminiloader.exe", "bugreport.exe",
    "douyin-downloader-v7.4.0-win3",
    "injecthelper.exe", "injecthelper_x64.exe", "qqpcexternal.exe"
}

NORMAL_DIRECTORY_WHITELIST_KEYWORDS = [
    "\\windows\\system32\\",
    "\\windows\\syswow64\\",
    "\\windows\\winsxs\\",
    "\\windowsapps\\microsoft.",
    "\\packages\\microsoft.",
    "\\microsoft\\windowsapps\\",
    "\\microsoft\\edge\\",
    "\\microsoft\\edgewebview\\",
    "\\microsoft\\edgeupdate\\",
    "\\microsoft\\onedrive\\",
    "\\microsoft\\visualstudio\\",
    "\\microsoft visual studio\\",
    "\\visual studio\\",
    "\\diagnosticshub\\",
    "\\roslyn\\",
    "\\vbcscompiler\\",
    "\\vstelem\\",
    "\\ngenpdb\\",
    "\\microsoft.codeanalysis\\",
    "\\microsoft.net\\",
    "\\dotnet\\sdk\\",
    "\\dotnet\\shared\\",
    "\\appdata\\roaming\\cursor\\user\\globalstorage\\",
    "\\appdata\\roaming\\cursor\\user\\workspace storage\\",
    "\\appdata\\roaming\\cursor\\",
    "\\appdata\\roaming\\code\\user\\globalstorage\\",
    "\\appdata\\roaming\\code\\user\\workspace storage\\",
    "\\appdata\\roaming\\code - insiders\\user\\globalstorage\\",
    "\\appdata\\roaming\\visual studio code\\",
    "\\.vscode\\",
    "\\.vs\\",
    "\\appdata\\local\\discord\\",
    "\\discord\\modules\\discord_hook",
    "\\discord\\modules\\discord_overlay",
    "\\discord\\modules\\discord_desktop_overlay",
    "\\program files (x86)\\steam\\",
    "\\program files\\steam\\",
    "\\steam\\bin\\",
    "\\steam\\gameoverlayui",
    "\\steam\\steamapps\\common\\steamworks shared\\",
    "\\program files\\nvidia corporation\\",
    "\\program files (x86)\\nvidia corporation\\",
    "\\appdata\\local\\nvidia corporation\\",
    "\\appdata\\local\\nvidia corporation\\nvidia overlay",
    "\\program files\\amd\\",
    "\\program files\\amd software\\",
    "\\program files\\intel\\",
    "\\program files (x86)\\intel\\",
    "\\epic games\\eosoverlay",
    "\\epicgameslauncher\\",
    "\\program files (x86)\\epic games\\",
    "\\program files\\epic games\\",
    "\\appdata\\roaming\\eaanticheat",
    "\\appdata\\roaming\\ea anti-cheat",
    "\\program files\\ea games\\",
    "\\program files\\electronic arts\\",
    "\\eaaanticheat.installer.tool",
    "\\microsoft.xboxgamingoverlay_",
    "\\microsoft.xboxspeechtotextoverlay_",
    "\\microsoft.gamingservices_",
    "\\appdata\\local\\oopz\\",
    "\\appdata\\local\\temp\\oopz_installer\\",
    "\\oopz\\overlay\\",
    "\\oopz_installer\\app\\overlay\\",
    "\\obs-studio\\",
    "\\streamlabs\\",
    "\\twitch studio\\",
    "\\appdata\\local\\wegame\\",
    "\\wegameminiloader",
    "\\smapi-installer-",
    "\\smapi-internal",
    "\\python\\pythoncore-",
    "\\site-packages\\",
    "\\dist-packages\\",
    "\\appdata\\local\\programs\\python\\",
]

AI_SCAN_PATH_EXCLUDE_KEYWORDS = [
    "\\appdata\\roaming\\cursor\\user\\globalstorage\\",
    "\\appdata\\roaming\\cursor\\user\\workspace storage\\",
    "\\appdata\\roaming\\cursor\\",
    "\\appdata\\roaming\\code\\user\\globalstorage\\",
    "\\appdata\\roaming\\code\\user\\workspace storage\\",
    "\\appdata\\roaming\\visual studio code\\",
    "\\.vscode\\",
    "\\.vs\\",
    "\\claude-cli-nodejs\\cache\\",
    "\\appdata\\local\\temp\\claude\\",
]

NORMAL_HID_VID_WHITELIST = {x.lower() for x in {
    "VID_046D", "VID_0B05", "VID_1532", "VID_1038", "VID_1B1C", "VID_2516", "VID_04D9",
    "VID_258A", "VID_05AC", "VID_045E", "VID_1EA7", "VID_0C45", "VID_0D8C", "VID_1B3F",
}}

NORMAL_HID_NAME_KEYWORDS = [
    "hid-compliant mouse",
    "hid keyboard device",
    "hid-compliant consumer control device",
    "hid-compliant system controller",
    "hid-compliant vendor-defined device",
    "usb input device",
    "符合 hid 标准的鼠标",
    "符合 hid 标准的键盘",
    "符合 hid 标准的用户控制设备",
    "符合 hid 标准的系统控制器",
    "符合 hid 标准的供应商定义设备",
    "usb 输入设备",
]

NORMAL_FILE_NAME_WHITELIST = {
    "discordhookhelper.exe", "discordhookhelper64.exe",
    "gameoverlayui.exe", "gameoverlayui64.exe",
    "nvidia overlay.exe", "nvidia share.exe",
    "gldriverquery.exe", "gldriverquery64.exe",
    "vulkandriverquery.exe", "vulkandriverquery64.exe",
    "oopz-overlay2.exe", "oopz.exe",
    "webview2loader.dll",
    "microsoft.visualstudio.shell.ui.internal.ni.pdb",
    "microsoft.internal.visualstudio.interop.ni.pdb",
    "microsoft.visualstudio.utilities.internal_16.3.90_netfx.37bf5b02489fb8d353ca",
    "wegameminiloader.exe", "bugreport.exe", "smapi-internal",
    "ascheckamddriver.exe", "injecthelper.exe", "injecthelper_x64.exe",
    "processfilter8.sys",
}

NORMAL_FOLDER_NAME_WHITELIST = {
    "discord_desktop_overlay-1", "discord_hook-1", "discord_overlay2-1",
    "eosoverlay", "nvidia overlay", "overlay",
    "microsoft.xboxgamingoverlay_8wekyb3d8bbwe",
    "microsoft.xboxspeechtotextoverlay_8wekyb3d8bbwe",
    "analyzerassemblyloader", "smapi-internal", "clr_loader", "setuploader",
    "filecontentindex", "temppe", "wegameminiloader",
}

NORMAL_SOFTWARE_REVIEW_KEYWORDS = [
    "discord", "nvidia", "steam", "gameoverlayui", "eosoverlay", "epic games",
    "eaanticheat", "eaaanticheat", "xboxgamingoverlay", "microsoft.xbox",
    "visualstudio", "visual studio", "roslyn", "vbcscompiler", "diagnosticshub",
    "cursor", "vscode", "code.exe", "webview2", "oopz", "wegame", "smapi", "obs-studio",
]

DANGEROUS_KEYWORDS_NEVER_FULLY_WHITELIST = [
    "cheat engine", "xenos", "extreme injector", "manualmap", "manual map",
    "injector", "dllinject", "kdmapper", "mapper", "driver loader",
    "驱动加载", "注入工具", "过ace", "ace bypass",
    "fake vgc", "fakevgc", "vgcemulator", "kmbox", "dma", "pcileech", "leechcore",
    "柿子external", "柿子云external", "afg-cheat-sjz", "unicorn-cheat-sjz",
    "undetek", "aimbot.txt", "wait for the cheat to inject", "cheat loader",
    "press insert to open the menu",
]

# Indicator tables: (phrase, weight, human-readable label).
INJECTOR_INDICATORS = [
    ("manualmap", 70, "manual map style indicator"),
    ("manual map", 70, "manual map style indicator"),
    ("injector", 65, "injector indicator"),
    ("dllinject", 65, "DLL injection indicator"),
    ("dll inject", 65, "DLL injection indicator"),
    ("loadlibrary", 55, "LoadLibrary injection indicator"),
    ("reflective", 55, "reflective loader indicator"),
    ("manual load", 50, "manual loader indicator"),
    ("thread hijack", 60, "thread hijack indicator"),
    ("create remote thread", 60, "CreateRemoteThread indicator"),
    ("setwindowshookex", 50, "SetWindowsHookEx injection indicator"),
    ("mapdriver", 65, "driver mapper indicator"),
    ("kernel inject", 65, "kernel injection indicator"),
    ("shellcode", 45, "shellcode indicator"),
    ("pe loader", 50, "PE loader indicator"),
    ("dll loader", 45, "DLL loader indicator")
]

DISPLAY_FUSER_INDICATORS = [
    ("edid", 30, "EDID-related display indicator"),
    ("emulator", 35, "display emulator indicator"),
    ("dummy", 35, "dummy display indicator"),
    ("headless", 35, "headless display indicator"),
    ("splitter", 35, "display splitter indicator"),
    ("matrix", 20, "display matrix indicator"),
    ("capture", 30, "video capture indicator"),
    ("hdmi usb", 30, "HDMI-to-USB capture indicator"),
    ("usb display", 25, "USB display bridge indicator"),
    ("displaylink", 20, "DisplayLink indicator"),
    ("virtual display", 35, "virtual display indicator"),
    ("indirect display", 30, "indirect display indicator"),
    ("parsec virtual", 30, "Parsec virtual display indicator"),
    ("spacedesk", 30, "spacedesk virtual display indicator"),
    ("sunshine", 25, "streaming display indicator"),
    ("moonlight", 25, "streaming display indicator"),
    ("obs virtual", 20, "OBS virtual display indicator"),
    ("kmbox nvideo", 85, "KMBOX video path indicator")
]

AI_AIM_INDICATORS = [
    ("yolo", 35, "YOLO model/runtime indicator"),
    ("onnx", 25, "ONNX inference indicator"),
    ("ultralytics", 30, "Ultralytics package indicator"),
    ("opencv", 20, "OpenCV vision indicator"),
    ("tensorrt", 25, "TensorRT inference indicator"),
    ("directml", 20, "DirectML inference indicator"),
    ("cuda", 15, "CUDA inference indicator"),
    ("screen capture", 20, "screen-capture inference indicator"),
    ("capture model", 25, "capture-and-infer indicator"),
    ("triggerbot", 45, "triggerbot indicator"),
    ("aimbot", 45, "aimbot indicator"),
    ("pixelbot", 45, "pixel-bot indicator"),
    ("colorbot", 45, "color-bot indicator"),
    ("silentaim", 45, "silent aim indicator"),
    ("mouse event", 20, "synthetic mouse event indicator"),
    ("sendinput", 20, "SendInput automation indicator"),
    ("arduino aim", 50, "Arduino-assisted aim indicator"),
    ("ai aim", 50, "AI aim indicator"),
    ("cv aim", 50, "computer-vision aim indicator")
]

# Built-in YARA rules, used when the server supplies no external rule file.
YARA_RULE_SOURCE = r'''
rule Escort_Cheat_Loader_Instructions {
  strings:
    $a = "wait for the cheat to inject" nocase
    $b = "press insert to open the menu" nocase
    $c = "cheat loader" nocase
    $d = "run as admin" nocase
  condition:
    2 of them
}
rule Escort_Injector_ManualMap_Toolchain {
  strings:
    $a = "manualmap" nocase
    $b = "manual map" nocase
    $c = "CreateRemoteThread" nocase
    $d = "LoadLibraryA" nocase
    $e = "VirtualAllocEx" nocase
    $f = "WriteProcessMemory" nocase
    $g = "reflective loader" nocase
  condition:
    2 of them
}
rule Escort_DMA_Toolchain_Strings {
  strings:
    $a = "pcileech" nocase
    $b = "leechcore" nocase
    $c = "memprocfs" nocase
    $d = "ft601" nocase
    $e = "captaindma" nocase
    $f = "kmbox" nocase
  condition:
    any of them
}
rule Escort_AntiCheat_Bypass_Strings {
  strings:
    $a = "fake vgc" nocase
    $b = "fakevgc" nocase
    $c = "vgc bypass" nocase
    $d = "ace bypass" nocase
    $e = "anti-cheat bypass" nocase
  condition:
    any of them
}
'''

# Pure-Python mirror of YARA_RULE_SOURCE:
# (rule name, lowercase keywords, minimum keyword hits, risk score).
YARA_FALLBACK_RULES = [
    ("Escort_Cheat_Loader_Instructions",
     ["wait for the cheat to inject", "press insert to open the menu", "cheat loader", "run as admin"],
     2, 95),
    ("Escort_Injector_ManualMap_Toolchain",
     ["manualmap", "manual map", "createremotethread", "loadlibrarya", "virtualallocex",
      "writeprocessmemory", "reflective loader"],
     2, 90),
    ("Escort_DMA_Toolchain_Strings",
     ["pcileech", "leechcore", "memprocfs", "ft601", "captaindma", "kmbox"],
     1, 90),
    ("Escort_AntiCheat_Bypass_Strings",
     ["fake vgc", "fakevgc", "vgc bypass", "ace bypass", "anti-cheat bypass"],
     1, 90),
]

# Compiled YARA rules, filled lazily by get_yara_rules().
_YARA_RULES = None

# Fast-scan safety limits. Large files are still recorded by metadata/path/signature,
# but expensive YARA/hash work is capped to avoid hanging on huge archives or game files.
MAX_TOTAL_SCAN_SECONDS = int(os.getenv("EAC_MAX_SCAN_SECONDS", "150"))
MAX_RECENT_FILE_CANDIDATES = int(os.getenv("EAC_MAX_RECENT_FILE_CANDIDATES", "3500"))
MAX_FILES_PER_ROOT = int(os.getenv("EAC_MAX_FILES_PER_ROOT", "900"))
MAX_YARA_FILE_BYTES = int(os.getenv("EAC_MAX_YARA_FILE_MB", "50")) * 1024 * 1024
MAX_HASH_FILE_BYTES = int(os.getenv("EAC_MAX_HASH_FILE_MB", "150")) * 1024 * 1024


def scan_time_left(start_ts: float, limit_seconds: int = MAX_TOTAL_SCAN_SECONDS) -> float:
    """Return the seconds remaining before the scan budget runs out (never negative).

    ``start_ts`` must come from ``time.monotonic()``.
    """
    return max(0.0, float(limit_seconds) - (time.monotonic() - float(start_ts)))


def scan_deadline_exceeded(start_ts: float, limit_seconds: int = MAX_TOTAL_SCAN_SECONDS) -> bool:
    """Return True once the scan that started at ``start_ts`` has used its time budget."""
    return scan_time_left(start_ts, limit_seconds) <= 0


# Per-process cache of server-provided rule data; None means "not fetched yet".
CLIENT_RULES_CACHE = {
    "false_positive": None,
    "rules_yar": None,
}


def server_base_url() -> str:
    """Return the server root derived from SERVER_URL (the part before ``/api/scan``)."""
    base = SERVER_URL.rsplit("/api/scan", 1)[0].rstrip("/")
    return base or "http://127.0.0.1:8080"


def fetch_server_text(path: str, timeout: int = 8) -> str:
    """GET ``path`` from the server and return the body, or "" on any failure.

    The API key header is sent only when SCAN_API_KEY is configured.
    """
    headers = {"X-API-Key": SCAN_API_KEY} if SCAN_API_KEY else {}
    try:
        resp = requests.get(server_base_url() + path, timeout=timeout, headers=headers)
        if resp.ok:
            return resp.text
    except Exception:
        # Deliberately best-effort: an unreachable server must not abort the scan.
        pass
    return ""


def fetch_server_json(path: str, timeout: int = 8):
    """GET ``path`` and return the decoded JSON, or {} when empty or invalid."""
    try:
        raw = fetch_server_text(path, timeout=timeout)
        return json.loads(raw) if raw else {}
    except Exception:
        return {}


def load_server_false_positive_rules() -> dict:
    """Return the server's false-positive rules, fetching them at most once per process.

    A failed fetch is cached as {} so later calls do not repeat the network wait.
    """
    cached = CLIENT_RULES_CACHE.get("false_positive")
    if cached is not None:
        return cached
    data = fetch_server_json("/api/rules/false-positive")
    if not isinstance(data, dict):
        data = {}
    CLIENT_RULES_CACHE["false_positive"] = data
    return data


def _fp_norm(value) -> str:
    """Normalize a rule value or haystack: stripped, lowercase, backslash separators."""
    return str(value or "").strip().lower().replace("/", "\\")


def apply_client_false_positive_rules(findings: list) -> list:
    """Cap the risk of findings that match server-learned false-positive rules.

    Rule kinds are ``sha256``, ``filename`` and ``path_keyword``. A matched
    finding has its risk lowered to at most 10 and the match recorded in its
    evidence. Findings are shallow-copied; the input list is returned unchanged
    when there are no usable rules.
    """
    rules = load_server_false_positive_rules()
    entries = rules.get("entries", []) if isinstance(rules, dict) else []
    if not isinstance(entries, list):
        entries = []
    # Server data is untrusted in shape: ignore entries that are not objects.
    entries = [e for e in entries if isinstance(e, dict)]
    if not entries:
        return findings

    learned_hashes = {_fp_norm(e.get("value")) for e in entries if e.get("kind") == "sha256"}
    learned_names = {_fp_norm(e.get("value")) for e in entries if e.get("kind") == "filename"}
    learned_paths = [
        keyword
        for keyword in (_fp_norm(e.get("value")) for e in entries if e.get("kind") == "path_keyword")
        if keyword
    ]

    out = []
    for item in findings or []:
        item = dict(item or {})
        evidence = dict(item.get("evidence", {}) or {})
        # default=str keeps a stray non-JSON value (e.g. a Path) from crashing the pass.
        hay = _fp_norm(json.dumps(item, ensure_ascii=False, default=str))
        # json.dumps doubles every backslash, so a keyword such as "\appdata\local\x"
        # never matches the raw dump; also search a copy with the doubling undone.
        hay_unescaped = hay.replace("\\\\", "\\")
        filename = _fp_norm(evidence.get("filename") or item.get("detail"))
        sha256 = _fp_norm(evidence.get("sha256"))

        matched = []
        if sha256 and sha256 in learned_hashes:
            matched.append("sha256")
        if filename and filename in learned_names:
            matched.append("filename")
        if any(k in hay or k in hay_unescaped for k in learned_paths):
            matched.append("path_keyword")

        if matched:
            item["risk"] = min(int(item.get("risk", 0) or 0), 10)
            evidence["client_false_positive_learning"] = {
                "matched": matched,
                "rule_version": rules.get("generated_at", ""),
            }
            evidence["reasons"] = list(evidence.get("reasons") or []) + [
                "client-side false-positive learning lowered this finding"
            ]
            item["evidence"] = evidence
        out.append(item)
    return out


def get_external_yara_source() -> str:
    """Return the server's YARA rule text, or "" when unavailable or implausible.

    The result (including the empty one) is cached for the life of the process.
    """
    cached = CLIENT_RULES_CACHE.get("rules_yar")
    if cached is not None:
        return cached
    raw = fetch_server_text("/downloads/rules.yar")
    # Cheap sanity check so an HTML error page is not handed to the YARA compiler.
    CLIENT_RULES_CACHE["rules_yar"] = raw if raw and "rule " in raw else ""
    return CLIENT_RULES_CACHE["rules_yar"]


def get_scanner_integrity_status() -> dict:
    """Describe the running scanner: its path, its SHA-256 and any analysis tools seen.

    Tool detection is a name search over the process list and is informational only.
    """
    # Local import: the module's import run does not bring ``sys`` into scope, so
    # the fallback branch below used to raise NameError.
    import sys

    path = Path(__file__).resolve() if "__file__" in globals() else Path(sys.argv[0]).resolve()
    h = sha256_file(path) if path.exists() else ""
    suspicious_tools = []
    try:
        raw = run_cmd(
            'powershell -NoProfile -Command '
            '"Get-CimInstance Win32_Process | Select-Object Name,ExecutablePath | ConvertTo-Json -Depth 3"'
        )
        lower = raw.lower()
        for name in ["x64dbg", "ida", "ida64", "ollydbg", "dnspy", "processhacker",
                     "procmon", "wireshark", "fiddler"]:
            # The name must start a token. A bare substring test made "ida" fire on
            # every "nvidia ..." process. Suffixes stay allowed (e.g. "procmon64").
            if re.search(r"(?<![a-z0-9])" + re.escape(name), lower):
                suspicious_tools.append(name)
    except Exception:
        pass
    return {
        "scanner_path": str(path),
        "scanner_sha256": h,
        "debug_or_analysis_tools": sorted(set(suspicious_tools)),
        "integrity_note": "Analysis tools only lower report trust; scanner does not self-delete or kill processes."
    }


def scan_scanner_integrity() -> list:
    """Return a low-risk finding when the scanner ran alongside analysis tools.

    Running from a temp/download path adds 5 but cannot reach the threshold alone.
    """
    status = get_scanner_integrity_status()
    findings = []
    reasons = []
    risk = 0
    if status.get("debug_or_analysis_tools"):
        risk += 25
        reasons.append("debugging/reverse-engineering tools observed while scanner ran")
    path = status.get("scanner_path", "").lower()
    if "\\temp\\" in path or "\\downloads\\" in path:
        risk += 5
        reasons.append("scanner executed from temporary/download path")
    if risk >= 20:
        findings.append({
            "category": "scanner_integrity_warning",
            "risk": min(risk, 45),
            "title": "Scanner integrity/trust context",
            "detail": "Scanner ran with analysis/debug context",
            "evidence": {"scanner_integrity": status, "reasons": reasons}
        })
    return findings


def get_pre_scan_environment_snapshot() -> dict:
    """Capture a small, size-capped snapshot of the machine state before scanning."""
    return {
        "snapshot_time": datetime.now().isoformat(),
        "current_user": os.getenv("USERNAME", ""),
        "computer_name": os.getenv("COMPUTERNAME", ""),
        "uptime_raw": run_cmd('powershell -NoProfile -Command "(Get-CimInstance Win32_OperatingSystem).LastBootUpTime"')[:200],
        "network_adapters_raw": run_cmd('powershell -NoProfile -Command "Get-NetAdapter | Select-Object Name,Status,InterfaceDescription,MacAddress | ConvertTo-Json -Depth 4"')[:4000],
        "top_processes_raw": run_cmd('powershell -NoProfile -Command "Get-Process | Sort-Object CPU -Descending | Select-Object -First 40 ProcessName,Id,Path | ConvertTo-Json -Depth 4"')[:6000],
    }


def scan_anti_forensics() -> list:
    """Look for weak signs of evidence cleanup: event logs, Prefetch, Defender history.

    Each probe is independent and best-effort. A finding is emitted only when
    the combined risk reaches 25, and the reported risk is capped at 60.
    """
    findings = []
    reasons = []
    risk = 0
    evidence = {}

    # Event log health: weak anti-forensics context only.
    try:
        logs = ps_json("Get-WinEvent -ListLog System,Security,Application -ErrorAction SilentlyContinue | Select-Object LogName,RecordCount,IsEnabled,LastWriteTime")
        evidence["event_logs"] = logs
        for log in logs:
            # ps_json reports unparsable output as a {"parse_error": True} row; that
            # is a query failure, not an empty log, so it must not be scored.
            if not isinstance(log, dict) or log.get("parse_error"):
                continue
            name = str(log.get("LogName", ""))
            raw_count = log.get("RecordCount")
            try:
                # A null count means "unknown" and is not treated as zero records.
                count = int(raw_count) if raw_count is not None else None
            except (TypeError, ValueError):
                count = None
            enabled = str(log.get("IsEnabled", "True")).lower() == "true"
            if not enabled:
                risk += 20
                reasons.append(f"{name} event log disabled")
            elif count is not None and count < 20:
                risk += 15
                reasons.append(f"{name} event log has very low record count")
    except Exception:
        pass

    # Prefetch health.
    try:
        pf = Path(r"C:\Windows\Prefetch")
        pf_exists = pf.exists()
        pf_count = sum(1 for _ in pf.glob("*.pf")) if pf_exists else 0
        evidence["prefetch_count"] = pf_count
        if pf_exists and pf_count < 10:
            risk += 20
            reasons.append("Prefetch contains very few entries; possible cleanup or disabled prefetch")
    except Exception:
        pass

    # Defender history weak context.
    try:
        defender_paths = [
            Path(r"C:\ProgramData\Microsoft\Windows Defender\Scans\History"),
            Path(r"C:\ProgramData\Microsoft\Windows Defender\Support"),
        ]
        existing = [str(x) for x in defender_paths if x.exists()]
        evidence["defender_history_paths_present"] = existing
        if not existing:
            risk += 10
            reasons.append("Windows Defender history/support paths not found")
    except Exception:
        pass

    if risk >= 25:
        findings.append({
            "category": "anti_forensics",
            "risk": min(risk, 60),
            "title": "Possible evidence-cleanup or logging gap",
            "detail": "System logs/history appear incomplete",
            "evidence": {"reasons": reasons, **evidence, "note": "Anti-forensics signals are weak by themselves and require correlation."}
        })
    return findings


# ============================================================
# False-positive suppression / clean-machine baseline
# ============================================================
# Files the scanner writes itself; they must never be reported as findings.
SELF_ARTIFACT_FILENAMES = {
    "last_scan_payload.json",
    "last_scan_report.txt",
    "scanner_client_id.txt",
    "hardware_baseline.json",
    "hwid_baseline.json",
    "threat_names.json",
}
TRUSTED_VENDOR_PATH_KEYWORDS = [ "\\programdata\\asus\\", "\\program files\\asus\\", "\\program files (x86)\\asus\\", "\\asus system control interface\\", "\\armoury crate", "\\asusoptimization\\", "\\splendid\\", "\\appdata\\local\\gameppsdk\\", "\\appdata\\local\\qiyou\\", "\\appdata\\local\\temp\\oopz\\", "\\appdata\\local\\oopz\\", ] TRUSTED_SIGNER_KEYWORDS = [ "microsoft corporation", "microsoft windows", "nvidia corporation", "advanced micro devices", "amd", "intel corporation", "logitech", "razer", "steelseries", "corsair", "valve corp", "discord inc", "epic games", "electronic arts", "riot games", "obs project", "google llc", "mozilla corporation", ] TRUSTED_VENDOR_FILE_PREFIXES = ( "hwinfo_", "hwinfo64", ) TRUSTED_VENDOR_FILENAMES = { "ascheckamddriver.exe", "injecthelper.exe", "injecthelper_x64.exe", "qeeyoupacket", "processfilter8.sys", } PREFETCH_FALSE_POSITIVE_EXES = { "douyin-downloader-v7.4.0-win3", "injecthelper.exe", "injecthelper_x64.exe", "qqpcexternal.exe", } NORMAL_OEM_MANUFACTURERS = [ "asus", "asustek", "american megatrends", "ami", "lenovo", "dell", "hp", "hewlett", "acer", "gigabyte", "micro-star", "msi", "intel", "amd", "microsoft", ] DMA_STRONG_CONTEXT_KEYWORDS = [ "ft601", "ft600", "pcileech", "leechcore", "memprocfs", "captaindma", "screamer", "usb3380", "kmbox", "kmbox net", "192.168.2.188", ] USB_KMBOX_FINGERPRINTS = { "known_good": [ {"type": "vid", "value": vid.lower(), "label": "common HID peripheral VID"} for vid in NORMAL_HID_VID_WHITELIST ], "suspicious": [ {"type": "vid", "value": "vid_0403", "label": "FTDI USB bridge/dev-board VID"}, {"type": "vid", "value": "vid_1a86", "label": "CH340/CH341 serial bridge VID"}, {"type": "vid", "value": "vid_10c4", "label": "CP210x serial bridge VID"}, {"type": "driver", "value": "ftdibus", "label": "FTDI bus driver"}, {"type": "driver", "value": "ch341", "label": "CH341 serial driver"}, {"type": "driver", "value": "cp210", "label": "CP210x serial driver"}, ], "known_bad": [ {"type": "name", 
"value": "kmbox", "label": "KMBOX device name"}, {"type": "name", "value": "pcileech", "label": "PCILeech device name"}, {"type": "name", "value": "leechcore", "label": "LeechCore device name"}, {"type": "name", "value": "captaindma", "label": "CaptainDMA device name"}, {"type": "net", "value": "192.168.2.188", "label": "KMBOX Net default IP"}, ], } def is_self_artifact_path(path) -> bool: try: name = Path(path).name.lower() except Exception: name = str(path or "").lower().rsplit("\\", 1)[-1] return name in SELF_ARTIFACT_FILENAMES def is_trusted_vendor_context(text: str) -> bool: lower = str(text or "").lower() return any(k in lower for k in TRUSTED_VENDOR_PATH_KEYWORDS) def is_trusted_vendor_file(path) -> bool: try: p = Path(path) name = p.name.lower() lower = str(p).lower() except Exception: name = str(path or "").lower() lower = name if name in TRUSTED_VENDOR_FILENAMES: return True if any(name.startswith(prefix) for prefix in TRUSTED_VENDOR_FILE_PREFIXES): return True if is_trusted_vendor_context(lower): return True return False def has_strong_dma_context(text: str) -> bool: lower = str(text or "").lower() return any(k in lower for k in DMA_STRONG_CONTEXT_KEYWORDS) def score_usb_kmbox_fingerprint(text: str): lower = str(text or "").lower() score = 0 reasons = [] status = "unknown" for fp in USB_KMBOX_FINGERPRINTS["known_bad"]: if fp["value"] in lower: score = max(score, 90) status = "known_bad" reasons.append(f"known_bad USB/KMBOX fingerprint: {fp['label']}") for fp in USB_KMBOX_FINGERPRINTS["suspicious"]: if fp["value"] in lower: score = max(score, 35) if status != "known_bad": status = "suspicious" reasons.append(f"suspicious USB/KMBOX fingerprint: {fp['label']}") known_good = any(fp["value"] in lower for fp in USB_KMBOX_FINGERPRINTS["known_good"]) if known_good and status != "known_bad": score = min(score, 10) status = "known_good" reasons.append("known_good HID/USB fingerprint lowered standalone hardware risk") return score, status, reasons def 
is_normal_oem_hwid_snapshot(snapshot: dict) -> bool: hay = " ".join(str(snapshot.get(k, "") or "") for k in [ "computer_manufacturer", "computer_model", "bios_manufacturer", "bios_version", "baseboard_manufacturer", "baseboard_product", "csproduct_vendor", "csproduct_name", ]).lower() return any(x in hay for x in NORMAL_OEM_MANUFACTURERS) def load_threat_names() -> dict: if not THREAT_NAMES_PATH.exists(): return {} try: with open(THREAT_NAMES_PATH, "r", encoding="utf-8") as f: data = json.load(f) return {str(k).lower(): v for k, v in data.items()} except Exception: return {} KNOWN_THREAT_NAMES = load_threat_names() def get_or_create_client_id() -> str: env_player_id = os.getenv("PLAYER_ID", "").strip() if env_player_id: return env_player_id try: if CLIENT_ID_PATH.exists(): existing = CLIENT_ID_PATH.read_text(encoding="utf-8").strip() if existing: return existing except Exception: pass raw = f"{datetime.now().isoformat()}-{os.urandom(16).hex()}" short_id = hashlib.sha256(raw.encode("utf-8", errors="ignore")).hexdigest()[:12].upper() client_id = f"ANON-{short_id}" try: CLIENT_ID_PATH.write_text(client_id, encoding="utf-8") except Exception: pass return client_id def match_known_threat(text: str): lower = str(text).lower() matches = [] for threat_name, meta in KNOWN_THREAT_NAMES.items(): if threat_name in lower: matches.append({ "name": threat_name, "risk": int(meta.get("risk", 80)), "category": meta.get("category", "known_threat"), "confidence": meta.get("confidence", "unknown"), "note": meta.get("note", "") }) return matches def run_cmd(cmd: str) -> str: try: p = subprocess.run( cmd, shell=True, capture_output=True, text=True, errors="ignore", timeout=45 ) return p.stdout.strip() except Exception as e: return f"[ERROR] {e}" def parse_version_parts(version: str): parts = [] for chunk in str(version or "").strip().split("."): try: parts.append(int(chunk)) except Exception: parts.append(0) return tuple(parts) def compare_versions(left: str, right: str) -> int: l_parts 
= list(parse_version_parts(left)) r_parts = list(parse_version_parts(right)) length = max(len(l_parts), len(r_parts), 1) l_parts.extend([0] * (length - len(l_parts))) r_parts.extend([0] * (length - len(r_parts))) for l_value, r_value in zip(l_parts, r_parts): if l_value < r_value: return -1 if l_value > r_value: return 1 return 0 def get_server_status(): try: status_url = SERVER_URL.rsplit("/api/scan", 1)[0] + "/api/status" response = requests.get(status_url, timeout=10) response.raise_for_status() return response.json() except Exception: return {} def ps_json(command: str): out = run_cmd( 'powershell -NoProfile -ExecutionPolicy Bypass -Command ' f'"{command} | ConvertTo-Json -Depth 8"' ) if not out: return [] try: data = json.loads(out) if data is None: return [] if isinstance(data, dict): return [data] if isinstance(data, list): return data return [] except Exception: return [{"raw_output": out[:3000], "parse_error": True}] def try_parse_json(raw: str): try: return json.loads(raw) except Exception: return {} def sha256_file(path: Path, max_bytes: int = MAX_HASH_FILE_BYTES): try: size = path.stat().st_size if size > max_bytes: return None h = hashlib.sha256() with open(path, "rb") as f: for chunk in iter(lambda: f.read(1024 * 1024), b""): h.update(chunk) return h.hexdigest() except Exception: return None def escape_ps_single_quoted(value: str) -> str: return str(value or "").replace("'", "''") def extract_windows_file_path(value: str): text = str(value or "").strip().strip('"') if not text: return None if text.lower().startswith("\\systemroot\\"): text = str(Path(os.environ.get("SystemRoot", r"C:\Windows"))) + text[len("\\systemroot"):] if text.lower().startswith("system32\\"): text = str(Path(os.environ.get("SystemRoot", r"C:\Windows")) / text) match = re.search(r"([a-zA-Z]:\\[^\"']+?\.(?:exe|dll|sys))", text, re.IGNORECASE) if match: return Path(match.group(1)) if text.lower().endswith((".exe", ".dll", ".sys")): return Path(text) return None def 
get_authenticode_signature(path: Path) -> dict: if path.suffix.lower() not in {".exe", ".dll", ".sys"}: return {} literal = escape_ps_single_quoted(str(path)) items = ps_json( f"$sig = Get-AuthenticodeSignature -LiteralPath '{literal}' -ErrorAction SilentlyContinue; " "[pscustomobject]@{ " "status=[string]$sig.Status; " "signer_subject=[string]$sig.SignerCertificate.Subject; " "issuer=[string]$sig.SignerCertificate.Issuer " "}" ) if not items or items[0].get("parse_error"): return {} sig = items[0] signer = str(sig.get("signer_subject", "") or "") status = str(sig.get("status", "") or "") lower = (signer + " " + status).lower() sig["trusted_signer"] = status.lower() == "valid" and any(k in lower for k in TRUSTED_SIGNER_KEYWORDS) return sig def signature_risk_adjustment(path: Path, signature: dict): if path.suffix.lower() not in {".exe", ".dll", ".sys"}: return 0, [] if not signature: return 10, ["digital signature could not be read"] status = str(signature.get("status", "") or "").lower() trusted = bool(signature.get("trusted_signer")) reasons = [] adjustment = 0 if trusted: adjustment -= 20 reasons.append("valid trusted digital signature") elif status in {"notsigned", "hashmismatch", "nottrusted", "unknownerror"}: adjustment += 20 reasons.append(f"digital signature status is {status}") elif status and status != "valid": adjustment += 10 reasons.append(f"digital signature status is {status}") return adjustment, reasons def get_yara_rules(): global _YARA_RULES if yara is None: return None if _YARA_RULES is not None: return _YARA_RULES try: source = get_external_yara_source() or YARA_RULE_SOURCE _YARA_RULES = yara.compile(source=source) except Exception: try: _YARA_RULES = yara.compile(source=YARA_RULE_SOURCE) except Exception: _YARA_RULES = None return _YARA_RULES def scan_file_with_yara(path: Path, max_bytes: int = 8 * 1024 * 1024): if path.suffix.lower() not in {".exe", ".dll", ".sys"}: return 0, [], "not_applicable" try: if path.stat().st_size > 
MAX_YARA_FILE_BYTES: return 0, [], "skipped_large_file_over_50mb" except Exception: return 0, [], "stat_failed" rules = get_yara_rules() if rules is not None: try: matches = [str(m) for m in rules.match(str(path))] if matches: return 90, matches, "yara-python" return 0, [], "yara-python" except Exception: pass try: data = path.read_bytes()[:max_bytes].lower() except Exception: return 0, [], "read_failed" text = data.decode("latin-1", errors="ignore") matched = [] score = 0 for name, keywords, minimum, risk in YARA_FALLBACK_RULES: hits = [kw for kw in keywords if kw in text] if len(hits) >= minimum: matched.append({"rule": name, "hits": hits[:6]}) score = max(score, risk) return score, matched, "builtin-fallback" def read_text_snippet(path: Path, max_chars: int = 4000) -> str: try: if path.suffix.lower() not in {".ps1", ".bat", ".cmd", ".vbs", ".js"}: return "" return path.read_text(encoding="utf-8", errors="ignore")[:max_chars] except Exception: return "" def normalize_path(path: Path) -> str: text = str(path) home = str(Path.home()) if text.lower().startswith(home.lower()): return text.replace(home, "%USER%") return text def contains_any_keyword(text: str, keywords) -> bool: return any(contains_keyword(text, kw) for kw in keywords) def is_dangerous_whitelist_exception(text: str) -> bool: return contains_any_keyword(str(text or ""), DANGEROUS_KEYWORDS_NEVER_FULLY_WHITELIST) def is_normal_directory_path(path: Path) -> bool: lower = str(path).lower() return any(k in lower for k in NORMAL_DIRECTORY_WHITELIST_KEYWORDS) def is_ai_scan_excluded_path(path: Path) -> bool: lower = str(path).lower() return any(k in lower for k in AI_SCAN_PATH_EXCLUDE_KEYWORDS) def is_normal_filename(path: Path) -> bool: return path.name.lower() in NORMAL_FILE_NAME_WHITELIST def is_normal_folder_name(path: Path) -> bool: folder_parts = [part.lower() for part in path.parts] return any(part in NORMAL_FOLDER_NAME_WHITELIST for part in folder_parts) def should_soft_whitelist_path(path: Path) -> 
def contains_keyword(text: str, keyword: str) -> bool:
    """Case-insensitive keyword test with stricter matching for short keywords.

    Keywords longer than four characters match as plain substrings. Keywords
    of four characters or fewer must not be directly preceded or followed by
    an ASCII letter or digit, so ``esp`` matches ``my esp tool`` but not
    ``response``.

    Args:
        text: Text to search; converted with ``str()`` and lower-cased.
        keyword: Keyword to look for; converted with ``str()`` and lower-cased.

    Returns:
        True when the keyword is present under the rule above.
    """
    haystack = str(text).lower()
    needle = str(keyword).lower()

    if len(needle) > 4:
        return needle in haystack

    bounded = "".join(
        (r"(^|[^a-z0-9])", re.escape(needle), r"([^a-z0-9]|$)")
    )
    return bool(re.search(bounded, haystack))
def score_cheat_documentation_text(text: str):
    """Score free text for wording typical of cheat install instructions.

    Each phrase in the weighted table that is found (via ``contains_keyword``)
    adds its weight and a reason. A further 20 points are added when an
    explicit loader/injection instruction appears together with "aimbot".
    Text with fewer than three phrase hits and no explicit loader instruction
    scores zero.

    Args:
        text: Text to inspect; ``None`` is treated as empty.

    Returns:
        ``(score, reasons)`` with ``score`` capped at 100.
    """
    body = str(text or "").lower()
    weighted_phrases = (
        ("cs2", 15, "game-targeting indicator"),
        ("cheat file", 30, "explicit cheat-package instruction"),
        ("run as admin", 20, "privileged cheat-launch instruction"),
        ("cheat loader", 35, "explicit cheat-loader instruction"),
        ("wait for the cheat to inject", 35, "explicit cheat injection instruction"),
        ("press insert to open the menu", 35, "explicit cheat-menu instruction"),
        ("insert to open the menu", 30, "cheat-menu hotkey instruction"),
        ("inject", 20, "cheat injection wording"),
        ("loader", 20, "loader wording"),
        ("aimbot", 25, "explicit aimbot wording"),
    )

    matched = [entry for entry in weighted_phrases if contains_keyword(body, entry[0])]
    total = sum(weight for _phrase, weight, _label in matched)
    reasons = [label + f": {phrase}" for phrase, _weight, label in matched]

    has_explicit_loader = contains_any_keyword(
        body,
        ["cheat loader", "wait for the cheat to inject", "press insert to open the menu"],
    )
    if has_explicit_loader and contains_keyword(body, "aimbot"):
        total += 20
        reasons.append("loader/injection instructions paired with aimbot wording")

    # Isolated generic words are not enough to call the text cheat documentation.
    if len(matched) < 3 and not has_explicit_loader:
        return 0, []
    return min(total, 100), reasons
def looks_random_exe_name(path: Path) -> bool:
    """Heuristically decide whether an ``.exe`` file name looks machine-generated.

    Only stems of 6 to 18 characters made solely of ``a-z``, ``0-9``, ``_``
    and ``-`` (after lower-casing) are considered. Such a stem is reported as
    random-looking when it either mixes at least three digits with at least
    three letters, or contains no ``_``/``-``, at most one vowel and at least
    six letters.

    Args:
        path: Candidate file path.

    Returns:
        True when the name matches one of the two patterns above.
    """
    if path.suffix.lower() != ".exe":
        return False

    base = path.stem.lower()
    if not 6 <= len(base) <= 18:
        return False
    if re.fullmatch(r"[a-z0-9_-]+", base) is None:
        return False

    digits = sum(map(str.isdigit, base))
    letters = sum(map(str.isalpha, base))
    vowels = sum(base.count(vowel) for vowel in "aeiou")

    mixed_digits_and_letters = digits >= 3 and letters >= 3
    has_separator = "_" in base or "-" in base
    consonant_heavy = (not has_separator) and vowels <= 1 and letters >= 6
    return mixed_digits_and_letters or consonant_heavy
def normalize_hwid_value(value):
    """Return a trimmed string for a raw HWID field, or "" for empty markers.

    ``None`` and the case-insensitive words ``null``, ``none`` and ``unknown``
    all collapse to the empty string; every other value is converted with
    ``str()`` and stripped of surrounding whitespace.
    """
    if value is None:
        return ""
    cleaned = str(value).strip()
    is_empty_marker = cleaned.lower() in ("null", "none", "unknown")
    return "" if is_empty_marker else cleaned
def looks_random_hwid_string(value: str) -> bool:
    """Conservatively decide whether an HWID/SMBIOS value looks fake.

    OEM serials and UUIDs naturally look random, so only obvious cases are
    reported. The checks run in this order:

    1. Empty values and values shorter than 8 characters are never flagged.
    2. Values containing a known vendor/product word are never flagged.
    3. Values containing a placeholder fragment are flagged.
    4. Values whose alphanumeric characters number at least 10 but use at
       most two distinct characters (e.g. ``AAAAAAAAAAAA``) are flagged.
    5. All-uppercase values with at least 20 alphanumeric characters, at
       least 16 letters and no digits are flagged.

    A normal serial such as ``R3NRKD01356212A`` is not flagged.

    Args:
        value: Raw field value.

    Returns:
        True when the value matches rule 3, 4 or 5.
    """
    if not value:
        return False

    raw = str(value).strip()
    if len(raw) < 8:
        return False
    lowered = raw.lower()

    vendor_words = (
        "asus", "asustek", "american megatrends", "ami", "gigabyte", "msi",
        "micro-star", "lenovo", "dell", "hp", "hewlett", "acer", "intel",
        "amd", "microsoft", "system product name", "prime", "rog", "strix",
        "tuf", "b760", "z790", "x670", "b650",
    )
    for word in vendor_words:
        if word in lowered:
            return False

    placeholder_bits = (
        "to be filled", "default string", "system serial", "none", "unknown",
        "not specified", "not available", "o.e.m", "00000000", "ffffffff",
        "123456789",
    )
    for marker in placeholder_bits:
        if marker in lowered:
            return True

    alnum = re.sub(r"[^a-zA-Z0-9]", "", raw)
    if not alnum:
        return False

    # Long values built from only one or two distinct characters.
    if len(alnum) >= 10 and len(set(alnum.lower())) <= 2:
        return True

    # Long, letters-only, all-uppercase strings; ordinary serials and UUIDs
    # contain digits and therefore do not qualify.
    letter_total = sum(1 for ch in alnum if ch.isalpha())
    digit_total = sum(1 for ch in alnum if ch.isdigit())
    return (
        raw.isupper()
        and len(alnum) >= 20
        and letter_total >= 16
        and digit_total == 0
    )
def scan_suspicious_directories(days: int = 90, scan_start_ts: float = None) -> list:
    """Report recently touched folders whose name or path matches threat indicators.

    Walks Desktop, Downloads, Documents, AppData/Local and AppData/Roaming
    under the current user's home, at most three levels below each of them.
    Every sub-folder is scored from known-threat matches and the HIGH/MEDIUM
    keyword lists; folders scoring at least 25 that were created or modified
    within the last *days* days are reported.

    Args:
        days: Look-back window for the folder's creation/modification time.
        scan_start_ts: ``time.monotonic()`` value marking the start of the
            overall scan, used for the deadline check. When omitted the
            budget starts when this function is called, as in
            ``scan_recent_suspicious_files``.

    Returns:
        A list of finding dicts with category ``"suspicious_directory"``.
    """
    findings = []
    # The walk guard used to read `scan_start_ts` and `total_candidates`,
    # neither of which was bound in this function, so the first directory
    # visited raised NameError. The start timestamp is now an optional
    # argument; the file-count caps were dropped because this function never
    # counts files, so they could not trigger.
    scan_start_ts = scan_start_ts or time.monotonic()
    user = Path.home()
    targets = [
        user / "Desktop",
        user / "Downloads",
        user / "Documents",
        user / "AppData" / "Local",
        user / "AppData" / "Roaming",
    ]
    since = datetime.now() - timedelta(days=days)
    seen_dirs = set()
    blocked_roots = set()

    for base in targets:
        if not base.exists():
            continue
        for root, dirs, _files in os.walk(base):
            if scan_deadline_exceeded(scan_start_ts):
                dirs[:] = []
                break

            root_path = Path(root)
            root_lower = str(root_path).lower()
            # Do not descend into folders that were already reported as high risk.
            if any(root_lower.startswith(blocked) for blocked in blocked_roots):
                dirs[:] = []
                continue
            if is_known_good_path(root_path):
                dirs[:] = []
                continue
            try:
                depth = len(root_path.relative_to(base).parts)
                if depth > 3:
                    dirs[:] = []
                    continue
            except Exception:
                pass

            for dirname in list(dirs):
                dir_path = root_path / dirname
                lower_name = dirname.lower()
                lower_path = str(dir_path).lower()
                if is_known_good_path(dir_path):
                    continue

                risk = 0
                reasons = []
                known_matches = match_known_threat(lower_name + " " + lower_path)
                for m in known_matches:
                    risk += m["risk"]
                    reasons.append(
                        f"known threat name: {m['name']} ({m['category']}, {m['confidence']})"
                    )
                for k in HIGH_KEYWORDS:
                    if contains_keyword(lower_name, k) or contains_keyword(lower_path, k):
                        risk += 70
                        reasons.append(f"high keyword in folder name/path: {k}")
                for k in MEDIUM_KEYWORDS:
                    if contains_keyword(lower_name, k):
                        risk += 25
                        reasons.append(f"medium keyword in folder name: {k}")
                if risk < 25:
                    continue

                try:
                    stat = dir_path.stat()
                    created = datetime.fromtimestamp(stat.st_ctime)
                    modified = datetime.fromtimestamp(stat.st_mtime)
                    if created < since and modified < since:
                        continue

                    key = str(dir_path).lower()
                    if key in seen_dirs:
                        continue
                    seen_dirs.add(key)

                    # Best-effort listing of up to 10 scannable files as evidence.
                    sample_files = []
                    try:
                        for child in dir_path.iterdir():
                            if child.is_file() and child.suffix.lower() in SCAN_EXTENSIONS:
                                sample_files.append(child.name)
                            if len(sample_files) >= 10:
                                break
                    except Exception:
                        pass

                    final_risk = min(risk, 100)
                    findings.append({
                        "category": "suspicious_directory",
                        "risk": final_risk,
                        "title": "Suspicious folder found",
                        "detail": dirname,
                        "evidence": {
                            "path_type": normalize_path(dir_path),
                            "created": created.isoformat(),
                            "modified": modified.isoformat(),
                            "sample_files": sample_files,
                            "known_threat_matches": known_matches,
                            "reasons": reasons
                        }
                    })
                    if final_risk >= 70:
                        blocked_roots.add(str(dir_path).lower())
                except Exception:
                    # Unreadable folders are skipped rather than aborting the scan.
                    continue
    return findings
def scan_recent_suspicious_files(days: int = 30, scan_start_ts: float = None) -> list:
    """Score recently created or modified files with scannable extensions.

    Walks the user's Downloads, Desktop, Documents and AppData folders plus
    ``C:\\ProgramData`` and ``C:\\Windows\\Temp`` (at most four levels deep).
    Each candidate is scored from filename keywords, path risk,
    dropper heuristics, injector/loader wording, YARA matches, known-threat
    names and its Authenticode signature. Files scoring at least 30 are
    reported.

    The scan is bounded by ``scan_deadline_exceeded``, by
    ``MAX_RECENT_FILE_CANDIDATES`` across all targets and by
    ``MAX_FILES_PER_ROOT`` per target folder.

    Args:
        days: Look-back window for creation/modification time.
        scan_start_ts: ``time.monotonic()`` value marking the start of the
            overall scan; defaults to the time of this call.

    Returns:
        A list of finding dicts with category ``"suspicious_file"``.
    """
    findings = []
    scan_start_ts = scan_start_ts or time.monotonic()
    total_candidates = 0
    # Bound up front: this counter used to be incremented without ever being
    # assigned, which raised UnboundLocalError on the first candidate file.
    files_seen_in_root = 0
    user = Path.home()
    targets = [
        user / "Downloads",
        user / "Desktop",
        user / "Documents",
        user / "AppData" / "Local",
        user / "AppData" / "Roaming",
        user / "AppData" / "Local" / "Temp",
        Path(r"C:\ProgramData"),
        Path(r"C:\Windows\Temp"),
    ]
    since = datetime.now() - timedelta(days=days)
    seen_hashes = set()
    seen_driver_dirs = set()

    def budget_exhausted() -> bool:
        """True once the deadline or either candidate cap has been reached."""
        return (
            scan_deadline_exceeded(scan_start_ts)
            or total_candidates >= MAX_RECENT_FILE_CANDIDATES
            or files_seen_in_root >= MAX_FILES_PER_ROOT
        )

    for base in targets:
        if not base.exists():
            continue
        files_seen_in_root = 0  # the per-root cap restarts for every target folder
        for root, dirs, files in os.walk(base):
            # Stop walking this target once the budget is spent; breaking out
            # of the file loop alone would let os.walk keep traversing.
            if budget_exhausted():
                dirs[:] = []
                break

            root_path = Path(root)
            if is_known_good_path(root_path):
                dirs[:] = []
                continue
            try:
                depth = len(root_path.relative_to(base).parts)
                if depth > 4:
                    dirs[:] = []
                    continue
            except Exception:
                pass

            for filename in files:
                path = root_path / filename
                if is_self_artifact_path(path):
                    continue
                if path.suffix.lower() not in SCAN_EXTENSIONS:
                    continue

                total_candidates += 1
                files_seen_in_root += 1
                if budget_exhausted():
                    break
                if is_known_good_path(path):
                    continue

                try:
                    stat = path.stat()
                    created = datetime.fromtimestamp(stat.st_ctime)
                    modified = datetime.fromtimestamp(stat.st_mtime)
                    if created < since and modified < since:
                        continue
                    if scan_deadline_exceeded(scan_start_ts):
                        break

                    large_file_notes = []
                    if stat.st_size > MAX_YARA_FILE_BYTES:
                        large_file_notes.append("file larger than 50MB: YARA/deep content scan skipped, metadata/path/signature still evaluated")
                    if stat.st_size > MAX_HASH_FILE_BYTES:
                        large_file_notes.append("file larger than hash cap: SHA256 skipped to keep scan responsive")

                    k_score, k_reasons = keyword_score(path)
                    p_score, p_reasons = get_path_risk(path)
                    sd_score, sd_reasons = self_delete_exe_risk(path, created, modified, stat.st_size)
                    script_snippet = read_text_snippet(path) if stat.st_size <= MAX_YARA_FILE_BYTES else ""
                    il_score, il_reasons = score_injector_loader_text(
                        " ".join([str(path), path.name, script_snippet])
                    )
                    yara_score, yara_matches, yara_engine = scan_file_with_yara(path)

                    known_matches = match_known_threat(str(path))
                    for m in known_matches:
                        k_score += m["risk"]
                        k_reasons.append(
                            f"known threat name in path: {m['name']} ({m['category']}, {m['confidence']})"
                        )
                    k_score = min(k_score, 100)
                    if yara_score:
                        k_score = max(k_score, yara_score)
                        k_reasons.append(f"YARA rule match via {yara_engine}: {yara_matches}")

                    # Without a keyword, known-threat or YARA signal only
                    # kernel drivers remain interesting.
                    if k_score == 0 and yara_score == 0 and path.suffix.lower() != ".sys":
                        continue

                    # The signature is only needed from here on, so it is
                    # queried after the cheap early-out above.
                    signature = get_authenticode_signature(path)
                    sig_adjust, sig_reasons = signature_risk_adjustment(path, signature)

                    risk = max(0, min(k_score + p_score + sd_score + il_score + sig_adjust, 100))
                    if risk < 30:
                        continue

                    if path.suffix.lower() == ".sys":
                        # Report at most one driver per folder unless it is a known threat.
                        driver_dir = str(path.parent).lower()
                        if driver_dir in seen_driver_dirs and not known_matches:
                            continue
                        seen_driver_dirs.add(driver_dir)

                    sha256 = sha256_file(path)
                    if not sha256 and stat.st_size > MAX_HASH_FILE_BYTES:
                        k_reasons.append("SHA256 skipped because file is larger than scan hash cap")
                    if sha256 and sha256 in seen_hashes:
                        continue
                    if sha256:
                        seen_hashes.add(sha256)

                    findings.append({
                        "category": "suspicious_file",
                        "risk": risk,
                        "title": "Suspicious file found",
                        "detail": path.name,
                        "evidence": {
                            "path_type": normalize_path(path),
                            "filename": path.name,
                            "extension": path.suffix.lower(),
                            "size": stat.st_size,
                            "created": created.isoformat(),
                            "modified": modified.isoformat(),
                            "sha256": sha256,
                            "signature": signature,
                            "yara": {
                                "engine": yara_engine,
                                "matches": yara_matches,
                                "risk": yara_score
                            },
                            "known_threat_matches": known_matches,
                            "random_name_suspected": looks_random_exe_name(path),
                            "script_snippet": script_snippet[:800] if script_snippet else "",
                            "reasons": k_reasons + p_reasons + sd_reasons + il_reasons + sig_reasons + large_file_notes
                        }
                    })
                except Exception:
                    # A file that cannot be inspected is skipped, not fatal.
                    continue
    return findings
def scan_kmbox_network() -> list:
    """Look for the known KMBOX Net address in local network command output.

    Runs ``arp -a``, ``ipconfig /all`` and three PowerShell networking queries
    through ``run_cmd`` and searches their raw output. A finding with risk 95
    is produced when ``192.168.2.188`` appears in any output; otherwise a
    risk-35 finding is produced when the text ``192.168.2.`` appears at all.

    Returns:
        A list of finding dicts with category ``"kmbox_network"``.
    """
    probes = (
        ("arp_table", "arp -a"),
        ("ipconfig", "ipconfig /all"),
        (
            "get_net_ipaddress",
            'powershell -NoProfile -Command '
            '"Get-NetIPAddress -AddressFamily IPv4 | '
            'Select-Object IPAddress, InterfaceAlias, PrefixLength | ConvertTo-Json -Depth 4"',
        ),
        (
            "get_netneighbor",
            'powershell -NoProfile -Command '
            '"Get-NetNeighbor -AddressFamily IPv4 -ErrorAction SilentlyContinue | '
            'Select-Object IPAddress, LinkLayerAddress, State, InterfaceAlias | ConvertTo-Json -Depth 4"',
        ),
        (
            "get_netadapter",
            'powershell -NoProfile -Command '
            '"Get-NetAdapter | '
            'Select-Object Name, InterfaceDescription, Status, MacAddress, LinkSpeed | ConvertTo-Json -Depth 4"',
        ),
    )
    # Commands run in table order; each result keeps its source label.
    checks = [{"source": label, "raw": run_cmd(command)} for label, command in probes]

    findings = []
    for ip in ["192.168.2.188"]:
        matched_sources = [entry["source"] for entry in checks if ip in entry["raw"]]
        if not matched_sources:
            continue
        findings.append({
            "category": "kmbox_network",
            "risk": 95,
            "title": "Known KMBOX Net IP indicator",
            "detail": ip,
            "evidence": {
                "indicator": ip,
                "matched_sources": matched_sources,
                "known_threat_matches": [
                    {
                        "name": ip,
                        "risk": 95,
                        "category": "kmbox_net_ip",
                        "confidence": "confirmed",
                        "note": "Known KMBOX Net default/common IP indicator"
                    }
                ],
                "reasons": [f"known KMBOX Net IP observed: {ip}"],
                "note": "Strong KMBOX Net network indicator. Review network context manually."
            }
        })

    combined = "\n".join(entry["raw"] for entry in checks).lower()
    subnet_without_known_ip = (
        "192.168.2." in combined and "192.168.2.188" not in combined
    )
    if subnet_without_known_ip:
        findings.append({
            "category": "kmbox_network",
            "risk": 35,
            "title": "KMBOX Net subnet indicator",
            "detail": "192.168.2.x subnet observed",
            "evidence": {
                "indicator": "192.168.2.x",
                "matched_sources": ["network_config_or_neighbor_table"],
                "known_threat_matches": [],
                "reasons": ["network evidence contains 192.168.2.x subnet"],
                "note": "192.168.2.x alone is not proof of KMBOX; many routers/devices can use this subnet."
            }
        })
    return findings
ps_command in commands: items = ps_json(ps_command) for item in items: if item.get("parse_error"): continue raw = json.dumps(item, ensure_ascii=False) lower = raw.lower() risk = 0 reasons = [] known_matches = match_known_threat(lower) for m in known_matches: risk += m["risk"] reasons.append( f"known threat device indicator: {m['name']} ({m['category']}, {m['confidence']})" ) for k in high_indicators: if k in lower: risk += 85 reasons.append(f"high DMA/KMBOX indicator: {k}") for k in medium_indicators: if k in lower: risk += 20 reasons.append(f"medium HID/serial device indicator: {k}") fp_score, fp_status, fp_reasons = score_usb_kmbox_fingerprint(lower) risk += fp_score reasons.extend(fp_reasons) dangerous = is_dangerous_whitelist_exception(lower) has_high_indicator = any(k in lower for k in high_indicators) normal_hid = is_normal_hid_device( item.get("DeviceID") or item.get("PNPDeviceID") or "", item.get("Name") or "", ) network_indicator = has_kmbox_network_indicator(lower) if normal_hid and fp_status == "known_good" and not dangerous and not known_matches and not has_high_indicator and not network_indicator: continue if normal_hid and fp_status != "known_bad" and not dangerous and not known_matches and network_indicator: risk = min(risk, 10) reasons.append("normal HID/USB device kept only as low-risk network-context evidence") if risk < 35: continue device_id = ( item.get("DeviceID") or item.get("PNPDeviceID") or item.get("Name") or item.get("DeviceName") or raw[:120] ) key = f"{source_name}:{device_id}".lower() if key in seen: continue seen.add(key) findings.append({ "category": "dma_kmbox_device", "risk": min(risk, 100), "title": "DMA/KMBOX related device indicator", "detail": item.get("Name") or item.get("DeviceName") or "unknown device", "evidence": { "source": source_name, "device": item, "known_threat_matches": known_matches, "reasons": reasons, "note": "USB HID / Serial indicators can be normal hardware. 
def _read_setupapi_tail(log_path: Path, max_lines: int):
    """Return the last *max_lines* lines of *log_path*, or None when unreadable."""
    from collections import deque  # local import keeps this block self-contained

    # A bounded deque keeps only the tail in memory and yields an empty tail
    # for max_lines <= 0; the previous `lines[-max_lines:]` slice returned the
    # whole log when max_lines was 0.
    keep = max(max_lines, 0)
    # UTF-16 is attempted first; UTF-8 is the fallback when that attempt raises.
    for encoding in ("utf-16", "utf-8"):
        try:
            with open(log_path, "r", encoding=encoding, errors="ignore") as handle:
                return list(deque(handle, maxlen=keep))
        except Exception:
            continue
    return None


def scan_device_install_history(max_lines: int = 3000) -> list:
    """Search the tail of the SetupAPI device log for DMA/KMBOX indicators.

    Reads the last *max_lines* lines of ``C:\\Windows\\INF\\setupapi.dev.log``,
    scores the text from known-threat matches and the high/medium indicator
    lists, and emits at most one finding containing up to 50 matching lines
    and up to 20 timestamped events.

    Args:
        max_lines: Number of trailing log lines to inspect.

    Returns:
        An empty list, or a list with one finding dict of category
        ``"device_install_history"``.
    """
    findings = []
    log_path = Path(r"C:\Windows\INF\setupapi.dev.log")
    if not log_path.exists():
        return findings

    high_indicators = [
        "kmbox", "km box", "kmbox net", "kmbox nb", "kmbox nvideo",
        "pcileech", "screamer", "captaindma", "leechcore", "memprocfs",
        "usb3380", "ft601", "ft600", "xilinx", "altera", "lattice",
        "192.168.2.188"
    ]
    medium_indicators = [
        "ch340", "ch341", "cp210", "cp210x", "ftdi", "usb serial",
        "serial converter", "vendor-defined", "hid-compliant vendor-defined"
    ]

    tail = _read_setupapi_tail(log_path, max_lines)
    if tail is None:
        return findings
    lower = "".join(tail).lower()

    reasons = []
    risk = 0
    known_matches = match_known_threat(lower)
    for m in known_matches:
        risk += m["risk"]
        reasons.append(
            f"known threat install history: {m['name']} ({m['category']}, {m['confidence']})"
        )
    for k in high_indicators:
        if k in lower:
            risk += 80
            reasons.append(f"high DMA/KMBOX install-history indicator: {k}")
    for k in medium_indicators:
        if k in lower:
            risk += 20
            reasons.append(f"medium install-history indicator: {k}")

    dangerous = is_dangerous_whitelist_exception(lower)
    has_high_indicator = any(k in lower for k in high_indicators)
    normal_hid = is_normal_hid_context(lower)
    network_indicator = has_kmbox_network_indicator(lower)

    # Ordinary HID install traces with nothing else suspicious are dropped.
    if normal_hid and not dangerous and not known_matches and not has_high_indicator and not network_indicator:
        return findings
    if normal_hid and not dangerous and not known_matches and network_indicator:
        risk = min(risk, 15)
        reasons.append("normal HID/USB install-history kept only as low-risk KMBOX network combination evidence")
    if risk < 35:
        return findings

    matched_lines = []
    timeline_events = []
    current_log_time = ""
    all_indicators = high_indicators + medium_indicators  # built once, not per line
    for line in tail:
        lowered_line = line.lower()
        time_match = re.search(r"(\d{4}/\d{2}/\d{2}\s+\d{2}:\d{2}:\d{2}(?:\.\d+)?)", line)
        if time_match:
            # Remember the most recent timestamp so later matches can be dated.
            current_log_time = time_match.group(1).replace("/", "-")
        if any(k in lowered_line for k in all_indicators):
            matched_lines.append(line.strip())
            if current_log_time:
                timeline_events.append({
                    "time": current_log_time,
                    "event": "device_install_history",
                    "detail": line.strip()[:240]
                })
        if len(matched_lines) >= 50:
            break

    findings.append({
        "category": "device_install_history",
        "risk": min(risk, 100),
        "title": "DMA/KMBOX related device install history",
        "detail": "SetupAPI device installation log contains related indicators",
        "evidence": {
            "path_type": r"C:\Windows\INF\setupapi.dev.log",
            "known_threat_matches": known_matches,
            "matched_lines": matched_lines,
            "timeline_events": timeline_events[:20],
            "reasons": reasons,
            "note": "SetupAPI history proves device/driver installation traces, not cheating by itself."
        }
    })
    return findings
def score_dma_hardware_device(device: dict):
    """Score one enumerated device for DMA/KMBOX relevance.

    The device dict is serialised to lower-case JSON and searched for known
    threat names, keyword indicators and hardware vendor IDs.

    Returns:
        A tuple ``(risk, reasons, known_matches)`` where ``risk`` is capped
        at 100, ``reasons`` lists every contribution in evaluation order and
        ``known_matches`` is whatever ``match_known_threat`` returned.
    """
    haystack = json.dumps(device, ensure_ascii=False).lower()

    score = 0
    why = []

    threat_hits = match_known_threat(haystack)
    for hit in threat_hits:
        score += hit["risk"]
        why.append(
            f"known threat hardware indicator: {hit['name']} ({hit['category']}, {hit['confidence']})"
        )

    strong_terms = (
        "pcileech", "screamer", "captaindma", "leechcore", "memprocfs",
        "kmbox", "km box", "kmbox net", "kmbox nb", "kmbox nvideo",
        "usb3380", "ft601", "ft600", "xilinx", "altera", "lattice",
    )
    vendor_ids = (
        "pci\\ven_10ee",
        "pci\\ven_1172",
        "usb\\vid_0403",
        "usb\\vid_1d50",
    )
    medium_terms = (
        "fpga", "jtag", "usb serial", "serial converter",
        "ch340", "ch341", "cp210", "cp210x", "ftdi",
        "vendor-defined", "hid-compliant vendor-defined",
        "thunderbolt", "usb4",
    )
    bridge_terms = (
        "pci-to-pci bridge",
        "pci express root port",
        "pcie root port",
        "thunderbolt controller",
        "usb4 host router",
    )

    # Track whether any strong term matched; it feeds the whitelist decision
    # at the end.
    strong_hit = False
    for term in strong_terms:
        if term in haystack:
            strong_hit = True
            score += 85
            why.append(f"high DMA/KMBOX current hardware indicator: {term}")

    for vendor_id in vendor_ids:
        if vendor_id not in haystack:
            continue
        if vendor_id == "usb\\vid_0403":
            # The FTDI VID is weighted lower than the other vendor IDs.
            score += 45
            why.append("FTDI USB VID observed; common in DMA/dev-board ecosystems, requires review")
        else:
            score += 80
            why.append(f"high-risk hardware vendor/device id observed: {vendor_id}")

    for term in medium_terms:
        if term in haystack:
            score += 25
            why.append(f"medium hardware indicator: {term}")

    fp_score, fp_status, fp_reasons = score_usb_kmbox_fingerprint(haystack)
    score += fp_score
    why.extend(fp_reasons)

    for term in bridge_terms:
        if term in haystack:
            score += 10
            why.append(f"weak bridge/controller indicator: {term}")

    looks_normal_hid = is_normal_hid_context(haystack)
    dangerous = is_dangerous_whitelist_exception(haystack)
    network_hit = has_kmbox_network_indicator(haystack)

    # Shared precondition of both whitelist branches below.
    benign_context = looks_normal_hid and not dangerous and not threat_hits

    if benign_context and fp_status == "known_good" and not strong_hit and not network_hit:
        score = 5
        why.append("normal HID/USB whitelist suppressed standalone device risk")
    elif benign_context and fp_status != "known_bad" and network_hit:
        score = min(score, 15)
        why.append("normal HID/USB device kept only as low-risk KMBOX network combination evidence")

    return min(score, 100), why, threat_hits


def scan_current_pcie_dma_hardware() -> list:
    """Report currently present PCI/USB/HID-style devices scoring >= 50.

    Returns:
        A list of finding dicts (category ``dma_current_hardware``), one per
        distinct relevant device whose score reaches the threshold.
    """
    important_classes = [
        "usb", "hidclass", "keyboard", "mouse", "net",
        "ports", "system", "media", "display", "unknown"
    ]

    results = []
    visited = set()

    for dev in get_present_hardware_snapshot():
        blob = json.dumps(dev, ensure_ascii=False).lower()
        dev_class = str(dev.get("class", "")).lower()
        dev_id = str(dev.get("instance_id", "")).lower()

        relevant = (
            dev_id.startswith(("pci\\", "usb\\", "hid\\"))
            or "thunderbolt" in blob
            or "usb4" in blob
            or dev_class in important_classes
        )
        if not relevant:
            continue

        # De-duplicate on instance id, falling back to a JSON prefix.
        dedup_key = dev_id or blob[:120]
        if dedup_key in visited:
            continue
        visited.add(dedup_key)

        risk, reasons, known_matches = score_dma_hardware_device(dev)
        if risk < 50:
            continue

        results.append({
            "category": "dma_current_hardware",
            "risk": risk,
            "title": "Possible current DMA/KMBOX hardware indicator",
            "detail": dev.get("name") or dev.get("instance_id") or "unknown hardware",
            "evidence": {
                "device": dev,
                "known_threat_matches": known_matches,
                "reasons": reasons,
                "note": "This is current hardware enumeration. It can indicate DMA/KMBOX/dev-board presence, but still requires manual review."
            }
        })

    return results


def device_fingerprint(device: dict) -> str:
    """Build a ``class|name|service|instance_id`` key for *device*.

    Each component is stringified, stripped and lower-cased; missing or
    falsy values become empty strings.
    """
    components = [
        str(device.get(field, "") or "").strip().lower()
        for field in ("class", "name", "service", "instance_id")
    ]
    return "|".join(components)
Future scans can detect newly added hardware.", }, }) return findings try: with open(HARDWARE_BASELINE_PATH, "r", encoding="utf-8") as f: baseline = json.load(f) except Exception: return findings baseline_devices = baseline.get("devices", []) baseline_map = {} for d in baseline_devices: fp = device_fingerprint(d) if fp.strip("|"): baseline_map[fp] = d new_fps = sorted(set(current_map.keys()) - set(baseline_map.keys())) reported = 0 network_context = has_kmbox_network_indicator( json.dumps(current_devices, ensure_ascii=False) ) for fp in new_fps: d = current_map[fp] raw = json.dumps(d, ensure_ascii=False).lower() instance_id = str(d.get("instance_id", "") or "").lower() device_class = str(d.get("class", "") or "").lower() device_name = str(d.get("name", "") or "") interesting = False if instance_id.startswith("pci\\"): interesting = True if instance_id.startswith("usb\\"): interesting = True if instance_id.startswith("hid\\"): interesting = True if device_class in ["net", "ports", "usb", "hidclass", "keyboard", "mouse", "system", "unknown"]: interesting = True if "thunderbolt" in raw or "usb4" in raw: interesting = True if not interesting: continue risk, reasons, known_matches = score_dma_hardware_device(d) normal_hid = is_normal_hid_context(f"{instance_id} {device_name} {device_class}") if normal_hid and not known_matches: if network_context: risk = 10 reasons.append( "common HID device; retained only as weak context because KMBOX network indicator exists" ) else: continue else: risk += 25 reasons.append("new hardware device compared to baseline") if instance_id.startswith("pci\\"): risk += 15 reasons.append("new PCI device compared to baseline") if instance_id.startswith("usb\\") or instance_id.startswith("hid\\"): risk += 10 reasons.append("new USB/HID device compared to baseline") risk = min(risk, 100) if risk < 10: continue findings.append({ "category": "hardware_baseline", "risk": risk, "title": "New hardware detected compared to baseline", "detail": 
d.get("name") or d.get("instance_id") or "unknown new hardware", "evidence": { "device": d, "known_threat_matches": known_matches, "reasons": reasons, "baseline_created_at": baseline.get("created_at"), "note": "New hardware is not proof of cheating by itself. Review if it appears before/during match check-in.", }, }) reported += 1 if reported >= 20: break return findings def scan_drivers() -> list: findings = [] drivers = ps_json( "Get-CimInstance Win32_SystemDriver | " "Select-Object Name, DisplayName, State, StartMode, PathName" ) suspicious_path_parts = [ "\\downloads\\", "\\desktop\\", "\\appdata\\", "\\temp\\", "\\users\\public\\" ] for d in drivers: if d.get("parse_error"): continue name = str(d.get("Name", "")).lower() path = str(d.get("PathName", "")).lower() driver_hay = name + " " + path if is_trusted_vendor_context(driver_hay) or name in {"asmio", "hwinfo_214", "qeeyoupacket"}: continue risk = 0 reasons = [] known_matches = match_known_threat(name + " " + path) for m in known_matches: risk += m["risk"] reasons.append(f"known threat name in driver: {m['name']} ({m['category']}, {m['confidence']})") for k in HIGH_KEYWORDS: if contains_keyword(name, k) or contains_keyword(path, k): risk += 80 reasons.append(f"high keyword in driver name/path: {k}") if any(p in path for p in suspicious_path_parts): risk += 60 reasons.append("driver path is outside normal Windows driver directories") driver_file = extract_windows_file_path(d.get("PathName", "")) signature = {} if driver_file and driver_file.exists(): signature = get_authenticode_signature(driver_file) sig_adjust, sig_reasons = signature_risk_adjustment(driver_file, signature) risk = max(0, min(risk + sig_adjust, 100)) reasons.extend(sig_reasons) if risk >= 60: findings.append({ "category": "drivers", "risk": min(risk, 100), "title": "Suspicious driver", "detail": d.get("Name", "unknown"), "evidence": { "driver": d, "signature": signature, "known_threat_matches": known_matches, "reasons": reasons } }) return 
def _bcd_flag_enabled(bcd_text: str, flag: str) -> bool:
    """Return True when a BCD element whose name contains *flag* is ``yes``.

    *bcd_text* is expected to be lower-cased ``bcdedit /enum`` output, which
    lists one ``name value`` pair per line. The value is checked on the same
    line as the element name, so an unrelated ``Yes`` elsewhere in the output
    cannot enable the flag.
    """
    for line in bcd_text.splitlines():
        parts = line.split(None, 1)
        if len(parts) != 2:
            continue
        key, value = parts
        # Substring match on the element name keeps the original coverage
        # (e.g. "debug" also covers "bootdebug").
        if flag in key and value.strip() == "yes":
            return True
    return False


def scan_security_config() -> list:
    """Check boot configuration for weakened driver-trust settings.

    Runs ``bcdedit /enum`` via ``run_cmd`` and reports test signing,
    disabled integrity checks and debug mode when the corresponding element
    is set to ``Yes``.

    Returns:
        A list of finding dicts with category ``security_config``.
    """
    findings = []

    # Coerce defensively: a failed command must not crash the scan.
    bcd = str(run_cmd("bcdedit /enum") or "").lower()

    checks = (
        ("testsigning", 70, "Test signing appears enabled",
         "Windows test signing can reduce driver trust."),
        ("nointegritychecks", 80, "No integrity checks appears enabled",
         "Driver integrity checks may be disabled."),
        ("debug", 70, "Boot debug appears enabled",
         "Boot debug mode is unusual for normal gaming systems."),
    )

    for flag, risk, title, detail in checks:
        if _bcd_flag_enabled(bcd, flag):
            findings.append({
                "category": "security_config",
                "risk": risk,
                "title": title,
                "detail": detail,
                "evidence": {}
            })

    return findings
} }) return findings try: with open(HWID_BASELINE_PATH, "r", encoding="utf-8") as f: baseline = json.load(f) except Exception: return findings baseline_snapshot = baseline.get("snapshot", {}) fields_to_compare = [ "computer_manufacturer", "computer_model", "bios_manufacturer", "bios_serial", "bios_version", "baseboard_manufacturer", "baseboard_product", "baseboard_serial", "csproduct_vendor", "csproduct_name", "csproduct_identifying_number", "uuid" ] changed_fields = [] for field in fields_to_compare: old = normalize_hwid_value(baseline_snapshot.get(field)) new = normalize_hwid_value(current.get(field)) if old and new and old != new: changed_fields.append({ "field": field, "old": old, "new": new }) risk = 0 reasons = [] if changed_fields: risk += min(len(changed_fields) * 12, 80) reasons.append(f"{len(changed_fields)} SMBIOS/DMI fields changed compared to baseline") critical_fields = [ "computer_manufacturer", "computer_model", "bios_manufacturer", "bios_serial", "baseboard_product", "baseboard_serial", "uuid" ] critical_changes = [ item for item in changed_fields if item["field"] in critical_fields ] if len(critical_changes) >= 3: risk += 35 reasons.append("multiple critical HWID fields changed") if len(critical_changes) >= 5: risk += 25 reasons.append("many critical HWID fields changed, likely HWID spoofing") if current_suspicion_score >= 70: risk += 35 reasons.extend(current_suspicion_reasons) risk = min(risk, 100) if risk >= 50: findings.append({ "category": "hwid_spoofing", "risk": risk, "title": "Possible HWID spoofing detected", "detail": "SMBIOS/DMI fields changed compared to baseline", "evidence": { "baseline_created_at": baseline.get("created_at"), "changed_fields": changed_fields, "critical_changes": critical_changes, "current_snapshot": current, "baseline_snapshot": baseline_snapshot, "random_fields": random_fields, "default_fields": default_fields, "reasons": reasons, "note": "Large SMBIOS/DMI changes are strong HWID spoofing indicators. 
def scan_hwid_integrity() -> list:
    """Flag default, abnormal or random-looking SMBIOS/DMI values.

    Returns:
        An empty list when the snapshot is classified as a normal OEM
        snapshot without default fields, or when the suspicion score is
        below 50 and no default fields were found. Otherwise a single
        ``hwid_integrity`` finding whose risk is capped at 35.
    """
    hwid = get_hwid_snapshot()
    suspicion, why, randomized, defaults = score_hwid_snapshot_suspicion(hwid)

    if is_normal_oem_hwid_snapshot(hwid) and not defaults:
        return []

    if suspicion < 50 and not defaults:
        return []

    return [{
        "category": "hwid_integrity",
        "risk": min(suspicion, 35),
        "title": "Abnormal or default HWID field",
        "detail": "Some SMBIOS/DMI fields contain default, abnormal, or random-looking values.",
        "evidence": {
            "snapshot": hwid,
            "random_fields": randomized,
            "default_fields": defaults,
            "reasons": why,
            "note": "Default DMI values can be normal on some motherboards. Random-looking values or many changed fields are stronger spoofing indicators."
        }
    }]
} }) display_entities = ps_json( "Get-CimInstance Win32_PnPEntity | " "Where-Object { $_.PNPClass -in @('Monitor','Display','Media') -or $_.Name -match 'display|monitor|capture|hdmi|virtual' } | " "Select-Object Name, DeviceID, Manufacturer, PNPClass, Service, Status" ) for item in display_entities: if item.get("parse_error"): continue raw = json.dumps(item, ensure_ascii=False) risk, reasons = score_display_fuser_text(raw) lower = raw.lower() if "virtual" in lower and ("display" in lower or "monitor" in lower): risk += 20 reasons.append("virtual display path detected") if "capture" in lower and ("usb" in lower or "hdmi" in lower): risk += 20 reasons.append("capture bridge present in display device chain") if risk < 35: continue device_id = str(item.get("DeviceID", "") or item.get("Name", "") or raw[:80]) key = ("pnp:" + device_id).lower() if key in seen: continue seen.add(key) findings.append({ "category": "display_fuser", "risk": min(risk, 85), "title": "Suspicious display bridge / fuser indicator", "detail": item.get("Name") or "unknown display device", "evidence": { "source": "Win32_PnPEntity", "device": item, "reasons": reasons, "note": "Display capture, splitter, EDID emulator, and virtual monitor devices can be legitimate; treat as context-dependent evidence." 
} }) return findings def scan_ai_aim_artifacts(days: int = 30) -> list: findings = [] seen = set() process_items = ps_json( "Get-CimInstance Win32_Process | " "Select-Object Name, ExecutablePath, CommandLine, ProcessId" ) for item in process_items: if item.get("parse_error"): continue raw = " ".join([ str(item.get("Name", "") or ""), str(item.get("ExecutablePath", "") or ""), str(item.get("CommandLine", "") or "") ]) risk, reasons = score_ai_aim_text(raw) lower = raw.lower() if any(x in lower for x in ["python", "pythonw", "node", "cmd.exe", "powershell"]) and any( contains_keyword(lower, k) for k in ["yolo", "onnx", "opencv", "triggerbot", "aimbot", "pixelbot", "colorbot"] ): risk += 20 reasons.append("script/runtime process combined with aim-assist inference keywords") if risk < 40: continue key = "proc:" + str(item.get("ProcessId", "") or raw[:80]).lower() if key in seen: continue seen.add(key) findings.append({ "category": "ai_aim_assist", "risk": min(risk, 90), "title": "Possible AI aim-assist runtime artifact", "detail": item.get("Name") or "unknown process", "evidence": { "source": "Win32_Process", "process": item, "reasons": reasons, "note": "Runtime inference or automation libraries can be legitimate in dev/ML environments; review together with file and startup evidence." 
} }) user = Path.home() targets = [ user / "Desktop", user / "Downloads", user / "Documents", user / "AppData" / "Local", user / "AppData" / "Roaming", user / "AppData" / "Local" / "Temp" ] model_exts = {".onnx", ".pt", ".pth", ".engine", ".weights", ".cfg", ".yaml", ".yml", ".json"} since = datetime.now() - timedelta(days=days) for base in targets: if not base.exists(): continue for root, dirs, files in os.walk(base): root_path = Path(root) if is_known_good_path(root_path): dirs[:] = [] continue try: depth = len(root_path.relative_to(base).parts) if depth > 4: dirs[:] = [] continue except Exception: pass for filename in files: path = root_path / filename if is_self_artifact_path(path): continue suffix = path.suffix.lower() if suffix not in model_exts and suffix not in {".py", ".ps1", ".bat", ".cmd", ".txt"}: continue try: stat = path.stat() created = datetime.fromtimestamp(stat.st_ctime) modified = datetime.fromtimestamp(stat.st_mtime) if created < since and modified < since: continue snippet = "" if suffix in {".py", ".ps1", ".bat", ".cmd", ".txt", ".json", ".yaml", ".yml", ".cfg"}: snippet = read_text_snippet(path, max_chars=3000) if suffix in {".ps1", ".bat", ".cmd"} else path.read_text(encoding="utf-8", errors="ignore")[:3000] if is_ai_scan_excluded_path(path) and not is_dangerous_whitelist_exception(str(path) + " " + snippet): continue raw = " ".join([str(path), path.name, snippet]) doc_risk, doc_reasons = score_cheat_documentation_text(raw) if doc_risk >= 80: key = "file:" + str(path).lower() if key in seen: continue seen.add(key) findings.append({ "category": "cheat_documentation", "risk": min(max(doc_risk, 95), 100), "title": "Cheat loader / injection instructions found", "detail": path.name, "evidence": { "source": "filesystem", "path_type": normalize_path(path), "extension": suffix, "size": stat.st_size, "created": created.isoformat(), "modified": modified.isoformat(), "snippet": snippet[:800] if snippet else "", "reasons": doc_reasons, "note": "Explicit 
loader, injection, menu, or aimbot instructions are much stronger than generic AI/ML artifact matches." } }) continue risk, reasons = score_ai_aim_text(raw) if suffix in {".onnx", ".pt", ".pth", ".engine", ".weights"}: risk += 15 reasons.append("local ML model artifact in user-writable path") if any(contains_keyword(raw, k) for k in ["crosshair", "overlay", "capture", "detect", "triggerbot", "aimbot", "pixelbot", "colorbot"]): risk += 20 reasons.append("model/script artifact combined with aim-assist style terminology") if risk < 40: continue key = "file:" + str(path).lower() if key in seen: continue seen.add(key) findings.append({ "category": "ai_aim_assist", "risk": min(risk, 85), "title": "Possible AI aim-assist local artifact", "detail": path.name, "evidence": { "source": "filesystem", "path_type": normalize_path(path), "extension": suffix, "size": stat.st_size, "created": created.isoformat(), "modified": modified.isoformat(), "snippet": snippet[:800] if snippet else "", "reasons": reasons, "note": "Local ML models or OpenCV scripts are not cheating by themselves; treat as stronger only when paired with aim/trigger/overlay context." 
# ============================================================
# Added modules: registry / USB history / macro environment / network debugger / evidence correlation
# ============================================================

# Names and phrases associated with macro / auto-click tooling and the
# vendor utilities that can host macros.
MACRO_TOOL_KEYWORDS = [
    "autohotkey", "ahk", "pulover", "macro recorder", "macrocreator", "tinytask",
    "op auto clicker", "gs auto clicker", "clicker",
    "rapidfire", "rapid fire", "no recoil", "norecoil", "recoil",
    "logitech ghub", "lghub", "razer synapse", "steelseries gg", "corsair icue",
    "bloody", "a4tech",
    "鼠标宏", "连点器", "压枪", "无后座", "宏脚本",
    "g hub", "ghub", "lghub_agent", "lghub_updater",
    "bloody6", "bloody7", "razercentral", "synapse3",
    "lua script", "macro.lua", "mouse_event", "move mouse relative", "logitech lua",
]

# Phrases treated as strong macro/aim-assist evidence by _risk_from_context.
MACRO_STRONG_KEYWORDS = [
    "no recoil", "norecoil", "recoil macro", "rapidfire", "rapid fire", "anti recoil",
    "压枪", "无后座",
    "triggerbot", "colorbot", "pixelbot", "aimbot",
    "no-recoil", "anti-recoil", "recoil table", "recoil compensation",
    "rapid-fire", "fire rate", "weapon recoil",
    "enableprimarymousebuttonevents", "move mouserelative", "mouse_event", "pressandreleasekey",
]

# Lower-case path fragments that raise the risk of a registry-referenced path.
REGISTRY_DANGEROUS_PATH_PARTS = [
    "\\appdata\\local\\temp\\",
    "\\appdata\\roaming\\",
    "\\users\\public\\",
    "\\downloads\\",
    "\\desktop\\",
    "\\programdata\\temp",
    "\\temp\\",
]

# Port number (as string) -> human-readable description.
NETWORK_DEBUG_PORTS = {
    "27042": "Frida default server port",
    "27043": "Frida secondary server port",
    "23946": "debug/instrumentation port",
    "8888": "local HTTP proxy/debugger",
    "8080": "local HTTP proxy/web panel",
    "9090": "local proxy/web panel",
}


def _safe_str(value) -> str:
    """Return *value* as a stripped string; any falsy value becomes ``""``."""
    text = str(value if value else "")
    return text.strip()


def _reg_hay(obj) -> str:
    """Serialise *obj* to a lower-case search haystack.

    JSON is used when possible (falsy input serialises as ``{}``); objects
    that cannot be JSON-encoded fall back to ``str()``.
    """
    try:
        serialized = json.dumps(obj if obj else {}, ensure_ascii=False)
    except Exception:
        serialized = str(obj if obj else "")
    return serialized.lower()


def _risk_from_context(text: str, base: int = 25) -> int:
    """Derive a 0-95 risk score from keyword context found in *text*.

    Starting from *base*: +45 for any strong macro keyword, +35 for any
    HIGH_KEYWORDS entry, +20 for any dangerous path fragment, and -25 when
    the text matches a trusted vendor context or a normal-software keyword.
    All keyword checks are plain substring tests on the lower-cased text.
    """
    haystack = str(text if text else "").lower()

    def mentions(words) -> bool:
        return any(word in haystack for word in words)

    score = base
    if mentions(MACRO_STRONG_KEYWORDS):
        score += 45
    if mentions(HIGH_KEYWORDS):
        score += 35
    if mentions(REGISTRY_DANGEROUS_PATH_PARTS):
        score += 20
    if is_trusted_vendor_context(haystack) or mentions(NORMAL_SOFTWARE_REVIEW_KEYWORDS):
        score -= 25

    clamped = min(score, 95)
    return max(0, clamped)
@('HKLM:\SOFTWARE\Microsoft\Windows NT\CurrentVersion\Windows','HKLM:\SOFTWARE\WOW6432Node\Microsoft\Windows NT\CurrentVersion\Windows')){ if(Test-Path $p){ $props=Get-ItemProperty -Path $p -ErrorAction SilentlyContinue; if([string]$props.AppInit_DLLs){ $result += [pscustomobject]@{ type='AppInit_DLLs'; key=$p; value=[string]$props.AppInit_DLLs } } } } $winlogon='HKLM:\SOFTWARE\Microsoft\Windows NT\CurrentVersion\Winlogon' if(Test-Path $winlogon){ $props=Get-ItemProperty -Path $winlogon -ErrorAction SilentlyContinue; $result += [pscustomobject]@{ type='Winlogon Shell'; key=$winlogon; value=[string]$props.Shell }; $result += [pscustomobject]@{ type='Winlogon Userinit'; key=$winlogon; value=[string]$props.Userinit } } $result ''' for item in ps_json(security_cmd): hay = _reg_hay(item); typ = _safe_str(item.get("type")); val = _safe_str(item.get("value")) if not val or "parse_error" in item: continue normal_winlogon = (typ == "Winlogon Shell" and val.lower() == "explorer.exe") or (typ == "Winlogon Userinit" and "userinit.exe" in val.lower() and "," in val) if normal_winlogon: continue risk = 80 if typ in {"IFEO Debugger", "AppInit_DLLs"} else _risk_from_context(hay, 45) findings.append({"category":"registry_security_hook","risk":min(risk,95),"title":typ,"detail":val[:160],"evidence":{"registry":item,"reasons":[f"security-sensitive registry hook: {typ}"]}}) service_cmd = r''' $result=@(); $root='HKLM:\SYSTEM\CurrentControlSet\Services' if(Test-Path $root){ Get-ChildItem $root -ErrorAction SilentlyContinue | ForEach-Object { $props=Get-ItemProperty -Path $_.PSPath -ErrorAction SilentlyContinue; $img=[string]$props.ImagePath; if($img){ $result += [pscustomobject]@{ name=$_.PSChildName; image_path=$img; type=$props.Type; start=$props.Start } } } } $result ''' for item in ps_json(service_cmd): hay = _reg_hay(item) if not hay or "parse_error" in item: continue if not (any(k in hay for k in HIGH_KEYWORDS + MEDIUM_KEYWORDS) or any(k in hay for k in 
REGISTRY_DANGEROUS_PATH_PARTS)): continue if is_trusted_vendor_context(hay) and not is_dangerous_whitelist_exception(hay): continue findings.append({"category":"registry_driver_service","risk":_risk_from_context(hay,30),"title":"Registry driver/service image path","detail":_safe_str(item.get("name")),"evidence":{"registry":item,"reasons":["service/driver registry ImagePath has suspicious keyword/path context"]}}) recent_cmd = r''' $result=@(); $paths=@('HKCU:\Software\Classes\Local Settings\Software\Microsoft\Windows\Shell\MuiCache','HKCU:\Software\Microsoft\Windows NT\CurrentVersion\AppCompatFlags\Compatibility Assistant\Store') foreach($p in $paths){ if(Test-Path $p){ $props=Get-ItemProperty -Path $p -ErrorAction SilentlyContinue; foreach($name in $props.PSObject.Properties.Name){ if($name -in @('PSPath','PSParentPath','PSChildName','PSDrive','PSProvider')){ continue }; $result += [pscustomobject]@{ key=$p; name=$name; value=[string]$props.$name } } } } $result | Select-Object -First 250 ''' for item in ps_json(recent_cmd): hay = _reg_hay(item) if not hay or "parse_error" in item or not any(k in hay for k in HIGH_KEYWORDS + MACRO_TOOL_KEYWORDS): continue if is_trusted_vendor_context(hay) and not is_dangerous_whitelist_exception(hay): continue findings.append({"category":"registry_recent_execution","risk":min(_risk_from_context(hay,20),65),"title":"Registry recent execution trace","detail":_safe_str(item.get("name"))[:180],"evidence":{"registry":item,"reasons":["recent execution registry trace with suspicious keyword context"]}}) return findings def scan_usb_history() -> list: findings = [] cmd = r''' $result=@(); $roots=@('HKLM:\SYSTEM\CurrentControlSet\Enum\USBSTOR','HKLM:\SYSTEM\CurrentControlSet\Enum\USB','HKLM:\SYSTEM\CurrentControlSet\Enum\HID') foreach($root in $roots){ if(Test-Path $root){ Get-ChildItem $root -Recurse -Depth 2 -ErrorAction SilentlyContinue | ForEach-Object { $props=Get-ItemProperty -Path $_.PSPath -ErrorAction SilentlyContinue; $result += 
def scan_macro_environment() -> list:
    """Collect macro / automation software context from processes and user files.

    Two sources are inspected:

    1. Running processes returned by ``ps_json``. A process is reported when
       its serialized record contains a ``MACRO_TOOL_KEYWORDS`` entry. Risk is
       70 if a ``MACRO_STRONG_KEYWORDS`` entry is also present, otherwise 25,
       and is capped at 10 when the record mentions a peripheral vendor suite
       (lghub, razer synapse, steelseries, icue).
    2. Script/config files (.ahk, .lua, .amc, .txt, .ini, .cfg, .xml, .json,
       .lgprofile) under the user's Desktop, Downloads, Documents and the
       LGHUB / Razer / Bloody7 AppData folders. The file path plus the text
       returned by ``read_text_snippet(path, 2000)`` is matched against both
       keyword lists; risk is 75 on a strong keyword, otherwise 35.

    Any exception raised for a single file, or while walking a root, is
    swallowed so that one unreadable location does not abort the stage.

    Returns:
        Finding dicts with category ``macro_software`` or ``macro_script``.
    """
    collected = []

    # --- running processes -------------------------------------------------
    vendor_markers = ["lghub", "razer synapse", "steelseries", "icue"]
    process_rows = ps_json("Get-CimInstance Win32_Process | Select-Object Name,ExecutablePath,CommandLine")
    for proc in process_rows:
        text = _reg_hay(proc)
        if not text or "parse_error" in proc:
            continue
        if not any(word in text for word in MACRO_TOOL_KEYWORDS):
            continue
        if any(word in text for word in MACRO_STRONG_KEYWORDS):
            score = 70
        else:
            score = 25
        # Vendor peripheral suites are ordinary software: keep them as
        # low-weight context only.
        if any(marker in text for marker in vendor_markers):
            score = min(score, 10)
        collected.append({
            "category": "macro_software",
            "risk": score,
            "title": "Macro / automation software process",
            "detail": _safe_str(proc.get("Name")),
            "evidence": {
                "process": proc,
                "reasons": ["macro or automation software context; not proof by itself"],
            },
        })

    # --- script / config files in user locations ---------------------------
    home = Path.home()
    local_appdata = home / "AppData" / "Local"
    roaming_appdata = home / "AppData" / "Roaming"
    search_roots = [
        home / "Desktop",
        home / "Downloads",
        home / "Documents",
        local_appdata / "LGHUB",
        roaming_appdata / "LGHUB",
        local_appdata / "Razer",
        roaming_appdata / "Razer",
        local_appdata / "Bloody7",
        roaming_appdata / "Bloody7",
    ]
    script_suffixes = {".ahk", ".lua", ".amc", ".txt", ".ini", ".cfg", ".xml", ".json", ".lgprofile"}

    for base_dir in search_roots:
        try:
            if not base_dir.exists():
                continue
            for candidate in base_dir.rglob("*"):
                try:
                    if not candidate.is_file():
                        continue
                    suffix = candidate.suffix.lower()
                    if suffix not in script_suffixes:
                        continue
                    excerpt = read_text_snippet(candidate, 2000)
                    searchable = (str(candidate) + "\n" + excerpt).lower()
                    if not any(word in searchable for word in MACRO_TOOL_KEYWORDS + MACRO_STRONG_KEYWORDS):
                        continue
                    if any(word in searchable for word in MACRO_STRONG_KEYWORDS):
                        score = 75
                    else:
                        score = 35
                    collected.append({
                        "category": "macro_script",
                        "risk": score,
                        "title": "Macro / automation script",
                        "detail": candidate.name,
                        "evidence": {
                            "path": normalize_path(candidate),
                            "extension": suffix,
                            "snippet": excerpt[:600],
                            "reasons": ["macro script or config found in user location"],
                        },
                    })
                except Exception:
                    # Best effort: skip files that cannot be inspected.
                    continue
        except Exception:
            # Best effort: skip roots that cannot be walked.
            continue

    return collected
has_weak_kmbox_subnet = "192.168.2.x" in hay or ("192.168.2." in hay and "192.168.2.188" not in hay) has_dma_artifact = bool(cats & {"dma_kmbox_device", "dma_current_hardware", "registry_usb_history"}) or has_strong_dma_context(hay) has_dma = has_dma_artifact or has_strong_network_dma has_driver = bool(cats & {"drivers", "registry_driver_service", "registry_security_hook"}) has_macro = bool(cats & {"macro_software", "macro_script"}) has_ai = bool(cats & {"ai_aim_assist"}) has_virtual_display = bool(cats & {"display_fuser"}) has_prefetch_or_history = bool(cats & {"prefetch", "registry_recent_execution", "device_install_history"}) has_memory = bool(cats & {"memory_toolchain", "memory_module_anomaly", "memory_injection_artifact", "process_handle_suspicious"}) if has_dma_artifact and has_weak_kmbox_subnet and has_virtual_display and has_prefetch_or_history: results.append({"category":"evidence_correlation","risk":90,"title":"DMA companion-machine evidence chain","detail":"DMA artifact, 192.168.2.x subnet, virtual/display fuser, and historical run trace appear together","evidence":{"categories":sorted(list(cats)),"reasons":["evidence chain: DMA artifact + weak KMBOX subnet + display fuser + historical execution/install trace"]}}) if has_known_bad and has_driver: results.append({"category":"evidence_correlation","risk":95,"title":"Known threat + driver/security context","detail":"known bad indicator is paired with driver/security evidence","evidence":{"reasons":["strong evidence chain: known threat plus driver/security context"]}}) if has_dma and has_driver: results.append({"category":"evidence_correlation","risk":85,"title":"DMA/hardware context + driver context","detail":"DMA/HID/USB evidence appears with driver/security evidence","evidence":{"reasons":["evidence chain: DMA/hardware context plus driver/security context"]}}) if has_macro and (has_ai or "sendinput" in hay or "mouse event" in hay): 
def build_timeline_events(findings: list, scan_time: str) -> list:
    """Flatten findings into a chronologically sorted list of timeline events.

    Each finding may contribute the explicit entries listed under
    ``evidence["timeline_events"]`` plus category-specific events (prefetch
    modification time, file created/modified times, driver/USB observation
    time, module observation time, display-chain observation). An event
    without a usable timestamp falls back to *scan_time*.

    Malformed input is tolerated instead of raising: non-dict findings are
    skipped, a non-numeric ``risk`` counts as 0, and an ``evidence`` or
    ``module`` value that is not a dict (for example the string placeholder
    that ``_compact_value_for_upload`` returns on failure) is treated as empty.

    Args:
        findings: Finding dicts (``category``, ``detail``/``title``, ``risk``,
            ``evidence``). ``None`` is treated as an empty list.
        scan_time: Timestamp string used when an event has no time of its own.

    Returns:
        At most 250 event dicts with keys ``time``, ``category``, ``detail``
        (truncated to 240 characters), ``source`` and ``risk``, sorted by the
        string value of ``time``.
    """
    events = []

    def as_dict(value) -> dict:
        # Evidence can arrive as a plain string after upload compaction.
        return value if isinstance(value, dict) else {}

    def as_int(value) -> int:
        try:
            return int(value or 0)
        except (TypeError, ValueError, OverflowError):
            return 0

    def add_event(time_value, category, detail, source, risk):
        text = str(time_value or "").strip() or scan_time
        events.append({
            "time": text,
            "category": category,
            "detail": str(detail or "")[:240],
            "source": source,
            "risk": as_int(risk),
        })

    file_categories = {"suspicious_file", "suspicious_directory", "macro_script", "cheat_documentation"}
    observed_categories = {"drivers", "device_install_history", "registry_usb_history"}

    for item in findings or []:
        if not isinstance(item, dict):
            continue
        category = str(item.get("category", "") or "")
        detail = str(item.get("detail", "") or item.get("title", "") or "")
        risk = as_int(item.get("risk", 0))
        evidence = as_dict(item.get("evidence"))

        # Explicit events attached by the producing scanner stage.
        explicit = evidence.get("timeline_events")
        if isinstance(explicit, (list, tuple)):
            for event in explicit:
                if isinstance(event, dict):
                    add_event(
                        event.get("time"),
                        category,
                        event.get("detail") or detail,
                        event.get("event") or category,
                        risk,
                    )

        # Category-specific implicit events.
        if category == "prefetch":
            add_event(evidence.get("modified"), category, detail, "prefetch_modified", risk)
        elif category in file_categories:
            add_event(evidence.get("created"), category, detail, "file_created", risk)
            add_event(evidence.get("modified"), category, detail, "file_modified", risk)
        elif category in observed_categories:
            add_event(evidence.get("observed_at") or scan_time, category, detail, "driver_or_usb_observed", risk)
        elif category == "memory_module_anomaly":
            module = as_dict(evidence.get("module"))
            add_event(module.get("observed_at") or scan_time, category, detail, "game_process_module_observed", risk)
        elif category == "display_fuser":
            add_event(scan_time, category, detail, "display_chain_observed", risk)

    events.sort(key=lambda entry: str(entry.get("time", "")))
    return events[:250]
def build_text_report(payload: dict, upload_result: dict = None, upload_error: str = "") -> str:
    """Render a plain-text quick-view summary of a scan payload.

    The report contains a header (player, versions, scan time, local verdict,
    score, finding count), the upload outcome, optional system information,
    the 20 highest-risk findings, a per-category count, up to 20 timeline
    events and a closing note.

    Args:
        payload: Scan payload dict (``findings``, ``player_id``, ``scan_time``,
            ``system_info``, ``scanner_version``, ``rule_version`` and
            optionally ``timeline``).
        upload_result: Server response dict of a successful upload, if any.
            Its ``report_id`` is used to build the public report URL.
        upload_error: Error text shown when the upload failed and no
            *upload_result* is available.

    Returns:
        The report text, terminated by a newline.
    """
    rule = "-" * 72

    findings = payload.get("findings", []) or []
    scan_time = str(payload.get("scan_time", "") or "")
    player_id = str(payload.get("player_id", "") or "")
    system_info = payload.get("system_info", {}) or {}
    score, verdict, verdict_reason = local_verdict_from_findings(findings)

    report_id = ""
    public_url = ""
    if upload_result:
        report_id = str(upload_result.get("report_id", "") or "")
        if report_id:
            # The public report lives next to the upload endpoint.
            server_base = SERVER_URL.rsplit("/api/scan", 1)[0]
            public_url = f"{server_base}/r/{report_id}"

    report = [
        "Escort Anti-Cheat Scanner Report",
        "=" * 72,
        f"Player ID : {player_id}",
        f"Scanner Ver. : {payload.get('scanner_version', '')}",
        f"Rule Version : {payload.get('rule_version', '')}",
        f"Scan Time : {scan_time}",
        f"Local Verdict : {local_verdict_label(verdict)}",
        f"Risk Score : {score}",
        f"Finding Count : {len(findings)}",
        f"Summary : {verdict_reason}",
    ]

    # Upload outcome.
    if upload_result:
        report.append("Upload Status : Success")
        report.append(f"Report ID : {report_id}")
        if public_url:
            report.append(f"Public Report : {public_url}")
    elif upload_error:
        report.append("Upload Status : Failed")
        report.append(f"Upload Error : {upload_error}")

    # System section is only shown when there is something to show.
    computer_name = system_info.get("computer_name", "")
    os_version = system_info.get("os_version", "")
    if computer_name or os_version:
        report += [
            "",
            "System",
            rule,
            f"Computer Name : {computer_name}",
            f"OS Version : {os_version}",
        ]

    # Highest-risk findings first.
    report += ["", "Top Findings", rule]
    ranked = sorted(findings, key=lambda entry: int(entry.get("risk", 0) or 0), reverse=True)
    if ranked:
        report.extend(local_finding_line(entry) for entry in ranked[:20])
    else:
        report.append("No suspicious findings.")

    # Findings per category, most frequent first, then alphabetical.
    tally = {}
    for entry in findings:
        label = str(entry.get("category", "other"))
        tally[label] = tally.get(label, 0) + 1
    report += ["", "Category Summary", rule]
    if tally:
        for label, count in sorted(tally.items(), key=lambda pair: (-pair[1], pair[0])):
            report.append(f"{label:<28} {count}")
    else:
        report.append("No categories recorded.")

    # Prefer the timeline stored in the payload; rebuild it otherwise.
    timeline = payload.get("timeline", []) or build_timeline_events(findings, scan_time)
    report += ["", "Evidence Timeline", rule]
    if timeline:
        for event in timeline[:20]:
            when = event.get('time', '')
            weight = int(event.get('risk', 0) or 0)
            report.append(
                f"{when} | [{weight:>3}] {event.get('category', '')} | {event.get('detail', '')}"
            )
    else:
        report.append("No timeline events recorded.")

    report += [
        "",
        "Notes",
        rule,
        "This TXT report is a local quick-view summary.",
        "Full raw evidence remains in last_scan_payload.json.",
    ]
    return "\n".join(report) + "\n"
def _memory_hay(obj) -> str:
    """Return a lower-cased text form of *obj* for keyword searching.

    The object is serialized with ``json.dumps`` (non-ASCII characters kept
    as-is); a falsy object serializes as ``{}``. If serialization fails, the
    plain ``str()`` form is used instead. Note that the JSON form escapes
    backslashes, so Windows paths appear with doubled separators.
    """
    try:
        serialized = json.dumps(obj or {}, ensure_ascii=False)
    except Exception:
        serialized = str(obj or "")
    return serialized.lower()
def scan_memory_toolchain_processes() -> list:
    """Detect running memory-editing / injection toolchain processes.

    This does not read game memory. It only checks the names, executable
    paths and command lines of currently running processes (as returned by
    ``ps_json``) for entries of ``MEMORY_TOOLCHAIN_KEYWORDS``.

    Scoring:
        * 50 for any toolchain keyword, 85 when a
          ``MEMORY_STRONG_TOOLCHAIN_KEYWORDS`` entry is present;
        * +10 (capped at 100) when the executable path or command line points
          into a user/drop location (Temp, Downloads, Desktop, Public);
        * forced to 95 when known-bad / bypass wording is present.

    Returns:
        At most 30 finding dicts with category ``memory_toolchain``.
    """
    drop_path_parts = ("\\appdata\\local\\temp\\", "\\downloads\\", "\\desktop\\", "\\users\\public\\")
    known_bad_markers = ("known_bad", "fakevgc", "ace bypass", "kdmapper")

    findings = []
    items = ps_json("Get-CimInstance Win32_Process | Select-Object Name,ProcessId,ExecutablePath,CommandLine,ParentProcessId")
    for item in items:
        if item.get("parse_error"):
            continue
        hay = _memory_hay(item)
        if not any(k in hay for k in MEMORY_TOOLCHAIN_KEYWORDS):
            continue

        risk = 50
        reasons = ["memory editing / injection toolchain process context"]
        if any(k in hay for k in MEMORY_STRONG_TOOLCHAIN_KEYWORDS):
            risk = 85
            reasons.append("strong memory toolchain keyword observed")

        # _memory_hay() yields JSON text, in which every backslash is escaped
        # as two. Multi-segment fragments such as \appdata\local\temp\ can
        # never occur in that text, so path fragments are matched against the
        # raw path fields instead.
        location = f"{item.get('ExecutablePath') or ''} {item.get('CommandLine') or ''}".lower()
        if any(part in location for part in drop_path_parts):
            risk = min(100, risk + 10)
            reasons.append("toolchain process is running from user/drop path")

        if any(k in hay for k in known_bad_markers):
            risk = 95
            reasons.append("toolchain context contains known-bad/bypass wording")

        findings.append({
            "category": "memory_toolchain",
            "risk": min(risk, 100),
            "title": "Memory editing / injection toolchain process",
            "detail": _safe_str(item.get("Name") or "unknown"),
            "evidence": {
                "process": item,
                "reasons": reasons,
                "note": "Running memory tools are high-risk context, but still require evidence-chain review."
            }
        })
    return findings[:30]
""" findings = [] ps = r""" $procs = Get-CimInstance Win32_Process | Select-Object Name,ProcessId,ExecutablePath,CommandLine $result = @() foreach($p in $procs){ $hay = (($p.Name + " " + $p.ExecutablePath + " " + $p.CommandLine) -as [string]).ToLower() $isGame = $false foreach($g in @('cs2','csgo','valorant','valorant-win64-shipping','fortnite','fortniteclient','pubg','tslgame','apex','r5apex','overwatch','cod','warzone','rainbowsix','tarkov','destiny2','thefinals','crossfire','deltaforce')){ if($hay.Contains($g)){ $isGame = $true; break } } if(-not $isGame){ continue } try { $gp = Get-Process -Id $p.ProcessId -ErrorAction Stop foreach($m in $gp.Modules){ $mp = [string]$m.FileName $mn = [string]$m.ModuleName $mh = (($mn + " " + $mp) -as [string]).ToLower() $interesting = $false foreach($kw in @('inject','hook','loader','manualmap','manual map','cheat','bypass','xenos','kdmapper','fakevgc','aimbot','triggerbot')){ if($mh.Contains($kw)){ $interesting = $true; break } } foreach($pp in @('\appdata\','\temp\','\downloads\','\desktop\','\users\public\','\programdata\temp')){ if($mh.Contains($pp)){ $interesting = $true; break } } if(-not $interesting){ continue } $sigStatus = "" $sigSubject = "" if($mp){ try { $sig = Get-AuthenticodeSignature -FilePath $mp -ErrorAction SilentlyContinue if($sig){ $sigStatus = [string]$sig.Status; $sigSubject = [string]$sig.SignerCertificate.Subject } } catch {} } $result += [pscustomobject]@{ process_name=$p.Name; pid=$p.ProcessId; process_path=$p.ExecutablePath; module_name=$mn; module_path=$mp; signature_status=$sigStatus; signer=$sigSubject } } } catch {} } $result | Select-Object -First 1200 """ for item in ps_json(ps): if item.get("parse_error"): continue module_path = _safe_str(item.get("module_path")) module_name = _safe_str(item.get("module_name")) process_name = _safe_str(item.get("process_name")) sig_status = _safe_str(item.get("signature_status")) hay = _memory_hay(item) if not module_path and not module_name: continue if 
def scan_process_handle_suspicious() -> list:
    """Report coexistence of memory toolchain processes and game-like processes.

    No handle tables are inspected here. The function only lists running
    processes through ``ps_json`` and records, as weak context, the situation
    where at least one process matches ``MEMORY_TOOLCHAIN_KEYWORDS`` while at
    least one process looks like a game (``_looks_like_game_process``).

    Returns:
        An empty list, or a single ``process_handle_suspicious`` finding
        (risk 45) carrying up to 10 toolchain and 10 game process records.
    """
    rows = ps_json("Get-CimInstance Win32_Process | Select-Object Name,ProcessId,ExecutablePath,CommandLine")
    if not rows:
        return []

    game_rows = []
    tool_rows = []
    for row in rows:
        if row.get("parse_error"):
            continue
        proc_name = _safe_str(row.get("Name"))
        proc_path = _safe_str(row.get("ExecutablePath"))
        proc_cmd = _safe_str(row.get("CommandLine"))
        text = _memory_hay(row)
        if _looks_like_game_process(proc_name, proc_path, proc_cmd):
            game_rows.append(row)
        if any(word in text for word in MEMORY_TOOLCHAIN_KEYWORDS):
            tool_rows.append(row)

    # Both sides must be present for the coexistence signal.
    if not (game_rows and tool_rows):
        return []

    return [{
        "category": "process_handle_suspicious",
        "risk": 45,
        "title": "Memory toolchain and game process coexistence",
        "detail": f"{len(tool_rows)} toolchain process(es), {len(game_rows)} game-like process(es)",
        "evidence": {
            "toolchain_processes": tool_rows[:10],
            "game_processes": game_rows[:10],
            "reasons": ["memory/injection toolchain process observed while game-like process is running"],
            "note": "This is weak context only. Full handle ownership requires kernel/ETW/Sysinternals-level collection."
        }
    }]
def _compact_value_for_upload(value, max_chars: int = MAX_UPLOAD_EVIDENCE_CHARS):
    """Return a size-limited copy of an evidence value.

    Rules, applied recursively:
        * ``None`` and numbers/booleans are returned unchanged;
        * strings longer than *max_chars* are cut and suffixed with
          ``...[truncated]``;
        * lists/tuples keep their first 25 items (as a list) plus a marker
          string when items were dropped;
        * dicts keep their first 40 entries, with keys stringified and cut to
          120 characters, plus a ``__truncated__`` entry when keys were dropped;
        * anything else is converted with ``str()`` and cut like a string.

    Any exception yields the placeholder ``[unserializable evidence truncated]``.
    """
    def clip(text):
        if len(text) > max_chars:
            return text[:max_chars] + "...[truncated]"
        return text

    try:
        if value is None:
            return value
        if isinstance(value, (bool, int, float)):
            return value
        if isinstance(value, str):
            return clip(value)

        if isinstance(value, (list, tuple)):
            kept = [
                _compact_value_for_upload(entry, max_chars=max_chars)
                for entry in list(value)[:25]
            ]
            if len(value) > 25:
                kept.append(f"...[{len(value)-25} more items truncated]")
            return kept

        if isinstance(value, dict):
            pairs = list(value.items())
            compacted = {}
            for key, inner in pairs[:40]:
                compacted[str(key)[:120]] = _compact_value_for_upload(inner, max_chars=max_chars)
            if len(pairs) > 40:
                compacted["__truncated__"] = f"{len(value)-40} more keys truncated"
            return compacted

        return clip(str(value))
    except Exception:
        return "[unserializable evidence truncated]"
def print_scan_progress(step: int, total: int, label: str, findings_count: int = 0, start_ts: float = None):
    """Draw a one-line console progress bar for the running scan.

    The line is rewritten in place using a carriage return; a newline is
    emitted once *step* reaches *total*. This is UI output only.

    Args:
        step: Current step; clamped to the range ``0..total``.
        total: Number of steps; values below 1 are treated as 1.
        label: Name of the current stage.
        findings_count: Number of findings collected so far.
        start_ts: ``time.monotonic()`` value taken at scan start; when given,
            the elapsed whole seconds are appended.

    If rendering fails for any reason, a plain ``step/total label`` line is
    printed instead.
    """
    try:
        total = max(int(total), 1)
        step = min(max(int(step), 0), total)

        width = 28
        solid = int(width * step / total)
        gauge = ("█" * solid).ljust(width, "░")
        percent = int((step / total) * 100)
        timing = "" if start_ts is None else f" | {int(time.monotonic() - start_ts)}s"

        print(f"\r[{gauge}] {percent:3d}% {label} | 风险项 {findings_count}{timing}", end="", flush=True)
        if step >= total:
            # Finish the in-place line once the last step is reached.
            print()
    except Exception:
        print(f"扫描进度:{step}/{total} {label}")
for label, fn in scan_steps: step_no += 1 print_scan_progress(step_no, total_steps, label, len(findings), scan_start_ts) try: part = fn() or [] findings.extend(part) except Exception as e: findings.append({ "category": "scanner_stage_error", "risk": 5, "title": "Scanner stage failed", "detail": label, "evidence": {"error": str(e)[:300], "note": "stage error does not equal cheat evidence"} }) step_no += 1 print_scan_progress(step_no, total_steps, "应用误报学习", len(findings), scan_start_ts) findings = apply_client_false_positive_rules(findings) step_no += 1 print_scan_progress(step_no, total_steps, "证据链关联", len(findings), scan_start_ts) findings.extend(build_evidence_correlation(findings)) step_no += 1 print_scan_progress(step_no, total_steps, "HWID 基线", len(findings), scan_start_ts) findings.extend(scan_hwid_baseline()) step_no += 1 print_scan_progress(step_no, total_steps, "HWID 完整性", len(findings), scan_start_ts) findings.extend(scan_hwid_integrity()) # Keep the report small and stable before timeline/upload. 
original_finding_count = len(findings) findings = compact_findings_for_upload(findings) step_no += 1 print_scan_progress(step_no, total_steps, "生成时间线", len(findings), scan_start_ts) timeline = build_timeline_events(findings, scan_time)[:MAX_UPLOAD_TIMELINE_EVENTS] print_scan_progress(total_steps, total_steps, f"扫描完成,已压缩 {original_finding_count}->{len(findings)}", len(findings), scan_start_ts) return { "player_id": player_id, "scanner_version": SCANNER_VERSION, "rule_version": RULE_VERSION, "scan_time": scan_time, "system_info": {**get_system_info(), "scan_limits": {"max_total_seconds": MAX_TOTAL_SCAN_SECONDS, "max_recent_file_candidates": MAX_RECENT_FILE_CANDIDATES, "max_files_per_root": MAX_FILES_PER_ROOT, "max_yara_file_mb": MAX_YARA_FILE_BYTES // 1024 // 1024, "max_hash_file_mb": MAX_HASH_FILE_BYTES // 1024 // 1024, "elapsed_seconds": round(time.monotonic() - scan_start_ts, 2), "max_upload_findings": MAX_UPLOAD_FINDINGS_TOTAL, "max_upload_findings_per_category": MAX_UPLOAD_FINDINGS_PER_CATEGORY, "original_finding_count": original_finding_count}}, "findings": findings, "timeline": timeline } def load_upload_public_key(): if not UPLOAD_PUBLIC_KEY_B64: raise RuntimeError("UPLOAD_PUBLIC_KEY_B64 environment variable is not configured") try: public_pem = base64.b64decode(UPLOAD_PUBLIC_KEY_B64.encode("utf-8")) return serialization.load_pem_public_key(public_pem) except Exception as e: raise RuntimeError(f"Invalid upload public key: {e}") def encrypt_payload_for_upload(payload: dict) -> dict: public_key = load_upload_public_key() plaintext = json.dumps(payload, ensure_ascii=False).encode("utf-8") aes_key = os.urandom(32) nonce = os.urandom(12) ciphertext = AESGCM(aes_key).encrypt( nonce, plaintext, b"escort-anticheat-scan-v1", ) encrypted_key = public_key.encrypt( aes_key, padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None, ), ) return { "encrypted": True, "alg": "RSA-OAEP-SHA256+A256GCM", "key": 
base64.b64encode(encrypted_key).decode("utf-8"), "nonce": base64.b64encode(nonce).decode("utf-8"), "ciphertext": base64.b64encode(ciphertext).decode("utf-8"), "scanner_version": SCANNER_VERSION, "rule_version": RULE_VERSION, } def upload_payload(payload: dict) -> dict: if not SCAN_API_KEY: raise RuntimeError("SCAN_API_KEY environment variable is not configured") encrypted_body = encrypt_payload_for_upload(payload) response = requests.post( SERVER_URL, json=encrypted_body, headers={ "X-API-Key": SCAN_API_KEY, "X-Upload-Encryption": "RSA-OAEP-SHA256+A256GCM", }, timeout=15, ) response.raise_for_status() return response.json() if __name__ == "__main__": print("Escort Anti-Cheat Scanner MVP") print("只读扫描,不删除文件,不注入游戏,不读取私人文件内容。") print(f"Scanner version: {SCANNER_VERSION}") print(f"Rule version: {RULE_VERSION}") print(f"Known threat names loaded: {len(KNOWN_THREAT_NAMES)}") print() server_status = get_server_status() if server_status: min_scanner_version = str(server_status.get("min_scanner_version", "") or "") server_rule_version = str(server_status.get("rule_version", "") or "") if min_scanner_version and compare_versions(SCANNER_VERSION, min_scanner_version) < 0: print(f"[UPDATE REQUIRED] Scanner {SCANNER_VERSION} is older than server minimum {min_scanner_version}.") print("请更新到最新扫描器后再继续。") elif min_scanner_version: print(f"Server minimum scanner version: {min_scanner_version}") if server_rule_version: print(f"Server rule version: {server_rule_version}") print() typed_player_id = input("请输入玩家ID(可直接回车跳过): ").strip() if typed_player_id: player_id = typed_player_id else: player_id = get_or_create_client_id() print(f"未输入玩家ID,已使用匿名ID:{player_id}") print("正在扫描,请稍等...") payload = build_payload(player_id) with open("last_scan_payload.json", "w", encoding="utf-8") as f: json.dump(payload, f, ensure_ascii=False, indent=2) text_report_path = Path("last_scan_report.txt") print(f"扫描完成,发现风险项:{len(payload['findings'])}") print("正在上传到服务器(最多等待 15 秒)...") upload_result = None 
upload_error = "" try: result = upload_payload(payload) upload_result = result print("上传成功:") print(json.dumps(result, ensure_ascii=False, indent=2)) if result.get("update_required"): print(f"[SERVER NOTICE] {result.get('update_message', 'Please update the scanner.')}") except Exception as e: upload_error = str(e) print(f"上传失败:{e}") print("本地扫描结果已保存到 last_scan_payload.json") text_report = build_text_report(payload, upload_result=upload_result, upload_error=upload_error) text_report_path.write_text(text_report, encoding="utf-8") print(f"本地 TXT 报告已保存到 {text_report_path}")