diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 46cd206a..3e81a2ec 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -101,9 +101,11 @@ type worker struct { roomID string subscription *nats.Subscription sentryHub *sentry.Hub + ephemeralSeq uint64 + durableSeq uint64 } -func (r *Inputer) startWorkerForRoom(roomID string) { +func (r *Inputer) startWorkerForRoom(roomID string, seq uint64) { v, loaded := r.workers.LoadOrStore(roomID, &worker{ r: r, roomID: roomID, @@ -112,6 +114,9 @@ func (r *Inputer) startWorkerForRoom(roomID string) { w := v.(*worker) w.Lock() defer w.Unlock() + + w.ephemeralSeq = seq + if !loaded || w.subscription == nil { streamName := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent) consumer := r.Cfg.Matrix.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(w.roomID)) @@ -226,7 +231,8 @@ func (r *Inputer) Start() error { "", // This is blank because we specified it in BindStream. func(m *nats.Msg) { roomID := m.Header.Get(jetstream.RoomID) - r.startWorkerForRoom(roomID) + meta, _ := m.Metadata() + r.startWorkerForRoom(roomID, meta.Sequence.Stream) _ = m.Ack() }, nats.HeadersOnly(), @@ -262,6 +268,7 @@ func (w *worker) _next() { w.sentryHub.ConfigureScope(func(scope *sentry.Scope) { scope.SetTag("room_id", w.roomID) }) + msgs, err := w.subscription.Fetch(1, nats.Context(ctx)) switch err { case nil: @@ -280,12 +287,19 @@ func (w *worker) _next() { // minute for activity in this room. At this point we will shut // down the subscriber to free up resources. It'll get started // again if new activity happens. + w.Lock() + defer w.Unlock() + // inside the lock, let's check if the ephemeral consumer saw something new! + // If so, we do have new messages after all, they just came at a bad time. + if w.ephemeralSeq > w.durableSeq { + w.Act(nil, w._next) + return + } + if err = w.subscription.Unsubscribe(); err != nil { logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID) } - w.Lock() w.subscription = nil - w.Unlock() return default: @@ -293,11 +307,12 @@ func (w *worker) _next() { // from the queue. In which case, we'll shut down the subscriber // and wait to be notified about new room activity again. Maybe // the problem will be corrected by then. + w.Lock() + logrus.WithError(err).Errorf("Failed to get next stream message for room %q", w.roomID) if err = w.subscription.Unsubscribe(); err != nil { logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID) } - w.Lock() w.subscription = nil w.Unlock() return @@ -310,6 +325,9 @@ func (w *worker) _next() { // fails then we'll terminate the message — this notifies NATS that // we are done with the message and never want to see it again. msg := msgs[0] + meta, _ := msg.Metadata() + w.durableSeq = meta.Sequence.Stream + var inputRoomEvent api.InputRoomEvent if err = json.Unmarshal(msg.Data, &inputRoomEvent); err != nil { // using AckWait here makes the call synchronous; 5 seconds is the default value used by NATS