1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
(* src/pipeline/builder_read_node.ml *)
open Ast
open Builder_utils
open Builder_logs

(** [parse_node_warnings path] loads the warnings stored in the JSON file at
    [path].

    The result is [[]] when [path] does not exist or when its content does not
    decode to a list. Inside the list, a bare string becomes a ["Generic"]
    warning attributed to ["unknown"]; a dictionary is read field by field,
    with defaults standing in for absent or mistyped fields; every other
    element is dropped. All produced warnings carry [WarningOwn] as source. *)
let parse_node_warnings path =
  let warning_of_message msg =
    {
      nw_kind = "Generic";
      nw_fn = "unknown";
      nw_na_count = 0;
      nw_na_indices = [];
      nw_message = msg;
      nw_source = WarningOwn;
    }
  in
  let warning_of_dict fields =
    (* Missing or non-string fields read as the empty string. *)
    let string_field key =
      match List.assoc_opt key fields with
      | Some (VString s) -> s
      | _ -> ""
    in
    let string_field_or fallback key =
      match string_field key with
      | "" -> fallback
      | s -> s
    in
    (* Missing or non-integer fields read as zero. *)
    let int_field key =
      match List.assoc_opt key fields with
      | Some (VInt i) -> i
      | _ -> 0
    in
    let na_indices =
      match List.assoc_opt "na_indices" fields with
      | Some (VList elems) ->
          List.filter_map
            (fun (_, elem) -> match elem with VInt i -> Some i | _ -> None)
            elems
      | _ -> []
    in
    {
      nw_kind = string_field_or "Generic" "kind";
      nw_fn = string_field_or "unknown" "fn";
      nw_na_count = int_field "na_count";
      nw_na_indices = na_indices;
      nw_message = string_field "message";
      nw_source = WarningOwn;
    }
  in
  let warning_of_item (_, item) =
    match item with
    | VString msg -> Some (warning_of_message msg)
    | VDict fields -> Some (warning_of_dict fields)
    | _ -> None
  in
  if not (Sys.file_exists path) then []
  else
    match Serialization.read_json path with
    | Ok (VList items) -> List.filter_map warning_of_item items
    | _ -> []

(** [is_error_class cls] is [true] exactly when [cls] is ["VError"] or
    ["Error"]; the comparison is case-sensitive. *)
let is_error_class cls =
  String.equal cls "VError" || String.equal cls "Error"

(** [generic_logged_node_error name cn] builds a placeholder error record for
    node [name]: its kind is the class recorded in [cn], its function is
    ["unknown"], its NA count is zero, and its message only states that the
    node failed during the pipeline build. *)
let generic_logged_node_error name cn =
  let message =
    Printf.sprintf "Node `%s` failed during pipeline build." name
  in
  {
    ne_kind = cn.cn_class;
    ne_fn = "unknown";
    ne_message = message;
    ne_na_count = 0;
  }

(** [node_error_of_logged_value name cn value] derives the error record of a
    logged node from its class and its decoded [value].

    - [None] when the class of [cn] is not an error class.
    - The code, message and NA count of the error when [value] is a [VError]
      (the function name is always reported as ["unknown"]).
    - The generic placeholder error otherwise. *)
let node_error_of_logged_value name cn value =
  match is_error_class cn.cn_class, value with
  | false, _ -> None
  | true, VError e ->
      Some
        {
          ne_kind = Utils.error_code_to_string e.code;
          ne_fn = "unknown";
          ne_message = e.message;
          ne_na_count = e.na_count;
        }
  | true, _ -> Some (generic_logged_node_error name cn)

(** [logged_node_diagnostics ?value name cn] assembles the diagnostics of a
    logged node.

    Warnings come from the ["warnings"] file located next to the artifact of
    [cn]. The error is derived from [value] when it is supplied; otherwise,
    for error-class nodes only, the artifact is decoded with
    [Serialization.read_verror_json] and a generic error is used if that
    decoding fails. The remaining fields are set to their neutral values. *)
