From a80713c6f7f28dd79d70c122f1b752dee6f0d395 Mon Sep 17 00:00:00 2001 From: Andrey Date: Tue, 14 May 2024 09:20:23 -0700 Subject: [PATCH 01/27] GSE-3635: Fix AWSIC logging and status=408 errors (#664) * Remove vestigial logging no need to log because these errors are recorded into the "error" column in the corresponding table * Add retry and update aio* dependencies Update aiohttp, aioboto3, and aiobotocore dependencies and add retry logic to sts.assume_role invocation. --- src/connectors/aws_collect.py | 2 -- src/connectors/utils.py | 18 +++++++++++++++++- src/setup.py | 5 +++-- 3 files changed, 20 insertions(+), 5 deletions(-) diff --git a/src/connectors/aws_collect.py b/src/connectors/aws_collect.py index 3145047dc..677bc4a5c 100644 --- a/src/connectors/aws_collect.py +++ b/src/connectors/aws_collect.py @@ -1706,7 +1706,6 @@ async def load_task_response(client, task): yield x except (ClientError, DataNotFoundError, ServerTimeoutError) as e: - log.info(format_exception_only(e)) for x in process_aws_response(task, e): yield x @@ -1756,7 +1755,6 @@ async def process_task(task, add_task) -> AsyncGenerator[Tuple[str, dict], None] except ClientError as e: # record missing auditor role as empty account summary - log.info(format_exception_only(e)) yield ( task.method, updated( diff --git a/src/connectors/utils.py b/src/connectors/utils.py index b7b319557..77cdf3f6b 100644 --- a/src/connectors/utils.py +++ b/src/connectors/utils.py @@ -1,3 +1,4 @@ +import asyncio from inspect import signature import random import multiprocessing as mp @@ -5,6 +6,7 @@ import aioboto3 import boto3 +from botocore.parsers import ResponseParserError import yaml from requests import auth @@ -89,8 +91,22 @@ def sts_assume_role(src_role_arn, dest_role_arn, dest_external_id=None): async def aio_sts_assume_role(src_role_arn, dest_role_arn, dest_external_id=None): + for attempt in range(10): + try: + return await try_aio_sts_assume_role( + src_role_arn, dest_role_arn, dest_external_id + ) + except ResponseParserError as e: + if attempt < 10: + delay = int(1.5**attempt) + random.randint(0, 3) + await asyncio.sleep(delay) + else: + raise + + +async def try_aio_sts_assume_role(src_role_arn, dest_role_arn, dest_external_id=None): session_name = ''.join(random.choice('0123456789ABCDEF') for i in range(16)) - async with aioboto3.client('sts') as sts: + async with aioboto3.Session().client('sts') as sts: src_role = await sts.assume_role( RoleArn=src_role_arn, RoleSessionName=session_name ) diff --git a/src/setup.py b/src/setup.py index 9404ed466..28db8d5cd 100644 --- a/src/setup.py +++ b/src/setup.py @@ -6,8 +6,9 @@ packages=find_packages(), include_package_data=True, install_requires=[ - 'aiohttp[speedups]==3.8.1', - 'aioboto3==8.3.0', + 'aiohttp[speedups]==3.9.5', + 'aioboto3==12.4.0', + 'aiobotocore==2.12.3', 'demjson3==3.0.5', 'fire==0.4.0', 'jira==2.0.0', From a66a8176171c96b4a6849a5c8002fd6f9cfbe1df Mon Sep 17 00:00:00 2001 From: Andrey Date: Wed, 15 May 2024 08:23:42 -0700 Subject: [PATCH 02/27] Update base image to Python 3.8 on Debian bookworm (#665) * Update base image to Debian bookworm * Update to py38 --- Dockerfile.snowalert | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile.snowalert b/Dockerfile.snowalert index c6834342f..bdc47e6db 100644 --- a/Dockerfile.snowalert +++ b/Dockerfile.snowalert @@ -1,4 +1,4 @@ -FROM python:3.7-slim-stretch +FROM python:3.8-slim-bookworm WORKDIR /var/task From 473d573fd4224aace5a621b3dc161de9d49ec2e5 Mon Sep 17 00:00:00 2001 From: Grace Tan Date: Wed, 15 May 
2024 16:06:19 -0600 Subject: [PATCH 03/27] Revert "Update base image to Python 3.8 on Debian bookworm (#665)" (#667) This reverts commit a66a8176171c96b4a6849a5c8002fd6f9cfbe1df. --- Dockerfile.snowalert | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile.snowalert b/Dockerfile.snowalert index bdc47e6db..c6834342f 100644 --- a/Dockerfile.snowalert +++ b/Dockerfile.snowalert @@ -1,4 +1,4 @@ -FROM python:3.8-slim-bookworm +FROM python:3.7-slim-stretch WORKDIR /var/task From b7b512682924fefe00faf442f01debc03a07c3f5 Mon Sep 17 00:00:00 2001 From: Grace Tan Date: Wed, 15 May 2024 16:06:40 -0600 Subject: [PATCH 04/27] Revert "GSE-3635: Fix AWSIC logging and status=408 errors (#664)" (#666) This reverts commit a80713c6f7f28dd79d70c122f1b752dee6f0d395. --- src/connectors/aws_collect.py | 2 ++ src/connectors/utils.py | 18 +----------------- src/setup.py | 5 ++--- 3 files changed, 5 insertions(+), 20 deletions(-) diff --git a/src/connectors/aws_collect.py b/src/connectors/aws_collect.py index 677bc4a5c..3145047dc 100644 --- a/src/connectors/aws_collect.py +++ b/src/connectors/aws_collect.py @@ -1706,6 +1706,7 @@ async def load_task_response(client, task): yield x except (ClientError, DataNotFoundError, ServerTimeoutError) as e: + log.info(format_exception_only(e)) for x in process_aws_response(task, e): yield x @@ -1755,6 +1756,7 @@ async def process_task(task, add_task) -> AsyncGenerator[Tuple[str, dict], None] except ClientError as e: # record missing auditor role as empty account summary + log.info(format_exception_only(e)) yield ( task.method, updated( diff --git a/src/connectors/utils.py b/src/connectors/utils.py index 77cdf3f6b..b7b319557 100644 --- a/src/connectors/utils.py +++ b/src/connectors/utils.py @@ -1,4 +1,3 @@ -import asyncio from inspect import signature import random import multiprocessing as mp @@ -6,7 +5,6 @@ import aioboto3 import boto3 -from botocore.parsers import ResponseParserError import yaml from requests import auth @@ -91,22 +89,8 @@ def sts_assume_role(src_role_arn, dest_role_arn, dest_external_id=None): async def aio_sts_assume_role(src_role_arn, dest_role_arn, dest_external_id=None): - for attempt in range(10): - try: - return await try_aio_sts_assume_role( - src_role_arn, dest_role_arn, dest_external_id - ) - except ResponseParserError as e: - if attempt < 10: - delay = int(1.5**attempt) + random.randint(0, 3) - await asyncio.sleep(delay) - else: - raise - - -async def try_aio_sts_assume_role(src_role_arn, dest_role_arn, dest_external_id=None): session_name = ''.join(random.choice('0123456789ABCDEF') for i in range(16)) - async with aioboto3.Session().client('sts') as sts: + async with aioboto3.client('sts') as sts: src_role = await sts.assume_role( RoleArn=src_role_arn, RoleSessionName=session_name ) diff --git a/src/setup.py b/src/setup.py index 28db8d5cd..9404ed466 100644 --- a/src/setup.py +++ b/src/setup.py @@ -6,9 +6,8 @@ packages=find_packages(), include_package_data=True, install_requires=[ - 'aiohttp[speedups]==3.9.5', - 'aioboto3==12.4.0', - 'aiobotocore==2.12.3', + 'aiohttp[speedups]==3.8.1', + 'aioboto3==8.3.0', 'demjson3==3.0.5', 'fire==0.4.0', 'jira==2.0.0', From 88394bee71c354cb1971d8d709e5425a8b0f174f Mon Sep 17 00:00:00 2001 From: Grace Tan Date: Wed, 15 May 2024 17:35:29 -0600 Subject: [PATCH 05/27] Update README.md (#669) --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 89bda9982..d791cb05c 100644 --- a/README.md +++ b/README.md @@ -19,3 +19,4 @@ Ready? 
Let's [get started!](https://docs.snowalert.com/getting-started "SnowAler ## License This project is licensed under the Apache 2.0 License - see the [LICENSE](LICENSE) file for details. + From 3b78ee17b9453c7bce8b315ff24ffad1bb3b6891 Mon Sep 17 00:00:00 2001 From: Andrey Date: Thu, 16 May 2024 10:02:24 -0700 Subject: [PATCH 06/27] Update Python and pip dependencies & fix tests (#670) * Update Python and pip dependencies & fix tests * fix share db view creation in installer script Snowflake added a new column so the share db name is col4 now --- Dockerfile.snowalert | 2 +- src/mypy.ini | 2 +- src/runners/tests/run_alerts.py | 4 ++-- src/scripts/install.py | 3 +-- src/setup.py | 4 ++-- 5 files changed, 7 insertions(+), 8 deletions(-) diff --git a/Dockerfile.snowalert b/Dockerfile.snowalert index c6834342f..ca836d14c 100644 --- a/Dockerfile.snowalert +++ b/Dockerfile.snowalert @@ -1,4 +1,4 @@ -FROM python:3.7-slim-stretch +FROM python:3.9-slim-bookworm WORKDIR /var/task diff --git a/src/mypy.ini b/src/mypy.ini index 4b3720aae..9c2e20356 100644 --- a/src/mypy.ini +++ b/src/mypy.ini @@ -1,5 +1,5 @@ [mypy] -python_version=3.7 +python_version=3.9 [mypy-boto3.*] ignore_missing_imports = True diff --git a/src/runners/tests/run_alerts.py b/src/runners/tests/run_alerts.py index b963faf14..1323baf14 100644 --- a/src/runners/tests/run_alerts.py +++ b/src/runners/tests/run_alerts.py @@ -142,8 +142,8 @@ "TICKET": None, "HANDLERS": None, "CATS": {'test difficulty': {'for dogs': 3, 'for cats': 1}}, - "ENTITIES": {'first_name', 'John', 'last_name', 'Doe'}, - "TAGS": {'staging': true, 'alert_owner': 'GSE TD'}, + "ENTITIES": {'test entities': {'first_name': 'John', 'last_name': 'Doe'}}, + "TAGS": {'staging': True, 'alert_owner': 'GSE TD'}, } SLACK_MOCK_RETURN_VALUE = {'ok': True} diff --git a/src/scripts/install.py b/src/scripts/install.py index 60d31fc36..a6cd35d22 100755 --- a/src/scripts/install.py +++ b/src/scripts/install.py @@ -312,7 +312,7 @@ def find_share_db_name(do_attempt): ) # Database name is 4th attribute in row - share_db_names = [share_row[3] for share_row in sample_data_share_rows] + share_db_names = [share_row[4] for share_row in sample_data_share_rows] if len(share_db_names) == 0: VERBOSE and print(f"Unable to locate sample data share.") return @@ -509,7 +509,6 @@ def main( set_env_vars=False, verbose=False, ): - global VERBOSE VERBOSE = verbose diff --git a/src/setup.py b/src/setup.py index 9404ed466..5d4ad097a 100644 --- a/src/setup.py +++ b/src/setup.py @@ -13,8 +13,8 @@ 'jira==2.0.0', 'PyYAML==6.0', 'xmltodict==0.12.0', - 'snowflake-connector-python==2.7.3', - 'snowflake-sqlalchemy==1.3.3', + 'snowflake-connector-python==3.10.0', + 'snowflake-sqlalchemy==1.5.3', 'pybrake==0.4.0', 'pytz==2018.9', 'slackclient==1.3.1', From 60b6bd2f69a6cd965cf974115afe4af646637341 Mon Sep 17 00:00:00 2001 From: Grace Tan Date: Tue, 21 May 2024 15:08:59 -0600 Subject: [PATCH 07/27] [GSE-4024] Tag image with commit SHA after build, and push to repository (#671) * Tag image with commit SHA after build, and push to repository * updates to contribution file --- .pre-commit-config.yaml | 2 +- CONTRIBUTING.md | 3 ++- hooks/post_push | 2 ++ 3 files changed, 5 insertions(+), 2 deletions(-) create mode 100644 hooks/post_push diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 21fb8d75b..b0112d1e7 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -3,4 +3,4 @@ repos: rev: stable hooks: - id: black - language_version: python3.7 + language_version: python3 diff --git
a/CONTRIBUTING.md b/CONTRIBUTING.md index f0920cf3c..57d195f3e 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -26,7 +26,7 @@ pip install -e . # to start runners export $(cat snowalert-$SNOWFLAKE_ACCOUNT.envs | xargs) -python runners/run.py all +python3 runners/run.py all ~~~ If you'd like to run the test suite, please create a separate ENVS file that contains these additional variables used to test the installer: @@ -41,6 +41,7 @@ If you save this to `snowalert-$SNOWFLAKE_ACCOUNT.testing.envs`, you can run the ~~~ # test runners +pip install -e .\[dev\] export $(cat snowalert-$SNOWFLAKE_ACCOUNT.testing.envs | xargs) pytest -vv ~~~ diff --git a/hooks/post_push b/hooks/post_push new file mode 100644 index 000000000..7ff4dbe9c --- /dev/null +++ b/hooks/post_push @@ -0,0 +1,2 @@ +docker tag $IMAGE_NAME $DOCKER_REPO:$SOURCE_COMMIT +docker push $DOCKER_REPO:$SOURCE_COMMIT From 224a31521a89b2860b37e997801e01aa1649fcd1 Mon Sep 17 00:00:00 2001 From: Grace Tan Date: Tue, 21 May 2024 15:19:39 -0600 Subject: [PATCH 08/27] missing bash (#672) --- hooks/post_push | 1 + 1 file changed, 1 insertion(+) diff --git a/hooks/post_push b/hooks/post_push index 7ff4dbe9c..2c463f7a2 100644 --- a/hooks/post_push +++ b/hooks/post_push @@ -1,2 +1,3 @@ +#!/bin/bash docker tag $IMAGE_NAME $DOCKER_REPO:$SOURCE_COMMIT docker push $DOCKER_REPO:$SOURCE_COMMIT From f1f0a8f7adb6eeee63d88e3ccd8b5fc77c7633eb Mon Sep 17 00:00:00 2001 From: Andrey Date: Mon, 3 Jun 2024 16:05:12 -0700 Subject: [PATCH 09/27] Reapply "GSE-3635: Fix AWSIC logging and status=408 errors (#664)" (#666) (#673) This reverts commit b7b512682924fefe00faf442f01debc03a07c3f5. --- src/connectors/aws_collect.py | 2 -- src/connectors/utils.py | 18 +++++++++++++++++- src/setup.py | 5 +++-- 3 files changed, 20 insertions(+), 5 deletions(-) diff --git a/src/connectors/aws_collect.py b/src/connectors/aws_collect.py index 3145047dc..677bc4a5c 100644 --- a/src/connectors/aws_collect.py +++ b/src/connectors/aws_collect.py @@ -1706,7 +1706,6 @@ async def load_task_response(client, task): yield x except (ClientError, DataNotFoundError, ServerTimeoutError) as e: - log.info(format_exception_only(e)) for x in process_aws_response(task, e): yield x @@ -1756,7 +1755,6 @@ async def process_task(task, add_task) -> AsyncGenerator[Tuple[str, dict], None] except ClientError as e: # record missing auditor role as empty account summary - log.info(format_exception_only(e)) yield ( task.method, updated( diff --git a/src/connectors/utils.py b/src/connectors/utils.py index b7b319557..77cdf3f6b 100644 --- a/src/connectors/utils.py +++ b/src/connectors/utils.py @@ -1,3 +1,4 @@ +import asyncio from inspect import signature import random import multiprocessing as mp @@ -5,6 +6,7 @@ import aioboto3 import boto3 +from botocore.parsers import ResponseParserError import yaml from requests import auth @@ -89,8 +91,22 @@ def sts_assume_role(src_role_arn, dest_role_arn, dest_external_id=None): async def aio_sts_assume_role(src_role_arn, dest_role_arn, dest_external_id=None): + for attempt in range(10): + try: + return await try_aio_sts_assume_role( + src_role_arn, dest_role_arn, dest_external_id + ) + except ResponseParserError as e: + if attempt < 10: + delay = int(1.5**attempt) + random.randint(0, 3) + await asyncio.sleep(delay) + else: + raise + + +async def try_aio_sts_assume_role(src_role_arn, dest_role_arn, dest_external_id=None): session_name = ''.join(random.choice('0123456789ABCDEF') for i in range(16)) - async with aioboto3.client('sts') as sts: + async with 
aioboto3.Session().client('sts') as sts: src_role = await sts.assume_role( RoleArn=src_role_arn, RoleSessionName=session_name ) diff --git a/src/setup.py b/src/setup.py index 5d4ad097a..6189dbac7 100644 --- a/src/setup.py +++ b/src/setup.py @@ -6,8 +6,9 @@ packages=find_packages(), include_package_data=True, install_requires=[ - 'aiohttp[speedups]==3.8.1', - 'aioboto3==8.3.0', + 'aiohttp[speedups]==3.9.5', + 'aioboto3==12.4.0', + 'aiobotocore==2.12.3', 'demjson3==3.0.5', 'fire==0.4.0', 'jira==2.0.0', From 952cbdddfa3a2843e47a2f0676d9a7b0a6128f91 Mon Sep 17 00:00:00 2001 From: Andrey Date: Mon, 3 Jun 2024 18:39:50 -0700 Subject: [PATCH 10/27] Update Dockerfile.snowalert (#674) fixing error > botocore.exceptions.CredentialRetrievalError: Error when retrieving credentials from container-role: Error retrieving metadata: Received error when attempting to retrieve container metadata: Connect timeout on endpoint URL: "http:///v2/credentials/" per https://repost.aws/questions/QUCFqv7OfoQlygJrmwfkJ24Q/various-aws-apis-fail-due-to-timeout --- Dockerfile.snowalert | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Dockerfile.snowalert b/Dockerfile.snowalert index ca836d14c..728a20e40 100644 --- a/Dockerfile.snowalert +++ b/Dockerfile.snowalert @@ -6,6 +6,10 @@ RUN apt-get update RUN apt-get install -y r-base RUN R -e "install.packages(c('dplyr', 'purrr', 'tidyr','MASS', 'tidyverse', 'broom','testthat'), repos = 'https://cloud.r-project.org')" +# https://repost.aws/questions/QUCFqv7OfoQlygJrmwfkJ24Q/various-aws-apis-fail-due-to-timeout +RUN update-alternatives --set iptables /usr/sbin/iptables-legacy +RUN update-alternatives --set ip6tables /usr/sbin/ip6tables-legacy + RUN pip install --upgrade pip virtualenv pyflakes RUN mkdir -p ./snowalert From 49e91c893c5cdeea31a8883ed8e59fc51679b727 Mon Sep 17 00:00:00 2001 From: Andrey Date: Mon, 3 Jun 2024 20:24:12 -0700 Subject: [PATCH 11/27] Update Dockerfile.snowalert --- Dockerfile.snowalert | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/Dockerfile.snowalert b/Dockerfile.snowalert index 728a20e40..709adcd44 100644 --- a/Dockerfile.snowalert +++ b/Dockerfile.snowalert @@ -3,13 +3,15 @@ FROM python:3.9-slim-bookworm WORKDIR /var/task RUN apt-get update -RUN apt-get install -y r-base -RUN R -e "install.packages(c('dplyr', 'purrr', 'tidyr','MASS', 'tidyverse', 'broom','testthat'), repos = 'https://cloud.r-project.org')" # https://repost.aws/questions/QUCFqv7OfoQlygJrmwfkJ24Q/various-aws-apis-fail-due-to-timeout +RUN apt-get install --reinstall -y iptables RUN update-alternatives --set iptables /usr/sbin/iptables-legacy RUN update-alternatives --set ip6tables /usr/sbin/ip6tables-legacy +RUN apt-get install -y r-base +RUN R -e "install.packages(c('dplyr', 'purrr', 'tidyr','MASS', 'tidyverse', 'broom','testthat'), repos = 'https://cloud.r-project.org')" + RUN pip install --upgrade pip virtualenv pyflakes RUN mkdir -p ./snowalert From 90140eaa58d8dcec3ce8b11039d8dfea014b3799 Mon Sep 17 00:00:00 2001 From: Andrey Date: Tue, 11 Jun 2024 16:39:47 -0700 Subject: [PATCH 12/27] Bump aioboto3 to 13.0.1 --- src/setup.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/setup.py b/src/setup.py index 6189dbac7..5b0e39cf7 100644 --- a/src/setup.py +++ b/src/setup.py @@ -7,8 +7,7 @@ include_package_data=True, install_requires=[ 'aiohttp[speedups]==3.9.5', - 'aioboto3==12.4.0', - 'aiobotocore==2.12.3', + 'aioboto3==13.0.1', 'demjson3==3.0.5', 'fire==0.4.0', 'jira==2.0.0', From f80416d25bad8094980cc294c96acd379c2f2bb8 Mon Sep 17 
00:00:00 2001 From: Andrey Date: Tue, 11 Jun 2024 17:35:23 -0700 Subject: [PATCH 13/27] Remove iptables Docker installation --- Dockerfile.snowalert | 5 ----- 1 file changed, 5 deletions(-) diff --git a/Dockerfile.snowalert b/Dockerfile.snowalert index 709adcd44..7928552af 100644 --- a/Dockerfile.snowalert +++ b/Dockerfile.snowalert @@ -4,11 +4,6 @@ WORKDIR /var/task RUN apt-get update -# https://repost.aws/questions/QUCFqv7OfoQlygJrmwfkJ24Q/various-aws-apis-fail-due-to-timeout -RUN apt-get install --reinstall -y iptables -RUN update-alternatives --set iptables /usr/sbin/iptables-legacy -RUN update-alternatives --set ip6tables /usr/sbin/ip6tables-legacy - RUN apt-get install -y r-base RUN R -e "install.packages(c('dplyr', 'purrr', 'tidyr','MASS', 'tidyverse', 'broom','testthat'), repos = 'https://cloud.r-project.org')" From 6c0d2bd53a62230eb027941d4064a9fee5fa17d7 Mon Sep 17 00:00:00 2001 From: Andrey Date: Tue, 11 Jun 2024 18:32:36 -0700 Subject: [PATCH 14/27] Move to non-slim Debian --- Dockerfile.snowalert | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile.snowalert b/Dockerfile.snowalert index 7928552af..fec00761f 100644 --- a/Dockerfile.snowalert +++ b/Dockerfile.snowalert @@ -1,4 +1,4 @@ -FROM python:3.9-slim-bookworm +FROM python:3.9-bookworm WORKDIR /var/task From b84705b247a0c4e57ada9d6f65081f15cb2c35b0 Mon Sep 17 00:00:00 2001 From: Andrey Date: Tue, 11 Jun 2024 19:10:35 -0700 Subject: [PATCH 15/27] Add 5m sleep to Docker file --- Dockerfile.snowalert | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile.snowalert b/Dockerfile.snowalert index fec00761f..03c55f674 100644 --- a/Dockerfile.snowalert +++ b/Dockerfile.snowalert @@ -19,4 +19,4 @@ COPY ./install ./install RUN PYTHONPATH='' pip install ./snowalert/src/ RUN PYTHONPATH='' pip install requests -CMD ./run all +CMD ./run all && sleep 300 From bad1ab376111a5f385d3ad5f2ed29a95e4333bed Mon Sep 17 00:00:00 2001 From: Andrey Date: Tue, 11 Jun 2024 22:53:48 -0700 Subject: [PATCH 16/27] Revert "Add 5m sleep to Docker file" This reverts commit b84705b247a0c4e57ada9d6f65081f15cb2c35b0. 
--- Dockerfile.snowalert | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile.snowalert b/Dockerfile.snowalert index 03c55f674..fec00761f 100644 --- a/Dockerfile.snowalert +++ b/Dockerfile.snowalert @@ -19,4 +19,4 @@ COPY ./install ./install RUN PYTHONPATH='' pip install ./snowalert/src/ RUN PYTHONPATH='' pip install requests -CMD ./run all && sleep 300 +CMD ./run all From 3d883ceae9926475e4f51f7209aee4efe31e34ba Mon Sep 17 00:00:00 2001 From: Andrey Date: Tue, 11 Jun 2024 23:02:47 -0700 Subject: [PATCH 17/27] add retry on CredentialRetrievalError --- src/connectors/utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/connectors/utils.py b/src/connectors/utils.py index 77cdf3f6b..20491fcff 100644 --- a/src/connectors/utils.py +++ b/src/connectors/utils.py @@ -7,6 +7,7 @@ import aioboto3 import boto3 from botocore.parsers import ResponseParserError +from botocore.exceptions import CredentialRetrievalError import yaml from requests import auth @@ -96,7 +97,7 @@ async def aio_sts_assume_role(src_role_arn, dest_role_arn, dest_external_id=None return await try_aio_sts_assume_role( src_role_arn, dest_role_arn, dest_external_id ) - except ResponseParserError as e: + except (CredentialRetrievalError, ResponseParserError) as e: if attempt < 10: delay = int(1.5**attempt) + random.randint(0, 3) await asyncio.sleep(delay) From d03de7e604f077b59906f45045d4b91d947cb5d3 Mon Sep 17 00:00:00 2001 From: Andrey Date: Tue, 11 Jun 2024 23:13:50 -0700 Subject: [PATCH 18/27] Revert "Move to non-slim Debian" This reverts commit 6c0d2bd53a62230eb027941d4064a9fee5fa17d7. --- Dockerfile.snowalert | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile.snowalert b/Dockerfile.snowalert index fec00761f..7928552af 100644 --- a/Dockerfile.snowalert +++ b/Dockerfile.snowalert @@ -1,4 +1,4 @@ -FROM python:3.9-bookworm +FROM python:3.9-slim-bookworm WORKDIR /var/task From 22daf55eaf8841decc08f13dd4283a57f45b2a16 Mon Sep 17 00:00:00 2001 From: Andrey Date: Tue, 11 Jun 2024 23:59:42 -0700 Subject: [PATCH 19/27] fix retry logic and add log --- src/connectors/utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/connectors/utils.py b/src/connectors/utils.py index 20491fcff..fad9ee3d2 100644 --- a/src/connectors/utils.py +++ b/src/connectors/utils.py @@ -92,7 +92,7 @@ def sts_assume_role(src_role_arn, dest_role_arn, dest_external_id=None): async def aio_sts_assume_role(src_role_arn, dest_role_arn, dest_external_id=None): - for attempt in range(10): + for attempt in range(11): try: return await try_aio_sts_assume_role( src_role_arn, dest_role_arn, dest_external_id @@ -101,6 +101,7 @@ async def aio_sts_assume_role(src_role_arn, dest_role_arn, dest_external_id=None if attempt < 10: delay = int(1.5**attempt) + random.randint(0, 3) await asyncio.sleep(delay) + log.warn('retrying', attempt, 'after', type(e)) else: raise From 2ccd24209bf2389194f965906d921f56c73fa876 Mon Sep 17 00:00:00 2001 From: Andrey Date: Wed, 12 Jun 2024 00:08:39 -0700 Subject: [PATCH 20/27] Update utils.py --- src/connectors/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connectors/utils.py b/src/connectors/utils.py index fad9ee3d2..a01d78eb1 100644 --- a/src/connectors/utils.py +++ b/src/connectors/utils.py @@ -11,7 +11,7 @@ import yaml from requests import auth -from runners.helpers import db +from runners.helpers import db, log from runners.helpers.dbconfig import ROLE as SA_ROLE From 8ece4698e15c1d203b19875c2432a9e5f222b348 Mon 
Sep 17 00:00:00 2001 From: Andrey Date: Wed, 12 Jun 2024 13:08:23 -0700 Subject: [PATCH 21/27] Update utils.py --- src/connectors/utils.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/connectors/utils.py b/src/connectors/utils.py index a01d78eb1..f62972d18 100644 --- a/src/connectors/utils.py +++ b/src/connectors/utils.py @@ -92,16 +92,18 @@ def sts_assume_role(src_role_arn, dest_role_arn, dest_external_id=None): async def aio_sts_assume_role(src_role_arn, dest_role_arn, dest_external_id=None): - for attempt in range(11): + for attempt in range(21): try: return await try_aio_sts_assume_role( src_role_arn, dest_role_arn, dest_external_id ) except (CredentialRetrievalError, ResponseParserError) as e: - if attempt < 10: - delay = int(1.5**attempt) + random.randint(0, 3) + if attempt < 20: + # attempt 10 has 28.9s wait + # attempt 19 has 597s wait + delay = int(1.4**attempt) + random.randint(0, 3) await asyncio.sleep(delay) - log.warn('retrying', attempt, 'after', type(e)) + log.warn(f'attempt {attempt}: {e}') else: raise From b98e37ebd93913a305fb2f9a4e79a12be27bebba Mon Sep 17 00:00:00 2001 From: Andrey Date: Wed, 12 Jun 2024 13:30:44 -0700 Subject: [PATCH 22/27] Update utils.py --- src/connectors/utils.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/connectors/utils.py b/src/connectors/utils.py index f62972d18..2a0513d90 100644 --- a/src/connectors/utils.py +++ b/src/connectors/utils.py @@ -99,9 +99,11 @@ async def aio_sts_assume_role(src_role_arn, dest_role_arn, dest_external_id=None ) except (CredentialRetrievalError, ResponseParserError) as e: if attempt < 20: - # attempt 10 has 28.9s wait - # attempt 19 has 597s wait - delay = int(1.4**attempt) + random.randint(0, 3) + # attempt 0 wait of 1 + randint(0,15) seconds + # attempt 10 wait of 28.9 + randint(0,15) seconds + # attempt 19 wait of 597 + randint(0,15) seconds + # total wait is about 30m + delay = int(1.4**attempt) + random.randint(0, 30) await asyncio.sleep(delay) log.warn(f'attempt {attempt}: {e}') else: From e813aeb811d482e6bfff5f33289eb38ad254d2ab Mon Sep 17 00:00:00 2001 From: Grace Tan Date: Mon, 17 Jun 2024 13:57:53 -0600 Subject: [PATCH 23/27] [GSE-4150] Update azure_collect.py to fix `KeyError: 'managedByExtended'` error (#675) * Update azure_collect.py to fix `KeyError: 'managedByExtended'` error * Update v1_9_6-v1_10_0.md --- migrations/v1_9_6-v1_10_0.md | 6 ++++++ src/connectors/azure_collect.py | 2 ++ 2 files changed, 8 insertions(+) diff --git a/migrations/v1_9_6-v1_10_0.md b/migrations/v1_9_6-v1_10_0.md index 7a50bb8b1..9204762ca 100644 --- a/migrations/v1_9_6-v1_10_0.md +++ b/migrations/v1_9_6-v1_10_0.md @@ -10,6 +10,12 @@ WHERE alert_id IS NULL ; ``` +# Update Azure disks table to add raw col + +```sql +ALTER TABLE azure_collect_disks ADD COLUMN raw VARIANT; +``` + # Update Azure Collect Pricings table to add raw col ```sql diff --git a/src/connectors/azure_collect.py b/src/connectors/azure_collect.py index a9d305443..f0bc7a8a6 100644 --- a/src/connectors/azure_collect.py +++ b/src/connectors/azure_collect.py @@ -259,6 +259,7 @@ def access_token_cache(cloud, client_id, tenant, secret, resource, _creds={}): ('tags', 'VARIANT'), ('type', 'STRING'), ('zones', 'VARIANT'), + ('raw', 'VARIANT'), ], # https://docs.microsoft.com/en-us/rest/api/virtualnetwork/networkinterfaces/listall 'network_interfaces': [ @@ -1492,6 +1493,7 @@ def connect(connection_name, options): 'tags': 'tags', 'type': 'type', 'zones': 'zones', + '*': 'raw', }, }, 'role_definitions': 
{ From 1917feac19fdca54f5191731f56e43ac9aa714bf Mon Sep 17 00:00:00 2001 From: Andrey Date: Mon, 24 Jun 2024 18:06:00 -0700 Subject: [PATCH 24/27] Reduce AWSIC ingestion speed getting throttled at metadata service returning header "X-Rate-Limit-Limit: 40.00" --- src/connectors/aws_collect.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connectors/aws_collect.py b/src/connectors/aws_collect.py index 677bc4a5c..52a128267 100644 --- a/src/connectors/aws_collect.py +++ b/src/connectors/aws_collect.py @@ -42,7 +42,7 @@ _SESSION_CACHE: dict = {} # see https://docs.aws.amazon.com/AWSEC2/latest/APIReference/throttling.html#throttling-limits -_REQUEST_PACE_PER_SECOND = 24 # depletes Throttling bucket of 100 at 4/s in 25s +_REQUEST_PACE_PER_SECOND = 12 # depletes Throttling bucket of 100 at 2/s in 50s _REQUEST_BATCH_SIZE = 600 # 100 in Throttling bucket + 500 replenished over 25s CONNECTION_OPTIONS = [ From ca79c580564bdd3708f5771cbf88d7770fdbd72a Mon Sep 17 00:00:00 2001 From: Andrey Date: Mon, 24 Jun 2024 19:36:45 -0700 Subject: [PATCH 25/27] Update AWS Collect to reduce batch size --- src/connectors/aws_collect.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/connectors/aws_collect.py b/src/connectors/aws_collect.py index 52a128267..22d64aa17 100644 --- a/src/connectors/aws_collect.py +++ b/src/connectors/aws_collect.py @@ -42,8 +42,8 @@ _SESSION_CACHE: dict = {} # see https://docs.aws.amazon.com/AWSEC2/latest/APIReference/throttling.html#throttling-limits -_REQUEST_PACE_PER_SECOND = 12 # depletes Throttling bucket of 100 at 2/s in 50s -_REQUEST_BATCH_SIZE = 600 # 100 in Throttling bucket + 500 replenished over 25s +_REQUEST_PACE_PER_SECOND = 24 # depletes Throttling bucket of 100 at 4/s in 25s +_REQUEST_BATCH_SIZE = 100 # 100 in Throttling bucket + 500 replenished over 25s CONNECTION_OPTIONS = [ { From 70793e21f41662a30339e9f2795b674b8099f255 Mon Sep 17 00:00:00 2001 From: Andrey Date: Wed, 7 Aug 2024 16:56:29 -0700 Subject: [PATCH 26/27] Fix AWS Inventory & Configuration Collection (#676) * fix awsic rate limiting and OOM errors - rate limit metadata separately - prune cache to fix oom errors - remove / abstract out inline retries * optimizations - use boto's retry mechanism for 429 responses - cache AssumeRole failures indefinitely - re-use sessions between regions - refactor API rate limits as AioRateLimit instances - increase batch size to 1000 * add pause in pagination --- src/connectors/aws_collect.py | 140 ++++++++++++++++++++++++---------- src/connectors/utils.py | 87 +++++++++++++++------ 2 files changed, 163 insertions(+), 64 deletions(-) diff --git a/src/connectors/aws_collect.py b/src/connectors/aws_collect.py index 22d64aa17..80f83373b 100644 --- a/src/connectors/aws_collect.py +++ b/src/connectors/aws_collect.py @@ -24,13 +24,23 @@ from runners.helpers.dbconfig import DATA_SCHEMA, ROLE as SA_ROLE from runners.utils import format_exception_only, format_exception -from connectors.utils import aio_sts_assume_role, updated, yaml_dump, bytes_to_str +from connectors.utils import ( + aio_sts_assume_role, + updated, + yaml_dump, + bytes_to_str, + AioRateLimit, +) from runners.helpers import db, log AIO_CONFIG = AioConfig( read_timeout=600, connect_timeout=600, + # retries={ + # 'max_attempts': 100, + # 'mode': 'standard', + # }, ) AWS_ZONE = environ.get('SA_AWS_ZONE', 'aws') @@ -41,9 +51,15 @@ _SESSION_CACHE: dict = {} -# see https://docs.aws.amazon.com/AWSEC2/latest/APIReference/throttling.html#throttling-limits 
-_REQUEST_PACE_PER_SECOND = 24 # depletes Throttling bucket of 100 at 4/s in 25s -_REQUEST_BATCH_SIZE = 100 # 100 in Throttling bucket + 500 replenished over 25s +_REQUEST_BATCH_SIZE = 500 + +NEVER = datetime.now(pytz.utc) + timedelta(days=365 * 100) + +# metadata API RPS limit https://github.com/aws/amazon-ecs-agent/blob/master/README.md#:~:text=ECS_TASK_METADATA_RPS_LIMIT +metadata_rate_limit = AioRateLimit(pace_per_second=40) + +# e.g. https://docs.aws.amazon.com/AWSEC2/latest/APIReference/throttling.html +api_rate_limits = {} CONNECTION_OPTIONS = [ { @@ -1693,34 +1709,78 @@ async def load_task_response(client, task): args['AccountId'] = task.account_id client_name, method_name = task.method.split('.', 1) + api_rate_limit = api_rate_limits.setdefault( + (task.account_id, client.meta.region_name, client_name), + AioRateLimit( + pace_per_second=API_METHOD_SPECS[task.method].get('rate_per_second', 5) + ), + ) try: if client.can_paginate(method_name): - async for page in client.get_paginator(method_name).paginate(**args): + + async def load_pages(): + pages = [] + async for page in api_rate_limit.iterate_with_wait( + client.get_paginator(method_name).paginate(**args), + ): + pages.append(page) + return pages + + pages = await api_rate_limit.retry(load_pages) + for page in pages: for x in process_aws_response(task, page): yield x else: + await api_rate_limit.wait() + await metadata_rate_limit.wait() for x in process_aws_response( task, await getattr(client, method_name)(**args) ): yield x + # todo: double check whether these should be retried instead of recording errors except (ClientError, DataNotFoundError, ServerTimeoutError) as e: for x in process_aws_response(task, e): yield x -async def get_session(account_arn, client_name=None): - session_key = (account_arn, client_name) - expiration, session = _SESSION_CACHE.get(session_key, (None, None)) +async def get_session(account_arn): + # prune cache to stop OOM errors in_10m = datetime.now(pytz.utc) + timedelta(minutes=10) - if expiration is None or expiration < in_10m: - expiration, session = _SESSION_CACHE[session_key] = await aio_sts_assume_role( - src_role_arn=AUDIT_ASSUMER_ARN, - dest_role_arn=account_arn, - dest_external_id=READER_EID, - ) - return session + for k, (expiration, _) in list(_SESSION_CACHE.items()): + if expiration < in_10m: + del _SESSION_CACHE[k] + + expiration, value = _SESSION_CACHE.get(account_arn, (None, None)) + + while expiration is NEVER and value is None: + # another coroutine is working on this + await asyncio.sleep(0.1) + expiration, value = _SESSION_CACHE.get(account_arn, (None, None)) + + if expiration is None: + # print(f'session cache MISS for {account_arn}') + _SESSION_CACHE[account_arn] = (NEVER, None) + try: + await metadata_rate_limit.wait(cost=12) + expiration, value = _SESSION_CACHE[ + account_arn + ] = await metadata_rate_limit.retry( + lambda: aio_sts_assume_role( + src_role_arn=AUDIT_ASSUMER_ARN, + dest_role_arn=account_arn, + dest_external_id=READER_EID, + aio_config=AIO_CONFIG, + ), + ) + + except ClientError as e: + expiration, value = _SESSION_CACHE[account_arn] = (NEVER, e) + + # print(f'session cache SET for {account_arn}') + + return (None, value) if expiration is NEVER else (value, None) async def process_task(task, add_task) -> AsyncGenerator[Tuple[str, dict], None]: @@ -1729,31 +1789,34 @@ async def process_task(task, add_task) -> AsyncGenerator[Tuple[str, dict], None] client_name, method_name = task.method.split('.', 1) - try: - session = await get_session(account_arn) + session, e = 
await get_session(account_arn) + + if session: + await metadata_rate_limit.wait() async with session.client(client_name, config=AIO_CONFIG) as client: if hasattr(client, 'describe_regions'): + await metadata_rate_limit.wait() response = await client.describe_regions() region_names = [region['RegionName'] for region in response['Regions']] else: region_names = API_METHOD_SPECS[task.method].get('regions', [None]) - for rn in region_names: - session = await get_session(account_arn, client_name) - async with session.client( - client_name, region_name=rn, config=AIO_CONFIG - ) as client: - async for response in load_task_response(client, task): - if type(response) is DBEntry: - if rn is not None: - response.entity['region'] = rn - yield (task.method, response.entity) - elif type(response) is CollectTask: - add_task(response) - else: - log.info('log response', response) - - except ClientError as e: + for rn in region_names: + await metadata_rate_limit.wait() + async with session.client( + client_name, region_name=rn, config=AIO_CONFIG + ) as client: + async for response in load_task_response(client, task): + if type(response) is DBEntry: + if rn is not None: + response.entity['region'] = rn + yield (task.method, response.entity) + elif type(response) is CollectTask: + add_task(response) + else: + log.info('log response', response) + + else: # record missing auditor role as empty account summary yield ( task.method, @@ -1779,11 +1842,7 @@ def insert_list(name, values, table_name=None, dryrun=False): return db.insert(table_name, values, dryrun=dryrun) -async def aws_collect_task(task, wait=0.0, add_task=None): - if wait: - await asyncio.sleep(wait) - - # log.info(f'processing {task}') +async def aws_collect_task(task, add_task=None): result_lists = defaultdict(list) async for k, v in process_task(task, add_task): result_lists[k].append(v) @@ -1859,6 +1918,7 @@ async def aioingest(table_name, options, dryrun=False): dest_external_id=READER_EID, ) + await metadata_rate_limit.wait() async with session.client('organizations') as org_client: accounts = [ a.entity @@ -1892,9 +1952,7 @@ def add_task(t): while collection_tasks: coroutines = [ - aws_collect_task( - t, wait=(i / _REQUEST_PACE_PER_SECOND), add_task=add_task - ) + aws_collect_task(t, add_task=add_task) for i, t in enumerate(collection_tasks[:_REQUEST_BATCH_SIZE]) ] del collection_tasks[:_REQUEST_BATCH_SIZE] diff --git a/src/connectors/utils.py b/src/connectors/utils.py index 2a0513d90..d40cd3613 100644 --- a/src/connectors/utils.py +++ b/src/connectors/utils.py @@ -2,12 +2,13 @@ from inspect import signature import random import multiprocessing as mp +import time from typing import Any import aioboto3 import boto3 from botocore.parsers import ResponseParserError -from botocore.exceptions import CredentialRetrievalError +from botocore.exceptions import CredentialRetrievalError, ConnectionClosedError import yaml from requests import auth @@ -15,6 +16,63 @@ from runners.helpers.dbconfig import ROLE as SA_ROLE +class AioRateLimit: + default_retry_exceptions = ( + ResponseParserError, + CredentialRetrievalError, + ConnectionClosedError, + ) + default_retry_times = 60 + default_seconds_between_retries = 1 + default_exp_base = 0 + + def __init__(self, pace_per_second=50): + self.pace_per_second = pace_per_second + self.interval = 1 / pace_per_second + self.next_allowed_time = time.monotonic() + + async def wait(self, cost=1): + now = time.monotonic() + self.next_allowed_time += self.interval * cost + + if self.next_allowed_time > now: + await 
asyncio.sleep(self.next_allowed_time - now) + else: + self.next_allowed_time = now + + async def iterate_with_wait(self, async_iterable, cost=1): + iterator = async_iterable.__aiter__() + while True: + await self.wait(cost) + try: + item = await iterator.__anext__() + except StopAsyncIteration: + break + yield item + + async def retry( + self, + coroutine_factory, + cost=1, + times=default_retry_times, + seconds_between_retries=default_seconds_between_retries, + exp_base=default_exp_base, + retry_exceptions=default_retry_exceptions, + ): + for i in range(times + 1): + try: + await self.wait(cost) + return await coroutine_factory() + except retry_exceptions as e: + if i < times: + backoff = exp_base**i if exp_base > 0 else 0 + sleep_time = seconds_between_retries + backoff + # print(f'retry after {sleep_time}s because of {type(e)}') + await asyncio.sleep(sleep_time) + else: + raise + + class Bearer(auth.AuthBase): def __init__(self, token): self.token = token @@ -91,28 +149,11 @@ def sts_assume_role(src_role_arn, dest_role_arn, dest_external_id=None): ) -async def aio_sts_assume_role(src_role_arn, dest_role_arn, dest_external_id=None): - for attempt in range(21): - try: - return await try_aio_sts_assume_role( - src_role_arn, dest_role_arn, dest_external_id - ) - except (CredentialRetrievalError, ResponseParserError) as e: - if attempt < 20: - # attempt 0 wait of 1 + randint(0,15) seconds - # attempt 10 wait of 28.9 + randint(0,15) seconds - # attempt 19 wait of 597 + randint(0,15) seconds - # total wait is about 30m - delay = int(1.4**attempt) + random.randint(0, 30) - await asyncio.sleep(delay) - log.warn(f'attempt {attempt}: {e}') - else: - raise - - -async def try_aio_sts_assume_role(src_role_arn, dest_role_arn, dest_external_id=None): +async def aio_sts_assume_role( + src_role_arn, dest_role_arn, dest_external_id=None, aio_config=None +): session_name = ''.join(random.choice('0123456789ABCDEF') for i in range(16)) - async with aioboto3.Session().client('sts') as sts: + async with aioboto3.Session().client('sts', config=aio_config) as sts: src_role = await sts.assume_role( RoleArn=src_role_arn, RoleSessionName=session_name ) @@ -120,7 +161,7 @@ async def try_aio_sts_assume_role(src_role_arn, dest_role_arn, dest_external_id= aws_access_key_id=src_role['Credentials']['AccessKeyId'], aws_secret_access_key=src_role['Credentials']['SecretAccessKey'], aws_session_token=src_role['Credentials']['SessionToken'], - ).client('sts') as sts_client: + ).client('sts', config=aio_config) as sts_client: sts_role = await ( sts_client.assume_role( RoleArn=dest_role_arn, From fee43a5d3f0010b90bef15d8001d60a09685365b Mon Sep 17 00:00:00 2001 From: Andrey Date: Mon, 19 Aug 2024 09:02:58 -0700 Subject: [PATCH 27/27] Move down storage_account_containers Putting them last because they currently time out after a few days of ingestion --- src/connectors/azure_collect.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/connectors/azure_collect.py b/src/connectors/azure_collect.py index f0bc7a8a6..e05fd30b0 100644 --- a/src/connectors/azure_collect.py +++ b/src/connectors/azure_collect.py @@ -1404,10 +1404,6 @@ def connect(connection_name, options): 'type': 'type', }, 'children': [ - { - 'kind': 'storage_accounts_containers', - 'args': {'subscriptionId': 'subscription_id', 'accountName': 'name'}, - }, { 'kind': 'queue_services', 'args': { @@ -1424,6 +1420,10 @@ def connect(connection_name, options): 'accountName': 'name', }, }, + { + 'kind': 'storage_accounts_containers', + 'args': 
{'subscriptionId': 'subscription_id', 'accountName': 'name'}, + }, ], }, 'storage_accounts_containers': {