1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
open Ast

(*
--# Mutate Pipeline Node Metadata
--#
--# Modifies metadata fields on pipeline nodes. Supports a `where` named
--# argument to scope changes to a subset of nodes. Without `where`, all
--# nodes are affected.
--#
--# Mutable metadata fields: `noop` (Bool), `serializer` (String),
--# `deserializer` (String), `runtime` (String), `deps` (List of Strings
--# or Symbols). `deps` cannot be cleared with NA.
--#
--# The `where` clause uses NSE (`$field`) just like `filter_node`.
--#
--# @name mutate_node
--# @param p :: Pipeline The pipeline to modify.
--# @param ... :: KeywordArgs Metadata assignments as `$field = value` pairs.
--# @param where :: Function (Optional) Predicate scoping which nodes are updated.
--# @return :: Pipeline A new pipeline with updated node metadata.
--# @example
--#   p |> mutate_node($noop = true)
--#   p |> mutate_node($serializer = "pmml", where = $runtime == "R")
--# @family pipeline
--# @seealso filter_node, rename_node
--# @export
*)
(* [register ~eval_call env] adds the `mutate_node` builtin to [env] via
   [Env.add] and returns the result of that call.

   [eval_call] is used to apply the optional `where` predicate: it is called
   as [eval_call env pred [(None, expr)]], where [expr] wraps a dict built by
   [Pipeline_to_frame.node_metadata_dict] for one node.

   Behaviour of the builtin:
   - the first argument must be a pipeline, otherwise a type error value is
     returned; with no arguments an arity error value is returned;
   - the named argument `where` is split off from the remaining arguments,
     which are treated as field assignments;
   - recognised fields are `runtime`, `noop`, `serializer`, `deserializer`
     and `deps`; each is type-checked and only the first problem found (in
     that order) is reported;
   - a named assignment to any other field is reported as an error instead of
     being silently ignored, but only when no earlier check failed;
   - on success a new pipeline record is returned; the input is not mutated. *)
