From 2cb26d51c479aeb949ad91e13de3b2f780f734a4 Mon Sep 17 00:00:00 2001 From: shaoting-huang Date: Thu, 13 Jun 2024 12:17:56 +0800 Subject: [PATCH] refactor data codec deserialize Signed-off-by: shaoting-huang --- internal/storage/data_codec.go | 558 ++++++++++++++------------------- 1 file changed, 231 insertions(+), 327 deletions(-) diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index e578c3ec5320e..01ea8964f9737 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -510,344 +510,257 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int if eventReader == nil { break } - switch dataType { - case schemapb.DataType_Bool: - singleData, err := eventReader.GetBoolFromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &BoolFieldData{ - Data: make([]bool, 0, rowNum), - } - } - boolFieldData := insertData.Data[fieldID].(*BoolFieldData) - - boolFieldData.Data = append(boolFieldData.Data, singleData...) - totalLength += len(singleData) - insertData.Data[fieldID] = boolFieldData - - case schemapb.DataType_Int8: - singleData, err := eventReader.GetInt8FromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &Int8FieldData{ - Data: make([]int8, 0, rowNum), - } - } - int8FieldData := insertData.Data[fieldID].(*Int8FieldData) - - int8FieldData.Data = append(int8FieldData.Data, singleData...) - totalLength += len(singleData) - insertData.Data[fieldID] = int8FieldData - - case schemapb.DataType_Int16: - singleData, err := eventReader.GetInt16FromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &Int16FieldData{ - Data: make([]int16, 0, rowNum), - } - } - int16FieldData := insertData.Data[fieldID].(*Int16FieldData) - - int16FieldData.Data = append(int16FieldData.Data, singleData...) - totalLength += len(singleData) - insertData.Data[fieldID] = int16FieldData - - case schemapb.DataType_Int32: - singleData, err := eventReader.GetInt32FromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &Int32FieldData{ - Data: make([]int32, 0, rowNum), - } - } - int32FieldData := insertData.Data[fieldID].(*Int32FieldData) - - int32FieldData.Data = append(int32FieldData.Data, singleData...) - totalLength += len(singleData) - insertData.Data[fieldID] = int32FieldData - - case schemapb.DataType_Int64: - singleData, err := eventReader.GetInt64FromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &Int64FieldData{ - Data: make([]int64, 0, rowNum), - } - } - int64FieldData := insertData.Data[fieldID].(*Int64FieldData) - - int64FieldData.Data = append(int64FieldData.Data, singleData...) 
- totalLength += len(singleData) - insertData.Data[fieldID] = int64FieldData - - case schemapb.DataType_Float: - singleData, err := eventReader.GetFloatFromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &FloatFieldData{ - Data: make([]float32, 0, rowNum), - } - } - floatFieldData := insertData.Data[fieldID].(*FloatFieldData) - - floatFieldData.Data = append(floatFieldData.Data, singleData...) - totalLength += len(singleData) - insertData.Data[fieldID] = floatFieldData - - case schemapb.DataType_Double: - singleData, err := eventReader.GetDoubleFromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &DoubleFieldData{ - Data: make([]float64, 0, rowNum), - } - } - doubleFieldData := insertData.Data[fieldID].(*DoubleFieldData) - - doubleFieldData.Data = append(doubleFieldData.Data, singleData...) - totalLength += len(singleData) - insertData.Data[fieldID] = doubleFieldData - - case schemapb.DataType_String, schemapb.DataType_VarChar: - stringPayload, err := eventReader.GetStringFromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } + data, _, err := eventReader.GetDataFromPayload() + if err != nil { + eventReader.Close() + binlogReader.Close() + return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err + } + length, err := AddInsertData(dataType, data, insertData, fieldID, rowNum, eventReader, dim) + if err != nil { + eventReader.Close() + binlogReader.Close() + return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err + } + totalLength += length + eventReader.Close() + } - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &StringFieldData{ - Data: make([]string, 0, rowNum), - } - } - stringFieldData := insertData.Data[fieldID].(*StringFieldData) + if rowNum <= 0 { + rowNum = totalLength + } - stringFieldData.Data = append(stringFieldData.Data, stringPayload...) - stringFieldData.DataType = dataType - totalLength += len(stringPayload) - insertData.Data[fieldID] = stringFieldData + if fieldID == common.TimeStampField { + blobInfo := BlobInfo{ + Length: totalLength, + } + insertData.Infos = append(insertData.Infos, blobInfo) + } + binlogReader.Close() + } - case schemapb.DataType_Array: - arrayPayload, err := eventReader.GetArrayFromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } + return collectionID, partitionID, segmentID, nil +} - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &ArrayFieldData{ - Data: make([]*schemapb.ScalarField, 0, rowNum), - } - } - arrayFieldData := insertData.Data[fieldID].(*ArrayFieldData) +func AddInsertData(dataType schemapb.DataType, data interface{}, insertData *InsertData, fieldID int64, rowNum int, eventReader *EventReader, dim int) (dataLength int, err error) { + switch dataType { + case schemapb.DataType_Bool: + singleData := data.([]bool) + if insertData.Data[fieldID] == nil { + insertData.Data[fieldID] = &BoolFieldData{ + Data: make([]bool, 0, rowNum), + } + } + boolFieldData := insertData.Data[fieldID].(*BoolFieldData) - arrayFieldData.Data = append(arrayFieldData.Data, arrayPayload...) 
- totalLength += len(arrayPayload) - insertData.Data[fieldID] = arrayFieldData + boolFieldData.Data = append(boolFieldData.Data, singleData...) + insertData.Data[fieldID] = boolFieldData + return len(singleData), nil - case schemapb.DataType_JSON: - jsonPayload, err := eventReader.GetJSONFromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } + case schemapb.DataType_Int8: + singleData := data.([]int8) + if insertData.Data[fieldID] == nil { + insertData.Data[fieldID] = &Int8FieldData{ + Data: make([]int8, 0, rowNum), + } + } + int8FieldData := insertData.Data[fieldID].(*Int8FieldData) - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &JSONFieldData{ - Data: make([][]byte, 0, rowNum), - } - } - jsonFieldData := insertData.Data[fieldID].(*JSONFieldData) + int8FieldData.Data = append(int8FieldData.Data, singleData...) + insertData.Data[fieldID] = int8FieldData + return len(singleData), nil - jsonFieldData.Data = append(jsonFieldData.Data, jsonPayload...) - totalLength += len(jsonPayload) - insertData.Data[fieldID] = jsonFieldData + case schemapb.DataType_Int16: + singleData := data.([]int16) + if insertData.Data[fieldID] == nil { + insertData.Data[fieldID] = &Int16FieldData{ + Data: make([]int16, 0, rowNum), + } + } + int16FieldData := insertData.Data[fieldID].(*Int16FieldData) - case schemapb.DataType_BinaryVector: - var singleData []byte - singleData, dim, err = eventReader.GetBinaryVectorFromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } + int16FieldData.Data = append(int16FieldData.Data, singleData...) + insertData.Data[fieldID] = int16FieldData + return len(singleData), nil - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &BinaryVectorFieldData{ - Data: make([]byte, 0, rowNum*dim), - } - } - binaryVectorFieldData := insertData.Data[fieldID].(*BinaryVectorFieldData) + case schemapb.DataType_Int32: + singleData := data.([]int32) + if insertData.Data[fieldID] == nil { + insertData.Data[fieldID] = &Int32FieldData{ + Data: make([]int32, 0, rowNum), + } + } + int32FieldData := insertData.Data[fieldID].(*Int32FieldData) - binaryVectorFieldData.Data = append(binaryVectorFieldData.Data, singleData...) - length, err := eventReader.GetPayloadLengthFromReader() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - totalLength += length - binaryVectorFieldData.Dim = dim - insertData.Data[fieldID] = binaryVectorFieldData + int32FieldData.Data = append(int32FieldData.Data, singleData...) 
+ insertData.Data[fieldID] = int32FieldData + return len(singleData), nil - case schemapb.DataType_Float16Vector: - var singleData []byte - singleData, dim, err = eventReader.GetFloat16VectorFromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } + case schemapb.DataType_Int64: + singleData := data.([]int64) + if insertData.Data[fieldID] == nil { + insertData.Data[fieldID] = &Int64FieldData{ + Data: make([]int64, 0, rowNum), + } + } + int64FieldData := insertData.Data[fieldID].(*Int64FieldData) - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &Float16VectorFieldData{ - Data: make([]byte, 0, rowNum*dim), - } - } - float16VectorFieldData := insertData.Data[fieldID].(*Float16VectorFieldData) + int64FieldData.Data = append(int64FieldData.Data, singleData...) + insertData.Data[fieldID] = int64FieldData + return len(singleData), nil - float16VectorFieldData.Data = append(float16VectorFieldData.Data, singleData...) - length, err := eventReader.GetPayloadLengthFromReader() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - totalLength += length - float16VectorFieldData.Dim = dim - insertData.Data[fieldID] = float16VectorFieldData + case schemapb.DataType_Float: + singleData := data.([]float32) + if insertData.Data[fieldID] == nil { + insertData.Data[fieldID] = &FloatFieldData{ + Data: make([]float32, 0, rowNum), + } + } + floatFieldData := insertData.Data[fieldID].(*FloatFieldData) - case schemapb.DataType_BFloat16Vector: - var singleData []byte - singleData, dim, err = eventReader.GetBFloat16VectorFromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } + floatFieldData.Data = append(floatFieldData.Data, singleData...) + insertData.Data[fieldID] = floatFieldData + return len(singleData), nil - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &BFloat16VectorFieldData{ - Data: make([]byte, 0, rowNum*dim), - } - } - bfloat16VectorFieldData := insertData.Data[fieldID].(*BFloat16VectorFieldData) + case schemapb.DataType_Double: + singleData := data.([]float64) + if insertData.Data[fieldID] == nil { + insertData.Data[fieldID] = &DoubleFieldData{ + Data: make([]float64, 0, rowNum), + } + } + doubleFieldData := insertData.Data[fieldID].(*DoubleFieldData) - bfloat16VectorFieldData.Data = append(bfloat16VectorFieldData.Data, singleData...) - length, err := eventReader.GetPayloadLengthFromReader() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - totalLength += length - bfloat16VectorFieldData.Dim = dim - insertData.Data[fieldID] = bfloat16VectorFieldData + doubleFieldData.Data = append(doubleFieldData.Data, singleData...) 
+ insertData.Data[fieldID] = doubleFieldData + return len(singleData), nil - case schemapb.DataType_FloatVector: - var singleData []float32 - singleData, dim, err = eventReader.GetFloatVectorFromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } + case schemapb.DataType_String, schemapb.DataType_VarChar: + singleData := data.([]string) + if insertData.Data[fieldID] == nil { + insertData.Data[fieldID] = &StringFieldData{ + Data: make([]string, 0, rowNum), + } + } + stringFieldData := insertData.Data[fieldID].(*StringFieldData) + + stringFieldData.Data = append(stringFieldData.Data, singleData...) + stringFieldData.DataType = dataType + insertData.Data[fieldID] = stringFieldData + return len(singleData), nil + + case schemapb.DataType_Array: + singleData := data.([]*schemapb.ScalarField) + if insertData.Data[fieldID] == nil { + insertData.Data[fieldID] = &ArrayFieldData{ + Data: make([]*schemapb.ScalarField, 0, rowNum), + } + } + arrayFieldData := insertData.Data[fieldID].(*ArrayFieldData) - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &FloatVectorFieldData{ - Data: make([]float32, 0, rowNum*dim), - } - } - floatVectorFieldData := insertData.Data[fieldID].(*FloatVectorFieldData) + arrayFieldData.Data = append(arrayFieldData.Data, singleData...) + insertData.Data[fieldID] = arrayFieldData + return len(singleData), nil - floatVectorFieldData.Data = append(floatVectorFieldData.Data, singleData...) - length, err := eventReader.GetPayloadLengthFromReader() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - totalLength += length - floatVectorFieldData.Dim = dim - insertData.Data[fieldID] = floatVectorFieldData + case schemapb.DataType_JSON: + singleData := data.([][]byte) + if insertData.Data[fieldID] == nil { + insertData.Data[fieldID] = &JSONFieldData{ + Data: make([][]byte, 0, rowNum), + } + } + jsonFieldData := insertData.Data[fieldID].(*JSONFieldData) - case schemapb.DataType_SparseFloatVector: - sparseData, _, err := eventReader.GetSparseFloatVectorFromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &SparseFloatVectorFieldData{} - } - vec := insertData.Data[fieldID].(*SparseFloatVectorFieldData) - vec.AppendAllRows(sparseData) + jsonFieldData.Data = append(jsonFieldData.Data, singleData...) + insertData.Data[fieldID] = jsonFieldData + return len(singleData), nil - totalLength += sparseData.RowNum() - insertData.Data[fieldID] = vec + case schemapb.DataType_BinaryVector: + singleData := data.([]byte) + if insertData.Data[fieldID] == nil { + insertData.Data[fieldID] = &BinaryVectorFieldData{ + Data: make([]byte, 0, rowNum*dim), + } + } + binaryVectorFieldData := insertData.Data[fieldID].(*BinaryVectorFieldData) - default: - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, fmt.Errorf("undefined data type %d", dataType) + binaryVectorFieldData.Data = append(binaryVectorFieldData.Data, singleData...) 
+ length, err := eventReader.GetPayloadLengthFromReader() + if err != nil { + return length, err + } + binaryVectorFieldData.Dim = dim + insertData.Data[fieldID] = binaryVectorFieldData + return length, nil + + case schemapb.DataType_Float16Vector: + singleData := data.([]byte) + if insertData.Data[fieldID] == nil { + insertData.Data[fieldID] = &Float16VectorFieldData{ + Data: make([]byte, 0, rowNum*dim), } - eventReader.Close() } + float16VectorFieldData := insertData.Data[fieldID].(*Float16VectorFieldData) - if rowNum <= 0 { - rowNum = totalLength + float16VectorFieldData.Data = append(float16VectorFieldData.Data, singleData...) + length, err := eventReader.GetPayloadLengthFromReader() + if err != nil { + return length, err + } + float16VectorFieldData.Dim = dim + insertData.Data[fieldID] = float16VectorFieldData + return length, nil + + case schemapb.DataType_BFloat16Vector: + singleData := data.([]byte) + if insertData.Data[fieldID] == nil { + insertData.Data[fieldID] = &BFloat16VectorFieldData{ + Data: make([]byte, 0, rowNum*dim), + } } + bfloat16VectorFieldData := insertData.Data[fieldID].(*BFloat16VectorFieldData) - if fieldID == common.TimeStampField { - blobInfo := BlobInfo{ - Length: totalLength, + bfloat16VectorFieldData.Data = append(bfloat16VectorFieldData.Data, singleData...) + length, err := eventReader.GetPayloadLengthFromReader() + if err != nil { + return length, err + } + bfloat16VectorFieldData.Dim = dim + insertData.Data[fieldID] = bfloat16VectorFieldData + return length, nil + + case schemapb.DataType_FloatVector: + singleData := data.([]float32) + if insertData.Data[fieldID] == nil { + insertData.Data[fieldID] = &FloatVectorFieldData{ + Data: make([]float32, 0, rowNum*dim), } - insertData.Infos = append(insertData.Infos, blobInfo) } - binlogReader.Close() - } + floatVectorFieldData := insertData.Data[fieldID].(*FloatVectorFieldData) - return collectionID, partitionID, segmentID, nil + floatVectorFieldData.Data = append(floatVectorFieldData.Data, singleData...) + length, err := eventReader.GetPayloadLengthFromReader() + if err != nil { + return 0, err + } + floatVectorFieldData.Dim = dim + insertData.Data[fieldID] = floatVectorFieldData + return length, nil + + case schemapb.DataType_SparseFloatVector: + singleData := data.(*SparseFloatVectorFieldData) + if insertData.Data[fieldID] == nil { + insertData.Data[fieldID] = &SparseFloatVectorFieldData{} + } + vec := insertData.Data[fieldID].(*SparseFloatVectorFieldData) + vec.AppendAllRows(singleData) + insertData.Data[fieldID] = vec + return singleData.RowNum(), nil + + default: + return 0, fmt.Errorf("undefined data type %d", dataType) + } + return 0, nil } // Deserialize transfer blob back to insert data. 
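The hunk above collapses the long per-type switch in DeserializeInto into two steps: eventReader.GetDataFromPayload() decodes the whole payload once, and the new AddInsertData helper type-asserts that payload to the concrete slice for the column's DataType, appends it to the matching FieldData in insertData (allocating it on first use with rowNum as the initial capacity), and reports how many rows were added. A minimal, hypothetical caller sketch under those assumptions (the field ID, the values, and the surrounding insertData / eventReader / totalLength variables are illustrative, not taken from the patch):

// Payload assumed already decoded by GetDataFromPayload; for scalar columns
// the helper never touches eventReader or dim, so only vector fields need them.
rows := []int64{10, 11, 12}
// data must be the concrete slice matching dataType ([]int64 here); the
// helper's type assertion panics on anything else.
n, err := AddInsertData(schemapb.DataType_Int64, rows, insertData, fieldID, len(rows), eventReader, 0)
if err != nil {
	return err
}
totalLength += n // n == len(rows) for scalar columns

The returned length is what feeds totalLength: scalar, string, array and JSON columns report the decoded slice length; the four dense vector types report eventReader.GetPayloadLengthFromReader() (the row count, since their flat slices hold dim-sized rows) and record Dim on the field data; sparse vectors report singleData.RowNum().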
@@ -877,13 +790,12 @@ func NewDeleteLog(pk PrimaryKey, ts Timestamp) *DeleteLog { func (dl *DeleteLog) UnmarshalJSON(data []byte) error { var messageMap map[string]*json.RawMessage - err := json.Unmarshal(data, &messageMap) - if err != nil { + var err error + if err = json.Unmarshal(data, &messageMap); err != nil { return err } - err = json.Unmarshal(*messageMap["pkType"], &dl.PkType) - if err != nil { + if err = json.Unmarshal(*messageMap["pkType"], &dl.PkType); err != nil { return err } @@ -894,13 +806,11 @@ func (dl *DeleteLog) UnmarshalJSON(data []byte) error { dl.Pk = &VarCharPrimaryKey{} } - err = json.Unmarshal(*messageMap["pk"], dl.Pk) - if err != nil { + if err = json.Unmarshal(*messageMap["pk"], dl.Pk); err != nil { return err } - err = json.Unmarshal(*messageMap["ts"], &dl.Ts) - if err != nil { + if err = json.Unmarshal(*messageMap["ts"], &dl.Ts); err != nil { return err } @@ -1184,8 +1094,7 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ if err != nil { return nil, err } - err = eventWriter.AddOneStringToPayload(req) - if err != nil { + if err = eventWriter.AddOneStringToPayload(req); err != nil { return nil, err } eventWriter.SetEventTimestamp(ts[pos], ts[pos]) @@ -1194,8 +1103,7 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ if err != nil { return nil, err } - err = eventWriter.AddOneStringToPayload(req) - if err != nil { + if err = eventWriter.AddOneStringToPayload(req); err != nil { return nil, err } eventWriter.SetEventTimestamp(ts[pos], ts[pos]) @@ -1204,8 +1112,7 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ if err != nil { return nil, err } - err = eventWriter.AddOneStringToPayload(req) - if err != nil { + if err = eventWriter.AddOneStringToPayload(req); err != nil { return nil, err } eventWriter.SetEventTimestamp(ts[pos], ts[pos]) @@ -1214,8 +1121,7 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ if err != nil { return nil, err } - err = eventWriter.AddOneStringToPayload(req) - if err != nil { + if err = eventWriter.AddOneStringToPayload(req); err != nil { return nil, err } eventWriter.SetEventTimestamp(ts[pos], ts[pos]) @@ -1226,12 +1132,10 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ // https://github.com/milvus-io/milvus/issues/9620 writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal)) - err = writer.Finish() - if err != nil { + if err = writer.Finish(); err != nil { return nil, err } - buffer, err = writer.GetBuffer() - if err != nil { + if buffer, err = writer.GetBuffer(); err != nil { return nil, err } blobs = append(blobs, &Blob{
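The DeleteLog.UnmarshalJSON and DataDefinitionCodec.Serialize hunks above are behavior-preserving style changes: each assign-then-check pair is folded into Go's inline if form, so the call and its error check sit in one statement while err remains the function-level variable (plain =, not :=). Shown on one of the rewritten calls from the patch:

// Before: two statements per call.
err = eventWriter.AddOneStringToPayload(req)
if err != nil {
	return nil, err
}

// After: call and check combined; no new scope for err is introduced.
if err = eventWriter.AddOneStringToPayload(req); err != nil {
	return nil, err
}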
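For the UnmarshalJSON hunk specifically, the decode order is untouched: pkType is read first so that pk can be unmarshalled into the right concrete PrimaryKey, and ts last. Only the VarCharPrimaryKey branch is visible in the diff context; the sketch below reconstructs the surrounding dispatch for illustration and is not text from the patch:

// Reconstructed shape of the pk-type dispatch (illustrative only; the Int64
// branch is assumed from the VarChar branch shown in the context lines).
switch schemapb.DataType(dl.PkType) {
case schemapb.DataType_Int64:
	dl.Pk = &Int64PrimaryKey{}
case schemapb.DataType_VarChar:
	dl.Pk = &VarCharPrimaryKey{}
}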