let logged_node_diagnostics ?value name cn =
  (* Warnings are read first, before any attempt to decode the artifact. *)
  let warnings =
    "warnings"
    |> Filename.concat (Filename.dirname cn.cn_path)
    |> parse_node_warnings
  in
  let error =
    match value with
    | Some logged -> node_error_of_logged_value name cn logged
    | None when not (is_error_class cn.cn_class) -> None
    | None ->
        (match Serialization.read_verror_json cn.cn_path with
         | Ok decoded -> node_error_of_logged_value name cn decoded
         | Error _ -> Some (generic_logged_node_error name cn))
  in
  {
    nd_warnings = warnings;
    nd_error = error;
    nd_warnings_suppressed = false;
    nd_recovered = false;
    nd_upstream_errors = [];
  }

(** [wrap_with_diagnostics name cn v] packages [v] as a [VNodeResult] for node
    [name], attaching the diagnostics computed from [cn] and [v]. *)
let wrap_with_diagnostics name cn v =
  let diagnostics = logged_node_diagnostics ~value:v name cn in
  VNodeResult { v; node_name = name; diagnostics }

(* Prepend a ["node_name"] entry holding [name] to an error context, leaving
   the context untouched when it already has a ["node_name"] key. *)
let add_node_name_context name context =
  if List.mem_assoc "node_name" context then context
  else ("node_name", VString name) :: context

(** [is_visual_metadata_class cls] is [true] exactly when [cls] names one of
    the plotting classes listed below; the comparison is case-sensitive. *)
let is_visual_metadata_class cls =
  List.mem cls
    [ "ggplot"; "matplotlib"; "plotnine"; "seaborn"; "plotly"; "altair" ]

(** [read_standard_node_value cn] performs a best-effort load of the artifact
    at [cn.cn_path], choosing the reader from [cn.cn_serializer] (["json"],
    ["arrow"], ["csv"] or ["pmml"]).

    This function never produces an error value: any read failure, and any
    other serializer, yields the handle [VComputedNode cn] instead. *)
let read_standard_node_value cn =
  let fallback = VComputedNode cn in
  match cn.cn_serializer with
  | "json" ->
      (match Serialization.read_json cn.cn_path with
       | Ok v -> v
       | Error _ -> fallback)
  | "arrow" ->
      (match Arrow_io.read_ipc cn.cn_path with
       | Ok table -> VDataFrame { arrow_table = table; group_keys = [] }
       | Error _ -> fallback)
  | "csv" ->
      (* The catch-all is deliberate: this reader is best-effort, so any
         failure while opening, reading or parsing degrades to the handle. *)
      (try
         let ch = open_in cn.cn_path in
         let content =
           (* Close the channel even when reading raises, so a failed read
              does not leak the file descriptor. *)
           Fun.protect
             ~finally:(fun () -> close_in_noerr ch)
             (fun () -> really_input_string ch (in_channel_length ch))
         in
         T_read_csv.parse_csv_string content
       with _ -> fallback)
  | "pmml" ->
      (match Pmml_utils.read_pmml cn.cn_path with
       | Ok model -> Pmml_utils.attach_source_path cn.cn_path model
       | Error _ -> fallback)
  | _ -> fallback

(** [read_logged_node_value name cn] loads the artifact of logged node [name]
    from [cn.cn_path].

    A node whose runtime is ["T"] and whose serializer is ["default"] is read
    with [Serialization.deserialize_from_file]. Otherwise the reader is chosen
    from [cn.cn_serializer] (["json"], ["arrow"], ["csv"] or ["pmml"]), and any
    other serializer yields the handle [VComputedNode cn].

    A failed read is reported as a [FileError] value whose context records the
    runtime of the node and whose message names the node, the path and the
    underlying cause. *)
