diff --git a/go/vt/proto/vtadmin/vtadmin.pb.go b/go/vt/proto/vtadmin/vtadmin.pb.go index 8cdb420002d..132c302d7c1 100644 --- a/go/vt/proto/vtadmin/vtadmin.pb.go +++ b/go/vt/proto/vtadmin/vtadmin.pb.go @@ -1239,8 +1239,12 @@ type ApplySchemaRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - ClusterId string `protobuf:"bytes,1,opt,name=cluster_id,json=clusterId,proto3" json:"cluster_id,omitempty"` - Request *vtctldata.ApplySchemaRequest `protobuf:"bytes,2,opt,name=request,proto3" json:"request,omitempty"` + ClusterId string `protobuf:"bytes,1,opt,name=cluster_id,json=clusterId,proto3" json:"cluster_id,omitempty"` + // Request.Sql will be overriden by this Sql field. + Sql string `protobuf:"bytes,2,opt,name=sql,proto3" json:"sql,omitempty"` + // Request.CallerId will be overriden by this CallerId field. + CallerId string `protobuf:"bytes,3,opt,name=caller_id,json=callerId,proto3" json:"caller_id,omitempty"` + Request *vtctldata.ApplySchemaRequest `protobuf:"bytes,4,opt,name=request,proto3" json:"request,omitempty"` } func (x *ApplySchemaRequest) Reset() { @@ -1282,6 +1286,20 @@ func (x *ApplySchemaRequest) GetClusterId() string { return "" } +func (x *ApplySchemaRequest) GetSql() string { + if x != nil { + return x.Sql + } + return "" +} + +func (x *ApplySchemaRequest) GetCallerId() string { + if x != nil { + return x.CallerId + } + return "" +} + func (x *ApplySchemaRequest) GetRequest() *vtctldata.ApplySchemaRequest { if x != nil { return x.Request @@ -7995,11 +8013,14 @@ var file_vtadmin_proto_rawDesc = []byte{ 0x2e, 0x76, 0x74, 0x63, 0x74, 0x6c, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x53, 0x77, 0x69, 0x74, 0x63, 0x68, 0x54, 0x72, 0x61, 0x66, 0x66, 0x69, 0x63, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x52, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x22, 0x6c, 0x0a, 0x12, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, - 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6c, 0x75, 0x73, - 0x74, 0x65, 0x72, 0x49, 0x64, 0x12, 0x37, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x76, 0x74, 0x63, 0x74, 0x6c, 0x64, 0x61, + 0x22, 0x9b, 0x01, 0x0a, 0x12, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x6c, 0x75, 0x73, 0x74, + 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6c, 0x75, + 0x73, 0x74, 0x65, 0x72, 0x49, 0x64, 0x12, 0x10, 0x0a, 0x03, 0x73, 0x71, 0x6c, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x03, 0x73, 0x71, 0x6c, 0x12, 0x1b, 0x0a, 0x09, 0x63, 0x61, 0x6c, 0x6c, + 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x63, 0x61, 0x6c, + 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, 0x37, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x76, 0x74, 0x63, 0x74, 0x6c, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x41, 0x70, 0x70, 0x6c, 0x79, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x52, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x80, 0x01, 0x0a, 0x1c, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x4d, diff --git a/go/vt/proto/vtadmin/vtadmin_vtproto.pb.go b/go/vt/proto/vtadmin/vtadmin_vtproto.pb.go index 06bc371c4f8..2a99c730ecd 100644 --- a/go/vt/proto/vtadmin/vtadmin_vtproto.pb.go +++ b/go/vt/proto/vtadmin/vtadmin_vtproto.pb.go @@ -454,6 +454,8 @@ func (m *ApplySchemaRequest) CloneVT() *ApplySchemaRequest { } r := new(ApplySchemaRequest) r.ClusterId = m.ClusterId + r.Sql = m.Sql + r.CallerId = m.CallerId r.Request = m.Request.CloneVT() if len(m.unknownFields) > 0 { r.unknownFields = make([]byte, len(m.unknownFields)) @@ -4038,6 +4040,20 @@ func (m *ApplySchemaRequest) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= size i = protohelpers.EncodeVarint(dAtA, i, uint64(size)) i-- + dAtA[i] = 0x22 + } + if len(m.CallerId) > 0 { + i -= len(m.CallerId) + copy(dAtA[i:], m.CallerId) + i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.CallerId))) + i-- + dAtA[i] = 0x1a + } + if len(m.Sql) > 0 { + i -= len(m.Sql) + copy(dAtA[i:], m.Sql) + i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.Sql))) + i-- dAtA[i] = 0x12 } if len(m.ClusterId) > 0 { @@ -10321,6 +10337,14 @@ func (m *ApplySchemaRequest) SizeVT() (n int) { if l > 0 { n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) } + l = len(m.Sql) + if l > 0 { + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } + l = len(m.CallerId) + if l > 0 { + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } if m.Request != nil { l = m.Request.SizeVT() n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) @@ -15873,6 +15897,70 @@ func (m *ApplySchemaRequest) UnmarshalVT(dAtA []byte) error { m.ClusterId = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Sql", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Sql = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field CallerId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.CallerId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 4: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Request", wireType) } diff --git a/go/vt/vtadmin/api.go b/go/vt/vtadmin/api.go index cef8816504a..4f91459d9ed 100644 --- a/go/vt/vtadmin/api.go +++ b/go/vt/vtadmin/api.go @@ -59,6 +59,7 @@ import ( "vitess.io/vitess/go/vt/vtadmin/rbac" "vitess.io/vitess/go/vt/vtadmin/sort" "vitess.io/vitess/go/vt/vtadmin/vtadminproto" + "vitess.io/vitess/go/vt/vtctl/grpcvtctldserver" "vitess.io/vitess/go/vt/vtctl/workflow" "vitess.io/vitess/go/vt/vtenv" "vitess.io/vitess/go/vt/vterrors" @@ -488,6 +489,31 @@ func (api *API) ApplySchema(ctx context.Context, req *vtadminpb.ApplySchemaReque return nil, err } + // Parser with default options. New() itself initializes with default MySQL version. + parser, err := sqlparser.New(sqlparser.Options{ + TruncateUILen: 512, + TruncateErrLen: 0, + }) + if err != nil { + return nil, err + } + + // Split the sql statement received from request. + sqlParts, err := parser.SplitStatementToPieces(req.Sql) + if err != nil { + return nil, err + } + + req.Request.Sql = sqlParts + + // Set the callerID if not empty. + if req.CallerId != "" { + req.Request.CallerId = &vtrpcpb.CallerID{Principal: req.CallerId} + } + + // Set the default wait replicas timeout. + req.Request.WaitReplicasTimeout = protoutil.DurationToProto(grpcvtctldserver.DefaultWaitReplicasTimeout) + return c.ApplySchema(ctx, req.Request) } diff --git a/go/vt/vtadmin/http/schema_migrations.go b/go/vt/vtadmin/http/schema_migrations.go index e0207989648..3da6026fe9f 100644 --- a/go/vt/vtadmin/http/schema_migrations.go +++ b/go/vt/vtadmin/http/schema_migrations.go @@ -34,19 +34,26 @@ func ApplySchema(ctx context.Context, r Request, api *API) *JSONResponse { decoder := json.NewDecoder(r.Body) defer r.Body.Close() - var req vtctldatapb.ApplySchemaRequest - if err := decoder.Decode(&req); err != nil { + var body struct { + Sql string `json:"sql"` + CallerId string `json:"caller_id"` + Request vtctldatapb.ApplySchemaRequest `json:"request"` + } + + if err := decoder.Decode(&body); err != nil { return NewJSONResponse(nil, &errors.BadRequest{ Err: err, }) } vars := mux.Vars(r.Request) - req.Keyspace = vars["keyspace"] + body.Request.Keyspace = vars["keyspace"] resp, err := api.server.ApplySchema(ctx, &vtadminpb.ApplySchemaRequest{ ClusterId: vars["cluster_id"], - Request: &req, + Sql: body.Sql, + CallerId: body.CallerId, + Request: &body.Request, }) return NewJSONResponse(resp, err) diff --git a/proto/vtadmin.proto b/proto/vtadmin.proto index 78f086ec345..963d1fa5779 100644 --- a/proto/vtadmin.proto +++ b/proto/vtadmin.proto @@ -388,7 +388,11 @@ message WorkflowSwitchTrafficRequest { message ApplySchemaRequest { string cluster_id = 1; - vtctldata.ApplySchemaRequest request = 2; + // Request.Sql will be overriden by this Sql field. + string sql = 2; + // Request.CallerId will be overriden by this CallerId field. + string caller_id = 3; + vtctldata.ApplySchemaRequest request = 4; } message CancelSchemaMigrationRequest { diff --git a/web/vtadmin/src/api/http.ts b/web/vtadmin/src/api/http.ts index 3f75330d240..674df961ef0 100644 --- a/web/vtadmin/src/api/http.ts +++ b/web/vtadmin/src/api/http.ts @@ -1068,3 +1068,41 @@ export const showVDiff = async ({ clusterID, request }: ShowVDiffParams) => { return vtadmin.VDiffShowResponse.create(result); }; + +export const fetchSchemaMigrations = async (request: vtadmin.IGetSchemaMigrationsRequest) => { + const { result } = await vtfetch(`/api/migrations/`, { + body: JSON.stringify(request), + method: 'post', + }); + + const err = vtadmin.GetSchemaMigrationsResponse.verify(result); + if (err) throw Error(err); + + return vtadmin.GetSchemaMigrationsResponse.create(result); +}; + +export interface ApplySchemaParams { + clusterID: string; + keyspace: string; + callerID: string; + sql: string; + request: vtctldata.IApplySchemaRequest; +} + +export const applySchema = async ({ clusterID, keyspace, callerID, sql, request }: ApplySchemaParams) => { + const body = { + sql, + caller_id: callerID, + request, + }; + + const { result } = await vtfetch(`/api/migration/${clusterID}/${keyspace}`, { + body: JSON.stringify(body), + method: 'post', + }); + + const err = vtctldata.ApplySchemaResponse.verify(result); + if (err) throw Error(err); + + return vtctldata.ApplySchemaResponse.create(result); +}; diff --git a/web/vtadmin/src/components/App.tsx b/web/vtadmin/src/components/App.tsx index ef27a35dc95..3bb41ea35f0 100644 --- a/web/vtadmin/src/components/App.tsx +++ b/web/vtadmin/src/components/App.tsx @@ -45,6 +45,8 @@ import { Transactions } from './routes/Transactions'; import { Transaction } from './routes/transaction/Transaction'; import { CreateReshard } from './routes/createWorkflow/CreateReshard'; import { CreateMaterialize } from './routes/createWorkflow/CreateMaterialize'; +import { SchemaMigrations } from './routes/SchemaMigrations'; +import { CreateSchemaMigration } from './routes/createSchemaMigration/CreateSchemaMigration'; export const App = () => { return ( @@ -140,6 +142,16 @@ export const App = () => { + + + + + {!isReadOnlyMode() && ( + + + + )} + diff --git a/web/vtadmin/src/components/NavRail.tsx b/web/vtadmin/src/components/NavRail.tsx index 9f9e1bf1681..b30cd165684 100644 --- a/web/vtadmin/src/components/NavRail.tsx +++ b/web/vtadmin/src/components/NavRail.tsx @@ -65,6 +65,9 @@ export const NavRail = () => {