diff --git a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py
index bb9e3db7bca61..e361ef436dfb1 100644
--- a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py
+++ b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py
@@ -1,4 +1,5 @@
 import asyncio
+import contextlib
 import datetime as dt
 import functools
 import gzip
@@ -9,7 +10,7 @@
 from unittest import mock
 from uuid import uuid4
 
-import boto3
+import aioboto3
 import botocore.exceptions
 import brotli
 import pytest
@@ -55,18 +56,20 @@
 TEST_ROOT_BUCKET = "test-batch-exports"
 
 
-def check_valid_credentials() -> bool:
+async def check_valid_credentials() -> bool:
     """Check if there are valid AWS credentials in the environment."""
-    sts = boto3.client("sts")
+    session = aioboto3.Session()
+    sts = await session.client("sts")
     try:
-        sts.get_caller_identity()
+        await sts.get_caller_identity()
     except botocore.exceptions.ClientError:
         return False
     else:
         return True
 
 
-create_test_client = functools.partial(boto3.client, endpoint_url=settings.OBJECT_STORAGE_ENDPOINT)
+SESSION = aioboto3.Session()
+create_test_client = functools.partial(SESSION.client, endpoint_url=settings.OBJECT_STORAGE_ENDPOINT)
 
 
 @pytest.fixture
@@ -75,39 +78,38 @@ def bucket_name() -> str:
     return f"{TEST_ROOT_BUCKET}-{str(uuid4())}"
 
 
-@pytest.fixture
-def s3_client(bucket_name):
+@pytest_asyncio.fixture
+async def s3_client(bucket_name):
     """Manage a testing S3 client to interact with a testing S3 bucket.
 
     Yields the test S3 client after creating a testing S3 bucket. Upon resuming, we delete
     the contents and the bucket itself.
     """
-    s3_client = create_test_client(
+    async with create_test_client(
         "s3",
         aws_access_key_id="object_storage_root_user",
         aws_secret_access_key="object_storage_root_password",
-    )
+    ) as s3_client:
+        await s3_client.create_bucket(Bucket=bucket_name)
 
-    s3_client.create_bucket(Bucket=bucket_name)
+        yield s3_client
 
-    yield s3_client
+        response = await s3_client.list_objects_v2(Bucket=bucket_name)
 
-    response = s3_client.list_objects_v2(Bucket=bucket_name)
+        if "Contents" in response:
+            for obj in response["Contents"]:
+                if "Key" in obj:
+                    await s3_client.delete_object(Bucket=bucket_name, Key=obj["Key"])
 
-    if "Contents" in response:
-        for obj in response["Contents"]:
-            if "Key" in obj:
-                s3_client.delete_object(Bucket=bucket_name, Key=obj["Key"])
+        await s3_client.delete_bucket(Bucket=bucket_name)
 
-    s3_client.delete_bucket(Bucket=bucket_name)
-
-def assert_events_in_s3(
+async def assert_events_in_s3(
     s3_client, bucket_name, key_prefix, events, compression: str | None = None, exclude_events: list[str] | None = None
 ):
     """Assert provided events written to JSON in key_prefix in S3 bucket_name."""
     # List the objects in the bucket with the prefix.
-    objects = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=key_prefix)
+    objects = await s3_client.list_objects_v2(Bucket=bucket_name, Prefix=key_prefix)
 
     # Check that there is only one object.
     assert len(objects.get("Contents", [])) == 1
@@ -115,8 +117,8 @@ def assert_events_in_s3(
     # Get the object.
     key = objects["Contents"][0].get("Key")
     assert key
-    object = s3_client.get_object(Bucket=bucket_name, Key=key)
-    data = object["Body"].read()
+    s3_object = await s3_client.get_object(Bucket=bucket_name, Key=key)
+    data = await s3_object["Body"].read()
 
     # Check that the data is correct.
     match compression:
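
The fixture and assertion helper above lean on the way aioboto3 hands out clients: `Session.client(...)` returns an async context manager rather than a ready client, so it has to be entered with `async with`, and every S3 call on it is awaited. A minimal, self-contained sketch of that pattern (the MinIO-style endpoint and credentials below are illustrative, not part of this patch):

```python
import asyncio
import functools

import aioboto3

# Illustrative endpoint/credentials, mirroring the MinIO-style test setup above.
create_test_client = functools.partial(
    aioboto3.Session().client,
    endpoint_url="http://localhost:19000",
)


async def main() -> None:
    # Session.client() returns an async context manager, so the client only
    # exists inside the `async with` block, and every API call is awaited.
    async with create_test_client(
        "s3",
        aws_access_key_id="object_storage_root_user",
        aws_secret_access_key="object_storage_root_password",
    ) as s3_client:
        response = await s3_client.list_buckets()
        print([bucket["Name"] for bucket in response.get("Buckets", [])])


asyncio.run(main())
```
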
@@ -306,10 +308,12 @@ async def test_insert_into_s3_activity_puts_data_into_s3(
     with override_settings(
         BATCH_EXPORT_S3_UPLOAD_CHUNK_SIZE_BYTES=5 * 1024**2
     ):  # 5MB, the minimum for Multipart uploads
-        with mock.patch("posthog.temporal.workflows.s3_batch_export.boto3.client", side_effect=create_test_client):
+        with mock.patch(
+            "posthog.temporal.workflows.s3_batch_export.aioboto3.Session.client", side_effect=create_test_client
+        ):
             await activity_environment.run(insert_into_s3_activity, insert_inputs)
 
-    assert_events_in_s3(s3_client, bucket_name, prefix, events, compression, exclude_events)
+    await assert_events_in_s3(s3_client, bucket_name, prefix, events, compression, exclude_events)
 
 
 @pytest.mark.django_db
@@ -436,7 +440,9 @@ async def test_s3_export_workflow_with_minio_bucket(
             activities=[create_export_run, insert_into_s3_activity, update_export_run_status],
             workflow_runner=UnsandboxedWorkflowRunner(),
         ):
-            with mock.patch("posthog.temporal.workflows.s3_batch_export.boto3.client", side_effect=create_test_client):
+            with mock.patch(
+                "posthog.temporal.workflows.s3_batch_export.aioboto3.Session.client", side_effect=create_test_client
+            ):
                 await activity_environment.client.execute_workflow(
                     S3BatchExportWorkflow.run,
                     inputs,
@@ -452,7 +458,7 @@ async def test_s3_export_workflow_with_minio_bucket(
     run = runs[0]
     assert run.status == "Completed"
 
-    assert_events_in_s3(s3_client, bucket_name, prefix, events, compression, exclude_events)
+    await assert_events_in_s3(s3_client, bucket_name, prefix, events, compression, exclude_events)
 
 
 @pytest.mark.skipif(
@@ -581,45 +587,46 @@ async def test_s3_export_workflow_with_s3_bucket(interval, compression, encrypti
         **batch_export.destination.config,
     )
 
-    s3_client = boto3.client("s3")
+    async with aioboto3.Session().client("s3") as s3_client:
 
-    def create_s3_client(*args, **kwargs):
-        """Mock function to return an already initialized S3 client."""
-        return s3_client
+        @contextlib.asynccontextmanager
+        async def create_s3_client(*args, **kwargs):
+            """Mock function to return an already initialized S3 client."""
+            yield s3_client
 
-    async with await WorkflowEnvironment.start_time_skipping() as activity_environment:
-        async with Worker(
-            activity_environment.client,
-            task_queue=settings.TEMPORAL_TASK_QUEUE,
-            workflows=[S3BatchExportWorkflow],
-            activities=[create_export_run, insert_into_s3_activity, update_export_run_status],
-            workflow_runner=UnsandboxedWorkflowRunner(),
-        ):
-            with mock.patch("posthog.temporal.workflows.s3_batch_export.boto3.client", side_effect=create_s3_client):
-                await activity_environment.client.execute_workflow(
-                    S3BatchExportWorkflow.run,
-                    inputs,
-                    id=workflow_id,
-                    task_queue=settings.TEMPORAL_TASK_QUEUE,
-                    retry_policy=RetryPolicy(maximum_attempts=1),
-                    execution_timeout=dt.timedelta(seconds=10),
-                )
+        async with await WorkflowEnvironment.start_time_skipping() as activity_environment:
+            async with Worker(
+                activity_environment.client,
+                task_queue=settings.TEMPORAL_TASK_QUEUE,
+                workflows=[S3BatchExportWorkflow],
+                activities=[create_export_run, insert_into_s3_activity, update_export_run_status],
+                workflow_runner=UnsandboxedWorkflowRunner(),
+            ):
+                with mock.patch(
+                    "posthog.temporal.workflows.s3_batch_export.aioboto3.Session.client", side_effect=create_s3_client
+                ):
+                    await activity_environment.client.execute_workflow(
+                        S3BatchExportWorkflow.run,
+                        inputs,
+                        id=workflow_id,
+                        task_queue=settings.TEMPORAL_TASK_QUEUE,
+                        retry_policy=RetryPolicy(maximum_attempts=1),
+                        execution_timeout=dt.timedelta(seconds=10),
+                    )
 
-        runs = await afetch_batch_export_runs(batch_export_id=batch_export.id)
-        assert len(runs) == 1
+            runs = await afetch_batch_export_runs(batch_export_id=batch_export.id)
+            assert len(runs) == 1
 
-        run = runs[0]
-        assert run.status == "Completed"
+            run = runs[0]
+            assert run.status == "Completed"
 
-    assert_events_in_s3(s3_client, bucket_name, prefix, events, compression, exclude_events)
+        await assert_events_in_s3(s3_client, bucket_name, prefix, events, compression, exclude_events)
 
 
 @pytest.mark.django_db
 @pytest.mark.asyncio
 @pytest.mark.parametrize("compression", [None, "gzip"])
-async def test_s3_export_workflow_with_minio_bucket_and_a_lot_of_data(
-    client: HttpClient, s3_client, bucket_name, compression
-):
+async def test_s3_export_workflow_with_minio_bucket_and_a_lot_of_data(s3_client, bucket_name, compression):
     """Test the full S3 workflow targetting a MinIO bucket.
 
     The workflow should update the batch export run status to completed and produce the expected
@@ -700,7 +707,9 @@ async def test_s3_export_workflow_with_minio_bucket_and_a_lot_of_data(
             activities=[create_export_run, insert_into_s3_activity, update_export_run_status],
             workflow_runner=UnsandboxedWorkflowRunner(),
         ):
-            with mock.patch("posthog.temporal.workflows.s3_batch_export.boto3.client", side_effect=create_test_client):
+            with mock.patch(
+                "posthog.temporal.workflows.s3_batch_export.aioboto3.Session.client", side_effect=create_test_client
+            ):
                 await activity_environment.client.execute_workflow(
                     S3BatchExportWorkflow.run,
                     inputs,
@@ -716,15 +725,15 @@ async def test_s3_export_workflow_with_minio_bucket_and_a_lot_of_data(
     run = runs[0]
     assert run.status == "Completed"
 
-    assert_events_in_s3(s3_client, bucket_name, prefix.format(year=2023, month="04", day="25"), events, compression)
+    await assert_events_in_s3(
+        s3_client, bucket_name, prefix.format(year=2023, month="04", day="25"), events, compression
+    )
 
 
 @pytest.mark.django_db
 @pytest.mark.asyncio
 @pytest.mark.parametrize("compression", [None, "gzip", "brotli"])
-async def test_s3_export_workflow_defaults_to_timestamp_on_null_inserted_at(
-    client: HttpClient, s3_client, bucket_name, compression
-):
+async def test_s3_export_workflow_defaults_to_timestamp_on_null_inserted_at(s3_client, bucket_name, compression):
     """Test the full S3 workflow targetting a MinIO bucket.
 
     In this scenario we assert that when inserted_at is NULL, we default to _timestamp.
@@ -818,7 +827,9 @@ async def test_s3_export_workflow_defaults_to_timestamp_on_null_inserted_at(
             activities=[create_export_run, insert_into_s3_activity, update_export_run_status],
             workflow_runner=UnsandboxedWorkflowRunner(),
         ):
-            with mock.patch("posthog.temporal.workflows.s3_batch_export.boto3.client", side_effect=create_test_client):
+            with mock.patch(
+                "posthog.temporal.workflows.s3_batch_export.aioboto3.Session.client", side_effect=create_test_client
+            ):
                 await activity_environment.client.execute_workflow(
                     S3BatchExportWorkflow.run,
                     inputs,
@@ -834,15 +845,13 @@ async def test_s3_export_workflow_defaults_to_timestamp_on_null_inserted_at(
     run = runs[0]
     assert run.status == "Completed"
 
-    assert_events_in_s3(s3_client, bucket_name, prefix, events, compression)
+    await assert_events_in_s3(s3_client, bucket_name, prefix, events, compression)
 
 
 @pytest.mark.django_db
 @pytest.mark.asyncio
 @pytest.mark.parametrize("compression", [None, "gzip", "brotli"])
-async def test_s3_export_workflow_with_minio_bucket_and_custom_key_prefix(
-    client: HttpClient, s3_client, bucket_name, compression
-):
+async def test_s3_export_workflow_with_minio_bucket_and_custom_key_prefix(s3_client, bucket_name, compression):
     """Test the S3BatchExport Workflow utilizing a custom key prefix.
 
     We will be asserting that exported events land in the appropiate S3 key according to the prefix.
@@ -921,7 +930,9 @@ async def test_s3_export_workflow_with_minio_bucket_and_custom_key_prefix(
             activities=[create_export_run, insert_into_s3_activity, update_export_run_status],
             workflow_runner=UnsandboxedWorkflowRunner(),
         ):
-            with mock.patch("posthog.temporal.workflows.s3_batch_export.boto3.client", side_effect=create_test_client):
+            with mock.patch(
+                "posthog.temporal.workflows.s3_batch_export.aioboto3.Session.client", side_effect=create_test_client
+            ):
                 await activity_environment.client.execute_workflow(
                     S3BatchExportWorkflow.run,
                     inputs,
@@ -940,20 +951,18 @@ async def test_s3_export_workflow_with_minio_bucket_and_custom_key_prefix(
     expected_key_prefix = prefix.format(
         table="events", year="2023", month="04", day="25", hour="14", minute="30", second="00"
     )
-    objects = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=expected_key_prefix)
+    objects = await s3_client.list_objects_v2(Bucket=bucket_name, Prefix=expected_key_prefix)
     key = objects["Contents"][0].get("Key")
     assert len(objects.get("Contents", [])) == 1
     assert key.startswith(expected_key_prefix)
 
-    assert_events_in_s3(s3_client, bucket_name, expected_key_prefix, events, compression)
+    await assert_events_in_s3(s3_client, bucket_name, expected_key_prefix, events, compression)
 
 
 @pytest.mark.django_db
 @pytest.mark.asyncio
 @pytest.mark.parametrize("compression", [None, "gzip", "brotli"])
-async def test_s3_export_workflow_with_minio_bucket_produces_no_duplicates(
-    client: HttpClient, s3_client, bucket_name, compression
-):
+async def test_s3_export_workflow_with_minio_bucket_produces_no_duplicates(s3_client, bucket_name, compression):
     """Test that S3 Export Workflow end-to-end by using a local MinIO bucket instead of S3.
 
     In this particular instance of the test, we assert no duplicates are exported to S3.
@@ -1065,7 +1074,9 @@ async def test_s3_export_workflow_with_minio_bucket_produces_no_duplicates(
             activities=[create_export_run, insert_into_s3_activity, update_export_run_status],
             workflow_runner=UnsandboxedWorkflowRunner(),
         ):
-            with mock.patch("posthog.temporal.workflows.s3_batch_export.boto3.client", side_effect=create_test_client):
+            with mock.patch(
+                "posthog.temporal.workflows.s3_batch_export.aioboto3.Session.client", side_effect=create_test_client
+            ):
                 await activity_environment.client.execute_workflow(
                     S3BatchExportWorkflow.run,
                     inputs,
@@ -1080,7 +1091,7 @@ async def test_s3_export_workflow_with_minio_bucket_produces_no_duplicates(
     run = runs[0]
     assert run.status == "Completed"
 
-    assert_events_in_s3(s3_client, bucket_name, prefix, events, compression)
+    await assert_events_in_s3(s3_client, bucket_name, prefix, events, compression)
 
 
 @pytest_asyncio.fixture
@@ -1537,9 +1548,11 @@ def assert_heartbeat_details(*details):
     )
 
     with override_settings(BATCH_EXPORT_S3_UPLOAD_CHUNK_SIZE_BYTES=5 * 1024**2):
-        with mock.patch("posthog.temporal.workflows.s3_batch_export.boto3.client", side_effect=create_test_client):
+        with mock.patch(
+            "posthog.temporal.workflows.s3_batch_export.aioboto3.Session.client", side_effect=create_test_client
+        ):
             await activity_environment.run(insert_into_s3_activity, insert_inputs)
 
     # This checks that the assert_heartbeat_details function was actually called
     assert current_part_number > 1
 
-    assert_events_in_s3(s3_client, bucket_name, prefix, events, None, None)
+    await assert_events_in_s3(s3_client, bucket_name, prefix, events, None, None)
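
Every test above replaces `mock.patch("...boto3.client", ...)` with `mock.patch("...aioboto3.Session.client", ...)`, so the patched `side_effect` must now return something that can be entered with `async with` — which both `create_test_client` and the `asynccontextmanager`-wrapped `create_s3_client` satisfy. A reduced sketch of that pattern, with a hypothetical `code_under_test` coroutine and a fake client standing in for the real workflow code:

```python
import contextlib
from unittest import mock

import aioboto3
import pytest


class FakeS3Client:
    """Stand-in client so this sketch runs without any S3 endpoint."""

    async def list_buckets(self):
        return {"Buckets": [{"Name": "patched-bucket"}]}


@contextlib.asynccontextmanager
async def create_fake_client(*args, **kwargs):
    # Mirrors create_s3_client above: yield an already-initialized client.
    yield FakeS3Client()


async def code_under_test():
    # Stands in for production code that builds its client via aioboto3.
    async with aioboto3.Session().client("s3") as s3_client:
        return await s3_client.list_buckets()


@pytest.mark.asyncio
async def test_patched_client():
    # The mock's side_effect returns our async context manager, so the
    # production `async with` block receives the fake client instead.
    with mock.patch("aioboto3.Session.client", side_effect=create_fake_client):
        response = await code_under_test()

    assert response["Buckets"][0]["Name"] == "patched-bucket"
```
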
diff --git a/posthog/temporal/workflows/batch_exports.py b/posthog/temporal/workflows/batch_exports.py
index 7b9abbe100808..c451a8a036e87 100644
--- a/posthog/temporal/workflows/batch_exports.py
+++ b/posthog/temporal/workflows/batch_exports.py
@@ -295,6 +295,9 @@ def __exit__(self, exc, value, tb):
         """Context-manager protocol exit method."""
         return self._file.__exit__(exc, value, tb)
 
+    def __iter__(self):
+        yield from self._file
+
     @property
     def brotli_compressor(self):
         if self._brotli_compressor is None:
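
The `__iter__` added above simply delegates iteration to the wrapped file, so the temporary export file can be consumed line by line like any other file object. A stripped-down illustration of that delegation, using a stand-in class rather than the real `BatchExportTemporaryFile`:

```python
import tempfile


class FileWrapper:
    """Minimal stand-in for a wrapper like BatchExportTemporaryFile."""

    def __init__(self):
        self._file = tempfile.NamedTemporaryFile("w+b")

    def write(self, data: bytes) -> int:
        return self._file.write(data)

    def rewind(self) -> None:
        self._file.seek(0)

    def __iter__(self):
        # Delegate iteration to the underlying file, yielding one line at a time.
        yield from self._file


wrapper = FileWrapper()
wrapper.write(b"first line\nsecond line\n")
wrapper.rewind()

for line in wrapper:
    print(line)
```
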
diff --git a/posthog/temporal/workflows/s3_batch_export.py b/posthog/temporal/workflows/s3_batch_export.py
index 4252614a0263f..14568c44541f2 100644
--- a/posthog/temporal/workflows/s3_batch_export.py
+++ b/posthog/temporal/workflows/s3_batch_export.py
@@ -1,11 +1,13 @@
 import asyncio
+import contextlib
 import datetime as dt
+import io
 import json
 import posixpath
 import typing
 from dataclasses import dataclass
 
-import boto3
+import aioboto3
 from django.conf import settings
 from temporalio import activity, exceptions, workflow
 from temporalio.common import RetryPolicy
@@ -90,8 +92,20 @@ class S3MultiPartUploadState(typing.NamedTuple):
 class S3MultiPartUpload:
     """An S3 multi-part upload."""
 
-    def __init__(self, s3_client, bucket_name: str, key: str, encryption: str | None, kms_key_id: str | None):
-        self.s3_client = s3_client
+    def __init__(
+        self,
+        region_name: str,
+        bucket_name: str,
+        key: str,
+        encryption: str | None,
+        kms_key_id: str | None,
+        aws_access_key_id: str | None = None,
+        aws_secret_access_key: str | None = None,
+    ):
+        self._session = aioboto3.Session()
+        self.region_name = region_name
+        self.aws_access_key_id = aws_access_key_id
+        self.aws_secret_access_key = aws_secret_access_key
         self.bucket_name = bucket_name
         self.key = key
         self.encryption = encryption
@@ -118,7 +132,17 @@ def is_upload_in_progress(self) -> bool:
             return False
         return True
 
-    def start(self) -> str:
+    @contextlib.asynccontextmanager
+    async def s3_client(self):
+        async with self._session.client(
+            "s3",
+            region_name=self.region_name,
+            aws_access_key_id=self.aws_access_key_id,
+            aws_secret_access_key=self.aws_secret_access_key,
+        ) as client:
+            yield client
+
+    async def start(self) -> str:
         """Start this S3MultiPartUpload."""
         if self.is_upload_in_progress() is True:
             raise UploadAlreadyInProgressError(self.upload_id)
@@ -129,11 +153,13 @@ def start(self) -> str:
         if self.kms_key_id:
             optional_kwargs["SSEKMSKeyId"] = self.kms_key_id
 
-        multipart_response = self.s3_client.create_multipart_upload(
-            Bucket=self.bucket_name,
-            Key=self.key,
-            **optional_kwargs,
-        )
+        async with self.s3_client() as s3_client:
+            multipart_response = await s3_client.create_multipart_upload(
+                Bucket=self.bucket_name,
+                Key=self.key,
+                **optional_kwargs,
+            )
+
         upload_id: str = multipart_response["UploadId"]
 
         self.upload_id = upload_id
@@ -146,66 +172,72 @@ def continue_from_state(self, state: S3MultiPartUploadState):
 
         return self.upload_id
 
-    def complete(self) -> str:
+    async def complete(self) -> str:
         if self.is_upload_in_progress() is False:
             raise NoUploadInProgressError()
 
-        response = self.s3_client.complete_multipart_upload(
-            Bucket=self.bucket_name,
-            Key=self.key,
-            UploadId=self.upload_id,
-            MultipartUpload={"Parts": self.parts},
-        )
+        async with self.s3_client() as s3_client:
+            response = await s3_client.complete_multipart_upload(
+                Bucket=self.bucket_name, Key=self.key, UploadId=self.upload_id, MultipartUpload={"Parts": self.parts}
+            )
 
         self.upload_id = None
         self.parts = []
 
         return response["Location"]
 
-    def abort(self):
+    async def abort(self):
         if self.is_upload_in_progress() is False:
             raise NoUploadInProgressError()
 
-        self.s3_client.abort_multipart_upload(
-            Bucket=self.bucket_name,
-            Key=self.key,
-            UploadId=self.upload_id,
-        )
+        async with self.s3_client() as s3_client:
+            await s3_client.abort_multipart_upload(
+                Bucket=self.bucket_name,
+                Key=self.key,
+                UploadId=self.upload_id,
+            )
 
         self.upload_id = None
         self.parts = []
 
-    def upload_part(self, body: BatchExportTemporaryFile, rewind: bool = True):
+    async def upload_part(self, body: BatchExportTemporaryFile, rewind: bool = True):
         next_part_number = self.part_number + 1
 
         if rewind is True:
             body.rewind()
 
-        response = self.s3_client.upload_part(
-            Bucket=self.bucket_name,
-            Key=self.key,
-            PartNumber=next_part_number,
-            UploadId=self.upload_id,
-            Body=body,
-        )
+        # aiohttp is not duck-type friendly and requires a io.IOBase
+        # We comply with the file-like interface of io.IOBase.
+        # So we tell mypy to be nice with us.
+        reader = io.BufferedReader(body)  # type: ignore
+
+        async with self.s3_client() as s3_client:
+            response = await s3_client.upload_part(
+                Bucket=self.bucket_name,
+                Key=self.key,
+                PartNumber=next_part_number,
+                UploadId=self.upload_id,
+                Body=reader,
+            )
+        reader.detach()  # BufferedReader closes the file otherwise.
 
         self.parts.append({"PartNumber": next_part_number, "ETag": response["ETag"]})
 
-    def __enter__(self):
+    async def __aenter__(self):
         if not self.is_upload_in_progress():
-            self.start()
+            await self.start()
 
         return self
 
-    def __exit__(self, exc_type, exc_value, traceback) -> bool:
+    async def __aexit__(self, exc_type, exc_value, traceback) -> bool:
         if exc_value is None:
             # Succesfully completed the upload
-            self.complete()
+            await self.complete()
             return True
 
         if exc_type == asyncio.CancelledError:
             # Ensure we clean-up the cancelled upload.
-            self.abort()
+            await self.abort()
 
         return False
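
The rewritten `upload_part` wraps the export file in `io.BufferedReader` so aiohttp accepts it as an upload body, then detaches the buffer afterwards so the underlying file is not closed along with the reader. A small standalone illustration of that wrap-and-detach step, with a plain temporary file standing in for `BatchExportTemporaryFile`:

```python
import io
import tempfile

# A plain binary temp file stands in for BatchExportTemporaryFile here.
local_file = tempfile.TemporaryFile("w+b")
local_file.write(b'{"event": "test"}\n')
local_file.seek(0)

# Wrap the file-like object so libraries that insist on an io.IOBase
# subclass (as aiohttp does for request bodies) will accept it.
reader = io.BufferedReader(local_file)  # type: ignore[arg-type]
payload = reader.read()

# Detach before discarding the reader; closing a BufferedReader would
# otherwise close the underlying file, which we still want to reuse.
reader.detach()

local_file.seek(0)
assert local_file.read() == payload
print(payload)
```
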
@@ -249,17 +281,20 @@ class S3InsertInputs:
     kms_key_id: str | None = None
 
 
-def initialize_and_resume_multipart_upload(inputs: S3InsertInputs) -> tuple[S3MultiPartUpload, str]:
+async def initialize_and_resume_multipart_upload(inputs: S3InsertInputs) -> tuple[S3MultiPartUpload, str]:
     """Initialize a S3MultiPartUpload and resume it from a hearbeat state if available."""
     logger = get_batch_exports_logger(inputs=inputs)
     key = get_s3_key(inputs)
-    s3_client = boto3.client(
-        "s3",
+
+    s3_upload = S3MultiPartUpload(
+        bucket_name=inputs.bucket_name,
+        key=key,
+        encryption=inputs.encryption,
+        kms_key_id=inputs.kms_key_id,
         region_name=inputs.region,
         aws_access_key_id=inputs.aws_access_key_id,
         aws_secret_access_key=inputs.aws_secret_access_key,
     )
-    s3_upload = S3MultiPartUpload(s3_client, inputs.bucket_name, key, inputs.encryption, inputs.kms_key_id)
 
     details = activity.info().heartbeat_details
 
@@ -291,7 +326,7 @@ def initialize_and_resume_multipart_upload(inputs: S3Mu
             logger.info(
                 f"Export will start from the beginning as we are using brotli compression: {interval_start}",
             )
-            s3_upload.abort()
+            await s3_upload.abort()
 
     return s3_upload, interval_start
 
@@ -335,7 +370,7 @@ async def insert_into_s3_activity(inputs: S3InsertInputs):
 
     logger.info("BatchExporting %s rows to S3", count)
 
-    s3_upload, interval_start = initialize_and_resume_multipart_upload(inputs)
+    s3_upload, interval_start = await initialize_and_resume_multipart_upload(inputs)
 
     # Iterate through chunks of results from ClickHouse and push them to S3
     # as a multipart upload. The intention here is to keep memory usage low,
@@ -364,7 +399,7 @@ async def worker_shutdown_handler():
 
     asyncio.create_task(worker_shutdown_handler())
 
-    with s3_upload as s3_upload:
+    async with s3_upload as s3_upload:
         with BatchExportTemporaryFile(compression=inputs.compression) as local_results_file:
             for result in results_iterator:
                 record = {
@@ -390,7 +425,7 @@ async def worker_shutdown_handler():
                         local_results_file.bytes_since_last_reset,
                     )
 
-                    s3_upload.upload_part(local_results_file)
+                    await s3_upload.upload_part(local_results_file)
                     last_uploaded_part_timestamp = result["inserted_at"]
                     activity.heartbeat(last_uploaded_part_timestamp, s3_upload.to_state())
 
@@ -405,7 +440,7 @@ async def worker_shutdown_handler():
                     local_results_file.bytes_since_last_reset,
                 )
 
-                s3_upload.upload_part(local_results_file)
+                await s3_upload.upload_part(local_results_file)
                 last_uploaded_part_timestamp = result["inserted_at"]
                 activity.heartbeat(last_uploaded_part_timestamp, s3_upload.to_state())
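
With the activity changes above, the upload is driven through `async with s3_upload`: entering starts (or resumes) the multipart upload, a clean exit completes it, and an `asyncio.CancelledError` — for example on worker shutdown — aborts it. A toy async context manager sketching only that lifecycle (names here are illustrative; the real `S3MultiPartUpload` above also tracks parts, heartbeats, and resume state):

```python
import asyncio


class ToyMultiPartUpload:
    """Illustrative stand-in for the async lifecycle used by S3MultiPartUpload."""

    def __init__(self) -> None:
        self.parts: list[bytes] = []

    async def start(self) -> None:
        print("create_multipart_upload")

    async def upload_part(self, body: bytes) -> None:
        self.parts.append(body)
        print(f"upload_part #{len(self.parts)}")

    async def complete(self) -> None:
        print(f"complete_multipart_upload with {len(self.parts)} parts")

    async def abort(self) -> None:
        print("abort_multipart_upload")

    async def __aenter__(self) -> "ToyMultiPartUpload":
        await self.start()
        return self

    async def __aexit__(self, exc_type, exc_value, traceback) -> bool:
        if exc_value is None:
            # Clean exit: finish the upload.
            await self.complete()
            return True
        if exc_type == asyncio.CancelledError:
            # Cancellation (e.g. worker shutdown): clean up the in-flight upload.
            await self.abort()
        return False


async def main() -> None:
    async with ToyMultiPartUpload() as upload:
        for chunk in (b"part-1", b"part-2"):
            await upload.upload_part(chunk)


asyncio.run(main())
```
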
diff --git a/requirements.in b/requirements.in
index 62f9b7a8a6f1c..673f7e045a34e 100644
--- a/requirements.in
+++ b/requirements.in
@@ -5,9 +5,10 @@
 # - `pip-compile --rebuild requirements-dev.in`
 #
 aiohttp>=3.8.4
+aioboto3==11.1
 antlr4-python3-runtime==4.13.0
 amqp==5.1.1
-boto3==1.26.66
+boto3==1.26.76
 boto3-stubs[s3]
 brotli==1.1.0
 celery==5.3.4
diff --git a/requirements.txt b/requirements.txt
index 403e3f597cf12..beb9261497bb4 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,11 +4,18 @@
 #
 #    pip-compile requirements.in
 #
+aioboto3==11.1
+    # via -r requirements.in
+aiobotocore[boto3]==2.5.0
+    # via aioboto3
 aiohttp==3.8.5
     # via
     #   -r requirements.in
+    #   aiobotocore
     #   geoip2
     #   openai
+aioitertools==0.11.0
+    # via aiobotocore
 aiosignal==1.2.0
     # via aiohttp
 amqp==5.1.1
@@ -43,12 +50,15 @@ backoff==2.2.1
     # via posthoganalytics
 billiard==4.1.0
     # via celery
-boto3==1.26.66
-    # via -r requirements.in
+boto3==1.26.76
+    # via
+    #   -r requirements.in
+    #   aiobotocore
 boto3-stubs[s3]==1.26.138
     # via -r requirements.in
-botocore==1.29.66
+botocore==1.29.76
     # via
+    #   aiobotocore
     #   boto3
     #   s3transfer
 botocore-stubs==1.29.130
@@ -539,6 +549,8 @@ webdriver-manager==4.0.1
     # via -r requirements.in
 whitenoise==6.5.0
     # via -r requirements.in
+wrapt==1.15.0
+    # via aiobotocore
 wsproto==1.1.0
     # via trio-websocket
 xmlsec==1.3.13