Commit f4b77ce

fix ut
Signed-off-by: shaoting-huang <[email protected]>
shaoting-huang committed Jun 24, 2024
1 parent 2f38ccd commit f4b77ce
Showing 4 changed files with 73 additions and 69 deletions.
5 changes: 4 additions & 1 deletion internal/storage/binlog_writer.go
@@ -294,11 +294,14 @@ func NewInsertBinlogWriter(dataType schemapb.DataType, collectionID, partitionID
 }

 // NewDeleteBinlogWriter creates DeleteBinlogWriter to write binlog file.
-func NewDeleteBinlogWriter(dataType schemapb.DataType, collectionID, partitionID, segmentID int64) *DeleteBinlogWriter {
+func NewDeleteBinlogWriter(dataType schemapb.DataType, collectionID, partitionID, segmentID int64, FieldID ...int64) *DeleteBinlogWriter {
 	descriptorEvent := newDescriptorEvent()
 	descriptorEvent.PayloadDataType = dataType
 	descriptorEvent.CollectionID = collectionID
 	descriptorEvent.PartitionID = partitionID
+	if len(FieldID) > 0 {
+		descriptorEvent.FieldID = FieldID[0]
+	}
 	descriptorEvent.SegmentID = segmentID
 	w := &DeleteBinlogWriter{
 		baseBinlogWriter: baseBinlogWriter{
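For context, the trailing variadic FieldID behaves as an optional argument: legacy callers omit it and the descriptor event's FieldID keeps its zero value, while the V2 delete path passes exactly one ID so each blob records which column it holds. A minimal sketch (identifiers from this diff; the surrounding variables are assumed in scope):

	// Legacy call site: no field ID recorded in the descriptor event.
	w1 := NewDeleteBinlogWriter(schemapb.DataType_Int64, collectionID, partitionID, segmentID)
	// V2 call site: stamp the blob as the row-ID (primary key) column.
	w2 := NewDeleteBinlogWriter(schemapb.DataType_Int64, collectionID, partitionID, segmentID, common.RowIDField)
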
2 changes: 1 addition & 1 deletion internal/storage/data_codec.go
@@ -792,7 +792,7 @@ func (deleteCodec *DeleteCodec) SerializeV2(collectionID UniqueID, partitionID U
 		}
 	}
 	for _, blobKey := range []FieldID{common.RowIDField, common.TimeStampField} {
-		writer = NewDeleteBinlogWriter(schemapb.DataType_Int64, collectionID, partitionID, segmentID)
+		writer = NewDeleteBinlogWriter(schemapb.DataType_Int64, collectionID, partitionID, segmentID, blobKey)
 		eventWriter, err := writer.NextDeleteEventWriter()
 		if err != nil {
 			writer.Close()
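Each pass through that loop emits one Int64 delete binlog, so a V2 delete payload is a pair of blobs — primary keys under common.RowIDField and timestamps under common.TimeStampField — rather than one string-encoded "delta" column. A sketch of the call, with codec from NewDeleteCodec() and data a *DeleteData as in the test helper below:

	// blobs[0]: primary keys (FieldID = common.RowIDField)
	// blobs[1]: timestamps  (FieldID = common.TimeStampField)
	blobs, err := codec.SerializeV2(collectionID, partitionID, segmentID, data)
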
106 changes: 59 additions & 47 deletions internal/storage/serde_events.go
@@ -67,7 +67,6 @@ func (crr *compositeBinlogRecordReader) iterateNextBatch() error {
 		if err != nil {
 			return err
 		}
-
 		crr.fields[i] = reader.FieldID
 		// TODO: assert schema being the same in every blobs
 		crr.r.schema[reader.FieldID] = reader.PayloadDataType
@@ -420,6 +419,7 @@ type DeltalogStreamWriter struct {
 	collectionID UniqueID
 	partitionID  UniqueID
 	segmentID    UniqueID
+	fieldSchema  *schemapb.FieldSchema

 	memorySize int // To be updated on the fly
 	buf        bytes.Buffer
@@ -430,10 +430,18 @@ func (dsw *DeltalogStreamWriter) GetRecordWriter() (RecordWriter, error) {
 	if dsw.rw != nil {
 		return dsw.rw, nil
 	}
-
-	rw, err := newSingleFieldRecordWriter(0, arrow.Field{
-		Name: "delta",
-		Type: arrow.BinaryTypes.String,
+	var dataType arrow.DataType
+	switch dsw.fieldSchema.DataType {
+	case schemapb.DataType_Int64:
+		dataType = &arrow.Int64Type{}
+	case schemapb.DataType_String:
+		dataType = arrow.BinaryTypes.String
+	default:
+		return nil, fmt.Errorf("does not support data type")
+	}
+	rw, err := newSingleFieldRecordWriter(dsw.fieldSchema.FieldID, arrow.Field{
+		Name:     dsw.fieldSchema.Name,
+		Type:     dataType,
 		Nullable: false,
 	}, &dsw.buf)
 	if err != nil {
@@ -474,6 +482,7 @@ func (dsw *DeltalogStreamWriter) writeDeltalogHeaders(w io.Writer) error {
 	de.CollectionID = dsw.collectionID
 	de.PartitionID = dsw.partitionID
 	de.SegmentID = dsw.segmentID
+	de.FieldID = dsw.fieldSchema.FieldID
 	de.StartTimestamp = 0
 	de.EndTimestamp = 0
 	de.descriptorEventData.AddExtra(originalSizeKey, strconv.Itoa(dsw.memorySize))
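Persisting the field ID in the descriptor event here is what lets compositeBinlogRecordReader.iterateNextBatch (first hunk above) recover each blob's column via reader.FieldID, which is why the V2 deserializer below no longer needs a caller-supplied PK field ID.
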
@@ -502,6 +511,11 @@ func NewDeltalogStreamWriter(collectionID, partitionID, segmentID UniqueID) *Del
 		collectionID: collectionID,
 		partitionID:  partitionID,
 		segmentID:    segmentID,
+		fieldSchema: &schemapb.FieldSchema{
+			FieldID:  common.RowIDField,
+			Name:     "delta",
+			DataType: schemapb.DataType_String,
+		},
 	}
 }

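For comparison, this legacy constructor pins a single String field named "delta", preserving the V1 on-disk shape for existing call sites. A minimal V1 usage sketch (the batch size of 1024 is hypothetical):

	dw := NewDeltalogStreamWriter(collectionID, partitionID, segmentID)
	sw, err := NewDeltalogSerializeWriter(partitionID, segmentID, dw, 1024)
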
@@ -543,24 +557,32 @@ func NewDeltalogSerializeWriter(partitionID, segmentID UniqueID, eventWriter *De
 	}, batchSize), nil
 }

-func NewDeltalogStreamWriterV2(collectionID, partitionID, segmentID UniqueID) map[FieldID]*BinlogStreamWriter {
-	dws := make(map[FieldID]*BinlogStreamWriter, 2)
-	dws[common.RowIDField] = &BinlogStreamWriter{
+func NewDeltalogStreamWriterV2(collectionID, partitionID, segmentID UniqueID) map[FieldID]*DeltalogStreamWriter {
+	dws := make(map[FieldID]*DeltalogStreamWriter, 2)
+	dws[common.RowIDField] = &DeltalogStreamWriter{
 		collectionID: collectionID,
 		partitionID:  partitionID,
 		segmentID:    segmentID,
-		fieldSchema:  &schemapb.FieldSchema{FieldID: common.RowIDField, Name: "rowid", DataType: schemapb.DataType_Int64},
+		fieldSchema: &schemapb.FieldSchema{
+			FieldID:  common.RowIDField,
+			Name:     common.RowIDFieldName,
+			DataType: schemapb.DataType_Int64,
+		},
 	}
-	dws[common.TimeStampField] = &BinlogStreamWriter{
+	dws[common.TimeStampField] = &DeltalogStreamWriter{
 		collectionID: collectionID,
 		partitionID:  partitionID,
 		segmentID:    segmentID,
-		fieldSchema:  &schemapb.FieldSchema{FieldID: common.TimeStampField, Name: "ts", DataType: schemapb.DataType_Int64},
+		fieldSchema: &schemapb.FieldSchema{
+			FieldID:  common.TimeStampField,
+			Name:     common.TimeStampFieldName,
+			DataType: schemapb.DataType_Int64,
+		},
 	}
 	return dws
 }

-func NewDeltalogSerializeWriterV2(partitionID, segmentID UniqueID, eventWriters map[FieldID]*BinlogStreamWriter, batchSize int,
+func NewDeltalogSerializeWriterV2(partitionID, segmentID UniqueID, eventWriters map[FieldID]*DeltalogStreamWriter, batchSize int,
 ) (*SerializeWriter[*DeleteLog], error) {
 	rws := make(map[FieldID]RecordWriter, len(eventWriters))
 	for fid := range eventWriters {
@@ -573,72 +595,62 @@ func NewDeltalogSerializeWriterV2(partitionID, segmentID UniqueID, eventWriters
 	}
 	compositeRecordWriter := newCompositeRecordWriter(rws)
 	return NewSerializeRecordWriter(compositeRecordWriter, func(v []*DeleteLog) (Record, uint64, error) {
-		builders := [2]array.Builder{
-			array.NewBuilder(memory.DefaultAllocator, arrow.BinaryTypes.String),
-			array.NewBuilder(memory.DefaultAllocator, arrow.BinaryTypes.String),
+		builders := [2]*array.Int64Builder{
+			array.NewInt64Builder(memory.DefaultAllocator),
+			array.NewInt64Builder(memory.DefaultAllocator),
 		}
 		var memorySize uint64
 		for _, vv := range v {
-			pkStr := strconv.FormatInt(vv.Pk.GetValue().(int64), 10)
-			builders[0].AppendValueFromString(pkStr)
-			memorySize += uint64(len(pkStr))
+			pk := vv.Pk.GetValue().(int64)
+			builders[0].Append(pk)
+			memorySize += uint64(arrow.Int64SizeBytes)

-			tsStr := strconv.FormatUint(vv.Ts, 10)
-			builders[1].AppendValueFromString(tsStr)
-			memorySize += uint64(len(tsStr))
+			ts := int64(vv.Ts)
+			builders[1].Append(ts)
+			memorySize += uint64(arrow.Int64SizeBytes)
 		}
 		arrays := []arrow.Array{builders[0].NewArray(), builders[1].NewArray()}
 		fields := []arrow.Field{
 			{
 				Name:     common.RowIDFieldName,
-				Type:     arrow.BinaryTypes.String,
+				Type:     &arrow.Int64Type{},
 				Nullable: false,
 			},
 			{
 				Name:     common.TimeStampFieldName,
-				Type:     arrow.BinaryTypes.String,
+				Type:     &arrow.Int64Type{},
 				Nullable: false,
 			},
 		}
 		field2Col := map[FieldID]int{
-			0: 0,
-			1: 1,
+			common.RowIDField:     0,
+			common.TimeStampField: 1,
 		}
 		schema := map[FieldID]schemapb.DataType{
-			0: 0,
-			1: 1,
+			common.RowIDField:     schemapb.DataType_Int64,
+			common.TimeStampField: schemapb.DataType_Int64,
 		}
 		return newSimpleArrowRecord(array.NewRecord(arrow.NewSchema(fields, nil), arrays, int64(len(v))), schema, field2Col), memorySize, nil
 	}, batchSize), nil
 }

-func NewDeltalogDeserializeReaderV2(blobs []*Blob, PKfieldID UniqueID) (*DeserializeReader[*DeleteLog], error) {
+func NewDeltalogDeserializeReaderV2(blobs []*Blob) (*DeserializeReader[*DeleteLog], error) {
 	reader, err := newCompositeBinlogRecordReader(blobs)
 	if err != nil {
 		return nil, err
 	}
 	return NewDeserializeReader(reader, func(r Record, v []*DeleteLog) error {
+		Tss := r.Column(common.TimeStampField).(*array.Int64)
+		Pks := r.Column(common.RowIDField).(*array.Int64)
+		if Tss.Len() != Pks.Len() {
+			return fmt.Errorf("the length of primary keys and timestamps should be the same for delta log")
+		}
 		for i := 0; i < r.Len(); i++ {
-			dl := v[i]
-			if dl == nil {
-				dl = &DeleteLog{}
-				v[i] = dl
-			}
-
-			for j, dt := range r.Schema() {
-				d, ok := serdeMap[dt].deserialize(r.Column(j), i)
-				fmt.Println(d)
-				if ok {
-					switch j {
-					case common.TimeStampField:
-						dl.Ts = d.(uint64)
-					case common.RowIDField:
-						dl.Pk = &Int64PrimaryKey{Value: d.(int64)}
-					}
-				} else {
-					return merr.WrapErrServiceInternal(fmt.Sprintf("unexpected type %s", dt))
-				}
-			}
+			if v[i] == nil {
+				v[i] = &DeleteLog{}
+			}
+			v[i] = NewDeleteLog(&Int64PrimaryKey{Value: Pks.Value(i)}, uint64(Tss.Value(i)))
 		}
 		return nil
 	}), nil
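Taken together, the V2 pieces round-trip as sketched below, distilled from the tests in this commit (error handling elided; Write, Close, Next, and Value are the SerializeWriter/DeserializeReader methods the existing tests already use; the batch size is hypothetical):

	writers := NewDeltalogStreamWriterV2(collectionID, partitionID, segmentID)
	sw, _ := NewDeltalogSerializeWriterV2(partitionID, segmentID, writers, 1024)
	for _, dl := range deleteLogs { // deleteLogs []*DeleteLog
		_ = sw.Write(dl)
	}
	_ = sw.Close() // flush the final batch

	blobs := make([]*Blob, 0, len(writers))
	for _, w := range writers { // one finalized blob per field
		b, _ := w.Finalize()
		blobs = append(blobs, b)
	}

	reader, _ := NewDeltalogDeserializeReaderV2(blobs)
	defer reader.Close()
	for reader.Next() == nil { // Next returns io.EOF when drained
		dl := reader.Value() // *DeleteLog with Int64 pk and ts
		_ = dl
	}

A side effect of the Int64 columns: memorySize is now deterministic, with each DeleteLog contributing exactly 2 × arrow.Int64SizeBytes = 16 bytes, where the old string encoding charged len(pkStr) + len(tsStr) and so varied with the decimal width of the values.
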
29 changes: 9 additions & 20 deletions internal/storage/serde_events_test.go
@@ -239,7 +239,7 @@ func TestNull(t *testing.T) {
 	})
 }

-func generateTestDeltalogData(size int, useNewFormat bool) ([]*Blob, error) {
+func generateTestDeltalogData(size int, useNewFormat ...bool) ([]*Blob, error) {
 	codec := NewDeleteCodec()
 	pks := make([]int64, size)
 	tss := make([]uint64, size)
@@ -251,7 +251,7 @@ func generateTestDeltalogData(size int, useNewFormat bool) ([]*Blob, error) {
 	for i := range pks {
 		data.Append(NewInt64PrimaryKey(pks[i]), tss[i])
 	}
-	if useNewFormat {
+	if len(useNewFormat) > 0 {
 		return codec.SerializeV2(0, 0, 0, data)
 	}
 	blob, err := codec.Serialize(0, 0, 0, data)
@@ -274,7 +274,7 @@ func TestDeltalogDeserializeReader(t *testing.T) {

 	t.Run("test deserialize", func(t *testing.T) {
 		size := 3
-		blob, err := generateTestDeltalogData(size, false)
+		blob, err := generateTestDeltalogData(size)
 		assert.NoError(t, err)
 		reader, err := NewDeltalogDeserializeReader(blob)
 		assert.NoError(t, err)
@@ -304,7 +304,7 @@ func TestDeltalogSerializeWriter(t *testing.T) {

 	t.Run("test serialize", func(t *testing.T) {
 		size := 16
-		blob, err := generateTestDeltalogData(size, false)
+		blob, err := generateTestDeltalogData(size)
 		assert.NoError(t, err)
 		reader, err := NewDeltalogDeserializeReader(blob)
 		assert.NoError(t, err)
@@ -348,9 +348,9 @@ func TestDeltalogSerializeWriter(t *testing.T) {
 	})
 }

-func TestDeltalogDeserializeReaderV2(t *testing.T) {
+func TestDeltalogV2(t *testing.T) {
 	t.Run("test empty data", func(t *testing.T) {
-		reader, err := NewDeltalogDeserializeReaderV2(nil, common.RowIDField)
+		reader, err := NewDeltalogDeserializeReaderV2(nil)
 		assert.NoError(t, err)
 		defer reader.Close()
 		err = reader.Next()
@@ -361,7 +361,7 @@ func TestDeltalogDeserializeReaderV2(t *testing.T) {
 		size := 3
 		blob, err := generateTestDeltalogData(size, true)
 		assert.NoError(t, err)
-		reader, err := NewDeltalogDeserializeReaderV2(blob, common.RowIDField)
+		reader, err := NewDeltalogDeserializeReaderV2(blob)
 		assert.NoError(t, err)
 		defer reader.Close()

@@ -376,22 +376,12 @@ func TestDeltalogDeserializeReaderV2(t *testing.T) {
 		err = reader.Next()
 		assert.Equal(t, io.EOF, err)
 	})
-}
-
-func TestDeltalogSerializeWriterV2(t *testing.T) {
-	t.Run("test empty data", func(t *testing.T) {
-		reader, err := NewDeltalogDeserializeReaderV2(nil, common.RowIDField)
-		assert.NoError(t, err)
-		defer reader.Close()
-		err = reader.Next()
-		assert.Equal(t, io.EOF, err)
-	})

 	t.Run("test serialize v2", func(t *testing.T) {
 		size := 16
 		blobs, err := generateTestDeltalogData(size, true)
 		assert.NoError(t, err)
-		reader, err := NewDeltalogDeserializeReaderV2(blobs, common.RowIDField)
+		reader, err := NewDeltalogDeserializeReaderV2(blobs)
 		assert.NoError(t, err)
 		defer reader.Close()

@@ -422,12 +412,11 @@ func TestDeltalogSerializeWriterV2(t *testing.T) {
 		blob, err := w.Finalize()
 		assert.NoError(t, err)
 		assert.NotNil(t, blob)
-		assert.True(t, blob.MemorySize > 0)
 		newblobs[i] = blob
 		i++
 	}
 	// assert.Equal(t, blobs[0].Value, newblobs[0].Value)
-	reader, err = NewDeltalogDeserializeReaderV2(blobs, common.RowIDField)
+	reader, err = NewDeltalogDeserializeReaderV2(blobs)
 	assert.NoError(t, err)
 	defer reader.Close()
 	for i := 0; i < size; i++ {