let read_logged_node_value name cn =
  (* [label] is empty or ends with a space, so that the message reads
     "Failed to read node ..." or "Failed to read JSON node ...". *)
  let read_failure label cause =
    Error.make_error
      ~context:[ ("runtime", VString cn.cn_runtime) ]
      FileError
      (Printf.sprintf "Failed to read %snode `%s` from `%s`: %s"
         label name cn.cn_path cause)
  in
  if cn.cn_runtime = "T" && cn.cn_serializer = "default" then
    (match Serialization.deserialize_from_file cn.cn_path with
     | Ok v -> v
     | Error msg -> read_failure "" msg)
  else
    match cn.cn_serializer with
    | "json" ->
        (match Serialization.read_json cn.cn_path with
         | Ok v -> v
         | Error msg -> read_failure "JSON " msg)
    | "arrow" ->
        (match Arrow_io.read_ipc cn.cn_path with
         | Ok table -> VDataFrame { arrow_table = table; group_keys = [] }
         | Error msg -> read_failure "Arrow " msg)
    | "csv" ->
        (try
           let ch = open_in cn.cn_path in
           let content =
             (* Close the channel even when reading raises, so a failed read
                does not leak the file descriptor. *)
             Fun.protect
               ~finally:(fun () -> close_in_noerr ch)
               (fun () -> really_input_string ch (in_channel_length ch))
           in
           T_read_csv.parse_csv_string content
         with exn -> read_failure "CSV " (Printexc.to_string exn))
    | "pmml" ->
        (match Pmml_utils.read_pmml cn.cn_path with
         | Ok model -> Pmml_utils.attach_source_path cn.cn_path model
         | Error msg -> read_failure "PMML " msg)
    | _ -> VComputedNode cn

(* Best-effort deserialization for nodes exposed through T_NODE_<name> in the
   Nix sandbox. Error-class nodes are decoded as structured VError artifacts
   (tagged with the node name); plot-class nodes prefer the "viz" file stored
   next to the artifact when it exists; everything else goes through the
   standard readers. A failed decode falls back to the computed node handle
   instead of producing an error. *)
let read_env_node_value name cn =
  let cls = cn.cn_class in
  if is_error_class cls then
    (match Serialization.read_verror_json cn.cn_path with
     | Ok (VError e) ->
         VError { e with context = add_node_name_context name e.context }
     | Ok other -> other
     | Error _ -> VComputedNode cn)
  else
    let viz_path = Filename.concat (Filename.dirname cn.cn_path) "viz" in
    (* The file-system check only runs for plot classes. *)
    if is_visual_metadata_class cls && Sys.file_exists viz_path then
      (match Serialization.read_json viz_path with
       | Ok v -> v
       | Error _ -> VComputedNode cn)
    else read_standard_node_value cn

(** [candidate_logs ?which_log ()] is the list of build logs to search:
    [get_all_logs ()] when a [which_log] selector is given (its content is
    not inspected here), and [get_logs ()] otherwise. *)
let candidate_logs ?which_log () =
  if Option.is_some which_log then get_all_logs () else get_logs ()

(** [logged_node_value name cn] loads the value of logged node [name].

    Error-class nodes are decoded with [Serialization.read_verror_json], and a
    decoded [VError] gets the node name added to its context. Plot-class nodes
    are read from the ["viz"] file next to the artifact when that file exists.
    Every other case is delegated to [read_logged_node_value].

    A failed decode is reported as a [FileError] value whose context records
    the runtime of the node. *)
let logged_node_value name cn =
  let file_error message =
    Error.make_error
      ~context:[ ("runtime", VString cn.cn_runtime) ]
      FileError
      message
  in
  let cls = cn.cn_class in
  if is_error_class cls then
    (match Serialization.read_verror_json cn.cn_path with
     | Ok (VError e) ->
         VError { e with context = add_node_name_context name e.context }
     | Ok other -> other
     | Error msg ->
         file_error
           (Printf.sprintf
              "Failed to read Error node `%s` from `%s`: %s"
              name cn.cn_path msg))
  else
    let viz_path = Filename.concat (Filename.dirname cn.cn_path) "viz" in
    (* The file-system check only runs for plot classes. *)
    if is_visual_metadata_class cls && Sys.file_exists viz_path then
      (match Serialization.read_json viz_path with
       | Ok v -> v
       | Error msg ->
           file_error
             (Printf.sprintf
                "Failed to read plot metadata node `%s` from `%s`: %s"
                name viz_path msg))
    else read_logged_node_value name cn

