diff --git a/acceptance/clients/clients.go b/acceptance/clients/clients.go index fafe91f86..651d59daf 100644 --- a/acceptance/clients/clients.go +++ b/acceptance/clients/clients.go @@ -632,6 +632,16 @@ func NewDmsV1Client() (*golangsdk.ServiceClient, error) { }) } +func NewDisV2Client() (*golangsdk.ServiceClient, error) { + cc, err := CloudAndClient() + if err != nil { + return nil, err + } + return openstack.NewDISServiceV2(cc.ProviderClient, golangsdk.EndpointOpts{ + Region: cc.RegionName, + }) +} + func NewDmsV11Client() (*golangsdk.ServiceClient, error) { cc, err := CloudAndClient() if err != nil { diff --git a/acceptance/openstack/dis/v2/dis_test.go b/acceptance/openstack/dis/v2/dis_test.go new file mode 100644 index 000000000..0487e07fb --- /dev/null +++ b/acceptance/openstack/dis/v2/dis_test.go @@ -0,0 +1,322 @@ +package v2 + +import ( + "log" + "strings" + "testing" + "time" + + "github.com/opentelekomcloud/gophertelekomcloud/acceptance/clients" + "github.com/opentelekomcloud/gophertelekomcloud/acceptance/tools" + "github.com/opentelekomcloud/gophertelekomcloud/openstack/common/pointerto" + "github.com/opentelekomcloud/gophertelekomcloud/openstack/dis/v2/apps" + "github.com/opentelekomcloud/gophertelekomcloud/openstack/dis/v2/checkpoints" + "github.com/opentelekomcloud/gophertelekomcloud/openstack/dis/v2/data" + "github.com/opentelekomcloud/gophertelekomcloud/openstack/dis/v2/dump" + "github.com/opentelekomcloud/gophertelekomcloud/openstack/dis/v2/monitors" + "github.com/opentelekomcloud/gophertelekomcloud/openstack/dis/v2/streams" + "github.com/opentelekomcloud/gophertelekomcloud/openstack/obs" + th "github.com/opentelekomcloud/gophertelekomcloud/testhelper" +) + +func TestDISWorkflow(t *testing.T) { + client, err := clients.NewDisV2Client() + th.AssertNoErr(t, err) + now := time.Now() + appName := tools.RandomString("app-create-test-", 3) + log.Printf("Create DIS App, Name: %s", appName) + err = apps.CreateApp(client, apps.CreateAppOpts{ + AppName: appName, + }) + th.AssertNoErr(t, err) + t.Cleanup(func() { + log.Printf("Delete DIS App, Name: %s", appName) + err = apps.DeleteApp(client, appName) + th.AssertNoErr(t, err) + }) + + log.Printf("Get DIS App, Name: %s", appName) + app, err := apps.GetApp(client, appName) + th.AssertNoErr(t, err) + th.AssertEquals(t, app.AppName, appName) + + log.Print("List DIS Apps") + listApps, err := apps.ListApps(client, apps.ListAppOpts{Limit: pointerto.Int(10)}) + th.AssertNoErr(t, err) + th.AssertEquals(t, *listApps.TotalNumber, 1) + + streamName := tools.RandomString("stream-create-test-", 3) + log.Printf("Create DIS Stream, Name: %s", streamName) + err = streams.CreateStream(client, streams.CreateStreamOpts{ + StreamName: streamName, + PartitionCount: 3, + }) + th.AssertNoErr(t, err) + t.Cleanup(func() { + log.Printf("Delete DIS Stream, Name: %s", streamName) + err = streams.DeleteStream(client, streamName) + th.AssertNoErr(t, err) + }) + + log.Printf("Get DIS App status, Name: %s", appName) + appStatus, err := apps.GetAppStatus(client, apps.GetAppStatusOpts{ + AppName: appName, + StreamName: streamName, + CheckpointType: "LAST_READ", + }) + th.AssertNoErr(t, err) + th.AssertEquals(t, len(appStatus.PartitionConsumingStates), 3) + + log.Printf("Get DIS Stream, Name: %s", streamName) + getStream, err := streams.GetStream(client, streams.GetStreamOpts{ + StreamName: streamName, + }) + th.AssertNoErr(t, err) + th.AssertEquals(t, getStream.StreamName, streamName) + + log.Printf("Update DIS Stream partitions count, Name: %s", streamName) + err = 
streams.UpdatePartitionCount(client, streams.UpdatePartitionCountOpts{ + StreamName: streamName, + TargetPartitionCount: 4, + }) + th.AssertNoErr(t, err) + + // "Bad request with: [PUT https://dis.eu-de.otc.t-systems.com/v2/5045c215010c440d91b2f7dce1f3753b/streams/stream-create-test-jmn], + // error message: {\"errorCode\":\"DIS.4200\",\"message\":\"Invalid request. [Invalid target_partition_count null.]\"}" + // https://jira.tsi-dev.otc-service.com/browse/BM-2472 + // err = streams.UpdateStream(client, streams.UpdateStreamOpts{ + // StreamName: streamName, + // DataType: "JSON", + // }) + // th.AssertNoErr(t, err) + + // getStreamUpdated, err := streams.GetStream(client, streams.GetStreamOpts{ + // StreamName: streamName, + // }) + // th.AssertNoErr(t, err) + // th.AssertEquals(t, getStreamUpdated.StreamName, streamName) + // th.AssertEquals(t, getStreamUpdated.DataType, "JSON") + + // url not found: https://jira.tsi-dev.otc-service.com/browse/BM-2474 + // log.Printf("Create DIS Stream Policy Rule, Name: %s", streamName) + // err = streams.CreatePolicyRule(client, streams.CreatePolicyRuleOpts{ + // StreamName: streamName, + // StreamId: getStream.StreamId, + // PrincipalName: client.DomainID, + // ActionType: "putRecords", + // Effect: "effect", + // }) + // th.AssertNoErr(t, err) + // + // log.Printf("Get DIS Stream Policy Rule, Name: %s", streamName) + // rule, err := streams.GetPolicyRule(client, streamName) + // th.AssertNoErr(t, err) + // tools.PrintResource(t, rule) + + log.Print("List DIS Streams") + listStreams, err := streams.ListStreams(client, streams.ListStreamsOpts{}) + th.AssertNoErr(t, err) + th.AssertEquals(t, listStreams.StreamInfoList[0].DataType, "BLOB") + + log.Printf("Commit DIS App Checkpoint, Name: %s", appName) + err = checkpoints.CommitCheckpoint(client, checkpoints.CommitCheckpointOpts{ + AppName: appName, + CheckpointType: "LAST_READ", + StreamName: streamName, + PartitionId: "0", + SequenceNumber: "0", + }) + th.AssertNoErr(t, err) + t.Cleanup(func() { + log.Printf("Delete DIS App Checkpoint, Name: %s", appName) + err = checkpoints.DeleteCheckpoint(client, checkpoints.DeleteCheckpointOpts{ + AppName: appName, + StreamName: streamName, + PartitionId: "0", + CheckpointType: "LAST_READ", + }) + th.AssertNoErr(t, err) + }) + + log.Printf("Get DIS App Checkpoint, Name: %s", appName) + checkpoint, err := checkpoints.GetCheckpoint(client, checkpoints.GetCheckpointOpts{ + AppName: appName, + StreamName: streamName, + PartitionId: "0", + CheckpointType: "LAST_READ", + }) + th.AssertNoErr(t, err) + th.AssertEquals(t, checkpoint.SequenceNumber, "0") + + log.Printf("Create DIS Stream Data records, Name: %s", streamName) + _, err = data.PutRecords(client, data.PutRecordsOpts{ + StreamName: streamName, + StreamId: getStream.StreamId, + Records: []data.PutRecordsRequestEntry{ + { + Data: "dGVzdCBzdHJpbmc=", + }, + }, + }) + th.AssertNoErr(t, err) + + log.Printf("Create DIS Stream cursor, Name: %s", streamName) + cursor, err := data.GetCursor(client, data.GetCursorOpts{ + StreamName: streamName, + PartitionId: "0", + }) + th.AssertNoErr(t, err) + tools.PrintResource(t, cursor) + + log.Printf("Get DIS Stream records, Name: %s", streamName) + records, err := data.GetRecords(client, data.GetRecordsOpts{ + PartitionCursor: cursor.PartitionCursor, + }) + th.AssertNoErr(t, err) + tools.PrintResource(t, records) + + log.Print("Querying Stream Monitoring Data") + streamMon, err := monitors.GetStreamMonitor(client, monitors.GetStreamMonitorOpts{ + StreamName: streamName, + Label: 
"total_put_bytes_per_stream", + StartTime: now.Unix(), + EndTime: now.Unix() + 50, + }) + th.AssertNoErr(t, err) + th.AssertEquals(t, streamMon.Metrics.Label, "total_put_bytes_per_stream") + + log.Print("Querying Partition Monitoring Data") + streamPartMon, err := monitors.GetPartitionMonitor(client, monitors.GetPartitionMonitorOpts{ + PartitionId: "0", + StreamName: streamName, + Label: "total_put_bytes_per_partition", + StartTime: now.Unix(), + EndTime: now.Unix() + 50, + }) + th.AssertNoErr(t, err) + th.AssertEquals(t, streamPartMon.Metrics.Label, "total_put_bytes_per_partition") +} + +func TestDISDumpWorkflow(t *testing.T) { + t.Skip("Need to create dis_admin_agency first") + client, err := clients.NewDisV2Client() + th.AssertNoErr(t, err) + + clientObs, err := clients.NewOBSClient() + th.AssertNoErr(t, err) + + bucketName := strings.ToLower(tools.RandomString("obs-dis-test", 5)) + + _, err = clientObs.CreateBucket(&obs.CreateBucketInput{ + Bucket: bucketName, + }) + t.Cleanup(func() { + _, err = clientObs.DeleteBucket(bucketName) + th.AssertNoErr(t, err) + }) + th.AssertNoErr(t, err) + + appName := tools.RandomString("app-create-test-", 3) + log.Printf("Create DIS App, Name: %s", appName) + err = apps.CreateApp(client, apps.CreateAppOpts{ + AppName: appName, + }) + th.AssertNoErr(t, err) + t.Cleanup(func() { + log.Printf("Delete DIS App, Name: %s", appName) + err = apps.DeleteApp(client, appName) + th.AssertNoErr(t, err) + }) + + streamName := tools.RandomString("stream-create-test-", 3) + log.Printf("Create DIS Stream, Name: %s", streamName) + err = streams.CreateStream(client, streams.CreateStreamOpts{ + StreamName: streamName, + PartitionCount: 3, + }) + th.AssertNoErr(t, err) + t.Cleanup(func() { + log.Printf("Delete DIS Stream, Name: %s", streamName) + err = streams.DeleteStream(client, streamName) + th.AssertNoErr(t, err) + }) + + taskName := tools.RandomString("task-create-test-", 3) + log.Printf("Delete DIS Dump task, Name: %s", taskName) + err = dump.CreateOBSDumpTask(client, dump.CreateOBSDumpTaskOpts{ + StreamName: streamName, + DestinationType: "OBS", + OBSDestinationDescriptor: dump.OBSDestinationDescriptorOpts{ + TaskName: taskName, + AgencyName: "dis_admin_agency", + ConsumerStrategy: "LATEST", + DestinationFileType: "text", + OBSBucketPath: bucketName, + FilePrefix: "", + PartitionFormat: "yyyy/MM/dd/HH/mm", + RecordDelimiter: "|", + DeliverTimeInterval: pointerto.Int(30), + }, + }) + th.AssertNoErr(t, err) + + t.Cleanup(func() { + log.Printf("Delete DIS Dump task, Name: %s", taskName) + err = dump.DeleteTransferTask(client, dump.DeleteTransferTaskOpts{ + StreamName: streamName, + TaskName: taskName, + }) + th.AssertNoErr(t, err) + }) + + log.Printf("Get DIS Dump task, Name: %s", taskName) + getDump, err := dump.GetTransferTask(client, dump.GetTransferTaskOpts{ + StreamName: streamName, + TaskName: taskName, + }) + th.AssertNoErr(t, err) + th.AssertEquals(t, getDump.TaskName, taskName) + + err = dump.TransferTaskAction(client, dump.TransferTaskActionOpts{ + StreamName: streamName, + Action: "stop", + Tasks: []dump.BatchTransferTask{ + { + Id: getDump.TaskId, + }, + }, + }) + th.AssertNoErr(t, err) + + log.Printf("Check DIS Dump task state is paused, Name: %s", taskName) + stateDumpStopped, err := dump.GetTransferTask(client, dump.GetTransferTaskOpts{ + StreamName: streamName, + TaskName: taskName, + }) + th.AssertNoErr(t, err) + th.AssertEquals(t, stateDumpStopped.State, "PAUSED") + + log.Print("List DIS Dump tasks") + listTasks, err := dump.ListTransferTasks(client, 
streamName) + th.AssertNoErr(t, err) + th.AssertEquals(t, *listTasks.TotalNumber, 1) + + err = dump.TransferTaskAction(client, dump.TransferTaskActionOpts{ + StreamName: streamName, + Action: "start", + Tasks: []dump.BatchTransferTask{ + { + Id: getDump.TaskId, + }, + }, + }) + th.AssertNoErr(t, err) + + log.Printf("Check DIS Dump task state is running, Name: %s", taskName) + stateDumpStarted, err := dump.GetTransferTask(client, dump.GetTransferTaskOpts{ + StreamName: streamName, + TaskName: taskName, + }) + th.AssertNoErr(t, err) + th.AssertEquals(t, stateDumpStarted.State, "RUNNING") +} diff --git a/openstack/client.go b/openstack/client.go index 4bd8e2ef6..cddea2797 100644 --- a/openstack/client.go +++ b/openstack/client.go @@ -727,10 +727,16 @@ func NewDCSServiceV1(client *golangsdk.ProviderClient, eo golangsdk.EndpointOpts // NewDDSServiceV3 creates a ServiceClient that may be used to access the Document Database Service. func NewDDSServiceV3(client *golangsdk.ProviderClient, eo golangsdk.EndpointOpts) (*golangsdk.ServiceClient, error) { sc, err := initClientOpts(client, eo, "ddsv3") + return sc, err +} + +func NewDISServiceV2(client *golangsdk.ProviderClient, eo golangsdk.EndpointOpts) (*golangsdk.ServiceClient, error) { + sc, err := initClientOpts(client, eo, "dis") if err != nil { return nil, err } - return sc, nil + sc.ResourceBase = sc.Endpoint + "v2/" + client.ProjectID + "/" + return sc, err } // NewOBSService creates a ServiceClient that may be used to access the Object Storage Service. diff --git a/openstack/dis/v2/apps/CreateApp.go b/openstack/dis/v2/apps/CreateApp.go new file mode 100644 index 000000000..d744c1c9a --- /dev/null +++ b/openstack/dis/v2/apps/CreateApp.go @@ -0,0 +1,27 @@ +package apps + +import ( + golangsdk "github.com/opentelekomcloud/gophertelekomcloud" + "github.com/opentelekomcloud/gophertelekomcloud/internal/build" +) + +type CreateAppOpts struct { + // Unique identifier of the consumer application to be created. + // The application name contains 1 to 200 characters, including letters, digits, underscores (_), and hyphens (-). 
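+ // For example, "dis-consumer-app-01" would be a valid name (an illustrative value, not from the API docs).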
+ // Minimum: 1 + // Maximum: 200 + AppName string `json:"app_name"` +} + +func CreateApp(client *golangsdk.ServiceClient, opts CreateAppOpts) error { + b, err := build.RequestBody(opts, "") + if err != nil { + return err + } + + // POST /v2/{project_id}/apps + _, err = client.Post(client.ServiceURL("apps"), b, nil, &golangsdk.RequestOpts{ + OkCodes: []int{201}, + }) + return err +} diff --git a/openstack/dis/v2/apps/DeleteApp.go b/openstack/dis/v2/apps/DeleteApp.go new file mode 100644 index 000000000..0b5e3d377 --- /dev/null +++ b/openstack/dis/v2/apps/DeleteApp.go @@ -0,0 +1,9 @@ +package apps + +import golangsdk "github.com/opentelekomcloud/gophertelekomcloud" + +func DeleteApp(client *golangsdk.ServiceClient, appName string) (err error) { + // DELETE /v2/{project_id}/apps/{app_name} + _, err = client.Delete(client.ServiceURL("apps", appName), nil) + return +} diff --git a/openstack/dis/v2/apps/GetApp.go b/openstack/dis/v2/apps/GetApp.go new file mode 100644 index 000000000..591c592ea --- /dev/null +++ b/openstack/dis/v2/apps/GetApp.go @@ -0,0 +1,29 @@ +package apps + +import ( + golangsdk "github.com/opentelekomcloud/gophertelekomcloud" + "github.com/opentelekomcloud/gophertelekomcloud/internal/extract" +) + +func GetApp(client *golangsdk.ServiceClient, appName string) (*GetAppResponse, error) { + // GET /v2/{project_id}/apps/{app_name} + raw, err := client.Get(client.ServiceURL("apps", appName), nil, nil) + if err != nil { + return nil, err + } + + var res GetAppResponse + err = extract.Into(raw.Body, &res) + return &res, err +} + +type GetAppResponse struct { + // Name of the app. + AppName string `json:"app_name,omitempty"` + // Unique identifier of the app. + AppId string `json:"app_id,omitempty"` + // Time when the app is created, in milliseconds. + CreateTime *int64 `json:"create_time,omitempty"` + // List of associated streams. + CommitCheckPointStreamNames []string `json:"commit_checkpoint_stream_names,omitempty"` +} diff --git a/openstack/dis/v2/apps/GetAppStatus.go b/openstack/dis/v2/apps/GetAppStatus.go new file mode 100644 index 000000000..dd3fd143d --- /dev/null +++ b/openstack/dis/v2/apps/GetAppStatus.go @@ -0,0 +1,80 @@ +package apps + +import ( + golangsdk "github.com/opentelekomcloud/gophertelekomcloud" + "github.com/opentelekomcloud/gophertelekomcloud/internal/extract" +) + +type GetAppStatusOpts struct { + // Name of the app to be queried. + AppName string + // Name of the stream to be queried. + // Maximum: 60 + StreamName string + // Max. number of partitions to list in a single API call. + // The minimum value is 1 and the maximum value is 1,000. + // The default value is 100. + // Minimum: 1 + // Maximum: 1000 + // Default: 100 + Limit *int `q:"limit,omitempty"` + // Name of the partition to start the partition list with. + // The returned partition list does not contain this partition. + StartPartitionId string `q:"start_partition_id,omitempty"` + // Type of the checkpoint. + // - LAST_READ: Only sequence numbers are recorded in databases. 
+ // Enumeration values: + // LAST_READ + CheckpointType string `q:"checkpoint_type"` +} + +func GetAppStatus(client *golangsdk.ServiceClient, opts GetAppStatusOpts) (*GetAppStatusResponse, error) { + q, err := golangsdk.BuildQueryString(opts) + if err != nil { + return nil, err + } + + // GET /v2/{project_id}/apps/{app_name}/streams/{stream_name} + raw, err := client.Get(client.ServiceURL("apps", opts.AppName, "streams", opts.StreamName)+q.String(), nil, nil) + if err != nil { + return nil, err + } + + var res GetAppStatusResponse + err = extract.Into(raw.Body, &res) + return &res, err +} + +type GetAppStatusResponse struct { + // Specifies whether there are more matching DIS streams to list. Possible values: + // true: yes + // false: no + // Default: false. + HasMore bool `json:"has_more"` + // Stream Name + StreamName string `json:"stream_name"` + // App name + AppName string `json:"app_name"` + // Partition consuming state list + PartitionConsumingStates []struct { + // Partition Id + PartitionId string `json:"partition_id"` + // Partition Sequence Number + SequenceNumber string `json:"sequence_number"` + // Partition data latest offset + LatestOffset int `json:"latest_offset"` + // Partition data earliest offset + EarliestOffset int `json:"earliest_offset"` + // Type of the checkpoint. + // LAST_READ: Only sequence numbers are recorded in databases. + // Enumeration values: + // LAST_READ + CheckpointType string `json:"checkpoint_type"` + // Partition Status: + // CREATING + // ACTIVE + // DELETED + // EXPIRED + Status string `json:"status"` + } `json:"partition_consuming_states"` +} diff --git a/openstack/dis/v2/apps/ListApp.go b/openstack/dis/v2/apps/ListApp.go new file mode 100644 index 000000000..ec8fbf0e5 --- /dev/null +++ b/openstack/dis/v2/apps/ListApp.go @@ -0,0 +1,57 @@ +package apps + +import ( + golangsdk "github.com/opentelekomcloud/gophertelekomcloud" + "github.com/opentelekomcloud/gophertelekomcloud/internal/extract" +) + +type ListAppOpts struct { + // Maximum number of apps to list in a single API call. Value range: 1-100. Default value: 10. + // Minimum: 1 + // Maximum: 1000 + // Default: 10 + Limit *int `q:"limit,omitempty"` + // Name of the app to start the list with. The returned app list does not contain this app name. + StartAppName string `q:"start_app_name,omitempty"` + // Name of the stream whose apps will be returned. + StreamName string `q:"stream_name,omitempty"` +} + +func ListApps(client *golangsdk.ServiceClient, opts ListAppOpts) (*ListAppResponse, error) { + q, err := golangsdk.BuildQueryString(opts) + if err != nil { + return nil, err + } + + // GET /v2/{project_id}/apps + raw, err := client.Get(client.ServiceURL("apps")+q.String(), nil, nil) + if err != nil { + return nil, err + } + + var res ListAppResponse + err = extract.Into(raw.Body, &res) + return &res, err +} + +type ListAppResponse struct { + // Specifies whether there are more matching consumer applications to list. + // true: yes + // false: no + HasMoreApp *bool `json:"has_more_app,omitempty"` + // AppEntry list that meets the current request. + Apps []DescribeAppResult `json:"apps,omitempty"` + // Total number of apps that meet the criteria. + TotalNumber *int `json:"total_number,omitempty"` +} + +type DescribeAppResult struct { + // Name of the app. + AppName string `json:"app_name,omitempty"` + // Unique identifier of the app. + AppId string `json:"app_id,omitempty"` + // Time when the app is created, in milliseconds. 
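+ // For example, 1609459200000 corresponds to 2021-01-01T00:00:00Z (an illustrative value, not from the API docs).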
+ CreateTime *int64 `json:"create_time,omitempty"` + // List of associated streams. + CommitCheckPointStreamNames []string `json:"commit_checkpoint_stream_names,omitempty"` +} diff --git a/openstack/dis/v2/checkpoints/CommitCheckpoint.go b/openstack/dis/v2/checkpoints/CommitCheckpoint.go new file mode 100644 index 000000000..4cc29c3f2 --- /dev/null +++ b/openstack/dis/v2/checkpoints/CommitCheckpoint.go @@ -0,0 +1,43 @@ +package checkpoints + +import ( + golangsdk "github.com/opentelekomcloud/gophertelekomcloud" + "github.com/opentelekomcloud/gophertelekomcloud/internal/build" +) + +type CommitCheckpointOpts struct { + // Name of the app, which is the unique identifier of a user data consumption program. + AppName string `json:"app_name"` + // Type of the checkpoint. + // LAST_READ: Only sequence numbers are recorded in databases. + // Enumeration values: + // LAST_READ + CheckpointType string `json:"checkpoint_type"` + // Name of the stream. + StreamName string `json:"stream_name"` + // Partition identifier of the stream. The value can be in either of the following formats: + // shardId-0000000000 + // 0 + // For example, if a stream has three partitions, the partition identifiers are 0, 1, and 2, or shardId-0000000000, shardId-0000000001, and shardId-0000000002, respectively. + PartitionId string `json:"partition_id"` + // Sequence number to be submitted, which is used to record the consumption checkpoint of the stream. + // Ensure that the sequence number is within the valid range. + SequenceNumber string `json:"sequence_number"` + // Metadata information of the consumer application. + // The metadata information can contain a maximum of 1,000 characters. + // Maximum: 1000 + Metadata string `json:"metadata,omitempty"` +} + +func CommitCheckpoint(client *golangsdk.ServiceClient, opts CommitCheckpointOpts) error { + b, err := build.RequestBody(opts, "") + if err != nil { + return err + } + + // POST /v2/{project_id}/checkpoints + _, err = client.Post(client.ServiceURL("checkpoints"), b, nil, &golangsdk.RequestOpts{ + OkCodes: []int{201}, + }) + return err +} diff --git a/openstack/dis/v2/checkpoints/DeleteCheckpoint.go b/openstack/dis/v2/checkpoints/DeleteCheckpoint.go new file mode 100644 index 000000000..2e074d79d --- /dev/null +++ b/openstack/dis/v2/checkpoints/DeleteCheckpoint.go @@ -0,0 +1,32 @@ +package checkpoints + +import golangsdk "github.com/opentelekomcloud/gophertelekomcloud" + +type DeleteCheckpointOpts struct { + // Name of the stream to which the checkpoint belongs. + StreamName string `q:"stream_name"` + // Name of the application associated with the checkpoint. + // Minimum: 1 + // Maximum: 50 + AppName string `q:"app_name"` + // Type of the checkpoint. LAST_READ: Only sequence numbers are recorded in databases. + // Enumeration values: + // LAST_READ + CheckpointType string `q:"checkpoint_type"` + // Identifier of the stream partition to which the checkpoint belongs. The value can be in either of the following formats: + // shardId-0000000000 + // 0 + // For example, if a stream has three partitions, the partition identifiers are 0, 1, and 2, and shardId-0000000000, shardId-0000000001, shardId-0000000002, respectively. 
+ PartitionId string `q:"partition_id,omitempty"` +} + +func DeleteCheckpoint(client *golangsdk.ServiceClient, opts DeleteCheckpointOpts) (err error) { + q, err := golangsdk.BuildQueryString(opts) + if err != nil { + return err + } + + // DELETE /v2/{project_id}/checkpoints + _, err = client.Delete(client.ServiceURL("checkpoints")+q.String(), nil) + return +} diff --git a/openstack/dis/v2/checkpoints/GetCheckpoint.go b/openstack/dis/v2/checkpoints/GetCheckpoint.go new file mode 100644 index 000000000..10353dc51 --- /dev/null +++ b/openstack/dis/v2/checkpoints/GetCheckpoint.go @@ -0,0 +1,49 @@ +package checkpoints + +import ( + golangsdk "github.com/opentelekomcloud/gophertelekomcloud" + "github.com/opentelekomcloud/gophertelekomcloud/internal/extract" +) + +type GetCheckpointOpts struct { + // Name of the stream to which the checkpoint belongs. + StreamName string `q:"stream_name"` + // Identifier of the stream partition to which the checkpoint belongs. + // The value can be in either of the following formats: + // - shardId-0000000000 + // - 0 + // For example, if a stream has three partitions, the partition identifiers are 0, 1, and 2, + // or shardId-0000000000, shardId-0000000001, and shardId-0000000002, respectively. + PartitionId string `q:"partition_id"` + // Name of the app associated with the checkpoint. + AppName string `q:"app_name"` + // Type of the checkpoint. + // LAST_READ: Only sequence numbers are recorded in databases. + // Enumeration values: + // LAST_READ + CheckpointType string `q:"checkpoint_type"` +} + +func GetCheckpoint(client *golangsdk.ServiceClient, opts GetCheckpointOpts) (*GetCheckpointResponse, error) { + q, err := golangsdk.BuildQueryString(opts) + if err != nil { + return nil, err + } + + // GET /v2/{project_id}/checkpoints + raw, err := client.Get(client.ServiceURL("checkpoints")+q.String(), nil, nil) + if err != nil { + return nil, err + } + + var res GetCheckpointResponse + err = extract.Into(raw.Body, &res) + return &res, err +} + +type GetCheckpointResponse struct { + // Sequence number used to record the consumption checkpoint of the stream. + SequenceNumber string `json:"sequence_number,omitempty"` + // Metadata information of the consumer application. + Metadata string `json:"metadata,omitempty"` +} diff --git a/openstack/dis/v2/data/GetCursor.go b/openstack/dis/v2/data/GetCursor.go new file mode 100644 index 000000000..6ab9661ab --- /dev/null +++ b/openstack/dis/v2/data/GetCursor.go @@ -0,0 +1,76 @@ +package data + +import ( + golangsdk "github.com/opentelekomcloud/gophertelekomcloud" + "github.com/opentelekomcloud/gophertelekomcloud/internal/extract" +) + +type GetCursorOpts struct { + // Name of the stream. + StreamName string `q:"stream-name"` + // Partition ID of the stream. The value can be in either of the following formats: + // - shardId-0000000000 + // - 0 + // For example, if a stream has three partitions, the partition identifiers are 0, 1, and 2, + // or shardId-0000000000, shardId-0000000001, and shardId-0000000002, respectively. + PartitionId string `q:"partition-id"` + // Cursor type. + // AT_SEQUENCE_NUMBER: Data is read from the position denoted by a specific sequence number + // (that is defined by starting-sequence-number). This is the default cursor type. + // AFTER_SEQUENCE_NUMBER: Data is read right after the position denoted by a specific sequence number + // (that is defined by starting-sequence-number). + // TRIM_HORIZON: Data is read from the earliest data record in the partition. 
+ // For example, a tenant uses a DIS stream to upload three pieces of data A1, A2, and A3. N days later, + // A1 has expired and A2 and A3 are still in the validity period. + // In this case, if the tenant uses TRIM_HORIZON to download the data, the system downloads data from A2. + // LATEST: Data is read from the latest record in the partition. + // This setting ensures that you always read the latest record in the partition. + // AT_TIMESTAMP: Data is read from the position denoted by a specific timestamp. + // Enumeration values: + // AT_SEQUENCE_NUMBER + // AFTER_SEQUENCE_NUMBER + // TRIM_HORIZON + // LATEST + // AT_TIMESTAMP + CursorType string `q:"cursor-type,omitempty"` + // Serial number. A sequence number is a unique identifier for each record. + // DIS automatically allocates a sequence number when the data producer calls the PutRecords operation to add data to the DIS stream. + // SN of the same partition key usually changes with time. + // A longer interval between PutRecords requests results in a larger sequence number. + // The sequence number is closely related to cursor types AT_SEQUENCE_NUMBER and AFTER_SEQUENCE_NUMBER. + // The two parameters determine the position of the data to be read. + // Value range: 0 to 9,223,372,036,854,775,807 + StartingSequenceNumber string `q:"starting-sequence-number,omitempty"` + // Timestamp when the data record starts to be read, which is closely related to cursor type AT_TIMESTAMP. + // The two parameters determine the position of the data to be read. + // Note: + // The timestamp is accurate to milliseconds. + Timestamp *int64 `q:"timestamp,omitempty"` + // Unique ID of the stream. This parameter is mandatory for obtaining the iterator of an authorized stream. + StreamId string `q:"stream-id,omitempty"` +} + +func GetCursor(client *golangsdk.ServiceClient, opts GetCursorOpts) (*GetCursorResponse, error) { + q, err := golangsdk.BuildQueryString(opts) + if err != nil { + return nil, err + } + + // GET /v2/{project_id}/cursors + raw, err := client.Get(client.ServiceURL("cursors")+q.String(), nil, nil) + if err != nil { + return nil, err + } + + var res GetCursorResponse + err = extract.Into(raw.Body, &res) + return &res, err +} + +type GetCursorResponse struct { + // Data cursor. Value range: a string of 1 to 512 characters + // Note: + // The validity period of a data cursor is 5 minutes. + // Minimum: 1 + // Maximum: 512 + PartitionCursor string `json:"partition_cursor,omitempty"` +} diff --git a/openstack/dis/v2/data/GetRecords.go b/openstack/dis/v2/data/GetRecords.go new file mode 100644 index 000000000..f9a2ef786 --- /dev/null +++ b/openstack/dis/v2/data/GetRecords.go @@ -0,0 +1,65 @@ +package data + +import ( + golangsdk "github.com/opentelekomcloud/gophertelekomcloud" + "github.com/opentelekomcloud/gophertelekomcloud/internal/extract" +) + +type GetRecordsOpts struct { + // Data cursor, which needs to be obtained through the API for obtaining data cursors. + // Value range: a string of 1 to 512 characters + // Note: + // The validity period of a data cursor is 5 minutes. + PartitionCursor string `q:"partition-cursor"` + // Maximum number of bytes that can be obtained for each request. + // Note: + // If the value is less than the size of a single record in the partition, the record cannot be obtained. 
+ MaxFetchBytes *int `q:"max_fetch_bytes,omitempty"` +} + +func GetRecords(client *golangsdk.ServiceClient, opts GetRecordsOpts) (*GetRecordsResponse, error) { + q, err := golangsdk.BuildQueryString(opts) + if err != nil { + return nil, err + } + + // GET /v2/{project_id}/records + raw, err := client.Get(client.ServiceURL("records")+q.String(), nil, &golangsdk.RequestOpts{ + MoreHeaders: map[string]string{"Content-Type": "application/json"}, + }) + if err != nil { + return nil, err + } + + var res GetRecordsResponse + err = extract.Into(raw.Body, &res) + return &res, err +} + +type GetRecordsResponse struct { + Records []Record `json:"records,omitempty"` + // Next iterator. + // Note: + // The validity period of a data cursor is 5 minutes. + NextPartitionCursor string `json:"next_partition_cursor,omitempty"` +} + +type Record struct { + // Partition key set when data is being uploaded. + // Note: + // If the partition_key parameter is passed when data is uploaded, this parameter will be returned when data is downloaded. + // If partition_id instead of partition_key is passed when data is uploaded, no partition_key is returned. + PartitionKey string `json:"partition_key,omitempty"` + // Sequence number of the data record. + SequenceNumber string `json:"sequence_number,omitempty"` + // Downloaded data. + // The downloaded data is the serialized binary data (Base64-encoded character string). + // For example, the data returned by the data download API is "ZGF0YQ==", which is "data" after Base64 decoding. + Data string `json:"data,omitempty"` + // Timestamp when the record is written to DIS. + Timestamp *int64 `json:"timestamp,omitempty"` + // Timestamp data type. + // CreateTime: creation time. + // Default: CreateTime + TimestampType string `json:"timestamp_type,omitempty"` +} diff --git a/openstack/dis/v2/data/PutRecords.go b/openstack/dis/v2/data/PutRecords.go new file mode 100644 index 000000000..034e06e46 --- /dev/null +++ b/openstack/dis/v2/data/PutRecords.go @@ -0,0 +1,77 @@ +package data + +import ( + golangsdk "github.com/opentelekomcloud/gophertelekomcloud" + "github.com/opentelekomcloud/gophertelekomcloud/internal/build" + "github.com/opentelekomcloud/gophertelekomcloud/internal/extract" +) + +type PutRecordsOpts struct { + // Name of the stream. + // Maximum: 64 + StreamName string `json:"stream_name" required:"true"` + // Unique ID of the stream. + // If no stream is found by stream_name and stream_id is not empty, stream_id is used to search for the stream. + // Note: + // This parameter is mandatory when data is uploaded to the authorized stream. + StreamId string `json:"stream_id,omitempty"` + // List of records to be uploaded. + Records []PutRecordsRequestEntry `json:"records" required:"true"` +} + +func PutRecords(client *golangsdk.ServiceClient, opts PutRecordsOpts) (*PutRecordsResponse, error) { + b, err := build.RequestBody(opts, "") + if err != nil { + return nil, err + } + + // POST /v2/{project_id}/records + raw, err := client.Post(client.ServiceURL("records"), b, nil, &golangsdk.RequestOpts{ + OkCodes: []int{200}, + }) + if err != nil { + return nil, err + } + + var res PutRecordsResponse + err = extract.Into(raw.Body, &res) + return &res, err +} + +type PutRecordsRequestEntry struct { + // Data to be uploaded. The uploaded data is the serialized binary data (character string encoded using Base64). + // For example, if the character string data needs to be uploaded, the character string after Base64 encoding is ZGF0YQ==. 
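+ // A minimal sketch using the Go standard library: base64.StdEncoding.EncodeToString([]byte("data")) yields "ZGF0YQ==".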
+ Data string `json:"data"` + // Hash value of the data to be written to the partition. The hash value overwrites the hash value of partition_key. Value range: 0–long.max + ExplicitHashKey string `json:"explicit_hash_key,omitempty"` + // Partition ID of the stream. The value can be in either of the following formats: + // shardId-0000000000 + // 0 + // For example, if a stream has three partitions, the partition identifiers are 0, 1, and 2, or shardId-0000000000, shardId-0000000001, and shardId-0000000002, respectively. + PartitionId string `json:"partition_id,omitempty"` + // Partition to which data is written. Note: + // If the partition_id parameter is transferred, the partition_id parameter is used preferentially. + // If partition_id is not transferred, partition_key is used. + PartitionKey string `json:"partition_key,omitempty"` +} + +type PutRecordsResponse struct { + // Number of data records that fail to be uploaded. + FailedRecordCount *int `json:"failed_record_count,omitempty"` + + Records []PutRecordsResultEntry `json:"records,omitempty"` +} + +type PutRecordsResultEntry struct { + // ID of the partition to which data is uploaded. + PartitionId string `json:"partition_id,omitempty"` + // Sequence number of the data to be uploaded. + // A sequence number is a unique identifier for each record. + // DIS automatically allocates a sequence number when the data producer calls the PutRecords operation to add data to the DIS stream. + // Sequence number of the same partition key usually changes with time. A longer interval between PutRecords requests results in a larger sequence number. + SequenceNumber string `json:"sequence_number,omitempty"` + // Error code. + ErrorCode string `json:"error_code,omitempty"` + // Error message. + ErrorMessage string `json:"error_message,omitempty"` +} diff --git a/openstack/dis/v2/dump/CreateCloudTableDumpTask.go b/openstack/dis/v2/dump/CreateCloudTableDumpTask.go new file mode 100644 index 000000000..6d9d4c637 --- /dev/null +++ b/openstack/dis/v2/dump/CreateCloudTableDumpTask.go @@ -0,0 +1,256 @@ +package dump + +import ( + golangsdk "github.com/opentelekomcloud/gophertelekomcloud" + "github.com/opentelekomcloud/gophertelekomcloud/internal/build" +) + +type CreateCloudTableDumpTaskOpts struct { + // Name of the stream. + // Maximum: 60 + StreamName string + // Dump destination. + // Possible values: + // - OBS: Data is dumped to OBS. + // - MRS: Data is dumped to MRS. + // - DLI: Data is dumped to DLI. + // - CLOUDTABLE: Data is dumped to CloudTable. + // - DWS: Data is dumped to DWS. + // Default: NOWHERE + // Enumeration values: + // CLOUDTABLE + DestinationType string `json:"destination_type"` + // Parameter list of the CloudTable to which data in the DIS stream will be dumped. + CloudTableDestinationDescriptor CloudTableDestinationDescriptorOpts `json:"cloudtable_destination_descriptor,omitempty"` +} + +func CreateCloudTableDumpTask(client *golangsdk.ServiceClient, opts CreateCloudTableDumpTaskOpts) error { + b, err := build.RequestBody(opts, "") + if err != nil { + return err + } + + // POST /v2/{project_id}/streams/{stream_name}/transfer-tasks + _, err = client.Post(client.ServiceURL("streams", opts.StreamName, "transfer-tasks"), b, nil, &golangsdk.RequestOpts{ + OkCodes: []int{200}, + }) + return err +} + +type CloudTableDestinationDescriptorOpts struct { + // Name of the dump task. + // The task name consists of letters, digits, hyphens (-), and underscores (_). + // It must be a string of 1 to 64 characters. 
+ TaskName string `json:"task_name"` + // Name of the agency created on IAM. + // DIS uses an agency to access your specified resources. + // The parameters for creating an agency are as follows: + // - Agency Type: Cloud service + // - Cloud Service: DIS + // - Validity Period: unlimited + // - Scope: Global service, + // Project: OBS. Select the Tenant Administrator role for the global service project. + // If agencies have been created, you can obtain available agencies from the agency list by using the "Listing Agencies" API. + // This parameter cannot be left blank and the parameter value cannot exceed 64 characters. + // If there are dump tasks on the console, the system displays a message indicating that an agency will be automatically created. + // The name of the automatically created agency is dis_admin_agency. + // Maximum: 64 + AgencyName string `json:"agency_name"` + // User-defined interval at which data is imported from the current DIS stream into OBS. + // If no data is pushed to the DIS stream during the current interval, no dump file package will be generated. + // Value range: 30-900 + // Default value: 300 + // Unit: second + // Minimum: 30 + // Maximum: 900 + // Default: 300 + DeliverTimeInterval string `json:"deliver_time_interval"` + // Offset. + // LATEST: Maximum offset indicating that the latest data will be extracted. + // TRIM_HORIZON: Minimum offset indicating that the earliest data will be extracted. + // Default value: LATEST + // Default: LATEST + // Enumeration values: + // LATEST + // TRIM_HORIZON + ConsumerStrategy string `json:"consumer_strategy,omitempty"` + // Name of the CloudTable cluster to which data will be dumped. + // If you choose to dump data to OpenTSDB, OpenTSDB must be enabled for the cluster. + CloudTableClusterName string `json:"cloudtable_cluster_name"` + // ID of the CloudTable cluster to which data will be dumped. + // If you choose to dump data to OpenTSDB, OpenTSDB must be enabled for the cluster. + CloudTableClusterID string `json:"cloudtable_cluster_id"` + // HBase table name of the CloudTable cluster to which data will be dumped. + // The parameter is mandatory when data is dumped to the CloudTable HBase. + CloudTableTableName string `json:"cloudtable_table_name,omitempty"` + // Schema configuration of the CloudTable HBase data. + // You can set either this parameter or opentsdb_schema, but this parameter is mandatory when data will be dumped to HBase. + // After this parameter is set, the JSON data in the stream can be converted to another format and then be imported to the CloudTable HBase. + CloudtableSchema CloudtableSchema `json:"cloudtable_schema,omitempty"` + // Schema configuration of the CloudTable OpenTSDB data. + // You can set either this parameter or cloudtable_schema, but this parameter is mandatory when data will be dumped to OpenTSDB. + // After this parameter is set, the JSON data in the stream can be converted to another format and then be imported to the CloudTable OpenTSDB. + OpenTSDBSchema OpenTSDBSchema `json:"opentsdb_schema,omitempty"` + // Delimiter used to separate the user data that generates HBase row keys. + // Value range: , . | ; \ - _ and ~ + // Default value: . + CloudtableRowKeyDelimiter string `json:"cloudtable_row_key_delimiter,omitempty"` + // Name of the OBS bucket used to back up data that failed to be dumped to CloudTable. 
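+ // For example, "dis-cloudtable-backup" (a hypothetical bucket name, not from the API docs).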
+ OBSBackupBucketPath string `json:"obs_backup_bucket_path,omitempty"` + // Self-defined directory created in the OBS bucket and used to back up data that failed to be dumped to CloudTable. + // Directory levels are separated by slashes (/) and cannot start with slashes. + // Value range: a string of letters, digits, and underscores (_) + // The maximum length is 50 characters. + // This parameter is left empty by default. + BackupFilePrefix string `json:"backup_file_prefix,omitempty"` + // Time duration for DIS to retry if data fails to be dumped to CloudTable. + // If this threshold is exceeded, the data that fails to be dumped will be backed up to the OBS bucket/ backup_file_prefix / cloudtable_error or OBS bucket/ backup_file_prefix / opentsdb_error directory. + // Value range: 0-7,200 + // Unit: second + // Default value: 1,800 + RetryDuration string `json:"retry_duration,omitempty"` +} + +type CloudtableSchema struct { + // HBase rowkey schema used by the CloudTable cluster to convert JSON data into HBase rowkeys. + // Value range: 1-64 + RowKey []RowKey `json:"row_key"` + // HBase column schema used by the CloudTable cluster to convert JSON data into HBase columns. + // Value range: 1 to 4,096 + Columns []Column `json:"columns"` +} + +type RowKey struct { + // JSON attribute name, which is used to generate HBase rowkeys for JSON data in the DIS stream. + Value string `json:"value"` + // JSON attribute type of JSON data in the DIS stream. + // Value range: + // - Bigint + // - Double + // - Boolean + // - Timestamp + // - String + // - Decimal + // Enumeration values: + // Bigint + // Double + // Boolean + // Timestamp + // String + // Decimal + Type string `json:"type"` +} + +type Column struct { + // Name of the HBase column family to which data will be dumped. + ColumnFamilyName string `json:"column_family_name"` + // Name of the HBase column to which data will be dumped. + // Value range: a string of 1 to 32 characters, consisting of only letters, digits, and underscores (_) + ColumnName string `json:"column_name"` + // JSON attribute name, which is used to generate HBase column values for JSON data in the DIS stream. + Value string `json:"value"` + // JSON attribute type of JSON data in the DIS stream. + // Value range: + // - Bigint + // - Double + // - Boolean + // - Timestamp + // - String + // - Decimal + // Enumeration values: + // Bigint + // Double + // Boolean + // Timestamp + // String + // Decimal + Type string `json:"type"` +} + +type OpenTSDBSchema struct { + // Schema configuration of the OpenTSDB data metric in the CloudTable cluster. + // After this parameter is set, the JSON data in the stream can be converted to the metric of the OpenTSDB data. + Metric []OpenTSDBMetric `json:"metric"` + // Schema configuration of the OpenTSDB data timestamp in the CloudTable cluster. + // After this parameter is set, the JSON data in the stream can be converted to the timestamp of the OpenTSDB data. + Timestamp OpenTSDBTimestamp `json:"timestamp"` + // Schema configuration of the OpenTSDB data value in the CloudTable cluster. + // After this parameter is set, the JSON data in the stream can be converted to the value of the OpenTSDB data. + Value OpenTSDBValue `json:"value"` + // Schema configuration of the OpenTSDB data tags in the CloudTable cluster. + // After this parameter is set, the JSON data in the stream can be converted to the tags of the OpenTSDB data. 
+ Tags []OpenTSDBTags `json:"tags"` +} + +type OpenTSDBMetric struct { + // When type is set to Constant, the value of metric is the value of Value. + // When type is set to String, the value of metric is the value of the JSON attribute of the user data in the stream. + // Enumeration values: + // Constant + // String + Type string `json:"type"` + // Constant value or JSON attribute name of the user data in the stream. + // This value is 1 to 32 characters long. + // Only letters, digits, and periods (.) are allowed. + Value string `json:"value"` +} + +type OpenTSDBTimestamp struct { + // When type is set to Timestamp, the value type of the JSON attribute of the user data in the stream is Timestamp, and the timestamp of OpenTSDB can be generated without converting the data format. + // When type is set to String, the value type of the JSON attribute of the user data in the stream is Date, and the timestamp of OpenTSDB can be generated only after the data format is converted. + Type string `json:"type"` + // JSON attribute name of the user data in the stream. + // Value range: a string of 1 to 32 characters, consisting of only letters, digits, and underscores (_) + Value string `json:"value"` + // This parameter is mandatory when type is set to String. + // When the value type of the JSON attribute of the user data in the stream is Date, format is required to convert the data format to generate the timestamp of OpenTSDB. + // Value range: + // - yyyy/MM/dd HH:mm:ss + // - MM/dd/yyyy HH:mm:ss + // - dd/MM/yyyy HH:mm:ss + // - yyyy-MM-dd HH:mm:ss + // - MM-dd-yyyy HH:mm:ss + // - dd-MM-yyyy HH:mm:ss + // Enumeration values: + // yyyy/MM/dd HH:mm:ss + // MM/dd/yyyy HH:mm:ss + // dd/MM/yyyy HH:mm:ss + // yyyy-MM-dd HH:mm:ss + // MM-dd-yyyy HH:mm:ss + // dd-MM-yyyy HH:mm:ss + Format string `json:"format"` +} + +type OpenTSDBValue struct { + // JSON attribute type of the user data in the stream. + // Value range: + // - Bigint + // - Double + // - Boolean + // - Timestamp + // - String + // - Decimal + Type string `json:"type"` + // Constant value or JSON attribute name of the user data in the stream. + // Value range: a string of 1 to 32 characters, consisting of only letters, digits, and underscores (_) + Value string `json:"value"` +} + +type OpenTSDBTags struct { + // Tag name of the OpenTSDB data that stores the data in the stream. + // Value range: a string of 1 to 32 characters, consisting of only letters, digits, and underscores (_) + Name string `json:"name"` + // Type name of the JSON attribute of the user data in the stream. + // Value range: + // - Bigint + // - Double + // - Boolean + // - Timestamp + // - String + // - Decimal + Type string `json:"type"` + // Constant value or JSON attribute name of the user data in the stream. + // Value range: a string of 1 to 32 characters, consisting of only letters, digits, and underscores (_) + Value string `json:"value"` +} diff --git a/openstack/dis/v2/dump/CreateDLIDumpTask.go b/openstack/dis/v2/dump/CreateDLIDumpTask.go new file mode 100644 index 000000000..d24f5bae8 --- /dev/null +++ b/openstack/dis/v2/dump/CreateDLIDumpTask.go @@ -0,0 +1,97 @@ +package dump + +import ( + golangsdk "github.com/opentelekomcloud/gophertelekomcloud" + "github.com/opentelekomcloud/gophertelekomcloud/internal/build" +) + +type CreateDLIDumpTaskOpts struct { + // Name of the stream. + // Maximum: 60 + StreamName string + // Dump destination. + // Possible values: + // - OBS: Data is dumped to OBS. + // - MRS: Data is dumped to MRS. + // - DLI: Data is dumped to DLI. 
+ // - CLOUDTABLE: Data is dumped to CloudTable. + // - DWS: Data is dumped to DWS. + // Default: NOWHERE + // Enumeration values: + // DLI + DestinationType string `json:"destination_type"` + // Parameter list of the DLI to which data in the DIS stream will be dumped. + DLIDestinationDescriptor DLIDestinationDescriptorOpts `json:"dli_destination_descriptor,omitempty"` +} + +func CreateDLIDumpTask(client *golangsdk.ServiceClient, opts CreateDLIDumpTaskOpts) error { + b, err := build.RequestBody(opts, "") + if err != nil { + return err + } + + // POST /v2/{project_id}/streams/{stream_name}/transfer-tasks + _, err = client.Post(client.ServiceURL("streams", opts.StreamName, "transfer-tasks"), b, nil, &golangsdk.RequestOpts{ + OkCodes: []int{200}, + }) + return err +} + +type DLIDestinationDescriptorOpts struct { + // Name of the dump task. + // The task name consists of letters, digits, hyphens (-), and underscores (_). + // It must be a string of 1 to 64 characters. + TaskName string `json:"task_name"` + // Name of the agency created on IAM. + // DIS uses an agency to access your specified resources. + // The parameters for creating an agency are as follows: + // - Agency Type: Cloud service + // - Cloud Service: DIS + // - Validity Period: unlimited + // - Scope: Global service, + // Project: OBS. Select the Tenant Administrator role for the global service project. + // If agencies have been created, you can obtain available agencies from the agency list by using the "Listing Agencies" API. + // This parameter cannot be left blank and the parameter value cannot exceed 64 characters. + // If there are dump tasks on the console, the system displays a message indicating that an agency will be automatically created. + // The name of the automatically created agency is dis_admin_agency. + // Maximum: 64 + AgencyName string `json:"agency_name"` + // User-defined interval at which data is imported from the current DIS stream into OBS. + // If no data is pushed to the DIS stream during the current interval, no dump file package will be generated. + // Value range: 30-900 + // Default value: 300 + // Unit: second + // Minimum: 30 + // Maximum: 900 + // Default: 300 + DeliverTimeInterval string `json:"deliver_time_interval"` + // Offset. + // LATEST: Maximum offset indicating that the latest data will be extracted. + // TRIM_HORIZON: Minimum offset indicating that the earliest data will be extracted. + // Default value: LATEST + // Default: LATEST + // Enumeration values: + // LATEST + // TRIM_HORIZON + ConsumerStrategy string `json:"consumer_strategy,omitempty"` + // Name of the DLI database to which data in the DIS stream will be dumped. + DLIDatabaseName string `json:"dli_database_name"` + // Name of the DLI table to which data in the DIS stream will be dumped. + // Note: + // Only tables whose data location is DLI are supported, and you must have the permission to insert data into the tables. + DLITableName string `json:"dli_table_name"` + // Name of the OBS bucket used to temporarily store data in the DIS stream. + OBSBucketPath string `json:"obs_bucket_path"` + // Self-defined directory created in the OBS bucket and used to temporarily store data in the DIS stream. + // Directory levels are separated by slashes (/) and cannot start with slashes. + // The value can contain a maximum of 50 characters, including letters, digits, underscores (_), and slashes (/). + // This parameter is left empty by default. 
+ FilePrefix string `json:"file_prefix,omitempty"` + // Time duration for DIS to retry if data fails to be dumped to DLI. + // If the retry time exceeds the value of this parameter, the data that fails to be dumped is backed up to the OBS bucket_path/file_prefix/mrs_error directory + // Value range: 0-7,200 + // Unit: second + // Default value: 1,800 + // If this parameter is set to 0, DIS does not retry when the dump fails. + RetryDuration string `json:"retry_duration,omitempty"` +} diff --git a/openstack/dis/v2/dump/CreateDWSDumpTask.go b/openstack/dis/v2/dump/CreateDWSDumpTask.go new file mode 100644 index 000000000..4ec6a0a0a --- /dev/null +++ b/openstack/dis/v2/dump/CreateDWSDumpTask.go @@ -0,0 +1,161 @@ +package dump + +import ( + golangsdk "github.com/opentelekomcloud/gophertelekomcloud" + "github.com/opentelekomcloud/gophertelekomcloud/internal/build" +) + +type CreateDWSDumpTaskOpts struct { + // Name of the stream. + // Maximum: 60 + StreamName string + // Dump destination. + // Possible values: + // - OBS: Data is dumped to OBS. + // - MRS: Data is dumped to MRS. + // - DLI: Data is dumped to DLI. + // - CLOUDTABLE: Data is dumped to CloudTable. + // - DWS: Data is dumped to DWS. + // Default: NOWHERE + // Enumeration values: + // DWS + DestinationType string `json:"destination_type"` + // Parameter list of the DWS to which data in the DIS stream will be dumped. + DWSDestinationDescriptor DWSDestinationDescriptorOpts `json:"dws_destination_descriptor,omitempty"` +} + +func CreateDWSDumpTask(client *golangsdk.ServiceClient, opts CreateDWSDumpTaskOpts) error { + b, err := build.RequestBody(opts, "") + if err != nil { + return err + } + + // POST /v2/{project_id}/streams/{stream_name}/transfer-tasks + _, err = client.Post(client.ServiceURL("streams", opts.StreamName, "transfer-tasks"), b, nil, &golangsdk.RequestOpts{ + OkCodes: []int{200}, + }) + return err +} + +type DWSDestinationDescriptorOpts struct { + // Name of the dump task. + // The task name consists of letters, digits, hyphens (-), and underscores (_). + // It must be a string of 1 to 64 characters. + TaskName string `json:"task_name"` + // Name of the agency created on IAM. + // DIS uses an agency to access your specified resources. + // The parameters for creating an agency are as follows: + // - Agency Type: Cloud service + // - Cloud Service: DIS + // - Validity Period: unlimited + // - Scope: Global service, + // Project: OBS. Select the Tenant Administrator role for the global service project. + // If agencies have been created, you can obtain available agencies from the agency list by using the "Listing Agencies " API. + // This parameter cannot be left blank and the parameter value cannot exceed 64 characters. + // If there are dump tasks on the console, the system displays a message indicating that an agency will be automatically created. + // The name of the automatically created agency is dis_admin_agency. + // Maximum: 64 + AgencyName string `json:"agency_name"` + // User-defined interval at which data is imported from the current DIS stream into OBS. + // If no data is pushed to the DIS stream during the current interval, no dump file package will be generated. + // Value range: 30-900 + // Default value: 300 + // Unit: second + // Minimum: 30 + // Maximum: 900 + // Default: 300 + DeliverTimeInterval string `json:"deliver_time_interval"` + // Offset. + // LATEST: Maximum offset indicating that the latest data will be extracted. 
+ // TRIM_HORIZON: Minimum offset indicating that the earliest data will be extracted. + // Default value: LATEST + // Default: LATEST + // Enumeration values: + // LATEST + // TRIM_HORIZON + ConsumerStrategy string `json:"consumer_strategy,omitempty"` + // Name of the DWS cluster that stores the data in the stream. + DWSClusterName string `json:"dws_cluster_name"` + // ID of the DWS cluster to which data will be dumped. + DWSClusterID string `json:"dws_cluster_id"` + // Name of the DWS database that stores the data in the stream. + DWSDatabaseName string `json:"dws_database_name"` + // Schema of the DWS database to which data will be dumped. + DWSSchema string `json:"dws_schema"` + // Name of the DWS table that stores the data in the stream. + DWSTableName string `json:"dws_table_name"` + // Delimiter used to separate the columns in the DWS tables. + // The value can be a comma (,), semicolon (;), or vertical bar (|). + DWSDelimiter string `json:"dws_delimiter"` + // Username of the DWS database to which data will be dumped. + UserName string `json:"user_name"` + // Password of the DWS database to which data will be dumped. + UserPassword string `json:"user_password"` + // Key created in Key Management Service (KMS) and used to encrypt the password of the DWS database. + KMSUserKeyName string `json:"kms_user_key_name"` + // ID of the key created in KMS and used to encrypt the password of the DWS database. + KMSUserKeyID string `json:"kms_user_key_id"` + // Name of the OBS bucket used to temporarily store data in the DIS stream. + OBSBucketPath string `json:"obs_bucket_path"` + // User-defined directory created in the OBS bucket and used to temporarily store data in the DIS stream. + // Directory levels are separated by slashes (/) and cannot start with slashes. + // The value can contain a maximum of 50 characters, including letters, digits, underscores (_), and slashes (/). + // This parameter is left empty by default. + FilePrefix string `json:"file_prefix,omitempty"` + // Duration for which DIS keeps retrying to dump data to DWS after a dump failure. + // If the dump time exceeds the value of this parameter, the data that fails to be dumped to DWS will be backed up to the OBS bucket_path/file_prefix/dws_error directory. + // Value range: 0-7,200 + // Unit: second + // Default value: 1,800 + RetryDuration string `json:"retry_duration,omitempty"` + // Column to be dumped to the DWS table. + // If the value is null or empty, all columns are dumped by default. + // For example, c1,c2 indicates that columns c1 and c2 in the schema are dumped to DWS. + // This parameter is left blank by default. + DWSTableColumns string `json:"dws_table_columns,omitempty"` + // DWS fault tolerance option (used to specify various parameters of foreign table data). + Options Options `json:"options,omitempty"` +} + +type Options struct { + // Specifies whether to set the field to Null or enable an error message to be displayed in the error table when the last field in a row of the data source file is missing during database import. + // Value range: + // - true/on + // - false/off + // Default value: false/off + // Enumeration values: + // true/on + // false/off + FillMissingFields string `json:"fill_missing_fields,omitempty"` + // Specifies whether to ignore excessive columns when the number of columns in a source data file exceeds that defined in the foreign table. + // This parameter is used only during data import. 
+ // Value range: + // - true/on + // - false/off + // Default value: false/off + // Enumeration values: + // true/on + // false/off + IgnoreExtraData string `json:"ignore_extra_data,omitempty"` + // Specifies whether to tolerate invalid characters during data import, that is, whether to convert invalid characters based on the conversion rule and import them to the database, or to report an error and stop the import. + // Value range: + // - true/on + // - false/off + // Default value: false/off + // Enumeration values: + // true/on + // false/off + CompatibleIllegalChars string `json:"compatible_illegal_chars,omitempty"` + // Maximum number of data format errors allowed during the data import. + // If the number of data format errors does not reach the maximum, the data import is successful. + // Value range: + // - integer + // - unlimited + // Default value: 0, + // indicating that error information is returned immediately + RejectLimit string `json:"reject_limit,omitempty"` + // Name of the error table that records data format errors. + // After the parallel import is complete, you can query the error information table to obtain the detailed error information. + ErrorTableName string `json:"error_table_name,omitempty"` +} diff --git a/openstack/dis/v2/dump/CreateMRSDumpTask.go b/openstack/dis/v2/dump/CreateMRSDumpTask.go new file mode 100644 index 000000000..809ab63ed --- /dev/null +++ b/openstack/dis/v2/dump/CreateMRSDumpTask.go @@ -0,0 +1,104 @@ +package dump + +import ( + golangsdk "github.com/opentelekomcloud/gophertelekomcloud" + "github.com/opentelekomcloud/gophertelekomcloud/internal/build" +) + +type CreateMRSDumpTaskOpts struct { + // Name of the stream. + // Maximum: 60 + StreamName string + // Dump destination. + // Possible values: + // - OBS: Data is dumped to OBS. + // - MRS: Data is dumped to MRS. + // - DLI: Data is dumped to DLI. + // - CLOUDTABLE: Data is dumped to CloudTable. + // - DWS: Data is dumped to DWS. + // Default: NOWHERE + // Enumeration values: + // MRS + DestinationType string `json:"destination_type"` + // Parameter list of the MRS to which data in the DIS stream will be dumped. + MRSDestinationDescriptor MRSDestinationDescriptorOpts `json:"mrs_destination_descriptor,omitempty"` +} + +func CreateMRSDumpTask(client *golangsdk.ServiceClient, opts CreateMRSDumpTaskOpts) error { + b, err := build.RequestBody(opts, "") + if err != nil { + return err + } + + // POST /v2/{project_id}/streams/{stream_name}/transfer-tasks + _, err = client.Post(client.ServiceURL("streams", opts.StreamName, "transfer-tasks"), b, nil, &golangsdk.RequestOpts{ + OkCodes: []int{200}, + }) + return err +} + +type MRSDestinationDescriptorOpts struct { + // Name of the dump task. + // The task name consists of letters, digits, hyphens (-), and underscores (_). + // It must be a string of 1 to 64 characters. + TaskName string `json:"task_name"` + // Name of the agency created on IAM. + // DIS uses an agency to access your specified resources. + // The parameters for creating an agency are as follows: + // - Agency Type: Cloud service + // - Cloud Service: DIS + // - Validity Period: unlimited + // - Scope: Global service, + // Project: OBS. Select the Tenant Administrator role for the global service project. + // If agencies have been created, you can obtain available agencies from the agency list by using the "Listing Agencies" API. + // This parameter cannot be left blank and the parameter value cannot exceed 64 characters.
+ // If there are dump tasks on the console, the system displays a message indicating that an agency will be automatically created. + // The name of the automatically created agency is dis_admin_agency. + // Maximum: 64 + AgencyName string `json:"agency_name"` + // User-defined interval at which data is imported from the current DIS stream into OBS. + // If no data is pushed to the DIS stream during the current interval, no dump file package will be generated. + // Value range: 30-900 + // Default value: 300 + // Unit: second + // Minimum: 30 + // Maximum: 900 + // Default: 300 + DeliverTimeInterval string `json:"deliver_time_interval"` + // Offset. + // LATEST: Maximum offset indicating that the latest data will be extracted. + // TRIM_HORIZON: Minimum offset indicating that the earliest data will be extracted. + // Default value: LATEST + // Default: LATEST + // Enumeration values: + // LATEST + // TRIM_HORIZON + ConsumerStrategy string `json:"consumer_strategy,omitempty"` + // Name of the MRS cluster that stores the data in the stream. + // Note: + // Only MRS clusters with non-Kerberos authentication are supported. + MRSClusterName string `json:"mrs_cluster_name"` + // ID of the MRS cluster to which data in the DIS stream will be dumped. + MRSClusterID string `json:"mrs_cluster_id"` + // Hadoop Distributed File System (HDFS) path of the MRS cluster to which data in the DIS stream will be dumped. + MRSHdfsPatch string `json:"mrs_hdfs_patch"` + // User-defined directory created in the OBS bucket and used to temporarily store data in the DIS stream. + // Directory levels are separated by slashes (/) and cannot start with slashes. + // The value can contain a maximum of 50 characters, including letters, digits, underscores (_), and slashes (/). + // This parameter is left empty by default. + FilePrefix string `json:"file_prefix,omitempty"` + // Directory to store files that will be dumped to the chosen MRS cluster. + // Different directory levels are separated by slash (/). + // Value range: + // a string of 0 to 50 characters. This parameter is left empty by default. + HDFSPrefixFolder string `json:"hdfs_prefix_folder,omitempty"` + // Name of the OBS bucket used to temporarily store data in the DIS stream. + OBSBucketPath string `json:"obs_bucket_path"` + // Time duration for DIS to retry if data fails to be dumped. + // If the retry time exceeds the value of this parameter, the data that fails to be dumped is backed up to the OBS bucket_path/file_prefix/mrs_error directory. + // Value range: 0-7,200 + // Unit: second + // Default value: 1,800 + // If this parameter is set to 0, DIS does not retry when the dump fails. + RetryDuration string `json:"retry_duration,omitempty"` +} diff --git a/openstack/dis/v2/dump/CreateOBSDumpTask.go b/openstack/dis/v2/dump/CreateOBSDumpTask.go new file mode 100644 index 000000000..0c20ccc3c --- /dev/null +++ b/openstack/dis/v2/dump/CreateOBSDumpTask.go @@ -0,0 +1,151 @@ +package dump + +import ( + golangsdk "github.com/opentelekomcloud/gophertelekomcloud" + "github.com/opentelekomcloud/gophertelekomcloud/internal/build" +) + +type CreateOBSDumpTaskOpts struct { + // Name of the stream. + // Maximum: 60 + StreamName string + // Dump destination. + // Possible values: + // - OBS: Data is dumped to OBS. + // Default: NOWHERE + // Enumeration values: + // OBS + DestinationType string `json:"destination_type"` + // Parameter list of OBS to which data in the DIS stream will be dumped.
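A hedged sketch of wiring the OBS dump options together (stream, task, and bucket names are invented placeholders):

```go
package example

import (
	golangsdk "github.com/opentelekomcloud/gophertelekomcloud"
	"github.com/opentelekomcloud/gophertelekomcloud/openstack/common/pointerto"
	"github.com/opentelekomcloud/gophertelekomcloud/openstack/dis/v2/dump"
)

// createOBSDump assumes an initialized DIS v2 service client.
func createOBSDump(client *golangsdk.ServiceClient) error {
	return dump.CreateOBSDumpTask(client, dump.CreateOBSDumpTaskOpts{
		StreamName:      "demo-stream", // invented
		DestinationType: "OBS",
		OBSDestinationDescriptor: dump.OBSDestinationDescriptorOpts{
			TaskName:            "demo-obs-task", // invented
			AgencyName:          "dis_admin_agency",
			DeliverTimeInterval: pointerto.Int(300), // note: *int here, unlike the DWS/MRS variants
			ConsumerStrategy:    "LATEST",
			OBSBucketPath:       "demo-bucket", // invented
			FilePrefix:          "dis",
			PartitionFormat:     "yyyy/MM/dd",
			DestinationFileType: "text",
			RecordDelimiter:     "\n",
		},
	})
}
```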
+ OBSDestinationDescriptor OBSDestinationDescriptorOpts `json:"obs_destination_descriptor,omitempty"` +} + +func CreateOBSDumpTask(client *golangsdk.ServiceClient, opts CreateOBSDumpTaskOpts) error { + b, err := build.RequestBody(opts, "") + if err != nil { + return err + } + + // POST /v2/{project_id}/streams/{stream_name}/transfer-tasks + _, err = client.Post(client.ServiceURL("streams", opts.StreamName, "transfer-tasks"), b, nil, &golangsdk.RequestOpts{ + OkCodes: []int{201}, + }) + return err +} + +type OBSDestinationDescriptorOpts struct { + // Name of the dump task. + // The task name consists of letters, digits, hyphens (-), and underscores (_). + // It must be a string of 1 to 64 characters. + TaskName string `json:"task_name"` + // Name of the agency created on IAM. + // DIS uses an agency to access your specified resources. + // The parameters for creating an agency are as follows: + // - Agency Type: Cloud service + // - Cloud Service: DIS + // - Validity Period: unlimited + // - Scope: Global service, + // Project: OBS. Select the Tenant Administrator role for the global service project. + // If agencies have been created, you can obtain available agencies from the agency list by using the "Listing Agencies" API. + // This parameter cannot be left blank and the parameter value cannot exceed 64 characters. + // If there are dump tasks on the console, the system displays a message indicating that an agency will be automatically created. + // The name of the automatically created agency is dis_admin_agency. + // Maximum: 64 + AgencyName string `json:"agency_name"` + // User-defined interval at which data is imported from the current DIS stream into OBS. + // If no data is pushed to the DIS stream during the current interval, no dump file package will be generated. + // Value range: 30-900 + // Default value: 300 + // Unit: second + // Minimum: 30 + // Maximum: 900 + // Default: 300 + DeliverTimeInterval *int `json:"deliver_time_interval"` + // Offset. + // LATEST: Maximum offset indicating that the latest data will be extracted. + // TRIM_HORIZON: Minimum offset indicating that the earliest data will be extracted. + // Default value: LATEST + // Default: LATEST + // Enumeration values: + // LATEST + // TRIM_HORIZON + ConsumerStrategy string `json:"consumer_strategy,omitempty"` + // Directory to store files that will be dumped to OBS. + // Different directory levels are separated by slashes (/) and cannot start with slashes. + // The value can contain a maximum of 50 characters, including letters, digits, underscores (_), and slashes (/). + // This parameter is left empty by default. + // Maximum: 50 + FilePrefix string `json:"file_prefix,omitempty"` + // Directory structure of the object file written into OBS. + // The directory structure is in the format of yyyy/MM/dd/HH/mm (time at which the dump task was created). + // - N/A: Leave this parameter empty, indicating that the date and time directory is not used. + // - yyyy: year + // - yyyy/MM: year/month + // - yyyy/MM/dd: year/month/day + // - yyyy/MM/dd/HH: year/month/day/hour + // - yyyy/MM/dd/HH/mm: year/month/day/hour/minute + // Example: in 2017/11/10/14/49, the directory structure is 2017 > 11 > 10 > 14 > 49. 2017 indicates the outermost folder. + // Default value: empty. + // Note: + // After data is successfully dumped, the directory structure is obs_bucket_path/file_prefix/partition_format.
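+ // For instance, with partition_format set to yyyy/MM/dd, data dumped on 10 November 2017 would be stored under obs_bucket_path/file_prefix/2017/11/10 (derived from the example above, not a separate documented case).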
+ // Enumeration values: + // yyyy + // yyyy/MM + // yyyy/MM/dd + // yyyy/MM/dd/HH + // yyyy/MM/dd/HH/mm + PartitionFormat string `json:"partition_format,omitempty"` + // Name of the OBS bucket used to store data from the DIS stream. + OBSBucketPath string `json:"obs_bucket_path,omitempty"` + // Dump file format. + // Possible values: + // - Text (default) + // - Parquet + // - CarbonData + // Note: + // You can select Parquet or CarbonData only when Source Data Type is set to JSON and Dump Destination is set to OBS. + // Default: text + // Enumeration values: + // text + // parquet + // carbon + DestinationFileType string `json:"destination_file_type,omitempty"` + // Dump time directory generated based on the timestamp of the source data and the configured partition_format. + // Directory structure of the object file written into OBS. + // The directory structure is in the format of yyyy/MM/dd/HH/mm. + ProcessingSchema ProcessingSchema `json:"processing_schema,omitempty"` + // Delimiter for the dump file, which is used to separate the user data that is written into the dump file. + // Value range: + // - Comma (,), which is the default value + // - Semicolon (;) + // - Vertical bar (|) + // - Newline character (\n) + // Default: \n + RecordDelimiter string `json:"record_delimiter,omitempty"` +} + +type ProcessingSchema struct { + // Attribute name of the source data timestamp. + TimestampName string `json:"timestamp_name"` + // Type of the source data timestamp. + // - String + // - Timestamp: 13-digit timestamp of the long type + TimestampType string `json:"timestamp_type"` + // OBS directory generated based on the timestamp format. + // This parameter is mandatory when the timestamp type of the source data is String. + // Value range: + // - yyyy/MM/dd HH:mm:ss + // - MM/dd/yyyy HH:mm:ss + // - dd/MM/yyyy HH:mm:ss + // - yyyy-MM-dd HH:mm:ss + // - MM-dd-yyyy HH:mm:ss + // - dd-MM-yyyy HH:mm:ss + // Enumeration values: + // yyyy/MM/dd HH:mm:ss + // MM/dd/yyyy HH:mm:ss + // dd/MM/yyyy HH:mm:ss + // yyyy-MM-dd HH:mm:ss + // MM-dd-yyyy HH:mm:ss + // dd-MM-yyyy HH:mm:ss + TimestampFormat string `json:"timestamp_format,omitempty"` +} diff --git a/openstack/dis/v2/dump/DeleteTransferTask.go b/openstack/dis/v2/dump/DeleteTransferTask.go new file mode 100644 index 000000000..fade4157d --- /dev/null +++ b/openstack/dis/v2/dump/DeleteTransferTask.go @@ -0,0 +1,16 @@ +package dump + +import golangsdk "github.com/opentelekomcloud/gophertelekomcloud" + +type DeleteTransferTaskOpts struct { + // Name of the stream. + StreamName string + // Name of the dump task to be deleted. + TaskName string +} + +func DeleteTransferTask(client *golangsdk.ServiceClient, opts DeleteTransferTaskOpts) (err error) { + // DELETE /v2/{project_id}/streams/{stream_name}/transfer-tasks/{task_name} + _, err = client.Delete(client.ServiceURL("streams", opts.StreamName, "transfer-tasks", opts.TaskName), nil) + return +} diff --git a/openstack/dis/v2/dump/GetTransferTask.go b/openstack/dis/v2/dump/GetTransferTask.go new file mode 100644 index 000000000..5c07e1e9d --- /dev/null +++ b/openstack/dis/v2/dump/GetTransferTask.go @@ -0,0 +1,103 @@ +package dump + +import ( + golangsdk "github.com/opentelekomcloud/gophertelekomcloud" + "github.com/opentelekomcloud/gophertelekomcloud/internal/extract" +) + +type GetTransferTaskOpts struct { + // Name of the stream. + StreamName string + // Name of the dump task to be queried.
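A hedged lookup-then-cleanup sketch built on the transfer-task APIs above (stream and task names invented):

```go
package example

import (
	"log"

	golangsdk "github.com/opentelekomcloud/gophertelekomcloud"
	"github.com/opentelekomcloud/gophertelekomcloud/openstack/dis/v2/dump"
)

// dropFailedTask assumes an initialized DIS v2 service client.
func dropFailedTask(client *golangsdk.ServiceClient) error {
	task, err := dump.GetTransferTask(client, dump.GetTransferTaskOpts{
		StreamName: "demo-stream", // invented
		TaskName:   "demo-task",   // invented
	})
	if err != nil {
		return err
	}
	if task.State != "ERROR" {
		return nil
	}
	log.Printf("removing failed dump task %s", task.TaskName)
	return dump.DeleteTransferTask(client, dump.DeleteTransferTaskOpts{
		StreamName: "demo-stream",
		TaskName:   task.TaskName,
	})
}
```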
+ TaskName string +} + +func GetTransferTask(client *golangsdk.ServiceClient, opts GetTransferTaskOpts) (*GetTransferTaskResponse, error) { + // GET /v2/{project_id}/streams/{stream_name}/transfer-tasks/{task_name} + raw, err := client.Get(client.ServiceURL("streams", opts.StreamName, "transfer-tasks", opts.TaskName), nil, nil) + if err != nil { + return nil, err + } + + var res GetTransferTaskResponse + err = extract.Into(raw.Body, &res) + return &res, err +} + +type GetTransferTaskResponse struct { + // Name of the stream to which the dump task belongs. + StreamName string `json:"stream_name,omitempty"` + // Name of the dump task. + TaskName string `json:"task_name,omitempty"` + // ID of the dump task. + TaskId string `json:"task_id,omitempty"` + // Dump task status. + // Possible values: + // - ERROR: An error occurs. + // - STARTING: The dump task is being started. + // - PAUSED: The dump task has been stopped. + // - RUNNING: The dump task is running. + // - DELETE: The dump task has been deleted. + // - ABNORMAL: The dump task is abnormal. + // Enumeration values: + // ERROR + // STARTING + // PAUSED + // RUNNING + // DELETE + // ABNORMAL + State string `json:"state,omitempty"` + // Dump destination. + // Possible values: + // - OBS: Data is dumped to OBS. + // - MRS: Data is dumped to MRS. + // - DLI: Data is dumped to DLI. + // - CLOUDTABLE: Data is dumped to CloudTable. + // - DWS: Data is dumped to DWS. + // Enumeration values: + // OBS + // MRS + // DLI + // CLOUDTABLE + // DWS + DestinationType string `json:"destination_type,omitempty"` + // Time when the dump task is created. + CreateTime int64 `json:"create_time,omitempty"` + // Latest dump time of the dump task. + LastTransferTimestamp int64 `json:"last_transfer_timestamp,omitempty"` + // List of partition dump details. + Partitions []PartitionResult `json:"partitions,omitempty"` + // Parameter list of OBS to which data in the DIS stream will be dumped. + OBSDestinationDescription OBSDestinationDescriptorOpts `json:"obs_destination_description,omitempty"` + // Parameter list of the DWS to which data in the DIS stream will be dumped. + DWSDestinationDescription DWSDestinationDescriptorOpts `json:"dws_destination_description,omitempty"` + // Parameter list of the MRS to which data in the DIS stream will be dumped. + MRSDestinationDescription MRSDestinationDescriptorOpts `json:"mrs_destination_description,omitempty"` + // Parameter list of the DLI to which data in the DIS stream will be dumped. + DLIDestinationDescription DLIDestinationDescriptorOpts `json:"dli_destination_description,omitempty"` + // Parameter list of the CloudTable to which data in the DIS stream will be dumped. + CloudTableDestinationDescription CloudTableDestinationDescriptorOpts `json:"cloud_table_destination_description,omitempty"` +} + +type PartitionResult struct { + // Current status of the partition. + // Possible values: + // - CREATING: The stream is being created. + // - ACTIVE: The stream is available. + // - DELETED: The stream is being deleted. + // - EXPIRED: The stream has expired. + // Enumeration values: + // CREATING + // ACTIVE + // DELETED + // EXPIRED + Status string `json:"status,omitempty"` + // Unique identifier of the partition. + PartitionId string `json:"partition_id,omitempty"` + // Possible value range of the hash key used by the partition. + HashRange string `json:"hash_range,omitempty"` + // Sequence number range of the partition. + SequenceNumberRange string `json:"sequence_number_range,omitempty"` + // Parent partition.
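The Partitions list above carries per-partition dump state; a small sketch of surfacing it, assuming a response already obtained from GetTransferTask:

```go
package example

import (
	"log"

	"github.com/opentelekomcloud/gophertelekomcloud/openstack/dis/v2/dump"
)

// logPartitions prints per-partition dump state from a GetTransferTask response.
func logPartitions(task *dump.GetTransferTaskResponse) {
	for _, p := range task.Partitions {
		log.Printf("partition %s: status=%s hash_range=%s", p.PartitionId, p.Status, p.HashRange)
	}
}
```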
+ ParentPartitions string `json:"parent_partitions,omitempty"` +} diff --git a/openstack/dis/v2/dump/ListTransferTasks.go b/openstack/dis/v2/dump/ListTransferTasks.go new file mode 100644 index 000000000..05420ab55 --- /dev/null +++ b/openstack/dis/v2/dump/ListTransferTasks.go @@ -0,0 +1,56 @@ +package dump + +import ( + golangsdk "github.com/opentelekomcloud/gophertelekomcloud" + "github.com/opentelekomcloud/gophertelekomcloud/internal/extract" +) + +func ListTransferTasks(client *golangsdk.ServiceClient, streamName string) (*ListTransferTasksResponse, error) { + // GET /v2/{project_id}/streams/{stream_name}/transfer-tasks + raw, err := client.Get(client.ServiceURL("streams", streamName, "transfer-tasks"), nil, nil) + if err != nil { + return nil, err + } + + var res ListTransferTasksResponse + err = extract.Into(raw.Body, &res) + return &res, err +} + +type ListTransferTasksResponse struct { + // Total number of dump tasks. + TotalNumber *int `json:"total_number,omitempty"` + // List of dump tasks. + Tasks []TransferTask `json:"tasks,omitempty"` +} + +type TransferTask struct { + // Name of the dump task. + TaskName string `json:"task_name,omitempty"` + // ID of the dump task. + TaskId string `json:"task_id,omitempty"` + // Dump task status. Possible values: + // ERROR: An error occurs. + // STARTING: The dump task is being started. + // PAUSED: The dump task has been stopped. + // RUNNING: The dump task is running. + // DELETE: The dump task has been deleted. + // ABNORMAL: The dump task is abnormal. + // Enumeration values: + // ERROR + // STARTING + // PAUSED + // RUNNING + // DELETE + // ABNORMAL + State string `json:"state,omitempty"` + // Dump destination. Possible values: + // OBS: Data is dumped to OBS. + // Enumeration values: + // OBS + DestinationType string `json:"destination_type,omitempty"` + // Time when the dump task is created. + CreateTime *int64 `json:"create_time,omitempty"` + // Latest dump time of the dump task. + LastTransferTimestamp *int64 `json:"last_transfer_timestamp,omitempty"` +} diff --git a/openstack/dis/v2/dump/TransferTaskAction.go b/openstack/dis/v2/dump/TransferTaskAction.go new file mode 100644 index 000000000..729ae562b --- /dev/null +++ b/openstack/dis/v2/dump/TransferTaskAction.go @@ -0,0 +1,41 @@ +package dump + +import ( + golangsdk "github.com/opentelekomcloud/gophertelekomcloud" + "github.com/opentelekomcloud/gophertelekomcloud/internal/build" +) + +type TransferTaskActionOpts struct { + // Name of the stream to be queried. + // Maximum: 60 + StreamName string `json:"stream_name"` + // Dump task operation. + // Possible values: + // - stop: The dump task is stopped. + // - start: The dump task is started. + // Enumeration values: + // stop + // start + Action string `json:"action"` + // List of dump tasks to be started or stopped. + Tasks []BatchTransferTask `json:"tasks"` +} + +type BatchTransferTask struct { + // Dump task ID.
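Task IDs returned by ListTransferTasks feed directly into this batch action; a sketch, inferred from the types above, that stops every dump task on a stream:

```go
package example

import (
	golangsdk "github.com/opentelekomcloud/gophertelekomcloud"
	"github.com/opentelekomcloud/gophertelekomcloud/openstack/dis/v2/dump"
)

// stopAllTasks assumes an initialized DIS v2 service client.
func stopAllTasks(client *golangsdk.ServiceClient, streamName string) error {
	list, err := dump.ListTransferTasks(client, streamName)
	if err != nil {
		return err
	}
	batch := make([]dump.BatchTransferTask, 0, len(list.Tasks))
	for _, t := range list.Tasks {
		batch = append(batch, dump.BatchTransferTask{Id: t.TaskId})
	}
	return dump.TransferTaskAction(client, dump.TransferTaskActionOpts{
		StreamName: streamName,
		Action:     "stop",
		Tasks:      batch,
	})
}
```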
+ Id string `json:"id"` +} + +func TransferTaskAction(client *golangsdk.ServiceClient, opts TransferTaskActionOpts) error { + b, err := build.RequestBody(opts, "") + if err != nil { + return err + } + + // POST /v2/{project_id}/streams/{stream_name}/transfer-tasks/action + _, err = client.Post(client.ServiceURL("streams", opts.StreamName, "transfer-tasks", "action"), b, nil, &golangsdk.RequestOpts{ + OkCodes: []int{200}, + }) + return err +} diff --git a/openstack/dis/v2/monitors/GetPartitionMonitor.go b/openstack/dis/v2/monitors/GetPartitionMonitor.go new file mode 100644 index 000000000..372af713c --- /dev/null +++ b/openstack/dis/v2/monitors/GetPartitionMonitor.go @@ -0,0 +1,65 @@ +package monitors + +import ( + golangsdk "github.com/opentelekomcloud/gophertelekomcloud" + "github.com/opentelekomcloud/gophertelekomcloud/internal/extract" +) + +type GetPartitionMonitorOpts struct { + // Name of the stream to be queried. + // Maximum: 60 + StreamName string + // Partition No. + // The value can be in either of the following formats: + // - shardId-0000000000 + // - 0 + // For example, if a stream has three partitions, the partition identifiers are 0, 1, and 2, or + // shardId-0000000000, + // shardId-0000000001, + // and shardId-0000000002, + // respectively. + PartitionId string + // Partition monitoring metric. + // (Either label or label_list must be specified. + // If both label_list and label are specified, label_list prevails.) + // - total_put_bytes_per_stream: total input traffic (byte) + // - total_get_bytes_per_stream: total output traffic (byte) + // - total_put_records_per_stream: total number of input records + // - total_get_records_per_stream: total number of output records + // Enumeration values: + // total_put_bytes_per_stream + // total_get_bytes_per_stream + // total_put_records_per_stream + // total_get_records_per_stream + Label string `q:"label,omitempty"` + // List of labels separated by commas (,) to query multiple labels in batches. + // (Either label or label_list must be specified. + // If both label_list and label exist, label_list prevails.) + LabelList string `q:"label_list,omitempty"` + // Monitoring start time, which is a 10-digit timestamp. + StartTime int64 `q:"start_time"` + // Monitoring end time, which is a 10-digit timestamp. + EndTime int64 `q:"end_time"` +} + +func GetPartitionMonitor(client *golangsdk.ServiceClient, opts GetPartitionMonitorOpts) (*GetPartitionMonitorResponse, error) { + q, err := golangsdk.BuildQueryString(opts) + if err != nil { + return nil, err + } + + // GET /v2/{project_id}/streams/{stream_name}/partitions/{partition_id}/metrics + raw, err := client.Get(client.ServiceURL("streams", opts.StreamName, "partitions", opts.PartitionId, "metrics")+q.String(), nil, nil) + if err != nil { + return nil, err + } + + var res GetPartitionMonitorResponse + err = extract.Into(raw.Body, &res) + return &res, err +} + +type GetPartitionMonitorResponse struct { + // Data object. + Metrics Metrics `json:"metrics,omitempty"` +} diff --git a/openstack/dis/v2/monitors/GetStreamMonitor.go b/openstack/dis/v2/monitors/GetStreamMonitor.go new file mode 100644 index 000000000..7924875bd --- /dev/null +++ b/openstack/dis/v2/monitors/GetStreamMonitor.go @@ -0,0 +1,83 @@ +package monitors + +import ( + golangsdk "github.com/opentelekomcloud/gophertelekomcloud" + "github.com/opentelekomcloud/gophertelekomcloud/internal/extract" +) + +type GetStreamMonitorOpts struct { + // Name of the stream to be queried. 
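For the partition-metrics API above, a hedged query sketch (the label comes from the documented enumeration; the stream name and time window are invented):

```go
package example

import (
	"log"
	"time"

	golangsdk "github.com/opentelekomcloud/gophertelekomcloud"
	"github.com/opentelekomcloud/gophertelekomcloud/openstack/dis/v2/monitors"
)

// partitionTraffic assumes an initialized DIS v2 service client.
func partitionTraffic(client *golangsdk.ServiceClient) error {
	now := time.Now().Unix()
	res, err := monitors.GetPartitionMonitor(client, monitors.GetPartitionMonitorOpts{
		StreamName:  "demo-stream", // invented
		PartitionId: "0",
		Label:       "total_put_bytes_per_stream",
		StartTime:   now - 3600, // last hour, 10-digit timestamps as documented
		EndTime:     now,
	})
	if err != nil {
		return err
	}
	for _, dp := range res.Metrics.DataPoints {
		log.Printf("%d: %d", dp.Timestamp, dp.Value)
	}
	return nil
}
```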
+ // Maximum: 60 + StreamName string + // Stream monitoring metric. + // (Either label or label_list must be specified. + // If both label_list and label are specified, label_list prevails.) + // - total_put_bytes_per_stream: total input traffic (byte) + // - total_get_bytes_per_stream: total output traffic (byte) + // - total_put_records_per_stream: total number of input records + // - total_get_records_per_stream: total number of output records + // - total_put_req_latency: average processing time of upload requests (millisecond) + // - total_get_req_latency: average processing time of download requests (millisecond) + // - total_put_req_suc_per_stream: number of successful upload requests + // - total_get_req_suc_per_stream: number of successful download requests + // - traffic_control_put: number of rejected upload requests due to flow control + // - traffic_control_get: number of rejected download requests due to flow control + // Enumeration values: + // total_put_bytes_per_stream + // total_get_bytes_per_stream + // total_put_records_per_stream + // total_get_records_per_stream + // total_put_req_latency + // total_get_req_latency + // total_put_req_suc_per_stream + // total_get_req_suc_per_stream + // traffic_control_put + // traffic_control_get + Label string `q:"label,omitempty"` + // List of labels separated by commas (,) to query multiple labels in batches. + // (Either label or label_list must be set. + // If both label_list and label exist, label_list prevails.) + LabelList string `q:"label_list,omitempty"` + // Monitoring start time, which is a 10-digit timestamp. + StartTime int64 `q:"start_time"` + // Monitoring end time, which is a 10-digit timestamp. + EndTime int64 `q:"end_time"` +} + +func GetStreamMonitor(client *golangsdk.ServiceClient, opts GetStreamMonitorOpts) (*GetStreamMonitorResponse, error) { + q, err := golangsdk.BuildQueryString(opts) + if err != nil { + return nil, err + } + + // GET /v2/{project_id}/streams/{stream_name}/metrics + raw, err := client.Get(client.ServiceURL("streams", opts.StreamName, "metrics")+q.String(), nil, nil) + if err != nil { + return nil, err + } + + var res GetStreamMonitorResponse + err = extract.Into(raw.Body, &res) + return &res, err +} + +type GetStreamMonitorResponse struct { + // Data object. + Metrics Metrics `json:"metrics,omitempty"` + // List of monitored data objects. + MetricsList []Metrics `json:"metrics_list,omitempty"` +} + +type Metrics struct { + // Monitoring data. + DataPoints []DataPoint `json:"data_points,omitempty"` + // Metric + Label string `json:"label,omitempty"` +} + +type DataPoint struct { + // Timestamp. + Timestamp int64 `json:"timestamp,omitempty"` + // Monitoring value corresponding to the timestamp. + Value int64 `json:"value,omitempty"` +} diff --git a/openstack/dis/v2/streams/CreatePolicyRule.go b/openstack/dis/v2/streams/CreatePolicyRule.go new file mode 100644 index 000000000..2e9c4e7a2 --- /dev/null +++ b/openstack/dis/v2/streams/CreatePolicyRule.go @@ -0,0 +1,45 @@ +package streams + +import ( + golangsdk "github.com/opentelekomcloud/gophertelekomcloud" + "github.com/opentelekomcloud/gophertelekomcloud/internal/build" +) + +type CreatePolicyRuleOpts struct { + // Name of the stream for which you want to add an authorization policy. + // Maximum: 64 + StreamName string `json:"stream_name" required:"true"` + // Unique ID of the stream. + StreamId string `json:"stream_id"` + // Authorized users. + // If the permission is granted to a specified tenant, the format is domainName.*. 
+ // If the permission is granted to a specified sub-user of a tenant, the format is domainName.userName. + // Multiple accounts can be added and separated by commas (,), + // for example, domainName1.userName1,domainName2.userName2. + PrincipalName string `json:"principal_name"` + // Authorization operation type. + // - putRecords: upload data. + // - getRecords: download data. + // Enumeration values: + // putRecords + // getRecords + ActionType string `json:"action_type"` + // Authorization impact type. + // - accept: The authorization operation is allowed. + // Enumeration values: + // - accept + Effect string `json:"effect"` +} + +func CreatePolicyRule(client *golangsdk.ServiceClient, opts CreatePolicyRuleOpts) error { + b, err := build.RequestBody(opts, "") + if err != nil { + return err + } + + // POST /v2/{project_id}/streams/{stream_name}/policies + _, err = client.Post(client.ServiceURL("streams", opts.StreamName, "policies"), b, nil, &golangsdk.RequestOpts{ + OkCodes: []int{200}, + }) + return err +} diff --git a/openstack/dis/v2/streams/CreateStream.go b/openstack/dis/v2/streams/CreateStream.go new file mode 100644 index 000000000..f2dcea270 --- /dev/null +++ b/openstack/dis/v2/streams/CreateStream.go @@ -0,0 +1,83 @@ +package streams + +import ( + golangsdk "github.com/opentelekomcloud/gophertelekomcloud" + "github.com/opentelekomcloud/gophertelekomcloud/internal/build" + "github.com/opentelekomcloud/gophertelekomcloud/openstack/common/tags" +) + +type CreateStreamOpts struct { + // Name of the stream. + // The stream name can contain 1 to 64 characters, including letters, digits, underscores (_), and hyphens (-). + // Maximum: 64 + StreamName string `json:"stream_name"` + // Number of partitions. Partitions are the base throughput unit of a DIS stream. + PartitionCount int32 `json:"partition_count"` + // Stream type. + // COMMON: a common stream. The bandwidth is 1 MB/s. + // ADVANCED: an advanced stream. The bandwidth is 5 MB/s. + // Enumeration values: + // COMMON + // ADVANCED + StreamType string `json:"stream_type,omitempty"` + // Source data type. + // BLOB: a set of binary data stored in a database management system. + // Default value: BLOB + // Enumeration values: + // BLOB + DataType string `json:"data_type,omitempty"` + // Period of time for which data is retained in the stream. + // Value range: 24-72 + // Unit: hour + // Default value: 24 + // If this parameter is left blank, the default value is used. + // Maximum: 72 + // Default: 24 + DataDuration *int `json:"data_duration,omitempty"` + // Specifies whether to enable auto-scaling. + // true: Auto scaling is enabled. + // false: Auto scaling is disabled. + // This function is disabled by default. + // Default: false + AutoScaleEnabled *bool `json:"auto_scale_enabled,omitempty"` + // Minimum number of partitions for automatic scale-down when auto-scaling is enabled. + // Minimum: 1 + AutoScaleMinPartitionCount *int64 `json:"auto_scale_min_partition_count,omitempty"` + // Maximum number of partitions for automatic scale-up when auto-scaling is enabled. + AutoScaleMaxPartitionCount *int `json:"auto_scale_max_partition_count,omitempty"` + // Source data structure that defines JSON and CSV formats. + // It is described in the syntax of the Avro schema. + DataSchema string `json:"data_schema,omitempty"` + // Attributes of data in CSV format, such as delimiter. + CsvProperties CsvProperties `json:"csv_properties,omitempty"` + // Compression type of data.
Currently, the value can be: + // snappy + // gzip + // zip + // Data is not compressed by default. + // Enumeration values: + // snappy + // gzip + // zip + CompressionFormat string `json:"compression_format,omitempty"` + // List of stream tags. + Tags []tags.ResourceTag `json:"tags,omitempty"` +} + +type CsvProperties struct { + // Data separator. + Delimiter string `json:"delimiter,omitempty"` +} + +func CreateStream(client *golangsdk.ServiceClient, opts CreateStreamOpts) error { + b, err := build.RequestBody(opts, "") + if err != nil { + return err + } + + // POST /v2/{project_id}/streams + _, err = client.Post(client.ServiceURL("streams"), b, nil, &golangsdk.RequestOpts{ + OkCodes: []int{201}, + }) + return err +} diff --git a/openstack/dis/v2/streams/DeleteStream.go b/openstack/dis/v2/streams/DeleteStream.go new file mode 100644 index 000000000..82825c3d1 --- /dev/null +++ b/openstack/dis/v2/streams/DeleteStream.go @@ -0,0 +1,11 @@ +package streams + +import golangsdk "github.com/opentelekomcloud/gophertelekomcloud" + +func DeleteStream(client *golangsdk.ServiceClient, streamName string) (err error) { + // DELETE /v2/{project_id}/streams/{stream_name} + _, err = client.Delete(client.ServiceURL("streams", streamName), &golangsdk.RequestOpts{ + MoreHeaders: map[string]string{"Content-Type": "application/json"}, JSONBody: nil, + }) + return +} diff --git a/openstack/dis/v2/streams/GetPolicyRule.go b/openstack/dis/v2/streams/GetPolicyRule.go new file mode 100644 index 000000000..a5b6d356b --- /dev/null +++ b/openstack/dis/v2/streams/GetPolicyRule.go @@ -0,0 +1,46 @@ +package streams + +import ( + golangsdk "github.com/opentelekomcloud/gophertelekomcloud" + "github.com/opentelekomcloud/gophertelekomcloud/internal/extract" +) + +func GetPolicyRule(client *golangsdk.ServiceClient, streamName string) (*GetPolicyRuleResponse, error) { + // GET /v2/{project_id}/streams/{stream_name}/policies + raw, err := client.Get(client.ServiceURL("streams", streamName, "policies"), nil, nil) + if err != nil { + return nil, err + } + + var res GetPolicyRuleResponse + err = extract.Into(raw.Body, &res) + return &res, err +} + +type GetPolicyRuleResponse struct { + // Unique ID of the stream. + StreamId string `json:"stream_id,omitempty"` + // List of authorization information records. + Rules []PrincipalRule `json:"rules,omitempty"` +} + +type PrincipalRule struct { + // ID of the authorized user. + Principal string `json:"principal,omitempty"` + // Name of the authorized user. + // If the permission is granted to all sub-users of a tenant, the format is domainName.*. + // If the permission is granted to a specified sub-user of a tenant, the format is domainName.userName. + PrincipalName string `json:"principal_name,omitempty"` + // Authorization operation type. + // - putRecords: upload data. + // - getRecords: download data. + // Enumeration values: + // putRecords + // getRecords + ActionType string `json:"action_type,omitempty"` + // Authorization impact type. + // - accept: The authorization operation is allowed.
+ // Enumeration values: + // accept + Effect string `json:"effect,omitempty"` +} diff --git a/openstack/dis/v2/streams/GetStream.go b/openstack/dis/v2/streams/GetStream.go new file mode 100644 index 000000000..77f544265 --- /dev/null +++ b/openstack/dis/v2/streams/GetStream.go @@ -0,0 +1,156 @@ +package streams + +import ( + golangsdk "github.com/opentelekomcloud/gophertelekomcloud" + "github.com/opentelekomcloud/gophertelekomcloud/internal/extract" + "github.com/opentelekomcloud/gophertelekomcloud/openstack/common/tags" +) + +type GetStreamOpts struct { + // Stream to be queried. + // Maximum: 60 + StreamName string + // Name of the partition to start the partition list with. The returned partition list does not contain this partition. + StartPartitionId string `q:"start_partitionId,omitempty"` + // Maximum number of partitions to list in a single API call. Value range: 1-1,000 Default value: 100 + // Minimum: 1 + // Maximum: 1000 + // Default: 100 + LimitPartitions *int `q:"limit_partitions,omitempty"` +} + +func GetStream(client *golangsdk.ServiceClient, opts GetStreamOpts) (*DescribeStreamResponse, error) { + q, err := golangsdk.BuildQueryString(opts) + if err != nil { + return nil, err + } + + // GET /v2/{project_id}/streams/{stream_name} + raw, err := client.Get(client.ServiceURL("streams", opts.StreamName)+q.String(), nil, + &golangsdk.RequestOpts{ + MoreHeaders: map[string]string{"Content-Type": "application/json"}, JSONBody: nil, + }) + if err != nil { + return nil, err + } + + var res DescribeStreamResponse + err = extract.Into(raw.Body, &res) + return &res, err +} + +type DescribeStreamResponse struct { + // Name of the stream. + StreamName string `json:"stream_name,omitempty"` + // Time when the stream was created. The value is a 13-digit timestamp. + CreateTime *int64 `json:"create_time,omitempty"` + // Time when the stream was most recently modified. The value is a 13-digit timestamp. + LastModifiedTime *int64 `json:"last_modified_time,omitempty"` + // Current status of the stream. Possible values: + // CREATING: The stream is being created. + // RUNNING: The stream is running. + // TERMINATING: The stream is being deleted. + // TERMINATED: The stream has been deleted. + // Enumeration values: + // CREATING + // RUNNING + // TERMINATING + // FROZEN + Status string `json:"status,omitempty"` + // Stream type. + // COMMON: a common stream. The bandwidth is 1 MB/s. + // ADVANCED: an advanced stream. The bandwidth is 5 MB/s. + // Enumeration values: + // COMMON + // ADVANCED + StreamType string `json:"stream_type,omitempty"` + // A list of partitions that comprise the DIS stream. + Partitions []PartitionResult `json:"partitions,omitempty"` + // Specifies whether there are more matching partitions of the DIS stream to list. + // true: yes + // false: no + HasMorePartitions *bool `json:"has_more_partitions,omitempty"` + // Period for storing data in units of hours. + RetentionPeriod *int `json:"retention_period,omitempty"` + // Unique identifier of the stream. + StreamId string `json:"stream_id,omitempty"` + // Source data type. + // BLOB: a set of binary data stored in a database management system. + // Default value: BLOB + // Enumeration values: + // BLOB + DataType string `json:"data_type,omitempty"` + // Source data structure that defines JSON and CSV formats. + // It is described in the syntax of the Avro schema. + // For details about Avro, + // go to http://avro.apache.org/docs/current/ + DataSchema string `json:"data_schema,omitempty"` + // Compression type of data.
Currently, the value can be: + // snappy + // gzip + // zip + // Data is not compressed by default. + // Enumeration values: + // snappy + // gzip + // zip + CompressionFormat string `json:"compression_format,omitempty"` + // Attributes of data in CSV format, such as delimiter. + CsvProperties CsvProperties `json:"csv_properties,omitempty"` + // Total number of writable partitions (including partitions in ACTIVE state only). + WritablePartitionCount *int `json:"writable_partition_count,omitempty"` + // Total number of readable partitions (including partitions in ACTIVE and DELETED state). + ReadablePartitionCount *int `json:"readable_partition_count,omitempty"` + // List of scaling operation records. + UpdatePartitionCounts []UpdatePartitionCountResponse `json:"update_partition_counts,omitempty"` + // List of stream tags. + Tags []tags.ResourceTag `json:"tags,omitempty"` + // Specifies whether to enable auto-scaling. + // true: auto-scaling is enabled. + // false: auto-scaling is disabled. + // This function is disabled by default. + AutoScaleEnabled *bool `json:"auto_scale_enabled,omitempty"` + // Minimum number of partitions for automatic scale-down when auto-scaling is enabled. + AutoScaleMinPartitionCount *int `json:"auto_scale_min_partition_count,omitempty"` + // Maximum number of partitions for automatic scale-up when auto-scaling is enabled. + AutoScaleMaxPartitionCount *int `json:"auto_scale_max_partition_count,omitempty"` +} + +type PartitionResult struct { + // Current status of the partition. Possible values: + // CREATING: The stream is being created. + // ACTIVE: The stream is available. + // DELETED: The stream is being deleted. + // EXPIRED: The stream has expired. + // Enumeration values: + // CREATING + // ACTIVE + // DELETED + // EXPIRED + Status string `json:"status,omitempty"` + // Unique identifier of the partition. + PartitionId string `json:"partition_id,omitempty"` + // Possible value range of the hash key used by the partition. + HashRange string `json:"hash_range,omitempty"` + // Sequence number range of the partition. + SequenceNumberRange string `json:"sequence_number_range,omitempty"` + // Parent partition. + ParentPartitions string `json:"parent_partitions,omitempty"` +} + +type UpdatePartitionCountResponse struct { + // Scaling execution timestamp, which is a 13-digit timestamp. + CreateTimestamp *int64 `json:"create_timestamp,omitempty"` + // Number of partitions before scaling. + SrcPartitionCount *int `json:"src_partition_count,omitempty"` + // Number of partitions after scaling. + TargetPartitionCount *int `json:"target_partition_count,omitempty"` + // Response code of the scaling operation. + ResultCode *int `json:"result_code,omitempty"` + // Response to the scaling operation. + ResultMsg *int `json:"result_msg,omitempty"` + // Specifies whether the scaling operation is automatic. + // true: Auto scaling is enabled. + // false: Manual scaling is enabled. + AutoScale *bool `json:"auto_scale,omitempty"` +} diff --git a/openstack/dis/v2/streams/ListStreams.go b/openstack/dis/v2/streams/ListStreams.go new file mode 100644 index 000000000..813298b75 --- /dev/null +++ b/openstack/dis/v2/streams/ListStreams.go @@ -0,0 +1,107 @@ +package streams + +import ( + golangsdk "github.com/opentelekomcloud/gophertelekomcloud" + "github.com/opentelekomcloud/gophertelekomcloud/internal/extract" + "github.com/opentelekomcloud/gophertelekomcloud/openstack/common/tags" +) + +type ListStreamsOpts struct { + // The maximum number of DIS streams to list in a single API call. 
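These options drive pagination together with start_stream_name (documented below); a hedged sketch that walks all pages:

```go
package example

import (
	golangsdk "github.com/opentelekomcloud/gophertelekomcloud"
	"github.com/opentelekomcloud/gophertelekomcloud/openstack/common/pointerto"
	"github.com/opentelekomcloud/gophertelekomcloud/openstack/dis/v2/streams"
)

// allStreamNames assumes an initialized DIS v2 service client.
func allStreamNames(client *golangsdk.ServiceClient) ([]string, error) {
	var names []string
	opts := streams.ListStreamsOpts{Limit: pointerto.Int(100)}
	for {
		page, err := streams.ListStreams(client, opts)
		if err != nil {
			return nil, err
		}
		names = append(names, page.StreamNames...)
		if page.HasMoreStreams == nil || !*page.HasMoreStreams || len(page.StreamNames) == 0 {
			return names, nil
		}
		// the next page starts after the last stream returned on this one
		opts.StartStreamName = page.StreamNames[len(page.StreamNames)-1]
	}
}
```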
+ // Value range: 1-100 Default value: 10 + // Minimum: 1 + // Maximum: 100 + // Default: 10 + Limit *int `q:"limit,omitempty"` + // Name of the DIS stream to start the stream list with. The returned stream list does not contain this DIS stream name. + // If pagination query is required, this parameter is not transferred for query on the first page. + // If the value of has_more_streams is true, the query is performed on the next page. + // The value of start_stream_name is the name of the last stream in the query result of the first page. + StartStreamName string `q:"start_stream_name,omitempty"` +} + +func ListStreams(client *golangsdk.ServiceClient, opts ListStreamsOpts) (*ListStreamsResponse, error) { + q, err := golangsdk.BuildQueryString(opts) + if err != nil { + return nil, err + } + + // GET /v2/{project_id}/streams + raw, err := client.Get(client.ServiceURL("streams")+q.String(), nil, &golangsdk.RequestOpts{ + MoreHeaders: map[string]string{"Content-Type": "application/json"}, JSONBody: nil, + }) + if err != nil { + return nil, err + } + + var res ListStreamsResponse + err = extract.Into(raw.Body, &res) + return &res, err +} + +type ListStreamsResponse struct { + // Total number of all the DIS streams created by the current tenant. + TotalNumber *int `json:"total_number,omitempty"` + // List of the streams meeting the current requests. + StreamNames []string `json:"stream_names,omitempty"` + // Specifies whether there are more matching DIS streams to list. Possible values: + // true: yes + // false: no + // Default: false + HasMoreStreams *bool `json:"has_more_streams,omitempty"` + // Stream details. + StreamInfoList []StreamInfo `json:"stream_info_list,omitempty"` +} + +type StreamInfo struct { + // Name of the stream. + StreamName string `json:"stream_name,omitempty"` + // Time when the stream is created. The value is a 13-digit timestamp. + CreateTime *int64 `json:"create_time,omitempty"` + // Period for storing data in units of hours. + RetentionPeriod *int `json:"retention_period,omitempty"` + // Current status of the stream. Possible values: + // CREATING: The stream is being created. + // RUNNING: The stream is running. + // TERMINATING: The stream is being deleted. + // TERMINATED: The stream has been deleted. + // Enumeration values: + // CREATING + // RUNNING + // TERMINATING + // FROZEN + Status string `json:"status,omitempty"` + // Stream type. + // COMMON: a common stream. The bandwidth is 1 MB/s. + // ADVANCED: an advanced stream. The bandwidth is 5 MB/s. + // Enumeration values: + // COMMON + // ADVANCED + StreamType string `json:"stream_type,omitempty"` + // Source data type. + // BLOB: a collection of binary data stored as a single entity in a database management system. + // JSON: an open-standard file format that uses human-readable text to transmit data objects + // consisting of attribute-value pairs and array data types. + // CSV: a simple text format for storing tabular data in a plain text file. Commas are used as delimiters. + // Default value: BLOB + // Enumeration values: + // BLOB + // JSON + // CSV + DataType string `json:"data_type,omitempty"` + // Quantity of partitions. Partitions are the base throughput unit of a DIS stream. + PartitionCount *int `json:"partition_count,omitempty"` + // Specifies whether to enable auto-scaling. + // true: auto-scaling is enabled. + // false: auto-scaling is disabled. + // This function is disabled by default.
+ // Default: false + AutoScaleEnabled *bool `json:"auto_scale_enabled,omitempty"` + // Minimum number of partitions for automatic scale-down when auto scaling is enabled. + // Minimum: 1 + AutoScaleMinPartitionCount *int `json:"auto_scale_min_partition_count,omitempty"` + // Maximum number of partitions for automatic scale-up when auto scaling is enabled. + AutoScaleMaxPartitionCount *int `json:"auto_scale_max_partition_count,omitempty"` + // List of tags for the newly created DIS stream. + Tags []tags.ResourceTag `json:"tags,omitempty"` +} diff --git a/openstack/dis/v2/streams/UpdatePartitionCount.go b/openstack/dis/v2/streams/UpdatePartitionCount.go new file mode 100644 index 000000000..9dc3cd2d3 --- /dev/null +++ b/openstack/dis/v2/streams/UpdatePartitionCount.go @@ -0,0 +1,33 @@ +package streams + +import ( + golangsdk "github.com/opentelekomcloud/gophertelekomcloud" + "github.com/opentelekomcloud/gophertelekomcloud/internal/build" +) + +type UpdatePartitionCountOpts struct { + // Name of the stream whose partition quantity needs to be changed. + // Maximum: 64 + StreamName string `json:"stream_name" required:"true"` + // Number of the target partitions. + // The value is an integer greater than 0. + // If the value is greater than the number of current partitions, scaling-up is required. + // If the value is less than the number of current partitions, scale-down is required. + // Note: A maximum of five scale-up/down operations can be performed for each stream within one hour. + // If a scale-up/down operation is successfully performed, you cannot perform one more scale-up/down operation within the next one hour. + // Minimum: 0 + TargetPartitionCount int `json:"target_partition_count"` +} + +func UpdatePartitionCount(client *golangsdk.ServiceClient, opts UpdatePartitionCountOpts) error { + body, err := build.RequestBody(opts, "") + if err != nil { + return err + } + + // PUT /v2/{project_id}/streams/{stream_name} + _, err = client.Put(client.ServiceURL("streams", opts.StreamName), body, nil, &golangsdk.RequestOpts{ + OkCodes: []int{200}, + }) + return err +} diff --git a/openstack/dis/v2/streams/UpdateStream.go b/openstack/dis/v2/streams/UpdateStream.go new file mode 100644 index 000000000..339edb4ed --- /dev/null +++ b/openstack/dis/v2/streams/UpdateStream.go @@ -0,0 +1,52 @@ +package streams + +import ( + golangsdk "github.com/opentelekomcloud/gophertelekomcloud" + "github.com/opentelekomcloud/gophertelekomcloud/internal/build" +) + +type UpdateStreamOpts struct { + // Name of the stream to be updated. + StreamName string `json:"stream_name" required:"true"` + // Period of time for which data is retained in the stream. + // Value range: 24-72 Unit: hour + // Default value: 24 If this parameter is left blank, the default value is used. + // Maximum: 168 + // Default: 24 + DataDuration *int `json:"data_duration,omitempty"` + // Source data type. + // - BLOB: a collection of binary data stored as a single entity in a database management system. + // Default value: BLOB. + // Enumeration values: + // BLOB + DataType string `json:"data_type,omitempty"` + // Source data structure that defines JSON and CSV formats. + // It is described in the syntax of the Avro schema. + DataSchema string `json:"data_schema,omitempty"` + // Specifies whether to enable auto-scaling. + // - true: auto-scaling is enabled. + // - false: auto-scaling is disabled. This function is disabled by default.
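For the scaling entry point defined above, a minimal call could look like this (stream name invented; the five-operations-per-hour note applies):

```go
package example

import (
	golangsdk "github.com/opentelekomcloud/gophertelekomcloud"
	"github.com/opentelekomcloud/gophertelekomcloud/openstack/dis/v2/streams"
)

// scaleStream assumes an initialized DIS v2 service client.
func scaleStream(client *golangsdk.ServiceClient) error {
	return streams.UpdatePartitionCount(client, streams.UpdatePartitionCountOpts{
		StreamName:           "demo-stream", // invented
		TargetPartitionCount: 4,
	})
}
```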
+ // Default: false + // Enumeration values: + // true + // false + AutoScaleEnabled *bool `json:"auto_scale_enabled,omitempty"` + // Minimum number of partitions for automatic scale-down when auto-scaling is enabled. + // Minimum: 1 + AutoScaleMinPartitionCount *int64 `json:"auto_scale_min_partition_count,omitempty"` + // Maximum number of partitions for automatic scale-up when auto-scaling is enabled. + AutoScaleMaxPartitionCount *int64 `json:"auto_scale_max_partition_count,omitempty"` +} + +func UpdateStream(client *golangsdk.ServiceClient, opts UpdateStreamOpts) error { + body, err := build.RequestBody(opts, "") + if err != nil { + return err + } + + // PUT /v3/{project_id}/streams/{stream_name} + _, err = client.Put(client.ServiceURL("streams", opts.StreamName), body, nil, &golangsdk.RequestOpts{ + OkCodes: []int{200}, + }) + return err +} diff --git a/params.go b/params.go index 7abef2ce7..0f2beb874 100644 --- a/params.go +++ b/params.go @@ -335,6 +335,8 @@ func BuildQueryString(opts interface{}) (*url.URL, error) { params.Add(tags[0], v.String()) case reflect.Int: params.Add(tags[0], strconv.FormatInt(v.Int(), 10)) + case reflect.Int64: + params.Add(tags[0], strconv.FormatInt(v.Int(), 10)) case reflect.Bool: params.Add(tags[0], strconv.FormatBool(v.Bool())) case reflect.Slice:
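One last note: the params.go hunk adds a reflect.Int64 case to BuildQueryString, which the new monitor options rely on for their int64 start_time/end_time query parameters. A small, self-contained sketch of the effect (timestamps invented):

```go
package main

import (
	"fmt"

	golangsdk "github.com/opentelekomcloud/gophertelekomcloud"
	"github.com/opentelekomcloud/gophertelekomcloud/openstack/dis/v2/monitors"
)

func main() {
	q, err := golangsdk.BuildQueryString(monitors.GetStreamMonitorOpts{
		Label:     "total_put_bytes_per_stream",
		StartTime: 1700000000, // int64 fields now serialize thanks to the reflect.Int64 case
		EndTime:   1700003600,
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(q.String()) // ?end_time=1700003600&label=total_put_bytes_per_stream&start_time=1700000000
}
```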