Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rudimentary custom wgconfig #112

Open
wants to merge 1 commit into
base: humble
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fogros2/fogros2/cloud_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def apt_install(self, args):
)

def pip_install(self, args):
self.scp.execute_cmd(f"python3 -m pip install {args}")
self.scp.execute_cmd(f"sudo pip3 install {args}")

def install_cloud_dependencies(self):
self.apt_install("wireguard unzip docker.io python3-pip ros-humble-rmw-cyclonedds-cpp")
Expand Down
243 changes: 12 additions & 231 deletions fogros2/fogros2/wgconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,244 +31,25 @@
# PROVIDED HEREUNDER IS PROVIDED "AS IS". REGENTS HAS NO OBLIGATION TO PROVIDE
# MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.


from builtins import str
from builtins import range
from io import open
import os


class WGConfig():
SECTION_FIRSTLINE = '_index_firstline'
SECTION_LASTLINE = '_index_lastline'
SECTION_RAW = '_rawdata'
_interface = None # interface attributes
_peers = None # peer data

def __init__(self, file, keyattr='PublicKey'):
self.filename = self.file2filename(file)
self.keyattr = keyattr
self.lines = []
self.initialize_file()

@staticmethod
def file2filename(file):
if os.path.basename(file) == file:
if not file.endswith('.conf'):
file += '.conf'
file = os.path.join('/etc/wireguard', file)
return file

def invalidate_data(self):
self._interface = None
self._peers = None

def read_file(self):
with open(self.filename, 'r') as wgfile:
self.lines = [line.rstrip() for line in wgfile.readlines()]
self.invalidate_data()
self.filename = file
self.keyattr = keyattr
self.lines = ['[Interface]']

def write_file(self, file=None):
if file is None:
filename = self.filename
else:
filename = self.file2filename(file)
with os.fdopen(
os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o640), 'w') as wgfile:
def write_file(self):
with os.fdopen(os.open(self.filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o640), 'w') as wgfile:
wgfile.writelines(line + '\n' for line in self.lines)

@staticmethod
def parse_line(line):
attr, _, value = line.partition('=')
attr = attr.strip()
parts = value.partition('#')
value = parts[0].strip() # strip comments and whitespace
value = str(value) # this line is for Python2 support only
comment = parts[1] + parts[2]
if value.isnumeric():
value = [int(value)]
else:
# decompose into list based on commata as separator
value = [item.strip() for item in value.split(',')]
return attr, value, comment

def parse_lines(self):
# There will be two special attributes in the parsed data:
# _index_firstline: Line (zero indexed) of the section header
# (including any leading lines with comments)
# _index_lastline: Line (zero indexed) of the last attribute line of the section
# (including any directly following comments)

def close_section(section, section_data):
section_data = {k: (v if len(v) > 1 else v[0]) for k, v in section_data.items()}
if section is None: # nothing to close on first section
return
elif section == 'interface': # close interface section
self._interface = section_data
else: # close peer section
peername = section_data.get(self.keyattr)
self._peers[peername] = section_data
section_data[self.SECTION_RAW] = self.lines[section_data[self.SECTION_FIRSTLINE]:
(section_data[self.SECTION_LASTLINE] + 1)]

self._interface = dict()
self._peers = dict()
section = None
section_data = dict()
last_empty_line_in_section = -1 # virtual empty line before start of file
for i, line in enumerate(self.lines):
# Ignore leading whitespace and trailing whitespace
line = line.strip()
# Ignore empty lines and comments
if len(line) == 0:
last_empty_line_in_section = i
continue
if line.startswith('['): # section
if last_empty_line_in_section is not None:
section_data[self.SECTION_LASTLINE] = [last_empty_line_in_section - 1]
close_section(section, section_data)
section_data = dict()
section = line[1:].partition(']')[0].lower()
if last_empty_line_in_section is None:
section_data[self.SECTION_FIRSTLINE] = [i]
else:
section_data[self.SECTION_FIRSTLINE] = [last_empty_line_in_section + 1]
last_empty_line_in_section = None
section_data[self.SECTION_LASTLINE] = [i]
if section not in ['interface', 'peer']:
raise ValueError('Unsupported section [{0}] in line {1}'.format(section, i))
elif line.startswith('#'):
section_data[self.SECTION_LASTLINE] = [i]
else: # regular line
attr, value, _comment = self.parse_line(line)
section_data[attr] = section_data.get(attr, [])
section_data[attr].extend(value)
section_data[self.SECTION_LASTLINE] = [i]
close_section(section, section_data)

def handle_leading_comment(self, leading_comment):
if leading_comment is not None:
if leading_comment.strip()[0] != '#':
raise ValueError('A comment needs to start with a "#"')
self.lines.append(leading_comment)

def initialize_file(self, leading_comment=None):
self.lines = list()
self.handle_leading_comment(leading_comment) # add leading comment if needed
self.lines.append('[Interface]')
self.invalidate_data()

