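Extending Azkaban: Custom DingTalk Alerts
Out of the box, Azkaban only supports email alerts, but its plugin mechanism lets you add custom alerters. I decided to implement a DingTalk alerter; after all, nobody watches their inbox all day.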
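How Azkaban plugins work
Download the latest Azkaban source. Let's trace the full lifecycle of an alerter plugin so that we can develop and configure one correctly.
Plugin callbacks
Source file: azkaban-common/src/main/java/azkaban/executor/ExecutionControllerUtils.java. The key function: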
    /**
     * When a flow is finished, alert the user as is configured in the execution options.
     *
     * @param flow the execution
     * @param alerterHolder the alerter holder
     * @param extraReasons the extra reasons for alerting
     */
    public static void alertUserOnFlowFinished(final ExecutableFlow flow,
        final AlerterHolder alerterHolder, final String[] extraReasons) {
      final ExecutionOptions options = flow.getExecutionOptions();
      // Fetch the built-in "email" alerter
      final Alerter mailAlerter = alerterHolder.get("email");
      if (flow.getStatus() != Status.SUCCEEDED) {
        // The flow failed: if failure emails are configured, call the mail alerter's alertOnError
        if (options.getFailureEmails() != null && !options.getFailureEmails().isEmpty()) {
          try {
            mailAlerter.alertOnError(flow, extraReasons); // <-- here
          } catch (final Exception e) {
            logger.error("Failed to alert on error for execution " + flow.getExecutionId(), e);
          }
        }
        // If the execution was started with an alert.type flow parameter
        if (options.getFlowParameters().containsKey("alert.type")) {
          final String alertType = options.getFlowParameters().get("alert.type");
          final Alerter alerter = alerterHolder.get(alertType); // look up the custom alerter named by alert.type
          if (alerter != null) {
            try {
              alerter.alertOnError(flow, extraReasons); // and call its alertOnError method
            } catch (final Exception e) {
              logger.error("Failed to alert on error by " + alertType + " for execution "
                  + flow.getExecutionId(), e);
            }
          } else {
            logger.error("Alerter type " + alertType + " doesn't exist. Failed to alert.");
          }
        }
        // ... (rest of the method omitted)
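From this we can see:
- alerterHolder.get() returns an alerter plugin instance by name: either the built-in email implementation or a custom one.
- The alert.type parameter must be set when the flow is executed, and its value must be the plugin's name.
- When a flow fails, the plugin's alertOnError method is called. Other outcomes map to other methods; I have not pasted the complete code here.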
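To make the lookup concrete, here is a minimal, self-contained sketch of the same dispatch logic. It is not Azkaban code: SimpleAlerter is a hypothetical stand-in for azkaban.alert.Alerter (the real interface takes an ExecutableFlow), and a plain Map plays the role of AlerterHolder.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class AlertDispatchSketch {
    // Hypothetical stand-in for azkaban.alert.Alerter, reduced to one callback.
    interface SimpleAlerter {
        void alertOnError(int executionId, String... reasons) throws Exception;
    }

    /**
     * Mirrors the lookup in alertUserOnFlowFinished: read "alert.type" from the
     * flow parameters, find the alerter registered under that name, and call
     * alertOnError on it. Returns true only if an alerter was found and did not throw.
     */
    static boolean dispatchOnError(Map<String, SimpleAlerter> alerters,
                                   Map<String, String> flowParameters,
                                   int executionId, String... reasons) {
        if (!flowParameters.containsKey("alert.type")) {
            return false; // no custom alerter requested for this execution
        }
        String alertType = flowParameters.get("alert.type");
        SimpleAlerter alerter = alerters.get(alertType);
        if (alerter == null) {
            System.err.println("Alerter type " + alertType + " doesn't exist. Failed to alert.");
            return false;
        }
        try {
            alerter.alertOnError(executionId, reasons);
            return true;
        } catch (Exception e) {
            System.err.println("Failed to alert on error by " + alertType + ": " + e);
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        Map<String, SimpleAlerter> alerters = new HashMap<>();
        // Register a fake alerter under the plugin name.
        alerters.put("azkaban-dingding-alert",
            (id, reasons) -> sent.add("execution " + id + ": " + String.join("; ", reasons)));

        Map<String, String> flowParameters = new HashMap<>();
        flowParameters.put("alert.type", "azkaban-dingding-alert");

        dispatchOnError(alerters, flowParameters, 1743, "job failed");
        System.out.println(sent);
    }
}
```

The point to take away is that the value of alert.type is used as a map key, so it has to match the registered plugin name exactly; a mismatch ends in the "doesn't exist" branch rather than in an alert.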
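How AlerterHolder loads plugins
Source file: azkaban-common/src/main/java/azkaban/executor/AlerterHolder.java
When this class is initialized it receives a ready-made email implementation, and it also loads custom plugins from the plugins/alerter directory: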
    private Map<String, Alerter> loadAlerters(final Props props, final Emailer mailAlerter) {
      final Map<String, Alerter> allAlerters = new HashMap<>();
      // load built-in alerters
      allAlerters.put("email", mailAlerter);
      // load all plugin alerters
      final String pluginDir = props.getString("alerter.plugin.dir", "plugins/alerter");
      allAlerters.putAll(loadPluginAlerters(pluginDir));
      return allAlerters;
    }
Our alerter plugins belong in the plugins/alerter directory, one subdirectory per plugin. Azkaban scans and loads them one by one:
    final Map<String, Alerter> installedAlerterPlugins = new HashMap<>(); // plugin name --> plugin object
    final ClassLoader parentLoader = getClass().getClassLoader(); // the current class loader
    final File[] pluginDirs = alerterPluginPath.listFiles(); // the alerter plugin directories
    final ArrayList<String> jarPaths = new ArrayList<>();
    // Load the alerter plugins one by one
    for (final File pluginDir : pluginDirs) {
      // load plugin properties
      // (the configuration file under the plugin's conf directory)
      final Props pluginProps = PropsUtils.loadPluginProps(pluginDir);
      if (pluginProps == null) {
        continue;
      }
      // The plugin name from the configuration file; this is what alert.type must match
      final String pluginName = pluginProps.getString("alerter.name");
      final List<String> extLibClassPaths =
          pluginProps.getStringList("alerter.external.classpaths",
              (List<String>) null);

      // The name of the class to instantiate, from the configuration file
      final String pluginClass = pluginProps.getString("alerter.class");
      if (pluginClass == null) {
        logger.error("Alerter class is not set.");
        continue;
      } else {
        logger.info("Plugin class " + pluginClass);
      }

      // Load the class named by alerter.class
      Class<?> alerterClass =
          PluginUtils.getPluginClass(pluginClass, pluginDir, extLibClassPaths, parentLoader);
It first calls loadPluginProps to load the plugin's private configuration, which lives in plugin.properties under the conf subdirectory of the plugin directory:
    final File propertiesDir = new File(pluginDir, "conf");
    if (propertiesDir.exists() && propertiesDir.isDirectory()) {
      final File propertiesFile = new File(propertiesDir, "plugin.properties");
For example, my plugin is laid out like this:
    (base) [hadoop@10 azkaban]$ cat ./plugins/alerter/azkaban-dingding-alert/conf/plugin.properties
    alerter.name=azkaban-dingding-alert
    alerter.class=cc.yuerblog.AzkabanDingDingAlert

    # Other custom properties (DingTalk or ...)
    dingding.serverUrl=https://oapi.dingtalk.com/robot/send?access_token=.....
In this file, alerter.name is the plugin's unique identifier and alerter.class is the plugin's class name.
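Since a missing or misnamed key is an easy mistake to make, here is a small stand-alone sketch that checks a plugin directory before you drop it into Azkaban. It uses java.util.Properties rather than Azkaban's Props class, and the class and method names (PluginPropsSketch, validate) are my own, not part of Azkaban.

```java
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.util.Properties;

class PluginPropsSketch {
    /** Loads <pluginDir>/conf/plugin.properties, or returns null if it is missing. */
    static Properties loadPluginProps(File pluginDir) throws IOException {
        File confDir = new File(pluginDir, "conf");
        File propsFile = new File(confDir, "plugin.properties");
        if (!confDir.isDirectory() || !propsFile.isFile()) {
            return null;
        }
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(propsFile.toPath())) {
            props.load(in);
        }
        return props;
    }

    /** Returns a description of the first problem found, or null if the config looks usable. */
    static String validate(Properties props) {
        if (props == null) {
            return "conf/plugin.properties not found";
        }
        if (props.getProperty("alerter.name") == null) {
            return "alerter.name is not set";
        }
        if (props.getProperty("alerter.class") == null) {
            return "alerter.class is not set";
        }
        return null;
    }

    public static void main(String[] args) throws IOException {
        // Default path matches the layout shown above; pass another directory as the first argument.
        File pluginDir = new File(args.length > 0 ? args[0] : "plugins/alerter/azkaban-dingding-alert");
        String problem = validate(loadPluginProps(pluginDir));
        System.out.println(problem == null ? "plugin config OK" : "problem: " + problem);
    }
}
```

Note that the keys are alerter.name and alerter.class, while the flow parameter that selects the plugin at execution time is alert.type; the two prefixes are easy to mix up.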
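The loading loop ultimately calls getPluginClass to obtain the plugin's Class object. What it really does is create a class loader that can find the alerter.class class inside the jars we upload, so it is worth looking at the implementation closely.
Its key arguments are Azkaban's current class loader, the plugin class name, and the plugin directory. The excerpt below is getURLClassLoader, which builds that class loader.
Source file: az-core/src/main/java/azkaban/utils/PluginUtils.java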
    public static URLClassLoader getURLClassLoader(final File pluginDir,
        List<String> extLibClassPaths,
        ClassLoader parentLoader) {
      // LIBRARY_FOLDER_NAME is "lib", so this opens the lib subdirectory of the plugin directory
      final File libDir = new File(pluginDir, LIBRARY_FOLDER_NAME);
      if (libDir.exists() && libDir.isDirectory()) {
        final File[] files = libDir.listFiles(); // list all jar files under lib
        final ArrayList<URL> urls = getUrls(files); // convert the files to URL objects
        if (extLibClassPaths != null) {
          for (final String extLibClassPath : extLibClassPaths) {
            // If external lib paths are configured, add the jars under them to the URL list too
            try {
              final File extLibFile = new File(pluginDir, extLibClassPath);
              if (extLibFile.exists()) {
                if (extLibFile.isDirectory()) {
                  // extLibFile is a directory; load all the files in the
                  // directory.
                  final File[] extLibFiles = extLibFile.listFiles();
                  urls.addAll(getUrls(extLibFiles));
                } else {
                  final URL url = extLibFile.toURI().toURL();
                  urls.add(url);
                }
              } else {
                logger.error(
                    "External library path not found. path = " + extLibFile.getAbsolutePath()
                );
                continue;
              }
            } catch (final MalformedURLException e) {
              logger.error(
                  "Invalid External library path. path = " + extLibClassPath + " dir = " + pluginDir,
                  e
              );
            }
          }
        }
        // Create a new class loader whose parent is Azkaban's own class loader, so classes that are already loaded stay visible.
        // Classes the parent cannot find are loaded from the jars these URLs point to: mainly our plugin class and its third-party dependencies.
        return new URLClassLoader(urls.toArray(new URL[urls.size()]), parentLoader);
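The code above builds a dedicated class loader that searches the jars in the plugin directory for the alerter.class class and the classes it depends on. If you are familiar with Java's class-loader delegation model, none of this should be surprising.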
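If the delegation model is unfamiliar, the following stand-alone sketch shows the idea with nothing but the JDK. It is a simplification of the excerpt above, not a copy of it: the class name PluginClassLoaderSketch is mine, external class paths are ignored, and I filter on the .jar suffix for clarity.

```java
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;

class PluginClassLoaderSketch {
    /** Builds a class loader over every .jar in <pluginDir>/lib that delegates to the parent first. */
    static URLClassLoader forPluginDir(File pluginDir, ClassLoader parent)
            throws MalformedURLException {
        List<URL> urls = new ArrayList<>();
        File[] files = new File(pluginDir, "lib").listFiles(); // null if lib/ does not exist
        if (files != null) {
            for (File f : files) {
                if (f.isFile() && f.getName().endsWith(".jar")) {
                    urls.add(f.toURI().toURL());
                }
            }
        }
        return new URLClassLoader(urls.toArray(new URL[0]), parent);
    }

    public static void main(String[] args) throws Exception {
        File pluginDir = new File(args.length > 0 ? args[0] : "plugins/alerter/azkaban-dingding-alert");
        ClassLoader parent = PluginClassLoaderSketch.class.getClassLoader();
        try (URLClassLoader loader = forPluginDir(pluginDir, parent)) {
            // A class the parent chain already knows is resolved there, not reloaded from the plugin jars.
            Class<?> shared = loader.loadClass("java.util.HashMap");
            System.out.println("resolved via parent chain: " + (shared == java.util.HashMap.class));
            for (URL url : loader.getURLs()) {
                System.out.println("plugin jar: " + url);
            }
        }
    }
}
```

This is also why the plugin jar does not need to bundle Azkaban or log4j classes: lookups for them go to the parent loader first, and only classes the parent cannot find are searched for in the plugin's lib directory.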
That covers the plugin loading process. The Alerter interface we need to implement is simple:
    public interface Alerter {
        void alertOnSuccess(ExecutableFlow var1) throws Exception;

        void alertOnError(ExecutableFlow var1, String... var2) throws Exception;

        void alertOnFirstError(ExecutableFlow var1) throws Exception;

        void alertOnSla(SlaOption var1, String var2) throws Exception;
    }
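There are four callbacks in total: flow succeeded, flow failed, first failure within a flow, and SLA violation.
Implementing the plugin
I have open-sourced a demo project for reference: https://github.com/owenliang/azkaban-dingding-alert.
The POM configuration matters: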
    <dependencies>
        <!-- https://mvnrepository.com/artifact/com.linkedin.azkaban/azkaban -->
        <dependency>
            <groupId>com.linkedin.azkaban</groupId>
            <artifactId>azkaban</artifactId>
            <version>2.5.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.dingtalk.api</groupId>
            <artifactId>dingtalk</artifactId>
            <version>1.0.0</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/doc/azkaban-dingding-alert/lib/taobao-sdk-java-auto_1479188381469-20210114.jar</systemPath>
        </dependency>
        <!-- https://mvnrepository.com/artifact/log4j/log4j -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.16</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
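We depend on the azkaban and log4j libraries in provided scope only. The former contains the classes needed to implement Alerter; the latter is the logging library Azkaban uses. Neither needs to be bundled into our jar, because Azkaban's class loader can already load them at runtime.
The DingTalk SDK is a special case: it is not published to Maven and is only distributed as a jar. We keep the jar inside the project as a system-scoped dependency for local compilation. At runtime, placing the same jar in the plugin's lib directory is enough for the URLClassLoader to find it.
I only implemented alerting on flow failure, which sends a DingTalk message: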
    package cc.yuerblog;

    import azkaban.alert.Alerter;
    import azkaban.executor.ExecutableFlow;
    import azkaban.sla.SlaOption;
    import azkaban.utils.Props;
    import com.dingtalk.api.DefaultDingTalkClient;
    import com.dingtalk.api.DingTalkClient;
    import com.dingtalk.api.request.OapiRobotSendRequest;
    import com.dingtalk.api.response.OapiRobotSendResponse;
    import java.text.SimpleDateFormat;
    import org.apache.log4j.Logger;

    public class AzkabanDingDingAlert implements Alerter {
        private static final Logger logger = Logger.getLogger(AzkabanDingDingAlert.class);

        private String serverUrl;

        public AzkabanDingDingAlert(Props props) {
            serverUrl = props.get("dingding.serverUrl");
            logger.info("MyAlert construct..." + serverUrl);
        }

        private void sendDing(String msg) {
            logger.info("sendDing, msg=" + msg);

            try {
                DingTalkClient client = new DefaultDingTalkClient(serverUrl);

                // Build the request
                OapiRobotSendRequest request = new OapiRobotSendRequest();
                request.setMsgtype("text");

                // Attach the text to the request
                OapiRobotSendRequest.Text text = new OapiRobotSendRequest.Text();
                text.setContent(msg);
                request.setText(text);

                // Send it
                OapiRobotSendResponse response = client.execute(request);
            } catch (Exception e) {
                // Log the full stack trace, not just the message
                logger.error("Failed to send DingTalk alert", e);
            }
        }

        private String dateFormat(long ts) {
            SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            return fmt.format(ts);
        }

        @Override
        public void alertOnSuccess(ExecutableFlow executableFlow) throws Exception {
            logger.info("alertOnSuccess");
        }

        @Override
        public void alertOnError(ExecutableFlow executableFlow, String... strings) {
            logger.info("alertOnError");

            sendDing(String.format("[Error] azkaban projectId=%d flow=%s executionId=%d startTime=%s endTime=%s error=%s",
                    executableFlow.getProjectId(),
                    executableFlow.getFlowId(),
                    executableFlow.getExecutionId(),
                    dateFormat(executableFlow.getStartTime()),
                    dateFormat(executableFlow.getEndTime()),
                    String.join("\n", strings)
            ));
        }

        @Override
        public void alertOnFirstError(ExecutableFlow executableFlow) throws Exception {
            logger.info("alertOnFirstError");
        }

        @Override
        public void alertOnSla(SlaOption slaOption, String s) throws Exception {
            logger.info("alertOnSla");
        }
    }
Azkaban loads the dingding.serverUrl setting from the plugin's plugin.properties file and passes it to the constructor.
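The plugin sends its message through the DingTalk SDK jar. For comparison, here is a rough sketch of posting the same kind of text message to the webhook URL using only the JDK, which would remove the system-scoped dependency. This is an alternative I am sketching, not what the demo project does; the class name DingTalkWebhookSketch and its methods are hypothetical, and the JSON shape (msgtype "text" with a text.content field) is assumed to mirror what the SDK's OapiRobotSendRequest sends.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;

class DingTalkWebhookSketch {
    /** Escapes a string so it can be embedded inside a JSON string literal. */
    static String jsonEscape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c)); // other control characters
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    /** Builds the JSON body of a "text" robot message. */
    static String textPayload(String content) {
        return "{\"msgtype\":\"text\",\"text\":{\"content\":\"" + jsonEscape(content) + "\"}}";
    }

    /** POSTs the message to the webhook URL and returns the HTTP status code. */
    static int send(String serverUrl, String content) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) URI.create(serverUrl).toURL().openConnection();
        try {
            conn.setRequestMethod("POST");
            conn.setConnectTimeout(5000);
            conn.setReadTimeout(10000);
            conn.setDoOutput(true);
            conn.setRequestProperty("Content-Type", "application/json; charset=utf-8");
            try (OutputStream out = conn.getOutputStream()) {
                out.write(textPayload(content).getBytes(StandardCharsets.UTF_8));
            }
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }
}
```

Calling DingTalkWebhookSketch.send(serverUrl, msg) from sendDing would be the drop-in point. Escaping matters here because the alert text contains newlines (the reasons are joined with "\n") and may contain quotes from job error messages.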
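Deploying the plugin
I wrote a build.sh script that copies the plugin jar into a directory laid out the way Azkaban expects a plugin to be. I had already placed the plugin configuration and the DingTalk SDK jar in that directory.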
    #!/bin/bash

    # Build the jar
    mvn clean package

    # Copy the jar into the plugin's lib directory
    cp -f ./target/azkaban-dingding-alert-1.0.jar ./doc/azkaban-dingding-alert/lib/

    # Package the whole plugin directory
    cd ./doc && tar czvf azkaban-dingding-alert.tar.gz azkaban-dingding-alert && mv ./azkaban-dingding-alert.tar.gz .. && cd -
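This is the final plugin directory layout:

    azkaban-dingding-alert/
    ├── conf/
    │   └── plugin.properties
    └── lib/
        ├── azkaban-dingding-alert-1.0.jar
        └── taobao-sdk-java-auto_1479188381469-20210114.jar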
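Copy the azkaban-dingding-alert directory into plugins/alerter under the Azkaban installation directory and restart Azkaban (I run the solo server) for it to take effect.
Using the plugin
As the source analysis shows, you must pass the flow parameter alert.type, set to the plugin's unique identifier, when executing the flow. Setting it inside the flow definition or in global.properties has no effect.
Run the flow and check Azkaban's log file; you should see the lines logged by our plugin: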
    2021/01/15 14:46:41.546 +0800 INFO [AzkabanDingDingAlert] [main] [Azkaban] MyAlert construct...https://oapi.dingtalk.com/robot/send?access_toke
    2021/01/15 14:46:42.539 +0800 INFO [AzkabanDingDingAlert] [ExecutorManagerUpdaterThread] [Azkaban] alertOnError
    2021/01/15 14:46:42.541 +0800 INFO [AzkabanDingDingAlert] [ExecutorManagerUpdaterThread] [Azkaban] sendDing, msg=[Error] azkaban projectId=1 flow=main executionId=1743 startTime=2021-01-15 14:42:29 endTime=2021-01-15 14:46:42 error=Not running on the assigned executor (any more)
Done.
2021/07/23 14:48:03.425 +0800 ERROR [ExecutionControllerUtils] [ExecutorManagerUpdaterThread] [Azkaban] Alerter type wechat doesn't exist. Failed to alert.
I followed your approach, but my custom alerter never takes effect. How can I fix this?
In the "Using the plugin" section it should be alert.type, not alter.type.