Skip to content

Commit

Permalink
new queue map attribute to store metadata, now when the user request …
Browse files Browse the repository at this point in the history
…the creation of a census that is already created, it is returned through the queue
  • Loading branch information
lucasmenendez committed Sep 1, 2023
1 parent 99df946 commit e61318c
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 46 deletions.
57 changes: 32 additions & 25 deletions api/censuses.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,15 @@ func (capi *census3API) launchCensusCreation(msg *api.APIdata, ctx *httprouter.H
// create and publish census merkle tree in background
queueID := capi.queue.Enqueue()
go func(req *CreateCensusResquest) {
if err := capi.createAndPublishCensus(req, queueID); err != nil {
if ok := capi.queue.Update(queueID, true, err); !ok {
censusID, err := capi.createAndPublishCensus(req, queueID)
if err != nil && !errors.Is(ErrCensusAlreadyExists, err) {
if ok := capi.queue.Update(queueID, true, nil, err); !ok {
log.Errorf("error updating census queue process with error: %v", err)
}
return
}
if ok := capi.queue.Update(queueID, true, nil); !ok {
queueData := map[string]any{"censusID": censusID}
if ok := capi.queue.Update(queueID, true, queueData, nil); !ok {
log.Errorf("error updating census queue process with error")
}
}(req)
Expand All @@ -132,14 +134,14 @@ func (capi *census3API) launchCensusCreation(msg *api.APIdata, ctx *httprouter.H
// all the required information from the database, and then creates and publish
// the census merkle tree on IPFS. Then saves the resulting information of the
// census tree in the database.
func (capi *census3API) createAndPublishCensus(req *CreateCensusResquest, qID string) error {
bgCtx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
func (capi *census3API) createAndPublishCensus(req *CreateCensusResquest, qID string) (int, error) {
bgCtx, cancel := context.WithTimeout(context.Background(), censusCreationTimeout)
defer cancel()
// begin a transaction for group sql queries
tx, err := capi.db.BeginTx(bgCtx, nil)
if err != nil {
log.Errorw(err, "error starting database")
return ErrCantCreateCensus.With("error starting database")
return -1, ErrCantCreateCensus.With("error starting database")
}
defer func() {
if err := tx.Rollback(); err != nil && !errors.Is(sql.ErrTxDone, err) {
Expand All @@ -152,22 +154,22 @@ func (capi *census3API) createAndPublishCensus(req *CreateCensusResquest, qID st
if err != nil {
if errors.Is(sql.ErrNoRows, err) {
log.Errorf("no strategy found for id %d: %s", req.StrategyID, err.Error())
return ErrNoStrategyTokens.With("no strategy found")
return -1, ErrNoStrategyTokens.With("no strategy found")
}
log.Errorf("error getting strategy with id %d: %s", req.StrategyID, err.Error())
return ErrCantCreateCensus.With("error getting strategy")
return -1, ErrCantCreateCensus.With("error getting strategy")
}
if len(strategyTokens) == 0 {
log.Errorf("no tokens for strategy %d", req.StrategyID)
return ErrNoStrategyTokens.With("no tokens for strategy")
return -1, ErrNoStrategyTokens.With("no tokens for strategy")
}

// get the maximun current census ID to calculate the next one, if any
// census has been created yet, continue
lastCensusID, err := qtx.LastCensusID(bgCtx)
if err != nil && !errors.Is(sql.ErrNoRows, err) {
log.Errorw(err, "error getting last census ID")
return ErrCantCreateCensus.With("error getting last census ID")
return -1, ErrCantCreateCensus.With("error getting last census ID")
}
// compute the new censusId and censusType
newCensusID := int(lastCensusID) + 1
Expand All @@ -187,7 +189,7 @@ func (capi *census3API) createAndPublishCensus(req *CreateCensusResquest, qID st
continue
}
log.Errorf("error getting token holders of %s: %v", common.BytesToAddress(token.ID), err)
return ErrCantCreateCensus.With("error getting token holders")
return -1, ErrCantCreateCensus.With("error getting token holders")
}
for _, holder := range holders {
holderAddr := common.BytesToAddress(holder.ID)
Expand All @@ -200,40 +202,40 @@ func (capi *census3API) createAndPublishCensus(req *CreateCensusResquest, qID st
}
if len(strategyHolders) == 0 {
log.Errorf("no holders for strategy '%d'", req.StrategyID)
return ErrNotFoundTokenHolders.With("no holders for strategy")
return -1, ErrNotFoundTokenHolders.With("no holders for strategy")
}
// create a census tree and publish on IPFS
def := census.NewCensusDefinition(newCensusID, int(req.StrategyID), strategyHolders, req.Anonymous)
newCensus, err := capi.censusDB.CreateAndPublish(def)
if err != nil {
log.Errorf("error creating or publishing the census: %v", err)
return ErrCantCreateCensus.With("error creating or publishing the census")
return -1, ErrCantCreateCensus.With("error creating or publishing the census")
}
// check if the census already exists using the merkle root of the generated
// census
_, err = qtx.CensusByMerkleRoot(bgCtx, newCensus.RootHash)
currentCensus, err := qtx.CensusByMerkleRoot(bgCtx, newCensus.RootHash)
if err == nil {
return ErrCensusAlreadyExists
return int(currentCensus.ID), ErrCensusAlreadyExists
}
if err != nil && !errors.Is(sql.ErrNoRows, err) {
log.Errorf("error checking if the generated census already exists: %v", err)
return ErrCantCreateCensus.With("error checking if the generated census already exists")
return -1, ErrCantCreateCensus.With("error checking if the generated census already exists")
}
// save the new census in the SQL database
sqlURI := new(sql.NullString)
if err := sqlURI.Scan(newCensus.URI); err != nil {
log.Errorf("error encoding census uri: %v", err)
return ErrCantCreateCensus.With("error encoding census uri")
return -1, ErrCantCreateCensus.With("error encoding census uri")
}
sqlCensusSize := sql.NullInt32{}
if err := sqlCensusSize.Scan(int64(len(strategyHolders))); err != nil {
log.Errorf("error encoding census size: %v", err)
return ErrCantCreateCensus.With("error encoding census size")
return -1, ErrCantCreateCensus.With("error encoding census size")
}
sqlCensusWeight := sql.NullString{}
if err := sqlCensusWeight.Scan(censusWeight.String()); err != nil {
log.Errorf("error encoding census weight: %v", err)
return ErrCantCreateCensus.With("error encoding census weight")
return -1, ErrCantCreateCensus.With("error encoding census weight")
}
_, err = qtx.CreateCensus(bgCtx, queries.CreateCensusParams{
ID: int64(newCensus.ID),
Expand All @@ -247,13 +249,13 @@ func (capi *census3API) createAndPublishCensus(req *CreateCensusResquest, qID st
})
if err != nil {
log.Errorf("error saving the census on the database: %v", err)
return ErrCantCreateCensus.With("error saving the census on the database")
return -1, ErrCantCreateCensus.With("error saving the census on the database")
}
if err := tx.Commit(); err != nil {
log.Errorf("error committing the census on the database: %v", err)
return ErrCantCreateCensus.With("error committing the census on the database")
return -1, ErrCantCreateCensus.With("error committing the census on the database")
}
return nil
return newCensus.ID, nil
}

// getEnqueueCensus handler returns the current status of the queue item
Expand All @@ -267,11 +269,10 @@ func (capi *census3API) getEnqueueCensus(msg *api.APIdata, ctx *httprouter.HTTPC
return ErrMalformedCensusQueueID
}
// try to get and check if the census is in the queue
done, err, exists := capi.queue.Done(queueID)
exists, done, data, err := capi.queue.Done(queueID)
if !exists {
return ErrNotFoundCensus.Withf("the ID %s does not exist in the queue", queueID)
}
log.Info(done, err)
// init queue item response
queueItem := QueueItemResponse{
Done: done,
Expand All @@ -282,8 +283,14 @@ func (capi *census3API) getEnqueueCensus(msg *api.APIdata, ctx *httprouter.HTTPC
// if everything is ok, get the census information an return it
internalCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
censusID, ok := data["censusID"].(int)
if !ok {
log.Errorf("no census id registered on queue item: %v", err)
return ErrCantGetCensus
}

// get the census from the database by queue_id
currentCensus, err := capi.sqlc.CensusByQueueID(internalCtx, queueID)
currentCensus, err := capi.sqlc.CensusByID(internalCtx, int64(censusID))
if err != nil {
log.Errorf("error getting census by queue id: %v", err)
return ErrCantGetCensus
Expand Down
7 changes: 7 additions & 0 deletions api/const.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package api

import "time"

const (
censusCreationTimeout = 10 * time.Minute
)
40 changes: 19 additions & 21 deletions queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,21 @@ import (

const QueueIDLen = 20

type QueueItem struct {
done bool
err error
data map[string]any
}

type BackgroundQueue struct {
mtx *sync.Mutex
processes map[string]struct {
done bool
err error
}
processes map[string]QueueItem
}

func NewBackgroundQueue() *BackgroundQueue {
return &BackgroundQueue{
mtx: &sync.Mutex{},
processes: make(map[string]struct {
done bool
err error
}),
mtx: &sync.Mutex{},
processes: make(map[string]QueueItem),
}
}

Expand All @@ -31,10 +31,11 @@ func (q *BackgroundQueue) Enqueue() string {
defer q.mtx.Unlock()

id := util.RandomHex(QueueIDLen)
q.processes[id] = struct {
done bool
err error
}{done: false, err: nil}
q.processes[id] = QueueItem{
done: false,
err: nil,
data: make(map[string]any),
}
return id
}

Expand All @@ -49,26 +50,23 @@ func (q *BackgroundQueue) Dequeue(id string) bool {
return true
}

func (q *BackgroundQueue) Update(id string, done bool, err error) bool {
func (q *BackgroundQueue) Update(id string, done bool, data map[string]any, err error) bool {
q.mtx.Lock()
defer q.mtx.Unlock()

if _, ok := q.processes[id]; !ok {
return false
}
q.processes[id] = struct {
done bool
err error
}{done, err}
q.processes[id] = QueueItem{done: done, err: err, data: data}
return true
}

func (q *BackgroundQueue) Done(id string) (bool, error, bool) {
func (q *BackgroundQueue) Done(id string) (bool, bool, map[string]any, error) {
q.mtx.Lock()
defer q.mtx.Unlock()

if p, ok := q.processes[id]; ok {
return p.done, p.err, true
return true, p.done, p.data, p.err
}
return false, nil, false
return false, false, nil, nil
}

0 comments on commit e61318c

Please sign in to comment.