English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية

Redis Streams

Redis Stream is a 5.0 version of the new data structure.

Redis Stream is mainly used for message queue (MQ, Message Queue), Redis itself has a Redis publish-subscribe (pub/sub) is used to implement the message queue function, but it has the drawback that messages cannot be persisted, and if there is a network disconnection, Redis failure, and other situations, messages will be discarded.

In simple terms, publish-subscribe (pub/(sub) can distribute messages, but cannot record historical messages.

Redis Stream provides message persistence and master-slave replication features, allowing any client to access data at any moment and remember the access position of each client, and ensure that messages are not lost.

The structure of Redis Stream is as follows, it has a message list that links all the messages added together, each message has a unique ID and corresponding content:

Each Stream has a unique name, which is the Redis key, and it is automatically created when we use the xadd command to append messages for the first time.

Above diagram analysis:

  • Consumer Group : Consumer group, created by the XGROUP CREATE command, a consumer group has multiple consumers (Consumer).

  • last_delivered_id : Cursor, each consumer group has a cursor last_delivered_id, and any consumer reading messages will cause the cursor last_delivered_id to move forward.

  • pending_ids : The status variable of the consumer (Consumer), which is used to maintain the unconfirmed id of the consumer. pending_ids records the messages that have been read by the client but have not yet been ack (Acknowledge character: confirmation character).

Commands related to message queue:

  • XADD - Add message to the end

  • XTRIM - Trim the stream to limit the length

  • XDEL - Delete messages

  • XLEN - Get the number of elements in the stream, i.e., the message length

  • XRANGE - Get a list of messages, which will automatically filter out deleted messages

  • XREVRANGE -  Get a list of messages in reverse order, ID from large to small

  • XREAD - Get a list of messages in a blocking or non-blocking way

Consumer group related commands:

  • XGROUP CREATE - Create consumer groups

  • XREADGROUP GROUP - Read messages from the consumer group

  • XACK - Mark messages as "processed"

  • XGROUP SETID - Set a new last message ID for the consumer group

  • XGROUP DELCONSUMER - Delete consumers

  • XGROUP DESTROY - Delete consumer groups

  • XPENDING - Display information about pending messages

  • XCLAIM - Transfer the ownership of messages

  • XINFO - View related information about streams and consumer groups

  • XINFO GROUPS - Print information about consumer groups

  • XINFO STREAM - Print stream information

XADD

Use XADD to add messages to the queue, if the specified queue does not exist, it will create a queue, XADD syntax format:

XADD key ID field value [field value ...]
  • key Queue name, if it does not exist, it will be created

  • ID : Message ID, we use * Represents a field generated by redis, which can be customized, but you must ensure its incrementality.

  • field value : Record.

redis> XADD mystream * name Sara surname O'Connor
"1601372323627-0"
redis> XADD mystream * field1 value1 field2 value2 field3 value3
"1601372323627-1"
redis> XLEN mystream
(integer) 2
redis> XRANGE mystream - +
1) 1) ""1601372323627-0"
   2) 1) "name"
      2) "Sara"
      3) "surname"
      4) "O'Connor"
2) 1) ""1601372323627-1"
   2) 1) "field"1"
      2) "value"1"
      3) "field"2"
      4) "value"2"
      5) "field"3"
      6) "value"3"
redis>

XTRIM

Use XTRIM to trim the stream, limit the length, syntax format:

XTRIM key MAXLEN [~] count
  • key : Stream name

  • MAXLEN : Length

  • count : Quantity

127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D
"1601372434568-0"
127.0.0.1:6379> XTRIM mystream MAXLEN 2
(integer) 0
127.0.0.1:6379> XRANGE mystream - +
1) 1) ""1601372434568-0"
   2) 1) "field"1"
      2) "A"
      3) "field"2"
      4) "B"
      5) "field"3"
      6) "C"
      7) "field"4"
      8) "D"
127.0.0.1:6379> 
redis>

XDEL

Use XDEL to delete messages, syntax format:

XDEL key ID [ID ...]
  • key: Stream name

  • ID : Message ID

> XADD mystream * a 1
1538561698944-0
> XADD mystream * b 2
1538561700640-0
> XADD mystream * c 3
1538561701744-0
> XDEL mystream 1538561700640-0
(integer) 1
127.0.0.1:6379> XRANGE mystream - +
1) 1) 1538561698944-0
   2) 1) "a"
      2) ""1"
