A Polyglot Data Science Pipeline with rixpress

Disclaimer: The code and this text were written with the assistance of LLMs.

Introduction: Showcasing Reproducible Pipelines

This document demonstrates the power of the {rixpress} R package to build, orchestrate, and execute a fully reproducible, polyglot data science pipeline. The primary goal is to showcase {rixpress}’s capabilities, not to conduct a deep economic analysis.

To provide a compelling example, our pipeline will perform a task common in modern quantitative research:

  1. Simulate synthetic economic data using a macroeconomic model in Julia.
  2. Train a machine learning model on this data to make forecasts in Python.
  3. Visualize the model’s performance in R.

I want to insist that this is merely a toy example project to illustrate the functionality of {rixpress}. I make no claims as to the accuracy of the LLM-generated code for model simulation.

The Economic Model: A Means to an End

We will use a foundational model from modern macroeconomics—the Real Business Cycle (RBC) model—to generate our data. The theory and log-linearized equations used in our Julia script are taken directly from the excellent lecture slides by Bianca De Paoli.

Source Attribution

The economic theory and equations for the RBC model are based on the following source. This document serves as a guide for our implementation.

Source: De Paoli, B. (2009). Slides 1: The RBC Model, Analytical and Numerical solutions. https://personal.lse.ac.uk/depaoli/RBC_slides1.pdf

Our Julia script implements the state-space solution to this system.

The Pipeline Components

Our project consists of a set of scripts, each with a dedicated purpose, all orchestrated by {rixpress}.

1. The Environment (gen-env.R)

Reproducibility starts with the environment. We use the {rix} package to declaratively define a default.nix file. This file locks down the exact versions of R, Julia, Python, and all their respective packages, ensuring the pipeline runs identically today, tomorrow, or on any machine with Nix installed.

#| eval: false
#| code-summary: "Environment Definition with {rix}"
s
# This script defines the polyglot environment our pipeline will run in.
library(rix)

# Define the complete execution environment
rix(
  # Pin the environment to a specific date to ensure that all package
  # versions are resolved as they were on this day.
  date = "2025-10-14",

  # 1. R Packages
  # We need packages for plotting, data manipulation, and reading arrow files.
  r_pkgs = c(
    "ggplot2",
    "dplyr",
    "arrow",
    "rix",
    "rixpress"
  ),

  # 2. Julia Configuration
  # We specify the Julia version and the list of packages needed
  # for our manual RBC model simulation.
  jl_conf = list(
    jl_version = "lts",
    jl_pkgs = c(
      "Distributions", # For creating random shocks
      "DataFrames", # For structuring the output
      "Arrow", # For saving the data in a cross-language format
      "Random"
    )
  ),

  # 3. Python Configuration
  # We specify the Python version and the packages needed for the
  # machine learning step.
  py_conf = list(
    py_version = "3.13",
    py_pkgs = c(
      "pandas",
      "scikit-learn",
      "xgboost",
      "pyarrow"
    )
  ),

  # We set the IDE to 'none' for a minimal environment. You could change
  # this to "rstudio" if you prefer to work interactively in RStudio.
  ide = "none",

  # Define the project path and allow overwriting the default.nix file.
  project_path = ".",
  overwrite = TRUE
)

Assuming we are working on a system that has Nix installed, we can “drop” into a temporary Nix shell with R and {rix} available:

nix-shell --expr "$(curl -sl https://raw.githubusercontent.com/ropensci/rix/main/inst/extdata/default.nix)"

Once the shell is ready, start R (by simply typing R) and run source("gen-env.R") to generate the adequate default.nix file. This is the environment that we can use to work interactively with the code while developing, and in which the pipeline will be executed.

2. The Simulation (functions.jl)

This Julia script contains a pure function that simulates the RBC model and returns a DataFrame. The code is a direct implementation of the state-space solution derived from the equations above and is saved under functions/functions.jl.

#| eval: false
#| code-summary: "Julia: RBC Model Simulation"

# This script contains the helper functions for the Julia portion of the pipeline.
# It implements the correct, stable, and canonical log-linearized solution to
# a standard Real Business Cycle (RBC) model, based on the provided lecture slides.

#using LinearAlgebra, Distributions, DataFrames, Arrow, Random

#-------------------------------------------------------------------------------
# 1. Main Simulation Function
#-------------------------------------------------------------------------------

