Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf!: Optimize Filter Initialization with Concurrent Processing #6106

Open
wants to merge 8 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
253 changes: 21 additions & 232 deletions protocol/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"database/sql"
"encoding/json"
"fmt"
"math/rand"
"os"
"strconv"
"strings"
Expand All @@ -33,7 +32,6 @@ import (
utils "github.com/status-im/status-go/common"
"github.com/status-im/status-go/connection"
"github.com/status-im/status-go/contracts"
"github.com/status-im/status-go/deprecation"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/images"
Expand All @@ -44,7 +42,6 @@ import (
"github.com/status-im/status-go/multiaccounts/settings"
"github.com/status-im/status-go/protocol/anonmetrics"
"github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/common/shard"
"github.com/status-im/status-go/protocol/communities"
"github.com/status-im/status-go/protocol/encryption"
"github.com/status-im/status-go/protocol/encryption/multidevice"
Expand Down Expand Up @@ -920,20 +917,7 @@ func (m *Messenger) Start() (*MessengerResponse, error) {
}
}

joinedCommunities, err := m.communitiesManager.Joined()
if err != nil {
return nil, err
}

for _, joinedCommunity := range joinedCommunities {
// resume importing message history archives in case
// imports have been interrupted previously
err := m.resumeHistoryArchivesImport(joinedCommunity.ID())
if err != nil {
return nil, err
}
}
m.enableHistoryArchivesImportAfterDelay()
go m.startHistoryArchivesImportLoop()

if m.httpServer != nil {
err = m.httpServer.Start()
Expand Down Expand Up @@ -974,6 +958,26 @@ func (m *Messenger) Start() (*MessengerResponse, error) {
return response, nil
}

func (m *Messenger) startHistoryArchivesImportLoop() {
defer gocommon.LogOnPanic()
joinedCommunities, err := m.communitiesManager.Joined()
if err != nil {
m.logger.Error("failed to get joined communities", zap.Error(err))
return
}

for _, joinedCommunity := range joinedCommunities {
// resume importing message history archives in case
// imports have been interrupted previously
err := m.resumeHistoryArchivesImport(joinedCommunity.ID())
if err != nil {
m.logger.Error("failed to resume history archives import", zap.Error(err))
continue
}
}
m.enableHistoryArchivesImportAfterDelay()
}

func (m *Messenger) SetMediaServer(server *server.MediaServer) {
m.httpServer = server
m.communitiesManager.SetMediaServer(server)
Expand Down Expand Up @@ -1775,221 +1779,6 @@ func (m *Messenger) handlePushNotificationClientRegistrations(c chan struct{}) {
}()
}

// InitFilters analyzes chats and contacts in order to setup filters
// which are responsible for retrieving messages.
func (m *Messenger) InitFilters() error {
qfrank marked this conversation as resolved.
Show resolved Hide resolved

// Seed the for color generation
rand.Seed(time.Now().Unix())

logger := m.logger.With(zap.String("site", "Init"))

// Community requests will arrive in this pubsub topic
err := m.SubscribeToPubsubTopic(shard.DefaultNonProtectedPubsubTopic(), nil)
if err != nil {
return err
}

var (
filtersToInit []transport.FiltersToInitialize
publicKeys []*ecdsa.PublicKey
)

joinedCommunities, err := m.communitiesManager.Joined()
if err != nil {
return err
}
for _, org := range joinedCommunities {
// the org advertise on the public topic derived by the pk
filtersToInit = append(filtersToInit, m.DefaultFilters(org)...)

// This is for status-go versions that didn't have `CommunitySettings`
// We need to ensure communities that existed before community settings
// were introduced will have community settings as well
exists, err := m.communitiesManager.CommunitySettingsExist(org.ID())
if err != nil {
logger.Warn("failed to check if community settings exist", zap.Error(err))
continue
}

if !exists {
communitySettings := communities.CommunitySettings{
CommunityID: org.IDString(),
HistoryArchiveSupportEnabled: true,
}

err = m.communitiesManager.SaveCommunitySettings(communitySettings)
if err != nil {
logger.Warn("failed to save community settings", zap.Error(err))
}
continue
}

// In case we do have settings, but the history archive support is disabled
// for this community, we enable it, as this should be the default for all
// non-admin communities
communitySettings, err := m.communitiesManager.GetCommunitySettingsByID(org.ID())
if err != nil {
logger.Warn("failed to fetch community settings", zap.Error(err))
continue
}

if !org.IsControlNode() && !communitySettings.HistoryArchiveSupportEnabled {
communitySettings.HistoryArchiveSupportEnabled = true
err = m.communitiesManager.UpdateCommunitySettings(*communitySettings)
if err != nil {
logger.Warn("failed to update community settings", zap.Error(err))
}
}
}

spectatedCommunities, err := m.communitiesManager.Spectated()
if err != nil {
return err
}
for _, org := range spectatedCommunities {
filtersToInit = append(filtersToInit, m.DefaultFilters(org)...)
}

// Get chat IDs and public keys from the existing chats.
// TODO: Get only active chats by the query.
chats, err := m.persistence.Chats()
if err != nil {
return err
}

communityInfo := make(map[string]*communities.Community)
var validChats []*Chat
for _, chat := range chats {
if err := chat.Validate(); err != nil {
logger.Warn("failed to validate chat", zap.Error(err))
continue
}
validChats = append(validChats, chat)
}

m.initChatsFirstMessageTimestamp(communityInfo, validChats)

for _, chat := range validChats {
if !chat.Active || chat.Timeline() {
m.allChats.Store(chat.ID, chat)
continue
}

switch chat.ChatType {
case ChatTypePublic, ChatTypeProfile:
filtersToInit = append(filtersToInit, transport.FiltersToInitialize{ChatID: chat.ID})
case ChatTypeCommunityChat:
community, ok := communityInfo[chat.CommunityID]
if !ok {
community, err = m.communitiesManager.GetByIDString(chat.CommunityID)
if err != nil {
return err
}
communityInfo[chat.CommunityID] = community
}

if chat.UnviewedMessagesCount > 0 || chat.UnviewedMentionsCount > 0 {
// Make sure the unread count is 0 for the channels the user cannot view
// It's possible that the users received messages to a channel before permissions were added
canView := community.CanView(&m.identity.PublicKey, chat.CommunityChatID())

if !canView {
chat.UnviewedMessagesCount = 0
chat.UnviewedMentionsCount = 0
}
}

filtersToInit = append(filtersToInit, transport.FiltersToInitialize{ChatID: chat.ID, PubsubTopic: community.PubsubTopic()})
case ChatTypeOneToOne:
pk, err := chat.PublicKey()
if err != nil {
return err
}
publicKeys = append(publicKeys, pk)
case ChatTypePrivateGroupChat:
for _, member := range chat.Members {
publicKey, err := member.PublicKey()
if err != nil {
return errors.Wrapf(err, "invalid public key for member %s in chat %s", member.ID, chat.Name)
}
publicKeys = append(publicKeys, publicKey)
}
default:
return errors.New("invalid chat type")
}

m.allChats.Store(chat.ID, chat)
}

// Timeline and profile chats are deprecated.
// This code can be removed after some reasonable time.

// upsert timeline chat
if !deprecation.ChatProfileDeprecated {
err = m.ensureTimelineChat()
if err != nil {
return err
}
}

// upsert profile chat
if !deprecation.ChatTimelineDeprecated {
err = m.ensureMyOwnProfileChat()
if err != nil {
return err
}
}

// Get chat IDs and public keys from the contacts.
contacts, err := m.persistence.Contacts()
if err != nil {
return err
}
for idx, contact := range contacts {
if err = m.updateContactImagesURL(contact); err != nil {
return err
}
m.allContacts.Store(contact.ID, contacts[idx])
// We only need filters for contacts added by us and not blocked.
if !contact.added() || contact.Blocked {
continue
}
publicKey, err := contact.PublicKey()
if err != nil {
logger.Error("failed to get contact's public key", zap.Error(err))
continue
}
publicKeys = append(publicKeys, publicKey)
}

_, err = m.transport.InitFilters(filtersToInit, publicKeys)
if err != nil {
return err
}

// Init filters for the communities we control
var communityFiltersToInitialize []transport.CommunityFilterToInitialize
controlledCommunities, err := m.communitiesManager.Controlled()
if err != nil {
return err
}

for _, c := range controlledCommunities {
communityFiltersToInitialize = append(communityFiltersToInitialize, transport.CommunityFilterToInitialize{
Shard: c.Shard(),
PrivKey: c.PrivateKey(),
})
}

_, err = m.InitCommunityFilters(communityFiltersToInitialize)
if err != nil {
return err
}

return nil
}

// Shutdown takes care of ensuring a clean shutdown of Messenger
func (m *Messenger) Shutdown() (err error) {
if m == nil {
Expand Down
27 changes: 25 additions & 2 deletions protocol/messenger_chats.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ import (
"github.com/status-im/status-go/protocol/transport"
)

type ChatPreviewFilterType int

const (
ChatPreviewFilterTypeAll ChatPreviewFilterType = iota
ChatPreviewFilterTypeCommunity
ChatPreviewFilterTypeNonCommunity
)

func (m *Messenger) getOneToOneAndNextClock(contact *Contact) (*Chat, uint64, error) {
chat, ok := m.allChats.Load(contact.ID)
if !ok {
Expand Down Expand Up @@ -46,10 +54,25 @@ func (m *Messenger) Chats() []*Chat {
return chats
}

func (m *Messenger) ChatsPreview() []*ChatPreview {
// ChatsPreview returns a list of chat previews.
qfrank marked this conversation as resolved.
Show resolved Hide resolved
// When onlyCommunityChats is nil, returns all chats
// When onlyCommunityChats is true, only returns community chats
// When onlyCommunityChats is false, returns all non-community chats
func (m *Messenger) ChatsPreview(filterPointer *ChatPreviewFilterType) []*ChatPreview {
qfrank marked this conversation as resolved.
Show resolved Hide resolved
var chats []*ChatPreview

filter := ChatPreviewFilterTypeAll
if filterPointer != nil {
filter = *filterPointer
}
m.allChats.Range(func(chatID string, chat *Chat) (shouldContinue bool) {
// Skip if chat doesn't match the filter
isCommunityChat := chat.ChatType == ChatTypeCommunityChat
if filter == ChatPreviewFilterTypeCommunity && !isCommunityChat {
return true
}
if filter == ChatPreviewFilterTypeNonCommunity && isCommunityChat {
return true
}
if chat.Active || chat.Muted {
chatPreview := &ChatPreview{
ID: chat.ID,
Expand Down
Loading