Files
llvm-project/lldb/test/API/symstore/TestSymStore.py
Stefan Gränitz 9ec6788421 [lldb] Add HTTPS tests for SymbolLocatorSymStore (#192274)
Using self-signed certificates is the only way forward for testing 
security features on the HTTPS path. As we don't want to allow
any arbitrary certificate, we add a new property that pins a
fingerprint and any self-signed certificate is only accepted if it
matches this fingerprint.
2026-04-22 10:11:10 +02:00

486 lines
18 KiB
Python

import datetime
import http.server
import ipaddress
import os
import shutil
import socketserver
import ssl
import sys
import threading
from functools import partial
import lldb
from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import *
"""
Test debug symbol acquisition from a local SymStore repository. This can work
cross-platform and for arbitrary debug info formats. We only support PDB
currently.
"""
class MockedSymStore:
    """
    Context manager that lays out a directory tree equivalent to what
    SymStore.exe produces and yields the path of its root directory.

    On entry the PDB build artifact is moved into
    ``<build>/symstore/<pdb>/<key>/<pdb>`` and LLDB's symstore cache setting
    is pointed at ``cache_dir``. On exit the symstore plugin settings are
    cleared again.
    """

    def __init__(self, test, exe, pdb):
        """
        test: the running test case; used for build-artifact paths, LLDB
              commands and assertions.
        exe:  file name of the executable build artifact.
        pdb:  file name of the PDB build artifact.
        """
        self._test = test
        self._exe = exe
        self._pdb = pdb
        self.cache_dir = test.getBuildArtifact("cache")

    def get_key_pdb(self, exe):
        """
        Compute the SymStore key for the given executable build artifact.

        Module UUID:     12345678-1234-5678-9ABC-DEF012345678-00000001
        To SymStore key: 12345678123456789ABCDEF0123456781

        Raises RuntimeError if the UUID does not consist of 40 hex digits.
        """
        exe_path = self._test.getBuildArtifact(exe)
        module_spec = lldb.SBModuleSpec()
        module_spec.SetFileSpec(lldb.SBFileSpec(exe_path))
        uuid_text = lldb.SBModule(module_spec).GetUUIDString()

        hex_digits = uuid_text.replace("-", "").upper()
        if len(hex_digits) != 40:
            raise RuntimeError("Unexpected number of bytes in embedded UUID")

        # First 32 digits are the GUID; the remainder is the age, which is
        # parsed as hex and appended in decimal without leading zeros.
        guid_part, age_part = hex_digits[:32], hex_digits[32:]
        return f"{guid_part}{int(age_part, 16)}"

    def __enter__(self):
        """
        Mock local symstore directory tree, move PDB there and report path.
        """
        test = self._test
        key = self.get_key_pdb(self._exe) if test.getDebugInfo() == "pdb" else None
        test.assertIsNotNone(key)

        symstore_dir = test.getBuildArtifact("symstore")
        target_dir = os.path.join(symstore_dir, self._pdb, key)
        os.makedirs(target_dir, exist_ok=True)
        pdb_source = test.getBuildArtifact(self._pdb)
        shutil.move(pdb_source, os.path.join(target_dir, self._pdb))

        # We always configure a valid fallback cache, because we might not have
        # permission to write outside the test directory.
        cache_cmd = f"settings set plugin.symbol-locator.symstore.cache {self.cache_dir}"
        test.runCmd(cache_cmd)
        return symstore_dir

    def __exit__(self, *exc_info):
        """
        Reset settings
        """
        self._test.runCmd("settings clear plugin.symbol-locator.symstore")
class NtSymbolPath:
    """
    Context manager that overrides the _NT_SYMBOL_PATH environment variable
    for the duration of the ``with`` block and restores the previous state
    (including "unset") afterwards.
    """

    def __init__(self, value):
        self._value = value
        self._saved = None

    def __enter__(self):
        env = os.environ
        # Remember the previous value; None means the variable was not set.
        self._saved = env.get("_NT_SYMBOL_PATH")
        env["_NT_SYMBOL_PATH"] = self._value

    def __exit__(self, *exc_info):
        if self._saved is not None:
            os.environ["_NT_SYMBOL_PATH"] = self._saved
            return
        # The variable did not exist before we entered: remove it again.
        os.environ.pop("_NT_SYMBOL_PATH", None)
class HTTPServer:
    """
    Context manager that serves a local directory tree via HTTP from a
    background thread and yields the server's base URL.

    dir:     directory to serve when no custom handler is given.
    handler: optional request handler class/factory; when provided it is used
             as-is and ``dir`` is not applied to it.
    """

    def __init__(self, dir, handler=None):
        if handler is None:
            handler = partial(http.server.SimpleHTTPRequestHandler, directory=dir)
        # Port 0 asks the OS to auto-select a free port.
        self._server = socketserver.ThreadingTCPServer(("localhost", 0), handler)
        self._thread = threading.Thread(
            target=self._server.serve_forever,
            daemon=True,
        )

    def __enter__(self):
        self._thread.start()
        bound_host, bound_port = self._server.server_address
        return f"http://{bound_host}:{bound_port}"

    def __exit__(self, *exc_info):
        server, worker = self._server, self._thread
        if server:
            # Stop the serve_forever loop first, then release the socket.
            server.shutdown()
            server.server_close()
        if worker:
            worker.join()
class HTTPSServer:
    """
    Context Manager to serve a local directory tree via HTTPS.

    dir:     directory to serve when no custom handler is given.
    handler: optional request handler class/factory, used as-is.
    cert:    object with ``file`` and ``key_file`` attributes naming the PEM
             certificate and private key to load. Required, although it is
             declared with a default for keyword-style calls.

    Entering the context starts the server thread and yields the base URL;
    leaving it shuts the server down and joins the thread.
    """

    class ErrorAwareTCPServer(socketserver.ThreadingTCPServer):
        """TCP layer that will suppress errors of the given type."""

        def __init__(self, address, handler, err):
            # Set before super().__init__() so the attribute exists as soon
            # as the server object does.
            self.err = err
            super().__init__(address, handler)

        def handle_error(self, request, client_address):
            # socketserver calls this from within an `except` block, so the
            # active exception is available through sys.exc_info().
            if isinstance(sys.exc_info()[1], self.err):
                return
            super().handle_error(request, client_address)

    def __init__(self, dir=None, handler=None, cert=None):
        """
        Raises ValueError if no certificate is supplied.
        """
        if cert is None:
            # Fail early with a clear message instead of an AttributeError on
            # `None.file` further down.
            raise ValueError("HTTPSServer requires a certificate (cert=...)")
        if handler is None:
            handler = partial(http.server.SimpleHTTPRequestHandler, directory=dir)
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ctx.load_cert_chain(cert.file, cert.key_file)
        address = ("localhost", 0)  # auto-select free port
        # ssl.SSLError raised while handling a request is swallowed rather
        # than reported by the default error handler.
        # NOTE(review): presumably to keep expected TLS failures in the
        # rejection tests from cluttering the test output - confirm.
        self._server = self.ErrorAwareTCPServer(address, handler, ssl.SSLError)
        try:
            self._server.socket = ctx.wrap_socket(
                self._server.socket, server_side=True
            )
        except Exception:
            # The listening socket is already bound; don't leak it.
            self._server.server_close()
            raise
        self._thread = threading.Thread(
            target=self._server.serve_forever, daemon=True
        )

    def __enter__(self):
        self._thread.start()
        host, port = self._server.server_address
        return f"https://{host}:{port}"

    def __exit__(self, *exc_info):
        if self._server:
            self._server.shutdown()
            self._server.server_close()
        if self._thread:
            self._thread.join()
class SelfSignedCert:
    """
    Self-signed cert/key pair for localhost.

    Creates ``tmpdir`` (which must not exist yet) and writes two PEM files
    into it. Afterwards the instance exposes:
      key_file:    path of the unencrypted private key ("server.key")
      file:        path of the certificate ("server.crt")
      fingerprint: SHA-256 fingerprint of the certificate as a hex string
    """

    def __init__(self, tmpdir):
        # Imported lazily so the module still loads when the optional
        # `cryptography` package is not installed.
        from cryptography import x509
        from cryptography.hazmat.primitives import hashes, serialization
        from cryptography.hazmat.primitives.asymmetric import rsa
        from cryptography.x509.oid import NameOID

        private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)

        # Self-signed: subject and issuer are the same name.
        identity = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "localhost")])
        valid_from = datetime.datetime.now(datetime.timezone.utc)
        valid_until = valid_from + datetime.timedelta(days=1)
        alt_names = x509.SubjectAlternativeName(
            [
                x509.DNSName("localhost"),
                x509.IPAddress(ipaddress.IPv4Address("127.0.0.1")),
            ]
        )

        builder = x509.CertificateBuilder()
        builder = builder.subject_name(identity)
        builder = builder.issuer_name(identity)
        builder = builder.public_key(private_key.public_key())
        builder = builder.serial_number(x509.random_serial_number())
        builder = builder.not_valid_before(valid_from)
        builder = builder.not_valid_after(valid_until)
        builder = builder.add_extension(alt_names, critical=False)
        certificate = builder.sign(private_key, hashes.SHA256())

        os.makedirs(tmpdir, exist_ok=False)
        self.key_file = os.path.join(tmpdir, "server.key")
        self.file = os.path.join(tmpdir, "server.crt")

        with open(self.key_file, "wb") as key_out:
            key_pem = private_key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.TraditionalOpenSSL,
                encryption_algorithm=serialization.NoEncryption(),
            )
            key_out.write(key_pem)

        with open(self.file, "wb") as cert_out:
            cert_out.write(certificate.public_bytes(encoding=serialization.Encoding.PEM))

        self.fingerprint = certificate.fingerprint(hashes.SHA256()).hex()
