mirror of
https://github.com/element-hq/dendrite.git
synced 2025-09-14 05:12:26 +03:00
refactor: update GMSL (#3058)
Sister PR to https://github.com/matrix-org/gomatrixserverlib/pull/364 Read this commit by commit to avoid going insane.
This commit is contained in:
parent
9fa39263c0
commit
72285b2659
306 changed files with 2117 additions and 1934 deletions
|
@ -26,6 +26,7 @@ import (
|
|||
|
||||
rsapi "github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/gomatrixserverlib/fclient"
|
||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||
|
||||
"github.com/matrix-org/gomatrix"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
|
@ -98,8 +99,8 @@ type DeviceListUpdater struct {
|
|||
api DeviceListUpdaterAPI
|
||||
producer KeyChangeProducer
|
||||
fedClient fedsenderapi.KeyserverFederationAPI
|
||||
workerChans []chan gomatrixserverlib.ServerName
|
||||
thisServer gomatrixserverlib.ServerName
|
||||
workerChans []chan spec.ServerName
|
||||
thisServer spec.ServerName
|
||||
|
||||
// When device lists are stale for a user, they get inserted into this map with a channel which `Update` will
|
||||
// block on or timeout via a select.
|
||||
|
@ -113,7 +114,7 @@ type DeviceListUpdater struct {
|
|||
type DeviceListUpdaterDatabase interface {
|
||||
// StaleDeviceLists returns a list of user IDs ending with the domains provided who have stale device lists.
|
||||
// If no domains are given, all user IDs with stale device lists are returned.
|
||||
StaleDeviceLists(ctx context.Context, domains []gomatrixserverlib.ServerName) ([]string, error)
|
||||
StaleDeviceLists(ctx context.Context, domains []spec.ServerName) ([]string, error)
|
||||
|
||||
// MarkDeviceListStale sets the stale bit for this user to isStale.
|
||||
MarkDeviceListStale(ctx context.Context, userID string, isStale bool) error
|
||||
|
@ -146,7 +147,7 @@ func NewDeviceListUpdater(
|
|||
process *process.ProcessContext, db DeviceListUpdaterDatabase,
|
||||
api DeviceListUpdaterAPI, producer KeyChangeProducer,
|
||||
fedClient fedsenderapi.KeyserverFederationAPI, numWorkers int,
|
||||
rsAPI rsapi.KeyserverRoomserverAPI, thisServer gomatrixserverlib.ServerName,
|
||||
rsAPI rsapi.KeyserverRoomserverAPI, thisServer spec.ServerName,
|
||||
) *DeviceListUpdater {
|
||||
return &DeviceListUpdater{
|
||||
process: process,
|
||||
|
@ -157,7 +158,7 @@ func NewDeviceListUpdater(
|
|||
producer: producer,
|
||||
fedClient: fedClient,
|
||||
thisServer: thisServer,
|
||||
workerChans: make([]chan gomatrixserverlib.ServerName, numWorkers),
|
||||
workerChans: make([]chan spec.ServerName, numWorkers),
|
||||
userIDToChan: make(map[string]chan bool),
|
||||
userIDToChanMu: &sync.Mutex{},
|
||||
rsAPI: rsAPI,
|
||||
|
@ -170,12 +171,12 @@ func (u *DeviceListUpdater) Start() error {
|
|||
// Allocate a small buffer per channel.
|
||||
// If the buffer limit is reached, backpressure will cause the processing of EDUs
|
||||
// to stop (in this transaction) until key requests can be made.
|
||||
ch := make(chan gomatrixserverlib.ServerName, 10)
|
||||
ch := make(chan spec.ServerName, 10)
|
||||
u.workerChans[i] = ch
|
||||
go u.worker(ch)
|
||||
}
|
||||
|
||||
staleLists, err := u.db.StaleDeviceLists(u.process.Context(), []gomatrixserverlib.ServerName{})
|
||||
staleLists, err := u.db.StaleDeviceLists(u.process.Context(), []spec.ServerName{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -195,7 +196,7 @@ func (u *DeviceListUpdater) Start() error {
|
|||
|
||||
// CleanUp removes stale device entries for users we don't share a room with anymore
|
||||
func (u *DeviceListUpdater) CleanUp() error {
|
||||
staleUsers, err := u.db.StaleDeviceLists(u.process.Context(), []gomatrixserverlib.ServerName{})
|
||||
staleUsers, err := u.db.StaleDeviceLists(u.process.Context(), []spec.ServerName{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -223,7 +224,7 @@ func (u *DeviceListUpdater) mutex(userID string) *sync.Mutex {
|
|||
|
||||
// ManualUpdate invalidates the device list for the given user and fetches the latest and tracks it.
|
||||
// Blocks until the device list is synced or the timeout is reached.
|
||||
func (u *DeviceListUpdater) ManualUpdate(ctx context.Context, serverName gomatrixserverlib.ServerName, userID string) error {
|
||||
func (u *DeviceListUpdater) ManualUpdate(ctx context.Context, serverName spec.ServerName, userID string) error {
|
||||
mu := u.mutex(userID)
|
||||
mu.Lock()
|
||||
err := u.db.MarkDeviceListStale(ctx, userID, true)
|
||||
|
@ -369,12 +370,12 @@ func (u *DeviceListUpdater) clearChannel(userID string) {
|
|||
}
|
||||
}
|
||||
|
||||
func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) {
|
||||
retries := make(map[gomatrixserverlib.ServerName]time.Time)
|
||||
func (u *DeviceListUpdater) worker(ch chan spec.ServerName) {
|
||||
retries := make(map[spec.ServerName]time.Time)
|
||||
retriesMu := &sync.Mutex{}
|
||||
// restarter goroutine which will inject failed servers into ch when it is time
|
||||
go func() {
|
||||
var serversToRetry []gomatrixserverlib.ServerName
|
||||
var serversToRetry []spec.ServerName
|
||||
for {
|
||||
serversToRetry = serversToRetry[:0] // reuse memory
|
||||
time.Sleep(time.Second)
|
||||
|
@ -413,7 +414,7 @@ func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) {
|
|||
}
|
||||
}
|
||||
|
||||
func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerName) (time.Duration, bool) {
|
||||
func (u *DeviceListUpdater) processServer(serverName spec.ServerName) (time.Duration, bool) {
|
||||
ctx := u.process.Context()
|
||||
logger := util.GetLogger(ctx).WithField("server_name", serverName)
|
||||
deviceListUpdateCount.WithLabelValues(string(serverName)).Inc()
|
||||
|
@ -421,7 +422,7 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam
|
|||
waitTime := defaultWaitTime // How long should we wait to try again?
|
||||
successCount := 0 // How many user requests failed?
|
||||
|
||||
userIDs, err := u.db.StaleDeviceLists(ctx, []gomatrixserverlib.ServerName{serverName})
|
||||
userIDs, err := u.db.StaleDeviceLists(ctx, []spec.ServerName{serverName})
|
||||
if err != nil {
|
||||
logger.WithError(err).Error("Failed to load stale device lists")
|
||||
return waitTime, true
|
||||
|
@ -457,7 +458,7 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam
|
|||
return waitTime, !allUsersSucceeded
|
||||
}
|
||||
|
||||
func (u *DeviceListUpdater) processServerUser(ctx context.Context, serverName gomatrixserverlib.ServerName, userID string) (time.Duration, error) {
|
||||
func (u *DeviceListUpdater) processServerUser(ctx context.Context, serverName spec.ServerName, userID string) (time.Duration, error) {
|
||||
ctx, cancel := context.WithTimeout(ctx, requestTimeout)
|
||||
defer cancel()
|
||||
logger := util.GetLogger(ctx).WithFields(logrus.Fields{
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue