Skip to content

Commit

Permalink
clean up
Browse files Browse the repository at this point in the history
Signed-off-by: shaoting-huang <[email protected]>
  • Loading branch information
shaoting-huang committed Jun 21, 2024
1 parent d49832b commit b6e2f57
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 36 deletions.
50 changes: 19 additions & 31 deletions internal/storage/data_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -905,53 +905,45 @@ func (deleteCodec *DeleteCodec) SerializeV2(collectionID UniqueID, partitionID U
}
var startTs, endTs Timestamp
startTs, endTs = math.MaxUint64, 0
for i := 0; i < length; i++ {
ts := data.Tss[i]
for _, ts := range data.Tss {
if ts < startTs {
startTs = ts
}
if ts > endTs {
endTs = ts
}
}
for k := 0; k < 2; k++ {
for _, field := range []string{"pk", "ts"} {
binlogWriter = NewDeleteBinlogWriter(schemapb.DataType_String, collectionID, partitionID, segmentID)
eventWriter, err := binlogWriter.NextDeleteEventWriter()
eventWriter.SetEventTimestamp(startTs, endTs)
if err != nil {
eventWriter.Close()
binlogWriter.Close()
return nil, err
}

eventWriter.SetEventTimestamp(startTs, endTs)
sizeTotal := 0
for i := 0; i < length; i++ {
if k == 0 { // write pk
pkStr := strconv.FormatInt(data.Pks[i].GetValue().(int64), 10)
err = eventWriter.AddOneStringToPayload(pkStr)
if err != nil {
eventWriter.Close()
binlogWriter.Close()
return nil, err
}
sizeTotal += len(pkStr)
} else if k == 1 { // write ts
ts := data.Tss[i]
tsStr := strconv.FormatUint(ts, 10)
err = eventWriter.AddOneStringToPayload(tsStr)
if err != nil {
eventWriter.Close()
binlogWriter.Close()
return nil, err
}
sizeTotal += len(tsStr)
var dataStr string
switch field {
case "pk":
dataStr = strconv.FormatInt(data.Pks[i].GetValue().(int64), 10)
case "ts":
dataStr = strconv.FormatUint(data.Tss[i], 10)
default:
return nil, fmt.Errorf("unsupported field")
}
if err = eventWriter.AddOneStringToPayload(dataStr, true); err != nil {
eventWriter.Close()
binlogWriter.Close()
return nil, err
}
sizeTotal += len(dataStr)
}

binlogWriter.SetEventTimeStamp(startTs, endTs)
binlogWriter.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))
err = binlogWriter.Finish()
if err != nil {
if err = binlogWriter.Finish(); err != nil {
eventWriter.Close()
binlogWriter.Close()
return nil, err
Expand All @@ -962,11 +954,7 @@ func (deleteCodec *DeleteCodec) SerializeV2(collectionID UniqueID, partitionID U
binlogWriter.Close()
return nil, err
}
// if k == 0 {
// blobKey := fmt.Sprintf("deltaPk", field.FieldID)
// } else if k == 1 {
// blobKey := fmt.Sprintf("deltaTs", field.FieldID)
// }

blobs = append(blobs, &Blob{
Value: buffer,
MemorySize: data.Size(),
Expand Down
18 changes: 18 additions & 0 deletions internal/storage/data_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,24 @@ func TestDeleteCodec(t *testing.T) {
})
}

func TestDeleteCodecV2(t *testing.T) {
t.Run("int64 pk", func(t *testing.T) {
deleteCodec := NewDeleteCodec()
pk1 := &Int64PrimaryKey{
Value: 1,
}
deleteData := NewDeleteData([]PrimaryKey{pk1}, []uint64{43757345})

pk2 := &Int64PrimaryKey{
Value: 2,
}
deleteData.Append(pk2, 23578294723)
blob, err := deleteCodec.SerializeV2(CollectionID, 1, 1, deleteData)
assert.NoError(t, err)
assert.Equal(t, 2, len(blob))
})
}

func TestUpgradeDeleteLog(t *testing.T) {
t.Run("normal", func(t *testing.T) {
binlogWriter := NewDeleteBinlogWriter(schemapb.DataType_String, CollectionID, 1, 1)
Expand Down
1 change: 1 addition & 0 deletions internal/storage/serde_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ func (crr *compositeBinlogRecordReader) Close() {
}

func parseBlobKey(blobKey string) (colId FieldID, logId UniqueID) {
fmt.Println(blobKey)
if _, _, _, colId, logId, ok := metautil.ParseInsertLogPath(blobKey); ok {
return colId, logId
}
Expand Down
40 changes: 35 additions & 5 deletions internal/storage/serde_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestBinlogDeserializeReader(t *testing.T) {
assert.Equal(t, io.EOF, err)
})

t.Run("test deserialize", func(t *testing.T) {
t.Run("test binlog deserialize", func(t *testing.T) {
size := 3
blobs, err := generateTestData(size)
assert.NoError(t, err)
Expand Down Expand Up @@ -116,7 +116,7 @@ func TestBinlogSerializeWriter(t *testing.T) {
assert.Equal(t, io.EOF, err)
})

t.Run("test serialize", func(t *testing.T) {
t.Run("test binlog serialize", func(t *testing.T) {
size := 16
blobs, err := generateTestData(size)
assert.NoError(t, err)
Expand Down Expand Up @@ -266,7 +266,7 @@ func TestDeltalogDeserializeReader(t *testing.T) {
assert.Equal(t, io.EOF, err)
})

t.Run("test deserialize", func(t *testing.T) {
t.Run("test deltalog deserialize", func(t *testing.T) {
size := 3
blob, err := generateTestDeltalogData(size, false)
assert.NoError(t, err)
Expand Down Expand Up @@ -296,7 +296,7 @@ func TestDeltalogSerializeWriter(t *testing.T) {
assert.Equal(t, io.EOF, err)
})

t.Run("test serialize", func(t *testing.T) {
t.Run("test deltalog serialize", func(t *testing.T) {
size := 16
blob, err := generateTestDeltalogData(size, false)
assert.NoError(t, err)
Expand Down Expand Up @@ -342,6 +342,36 @@ func TestDeltalogSerializeWriter(t *testing.T) {
})
}

func TestDeltalogDeserializeReaderV2(t *testing.T) {
t.Run("test empty data", func(t *testing.T) {
reader, err := NewDeltalogDeserializeReaderV2(nil)
assert.NoError(t, err)
defer reader.Close()
err = reader.Next()
assert.Equal(t, io.EOF, err)
})

t.Run("test deltalog deserialize v2", func(t *testing.T) {
size := 3
blob, err := generateTestDeltalogData(size, true)
assert.NoError(t, err)
reader, err := NewDeltalogDeserializeReaderV2(blob)
assert.NoError(t, err)
defer reader.Close()

for i := 0; i < size; i++ {
err = reader.Next()
assert.NoError(t, err)

value := reader.Value()
assertTestDeltalogData(t, i, value)
}

err = reader.Next()
assert.Equal(t, io.EOF, err)
})
}

func TestDeltalogSerializeWriterV2(t *testing.T) {
t.Run("test empty data", func(t *testing.T) {
reader, err := NewDeltalogDeserializeReaderV2(nil)
Expand All @@ -351,7 +381,7 @@ func TestDeltalogSerializeWriterV2(t *testing.T) {
assert.Equal(t, io.EOF, err)
})

t.Run("test serialize", func(t *testing.T) {
t.Run("test deltalog serialize v2", func(t *testing.T) {
size := 16
blobs, err := generateTestDeltalogData(size, true)
assert.NoError(t, err)
Expand Down

0 comments on commit b6e2f57

Please sign in to comment.