English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية
Recently, I encountered a business scenario in my work that requires a scheduled push of a batch of data to another system every day. However, since the system is deployed in a cluster, it may cause task contention under unified conditions, so a distributed lock needs to be added to ensure that there is only one Job to complete the scheduled task within a certain time range. The initial solutions considered were using ZooKeeper distributed tasks and Quartz distributed task scheduling, but since ZooKeeper requires an additional component and Quartz requires an additional table, and there is already a Redis component existing in the project, it was considered to use Redis distributed lock to complete the distributed task抢占 this feature
Record the detours taken.
Version 1:
@Override public <T> Long set(String key,T value, Long cacheSeconds) { if (value instanceof HashMap) { BoundHashOperations valueOperations = redisTemplate.boundHashOps(key); valueOperations.putAll((Map) value); valueOperations.expire(cacheSeconds, TimeUnit.SECONDS); } else{ //Using map to store BoundHashOperations valueOperations = redisTemplate.boundHashOps(key); valueOperations.put(key, value); //seconds valueOperations.expire(cacheSeconds, TimeUnit.SECONDS); } return null; } @Override public void del(String key) { redisTemplate.delete(key); }
The lock occupation and release are completed by using set and del, but after testing, it is found that set is not thread-safe and often leads to data inconsistency in concurrent situations.
Second version:
/** * Distributed lock * @param range The length of the lock, allowing how many requests to seize resources * @param key * @return */ public boolean getLock(int range, String key) { ValueOperations<String, Integer> valueOper1 = template.opsForValue(); return valueOper1.increment(key, 1) <= range; } /** * Initialize the lock, set to 0 * @param key * @param expireSeconds * @return */ public void initLock(String key, Long expireSeconds) { ValueOperations<String, Integer> operations = template.opsForValue(); template.setKeySerializer(new GenericJackson2JsonRedisSerializer()); template.setValueSerializer(new GenericJackson2JsonRedisSerializer()); operations.set(key, 0, expireSeconds * 1000); } /** * Release lock * @param key */ public void releaseLock(String key) { ValueOperations<String, Integer> operations = template.opsForValue(); template.setKeySerializer(new GenericJackson2JsonRedisSerializer()); template.setValueSerializer(new GenericJackson2JsonRedisSerializer()); template.delete(key); }
The lock is acquired by using the increment operation of redis. However, when releasing the lock, any thread can delete the key value in redis. And the initLock method will overwrite the previous operation, so this method is also deprecated.
Final version:
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.jedis.JedisConnection; import org.springframework.stereotype.Service; import org.springframework.util.ReflectionUtils; import redis.clients.jedis.Jedis; import java.lang.reflect.Field; import java.util.Collections; @Service public class RedisLock { private static final String LOCK_SUCCESS = "OK"; private static final String SET_IF_NOT_EXIST = "NX"; private static final String SET_WITH_EXPIRE_TIME = "PX"; private static final Long RELEASE_SUCCESS = 1L; @Autowired private RedisConnectionFactory connectionFactory; /** * Attempt to acquire a distributed lock * @param lockKey Lock * @param requestId Request identifier * @param expireTime Timeout time * @return Whether the acquisition is successful */ public boolean lock(String lockKey, String requestId, int expireTime) { Field jedisField = ReflectionUtils.findField(JedisConnection.class, "jedis"); ReflectionUtils.makeAccessible(jedisField); Jedis jedis = (Jedis) ReflectionUtils.getField(jedisField, connectionFactory.getConnection()); String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if (LOCK_SUCCESS.equals(result)) {}} return true; } return false; } /** * Release distributed lock * @param lockKey Lock * @param requestId Request identifier * @return Whether the release is successful */ public boolean releaseLock(String lockKey, String requestId) { String script = "if redis.call('get', KEYS[1]) == ARGV[1}) then return redis.call('del', KEYS[1}) else return 0 end; Object result = getJedis().eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); if (RELEASE_SUCCESS.equals(result)) { return true; } return false; } public Jedis getJedis() { Field jedisField = ReflectionUtils.findField(JedisConnection.class, "jedis"); ReflectionUtils.makeAccessible(jedisField); Jedis jedis = (Jedis) ReflectionUtils.getField(jedisField, connectionFactory.getConnection()); return jedis; } }