"""
    simulate_rbc_model(α, β, δ, ρ, σ, σ_z)

    Takes RBC model parameters as input, solves for the state-space representation
    using the method of undetermined coefficients (as per the slides), simulates
    the model for 250 quarters, and returns a DataFrame.
"""
function simulate_rbc_model(α, β, δ, ρ, σ, σ_z)

    # --- STEADY-STATE VALUES AND CONSTRUCTED PARAMETERS (Slides p. 16, 31) ---
    y_k = ((1/β) - 1 + δ) / α

    # --- SOLVE FOR POLICY FUNCTION COEFFICIENTS (Slides p. 19-22) ---
    # We solve for the coefficients of the policy functions:
    # c_hat_t = c_ck * k_hat_{t-1} + c_cz * z_hat_t
    # k_hat_t = c_kk * k_hat_{t-1} + c_kz * z_hat_t

    # Solve the quadratic equation for c_ck (coefficient of capital on consumption)
    # Based on Campbell (1994) and slide 22, we take the stable root.
    # a*x^2 + b*x + c = 0
    a_quad = y_k - δ
    b_quad = -( (1-β*(1-δ))*(y_k - δ) + y_k*(1+α) + 1 )
    c_quad = y_k * α

    # The stable solution is the smaller positive root
    c_ck = (-b_quad - sqrt(b_quad^2 - 4*a_quad*c_quad)) / (2*a_quad)

    # Now solve for the other coefficients based on c_ck
    c_kk = (y_k * α) / (y_k - δ - c_ck)
    c_cz = (y_k * (1 - c_kk * (1 - α) * β * ρ)) /
           ( (y_k - δ - c_ck) * (1 - β * ρ) + σ * (1 - c_ck) * (1 - β * ρ) )
    c_kz = (c_cz * (1 - c_ck)) / (y_k * (1 - α))

    # --- BUILD STATE-SPACE MATRICES FROM SOLVED COEFFICIENTS ---
    # State vector: s_t = [k_{t-1}, z_t]'
    # Transition:   s_t = T*s_{t-1} + R*ε_t

    T = [c_kk  c_kz * ρ
         0     ρ      ]

    R = [c_kz ; 1]

    # Observation Matrix for [output, consumption, investment]
    # y_hat_t = α*k_{t-1} + z_t
    # c_hat_t = c_ck*k_{t-1} + c_cz*z_t
    # i_hat_t = (i_y_ss)^-1 * (k_t - (1-δ)k_{t-1})
    #         = (i_y_ss)^-1 * ( (c_kk - (1-δ))k_{t-1} + c_kz*z_t )

    i_y_ss = δ */ ((1/β) - 1 + δ))

    C =1
         c_ck   c_cz
         (c_kk - (1-δ))/i_y_ss   c_kz/i_y_ss]

    # --- SIMULATE THE MODEL ---
    Random.seed!(1234)
    n_periods = 250
    shocks = randn(n_periods) * σ_z

    states = zeros(2, n_periods)
    for t in 2:n_periods
        # The state is [k_{t-1}, z_t]'
        # To get the next state [k_t, z_{t+1}]', we first find k_t
        k_t = T[1,1]*states[1, t] + T[1,2]*states[2, t] + R[1]*shocks[t]
        z_tp1 = T[2,1]*states[1, t] + T[2,2]*states[2, t] + R[2]*shocks[t]

        states[1, t] = k_t # This is now k_t, which is "k_lag" for t+1
        states[2, t] = z_tp1 # This is now z_{t+1}
    end
    # Re-aligning states to be [k_{t-1}, z_t]
    k_lag = [0; states[1, 1:end-1]]
    z = [0; states[2, 1:end-1]]

    observables = C * [k_lag'; z']

    df = DataFrame(
        period = 1:n_periods,
        output = observables[1, :],
        consumption = observables[2, :],
        investment = observables[3, :],
        capital = k_lag,
        technology = z
    )

    return df
end

#-------------------------------------------------------------------------------
# 2. Encoder Function (Unchanged)
#-------------------------------------------------------------------------------
"""
    arrow_write(df::DataFrame, path::String)
    Encoder function to save a Julia DataFrame to an Arrow file.
"""
function arrow_write(df::DataFrame, path::String)
    Arrow.write(path, df)
end

At the end of the script, we create a wrapper around Arrow.write() called arrow_write() to serialize the generated data frame into an Arrow file.

