Various fixes in fetchAuthEvents (#3447)

This might fix issues with state events gone missing.

---------

Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>

[skip ci]
This commit is contained in:
Till 2024-12-17 19:18:05 +01:00 committed by GitHub
parent 23e097c3f0
commit e8b1a89ff6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 41 additions and 29 deletions

View file

@ -205,9 +205,27 @@ func (r *Inputer) processRoomEvent(
}
}
// Check that the auth events of the event are known.
// If they aren't then we will ask the federation API for them.
authEvents, _ := gomatrixserverlib.NewAuthEvents(nil)
knownEvents := map[string]*types.Event{}
if err = r.fetchAuthEvents(ctx, logger, roomInfo, virtualHost, headered, authEvents, knownEvents, serverRes.ServerNames); err != nil {
return fmt.Errorf("r.fetchAuthEvents: %w", err)
}
isRejected := false
var rejectionErr error
// Check if the event is allowed by its auth events. If it isn't then
// we consider the event to be "rejected" — it will still be persisted.
if err = gomatrixserverlib.Allowed(event, authEvents, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return r.Queryer.QueryUserIDForSender(ctx, roomID, senderID)
}); err != nil {
isRejected = true
rejectionErr = err
logger.WithError(rejectionErr).Warnf("Event %s not allowed by auth events", event.EventID())
}
// At this point we are checking whether we know all of the prev events, and
// if we know the state before the prev events. This is necessary before we
// try to do `calculateAndSetState` on the event later, otherwise it will fail
@ -283,24 +301,6 @@ func (r *Inputer) processRoomEvent(
}
}
// Check that the auth events of the event are known.
// If they aren't then we will ask the federation API for them.
authEvents, _ := gomatrixserverlib.NewAuthEvents(nil)
knownEvents := map[string]*types.Event{}
if err = r.fetchAuthEvents(ctx, logger, roomInfo, virtualHost, headered, authEvents, knownEvents, serverRes.ServerNames); err != nil {
return fmt.Errorf("r.fetchAuthEvents: %w", err)
}
// Check if the event is allowed by its auth events. If it isn't then
// we consider the event to be "rejected" — it will still be persisted.
if err = gomatrixserverlib.Allowed(event, authEvents, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return r.Queryer.QueryUserIDForSender(ctx, roomID, senderID)
}); err != nil {
isRejected = true
rejectionErr = err
logger.WithError(rejectionErr).Warnf("Event %s not allowed by auth events", event.EventID())
}
// Accumulate the auth event NIDs.
authEventIDs := event.AuthEventIDs()
authEventNIDs := make([]types.EventNID, 0, len(authEventIDs))
@ -323,7 +323,7 @@ func (r *Inputer) processRoomEvent(
)
}
}
} else {
} else if !knownEvents[authEventID].Rejected {
authEventNIDs = append(authEventNIDs, knownEvents[authEventID].EventNID)
}
}
@ -704,15 +704,14 @@ func (r *Inputer) fetchAuthEvents(
}
ev := authEvents[0]
isRejected := false
if roomInfo != nil {
isRejected, err = r.DB.IsEventRejected(ctx, roomInfo.RoomNID, ev.EventID())
ev.Rejected, err = r.DB.IsEventRejected(ctx, roomInfo.RoomNID, ev.EventID())
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return fmt.Errorf("r.DB.IsEventRejected failed: %w", err)
}
}
known[authEventID] = &ev // don't take the pointer of the iterated event
if !isRejected {
if !ev.Rejected {
if err = auth.AddEvent(ev.PDU); err != nil {
return fmt.Errorf("auth.AddEvent: %w", err)
}
@ -744,8 +743,13 @@ func (r *Inputer) fetchAuthEvents(
return fmt.Errorf("no servers provided event auth for event ID %q, tried servers %v", event.EventID(), servers)
}
// Start with a clean state and see if we can auth with what the remote
// server told us. Otherwise earlier topologically sorted events could
// fail to be authed by more recent referenced ones.
auth.Clear()
// Reuse these to reduce allocations.
authEventNIDs := make([]types.EventNID, 0, 5)
_authEventNIDs := [5]types.EventNID{}
isRejected := false
nextAuthEvent:
for _, authEvent := range gomatrixserverlib.ReverseTopologicalOrdering(
@ -755,7 +759,11 @@ nextAuthEvent:
// If we already know about this event from the database then we don't
// need to store it again or do anything further with it, so just skip
// over it rather than wasting cycles.
if ev, ok := known[authEvent.EventID()]; ok && ev != nil {
if ev, ok := known[authEvent.EventID()]; ok && ev != nil && !ev.Rejected {
// Need to add to the auth set for the next event being processed.
if err := auth.AddEvent(authEvent); err != nil {
return fmt.Errorf("auth.AddEvent: %w", err)
}
continue nextAuthEvent
}
@ -770,11 +778,11 @@ nextAuthEvent:
// In order to store the new auth event, we need to know its auth chain
// as NIDs for the `auth_event_nids` column. Let's see if we can find those.
authEventNIDs = authEventNIDs[:0]
authEventNIDs := _authEventNIDs[:0]
for _, eventID := range authEvent.AuthEventIDs() {
knownEvent, ok := known[eventID]
if !ok {
continue nextAuthEvent
return fmt.Errorf("auth event ID %s not known but should be", eventID)
}
authEventNIDs = append(authEventNIDs, knownEvent.EventNID)
}
@ -821,6 +829,7 @@ nextAuthEvent:
// Now we know about this event, it was stored and the signatures were OK.
known[authEvent.EventID()] = &types.Event{
EventNID: eventNID,
Rejected: isRejected,
PDU: authEvent,
}
}

View file

@ -228,6 +228,7 @@ func (s StateAtEventAndReferences) EventIDs() string {
// It is when performing bulk event lookup in the database.
type Event struct {
EventNID EventNID
Rejected bool
gomatrixserverlib.PDU
}