1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
(* src/arrow/arrow_io.ml *)
(* Arrow-backed CSV reading with column-level type inference. *)
(* When Arrow C GLib is available (Arrow_ffi.arrow_available = true), *)
(* uses GArrowCSVReader for native Arrow CSV parsing via FFI. *)
(* Falls back to pure OCaml CSV parsing otherwise. *)
(* ===================================================================== *)
(* Pure OCaml CSV Parsing (Fallback) *)
(* ===================================================================== *)
(** Split one CSV line into its fields.

    Commas separate fields. A double quote toggles quoted mode, in which
    commas are literal and a doubled double quote stands for one literal
    quote character. The quote characters themselves are not kept. A line
    that ends while still inside quotes yields whatever was accumulated as
    the last field. The result always has at least one element (the empty
    line gives a single empty field). *)
let parse_csv_line (line : string) : string list =
  let n = String.length line in
  let current = Buffer.create 64 in
  (* Push the field accumulated so far onto [acc] and reset the buffer. *)
  let flush acc =
    let field = Buffer.contents current in
    Buffer.clear current;
    field :: acc
  in
  (* [unquoted] scans outside quotes, [quoted] scans inside them; fields are
     collected in reverse order and flipped once at the end. *)
  let rec unquoted pos acc =
    if pos >= n then List.rev (flush acc)
    else
      match line.[pos] with
      | '"' -> quoted (pos + 1) acc
      | ',' -> unquoted (pos + 1) (flush acc)
      | ch ->
        Buffer.add_char current ch;
        unquoted (pos + 1) acc
  and quoted pos acc =
    if pos >= n then List.rev (flush acc)
    else
      match line.[pos] with
      | '"' when pos + 1 < n && line.[pos + 1] = '"' ->
        (* Escaped quote: emit one quote and skip both characters. *)
        Buffer.add_char current '"';
        quoted (pos + 2) acc
      | '"' -> unquoted (pos + 1) acc
      | ch ->
        Buffer.add_char current ch;
        quoted (pos + 1) acc
  in
  unquoted 0 []
(** Break a string into lines on \n, dropping a single trailing \r from each
    piece so that \r\n line endings are handled too. An input that ends with
    a newline produces a final empty line. *)
let split_lines (s : string) : string list =
  let strip_cr piece =
    let n = String.length piece in
    if n = 0 || piece.[n - 1] <> '\r' then piece
    else String.sub piece 0 (n - 1)
  in
  s |> String.split_on_char '\n' |> List.map strip_cr
(** Tell whether a raw cell denotes a missing value: after trimming
    surrounding whitespace it is empty or exactly one of NA, na or N/A
    (the comparison is case-sensitive). *)
let is_na_string (s : string) : bool =
  match String.trim s with
  | "" | "NA" | "na" | "N/A" -> true
  | _ -> false
(** Infer the Arrow type for a column of raw CSV cells.

    Cells recognised by [is_na_string] are ignored. If nothing remains the
    column is [ArrowNA]. Otherwise the first category that accepts every
    remaining (trimmed) cell wins, tried in this order: integer
    ([int_of_string_opt]), float ([float_of_string_opt]), boolean (true or
    false, case-insensitive); anything else is [ArrowString]. *)
let infer_column_type (values : string list) : Arrow_table.arrow_type =
  let present = List.filter (fun cell -> not (is_na_string cell)) values in
  (* [every accepts] holds when all non-NA cells pass [accepts] once trimmed. *)
  let every accepts =
    List.for_all (fun cell -> accepts (String.trim cell)) present
  in
  let is_int text = int_of_string_opt text <> None in
  let is_float text = float_of_string_opt text <> None in
  let is_bool text =
    match String.lowercase_ascii text with
    | "true" | "false" -> true
    | _ -> false
  in
  if present = [] then Arrow_table.ArrowNA
  else if every is_int then Arrow_table.ArrowInt64
  else if every is_float then Arrow_table.ArrowFloat64
  else if every is_bool then Arrow_table.ArrowBoolean
  else Arrow_table.ArrowString
(** Parse a trimmed cell with the %Y-%m-%d format through
    [Chrono.parse_custom_format]. Returns the integer carried by the
    resulting [Ast.VDate], or [None] when the parser yields anything else. *)
