add ClickHouseConnection

This commit is contained in:
Mohammad Azmi
2026-03-11 13:46:35 +07:00
parent 7763f1644b
commit c5ae16e382
4 changed files with 164 additions and 86 deletions

View File

@@ -8,7 +8,6 @@ import {
import { ClickhouseKnexClient } from "@shared/lib/knex-clickhouse";
import knexlib from "knex";
import {
createClient,
InsertParams,
ResponseJSON,
ClickHouseClient as NodeClickHouseClient,
@@ -63,6 +62,7 @@ import { errors } from "@/lib/errors";
import { IDbConnectionServer } from "@/lib/db/backendTypes";
import { ChangeBuilderBase } from "@shared/lib/sql/change_builder/ChangeBuilderBase";
import { ClickHouseCursor } from "./clickhouse/ClickHouseCursor";
import { ClickHouseConnection } from "./clickhouse/ClickHouseConnection";
interface JSONResult {
statement: IdentifyResult;
@@ -122,43 +122,22 @@ const knex = knexlib({ client: ClickhouseKnexClient });
const RE_NULLABLE = /^Nullable\((.*)\)$/;
const RE_SELECT_FORMAT = /^\s*SELECT.+FORMAT\s+(\w+)\s*;?$/i;
export class ClickHouseClient extends BasicDatabaseClient<Result> {
export class ClickHouseClient extends BasicDatabaseClient<
Result,
NodeClickHouseClient
> {
version: string;
client: NodeClickHouseClient;
supportsTransaction: boolean;
constructor(server: IDbConnectionServer, database: IDbConnectionDatabase) {
super(knex, clickhouseContext, server, database);
this.dialect = "generic";
this.readOnlyMode = server?.config?.readOnlyMode || false;
this.connection = new ClickHouseConnection({ server, database });
}
async connect(): Promise<void> {
await super.connect();
let url: string;
if (this.server.config.url) {
url = this.server.config.url
} else {
const urlObj = new URL('http://example.com/');
urlObj.hostname = this.server.config.host;
urlObj.port = this.server.config.port.toString();
urlObj.protocol = this.server.config.ssl ? 'https:' : 'http:';
url = urlObj.toString();
}
this.client = createClient({
url,
username: this.server.config.user,
password: this.server.config.password,
database: this.database.database,
application: "Beekeeper Studio",
clickhouse_settings: {
default_format: "JSONCompact",
},
request_timeout: 120_000, // 2 minutes
});
const result = await this.driverExecuteSingle(
"SELECT version() AS version"
);
@@ -168,11 +147,6 @@ export class ClickHouseClient extends BasicDatabaseClient<Result> {
this.supportsTransaction = await this.checkTransactionSupport();
}
async disconnect(): Promise<void> {
await super.disconnect();
await this.client.close();
}
async versionString(): Promise<string> {
return this.version;
}
@@ -790,7 +764,8 @@ export class ClickHouseClient extends BasicDatabaseClient<Result> {
log.info(`Running Query`, query, options);
if (options.insert) {
await this.client.insert(options.insert as any);
const client = await this.connection.getClient();
await client.insert(options.insert as any);
return [];
}
@@ -805,7 +780,8 @@ export class ClickHouseClient extends BasicDatabaseClient<Result> {
let rows: any[][] | Record<string, any>[] = [];
let columns: ResultColumn[] = [];
if (statement.executionType === "LISTING" && !format) {
const result = await this.client.query({
const client = await this.connection.getClient();
const result = await client.query({
query: statement.text,
query_params: options.params,
query_id: options.queryId,
@@ -816,7 +792,8 @@ export class ClickHouseClient extends BasicDatabaseClient<Result> {
rows = data.data;
columns = data.meta;
} else {
const result = await this.client.exec({
const client = await this.connection.getClient();
const result = await client.exec({
query,
query_params: options.params,
query_id: options.queryId,
@@ -1038,7 +1015,7 @@ export class ClickHouseClient extends BasicDatabaseClient<Result> {
const cursor = new ClickHouseCursor({
query: qs.query,
params: qs.params,
client: this.client,
client: await this.connection.getClient(),
chunkSize,
});
return { totalRows, columns, cursor };
@@ -1048,7 +1025,7 @@ export class ClickHouseClient extends BasicDatabaseClient<Result> {
const cursorOpts = {
query,
params: [],
client: this.client,
client: await this.connection.getClient(),
chunkSize
}

View File

@@ -0,0 +1,60 @@
import { DatabaseConnection } from "@/lib/db/clients/DatabaseConnection";
import {
createClient,
ClickHouseClient as NodeClickHouseClient,
} from "@clickhouse/client";
export class ClickHouseConnection extends DatabaseConnection<NodeClickHouseClient> {
private client: NodeClickHouseClient;
protected async doConnect(): Promise<void> {
let url: string;
if (this.server.config.url) {
url = this.server.config.url;
} else {
const host = this.server.sshTunnel
? this.server.config.localHost
: this.server.config.host;
const port = this.server.sshTunnel
? this.server.config.localPort
: this.server.config.port;
const urlObj = new URL("http://example.com/");
urlObj.hostname = host;
urlObj.port = port.toString();
urlObj.protocol = this.server.config.ssl ? "https:" : "http:";
url = urlObj.toString();
}
this.client = createClient({
url,
username: this.server.config.user,
password: this.server.config.password,
database: this.database.database,
application: "Beekeeper Studio",
clickhouse_settings: {
default_format: "JSONCompact",
},
request_timeout: 120_000, // 2 minutes
});
}
protected async doDisconnect(): Promise<void> {
await this.client.close();
}
protected async doGetClient(): Promise<NodeClickHouseClient> {
const pingResult = await this.client.ping();
if (pingResult.success === false) {
throw pingResult.error;
}
return this.client;
}
protected isConnectionLostError(err: any): boolean {
if ("code" in err && err.code === "ECONNRESET") {
return true;
}
return false;
}
}

View File

@@ -58,6 +58,18 @@ services:
networks:
- internal
clickhouse:
image: clickhouse/clickhouse-server:24.2
container_name: test_ssh_clickhouse
environment:
- CLICKHOUSE_USER=username
- CLICKHOUSE_PASSWORD=password
- CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=1
ports:
- 8123
networks:
- internal
ssh:
build:
context: ./

View File

@@ -4,84 +4,114 @@ import {
StartedDockerComposeEnvironment,
Wait,
} from "testcontainers";
import { WaitStrategy } from "testcontainers/build/wait-strategies/wait-strategy";
import { createServer } from '@commercial/backend/lib/db/server';
export const DB_CONFIGS: Partial<
Record<
ConnectionType,
{ container: string; host: string; port: number; user: string; password: string; database: string }
>
> = {
interface DbConfig {
service: string;
container: string;
host: string;
port: number;
user: string;
password: string;
database: string;
waitMessage?: string;
waitCount?: number;
waitStrategy?: WaitStrategy;
}
export const DB_CONFIGS: Partial<Record<ConnectionType, DbConfig>> = {
postgresql: {
service: "postgres",
container: "test_ssh_postgres",
host: "postgres",
port: 5432,
user: "bks",
password: "example",
database: "test",
waitMessage: "database system is ready to accept connections",
waitCount: 2,
},
mysql: {
service: "mysql",
container: "test_ssh_mysql",
host: "mysql",
port: 3306,
user: "bks",
password: "example",
database: "test",
waitMessage: "ready for connections",
waitCount: 2,
},
mariadb: {
service: "mariadb",
container: "test_ssh_mariadb",
host: "mariadb",
port: 3306,
user: "bks",
password: "example",
database: "test",
waitMessage: "ready for connections",
waitCount: 2,
},
sqlserver: {
service: "sqlserver",
container: "test_ssh_sqlserver",
host: "sqlserver",
port: 1433,
user: "sa",
password: "Example1!",
database: "master",
waitMessage: "SQL Server is now ready for client connections.",
},
clickhouse: {
service: "clickhouse",
container: "test_ssh_clickhouse",
host: "clickhouse",
port: 8123,
user: "username",
password: "password",
database: "default",
waitStrategy: Wait.forListeningPorts(),
},
};
export class SshEnvironment {
private environment!: StartedDockerComposeEnvironment;
private type: ConnectionType;
constructor(type: ConnectionType) {
if (!DB_CONFIGS[type]) {
throw new Error(`No SSH test config for database type: ${type}`);
}
this.type = type;
}
private get config(): DbConfig {
return DB_CONFIGS[this.type]!;
}
async start() {
this.environment = await new DockerComposeEnvironment(
const { container, waitMessage, waitCount, waitStrategy } = this.config;
const dbWaitStrategy = waitStrategy ?? Wait.forLogMessage(waitMessage, waitCount);
let compose = new DockerComposeEnvironment(
"tests/docker",
"ssh.yml"
)
.withWaitStrategy(
"test_ssh_postgres",
Wait.forLogMessage("database system is ready to accept connections", 2)
)
.withWaitStrategy(
"test_ssh_mysql",
Wait.forLogMessage("ready for connections", 2)
)
.withWaitStrategy(
"test_ssh_mariadb",
Wait.forLogMessage("ready for connections", 2)
)
// .withWaitStrategy(
// "test_ssh_sqlserver",
// Wait.forLogMessage("SQL Server is now ready for client connections.")
// )
.withWaitStrategy("test_ssh", Wait.forListeningPorts())
.up();
.withWaitStrategy(container, dbWaitStrategy)
.withWaitStrategy("test_ssh", Wait.forListeningPorts());
this.environment = await compose.up([this.config.service, "ssh"]);
}
async restart() {
const container = this.environment.getContainer("test_ssh");
if (container) {
console.log('restarting')
await container.restart();
// wait until it's fully restarted
await new Promise((resolve) => setTimeout(resolve, 1000));
console.log('restarted')
await new Promise((resolve) => setTimeout(resolve, 500));
}
}
@@ -89,16 +119,6 @@ export class SshEnvironment {
await this.environment?.stop();
}
getDbHost(db: ConnectionType) {
return this.environment.getContainer(DB_CONFIGS[db].container).getHost();
}
getDbPort(db: ConnectionType) {
return this.environment
.getContainer(DB_CONFIGS[db].container)
.getMappedPort(DB_CONFIGS[db].port);
}
getSshHost() {
return this.environment.getContainer("test_ssh").getHost();
}
@@ -107,13 +127,15 @@ export class SshEnvironment {
return 7222;
}
async connect(type: ConnectionType) {
const server = createServer({
client: type,
host: DB_CONFIGS[type].host,
port: DB_CONFIGS[type].port,
user: DB_CONFIGS[type].user,
password: DB_CONFIGS[type].password,
async connect() {
const { host, port, user, password, database } = this.config;
const config = {
client: this.type,
host,
port,
user,
password,
ssh: {
host: this.getSshHost(),
port: this.getSshPort(),
@@ -121,9 +143,16 @@ export class SshEnvironment {
password: 'password',
},
trustServerCertificate: true,
} as IDbConnectionServerConfig);
const database = server.createConnection(DB_CONFIGS[type].database);
await database.connect();
return database;
...(this.type === 'sqlanywhere' ? {
sqlAnywhereOptions: {
mode: 'server' as const,
},
} : {}),
} as IDbConnectionServerConfig;
const server = createServer(config);
const db = server.createConnection(database);
await db.connect();
return db;
}
}