lmrpc: Maintain local piecerefs #178

Open · wants to merge 3 commits into main
3 changes: 2 additions & 1 deletion harmony/harmonydb/sql/20240228-piece-park.sql
@@ -15,7 +15,8 @@ create table parked_pieces (
foreign key (task_id) references harmony_task (id) on delete set null, -- dropped
foreign key (cleanup_task_id) references harmony_task (id) on delete set null, -- dropped

-unique (piece_cid)
+unique (piece_cid) -- dropped
+-- unique (piece_cid, cleanup_task_id) -- Added in 20240827-piecepark-uniq-cleanup.sql
);

/*
4 changes: 4 additions & 0 deletions harmony/harmonydb/sql/20240827-piecepark-uniq-cleanup.sql
@@ -0,0 +1,4 @@
+-- this migration fixes the piecepark unique constraint to allow multiple parked pieces with the same piece_cid when a cleanup task is set
+
+ALTER TABLE parked_pieces DROP CONSTRAINT IF EXISTS parked_pieces_piece_cid_key;
+ALTER TABLE parked_pieces ADD CONSTRAINT parked_pieces_piece_cid_cleanup_task_id_key UNIQUE (piece_cid, cleanup_task_id);
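
Note (not part of the diff): with `UNIQUE (piece_cid, cleanup_task_id)`, Postgres treats NULL `cleanup_task_id` values as distinct, so several `parked_pieces` rows for the same `piece_cid` can coexist while no cleanup task is set. It also means the old `ON CONFLICT (piece_cid)` upsert in `addPieceEntry` (removed below) would no longer match any unique index after this migration, which is presumably part of why that function goes away. A minimal sketch of what the new schema permits, reusing the harmonydb transaction pattern from the removed code; `pieceCid`, `paddedSize` and `rawSize` are placeholder variables:

```go
// Illustrative only; assumes the lmrpc package's existing imports (harmonydb, xerrors, etc.).
comm, err := db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
	for i := 0; i < 2; i++ {
		var id int64
		// Before 20240827 the second insert failed with a duplicate-key error on piece_cid;
		// now both rows are accepted because their cleanup_task_id is NULL.
		err = tx.QueryRow(`
			INSERT INTO parked_pieces (piece_cid, piece_padded_size, piece_raw_size)
			VALUES ($1, $2, $3)
			RETURNING id`, pieceCid, paddedSize, rawSize).Scan(&id)
		if err != nil {
			return false, xerrors.Errorf("inserting parked piece: %w", err)
		}
	}
	return true, nil
}, harmonydb.OptionRetry())
if err != nil || !comm {
	// handle error / retry as appropriate
}
```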
109 changes: 15 additions & 94 deletions market/lmrpc/lmrpc.go
@@ -3,7 +3,6 @@ package lmrpc
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
@@ -194,6 +193,11 @@ func ServeCurioMarketRPC(db *harmonydb.DB, full api.Chain, maddr address.Address
Host: laddr.String(),
}

+pieceRefTracker, err := newRefTracker(db, rootUrl)
+if err != nil {
+return xerrors.Errorf("failed to create piece ref tracker: %w", err)
+}
+
ast := lapi.StorageMinerStruct{}

ast.CommonStruct.Internal.Version = func(ctx context.Context) (lapi.APIVersion, error) {
@@ -216,7 +220,7 @@
ast.Internal.SectorsListInStates = lp.SectorsListInStates
adaptFunc(&ast.Internal.StorageRedeclareLocal, lp.StorageRedeclareLocal)
adaptFunc(&ast.Internal.ComputeDataCid, lp.ComputeDataCid)
-ast.Internal.SectorAddPieceToAny = sectorAddPieceToAnyOperation(maddr, rootUrl, conf, pieceInfoLk, pieceInfos, pin, db, mi.SectorSize)
+ast.Internal.SectorAddPieceToAny = sectorAddPieceToAnyOperation(maddr, rootUrl, conf, pieceInfoLk, pieceInfos, pin, db, pieceRefTracker, mi.SectorSize)
adaptFunc(&ast.Internal.StorageList, si.StorageList)
adaptFunc(&ast.Internal.StorageDetach, si.StorageDetach)
adaptFunc(&ast.Internal.StorageReportHealth, si.StorageReportHealth)
@@ -275,7 +279,6 @@ func ServeCurioMarketRPC(db *harmonydb.DB, full api.Chain, maddr address.Address
), int64(pi.size))

n, err := io.Copy(w, pieceData)
-close(pi.done)

took := time.Since(start)
mbps := float64(n) / (1024 * 1024) / took.Seconds()
@@ -312,8 +315,6 @@ func ServeCurioMarketRPC(db *harmonydb.DB, full api.Chain, maddr address.Address
type pieceInfo struct {
data storiface.Data
size abi.UnpaddedPieceSize
-
-done chan struct{}
}

// A util to convert jsonrpc methods using incompatible Go types with same jsonrpc representation
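
adaptFunc's body is outside this diff; the comment above describes the general idea, which (as a guess at the mechanism, not the actual implementation) amounts to a marshal/unmarshal round trip between types that share a JSON encoding:

```go
// Standalone illustration of the round-trip conversion idea, not Curio's adaptFunc.
package jsonadapt

import "encoding/json"

// convertViaJSON converts between two Go types that have the same JSON representation.
func convertViaJSON[A, B any](in A) (B, error) {
	var out B
	b, err := json.Marshal(in)
	if err != nil {
		return out, err
	}
	return out, json.Unmarshal(b, &out)
}
```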
@@ -369,7 +370,7 @@ type PieceIngester interface {
AllocatePieceToSector(ctx context.Context, maddr address.Address, piece lpiece.PieceDealInfo, rawSize int64, source url.URL, header http.Header) (lapi.SectorOffset, error)
}

-func sectorAddPieceToAnyOperation(maddr address.Address, rootUrl url.URL, conf *config.CurioConfig, pieceInfoLk *sync.Mutex, pieceInfos map[uuid.UUID][]pieceInfo, pin PieceIngester, db *harmonydb.DB, ssize abi.SectorSize) func(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storiface.Data, deal lpiece.PieceDealInfo) (lapi.SectorOffset, error) {
+func sectorAddPieceToAnyOperation(maddr address.Address, rootUrl url.URL, conf *config.CurioConfig, pieceInfoLk *sync.Mutex, pieceInfos map[uuid.UUID][]pieceInfo, pin PieceIngester, db *harmonydb.DB, prt *refTracker, ssize abi.SectorSize) func(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storiface.Data, deal lpiece.PieceDealInfo) (lapi.SectorOffset, error) {
return func(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storiface.Data, deal lpiece.PieceDealInfo) (lapi.SectorOffset, error) {
if (deal.PieceActivationManifest == nil && deal.DealProposal == nil) || (deal.PieceActivationManifest != nil && deal.DealProposal != nil) {
return lapi.SectorOffset{}, xerrors.Errorf("deal info must have either deal proposal or piece manifest")
@@ -390,8 +391,6 @@ func sectorAddPieceToAnyOperation(maddr address.Address, rootUrl url.URL, conf *
pi := pieceInfo{
data: pieceData,
size: pieceSize,
-
-done: make(chan struct{}),
}

pieceUUID := uuid.New()
@@ -410,23 +409,14 @@
dataUrl.RawQuery = "piece_id=" + pieceUUID.String()

// add piece entry
-refID, pieceWasCreated, err := addPieceEntry(ctx, db, conf, deal, pieceSize, dataUrl, ssize)
+refID, cleanup, err := prt.addPieceEntry(ctx, db, conf, deal, pieceSize, dataUrl, ssize)
if err != nil {
return lapi.SectorOffset{}, err
}

-// wait for piece to be parked
-if pieceWasCreated {
-<-pi.done
-} else {
-// If the piece was not created, we need to close the done channel
-close(pi.done)
-
-closeDataReader(pieceData)
-}
+defer cleanup()

{
-// piece park is either done or currently happening from another AP call
+// piece park is either done or currently happening
// now we need to make sure that the piece is definitely parked successfully
// - in case of errors we return, and boost should be able to retry the call

@@ -465,6 +455,7 @@ func sectorAddPieceToAnyOperation(maddr address.Address, rootUrl url.URL, conf *
time.Sleep(5 * time.Second)
continue
}
+
if err != pgx.ErrNoRows {
return lapi.SectorOffset{}, xerrors.Errorf("checking park-piece task in harmony_tasks: %w", err)
}
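
The rest of the wait loop is collapsed in this view; roughly, the code polls the database until the park task has finished and the piece row is marked complete, otherwise returning an error so boost can retry the call. A sketch of that shape, with isParked standing in as a hypothetical helper for the real parked_pieces / harmony_task queries:

```go
// Sketch only; isParked is hypothetical and not part of the PR.
for {
	parked, taskRunning, err := isParked(ctx, db, refID)
	if err != nil {
		return lapi.SectorOffset{}, xerrors.Errorf("checking piece park state: %w", err)
	}
	if parked {
		break
	}
	if !taskRunning {
		return lapi.SectorOffset{}, xerrors.Errorf("park-piece task gone but piece not marked complete")
	}
	time.Sleep(5 * time.Second) // same cadence as the visible sleep above
}
```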
@@ -483,6 +474,10 @@
}
}

+// piece is parked, ensure the data reader is closed
+closeDataReader(pieceData)
+
+// prepare pieceref url. TreeD / UpdateEncode etc. are aware of the "pieceref" scheme, and will use piecepark to get the data
pieceIDUrl := url.URL{
Scheme: "pieceref",
Opaque: fmt.Sprintf("%d", refID),
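
For reference, the URL built here serializes as pieceref:&lt;ref_id&gt; (a url.URL with only Scheme and Opaque set). A small standalone example of how such a URL round-trips; the actual TreeD / UpdateEncode handling is not shown in this diff:

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

func main() {
	ref := url.URL{Scheme: "pieceref", Opaque: "42"}
	fmt.Println(ref.String()) // pieceref:42

	// A consumer can recover the ref id from the opaque part.
	u, err := url.Parse("pieceref:42")
	if err != nil {
		panic(err)
	}
	refID, err := strconv.ParseInt(u.Opaque, 10, 64)
	if err != nil {
		panic(err)
	}
	fmt.Println(u.Scheme, refID) // pieceref 42
}
```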
@@ -500,80 +495,6 @@
}
}

-func addPieceEntry(ctx context.Context, db *harmonydb.DB, conf *config.CurioConfig, deal lpiece.PieceDealInfo, pieceSize abi.UnpaddedPieceSize, dataUrl url.URL, ssize abi.SectorSize) (int64, bool, error) {
-var refID int64
-var pieceWasCreated bool
-
-for {
-var backpressureWait bool
-
-comm, err := db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
-// BACKPRESSURE
-wait, err := maybeApplyBackpressure(tx, conf.Ingest, ssize)
-if err != nil {
-return false, xerrors.Errorf("backpressure checks: %w", err)
-}
-if wait {
-backpressureWait = true
-return false, nil
-}
-
-var pieceID int64
-// Attempt to select the piece ID first
-err = tx.QueryRow(`SELECT id FROM parked_pieces WHERE piece_cid = $1`, deal.PieceCID().String()).Scan(&pieceID)
-
-if err != nil {
-if errors.Is(err, pgx.ErrNoRows) {
-// Piece does not exist, attempt to insert
-err = tx.QueryRow(`
-INSERT INTO parked_pieces (piece_cid, piece_padded_size, piece_raw_size)
-VALUES ($1, $2, $3)
-ON CONFLICT (piece_cid) DO NOTHING
-RETURNING id`, deal.PieceCID().String(), int64(pieceSize.Padded()), int64(pieceSize)).Scan(&pieceID)
-if err != nil {
-return false, xerrors.Errorf("inserting new parked piece and getting id: %w", err)
-}
-pieceWasCreated = true // New piece was created
-} else {
-// Some other error occurred during select
-return false, xerrors.Errorf("checking existing parked piece: %w", err)
-}
-} else {
-pieceWasCreated = false // Piece already exists, no new piece was created
-}
-
-// Add parked_piece_ref
-err = tx.QueryRow(`INSERT INTO parked_piece_refs (piece_id, data_url)
-VALUES ($1, $2) RETURNING ref_id`, pieceID, dataUrl.String()).Scan(&refID)
-if err != nil {
-return false, xerrors.Errorf("inserting parked piece ref: %w", err)
-}
-
-// If everything went well, commit the transaction
-return true, nil // This will commit the transaction
-}, harmonydb.OptionRetry())
-if err != nil {
-return refID, pieceWasCreated, xerrors.Errorf("inserting parked piece: %w", err)
-}
-if !comm {
-if backpressureWait {
-// Backpressure was applied, wait and try again
-select {
-case <-time.After(backpressureWaitTime):
-case <-ctx.Done():
-return refID, pieceWasCreated, xerrors.Errorf("context done while waiting for backpressure: %w", ctx.Err())
-}
-continue
-}
-
-return refID, pieceWasCreated, xerrors.Errorf("piece tx didn't commit")
-}
-
-break
-}
-return refID, pieceWasCreated, nil
-}
-
func closeDataReader(pieceData storiface.Data) {
go func() {
// close the data reader (drain to eof if it's not a closer)
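
The remainder of closeDataReader is collapsed; from the comment, the intent is to close the reader if it supports closing and otherwise drain it so the piece-serving side is not left blocked. A sketch of that pattern under those assumptions:

```go
// Assumed shape, not the PR's exact body. storiface.Data is an io.Reader.
go func() {
	if c, ok := pieceData.(io.Closer); ok {
		_ = c.Close()
		return
	}
	// Not a closer: drain to EOF so the writer side can finish.
	_, _ = io.Copy(io.Discard, pieceData)
}()
```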