Skip to content

Commit

Permalink
V03 issue745 Improve poll cod deriveKey & refactor to remove code dup…
Browse files Browse the repository at this point in the history
…lication (#746)

* refactor deriveKey into parent class. change key for cod #745

* update unit tests to reflect refactor and deriveKey change. #745
  • Loading branch information
petersilva authored Aug 14, 2023
1 parent 59dbc7d commit 5afe909
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 115 deletions.
4 changes: 2 additions & 2 deletions sarracenia/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,9 @@ def __init__(self, cfg=None):
# open cache, get masks.
if self.o.nodupe_ttl > 0:
if self.o.nodupe_driver.lower() == "redis":
self.plugins['load'].append('sarracenia.flowcb.nodupe.redis.NoDupe')
self.plugins['load'].append('sarracenia.flowcb.nodupe.redis.Redis')
else:
self.plugins['load'].append('sarracenia.flowcb.nodupe.disk.NoDupe')
self.plugins['load'].append('sarracenia.flowcb.nodupe.disk.Disk')


if (( hasattr(self.o, 'delete_source') and self.o.delete_source ) or \
Expand Down
56 changes: 56 additions & 0 deletions sarracenia/flowcb/nodupe/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@


import logging

from sarracenia.flowcb import FlowCB

logger = logging.getLogger(__name__)


class NoDupe(FlowCB):
    """
    duplicate suppression family of modules.

    invoked with:

        callback sarracenia.flowcb.nodupe.disk

    or:

        callback sarracenia.flowcb.nodupe.redis

    with the default being loaded depending on the presence of a

        nodupe_driver "redis"

    setting (defaults to disk.)
    """

    def deriveKey(self, msg) -> str:
        """
        Return the key under which msg is looked up for duplicate suppression.

        The first rule that yields a non-empty value wins:

          1. msg['nodupe_override']['key'], when present.
          2. when msg has a 'fileOp': the 'link' value, or msg['relPath'] for
             a 'directory' operation that is not also a 'remove'.
          3. when msg has an 'identity' (and no 'fileOp'):
             "<method>,<value>" with newlines stripped from the value,
             unless the method is 'cod' or the identity is incomplete.
          4. otherwise "<relPath>,<time>[,<size>]" where time is msg['mtime']
             if present, else msg['pubTime'].

        Raises KeyError if the fallback rule is reached and msg lacks
        'relPath', or lacks both 'mtime' and 'pubTime'.
        """
        key = None

        if 'nodupe_override' in msg and 'key' in msg['nodupe_override']:
            key = msg['nodupe_override']['key']
        elif 'fileOp' in msg:
            file_op = msg['fileOp']
            if 'link' in file_op:
                key = file_op['link']
            elif 'directory' in file_op and 'remove' not in file_op:
                key = msg['relPath']
        elif 'identity' in msg:
            identity = msg['identity']
            # An identity without both fields cannot produce a checksum key;
            # fall through to the path/time key rather than raising KeyError.
            # 'cod' is excluded as well and uses the same fallback.
            if ('method' in identity and 'value' in identity
                    and identity['method'] != 'cod'):
                key = identity['method'] + ',' + identity['value'].replace('\n', '')

        if not key:
            # Prefer the file's modification time over the publication time.
            t = msg['mtime'] if 'mtime' in msg else msg['pubTime']
            if 'size' in msg:
                key = f"{msg['relPath']},{t},{msg['size']}"
            else:
                key = f"{msg['relPath']},{t}"

        return key


37 changes: 2 additions & 35 deletions sarracenia/flowcb/nodupe/disk.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@

from sarracenia import nowflt, timestr2flt, timeflt2str

from sarracenia.flowcb import FlowCB
from sarracenia.flowcb.nodupe import NoDupe

logger = logging.getLogger(__name__)


class NoDupe(FlowCB):
class Disk(NoDupe):
"""
generalised duplicate suppression for sr3 programs. It is used as a
time based buffer that prevents, when activated, identical files (of some kinds)
Expand Down Expand Up @@ -124,39 +124,6 @@ def _not_in_cache(self, key, relpath) -> bool:

return True

def deriveKey(self, msg) -> str:
    """
    Build the duplicate-suppression key for msg.

    Precedence: an explicit msg['nodupe_override']['key']; then, for
    messages with a 'fileOp', the link target or (for a directory that is
    not being removed) the relPath; then, for messages with an 'identity',
    the relPath when the method is 'cod' and "<method>,<value>" (newlines
    stripped) otherwise. When none of these gives a non-empty key, the
    result is "<relPath>,<mtime or pubTime>" with ",<size>" appended when
    msg has a size.
    """
    candidate = None

    if ('nodupe_override' in msg) and ('key' in msg['nodupe_override']):
        candidate = msg['nodupe_override']['key']
    elif 'fileOp' in msg:
        operation = msg['fileOp']
        if 'link' in operation:
            candidate = operation['link']
        elif 'directory' in operation and 'remove' not in operation:
            candidate = msg['relPath']
    elif 'identity' in msg:
        identity = msg['identity']
        method = identity['method']
        if method in ['cod']:
            # if cod, revert to using the path.
            candidate = msg['relPath']
        else:
            candidate = method + ',' + identity['value'].replace('\n', '')

    if candidate:
        return candidate

    # No usable key so far: fall back to path plus timestamp (and size).
    stamp = msg['mtime'] if 'mtime' in msg else msg['pubTime']
    if 'size' in msg:
        return f"{msg['relPath']},{stamp},{msg['size']}"
    return f"{msg['relPath']},{stamp}"



def check_message(self, msg) -> bool :
"""
derive keys to be looked up in cache of messages already seen.
Expand Down
37 changes: 3 additions & 34 deletions sarracenia/flowcb/nodupe/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@

from sarracenia import nowflt, timestr2flt, timeflt2str

from sarracenia.flowcb import FlowCB
from sarracenia.flowcb.nodupe import NoDupe

import redis

logger = logging.getLogger(__name__)


class NoDupe(FlowCB):
class Redis(NoDupe):
"""
generalised duplicate suppression for sr3 programs. It is used as a
time based buffer that prevents, when activated, identical files (of some kinds)
Expand Down Expand Up @@ -79,37 +79,6 @@ def _hash(self, text) -> str:
h.update(bytes(text, 'utf-8'))
return h.hexdigest()

def _deriveKey(self, message) -> str:

key = None
if ('nodupe_override' in message) and ('key' in message['nodupe_override']):
key = message['nodupe_override']['key']
elif 'fileOp' in message :
if 'link' in message['fileOp']:
key = message['fileOp']['link']
elif 'directory' in message['fileOp'] and 'remove' not in message['fileOp']:
key = message['relPath']
elif 'identity' in message:
if message['identity']['method'] in ['cod']:
# if cod, revert to using the path.
key = message['relPath']
else:
key = message['identity']['method'] + ',' + message['identity']['value'].replace('\n', '')

if not key:
if 'mtime' in message:
message_time = message['mtime']
else:
message_time = message['pubTime']

if 'size' in message:
key = f"{message['relPath']},{message_time},{message['size']}"
else:
key = f"{message['relPath']},{message_time}"

return key


def _is_new(self, message) -> bool :
"""
Derive keys to be looked up in cache of messages already seen, then look them up in the cache,
Expand All @@ -118,7 +87,7 @@ def _is_new(self, message) -> bool :
True if it is new.
"""

key = self._deriveKey(message)
key = self.deriveKey(message)

if ('nodupe_override' in message) and ('path' in message['nodupe_override']):
path = message['nodupe_override']['path']
Expand Down
46 changes: 23 additions & 23 deletions tests/sarracenia/flowcb/nodupe/disk_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def pretty(*things, **named_things):
print(str(k) + ":")
pprint.PrettyPrinter(indent=2, width=200).pprint(v)

from sarracenia.flowcb.nodupe.disk import NoDupe
from sarracenia.flowcb.nodupe.disk import Disk
from sarracenia import Message as SR3Message

class Options:
Expand Down Expand Up @@ -58,7 +58,7 @@ def make_message():
def test_deriveKey(tmp_path):
BaseOptions = Options()
BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt"
nodupe = NoDupe(BaseOptions)
nodupe = Disk(BaseOptions)

thismsg = make_message()
thismsg['nodupe_override'] = {'key': "SomeKeyValue"}
Expand All @@ -74,7 +74,7 @@ def test_deriveKey(tmp_path):

thismsg = make_message()
thismsg['identity'] = {'method': "cod"}
assert nodupe.deriveKey(thismsg) == thismsg["relPath"]
assert nodupe.deriveKey(thismsg) == thismsg["relPath"] + "," + thismsg["mtime"]

thismsg['identity'] = {'method': "method", 'value': "value\n"}
assert nodupe.deriveKey(thismsg) == "method,value"
Expand All @@ -91,7 +91,7 @@ def test_open__WithoutFile(tmp_path):
BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt"
BaseOptions.cfg_run_dir = str(tmp_path)
BaseOptions.no = 5
nodupe = NoDupe(BaseOptions)
nodupe = Disk(BaseOptions)

nodupe.open()
assert nodupe.cache_file == str(tmp_path) + os.sep + 'recent_files_005.cache'
Expand All @@ -103,7 +103,7 @@ def test_open__WithFile(tmp_path):
BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt"
BaseOptions.cfg_run_dir = str(tmp_path)
BaseOptions.no = 5
nodupe = NoDupe(BaseOptions)
nodupe = Disk(BaseOptions)

filepath = str(tmp_path) + os.sep + 'recent_files_005.cache'
fp = open(filepath, 'a')
Expand All @@ -120,7 +120,7 @@ def test_open__WithData(tmp_path, caplog):
BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt"
BaseOptions.cfg_run_dir = str(tmp_path)
BaseOptions.no = 5
nodupe = NoDupe(BaseOptions)
nodupe = Disk(BaseOptions)
nodupe.o.nodupe_ttl = 100000

fp = open(str(tmp_path) + os.sep + 'recent_files_005.cache', 'a')
Expand Down Expand Up @@ -154,7 +154,7 @@ def test_on_start(tmp_path):
BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt"
BaseOptions.cfg_run_dir = str(tmp_path)
BaseOptions.no = 5
nodupe = NoDupe(BaseOptions)
nodupe = Disk(BaseOptions)

nodupe.on_start()

Expand All @@ -169,7 +169,7 @@ def test_on_stop(tmp_path):
BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt"
BaseOptions.cfg_run_dir = str(tmp_path)
BaseOptions.no = 5
nodupe = NoDupe(BaseOptions)
nodupe = Disk(BaseOptions)

nodupe.on_stop()

Expand All @@ -182,7 +182,7 @@ def test_close(tmp_path):
BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt"
BaseOptions.cfg_run_dir = str(tmp_path)
BaseOptions.no = 5
nodupe = NoDupe(BaseOptions)
nodupe = Disk(BaseOptions)

nodupe.on_start()

Expand All @@ -197,7 +197,7 @@ def test_close__ErrorThrown(tmp_path, caplog):
BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt"
BaseOptions.cfg_run_dir = str(tmp_path)
BaseOptions.no = 5
nodupe = NoDupe(BaseOptions)
nodupe = Disk(BaseOptions)

nodupe.on_start()

Expand All @@ -223,7 +223,7 @@ def test_close__Unlink(tmp_path):
BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt"
BaseOptions.cfg_run_dir = str(tmp_path)
BaseOptions.no = 5
nodupe = NoDupe(BaseOptions)
nodupe = Disk(BaseOptions)

nodupe.on_start()

Expand All @@ -240,7 +240,7 @@ def test_close__Unlink_ErrorThrown(tmp_path, caplog):
BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt"
BaseOptions.cfg_run_dir = str(tmp_path)
BaseOptions.no = 5
nodupe = NoDupe(BaseOptions)
nodupe = Disk(BaseOptions)

nodupe.on_start()

Expand Down Expand Up @@ -268,7 +268,7 @@ def test_clean(tmp_path, capsys):
BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt"
BaseOptions.cfg_run_dir = str(tmp_path)
BaseOptions.no = 5
nodupe = NoDupe(BaseOptions)
nodupe = Disk(BaseOptions)
nodupe.o.nodupe_ttl = 100000

nodupe.cache_dict = {
Expand All @@ -292,7 +292,7 @@ def test_clean__Persist_DelPath(tmp_path, capsys):
BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt"
BaseOptions.cfg_run_dir = str(tmp_path)
BaseOptions.no = 5
nodupe = NoDupe(BaseOptions)
nodupe = Disk(BaseOptions)
nodupe.o.nodupe_ttl = 100000

nodupe.open()
Expand Down Expand Up @@ -322,7 +322,7 @@ def test_save(tmp_path, capsys):
BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt"
BaseOptions.cfg_run_dir = str(tmp_path)
BaseOptions.no = 5
nodupe = NoDupe(BaseOptions)
nodupe = Disk(BaseOptions)
nodupe.o.nodupe_ttl = 100000

nodupe.open()
Expand Down Expand Up @@ -351,7 +351,7 @@ def test_save__Unlink_Error(tmp_path, caplog):
BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt"
BaseOptions.cfg_run_dir = str(tmp_path)
BaseOptions.no = 5
nodupe = NoDupe(BaseOptions)
nodupe = Disk(BaseOptions)
nodupe.o.nodupe_ttl = 100000

nodupe.open()
Expand Down Expand Up @@ -388,7 +388,7 @@ def test_save__Open_Error(tmp_path, caplog):
BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt"
BaseOptions.cfg_run_dir = str(tmp_path)
BaseOptions.no = 5
nodupe = NoDupe(BaseOptions)
nodupe = Disk(BaseOptions)
nodupe.o.nodupe_ttl = 100000

nodupe.open()
Expand Down Expand Up @@ -422,7 +422,7 @@ def test_on_housekeeping(tmp_path, caplog):
BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt"
BaseOptions.cfg_run_dir = str(tmp_path)
BaseOptions.no = 5
nodupe = NoDupe(BaseOptions)
nodupe = Disk(BaseOptions)
nodupe.o.nodupe_ttl = 100000

nodupe.open()
Expand Down Expand Up @@ -457,7 +457,7 @@ def test__not_in_cache(tmp_path, caplog):
BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt"
BaseOptions.cfg_run_dir = str(tmp_path)
BaseOptions.no = 5
nodupe = NoDupe(BaseOptions)
nodupe = Disk(BaseOptions)
nodupe.o.nodupe_ttl = 100000

nodupe.open()
Expand Down Expand Up @@ -495,7 +495,7 @@ def test_check_message(tmp_path, capsys):
BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt"
BaseOptions.cfg_run_dir = str(tmp_path)
BaseOptions.no = 5
nodupe = NoDupe(BaseOptions)
nodupe = Disk(BaseOptions)
nodupe.o.nodupe_ttl = 100000

nodupe.open()
Expand Down Expand Up @@ -528,7 +528,7 @@ def test_after_accept(tmp_path, capsys):
BaseOptions.cfg_run_dir = str(tmp_path)
BaseOptions.no = 5
BaseOptions.inflight = 0
nodupe = NoDupe(BaseOptions)
nodupe = Disk(BaseOptions)
nodupe.o.nodupe_ttl = 100000

nodupe.open()
Expand All @@ -555,7 +555,7 @@ def test_after_accept__WithFileAges(tmp_path, capsys):
BaseOptions.no = 5
BaseOptions.inflight = 0

nodupe = NoDupe(BaseOptions)
nodupe = Disk(BaseOptions)
nodupe.o.nodupe_ttl = 100000
nodupe.o.nodupe_fileAgeMin = 1000
nodupe.o.nodupe_fileAgeMax = 1000
Expand Down Expand Up @@ -587,7 +587,7 @@ def test_after_accept__InFlight(tmp_path, capsys):
BaseOptions.no = 5
BaseOptions.inflight = 1000

nodupe = NoDupe(BaseOptions)
nodupe = Disk(BaseOptions)
nodupe.o.nodupe_ttl = 100000

nodupe.open()
Expand Down
Loading

0 comments on commit 5afe909

Please sign in to comment.