Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into feature/p2p
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy committed Jun 28, 2024
2 parents d2090c6 + 63c0d5b commit 8b291b0
Show file tree
Hide file tree
Showing 22 changed files with 150 additions and 56 deletions.
40 changes: 36 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ endif
SRCPATH := $(shell pwd)
ARCH := $(shell ./scripts/archtype.sh)
OS_TYPE := $(shell ./scripts/ostype.sh)
# overrides for cross-compiling platform-specific binaries
ifdef CROSS_COMPILE_ARCH
ARCH := $(CROSS_COMPILE_ARCH)
GO_INSTALL := CGO_ENABLED=1 GOOS=$(OS_TYPE) GOARCH=$(ARCH) go build -o $(GOPATH1)/bin-$(OS_TYPE)-$(ARCH)
else
GO_INSTALL := go install
endif
S3_RELEASE_BUCKET = $$S3_RELEASE_BUCKET

GOLANG_VERSIONS := $(shell ./scripts/get_golang_version.sh all)
Expand Down Expand Up @@ -153,10 +160,35 @@ crypto/libs/$(OS_TYPE)/$(ARCH)/lib/libsodium.a:
cp -R crypto/libsodium-fork/. crypto/copies/$(OS_TYPE)/$(ARCH)/libsodium-fork
cd crypto/copies/$(OS_TYPE)/$(ARCH)/libsodium-fork && \
./autogen.sh --prefix $(SRCPATH)/crypto/libs/$(OS_TYPE)/$(ARCH) && \
./configure --disable-shared --prefix="$(SRCPATH)/crypto/libs/$(OS_TYPE)/$(ARCH)" && \
./configure --disable-shared --prefix="$(SRCPATH)/crypto/libs/$(OS_TYPE)/$(ARCH)" $(EXTRA_CONFIGURE_FLAGS) && \
$(MAKE) && \
$(MAKE) install

universal:
ifeq ($(OS_TYPE),darwin)
# build amd64 Mac binaries
mkdir -p $(GOPATH1)/bin-darwin-amd64
CROSS_COMPILE_ARCH=amd64 GOBIN=$(GOPATH1)/bin-darwin-amd64 MACOSX_DEPLOYMENT_TARGET=12.0 EXTRA_CONFIGURE_FLAGS='CFLAGS="-arch x86_64 -mmacos-version-min=12.0" --host=x86_64-apple-darwin' $(MAKE)

# build arm64 Mac binaries
mkdir -p $(GOPATH1)/bin-darwin-arm64
CROSS_COMPILE_ARCH=arm64 GOBIN=$(GOPATH1)/bin-darwin-arm64 MACOSX_DEPLOYMENT_TARGET=12.0 EXTRA_CONFIGURE_FLAGS='CFLAGS="-arch arm64 -mmacos-version-min=12.0" --host=aarch64-apple-darwin' $(MAKE)

# lipo together
mkdir -p $(GOPATH1)/bin-darwin-universal
for binary in $$(ls $(GOPATH1)/bin-darwin-arm64); do \
if [ -f $(GOPATH1)/bin-darwin-amd64/$$binary ]; then \
lipo -create -output $(GOPATH1)/bin-darwin-universal/$$binary \
$(GOPATH1)/bin-darwin-arm64/$$binary \
$(GOPATH1)/bin-darwin-amd64/$$binary; \
else \
echo "Warning: Binary $$binary exists in arm64 but not in amd64"; \
fi \
done
else
$(error OS_TYPE must be darwin for universal builds)
endif

deps:
./scripts/check_deps.sh

Expand Down Expand Up @@ -212,11 +244,11 @@ ${GOCACHE}/file.txt:
touch "${GOCACHE}"/file.txt

buildsrc: check-go-version crypto/libs/$(OS_TYPE)/$(ARCH)/lib/libsodium.a node_exporter NONGO_BIN ${GOCACHE}/file.txt
go install $(GOTRIMPATH) $(GOTAGS) $(GOBUILDMODE) -ldflags="$(GOLDFLAGS)" ./...
$(GO_INSTALL) $(GOTRIMPATH) $(GOTAGS) $(GOBUILDMODE) -ldflags="$(GOLDFLAGS)" ./...

