Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: comments for tsi interface #1

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/tsdb/Index.md → pkg/tsdb/Series_Index.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ https://cloud.tencent.com/developer/article/1397217
### SeriesFile
It could also be named as `SeriesKeyFile`, for it stores keys(measurement + tag set) for all series, and is shared by all shards, across the entire database.
### SeriesIndex
- Two maps there: sereisKey -> id, id -> offset
- Two maps there: sereisKey -> id, id -> offset, the relatsionship between series index and tsi? They are totally different.
- seriesKey: measurement + tags;
- seriesID: in one DB, can be used to identify a unique sereis. About how to generate:
Influxdb将paritition数量定死了为 8, 就是说所有的serieskey放在这8个桶里
Expand Down
3 changes: 3 additions & 0 deletions pkg/tsdb/engine/tsm/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1810,6 +1810,7 @@ func (e *Engine) CreateSeriesListIfNotExists(keys, names [][]byte, tagsSlice []m
return e.index.CreateSeriesListIfNotExists(keys, names, tagsSlice)
}

// 1.
func (e *Engine) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error {
return e.index.CreateSeriesIfNotExists(key, name, tags)
}
Expand Down Expand Up @@ -2381,6 +2382,7 @@ func (e *Engine) CreateIterator(ctx context.Context, measurement string, opt que
return newMergeFinalizerIterator(ctx, inputs, opt, e.logger)
}

