From 4bfb75af9b4cce1a2ad8487faf8ebc1bc1f7f588 Mon Sep 17 00:00:00 2001 From: shaoting-huang Date: Thu, 13 Jun 2024 12:17:56 +0800 Subject: [PATCH] refactor data codec deserialize Signed-off-by: shaoting-huang --- internal/storage/data_codec.go | 521 +++++++++++++-------------------- 1 file changed, 198 insertions(+), 323 deletions(-) diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index b8091614292cc..62d3d1874978e 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -436,7 +436,6 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int dataType := binlogReader.PayloadDataType fieldID := binlogReader.FieldID totalLength := 0 - dim := 0 for { eventReader, err := binlogReader.NextEventReader() @@ -446,344 +445,229 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int if eventReader == nil { break } - switch dataType { - case schemapb.DataType_Bool: - singleData, err := eventReader.GetBoolFromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &BoolFieldData{ - Data: make([]bool, 0, rowNum), - } - } - boolFieldData := insertData.Data[fieldID].(*BoolFieldData) - - boolFieldData.Data = append(boolFieldData.Data, singleData...) - totalLength += len(singleData) - insertData.Data[fieldID] = boolFieldData - - case schemapb.DataType_Int8: - singleData, err := eventReader.GetInt8FromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &Int8FieldData{ - Data: make([]int8, 0, rowNum), - } - } - int8FieldData := insertData.Data[fieldID].(*Int8FieldData) - - int8FieldData.Data = append(int8FieldData.Data, singleData...) - totalLength += len(singleData) - insertData.Data[fieldID] = int8FieldData - - case schemapb.DataType_Int16: - singleData, err := eventReader.GetInt16FromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &Int16FieldData{ - Data: make([]int16, 0, rowNum), - } - } - int16FieldData := insertData.Data[fieldID].(*Int16FieldData) - - int16FieldData.Data = append(int16FieldData.Data, singleData...) - totalLength += len(singleData) - insertData.Data[fieldID] = int16FieldData - - case schemapb.DataType_Int32: - singleData, err := eventReader.GetInt32FromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &Int32FieldData{ - Data: make([]int32, 0, rowNum), - } - } - int32FieldData := insertData.Data[fieldID].(*Int32FieldData) - - int32FieldData.Data = append(int32FieldData.Data, singleData...) 
- totalLength += len(singleData) - insertData.Data[fieldID] = int32FieldData - - case schemapb.DataType_Int64: - singleData, err := eventReader.GetInt64FromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &Int64FieldData{ - Data: make([]int64, 0, rowNum), - } - } - int64FieldData := insertData.Data[fieldID].(*Int64FieldData) + data, dim, err := eventReader.GetDataFromPayload() + if err != nil { + eventReader.Close() + binlogReader.Close() + return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err + } + length, err := AddInsertData(dataType, data, insertData, fieldID, rowNum, eventReader, dim) + if err != nil { + eventReader.Close() + binlogReader.Close() + return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err + } + totalLength += length + eventReader.Close() + } - int64FieldData.Data = append(int64FieldData.Data, singleData...) - totalLength += len(singleData) - insertData.Data[fieldID] = int64FieldData + if rowNum <= 0 { + rowNum = totalLength + } - case schemapb.DataType_Float: - singleData, err := eventReader.GetFloatFromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } + if fieldID == common.TimeStampField { + blobInfo := BlobInfo{ + Length: totalLength, + } + insertData.Infos = append(insertData.Infos, blobInfo) + } + binlogReader.Close() + } - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &FloatFieldData{ - Data: make([]float32, 0, rowNum), - } - } - floatFieldData := insertData.Data[fieldID].(*FloatFieldData) + return collectionID, partitionID, segmentID, nil +} - floatFieldData.Data = append(floatFieldData.Data, singleData...) - totalLength += len(singleData) - insertData.Data[fieldID] = floatFieldData +func AddInsertData(dataType schemapb.DataType, data interface{}, insertData *InsertData, fieldID int64, rowNum int, eventReader *EventReader, dim int) (dataLength int, err error) { + fieldData := insertData.Data[fieldID] + switch dataType { + case schemapb.DataType_Bool: + singleData := data.([]bool) + if fieldData == nil { + fieldData = &BoolFieldData{Data: make([]bool, 0, rowNum)} + } + boolFieldData := fieldData.(*BoolFieldData) - case schemapb.DataType_Double: - singleData, err := eventReader.GetDoubleFromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } + boolFieldData.Data = append(boolFieldData.Data, singleData...) + insertData.Data[fieldID] = boolFieldData + return len(singleData), nil - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &DoubleFieldData{ - Data: make([]float64, 0, rowNum), - } - } - doubleFieldData := insertData.Data[fieldID].(*DoubleFieldData) + case schemapb.DataType_Int8: + singleData := data.([]int8) + if fieldData == nil { + fieldData = &Int8FieldData{Data: make([]int8, 0, rowNum)} + } + int8FieldData := fieldData.(*Int8FieldData) - doubleFieldData.Data = append(doubleFieldData.Data, singleData...) - totalLength += len(singleData) - insertData.Data[fieldID] = doubleFieldData + int8FieldData.Data = append(int8FieldData.Data, singleData...) 
+ insertData.Data[fieldID] = int8FieldData + return len(singleData), nil - case schemapb.DataType_String, schemapb.DataType_VarChar: - stringPayload, err := eventReader.GetStringFromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } + case schemapb.DataType_Int16: + singleData := data.([]int16) + if fieldData == nil { + fieldData = &Int16FieldData{Data: make([]int16, 0, rowNum)} + } + int16FieldData := fieldData.(*Int16FieldData) - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &StringFieldData{ - Data: make([]string, 0, rowNum), - } - } - stringFieldData := insertData.Data[fieldID].(*StringFieldData) + int16FieldData.Data = append(int16FieldData.Data, singleData...) + insertData.Data[fieldID] = int16FieldData + return len(singleData), nil - stringFieldData.Data = append(stringFieldData.Data, stringPayload...) - stringFieldData.DataType = dataType - totalLength += len(stringPayload) - insertData.Data[fieldID] = stringFieldData + case schemapb.DataType_Int32: + singleData := data.([]int32) + if fieldData == nil { + fieldData = &Int32FieldData{Data: make([]int32, 0, rowNum)} + } + int32FieldData := fieldData.(*Int32FieldData) - case schemapb.DataType_Array: - arrayPayload, err := eventReader.GetArrayFromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } + int32FieldData.Data = append(int32FieldData.Data, singleData...) + insertData.Data[fieldID] = int32FieldData + return len(singleData), nil - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &ArrayFieldData{ - Data: make([]*schemapb.ScalarField, 0, rowNum), - } - } - arrayFieldData := insertData.Data[fieldID].(*ArrayFieldData) + case schemapb.DataType_Int64: + singleData := data.([]int64) + if fieldData == nil { + fieldData = &Int64FieldData{Data: make([]int64, 0, rowNum)} + } + int64FieldData := fieldData.(*Int64FieldData) - arrayFieldData.Data = append(arrayFieldData.Data, arrayPayload...) - totalLength += len(arrayPayload) - insertData.Data[fieldID] = arrayFieldData + int64FieldData.Data = append(int64FieldData.Data, singleData...) + insertData.Data[fieldID] = int64FieldData + return len(singleData), nil - case schemapb.DataType_JSON: - jsonPayload, err := eventReader.GetJSONFromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } + case schemapb.DataType_Float: + singleData := data.([]float32) + if fieldData == nil { + fieldData = &FloatFieldData{Data: make([]float32, 0, rowNum)} + } + floatFieldData := fieldData.(*FloatFieldData) - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &JSONFieldData{ - Data: make([][]byte, 0, rowNum), - } - } - jsonFieldData := insertData.Data[fieldID].(*JSONFieldData) + floatFieldData.Data = append(floatFieldData.Data, singleData...) + insertData.Data[fieldID] = floatFieldData + return len(singleData), nil - jsonFieldData.Data = append(jsonFieldData.Data, jsonPayload...) 
- totalLength += len(jsonPayload) - insertData.Data[fieldID] = jsonFieldData + case schemapb.DataType_Double: + singleData := data.([]float64) + if fieldData == nil { + fieldData = &DoubleFieldData{Data: make([]float64, 0, rowNum)} + } + doubleFieldData := fieldData.(*DoubleFieldData) - case schemapb.DataType_BinaryVector: - var singleData []byte - singleData, dim, err = eventReader.GetBinaryVectorFromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } + doubleFieldData.Data = append(doubleFieldData.Data, singleData...) + insertData.Data[fieldID] = doubleFieldData + return len(singleData), nil - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &BinaryVectorFieldData{ - Data: make([]byte, 0, rowNum*dim), - } - } - binaryVectorFieldData := insertData.Data[fieldID].(*BinaryVectorFieldData) - - binaryVectorFieldData.Data = append(binaryVectorFieldData.Data, singleData...) - length, err := eventReader.GetPayloadLengthFromReader() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - totalLength += length - binaryVectorFieldData.Dim = dim - insertData.Data[fieldID] = binaryVectorFieldData + case schemapb.DataType_String, schemapb.DataType_VarChar: + singleData := data.([]string) + if fieldData == nil { + fieldData = &StringFieldData{Data: make([]string, 0, rowNum)} + } + stringFieldData := fieldData.(*StringFieldData) - case schemapb.DataType_Float16Vector: - var singleData []byte - singleData, dim, err = eventReader.GetFloat16VectorFromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } + stringFieldData.Data = append(stringFieldData.Data, singleData...) + stringFieldData.DataType = dataType + insertData.Data[fieldID] = stringFieldData + return len(singleData), nil - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &Float16VectorFieldData{ - Data: make([]byte, 0, rowNum*dim), - } - } - float16VectorFieldData := insertData.Data[fieldID].(*Float16VectorFieldData) + case schemapb.DataType_Array: + singleData := data.([]*schemapb.ScalarField) + if fieldData == nil { + fieldData = &ArrayFieldData{Data: make([]*schemapb.ScalarField, 0, rowNum)} + } + arrayFieldData := fieldData.(*ArrayFieldData) - float16VectorFieldData.Data = append(float16VectorFieldData.Data, singleData...) - length, err := eventReader.GetPayloadLengthFromReader() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - totalLength += length - float16VectorFieldData.Dim = dim - insertData.Data[fieldID] = float16VectorFieldData + arrayFieldData.Data = append(arrayFieldData.Data, singleData...) 
+ insertData.Data[fieldID] = arrayFieldData + return len(singleData), nil - case schemapb.DataType_BFloat16Vector: - var singleData []byte - singleData, dim, err = eventReader.GetBFloat16VectorFromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } + case schemapb.DataType_JSON: + singleData := data.([][]byte) + if fieldData == nil { + fieldData = &JSONFieldData{Data: make([][]byte, 0, rowNum)} + } + jsonFieldData := fieldData.(*JSONFieldData) - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &BFloat16VectorFieldData{ - Data: make([]byte, 0, rowNum*dim), - } - } - bfloat16VectorFieldData := insertData.Data[fieldID].(*BFloat16VectorFieldData) + jsonFieldData.Data = append(jsonFieldData.Data, singleData...) + insertData.Data[fieldID] = jsonFieldData + return len(singleData), nil - bfloat16VectorFieldData.Data = append(bfloat16VectorFieldData.Data, singleData...) - length, err := eventReader.GetPayloadLengthFromReader() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - totalLength += length - bfloat16VectorFieldData.Dim = dim - insertData.Data[fieldID] = bfloat16VectorFieldData + case schemapb.DataType_BinaryVector: + singleData := data.([]byte) + if fieldData == nil { + fieldData = &BinaryVectorFieldData{Data: make([]byte, 0, rowNum*dim)} + } + binaryVectorFieldData := fieldData.(*BinaryVectorFieldData) - case schemapb.DataType_FloatVector: - var singleData []float32 - singleData, dim, err = eventReader.GetFloatVectorFromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } + binaryVectorFieldData.Data = append(binaryVectorFieldData.Data, singleData...) + length, err := eventReader.GetPayloadLengthFromReader() + if err != nil { + return length, err + } + binaryVectorFieldData.Dim = dim + insertData.Data[fieldID] = binaryVectorFieldData + return length, nil - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &FloatVectorFieldData{ - Data: make([]float32, 0, rowNum*dim), - } - } - floatVectorFieldData := insertData.Data[fieldID].(*FloatVectorFieldData) + case schemapb.DataType_Float16Vector: + singleData := data.([]byte) + if fieldData == nil { + fieldData = &Float16VectorFieldData{Data: make([]byte, 0, rowNum*dim)} + } + float16VectorFieldData := fieldData.(*Float16VectorFieldData) - floatVectorFieldData.Data = append(floatVectorFieldData.Data, singleData...) - length, err := eventReader.GetPayloadLengthFromReader() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - totalLength += length - floatVectorFieldData.Dim = dim - insertData.Data[fieldID] = floatVectorFieldData + float16VectorFieldData.Data = append(float16VectorFieldData.Data, singleData...) 
+ length, err := eventReader.GetPayloadLengthFromReader() + if err != nil { + return length, err + } + float16VectorFieldData.Dim = dim + insertData.Data[fieldID] = float16VectorFieldData + return length, nil - case schemapb.DataType_SparseFloatVector: - sparseData, _, err := eventReader.GetSparseFloatVectorFromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &SparseFloatVectorFieldData{} - } - vec := insertData.Data[fieldID].(*SparseFloatVectorFieldData) - vec.AppendAllRows(sparseData) + case schemapb.DataType_BFloat16Vector: + singleData := data.([]byte) + if fieldData == nil { + fieldData = &BFloat16VectorFieldData{Data: make([]byte, 0, rowNum*dim)} + } + bfloat16VectorFieldData := fieldData.(*BFloat16VectorFieldData) - totalLength += sparseData.RowNum() - insertData.Data[fieldID] = vec + bfloat16VectorFieldData.Data = append(bfloat16VectorFieldData.Data, singleData...) + length, err := eventReader.GetPayloadLengthFromReader() + if err != nil { + return length, err + } + bfloat16VectorFieldData.Dim = dim + insertData.Data[fieldID] = bfloat16VectorFieldData + return length, nil - default: - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, fmt.Errorf("undefined data type %d", dataType) - } - eventReader.Close() + case schemapb.DataType_FloatVector: + singleData := data.([]float32) + if fieldData == nil { + fieldData = &FloatVectorFieldData{Data: make([]float32, 0, rowNum*dim)} } + floatVectorFieldData := fieldData.(*FloatVectorFieldData) - if rowNum <= 0 { - rowNum = totalLength + floatVectorFieldData.Data = append(floatVectorFieldData.Data, singleData...) + length, err := eventReader.GetPayloadLengthFromReader() + if err != nil { + return 0, err } + floatVectorFieldData.Dim = dim + insertData.Data[fieldID] = floatVectorFieldData + return length, nil - if fieldID == common.TimeStampField { - blobInfo := BlobInfo{ - Length: totalLength, - } - insertData.Infos = append(insertData.Infos, blobInfo) + case schemapb.DataType_SparseFloatVector: + singleData := data.(*SparseFloatVectorFieldData) + if fieldData == nil { + fieldData = &SparseFloatVectorFieldData{} } - binlogReader.Close() - } + vec := fieldData.(*SparseFloatVectorFieldData) + vec.AppendAllRows(singleData) + insertData.Data[fieldID] = vec + return singleData.RowNum(), nil - return collectionID, partitionID, segmentID, nil + default: + return 0, fmt.Errorf("undefined data type %d", dataType) + } } // Deserialize transfer blob back to insert data. 
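Editor's note, not part of the patch: the hunk above extracts the per-type branches of DeserializeInto into an AddInsertData helper that lazily initializes the typed FieldData for a field, appends the payload, and reports how many rows it added. The snippet below is a rough sketch of how that contract can be exercised; it assumes it runs inside the storage package, uses a hypothetical field ID 100, and passes nil/0 for the eventReader and dim arguments, which the scalar branches do not consult.

package storage

import (
	"fmt"

	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
)

// sketchAddInsertData is a hypothetical illustration of the refactored helper.
func sketchAddInsertData() error {
	insertData := &InsertData{Data: map[FieldID]FieldData{}}

	// Append three int64 rows to field 100. For scalar types the eventReader
	// and dim parameters are unused, so nil and 0 are passed here.
	n, err := AddInsertData(schemapb.DataType_Int64, []int64{1, 2, 3}, insertData, 100, 3, nil, 0)
	if err != nil {
		return err
	}

	// The helper returns the number of rows appended and stores the
	// lazily-created *Int64FieldData back into insertData.Data.
	fmt.Println(n, len(insertData.Data[100].(*Int64FieldData).Data)) // prints: 3 3
	return nil
}

For vector types the same helper instead derives the row count from eventReader.GetPayloadLengthFromReader() and records the dimension, mirroring the branches shown in the hunk above.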
@@ -813,13 +697,12 @@ func NewDeleteLog(pk PrimaryKey, ts Timestamp) *DeleteLog { func (dl *DeleteLog) UnmarshalJSON(data []byte) error { var messageMap map[string]*json.RawMessage - err := json.Unmarshal(data, &messageMap) - if err != nil { + var err error + if err = json.Unmarshal(data, &messageMap); err != nil { return err } - err = json.Unmarshal(*messageMap["pkType"], &dl.PkType) - if err != nil { + if err = json.Unmarshal(*messageMap["pkType"], &dl.PkType); err != nil { return err } @@ -830,13 +713,11 @@ func (dl *DeleteLog) UnmarshalJSON(data []byte) error { dl.Pk = &VarCharPrimaryKey{} } - err = json.Unmarshal(*messageMap["pk"], dl.Pk) - if err != nil { + if err = json.Unmarshal(*messageMap["pk"], dl.Pk); err != nil { return err } - err = json.Unmarshal(*messageMap["ts"], &dl.Ts) - if err != nil { + if err = json.Unmarshal(*messageMap["ts"], &dl.Ts); err != nil { return err } @@ -1120,8 +1001,7 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ if err != nil { return nil, err } - err = eventWriter.AddOneStringToPayload(req) - if err != nil { + if err = eventWriter.AddOneStringToPayload(req); err != nil { return nil, err } eventWriter.SetEventTimestamp(ts[pos], ts[pos]) @@ -1130,8 +1010,7 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ if err != nil { return nil, err } - err = eventWriter.AddOneStringToPayload(req) - if err != nil { + if err = eventWriter.AddOneStringToPayload(req); err != nil { return nil, err } eventWriter.SetEventTimestamp(ts[pos], ts[pos]) @@ -1140,8 +1019,7 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ if err != nil { return nil, err } - err = eventWriter.AddOneStringToPayload(req) - if err != nil { + if err = eventWriter.AddOneStringToPayload(req); err != nil { return nil, err } eventWriter.SetEventTimestamp(ts[pos], ts[pos]) @@ -1150,8 +1028,7 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ if err != nil { return nil, err } - err = eventWriter.AddOneStringToPayload(req) - if err != nil { + if err = eventWriter.AddOneStringToPayload(req); err != nil { return nil, err } eventWriter.SetEventTimestamp(ts[pos], ts[pos]) @@ -1162,12 +1039,10 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ // https://github.com/milvus-io/milvus/issues/9620 writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal)) - err = writer.Finish() - if err != nil { + if err = writer.Finish(); err != nil { return nil, err } - buffer, err = writer.GetBuffer() - if err != nil { + if buffer, err = writer.GetBuffer(); err != nil { return nil, err } blobs = append(blobs, &Blob{