diff --git a/packages/backend/server/src/__tests__/mocks/eventbus.mock.ts b/packages/backend/server/src/__tests__/mocks/eventbus.mock.ts index c9f887b22f..9a09b59fc5 100644 --- a/packages/backend/server/src/__tests__/mocks/eventbus.mock.ts +++ b/packages/backend/server/src/__tests__/mocks/eventbus.mock.ts @@ -8,6 +8,7 @@ export class MockEventBus { emit = this.stub.emitAsync; emitAsync = this.stub.emitAsync; + emitDetached = this.stub.emitAsync; broadcast = this.stub.broadcast; last( diff --git a/packages/backend/server/src/base/event/eventbus.ts b/packages/backend/server/src/base/event/eventbus.ts index 7d8c46ff61..cb90ba3ad5 100644 --- a/packages/backend/server/src/base/event/eventbus.ts +++ b/packages/backend/server/src/base/event/eventbus.ts @@ -88,12 +88,21 @@ export class EventBus emit(event: T, payload: Events[T]) { this.logger.debug(`Dispatch event: ${event}`); - // NOTE(@forehalo): - // Because all event handlers are wrapped in promisified metrics and cls context, they will always run in standalone tick. - // In which way, if handler throws, an unhandled rejection will be triggered and end up with process exiting. - // So we catch it here with `emitAsync` - this.emitter.emitAsync(event, payload).catch(e => { - this.emitter.emit('error', { event, payload, error: e }); + this.dispatchAsync(event, payload); + + return true; + } + + /** + * Emit event in detached cls context to avoid inheriting current transaction. + */ + emitDetached(event: T, payload: Events[T]) { + this.logger.debug(`Dispatch event: ${event} (detached)`); + + const requestId = this.cls.getId(); + this.cls.run({ ifNested: 'override' }, () => { + this.cls.set(CLS_ID, requestId ?? genRequestId('event')); + this.dispatchAsync(event, payload); }); return true; @@ -166,6 +175,16 @@ export class EventBus return this.emitter.waitFor(name, timeout); } + private dispatchAsync(event: T, payload: Events[T]) { + // NOTE: + // Because all event handlers are wrapped in promisified metrics and cls context, they will always run in standalone tick. + // In which way, if handler throws, an unhandled rejection will be triggered and end up with process exiting. + // So we catch it here with `emitAsync` + this.emitter.emitAsync(event, payload).catch(e => { + this.emitter.emit('error', { event, payload, error: e }); + }); + } + private readonly bindEventHandlers = once(() => { this.scanner.scan().forEach(({ event, handler, opts }) => { this.on(event, handler, opts); diff --git a/packages/backend/server/src/core/doc/__tests__/event.spec.ts b/packages/backend/server/src/core/doc/__tests__/event.spec.ts index d1483b90c1..eea4402b55 100644 --- a/packages/backend/server/src/core/doc/__tests__/event.spec.ts +++ b/packages/backend/server/src/core/doc/__tests__/event.spec.ts @@ -68,7 +68,7 @@ test('should update doc content to database when doc is updated', async t => { const docId = randomUUID(); await adapter.pushDocUpdates(workspace.id, docId, updates); - await adapter.getDoc(workspace.id, docId); + await adapter.getDocBinNative(workspace.id, docId); mock.method(docReader, 'parseDocContent', () => { return { @@ -181,3 +181,22 @@ test('should ignore update workspace content to database when parse workspace co t.is(content!.name, null); t.is(content!.avatarKey, null); }); + +test('should ignore stale workspace when updating doc meta from snapshot event', async t => { + const { docReader, listener, models } = t.context; + const docId = randomUUID(); + mock.method(docReader, 'parseDocContent', () => ({ + title: 'test title', + summary: 'test summary', + })); + + await models.workspace.delete(workspace.id); + + await t.notThrowsAsync(async () => { + await listener.markDocContentCacheStale({ + workspaceId: workspace.id, + docId, + blob: Buffer.from([0x01]), + }); + }); +}); diff --git a/packages/backend/server/src/core/doc/adapters/workspace.ts b/packages/backend/server/src/core/doc/adapters/workspace.ts index 6e8ce11af9..ae00fb5986 100644 --- a/packages/backend/server/src/core/doc/adapters/workspace.ts +++ b/packages/backend/server/src/core/doc/adapters/workspace.ts @@ -110,7 +110,7 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter { }); if (isNewDoc) { - this.event.emit('doc.created', { + this.event.emitDetached('doc.created', { workspaceId, docId, editor: editorId, @@ -334,7 +334,7 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter { }); if (updatedSnapshot) { - this.event.emit('doc.snapshot.updated', { + this.event.emitDetached('doc.snapshot.updated', { workspaceId: snapshot.spaceId, docId: snapshot.docId, blob, diff --git a/packages/backend/server/src/core/doc/event.ts b/packages/backend/server/src/core/doc/event.ts index a9982fa954..c3d42c068d 100644 --- a/packages/backend/server/src/core/doc/event.ts +++ b/packages/backend/server/src/core/doc/event.ts @@ -1,12 +1,29 @@ -import { Injectable } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; +import { Prisma } from '@prisma/client'; import { OnEvent } from '../../base'; import { Models } from '../../models'; import { PgWorkspaceDocStorageAdapter } from './adapters/workspace'; import { DocReader } from './reader'; +const IGNORED_PRISMA_CODES = new Set(['P2003', 'P2025', 'P2028']); + +function isIgnorableDocEventError(error: unknown) { + if (error instanceof Prisma.PrismaClientKnownRequestError) { + return IGNORED_PRISMA_CODES.has(error.code); + } + if (error instanceof Prisma.PrismaClientUnknownRequestError) { + return /transaction is aborted|transaction already closed/i.test( + error.message + ); + } + return false; +} + @Injectable() export class DocEventsListener { + private readonly logger = new Logger(DocEventsListener.name); + constructor( private readonly docReader: DocReader, private readonly models: Models, @@ -20,21 +37,39 @@ export class DocEventsListener { blob, }: Events['doc.snapshot.updated']) { await this.docReader.markDocContentCacheStale(workspaceId, docId); + const workspace = await this.models.workspace.get(workspaceId); + if (!workspace) { + this.logger.warn( + `Skip stale doc snapshot event for missing workspace ${workspaceId}/${docId}` + ); + return; + } const isDoc = workspaceId !== docId; // update doc content to database - if (isDoc) { - const content = this.docReader.parseDocContent(blob); - if (!content) { + try { + if (isDoc) { + const content = this.docReader.parseDocContent(blob); + if (!content) { + return; + } + await this.models.doc.upsertMeta(workspaceId, docId, content); + } else { + // update workspace content to database + const content = this.docReader.parseWorkspaceContent(blob); + if (!content) { + return; + } + await this.models.workspace.update(workspaceId, content); + } + } catch (error) { + if (isIgnorableDocEventError(error)) { + const message = error instanceof Error ? error.message : String(error); + this.logger.warn( + `Ignore stale doc snapshot event for ${workspaceId}/${docId}: ${message}` + ); return; } - await this.models.doc.upsertMeta(workspaceId, docId, content); - } else { - // update workspace content to database - const content = this.docReader.parseWorkspaceContent(blob); - if (!content) { - return; - } - await this.models.workspace.update(workspaceId, content); + throw error; } } diff --git a/packages/backend/server/src/core/permission/__tests__/event.spec.ts b/packages/backend/server/src/core/permission/__tests__/event.spec.ts new file mode 100644 index 0000000000..21803c0532 --- /dev/null +++ b/packages/backend/server/src/core/permission/__tests__/event.spec.ts @@ -0,0 +1,77 @@ +import { randomUUID } from 'node:crypto'; + +import ava, { TestFn } from 'ava'; + +import { + createTestingModule, + type TestingModule, +} from '../../../__tests__/utils'; +import { DocRole, Models, User, Workspace } from '../../../models'; +import { EventsListener } from '../event'; +import { PermissionModule } from '../index'; + +interface Context { + module: TestingModule; + models: Models; + listener: EventsListener; +} + +const test = ava as TestFn; + +let owner: User; +let workspace: Workspace; + +test.before(async t => { + const module = await createTestingModule({ imports: [PermissionModule] }); + t.context.module = module; + t.context.models = module.get(Models); + t.context.listener = module.get(EventsListener); +}); + +test.beforeEach(async t => { + await t.context.module.initTestingDB(); + owner = await t.context.models.user.create({ + email: `${randomUUID()}@affine.pro`, + }); + workspace = await t.context.models.workspace.create(owner.id); +}); + +test.after.always(async t => { + await t.context.module.close(); +}); + +test('should ignore default owner event when workspace does not exist', async t => { + await t.notThrowsAsync(async () => { + await t.context.listener.setDefaultPageOwner({ + workspaceId: randomUUID(), + docId: randomUUID(), + editor: owner.id, + }); + }); +}); + +test('should ignore default owner event when editor does not exist', async t => { + await t.notThrowsAsync(async () => { + await t.context.listener.setDefaultPageOwner({ + workspaceId: workspace.id, + docId: randomUUID(), + editor: randomUUID(), + }); + }); +}); + +test('should set owner when workspace and editor exist', async t => { + const docId = randomUUID(); + await t.context.listener.setDefaultPageOwner({ + workspaceId: workspace.id, + docId, + editor: owner.id, + }); + + const role = await t.context.models.docUser.get( + workspace.id, + docId, + owner.id + ); + t.is(role?.type, DocRole.Owner); +}); diff --git a/packages/backend/server/src/core/permission/event.ts b/packages/backend/server/src/core/permission/event.ts index 4e6651a145..442fd6877a 100644 --- a/packages/backend/server/src/core/permission/event.ts +++ b/packages/backend/server/src/core/permission/event.ts @@ -1,10 +1,27 @@ -import { Injectable } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; +import { Prisma } from '@prisma/client'; import { OnEvent } from '../../base'; import { Models } from '../../models'; +const IGNORED_PRISMA_CODES = new Set(['P2003', 'P2025', 'P2028']); + +function isIgnorablePermissionEventError(error: unknown) { + if (error instanceof Prisma.PrismaClientKnownRequestError) { + return IGNORED_PRISMA_CODES.has(error.code); + } + if (error instanceof Prisma.PrismaClientUnknownRequestError) { + return /transaction is aborted|transaction already closed/i.test( + error.message + ); + } + return false; +} + @Injectable() export class EventsListener { + private readonly logger = new Logger(EventsListener.name); + constructor(private readonly models: Models) {} @OnEvent('doc.created') @@ -15,6 +32,33 @@ export class EventsListener { return; } - await this.models.docUser.setOwner(workspaceId, docId, editor); + const workspace = await this.models.workspace.get(workspaceId); + if (!workspace) { + this.logger.warn( + `Skip default doc owner event for missing workspace ${workspaceId}/${docId}` + ); + return; + } + + const user = await this.models.user.get(editor); + if (!user) { + this.logger.warn( + `Skip default doc owner event for missing editor ${workspaceId}/${docId}/${editor}` + ); + return; + } + + try { + await this.models.docUser.setOwner(workspaceId, docId, editor); + } catch (error) { + if (isIgnorablePermissionEventError(error)) { + const message = error instanceof Error ? error.message : String(error); + this.logger.warn( + `Ignore stale doc owner event for ${workspaceId}/${docId}/${editor}: ${message}` + ); + return; + } + throw error; + } } } diff --git a/packages/backend/server/src/models/doc-user.ts b/packages/backend/server/src/models/doc-user.ts index d8a71e3bda..f189fd5d01 100644 --- a/packages/backend/server/src/models/doc-user.ts +++ b/packages/backend/server/src/models/doc-user.ts @@ -2,6 +2,7 @@ import assert from 'node:assert'; import { Injectable } from '@nestjs/common'; import { Transactional } from '@nestjs-cls/transactional'; +import type { TransactionalAdapterPrisma } from '@nestjs-cls/transactional-adapter-prisma'; import { WorkspaceDocUserRole } from '@prisma/client'; import { CanNotBatchGrantDocOwnerPermissions, PaginationInput } from '../base'; @@ -14,31 +15,20 @@ export class DocUserModel extends BaseModel { * Set or update the [Owner] of a doc. * The old [Owner] will be changed to [Manager] if there is already an [Owner]. */ - @Transactional() + @Transactional({ timeout: 15000 }) async setOwner(workspaceId: string, docId: string, userId: string) { - const oldOwner = await this.db.workspaceDocUserRole.findFirst({ + await this.db.workspaceDocUserRole.updateMany({ where: { workspaceId, docId, type: DocRole.Owner, + userId: { not: userId }, + }, + data: { + type: DocRole.Manager, }, }); - if (oldOwner) { - await this.db.workspaceDocUserRole.update({ - where: { - workspaceId_docId_userId: { - workspaceId, - docId, - userId: oldOwner.userId, - }, - }, - data: { - type: DocRole.Manager, - }, - }); - } - await this.db.workspaceDocUserRole.upsert({ where: { workspaceId_docId_userId: { @@ -57,16 +47,9 @@ export class DocUserModel extends BaseModel { type: DocRole.Owner, }, }); - - if (oldOwner) { - this.logger.log( - `Transfer doc owner of [${workspaceId}/${docId}] from [${oldOwner.userId}] to [${userId}]` - ); - } else { - this.logger.log( - `Set doc owner of [${workspaceId}/${docId}] to [${userId}]` - ); - } + this.logger.log( + `Set doc owner of [${workspaceId}/${docId}] to [${userId}]` + ); } /**