Add message stats to reporting (#2748)

Since we're now listening on the `OutputRoomEvent` stream, we are able
to store messages stats.
This commit is contained in:
Till 2022-11-02 11:18:11 +01:00 committed by GitHub
parent b367cfeddf
commit 86b25a6337
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 383 additions and 11 deletions

View file

@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"strings"
"sync"
"time"
"github.com/matrix-org/gomatrixserverlib"
@ -23,6 +24,7 @@ import (
"github.com/matrix-org/dendrite/userapi/producers"
"github.com/matrix-org/dendrite/userapi/storage"
"github.com/matrix-org/dendrite/userapi/storage/tables"
userAPITypes "github.com/matrix-org/dendrite/userapi/types"
"github.com/matrix-org/dendrite/userapi/util"
)
@ -36,6 +38,11 @@ type OutputRoomEventConsumer struct {
topic string
pgClient pushgateway.Client
syncProducer *producers.SyncAPI
msgCounts map[gomatrixserverlib.ServerName]userAPITypes.MessageStats
roomCounts map[gomatrixserverlib.ServerName]map[string]bool // map from serverName to map from rommID to "isEncrypted"
lastUpdate time.Time
countsLock sync.Mutex
serverName gomatrixserverlib.ServerName
}
func NewOutputRoomEventConsumer(
@ -57,6 +64,11 @@ func NewOutputRoomEventConsumer(
pgClient: pgClient,
rsAPI: rsAPI,
syncProducer: syncProducer,
msgCounts: map[gomatrixserverlib.ServerName]userAPITypes.MessageStats{},
roomCounts: map[gomatrixserverlib.ServerName]map[string]bool{},
lastUpdate: time.Now(),
countsLock: sync.Mutex{},
serverName: cfg.Matrix.ServerName,
}
}
@ -88,6 +100,10 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms
return true
}
if s.cfg.Matrix.ReportStats.Enabled {
go s.storeMessageStats(ctx, event.Type(), event.Sender(), event.RoomID())
}
log.WithFields(log.Fields{
"event_id": event.EventID(),
"event_type": event.Type(),
@ -107,6 +123,68 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms
return true
}
func (s *OutputRoomEventConsumer) storeMessageStats(ctx context.Context, eventType, eventSender, roomID string) {
s.countsLock.Lock()
defer s.countsLock.Unlock()
// reset the roomCounts on a day change
if s.lastUpdate.Day() != time.Now().Day() {
s.roomCounts[s.serverName] = make(map[string]bool)
s.lastUpdate = time.Now()
}
_, sender, err := gomatrixserverlib.SplitID('@', eventSender)
if err != nil {
return
}
msgCount := s.msgCounts[s.serverName]
roomCount := s.roomCounts[s.serverName]
if roomCount == nil {
roomCount = make(map[string]bool)
}
switch eventType {
case "m.room.message":
roomCount[roomID] = false
msgCount.Messages++
if sender == s.serverName {
msgCount.SentMessages++
}
case "m.room.encrypted":
roomCount[roomID] = true
msgCount.MessagesE2EE++
if sender == s.serverName {
msgCount.SentMessagesE2EE++
}
default:
return
}
s.msgCounts[s.serverName] = msgCount
s.roomCounts[s.serverName] = roomCount
for serverName, stats := range s.msgCounts {
var normalRooms, encryptedRooms int64 = 0, 0
for _, isEncrypted := range s.roomCounts[s.serverName] {
if isEncrypted {
encryptedRooms++
} else {
normalRooms++
}
}
err := s.db.UpsertDailyRoomsMessages(ctx, serverName, stats, normalRooms, encryptedRooms)
if err != nil {
log.WithError(err).Errorf("failed to upsert daily messages")
}
// Clear stats if we successfully stored it
if err == nil {
stats.Messages = 0
stats.SentMessages = 0
stats.MessagesE2EE = 0
stats.SentMessagesE2EE = 0
s.msgCounts[serverName] = stats
}
}
}
func (s *OutputRoomEventConsumer) processMessage(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, streamPos uint64) error {
members, roomSize, err := s.localRoomMembers(ctx, event.RoomID())
if err != nil {