1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
(* src/arrow/arrow_bridge.ml *)
(* Conversion between Arrow column_data and T runtime values (Ast.value). *)
(* This module bridges the Arrow-backed storage with T's value system. *)
open Ast
(** Convert an Arrow column to a T value array *)
let column_to_values (col : Arrow_table.column_data) : value array =
match col with
| Arrow_table.IntColumn a ->
Array.map (fun v -> match v with Some i -> VInt i | None -> VNA NAInt) a
| Arrow_table.FloatColumn a ->
Array.map (fun v -> match v with Some f -> VFloat f | None -> VNA NAFloat) a
| Arrow_table.BoolColumn a ->
Array.map (fun v -> match v with Some b -> VBool b | None -> VNA NABool) a
| Arrow_table.StringColumn a ->
Array.map (fun v -> match v with Some s -> VString s | None -> VNA NAString) a
| Arrow_table.DateColumn a ->
Array.map (fun v -> match v with Some d -> VDate d | None -> VNA NADate) a
| Arrow_table.DatetimeColumn (a, tz) ->
Array.map (fun v -> match v with Some ts -> VDatetime (ts, tz) | None -> VNA NADate) a
| Arrow_table.NAColumn n ->
Array.make n ((VNA NAGeneric))
| Arrow_table.DictionaryColumn (a, levels, ordered) ->
Array.map (fun v -> match v with Some i -> VFactor (i, levels, ordered) | None -> (VNA NAGeneric)) a
| Arrow_table.ListColumn a ->
Array.map (function Some t -> VDataFrame { arrow_table = t; group_keys = [] } | None -> (VNA NAGeneric)) a
(** Extract a single value from an Arrow column at a given row index *)
let value_at (col : Arrow_table.column_data) (row : int) : value =
match col with
| Arrow_table.IntColumn a -> (match a.(row) with Some i -> VInt i | None -> VNA NAInt)
| Arrow_table.FloatColumn a -> (match a.(row) with Some f -> VFloat f | None -> VNA NAFloat)
| Arrow_table.BoolColumn a -> (match a.(row) with Some b -> VBool b | None -> VNA NABool)
| Arrow_table.StringColumn a -> (match a.(row) with Some s -> VString s | None -> VNA NAString)
| Arrow_table.DateColumn a -> (match a.(row) with Some d -> VDate d | None -> VNA NADate)
| Arrow_table.DatetimeColumn (a, tz) -> (match a.(row) with Some ts -> VDatetime (ts, tz) | None -> VNA NADate)
| Arrow_table.NAColumn _ -> (VNA NAGeneric)
| Arrow_table.DictionaryColumn (a, levels, ordered) ->
(match a.(row) with Some i -> VFactor (i, levels, ordered) | None -> (VNA NAGeneric))
| Arrow_table.ListColumn a ->
(match a.(row) with Some t -> VDataFrame { arrow_table = t; group_keys = [] } | None -> (VNA NAGeneric))
let values_to_column (values : value array) : Arrow_table.column_data =
(* Infer column type from non-NA values *)
let has_int = ref false in
let has_float = ref false in
let has_bool = ref false in
let has_string = ref false in
let has_date = ref false in
let has_datetime = ref false in
let has_factor = ref false in
let factor_levels = ref [] in
let factor_ordered = ref false in
let factor_inconsistent = ref false in
let has_dataframe = ref false in
let all_na = ref true in
Array.iter (fun v ->
match v with
| VInt _ -> has_int := true; all_na := false
| VFloat _ -> has_float := true; all_na := false
| VBool _ -> has_bool := true; all_na := false
| VString _ -> has_string := true; all_na := false
| VDate _ -> has_date := true; all_na := false
| VDatetime _ -> has_datetime := true; all_na := false
| VDataFrame _ -> has_dataframe := true; all_na := false
| VFactor (_, levels, ordered) ->
all_na := false;
(match !factor_levels with
| [] ->
has_factor := true;
factor_levels := levels;
factor_ordered := ordered
| existing when existing <> levels ->
(* Inconsistent level sets across factor values; fall back to string *)
factor_inconsistent := true
| _ ->
has_factor := true;
if not !factor_ordered then factor_ordered := ordered)
| VNA _ -> ()
| _ -> has_string := true; all_na := false (* fallback to string *)
) values;
if !all_na then
Arrow_table.NAColumn (Array.length values)
else if !has_dataframe then
if !has_int || !has_float || !has_bool || !has_string || !has_date || !has_datetime || !has_factor || !factor_inconsistent then
raise (Invalid_argument "values_to_column: mixed DataFrame and non-DataFrame values cannot be stored in a single column")
else
Arrow_table.ListColumn (Array.map (function
| VDataFrame df -> Some df.arrow_table
| VNA _ -> None
| _ -> None
) values)
else if !has_factor && not !factor_inconsistent then
Arrow_table.DictionaryColumn (Array.map (function
| VFactor (i, _, _) -> Some i
| VNA _ -> None
| _ -> None
) values, !factor_levels, !factor_ordered)
else if !has_datetime && not (!has_int || !has_float || !has_bool || !has_string || !has_date || !has_factor) then
let tz =
Array.fold_left (fun acc v ->
match acc, v with
| Some tz, _ -> Some tz
| None, VDatetime (_, tz) -> tz
| None, _ -> None
) None values
in
Arrow_table.DatetimeColumn (Array.map (function
| VDatetime (ts, _) -> Some ts
| VNA _ -> None
| _ -> None
) values, tz)
else if !has_date && not (!has_int || !has_float || !has_bool || !has_string || !has_datetime || !has_factor) then
Arrow_table.DateColumn (Array.map (function
| VDate d -> Some d
| VNA _ -> None
| _ -> None
) values)
else if !has_string || !factor_inconsistent then
Arrow_table.StringColumn (Array.map (fun v ->
match v with
| VString s -> Some s
| VFactor (i, levels, _) -> (match List.nth_opt levels i with Some s -> Some s | None -> None)
| VNA _ -> None
| v -> Some (Utils.value_to_string v)
) values)
else if !has_float then
Arrow_table.FloatColumn (Array.map (function
| VFloat f -> Some f
| VInt i -> Some (float_of_int i)
| VNA _ -> None
| _ -> None
) values)
else if !has_int then
Arrow_table.IntColumn (Array.map (function
| VInt i -> Some i
| VNA _ -> None
| _ -> None
) values)
else if !has_bool then
Arrow_table.BoolColumn (Array.map (function
| VBool b -> Some b
| VNA _ -> None
| _ -> None
) values)
else
Arrow_table.NAColumn (Array.length values)
(** Extract a row from an Arrow table as a T Dict (list of name-value pairs).
For native-backed tables, extracts column data via FFI as needed. *)
let row_to_dict (table : Arrow_table.t) (row_idx : int) : (string * value) list =
let get_col_data name =
match Arrow_table.get_column table name with
| Some col -> col
| None -> Arrow_table.NAColumn table.nrows
in
List.map (fun (name, _) ->
let col = get_col_data name in
let v = match col with
| Arrow_table.IntColumn a ->
(match a.(row_idx) with Some i -> VInt i | None -> VNA NAInt)
| Arrow_table.FloatColumn a ->
(match a.(row_idx) with Some f -> VFloat f | None -> VNA NAFloat)
| Arrow_table.BoolColumn a ->
(match a.(row_idx) with Some b -> VBool b | None -> VNA NABool)
| Arrow_table.StringColumn a ->
(match a.(row_idx) with Some s -> VString s | None -> VNA NAString)
| Arrow_table.DateColumn a ->
(match a.(row_idx) with Some d -> VDate d | None -> VNA NADate)
| Arrow_table.DatetimeColumn (a, tz) ->
(match a.(row_idx) with Some ts -> VDatetime (ts, tz) | None -> VNA NADate)
| Arrow_table.NAColumn _ -> (VNA NAGeneric)
| Arrow_table.DictionaryColumn (a, levels, ordered) ->
(match a.(row_idx) with Some i -> VFactor (i, levels, ordered) | None -> (VNA NAGeneric))
| Arrow_table.ListColumn a ->
(match a.(row_idx) with Some t -> VDataFrame { arrow_table = t; group_keys = [] } | None -> (VNA NAGeneric))
in
(name, v)
) table.schema
(** Create an Arrow table from T value columns *)
let table_from_value_columns (columns : (string * value array) list) (nrows : int) : Arrow_table.t =
let arrow_columns = List.map (fun (name, values) ->
(name, values_to_column values)
) columns in
Arrow_table.create arrow_columns nrows |> Arrow_table.materialize
(** Convert an Arrow table back to T value columns.
For native-backed tables, extracts column data via FFI as needed. *)
let table_to_value_columns (table : Arrow_table.t) : (string * value array) list =
List.map (fun (name, _) ->
let col = match Arrow_table.get_column table name with
| Some c -> c
| None -> Arrow_table.NAColumn table.nrows
in
(name, column_to_values col)
) table.schema
(** Recursively prepare a T value for serialization across process boundaries.
Ensures any DataFrames are materialized and native pointers are cleared. *)
let rec prepare_value_for_serialization (v : value) : value =
match v with
| VDataFrame df ->
VDataFrame { df with arrow_table = Arrow_table.prepare_for_serialization df.arrow_table }
| VList items ->
VList (List.map (fun (n, v) -> (n, prepare_value_for_serialization v)) items)
| VDict pairs ->
VDict (List.map (fun (k, v) -> (k, prepare_value_for_serialization v)) pairs)
| VVector arr ->
VVector (Array.map prepare_value_for_serialization arr)
| VPipeline p ->
(* Pipelines store node results, build env vars, and runtime args — cleanse all value-bearing fields *)
let p_nodes = List.map (fun (n, v) -> (n, prepare_value_for_serialization v)) p.p_nodes in
let p_env_vars = List.map (fun (node, vars) ->
(node, List.map (fun (k, v) -> (k, prepare_value_for_serialization v)) vars)
) p.p_env_vars in
let p_args = List.map (fun (node, args) ->
(node, List.map (fun (k, v) -> (k, prepare_value_for_serialization v)) args)
) p.p_args in
VPipeline { p with p_nodes; p_env_vars; p_args }
| VNode un ->
(* Unbuilt nodes carry env_vars and args that may contain DataFrames *)
let un_env_vars = List.map (fun (k, v) -> (k, prepare_value_for_serialization v)) un.un_env_vars in
let un_args = List.map (fun (k, v) -> (k, prepare_value_for_serialization v)) un.un_args in
VNode { un with un_env_vars; un_args }
| _ -> v