Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,10 @@ Dockerfile* text
#
.gitattributes export-ignore
.gitignore export-ignore

# napi-rs auto-generates these files from the kernel's `napi-binding/napi/`
# crate; regenerated by `npm run build:native`. Tell git/GitHub they're
# machine-generated so they collapse in diffs and are excluded from
# blame and language stats.
native/sea/index.d.ts linguist-generated=true
native/sea/index.js linguist-generated=true
9 changes: 9 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,12 @@ coverage_unit
dist
*.DS_Store
lib/version.ts

# SEA native binding — copied/generated from kernel workspace by `npm run build:native`.
# The committed contract is `native/sea/index.d.ts` (TypeScript declarations) and
# `native/sea/index.js` (the napi-rs platform router — small, stable, and required in
# the publish tarball so a missing build step can't ship a tarball that can't load).
# The `.node` binaries are large per-platform artifacts and must NOT be committed;
# in production they arrive via the `@databricks/sql-kernel-<triple>` optional deps.
native/sea/index.node
native/sea/index.*.node
7 changes: 7 additions & 0 deletions .npmignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@
!dist/**/*
!thrift/**/*

# SEA napi-rs router shim + TypeScript declarations. The router (index.js)
# selects the per-platform `.node` artifact from `@databricks/sql-kernel-*`
# optionalDependencies (populated when the kernel CI publishes them);
# the .d.ts is the consumer-facing type contract.
!native/sea/index.js
!native/sea/index.d.ts

!LICENSE
!NOTICE
!package.json
Expand Down
6 changes: 6 additions & 0 deletions .prettierignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,9 @@ coverage
dist
thrift
package-lock.json

# Generated by napi-rs from the kernel's `napi-binding/napi/` crate;
# regenerated by `npm run build:native`. Format follows napi-rs's
# defaults (no semicolons), not this repo's prettier config.
native/sea/index.d.ts
native/sea/index.js
129 changes: 55 additions & 74 deletions lib/DBSQLClient.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import thrift from 'thrift';
import Int64 from 'node-int64';

