Skip to content

Commit

Permalink
Merge pull request #120 from OHDSI/null-empty-string
Browse files Browse the repository at this point in the history
Fix handling of null vs. empty strings; change compression from gzip to zstd
  • Loading branch information
bradanton authored Sep 13, 2024
2 parents b3d5e9f + 59acd66 commit 8deac82
Show file tree
Hide file tree
Showing 9 changed files with 432 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ public static class DataRecordExtensions
{
/// <summary>
/// Builds the message text used when one of the DataRecordExtensions accessors fails.
/// The result has the form "DataRecordExtensions.{methodName} | {fieldName}={value}",
/// where the value is looked up on <paramref name="reader"/> by <paramref name="fieldName"/>.
/// </summary>
/// <param name="reader">Record the failing accessor was reading from.</param>
/// <param name="fieldName">Name of the field that was being read.</param>
/// <param name="methodName">Display name of the failing accessor, e.g. "GetString()".</param>
/// <returns>The formatted message string.</returns>
private static string GetErrorMeassge(IDataRecord reader, string fieldName, string methodName)
{
// NOTE(review): two consecutive return statements. This looks like the removed/added pair of a
// diff (GetValue -> GetValueString); as written only the first one can execute and the second
// is unreachable. Confirm which lookup helper is intended and drop the other line.
return $"DataRecordExtensions.{methodName} | {fieldName}={GetValue(reader, fieldName)}";
return $"DataRecordExtensions.{methodName} | {fieldName}={GetValueString(reader, fieldName)}";
}

public static string GetString(this IDataRecord reader, string fieldName)
Expand All @@ -16,53 +16,32 @@ public static string GetString(this IDataRecord reader, string fieldName)
if (string.IsNullOrEmpty(fieldName))
return null;

var value = GetValue(reader, fieldName);
var value = GetValueString(reader, fieldName);

if (value is null || value is DBNull)
if (value is null)
return null;

var strValue = value.ToString();

return string.Intern(strValue.Trim());
//return value;
return string.Intern(value);
}
catch (Exception e)
{
throw new Exception(GetErrorMeassge(reader, fieldName, "GetString()"), e);
}
}

/// <summary>
/// Reads the named field as text (through the string-keyed GetString extension), tries to parse
/// it with <see cref="DateTime.TryParse(string, out DateTime)"/>, and returns the time-of-day part.
/// </summary>
/// <param name="reader">Record to read from.</param>
/// <param name="fieldName">Name of the field holding the date/time text.</param>
/// <returns>
/// The parsed value's <see cref="DateTime.TimeOfDay"/>, or null when
/// <paramref name="fieldName"/> is null/empty or the text cannot be parsed.
/// </returns>
/// <exception cref="Exception">
/// Wraps any exception raised while reading or parsing; the message comes from GetErrorMeassge
/// and the original exception is kept as the inner exception.
/// </exception>
public static TimeSpan? GetTime(this IDataRecord reader, string fieldName)
{
    try
    {
        // No field name means there is nothing to look up.
        if (string.IsNullOrEmpty(fieldName))
        {
            return null;
        }

        var rawText = reader.GetString(fieldName);

        // Only the time component is of interest; an unparseable value yields null.
        return DateTime.TryParse(rawText, out var parsed)
            ? parsed.TimeOfDay
            : (TimeSpan?)null;
    }
    catch (Exception inner)
    {
        throw new Exception(GetErrorMeassge(reader, fieldName, "GetTime()"), inner);
    }
}

public static int? GetIntSafe(this IDataRecord reader, string fieldName)
{
try
{
if (string.IsNullOrEmpty(fieldName))
return null;

var value = GetValue(reader, fieldName);
if (value is DBNull || string.IsNullOrEmpty(value.ToString()))
var value = GetValueString(reader, fieldName);
if (value is null)
return null;

if (int.TryParse(value.ToString(), out var result))
if (int.TryParse(value, out var result))
return result;

return null;
Expand All @@ -80,8 +59,9 @@ public static string GetString(this IDataRecord reader, string fieldName)
if (string.IsNullOrEmpty(fieldName))
return null;

var value = GetValue(reader, fieldName);
if (value is DBNull || string.IsNullOrEmpty(value.ToString()))
var value = GetValueString(reader, fieldName);

if (string.IsNullOrEmpty(value))
return null;

return Convert.ToInt32(value);
Expand All @@ -99,11 +79,11 @@ public static string GetString(this IDataRecord reader, string fieldName)
if (string.IsNullOrEmpty(fieldName))
return null;

var value = GetValue(reader, fieldName);
if (value is DBNull || string.IsNullOrEmpty(value.ToString()))
var value = GetValueString(reader, fieldName);
if (value is null)
return null;

int.TryParse(value.ToString(), out var res);
int.TryParse(value, out var res);

return res;
}
Expand All @@ -120,12 +100,12 @@ public static string GetString(this IDataRecord reader, string fieldName)
if (string.IsNullOrEmpty(fieldName))
return null;

var value = GetValue(reader, fieldName);
var value = GetValueString(reader, fieldName);

if (value is DBNull || string.IsNullOrEmpty(value.ToString()))
if (value is null)
return null;

decimal.TryParse(value.ToString(), out var res);
decimal.TryParse(value, out var res);

return res;
}
Expand All @@ -142,11 +122,11 @@ public static DateTime GetDateTime(this IDataRecord reader, string fieldName)
if (string.IsNullOrEmpty(fieldName))
return DateTime.MinValue;

var result = GetValue(reader, fieldName) as DateTime?;
var result = reader[fieldName] as DateTime?;