While developing, it is possible to start the Julia interpreter and test things and see if they work. The generated data frame looks like:

# This is an Arrow data file, so rxp_read(),
# not knowing how to read it, will simply return
# the path to the file in the Nix store.
# We can pass this file to the adequate reader.
rxp_read("/nix/store/acvq35y12rp6c90qmcv5m0rqr0zzd0g0-simulated_rbc_data") |>
  arrow::read_feather() |>
  head()
If you're trying to load a pickle'd object,
you need to install the '{reticulate}' package.
  period       output   consumption  investment      capital    technology
1      1  0.000000000  0.0000000000  0.00000000  0.000000000  0.0000000000
2      2  0.000000000  0.0000000000  0.00000000  0.000000000  0.0000000000
3      3 -0.051252196 -0.0158385367 -0.38443873 -0.138200041 -0.0097921841
4      4  0.047203311  0.0145873040  0.35406835  0.127282340  0.0090186088
5      5 -0.001716913 -0.0005305799 -0.01287843 -0.004629604 -0.0003280313
6      6 -0.031445406 -0.0097176172 -0.23586954 -0.084791614 -0.0060079222

3. The Prediction Model (functions.py)

This Python script defines a function to perform our machine learning task. It takes the DataFrame from the Julia step, trains an XGBoost model to predict next-quarter’s output, and returns a new DataFrame containing the predictions. Of course, we could have simply used the RBC model itself for the predictions, but again, this is a toy example.

The Python script contains several little functions to make the code as modular as possible.

#| eval: false
#| code-summary: "Python: XGBoost Forecasting"

# This script contains modular helper functions for the Python portion of the pipeline.
# Each function performs a single, distinct task in the ML workflow.

# 1. Load required Python packages
# This is only needed if you run this script by hand,
# in which case, uncomment lines 8 to 11.
# With rixpress, the packages get loaded automatically.
#import pandas as pd
#import pyarrow.feather as feather
#from sklearn.model_selection import train_test_split
#import xgboost as xgb
# This script contains the helper functions for the Python portion of the pipeline.
# It defines the ML model training and prediction logic as a pure function.
# This script contains the helper functions for the Python portion of the pipeline.
# It defines the ML model training and prediction logic as a pure function.

# Step 1: Feature Engineering

def prepare_features(simulated_df: pd.DataFrame) -> pd.DataFrame:
    """Takes the raw simulated data and creates lagged features and the target variable."""
    df = simulated_df.copy()

    # Create lagged features
    for col in ['output', 'consumption', 'investment', 'capital', 'technology']:
        df[f'{col}_lag1'] = df[col].shift(1)

    # Drop the first row which now has NaNs
    df.dropna(inplace=True)

    return df

# Step 2: Data Splitting Functions
def get_X_train(processed_df: pd.DataFrame):
    """Gets the training features (X_train) from the processed data."""
    features = [col for col in processed_df.columns if '_lag1' in col]
    X = processed_df[features]
    train_size = int(0.75 * len(X))
    return X[:train_size]

def get_y_train(processed_df: pd.DataFrame):
    """Gets the training target (y_train) from the processed data."""
    y = processed_df['output']
    train_size = int(0.75 * len(y))
    return y[:train_size]

def get_X_test(processed_df: pd.DataFrame):
    """Gets the testing features (X_test) from the processed data."""
    features = [col for col in processed_df.columns if '_lag1' in col]
    X = processed_df[features]
    train_size = int(0.75 * len(X))
    return X[train_size:]

def get_y_test(processed_df: pd.DataFrame):
    """Gets the testing target (y_test) from the processed data."""
    y = processed_df['output']
    train_size = int(0.75 * len(y))
    return y[train_size:]

# Step 3: Model Training
def train_model(X_train: pd.DataFrame, y_train: pd.Series):
    """Initializes and trains the XGBoost model."""
    model = xgb.XGBRegressor(
        objective='reg:squarederror',
        n_estimators=100,
        learning_rate=0.1,
        max_depth=3,
        random_state=42
    )
    model.fit(X_train, y_train)
    return model

# Step 4: Prediction
def make_predictions(model, X_test: pd.DataFrame):
    """Makes predictions on the test set using the trained model."""
    return model.predict(X_test)

