-
Notifications
You must be signed in to change notification settings - Fork 29
/
flink_job.go
183 lines (159 loc) · 5.86 KB
/
flink_job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
package aiven
import (
"context"
"time"
)
type (
// FlinkJobHandler aiven go-client handler for Flink Jobs
FlinkJobHandler struct {
client *Client
}
// CreateFlinkJobRequest Aiven API request
// POST https://api.aiven.io/v1/project/<project>/service/<service_name>/flink/job
CreateFlinkJobRequest struct {
JobName string `json:"job_name,omitempty"`
Statement string `json:"statement"`
TablesIds []string `json:"table_ids"`
}
// CreateFlinkJobResponse Aiven API response
// POST https://api.aiven.io/v1/project/<project>/service/<service_name>/flink/job
CreateFlinkJobResponse struct {
APIResponse
JobName string `json:"job_name"`
JobId string `json:"job_id"`
}
// PatchFlinkJobRequest Aiven API request
// PATCH https://api.aiven.io/v1/project/<project>/service/<service_name>/flink/proxy/v1/jobs/<job_id>
PatchFlinkJobRequest struct {
JobId string `json:"job_id"`
}
// GetFlinkJobRequest Aiven API request
// GET https://api.aiven.io/v1/project/<project>/service/<service_name>/flink/job/<job_id>
GetFlinkJobRequest struct {
JobId string `json:"job_id"`
}
// ListFlinkApplicationDeploymentResponse Aiven API response
// GET https://api.aiven.io/v1/project/<project>/service/<service_name>/flink/job
ListFlinkJobResponse struct {
APIResponse
Jobs []struct {
ID string `json:"id"`
Status string `json:"status"`
} `json:"jobs"`
}
// GetFlinkJobResponse Aiven API response
// GET https://api.aiven.io/v1/project/<project>/service/<service_name>/flink/proxy/v1/jobs/<job_id>
GetFlinkJobResponse struct {
APIResponse
Name string `json:"name"`
JID string `json:"jid"`
IsStoppable bool `json:"isStoppable"`
Duration int `json:"duration"`
Now int `json:"now"`
EndTime *time.Time `json:"end-time"`
StartTime *time.Time `json:"start-time"`
MaxParallelism int `json:"maxParallelism"`
State string `json:"state"`
Plan struct {
JID string `json:"jid"`
Name string `json:"name"`
Nodes []struct {
Description string `json:"description"`
Id string `json:"id"`
Operator string `json:"operator"`
OperatorStrategy string `json:"operator_strategy"`
OptimizerProperties interface{} `json:"optimizer_properties"`
Parallelism int `json:"parallelism"`
} `json:"nodes"`
} `json:"plan"`
StatusCounts struct {
Canceled int `json:"CANCELED"`
Canceling int `json:"CANCELING"`
Created int `json:"CREATED"`
Deploying int `json:"DEPLOYING"`
Failed int `json:"FAILED"`
Finished int `json:"FINISHED"`
Initializing int `json:"INITIALIZING"`
Reconciling int `json:"RECONCILING"`
Running int `json:"RUNNING"`
Scheduled int `json:"SCHEDULED"`
} `json:"status-counts"`
Timestamps struct {
Canceled int `json:"CANCELED"`
Canceling int `json:"CANCELING"`
Created int `json:"CREATED"`
Deploying int `json:"DEPLOYING"`
Failed int `json:"FAILED"`
Finished int `json:"FINISHED"`
Initializing int `json:"INITIALIZING"`
Reconciling int `json:"RECONCILING"`
Running int `json:"RUNNING"`
Scheduled int `json:"SCHEDULED"`
} `json:"timestamps"`
Vertices []interface{} `json:"vertices"`
}
// ValidateFlinkJobRequest Aiven API request
// POST https://api.aiven.io/v1/project/<project>/service/<service_name>/flink/job/validate
ValidateFlinkJobRequest struct {
Statement string `json:"statement"`
TableIDs []string `json:"table_ids"`
}
// ValidateFlinkJobResponse Aiven API response
// POST https://api.aiven.io/v1/project/<project>/service/<service_name>/flink/job/validate
ValidateFlinkJobResponse struct {
APIResponse
JobValidateError struct {
Message string `json:"message"`
Position flinkPosition `json:"position"`
} `json:"job_validate_error"`
}
)
// Create creates a flink job
func (h *FlinkJobHandler) Create(ctx context.Context, project, service string, req CreateFlinkJobRequest) (*CreateFlinkJobResponse, error) {
path := buildPath("project", project, "service", service, "flink", "job")
bts, err := h.client.doPostRequest(ctx, path, req)
if err != nil {
return nil, err
}
var r CreateFlinkJobResponse
return &r, checkAPIResponse(bts, &r)
}
// List lists a flink job
func (h *FlinkJobHandler) List(ctx context.Context, project, service string) (*ListFlinkJobResponse, error) {
path := buildPath("project", project, "service", service, "flink", "job")
bts, err := h.client.doGetRequest(ctx, path, nil)
if err != nil {
return nil, err
}
var r ListFlinkJobResponse
return &r, checkAPIResponse(bts, &r)
}
// Get gets a flink job
func (h *FlinkJobHandler) Get(ctx context.Context, project, service string, req GetFlinkJobRequest) (*GetFlinkJobResponse, error) {
path := buildPath("project", project, "service", service, "flink", "proxy", "v1", "jobs", req.JobId)
bts, err := h.client.doGetRequest(ctx, path, nil)
if err != nil {
return nil, err
}
var r GetFlinkJobResponse
return &r, checkAPIResponse(bts, &r)
}
// Patch patches a flink job
func (h *FlinkJobHandler) Patch(ctx context.Context, project, service string, req PatchFlinkJobRequest) error {
path := buildPath("project", project, "service", service, "flink", "proxy", "v1", "jobs", req.JobId)
bts, err := h.client.doPatchRequest(ctx, path, nil)
if err != nil {
return err
}
return checkAPIResponse(bts, nil)
}
// Validate validates a flink job
func (h *FlinkJobHandler) Validate(ctx context.Context, project, service string, req ValidateFlinkJobRequest) (*ValidateFlinkJobResponse, error) {
path := buildPath("project", project, "service", service, "flink", "job", "validate")
bts, err := h.client.doPostRequest(ctx, path, req)
if err != nil {
return nil, err
}
var r ValidateFlinkJobResponse
return &r, checkAPIResponse(bts, &r)
}