diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index e207015e..d07eaf41 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -102,6 +102,7 @@ type worker struct { subscription *nats.Subscription sentryHub *sentry.Hub ephemeralSeq uint64 + // last seq we fully processed durableSeq uint64 } @@ -306,6 +307,7 @@ func (w *worker) _next() { // from the queue. In which case, we'll shut down the subscriber // and wait to be notified about new room activity again. Maybe // the problem will be corrected by then. + // atomically clear the subscription and unsubscribe w.Lock() logrus.WithError(err).Errorf("Failed to get next stream message for room %q", w.roomID)