Python 客户端
前言
采用GO原生语言开发的SnailJob客户端具备与SnailJob的Java客户端Job模块一样的能力包括(集群、广播、静态分片、Map、MapReuce、DAG工作流、实时日志等功能)
仓库地址
- github: https://github.com/open-snail/snail-job-go.git
- gitee: https://gitee.com/opensnail/snail-job-go.git
开始使用
- 在go.mod文件中添加依赖
shell
require github.com/open-snail/snail-job-go {版本号}
- 配置客户端参数
go
// 配置Options参数
exec := snailjob.NewSnailJobManager(&dto.Options{
ServerHost: "127.0.0.1",
ServerPort: "17888",
HostIP: "127.0.0.1",
HostPort: "17889",
Namespace: "764d604ec6fc45f68cd92514c40e9e1a",
GroupName: "snail_job_demo_group",
Token: "SJ_Wyz3dmsdbDOkDujOTSSoBjGQP1BMsVnj",
Level: logrus.InfoLevel,
ReportCaller: true,
})
// 注册执行器
exec.Register("testJobExecutor", func() job.IJobExecutor {
return &Test3JobExecutor{}
})
// 初始化环境
if nil == exec.Init() {
// 启动客户端
exec.Run()
}
登录后台,能看到对应host-id 为 go-xxxxxx
的客户端
示例
定时任务
go
// TestJobExecutor 这是一个示例执行器
type TestJobExecutor struct {
job.BaseJobExecutor
}
func (executor *Test2JobExecutor) DoJobExecute(jobArgs dto.IJobArgs) dto.ExecuteResult {
executor.RemoteLogger.Infof("TestJobExecutor 执行结束 DoJobExecute. jobId: [%d] now:[%s]", jobArgs.GetJobId(), time.Now().String())
return *dto.Success().WithMessage("hello 这是go客户端")
}
新建定时任务, 执行器类型选择【Go】,执行器名称填入【testJobExecutor】
动态分片
go
// TestMapJobExecutor 这是一个测试类
type TestMapJobExecutor struct {
job.BaseMapJobExecutor
}
func (executor *TestMapJobExecutor) DoJobMapExecute(mpArgs *dto.MapArgs) dto.ExecuteResult {
logger := executor.LocalLogger
if mpArgs.TaskName == constant.ROOT_MAP {
_, _ = executor.DoMap([]interface{}{1, 2, 3}, "secondTaskName")
return *dto.Success()
}
logger.Infof("TestMapJobExecutor执行 DoJobMapExecute. jobId: [%d] TaskName:[%s] ", mpArgs.GetJobId(), mpArgs.TaskName)
return *dto.Success().WithMessage("这是动态分片")
}
MapReduce
go
// TestMapReduceJobExecutor 这是一个测试类
type TestMapReduceJobExecutor struct {
job.BaseMapReduceJobExecutor
}
func (executor *TestMapReduceJobExecutor) DoJobMapExecute(mpArgs *dto.MapArgs) dto.ExecuteResult {
logger := executor.LocalLogger
return *dto.Success().WithMessage("这是动态分片阶段")
}
// DoReduceExecute 模板类
func (executor *TestMapReduceJobExecutor) DoReduceExecute(jobArgs *dto.ReduceArgs) dto.ExecuteResult {
logger := executor.LocalLogger
logger.Infof("TestMapReduceJobExecutor 开始执行 DoReduceExecute.")
return *dto.Success().WithMessage("这是Reduce阶段")
}
func (executor *TestMapReduceJobExecutor) DoMergeReduceExecute(jobArgs *dto.MergeReduceArgs) dto.ExecuteResult {
logger := executor.LocalLogger
logger.Info("TestMapReduceJobExecutor 开始执行 DoMergeReduceExecute.")
return *dto.Success().WithMessage("这是merge阶段")
}
响应停止事件
go
type TestJobExecutor struct {
job.BaseJobExecutor
}
// 测试超时时间
func (executor *TestJobExecutor) DoJobExecute(jobArgs dto.IJobArgs) dto.ExecuteResult {
time.Sleep(1 * time.Second)
interrupt := executor.Context().Value(constant.INTERRUPT_KEY)
if interrupt != nil {
executor.LocalLogger.Errorf("任务被中断. jobId: [%d] now:[%s]", jobArgs.GetJobId(), time.Now().String())
return *dto.Failure().WithMessage("任务被中断")
}
return *dto.Success().WithMessage("hello 这是go客户端")
}
工作流
go
// TestWorkflowJobExecutor 这是一个测试类
type TestWorkflowJobExecutor struct {
job.BaseJobExecutor
}
func (executor *TestWorkflowJobExecutor) DoJobExecute(jobArgs dto.IJobArgs) dto.ExecuteResult {
executor.LocalLogger.Infof("TestWorkflowJobExecutor. jobId: [%d] wfContext:[%+v]",
jobArgs.GetJobId(), jobArgs.GetWfContext("name"))
jobArgs.AppendContext("name", "xiaowoniu")
return *dto.Success().WithMessage("测试工作流")
}