let register ~eval_call env =
  (* Field names this builtin knows how to update. *)
  let known_fields = ["runtime"; "noop"; "serializer"; "deserializer"; "deps"] in
  Env.add "mutate_node"
    (make_builtin_named ~name:"mutate_node" ~variadic:true 1 (fun named_args env ->
      match named_args with
      | [] -> Error.arity_error_named "mutate_node" 1 0
      | (_, VPipeline p) :: rest ->
          (* Named args arrive as (string option * value) pairs; split the
             optional `where` predicate from the field assignments. *)
          let where_pred_opt = List.assoc_opt (Some "where") rest in
          let mutations = List.filter (fun (name, _) -> name <> Some "where") rest in
          let lookup field = List.assoc_opt (Some field) mutations in
          let depths = Pipeline_to_frame.compute_depths p.p_deps in
          (* The predicate verdict for a node does not depend on which field is
             being updated, so it is evaluated at most once per node name and
             then served from this cache. *)
          let match_cache = Hashtbl.create 16 in
          let matches name =
            match where_pred_opt with
            | None -> true
            | Some pred ->
                (match Hashtbl.find_opt match_cache name with
                 | Some cached -> cached
                 | None ->
                     let row_dict =
                       VDict (Pipeline_to_frame.node_metadata_dict name p depths)
                     in
                     let verdict =
                       (* Any non-Bool result counts as "no match". *)
                       match eval_call env pred [(None, Ast.mk_expr (Value row_dict))] with
                       | VBool b -> b
                       | _ -> false
                     in
                     Hashtbl.add match_cache name verdict;
                     verdict)
          in
          (* Only the first problem is kept; later ones are dropped. *)
          let first_error = ref None in
          let record_error msg =
            if Option.is_none !first_error then
              first_error := Some (Error.type_error msg)
          in
          let wrong_type field got expected =
            record_error
              (Printf.sprintf "Function `mutate_node`: `%s` must be a %s, got %s."
                 field expected (Utils.type_name got))
          in
          (* Replace the payload of every matching (name, payload) entry with a
             freshly built value; non-matching entries are kept as they are. *)
          let replace_matching fresh entries =
            List.map (fun (n, old) -> if matches n then (n, fresh ()) else (n, old)) entries
          in
          let string_expr s () = Ast.mk_expr (Ast.Value (Ast.VString s)) in
          let new_runtimes =
            match lookup "runtime" with
            | None -> p.p_runtimes
            | Some (VString v) -> replace_matching (fun () -> v) p.p_runtimes
            | Some v ->
                wrong_type "runtime" v "String";
                p.p_runtimes
          in
          let new_noops =
            match lookup "noop" with
            | None -> p.p_noops
            | Some (VBool v) -> replace_matching (fun () -> v) p.p_noops
            | Some v ->
                wrong_type "noop" v "Bool";
                p.p_noops
          in
          let new_serializers =
            match lookup "serializer" with
            | None -> p.p_serializers
            | Some (VString v) -> replace_matching (string_expr v) p.p_serializers
            | Some v ->
                wrong_type "serializer" v "String";
                p.p_serializers
          in
          let new_deserializers =
            match lookup "deserializer" with
            | None -> p.p_deserializers
            | Some (VString v) -> replace_matching (string_expr v) p.p_deserializers
            | Some v ->
                wrong_type "deserializer" v "String";
                p.p_deserializers
          in
          let new_explicit_deps, new_p_deps =
            let unchanged = (p.p_explicit_deps, p.p_deps) in
            match lookup "deps" with
            | None -> unchanged
            | Some (VList items) ->
                let invalid_dep =
                  List.find_map (fun (_, v) ->
                    match v with
                    | VString _ | VSymbol _ -> None
                    | bad -> Some bad
                  ) items
                in
                (match invalid_dep with
                 | Some bad ->
                     wrong_type "deps" bad "String or Symbol";
                     unchanged
                 | None ->
                     (* Every item passed validation above, so nothing is
                        dropped by this filter. *)
                     let deps =
                       List.filter_map (fun (_, v) ->
                         match v with
                         | VString s | VSymbol s -> Some s
                         | _ -> None
                       ) items
                     in
                     let new_explicit =
                       replace_matching (fun () -> Some deps) p.p_explicit_deps
                     in
                     let new_pdeps = replace_matching (fun () -> deps) p.p_deps in
                     (new_explicit, new_pdeps))
            | Some (VNA _) ->
                (* Clearing explicit deps would leave p_explicit_deps and p_deps
                   inconsistent, and the edges cannot be re-derived here, so the
                   request is rejected rather than returning a stale graph. *)
                record_error "Function `mutate_node` cannot clear `deps` with NA because dependency edges cannot be re-derived here; rerun the pipeline to rebuild deps.";
                unchanged
            | Some v ->
                wrong_type "deps" v "List of Strings or Symbols";
                unchanged
          in
          (* Checked last so that calls which already failed one of the checks
             above keep reporting exactly the same error as before. *)
          let unknown_field =
            List.find_map (fun (name, _) ->
              match name with
              | Some field when not (List.mem field known_fields) -> Some field
              | _ -> None
            ) mutations
          in
          (match unknown_field with
           | Some field ->
               record_error
                 (Printf.sprintf
                    "Function `mutate_node`: unknown metadata field `%s`. Mutable fields are `runtime`, `noop`, `serializer`, `deserializer` and `deps`."
                    field)
           | None -> ());
          (match !first_error with
           | Some e -> e
           | None ->
               VPipeline {
                 p with
                 p_runtimes      = new_runtimes;
                 p_noops         = new_noops;
                 p_serializers   = new_serializers;
                 p_deserializers = new_deserializers;
                 p_explicit_deps = new_explicit_deps;
                 p_deps          = new_p_deps;
               })
      | (_, _) :: _ -> Error.type_error "Function `mutate_node` expects a Pipeline as first argument."
    ))
    env