假设系统里有一个模块,用来管理化合物从 hit、lead 到 candidate 的推进。

药化团队合成了一批化合物。生物团队上传活性、选择性、ADME、早期毒理等实验结果。项目负责人看完某个化合物的数据包后,要做一个决策:

  • 推进为候选化合物。
  • 退回继续优化。
  • 暂停该方向。

提交这个决策时,系统通常要做这些事:

  • 写入一条候选化合物推进决策。
  • 改变化合物当前研发阶段。
  • 如果推进成功,创建候选药物档案。
  • 关闭当前评审责任。
  • 锁定本次使用的实验数据包版本。
  • 写入审计事件,说明阶段变化依据哪一次数据评审。

这些写入必须一起成功。不能出现决策已经提交,但化合物阶段没变, 或是候选药物档案已创建,但数据包没有锁定等问题。

最直接的实现,是在 Command Handler 里注入 Prisma,然后开事务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class SubmitCompoundPromotionDecisionHandler {
constructor(private readonly prisma: PrismaClient) {}

async execute(command: SubmitCompoundPromotionDecision) {
return this.prisma.$transaction(async (tx) => {
const decision = await tx.compoundDecision.create({
data: {
compoundId: command.compoundId,
result: command.result,
reason: command.reason,
decidedBy: command.actorUserId,
},
});

await tx.compound.update({
where: { id: command.compoundId },
data: { stage: command.result === "promote" ? "candidate" : "lead" },
});

if (command.result === "promote") {
await tx.developmentCandidate.create({
data: { compoundId: command.compoundId },
});
}

await tx.reviewAssignment.update(...);
await tx.auditEvent.create(...);

return decision;
});
}
}

但它把几个层次混在了一起。

Command Handler 本来应该编排业务流程,但现在它编排了数据库表的更新顺序。

化合物阶段变化本来应该由 Compound 聚合判断。比如只有 lead_optimization 阶段的化合物才能推进到 candidate,已经暂停的化合物不能直接推进。现在这条规则可能散落在 tx.compound.update 前后的 if 判断里。

事务边界本来属于用例。现在它由具体 ORM 的 $transaction 暴露在 Command Handler 里。换 ORM、拆 outbox、改审计事件写法,Command Handler 都要动。


如果把这段逻辑挪到 repository 里:

1
compoundRepository.submitPromotionDecision(command);

这看起来把 Prisma 从 Command Handler 里拿掉了。但如果 compoundRepository 内部仍然创建决策、更新化合物、创建候选药物档案、关闭责任、写审计事件,那只是把混乱换了一个地方,因为 Repository 的核心职责应该只是持久化聚合,例如:

1
2
3
4
compoundRepository.findById(id);
compoundRepository.save(compound);
compoundDecisionRepository.save(decision);
developmentCandidateRepository.save(candidate);

如果一个 CompoundRepository 开始保存 CompoundDecisionDevelopmentCandidateReviewAssignmentAuditEvent,它就不再只是 Compound 的 repository。它变成了一个跨模块事务服务,只是名字还叫 repository。


所以这里我的解决方案之一是在 application 层定义一个 Unit of Work port,它表达的是一个具体业务动作:提交候选化合物推进决策。

1
2
3
4
5
6
7
8
9
export interface SubmitCompoundPromotionDecisionUnitOfWork {
submit(input: {
compoundId: string;
dataPackageId: string;
result: "promote" | "optimize" | "pause";
reason?: string;
actorUserId: string;
}): Promise<CompoundPromotionDecisionDto>;
}

Command Handler 依赖这个端口,而不是依赖 Prisma。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class SubmitCompoundPromotionDecisionHandler {
constructor(
private readonly unitOfWork: SubmitCompoundPromotionDecisionUnitOfWork,
) {}

async execute(command: SubmitCompoundPromotionDecision) {
return this.unitOfWork.submit({
compoundId: command.compoundId,
dataPackageId: command.dataPackageId,
result: command.result,
reason: command.reason,
actorUserId: command.actorUserId,
});
}
}

这段代码没有事务细节。它只转交命令意图。

具体事务放在 infrastructure:

