diff --git a/sqlserver/CHANGELOG.md b/sqlserver/CHANGELOG.md
index 9cc4263bede65..a816643ead510 100644
--- a/sqlserver/CHANGELOG.md
+++ b/sqlserver/CHANGELOG.md
@@ -2,6 +2,10 @@
 
 ## Unreleased
 
+***Changed***:
+
+* Collect both DBM active sessions and sleeping sessions that are blocking other sessions. See [#14054](https://github.com/DataDog/integrations-core/pull/14054)
+
 ## 13.0.0 / 2023-08-10
 
 ***Changed***:
diff --git a/sqlserver/datadog_checks/sqlserver/activity.py b/sqlserver/datadog_checks/sqlserver/activity.py
index 51835fc7fe65f..b439f0dbc0a48 100644
--- a/sqlserver/datadog_checks/sqlserver/activity.py
+++ b/sqlserver/datadog_checks/sqlserver/activity.py
@@ -74,6 +74,40 @@
 """,
 ).strip()
 
+# sys.dm_exec_requests does not contain idle sessions, so inner joining sys.dm_exec_sessions
+# with sys.dm_exec_requests never returns an idle blocking session. This prevents us from
+# reusing the same ACTIVITY_QUERY for both regular activity and idle blocking sessions.
+# The query below is used only for idle sessions and does not join with sys.dm_exec_requests;
+# the last query executed on the connection is fetched from sys.dm_exec_connections.most_recent_sql_handle.
+IDLE_BLOCKING_SESSIONS_QUERY = re.sub(
+    r'\s+',
+    ' ',
+    """\
+SELECT
+    CONVERT(
+        NVARCHAR, TODATETIMEOFFSET(CURRENT_TIMESTAMP, DATEPART(TZOFFSET, SYSDATETIMEOFFSET())), 126
+    ) as now,
+    sess.login_name as user_name,
+    sess.last_request_start_time as last_request_start_time,
+    sess.session_id as id,
+    DB_NAME(sess.database_id) as database_name,
+    sess.status as session_status,
+    lqt.text as statement_text,
+    SUBSTRING(lqt.text, 1, {proc_char_limit}) as text,
+    c.client_tcp_port as client_port,
+    c.client_net_address as client_address,
+    sess.host_name as host_name,
+    sess.program_name as program_name
+FROM sys.dm_exec_sessions sess
+    INNER JOIN sys.dm_exec_connections c
+        ON sess.session_id = c.session_id
+    CROSS APPLY sys.dm_exec_sql_text(c.most_recent_sql_handle) lqt
+WHERE sess.status = 'sleeping'
+    AND sess.session_id IN ({blocking_session_ids})
+    AND c.session_id IN ({blocking_session_ids})
+""",
+).strip()
+
 # enumeration of the columns we collect
 # from sys.dm_exec_requests
 DM_EXEC_REQUESTS_COLS = [
@@ -153,6 +187,18 @@ def _get_active_connections(self, cursor):
         self.log.debug("loaded sql server current connections len(rows)=%s", len(rows))
         return rows
 
+    @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True)
+    def _get_idle_blocking_sessions(self, cursor, blocking_session_ids):
+        # IDLE_BLOCKING_SESSIONS_QUERY collects only the minimal information available for an idle blocker
+        query = IDLE_BLOCKING_SESSIONS_QUERY.format(
+            blocking_session_ids=",".join(map(str, blocking_session_ids)), proc_char_limit=PROC_CHAR_LIMIT
+        )
+        self.log.debug("Running query [%s]", query)
+        cursor.execute(query)
+        columns = [i[0] for i in cursor.description]
+        rows = [dict(zip(columns, row)) for row in cursor.fetchall()]
+        return rows
+
     @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True)
     def _get_activity(self, cursor, exec_request_columns):
         self.log.debug("collecting sql server activity")
@@ -165,6 +211,15 @@ def _get_activity(self, cursor, exec_request_columns):
         columns = [i[0] for i in cursor.description]
         # construct row dicts manually as there's no DictCursor for pyodbc
         rows = [dict(zip(columns, row)) for row in cursor.fetchall()]
+        # construct the set of unique session ids
+        session_ids = {r['id'] for r in rows}
+        # construct the set of blocking session ids
+        blocking_session_ids = {r['blocking_session_id'] for r in rows if r['blocking_session_id']}
+        # blocking sessions that are absent from the activity rows must be idle; fetch them separately
+        idle_blocking_session_ids = blocking_session_ids - session_ids
+        if idle_blocking_session_ids:
+            idle_blocking_sessions = self._get_idle_blocking_sessions(cursor, idle_blocking_session_ids)
+            rows.extend(idle_blocking_sessions)
         return rows
 
     def _normalize_queries_and_filter_rows(self, rows, max_bytes_limit):
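Taken together, the hunks above implement a two-pass collection: the main activity query returns rows only for sessions with an in-flight request, and any blocker ids missing from those rows are treated as idle and fetched with IDLE_BLOCKING_SESSIONS_QUERY. A minimal, self-contained sketch of that set logic, using hypothetical session ids and trimmed-down row dicts:

# Hypothetical activity rows: session 51 is blocked by session 62, but 62 is
# sleeping, so the activity query returned no row for it.
rows = [
    {"id": 51, "blocking_session_id": 62},
    {"id": 55, "blocking_session_id": None},
]

session_ids = {r["id"] for r in rows}  # {51, 55}
blocking_session_ids = {r["blocking_session_id"] for r in rows if r["blocking_session_id"]}  # {62}
idle_blocking_session_ids = blocking_session_ids - session_ids  # {62}

# session ids come out of the DMVs as integers, so interpolating them into
# the IN (...) clause via str() should be safe from injection
print(",".join(map(str, idle_blocking_session_ids)))  # 62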
@@ -211,7 +266,7 @@ def _get_available_requests_columns(self, cursor, all_expected_columns):
 
     @staticmethod
     def _get_sort_key(r):
-        return r.get("query_start") or datetime.datetime.now()
+        return r.get("query_start") or datetime.datetime.now().isoformat()
 
     def _obfuscate_and_sanitize_row(self, row):
         row = self._remove_null_vals(row)
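The one-line change to `_get_sort_key` is a type-consistency fix. Judging by the `now` column in IDLE_BLOCKING_SESSIONS_QUERY above, timestamps are CONVERTed to NVARCHAR in ISO-8601 style 126, so `query_start` arrives in the row as a string, while idle rows carry no `query_start` at all; with the old `datetime` fallback, Python 3's `sorted()` raises on mixed str/datetime keys. A minimal repro with hypothetical rows:

import datetime

rows = [
    {"query_start": "2023-08-10T21:04:06.157"},  # row from a live request
    {},  # idle blocking session: no query_start collected
]

def old_sort_key(r):
    # old fallback returned a datetime, producing mixed str/datetime sort keys
    return r.get("query_start") or datetime.datetime.now()

def new_sort_key(r):
    # new fallback returns a string; fixed-format ISO-8601 strings sort chronologically
    return r.get("query_start") or datetime.datetime.now().isoformat()

# sorted(rows, key=old_sort_key)  # TypeError: '<' not supported between mixed str/datetime keys
print(sorted(rows, key=new_sort_key))  # idle row sorts last, keyed by the current time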
+ """ + + TABLE_NAME = "##LockTest{}".format(str(int(time.time() * 1000))) + + QUERIES_SETUP = ( + """ + CREATE TABLE {} + ( + id int, + name varchar(10), + city varchar(20) + )""".format( + TABLE_NAME + ), + "INSERT INTO {} VALUES (1001, 'tire', 'sfo')".format(TABLE_NAME), + "INSERT INTO {} VALUES (1002, 'wisth', 'nyc')".format(TABLE_NAME), + "INSERT INTO {} VALUES (1003, 'tire', 'aus')".format(TABLE_NAME), + "COMMIT", + ) + + QUERY1 = """UPDATE {} SET [name] = 'west' WHERE [id] = 1001""".format(TABLE_NAME) + QUERY2 = """UPDATE {} SET [name] = 'fast' WHERE [id] = 1001""".format(TABLE_NAME) + QUERY3 = """UPDATE {} SET [city] = 'blow' WHERE [id] = 1001""".format(TABLE_NAME) + + check = SQLServer(CHECK_NAME, {}, [dbm_instance]) + conn1 = _get_conn_for_user(instance_docker, "fred", _autocommit=False) + conn2 = _get_conn_for_user(instance_docker, "bob", _autocommit=False) + conn3 = _get_conn_for_user(instance_docker, "fred", _autocommit=False) + + close_conns = threading.Event() + + def run_queries(conn, queries): + cur = conn.cursor() + cur.execute("USE {}".format("datadog_test")) + cur.execute("BEGIN TRANSACTION") + for q in queries: + try: + cur.execute(q) + except pyodbc.OperationalError: + # This is expected since the query (might be) blocked + pass + # Do not allow the conn to be garbage collected and closed until the global lock is released + while not close_conns.is_set(): + time.sleep(0.1) + cur.execute("COMMIT") + + # Setup + cur = conn1.cursor() + for q in QUERIES_SETUP: + cur.execute(q) + + # Transaction 1 + t1 = threading.Thread(target=run_queries, args=(conn1, [QUERY1])) + # Transaction 2 + t2 = threading.Thread(target=run_queries, args=(conn2, [QUERY2])) + # Transaction 3 + t3 = threading.Thread(target=run_queries, args=(conn3, [QUERY3])) + + t1.start() + time.sleep(0.3) + t2.start() + time.sleep(0.3) + t3.start() + + try: + dd_run_check(check) + dbm_activity = aggregator.get_event_platform_events("dbm-activity") + finally: + close_conns.set() # Release the threads + + # All 3 connections are expected + assert dbm_activity and 'sqlserver_activity' in dbm_activity[0] + assert len(dbm_activity[0]['sqlserver_activity']) == 3 + + activity = dbm_activity[0]['sqlserver_activity'] + activity = sorted(activity, key=lambda a: a.get('blocking_session_id', 0)) + + root_blocker = activity[0] + tx2 = activity[1] + tx3 = activity[2] + + # Expect to capture the root blocker, which would have a sleeping transaction but no + # associated sys.dm_exec_requests. 
@@ -306,15 +453,15 @@ def test_activity_reported_hostname(
 
 
 def new_time():
-    return datetime.datetime(2021, 9, 23, 23, 21, 21, 669330)
+    return datetime.datetime(2021, 9, 23, 23, 21, 21, 669330).isoformat()
 
 
 def old_time():
-    return datetime.datetime(2021, 9, 22, 22, 21, 21, 669330)
+    return datetime.datetime(2021, 9, 22, 22, 21, 21, 669330).isoformat()
 
 
 def very_old_time():
-    return datetime.datetime(2021, 9, 20, 23, 21, 21, 669330)
+    return datetime.datetime(2021, 9, 20, 23, 21, 21, 669330).isoformat()
 
 
 @pytest.mark.parametrize(
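The time fixtures switch to `.isoformat()` for the same reason as the `_get_sort_key` fallback: the sort-key tests now compare strings throughout, and lexicographic order on these fixed-format ISO-8601 strings matches chronological order. A quick sanity check under that assumption:

import datetime

def very_old_time():
    return datetime.datetime(2021, 9, 20, 23, 21, 21, 669330).isoformat()

def old_time():
    return datetime.datetime(2021, 9, 22, 22, 21, 21, 669330).isoformat()

def new_time():
    return datetime.datetime(2021, 9, 23, 23, 21, 21, 669330).isoformat()

# string sort order matches chronological order for same-format timestamps
assert sorted([new_time(), very_old_time(), old_time()]) == [
    very_old_time(),
    old_time(),
    new_time(),
]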