if (!result.HasValue)
{
var dateTimeString = GetValue(reader, fieldName).ToString();
var dateTimeString = GetValueString(reader, fieldName);

if (!string.IsNullOrEmpty(dateTimeString) && DateTime.TryParse(dateTimeString, out var dateTime))
{
Expand All @@ -173,9 +153,9 @@ public static DateTime GetDateTime(this IDataRecord reader, string fieldName)
if (string.IsNullOrEmpty(fieldName))
return null;

var value = GetValue(reader, fieldName);
var value = GetValueString(reader, fieldName);

if (value is DBNull || string.IsNullOrEmpty(value.ToString()))
if (string.IsNullOrEmpty(value))
return null;

return Convert.ToInt64(value);
Expand All @@ -186,11 +166,20 @@ public static DateTime GetDateTime(this IDataRecord reader, string fieldName)
}
}

private static object GetValue(IDataRecord reader, string fieldName)
private static string GetValueString(IDataRecord reader, string fieldName)
{
try
{
return reader[fieldName];
var value = reader[fieldName];

if (value is DBNull || value is null)
return null;

var valStr = value.ToString().Trim();
if (valStr == "\\N")
return null;

return valStr;
}
catch (Exception e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,15 @@ public void Fill(AmazonS3Client client, string bucket, string prefix)
{
foreach (var v2 in v1.Values)
{
v2.SourceConcepts?.TrimExcess();
v2.Ingredients?.TrimExcess();
}

v1.TrimExcess();
}

_lookup.TrimExcess();

GC.Collect();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public PregnancyConcepts(string folder)
GestValue = gestValue
});
}

_dictionary.TrimExcess();
}

public IEnumerable<PregnancyConcept> GetConcepts(long conceptId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,9 @@ private void MoveChunkRawData(int chunkId, string baseFolder)
var unloadQuery = string.Format(@"create table {0} sortkey ({1}) distkey ({1}) as {2}; " +
@"UNLOAD ('select * from {0} order by {1}') to 's3://{3}' " +
@"DELIMITER AS '\t' " +
@"NULL AS '\\N' " +
@"credentials 'aws_access_key_id={4};aws_secret_access_key={5}' " +
@"GZIP ALLOWOVERWRITE PARALLEL ON",
@"ZSTD ALLOWOVERWRITE PARALLEL ON",
tmpTableName, //0
personIdField, //1
sql, //2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class LambdaChunkPart : IDisposable
private long? _lastSavedPersonId;

private readonly System.Timers.Timer _watchdog;
private readonly Dictionary<string, S3DataReader3> _readers = [];
private readonly Dictionary<string, S3DataReaderZstd> _readers = [];
private Dictionary<string, long> _restorePoint = [];
private readonly string _tmpFolder;
private bool _readRestarted;
Expand All @@ -54,7 +54,7 @@ public LambdaChunkPart(int chunkId, Func<IPersonBuilder> createPersonBuilder, st

private void Watchdog_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
{
if (!string.IsNullOrEmpty(_currentReaderName) && _readers != null && _readers.Count > 0 && _readers.TryGetValue(_currentReaderName, out S3DataReader3 value))
if (!string.IsNullOrEmpty(_currentReaderName) && _readers != null && _readers.Count > 0 && _readers.TryGetValue(_currentReaderName, out S3DataReaderZstd value))
{
if (value.IdleTime != TimeSpan.Zero &&
value.IdleTime.TotalSeconds > 10)
Expand Down Expand Up @@ -105,7 +105,7 @@ private void Watchdog_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
if (_restorePoint.TryGetValue(qd.FileName, out long valueFileName))
initRow = valueFileName;

_readers.Add(qd.FileName, new S3DataReader3(Settings.Current.Bucket, folder,
_readers.Add(qd.FileName, new S3DataReaderZstd(Settings.Current.Bucket, folder,
Settings.Current.S3AwsAccessKeyId,
Settings.Current.S3AwsSecretAccessKey, _chunkId, qd.FileName, qd.FieldHeaders, _prefix,
initRow, _tmpFolder));
Expand Down Expand Up @@ -251,7 +251,7 @@ private void Watchdog_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
if (_restorePoint.ContainsKey(qd.FileName))
initRow = _restorePoint[qd.FileName];

_readers.Add(qd.FileName, new S3DataReader3(Settings.Current.Bucket, folder,
_readers.Add(qd.FileName, new S3DataReaderZstd(Settings.Current.Bucket, folder,
Settings.Current.S3AwsAccessKeyId,
Settings.Current.S3AwsSecretAccessKey, _chunkId, qd.FileName, qd.FieldHeaders, _prefix,
initRow, _tmpFolder));
Expand Down Expand Up @@ -593,17 +593,17 @@ private void AddEntity(QueryDefinition queryDefinition, IEnumerable<EntityDefini
else
{
if (personIdsToSave.Contains(personId.Value))
((S3DataReader3)reader).Resume();
((S3DataReaderZstd)reader).Resume();
else
{
((S3DataReader3)reader).Pause();
((S3DataReaderZstd)reader).Pause();
return;
}
}


queryDefinition.ProcessedPersonIds.TryAdd(personId.Value, 0);
queryDefinition.ProcessedPersonIds[personId.Value] = ((S3DataReader3)reader).RowIndex;
queryDefinition.ProcessedPersonIds[personId.Value] = ((S3DataReaderZstd)reader).RowIndex;

if (_readRestarted)
{
Expand Down
Loading

0 comments on commit 8deac82

Please sign in to comment.