Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24,997 changes: 17,573 additions & 7,424 deletions pnpm-lock.yaml

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions webapps/console/lib/schema/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ export const ApiKey = z.object({
type: z.string().nullish(),
name: z.string().nullish(),
expiresAt: z.coerce.date().nullish(),
// When set, this row is an MCP-issued refresh token. Its presence is the
// single source of truth for "MCP-ness" (we don't set type="mcp").
// mcpClientName carries the registered client_name for display on /user.
mcpClientName: z.string().nullish(),
});
export type ApiKey = z.infer<typeof ApiKey>;

Expand Down
12 changes: 12 additions & 0 deletions webapps/console/lib/server/kv/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { getSingleton } from "juava";
import { db } from "../db";
import { PgKvStore } from "./postgres";
import type { KvStore } from "./types";

export type { KvStore, SetOpts } from "./types";
export { PgKvStore } from "./postgres";

// Console-wide KV singleton. Backed by Postgres (the `Kv` Prisma model
// → `newjitsu.kv`). Follows the same accessor-function pattern as
// `db.prisma()` — call `consoleKv()` to get the store instance.
export const consoleKv = getSingleton<KvStore>("console-kv", () => new PgKvStore(db.prisma()));
118 changes: 118 additions & 0 deletions webapps/console/lib/server/kv/postgres.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import { Prisma, type PrismaClient } from "@prisma/client";
import { getErrorMessage } from "juava";
import { getServerLog } from "../log";
import type { KvStore, SetOpts } from "./types";

const log = getServerLog("kv");

// Schema is defined by the Prisma `KvStoreEntry` model and created via
// `pnpm db:update-schema`. No runtime bootstrap.

function expireDate(ttlMs?: number): Date | null {
if (!ttlMs || ttlMs <= 0) return null;
return new Date(Date.now() + ttlMs);
}

// Probabilistic GC: roughly 5% of mutating ops fire-and-forget a bulk
// delete of expired rows. Keeps the table from growing unboundedly without
// needing a cron. The "lazy on read" filter still hides expired rows in
// the meantime.
function maybeGc(prisma: PrismaClient) {
if (Math.random() < 0.05) {
prisma.kvStoreEntry.deleteMany({ where: { expiresAt: { lte: new Date() } } }).catch(e => {
log.atWarn().log(`KV GC sweep failed: ${getErrorMessage(e)}`);
});
}
}

export class PgKvStore implements KvStore {
constructor(private readonly prisma: PrismaClient) {}

async get<T = unknown>(key: string): Promise<T | undefined> {
maybeGc(this.prisma);
const row = await this.prisma.kvStoreEntry.findUnique({ where: { key } });
if (!row) return undefined;
if (row.expiresAt && row.expiresAt.getTime() <= Date.now()) {
// Expired-on-read: drop it. Don't block the response on the delete.
this.prisma.kvStoreEntry.deleteMany({ where: { key } }).catch(() => undefined);
return undefined;
}
return row.value as T;
}

async set(key: string, value: unknown, opts: SetOpts = {}): Promise<boolean> {
maybeGc(this.prisma);
const expiresAt = expireDate(opts.ttlMs);
if (opts.ifNotExists) {
// Atomicity comes from the unique constraint on `key`: a concurrent
// create from another caller fails with P2002, so at most one
// ifNotExists call wins for a given key.
//
// Caveat: an existing-but-expired row blocks the create (returns false).
// For the current callers (OAuth codes — keys are random 48-char ids),
// this is fine; the keyspace is large enough that collisions never
// happen in practice. Callers that want overwrite-if-expired semantics
// should `del` first.
try {
await this.prisma.kvStoreEntry.create({
data: { key, value: value as never, expiresAt },
});
return true;
} catch (e) {
if (e instanceof Prisma.PrismaClientKnownRequestError && e.code === "P2002") {
return false;
}
throw e;
}
}
await this.prisma.kvStoreEntry.upsert({
where: { key },
create: { key, value: value as never, expiresAt },
update: { value: value as never, expiresAt },
});
return true;
}

async del(key: string): Promise<boolean> {
const r = await this.prisma.kvStoreEntry.deleteMany({ where: { key } });
return r.count > 0;
}

async getDel<T = unknown>(key: string): Promise<T | undefined> {
// Prisma's delete() compiles to a single DELETE ... RETURNING — atomic.
// Two concurrent getDel calls on the same key: at most one sees a row.
// This is the property OAuth code consumption relies on.
try {
const row = await this.prisma.kvStoreEntry.delete({ where: { key } });
if (row.expiresAt && row.expiresAt.getTime() <= Date.now()) {
// Deleted an already-expired row — treat as absent.
return undefined;
}
return row.value as T;
} catch (e) {
// P2025 = record not found. Anyone else racing us already consumed it
// (or it expired). Either way: nothing for us to return.
if (e instanceof Prisma.PrismaClientKnownRequestError && e.code === "P2025") {
return undefined;
}
throw e;
}
}

async scanByPrefix<T = unknown>(
prefix: string,
opts: { limit?: number } = {}
): Promise<Array<{ key: string; value: T }>> {
maybeGc(this.prisma);
const limit = Math.min(opts.limit ?? 1000, 1000);
const rows = await this.prisma.kvStoreEntry.findMany({
where: {
key: { startsWith: prefix },
OR: [{ expiresAt: null }, { expiresAt: { gt: new Date() } }],
},
orderBy: { key: "asc" },
take: limit,
});
return rows.map(r => ({ key: r.key, value: r.value as T }));
}
}
60 changes: 60 additions & 0 deletions webapps/console/lib/server/kv/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* Flat key-value store, Redis-shaped. One global namespace; callers separate
* concerns with key prefixes (`oauth:code:abc`, `mcp:event:stream-1:...`).
*
* Design notes:
* - No "table" abstraction. Prefix conventions are simpler and let callers
* mix flat keys with prefix scans freely.
* - Single-key ops are atomic by virtue of running as one SQL statement.
* `getDel` and `ifNotExists` are first-class so callers don't have to
* synthesize them out of non-atomic primitives.
* - Values are JSON. Pick a stable encodable type — Date → ISO string is
* fine, but custom classes need toJSON/serialization.
*/

