1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
open Ast
(* The join flavours that [join_impl] dispatches on. The per-constructor notes
   describe what [join_impl] in this file does for each kind. *)
type join_kind =
| Left (* every left row is kept; left rows without a match get NA in the right-hand columns *)
| Inner (* only left rows with at least one matching right row are emitted *)
| Full (* as Left, then right rows that matched nothing are appended; group keys are dropped *)
| Semi (* left rows having at least one match, emitted once each, left columns only *)
| Anti (* left rows having no match at all, left columns only *)
(* [positional_args named_args] keeps the values of the entries whose label is
   [None] (i.e. the arguments passed without a name), in their original order. *)
let positional_args named_args =
  named_args
  |> List.filter (fun (label, _) -> Option.is_none label)
  |> List.map snd
(* [find_named_arg name named_args] returns the value of the first entry whose
   label is [Some name], or [None] when no entry carries that label. *)
let find_named_arg name named_args =
  let is_labelled (label, _) = label = Some name in
  named_args
  |> List.find_opt is_labelled
  |> Option.map snd
(* Converts a join-key value into a list of column names.

   - a string yields a single name;
   - a symbol starting with `$` yields the symbol text without that prefix;
   - a Vector or List yields one name per element, as decided by
     [Utils.extract_column_name]; if any element is rejected the whole
     conversion fails with a type error;
   - anything else is a type error. *)
let string_list_of_value value =
  (* Shared by the Vector and List cases: all-or-nothing name extraction. *)
  let names_of items =
    let extracted = List.map Utils.extract_column_name items in
    if List.for_all Option.is_some extracted then
      Ok (List.filter_map Fun.id extracted)
    else
      Error (Error.type_error "Join keys must be strings or symbols.")
  in
  match value with
  | VString s -> Ok [s]
  | VSymbol sym when String.starts_with ~prefix:"$" sym ->
    (* Drop the leading `$`. *)
    Ok [String.sub sym 1 (String.length sym - 1)]
  | VVector arr -> names_of (Array.to_list arr)
  | VList entries -> names_of (List.map snd entries)
  | _ -> Error (Error.type_error "Join keys must be a string, symbol, List, or Vector.")
