Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OpenFGA Integration #673

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions packages/opal-client/opal_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
from opal_client.data.api import init_data_router
from opal_client.data.fetcher import DataFetcher
from opal_client.data.updater import DataUpdater
from opal_client.engine.options import CedarServerOptions, OpaServerOptions
from opal_client.engine.runner import CedarRunner, OpaRunner
from opal_client.engine.options import CedarServerOptions, OpaServerOptions, OpenFGAServerOptions
from opal_client.engine.runner import CedarRunner, OpaRunner, OpenFGARunner
from opal_client.limiter import StartupLoadLimiter
from opal_client.policy.api import init_policy_router
from opal_client.policy.updater import PolicyUpdater
Expand All @@ -47,6 +47,8 @@ def __init__(
policy_updater: PolicyUpdater = None,
inline_opa_enabled: bool = None,
inline_opa_options: OpaServerOptions = None,
inline_openfga_enabled: bool = None,
inline_openfga_options: OpenFGAServerOptions = None,
inline_cedar_enabled: bool = None,
inline_cedar_options: CedarServerOptions = None,
verifier: Optional[JWTVerifier] = None,
Expand All @@ -72,6 +74,11 @@ def __init__(
inline_opa_enabled: bool = (
inline_opa_enabled or opal_client_config.INLINE_OPA_ENABLED
)

inline_openfga_enabled: bool = (
inline_openfga_enabled or opal_client_config.INLINE_OPENFGA_ENABLED
)

inline_cedar_enabled: bool = (
inline_cedar_enabled or opal_client_config.INLINE_CEDAR_ENABLED
)
Expand Down Expand Up @@ -151,6 +158,7 @@ def __init__(
inline_cedar_enabled,
inline_opa_options,
inline_cedar_options,
inline_openfga_options,
)

custom_ssl_context = get_custom_ssl_context()
Expand Down Expand Up @@ -190,9 +198,11 @@ def _init_engine_runner(
self,
inline_opa_enabled: bool,
inline_cedar_enabled: bool,
inline_openfga_enabled: bool,
inline_opa_options: Optional[OpaServerOptions] = None,
inline_cedar_options: Optional[CedarServerOptions] = None,
) -> Union[OpaRunner, CedarRunner, Literal[False]]:
inline_openfga_options: Optional[OpenFGAServerOptions] = None,
) -> Union[OpaRunner, CedarRunner, OpenFGARunner, Literal[False]]:
if inline_opa_enabled and self.policy_store_type == PolicyStoreTypes.OPA:
inline_opa_options = (
inline_opa_options or opal_client_config.INLINE_OPA_CONFIG
Expand Down Expand Up @@ -230,6 +240,16 @@ def _init_engine_runner(
piped_logs_format=opal_client_config.INLINE_CEDAR_LOG_FORMAT,
)

elif inline_openfga_enabled and self.policy_store_type == PolicyStoreTypes.OPENFGA:
inline_openfga_options = (
inline_openfga_options or opal_client_config.INLINE_OPENFGA_CONFIG
)
return OpenFGARunner.setup_openfga_runner(
options=inline_openfga_options,
piped_logs_format=opal_client_config.INLINE_OPENFGA_LOG_FORMAT,
rehydration_callbacks=rehydration_callbacks,
)

return False

def _init_fast_api_app(self):
Expand Down
28 changes: 27 additions & 1 deletion packages/opal-client/opal_client/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from enum import Enum

from opal_client.engine.options import CedarServerOptions, OpaServerOptions
from opal_client.engine.options import CedarServerOptions, OpaServerOptions, OpenFGAServerOptions
from opal_client.policy.options import ConnRetryOptions
from opal_client.policy_store.schemas import PolicyStoreAuth, PolicyStoreTypes
from opal_common.confi import Confi, confi
Expand All @@ -24,9 +24,14 @@ class OpalClientConfig(Confi):
)
POLICY_STORE_URL = confi.str("POLICY_STORE_URL", "http://localhost:8181")

#openfga
OPENFGA_URL = confi.str("OPENFGA_URL", "http://localhost:8080")
OPENFGA_STORE_ID = confi.str("OPENFGA_STORE_ID", None, description="The OpenFGA store ID to use")

POLICY_STORE_AUTH_TYPE = confi.enum(
"POLICY_STORE_AUTH_TYPE", PolicyStoreAuth, PolicyStoreAuth.NONE
)

POLICY_STORE_AUTH_TOKEN = confi.str(
"POLICY_STORE_AUTH_TOKEN",
None,
Expand Down Expand Up @@ -165,6 +170,22 @@ def load_policy_store():
"INLINE_CEDAR_LOG_FORMAT", EngineLogFormat, EngineLogFormat.NONE
)


# OpenFGA runner configuration
INLINE_OPENFGA_ENABLED = confi.bool("INLINE_OPENFGA_ENABLED", True)

INLINE_OPENFGA_CONFIG = confi.model(
"INLINE_OPENFGA_CONFIG",
OpenFGAServerOptions,
{}, # defaults are being set according to OpenFGAServerOptions pydantic definitions (see class)
description="cli options used when running OpenFGA inline",
)

INLINE_OPENFGA_LOG_FORMAT: EngineLogFormat = confi.enum(
"INLINE_OPENFGA_LOG_FORMAT", EngineLogFormat, EngineLogFormat.NONE
)


# configuration for fastapi routes
ALLOWED_ORIGINS = ["*"]

Expand Down Expand Up @@ -334,6 +355,11 @@ def on_load(self):
opal_common_config.LOG_MODULE_EXCLUDE_LIST = (
opal_common_config.LOG_MODULE_EXCLUDE_LIST
)

# Set the appropriate URL based on the policy store type
if self.POLICY_STORE_TYPE == PolicyStoreTypes.OPENFGA:
self.POLICY_STORE_URL = self.OPENFGA_URL


if self.DATA_STORE_CONN_RETRY is not None:
# You should use `DATA_UPDATER_CONN_RETRY`, but that's for backwards compatibility
Expand Down
45 changes: 27 additions & 18 deletions packages/opal-client/opal_client/data/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
BasePolicyStoreClient,
JsonableValue,
)
from opal_client.policy_store.openfga_client import OpenFGAClient
from opal_client.policy_store.policy_store_client_factory import (
DEFAULT_POLICY_STORE_GETTER,
)
Expand Down Expand Up @@ -471,25 +472,33 @@ async def _store_fetched_update(self, update_item):
)

