Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collect both DBM active sessions and blocking sessions which are sleeping #14054

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
82b42bc
have the activity query collect all active OR blocking server activity
Feb 28, 2023
b072c03
work in progress test
Mar 2, 2023
a8920e6
update test with working scenario
Mar 2, 2023
52489e1
fix the sql query to collect sleeping sessions
Mar 2, 2023
07fcd8f
collect the last available text
Mar 2, 2023
f16a25d
working activity test
Mar 2, 2023
854e5e0
style fixes
Mar 2, 2023
073ee71
test activity fixes
Mar 2, 2023
48e7467
fix truncation tests
Mar 2, 2023
ba9a8b3
fix test determinism
Mar 2, 2023
8385441
remove unnecessary assertion
justiniso Jun 28, 2023
ec73efe
Merge branch 'master' into justindd/sqlserver-collect-all-blocking-qu…
justiniso Jun 28, 2023
d219ed9
tmp
lu-zhengda Aug 2, 2023
e9ee62f
run checks of idle blocking sessions
lu-zhengda Aug 2, 2023
df9e257
merge master and resolve conflicts
lu-zhengda Aug 2, 2023
8364eda
filter current sess in idle blocking sess query
lu-zhengda Aug 2, 2023
9a0cb86
collect set of blocking spids then query for idle blockers
lu-zhengda Aug 3, 2023
2a9e9ab
remove unrelated comments
lu-zhengda Aug 3, 2023
89b2f9e
enable check_result_length for _get_idle_blocking_sessions
lu-zhengda Aug 3, 2023
ce90025
grab sql text from connection most_recent_sql_handle for idle sessions
lu-zhengda Aug 9, 2023
4c109f7
Merge branch 'master' into justindd/sqlserver-collect-all-blocking-qu…
lu-zhengda Aug 9, 2023
ffe7942
run idle blocking session query only when there are sessions not capt…
lu-zhengda Aug 10, 2023
39fcdbf
Merge branch 'master' into justindd/sqlserver-collect-all-blocking-qu…
lu-zhengda Aug 10, 2023
4f47aca
update the changelog
lu-zhengda Aug 10, 2023
5bc1d5b
fix lint
lu-zhengda Aug 10, 2023
1339c01
update comments with full table name
lu-zhengda Aug 14, 2023
fd7fbbf
remove unneeded logs & update comments
lu-zhengda Aug 14, 2023
f1345cc
remove unused var
lu-zhengda Aug 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions sqlserver/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

***Changed***:

