How to prevent re-processing when reading pending entries (ID 0) in Redis stream using XREADGROUP?

I am using Redis Streams with Consumer Groups. I have a consumer running a loop that fetches messages from the Pending Entries List (PEL) using ID 0 before it attempts to read new messages.

However, if a message fails to process (or is slow), the XACK is never called. On the next iteration of the loop, XREADGROUP returns the same messages again, causing re-processing.

    // Minimal version of my loop
    async function consume() {
      while (true) {
        // This returns the same pending messages every time if XACK isn't called
        const results = await redis.xreadgroup('GROUP', 'mygroup', 'consumer1', 'COUNT', '10', 'STREAMS', 'mystream', '0');

        if (results) {
          for (const msg of results[0][1]) {
            try {
              await process(msg);
              await redis.xack('mystream', 'mygroup', msg[0]);
            } catch (err) {
              // If it fails here, XACK is never called.
              // Next loop iteration fetches this same message immediately.
              console.error("Failed to process", err);
            }
          }
        }
      }
    }

What is the standard pattern for fetching messages from the Pending Entries List while preventing re-processing?

Hello Pawan,

With Redis Streams consumer groups, the Pending Entries List (PEL) is how Redis tracks which consumer received which message and has not yet acknowledged it. Think of it as a per-consumer buffer of entries that were delivered but not yet acknowledged. Because entries are tracked, once a consumer instance has processed an entry successfully, it must use the XACK command to acknowledge that entry and remove it from the consumer’s PEL.

So here is the standard pattern with Consumer Groups:

  1. You read messages using XREADGROUP and process them; every delivered message sits in your consumer’s PEL until it is acknowledged.
  2. You then acknowledge each processed message using XACK, which removes it from the PEL.
  3. Once all the entries in your consumer’s PEL have been removed and the PEL is empty, your consumer can call XREADGROUP again to retrieve new messages from the stream.
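As a rough sketch of these three steps (not a definitive implementation), the helpers below return the consumer's pending entries first and fall back to new messages only when the PEL is empty. It assumes `redis` is an ioredis-style client whose `xreadgroup` and `xack` methods take the raw command arguments, as in your code; `readNextBatch`, `processAndAck` and `handler` are hypothetical names introduced for illustration.

```javascript
// Sketch only: `redis` is assumed to be an ioredis-style client.
// Steps 1 and 3: return pending entries if there are any, otherwise new ones.
async function readNextBatch(redis, stream = 'mystream', group = 'mygroup', consumer = 'consumer1') {
  // ID '0' reads this consumer's PEL from the beginning.
  const pending = await redis.xreadgroup(
    'GROUP', group, consumer, 'COUNT', '10', 'STREAMS', stream, '0');
  const pendingEntries = pending ? pending[0][1] : [];
  if (pendingEntries.length > 0) return pendingEntries;

  // PEL is empty: ask for messages never delivered to this group.
  const fresh = await redis.xreadgroup(
    'GROUP', group, consumer, 'COUNT', '10', 'BLOCK', '5000',
    'STREAMS', stream, '>');
  return fresh ? fresh[0][1] : []; // null means the BLOCK timeout expired
}

// Step 2: acknowledge an entry only after the handler has succeeded.
async function processAndAck(redis, entries, handler, stream = 'mystream', group = 'mygroup') {
  for (const [id, fields] of entries) {
    await handler(id, fields); // hypothetical application callback
    await redis.xack(stream, group, id);
  }
}
```

An entry whose handler throws is not acknowledged here, so it stays in the PEL and is returned again by the next read from ID 0.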

In your case, you are calling :

      const results = await redis.xreadgroup('GROUP', 'mygroup', 'consumer1', 'COUNT', '10', 'STREAMS', 'mystream', '0');

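The “0” at the end of the line means the consumer reads from the beginning of its own PEL: XREADGROUP returns only entries that were already delivered to this consumer and not yet acknowledged, never new messages.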

To read the next “batch” of entries from the main stream, you need to replace it with “>”.

      const results = await redis.xreadgroup( 'GROUP', 'mygroup', 
        'consumer1', 
        'COUNT', '10',
        'BLOCK', '5000', 
        'STREAMS', 'mystream', 
        '>' );

That is why you see the re-processing: when processing fails, the error skips the XACK call, the message stays in the PEL, and the next read from ID 0 returns the same message again.
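One possible way to walk the PEL without immediately re-reading a failed entry is to advance the ID passed to XREADGROUP instead of always sending 0. This relies on the documented behaviour that a specific ID returns the consumer's pending entries with IDs greater than it. The sketch below is only an illustration under that assumption: `drainPending` and `handler` are hypothetical names, and `redis` is assumed to be an ioredis-style client.

```javascript
// Sketch only: one pass over this consumer's PEL, visiting each entry once.
async function drainPending(redis, handler, stream = 'mystream', group = 'mygroup', consumer = 'consumer1') {
  const failed = [];
  let lastId = '0'; // start at the beginning of the PEL
  for (;;) {
    const results = await redis.xreadgroup(
      'GROUP', group, consumer, 'COUNT', '10', 'STREAMS', stream, lastId);
    const entries = results ? results[0][1] : [];
    if (entries.length === 0) return failed; // no pending entries after lastId

    for (const [id, fields] of entries) {
      lastId = id; // advance the cursor even if processing fails
      try {
        await handler(id, fields); // hypothetical application callback
        await redis.xack(stream, group, id);
      } catch (err) {
        // Not acknowledged: the entry stays in the PEL for a later pass.
        failed.push(id);
        console.error('Failed to process', id, err);
      }
    }
  }
}
```

The returned list of failed IDs lets the caller decide what to do next, for example retry later or move on to reading new messages with “>”. The failed entries remain pending until they are acknowledged.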

If you don’t want a failed message to be reprocessed at all, you can use the NOACK option in your XREADGROUP command when reading new messages with “>”. NOACK avoids adding the message to the PEL, for cases where reliability is not a requirement and occasional message loss is acceptable. This is equivalent to acknowledging the message as soon as it is read.
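For illustration, a NOACK read could look like the sketch below. `readWithoutAck` is a hypothetical helper name, `redis` is assumed to be an ioredis-style client, and the option is placed before STREAMS as in the XREADGROUP syntax.

```javascript
// Sketch only: read new messages without tracking them in the PEL.
async function readWithoutAck(redis, stream = 'mystream', group = 'mygroup', consumer = 'consumer1') {
  const results = await redis.xreadgroup(
    'GROUP', group, consumer,
    'COUNT', '10',
    'BLOCK', '5000',
    'NOACK', // delivered entries are not added to the PEL
    'STREAMS', stream, '>');
  return results ? results[0][1] : []; // null means the BLOCK timeout expired
}
```

With this variant there is nothing to XACK, and a message whose processing fails is not delivered again.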

To monitor your PELs, you can use the XPENDING command, which gives you the number of pending entries for the group and for each of its consumers. You’ll also see that with the NOACK option the PELs stay empty, since messages are never added to them and no XACK is needed.
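As a small sketch of that monitoring step, the helper below calls the summary form of XPENDING and turns the reply into an object. It assumes `redis` is an ioredis-style client that returns the raw reply array (total, smallest ID, greatest ID, list of consumer/count pairs); `pendingSummary` is a hypothetical name.

```javascript
// Sketch only: summarise the group's pending entries per consumer.
async function pendingSummary(redis, stream = 'mystream', group = 'mygroup') {
  const [total, minId, maxId, consumers] = await redis.xpending(stream, group);
  const perConsumer = {};
  // The consumer list is null when nothing is pending.
  for (const [name, count] of consumers || []) {
    perConsumer[name] = Number(count); // counts may arrive as strings
  }
  return { total: Number(total), minId, maxId, perConsumer };
}
```

Calling `pendingSummary(redis)` after a failed run should show the unacknowledged entries under `consumer1`.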