Skip to content

Commit

Permalink
Merge branch 'main' into edit-variable
Browse files Browse the repository at this point in the history
  • Loading branch information
shubhamraj-git authored Dec 29, 2024
2 parents 2dfc255 + 11cdd97 commit 1e85ed1
Show file tree
Hide file tree
Showing 9 changed files with 439 additions and 2 deletions.
8 changes: 8 additions & 0 deletions airflow/api_fastapi/core_api/datamodels/variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,11 @@ class VariableCollectionResponse(BaseModel):

variables: list[VariableResponse]
total_entries: int


class VariablesImportResponse(BaseModel):
"""Import Variables serializer for responses."""

created_variable_keys: list[str]
import_count: int
created_count: int
92 changes: 92 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5857,6 +5857,68 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/variables/import:
post:
tags:
- Variable
summary: Import Variables
description: Import variables from a JSON file.
operationId: import_variables
parameters:
- name: action_if_exists
in: query
required: false
schema:
enum:
- overwrite
- fail
- skip
type: string
default: fail
title: Action If Exists
requestBody:
required: true
content:
multipart/form-data:
schema:
$ref: '#/components/schemas/Body_import_variables'
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/VariablesImportResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Bad Request
'409':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Conflict
'422':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unprocessable Entity
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{try_number}:
get:
tags:
Expand Down Expand Up @@ -6435,6 +6497,16 @@ components:
- status
title: BaseInfoResponse
description: Base info serializer for responses.
Body_import_variables:
properties:
file:
type: string
format: binary
title: File
type: object
required:
- file
title: Body_import_variables
ClearTaskInstancesBody:
properties:
dry_run:
Expand Down Expand Up @@ -9709,6 +9781,26 @@ components:
- is_encrypted
title: VariableResponse
description: Variable serializer for responses.
VariablesImportResponse:
properties:
created_variable_keys:
items:
type: string
type: array
title: Created Variable Keys
import_count:
type: integer
title: Import Count
created_count:
type: integer
title: Created Count
type: object
required:
- created_variable_keys
- import_count
- created_count
title: VariablesImportResponse
description: Import Variables serializer for responses.
VersionInfo:
properties:
version:
Expand Down
59 changes: 57 additions & 2 deletions airflow/api_fastapi/core_api/routes/public/variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
# under the License.
from __future__ import annotations

from typing import Annotated
import json
from typing import Annotated, Literal

from fastapi import Depends, HTTPException, Query, status
from fastapi import Depends, HTTPException, Query, UploadFile, status
from fastapi.exceptions import RequestValidationError
from pydantic import ValidationError
from sqlalchemy import select
Expand All @@ -35,6 +36,7 @@
VariableBody,
VariableCollectionResponse,
VariableResponse,
VariablesImportResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.models.variable import Variable
Expand Down Expand Up @@ -180,3 +182,56 @@ def post_variable(
variable = session.scalar(select(Variable).where(Variable.key == post_body.key).limit(1))

return variable


@variables_router.post(
"/import",
status_code=status.HTTP_200_OK,
responses=create_openapi_http_exception_doc(
[status.HTTP_400_BAD_REQUEST, status.HTTP_409_CONFLICT, status.HTTP_422_UNPROCESSABLE_ENTITY]
),
)
def import_variables(
file: UploadFile,
session: SessionDep,
action_if_exists: Literal["overwrite", "fail", "skip"] = "fail",
) -> VariablesImportResponse:
"""Import variables from a JSON file."""
try:
file_content = file.file.read().decode("utf-8")
variables = json.loads(file_content)

if not isinstance(variables, dict):
raise ValueError("Uploaded JSON must contain key-value pairs.")
except (json.JSONDecodeError, ValueError) as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid JSON format: {e}")

if not variables:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="No variables found in the provided JSON.",
)

existing_keys = {variable for variable in session.execute(select(Variable.key)).scalars()}
import_keys = set(variables.keys())

matched_keys = existing_keys & import_keys

if action_if_exists == "fail" and matched_keys:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"The variables with these keys: {matched_keys} already exists.",
)
elif action_if_exists == "skip":
create_keys = import_keys - matched_keys
else:
create_keys = import_keys

for key in create_keys:
Variable.set(key=key, value=variables[key], session=session)

