Skip to content

Reference

Start a project and build the pipeline

Initialize rixpress project files in project_path. This will generate two R scripts: gen-env.R, which when executed using the rix R package will generate a default.nix, which defines the pipeline's execution environment, and gen-pipeline.R, which is where the pipeline is defined. These R scripts are the same as those generated by rixpress, the R version of this package.

Parameters:

Name Type Description Default
project_path str

path to the project directory (defaults to ".")

'.'
skip_prompt bool

if True, skip user confirmation prompts (defaults to False)

False

Returns:

Type Description
bool

True if initialization completed (or was skipped because the session is
non-interactive but the files are already present); False if cancelled by
the user.

Source code in src/ryxpress/init_proj.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def rxp_init(project_path: str = ".", skip_prompt: bool = False) -> bool:
    """
    Initialize rixpress project files in project_path. This will generate
    two R scripts: `gen-env.R`, which when executed using the rix R package
    will generate a `default.nix`, which defines the pipeline's execution
    environment, and `gen-pipeline.R`, which is where the pipeline is defined.
    These R scripts are the same as those generated by rixpress, the R version
    of this package.

    Args:
        project_path: path to the project directory (defaults to ".")
        skip_prompt: if True, skip user confirmation prompts (defaults to False)

    Returns:
        True if initialization completed (or was skipped due to non-interactive but files present),
        False if cancelled by the user.
    """
    # Initial confirmation before any action is taken
    if not _confirm(f"Initialize project at '{project_path}'?", skip_prompt=skip_prompt):
        print("Operation cancelled by user. No files or directories were created.")
        return False

    proj = Path(project_path)
    # Ensure project_path exists. mkdir(exist_ok=True) is safe to call
    # unconditionally, so no separate exists() check is needed.
    proj.mkdir(parents=True, exist_ok=True)

    env_file = proj / "gen-env.R"
    pipeline_file = proj / "gen-pipeline.R"

    gen_env_lines = [
        "# This script defines the default environment the pipeline runs in.",
        "# Add the required packages to execute the code necessary for each derivation.",
        "# If you want to create visual representations of the pipeline, consider adding",
        "# `{visNetwork}` and `{ggdag}` to the list of R packages.",
        "library(rix)",
        "",
        "# Define execution environment",
        "rix(",
        "  date = NULL,",
        "  r_pkgs = NULL,",
        "  py_conf = NULL,",
        "  git_pkgs = list(",
        "    \"package_name\" = \"rixpress\",",
        "    \"repo_url\" = \"https://github.com/b-rodrigues/rixpress\",",
        # NOTE: no trailing comma after the last list element. R does not
        # allow a trailing comma before the closing parenthesis of a call;
        # the previous template emitted one here, producing a gen-env.R
        # that failed to parse ("argument is empty" error).
        "    \"commit\" = \"HEAD\"",
        "  ),",
        "  ide = \"none\",",
        "  project_path = \".\"",
        ")",
    ]

    gen_pipeline_lines = [
        "library(rixpress)",
        "library(igraph)",
        "",
        "list(",
        "  rxp_r_file(",
        "    name = NULL,",
        "    path = NULL,",
        "    read_function = \"lambda x: polars.read_csv(x, separator='|')\"",
        "  ),",
        "  rxp_r(",
        "    name = NULL,",
        "    expr = NULL",
        "  )",
        ") |>",
        "  rxp_populate(build = FALSE)",
    ]

    # Write files (overwrite if present)
    env_file.write_text("\n".join(gen_env_lines) + "\n", encoding="utf-8")
    print(f"File {env_file} has been written.")
    pipeline_file.write_text("\n".join(gen_pipeline_lines) + "\n", encoding="utf-8")
    print(f"File {pipeline_file} has been written.")

    # Skip Git initialization when on non-interactive sessions (CRAN/CI/test equivalent)
    if not _is_interactive():
        print(
            "Skipping Git initialization (non-interactive session, CRAN, CI, or test environment detected)."
        )
        return True

    # Ask whether to initialise git
    if _confirm("Would you like to initialise a Git repository here?", skip_prompt=skip_prompt):
        git_bin = shutil.which("git")
        if git_bin is None:
            print(
                "Git not found on PATH. Please install git and run 'git init' manually, "
                "or initialise the repository using your preferred tool."
            )
        else:
            try:
                # Capture output so 'git init' chatter does not leak to the console.
                subprocess.run([git_bin, "init"], cwd=str(proj), check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                print("Git repository initialised.")
            except subprocess.CalledProcessError:
                # Non-fatal: the project files were written successfully.
                print("Failed to initialise git repository. You can run 'git init' manually.")
    else:
        print("Skipping Git initialization.")

    return True

Run the rixpress R pipeline (rxp_populate + rxp_make) by sourcing an R script.

Parameters:

Name Type Description Default
script Union[str, Path]

Path or name of the R script to run (defaults to "gen-pipeline.R"). If a relative path is given and doesn't exist in the working directory, this function will attempt to locate the script on PATH.

'gen-pipeline.R'
verbose int

integer passed to rixpress::rxp_make(verbose = ...)

0
max_jobs int

integer passed to rixpress::rxp_make(max_jobs = ...)

1
cores int

integer passed to rixpress::rxp_make(cores = ...)

1
rscript_cmd str

the Rscript binary to use (defaults to "Rscript")

'Rscript'
timeout Optional[int]

optional timeout in seconds for the subprocess.run call

None
cwd Optional[Union[str, Path]]

optional working directory to run Rscript in. If None, the directory containing the provided script will be used. This is important because pipeline.nix and related files are often imported with relative paths (e.g. ./default.nix), so Rscript needs to be run where those files are reachable.

None

Returns:

Type Description
RRunResult

An RRunResult containing returncode, stdout, stderr.

Source code in src/ryxpress/r_runner.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
def rxp_make(
    script: Union[str, Path] = "gen-pipeline.R",
    verbose: int = 0,
    max_jobs: int = 1,
    cores: int = 1,
    rscript_cmd: str = "Rscript",
    timeout: Optional[int] = None,
    cwd: Optional[Union[str, Path]] = None,
) -> RRunResult:
    """
    Run the rixpress R pipeline (rxp_populate + rxp_make) by sourcing an R script.

    Args:
        script: Path or name of the R script to run (defaults to "gen-pipeline.R").
            A relative path that does not resolve to an existing file is looked
            up on PATH as a fallback.
        verbose: integer forwarded to rixpress::rxp_make(verbose = ...)
        max_jobs: integer forwarded to rixpress::rxp_make(max_jobs = ...)
        cores: integer forwarded to rixpress::rxp_make(cores = ...)
        rscript_cmd: name of the Rscript binary to invoke (defaults to "Rscript")
        timeout: optional timeout in seconds for the subprocess.run call
        cwd: optional working directory for Rscript. When None, the directory
            containing the script is used, so that relative imports inside the
            pipeline (e.g. ./default.nix) remain reachable.

    Returns:
        An RRunResult containing returncode, stdout, stderr.
    """
    # Reject non-integer or negative values for the numeric knobs.
    int_params = {"verbose": verbose, "max_jobs": max_jobs, "cores": cores}
    for name, val in int_params.items():
        if not isinstance(val, int):
            raise TypeError(f"{name} must be an int, got {type(val).__name__}")
        if val < 0:
            raise ValueError(f"{name} must be >= 0")

    # Resolve the script: prefer an existing file, fall back to a PATH lookup.
    candidate = Path(script)
    if candidate.is_file():
        resolved = candidate.resolve()
    else:
        located = shutil.which(str(script))
        if located is None:
            raise FileNotFoundError(
                f"R script '{script}' not found in working directory and not on PATH"
            )
        resolved = Path(located)

    # Working directory for the R process: explicit cwd wins, otherwise the
    # script's parent so relative imports (./default.nix) keep working.
    if cwd is None:
        run_dir = resolved.parent
    else:
        run_dir = Path(cwd).resolve()
        if not run_dir.is_dir():
            raise FileNotFoundError(f"Requested cwd '{cwd}' does not exist or is not a directory")

    # Fail early if the Rscript binary is missing.
    if shutil.which(rscript_cmd) is None:
        raise FileNotFoundError(
            f"Rscript binary '{rscript_cmd}' not found in PATH. Ensure R is installed or adjust rscript_cmd."
        )

    # Wrapper R script: load rixpress, evaluate the user's script, run
    # rxp_populate on a returned pipeline list, then call rxp_make.
    wrapper = f"""
suppressPackageStartupMessages(library(rixpress))

script_path <- "{resolved.as_posix()}"

if (!file.exists(script_path)) {{
  stop("Script not found: ", script_path)
}}

result_value <- NULL

res <- tryCatch({{
  # Source & evaluate the user's script and capture the returned value (if any)
  result_value <- eval(parse(script_path))
  # If the script returned a list (a pipeline), run rxp_populate on it
  if (!is.null(result_value) && is.list(result_value)) {{
    pipeline <- result_value
    pipeline <- rixpress::rxp_populate(pipeline)
  }}
  # Finally, run rxp_make with the given integer parameters
  rixpress::rxp_make(
    verbose = {int(verbose)},
    max_jobs = {int(max_jobs)},
    cores = {int(cores)}
  )
}}, error = function(e) {{
  # Print a clear error message and exit with non-zero status
  message("rixpress-python-runner-error: ", conditionMessage(e))
  quit(status = 1)
}})

# If we reach here, exit with success
quit(status = 0)
"""

    # Persist the wrapper to a temporary .R file for Rscript to consume.
    with tempfile.NamedTemporaryFile(mode="w", suffix=".R", delete=False) as handle:
        handle.write(wrapper)
        wrapper_file = Path(handle.name)

    try:
        completed = subprocess.run(
            [rscript_cmd, str(wrapper_file)],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            timeout=timeout,
            cwd=str(run_dir),
        )
        return RRunResult(
            returncode=completed.returncode,
            stdout=completed.stdout,
            stderr=completed.stderr,
        )
    finally:
        # Best-effort cleanup of the temporary wrapper.
        try:
            wrapper_file.unlink()
        except Exception:
            pass

Inspect the pipeline

Inspect the build result of a pipeline.

Parameters:

Name Type Description Default
project_path Union[str, Path]

path to project root (defaults to ".")

'.'
which_log Optional[str]

optional regex to select a specific log file. If None, the most recent log is used.

None
pretty bool

if True, pretty-prints the result (and returns nothing).

False
as_json bool

if True, pretty prints using json.dumps(indent=2) instead of pprint.

False

Returns:

Type Description
Optional[List[Dict[str, Any]]]

A list of dict rows parsed from the selected JSON log file (unless pretty=True).

Raises:

Type Description
FileNotFoundError

if no logs are found or _rixpress missing.

ValueError

if which_log is provided but no matching filename is found.

RuntimeError

if the chosen log cannot be read/parsed.

Source code in src/ryxpress/inspect_logs.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
def rxp_inspect(
    project_path: Union[str, Path] = ".",
    which_log: Optional[str] = None,
    pretty: bool = False,
    as_json: bool = False,
) -> Optional[List[Dict[str, Any]]]:
    """
    Inspect the build result of a pipeline.

    Args:
        project_path: path to project root (defaults to ".")
        which_log: optional regex to select a specific log file. If None, the most recent log is used.
        pretty: if True, pretty-prints the result (and returns nothing).
        as_json: if True, pretty prints using json.dumps(indent=2) instead of pprint.

    Returns:
        A list of dict rows parsed from the selected JSON log file (unless pretty=True).

    Raises:
        FileNotFoundError: if no logs are found or _rixpress missing.
        ValueError: if which_log is provided but no matching filename is found.
        RuntimeError: if the chosen log cannot be read/parsed.
    """
    proj = Path(project_path)
    rixpress_dir = proj / "_rixpress"

    # rxp_list_logs raises FileNotFoundError when no logs exist and returns
    # entries sorted most-recent-first.
    logs = rxp_list_logs(proj)

    chosen_path: Optional[Path] = None

    if which_log is None:
        # Default selection: the most recent log file.
        chosen_path = rixpress_dir / logs[0]["filename"]
    else:
        # Local imports, one per line (PEP 8); only needed on this branch.
        import logging
        import re

        logger = logging.getLogger(__name__)
        pattern = re.compile(which_log)
        for entry in logs:
            if pattern.search(entry["filename"]):
                chosen_path = rixpress_dir / entry["filename"]
                logger.info("Using log file: %s", entry["filename"])
                break
        if chosen_path is None:
            raise ValueError(f"No build logs found matching the pattern: {which_log}")

    try:
        with chosen_path.open("r", encoding="utf-8") as fh:
            data = json.load(fh)
    except Exception as e:
        # Chain the original exception so the root cause remains visible
        # in the traceback (raise ... from e).
        raise RuntimeError(f"Failed to read log file {chosen_path}: {e}") from e

    rows = _coerce_json_to_rows(data)

    if pretty:
        if as_json:
            print(json.dumps(rows, indent=2, ensure_ascii=False))
        else:
            pprint(rows)
        return None  # pretty mode prints; nothing is returned to the caller

    return rows

List build logs in the project's _rixpress directory.

Parameters:

Name Type Description Default
project_path Union[str, Path]

path to project root (defaults to ".")

'.'
pretty bool

if True, pretty-prints the result (and returns nothing).

False
as_json bool

if True, pretty prints using json.dumps(indent=2) instead of pprint.

False

Returns:

Type Description
Optional[List[Dict[str, Union[str, float]]]]

A list of dictionaries, each with keys:
  • filename: basename of log file (str)
  • modification_time: ISO date string YYYY-MM-DD (str)
  • size_kb: file size in kilobytes rounded to 2 decimals (float)

(unless pretty=True, in which case nothing is returned)

Raises:

Type Description
FileNotFoundError

if the _rixpress directory does not exist or if no logs are found.

Source code in src/ryxpress/inspect_logs.py
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def rxp_list_logs(
    project_path: Union[str, Path] = ".",
    pretty: bool = False,
    as_json: bool = False,
) -> Optional[List[Dict[str, Union[str, float]]]]:
    """
    List build logs in the project's _rixpress directory.

    Args:
        project_path: path to project root (defaults to ".")
        pretty: if True, pretty-prints the result (and returns nothing).
        as_json: if True, pretty prints using json.dumps(indent=2) instead of pprint.

    Returns:
        A list of dictionaries, each with keys:
        - filename: basename of log file (str)
        - modification_time: ISO date string YYYY-MM-DD (str)
        - size_kb: file size in kilobytes rounded to 2 decimals (float)
        (unless pretty=True, in which case nothing is returned)

    Raises:
        FileNotFoundError: if the _rixpress directory does not exist or if no logs are found.
    """
    rixpress_dir = Path(project_path) / "_rixpress"

    # Guard: the log directory must exist before we can enumerate anything.
    if not (rixpress_dir.exists() and rixpress_dir.is_dir()):
        raise FileNotFoundError("_rixpress directory not found. Did you initialise the project?")

    # Build logs are JSON files whose names start with "build_log",
    # ordered most-recent-first by modification time.
    is_log_name = re.compile(r"^build_log.*\.json$").search
    log_files = sorted(
        (entry for entry in rixpress_dir.iterdir() if entry.is_file() and is_log_name(entry.name)),
        key=lambda entry: entry.stat().st_mtime,
        reverse=True,
    )

    if not log_files:
        raise FileNotFoundError(f"No build logs found in {rixpress_dir}")

    logs: List[Dict[str, Union[str, float]]] = [
        {
            "filename": entry.name,
            "modification_time": _iso_date_from_epoch(entry.stat().st_mtime),
            "size_kb": round(entry.stat().st_size / 1024.0, 2),
        }
        for entry in log_files
    ]

    if not pretty:
        return logs

    # pretty mode: print and return nothing.
    if as_json:
        print(json.dumps(logs, indent=2, ensure_ascii=False))
    else:
        pprint(logs)
    return None

Recover artifacts

Copy derivations from the Nix store to ./pipeline-output.

Parameters:

Name Type Description Default
derivation_name Optional[str]

name of the derivation to copy (string). If None, uses the special derivation name "all-derivations" (mirrors R).

None
dir_mode str

octal permission string applied to copied directories (default "0755").

'0755'
file_mode str

octal permission string applied to copied files (default "0644").

'0644'
project_path Union[str, Path]

project root where _rixpress lives (defaults to ".").

'.'

Returns:

Type Description
None

None. Prints a success message upon completion.

Raises:

Type Description
FileNotFoundError

if _rixpress or logs are missing.

ValueError

on invalid modes or derivation not found.

RuntimeError

on copy failures.

Source code in src/ryxpress/copy_artifacts.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
def rxp_copy(
    derivation_name: Optional[str] = None,
    dir_mode: str = "0755",
    file_mode: str = "0644",
    project_path: Union[str, Path] = ".",
) -> None:
    """
    Copy derivations from the Nix store to ./pipeline-output.

    Args:
        derivation_name: name of the derivation to copy (string). If None,
            uses the special derivation name "all-derivations" (mirrors R).
        dir_mode: octal permission string applied to copied directories (default "0755").
        file_mode: octal permission string applied to copied files (default "0644").
        project_path: project root where _rixpress lives (defaults to ".").

    Returns:
        None. Prints a success message upon completion.

    Raises:
        FileNotFoundError: if _rixpress or logs are missing.
        ValueError: on invalid modes or derivation not found.
        RuntimeError: on copy failures.
    """
    project = Path(project_path)
    # Validate permission modes up front, before touching the filesystem.
    if not _valid_mode(dir_mode):
        raise ValueError('Invalid dir_mode: provide a character octal like "0755" or "755".')
    if not _valid_mode(file_mode):
        raise ValueError('Invalid file_mode: provide a character octal like "0644" or "644".')

    # Called for its side effect only: raises FileNotFoundError when no build
    # log exists. The returned list itself is not needed here.
    rxp_list_logs(project)

    # Read latest build log content via rxp_inspect (most recent)
    rows = rxp_inspect(project_path=project, which_log=None)
    if not isinstance(rows, list) or not rows:
        raise RuntimeError("Could not read build log details; rxp_inspect returned no rows.")

    # Build a mapping from derivation name -> list of store paths.
    # Be tolerant about key names: try 'derivation' (R), then 'deriv', 'name'.
    deriv_key_candidates = ("derivation", "deriv", "name")
    path_key_candidates = ("path", "store_path", "path_store", "output_path", "output")

    deriv_to_paths: Dict[str, List[str]] = {}
    for r in rows:
        if not isinstance(r, dict):
            continue
        deriv_val = _extract_field(r, deriv_key_candidates)
        path_val = _extract_field(r, path_key_candidates)
        if deriv_val is None:
            # skip rows without a derivation name
            continue
        derivs = _ensure_iterable_of_strings(deriv_val)
        paths = _ensure_iterable_of_strings(path_val)
        for d in derivs:
            deriv_to_paths.setdefault(d, []).extend(paths)

    # Deduplicate each path list while preserving first-seen order.
    # dict.fromkeys is O(n) per list, replacing the previous quadratic
    # membership scan.
    deriv_to_paths = {k: list(dict.fromkeys(v)) for k, v in deriv_to_paths.items()}

    # Default derivation name mirrors the R implementation.
    if derivation_name is None:
        derivation_name = "all-derivations"

    if derivation_name not in deriv_to_paths:
        # Provide hint of available derivations (up to 20)
        available = list(deriv_to_paths.keys())[:20]
        more = ", ..." if len(deriv_to_paths) > 20 else ""
        raise ValueError(
            f"No derivation {derivation_name!r} found in the build log. Available: {', '.join(available)}{more}"
        )

    # Collect paths for this derivation
    deriv_paths = deriv_to_paths.get(derivation_name, [])
    if not deriv_paths:
        raise RuntimeError(f"No store paths recorded for derivation {derivation_name!r} in the build log.")

    output_dir = _ensure_output_dir(Path.cwd())

    # For each store path, copy its contents into output_dir; collect errors
    # so one failure does not abort the remaining copies.
    copy_failed = False
    errors: List[str] = []
    for store_path_str in deriv_paths:
        store_path = Path(store_path_str)
        if not store_path.exists():
            # Skip non-existing path (warn)
            logger.warning("Store path does not exist, skipping: %s", store_path)
            continue
        try:
            # If the derivation path is a directory, copy its children into output_dir
            if store_path.is_dir():
                # copy each child into output_dir, preserving names
                for child in store_path.iterdir():
                    dest = output_dir / child.name
                    if child.is_dir():
                        # Python 3.8+: dirs_exist_ok=True merges into existing dirs
                        try:
                            shutil.copytree(child, dest, dirs_exist_ok=True)
                        except TypeError:
                            # older Python: fallback to manual merge
                            if dest.exists():
                                # copy contents into existing dest
                                for sub in child.rglob("*"):
                                    rel = sub.relative_to(child)
                                    target = dest / rel
                                    if sub.is_dir():
                                        target.mkdir(parents=True, exist_ok=True)
                                    else:
                                        target.parent.mkdir(parents=True, exist_ok=True)
                                        shutil.copy2(sub, target)
                            else:
                                shutil.copytree(child, dest)
                    else:
                        # file: copy, possibly overwrite
                        shutil.copy2(child, dest)
            else:
                # store_path is a file: copy into output_dir
                dest_file = output_dir / store_path.name
                shutil.copy2(store_path, dest_file)
        except Exception as e:
            copy_failed = True
            errors.append(f"{store_path}: {e}")
            logger.debug("Copy error for %s: %s", store_path, e)

    # Apply permissions (best-effort: failures are logged, not raised)
    try:
        _apply_permissions(output_dir, dir_mode=dir_mode, file_mode=file_mode)
    except Exception:
        logger.debug("Failed to apply permissions to %s", output_dir)

    if copy_failed:
        # Plain string literal: the previous f-prefix had no placeholders.
        raise RuntimeError("Copy unsuccessful: errors occurred:\n" + "\n".join(errors))

    # Success message
    print(f"Copy successful, check out {output_dir}")
    return None

Read the output of a derivation.

Parameters:

Name Type Description Default
derivation_name str

name of the derivation to read.

required
which_log Optional[str]

optional regex to select a specific log file. If None, the most recent log is used.

None
project_path Union[str, Path]

path to project root (defaults to ".").

'.'

Returns:

Type Description
Union[object, str, List[str]]

The loaded object if successfully unpickled or parsed via rds2py.
Otherwise, returns the path string (or list of paths if multiple outputs).

Note

All failures are silent; no exceptions/warnings are raised for "can't load" cases. When cronista is available, warns if the loaded object is a Chronicle with Nothing value.

Source code in src/ryxpress/read_load.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
def rxp_read(
    derivation_name: str,
    which_log: Optional[str] = None,
    project_path: Union[str, Path] = ".",
) -> Union[object, str, List[str]]:
    """
    Read the output of a derivation.

    Args:
        derivation_name: name of the derivation to read.
        which_log: optional regex to select a specific log file. If None, the most recent log is used.
        project_path: path to project root (defaults to ".").

    Returns:
        The loaded object if successfully unpickled or parsed via rds2py.
        Otherwise, returns the path string (or list of paths if multiple outputs).

    Note:
        All failures are silent; no exceptions/warnings are raised for "can't load" cases.
        When cronista is available, warns if the loaded object is a Chronicle with Nothing value.
    """
    target = rxp_read_load_setup(derivation_name, which_log=which_log, project_path=project_path)

    # Multiple outputs come back as a list of paths; hand them over as-is.
    if isinstance(target, list):
        return target

    # Single path (string) or fallback value (derivation_name)
    path = str(target)

    # Directories cannot be deserialized; just return the location.
    if os.path.isdir(path):
        return path

    loaded = None

    # First attempt: pickle, regardless of file extension.
    try:
        with open(path, "rb") as handle:
            loaded = pickle.load(handle)
    except Exception:
        # Silent failure; fall through to the rds2py attempt.
        logger.debug("pickle load failed for %s; will try rds2py if available", path, exc_info=True)

    # Second attempt: rds2py, again regardless of extension.
    if loaded is None:
        loaded = _load_rds_with_rds2py(path)

    # Neither loader succeeded: silently return the path string.
    if loaded is None:
        return path

    # Optional chronicle inspection when cronista is installed.
    try:
        from .cronista_helpers import chronicle_state
        state = chronicle_state(loaded)
        if state == "nothing":
            import warnings
            warnings.warn(
                f"Derivation '{derivation_name}' contains a chronicle with Nothing value! "
                "Use cronista.read_log() on this object for details."
            )
        elif state == "warning":
            logger.info(
                "Derivation '%s' is a chronicle with captured warnings. "
                "Use cronista.read_log() for details.",
                derivation_name
            )
    except ImportError:
        pass  # cronista not available, skip check

    return loaded

Load the output of a derivation into the caller's globals.

Parameters:

Name Type Description Default
derivation_name str

name of the derivation to load. Also used as the variable name in globals.

required
which_log Optional[str]

optional regex to select a specific log file. If None, the most recent log is used.

None
project_path Union[str, Path]

path to project root (defaults to ".").

'.'

Returns:

Type Description
Union[object, str, List[str]]

The loaded object if successfully unpickled or parsed.

Union[object, str, List[str]]

Otherwise, returns the path string (or list of paths if multiple outputs).

Note

The loaded object is assigned to the caller's globals under derivation_name. All failures are silent. When cronista is available, warns if the loaded object is a Chronicle with Nothing value.

Source code in src/ryxpress/read_load.py
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
def rxp_load(
    derivation_name: str,
    which_log: Optional[str] = None,
    project_path: Union[str, Path] = ".",
) -> Union[object, str, List[str]]:
    """
    Load the output of a derivation into the caller's globals.

    Args:
        derivation_name: name of the derivation to load. Also used as the variable name in globals.
        which_log: optional regex to select a specific log file. If None, the most recent log is used.
        project_path: path to project root (defaults to ".").

    Returns:
        The loaded object if successfully unpickled or parsed.
        Otherwise, returns the path string (or list of paths if multiple outputs).

    Note:
        The loaded object is assigned to the caller's globals under `derivation_name`.
        All failures are silent.
        When cronista is available, warns if the loaded object is a Chronicle with Nothing value.
    """
    resolved = rxp_read_load_setup(derivation_name, which_log=which_log, project_path=project_path)

    # If multiple outputs, return them
    if isinstance(resolved, list):
        return resolved

    path = str(resolved)

    if os.path.isdir(path):
        return path

    # Sentinel distinguishes "pickle load failed" from a legitimately pickled
    # None value. Previously `obj = None` was the failure marker, so a file
    # whose pickled content was None fell through to the rds2py path and the
    # path string was returned instead of the loaded object.
    _failed = object()
    obj: object = _failed

    # Try to unpickle first
    try:
        with open(path, "rb") as fh:
            obj = pickle.load(fh)
    except Exception:
        obj = _failed
        logger.debug("pickle load failed for %s; will try rds2py if available", path, exc_info=True)

    # If pickle failed, try rds2py (returns None when it cannot load either)
    if obj is _failed:
        obj = _load_rds_with_rds2py(path)
        if obj is None:
            # Nothing we can load silently; return the path
            return path

    # Check for chronicle Nothing values if cronista is available
    try:
        from .cronista_helpers import chronicle_state
        state = chronicle_state(obj)
        if state == "nothing":
            import warnings
            warnings.warn(
                f"Derivation '{derivation_name}' contains a chronicle with Nothing value! "
                "Use cronista.read_log() on this object for details."
            )
        elif state == "warning":
            logger.info(
                "Derivation '%s' is a chronicle with captured warnings. "
                "Use cronista.read_log() for details.",
                derivation_name
            )
    except ImportError:
        pass  # cronista not available, skip check

    # Assign into caller's globals (best-effort); silence any assignment errors
    try:
        caller_frame = inspect.currentframe().f_back
        if caller_frame is not None:
            caller_globals = caller_frame.f_globals
            # Use derivation_name as the variable name; keep last path component if it's a path
            try:
                var_name = derivation_name
                # If derivation_name looks like a path, use the basename without extension
                if derivation_name.startswith("/nix/store/") or os.path.sep in derivation_name:
                    var_name = os.path.splitext(os.path.basename(str(path)))[0]
                # ensure valid identifier fallback
                if not var_name.isidentifier():
                    var_name = "_".join(re.findall(r"\w+", var_name)) or "loaded_artifact"
            except Exception:
                var_name = "loaded_artifact"
            caller_globals[var_name] = obj
    except Exception:
        logger.debug("Failed to assign loaded object into caller globals", exc_info=True)

    return obj

Visually exploring the pipeline

Build an igraph object from nodes_and_edges and write a DOT file for CI.

Parameters:

Name Type Description Default
nodes_and_edges Optional[Dict[str, List[Dict]]]

dict with keys 'nodes' and 'edges' as returned by get_nodes_edges(). If None, get_nodes_edges() is called.

None
output_file Union[str, Path]

path to write DOT file. Parent directories are created as needed.

'_rixpress/dag.dot'

Raises:

Type Description
ImportError

if python-igraph is not installed.

Source code in src/ryxpress/plotting.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
def rxp_dag_for_ci(nodes_and_edges: Optional[Dict[str, List[Dict]]] = None,
                   output_file: Union[str, Path] = "_rixpress/dag.dot") -> None:
    """
    Build an igraph object from nodes_and_edges and write a DOT file for CI.

    Args:
        nodes_and_edges: dict with keys 'nodes' and 'edges' as returned by
            get_nodes_edges(). If None, get_nodes_edges() is called.
        output_file: path to write DOT file. Parent directories are created as needed.

    Raises:
        ImportError: if python-igraph is not installed.
        RuntimeError: if the DOT file cannot be written.
    """
    # Lazy import igraph and raise helpful error if not available
    try:
        import igraph  # python-igraph
    except Exception as e:  # ImportError or other import-time errors
        raise ImportError(
            "The python 'igraph' package is required for rxp_dag_for_ci. "
            "Install it with e.g. 'pip install python-igraph' and try again."
        ) from e

    if nodes_and_edges is None:
        nodes_and_edges = get_nodes_edges()

    edges = nodes_and_edges.get("edges", [])
    nodes = nodes_and_edges.get("nodes", [])
    # Build a list of tuples (from, to) for igraph
    edge_tuples = [(e["from"], e["to"]) for e in edges]

    # Ensure output directory exists
    out_path = Path(output_file)
    out_path.parent.mkdir(parents=True, exist_ok=True)

    # Create the graph from edge tuples. TupleList will create vertices named by
    # the unique labels encountered in the tuples.
    # If there are no edges but there are nodes, create an empty graph and add vertices.
    if edge_tuples:
        g = igraph.Graph.TupleList(edge_tuples, directed=True, vertex_name_attr="name")
        # Fix: nodes present in 'nodes' but participating in no edge were
        # previously dropped from the DOT output. Add them as isolated vertices.
        existing = set(g.vs["name"]) if g.vcount() else set()
        isolated = [n["id"] for n in nodes if n["id"] not in existing]
        if isolated:
            g.add_vertices(isolated)
    else:
        # no edges — create graph and add vertices from nodes list
        vertex_names = [n["id"] for n in nodes]
        g = igraph.Graph(directed=True)
        if vertex_names:
            g.add_vertices(vertex_names)
            # set the 'name' attribute automatically when vertices are named

    # Set vertex 'label' attribute from vertex name
    # g.vs['name'] should exist; copy to 'label'
    try:
        names = g.vs["name"]
        g.vs["label"] = names
        # Attempt to remove the 'name' attribute to mirror R behavior.
        # python-igraph allows deleting vertex attributes via 'del g.vs["attr"]'.
        try:
            del g.vs["name"]
        except Exception:
            # If deletion is not supported in some igraph versions, leave it;
            # having both 'name' and 'label' is harmless for DOT output.
            logger.debug("Could not delete 'name' vertex attribute; leaving it in place.")
    except Exception:
        # If the graph has no vertices or attribute access fails, continue.
        pass

    # Write graph to DOT format
    # Use Graph.write with format="dot"
    try:
        g.write(str(out_path), format="dot")
    except Exception as e:
        raise RuntimeError(f"Failed to write DOT file to {out_path}: {e}") from e

Render a DOT graph file as an ASCII diagram using phart, showing node labels.

This function reads a DOT file, parses it with pydot and networkx, and renders it in ASCII using phart. Node labels from the DOT file are used instead of numeric node IDs.

Dependencies
  • phart
  • pydot
  • networkx

Make sure to add these dependencies to the execution environment to use this function.

Parameters:

Name Type Description Default
dot_path str

Path to the DOT file to render.

required

Raises:

Type Description
FileNotFoundError

If the specified DOT file does not exist.

ValueError

If the DOT file is empty or cannot be parsed into a graph.

Source code in src/ryxpress/plotting.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
def rxp_phart(dot_path: str) -> None:
    """
    Render a DOT graph file as an ASCII diagram using phart, showing node labels.

    This function reads a DOT file, parses it with pydot and networkx, and
    renders it in ASCII using phart. Node labels from the DOT file are used
    instead of numeric node IDs.

    Dependencies:
        - phart
        - pydot
        - networkx

    Make sure to add these dependencies to the execution environment to use this function.

    Args:
        dot_path: Path to the DOT file to render.

    Raises:
        FileNotFoundError: If the specified DOT file does not exist.
        ValueError: If the DOT file is empty or cannot be parsed into a graph.
    """

    # Dependency checks: collect every missing optional dependency first so
    # the user sees the complete list in a single message.
    missing = []
    try:
        from phart import ASCIIRenderer
    except ImportError:
        missing.append("phart")
    try:
        import pydot
    except ImportError:
        missing.append("pydot")
    try:
        import networkx as nx
    except ImportError:
        missing.append("networkx")

    if missing:
        print(
            f"The following dependencies are required but not installed: {', '.join(missing)}"
        )
        # Plain literal (was an f-string with no placeholders).
        print("Please add them to the execution environment.")
        return

    # Check file exists
    import os
    if not os.path.exists(dot_path):
        raise FileNotFoundError(f"DOT file not found: {dot_path}")

    # Load DOT file (explicit encoding so behavior is locale-independent)
    with open(dot_path, encoding="utf-8") as f:
        dot_data = f.read()

    if not dot_data.strip():
        raise ValueError("DOT file is empty.")

    # Parse DOT into networkx graph
    graphs = pydot.graph_from_dot_data(dot_data)
    if not graphs:
        raise ValueError("No valid graphs found in DOT file.")

    G = nx.nx_pydot.from_pydot(graphs[0])

    # Map node keys to labels for display
    mapping = {node: data.get("label", str(node)) for node, data in G.nodes(data=True)}
    H = nx.relabel_nodes(G, mapping)

    # Render ASCII
    renderer = ASCIIRenderer(H)
    print(renderer.render())

Trace lineage of derivations.

Parameters:

Name Type Description Default
name Optional[str]

Name of the derivation to trace. If None, traces the whole pipeline.

None
dag_file Union[str, Path]

Path to the dag.json file (defaults to "_rixpress/dag.json").

Path('_rixpress') / 'dag.json'
transitive bool

If True, include transitive dependencies marked with '*'.

True
include_self bool

If True, include the node itself in dependency lists.

False
color bool

If True and derivations have pipeline_color, names are coloured in output.

True

Returns:

Type Description
Dict[str, Dict[str, List[str]]]

A dict mapping each inspected derivation name to a dict with keys:

  • 'dependencies' : list of dependency names (ancestors), with transitive-only names marked with '*'
  • 'reverse_dependencies' : list of reverse dependents (children), with transitive-only names marked with '*'
Side-effect

Prints a tree representation to stdout (either the whole pipeline or the single-node lineage). When color=True and terminal supports it, derivation names are coloured by their pipeline_color.

Source code in src/ryxpress/tracing.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
def rxp_trace(
    name: Optional[str] = None,
    dag_file: Union[str, Path] = Path("_rixpress") / "dag.json",
    transitive: bool = True,
    include_self: bool = False,
    color: bool = True,
) -> Dict[str, Dict[str, List[str]]]:
    """
    Trace lineage of derivations.

    Args:
        name: Name of the derivation to trace. If None, traces the whole pipeline.
        dag_file: Path to the dag.json file (defaults to "_rixpress/dag.json").
        transitive: If True, include transitive dependencies marked with '*'.
        include_self: If True, include the node itself in dependency lists.
        color: If True and derivations have pipeline_color, names are coloured in output.

    Returns:
        A dict mapping each inspected derivation name to a dict with keys:
        - 'dependencies' : list of dependency names (ancestors), with transitive-only names marked with '*'
        - 'reverse_dependencies' : list of reverse dependents (children), with transitive-only names marked with '*'

    Side-effect:
        Prints a tree representation to stdout (either the whole pipeline or
        the single-node lineage). When color=True and terminal supports it,
        derivation names are coloured by their pipeline_color.
    """
    derivs, color_map = _load_dag(dag_file)

    # Check if we should use colour
    use_color = color and _supports_color()

    # Helper to get coloured name
    def maybe_color(node_name: str, suffix: str = "") -> str:
        """Return node name with optional colour and suffix."""
        # NOTE: rstrip("*") removes *all* trailing stars, while only one star
        # is re-appended via the endswith check; labels built in this function
        # carry at most one star, so the two agree.
        base_name = node_name.rstrip("*")
        star = "*" if node_name.endswith("*") else ""
        display_name = base_name + star + suffix

        if use_color:
            hex_color = color_map.get(base_name)
            if hex_color:
                ansi = _hex_to_ansi(hex_color)
                return _colorize(display_name, ansi)
        return display_name

    # Collect all derivation names up front; fail fast on unparsable entries.
    all_names: List[str] = []
    for d in derivs:
        nm = _extract_name(d)
        if nm is None:
            raise ValueError("Found derivations with missing or unparsable names in dag.json.")
        all_names.append(nm)

    if name is not None and name not in all_names:
        # mirror R's head(...) behaviour for listing available names
        snippet = ", ".join(all_names[:20])
        more = ", ..." if len(all_names) > 20 else ""
        raise ValueError(f"Derivation '{name}' not found in dag.json (available: {snippet}{more}).")

    # depends_map: node -> direct dependencies (parents); reverse_map: node -> dependents (children)
    depends_map = _make_depends_map(derivs, all_names)
    reverse_map = _build_reverse_map(depends_map, all_names)

    # helper to print single lineage (deps and reverse deps)
    def print_single(target: str) -> None:
        print(f"==== Lineage for: {maybe_color(target)} ====")
        # Dependencies (ancestors)
        print("Dependencies (ancestors):")
        visited: List[str] = []

        def rec_dep(node: str, depth: int) -> None:
            # Depth-first walk over parents; a node's subtree is only expanded
            # on its first visit to avoid infinite loops on cycles.
            parents = depends_map.get(node) or []
            if not parents:
                if depth == 0:
                    print("  - <none>")
                return
            for p in parents:
                # Beyond the first level, mark names as transitive with '*'.
                label = f"{p}*" if (transitive and depth >= 1) else p
                print(("  " * (depth + 1)) + "- " + maybe_color(label))
                if p not in visited:
                    visited.append(p)
                    rec_dep(p, depth + 1)

        rec_dep(target, 0)

        print("\nReverse dependencies (children):")
        visited = []

        def rec_rev(node: str, depth: int) -> None:
            # Same traversal as rec_dep, but over the reverse (children) map.
            kids = reverse_map.get(node) or []
            if not kids:
                if depth == 0:
                    print("  - <none>")
                return
            for k in kids:
                label = f"{k}*" if (transitive and depth >= 1) else k
                print(("  " * (depth + 1)) + "- " + maybe_color(label))
                if k not in visited:
                    visited.append(k)
                    rec_rev(k, depth + 1)

        rec_rev(target, 0)

        if transitive:
            print("\nNote: '*' marks transitive dependencies (depth >= 2).\n")

    # helper to print forest starting from given roots, using depends_map (outputs -> inputs)
    def print_forest_once(roots: List[str], graph: Dict[str, List[str]], transitive_flag: bool) -> None:
        visited_nodes: List[str] = []

        def rec(node: str, depth: int) -> None:
            # Print before the visited check so repeated nodes still appear
            # in the tree, but their subtrees are expanded only once.
            label = f"{node}*" if (transitive_flag and depth >= 2) else node
            print(("  " * depth) + "- " + maybe_color(label))
            if node in visited_nodes:
                return
            visited_nodes.append(node)
            kids = graph.get(node) or []
            if not kids:
                return
            for k in kids:
                rec(k, depth + 1)

        for r in roots:
            rec(r, 0)

    # sinks: nodes with no children in reverse_map
    def sinks() -> List[str]:
        no_children = [n for n, kids in reverse_map.items() if not kids]
        if no_children:
            return no_children
        # Fallback (e.g. cyclic graphs): use the nodes with minimal out-degree.
        outdeg_vals = {n: len(kids) for n, kids in reverse_map.items()}
        if outdeg_vals:
            min_outdeg = min(outdeg_vals.values())
            return [n for n, v in outdeg_vals.items() if v == min_outdeg]
        return []

    # Build results mapping
    results: Dict[str, Dict[str, List[str]]] = {}
    for nm in all_names:
        deps = _marked_vec(nm, depends_map, transitive)
        rdeps = _marked_vec(nm, reverse_map, transitive)
        if include_self:
            deps = _unique_preserve_order([nm] + deps)
            rdeps = _unique_preserve_order([nm] + rdeps)
        results[nm] = {"dependencies": deps, "reverse_dependencies": rdeps}

    if name is None:
        print("==== Pipeline dependency tree (outputs \u2192 inputs) ====")
        for root in sinks():
            print_forest_once([root], depends_map, transitive)
        if transitive:
            print("\nNote: '*' marks transitive dependencies (depth >= 2).\n")
        return results
    else:
        print_single(name)
        # return only the single-name mapping to match the R invisible(results[name]) behaviour
        return {name: results[name]}

Utilities

Garbage collect Nix store paths and build logs produced by rixpress.

Parameters:

Name Type Description Default
keep_since Optional[Union[str, date]]

None for full GC, or a date/ISO date string (YYYY-MM-DD) to keep logs newer-or-equal to that date.

None
project_path Union[str, Path]

project root containing _rixpress

'.'
dry_run bool

if True, show what would be deleted without deleting

True
timeout_sec int

timeout for invoked nix-store commands and for lock staleness checks

300
verbose bool

if True, print extra diagnostic output

False
ask bool

if True, prompt for confirmation before destructive operations (default True)

True
pretty bool

if True, pretty-prints the result (and returns nothing).

False
as_json bool

if True, pretty prints using json.dumps(indent=2) instead of pprint.

False

Returns:

Type Description
Dict[str, object]

A summary dict with canonical keys:
kept, deleted, protected, deleted_count, failed_count, referenced_count,
log_files_deleted, log_files_failed, dry_run_details

Source code in src/ryxpress/garbage.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
def rxp_gc(
    keep_since: Optional[Union[str, date]] = None,
    project_path: Union[str, Path] = ".",
    dry_run: bool = True,
    timeout_sec: int = 300,
    verbose: bool = False,
    ask: bool = True,
    pretty: bool = False,
    as_json: bool = False,
) -> Dict[str, object]:
    """
    Garbage collect Nix store paths and build logs produced by rixpress.

    Two modes, selected by ``keep_since``:

    * Full GC (``keep_since is None``): runs ``nix-store --gc``, deleting all
      unreferenced store paths.
    * Date-based GC (``keep_since`` given): deletes only the store paths and
      ``_rixpress`` build-log files belonging to builds older than that date,
      after protecting newer artifacts with temporary indirect GC roots.

    A machine-wide lock file (in the system temp dir) prevents concurrent
    runs, and SIGINT/SIGTERM handlers are installed for the duration so that
    temporary GC roots and the lock are cleaned up on interruption.

    Args:
        keep_since: None for full GC, or a date/ISO date string (YYYY-MM-DD) to keep logs newer-or-equal to that date.
        project_path: project root containing _rixpress
        dry_run: if True, show what would be deleted without deleting
        timeout_sec: timeout for invoked nix-store commands and for lock staleness checks
        verbose: if True, print extra diagnostic output
        ask: if True, prompt for confirmation before destructive operations (default True)
        pretty: if True, pretty-prints the result (and returns nothing).
        as_json: if True, pretty prints using json.dumps(indent=2) instead of pprint.

    Returns:
        A summary dict with canonical keys:
        kept, deleted, protected, deleted_count, failed_count, referenced_count,
        log_files_deleted, log_files_failed, dry_run_details

    Raises:
        FileNotFoundError: if ``nix-store`` is not on PATH, or ``project_path``
            does not exist.
        ValueError: if ``keep_since`` cannot be parsed as a date.
        RxpGCError: if no recent store path could be protected with a GC root
            before targeted deletion; also presumably propagated by
            ``_safe_run`` on failed ``nix-store`` invocations (check=True).
    """
    # Everything below shells out to nix-store, so fail fast if it's missing.
    nix_bin = shutil.which("nix-store")
    if not nix_bin:
        raise FileNotFoundError("nix-store not found on PATH. Install Nix or adjust PATH.")

    project_path = Path(project_path).resolve()
    if not project_path.exists():
        raise FileNotFoundError(f"Project path does not exist: {project_path}")

    # Machine-wide lock (shared temp dir) so two GC runs cannot interleave.
    lock_file_path = Path(tempfile.gettempdir()) / "rixpress_gc.lock"

    # record of temp gcroot symlink paths we created so we can remove them later
    created_gcroot_links: List[Path] = []

    # ensure we cleanup on signals
    def _cleanup_on_signal(signum, frame):
        # Best-effort teardown on SIGINT/SIGTERM: drop any GC-root links we
        # created, release the lock if held, then exit non-zero.
        # NOTE: closes over lock_path_context, which is assigned below before
        # the handlers are registered, so the name is always bound when fired.
        logger.info("Received signal %s, cleaning up...", signum)
        # remove any gcroot links
        for p in created_gcroot_links:
            try:
                if p.exists():
                    p.unlink()
            except Exception:
                pass
        # remove lock file if held
        try:
            if lock_path_context and lock_path_context.acquired:
                lock_path_context.release()
        except Exception:
            pass
        raise SystemExit(1)

    # placeholder for context so signal handler can access
    lock_path_context: Optional[LockFile] = None

    # Register handlers, remembering the originals so the outer finally can
    # restore them on every exit path.
    old_sigint = signal.getsignal(signal.SIGINT)
    old_sigterm = signal.getsignal(signal.SIGTERM)
    signal.signal(signal.SIGINT, _cleanup_on_signal)
    signal.signal(signal.SIGTERM, _cleanup_on_signal)

    try:
        # Acquire lock with context manager (atomic)
        lock_path_context = LockFile(lock_file_path, timeout_sec=timeout_sec)
        lock_path_context.acquire()

        # parse keep_since: a plain date passes through; datetime instances
        # and strings are stringified and parsed as ISO YYYY-MM-DD.
        if keep_since is not None:
            if isinstance(keep_since, date) and not isinstance(keep_since, datetime):
                keep_date = keep_since
            else:
                # accept YYYY-MM-DD string
                try:
                    keep_date = _parse_iso_date(str(keep_since))
                except Exception:
                    raise ValueError("Invalid 'keep_since'. Use a date or 'YYYY-MM-DD' string.")
        else:
            keep_date = None

        # Gather logs
        all_logs = rxp_list_logs(project_path)
        # Expect list of dicts with 'filename' and 'modification_time'
        if not isinstance(all_logs, list) or not all_logs:
            logger.info("No build logs found. Nothing to do.")
            # canonical empty summary
            return {
                "kept": [],
                "deleted": [],
                "protected": 0,
                "deleted_count": 0,
                "failed_count": 0,
                "referenced_count": 0,
                "log_files_deleted": 0,
                "log_files_failed": 0,
                "dry_run_details": None,
            }

        # Partition logs into keep/delete sets by modification date.
        # Entries missing filename/mtime are skipped entirely (neither kept
        # nor deleted); with keep_date None everything is kept.
        logs_to_keep = []
        logs_to_delete = []
        for entry in all_logs:
            fn = entry.get("filename")
            mtime = entry.get("modification_time")
            if not fn or not mtime:
                continue
            try:
                mdate = _parse_iso_date(mtime)
            except Exception:
                # If malformed, treat as older than keep_since to be conservative
                mdate = datetime.min.date()
            if keep_date is None:
                logs_to_keep.append(entry)
            else:
                if mdate >= keep_date:
                    logs_to_keep.append(entry)
                else:
                    logs_to_delete.append(entry)

        def _filenames(entries: Sequence[Dict]) -> List[str]:
            # Project log entries down to their 'filename' values.
            return [e["filename"] for e in entries]

        # helper to get store paths per log using rxp_inspect
        def get_paths_from_logs(filenames: Sequence[str]) -> Dict[str, List[str]]:
            """Map each log filename to the validated store paths it references.

            Failures (unparseable filename, rxp_inspect error) map to an empty
            list rather than aborting the whole run.
            """
            out: Dict[str, List[str]] = {}
            for fn in filenames:
                wl = _extract_which_log(fn)
                if wl is None:
                    logger.warning("Could not parse which_log from filename: %s", fn)
                    out[fn] = []
                    continue
                try:
                    insp_rows = rxp_inspect(project_path=project_path, which_log=wl)
                except Exception as e:
                    logger.warning("rxp_inspect failed for %s: %s", fn, e)
                    out[fn] = []
                    continue
                # rxp_inspect returns list of dicts; look for 'path' keys
                paths = []
                if isinstance(insp_rows, list):
                    for row in insp_rows:
                        if isinstance(row, dict) and "path" in row and isinstance(row["path"], str):
                            paths.append(row["path"])
                out[fn] = _validate_store_paths(paths)
            return out

        keep_paths_by_log = get_paths_from_logs(_filenames(logs_to_keep)) if logs_to_keep else {}
        delete_paths_by_log = get_paths_from_logs(_filenames(logs_to_delete)) if logs_to_delete else {}

        # Dedupe and re-validate the aggregate path sets for each side.
        keep_paths_all = _validate_store_paths(sorted({p for lst in keep_paths_by_log.values() for p in lst}))
        delete_paths_all = _validate_store_paths(sorted({p for lst in delete_paths_by_log.values() for p in lst}))

        # Canonical summary skeleton; counters are filled in as work happens.
        summary_info: Dict[str, object] = {
            "kept": _filenames(logs_to_keep),
            "deleted": _filenames(logs_to_delete),
            "protected": 0,
            "deleted_count": 0,
            "failed_count": 0,
            "referenced_count": 0,
            "log_files_deleted": 0,
            "log_files_failed": 0,
            "dry_run_details": None,
        }

        # DRY RUN branch (date-based)
        if keep_date is not None and dry_run:
            logger.info("--- DRY RUN --- No changes will be made. ---")
            logger.info("Logs that would be deleted (%d):", len(logs_to_delete))
            for fn in summary_info["deleted"]:
                logger.info("  %s", fn)
            details: Dict[str, List[Dict[str, str]]] = {}
            if delete_paths_by_log:
                logger.info("Artifacts per log (from rxp_inspect):")
                # Re-inspects each log to report path/output pairs; the values
                # of delete_paths_by_log are not reused here.
                for fn, _ in delete_paths_by_log.items():
                    logger.info("== %s ==", fn)
                    try:
                        insp_rows = rxp_inspect(project_path=project_path, which_log=_extract_which_log(fn) or "")
                    except Exception:
                        logger.info("  (rxp_inspect unavailable)")
                        details[fn] = []
                        continue
                    rows = []
                    if isinstance(insp_rows, list):
                        for r in insp_rows:
                            if not isinstance(r, dict):
                                continue
                            rows.append({"path": r.get("path", ""), "output": r.get("output", "")})
                    details[fn] = rows
            existing_delete_paths = [p for p in delete_paths_all if os.path.exists(p) or os.path.isdir(p)]
            missing_paths = [p for p in delete_paths_all if p not in existing_delete_paths]
            logger.info("Aggregate store paths targeted for deletion (deduped): %d total, %d existing, %d missing",
                        len(delete_paths_all), len(existing_delete_paths), len(missing_paths))
            if existing_delete_paths:
                logger.info("Existing paths that would be deleted:")
                for p in existing_delete_paths:
                    logger.info("  %s", p)
            if missing_paths:
                logger.info("Paths already missing (will be skipped):")
                for p in missing_paths:
                    logger.info("  %s", p)
            summary_info["dry_run_details"] = details
            if logs_to_delete:
                logger.info("Build log files that would be deleted:")
                for fn in summary_info["deleted"]:
                    log_path = project_path / "_rixpress" / fn
                    exists_indicator = "[OK]" if log_path.exists() else "[X]"
                    logger.info("  %s %s", exists_indicator, fn)
            # NOTE(review): pretty/as_json are honored only in this date-based
            # dry-run branch; all other return paths ignore them — confirm
            # this asymmetry is intended.
            if pretty:
                if as_json:
                    print(json.dumps(summary_info, indent=2, ensure_ascii=False))
                else:
                    pprint(summary_info)
                # pretty mode prints instead of returning (returns None by
                # design; see docstring).
                return

            return summary_info

        # dry-run full GC preview
        if keep_date is None and dry_run:
            logger.info("--- DRY RUN --- Would run 'nix-store --gc' (delete all unreferenced store paths). ---")
            if verbose:
                logger.info("(Tip: for an approximate preview, run 'nix-collect-garbage -n' from a shell.)")
            return summary_info

        # Full GC mode
        if keep_date is None:
            if ask:
                proceed = _ask_yes_no("Run full Nix garbage collection (delete all unreferenced artifacts)?", default=False)
                if not proceed:
                    logger.info("Operation cancelled.")
                    return summary_info
            logger.info("Running Nix garbage collector...")
            try:
                _, stdout, stderr = _safe_run([nix_bin, "--gc"], timeout=timeout_sec, check=True)
                if stdout:
                    if verbose:
                        logger.info(stdout)
                    else:
                        # Non-verbose: surface only the last few lines that
                        # mention freed/removed paths.
                        rel = [l for l in stdout.splitlines() if re.search(r"freed|removing|deleting", l, re.I)]
                        if rel:
                            for line in rel[-10:]:
                                logger.info(line)
                logger.info("Garbage collection complete.")
                return summary_info
            except RxpGCError as e:
                # No-op re-raise: RxpGCError propagates unchanged to callers.
                raise

        # Targeted deletion mode
        if not logs_to_delete:
            logger.info("No build logs older than %s found. Nothing to do.", keep_date.isoformat())
            return summary_info

        if not delete_paths_all:
            logger.info("No valid store paths found in logs older than %s. Nothing to delete.", keep_date.isoformat())
            return summary_info

        prompt = f"This will permanently delete {len(delete_paths_all)} store paths from {len(logs_to_delete)} build(s) older than {keep_date.isoformat()}. Continue?"
        if ask:
            if not _ask_yes_no(prompt, default=False):
                logger.info("Operation cancelled.")
                return summary_info

        # Protect recent artifacts (date-based mode only) by adding indirect GC roots.
        temp_gcroots_dir: Optional[Path] = None
        protected = 0
        try:
            if keep_paths_all:
                temp_gcroots_dir = Path(tempfile.mkdtemp(prefix="rixpress-gc-"))
                logger.info("Protecting %d recent artifacts via GC roots...", len(keep_paths_all))
                for i, p in enumerate(keep_paths_all, start=1):
                    link_path = temp_gcroots_dir / f"root-{i}"
                    try:
                        # create a placeholder link path (the nix-store --add-root will create the gcroot)
                        # use link_path as the path to register the indirect root
                        _safe_run([nix_bin, "--add-root", str(link_path), "--indirect", p], timeout=timeout_sec, check=True)
                        created_gcroot_links.append(link_path)
                        protected += 1
                    except RxpGCError as e:
                        logger.warning("Failed to add GC root for %s: %s", p, e)
                # If nothing at all could be protected, abort rather than risk
                # deleting artifacts that should have been kept.
                if protected == 0:
                    raise RxpGCError("Failed to protect any store paths. Aborting.")
                summary_info["protected"] = protected

            # Delete specific store paths
            logger.info("Deleting %d targeted store paths...", len(delete_paths_all))
            existing_paths = [p for p in delete_paths_all if os.path.exists(p) or os.path.isdir(p)]
            missing_paths = [p for p in delete_paths_all if p not in existing_paths]
            if missing_paths:
                logger.info("Skipping %d paths that no longer exist.", len(missing_paths))
                if verbose:
                    for p in missing_paths:
                        logger.info("  Missing: %s", p)
            if not existing_paths:
                logger.info("No existing paths to delete. All targeted paths are already gone.")
                return summary_info

            total_deleted = 0
            failed_paths: List[str] = []
            referenced_paths: List[str] = []

            # Delete one path at a time so a single failure (still-referenced
            # path, timeout) doesn't abort the rest.
            for i, pth in enumerate(existing_paths, start=1):
                # Re-check existence: an earlier deletion may have removed
                # this path as a dependency.
                if not (os.path.exists(pth) or os.path.isdir(pth)):
                    logger.info("  [%d/%d] Skipping %s (already gone)", i, len(existing_paths), os.path.basename(pth))
                    continue
                logger.info("  [%d/%d] Attempting to delete %s...", i, len(existing_paths), os.path.basename(pth))
                try:
                    proc = subprocess.run([nix_bin, "--delete", pth], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=timeout_sec)
                    out = (proc.stdout or "") + "\n" + (proc.stderr or "")
                    if proc.returncode == 0:
                        total_deleted += 1
                        logger.info("    [OK] Successfully deleted")
                        if verbose and out.strip():
                            logger.info("    %s", out.strip())
                    else:
                        # Classify the failure by scraping nix-store's output:
                        # "still alive"/"Cannot delete" means the path is still
                        # referenced (expected, not an error).
                        if re.search(r"still alive|Cannot delete", out, re.I):
                            referenced_paths.append(pth)
                            logger.info("    [!] Skipped (still referenced)")
                            if verbose:
                                logger.info("    Details: %s", out.strip())
                        else:
                            failed_paths.append(pth)
                            logger.info("    [X] Failed to delete")
                            if verbose:
                                logger.info("    Details: %s", out.strip())
                except subprocess.TimeoutExpired:
                    failed_paths.append(pth)
                    logger.info("    [X] Timeout while deleting")
                except Exception as e:
                    failed_paths.append(pth)
                    logger.info("    [X] Error: %s", e)

            # Summary of deletion
            logger.info("\nDeletion summary:")
            logger.info("  Successfully deleted: %d paths", total_deleted)
            logger.info("  Skipped (still referenced): %d paths", len(referenced_paths))
            logger.info("  Failed (other errors): %d paths", len(failed_paths))

            # Verbose diagnostics: for each still-referenced path, query its
            # GC roots and referrers (best-effort; query failures are logged).
            if referenced_paths and verbose:
                logger.info("\nReferenced paths (cannot delete):")
                for pth in referenced_paths:
                    logger.info("  %s", os.path.basename(pth))
                    try:
                        _, roots_out, _ = _safe_run([nix_bin, "--query", "--roots", pth], timeout=timeout_sec, check=False)
                        if roots_out.strip():
                            logger.info("    GC roots: %s", roots_out.strip().replace("\n", ", "))
                        else:
                            logger.info("    GC roots: (none found)")
                    except Exception:
                        logger.info("    GC roots: (query failed)")
                    try:
                        _, refs_out, _ = _safe_run([nix_bin, "--query", "--referrers", pth], timeout=timeout_sec, check=False)
                        if refs_out.strip():
                            refs = [os.path.basename(x) for x in refs_out.splitlines() if x.strip()]
                            logger.info("    Referenced by: %s", ", ".join(refs) if refs else "(none)")
                        else:
                            logger.info("    Referenced by: (none)")
                    except Exception:
                        logger.info("    Referenced by: (query failed)")

            summary_info["deleted_count"] = total_deleted
            summary_info["failed_count"] = len(failed_paths)
            summary_info["referenced_count"] = len(referenced_paths)

            # Delete old build log files
            if logs_to_delete:
                logger.info("\nDeleting old build log files...")
                log_files_deleted = 0
                log_files_failed: List[str] = []
                for i, entry in enumerate(logs_to_delete, start=1):
                    log_file = entry["filename"]
                    log_path = project_path / "_rixpress" / log_file
                    logger.info("  [%d/%d] Deleting %s...", i, len(logs_to_delete), log_file)
                    if not log_path.exists():
                        logger.info("    [!] File not found (already deleted?)")
                        continue
                    try:
                        log_path.unlink()
                        # Post-unlink existence check guards against silently
                        # surviving files (e.g. on odd filesystems).
                        if not log_path.exists():
                            log_files_deleted += 1
                            logger.info("    [OK] Successfully deleted")
                        else:
                            log_files_failed.append(log_file)
                            logger.info("    [X] Failed to delete (file still exists)")
                    except Exception as e:
                        log_files_failed.append(log_file)
                        logger.info("    [X] Error: %s", e)
                logger.info("\nBuild log deletion summary:")
                logger.info("  Successfully deleted: %d files", log_files_deleted)
                logger.info("  Failed: %d files", len(log_files_failed))
                if log_files_failed and verbose:
                    logger.info("\nFailed to delete log files:")
                    for lf in log_files_failed:
                        logger.info("  %s", lf)
                summary_info["log_files_deleted"] = log_files_deleted
                summary_info["log_files_failed"] = len(log_files_failed)

            logger.info("\nCleanup complete!")
            return summary_info
        finally:
            # Always attempt to remove created gcroot links and the temp dir
            if created_gcroot_links:
                for p in created_gcroot_links:
                    try:
                        if p.exists():
                            p.unlink()
                    except Exception:
                        logger.debug("Failed to unlink gcroot link %s", p)
                # attempt to remove the parent temp directory if exists and empty
                if temp_gcroots_dir and temp_gcroots_dir.exists():
                    try:
                        shutil.rmtree(temp_gcroots_dir)
                    except Exception:
                        # ignore: best-effort cleanup
                        logger.debug("Failed to remove temp gcroots dir %s", temp_gcroots_dir)
    finally:
        # always release lock and restore signals
        try:
            if lock_path_context is not None:
                lock_path_context.release()
        except Exception:
            pass
        try:
            signal.signal(signal.SIGINT, old_sigint)
            signal.signal(signal.SIGTERM, old_sigterm)
        except Exception:
            pass