* Collect both DBM active sessions and blocking sessions that are sleeping. See [#14054](https://github.com/DataDog/integrations-core/pull/14054)

## 13.0.0 / 2023-08-10

***Changed***:
Expand Down
57 changes: 56 additions & 1 deletion sqlserver/datadog_checks/sqlserver/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,40 @@
""",
).strip()

# Turns out sys.dm_exec_requests does not contain idle sessions.
# Inner joining dm_exec_sessions with dm_exec_requests will not return any idle blocking sessions.
# This prevent us reusing the same ACTIVITY_QUERY for regular activities and idle blocking sessions.
# The query below is used for idle sessions and does not join with dm_exec_requests.
# The last query execution on the connection is fetched from dm_exec_connections.most_recent_sql_handle.
# Turns out sys.dm_exec_requests does not contain idle sessions, so inner joining
# sys.dm_exec_sessions with sys.dm_exec_requests will not return any idle blocking
# sessions. This prevents us from reusing the same ACTIVITY_QUERY for both regular
# activity and idle blocking sessions.
# The query below is therefore used for idle sessions only and does not join with
# sys.dm_exec_requests; the last statement executed on the connection is fetched
# from sys.dm_exec_connections.most_recent_sql_handle instead.
# Whitespace is collapsed to single spaces so the query ships/logs as one line.
IDLE_BLOCKING_SESSIONS_QUERY = re.sub(
    r'\s+',
    ' ',
    """\
    SELECT
        CONVERT(
            NVARCHAR, TODATETIMEOFFSET(CURRENT_TIMESTAMP, DATEPART(TZOFFSET, SYSDATETIMEOFFSET())), 126
        ) as now,
        sess.login_name as user_name,
        sess.last_request_start_time as last_request_start_time,
        sess.session_id as id,
        DB_NAME(sess.database_id) as database_name,
        sess.status as session_status,
        lqt.text as statement_text,
        SUBSTRING(lqt.text, 1, {proc_char_limit}) as text,
        c.client_tcp_port as client_port,
        c.client_net_address as client_address,
        sess.host_name as host_name,
        sess.program_name as program_name
    FROM sys.dm_exec_sessions sess
        INNER JOIN sys.dm_exec_connections c
            ON sess.session_id = c.session_id
    CROSS APPLY sys.dm_exec_sql_text(c.most_recent_sql_handle) lqt
    WHERE sess.status = 'sleeping'
        AND sess.session_id IN ({blocking_session_ids})
        AND c.session_id IN ({blocking_session_ids})
    """,
).strip()

# enumeration of the columns we collect
# from sys.dm_exec_requests
DM_EXEC_REQUESTS_COLS = [
Expand Down Expand Up @@ -153,6 +187,18 @@ def _get_active_connections(self, cursor):
self.log.debug("loaded sql server current connections len(rows)=%s", len(rows))
return rows

@tracked_method(agent_check_getter=agent_check_getter, track_result_length=True)
def _get_idle_blocking_sessions(self, cursor, blocking_session_ids):
    """Fetch minimal activity rows for sleeping sessions that are blocking others.

    ``blocking_session_ids`` is an iterable of session ids known to be blocking
    but not returned by the regular activity query (which only sees sessions
    that have an active request).
    """
    formatted_ids = ",".join(str(session_id) for session_id in blocking_session_ids)
    query = IDLE_BLOCKING_SESSIONS_QUERY.format(
        blocking_session_ids=formatted_ids,
        proc_char_limit=PROC_CHAR_LIMIT,
    )
    self.log.debug("Running query [%s]", query)
    cursor.execute(query)
    # pyodbc has no DictCursor, so build row dicts from cursor.description.
    column_names = [column[0] for column in cursor.description]
    return [dict(zip(column_names, values)) for values in cursor.fetchall()]

@tracked_method(agent_check_getter=agent_check_getter, track_result_length=True)
def _get_activity(self, cursor, exec_request_columns):
self.log.debug("collecting sql server activity")
Expand All @@ -165,6 +211,15 @@ def _get_activity(self, cursor, exec_request_columns):
columns = [i[0] for i in cursor.description]
# construct row dicts manually as there's no DictCursor for pyodbc
rows = [dict(zip(columns, row)) for row in cursor.fetchall()]
# construct set of unique session ids
session_ids = {r['id'] for r in rows}
# construct set of blocking session ids
blocking_session_ids = {r['blocking_session_id'] for r in rows if r['blocking_session_id']}
# if there are blocking sessions and some of the session(s) are not captured in the activity query
idle_blocking_session_ids = blocking_session_ids - session_ids
if idle_blocking_session_ids:
idle_blocking_sessions = self._get_idle_blocking_sessions(cursor, idle_blocking_session_ids)
rows.extend(idle_blocking_sessions)
return rows

def _normalize_queries_and_filter_rows(self, rows, max_bytes_limit):
Expand Down Expand Up @@ -211,7 +266,7 @@ def _get_available_requests_columns(self, cursor, all_expected_columns):

@staticmethod
def _get_sort_key(r):
return r.get("query_start") or datetime.datetime.now()
return r.get("query_start") or datetime.datetime.now().isoformat()
alexandre-normand marked this conversation as resolved.
Show resolved Hide resolved

def _obfuscate_and_sanitize_row(self, row):
row = self._remove_null_vals(row)
Expand Down
157 changes: 152 additions & 5 deletions sqlserver/tests/test_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import json
import os
import re
import threading
import time
from concurrent.futures.thread import ThreadPoolExecutor
from copy import copy
Expand Down Expand Up @@ -145,10 +146,11 @@ def run_test_query(c, q):
assert set(event['ddtags']) == expected_instance_tags, "wrong instance tags activity"
assert type(event['collection_interval']) in (float, int), "invalid collection_interval"

assert len(event['sqlserver_activity']) == 1, "should have collected exactly one activity row"
assert len(event['sqlserver_activity']) == 2, "should have collected exactly two activity rows"
event['sqlserver_activity'].sort(key=lambda r: r.get('blocking_session_id', 0))
# the second query should be fred's, which is currently blocked on
# bob who is holding a table lock
blocked_row = event['sqlserver_activity'][0]
blocked_row = event['sqlserver_activity'][1]
# assert the data that was collected is correct
assert blocked_row['user_name'] == "fred", "incorrect user_name"
assert blocked_row['session_status'] == "running", "incorrect session_status"
Expand Down Expand Up @@ -200,6 +202,151 @@ def run_test_query(c, q):
)


def test_activity_nested_blocking_transactions(
    aggregator,
    instance_docker,
    dd_run_check,
    dbm_instance,
):
    """
    Test to ensure the check captures a scenario where a blocking idle transaction
    is collected through activity. An open transaction which completes its current
    request but has not committed can still hold a row-level lock preventing subsequent
    sessions from updating. It is important that the Agent captures these cases to show
    the complete picture and the last executed query responsible for the lock.
    """

    # Unique global temp-table name so repeated/concurrent test runs don't collide.
    TABLE_NAME = "##LockTest{}".format(str(int(time.time() * 1000)))

    QUERIES_SETUP = (
        """
        CREATE TABLE {}
        (
            id int,
            name varchar(10),
            city varchar(20)
        )""".format(
            TABLE_NAME
        ),
        "INSERT INTO {} VALUES (1001, 'tire', 'sfo')".format(TABLE_NAME),
        "INSERT INTO {} VALUES (1002, 'wisth', 'nyc')".format(TABLE_NAME),
        "INSERT INTO {} VALUES (1003, 'tire', 'aus')".format(TABLE_NAME),
        "COMMIT",
    )

    # All three updates touch the same row (id=1001) so each transaction blocks the next.
    QUERY1 = """UPDATE {} SET [name] = 'west' WHERE [id] = 1001""".format(TABLE_NAME)
    QUERY2 = """UPDATE {} SET [name] = 'fast' WHERE [id] = 1001""".format(TABLE_NAME)
    QUERY3 = """UPDATE {} SET [city] = 'blow' WHERE [id] = 1001""".format(TABLE_NAME)

    check = SQLServer(CHECK_NAME, {}, [dbm_instance])
    # autocommit is off so each connection holds its row lock until an explicit COMMIT.
    conn1 = _get_conn_for_user(instance_docker, "fred", _autocommit=False)
    conn2 = _get_conn_for_user(instance_docker, "bob", _autocommit=False)
    conn3 = _get_conn_for_user(instance_docker, "fred", _autocommit=False)

    close_conns = threading.Event()

    def run_queries(conn, queries):
        # Run the given queries in one open transaction and hold it (uncommitted)
        # until close_conns is set, keeping the row lock alive during the check run.
        cur = conn.cursor()
        cur.execute("USE {}".format("datadog_test"))
        cur.execute("BEGIN TRANSACTION")
        for q in queries:
            try:
                cur.execute(q)
            except pyodbc.OperationalError:
                # This is expected since the query (might be) blocked
                pass
        # Do not allow the conn to be garbage collected and closed until the global lock is released
        while not close_conns.is_set():
            time.sleep(0.1)
        cur.execute("COMMIT")

    # Setup
    cur = conn1.cursor()
    for q in QUERIES_SETUP:
        cur.execute(q)

    # Transaction 1
    t1 = threading.Thread(target=run_queries, args=(conn1, [QUERY1]))
    # Transaction 2
    t2 = threading.Thread(target=run_queries, args=(conn2, [QUERY2]))
    # Transaction 3
    t3 = threading.Thread(target=run_queries, args=(conn3, [QUERY3]))

    # Stagger the starts so the blocking chain forms deterministically: T1 -> T2 -> T3.
    t1.start()
    time.sleep(0.3)
    t2.start()
    time.sleep(0.3)
    t3.start()

    try:
        dd_run_check(check)
        dbm_activity = aggregator.get_event_platform_events("dbm-activity")
    finally:
        close_conns.set()  # Release the threads

    # All 3 connections are expected
    assert dbm_activity and 'sqlserver_activity' in dbm_activity[0]
    assert len(dbm_activity[0]['sqlserver_activity']) == 3

    activity = dbm_activity[0]['sqlserver_activity']
    # Root blocker has no blocking_session_id, so it sorts first.
    activity = sorted(activity, key=lambda a: a.get('blocking_session_id', 0))

    root_blocker = activity[0]
    tx2 = activity[1]
    tx3 = activity[2]

    # Expect to capture the root blocker, which would have a sleeping transaction but no
    # associated sys.dm_exec_requests.
    assert root_blocker["user_name"] == "fred"
    assert root_blocker["session_status"] == "sleeping"
    assert root_blocker["database_name"] == "datadog_test"
    assert root_blocker["last_request_start_time"]
    assert root_blocker["client_port"]
    assert root_blocker["client_address"]
    assert root_blocker["host_name"]
    # Expect to capture the query signature for the root blocker
    # query text is not captured from the req dmv
    # but available in the connection dmv with most_recent_sql_handle
    assert root_blocker["query_signature"]
    # we do not capture requests for sleeping sessions
    assert "blocking_session_id" not in root_blocker
    assert "request_status" not in root_blocker
    assert "query_start" not in root_blocker

    # TX2 should be blocked by the root blocker TX1, TX3 should be blocked by TX2
    assert tx2["blocking_session_id"] == root_blocker["id"]
    assert tx3["blocking_session_id"] == tx2["id"]
    # TX2 and TX3 should be running
    assert tx2["session_status"] == "running"
    assert tx3["session_status"] == "running"
    # verify other essential fields are present
    assert tx2["user_name"] == "bob"
    assert tx2["database_name"] == "datadog_test"
    assert tx2["last_request_start_time"]
    assert tx2["client_port"]
    assert tx2["client_address"]
    assert tx2["host_name"]
    assert tx2["query_signature"]
    assert tx2["request_status"]
    assert tx2["query_start"]
    assert tx2["query_hash"]
    assert tx2["query_plan_hash"]
    assert tx3["user_name"] == "fred"
    assert tx3["database_name"] == "datadog_test"
    assert tx3["last_request_start_time"]
    assert tx3["client_port"]
    assert tx3["client_address"]
    assert tx3["host_name"]
    assert tx3["query_signature"]
    assert tx3["request_status"]
    assert tx3["query_start"]
    assert tx3["query_hash"]
    assert tx3["query_plan_hash"]

    for t in [t1, t2, t3]:
        t.join()


@pytest.mark.integration
@pytest.mark.usefixtures('dd_environment')
@pytest.mark.parametrize(
Expand Down Expand Up @@ -306,15 +453,15 @@ def test_activity_reported_hostname(


def new_time():
    """Fixed "newest" timestamp fixture as an ISO-8601 string (rows store strings)."""
    return datetime.datetime(2021, 9, 23, 23, 21, 21, 669330).isoformat()


def old_time():
    """Fixed "older" timestamp fixture as an ISO-8601 string (rows store strings)."""
    return datetime.datetime(2021, 9, 22, 22, 21, 21, 669330).isoformat()


def very_old_time():
    """Fixed "oldest" timestamp fixture as an ISO-8601 string (rows store strings)."""
    return datetime.datetime(2021, 9, 20, 23, 21, 21, 669330).isoformat()


@pytest.mark.parametrize(
Expand Down
Loading