(** [pipeline_matches_logged_entries p entries] tells whether the log
    [entries] describe pipeline [p].

    This holds when both sides carry the same node names (compared after
    sorting) and every logged entry whose name appears in [p.p_runtimes] was
    built with that runtime. Entries with no recorded runtime are accepted. *)
let pipeline_matches_logged_entries (p : Ast.pipeline_result) entries =
  let sorted_names pairs = List.sort String.compare (List.map fst pairs) in
  let same_runtime (name, cn) =
    match List.assoc_opt name p.p_runtimes with
    | None -> true
    | Some runtime -> String.equal runtime cn.cn_runtime
  in
  (* Runtimes are only checked once the name sets agree. *)
  sorted_names p.p_nodes = sorted_names entries
  && List.for_all same_runtime entries

(** [matching_pipeline_log_entries ?which_log p] returns the entries of the
    first candidate log that can be read and that matches pipeline [p], or
    [None] when there is no such log.

    Without [which_log], candidates are tried in the order returned by
    [candidate_logs]. With [which_log], it is compiled as a [Str] regex and
    only the logs whose name contains a match are tried; a pattern rejected by
    [Str] with [Failure] yields [None]. *)
let matching_pipeline_log_entries ?which_log (p : Ast.pipeline_result) =
  let logs = candidate_logs ?which_log () in
  let entries_if_matching log_file =
    match read_log (Filename.concat pipeline_dir log_file) with
    | Ok entries ->
        if pipeline_matches_logged_entries p entries then Some entries
        else None
    | Error _ -> None
  in
  match which_log with
  | None ->
      (* [List.find_map] stops at the first hit, so when the head of the list
         already matches, no other log is parsed. *)
      List.find_map entries_if_matching logs
  | Some pattern ->
      let filtered =
        try
          let re = Str.regexp pattern in
          let mentions_pattern log =
            match Str.search_forward re log 0 with
            | (_ : int) -> true
            | exception Not_found -> false
          in
          Some (List.filter mentions_pattern logs)
        with Failure _ -> None
      in
      Option.bind filtered (List.find_map entries_if_matching)

(** [merge_pipeline_nodes_with_latest_log ?which_log p] returns the nodes of
    [p], replacing placeholder values by the values recorded in the matching
    build log.

    A placeholder is a [VComputedNode] whose path is ["<unbuilt>"] or empty.
    Nodes that are not placeholders, or that have no entry in the log, are
    kept as they are. When no log matches [p], [p.p_nodes] is returned
    unchanged. *)
let merge_pipeline_nodes_with_latest_log ?which_log (p : Ast.pipeline_result) =
  let is_placeholder = function
    | VComputedNode { cn_path = "<unbuilt>" | ""; _ } -> true
    | _ -> false
  in
  match matching_pipeline_log_entries ?which_log p with
  | None -> p.p_nodes
  | Some entries ->
      let overlay ((name, value) as node) =
        if not (is_placeholder value) then node
        else
          match List.assoc_opt name entries with
          | Some cn -> (name, logged_node_value name cn)
          | None -> node
      in
      List.map overlay p.p_nodes

(** [read_node ?which_log name] loads the stored value of node [name] and
    returns it wrapped as a [VNodeResult] with its diagnostics.

    - When [which_log] is absent and the environment variable
      ["T_NODE_" ^ name] is set, the node is read from the ["artifact"] and
      ["class"] files of that directory. The serializer is guessed from the
      class name and the runtime is recorded as ["unknown"].
    - Otherwise the node is looked up in a build log: the first candidate log
      when [which_log] is absent, else the first log whose name contains a
      match of the [Str] regex [which_log].

    Failures are returned as error values rather than raised: a [FileError]
    for a missing artifact, missing logs or an unreadable log, a type error
    for an invalid regex, and a [KeyError] when the log has no such node.

    The regex is compiled once, before the logs are scanned, so an invalid
    pattern is always reported as such, including when no logs exist. *)
