利用 JavaScript 实现并发控制的示例代码

(编辑:jimmy 日期: 2024/12/23 浏览:2)

一、前言

"htmlcode">

const task = timeout => new Promise((resolve) => setTimeout(() => {
  resolve(timeout);
 }, timeout))

 const taskList = [1000, 3000, 200, 1300, 800, 2000];

 async function startNoConcurrentControl() {
  console.time(NO_CONCURRENT_CONTROL_LOG);
  await Promise.all(taskList.map(item => task(item)));
  console.timeEnd(NO_CONCURRENT_CONTROL_LOG);
 }

 startNoConcurrentControl();

"htmlcode">

class Queue {
  constructor() {
   this._queue = [];
  }

  push(value) {
   return this._queue.push(value);
  }

  shift() {
   return this._queue.shift();
  }

  isEmpty() {
   return this._queue.length === 0;
  }
 }

"htmlcode">

class DelayedTask {
  constructor(resolve, fn, args) {
   this.resolve = resolve;
   this.fn = fn;
   this.args = args;
  }
 }

"htmlcode">

class TaskPool {
  constructor(size) {
   this.size = size;
   this.queue = new Queue();
  }

  addTask(fn, args) {
   return new Promise((resolve) => {
    this.queue.push(new DelayedTask(resolve, fn, args));
    if (this.size) {
     this.size--;
     const { resolve: taskResole, fn, args } = this.queue.shift();
     taskResole(this.runTask(fn, args));
    }
   })
  }

  pullTask() {
   if (this.queue.isEmpty()) {
    return;
   }

   if (this.size === 0) {
    return;
   }

   this.size++;
   const { resolve, fn, args } = this.queue.shift();
   resolve(this.runTask(fn, args));
  }

  runTask(fn, args) {
   const result = Promise.resolve(fn(...args));

   result.then(() => {
    this.size--;
    this.pullTask();
   }).catch(() => {
    this.size--;
    this.pullTask();
   })

   return result;
  }
 }

TaskPool 包含三个关键方法:

  • addTask: 将新的任务放入队列当中,并触发任务池状态检测,如果当前任务池非满载状态,则从队列中取出任务放入任务池中执行。
  • runTask: 执行当前任务,任务执行完成之后,更新任务池状态,此时触发主动拉取新任务的机制。
  • pullTask: 如果当前队列不为空,且任务池不满载,则主动取出队列中的任务执行。

利用 JavaScript 实现并发控制的示例代码

"htmlcode">

 const cc = new ConcurrentControl(2);

 async function startConcurrentControl() {
  console.time(CONCURRENT_CONTROL_LOG);
  await Promise.all(taskList.map(item => cc.addTask(task, [item])))
  console.timeEnd(CONCURRENT_CONTROL_LOG);
 }

 startConcurrentControl();

"text-align: center">利用 JavaScript 实现并发控制的示例代码

"htmlcode">

await Promise.all(taskList.map(item => cc.addTask(task, [item])))

"htmlcode">

addTask(fn) {
  return (...args) => {
   return new Promise((resolve) => {
    this.queue.push(new DelayedTask(resolve, fn, args));

    if (this.size) {
     this.size--;
     const { resolve: taskResole, fn: taskFn, args: taskArgs } = this.queue.shift();
     taskResole(this.runTask(taskFn, taskArgs));
    }
   })
  }
 }

改造之后的代码显得简洁了很多:

 await Promise.all(taskList.map(cc.addTask(task)))

五、优化出队操作

"text-align: center">利用 JavaScript 实现并发控制的示例代码

"htmlcode">

const Benchmark = require('benchmark');
 const suite = new Benchmark.Suite;

 suite.add('shift', function() {
  let count = 10;
  const arr = generateArray(count);
  while (count--) {
   arr.shift();
  }
 })
 .add('reverse + pop', function() {
  let count = 10;
  const arr = generateArray(count);
  arr.reverse();
  while (count--) {
   arr.pop();
  }
 })
 .on('cycle', function(event) {
  console.log(String(event.target));
 })
 .on('complete', function() {
  console.log('Fastest is ' + this.filter('fastest').map('name'));
  console.log('\n')
 })
 .run({
  async: true
 })

通过 benchmark.js 跑出的基准测试数据,可以很容易地看出哪种方式的效率更高:

利用 JavaScript 实现并发控制的示例代码

"htmlcode">

 class HighPerformanceQueue {
  constructor() {
   this.q1 = []; // 用于 push 数据
   this.q2 = []; // 用于 shift 数据
  }

  push(value) {
   return this.q1.push(value);
  }

  shift() {
   let q2 = this.q2;
   if (q2.length === 0) {
    const q1 = this.q1;
    if (q1.length === 0) {
     return;
    }
    q2 = this.q2 = q1.reverse();
   }

   return q2.pop();
  }

  isEmpty() {
   if (this.q1.length === 0 && this.q2.length === 0) {
    return true;
   }
   return false;
  }
 }

最后通过基准测试来验证优化的效果:

利用 JavaScript 实现并发控制的示例代码