AIP-84 Refactor Handling of Insert Duplicates for Post Connection #44322

Open
wants to merge 1 commit into
base: main
11 changes: 3 additions & 8 deletions airflow/api_fastapi/core_api/routes/public/connections.py
@@ -118,7 +118,9 @@ def get_connections(
 @connections_router.post(
     "",
     status_code=status.HTTP_201_CREATED,
-    responses=create_openapi_http_exception_doc([status.HTTP_409_CONFLICT]),
+    responses=create_openapi_http_exception_doc(
+        [status.HTTP_409_CONFLICT]
+    ),  # handled by global exception handler
 )
 def post_connection(
     post_body: ConnectionBody,
@@ -130,13 +132,6 @@ def post_connection(
     except Exception as e:
         raise HTTPException(status.HTTP_400_BAD_REQUEST, f"{e}")

-    connection = session.scalar(select(Connection).filter_by(conn_id=post_body.connection_id))
-    if connection is not None:
-        raise HTTPException(
-            status.HTTP_409_CONFLICT,
-            f"Connection with connection_id: `{post_body.connection_id}` already exists",
-        )
-
     connection = Connection(**post_body.model_dump(by_alias=True))
     session.add(connection)

4 changes: 1 addition & 3 deletions tests/api_fastapi/core_api/routes/public/test_connections.py
@@ -235,9 +235,7 @@ def test_post_should_respond_already_exist(self, test_client, body):
         # Another request
         response = test_client.post("/public/connections/", json=body)
         assert response.status_code == 409
-        assert response.json() == {
-            "detail": f"Connection with connection_id: `{TEST_CONN_ID}` already exists",
-        }
+        assert response.json() == {"detail": "Unique constraint violation"}
Contributor

Can we subclass the global exception handler to add personalized messages? 'Unique constraint violation' is quite broad

Contributor Author

It is possible to include error details (e.g., str(e)) to provide a more descriptive message. However, I can't think of a way to include custom error messages specific to each router, as the error handler is registered at the app level rather than the router level.

Contributor

I don't like the refactor if we can't customize the response detail.

Contributor Author

Let me research whether there is a way to register custom error messages for specific routers.

Member

What type of error would you like to display, @ephraimbuddy?

Do we expect the name of the model, the name of the db table, a single but more explicit message for all cases, or the message coming from the database?

Contributor Author

@jason810496 jason810496 Nov 25, 2024

After conducting some research, I have come up with a solution to add custom error message handlers for specific endpoints. If a custom error handler is provided, it will take precedence and be used before the default global error handler.

The usage would look like this:

from airflow.api_fastapi.common.exceptions import register_unique_constraint_error_handler, BaseCustomErrorHandler


@connections_router.post(
    "",
    status_code=status.HTTP_201_CREATED,
    responses=create_openapi_http_exception_doc(
        [status.HTTP_409_CONFLICT]
    ),  # handled by global exception handler
)
def post_connection(
    post_body: ConnectionBody,
    session: Annotated[Session, Depends(get_session)],
) -> ConnectionResponse:
    """Create connection entry."""
    try:
        helpers.validate_key(post_body.connection_id, max_length=200)
    except Exception as e:
        raise HTTPException(status.HTTP_400_BAD_REQUEST, f"{e}")
    connection = Connection(**post_body.model_dump(by_alias=True))
    session.add(connection)
    return connection

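# Proposed handler: maps a unique-constraint violation raised by this endpoint
# to an endpoint-specific 409 message.
class _ConnectionUniqueConstraintErrorHandler(BaseCustomErrorHandler):
    def exception_handler(self, request_schema: ConnectionBody):
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=f"Connection with connection_id: `{request_schema.connection_id}` already exists",
        )


# Register the handler for post_connection so it takes precedence over the global handler.
register_unique_constraint_error_handler(post_connection, _ConnectionUniqueConstraintErrorHandler(ConnectionBody))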

I'm not sure whether this is clean enough for providing verbose error messages for specific endpoints. WDYT?
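For illustration only, the "custom handler takes precedence over the global one" behaviour described above could be sketched as a small registry keyed by endpoint function. This is a simplified, framework-free sketch, not the code in this PR: the `Conflict` class, the `handle_unique_constraint_error` function, and the callable-based signature of `register_unique_constraint_error_handler` are all assumptions made for the example.

```python
from typing import Callable, Dict


class Conflict(Exception):
    """Stand-in for an HTTP 409 response carrying a detail message (hypothetical)."""

    def __init__(self, detail: str) -> None:
        super().__init__(detail)
        self.detail = detail


# Hypothetical registry: endpoint function -> builder of a custom detail message.
_custom_handlers: Dict[Callable, Callable[[dict], str]] = {}


def register_unique_constraint_error_handler(
    endpoint: Callable, build_detail: Callable[[dict], str]
) -> None:
    """Associate a custom 409 message builder with one endpoint."""
    _custom_handlers[endpoint] = build_detail


def handle_unique_constraint_error(endpoint: Callable, request_body: dict) -> Conflict:
    """Build the endpoint-specific error if one is registered, else the generic one."""
    build_detail = _custom_handlers.get(endpoint)
    if build_detail is not None:
        return Conflict(build_detail(request_body))
    return Conflict("Unique constraint violation")


def post_connection(body: dict) -> dict:  # placeholder endpoint with a custom message
    return body


def post_pool(body: dict) -> dict:  # placeholder endpoint without a custom message
    return body


register_unique_constraint_error_handler(
    post_connection,
    lambda body: f"Connection with connection_id: `{body['connection_id']}` already exists",
)

custom = handle_unique_constraint_error(post_connection, {"connection_id": "test_connection_id"})
generic = handle_unique_constraint_error(post_pool, {"name": "my_pool"})
```

Here `custom.detail` carries the connection-specific message, while `generic.detail` falls back to the global "Unique constraint violation" text because nothing was registered for `post_pool`.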

Member

@pierrejeambrun pierrejeambrun Nov 25, 2024

If every endpoint returning 409 needs to define a custom class and register a custom error handler just to return the appropriate error message, it might be a little verbose.

Maybe we can explore parsing the database error exception? I don't know how different the messages are or whether they share some similarities. Maybe the table name or the failing unique constraint can be retrieved and displayed.

Otherwise we might need to do the error handling manually on a per-route basis, with try/except blocks.
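As a rough sketch of the per-route alternative, the insert can be attempted directly and the integrity error translated into a 409 with a route-specific message. The example below uses only the standard-library `sqlite3` module so it runs on its own; the `HTTPError` class, the `create_connection` function, and the two-column table are assumptions made for the illustration, not Airflow code.

```python
import sqlite3


class HTTPError(Exception):
    """Minimal stand-in for a web framework's HTTP exception (hypothetical)."""

    def __init__(self, status_code: int, detail: str) -> None:
        super().__init__(detail)
        self.status_code = status_code
        self.detail = detail


def create_connection(db: sqlite3.Connection, conn_id: str, conn_type: str) -> None:
    """Insert a row; translate an integrity error into a 409 with a specific message."""
    try:
        with db:  # commits on success, rolls back if the block raises
            db.execute(
                "INSERT INTO connection (conn_id, conn_type) VALUES (?, ?)",
                (conn_id, conn_type),
            )
    except sqlite3.IntegrityError as exc:
        # Note: IntegrityError also covers other constraints (e.g. NOT NULL);
        # a real handler might inspect the message before assuming a duplicate.
        raise HTTPError(409, f"Connection with connection_id: `{conn_id}` already exists") from exc


db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE connection (conn_id TEXT UNIQUE NOT NULL, conn_type TEXT)")

create_connection(db, "test_connection_id", "test_type")  # first insert succeeds

result = None
try:
    create_connection(db, "test_connection_id", "test_type")  # duplicate conn_id
except HTTPError as err:
    result = (err.status_code, err.detail)
```

Compared with a pre-insert `select`, this relies on the database constraint itself, so there is no window between the existence check and the insert, at the cost of repeating the try/except in each route.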

Contributor Author

@jason810496 jason810496 Nov 25, 2024

Here are the Pool and Connection error details currently being handled. I used pprint(exc.__dict__) to display all possible error details that can be utilized. However, it seems quite challenging to parse generic information from the database error exceptions.

Pool

SQLite

IntegrityError('(sqlite3.IntegrityError) UNIQUE constraint failed: slot_pool.pool')
{'code': 'gkpj',
 'connection_invalidated': False,
 'detail': [],
 'hide_parameters': False,
 'ismulti': False,
 'orig': IntegrityError('UNIQUE constraint failed: slot_pool.pool'),
 'params': ('my_pool', 11, None, 0),
 'statement': 'INSERT INTO slot_pool (pool, slots, description, '
              'include_deferred) VALUES (?, ?, ?, ?)'}

Postgres

IntegrityError('(psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "slot_pool_pool_uq"\nDETAIL:  Key (pool)=(my_pool) already exists.\n')
{'code': 'gkpj',
 'connection_invalidated': False,
 'detail': [],
 'hide_parameters': False,
 'ismulti': False,
 'orig': UniqueViolation('duplicate key value violates unique constraint "slot_pool_pool_uq"\nDETAIL:  Key (pool)=(my_pool) already exists.\n'),
 'params': {'description': None,
            'include_deferred': False,
            'pool': 'my_pool',
            'slots': 11},
 'statement': 'INSERT INTO slot_pool (pool, slots, description, '
              'include_deferred) VALUES (%(pool)s, %(slots)s, %(description)s, '
              '%(include_deferred)s) RETURNING slot_pool.id'}

MySQL

IntegrityError('(MySQLdb.IntegrityError) (1062, "Duplicate entry \'my_pool\' for key \'slot_pool.slot_pool_pool_uq\'")')
{'code': 'gkpj',
 'connection_invalidated': False,
 'detail': [],
 'hide_parameters': False,
 'ismulti': False,
 'orig': IntegrityError(1062, "Duplicate entry 'my_pool' for key 'slot_pool.slot_pool_pool_uq'"),
 'params': ('my_pool', 11, None, 0),
 'statement': 'INSERT INTO slot_pool (pool, slots, description, '
              'include_deferred) VALUES (%s, %s, %s, %s)'}

Connection

SQLite

IntegrityError('(sqlite3.IntegrityError) UNIQUE constraint failed: connection.conn_id')
{'code': 'gkpj',
 'connection_invalidated': False,
 'detail': [],
 'hide_parameters': False,
 'ismulti': False,
 'orig': IntegrityError('UNIQUE constraint failed: connection.conn_id'),
 'params': ('test_connection_id',
            'test_type',
            None,
            None,
            None,
            None,
            None,
            None,
            0,
            0,
            None),
 'statement': 'INSERT INTO connection (conn_id, conn_type, description, host, '
              'schema, login, password, port, is_encrypted, '
              'is_extra_encrypted, extra) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, '
              '?, ?)'}

Postgres

IntegrityError('(psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "connection_conn_id_uq"\nDETAIL:  Key (conn_id)=(test_connection_id) already exists.\n')
{'code': 'gkpj',
 'connection_invalidated': False,
 'detail': [],
 'hide_parameters': False,
 'ismulti': False,
 'orig': UniqueViolation('duplicate key value violates unique constraint "connection_conn_id_uq"\nDETAIL:  Key (conn_id)=(test_connection_id) already exists.\n'),
 'params': {'conn_id': 'test_connection_id',
            'conn_type': 'test_type',
            'description': None,
            'extra': None,
            'host': None,
            'is_encrypted': False,
            'is_extra_encrypted': False,
            'login': None,
            'password': None,
            'port': None,
            'schema': None},
 'statement': 'INSERT INTO connection (conn_id, conn_type, description, host, '
              'schema, login, password, port, is_encrypted, '
              'is_extra_encrypted, extra) VALUES (%(conn_id)s, %(conn_type)s, '
              '%(description)s, %(host)s, %(schema)s, %(login)s, %(password)s, '
              '%(port)s, %(is_encrypted)s, %(is_extra_encrypted)s, %(extra)s) '
              'RETURNING connection.id'}

MySQL

IntegrityError('(MySQLdb.IntegrityError) (1062, "Duplicate entry \'test_connection_id\' for key \'connection.connection_conn_id_uq\'")')
{'code': 'gkpj',
 'connection_invalidated': False,
 'detail': [],
 'hide_parameters': False,
 'ismulti': False,
 'orig': IntegrityError(1062, "Duplicate entry 'test_connection_id' for key 'connection.connection_conn_id_uq'"),
 'params': ('test_connection_id',
            'test_type',
            None,
            None,
            None,
            None,
            None,
            None,
            0,
            0,
            None),
 'statement': 'INSERT INTO connection (conn_id, conn_type, description, host, '
              '`schema`, login, password, port, is_encrypted, '
              'is_extra_encrypted, extra) VALUES (%s, %s, %s, %s, %s, %s, %s, '
              '%s, %s, %s, %s)'}
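To make the parsing difficulty concrete, here is a minimal sketch that extracts whatever each backend exposes from the message strings shown above. The patterns are written against exactly these sample messages; other driver or server versions may format them differently, and the function name `parse_unique_violation` is hypothetical.

```python
import re
from typing import Dict, Optional

_PATTERNS = [
    # SQLite: "UNIQUE constraint failed: <table>.<column>"
    re.compile(r"UNIQUE constraint failed: (?P<table>\w+)\.(?P<column>\w+)"),
    # Postgres: 'unique constraint "<constraint>" ... Key (<column>)=(<value>) already exists.'
    re.compile(
        r'unique constraint "(?P<constraint>\w+)".*?'
        r"Key \((?P<column>[^)]+)\)=\((?P<value>.*?)\) already exists",
        re.DOTALL,
    ),
    # MySQL: "Duplicate entry '<value>' for key '<table>.<constraint>'"
    re.compile(r"Duplicate entry '(?P<value>.*?)' for key '(?P<table>\w+)\.(?P<constraint>\w+)'"),
]


def parse_unique_violation(message: str) -> Optional[Dict[str, str]]:
    """Return the fields found by the first matching pattern, or None if none match."""
    for pattern in _PATTERNS:
        match = pattern.search(message)
        if match:
            return match.groupdict()
    return None


sqlite_info = parse_unique_violation("UNIQUE constraint failed: connection.conn_id")
postgres_info = parse_unique_violation(
    'duplicate key value violates unique constraint "connection_conn_id_uq"\n'
    "DETAIL:  Key (conn_id)=(test_connection_id) already exists.\n"
)
mysql_info = parse_unique_violation(
    "(1062, \"Duplicate entry 'my_pool' for key 'slot_pool.slot_pool_pool_uq'\")"
)
```

Even on these samples the recoverable fields differ: SQLite yields table and column but no value, Postgres yields constraint, column, and value but no table, and MySQL yields value, table, and constraint but no column. A generic message built this way would have to cope with those gaps.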


@pytest.mark.enable_redact
@pytest.mark.parametrize(