1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
open Ast
(*
--# Filter Pipeline Nodes
--#
--# Returns a new pipeline containing only the nodes for which the predicate
--# returns `true`; nodes for which it returns `false` or a non-Bool value are
--# dropped. Uses NSE (`$field`) to refer to node metadata fields.
--#
--# No DAG validity check is performed. If a retained node depends on a node
--# that was removed, that inconsistency surfaces only at `build_pipeline` or
--# `pipeline_run`.
--#
--# Supported metadata fields: `$name`, `$runtime`, `$serializer`,
--# `$deserializer`, `$noop`, `$depth`, `$command_type`, `$diagnostics`.
--#
--# @name filter_node
--# @param p :: Pipeline The pipeline to filter.
--# @param predicate :: Function A predicate function returning Bool for each node.
--# @return :: Pipeline A new pipeline with only the matching nodes.
--# @example
--# p |> filter_node($runtime == "python")
--# p |> filter_node($noop == false)
--# p |> filter_node($depth <= 2)
--# p |> filter_node(!is_na($diagnostics.error))
--# @family pipeline
--# @seealso mutate_node, select_node, rename_node
--# @export
*)
(* Registers the `filter_node` builtin under that name with [Env.add] and
   returns the result of that call.

   [eval_call] is the evaluator callback used to apply the user predicate to
   a single unnamed argument: a [VDict] of node metadata built by
   [Pipeline_to_frame.node_metadata_dict].

   The builtin is declared with arity 2 (pipeline, predicate):
   - a two-argument call whose first argument is not a pipeline yields
     [Error.type_error];
   - any other argument list yields [Error.arity_error_named]. *)
let register ~eval_call env =
  Env.add "filter_node"
    (make_builtin ~name:"filter_node" 2 (fun args env ->
      match args with
      | [VPipeline p; predicate] ->
        (* Replace p_nodes and p_node_diagnostics with the output of the
           Builder merge-with-latest-log helpers before any metadata is
           computed, so the predicate is evaluated against the merged data. *)
        let p =
          {
            p with
            p_nodes = Builder.merge_pipeline_nodes_with_latest_log p;
            p_node_diagnostics =
              Builder.merge_pipeline_node_diagnostics_with_latest_log p;
          }
        in
        let depths = Pipeline_to_frame.compute_depths p.p_deps in
        (* A node is retained only when the predicate evaluates to VBool true.
           Any other result (VBool false or a non-Bool value) drops the node.
           NOTE(review): this also hides predicate failures; confirm that
           silently dropping is the intended behaviour. *)
        let keep =
          List.filter
            (fun (name, _) ->
              let row_dict =
                VDict (Pipeline_to_frame.node_metadata_dict name p depths)
              in
              match
                eval_call env predicate [(None, Ast.mk_expr (Value row_dict))]
              with
              | VBool b -> b
              | _ -> false)
            p.p_exprs
        in
        (* Hash-backed set of retained node names: membership is tested once
           per entry of every per-node table below, so avoid a linear scan of
           the name list on each test. *)
        let kept_names = Hashtbl.create 16 in
        List.iter (fun (name, _) -> Hashtbl.replace kept_names name ()) keep;
        let is_kept name = Hashtbl.mem kept_names name in
        (* Restrict any name-keyed association list to the retained nodes.
           Written with an explicit parameter so it stays polymorphic in the
           value type and can be reused for every table. *)
        let only_kept entries =
          List.filter (fun (n, _) -> is_kept n) entries
        in
        (* No DAG validation here: p_deps entries of retained nodes are kept
           as they are, even if they mention removed nodes. p_imports is
           carried over unfiltered. *)
        VPipeline {
          p_nodes = only_kept p.p_nodes;
          p_exprs = keep;
          p_deps = only_kept p.p_deps;
          p_imports = p.p_imports;
          p_runtimes = only_kept p.p_runtimes;
          p_serializers = only_kept p.p_serializers;
          p_deserializers = only_kept p.p_deserializers;
          p_env_vars = only_kept p.p_env_vars;
          p_args = only_kept p.p_args;
          p_shells = only_kept p.p_shells;
          p_shell_args = only_kept p.p_shell_args;
          p_functions = only_kept p.p_functions;
          p_includes = only_kept p.p_includes;
          p_noops = only_kept p.p_noops;
          p_scripts = only_kept p.p_scripts;
          p_explicit_deps = only_kept p.p_explicit_deps;
          p_node_diagnostics = only_kept p.p_node_diagnostics;
        }
      | [_; _] -> Error.type_error "Function `filter_node` expects a Pipeline as first argument."
      | _ -> Error.arity_error_named "filter_node" 2 (List.length args)
    ))
    env