Skip to content

Commit

Permalink
chore_: address feedback from igor
Browse files Browse the repository at this point in the history
  • Loading branch information
qfrank committed Nov 22, 2024
1 parent 42023d3 commit 5a6e8db
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 50 deletions.
33 changes: 33 additions & 0 deletions protocol/communities/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4169,6 +4169,39 @@ func (m *Manager) CommunitySettingsExist(id types.HexBytes) (bool, error) {
return m.persistence.CommunitySettingsExist(id)
}

func (m *Manager) EnsureCommunitySettings(org *Community) error {
// This is for status-go versions that didn't have `CommunitySettings`
// We need to ensure communities that existed before community settings
// were introduced will have community settings as well
exists, err := m.CommunitySettingsExist(org.ID())
if err != nil {
return err
}

if !exists {
communitySettings := CommunitySettings{
CommunityID: org.IDString(),
HistoryArchiveSupportEnabled: true,
}
return m.SaveCommunitySettings(communitySettings)
}

// In case we do have settings, but the history archive support is disabled
// for this community, we enable it, as this should be the default for all
// non-admin communities
communitySettings, err := m.GetCommunitySettingsByID(org.ID())
if err != nil {
return err
}

if !org.IsControlNode() && !communitySettings.HistoryArchiveSupportEnabled {
communitySettings.HistoryArchiveSupportEnabled = true
return m.UpdateCommunitySettings(*communitySettings)
}

return nil
}

func (m *Manager) DeleteCommunitySettings(id types.HexBytes) error {
return m.persistence.DeleteCommunitySettings(id)
}
Expand Down
74 changes: 24 additions & 50 deletions protocol/messenger_filter_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package protocol