class RedirectHandler(http.server.BaseHTTPRequestHandler):
    """
    Request handler that answers every GET with a permanent redirect to the
    same path below ``base_url``.
    """

    # Redirect target prefix; must be assigned on the class before serving.
    base_url = None

    def do_GET(self):
        self.send_response(http.HTTPStatus.MOVED_PERMANENTLY)
        redirect_target = self.base_url + self.path
        self.send_header("Location", redirect_target)
        self.end_headers()

    def log_message(self, *_ignored):
        # Suppress request logs.
        return None
class RequestCounter(http.server.SimpleHTTPRequestHandler):
    """
    File-serving request handler that counts the GET requests it receives.

    The count is kept in the class attribute ``requests``; tests reset it to
    0 before use and read it afterwards.
    """

    requests = 0  # class-level so all instances share one counter

    # A threading server runs each handler instance on its own thread, and
    # `+=` on a shared attribute is a read-modify-write, so guard it.
    _lock = threading.Lock()

    def __init__(self, *args, directory=None, **kwargs):
        # Plain pass-through; allows binding the served directory with
        # functools.partial(RequestCounter, directory=...).
        super().__init__(*args, directory=directory, **kwargs)

    def do_GET(self):
        with RequestCounter._lock:
            RequestCounter.requests += 1
        super().do_GET()
