From ca69e10ebbc08671fd327acefd4c729da95e1edd Mon Sep 17 00:00:00 2001 From: "J. Ritchie Carroll" Date: Mon, 23 Oct 2023 02:03:29 -0500 Subject: [PATCH] Added scheduled downsampling intervals and improved existing archive curtailment operations. --- .../LocalOutputAdapter.cs | 296 +++++++++++++++++- .../Properties/AssemblyInfo.cs | 1 + .../Snap/Services/ArchiveDetails.cs | 13 +- .../Snap/Storage/FileFlags.cs | 95 ++++-- Source/openHistorian.sln.DotSettings | 4 + 5 files changed, 373 insertions(+), 36 deletions(-) diff --git a/Source/Libraries/Adapters/openHistorian.Adapters/LocalOutputAdapter.cs b/Source/Libraries/Adapters/openHistorian.Adapters/LocalOutputAdapter.cs index 38caca99f6..2d46510540 100755 --- a/Source/Libraries/Adapters/openHistorian.Adapters/LocalOutputAdapter.cs +++ b/Source/Libraries/Adapters/openHistorian.Adapters/LocalOutputAdapter.cs @@ -43,13 +43,17 @@ using GSF.Historian.Replication; using GSF.IO; using GSF.IO.Unmanaged; +using GSF.Snap.Filters; using GSF.Snap.Services; +using GSF.Snap.Services.Reader; +using GSF.Snap.Storage; using GSF.TimeSeries; using GSF.TimeSeries.Adapters; using GSF.Units; using openHistorian.Model; using openHistorian.Net; using openHistorian.Snap; +using openHistorian.Snap.Definitions; using DeviceGroup = openHistorian.Model.DeviceGroup; using Measurement = openHistorian.Model.Measurement; using Timer = System.Timers.Timer; @@ -98,6 +102,11 @@ public class LocalOutputAdapter : OutputAdapterBase /// public const int DefaultMaximumArchiveDays = 0; + /// + /// Defines the default value for . + /// + public const string DefaultDownsamplingIntervals = ""; + /// /// Defines the default value for . /// @@ -152,6 +161,7 @@ public class LocalOutputAdapter : OutputAdapterBase private Dictionary m_measurements; private Dictionary> m_compressionSettings; private readonly Dictionary> m_swingingDoorStates; + private SortedList m_downsamplingIntervals; private Timer m_archiveCurtailmentTimer; private SafeFileWatcher[] m_attachedPathWatchers; private bool m_disposed; @@ -168,6 +178,7 @@ public LocalOutputAdapter() m_key = new HistorianKey(); m_value = new HistorianValue(); m_swingingDoorStates = new Dictionary>(); + m_downsamplingIntervals = new SortedList(); } #endregion @@ -368,6 +379,51 @@ public double DesiredRemainingSpace [DefaultValue(DefaultMaximumArchiveDays)] public int MaximumArchiveDays { get; set; } = DefaultMaximumArchiveDays; + /// + /// Gets or sets the scheduled downsampling intervals for this archive. + /// + [ConnectionStringParameter] + [Description("Define the scheduled downsampling intervals for this archive. Format: any number of downsampling-start-days=samples-per-second separated by semi-colons, e.g., 180=10; 365=1; 650=0.1")] + [DefaultValue(DefaultDownsamplingIntervals)] + public string DownsamplingIntervals + { + get + { + if (m_downsamplingIntervals.Count == 0) + return string.Empty; + + StringBuilder downsampling = new(); + + foreach (KeyValuePair interval in m_downsamplingIntervals) + { + if (downsampling.Length > 0) + downsampling.Append("; "); + + downsampling.Append($"{interval.Key}={interval.Value}"); + } + + return downsampling.ToString(); + } + set + { + m_downsamplingIntervals.Clear(); + + if (string.IsNullOrWhiteSpace(value)) + return; + + foreach (string interval in value.Split(';')) + { + string[] parts = interval.Split('='); + + if (parts.Length != 2) + continue; + + if (int.TryParse(parts[0].Trim(), out int startDay) && startDay > 0 && double.TryParse(parts[1].Trim(), out double samplesPerSecond) && samplesPerSecond > 0.0D) + m_downsamplingIntervals[startDay] = samplesPerSecond; + } + } + } + /// /// Gets or sets the flag that determines if oldest archive files should be removed before running out of archive space. /// @@ -501,6 +557,7 @@ public override string Status status.AppendLine($" Staging count: {m_archiveInfo.StagingCount:N0}"); status.AppendLine($" Memory pool size: {Globals.MemoryPool.MaximumPoolSize / SI2.Giga:N4}GB"); status.AppendLine($" Maximum archive days: {(MaximumArchiveDays < 1 ? "No limit" : MaximumArchiveDays.ToString("N0"))}"); + status.AppendLine($" Downsampling intervals: {(string.IsNullOrWhiteSpace(DownsamplingIntervals) ? "None defined" : DownsamplingIntervals)}"); status.AppendLine($" Auto-remove old archives: {AutoRemoveOldestFilesBeforeFull}"); status.AppendLine($" Time reasonability check: {(EnableTimeReasonabilityCheck ? "Enabled" : "Not Enabled")}"); status.AppendLine($" Archive curtailment timer: {Time.ToElapsedTimeString(ArchiveCurtailmentInterval, 0)}"); @@ -654,6 +711,18 @@ public override void Initialize() if (settings.TryGetValue(nameof(AutoRemoveOldestFilesBeforeFull), out setting)) AutoRemoveOldestFilesBeforeFull = setting.ParseBoolean(); + if (MaximumArchiveDays < 1 && !AutoRemoveOldestFilesBeforeFull) + OnStatusMessage(MessageLevel.Warning, "Maximum archive days not set and automated removal of oldest files before full disk is not enabled: system will not initiate archive file curtailment operations in this configuration. Disk space for target archive paths should be monitored externally."); + + if (settings.TryGetValue(nameof(DownsamplingIntervals), out setting)) + DownsamplingIntervals = setting; + + if (MaximumArchiveDays > 0 && m_downsamplingIntervals.Any(kvp => kvp.Key >= MaximumArchiveDays)) + { + OnStatusMessage(MessageLevel.Warning, $"Downsampling intervals defined for days greater than or equal to maximum archive days ({MaximumArchiveDays:N0}) will be ignored."); + m_downsamplingIntervals = new SortedList(m_downsamplingIntervals.Where(kvp => kvp.Key < MaximumArchiveDays).ToDictionary(kvp => kvp.Key, kvp => kvp.Value)); + } + EnableTimeReasonabilityCheck = settings.TryGetValue(nameof(EnableTimeReasonabilityCheck), out setting) && setting.ParseBoolean(); if (settings.TryGetValue(nameof(PastTimeReasonabilityLimit), out setting) && double.TryParse(setting, out double value)) @@ -683,6 +752,11 @@ public override void Initialize() if (!settings.TryGetValue("CacheFlushInterval", out setting) || !int.TryParse(setting, out int cacheFlushInterval)) cacheFlushInterval = 100; + // Validate maximum downsampling intervals. Currently only 10 stages are defined, 0 to 9 - since configured + // staging count is typically 3, this allows 6 total downsampling intervals (as extra stages): + if (m_downsamplingIntervals.Count > 9 - stagingCount) + throw new InvalidOperationException($"Maximum of {9 - stagingCount} downsampling intervals are allowed."); + // Establish archive information for this historian instance m_archiveInfo = new HistorianServerDatabaseConfig(InstanceName, WorkingDirectory, true); @@ -787,32 +861,230 @@ public void DeleteFolder(string folderName) } /// - /// Initiates archive file curtailment based on defined maximum archive days. + /// Initiates archive file curtailment based on defined maximum archive days, auto removal of oldest files before full disk and defined downsampling intervals. /// - [AdapterCommand("Initiates archive file curtailment based on defined maximum archive days.", "Administrator", "Editor")] + [AdapterCommand("Initiates archive file curtailment based on defined maximum archive days, auto removal of oldest files before full disk and defined downsampling intervals.", "Administrator", "Editor")] public void CurtailArchiveFiles() { - if (MaximumArchiveDays < 1) + if (AutoRemoveOldestFilesBeforeFull) + RemoveOldestFilesBeforeFull(); + + if (m_downsamplingIntervals.Count > 0) + DownsampleArchiveFiles(); + + if (MaximumArchiveDays > 0) + RemoveFilesOlderThanMaxArchiveDays(); + } + + private void RemoveOldestFilesBeforeFull() + { + // Set target space to be three times the target file size plus minimum disk space before considered full, + // this will allow three new final stage archive files to be written before next curtailment interval. + // Curtailment interval, desired remaining space and target file size can all be adjusted to better + // accommodate high volume data archiving in very low disk space environments. + long neededSpace = m_archiveInfo.DesiredRemainingSpace + 3 * m_archiveInfo.TargetFileSize; + + try + { + // Check if any target archive destination has enough disk space + foreach (string path in m_archiveDirectories) + { + FilePath.GetAvailableFreeSpace(path, out long freeSpace, out _); + + // If any path has needed space, then we are done + if (freeSpace > neededSpace) + return; + } + } + catch (Exception ex) + { + OnProcessException(MessageLevel.Warning, new InvalidOperationException($"Oldest file removal operation cancelled: failed during check for full disk: {ex.Message}", ex)); + + // Do not continue with archive curtailment if disk space check failed + return; + } + + try + { + OnStatusMessage(MessageLevel.Warning, "Disk space is near full, scanning for oldest archive files..."); + + ClientDatabaseBase database = GetClientDatabase(); + + long fileSizeSum = 0; + + // Find oldest archive files until we have reached target disk space. End time is preferred over start + // time for sorting since devices with an inaccurate GPS clock can provide bad start times when out + // of range timestamps are not configured to be filtered and you don't want to accidentally delete a + // file with otherwise in-range data. + ArchiveDetails[] filesToDelete = database.GetAllAttachedFiles().OrderBy(file => file.EndTime).TakeWhile(item => + { + fileSizeSum += item.FileSize; + return fileSizeSum < neededSpace; + }).ToArray(); + + database.DeleteFiles(filesToDelete.Select(file => file.Id).ToList()); + + OnStatusMessage(MessageLevel.Warning, $"Deleted the following oldest archive files in order to free disk space:\r\n {filesToDelete.Select(file => FilePath.TrimFileName(file.FileName, 75)).ToDelimitedString($"{Environment.NewLine} ")}"); + } + catch (Exception ex) + { + OnProcessException(MessageLevel.Error, new InvalidOperationException($"Failed while attempting to delete oldest archive files in order to free disk space: {ex.Message}", ex)); + } + } + + private void DownsampleArchiveFiles() + { + ClientDatabaseBase database = GetClientDatabase(); + SortedList filesToDownsample = new(); + + try + { + OnStatusMessage(MessageLevel.Info, "Scanning archive files for downsampling..."); + + List attachedFiles = database.GetAllAttachedFiles(); + + // Start with oldest files first, this way they can be excluded from further downsampling if they are already targeted + int[] startDays = m_downsamplingIntervals.Keys.OrderByDescending(value => value).ToArray(); + HashSet targetedArchives = new(); + + int downsamplingStage = m_archiveInfo.StagingCount + startDays.Length; + + foreach (int startDay in startDays) + { + // Get list of files that have both a start time and an end time that are greater than the target + // start day for a given downsampling interval. We check both start and end times since devices + // with an inaccurate GPS clock can provide bad time when out of range timestamps are not configured + // to be filtered and you don't want to accidentally delete a file with otherwise in-range data. + // We also filter on stage number to avoid downsampling files that have already been downsampled. + HashSet matchingFiles = new(attachedFiles.Where(file => + (DateTime.UtcNow - file.StartTime).TotalDays > startDay && + (DateTime.UtcNow - file.EndTime).TotalDays > startDay && + FileFlags.GetStageNumber(file.Flags) < downsamplingStage && + !file.Flags.Contains(FileFlags.IntermediateFile))); + + // Start days are ordered from highest to lowest, so we decrement the downsampling stage + downsamplingStage--; + + if (matchingFiles.Count == 0) + continue; + + matchingFiles.ExceptWith(targetedArchives); + + if (matchingFiles.Count == 0) + continue; + + filesToDownsample[startDay] = matchingFiles.ToArray(); + targetedArchives.UnionWith(matchingFiles); + } + } + catch (Exception ex) { - OnStatusMessage(MessageLevel.Info, "Maximum archive days not set, cannot initiate archive file curtailment."); + OnProcessException(MessageLevel.Warning, new InvalidOperationException($"Failed while scanning archive files for downsampling: {ex.Message}", ex)); + } + + if (filesToDownsample.Count == 0) return; + + try + { + int downsamplingStage = 1; + + foreach (KeyValuePair kvp in filesToDownsample) + { + int startDay = kvp.Key; + ArchiveDetails[] files = kvp.Value; + double samplesPerSecond = m_downsamplingIntervals[startDay]; + + // Each successive downsampled file will be flagged with a higher stage count + Guid stageFlags = FileFlags.GetStage(m_archiveInfo.StagingCount + downsamplingStage++); + + OnStatusMessage(MessageLevel.Info, $"Downsampling {files.Length:N0} archive files older than {startDay:N0} days to {samplesPerSecond} samples per second..."); + + foreach (ArchiveDetails file in files) + { + string downsampledFile = DownsampleArchiveFile(file, samplesPerSecond, stageFlags); + + if (string.IsNullOrWhiteSpace(downsampledFile)) + continue; + + try + { + // Remove higher resolution file after successful downsampling + database.DeleteFiles(new List(new[] { file.Id })); + + // Attach downsampled resolution file to active historian instance + database.AttachFilesOrPaths(new [] { downsampledFile }); + + OnStatusMessage(MessageLevel.Info, $"Downsampled archive file \"{FilePath.GetFileName(file.FileName)}\" to {samplesPerSecond} samples per second."); + } + catch (Exception ex) + { + OnProcessException(MessageLevel.Warning, new InvalidOperationException($"Failed while attempting to transition to downsampled archive file \"{FilePath.GetFileName(downsampledFile)}\": {ex.Message}", ex)); + } + } + } } + catch (Exception ex) + { + OnProcessException(MessageLevel.Warning, new InvalidOperationException($"Failed downsampling archive files: {ex.Message}", ex)); + } + } + private string DownsampleArchiveFile(ArchiveDetails file, double samplesPerSecond, Guid stageFlags) + { try { - OnStatusMessage(MessageLevel.Info, "Attempting to curtail archive files based on defined maximum archive days..."); + DateTime startTime = file.StartTime; + DateTime endTime = file.EndTime; + TimeSpan interval = new((long)(TimeSpan.TicksPerSecond * samplesPerSecond)); + + using ArchiveList archiveList = new(); + archiveList.LoadFiles(new[] { file.FileName }); + + using SequentialReaderStream reader = new(archiveList, SortedTreeEngineReaderOptions.Default, TimestampSeekFilter.CreateFromIntervalData(startTime, endTime, interval, new TimeSpan(TimeSpan.TicksPerMillisecond))); + + string sourcePath = FilePath.GetDirectoryName(file.FileName); + string completeFileName = Path.Combine(sourcePath, $"{FilePath.GetFileNameWithoutExtension(file.FileName)}-{samplesPerSecond}sps.d2"); + string pendingFileName = Path.Combine(sourcePath, $"{FilePath.GetFileNameWithoutExtension(completeFileName)}.~d2i"); + + SortedTreeFileSimpleWriter.Create(pendingFileName, completeFileName, 4096, null, HistorianFileEncodingDefinition.TypeGuid, reader, stageFlags); + + return completeFileName; + } + catch (Exception ex) + { + OnProcessException(MessageLevel.Warning, new InvalidOperationException($"Failed while attempting to downsample archive file \"{FilePath.GetFileName(file.FileName)}\" to {samplesPerSecond} samples per second: {ex.Message}", ex)); + } + + return null; + } + + private void RemoveFilesOlderThanMaxArchiveDays() + { + try + { + OnStatusMessage(MessageLevel.Info, $"Scanning for archive files older than {MaximumArchiveDays:N0} days..."); ClientDatabaseBase database = GetClientDatabase(); - // Get list of files that have both a start time and an end time that are greater than the maximum archive days. We check both start and end times - // since PMUs can provide bad time (not currently being filtered) and you don't want to accidentally delete a file with otherwise in-range data. - ArchiveDetails[] filesToDelete = database.GetAllAttachedFiles().Where(file => (DateTime.UtcNow - file.StartTime).TotalDays > MaximumArchiveDays && (DateTime.UtcNow - file.EndTime).TotalDays > MaximumArchiveDays).ToArray(); + // Get list of files that have both a start time and an end time that are greater than the maximum + // archive days. We check both start and end times since devices with an inaccurate GPS clock can + // provide bad time when out of range timestamps are not configured to be filtered and you don't + // want to accidentally delete a file with otherwise in-range data. + ArchiveDetails[] filesToDelete = database.GetAllAttachedFiles().Where(file => + (DateTime.UtcNow - file.StartTime).TotalDays > MaximumArchiveDays && + (DateTime.UtcNow - file.EndTime).TotalDays > MaximumArchiveDays).ToArray(); + + if (filesToDelete.Length <= 0) + return; + database.DeleteFiles(filesToDelete.Select(file => file.Id).ToList()); - OnStatusMessage(MessageLevel.Info, $"Deleted the following old archive files:\r\n {filesToDelete.Select(file => FilePath.TrimFileName(file.FileName, 75)).ToDelimitedString(Environment.NewLine + " ")}"); + + OnStatusMessage(MessageLevel.Info, $"Deleted the following archive files that were older than {MaximumArchiveDays:N0} days:\r\n {filesToDelete.Select(file => FilePath.TrimFileName(file.FileName, 75)).ToDelimitedString($"{Environment.NewLine} ")}"); } catch (Exception ex) { - OnProcessException(MessageLevel.Warning, new InvalidOperationException($"Failed to limit maximum archive size: {ex.Message}", ex)); + OnProcessException(MessageLevel.Warning, new InvalidOperationException($"Failed while attempting to delete archive files older than {MaximumArchiveDays:N0} days: {ex.Message}", ex)); } } @@ -1288,9 +1560,9 @@ private static void OptimizeLocalHistorianSettings(AdoDataConnection connection, // TODO: Remove this code when device IDs have been updated to use GUIDs: // Make sure gateway protocol data publishers filter out device groups from metadata since groups reference local device IDs const string DefaultDeviceFilter = "FROM DeviceDetail WHERE IsConcentrator = 0;"; - const string GSFDataPublisherTypeName = nameof(GSF) + "." + nameof(GSF.TimeSeries) + "." + nameof(GSF.TimeSeries.Transport) + "." + nameof(GSF.TimeSeries.Transport.DataPublisher); + const string GSFDataPublisherTypeName = $"{nameof(GSF)}.{nameof(GSF.TimeSeries)}.{nameof(GSF.TimeSeries.Transport)}.{nameof(GSF.TimeSeries.Transport.DataPublisher)}"; const string GSFMetadataTables = nameof(GSFDataPublisher.MetadataTables); - const string STTPDataPublisherTypeName = nameof(sttp) + "." + nameof(sttp.DataPublisher); + const string STTPDataPublisherTypeName = $"{nameof(sttp)}.{nameof(sttp.DataPublisher)}"; const string STTPMetadataTables = nameof(STTPDataPublisher.MetadataTables); int virtualProtocolID = s_virtualProtocolID != 0 ? s_virtualProtocolID : s_virtualProtocolID = connection.ExecuteScalar("SELECT ID FROM Protocol WHERE Acronym='VirtualInput'"); string deviceGroupFilter = $"FROM DeviceDetail WHERE IsConcentrator = 0 AND NOT (ProtocolID = {virtualProtocolID} AND AccessID = {DeviceGroup.DefaultAccessID});"; diff --git a/Source/Libraries/GSF.SortedTreeStore/Properties/AssemblyInfo.cs b/Source/Libraries/GSF.SortedTreeStore/Properties/AssemblyInfo.cs index 870b493230..d66b45f164 100755 --- a/Source/Libraries/GSF.SortedTreeStore/Properties/AssemblyInfo.cs +++ b/Source/Libraries/GSF.SortedTreeStore/Properties/AssemblyInfo.cs @@ -42,5 +42,6 @@ [assembly: AssemblyVersion("2.8.343.0")] [assembly: AssemblyFileVersion("2.8.343.0")] [assembly: InternalsVisibleTo("GSF.SortedTreeStore.Test")] +[assembly: InternalsVisibleTo("openHistorian.Adapters")] [assembly: InternalsVisibleTo("openHistorian.PerformanceTests")] [assembly: InternalsVisibleTo("ArchiveIntegrityChecker")] \ No newline at end of file diff --git a/Source/Libraries/GSF.SortedTreeStore/Snap/Services/ArchiveDetails.cs b/Source/Libraries/GSF.SortedTreeStore/Snap/Services/ArchiveDetails.cs index 53c9898710..b4a2ca64af 100755 --- a/Source/Libraries/GSF.SortedTreeStore/Snap/Services/ArchiveDetails.cs +++ b/Source/Libraries/GSF.SortedTreeStore/Snap/Services/ArchiveDetails.cs @@ -22,6 +22,7 @@ //****************************************************************************************************** using System; +using System.Linq; namespace GSF.Snap.Services { @@ -108,6 +109,15 @@ public DateTime EndTime private set; } + /// + /// Gets the flags for the archive file. + /// + public Guid[] Flags + { + get; + private set; + } + private ArchiveDetails() { } @@ -126,7 +136,8 @@ public static ArchiveDetails Create(ArchiveTableSummary public static Guid GetStage(int stageNumber) { - switch (stageNumber) + return stageNumber switch { - case 0: - return Stage0; - case 1: - return Stage1; - case 2: - return Stage2; - case 3: - return Stage3; - case 4: - return Stage4; - case 5: - return Stage5; - case 6: - return Stage6; - case 7: - return Stage7; - case 8: - return Stage8; - case 9: - return Stage9; + 0 => Stage0, + 1 => Stage1, + 2 => Stage2, + 3 => Stage3, + 4 => Stage4, + 5 => Stage5, + 6 => Stage6, + 7 => Stage7, + 8 => Stage8, + 9 => Stage9, + _ => throw new ArgumentOutOfRangeException(nameof(stageNumber), "Must be between 0 and 9") + }; + } + + /// + /// Gets the number of the supplied stage . + /// + /// Stage flag. + /// Number of specified stage ; otherwise, -1 if flag is not a stage flag. + public static int GetStageNumber(Guid flag) + { + if (flag == Stage0) + return 0; + if (flag == Stage1) + return 1; + if (flag == Stage2) + return 2; + if (flag == Stage3) + return 3; + if (flag == Stage4) + return 4; + if (flag == Stage5) + return 5; + if (flag == Stage6) + return 6; + if (flag == Stage7) + return 7; + if (flag == Stage8) + return 8; + if (flag == Stage9) + return 9; + + return -1; + } + + /// + /// Gets the stage number from the set of supplied file . + /// + /// File flags. + /// Stage number from the set of supplied file ; otherwise, -1 if none of the flags is a stage flag. + public static int GetStageNumber(Guid[] flags) + { + foreach (Guid flag in flags) + { + int stage = GetStageNumber(flag); + + if (stage > -1) + return stage; } - throw new ArgumentOutOfRangeException("stageNumber", "Must be between 0 and 9"); + + return -1; + } + + /// + /// Determines if the supplied flag is a stage flag. + /// + /// Flag to test. + /// true if is a stage flag; otherwise, false. + public static bool IsStageFlag(Guid flag) + { + return GetStageNumber(flag) > -1; } // {4558F270-3F85-456C-8824-7805FF03B384} @@ -118,6 +167,7 @@ public static Guid GetStage(int stageNumber) /// public static readonly Guid Stage8 = new Guid(0x69be577b, 0xc044, 0x49c2, 0x85, 0xb3, 0x8f, 0x8f, 0xad, 0x76, 0x38, 0x03); + // {B5A3FBC4-285A-4559-A3EA-0940219677F8} /// /// Indicates that the file is in Stage 9. /// @@ -134,6 +184,5 @@ public static Guid GetStage(int stageNumber) /// Indicates that this is an intermediate file that can still be automatically rolled over. /// public static readonly Guid IntermediateFile = new Guid(0xd4626375, 0x3e2f, 0x4a62, 0xbc, 0x12, 0x65, 0xbb, 0x45, 0xe4, 0xa7, 0xb6); - } } diff --git a/Source/openHistorian.sln.DotSettings b/Source/openHistorian.sln.DotSettings index 1c31e1b7d3..923701dd38 100644 --- a/Source/openHistorian.sln.DotSettings +++ b/Source/openHistorian.sln.DotSettings @@ -2,5 +2,9 @@ CFF DB True + True + True + + True True True \ No newline at end of file