English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية
Requirement: Function A needs to call a third-party API to get data, but the third-party API processes requests asynchronously. A call returns data together with a status, for example { "data": "query result", "status": "async processing in progress" }, so the third-party API has to be called again after a period of time to get the final data. To keep users of function A from waiting while the third-party API is still processing, the user request is added to a task queue, the partial data is returned, and the request is closed. A scheduled job then takes tasks out of the task queue and calls the third-party API for each of them. If the returned status is "async processing in progress", the task is added to the task queue again. If the returned status is "completed", the returned data is stored in the database.
Based on the above requirement, I decided to implement the task queue with Node.js and a Redis sorted set. A Node.js application API accepts user requests, merges the existing data in the database with the partial data returned by the third-party API, returns the result to the user, and adds a task to the task queue. A Node.js child process and cron are then used to schedule the execution of tasks taken from the task queue.
Several issues need to be considered in the design process of the task queue
Solution to the above problems
Example code
// remote_api.js simulates a third-party API 'use strict'; const app = require('express')(); app.get('/', (req, res) => { setTimeout(() => { let arr = [200, 300]; // 200 represents success,300 represents failure and requires a re-request res.status(200).send({ 'status': arr[parseInt(Math.random() * 2]); }, 3000); }); app.listen('9001', () => { console.log('API service listening port:9001); }); // producer.js is the self-application API used to accept user requests and add tasks to the task queue 'use strict'; const app = require('express')(); const redisClient = require('redis').createClient(); const QUEUE_NAME = 'queue:example'; function addTaskToQueue(taskName, callback) { // Firstly, judge whether the task already exists, if it exists: skip, if not: add the task to the task queue redisClient.zscore(QUEUE_NAME, taskName, (error, task) => { if (error) { console.log(error); } if (task) { console.log('Task already exists, no duplicate task will be added'); callback(null, task); } redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => { if (error) { callback(error); } callback(null, result); } }); } } }); } app.get('/', (req, res) => { let taskName = req.query['task-name']; addTaskToQueue(taskName, (error, result) => { if (error) { console.log(error); } res.status(200).send('Querying in progress......'); } }); }); app.listen(9002, () => { console.log('Producer service listening port:9002); }); // consumer.js schedules tasks and executes them 'use strict'; const redisClient = require('redis').createClient(); const request = require('request'); const schedule = require('node-schedule); const QUEUE_NAME = 'queue:expmple'; const PARALLEL_TASK_NUMBER = 2; // Number of tasks executed in parallel function getTasksFromQueue(callback) { // Get multiple tasks redisClient.zrangebyscore([QUEUE_NAME, 1, new Date().getTime(), 'LIMIT', 0, PARALLEL_TASK_NUMBER], (error, tasks) => { if (error) { callback(error); } // Set the task score to 0, indicating it is 
being processed if (tasks.length > 0) { let tmp = []; tasks.forEach((task) => { tmp.push(0); tmp.push(task); }); redisClient.zadd([QUEUE_NAME].concat(tmp), (error, result) => { if (error) { callback(error); } callback(null, tasks); } }); } } }); } function addFailedTaskToQueue(taskName, callback) { redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => { if (error) { callback(error); } callback(null, result); } }); } function removeSucceedTaskFromQueue(taskName, callback) { redisClient.zrem(QUEUE_NAME, taskName, (error, result) => { if (error) { callback(error); } callback(null, result); } } } function execTask(taskName) { return new Promise((resolve, reject) => { let requestOptions = { 'url': 'http://127.0.0.1:9001', 'method': 'GET', 'timeout': 5000 }; request(requestOptions, (error, response, body) => { if (error) { resolve('failed'); console.log(error); addFailedTaskToQueue(taskName, (error) => { if (error) { console.log(error); } } }); } try { body = typeof body !== 'object' ? JSON.parse(body) : body; } resolve('failed'); console.log(error); addFailedTaskToQueue(taskName, (error, result) => { if (error) { console.log(error); } } }); return; } if (body.status !== 200) { resolve('failed'); addFailedTaskToQueue(taskName, (error, result) => { if (error) { console.log(error); } } }); } resolve('succeed'); removeSucceedTaskFromQueue(taskName, (error, result) => { if (error) { console.log(error); } } }); } } }); }); } // Schedule, every 5 Get new tasks to execute every second let job = schedule.scheduleJob('*/5 * * * * *', () => { console.log('Get new task'); getTasksFromQueue((error, tasks) => { if (error) { console.log(error); } if (tasks.length > 0) { console.log(tasks); Promise.all(tasks.map(execTask)) .then((results) => { console.log(results); } .catch((error) => { console.log(error); }); } } }); });