diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 0e2d71c4..6215e6ba 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -291,14 +291,17 @@ func (w *worker) _next() { // down the subscriber to free up resources. It'll get started // again if new activity happens. w.Lock() - defer w.Unlock() // inside the lock, let's check if the ephemeral consumer saw something new! // If so, we do have new messages after all, they just came at a bad time. if w.ephemeralSeq > w.durableSeq { w.Act(nil, w._next) + w.Unlock() return } + w.Unlock() case nats.ErrConsumerDeleted, nats.ErrConsumerNotFound: + w.Lock() + defer w.Unlock() // The consumer is gone, therefore it's reached the inactivity // threshold. Clean up and stop processing at this point, if a // new event comes in for this room then the ordered consumer @@ -308,7 +311,6 @@ func (w *worker) _next() { } w.subscription = nil return - default: // Something went wrong while trying to fetch the next event // from the queue. In which case, we'll shut down the subscriber