export type SetOpts = {
/** Time-to-live in milliseconds. Omit (or 0) for no expiration. */
ttlMs?: number;
/**
* Only store if the key doesn't already exist (or only an expired row
* exists). Atomic. The return value of `set` tells you whether the write
* happened. The primitive for distributed-lock-style flows.
*/
ifNotExists?: boolean;
};

export interface KvStore {
/** Read a key. Returns undefined if absent or expired. */
get<T = unknown>(key: string): Promise<T | undefined>;

/**
* Write a key. Returns true iff the value was actually written; this is
* always true unless `ifNotExists` was set and the key was present.
*/
set(key: string, value: unknown, opts?: SetOpts): Promise<boolean>;

/** Remove a key. Returns true iff a key was present. */
del(key: string): Promise<boolean>;

/**
* Atomic get-and-delete. Returns the value (or undefined) and removes the
* key in one statement. This is the keystone primitive for one-shot
* credentials — OAuth authorization codes, password reset tokens,
* single-use links — where get-then-delete races would let two callers
* consume the same token.
*/
getDel<T = unknown>(key: string): Promise<T | undefined>;

/**
* Lexicographic scan of keys with the given prefix. Returns matches
* sorted ascending by key. Useful when keys carry their own ordering
* (e.g., time-sortable IDs in an event log).
*
* Implementations MAY enforce a hard cap on result size; callers should
* pass an explicit `limit` when the prefix could match a lot of keys.
*/
scanByPrefix<T = unknown>(
prefix: string,
opts?: { limit?: number }
): Promise<Array<{ key: string; value: T }>>;
}
79 changes: 79 additions & 0 deletions webapps/console/lib/server/mcp-server/auth.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import type { NextApiRequest, NextApiResponse } from "next";
import type { PrismaClient } from "@prisma/client";
import type { AuthInfo } from "@modelcontextprotocol/sdk/server/auth/types.js";
import { checkHash } from "juava";
import { getServerLog } from "../log";
import { getPublicOrigin } from "../origin";

const log = getServerLog("mcp-auth");

function bearer(req: NextApiRequest): string | undefined {
const h = req.headers.authorization;
if (!h) return undefined;
const [scheme, token] = h.split(" ", 2);
return scheme?.toLowerCase() === "bearer" && token ? token : undefined;
}

// Mints a 401 response that points the client at our OAuth metadata
// (per MCP / RFC 9728). Without this header, MCP clients won't know how
// to start the OAuth flow.
function send401(res: NextApiResponse, error: string) {
const base = getPublicOrigin();
res.setHeader(
"WWW-Authenticate",
`Bearer realm="jitsu-mcp", error="${error}", resource_metadata="${base}/.well-known/oauth-protected-resource"`
);
res.status(401).json({ error });
}