def add_peer(self, key, leading_comment=None):
if key in self.peers:
raise KeyError('Peer to be added already exists')
self.lines.append('') # append an empty line for separation
self.handle_leading_comment(leading_comment) # add leading comment if needed
# Append peer with key attribute

def add_peer(self, key, leading_comment):
self.lines.append('')
self.lines.append(leading_comment)
self.lines.append('[Peer]')
self.lines.append('{0} = {1}'.format(self.keyattr, key))
# Invalidate data cache
self.invalidate_data()

def del_peer(self, key):
if key not in self.peers:
raise KeyError('The peer to be deleted does not exist')
section_firstline = self.peers[key][self.SECTION_FIRSTLINE]
section_lastline = self.peers[key][self.SECTION_LASTLINE]
# Remove a blank line directly before the peer section
if section_firstline > 0:
if len(self.lines[section_firstline - 1]) == 0:
section_firstline -= 1
# Only keep needed lines
result = []
if section_firstline > 0:
result.extend(self.lines[0:section_firstline])
result.extend(self.lines[(section_lastline + 1):])
self.lines = result
# Invalidate data cache
self.invalidate_data()

def get_sectioninfo(self, key):
if key is None: # interface
section_firstline = self.interface[self.SECTION_FIRSTLINE]
section_lastline = self.interface[self.SECTION_LASTLINE]
else: # peer
if key not in self.peers:
raise KeyError('The specified peer does not exist')
section_firstline = self.peers[key][self.SECTION_FIRSTLINE]
section_lastline = self.peers[key][self.SECTION_LASTLINE]
return section_firstline, section_lastline

def add_attr(self, key, attr, value, leading_comment=None, append_as_line=False):
section_firstline, section_lastline = self.get_sectioninfo(key)
if leading_comment is not None:
if leading_comment.strip()[0] != '#':
raise ValueError('A comment needs to start with a "#"')
# Look for line with the attribute
line_found = None
for i in range(section_firstline + 1, section_lastline + 1):
line_attr, line_value, line_comment = self.parse_line(self.lines[i])
if attr == line_attr:
line_found = i
# Add the attribute at the right place
if (line_found is None) or append_as_line:
line_found = section_lastline if (line_found is None) else line_found
line_found += 1
self.lines.insert(line_found, '{0} = {1}'.format(attr, value))
else:
line_attr, line_value, line_comment = self.parse_line(self.lines[line_found])
line_value.append(value)
if len(line_comment) > 0:
line_comment = ' ' + line_comment
line_value = [str(item) for item in line_value]
self.lines[line_found] = line_attr + ' = ' + ', '.join(line_value) + line_comment
# Handle leading comments
if leading_comment is not None:
self.lines.insert(line_found, leading_comment)
# Invalidate data cache
self.invalidate_data()

def del_attr(self, key, attr, value=None, remove_leading_comments=True):
section_firstline, section_lastline = self.get_sectioninfo(key)
# Find all lines with matching attribute name and (if requested) value
line_found = []
for i in range(section_firstline + 1, section_lastline + 1):
line_attr, line_value, line_comment = self.parse_line(self.lines[i])
if attr == line_attr:
if (value is None) or (value in line_value):
line_found.append(i)
if len(line_found) == 0:
raise ValueError('The attribute/value to be deleted is not present')
# Process all relevant lines
for i in reversed(line_found): # reversed so that non-processed indices stay valid
if value is None:
del(self.lines[i])
else:
line_attr, line_value, line_comment = self.parse_line(self.lines[i])
line_value.remove(value)
if len(line_value) > 0: # keep remaining values in that line
self.lines[i] = line_attr + ' = ' + ', '.join(line_value) + line_comment
else: # otherwise line is no longer needed
del(self.lines[i])
# Handle leading comments
if remove_leading_comments:
i = line_found[0] - 1
while i > 0:
if len(self.lines[i]) and (self.lines[i][0] == '#'):
del(self.lines[i])
i -= 1
else:
break
# Invalidate data cache
self.invalidate_data()

@property
def interface(self):
if self._interface is None:
self.parse_lines()
return self._interface
self.lines.append(self.keyattr + ' = ' + key)

@property
def peers(self):
if self._peers is None:
self.parse_lines()
return self._peers
def add_attr(self, key, attr, value):
self.lines.append(attr + ' = ' + str(value))
14 changes: 0 additions & 14 deletions fogros2/fogros2/wgexec.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,3 @@ def get_publickey(wg_private):
return None
out = out.strip() # remove trailing newline
return out


def generate_keypair():
wg_private = generate_privatekey()
wg_public = get_publickey(wg_private)
return wg_private, wg_public


def generate_presharedkey():
out, err, returncode = execute('wg genpsk', suppressoutput=True)
if (returncode != 0) or (len(err) > 0):
return None
out = out.strip() # remove trailing newline
return out