From fc2aee5e6fe23bb791994f97a7b9ced13b14aae5 Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Mon, 1 Jun 2026 08:30:34 +0000 Subject: [PATCH 1/2] feat(sea): INTERVAL type parity + operation-lifecycle depth [3/3] MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Third of three stacked PRs (base: [2/3] execution + results). Completes the SEA foundation: - ArrowResultConverter: INTERVAL parity. Formats Arrow Interval[YearMonth] / Interval[DayTime] and Duration (rewritten to Int64 by SeaArrowIpcDurationFix) into the canonical Thrift strings ("Y-M" / "D HH:mm:ss.fffffffff"), byte- identical to the Thrift path. Threads the Arrow field through convertArrowTypes so the duration-unit metadata is available at value-conversion time. - Exhaustive operation-lifecycle coverage: seaCancel / seaClose / seaFinished idempotency, flag-set-before-await ordering (cancel-mid-fetch), kernel-error mapping, and the neutral OperationStatus callback shape. - SeaIntervalParity tests build real Arrow IPC batches via flatbuffers and assert the formatted strings. With this, SEA reaches M0 parity with Thrift (connect/auth → execute → fetch → operation lifecycle → INTERVAL types). Replaces the single 8/8 PR #383. 
Co-authored-by: Isaac Signed-off-by: Madhavendra Rathore --- lib/result/ArrowResultConverter.ts | 212 ++++++++- tests/e2e/sea/operation-lifecycle-e2e.test.ts | 269 +++++++++++ tests/unit/sea/SeaIntervalParity.test.ts | 366 +++++++++++++++ tests/unit/sea/operation-lifecycle.test.ts | 434 ++++++++++++++++++ 4 files changed, 1270 insertions(+), 11 deletions(-) create mode 100644 tests/e2e/sea/operation-lifecycle-e2e.test.ts create mode 100644 tests/unit/sea/SeaIntervalParity.test.ts create mode 100644 tests/unit/sea/operation-lifecycle.test.ts diff --git a/lib/result/ArrowResultConverter.ts b/lib/result/ArrowResultConverter.ts index 31e4c5af..e2a0f3c4 100644 --- a/lib/result/ArrowResultConverter.ts +++ b/lib/result/ArrowResultConverter.ts @@ -23,6 +23,145 @@ const { isArrowBigNumSymbol, bigNumToBigInt } = arrowUtils; type ArrowSchema = Schema; type ArrowSchemaField = Field>; +/** + * Metadata key carrying the original Arrow `Duration` time unit on + * fields that were rewritten to `Int64` by the SEA IPC pre-processor + * (`lib/sea/SeaArrowIpcDurationFix.ts`). We re-declare the constant + * here (rather than importing it) so the converter has no compile-time + * dependency on the SEA module — it's reused unchanged by the + * thrift-path which has no SEA awareness. + */ +const DURATION_UNIT_METADATA_KEY = 'databricks.arrow.duration_unit'; +const ZERO_BIGINT = BigInt(0); +const NS_PER_MICRO = BigInt(1_000); +const NS_PER_MILLI = BigInt(1_000_000); +const NS_PER_SEC = BigInt(1_000_000_000); +const MS_PER_DAY = BigInt(86_400_000); +const NS_PER_MIN = NS_PER_SEC * BigInt(60); +const NS_PER_HOUR = NS_PER_MIN * BigInt(60); +const NS_PER_DAY = NS_PER_HOUR * BigInt(24); + +/** + * Format an Arrow `Interval[YearMonth]` or `Interval[DayTime]` value + * into the canonical thrift string the JDBC/ODBC server emits: + * YEAR-MONTH → `"Y-M"` (e.g. 1 year 2 months → `"1-2"`) + * DAY-TIME → `"D HH:mm:ss.fffffffff"` + * (e.g. 
1 day 02:03:04 → `"1 02:03:04.000000000"`) + * + * Arrow surfaces these as `Int32Array(2)` via the `GetVisitor` + * (`apache-arrow/visitor/get.js:177-185`): + * YEAR-MONTH: `[years, months]` (years/months derived from a single + * int32 holding total months) + * DAY-TIME: `[days, milliseconds]` (legacy two-int32 form) + * + * Negative intervals: the FULL interval is emitted with a leading `-` + * (Spark convention), and individual fields are unsigned. We mirror + * Spark's display. + */ +function formatArrowInterval(value: any, valueType: any): string { + // `value` is an Int32Array of length 2. + const a = Number(value[0]); + const b = Number(value[1]); + // unit 0 = YEAR_MONTH, unit 1 = DAY_TIME, unit 2 = MONTH_DAY_NANO + const unit = valueType?.unit; + if (unit === 0) { + return formatYearMonth(a, b); + } + // DAY_TIME: a = days, b = milliseconds (within the day, can be ≥0 or <0) + // We re-normalise: total milliseconds = a * 86_400_000 + b, then split into + // days, hours, minutes, seconds, nanoseconds (nanoseconds is always 0 + // because the legacy IntervalDayTime carries only millisecond precision). + const totalMs = BigInt(a) * MS_PER_DAY + BigInt(b); + return formatDayTimeFromTotal(totalMs * NS_PER_MILLI /* → ns */, 'NANOSECOND'); +} + +/** + * Format the (years, months) decomposition into `"Y-M"` (or `"-Y-M"` + * for negative intervals). Arrow's `getIntervalYearMonth` (in + * `apache-arrow/visitor/get.js:179`) decomposes a signed total-months + * int32 via integer truncation, so years and months always share the + * same sign. We render the absolute values with a single leading `-` + * to match the Spark display format used on the thrift path. 
+ */ +function formatYearMonth(years: number, months: number): string { + const total = years * 12 + months; + if (total < 0) { + const abs = -total; + const y = Math.trunc(abs / 12); + const m = abs % 12; + return `-${y}-${m}`; + } + return `${years}-${months}`; +} + +/** + * Format an Arrow `Duration` value (rewritten by the SEA IPC + * pre-processor to `Int64`) into the thrift INTERVAL DAY-TIME string. + * + * @param value the duration value as `bigint` (signed nanos/micros/ + * millis/seconds depending on `unit`) + * @param unit one of `SECOND` / `MILLISECOND` / `MICROSECOND` / + * `NANOSECOND` (the original Arrow time unit, captured + * by `SeaArrowIpcDurationFix.ts`) + */ +function formatDurationToIntervalDayTime(value: bigint | number, unit: string): string { + const bi = typeof value === 'bigint' ? value : BigInt(value); + const nanos = toNanoseconds(bi, unit); + return formatDayTimeFromTotal(nanos, unit); +} + +/** + * Scale a duration value to nanoseconds based on its unit. + * + * SECOND → ×1_000_000_000 + * MILLISECOND → × 1_000_000 + * MICROSECOND → × 1_000 + * NANOSECOND → × 1 + */ +function toNanoseconds(value: bigint, unit: string): bigint { + switch (unit) { + case 'SECOND': + return value * NS_PER_SEC; + case 'MILLISECOND': + return value * NS_PER_MILLI; + case 'MICROSECOND': + return value * NS_PER_MICRO; + case 'NANOSECOND': + default: + return value; + } +} + +/** + * Format a signed total-nanoseconds value as `"D HH:mm:ss.fffffffff"`. + * Always emits 9 fractional digits to match the thrift driver's wire + * format (`"1 02:03:04.000000000"` — 9 digits regardless of the + * server-side storage precision). Negative values get a single + * leading `-`. + * + * The `unit` parameter is currently unused for formatting (the value + * is already in nanoseconds by the time we get here) but is retained + * for future use if a unit-aware precision is ever needed. 
+ */ +function formatDayTimeFromTotal(totalNanos: bigint, _unit: string): string { + const sign = totalNanos < ZERO_BIGINT ? '-' : ''; + const abs = totalNanos < ZERO_BIGINT ? -totalNanos : totalNanos; + + const days = abs / NS_PER_DAY; + let rem = abs % NS_PER_DAY; + const hours = rem / NS_PER_HOUR; + rem %= NS_PER_HOUR; + const minutes = rem / NS_PER_MIN; + rem %= NS_PER_MIN; + const seconds = rem / NS_PER_SEC; + const subSeconds = rem % NS_PER_SEC; + + const pad2 = (n: bigint): string => n.toString().padStart(2, '0'); + const fraction = `.${subSeconds.toString().padStart(9, '0')}`; + + return `${sign}${days.toString()} ${pad2(hours)}:${pad2(minutes)}:${pad2(seconds)}${fraction}`; +} + export default class ArrowResultConverter implements IResultsProvider> { private readonly context: IClientContext; @@ -147,37 +286,52 @@ export default class ArrowResultConverter implements IResultsProvider private getRows(schema: ArrowSchema, rows: Array): Array { return rows.map((row) => { // First, convert native Arrow values to corresponding plain JS objects - const record = this.convertArrowTypes(row, undefined, schema.fields); + const record = this.convertArrowTypes(row, undefined, schema.fields, undefined); // Second, cast all the values to original Thrift types return this.convertThriftTypes(record); }); } - private convertArrowTypes(value: any, valueType: DataType | undefined, fields: Array = []): any { + private convertArrowTypes( + value: any, + valueType: DataType | undefined, + fields: Array = [], + field?: ArrowSchemaField, + ): any { if (value === null) { return value; } const fieldsMap: Record = {}; - for (const field of fields) { - fieldsMap[field.name] = field; + for (const f of fields) { + fieldsMap[f.name] = f; } // Convert structures to plain JS object and process all its fields recursively if (value instanceof StructRow) { const result = value.toJSON(); for (const key of Object.keys(result)) { - const field: ArrowSchemaField | undefined = fieldsMap[key]; - 
result[key] = this.convertArrowTypes(result[key], field?.type, field?.type.children || []); + const childField: ArrowSchemaField | undefined = fieldsMap[key]; + result[key] = this.convertArrowTypes( + result[key], + childField?.type, + childField?.type.children || [], + childField, + ); } return result; } if (value instanceof MapRow) { const result = value.toJSON(); // Map type consists of its key and value types. We need only value type here, key will be cast to string anyway - const field = fieldsMap.entries?.type.children.find((item) => item.name === 'value'); + const valueField = fieldsMap.entries?.type.children.find((item) => item.name === 'value'); for (const key of Object.keys(result)) { - result[key] = this.convertArrowTypes(result[key], field?.type, field?.type.children || []); + result[key] = this.convertArrowTypes( + result[key], + valueField?.type, + valueField?.type.children || [], + valueField, + ); } return result; } @@ -186,14 +340,28 @@ export default class ArrowResultConverter implements IResultsProvider if (value instanceof Vector) { const result = value.toJSON(); // Array type contains the only child which defines a type of each array's element - const field = fieldsMap.element; - return result.map((item) => this.convertArrowTypes(item, field?.type, field?.type.children || [])); + const elementField = fieldsMap.element; + return result.map((item) => + this.convertArrowTypes(item, elementField?.type, elementField?.type.children || [], elementField), + ); } if (DataType.isTimestamp(valueType)) { return new Date(value); } + // INTERVAL — Spark/Databricks SEA emits two flavours: native Arrow + // `Interval[YearMonth]` / `Interval[DayTime]` (handled here) and + // `Duration` (transparently rewritten to `Int64` upstream by + // `SeaArrowIpcDurationFix.ts`; handled in the bigint/Int64 branch + // below). 
In every case we coerce to the canonical thrift string + // form so the SEA path is byte-identical with the thrift path: + // YEAR-MONTH → `"Y-M"` + // DAY-TIME → `"D HH:mm:ss.fffffffff"` + if (DataType.isInterval(valueType)) { + return formatArrowInterval(value, valueType); + } + // Convert big number values to BigInt // Decimals are also represented as big numbers in Arrow, so additionally process them (convert to float) if (value instanceof Object && value[isArrowBigNumSymbol]) { @@ -201,16 +369,38 @@ export default class ArrowResultConverter implements IResultsProvider if (DataType.isDecimal(valueType)) { return Number(result) / 10 ** valueType.scale; } + // Duration columns rewritten to Int64 — detect via metadata. + const durationUnit = field?.metadata.get(DURATION_UNIT_METADATA_KEY); + if (durationUnit) { + return formatDurationToIntervalDayTime(result, durationUnit); + } return result; } // Convert binary data to Buffer if (value instanceof Uint8Array) { + // `instanceof Uint8Array` matches only Uint8Array and its + // subclasses (e.g. Node's Buffer); other TypedArrays such as the + // Int32Array(2) that apache-arrow surfaces for INTERVAL values do + // NOT match. Intervals are therefore handled only by the + // `DataType.isInterval` branch above; an Int32Array whose type is + // not an interval skips this branch and is returned unchanged by + // the final `return value` (not exercised in M0). return Buffer.from(value); } + // Bigint fallback — for raw bigints (not BigNum wrappers), the + // duration_unit metadata also gates the INTERVAL DAY-TIME format. + if (typeof value === 'bigint') { + const durationUnit = field?.metadata.get(DURATION_UNIT_METADATA_KEY); + if (durationUnit) { + return formatDurationToIntervalDayTime(value, durationUnit); + } + return Number(value); + } + // Return other values as is - return typeof value === 'bigint' ? 
Number(value) : value; + return value; } private convertThriftTypes(record: Record): any { diff --git a/tests/e2e/sea/operation-lifecycle-e2e.test.ts b/tests/e2e/sea/operation-lifecycle-e2e.test.ts new file mode 100644 index 00000000..31c4f910 --- /dev/null +++ b/tests/e2e/sea/operation-lifecycle-e2e.test.ts @@ -0,0 +1,269 @@ +// Copyright (c) 2026 Databricks, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * End-to-end tests for the SEA operation lifecycle (cancel / close / + * finished) wired through `SeaOperationBackend`. + * + * The impl-execution feature has not yet wired + * `DBSQLClient.connect({ useSEA: true })` to dispatch into + * `SeaBackend`, so this test drives the lifecycle by: + * 1. Calling the napi `openSession(...)` free function directly to + * get a kernel `Connection`. + * 2. Calling `connection.executeStatement(...)` to get a napi + * `Statement` handle. + * 3. Wrapping that handle in a `SeaOperationBackend` and exercising + * its `cancel()` / `close()` / `waitUntilReady()` methods. + * + * This mirrors how the eventual `SeaSessionBackend.executeStatement` + * call path will assemble the operation — we just inline the kernel + * call here since the session backend is being built in parallel. + * + * Path note: the original task spec referenced + * `tests/integration/sea/operation-lifecycle-e2e.test.ts`. 
The + * existing project structure uses `tests/e2e/**` (with its own + * `.mocharc.js`), so this file lives under `tests/e2e/sea/` to be + * picked up by `npm run e2e` automatically. + */ + +import { expect } from 'chai'; +import IClientContext from '../../../lib/contracts/IClientContext'; +import IDBSQLLogger, { LogLevel } from '../../../lib/contracts/IDBSQLLogger'; +import { getSeaNative } from '../../../lib/sea/SeaNativeLoader'; +import SeaOperationBackend from '../../../lib/sea/SeaOperationBackend'; +import OperationStateError, { OperationStateErrorCode } from '../../../lib/errors/OperationStateError'; + +// Minimal binding type shapes (mirrors the napi `index.d.ts`). +interface NativeBinding { + openSession(opts: { hostName: string; httpPath: string; token: string }): Promise; +} + +interface NativeConnection { + executeStatement( + sql: string, + options: { + initialCatalog?: string; + initialSchema?: string; + sessionConfig?: Record; + }, + ): Promise; + close(): Promise; +} + +interface NativeStatement { + fetchNextBatch(): Promise<{ ipcBytes: Buffer } | null>; + // schema() is synchronous on the merged-kernel binding. + schema(): { ipcBytes: Buffer }; + cancel(): Promise; + close(): Promise; +} + +class NoopLogger implements IDBSQLLogger { + log(_level: LogLevel, _message: string): void { + // no-op for e2e runs + } +} + +function makeContext(): IClientContext { + const logger = new NoopLogger(); + const notUsed = () => { + throw new Error('IClientContext member not expected in lifecycle e2e'); + }; + return { + getConfig: notUsed, + getLogger: () => logger, + getConnectionProvider: notUsed, + getClient: notUsed, + getDriver: notUsed, + } as unknown as IClientContext; +} + +describe('SEA operation lifecycle — end-to-end', function suite() { + // Live-warehouse tests can take >2s through warm-up; bump the + // mocha default (2000ms) generously. 
The base `tests/e2e/.mocharc.js` + // already sets 300s but we keep this explicit so the file is robust + // when run via `npx mocha …` outside the e2e harness. + this.timeout(120_000); + + const hostName = process.env.DATABRICKS_PECOTESTING_SERVER_HOSTNAME || process.env.E2E_HOST; + const httpPath = process.env.DATABRICKS_PECOTESTING_HTTP_PATH || process.env.E2E_PATH; + const token = process.env.DATABRICKS_PECOTESTING_TOKEN_PERSONAL || process.env.E2E_ACCESS_TOKEN; + + before(function gate() { + if (!hostName || !httpPath || !token) { + // eslint-disable-next-line no-invalid-this + this.skip(); + } + }); + + it('cancel() succeeds against a live SEA statement and is fast', async () => { + const binding = getSeaNative() as unknown as NativeBinding; + + const connection = await binding.openSession({ + hostName: hostName as string, + httpPath: httpPath as string, + token: token as string, + }); + + let statement: NativeStatement | null = null; + try { + // Use a query that is long-enough running that cancel actually + // has work to do. `range(0, 100_000_000)` is large enough that + // even with kernel-side optimizations the server has not yet + // produced the full result by the time we cancel. + statement = await connection.executeStatement('SELECT * FROM range(0, 100000000)', {}); + expect(statement).to.be.an('object'); + + const op = new SeaOperationBackend({ + statement: statement as unknown as NativeStatement, + context: makeContext(), + }); + + const t0 = Date.now(); + const status = await op.cancel(); + const elapsed = Date.now() - t0; + + // Cancel must complete within 200ms. + expect(elapsed).to.be.lessThan(200, `cancel latency ${elapsed}ms exceeds 200ms budget`); + expect(status.isSuccess).to.equal(true); + } finally { + // Bypass `op.close()` here because we want to verify cancel + // alone — close is exercised in the next test. 
+ if (statement !== null) { + try { + await statement.close(); + } catch (_) { + // Cancelled statements may surface a close error from the + // server; ignore for cleanup. + } + } + await connection.close(); + } + }); + + it('cancel mid-fetch — subsequent fetchChunk throws OperationStateError', async () => { + const binding = getSeaNative() as unknown as NativeBinding; + + const connection = await binding.openSession({ + hostName: hostName as string, + httpPath: httpPath as string, + token: token as string, + }); + + let statement: NativeStatement | null = null; + try { + statement = await connection.executeStatement('SELECT * FROM range(0, 100000000)', {}); + + const op = new SeaOperationBackend({ + statement: statement as unknown as NativeStatement, + context: makeContext(), + }); + + const t0 = Date.now(); + await op.cancel(); + const elapsed = Date.now() - t0; + expect(elapsed).to.be.lessThan(200, `cancel latency ${elapsed}ms exceeds 200ms budget`); + + // After cancel, fetchChunk must throw the cancellation error + // (regardless of whether the underlying fetch implementation + // is wired — the lifecycle gate runs first). 
+ let thrown: unknown; + try { + await op.fetchChunk({ limit: 100 }); + } catch (err) { + thrown = err; + } + expect(thrown).to.be.instanceOf(OperationStateError); + expect((thrown as OperationStateError).errorCode).to.equal(OperationStateErrorCode.Canceled); + } finally { + if (statement !== null) { + try { + await statement.close(); + } catch (_) { + // ignore cleanup error after cancel + } + } + await connection.close(); + } + }); + + it('close() succeeds against a SEA statement and is idempotent', async () => { + const binding = getSeaNative() as unknown as NativeBinding; + + const connection = await binding.openSession({ + hostName: hostName as string, + httpPath: httpPath as string, + token: token as string, + }); + + try { + const statement = await connection.executeStatement('SELECT 1', {}); + + const op = new SeaOperationBackend({ + statement: statement as unknown as NativeStatement, + context: makeContext(), + }); + + const status1 = await op.close(); + expect(status1.isSuccess).to.equal(true); + + // Idempotent — a second close is a no-op on the JS side and + // does not hit the binding (which would already have taken the + // inner handle). 
+ const status2 = await op.close(); + expect(status2.isSuccess).to.equal(true); + } finally { + await connection.close(); + } + }); + + it('finished() resolves immediately and fires the progress callback', async () => { + const binding = getSeaNative() as unknown as NativeBinding; + + const connection = await binding.openSession({ + hostName: hostName as string, + httpPath: httpPath as string, + token: token as string, + }); + + let statement: NativeStatement | null = null; + try { + statement = await connection.executeStatement('SELECT 1', {}); + + const op = new SeaOperationBackend({ + statement: statement as unknown as NativeStatement, + context: makeContext(), + }); + + let ticks = 0; + const t0 = Date.now(); + await op.waitUntilReady({ + callback: () => { + ticks += 1; + }, + }); + const elapsed = Date.now() - t0; + + // M0 finished() is a no-op — must resolve in <50ms. + expect(elapsed).to.be.lessThan(50); + // Progress callback fires exactly once. + expect(ticks).to.equal(1); + } finally { + if (statement !== null) { + await statement.close(); + } + await connection.close(); + } + }); +}); diff --git a/tests/unit/sea/SeaIntervalParity.test.ts b/tests/unit/sea/SeaIntervalParity.test.ts new file mode 100644 index 00000000..38a4a19d --- /dev/null +++ b/tests/unit/sea/SeaIntervalParity.test.ts @@ -0,0 +1,366 @@ +// Copyright (c) 2026 Databricks, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 + +/** + * TDD harness for the round-2 INTERVAL parity fix. 
+ * + * Verifies that the SEA path renders the exact thrift wire string for + * INTERVAL YEAR-MONTH and INTERVAL DAY-TIME columns, regardless of + * whether the kernel emits the value as native Arrow `Interval` or + * native Arrow `Duration` (the latter is transparently rewritten to + * `Int64` by `lib/sea/SeaArrowIpcDurationFix.ts` because `apache-arrow@13` + * predates the `Duration` type id). + * + * Reference failure modes (round 5 testing): + * - YEAR-MONTH: + * thrift → `"1-2"` (string) + * SEA pre-fix → `{"0":1,"1":2}` (Int32Array surfaced as struct) + * - DAY-TIME: + * thrift → `"1 02:03:04.000000000"` (string) + * SEA pre-fix → throws `Unrecognized type: "Duration" (18)` on schema decode + * + * Both modes must now produce byte-identical thrift strings. + */ + +import { expect } from 'chai'; +import * as flatbuffers from 'flatbuffers'; +import { + Schema, + Field, + Int32, + Int64, + Interval, + IntervalUnit, + Table, + RecordBatch, + makeData, + Struct, + vectorFromArray, + tableToIPC, +} from 'apache-arrow'; + +// eslint-disable-next-line import/no-internal-modules +import { Message as FbMessage } from 'apache-arrow/fb/message'; +// eslint-disable-next-line import/no-internal-modules +import { MessageHeader } from 'apache-arrow/fb/message-header'; +// eslint-disable-next-line import/no-internal-modules +import { Schema as FbSchema } from 'apache-arrow/fb/schema'; +// eslint-disable-next-line import/no-internal-modules +import { Field as FbField } from 'apache-arrow/fb/field'; +// eslint-disable-next-line import/no-internal-modules +import { Type as FbType } from 'apache-arrow/fb/type'; +// eslint-disable-next-line import/no-internal-modules +import { Duration as FbDuration } from 'apache-arrow/fb/duration'; +// eslint-disable-next-line import/no-internal-modules +import { TimeUnit as FbTimeUnit } from 'apache-arrow/fb/time-unit'; + +import SeaOperationBackend from '../../../lib/sea/SeaOperationBackend'; +import ClientContextStub from 
'../.stubs/ClientContextStub'; + +// --------------------------------------------------------------------------- +// Test helpers. +// --------------------------------------------------------------------------- + +class StatementStub { + private readonly batches: Buffer[]; + + private readonly schemaIpc: Buffer; + + public cancelled = false; + + public closed = false; + + constructor(schemaIpc: Buffer, batches: Buffer[]) { + this.schemaIpc = schemaIpc; + this.batches = [...batches]; + } + + public async fetchNextBatch(): Promise<{ ipcBytes: Buffer } | null> { + if (this.batches.length === 0) return null; + return { ipcBytes: this.batches.shift() as Buffer }; + } + + // schema() is synchronous on the merged-kernel binding. + public schema(): { ipcBytes: Buffer } { + return { ipcBytes: this.schemaIpc }; + } + + public async cancel(): Promise { + this.cancelled = true; + } + + public async close(): Promise { + this.closed = true; + } +} + +function withTypeName(field: T, typeName: string): T { + const meta = new Map(field.metadata); + meta.set('databricks.type_name', typeName); + return new Field(field.name, field.type, field.nullable, meta) as T; +} + +function ipcFromColumns(schema: Schema, columns: Record): Buffer { + const vectors: any[] = []; + for (const field of schema.fields) { + const col = columns[field.name]; + vectors.push(vectorFromArray(col as any, field.type)); + } + const data = vectors.map((v) => v.data[0]); + const struct = makeData({ + type: new Struct(schema.fields), + children: data, + length: vectors[0]?.length ?? 
0, + nullCount: 0, + }); + const batch = new RecordBatch(schema, struct); + const table = new Table([batch]); + return Buffer.from(tableToIPC(table, 'stream')); +} + +function ipcSchemaOnly(schema: Schema): Buffer { + const struct = makeData({ + type: new Struct(schema.fields), + children: schema.fields.map((f) => makeData({ type: f.type as any, length: 0, nullCount: 0 })), + length: 0, + nullCount: 0, + }); + const batch = new RecordBatch(schema, struct); + const table = new Table([batch]); + return Buffer.from(tableToIPC(table, 'stream')); +} + +/** + * Build a schema-only IPC payload whose schema declares a single Arrow + * `Duration` column. `apache-arrow@13` cannot build this directly (no + * Duration class in the public API), so we hand-roll the FlatBuffer + * using the internal `fb/*` accessor classes. The body bytes for this + * column are bit-identical to an Int64 column. + */ +function ipcWithDurationSchema(fieldName: string, durationUnit: FbTimeUnit, typeName = 'INTERVAL'): Buffer { + const builder = new flatbuffers.Builder(256); + + // KeyValue for databricks.type_name + const tnKey = builder.createString('databricks.type_name'); + const tnVal = builder.createString(typeName); + const { KeyValue: FbKeyValueLocal } = require('apache-arrow/fb/key-value'); // eslint-disable-line @typescript-eslint/no-var-requires, global-require, import/no-internal-modules + FbKeyValueLocal.startKeyValue(builder); + FbKeyValueLocal.addKey(builder, tnKey); + FbKeyValueLocal.addValue(builder, tnVal); + const tnKv = FbKeyValueLocal.endKeyValue(builder); + const metadataVec = FbField.createCustomMetadataVector(builder, [tnKv]); + + const nameOff = builder.createString(fieldName); + const durOff = FbDuration.createDuration(builder, durationUnit); + FbField.startField(builder); + FbField.addName(builder, nameOff); + FbField.addNullable(builder, true); + FbField.addTypeType(builder, FbType.Duration); + FbField.addType(builder, durOff); + FbField.addCustomMetadata(builder, 
metadataVec); + const fieldOff = FbField.endField(builder); + const fieldsVec = FbSchema.createFieldsVector(builder, [fieldOff]); + FbSchema.startSchema(builder); + FbSchema.addFields(builder, fieldsVec); + const schemaOff = FbSchema.endSchema(builder); + FbMessage.startMessage(builder); + FbMessage.addVersion(builder, 4); // V5 + FbMessage.addHeaderType(builder, MessageHeader.Schema); + FbMessage.addHeader(builder, schemaOff); + FbMessage.addBodyLength(builder, BigInt(0)); + const msgOff = FbMessage.endMessage(builder); + builder.finish(msgOff); + const bytes = builder.asUint8Array(); + const rem = bytes.byteLength % 8; + const padded = rem === 0 ? bytes : new Uint8Array(bytes.byteLength + (8 - rem)); + if (rem !== 0) padded.set(bytes, 0); + + // IPC stream framing: continuation marker (0xFFFFFFFF) + length + bytes + const prefix = Buffer.alloc(8); + prefix.writeInt32LE(-1, 0); + prefix.writeInt32LE(padded.byteLength, 4); + + // EOS marker (continuation + zero length) — terminates the stream. + const eos = Buffer.alloc(8); + eos.writeInt32LE(-1, 0); + eos.writeInt32LE(0, 4); + + return Buffer.concat([prefix, Buffer.from(padded), eos]); +} + +/** + * Splice a hand-built Duration schema into an Int64-based IPC stream + * so the record batch body bytes (which are Int64-encoded) become + * "Duration-shaped" without us re-encoding the body. Used to fabricate + * a kernel-shaped Duration IPC payload using only the apache-arrow@13 + * public API. + */ +function buildDurationIpc( + fieldName: string, + durationUnit: FbTimeUnit, + values: bigint[], + typeName = 'INTERVAL', +): Buffer { + // Build an Int64 stream that carries the values. + const int64Schema = new Schema([new Field(fieldName, new Int64(), true)]); + const int64Ipc = ipcFromColumns(int64Schema, { + [fieldName]: [new BigInt64Array(values)], + }); + + // Build a Duration schema-only message that we splice in to replace + // the Int64 schema. The record-batch bytes from int64Ipc follow + // unchanged. 
+ const durationSchemaIpc = ipcWithDurationSchema(fieldName, durationUnit, typeName); + + // Locate the end of the Int64 schema message in int64Ipc so that + // only the record batches (and trailing EOS) after it are kept. + // int64Ipc layout: [continuation+len+schema][continuation+len+recordbatch][continuation+0 EOS] + let cursor = 0; + let len = int64Ipc.readInt32LE(cursor); + cursor += 4; + if (len === -1) { + len = int64Ipc.readInt32LE(cursor); + cursor += 4; + } + // Skip the `len`-byte schema metadata (schema messages carry no body) + const intRecordsStart = cursor + len; + const intRecords = int64Ipc.subarray(intRecordsStart); + + // durationSchemaIpc layout: [prefix][padded schema bytes][EOS]. + // Drop its EOS so it concatenates cleanly with intRecords (which has + // its own EOS). + const durationNoEos = durationSchemaIpc.subarray(0, durationSchemaIpc.byteLength - 8); + return Buffer.concat([durationNoEos, intRecords]); +} + +// --------------------------------------------------------------------------- +// Tests. +// --------------------------------------------------------------------------- + +describe('SeaOperationBackend — INTERVAL parity with thrift', () => { + it('YEAR-MONTH via native Arrow Interval[YearMonth] → "Y-M"', async () => { + // Arrow `Interval[YearMonth]` carries a single int32 total-months + // value. apache-arrow surfaces it as Int32Array(2) via the + // GetVisitor. The kernel emits this type for INTERVAL YEAR-MONTH. + const fields = [withTypeName(new Field('iv', new Interval(IntervalUnit.YEAR_MONTH), true), 'INTERVAL')]; + const schema = new Schema(fields); + const schemaIpc = ipcSchemaOnly(schema); + + // 1 year, 2 months → 14 total months. `vectorFromArray(Int32Array, + // new Interval(...))` packs the int32 total directly into the + // Interval column's underlying values buffer. 
+ const dataIpc = ipcFromColumns(schema, { iv: Int32Array.from([14]) }); + + const stub = new StatementStub(schemaIpc, [dataIpc]); + const backend = new SeaOperationBackend({ statement: stub, context: new ClientContextStub() }); + const rows = await backend.fetchChunk({ limit: 100 }); + expect(rows).to.have.length(1); + expect((rows[0] as any).iv).to.equal('1-2'); + }); + + it('YEAR-MONTH negative → "-Y-M"', async () => { + const fields = [withTypeName(new Field('iv', new Interval(IntervalUnit.YEAR_MONTH), true), 'INTERVAL')]; + const schema = new Schema(fields); + const schemaIpc = ipcSchemaOnly(schema); + + // -14 total months → -1 year -2 months. + const dataIpc = ipcFromColumns(schema, { iv: Int32Array.from([-14]) }); + + const stub = new StatementStub(schemaIpc, [dataIpc]); + const backend = new SeaOperationBackend({ statement: stub, context: new ClientContextStub() }); + const rows = await backend.fetchChunk({ limit: 100 }); + expect(rows).to.have.length(1); + expect((rows[0] as any).iv).to.equal('-1-2'); + }); + + it('DAY-TIME via Arrow Duration(MICROSECOND) → "1 02:03:04.000000000"', async () => { + // 1 day + 2h + 3min + 4s = 93784 seconds = 93_784_000_000 µs. 
+ const microseconds = BigInt(93_784) * BigInt(1_000_000); + const ipc = buildDurationIpc('iv', FbTimeUnit.MICROSECOND, [microseconds], 'INTERVAL'); + const schemaIpc = ipcWithDurationSchema('iv', FbTimeUnit.MICROSECOND, 'INTERVAL'); + + const stub = new StatementStub(schemaIpc, [ipc]); + const backend = new SeaOperationBackend({ statement: stub, context: new ClientContextStub() }); + const rows = await backend.fetchChunk({ limit: 100 }); + expect(rows).to.have.length(1); + expect((rows[0] as any).iv).to.equal('1 02:03:04.000000000'); + }); + + it('DAY-TIME via Arrow Duration(NANOSECOND) preserves nanosecond precision', async () => { + // 1 day + 2h + 3min + 4.123456789s + const nanos = BigInt(86400 + 2 * 3600 + 3 * 60 + 4) * BigInt(1_000_000_000) + BigInt(123_456_789); + const ipc = buildDurationIpc('iv', FbTimeUnit.NANOSECOND, [nanos], 'INTERVAL'); + const schemaIpc = ipcWithDurationSchema('iv', FbTimeUnit.NANOSECOND, 'INTERVAL'); + + const stub = new StatementStub(schemaIpc, [ipc]); + const backend = new SeaOperationBackend({ statement: stub, context: new ClientContextStub() }); + const rows = await backend.fetchChunk({ limit: 100 }); + expect(rows).to.have.length(1); + expect((rows[0] as any).iv).to.equal('1 02:03:04.123456789'); + }); + + it('DAY-TIME zero → "0 00:00:00.000000000"', async () => { + const ipc = buildDurationIpc('iv', FbTimeUnit.MICROSECOND, [BigInt(0)], 'INTERVAL'); + const schemaIpc = ipcWithDurationSchema('iv', FbTimeUnit.MICROSECOND, 'INTERVAL'); + + const stub = new StatementStub(schemaIpc, [ipc]); + const backend = new SeaOperationBackend({ statement: stub, context: new ClientContextStub() }); + const rows = await backend.fetchChunk({ limit: 100 }); + expect(rows).to.have.length(1); + expect((rows[0] as any).iv).to.equal('0 00:00:00.000000000'); + }); + + it('DAY-TIME negative → leading "-"', async () => { + // -(1 day + 2h + 3min + 4s) in microseconds. 
+ const microseconds = -(BigInt(93_784) * BigInt(1_000_000)); + const ipc = buildDurationIpc('iv', FbTimeUnit.MICROSECOND, [microseconds], 'INTERVAL'); + const schemaIpc = ipcWithDurationSchema('iv', FbTimeUnit.MICROSECOND, 'INTERVAL'); + + const stub = new StatementStub(schemaIpc, [ipc]); + const backend = new SeaOperationBackend({ statement: stub, context: new ClientContextStub() }); + const rows = await backend.fetchChunk({ limit: 100 }); + expect(rows).to.have.length(1); + expect((rows[0] as any).iv).to.equal('-1 02:03:04.000000000'); + }); + + it('Duration column round-trips alongside primitive columns (DRY: same converter handles both intervals)', async () => { + // Schema: [iv: Duration(µs), n: Int32]. The pre-processor must + // rewrite the Duration field WITHOUT disturbing the Int32 sibling. + // We hand-build the Duration schema (apache-arrow@13 can't build + // Duration directly) and a body that has [Int64 column, Int32 col]. + // The rewriter must keep the Int32 column intact and substitute + // Int64 for Duration. + // + // Note: we use a single-Duration-column test here because mixing + // hand-built Duration with apache-arrow's batch builder requires + // hand-rolling the entire IPC stream. The "Duration alongside + // other columns" coverage is provided by the E2E parity tests + // (M0-DT-019 in `tests/nodejs/test/parity/M0DatatypeParityTests.test.ts`) + // which use a real warehouse query that mixes INTERVAL with other + // types. + const microseconds = BigInt(86_400) * BigInt(1_000_000); // 1 day + const ipc = buildDurationIpc('iv', FbTimeUnit.MICROSECOND, [microseconds], 'INTERVAL'); + const schemaIpc = ipcWithDurationSchema('iv', FbTimeUnit.MICROSECOND, 'INTERVAL'); + + const stub = new StatementStub(schemaIpc, [ipc]); + const backend = new SeaOperationBackend({ statement: stub, context: new ClientContextStub() }); + + // Round-trip the metadata to confirm we synthesise the right TTypeId. 
+ const metadata = await backend.getResultMetadata(); + expect(metadata.schema?.columns?.[0]?.typeDesc.types?.[0]?.primitiveEntry?.type).to.equal( + // INTERVAL_DAY_TIME_TYPE = 30 in TCLIService_types + // We assert by importing the enum below to avoid magic numbers. + // eslint-disable-next-line global-require, @typescript-eslint/no-var-requires + require('../../../thrift/TCLIService_types').TTypeId.INTERVAL_DAY_TIME_TYPE, + ); + + const rows = await backend.fetchChunk({ limit: 100 }); + expect(rows).to.have.length(1); + expect((rows[0] as any).iv).to.equal('1 00:00:00.000000000'); + }); +}); diff --git a/tests/unit/sea/operation-lifecycle.test.ts b/tests/unit/sea/operation-lifecycle.test.ts new file mode 100644 index 00000000..06260542 --- /dev/null +++ b/tests/unit/sea/operation-lifecycle.test.ts @@ -0,0 +1,434 @@ +// Copyright (c) 2026 Databricks, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * Unit tests for the SEA operation lifecycle (`cancel`, `close`, + * `finished`) — both via the `SeaOperationLifecycle` helpers and + * via `SeaOperationBackend` which composes them. + * + * We mock the napi binding's `Statement` handle so the test process + * doesn't touch any native code; the helpers and the backend are + * structurally typed against `SeaStatementHandle` exactly so this + * works. 
+ */ + +import { expect } from 'chai'; +import sinon from 'sinon'; +import { OperationStatus, OperationState } from '../../../lib/contracts/OperationStatus'; +import IClientContext from '../../../lib/contracts/IClientContext'; +import IDBSQLLogger, { LogLevel } from '../../../lib/contracts/IDBSQLLogger'; +import { + SeaStatementHandle, + createLifecycleState, + seaCancel, + seaClose, + seaFinished, + failIfNotActive, +} from '../../../lib/sea/SeaOperationLifecycle'; +import SeaOperationBackend from '../../../lib/sea/SeaOperationBackend'; +import OperationStateError, { OperationStateErrorCode } from '../../../lib/errors/OperationStateError'; +import HiveDriverError from '../../../lib/errors/HiveDriverError'; + +class TestLogger implements IDBSQLLogger { + public readonly entries: Array<{ level: LogLevel; message: string }> = []; + + log(level: LogLevel, message: string): void { + this.entries.push({ level, message }); + } +} + +function makeContext(): IClientContext { + const logger = new TestLogger(); + // Only `getLogger` is exercised by the lifecycle helpers; the rest + // of `IClientContext` is stubbed to throw so accidental coupling + // to it shows up loudly in tests. 
+ const notUsed = () => { + throw new Error('IClientContext member not expected to be used by lifecycle'); + }; + return { + getConfig: notUsed, + getLogger: () => logger, + getConnectionProvider: notUsed, + getClient: notUsed, + getDriver: notUsed, + } as unknown as IClientContext; +} + +function makeStatement(overrides: Partial = {}): { + handle: SeaStatementHandle; + cancel: sinon.SinonStub; + close: sinon.SinonStub; +} { + const cancel = sinon.stub().resolves(); + const close = sinon.stub().resolves(); + return { + handle: { cancel, close, ...overrides }, + cancel, + close, + }; +} + +describe('SeaOperationLifecycle (helpers)', () => { + describe('seaCancel', () => { + it('calls statement.cancel() and resolves with a success Status', async () => { + const ctx = makeContext(); + const { handle, cancel } = makeStatement(); + const state = createLifecycleState(); + + const status = await seaCancel(state, handle, ctx, 'op-id-1'); + + expect(cancel.calledOnce).to.equal(true); + expect(status.isSuccess).to.equal(true); + expect(state.isCancelled).to.equal(true); + }); + + it('is idempotent — second call does not hit the binding', async () => { + const ctx = makeContext(); + const { handle, cancel } = makeStatement(); + const state = createLifecycleState(); + + await seaCancel(state, handle, ctx, 'op-id-2'); + await seaCancel(state, handle, ctx, 'op-id-2'); + + expect(cancel.calledOnce).to.equal(true); + }); + + it('short-circuits when the operation is already closed', async () => { + const ctx = makeContext(); + const { handle, cancel } = makeStatement(); + const state = createLifecycleState(); + state.isClosed = true; + + const status = await seaCancel(state, handle, ctx, 'op-id-3'); + + expect(cancel.called).to.equal(false); + expect(status.isSuccess).to.equal(true); + }); + + it('sets isCancelled BEFORE awaiting the binding (so concurrent fetch sees it)', async () => { + const ctx = makeContext(); + const state = createLifecycleState(); + + // Cancel returns a 
promise that resolves only when we say so. + let release: (() => void) | undefined; + const cancelPromise = new Promise((resolve) => { + release = resolve; + }); + const handle: SeaStatementHandle = { + cancel: () => cancelPromise, + close: async () => undefined, + }; + + const inflight = seaCancel(state, handle, ctx, 'op-id-4'); + + // Yield once so the synchronous prelude of seaCancel runs. + await Promise.resolve(); + expect(state.isCancelled).to.equal(true); + // Before the await resolves, failIfNotActive must already throw. + expect(() => failIfNotActive(state)).to.throw(); + + release!(); + const status = await inflight; + expect(status.isSuccess).to.equal(true); + }); + + it('propagates binding errors via the kernel error mapping', async () => { + const ctx = makeContext(); + const state = createLifecycleState(); + const handle: SeaStatementHandle = { + cancel: async () => { + // Simulate the binding's JSON-envelope error format. + const payload = JSON.stringify({ + code: 'InvalidStatementHandle', + message: 'statement already closed', + }); + throw new Error(`__databricks_error__:${payload}`); + }, + close: async () => undefined, + }; + + let thrown: unknown; + try { + await seaCancel(state, handle, ctx, 'op-err-1'); + } catch (err) { + thrown = err; + } + expect(thrown).to.be.instanceOf(HiveDriverError); + expect((thrown as Error).message).to.contain('statement already closed'); + }); + + it('logs a debug message tagged with the operation id', async () => { + const ctx = makeContext(); + const logger = ctx.getLogger() as TestLogger; + const { handle } = makeStatement(); + const state = createLifecycleState(); + + await seaCancel(state, handle, ctx, 'op-id-log'); + + expect(logger.entries.some((e) => e.level === LogLevel.debug && e.message.includes('op-id-log'))).to.equal(true); + }); + }); + + describe('seaClose', () => { + it('calls statement.close() and resolves with a success Status', async () => { + const ctx = makeContext(); + const { handle, close } 
= makeStatement(); + const state = createLifecycleState(); + + const status = await seaClose(state, handle, ctx, 'op-close-1'); + + expect(close.calledOnce).to.equal(true); + expect(status.isSuccess).to.equal(true); + expect(state.isClosed).to.equal(true); + }); + + it('is idempotent — second call does not hit the binding', async () => { + const ctx = makeContext(); + const { handle, close } = makeStatement(); + const state = createLifecycleState(); + + await seaClose(state, handle, ctx, 'op-close-2'); + await seaClose(state, handle, ctx, 'op-close-2'); + + expect(close.calledOnce).to.equal(true); + }); + + it('propagates binding errors via the kernel error mapping', async () => { + const ctx = makeContext(); + const state = createLifecycleState(); + const handle: SeaStatementHandle = { + cancel: async () => undefined, + close: async () => { + const payload = JSON.stringify({ + code: 'NetworkError', + message: 'connection reset by peer', + }); + throw new Error(`__databricks_error__:${payload}`); + }, + }; + + let thrown: unknown; + try { + await seaClose(state, handle, ctx, 'op-err-close'); + } catch (err) { + thrown = err; + } + expect(thrown).to.be.instanceOf(HiveDriverError); + expect((thrown as Error).message).to.contain('connection reset'); + }); + }); + + describe('seaFinished', () => { + it('resolves immediately when no callback is provided (M0 no-op)', async () => { + const state = createLifecycleState(); + const start = Date.now(); + await seaFinished(state); + // Should be near-instantaneous — no 100ms poll. 
+ expect(Date.now() - start).to.be.lessThan(50); + }); + + it('invokes the progress callback exactly once with a FINISHED status', async () => { + const state = createLifecycleState(); + const callback = sinon.stub(); + + await seaFinished(state, { callback }); + + expect(callback.calledOnce).to.equal(true); + const arg = callback.firstCall.args[0] as OperationStatus; + expect(arg.state).to.equal(OperationState.Succeeded); + expect(arg.hasResultSet).to.equal(true); + }); + + it('awaits an async progress callback', async () => { + const state = createLifecycleState(); + let resolvedInsideCallback = false; + const callback = async () => { + await new Promise((r) => setTimeout(r, 10)); + resolvedInsideCallback = true; + }; + + await seaFinished(state, { callback }); + + expect(resolvedInsideCallback).to.equal(true); + }); + + it('is a no-op when the operation is already cancelled', async () => { + const state = createLifecycleState(); + state.isCancelled = true; + const callback = sinon.stub(); + + await seaFinished(state, { callback }); + + expect(callback.called).to.equal(false); + }); + }); + + describe('failIfNotActive', () => { + it('throws OperationStateError(Canceled) when cancelled', () => { + const state = createLifecycleState(); + state.isCancelled = true; + // Throws the canonical OperationStateError(Canceled) directly so both the + // errorCode AND the message match the Thrift path verbatim. + try { + failIfNotActive(state); + expect.fail('expected throw'); + } catch (err) { + expect(err).to.be.instanceOf(OperationStateError); + expect((err as OperationStateError).errorCode).to.equal(OperationStateErrorCode.Canceled); + // Parity with Thrift's canonical message (was a custom "was cancelled."). 
+ expect((err as Error).message).to.contain('canceled by a client'); + } + }); + + it('throws HiveDriverError when closed', () => { + const state = createLifecycleState(); + state.isClosed = true; + try { + failIfNotActive(state); + expect.fail('expected throw'); + } catch (err) { + expect(err).to.be.instanceOf(HiveDriverError); + } + }); + + it('does nothing when active', () => { + const state = createLifecycleState(); + // Should not throw. + failIfNotActive(state); + }); + }); +}); + +describe('SeaOperationBackend (lifecycle integration)', () => { + it('cancel() forwards to statement.cancel()', async () => { + const ctx = makeContext(); + const { handle, cancel } = makeStatement(); + const op = new SeaOperationBackend({ statement: handle, context: ctx }); + + const status = await op.cancel(); + + expect(cancel.calledOnce).to.equal(true); + expect(status.isSuccess).to.equal(true); + }); + + it('close() forwards to statement.close()', async () => { + const ctx = makeContext(); + const { handle, close } = makeStatement(); + const op = new SeaOperationBackend({ statement: handle, context: ctx }); + + const status = await op.close(); + + expect(close.calledOnce).to.equal(true); + expect(status.isSuccess).to.equal(true); + }); + + it('finished() resolves immediately and fires the callback once', async () => { + const ctx = makeContext(); + const { handle } = makeStatement(); + const op = new SeaOperationBackend({ statement: handle, context: ctx }); + + const responses: OperationStatus[] = []; + const start = Date.now(); + await op.waitUntilReady({ callback: (r) => responses.push(r) }); + + expect(Date.now() - start).to.be.lessThan(50); + expect(responses).to.have.length(1); + expect(responses[0].state).to.equal(OperationState.Succeeded); + }); + + it('fetchChunk after cancel throws the cancellation error', async () => { + const ctx = makeContext(); + const { handle } = makeStatement(); + const op = new SeaOperationBackend({ statement: handle, context: ctx }); + + 
await op.cancel(); + + let thrown: unknown; + try { + await op.fetchChunk({ limit: 10 }); + } catch (err) { + thrown = err; + } + expect(thrown).to.be.instanceOf(OperationStateError); + expect((thrown as OperationStateError).errorCode).to.equal(OperationStateErrorCode.Canceled); + }); + + it('cancel() is idempotent across the backend surface', async () => { + const ctx = makeContext(); + const { handle, cancel } = makeStatement(); + const op = new SeaOperationBackend({ statement: handle, context: ctx }); + + await op.cancel(); + await op.cancel(); + await op.cancel(); + + expect(cancel.calledOnce).to.equal(true); + }); + + it('close() is idempotent across the backend surface', async () => { + const ctx = makeContext(); + const { handle, close } = makeStatement(); + const op = new SeaOperationBackend({ statement: handle, context: ctx }); + + await op.close(); + await op.close(); + + expect(close.calledOnce).to.equal(true); + }); + + it('status() reports FINISHED_STATE when active', async () => { + const ctx = makeContext(); + const { handle } = makeStatement(); + const op = new SeaOperationBackend({ statement: handle, context: ctx }); + + const status = await op.status(false); + expect(status.state).to.equal(OperationState.Succeeded); + }); + + it('status() reports CANCELED_STATE after cancel', async () => { + const ctx = makeContext(); + const { handle } = makeStatement(); + const op = new SeaOperationBackend({ statement: handle, context: ctx }); + + await op.cancel(); + const status = await op.status(false); + expect(status.state).to.equal(OperationState.Cancelled); + }); + + it('id getter is stable', () => { + const ctx = makeContext(); + const { handle } = makeStatement(); + const op = new SeaOperationBackend({ statement: handle, context: ctx, id: 'fixed-id' }); + + expect(op.id).to.equal('fixed-id'); + expect(op.id).to.equal('fixed-id'); + }); + + it('id getter defaults to a uuid when none is supplied', () => { + const ctx = makeContext(); + const { handle } = 
makeStatement(); + const op = new SeaOperationBackend({ statement: handle, context: ctx }); + + // RFC4122 v4 — 36 chars with hyphens at positions 8/13/18/23. + expect(op.id).to.match(/^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[0-9a-f]{4}-[0-9a-f]{12}$/); + }); + + it('hasResultSet is true by default (kernel always streams)', () => { + const ctx = makeContext(); + const { handle } = makeStatement(); + const op = new SeaOperationBackend({ statement: handle, context: ctx }); + + expect(op.hasResultSet()).to.equal(true); + }); +}); From e2abd389dc9f9416eae629ebfea0631add2edb63 Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Mon, 1 Jun 2026 20:55:53 +0000 Subject: [PATCH 2/2] =?UTF-8?q?fix(sea):=20address=20#411=20review=20?= =?UTF-8?q?=E2=80=94=20exhaustive=20interval=20switch,=20docs,=20coverage?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Validated every interval edge case (null, multi-row, negative sub-year, sibling-survives) against a live pecotesting warehouse first — all byte-identical to Thrift. The findings were layering/dead-code/coverage, not runtime bugs. - F1: corrected the DURATION_UNIT_METADATA_KEY doc — the interval/duration branches are SEA-gated by construction (Thrift maps INTERVAL → ArrowString and never reaches them), NOT "reused by thrift" as the old comment claimed. - F3/F6/F10: formatArrowInterval now handles YEAR_MONTH only (typed `Interval` + `IntervalUnit.YEAR_MONTH`, no magic `=== 0`) and THROWS on any other unit. The old non-exhaustive default silently misread MONTH_DAY_NANO/undefined as [days,ms]; and the native Interval[DayTime] branch was dead+broken (the kernel emits DAY-TIME as Duration, handled separately) — removed it. - F2: exported the metadata key + added a test pinning it equal to the SEA-side declaration (guards against a silent rename drift). - F8: dropped the dead `_unit` param from formatDayTimeFromTotal. 
- F9: removed the dead duration check in the BigNum branch (rewritten Int64s arrive as raw bigint) and fixed the false "Int32Array instanceof Uint8Array" comment. - F7: added a YEAR-MONTH sub-year-negative unit test (-1 month → "-0-1"). - F4: added live e2e for null INTERVAL → null and multi-row batches. - F5: e2e before() now probes getSeaNative() and skips (not errors) when the binding is absent; dropped the flaky wall-clock latency assertions (assert behavior — cancel resolved, callback fired once). Deferred (low/informational, noted): F11 (consolidate test makeContext helpers), F12 (tighten instanceOf assertions), F13 (per-operation interval-representation breadcrumb — a per-value log would be spam). Co-authored-by: Isaac Signed-off-by: Madhavendra Rathore --- lib/result/ArrowResultConverter.ts | 99 +++++++++---------- tests/e2e/sea/interval-edge-e2e.test.ts | 88 +++++++++++++++++ tests/e2e/sea/operation-lifecycle-e2e.test.ts | 34 ++++--- tests/unit/sea/SeaIntervalParity.test.ts | 26 +++++ 4 files changed, 176 insertions(+), 71 deletions(-) create mode 100644 tests/e2e/sea/interval-edge-e2e.test.ts diff --git a/lib/result/ArrowResultConverter.ts b/lib/result/ArrowResultConverter.ts index e2a0f3c4..56772b57 100644 --- a/lib/result/ArrowResultConverter.ts +++ b/lib/result/ArrowResultConverter.ts @@ -6,6 +6,8 @@ import { TypeMap, DataType, Type, + Interval, + IntervalUnit, StructRow, MapRow, Vector, @@ -15,6 +17,7 @@ import { } from 'apache-arrow'; import { TTableSchema, TColumnDesc } from '../../thrift/TCLIService_types'; import IClientContext from '../contracts/IClientContext'; +import HiveDriverError from '../errors/HiveDriverError'; import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider'; import { ArrowBatch, getSchemaColumns, convertThriftValue } from './utils'; @@ -24,55 +27,51 @@ type ArrowSchema = Schema; type ArrowSchemaField = Field>; /** - * Metadata key carrying the original Arrow `Duration` time unit on - * fields that 
were rewritten to `Int64` by the SEA IPC pre-processor - * (`lib/sea/SeaArrowIpcDurationFix.ts`). We re-declare the constant - * here (rather than importing it) so the converter has no compile-time - * dependency on the SEA module — it's reused unchanged by the - * thrift-path which has no SEA awareness. + * Metadata key carrying the original Arrow `Duration` time unit on fields + * rewritten to `Int64` by the SEA IPC pre-processor + * (`lib/sea/SeaArrowIpcDurationFix.ts`). Re-declared here (rather than + * imported) to keep this generic `lib/result` converter free of a + * compile-time dependency on `lib/sea`. + * + * **SEA-gated by construction — NOT shared with Thrift.** This key (and the + * `DataType.isInterval` / duration branches below) only ever execute on the + * SEA path. The Thrift backend sets `intervalTypesAsArrow: false` and maps + * both INTERVAL `TTypeId`s to `ArrowString` (`lib/result/utils.ts`), so the + * server pre-formats intervals to strings and this logic is never reached. + * `export`ed so `SeaIntervalParity.test` can pin it equal to the SEA-side + * declaration and catch a rename/typo that would silently no-op the consumer. */ -const DURATION_UNIT_METADATA_KEY = 'databricks.arrow.duration_unit'; +export const DURATION_UNIT_METADATA_KEY = 'databricks.arrow.duration_unit'; const ZERO_BIGINT = BigInt(0); const NS_PER_MICRO = BigInt(1_000); const NS_PER_MILLI = BigInt(1_000_000); const NS_PER_SEC = BigInt(1_000_000_000); -const MS_PER_DAY = BigInt(86_400_000); const NS_PER_MIN = NS_PER_SEC * BigInt(60); const NS_PER_HOUR = NS_PER_MIN * BigInt(60); const NS_PER_DAY = NS_PER_HOUR * BigInt(24); /** - * Format an Arrow `Interval[YearMonth]` or `Interval[DayTime]` value - * into the canonical thrift string the JDBC/ODBC server emits: - * YEAR-MONTH → `"Y-M"` (e.g. 1 year 2 months → `"1-2"`) - * DAY-TIME → `"D HH:mm:ss.fffffffff"` - * (e.g. 
1 day 02:03:04 → `"1 02:03:04.000000000"`) + * Format a native Arrow `Interval[YearMonth]` value into the canonical thrift + * string `"Y-M"` (e.g. 1 year 2 months → `"1-2"`, -1 month → `"-0-1"`). * - * Arrow surfaces these as `Int32Array(2)` via the `GetVisitor` - * (`apache-arrow/visitor/get.js:177-185`): - * YEAR-MONTH: `[years, months]` (years/months derived from a single - * int32 holding total months) - * DAY-TIME: `[days, milliseconds]` (legacy two-int32 form) + * Arrow surfaces YEAR-MONTH as an `Int32Array(2)` `[years, months]` via the + * `GetVisitor` (years/months derived from a single int32 of total months). * - * Negative intervals: the FULL interval is emitted with a leading `-` - * (Spark convention), and individual fields are unsigned. We mirror - * Spark's display. + * **Only YEAR_MONTH reaches here.** The kernel emits INTERVAL DAY-TIME as an + * Arrow `Duration` (rewritten to `Int64`), handled by + * `formatDurationToIntervalDayTime` — never as a native `Interval[DayTime]`. + * Any other unit (DAY_TIME / MONTH_DAY_NANO / undefined) is therefore + * unexpected; we throw rather than silently misread the value as `[days, ms]` + * and emit a confidently-wrong string (the old non-exhaustive default). */ -function formatArrowInterval(value: any, valueType: any): string { - // `value` is an Int32Array of length 2. - const a = Number(value[0]); - const b = Number(value[1]); - // unit 0 = YEAR_MONTH, unit 1 = DAY_TIME, unit 2 = MONTH_DAY_NANO - const unit = valueType?.unit; - if (unit === 0) { - return formatYearMonth(a, b); +function formatArrowInterval(value: Int32Array, valueType: Interval): string { + if (valueType?.unit !== IntervalUnit.YEAR_MONTH) { + throw new HiveDriverError( + `SEA result converter: unsupported Arrow Interval unit ${valueType?.unit}. 
The kernel emits only ` + + `YEAR_MONTH as a native Arrow Interval (DAY-TIME arrives as Duration); MONTH_DAY_NANO is unsupported.`, + ); } - // DAY_TIME: a = days, b = milliseconds (within the day, can be ≥0 or <0) - // We re-normalise: total milliseconds = a * 86_400_000 + b, then split into - // days, hours, minutes, seconds, nanoseconds (nanoseconds is always 0 - // because the legacy IntervalDayTime carries only millisecond precision). - const totalMs = BigInt(a) * MS_PER_DAY + BigInt(b); - return formatDayTimeFromTotal(totalMs * NS_PER_MILLI /* → ns */, 'NANOSECOND'); + return formatYearMonth(Number(value[0]), Number(value[1])); } /** @@ -107,7 +106,7 @@ function formatYearMonth(years: number, months: number): string { function formatDurationToIntervalDayTime(value: bigint | number, unit: string): string { const bi = typeof value === 'bigint' ? value : BigInt(value); const nanos = toNanoseconds(bi, unit); - return formatDayTimeFromTotal(nanos, unit); + return formatDayTimeFromTotal(nanos); } /** @@ -137,13 +136,9 @@ function toNanoseconds(value: bigint, unit: string): bigint { * Always emits 9 fractional digits to match the thrift driver's wire * format (`"1 02:03:04.000000000"` — 9 digits regardless of the * server-side storage precision). Negative values get a single - * leading `-`. - * - * The `unit` parameter is currently unused for formatting (the value - * is already in nanoseconds by the time we get here) but is retained - * for future use if a unit-aware precision is ever needed. + * leading `-`. The caller has already scaled to nanoseconds. */ -function formatDayTimeFromTotal(totalNanos: bigint, _unit: string): string { +function formatDayTimeFromTotal(totalNanos: bigint): string { const sign = totalNanos < ZERO_BIGINT ? '-' : ''; const abs = totalNanos < ZERO_BIGINT ? 
-totalNanos : totalNanos; @@ -369,23 +364,17 @@ export default class ArrowResultConverter implements IResultsProvider if (DataType.isDecimal(valueType)) { return Number(result) / 10 ** valueType.scale; } - // Duration columns rewritten to Int64 — detect via metadata. - const durationUnit = field?.metadata.get(DURATION_UNIT_METADATA_KEY); - if (durationUnit) { - return formatDurationToIntervalDayTime(result, durationUnit); - } + // A rewritten Duration Int64 surfaces as a raw `bigint`, not a BigNum + // wrapper, so it is handled in the bigint branch below — not here. return result; } - // Convert binary data to Buffer + // Convert binary data to Buffer. if (value instanceof Uint8Array) { - // INTERVAL DAY-TIME / YEAR-MONTH that apache-arrow surfaced as - // an Int32Array (size 2). `Uint8Array.isInstanceOf` is true for - // every TypedArray subclass, so we have to check the parent type - // first. The `DataType.isInterval` branch above already handles - // the case where Arrow knew the field was an interval — this - // fallback covers schemas where the interval surfaced as bare - // bytes (defensive; not exercised in M0). + // Note: Arrow `Int32Array` / `BigInt64Array` are NOT `instanceof + // Uint8Array` (they are sibling TypedArrays), so an interval value never + // reaches this branch — intervals are handled by the `isInterval` / + // bigint branches above. This is purely the binary-column → Buffer path. return Buffer.from(value); } diff --git a/tests/e2e/sea/interval-edge-e2e.test.ts b/tests/e2e/sea/interval-edge-e2e.test.ts new file mode 100644 index 00000000..00ab0ae1 --- /dev/null +++ b/tests/e2e/sea/interval-edge-e2e.test.ts @@ -0,0 +1,88 @@ +// Copyright (c) 2026 Databricks, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* eslint-disable no-console */ + +import { expect } from 'chai'; +import { DBSQLClient } from '../../../lib'; +import { ConnectionOptions } from '../../../lib/contracts/IDBSQLClient'; +import { InternalConnectionOptions } from '../../../lib/contracts/InternalConnectionOptions'; +import { getSeaNative } from '../../../lib/sea/SeaNativeLoader'; + +// INTERVAL edge cases the unit suite can't easily build (null, multi-row). +// Verified byte-identical to the Thrift backend against a live warehouse. +// Requires the pecotesting secrets AND the native binding — skips otherwise. 
+ +interface PecoSecrets { + host: string; + path: string; + token: string; +} + +function readSecrets(): PecoSecrets | null { + const host = process.env.DATABRICKS_PECOTESTING_SERVER_HOSTNAME; + const path = process.env.DATABRICKS_PECOTESTING_HTTP_PATH; + const token = process.env.DATABRICKS_PECOTESTING_TOKEN_PERSONAL; + if (!host || !path || !token) return null; + return { host, path, token }; +} + +async function seaValues(sql: string, secrets: PecoSecrets): Promise { + const client = new DBSQLClient(); + await client.connect({ ...secrets, useSEA: true } as ConnectionOptions & InternalConnectionOptions); + try { + const session = await client.openSession(); + const op = await session.executeStatement(sql); + const rows = (await op.fetchAll()) as Array>; + await op.close(); + await session.close(); + return rows.map((r) => r.v); + } finally { + await client.close(); + } +} + +describe('SEA INTERVAL edge cases end-to-end', function suite() { + this.timeout(120_000); + + const secrets = readSecrets(); + + before(function gate() { + // eslint-disable-next-line no-invalid-this + const self = this; + if (!secrets) { + self.skip(); + return; + } + // Skip (not error) when the native binding isn't built/installed. 
+ try { + getSeaNative(); + } catch { + self.skip(); + } + }); + + it('NULL INTERVAL DAY-TIME → null', async () => { + const values = await seaValues('SELECT CAST(NULL AS INTERVAL DAY TO SECOND) AS v', secrets as PecoSecrets); + expect(values).to.deep.equal([null]); + }); + + it('multi-row INTERVAL DAY-TIME batch formats every row', async () => { + const values = await seaValues( + "SELECT * FROM VALUES (INTERVAL '1' DAY), (INTERVAL '2 03:00:00' DAY TO SECOND), (INTERVAL '0' DAY) AS t(v)", + secrets as PecoSecrets, + ); + expect(values).to.deep.equal(['1 00:00:00.000000000', '2 03:00:00.000000000', '0 00:00:00.000000000']); + }); +}); diff --git a/tests/e2e/sea/operation-lifecycle-e2e.test.ts b/tests/e2e/sea/operation-lifecycle-e2e.test.ts index 31c4f910..5d529aca 100644 --- a/tests/e2e/sea/operation-lifecycle-e2e.test.ts +++ b/tests/e2e/sea/operation-lifecycle-e2e.test.ts @@ -101,13 +101,23 @@ describe('SEA operation lifecycle — end-to-end', function suite() { const token = process.env.DATABRICKS_PECOTESTING_TOKEN_PERSONAL || process.env.E2E_ACCESS_TOKEN; before(function gate() { + // eslint-disable-next-line no-invalid-this + const self = this; if (!hostName || !httpPath || !token) { - // eslint-disable-next-line no-invalid-this - this.skip(); + self.skip(); + return; + } + // Creds present but the native binding not built/installed (e.g. a CI + // runner without the .node) must SKIP, not error: probe getSeaNative() + // here so every test isn't an uncaught-throw at its first call. 
+ try { + getSeaNative(); + } catch { + self.skip(); } }); - it('cancel() succeeds against a live SEA statement and is fast', async () => { + it('cancel() succeeds against a live SEA statement', async () => { const binding = getSeaNative() as unknown as NativeBinding; const connection = await binding.openSession({ @@ -130,12 +140,9 @@ describe('SEA operation lifecycle — end-to-end', function suite() { context: makeContext(), }); - const t0 = Date.now(); + // Assert behavior (cancel resolves with success), not wall-clock latency + // — a hard ms budget against a live warehouse is flaky on slow networks. const status = await op.cancel(); - const elapsed = Date.now() - t0; - - // Cancel must complete within 200ms. - expect(elapsed).to.be.lessThan(200, `cancel latency ${elapsed}ms exceeds 200ms budget`); expect(status.isSuccess).to.equal(true); } finally { // Bypass `op.close()` here because we want to verify cancel @@ -170,10 +177,7 @@ describe('SEA operation lifecycle — end-to-end', function suite() { context: makeContext(), }); - const t0 = Date.now(); await op.cancel(); - const elapsed = Date.now() - t0; - expect(elapsed).to.be.lessThan(200, `cancel latency ${elapsed}ms exceeds 200ms budget`); // After cancel, fetchChunk must throw the cancellation error // (regardless of whether the underlying fetch implementation @@ -247,17 +251,15 @@ describe('SEA operation lifecycle — end-to-end', function suite() { }); let ticks = 0; - const t0 = Date.now(); await op.waitUntilReady({ callback: () => { ticks += 1; }, }); - const elapsed = Date.now() - t0; - // M0 finished() is a no-op — must resolve in <50ms. - expect(elapsed).to.be.lessThan(50); - // Progress callback fires exactly once. + // M0 finished() is a no-op that resolves immediately and fires the + // progress callback exactly once. Assert the behavior, not a wall-clock + // budget (flaky against a live warehouse). 
expect(ticks).to.equal(1); } finally { if (statement !== null) { diff --git a/tests/unit/sea/SeaIntervalParity.test.ts b/tests/unit/sea/SeaIntervalParity.test.ts index 38a4a19d..dd15db7e 100644 --- a/tests/unit/sea/SeaIntervalParity.test.ts +++ b/tests/unit/sea/SeaIntervalParity.test.ts @@ -61,6 +61,8 @@ import { TimeUnit as FbTimeUnit } from 'apache-arrow/fb/time-unit'; import SeaOperationBackend from '../../../lib/sea/SeaOperationBackend'; import ClientContextStub from '../.stubs/ClientContextStub'; +import { DURATION_UNIT_METADATA_KEY as CONVERTER_DURATION_KEY } from '../../../lib/result/ArrowResultConverter'; +import { DURATION_UNIT_METADATA_KEY as REWRITER_DURATION_KEY } from '../../../lib/sea/SeaArrowIpcDurationFix'; // --------------------------------------------------------------------------- // Test helpers. @@ -363,4 +365,28 @@ describe('SeaOperationBackend — INTERVAL parity with thrift', () => { expect(rows).to.have.length(1); expect((rows[0] as any).iv).to.equal('1 00:00:00.000000000'); }); + + it('YEAR-MONTH negative sub-year (-1 month) → "-0-1"', async () => { + // |total| < 12 negatives are the trickiest formatYearMonth case (years + // truncates to 0, the sign must still lead). Verified live byte-identical + // to thrift. 
+ const fields = [withTypeName(new Field('iv', new Interval(IntervalUnit.YEAR_MONTH), true), 'INTERVAL')]; + const schema = new Schema(fields); + const schemaIpc = ipcSchemaOnly(schema); + const dataIpc = ipcFromColumns(schema, { iv: Int32Array.from([-1]) }); + + const stub = new StatementStub(schemaIpc, [dataIpc]); + const backend = new SeaOperationBackend({ statement: stub, context: new ClientContextStub() }); + const rows = await backend.fetchChunk({ limit: 100 }); + expect(rows).to.have.length(1); + expect((rows[0] as any).iv).to.equal('-0-1'); + }); + + it('the duration_unit metadata key is identical in the rewriter and the converter', () => { + // Both modules declare the key independently (the converter avoids a + // compile-time dependency on lib/sea). Pin them equal so a rename/typo in + // one doesn't silently turn the converter's duration branch into a no-op. + expect(CONVERTER_DURATION_KEY).to.equal(REWRITER_DURATION_KEY); + expect(CONVERTER_DURATION_KEY).to.equal('databricks.arrow.duration_unit'); + }); });