1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
open Ast
open Arrow_table

(* expand_input describes one "slot" of the cartesian product built by
   `complete`:
   - Single: one column name paired with the distinct values of that column
   - Nested: a group of column names paired with the distinct row-wise
     combinations of those columns that already exist in the data. Each
     inner list holds one value per column, in the same order as the names. *)
type expand_input =
  | Single of string * value list
  (* column name, distinct values *)
  | Nested of string list * (value list) list
  (* column names, existing row-wise combinations *)

(*
--# Complete a data frame
--#
--# Turns implicit missing values into explicit missing values.
--# Supports nesting() to restrict combinations to those present in the data.
--#
--# @name complete
--# @param df :: DataFrame The DataFrame.
--# @param ... :: Symbol | Call Variable number of column names (use $col syntax) or nesting(...) calls.
--# @param fill :: Dict (Optional) A dictionary supplying a single value to use instead of NA for missing combinations.
--# @param explicit :: Bool (Optional) Should both implicit and explicit missing values be filled? (Default: true)
--# @return :: DataFrame The completed DataFrame.
--# @example
--#   complete(df, $group, $item_id, $item_name)
--#   complete(df, $group, nesting($item_id, $item_name))
--# @family colcraft
--# @export
*)
let register env =
  (* Registers the `complete` builtin in [env] and returns the extended
     environment.

     The builtin takes a DataFrame followed by column references and/or
     nesting(...) markers, builds the cartesian product of the distinct values
     (or existing combinations) of those id columns, and returns a DataFrame
     with one row per original row plus one row per combination that was
     absent from the data. Non-id columns of added rows are NA unless the
     `fill` dict supplies a value of a compatible type for that column.
     Errors are returned as values: a type error when the first argument is
     not a DataFrame, a ValueError when no usable id column is given, and a
     KeyError when an id column does not exist. *)
  Env.add "complete"
    (make_builtin_named ~name:"complete" ~variadic:true 1 (fun named_args _env ->
      (* The DataFrame is whatever sits in first position, labelled or not. *)
      let df_arg = match named_args with
        | (_, VDataFrame df) :: _ -> Some df
        | _ -> None
      in

      let get_named k =
        List.find_map (fun (nk, v) -> if nk = Some k then Some v else None) named_args
      in
      let positional =
        List.filter_map (fun (k, v) -> if k = None then Some v else None) named_args
      in

      (* fill: per-column replacement values; anything but a dict means "none". *)
      let fill_dict = match get_named "fill" with
        | Some (VDict d) -> d
        | _ -> []
      in

      (* explicit: when true, NA cells of existing rows are filled as well. *)
      let explicit_val = match get_named "explicit" with
        | Some (VBool b) -> b
        | _ -> true
      in

      (* Everything after the first positional argument names an id column. *)
      let id_cols_variants = match positional with _ :: tail -> tail | [] -> [] in

      match df_arg with
      | None -> Error.type_error "Function `complete` expects a DataFrame as first argument."
      | Some df ->
          let orig_nrows = Arrow_table.num_rows df.arrow_table in
          let all_cols = Arrow_table.column_names df.arrow_table in

          (* Resolve a column once and return a function reading row [i] of it
             as a value. Factor cells are read as VFactor so that factor id
             columns keep their data when rebuilt from the combination lists.
             List columns, NA columns and unknown column names read as NA. *)
          let cell_reader col =
            match Arrow_table.get_column df.arrow_table col with
            | Some (StringColumn a) ->
                (fun i -> match a.(i) with Some x -> VString x | None -> VNA NAGeneric)
            | Some (IntColumn a) ->
                (fun i -> match a.(i) with Some x -> VInt x | None -> VNA NAGeneric)
            | Some (FloatColumn a) ->
                (fun i -> match a.(i) with Some x -> VFloat x | None -> VNA NAGeneric)
            | Some (BoolColumn a) ->
                (fun i -> match a.(i) with Some x -> VBool x | None -> VNA NAGeneric)
            | Some (DateColumn a) ->
                (fun i -> match a.(i) with Some x -> VDate x | None -> VNA NADate)
            | Some (DatetimeColumn (a, tz)) ->
                (fun i -> match a.(i) with Some x -> VDatetime (x, tz) | None -> VNA NADate)
            | Some (DictionaryColumn (a, levels, ordered)) ->
                (fun i -> match a.(i) with Some x -> VFactor (x, levels, ordered) | None -> VNA NAGeneric)
            | _ -> (fun _ -> VNA NAGeneric)
          in

          (* Read the given columns of row [i] as a list of values. *)
          let row_reader cols =
            let readers = List.map cell_reader cols in
            fun i -> List.map (fun read -> read i) readers
          in

          (* Distinct results of [read] over all original rows, in order of
             first appearance (not sorted). *)
          let distinct_in_row_order read =
            let seen = Hashtbl.create orig_nrows in
            let acc = ref [] in
            for i = 0 to orig_nrows - 1 do
              let key = read i in
              if not (Hashtbl.mem seen key) then begin
                Hashtbl.replace seen key ();
                acc := key :: !acc
              end
            done;
            List.rev !acc
          in

          (* Distinct values of a single column. *)
          let get_unique_vals col = distinct_in_row_order (cell_reader col) in
          (* Distinct row-wise combinations of a group of columns. *)
          let get_nested_combos cols = distinct_in_row_order (row_reader cols) in

          (* Parse each positional arg into an expand_input.
             nesting() returns a VDict with key "__nesting__" (see expand.ml:nesting_impl);
             we detect this marker here to restrict those columns to existing combinations.
             Arguments that are neither a column reference nor a non-empty
             nesting marker are dropped without an error. *)
          let expand_inputs = List.filter_map (fun v ->
            match v with
            | VDict d when List.mem_assoc "__nesting__" d ->
                (* Dict produced by nesting(): cols holds the column symbol list *)
                let cols = match List.assoc_opt "cols" d with
                  | Some (VList l) -> List.filter_map (fun (_, sv) -> Utils.extract_column_name sv) l
                  | _ -> []
                in
                if cols = [] then None
                else Some (Nested (cols, get_nested_combos cols))
            | _ ->
                (match Utils.extract_column_name v with
                 | Some col -> Some (Single (col, get_unique_vals col))
                 | None -> None)
          ) id_cols_variants in

          if expand_inputs = [] then
            Error.make_error ValueError "Function `complete` requires at least one column or nesting() expression."
          else

          (* Flat list of all id column names, in order *)
          let id_cols = List.concat_map (function
            | Single (n, _) -> [n]
            | Nested (ns, _) -> ns
          ) expand_inputs in

          let missing_cols = List.filter (fun c -> not (List.mem c all_cols)) id_cols in
          if missing_cols <> [] then Error.make_error KeyError (Printf.sprintf "Column(s) not found: %s" (String.concat ", " missing_cols)) else

          (* Cartesian product: one choice from every slot, first slot varying slowest. *)
          let rec cartesian = function
            | [] -> [[]]
            | slot :: rest ->
                let rest_product = cartesian rest in
                List.concat_map
                  (fun choice -> List.map (fun tail -> choice :: tail) rest_product)
                  slot
          in
          (* Each expand_input contributes one "slot": Single -> list of single-element lists;
             Nested -> list of multi-element lists (the existing combinations). *)
          let combo_lists = List.map (function
            | Single (_, vals) -> List.map (fun v -> [v]) vals
            | Nested (_, combos) -> combos
          ) expand_inputs in
          (* After flattening, each combo holds one value per entry of id_cols. *)
          let combos = cartesian combo_lists |> List.map List.flatten in

          (* Index the original rows by their id-column values. Row indices are
             accumulated newest-first and reversed when consumed. *)
          let read_id_row = row_reader id_cols in
          let combo_to_rows = Hashtbl.create orig_nrows in
          for i = 0 to orig_nrows - 1 do
            let key = read_id_row i in
            let earlier = Option.value (Hashtbl.find_opt combo_to_rows key) ~default:[] in
            Hashtbl.replace combo_to_rows key (i :: earlier)
          done;

          (* Build the output rows: for every combination, either all original
             rows carrying it (Some r) or one placeholder row (None). Combos are
             kept as arrays so id columns can be rebuilt with O(1) indexing. *)
          let out_row_indices = ref [] in
          let combo_for_out_row = ref [] in

          List.iter (fun combo ->
            let combo_arr = Array.of_list combo in
            match Hashtbl.find_opt combo_to_rows combo with
            | Some row_indices ->
                List.iter (fun r ->
                  out_row_indices := Some r :: !out_row_indices;
                  combo_for_out_row := combo_arr :: !combo_for_out_row
                ) (List.rev row_indices)
            | None ->
                out_row_indices := None :: !out_row_indices;
                combo_for_out_row := combo_arr :: !combo_for_out_row
          ) combos;

          let final_out_row_indices_arr = Array.of_list (List.rev !out_row_indices) in
          let final_combos_arr = Array.of_list (List.rev !combo_for_out_row) in
          let final_nrows = Array.length final_out_row_indices_arr in

          (* Position of [target] in [names] (first occurrence), if any. *)
          let position_in names target =
            let rec scan idx = function
              | [] -> None
              | name :: _ when name = target -> Some idx
              | _ :: rest -> scan (idx + 1) rest
            in
            scan 0 names
          in

          (* Id column data: project slot [id_idx] of every output combo. *)
          let from_combos id_idx project =
            Array.init final_nrows (fun i -> project final_combos_arr.(i).(id_idx))
          in

          (* Non-id column data: copy the source cell for rows that exist in the
             input, use [fill] for added rows, and also for NA source cells
             when `explicit` is true. *)
          let carry_over src fill =
            Array.init final_nrows (fun i ->
              match final_out_row_indices_arr.(i) with
              | Some r ->
                  (match src.(r) with
                   | Some _ as present -> present
                   | None -> if explicit_val then fill else None)
              | None -> fill)
          in

          (* Reconstruct columns, keeping the original column order *)
          let new_columns = List.map (fun col_name ->
            let col_data = match Arrow_table.get_column df.arrow_table col_name with
              | Some d -> d
              | None -> NAColumn orig_nrows
            in

            let new_col_data =
              match position_in id_cols col_name with
              | Some id_idx ->
                  (* Values of a different variant than the column type become NA.
                     List columns are never read as VDataFrame by cell_reader, so
                     a list column used as an id column comes out all-NA. *)
                  (match col_data with
                   | IntColumn _ ->
                       IntColumn (from_combos id_idx (function VInt x -> Some x | _ -> None))
                   | FloatColumn _ ->
                       FloatColumn (from_combos id_idx (function VFloat x -> Some x | _ -> None))
                   | StringColumn _ ->
                       StringColumn (from_combos id_idx (function VString x -> Some x | _ -> None))
                   | BoolColumn _ ->
                       BoolColumn (from_combos id_idx (function VBool x -> Some x | _ -> None))
                   | DateColumn _ ->
                       DateColumn (from_combos id_idx (function VDate x -> Some x | _ -> None))
                   | DatetimeColumn (_, tz) ->
                       DatetimeColumn (from_combos id_idx (function VDatetime (x, _) -> Some x | _ -> None), tz)
                   | NAColumn _ -> NAColumn final_nrows
                   | DictionaryColumn (_, levels, ordered) ->
                       DictionaryColumn (from_combos id_idx (function VFactor (x, _, _) -> Some x | _ -> None), levels, ordered)
                   | ListColumn _ ->
                       ListColumn (from_combos id_idx (function VDataFrame df -> Some df.arrow_table | _ -> None)))
              | None ->
                  (* A fill value whose type does not suit the column is ignored. *)
                  let fill_val = List.assoc_opt col_name fill_dict in
                  (match col_data with
                   | IntColumn a ->
                       let fill_i = match fill_val with
                         | Some (VInt i) -> Some i
                         | Some (VFloat f) -> Some (int_of_float f)
                         | _ -> None
                       in
                       IntColumn (carry_over a fill_i)
                   | FloatColumn a ->
                       let fill_f = match fill_val with
                         | Some (VFloat f) -> Some f
                         | Some (VInt i) -> Some (float_of_int i)
                         | _ -> None
                       in
                       FloatColumn (carry_over a fill_f)
                   | StringColumn a ->
                       let fill_s = match fill_val with Some (VString s) -> Some s | _ -> None in
                       StringColumn (carry_over a fill_s)
                   | BoolColumn a ->
                       let fill_b = match fill_val with Some (VBool b) -> Some b | _ -> None in
                       BoolColumn (carry_over a fill_b)
                   | DateColumn a ->
                       let fill_d = match fill_val with Some (VDate d) -> Some d | _ -> None in
                       DateColumn (carry_over a fill_d)
                   | DatetimeColumn (a, tz) ->
                       (* Only a fill carrying the column's own timezone is accepted. *)
                       let fill_dt = match fill_val with
                         | Some (VDatetime (ts, fill_tz)) when fill_tz = tz -> Some ts
                         | _ -> None
                       in
                       DatetimeColumn (carry_over a fill_dt, tz)
                   | NAColumn _ -> NAColumn final_nrows
                   | DictionaryColumn (a, levels, ordered) ->
                       (* Accept a factor with identical levels/ordering, or a
                          string resolved through Factors.level_index_of. *)
                       let fill_i = match fill_val with
                         | Some (VFactor (i, factor_levels, factor_ordered))
                           when factor_levels = levels && factor_ordered = ordered ->
                             Some i
                         | Some (VString s) -> Factors.level_index_of levels s
                         | _ -> None
                       in
                       DictionaryColumn (carry_over a fill_i, levels, ordered)
                   | ListColumn a ->
                       let fill_t = match fill_val with Some (VDataFrame df) -> Some df.arrow_table | _ -> None in
                       ListColumn (carry_over a fill_t))
            in
            (col_name, new_col_data)

          ) all_cols in

          let new_schema = List.map (fun (n, c) -> (n, Arrow_table.column_type_of c)) new_columns in
          VDataFrame { arrow_table = { schema = new_schema; columns = new_columns; nrows = final_nrows; native_handle = None } |> Arrow_table.materialize; group_keys = df.group_keys }
    ))
    env