2) 1) 1538561701744-0
   2) 1) "c"
      2) ""3"

XLEN

Use XLEN to get the number of elements in the stream, i.e., the message length, syntax format:

XLEN key
  • key: Stream name

redis> XADD mystream * item 1
"1601372563177-0"
redis> XADD mystream * item 2
"1601372563178-0"
redis> XADD mystream * item 3
"1601372563178-1"
redis> XLEN mystream
(integer) 3
redis>

XRANGE

Use XRANGE to get a list of messages, which will automatically filter out deleted messages, syntax format:

XRANGE key start end [COUNT count]
  • key : Queue name

  • start : Starting value, - Represents the minimum value

  • end : Ending value, + Represents the maximum value

  • count : Quantity

redis> XADD writers * name Virginia surname Woolf
"1601372577811-0"
redis> XADD writers * name Jane surname Austen
"1601372577811-1"
redis> XADD writers * name Toni surname Morrison
"1601372577811-2"
redis> XADD writers * name Agatha surname Christie
"1601372577812-0"
redis> XADD writers * name Ngozi surname Adichie
"1601372577812-1"
redis> XLEN writers
(integer) 5
redis> XRANGE writers - + COUNT 2
1) 1) ""1601372577811-0"
   2) 1) "name"
      2) "Virginia"
      3) "surname"
      4) "Woolf"
2) 1) ""1601372577811-1"
   2) 1) "name"
      2) "Jane"
      3) "surname"
      4) "Austen"
redis>

XREVRANGE

Use XREVRANGE to get a list of messages, which will automatically filter out deleted messages, syntax format:

XREVRANGE key end start [COUNT count]
  • key : Queue name

  • end : Ending value, + Represents the maximum value

  • start : Starting value, - Represents the minimum value

  • count : Quantity

redis> XADD writers * name Virginia surname Woolf
"1601372731458-0"
redis> XADD writers * name Jane surname Austen
"1601372731459-0"
redis> XADD writers * name Toni surname Morrison
"1601372731459-1"
redis> XADD writers * name Agatha surname Christie
"1601372731459-2"
redis> XADD writers * name Ngozi surname Adichie
"1601372731459-3"
redis> XLEN writers
(integer) 5
redis> XREVRANGE writers + - COUNT 1
1) 1) ""1601372731459-3"
   2) 1) "name"
      2) "Ngozi"
      3) "surname"
      4) "Adichie"
redis>

XREAD

Use XREAD to get a list of messages in a blocking or non-blocking way, syntax format:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
  • count : Quantity

  • milliseconds : Optional, blocking milliseconds, not set means non-blocking mode

  • key : Queue name

  • id : Message ID

# Read two messages from the head of the Stream
> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
1) 1) "mystream"
   2) 1) 1) 1526984818136-0
         2) 1) "duration"
            2) ""1532"
            3) "event"-id"
            4) ""5"
            5) "user"-id"
            6) ""7782813"
      2) 1) 1526999352406-0
         2) 1) "duration"
            2) ""812"
            3) "event"-id"
            4) ""9"
            5) "user"-id"
            6) ""388234"
2) 1) "writers"
   2) 1) 1) 1526985676425-0
         2) 1) "name"
            2) "Virginia"
            3) "surname"
            4) "Woolf"
      2) 1) 1526985685298-0
         2) 1) "name"
            2) "Jane"
            3) "surname"
            4) "Austen"

XGROUP CREATE

Use XGROUP CREATE to create a consumer group, syntax format:

XGROUP [CREATE key groupname id-or-$] [SETID key groupname id]-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
  • key Queue name, if it does not exist, it will be created

  • groupname Group name.

  • $ It indicates to consume from the end, only accept new messages, and ignore all current Stream messages.

Consume from the beginning:

XGROUP CREATE mystream consumer-group-name 0-0

Consume from the end:

XGROUP CREATE mystream consumer-group-name $

XREADGROUP GROUP

Use XREADGROUP GROUP to read messages from a consumer group, syntax format:

XREADGROUP GROUP consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group Consumer group name

  • consumer Consumer name.

  • count Reading count.

  • milliseconds Blocking milliseconds.

  • key Queue name.

  • ID Message ID.

XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >