English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية

Implementing a Task Queue with Node.js and a Redis Sorted Set

Requirement: Function A needs to call a third-party API to get data. The third-party API processes requests asynchronously: each call returns the data available so far together with a status, e.g. { data: "query result", status: "async processing in progress" }, so the API has to be called again after a while to obtain the complete data. So that users of function A do not have to wait while the third-party API is still processing, the user's request is added to a task queue, the partial data is returned immediately, and the request is closed. Tasks are then periodically taken from the task queue and sent to the third-party API. If the returned status is "async processing in progress", the task is put back into the task queue; if the returned status is "completed", the returned data is stored in the database.

To solve this, I implemented the task queue with Node.js and a Redis sorted set. A Node.js application exposes its own API that accepts user requests, merges the data already in the database with the partial data returned by the third-party API, returns the result to the user, and adds a task to the task queue. A Node.js child process driven by a cron-style schedule then takes tasks from the task queue and executes them.

Several issues need to be considered when designing the task queue:

  • Parallel execution of multiple tasks
  • Task uniqueness
  • Handling after task success or failure

Solution to the above problems

  • Parallel execution of multiple tasks is implemented using Promise.all
  • Task uniqueness is implemented with a Redis sorted set. Using the enqueue timestamp as the score lets the sorted set behave like an ordered list. When a task is added, the queue is first checked to see whether the task already exists. When a task is taken for execution, its score is set to 0, and each run only picks tasks whose score is greater than 0, which prevents the same task from being executed twice.
  • After a task is executed successfully, it is deleted. If execution fails, the task's score is updated to the current timestamp, which moves the failed task to the end of the task queue so that it is retried later.

Example code

// remote_api.js simulates a third-party API
'use strict';
const app = require('express')();

// Respond after a 3-second delay with a randomly chosen status, imitating an
// upstream service that is sometimes still processing the request.
app.get('/', (req, res) => {
  setTimeout(() => {
    const statuses = [200, 300]; // 200 represents success, 300 represents failure and requires a re-request
    // Math.floor on the product always yields a valid index (0 or 1). parseInt
    // on a float can misread exponent notation such as "5e-8" as 5.
    const status = statuses[Math.floor(Math.random() * statuses.length)];
    res.status(200).send({ status });
  }, 3000);
});

app.listen(9001, () => {
  console.log('API service listening port:9001');
});
// producer.js is the application's own API: it accepts user requests and
// adds tasks to the task queue.
'use strict';
const express = require('express');
const app = express();
const redis = require('redis');
const redisClient = redis.createClient();
// Sorted-set key holding pending tasks; consumer.js must use the same key.
const QUEUE_NAME = 'queue:example';
/**
 * Adds `taskName` to the QUEUE_NAME sorted set unless it is already present.
 *
 * The score is the enqueue time in milliseconds, so the sorted set behaves
 * like a time-ordered list. A task that is already queued is left untouched;
 * this includes a task the consumer has claimed and marked with score 0.
 *
 * NOTE(review): the ZSCORE check and the ZADD are separate commands, so two
 * concurrent requests for the same name can both reach ZADD. The set still
 * holds one member, but its score is reset to "now".
 *
 * @param {string} taskName - sorted-set member identifying the task
 * @param {function(?Error, *=)} callback - called exactly once: with the Redis
 *   error, with the existing score when the task is already queued, or with
 *   the ZADD reply when the task was added
 */
function addTaskToQueue(taskName, callback) {
  // Check first whether the task already exists: skip it if so, otherwise enqueue it.
  redisClient.zscore(QUEUE_NAME, taskName, (error, score) => {
    if (error) {
      callback(error);
      return;
    }
    // ZSCORE replies nil (null) for a missing member. A claimed task has score
    // "0", which must still count as "exists", so this is a null check rather
    // than a truthiness check.
    if (score != null) {
      console.log('Task already exists, no duplicate task will be added');
      callback(null, score);
      return;
    }
    redisClient.zadd(QUEUE_NAME, Date.now(), taskName, (zaddError, result) => {
      if (zaddError) {
        callback(zaddError);
        return;
      }
      callback(null, result);
    });
  });
}
// GET /?task-name=<name>: enqueue the task and answer immediately; the
// consumer fetches the real result later.
app.get('/', (req, res) => {
  const taskName = req.query['task-name'];
  // The task name is untrusted user input. The query parser may also produce
  // an array (repeated parameter) or an object, so require a non-empty string.
  if (typeof taskName !== 'string' || taskName === '') {
    res.status(400).send('Missing query parameter: task-name');
    return;
  }
  addTaskToQueue(taskName, (error) => {
    if (error) {
      console.log(error);
      // Always answer the request; otherwise the client hangs until it times out.
      res.status(500).send('Failed to add task to queue');
      return;
    }
    res.status(200).send('Querying in progress......');
  });
});

app.listen(9002, () => {
  console.log('Producer service listening port:9002');
});
// consumer.js schedules tasks and executes them
'use strict';
const redisClient = require('redis').createClient();
const request = require('request');
const schedule = require('node-schedule');
// Must be the same key producer.js writes to ('queue:example'); a mismatch
// means the consumer never sees any task.
const QUEUE_NAME = 'queue:example';
const PARALLEL_TASK_NUMBER = 2; // Maximum number of tasks claimed and executed in parallel per run
/**
 * Claims up to PARALLEL_TASK_NUMBER due tasks from QUEUE_NAME.
 *
 * Due tasks are those with a score in [1, now]. Each claimed task's score is
 * set to 0 so later runs (which only read scores >= 1) skip it while it is
 * being processed.
 *
 * NOTE(review): ZRANGEBYSCORE and ZADD are separate commands. Two consumers
 * running at the same time could claim the same task. This presumably assumes
 * a single consumer process; a MULTI/WATCH transaction or a Lua script would
 * make the claim atomic.
 *
 * @param {function(?Error, string[]=)} callback - called exactly once with the
 *   Redis error, or with the claimed task names (an empty array when nothing
 *   is due)
 */
function getTasksFromQueue(callback) {
  redisClient.zrangebyscore([QUEUE_NAME, 1, Date.now(), 'LIMIT', 0, PARALLEL_TASK_NUMBER], (error, tasks) => {
    if (error) {
      callback(error);
      return;
    }
    // Report an empty batch instead of silently never calling back.
    if (tasks.length === 0) {
      callback(null, tasks);
      return;
    }
    // Build [QUEUE_NAME, 0, task1, 0, task2, ...]: set each task's score to 0 to mark it as in progress.
    const zaddArgs = tasks.reduce((args, task) => args.concat(0, task), [QUEUE_NAME]);
    redisClient.zadd(zaddArgs, (zaddError) => {
      if (zaddError) {
        callback(zaddError);
        return;
      }
      callback(null, tasks);
    });
  });
}
/**
 * Re-queues a task whose execution failed. Its score becomes the current time,
 * which moves it to the end of the queue and makes it due again.
 *
 * @param {string} taskName - sorted-set member identifying the task
 * @param {function(?Error, *=)} callback - called exactly once with the Redis
 *   error or with the ZADD reply
 */
function addFailedTaskToQueue(taskName, callback) {
  redisClient.zadd(QUEUE_NAME, Date.now(), taskName, (error, result) => {
    if (error) {
      callback(error);
      return;
    }
    callback(null, result);
  });
}
/**
 * Removes a successfully executed task from the queue.
 *
 * @param {string} taskName - sorted-set member identifying the task
 * @param {function(?Error, *=)} callback - called exactly once with the Redis
 *   error or with the ZREM reply
 */
function removeSucceedTaskFromQueue(taskName, callback) {
  redisClient.zrem(QUEUE_NAME, taskName, (error, result) => {
    if (error) {
      callback(error);
      return;
    }
    callback(null, result);
  });
}
/**
 * Executes one task by calling the (simulated) third-party API.
 *
 * On a transport error, an unparsable or empty body, or a status other than
 * 200, the task is re-queued via addFailedTaskToQueue. On status 200 it is
 * removed via removeSucceedTaskFromQueue. Redis errors from either step are
 * only logged.
 *
 * NOTE(review): the request URL does not include `taskName`. That matches the
 * demo API, which ignores its input; a real API call would pass it along.
 *
 * @param {string} taskName - sorted-set member identifying the task
 * @returns {Promise<string>} always resolves (never rejects) with 'succeed'
 *   or 'failed', so Promise.all over a batch sees every task's outcome
 */
function execTask(taskName) {
  return new Promise((resolve) => {
    const requestOptions = {
      url: 'http://127.0.0.1:9001',
      method: 'GET',
      timeout: 5000,
    };

    // Shared failure path: log the cause (if any), push the task back to the
    // end of the queue, and report 'failed'.
    const fail = (reason) => {
      if (reason) {
        console.log(reason);
      }
      addFailedTaskToQueue(taskName, (queueError) => {
        if (queueError) {
          console.log(queueError);
        }
      });
      resolve('failed');
    };

    request(requestOptions, (error, response, body) => {
      if (error) {
        fail(error);
        return;
      }
      let payload;
      try {
        payload = typeof body !== 'object' ? JSON.parse(body) : body;
      } catch (parseError) {
        fail(parseError);
        return;
      }
      // A "null" body parses to null. Reading .status on it would throw inside
      // this callback and crash the process.
      if (!payload || payload.status !== 200) {
        fail();
        return;
      }
      removeSucceedTaskFromQueue(taskName, (removeError) => {
        if (removeError) {
          console.log(removeError);
        }
      });
      resolve('succeed');
    });
  });
}
// Scheduler: every 5 seconds, claim up to PARALLEL_TASK_NUMBER due tasks and
// execute them in parallel. The six-field cron pattern's first field is seconds.
const job = schedule.scheduleJob('*/5 * * * * *', () => {
  console.log('Get new task');
  getTasksFromQueue((error, tasks) => {
    if (error) {
      console.log(error);
      return;
    }
    if (tasks.length === 0) {
      return;
    }
    console.log(tasks);
    // Wrap execTask so map's extra (index, array) arguments are not passed to it.
    Promise.all(tasks.map((task) => execTask(task)))
      .then((results) => {
        console.log(results);
      })
      .catch((batchError) => {
        console.log(batchError);
      });
  });
});
Elasticsearch Tutorial