Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: yaroslavborbat <[email protected]>
  • Loading branch information
yaroslavborbat committed Dec 11, 2024
1 parent a7209a7 commit e36bb05
Showing 1 changed file with 9 additions and 9 deletions.
18 changes: 9 additions & 9 deletions hooks/mc_webhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ def parse_ip_address(ip_string):
class ModuleConfigValidateHook(Hook):
SNAPSHOT_NAME = "virtualmachineipaddresslease"
VALIDATOR_NAME = "moduleconfig-virtualization"
MODULE_CONFIG_KIND = "ModuleConfig"

def __init__(self, module_name: str):
self.module_name = module_name
self.namespace = common.NAMESPACE
self.MODULE_CONFIG_KIND = "ModuleConfig"

def generate_config(self) -> dict:
return {
Expand Down Expand Up @@ -48,34 +48,34 @@ def generate_config(self) -> dict:
}
]
}

@staticmethod
def __allow(ctx: hook.Context, msg: str):
ctx.output.validations.allow(msg)

@staticmethod
def __deny(ctx: hook.Context, msg: str):
ctx.output.validations.deny(msg)

def reconcile(self) -> Callable[[hook.Context], None]:
def r(ctx: hook.Context):

request = ctx.binding_context.get("review", {}).get("request")
if len(request) == 0:
self.__allow(ctx, "")
return

kind = request.get("kind", {}).get("kind", "")
name = request.get("name", "")
if kind != self.MODULE_CONFIG_KIND or name != self.module_name:
self.__allow(ctx, "")
return

lease_names = [n["filterResult"]["name"] for n in ctx.snapshots.get(self.SNAPSHOT_NAME, [])]
if len(lease_names) == 0:
self.__allow(ctx, "")
return

old_subnetes = request.get("oldObject", {}).get("spec", {}).get("settings", {}).get("virtualMachineCIDRs")
new_subnets = request.get("object", {}).get("spec", {}).get("settings", {}).get("virtualMachineCIDRs")

Expand All @@ -87,14 +87,14 @@ def r(ctx: hook.Context):
if len(validate_subnets) == 0:
self.__allow(ctx, "")
return

for name in lease_names:
ip = ip_address(parse_ip_address(name))
for subnet in validate_subnets:
if ip in subnet:
self.__deny(ctx, f"Subnet {subnet} is in use by one or more IP addresses.")
return

self.__allow(ctx, "")
return r

Expand Down

0 comments on commit e36bb05

Please sign in to comment.