# Step 5: Format Final Results
def format_results(y_test: pd.Series, predictions) -> pd.DataFrame:
    """Combines test data and predictions into a final DataFrame for R."""
    results_df = pd.DataFrame({
        'period': y_test.index,
        'actual_output': y_test.values,
        'predicted_output': predictions
    })
    return results_df

# Encoder Function (for saving the final output)
def save_arrow(df: pd.DataFrame, path: str):
    """Encoder function to save a pandas DataFrame to an Arrow file."""
    feather.write_feather(df, path)

Just as for the Julia script, I write a wrapper around feather.write_feather() to serialize the data into an Arrow file and pass it finally to R. Just as before, it is possible to start a Python interpreter as defined in the default.nix file and use to test and debug.

4. The Visualization (functions.R)

Finally, this R script contains a function that takes the predictions from the Python step and creates a publication-quality visualization using {ggplot2}.

#| eval: false
#| code-summary: "R: ggplot2 Visualization"

# This script contains the helper functions for the R portion of the pipeline.
# It defines the visualization logic using ggplot2.

# 1. Load required R packages
# This is only needed if you run this script by hand,
# in which case, uncomment lines 8 and 9.
# With rixpress, the packages get loaded automatically.

#library(ggplot2)
#library(dplyr)

#-------------------------------------------------------------------------------
# 2. Main Function: Create the Visualization
#-------------------------------------------------------------------------------

#' Create a plot comparing actual vs. predicted output
#'
#' This function takes a data frame with actual and predicted time series data
#' and generates a ggplot visualization.
#'
#' @param predictions_df A data frame containing columns: 'period',
#'   'actual_output', and 'predicted_output'. This data frame will be the
#'   output of the Python XGBoost step.
#' @return A ggplot object.
#'
plot_predictions <- function(predictions_df) {
  # Create the plot object
  p <- ggplot(predictions_df, aes(x = period)) +

    # Add a line for the actual output from the RBC model simulation
    geom_line(
      aes(y = actual_output, color = "Actual (RBC Model)"),
      linewidth = 1
    ) +

    # Add a dashed line for the XGBoost model's predictions
    geom_line(
      aes(y = predicted_output, color = "Predicted (XGBoost)"),
      linetype = "dashed",
      linewidth = 1
    ) +

    # Define custom colors and legend labels
    scale_color_manual(
      name = "Series",
      values = c("Actual (RBC Model)" = "blue", "Predicted (XGBoost)" = "red")
    ) +

    # Add informative labels and a title
    labs(
      title = "XGBoost Prediction of RBC Model Output",
      subtitle = "Forecasting next-quarter output based on current-quarter economic variables",
      x = "Time (Quarters)",
      y = "Output (Log-deviations from steady state)"
    ) +

    # Use a clean theme
    theme_minimal() +

    # Improve legend position
    theme(legend.position = "bottom")

  # Return the final ggplot object
  return(p)
}

This simply returns the following plot:

rxp_read("/nix/store/ad4zbmln08zd23nqrwm5pi0kricprgk0-output_plot")

Orchestration with {rixpress}

The gen-pipeline.R script is the “conductor” of our orchestra. It uses {rixpress}’s intuitive, R-based syntax to define the dependencies and data flow between our multi-language components.

Notice how the output of one step becomes the input to the next, and how {rixpress} uses encoder and decoder arguments to seamlessly handle the serialization (saving to disk) and deserialization (reading from disk) of data between languages using the wrappers we wrote.

#| eval: false
#| code-summary: "The {rixpress} Pipeline Definition"

# This script defines and orchestrates the entire reproducible analytical pipeline
# using the {rixpress} package.

library(rixpress)

