Skip to content

Commit

Permalink
OS-7112. Improve CSV based import performance
Browse files Browse the repository at this point in the history
## Description

The importer no longer rewrites the entire report: it rewrites only the expenses
from the last 5 days before the last import date and skips the rest.

## Related issue number

OS-7112

## Checklist

* [x] The pull request title is a good summary of the changes
* [ ] Unit tests for the changes exist
* [ ] New and existing unit tests pass locally
  • Loading branch information
sd-hystax authored Dec 7, 2023
1 parent 7341d02 commit 90c658b
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 10 deletions.
16 changes: 12 additions & 4 deletions diworker/diworker/importers/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import pyarrow.parquet as pq

LOG = logging.getLogger(__name__)
CHUNK_SIZE = 500
CHUNK_SIZE = 200
GZIP_ENDING = '.gz'
IGNORE_EXPENSE_TYPES = ['Credit']
tag_prefixes = ['resource_tags_aws_', 'resource_tags_user_']
Expand Down Expand Up @@ -282,6 +282,8 @@ def load_csv_report(self, report_path, account_id_ca_id_map,
start_date = self._datetime_from_expense(
row, 'lineItem/UsageStartDate').replace(
hour=0, minute=0, second=0)
if start_date < self.min_date_import_threshold:
continue
row['start_date'] = start_date
row['end_date'] = self._datetime_from_expense(
row, 'lineItem/UsageEndDate')
Expand Down Expand Up @@ -334,6 +336,9 @@ def load_parquet_report(self, report_path, account_id_ca_id_map,
elif field_name == 'lineItem/UsageStartDate':
start_date = self._datetime_from_value(value).replace(
hour=0, minute=0, second=0)
if start_date < self.min_date_import_threshold:
skipped_rows.add(expense_num)
continue
chunk[expense_num]['start_date'] = start_date
elif field_name == 'lineItem/UsageEndDate':
chunk[expense_num]['end_date'] = self._datetime_from_value(
Expand Down Expand Up @@ -620,6 +625,7 @@ def get_resource_ids(self, cloud_account_id, billing_period):
filters = {
'cloud_account_id': cloud_account_id,
'resource_id': {'$exists': True, '$ne': None},
'start_date': {'$gte': self.min_date_import_threshold}
}
if billing_period:
filters['bill/BillingPeriodStartDate'] = billing_period
Expand All @@ -641,9 +647,11 @@ def get_raw_expenses_by_filters(self, filters):
{'$project': {"root": 0}}
], allowDiskUse=True)

@staticmethod
def _get_billing_period_filters(billing_period):
return {'bill/BillingPeriodStartDate': billing_period}
def _get_billing_period_filters(self, billing_period):
return {
'bill/BillingPeriodStartDate': billing_period,
'start_date': {'$gte': self.min_date_import_threshold}
}

@staticmethod
def set_raw_chunk(expenses):
Expand Down
13 changes: 11 additions & 2 deletions diworker/diworker/importers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import requests
import shutil
import uuid
from functools import cached_property

from collections import defaultdict
from pymongo import UpdateOne
Expand All @@ -16,6 +17,7 @@

LOG = logging.getLogger(__name__)
CHUNK_SIZE = 200
CSV_REWRITE_DAYS = 5


class BaseReportImporter:
Expand Down Expand Up @@ -354,8 +356,7 @@ def get_raw_expenses_by_filters(self, filters):
}},
], allowDiskUse=True)

@staticmethod
def _get_billing_period_filters(period_start):
def _get_billing_period_filters(self, period_start):
return {'start_date': {'$gte': period_start}}

def _generate_clean_records(self, resource_ids, cloud_account_id,
Expand Down Expand Up @@ -597,6 +598,14 @@ def __init__(self, *args, **kwargs):
self.last_import_modified_at = self.cloud_acc.get(
'last_import_modified_at', 0)

@cached_property
def min_date_import_threshold(self) -> datetime:
    """UTC midnight CSV_REWRITE_DAYS days before the last import date.

    Expenses starting before this moment are skipped on re-import, so only
    the tail of the report is rewritten. Cached per importer instance.
    """
    last_modified_ts = self.cloud_acc.get('last_import_modified_at', 0)
    last_modified_day = datetime.fromtimestamp(
        last_modified_ts, tz=timezone.utc
    ).replace(hour=0, minute=0, second=0, microsecond=0)
    return last_modified_day - timedelta(days=CSV_REWRITE_DAYS)

def detect_period_start(self):
    """Deliberate no-op (returns None); this importer does not need to
    detect a period start — presumably handled elsewhere, TODO confirm."""
    return None

Expand Down
15 changes: 11 additions & 4 deletions diworker/diworker/importers/nebius.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ def load_csv_report(self, report_path, account_id_ca_id_map,

self.detected_cloud_accounts.add(cloud_account_id)
date = self._datetime_from_expense(row, 'date')
if date < self.min_date_import_threshold:
continue
row['start_date'] = date.replace(
hour=0, minute=0, second=0, microsecond=0)
row['end_date'] = date.replace(
Expand All @@ -140,7 +142,10 @@ def collect_tags(self, expense):
return tags

def get_resource_ids(self, cloud_account_id, billing_period):
base_filters = {'cloud_account_id': cloud_account_id}
base_filters = {
'cloud_account_id': cloud_account_id,
'start_date': {'$gte': self.min_date_import_threshold}
}
if billing_period:
base_filters['date'] = billing_period
resource_ids = self.mongo_raw.aggregate([
Expand All @@ -149,9 +154,11 @@ def get_resource_ids(self, cloud_account_id, billing_period):
], allowDiskUse=True)
return [x['_id'] for x in resource_ids]

@staticmethod
def _get_billing_period_filters(billing_period):
return {'date': billing_period}
def _get_billing_period_filters(self, billing_period):
return {
'date': billing_period,
'start_date': {'$gte': self.min_date_import_threshold}
}

def get_resource_type(self, expense):
sku_name = expense['sku_name']
Expand Down

0 comments on commit 90c658b

Please sign in to comment.