let parse_date_value (s : string) : int option =
  let text = String.trim s in
  match Chrono.parse_custom_format `Date text "%Y-%m-%d" None with
  | Some (Ast.VDate days) -> Some days
  | _ -> None
(** Parse a trimmed cell as a datetime by trying a fixed list of formats in
    order (T or space separator, with or without a trailing Z) through
    [Chrono.parse_custom_format], passing [tz] along unchanged. Returns the
    first component of the first [Ast.VDatetime] obtained, or [None] if no
    format matches. *)
let parse_datetime_value (s : string) (tz : string option) : int64 option =
  let text = String.trim s in
  let attempt fmt =
    match Chrono.parse_custom_format `Datetime text fmt tz with
    | Some (Ast.VDatetime (micros, _)) -> Some micros
    | _ -> None
  in
  (* Once a format has succeeded, later formats are no longer attempted. *)
  List.fold_left
    (fun found fmt ->
       match found with
       | Some _ -> found
       | None -> attempt fmt)
    None
    [
      "%Y-%m-%dT%H:%M:%S";
      "%Y-%m-%d %H:%M:%S";
      "%Y-%m-%dT%H:%M:%SZ";
      "%Y-%m-%d %H:%M:%SZ";
    ]
(** Convert the raw cells of one column into typed column data.

    Cells recognised by [is_na_string] become [None] in every column kind.
    Integer, float, boolean and string cells are trimmed before conversion;
    [int_of_string] and [float_of_string] raise [Failure] on a cell that does
    not parse, so [col_type] is expected to agree with the cells. Date and
    timestamp cells that fail to parse become [None]. [ArrowNA] only records
    the row count. Dictionary, list and struct types are not mapped here and
    are stored as untrimmed strings. *)
let build_column (values : string array) (col_type : Arrow_table.arrow_type) : Arrow_table.column_data =
  (* Map every cell through [parse], short-circuiting NA cells to [None]. *)
  let cells parse =
    Array.map (fun raw -> if is_na_string raw then None else parse raw) values
  in
  (* Lift a total conversion on the trimmed text into an option-returning one. *)
  let trimmed convert raw = Some (convert (String.trim raw)) in
  match col_type with
  | Arrow_table.ArrowInt64 ->
    Arrow_table.IntColumn (cells (trimmed int_of_string))
  | Arrow_table.ArrowFloat64 ->
    Arrow_table.FloatColumn (cells (trimmed float_of_string))
  | Arrow_table.ArrowBoolean ->
    Arrow_table.BoolColumn
      (cells (trimmed (fun text -> String.lowercase_ascii text = "true")))
  | Arrow_table.ArrowString ->
    Arrow_table.StringColumn (cells (trimmed (fun text -> text)))
  | Arrow_table.ArrowDate ->
    Arrow_table.DateColumn (cells parse_date_value)
  | Arrow_table.ArrowTimestamp tz ->
    Arrow_table.DatetimeColumn
      (cells (fun raw -> parse_datetime_value raw tz), tz)
  | Arrow_table.ArrowNA ->
    Arrow_table.NAColumn (Array.length values)
  | Arrow_table.ArrowDictionary | Arrow_table.ArrowList _ | Arrow_table.ArrowStruct _ ->
    (* FFI loading of complex types not fully mapped in IO yet *)
    Arrow_table.StringColumn (cells (fun raw -> Some raw))
(** Parse CSV text into an Arrow table (pure OCaml fallback).

    Blank lines are skipped. The first remaining line is the header; each
    later line is split with [parse_csv_line]. Rows whose field count differs
    from the header are dropped silently, unless that leaves no rows at all
    while data lines were present, in which case an [Error] is returned.
    Column types are inferred per column with [infer_column_type].

    Parsing is line-based, so a quoted field containing a newline is not
    supported.

    @return [Ok table] on success ([Arrow_table.empty] for input without any
    non-blank line), or [Error message] when no data row matches the header. *)
