Python 客户端
Snail Job Python 客户端要求 Python >= 3.8
前言
snail-job 项目的 python 客户端。snail-job项目 java 后端
Snail Job Python客户端主打的是“原汁原昧”, 无需嵌套在其他语言环境中; 具备与SnailJob的Java客户端Job模块一样的能力包括(集群、广播、静态分片、Map、MapReuce、DAG工作流、实时日志等功能),而 xxl-job
、PowerJob
等其他的任务调度系统都是通过 Java 客户端使用 Runtime
执行 Python
脚本, 那么会有如下几个问题:
- 需要运行在Java环境中,即耗内存和又显得笨重
- 不方便编写复杂的 Python 脚本
- Java 客户端通过 Python 命令执行脚本,需要系统全局安装脚本的第三方依赖
- 代码可维护性和可调试比较差
Snail Job Python 客户端可以直接对接 SnailJob 服务器,实现定时任务调度,并上报日志。Python 客户端当前仍不支持重试任务
,也没有支持计划。
快速启动
服务器配置
使用服务器默认配置,并配置如下信息:
- 组:
snail_job_demo_group
;token:SJ_Wyz3dmsdbDOkDujOTSSoBjGQP1BMsVnj
- 定时任务:组名称:
snail_job_demo_group
;执行器类型:Python
;执行器名称:testJobExecutor
;任务类型:集群
启动客户端
shell
git clone https://gitee.com/opensnail/snail-job-python.git
cd snail-job-python
pip install -r requrements.txt
python main.py
配置客户端
Snail Job Python 采用 dotenv 配置方式,你可以复制 .env.example
为 .env
,然后修改相关配置项,然后再重新启动客户端。
shell
# 复制 `.env.example` 为 `.env`
cp .env.example .env # windows命令为 copy
# 创建虚拟环境
python -m venv venv
# 安装依赖
pip install -r requirements.txt
# 启动程序
python main.py
- 登录后台,能看到对应host-id 为
py-xxxxxx
的客户端
示例
定时任务
python
from snailjob import *
@job("testJobExecutor") # 1. testJobExecutor 为执行器名称
def test_job_executor(args: JobArgs) -> ExecuteResult:
SnailLog.REMOTE.info(f"job_params: {args.job_params}")
return ExecuteResult.success() # 2. 返回执行结果
if __name__ == "__main__":
ExecutorManager.register(test_job_executor) # 3. 注册执行器
client_main() # 4. 执行客户端主函数
新建定时任务, 执行器类型选择【Python】,执行器名称填入【testJobExecutor】
动态分片
python
from snailjob import *
testMyMapExecutor = MapExecutor("testMyMapExecutor") # 1. 定义 MapExecutor 变量
@testMyMapExecutor.map() # 2. 定义 ROOT_MAP 阶段任务
def testMyMapExecutor_rootMap(args: MapArgs):
assert args.task_name == ROOT_MAP
return mr_do_map(["1", "2", "3", "4"], "TWO_MAP")
@testMyMapExecutor.map("TWO_MAP") # 3. 定义 TWO_MAP 阶段任务
def testMyMapExecutor_twoMap(args: MapArgs):
return ExecuteResult.success(args.map_result)
if __name__ == "__main__":
ExecutorManager.register(testMyMapExecutor) # 4. 注册执行器
client_main()
MapReduce
python
from snailjob import *
testMapReduceJobExecutor = MapReduceExecutor("testMapReduceJobExecutor") # 1. 定义 MapReduceExecutor 变量
@testMapReduceJobExecutor.map() # 2. 定义 ROOT_MAP 阶段任务
def testMapReduceJobExecutor_rootMap(args: MapArgs):
return mr_do_map(["1", "2", "3", "4", "5", "6"], "MONTH_MAP") # 3. 上报分片信息
@testMapReduceJobExecutor.map("MONTH_MAP") # 4. 定义 ROOT_MAP 阶段任务
def testMapReduceJobExecutor_monthMap(args: MapArgs):
return ExecuteResult.success(int(args.map_result) * 2)
@testMapReduceJobExecutor.reduce() # 5. 定义 reduce 阶段任务
def testMapReduceJobExecutor_reduce(args: ReduceArgs):
return ExecuteResult.success(sum([int(x) for x in args.map_result]))
@testMapReduceJobExecutor.merge() # 6. 定义 merge 阶段任务
def testMapReduceJobExecutor_merge(args: MergeReduceArgs):
return ExecuteResult.success(sum([int(x) for x in args.map_result]))
if __name__ == "__main__":
ExecutorManager.register(testMyMapExecutor) # 7. 注册执行器
client_main()
响应停止事件
python
@job("testJobExecutor")
def test_job_executor(args: JobArgs) -> ExecuteResult:
for i in range(40):
if ThreadPoolCache.event_is_set(args.task_batch_id): # 1. 判断当前任务批次是否被终止
SnailLog.REMOTE.info("任务已经被中断,立即返回")
return ExecuteResult.failure()
time.sleep(1)
return ExecuteResult.success()
工作流、静态分片与普通定时任务类似,不做赘述
gRPC
开发者工具
shell
pip install grpcio-tools==1.66.2
cd snailjob/grpc/
python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. *.proto
HACK, 需要手动修改自动生成的文件 snailjob/grpc/snailjob_pb2_grpc.py
diff
- import snailjob_pb2 as snailjob__pb2
+ from . import snailjob_pb2 as snailjob__pb2