From 2008c54c265f6f5aa317e9a891a4384b089be496 Mon Sep 17 00:00:00 2001 From: daveads Date: Mon, 30 Sep 2024 02:50:12 +0100 Subject: [PATCH 1/9] Draft --- packages/opal-client/opal_client/policy_store/openfga_client.py | 1 + 1 file changed, 1 insertion(+) create mode 100644 packages/opal-client/opal_client/policy_store/openfga_client.py diff --git a/packages/opal-client/opal_client/policy_store/openfga_client.py b/packages/opal-client/opal_client/policy_store/openfga_client.py new file mode 100644 index 00000000..f0671e8b --- /dev/null +++ b/packages/opal-client/opal_client/policy_store/openfga_client.py @@ -0,0 +1 @@ +## Draft for openfga-policy-implementation \ No newline at end of file From 81f9fa1a3e9dd87b7b24a7b75f039f70d3af0f9a Mon Sep 17 00:00:00 2001 From: daveads Date: Fri, 4 Oct 2024 07:20:27 +0100 Subject: [PATCH 2/9] test --- .../policy_store/openfga_client.py | 232 +++++++++++++++++- 1 file changed, 231 insertions(+), 1 deletion(-) diff --git a/packages/opal-client/opal_client/policy_store/openfga_client.py b/packages/opal-client/opal_client/policy_store/openfga_client.py index f0671e8b..22b285b7 100644 --- a/packages/opal-client/opal_client/policy_store/openfga_client.py +++ b/packages/opal-client/opal_client/policy_store/openfga_client.py @@ -1 +1,231 @@ -## Draft for openfga-policy-implementation \ No newline at end of file +import asyncio +import json +from typing import Dict, List, Optional, Union +from urllib.parse import quote_plus + +import aiohttp +from aiofiles.threadpool.text import AsyncTextIOWrapper +from fastapi import status +from opal_client.config import opal_client_config +from opal_client.logger import logger +from opal_client.policy_store.base_policy_store_client import ( + BasePolicyStoreClient, + JsonableValue, +) +from opal_client.policy_store.schemas import PolicyStoreAuth +from opal_common.schemas.policy import PolicyBundle +from opal_common.schemas.store import StoreTransaction, TransactionType +from tenacity import retry, stop_after_attempt, wait_exponential +from openfga_sdk import OpenFgaClient as SDKOpenFgaClient, ClientConfiguration +from openfga_sdk.client.models import ClientWriteRequest, ClientTuple, WriteAuthorizationModelRequest, ClientReadRequest, ClientCheckRequest + +RETRY_CONFIG = { + "stop": stop_after_attempt(5), + "wait": wait_exponential(multiplier=1, min=4, max=10), +} + +class OpenFGAClient(BasePolicyStoreClient): + def __init__( + self, + openfga_server_url=None, + openfga_auth_token: Optional[str] = None, + auth_type: PolicyStoreAuth = PolicyStoreAuth.NONE, + store_id: Optional[str] = None, + ): + base_url = openfga_server_url or opal_client_config.POLICY_STORE_URL + self._openfga_url = base_url + self._store_id = store_id + self._policy_version: Optional[str] = None + self._lock = asyncio.Lock() + self._token = openfga_auth_token + self._auth_type: PolicyStoreAuth = auth_type + + self._had_successful_data_transaction = False + self._had_successful_policy_transaction = False + self._most_recent_data_transaction: Optional[StoreTransaction] = None + self._most_recent_policy_transaction: Optional[StoreTransaction] = None + + configuration = ClientConfiguration( + api_url=self._openfga_url, + store_id=self._store_id, + ) + if self._auth_type == PolicyStoreAuth.TOKEN: + configuration.credentials = self._token + + self._fga_client = SDKOpenFgaClient(configuration) + + async def _get_auth_headers(self) -> Dict[str, str]: + headers: Dict[str, str] = {} + if self._auth_type == PolicyStoreAuth.TOKEN and self._token is not None: + headers["Authorization"] = f"Bearer {self._token}" + return headers + + @retry(**RETRY_CONFIG) + async def set_policy( + self, + policy_id: str, + policy_code: str, + transaction_id: Optional[str] = None, + ): + try: + model = json.loads(policy_code) + response = await self._fga_client.write_authorization_model( + WriteAuthorizationModelRequest(**model) + ) + self._policy_version = response.authorization_model_id + logger.info(f"Successfully set policy with ID: {self._policy_version}") + return response + except json.JSONDecodeError: + logger.error(f"Invalid policy code: {policy_code}") + raise + except Exception as e: + logger.error(f"Error setting policy: {str(e)}") + raise + + @retry(**RETRY_CONFIG) + async def get_policy(self, policy_id: str) -> Optional[str]: + try: + response = await self._fga_client.read_authorization_model(policy_id) + return json.dumps(response.authorization_model.dict()) + except Exception as e: + logger.error(f"Error getting policy: {str(e)}") + return None + + @retry(**RETRY_CONFIG) + async def get_policies(self) -> Optional[Dict[str, str]]: + try: + response = await self._fga_client.read_authorization_models() + return { + model.id: json.dumps(model.dict()) + for model in response.authorization_models + } + except Exception as e: + logger.error(f"Error getting policies: {str(e)}") + return None + + async def delete_policy(self, policy_id: str, transaction_id: Optional[str] = None): + logger.warning("Deleting policies is not supported in OpenFGA") + + @retry(**RETRY_CONFIG) + async def set_policy_data( + self, + policy_data: JsonableValue, + path: str = "", + transaction_id: Optional[str] = None, + ): + try: + tuples = [ + ClientTuple(user=t["user"], relation=t["relation"], object=t["object"]) + for t in policy_data + ] + body = ClientWriteRequest(writes=tuples) + await self._fga_client.write(body) + logger.info(f"Successfully set policy data for path: {path}") + return None + except Exception as e: + logger.error(f"Error setting policy data: {str(e)}") + raise + + @retry(**RETRY_CONFIG) + async def delete_policy_data( + self, path: str = "", transaction_id: Optional[str] = None + ): + try: + body = ClientWriteRequest(deletes=[ClientTuple(object=path)]) + await self._fga_client.write(body) + logger.info(f"Successfully deleted policy data for path: {path}") + return None + except Exception as e: + logger.error(f"Error deleting policy data: {str(e)}") + raise + + @retry(**RETRY_CONFIG) + async def get_data(self, path: str) -> Dict: + try: + response = await self._fga_client.read(ClientReadRequest(object=path)) + return {"tuples": [tuple.dict() for tuple in response.tuples]} + except Exception as e: + logger.error(f"Error getting data: {str(e)}") + return {} + + async def patch_policy_data( + self, + policy_data: List[Dict], + path: str = "", + transaction_id: Optional[str] = None, + ): + # OpenFGA doesn't have a direct patch operation, so we'll implement it as a write + try: + tuples = [ + ClientTuple(user=t["user"], relation=t["relation"], object=t["object"]) + for t in policy_data + ] + body = ClientWriteRequest(writes=tuples) + await self._fga_client.write(body) + logger.info(f"Successfully patched policy data for path: {path}") + return None + except Exception as e: + logger.error(f"Error patching policy data: {str(e)}") + raise + + @retry(**RETRY_CONFIG) + async def check_permission(self, user: str, relation: str, object: str) -> bool: + try: + response = await self._fga_client.check( + ClientCheckRequest(user=user, relation=relation, object=object) + ) + return response.allowed + except Exception as e: + logger.error(f"Error checking permission: {str(e)}") + return False + + async def log_transaction(self, transaction: StoreTransaction): + if transaction.transaction_type == TransactionType.policy: + self._most_recent_policy_transaction = transaction + if transaction.success: + self._had_successful_policy_transaction = True + elif transaction.transaction_type == TransactionType.data: + self._most_recent_data_transaction = transaction + if transaction.success: + self._had_successful_data_transaction = True + + async def is_ready(self) -> bool: + return ( + self._had_successful_policy_transaction + and self._had_successful_data_transaction + ) + + async def is_healthy(self) -> bool: + return ( + self._most_recent_policy_transaction is not None + and self._most_recent_policy_transaction.success + ) and ( + self._most_recent_data_transaction is not None + and self._most_recent_data_transaction.success + ) + + async def full_export(self, writer: AsyncTextIOWrapper) -> None: + policies = await self.get_policies() + data = await self.get_data("") + await writer.write( + json.dumps({"policies": policies, "data": data}, default=str) + ) + + async def full_import(self, reader: AsyncTextIOWrapper) -> None: + import_data = json.loads(await reader.read()) + + for policy_id, policy_code in import_data["policies"].items(): + await self.set_policy(policy_id, policy_code) + + await self.set_policy_data(import_data["data"]) + + async def get_policy_version(self) -> Optional[str]: + return self._policy_version + + async def set_policies( + self, bundle: PolicyBundle, transaction_id: Optional[str] = None + ): + for policy in bundle.policy_modules: + await self.set_policy(policy.path, policy.rego) + + self._policy_version = bundle.hash \ No newline at end of file From 59b2f89d6a19b6955f5f0150d0025531e8bc3056 Mon Sep 17 00:00:00 2001 From: daveads Date: Sun, 20 Oct 2024 11:09:00 +0100 Subject: [PATCH 3/9] init test** --- packages/opal-client/opal_client/client.py | 24 +++++- packages/opal-client/opal_client/config.py | 26 ++++++- .../opal-client/opal_client/engine/options.py | 78 +++++++++++++++++++ .../opal-client/opal_client/engine/runner.py | 50 +++++++++++- 4 files changed, 173 insertions(+), 5 deletions(-) diff --git a/packages/opal-client/opal_client/client.py b/packages/opal-client/opal_client/client.py index be8e5ca4..b52e05cc 100644 --- a/packages/opal-client/opal_client/client.py +++ b/packages/opal-client/opal_client/client.py @@ -19,8 +19,8 @@ from opal_client.data.api import init_data_router from opal_client.data.fetcher import DataFetcher from opal_client.data.updater import DataUpdater -from opal_client.engine.options import CedarServerOptions, OpaServerOptions -from opal_client.engine.runner import CedarRunner, OpaRunner +from opal_client.engine.options import CedarServerOptions, OpaServerOptions, OpenFGAServerOptions +from opal_client.engine.runner import CedarRunner, OpaRunner, OpenFGARunner from opal_client.limiter import StartupLoadLimiter from opal_client.policy.api import init_policy_router from opal_client.policy.updater import PolicyUpdater @@ -47,6 +47,8 @@ def __init__( policy_updater: PolicyUpdater = None, inline_opa_enabled: bool = None, inline_opa_options: OpaServerOptions = None, + inline_openfga_enabled: bool = None, + inline_openfga_options: OpenFGAServerOptions = None, inline_cedar_enabled: bool = None, inline_cedar_options: CedarServerOptions = None, verifier: Optional[JWTVerifier] = None, @@ -72,6 +74,11 @@ def __init__( inline_opa_enabled: bool = ( inline_opa_enabled or opal_client_config.INLINE_OPA_ENABLED ) + + inline_openfga_enabled: bool = ( + inline_openfga_enabled or opal_client_config.INLINE_OPENFGA_ENABLED + ) + inline_cedar_enabled: bool = ( inline_cedar_enabled or opal_client_config.INLINE_CEDAR_ENABLED ) @@ -192,7 +199,8 @@ def _init_engine_runner( inline_cedar_enabled: bool, inline_opa_options: Optional[OpaServerOptions] = None, inline_cedar_options: Optional[CedarServerOptions] = None, - ) -> Union[OpaRunner, CedarRunner, Literal[False]]: + inline_openfga_options: Optional[OpenFGAServerOptions] = None, + ) -> Union[OpaRunner, CedarRunner, OpenFGARunner, Literal[False]]: if inline_opa_enabled and self.policy_store_type == PolicyStoreTypes.OPA: inline_opa_options = ( inline_opa_options or opal_client_config.INLINE_OPA_CONFIG @@ -230,6 +238,16 @@ def _init_engine_runner( piped_logs_format=opal_client_config.INLINE_CEDAR_LOG_FORMAT, ) + elif inline_openfga_enabled and self.policy_store_type == PolicyStoreTypes.OPENFGA: + inline_openfga_options = ( + inline_openfga_options or opal_client_config.INLINE_OPENFGA_CONFIG + ) + return OpenFGARunner.setup_openfga_runner( + options=inline_openfga_options, + piped_logs_format=opal_client_config.INLINE_OPENFGA_LOG_FORMAT, + rehydration_callbacks=rehydration_callbacks, + ) + return False def _init_fast_api_app(self): diff --git a/packages/opal-client/opal_client/config.py b/packages/opal-client/opal_client/config.py index 58d7ae2c..9373fd12 100644 --- a/packages/opal-client/opal_client/config.py +++ b/packages/opal-client/opal_client/config.py @@ -1,6 +1,6 @@ from enum import Enum -from opal_client.engine.options import CedarServerOptions, OpaServerOptions +from opal_client.engine.options import CedarServerOptions, OpaServerOptions, OpenFGAServerOptions from opal_client.policy.options import ConnRetryOptions from opal_client.policy_store.schemas import PolicyStoreAuth, PolicyStoreTypes from opal_common.confi import Confi, confi @@ -24,6 +24,9 @@ class OpalClientConfig(Confi): ) POLICY_STORE_URL = confi.str("POLICY_STORE_URL", "http://localhost:8181") + #openfga + OPENFGA_URL = confi.str("OPENFGA_URL", "http://localhost:8080") + POLICY_STORE_AUTH_TYPE = confi.enum( "POLICY_STORE_AUTH_TYPE", PolicyStoreAuth, PolicyStoreAuth.NONE ) @@ -165,6 +168,22 @@ def load_policy_store(): "INLINE_CEDAR_LOG_FORMAT", EngineLogFormat, EngineLogFormat.NONE ) + + # OpenFGA runner configuration + INLINE_OPENFGA_ENABLED = confi.bool("INLINE_OPENFGA_ENABLED", True) + + INLINE_OPENFGA_CONFIG = confi.model( + "INLINE_OPENFGA_CONFIG", + OpenFGAServerOptions, + {}, # defaults are being set according to OpenFGAServerOptions pydantic definitions (see class) + description="cli options used when running OpenFGA inline", + ) + + INLINE_OPENFGA_LOG_FORMAT: EngineLogFormat = confi.enum( + "INLINE_OPENFGA_LOG_FORMAT", EngineLogFormat, EngineLogFormat.NONE + ) + + # configuration for fastapi routes ALLOWED_ORIGINS = ["*"] @@ -334,6 +353,11 @@ def on_load(self): opal_common_config.LOG_MODULE_EXCLUDE_LIST = ( opal_common_config.LOG_MODULE_EXCLUDE_LIST ) + + # Set the appropriate URL based on the policy store type + if self.POLICY_STORE_TYPE == PolicyStoreTypes.OPENFGA: + self.POLICY_STORE_URL = self.OPENFGA_URL + if self.DATA_STORE_CONN_RETRY is not None: # You should use `DATA_UPDATER_CONN_RETRY`, but that's for backwards compatibility diff --git a/packages/opal-client/opal_client/engine/options.py b/packages/opal-client/opal_client/engine/options.py index 370424e6..68cf431a 100644 --- a/packages/opal-client/opal_client/engine/options.py +++ b/packages/opal-client/opal_client/engine/options.py @@ -83,6 +83,84 @@ def get_opa_startup_files(self) -> str: return " ".join(files) +class OpenFGAServerOptions(BaseModel): + """Options to configure the OpenFGA server (apply when choosing to run OpenFGA inline).""" + + addr: str = Field( + ":8080", + description="listening address of the OpenFGA server (e.g., [ip]: for TCP)", + ) + authentication: AuthenticationScheme = Field( + AuthenticationScheme.off, + description="OpenFGA authentication scheme (default off)", + ) + authentication_token: Optional[str] = Field( + None, + description="If authentication is 'token', this specifies the token to use.", + ) + store_id: Optional[str] = Field( + None, + description="The OpenFGA store ID to use.", + ) + + class Config: + use_enum_values = True + allow_population_by_field_name = True + + @classmethod + def alias_generator(cls, string: str) -> str: + return "--{}".format(string.replace("_", "-")) + + @validator("authentication") + def validate_authentication(cls, v: AuthenticationScheme): + if v not in [AuthenticationScheme.off, AuthenticationScheme.token]: + raise ValueError("Invalid AuthenticationScheme for OpenFGA.") + return v + + @validator("authentication_token") + def validate_authentication_token(cls, v: Optional[str], values: dict[str, Any]): + if values["authentication"] == AuthenticationScheme.token and v is None: + raise ValueError( + "A token must be specified for AuthenticationScheme.token." + ) + return v + + def get_cmdline(self) -> str: + result = [ + "openfga-server", # Assuming there's an openfga-server command + ] + if ( + self.authentication == AuthenticationScheme.token + and self.authentication_token is not None + ): + result += [ + "--token", + self.authentication_token, + ] + addr = self.addr.split(":", 1) + port = None + if len(addr) == 1: + listen_address = addr[0] + elif len(addr) == 2: + listen_address, port = addr + if len(listen_address) == 0: + listen_address = "0.0.0.0" + result += [ + "--addr", + listen_address, + ] + if port is not None: + result += [ + "--port", + port, + ] + if self.store_id is not None: + result += [ + "--store-id", + self.store_id, + ] + return " ".join(result) + class CedarServerOptions(BaseModel): """Options to configure the Cedar agent (apply when choosing to run Cedar inline).""" diff --git a/packages/opal-client/opal_client/engine/runner.py b/packages/opal-client/opal_client/engine/runner.py index 9cca62c2..4d88136a 100644 --- a/packages/opal-client/opal_client/engine/runner.py +++ b/packages/opal-client/opal_client/engine/runner.py @@ -7,7 +7,7 @@ import psutil from opal_client.config import EngineLogFormat from opal_client.engine.logger import log_engine_output_opa, log_engine_output_simple -from opal_client.engine.options import CedarServerOptions, OpaServerOptions +from opal_client.engine.options import CedarServerOptions, OpaServerOptions, OpenFGAServerOptions from opal_client.logger import logger from tenacity import retry, wait_random_exponential @@ -281,6 +281,54 @@ def setup_opa_runner( return opa_runner +class OpenFGARunner(PolicyEngineRunner): + def __init__( + self, + options: Optional[OpenFGAServerOptions] = None, + piped_logs_format: EngineLogFormat = EngineLogFormat.NONE, + ): + super().__init__(piped_logs_format) + self._options = options or OpenFGAServerOptions() + + async def handle_log_line(self, line: bytes) -> bool: + await log_engine_output_simple(line) + return False + + @property + def command(self) -> str: + return self._options.get_cmdline() + + @staticmethod + def setup_openfga_runner( + options: Optional[OpenFGAServerOptions] = None, + piped_logs_format: EngineLogFormat = EngineLogFormat.NONE, + initial_start_callbacks: Optional[List[AsyncCallback]] = None, + rehydration_callbacks: Optional[List[AsyncCallback]] = None, + ): + """Factory for OpenFGARunner, accept optional callbacks to run in certain + lifecycle events. + + Initial Start Callbacks: + The first time we start the engine, we might want to do certain actions (like launch tasks) + that are dependent on the policy store being up (such as PolicyUpdater, DataUpdater). + + Rehydration Callbacks: + when the engine restarts, its cache is clean and it does not have the state necessary + to handle authorization queries. therefore it is necessary that we rehydrate the + cache with fresh state fetched from the server. + """ + openfga_runner = OpenFGARunner(options=options, piped_logs_format=piped_logs_format) + + if initial_start_callbacks: + openfga_runner.register_process_initial_start_callbacks( + initial_start_callbacks + ) + + if rehydration_callbacks: + openfga_runner.register_process_restart_callbacks(rehydration_callbacks) + + return openfga_runner + class CedarRunner(PolicyEngineRunner): def __init__( self, From b84bd889dee5eb5ca964f34cee2d7a5fca3ea05d Mon Sep 17 00:00:00 2001 From: daveads Date: Sun, 20 Oct 2024 11:10:01 +0100 Subject: [PATCH 4/9] inital test --- .../policy_store/policy_store_client_factory.py | 12 ++++++++++++ .../opal_client/policy_store/schemas.py | 14 ++++---------- packages/opal-client/requires.txt | 1 + 3 files changed, 17 insertions(+), 10 deletions(-) diff --git a/packages/opal-client/opal_client/policy_store/policy_store_client_factory.py b/packages/opal-client/opal_client/policy_store/policy_store_client_factory.py index 848925af..fbf32f50 100644 --- a/packages/opal-client/opal_client/policy_store/policy_store_client_factory.py +++ b/packages/opal-client/opal_client/policy_store/policy_store_client_factory.py @@ -152,6 +152,18 @@ def create( cedar_auth_token=store_token, auth_type=auth_type, ) + + #Openfga + elif PolicyStoreTypes.OPENFGA == store_type: + from opal_client.policy_store.openfga_client import OpenFGAClient + + res = OpenFGAClient( + openfga_server_url=url, + openfga_auth_token=store_token, + auth_type=auth_type, + store_id=opal_client_config.OPENFGA_STORE_ID, + ) + # MOCK elif PolicyStoreTypes.MOCK == store_type: from opal_client.policy_store.mock_policy_store_client import ( diff --git a/packages/opal-client/opal_client/policy_store/schemas.py b/packages/opal-client/opal_client/policy_store/schemas.py index f2a0514a..29859b64 100644 --- a/packages/opal-client/opal_client/policy_store/schemas.py +++ b/packages/opal-client/opal_client/policy_store/schemas.py @@ -1,30 +1,26 @@ from enum import Enum from typing import Optional - from pydantic import BaseModel, Field, validator - class PolicyStoreTypes(Enum): OPA = "OPA" CEDAR = "CEDAR" + OPENFGA = "OPENFGA" MOCK = "MOCK" - class PolicyStoreAuth(Enum): NONE = "none" TOKEN = "token" OAUTH = "oauth" TLS = "tls" - class PolicyStoreDetails(BaseModel): """ - represents a policy store endpoint - contains the policy store's: + Represents a policy store endpoint - contains the policy store's: - location (url) - type - credentials """ - type: PolicyStoreTypes = Field( PolicyStoreTypes.OPA, description="the type of policy store, currently only OPA is officially supported", @@ -32,17 +28,15 @@ class PolicyStoreDetails(BaseModel): url: str = Field( ..., description="the url that OPA can be found in. if localhost is the host - " - "it means OPA is on the same hostname as OPAL client.", + "it means OPA is on the same hostname as OPAL client.", ) token: Optional[str] = Field( None, description="optional access token required by the policy store" ) - auth_type: PolicyStoreAuth = Field( PolicyStoreAuth.NONE, description="the type of authentication is supported for the policy store.", ) - oauth_client_id: Optional[str] = Field( None, description="optional OAuth client id required by the policy store" ) @@ -63,4 +57,4 @@ def force_enum(cls, v): class Config: use_enum_values = True - allow_population_by_field_name = True + allow_population_by_field_name = True \ No newline at end of file diff --git a/packages/opal-client/requires.txt b/packages/opal-client/requires.txt index 0fb2499e..4c32a8ec 100644 --- a/packages/opal-client/requires.txt +++ b/packages/opal-client/requires.txt @@ -4,3 +4,4 @@ psutil>=5.9.1,<6 tenacity>=8.0.1,<9 dpath>=2.1.5,<3 jsonpatch>=1.33,<2 +openfga-sdk==0.7.2 From 077884193e334c4bd340005a9ef8bec12c9adf77 Mon Sep 17 00:00:00 2001 From: daveads Date: Mon, 21 Oct 2024 07:28:55 +0100 Subject: [PATCH 5/9] configs --- packages/opal-client/opal_client/client.py | 2 ++ packages/opal-client/opal_client/config.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/packages/opal-client/opal_client/client.py b/packages/opal-client/opal_client/client.py index b52e05cc..0090553d 100644 --- a/packages/opal-client/opal_client/client.py +++ b/packages/opal-client/opal_client/client.py @@ -158,6 +158,7 @@ def __init__( inline_cedar_enabled, inline_opa_options, inline_cedar_options, + inline_openfga_options, ) custom_ssl_context = get_custom_ssl_context() @@ -197,6 +198,7 @@ def _init_engine_runner( self, inline_opa_enabled: bool, inline_cedar_enabled: bool, + inline_openfga_enabled: bool, inline_opa_options: Optional[OpaServerOptions] = None, inline_cedar_options: Optional[CedarServerOptions] = None, inline_openfga_options: Optional[OpenFGAServerOptions] = None, diff --git a/packages/opal-client/opal_client/config.py b/packages/opal-client/opal_client/config.py index 9373fd12..9660ba9d 100644 --- a/packages/opal-client/opal_client/config.py +++ b/packages/opal-client/opal_client/config.py @@ -26,10 +26,12 @@ class OpalClientConfig(Confi): #openfga OPENFGA_URL = confi.str("OPENFGA_URL", "http://localhost:8080") + OPENFGA_STORE_ID = confi.str("OPENFGA_STORE_ID", None, description="The OpenFGA store ID to use") POLICY_STORE_AUTH_TYPE = confi.enum( "POLICY_STORE_AUTH_TYPE", PolicyStoreAuth, PolicyStoreAuth.NONE ) + POLICY_STORE_AUTH_TOKEN = confi.str( "POLICY_STORE_AUTH_TOKEN", None, From 2c30962cc1e248af74011ee36baee84bacd65db0 Mon Sep 17 00:00:00 2001 From: daveads Date: Mon, 21 Oct 2024 15:20:50 +0100 Subject: [PATCH 6/9] fix... --- .../policy_store/openfga_client.py | 83 ++++++++++--------- 1 file changed, 44 insertions(+), 39 deletions(-) diff --git a/packages/opal-client/opal_client/policy_store/openfga_client.py b/packages/opal-client/opal_client/policy_store/openfga_client.py index 22b285b7..2a52bd2f 100644 --- a/packages/opal-client/opal_client/policy_store/openfga_client.py +++ b/packages/opal-client/opal_client/policy_store/openfga_client.py @@ -17,7 +17,9 @@ from opal_common.schemas.store import StoreTransaction, TransactionType from tenacity import retry, stop_after_attempt, wait_exponential from openfga_sdk import OpenFgaClient as SDKOpenFgaClient, ClientConfiguration -from openfga_sdk.client.models import ClientWriteRequest, ClientTuple, WriteAuthorizationModelRequest, ClientReadRequest, ClientCheckRequest +from openfga_sdk.client.models import ClientWriteRequest, ClientTuple, ClientCheckRequest, ClientListObjectsRequest +from openfga_sdk.models import WriteAuthorizationModelRequest, TupleKey, ReadRequest + RETRY_CONFIG = { "stop": stop_after_attempt(5), @@ -34,7 +36,7 @@ def __init__( ): base_url = openfga_server_url or opal_client_config.POLICY_STORE_URL self._openfga_url = base_url - self._store_id = store_id + self._store_id = store_id or opal_client_config.OPENFGA_STORE_ID self._policy_version: Optional[str] = None self._lock = asyncio.Lock() self._token = openfga_auth_token @@ -67,21 +69,55 @@ async def set_policy( policy_code: str, transaction_id: Optional[str] = None, ): + logger.debug(f"Attempting to set policy with ID {policy_id}: {policy_code}") try: - model = json.loads(policy_code) + policy = json.loads(policy_code) response = await self._fga_client.write_authorization_model( - WriteAuthorizationModelRequest(**model) + WriteAuthorizationModelRequest(**policy) ) self._policy_version = response.authorization_model_id logger.info(f"Successfully set policy with ID: {self._policy_version}") return response except json.JSONDecodeError: - logger.error(f"Invalid policy code: {policy_code}") + logger.error(f"Invalid policy code (not valid JSON): {policy_code}") raise except Exception as e: logger.error(f"Error setting policy: {str(e)}") raise + @retry(**RETRY_CONFIG) + async def set_policy_data( + self, + policy_data: JsonableValue, + path: str = "", + transaction_id: Optional[str] = None, + ): + logger.debug(f"Attempting to set policy data: {json.dumps(policy_data, indent=2)}") + try: + if not policy_data: + logger.warning("Received empty policy data. Skipping operation.") + return None + + tuples = [ + ClientTuple(user=t["user"], relation=t["relation"], object=t["object"]) + for t in policy_data + if all(k in t for k in ("user", "relation", "object")) + ] + logger.debug(f"Created tuples: {tuples}") + + if not tuples: + logger.warning("No valid tuples found in policy data. Skipping operation.") + return None + + body = ClientWriteRequest(writes=tuples) + logger.debug(f"Sending write request to OpenFGA: {body}") + await self._fga_client.write(body) + logger.info(f"Successfully set policy data for path: {path}") + return None + except Exception as e: + logger.error(f"Error setting policy data: {str(e)}") + raise + @retry(**RETRY_CONFIG) async def get_policy(self, policy_id: str) -> Optional[str]: try: @@ -106,26 +142,6 @@ async def get_policies(self) -> Optional[Dict[str, str]]: async def delete_policy(self, policy_id: str, transaction_id: Optional[str] = None): logger.warning("Deleting policies is not supported in OpenFGA") - @retry(**RETRY_CONFIG) - async def set_policy_data( - self, - policy_data: JsonableValue, - path: str = "", - transaction_id: Optional[str] = None, - ): - try: - tuples = [ - ClientTuple(user=t["user"], relation=t["relation"], object=t["object"]) - for t in policy_data - ] - body = ClientWriteRequest(writes=tuples) - await self._fga_client.write(body) - logger.info(f"Successfully set policy data for path: {path}") - return None - except Exception as e: - logger.error(f"Error setting policy data: {str(e)}") - raise - @retry(**RETRY_CONFIG) async def delete_policy_data( self, path: str = "", transaction_id: Optional[str] = None @@ -142,7 +158,7 @@ async def delete_policy_data( @retry(**RETRY_CONFIG) async def get_data(self, path: str) -> Dict: try: - response = await self._fga_client.read(ClientReadRequest(object=path)) + response = await self._fga_client.read(ReadRequest(object=path)) return {"tuples": [tuple.dict() for tuple in response.tuples]} except Exception as e: logger.error(f"Error getting data: {str(e)}") @@ -155,18 +171,7 @@ async def patch_policy_data( transaction_id: Optional[str] = None, ): # OpenFGA doesn't have a direct patch operation, so we'll implement it as a write - try: - tuples = [ - ClientTuple(user=t["user"], relation=t["relation"], object=t["object"]) - for t in policy_data - ] - body = ClientWriteRequest(writes=tuples) - await self._fga_client.write(body) - logger.info(f"Successfully patched policy data for path: {path}") - return None - except Exception as e: - logger.error(f"Error patching policy data: {str(e)}") - raise + return await self.set_policy_data(policy_data, path, transaction_id) @retry(**RETRY_CONFIG) async def check_permission(self, user: str, relation: str, object: str) -> bool: @@ -226,6 +231,6 @@ async def set_policies( self, bundle: PolicyBundle, transaction_id: Optional[str] = None ): for policy in bundle.policy_modules: - await self.set_policy(policy.path, policy.rego) + await self.set_policy(policy.path, json.dumps(policy.rego)) self._policy_version = bundle.hash \ No newline at end of file From 646a244a1aeadb4d6222f2d12af3066668ddbf17 Mon Sep 17 00:00:00 2001 From: daveads Date: Mon, 21 Oct 2024 16:48:40 +0100 Subject: [PATCH 7/9] packages/opal-common/opal_common/engine/paths.py --- packages/opal-common/opal_common/engine/paths.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/packages/opal-common/opal_common/engine/paths.py b/packages/opal-common/opal_common/engine/paths.py index ce31c8ae..e13a389e 100644 --- a/packages/opal-common/opal_common/engine/paths.py +++ b/packages/opal-common/opal_common/engine/paths.py @@ -1,8 +1,8 @@ from pathlib import Path -from opal_common.config import opal_common_config +from opal_common.config import opal_common_config def is_data_module(path: Path) -> bool: """Only json files named `data.json` can be included in official OPA bundles as static data files. @@ -13,8 +13,7 @@ def is_data_module(path: Path) -> bool: def is_policy_module(path: Path) -> bool: - """Checks if a given path points to a rego file (extension == .rego). - - Only rego files are allowed in official OPA bundles as policy files. + """Checks if a given path points to a rego file (extension == .rego) or a JSON file for OpenFGA. """ - return path.suffix in opal_common_config.POLICY_REPO_POLICY_EXTENSIONS + + return path.suffix in opal_common_config.POLICY_REPO_POLICY_EXTENSIONS or path.suffix == '.json' From d29c71e66d78aefe0e2da8dc6b8acaca7a35e7cf Mon Sep 17 00:00:00 2001 From: daveads Date: Mon, 21 Oct 2024 16:50:37 +0100 Subject: [PATCH 8/9] test... --- .../opal-client/opal_client/data/updater.py | 45 +++++++++++-------- .../opal-client/opal_client/policy/updater.py | 18 +++++++- .../policy_store/openfga_client.py | 27 ++++++----- 3 files changed, 60 insertions(+), 30 deletions(-) diff --git a/packages/opal-client/opal_client/data/updater.py b/packages/opal-client/opal_client/data/updater.py index e288b596..55837901 100644 --- a/packages/opal-client/opal_client/data/updater.py +++ b/packages/opal-client/opal_client/data/updater.py @@ -20,6 +20,7 @@ BasePolicyStoreClient, JsonableValue, ) +from opal_client.policy_store.openfga_client import OpenFGAClient from opal_client.policy_store.policy_store_client_factory import ( DEFAULT_POLICY_STORE_GETTER, ) @@ -471,25 +472,33 @@ async def _store_fetched_update(self, update_item): ) try: - if ( - opal_client_config.SPLIT_ROOT_DATA - and policy_store_path in ("/", "") - and isinstance(policy_data, dict) - ): - await self._set_split_policy_data( - store_transaction, - url=url, - save_method=entry.save_method, - data=policy_data, - ) + if isinstance(self._policy_store, OpenFGAClient): + # OpenFGA expects a list of tuples + if isinstance(policy_data, list): + await store_transaction.set_policy_data(policy_data, path=policy_store_path) + else: + logger.warning(f"Skipping non-list data for OpenFGA: {policy_store_path}") else: - await self._set_policy_data( - store_transaction, - url=url, - path=policy_store_path, - save_method=entry.save_method, - data=policy_data, - ) + + if ( + opal_client_config.SPLIT_ROOT_DATA + and policy_store_path in ("/", "") + and isinstance(policy_data, dict) + ): + await self._set_split_policy_data( + store_transaction, + url=url, + save_method=entry.save_method, + data=policy_data, + ) + else: + await self._set_policy_data( + store_transaction, + url=url, + path=policy_store_path, + save_method=entry.save_method, + data=policy_data, + ) # No exception we we're able to save to the policy-store report.saved = True # save the report for the entry diff --git a/packages/opal-client/opal_client/policy/updater.py b/packages/opal-client/opal_client/policy/updater.py index 57d93099..8a3a44db 100644 --- a/packages/opal-client/opal_client/policy/updater.py +++ b/packages/opal-client/opal_client/policy/updater.py @@ -1,6 +1,6 @@ import asyncio from typing import List, Optional - +import json import pydantic from fastapi_websocket_pubsub import PubSubClient from fastapi_websocket_rpc.rpc_channel import RpcChannel @@ -15,6 +15,7 @@ from opal_client.policy_store.policy_store_client_factory import ( DEFAULT_POLICY_STORE_GETTER, ) +from opal_client.policy_store.openfga_client import OpenFGAClient from opal_common.async_utils import TakeANumberQueue, TasksPool from opal_common.config import opal_common_config from opal_common.schemas.data import DataUpdateReport @@ -333,7 +334,20 @@ async def update_policy( error=bundle_error, ) if bundle: - await store_transaction.set_policies(bundle) + + if isinstance(self._policy_store, OpenFGAClient): + for policy_module in bundle.policy_modules: + if policy_module.path.endswith('.json'): + try: + policy_data = json.loads(policy_module.rego) + await store_transaction.set_policy(policy_module.path, json.dumps(policy_data)) + except json.JSONDecodeError: + logger.error(f"Invalid JSON in OpenFGA policy file: {policy_module.path}") + else: + logger.warning(f"Skipping non-JSON file for OpenFGA: {policy_module.path}") + else: + await store_transaction.set_policies(bundle) + # if we got here, we did not throw during the transaction if self._should_send_reports: # spin off reporting (no need to wait on it) diff --git a/packages/opal-client/opal_client/policy_store/openfga_client.py b/packages/opal-client/opal_client/policy_store/openfga_client.py index 2a52bd2f..baf17e70 100644 --- a/packages/opal-client/opal_client/policy_store/openfga_client.py +++ b/packages/opal-client/opal_client/policy_store/openfga_client.py @@ -64,19 +64,26 @@ async def _get_auth_headers(self) -> Dict[str, str]: @retry(**RETRY_CONFIG) async def set_policy( - self, - policy_id: str, - policy_code: str, - transaction_id: Optional[str] = None, - ): + self, + policy_id: str, + policy_code: str, + transaction_id: Optional[str] = None, +): logger.debug(f"Attempting to set policy with ID {policy_id}: {policy_code}") try: policy = json.loads(policy_code) - response = await self._fga_client.write_authorization_model( - WriteAuthorizationModelRequest(**policy) - ) - self._policy_version = response.authorization_model_id - logger.info(f"Successfully set policy with ID: {self._policy_version}") + if 'schema_version' in policy: + # This is an authorization model + response = await self._fga_client.write_authorization_model( + WriteAuthorizationModelRequest(**policy) + ) + self._policy_version = response.authorization_model_id + logger.info(f"Successfully set OpenFGA authorization model with ID: {self._policy_version}") + else: + # This is relationship tuples data + tuples = [ClientTuple(**t) for t in policy] + response = await self._fga_client.write(ClientWriteRequest(writes=tuples)) + logger.info(f"Successfully set OpenFGA relationship tuples for {policy_id}") return response except json.JSONDecodeError: logger.error(f"Invalid policy code (not valid JSON): {policy_code}") From f802eb2897aea503df8dcfffa4ab8e2c471a86c5 Mon Sep 17 00:00:00 2001 From: daveads Date: Fri, 25 Oct 2024 14:33:59 +0100 Subject: [PATCH 9/9] implemented using api calls drop openfga_sdk --- .../policy_store/openfga_client.py | 393 ++++++++++++------ 1 file changed, 264 insertions(+), 129 deletions(-) diff --git a/packages/opal-client/opal_client/policy_store/openfga_client.py b/packages/opal-client/opal_client/policy_store/openfga_client.py index baf17e70..b4383cae 100644 --- a/packages/opal-client/opal_client/policy_store/openfga_client.py +++ b/packages/opal-client/opal_client/policy_store/openfga_client.py @@ -1,6 +1,6 @@ import asyncio import json -from typing import Dict, List, Optional, Union +from typing import Dict, List, Optional, Any from urllib.parse import quote_plus import aiohttp @@ -16,16 +16,83 @@ from opal_common.schemas.policy import PolicyBundle from opal_common.schemas.store import StoreTransaction, TransactionType from tenacity import retry, stop_after_attempt, wait_exponential -from openfga_sdk import OpenFgaClient as SDKOpenFgaClient, ClientConfiguration -from openfga_sdk.client.models import ClientWriteRequest, ClientTuple, ClientCheckRequest, ClientListObjectsRequest -from openfga_sdk.models import WriteAuthorizationModelRequest, TupleKey, ReadRequest - RETRY_CONFIG = { "stop": stop_after_attempt(5), "wait": wait_exponential(multiplier=1, min=4, max=10), } +class OpenFGATransactionLogState: + """State tracker for OpenFGA transactions and health checks.""" + + def __init__( + self, + data_updater_enabled: bool = True, + policy_updater_enabled: bool = True, + ): + self._data_updater_disabled = not data_updater_enabled + self._policy_updater_disabled = not policy_updater_enabled + self._num_successful_policy_transactions = 0 + self._num_failed_policy_transactions = 0 + self._num_successful_data_transactions = 0 + self._num_failed_data_transactions = 0 + self._last_policy_transaction: Optional[StoreTransaction] = None + self._last_failed_policy_transaction: Optional[StoreTransaction] = None + self._last_data_transaction: Optional[StoreTransaction] = None + self._last_failed_data_transaction: Optional[StoreTransaction] = None + + @property + def ready(self) -> bool: + return self._num_successful_policy_transactions > 0 and ( + self._data_updater_disabled or self._num_successful_data_transactions > 0 + ) + + @property + def healthy(self) -> bool: + policy_updater_is_healthy: bool = ( + self._last_policy_transaction is not None + and self._last_policy_transaction.success + ) + data_updater_is_healthy: bool = ( + self._last_data_transaction is not None + and self._last_data_transaction.success + ) + is_healthy: bool = ( + self._policy_updater_disabled or policy_updater_is_healthy + ) and (self._data_updater_disabled or data_updater_is_healthy) + + if is_healthy: + logger.debug( + f"OpenFGA client health: {is_healthy} (policy: {policy_updater_is_healthy}, data: {data_updater_is_healthy})" + ) + else: + logger.warning( + f"OpenFGA client health: {is_healthy} (policy: {policy_updater_is_healthy}, data: {data_updater_is_healthy})" + ) + + return is_healthy + + def process_transaction(self, transaction: StoreTransaction): + """Process and track transaction state.""" + logger.debug( + "processing store transaction: {transaction}", + transaction=transaction.dict(), + ) + if transaction.transaction_type == TransactionType.policy: + if transaction.success: + self._last_policy_transaction = transaction + self._num_successful_policy_transactions += 1 + else: + self._last_failed_policy_transaction = transaction + self._num_failed_policy_transactions += 1 + elif transaction.transaction_type == TransactionType.data: + if transaction.success: + self._last_data_transaction = transaction + self._num_successful_data_transactions += 1 + else: + self._last_failed_data_transaction = transaction + self._num_failed_data_transactions += 1 + class OpenFGAClient(BasePolicyStoreClient): def __init__( self, @@ -33,211 +100,279 @@ def __init__( openfga_auth_token: Optional[str] = None, auth_type: PolicyStoreAuth = PolicyStoreAuth.NONE, store_id: Optional[str] = None, + data_updater_enabled: bool = True, + policy_updater_enabled: bool = True, ): base_url = openfga_server_url or opal_client_config.POLICY_STORE_URL - self._openfga_url = base_url + self._openfga_url = base_url.rstrip('/') self._store_id = store_id or opal_client_config.OPENFGA_STORE_ID + self._base_url = f"{self._openfga_url}/stores/{self._store_id}" self._policy_version: Optional[str] = None self._lock = asyncio.Lock() self._token = openfga_auth_token self._auth_type: PolicyStoreAuth = auth_type + self._session: Optional[aiohttp.ClientSession] = None - self._had_successful_data_transaction = False - self._had_successful_policy_transaction = False - self._most_recent_data_transaction: Optional[StoreTransaction] = None - self._most_recent_policy_transaction: Optional[StoreTransaction] = None - - configuration = ClientConfiguration( - api_url=self._openfga_url, - store_id=self._store_id, + # Initialize transaction tracking + self._transaction_state = OpenFGATransactionLogState( + data_updater_enabled=data_updater_enabled, + policy_updater_enabled=policy_updater_enabled, ) - if self._auth_type == PolicyStoreAuth.TOKEN: - configuration.credentials = self._token - - self._fga_client = SDKOpenFgaClient(configuration) - async def _get_auth_headers(self) -> Dict[str, str]: - headers: Dict[str, str] = {} - if self._auth_type == PolicyStoreAuth.TOKEN and self._token is not None: - headers["Authorization"] = f"Bearer {self._token}" - return headers + async def _get_session(self) -> aiohttp.ClientSession: + """Get or create the aiohttp session with proper headers.""" + if self._session is None or self._session.closed: + headers = {"Content-Type": "application/json"} + if self._auth_type == PolicyStoreAuth.TOKEN and self._token is not None: + headers["Authorization"] = f"Bearer {self._token}" + self._session = aiohttp.ClientSession(headers=headers) + return self._session @retry(**RETRY_CONFIG) async def set_policy( - self, - policy_id: str, - policy_code: str, - transaction_id: Optional[str] = None, -): - logger.debug(f"Attempting to set policy with ID {policy_id}: {policy_code}") - try: - policy = json.loads(policy_code) - if 'schema_version' in policy: - # This is an authorization model - response = await self._fga_client.write_authorization_model( - WriteAuthorizationModelRequest(**policy) - ) - self._policy_version = response.authorization_model_id - logger.info(f"Successfully set OpenFGA authorization model with ID: {self._policy_version}") - else: - # This is relationship tuples data - tuples = [ClientTuple(**t) for t in policy] - response = await self._fga_client.write(ClientWriteRequest(writes=tuples)) - logger.info(f"Successfully set OpenFGA relationship tuples for {policy_id}") - return response - except json.JSONDecodeError: - logger.error(f"Invalid policy code (not valid JSON): {policy_code}") - raise - except Exception as e: - logger.error(f"Error setting policy: {str(e)}") - raise - - @retry(**RETRY_CONFIG) - async def set_policy_data( self, - policy_data: JsonableValue, - path: str = "", + policy_id: str, + policy_code: str, transaction_id: Optional[str] = None, ): - logger.debug(f"Attempting to set policy data: {json.dumps(policy_data, indent=2)}") + """Write an authorization model to OpenFGA.""" try: - if not policy_data: - logger.warning("Received empty policy data. Skipping operation.") - return None - - tuples = [ - ClientTuple(user=t["user"], relation=t["relation"], object=t["object"]) - for t in policy_data - if all(k in t for k in ("user", "relation", "object")) - ] - logger.debug(f"Created tuples: {tuples}") - - if not tuples: - logger.warning("No valid tuples found in policy data. Skipping operation.") - return None + logger.debug(f"Attempting to set policy with ID {policy_id}: {policy_code}") + policy = json.loads(policy_code) - body = ClientWriteRequest(writes=tuples) - logger.debug(f"Sending write request to OpenFGA: {body}") - await self._fga_client.write(body) - logger.info(f"Successfully set policy data for path: {path}") - return None + session = await self._get_session() + async with session.post( + f"{self._base_url}/authorization-models", + json=policy + ) as response: + if response.status == 201: + data = await response.json() + self._policy_version = data["authorization_model_id"] + logger.info(f"Successfully set policy with model ID: {self._policy_version}") + return data + else: + error_body = await response.text() + logger.error(f"Error setting policy: {error_body}") + raise Exception(f"Failed to set policy: HTTP {response.status}") except Exception as e: - logger.error(f"Error setting policy data: {str(e)}") + logger.error(f"Error in set_policy: {str(e)}") raise @retry(**RETRY_CONFIG) async def get_policy(self, policy_id: str) -> Optional[str]: + """Get an authorization model by ID.""" try: - response = await self._fga_client.read_authorization_model(policy_id) - return json.dumps(response.authorization_model.dict()) + session = await self._get_session() + async with session.get( + f"{self._base_url}/authorization-models/{policy_id}" + ) as response: + if response.status == 200: + data = await response.json() + return json.dumps(data["authorization_model"]) + return None except Exception as e: - logger.error(f"Error getting policy: {str(e)}") + logger.error(f"Error in get_policy: {str(e)}") return None @retry(**RETRY_CONFIG) async def get_policies(self) -> Optional[Dict[str, str]]: + """Get all authorization models.""" try: - response = await self._fga_client.read_authorization_models() - return { - model.id: json.dumps(model.dict()) - for model in response.authorization_models - } + session = await self._get_session() + async with session.get( + f"{self._base_url}/authorization-models" + ) as response: + if response.status == 200: + data = await response.json() + return { + model["id"]: json.dumps(model) + for model in data.get("authorization_models", []) + } + return None except Exception as e: - logger.error(f"Error getting policies: {str(e)}") + logger.error(f"Error in get_policies: {str(e)}") return None async def delete_policy(self, policy_id: str, transaction_id: Optional[str] = None): + """OpenFGA does not support deletion of authorization models.""" logger.warning("Deleting policies is not supported in OpenFGA") + return None @retry(**RETRY_CONFIG) - async def delete_policy_data( - self, path: str = "", transaction_id: Optional[str] = None + async def set_policy_data( + self, + policy_data: JsonableValue, + path: str = "", + transaction_id: Optional[str] = None, ): + """Set relationship tuples in OpenFGA.""" try: - body = ClientWriteRequest(deletes=[ClientTuple(object=path)]) - await self._fga_client.write(body) - logger.info(f"Successfully deleted policy data for path: {path}") - return None + logger.debug(f"Setting policy data: {policy_data}") + + # Transform data into tuples if needed + if isinstance(policy_data, dict) and "tuples" in policy_data: + tuples = policy_data["tuples"] + elif isinstance(policy_data, list): + tuples = policy_data + else: + logger.warning(f"Invalid policy data format: {policy_data}") + return + + writes = { + "writes": { + "tuple_keys": [ + { + "user": tuple["user"], + "relation": tuple["relation"], + "object": tuple["object"] + } for tuple in tuples + ] + } + } + + session = await self._get_session() + async with session.post( + f"{self._base_url}/write", + json=writes + ) as response: + if response.status != 200: + error_body = await response.text() + raise Exception(f"Failed to set policy data: {error_body}") + logger.info(f"Successfully wrote {len(tuples)} tuples") + except Exception as e: - logger.error(f"Error deleting policy data: {str(e)}") + logger.error(f"Error in set_policy_data: {str(e)}") raise @retry(**RETRY_CONFIG) - async def get_data(self, path: str) -> Dict: + async def get_data(self, path: str = "") -> Dict: + """Get relationship tuples, optionally filtered by path.""" try: - response = await self._fga_client.read(ReadRequest(object=path)) - return {"tuples": [tuple.dict() for tuple in response.tuples]} + session = await self._get_session() + async with session.post( + f"{self._base_url}/read", + json={"tuple_key": {"object": path} if path else {}} + ) as response: + if response.status == 200: + data = await response.json() + return { + "tuples": [ + { + "user": tuple["key"]["user"], + "relation": tuple["key"]["relation"], + "object": tuple["key"]["object"] + } + for tuple in data.get("tuples", []) + ] + } + return {} except Exception as e: - logger.error(f"Error getting data: {str(e)}") + logger.error(f"Error in get_data: {str(e)}") return {} + async def get_data_with_input(self, path: str, input_model: Any) -> Dict: + """Check authorization with context.""" + try: + input_data = input_model.dict() + session = await self._get_session() + async with session.post( + f"{self._base_url}/check", + json={ + "tuple_key": { + "user": input_data.get("user"), + "relation": input_data.get("relation"), + "object": path + }, + "context": input_data.get("context", {}) + } + ) as response: + if response.status == 200: + data = await response.json() + return { + "allowed": data.get("allowed", False), + "resolution": data.get("resolution") + } + return {"allowed": False} + except Exception as e: + logger.error(f"Error in get_data_with_input: {str(e)}") + return {"allowed": False} + async def patch_policy_data( self, policy_data: List[Dict], path: str = "", transaction_id: Optional[str] = None, ): - # OpenFGA doesn't have a direct patch operation, so we'll implement it as a write + """OpenFGA doesn't support patches directly - implementing as a write.""" return await self.set_policy_data(policy_data, path, transaction_id) - @retry(**RETRY_CONFIG) async def check_permission(self, user: str, relation: str, object: str) -> bool: + """Check if a user has a specific relation to an object.""" try: - response = await self._fga_client.check( - ClientCheckRequest(user=user, relation=relation, object=object) - ) - return response.allowed + session = await self._get_session() + async with session.post( + f"{self._base_url}/check", + json={ + "tuple_key": { + "user": user, + "relation": relation, + "object": object + } + } + ) as response: + if response.status == 200: + data = await response.json() + return data.get("allowed", False) + return False except Exception as e: logger.error(f"Error checking permission: {str(e)}") return False async def log_transaction(self, transaction: StoreTransaction): - if transaction.transaction_type == TransactionType.policy: - self._most_recent_policy_transaction = transaction - if transaction.success: - self._had_successful_policy_transaction = True - elif transaction.transaction_type == TransactionType.data: - self._most_recent_data_transaction = transaction - if transaction.success: - self._had_successful_data_transaction = True + """Log and track transaction state.""" + self._transaction_state.process_transaction(transaction) async def is_ready(self) -> bool: - return ( - self._had_successful_policy_transaction - and self._had_successful_data_transaction - ) + """Check if the client is ready to handle requests.""" + return self._transaction_state.ready async def is_healthy(self) -> bool: - return ( - self._most_recent_policy_transaction is not None - and self._most_recent_policy_transaction.success - ) and ( - self._most_recent_data_transaction is not None - and self._most_recent_data_transaction.success - ) + """Check if the client is healthy.""" + return self._transaction_state.healthy async def full_export(self, writer: AsyncTextIOWrapper) -> None: + """Export full state to a file.""" policies = await self.get_policies() - data = await self.get_data("") + data = await self.get_data() await writer.write( json.dumps({"policies": policies, "data": data}, default=str) ) async def full_import(self, reader: AsyncTextIOWrapper) -> None: + """Import full state from a file.""" import_data = json.loads(await reader.read()) - - for policy_id, policy_code in import_data["policies"].items(): + + for policy_id, policy_code in import_data.get("policies", {}).items(): await self.set_policy(policy_id, policy_code) - - await self.set_policy_data(import_data["data"]) + + if "data" in import_data: + await self.set_policy_data(import_data["data"]) async def get_policy_version(self) -> Optional[str]: + """Get the current policy version.""" return self._policy_version async def set_policies( - self, bundle: PolicyBundle, transaction_id: Optional[str] = None + self, + bundle: PolicyBundle, + transaction_id: Optional[str] = None ): + """Set policies from a bundle.""" for policy in bundle.policy_modules: - await self.set_policy(policy.path, json.dumps(policy.rego)) + await self.set_policy(policy.path, policy.rego) + self._policy_version = bundle.hash - self._policy_version = bundle.hash \ No newline at end of file + async def get_policy_module_ids(self) -> List[str]: + """Get all policy module IDs.""" + policies = await self.get_policies() + return list(policies.keys()) if policies else [] \ No newline at end of file