fix(server): transaction error (#14518)

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **New Features**
* Events can be dispatched in a detached context to avoid inheriting the
current transaction.

* **Bug Fixes**
* Improved resilience and error handling for event processing (graceful
handling of deleted workspaces and ignorable DB errors).
  * More reliable owner assignment flow when changing document owners.

* **Tests**
  * Added tests for doc content staleness with deleted workspaces.
  * Added permission event tests for missing workspace/editor scenarios.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
DarkSky
2026-02-26 19:53:22 +08:00
committed by GitHub
parent 5215c73166
commit 11cf1928b5
8 changed files with 228 additions and 50 deletions

View File

@@ -8,6 +8,7 @@ export class MockEventBus {
emit = this.stub.emitAsync;
emitAsync = this.stub.emitAsync;
emitDetached = this.stub.emitAsync;
broadcast = this.stub.broadcast;
last<Event extends EventName>(

View File

@@ -88,12 +88,21 @@ export class EventBus
emit<T extends EventName>(event: T, payload: Events[T]) {
this.logger.debug(`Dispatch event: ${event}`);
// NOTE(@forehalo):
// Because all event handlers are wrapped in promisified metrics and cls context, they will always run in standalone tick.
// In which way, if handler throws, an unhandled rejection will be triggered and end up with process exiting.
// So we catch it here with `emitAsync`
this.emitter.emitAsync(event, payload).catch(e => {
this.emitter.emit('error', { event, payload, error: e });
this.dispatchAsync(event, payload);
return true;
}
/**
* Emit event in detached cls context to avoid inheriting current transaction.
*/
emitDetached<T extends EventName>(event: T, payload: Events[T]) {
this.logger.debug(`Dispatch event: ${event} (detached)`);
const requestId = this.cls.getId();
this.cls.run({ ifNested: 'override' }, () => {
this.cls.set(CLS_ID, requestId ?? genRequestId('event'));
this.dispatchAsync(event, payload);
});
return true;
@@ -166,6 +175,16 @@ export class EventBus
return this.emitter.waitFor(name, timeout);
}
private dispatchAsync<T extends EventName>(event: T, payload: Events[T]) {
// NOTE:
// Because all event handlers are wrapped in promisified metrics and cls context, they will always run in standalone tick.
// In which way, if handler throws, an unhandled rejection will be triggered and end up with process exiting.
// So we catch it here with `emitAsync`
this.emitter.emitAsync(event, payload).catch(e => {
this.emitter.emit('error', { event, payload, error: e });
});
}
private readonly bindEventHandlers = once(() => {
this.scanner.scan().forEach(({ event, handler, opts }) => {
this.on(event, handler, opts);

View File

@@ -68,7 +68,7 @@ test('should update doc content to database when doc is updated', async t => {
const docId = randomUUID();
await adapter.pushDocUpdates(workspace.id, docId, updates);
await adapter.getDoc(workspace.id, docId);
await adapter.getDocBinNative(workspace.id, docId);
mock.method(docReader, 'parseDocContent', () => {
return {
@@ -181,3 +181,22 @@ test('should ignore update workspace content to database when parse workspace co
t.is(content!.name, null);
t.is(content!.avatarKey, null);
});
test('should ignore stale workspace when updating doc meta from snapshot event', async t => {
const { docReader, listener, models } = t.context;
const docId = randomUUID();
mock.method(docReader, 'parseDocContent', () => ({
title: 'test title',
summary: 'test summary',
}));
await models.workspace.delete(workspace.id);
await t.notThrowsAsync(async () => {
await listener.markDocContentCacheStale({
workspaceId: workspace.id,
docId,
blob: Buffer.from([0x01]),
});
});
});

View File

@@ -110,7 +110,7 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
});
if (isNewDoc) {
this.event.emit('doc.created', {
this.event.emitDetached('doc.created', {
workspaceId,
docId,
editor: editorId,
@@ -334,7 +334,7 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
});
if (updatedSnapshot) {
this.event.emit('doc.snapshot.updated', {
this.event.emitDetached('doc.snapshot.updated', {
workspaceId: snapshot.spaceId,
docId: snapshot.docId,
blob,

View File

@@ -1,12 +1,29 @@
import { Injectable } from '@nestjs/common';
import { Injectable, Logger } from '@nestjs/common';
import { Prisma } from '@prisma/client';
import { OnEvent } from '../../base';
import { Models } from '../../models';
import { PgWorkspaceDocStorageAdapter } from './adapters/workspace';
import { DocReader } from './reader';
const IGNORED_PRISMA_CODES = new Set(['P2003', 'P2025', 'P2028']);
function isIgnorableDocEventError(error: unknown) {
if (error instanceof Prisma.PrismaClientKnownRequestError) {
return IGNORED_PRISMA_CODES.has(error.code);
}
if (error instanceof Prisma.PrismaClientUnknownRequestError) {
return /transaction is aborted|transaction already closed/i.test(
error.message
);
}
return false;
}
@Injectable()
export class DocEventsListener {
private readonly logger = new Logger(DocEventsListener.name);
constructor(
private readonly docReader: DocReader,
private readonly models: Models,
@@ -20,21 +37,39 @@ export class DocEventsListener {
blob,
}: Events['doc.snapshot.updated']) {
await this.docReader.markDocContentCacheStale(workspaceId, docId);
const workspace = await this.models.workspace.get(workspaceId);
if (!workspace) {
this.logger.warn(
`Skip stale doc snapshot event for missing workspace ${workspaceId}/${docId}`
);
return;
}
const isDoc = workspaceId !== docId;
// update doc content to database
if (isDoc) {
const content = this.docReader.parseDocContent(blob);
if (!content) {
try {
if (isDoc) {
const content = this.docReader.parseDocContent(blob);
if (!content) {
return;
}
await this.models.doc.upsertMeta(workspaceId, docId, content);
} else {
// update workspace content to database
const content = this.docReader.parseWorkspaceContent(blob);
if (!content) {
return;
}
await this.models.workspace.update(workspaceId, content);
}
} catch (error) {
if (isIgnorableDocEventError(error)) {
const message = error instanceof Error ? error.message : String(error);
this.logger.warn(
`Ignore stale doc snapshot event for ${workspaceId}/${docId}: ${message}`
);
return;
}
await this.models.doc.upsertMeta(workspaceId, docId, content);
} else {
// update workspace content to database
const content = this.docReader.parseWorkspaceContent(blob);
if (!content) {
return;
}
await this.models.workspace.update(workspaceId, content);
throw error;
}
}

View File

@@ -0,0 +1,77 @@
import { randomUUID } from 'node:crypto';
import ava, { TestFn } from 'ava';
import {
createTestingModule,
type TestingModule,
} from '../../../__tests__/utils';
import { DocRole, Models, User, Workspace } from '../../../models';
import { EventsListener } from '../event';
import { PermissionModule } from '../index';
interface Context {
module: TestingModule;
models: Models;
listener: EventsListener;
}
const test = ava as TestFn<Context>;
let owner: User;
let workspace: Workspace;
test.before(async t => {
const module = await createTestingModule({ imports: [PermissionModule] });
t.context.module = module;
t.context.models = module.get(Models);
t.context.listener = module.get(EventsListener);
});
test.beforeEach(async t => {
await t.context.module.initTestingDB();
owner = await t.context.models.user.create({
email: `${randomUUID()}@affine.pro`,
});
workspace = await t.context.models.workspace.create(owner.id);
});
test.after.always(async t => {
await t.context.module.close();
});
test('should ignore default owner event when workspace does not exist', async t => {
await t.notThrowsAsync(async () => {
await t.context.listener.setDefaultPageOwner({
workspaceId: randomUUID(),
docId: randomUUID(),
editor: owner.id,
});
});
});
test('should ignore default owner event when editor does not exist', async t => {
await t.notThrowsAsync(async () => {
await t.context.listener.setDefaultPageOwner({
workspaceId: workspace.id,
docId: randomUUID(),
editor: randomUUID(),
});
});
});
test('should set owner when workspace and editor exist', async t => {
const docId = randomUUID();
await t.context.listener.setDefaultPageOwner({
workspaceId: workspace.id,
docId,
editor: owner.id,
});
const role = await t.context.models.docUser.get(
workspace.id,
docId,
owner.id
);
t.is(role?.type, DocRole.Owner);
});

View File

@@ -1,10 +1,27 @@
import { Injectable } from '@nestjs/common';
import { Injectable, Logger } from '@nestjs/common';
import { Prisma } from '@prisma/client';
import { OnEvent } from '../../base';
import { Models } from '../../models';
const IGNORED_PRISMA_CODES = new Set(['P2003', 'P2025', 'P2028']);
function isIgnorablePermissionEventError(error: unknown) {
if (error instanceof Prisma.PrismaClientKnownRequestError) {
return IGNORED_PRISMA_CODES.has(error.code);
}
if (error instanceof Prisma.PrismaClientUnknownRequestError) {
return /transaction is aborted|transaction already closed/i.test(
error.message
);
}
return false;
}
@Injectable()
export class EventsListener {
private readonly logger = new Logger(EventsListener.name);
constructor(private readonly models: Models) {}
@OnEvent('doc.created')
@@ -15,6 +32,33 @@ export class EventsListener {
return;
}
await this.models.docUser.setOwner(workspaceId, docId, editor);
const workspace = await this.models.workspace.get(workspaceId);
if (!workspace) {
this.logger.warn(
`Skip default doc owner event for missing workspace ${workspaceId}/${docId}`
);
return;
}
const user = await this.models.user.get(editor);
if (!user) {
this.logger.warn(
`Skip default doc owner event for missing editor ${workspaceId}/${docId}/${editor}`
);
return;
}
try {
await this.models.docUser.setOwner(workspaceId, docId, editor);
} catch (error) {
if (isIgnorablePermissionEventError(error)) {
const message = error instanceof Error ? error.message : String(error);
this.logger.warn(
`Ignore stale doc owner event for ${workspaceId}/${docId}/${editor}: ${message}`
);
return;
}
throw error;
}
}
}

View File

@@ -2,6 +2,7 @@ import assert from 'node:assert';
import { Injectable } from '@nestjs/common';
import { Transactional } from '@nestjs-cls/transactional';
import type { TransactionalAdapterPrisma } from '@nestjs-cls/transactional-adapter-prisma';
import { WorkspaceDocUserRole } from '@prisma/client';
import { CanNotBatchGrantDocOwnerPermissions, PaginationInput } from '../base';
@@ -14,31 +15,20 @@ export class DocUserModel extends BaseModel {
* Set or update the [Owner] of a doc.
* The old [Owner] will be changed to [Manager] if there is already an [Owner].
*/
@Transactional()
@Transactional<TransactionalAdapterPrisma>({ timeout: 15000 })
async setOwner(workspaceId: string, docId: string, userId: string) {
const oldOwner = await this.db.workspaceDocUserRole.findFirst({
await this.db.workspaceDocUserRole.updateMany({
where: {
workspaceId,
docId,
type: DocRole.Owner,
userId: { not: userId },
},
data: {
type: DocRole.Manager,
},
});
if (oldOwner) {
await this.db.workspaceDocUserRole.update({
where: {
workspaceId_docId_userId: {
workspaceId,
docId,
userId: oldOwner.userId,
},
},
data: {
type: DocRole.Manager,
},
});
}
await this.db.workspaceDocUserRole.upsert({
where: {
workspaceId_docId_userId: {
@@ -57,16 +47,9 @@ export class DocUserModel extends BaseModel {
type: DocRole.Owner,
},
});
if (oldOwner) {
this.logger.log(
`Transfer doc owner of [${workspaceId}/${docId}] from [${oldOwner.userId}] to [${userId}]`
);
} else {
this.logger.log(
`Set doc owner of [${workspaceId}/${docId}] to [${userId}]`
);
}
this.logger.log(
`Set doc owner of [${workspaceId}/${docId}] to [${userId}]`
);
}
/**