Reference

Start a project and build the pipeline

rxp_init

Initialize rixpress project files in project_path. This generates two R scripts: gen-env.R, which, when executed with the rix R package, produces a default.nix defining the pipeline's execution environment; and gen-pipeline.R, where the pipeline itself is defined. These R scripts are the same as those generated by rixpress, the R version of this package.

Returns:

bool: True if initialization completed (or was skipped in a non-interactive session with the files already present); False if cancelled by the user.

Source code in src/ryxpress/init_proj.py
def rxp_init(project_path: str = ".", skip_prompt: bool = False) -> bool:
    """
    rxp_init

    Initialize rixpress project files in project_path. This will generate
    two R scripts: `gen-env.R`, which when executed using the rix R package
    will generate a `default.nix`, which defines the pipeline's execution
    environment, and `gen-pipeline.R`, which is where the pipeline is defined.
    These R scripts are the same as those generated by rixpress, the R version
    of this package.

    Returns:
      True if initialization completed (or was skipped in a non-interactive
      session with the files already present), False if cancelled by the user.
    """
    # Initial confirmation before any action
    if not _confirm(f"Initialize project at '{project_path}'?", skip_prompt=skip_prompt):
        print("Operation cancelled by user. No files or directories were created.")
        return False

    proj = Path(project_path)
    # Ensure project_path exists, create it if it doesn't
    if not proj.exists():
        proj.mkdir(parents=True, exist_ok=True)

    env_file = proj / "gen-env.R"
    pipeline_file = proj / "gen-pipeline.R"

    gen_env_lines = [
        "# This script defines the default environment the pipeline runs in.",
        "# Add the required packages to execute the code necessary for each derivation.",
        "# If you want to create visual representations of the pipeline, consider adding",
        "# `{visNetwork}` and `{ggdag}` to the list of R packages.",
        "library(rix)",
        "",
        "# Define execution environment",
        "rix(",
        "  date = NULL,",
        "  r_pkgs = NULL,",
        "  py_conf = NULL,",
        "  git_pkgs = list(",
        "    \"package_name\" = \"rixpress\",",
        "    \"repo_url\" = \"https://github.com/b-rodrigues/rixpress\",",
        "    \"commit\" = \"HEAD\",",
        "  ),",
        "  ide = \"none\",",
        "  project_path = \".\"",
        ")",
    ]

    gen_pipeline_lines = [
        "library(rixpress)",
        "library(igraph)",
        "",
        "list(",
        "  rxp_r_file(",
        "    name = NULL,",
        "    path = NULL,",
        "    read_function = \"lambda x: polars.read_csv(x, separator='|')\"",
        "  ),",
        "  rxp_r(",
        "    name = NULL,",
        "    expr = NULL",
        "  )",
        ") |>",
        "  rxp_populate(build = FALSE)",
    ]

    # Write files (overwrite if present)
    env_file.write_text("\n".join(gen_env_lines) + "\n", encoding="utf-8")
    print(f"File {env_file} has been written.")
    pipeline_file.write_text("\n".join(gen_pipeline_lines) + "\n", encoding="utf-8")
    print(f"File {pipeline_file} has been written.")

    # Skip Git initialization in non-interactive sessions (CRAN/CI/test equivalent)
    if not _is_interactive():
        print(
            "Skipping Git initialization (non-interactive session, CRAN, CI, or test environment detected)."
        )
        return True

    # Ask whether to initialise git
    if _confirm("Would you like to initialise a Git repository here?", skip_prompt=skip_prompt):
        git_bin = shutil.which("git")
        if git_bin is None:
            print(
                "Git not found on PATH. Please install git and run 'git init' manually, "
                "or initialise the repository using your preferred tool."
            )
        else:
            try:
                subprocess.run([git_bin, "init"], cwd=str(proj), check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                print("Git repository initialised.")
            except subprocess.CalledProcessError as e:
                print("Failed to initialise git repository. You can run 'git init' manually.")
    else:
        print("Skipping Git initialization.")

    return True
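
Example: a minimal sketch of scaffolding a project non-interactively. The project path is illustrative, and it assumes rxp_init is exposed at the package top level:

from ryxpress import rxp_init

# Scaffold gen-env.R and gen-pipeline.R under ./my-pipeline without prompting.
ok = rxp_init(project_path="my-pipeline", skip_prompt=True)
if ok:
    print("Edit gen-env.R and gen-pipeline.R, then build the pipeline.")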

rxp_make

Run the rixpress R pipeline (rxp_populate + rxp_make) by sourcing an R script.

Parameters:

  • script: Path or name of the R script to run (defaults to "gen-pipeline.R"). If a relative path is given and doesn't exist in the working directory, this function will attempt to locate the script on PATH.
  • verbose: integer passed to rixpress::rxp_make(verbose = ...)
  • max_jobs: integer passed to rixpress::rxp_make(max_jobs = ...)
  • cores: integer passed to rixpress::rxp_make(cores = ...)
  • rscript_cmd: the Rscript binary to use (defaults to "Rscript")
  • timeout: optional timeout in seconds for the subprocess.run call
  • cwd: optional working directory to run Rscript in. If None, the directory containing the provided script will be used. This is important because pipeline.nix and related files are often imported with relative paths (e.g. ./default.nix), so Rscript needs to be run where those files are reachable.

Returns an RRunResult containing returncode, stdout, stderr.

Source code in src/ryxpress/r_runner.py
def rxp_make(
    script: Union[str, Path] = "gen-pipeline.R",
    verbose: int = 0,
    max_jobs: int = 1,
    cores: int = 1,
    rscript_cmd: str = "Rscript",
    timeout: Optional[int] = None,
    cwd: Optional[Union[str, Path]] = None,
) -> RRunResult:
    """
    rxp_make

    Run the rixpress R pipeline (rxp_populate + rxp_make) by sourcing an R script.

    Parameters:

    - script: Path or name of the R script to run (defaults to "gen-pipeline.R").
              If a relative path is given and doesn't exist in the working directory,
              this function will attempt to locate the script on PATH.
    - verbose: integer passed to rixpress::rxp_make(verbose = ...)
    - max_jobs: integer passed to rixpress::rxp_make(max_jobs = ...)
    - cores: integer passed to rixpress::rxp_make(cores = ...)
    - rscript_cmd: the Rscript binary to use (defaults to "Rscript")
    - timeout: optional timeout in seconds for the subprocess.run call
    - cwd: optional working directory to run Rscript in. If None, the directory
           containing the provided script will be used. This is important because
           pipeline.nix and related files are often imported with relative paths
           (e.g. ./default.nix), so Rscript needs to be run where those files are reachable.

    Returns an RRunResult containing returncode, stdout, stderr.
    """
    # Validate integers
    for name, val in (("verbose", verbose), ("max_jobs", max_jobs), ("cores", cores)):
        if not isinstance(val, int):
            raise TypeError(f"{name} must be an int, got {type(val).__name__}")
        if val < 0:
            raise ValueError(f"{name} must be >= 0")

    # Resolve script path: prefer given path if it exists; otherwise try to find on PATH
    script_path = Path(script)
    if not script_path.is_file():
        # If a bare name was provided, attempt to find it on PATH
        found = shutil.which(str(script))
        if found:
            script_path = Path(found)
        else:
            raise FileNotFoundError(
                f"R script '{script}' not found in working directory and not on PATH"
            )
    else:
        script_path = script_path.resolve()

    # Determine working directory for the R process:
    if cwd is not None:
        run_cwd = Path(cwd).resolve()
        if not run_cwd.is_dir():
            raise FileNotFoundError(f"Requested cwd '{cwd}' does not exist or is not a directory")
    else:
        # default to the script's parent directory so relative imports (./default.nix) work
        run_cwd = script_path.parent

    # Verify Rscript binary exists
    if shutil.which(rscript_cmd) is None:
        raise FileNotFoundError(
            f"Rscript binary '{rscript_cmd}' not found in PATH. Ensure R is installed or adjust rscript_cmd."
        )

    # Prepare wrapper R script that:
    #  - loads rixpress,
    #  - sources the user's script,
    #  - if the sourced evaluation returns a list, calls rxp_populate on it,
    #  - then calls rixpress::rxp_make(...) with the provided args.
    wrapper = f"""
suppressPackageStartupMessages(library(rixpress))

script_path <- "{script_path.as_posix()}"

if (!file.exists(script_path)) {{
  stop("Script not found: ", script_path)
}}

result_value <- NULL

res <- tryCatch({{
  # Source & evaluate the user's script and capture the returned value (if any)
  result_value <- eval(parse(script_path))
  # If the script returned a list (a pipeline), run rxp_populate on it
  if (!is.null(result_value) && is.list(result_value)) {{
    pipeline <- result_value
    pipeline <- rixpress::rxp_populate(pipeline)
  }}
  # Finally, run rxp_make with the given integer parameters
  rixpress::rxp_make(
    verbose = {int(verbose)},
    max_jobs = {int(max_jobs)},
    cores = {int(cores)}
  )
}}, error = function(e) {{
  # Print a clear error message and exit with non-zero status
  message("rixpress-python-runner-error: ", conditionMessage(e))
  quit(status = 1)
}})

# If we reach here, exit with success
quit(status = 0)
"""

    # Create temporary file for wrapper
    with tempfile.NamedTemporaryFile(mode="w", suffix=".R", delete=False) as tf:
        tf.write(wrapper)
        wrapper_path = Path(tf.name)

    try:
        # Run Rscript on the wrapper file using the desired working directory
        proc = subprocess.run(
            [rscript_cmd, str(wrapper_path)],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            timeout=timeout,
            cwd=str(run_cwd),
        )
        return RRunResult(returncode=proc.returncode, stdout=proc.stdout, stderr=proc.stderr)
    finally:
        try:
            wrapper_path.unlink()
        except Exception:
            pass
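
Example: a hedged sketch of running a pipeline and checking the result; the script path is illustrative and assumes the same top-level import as above:

from ryxpress import rxp_make

# Rscript runs in the script's directory by default, so ./default.nix stays reachable.
result = rxp_make("my-pipeline/gen-pipeline.R", verbose=1, max_jobs=2, cores=2)
if result.returncode != 0:
    # RRunResult carries the captured streams for debugging.
    print(result.stderr)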

Inspect the pipeline

rxp_inspect

Inspect the build result of a pipeline.

Parameters:

  • project_path: path to project root (defaults to ".")
  • which_log: optional regex to select a specific log file. If None, the most recent log is used.
  • pretty: if True, pretty-prints the result (and returns nothing).
  • as_json: if True, pretty prints using json.dumps(indent=2) instead of pprint.

Returns:

Optional[List[Dict[str, Any]]]: A list of dict rows parsed from the selected JSON log file (unless pretty=True).

Source code in src/ryxpress/inspect_logs.py
def rxp_inspect(
    project_path: Union[str, Path] = ".",
    which_log: Optional[str] = None,
    pretty: bool = False,
    as_json: bool = False,
) -> Optional[List[Dict[str, Any]]]:
    """
    rxp_inspect

    Inspect the build result of a pipeline.

    Parameters:
      - project_path: path to project root (defaults to ".")
      - which_log: optional regex to select a specific log file. If None, the most recent log is used.
      - pretty: if True, pretty-prints the result (and returns nothing).
      - as_json: if True, pretty prints using json.dumps(indent=2) instead of pprint.

    Returns:
      A list of dict rows parsed from the selected JSON log file (unless pretty=True).

    Raises:
      FileNotFoundError if no logs are found or _rixpress missing.
      ValueError if which_log is provided but no matching filename is found.
      RuntimeError if the chosen log cannot be read/parsed.
    """
    proj = Path(project_path)
    rixpress_dir = proj / "_rixpress"

    logs = rxp_list_logs(proj)

    chosen_path: Optional[Path] = None

    if which_log is None:
        chosen_path = rixpress_dir / logs[0]["filename"]
    else:
        import re, logging
        logger = logging.getLogger(__name__)
        pattern = re.compile(which_log)
        for entry in logs:
            if pattern.search(entry["filename"]):
                chosen_path = rixpress_dir / entry["filename"]
                logger.info("Using log file: %s", entry["filename"])
                break
        if chosen_path is None:
            raise ValueError(f"No build logs found matching the pattern: {which_log}")

    try:
        with chosen_path.open("r", encoding="utf-8") as fh:
            data = json.load(fh)
    except Exception as e:
        raise RuntimeError(f"Failed to read log file {chosen_path}: {e}")

    rows = _coerce_json_to_rows(data)

    if pretty:
        if as_json:
            print(json.dumps(rows, indent=2, ensure_ascii=False))
        else:
            pprint(rows)
        return  # This ensures REPL shows nothing after print, return value is None

    return rows
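
Example: a minimal sketch of inspecting the latest build. The row keys shown ('derivation', 'path') are among those the package looks for, but their presence depends on the log contents:

from ryxpress import rxp_inspect

rows = rxp_inspect(project_path=".")
for row in rows:
    print(row.get("derivation"), "->", row.get("path"))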

rxp_list_logs

List build logs in the project's _rixpress directory.

Parameters:

  • project_path: path to project root (defaults to ".")
  • pretty: if True, pretty-prints the result (and returns nothing).
  • as_json: if True, pretty prints using json.dumps(indent=2) instead of pprint.

Returns:

Optional[List[Dict[str, Union[str, float]]]]: A list of dictionaries, each with keys:

  • filename: basename of log file (str)
  • modification_time: ISO date string YYYY-MM-DD (str)
  • size_kb: file size in kilobytes rounded to 2 decimals (float)

(unless pretty=True, in which case nothing is returned)

Source code in src/ryxpress/inspect_logs.py
def rxp_list_logs(
    project_path: Union[str, Path] = ".",
    pretty: bool = False,
    as_json: bool = False,
) -> Optional[List[Dict[str, Union[str, float]]]]:
    """
    rxp_list_logs

    List build logs in the project's _rixpress directory.

    Parameters:
      - project_path: path to project root (defaults to ".")
      - pretty: if True, pretty-prints the result (and returns nothing).
      - as_json: if True, pretty prints using json.dumps(indent=2) instead of pprint.

    Returns:
      A list of dictionaries, each with keys:
        - filename: basename of log file (str)
        - modification_time: ISO date string YYYY-MM-DD (str)
        - size_kb: file size in kilobytes rounded to 2 decimals (float)
      (unless pretty=True, in which case nothing is returned)

    Raises:
      FileNotFoundError if the _rixpress directory does not exist or if no logs are found.
    """
    proj = Path(project_path)
    rixpress_dir = proj / "_rixpress"

    if not rixpress_dir.exists() or not rixpress_dir.is_dir():
        raise FileNotFoundError("_rixpress directory not found. Did you initialise the project?")

    pattern = re.compile(r"^build_log.*\.json$")
    log_files = [p for p in rixpress_dir.iterdir() if p.is_file() and pattern.search(p.name)]

    # Sort by modification time (most recent first)
    log_files.sort(key=lambda p: p.stat().st_mtime, reverse=True)

    if not log_files:
        raise FileNotFoundError(f"No build logs found in {rixpress_dir}")

    logs: List[Dict[str, Union[str, float]]] = []
    for p in log_files:
        st = p.stat()
        logs.append(
            {
                "filename": p.name,
                "modification_time": _iso_date_from_epoch(st.st_mtime),
                "size_kb": round(st.st_size / 1024.0, 2),
            }
        )

    if pretty:
        if as_json:
            print(json.dumps(logs, indent=2, ensure_ascii=False))
        else:
            pprint(logs)
        return

    return logs
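
Example: a short sketch of listing the build logs, using the documented keys:

from ryxpress import rxp_list_logs

for entry in rxp_list_logs("."):
    print(entry["filename"], entry["modification_time"], f'{entry["size_kb"]} KB')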

Recover artifacts

rxp_copy

Copy derivations from the Nix store to ./pipeline-output.

Parameters:

  • derivation_name: name of the derivation to copy (string). If None, uses the special derivation name "all-derivations" (mirrors R).
  • dir_mode / file_mode: octal permission strings applied to copied dirs/files.
  • project_path: project root where _rixpress lives (defaults to ".").

Raises:

  • FileNotFoundError if _rixpress or logs are missing.
  • ValueError on invalid modes or derivation not found.
  • RuntimeError on copy failures.
Source code in src/ryxpress/copy_artifacts.py
def rxp_copy(
    derivation_name: Optional[str] = None,
    dir_mode: str = "0755",
    file_mode: str = "0644",
    project_path: Union[str, Path] = ".",
) -> None:
    """
    rxp_copy

    Copy derivations from the Nix store to ./pipeline-output.

    Parameters:

      - derivation_name: name of the derivation to copy (string). If None,
        uses the special derivation name "all-derivations" (mirrors R).
      - dir_mode / file_mode: octal permission strings applied to copied dirs/files.
      - project_path: project root where _rixpress lives (defaults to ".").

    Raises:

      - FileNotFoundError if _rixpress or logs are missing.
      - ValueError on invalid modes or derivation not found.
      - RuntimeError on copy failures.
    """
    project = Path(project_path)
    # Validate modes
    if not _valid_mode(dir_mode):
        raise ValueError('Invalid dir_mode: provide a character octal like "0755" or "755".')
    if not _valid_mode(file_mode):
        raise ValueError('Invalid file_mode: provide a character octal like "0644" or "644".')

    # Ensure there is a build log
    logs = rxp_list_logs(project)
    # rxp_list_logs raises if none; if it returned, we have log entries

    # Read latest build log content via rxp_inspect (most recent)
    rows = rxp_inspect(project_path=project, which_log=None)
    if not isinstance(rows, list) or not rows:
        raise RuntimeError("Could not read build log details; rxp_inspect returned no rows.")

    # Build a mapping from derivation name -> list of store paths
    # We try to be tolerant: look for keys 'derivation' (R), then 'deriv', 'name'
    deriv_key_candidates = ("derivation", "deriv", "name")
    path_key_candidates = ("path", "store_path", "path_store", "output_path", "output")

    deriv_to_paths: Dict[str, List[str]] = {}
    for r in rows:
        if not isinstance(r, dict):
            continue
        deriv_val = _extract_field(r, deriv_key_candidates)
        path_val = _extract_field(r, path_key_candidates)
        if deriv_val is None:
            # skip rows without a derivation name
            continue
        derivs = _ensure_iterable_of_strings(deriv_val)
        paths = _ensure_iterable_of_strings(path_val)
        for d in derivs:
            deriv_to_paths.setdefault(d, []).extend(paths)

    # Deduplicate path lists
    for k in list(deriv_to_paths.keys()):
        seen = []
        for p in deriv_to_paths[k]:
            if p not in seen:
                seen.append(p)
        deriv_to_paths[k] = seen

    # Choose derivation_name if not provided
    if derivation_name is None:
        derivation_name = "all-derivations"

    if derivation_name not in deriv_to_paths:
        # Provide hint of available derivations (up to 20)
        available = list(deriv_to_paths.keys())[:20]
        more = ", ..." if len(deriv_to_paths) > 20 else ""
        raise ValueError(
            f"No derivation {derivation_name!r} found in the build log. Available: {', '.join(available)}{more}"
        )

    # Collect paths for this derivation
    deriv_paths = deriv_to_paths.get(derivation_name, [])
    if not deriv_paths:
        raise RuntimeError(f"No store paths recorded for derivation {derivation_name!r} in the build log.")

    output_dir = _ensure_output_dir(Path.cwd())

    # For each store path, copy its contents into output_dir
    copy_failed = False
    errors: List[str] = []
    for store_path_str in deriv_paths:
        store_path = Path(store_path_str)
        if not store_path.exists():
            # Skip non-existing path (warn)
            logger.warning("Store path does not exist, skipping: %s", store_path)
            continue
        try:
            # If the derivation path is a directory, copy its children into output_dir
            if store_path.is_dir():
                # copy each child into output_dir, preserving names
                for child in store_path.iterdir():
                    dest = output_dir / child.name
                    if child.is_dir():
                        # Python 3.8+: dirs_exist_ok True will merge
                        try:
                            shutil.copytree(child, dest, dirs_exist_ok=True)
                        except TypeError:
                            # older Python: fallback to manual merge
                            if dest.exists():
                                # copy contents into existing dest
                                for sub in child.rglob("*"):
                                    rel = sub.relative_to(child)
                                    target = dest / rel
                                    if sub.is_dir():
                                        target.mkdir(parents=True, exist_ok=True)
                                    else:
                                        target.parent.mkdir(parents=True, exist_ok=True)
                                        shutil.copy2(sub, target)
                            else:
                                shutil.copytree(child, dest)
                    else:
                        # file: copy, possibly overwrite
                        shutil.copy2(child, dest)
            else:
                # store_path is a file: copy into output_dir
                dest_file = output_dir / store_path.name
                shutil.copy2(store_path, dest_file)
        except Exception as e:
            copy_failed = True
            errors.append(f"{store_path}: {e}")
            logger.debug("Copy error for %s: %s", store_path, e)

    # Apply permissions
    try:
        _apply_permissions(output_dir, dir_mode=dir_mode, file_mode=file_mode)
    except Exception:
        # Best-effort: ignore permission application errors
        logger.debug("Failed to apply permissions to %s", output_dir)

    if copy_failed:
        raise RuntimeError(f"Copy unsuccessful: errors occurred:\n" + "\n".join(errors))

    # Success message
    print(f"Copy successful, check out {output_dir}")
    return None
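
Example: a hedged sketch of recovering artifacts; 'mtcars_head' is a hypothetical derivation name:

from ryxpress import rxp_copy

# Copy every derivation (the default "all-derivations") into ./pipeline-output.
rxp_copy(project_path=".")

# Or copy a single derivation with explicit permissions.
rxp_copy("mtcars_head", dir_mode="0755", file_mode="0644", project_path=".")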

rxp_read

Read the output of a derivation.

Behavior:

  • If resolved to multiple paths -> return list[str].
  • If single path:
    1. If path is a directory -> return the path string.
    2. Try to pickle.load the file (regardless of extension). If successful, return object.
    3. Try rds2py (if available) to parse; if successful, return object.
    4. Otherwise return the path string.

All failures are silent; no exceptions/warnings are raised for "can't load" cases.

Source code in src/ryxpress/read_load.py
def rxp_read(
    derivation_name: str,
    which_log: Optional[str] = None,
    project_path: Union[str, Path] = ".",
) -> Union[object, str, List[str]]:
    """
    rxp_read

    Read the output of a derivation.

    Behavior:

    - If resolved to multiple paths -> return list[str].
    - If single path:
        1. If path is a directory -> return the path string.
        2. Try to pickle.load the file (regardless of extension). If successful, return object.
        3. Try rds2py (if available) to parse; if successful, return object.
        4. Otherwise return the path string.

    All failures are silent; no exceptions/warnings are raised for "can't load" cases.
    """
    resolved = rxp_read_load_setup(derivation_name, which_log=which_log, project_path=project_path)

    # If multiple outputs (list), return them directly
    if isinstance(resolved, list):
        return resolved

    # Single path (string) or fallback value (derivation_name)
    path = str(resolved)

    # If path points to a directory, return it
    if os.path.isdir(path):
        return path

    # Try to unpickle first (regardless of extension)
    try:
        with open(path, "rb") as fh:
            obj = pickle.load(fh)
        return obj
    except Exception:
        # Silent failure — try the next loader
        logger.debug("pickle load failed for %s; will try rds2py if available", path, exc_info=True)

    # Try rds2py as a fallback (regardless of extension)
    rds_obj = _load_rds_with_rds2py(path)
    if rds_obj is not None:
        return rds_obj

    # Nothing worked; return the path string (no errors/warnings)
    return path
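
Example: a minimal sketch; since load failures are silent, check whether an object or just a path came back ('mtcars_head' is hypothetical):

from ryxpress import rxp_read

obj = rxp_read("mtcars_head", project_path=".")
if isinstance(obj, (str, list)):
    print("Could not deserialize; resolved path(s):", obj)
else:
    print("Loaded object of type", type(obj).__name__)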

rxp_load

Load the output of a derivation into the caller's globals under the name derivation_name if successfully loaded as an object.

Otherwise return the path(s) (string or list[str]). Silent on failures.

Source code in src/ryxpress/read_load.py
def rxp_load(
    derivation_name: str,
    which_log: Optional[str] = None,
    project_path: Union[str, Path] = ".",
) -> Union[object, str, List[str]]:
    """
    rxp_load

    Load the output of a derivation into the caller's globals under the name
    `derivation_name` if successfully loaded as an object.

    Otherwise return the path(s) (string or list[str]). Silent on failures.
    """
    resolved = rxp_read_load_setup(derivation_name, which_log=which_log, project_path=project_path)

    # If multiple outputs, return them
    if isinstance(resolved, list):
        return resolved

    path = str(resolved)

    if os.path.isdir(path):
        return path

    # Try to unpickle first
    try:
        with open(path, "rb") as fh:
            obj = pickle.load(fh)
    except Exception:
        obj = None
        logger.debug("pickle load failed for %s; will try rds2py if available", path, exc_info=True)

    # If pickle failed, try rds2py
    if obj is None:
        obj = _load_rds_with_rds2py(path)

    if obj is None:
        # Nothing we can load silently; return the path
        return path

    # Assign into caller's globals (best-effort); silence any assignment errors
    try:
        caller_frame = inspect.currentframe().f_back
        if caller_frame is not None:
            caller_globals = caller_frame.f_globals
            # Use derivation_name as the variable name; keep last path component if it's a path
            try:
                var_name = derivation_name
                # If derivation_name looks like a path, use the basename without extension
                if derivation_name.startswith("/nix/store/") or os.path.sep in derivation_name:
                    var_name = os.path.splitext(os.path.basename(str(path)))[0]
                # ensure valid identifier fallback
                if not var_name.isidentifier():
                    var_name = "_".join(re.findall(r"\w+", var_name)) or "loaded_artifact"
            except Exception:
                var_name = "loaded_artifact"
            caller_globals[var_name] = obj
    except Exception:
        logger.debug("Failed to assign loaded object into caller globals", exc_info=True)

    return obj
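
Example: a hedged sketch of loading an artifact into the caller's globals ('mtcars_head' is a hypothetical derivation name):

from ryxpress import rxp_load

rxp_load("mtcars_head", project_path=".")
# If deserialization succeeded, the object is now bound in this module's globals:
print(mtcars_head)  # a NameError here means only a path could be resolved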

Make sense of the pipeline

rxp_dag_for_ci

Build an igraph object from nodes_and_edges and write a DOT file for CI.

  • nodes_and_edges: dict with keys 'nodes' and 'edges' as returned by get_nodes_edges(). If None, get_nodes_edges() is called.
  • output_file: path to write DOT file. Parent directories are created as needed.

Raises ImportError if python-igraph is not installed.

Source code in src/ryxpress/plotting.py
def rxp_dag_for_ci(nodes_and_edges: Optional[Dict[str, List[Dict]]] = None,
                   output_file: Union[str, Path] = "_rixpress/dag.dot") -> None:
    """
    rxp_dag_for_ci

    Build an igraph object from nodes_and_edges and write a DOT file for CI.

    - nodes_and_edges: dict with keys 'nodes' and 'edges' as returned by
      get_nodes_edges(). If None, get_nodes_edges() is called.
    - output_file: path to write DOT file. Parent directories are created as needed.

    Raises ImportError if python-igraph is not installed.
    """
    # Lazy import igraph and raise helpful error if not available
    try:
        import igraph  # python-igraph
    except Exception as e:  # ImportError or other import-time errors
        raise ImportError(
            "The python 'igraph' package is required for rxp_dag_for_ci. "
            "Install it with e.g. 'pip install python-igraph' and try again."
        ) from e

    if nodes_and_edges is None:
        nodes_and_edges = get_nodes_edges()

    edges = nodes_and_edges.get("edges", [])
    # Build a list of tuples (from, to) for igraph
    edge_tuples = [(e["from"], e["to"]) for e in edges]

    # Ensure output directory exists
    out_path = Path(output_file)
    out_path.parent.mkdir(parents=True, exist_ok=True)

    # Create the graph from edge tuples. TupleList will create vertices named by
    # the unique labels encountered in the tuples.
    # If there are no edges but there are nodes, create an empty graph and add vertices.
    if edge_tuples:
        g = igraph.Graph.TupleList(edge_tuples, directed=True, vertex_name_attr="name")
    else:
        # no edges — create graph and add vertices from nodes list
        nodes = nodes_and_edges.get("nodes", [])
        vertex_names = [n["id"] for n in nodes]
        g = igraph.Graph(directed=True)
        if vertex_names:
            g.add_vertices(vertex_names)
            # set the 'name' attribute automatically when vertices are named

    # Set vertex 'label' attribute from vertex name
    # g.vs['name'] should exist; copy to 'label'
    try:
        names = g.vs["name"]
        g.vs["label"] = names
        # Attempt to remove the 'name' attribute to mirror R behavior.
        # python-igraph allows deleting vertex attributes via 'del g.vs["attr"]'.
        try:
            del g.vs["name"]
        except Exception:
            # If deletion is not supported in some igraph versions, leave it;
            # having both 'name' and 'label' is harmless for DOT output.
            logger.debug("Could not delete 'name' vertex attribute; leaving it in place.")
    except Exception:
        # If the graph has no vertices or attribute access fails, continue.
        pass

    # Write graph to DOT format
    # Use Graph.write with format="dot"
    try:
        g.write(str(out_path), format="dot")
    except Exception as e:
        raise RuntimeError(f"Failed to write DOT file to {out_path}: {e}") from e

rxp_trace

Trace lineage of derivations.

Returns:

A dict mapping each inspected derivation name to a dict with keys:

  • 'dependencies': list of dependency names (ancestors), with transitive-only names marked with '*'
  • 'reverse_dependencies': list of reverse dependents (children), with transitive-only names marked with '*'

Side-effect:

Prints a tree representation to stdout (either the whole pipeline or the single-node lineage).

Source code in src/ryxpress/tracing.py
def rxp_trace(
    name: Optional[str] = None,
    dag_file: Union[str, Path] = Path("_rixpress") / "dag.json",
    transitive: bool = True,
    include_self: bool = False,
) -> Dict[str, Dict[str, List[str]]]:
    """
    rxp_trace

    Trace lineage of derivations.

    Returns:

      A dict mapping each inspected derivation name to a dict with keys:
        - 'dependencies' : list of dependency names (ancestors), with transitive-only names marked with '*'
        - 'reverse_dependencies' : list of reverse dependents (children), with transitive-only names marked with '*'

    Side-effect:

      Prints a tree representation to stdout (either the whole pipeline or
      the single-node lineage).
    """
    derivs = _load_dag(dag_file)

    all_names: List[str] = []
    for d in derivs:
        nm = _extract_name(d)
        if nm is None:
            raise ValueError("Found derivations with missing or unparsable names in dag.json.")
        all_names.append(nm)

    if name is not None and name not in all_names:
        # mirror R's head(...) behaviour for listing available names
        snippet = ", ".join(all_names[:20])
        more = ", ..." if len(all_names) > 20 else ""
        raise ValueError(f"Derivation '{name}' not found in dag.json (available: {snippet}{more}).")

    depends_map = _make_depends_map(derivs, all_names)
    reverse_map = _build_reverse_map(depends_map, all_names)

    # helper to print single lineage (deps and reverse deps)
    def print_single(target: str) -> None:
        print(f"==== Lineage for: {target} ====")
        # Dependencies (ancestors)
        print("Dependencies (ancestors):")
        visited: List[str] = []

        def rec_dep(node: str, depth: int) -> None:
            parents = depends_map.get(node) or []
            if not parents:
                if depth == 0:
                    print("  - <none>")
                return
            for p in parents:
                label = f"{p}*" if (transitive and depth >= 1) else p
                print(("  " * (depth + 1)) + "- " + label)
                if p not in visited:
                    visited.append(p)
                    rec_dep(p, depth + 1)

        rec_dep(target, 0)

        print("\nReverse dependencies (children):")
        visited = []

        def rec_rev(node: str, depth: int) -> None:
            kids = reverse_map.get(node) or []
            if not kids:
                if depth == 0:
                    print("  - <none>")
                return
            for k in kids:
                label = f"{k}*" if (transitive and depth >= 1) else k
                print(("  " * (depth + 1)) + "- " + label)
                if k not in visited:
                    visited.append(k)
                    rec_rev(k, depth + 1)

        rec_rev(target, 0)

        if transitive:
            print("\nNote: '*' marks transitive dependencies (depth >= 2).\n")

    # helper to print forest starting from given roots, using depends_map (outputs -> inputs)
    def print_forest_once(roots: List[str], graph: Dict[str, List[str]], transitive_flag: bool) -> None:
        visited_nodes: List[str] = []

        def rec(node: str, depth: int) -> None:
            label = f"{node}*" if (transitive_flag and depth >= 2) else node
            print(("  " * depth) + "- " + label)
            if node in visited_nodes:
                return
            visited_nodes.append(node)
            kids = graph.get(node) or []
            if not kids:
                return
            for k in kids:
                rec(k, depth + 1)

        for r in roots:
            rec(r, 0)

    # sinks: nodes with no children in reverse_map
    def sinks() -> List[str]:
        no_children = [n for n, kids in reverse_map.items() if not kids]
        if no_children:
            return no_children
        outdeg_vals = {n: len(kids) for n, kids in reverse_map.items()}
        if outdeg_vals:
            min_outdeg = min(outdeg_vals.values())
            return [n for n, v in outdeg_vals.items() if v == min_outdeg]
        return []

    # Build results mapping
    results: Dict[str, Dict[str, List[str]]] = {}
    for nm in all_names:
        deps = _marked_vec(nm, depends_map, transitive)
        rdeps = _marked_vec(nm, reverse_map, transitive)
        if include_self:
            deps = _unique_preserve_order([nm] + deps)
            rdeps = _unique_preserve_order([nm] + rdeps)
        results[nm] = {"dependencies": deps, "reverse_dependencies": rdeps}

    if name is None:
        print("==== Pipeline dependency tree (outputs \u2192 inputs) ====")
        for root in sinks():
            print_forest_once([root], depends_map, transitive)
        if transitive:
            print("\nNote: '*' marks transitive dependencies (depth >= 2).\n")
        return results
    else:
        print_single(name)
        # return only the single-name mapping to match the R invisible(results[name]) behaviour
        return {name: results[name]}
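
Example: a hedged sketch of tracing lineage, run from the project root so _rixpress/dag.json is found ('mtcars_head' is a hypothetical derivation name):

from ryxpress import rxp_trace

# Whole pipeline: prints the dependency tree and returns the full mapping.
lineage = rxp_trace()

# Single derivation: prints its lineage and returns just that entry.
info = rxp_trace("mtcars_head")
print(info["mtcars_head"]["dependencies"])
print(info["mtcars_head"]["reverse_dependencies"])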

Utilities

rxp_gc

Garbage collect Nix store paths and build logs produced by rixpress.

Parameters:

  • keep_since: None for full GC, or a date/ISO date string (YYYY-MM-DD) to keep logs newer-or-equal to that date.
  • project_path: project root containing _rixpress
  • dry_run: if True, show what would be deleted without deleting
  • timeout_sec: timeout for invoked nix-store commands and for lock staleness checks
  • verbose: if True, print extra diagnostic output
  • ask: if True, prompt for confirmation before destructive operations (default True)
  • pretty: if True, pretty-prints the date-based dry-run summary instead of returning it
  • as_json: if True, pretty printing uses json.dumps(indent=2) instead of pprint

Returns:

A summary dict with canonical keys: kept, deleted, protected, deleted_count, failed_count, referenced_count, log_files_deleted, log_files_failed, dry_run_details

Source code in src/ryxpress/garbage.py
def rxp_gc(
    keep_since: Optional[Union[str, date]] = None,
    project_path: Union[str, Path] = ".",
    dry_run: bool = True,
    timeout_sec: int = 300,
    verbose: bool = False,
    ask: bool = True,
    pretty: bool = False,
    as_json: bool = False,
) -> Dict[str, object]:
    """
    rxp_gc

    Garbage collect Nix store paths and build logs produced by rixpress.

    Parameters

    - keep_since: None for full GC, or a date/ISO date string (YYYY-MM-DD) to keep logs newer-or-equal to that date.
    - project_path: project root containing _rixpress
    - dry_run: if True, show what would be deleted without deleting
    - timeout_sec: timeout for invoked nix-store commands and for lock staleness checks
    - verbose: if True, print extra diagnostic output
    - ask: if True, prompt for confirmation before destructive operations (default True)

    Returns:

      A summary dict with canonical keys:
        kept, deleted, protected, deleted_count, failed_count, referenced_count,
        log_files_deleted, log_files_failed, dry_run_details
    """
    nix_bin = shutil.which("nix-store")
    if not nix_bin:
        raise FileNotFoundError("nix-store not found on PATH. Install Nix or adjust PATH.")

    project_path = Path(project_path).resolve()
    if not project_path.exists():
        raise FileNotFoundError(f"Project path does not exist: {project_path}")

    lock_file_path = Path(tempfile.gettempdir()) / "rixpress_gc.lock"

    # record of temp gcroot symlink paths we created so we can remove them later
    created_gcroot_links: List[Path] = []

    # ensure we cleanup on signals
    def _cleanup_on_signal(signum, frame):
        logger.info("Received signal %s, cleaning up...", signum)
        # remove any gcroot links
        for p in created_gcroot_links:
            try:
                if p.exists():
                    p.unlink()
            except Exception:
                pass
        # remove lock file if held
        try:
            if lock_path_context and lock_path_context.acquired:
                lock_path_context.release()
        except Exception:
            pass
        raise SystemExit(1)

    # placeholder for context so signal handler can access
    lock_path_context: Optional[LockFile] = None

    # Register handlers
    old_sigint = signal.getsignal(signal.SIGINT)
    old_sigterm = signal.getsignal(signal.SIGTERM)
    signal.signal(signal.SIGINT, _cleanup_on_signal)
    signal.signal(signal.SIGTERM, _cleanup_on_signal)

    try:
        # Acquire lock with context manager (atomic)
        lock_path_context = LockFile(lock_file_path, timeout_sec=timeout_sec)
        lock_path_context.acquire()

        # parse keep_since
        if keep_since is not None:
            if isinstance(keep_since, date) and not isinstance(keep_since, datetime):
                keep_date = keep_since
            else:
                # accept YYYY-MM-DD string
                try:
                    keep_date = _parse_iso_date(str(keep_since))
                except Exception:
                    raise ValueError("Invalid 'keep_since'. Use a date or 'YYYY-MM-DD' string.")
        else:
            keep_date = None

        # Gather logs
        all_logs = rxp_list_logs(project_path)
        # Expect list of dicts with 'filename' and 'modification_time'
        if not isinstance(all_logs, list) or not all_logs:
            logger.info("No build logs found. Nothing to do.")
            # canonical empty summary
            return {
                "kept": [],
                "deleted": [],
                "protected": 0,
                "deleted_count": 0,
                "failed_count": 0,
                "referenced_count": 0,
                "log_files_deleted": 0,
                "log_files_failed": 0,
                "dry_run_details": None,
            }

        # Partition logs
        logs_to_keep = []
        logs_to_delete = []
        for entry in all_logs:
            fn = entry.get("filename")
            mtime = entry.get("modification_time")
            if not fn or not mtime:
                continue
            try:
                mdate = _parse_iso_date(mtime)
            except Exception:
                # If malformed, treat as older than keep_since to be conservative
                mdate = datetime.min.date()
            if keep_date is None:
                logs_to_keep.append(entry)
            else:
                if mdate >= keep_date:
                    logs_to_keep.append(entry)
                else:
                    logs_to_delete.append(entry)

        def _filenames(entries: Sequence[Dict]) -> List[str]:
            return [e["filename"] for e in entries]

        # helper to get store paths per log using rxp_inspect
        def get_paths_from_logs(filenames: Sequence[str]) -> Dict[str, List[str]]:
            out: Dict[str, List[str]] = {}
            for fn in filenames:
                wl = _extract_which_log(fn)
                if wl is None:
                    logger.warning("Could not parse which_log from filename: %s", fn)
                    out[fn] = []
                    continue
                try:
                    insp_rows = rxp_inspect(project_path=project_path, which_log=wl)
                except Exception as e:
                    logger.warning("rxp_inspect failed for %s: %s", fn, e)
                    out[fn] = []
                    continue
                # rxp_inspect returns list of dicts; look for 'path' keys
                paths = []
                if isinstance(insp_rows, list):
                    for row in insp_rows:
                        if isinstance(row, dict) and "path" in row and isinstance(row["path"], str):
                            paths.append(row["path"])
                out[fn] = _validate_store_paths(paths)
            return out

        keep_paths_by_log = get_paths_from_logs(_filenames(logs_to_keep)) if logs_to_keep else {}
        delete_paths_by_log = get_paths_from_logs(_filenames(logs_to_delete)) if logs_to_delete else {}

        keep_paths_all = _validate_store_paths(sorted({p for lst in keep_paths_by_log.values() for p in lst}))
        delete_paths_all = _validate_store_paths(sorted({p for lst in delete_paths_by_log.values() for p in lst}))

        summary_info: Dict[str, object] = {
            "kept": _filenames(logs_to_keep),
            "deleted": _filenames(logs_to_delete),
            "protected": 0,
            "deleted_count": 0,
            "failed_count": 0,
            "referenced_count": 0,
            "log_files_deleted": 0,
            "log_files_failed": 0,
            "dry_run_details": None,
        }

        # DRY RUN branch (date-based)
        if keep_date is not None and dry_run:
            logger.info("--- DRY RUN --- No changes will be made. ---")
            logger.info("Logs that would be deleted (%d):", len(logs_to_delete))
            for fn in summary_info["deleted"]:
                logger.info("  %s", fn)
            details: Dict[str, List[Dict[str, str]]] = {}
            if delete_paths_by_log:
                logger.info("Artifacts per log (from rxp_inspect):")
                for fn, _ in delete_paths_by_log.items():
                    logger.info("== %s ==", fn)
                    try:
                        insp_rows = rxp_inspect(project_path=project_path, which_log=_extract_which_log(fn) or "")
                    except Exception:
                        logger.info("  (rxp_inspect unavailable)")
                        details[fn] = []
                        continue
                    rows = []
                    if isinstance(insp_rows, list):
                        for r in insp_rows:
                            if not isinstance(r, dict):
                                continue
                            rows.append({"path": r.get("path", ""), "output": r.get("output", "")})
                    details[fn] = rows
            existing_delete_paths = [p for p in delete_paths_all if os.path.exists(p) or os.path.isdir(p)]
            missing_paths = [p for p in delete_paths_all if p not in existing_delete_paths]
            logger.info("Aggregate store paths targeted for deletion (deduped): %d total, %d existing, %d missing",
                        len(delete_paths_all), len(existing_delete_paths), len(missing_paths))
            if existing_delete_paths:
                logger.info("Existing paths that would be deleted:")
                for p in existing_delete_paths:
                    logger.info("  %s", p)
            if missing_paths:
                logger.info("Paths already missing (will be skipped):")
                for p in missing_paths:
                    logger.info("  %s", p)
            summary_info["dry_run_details"] = details
            if logs_to_delete:
                logger.info("Build log files that would be deleted:")
                for fn in summary_info["deleted"]:
                    log_path = project_path / "_rixpress" / fn
                    exists_indicator = "[OK]" if log_path.exists() else "[X]"
                    logger.info("  %s %s", exists_indicator, fn)
            if pretty:
                if as_json:
                    print(json.dumps(summary_info, indent=2, ensure_ascii=False))
                else:
                    pprint(summary_info)
                return

            return summary_info

        # dry-run full GC preview
        if keep_date is None and dry_run:
            logger.info("--- DRY RUN --- Would run 'nix-store --gc' (delete all unreferenced store paths). ---")
            if verbose:
                logger.info("(Tip: for an approximate preview, run 'nix-collect-garbage -n' from a shell.)")
            return summary_info

        # Full GC mode
        if keep_date is None:
            if ask:
                proceed = _ask_yes_no("Run full Nix garbage collection (delete all unreferenced artifacts)?", default=False)
                if not proceed:
                    logger.info("Operation cancelled.")
                    return summary_info
            logger.info("Running Nix garbage collector...")
            try:
                _, stdout, stderr = _safe_run([nix_bin, "--gc"], timeout=timeout_sec, check=True)
                if stdout:
                    if verbose:
                        logger.info(stdout)
                    else:
                        rel = [l for l in stdout.splitlines() if re.search(r"freed|removing|deleting", l, re.I)]
                        if rel:
                            for line in rel[-10:]:
                                logger.info(line)
                logger.info("Garbage collection complete.")
                return summary_info
            except RxpGCError as e:
                raise

        # Targeted deletion mode
        if not logs_to_delete:
            logger.info("No build logs older than %s found. Nothing to do.", keep_date.isoformat())
            return summary_info

        if not delete_paths_all:
            logger.info("No valid store paths found in logs older than %s. Nothing to delete.", keep_date.isoformat())
            return summary_info

        prompt = f"This will permanently delete {len(delete_paths_all)} store paths from {len(logs_to_delete)} build(s) older than {keep_date.isoformat()}. Continue?"
        if ask:
            if not _ask_yes_no(prompt, default=False):
                logger.info("Operation cancelled.")
                return summary_info

        # Protect recent artifacts (date-based mode only) by adding indirect GC roots.
        temp_gcroots_dir: Optional[Path] = None
        protected = 0
        try:
            if keep_paths_all:
                temp_gcroots_dir = Path(tempfile.mkdtemp(prefix="rixpress-gc-"))
                logger.info("Protecting %d recent artifacts via GC roots...", len(keep_paths_all))
                for i, p in enumerate(keep_paths_all, start=1):
                    link_path = temp_gcroots_dir / f"root-{i}"
                    try:
                        # create a placeholder link path (the nix-store --add-root will create the gcroot)
                        # use link_path as the path to register the indirect root
                        _safe_run([nix_bin, "--add-root", str(link_path), "--indirect", p], timeout=timeout_sec, check=True)
                        created_gcroot_links.append(link_path)
                        protected += 1
                    except RxpGCError as e:
                        logger.warning("Failed to add GC root for %s: %s", p, e)
                if protected == 0:
                    raise RxpGCError("Failed to protect any store paths. Aborting.")
                summary_info["protected"] = protected

            # Delete specific store paths
            logger.info("Deleting %d targeted store paths...", len(delete_paths_all))
            existing_paths = [p for p in delete_paths_all if os.path.exists(p) or os.path.isdir(p)]
            missing_paths = [p for p in delete_paths_all if p not in existing_paths]
            if missing_paths:
                logger.info("Skipping %d paths that no longer exist.", len(missing_paths))
                if verbose:
                    for p in missing_paths:
                        logger.info("  Missing: %s", p)
            if not existing_paths:
                logger.info("No existing paths to delete. All targeted paths are already gone.")
                return summary_info

            total_deleted = 0
            failed_paths: List[str] = []
            referenced_paths: List[str] = []

            for i, pth in enumerate(existing_paths, start=1):
                if not (os.path.exists(pth) or os.path.isdir(pth)):
                    logger.info("  [%d/%d] Skipping %s (already gone)", i, len(existing_paths), os.path.basename(pth))
                    continue
                logger.info("  [%d/%d] Attempting to delete %s...", i, len(existing_paths), os.path.basename(pth))
                try:
                    proc = subprocess.run([nix_bin, "--delete", pth], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=timeout_sec)
                    out = (proc.stdout or "") + "\n" + (proc.stderr or "")
                    if proc.returncode == 0:
                        total_deleted += 1
                        logger.info("    [OK] Successfully deleted")
                        if verbose and out.strip():
                            logger.info("    %s", out.strip())
                    else:
                        if re.search(r"still alive|Cannot delete", out, re.I):
                            referenced_paths.append(pth)
                            logger.info("    [!] Skipped (still referenced)")
                            if verbose:
                                logger.info("    Details: %s", out.strip())
                        else:
                            failed_paths.append(pth)
                            logger.info("    [X] Failed to delete")
                            if verbose:
                                logger.info("    Details: %s", out.strip())
                except subprocess.TimeoutExpired:
                    failed_paths.append(pth)
                    logger.info("    [X] Timeout while deleting")
                except Exception as e:
                    failed_paths.append(pth)
                    logger.info("    [X] Error: %s", e)

            # Summary of deletion
            logger.info("\nDeletion summary:")
            logger.info("  Successfully deleted: %d paths", total_deleted)
            logger.info("  Skipped (still referenced): %d paths", len(referenced_paths))
            logger.info("  Failed (other errors): %d paths", len(failed_paths))

            if referenced_paths and verbose:
                logger.info("\nReferenced paths (cannot delete):")
                for pth in referenced_paths:
                    logger.info("  %s", os.path.basename(pth))
                    try:
                        _, roots_out, _ = _safe_run([nix_bin, "--query", "--roots", pth], timeout=timeout_sec, check=False)
                        if roots_out.strip():
                            logger.info("    GC roots: %s", roots_out.strip().replace("\n", ", "))
                        else:
                            logger.info("    GC roots: (none found)")
                    except Exception:
                        logger.info("    GC roots: (query failed)")
                    try:
                        _, refs_out, _ = _safe_run([nix_bin, "--query", "--referrers", pth], timeout=timeout_sec, check=False)
                        if refs_out.strip():
                            refs = [os.path.basename(x) for x in refs_out.splitlines() if x.strip()]
                            logger.info("    Referenced by: %s", ", ".join(refs) if refs else "(none)")
                        else:
                            logger.info("    Referenced by: (none)")
                    except Exception:
                        logger.info("    Referenced by: (query failed)")

            summary_info["deleted_count"] = total_deleted
            summary_info["failed_count"] = len(failed_paths)
            summary_info["referenced_count"] = len(referenced_paths)

            # Delete old build log files
            if logs_to_delete:
                logger.info("\nDeleting old build log files...")
                log_files_deleted = 0
                log_files_failed: List[str] = []
                for i, entry in enumerate(logs_to_delete, start=1):
                    log_file = entry["filename"]
                    log_path = project_path / "_rixpress" / log_file
                    logger.info("  [%d/%d] Deleting %s...", i, len(logs_to_delete), log_file)
                    if not log_path.exists():
                        logger.info("    [!] File not found (already deleted?)")
                        continue
                    try:
                        log_path.unlink()
                        if not log_path.exists():
                            log_files_deleted += 1
                            logger.info("    [OK] Successfully deleted")
                        else:
                            log_files_failed.append(log_file)
                            logger.info("    [X] Failed to delete (file still exists)")
                    except Exception as e:
                        log_files_failed.append(log_file)
                        logger.info("    [X] Error: %s", e)
                logger.info("\nBuild log deletion summary:")
                logger.info("  Successfully deleted: %d files", log_files_deleted)
                logger.info("  Failed: %d files", len(log_files_failed))
                if log_files_failed and verbose:
                    logger.info("\nFailed to delete log files:")
                    for lf in log_files_failed:
                        logger.info("  %s", lf)
                summary_info["log_files_deleted"] = log_files_deleted
                summary_info["log_files_failed"] = len(log_files_failed)

            logger.info("\nCleanup complete!")
            return summary_info
        finally:
            # Always attempt to remove created gcroot links and the temp dir
            if created_gcroot_links:
                for p in created_gcroot_links:
                    try:
                        if p.exists():
                            p.unlink()
                    except Exception:
                        logger.debug("Failed to unlink gcroot link %s", p)
                # attempt to remove the parent temp directory if exists and empty
                if temp_gcroots_dir and temp_gcroots_dir.exists():
                    try:
                        shutil.rmtree(temp_gcroots_dir)
                    except Exception:
                        # ignore: best-effort cleanup
                        logger.debug("Failed to remove temp gcroots dir %s", temp_gcroots_dir)
    finally:
        # always release lock and restore signals
        try:
            if lock_path_context is not None:
                lock_path_context.release()
        except Exception:
            pass
        try:
            signal.signal(signal.SIGINT, old_sigint)
            signal.signal(signal.SIGTERM, old_sigterm)
        except Exception:
            pass
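
Example: a minimal sketch of a date-based clean-up; preview first with the default dry_run=True, then delete (the cutoff date is illustrative):

from ryxpress import rxp_gc

# Preview: nothing is deleted, a summary dict is returned.
preview = rxp_gc(keep_since="2025-01-01", project_path=".")

# Delete for real, skipping the confirmation prompt.
summary = rxp_gc(keep_since="2025-01-01", project_path=".", dry_run=False, ask=False)
print(summary["deleted_count"], "store paths and", summary["log_files_deleted"], "logs deleted")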