Kysely

Use Kysely as the storage adapter for memory, runs, and streams.

Install

npm install @better-agent/kysely kysely pg

Setup

Pass your Kysely database to kyselyStorage.

import { betterAgent } from "@better-agent/core";
import { kyselyStorage } from "@better-agent/kysely";
import { Kysely, PostgresDialect } from "kysely";
import { Pool } from "pg";

// Kysely instance backed by a Postgres connection pool.
// `Database` is your application's table-shape type — TODO confirm it includes
// the Better Agent storage tables described in the Schema section below.
const db = new Kysely<Database>({
  dialect: new PostgresDialect({
    pool: new Pool({
      // Connection string is read from the environment; undefined here means
      // `pg` falls back to its own defaults.
      connectionString: process.env.DATABASE_URL,
    }),
  }),
});

// Wire the Kysely adapter in as Better Agent's storage backend.
export const app = betterAgent({
  storage: kyselyStorage({
    db,
    // One of "postgres", "mysql", or "sqlite" — must match the dialect the
    // Kysely instance above was created with.
    dialect: "postgres",
  }),
  agents: [supportAgent],
});

Schema

The Kysely adapter expects the Better Agent storage tables below. It does not include a generator, so create these tables in your own migration system before using kyselyStorage.

This Postgres migration shows the required shape.

import { Kysely, sql } from "kysely";

/**
 * Creates the Better Agent storage tables expected by `kyselyStorage`.
 *
 * Table and column names, nullability, and constraints below are the contract
 * the adapter relies on — keep them exactly as written (see the Schema note
 * above this migration).
 */
export async function up(db: Kysely<any>): Promise<void> {
  // Conversation threads, grouped by agent name and scope.
  await db.schema
    .createTable("threads")
    .addColumn("id", "text", (col) => col.primaryKey())
    .addColumn("agent_name", "text")
    .addColumn("scope", "text")
    .addColumn("title", "text")
    .addColumn("state", "jsonb")
    .addColumn("metadata", "jsonb")
    .addColumn("created_at", "timestamptz", (col) =>
      col.notNull().defaultTo(sql`now()`),
    )
    .addColumn("updated_at", "timestamptz", (col) =>
      col.notNull().defaultTo(sql`now()`),
    )
    .execute();

  // Supports lookups of a given agent's threads within a scope.
  await db.schema
    .createIndex("threads_agent_name_scope_idx")
    .on("threads")
    .columns(["agent_name", "scope"])
    .execute();

  // Messages belong to a thread; the message payload itself is stored as JSON.
  await db.schema
    .createTable("messages")
    .addColumn("id", "text", (col) => col.primaryKey())
    .addColumn("thread_id", "text", (col) =>
      col.notNull().references("threads.id"),
    )
    .addColumn("run_id", "text")
    .addColumn("message", "jsonb", (col) => col.notNull())
    .addColumn("created_at", "timestamptz", (col) =>
      col.notNull().defaultTo(sql`now()`),
    )
    .execute();

  // Supports fetching all messages for a thread.
  await db.schema
    .createIndex("messages_thread_id_idx")
    .on("messages")
    .column("thread_id")
    .execute();

  // One row per agent run; tracks status and lifecycle timestamps.
  // NOTE(review): run_id is unique() but no primary key is declared on this
  // table — confirm this matches what the adapter expects.
  await db.schema
    .createTable("agent_runs")
    .addColumn("run_id", "text", (col) => col.notNull().unique())
    .addColumn("agent_name", "text", (col) => col.notNull())
    .addColumn("thread_id", "text")
    .addColumn("scope", "text")
    .addColumn("status", "text", (col) => col.notNull())
    .addColumn("abort_requested_at", "timestamptz")
    .addColumn("final_event", "jsonb")
    .addColumn("config", "jsonb")
    .addColumn("started_at", "timestamptz", (col) =>
      col.notNull().defaultTo(sql`now()`),
    )
    .addColumn("updated_at", "timestamptz", (col) =>
      col.notNull().defaultTo(sql`now()`),
    )
    .addColumn("finished_at", "timestamptz")
    .execute();

  // Supports run lookups filtered by scope.
  await db.schema
    .createIndex("agent_runs_run_id_scope_idx")
    .on("agent_runs")
    .columns(["run_id", "scope"])
    .execute();

  // One stream per run (run_id is unique), with open/closed lifecycle columns.
  await db.schema
    .createTable("streams")
    .addColumn("id", "text", (col) => col.primaryKey())
    .addColumn("run_id", "text", (col) => col.notNull().unique())
    .addColumn("status", "text", (col) => col.notNull())
    .addColumn("created_at", "timestamptz", (col) =>
      col.notNull().defaultTo(sql`now()`),
    )
    .addColumn("updated_at", "timestamptz", (col) =>
      col.notNull().defaultTo(sql`now()`),
    )
    .addColumn("closed_at", "timestamptz")
    .execute();

  // Ordered events within a run's stream; `seq` orders events per run.
  await db.schema
    .createTable("stream_events")
    .addColumn("id", "text", (col) => col.primaryKey())
    .addColumn("run_id", "text", (col) => col.notNull())
    .addColumn("seq", "integer", (col) => col.notNull())
    .addColumn("timestamp", "timestamptz", (col) =>
      col.notNull().defaultTo(sql`now()`),
    )
    .addColumn("event", "jsonb", (col) => col.notNull())
    .execute();

  // Supports fetching a run's events.
  await db.schema
    .createIndex("stream_events_run_id_idx")
    .on("stream_events")
    .column("run_id")
    .execute();

  // Guarantees at most one event per (run_id, seq) pair.
  await db.schema
    .createIndex("stream_events_run_id_seq_unique")
    .on("stream_events")
    .columns(["run_id", "seq"])
    .unique()
    .execute();
}

/**
 * Reverses {@link up}: drops every Better Agent storage table.
 *
 * Tables are dropped children-first so the `messages.thread_id` foreign key
 * into `threads` never blocks a drop.
 */
export async function down(db: Kysely<any>): Promise<void> {
  const tablesChildrenFirst = [
    "stream_events",
    "streams",
    "agent_runs",
    "messages",
    "threads",
  ];
  for (const table of tablesChildrenFirst) {
    await db.schema.dropTable(table).execute();
  }
}

For MySQL or SQLite, keep the same table names, column names, nullability, and constraints, and translate jsonb and timestamptz to your dialect's JSON and datetime types.

If you need different physical table names or a different schema layout, use a custom Storage implementation instead of kyselyStorage.

Options

Option | Description
db | Existing Kysely database instance.
dialect | One of "postgres", "mysql", or "sqlite".

Notes

Use Kysely when your app already owns SQL migrations and database types.

See Storage for what Better Agent stores.

Powered by Farming Labs ORM.