1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
(* src/arrow/arrow_bridge.ml *)
(* Conversion between Arrow column_data and T runtime values (Ast.value). *)
(* This module bridges the Arrow-backed storage with T's value system.    *)

open Ast

(** [column_to_values col] expands an Arrow column into an array of T runtime
    values, one entry per cell. A missing cell becomes a [VNA] tagged with the
    NA kind used for that column variant: typed kinds for int, float, bool,
    string and date columns, [NADate] for datetime columns, and [NAGeneric]
    for all-NA, dictionary and list columns. *)
let column_to_values (col : Arrow_table.column_data) : value array =
  (* [lift wrap na cells] maps every cell: [wrap] builds the value for a
     present cell, [na] stands in for a missing one. *)
  let lift wrap na cells =
    Array.map (function Some x -> wrap x | None -> na) cells
  in
  match col with
  | Arrow_table.IntColumn cells ->
      lift (fun i -> VInt i) (VNA NAInt) cells
  | Arrow_table.FloatColumn cells ->
      lift (fun f -> VFloat f) (VNA NAFloat) cells
  | Arrow_table.BoolColumn cells ->
      lift (fun b -> VBool b) (VNA NABool) cells
  | Arrow_table.StringColumn cells ->
      lift (fun s -> VString s) (VNA NAString) cells
  | Arrow_table.DateColumn cells ->
      lift (fun d -> VDate d) (VNA NADate) cells
  | Arrow_table.DatetimeColumn (cells, tz) ->
      (* Every datetime value carries the column's timezone. *)
      lift (fun ts -> VDatetime (ts, tz)) (VNA NADate) cells
  | Arrow_table.NAColumn len ->
      Array.make len (VNA NAGeneric)
  | Arrow_table.DictionaryColumn (codes, levels, ordered) ->
      lift (fun code -> VFactor (code, levels, ordered)) (VNA NAGeneric) codes
  | Arrow_table.ListColumn tables ->
      (* Nested tables are wrapped as ungrouped DataFrames. *)
      lift
        (fun t -> VDataFrame { arrow_table = t; group_keys = [] })
        (VNA NAGeneric)
        tables

(** [value_at col row] returns the T runtime value stored at index [row] of
    [col]. A missing cell yields the [VNA] kind used for that column variant.
    For an all-NA column the row index is not consulted; for every other
    variant the cell is read with a plain array access. *)
let value_at (col : Arrow_table.column_data) (row : int) : value =
  (* [cell wrap na c] converts one optional cell into a value. *)
  let cell wrap na = function
    | Some x -> wrap x
    | None -> na
  in
  match col with
  | Arrow_table.IntColumn cells ->
      cell (fun i -> VInt i) (VNA NAInt) cells.(row)
  | Arrow_table.FloatColumn cells ->
      cell (fun f -> VFloat f) (VNA NAFloat) cells.(row)
  | Arrow_table.BoolColumn cells ->
      cell (fun b -> VBool b) (VNA NABool) cells.(row)
  | Arrow_table.StringColumn cells ->
      cell (fun s -> VString s) (VNA NAString) cells.(row)
  | Arrow_table.DateColumn cells ->
      cell (fun d -> VDate d) (VNA NADate) cells.(row)
  | Arrow_table.DatetimeColumn (cells, tz) ->
      cell (fun ts -> VDatetime (ts, tz)) (VNA NADate) cells.(row)
  | Arrow_table.NAColumn _ ->
      VNA NAGeneric
  | Arrow_table.DictionaryColumn (codes, levels, ordered) ->
      cell (fun code -> VFactor (code, levels, ordered)) (VNA NAGeneric) codes.(row)
  | Arrow_table.ListColumn tables ->
      cell
        (fun t -> VDataFrame { arrow_table = t; group_keys = [] })
        (VNA NAGeneric)
        tables.(row)
(** [values_to_column values] builds an Arrow column from T runtime values,
    inferring the column type from the non-NA entries.

    Selection, in priority order:
    - every entry is [VNA] (or [values] is empty): [NAColumn];
    - DataFrames, optionally with NAs: [ListColumn];
    - factors that all share one level list: [DictionaryColumn] (the column is
      ordered if any of the factors is);
    - datetimes only: [DatetimeColumn], using the first non-[None] timezone;
    - dates only: [DateColumn];
    - a string, any value kind without a dedicated column type, or factors
      with differing level lists: [StringColumn];
    - floats: [FloatColumn] (ints are widened); then [IntColumn], [BoolColumn];
    - a mix of dates and datetimes: [StringColumn].

    For the non-string column types, entries that do not match the chosen type
    are stored as missing.

    @raise Invalid_argument if DataFrame values are mixed with other non-NA
    values. *)
let values_to_column (values : value array) : Arrow_table.column_data =
  (* Infer column type from non-NA values *)
  let has_int = ref false in
  let has_float = ref false in
  let has_bool = ref false in
  let has_string = ref false in
  let has_date = ref false in
  let has_datetime = ref false in
  let has_factor = ref false in
  let factor_levels = ref [] in
  let factor_ordered = ref false in
  let factor_inconsistent = ref false in
  let has_dataframe = ref false in
  let all_na = ref true in
  Array.iter (fun v ->
    match v with
    | VInt _ -> has_int := true; all_na := false
    | VFloat _ -> has_float := true; all_na := false
    | VBool _ -> has_bool := true; all_na := false
    | VString _ -> has_string := true; all_na := false
    | VDate _ -> has_date := true; all_na := false
    | VDatetime _ -> has_datetime := true; all_na := false
    | VDataFrame _ -> has_dataframe := true; all_na := false
    | VFactor (_, levels, ordered) ->
        all_na := false;
        (match !factor_levels with
         | [] ->
             has_factor := true;
             factor_levels := levels;
             factor_ordered := ordered
         | existing when existing <> levels ->
             (* Inconsistent level sets across factor values; fall back to string *)
             factor_inconsistent := true
         | _ ->
             has_factor := true;
             if not !factor_ordered then factor_ordered := ordered)
    | VNA _ -> ()
    | _ -> has_string := true; all_na := false  (* fallback to string *)
  ) values;
  (* Render one value as a string cell; shared by both string fallbacks. *)
  let string_cell = function
    | VString s -> Some s
    | VFactor (i, levels, _) ->
        (* List.nth_opt raises on a negative index; treat such a code as
           missing, like any other out-of-range code. *)
        if i < 0 then None else List.nth_opt levels i
    | VNA _ -> None
    | v -> Some (Utils.value_to_string v)
  in
  if !all_na then
    Arrow_table.NAColumn (Array.length values)
  else if !has_dataframe then begin
    if !has_int || !has_float || !has_bool || !has_string || !has_date || !has_datetime || !has_factor || !factor_inconsistent then
      raise (Invalid_argument "values_to_column: mixed DataFrame and non-DataFrame values cannot be stored in a single column")
    else
      Arrow_table.ListColumn (Array.map (function
        | VDataFrame df -> Some df.arrow_table
        | _ -> None
      ) values)
  end
  else if !has_factor && not !factor_inconsistent then
    Arrow_table.DictionaryColumn (Array.map (function
      | VFactor (i, _, _) -> Some i
      | _ -> None
    ) values, !factor_levels, !factor_ordered)
  else if !has_datetime && not (!has_int || !has_float || !has_bool || !has_string || !has_date || !has_factor) then
    (* Use the first timezone that is actually set among the datetimes. *)
    let tz =
      Array.fold_left (fun acc v ->
        match acc, v with
        | Some tz, _ -> Some tz
        | None, VDatetime (_, tz) -> tz
        | None, _ -> None
      ) None values
    in
    Arrow_table.DatetimeColumn (Array.map (function
      | VDatetime (ts, _) -> Some ts
      | _ -> None
    ) values, tz)
  else if !has_date && not (!has_int || !has_float || !has_bool || !has_string || !has_datetime || !has_factor) then
    Arrow_table.DateColumn (Array.map (function
      | VDate d -> Some d
      | _ -> None
    ) values)
  else if !has_string || !factor_inconsistent then
    Arrow_table.StringColumn (Array.map string_cell values)
  else if !has_float then
    Arrow_table.FloatColumn (Array.map (function
      | VFloat f -> Some f
      | VInt i -> Some (float_of_int i)
      | _ -> None
    ) values)
  else if !has_int then
    Arrow_table.IntColumn (Array.map (function
      | VInt i -> Some i
      | _ -> None
    ) values)
  else if !has_bool then
    Arrow_table.BoolColumn (Array.map (function
      | VBool b -> Some b
      | _ -> None
    ) values)
  else
    (* Only reachable when dates and datetimes are mixed with nothing else:
       neither temporal column type can hold both, so keep the data as
       strings instead of discarding it into an all-NA column. *)
    Arrow_table.StringColumn (Array.map string_cell values)

(** [row_to_dict table row_idx] extracts one row of [table] as a T Dict: a
    list of [(column name, value)] pairs in schema order.

    Each column is fetched with [Arrow_table.get_column]; a column listed in
    the schema but not returned by that lookup is treated as an all-NA column.
    Cell conversion is delegated to [value_at], so both functions always agree
    on how each column variant and its missing cells are represented. *)
let row_to_dict (table : Arrow_table.t) (row_idx : int) : (string * value) list =
  List.map (fun (name, _) ->
    let col =
      match Arrow_table.get_column table name with
      | Some col -> col
      | None -> Arrow_table.NAColumn table.nrows
    in
    (name, value_at col row_idx)
  ) table.schema

(** [table_from_value_columns columns nrows] converts each named array of T
    values into an Arrow column with [values_to_column], assembles the columns
    into a table of [nrows] rows via [Arrow_table.create], and passes the
    result through [Arrow_table.materialize]. *)
let table_from_value_columns (columns : (string * value array) list) (nrows : int) : Arrow_table.t =
  let to_arrow (col_name, cells) = (col_name, values_to_column cells) in
  let arrow_columns = List.map to_arrow columns in
  Arrow_table.materialize (Arrow_table.create arrow_columns nrows)

(** [table_to_value_columns table] converts every column named in the table's
    schema into an array of T values, returned as [(name, values)] pairs in
    schema order. Columns are fetched with [Arrow_table.get_column]; a column
    that lookup does not return is rendered as an all-NA column of
    [table.nrows] entries. *)
let table_to_value_columns (table : Arrow_table.t) : (string * value array) list =
  let values_of name =
    match Arrow_table.get_column table name with
    | None -> column_to_values (Arrow_table.NAColumn table.nrows)
    | Some col -> column_to_values col
  in
  table.schema |> List.map (fun (name, _) -> (name, values_of name))

(** [prepare_value_for_serialization v] returns a copy of [v] in which every
    reachable DataFrame has had its Arrow table passed through
    [Arrow_table.prepare_for_serialization]. The traversal descends into
    lists, dicts, vectors, pipelines (node results, env vars, args) and
    unbuilt nodes (env vars, args); any other value is returned unchanged. *)
let rec prepare_value_for_serialization (v : value) : value =
  (* Clean the value side of an association list, keeping keys as they are. *)
  let scrub_pairs pairs =
    List.map (fun (key, item) -> (key, prepare_value_for_serialization item)) pairs
  in
  (* Same, for per-node association lists nested one level deeper. *)
  let scrub_grouped groups =
    List.map (fun (node, pairs) -> (node, scrub_pairs pairs)) groups
  in
  match v with
  | VDataFrame df ->
      let cleaned = Arrow_table.prepare_for_serialization df.arrow_table in
      VDataFrame { df with arrow_table = cleaned }
  | VList items -> VList (scrub_pairs items)
  | VDict pairs -> VDict (scrub_pairs pairs)
  | VVector arr -> VVector (Array.map prepare_value_for_serialization arr)
  | VPipeline p ->
      (* Every value-bearing pipeline field is cleaned: node results first,
         then build env vars, then runtime args. *)
      let nodes = scrub_pairs p.p_nodes in
      let env_vars = scrub_grouped p.p_env_vars in
      let args = scrub_grouped p.p_args in
      VPipeline { p with p_nodes = nodes; p_env_vars = env_vars; p_args = args }
  | VNode un ->
      (* Unbuilt nodes carry env vars and args that may contain DataFrames. *)
      let env_vars = scrub_pairs un.un_env_vars in
      let args = scrub_pairs un.un_args in
      VNode { un with un_env_vars = env_vars; un_args = args }
  | _ -> v