# Define the full pipeline as a list of derivations and pipe it to rxp_populate()
# to generate the Nix build instructions and execute the pipeline.
list(
  # STEP 0: Define RBC Model Parameters as Derivations
  # This makes the parameters an explicit part of the pipeline.
  # Changing a parameter will cause downstream steps to rebuild.
  rxp_jl(alpha, 0.3), # Capital's share of income
  rxp_jl(beta, 1 / 1.01), # Discount factor
  rxp_jl(delta, 0.025), # Depreciation rate
  rxp_jl(rho, 0.95), # Technology shock persistence
  rxp_jl(sigma, 1.0), # Risk aversion (log-utility)
  rxp_jl(sigma_z, 0.01), # Technology shock standard deviation

  # STEP 1: Julia - Simulate a Real Business Cycle (RBC) model.
  # This derivation runs our Julia script to generate the source data.
  rxp_jl(
    name = simulated_rbc_data,
    expr = "simulate_rbc_model(alpha, beta, delta, rho, sigma, sigma_z)",
    user_functions = "functions/functions.jl", # The file containing the function
    encoder = "arrow_write" # The function to use for saving the output
  ),

 # STEP 2.1: Python - Prepare features (lagging data)
  rxp_py(
    name = processed_data,
    expr = "prepare_features(simulated_rbc_data)",
    user_functions = "functions/functions.py",
    # Decode the Arrow file from Julia into a pandas DataFrame
    decoder = "feather.read_feather"
    # Note: No encoder needed here. {rixpress} will use pickle by default
    # to pass the DataFrame between Python steps.
  ),

  # STEP 2.2: Python - Split data into training and testing sets
  rxp_py(
    name = X_train,
    expr = "get_X_train(processed_data)",
    user_functions = "functions/functions.py"
  ),

  rxp_py(
    name = y_train,
    expr = "get_y_train(processed_data)",
    user_functions = "functions/functions.py"
  ),

  rxp_py(
    name = X_test,
    expr = "get_X_test(processed_data)",
    user_functions = "functions/functions.py"
  ),

  rxp_py(
    name = y_test,
    expr = "get_y_test(processed_data)",
    user_functions = "functions/functions.py"
  ),

  # STEP 2.3: Python - Train the model
  rxp_py(
    name = trained_model,
    expr = "train_model(X_train, y_train)",
    user_functions = "functions/functions.py"
  ),

  # STEP 2.4: Python - Make predictions
  rxp_py(
    name = model_predictions,
    expr = "make_predictions(trained_model, X_test)",
    user_functions = "functions/functions.py"
  ),

  # STEP 2.5: Python - Format final results for R
  rxp_py(
    name = predictions,
    expr = "format_results(y_test, model_predictions)",
    user_functions = "functions/functions.py",
    # We need an encoder here to save the final DataFrame as an Arrow file
    # so the R step can read it.
    encoder = "save_arrow"
  ),

  # STEP 3: R - Visualize the predictions from the Python model.
  # This final derivation depends on the output of the Python step.
  rxp_r(
    name = output_plot,
    expr = plot_predictions(predictions), # The function to call from functions.R
    user_functions = "functions/functions.R",
    # Specify how to load the upstream data (from Python) into R.
    decoder = arrow::read_feather
  ),

  # STEP 4: Quarto - Compile the final report.
  rxp_qmd(
    name = final_report,
    qmd_file = "readme.qmd"
  )
) |>
  rxp_populate(
    py_imports = c(
      pandas = "import pandas as pd",
      pyarrow = "import pyarrow.feather as feather",
      sklearn = "from sklearn.model_selection import train_test_split",
      xgboost = "import xgboost as xgb"
    ),
    project_path = ".", # The root of our project
    build = TRUE, # Set to TRUE to execute the pipeline immediately
    verbose = 0
  )

# Plot DAG for CI
rxp_dag_for_ci()

To run the pipeline, start the environment by simply typing nix-shell in the terminal, which will use the environment as defined in the default.nix and run source("gen-pipeline.R"). If there are issues, setting the verbose argument of rxp_populate() to 1 is helpful to see the full error logs.

Calling rxp_ggdag() shows the following plot:

rxp_ggdag()

In an interactive session, one can use rxp_read(), rxp_load() to read or load artifacts into the R session respectively and rxp_trace() to check the lineage of an artifact. The pipeline could also be built from a Python session, using ryxpress, the Python port of {rixpress}.

The Final Result

After running the entire pipeline, the final artifact is a {ggplot2} object named output_plot, which is saved in the Nix store. We can load and display it directly in this document using the rixpress::rxp_read() function.

Conclusion

This project successfully demonstrates how {rixpress} leverages the Nix package manager to create a robust, end-to-end reproducible pipeline that seamlessly integrates Julia, Python, and R. By defining each component as a pure function and orchestrating the data flow with {rixpress}, we ensure that the entire workflow—from the environment to the final plot—can be reproduced perfectly on any machine.


How to Render This Document

To render this Quarto file and display the plot, you must first run the entire pipeline as described in the previous steps. Then, from your terminal, execute the following command:

nix-shell --run "quarto render index.qmd"