// b. this way
itrs, err := e.createVarRefIterator(ctx, measurement, opt)
if err != nil {
return nil, err
Expand Down Expand Up @@ -2519,6 +2521,7 @@ func (e *Engine) createCallIterator(ctx context.Context, measurement string, cal
return itrs, nil
}

// c.
// createVarRefIterator creates an iterator for a variable reference.
func (e *Engine) createVarRefIterator(ctx context.Context, measurement string, opt query.IteratorOptions) ([]query.Iterator, error) {
ref, _ := opt.Expr.(*influxql.VarRef)
Expand Down
1 change: 1 addition & 0 deletions pkg/tsdb/index/tsi1/file_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ func (fs *FileSet) TagValueIterator(name, key []byte) TagValueIterator {
return MergeTagValueIterators(a...)
}

// d.
// TagValueSeriesIDIterator returns a series iterator for a single tag value.
func (fs *FileSet) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error) {
ss := tsdb.NewSeriesIDSet()
Expand Down
4 changes: 4 additions & 0 deletions pkg/tsdb/index/tsi1/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,7 @@ func (i *Index) CreateSeriesListIfNotExists(keys [][]byte, names [][]byte, tagsS
return nil
}

// 2.
// CreateSeriesIfNotExists creates a series if it doesn't exist or is deleted.
func (i *Index) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error {
ids, err := i.partition(key).createSeriesListIfNotExists([][]byte{name}, []models.Tags{tags})
Expand Down Expand Up @@ -970,6 +971,7 @@ func (i *Index) TagKeyIterator(name []byte) (tsdb.TagKeyIterator, error) {
return tsdb.MergeTagKeyIterators(a...), nil
}

// a.
// TagValueIterator returns an iterator for all values across a single key.
func (i *Index) TagValueIterator(name, key []byte) (tsdb.TagValueIterator, error) {
a := make([]tsdb.TagValueIterator, 0, len(i.partitions))
Expand All @@ -982,6 +984,7 @@ func (i *Index) TagValueIterator(name, key []byte) (tsdb.TagValueIterator, error
return tsdb.MergeTagValueIterators(a...), nil
}

// b.
// TagKeySeriesIDIterator returns a series iterator for all values across a single key.
func (i *Index) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator, error) {
a := make([]tsdb.SeriesIDIterator, 0, len(i.partitions))
Expand All @@ -996,6 +999,7 @@ func (i *Index) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator,
return tsdb.MergeSeriesIDIterators(a...), nil
}

// c.
// TagValueSeriesIDIterator returns a series iterator for a single tag value.
func (i *Index) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error) {
// Check series ID set cache...
Expand Down
1 change: 1 addition & 0 deletions pkg/tsdb/index/tsi1/log_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,7 @@ func (f *LogFile) DeleteTagValue(name, key, value []byte) error {
}

// AddSeriesList adds a list of series to the log file in bulk.
// 4.
func (f *LogFile) AddSeriesList(seriesSet *tsdb.SeriesIDSet, names [][]byte, tagsSlice []models.Tags) ([]uint64, error) {
// README.md (tsi), Writes - step 1
seriesIDs, err := f.sfile.CreateSeriesListIfNotExists(names, tagsSlice)
Expand Down
1 change: 1 addition & 0 deletions pkg/tsdb/index/tsi1/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,7 @@ func (p *Partition) DropMeasurement(name []byte) error {

// createSeriesListIfNotExists creates a list of series if they doesn't exist in
// bulk.
// 3.
func (p *Partition) createSeriesListIfNotExists(names [][]byte, tagsSlice []models.Tags) ([]uint64, error) {
// Is there anything to do? The partition may have been sent an empty batch.
if len(names) == 0 {
Expand Down
1 change: 1 addition & 0 deletions pkg/tsdb/series_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ func (f *SeriesFile) FileSize() (n int64, err error) {

// CreateSeriesListIfNotExists creates a list of series in bulk if they don't exist.
// The returned ids slice returns IDs for every name+tags, creating new series IDs as needed.
// 5.
func (f *SeriesFile) CreateSeriesListIfNotExists(names [][]byte, tagsSlice []models.Tags) ([]uint64, error) {
keys := GenerateSeriesKeys(names, tagsSlice)
keyPartitionIDs := f.SeriesKeysPartitionIDs(keys)
Expand Down
3 changes: 2 additions & 1 deletion pkg/tsdb/series_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type SeriesIndex struct {

// In-memory data since rebuild.
keyIDMap *rhh.HashMap
idOffsetMap map[uint64]int64
idOffsetMap map[uint64]int64
tombstones map[uint64]struct{}
}

Expand Down Expand Up @@ -161,6 +161,7 @@ func (idx *SeriesIndex) IsDeleted(id uint64) bool {
return idx.FindOffsetByID(id) == 0
}

// 7.
func (idx *SeriesIndex) execEntry(flag uint8, id uint64, offset int64, key []byte) {
switch flag {
case SeriesEntryInsertFlag:
Expand Down
1 change: 1 addition & 0 deletions pkg/tsdb/series_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ func (p *SeriesPartition) FileSize() (n int64, err error) {

// CreateSeriesListIfNotExists creates a list of series in bulk if they don't exist.
// The ids parameter is modified to contain series IDs for all keys belonging to this partition.
// 6.
func (p *SeriesPartition) CreateSeriesListIfNotExists(keys [][]byte, keyPartitionIDs []int, ids []uint64) error {
var writeRequired bool
p.mu.RLock()
Expand Down
1 change: 1 addition & 0 deletions pkg/tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -1017,6 +1017,7 @@ func (s *Shard) CreateIterator(ctx context.Context, m *influxql.Measurement, opt
return engine.CreateIterator(ctx, m.Name, opt)
}

// a.
func (s *Shard) CreateSeriesCursor(ctx context.Context, req SeriesCursorRequest, cond influxql.Expr) (SeriesCursor, error) {
index, err := s.Index()
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/tsdb/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,7 @@ cpu,host=serverB,region=uswest value=25 0
itr, err := sh.CreateIterator(context.Background(), m, query.IteratorOptions{
Expr: influxql.MustParseExpr(`value`),
Aux: []influxql.VarRef{{Val: "val2"}},
// dimension is for tag?
Dimensions: []string{"host"},
Ascending: true,
StartTime: influxql.MinTime,
Expand Down Expand Up @@ -755,6 +756,7 @@ cpu,host=serverB,region=uswest value=25 0
var err error
m = &influxql.Measurement{Name: "cpu"}
itr, err := sh.CreateIterator(context.Background(), m, query.IteratorOptions{
// how does this parse function return?
Expr: influxql.MustParseExpr(`value`),
Aux: []influxql.VarRef{{Val: "val2"}},
Dimensions: []string{"host"},
Expand Down