diff --git a/pytrackunit/helper.py b/pytrackunit/helper.py
index 47e75fe..0a26970 100644
--- a/pytrackunit/helper.py
+++ b/pytrackunit/helper.py
@@ -2,39 +2,13 @@
 from datetime import datetime, timedelta
 from copy import deepcopy
-import matplotlib.pyplot as plt
-import matplotlib.dates
 from dateutil.parser.isoparser import isoparse
 
-def plot_can_val(_data,valname,filename=None):
-    """plot value from getCanData"""
-    data = filter(lambda x:x['name'] == valname,_data)
-    plot_val(data,'value',filename)
-
 def get_datetime(time_string):
     """transforms trackunit time string to datetime"""
     #return datetime.strptime(time_string.split('.')[0],"%Y-%m-%dT%H:%M:%S")
     return isoparse(time_string)
 
-def plot_val(_data,valname,filename=None):
-    """plots a value from data (expected format from getHistory)"""
-    data = (map(lambda x: (x['time'],x[valname]),_data))
-    data = (set(data))
-    data = (map(lambda x: (get_datetime(x[0]),float(x[1])),data))
-    data = list(data)
-    data.sort(key=lambda x: x[0])
-    dates = list(map(lambda x: x[0], data))
-    dates = matplotlib.dates.date2num(dates)
-    fig, _ax = plt.subplots()#marker='', linestyle='-'
-    values = list(map(lambda x: x[1], data))
-    _ax.plot_date(dates, values ,fmt=",-")
-    _ax.locator_params(axis="y", nbins=10)
-    fig.autofmt_xdate()
-    if filename is not None:
-        plt.savefig(filename)
-    else:
-        plt.show()
-
 def get_next_section(data,finsec,fendsec=None,min_insec_len=None,min_endsec_len=0):
     """
     returns the next section which fulfills finsec
     with at least min_insec_len datapoints
@@ -117,3 +91,15 @@ def start_end_from_tdelta(tdelta, preset_end=None):
         return end, end
     start = end-timedelta(days=irange)
     return start, end
+
+class SecureString:
+    """This will just hide the api_key from being printed"""
+    def __init__(self,_str):
+        """Creates the object with given string as data"""
+        self.string = _str
+    def gets(self):
+        """returns the string"""
+        return self.string
+    def sets(self,value):
+        """sets the string to value"""
+        self.string = value
diff --git a/pytrackunit/sqlcache.py b/pytrackunit/sqlcache.py
index c009e1a..59ff1bf 100644
--- a/pytrackunit/sqlcache.py
+++ b/pytrackunit/sqlcache.py
@@ -1,12 +1,13 @@
 """Module for caching data in sql db"""
 from copy import deepcopy
-import sqlite3
 import os.path
 import os
 from datetime import datetime
+import sqlite3
+import aiosqlite
 
 from .tucache import TuCache
-from .tuiter import SqlIter, TuIter
+from .tuiter import TuIter
 from .helper import get_datetime, start_end_from_tdelta
@@ -115,6 +116,19 @@
 )
 '''
 
+def create_tables(db_path):
+    """Creates the necessary tables in an sqlite database which is located at db_path"""
+    _db = sqlite3.connect(db_path)
+    cur = _db.cursor()
+    cur.execute(CREATE_HISTORY_TABLE)
+    cur.execute(CREATE_HISTORY_META)
+    cur.execute(CREATE_ERROR_META_TABLE)
+    cur.execute(CREATE_ERROR_DATA_TABLE)
+    cur.execute(CREATE_CANDATA_META_TABLE)
+    cur.execute(CREATE_CANDATA_DATA_TABLE)
+    _db.commit()
+    _db.close()
+
 def candata_item_to_sql_item(_x,meta):
     """
     returns the candata as a tuple and converts the time to unix timestamp (milliseconds)
@@ -129,6 +143,7 @@ def candata_item_to_sql_item(_x,meta):
     else:
         _uom = None
     return (_id,_time,_variableid,_name,_value,_uom)
+
 def sql_item_to_candata_item(obj):
     """
     the operation candata_item_to_sql_item reversed
@@ -141,6 +156,7 @@ def sql_item_to_candata_item(obj):
     _x['value'] = obj[4]
     _x['uoM'] = obj[5]
     return _x
+
 def error_item_to_sql_item(_x,meta):
     """
     returns the error as a tuple and converts the time to unix timestamp (milliseconds)
@@ -161,6 +177,7 @@ def error_item_to_sql_item(_x,meta):
     else:
         _desc = None
     return (_id,_time,_spn,_fmi,_oc,_name,_desc)
+
 def sql_item_to_error_item(obj):
     """
     the operation error_item_to_sql_item reversed
@@ -235,6 +252,7 @@ def history_item_to_sql_item(_x,meta):
         _Input3ChangeCounter, _Input4ChangeCounter, _batteryLevel, _externalPower)
     # pylint: enable=invalid-name, too-many-locals
+
 def sql_item_to_history_item(obj):
     """
     the operation history_item_to_sql_item reversed
@@ -286,103 +304,127 @@ def sql_item_to_history_item(obj):
     _x['externalPower'] = obj[43]
     return _x
 
-class SqlInsertIter:
-    """iterator for tucache data"""
-    def __init__(self, table, sqliter, meta=None, _db=None):
-        assert _db is not None
-        print("SqlInsertIter __init__ has sqliter:",sqliter)
-        self.sqliter = sqliter
-        self.iter_started = False
-        self.meta = meta
-        self._db = _db
-        if self._db is not None:
-            self.cur = self._db.cursor()
+async def SqlInsertIter(dbpath, table, sqliter, meta, verbose=False):
+    """Generator which inserts data from an upstream iterator into the database"""
+    assert dbpath is not None
+    assert dbpath != ""
+
+    async with aiosqlite.connect(dbpath) as _db:
+
+        if verbose:
+            print("SqlInsertIter has sqliter:",sqliter)
         if table == "error":
-            self.insert_sql = "INSERT INTO error VALUES (?,?,?,?,?,?,?)"
-            self.fconv = error_item_to_sql_item
+            insert_sql = "INSERT INTO error VALUES (?,?,?,?,?,?,?)"
+            fconv = error_item_to_sql_item
         elif table == "history":
-            self.insert_sql = """
+            insert_sql = """
                 INSERT INTO history VALUES
                 (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
             """
-            self.fconv = history_item_to_sql_item
+            fconv = history_item_to_sql_item
         elif table == "candata":
-            self.insert_sql = """
+            insert_sql = """
                 INSERT INTO candata VALUES
                 (?,?,?,?,?,?)
             """
-            self.fconv = candata_item_to_sql_item
+            fconv = candata_item_to_sql_item
         else:
             raise Exception("Not yet implemented")
 
-    def __aiter__(self):
-        if self.iter_started:
-            raise Exception("cant start tuiter more than once")
-        self.iter_started = True
-        return self
-
-    async def __anext__(self):
-        try:
-            data, meta = await self.sqliter.__anext__()
-            if self._db is not None:
-                #print(data)
-                data = list(data)
-                sqldata = (map(lambda x: self.fconv(x,meta),data))
-                self.cur.executemany(self.insert_sql,sqldata)
-            return data, meta
-        except StopAsyncIteration as exc:
+        await _db.execute(f"INSERT INTO {table}meta VALUES (?,?,?)",\
+            (meta['id'],meta['start_ts'],meta['end_ts']))
+
+        async for data,meta in sqliter:
+            data = list(data)
+            # materialise the converted batch so a failed executemany can be retried item by item
+            sqldata = list(map(lambda x: fconv(x,meta),data))
+
+            try:
+                await _db.executemany(insert_sql,sqldata)
+            except sqlite3.IntegrityError as exc1:
+                if verbose:
+                    print("Integrity error with",meta)
+                    print("Try to find double entry (insert side)")
+                for in_item in sqldata:
+                    try:
+                        await _db.execute(insert_sql,in_item)
+                    except sqlite3.IntegrityError:
+                        print("Item throwing Integrity error",in_item)
+                await _db.rollback()
+                raise sqlite3.IntegrityError from exc1
+
+            yield data, meta
+
+        await _db.commit()
+        if verbose:
             print("Committed")
-            self._db.commit()
-            raise StopAsyncIteration from exc
-        except sqlite3.IntegrityError as exc1:
-            print("Integrety error with",meta)
-            print("Try to find double entry (insert side)")
-            sqldata = (map(lambda x: self.fconv(x,meta),data))
-            for in_item in sqldata:
-                try:
-                    self.cur.execute(self.insert_sql,in_item)
-                except sqlite3.IntegrityError:
-                    print("Item throwing Integrity error",in_item)
-            self._db.rollback()
-            raise sqlite3.IntegrityError from exc1
+
+
+async def SqlReturnIter(db_path, table, meta):
+    """Generator returning the requested data from the database"""
+    command = f"select * from {table} where unit = ? and time >= ? and time <= ? order by time"
+    if table == "error":
+        fconv = sql_item_to_error_item
+    elif table == "history":
+        fconv = sql_item_to_history_item
+    elif table == "candata":
+        fconv = sql_item_to_candata_item
+    else:
+        raise Exception("Table not implemented yet")
+    async with aiosqlite.connect(db_path) as _db:
+        async with _db.execute(command,(meta['id'],meta['start_ts'],meta['end_ts'])) as _cur:
+            async for x in _cur:
+                yield [fconv(x)], meta
+
 class SqlCache:
     """Sql cache can cache trackunit data in Sqlite DB"""
     def __init__(self,**kwargs):
-        self.web_db_path = kwargs.get('db_path','sqlcache.sqlite')
-        create_tables = not os.path.isfile(self.web_db_path)
-        self._db = sqlite3.connect(self.web_db_path)
-        assert self._db is not None
+        self.verbose = kwargs.get('verbose',False)
+        self.db_path = kwargs.get('db_path','sqlcache.sqlite')
+        self.tdelta_end = kwargs.get('tdelta_end',None)
         cache1_kwargs = deepcopy(kwargs)
         cache1_kwargs['dont_cache_data'] = kwargs.get('sql_cache1_dont_cache_data',True)
         self.cache1 = kwargs.get('sql_cache1',TuCache(**cache1_kwargs))
         self.cache2 = kwargs.get('sql_cache2',TuCache(**kwargs))
-        self.tdelta_end = None
-        if create_tables:
-            cur = self._db.cursor()
-            cur.execute(CREATE_HISTORY_TABLE)
-            cur.execute(CREATE_HISTORY_META)
-            cur.execute(CREATE_ERROR_META_TABLE)
-            cur.execute(CREATE_ERROR_DATA_TABLE)
-            cur.execute(CREATE_CANDATA_META_TABLE)
-            cur.execute(CREATE_CANDATA_DATA_TABLE)
-            self._db.commit()
-    def clean(self):
+        self.db_connection = None
+        self.connect()
+
+    def connect(self):
+        """Connects the SqlCache and sets up db_connection"""
+        if self.db_connection is not None:
+            raise Exception("Cant connect twice")
+        no_db_present = not os.path.isfile(self.db_path)
+        self.db_connection = sqlite3.connect(self.db_path)
+        if no_db_present:
+            create_tables(self.db_path)
+
+    def close(self):
+        """closes database connection"""
+        if self.db_connection is None:
+            return
+        self.db_connection.close()
+        self.db_connection = None
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, _type, value, traceback):
+        self.close()
+
+    def reset(self,remove_data=False):
-        """removes database file"""
+        """resets the cache and optionally removes the database file"""
-        if self._db is not None:
-            self._db.close()
-        os.remove(self.web_db_path)
-        self._db = None
+        self.close()
+        if remove_data:
+            os.remove(self.db_path)
+        self.connect()
+
     async def get_unitlist(self):
         """returns a list of vehicles"""
         return await self.cache2.get_unitlist()
+
+    # pylint: disable=too-many-arguments
     def get_general_upstream(self, table, veh_id, start_ts, end_ts, previter=None):
         """gets errors from upstream cache"""
-        cur = self._db.cursor()
-
-        cur.execute(f"INSERT INTO {table}meta VALUES (?,?,?)",(veh_id,start_ts,end_ts))
-        # wait until data is in database and commit then
-        #self._db.commit()
 
         start = datetime.fromtimestamp(start_ts/1000.0)
         end = datetime.fromtimestamp(end_ts/1000.0)
@@ -398,10 +440,12 @@ def get_general_upstream(self, table, veh_id, start_ts, end_ts, previter=None):
 
         meta = {}
         meta["id"] = veh_id
+        meta["start_ts"] = start_ts
+        meta["end_ts"] = end_ts
         meta["start"] = start
         meta["end"] = end
 
-        wrap_iter = SqlInsertIter(table,int_data_iter,meta,self._db)
+        wrap_iter = SqlInsertIter(self.db_path,table,int_data_iter,meta,self.verbose)
 
         if previter is None:
             previter = TuIter()
@@ -411,64 +455,57 @@ def get_general_upstream(self, table, veh_id, start_ts, end_ts, previter=None):
         return previter, _len
 
     def get_general_sql(self, table, veh_id, start_ts, end_ts, previter=None):
        """gets data of this period from db whether or not it was actually stored there"""
-        cur = self._db.cursor()
-        cnt = next(cur.execute(\
-            f"""
+        cur = self.db_connection.execute(f"""
            select count(*) from {table}
            where unit = ? and time >= ? and time <= ?
            order by time
-            """,(veh_id,start_ts,end_ts)))[0]
+            """,(veh_id,start_ts,end_ts))
+        cnt = cur.fetchone()[0]
 
-        print("found",cnt,"in sql")
+        if self.verbose:
+            print("found",cnt,"in sql")
 
         meta = {}
         meta["id"] = veh_id
-        meta["start"] = start_ts
-        meta["end"] = end_ts
+        meta["start_ts"] = start_ts
+        meta["end_ts"] = end_ts
 
         if previter is None:
             previter = TuIter()
 
         if cnt > 0:
-            if table == "error":
-                fconv = sql_item_to_error_item
-            elif table == "history":
-                fconv = sql_item_to_history_item
-            elif table == "candata":
-                fconv = sql_item_to_candata_item
-            else:
-                raise Exception("Table not implemented yet")
-            previter.add(SqlIter(iter(map(lambda x: [x],map(fconv,cur.execute(\
-                f"select * from {table} where unit = ? and time >= ? and time <= ? order by time",\
-                (veh_id,start_ts,end_ts))))),meta))
-        else:
+            previter.add(SqlReturnIter(self.db_path,table,meta))
+        elif self.verbose:
             print("could not find any item in block ", start_ts, end_ts,"for unit",veh_id)
 
         return previter, cnt
 
     def get_general_unixts(self, table, veh_id, start_ts, end_ts, previter=None):
         """returns error in between the given datetime objects"""
-        cur = self._db.cursor()
-        # Query meta database to check whether there the data is in database
-        try:
-            me_vehid, me_start, me_end = next(iter(cur.execute(f"""
-                select * from {table}meta where unit = ? and (
-                    (start <= ? and end > ?) or
-                    (start < ? and end >= ?) or
-                    (start >= ? and end <= ?)
-                ) order by start
-                """,(veh_id,start_ts,start_ts,end_ts,end_ts,start_ts,end_ts))))
-            me_start = int(float(me_start))
-            me_end = int(float(me_end))
-        except StopIteration:
-            print("Stop iteration. Didnt find",\
-                veh_id,start_ts,end_ts,\
-                "Get errors from upstream now")
+        cur = self.db_connection.execute(f"""
+            select * from {table}meta where unit = ? and (
+                (start <= ? and end > ?) or
+                (start < ? and end >= ?) or
+                (start >= ? and end <= ?)
+            ) order by start
+            """,(veh_id,start_ts,start_ts,end_ts,end_ts,start_ts,end_ts))
+        first = cur.fetchone()
+        if first is not None:
+            me_vehid, me_start, me_end = first[0], first[1], first[2]
+        else:
+            if self.verbose:
+                print("Stop iteration. Didn't find",\
+                    veh_id,start_ts,end_ts,\
+                    "Get errors from upstream now")
             return self.get_general_upstream(table,veh_id,start_ts,end_ts,previter)
 
-        print("Found block",me_vehid,me_start,me_end)
+        me_start = int(float(me_start))
+        me_end = int(float(me_end))
+
+        if self.verbose:
+            print("Found block",me_vehid,me_start,me_end)
 
         # Depending on the start and end of the next block, apply divide and conquer
         # by recursive calls of this function.
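Usage sketch (illustrative only, not part of the patch): the SqlCache rework above replaces clean() with connect()/close()/reset() plus the context-manager protocol, and turns the old SqlInsertIter class into the SqlInsertIter/SqlReturnIter async generators. Under those assumptions a caller would now look roughly like this; the unit id and the 32-character key below are dummies:

    import asyncio
    from datetime import datetime
    from pytrackunit.sqlcache import SqlCache
    from pytrackunit.helper import SecureString

    async def main():
        # auth is now a pair of SecureString, as WebCache.__init__ expects after this diff
        auth = (SecureString('API'), SecureString('a' * 32))
        # __exit__ closes the sqlite connection; there is no explicit clean() anymore
        with SqlCache(auth=auth, db_path='sqlcache.sqlite', verbose=True) as cache:
            start, end = datetime(2022, 1, 1), datetime(2022, 1, 30)
            # first call finds no block in errormeta, pulls from upstream and
            # streams through SqlInsertIter, which persists each yielded batch
            it, _cnt = cache.get_faults_timedelta('1234567', start, end)
            async for data, meta in it:
                print(meta['id'], len(data))
            # the same window again is served from sqlite through SqlReturnIter
            it, _cnt = cache.get_faults_timedelta('1234567', start, end)
            async for data, meta in it:
                print(meta['id'], len(data))

    asyncio.run(main())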
diff --git a/pytrackunit/trackunit.py b/pytrackunit/trackunit.py
index f5b0354..cba2428 100644
--- a/pytrackunit/trackunit.py
+++ b/pytrackunit/trackunit.py
@@ -6,6 +6,7 @@
 import tqdm
 
 from .tucache import TuCache
 from .sqlcache import SqlCache
+from .helper import SecureString
 
 def get_multi_general(func,idlist,tdelta,f_process=None,progress_bar=True):
     """
@@ -51,12 +52,12 @@ def __init__(self,**kwargs):
             if 'apikey_path' not in kwargs:
                 kwargs['apikey_path'] = 'api.key'
             with open(kwargs["apikey_path"],encoding="utf8") as file_apikey:
-                api_key = file_apikey.readline()
+                api_key = SecureString(file_apikey.readline())
         else:
             api_key = kwargs['api_key']
-        if len(api_key) != 32:
+        if len(api_key.gets()) != 32:
             raise Exception("Invalid API-key length")
-        kwargs['auth'] = ('API',api_key)
+        kwargs['auth'] = (SecureString('API'),api_key)
         if kwargs.get('tu_use_sqlcache',False):
             self.cache = SqlCache(**kwargs)
         else:
diff --git a/pytrackunit/tuiter.py b/pytrackunit/tuiter.py
index a24d1fb..7a8c97e 100644
--- a/pytrackunit/tuiter.py
+++ b/pytrackunit/tuiter.py
@@ -20,25 +20,6 @@ async def __anext__(self):
         except StopIteration as exc:
             raise StopAsyncIteration from exc
 
-class SqlIter:
-    """iterator for tucache data"""
-    def __init__(self, sqliter, meta=None):
-        self.sqliter = sqliter
-        self.iter_started = False
-        self.meta = meta
-
-    def __aiter__(self):
-        if self.iter_started:
-            raise Exception("cant start tuiter more than once")
-        self.iter_started = True
-        return self
-
-    async def __anext__(self):
-        try:
-            return next(self.sqliter) , self.meta
-        except StopIteration as exc:
-            raise StopAsyncIteration from exc
-
 class TuIter:
     """iterator holding all internal iterators"""
     def __init__(self) -> None:
diff --git a/pytrackunit/webcache.py b/pytrackunit/webcache.py
index 1ebbc57..50bc028 100644
--- a/pytrackunit/webcache.py
+++ b/pytrackunit/webcache.py
@@ -40,7 +40,8 @@ def __init__(self,**kwargs):
         if kwargs.get('verbose',False):
             print("WebCaches args:",kwargs)
         auth_tuple = kwargs.get("auth",None)
-        self.auth = BasicAuth(auth_tuple[0],auth_tuple[1]) if auth_tuple is not None else None
+        self.auth = BasicAuth(auth_tuple[0].gets(),auth_tuple[1].gets()) \
+            if auth_tuple is not None else None
         self.verbose = kwargs.get("verbose",False)
         self.dir = kwargs.get("webcache_dir","web-cache")
         self.dont_read_files = kwargs.get("dont_read_files",False)
diff --git a/tests/test_helper.py b/tests/test_helper.py
index a19445c..3ca4b4c 100644
--- a/tests/test_helper.py
+++ b/tests/test_helper.py
@@ -1,9 +1,4 @@
 import math
-from numpy import mat
-import hashlib
-from os import remove
-from os.path import isfile
-from time import sleep
 from copy import deepcopy
 
 from pytrackunit.helper import *
@@ -12,42 +7,6 @@
 END_UNIX_WITHOUT_TIME = datetime(year=2022,month=2,day=20).timestamp()
 START_UNIX_TS = datetime(year=2022,month=2,day=10).timestamp()
 
-# ------ plot_can_val -------
-
-def test_plot_can_val():
-    propname = 'test-prop'
-
-    testobj = {}
-    testobj['time'] = '2021-06-18T13:18:18.0Z'
-    testobj['name'] = propname
-    testobj['value'] = 0.0
-    data = []
-    for i in range(3000):
-        to = deepcopy(testobj)
-        time = datetime.fromtimestamp(END_UNIX_TS+i*100)
-        to['time'] = time.isoformat()
-        to['value'] = math.sin(i/100)
-        data.append(to)
-
-    filename = 'outfile.png'
-
-    plot_can_val(data,propname,filename)
-
-    for i in range(1000):
-        if isfile(filename):
-            break
-        sleep(1)
-
-    sha256_hash = hashlib.sha256()
-    with open(filename,"rb") as f:
-        for byte_block in iter(lambda: f.read(4096),b""):
-            sha256_hash.update(byte_block)
-
-    # github doesnt like this
-    # assert sha256_hash.hexdigest() == '5c3922902660f8164bea9837aa2cbd9c124b5cd2571b50c1bfd522d14f00784c'
-
-    remove(filename)
-
 # ------ get_datetime -------
 
 def test_get_datetime():
@@ -60,41 +19,6 @@ def test_get_datetime():
     assert dt.time().second == 13
     assert dt.time().microsecond == 100000
 
-# ------ plot_val -------
-
-def test_plot_val():
-    propname = 'test-prop'
-
-    testobj = {}
-    testobj['time'] = '2021-06-18T13:18:18.0Z'
-    testobj[propname] = 0.0
-    data = []
-    for i in range(3000):
-        to = deepcopy(testobj)
-        time = datetime.fromtimestamp(END_UNIX_TS+i*100)
-        to['time'] = time.isoformat()
-        to[propname] = math.sin(i/100)
-        data.append(to)
-
-    filename = 'outfile.png'
-
-    plot_val(data,propname,filename)
-
-    for i in range(1000):
-        if isfile(filename):
-            break
-        sleep(1)
-
-    sha256_hash = hashlib.sha256()
-    with open(filename,"rb") as f:
-        for byte_block in iter(lambda: f.read(4096),b""):
-            sha256_hash.update(byte_block)
-
-    # github doesnt like this
-    # assert sha256_hash.hexdigest() == '5c3922902660f8164bea9837aa2cbd9c124b5cd2571b50c1bfd522d14f00784c'
-
-    remove(filename)
-
 # ------ get_next_section -------
 
 def get_dataset(id=0):
diff --git a/tests/test_sqlcache.py b/tests/test_sqlcache.py
index dd8249c..92c8d59 100644
--- a/tests/test_sqlcache.py
+++ b/tests/test_sqlcache.py
@@ -6,6 +7,8 @@
 import os.path
 import json
 from dateutil.parser import isoparse
+import sqlite3
+from pytrackunit.helper import SecureString
 
 pytest_plugins = ('pytest_asyncio',)
 
@@ -16,8 +19,9 @@
 START = datetime(2022,1,1,10,0,0,1000)
 END = datetime(2022,1,30,10,0,0,1000)
 MID = datetime(2022,1,15,10,0,0,1000)
-API_KEY = open("api.key").read()
-AUTH = ('API',API_KEY)
+#API_KEY = open("api.key").read()
+API_KEY = "asdfasdfasdfasdfasdfasdfasdfasdf"
+AUTH = (SecureString('API'),SecureString(API_KEY))
 
 EXAMPLE_HIST_OBJ = json.loads("""{
     "accessKey": "",
@@ -70,6 +74,8 @@ def __init__(self):
         self.testobj["description"] = "7"
         self.testmeta = {}
         self.testmeta["id"] = VEH
+        self.testmeta["start_ts"] = START.timestamp()
+        self.testmeta["end_ts"] = END.timestamp()
         self.modifiy_time = True
     def get_testobj(self, nr,time=None):
         obj = deepcopy(self.testobj)
@@ -144,39 +150,6 @@ def get_candata_timedelta(self,veh_id,start,end,previter):
         self.previter.append(previter)
         return self.ret_candata_generator(10*(len(self.veh_id)-1),_start), 10
 
-class PseudoDB:
-    class PseudoDBCursor:
-        def __init__(self,throw_integrity_error=-1):
-            self.exm_command = []
-            self.exm_data = []
-            self.throw_integrity_error=throw_integrity_error
-            self.ex_command = []
-            self.ex_data = []
-        def execute(self,command,data):
-            print("PseudodbCursor",command,data)
-            self.ex_command.append(command)
-            self.ex_data.append(data)
-            if len(self.ex_data) == self.throw_integrity_error:
-                raise sqlite3.IntegrityError("Test Integrity Error")
-        def executemany(self,command,data):
-            self.exm_command.append(command)
-            self.exm_data.append(list(data))
-            if len(self.exm_data) == self.throw_integrity_error:
-                raise sqlite3.IntegrityError("Test Integrity Error")
-    def __init__(self):
-        self.throw_integrity_error=-1
-        self.commit_cnt = 0
-        self.cursors = []
-        self.rollback_cnt = 0
-    def commit(self):
-        self.commit_cnt += 1
-    def cursor(self):
-        cursor = self.PseudoDBCursor(self.throw_integrity_error)
-        self.cursors.append(cursor)
-        return cursor
-    def rollback(self):
-        self.rollback_cnt += 1
-
 def dict_equal(x,y):
     for k in x:
         if not x[k] == "none" and k in y:
@@ -188,6 +161,16 @@ def dict_equal(x,y):
             return False
     return True
 
+# ---------------- create_tables ---------------------
+
+def test_create_table():
+    if os.path.isfile(db_path):
+        os.remove(db_path)
+    create_tables(db_path)
+    os.remove(db_path)
+    create_tables(db_path)
+    os.remove(db_path)
+
 # ---------------- candata_item_to_sql_item ---------------------
 
 def test_candata_item_conversion():
@@ -291,67 +274,76 @@ def test_history_item_conversion():
 
 @pytest.mark.asyncio
 async def test_sqlinsertiter():
+    if os.path.isfile(db_path):
+        os.remove(db_path)
+
     cache = CacheForTests()
-    db = PseudoDB()
-    it = SqlInsertIter("error",cache.ret_val_generator(0,MID),cache.testmeta,db)
+    create_tables(db_path)
+    it = SqlInsertIter(db_path,"error",cache.ret_val_generator(0,MID),cache.testmeta,verbose=True)
     async for x in it:
         print(x)
-    assert len(db.cursors) == 1
-    assert len(db.cursors[0].exm_command) == 10
-    assert len(db.cursors[0].exm_data) == 10
-    assert len(db.cursors[0].exm_data[0]) == 1
-    assert db.cursors[0].exm_command[0] == "INSERT INTO error VALUES (?,?,?,?,?,?,?)"
-    #assert db.cursors[0].exm_data[-1][0] == error_item_to_sql_item(cache.testobj,cache.testmeta)
+    with sqlite3.connect(db_path) as db:
+        cur = db.execute("select count(*) from error")
+        assert 10 == cur.fetchone()[0]
 
 @pytest.mark.asyncio
 async def test_sqlinsertiter_integrity_error():
+    if os.path.isfile(db_path):
+        os.remove(db_path)
+
+    db = sqlite3.connect(db_path)
+
     cache = CacheForTests()
-    db = PseudoDB()
-    db.throw_integrity_error=5
-    it = SqlInsertIter("error",cache.ret_val_generator(0,START),cache.testmeta,db)
-    cnt = 0
+    create_tables(db_path)
+    it = SqlInsertIter(db_path,"error",cache.ret_val_generator(0,MID),cache.testmeta,verbose=True)
+    async for x in it:
+        print(x)
     with pytest.raises(sqlite3.IntegrityError) as exc:
+        it = SqlInsertIter(db_path,"error",cache.ret_val_generator(0,MID),cache.testmeta,verbose=True)
         async for x in it:
             print(x)
-            cnt += 1
-
-    assert len(db.cursors) == 1
-    assert len(db.cursors[0].exm_data) == 5
-    assert len(db.cursors[0].ex_data) == 1
-    assert db.rollback_cnt == 1
-    assert db.commit_cnt == 0
-
-    print("second attempt")
 
-    db.throw_integrity_error=-1
+    db.execute("delete from error")
+    db.execute("delete from errormeta")
+    db.commit()
 
-    it = SqlInsertIter("error",cache.ret_val_generator(0,START),cache.testmeta,db)
-    cnt = 0
-    async for x,meta in it:
-        assert len(x) == 1
-        assert x[0] == cache.get_testobj(cnt,START)
-        cnt += 1
+    cur = db.execute("select count(*) from error")
+    assert 0 == cur.fetchone()[0]
+
+    it = SqlInsertIter(db_path,"error",cache.ret_val_generator(0,MID),cache.testmeta,verbose=True)
+    async for x in it:
+        print(x)
 
-    assert len(db.cursors) == 2
-    assert len(db.cursors[0].exm_data) == 5
-    assert len(db.cursors[0].ex_data) == 1
-    assert len(db.cursors[1].exm_data) == 10
-    assert len(db.cursors[1].ex_data) == 0
-    assert db.rollback_cnt == 1
-    assert db.commit_cnt == 1
+    cur = db.execute("select count(*) from error")
+    cnt = cur.fetchone()[0]
+    assert 10 == cnt
+    db.close()
 
 # ---------------- SqlCache init ---------------------
 
 def test_init():
-    cache = SqlCache(db_path=db_path)
-
-# ---------------- SqlCache clean ---------------------
-
-def test_clean():
-    cache = SqlCache(db_path=db_path)
-    assert os.path.isfile(db_path)
-    cache.clean()
+    if os.path.isfile(db_path):
+        os.remove(db_path)
+    with SqlCache(db_path=db_path) as cache:
+        pass
+    os.remove(db_path)
+
+# ---------------- SqlCache reset ---------------------
+
+def test_reset():
+    if os.path.isfile(db_path):
+        os.remove(db_path)
+    with SqlCache(db_path=db_path) as cache:
+        assert os.path.isfile(db_path)
+        cache.reset(True)
+        assert os.path.isfile(db_path)
+    os.remove(db_path)
     assert not os.path.isfile(db_path)
 
 # ---------------- SqlCache get_general_upstream ---------------------
 
@@ -363,323 +355,352 @@
 # ---------------- SqlCache get_faults_timedelta ---------------------
 
 def test_get_faults_timedelta():
-    cache = SqlCache(db_path=db_path)
-    cache.clean()
-    cache = SqlCache(auth=AUTH,_dir=None,db_path=db_path)
-    cache.get_faults_timedelta(VEH,START,END)
-    cache.clean()
+    if os.path.isfile(db_path):
+        os.remove(db_path)
+
+    with SqlCache(auth=AUTH,_dir=None,db_path=db_path) as cache:
+        cache.get_faults_timedelta(VEH,START,END)
 
 def test_get_faults_twice():
-    cache = SqlCache(auth = AUTH,_dir=None,db_path=db_path)
-    cache.get_faults_timedelta(VEH,START,END)
-    res = cache.get_faults_timedelta(VEH,START,END)
-    cache.clean()
+    if os.path.isfile(db_path):
+        os.remove(db_path)
+
+    with SqlCache(auth=AUTH,_dir=None,db_path=db_path) as cache:
+        cache.get_faults_timedelta(VEH,START,END)
+        res = cache.get_faults_timedelta(VEH,START,END)
 
 @pytest.mark.asyncio
 async def test_with_mock_same_block():
-    cache = SqlCache(auth=AUTH,_dir=None,db_path=db_path,sql_cache1=CacheForTests())
-    it, cnt = cache.get_faults_timedelta(VEH,START,END)
-    cnt = 0
-    async for x,meta in it:
-        assert len(x) == 1
-        assert x[0] == cache.cache1.get_testobj(cnt,START)
-        cnt += 1
-    assert cnt == 10
-    print("second get")
-    it, cnt = cache.get_faults_timedelta(VEH,START,END)
-    cnt = 0
-    async for x,meta in it:
-        assert len(x) == 1
-        assert x[0] == cache.cache1.get_testobj(cnt,START)
-        cnt += 1
-    assert cnt == 10
-    assert len(cache.cache1.veh_id) == 1
-    assert len(cache.cache1.start) == 1
-    assert len(cache.cache1.end) == 1
-    assert len(cache.cache1.previter) == 1
-    for x in cache.cache1.veh_id:
-        assert x == VEH
-    for x in cache.cache1.start:
-        assert x == START
-    for x in cache.cache1.end:
-        assert x == END
-    for x in cache.cache1.previter:
-        assert x == None
-    cache.clean()
+    if os.path.isfile(db_path):
+        os.remove(db_path)
+    with SqlCache(auth=AUTH,_dir=None,db_path=db_path,sql_cache1=CacheForTests(),verbose=True) as cache:
+        it, cnt = cache.get_faults_timedelta(VEH,START,END)
+        cnt = 0
+        async for x,meta in it:
+            assert len(x) == 1
+            assert x[0] == cache.cache1.get_testobj(cnt,START)
+            cnt += 1
+        assert cnt == 10
+        print("second get")
+        it, cnt = cache.get_faults_timedelta(VEH,START,END)
+        cnt = 0
+        async for x,meta in it:
+            assert len(x) == 1
+            assert x[0] == cache.cache1.get_testobj(cnt,START)
+            cnt += 1
+        assert cnt == 10
+        assert len(cache.cache1.veh_id) == 1
+        assert len(cache.cache1.start) == 1
+        assert len(cache.cache1.end) == 1
+        assert len(cache.cache1.previter) == 1
+        for x in cache.cache1.veh_id:
+            assert x == VEH
+        for x in cache.cache1.start:
+            assert x == START
+        for x in cache.cache1.end:
+            assert x == END
+        for x in cache.cache1.previter:
+            assert x == None
+    if os.path.isfile(db_path):
+        os.remove(db_path)
 
 @pytest.mark.asyncio
 async def test_with_mock_smaller_block():
-    cache = SqlCache(auth = AUTH,_dir=None,db_path=db_path,sql_cache1=CacheForTests())
-    cache.cache1.modifiy_time = False
-    it, cnt = cache.get_faults_timedelta(VEH,START,END)
-    cnt = 0
-    async for x,meta in it:
-        assert len(x) == 1
-        assert x[0] == cache.cache1.get_testobj(cnt,START+timedelta(days=14))
-        cnt += 1
-    assert cnt == 10
- print("second get") - it, cnt = cache.get_faults_timedelta(VEH,START+timedelta(days=10),END-timedelta(days=10)) - cnt = 0 - async for x,meta in it: - assert len(x) == 1 - assert x[0] == cache.cache1.get_testobj(cnt,START+timedelta(days=14)) - cnt += 1 - assert cnt == 10 - assert len(cache.cache1.veh_id) == 1 - assert len(cache.cache1.start) == 1 - assert len(cache.cache1.end) == 1 - assert len(cache.cache1.previter) == 1 - for x in cache.cache1.veh_id: - assert x == VEH - for x in cache.cache1.start: - assert x == START - for x in cache.cache1.end: - assert x == END - for x in cache.cache1.previter: - assert x == None - cache.clean() + if os.path.isfile(db_path): + os.remove(db_path) + with SqlCache(auth = AUTH,_dir=None,db_path=db_path,sql_cache1=CacheForTests(),verbose=True) as cache: + cache.cache1.modifiy_time = False + it, cnt = cache.get_faults_timedelta(VEH,START,END) + cnt = 0 + async for x,meta in it: + assert len(x) == 1 + assert x[0] == cache.cache1.get_testobj(cnt,MID) + cnt += 1 + assert cnt == 10 + print("second get") + it, cnt = cache.get_faults_timedelta(VEH,START+timedelta(days=10),END-timedelta(days=10)) + cnt = 0 + async for x,meta in it: + assert len(x) == 1 + assert x[0] == cache.cache1.get_testobj(cnt,START+timedelta(days=14)) + cnt += 1 + assert cnt == 10 + assert len(cache.cache1.veh_id) == 1 + assert len(cache.cache1.start) == 1 + assert len(cache.cache1.end) == 1 + assert len(cache.cache1.previter) == 1 + for x in cache.cache1.veh_id: + assert x == VEH + for x in cache.cache1.start: + assert x == START + for x in cache.cache1.end: + assert x == END + for x in cache.cache1.previter: + assert x == None + if os.path.isfile(db_path): + os.remove(db_path) @pytest.mark.asyncio async def test_with_mock_bigger_block(): - cache = SqlCache(auth = AUTH,_dir=None,db_path=db_path,sql_cache1=CacheForTests()) - it, cnt = cache.get_faults_timedelta(VEH,START,END) - cnt = 0 - async for x,meta in it: - assert len(x) == 1 - assert x[0] == cache.cache1.get_testobj(cnt,START) - cnt += 1 - assert cnt == 10 - print("second get") - it, cnt = cache.get_faults_timedelta(VEH,START-timedelta(days=10),END+timedelta(days=10)) - assert len(it.iterators) == 3 - cnt = 0 - async for x,meta in it.iterators[0]: - assert len(x) == 1 - print(cnt) - assert x[0] == cache.cache1.get_testobj(cnt+10,START-timedelta(days=10)) - cnt += 1 - assert cnt == 10 - print(it.iterators[1]) - async for x,meta in it.iterators[1]: - assert len(x) == 1 - print(cnt) - assert x[0] == cache.cache1.get_testobj(cnt-10,time=START) - cnt += 1 - assert cnt == 20 - async for x,meta in it.iterators[2]: - assert len(x) == 1 - print(cnt) - assert x[0] == cache.cache1.get_testobj(cnt,time=END+timedelta(milliseconds=1)) - cnt += 1 - assert cnt == 30 - assert len(cache.cache1.veh_id) == 3 - assert len(cache.cache1.start) == 3 - assert len(cache.cache1.end) == 3 - assert len(cache.cache1.previter) == 3 - for x in cache.cache1.veh_id: - assert x == VEH - # for x in cache.cache1.start: - # assert x == START - # for x in cache.cache1.end: - # assert x == END - for x in cache.cache1.previter: - assert x == None - cache.clean() + if os.path.isfile(db_path): + os.remove(db_path) + with SqlCache(auth = AUTH,_dir=None,db_path=db_path,sql_cache1=CacheForTests(),verbose=True) as cache: + it, cnt = cache.get_faults_timedelta(VEH,START,END) + cnt = 0 + async for x,meta in it: + assert len(x) == 1 + assert x[0] == cache.cache1.get_testobj(cnt,START) + cnt += 1 + assert cnt == 10 + print("second get") + it, cnt = 
cache.get_faults_timedelta(VEH,START-timedelta(days=10),END+timedelta(days=10)) + assert len(it.iterators) == 3 + cnt = 0 + async for x,meta in it.iterators[0]: + assert len(x) == 1 + print(cnt) + assert x[0] == cache.cache1.get_testobj(cnt+10,START-timedelta(days=10)) + cnt += 1 + assert cnt == 10 + print(it.iterators[1]) + async for x,meta in it.iterators[1]: + assert len(x) == 1 + print(cnt) + assert x[0] == cache.cache1.get_testobj(cnt-10,time=START) + cnt += 1 + assert cnt == 20 + async for x,meta in it.iterators[2]: + assert len(x) == 1 + print(cnt) + assert x[0] == cache.cache1.get_testobj(cnt,time=END+timedelta(milliseconds=1)) + cnt += 1 + assert cnt == 30 + assert len(cache.cache1.veh_id) == 3 + assert len(cache.cache1.start) == 3 + assert len(cache.cache1.end) == 3 + assert len(cache.cache1.previter) == 3 + for x in cache.cache1.veh_id: + assert x == VEH + # for x in cache.cache1.start: + # assert x == START + # for x in cache.cache1.end: + # assert x == END + for x in cache.cache1.previter: + assert x == None + if os.path.isfile(db_path): + os.remove(db_path) @pytest.mark.asyncio async def test_with_mock_part_bigger_block_1(): - cache = SqlCache(auth = AUTH,_dir=None,db_path=db_path,sql_cache1=CacheForTests()) - it, cnt = cache.get_faults_timedelta(VEH,START,END) - cnt = 0 - async for x,meta in it: - assert len(x) == 1 - assert x[0] == cache.cache1.get_testobj(cnt,START) - cnt += 1 - assert cnt == 10 - print("second get") - it, cnt = cache.get_faults_timedelta(VEH,START-timedelta(days=10),END) - cnt = 0 - async for x,meta in it: - assert len(x) == 1 - cnt += 1 - assert cnt == 20 - assert len(cache.cache1.veh_id) == 2 - assert len(cache.cache1.start) == 2 - assert len(cache.cache1.end) == 2 - assert len(cache.cache1.previter) == 2 - for x in cache.cache1.veh_id: - assert x == VEH - # for x in cache.cache1.start: - # assert x == START - # for x in cache.cache1.end: - # assert x == END - for x in cache.cache1.previter: - assert x == None - cache.clean() + if os.path.isfile(db_path): + os.remove(db_path) + with SqlCache(auth = AUTH,_dir=None,db_path=db_path,sql_cache1=CacheForTests(),verbose=True) as cache: + it, cnt = cache.get_faults_timedelta(VEH,START,END) + cnt = 0 + async for x,meta in it: + assert len(x) == 1 + assert x[0] == cache.cache1.get_testobj(cnt,START) + cnt += 1 + assert cnt == 10 + print("second get") + it, cnt = cache.get_faults_timedelta(VEH,START-timedelta(days=10),END) + cnt = 0 + async for x,meta in it: + assert len(x) == 1 + cnt += 1 + assert cnt == 20 + assert len(cache.cache1.veh_id) == 2 + assert len(cache.cache1.start) == 2 + assert len(cache.cache1.end) == 2 + assert len(cache.cache1.previter) == 2 + for x in cache.cache1.veh_id: + assert x == VEH + # for x in cache.cache1.start: + # assert x == START + # for x in cache.cache1.end: + # assert x == END + for x in cache.cache1.previter: + assert x == None + if os.path.isfile(db_path): + os.remove(db_path) @pytest.mark.asyncio async def test_with_mock_part_bigger_block_2(): - cache = SqlCache(auth = AUTH,_dir=None,db_path=db_path,sql_cache1=CacheForTests()) - it, cnt = cache.get_faults_timedelta(VEH,START,END) - cnt = 0 - async for x,meta in it: - assert len(x) == 1 - assert x[0] == cache.cache1.get_testobj(cnt,START) - cnt += 1 - assert cnt == 10 - print("second get") - it, cnt = cache.get_faults_timedelta(VEH,START,END+timedelta(days=10)) - cnt = 0 - async for x,meta in it: - assert len(x) == 1 - cnt += 1 - assert cnt == 20 - assert len(cache.cache1.veh_id) == 2 - assert len(cache.cache1.start) == 2 - 
assert len(cache.cache1.end) == 2 - assert len(cache.cache1.previter) == 2 - for x in cache.cache1.veh_id: - assert x == VEH - # for x in cache.cache1.start: - # assert x == START - # for x in cache.cache1.end: - # assert x == END - for x in cache.cache1.previter: - assert x == None - cache.clean() + if os.path.isfile(db_path): + os.remove(db_path) + with SqlCache(auth = AUTH,_dir=None,db_path=db_path,sql_cache1=CacheForTests(),verbose=True) as cache: + it, cnt = cache.get_faults_timedelta(VEH,START,END) + cnt = 0 + async for x,meta in it: + assert len(x) == 1 + assert x[0] == cache.cache1.get_testobj(cnt,START) + cnt += 1 + assert cnt == 10 + print("second get") + it, cnt = cache.get_faults_timedelta(VEH,START,END+timedelta(days=10)) + cnt = 0 + async for x,meta in it: + assert len(x) == 1 + cnt += 1 + assert cnt == 20 + assert len(cache.cache1.veh_id) == 2 + assert len(cache.cache1.start) == 2 + assert len(cache.cache1.end) == 2 + assert len(cache.cache1.previter) == 2 + for x in cache.cache1.veh_id: + assert x == VEH + # for x in cache.cache1.start: + # assert x == START + # for x in cache.cache1.end: + # assert x == END + for x in cache.cache1.previter: + assert x == None + if os.path.isfile(db_path): + os.remove(db_path) @pytest.mark.asyncio async def test_with_mock_overlapping_1(): - cache = SqlCache(auth = AUTH,_dir=None,db_path=db_path,sql_cache1=CacheForTests()) - it, cnt = cache.get_faults_timedelta(VEH,START,END) - cnt = 0 - async for x,meta in it: - assert len(x) == 1 - assert x[0] == cache.cache1.get_testobj(cnt,START) - cnt += 1 - assert cnt == 10 - print("second get") - it, cnt = cache.get_faults_timedelta(VEH,START-timedelta(days=10),END-timedelta(days=10)) - cnt = 0 - async for x,meta in it: - assert len(x) == 1 - cnt += 1 - assert cnt == 20 - assert len(cache.cache1.veh_id) == 2 - assert len(cache.cache1.start) == 2 - assert len(cache.cache1.end) == 2 - assert len(cache.cache1.previter) == 2 - for x in cache.cache1.veh_id: - assert x == VEH - # for x in cache.cache1.start: - # assert x == START - # for x in cache.cache1.end: - # assert x == END - for x in cache.cache1.previter: - assert x == None - cache.clean() + if os.path.isfile(db_path): + os.remove(db_path) + with SqlCache(auth = AUTH,_dir=None,db_path=db_path,sql_cache1=CacheForTests(),verbose=True) as cache: + it, cnt = cache.get_faults_timedelta(VEH,START,END) + cnt = 0 + async for x,meta in it: + assert len(x) == 1 + assert x[0] == cache.cache1.get_testobj(cnt,START) + cnt += 1 + assert cnt == 10 + print("second get") + it, cnt = cache.get_faults_timedelta(VEH,START-timedelta(days=10),END-timedelta(days=10)) + cnt = 0 + async for x,meta in it: + assert len(x) == 1 + cnt += 1 + assert cnt == 20 + assert len(cache.cache1.veh_id) == 2 + assert len(cache.cache1.start) == 2 + assert len(cache.cache1.end) == 2 + assert len(cache.cache1.previter) == 2 + for x in cache.cache1.veh_id: + assert x == VEH + # for x in cache.cache1.start: + # assert x == START + # for x in cache.cache1.end: + # assert x == END + for x in cache.cache1.previter: + assert x == None + if os.path.isfile(db_path): + os.remove(db_path) @pytest.mark.asyncio async def test_with_mock_overlapping_2(): - cache = SqlCache(auth = AUTH,_dir=None,db_path=db_path,sql_cache1=CacheForTests()) - it, cnt = cache.get_faults_timedelta(VEH,START,END) - cnt = 0 - async for x,meta in it: - assert len(x) == 1 - assert x[0] == cache.cache1.get_testobj(cnt,START) - cnt += 1 - assert cnt == 10 - print("second get") - it, cnt = 
cache.get_faults_timedelta(VEH,START+timedelta(days=10),END+timedelta(days=10)) - async for x,meta in it: - assert len(x) == 1 - cnt += 1 - assert cnt == 20 - assert len(cache.cache1.veh_id) == 2 - assert len(cache.cache1.start) == 2 - assert len(cache.cache1.end) == 2 - assert len(cache.cache1.previter) == 2 - for x in cache.cache1.veh_id: - assert x == VEH - # for x in cache.cache1.start: - # assert x == START - # for x in cache.cache1.end: - # assert x == END - for x in cache.cache1.previter: - assert x == None - cache.clean() + if os.path.isfile(db_path): + os.remove(db_path) + with SqlCache(auth = AUTH,_dir=None,db_path=db_path,sql_cache1=CacheForTests(),verbose=True) as cache: + it, cnt = cache.get_faults_timedelta(VEH,START,END) + cnt = 0 + async for x,meta in it: + assert len(x) == 1 + assert x[0] == cache.cache1.get_testobj(cnt,START) + cnt += 1 + assert cnt == 10 + print("second get") + it, cnt = cache.get_faults_timedelta(VEH,START+timedelta(days=10),END+timedelta(days=10)) + async for x,meta in it: + assert len(x) == 1 + cnt += 1 + assert cnt == 20 + assert len(cache.cache1.veh_id) == 2 + assert len(cache.cache1.start) == 2 + assert len(cache.cache1.end) == 2 + assert len(cache.cache1.previter) == 2 + for x in cache.cache1.veh_id: + assert x == VEH + # for x in cache.cache1.start: + # assert x == START + # for x in cache.cache1.end: + # assert x == END + for x in cache.cache1.previter: + assert x == None + if os.path.isfile(db_path): + os.remove(db_path) # ---------------- SqlCache get_faults --------------------- @pytest.mark.asyncio async def test_get_faults_tdelta_timedelta(): - cache = SqlCache(auth = AUTH,_dir=None,db_path=db_path,sql_cache1=CacheForTests()) - cache.tdelta_end = END - start,end = start_end_from_tdelta(END-START,END) - it, cnt = cache.get_faults(VEH,END-START) - cnt = 0 - async for x,meta in it: - assert len(x) == 1 - assert x[0] == cache.cache1.get_testobj(cnt,start) - cnt += 1 - assert cnt == 10 - print("second get") - it, cnt = cache.get_faults(VEH,END-START) - cnt = 0 - async for x,meta in it: - assert len(x) == 1 - assert x[0] == cache.cache1.get_testobj(cnt,start) - cnt += 1 - assert cnt == 10 - assert len(cache.cache1.veh_id) == 1 - assert len(cache.cache1.start) == 1 - assert len(cache.cache1.end) == 1 - assert len(cache.cache1.previter) == 1 - for x in cache.cache1.veh_id: - assert x == VEH - for x in cache.cache1.start: - pass - #assert x == START - for x in cache.cache1.end: - pass - #assert x == END - for x in cache.cache1.previter: - assert x == None - cache.clean() + if os.path.isfile(db_path): + os.remove(db_path) + with SqlCache(auth = AUTH,_dir=None,db_path=db_path,sql_cache1=CacheForTests(),verbose=True) as cache: + cache.tdelta_end = END + start,end = start_end_from_tdelta(END-START,END) + it, cnt = cache.get_faults(VEH,END-START) + cnt = 0 + async for x,meta in it: + assert len(x) == 1 + assert x[0] == cache.cache1.get_testobj(cnt,start) + cnt += 1 + assert cnt == 10 + print("second get") + it, cnt = cache.get_faults(VEH,END-START) + cnt = 0 + async for x,meta in it: + assert len(x) == 1 + assert x[0] == cache.cache1.get_testobj(cnt,start) + cnt += 1 + assert cnt == 10 + assert len(cache.cache1.veh_id) == 1 + assert len(cache.cache1.start) == 1 + assert len(cache.cache1.end) == 1 + assert len(cache.cache1.previter) == 1 + for x in cache.cache1.veh_id: + assert x == VEH + for x in cache.cache1.start: + pass + #assert x == START + for x in cache.cache1.end: + pass + #assert x == END + for x in cache.cache1.previter: + assert x == None + if 
os.path.isfile(db_path): + os.remove(db_path) @pytest.mark.asyncio async def test_get_faults_tdelta_int_days(): - cache = SqlCache(auth = AUTH,_dir=None,db_path=db_path,sql_cache1=CacheForTests()) - cache.tdelta_end = END - start,end = start_end_from_tdelta(30,END) - it, cnt = cache.get_faults(VEH,30) - cnt = 0 - async for x,meta in it: - assert len(x) == 1 - assert x[0] == cache.cache1.get_testobj(cnt,start) - cnt += 1 - assert cnt == 10 - print("second get") - it, cnt = cache.get_faults(VEH,30) - async for x,meta in it: - assert len(x) == 1 - assert x[0] == cache.cache1.get_testobj(cnt-10,start) - cnt += 1 - assert cnt == 20 - assert len(cache.cache1.veh_id) == 1 - assert len(cache.cache1.start) == 1 - assert len(cache.cache1.end) == 1 - assert len(cache.cache1.previter) == 1 - for x in cache.cache1.veh_id: - assert x == VEH - # for x in cache.cache1.start: - # assert x == START - # for x in cache.cache1.end: - # assert x == END - for x in cache.cache1.previter: - assert x == None - cache.clean() + if os.path.isfile(db_path): + os.remove(db_path) + with SqlCache(auth = AUTH,_dir=None,db_path=db_path,sql_cache1=CacheForTests(),verbose=True) as cache: + cache.tdelta_end = END + start,end = start_end_from_tdelta(30,END) + it, cnt = cache.get_faults(VEH,30) + cnt = 0 + async for x,meta in it: + assert len(x) == 1 + assert x[0] == cache.cache1.get_testobj(cnt,start) + cnt += 1 + assert cnt == 10 + print("second get") + it, cnt = cache.get_faults(VEH,30) + async for x,meta in it: + assert len(x) == 1 + assert x[0] == cache.cache1.get_testobj(cnt-10,start) + cnt += 1 + assert cnt == 20 + assert len(cache.cache1.veh_id) == 1 + assert len(cache.cache1.start) == 1 + assert len(cache.cache1.end) == 1 + assert len(cache.cache1.previter) == 1 + for x in cache.cache1.veh_id: + assert x == VEH + # for x in cache.cache1.start: + # assert x == START + # for x in cache.cache1.end: + # assert x == END + for x in cache.cache1.previter: + assert x == None + if os.path.isfile(db_path): + os.remove(db_path) # ---------------- SqlCache get_history_timedelta --------------------- @@ -687,76 +708,82 @@ async def test_get_faults_tdelta_int_days(): @pytest.mark.asyncio async def test_get_history_tdelta_timedelta(): - cache = SqlCache(auth = AUTH,_dir=None,db_path=db_path,sql_cache1=CacheForTests()) - cache.tdelta_end = END - start,end = start_end_from_tdelta(END-START,END) - it, cnt = cache.get_history(VEH,END-START) - cnt = 0 - async for x,meta in it: - assert len(x) == 1 - assert x[0] == cache.cache1.get_hist_obj(cnt,start) - cnt += 1 - assert cnt == 10 - print("second get") - it, cnt = cache.get_history(VEH,END-START) - cnt = 0 - async for x,meta in it: - assert len(x) == 1 - assert dict_equal(x[0],cache.cache1.get_testobj(cnt,start)) - cnt += 1 - assert cnt == 10 - assert len(cache.cache1.veh_id) == 1 - assert len(cache.cache1.start) == 1 - assert len(cache.cache1.end) == 1 - assert len(cache.cache1.previter) == 1 - for x in cache.cache1.veh_id: - assert x == VEH - for x in cache.cache1.start: - pass - #assert x == START - for x in cache.cache1.end: - pass - #assert x == END - for x in cache.cache1.previter: - assert x == None - cache.clean() + if os.path.isfile(db_path): + os.remove(db_path) + with SqlCache(auth = AUTH,_dir=None,db_path=db_path,sql_cache1=CacheForTests(),verbose=True) as cache: + cache.tdelta_end = END + start,end = start_end_from_tdelta(END-START,END) + it, cnt = cache.get_history(VEH,END-START) + cnt = 0 + async for x,meta in it: + assert len(x) == 1 + assert x[0] == 
cache.cache1.get_hist_obj(cnt,start) + cnt += 1 + assert cnt == 10 + print("second get") + it, cnt = cache.get_history(VEH,END-START) + cnt = 0 + async for x,meta in it: + assert len(x) == 1 + assert dict_equal(x[0],cache.cache1.get_testobj(cnt,start)) + cnt += 1 + assert cnt == 10 + assert len(cache.cache1.veh_id) == 1 + assert len(cache.cache1.start) == 1 + assert len(cache.cache1.end) == 1 + assert len(cache.cache1.previter) == 1 + for x in cache.cache1.veh_id: + assert x == VEH + for x in cache.cache1.start: + pass + #assert x == START + for x in cache.cache1.end: + pass + #assert x == END + for x in cache.cache1.previter: + assert x == None + if os.path.isfile(db_path): + os.remove(db_path) @pytest.mark.asyncio async def test_get_history_tdelta_int_days(): - cache = SqlCache(auth = AUTH,_dir=None,db_path=db_path,sql_cache1=CacheForTests()) - cache.tdelta_end = END - start,end = start_end_from_tdelta(30,END) - it, cnt = cache.get_history(VEH,30) - cnt = 0 - async for x,meta in it: - assert len(x) == 1 - assert x[0] == cache.cache1.get_hist_obj(cnt,start) - cnt += 1 - assert cnt == 10 - print("second get") - cnt = 0 - print(cache.cache1.get_testobj(0,start)) - print(cache.cache1.get_testobj(1,start)) - it, cnt = cache.get_history(VEH,30) - async for x,meta in it: - assert len(x) == 1 - # if not dict_equal(x[0],cache.cache1.get_testobj(cnt,start)): - # assert x[0] == cache.cache1.get_testobj(cnt,start) - cnt += 1 - assert cnt == 20 - assert len(cache.cache1.veh_id) == 1 - assert len(cache.cache1.start) == 1 - assert len(cache.cache1.end) == 1 - assert len(cache.cache1.previter) == 1 - for x in cache.cache1.veh_id: - assert x == VEH - # for x in cache.cache1.start: - # assert x == START - # for x in cache.cache1.end: - # assert x == END - for x in cache.cache1.previter: - assert x == None - cache.clean() + if os.path.isfile(db_path): + os.remove(db_path) + with SqlCache(auth = AUTH,_dir=None,db_path=db_path,sql_cache1=CacheForTests(),verbose=True) as cache: + cache.tdelta_end = END + start,end = start_end_from_tdelta(30,END) + it, cnt = cache.get_history(VEH,30) + cnt = 0 + async for x,meta in it: + assert len(x) == 1 + assert x[0] == cache.cache1.get_hist_obj(cnt,start) + cnt += 1 + assert cnt == 10 + print("second get") + cnt = 0 + print(cache.cache1.get_testobj(0,start)) + print(cache.cache1.get_testobj(1,start)) + it, cnt = cache.get_history(VEH,30) + async for x,meta in it: + assert len(x) == 1 + # if not dict_equal(x[0],cache.cache1.get_testobj(cnt,start)): + # assert x[0] == cache.cache1.get_testobj(cnt,start) + cnt += 1 + assert cnt == 20 + assert len(cache.cache1.veh_id) == 1 + assert len(cache.cache1.start) == 1 + assert len(cache.cache1.end) == 1 + assert len(cache.cache1.previter) == 1 + for x in cache.cache1.veh_id: + assert x == VEH + # for x in cache.cache1.start: + # assert x == START + # for x in cache.cache1.end: + # assert x == END + for x in cache.cache1.previter: + assert x == None + if os.path.isfile(db_path): + os.remove(db_path) # ---------------- SqlCache get_candata_timedelta --------------------- @@ -764,72 +791,77 @@ async def test_get_history_tdelta_int_days(): @pytest.mark.asyncio async def test_get_candata_tdelta_timedelta(): - cache = SqlCache(auth = AUTH,_dir=None,db_path=db_path,sql_cache1=CacheForTests()) - cache.tdelta_end = END - start,end = start_end_from_tdelta(END-START,END) - it, cnt = cache.get_candata(VEH,END-START) - cnt = 0 - async for x,meta in it: - assert len(x) == 1 - assert x[0] == cache.cache1.get_candata_obj(cnt,start) - cnt += 1 - 
assert cnt == 10 - print("second get") - it, cnt = cache.get_candata(VEH,END-START) - cnt = 0 - async for x,meta in it: - assert len(x) == 1 - if not dict_equal(x[0] , cache.cache1.get_candata_obj(cnt,start)): + if os.path.isfile(db_path): + os.remove(db_path) + with SqlCache(auth = AUTH,_dir=None,db_path=db_path,sql_cache1=CacheForTests(),verbose=True) as cache: + cache.tdelta_end = END + start,end = start_end_from_tdelta(END-START,END) + it, cnt = cache.get_candata(VEH,END-START) + cnt = 0 + async for x,meta in it: + assert len(x) == 1 assert x[0] == cache.cache1.get_candata_obj(cnt,start) - cnt += 1 - assert cnt == 10 - assert len(cache.cache1.veh_id) == 1 - assert len(cache.cache1.start) == 1 - assert len(cache.cache1.end) == 1 - assert len(cache.cache1.previter) == 1 - for x in cache.cache1.veh_id: - assert x == VEH - for x in cache.cache1.start: - pass - #assert x == START - for x in cache.cache1.end: - pass - #assert x == END - for x in cache.cache1.previter: - assert x == None - cache.clean() + cnt += 1 + assert cnt == 10 + print("second get") + it, cnt = cache.get_candata(VEH,END-START) + cnt = 0 + async for x,meta in it: + assert len(x) == 1 + if not dict_equal(x[0] , cache.cache1.get_candata_obj(cnt,start)): + assert x[0] == cache.cache1.get_candata_obj(cnt,start) + cnt += 1 + assert cnt == 10 + assert len(cache.cache1.veh_id) == 1 + assert len(cache.cache1.start) == 1 + assert len(cache.cache1.end) == 1 + assert len(cache.cache1.previter) == 1 + for x in cache.cache1.veh_id: + assert x == VEH + for x in cache.cache1.start: + pass + #assert x == START + for x in cache.cache1.end: + pass + #assert x == END + for x in cache.cache1.previter: + assert x == None + if os.path.isfile(db_path): + os.remove(db_path) @pytest.mark.asyncio async def test_get_candata_tdelta_int_days(): - cache = SqlCache(auth = AUTH,_dir=None,db_path=db_path,sql_cache1=CacheForTests()) - cache.tdelta_end = END - start,end = start_end_from_tdelta(30,END) - it, cnt = cache.get_candata(VEH,30) - cnt = 0 - async for x,meta in it: - assert len(x) == 1 - assert x[0] == cache.cache1.get_candata_obj(cnt,start) - cnt += 1 - assert cnt == 10 - print("second get") - it, cnt = cache.get_candata(VEH,30) - async for x,meta in it: - assert len(x) == 1 - if not dict_equal(x[0] , cache.cache1.get_candata_obj(cnt-10,start)): - assert x[0] == cache.cache1.get_candata_obj(cnt-10,start) - cnt += 1 - assert cnt == 20 - assert len(cache.cache1.veh_id) == 1 - assert len(cache.cache1.start) == 1 - assert len(cache.cache1.end) == 1 - assert len(cache.cache1.previter) == 1 - for x in cache.cache1.veh_id: - assert x == VEH - # for x in cache.cache1.start: - # assert x == START - # for x in cache.cache1.end: - # assert x == END - for x in cache.cache1.previter: - assert x == None - cache.clean() - + if os.path.isfile(db_path): + os.remove(db_path) + with SqlCache(auth = AUTH,_dir=None,db_path=db_path,sql_cache1=CacheForTests(),verbose=True) as cache: + cache.tdelta_end = END + start,end = start_end_from_tdelta(30,END) + it, cnt = cache.get_candata(VEH,30) + cnt = 0 + async for x,meta in it: + assert len(x) == 1 + assert x[0] == cache.cache1.get_candata_obj(cnt,start) + cnt += 1 + assert cnt == 10 + print("second get") + it, cnt = cache.get_candata(VEH,30) + async for x,meta in it: + assert len(x) == 1 + if not dict_equal(x[0] , cache.cache1.get_candata_obj(cnt-10,start)): + assert x[0] == cache.cache1.get_candata_obj(cnt-10,start) + cnt += 1 + assert cnt == 20 + assert len(cache.cache1.veh_id) == 1 + assert len(cache.cache1.start) == 1 
+ assert len(cache.cache1.end) == 1 + assert len(cache.cache1.previter) == 1 + for x in cache.cache1.veh_id: + assert x == VEH + # for x in cache.cache1.start: + # assert x == START + # for x in cache.cache1.end: + # assert x == END + for x in cache.cache1.previter: + assert x == None + if os.path.isfile(db_path): + os.remove(db_path)
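Side note on the SecureString wrapper used throughout this patch: it relies on the default object repr, so printing the auth tuple (or the kwargs dump in WebCache's verbose mode) no longer leaks the key; the raw value is only reachable through gets()/sets(). A minimal illustration with a dummy key:

    from pytrackunit.helper import SecureString

    key = SecureString('a' * 32)   # dummy 32-character key
    print(('API', key))            # ('API', <pytrackunit.helper.SecureString object at 0x...>)
    assert key.gets() == 'a' * 32  # explicit accessor still returns the secret
    key.sets('b' * 32)             # sets() replaces the stored value
    assert key.gets() == 'b' * 32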