I am using Redis Streams with Consumer Groups. I have a consumer running a loop that fetches messages from the Pending Entries List (PEL) using ID 0 before it attempts to read new messages.
However, if a message fails to process (or is slow), XACK is never called. On the next iteration of the loop, XREADGROUP returns the same messages again, so they are re-processed.
    // Minimal version of my loop
    async function consume() {
      while (true) {
        // This returns the same pending messages every time if XACK isn't called
        const results = await redis.xreadgroup('GROUP', 'mygroup', 'consumer1', 'COUNT', '10', 'STREAMS', 'mystream', '0');
        if (results) {
          for (const msg of results[0][1]) {
            try {
              await process(msg);
              await redis.xack('mystream', 'mygroup', msg[0]);
            } catch (err) {
              // If it fails here, XACK is never called.
              // Next loop iteration fetches this same message immediately.
              console.error("Failed to process", err);
            }
          }
        }
      }
    }
What is the standard pattern for fetching messages from the Pending Entries List while also preventing re-processing?
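
For context, one commonly described approach is sketched here; it is an illustration under assumptions, not a definitive answer. It relies on the documented XREADGROUP behaviour that any ID other than `>` returns entries pending for this consumer with IDs greater than the one given, so advancing a cursor walks the PEL once instead of re-reading from `0` on every iteration. After the PEL is drained, the consumer switches to `>` for never-delivered entries. The `redis` argument is assumed to be an ioredis-style client with `xreadgroup()` and `xack()`; `handler`, `handleBatch`, `drainPending`, and `consumeNew` are hypothetical names introduced only for this sketch.

```javascript
// Sketch only: `redis` is assumed to be an ioredis-style client exposing
// xreadgroup() and xack(); `handler` is a hypothetical processing function.
const STREAM = 'mystream';
const GROUP = 'mygroup';
const CONSUMER = 'consumer1';

// Process one XREADGROUP reply. ACK each entry that succeeds and leave
// failures in the PEL. Returns the last entry ID seen, or null if the
// reply was null or contained no entries.
async function handleBatch(redis, results, handler) {
  if (!results || results[0][1].length === 0) return null;
  const entries = results[0][1];
  for (const [id, fields] of entries) {
    try {
      await handler(id, fields);
      await redis.xack(STREAM, GROUP, id);
    } catch (err) {
      // Not ACKed: the entry stays pending for a later retry pass.
      console.error('Failed to process', id, err);
    }
  }
  return entries[entries.length - 1][0];
}

// Phase 1: walk this consumer's PEL once, moving the cursor past each
// batch so a failed entry is not fetched again in the same pass.
async function drainPending(redis, handler) {
  let cursor = '0';
  while (true) {
    const results = await redis.xreadgroup(
      'GROUP', GROUP, CONSUMER, 'COUNT', '10', 'STREAMS', STREAM, cursor);
    const lastId = await handleBatch(redis, results, handler);
    if (lastId === null) break; // PEL exhausted beyond the cursor
    cursor = lastId;
  }
}

// Phase 2: read only entries never delivered to any consumer ('>').
// BLOCK makes the call wait up to 5 s instead of spinning.
async function consumeNew(redis, handler) {
  while (true) {
    const results = await redis.xreadgroup(
      'GROUP', GROUP, CONSUMER, 'COUNT', '10', 'BLOCK', '5000',
      'STREAMS', STREAM, '>');
    await handleBatch(redis, results, handler);
  }
}
```

A consumer would call `await drainPending(redis, handler)` once at startup and then `await consumeNew(redis, handler)`. Entries that failed remain in the PEL; a separate, less frequent pass can retry them, for example with XAUTOCLAIM (Redis 6.2+) and a minimum idle time, and the delivery counter reported by XPENDING can be used to move repeatedly failing entries to a dead-letter stream and ACK them.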