import (
"crypto/ecdsa"
stderrors "errors"
"math/rand"
"sync"
"time"

"github.com/pkg/errors"

gocommon "github.com/status-im/status-go/common"
"github.com/status-im/status-go/deprecation"
"github.com/status-im/status-go/protocol/common/shard"
Expand Down Expand Up @@ -51,14 +53,9 @@ func (m *Messenger) collectFiltersAndKeys() ([]transport.FiltersToInitialize, []
wg.Wait()
close(filtersCh)
close(publicKeysCh)
close(errCh)

select {
case err := <-errCh:
return nil, nil, err
default:
}

return m.collectResults(filtersCh, publicKeysCh)
return m.collectResults(filtersCh, publicKeysCh, errCh)
}

func (m *Messenger) processJoinedCommunities(wg *sync.WaitGroup, filtersCh chan<- []transport.FiltersToInitialize, errCh chan<- error) {
Expand All @@ -77,53 +74,20 @@ func (m *Messenger) processJoinedCommunities(wg *sync.WaitGroup, filtersCh chan<

func (m *Messenger) processCommunitiesSettings(communities []*communities.Community) []transport.FiltersToInitialize {
logger := m.logger.With(zap.String("site", "processCommunitiesSettings"))
var filtersToInit []transport.FiltersToInitialize
filtersToInit := make([]transport.FiltersToInitialize, 0, len(communities))

for _, org := range communities {
// the org advertise on the public topic derived by the pk
filtersToInit = append(filtersToInit, m.DefaultFilters(org)...)

if err := m.ensureCommunitySettings(org); err != nil {
if err := m.communitiesManager.EnsureCommunitySettings(org); err != nil {
logger.Warn("failed to process community settings", zap.Error(err))
}
}

return filtersToInit
}

func (m *Messenger) ensureCommunitySettings(org *communities.Community) error {
// This is for status-go versions that didn't have `CommunitySettings`
// We need to ensure communities that existed before community settings
// were introduced will have community settings as well
exists, err := m.communitiesManager.CommunitySettingsExist(org.ID())
if err != nil {
return err
}

if !exists {
communitySettings := communities.CommunitySettings{
CommunityID: org.IDString(),
HistoryArchiveSupportEnabled: true,
}
return m.communitiesManager.SaveCommunitySettings(communitySettings)
}

// In case we do have settings, but the history archive support is disabled
// for this community, we enable it, as this should be the default for all
// non-admin communities
communitySettings, err := m.communitiesManager.GetCommunitySettingsByID(org.ID())
if err != nil {
return err
}

if !org.IsControlNode() && !communitySettings.HistoryArchiveSupportEnabled {
communitySettings.HistoryArchiveSupportEnabled = true
return m.communitiesManager.UpdateCommunitySettings(*communitySettings)
}

return nil
}

func (m *Messenger) processSpectatedCommunities(wg *sync.WaitGroup, filtersCh chan<- []transport.FiltersToInitialize, errCh chan<- error) {
defer gocommon.LogOnPanic()
defer wg.Done()
Expand All @@ -134,7 +98,7 @@ func (m *Messenger) processSpectatedCommunities(wg *sync.WaitGroup, filtersCh ch
return
}

var filtersToInit []transport.FiltersToInitialize
filtersToInit := make([]transport.FiltersToInitialize, 0, len(spectatedCommunities))
for _, org := range spectatedCommunities {
filtersToInit = append(filtersToInit, m.DefaultFilters(org)...)
}
Expand All @@ -153,7 +117,7 @@ func (m *Messenger) processChats(wg *sync.WaitGroup, filtersCh chan<- []transpor
return
}

validChats, communityInfo := m.validateAndProcessChats(chats)
validChats, communityInfo := m.validateAndInitChatsFirstMessageTimestamp(chats)
filters, publicKeys, err := m.processValidChats(validChats, communityInfo)
if err != nil {
errCh <- err
Expand All @@ -168,9 +132,9 @@ func (m *Messenger) processChats(wg *sync.WaitGroup, filtersCh chan<- []transpor
}
}

func (m *Messenger) validateAndProcessChats(chats []*Chat) ([]*Chat, map[string]*communities.Community) {
logger := m.logger.With(zap.String("site", "validateAndProcessChats"))
communityInfo := make(map[string]*communities.Community)
func (m *Messenger) validateAndInitChatsFirstMessageTimestamp(chats []*Chat) ([]*Chat, map[string]*communities.Community) {
logger := m.logger.With(zap.String("site", "validateAndInitChatsFirstMessageTimestamp"))
communitiesCache := make(map[string]*communities.Community)
var validChats []*Chat

for _, chat := range chats {
Expand All @@ -181,8 +145,8 @@ func (m *Messenger) validateAndProcessChats(chats []*Chat) ([]*Chat, map[string]
validChats = append(validChats, chat)
}

m.initChatsFirstMessageTimestamp(communityInfo, validChats)
return validChats, communityInfo
m.initChatsFirstMessageTimestamp(communitiesCache, validChats)
return validChats, communitiesCache
}

func (m *Messenger) processValidChats(validChats []*Chat, communityInfo map[string]*communities.Community) ([]transport.FiltersToInitialize, []*ecdsa.PublicKey, error) {
Expand Down Expand Up @@ -361,7 +325,17 @@ func (m *Messenger) processControlledCommunities(wg *sync.WaitGroup, errCh chan<
}
}

func (m *Messenger) collectResults(filtersCh <-chan []transport.FiltersToInitialize, publicKeysCh <-chan []*ecdsa.PublicKey) ([]transport.FiltersToInitialize, []*ecdsa.PublicKey, error) {
func (m *Messenger) collectResults(filtersCh <-chan []transport.FiltersToInitialize, publicKeysCh <-chan []*ecdsa.PublicKey, errCh <-chan error) ([]transport.FiltersToInitialize, []*ecdsa.PublicKey, error) {
var errs []error
for err := range errCh {
m.logger.Error("error collecting filters and public keys", zap.Error(err))
errs = append(errs, err)
}

if len(errs) > 0 {
return nil, nil, stderrors.Join(errs...)
}

var allFilters []transport.FiltersToInitialize
var allPublicKeys []*ecdsa.PublicKey

Expand Down

0 comments on commit 5a6e8db

Please sign in to comment.