Syncing recent changes. (#1027)
* Making monitoring server IPv4-compatible.
* Fixing issues with MySQL and 0-seconds-from-epoch timestamps (MySQL
  expects timestamps starting from 1 second from epoch; see the sketch below).
* MySQL optimisations to prevent locking issues in client_paths table.
* UIv2 work.
mbushkov authored Mar 22, 2023
1 parent 3928b69 commit 7f93f8b
Showing 26 changed files with 323 additions and 121 deletions.
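
Before the per-file diffs, a minimal sketch of the timestamp-clamping pattern this commit introduces: instead of hardcoding second 0 of the Unix epoch as a lower bound (which MySQL's TIMESTAMP type cannot store), callers ask the active database backend for its minimal supported timestamp and clamp against it. The helper name EarliestQueryTime and the import paths are illustrative assumptions; MinTimestamp(), data_store.REL_DB, and the rdfvalue calls mirror the diffs below.

# Illustrative sketch only; EarliestQueryTime is a hypothetical helper.
from grr_response_core.lib import rdfvalue
from grr_response_server import data_store


def EarliestQueryTime(lookback_days: int = 180) -> rdfvalue.RDFDatetime:
  """Returns a query start time that the active DB backend can store."""
  candidate = rdfvalue.RDFDatetime.Now() - rdfvalue.Duration.From(
      lookback_days, rdfvalue.DAYS)
  # MySQL TIMESTAMP values start at 1970-01-01 00:00:01, so an epoch-0 lower
  # bound would be rejected; clamp to the backend's own minimum instead.
  return max(candidate, data_store.REL_DB.MinTimestamp())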
5 changes: 3 additions & 2 deletions grr/server/grr_response_server/bin/worker_test.py
@@ -45,8 +45,9 @@ def handle(l):

results = data_store.REL_DB.ReadClientStats(
client_id=client_id,
min_timestamp=rdfvalue.RDFDatetime.FromSecondsSinceEpoch(0),
max_timestamp=rdfvalue.RDFDatetime.Now())
min_timestamp=data_store.REL_DB.MinTimestamp(),
max_timestamp=rdfvalue.RDFDatetime.Now(),
)
self.assertLen(results, 1)
stats = results[0]

6 changes: 4 additions & 2 deletions grr/server/grr_response_server/client_index.py
@@ -52,8 +52,10 @@ def _NormalizeKeyword(self, keyword):

def _AnalyzeKeywords(self, keywords):
"""Extracts a start time from a list of keywords if present."""
start_time = rdfvalue.RDFDatetime.Now() - rdfvalue.Duration.From(
180, rdfvalue.DAYS)
start_time = max(
rdfvalue.RDFDatetime.Now() - rdfvalue.Duration.From(180, rdfvalue.DAYS),
data_store.REL_DB.MinTimestamp(),
)
filtered_keywords = []

for k in keywords:
87 changes: 51 additions & 36 deletions grr/server/grr_response_server/databases/db.py
@@ -578,6 +578,16 @@ class Database(metaclass=abc.ABCMeta):
def Now(self) -> rdfvalue.RDFDatetime:
"""Retrieves current time as reported by the database."""

# Different DB engines might make different assumptions about what a valid
# minimal timestamp is.
# For example, MySQL doesn't handle sub second fractional timestamps well:
# Per https://dev.mysql.com/doc/refman/8.0/en/datetime.html:
# "the range for TIMESTAMP values is '1970-01-01 00:00:01.000000' to
# '2038-01-19 03:14:07.999999'".
@abc.abstractmethod
def MinTimestamp(self) -> rdfvalue.RDFDatetime:
"""Returns minimal timestamp allowed by the DB."""

@abc.abstractmethod
def WriteArtifact(self, artifact):
"""Writes new artifact to the database.
@@ -3062,6 +3072,9 @@ def __init__(self, delegate: Database):
def Now(self) -> rdfvalue.RDFDatetime:
return self.delegate.Now()

def MinTimestamp(self) -> rdfvalue.RDFDatetime:
return self.delegate.MinTimestamp()

def WriteArtifact(self, artifact):
precondition.AssertType(artifact, rdf_artifacts.Artifact)
if not artifact.name:
@@ -3185,7 +3198,7 @@ def WriteClientSnapshotHistory(self, clients):
def ReadClientSnapshotHistory(self, client_id, timerange=None):
precondition.ValidateClientId(client_id)
if timerange is not None:
_ValidateTimeRange(timerange)
self._ValidateTimeRange(timerange)

return self.delegate.ReadClientSnapshotHistory(
client_id, timerange=timerange)
@@ -3205,7 +3218,7 @@ def ReadClientStartupInfo(self,
def ReadClientStartupInfoHistory(self, client_id, timerange=None):
precondition.ValidateClientId(client_id)
if timerange is not None:
_ValidateTimeRange(timerange)
self._ValidateTimeRange(timerange)

return self.delegate.ReadClientStartupInfoHistory(
client_id, timerange=timerange)
@@ -3250,7 +3263,7 @@ def ListClientsForKeywords(
keywords = set(keywords)

if start_time:
_ValidateTimestamp(start_time)
self._ValidateTimestamp(start_time)

result = self.delegate.ListClientsForKeywords(
keywords, start_time=start_time)
@@ -3324,12 +3337,12 @@ def ReadClientStats(
if min_timestamp is None:
min_timestamp = rdfvalue.RDFDatetime.Now() - CLIENT_STATS_RETENTION
else:
_ValidateTimestamp(min_timestamp)
self._ValidateTimestamp(min_timestamp)

if max_timestamp is None:
max_timestamp = rdfvalue.RDFDatetime.Now()
else:
_ValidateTimestamp(max_timestamp)
self._ValidateTimestamp(max_timestamp)

return self.delegate.ReadClientStats(client_id, min_timestamp,
max_timestamp)
@@ -3475,7 +3488,7 @@ def ReadPathInfo(self, client_id, path_type, components, timestamp=None):
_ValidatePathComponents(components)

if timestamp is not None:
_ValidateTimestamp(timestamp)
self._ValidateTimestamp(timestamp)

return self.delegate.ReadPathInfo(
client_id, path_type, components, timestamp=timestamp)
@@ -3525,7 +3538,7 @@ def FindPathInfoByPathID(self, client_id, path_type, path_id, timestamp=None):
precondition.ValidateClientId(client_id)

if timestamp is not None:
_ValidateTimestamp(timestamp)
self._ValidateTimestamp(timestamp)

return self.delegate.FindPathInfoByPathID( # pytype: disable=attribute-error
client_id, path_type, path_id, timestamp=timestamp)
@@ -3596,7 +3609,7 @@ def WriteUserNotification(self, notification):
def ReadUserNotifications(self, username, state=None, timerange=None):
_ValidateUsername(username)
if timerange is not None:
_ValidateTimeRange(timerange)
self._ValidateTimeRange(timerange)
if state is not None:
_ValidateNotificationState(state)

@@ -3759,7 +3772,7 @@ def ReadCronJobRuns(self, job_id):
return self.delegate.ReadCronJobRuns(job_id)

def DeleteOldCronJobRuns(self, cutoff_timestamp):
_ValidateTimestamp(cutoff_timestamp)
self._ValidateTimestamp(cutoff_timestamp)
return self.delegate.DeleteOldCronJobRuns(cutoff_timestamp)

def WriteHashBlobReferences(self, references_by_hash):
@@ -3880,10 +3893,10 @@ def UpdateFlow(self,
precondition.AssertType(client_crash_info, rdf_client.ClientCrash)
if processing_since != Database.unchanged:
if processing_since is not None:
_ValidateTimestamp(processing_since)
self._ValidateTimestamp(processing_since)
if processing_deadline != Database.unchanged:
if processing_deadline is not None:
_ValidateTimestamp(processing_deadline)
self._ValidateTimestamp(processing_deadline)
return self.delegate.UpdateFlow(
client_id,
flow_id,
@@ -4487,6 +4500,33 @@ def ReadBlobEncryptionKeys(

return self.delegate.ReadBlobEncryptionKeys(blob_ids)

# Minimal allowed timestamp is DB-specific. Thus the validation code for
# timestamps is DB-specific as well.
def _ValidateTimeRange(
self, timerange: Tuple[rdfvalue.RDFDatetime, rdfvalue.RDFDatetime]
):
"""Parses a timerange argument and always returns non-None timerange."""
if len(timerange) != 2:
raise ValueError("Timerange should be a sequence with 2 items.")

(start, end) = timerange
precondition.AssertOptionalType(start, rdfvalue.RDFDatetime)
precondition.AssertOptionalType(end, rdfvalue.RDFDatetime)
if start is not None:
self._ValidateTimestamp(start)
if end is not None:
self._ValidateTimestamp(end)

# Minimal allowed timestamp is DB-specific. Thus the validation code for
# timestamps is DB-specific as well.
def _ValidateTimestamp(self, timestamp: rdfvalue.RDFDatetime):
precondition.AssertType(timestamp, rdfvalue.RDFDatetime)
if timestamp < self.delegate.MinTimestamp():
raise ValueError(
"Timestamp is less than the minimal timestamp allowed by the DB: "
f"{timestamp} < {self.delegate.MinTimestamp()}."
)


def _ValidateEnumType(value, expected_enum_type):
if value not in expected_enum_type.reverse_enum:
@@ -4606,35 +4646,10 @@ def _ValidateNotificationState(notification_state):
raise ValueError("notification_state can't be STATE_UNSET")


def _ValidateTimeRange(timerange):
"""Parses a timerange argument and always returns non-None timerange."""
if len(timerange) != 2:
raise ValueError("Timerange should be a sequence with 2 items.")

(start, end) = timerange
precondition.AssertOptionalType(start, rdfvalue.RDFDatetime)
precondition.AssertOptionalType(end, rdfvalue.RDFDatetime)


def _ValidateClosedTimeRange(time_range):
"""Checks that a time-range has both start and end timestamps set."""
time_range_start, time_range_end = time_range
_ValidateTimestamp(time_range_start)
_ValidateTimestamp(time_range_end)
if time_range_start > time_range_end:
raise ValueError("Invalid time-range: %d > %d." %
(time_range_start.AsMicrosecondsSinceEpoch(),
time_range_end.AsMicrosecondsSinceEpoch()))


def _ValidateDuration(duration):
precondition.AssertType(duration, rdfvalue.Duration)


def _ValidateTimestamp(timestamp):
precondition.AssertType(timestamp, rdfvalue.RDFDatetime)


def _ValidateClientPathID(client_path_id):
precondition.AssertType(client_path_id, rdf_objects.ClientPathID)

18 changes: 9 additions & 9 deletions grr/server/grr_response_server/databases/db_flows_test.py
@@ -302,12 +302,12 @@ def testFlowTimestamp(self):

self.db.WriteClientMetadata(client_id, fleetspeak_enabled=False)

before_timestamp = rdfvalue.RDFDatetime.Now()
before_timestamp = self.db.Now()

flow_obj = rdf_flow_objects.Flow(client_id=client_id, flow_id=flow_id)
self.db.WriteFlowObject(flow_obj)

after_timestamp = rdfvalue.RDFDatetime.Now()
after_timestamp = self.db.Now()

flow_obj = self.db.ReadFlowObject(client_id=client_id, flow_id=flow_id)
self.assertBetween(flow_obj.create_time, before_timestamp, after_timestamp)
@@ -318,13 +318,13 @@ def testFlowTimestampWithMissingCreationTime(self):

self.db.WriteClientMetadata(client_id, fleetspeak_enabled=False)

before_timestamp = rdfvalue.RDFDatetime.Now()
before_timestamp = self.db.Now()

flow_obj = rdf_flow_objects.Flow(client_id=client_id, flow_id=flow_id)
flow_obj.create_time = None
self.db.WriteFlowObject(flow_obj)

after_timestamp = rdfvalue.RDFDatetime.Now()
after_timestamp = self.db.Now()

flow_obj = self.db.ReadFlowObject(client_id=client_id, flow_id=flow_id)
self.assertBetween(flow_obj.create_time, before_timestamp, after_timestamp)
@@ -698,7 +698,7 @@ def testProcessingInformationUpdate(self):
client_id = db_test_utils.InitializeClient(self.db)
flow_id = db_test_utils.InitializeFlow(self.db, client_id)

now = rdfvalue.RDFDatetime.Now()
now = self.db.Now()
deadline = now + rdfvalue.Duration.From(6, rdfvalue.HOURS)
self.db.UpdateFlow(
client_id,
@@ -1530,10 +1530,10 @@ def testLeaseFlowForProcessingUpdatesFlowObjects(self):
def testFlowLastUpdateTime(self):
processing_time = rdfvalue.Duration.From(60, rdfvalue.SECONDS)

t0 = rdfvalue.RDFDatetime.Now()
t0 = self.db.Now()
client_id = db_test_utils.InitializeClient(self.db)
flow_id = db_test_utils.InitializeFlow(self.db, client_id)
t1 = rdfvalue.RDFDatetime.Now()
t1 = self.db.Now()

read_flow = self.db.ReadFlowObject(client_id, flow_id)

@@ -1543,9 +1543,9 @@ def testFlowLastUpdateTime(self):
client_id, flow_id, processing_time)
self.assertBetween(flow_for_processing.last_update_time, t0, t1)

t2 = rdfvalue.RDFDatetime.Now()
t2 = self.db.Now()
self.db.ReleaseProcessedFlow(flow_for_processing)
t3 = rdfvalue.RDFDatetime.Now()
t3 = self.db.Now()

read_flow = self.db.ReadFlowObject(client_id, flow_id)
self.assertBetween(read_flow.last_update_time, t2, t3)
3 changes: 3 additions & 0 deletions grr/server/grr_response_server/databases/mem.py
@@ -160,3 +160,6 @@ def _DeepCopy(self, obj):
def Now(self) -> rdfvalue.RDFDatetime:
del self # Unused.
return rdfvalue.RDFDatetime.Now()

def MinTimestamp(self) -> rdfvalue.RDFDatetime:
return rdfvalue.RDFDatetime.FromSecondsSinceEpoch(0)
6 changes: 6 additions & 0 deletions grr/server/grr_response_server/databases/mysql.py
@@ -594,3 +594,9 @@ def Now(self, cursor: MySQLdb.cursors.Cursor) -> rdfvalue.RDFDatetime:
[(timestamp,)] = cursor.fetchall()

return mysql_utils.TimestampToRDFDatetime(timestamp)

def MinTimestamp(self) -> rdfvalue.RDFDatetime:
# Per https://dev.mysql.com/doc/refman/8.0/en/datetime.html:
# "the range for TIMESTAMP values is '1970-01-01 00:00:01.000000' to
# '2038-01-19 03:14:07.999999'".
return rdfvalue.RDFDatetime.FromSecondsSinceEpoch(1)
20 changes: 20 additions & 0 deletions grr/server/grr_response_server/databases/mysql_migration.py
@@ -4,6 +4,7 @@
import contextlib
import logging
import os
import re
import time
from typing import Callable, Optional, Sequence, Text

@@ -107,4 +108,23 @@ def DumpCurrentSchema(cursor: Cursor) -> Text:
rows = cursor.fetchall()
defs.append(rows[0][1])

cursor.execute("""
SELECT TRIGGER_NAME FROM INFORMATION_SCHEMA.TRIGGERS
WHERE trigger_schema = (SELECT DATABASE())
""")
for (trigger,) in sorted(cursor.fetchall()):
cursor.execute(f"SHOW CREATE TRIGGER `{trigger}`")
rows = cursor.fetchall()

# `SHOW CREATE TRIGGER` will return the concrete definer of the trigger,
# so we need to patch its output here to show the default `CURRENT_USER`.
trigger_def = re.sub(
r"^CREATE\s+DEFINER\s*=\s*`[^`]+`(@`[^`]+`)?\s*TRIGGER",
"CREATE DEFINER = CURRENT_USER TRIGGER",
rows[0][2],
count=1,
flags=re.DOTALL | re.MULTILINE,
)
defs.append(trigger_def)

return "\n\n".join(defs)
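
To make the DEFINER rewrite above concrete, here is a small sketch of what the regex in DumpCurrentSchema does to SHOW CREATE TRIGGER output. The trigger definition is invented for illustration; only the pattern and replacement string are taken from the code above.

# Hedged illustration; the trigger text below is made up, the regex is the one
# used in DumpCurrentSchema above.
import re

raw = (
    "CREATE DEFINER=`grr`@`localhost` TRIGGER example_trigger BEFORE INSERT "
    "ON client_paths FOR EACH ROW SET NEW.timestamp = NOW(6)"
)
normalized = re.sub(
    r"^CREATE\s+DEFINER\s*=\s*`[^`]+`(@`[^`]+`)?\s*TRIGGER",
    "CREATE DEFINER = CURRENT_USER TRIGGER",
    raw,
    count=1,
    flags=re.DOTALL | re.MULTILINE,
)
print(normalized)
# CREATE DEFINER = CURRENT_USER TRIGGER example_trigger BEFORE INSERT ON ...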
(The remaining 19 changed file diffs are not shown here.)
