Merge pull request #1033 from SEKOIA-IO/feature/salesforce_daily_logs_support

Feature: Salesforce Support Daily Logs (48)
vg-svitla authored Jul 23, 2024
2 parents 8ad393e + 946fcd5 commit 9158210
Showing 11 changed files with 586 additions and 408 deletions.
6 changes: 6 additions & 0 deletions Salesforce/CHANGELOG.md
@@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

## 2024-05-28 - 1.6.1

### Changed

- Add an option to fetch `Daily` logs

## 2024-05-28 - 1.6.0

### Changed
26 changes: 21 additions & 5 deletions Salesforce/client/http_client.py
@@ -3,6 +3,7 @@
import csv
from contextlib import asynccontextmanager
from datetime import datetime
from enum import Enum
from typing import Any, AsyncGenerator, Dict, Tuple
from urllib.parse import urlencode

@@ -17,6 +18,13 @@
from .token_refresher import SalesforceTokenRefresher


class LogType(Enum):
"""Salesforce event types."""

DAILY = "Daily"
HOURLY = "Hourly"


class SalesforceHttpClient(object):
"""Class for Salesforce Http client."""

@@ -101,28 +109,33 @@ def _query_to_http_param(cls, query: str) -> str:
return " ".join([line.strip() for line in query.strip().splitlines()]).replace(" ", "+").replace(",", "+,")

@staticmethod
def _log_files_query(start_from: datetime | None = None) -> str:
def _log_files_query(start_from: datetime | None = None, log_type: LogType | None = None) -> str:
"""
Query to get log files.
Docs for EventLogFile:
https://developer.salesforce.com/docs/atlas.en-us.object_reference.meta/object_reference/sforce_api_objects_eventlogfile.htm
Docs for query with CreatedDate filter
https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/event_log_file_hourly_overview.htm
Value of filter criterion for field 'CreatedDate' must be of type dateTime and should not be enclosed in quotes.
Args:
start_from: datetime | None
log_type: LogType | None
Returns:
str:
"""
date_filter = "AND CreatedDate > {0}".format(start_from.strftime("%Y-%m-%dT%H:%M:%SZ")) if start_from else ""
result_log_type = log_type if log_type else LogType.HOURLY

return """
SELECT Id, EventType, LogFile, LogDate, CreatedDate, LogFileLength
FROM EventLogFile WHERE Interval = \'Hourly\' {0}
FROM EventLogFile WHERE Interval = \'{0}\' {1}
""".format(
date_filter
result_log_type.value, date_filter
)
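To make the change concrete, here is a self-contained sketch of the builder above (a hypothetical standalone copy, not part of the commit), showing the SOQL produced for daily logs:

from datetime import datetime
from enum import Enum

class LogType(Enum):
    DAILY = "Daily"
    HOURLY = "Hourly"

def log_files_query(start_from: datetime | None = None, log_type: LogType | None = None) -> str:
    # Optional CreatedDate filter; the interval falls back to Hourly, as before the change.
    date_filter = "AND CreatedDate > {0}".format(start_from.strftime("%Y-%m-%dT%H:%M:%SZ")) if start_from else ""
    interval = (log_type or LogType.HOURLY).value
    return "SELECT Id, EventType, LogFile, LogDate, CreatedDate, LogFileLength FROM EventLogFile WHERE Interval = '{0}' {1}".format(interval, date_filter).strip()

print(log_files_query(datetime(2024, 7, 1), LogType.DAILY))
# SELECT ... FROM EventLogFile WHERE Interval = 'Daily' AND CreatedDate > 2024-07-01T00:00:00Z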

def _request_url_with_query(self, query: str) -> URL:
@@ -186,12 +199,15 @@ async def _handle_error_response(cls, response: ClientResponse) -> None:

raise Exception(error_msg)

async def get_log_files(self, start_from: datetime | None = None) -> SalesforceEventLogFilesResponse:
async def get_log_files(
self, start_from: datetime | None = None, log_type: LogType | None = None
) -> SalesforceEventLogFilesResponse:
"""
Get log files from Salesforce.
Args:
start_from: datetime | None
log_type: LogType | None
Raises:
ValueError: Salesforce response cannot be processed
@@ -201,7 +217,7 @@ async def get_log_files(self, start_from: datetime | None = None) -> SalesforceEventLogFilesResponse:
"""
logger.info("Getting log files from Salesforce. Start date is {0}", start_from)

query = self._log_files_query(start_from)
query = self._log_files_query(start_from, log_type)

url = self._request_url_with_query(query)
headers = await self._request_headers()
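A hedged usage sketch of the extended method; `client` is assumed to be an already-configured `SalesforceHttpClient`, and the snippet must run inside a coroutine:

# Fetch Daily log files created after `start`; omitting log_type keeps the Hourly default.
response = await client.get_log_files(start_from=start, log_type=LogType.DAILY)
for record in response.records:
    print(record.EventType, record.LogFile)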
10 changes: 10 additions & 0 deletions Salesforce/connector_salesforce_events.json
@@ -17,6 +17,16 @@
"type": "integer",
"description": "The max size of chunks for the batch processing",
"default": 1000
},
"frequency": {
"type": "integer",
"description": "Batch frequency in seconds",
"default": 600
},
"fetch_daily_logs": {
"type": "boolean",
"description": "Fetch daily logs. By default, it will fetch Hourly logs",
"default": false
}
},
"required": [
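The `required` list is truncated in this diff. For reference, a sketch of a configuration payload exercising the new options (the `intake_key` value is a placeholder; its requirement is inherited from the base connector configuration):

configuration = {
    "intake_key": "...",       # placeholder, inherited requirement
    "chunk_size": 1000,
    "frequency": 600,
    "fetch_daily_logs": True,  # switch from the default Hourly interval to Daily
}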
2 changes: 1 addition & 1 deletion Salesforce/manifest.json
@@ -39,5 +39,5 @@
"name": "Salesforce",
"uuid": "f811e134-2548-11ee-be56-0242ac120002",
"slug": "salesforce",
"version": "1.6.0"
"version": "1.6.1"
}
824 changes: 428 additions & 396 deletions Salesforce/poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Salesforce/pyproject.toml
@@ -27,8 +27,8 @@ Faker = "^13.12.0"
aioresponses = "^0.7.4"
autoflake = "^1.4"
isort = "^5.10.1"
black = "^22.3.0"
mypy = "^0.960"
black = { version = "^24.3.0", extras = ["colorama"] }
mypy = "^0.991"
types-aiofiles = "^23.1.0.4"
types-python-dateutil = "^2.8.19.13"
poethepoet = { version = "^0.16.5", extras = ["poetry_plugin"] }
10 changes: 8 additions & 2 deletions Salesforce/salesforce/connector.py
@@ -12,7 +12,7 @@
from sekoia_automation.connector import DefaultConnectorConfiguration
from sekoia_automation.storage import PersistentJSON

from client.http_client import SalesforceHttpClient
from client.http_client import LogType, SalesforceHttpClient
from salesforce import SalesforceModule
from salesforce.metrics import EVENTS_LAG, FORWARD_EVENTS_DURATION, OUTCOMING_EVENTS
from utils.file_utils import csv_file_as_rows, delete_file
@@ -22,6 +22,12 @@ class SalesforceConnectorConfig(DefaultConnectorConfiguration):
"""SalesforceConnector configuration."""

frequency: int = 600
fetch_daily_logs: bool = False

@property
def log_type(self) -> LogType:
"""Get log type."""
return LogType.DAILY if self.fetch_daily_logs else LogType.HOURLY
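
A minimal behavior sketch for the new property (assuming `intake_key` is the only required inherited field):

assert SalesforceConnectorConfig(intake_key="test", fetch_daily_logs=True).log_type is LogType.DAILY
assert SalesforceConnectorConfig(intake_key="test").log_type is LogType.HOURLY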


class SalesforceConnector(AsyncConnector):
@@ -95,7 +101,7 @@ async def get_salesforce_events(self) -> list[str]:
datetime: last event date
"""
_last_event_date = self.last_event_date
log_files = await self.salesforce_client.get_log_files(_last_event_date)
log_files = await self.salesforce_client.get_log_files(_last_event_date, self.configuration.log_type)

logger.info(
"Found {count} log files to process since {date}",
2 changes: 1 addition & 1 deletion Salesforce/salesforce/metrics.py
@@ -1,4 +1,4 @@
from prometheus_client import Counter, Histogram, Gauge
from prometheus_client import Counter, Gauge, Histogram

# Declare common prometheus metrics
prom_namespace = "symphony_module_common"
34 changes: 33 additions & 1 deletion Salesforce/tests/client/test_http_client.py
@@ -10,7 +10,7 @@
from aiohttp import ClientResponse
from aioresponses import aioresponses

from client.http_client import SalesforceHttpClient
from client.http_client import LogType, SalesforceHttpClient
from client.schemas.log_file import EventLogFile, SalesforceEventLogFilesResponse
from utils.file_utils import delete_file

@@ -65,6 +65,38 @@ async def test_salesforce_http_client_log_files_query_1():
assert query.strip() == expected_query


@pytest.mark.asyncio
async def test_salesforce_http_client_log_files_query_2():
"""Test SalesforceHttpClient._log_files_query."""
query = SalesforceHttpClient._log_files_query(
start_from=datetime.datetime.strptime("2023-01-01T00:00:00Z", "%Y-%m-%dT%H:%M:%SZ"),
log_type=LogType.DAILY,
)

expected_query = """
SELECT Id, EventType, LogFile, LogDate, CreatedDate, LogFileLength
FROM EventLogFile WHERE Interval = \'Daily\' AND CreatedDate > 2023-01-01T00:00:00Z
""".strip()

assert query.strip() == expected_query


@pytest.mark.asyncio
async def test_salesforce_http_client_log_files_query_3():
"""Test SalesforceHttpClient._log_files_query."""
query = SalesforceHttpClient._log_files_query(
start_from=datetime.datetime.strptime("2023-01-01T00:00:00Z", "%Y-%m-%dT%H:%M:%SZ"),
log_type=LogType.HOURLY,
)

expected_query = """
SELECT Id, EventType, LogFile, LogDate, CreatedDate, LogFileLength
FROM EventLogFile WHERE Interval = \'Hourly\' AND CreatedDate > 2023-01-01T00:00:00Z
""".strip()

assert query.strip() == expected_query
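
The two tests above differ only in the interval; a parametrized variant (a sketch under the same imports, not part of the commit) would cover both without duplicate bodies:

@pytest.mark.parametrize("log_type", [LogType.DAILY, LogType.HOURLY])
def test_salesforce_http_client_log_files_query_interval(log_type):
    """Sketch: each interval appears verbatim in the generated SOQL filter."""
    query = SalesforceHttpClient._log_files_query(log_type=log_type)
    assert "Interval = '{0}'".format(log_type.value) in query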


@pytest.mark.asyncio
async def test_salesforce_http_client_request_url_with_query(session_faker):
"""
71 changes: 71 additions & 0 deletions Salesforce/tests/salesforce/test_connector.py
@@ -9,6 +9,7 @@
from aioresponses import aioresponses
from sekoia_automation import constants

from client.http_client import LogType
from client.schemas.log_file import EventLogFile, SalesforceEventLogFilesResponse
from salesforce import SalesforceModule
from salesforce.connector import SalesforceConnector, SalesforceConnectorConfig
@@ -343,3 +344,73 @@ async def test_salesforce_connector_get_salesforce_events_1(
for _ in range(1, len(csv_content.splitlines())):
    expected_result.extend(pushed_events_ids)

assert result == expected_result


@pytest.mark.asyncio
async def test_salesforce_connector_get_salesforce_events_2(
connector: SalesforceConnector, session_faker, http_token, csv_content, salesforce_url, pushed_events_ids
):
"""
Test salesforce connector get salesforce events.
Args:
connector: SalesforceConnector
session_faker: Faker
http_token: HttpToken
csv_content: str
salesforce_url: str
"""
current_date = datetime.now(timezone.utc).replace(microsecond=0)
log_file_date = current_date - timedelta(days=1)
connector.configuration.fetch_daily_logs = True

# Set the last event date ahead of the log file date (1 day after the current date)
with connector.context as cache:
cache["last_event_date"] = (current_date + timedelta(days=1)).isoformat()

event_log_file = EventLogFile(
Id=session_faker.pystr(),
EventType=session_faker.pystr(),
LogFile=session_faker.pystr(),
LogDate=log_file_date.isoformat(),
CreatedDate=log_file_date.isoformat(),
LogFileLength=1024 * 1024 * 1024,
)

log_files_response_success = SalesforceEventLogFilesResponse(totalSize=1, done=True, records=[event_log_file])

token_data = http_token.dict()
token_data["id"] = token_data["tid"]

with aioresponses() as mocked_responses:
mocked_responses.post(
"{0}/services/oauth2/token?grant_type=client_credentials".format(salesforce_url),
status=200,
payload=token_data,
)

query = connector.salesforce_client._log_files_query(connector.last_event_date, LogType.DAILY)
get_log_files_url = connector.salesforce_client._request_url_with_query(query)

log_file_response_dict = log_files_response_success.dict()
log_file_response_dict["records"] = [token_data]

mocked_responses.get(
get_log_files_url,
payload=log_files_response_success.dict(),
)

# Return a file that is too large to process in memory here, setting Content-Length to 1 GB
mocked_responses.get(
url="{0}{1}".format(salesforce_url, event_log_file.LogFile),
status=200,
body=csv_content.encode("utf-8"),
headers={"Content-Length": "{0}".format(1024 * 1024 * 1024)},
)

result = await connector.get_salesforce_events()

expected_result = []
for _ in range(1, len(csv_content.splitlines())):
    expected_result.extend(pushed_events_ids)

assert result == expected_result
5 changes: 5 additions & 0 deletions Salesforce/trigger_salesforce_events.json
@@ -22,6 +22,11 @@
"type": "integer",
"description": "Batch frequency in seconds",
"default": 600
},
"fetch_daily_logs": {
"type": "boolean",
"description": "Fetch daily logs. By default, it will fetch Hourly logs",
"default": false
}
},
"required": [
