Ensure we release the lock

This commit is contained in:
Kegan Dougal 2025-08-12 14:12:25 +01:00
parent f24688af11
commit f47155b8bb

View file

@ -291,14 +291,17 @@ func (w *worker) _next() {
// down the subscriber to free up resources. It'll get started // down the subscriber to free up resources. It'll get started
// again if new activity happens. // again if new activity happens.
w.Lock() w.Lock()
defer w.Unlock()
// inside the lock, let's check if the ephemeral consumer saw something new! // inside the lock, let's check if the ephemeral consumer saw something new!
// If so, we do have new messages after all, they just came at a bad time. // If so, we do have new messages after all, they just came at a bad time.
if w.ephemeralSeq > w.durableSeq { if w.ephemeralSeq > w.durableSeq {
w.Act(nil, w._next) w.Act(nil, w._next)
w.Unlock()
return return
} }
w.Unlock()
case nats.ErrConsumerDeleted, nats.ErrConsumerNotFound: case nats.ErrConsumerDeleted, nats.ErrConsumerNotFound:
w.Lock()
defer w.Unlock()
// The consumer is gone, therefore it's reached the inactivity // The consumer is gone, therefore it's reached the inactivity
// threshold. Clean up and stop processing at this point, if a // threshold. Clean up and stop processing at this point, if a
// new event comes in for this room then the ordered consumer // new event comes in for this room then the ordered consumer
@ -308,7 +311,6 @@ func (w *worker) _next() {
} }
w.subscription = nil w.subscription = nil
return return
default: default:
// Something went wrong while trying to fetch the next event // Something went wrong while trying to fetch the next event
// from the queue. In which case, we'll shut down the subscriber // from the queue. In which case, we'll shut down the subscriber