buildsrc-special:
cd tools/block-generator && \
go install $(GOTRIMPATH) $(GOTAGS) $(GOBUILDMODE) -ldflags="$(GOLDFLAGS)" ./...
$(GO_INSTALL) $(GOTRIMPATH) $(GOTAGS) $(GOBUILDMODE) -ldflags="$(GOLDFLAGS)" ./...

check-go-version:
./scripts/check_golang_version.sh build
Expand Down Expand Up @@ -331,7 +363,7 @@ dump: $(addprefix gen/,$(addsuffix /genesis.dump, $(NETWORKS)))
install: build
scripts/dev_install.sh -p $(GOPATH1)/bin

.PHONY: default fmt lint check_shell sanity cover prof deps build test fulltest shorttest clean cleango deploy node_exporter install %gen gen NONGO_BIN check-go-version rebuild_kmd_swagger
.PHONY: default fmt lint check_shell sanity cover prof deps build test fulltest shorttest clean cleango deploy node_exporter install %gen gen NONGO_BIN check-go-version rebuild_kmd_swagger universal

###### TARGETS FOR CICD PROCESS ######
include ./scripts/release/mule/Makefile.mule
Expand Down
2 changes: 2 additions & 0 deletions agreement/demux.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/logging/logspec"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/util"
)

