diff --git a/sources/Framework/org.ohdsi.cdm.framework/Common/Extensions/DataRecordExtensions.cs b/sources/Framework/org.ohdsi.cdm.framework/Common/Extensions/DataRecordExtensions.cs index 6a8c5d10..b3b26310 100644 --- a/sources/Framework/org.ohdsi.cdm.framework/Common/Extensions/DataRecordExtensions.cs +++ b/sources/Framework/org.ohdsi.cdm.framework/Common/Extensions/DataRecordExtensions.cs @@ -6,7 +6,7 @@ public static class DataRecordExtensions { private static string GetErrorMeassge(IDataRecord reader, string fieldName, string methodName) { - return $"DataRecordExtensions.{methodName} | {fieldName}={GetValue(reader, fieldName)}"; + return $"DataRecordExtensions.{methodName} | {fieldName}={GetValueString(reader, fieldName)}"; } public static string GetString(this IDataRecord reader, string fieldName) @@ -16,14 +16,13 @@ public static string GetString(this IDataRecord reader, string fieldName) if (string.IsNullOrEmpty(fieldName)) return null; - var value = GetValue(reader, fieldName); + var value = GetValueString(reader, fieldName); - if (value is null || value is DBNull) + if (value is null) return null; - var strValue = value.ToString(); - - return string.Intern(strValue.Trim()); + //return value; + return string.Intern(value); } catch (Exception e) { @@ -31,26 +30,6 @@ public static string GetString(this IDataRecord reader, string fieldName) } } - public static TimeSpan? GetTime(this IDataRecord reader, string fieldName) - { - try - { - if (string.IsNullOrEmpty(fieldName)) - return null; - - if (DateTime.TryParse(reader.GetString(fieldName), out var dt)) - { - return dt.TimeOfDay; - } - - return null; - } - catch (Exception e) - { - throw new Exception(GetErrorMeassge(reader, fieldName, "GetTime()"), e); - } - } - public static int? GetIntSafe(this IDataRecord reader, string fieldName) { try @@ -58,11 +37,11 @@ public static string GetString(this IDataRecord reader, string fieldName) if (string.IsNullOrEmpty(fieldName)) return null; - var value = GetValue(reader, fieldName); - if (value is DBNull || string.IsNullOrEmpty(value.ToString())) + var value = GetValueString(reader, fieldName); + if (value is null) return null; - if (int.TryParse(value.ToString(), out var result)) + if (int.TryParse(value, out var result)) return result; return null; @@ -80,8 +59,9 @@ public static string GetString(this IDataRecord reader, string fieldName) if (string.IsNullOrEmpty(fieldName)) return null; - var value = GetValue(reader, fieldName); - if (value is DBNull || string.IsNullOrEmpty(value.ToString())) + var value = GetValueString(reader, fieldName); + + if (string.IsNullOrEmpty(value)) return null; return Convert.ToInt32(value); @@ -99,11 +79,11 @@ public static string GetString(this IDataRecord reader, string fieldName) if (string.IsNullOrEmpty(fieldName)) return null; - var value = GetValue(reader, fieldName); - if (value is DBNull || string.IsNullOrEmpty(value.ToString())) + var value = GetValueString(reader, fieldName); + if (value is null) return null; - int.TryParse(value.ToString(), out var res); + int.TryParse(value, out var res); return res; } @@ -120,12 +100,12 @@ public static string GetString(this IDataRecord reader, string fieldName) if (string.IsNullOrEmpty(fieldName)) return null; - var value = GetValue(reader, fieldName); + var value = GetValueString(reader, fieldName); - if (value is DBNull || string.IsNullOrEmpty(value.ToString())) + if (value is null) return null; - decimal.TryParse(value.ToString(), out var res); + decimal.TryParse(value, out var res); return res; } @@ -142,11 +122,11 @@ public static DateTime GetDateTime(this IDataRecord reader, string fieldName) if (string.IsNullOrEmpty(fieldName)) return DateTime.MinValue; - var result = GetValue(reader, fieldName) as DateTime?; + var result = reader[fieldName] as DateTime?; if (!result.HasValue) { - var dateTimeString = GetValue(reader, fieldName).ToString(); + var dateTimeString = GetValueString(reader, fieldName); if (!string.IsNullOrEmpty(dateTimeString) && DateTime.TryParse(dateTimeString, out var dateTime)) { @@ -173,9 +153,9 @@ public static DateTime GetDateTime(this IDataRecord reader, string fieldName) if (string.IsNullOrEmpty(fieldName)) return null; - var value = GetValue(reader, fieldName); + var value = GetValueString(reader, fieldName); - if (value is DBNull || string.IsNullOrEmpty(value.ToString())) + if (string.IsNullOrEmpty(value)) return null; return Convert.ToInt64(value); @@ -186,11 +166,20 @@ public static DateTime GetDateTime(this IDataRecord reader, string fieldName) } } - private static object GetValue(IDataRecord reader, string fieldName) + private static string GetValueString(IDataRecord reader, string fieldName) { try { - return reader[fieldName]; + var value = reader[fieldName]; + + if (value is DBNull || value is null) + return null; + + var valStr = value.ToString().Trim(); + if (valStr == "\\N") + return null; + + return valStr; } catch (Exception e) { diff --git a/sources/Framework/org.ohdsi.cdm.framework/Common/Lookups/Lookup.cs b/sources/Framework/org.ohdsi.cdm.framework/Common/Lookups/Lookup.cs index 82b1571f..829d3643 100644 --- a/sources/Framework/org.ohdsi.cdm.framework/Common/Lookups/Lookup.cs +++ b/sources/Framework/org.ohdsi.cdm.framework/Common/Lookups/Lookup.cs @@ -217,10 +217,15 @@ public void Fill(AmazonS3Client client, string bucket, string prefix) { foreach (var v2 in v1.Values) { + v2.SourceConcepts?.TrimExcess(); v2.Ingredients?.TrimExcess(); } + + v1.TrimExcess(); } + _lookup.TrimExcess(); + GC.Collect(); } diff --git a/sources/Framework/org.ohdsi.cdm.framework/Common/Lookups/PregnancyConcepts.cs b/sources/Framework/org.ohdsi.cdm.framework/Common/Lookups/PregnancyConcepts.cs index 49e47667..6cd3a3ca 100644 --- a/sources/Framework/org.ohdsi.cdm.framework/Common/Lookups/PregnancyConcepts.cs +++ b/sources/Framework/org.ohdsi.cdm.framework/Common/Lookups/PregnancyConcepts.cs @@ -57,7 +57,7 @@ public PregnancyConcepts(string folder) GestValue = gestValue }); } - + _dictionary.TrimExcess(); } public IEnumerable GetConcepts(long conceptId) diff --git a/sources/Framework/org.ohdsi.cdm.framework/Desktop/Controllers/ChunkController.cs b/sources/Framework/org.ohdsi.cdm.framework/Desktop/Controllers/ChunkController.cs index 272b1393..9898789a 100644 --- a/sources/Framework/org.ohdsi.cdm.framework/Desktop/Controllers/ChunkController.cs +++ b/sources/Framework/org.ohdsi.cdm.framework/Desktop/Controllers/ChunkController.cs @@ -242,8 +242,9 @@ private void MoveChunkRawData(int chunkId, string baseFolder) var unloadQuery = string.Format(@"create table {0} sortkey ({1}) distkey ({1}) as {2}; " + @"UNLOAD ('select * from {0} order by {1}') to 's3://{3}' " + @"DELIMITER AS '\t' " + + @"NULL AS '\\N' " + @"credentials 'aws_access_key_id={4};aws_secret_access_key={5}' " + - @"GZIP ALLOWOVERWRITE PARALLEL ON", + @"ZSTD ALLOWOVERWRITE PARALLEL ON", tmpTableName, //0 personIdField, //1 sql, //2 diff --git a/sources/Presentation/org.ohdsi.cdm.presentation.lambdabuilder/Base/LambdaChunkPart.cs b/sources/Presentation/org.ohdsi.cdm.presentation.lambdabuilder/Base/LambdaChunkPart.cs index db991e46..fc719f3e 100644 --- a/sources/Presentation/org.ohdsi.cdm.presentation.lambdabuilder/Base/LambdaChunkPart.cs +++ b/sources/Presentation/org.ohdsi.cdm.presentation.lambdabuilder/Base/LambdaChunkPart.cs @@ -29,7 +29,7 @@ public class LambdaChunkPart : IDisposable private long? _lastSavedPersonId; private readonly System.Timers.Timer _watchdog; - private readonly Dictionary _readers = []; + private readonly Dictionary _readers = []; private Dictionary _restorePoint = []; private readonly string _tmpFolder; private bool _readRestarted; @@ -54,7 +54,7 @@ public LambdaChunkPart(int chunkId, Func createPersonBuilder, st private void Watchdog_Elapsed(object sender, System.Timers.ElapsedEventArgs e) { - if (!string.IsNullOrEmpty(_currentReaderName) && _readers != null && _readers.Count > 0 && _readers.TryGetValue(_currentReaderName, out S3DataReader3 value)) + if (!string.IsNullOrEmpty(_currentReaderName) && _readers != null && _readers.Count > 0 && _readers.TryGetValue(_currentReaderName, out S3DataReaderZstd value)) { if (value.IdleTime != TimeSpan.Zero && value.IdleTime.TotalSeconds > 10) @@ -105,7 +105,7 @@ private void Watchdog_Elapsed(object sender, System.Timers.ElapsedEventArgs e) if (_restorePoint.TryGetValue(qd.FileName, out long valueFileName)) initRow = valueFileName; - _readers.Add(qd.FileName, new S3DataReader3(Settings.Current.Bucket, folder, + _readers.Add(qd.FileName, new S3DataReaderZstd(Settings.Current.Bucket, folder, Settings.Current.S3AwsAccessKeyId, Settings.Current.S3AwsSecretAccessKey, _chunkId, qd.FileName, qd.FieldHeaders, _prefix, initRow, _tmpFolder)); @@ -251,7 +251,7 @@ private void Watchdog_Elapsed(object sender, System.Timers.ElapsedEventArgs e) if (_restorePoint.ContainsKey(qd.FileName)) initRow = _restorePoint[qd.FileName]; - _readers.Add(qd.FileName, new S3DataReader3(Settings.Current.Bucket, folder, + _readers.Add(qd.FileName, new S3DataReaderZstd(Settings.Current.Bucket, folder, Settings.Current.S3AwsAccessKeyId, Settings.Current.S3AwsSecretAccessKey, _chunkId, qd.FileName, qd.FieldHeaders, _prefix, initRow, _tmpFolder)); @@ -593,17 +593,17 @@ private void AddEntity(QueryDefinition queryDefinition, IEnumerable _fieldHeaders; + // private readonly StringSplitter _spliter; + // private readonly string _prefix; + + // private FileStream _stream; + // private BufferedStream _bufferedStream; + // private GZipStream _gzipStream; + // private StreamReader _reader; + // private readonly string _tmpFolder; + // private long _rowIndex; + + // private readonly string _awsAccessKeyId; + // private readonly string _awsSecretAccessKey; + // private DateTime _lastReadTime; + + // public long RowIndex + // { + // get { return _rowIndex; } + // } + + // public TimeSpan IdleTime + // { + // get + // { + // if (_lastReadTime == DateTime.MinValue) + // return TimeSpan.Zero; + + // return DateTime.Now.Subtract(_lastReadTime); + // } + // } + + // public bool Paused { get; private set; } = false; + + // public string TmpFolder + // { + // get + // { + // if (string.IsNullOrEmpty(_tmpFolder)) + // return "/tmp"; + + // return $"/tmp/{_tmpFolder}"; + // } + // } + + // public void Pause() + // { + // Paused = true; + // } + + // public void Resume() + // { + // Paused = false; + // } + + // public S3DataReaderGzip(string bucket, string folder, string awsAccessKeyId, + // string awsSecretAccessKey, int chunkId, string fileName, Dictionary fieldHeaders, string prefix, long initRow, string tmpFolder) + // { + // _chunkId = chunkId; + // _bucket = bucket; + // _folder = folder; + // _fileName = fileName; + // _fieldHeaders = fieldHeaders; + // _prefix = prefix; + // _spliter = new StringSplitter(this._fieldHeaders.Count); + // _awsAccessKeyId = awsAccessKeyId; + // _awsSecretAccessKey = awsSecretAccessKey; + // _tmpFolder = tmpFolder; + + // Init(initRow); + // } + + // private void Init(long initRow) + // { + // if (!Directory.Exists($@"{TmpFolder}/raw/")) + // Directory.CreateDirectory($@"{TmpFolder}/raw/"); + + // Close(); + // Dispose(); + + // var config = new AmazonS3Config + // { + // RegionEndpoint = Amazon.RegionEndpoint.USEast1, + // BufferSize = 512 * 1024, + // MaxErrorRetry = 20 + // }; + + // _localFileName = $"{_fileName}{_prefix}_part_00.gz"; + + // Console.WriteLine(_localFileName + " " + initRow); + + // using (var client = new AmazonS3Client(_awsAccessKeyId, _awsSecretAccessKey, config)) + // using (var transferUtility = new TransferUtility(client)) + // { + // transferUtility.Download($"{TmpFolder}/raw/{_localFileName}", _bucket, $"{_folder}/{_chunkId}/{_fileName}/{_fileName}{_prefix}_part_00.gz"); + // Console.WriteLine(_localFileName + " MOVED"); + // } + + // _stream = File.Open($"{TmpFolder}/raw/{_localFileName}", FileMode.Open, FileAccess.Read); + + // _bufferedStream = new BufferedStream(_stream); + // _gzipStream = new GZipStream(_bufferedStream, CompressionMode.Decompress); + // _reader = new StreamReader(_gzipStream, Encoding.Default); + + // _rowIndex = initRow; + // for (var i = 0; i < initRow; i++) + // { + // _reader.ReadLine(); + // } + + // if (initRow > 0) + // Console.WriteLine($"{_fileName}{_prefix}_part_00.gz; Rows skipped={initRow}"); + + // _lastReadTime = DateTime.MinValue; + + // } + + // public void Close() + // { + // _stream?.Close(); + // _bufferedStream?.Close(); + // _gzipStream?.Close(); + // _reader?.Close(); + // } + + // public void Dispose() + // { + // _stream?.Dispose(); + // _bufferedStream?.Dispose(); + // _gzipStream?.Dispose(); + + // _reader?.Dispose(); + + // GC.Collect(); + // GC.WaitForPendingFinalizers(); + + // if (File.Exists($"{TmpFolder}/raw/{_localFileName}")) + // File.Delete($"{TmpFolder}/raw/{_localFileName}"); + // } + + // public void Restart() + // { + // Console.WriteLine($"{_fileName}{_prefix} - restarting... (IdleTime={IdleTime.TotalSeconds})"); + // Close(); + // Dispose(); + // } + + // public bool Read() + // { + // if (Paused) + // return true; + + // var attempt = 0; + // while (true) + // { + // try + // { + // attempt++; + // string line; + + // while ((line = _reader.ReadLine()) != null) + // { + // _lastReadTime = DateTime.Now; + + // if (!string.IsNullOrEmpty(line)) + // { + + // _spliter.Split(line, '\t'); + + // _currentLine = _spliter.Results; + // _rowIndex++; + + // return true; + // } + // } + + // _lastReadTime = DateTime.MinValue; + // return false; + // } + // catch (Exception e) + // { + // _lastReadTime = DateTime.Now; + + // Console.WriteLine($"{_fileName}{_prefix} S3DataReader3 attempt={attempt} | Exception={e.Message}"); + // Init(_rowIndex); + // if (attempt > 5) + // { + // Console.WriteLine("WARN_EXC - Read - throw"); + // Console.WriteLine(e.Message); + // Console.WriteLine(e.StackTrace); + // throw; + // } + // } + // } + // } + + // object IDataRecord.this[int i] + // { + // get + // { + // return _currentLine[i]; + // } + // } + + // object IDataRecord.this[string name] + // { + // get + // { + // return _currentLine[_fieldHeaders[name]]; + // } + // } + + // public int FieldCount { get; private set; } + + // #region NotImplemented + // public string GetName(int i) + // { + // throw new NotImplementedException(); + // } + + // public string GetDataTypeName(int i) + // { + // throw new NotImplementedException(); + // } + + // public Type GetFieldType(int i) + // { + // throw new NotImplementedException(); + // } + + // public object GetValue(int i) + // { + // throw new NotImplementedException(); + // } + + // public int GetValues(object[] values) + // { + // var cnt = 0; + // for (var i = 0; i < FieldCount; i++) + // { + // values[i] = GetValue(i); + // cnt++; + // } + + // return cnt; + // } + + // public int GetOrdinal(string name) + // { + // throw new NotImplementedException(); + // } + + // public bool GetBoolean(int i) + // { + // throw new NotImplementedException(); + // } + + // public byte GetByte(int i) + // { + // throw new NotImplementedException(); + // } + + // public long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length) + // { + // throw new NotImplementedException(); + // } + + // public char GetChar(int i) + // { + // throw new NotImplementedException(); + // } + + // public long GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length) + // { + // throw new NotImplementedException(); + // } + + // public Guid GetGuid(int i) + // { + // throw new NotImplementedException(); + // } + + // public short GetInt16(int i) + // { + // throw new NotImplementedException(); + // } + + // public int GetInt32(int i) + // { + // throw new NotImplementedException(); + // } + + // public long GetInt64(int i) + // { + // throw new NotImplementedException(); + // } + + // public float GetFloat(int i) + // { + // throw new NotImplementedException(); + // } + + // public double GetDouble(int i) + // { + // throw new NotImplementedException(); + // } + + // public string GetString(int i) + // { + // throw new NotImplementedException(); + // } + + // public decimal GetDecimal(int i) + // { + // throw new NotImplementedException(); + // } + + // public DateTime GetDateTime(int i) + // { + // throw new NotImplementedException(); + // } + + // public IDataReader GetData(int i) + // { + // throw new NotImplementedException(); + // } + + // public bool IsDBNull(int i) + // { + // throw new NotImplementedException(); + // } + + // public DataTable GetSchemaTable() + // { + // throw new NotImplementedException(); + // } + + // public bool NextResult() + // { + // throw new NotImplementedException(); + // } + + // public int Depth { get; private set; } + // public bool IsClosed { get; private set; } + // public int RecordsAffected { get; private set; } + + + + // #endregion + //} +} diff --git a/sources/Presentation/org.ohdsi.cdm.presentation.lambdabuilder/S3DataReader3.cs b/sources/Presentation/org.ohdsi.cdm.presentation.lambdabuilder/S3DataReaderZstd.cs similarity index 94% rename from sources/Presentation/org.ohdsi.cdm.presentation.lambdabuilder/S3DataReader3.cs rename to sources/Presentation/org.ohdsi.cdm.presentation.lambdabuilder/S3DataReaderZstd.cs index 4a1864a7..dca198ca 100644 --- a/sources/Presentation/org.ohdsi.cdm.presentation.lambdabuilder/S3DataReader3.cs +++ b/sources/Presentation/org.ohdsi.cdm.presentation.lambdabuilder/S3DataReaderZstd.cs @@ -5,12 +5,12 @@ using System.Collections.Generic; using System.Data; using System.IO; -using System.IO.Compression; using System.Text; +using ZstdSharp; namespace org.ohdsi.cdm.presentation.lambdabuilder { - public class S3DataReader3 : IDataReader + public class S3DataReaderZstd : IDataReader { private string[] _currentLine; @@ -25,7 +25,7 @@ public class S3DataReader3 : IDataReader private FileStream _stream; private BufferedStream _bufferedStream; - private GZipStream _gzipStream; + private DecompressionStream _zstdStream; private StreamReader _reader; private readonly string _tmpFolder; private long _rowIndex; @@ -73,7 +73,7 @@ public void Resume() Paused = false; } - public S3DataReader3(string bucket, string folder, string awsAccessKeyId, + public S3DataReaderZstd(string bucket, string folder, string awsAccessKeyId, string awsSecretAccessKey, int chunkId, string fileName, Dictionary fieldHeaders, string prefix, long initRow, string tmpFolder) { _chunkId = chunkId; @@ -105,22 +105,22 @@ private void Init(long initRow) MaxErrorRetry = 20 }; - _localFileName = $"{_fileName}{_prefix}_part_00.gz"; + _localFileName = $"{_fileName}{_prefix}_part_00.zst"; Console.WriteLine(_localFileName + " " + initRow); using (var client = new AmazonS3Client(_awsAccessKeyId, _awsSecretAccessKey, config)) using (var transferUtility = new TransferUtility(client)) { - transferUtility.Download($"{TmpFolder}/raw/{_localFileName}", _bucket, $"{_folder}/{_chunkId}/{_fileName}/{_fileName}{_prefix}_part_00.gz"); + transferUtility.Download($"{TmpFolder}/raw/{_localFileName}", _bucket, $"{_folder}/{_chunkId}/{_fileName}/{_fileName}{_prefix}_part_00.zst"); Console.WriteLine(_localFileName + " MOVED"); } _stream = File.Open($"{TmpFolder}/raw/{_localFileName}", FileMode.Open, FileAccess.Read); _bufferedStream = new BufferedStream(_stream); - _gzipStream = new GZipStream(_bufferedStream, CompressionMode.Decompress); - _reader = new StreamReader(_gzipStream, Encoding.Default); + _zstdStream = new DecompressionStream(_bufferedStream); + _reader = new StreamReader(_zstdStream, Encoding.Default); _rowIndex = initRow; for (var i = 0; i < initRow; i++) @@ -129,7 +129,7 @@ private void Init(long initRow) } if (initRow > 0) - Console.WriteLine($"{_fileName}{_prefix}_part_00.gz; Rows skipped={initRow}"); + Console.WriteLine($"{_fileName}{_prefix}_part_00.zst; Rows skipped={initRow}"); _lastReadTime = DateTime.MinValue; @@ -139,7 +139,7 @@ public void Close() { _stream?.Close(); _bufferedStream?.Close(); - _gzipStream?.Close(); + _zstdStream?.Close(); _reader?.Close(); } @@ -147,7 +147,7 @@ public void Dispose() { _stream?.Dispose(); _bufferedStream?.Dispose(); - _gzipStream?.Dispose(); + _zstdStream?.Dispose(); _reader?.Dispose(); diff --git a/sources/Presentation/org.ohdsi.cdm.presentation.lambdabuilder/Vocabulary.cs b/sources/Presentation/org.ohdsi.cdm.presentation.lambdabuilder/Vocabulary.cs index 83866157..2bfe6c7f 100644 --- a/sources/Presentation/org.ohdsi.cdm.presentation.lambdabuilder/Vocabulary.cs +++ b/sources/Presentation/org.ohdsi.cdm.presentation.lambdabuilder/Vocabulary.cs @@ -195,6 +195,8 @@ public void Fill(bool forLookup, bool readFromS3) _conceptIdToSourceVocabularyId.Add(long.Parse(spliter.Results[0]), new Tuple(spliter.Results[1], spliter.Results[2])); } } + + _conceptIdToSourceVocabularyId.TrimExcess(); } } diff --git a/sources/Presentation/org.ohdsi.cdm.presentation.lambdabuilder/org.ohdsi.cdm.presentation.lambdabuilder.csproj b/sources/Presentation/org.ohdsi.cdm.presentation.lambdabuilder/org.ohdsi.cdm.presentation.lambdabuilder.csproj index 9e15a641..319c5b26 100644 --- a/sources/Presentation/org.ohdsi.cdm.presentation.lambdabuilder/org.ohdsi.cdm.presentation.lambdabuilder.csproj +++ b/sources/Presentation/org.ohdsi.cdm.presentation.lambdabuilder/org.ohdsi.cdm.presentation.lambdabuilder.csproj @@ -34,6 +34,7 @@ +