Adjust roomserver locks; don't unsubscribe if a new event is in flight

Signed-off-by: Vivianne Langdon <puttabutta@gmail.com>
This commit is contained in:
Vivianne Langdon 2025-05-25 13:51:09 -04:00
parent ee42cb48a4
commit acede43611

View file

@ -101,9 +101,11 @@ type worker struct {
roomID string roomID string
subscription *nats.Subscription subscription *nats.Subscription
sentryHub *sentry.Hub sentryHub *sentry.Hub
ephemeralSeq uint64
durableSeq uint64
} }
func (r *Inputer) startWorkerForRoom(roomID string) { func (r *Inputer) startWorkerForRoom(roomID string, seq uint64) {
v, loaded := r.workers.LoadOrStore(roomID, &worker{ v, loaded := r.workers.LoadOrStore(roomID, &worker{
r: r, r: r,
roomID: roomID, roomID: roomID,
@ -112,6 +114,9 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
w := v.(*worker) w := v.(*worker)
w.Lock() w.Lock()
defer w.Unlock() defer w.Unlock()
w.ephemeralSeq = seq
if !loaded || w.subscription == nil { if !loaded || w.subscription == nil {
streamName := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent) streamName := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent)
consumer := r.Cfg.Matrix.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(w.roomID)) consumer := r.Cfg.Matrix.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(w.roomID))
@ -226,7 +231,8 @@ func (r *Inputer) Start() error {
"", // This is blank because we specified it in BindStream. "", // This is blank because we specified it in BindStream.
func(m *nats.Msg) { func(m *nats.Msg) {
roomID := m.Header.Get(jetstream.RoomID) roomID := m.Header.Get(jetstream.RoomID)
r.startWorkerForRoom(roomID) meta, _ := m.Metadata()
r.startWorkerForRoom(roomID, meta.Sequence.Stream)
_ = m.Ack() _ = m.Ack()
}, },
nats.HeadersOnly(), nats.HeadersOnly(),
@ -262,6 +268,7 @@ func (w *worker) _next() {
w.sentryHub.ConfigureScope(func(scope *sentry.Scope) { w.sentryHub.ConfigureScope(func(scope *sentry.Scope) {
scope.SetTag("room_id", w.roomID) scope.SetTag("room_id", w.roomID)
}) })
msgs, err := w.subscription.Fetch(1, nats.Context(ctx)) msgs, err := w.subscription.Fetch(1, nats.Context(ctx))
switch err { switch err {
case nil: case nil:
@ -280,12 +287,19 @@ func (w *worker) _next() {
// minute for activity in this room. At this point we will shut // minute for activity in this room. At this point we will shut
// down the subscriber to free up resources. It'll get started // down the subscriber to free up resources. It'll get started
// again if new activity happens. // again if new activity happens.
w.Lock()
defer w.Unlock()
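// While holding the lock, check whether the ephemeral consumer has recorded a
// stream sequence newer than the last one this worker fetched (ephemeralSeq >
// durableSeq). If so, new messages did arrive after all, just as the fetch was
// timing out, so schedule another fetch instead of unsubscribing.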
if w.ephemeralSeq > w.durableSeq {
w.Act(nil, w._next)
return
}
if err = w.subscription.Unsubscribe(); err != nil { if err = w.subscription.Unsubscribe(); err != nil {
logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID) logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID)
} }
w.Lock()
w.subscription = nil w.subscription = nil
w.Unlock()
return return
default: default:
@ -293,11 +307,12 @@ func (w *worker) _next() {
// from the queue. In which case, we'll shut down the subscriber // from the queue. In which case, we'll shut down the subscriber
// and wait to be notified about new room activity again. Maybe // and wait to be notified about new room activity again. Maybe
// the problem will be corrected by then. // the problem will be corrected by then.
w.Lock()
logrus.WithError(err).Errorf("Failed to get next stream message for room %q", w.roomID) logrus.WithError(err).Errorf("Failed to get next stream message for room %q", w.roomID)
if err = w.subscription.Unsubscribe(); err != nil { if err = w.subscription.Unsubscribe(); err != nil {
logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID) logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID)
} }
w.Lock()
w.subscription = nil w.subscription = nil
w.Unlock() w.Unlock()
return return
@ -310,6 +325,9 @@ func (w *worker) _next() {
// fails then we'll terminate the message — this notifies NATS that // fails then we'll terminate the message — this notifies NATS that
// we are done with the message and never want to see it again. // we are done with the message and never want to see it again.
msg := msgs[0] msg := msgs[0]
meta, _ := msg.Metadata()
w.durableSeq = meta.Sequence.Stream
var inputRoomEvent api.InputRoomEvent var inputRoomEvent api.InputRoomEvent
if err = json.Unmarshal(msg.Data, &inputRoomEvent); err != nil { if err = json.Unmarshal(msg.Data, &inputRoomEvent); err != nil {
// using AckWait here makes the call synchronous; 5 seconds is the default value used by NATS // using AckWait here makes the call synchronous; 5 seconds is the default value used by NATS