WANG LH , Research & Development

异步导出框架

2021.05.31 22:05

异步框架实现代码

实现的功能

  1. 前端提交导出任务
  2. 可以实时查看任务进度
  3. 任务完成后可以直接下载

解决的问题

后台功能经常会有这样的需求,导出数据。比如导出订单数据,可能会关联,订单信息,商品信息,退货信息,优惠信息等等,所以可能会经常关联很多张表。数据少的时候还能将就着直接用sql连接查询,然后生成文件导出。数据多的时候这么实现就显然不合适了,大的表join会影响数据库性能导致正常的线上业务会变慢,而且长时间连接可能会出错,所以数据多了,业务人员会经常反馈下载失败。

在这样的场景下封装实现异步的导出框架可以解决以上痛点,业务任务只要根据自己需要提一个导出任务,然后就可以做自己的事情了不用一直等着,支持实时查看导出进度,当导出任务结束后会生成一个文件上传到存储服务器,并返回前端下载的路径,这样业务人员直接下载生成好的文件就行了。

框架思路解析

包括三部分:任务进度报告器,任务执行器,任务调度器。

任务进度报告器 TaskProgressReporter

使用ScheduledExecutorService定时监测进度的更新,如果有更新就更新到数据库中,用户在前端可以直接看到任务的进度。具体代码参考github

  public TaskProgressReporter(TaskManager manager, int periodSecs) {
    this.manager = manager;
    ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
        .setNameFormat(getClass().getSimpleName() + "-%d")
        .setUncaughtExceptionHandler((t, e) -> log.warn(t.getName(), e)).build();
    this.executor = Executors.newScheduledThreadPool(1, threadFactory);
    // 控制一定的时间执行一次去报告进度
    this.executor.scheduleWithFixedDelay(this, periodSecs, periodSecs, TimeUnit.SECONDS);
  }

  @Override
  public void run() {
    Set<Long> taskIds = taskProgressMap.keySet();
    for (long taskId : taskIds) {
      Progress progress = taskProgressMap.get(taskId);
      if (progress == null) {
        continue;
      }

      if (progress.isChanged()) {
        // 任务有变动就更新到数据库中,这样前端可以看到任务进度的变动
        this.manager.updateProgress(taskId, progress.report());
      }
    }
  }

  static class Progress {

    /**
     * 使用java原子类控制进度,防止多线程修改进度出问题
     */
    private final LongAdder adder = new LongAdder();

    /**
     * 任务进度更新时间
     */
    private long updateTime;

    /**
     * 任务进度报告时间
     */
    private long reportTime;

    /**
     * 更新进度
     * @param delta
     */
    public void delta(long delta) {
      this.updateTime = System.currentTimeMillis();
      adder.add(delta);
    }

    /**
     * 报告当前进度
     * @return
     */
    public int report() {
      this.reportTime = System.currentTimeMillis();
      return adder.intValue();
    }

    /**
     * 判断任务进度是否有变动
     * @return
     */
    public boolean isChanged() {
      return reportTime < updateTime;
    }
  }

任务执行器

封装TaskRunner内部封装具体的执行步骤,定义几个暴露出去的接口由具体业务通过实现TaskRunner来自定义内部的逻辑, 具体代码参考github

  @Override
  public void run() {
    try {
      task = manager.started(task);
      parseConfig();
      int total = countAll();
      task = manager.inProgress(task, total);
      doExport();
      Attachment attachment = upload();
      task = manager.finish(task, attachment.getResPath());
    } catch (Throwable t) {
      task = manager.error(task, t);
      t.printStackTrace();
    } finally {
      manager.remove(task.getId());
    }

  }

  protected abstract void parseConfig();

  protected abstract int countAll();

  protected abstract void doExport() throws IOException;

// 自定义订单导出
public class OrderExportTaskRunner extends TaskRunner {
  public OrderExportTaskRunner() {
  }

  public OrderExportTaskRunner(ExportTask task) {
    setTask(task);
  }

  @Override
  protected void parseConfig() {
    // todo 这里可以根据业务定义一些导出参数,比如导出时间段,这里省略

  }

  @Override
  protected int countAll() {
    // todo 根据业务返回订单总条数
    return 1000;
  }

  @Override
  protected void doExport() throws IOException {
    // todo 这里实现业务数据组装逻辑,假如1000条数据
    for (int i = 0; i < 1000; i++) {
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      manager.deltaProgress(task.getId(), 1);
    }

    // todo 数据组装完之后保存到本地文件中 比如
    FileUtils.write(tmpFile, "write biz data in this file", StandardCharsets.UTF_8);
  }
}

任务调度器TaskManager

使用固定数量的线程池来调度提交的导出任务,并封装一些公用方法。具体代码参考[github (target=_blank)](https://github.com/Kingsea442/project-framework/tree/master/

任务存储更新服务 TaskService

主要与数据库交互保存任务状态和进度

使用例子

ExportTask exportTask = new ExportTask();
exportTask.setName("导出订单");
exportTask.setFileSuffix(".xlsx");
exportTask.setStatus(TaskStatus.TASK_CREATED);
exportTask = taskService.addTask(exportTask);
exportTask = manager.submit(new OrderExportTaskRunner(exportTask));