try:
if (
opal_client_config.SPLIT_ROOT_DATA
and policy_store_path in ("/", "")
and isinstance(policy_data, dict)
):
await self._set_split_policy_data(
store_transaction,
url=url,
save_method=entry.save_method,
data=policy_data,
)
if isinstance(self._policy_store, OpenFGAClient):
# OpenFGA expects a list of tuples
if isinstance(policy_data, list):
await store_transaction.set_policy_data(policy_data, path=policy_store_path)
else:
logger.warning(f"Skipping non-list data for OpenFGA: {policy_store_path}")
else:
await self._set_policy_data(
store_transaction,
url=url,
path=policy_store_path,
save_method=entry.save_method,
data=policy_data,
)

if (
opal_client_config.SPLIT_ROOT_DATA
and policy_store_path in ("/", "")
and isinstance(policy_data, dict)
):
await self._set_split_policy_data(
store_transaction,
url=url,
save_method=entry.save_method,
data=policy_data,
)
else:
await self._set_policy_data(
store_transaction,
url=url,
path=policy_store_path,
save_method=entry.save_method,
data=policy_data,
)
# No exception we we're able to save to the policy-store
report.saved = True
# save the report for the entry
Expand Down
78 changes: 78 additions & 0 deletions packages/opal-client/opal_client/engine/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,84 @@ def get_opa_startup_files(self) -> str:
return " ".join(files)


class OpenFGAServerOptions(BaseModel):
"""Options to configure the OpenFGA server (apply when choosing to run OpenFGA inline)."""

addr: str = Field(
":8080",
description="listening address of the OpenFGA server (e.g., [ip]:<port> for TCP)",
)
authentication: AuthenticationScheme = Field(
AuthenticationScheme.off,
description="OpenFGA authentication scheme (default off)",
)
authentication_token: Optional[str] = Field(
None,
description="If authentication is 'token', this specifies the token to use.",
)
store_id: Optional[str] = Field(
None,
description="The OpenFGA store ID to use.",
)

class Config:
use_enum_values = True
allow_population_by_field_name = True

@classmethod
def alias_generator(cls, string: str) -> str:
return "--{}".format(string.replace("_", "-"))

@validator("authentication")
def validate_authentication(cls, v: AuthenticationScheme):
if v not in [AuthenticationScheme.off, AuthenticationScheme.token]:
raise ValueError("Invalid AuthenticationScheme for OpenFGA.")
return v

@validator("authentication_token")
def validate_authentication_token(cls, v: Optional[str], values: dict[str, Any]):
if values["authentication"] == AuthenticationScheme.token and v is None:
raise ValueError(
"A token must be specified for AuthenticationScheme.token."
)
return v