const (
Expand Down Expand Up @@ -113,6 +114,7 @@ func (d *demux) tokenizeMessages(ctx context.Context, net Network, tag protocol.
defer func() {
close(decoded)
}()
util.SetGoroutineLabels("tokenizeTag", string(tag))
for {
select {
case raw, ok := <-networkMessages:
Expand Down
3 changes: 1 addition & 2 deletions catchup/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ func buildTestLedger(t *testing.T, blk bookkeeping.Block) (ledger *data.Ledger,
cfg := config.GetDefaultLocal()
cfg.Archival = true
ledger, err = data.LoadLedger(
log, t.Name(), inMem, protocol.ConsensusCurrentVersion, genBal, "", genHash,
nil, cfg,
log, t.Name(), inMem, protocol.ConsensusCurrentVersion, genBal, "", genHash, cfg,
)
if err != nil {
t.Fatal("couldn't build ledger", err)
Expand Down
4 changes: 2 additions & 2 deletions catchup/pref_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func BenchmarkServiceFetchBlocks(b *testing.B) {
for i := 0; i < b.N; i++ {
inMem := true
prefix := b.Name() + "empty" + strconv.Itoa(i)
local, err := data.LoadLedger(logging.TestingLog(b), prefix, inMem, protocol.ConsensusCurrentVersion, genesisBalances, "", crypto.Digest{}, nil, cfg)
local, err := data.LoadLedger(logging.TestingLog(b), prefix, inMem, protocol.ConsensusCurrentVersion, genesisBalances, "", crypto.Digest{}, cfg)
require.NoError(b, err)

// Make Service
Expand Down Expand Up @@ -150,7 +150,7 @@ func benchenv(t testing.TB, numAccounts, numBlocks int) (ledger, emptyLedger *da
cfg := config.GetDefaultLocal()
cfg.Archival = true
prefix := t.Name() + "empty"
emptyLedger, err = data.LoadLedger(logging.TestingLog(t), prefix, inMem, protocol.ConsensusCurrentVersion, genesisBalances, "", crypto.Digest{}, nil, cfg)
emptyLedger, err = data.LoadLedger(logging.TestingLog(t), prefix, inMem, protocol.ConsensusCurrentVersion, genesisBalances, "", crypto.Digest{}, cfg)
require.NoError(t, err)

ledger, err = datatest.FabricateLedger(logging.TestingLog(t), t.Name(), parts, genesisBalances, emptyLedger.LastRound()+basics.Round(numBlocks))
Expand Down
2 changes: 1 addition & 1 deletion daemon/algod/api/server/v2/test/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func testingenvWithBalances(t testing.TB, minMoneyAtStart, maxMoneyAtStart, numA
const inMem = true
cfg := config.GetDefaultLocal()
cfg.Archival = true
ledger, err := data.LoadLedger(logging.Base(), t.Name(), inMem, protocol.ConsensusFuture, bootstrap, genesisID, genesisHash, nil, cfg)
ledger, err := data.LoadLedger(logging.Base(), t.Name(), inMem, protocol.ConsensusFuture, bootstrap, genesisID, genesisHash, cfg)
if err != nil {
panic(err)
}
Expand Down
11 changes: 11 additions & 0 deletions daemon/algod/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"os"
"os/signal"
"path/filepath"
"runtime"
"strings"
"syscall"
"time"
Expand Down Expand Up @@ -230,6 +231,16 @@ func (s *Server) Initialize(cfg config.Local, phonebookAddresses []string, genes
NodeExporterPath: cfg.NodeExporterPath,
})

var currentVersion = config.GetCurrentVersion()
var algodBuildInfoGauge = metrics.MakeGauge(metrics.MetricName{Name: "algod_build_info", Description: "Algod build info"})
algodBuildInfoGauge.SetLabels(1, map[string]string{
"version": currentVersion.String(),
"goarch": runtime.GOARCH,
"goos": runtime.GOOS,
"commit": currentVersion.CommitHash,
"channel": currentVersion.Channel,
})

var serverNode ServerNode
if cfg.EnableFollowMode {
var followerNode *node.AlgorandFollowerNode
Expand Down
2 changes: 1 addition & 1 deletion data/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func testingenv(t testing.TB, numAccounts, numTxs int, offlineAccounts bool) (*L
const inMem = true
cfg := config.GetDefaultLocal()
cfg.Archival = true
ledger, err := LoadLedger(logging.Base(), t.Name(), inMem, protocol.ConsensusCurrentVersion, bootstrap, genesisID, genesisHash, nil, cfg)
ledger, err := LoadLedger(logging.Base(), t.Name(), inMem, protocol.ConsensusCurrentVersion, bootstrap, genesisID, genesisHash, cfg)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion data/datatest/fabricateLedger.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func FabricateLedger(log logging.Logger, ledgerName string, accounts []account.P
const inMem = true
cfg := config.GetDefaultLocal()
cfg.Archival = true
ledger, err := data.LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genesis, "", crypto.Digest{}, nil, cfg)
ledger, err := data.LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genesis, "", crypto.Digest{}, cfg)
if err != nil {
return nil, err
}
Expand Down
3 changes: 1 addition & 2 deletions data/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type roundSeed struct {
func LoadLedger[T string | ledger.DirsAndPrefix](
log logging.Logger, dir T, memory bool,
genesisProto protocol.ConsensusVersion, genesisBal bookkeeping.GenesisBalances, genesisID string, genesisHash crypto.Digest,
blockListeners []ledgercore.BlockListener, cfg config.Local,
cfg config.Local,
) (*Ledger, error) {
if genesisBal.Balances == nil {
genesisBal.Balances = make(map[basics.Address]basics.AccountData)
Expand Down Expand Up @@ -115,7 +115,6 @@ func LoadLedger[T string | ledger.DirsAndPrefix](
}

l.Ledger = ll
l.RegisterBlockListeners(blockListeners)
return l, nil
}

Expand Down
18 changes: 9 additions & 9 deletions data/txHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func BenchmarkTxHandlerProcessing(b *testing.B) {
cfg.Archival = true
cfg.TxBacklogReservedCapacityPerPeer = 1
cfg.IncomingConnectionsLimit = 10
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg)
require.NoError(b, err)
defer ledger.Close()

Expand Down Expand Up @@ -1028,7 +1028,7 @@ func TestTxHandlerProcessIncomingCacheTxPoolDrop(t *testing.T) {
cfg.Archival = true
cfg.EnableTxBacklogRateLimiting = false
cfg.TxIncomingFilteringFlags = 3 // txFilterRawMsg + txFilterCanonical
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg)
require.NoError(t, err)
defer ledger.Close()

Expand Down Expand Up @@ -1197,7 +1197,7 @@ func incomingTxHandlerProcessing(maxGroupSize, numberOfTransactionGroups int, t
cfg := config.GetDefaultLocal()
cfg.Archival = true
cfg.EnableTxBacklogRateLimiting = false
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg)
require.NoError(t, err)
defer ledger.Close()

Expand Down Expand Up @@ -1642,7 +1642,7 @@ func (g *txGenerator) makeLedger(tb testing.TB, cfg config.Local, log logging.Lo
ledgerName := fmt.Sprintf("%s-in_mem-w_inv=%d", namePrefix, ivrString)
ledgerName = strings.Replace(ledgerName, "#", "-", 1)
const inMem = true
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg)
require.NoError(tb, err)
return ledger
}
Expand Down Expand Up @@ -2184,7 +2184,7 @@ func TestTxHandlerRememberReportErrorsWithTxPool(t *testing.T) { //nolint:parall
cfg := config.GetDefaultLocal()
cfg.Archival = true
cfg.TxPoolSize = config.MaxTxGroupSize + 1
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg)
require.NoError(t, err)
defer ledger.Close()

Expand Down Expand Up @@ -2420,7 +2420,7 @@ func TestTxHandlerRestartWithBacklogAndTxPool(t *testing.T) { //nolint:parallelt
const inMem = true
cfg := config.GetDefaultLocal()
cfg.Archival = true
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg)
require.NoError(t, err)
defer ledger.Ledger.Close()

Expand Down Expand Up @@ -2525,7 +2525,7 @@ func TestTxHandlerAppRateLimiterERLEnabled(t *testing.T) {
cfg.TxBacklogServiceRateWindowSeconds = 1
cfg.TxBacklogAppTxPerSecondRate = 3
cfg.TxBacklogSize = 3
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, bookkeeping.GenesisBalances{}, genesisID, genesisHash, nil, cfg)
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, bookkeeping.GenesisBalances{}, genesisID, genesisHash, cfg)
require.NoError(t, err)
defer ledger.Close()

Expand Down Expand Up @@ -2637,7 +2637,7 @@ func TestTxHandlerAppRateLimiter(t *testing.T) {
cfg.TxBacklogAppTxRateLimiterMaxSize = 100
cfg.TxBacklogServiceRateWindowSeconds = 1
cfg.TxBacklogAppTxPerSecondRate = 3
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg)
require.NoError(t, err)
defer ledger.Close()

Expand Down Expand Up @@ -2706,7 +2706,7 @@ func TestTxHandlerCapGuard(t *testing.T) {
cfg.IncomingConnectionsLimit = 1
cfg.TxBacklogSize = 3

ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg)
require.NoError(t, err)
defer ledger.Close()

Expand Down
9 changes: 9 additions & 0 deletions ledger/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ func (l *Ledger) reloadLedger() error {
l.trackerMu.Lock()
defer l.trackerMu.Unlock()

// save block listeners to recover them later
blockListeners := make([]ledgercore.BlockListener, 0, len(l.notifier.listeners))
blockListeners = append(blockListeners, l.notifier.listeners...)

// close the trackers.
l.trackers.close()

Expand Down Expand Up @@ -256,6 +260,9 @@ func (l *Ledger) reloadLedger() error {
return err
}

// restore block listeners since l.notifier might not survive a reload
l.notifier.register(blockListeners)

// post-init actions
if trackerDBInitParams.VacuumOnStartup || l.cfg.OptimizeAccountsDatabaseOnStartup {
err = l.accts.vacuumDatabase(context.Background())
Expand Down Expand Up @@ -423,6 +430,8 @@ func (l *Ledger) Close() {
// RegisterBlockListeners registers listeners that will be called when a
// new block is added to the ledger.
func (l *Ledger) RegisterBlockListeners(listeners []ledgercore.BlockListener) {
l.trackerMu.RLock()
defer l.trackerMu.RUnlock()
l.notifier.register(listeners)
}

Expand Down
35 changes: 35 additions & 0 deletions ledger/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3422,5 +3422,40 @@ func TestLedgerRetainMinOffCatchpointInterval(t *testing.T) {
}
}()
}
}

type testBlockListener struct {
id int
}

func (t *testBlockListener) OnNewBlock(bookkeeping.Block, ledgercore.StateDelta) {}

// TestLedgerRegisterBlockListeners ensures that the block listeners survive reloadLedger
func TestLedgerRegisterBlockListeners(t *testing.T) {
partitiontest.PartitionTest(t)

genBalances, _, _ := ledgertesting.NewTestGenesis()
var genHash crypto.Digest
crypto.RandBytes(genHash[:])
cfg := config.GetDefaultLocal()
l := newSimpleLedgerFull(t, genBalances, protocol.ConsensusCurrentVersion, genHash, cfg)
defer l.Close()

l.RegisterBlockListeners([]ledgercore.BlockListener{&testBlockListener{1}, &testBlockListener{2}})
l.RegisterBlockListeners([]ledgercore.BlockListener{&testBlockListener{3}})

require.Equal(t, 3, len(l.notifier.listeners))
var ids []int
for _, bl := range l.notifier.listeners {
ids = append(ids, bl.(*testBlockListener).id)
}
require.Equal(t, []int{1, 2, 3}, ids)

l.reloadLedger()

ids = nil
for _, bl := range l.notifier.listeners {
ids = append(ids, bl.(*testBlockListener).id)
}
require.Equal(t, []int{1, 2, 3}, ids)
}
4 changes: 2 additions & 2 deletions network/p2pNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,14 +343,14 @@ func (n *P2PNetwork) Start() error {
for i := 0; i < incomingThreads; i++ {
n.wg.Add(1)
// We pass the peersConnectivityCheckTicker.C here so that we don't need to syncronize the access to the ticker's data structure.
go n.handler.messageHandlerThread(&n.wg, n.wsPeersConnectivityCheckTicker.C, n)
go n.handler.messageHandlerThread(&n.wg, n.wsPeersConnectivityCheckTicker.C, n, "network", "P2PNetwork")
}

n.wg.Add(1)
go n.httpdThread()

n.wg.Add(1)
go n.broadcaster.broadcastThread(&n.wg, n)
go n.broadcaster.broadcastThread(&n.wg, n, "network", "P2PNetwork")

n.wg.Add(1)
go n.meshThread()
Expand Down
10 changes: 6 additions & 4 deletions network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,10 +767,10 @@ func (wn *WebsocketNetwork) Start() error {
for i := 0; i < incomingThreads; i++ {
wn.wg.Add(1)
// We pass the peersConnectivityCheckTicker.C here so that we don't need to syncronize the access to the ticker's data structure.
go wn.handler.messageHandlerThread(&wn.wg, wn.peersConnectivityCheckTicker.C, wn)
go wn.handler.messageHandlerThread(&wn.wg, wn.peersConnectivityCheckTicker.C, wn, "network", "WebsocketNetwork")
}
wn.wg.Add(1)
go wn.broadcaster.broadcastThread(&wn.wg, wn)
go wn.broadcaster.broadcastThread(&wn.wg, wn, "network", "WebsocketNetwork")
if wn.prioScheme != nil {
wn.wg.Add(1)
go wn.prioWeightRefresh()
Expand Down Expand Up @@ -1160,8 +1160,9 @@ func (wn *WebsocketNetwork) maybeSendMessagesOfInterest(peer *wsPeer, messagesOf
}
}

func (wn *msgHandler) messageHandlerThread(wg *sync.WaitGroup, peersConnectivityCheckCh <-chan time.Time, net networkPeerManager) {
func (wn *msgHandler) messageHandlerThread(wg *sync.WaitGroup, peersConnectivityCheckCh <-chan time.Time, net networkPeerManager, profLabels ...string) {
defer wg.Done()
util.SetGoroutineLabels(append(profLabels, "func", "msgHandler.messageHandlerThread")...)

for {
select {
Expand Down Expand Up @@ -1262,8 +1263,9 @@ func (wn *msgHandler) sendFilterMessage(msg IncomingMessage, net networkPeerMana
}
}

func (wn *msgBroadcaster) broadcastThread(wg *sync.WaitGroup, net networkPeerManager) {
func (wn *msgBroadcaster) broadcastThread(wg *sync.WaitGroup, net networkPeerManager, profLabels ...string) {
defer wg.Done()
util.SetGoroutineLabels(append(profLabels, "func", "msgHandler.broadcastThread")...)

slowWritingPeerCheckTicker := time.NewTicker(wn.slowWritingPeerMonitorInterval)
defer slowWritingPeerCheckTicker.Stop()
Expand Down
Loading

0 comments on commit 8b291b0

Please sign in to comment.