import { EventEmitter } from 'events';
import TCLIService from '../thrift/TCLIService';
import { TProtocolVersion } from '../thrift/TCLIService_types';
import IDBSQLClient, { ClientOptions, ConnectionOptions, OpenSessionRequest } from './contracts/IDBSQLClient';
import IDriver from './contracts/IDriver';
import IClientContext, { ClientConfig } from './contracts/IClientContext';
Expand All @@ -14,9 +12,12 @@ import IDBSQLSession from './contracts/IDBSQLSession';
import IAuthentication from './connection/contracts/IAuthentication';
import HttpConnection from './connection/connections/HttpConnection';
import IConnectionOptions from './connection/contracts/IConnectionOptions';
import Status from './dto/Status';
import HiveDriverError from './errors/HiveDriverError';
import { buildUserAgentString, definedOrError, serializeQueryTags } from './utils';
import { buildUserAgentString } from './utils';
import IBackend from './contracts/IBackend';
import { InternalConnectionOptions } from './contracts/InternalConnectionOptions';
import ThriftBackend from './thrift-backend/ThriftBackend';
import SeaBackend from './sea/SeaBackend';
import PlainHttpAuthentication from './connection/auth/PlainHttpAuthentication';
import DatabricksOAuth, { OAuthFlow } from './connection/auth/DatabricksOAuth';
import {
Expand All @@ -39,19 +40,6 @@ function prependSlash(str: string): string {
return str;
}

function getInitialNamespaceOptions(catalogName?: string, schemaName?: string) {
if (!catalogName && !schemaName) {
return {};
}

return {
initialNamespace: {
catalogName,
schemaName,
},
};
}

export type ThriftLibrary = Pick<typeof thrift, 'createClient'>;

export default class DBSQLClient extends EventEmitter implements IDBSQLClient, IClientContext {
Expand All @@ -75,6 +63,8 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I

private readonly sessions = new CloseableCollection<DBSQLSession>();

private backend?: IBackend;

private static getDefaultLogger(): IDBSQLLogger {
if (!this.defaultLogger) {
this.defaultLogger = new DBSQLLogger();
Expand Down Expand Up @@ -248,38 +238,53 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I

this.connectionProvider = this.createConnectionProvider(options);

const thriftConnection = await this.connectionProvider.getThriftConnection();

thriftConnection.on('error', (error: Error) => {
// Error.stack already contains error type and message, so log stack if available,
// otherwise fall back to just error type + message
this.logger.log(LogLevel.error, error.stack || `${error.name}: ${error.message}`);
try {
this.emit('error', error);
} catch (e) {
// EventEmitter will throw unhandled error when emitting 'error' event.
// Since we already logged it few lines above, just suppress this behaviour
}
});

thriftConnection.on('reconnecting', (params: { delay: number; attempt: number }) => {
this.logger.log(LogLevel.debug, `Reconnecting, params: ${JSON.stringify(params)}`);
this.emit('reconnecting', params);
});

thriftConnection.on('close', () => {
this.logger.log(LogLevel.debug, 'Closing connection.');
this.emit('close');
});
// M0: `useSEA` is consumed via a non-exported internal-options cast so it
// doesn't ship in the public `.d.ts`. Mirrors Python's `kwargs.get("use_sea")`
// pattern (see databricks-sql-python/src/databricks/sql/session.py).
const internalOptions = options as ConnectionOptions & InternalConnectionOptions;
this.backend = internalOptions.useSEA
? new SeaBackend()
: new ThriftBackend({
context: this,
onConnectionEvent: (event, payload) => this.forwardConnectionEvent(event, payload),
});

thriftConnection.on('timeout', () => {
this.logger.log(LogLevel.debug, 'Connection timed out.');
this.emit('timeout');
});
await this.backend.connect(options);

return this;
}

private forwardConnectionEvent(event: 'error' | 'reconnecting' | 'close' | 'timeout', payload?: unknown): void {
switch (event) {
case 'error': {
const error = payload as Error;
this.logger.log(LogLevel.error, error.stack || `${error.name}: ${error.message}`);
try {
this.emit('error', error);
} catch (e) {
// EventEmitter throws when 'error' has no listeners; we've already logged it.
}
return;
}
case 'reconnecting':
this.logger.log(LogLevel.debug, `Reconnecting, params: ${JSON.stringify(payload)}`);
this.emit('reconnecting', payload);
return;
case 'close':
this.logger.log(LogLevel.debug, 'Closing connection.');
this.emit('close');
return;
case 'timeout':
this.logger.log(LogLevel.debug, 'Connection timed out.');
this.emit('timeout');
// Explicit return mirrors the other cases and protects against
// fall-through if a new event is added below.
// eslint-disable-next-line no-useless-return
return;
// no default
}
}

/**
* Starts new session
* @public
Expand All @@ -290,44 +295,20 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
* const session = await client.openSession();
*/
public async openSession(request: OpenSessionRequest = {}): Promise<IDBSQLSession> {
// Prepare session configuration
const configuration = request.configuration ? { ...request.configuration } : {};

// Add metric view metadata config if enabled
if (this.config.enableMetricViewMetadata) {
configuration['spark.sql.thriftserver.metadata.metricview.enabled'] = 'true';
}

// Serialize queryTags dict and set in configuration; takes precedence over configuration.QUERY_TAGS
if (request.queryTags !== undefined) {
const serialized = serializeQueryTags(request.queryTags);
if (serialized) {
configuration.QUERY_TAGS = serialized;
} else {
delete configuration.QUERY_TAGS;
}
if (!this.backend) {
throw new HiveDriverError('DBSQLClient: not connected');
}

const response = await this.driver.openSession({
client_protocol_i64: new Int64(TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V8),
...getInitialNamespaceOptions(request.initialCatalog, request.initialSchema),
configuration,
canUseMultipleCatalogs: true,
});

Status.assert(response.status);
const session = new DBSQLSession({
handle: definedOrError(response.sessionHandle),
context: this,
serverProtocolVersion: response.serverProtocolVersion,
});
const sessionBackend = await this.backend.openSession(request);
const session = new DBSQLSession({ backend: sessionBackend, context: this });
this.sessions.add(session);
return session;
}

public async close(): Promise<void> {
await this.sessions.closeAll();
await this.backend?.close();

this.backend = undefined;
this.client = undefined;
this.connectionProvider = undefined;
this.authProvider = undefined;
Expand Down
Loading
Loading