Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add convert_rules_to_ballots Santa mgmt cmd #1062

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion tests/santa/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,12 @@ def force_target_counter(target_type, blocked_count=0, collected_count=0, execut
def force_rule(
target_type=Target.Type.SIGNING_ID,
target_identifier=None,
target=None,
configuration=None,
policy=Rule.Policy.BLOCKLIST,
):
target = force_target(target_type, target_identifier)
if not target:
target = force_target(target_type, target_identifier)
if configuration is None:
configuration = force_configuration()
return Rule.objects.create(configuration=configuration, target=target, policy=policy)
Expand Down
5 changes: 4 additions & 1 deletion zentral/contrib/santa/ballot_box.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ def get_configurations_allowed_votes(self):
allowed_votes.append((configuration, cfg_allowed_votes))
return allowed_votes

def cast_votes(self, votes):
def verify_votes(self, votes):
# known voter?
if self.voter.is_anonymous:
raise VotingError("Anonymous voters cannot vote")
Expand All @@ -345,6 +345,9 @@ def cast_votes(self, votes):
raise VotingNotAllowedError(
f"Voting upvote? {yes_vote} on configuration {configuration} is not allowed"
)

def cast_votes(self, votes):
self.verify_votes(votes)
self._cast_verified_votes(votes)

def _cast_verified_votes(self, votes, event_target=None):
Expand Down
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from django.core.management.base import BaseCommand
from django.db import transaction
from realms.models import Realm, RealmUser
from zentral.contrib.santa.models import Configuration, Rule
from zentral.contrib.santa.ballot_box import BallotBox


class Command(BaseCommand):
help = 'Convert existing Santa rules to ballots'

def add_arguments(self, parser):
parser.add_argument("-r", "--realm", help="Realm name")
parser.add_argument("-u", "--username", help="Realm user username")
parser.add_argument("-c", "--configuration", help="Name of the Santa configuration", nargs="*")
parser.add_argument("-d", "--dry-run", help="Dry run", action="store_true")

def handle(self, *args, **options):
realm = Realm.objects.get(name=options["realm"])
realm_user = RealmUser.objects.get(realm=realm, username=options["username"])
configurations = list(Configuration.objects.filter(name__in=options["configuration"]).order_by("pk"))
dry_run = options.get("dry_run")
try:
with transaction.atomic():
self.convert_rules(realm_user, configurations, dry_run)
except Exception:
self.stderr.write("Rollback database changes")

def convert_rules(self, realm_user, configurations, dry_run):
target_votes = {}
for configuration in configurations:
for rule in configuration.rule_set.select_related("target").all():
yes_vote = Rule.Policy(rule.policy) == Rule.Policy.ALLOWLIST
target_votes.setdefault(rule.target, []).append((configuration, yes_vote))
rule_qs = Rule.objects.filter(configuration__in=configurations)

if dry_run:
self.stdout.write(f"{rule_qs.count()} rules will be deleted")
else:
rule_count, _ = rule_qs.delete()
self.stdout.write(f"{rule_count} rules deleted")

for target, votes in target_votes.items():
votes_display = ", ".join(
"['{}' {}]".format(c, "up" if y else "down")
for c, y in votes
)
target_display = f"{target.type} {target.identifier}"
ballot_box = BallotBox.for_realm_user(target, realm_user, lock_target=not dry_run, all_configurations=True)
if dry_run:
try:
ballot_box.verify_votes(votes)
except Exception:
self.stderr.write(f"Invalid votes {votes_display} on target {target_display}")
else:
self.stdout.write(f"Votes {votes_display} on target {target_display} OK")
else:
try:
ballot_box.cast_votes(votes)
except Exception:
self.stderr.write(f"Could not cast votes {votes_display} on target {target_display}")
raise
else:
self.stdout.write(f"Votes {votes_display} on target {target_display} saved")
Loading