class SymStoreTests(TestBase):
    """
    Tests for locating PDB debug symbols through the SymStore symbol locator:
    local directories, HTTP/HTTPS symbol servers, _NT_SYMBOL_PATH handling and
    cache placement.
    """

    TEST_WITH_PDB_DEBUG_INFO = True

    def build_inferior(self):
        """
        Build the test program and return the (executable, symbol file)
        artifact names. Skips the test for non-PDB debug info variants.
        """
        if self.getDebugInfo() != "pdb":
            self.skipTest("Non-PDB debug info variants not yet supported")
        self.build()
        exe_file = "a.out"
        sym_file = "a.pdb"
        self.assertTrue(os.path.isfile(self.getBuildArtifact(exe_file)))
        self.assertTrue(os.path.isfile(self.getBuildArtifact(sym_file)))
        return exe_file, sym_file

    def assertFiles(self, dir, expected):
        """
        Assert that the tree below `dir` contains exactly `expected` files.
        A directory that does not exist counts as containing 0 files.
        """
        actual = sum(len(files) for _, _, files in os.walk(dir))
        self.assertEqual(actual, expected)

    def try_breakpoint(self, exe, should_have_loc, ext_lookup=True):
        """
        Create a target for `exe`, set a breakpoint on "func" and check
        whether it resolved to a location.

        should_have_loc: expect exactly one location if True, none otherwise.
        ext_lookup:      value for the symbols.enable-external-lookup setting.
        """
        enable = "true" if ext_lookup else "false"
        self.runCmd(f"settings set symbols.enable-external-lookup {enable}")
        target = self.dbg.CreateTarget(self.getBuildArtifact(exe))
        self.assertTrue(target and target.IsValid(), "Target is valid")
        bp = target.BreakpointCreateByName("func")
        self.assertTrue(bp and bp.IsValid(), "Breakpoint is valid")
        self.assertEqual(bp.GetNumLocations(), 1 if should_have_loc else 0)
        self.dbg.DeleteTarget(target)

    def test_no_symstore_url(self):
        """
        Check that breakpoint doesn't resolve without SymStore.
        """
        exe, sym = self.build_inferior()
        with MockedSymStore(self, exe, sym):
            self.try_breakpoint(exe, should_have_loc=False)

    def test_external_lookup_off(self):
        """
        Check that breakpoint doesn't resolve with external lookup disabled.
        """
        exe, sym = self.build_inferior()
        with MockedSymStore(self, exe, sym) as symstore_dir:
            self.runCmd(
                f"settings set plugin.symbol-locator.symstore.urls {symstore_dir}"
            )
            self.try_breakpoint(exe, ext_lookup=False, should_have_loc=False)

    def test_local_dir(self):
        """
        Check that breakpoint resolves with local SymStore.
        """
        exe, sym = self.build_inferior()
        with MockedSymStore(self, exe, sym) as symstore_dir:
            self.runCmd(
                f"settings set plugin.symbol-locator.symstore.urls {symstore_dir}"
            )
            self.try_breakpoint(exe, should_have_loc=True)

    def test_http_not_found(self):
        """
        Check that we don't issue a warning for a 404 response from a symbol server.
        """
        exe, sym = self.build_inferior()
        with MockedSymStore(self, exe, sym) as symstore_dir:
            # Serve an empty sibling directory so every lookup yields a 404.
            os.makedirs(f"{symstore_dir}_empty", exist_ok=False)
            with HTTPServer(f"{symstore_dir}_empty") as url:
                self.runCmd(f"settings set plugin.symbol-locator.symstore.urls {url}")
                warnings = ""
                with open(self.getBuildArtifact("stderr.txt"), "w+b") as err_file:
                    self.dbg.SetErrorFileHandle(err_file, False)
                    try:
                        self.try_breakpoint(exe, should_have_loc=False)
                    finally:
                        # Always hand stderr back, even if the check above
                        # fails; otherwise the debugger would keep a handle
                        # to a file that is closed when this block exits.
                        self.dbg.SetErrorFileHandle(sys.stderr, False)
                    err_file.seek(0)
                    warnings = err_file.read().decode()
                self.assertEqual(warnings, "")

    def test_http(self):
        """
        Check that breakpoint resolves with remote SymStore.
        """
        exe, sym = self.build_inferior()
        with MockedSymStore(self, exe, sym) as symstore_dir:
            with HTTPServer(symstore_dir) as url:
                self.runCmd(f"settings set plugin.symbol-locator.symstore.urls {url}")
                self.try_breakpoint(exe, should_have_loc=True)

    def test_sympath_local_dir(self):
        """
        Check that breakpoint resolves with plain directory in _NT_SYMBOL_PATH.
        The PDB is not copied to the configured cache.
        """
        exe, sym = self.build_inferior()
        symstore = MockedSymStore(self, exe, sym)
        with symstore as symstore_dir:
            with NtSymbolPath(symstore_dir):
                self.try_breakpoint(exe, should_have_loc=True)
                self.assertFiles(symstore.cache_dir, 0)

    def test_sympath_local_srv(self):
        """
        Check that breakpoint resolves with local directory in server notation
        in _NT_SYMBOL_PATH. The PDB is not copied to the configured cache.
        """
        exe, sym = self.build_inferior()
        symstore = MockedSymStore(self, exe, sym)
        with symstore as symstore_dir:
            with NtSymbolPath(f"srv*{symstore_dir}"):
                self.try_breakpoint(exe, should_have_loc=True)
                self.assertFiles(symstore.cache_dir, 0)

    def test_sympath_srv(self):
        """
        Check that breakpoint resolves with an HTTP symbol server in _NT_SYMBOL_PATH
        using the srv* notation. The PDB is stored in the configured cache.
        """
        exe, sym = self.build_inferior()
        symstore = MockedSymStore(self, exe, sym)
        with symstore as symstore_dir:
            self.assertFiles(symstore.cache_dir, 0)
            with HTTPServer(symstore_dir) as url:
                with NtSymbolPath(f"srv*{url}"):
                    self.try_breakpoint(exe, should_have_loc=True)
                    # The cache mirrors the symstore layout: <pdb>/<key>/<pdb>
                    key = symstore.get_key_pdb(exe)
                    cache_file = os.path.join(symstore.cache_dir, sym, key, sym)
                    self.assertTrue(os.path.isfile(cache_file))
                    self.assertFiles(symstore.cache_dir, 1)

    def test_sympath_cache_explicit(self):
        """
        Check PDB storage with explicit cache in _NT_SYMBOL_PATH.
        """
        exe, sym = self.build_inferior()
        symstore = MockedSymStore(self, exe, sym)
        with symstore as symstore_dir:
            with HTTPServer(symstore_dir) as url:
                explicit_cache = self.getBuildArtifact("explicit_cache")
                with NtSymbolPath(f"srv*{explicit_cache}*{url}"):
                    self.try_breakpoint(exe, should_have_loc=True)
                    self.assertFiles(symstore.cache_dir, 0)
                    self.assertFiles(explicit_cache, 1)

    def test_sympath_cache_implicit(self):
        """
        Check PDB storage with implicit cache in _NT_SYMBOL_PATH.
        """
        exe, sym = self.build_inferior()
        symstore = MockedSymStore(self, exe, sym)
        with symstore as symstore_dir:
            with HTTPServer(symstore_dir) as url:
                implicit_cache = self.getBuildArtifact("implicit_cache")
                with NtSymbolPath(f"cache*{implicit_cache};srv*{url}"):
                    self.try_breakpoint(exe, should_have_loc=True)
                    self.assertFiles(symstore.cache_dir, 0)
                    self.assertFiles(implicit_cache, 1)

    def test_sympath_cache_invalid(self):
        """
        Check that PDB is stored in configured default cache
        if path in _NT_SYMBOL_PATH is invalid.
        """
        exe, sym = self.build_inferior()
        symstore = MockedSymStore(self, exe, sym)
        with symstore as symstore_dir:
            with HTTPServer(symstore_dir) as url:
                invalid_cache = ":\\<invalid_path>"
                self.assertFiles(symstore.cache_dir, 0)
                with NtSymbolPath(f"cache*{invalid_cache};srv*{url}"):
                    self.try_breakpoint(exe, should_have_loc=True)
                    self.assertFiles(symstore.cache_dir, 1)

    def test_sympath_cache_empty(self):
        """
        Check that PDB is stored in configured default cache
        if path in _NT_SYMBOL_PATH is empty.
        """
        exe, sym = self.build_inferior()
        symstore = MockedSymStore(self, exe, sym)
        with symstore as symstore_dir:
            with HTTPServer(symstore_dir) as url:
                self.assertFiles(symstore.cache_dir, 0)
                with NtSymbolPath(f"cache*;srv*{url}"):
                    self.try_breakpoint(exe, should_have_loc=True)
                    self.assertFiles(symstore.cache_dir, 1)

    def test_lookup_order(self):
        """
        Check that _NT_SYMBOL_PATH takes precedence over symstore.urls setting.
        """
        exe, sym = self.build_inferior()
        symstore = MockedSymStore(self, exe, sym)
        RequestCounter.requests = 0
        with symstore as symstore_dir:
            # HTTPServer uses a custom handler as-is, so bind the directory
            # here; otherwise the handler would serve the current working
            # directory instead of the symstore tree.
            counting_handler = partial(RequestCounter, directory=symstore_dir)
            with HTTPServer(symstore_dir, counting_handler) as url:
                self.runCmd(f"settings set plugin.symbol-locator.symstore.urls {url}")
                with NtSymbolPath(symstore_dir):
                    self.try_breakpoint(exe, should_have_loc=True)
                    # The local path satisfied the lookup, so the HTTP server
                    # must not have been contacted at all.
                    self.assertEqual(RequestCounter.requests, 0)

    @skipUnlessPackageAvailable("cryptography")
    def test_https(self):
        """
        Check that breakpoint resolves with remote SymStore via HTTPS.
        """
        exe, sym = self.build_inferior()
        with MockedSymStore(self, exe, sym) as symstore_dir:
            cert = SelfSignedCert(self.getBuildArtifact("cert"))
            with HTTPSServer(dir=symstore_dir, cert=cert) as https_url:
                # We accept only the self-signed certificate with this fingerprint
                self.runCmd(
                    f"settings set plugin.symbol-locator.symstore.tls-cert-fingerprint {cert.fingerprint}"
                )
                self.runCmd(
                    f"settings set plugin.symbol-locator.symstore.urls {https_url}"
                )
                self.try_breakpoint(exe, should_have_loc=True)

    @skipUnlessPackageAvailable("cryptography")
    def test_https_reject_selfsigned_cert(self):
        """
        Check that LLDB rejects an HTTPS server with an untrusted self-signed cert.
        """
        exe, sym = self.build_inferior()
        with MockedSymStore(self, exe, sym) as symstore_dir:
            cert = SelfSignedCert(self.getBuildArtifact("cert"))
            with HTTPSServer(dir=symstore_dir, cert=cert) as https_url:
                # No fingerprint set
                self.runCmd(
                    f"settings set plugin.symbol-locator.symstore.urls {https_url}"
                )
                self.try_breakpoint(exe, should_have_loc=False)
                # Incorrect fingerprint set
                bogus = "DEADBEEFCAFEBABE"
                self.runCmd(
                    f"settings set plugin.symbol-locator.symstore.tls-cert-fingerprint {bogus}{bogus}{bogus}{bogus}"
                )
                self.try_breakpoint(exe, should_have_loc=False)

    @skipUnlessPackageAvailable("cryptography")
    def test_https_reject_redirect_http(self):
        """
        Check that LLDB does not retrieve symbols from servers that redirect with security downgrades.
        """
        exe, sym = self.build_inferior()
        with MockedSymStore(self, exe, sym) as symstore_dir:
            # The plain HTTP server holds the symbols; the HTTPS server only
            # redirects to it.
            with HTTPServer(symstore_dir) as http_url:
                RedirectHandler.base_url = http_url
                cert = SelfSignedCert(self.getBuildArtifact("cert"))
                with HTTPSServer(handler=RedirectHandler, cert=cert) as https_url:
                    self.runCmd(
                        f"settings set plugin.symbol-locator.symstore.urls {https_url}"
                    )
                    self.runCmd(
                        f"settings set plugin.symbol-locator.symstore.tls-cert-fingerprint {cert.fingerprint}"
                    )
                    self.try_breakpoint(exe, should_have_loc=False)