let read_node ?which_log name =
  let env_name = "T_NODE_" ^ name in
  match which_log, Sys.getenv_opt env_name with
  | None, Some path ->
      let artifact_path = Filename.concat path "artifact" in
      let class_path = Filename.concat path "class" in
      if not (Sys.file_exists artifact_path && Sys.file_exists class_path) then
        Error.make_error FileError (Printf.sprintf "read_node: node `%s` found in environment as %s, but artifact is missing." name path)
      else
        (* An empty or unreadable class file degrades to the "unknown" class;
           the channel is closed on every path. *)
        let cls =
          let ch = open_in class_path in
          Fun.protect
            ~finally:(fun () -> close_in_noerr ch)
            (fun () ->
              try String.trim (input_line ch)
              with End_of_file | Sys_error _ -> "unknown")
        in
        let cn_serializer =
          match cls with
          | "ArrowDataFrame" | "data.frame" | "DataFrame" | "Table" -> "arrow"
          | "JSON" | "VDict" | "VList" | "list" | "dict" -> "json"
          | "PMML" | "pmml" -> "pmml"
          | _ -> "default"
        in
        let cn = {
          cn_name = name;
          cn_runtime = "unknown";
          cn_path = artifact_path;
          cn_serializer;
          cn_class = cls;
          cn_dependencies = [];
        } in
        let v = read_env_node_value name cn in
        wrap_with_diagnostics name cn v
  | _ ->
      let logs = candidate_logs ?which_log () in
      let log_file_result =
        match which_log with
        | None -> Ok (match logs with [] -> None | l :: _ -> Some l)
        | Some pattern ->
            (try
               (* Compile once, outside the per-log predicate. *)
               let re = Str.regexp pattern in
               let mentions_pattern log =
                 match Str.search_forward re log 0 with
                 | (_ : int) -> true
                 | exception Not_found -> false
               in
               Ok (List.find_opt mentions_pattern logs)
             with Failure msg -> Error msg)
      in
      (match log_file_result with
       | Error msg ->
           Error.type_error (Printf.sprintf "read_node: invalid regex pattern for 'which_log': %s" msg)
       | Ok None ->
           let suffix =
             match which_log with
             | Some pat -> " matching \"" ^ pat ^ "\""
             | None -> ""
           in
           Error.make_error FileError
             (Printf.sprintf "No build logs found in `_pipeline/`%s. Run `populate_pipeline(p, build=true)` first." suffix)
       | Ok (Some f) ->
           (match read_log (Filename.concat pipeline_dir f) with
            | Error msg ->
                Error.make_error FileError (Printf.sprintf "Failed to read log `%s`: %s" f msg)
            | Ok entries ->
                (match List.assoc_opt name entries with
                 | None ->
                     Error.make_error KeyError (Printf.sprintf "Node `%s` not found in build log `%s`." name f)
                 | Some cn ->
                     let v = logged_node_value name cn in
                     wrap_with_diagnostics name cn v)))

(** [merge_pipeline_node_diagnostics_with_latest_log ?which_log p] returns one
    diagnostics entry per node of [p], completed from the matching build log.

    For each node the in-memory diagnostics (or the empty diagnostics when
    none are recorded) take precedence: logged warnings are used only when the
    in-memory list is empty, and the logged error only when no in-memory error
    exists. The other fields always come from the in-memory diagnostics. When
    no log matches [p], [p.p_node_diagnostics] is returned unchanged. *)
let merge_pipeline_node_diagnostics_with_latest_log ?which_log (p : Ast.pipeline_result) =
  let prefer_base base overlay =
    let nd_warnings =
      match base.nd_warnings with
      | [] -> overlay.nd_warnings
      | kept -> kept
    in
    let nd_error =
      match base.nd_error with
      | Some _ as kept -> kept
      | None -> overlay.nd_error
    in
    { base with nd_warnings; nd_error }
  in
  match matching_pipeline_log_entries ?which_log p with
  | None -> p.p_node_diagnostics
  | Some entries ->
      let diagnostics_for (name, _) =
        let base =
          Option.value
            (List.assoc_opt name p.p_node_diagnostics)
            ~default:Ast.Utils.empty_node_diagnostics
        in
        match List.assoc_opt name entries with
        | None -> (name, base)
        | Some cn -> (name, prefer_base base (logged_node_diagnostics name cn))
      in
      List.map diagnostics_for p.p_nodes