Redis Stream is a data structure introduced in Redis 5.0.
Redis Stream is mainly used as a message queue (MQ). Redis already offers publish-subscribe (pub/sub) for message queuing, but pub/sub cannot persist messages: if the network drops or Redis fails, messages are discarded.
In short, publish-subscribe (pub/sub) can distribute messages but cannot record historical messages.
Redis Stream provides message persistence and master-slave replication. Any client can access the data at any time, the position each client has read up to is remembered, and this helps ensure that messages are not lost.
The structure of a Redis Stream is shown below. It has a message list that links together all the messages that were added, and each message has a unique ID and corresponding content:
Each Stream has a unique name, which is its Redis key. The Stream is created automatically the first time we append a message with the XADD command.
Explanation of the diagram above:
Consumer Group : A consumer group, created with the XGROUP CREATE command. One consumer group can have multiple consumers (Consumer).
last_delivered_id : A cursor. Each consumer group has one last_delivered_id cursor, and whenever any consumer in the group reads messages, the cursor moves forward.
pending_ids : A state variable of each consumer (Consumer) that tracks the IDs the consumer has not yet confirmed. pending_ids records the messages that have been read by the client but not yet acknowledged (ack).
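The interplay of the cursor and the pending lists can be sketched in a few lines of Python. This is only a toy model under stated assumptions, not how Redis stores a group internally: the class name ToyConsumerGroup, the consumer names alice and bob, and the use of (ms, seq) tuples as IDs are all made up for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class ToyConsumerGroup:
    """Illustrative stand-in for a consumer group; not Redis's real layout."""
    last_delivered_id: tuple = (0, 0)                 # group-wide cursor
    pending_ids: dict = field(default_factory=dict)   # consumer name -> set of unacked IDs

    def read(self, stream, consumer, count=1):
        """Deliver up to `count` entries that lie after the cursor."""
        batch = [(eid, body) for eid, body in stream if eid > self.last_delivered_id][:count]
        for eid, _ in batch:
            self.last_delivered_id = eid               # the cursor moves forward
            self.pending_ids.setdefault(consumer, set()).add(eid)
        return batch

    def ack(self, consumer, eid):
        """Confirm an entry, removing it from the consumer's pending set."""
        self.pending_ids.get(consumer, set()).discard(eid)


# A stream is modelled as a list of (id, fields) pairs sorted by id.
stream = [((1, 0), {"a": "1"}), ((2, 0), {"b": "2"}), ((3, 0), {"c": "3"})]
group = ToyConsumerGroup()

first = group.read(stream, "alice")    # alice receives entry (1, 0)
second = group.read(stream, "bob")     # bob receives entry (2, 0), not (1, 0)
group.ack("alice", (1, 0))             # alice confirms her entry

print(group.last_delivered_id)         # (2, 0)
print(group.pending_ids)               # {'alice': set(), 'bob': {(2, 0)}}
```

Because the cursor belongs to the group, the two consumers never receive the same entry, while each consumer keeps its own record of what it still has to confirm.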
Commands related to message queue:
XADD - Add message to the end
XTRIM - Trim the stream to limit the length
XDEL - Delete messages
XLEN - Get the number of elements in the stream, i.e., the message length
XRANGE - Get a list of messages, which will automatically filter out deleted messages
XREVRANGE - Get a list of messages in reverse order, ID from large to small
XREAD - Get a list of messages in a blocking or non-blocking way
Consumer group related commands:
XGROUP CREATE - Create consumer groups
XREADGROUP GROUP - Read messages from the consumer group
XACK - Mark messages as "processed"
XGROUP SETID - Set a new last message ID for the consumer group
XGROUP DELCONSUMER - Delete consumers
XGROUP DESTROY - Delete consumer groups
XPENDING - Display information about pending messages
XCLAIM - Transfer the ownership of messages
XINFO - View related information about streams and consumer groups
XINFO GROUPS - Print information about consumer groups
XINFO STREAM - Print stream information
Use XADD to add a message to a queue. If the specified queue does not exist, it is created. Syntax:
XADD key ID field value [field value ...]
key : Queue name; created if it does not exist.
ID : Message ID. Use * to have Redis generate it. A custom ID is also allowed, but it must be greater than every ID already in the stream.
field value : The record, given as one or more field-value pairs.
redis> XADD mystream * name Sara surname O'Connor
"1601372323627-0"
redis> XADD mystream * field1 value1 field2 value2 field3 value3
"1601372323627-1"
redis> XLEN mystream
(integer) 2
redis> XRANGE mystream - +
1) 1) "1601372323627-0"
   2) 1) "name"
      2) "Sara"
      3) "surname"
      4) "O'Connor"
2) 1) "1601372323627-1"
   2) 1) "field1"
      2) "value1"
      3) "field2"
      4) "value2"
      5) "field3"
      6) "value3"
redis>
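The IDs in this example have the form milliseconds-sequence and are ordered as a pair of numbers, which is why two messages added within the same millisecond end in -0 and -1. The following Python sketch illustrates that ordering rule. The function names are hypothetical, and next_auto_id only approximates what * asks Redis to do; it is not the server's code.

```python
def parse_id(entry_id: str) -> tuple:
    """Split 'ms-seq' into a pair of integers that compares numerically."""
    ms, seq = entry_id.split("-")
    return int(ms), int(seq)


def next_auto_id(last_id: str, now_ms: int) -> str:
    """Roughly what '*' asks for: an ID strictly greater than the last one."""
    last_ms, last_seq = parse_id(last_id)
    if now_ms > last_ms:
        return f"{now_ms}-0"
    # Same millisecond (or a clock that went backwards): bump the sequence.
    return f"{last_ms}-{last_seq + 1}"


def is_valid_custom_id(last_id: str, candidate: str) -> bool:
    """A hand-picked ID is acceptable only if it is greater than the last ID."""
    return parse_id(candidate) > parse_id(last_id)


print(next_auto_id("1601372323627-0", now_ms=1601372323627))     # 1601372323627-1
print(next_auto_id("1601372323627-1", now_ms=1601372323700))     # 1601372323700-0
print(is_valid_custom_id("1601372323627-1", "1601372323627-1"))  # False
print(is_valid_custom_id("9-0", "10-0"))                         # True
```

The last call shows why the comparison has to be numeric: as plain strings, "10-0" would sort before "9-0".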
Use XTRIM to trim a stream to a limited length. Syntax:
XTRIM key MAXLEN [~] count
key : Stream name
MAXLEN : Trim by length
count : Maximum number of messages to keep
127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D
"1601372434568-0"
127.0.0.1:6379> XTRIM mystream MAXLEN 2
(integer) 0
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1601372434568-0"
   2) 1) "field1"
      2) "A"
      3) "field2"
      4) "B"
      5) "field3"
      6) "C"
      7) "field4"
      8) "D"
127.0.0.1:6379>
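XTRIM returns 0 in this example because the stream holds a single message (with four fields), which is already within the limit of 2. The length counts messages, not fields. A minimal Python sketch of exact MAXLEN trimming, using a hypothetical helper trim_maxlen on a plain list, might look as follows. It does not model the optional ~ form, where Redis is allowed to keep somewhat more than count messages for efficiency.

```python
def trim_maxlen(entries: list, count: int) -> int:
    """Drop the oldest entries until at most `count` remain; return how many were dropped."""
    removed = max(0, len(entries) - count)
    del entries[:removed]
    return removed


# One message with four fields: nothing to evict.
one_entry = [("1601372434568-0", {"field1": "A", "field2": "B", "field3": "C", "field4": "D"})]
print(trim_maxlen(one_entry, 2))   # 0

# Five messages: the three oldest are evicted.
five = [(f"{i}-0", {"n": str(i)}) for i in range(1, 6)]
print(trim_maxlen(five, 2))        # 3
print([eid for eid, _ in five])    # ['4-0', '5-0']
```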
Use XDEL to delete messages. Syntax:
XDEL key ID [ID ...]
key: Stream name
ID : Message ID
> XADD mystream * a 1
1538561698944-0
> XADD mystream * b 2
1538561700640-0
> XADD mystream * c 3
1538561701744-0
> XDEL mystream 1538561700640-0
(integer) 1
127.0.0.1:6379> XRANGE mystream - +
1) 1) 1538561698944-0
   2) 1) "a"
      2) "1"
2) 1) 1538561701744-0
   2) 1) "c"
      2) "3"
Use XLEN to get the number of elements in a stream, i.e., the message count. Syntax:
XLEN key
key: Stream name
redis> XADD mystream * item 1
"1601372563177-0"
redis> XADD mystream * item 2
"1601372563178-0"
redis> XADD mystream * item 3
"1601372563178-1"
redis> XLEN mystream
(integer) 3
redis>
Use XRANGE to get a list of messages; deleted messages are filtered out automatically. Syntax:
XRANGE key start end [COUNT count]
key : Queue name
start : Start ID; - represents the minimum ID
end : End ID; + represents the maximum ID
count : Maximum number of messages to return
redis> XADD writers * name Virginia surname Woolf
"1601372577811-0"
redis> XADD writers * name Jane surname Austen
"1601372577811-1"
redis> XADD writers * name Toni surname Morrison
"1601372577811-2"
redis> XADD writers * name Agatha surname Christie
"1601372577812-0"
redis> XADD writers * name Ngozi surname Adichie
"1601372577812-1"
redis> XLEN writers
(integer) 5
redis> XRANGE writers - + COUNT 2
1) 1) "1601372577811-0"
   2) 1) "name"
      2) "Virginia"
      3) "surname"
      4) "Woolf"
2) 1) "1601372577811-1"
   2) 1) "name"
      2) "Jane"
      3) "surname"
      4) "Austen"
redis>
Use XREVRANGE to get a list of messages in reverse order (IDs from largest to smallest); deleted messages are filtered out automatically. Syntax:
XREVRANGE key end start [COUNT count]
key : Queue name
end : End ID; + represents the maximum ID
start : Start ID; - represents the minimum ID
count : Maximum number of messages to return
redis> XADD writers * name Virginia surname Woolf
"1601372731458-0"
redis> XADD writers * name Jane surname Austen
"1601372731459-0"
redis> XADD writers * name Toni surname Morrison
"1601372731459-1"
redis> XADD writers * name Agatha surname Christie
"1601372731459-2"
redis> XADD writers * name Ngozi surname Adichie
"1601372731459-3"
redis> XLEN writers
(integer) 5
redis> XREVRANGE writers + - COUNT 1
1) 1) "1601372731459-3"
   2) 1) "name"
      2) "Ngozi"
      3) "surname"
      4) "Adichie"
redis>
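XRANGE and XREVRANGE select the same closed interval of IDs and differ only in the order of the result and of the two bound arguments. The Python sketch below models this on an in-memory list. The names toy_xrange and toy_xrevrange are hypothetical, each message body is reduced to a surname, and only complete IDs of the form ms-seq are handled.

```python
def parse_id(entry_id):
    """Turn 'ms-seq' into a pair of integers so IDs compare numerically."""
    ms, seq = entry_id.split("-")
    return int(ms), int(seq)


def toy_xrange(entries, start="-", end="+", count=None):
    """Entries with start <= ID <= end, smallest ID first."""
    low = (0, 0) if start == "-" else parse_id(start)
    high = (float("inf"), float("inf")) if end == "+" else parse_id(end)
    hits = [e for e in entries if low <= parse_id(e[0]) <= high]
    return hits if count is None else hits[:count]


def toy_xrevrange(entries, end="+", start="-", count=None):
    """Same interval, largest ID first; note that `end` is passed before `start`."""
    hits = toy_xrange(entries, start, end)[::-1]
    return hits if count is None else hits[:count]


writers = [
    ("1601372731458-0", "Woolf"),
    ("1601372731459-0", "Austen"),
    ("1601372731459-1", "Morrison"),
    ("1601372731459-2", "Christie"),
    ("1601372731459-3", "Adichie"),
]

print(toy_xrange(writers, "-", "+", count=2))     # Woolf, then Austen
print(toy_xrevrange(writers, "+", "-", count=1))  # Adichie only
print(toy_xrange(writers, "1601372731459-1", "1601372731459-2"))  # Morrison, Christie
```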
Use XREAD to get a list of messages in blocking or non-blocking mode. Syntax:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
count : Maximum number of messages to read
milliseconds : Optional; how long to block, in milliseconds. If omitted, the call is non-blocking
key : Queue name
id : Message ID; messages with a greater ID are returned
# Read two messages from the head of the Stream
> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
1) 1) "mystream"
   2) 1) 1) 1526984818136-0
         2) 1) "duration"
            2) "1532"
            3) "event-id"
            4) "5"
            5) "user-id"
            6) "7782813"
      2) 1) 1526999352406-0
         2) 1) "duration"
            2) "812"
            3) "event-id"
            4) "9"
            5) "user-id"
            6) "388234"
2) 1) "writers"
   2) 1) 1) 1526985676425-0
         2) 1) "name"
            2) "Virginia"
            3) "surname"
            4) "Woolf"
      2) 1) 1526985685298-0
         2) 1) "name"
            2) "Jane"
            3) "surname"
            4) "Austen"
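XREAD returns, for each stream, the messages whose ID is strictly greater than the ID passed for that stream, so a client that remembers the last ID it saw can resume from there on the next call. The Python sketch below models only this non-blocking behaviour on in-memory data. The name toy_xread and the cursor dictionary are hypothetical, and an empty dictionary stands in for the empty reply.

```python
def parse_id(entry_id):
    """Turn 'ms-seq' into a pair of integers so IDs compare numerically."""
    ms, seq = entry_id.split("-")
    return int(ms), int(seq)


def toy_xread(streams, last_seen, count=None):
    """For each stream name, return entries with an ID strictly greater than last_seen[name]."""
    reply = {}
    for name, after in last_seen.items():
        newer = [(eid, body) for eid, body in streams[name] if parse_id(eid) > parse_id(after)]
        if count is not None:
            newer = newer[:count]          # COUNT applies to each stream separately
        if newer:
            reply[name] = newer
    return reply


streams = {
    "mystream": [
        ("1526984818136-0", {"duration": "1532"}),
        ("1526999352406-0", {"duration": "812"}),
    ],
}
cursor = {"mystream": "0-0"}               # 0-0 means "from the very beginning"

first = toy_xread(streams, cursor, count=1)
cursor["mystream"] = first["mystream"][-1][0]    # remember the last ID we saw
second = toy_xread(streams, cursor, count=1)
cursor["mystream"] = second["mystream"][-1][0]
third = toy_xread(streams, cursor, count=1)      # nothing newer is left

print(first)
print(second)
print(third)
```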
Use XGROUP CREATE to create a consumer group. Syntax:
XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
key : Queue name. The stream must already exist, unless the MKSTREAM option is appended to XGROUP CREATE.
groupname : Group name.
$ : Consume from the end: accept only new messages and ignore all messages currently in the Stream.
Consume from the beginning:
XGROUP CREATE mystream consumer-group-name 0-0
Consume from the end:
XGROUP CREATE mystream consumer-group-name $
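The difference between the two forms comes down to where the group's cursor starts. The Python sketch below is a simplified model with hypothetical helper names, and it takes the newest existing message as the meaning of $. It shows which messages remain deliverable in each case.

```python
def parse_id(entry_id):
    """Turn 'ms-seq' into a pair of integers so IDs compare numerically."""
    ms, seq = entry_id.split("-")
    return int(ms), int(seq)


def initial_cursor(stream_ids, start):
    """Starting last_delivered_id for a new group, given the last XGROUP CREATE argument."""
    if start == "$":
        # Begin after the newest existing entry ("0-0" if the stream is empty).
        return stream_ids[-1] if stream_ids else "0-0"
    return start


def deliverable(stream_ids, cursor):
    """IDs a consumer of the group could still receive: everything after the cursor."""
    return [eid for eid in stream_ids if parse_id(eid) > parse_id(cursor)]


ids = ["1538561698944-0", "1538561700640-0"]

from_start = initial_cursor(ids, "0-0")
from_end = initial_cursor(ids, "$")
print(deliverable(ids, from_start))   # both existing entries
print(deliverable(ids, from_end))     # []

ids.append("1538561701744-0")         # an entry added after the group was created
print(deliverable(ids, from_end))     # ['1538561701744-0']
```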
Use XREADGROUP GROUP to read messages as a consumer in a consumer group. Syntax:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
group : Consumer group name.
consumer : Consumer name.
count : Number of messages to read.
milliseconds : Blocking time in milliseconds.
key : Queue name.
ID : Message ID.
XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >
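The special ID > in this command asks for messages that have not yet been delivered to any consumer in the group. Passing an ordinary ID instead returns the calling consumer's own pending messages (delivered but not yet acknowledged with XACK) that come after that ID. The Python sketch below models both modes for a single stream. The functions toy_xreadgroup and toy_xack and the dictionary layout are hypothetical simplifications, not the Redis implementation.

```python
def parse_id(entry_id):
    """Turn 'ms-seq' into a pair of integers so IDs compare numerically."""
    ms, seq = entry_id.split("-")
    return int(ms), int(seq)


def toy_xreadgroup(group, stream, consumer, id_arg, count=None):
    """Toy XREADGROUP for one stream; `group` holds the cursor and the pending lists."""
    pending = group["pending"].setdefault(consumer, [])
    if id_arg == ">":
        # New work: entries after the group's cursor, never delivered within the group.
        batch = [e for e in stream if parse_id(e[0]) > parse_id(group["last_delivered_id"])]
        batch = batch[:count] if count is not None else batch
        for eid, _ in batch:
            group["last_delivered_id"] = eid
            pending.append(eid)
        return batch
    # History: this consumer's own unacknowledged entries with an ID above id_arg.
    history = [e for e in stream if e[0] in pending and parse_id(e[0]) > parse_id(id_arg)]
    return history[:count] if count is not None else history


def toy_xack(group, eid):
    """Drop an ID from whichever consumer holds it; return 1 if it was pending, else 0."""
    for ids in group["pending"].values():
        if eid in ids:
            ids.remove(eid)
            return 1
    return 0


stream = [("1-0", {"task": "a"}), ("2-0", {"task": "b"})]
group = {"last_delivered_id": "0-0", "pending": {}}

got = toy_xreadgroup(group, stream, "consumer-name", ">", count=1)
replay = toy_xreadgroup(group, stream, "consumer-name", "0-0")      # 1-0 again, still pending
acked = toy_xack(group, "1-0")
after_ack = toy_xreadgroup(group, stream, "consumer-name", "0-0")   # nothing pending any more
fresh = toy_xreadgroup(group, stream, "consumer-name", ">")         # moves on to 2-0

print(got, replay, acked, after_ack, fresh)
```

A consumer that restarts after a crash can therefore first replay its history with an ordinary ID, acknowledge what it finishes, and only then switch to > for new messages.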