From 6b9d1147d409cb5f39a868faf1d55bf52949b498 Mon Sep 17 00:00:00 2001
From: BohanWu <819186192@qq.com>
Date: Fri, 23 Jun 2023 12:00:30 +0800
Subject: [PATCH] chore: comments for tsi interface

---
 pkg/tsdb/{Index.md => Series_Index.md} | 2 +-
 pkg/tsdb/engine/tsm/engine.go | 3 +++
 pkg/tsdb/index/tsi1/file_set.go | 1 +
 pkg/tsdb/index/tsi1/index.go | 4 ++++
 pkg/tsdb/index/tsi1/log_file.go | 1 +
 pkg/tsdb/index/tsi1/partition.go | 1 +
 pkg/tsdb/series_file.go | 1 +
 pkg/tsdb/series_index.go | 3 ++-
 pkg/tsdb/series_partition.go | 1 +
 pkg/tsdb/shard.go | 1 +
 pkg/tsdb/shard_test.go | 2 ++
 11 files changed, 18 insertions(+), 2 deletions(-)
 rename pkg/tsdb/{Index.md => Series_Index.md} (95%)

diff --git a/pkg/tsdb/Index.md b/pkg/tsdb/Series_Index.md
similarity index 95%
rename from pkg/tsdb/Index.md
rename to pkg/tsdb/Series_Index.md
index 02ef196..8a0132a 100644
--- a/pkg/tsdb/Index.md
+++ b/pkg/tsdb/Series_Index.md
@@ -6,7 +6,7 @@ https://cloud.tencent.com/developer/article/1397217
 ### SeriesFile
 It could also be named as `SeriesKeyFile`, for it stores keys(measurement + tag set) for all series, and is shared by all shards, across the entire database.
 ### SeriesIndex
-- Two maps there: sereisKey -> id, id -> offset
+- Two maps there: seriesKey -> id, id -> offset. What is the relationship between the series index and tsi? They are totally different things.
 - seriesKey: measurement + tags;
 - seriesID: in one DB, can be used to identify a unique series. About how it is generated: Influxdb hard-codes the number of partitions at 8, i.e. every serieskey is hashed into one of these 8 buckets.
 
diff --git a/pkg/tsdb/engine/tsm/engine.go b/pkg/tsdb/engine/tsm/engine.go
index f75f5ec..4a01e2f 100644
--- a/pkg/tsdb/engine/tsm/engine.go
+++ b/pkg/tsdb/engine/tsm/engine.go
@@ -1810,6 +1810,7 @@ func (e *Engine) CreateSeriesListIfNotExists(keys, names [][]byte, tagsSlice []m
 	return e.index.CreateSeriesListIfNotExists(keys, names, tagsSlice)
 }
 
+// 1. write path: the engine delegates series creation to the index
 func (e *Engine) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error {
 	return e.index.CreateSeriesIfNotExists(key, name, tags)
 }
@@ -2381,6 +2382,7 @@ func (e *Engine) CreateIterator(ctx context.Context, measurement string, opt que
 		return newMergeFinalizerIterator(ctx, inputs, opt, e.logger)
 	}
 
+	// b. non-call expressions take this path
 	itrs, err := e.createVarRefIterator(ctx, measurement, opt)
 	if err != nil {
 		return nil, err
@@ -2519,6 +2521,7 @@ func (e *Engine) createCallIterator(ctx context.Context, measurement string, cal
 	return itrs, nil
 }
 
+// c. called from CreateIterator above for non-call expressions
 // createVarRefIterator creates an iterator for a variable reference.
 func (e *Engine) createVarRefIterator(ctx context.Context, measurement string, opt query.IteratorOptions) ([]query.Iterator, error) {
 	ref, _ := opt.Expr.(*influxql.VarRef)
diff --git a/pkg/tsdb/index/tsi1/file_set.go b/pkg/tsdb/index/tsi1/file_set.go
index b4da932..c4442fa 100644
--- a/pkg/tsdb/index/tsi1/file_set.go
+++ b/pkg/tsdb/index/tsi1/file_set.go
@@ -339,6 +339,7 @@ func (fs *FileSet) TagValueIterator(name, key []byte) TagValueIterator {
 	return MergeTagValueIterators(a...)
 }
 
+// d. merges the per-file series ID iterators for one tag value
 // TagValueSeriesIDIterator returns a series iterator for a single tag value.
 func (fs *FileSet) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error) {
 	ss := tsdb.NewSeriesIDSet()
diff --git a/pkg/tsdb/index/tsi1/index.go b/pkg/tsdb/index/tsi1/index.go
index 1d18745..943e918 100644
--- a/pkg/tsdb/index/tsi1/index.go
+++ b/pkg/tsdb/index/tsi1/index.go
@@ -748,6 +748,7 @@ func (i *Index) CreateSeriesListIfNotExists(keys [][]byte, names [][]byte, tagsS
 	return nil
 }
 
+// 2. the index hashes the series key to one of its partitions
 // CreateSeriesIfNotExists creates a series if it doesn't exist or is deleted.
 func (i *Index) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error {
 	ids, err := i.partition(key).createSeriesListIfNotExists([][]byte{name}, []models.Tags{tags})
@@ -970,6 +971,7 @@ func (i *Index) TagKeyIterator(name []byte) (tsdb.TagKeyIterator, error) {
 	return tsdb.MergeTagKeyIterators(a...), nil
 }
 
+// a. merges tag value iterators from every partition
 // TagValueIterator returns an iterator for all values across a single key.
 func (i *Index) TagValueIterator(name, key []byte) (tsdb.TagValueIterator, error) {
 	a := make([]tsdb.TagValueIterator, 0, len(i.partitions))
@@ -982,6 +984,7 @@ func (i *Index) TagValueIterator(name, key []byte) (tsdb.TagValueIterator, error
 	return tsdb.MergeTagValueIterators(a...), nil
 }
 
+// b. merges per-partition series ID iterators for a tag key
 // TagKeySeriesIDIterator returns a series iterator for all values across a single key.
 func (i *Index) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator, error) {
 	a := make([]tsdb.SeriesIDIterator, 0, len(i.partitions))
@@ -996,6 +999,7 @@ func (i *Index) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator,
 	return tsdb.MergeSeriesIDIterators(a...), nil
 }
 
+// c. checks the series ID set cache, then falls back to the partitions
 // TagValueSeriesIDIterator returns a series iterator for a single tag value.
 func (i *Index) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error) {
 	// Check series ID set cache...
diff --git a/pkg/tsdb/index/tsi1/log_file.go b/pkg/tsdb/index/tsi1/log_file.go
index ebce4a8..60b810f 100644
--- a/pkg/tsdb/index/tsi1/log_file.go
+++ b/pkg/tsdb/index/tsi1/log_file.go
@@ -517,6 +517,7 @@ func (f *LogFile) DeleteTagValue(name, key, value []byte) error {
 }
 
 // AddSeriesList adds a list of series to the log file in bulk.
+// 4. keys are first registered in the shared series file, then the new series are logged here
 func (f *LogFile) AddSeriesList(seriesSet *tsdb.SeriesIDSet, names [][]byte, tagsSlice []models.Tags) ([]uint64, error) {
 	// README.md (tsi), Writes - step 1
 	seriesIDs, err := f.sfile.CreateSeriesListIfNotExists(names, tagsSlice)
diff --git a/pkg/tsdb/index/tsi1/partition.go b/pkg/tsdb/index/tsi1/partition.go
index 2582485..743f073 100644
--- a/pkg/tsdb/index/tsi1/partition.go
+++ b/pkg/tsdb/index/tsi1/partition.go
@@ -708,6 +708,7 @@ func (p *Partition) DropMeasurement(name []byte) error {
 
 // createSeriesListIfNotExists creates a list of series if they doesn't exist in
 // bulk.
+// 3. the partition appends the new series to its active log file
 func (p *Partition) createSeriesListIfNotExists(names [][]byte, tagsSlice []models.Tags) ([]uint64, error) {
 	// Is there anything to do? The partition may have been sent an empty batch.
 	if len(names) == 0 {
diff --git a/pkg/tsdb/series_file.go b/pkg/tsdb/series_file.go
index 4100dcf..fd59063 100644
--- a/pkg/tsdb/series_file.go
+++ b/pkg/tsdb/series_file.go
@@ -175,6 +175,7 @@ func (f *SeriesFile) FileSize() (n int64, err error) {
 
 // CreateSeriesListIfNotExists creates a list of series in bulk if they don't exist.
 // The returned ids slice returns IDs for every name+tags, creating new series IDs as needed.
+// 5. the series file hashes each key to one of its partitions
 func (f *SeriesFile) CreateSeriesListIfNotExists(names [][]byte, tagsSlice []models.Tags) ([]uint64, error) {
 	keys := GenerateSeriesKeys(names, tagsSlice)
 	keyPartitionIDs := f.SeriesKeysPartitionIDs(keys)
diff --git a/pkg/tsdb/series_index.go b/pkg/tsdb/series_index.go
index 5914234..d42d17d 100644
--- a/pkg/tsdb/series_index.go
+++ b/pkg/tsdb/series_index.go
@@ -49,7 +49,7 @@ type SeriesIndex struct {
 
 	// In-memory data since rebuild.
 	keyIDMap    *rhh.HashMap
-	idOffsetMap map[uint64]int64
+	idOffsetMap map[uint64]int64
 	tombstones  map[uint64]struct{}
 }
 
@@ -161,6 +161,7 @@ func (idx *SeriesIndex) IsDeleted(id uint64) bool {
 	return idx.FindOffsetByID(id) == 0
 }
 
+// 7. applies a log entry to the in-memory maps: key -> id, id -> offset (or tombstones on delete)
 func (idx *SeriesIndex) execEntry(flag uint8, id uint64, offset int64, key []byte) {
 	switch flag {
 	case SeriesEntryInsertFlag:
diff --git a/pkg/tsdb/series_partition.go b/pkg/tsdb/series_partition.go
index 7dc433f..3eb5c02 100644
--- a/pkg/tsdb/series_partition.go
+++ b/pkg/tsdb/series_partition.go
@@ -196,6 +196,7 @@ func (p *SeriesPartition) FileSize() (n int64, err error) {
 
 // CreateSeriesListIfNotExists creates a list of series in bulk if they don't exist.
 // The ids parameter is modified to contain series IDs for all keys belonging to this partition.
+// 6. the series partition assigns IDs to new keys and appends insert entries to its segments
 func (p *SeriesPartition) CreateSeriesListIfNotExists(keys [][]byte, keyPartitionIDs []int, ids []uint64) error {
 	var writeRequired bool
 	p.mu.RLock()
diff --git a/pkg/tsdb/shard.go b/pkg/tsdb/shard.go
index 72394f1..da00a1c 100644
--- a/pkg/tsdb/shard.go
+++ b/pkg/tsdb/shard.go
@@ -1017,6 +1017,7 @@ func (s *Shard) CreateIterator(ctx context.Context, m *influxql.Measurement, opt
 	return engine.CreateIterator(ctx, m.Name, opt)
 }
 
+// a. builds a series cursor backed by the shard's index
 func (s *Shard) CreateSeriesCursor(ctx context.Context, req SeriesCursorRequest, cond influxql.Expr) (SeriesCursor, error) {
 	index, err := s.Index()
 	if err != nil {
diff --git a/pkg/tsdb/shard_test.go b/pkg/tsdb/shard_test.go
index 6c4f4ad..3f1d6b7 100644
--- a/pkg/tsdb/shard_test.go
+++ b/pkg/tsdb/shard_test.go
@@ -676,6 +676,7 @@ cpu,host=serverB,region=uswest value=25 0
 	itr, err := sh.CreateIterator(context.Background(), m, query.IteratorOptions{
 		Expr:       influxql.MustParseExpr(`value`),
 		Aux:        []influxql.VarRef{{Val: "val2"}},
+		// Dimensions are the GROUP BY tag keys
 		Dimensions: []string{"host"},
 		Ascending:  true,
 		StartTime:  influxql.MinTime,
@@ -755,6 +756,7 @@ cpu,host=serverB,region=uswest value=25 0
 	var err error
 	m = &influxql.Measurement{Name: "cpu"}
 	itr, err := sh.CreateIterator(context.Background(), m, query.IteratorOptions{
+		// MustParseExpr parses the expression and panics on a parse error
 		Expr:       influxql.MustParseExpr(`value`),
 		Aux:        []influxql.VarRef{{Val: "val2"}},
 		Dimensions: []string{"host"},
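
For readers following the numbered write-path breadcrumbs (1-7) and the notes in Series_Index.md, here is a minimal, dependency-free Go sketch of the two-map design: a series key is hashed to one of a fixed number of partitions, and each partition's in-memory index keeps keyIDMap (key -> id) and idOffsetMap (id -> offset). This is not the fork's actual code: all names are placeholders, FNV stands in for the hash the real series file uses, and the IDs here are only unique within a partition, unlike the real implementation.

package main

import (
	"fmt"
	"hash/fnv"
)

// partitionN mirrors the fixed partition count mentioned in Series_Index.md
// (the real series file hard-codes 8 partitions).
const partitionN = 8

// seriesIndex is a toy stand-in for tsdb.SeriesIndex: one map from series key
// to ID and one from ID to the key's offset in the series-file segment.
type seriesIndex struct {
	keyIDMap    map[string]uint64
	idOffsetMap map[uint64]int64
}

// seriesPartition is a toy stand-in for tsdb.SeriesPartition: it assigns IDs
// and remembers where each key would live in its segment (steps 6-7 above).
type seriesPartition struct {
	nextID uint64
	offset int64
	index  seriesIndex
}

func newSeriesPartition() *seriesPartition {
	return &seriesPartition{
		nextID: 1,
		index: seriesIndex{
			keyIDMap:    make(map[string]uint64),
			idOffsetMap: make(map[uint64]int64),
		},
	}
}

// createIfNotExists inserts a key once and returns its ID, loosely like
// CreateSeriesListIfNotExists does for a batch (steps 5-6 above).
func (p *seriesPartition) createIfNotExists(key string) uint64 {
	if id, ok := p.index.keyIDMap[key]; ok {
		return id
	}
	id := p.nextID
	p.nextID++
	p.index.keyIDMap[key] = id         // key -> id
	p.index.idOffsetMap[id] = p.offset // id -> offset in the segment
	p.offset += int64(len(key))        // pretend the key was appended to a segment
	return id
}

// partitionID hashes a series key into one of the partitions. The real code
// uses a different hash; FNV keeps this sketch dependency-free.
func partitionID(key string) int {
	h := fnv.New64a()
	h.Write([]byte(key))
	return int(h.Sum64() % partitionN)
}

func main() {
	partitions := make([]*seriesPartition, partitionN)
	for i := range partitions {
		partitions[i] = newSeriesPartition()
	}

	// A series key is measurement + tag set, e.g. "cpu,host=serverA,region=uswest".
	for _, key := range []string{
		"cpu,host=serverA,region=uswest",
		"cpu,host=serverB,region=uswest",
		"cpu,host=serverA,region=uswest", // duplicate: same partition, same ID
	} {
		pid := partitionID(key)
		id := partitions[pid].createIfNotExists(key)
		fmt.Printf("partition=%d id=%d key=%s\n", pid, id, key)
	}
}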