java开源调度如何给xxljob加k8s执行器
目录
- 前言
- 执行器接口
- K8S执行器设计
- 1、在XXL-JOB-ADMIN模块新增执行器
- 2、引入K8S-CLIENT-JAVA,使用SERVICEACCOUNT机制与K8S交互
- 3、编写代理执行器调度代码
- 结语
前言 xxljob 是采用 java 开发的开源的任务调度系统,架构上分为调度管理器、执行器,目前除了官方提供的 java 执行器外,也有 go 开发者提供了 go 语言的执行器(看了 go 执行器的代码,除了任务日志没有实现,其他功能实现都比较完整)。 xxljob 在设计上,抽象出了执行器的接口,所以实现一个语言的执行器并不复杂,这里主要探索下,如何利用 k8s 的 pod 的能力,使用 xxljob 调度 pod 运行,实现一个通用的和语言无关的执行器
- xxljob :https://github.com/xuxueli/xxl-job
- k8s-client-java: https://github.com/fabric8io/kubernetes-client
执行器接口 实现一个 xxljob 的执行器,如果不考虑执行器节点自动注册,只需要实现如下五个接口即可:
- /beat :执行器心跳
- /idleBeat :执行器的某个 job 是否空闲
- /run :触发 job 执行
- /kill :终止正在执行的 job
- /log :查看本节点执行器的 job 执行日志
K8S 执行器设计 上面已经了解了实现一个执行器的要素。但是让 k8s 实现这些接口,难度有点高。然后又希望不破坏现有的 xxljob 的设计,怎么办?代理解决。可以直接采用现有的 java 执行器,创建一个 job 任务,这个 job 任务专门发起 k8s 的调度,具体的调度 pod 信息通过调度参数传递,下面来实现下,以及看下需要注意的问题。
1、在 XXL-JOB-ADMIN 模块新增执行器
为了尽量减少系统维护的复杂度,我们可以将代理调度 k8s 的执行器,直接集成到 admin 模块,启动 admin 的时候,自动注册 k8s 执行器。
文章图片
2、引入 K8S-CLIENT-JAVA ,使用 SERVICE ACCOUNT 机制与 K8S 交互
io.fabric8 kubernetes-client5.4.0
这个客户端提供了完整的和 k8s-api-server 交互能力,使用这个客户端,基于 k8s 的 service account 认证,可以轻松在 xxljob 所在 namespace 内完成 pod 的生命周期管理。引入依赖后,首先创建 client 实例:
/** * @author kl (http://kailing.pub) * @since 2021/6/4 */@Configurationpublic class KubernetesClientConfig {@Beanpublic KubernetesClient kubernetesClient(){return new DefaultKubernetesClient(Config.autoConfigure(null)); }}
这里初始化客户端时,采用了自动发现配置的模式,如果是本机开发时,就会自动寻找你本机的 kubectl 配置,当 xxljob 部署到 k8s 内时,如果找不到本地的就会尝试寻找 service account 创建出来的配置,然后从环境变量中自发现 k8s 集群的链接地址。所以无论是开发环境还是线上环境,都不用配置k8s 的链接认证信息。但是,部署到 k8s 时,因为需要借助 k8s 的 service account 机制与 k8s 交互,需要多定义一个 service account 的权限声明,可参考如下:
# In GKE need to get RBAC permissions first with# kubectl create clusterrolebinding cluster-admin-binding --clusterrole=cluster-admin [--user=|--group=]---apiVersion: v1kind: ServiceAccountmetadata:name: xxljob---kind: RoleapiVersion: rbac.authorization.k8s.io/v1metadata:name: xxljobrules:- apiGroups: [""]resources: ["pods"]verbs: ["create","delete","get","list","patch","update","watch"]- apiGroups: [""]resources: ["pods/exec"]verbs: ["create","delete","get","list","patch","update","watch"]- apiGroups: [""]resources: ["pods/log"]verbs: ["get","list","watch"]- apiGroups: [""]resources: ["events"]verbs: ["watch"]- apiGroups: [""]resources: ["secrets"]verbs: ["get"]---apiVersion: rbac.authorization.k8s.io/v1kind: RoleBindingmetadata:name: xxljobroleRef:apiGroup: rbac.authorization.k8s.iokind: Rolename: xxljobsubjects:- kind: ServiceAccountname: xxljob
3、编写代理执行器调度代码
/** * @author kl (http://kailing.pub) * @since 2021/5/28 */@Componentpublic class KubernetesExecutorHandler {private static final Logger logger = LoggerFactory.getLogger(KubernetesExecutorHandler.class); private static final String NAMESPACE = "xxl-job"; private final KubernetesClient client; public KubernetesExecutorHandler(KubernetesClient client) {this.client = client; }@XxlJob(value = "https://www.it610.com/article/callK8s")public void callK8s() throws InterruptedException {String podResource = XxlJobHelper.getJobParam(); Pod pod = Serialization.unmarshal(podResource, Pod.class); pod.getSpec().setRestartPolicy("Never"); //这里强制设置重启策略为不重启pod = client.pods().inNamespace(NAMESPACE).create(pod); client.resource(pod).waitUntilCondition(pod1 -> pod1.getStatus().getPhase().equals("Succeeded") || pod1.getStatus().getPhase().equals("Failed"), 2, TimeUnit.MINUTES); String log = client.pods().inNamespace(NAMESPACE).withName(pod.getMetadata().getName()).getLog(); XxlJobHelper.log(log); //记录 pod 日志到 xxl-joblogger.info(log); client.resource(pod).delete(); }}
如上,一个简版的 k8s 执行器便完成了,使用时,通过定义bean模式的 job ,然后选择 k8s 执行器,jobHandler 名称和填上 callk8s,通过job 参数传递 pod 调度信息,如:
文章图片
这里定义了一个打印 当前时间和当前环境变量的 pod 任务,执行完成后,就可以从 job 的日志里看到执行结果了,如:
文章图片
结语 目前的实现方式,单纯从兼容 xxljob 现有的架构模式,以及现有的实现出发的,所以采用了 java 代理执行器代为调度 pod 的方案,基本继承了所有 java 执行器的功能,比如 job 执行日志记录,并发执行策略等。需要注意的是,因为是单 handler 实现,每个job 都会用同一个 handler 去运行,所以创建任务的时候并发策略这块只能选择单机串行执行,否则非常容易丢任务。另一个需要考虑的问题,如果代理执行器非正常关闭,pod 没来的及删除就挂了,这个时候需要启动一个巡检的线程,检测已经完成或者已经出错的 pod ,然后清理掉。
【java开源调度如何给xxljob加k8s执行器】以上就是java开源调度如何给xxljob加k8s执行器的详细内容,更多关于java开源xxljob加k8s执行器的资料请关注脚本之家其它相关文章!
推荐阅读
- JavaScript中var和let之间有什么区别()
- Java编程面试常见问题分享和解析S2
- 原Android热更新开源项目Tinker源码解析系列之二:资源文件热更新
- Mac Android开发环境变量的配置(javasdkndkgradle)
- 安卓开源项目周报0104
- javascript|ES6中Object.assign() 方法
- vue|uniapp+java小程序支付
- 如何确定Java中数组的长度或大小()
- 如何在另一个JavaScript文件中包含一个JavaScript文件()
- 你需要知道的JavaScript ES2021特性