Skip to content

Snail-job 工作流 上下文

工作流上下文

​ 工作流上下文分为两类:

类型说明配置方式备注
初始化上下文(静态初始化)在工作流配置页面中可以选择配置相关初始化参数,官方支持配置三种类型(String、Number、Boolean)管理端工作流配置页面
任务内上下文(动态传递)可在定时任务中,通过提供的SDK动态的插入相关上下文,向下传递给后续的相关任务使用。(后续的决策节点可以根据工作流上下文完成相关判断,动态的决定后面任务的执行)官方SDK工作流并行执行,无法保证实时获取到相邻并行节点的上下文

Tip:由于工作流上下文是以明文形式存储在数据库中,不建议在上下文中传递敏感信息,若需要建议自己加密传递。

工作流初始化配置

​ 管理端页面配置(可以选择不配置),支持以K,V的形式配置,支持配置多个初始化参数。

image-20240918154230292

工作流上下文动态配置(SDK Demo)

Job任务

java
// job任务
@Component
@JobExecutor(name = "testJobExecutor")
public class TestAnnoJobExecutor {

public ExecuteResult jobExecute(JobArgs jobArgs) {
    // 可以通过jobArgs获取到工作流上下文的内容。
    // 即可以获取到当前任务执行前的所有执行完成的任务的设置的上下文及工作流初始化的上下文
    Map<String, Object> wfContext = jobArgs.getWfContext();

    // 通过#appendContext(String key, Object value) 方法可以设置上下文
    jobArgs.appendContext("testDouble", 5.324);
    jobArgs.appendContext("testLong", 5324);
    jobArgs.appendContext("testBoolean", true);
    PhoneNumberBo phoneNumberBo = new PhoneNumberBo();
    phoneNumberBo.setPhoneNumber("12345");
    jobArgs.appendContext("phone", phoneNumberBo);
    WorkFlowContextUtil.handleJobContext(jobArgs);
    return ExecuteResult.success("测试成功");
}
}

Map任务

Tip:由于Map任务作为一个完整的任务,后续动态分片的相关任务无法获取到RootMap中配置的上下文。

java
// Map任务
@Component
@JobExecutor(name = "testAnnoMapJobExecutor")
public class TestAnnoMapJobExecutor {

@MapExecutor
public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler mapHandler) {
    // 获取上下文
    Map<String, Object> wfContext = mapArgs.getWfContext();
    // 设置上下文 A
    mapArgs.appendContext(xxx, xxx);
    return mapHandler.doMap(Lists.newArrayList("1", "2", "3"), "SECOND_MAP");
}

@MapExecutor(taskName = "SECOND_MAP")
public ExecuteResult monthMapExecute(MapArgs mapArgs) {
    // 获取上下文
    // 无法获取上面 A处设置的上下文
    Map<String, Object> wfContext = mapArgs.getWfContext();
    // 设置上下文
    mapArgs.appendContext(xxx, xxx);
    return ExecuteResult.success();
}
}

MapReduce任务

Tip:由于MapReduce任务作为一个完整的任务,后续动态分片的相关任务无法获取到RootMap/Map/Reduce中配置的上下文。

java
//MapReduce任务
@Component
@JobExecutor(name = "testAnnoMapReduceJobExecutor")
public class TestAnnoMapReduceJobExecutor {

@MapExecutor
public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler mapHandler) {
    // 获取上下文
    Map<String, Object> wfContext = mapArgs.getWfContext();
    // 设置上下文 A
    mapArgs.appendContext(xxx, xxx);
    return mapHandler.doMap(Lists.newArrayList("1", "2", "3", "4", "5", "6"), "SECOND_MAP");
}

@MapExecutor(taskName = "SECOND_MAP")
public ExecuteResult monthMapExecute(MapArgs mapArgs) {
    // 获取上下文
    Map<String, Object> wfContext = mapArgs.getWfContext();
    // 设置上下文 B
    // 无法获取上面 A处设置的上下文
    mapArgs.appendContext(xxx, xxx);
    return ExecuteResult.success();
}

@ReduceExecutor
public ExecuteResult reduceExecute(ReduceArgs mapReduceArgs) {
    // 获取上下文
     Map<String, Object> wfContext = mapReduceArgs.getWfContext();
    // 设置上下文 C
    // 无法获取上面 A、B处设置的上下文
    mapReduceArgs.appendContext(xxx, xxx);
    return ExecuteResult.success();
}

/**
 * 当只有一个reduce任务时无此执行器
 */
@MergeReduceExecutor
public ExecuteResult mergeReduceExecute(MergeReduceArgs mergeReduceArgs) {
    // 获取上下文
     Map<String, Object> wfContext = mergeReduceArgs.getWfContext();
    // 设置上下文
    // 无法获取上面 A、B、C处设置的上下文
    mergeReduceArgs.appendContext(xxx, xxx);
    return ExecuteResult.success();
}
}