let parse_csv_string (content : string) : (Arrow_table.t, string) result =
  let lines =
    split_lines content |> List.filter (fun l -> String.trim l <> "")
  in
  match lines with
  | [] -> Ok Arrow_table.empty
  | header_line :: data_lines ->
    let headers = parse_csv_line header_line in
    let ncols = List.length headers in
    let data_rows = List.map parse_csv_line data_lines in
    (* Rows are stored as arrays so each cell lookup below is O(1); indexing
       list rows with List.nth made column extraction quadratic in the
       number of columns. *)
    let rows =
      data_rows
      |> List.filter (fun row -> List.length row = ncols)
      |> List.map Array.of_list
      |> Array.of_list
    in
    let nrows = Array.length rows in
    if nrows = 0 && data_rows <> [] then
      Error (Printf.sprintf
        "CSV Error: Row column counts do not match header (expected %d columns)" ncols)
    else
      (* Gather each column's raw cells, infer its type, then build it. *)
      let columns = List.mapi (fun col_idx name ->
        let raw_vals =
          Array.init nrows (fun row_idx -> rows.(row_idx).(col_idx))
        in
        let col_type = infer_column_type (Array.to_list raw_vals) in
        (name, build_column raw_vals col_type)
      ) headers in
      Ok (Arrow_table.create columns nrows)
(* ===================================================================== *)
(* Native Arrow CSV Reading (via FFI) *)
(* ===================================================================== *)
(** Read a CSV file through [Arrow_ffi.arrow_read_csv].

    On success the returned pointer is wrapped with
    [Arrow_table.create_from_native], together with the schema and row count
    read back from it. When the FFI call returns [None], an [Error] naming
    the path is returned. *)
let read_csv_native (path : string) : (Arrow_table.t, string) result =
  let wrap native =
    let row_count = Arrow_ffi.arrow_table_num_rows native in
    let native_schema = Arrow_table.schema_from_native_ptr native in
    Arrow_table.create_from_native native native_schema row_count
  in
  match Arrow_ffi.arrow_read_csv path with
  | Some native -> Ok (wrap native)
  | None -> Error (Printf.sprintf "Arrow CSV read failed: %s" path)
(* ===================================================================== *)
(* Public API *)
(* ===================================================================== *)
(** [starts_with prefix s] is [true] exactly when [s] begins with [prefix].
    The empty prefix matches every string. *)
let starts_with prefix s =
  let plen = String.length prefix in
  if String.length s < plen then false
  else
    (* Compare character by character over the prefix length. *)
    let rec same i = i >= plen || (s.[i] = prefix.[i] && same (i + 1)) in
    same 0
(** A path counts as a URL when it begins with the http:// or https://
    scheme prefix (case-sensitive). *)
let is_url path =
  List.exists (fun scheme -> starts_with scheme path) ["http://"; "https://"]
(** Download [url] into a fresh temporary file by shelling out to curl.

    The temporary file is created with [Filename.temp_file] using the prefix
    tlang_ and the given [suffix] (default .csv). Both the file name and the
    URL are passed through [Filename.quote] before being placed on the
    command line.

    @return [Ok path] of the temporary file when curl exits with status 0;
    the caller is responsible for removing it. [Error message] when the
    temporary file cannot be created or curl exits with a non-zero status
    (the temporary file is then removed on a best-effort basis). *)
let download_url ?(suffix=".csv") (url : string) : (string, string) result =
  match Filename.temp_file "tlang_" suffix with
  | exception Sys_error msg ->
    (* Report the failure as a result instead of letting Sys_error escape. *)
    Error ("Download failed: could not create temporary file: " ^ msg)
  | temp_file ->
    let cmd = Printf.sprintf "curl -s -L --fail --max-time 120 --max-filesize 536870912 -o %s %s" (Filename.quote temp_file) (Filename.quote url) in
    (match Sys.command cmd with
     | 0 -> Ok temp_file
     | n ->
       (* Cleanup must not mask the download error if the file is gone. *)
       (try Sys.remove temp_file with Sys_error _ -> ());
       Error (Printf.sprintf "Download failed with exit code %d" n))
(** Read an Arrow IPC file through [Arrow_ffi.arrow_read_ipc].

    A returned pointer is wrapped with [Arrow_table.create_from_native] using
    the row count and schema read back from it; [None] from the FFI call
    becomes an [Error] naming the path. *)
let read_ipc (path : string) : (Arrow_table.t, string) result =
  match Arrow_ffi.arrow_read_ipc path with
  | None -> Error (Printf.sprintf "Arrow IPC read failed: %s" path)
  | Some native ->
    let row_count = Arrow_ffi.arrow_table_num_rows native in
    let native_schema = Arrow_table.schema_from_native_ptr native in
    Ok (Arrow_table.create_from_native native native_schema row_count)
(** Read a local Parquet file through [Arrow_ffi.arrow_read_parquet].

    A returned pointer is wrapped with [Arrow_table.create_from_native] using
    the row count and schema read back from it; [None] from the FFI call
    becomes an [Error] naming the path. *)
let read_parquet_local (path : string) : (Arrow_table.t, string) result =
  let loaded = Arrow_ffi.arrow_read_parquet path in
  match loaded with
  | None -> Error (String.concat "" ["Parquet read failed: "; path])
  | Some native ->
    let row_count = Arrow_ffi.arrow_table_num_rows native in
    let native_schema = Arrow_table.schema_from_native_ptr native in
    Ok (Arrow_table.create_from_native native native_schema row_count)
(** Write a table to an Arrow IPC file.

    The table is first passed through [Arrow_table.materialize]. If the
    result carries a native handle that is not marked freed, its pointer is
    handed to [Arrow_ffi.arrow_write_ipc]; a [false] return becomes an
    [Error] naming the path. Without a usable handle an [Error] is returned
    and nothing is written. *)
let write_ipc (table : Arrow_table.t) (path : string) : (unit, string) result =
  let materialized = Arrow_table.materialize table in
  match materialized.native_handle with
  | Some handle when not handle.freed ->
    (match Arrow_ffi.arrow_write_ipc handle.ptr path with
     | true -> Ok ()
     | false -> Error ("Arrow IPC write failed: " ^ path))
  | _ ->
    Error "Arrow IPC write failed: could not materialize table to native Arrow format"
(** Read a CSV file with the pure OCaml parser.

    The whole file is read into memory and handed to [parse_csv_string].
    The file is opened in binary mode so that the byte count reported by
    [in_channel_length] matches what is actually read on platforms where
    text mode translates line endings; \r\n is handled later by the parser.

    @return the result of [parse_csv_string], or [Error message] when the
    file cannot be opened or read. *)
let read_csv_fallback (path : string) : (Arrow_table.t, string) result =
  try
    let ch = open_in_bin path in
    let content =
      (* Close the channel before propagating any read failure so the file
         descriptor is not leaked. *)
      try really_input_string ch (in_channel_length ch)
      with e -> close_in_noerr ch; raise e
    in
    close_in ch;
    parse_csv_string content
  with
  | Sys_error msg -> Error ("File Error: " ^ msg ^ ".")
  | End_of_file ->
    (* The file held fewer bytes than its reported length. *)
    Error ("File Error: " ^ path ^ ": unexpected end of file.")
(** Read a local CSV file into an Arrow table.

    Returns an [Error] straight away if [path] does not exist. When
    [Arrow_ffi.arrow_available] is set, the native reader is tried first; if
    it fails, a warning is printed on stderr and the pure OCaml parser is
    used instead. Without native support the pure OCaml parser is used
    directly. *)
let read_csv_local (path : string) : (Arrow_table.t, string) result =
  if not (Sys.file_exists path) then
    Error (Printf.sprintf "File Error: %s: No such file or directory." path)
  else if not Arrow_ffi.arrow_available then
    read_csv_fallback path
  else
    match read_csv_native path with
    | Error native_err ->
      (* The native reader can reject input the OCaml parser accepts, so a
         native failure is reported but not fatal. *)
      Printf.eprintf
        "Warning: Native Arrow CSV reader failed (%s). Falling back to the OCaml CSV parser.\n%!"
        native_err;
      read_csv_fallback path
    | Ok _ as loaded -> loaded
(*
--# Read a CSV file
--#
--# Reads a CSV file from a local path or URL into an Arrow table.
--# Supports automatic column type inference.
--#
--# @name read_csv
--# @param path :: String The path or URL to the CSV file.
--# @return :: Result[Table] The loaded Arrow table or an error.
--# @family arrow_io
--# @export
*)
let read_csv (path : string) : (Arrow_table.t, string) result =
  (* Local paths go straight to the local reader; URLs are first downloaded
     to a temporary file, which is removed once reading has finished. *)
  if not (is_url path) then read_csv_local path
  else
    match download_url path with
    | Error msg -> Error msg
    | Ok temp_path ->
      (* Best-effort cleanup: only a failed removal is ignored, and it runs
         even if the reader raises, so the temporary file is never leaked. *)
      let remove_temp () = try Sys.remove temp_path with Sys_error _ -> () in
      (match read_csv_local temp_path with
       | result -> remove_temp (); result
       | exception e -> remove_temp (); raise e)
(** Read a Parquet file from a local path or an http(s) URL.

    A URL is downloaded with [download_url] into a temporary file with the
    .parquet suffix, read with [read_parquet_local], and the temporary file
    is removed afterwards, also when the reader raises. A local path is read
    directly.

    @return the loaded table, or [Error message] when the download or the
    read fails. *)
let read_parquet (path : string) : (Arrow_table.t, string) result =
  if not (is_url path) then read_parquet_local path
  else
    match download_url ~suffix:".parquet" path with
    | Error msg -> Error msg
    | Ok temp_path ->
      (* Best-effort cleanup: only a failed removal is ignored. *)
      let remove_temp () = try Sys.remove temp_path with Sys_error _ -> () in
      (match read_parquet_local temp_path with
       | result -> remove_temp (); result
       | exception e -> remove_temp (); raise e)
(* ===================================================================== *)
(* CSV Writing *)
(* ===================================================================== *)
(** Quote a field for CSV output when needed.

    The field is wrapped in double quotes, with embedded double quotes
    doubled, if it contains a double quote, a line feed, a carriage return,
    or the first character of [sep]. Only the first character of a
    multi-character separator is checked, which can quote more often than
    strictly necessary but never less. Other fields are returned unchanged.
    An empty [sep] never triggers quoting by itself. *)
let csv_quote_string ~sep s =
  let has_sep = String.length sep > 0 && String.contains s sep.[0] in
  (* A bare carriage return must be quoted as well: an unquoted one at the
     end of a row would be taken for part of the line ending when the file
     is read back. *)
  let needs_quoting =
    has_sep
    || String.contains s '"'
    || String.contains s '\n'
    || String.contains s '\r'
  in
  if needs_quoting then
    "\"" ^ String.concat "\"\"" (String.split_on_char '"' s) ^ "\""
  else s
(** Render one cell value as the text of a CSV field.

    Integers and floats use [string_of_int] and [string_of_float]; booleans
    become true or false; strings and factor levels go through
    [csv_quote_string]; missing values and factor indices without a matching
    level become NA. Every other kind of value is written as an empty
    field. *)
let value_to_csv_field ~sep value =
  let quote = csv_quote_string ~sep in
  match value with
  | Ast.VNA _ -> "NA"
  | Ast.VBool true -> "true"
  | Ast.VBool false -> "false"
  | Ast.VInt n -> string_of_int n
  | Ast.VFloat f -> string_of_float f
  | Ast.VString s -> quote s
  | Ast.VFactor (idx, levels, _) ->
    (match List.nth_opt levels idx with
     | None -> "NA"
     | Some level -> quote level)
  | _ -> ""
(** Write an Arrow table to a CSV file *)
(*
--# Write Table to CSV
--#
--# Writes an Arrow table to a CSV file.
--#
--# @name write_csv
--# @param table :: Table The table to write.
--# @param path :: String The output file path.
--# @param sep :: String (Optional) The column separator, defaults to ",".
--# @return :: Result[Unit] Ok(()) or an error.
--# @family arrow_io
--# @export
*)
let write_csv ?(sep=",") (table : Arrow_table.t) (path : string) : (unit, string) result =
  try
    let ch = open_out path in
    (* Whatever goes wrong while writing, release the channel before the
       exception travels further; Sys_error is then reported as a result by
       the outer handler. *)
    (try
       let col_names = Arrow_table.column_names table in
       let nrows = Arrow_table.num_rows table in
       (* Write header. Names are quoted like data fields so that a name
          containing the separator or a quote cannot break the row. *)
       output_string ch
         (String.concat sep (List.map (csv_quote_string ~sep) col_names));
       output_char ch '\n';
       (* Write data rows, one line per row index across all columns. *)
       let value_columns = Arrow_bridge.table_to_value_columns table in
       for row_idx = 0 to nrows - 1 do
         let row_values = List.map (fun (_name, col_data) ->
           value_to_csv_field ~sep col_data.(row_idx)
         ) value_columns in
         output_string ch (String.concat sep row_values);
         output_char ch '\n'
       done;
       close_out ch;
       Ok ()
     with e -> close_out_noerr ch; raise e)
  with
  | Sys_error msg -> Error ("File Error: " ^ msg)