(* Column names of [left] that also occur in [right], in [left]'s order. *)
let common_columns left right =
  let in_right =
    let right_names = Arrow_table.column_names right in
    fun name -> List.mem name right_names
  in
  List.filter in_right (Arrow_table.column_names left)
(* Resolves the list of join-key column names for a join of [left] and [right].

   The key specification is taken from the named `by` argument when present,
   otherwise from the third positional argument when there are exactly three.
   With no specification the keys default to the columns shared by both
   tables, and it is an error for that set to be empty.

   Returns [Ok names] when every name exists in both tables; otherwise
   [Error e], where [e] is the error value built by the [Error] module
   (missing names on the left are reported before missing names on the right). *)
let parse_by named_args left right =
  let by_value =
    match find_named_arg "by" named_args, positional_args named_args with
    | (Some _ as named), _ -> named
    | None, [_; _; third] -> Some third
    | None, _ -> None
  in
  let resolved =
    match by_value with
    | Some value -> string_list_of_value value
    | None ->
      (match common_columns left right with
       | [] ->
         Error
           (Error.value_error
              "Join requires at least one shared column, or an explicit `by` argument.")
       | shared -> Ok shared)
  in
  (* Names from [names] that [table] does not have as a column. *)
  let absent_from table names =
    List.filter (fun name -> not (Arrow_table.has_column table name)) names
  in
  match resolved with
  | Error _ as failure -> failure
  | Ok by ->
    (match absent_from left by, absent_from right by with
     | [], [] -> Ok by
     | (_ :: _ as missing), _ ->
       Error
         (Error.make_error KeyError
            (Printf.sprintf
               "Join key(s) not found in left DataFrame: %s."
               (String.concat ", " missing)))
     | [], (_ :: _ as missing) ->
       Error
         (Error.make_error KeyError
            (Printf.sprintf
               "Join key(s) not found in right DataFrame: %s."
               (String.concat ", " missing))))
(* Materialises [table] as an array with one entry per row, each produced by
   [Arrow_bridge.row_to_dict]. *)
let table_rows table =
  let row_at idx = Arrow_bridge.row_to_dict table idx in
  Array.init (Arrow_table.num_rows table) row_at
(* Looks [name] up in the association list [row]; a missing entry reads as the
   generic NA value. *)
let assoc_value row name =
  Option.value (List.assoc_opt name row) ~default:(VNA NAGeneric)
(* Builds the lookup key of [row] for the key columns [by]: the string
   rendering of each key cell, joined with the unit-separator character (0x1f). *)
let key_of_row by row =
  let render name = Ast.Utils.value_to_string (assoc_value row name) in
  String.concat "\x1f" (List.map render by)
(* [make_name_unique used base] picks a name not yet listed in [!used]:
   [base] itself when free, otherwise the first free name among
   [base_1], [base_2], ... The chosen name is prepended to [!used] and returned. *)
let make_name_unique used base =
  let taken name = List.mem name !used in
  let rec first_free suffix =
    let candidate = Printf.sprintf "%s_%d" base suffix in
    if taken candidate then first_free (suffix + 1) else candidate
  in
  let chosen = if taken base then first_free 1 else base in
  used := chosen :: !used;
  chosen
(* Decides which right-hand columns appear in a join result and under which
   name. Returns [(source_name, output_name)] pairs in [right_names] order.

   Key columns ([by]) are skipped. A right column that clashes with a left
   column gets a `_y` suffix, and every output name is then made unique against
   the left names and the output names already handed out. *)
let right_projection left_names right_names by =
  let taken = ref left_names in
  let project name =
    if List.mem name by then
      None
    else begin
      let preferred = if List.mem name left_names then name ^ "_y" else name in
      Some (name, make_name_unique taken preferred)
    end
  in
  (* Visits names left to right, so earlier columns claim names first. *)
  List.filter_map project right_names
(* Builds one output row from [left_row] and an optional matching right row.

   The result lists the left columns ([left_names]) followed by the projected
   right columns ([right_projection], as [(source_name, output_name)] pairs).
   Without a right row every right cell is the generic NA. A left cell that is
   the generic NA and belongs to a key column in [by] is taken from the right
   row instead, when there is one. *)
let merge_left_right ~left_names ~right_projection ~by left_row right_row_opt =
  let left_cell name =
    let value = assoc_value left_row name in
    match right_row_opt with
    | Some right_row when value = VNA NAGeneric && List.mem name by ->
      (* Backfill a missing key from the right-hand side. *)
      (name, assoc_value right_row name)
    | Some _ | None -> (name, value)
  in
  let right_cell (source_name, output_name) =
    let value =
      match right_row_opt with
      | None -> VNA NAGeneric
      | Some right_row -> assoc_value right_row source_name
    in
    (output_name, value)
  in
  List.map left_cell left_names @ List.map right_cell right_projection
(* Assembles a DataFrame value from association-list rows.

   One column is built per name in [column_order]; a row lacking that name
   contributes the generic NA. [group_keys] (default: none) is restricted to
   the names that are actually present in [column_order]. *)
let rows_to_dataframe ?(group_keys=[]) column_order rows =
  let cell name row =
    match List.assoc_opt name row with
    | None -> VNA NAGeneric
    | Some value -> value
  in
  let column_of name = (name, rows |> List.map (cell name) |> Array.of_list) in
  let columns = List.map column_of column_order in
  let kept_keys = List.filter (fun key -> List.mem key column_order) group_keys in
  let arrow_table =
    Arrow_bridge.table_from_value_columns columns (List.length rows)
  in
  VDataFrame { arrow_table; group_keys = kept_keys }
(* Shared implementation of the join builtins.

   [kind] selects the join flavour, [named_args] holds the call arguments as
   (optional label, value) pairs, and the environment argument is ignored.
   The first two positional arguments must be DataFrames; an optional third
   positional argument (or a named `by`) gives the key columns, see [parse_by].

   Right rows are indexed by their key string, then left rows are scanned in
   order. Output rows follow left-row order and, within one left row, the
   order of the matching right rows. For a full join the right rows that
   matched nothing are appended at the end. The left group keys are kept
   except for a full join. Argument problems are returned as error values. *)
let join_impl kind named_args _env =
  match positional_args named_args with
  | [VDataFrame left; VDataFrame right]
  | [VDataFrame left; VDataFrame right; _] ->
    (match parse_by named_args left.arrow_table right.arrow_table with
     | Error err -> err
     | Ok by ->
       let left_names = Arrow_table.column_names left.arrow_table in
       let right_names = Arrow_table.column_names right.arrow_table in
       let projection = right_projection left_names right_names by in
       let left_rows = table_rows left.arrow_table in
       let right_rows = table_rows right.arrow_table in
       (* Key string -> indices of the right rows carrying it, newest first. *)
       let right_index = Hashtbl.create 32 in
       Array.iteri
         (fun idx row ->
            let key = key_of_row by row in
            let seen = Option.value ~default:[] (Hashtbl.find_opt right_index key) in
            Hashtbl.replace right_index key (idx :: seen))
         right_rows;
       (* Tracks which right rows were paired, for the full-join tail. *)
       let matched_right = Array.make (Array.length right_rows) false in
       let emitted = ref [] in
       let emit row = emitted := row :: !emitted in
       let left_only left_row =
         List.map (fun name -> (name, assoc_value left_row name)) left_names
       in
       let combine left_row right_row_opt =
         merge_left_right ~left_names ~right_projection:projection ~by left_row right_row_opt
       in
       Array.iter
         (fun left_row ->
            let hits =
              match Hashtbl.find_opt right_index (key_of_row by left_row) with
              | None -> []
              | Some newest_first -> List.rev newest_first
            in
            match kind, hits with
            | (Anti, []) | (Semi, _ :: _) -> emit (left_only left_row)
            | (Anti, _ :: _) | (Semi, []) | (Inner, []) -> ()
            | (Left | Full), [] -> emit (combine left_row None)
            | (Left | Inner | Full), _ :: _ ->
              List.iter
                (fun idx ->
                   matched_right.(idx) <- true;
                   emit (combine left_row (Some (right_rows.(idx)))))
                hits)
         left_rows;
       let in_left_order = List.rev !emitted in
       let joined_rows =
         match kind with
         | Left | Inner | Semi | Anti -> in_left_order
         | Full ->
           (* A left row made only of the right row's key cells. *)
           let stub_for right_row =
             List.map
               (fun name ->
                  let value =
                    if List.mem name by then assoc_value right_row name
                    else VNA NAGeneric
                  in
                  (name, value))
               left_names
           in
           let orphans =
             right_rows
             |> Array.to_list
             |> List.mapi (fun idx row -> (idx, row))
             |> List.filter_map (fun (idx, row) ->
                 if matched_right.(idx) then None
                 else Some (combine (stub_for row) (Some row)))
           in
           in_left_order @ orphans
       in
       let output_columns =
         match kind with
         | Semi | Anti -> left_names
         | Left | Inner | Full -> left_names @ List.map snd projection
       in
       let preserved_group_keys =
         match kind with
         | Full -> []
         | Left | Inner | Semi | Anti -> left.group_keys
       in
       rows_to_dataframe ~group_keys:preserved_group_keys output_columns joined_rows)
  | _ :: _ :: _ :: _ ->
    Error.make_error ArityError
      "Join functions accept exactly two positional DataFrame arguments and at most one join-key argument (or named `by`)."
  | _ :: _ ->
    Error.type_error "Join functions expect two DataFrames as the first positional arguments."
  | [] ->
    Error.make_error ArityError "Join functions require at least two DataFrames."
(* Implementation of `bind_rows`: stacks the rows of all DataFrame arguments.

   The output columns are the union of the input columns in order of first
   appearance; cells for columns a frame lacks are filled in by
   [rows_to_dataframe]. No group keys are passed on. An empty argument list or
   a non-DataFrame argument yields an error value. The environment argument is
   ignored. *)
let bind_rows_impl args _env =
  let rec collect acc = function
    | [] -> Ok (List.rev acc)
    | VDataFrame df :: rest -> collect (df :: acc) rest
    | _ :: _ ->
      Error (Error.type_error "Function `bind_rows` expects only DataFrame arguments.")
  in
  match args with
  | [] ->
    Error.make_error ArityError "Function `bind_rows` requires at least one DataFrame."
  | _ :: _ ->
    (match collect [] args with
     | Error err -> err
     | Ok dfs ->
       (* Accumulate names newest-first, then flip into first-seen order. *)
       let column_order =
         dfs
         |> List.fold_left
              (fun seen_rev df ->
                 List.fold_left
                   (fun seen_rev name ->
                      if List.mem name seen_rev then seen_rev else name :: seen_rev)
                   seen_rev
                   (Arrow_table.column_names df.arrow_table))
              []
         |> List.rev
       in
       let rows =
         dfs
         |> List.map (fun df -> Array.to_list (table_rows df.arrow_table))
         |> List.concat
       in
       rows_to_dataframe column_order rows)
(* Implementation of `bind_cols`: places the columns of all DataFrame arguments
   side by side.

   Every frame must have the same number of rows as the first one. Column
   names that repeat across (or within) frames are disambiguated with
   [make_name_unique], in argument order. The result has no group keys. An
   empty argument list, a non-DataFrame argument, or a row-count mismatch
   yields an error value. The environment argument is ignored. *)
let bind_cols_impl args _env =
  let rec collect acc = function
    | [] -> Ok (List.rev acc)
    | VDataFrame df :: rest -> collect (df :: acc) rest
    | _ :: _ ->
      Error (Error.type_error "Function `bind_cols` expects only DataFrame arguments.")
  in
  match args with
  | [] ->
    Error.make_error ArityError "Function `bind_cols` requires at least one DataFrame."
  | _ :: _ ->
    (match collect [] args with
     | Error err -> err
     | Ok dfs ->
       let row_counts =
         List.map (fun df -> Arrow_table.num_rows df.arrow_table) dfs
       in
       let expected_rows =
         match row_counts with
         | [] -> 0
         | first :: _ -> first
       in
       if List.for_all (fun count -> count = expected_rows) row_counts then begin
         let taken = ref [] in
         let rename (name, values) = (make_name_unique taken name, values) in
         (* Both maps run left to right, so earlier frames keep their names. *)
         let columns =
           dfs
           |> List.map (fun df ->
               List.map rename (Arrow_bridge.table_to_value_columns df.arrow_table))
           |> List.concat
         in
         let arrow_table =
           Arrow_bridge.table_from_value_columns columns expected_rows
         in
         VDataFrame { arrow_table; group_keys = [] }
       end else
         Error.value_error "Function `bind_cols` requires all DataFrames to have the same number of rows.")
(*
--# Join rows from the left table
--#
--# Joins two DataFrames and keeps every row from the left-hand side.
--#
--# @name left_join
--# @family colcraft
--# @export
*)
(*
--# Join matching rows
--#
--# Joins two DataFrames and keeps only rows whose keys match in both inputs.
--#
--# @name inner_join
--# @family colcraft
--# @export
*)
(*
--# Join all rows from both tables
--#
--# Joins two DataFrames and keeps rows appearing in either input.
--#
--# @name full_join
--# @family colcraft
--# @export
*)
(*
--# Filter rows using matches in another table
--#
--# Keeps rows from the left DataFrame that have a matching key in the right DataFrame.
--#
--# @name semi_join
--# @family colcraft
--# @export
*)
(*
--# Filter rows lacking matches
--#
--# Keeps rows from the left DataFrame that do not have a matching key in the right DataFrame.
--#
--# @name anti_join
--# @family colcraft
--# @export
*)
(*
--# Stack DataFrames by rows
--#
--# Appends rows from multiple DataFrames into a single DataFrame.
--#
--# @name bind_rows
--# @family colcraft
--# @export
*)
(*
--# Combine DataFrames by columns
--#
--# Combines columns from multiple DataFrames side by side.
--#
--# @name bind_cols
--# @family colcraft
--# @export
*)
(* Adds the join and bind builtins to [env] and returns the extended
   environment. The five joins are registered as named-argument builtins taking
   at least two arguments; `bind_rows` and `bind_cols` are registered as plain
   variadic builtins taking at least one. *)
let register env =
  let add_join name kind scope =
    Env.add name
      (make_builtin_named ~name ~variadic:true 2 (join_impl kind))
      scope
  in
  let add_bind name impl scope =
    Env.add name
      (make_builtin ~name ~variadic:true 1 impl)
      scope
  in
  env
  |> add_join "left_join" Left
  |> add_join "inner_join" Inner
  |> add_join "full_join" Full
  |> add_join "semi_join" Semi
  |> add_join "anti_join" Anti
  |> add_bind "bind_rows" bind_rows_impl
  |> add_bind "bind_cols" bind_cols_impl