1
2
3
4
5
6
7
8
9
10
11
class PrismaSubmitCompoundPromotionDecisionUnitOfWork
implements SubmitCompoundPromotionDecisionUnitOfWork
{
constructor(private readonly prisma: PrismaClient) {}

async submit(input: SubmitCompoundPromotionDecisionInput) {
return this.prisma.$transaction(async (tx) => {
// 在这里组合 tx-bound store / repository
});
}
}

这样就把“业务事务边界”和“命令入口”分开,Command Handler 不需要知道用 Prisma 还是别的 ORM。它也不需要知道事务里要调用哪些表。它只知道这个命令由一个原子用例完成。

但聚合仍然要负责状态规则,Unit of Work 不能变成新的数据库脚本,在事务内部,仍然应该先读取聚合,让聚合执行状态变化,再保存聚合。

例如 Compound 聚合可以这样表达规则:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class Compound {
promoteToCandidate(input: {
dataPackageId: string;
actorUserId: string;
}) {
if (this.stage !== "lead_optimization") {
throw new DomainError("只有先导优化阶段的化合物才能推进为候选化合物");
}

if (this.lockedDataPackageId !== null) {
throw new DomainError("已锁定数据包的化合物不能重复推进");
}

this.stage = "candidate";
this.lockedDataPackageId = input.dataPackageId;
this.updatedByUserId = input.actorUserId;
}

returnToOptimization(input: { reason: string; actorUserId: string }) {
if (this.stage !== "lead_optimization") {
throw new DomainError("只有评审中的化合物才能退回优化");
}

this.stage = "lead_optimization";
this.optimizationReason = input.reason;
this.updatedByUserId = input.actorUserId;
}
}

Unit of Work 在事务里调用这些方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
await prisma.$transaction(async (tx) => {
const compoundStore = new PrismaCompoundStore(tx);
const compound = await compoundStore.findById(input.compoundId);

if (input.result === "promote") {
compound.promoteToCandidate({
dataPackageId: input.dataPackageId,
actorUserId: input.actorUserId,
});
}

if (input.result === "optimize") {
compound.returnToOptimization({
reason: input.reason ?? "需要补充结构优化",
actorUserId: input.actorUserId,
});
}

await compoundStore.save(compound);
await decisionStore.save(decision);
await candidateStore.createIfPromoted(...);
await assignmentStore.closeCurrent(...);
await auditStore.append(...);
});

而为了让事务里的代码复用聚合持久化逻辑,我们需要一种 tx-bound repository 或 store。

最粗暴的写法是在领域 repository interface 里加一个可选事务参数:

1
2
3
4
interface CompoundRepository {
findById(id: string, tx?: Prisma.TransactionClient): Promise<Compound | null>;
save(compound: Compound, tx?: Prisma.TransactionClient): Promise<void>;
}

但它又把 Prisma 带进了领域层。就算把 Prisma.TransactionClient 包成 TransactionClient 类型别名,领域接口仍然在为基础设施妥协。

我更倾向于把 tx-bound 能力留在 infrastructure,领域层只定义干净的 repository port:

1
2
3
4
export interface CompoundRepository {
findById(id: string): Promise<Compound | null>;
save(compound: Compound): Promise<void>;
}

infrastructure 层拆一个 store,接收最小持久化客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type CompoundStoreClient = {
compound: {
findUnique(args: unknown): Promise<unknown>;
update(args: unknown): Promise<unknown>;
};
};

class PrismaCompoundStore implements CompoundRepository {
constructor(private readonly client: CompoundStoreClient) {}

async findById(id: string) {
const record = await this.client.compound.findUnique(...);
return Compound.restore(record);
}

async save(compound: Compound) {
const snapshot = compound.toSnapshot();
await this.client.compound.update(...);
}
}

普通 repository 可以继承这个 store,传完整 Prisma client:

1
2
3
4
5
class PrismaCompoundRepository extends PrismaCompoundStore {
constructor(prisma: PrismaClient) {
super(prisma);
}
}

Unit of Work 在事务里传 tx:

1
2
3
4
5
6
await prisma.$transaction(async (tx) => {
const compoundStore = new PrismaCompoundStore(tx);
const compound = await compoundStore.findById(input.compoundId);
// ...
await compoundStore.save(compound);
});