diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 232f4feb..fb132338 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -292,11 +292,17 @@ func (w *worker) _next() { // inside the lock, let's check if the ephemeral consumer saw something new! // If so, we do have new messages after all, they just came at a bad time. if w.ephemeralSeq > w.durableSeq { - w.Act(nil, w._next) w.Unlock() + w.Act(nil, w._next) return } + + if err = w.subscription.Unsubscribe(); err != nil { + logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID) + } + w.subscription = nil w.Unlock() + return case nats.ErrConsumerDeleted, nats.ErrConsumerNotFound: w.Lock() defer w.Unlock()