Skip to content

Commit

Permalink
Add convert_rules_to_ballots Santa mgmt cmd
Browse files Browse the repository at this point in the history
  • Loading branch information
np5 committed Sep 20, 2024
1 parent 4dae550 commit d793ae1
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 2 deletions.
4 changes: 3 additions & 1 deletion tests/santa/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,12 @@ def force_target_counter(target_type, blocked_count=0, collected_count=0, execut
def force_rule(
target_type=Target.Type.SIGNING_ID,
target_identifier=None,
target=None,
configuration=None,
policy=Rule.Policy.BLOCKLIST,
):
target = force_target(target_type, target_identifier)
if not target:
target = force_target(target_type, target_identifier)
if configuration is None:
configuration = force_configuration()
return Rule.objects.create(configuration=configuration, target=target, policy=policy)
Expand Down
5 changes: 4 additions & 1 deletion zentral/contrib/santa/ballot_box.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ def get_configurations_allowed_votes(self):
allowed_votes.append((configuration, cfg_allowed_votes))
return allowed_votes

def cast_votes(self, votes):
def verify_votes(self, votes):
# known voter?
if self.voter.is_anonymous:
raise VotingError("Anonymous voters cannot vote")
Expand All @@ -345,6 +345,9 @@ def cast_votes(self, votes):
raise VotingNotAllowedError(
f"Voting upvote? {yes_vote} on configuration {configuration} is not allowed"
)

def cast_votes(self, votes):
self.verify_votes(votes)
self._cast_verified_votes(votes)

def _cast_verified_votes(self, votes, event_target=None):
Expand Down
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from django.core.management.base import BaseCommand
from django.db import transaction
from realms.models import Realm, RealmUser
from zentral.contrib.santa.models import Configuration, Rule
from zentral.contrib.santa.ballot_box import BallotBox


class Command(BaseCommand):
help = 'Convert existing Santa rules to ballots'

def add_arguments(self, parser):
parser.add_argument("-r", "--realm", help="Realm name")
parser.add_argument("-u", "--username", help="Realm user username")
parser.add_argument("-c", "--configuration", help="Name of the Santa configuration", nargs="*")
parser.add_argument("-d", "--dry-run", help="Dry run", action="store_true")

def handle(self, *args, **options):
realm = Realm.objects.get(name=options["realm"])
realm_user = RealmUser.objects.get(realm=realm, username=options["username"])
configurations = list(Configuration.objects.filter(name__in=options["configuration"]).order_by("pk"))
dry_run = options.get("dry_run")
try:
with transaction.atomic():
self.convert_rules(realm_user, configurations, dry_run)
except Exception:
self.stderr.write("Rollback database changes")

def convert_rules(self, realm_user, configurations, dry_run):
target_votes = {}
for configuration in configurations:
for rule in configuration.rule_set.select_related("target").all():
yes_vote = Rule.Policy(rule.policy) == Rule.Policy.ALLOWLIST
target_votes.setdefault(rule.target, []).append((configuration, yes_vote))
rule_qs = Rule.objects.filter(configuration__in=configurations)

if dry_run:
self.stdout.write(f"{rule_qs.count()} rules will be deleted")
else:
rule_count, _ = rule_qs.delete()
self.stdout.write(f"{rule_count} rules deleted")

for target, votes in target_votes.items():
votes_display = ", ".join(
"['{}' {}]".format(c, "up" if y else "down")
for c, y in votes
)
target_display = f"{target.type} {target.identifier}"
ballot_box = BallotBox.for_realm_user(target, realm_user, lock_target=not dry_run, all_configurations=True)
if dry_run:
try:
ballot_box.verify_votes(votes)
except Exception:
self.stderr.write(f"Invalid votes {votes_display} on target {target_display}")
else:
self.stdout.write(f"Votes {votes_display} on target {target_display} OK")
else:
try:
ballot_box.cast_votes(votes)
except Exception:
self.stderr.write(f"Could not cast votes {votes_display} on target {target_display}")
raise
else:
self.stdout.write(f"Votes {votes_display} on target {target_display} saved")

0 comments on commit d793ae1

Please sign in to comment.