"""lunapi.gpa — Python interface to Luna's GPA association-analysis commands.
Two underlying Luna commands are exposed:
--gpa-prep build a binary data matrix from tabular input files
--gpa run linear association models on that matrix
Both are invoked in-process via the lunapi0 C++ bindings (no subprocess).
"""
import io
import json
import os
import tempfile
from typing import Dict, List, Optional, Union
import pandas as pd
import lunapi.lunapi0 as _l0
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _engine():
return _l0.inaugurate()
def _parse_tsv(text: str) -> pd.DataFrame:
"""Parse a tab-delimited stdout block (manifest / dump) into a DataFrame."""
text = text.strip()
if not text:
return pd.DataFrame()
return pd.read_csv(io.StringIO(text), sep="\t", dtype=str)
def _rtables_to_dfs(raw: dict) -> Dict[str, pd.DataFrame]:
"""Convert rtables_return_t dict to {\"CMD: STRATA\" -> DataFrame}."""
out: Dict[str, pd.DataFrame] = {}
for cmd, strata_map in raw.items():
for stratum, (cols, data) in strata_map.items():
key = f"{cmd}: {stratum}"
df = pd.DataFrame(data).T
df.columns = cols
out[key] = df
return out
def _join(v: Union[str, List[str], None]) -> Optional[str]:
if v is None:
return None
return ",".join(v) if isinstance(v, (list, tuple)) else str(v)
def _ensure_lf(path: str) -> tuple[str, bool]:
"""Normalize CR/CRLF→LF and strip trailing tabs from each line.
Returns (path, created_temp). Creates a temp copy only if the file
actually needed changes; otherwise returns the original path unchanged.
"""
with open(path, "rb") as fh:
data = fh.read()
normalized = data.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
lines = normalized.split(b"\n")
if lines:
ncols = len(lines[0].split(b"\t"))
for idx in range(1, len(lines)):
if not lines[idx]:
continue
fields = lines[idx].split(b"\t")
# Strip trailing empty fields that exceed the header column count
while len(fields) > ncols and fields[-1] == b"":
fields.pop()
# Replace remaining empty fields with "." so the C++ parser accepts them
fields = [f if f else b"." for f in fields]
lines[idx] = b"\t".join(fields)
normalized = b"\n".join(lines)
if normalized == data:
return path, False
fd, tmp = tempfile.mkstemp(suffix=".tsv")
try:
with os.fdopen(fd, "wb") as fh:
fh.write(normalized)
except Exception:
os.unlink(tmp)
raise
return tmp, True
def _validate_tsv(path: str, display_name: Optional[str] = None) -> None:
"""Pre-validate a tab-delimited file before passing it to the C++ engine.
Raises ValueError identifying the file, row number, and column counts so
the user gets an actionable message instead of a cryptic C++ RuntimeError.
display_name overrides the filename shown in error messages (use the
original path when validating a normalized temp copy).
"""
label = os.path.basename(display_name or path)
with open(path, "rb") as fh:
raw = fh.read()
lines = raw.split(b"\n")
while lines and not lines[-1].strip():
lines.pop()
if not lines:
raise ValueError(f"{label!r} is empty")
expected = len(lines[0].split(b"\t"))
for i, line in enumerate(lines[1:], start=2):
if not line.strip():
continue
n = len(line.split(b"\t"))
if n != expected:
preview = repr(line[:120])
raise ValueError(
f"{label!r}: row {i} has {n} tab-delimited columns but the "
f"header has {expected}\n line content: {preview}"
)
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
[docs]
def gpa_prep(
dat_path: str,
specs: Optional[List[dict]] = None,
specs_path: Optional[str] = None,
) -> str:
"""Run ``--gpa-prep`` to build a binary GPA data matrix.
Exactly one of *specs* or *specs_path* should be supplied.
Parameters
----------
dat_path : str
Output path for the binary ``.dat`` file.
specs : list[dict] or None
Structured input-file specification list. Each dict may contain
``file``, ``group``, ``vars``, ``facs``, ``fixed``, ``mappings``.
The list is serialised to a temporary JSON file and passed as
``specs=<tmpfile>`` to ``--gpa-prep``.
specs_path : str or None
Path to an existing JSON specs file.
Returns
-------
str
Manifest text captured from stdout (tab-delimited, same columns as
:func:`gpa_manifest` output). Empty if no manifest was produced.
Raises
------
RuntimeError
Propagated from ``Helper::halt()`` inside the Luna C++ library.
"""
opts: Dict[str, str] = {"dat": dat_path}
tmp_path: Optional[str] = None
lf_tmps: List[str] = []
if specs is not None:
normalized: List[dict] = []
for entry in specs:
if "file" in entry:
orig = entry["file"]
lf_path, created = _ensure_lf(orig)
if created:
lf_tmps.append(lf_path)
entry = {**entry, "file": lf_path}
_validate_tsv(entry["file"], display_name=orig)
normalized.append(entry)
fd, tmp_path = tempfile.mkstemp(suffix=".json")
try:
with os.fdopen(fd, "w", encoding="utf-8") as fh:
json.dump({"inputs": normalized}, fh, indent=2)
except Exception:
os.unlink(tmp_path)
raise
opts["specs"] = tmp_path
elif specs_path is not None:
opts["specs"] = specs_path
try:
_, stdout = _engine().run_gpa(opts, True)
except RuntimeError as exc:
# Attach the specs file list so the error is traceable even when the
# C++ message doesn't include a filename.
files = [e["file"] for e in (specs or []) if "file" in e]
context = ", ".join(os.path.basename(f) for f in files)
raise RuntimeError(
f"{exc}" + (f" (files: {context})" if context else "")
) from exc
finally:
for p in lf_tmps:
try:
os.unlink(p)
except OSError:
pass
if tmp_path is not None:
try:
os.unlink(tmp_path)
except OSError:
pass
return stdout
[docs]
def gpa_manifest(dat_path: str) -> pd.DataFrame:
"""Return the variable manifest for a ``.dat`` file as a DataFrame.
Runs ``--gpa manifest`` and parses the tab-delimited stdout.
Columns always include ``NV``, ``VAR``, ``NI``, ``GRP``, ``BASE``, plus
any factor columns present in the dataset (e.g. ``CH``, ``F``, ``SS``).
"""
opts: Dict[str, str] = {"dat": dat_path, "manifest": ""}
_, stdout = _engine().run_gpa(opts, False)
return _parse_tsv(stdout)
[docs]
def gpa_run(
dat_path: str,
X: Union[str, List[str], None] = None,
Y: Union[str, List[str], None] = None,
Z: Union[str, List[str], None] = None,
Xg: Union[str, List[str], None] = None,
Yg: Union[str, List[str], None] = None,
Zg: Union[str, List[str], None] = None,
mode: str = "assoc",
nreps: int = 0,
fdr: bool = True,
bonf: bool = False,
holm: bool = False,
fdr_by: bool = False,
adj_all_x: bool = False,
x_factors: bool = False,
p: Optional[float] = None,
padj: Optional[float] = None,
vars: Optional[str] = None,
xvars: Optional[str] = None,
grps: Optional[str] = None,
xgrps: Optional[str] = None,
facs: Optional[str] = None,
xfacs: Optional[str] = None,
faclvls: Optional[str] = None,
xfaclvls: Optional[str] = None,
n_prop: Optional[float] = None,
n_req: Optional[int] = None,
knn: Optional[int] = None,
winsor: Optional[float] = None,
subset: Optional[str] = None,
inc_ids: Optional[str] = None,
ex_ids: Optional[str] = None,
verbose: bool = False,
) -> Dict[str, pd.DataFrame]:
"""Run GPA association analysis against a pre-built ``.dat`` file.
Parameters
----------
dat_path : str
Binary data file created by :func:`gpa_prep`.
X, Y, Z : str | list[str] | None
Predictor, outcome, and covariate variable names.
Lists are joined with commas and passed as a single ``X=a,b,c`` argument.
Xg, Yg, Zg : str | list[str] | None
Group-based variable selection (predictor, outcome, covariate groups).
mode : "assoc" | "stats" | "comp"
* ``"assoc"`` — linear association models (default)
* ``"stats"`` — descriptive statistics only
* ``"comp"`` — comparison-style enrichment tests
nreps : int
Permutation replicates (0 = asymptotic p-values only).
fdr : bool
Apply FDR(B&H) correction (default True; pass ``fdr=False`` to disable).
bonf, holm, fdr_by : bool
Additional multiple-testing corrections to add to the output.
adj_all_x : bool
Adjust p-values across all X variables jointly instead of per-X.
x_factors : bool
Append X-variable manifest columns (XBASE, XGROUP, XSTRAT) to output.
p, padj : float | None
Only return rows below this nominal or adjusted significance threshold.
vars, xvars : str | None
Explicit variable include / exclude lists (comma-separated).
grps, xgrps : str | None
Group include / exclude lists.
facs, xfacs : str | None
Factor include / exclude lists.
faclvls, xfaclvls : str | None
Factor-level include / exclude filters (``CH/FZ|CZ`` syntax).
n_prop : float | None
Drop columns with more than this proportion of missing values.
n_req : int | None
Drop columns with fewer than this many non-missing values.
knn : int | None
k for kNN imputation of missing values.
winsor : float | None
Winsorisation proportion applied before modelling.
subset : str | None
Include only subjects positive for these variables (``+VAR`` syntax).
inc_ids, ex_ids : str | None
Comma-separated subject ID include / exclude lists.
verbose : bool
Returns
-------
dict[str, pd.DataFrame]
Keys follow ``"GPA: STRATA"`` convention, e.g.:
``"GPA: X,Y"`` — main association results
``"GPA: VAR"`` — descriptive statistics (mode="stats")
``"GPA: X"`` — comparison test results (mode="comp")
"""
opts: Dict[str, str] = {"dat": dat_path}
for key, val in [("X", X), ("Y", Y), ("Z", Z),
("Xg", Xg), ("Yg", Yg), ("Zg", Zg)]:
v = _join(val)
if v is not None:
opts[key] = v
if nreps: opts["nreps"] = str(nreps)
if not fdr: opts["fdr"] = "F"
if bonf: opts["bonf"] = ""
if holm: opts["holm"] = ""
if fdr_by: opts["fdr-by"] = ""
if adj_all_x: opts["adj-all-X"] = ""
if x_factors: opts["X-factors"] = ""
if mode == "stats": opts["stats"] = ""
elif mode == "comp": opts["comp"] = ""
if p is not None: opts["p"] = str(p)
if padj is not None: opts["padj"] = str(padj)
for k, v in [
("vars", vars), ("xvars", xvars),
("grps", grps), ("xgrps", xgrps),
("facs", facs), ("xfacs", xfacs),
("faclvls", faclvls), ("xfaclvls", xfaclvls),
]:
if v is not None:
opts[k] = v
if n_prop is not None: opts["n-prop"] = str(n_prop)
if n_req is not None: opts["n-req"] = str(n_req)
if knn is not None: opts["knn"] = str(knn)
if winsor is not None: opts["winsor"] = str(winsor)
if subset: opts["subset"] = subset
if inc_ids: opts["inc-ids"] = inc_ids
if ex_ids: opts["ex-ids"] = ex_ids
if verbose: opts["verbose"] = ""
raw, _ = _engine().run_gpa(opts, False)
return _rtables_to_dfs(raw)
[docs]
def gpa_dump(dat_path: str, **filter_opts) -> pd.DataFrame:
"""Dump the raw data matrix from a ``.dat`` file as a DataFrame.
Any keyword argument is forwarded as a Luna parameter string, e.g.
``X="male"``, ``lvars="PSD_CH_CZ_F_13.5"``.
"""
opts: Dict[str, str] = {"dat": dat_path, "dump": ""}
opts.update({k: str(v) for k, v in filter_opts.items()})
_, stdout = _engine().run_gpa(opts, False)
return _parse_tsv(stdout)
[docs]
def gpa_get_xy_partial(xvar: str, yvar: str, zvars: List[str]):
"""Return (ids, x_resid, y_resid) after regressing *zvars* out of both axes.
Uses the same Rz = I - Z(Z'Z)^{-1}Z' projection as the GPA linear model,
so the residual scatter exactly matches what went into the regression.
Falls back to :func:`gpa_get_xy` when *zvars* is empty.
Raises ``RuntimeError`` if no matrix is cached (call :func:`gpa_run` first).
"""
eng = _engine()
if not eng.gpa_has_cache():
raise RuntimeError(
"No GPA matrix cached — call gpa_run() before gpa_get_xy_partial().")
return eng.gpa_get_xy_partial(xvar, yvar, list(zvars))
[docs]
def gpa_get_xy(xvar: str, yvar: str):
"""Return (ids, x_vals, y_vals) from the cached GPA analysis matrix.
Filters to rows where both *xvar* and *yvar* are non-NaN — the exact
same subjects used in the most recent :func:`gpa_run` call.
Raises ``RuntimeError`` if no matrix is cached (call :func:`gpa_run` first).
"""
eng = _engine()
if not eng.gpa_has_cache():
raise RuntimeError(
"No GPA matrix cached — call gpa_run() before gpa_get_xy().")
return eng.gpa_get_xy(xvar, yvar)
[docs]
def gpa_clear_cache():
"""Release the cached GPA analysis matrix to free memory."""
_engine().gpa_clear_cache()
__all__ = [
"gpa_prep",
"gpa_manifest",
"gpa_run",
"gpa_dump",
"gpa_get_xy",
"gpa_get_xy_partial",
"gpa_clear_cache",
]