拓扑排序 So Easy! 带你从0手撸一个依赖任务加载框架
我收回标题上的话,从0手撸一个框架一点也不轻松,需要考虑的地方比较多,一些实现和细节值得商榷,对我这种菜鸡也是一个比较大的挑战,有不足的地方欢迎大佬们提供意见,最后标题是某个大佬常用的风格,我看到等于是我的,没错。
依赖任务加载
平时我们常常会使用各种第三方框架,如mmkv、glide、leakcanary等优秀的第三方库,大多数第三方库需要初始化后才能使用,因此会出现下面的代码:
private void init() { mmkv.init(context); glide.init(context); leakcanary.init(context); ...... } 复制代码
如果不想让任务的初始化阻塞主线程太久,我们可以考虑通过异步的方式加载任务,直到最后一个必要任务加载完毕,开始进行对应的操作。
如果部分任务是依赖关系,如下图任务A依赖任务B,单纯异步的方式的方式显然不能满足述求。
我们通常会想到的解决办法有三类:
将任务B写进任务A的末尾
监听任务A加载成功的回调函数执行任务B
通过volatile关键字卡住加载流程
这样确实能够解决依赖任务的加载问题,但如果任务的数量和依赖关系更复杂呢?
那如果是这样,你要怎么去处理?
显然是有一种更通用的方法来解决这种场景,也就是下面会讲到的有向无环图。
有向无环图的拓扑排序
上面的依赖关系可以看成一种有向无环图(Directed Acyclic Graph, DAG),有向可以理解,表现的是任务的依赖关系,而无环是必要的,因为如果任务A和任务B相互依赖,都需要等待对方的结束来开始,经典死锁套娃。
我们可以通过拓扑排序将最后的线性执行关系呈现出来,什么是拓扑排序?
将上面复杂依赖任务简单的分析一下,任务A前方没有依赖,因此我们可以将任务A的度记为0,任务B、C、E前方各有一个依赖关系,我们把度记为1,剩下的任务D前方由于有两个依赖关系,我们将度计为2;用一个任务队列储存度为0的任务,每当入列任务加载完毕,它对应依赖任务的度-1,新的度为0的任务进队列。
A入队列,A任务完成后,依赖A任务的任务B、C度-1
这时任务B、C度都为0,都可以入队列,没有既定的顺序,我们选择入任务C,待C任务完成后,依赖C任务的D任务的度-1
接着是任务B进去,B任务完成后,任务D、E的度-1
最后是任务D、E其中的一个进去,随意选择,我们选择任务D
最后一个任务E
不考虑各个任务之间的耗时情况,依赖任务关系被拓扑排序成A->C->B->D->E,是不是发现依赖关系被解开了,排成了线性关系,这种将有向无环图拓扑成线性关系的方式被称为拓扑排序,拓扑结果根据所使用算法的不同而有所差异,这也是后面实现依赖任务加载框架的中心思想。
手撸依赖任务加载框架
定义IDAGTask类
上面提到依赖任务的加载可以通过有向无环图的拓扑排序解决,我们开始用代码实现,先定义一个IDAGTask类:
public class IDAGTask{ } 复制代码
可能大家会疑问,为什么不用接口或者抽象类的思想去做这个基础类,后面解答这个疑惑。
特殊的任务会存在加载线程限制,比如只能在主线程对这个任务进行加载,因此我们需要考虑这个任务是否可以同步。异步任务显然需要使用到线程池,定义IDAGTask类实现Runnable接口,方便后续丢进线程池。
除此之外,之前讲到拓扑排序中任务有个度的概念,其实就是依赖关系的数量,在并发环境下为了保证依赖关系数量的线程可见性,这里我们使用AtomicInteger变量,通过CAS锁来保证依赖数量的实时正确性,因此IDAGTask类变成了这样:
public class IDAGTask implements Runnable { private final boolean mIsSyn; private final AtomicInteger mAtomicInteger; boolean getIsAsync() { return mIsSyn; } void addRely() { mAtomicInteger.incrementAndGet(); } void deleteRely() { mAtomicInteger.decrementAndGet(); } int getRely() { return mAtomicInteger.get(); } @Override public void run() { } } 复制代码
回到之前为什么不用接口或者抽象类的方式来实现这个基础类,一方面为了后续将任务丢进线程池,IDAGTask实现了Runnable接口,接口的方式显然pass,另一方面抽象类的方式涉及到了另一个问题:
抽象run方法,可以将IDAGTask任务的监听封装进去,比如startTask、completeDAGTask,如果我们继承IDATask,只需要将初始化部分单纯写进run方法就好了,非常优雅,但是有一种case,如果这个任务的初始化是用多线程实现的,我们调用完Task.init(),马上执行completeDAGTask的监听其实是不对的
基于上面的case,我选择了一种不优雅的实现方式,将startTask的监听写在run方法的开头,completeDAGTask的监听需要调用者自己添加,任务初始化是单线程实现写在run方法的末尾即可,任务初始化采用多线程实现,需要将completeDAGTask监听写进加载成功回调
综上,run方法写进了startTask的回调,因此抽象失败,那么IDAGTask没有抽象方法,自然也不需要作为一个抽象类
经过一些加工,最后IDATask实现如下:
public class IDAGTask implements Runnable { private final boolean mIsSyn; private final AtomicInteger mAtomicInteger; private IDAGCallBack mDAGCallBack; private final Set<IDAGTask> mNextTaskSet; public IDAGTask() { this(""); } public IDAGTask(boolean isSyn) { this("", isSyn); } public IDAGTask(String alias) { this(alias, false); } public IDAGTask(String alias, boolean IsSyn) { mIsSyn = IsSyn; mAtomicInteger = new AtomicInteger(); mDAGCallBack = new DAGCallBack(alias); mNextTaskSet = new HashSet<>(); } boolean getIsAsync() { return mIsSyn; } void addRely() { mAtomicInteger.incrementAndGet(); } void deleteRely() { mAtomicInteger.decrementAndGet(); } int getRely() { return mAtomicInteger.get(); } void addNextDAGTask(IDAGTask DAGTask) { mNextTaskSet.add(DAGTask); } public void setDAGCallBack(IDAGCallBack DAGCallBack) { this.mDAGCallBack = DAGCallBack; } public void completeDAGTask() { for (IDAGTask DAGTask : mNextTaskSet) { DAGTask.deleteRely(); } mDAGCallBack.onCompleteDAGTask(); } @Override public void run() { mDAGCallBack.onStartDAGTask(); } } 复制代码
定义DAGProject类
IDAGTask的模板就被敲定了,接下来我们需要建立任务之间的关系
Set储存所有的任务,通过addDAGTask添加任务
Map<IDAGTask, Set>储存所有的任务与其前置依赖关系,通过addDAGEdge添加任务依赖关系
整体采用建造者模式构建DAGProject类
于是DAGProject实现如下:
public class DAGProject { private final Set<IDAGTask> mTaskSet; private final Map<IDAGTask, Set<IDAGTask>> mTaskMap; public DAGProject(Builder builder) { mTaskSet = builder.mTaskSet; mTaskMap = builder.mTaskMap; } Set<IDAGTask> getDAGTaskSet() { return mTaskSet; } Map<IDAGTask, Set<IDAGTask>> getDAGTaskMap() { return mTaskMap; } public static class Builder { private final Set<IDAGTask> mTaskSet = new HashSet<>(); private final Map<IDAGTask, Set<IDAGTask>> mTaskMap = new HashMap<>(); public Builder addDAGTask(IDAGTask DAGTask) { if (this.mTaskSet.contains(DAGTask)) { throw new IllegalArgumentException(); } this.mTaskSet.add(DAGTask); return this; } public Builder addDAGEdge(IDAGTask DAGTask, IDAGTask preDAGTask) { if (!this.mTaskSet.contains(DAGTask) || !this.mTaskSet.contains(preDAGTask)) { throw new IllegalArgumentException(); } Set<IDAGTask> preDAGTaskSet = this.mTaskMap.get(DAGTask); if (preDAGTaskSet == null) { preDAGTaskSet = new HashSet<>(); this.mTaskMap.put(DAGTask, preDAGTaskSet); } if (preDAGTaskSet.contains(preDAGTask)) { throw new IllegalArgumentException(); } DAGTask.addRely(); preDAGTaskSet.add(preDAGTask); preDAGTask.addNextDAGTask(DAGTask); return this; } public DAGProject builder() { return new DAGProject(this); } } } 复制代码
使用时,我们需要创建对应的IDAGTask,通过addDAGTask、addDAGEdge方法构建出对应有向无环图:
ATask a = new ATask(); BTask b = new BTask(); CTask c = new CTask(); DTask d = new DTask(); ETask e = new ETask(); DAGProject dagProject = new DAGProject.Builder() .addDAGTask(a) .addDAGTask(b) .addDAGTask(c) .addDAGTask(e) .addDAGTask(d) .addDAGEdge(b, a) .addDAGEdge(c, a) .addDAGEdge(d, b) .addDAGEdge(d, c) .addDAGEdge(e, b) .builder(); 复制代码
表达任务依赖关系的DAGProject对象就通过建造者模式构建成功了。
依赖任务加载的调度
当多个任务构建成有向无环图的DAGProject后,我们先不着急丢进线程池,执行对应逻辑前先检测是否有环,这样我们可以在任务加载前抛出相互依赖的错误,大可不必等到执行至有环那一步才抛出。虽然有环可以靠输入者去保障,但是在一些小细节方面,我们要求输入者保证过于苛刻也过于差体验。
将度为0的任务储存在tempTaskQueue
while循环将任务取出,存在依赖关系则对应的任务度-1,如果度为0,添加到resultTaskQueue
判断最后的resultTaskQueue与之前储存任务的set个数是否相等,false则有环抛出异常
public class DAGScheduler { private void checkCircle(Set<IDAGTask> TaskSet, Map<IDAGTask, Set<IDAGTask>> TaskMap) { LinkedList<IDAGTask> resultTaskQueue = new LinkedList<>(); LinkedList<IDAGTask> tempTaskQueue = new LinkedList<>(); for (IDAGTask DAGTask : tempTaskSet) { if (tempTaskMap.get(DAGTask) == null) { tempTaskQueue.add(DAGTask); } } while (!tempTaskQueue.isEmpty()) { IDAGTask tempDAGTask = tempTaskQueue.pop(); resultTaskQueue.add(tempDAGTask); for (IDAGTask DAGTask : tempTaskMap.keySet()) { Set<IDAGTask> tempDAGSet = tempTaskMap.get(DAGTask); if (tempDAGSet != null && tempDAGSet.contains(tempDAGTask)) { tempDAGSet.remove(tempDAGTask); if (tempDAGSet.size() == 0) { tempTaskQueue.add(DAGTask); } } } } if (resultTaskQueue.size() != tempTaskSet.size()) { throw new IllegalArgumentException("相互依赖,玩屁啊,我不跑了!"); } } } 复制代码
检测完环后,开始调度这些依赖任务,将度为0的任务加入阻塞队列,通过newSingleThreadExecutor开启一个线程不断去阻塞队列拿任务。
判断拿出的任务属于主线程执行还是异步执行,主线程执行通过handler.post发送出去,异步执行通过线程池execute
所有任务执行完毕,关闭线程池,结束遍历
不断将度为0的任务加入阻塞队列
public class DAGScheduler { private void loop() { for (IDAGTask DAGTask : mTaskSet) { if (DAGTask.getRely() == 0) { mTaskBlockingDeque.add(DAGTask); } } ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); singleThreadExecutor.execute(() -> { for (; ; ) { try { while (!mTaskBlockingDeque.isEmpty()) { IDAGTask executedDAGTsk = (IDAGTask) mTaskBlockingDeque.take(); if (executedDAGTsk.getIsAsync()) { Handler handler = new Handler(getMainLooper()); handler.post(executedDAGTsk); } else { mTaskThreadPool.execute(executedDAGTsk); } mTaskSet.remove(executedDAGTsk); } if (mTaskSet.isEmpty()) { singleThreadExecutor.shutdown(); mTaskThreadPool.shutdown(); return; } Iterator<IDAGTask> iterator = mTaskSet.iterator(); while (iterator.hasNext()) { IDAGTask DAGTask = iterator.next(); if (DAGTask.getRely() == 0) { mTaskBlockingDeque.put(DAGTask); iterator.remove(); } } } catch (InterruptedException e) { e.printStackTrace(); } } }); } } 复制代码
至此依赖任务的调度器搭建完毕,配合之前构建好的DAGProject,使用方法如下:
DAGScheduler dagScheduler = new DAGScheduler();dagScheduler.start(dagProject); 复制代码
使用方式
第一步,对应build.gradle配置远程依赖,已经发布到maven central,不用担心jcenter弃用
implementation 'work.lingling.dagtask:dagtsk:1.0.0' 复制代码
第二步,继承IDAGTask类,在run方法中实现对应的初始化逻辑
public class ATask extends IDAGTask { public ATask(String alias) { super(alias); } @Override public void run() { super.run(); try { // 模拟随机时间 Random random = new Random(); Thread.sleep(random.nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } // 第三方框架内部使用同步加载 // completeDAGTask方法写在run方法末尾即可 completeDAGTask(); } // 第三方框架内部使用异步加载 // completeDAGTask方法需要写进成功回调 /*onLibrarySuccess(){ completeDAGTask(); }*/ } 复制代码
tips:加载任务内部未开线程,completeDAGTask方法写在run方法的末尾,感知初始化结束;加载任务内部使用多线程,需要将completeDAGTask方法写进加载成功回调。
第三步,根据任务的依赖关系构建DAGProject并执行
回首一开始出现的复杂依赖关系:
我们模拟对应的任务,任务A、B、C、D、E,构建DAGProject如下:
ATask a = new ATask("ATask"); BTask b = new BTask("BTask"); CTask c = new CTask("CTask"); DTask d = new DTask("DTask"); ETask e = new ETask("ETask"); DAGProject dagProject = new DAGProject.Builder() .addDAGTask(b) .addDAGTask(c) .addDAGTask(a) .addDAGTask(d) .addDAGTask(e) .addDAGEdge(b, a) .addDAGEdge(c, a) .addDAGEdge(d, b) .addDAGEdge(d, c) .addDAGEdge(e, b) .builder(); DAGScheduler dagScheduler = new DAGScheduler(); dagScheduler.start(dagProject); 复制代码
依赖任务执行结果如下:
可以看到依赖任务被拆开成A、C、B、E、D的顺序进行执行。
结语
行文至此,总算凑到了结尾,1202年了,居然还有人在用java写客户端。
框架实现整体很简单,但还是踩了很多坑,大到框架整体应该如何实现,小到设计模式应该如何使用、对外应该暴露什么方法、maven central如何上传等等各种细节问题,综上,这是一篇很青涩的文章。中途参考了很多大佬的文章思路和美好意见,但还是很不足,欢迎大佬们下场one one指导。
作者:LING-0001
链接:https://juejin.cn/post/7027798345179463687