Harmony backports (#3581)

Backport a few things from Harmony.

### Pull Request Checklist

<!-- Please read
https://matrix-org.github.io/dendrite/development/contributing before
submitting your pull request -->

* [ ] I have added Go unit tests or [Complement integration
tests](https://github.com/matrix-org/complement) for this PR _or_ I have
justified why this PR doesn't need tests
* [x] Pull request includes a [sign off
below](https://element-hq.github.io/dendrite/development/contributing#sign-off)
_or_ I have already signed off privately

---------

Signed-off-by: Till Faelligen <2353100+S7evinK@users.noreply.github.com>
Co-authored-by: Kegan Dougal <7190048+kegsay@users.noreply.github.com>
This commit is contained in:
Till 2025-06-19 09:49:03 +02:00 committed by GitHub
parent c133596baf
commit 331a6f221b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 18 additions and 11 deletions

View file

@ -264,7 +264,11 @@ func (w *worker) _next() {
}) })
msgs, err := w.subscription.Fetch(1, nats.Context(ctx)) msgs, err := w.subscription.Fetch(1, nats.Context(ctx))
switch err { switch err {
case nil: case nil, nats.ErrTimeout, context.DeadlineExceeded, context.Canceled:
// Is the server shutting down? If so, stop processing.
if w.r.ProcessContext.Context().Err() != nil {
return
}
// Make sure that once we're done here, we queue up another call // Make sure that once we're done here, we queue up another call
// to _next in the inbox. // to _next in the inbox.
defer w.Act(nil, w._next) defer w.Act(nil, w._next)
@ -275,11 +279,11 @@ func (w *worker) _next() {
return return
} }
case context.DeadlineExceeded, context.Canceled: case nats.ErrConsumerDeleted, nats.ErrConsumerNotFound:
// The context exceeded, so we've been waiting for more than a // The consumer is gone, therefore it's reached the inactivity
// minute for activity in this room. At this point we will shut // threshold. Clean up and stop processing at this point, if a
// down the subscriber to free up resources. It'll get started // new event comes in for this room then the ordered consumer
// again if new activity happens. // over the entire stream will recreate this anyway.
if err = w.subscription.Unsubscribe(); err != nil { if err = w.subscription.Unsubscribe(); err != nil {
logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID) logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID)
} }

View file

@ -13,7 +13,7 @@ import (
"errors" "errors"
"fmt" "fmt"
//"github.com/element-hq/dendrite/roomserver/internal" // "github.com/element-hq/dendrite/roomserver/internal"
"github.com/element-hq/dendrite/setup/config" "github.com/element-hq/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/gomatrixserverlib/spec" "github.com/matrix-org/gomatrixserverlib/spec"
@ -747,7 +747,7 @@ func GetAuthChain(
// from the database and the `eventsToFetch` will be updated with any new // from the database and the `eventsToFetch` will be updated with any new
// events that we have learned about and need to find. When `eventsToFetch` // events that we have learned about and need to find. When `eventsToFetch`
// is eventually empty, we should have reached the end of the chain. // is eventually empty, we should have reached the end of the chain.
eventsToFetch := authEventIDs eventsToFetch := append([]string{}, authEventIDs...)
authEventsMap := make(map[string]gomatrixserverlib.PDU) authEventsMap := make(map[string]gomatrixserverlib.PDU)
for len(eventsToFetch) > 0 { for len(eventsToFetch) > 0 {
@ -779,7 +779,7 @@ func GetAuthChain(
// We've now retrieved all of the events we can. Flatten them down into an // We've now retrieved all of the events we can. Flatten them down into an
// array and return them. // array and return them.
var authEvents []gomatrixserverlib.PDU authEvents := make([]gomatrixserverlib.PDU, 0, len(authEventsMap))
for _, event := range authEventsMap { for _, event := range authEventsMap {
authEvents = append(authEvents, event) authEvents = append(authEvents, event)
} }

View file

@ -92,6 +92,7 @@ func (s *accountDataStatements) SelectAccountDataInRange(
accountDataEventFilter *synctypes.EventFilter, accountDataEventFilter *synctypes.EventFilter,
) (data map[string][]string, pos types.StreamPosition, err error) { ) (data map[string][]string, pos types.StreamPosition, err error) {
data = make(map[string][]string) data = make(map[string][]string)
pos = r.Low()
rows, err := sqlutil.TxStmt(txn, s.selectAccountDataInRangeStmt).QueryContext( rows, err := sqlutil.TxStmt(txn, s.selectAccountDataInRangeStmt).QueryContext(
ctx, userID, r.Low(), r.High(), ctx, userID, r.Low(), r.High(),
@ -122,7 +123,7 @@ func (s *accountDataStatements) SelectAccountDataInRange(
pos = id pos = id
} }
} }
if pos == 0 { if len(data) == 0 {
pos = r.High() pos = r.High()
} }
return data, pos, rows.Err() return data, pos, rows.Err()

View file

@ -84,6 +84,8 @@ func (s *accountDataStatements) SelectAccountDataInRange(
filter *synctypes.EventFilter, filter *synctypes.EventFilter,
) (data map[string][]string, pos types.StreamPosition, err error) { ) (data map[string][]string, pos types.StreamPosition, err error) {
data = make(map[string][]string) data = make(map[string][]string)
pos = r.Low()
stmt, params, err := prepareWithFilters( stmt, params, err := prepareWithFilters(
s.db, txn, selectAccountDataInRangeSQL, s.db, txn, selectAccountDataInRangeSQL,
[]interface{}{ []interface{}{
@ -119,7 +121,7 @@ func (s *accountDataStatements) SelectAccountDataInRange(
pos = id pos = id
} }
} }
if pos == 0 { if len(data) == 0 {
pos = r.High() pos = r.High()
} }
return data, pos, rows.Err() return data, pos, rows.Err()