add delta log stream new format reader and writer
Signed-off-by: shaoting-huang <[email protected]>
shaoting-huang committed Jun 21, 2024
1 parent 5f02e52 commit 4251a0c
Showing 5 changed files with 406 additions and 52 deletions.
183 changes: 167 additions & 16 deletions internal/storage/data_codec.go
@@ -720,6 +720,25 @@ func NewDeleteLog(pk PrimaryKey, ts Timestamp) *DeleteLog {
}
}

// DeltalogUnmarshal parses a delta log entry in the legacy format, which only
// supported int64 primary keys and wrote each entry as fmt.Sprintf("%d,%d", pk, ts).
// (Such entries fail JSON unmarshaling with "invalid character ',' after top-level value".)
func DeltalogUnmarshal(strVal string) (*DeleteLog, error) {
splits := strings.Split(strVal, ",")
if len(splits) != 2 {
return nil, fmt.Errorf("the format of delta log is incorrect, %v cannot be split", strVal)
}
}
pkInt, err := strconv.ParseInt(splits[0], 10, 64)
if err != nil {
return nil, err
}
tsInt, err := strconv.ParseUint(splits[1], 10, 64)
if err != nil {
return nil, err
}
return NewDeleteLog(&Int64PrimaryKey{Value: pkInt}, tsInt), nil
}
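
// For illustration, a minimal sketch of the legacy round trip; the helper name
// and values are made up, and fmt is assumed to be imported in this file:
func exampleLegacyDeltalogRoundTrip() {
    legacy := fmt.Sprintf("%d,%d", int64(100), uint64(1684835092000))
    dl, err := DeltalogUnmarshal(legacy)
    if err != nil {
        panic(err)
    }
    fmt.Println(dl.Pk.GetValue(), dl.Ts) // 100 1684835092000
}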

func (dl *DeleteLog) UnmarshalJSON(data []byte) error {
var messageMap map[string]*json.RawMessage
var err error
@@ -870,6 +889,80 @@ func (deleteCodec *DeleteCodec) Serialize(collectionID UniqueID, partitionID Uni
return blob, nil
}

// SerializeV2 transfers delete data to blobs.
// For each delete message, it saves the pk string and the ts string to separate binlogs,
// to avoid JSON marshaling.
func (deleteCodec *DeleteCodec) SerializeV2(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, data *DeleteData) ([]*Blob, error) {
blobs := make([]*Blob, 0)
var binlogWriter *DeleteBinlogWriter
length := len(data.Pks)
if length != len(data.Tss) {
return nil, fmt.Errorf("the length of pks, and TimeStamps is not equal")
}
var startTs, endTs Timestamp
startTs, endTs = math.MaxUint64, 0
for _, ts := range data.Tss {
if ts < startTs {
startTs = ts
}
if ts > endTs {
endTs = ts
}
}
for _, blobKey := range []FieldID{common.PrimaryKeyField, common.TimeStampField} {
binlogWriter = NewDeleteBinlogWriter(schemapb.DataType_String, collectionID, partitionID, segmentID)
eventWriter, err := binlogWriter.NextDeleteEventWriter()
if err != nil {
eventWriter.Close()
binlogWriter.Close()
return nil, err
}
eventWriter.SetEventTimestamp(startTs, endTs)
sizeTotal := 0
for i := 0; i < length; i++ {
var dataStr string
switch blobKey {
case common.PrimaryKeyField:
dataStr = strconv.FormatInt(data.Pks[i].GetValue().(int64), 10)
case common.TimeStampField:
dataStr = strconv.FormatUint(data.Tss[i], 10)
default:
return nil, fmt.Errorf("unsupported field")
}
if err = eventWriter.AddOneStringToPayload(dataStr, true); err != nil {
eventWriter.Close()
binlogWriter.Close()
return nil, err
}
sizeTotal += len(dataStr)
}

binlogWriter.SetEventTimeStamp(startTs, endTs)
binlogWriter.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))
if err = binlogWriter.Finish(); err != nil {
eventWriter.Close()
binlogWriter.Close()
return nil, err
}
buffer, err := binlogWriter.GetBuffer()
if err != nil {
eventWriter.Close()
binlogWriter.Close()
return nil, err
}
blobs = append(blobs, &Blob{
Key: strconv.Itoa(int(blobKey)),
Value: buffer,
MemorySize: data.Size(),
})

eventWriter.Close()
binlogWriter.Close()
}

return blobs, nil
}
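
// For illustration: with N rows of delete data, the writer above emits exactly two
// blobs, one per field, each a string-typed binlog with N payload entries. An
// assumed sketch of the result (field-ID keys rendered as decimal strings):
//
//	Blob{Key: <common.PrimaryKeyField as decimal>, Value: <binlog of N pk strings>}
//	Blob{Key: <common.TimeStampField as decimal>,  Value: <binlog of N ts strings>}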

// Deserialize deserializes the deltalog blobs into DeleteData
func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID, segmentID UniqueID, data *DeleteData, err error) {
if len(blobs) == 0 {
@@ -911,25 +1004,11 @@ func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID

v, err := p.Parse(strVal)
if err != nil {
// compatible with versions that only support int64 type primary keys
// compatible with fmt.Sprintf("%d,%d", pk, ts)
// compatible error info (unmarshal err invalid character ',' after top-level value)
splits := strings.Split(strVal, ",")
if len(splits) != 2 {
return fmt.Errorf("the format of delta log is incorrect, %v can not be split", strVal)
}
pk, err := strconv.ParseInt(splits[0], 10, 64)
if err != nil {
return err
}
deleteLog.Pk = &Int64PrimaryKey{
Value: pk,
}
deleteLog.PkType = int64(schemapb.DataType_Int64)
deleteLog.Ts, err = strconv.ParseUint(splits[1], 10, 64)
dl, err := DeltalogUnmarshal(strVal)
if err != nil {
return err
}
deleteLog = dl
} else {
deleteLog.Ts = v.GetUint64("ts")
deleteLog.PkType = v.GetInt64("pkType")
@@ -956,6 +1035,78 @@ func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
return pid, sid, result, nil
}
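
// The fallback above means a V1 deltalog row may arrive in either of two
// encodings (values illustrative; the JSON keys follow DeleteLog's json tags,
// and pkType 5 is assumed to equal schemapb.DataType_Int64):
//
//	legacy := "100,1684835092000"                        // fmt.Sprintf("%d,%d", pk, ts), handled by DeltalogUnmarshal
//	jsonV1 := `{"pk":100,"ts":1684835092000,"pkType":5}` // handled by the JSON parser path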

// DeserializeV2 deserializes the deltalog blobs into DeleteData
func (deleteCodec *DeleteCodec) DeserializeV2(blobs []*Blob) (partitionID UniqueID, segmentID UniqueID, data *DeleteData, err error) {
if len(blobs) == 0 {
return InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("blobs is empty")
}

var pid, sid UniqueID
result := &DeleteData{}

deserializeBlob := func(blob *Blob) error {
key, err := strconv.ParseInt(blob.Key, 10, 64)
if err != nil {
return fmt.Errorf("can not parse blob key")
}
binlogReader, err := NewBinlogReader(blob.Value)
if err != nil {
return err
}
defer binlogReader.Close()

pid, sid = binlogReader.PartitionID, binlogReader.SegmentID
eventReader, err := binlogReader.NextEventReader()
if err != nil {
return err
}
defer eventReader.Close()

rr, err := eventReader.GetArrowRecordReader()
if err != nil {
return err
}
defer rr.Release()
for rr.Next() {
rec := rr.Record()
defer rec.Release()
column := rec.Column(0)
for i := 0; i < column.Len(); i++ {
switch key {
case common.PrimaryKeyField:
pkInt, err := strconv.ParseInt(column.ValueStr(i), 10, 64)
if err != nil {
return err
}
pk := &Int64PrimaryKey{Value: pkInt}
result.Pks = append(result.Pks, pk)
result.memSize += pk.Size()
case common.TimeStampField:
tsInt, err := strconv.ParseUint(column.ValueStr(i), 10, 64)
if err != nil {
return err
}
result.Tss = append(result.Tss, tsInt)
result.memSize += int64(8)
default:
return fmt.Errorf("not support delta log blob key")
}
}
}
return nil
}
for _, blob := range blobs {
if err := deserializeBlob(blob); err != nil {
return InvalidUniqueID, InvalidUniqueID, nil, err
}
}
if len(result.Pks) != len(result.Tss) {
return InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("the length of pks and tss should be the same")
}
result.RowCount = int64(len(result.Pks))
return pid, sid, result, nil
}
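
// A minimal round-trip sketch under the new format, with hypothetical
// collection/partition/segment IDs (the V2 test below exercises the same flow):
func exampleDeltalogV2RoundTrip() {
    codec := NewDeleteCodec()
    dd := NewDeleteData([]PrimaryKey{&Int64PrimaryKey{Value: 7}}, []uint64{100})
    blobs, err := codec.SerializeV2(0, 1, 2, dd)
    if err != nil {
        panic(err)
    }
    pid, sid, out, err := codec.DeserializeV2(blobs)
    if err != nil {
        panic(err)
    }
    fmt.Println(pid, sid, out.RowCount) // 1 2 1
}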

// DataDefinitionCodec serializes and deserializes the data definition
// Blob key example:
// ${tenant}/data_definition_log/${collection_id}/ts/${log_idx}
24 changes: 24 additions & 0 deletions internal/storage/data_codec_test.go
@@ -597,6 +597,30 @@ func TestDeleteCodec(t *testing.T) {
})
}

func TestDeleteCodecV2(t *testing.T) {
t.Run("int64 pk", func(t *testing.T) {
deleteCodec := NewDeleteCodec()
pk1 := &Int64PrimaryKey{
Value: 1,
}
deleteData := NewDeleteData([]PrimaryKey{pk1}, []uint64{43757345})

pk2 := &Int64PrimaryKey{
Value: 2,
}
deleteData.Append(pk2, 23578294723)
blob, err := deleteCodec.SerializeV2(CollectionID, 1, 1, deleteData)
assert.NoError(t, err)
assert.Equal(t, 2, len(blob))

pid, sid, data, err := deleteCodec.DeserializeV2(blob)
assert.NoError(t, err)
assert.Equal(t, pid, int64(1))
assert.Equal(t, sid, int64(1))
assert.Equal(t, data, deleteData)
})
}

func TestUpgradeDeleteLog(t *testing.T) {
t.Run("normal", func(t *testing.T) {
binlogWriter := NewDeleteBinlogWriter(schemapb.DataType_String, CollectionID, 1, 1)