def get_cmdline(self) -> str:
result = [
"openfga-server", # Assuming there's an openfga-server command
]
if (
self.authentication == AuthenticationScheme.token
and self.authentication_token is not None
):
result += [
"--token",
self.authentication_token,
]
addr = self.addr.split(":", 1)
port = None
if len(addr) == 1:
listen_address = addr[0]
elif len(addr) == 2:
listen_address, port = addr
if len(listen_address) == 0:
listen_address = "0.0.0.0"
result += [
"--addr",
listen_address,
]
if port is not None:
result += [
"--port",
port,
]
if self.store_id is not None:
result += [
"--store-id",
self.store_id,
]
return " ".join(result)

class CedarServerOptions(BaseModel):
"""Options to configure the Cedar agent (apply when choosing to run Cedar
inline)."""
Expand Down
50 changes: 49 additions & 1 deletion packages/opal-client/opal_client/engine/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import psutil
from opal_client.config import EngineLogFormat
from opal_client.engine.logger import log_engine_output_opa, log_engine_output_simple
from opal_client.engine.options import CedarServerOptions, OpaServerOptions
from opal_client.engine.options import CedarServerOptions, OpaServerOptions, OpenFGAServerOptions
from opal_client.logger import logger
from tenacity import retry, wait_random_exponential

Expand Down Expand Up @@ -281,6 +281,54 @@ def setup_opa_runner(
return opa_runner


class OpenFGARunner(PolicyEngineRunner):
def __init__(
self,
options: Optional[OpenFGAServerOptions] = None,
piped_logs_format: EngineLogFormat = EngineLogFormat.NONE,
):
super().__init__(piped_logs_format)
self._options = options or OpenFGAServerOptions()

async def handle_log_line(self, line: bytes) -> bool:
await log_engine_output_simple(line)
return False

@property
def command(self) -> str:
return self._options.get_cmdline()

@staticmethod
def setup_openfga_runner(
options: Optional[OpenFGAServerOptions] = None,
piped_logs_format: EngineLogFormat = EngineLogFormat.NONE,
initial_start_callbacks: Optional[List[AsyncCallback]] = None,
rehydration_callbacks: Optional[List[AsyncCallback]] = None,
):
"""Factory for OpenFGARunner, accept optional callbacks to run in certain
lifecycle events.

Initial Start Callbacks:
The first time we start the engine, we might want to do certain actions (like launch tasks)
that are dependent on the policy store being up (such as PolicyUpdater, DataUpdater).

Rehydration Callbacks:
when the engine restarts, its cache is clean and it does not have the state necessary
to handle authorization queries. therefore it is necessary that we rehydrate the
cache with fresh state fetched from the server.
"""
openfga_runner = OpenFGARunner(options=options, piped_logs_format=piped_logs_format)

if initial_start_callbacks:
openfga_runner.register_process_initial_start_callbacks(
initial_start_callbacks
)

if rehydration_callbacks:
openfga_runner.register_process_restart_callbacks(rehydration_callbacks)

return openfga_runner

class CedarRunner(PolicyEngineRunner):
def __init__(
self,
Expand Down
18 changes: 16 additions & 2 deletions packages/opal-client/opal_client/policy/updater.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
from typing import List, Optional

import json
import pydantic
from fastapi_websocket_pubsub import PubSubClient
from fastapi_websocket_rpc.rpc_channel import RpcChannel
Expand All @@ -15,6 +15,7 @@
from opal_client.policy_store.policy_store_client_factory import (
DEFAULT_POLICY_STORE_GETTER,
)
from opal_client.policy_store.openfga_client import OpenFGAClient
from opal_common.async_utils import TakeANumberQueue, TasksPool
from opal_common.config import opal_common_config
from opal_common.schemas.data import DataUpdateReport
Expand Down Expand Up @@ -333,7 +334,20 @@ async def update_policy(
error=bundle_error,
)
if bundle:
await store_transaction.set_policies(bundle)

if isinstance(self._policy_store, OpenFGAClient):
for policy_module in bundle.policy_modules:
if policy_module.path.endswith('.json'):
try:
policy_data = json.loads(policy_module.rego)
await store_transaction.set_policy(policy_module.path, json.dumps(policy_data))
except json.JSONDecodeError:
logger.error(f"Invalid JSON in OpenFGA policy file: {policy_module.path}")
else:
logger.warning(f"Skipping non-JSON file for OpenFGA: {policy_module.path}")
else:
await store_transaction.set_policies(bundle)

# if we got here, we did not throw during the transaction
if self._should_send_reports:
# spin off reporting (no need to wait on it)
Expand Down
Loading
Loading