Kysely
Use Kysely as the storage adapter for memory, runs, and streams.
Install
npm install @better-agent/kysely kysely pg
Setup
Pass your Kysely database to kyselyStorage.
import { betterAgent } from "@better-agent/core";
import { kyselyStorage } from "@better-agent/kysely";
import { Kysely, PostgresDialect } from "kysely";
import { Pool } from "pg";
// Postgres connection pool, configured from the DATABASE_URL environment variable.
const pool = new Pool({ connectionString: process.env.DATABASE_URL });

// Kysely instance that talks to Postgres through the pool above.
const db = new Kysely<Database>({
  dialect: new PostgresDialect({ pool }),
});
export const app = betterAgent({
storage: kyselyStorage({
db,
dialect: "postgres",
}),
agents: [supportAgent],
});
Schema
The Kysely adapter expects the Better Agent storage tables below. It does not
include a generator, so create these tables in your own migration system before
using kyselyStorage.
This Postgres migration shows the required shape.
import { Kysely, sql } from "kysely";
/**
 * Creates the Better Agent storage tables and their indexes (Postgres column types).
 *
 * Statements run sequentially. `threads` is created before `messages` because
 * `messages.thread_id` references `threads.id`.
 *
 * @param db - Kysely instance the migration is executed against.
 */
export async function up(db: Kysely<any>): Promise<void> {
  const schema = db.schema;
  // Builds a fresh `now()` expression for each timestamp column default.
  const now = () => sql`now()`;

  // threads: keyed by id, with optional agent/scope/title and JSON state/metadata.
  await schema
    .createTable("threads")
    .addColumn("id", "text", (column) => column.primaryKey())
    .addColumn("agent_name", "text")
    .addColumn("scope", "text")
    .addColumn("title", "text")
    .addColumn("state", "jsonb")
    .addColumn("metadata", "jsonb")
    .addColumn("created_at", "timestamptz", (column) =>
      column.notNull().defaultTo(now()),
    )
    .addColumn("updated_at", "timestamptz", (column) =>
      column.notNull().defaultTo(now()),
    )
    .execute();

  await schema
    .createIndex("threads_agent_name_scope_idx")
    .on("threads")
    .columns(["agent_name", "scope"])
    .execute();

  // messages: each row belongs to a thread and stores the message as JSON.
  await schema
    .createTable("messages")
    .addColumn("id", "text", (column) => column.primaryKey())
    .addColumn("thread_id", "text", (column) =>
      column.notNull().references("threads.id"),
    )
    .addColumn("run_id", "text")
    .addColumn("message", "jsonb", (column) => column.notNull())
    .addColumn("created_at", "timestamptz", (column) =>
      column.notNull().defaultTo(now()),
    )
    .execute();

  await schema
    .createIndex("messages_thread_id_idx")
    .on("messages")
    .column("thread_id")
    .execute();

  // agent_runs: one row per run_id (unique), with status and lifecycle timestamps.
  await schema
    .createTable("agent_runs")
    .addColumn("run_id", "text", (column) => column.notNull().unique())
    .addColumn("agent_name", "text", (column) => column.notNull())
    .addColumn("thread_id", "text")
    .addColumn("scope", "text")
    .addColumn("status", "text", (column) => column.notNull())
    .addColumn("abort_requested_at", "timestamptz")
    .addColumn("final_event", "jsonb")
    .addColumn("config", "jsonb")
    .addColumn("started_at", "timestamptz", (column) =>
      column.notNull().defaultTo(now()),
    )
    .addColumn("updated_at", "timestamptz", (column) =>
      column.notNull().defaultTo(now()),
    )
    .addColumn("finished_at", "timestamptz")
    .execute();

  await schema
    .createIndex("agent_runs_run_id_scope_idx")
    .on("agent_runs")
    .columns(["run_id", "scope"])
    .execute();

  // streams: one row per run_id (unique), with status and open/close timestamps.
  await schema
    .createTable("streams")
    .addColumn("id", "text", (column) => column.primaryKey())
    .addColumn("run_id", "text", (column) => column.notNull().unique())
    .addColumn("status", "text", (column) => column.notNull())
    .addColumn("created_at", "timestamptz", (column) =>
      column.notNull().defaultTo(now()),
    )
    .addColumn("updated_at", "timestamptz", (column) =>
      column.notNull().defaultTo(now()),
    )
    .addColumn("closed_at", "timestamptz")
    .execute();

  // stream_events: JSON events per run, numbered by seq.
  await schema
    .createTable("stream_events")
    .addColumn("id", "text", (column) => column.primaryKey())
    .addColumn("run_id", "text", (column) => column.notNull())
    .addColumn("seq", "integer", (column) => column.notNull())
    .addColumn("timestamp", "timestamptz", (column) =>
      column.notNull().defaultTo(now()),
    )
    .addColumn("event", "jsonb", (column) => column.notNull())
    .execute();

  await schema
    .createIndex("stream_events_run_id_idx")
    .on("stream_events")
    .column("run_id")
    .execute();

  // Unique index: a given seq may appear only once per run_id.
  await schema
    .createIndex("stream_events_run_id_seq_unique")
    .on("stream_events")
    .columns(["run_id", "seq"])
    .unique()
    .execute();
}
export async function down(db: Kysely<any>): Promise<void> {
await db.schema.dropTable("stream_events").execute();
await db.schema.dropTable("streams").execute();
await db.schema.dropTable("agent_runs").execute();
await db.schema.dropTable("messages").execute();
await db.schema.dropTable("threads").execute();
}
For MySQL or SQLite, keep the same table names, column names, nullability, and
constraints, and translate jsonb and timestamptz to your dialect's JSON and
datetime types.
If you need different physical table names or a different schema layout, use a
custom Storage implementation instead of
kyselyStorage.
Options
| Option | Description |
|---|---|
| db | Existing Kysely database. |
| dialect | postgres, mysql, or sqlite. |
Notes
Use Kysely when your app already owns SQL migrations and database types.
See Storage for what Better Agent stores.
Powered by Farming Labs ORM.