Skip to content

parallel task survey

wt_better edited this page Dec 21, 2023 · 2 revisions

目前 Seata Saga 编排使用的方案为:由 JSON DSL 声明状态机定义,由StateMachineParser进行解析得到StateMachine。 JSON 声明的编排方案虽然可移植性高、有利于与业务代码解耦,但也存在以下缺点:

  1. 学习成本高:声明性编排由于配置属性较多,通常要较长时间的学习或文档查询。
  2. 灵活性有限:声明式编排在定义结构时仍然不如代码灵活,无法满足特定需求。
  3. 速度稍慢:由于声明性语言是解释执行的,处理大文件时有一定性能损失。如果开发者使用辅助设计工具生成的 JSON 文件,则框架还需要将 Designer JSON 转换为标准 JSON 再进行解释。

为解决以上问题,可考虑采用流式编程进行 Saga 编排作为 JSON 解释的互补方案。

设计目标

在 Seata Saga 中实现 Java 流式编排的功能需要注意以下几个关键结果:

  1. [KR1] 代码复用:新的 Stream API 需要兼容已有代码,如statelang.domain包下的状态机组件。
  2. [KR2] 异常处理:不同于 JSON 解释在框架内实现,有解释代码稳定的特点;而流式编排代码交给开发者用户实现,有编排代码灵活的特点,故要仔细地处理异常以避免未定义的数据流中断。
  3. [KR3] 功能覆盖:检查statelang.parser.impl包的实现,覆盖基于 JSON 解释编排的已有功能。
  4. [KR4] 单测覆盖:参照seata-test中saga.engine基于解释的单测,构建详尽的流式编排单测。

相关工作

对于 Saga 编排的流式编程方案已有较多可参考的解决方案,比如 Apache Camel,Eventuate Tram Saga等。这里提供两者的编排样例以供参考: Saga - Apache Camel Saga EIP 提供了一种在 Camel 路由中定义一系列相关操作的方法,这些操作应该全部完成或补偿。假设用户想下一个新订单,系统中存在两种服务:订单管理、信用管理。用户如果有足够的信用就可以下订单。

// 将 direct:buy 路由建模为由两个不同操作组成的 Saga 模式。
// 两个动作都必须执行,或者一个都不执行。
from("direct:buy")
 .saga()
   .to("direct:newOrder")
   .to("direct:reserveCredit");

// direct:newOrder 路由声明了一个 direct:cancelOrder 的补偿操作,
// 负责在 Saga 被取消的情况下撤消订单。
from("direct:newOrder")
 .saga()
 .propagation(SagaPropagation.MANDATORY)
 .compensation("direct:cancelOrder")
   .bean(orderManagerService, "newOrder")
   .log("Order ${body} created");

// 定义 direct:newOrder 的补偿动作为 direct:cancelOrder
from("direct:cancelOrder")
 .bean(orderManagerService, "cancelOrder")
 .log("Order ${body} cancelled");

Eventuate Tram Saga Eventuate Tram Saga 也使用流式编程的方式定义编排。

private SagaDefinition<CreateOrderSagaData> sagaDefinition =
      step()
        .withCompensation(this::reject)
      .step()
        .invokeParticipant(this::reserveCredit)
      .step()
        .invokeParticipant(this::approve)
      .build();

除了可以参考开源的 Saga 编排实现,由于 Seata Saga 的长事务视作为状态机,所以也可以参考一些状态机流式编排的实现,比如 Spring Statemachine。 Spring Statemachine 例如定义如下状态机1,其演示并行任务处理并添加了错误处理以自动或手动修复任务问题,然后继续返回到可以再次运行任务的状态: 使用 spring-statemachine 可以将其状态定义如下:

@Override
public void configure(StateMachineStateConfigurer<States, Events> states)
    throws Exception {
  states
    .withStates()
      .initial(States.READY)
      .fork(States.FORK)
      .state(States.TASKS)
      .join(States.JOIN)
      .choice(States.CHOICE)
      .state(States.ERROR)
      .and()
      .withStates()
        .parent(States.TASKS)
        .initial(States.T1)
        .end(States.T1E)
        .and()
        // ... 篇幅有限
}

初步方案

基于上述的相关工作分析,实现类似于 Apache Camel 或 Eventuate Tram Saga 的流式编排对于 Seata 侵入式改动较大,可能需要整体架构上的调整。成本最小的方案是对状态机流式编排,可参照 Spring Statemachine 的用户 API。 期望达成的初步效果如下,用户通过状态机构造器StateMachineBuilder进行类似以下的方式进行流式编排即可:

// 新增 StateMachineBuilderFactory 组件用于获取一个状态机 builder
StateMachineBuilder builder = stateMachineEngine.getStateMachineConfig().getStateMachineBuilderFactory().create();

// 流式编排状态机
StateMachine simpleTestStateMachine = builder
    .name("simpleTestStateMachine")
    .comment("测试状态机定义")
    .version("0.0.2")
    .initial("FirstState")
    .withStates()  // 返回一个 StatesConfigurer
      .state<ServiceTaskStateBuilder>()  // 返回一个 ServiceTaskStateBuilder
        .name("FirstState")
        .serviceName("demoService")
        .serviceMethod("foo")
        .next("SuccessState")
        .and()  // 返回当前 StatesConfigurer
      .state<SucceedStateBuilder>()  // 返回一个 SucceedStateBuilder
        .name("SuccessState")
        .and()
      .configure()  // 返回当前 StateMachineBuilder
    .build();  // 构建出一个 StateMachine
        
// 流式编排后构建状态机定义后,存储到状态机仓库中
stateMachineEngine.getStateMachineConfig().getStateMachineRepository()
    .registryStateMachine(simpleTestStateMachine);

为实现以上效果,可将流式编排相关组件设计如下 (宏观架构设计,还需补全细节): builder.svg

Clone this wiki locally