-
Notifications
You must be signed in to change notification settings - Fork 0
/
ntpsync_delay.py
executable file
·105 lines (88 loc) · 3.53 KB
/
ntpsync_delay.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
#!/usr/bin/python
import os
import subprocess
import configparser
import logging
import ntplib
import ctypes
import errno
def get_offset(client: "ntplib.NTPClient", ntp_host: str, port: int = 123,
               num_requests: int = 8, timeout: int = 5) -> float:
    """Query an NTP server several times and return the best offset seen.

    Makes ``num_requests`` requests through ``client`` and returns the
    ``offset`` of the response with the smallest ``delay``. When several
    responses share the smallest delay, the earliest one wins.

    Args:
        client: Object exposing ``request(host, port=..., timeout=...)`` whose
            result has ``delay`` and ``offset`` attributes.
        ntp_host: Host name or address of the NTP server.
        port: Port passed through to ``client.request``.
        num_requests: Number of samples to take; must be at least 1.
        timeout: Per-request timeout passed through to ``client.request``.

    Returns:
        The ``offset`` of the lowest-delay response.

    Raises:
        ValueError: If ``num_requests`` is smaller than 1.
        ntplib.NTPException, OSError: Re-raised from the last attempt when
            every single request failed.
    """
    if num_requests < 1:
        # Without any sample there is no meaningful offset; returning 0 would
        # falsely report a perfectly synchronised clock.
        raise ValueError(
            "num_requests must be at least 1, got {}".format(num_requests))

    best = None
    last_error = None
    for attempt in range(1, num_requests + 1):
        logging.debug("making NTP request #{} to: {}".format(attempt, ntp_host))
        try:
            response = client.request(ntp_host, port=port, timeout=timeout)
        except (ntplib.NTPException, OSError) as exc:
            # A single lost or late reply should not discard the samples that
            # did arrive; remember the error in case nothing succeeds.
            logging.warning("NTP request #{} to {} failed: {}".format(
                attempt, ntp_host, exc))
            last_error = exc
            continue
        if best is None or response.delay < best.delay:
            logging.info("new delay {}s".format(response.delay))
            best = response

    if best is None:
        raise last_error
    return best.offset
def get_log_level(config):
    """Translate the ``log_level`` config entry into a ``logging`` level.

    The name is matched case-insensitively and surrounding whitespace is
    ignored, so ``debug``, ``DEBUG`` and `` Debug `` are all accepted.

    Args:
        config: Mapping that contains a ``log_level`` string (the script
            passes the ``[Logging]`` section of the config file).

    Returns:
        The matching ``logging`` level constant.

    Raises:
        KeyError: If ``log_level`` is missing or is not one of the known
            level names.
    """
    levels = {
        'notset': logging.NOTSET,
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'warning': logging.WARNING,
        'error': logging.ERROR,
        'critical': logging.CRITICAL,
    }
    raw_level = config['log_level']
    try:
        return levels[raw_level.strip().lower()]
    except KeyError:
        # Same exception type as a plain dict lookup, but with a message that
        # tells the user which values are valid.
        raise KeyError("unknown log_level {!r}; expected one of: {}".format(
            raw_level, ", ".join(levels))) from None
def adjtime_c(time_s: int, time_us: int, so_filepath: str):
    """Call ``ntp_adjtime`` from the shared library at ``so_filepath``.

    Loads the library with ctypes (errno capture enabled), calls its
    ``ntp_adjtime(time_s, time_us)`` function and logs the outcome. A zero
    return value is treated as success; anything else is logged together
    with the text for the captured errno. Nothing is returned either way.

    Args:
        time_s: Seconds part of the adjustment.
        time_us: Microseconds part of the adjustment.
        so_filepath: Path of the shared object to load.
    """
    # NOTE(review): ctypes passes Python ints as C ``int`` unless ``argtypes``
    # is set; confirm this matches the parameter types of the C function.
    logging.debug("Calling C function from: {}".format(so_filepath))
    library = ctypes.CDLL(so_filepath, use_errno=True)
    status = library.ntp_adjtime(time_s, time_us)
    errno_value = ctypes.get_errno()

    if not status:
        logging.info("System clock is adjusting. This will run in the background.")
        return

    logging.error("adjtime returned {}".format(status))
    logging.critical(os.strerror(errno_value))
    return
def adjust_date(offset: float):
    """Step the system clock by ``offset`` seconds using the ``date`` command.

    Runs ``date -s "<offset> seconds"`` so the clock jumps close to the
    correct time before the finer adjtime-based adjustment is applied.

    Args:
        offset: Amount to move the clock by, in seconds; may be negative.
    """
    # Fixed-point formatting: the default float formatting can switch to
    # scientific notation (e.g. "5e-05"), which is not a plain decimal number
    # for the relative "<N> seconds" item handed to `date`.
    result = subprocess.run(["date", "-s", "{:.9f} seconds".format(offset)])
    if result.returncode != 0:
        # Best effort: report the failure but let the caller carry on.
        logging.error("date -s failed with exit status {}".format(
            result.returncode))
    return
if __name__ == "__main__":
    # Load the settings from config.ini in the current working directory.
    config = configparser.ConfigParser()
    config.read("config.ini")

    # Configure logging: write to the configured file when one is given,
    # otherwise use the logging module's default destination.
    log_config = config['Logging']
    log_format = "[%(levelname)s: %(funcName)s] %(message)s"
    if log_config['log_file']:
        logging.basicConfig(filename=log_config['log_file'],
                            level=get_log_level(log_config),
                            format=log_format)
    else:
        logging.basicConfig(level=get_log_level(log_config), format=log_format)

    # Only the exact string 'True' keeps logging switched on.
    if log_config['enable_log'] != 'True':
        logging.disable(logging.CRITICAL)

    ntp_config = config['NTPSettings']
    ntp_client = ntplib.NTPClient()

    # Both measurements below use the same server settings.
    request_args = (
        ntp_config['ntp_url'],
        int(ntp_config['ntp_port']),
        int(ntp_config['number_requests']),
        int(ntp_config['timeout_s']),
    )

    # First measurement of the offset against the NTP server.
    offset = get_offset(ntp_client, *request_args)

    # A large offset is first stepped with `date`, then measured again so the
    # remaining difference can be handed to adjtime.
    # NOTE(review): the 0.128 s threshold is hard-coded; confirm its origin.
    if abs(offset) > 0.128:
        logging.warning("Offset is too big {}s. Stepping time.".format(offset))
        adjust_date(offset)
        offset = get_offset(ntp_client, *request_args)

    logging.info("Best offset calculated: {}".format(offset))

    # int() truncates toward zero, so for a negative offset neither part is
    # positive.
    whole_seconds = int(offset)
    remainder_us = int((offset - whole_seconds)*1e+6)
    logging.debug("Seconds: {}".format(whole_seconds))
    logging.debug("Microseconds: {}".format(remainder_us))

    adjtime_c(whole_seconds, remainder_us, ntp_config['adjtime_so_path'])