DAG叫做有向无环图(Directed Acyclic Graph, DAG),读过数据结构的应该都有点印象。
DAG可以用来实现依赖管理,比如:执行A任务之前,必须先完成B任务,因为A任务的输入是B任务的输出。
当整个依赖关系继续变多的时候,就催生出了依赖管理的需求,而A对B的依赖可以描述为图中的一条边。
项目地址
https://github.com/owenliang/task_schedule
DAG
掌握DAG是实现依赖调度的先决条件,DAG有3部分内容需要掌握:
- DAG数据结构
- DAG拓扑排序(基于入度)
- DAG逆拓扑序列(基于出度)
推荐读一下这篇博客温习DAG数据结构,拓扑排序的含义与算法:【图论】有向无环图的拓扑排序。
任务依赖
A任务依赖B与C任务,那么就需要建立一条A->B的边、一条A->C的边。
我们需要先执行那些依赖已经执行OK或者压根没有依赖的任务,在这里也就是先执行B和C,最后执行A。
所以,我们需要基于”出度=0″这个条件作为执行条件,任何满足”出度=0″的任务均可以立即执行;在上述例子中,B和C没有依赖其他任务,它们从一开始出度就为0,所以可以立即执行。
任务调度
每次从DAG中找出”出度=0″且”尚未执行”的任务列表,逐个或者并发的执行它们;每当1个任务完成时,就从DAG图中删除这个任务,并发现更多”出度=0″的任务,放入待执行的任务集合中去。
调度过程,就是不断执行依赖完备的任务,并不断发现更多依赖完备的新任务的过程。
构建DAG
1 2 3 4 5 6 7 8 9 10 |
/* * * J O B 1 * / \ \ * V V V * JOB2 JOB3 JOB5 * \ / * V V * JOB4 */ |
这样一个依赖关系,JOB1依赖JOB2、JOB3、JOB5;JOB2依赖JOB4;JOB3依赖JOB4;
我们通过代码控制,建立这个DAG图:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
// JOB5: 没有依赖 std::vector<std::string> job5Deps; graph.addTask("job5", job5Deps); // JOB4: 没有依赖 std::vector<std::string> job4Deps; graph.addTask("job4", job4Deps); // JOB3: 依赖JOB4 std::vector<std::string> job3Deps; job3Deps.push_back("job4"); graph.addTask("job3", job3Deps); // JOB2: 依赖JOB4 std::vector<std::string> job2Deps; job2Deps.push_back("job4"); graph.addTask("job2", job2Deps); // JOB1: 依赖JOB2和JOB3和JOB5 std::vector<std::string> job1Deps; job1Deps.push_back("job2"); job1Deps.push_back("job3"); job1Deps.push_back("job5"); graph.addTask("job1", job1Deps); // 初始化调度 if (!graph.initGraph()) { std::cerr << "initGraph fail" << std::endl; return -1; } |
调度任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
// 串行执行这些任务 std::vector<std::string> todo; do { todo.clear(); // 获取当前待办任务 graph.getTodoTasks(&todo); if (!todo.size()) { // 没有待办任务, 结束调度 break; } // 输出可以并行处理的待办任务 std::cout << "<<<<<<<<< 可执行待办任务列表: "; for (size_t i = 0; i != todo.size(); ++i) { std::cout << " " << todo[i] << " "; } std::cout << std::endl; // 串行执行待办任务(可以通过程序并行调度, 不做演示) for (size_t i = 0; i != todo.size(); ++i) { // 模拟任务执行 std::cout << ">>>>>>>>>" << todo[i] << " 被执行" << std::endl; // 标记任务完成 if (!graph.markTaskDone(todo[i])) { std::cerr << "markTaskDone fail" << std::endl; return -1; } // 再次打印任务调度图 graph.printGraph(std::cout); } } while (true); |
每次从DAG中找到”出度=0″且”尚未执行”的任务,放入todo数组。
这些任务之间必然没有依赖关系,可以完全并行化处理,并行度由程序灵活控制;在这里为了简化问题,我采用了串行执行任务的方式。
每完成一个任务,更新DAG图标记任务完成,这将使一些依赖该任务的任务变成可执行状态,从而在下一次调度时可以获取到这些新任务。
打印DAG
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
// 打印所有任务信息 void TaskGraph::printGraph(std::ostream& ostream) { ostream << "---------------" << std::endl; for (Graph::iterator iter = graph_.begin(); iter != graph_.end(); ++iter) { TaskNode* node = iter->second; ostream << "任务名:" << iter->first << std::endl; ostream << "是否完成:" << (node->done ? "YES" : "NO") << std::endl; ostream << "(当前)依赖这些任务:"; for (std::map<std::string, bool>::iterator outIter = node->outEdge.begin(); outIter != node->outEdge.end(); ++outIter) { if (!outIter->second) { ostream << " " << outIter->first << " "; } } ostream << std::endl; ostream << "(当前)被这些任务依赖:"; for (std::map<std::string, bool>::iterator inIter = node->inEdge.begin(); inIter != node->inEdge.end(); ++inIter) { if (!inIter->second) { ostream << " " << inIter->first << " "; } } ostream << std::endl; } ostream << "---------------" << std::endl; } |
这个过程会打印每一个任务当前依赖哪些任务,以及被哪些任务依赖,方便我调试代码。
TaskNode表示DAG中的一个节点,而TaskNode.outEdge表示这个节点指向了哪些节点,TaskNode.inEdge表示这个节点被哪些节点指向。
上述2个集合随着任务的完成被更新,从而可以随时获知最新的任务依赖关系。
任务调度日志
下面是完整执行日志,展现了整个调度过程中,每个任务依赖与被依赖的关系,以及任务之间的并行关系:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
--------------- 任务名:job1 是否完成:NO (当前)依赖这些任务: job2 job3 job5 (当前)被这些任务依赖: 任务名:job2 是否完成:NO (当前)依赖这些任务: job4 (当前)被这些任务依赖: job1 任务名:job3 是否完成:NO (当前)依赖这些任务: job4 (当前)被这些任务依赖: job1 任务名:job4 是否完成:NO (当前)依赖这些任务: (当前)被这些任务依赖: job2 job3 任务名:job5 是否完成:NO (当前)依赖这些任务: (当前)被这些任务依赖: job1 --------------- <<<<<<<<< 可执行待办任务列表: job4 job5 >>>>>>>>>job4 被执行 --------------- 任务名:job1 是否完成:NO (当前)依赖这些任务: job2 job3 job5 (当前)被这些任务依赖: 任务名:job2 是否完成:NO (当前)依赖这些任务: (当前)被这些任务依赖: job1 任务名:job3 是否完成:NO (当前)依赖这些任务: (当前)被这些任务依赖: job1 任务名:job4 是否完成:YES (当前)依赖这些任务: (当前)被这些任务依赖: job2 job3 任务名:job5 是否完成:NO (当前)依赖这些任务: (当前)被这些任务依赖: job1 --------------- >>>>>>>>>job5 被执行 --------------- 任务名:job1 是否完成:NO (当前)依赖这些任务: job2 job3 (当前)被这些任务依赖: 任务名:job2 是否完成:NO (当前)依赖这些任务: (当前)被这些任务依赖: job1 任务名:job3 是否完成:NO (当前)依赖这些任务: (当前)被这些任务依赖: job1 任务名:job4 是否完成:YES (当前)依赖这些任务: (当前)被这些任务依赖: job2 job3 任务名:job5 是否完成:YES (当前)依赖这些任务: (当前)被这些任务依赖: job1 --------------- <<<<<<<<< 可执行待办任务列表: job2 job3 >>>>>>>>>job2 被执行 --------------- 任务名:job1 是否完成:NO (当前)依赖这些任务: job3 (当前)被这些任务依赖: 任务名:job2 是否完成:YES (当前)依赖这些任务: (当前)被这些任务依赖: job1 任务名:job3 是否完成:NO (当前)依赖这些任务: (当前)被这些任务依赖: job1 任务名:job4 是否完成:YES (当前)依赖这些任务: (当前)被这些任务依赖: job3 任务名:job5 是否完成:YES (当前)依赖这些任务: (当前)被这些任务依赖: job1 --------------- >>>>>>>>>job3 被执行 --------------- 任务名:job1 是否完成:NO (当前)依赖这些任务: (当前)被这些任务依赖: 任务名:job2 是否完成:YES (当前)依赖这些任务: (当前)被这些任务依赖: job1 任务名:job3 是否完成:YES (当前)依赖这些任务: (当前)被这些任务依赖: job1 任务名:job4 是否完成:YES (当前)依赖这些任务: (当前)被这些任务依赖: 任务名:job5 是否完成:YES (当前)依赖这些任务: (当前)被这些任务依赖: job1 --------------- <<<<<<<<< 可执行待办任务列表: job1 >>>>>>>>>job1 被执行 --------------- 任务名:job1 是否完成:YES (当前)依赖这些任务: (当前)被这些任务依赖: 任务名:job2 是否完成:YES (当前)依赖这些任务: (当前)被这些任务依赖: 任务名:job3 是否完成:YES (当前)依赖这些任务: (当前)被这些任务依赖: 任务名:job4 是否完成:YES (当前)依赖这些任务: (当前)被这些任务依赖: 任务名:job5 是否完成:YES (当前)依赖这些任务: (当前)被这些任务依赖: --------------- |
最后
基于该demo,可以演化出一个任务调度系统。
用户向系统提交一个DAG拓扑,大概长这样:
拓扑名:hadoop挖掘任务
任务列表:
- JOB1
- 依赖:JOB2,JOB3
- JOB2:
- 依赖:JOB3
系统根据上述配置,创建对应的DAG图,并启动线程/协程完成调度。
如果文章帮助您解决了工作难题,您可以帮我点击屏幕上的任意广告,或者赞助少量费用来支持我的持续创作,谢谢~
