Revert to last known good docker build #668

Open · wants to merge 27 commits into `bkou_revert2`

Commits (27):
- `a80713c` GSE-3635: Fix AWSIC logging and status=408 errors (#664) (sfc-gh-afedorov, May 14, 2024)
- `a66a817` Update base image to Python 3.8 on Debian bookworm (#665) (sfc-gh-afedorov, May 15, 2024)
- `473d573` Revert "Update base image to Python 3.8 on Debian bookworm (#665)" (#… (sfc-gh-gtan, May 15, 2024)
- `b7b5126` Revert "GSE-3635: Fix AWSIC logging and status=408 errors (#664)" (#666) (sfc-gh-gtan, May 15, 2024)
- `88394be` Update README.md (#669) (sfc-gh-gtan, May 15, 2024)
- `3b78ee1` Update Python and pip dependencies & fix tests (#670) (sfc-gh-afedorov, May 16, 2024)
- `60b6bd2` [GSE-4024] Tag image with commit SHA after build, and push to reposit… (sfc-gh-gtan, May 21, 2024)
- `224a315` missing bash (#672) (sfc-gh-gtan, May 21, 2024)
- `f1f0a8f` Reapply "GSE-3635: Fix AWSIC logging and status=408 errors (#664)" (#… (sfc-gh-afedorov, Jun 3, 2024)
- `952cbdd` Update Dockerfile.snowalert (#674) (sfc-gh-afedorov, Jun 4, 2024)
- `49e91c8` Update Dockerfile.snowalert (sfc-gh-afedorov, Jun 4, 2024)
- `90140ea` Bump aioboto3 to 13.0.1 (sfc-gh-afedorov, Jun 11, 2024)
- `f80416d` Remove iptables Docker installation (sfc-gh-afedorov, Jun 12, 2024)
- `6c0d2bd` Move to non-slim Debian (sfc-gh-afedorov, Jun 12, 2024)
- `b84705b` Add 5m sleep to Docker file (sfc-gh-afedorov, Jun 12, 2024)
- `bad1ab3` Revert "Add 5m sleep to Docker file" (sfc-gh-afedorov, Jun 12, 2024)
- `3d883ce` add retry on CredentialRetrievalError (sfc-gh-afedorov, Jun 12, 2024)
- `d03de7e` Revert "Move to non-slim Debian" (sfc-gh-afedorov, Jun 12, 2024)
- `22daf55` fix retry logic and add log (sfc-gh-afedorov, Jun 12, 2024)
- `2ccd242` Update utils.py (sfc-gh-afedorov, Jun 12, 2024)
- `8ece469` Update utils.py (sfc-gh-afedorov, Jun 12, 2024)
- `b98e37e` Update utils.py (sfc-gh-afedorov, Jun 12, 2024)
- `e813aeb` [GSE-4150] Update azure_collect.py to fix `KeyError: 'managedByExtend… (sfc-gh-gtan, Jun 17, 2024)
- `1917fea` Reduce AWSIC ingestion speed (sfc-gh-afedorov, Jun 25, 2024)
- `ca79c58` Update AWS Collect to reduce batch size (sfc-gh-afedorov, Jun 25, 2024)
- `70793e2` Fix AWS Inventory & Configuration Collection (#676) (sfc-gh-afedorov, Aug 7, 2024)
- `fee43a5` Move down storage_account_containers (sfc-gh-afedorov, Aug 19, 2024)
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -3,4 +3,4 @@ repos:
     rev: stable
     hooks:
     - id: black
-      language_version: python3.7
+      language_version: python3
3 changes: 2 additions & 1 deletion CONTRIBUTING.md
@@ -26,7 +26,7 @@ pip install -e .
 
 # to start runners
 export $(cat snowalert-$SNOWFLAKE_ACCOUNT.envs | xargs)
-python runners/run.py all
+python3 runners/run.py all
 ~~~
 
 If you'd like to run the test suite, please create a separate ENVS file that contains these additional variables used to test the installer:
@@ -41,6 +41,7 @@ If you save this to `snowalert-$SNOWFLAKE_ACCOUNT.testing.envs`, you can run the
 
 ~~~
 # test runners
+pip install -e .\[dev\]
 export $(cat snowalert-$SNOWFLAKE_ACCOUNT.testing.envs | xargs)
 pytest -vv
 ~~~
3 changes: 2 additions & 1 deletion Dockerfile.snowalert
@@ -1,8 +1,9 @@
-FROM python:3.7-slim-stretch
+FROM python:3.9-slim-bookworm
 
 WORKDIR /var/task
 
+RUN apt-get update
 
 RUN apt-get install -y r-base
 RUN R -e "install.packages(c('dplyr', 'purrr', 'tidyr','MASS', 'tidyverse', 'broom','testthat'), repos = 'https://cloud.r-project.org')"
 
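Two notes on this hunk: Debian stretch is past end of life and its apt repositories have moved to archive hosts, which is presumably part of why the base image jumps to bookworm; and since slim images ship without cached apt package lists, the added `RUN apt-get update` is what lets the following `apt-get install -y r-base` resolve packages.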
1 change: 1 addition & 0 deletions README.md
@@ -19,3 +19,4 @@ Ready? Let's [get started!](https://docs.snowalert.com/getting-started "SnowAler
 ## License
 
 This project is licensed under the Apache 2.0 License - see the [LICENSE](LICENSE) file for details.
+
3 changes: 3 additions & 0 deletions hooks/post_push
@@ -0,0 +1,3 @@
+#!/bin/bash
+docker tag $IMAGE_NAME $DOCKER_REPO:$SOURCE_COMMIT
+docker push $DOCKER_REPO:$SOURCE_COMMIT
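These `hooks/post_push` variables (`$IMAGE_NAME`, `$DOCKER_REPO`, `$SOURCE_COMMIT`) match Docker Hub's automated-build hook convention: after the regular push, the image is re-tagged with the source commit SHA and pushed again, so each build stays addressable by the exact commit that produced it (the [GSE-4024] commit above).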
6 changes: 6 additions & 0 deletions migrations/v1_9_6-v1_10_0.md
@@ -10,6 +10,12 @@ WHERE alert_id IS NULL
 ;
 ```
 
+# Update Azure disks table to add raw col
+
+```sql
+ALTER TABLE azure_collect_disks ADD COLUMN raw VARIANT;
+```
+
 # Update Azure Collect Pricings table to add raw col
 
 ```sql
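For readers applying these statements by hand, a minimal sketch with the Snowflake Python connector follows; the connection parameters are placeholders, and the project may well apply migrations through its own helpers instead:

```python
# Minimal sketch: run a migration statement with the Snowflake Python
# connector. All connection values are placeholders, not repo settings.
import snowflake.connector

conn = snowflake.connector.connect(
    account='my_account',
    user='my_user',
    password='my_password',
    database='SNOWALERT',  # assumption: where the collect tables live
    schema='DATA',
)

# ADD COLUMN raises if `raw` already exists, so a re-runnable migration
# would check INFORMATION_SCHEMA.COLUMNS first or tolerate that error.
conn.cursor().execute("ALTER TABLE azure_collect_disks ADD COLUMN raw VARIANT")
```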
142 changes: 99 additions & 43 deletions src/connectors/aws_collect.py
@@ -24,13 +24,23 @@
 from runners.helpers.dbconfig import DATA_SCHEMA, ROLE as SA_ROLE
 from runners.utils import format_exception_only, format_exception
 
-from connectors.utils import aio_sts_assume_role, updated, yaml_dump, bytes_to_str
+from connectors.utils import (
+    aio_sts_assume_role,
+    updated,
+    yaml_dump,
+    bytes_to_str,
+    AioRateLimit,
+)
 from runners.helpers import db, log
 
 
 AIO_CONFIG = AioConfig(
     read_timeout=600,
     connect_timeout=600,
+    # retries={
+    #     'max_attempts': 100,
+    #     'mode': 'standard',
+    # },
 )
 
 AWS_ZONE = environ.get('SA_AWS_ZONE', 'aws')
@@ -41,9 +51,15 @@
 
 _SESSION_CACHE: dict = {}
 
-# see https://docs.aws.amazon.com/AWSEC2/latest/APIReference/throttling.html#throttling-limits
-_REQUEST_PACE_PER_SECOND = 24  # depletes Throttling bucket of 100 at 4/s in 25s
-_REQUEST_BATCH_SIZE = 600  # 100 in Throttling bucket + 500 replenished over 25s
+_REQUEST_BATCH_SIZE = 500
+
+NEVER = datetime.now(pytz.utc) + timedelta(days=365 * 100)
+
+# metadata API RPS limit https://github.com/aws/amazon-ecs-agent/blob/master/README.md#:~:text=ECS_TASK_METADATA_RPS_LIMIT
+metadata_rate_limit = AioRateLimit(pace_per_second=40)
+
+# e.g. https://docs.aws.amazon.com/AWSEC2/latest/APIReference/throttling.html
+api_rate_limits = {}
 
 CONNECTION_OPTIONS = [
     {
@@ -1693,35 +1709,78 @@ async def load_task_response(client, task):
         args['AccountId'] = task.account_id
 
     client_name, method_name = task.method.split('.', 1)
+    api_rate_limit = api_rate_limits.setdefault(
+        (task.account_id, client.meta.region_name, client_name),
+        AioRateLimit(
+            pace_per_second=API_METHOD_SPECS[task.method].get('rate_per_second', 5)
+        ),
+    )
 
     try:
         if client.can_paginate(method_name):
-            async for page in client.get_paginator(method_name).paginate(**args):
+
+            async def load_pages():
+                pages = []
+                async for page in api_rate_limit.iterate_with_wait(
+                    client.get_paginator(method_name).paginate(**args),
+                ):
+                    pages.append(page)
+                return pages
+
+            pages = await api_rate_limit.retry(load_pages)
+            for page in pages:
                 for x in process_aws_response(task, page):
                     yield x
         else:
+            await api_rate_limit.wait()
+            await metadata_rate_limit.wait()
             for x in process_aws_response(
                 task, await getattr(client, method_name)(**args)
             ):
                 yield x
 
+    # todo: double check whether these should be retried instead of recording errors
     except (ClientError, DataNotFoundError, ServerTimeoutError) as e:
         log.info(format_exception_only(e))
         for x in process_aws_response(task, e):
             yield x
 
 
-async def get_session(account_arn, client_name=None):
-    session_key = (account_arn, client_name)
-    expiration, session = _SESSION_CACHE.get(session_key, (None, None))
+async def get_session(account_arn):
+    # prune cache to stop OOM errors
     in_10m = datetime.now(pytz.utc) + timedelta(minutes=10)
-    if expiration is None or expiration < in_10m:
-        expiration, session = _SESSION_CACHE[session_key] = await aio_sts_assume_role(
-            src_role_arn=AUDIT_ASSUMER_ARN,
-            dest_role_arn=account_arn,
-            dest_external_id=READER_EID,
-        )
-    return session
+    for k, (expiration, _) in list(_SESSION_CACHE.items()):
+        if expiration < in_10m:
+            del _SESSION_CACHE[k]
+
+    expiration, value = _SESSION_CACHE.get(account_arn, (None, None))
+
+    while expiration is NEVER and value is None:
+        # another coroutine is working on this
+        await asyncio.sleep(0.1)
+        expiration, value = _SESSION_CACHE.get(account_arn, (None, None))
+
+    if expiration is None:
+        # print(f'session cache MISS for {account_arn}')
+        _SESSION_CACHE[account_arn] = (NEVER, None)
+        try:
+            await metadata_rate_limit.wait(cost=12)
+            expiration, value = _SESSION_CACHE[
+                account_arn
+            ] = await metadata_rate_limit.retry(
+                lambda: aio_sts_assume_role(
+                    src_role_arn=AUDIT_ASSUMER_ARN,
+                    dest_role_arn=account_arn,
+                    dest_external_id=READER_EID,
+                    aio_config=AIO_CONFIG,
+                ),
+            )
+
+        except ClientError as e:
+            expiration, value = _SESSION_CACHE[account_arn] = (NEVER, e)
+
+        # print(f'session cache SET for {account_arn}')
+
+    return (None, value) if expiration is NEVER else (value, None)
 
 
 async def process_task(task, add_task) -> AsyncGenerator[Tuple[str, dict], None]:
@@ -1730,33 +1789,35 @@ async def process_task(task, add_task) -> AsyncGenerator[Tuple[str, dict], None]
 
     client_name, method_name = task.method.split('.', 1)
 
-    try:
-        session = await get_session(account_arn)
+    session, e = await get_session(account_arn)
+
+    if session:
+        await metadata_rate_limit.wait()
         async with session.client(client_name, config=AIO_CONFIG) as client:
             if hasattr(client, 'describe_regions'):
+                await metadata_rate_limit.wait()
                 response = await client.describe_regions()
                 region_names = [region['RegionName'] for region in response['Regions']]
             else:
                 region_names = API_METHOD_SPECS[task.method].get('regions', [None])
 
-        for rn in region_names:
-            session = await get_session(account_arn, client_name)
-            async with session.client(
-                client_name, region_name=rn, config=AIO_CONFIG
-            ) as client:
-                async for response in load_task_response(client, task):
-                    if type(response) is DBEntry:
-                        if rn is not None:
-                            response.entity['region'] = rn
-                        yield (task.method, response.entity)
-                    elif type(response) is CollectTask:
-                        add_task(response)
-                    else:
-                        log.info('log response', response)
-
-    except ClientError as e:
+        for rn in region_names:
+            await metadata_rate_limit.wait()
+            async with session.client(
+                client_name, region_name=rn, config=AIO_CONFIG
+            ) as client:
+                async for response in load_task_response(client, task):
+                    if type(response) is DBEntry:
+                        if rn is not None:
+                            response.entity['region'] = rn
+                        yield (task.method, response.entity)
+                    elif type(response) is CollectTask:
+                        add_task(response)
+                    else:
+                        log.info('log response', response)
+
+    else:
+        # record missing auditor role as empty account summary
         log.info(format_exception_only(e))
         yield (
             task.method,
             updated(
@@ -1781,11 +1842,7 @@ def insert_list(name, values, table_name=None, dryrun=False):
     return db.insert(table_name, values, dryrun=dryrun)
 
 
-async def aws_collect_task(task, wait=0.0, add_task=None):
-    if wait:
-        await asyncio.sleep(wait)
-
-    # log.info(f'processing {task}')
+async def aws_collect_task(task, add_task=None):
     result_lists = defaultdict(list)
     async for k, v in process_task(task, add_task):
         result_lists[k].append(v)
@@ -1861,6 +1918,7 @@ async def aioingest(table_name, options, dryrun=False):
         dest_external_id=READER_EID,
     )
 
+    await metadata_rate_limit.wait()
     async with session.client('organizations') as org_client:
         accounts = [
             a.entity
@@ -1894,9 +1952,7 @@ def add_task(t):
 
     while collection_tasks:
         coroutines = [
-            aws_collect_task(
-                t, wait=(i / _REQUEST_PACE_PER_SECOND), add_task=add_task
-            )
+            aws_collect_task(t, add_task=add_task)
             for i, t in enumerate(collection_tasks[:_REQUEST_BATCH_SIZE])
         ]
         del collection_tasks[:_REQUEST_BATCH_SIZE]
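The pacing changes above lean on `AioRateLimit` from `src/connectors/utils.py`, which is not part of this diff. Judging only from the call sites (`wait` with an optional `cost`, `iterate_with_wait` around paginators, and `retry`), a minimal sketch of the interface might look like this; the timing and retry details are assumptions, not the project's implementation:

```python
import asyncio
import time


class AioRateLimit:
    """Sketch: pace callers to at most `pace_per_second` cost units per second."""

    def __init__(self, pace_per_second):
        self.pace_per_second = pace_per_second
        self._next_slot = time.monotonic()
        self._lock = asyncio.Lock()

    async def wait(self, cost=1):
        # reserve `cost` slots on a shared timeline, then sleep until our turn
        async with self._lock:
            now = time.monotonic()
            start = max(now, self._next_slot)
            self._next_slot = start + cost / self.pace_per_second
        if start > now:
            await asyncio.sleep(start - now)

    async def iterate_with_wait(self, aiterable):
        # pace each step of an async iterator, e.g. a paginator
        async for item in aiterable:
            await self.wait()
            yield item

    async def retry(self, coroutine_fn, attempts=3):
        # call `coroutine_fn` again after a paced delay if it raises
        for attempt in range(attempts):
            try:
                return await coroutine_fn()
            except Exception:
                if attempt == attempts - 1:
                    raise
                await self.wait()
```

Two usage details in the diff are worth noting: `get_session` charges `cost=12` for the STS round trip, throttling AssumeRole traffic harder than single metadata calls, and the far-future `NEVER` timestamp doubles as a cache sentinel: `(NEVER, None)` marks an entry another coroutine is still filling (callers sleep and poll instead of issuing duplicate AssumeRole calls), while `(NEVER, e)` pins a failed account so its error is recorded once rather than retried for every task.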
10 changes: 6 additions & 4 deletions src/connectors/azure_collect.py
@@ -259,6 +259,7 @@ def access_token_cache(cloud, client_id, tenant, secret, resource, _creds={}):
         ('tags', 'VARIANT'),
         ('type', 'STRING'),
         ('zones', 'VARIANT'),
+        ('raw', 'VARIANT'),
     ],
     # https://docs.microsoft.com/en-us/rest/api/virtualnetwork/networkinterfaces/listall
     'network_interfaces': [
@@ -1403,10 +1404,6 @@ def connect(connection_name, options):
             'type': 'type',
         },
         'children': [
-            {
-                'kind': 'storage_accounts_containers',
-                'args': {'subscriptionId': 'subscription_id', 'accountName': 'name'},
-            },
             {
                 'kind': 'queue_services',
                 'args': {
@@ -1423,6 +1420,10 @@
                     'accountName': 'name',
                 },
             },
+            {
+                'kind': 'storage_accounts_containers',
+                'args': {'subscriptionId': 'subscription_id', 'accountName': 'name'},
+            },
         ],
     },
     'storage_accounts_containers': {
@@ -1492,6 +1493,7 @@ def connect(connection_name, options):
             'tags': 'tags',
             'type': 'type',
             'zones': 'zones',
+            '*': 'raw',
         },
     },
     'role_definitions': {
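The new `'*': 'raw'` entries extend the connector's column-mapping convention: named keys copy individual response fields into columns, and the wildcard captures the entire API response into the new `raw` VARIANT column, so fields the explicit mapping misses stay queryable. A rough sketch of that convention, with a hypothetical helper name:

```python
def map_response_columns(response: dict, col_map: dict) -> dict:
    """Hypothetical mapper for the column maps shown above: each named key
    copies one API field to a table column, and '*' copies the whole
    response, which is what would populate the new `raw` VARIANT columns."""
    row = {}
    for source, target in col_map.items():
        if source == '*':
            row[target] = response  # full payload, e.g. into `raw`
        else:
            row[target] = response.get(source)
    return row


# e.g. with the disks mapping from this diff:
disk_row = map_response_columns(
    {'id': '/subscriptions/xyz/disks/d1', 'tags': {'env': 'prod'}, 'zones': ['1']},
    {'id': 'id', 'tags': 'tags', 'zones': 'zones', '*': 'raw'},
)
assert disk_row['raw']['tags'] == {'env': 'prod'}
```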