常见SEO国内比较好的云服务器高德打车通用可编排订单状态机引擎设计
广东服务器品牌云服务器
一 背景
订单状态流转是交易系统的最为核心的工作,订单系统往往都会存在状态多、链路长、逻辑复杂的特点,还存在多场景、多类型、多业务维度等业务特性。在保证订单状态流转稳定性的前提下、可扩展性和可维护性是我们需要重点关注和解决的问题。
以高德打车业务的订单状态为例,订单状态就有乘客下单、司机接单、司机已到达乘车点、开始行程、行程结束、确认费用、支付成功、订单取消、订单关闭等;订单车型有专车、快车、出租车等几种车型,而专车又分舒适型、豪华型、商务型等;业务场景接送机、企业用车、城际拼车等等场景。
当订单状态、类型、场景、以及其他一些维度组合时,每一种组合都可能会有不同的处理逻辑、也可能会存在共性的业务逻辑,这种情况下代码中各种if-else肯定是不敢想象的。怎么处理这种"多状态+多类型+多场景+多维度"的复杂订单状态流转业务,又要保证整个系统的可扩展性和可维护性,本文的解决思路和方案同大家一起探讨。
二 实现方案
要解决"多状态+多类型+多场景+多维度"的复杂订单状态流转业务,我们从纵向和横向两个维度进行设计。纵向主要从业务隔离和流程编排的角度出发解决问题、而横向主要从逻辑复用和业务扩展的角度解决问题。
1 纵向解决业务隔离和流程编排
状态模式的应用
通常我们处理一个多状态或者多维度的业务逻辑,都会采用状态模式或者策略模式来解决,我们这里不讨论两种设计模式的异同,其核心其实可以概括为一个词"分而治之",抽象一个基础逻辑接口、每一个状态或者类型都实现该接口,业务处理时根据不同的状态或者类型调用对应的业务实现,以到达逻辑相互独立互不干扰、代码隔离的目的。
这不仅仅是从可扩展性和可维护性的角度出发,其实我们做架构做稳定性、隔离是一种减少影响面的基本手段,类似的隔离环境做灰度、分批发布等,这里不做扩展。
/** * 状态机处理器接口 */publicinterfaceStateProcessor{/** * 执行状态迁移的入口 */voidaction(StateContext context) throws Exception; }/** * 状态A对应的状态处理器 */publicclassStateAProcessorinterfaceStateProcessor {/** * 执行状态迁移的入口 */@Overridepublicvoidaction(StateContext context) throws Exception { } }单一状态或类型可以通过上面的方法解决,那么"多状态+多类型+多场景+多维度"这种组合业务呢,当然也可以采用这种模式或思路来解决。首先在开发阶段通过一个注解@OrderPorcessor将不同的维度予以组合、开发出多个对应的具体实现类,在系统运行阶段,通过判断上下文来动态选择具体使用哪一个实现类执行。@OrderPorcessor中分别定义state代表当前处理器要处理的状态,bizCode和sceneId分别代表业务类型和场景,这两个字段留给业务进行扩展,比如可以用bizCode代表产品或订单类型、sceneId代表业务形态或来源场景等等,如果要扩展多个维度的组合、也可以用多个维度拼接后的字符串赋值到bizCode和sceneId上。
受限于Java枚举不能继承的规范,如果要开发通用的功能、注解中就不能使用枚举、所以此处只好使用String。
/** * 状态机引擎的处理器注解标识 */@Target({ElementType.TYPE})@Retention(RetentionPolicy.RUNTIME)@Documented@Inherited@Componentpublic@interfaceOrderProcessor {/** * 指定状态,state不能同时存在 */String[]state()default{};/** * 业务 */String[]bizCode()default{};/** * 场景 */String[]sceneId()default{}; }/** * 创建订单状态对应的状态处理器 */@OrderProcessor(state ="INIT", bizCode = {"CHEAP","POPULAR"}, sceneId ="H5")publicclassStateCreateProcessorinterfaceStateProcessor { }再想一下,因为涉及到状态流转,不可能会是一个状态A只能流转到状态B、状态A可能在不同的场景下流转到状态B、状态C、状态D;还有虽然都是由状态A流转到状态B、但是不同的场景处理流程也可能不一样,比如都是将订单从从待支付状态进行支付、用户主动发起支付和系统免密支付的流程可能就不一样。针对上面这两种情况、我们把这里的"场景"统一封装为"事件(event)",以"事件驱动"的方式来控制状态的流向,一个状态遇到一个特定的处理事件来决定该状态的业务处理流程和最终状态流向。我们可以总结下,其实状态机模式简单说就是:基于某些特定业务和场景下,根据源状态和发生的事件,来执行下一步的流程处理逻辑,并设置一个目标状态。
这里有人可能有一些疑问,这个"事件"和上面说的"多场景"、"多维度"有什么不一样。解释一下,我们这里说的是"事件"是一个具体的业务要执行的动作,比如用户下单是一个业务事件、用户取消订单是一个业务事件、用户支付订单也是一个业务事件。而"多场景"、"多维度"则是可交由业务自行进行扩展的维度,比如自有标准模式来源的订单、通过开放平台API来的订单、通过第三方标准来源的订单,某某小程序、某某APP来源可以定义为不同场景,而接送机、企业用车、拼车等可以定义为维度。
public@interfaceOrderProcessor{/** * 指定状态 */String[]state()default{};/** * 订单操作事件 */Stringevent(); ...... }/** * 订单状态迁移事件 */publicinterfaceOrderStateEvent{/** * 订单状态事件 */StringgetEventType();/** * 订单ID */StringgetOrderId();/** * 如果orderState不为空,则代表只有订单是当前状态才进行迁移 */defaultStringorderState(){returnnull; }/** * 是否要新创建订单 */booleannewCreate(); }状态迁移流程的封装
在满足了上面说的多维度组合的业务场景、开发多个实现类来执行的情况,我们思考执行这些实现类在流程上是否有再次抽象和封装的地方、以减少研发工作量和尽量的实现通用流程。我们经过观察和抽象,发现每一个订单状态流转的流程中,都会有三个流程:校验、业务逻辑执行、数据更新持久化;于是再次抽象,可以将一个状态流转分为数据准备(prepare)——>校验(check)——>获取下一个状态(getNextState)——>业务逻辑执行(action)——>数据持久化(save)——>后续处理(after)这六个阶段;然后通过一个模板方法将六个阶段方法串联在一起、形成一个有顺序的执行逻辑。这样一来整个状态流程的执行逻辑就更加清晰和简单了、可维护性上也得到的一定的提升。
/** * 状态迁移动作处理步骤 */publicinterfaceStateActionStep {/** * 准备数据 */defaultvoidprepare(StateContext context) { }/** * 校验 */ServiceResult check(StateContext context);/** * 获取当前状态处理器处理完毕后,所处于的下一个状态 */StringgetNextState(StateContext context);/** * 状态动作方法,主要状态迁移逻辑 */ServiceResult action(StringnextState, StateContext context) throws Exception;/** * 状态数据持久化 */ServiceResult save(StringnextState, StateContext context) throws Exception;/** * 状态迁移成功,持久化后执行的后续处理 */voidafter(StateContext context); }/** * 状态机处理器模板类 */@ComponentpublicabstractclassAbstractStateProcessor<T, C>implementsStateProcessor<T, C>,StateActionStep<T, C>{@OverridepublicfinalServiceResult action(StateContext context) throws Exception { ServiceResult result =null;try{// 数据准备this.prepare(context);// 串行校验器result =this.check(context);if(!result.isSuccess()) {returnresult; }// getNextState不能在prepare前,因为有的nextState是根据prepare中的数据转换而来String nextState =this.getNextState(context);// 业务逻辑result =this.action(nextState, context);if(!result.isSuccess()) {returnresult; }// 持久化result =this.save(nextState, context);if(!result.isSuccess()) {returnresult; }// afterthis.after(context);returnresult; }catch(Exception e) {throwe; } }/** * 状态A对应的状态处理器 */@OrderProcessor(state ="INIT", bizCode = {"CHEAP","POPULAR"}, sceneId ="H5")publicclassStateCreateProcessorextendsAbstractStateProcessor<String,CreateOrderContext>{ ...... }(1)校验器
上面提到了校验(check),我们都知道任何一个状态的流转甚至接口的调用其实都少不了一些校验规则,尤其是对于复杂的业务、其校验规则和校验逻辑也会更加复杂。那么对于这些校验规则怎么解耦呢,既要将校验逻辑从复杂的业务流程中解耦出来、同时又需要把复杂的校验规则简单化,使整个校验逻辑更具有可扩展性和可维护性。其实做法也比较简单、参考上面的逻辑,只需要抽象一个校验器接口checker、把复杂的校验逻辑拆开、形成多个单一逻辑的校验器实现类,状态处理器在调用check时只需要调用一个接口、由校验器执行多个checker的集合就可以了。将校验器checker进行封装之后,发现要加入一个新的校验逻辑就十分简单了,只需要写一个新的checker实现类加入校验器就行、对其他代码基本没有改动。
/** * 状态机校验器 */publicinterfaceChecker<T,C>{ServiceResultcheck(StateContext context);/** * 多个checker时的执行顺序 */defaultintorder(){return0; } }逻辑简单了、扩展性和维护性解决了、性能问题就会显现出来。多个校验器checker串行执行性能肯定性能比较差,此时很简单的可以想到使用并行执行,是的、此处使用多线程并行执行多个校验器checker能显著提高执行效率。但是也应该意识到,有些校验器逻辑可能是有前后依赖的(其实不应该出现),还有些业务流程中要求某些校验器的执行必须有前后顺序,还有些流程不要求校验器的执行顺序但是要求错误时的返回顺序、那么怎么在并行的前提下保证顺序呢、此处就可以用order+Future实现了。经过一系列的思考和总结,我们把校验器分为参数校验(paramChecker)、同步校验(syncChecker)、异步校验(asyncChecker)三种类型,其中参数校验paramChecker是需要在状态处理器最开始处执行的,为什么这么做、因为参数都不合法了肯定没有继续向下执行的必要了。
/** * 状态机校验器 */publicinterfaceCheckable{/** * 参数校验 */defaultListgetParamChecker(){returnCollections.EMPTY_LIST; }/** * 需同步执行的状态检查器 */defaultListgetSyncChecker(){returnCollections.EMPTY_LIST; }/** * 可异步执行的校验器 */defaultListgetAsyncChecker(){returnCollections.EMPTY_LIST; } }/** * 校验器的执行器 */publicclassCheckerExecutor{/** * 执行并行校验器, * 按照任务投递的顺序判断返回。 */publicServiceResult parallelCheck(List checkers, StateContext context) {if(!CollectionUtils.isEmpty(checkers)) {if(checkers.size() ==1) {returncheckers.get(0).check(context); } Listcheckable在具体状态处理器中的代码应用举例。
@OrderProcessor(state ="INIT", bizCode = {"CHEAP","POPULAR"}, sceneId ="H5")publicclassOrderCreatedProcessorextendsAbstractStateProcessor<String,CreateOrderContext>{@ResourceprivateCreateParamChecker createParamChecker;@ResourceprivateUserChecker userChecker;@ResourceprivateUnfinshChecker unfinshChecker;@OverridepublicCheckablegetCheckable(StateContext context){returnnewCheckable() {@OverridepublicListgetParamChecker(){returnArrays.asList(createParamChecker); }@OverridepublicListgetSyncChecker(){returnCollections.EMPTY_LIST; }@OverridepublicListgetAsyncChecker(){returnArrays.asList(userChecker, unfinshChecker); } }; } ......checker的定位是校验器,负责校验参数或业务的合法性,但实际编码过程中、checker中可能会有一些临时状态类操作,比如在校验之前进行计数或者加锁操作、在校验完成后根据结果进行释放,这里就需要支持统一的释放功能。
publicinterfaceChecker<T,C>{ ....../** * 是否需求release */defaultbooleanneedRelease(){returnfalse; }/** * 业务执行完成后的释放方法, * 比如有些业务会在checker中加一些状态操作,等业务执行完成后根据结果选择处理这些状态操作, * 最典型的就是checker中加一把锁,release根据结果释放锁. */defaultvoidrelease(StateContext context, ServiceResult result){ } }publicclassCheckerExecutor{/** * 执行checker的释放操作 */public void releaseCheck(Checkable checkable, StateContext context, ServiceResult result) {List checkers =newArrayList<>(); checkers.addAll(checkable.getParamChecker()); checkers.addAll(checkable.getSyncChecker()); checkers.addAll(checkable.getAsyncChecker()); checkers.removeIf(Checker::needRelease);if(!CollectionUtils.isEmpty(checkers)) {if(checkers.size() ==1) { checkers.get(0).release(context, result);return; } CountDownLatch latch =newCountDownLatch(checkers.size());for(Checker c : checkers) { executor.execute(() -> {try{ c.release(context, result); }finally{ latch.countDown(); } }); }try{ latch.await(); }catch(InterruptedException e) {thrownewRuntimeException(e); } } } }(2)上下文
从上面代码可以发现,整个状态迁移的几个方法都是使用上下文Context对象串联的。Context对象中一共有三类对象,(1)订单的基本信息(订单ID、状态、业务属性、场景属性)、(2)事件对象(其参数基本就是状态迁移行为的入参)、(3)具体处理器决定的泛型类。一般要将数据在多个方法中进行传递有两种方案:一个是包装使用ThreadLocal、每个方法都可以对当前ThreadLocal进行赋值和取值;另一种是使用一个上下文Context对象做为每个方法的入参传递。这种方案都有一些优缺点,使用ThreadLocal其实是一种"隐式调用",虽然可以在"随处"进行调用、但是对使用方其实不明显的、在中间件中会大量使用、在开发业务代码中是需要尽量避免的;而使用Context做为参数在方法中进行传递、可以有效的减少"不可知"的问题。
不管是使用ThreadLocal还是Context做为参数传递,对于实际承载的数据载体有两种方案,常见的是使用Map做为载体,业务在使用的时候可以根据需要随意的设置任何kv,但是这种情况对代码的可维护性和可读性是极大的挑战,所以这里使用泛型类来固定数据格式,一个具体的状态处理流程到底需要对哪些数据做传递需要明确定义好。其实原则是一样的,业务开发尽量用用可见性避免不可知。
publicclassStateContext {/** * 订单操作事件 */privateOrderStateEvent orderStateEvent;/** * 状态机需要的订单基本信息 */privateFsmOrder fsmOrder;/** * 业务可定义的上下文泛型对象 */privateC context;publicStateContext(OrderStateEvent orderStateEvent, FsmOrder fsmOrder){this.orderStateEvent = orderStateEvent;this.fsmOrder = fsmOrder; } ....../** * 状态机引擎所需的订单信息基类信息 */publicinterfaceFsmOrder{/** * 订单ID */StringgetOrderId();/** * 订单状态 */StringgetOrderState();/** * 订单的业务属性 */StringbizCode();/** * 订单的场景属性 */StringsceneId(); }(3)迁移到的状态判定
为什么要把下一个状态(getNextState)抽象为单独一个步骤、而不是交由业务自己进行设置呢?是因为要迁移到的下一个状态不一定是固定的,就是说根据当前状态和发生的事件、再遇到更加细节的逻辑时也可能会流转到不同的状态。举个例子,当前状态是用户已下单完成、要发生的事件是用户取消订单,此时根据不同的逻辑,订单有可能流转到取消状态、也有可能流转到取消待审核状态、甚至有可能流转到取消待支付费用状态。当然这里要取决于业务系统对状态和事件定义的粗细和状态机的复杂程度,做为状态机引擎、这里把下一个状态的判定交由业务根据上下文对象自己来判断。
getNextState()使用及状态迁移持久化举例:
@OrderProcessor(state = OrderStateEnum.INIT, event = OrderEventEnum.CREATE, bizCode ="BUSINESS")publicclassOrderCreatedProcessorextendsAbstractStateProcessor<String,CreateOrderContext>{ ........@OverridepublicStringgetNextState(StateContext context){// if (context.getOrderStateEvent().getEventType().equals("xxx")) {// return OrderStateEnum.INIT;// }returnOrderStateEnum.NEW; }@OverridepublicServiceResultsave(String nextState, StateContext context)throwsException{ OrderInfo orderInfo = context.getContext().getOrderInfo();// 更新状态orderInfo.setOrderState(nextState);// 持久化// this.updateOrderInfo(orderInfo);log.info("save BUSINESS order success, userId:{}, orderId:{}", orderInfo.getUserId(), orderInfo.getOrderId());returnnewServiceResult<>(orderInfo.getOrderId(),"business下单成功"); } }状态消息
一般来说,所有的状态迁移都应该发出对应的消息,由下游消费方订阅进行相应的业务处理。
(1)状态消息内容
对于状态迁移消息的发送内容通常有两种形式,一个是只发状态发生迁移这个通知、举例子就是只发送"订单ID、变更前状态、变更后状态"等几个关键字段,具体下游业务需要哪些具体内容在调用相应的接口进行反查;还有一种是发送所有字段出去、类似于发一个状态变更后的订单内容快照,下游接到消息后几乎不需要在调用接口进行反查。
(2)状态消息的时序
状态迁移是有时序的,因此很多下游依赖方也需要判断消息的顺序。一种实现方案是使用顺序消息(rocketmq、kafka等),但基于并发吞吐量的考虑很少采用这种方案;一般都是在消息体中加入"消息发送时间"或者"状态变更时间"字段,有消费方自己进行处理。
(3)数据库状态变更和消息的一致性
状态变更需要和消息保持一致吗?
很多时候是需要的,如果数据库状态变更成功了、但是状态消息没有发送出去、则会导致一些下游依赖方处理逻辑的缺失。而我们知道,数据库和消息系统是无法保证100%一致的,我们要保证的是主要数据库状态变更了、消息就要尽量接近100%的发送成功。
那么怎么保证呢?
其实通常确实有几种方案:
a)使用rocketmq等支持的两阶段式消息提交方式:
先向消息服务器发送一条预处理消息当本地数据库变更提交之后、再向消息服务器发送一条确认发送的消息如果本地数据库变更失败、则向消息服务器发送一条取消发送的消息如果长时间没有向消息服务器发生确认发送的消息,消息系统则会回调一个提前约定的接口、来查看本地业务是否成功,以此决定是否真正发生消息b)使用数据库事务方案保证:
创建一个消息发送表,将要发送的消息插入到该表中,同本地业务在一个数据库事务中进行提交之后在由一个定时任务来轮询发送、直到发送成功后在删除当前表记录c)还是使用数据库事务方案保证:
创建一个消息发送表,将要发送的消息插入到该表中,同本地业务在一个数据库事务中进行提交向消息服务器发送消息发送成功则删除掉当前表记录对于没有发送成功的消息(也就是表里面没有被删除的记录),再由定时任务来轮询发送还有其他方案吗?有的。
d)数据对账、发现不一致时进行补偿处理、以此保证数据的最终一致。其实不管使用哪种方案来保证数据库状态变更和消息的一致,数据对账的方案都是"必须"要有的一种兜底方案。
那么、还有其他方案吗?还是有的,对于数据库状态变更和消息的一致性的问题,细节比较多,每种方案又都有相应的优缺点,本文主要是介绍状态机引擎的设计,对于消息一致性的问题就不过多介绍,后面也许会有单独的文章对数据库变更和消息的一致性的问题进行介绍和讨论。
2 横向解决逻辑复用和实现业务扩展
云服务器可以换系统吗
实现基于"多类型+多场景+多维度"的代码分离治理、以及标准处理流程模板的状态机模型之后,其实在真正编码的时候会发现不同类型不同维度对于同一个状态的流程处理过程,有时多个处理逻辑中的一部分流程一样的或者是相似的,比如支付环节不管是采用免密还是其他方式、其中核销优惠券的处理逻辑、设置发票金额的处理逻辑等都是一样的;甚至有些时候多个类型间的处理逻辑大部分是相同的而差异是小部分,比如下单流程的处理逻辑基本逻辑都差不多,而出租车对比网约车可能就多了出租车红包、无预估价等个别流程的差异。
对于上面这种情况、其实就是要实现在纵向解决业务隔离和流程编排的基础上,需要支持小部分逻辑或代码段的复用、或者大部分流程的复用,减少重复建设和开发。对此我们在状态机引擎中支持两种解决方案:
基于插件化的解决方案
插件的主要逻辑是:可以在业务逻辑执行(action)、数据持久化(save)这两个节点前加载对应到的插件类进行执行,主要是对上下文Context对象进行操作、或者根据Context参数发起不同的流程调用,已到达改变业务数据或流程的目的。
(1)标准流程+差异化插件
上面讲到同一个状态模型下、不同的类型或维度有些逻辑或处理流程是一样的小部分逻辑是不同的。于是我们可以把一种处理流程定义为标准的或默认的处理逻辑,把差异化的代码写成插件,当业务执行到具体差异化逻辑时会调用到不同的插件进行处理,这样只需要为不同的类型或维度编写对应有差异逻辑的插件即可、标准的处理流程由默认的处理器执行就行。
(2)差异流程+公用插件
当然对于小部分逻辑和代码可以公用的场景,也可以用插件化的方案解决。比如对于同一个状态下多个维修下不同处理器中、我们可以把相同的逻辑或代码封装成一个插件,多个处理器中都可以识别加载该插件进行执行,从而实现多个差异的流程使用想用插件的形式。
/** * 插件注解 */@Target({ElementType.TYPE})@Retention(RetentionPolicy.RUNTIME)@Documented@Inherited@Componentpublic@interfaceProcessorPlugin {/** * 指定状态,state不能同时存在 */String[]state()default{};/** * 订单操作事件 */Stringevent();/** * 业务 */String[]bizCode()default{};/** * 场景 */String[]sceneId()default{}; }* 插件处理器 */publicinterfacePluginHandler<T,C>extendsStateProcessor<T,C>{ }Plug在处理器模板中的执行逻辑。
publicabstractclassAbstractStateProcessor<T, C>implementsStateProcessor<T>,StateActionStep<T, C>{@OverridepublicfinalServiceResult action(StateContext context) throws Exception { ServiceResult result =null;try{ ......// 业务逻辑result =this.action(nextState, context);if(!result.isSuccess()) {returnresult; }// 在action和save之间执行插件逻辑this.pluginExecutor.parallelExecutor(context);// 持久化result =this.save(nextState, context));if(!result.isSuccess()) {returnresult; } ...... }catch(Exception e) {throwe; } }插件使用的例子:
/** * 预估价插件 */@ProcessorPlugin(state = OrderStateEnum.INIT, event = OrderEventEnum.CREATE, bizCode ="BUSINESS")publicclassEstimatePricePluginimplementsPluginHandler<String,CreateOrderContext>{@OverridepublicServiceResultaction(StateContext context)throwsException{// String price = priceSerive.getPrice();String price =""; context.getContext().setEstimatePriceInfo(price);returnnewServiceResult(); } }基于代码继承方式的解决方案
当发现新增一个状态不同维度的处理流程,和当前已存在的一个处理器大部分逻辑是相同的,此时就可以使新写的这个处理器B继承已存在的处理器A,只需要让处理器B覆写A中不同方法逻辑、实现差异逻辑的替换。这种方案比较好理解,但是需要处理器A已经规划好一些可以扩展的点、其他处理器可以基于这些扩展点进行覆写替换。当然更好的方案其实是,先实现一个默认的处理器,把所有的标准处理流程和可扩展点进行封装实现、其他处理器进行继承、覆写、替换就好。
@OrderProcessor(state = OrderStateEnum.INIT, event = OrderEventEnum.CREATE, bizCode ="CHEAP")publicclassOrderCreatedProcessorextendsAbstractStateProcessor<String,CreateOrderContext>{@OverridepublicServiceResultaction(String nextState, StateContext context)throwsException{ CreateEvent createEvent = (CreateEvent) context.getOrderStateEvent();// 促销信息信息String promtionInfo =this.doPromotion(); ...... }/** * 促销相关扩展点 */protectedStringdoPromotion(){return"1"; } }@OrderProcessor(state = OrderStateEnum.INIT, event = OrderEventEnum.CREATE, bizCode ="TAXI")publicclassOrderCreatedProcessor4TaxiextendsOrderCreatedProcessor<String,CreateOrderContext>{@OverrideprotectedStringdoPromotion(){return"taxt1"; } }3 状态迁移流程的执行流程
状态机引擎的执行过程
通过上面的介绍,大体明白了怎么实现状态流程编排、业务隔离和扩展等等,但是状态机引擎具体是怎么把这个过程串联起来的呢?简单说、分为两个阶段:初始化阶段和运行时阶段。
(1)状态机引擎初始化阶段
阿里云服务器稳定吗
首先在代码编写阶段、根据上面的分析,业务通过实现AbstractStateProcessor模板类、并添加@OrderProcessor注解来实现自己的多个需要的特定状态处理器。
那么在系统初始化阶段,所有添加了@OrderProcessor注解的实现类都会被spring所管理成为spring bean,状态机引擎在通过监听spring bean的注册(BeanPostProcessor)来将这些状态处理器processor装载到自己管理的容器中。直白来说、这个状态处理器容器其实就是一个多层map实现的,第一层map的key是状态(state),第二层map的key是状态对应的事件(event)、一个状态可以有多个要处理的事件,第三层map的key是具体的场景code(也就是bizCode和sceneId的组合),最后的value是AbstractStateProcessor集合。
publicclassDefaultStateProcessRegistryimplementsBeanPostProcessor{/** * 第一层key是订单状态。 * 第二层key是订单状态对应的事件,一个状态可以有多个事件。 * 第三层key是具体场景code,场景下对应的多个处理器,需要后续进行过滤选择出一个具体的执行。 */privatestaticMap<String,Map<String,Map<String, List>>> stateProcessMap =newConcurrentHashMap<>(); @Override publicObjectpostProcessAfterInitialization(Objectbean,StringbeanName) throws BeansException {if(beaninstanceofAbstractStateProcessor && bean.getClass().isAnnotationPresent(OrderProcessor.class)) { OrderProcessor annotation = bean.getClass().getAnnotation(OrderProcessor.class);String[] states = annotation.state();Stringevent = annotation.event();String[] bizCodes = annotation.bizCode().length ==0?newString[]{""} : annotation.bizCode();String[] sceneIds = annotation.sceneId().length ==0?newString[]{""} : annotation.sceneId(); initProcessMap(states, event, bizCodes, sceneIds, stateProcessMap, (AbstractStateProcessor) bean); }returnbean; } private voidinitProcessMap(String[] states,Stringevent,String[] bizCodes,String[] sceneIds,Map<String,Map<String,Map<String, List>>> map, E processor) {for(StringbizCode : bizCodes) {for(StringsceneId : sceneIds) { Arrays.asList(states).parallelStream().forEach(orderStateEnum -> { registerStateHandlers(orderStateEnum, event, bizCode, sceneId, map, processor); }); } } }/** * 初始化状态机处理器 */public voidregisterStateHandlers(StringorderStateEnum,Stringevent,StringbizCode,StringsceneId,Map<String,Map<String,Map<String, List>>> map, E processor) {// state维度if(!map.containsKey(orderStateEnum)) { map.put(orderStateEnum,newConcurrentHashMap<>()); }Map<String,Map<String, List>> stateTransformEventEnumMap = map.get(orderStateEnum);// event维度if(!stateTransformEventEnumMap.containsKey(event)) { stateTransformEventEnumMap.put(event,newConcurrentHashMap<>()); }// bizCode and sceneIdMap<String, List> processorMap = stateTransformEventEnumMap.get(event);StringbizCodeAndSceneId = bizCode +"@"+ sceneId;if(!processorMap.containsKey(bizCodeAndSceneId)) { processorMap.put(bizCodeAndSceneId,newCopyOnWriteArrayList<>()); } processorMap.get(bizCodeAndSceneId).add(processor); } }(2)状态机引擎运行时阶段
经过初始化之后,所有的状态处理器processor都被装载到容器。在运行时,通过一个入口来发起对状态机的调用,方法的主要参数是操作事件(event)和业务入参,如果是新创建订单请求需要携带业务(bizCode)和场景(sceneId)信息、如果是已存在订单的更新状态机引擎会根据oderId自动获取业务(bizCode)、场景(sceneId)和当前状态(state)。之后引擎根据state+event+bizCode+sceneId从状态处理器容器中获取到对应的具体处理器processor,从而进行状态迁移处理。
/** * 状态机执行引擎 */publicinterfaceOrderFsmEngine{/** * 执行状态迁移事件,不传FsmOrder默认会根据orderId从FsmOrderService接口获取 */ServiceResultsendEvent(OrderStateEvent orderStateEvent) throws Exception;/** * 执行状态迁移事件,可携带FsmOrder参数 */ServiceResultsendEvent(OrderStateEvent orderStateEvent, FsmOrder fsmOrder) throws Exception; }@ComponentpublicclassDefaultOrderFsmEngineimplementsOrderFsmEngine{@OverridepublicServiceResultsendEvent(OrderStateEvent orderStateEvent)throwsException{ FsmOrder fsmOrder =null;if(orderStateEvent.newCreate()) { fsmOrder =this.fsmOrderService.getFsmOrder(orderStateEvent.getOrderId());if(fsmOrder ==null) {thrownewFsmException(ErrorCodeEnum.ORDER_NOT_FOUND); } }returnsendEvent(orderStateEvent, fsmOrder); }@OverridepublicServiceResultsendEvent(OrderStateEvent orderStateEvent, FsmOrder fsmOrder)throwsException{// 构造当前事件上下文StateContext context =this.getStateContext(orderStateEvent, fsmOrder);// 获取当前事件处理器StateProcessor stateProcessor =this.getStateProcessor(context);// 执行处理逻辑returnstateProcessor.action(context); }private StateProcessor getStateProcessor(StateContext context) { OrderStateEvent stateEvent = context.getOrderStateEvent(); FsmOrder fsmOrder = context.getFsmOrder();// 根据状态+事件对象获取所对应的业务处理器集合List processorList = stateProcessorRegistry.acquireStateProcess(fsmOrder.getOrderState(), stateEvent.getEventType(), fsmOrder.bizCode(), fsmOrder.sceneId());if(processorList ==null) {// 订单状态发生改变if(!Objects.isNull(stateEvent.orderState()) && !stateEvent.orderState().equals(fsmOrder.getOrderState())) {thrownewFsmException(ErrorCodeEnum.ORDER_STATE_NOT_MATCH); }thrownewFsmException(ErrorCodeEnum.NOT_FOUND_PROCESSOR); }if(CollectionUtils.isEmpty(processorResult)) {thrownewFsmException(ErrorCodeEnum.NOT_FOUND_PROCESSOR); }if(processorResult.size() >1) {thrownewFsmException(ErrorCodeEnum.FOUND_MORE_PROCESSOR); }returnprocessorResult.get(0); }privateStateContext getStateContext(OrderStateEvent orderStateEvent, FsmOrder fsmOrder) { StateContext context =newStateContext(orderStateEvent, fsmOrder);returncontext; } }检测到多个状态执行器怎么处理
有一点要说明,有可能根据state+event+bizCode+sceneId信息获取到的是多个状态处理器processor,有可能确实业务需要单纯依赖bizCode和sceneId两个属性无法有效识别和定位唯一processor,那么我们这里给业务开一个口、由业务决定从多个处理器中选一个适合当前上下文的,具体做法是业务processor通过filter方法根据当前context来判断是否符合调用条件。
private<T>StateProcessor<T,?>getStateProcessor(StateContext推荐阅读
- 网络摄像头云服务器Banana2联动可灵正确玩法!我现在做Ai视频只要一张图就够了 2025-12-21 14:18:21
- erp系统怎么放到云服务器上视频丨智能监测 服务可视化住院“免陪护”让家属更安心 2025-12-21 14:08:18
- 云服务器 本地服务器100多元买的“智驾神器”,实现无人驾驶?央视曝光 2025-12-21 13:58:14
- 青海服务器托管云服务器免抵押、免个人连带担保财信“启航贷”为大学生创业护航 2025-12-21 13:48:11
- 阿里云服务器接入协议即梦AI保姆级教程|30分钟搞定短视频制作,新手也能出爆款! 2025-12-21 13:38:08

