From f47155b8bb345308ca06f101877a0a94ccdc6736 Mon Sep 17 00:00:00 2001 From: Kegan Dougal <7190048+kegsay@users.noreply.github.com> Date: Tue, 12 Aug 2025 14:12:25 +0100 Subject: [PATCH] Ensure we release the lock --- roomserver/internal/input/input.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 0e2d71c4..6215e6ba 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -291,14 +291,17 @@ func (w *worker) _next() { // down the subscriber to free up resources. It'll get started // again if new activity happens. w.Lock() - defer w.Unlock() // inside the lock, let's check if the ephemeral consumer saw something new! // If so, we do have new messages after all, they just came at a bad time. if w.ephemeralSeq > w.durableSeq { w.Act(nil, w._next) + w.Unlock() return } + w.Unlock() case nats.ErrConsumerDeleted, nats.ErrConsumerNotFound: + w.Lock() + defer w.Unlock() // The consumer is gone, therefore it's reached the inactivity // threshold. Clean up and stop processing at this point, if a // new event comes in for this room then the ordered consumer @@ -308,7 +311,6 @@ func (w *worker) _next() { } w.subscription = nil return - default: // Something went wrong while trying to fetch the next event // from the queue. In which case, we'll shut down the subscriber