Skip to content

Commit

Permalink
Merge pull request #83 from DalgoT4D/configurable-endpoint
Browse files Browse the repository at this point in the history
configurable api endpoint
  • Loading branch information
siddhant3030 authored May 15, 2024
2 parents c430c9f + ff804da commit 8820263
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,16 @@


from abc import ABC
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http.auth.core import HttpAuthenticator

import requests
from typing import Any, Iterable, List, Mapping, Optional, Tuple
import json
from datetime import datetime
import requests

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream, IncrementalMixin
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.core import StreamData
from datetime import datetime


stream_json_schema = {
Expand Down Expand Up @@ -101,7 +100,7 @@ def path(
def update_state(self) -> None:
if self.latest_updated_date:
if self.latest_updated_date > self.state["updated_at"]:
self.state = {self.cursor_field: self.latest_updated_date}
self.state = {"updated_at": self.latest_updated_date}
self.latest_updated_date = None
return None

Expand Down Expand Up @@ -196,15 +195,14 @@ def state(self) -> Mapping[str, Any]:

@state.setter
def state(self, value: Mapping[str, Any]):
self.cursor_value = value[self.cursor_field]
self.cursor_value = value.get(self.cursor_field)
self._state = value


# Source
class SourceGlific(AbstractSource):
"""Glific source"""

API_URL = "https://api.staging.tides.coloredcow.com/api"
PAGINATION_LIMIT = 500

def check_connection(self, logger, config) -> Tuple[bool, any]:
Expand All @@ -226,7 +224,9 @@ def check_connection(self, logger, config) -> Tuple[bool, any]:
logger.info("Password missing")
return False, "Password missing"

endpoint = f"{self.API_URL}/v1/session"
api_url = config["glific_url"]

endpoint = f"{api_url}/v1/session"
auth_payload = {"user": {"phone": config["phone"], "password": config["password"]}}

response = requests.post(endpoint, json=auth_payload, timeout=30)
Expand All @@ -243,8 +243,10 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
:param config: A Mapping of the user input configuration as defined in the connector spec.
"""

api_url = config["glific_url"]

# authenticate and get the credentials for all streams
endpoint = f"{self.API_URL}/v1/session"
endpoint = f"{api_url}/v1/session"
auth_payload = {"user": {"phone": config["phone"], "password": config["password"]}}
try:
response = requests.post(endpoint, json=auth_payload, timeout=30)
Expand All @@ -255,7 +257,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
return []

# fetch the export config for organization/client/user
endpoint = f"{self.API_URL}"
endpoint = api_url
headers = {"authorization": credentials["access_token"]}

try:
Expand All @@ -274,7 +276,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
export_config = json.loads(data["data"]["organizationExportConfig"]["data"])
streams = []
for table in export_config["tables"]:
stream_obj = IncrementalGlificStream(table, self.API_URL, self.PAGINATION_LIMIT, credentials, config)
stream_obj = IncrementalGlificStream(table, api_url, self.PAGINATION_LIMIT, credentials, config)
streams.append(stream_obj)

return streams
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,11 @@ connectionSpecification:
order: 2
default: "2023-01-26T11:11:11Z"
pattern: ^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$
glific_url:
type: string
title: Glific URL
description: URL of the Glific instance
order: 3
default: "https://api.staging.glific.com/api"
pattern: ^https:\/\/[a-zA-Z0-9.\-\/]+$

0 comments on commit 8820263

Please sign in to comment.