diff --git a/docs/about/release-notes.md b/docs/about/release-notes.md
index c7d9129d..7d936ef9 100644
--- a/docs/about/release-notes.md
+++ b/docs/about/release-notes.md
@@ -1,3 +1,10 @@
+## Version 0.3.2
+
+- Added support for performing commits
+- Added support for pulling in job status
+
+---
+
## Version 0.3.1
- Added support for Service Group objects
diff --git a/docs/sdk/client.md b/docs/sdk/client.md
index 7965c04b..5c2abe03 100644
--- a/docs/sdk/client.md
+++ b/docs/sdk/client.md
@@ -4,7 +4,7 @@
The SCM (Strata Cloud Manager) client module provides the primary interface for interacting with the Palo Alto Networks
Strata Cloud Manager API. It handles authentication, request management, and provides a clean interface for making API
-calls with proper error handling.
+calls with proper error handling and Pydantic model validation.
## SCM Client
@@ -21,7 +21,7 @@ class Scm:
client_id: str,
client_secret: str,
tsg_id: str,
- api_base_url: str,
+ api_base_url: str = "https://api.strata.paloaltonetworks.com",
log_level: str = "ERROR"
)
```
@@ -51,7 +51,7 @@ def request(self, method: str, endpoint: str, **kwargs)
-Makes a generic HTTP request to the API.
+Makes a generic HTTP request to the API and returns a Pydantic model or None.
**Parameters:**
@@ -69,7 +69,7 @@ def get(self, endpoint: str, params: Optional[Dict[str, Any]] = None, **kwargs)
-Makes a GET request to the specified endpoint.
+Makes a GET request to the specified endpoint, automatically handling token refresh.
@@ -81,7 +81,7 @@ def post(self, endpoint: str, **kwargs)
-Makes a POST request to the specified endpoint.
+Makes a POST request to the specified endpoint, automatically handling token refresh.
@@ -93,7 +93,7 @@ def put(self, endpoint: str, **kwargs)
-Makes a PUT request to the specified endpoint.
+Makes a PUT request to the specified endpoint, automatically handling token refresh.
@@ -105,7 +105,43 @@ def delete(self, endpoint: str, **kwargs)
-Makes a DELETE request to the specified endpoint.
+Makes a DELETE request to the specified endpoint, automatically handling token refresh.
+
+### Job Management Methods
+
+
+
+
+
+```python
+def list_jobs(
+ self,
+ limit: int = 100,
+ offset: int = 0,
+ parent_id: Optional[str] = None
+) -> JobListResponse
+```
+
+
+
+Lists jobs with pagination support and optional parent ID filtering.
+
+
+
+
+
+```python
+def wait_for_job(
+ self,
+ job_id: str,
+ timeout: int = 300,
+ poll_interval: int = 10
+) -> Optional[JobStatusResponse]
+```
+
+
+
+Waits for a job to complete with configurable timeout and polling interval.
## Usage Examples
@@ -122,8 +158,7 @@ from scm.client import Scm
client = Scm(
client_id="your_client_id",
client_secret="your_client_secret",
- tsg_id="your_tsg_id",
- api_base_url="https://api.example.com"
+ tsg_id="your_tsg_id"
)
```
@@ -141,7 +176,6 @@ client = Scm(
client_id="your_client_id",
client_secret="your_client_secret",
tsg_id="your_tsg_id",
- api_base_url="https://api.example.com",
log_level="DEBUG" # Enable detailed logging
)
```
@@ -166,58 +200,47 @@ response = client.get(
-#### POST Request Example
+#### POST Request Example with Pydantic Models
```python
-# Create a new address object
-new_address = {
- "name": "example-address",
- "folder": "Texas",
- "ip_netmask": "192.168.1.0/24"
-}
-response = client.post(
- endpoint="/config/objects/v1/addresses",
- json=new_address
-)
-```
-
-
+from scm.models.operations import CandidatePushRequestModel
-#### PUT Request Example
-
-
-
-
+# Create a commit request
+commit_request = CandidatePushRequestModel(
+ folders=["Texas"],
+ admin=["admin@example.com"],
+ description="Initial commit"
+)
-```python
-# Update an existing address
-updated_address = {
- "name": "example-address",
- "ip_netmask": "192.168.2.0/24"
-}
-response = client.put(
- endpoint="/config/objects/v1/addresses/example-address",
- json=updated_address
+response = client.post(
+ endpoint="/config/operations/v1/config-versions/candidate:push",
+ json=commit_request.model_dump()
)
```
-#### DELETE Request Example
+#### Commit Changes Example
```python
-# Delete an address object
-response = client.delete(
- endpoint="/config/objects/v1/addresses/example-address"
+# Commit changes with synchronous wait
+response = client.commit(
+ folders=["Texas"],
+ description="Configuration update",
+ sync=True,
+ timeout=300
)
+
+if response.success:
+ print(f"Commit successful! Job ID: {response.job_id}")
```
@@ -233,11 +256,12 @@ response = client.delete(
```python
from scm.client import Scm
from scm.exceptions import (
+ APIError,
AuthenticationError,
NotFoundError,
- BadRequestError,
+ InvalidObjectError,
ServerError,
- APIError
+ TimeoutError
)
@@ -251,45 +275,33 @@ def perform_api_operations():
log_level="INFO"
)
- # Perform API operations
try:
- # List addresses
- addresses = client.get("/config/objects/v1/addresses")
- print(f"Found {len(addresses)} addresses")
-
- # Create new address
- new_address = {
- "name": "test-address",
- "folder": "Texas",
- "ip_netmask": "192.168.1.0/24"
- }
- created = client.post("/config/objects/v1/addresses", json=new_address)
- print(f"Created address: {created['name']}")
+ # Commit changes
+ response = client.commit(
+ folders=["Texas"],
+ description="Test commit",
+ sync=True
+ )
+ print(f"Commit job ID: {response.job_id}")
except AuthenticationError as e:
print(f"Authentication failed: {e.message}")
print(f"Error code: {e.error_code}")
- print(f"HTTP status: {e.http_status_code}")
- except NotFoundError as e:
- print(f"Resource not found: {e.message}")
- if e.details:
- print(f"Additional details: {e.details}")
-
- except BadRequestError as e:
- print(f"Invalid request: {e.message}")
+ except InvalidObjectError as e:
+ print(f"Invalid request data: {e.message}")
if e.details:
print(f"Validation errors: {e.details}")
+ except TimeoutError as e:
+ print(f"Operation timed out: {str(e)}")
+
except ServerError as e:
print(f"Server error occurred: {e.message}")
print(f"Status code: {e.http_status_code}")
except APIError as e:
print(f"API error occurred: {e}")
-
- except Exception as e:
- print(f"Unexpected error: {str(e)}")
```
@@ -299,20 +311,23 @@ def perform_api_operations():
### Client Configuration
1. **Logging Configuration**
+
- Use appropriate log levels for different environments
- Set to DEBUG for development and troubleshooting
- Set to ERROR or WARNING for production
2. **Error Handling**
+
- Always wrap API calls in try-except blocks
- Handle specific exceptions before generic ones
- Log error details for troubleshooting
- Include proper error recovery mechanisms
-3. **Session Management**
- - Reuse client instances when possible
- - Properly close/cleanup sessions when done
- - Handle token refresh scenarios
+3. **Token Management**
+
+ - The client automatically handles token refresh
+ - No manual token management required
+ - Tokens are refreshed when expired before requests
### Code Examples
@@ -332,43 +347,16 @@ client = Scm(
# Reuse for multiple operations
-def get_address(object_uuid: str):
- return client.get(f"/config/objects/v1/addresses/{object_uuid}")
-
-
-def create_address(address_data: dict):
- return client.post("/config/objects/v1/addresses", json=address_data)
-```
-
-
-
-#### Using with Context Managers
-
-
-
-
-
-```python
-from contextlib import contextmanager
+def commit_changes(folders: List[str], description: str):
+ return client.commit(
+ folders=folders,
+ description=description,
+ sync=True
+ )
-@contextmanager
-def scm_client_session(**kwargs):
- client = Scm(**kwargs)
- try:
- yield client
- finally:
- # Cleanup if needed
- pass
-
-
-# Usage
-with scm_client_session(
- client_id="your_client_id",
- client_secret="your_client_secret",
- tsg_id="your_tsg_id"
-) as client:
- addresses = client.get("/config/objects/v1/addresses")
+def check_job_status(job_id: str):
+ return client.get_job_status(job_id)
```
@@ -377,20 +365,19 @@ with scm_client_session(
### Authentication Failures
-- Invalid credentials
-- Expired tokens
-- Missing required parameters
+- Invalid credentials (`AuthenticationError`)
+- Expired tokens (handled automatically)
+- Missing required parameters (`InvalidObjectError`)
### Request Failures
-- Malformed requests
-- Invalid parameters
-- Missing required fields
-- Resource not found
-- Duplicate resources
+- Malformed requests (`InvalidObjectError`)
+- Invalid parameters (`InvalidObjectError`)
+- Resource not found (`NotFoundError`)
+- Operation timeouts (`TimeoutError`)
### Server Errors
-- Timeouts
-- Service unavailable
-- Internal server errors
+- Service unavailable (`ServerError`)
+- Internal server errors (`ServerError`)
+- Gateway timeouts (`GatewayTimeoutError`)
\ No newline at end of file
diff --git a/docs/sdk/config/objects/address.md b/docs/sdk/config/objects/address.md
index 8fa77282..d42855bc 100644
--- a/docs/sdk/config/objects/address.md
+++ b/docs/sdk/config/objects/address.md
@@ -225,86 +225,10 @@ print(f"Description: {desktop1.description}")
-## Full Workflow Example
-
-Here's a complete example demonstrating the full lifecycle of an address object with proper error handling:
-
-
-
-
-
-```python
-from scm.client import Scm
-from scm.config.objects import Address
-from scm.exceptions import (
- InvalidObjectError,
- NotFoundError,
- AuthenticationError,
- NameNotUniqueError
-)
-
-try:
- # Initialize client with debug logging
- client = Scm(
- client_id="your_client_id",
- client_secret="your_client_secret",
- tsg_id="your_tsg_id",
- log_level="DEBUG"
- )
-
- # Initialize address object
- addresses = Address(client)
-
- try:
- # Create new address
- create_data = {
- "name": "test_network",
- "ip_netmask": "10.0.0.0/24",
- "description": "Test network segment",
- "folder": "Texas",
- "tag": ["Python", "Automation"]
- }
-
- new_address = addresses.create(create_data)
- print(f"Created address: {new_address.name}")
-
- # Fetch the address by name
- try:
- fetched = addresses.fetch(
- name="test_network",
- folder="Texas"
- )
- print(f"Found address: {fetched.name}")
-
- # Update the address using Pydantic model
- fetched.description = "Updated test network segment"
- updated = addresses.update(fetched)
- print(f"Updated description: {updated.description}")
-
- except NotFoundError as e:
- print(f"Address not found: {e.message}")
-
- # Clean up
- addresses.delete(new_address.id)
- print("Address deleted successfully")
-
- except NameNotUniqueError as e:
- print(f"Address name conflict: {e.message}")
- except InvalidObjectError as e:
- print(f"Invalid address data: {e.message}")
- if e.details:
- print(f"Details: {e.details}")
-
-except AuthenticationError as e:
- print(f"Authentication failed: {e.message}")
- print(f"Status code: {e.http_status_code}")
-```
-
-
-
## Full script examples
-Refer to the [examples](../../../../examples/scm/config/objects) directory.
+Refer to
+the [address.py example](https://github.com/cdot65/pan-scm-sdk/blob/main/examples/scm/config/objects/address.py).
## Related Models
diff --git a/docs/sdk/config/objects/address_group.md b/docs/sdk/config/objects/address_group.md
index 8dcb0d6e..8cfb91bc 100644
--- a/docs/sdk/config/objects/address_group.md
+++ b/docs/sdk/config/objects/address_group.md
@@ -80,7 +80,7 @@ try:
"description": "Web server group",
"static": ["example_website", "webserver_network"],
"folder": "Texas",
- "tag": ["Python", "Automation"]
+ "tag": ["Automation"]
}
new_group = address_groups.create(static_group)
@@ -269,110 +269,10 @@ except MissingQueryParameterError as e:
-## Full Workflow Example
-
-Here's a complete example demonstrating the full lifecycle of an address group with proper error handling:
-
-
-
-
-
-```python
-from scm.client import Scm
-from scm.config.objects import Address, AddressGroup
-from scm.exceptions import (
- InvalidObjectError,
- NotFoundError,
- AuthenticationError,
- NameNotUniqueError,
- ReferenceNotZeroError
-)
-
-try:
- # Initialize client with debug logging
- client = Scm(
- client_id="your_client_id",
- client_secret="your_client_secret",
- tsg_id="your_tsg_id",
- log_level="DEBUG" # Enable detailed logging
- )
-
- # Initialize address and address group objects
- addresses = Address(client)
- address_groups = AddressGroup(client)
-
- try:
- # Create address objects
- ao1 = {
- "name": "test_network1",
- "ip_netmask": "10.0.0.0/24",
- "description": "Test network",
- "folder": "Texas",
- "tag": ["Automation"]
- }
- test_network1 = addresses.create(ao1)
-
- ao2 = {
- "name": "test_network2",
- "ip_netmask": "10.0.1.0/24",
- "description": "Test network",
- "folder": "Texas",
- "tag": ["Automation"]
- }
- test_network2 = addresses.create(ao2)
-
- # Create a new static group
- test_network_group = {
- "name": "test_network_group",
- "description": "Test networks",
- "static": [test_network1.name, test_network2.name],
- "folder": "Texas",
- "tag": ["Automation"]
- }
-
- new_group = address_groups.create(test_network_group)
- print(f"Created group: {new_group.name}")
-
- # Fetch and update the group
- try:
- fetched_group = address_groups.fetch(
- name="test_network_group",
- folder="Texas"
- )
- print(f"Found group: {fetched_group.name}")
-
- # Update the group using Pydantic model
- fetched_group.description = "Updated test networks"
- updated = address_groups.update(fetched_group)
- print(f"Updated description: {updated.description}")
-
- except NotFoundError as e:
- print(f"Group not found: {e.message}")
-
- # Clean up
- try:
- address_groups.delete(new_group.id)
- print("Group deleted successfully")
- except ReferenceNotZeroError as e:
- print(f"Cannot delete group - still in use: {e.message}")
-
- except NameNotUniqueError as e:
- print(f"Name conflict: {e.message}")
- except InvalidObjectError as e:
- print(f"Invalid data: {e.message}")
- if e.details:
- print(f"Details: {e.details}")
-
-except AuthenticationError as e:
- print(f"Authentication failed: {e.message}")
- print(f"Status code: {e.http_status_code}")
-```
-
-
-
## Full script examples
-Refer to the [examples](../../../../examples/scm/config/objects) directory.
+Refer to
+the [address_group.py example](https://github.com/cdot65/pan-scm-sdk/blob/main/examples/scm/config/objects/address_group.py).
## Related Models
diff --git a/docs/sdk/index.md b/docs/sdk/index.md
index 853d2735..96061ef5 100644
--- a/docs/sdk/index.md
+++ b/docs/sdk/index.md
@@ -7,9 +7,8 @@ configuration objects and data models used to interact with Palo Alto Networks S
- [Auth](auth.md)
- [Client](client.md)
-- [Exceptions](exceptions.md)
- Configuration
- - [Objects](config/objects/index)
+ - Objects
- [Address](config/objects/address.md)
- [Address Group](config/objects/address_group.md)
- [Application](config/objects/application.md)
@@ -17,7 +16,7 @@ configuration objects and data models used to interact with Palo Alto Networks S
- [Service](config/objects/service.md)
- [Service Group](config/objects/service_group.md)
- [Tag](config/objects/tag.md)
- - [Security Services](config/security_services/index)
+ - Security Services
- [Anti-Spyware Profile](config/security_services/anti_spyware_profile)
- [Decryption Profile](config/security_services/decryption_profile.md)
- [DNS Security Profile](config/security_services/dns_security_profile.md)
@@ -25,7 +24,7 @@ configuration objects and data models used to interact with Palo Alto Networks S
- [Vulnerability Protection Profile](config/security_services/vulnerability_protection_profile.md)
- [Wildfire Antivirus Profile](config/security_services/wildfire_antivirus.md)
- Data Models
- - [Objects](models/objects/index)
+ - Objects
- [Address Models](models/objects/address_models.md)
- [Address Group Models](models/objects/address_group_models.md)
- [Application Models](models/objects/application_models.md)
@@ -33,13 +32,17 @@ configuration objects and data models used to interact with Palo Alto Networks S
- [Service Models](models/objects/service_models.md)
- [Service Group Models](models/objects/service_group_models.md)
- [Tag Models](models/objects/tag_models.md)
- - [Security Services](models/security_services/index.md)
+ - Operations
+ - [Candidate Push (commit) Models](models/operations/candidate_push.md)
+ - [Jobs Models](models/operations/jobs.md)
+ - Security Services
- [Anti-Spyware Profile Models](models/security_services/anti_spyware_profile_models.md)
- [Decryption Profile Models](models/security_services/decryption_profile_models.md)
- [DNS Security Profile Models](models/security_services/dns_security_profile_models.md)
- [Security Rule Models](models/security_services/security_rule_models.md)
- [Vulnerability Protection Profile Models](models/security_services/vulnerability_protection_profile_models.md)
- [Wildfire Antivirus Profile Models](models/security_services/wildfire_antivirus_profile_models.md)
+- [Exceptions](exceptions.md)
---
diff --git a/docs/sdk/models/operations/candidate_push.md b/docs/sdk/models/operations/candidate_push.md
new file mode 100644
index 00000000..ab95e84a
--- /dev/null
+++ b/docs/sdk/models/operations/candidate_push.md
@@ -0,0 +1,164 @@
+# Candidate Push Models
+
+## Overview
+
+The Candidate Push models provide a structured way to manage configuration commits in Palo Alto Networks' Strata Cloud
+Manager.
+These models handle the validation and processing of commit requests, including folder selection, admin authorization,
+and
+commit descriptions.
+
+## Attributes
+
+### Request Model Attributes
+
+| Attribute | Type | Required | Default | Description |
+|-------------|-----------|----------|---------|-------------------------------------------------|
+| folders | List[str] | Yes | None | List of folders to commit changes from |
+| admin | List[str] | Yes | None | List of admin email addresses for authorization |
+| description | str | Yes | None | Description of commit changes. Max length: 255 |
+
+### Response Model Attributes
+
+| Attribute | Type | Required | Default | Description |
+|-----------|------|----------|---------|---------------------------------------------------|
+| success | bool | Yes | None | Whether commit operation was successfully started |
+| job_id | str | Yes | None | ID of the commit job |
+| message | str | Yes | None | Detailed message about the commit operation |
+
+## Exceptions
+
+The Candidate Push models can raise the following exceptions during validation:
+
+- **ValueError**: Raised in several scenarios:
+ - When folders list is empty or contains invalid strings
+ - When admin list is empty or contains invalid email addresses
+ - When description validation fails (empty or exceeds max length)
+
+## Model Validators
+
+### Folder Validation
+
+The models enforce validation rules for the folders list:
+
+
+
+
+
+```python
+from scm.models.operations import CandidatePushRequestModel
+
+# Error: empty folders list
+try:
+ request = CandidatePushRequestModel(
+ folders=[],
+ admin=["admin@example.com"],
+ description="Test commit"
+ )
+except ValueError as e:
+ print(e) # "At least one folder must be specified"
+
+# Error: invalid folder strings
+try:
+ request = CandidatePushRequestModel(
+ folders=["", " "],
+ admin=["admin@example.com"],
+ description="Test commit"
+ )
+except ValueError as e:
+ print(e) # "All folders must be non-empty strings"
+```
+
+
+
+### Admin Validation
+
+The models validate admin email addresses:
+
+
+
+
+
+```python
+# Error: empty admin list
+try:
+ request = CandidatePushRequestModel(
+ folders=["Production"],
+ admin=[],
+ description="Test commit"
+ )
+except ValueError as e:
+ print(e) # "At least one admin must be specified"
+
+# Error: invalid email addresses
+try:
+ request = CandidatePushRequestModel(
+ folders=["Production"],
+ admin=["invalid-email", "also-invalid"],
+ description="Test commit"
+ )
+except ValueError as e:
+ print(e) # "All admin entries must be valid email addresses"
+```
+
+
+
+## Usage Examples
+
+### Creating a Commit Request
+
+
+
+
+
+```python
+# Using dictionary
+from scm.config.operations import CandidatePush
+
+commit_dict = {
+ "folders": ["Texas", "Production"],
+ "admin": ["admin@example.com"],
+ "description": "Updating security policies"
+}
+
+candidate_push = CandidatePush(api_client)
+response = candidate_push.create(commit_dict)
+
+# Using model directly
+from scm.models.operations import CandidatePushRequestModel
+
+commit_request = CandidatePushRequestModel(
+ folders=["Texas", "Production"],
+ admin=["admin@example.com"],
+ description="Updating security policies"
+)
+
+payload = commit_request.model_dump(exclude_unset=True)
+response = candidate_push.create(payload)
+```
+
+
+
+### Handling the Response
+
+
+
+
+
+```python
+from scm.models.operations import CandidatePushResponseModel
+
+# Response will be automatically validated
+response = CandidatePushResponseModel(
+ success=True,
+ job_id="1586",
+ message="CommitAndPush job enqueued with jobid 1586"
+)
+
+# Access response attributes
+if response.success:
+ print(f"Commit job {response.job_id} started successfully")
+ print(f"Message: {response.message}")
+```
+
+
\ No newline at end of file
diff --git a/docs/sdk/models/operations/jobs.md b/docs/sdk/models/operations/jobs.md
new file mode 100644
index 00000000..6abf507f
--- /dev/null
+++ b/docs/sdk/models/operations/jobs.md
@@ -0,0 +1,264 @@
+# Jobs Models
+
+## Overview
+
+The Jobs models provide a structured way to track and monitor job status and details in Palo Alto Networks' Strata Cloud
+Manager.
+These models support both individual job status queries and paginated job list responses. The models handle validation
+and
+serialization of job data when interacting with the SCM API.
+
+## Attributes
+
+### JobDetails Model
+
+| Attribute | Type | Required | Default | Description |
+|-------------|-----------|----------|---------|--------------------------------|
+| info | List[str] | No | [] | List of informational messages |
+| errors | List[str] | No | [] | List of error messages |
+| warnings | List[str] | No | [] | List of warning messages |
+| description | str | No | None | Description of the job |
+
+### JobStatusData Model
+
+| Attribute | Type | Required | Default | Description |
+|-------------|----------|----------|---------|-------------------------------|
+| id | str | Yes | None | Unique identifier for the job |
+| cfg_id | str | No | "" | Configuration ID |
+| details | str | Yes | None | Job details in JSON format |
+| dev_serial | str | No | "" | Device serial number |
+| dev_uuid | str | No | "" | Device UUID |
+| device_name | str | No | "" | Name of the device |
+| device_type | str | No | "" | Type of device |
+| end_ts | datetime | No | None | Job end timestamp |
+| insert_ts | datetime | Yes | None | Job creation timestamp |
+| job_result | str | Yes | None | Result of the job |
+| job_status | str | Yes | None | Current status of the job |
+| job_type | str | Yes | None | Type of job |
+| last_update | datetime | Yes | None | Last update timestamp |
+| owner | str | Yes | None | Job owner |
+| parent_id | str | No | "0" | Parent job ID |
+| percent | str | Yes | None | Job completion percentage |
+| result_i | str | Yes | None | Numeric result code |
+| result_str | str | Yes | None | String result description |
+| session_id | str | No | "" | Session ID |
+| start_ts | datetime | Yes | None | Job start timestamp |
+| status_i | str | Yes | None | Numeric status code |
+| status_str | str | Yes | None | String status description |
+| summary | str | No | "" | Job summary |
+| type_i | str | Yes | None | Numeric job type code |
+| type_str | str | Yes | None | String job type description |
+| uname | str | Yes | None | Username |
+
+### JobListItem Model
+
+| Attribute | Type | Required | Default | Description |
+|-------------|------|----------|---------|-------------------------------|
+| id | str | Yes | None | Unique identifier for the job |
+| device_name | str | No | "" | Name of the device |
+| end_ts | str | No | None | Job end timestamp |
+| job_result | str | Yes | None | Result of the job |
+| job_status | str | Yes | None | Current status of the job |
+| job_type | str | Yes | None | Type of job |
+| parent_id | str | Yes | None | Parent job ID |
+| percent | str | No | "" | Job completion percentage |
+| result_str | str | Yes | None | String result description |
+| start_ts | str | Yes | None | Job start timestamp |
+| status_str | str | Yes | None | String status description |
+| summary | str | No | "" | Job summary |
+| type_str | str | Yes | None | String job type description |
+| uname | str | Yes | None | Username |
+| description | str | No | "" | Job description |
+
+## Model Validators
+
+### Timestamp Validation
+
+The JobListItem model validates timestamp fields to handle empty strings:
+
+
+
+
+
+```python
+from scm.models.operations import JobListItem
+
+# Empty string timestamps are converted to None
+job = JobListItem(
+ id="123",
+ job_result="SUCCESS",
+ job_status="COMPLETED",
+ job_type="CONFIG",
+ parent_id="0",
+ result_str="Success",
+ start_ts="2023-01-01T00:00:00",
+ end_ts="", # Will be converted to None
+ status_str="Completed",
+ type_str="Configuration",
+ uname="admin"
+)
+print(job.end_ts) # None
+```
+
+
+
+### Datetime Serialization
+
+The JobStatusData model automatically serializes datetime fields to ISO format strings:
+
+
+
+
+
+```python
+from datetime import datetime
+from scm.models.operations import JobStatusData
+
+job = JobStatusData(
+ id="123",
+ details="{}",
+ insert_ts=datetime.now(),
+ job_result="SUCCESS",
+ job_status="COMPLETED",
+ job_type="CONFIG",
+ last_update=datetime.now(),
+ owner="admin",
+ percent="100",
+ result_i="1",
+ result_str="Success",
+ start_ts=datetime.now(),
+ status_i="2",
+ status_str="Completed",
+ type_i="3",
+ type_str="Configuration",
+ uname="admin"
+)
+
+data = job.model_dump()
+print(data["start_ts"]) # "2023-12-20T10:15:30.123456"
+```
+
+
+
+## Usage Examples
+
+### Working with Job Details
+
+
+
+
+
+```python
+from scm.models.operations import JobDetails
+
+# Create job details with messages
+details = JobDetails(
+ info=["Starting configuration push", "Configuration applied"],
+ warnings=["Device connection slow"],
+ errors=[],
+ description="Configuration update job"
+)
+
+# Access job details
+print(details.info) # ["Starting configuration push", "Configuration applied"]
+print(details.warnings) # ["Device connection slow"]
+print(details.description) # "Configuration update job"
+```
+
+
+
+### Processing Job Status Data
+
+
+
+
+
+```python
+from scm.models.operations import JobStatusResponse, JobStatusData
+from datetime import datetime
+
+# Create job status data
+job_data = JobStatusData(
+ id="123",
+ details="{}",
+ insert_ts=datetime.now(),
+ job_result="SUCCESS",
+ job_status="COMPLETED",
+ job_type="CONFIG",
+ last_update=datetime.now(),
+ owner="admin",
+ percent="100",
+ result_i="1",
+ result_str="Success",
+ start_ts=datetime.now(),
+ status_i="2",
+ status_str="Completed",
+ type_i="3",
+ type_str="Configuration",
+ uname="admin"
+)
+
+# Create response with job data
+response = JobStatusResponse(data=[job_data])
+
+# Access job status information
+for job in response.data:
+ print(f"Job {job.id}: {job.status_str} ({job.percent}%)")
+```
+
+
+
+### Working with Job Lists
+
+
+
+
+
+```python
+from scm.models.operations import JobListResponse, JobListItem
+
+# Create job list items
+jobs = [
+ JobListItem(
+ id="123",
+ job_result="SUCCESS",
+ job_status="COMPLETED",
+ job_type="CONFIG",
+ parent_id="0",
+ result_str="Success",
+ start_ts="2023-01-01T00:00:00",
+ status_str="Completed",
+ type_str="Configuration",
+ uname="admin"
+ ),
+ JobListItem(
+ id="124",
+ job_result="IN_PROGRESS",
+ job_status="RUNNING",
+ job_type="COMMIT",
+ parent_id="0",
+ result_str="In Progress",
+ start_ts="2023-01-01T00:15:00",
+ status_str="Running",
+ type_str="Commit",
+ uname="admin"
+ )
+]
+
+# Create paginated response
+response = JobListResponse(
+ data=jobs,
+ total=100,
+ limit=10,
+ offset=0
+)
+
+# Process job list
+for job in response.data:
+ print(f"Job {job.id}: {job.type_str} - {job.status_str}")
+
+# Access pagination info
+print(f"Showing {len(response.data)} of {response.total} jobs")
+```
+
+
\ No newline at end of file
diff --git a/examples/scm/config/objects/address.py b/examples/scm/config/objects/address.py
index 370e618e..eb75e54c 100644
--- a/examples/scm/config/objects/address.py
+++ b/examples/scm/config/objects/address.py
@@ -10,52 +10,52 @@
try:
# Initialize client with debug logging
client = Scm(
- client_id="your_client_id_here",
- client_secret="your_client_secret_here",
- tsg_id="your_scm_tsg_id_here",
+ client_id="your_client_id",
+ client_secret="your_client_secret",
+ tsg_id="your_tsg_id",
log_level="DEBUG",
)
+ # Initialize address object
+ addresses = Address(client)
+
+ try:
+ # Create new address
+ create_data = {
+ "name": "test_network",
+ "ip_netmask": "10.0.0.0/24",
+ "description": "Test network segment",
+ "folder": "Texas",
+ "tag": ["Python", "Automation"],
+ }
+
+ new_address = addresses.create(create_data)
+ print(f"Created address: {new_address.name}")
+
+ # Fetch the address by name
+ try:
+ fetched = addresses.fetch(name="test_network", folder="Texas")
+ print(f"Found address: {fetched.name}")
+
+ # Update the address using Pydantic model
+ fetched.description = "Updated test network segment"
+ updated = addresses.update(fetched)
+ print(f"Updated description: {updated.description}")
+
+ except NotFoundError as e:
+ print(f"Address not found: {e.message}")
+
+ # Clean up
+ addresses.delete(new_address.id)
+ print("Address deleted successfully")
+
+ except NameNotUniqueError as e:
+ print(f"Address name conflict: {e.message}")
+ except InvalidObjectError as e:
+ print(f"Invalid address data: {e.message}")
+ if e.details:
+ print(f"Details: {e.details}")
+
except AuthenticationError as e:
print(f"Authentication failed: {e.message}")
print(f"Status code: {e.http_status_code}")
-
-# Initialize address object
-addresses = Address(client)
-
-try:
- # Create new address
- create_data = {
- "name": "test_network",
- "ip_netmask": "10.0.0.0/24",
- "description": "Test network segment",
- "folder": "Texas",
- "tag": ["Automation"],
- }
-
- new_address = addresses.create(create_data)
- print(f"Created address: {new_address.name}")
-
-except NameNotUniqueError as e:
- print(f"Address name conflict: {e.message}")
-except InvalidObjectError as e:
- print(f"Invalid address data: {e.message}")
- if e.details:
- print(f"Details: {e.details}")
-
-# Fetch the address by name
-try:
- fetched = addresses.fetch(name="test_network", folder="Texas")
- print(f"Found address: {fetched['name']}")
-
- # Update the address
- fetched["description"] = "Updated test network segment"
- updated = addresses.update(fetched)
- print(f"Updated description: {updated.description}")
-
-except NotFoundError as e:
- print(f"Address not found: {e.message}")
-
-# Clean up
-addresses.delete(new_address.id)
-print("Address deleted successfully")
diff --git a/examples/scm/config/objects/address_group.py b/examples/scm/config/objects/address_group.py
new file mode 100644
index 00000000..dc0d7c6f
--- /dev/null
+++ b/examples/scm/config/objects/address_group.py
@@ -0,0 +1,87 @@
+from scm.client import Scm
+from scm.config.objects import Address, AddressGroup
+from scm.exceptions import (
+ InvalidObjectError,
+ NotFoundError,
+ AuthenticationError,
+ NameNotUniqueError,
+ ReferenceNotZeroError,
+)
+
+try:
+ # Initialize client with debug logging
+ client = Scm(
+ client_id="your_client_id",
+ client_secret="your_client_secret",
+ tsg_id="your_tsg_id",
+ log_level="DEBUG", # Enable detailed logging
+ )
+
+ # Initialize address and address group objects
+ addresses = Address(client)
+ address_groups = AddressGroup(client)
+
+ try:
+ # Create address objects
+ ao1 = {
+ "name": "test_network1",
+ "ip_netmask": "10.0.0.0/24",
+ "description": "Test network",
+ "folder": "Texas",
+ "tag": ["Automation"],
+ }
+ test_network1 = addresses.create(ao1)
+
+ ao2 = {
+ "name": "test_network2",
+ "ip_netmask": "10.0.1.0/24",
+ "description": "Test network",
+ "folder": "Texas",
+ "tag": ["Automation"],
+ }
+ test_network2 = addresses.create(ao2)
+
+ # Create a new static group
+ test_network_group = {
+ "name": "test_network_group",
+ "description": "Test networks",
+ "static": [test_network1.name, test_network2.name],
+ "folder": "Texas",
+ "tag": ["Automation"],
+ }
+
+ new_group = address_groups.create(test_network_group)
+ print(f"Created group: {new_group.name}")
+
+ # Fetch and update the group
+ try:
+ fetched_group = address_groups.fetch(
+ name="test_network_group", folder="Texas"
+ )
+ print(f"Found group: {fetched_group.name}")
+
+ # Update the group using Pydantic model
+ fetched_group.description = "Updated test networks"
+ updated = address_groups.update(fetched_group)
+ print(f"Updated description: {updated.description}")
+
+ except NotFoundError as e:
+ print(f"Group not found: {e.message}")
+
+ # Clean up
+ try:
+ address_groups.delete(new_group.id)
+ print("Group deleted successfully")
+ except ReferenceNotZeroError as e:
+ print(f"Cannot delete group - still in use: {e.message}")
+
+ except NameNotUniqueError as e:
+ print(f"Name conflict: {e.message}")
+ except InvalidObjectError as e:
+ print(f"Invalid data: {e.message}")
+ if e.details:
+ print(f"Details: {e.details}")
+
+except AuthenticationError as e:
+ print(f"Authentication failed: {e.message}")
+ print(f"Status code: {e.http_status_code}")
diff --git a/mkdocs.yml b/mkdocs.yml
index 6276e2e7..d43447be 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -33,7 +33,6 @@ nav:
- Overview: sdk/index.md
- Auth: sdk/auth.md
- Client: sdk/client.md
- - Exceptions: sdk/exceptions.md
- Configuration:
- Objects:
- Overview: sdk/config/objects/index.md
@@ -62,6 +61,9 @@ nav:
- Service Models: sdk/models/objects/service_models.md
- Service Group Models: sdk/models/objects/service_group_models.md
- Tag Models: sdk/models/objects/tag_models.md
+ - Operations:
+ - Candidate Push (commit config): sdk/models/operations/candidate_push.md
+ - Jobs: sdk/models/operations/jobs.md
- Security Services:
- Overview: sdk/models/security_services/index.md
- Anti Spyware Security Profile Models: sdk/models/security_services/anti_spyware_profile_models.md
@@ -70,6 +72,7 @@ nav:
- Security Rule Models: sdk/models/security_services/security_rule_models.md
- Vulnerability Protection Security Profile Models: sdk/config/security_services/vulnerability_protection_profile_models.md
- Wildfire Anti Virus Security Profile Models: sdk/models/security_services/wildfire_antivirus_profile_models.md
+ - Exceptions: sdk/exceptions.md
extra_css:
- css/termynal.css
- css/custom.css
diff --git a/pyproject.toml b/pyproject.toml
index b1619cb6..656d0006 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pan-scm-sdk"
-version = "0.3.1"
+version = "0.3.2"
description = "Python SDK for Palo Alto Networks Strata Cloud Manager."
authors = ["Calvin Remsburg "]
license = "Apache 2.0"
diff --git a/scm/auth.py b/scm/auth.py
index f76edcf2..05a30976 100644
--- a/scm/auth.py
+++ b/scm/auth.py
@@ -69,7 +69,7 @@ def _create_session(self) -> OAuth2Session:
# Configure retry strategy
retry_strategy = self._setup_retry_strategy()
adapter = HTTPAdapter(max_retries=retry_strategy)
- oauth.mount("http://", adapter)
+ oauth.mount("http://", adapter) # noqa
oauth.mount("https://", adapter)
logger.debug("Fetching initial token...")
@@ -171,7 +171,7 @@ def refresh_token(self) -> None:
temp_session = Session()
retry_strategy = self._setup_retry_strategy()
adapter = HTTPAdapter(max_retries=retry_strategy)
- temp_session.mount("http://", adapter)
+ temp_session.mount("http://", adapter) # noqa
temp_session.mount("https://", adapter)
try:
diff --git a/scm/client.py b/scm/client.py
index 4ef9f544..921ed2c6 100644
--- a/scm/client.py
+++ b/scm/client.py
@@ -3,7 +3,8 @@
# Standard library imports
import logging
import sys
-from typing import Optional, Dict, Any
+import time
+from typing import Optional, Dict, Any, List
# External libraries
from requests.exceptions import HTTPError
@@ -15,6 +16,12 @@
ErrorHandler,
)
from scm.models.auth import AuthRequestModel
+from scm.models.operations import (
+ CandidatePushRequestModel,
+ CandidatePushResponseModel,
+ JobStatusResponse,
+ JobListResponse,
+)
class Scm:
@@ -174,3 +181,145 @@ def delete(
endpoint,
**kwargs,
)
+
+ def list_jobs(
+ self,
+ limit: int = 100,
+ offset: int = 0,
+ parent_id: Optional[str] = None,
+ ) -> JobListResponse:
+ """
+ List jobs in SCM with pagination support and optional parent ID filtering.
+
+ Args:
+ limit: Maximum number of jobs to return (default: 100)
+ offset: Number of jobs to skip (default: 0)
+ parent_id: Filter jobs by parent job ID (default: None)
+
+ Returns:
+ JobListResponse: Paginated list of jobs
+ """
+ # Make API request with just pagination parameters
+ response = self.get(
+ "/config/operations/v1/jobs",
+ params={
+ "limit": limit,
+ "offset": offset,
+ },
+ )
+
+ # Convert to Pydantic model
+ jobs_response = JobListResponse(**response)
+
+ # If parent_id filter is specified, filter the jobs
+ if parent_id is not None:
+ filtered_data = [
+ job for job in jobs_response.data if job.parent_id == parent_id
+ ]
+ jobs_response.data = filtered_data
+ jobs_response.total = len(filtered_data)
+
+ return jobs_response
+
+ def get_job_status(self, job_id: str) -> JobStatusResponse:
+ """
+ Get the status of a job.
+
+ Args:
+ job_id: The ID of the job to check
+
+ Returns:
+ JobStatusResponse: The job status response
+ """
+ response = self.get(f"/config/operations/v1/jobs/{job_id}")
+ return JobStatusResponse(**response)
+
+ def wait_for_job(
+ self, job_id: str, timeout: int = 300, poll_interval: int = 10
+ ) -> Optional[JobStatusResponse]:
+ """
+ Wait for a job to complete.
+
+ Args:
+ job_id: The ID of the job to check
+ timeout: Maximum time to wait in seconds (default: 300)
+ poll_interval: Time between status checks in seconds (default: 10)
+
+ Returns:
+ JobStatusResponse: The final job status response
+
+ Raises:
+ TimeoutError: If the job doesn't complete within the timeout period
+ """
+ start_time = time.time()
+ while True:
+ if time.time() - start_time > timeout:
+ raise TimeoutError(
+ f"Job {job_id} did not complete within {timeout} seconds"
+ )
+
+ status = self.get_job_status(job_id)
+ if not status.data:
+ time.sleep(poll_interval)
+ continue
+
+ job_status = status.data[0]
+ if job_status.status_str == "FIN":
+ return status
+
+ time.sleep(poll_interval)
+
+ def commit(
+ self,
+ folders: List[str],
+ description: str,
+ admin: Optional[List[str]] = None,
+ sync: bool = False,
+ timeout: int = 300,
+ ) -> CandidatePushResponseModel:
+ """
+ Commits configuration changes to SCM.
+
+ Args:
+ folders: List of folder names to commit changes from
+ description: Description of the commit
+ admin: List of admin emails. Defaults to client_id if not provided
+ sync: Whether to wait for job completion
+ timeout: Maximum time to wait for job completion in seconds
+
+ Returns:
+ CandidatePushResponseModel: Response containing job information
+ """
+ if admin is None:
+ admin = [self.oauth_client.auth_request.client_id]
+
+ commit_request = CandidatePushRequestModel(
+ folders=folders,
+ admin=admin,
+ description=description,
+ )
+
+ self.logger.debug(f"Commit request: {commit_request.model_dump()}")
+
+ response = self.post(
+ "/config/operations/v1/config-versions/candidate:push",
+ json=commit_request.model_dump(),
+ )
+
+ commit_response = CandidatePushResponseModel(**response)
+
+ if sync and commit_response.success and commit_response.job_id:
+ try:
+ final_status = self.wait_for_job(
+ commit_response.job_id, timeout=timeout
+ )
+ if final_status:
+ self.logger.info(
+ f"Commit job {commit_response.job_id} completed: "
+ f"{final_status.data[0].result_str}"
+ )
+ except TimeoutError as e:
+ self.logger.error(f"Commit job timed out: {str(e)}")
+ raise
+
+ return commit_response
diff --git a/scm/config/__init__.py b/scm/config/__init__.py
index 3d28d671..93d05a98 100644
--- a/scm/config/__init__.py
+++ b/scm/config/__init__.py
@@ -1,7 +1,13 @@
# scm/config/__init__.py
+from typing import List, Dict, Any, Optional
+
from scm.client import Scm
-from typing import List, Dict, Any
+from scm.models.operations import (
+ CandidatePushResponseModel,
+ JobStatusResponse,
+ JobListResponse,
+)
class BaseObject:
@@ -35,25 +41,116 @@ def __init__(self, api_client: Scm):
self.api_client = api_client
- def create(self, data: Dict[str, Any]) -> Dict[str, Any]:
- response = self.api_client.post(self.ENDPOINT, json=data)
+ # CRUD methods
+ def create(
+ self,
+ data: Dict[str, Any],
+ ) -> Dict[str, Any]:
+ response = self.api_client.post(
+ self.ENDPOINT,
+ json=data,
+ )
return response
- def get(self, object_id: str) -> Dict[str, Any]:
+ def get(
+ self,
+ object_id: str,
+ ) -> Dict[str, Any]:
endpoint = f"{self.ENDPOINT}/{object_id}"
response = self.api_client.get(endpoint)
return response
- def update(self, data: Dict[str, Any]) -> Dict[str, Any]:
+ def update(
+ self,
+ data: Dict[str, Any],
+ ) -> Dict[str, Any]:
endpoint = f"{self.ENDPOINT}/{data['id']}"
- response = self.api_client.put(endpoint, json=data)
+ response = self.api_client.put(
+ endpoint,
+ json=data,
+ )
return response
- def delete(self, object_id: str) -> None:
+ def delete(
+ self,
+ object_id: str,
+ ) -> None:
endpoint = f"{self.ENDPOINT}/{object_id}"
self.api_client.delete(endpoint)
- def list(self, **filters) -> List[Dict[str, Any]]:
+ def list(
+ self,
+ **filters,
+ ) -> List[Dict[str, Any]]:
params = {k: v for k, v in filters.items() if v is not None}
- response = self.api_client.get(self.ENDPOINT, params=params)
+ response = self.api_client.get(
+ self.ENDPOINT,
+ params=params,
+ )
return response.get("data", [])
+
+ def list_jobs(
+ self,
+ limit: int = 100,
+ offset: int = 0,
+ parent_id: Optional[str] = None,
+ ) -> JobListResponse:
+ """
+ List jobs in SCM with pagination support and optional parent ID filtering.
+
+ Args:
+ limit: Maximum number of jobs to return (default: 100)
+ offset: Number of jobs to skip (default: 0)
+ parent_id: Filter jobs by parent job ID (default: None)
+
+ Returns:
+ JobListResponse: Paginated list of jobs
+ """
+ return self.api_client.list_jobs(
+ limit=limit,
+ offset=offset,
+ parent_id=parent_id,
+ )
+
+ def get_job_status(self, job_id: str) -> JobStatusResponse:
+ """
+ Get the status of a job.
+
+ Args:
+ job_id: The ID of the job to check
+
+ Returns:
+ JobStatusResponse: The job status response
+ """
+ return self.api_client.get_job_status(job_id)
+
+ def commit(
+ self,
+ folders: List[str],
+ description: str,
+ admin: Optional[List[str]] = None,
+ sync: bool = False,
+ timeout: int = 300,
+ ) -> CandidatePushResponseModel:
+ """
+ Commits configuration changes to SCM.
+
+ This method proxies to the api_client's commit method.
+
+ Args:
+ folders: List of folder names to commit changes from
+ description: Description of the commit
+ admin: List of admin emails
+ sync: Whether to wait for job completion
+ timeout: Maximum time to wait for job completion in seconds
+
+ Returns:
+ CandidatePushResponseModel: Response containing job information
+ """
+ return self.api_client.commit(
+ folders=folders,
+ description=description,
+ admin=admin,
+ sync=sync,
+ timeout=timeout,
+ )
diff --git a/scm/models/operations/__init__.py b/scm/models/operations/__init__.py
new file mode 100644
index 00000000..05a876bf
--- /dev/null
+++ b/scm/models/operations/__init__.py
@@ -0,0 +1,13 @@
+# scm/models/operations/__init__.py
+
+from .candidate_push import (
+ CandidatePushRequestModel,
+ CandidatePushResponseModel,
+)
+from .jobs import (
+ JobDetails,
+ JobStatusData,
+ JobStatusResponse,
+ JobListItem,
+ JobListResponse,
+)
diff --git a/scm/models/operations/candidate_push.py b/scm/models/operations/candidate_push.py
new file mode 100644
index 00000000..60a20f37
--- /dev/null
+++ b/scm/models/operations/candidate_push.py
@@ -0,0 +1,97 @@
+# scm/models/operations/candidate_push.py
+
+from typing import List
+
+from pydantic import BaseModel, Field, ConfigDict, field_validator
+
+
+class CandidatePushRequestModel(BaseModel):
+ """
+ Represents a commit request for Palo Alto Networks' Strata Cloud Manager.
+
+ This class defines the structure and validation rules for commit requests,
+ including folder selection, admin users, and commit description.
+
+ Attributes:
+ folders (List[str]): List of folders to commit changes from.
+ admin (List[str]): List of admin email addresses authorized for the commit.
+ description (str): Description of the commit changes.
+
+ Error:
+ ValueError: Raised when validation fails for folders, admin, or description.
+ """
+
+ folders: List[str] = Field(
+ ...,
+ min_length=1,
+ description="List of folders to commit changes from",
+ examples=[["Texas", "Production"]],
+ )
+ admin: List[str] = Field(
+ ...,
+ min_length=1,
+ description="List of admin email addresses",
+ examples=[["admin@example.com"]],
+ )
+ description: str = Field(
+ ...,
+ min_length=1,
+ max_length=255,
+ description="Description of the commit changes",
+ )
+
+ model_config = ConfigDict(
+ populate_by_name=True,
+ validate_assignment=True,
+ )
+
+ @field_validator("folders")
+ def validate_folders(cls, v):
+ """Ensure folders list is not empty and contains valid strings."""
+ if not v:
+ raise ValueError("At least one folder must be specified")
+ if not all(isinstance(folder, str) and folder.strip() for folder in v):
+ raise ValueError("All folders must be non-empty strings")
+ return v
+
+ @field_validator("admin")
+ def validate_admin(cls, v):
+ """Ensure admin list is not empty and contains valid email addresses."""
+ if not v:
+ raise ValueError("At least one admin must be specified")
+ if not all(isinstance(admin, str) and "@" in admin for admin in v):
+ raise ValueError("All admin entries must be valid email addresses")
+ return v
+
+
+class CandidatePushResponseModel(BaseModel):
+ """
+ Represents a commit response from Palo Alto Networks' Strata Cloud Manager.
+
+ This class defines the structure for commit operation responses,
+ including success status, job ID, and response message.
+
+ Attributes:
+ success (bool): Whether the commit operation was successfully initiated.
+ job_id (str): The ID of the commit job.
+ message (str): Detailed message about the commit operation.
+ """
+
+ success: bool = Field(
+ ...,
+ description="Whether the commit operation was successfully initiated",
+ )
+ job_id: str = Field(
+ ...,
+ description="The ID of the commit job",
+ examples=["1586"],
+ )
+ message: str = Field(
+ ...,
+ description="Detailed message about the commit operation",
+ examples=["CommitAndPush job enqueued with jobid 1586"],
+ )
+
+ model_config = ConfigDict(
+ populate_by_name=True,
+ )
diff --git a/scm/models/operations/jobs.py b/scm/models/operations/jobs.py
new file mode 100644
index 00000000..1e7bb04d
--- /dev/null
+++ b/scm/models/operations/jobs.py
@@ -0,0 +1,111 @@
+# scm/models/operations/jobs.py
+
+from datetime import datetime
+from typing import List, Optional
+
+from pydantic import BaseModel, Field, ConfigDict, field_serializer, field_validator
+
+
+class JobDetails(BaseModel):
+ """Model for job details JSON string."""
+
+ info: List[str] = Field(default_factory=list)
+ errors: List[str] = Field(default_factory=list)
+ warnings: List[str] = Field(default_factory=list)
+ description: Optional[str] = None
+
+
+class JobStatusData(BaseModel):
+ """Model for individual job status data."""
+
+ cfg_id: str = Field(default="")
+ details: str
+ dev_serial: str = Field(default="")
+ dev_uuid: str = Field(default="")
+ device_name: str = Field(default="")
+ device_type: str = Field(default="")
+ end_ts: Optional[datetime] = None
+ id: str
+ insert_ts: datetime
+ job_result: str
+ job_status: str
+ job_type: str
+ last_update: datetime
+ opaque_int: str = Field(default="0")
+ opaque_str: str = Field(default="")
+ owner: str
+ parent_id: str = Field(default="0")
+ percent: str
+ result_i: str
+ result_str: str
+ session_id: str = Field(default="")
+ start_ts: datetime
+ status_i: str
+ status_str: str
+ summary: str = Field(default="")
+ type_i: str
+ type_str: str
+ uname: str
+
+ model_config = ConfigDict(populate_by_name=True)
+
+ @field_serializer(
+ "end_ts",
+ "insert_ts",
+ "last_update",
+ "start_ts",
+ )
+ def serialize_datetime(self, dt: Optional[datetime], _info) -> Optional[str]:
+ """Serialize datetime fields to ISO format."""
+ return dt.isoformat() if dt else None
+
+
+class JobStatusResponse(BaseModel):
+ """Model for job status response."""
+
+ data: List[JobStatusData]
+
+ model_config = ConfigDict(populate_by_name=True)
+
+
+class JobListItem(BaseModel):
+ """Model for individual job in list response."""
+
+ device_name: str = Field(default="")
+ end_ts: Optional[str] = Field(default=None)
+ id: str
+ job_result: str
+ job_status: str
+ job_type: str
+ parent_id: str
+ percent: str = Field(default="")
+ result_str: str
+ start_ts: str
+ status_str: str
+ summary: str = Field(default="")
+ type_str: str
+ uname: str
+ description: str = Field(default="")
+
+ model_config = ConfigDict(populate_by_name=True)
+
+ @field_validator(
+ "end_ts",
+ "start_ts",
+ )
+ def validate_timestamp(cls, v: Optional[str]) -> Optional[str]:
+ """Validate timestamp fields, allowing empty strings."""
+ if v == "":
+ return None
+ return v
+
+
+class JobListResponse(BaseModel):
+ """Model for jobs list response with pagination."""
+
+ data: List[JobListItem]
+ total: int
+ limit: int
+ offset: int
+
+ model_config = ConfigDict(populate_by_name=True)
diff --git a/tests/scm/config/test_base_object.py b/tests/scm/config/test_base_object.py
index 9a2467b8..5353ba02 100644
--- a/tests/scm/config/test_base_object.py
+++ b/tests/scm/config/test_base_object.py
@@ -1,9 +1,16 @@
# tests/scm/config/test_base_object.py
-import pytest
from unittest.mock import MagicMock
-from scm.config import BaseObject
+
+import pytest
+
from scm.client import Scm
+from scm.config import BaseObject
+from scm.models.operations import (
+ CandidatePushResponseModel,
+ JobStatusResponse,
+ JobListResponse,
+)
@pytest.mark.usefixtures("load_env")
@@ -19,10 +26,17 @@ class MockConfigObject(BaseObject):
def setup_method(self, mock_scm):
"""Setup method that runs before each test."""
self.mock_scm = mock_scm
+ # Mock basic HTTP methods
self.mock_scm.get = MagicMock()
self.mock_scm.post = MagicMock()
self.mock_scm.put = MagicMock()
self.mock_scm.delete = MagicMock()
+
+ # Mock API-specific methods
+ self.mock_scm.list_jobs = MagicMock()
+ self.mock_scm.get_job_status = MagicMock()
+ self.mock_scm.commit = MagicMock()
+
self.test_object = self.MockConfigObject(self.mock_scm)
def test_initialization(self):
@@ -222,3 +236,132 @@ def test_create_method_payload_validation(self):
"/api/v1/test-objects",
json=nested_data,
)
+
+ def test_list_jobs(self):
+ """
+ **Objective:** Test list_jobs method return value.
+ **Workflow:**
+ 1. Tests job listing with pagination and filtering
+ 2. Verifies return type and value
+ 3. Validates parameter passing
+ """
+ mock_response = {
+ "data": [
+ {
+ "id": "1",
+ "status_str": "FIN",
+ "job_result": "2",
+ "job_status": "2",
+ "job_type": "53",
+ "type_str": "CommitAndPush",
+ "result_str": "OK",
+ "start_ts": "2024-11-30T10:00:00",
+ "uname": "test@example.com",
+ "parent_id": "0",
+ }
+ ],
+ "total": 1,
+ "limit": 100,
+ "offset": 0,
+ }
+ self.mock_scm.list_jobs.return_value = JobListResponse(**mock_response)
+
+ # Test with default parameters
+ response = self.test_object.list_jobs()
+ assert isinstance(response, JobListResponse)
+ self.mock_scm.list_jobs.assert_called_with(limit=100, offset=0, parent_id=None)
+
+ # Test with custom parameters
+ response = self.test_object.list_jobs(
+ limit=50, offset=10, parent_id="parent123"
+ )
+ assert isinstance(response, JobListResponse)
+ self.mock_scm.list_jobs.assert_called_with(
+ limit=50, offset=10, parent_id="parent123"
+ )
+
+ def test_get_job_status(self):
+ """
+ **Objective:** Test get_job_status method return value.
+ **Workflow:**
+ 1. Tests job status retrieval
+ 2. Verifies return type and value
+ 3. Validates parameter passing
+ """
+ mock_response = {
+ "data": [
+ {
+ "id": "1595",
+ "status_str": "FIN",
+ "status_i": "2",
+ "start_ts": "2024-11-30T10:00:00",
+ "insert_ts": "2024-11-30T10:00:00",
+ "last_update": "2024-11-30T10:02:00",
+ "job_status": "2",
+ "job_type": "53",
+ "job_result": "2",
+ "result_i": "2",
+ "result_str": "OK",
+ "details": "completed",
+ "owner": "test",
+ "percent": "100",
+ "type_i": "53",
+ "type_str": "CommitAndPush",
+ "uname": "test-user",
+ }
+ ]
+ }
+ self.mock_scm.get_job_status.return_value = JobStatusResponse(**mock_response)
+
+ response = self.test_object.get_job_status("1595")
+ assert isinstance(response, JobStatusResponse)
+ assert response.data[0].id == "1595"
+ assert response.data[0].status_str == "FIN"
+ self.mock_scm.get_job_status.assert_called_with("1595")
+
+ def test_commit(self):
+ """
+ **Objective:** Test commit method return value.
+ **Workflow:**
+ 1. Tests configuration commit operation
+ 2. Verifies return type and value
+ 3. Validates parameter passing with different combinations
+ """
+ mock_response = {
+ "success": True,
+ "job_id": "1586",
+ "message": "CommitAndPush job enqueued with jobid 1586",
+ }
+ self.mock_scm.commit.return_value = CandidatePushResponseModel(**mock_response)
+
+ # Test with minimal required parameters
+ response = self.test_object.commit(
+ folders=["folder1"], description="Test commit"
+ )
+ assert isinstance(response, CandidatePushResponseModel)
+ assert response.success is True
+ assert response.job_id == "1586"
+ self.mock_scm.commit.assert_called_with(
+ folders=["folder1"],
+ description="Test commit",
+ admin=None,
+ sync=False,
+ timeout=300,
+ )
+
+ # Test with all parameters
+ response = self.test_object.commit(
+ folders=["folder1", "folder2"],
+ description="Test commit with all params",
+ admin=["admin@example.com"],
+ sync=True,
+ timeout=600,
+ )
+ assert isinstance(response, CandidatePushResponseModel)
+ self.mock_scm.commit.assert_called_with(
+ folders=["folder1", "folder2"],
+ description="Test commit with all params",
+ admin=["admin@example.com"],
+ sync=True,
+ timeout=600,
+ )
diff --git a/tests/scm/models/auth/operations/test_commit.py b/tests/scm/models/auth/operations/test_commit.py
new file mode 100644
index 00000000..6f8f1947
--- /dev/null
+++ b/tests/scm/models/auth/operations/test_commit.py
@@ -0,0 +1,176 @@
+import pytest
+from pydantic import ValidationError
+
+from scm.models.operations import CandidatePushRequestModel
+
+
+class TestCandidatePushRequestModel:
+ """Tests for CandidatePushRequestModel validators."""
+
+ def test_valid_model(self):
+ """Test model with valid data."""
+ valid_data = {
+ "folders": ["folder1", "folder2"],
+ "admin": ["admin@example.com", "other@example.com"],
+ "description": "Test commit",
+ }
+ model = CandidatePushRequestModel(**valid_data)
+ assert model.folders == valid_data["folders"]
+ assert model.admin == valid_data["admin"]
+ assert model.description == valid_data["description"]
+
+ def test_folders_empty_list(self):
+ """Test validation with empty folders list."""
+ invalid_data = {
+ "folders": [],
+ "admin": ["admin@example.com"],
+ "description": "Test commit",
+ }
+ with pytest.raises(ValidationError) as exc_info:
+ CandidatePushRequestModel(**invalid_data)
+
+ error = exc_info.value
+ assert (
+ "1 validation error for CandidatePushRequestModel\nfolders\n List should have at least 1 item after validation, not 0"
+ in str(error)
+ )
+
+ def test_validate_folders_empty_value(self):
+ """Test validate_folders method with empty value."""
+ with pytest.raises(ValueError) as exc_info:
+ CandidatePushRequestModel.validate_folders(None)
+ assert str(exc_info.value) == "At least one folder must be specified"
+
+ with pytest.raises(ValueError) as exc_info:
+ CandidatePushRequestModel.validate_folders([])
+ assert str(exc_info.value) == "At least one folder must be specified"
+
+ def test_validate_admin_empty_value(self):
+ """Test validate_admin method with empty value."""
+ with pytest.raises(ValueError) as exc_info:
+ CandidatePushRequestModel.validate_admin(None)
+ assert str(exc_info.value) == "At least one admin must be specified"
+
+ with pytest.raises(ValueError) as exc_info:
+ CandidatePushRequestModel.validate_admin([])
+ assert str(exc_info.value) == "At least one admin must be specified"
+
+ def test_folders_invalid_strings(self):
+ """Test validation with invalid folder strings."""
+ test_cases = [
+ {"folders": ["", "folder2"], "desc": "empty string"},
+ {"folders": ["folder1", " "], "desc": "whitespace string"},
+ {"folders": [123, "folder2"], "desc": "non-string value"},
+ ]
+
+ for case in test_cases:
+ invalid_data = {
+ "folders": case["folders"],
+ "admin": ["admin@example.com"],
+ "description": "Test commit",
+ }
+ with pytest.raises(ValidationError) as exc_info:
+ CandidatePushRequestModel(**invalid_data)
+
+ error = exc_info.value
+ assert "1 validation error for CandidatePushRequestModel" in str(
+ error
+ ), f"Failed for {case['desc']}"
+
+ def test_admin_empty_list(self):
+ """Test validation with empty admin list."""
+ invalid_data = {
+ "folders": ["folder1"],
+ "admin": [],
+ "description": "Test commit",
+ }
+ with pytest.raises(ValidationError) as exc_info:
+ CandidatePushRequestModel(**invalid_data)
+
+ error = exc_info.value
+ assert "1 validation error for CandidatePushRequestModel" in str(error)
+
+ def test_admin_invalid_emails(self):
+ """Test validation with invalid email addresses."""
+ test_cases = [
+ {"admin": ["invalid-email", "admin@example.com"], "desc": "no @ symbol"},
+ {"admin": ["admin@example.com", ""], "desc": "empty string"},
+ {"admin": [123, "admin@example.com"], "desc": "non-string value"},
+ {"admin": ["admin@example.com", " "], "desc": "whitespace string"},
+ ]
+
+ for case in test_cases:
+ invalid_data = {
+ "folders": ["folder1"],
+ "admin": case["admin"],
+ "description": "Test commit",
+ }
+ with pytest.raises(ValidationError) as exc_info:
+ CandidatePushRequestModel(**invalid_data)
+
+ error = exc_info.value
+ assert "1 validation error for CandidatePushRequestModel" in str(
+ error
+ ), f"Failed for {case['desc']}"
+
+ def test_multiple_validation_errors(self):
+ """Test handling of multiple validation errors."""
+ invalid_data = {
+ "folders": [], # Empty folders list
+ "admin": ["invalid-email"], # Invalid email
+ "description": "Test commit",
+ }
+ with pytest.raises(ValidationError) as exc_info:
+ CandidatePushRequestModel(**invalid_data)
+
+ error = exc_info.value
+ error_str = str(error)
+ assert "2 validation errors for CandidatePushRequestModel" in error_str
+
+ def test_edge_cases(self):
+ """Test edge cases and boundary conditions."""
+ test_cases = [
+ # Minimum valid case
+ {
+ "data": {
+ "folders": ["folder1"],
+ "admin": ["admin@example.com"],
+ "description": "x",
+ },
+ "should_pass": True,
+ "desc": "minimum valid case",
+ },
+ # Unicode characters in folders
+ {
+ "data": {
+ "folders": ["文件夹1", "फ़ोल्डर2"],
+ "admin": ["admin@example.com"],
+ "description": "Test unicode folders",
+ },
+ "should_pass": True,
+ "desc": "unicode folder names",
+ },
+ # Special characters in email
+ {
+ "data": {
+ "folders": ["folder1"],
+ "admin": ["user+test@example.com"],
+ "description": "Test special email",
+ },
+ "should_pass": True,
+ "desc": "special characters in email",
+ },
+ ]
+
+ for case in test_cases:
+ if case["should_pass"]:
+ try:
+ model = CandidatePushRequestModel(**case["data"])
+ assert (
+ model is not None
+ ), f"Failed to create model for {case['desc']}"
+ except ValidationError as e:
+ pytest.fail(f"Should have passed for {case['desc']}: {str(e)}")
+ else:
+ with pytest.raises(ValidationError):
+ CandidatePushRequestModel(**case["data"])
diff --git a/tests/scm/models/auth/operations/test_jobs.py b/tests/scm/models/auth/operations/test_jobs.py
new file mode 100644
index 00000000..bf826b4e
--- /dev/null
+++ b/tests/scm/models/auth/operations/test_jobs.py
@@ -0,0 +1,151 @@
+from datetime import datetime, timezone
+
+from scm.models.operations.jobs import JobStatusData, JobListItem
+
+
+class TestJobStatusData:
+ """Tests for JobStatusData model."""
+
+ def test_serialize_datetime_with_value(self):
+ """Test datetime serialization with valid datetime."""
+ # Create a datetime object
+ test_dt = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
+
+ # Create a JobStatusData instance with test datetime
+ job_data = JobStatusData(
+ details="test details",
+ id="123",
+ insert_ts=test_dt,
+ job_result="2",
+ job_status="2",
+ job_type="53",
+ last_update=test_dt,
+ owner="test",
+ percent="100",
+ result_i="2",
+ result_str="OK",
+ start_ts=test_dt,
+ status_i="2",
+ status_str="FIN",
+ type_i="53",
+ type_str="CommitAndPush",
+ uname="test@example.com",
+ end_ts=test_dt,
+ )
+
+ # Convert to dict to trigger serialization
+ serialized = job_data.model_dump()
+
+ # Verify all datetime fields are serialized correctly
+ expected_iso = "2024-01-01T12:00:00+00:00"
+ assert serialized["end_ts"] == expected_iso
+ assert serialized["insert_ts"] == expected_iso
+ assert serialized["last_update"] == expected_iso
+ assert serialized["start_ts"] == expected_iso
+
+ def test_serialize_datetime_with_none(self):
+ """Test datetime serialization with None value."""
+ # Create a JobStatusData instance with None for optional datetime
+ job_data = JobStatusData(
+ details="test details",
+ id="123",
+ insert_ts=datetime.now(),
+ job_result="2",
+ job_status="2",
+ job_type="53",
+ last_update=datetime.now(),
+ owner="test",
+ percent="100",
+ result_i="2",
+ result_str="OK",
+ start_ts=datetime.now(),
+ status_i="2",
+ status_str="FIN",
+ type_i="53",
+ type_str="CommitAndPush",
+ uname="test@example.com",
+ end_ts=None, # Optional field set to None
+ )
+
+ # Convert to dict to trigger serialization
+ serialized = job_data.model_dump()
+
+ # Verify None is preserved
+ assert serialized["end_ts"] is None
+
+
+class TestJobListItem:
+ """Tests for JobListItem model."""
+
+ def test_validate_timestamp_with_empty_string(self):
+ """Test timestamp validation with empty string."""
+ # Create JobListItem with empty string timestamps
+ job_item = JobListItem(
+ id="123",
+ job_result="2",
+ job_status="2",
+ job_type="53",
+ parent_id="0",
+ result_str="OK",
+ start_ts="", # Empty string
+ status_str="FIN",
+ type_str="CommitAndPush",
+ uname="test@example.com",
+ end_ts="", # Empty string
+ )
+
+ # Verify empty strings are converted to None
+ assert job_item.end_ts is None
+ assert job_item.start_ts is None
+
+ def test_validate_timestamp_with_value(self):
+ """Test timestamp validation with valid timestamp string."""
+ timestamp = "2024-01-01T12:00:00Z"
+ job_item = JobListItem(
+ id="123",
+ job_result="2",
+ job_status="2",
+ job_type="53",
+ parent_id="0",
+ result_str="OK",
+ start_ts=timestamp,
+ status_str="FIN",
+ type_str="CommitAndPush",
+ uname="test@example.com",
+ end_ts=timestamp,
+ )
+
+ # Verify valid timestamps are preserved
+ assert job_item.end_ts == timestamp
+ assert job_item.start_ts == timestamp
+
+ def test_validate_timestamp_with_none(self):
+ """Test timestamp validation with None value."""
+ job_item = JobListItem(
+ id="123",
+ job_result="2",
+ job_status="2",
+ job_type="53",
+ parent_id="0",
+ result_str="OK",
+ start_ts="2024-01-01T12:00:00Z", # Required field
+ status_str="FIN",
+ type_str="CommitAndPush",
+ uname="test@example.com",
+ end_ts=None, # Optional field
+ )
+
+ # Verify None is preserved for optional field
+ assert job_item.end_ts is None
+
+ def test_validate_timestamp_direct_validation(self):
+ """Test timestamp validator method directly."""
+ # Test with empty string
+ assert JobListItem.validate_timestamp("") is None
+
+ # Test with valid timestamp
+ timestamp = "2024-01-01T12:00:00Z"
+ assert JobListItem.validate_timestamp(timestamp) == timestamp
+
+ # Test with None
+ assert JobListItem.validate_timestamp(None) is None
diff --git a/tests/scm/test_client.py b/tests/scm/test_client.py
index 45b907f8..9c73001e 100644
--- a/tests/scm/test_client.py
+++ b/tests/scm/test_client.py
@@ -1,5 +1,5 @@
# tests/test_client.py
-from unittest.mock import patch, MagicMock
+from unittest.mock import MagicMock, patch
import pytest
from requests.exceptions import HTTPError
@@ -15,6 +15,11 @@
ErrorHandler,
ServerError,
)
+from scm.models.operations import (
+ JobListResponse,
+ JobStatusResponse,
+ CandidatePushResponseModel,
+)
from tests.utils import raise_mock_http_error
@@ -467,3 +472,417 @@ def test_handle_api_error_empty_field(self):
assert "{'errorType': 'Invalid Object'}" in str(exc_info.value)
assert "HTTP error: 400" in str(exc_info.value)
assert "API error: API_I00035" in str(exc_info.value)
+
+
+class TestClientJobMethods(TestClientBase):
+ """Tests for job-related client methods."""
+
+ def test_list_jobs_basic(self):
+ """Test basic job listing without filtering."""
+ mock_response = {
+ "data": [
+ {
+ "device_name": "device1",
+ "id": "1",
+ "job_result": "2",
+ "job_status": "2",
+ "job_type": "53",
+ "parent_id": "0",
+ "result_str": "OK",
+ "start_ts": "2024-11-30T10:00:00",
+ "status_str": "FIN",
+ "type_str": "CommitAndPush",
+ "uname": "test@example.com",
+ }
+ ],
+ "total": 1,
+ "limit": 100,
+ "offset": 0,
+ }
+ self.session.request.return_value.json.return_value = mock_response
+
+ result = self.client.list_jobs(limit=100, offset=0)
+
+ assert isinstance(result, JobListResponse)
+ assert len(result.data) == 1
+ assert result.total == 1
+ assert result.limit == 100
+ self.session.request.assert_called_once()
+
+ def test_list_jobs_with_parent_filter(self):
+ """Test job listing with parent_id filtering."""
+ mock_response = {
+ "data": [
+ {
+ "id": "1",
+ "parent_id": "parent1",
+ "job_result": "2",
+ "job_status": "2",
+ "job_type": "53",
+ "result_str": "OK",
+ "start_ts": "2024-11-30T10:00:00",
+ "status_str": "FIN",
+ "type_str": "CommitAndPush",
+ "uname": "test@example.com",
+ },
+ {
+ "id": "2",
+ "parent_id": "parent2",
+ "job_result": "2",
+ "job_status": "2",
+ "job_type": "53",
+ "result_str": "OK",
+ "start_ts": "2024-11-30T10:00:00",
+ "status_str": "FIN",
+ "type_str": "CommitAndPush",
+ "uname": "test@example.com",
+ },
+ ],
+ "total": 2,
+ "limit": 100,
+ "offset": 0,
+ }
+ self.session.request.return_value.json.return_value = mock_response
+
+ result = self.client.list_jobs(parent_id="parent1")
+
+ assert isinstance(result, JobListResponse)
+ assert len(result.data) == 1
+ assert result.data[0].parent_id == "parent1"
+ assert result.total == 1 # Total should be updated for filtered results
+
+ def test_get_job_status(self):
+ """Test getting status of a specific job."""
+ mock_response = {
+ "data": [
+ {
+ "cfg_id": "",
+ "details": '{"info":["Partial changes to commit: changes to configuration by administrators: pan-scm-sdk@1821351705.iam.panserviceaccount.com","Configuration committed successfully"],"errors":[],"warnings":[],"description":"this is a test"}',
+ "dev_serial": "",
+ "dev_uuid": "",
+ "device_name": "",
+ "device_type": "",
+ "end_ts": "2024-11-30 10:25:21",
+ "id": "1595",
+ "insert_ts": "2024-11-30 10:24:50",
+ "job_result": "2",
+ "job_status": "2",
+ "job_type": "53",
+ "last_update": "2024-11-30 10:25:22",
+ "opaque_int": "0",
+ "opaque_str": "",
+ "owner": "cfgserv",
+ "parent_id": "0",
+ "percent": "100",
+ "result_i": "2",
+ "result_str": "OK",
+ "session_id": "",
+ "start_ts": "2024-11-30 10:24:50",
+ "status_i": "2",
+ "status_str": "FIN",
+ "summary": "",
+ "type_i": "53",
+ "type_str": "CommitAndPush",
+ "uname": "pan-scm-sdk@1821351705.iam.panserviceaccount.com",
+ }
+ ]
+ }
+ self.session.request.return_value.json.return_value = mock_response
+
+ result = self.client.get_job_status("1595")
+
+ assert isinstance(result, JobStatusResponse)
+ assert len(result.data) == 1
+ assert result.data[0].id == "1595"
+ self.session.request.assert_called_once_with(
+ "GET",
+ "https://api.strata.paloaltonetworks.com/config/operations/v1/jobs/1595",
+ params=None,
+ )
+
+ def test_wait_for_job_success(self):
+ """Test waiting for job completion - successful case."""
+ with patch("time.time", side_effect=[0, 10, 20]):
+ with patch("time.sleep") as mock_sleep:
+ # First call returns running status, second call returns completed
+ self.session.request.return_value.json.side_effect = [
+ {
+ "data": [
+ {
+ "id": "test_job",
+ "status_str": "RUN",
+ "status_i": "1",
+ "start_ts": "2024-11-30T10:00:00",
+ "insert_ts": "2024-11-30T10:00:00",
+ "last_update": "2024-11-30T10:01:00",
+ "job_status": "1",
+ "job_type": "53",
+ "job_result": "0",
+ "details": "running",
+ "owner": "test",
+ "percent": "50",
+ "result_i": "0",
+ "result_str": "PENDING",
+ "type_i": "53",
+ "type_str": "CommitAndPush",
+ "uname": "test-user",
+ }
+ ]
+ },
+ {
+ "data": [
+ {
+ "id": "test_job",
+ "status_str": "FIN",
+ "status_i": "2",
+ "start_ts": "2024-11-30T10:00:00",
+ "insert_ts": "2024-11-30T10:00:00",
+ "last_update": "2024-11-30T10:02:00",
+ "job_status": "2",
+ "job_type": "53",
+ "job_result": "2",
+ "details": "completed",
+ "owner": "test",
+ "percent": "100",
+ "result_i": "2",
+ "result_str": "OK",
+ "type_i": "53",
+ "type_str": "CommitAndPush",
+ "uname": "test-user",
+ }
+ ]
+ },
+ ]
+
+ result = self.client.wait_for_job(
+ "test_job", timeout=30, poll_interval=10
+ )
+
+ assert isinstance(result, JobStatusResponse)
+ assert result.data[0].status_str == "FIN"
+ mock_sleep.assert_called_once_with(10)
+
+ def test_wait_for_job_timeout(self):
+ """Test waiting for job completion - timeout case."""
+ with patch("time.time", side_effect=[0, 301, 302]): # Simulate timeout
+ with patch("time.sleep"):
+ self.session.request.return_value.json.return_value = {
+ "data": [
+ {
+ "id": "test_job",
+ "status_str": "RUN",
+ "start_ts": "2024-11-30T10:00:00",
+ "insert_ts": "2024-11-30T10:00:00",
+ "last_update": "2024-11-30T10:01:00",
+ "job_status": "1",
+ "job_type": "53",
+ "job_result": "0",
+ "details": "running",
+ "owner": "test",
+ }
+ ]
+ }
+
+ with pytest.raises(TimeoutError) as exc_info:
+ self.client.wait_for_job("test_job", timeout=300)
+
+ assert "did not complete within 300 seconds" in str(exc_info.value)
+
+ def test_wait_for_job_empty_response(self):
+ """Test waiting for job completion - empty response handling."""
+ with patch("time.time", side_effect=[0, 10, 20]):
+ with patch("time.sleep") as mock_sleep:
+ self.session.request.return_value.json.return_value = {"data": []}
+
+ with pytest.raises(TimeoutError):
+ self.client.wait_for_job("test_job", timeout=15, poll_interval=5)
+
+ assert mock_sleep.call_count > 0
+
+
+class TestClientCommitMethods(TestClientBase):
+ def test_commit_basic(self):
+ """Test basic commit functionality without sync."""
+ mock_response = {
+ "success": True,
+ "job_id": "1586",
+ "message": "CommitAndPush job enqueued with jobid 1586",
+ }
+ self.session.request.return_value.json.return_value = mock_response
+
+ result = self.client.commit(
+ folders=["folder1", "folder2"],
+ description="Test commit",
+ admin=["admin@example.com"],
+ )
+
+ assert isinstance(result, CandidatePushResponseModel)
+ assert result.success is True
+ assert result.job_id == "1586"
+ self.session.request.assert_called_once_with(
+ "POST",
+ "https://api.strata.paloaltonetworks.com/config/operations/v1/config-versions/candidate:push",
+ json={
+ "folders": ["folder1", "folder2"],
+ "description": "Test commit",
+ "admin": ["admin@example.com"],
+ },
+ )
+
+ def test_commit_with_sync(self):
+ """Test commit with sync enabled."""
+ # Mock the client_id to return a string
+ self.client.oauth_client.auth_request.client_id = "test@example.com"
+
+ # Mock the initial commit response
+ commit_response = {
+ "success": True,
+ "job_id": "1586",
+ "message": "CommitAndPush job enqueued with jobid 1586",
+ }
+
+ # Mock the job status responses for wait_for_job
+ job_status_response = {
+ "data": [
+ {
+ "id": "1586",
+ "status_str": "FIN",
+ "status_i": "2",
+ "start_ts": "2024-11-30T10:00:00",
+ "insert_ts": "2024-11-30T10:00:00",
+ "last_update": "2024-11-30T10:02:00",
+ "job_status": "2",
+ "job_type": "53",
+ "job_result": "2",
+ "result_i": "2",
+ "result_str": "OK",
+ "details": "completed",
+ "owner": "test",
+ "percent": "100",
+ "type_i": "53",
+ "type_str": "CommitAndPush",
+ "uname": "test-user",
+ }
+ ]
+ }
+
+ # Setup mock responses
+ self.session.request.return_value.json.side_effect = [
+ commit_response,
+ job_status_response,
+ ]
+
+ with patch("time.sleep"): # Mock sleep to speed up test
+ result = self.client.commit(
+ folders=["folder1"],
+ description="Test commit with sync",
+ sync=True,
+ timeout=30,
+ )
+
+ assert isinstance(result, CandidatePushResponseModel)
+ assert result.success is True
+ assert result.job_id == "1586"
+ assert (
+ self.session.request.call_count == 2
+ ) # One for commit, one for status check
+
+ def test_commit_sync_timeout(self):
+ """Test commit with sync that times out."""
+ # Mock the client_id to return a string
+ self.client.oauth_client.auth_request.client_id = "test@example.com"
+
+ # Mock the initial commit response
+ commit_response = {
+ "success": True,
+ "job_id": "1586",
+ "message": "CommitAndPush job enqueued with jobid 1586",
+ }
+
+ # Mock the job status response for a running job
+ job_status_response = {
+ "data": [
+ {
+ "id": "1586",
+ "status_str": "RUN",
+ "status_i": "1",
+ "start_ts": "2024-11-30T10:00:00",
+ "insert_ts": "2024-11-30T10:00:00",
+ "last_update": "2024-11-30T10:01:00",
+ "job_status": "1",
+ "job_type": "53",
+ "job_result": "0",
+ "details": "running",
+ "owner": "test",
+ "percent": "50",
+ "result_i": "0",
+ "result_str": "PENDING",
+ "type_i": "53",
+ "type_str": "CommitAndPush",
+ "uname": "test-user",
+ }
+ ]
+ }
+
+ self.session.request.return_value.json.side_effect = [
+ commit_response,
+ job_status_response,
+ job_status_response, # Multiple status checks that show it's still running
+ ]
+
+ with patch("time.sleep"), patch("time.time", side_effect=[0, 31, 32]):
+ with pytest.raises(TimeoutError) as exc_info:
+ self.client.commit(
+ folders=["folder1"],
+ description="Test commit timeout",
+ sync=True,
+ timeout=30,
+ )
+
+ assert "did not complete within 30 seconds" in str(exc_info.value)
+
+ def test_commit_default_admin(self):
+ """Test commit using default admin (client_id)."""
+ # Mock the client_id to return a string instead of a MagicMock
+ self.client.oauth_client.auth_request.client_id = "test@example.com"
+
+ mock_response = {
+ "success": True,
+ "job_id": "1586",
+ "message": "CommitAndPush job enqueued with jobid 1586",
+ }
+ self.session.request.return_value.json.return_value = mock_response
+
+ result = self.client.commit(
+ folders=["folder1"], description="Test commit with default admin"
+ )
+
+ assert isinstance(result, CandidatePushResponseModel)
+ assert result.success is True
+
+ # Verify the admin field defaulted to client_id
+ self.session.request.assert_called_once_with(
+ "POST",
+ "https://api.strata.paloaltonetworks.com/config/operations/v1/config-versions/candidate:push",
+ json={
+ "folders": ["folder1"],
+ "description": "Test commit with default admin",
+ "admin": ["test@example.com"],
+ },
+ )
+
+ def test_commit_validation_error(self):
+ """Test commit with invalid parameters."""
+ # Mock the client_id to return a string
+ self.client.oauth_client.auth_request.client_id = "test@example.com"
+
+ with pytest.raises(ValueError) as exc_info:
+ self.client.commit(
+ folders=[], # Empty folders list should raise validation error
+ description="Test commit validation",
+ )
+
+ # Updated to match Pydantic's actual error message
+ assert "List should have at least 1 item after validation" in str(
+ exc_info.value
+ )