return VariablesImportResponse(
created_count=len(create_keys),
import_count=len(import_keys),
created_variable_keys=list(create_keys),
)
3 changes: 3 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1773,6 +1773,9 @@ export type PoolServicePostPoolsMutationResult = Awaited<
export type VariableServicePostVariableMutationResult = Awaited<
ReturnType<typeof VariableService.postVariable>
>;
export type VariableServiceImportVariablesMutationResult = Awaited<
ReturnType<typeof VariableService.importVariables>
>;
export type BackfillServicePauseBackfillMutationResult = Awaited<
ReturnType<typeof BackfillService.pauseBackfill>
>;
Expand Down
44 changes: 44 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import {
} from "../requests/services.gen";
import {
BackfillPostBody,
Body_import_variables,
ClearTaskInstancesBody,
ConnectionBody,
ConnectionBulkBody,
Expand Down Expand Up @@ -3354,6 +3355,49 @@ export const useVariableServicePostVariable = <
}) as unknown as Promise<TData>,
...options,
});
/**
* Import Variables
* Import variables from a JSON file.
* @param data The data for the request.
* @param data.formData
* @param data.actionIfExists
* @returns VariablesImportResponse Successful Response
* @throws ApiError
*/
export const useVariableServiceImportVariables = <
TData = Common.VariableServiceImportVariablesMutationResult,
TError = unknown,
TContext = unknown,
>(
options?: Omit<
UseMutationOptions<
TData,
TError,
{
actionIfExists?: "overwrite" | "fail" | "skip";
formData: Body_import_variables;
},
TContext
>,
"mutationFn"
>,
) =>
useMutation<
TData,
TError,
{
actionIfExists?: "overwrite" | "fail" | "skip";
formData: Body_import_variables;
},
TContext
>({
mutationFn: ({ actionIfExists, formData }) =>
VariableService.importVariables({
actionIfExists,
formData,
}) as unknown as Promise<TData>,
...options,
});
/**
* Pause Backfill
* @param data The data for the request.
Expand Down
37 changes: 37 additions & 0 deletions airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,19 @@ export const $BaseInfoResponse = {
description: "Base info serializer for responses.",
} as const;

export const $Body_import_variables = {
properties: {
file: {
type: "string",
format: "binary",
title: "File",
},
},
type: "object",
required: ["file"],
title: "Body_import_variables",
} as const;

export const $ClearTaskInstancesBody = {
properties: {
dry_run: {
Expand Down Expand Up @@ -5573,6 +5586,30 @@ export const $VariableResponse = {
description: "Variable serializer for responses.",
} as const;

export const $VariablesImportResponse = {
properties: {
created_variable_keys: {
items: {
type: "string",
},
type: "array",
title: "Created Variable Keys",
},
import_count: {
type: "integer",
title: "Import Count",
},
created_count: {
type: "integer",
title: "Created Count",
},
},
type: "object",
required: ["created_variable_keys", "import_count", "created_count"],
title: "VariablesImportResponse",
description: "Import Variables serializer for responses.",
} as const;

export const $VersionInfo = {
properties: {
version: {
Expand Down
32 changes: 32 additions & 0 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ import type {
GetVariablesResponse,
PostVariableData,
PostVariableResponse,
ImportVariablesData,
ImportVariablesResponse,
ReparseDagFileData,
ReparseDagFileResponse,
GetHealthResponse,
Expand Down Expand Up @@ -3154,6 +3156,36 @@ export class VariableService {
},
});
}

/**
* Import Variables
* Import variables from a JSON file.
* @param data The data for the request.
* @param data.formData
* @param data.actionIfExists
* @returns VariablesImportResponse Successful Response
* @throws ApiError
*/
public static importVariables(
data: ImportVariablesData,
): CancelablePromise<ImportVariablesResponse> {
return __request(OpenAPI, {
method: "POST",
url: "/public/variables/import",
query: {
action_if_exists: data.actionIfExists,
},
formData: data.formData,
mediaType: "multipart/form-data",
errors: {
400: "Bad Request",
401: "Unauthorized",
403: "Forbidden",
409: "Conflict",
422: "Unprocessable Entity",
},
});
}
}

export class DagParsingService {
Expand Down
Loading

0 comments on commit 1e85ed1

Please sign in to comment.