Fix sync with duplicate display_name in dynamic folders mode
const-cloudinary committed Jan 1, 2024
1 parent 7b3f392 commit c756f56
Showing 6 changed files with 186 additions and 18 deletions.
87 changes: 82 additions & 5 deletions cloudinary_cli/modules/sync.py
@@ -1,4 +1,5 @@
import logging
import re
from collections import Counter
from itertools import groupby
from os import path, remove
@@ -7,11 +8,12 @@
from cloudinary import api

from cloudinary_cli.utils.api_utils import query_cld_folder, upload_file, download_file, get_folder_mode, \
-get_default_upload_options, get_destination_folder_options
+get_default_upload_options, get_destination_folder_options, cld_folder_exists
-from cloudinary_cli.utils.file_utils import walk_dir, delete_empty_dirs, normalize_file_extension, posix_rel_path
+from cloudinary_cli.utils.file_utils import (walk_dir, delete_empty_dirs, normalize_file_extension, posix_rel_path,
+populate_duplicate_name)
from cloudinary_cli.utils.json_utils import print_json, read_json_from_file, write_json_to_file
from cloudinary_cli.utils.utils import logger, run_tasks_concurrently, get_user_action, invert_dict, chunker, \
-group_params, parse_option_value
+group_params, parse_option_value, duplicate_values

_DEFAULT_DELETION_BATCH_SIZE = 30
_DEFAULT_CONCURRENT_WORKERS = 30
@@ -43,6 +45,10 @@ def sync(local_folder, cloudinary_folder, push, pull, include_hidden, concurrent
if push == pull:
raise UsageError("Please use either the '--push' OR '--pull' options")

if pull and not cld_folder_exists(cloudinary_folder):
logger.error(f"Cloudinary folder '{cloudinary_folder}' does not exist. Aborting...")
return False

sync_dir = SyncDir(local_folder, cloudinary_folder, include_hidden, concurrent_workers, force, keep_unique,
deletion_batch_size, folder_mode, optional_parameter, optional_parameter_parsed)

@@ -81,9 +87,12 @@ def __init__(self, local_dir, remote_dir, include_hidden, concurrent_workers, fo
self.local_files = walk_dir(path.abspath(self.local_dir), include_hidden)
logger.info(f"Found {len(self.local_files)} items in local folder '{local_dir}'")

-self.remote_files = query_cld_folder(self.remote_dir, self.folder_mode)
-logger.info(f"Found {len(self.remote_files)} items in Cloudinary folder '{self.user_friendly_remote_dir}' "
+raw_remote_files = query_cld_folder(self.remote_dir, self.folder_mode)
+logger.info(f"Found {len(raw_remote_files)} items in Cloudinary folder '{self.user_friendly_remote_dir}' "
f"({self.folder_mode} folder mode)")
+self.remote_files = self._normalize_remote_file_names(raw_remote_files, self.local_files)
+self.remote_duplicate_names = duplicate_values(self.remote_files, "normalized_path", "asset_id")
+self._print_duplicate_file_names()

local_file_names = self.local_files.keys()
remote_file_names = self.remote_files.keys()
@@ -94,10 +103,14 @@ def __init__(self, local_dir, remote_dir, include_hidden, concurrent_workers, fo
Usually Cloudinary sanitizes such file names and strips invalid characters. Although that is a good best effort
for the general use case, it is not the best option when syncing a local folder with Cloudinary, since the
directories would always be out of sync.
In addition, in dynamic folder mode Cloudinary allows identical display names for different files.
To overcome these limitations, cloudinary-cli keeps a hidden .cld-sync file in the sync directory that contains a
mapping of the diverse file names. This file keeps track of the files and allows syncing in both directions.
"""

# handle fixed folder mode public_id differences
diverse_file_names = read_json_from_file(self.sync_meta_file, does_not_exist_ok=True)
self.diverse_file_names = dict(
(normalize_file_extension(k), normalize_file_extension(v)) for k, v in diverse_file_names.items())
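
For illustration only (hypothetical contents, not part of the commit), the hidden .cld-sync file read back via read_json_from_file above is a plain JSON object mapping one normalized form of a file name to the other, along the lines of:

{
    "photos/my photo (home).jpg": "photos/my_photo_home.jpg"
}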
@@ -189,6 +202,70 @@ def pull(self):
if download_errors:
raise Exception("Sync did not finish successfully")

def _normalize_remote_file_names(self, remote_files, local_files):
"""
When multiple remote files have duplicate display name, we save them locally by appending index at the end
of the base name, e.g. Image (1).jpg, Image (2).jpg, etc.
For consistency, we sort files by `created_at` date.
For partially synced files, when a remote file in the middle was deleted, we want to avoid resync
of the remaining files.
For example, if we had: Image (1), Image (2),..., Image(5), Image (10) on Cloudinary.
If we delete "Image (2)" and resync - that would cause all files from Image (3) to Image (10) to be resynced.
(Image (3) would become Image (2), ... Image (10) -> Image (9))
Instead, since those indexes are arbitrary, we map local files to the remote files by etag (md5sum).
Synced files will keep their indexes, out-of-sync files will be synced.
:param remote_files: Remote files.
:param local_files: Local files.
:return:
"""
duplicate_ids = duplicate_values(remote_files, "normalized_path")
for duplicate_name, asset_ids in duplicate_ids.items():
duplicate_dts = sorted([remote_files[asset_id] for asset_id in asset_ids], key=lambda f: f['created_at'])
local_candidates = self._local_candidates(duplicate_name)
remaining_duplicate_dts = []
for duplicate_dt in duplicate_dts:
matched_name = next((f for f in local_candidates.keys() if local_candidates[f] == duplicate_dt["etag"]),
None)
if matched_name is None:
remaining_duplicate_dts.append(duplicate_dt)
continue
# found local synced file.
remote_files[duplicate_dt["asset_id"]]["normalized_unique_path"] = matched_name
local_candidates.pop(matched_name)

unique_paths = {v["normalized_unique_path"] for v in remote_files.values()}
curr_index = 0
for dup in remaining_duplicate_dts:
# Here we check for collisions with other existing files.
# A remote folder can legitimately contain both "Image.jpg" and "Image (1).jpg"; skip those names.
candidate_path = populate_duplicate_name(dup['normalized_path'], curr_index)
while candidate_path in unique_paths:
curr_index += 1
candidate_path = populate_duplicate_name(dup['normalized_path'], curr_index)
remote_files[dup["asset_id"]]["normalized_unique_path"] = candidate_path
curr_index += 1

return {dt["normalized_unique_path"]: dt for dt in remote_files.values()}

def _local_candidates(self, candidate_path):
filename, extension = path.splitext(candidate_path)
r = re.compile(rf"({re.escape(candidate_path)}|{re.escape(filename)} \(\d+\){re.escape(extension)})")
# sort local files by base name (without extension) for accurate results.
return dict(sorted({f: self.local_files[f]["etag"] for f in filter(r.match, self.local_files.keys())}.items(),
key=lambda f: path.splitext(f[0])[0]))

def _print_duplicate_file_names(self):
if self.remote_duplicate_names:
logger.warning(f"Cloudinary folder '{self.user_friendly_remote_dir}' "
f"contains {len(self.remote_duplicate_names)} duplicate asset names")
for normalized_path, asset_ids in self.remote_duplicate_names.items():
logger.debug(f"Duplicate name: '{normalized_path}', asset ids: {', '.join(asset_ids)}")

def _print_sync_status(self, success, errors):
logger.info("==Sync Status==")
logger.info("===============")
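
To make the renaming logic easier to follow, here is a minimal, self-contained sketch of the collision handling in _normalize_remote_file_names (simplified for illustration, not the commit's code; assign_unique_paths is a hypothetical helper, and etag-matched files are assumed to have been removed from the input already):

from os import path

def populate_duplicate_name(filename, index=0):
    # Mirrors the helper added in file_utils.py below: "Image.jpg" -> "Image (2).jpg".
    base, ext = path.splitext(filename)
    return f"{base} ({index}){ext}" if index else filename

def assign_unique_paths(duplicates, taken):
    # duplicates: normalized paths of remote assets that still need a local name.
    # taken: paths already in use by synced files; indexes never shift once assigned.
    result, index = [], 0
    for name in duplicates:
        candidate = populate_duplicate_name(name, index)
        while candidate in taken:
            index += 1
            candidate = populate_duplicate_name(name, index)
        result.append(candidate)
        taken.add(candidate)
        index += 1
    return result

print(assign_unique_paths(["Image.jpg"] * 3, {"Image (1).jpg"}))
# -> ['Image.jpg', 'Image (2).jpg', 'Image (3).jpg']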
29 changes: 22 additions & 7 deletions cloudinary_cli/utils/api_utils.py
@@ -3,15 +3,16 @@

import requests
from click import style, launch
-from cloudinary import Search, uploader, api
+from cloudinary import Search, SearchFolders, uploader, api
from cloudinary.utils import cloudinary_url

from cloudinary_cli.defaults import logger
from cloudinary_cli.utils.config_utils import is_valid_cloudinary_config
-from cloudinary_cli.utils.file_utils import normalize_file_extension, posix_rel_path, get_destination_folder
+from cloudinary_cli.utils.file_utils import (normalize_file_extension, posix_rel_path, get_destination_folder,
+populate_duplicate_name)
from cloudinary_cli.utils.json_utils import print_json, write_json_to_file
from cloudinary_cli.utils.utils import log_exception, confirm_action, get_command_params, merge_responses, \
-normalize_list_params, ConfigurationError, print_api_help
+normalize_list_params, ConfigurationError, print_api_help, duplicate_values

PAGINATION_MAX_RESULTS = 500

@@ -34,14 +35,19 @@ def query_cld_folder(folder, folder_mode):
rel_path = _relative_path(asset, folder)
rel_display_path = _relative_display_path(asset, folder)
path_key = rel_display_path if folder_mode == "dynamic" else rel_path
-files[normalize_file_extension(path_key)] = {
+normalized_path_key = normalize_file_extension(path_key)
+files[asset["asset_id"]] = {
"asset_id": asset['asset_id'],
"normalized_path": normalized_path_key,
"normalized_unique_path": normalized_path_key,
"type": asset['type'],
"resource_type": asset['resource_type'],
"public_id": asset['public_id'],
"format": asset['format'],
"etag": asset.get('etag', '0'),
"relative_path": rel_path, # save for inner use
"access_mode": asset.get('access_mode', 'public'),
"created_at": asset.get('created_at'),
# dynamic folder mode fields
"asset_folder": asset.get('asset_folder'),
"display_name": asset.get('display_name'),
Expand All @@ -53,6 +59,15 @@ def query_cld_folder(folder, folder_mode):

return files

def cld_folder_exists(folder):
folder = folder.strip('/')  # strip leading and trailing slashes for the query

if not folder:
return True # root folder

res = SearchFolders().expression(f"name=\"{folder}\"").execute()

return res.get("total_count", 0) > 0

def _display_path(asset):
if asset.get("display_name") is None:
@@ -80,9 +95,9 @@ def regen_derived_version(public_id, delivery_type, res_type,
eager_trans, eager_async,
eager_notification_url):
options = {"type": delivery_type, "resource_type": res_type,
"eager": eager_trans, "eager_async": eager_async,
"eager_notification_url": eager_notification_url,
"overwrite": True, "invalidate": True}
try:
exp_res = uploader.explicit(public_id, **options)
derived_url = f'{exp_res.get("eager")[0].get("secure_url")}'
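
A quick usage sketch of the new guard (hypothetical folder name; assumes the CLI is configured with valid Cloudinary credentials):

from cloudinary_cli.utils.api_utils import cld_folder_exists

cld_folder_exists("")               # True - the root folder always exists, no API call made
cld_folder_exists("products/2024")  # True only if the folders search returns a match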
13 changes: 13 additions & 0 deletions cloudinary_cli/utils/file_utils.py
@@ -129,6 +129,19 @@ def normalize_file_extension(filename: str) -> str:

return ".".join([p for p in [filename, extension_alias] if p])

def populate_duplicate_name(filename, index=0):
"""
Adds an index to the file name in order to avoid duplicates.
:param filename: The file name to modify.
:param index: The desired index.
:return: The modified file name (unchanged when index is 0).
"""
filename, extension = os.path.splitext(filename)
if index != 0:
filename = f"{filename} ({index})"

return ".".join([p for p in [filename, extension[1:]] if p])

def posix_rel_path(end, start) -> str:
"""
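
A few example inputs and outputs of populate_duplicate_name, derived from the code above (not part of the commit):

populate_duplicate_name("Image.jpg")      # -> "Image.jpg"   (index 0 keeps the name)
populate_duplicate_name("Image.jpg", 2)   # -> "Image (2).jpg"
populate_duplicate_name("README", 1)      # -> "README (1)"  (extensionless names work too)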
16 changes: 16 additions & 0 deletions cloudinary_cli/utils/utils.py
@@ -333,3 +333,19 @@ def chunker(seq, size):
:return: a single chunk
"""
return (seq[pos:pos + size] for pos in range(0, len(seq), size))


def duplicate_values(items, value_key, key_of_interest=None):
"""
Finds duplicate values in a dictionary of objects.
:param items: All items.
:param value_key: The duplicate value key to search.
:param key_of_interest: The key whose values are collected in the result (defaults to the item key).
:return: A dict mapping each duplicate value to the set of matching keys.
"""
rev_multidict = {}
for key, value in items.items():
rev_multidict.setdefault(value[value_key], set()).add(value[key_of_interest] if key_of_interest is not None else key)

return {key: values for key, values in rev_multidict.items() if len(values) > 1}
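
For example, with hypothetical data shaped like query_cld_folder()'s output, sync.py uses it to find remote assets that share a display name:

items = {
    "a1": {"normalized_path": "Image.jpg", "asset_id": "a1"},
    "a2": {"normalized_path": "Image.jpg", "asset_id": "a2"},
    "a3": {"normalized_path": "Other.jpg", "asset_id": "a3"},
}
duplicate_values(items, "normalized_path", "asset_id")  # -> {"Image.jpg": {"a1", "a2"}}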
11 changes: 9 additions & 2 deletions test/helper_test.py
@@ -7,6 +7,7 @@

import cloudinary.api
from cloudinary import logger
from cloudinary_cli.utils.api_utils import query_cld_folder
from urllib3 import HTTPResponse, disable_warnings
from urllib3._collections import HTTPHeaderDict

@@ -116,8 +117,14 @@ def retry_func(*args, **kwargs):
return retry_decorator


-def delete_cld_folder_if_exists(folder):
-cloudinary.api.delete_resources_by_prefix(folder)
+def delete_cld_folder_if_exists(folder, folder_mode="fixed"):
+if folder_mode == "fixed":
+cloudinary.api.delete_resources_by_prefix(folder)
+else:
+assets = query_cld_folder(folder, folder_mode)
+if assets:
+cloudinary.api.delete_resources([f["public_id"] for f in assets.values()])

try:
cloudinary.api.delete_folder(folder)
except cloudinary.exceptions.NotFound:
48 changes: 44 additions & 4 deletions test/test_modules/test_cli_sync.py
@@ -10,6 +10,7 @@
from test.helper_test import unique_suffix, RESOURCES_DIR, TEST_FILES_DIR, delete_cld_folder_if_exists, retry_assertion, \
get_request_url, get_params, URLLIB3_REQUEST
from test.test_modules.test_cli_upload_dir import UPLOAD_MOCK_RESPONSE
from cloudinary_cli.utils.api_utils import get_folder_mode


class TestCLISync(unittest.TestCase):
@@ -19,14 +20,19 @@ class TestCLISync(unittest.TestCase):
LOCAL_SYNC_PULL_DIR = str(Path.joinpath(RESOURCES_DIR, unique_suffix("test_sync_pull")))
CLD_SYNC_DIR = unique_suffix("test_sync")

DUPLICATE_NAME = unique_suffix("duplicate_name")

GRACE_PERIOD = 3 # seconds

folder_mode = "fixed"

def setUp(self) -> None:
-delete_cld_folder_if_exists(self.CLD_SYNC_DIR)
+self.folder_mode = get_folder_mode()
+delete_cld_folder_if_exists(self.CLD_SYNC_DIR, self.folder_mode)
time.sleep(1)

def tearDown(self) -> None:
-delete_cld_folder_if_exists(self.CLD_SYNC_DIR)
+delete_cld_folder_if_exists(self.CLD_SYNC_DIR, self.folder_mode)
time.sleep(1)
shutil.rmtree(self.LOCAL_SYNC_PULL_DIR, ignore_errors=True)

@@ -81,6 +87,14 @@ def test_cli_sync_pull(self):
self.assertIn("Synced | 12", result.output)
self.assertIn("Done!", result.output)


def test_cli_sync_pull_non_existing_folder(self):
non_existing_dir = self.CLD_SYNC_DIR + "non_existing"
result = self.runner.invoke(cli, ['sync', '--pull', self.LOCAL_SYNC_PULL_DIR, non_existing_dir])

self.assertIn(f"error: Cloudinary folder '{non_existing_dir}' does not exist.", result.output)
self.assertIn("Aborting...", result.output)

@retry_assertion
def test_cli_sync_pull_twice(self):
self._upload_sync_files(TEST_FILES_DIR)
@@ -119,8 +133,10 @@ def test_cli_sync_pull_out_of_sync(self):
self.assertIn("Synced | 11", result.output)
self.assertIn("Done!", result.output)

-def _upload_sync_files(self, dir):
-result = self.runner.invoke(cli, ['sync', '--push', '-F', dir, self.CLD_SYNC_DIR])
+def _upload_sync_files(self, dir, optional_params=None):
+if optional_params is None:
+optional_params = []
+result = self.runner.invoke(cli, ['sync', '--push', '-F', dir, self.CLD_SYNC_DIR] + optional_params)

self.assertEqual(0, result.exit_code)
self.assertIn("Synced | 12", result.output)
@@ -137,3 +153,27 @@ def test_sync_override_defaults(self, mocker):

self.assertIn("raw/upload", get_request_url(mocker))
self.assertTrue(get_params(mocker)['unique_filename'])


@unittest.skipUnless(get_folder_mode() == "dynamic", "requires dynamic folder mode")
@retry_assertion
def test_cli_sync_duplicate_file_names_dynamic_folder_mode(self):
self._upload_sync_files(TEST_FILES_DIR, ['-o', 'display_name', self.DUPLICATE_NAME])

# wait for indexing to be updated
time.sleep(self.GRACE_PERIOD)

result = self.runner.invoke(cli, ['sync', '--pull', '-F', self.LOCAL_SYNC_PULL_DIR, self.CLD_SYNC_DIR])

self.assertEqual(0, result.exit_code)
self.assertIn("Found 0 items in local folder", result.output)
self.assertIn("Downloading 12 files", result.output)
for index in range(1, 6):
self.assertIn(f"{self.DUPLICATE_NAME} ({index})", result.output)
self.assertIn("Done!", result.output)

result = self.runner.invoke(cli, ['sync', '--push', '-F', self.LOCAL_SYNC_PULL_DIR, self.CLD_SYNC_DIR])

self.assertEqual(0, result.exit_code)
self.assertIn("Skipping 12 items", result.output)
self.assertIn("Done!", result.output)