export class AuthChecker {
constructor(private readonly prisma: PrismaClient) {}

async requireAccessToken(req: NextApiRequest, res: NextApiResponse): Promise<AuthInfo | undefined> {
const raw = bearer(req);
if (!raw) {
send401(res, "missing_token");
return undefined;
}
const [tokenId, secret] = raw.split(":");
if (!tokenId || !secret) {
send401(res, "invalid_token");
return undefined;
}
const at = await this.prisma.oAuthAccessToken.findUnique({
where: { id: tokenId },
include: { refreshToken: { include: { user: true, oauthClient: true } } },
});
if (!at) {
send401(res, "invalid_token");
return undefined;
}
if (!checkHash(at.hash, secret)) {
send401(res, "invalid_token");
return undefined;
}
if (at.expiresAt.getTime() < Date.now()) {
send401(res, "expired_token");
return undefined;
}
// Fire-and-forget lastUsed bump — don't block the request on this write.
this.prisma.oAuthAccessToken
.update({ where: { id: at.id }, data: { lastUsed: new Date() } })
.catch(e => log.atWarn().withCause(e).log("Failed to bump OAuthAccessToken.lastUsed"));

const user = at.refreshToken.user;
const clientId = at.refreshToken.oauthClientId ?? "unknown";
return {
token: raw,
clientId,
scopes: [],
expiresAt: Math.floor(at.expiresAt.getTime() / 1000),
extra: {
userId: user.id,
email: user.email,
name: user.name,
clientName: at.refreshToken.oauthClient?.name,
},
};
}
}
49 changes: 49 additions & 0 deletions webapps/console/lib/server/mcp-server/clients.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import type { PrismaClient } from "@prisma/client";
import { checkHash, createHash, randomId } from "juava";

export type RegisteredClient = {
clientId: string;
clientSecret: string; // plaintext — returned only at registration time
name: string;
redirectUris: string[];
};

// Thin CRUD + secret hashing for OAuthClient rows. Used by OAuthHandlers.
// DI: takes the prisma client in the constructor; never reaches for db.prisma().
export class OAuthClientsRepo {
constructor(private readonly prisma: PrismaClient) {}

async register(name: string, redirectUris: string[]): Promise<RegisteredClient> {
if (!name?.trim()) throw new Error("client_name is required");
if (!redirectUris.length) throw new Error("redirect_uris must contain at least one URI");
for (const uri of redirectUris) {
try {
// Basic sanity check; consent-time check is the real security gate.
new URL(uri);
} catch {
throw new Error(`Invalid redirect_uri: ${uri}`);
}
}
const clientSecret = randomId(48);
const row = await this.prisma.oAuthClient.create({
data: {
clientSecretHash: createHash(clientSecret),
name: name.trim().slice(0, 200),
redirectUris,
},
});
return { clientId: row.id, clientSecret, name: row.name, redirectUris: row.redirectUris };
}

async findById(clientId: string) {
return this.prisma.oAuthClient.findUnique({ where: { id: clientId } });
}

// Returns the client if both id and secret match, otherwise undefined.
// Pure validation — no side effects.
async verifyCredentials(clientId: string, clientSecret: string) {
const client = await this.findById(clientId);
if (!client) return undefined;
return checkHash(client.clientSecretHash, clientSecret) ? client : undefined;
}
}
33 changes: 33 additions & 0 deletions webapps/console/lib/server/mcp-server/codes.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { randomId } from "juava";
import type { KvStore } from "../kv";

const PREFIX = "oauth:code:";
const TTL_MS = 60 * 1000;

export type CodePayload = {
clientId: string;
userId: string;
redirectUri: string;
codeChallenge: string; // PKCE S256 challenge from the authorize request
codeChallengeMethod: "S256"; // we don't accept "plain"
createdAt: number;
};

// One-shot OAuth authorization codes. The KV's atomic `getDel` is what makes
// consumption safe: two concurrent /oauth/token requests for the same code
// will see at most one row, so a code can never be exchanged twice.
export class OAuthCodesRepo {
constructor(private readonly kv: KvStore) {}

async issueCode(payload: Omit<CodePayload, "createdAt">): Promise<string> {
const code = randomId(48);
await this.kv.set(PREFIX + code, { ...payload, createdAt: Date.now() } satisfies CodePayload, {
ttlMs: TTL_MS,
});
return code;
}

async consumeCode(code: string): Promise<CodePayload | undefined> {
return this.kv.getDel<CodePayload>(PREFIX + code);
}
}
Loading
Loading