import { Float32, makeData, Table, Vector } from "apache-arrow";
import {
  SailingArrowSchema,
  SailingArrowSchemaType,
} from "./SailingArrowSchema";
import { sailingTableTimeBounds } from "./sailingTableTimeBounds";
import { splitTableAroundTimes } from "./splitTableAtTime";

export const mergeSailingTables = (
  tables: Array<Table<SailingArrowSchemaType>> | Table<SailingArrowSchemaType>,
  table2?: Table<SailingArrowSchemaType>
): Table<SailingArrowSchemaType> => {
  if (Array.isArray(tables)) {
    if (table2 !== undefined) {
      throw new Error(
        `Only one argument expected when first argument is an array.`
      );
    }
    let merged = tables[0];
    for (let i = 1; i < tables.length; i++) {
      merged = mergeSailingTables(merged, tables[i]);
    }
    return merged;
  } else {
    if (table2 === undefined) {
      throw new Error(`Need two to tango.`);
    }
    let table1 = tables;

    const [begin1, end1] = sailingTableTimeBounds(table1);
    const [begin2, end2] = sailingTableTimeBounds(table2);

    if (end1 < begin2) {
      return mergeSequentialSailingTables(table1, table2);
    } else if (end2 < begin1) {
      return mergeSequentialSailingTables(table2, table1);
    } else {
      if (begin2 > begin1) {
        return mergeOverlappingSailingTables(table1, table2);
      } else {
        return mergeOverlappingSailingTables(table2, table1);
      }
    }
  }
};

const mergeOverlappingSailingTables = (
  table: Table<SailingArrowSchemaType>,
  overlapping: Table<SailingArrowSchemaType>
): Table<SailingArrowSchemaType> => {
  const [overlapStart, overlapEnd] = sailingTableTimeBounds(overlapping);

  const [before, after] = splitTableAroundTimes(
    table,
    overlapStart,
    overlapEnd
  );

  let merged = before
    ? mergeSequentialSailingTables(before, overlapping)
    : overlapping;
  merged = after ? mergeSequentialSailingTables(merged, after) : merged;
  return merged;
};

/**
 * Merge two table, the second table is appended to the first one.
 *
 * The timestamps of the second one must be greater than the first one.
 *
 * @param table1 One table that finishes before the other
 * @param table2
 * @returns
 */
const mergeSequentialSailingTables = (
  table1: Table<SailingArrowSchemaType>,
  table2: Table<SailingArrowSchemaType>
): Table<SailingArrowSchemaType> => {
  // Our new schema is the union of table1 and table2 schema.
  const newSchema = SailingArrowSchema.select([
    ...table1.schema.fields.map((f) => f.name),
    ...table2.schema.fields.map((f) => f.name),
  ] as Array<keyof SailingArrowSchemaType>);

  let table = new Table(newSchema);
  for (const field of newSchema.fields) {
    const name = field.name as keyof SailingArrowSchemaType;
    const column1 = table1.getChild(name);
    const column2 = table2.getChild(name);

    let column: typeof column1;
    if (column1 == null && column2 === null) {
      throw new Error(
        `Why are we trying to merge a column that does not exist in either table? ${name}`
      );
    }
    if (column1 && column2) {
      column = column1.concat(column2);
    } else if (column1) {
      column = column1;
    } else {
      if (column2 === null) {
        throw new Error(`assert(column2 should be defined here)`);
      }
      // Inspired by https://github.com/apache/arrow/blob/master/js/src/table.ts#L295
      const emptyPadding = new Vector([
        makeData({
          type: column2.type,
          length: 0,
          nullCount: 0,
        })._changeLengthAndBackfillNullBitmap(table1.numRows),
      ]);
      column = emptyPadding.concat(column2 as Vector<Float32>);
    }
    if (column) {
      table = table.setChild(name, column);
    }
  }

  return table;
};
