实现的功能
- 前端提交导出任务
- 可以实时查看任务进度
- 任务完成后可以直接下载
解决的问题
后台功能经常会有这样的需求,导出数据。比如导出订单数据,可能会关联,订单信息,商品信息,退货信息,优惠信息等等,所以可能会经常关联很多张表。数据少的时候还能将就着直接用sql连接查询,然后生成文件导出。数据多的时候这么实现就显然不合适了,大的表join会影响数据库性能导致正常的线上业务会变慢,而且长时间连接可能会出错,所以数据多了,业务人员会经常反馈下载失败。
在这样的场景下封装实现异步的导出框架可以解决以上痛点,业务任务只要根据自己需要提一个导出任务,然后就可以做自己的事情了不用一直等着,支持实时查看导出进度,当导出任务结束后会生成一个文件上传到存储服务器,并返回前端下载的路径,这样业务人员直接下载生成好的文件就行了。
框架思路解析
包括三部分:任务进度报告器,任务执行器,任务调度器。
任务进度报告器 TaskProgressReporter
使用ScheduledExecutorService定时监测进度的更新,如果有更新就更新到数据库中,用户在前端可以直接看到任务的进度。具体代码参考github。
public TaskProgressReporter(TaskManager manager, int periodSecs) {
this.manager = manager;
ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat(getClass().getSimpleName() + "-%d")
.setUncaughtExceptionHandler((t, e) -> log.warn(t.getName(), e)).build();
this.executor = Executors.newScheduledThreadPool(1, threadFactory);
// 控制一定的时间执行一次去报告进度
this.executor.scheduleWithFixedDelay(this, periodSecs, periodSecs, TimeUnit.SECONDS);
}
@Override
public void run() {
Set<Long> taskIds = taskProgressMap.keySet();
for (long taskId : taskIds) {
Progress progress = taskProgressMap.get(taskId);
if (progress == null) {
continue;
}
if (progress.isChanged()) {
// 任务有变动就更新到数据库中,这样前端可以看到任务进度的变动
this.manager.updateProgress(taskId, progress.report());
}
}
}
static class Progress {
/**
* 使用java原子类控制进度,防止多线程修改进度出问题
*/
private final LongAdder adder = new LongAdder();
/**
* 任务进度更新时间
*/
private long updateTime;
/**
* 任务进度报告时间
*/
private long reportTime;
/**
* 更新进度
* @param delta
*/
public void delta(long delta) {
this.updateTime = System.currentTimeMillis();
adder.add(delta);
}
/**
* 报告当前进度
* @return
*/
public int report() {
this.reportTime = System.currentTimeMillis();
return adder.intValue();
}
/**
* 判断任务进度是否有变动
* @return
*/
public boolean isChanged() {
return reportTime < updateTime;
}
}
任务执行器
封装TaskRunner内部封装具体的执行步骤,定义几个暴露出去的接口由具体业务通过实现TaskRunner来自定义内部的逻辑, 具体代码参考github。
@Override
public void run() {
try {
task = manager.started(task);
parseConfig();
int total = countAll();
task = manager.inProgress(task, total);
doExport();
Attachment attachment = upload();
task = manager.finish(task, attachment.getResPath());
} catch (Throwable t) {
task = manager.error(task, t);
t.printStackTrace();
} finally {
manager.remove(task.getId());
}
}
protected abstract void parseConfig();
protected abstract int countAll();
protected abstract void doExport() throws IOException;
// 自定义订单导出
public class OrderExportTaskRunner extends TaskRunner {
public OrderExportTaskRunner() {
}
public OrderExportTaskRunner(ExportTask task) {
setTask(task);
}
@Override
protected void parseConfig() {
// todo 这里可以根据业务定义一些导出参数,比如导出时间段,这里省略
}
@Override
protected int countAll() {
// todo 根据业务返回订单总条数
return 1000;
}
@Override
protected void doExport() throws IOException {
// todo 这里实现业务数据组装逻辑,假如1000条数据
for (int i = 0; i < 1000; i++) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
manager.deltaProgress(task.getId(), 1);
}
// todo 数据组装完之后保存到本地文件中 比如
FileUtils.write(tmpFile, "write biz data in this file", StandardCharsets.UTF_8);
}
}
任务调度器TaskManager
使用固定数量的线程池来调度提交的导出任务,并封装一些公用方法。具体代码参考[github (target=_blank)](https://github.com/Kingsea442/project-framework/tree/master/
任务存储更新服务 TaskService
主要与数据库交互保存任务状态和进度
使用例子
ExportTask exportTask = new ExportTask();
exportTask.setName("导出订单");
exportTask.setFileSuffix(".xlsx");
exportTask.setStatus(TaskStatus.TASK_CREATED);
exportTask = taskService.addTask(exportTask);
exportTask = manager.submit(new OrderExportTaskRunner(exportTask));