diff --git a/templates/connectors-native-sdk-template/Makefile b/templates/connectors-native-sdk-template/Makefile new file mode 100644 index 0000000..010876d --- /dev/null +++ b/templates/connectors-native-sdk-template/Makefile @@ -0,0 +1,113 @@ +CONNECTION=native_sdk_connection +USERNAME=$(shell whoami) +APP_PACKAGE_NAME=NATIVE_SDK_CONNECTOR_TEMPLATE_$(USERNAME) +INSTANCE_NAME=$(APP_PACKAGE_NAME)_INSTANCE +SCHEMA_NAME=TEST_SCHEMA +STAGE_NAME=TEST_STAGE +VERSION=1_0 +VERSION_DIR_PATH=@$(APP_PACKAGE_NAME).$(SCHEMA_NAME).$(STAGE_NAME)/$(VERSION) + +# *********** +# CORE FLOW +# *********** + +.PHONY: copy_internal_components +copy_internal_components: + ./gradlew copyInternalComponents + +.PHONY: copy_sdk_components +copy_sdk_components: + ./gradlew copySdkComponents + +.PHONY: prepare_app_package +prepare_app_package: + ./gradlew prepareAppPackage \ + -Pconnection=$(CONNECTION) \ + -PappPackage=$(APP_PACKAGE_NAME) \ + -Pschema=$(SCHEMA_NAME) \ + -Pstage=$(STAGE_NAME) + +.PHONY: deploy_connector +deploy_connector: + ./gradlew deployConnector \ + -Pconnection=$(CONNECTION) \ + -PappPackage=$(APP_PACKAGE_NAME) \ + -Pschema=$(SCHEMA_NAME) \ + -Pstage=$(STAGE_NAME) \ + -PappVersion=$(VERSION) + +# **************************************** +# CREATE INSTANCE FROM VERSION DIRECTORY +# **************************************** + +.PHONY: create_app_instance_from_version_dir +create_app_instance_from_version_dir: + ./gradlew createAppInstance \ + -Pconnection=$(CONNECTION) \ + -PappPackage=$(APP_PACKAGE_NAME) \ + -PversionDirPath=$(VERSION_DIR_PATH) + +.PHONY: complex_create_app_instance_from_version_dir +complex_create_app_instance_from_version_dir: + make copy_internal_components + make copy_sdk_components + make prepare_app_package + make deploy_connector + make create_app_instance_from_version_dir + +# ********************************** +# CREATE INSTANCE FROM APP VERSION +# ********************************** + +.PHONY: create_new_version +create_new_version: + ./gradlew createNewVersion \ + -Pconnection=$(CONNECTION) \ + -PappPackage=$(APP_PACKAGE_NAME) \ + -PversionDirPath=$(VERSION_DIR_PATH) \ + -PappVersion=$(VERSION) + +.PHONY: create_app_instance_from_app_version +create_app_instance_from_app_version: + ./gradlew createAppInstance \ + -Pconnection=$(CONNECTION) \ + -PappPackage=$(APP_PACKAGE_NAME) \ + -PappVersion=$(VERSION) + +.PHONY: complex_create_app_instance_from_app_version +complex_create_app_instance_from_app_version: + make copy_internal_components + make copy_sdk_components + make prepare_app_package + make deploy_connector + make create_new_version + make create_app_instance_from_app_version + +# ****************** +# ADDITIONAL TASKS +# ****************** + +.PHONY: drop_application_package +drop_application_package: + snowsql -c $(CONNECTION) \ + -q "DROP APPLICATION PACKAGE IF EXISTS $(APP_PACKAGE_NAME)" + +.PHONY: drop_application_instance +drop_application_instance: + snowsql -c $(CONNECTION) \ + -q "DROP APPLICATION IF EXISTS $(INSTANCE_NAME) CASCADE" + +.PHONY: drop_application +drop_application: + snowsql -c $(CONNECTION) \ + -q "DROP APPLICATION IF EXISTS $(INSTANCE_NAME) CASCADE; DROP APPLICATION PACKAGE IF EXISTS $(APP_PACKAGE_NAME)" + +.PHONY: reinstall_application_from_version_dir +reinstall_application_from_version_dir: + make drop_application + make complex_create_app_instance_from_version_dir + +.PHONY: reinstall_application_from_app_version +reinstall_application_from_app_version: + make drop_application + make complex_create_app_instance_from_app_version diff 
--git a/templates/connectors-native-sdk-template/app/manifest.yml b/templates/connectors-native-sdk-template/app/manifest.yml new file mode 100644 index 0000000..5725fff --- /dev/null +++ b/templates/connectors-native-sdk-template/app/manifest.yml @@ -0,0 +1,27 @@ +# Copyright (c) 2024 Snowflake Inc. + +manifest_version: 1 +artifacts: + setup_script: setup.sql + extension_code: true + default_streamlit: STREAMLIT.NATIVE_SDK_TEMPLATE_ST +version: + name: "1.0" + label: "connectors-native-sdk-connector-template" + comment: "A template providing a working connector out of the box to accelerate initial development" +configuration: + trace_level: ON_EVENT + log_level: info +privileges: + - EXECUTE TASK: + description: "Needed to run ingestion tasks" + - CREATE DATABASE: + description: "Needed to create a database for ingested data" +references: + - WAREHOUSE_REFERENCE: + label: "Warehouse used for ingestion" + description: "Warehouse, which will be used to schedule ingestion tasks" + privileges: + - USAGE + object_type: WAREHOUSE + register_callback: PUBLIC.REGISTER_REFERENCE diff --git a/templates/connectors-native-sdk-template/app/setup.sql b/templates/connectors-native-sdk-template/app/setup.sql new file mode 100644 index 0000000..cd33f8c --- /dev/null +++ b/templates/connectors-native-sdk-template/app/setup.sql @@ -0,0 +1,130 @@ +-- Copyright (c) 2024 Snowflake Inc. + +-- CONNECTORS-NATIVE-SDK +EXECUTE IMMEDIATE FROM 'native-connectors-sdk-components/all.sql'; +EXECUTE IMMEDIATE FROM 'native-connectors-sdk-components/task_reactor.sql'; + +-- CUSTOM CONNECTOR OBJECTS +CREATE OR ALTER VERSIONED SCHEMA STREAMLIT; +GRANT USAGE ON SCHEMA STREAMLIT TO APPLICATION ROLE ADMIN; + +CREATE OR REPLACE STREAMLIT STREAMLIT.NATIVE_SDK_TEMPLATE_ST + FROM '/streamlit' + MAIN_FILE = 'streamlit_app.py'; +GRANT USAGE ON STREAMLIT STREAMLIT.NATIVE_SDK_TEMPLATE_ST TO APPLICATION ROLE ADMIN; + +-- SNOWFLAKE REFERENCE MECHANISM +CREATE PROCEDURE PUBLIC.REGISTER_REFERENCE(ref_name STRING, operation STRING, ref_or_alias STRING) + RETURNS STRING + LANGUAGE SQL + AS + BEGIN + CASE (operation) + WHEN 'ADD' THEN + SELECT SYSTEM$SET_REFERENCE(:ref_name, :ref_or_alias); + WHEN 'REMOVE' THEN + SELECT SYSTEM$REMOVE_REFERENCE(:ref_name); + WHEN 'CLEAR' THEN + SELECT SYSTEM$REMOVE_REFERENCE(:ref_name); + ELSE RETURN 'unknown operation: ' || operation; + END CASE; + RETURN NULL; + END; +GRANT USAGE ON PROCEDURE PUBLIC.REGISTER_REFERENCE(STRING, STRING, STRING) TO APPLICATION ROLE ADMIN; + +-----------------WIZARD----------------- +-- PREREQUISITES +MERGE INTO STATE.PREREQUISITES AS dest + USING (SELECT * FROM VALUES + ('1', + 'Sample prerequisite', + 'Prerequisites can be used to notice the end user of the connector about external configurations. Read more in the SDK documentation below. 
This content can be modified inside `setup.sql` script', + 'https://other-docs.snowflake.com/LIMITEDACCESS/connector-sdk/flow/prerequisites', + NULL, + NULL, + 1 + ) + ) AS src (id, title, description, documentation_url, learnmore_url, guide_url, position) + ON dest.id = src.id + WHEN NOT MATCHED THEN + INSERT (id, title, description, documentation_url, learnmore_url, guide_url, position) + VALUES (src.id, src.title, src.description, src.documentation_url, src.learnmore_url, src.guide_url, src.position); + +-- CONNECTION CONFIGURATION +CREATE OR REPLACE PROCEDURE PUBLIC.SET_CONNECTION_CONFIGURATION(connection_configuration VARIANT) + RETURNS VARIANT + LANGUAGE JAVA + RUNTIME_VERSION = '11' + PACKAGES = ('com.snowflake:snowpark:1.11.0') + IMPORTS = ('/connectors-native-sdk-template.jar', '/connectors-native-sdk.jar') + HANDLER = 'com.snowflake.connectors.example.configuration.connection.TemplateConnectionConfigurationHandler.setConnectionConfiguration'; +GRANT USAGE ON PROCEDURE PUBLIC.SET_CONNECTION_CONFIGURATION(VARIANT) TO APPLICATION ROLE ADMIN; + +CREATE OR REPLACE PROCEDURE PUBLIC.TEST_CONNECTION() + RETURNS VARIANT + LANGUAGE JAVA + RUNTIME_VERSION = '11' + PACKAGES = ('com.snowflake:snowpark:1.11.0') + IMPORTS = ('/connectors-native-sdk-template.jar', '/connectors-native-sdk.jar') + HANDLER = 'com.snowflake.connectors.example.configuration.connection.TemplateConnectionValidator.testConnection'; + +-- FINALIZE CONFIGURATION +CREATE OR REPLACE PROCEDURE PUBLIC.FINALIZE_CONNECTOR_CONFIGURATION(CUSTOM_CONFIGURATION VARIANT) + RETURNS VARIANT + LANGUAGE JAVA + RUNTIME_VERSION = '11' + PACKAGES = ('com.snowflake:snowpark:1.11.0') + IMPORTS = ('/connectors-native-sdk-template.jar', '/connectors-native-sdk.jar') + HANDLER = 'com.snowflake.connectors.example.configuration.finalize.TemplateFinalizeConnectorConfigurationCustomHandler.finalizeConnectorConfiguration'; +GRANT USAGE ON PROCEDURE PUBLIC.FINALIZE_CONNECTOR_CONFIGURATION(VARIANT) TO APPLICATION ROLE ADMIN; + +-----------------TASK REACTOR----------------- +CREATE OR REPLACE PROCEDURE PUBLIC.TEMPLATE_WORKER(worker_id number, task_reactor_schema string) + RETURNS STRING + LANGUAGE JAVA + RUNTIME_VERSION = '11' + PACKAGES = ('com.snowflake:snowpark:1.11.0', 'com.snowflake:telemetry:latest') + IMPORTS = ('/connectors-native-sdk.jar', '/connectors-native-sdk-template.jar') + HANDLER = 'com.snowflake.connectors.example.ingestion.TemplateWorker.executeWork'; + +CALL TASK_REACTOR.CREATE_INSTANCE_OBJECTS( + 'EXAMPLE_CONNECTOR_TASK_REACTOR', + 'PUBLIC.TEMPLATE_WORKER', + 'VIEW', + 'EXAMPLE_CONNECTOR_TASK_REACTOR.WORK_SELECTOR_VIEW', + -- TODO: Below NULL causes the application to create and use default EMPTY_EXPIRED_WORK_SELECTOR. + -- If Task Reactor work items shall be removed after some time of not being taken up by the instance, + -- then this default view can be replaced with the custom implementation + -- or this parameter can be changed to the name of the custom view. + -- You can pass the name of the view that will be created even after this procedure finishes its execution. 
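+    -- For example (hypothetical name, shown only as an illustration; the view could be
+    -- created further down in this script, next to WORK_SELECTOR_VIEW):
+    -- 'EXAMPLE_CONNECTOR_TASK_REACTOR.EXPIRED_WORK_SELECTOR_VIEW',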
+ NULL + ); + +CREATE VIEW EXAMPLE_CONNECTOR_TASK_REACTOR.WORK_SELECTOR_VIEW AS SELECT * FROM EXAMPLE_CONNECTOR_TASK_REACTOR.QUEUE ORDER BY RESOURCE_ID; + +CREATE OR REPLACE PROCEDURE PUBLIC.RUN_SCHEDULER_ITERATION() + RETURNS VARIANT + LANGUAGE JAVA + RUNTIME_VERSION = '11' + PACKAGES = ('com.snowflake:snowpark:1.11.0') + IMPORTS = ('/connectors-native-sdk.jar', '/connectors-native-sdk-template.jar') + HANDLER = 'com.snowflake.connectors.example.integration.SchedulerIntegratedWithTaskReactorHandler.runIteration'; + +-----------------LIFECYCLE----------------- +CREATE OR REPLACE PROCEDURE PUBLIC.PAUSE_CONNECTOR() + RETURNS VARIANT + LANGUAGE JAVA + RUNTIME_VERSION = '11' + PACKAGES = ('com.snowflake:snowpark:1.11.0') + IMPORTS = ('/connectors-native-sdk.jar', '/connectors-native-sdk-template.jar') + HANDLER = 'com.snowflake.connectors.example.lifecycle.pause.PauseConnectorCustomHandler.pauseConnector'; +GRANT USAGE ON PROCEDURE PUBLIC.PAUSE_CONNECTOR() TO APPLICATION ROLE ADMIN; + +CREATE OR REPLACE PROCEDURE PUBLIC.RESUME_CONNECTOR() + RETURNS VARIANT + LANGUAGE JAVA + RUNTIME_VERSION = '11' + PACKAGES = ('com.snowflake:snowpark:1.11.0') + IMPORTS = ('/connectors-native-sdk.jar', '/connectors-native-sdk-template.jar') + HANDLER = 'com.snowflake.connectors.example.lifecycle.resume.ResumeConnectorCustomHandler.resumeConnector'; +GRANT USAGE ON PROCEDURE PUBLIC.RESUME_CONNECTOR() TO APPLICATION ROLE ADMIN; diff --git a/templates/connectors-native-sdk-template/app/streamlit/daily_use/daily_use_layout.py b/templates/connectors-native-sdk-template/app/streamlit/daily_use/daily_use_layout.py new file mode 100644 index 0000000..e92725c --- /dev/null +++ b/templates/connectors-native-sdk-template/app/streamlit/daily_use/daily_use_layout.py @@ -0,0 +1,19 @@ +# Copyright (c) 2024 Snowflake Inc. + +import streamlit as st +from daily_use.home_page import home_page +from daily_use.data_sync_page import data_sync_page +from daily_use.settings_page import settings_page + + +def daily_use_page(): + home_tab, data_sync_tab, settings_tab = st.tabs(["Home", "Data Sync", "Settings"]) + + with home_tab: + home_page() + + with data_sync_tab: + data_sync_page() + + with settings_tab: + settings_page() diff --git a/templates/connectors-native-sdk-template/app/streamlit/daily_use/data_sync_page.py b/templates/connectors-native-sdk-template/app/streamlit/daily_use/data_sync_page.py new file mode 100644 index 0000000..f27b6db --- /dev/null +++ b/templates/connectors-native-sdk-template/app/streamlit/daily_use/data_sync_page.py @@ -0,0 +1,44 @@ +# Copyright (c) 2024 Snowflake Inc. + +import streamlit as st +from daily_use.sync_status_bar import sync_status_bar +from native_sdk_api.resource_management import ( + create_resource, + fetch_resources +) + + +def queue_resource(): + # TODO: add additional properties here and pass them to create_resource function + resource_name = st.session_state.get("resource_name") + + if not resource_name: + st.error("Resource name cannot be empty") + return + + result = create_resource(resource_name) + if result.is_ok(): + st.success("Resource created") + else: + st.error(result.get_message()) + + +def data_sync_page(): + sync_status_bar() + st.subheader("Enabled resources") + df = fetch_resources().to_pandas() + + with st.form("add_new_resource_form", clear_on_submit=True): + st.caption("Enable new resource. 
Fields required to enable a parameter might differ between the various source systems, so the field list will have to be customized here.") + st.caption("For more information on resource definitions check the [documentation](https://other-docs.snowflake.com/LIMITEDACCESS/connector-sdk/flow/resource_definition_and_ingestion_processes) and the [reference](https://other-docs.snowflake.com/LIMITEDACCESS/connector-sdk/reference/resource_definition_and_ingestion_processes_reference)") + # TODO: specify all the properties needed to define a resource in the source system. A subset of those properties should allow for a identification of a single resource, be it a table, endpoint, repository or some other data storage abstraction + st.text_input( + "Resource name", + key="resource_name", + ) + _ = st.form_submit_button( + "Queue ingestion", + on_click=queue_resource + ) + + st.table(df) diff --git a/templates/connectors-native-sdk-template/app/streamlit/daily_use/home_page.py b/templates/connectors-native-sdk-template/app/streamlit/daily_use/home_page.py new file mode 100644 index 0000000..d3edd6c --- /dev/null +++ b/templates/connectors-native-sdk-template/app/streamlit/daily_use/home_page.py @@ -0,0 +1,26 @@ +# Copyright (c) 2024 Snowflake Inc. + +import streamlit as st +from native_sdk_api.observability import get_aggregated_connector_stats + + +def home_page(): + data_frame = get_aggregated_connector_stats() + if not data_frame.empty: + st.vega_lite_chart( + data_frame, + { + "mark": { + "type": "bar", + "width": { + "band": 0.8 if len(data_frame.index) == 1 else 0.95, + }, + }, + "encoding": { + "x": {"field": "RUN_DATE", "type": "temporal", "timeUnit": "dayhours"}, + "y": {"field": "UPDATED_ROWS", "type": "quantitative", "aggregate": "mean"}, + }, + }, + use_container_width=True) + else: + st.info("No ingested rows in the chosen period of time. Start the ingestion in order to display the chart.") diff --git a/templates/connectors-native-sdk-template/app/streamlit/daily_use/settings_page.py b/templates/connectors-native-sdk-template/app/streamlit/daily_use/settings_page.py new file mode 100644 index 0000000..93ed9a4 --- /dev/null +++ b/templates/connectors-native-sdk-template/app/streamlit/daily_use/settings_page.py @@ -0,0 +1,72 @@ +# Copyright (c) 2024 Snowflake Inc. + +import streamlit as st +from native_sdk_api.connector_config import get_connector_configuration +from native_sdk_api.connection_config import get_connection_configuration +from utils.permission_sdk_utils import ( + get_held_account_privileges, + get_warehouse_ref +) + + +def settings_page(): + connector, connection = st.tabs(["Connector configuration", "Connection configuration"]) + with connector: + connector_config_page() + with connection: + connection_config_page() + + +def connector_config_page(): + current_config = get_connector_configuration() + + warehouse_reference = get_warehouse_ref()[0] + granted_privileges = ", ".join(get_held_account_privileges()) + destination_database = current_config.get("destination_database", "") + destination_schema = current_config.get("destination_schema", "") + + st.header("Connector configuration") + st.caption("Here you can see the general connector configuration saved during the connector configuration step of " + "the Wizard. 
If other available properties were used then they need to be displayed here as well.") + st.divider() + + st.text_input( + "Granted privileges:", + value=granted_privileges, + disabled=True + ) + st.text_input( + "Warehouse reference:", + value=warehouse_reference, + disabled=True + ) + st.text_input( + "Destination database:", + value=destination_database, + disabled=True + ) + st.text_input( + "Destination schema:", + value=destination_schema, + disabled=True + ) + st.divider() + + +def connection_config_page(): + current_config = get_connection_configuration() + + # TODO: implement the display for all the custom properties defined in the connection configuration step + custom_property = current_config.get("custom_connection_property", "") + + st.header("Connector configuration") + st.caption("Here you can see the connector connection configuration saved during the connection configuration step " + "of the Wizard. If some new property was introduced it has to be added here to display.") + st.divider() + + st.text_input( + "Custom connection property:", + value=custom_property, + disabled=True + ) + st.divider() diff --git a/templates/connectors-native-sdk-template/app/streamlit/daily_use/sync_status_bar.py b/templates/connectors-native-sdk-template/app/streamlit/daily_use/sync_status_bar.py new file mode 100644 index 0000000..d0940f5 --- /dev/null +++ b/templates/connectors-native-sdk-template/app/streamlit/daily_use/sync_status_bar.py @@ -0,0 +1,89 @@ +# Copyright (c) 2024 Snowflake Inc. + +import streamlit as st +import pandas as pd +import json +from utils.sql_utils import (call_procedure, select_all_from) +from native_sdk_api.connector_config import get_connector_configuration + + +def sync_status_bar(): + with st.form("sync_status"): + global_schedule_config = json.loads(get_connector_configuration().get("global_schedule")) + frame = pd.DataFrame(select_all_from("PUBLIC.SYNC_STATUS")) + status = frame.iloc[0]['STATUS'] + last_synced_at = frame.iloc[0]['LAST_SYNCED_AT'] + + header_col, button_col = st.columns([3, 2]) + with header_col: + st.text_input( + "Global schedule:", + value=f"{global_schedule_config['scheduleType']}: {global_schedule_config['scheduleDefinition']}", + disabled=True + ) + + with button_col: + display_start_pause_button(status) + __display_sync_status(status, last_synced_at) + + +def __display_sync_status(status: str, timestamp): + if status == 'SYNCING_DATA': + st.form_submit_button( + f"**:blue[Syncing data]**", + disabled=True, + use_container_width=True + ) + elif status == 'LAST_SYNCED': + st.form_submit_button( + f"**:green[Last sync: {timestamp.strftime('%Y-%m-%d %H:%M:%S')}]**", + disabled=True, + use_container_width=True + ) + elif status == 'NOT_SYNCING': + st.form_submit_button( + f"**:grey[Not syncing]**", + disabled=True, + use_container_width=True + ) + elif status == 'PAUSED': + st.form_submit_button( + f"**:grey[Syncing paused]**", + disabled=True, + use_container_width=True + ) + else: + st.form_submit_button( + f"**:red[Unknown sync status]**", + disabled=True, + use_container_width=True + ) + + +def display_start_pause_button(status): + if status in ['SYNCING_DATA', 'LAST_SYNCED', 'NOT_SYNCING']: + st.form_submit_button( + ":black_medium_square: Pause connector", + type="secondary", + on_click=__pause_connector, + use_container_width=True + ) + elif status in ['PAUSED']: + st.form_submit_button( + ":arrow_forward: Resume connector", + type="primary", + on_click=__resume_connector, + use_container_width=True + ) + else: + 
st.form_submit_button("Unknown sync status", disabled=True) + + +def __pause_connector(): + call_procedure('PUBLIC.PAUSE_CONNECTOR') + st.session_state['status'] = 'NOT_SYNCING' + + +def __resume_connector(): + call_procedure('PUBLIC.RESUME_CONNECTOR') + st.session_state['status'] = 'SYNCING_DATA' diff --git a/templates/connectors-native-sdk-template/app/streamlit/environment.yml b/templates/connectors-native-sdk-template/app/streamlit/environment.yml new file mode 100644 index 0000000..714c7eb --- /dev/null +++ b/templates/connectors-native-sdk-template/app/streamlit/environment.yml @@ -0,0 +1,5 @@ +name: sf_env +channels: + - snowflake +dependencies: + - snowflake-native-apps-permission diff --git a/templates/connectors-native-sdk-template/app/streamlit/native_sdk_api/connection_config.py b/templates/connectors-native-sdk-template/app/streamlit/native_sdk_api/connection_config.py new file mode 100644 index 0000000..ce27cf2 --- /dev/null +++ b/templates/connectors-native-sdk-template/app/streamlit/native_sdk_api/connection_config.py @@ -0,0 +1,21 @@ +# Copyright (c) 2024 Snowflake Inc. + +from utils.sf_utils import escape_identifier +from utils.sql_utils import call_procedure, variant_argument +from native_sdk_api.connector_config_view import get_configuration + + +def set_connection_configuration(custom_connection_property: str): + # TODO: this part of the code sends the config to the backend so all custom properties need to be added here + config = { + "custom_connection_property": escape_identifier(custom_connection_property), + } + + return call_procedure( + "PUBLIC.SET_CONNECTION_CONFIGURATION", + [variant_argument(config)] + ) + + +def get_connection_configuration(): + return get_configuration("connection_configuration") diff --git a/templates/connectors-native-sdk-template/app/streamlit/native_sdk_api/connector_config.py b/templates/connectors-native-sdk-template/app/streamlit/native_sdk_api/connector_config.py new file mode 100644 index 0000000..4b97b97 --- /dev/null +++ b/templates/connectors-native-sdk-template/app/streamlit/native_sdk_api/connector_config.py @@ -0,0 +1,26 @@ +# Copyright (c) 2024 Snowflake Inc. + +from utils.sf_utils import escape_identifier +from utils.sql_utils import call_procedure, variant_argument +from native_sdk_api.connector_config_view import get_configuration + + +def configure_connector(dest_db: str, dest_schema: str): + # TODO: connector configuration supports additional parameters like: warehouse, operational_warehouse, data_owner_role, agent_role, agent_username + config = { + "destination_database": escape_identifier(dest_db), + "destination_schema": escape_identifier(dest_schema), + "global_schedule": { + "scheduleType": "CRON", + "scheduleDefinition": "*/1 * * * *" + } + } + + return call_procedure( + "PUBLIC.CONFIGURE_CONNECTOR", + [variant_argument(config)] + ) + + +def get_connector_configuration(): + return get_configuration("connector_configuration") diff --git a/templates/connectors-native-sdk-template/app/streamlit/native_sdk_api/connector_config_view.py b/templates/connectors-native-sdk-template/app/streamlit/native_sdk_api/connector_config_view.py new file mode 100644 index 0000000..e4e08bc --- /dev/null +++ b/templates/connectors-native-sdk-template/app/streamlit/native_sdk_api/connector_config_view.py @@ -0,0 +1,18 @@ +# Copyright (c) 2024 Snowflake Inc. 
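+# Shared helper for the configuration APIs: reads key/value pairs of a given config
+# group from PUBLIC.CONNECTOR_CONFIGURATION through the active Snowpark session.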
+ +from snowflake.snowpark.context import get_active_session + + +def get_configuration(config_group: str): + config_rows = ( + get_active_session() + .sql(f"SELECT config_key, value FROM PUBLIC.CONNECTOR_CONFIGURATION WHERE " + f"config_group = '{config_group}'") + .collect() + ) + + config = {} + for row in config_rows: + config[row[0]] = row[1] + + return config diff --git a/templates/connectors-native-sdk-template/app/streamlit/native_sdk_api/connector_response.py b/templates/connectors-native-sdk-template/app/streamlit/native_sdk_api/connector_response.py new file mode 100644 index 0000000..6e5857d --- /dev/null +++ b/templates/connectors-native-sdk-template/app/streamlit/native_sdk_api/connector_response.py @@ -0,0 +1,20 @@ +# Copyright (c) 2024 Snowflake Inc. + +import json + + +class ConnectorResponse: + def __init__(self, response_as_json: str = "{}"): + response = json.loads(response_as_json) + + self._response_code: str = response.get("response_code") + self._message: str = response.get("message") + + def is_ok(self): + return self._response_code == "OK" + + def get_response_code(self): + return self._response_code + + def get_message(self): + return self._message diff --git a/templates/connectors-native-sdk-template/app/streamlit/native_sdk_api/connector_status.py b/templates/connectors-native-sdk-template/app/streamlit/native_sdk_api/connector_status.py new file mode 100644 index 0000000..7f0415e --- /dev/null +++ b/templates/connectors-native-sdk-template/app/streamlit/native_sdk_api/connector_status.py @@ -0,0 +1,65 @@ +# Copyright (c) 2024 Snowflake Inc. + +import json +import streamlit as st +from enum import Enum +from snowflake.snowpark.context import get_active_session + + +def load_connector_statuses(): + statuses = json.loads( + get_active_session() + .sql("CALL PUBLIC.GET_CONNECTOR_STATUS()") + .collect()[0][0] + ) + + st.session_state["connector_status"] = ConnectorStatus[statuses.get("status")] + st.session_state["configuration_status"] = ConfigurationStatus[ + statuses.get("configurationStatus") + ] + + +def get_connector_status(): + return st.session_state["connector_status"] + + +def get_configuration_status(): + return st.session_state["configuration_status"] + + +def is_connector_in_error(): + return get_connector_status() == ConnectorStatus.ERROR + + +def is_connector_configuring(): + return get_connector_status() == ConnectorStatus.CONFIGURING + + +def is_connector_configured(): + return get_configuration_status() in [ + ConfigurationStatus.CONFIGURED, + ConfigurationStatus.CONNECTED + ] + + +def is_connection_configured(): + return get_configuration_status() in [ + ConfigurationStatus.CONNECTED + ] + + +class ConnectorStatus(Enum): + CONFIGURING = "CONFIGURING" + STARTING = "STARTING" + STARTED = "STARTED" + PAUSING = "PAUSING" + PAUSED = "PAUSED" + ERROR = "ERROR" + + +class ConfigurationStatus(Enum): + INSTALLED = "INSTALLED" + PREREQUISITES_DONE = "PREREQUISITES_DONE" + CONFIGURED = "CONFIGURED" + CONNECTED = "CONNECTED" + FINALIZED = "FINALIZED" diff --git a/templates/connectors-native-sdk-template/app/streamlit/native_sdk_api/finalize_config.py b/templates/connectors-native-sdk-template/app/streamlit/native_sdk_api/finalize_config.py new file mode 100644 index 0000000..cb7af89 --- /dev/null +++ b/templates/connectors-native-sdk-template/app/streamlit/native_sdk_api/finalize_config.py @@ -0,0 +1,12 @@ +# Copyright (c) 2024 Snowflake Inc. 
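+# Thin wrapper around the PUBLIC.FINALIZE_CONNECTOR_CONFIGURATION procedure, called by
+# the last step of the configuration wizard.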
+ +from utils.sql_utils import call_procedure, variant_argument + + +def finalize_connector_configuration(custom_property: str): + # TODO: If some custom properties were configured, then they need to be specified here and passed to the FINALIZE_CONNECTOR_CONFIGURATION procedure. + config = {"custom_property": custom_property} + return call_procedure( + "PUBLIC.FINALIZE_CONNECTOR_CONFIGURATION", + [variant_argument(config)] + ) diff --git a/templates/connectors-native-sdk-template/app/streamlit/native_sdk_api/observability.py b/templates/connectors-native-sdk-template/app/streamlit/native_sdk_api/observability.py new file mode 100644 index 0000000..103e997 --- /dev/null +++ b/templates/connectors-native-sdk-template/app/streamlit/native_sdk_api/observability.py @@ -0,0 +1,13 @@ +# Copyright (c) 2024 Snowflake Inc. + +from snowflake.snowpark.context import get_active_session +import pandas as pd + + +def get_aggregated_connector_stats(): + session = get_active_session() + result = session.sql( + "SELECT RUN_DATE, UPDATED_ROWS FROM PUBLIC.AGGREGATED_CONNECTOR_STATS " + "WHERE RUN_DATE > DATEADD(DAY, -1, CURRENT_TIMESTAMP());" + ).collect() + return pd.DataFrame(result) diff --git a/templates/connectors-native-sdk-template/app/streamlit/native_sdk_api/prerequisites.py b/templates/connectors-native-sdk-template/app/streamlit/native_sdk_api/prerequisites.py new file mode 100644 index 0000000..1e64972 --- /dev/null +++ b/templates/connectors-native-sdk-template/app/streamlit/native_sdk_api/prerequisites.py @@ -0,0 +1,59 @@ +# Copyright (c) 2024 Snowflake Inc. + +from snowflake.snowpark.context import get_active_session +from utils.sql_utils import call_procedure, varchar_argument + + +session = get_active_session() + + +def fetch_prerequisites(): + result = session.table("PUBLIC.PREREQUISITES").collect() + output = [] + # TODO: prerequisite can contain some more fields, for more info check the documentation https://other-docs.snowflake.com/LIMITEDACCESS/connector-sdk/reference/prerequisites_reference + for r in result: + output.append(Prerequisite(r["ID"], r["TITLE"], r["DESCRIPTION"], r["DOCUMENTATION_URL"], r["IS_COMPLETED"])) + return output + + +def mark_all_prerequisites_as_done(): + return call_procedure("PUBLIC.MARK_ALL_PREREQUISITES_AS_DONE") + + +def update_prerequisite(prerequisite_id: str, is_completed: str): + return call_procedure( + "PUBLIC.UPDATE_PREREQUISITE", + [ + varchar_argument(prerequisite_id), + str(is_completed) + ] + ) + + +def complete_prerequisite_step(): + return call_procedure("PUBLIC.COMPLETE_PREREQUISITES_STEP") + + +class Prerequisite: + # TODO: In case some additional fields were extracted from the prerequisites table, then they need to be mapped here. 
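+    # For example, assuming LEARNMORE_URL were also read in fetch_prerequisites above,
+    # the constructor could accept a learnmore_url argument and store it as self._learnmore_url.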
+ def __init__(self, prerequisite_id: str, title: str, description: str, documentation_url: str, is_completed: bool): + self._prerequisite_id = prerequisite_id + self._title = title + self._description = description + self._documentation_url = documentation_url + self._is_completed = is_completed + + def get_prerequisite_id(self): + return self._prerequisite_id + + def get_title(self): + return self._title + + def get_description(self): + return self._description + + def get_documentation_url(self): + return self._documentation_url + + def get_is_completed(self): + return self._is_completed diff --git a/templates/connectors-native-sdk-template/app/streamlit/native_sdk_api/resource_management.py b/templates/connectors-native-sdk-template/app/streamlit/native_sdk_api/resource_management.py new file mode 100644 index 0000000..254304a --- /dev/null +++ b/templates/connectors-native-sdk-template/app/streamlit/native_sdk_api/resource_management.py @@ -0,0 +1,55 @@ +# Copyright (c) 2024 Snowflake Inc. + +import string, random + +from utils.sql_utils import call_procedure, varchar_argument, variant_argument, variant_list_argument +from snowflake.snowpark.context import get_active_session + +session = get_active_session() + +def create_resource(resource_name): + ingestion_config = [{ + "id": "ingestionConfig", + "ingestionStrategy": "INCREMENTAL", + # TODO: HINT: scheduleType and scheduleDefinition are currently not supported out of the box, due to globalSchedule being used. However, a custom implementation of the scheduler can use those fields. They need to be provided becuase they are mandatory in the resourceDefinition. + "scheduleType": "INTERVAL", + "scheduleDefinition": "60m" + }] + # TODO: HINT: resource_id should allow identification of a table, endpoint etc. in the source system. It should be unique. + resource_id = { + "resource_name": resource_name, + } + id = f"{resource_name}_{random_suffix()}" + + # TODO: if you specified some additional resource parameters then you need to put them inside resource metadata: + # resource_metadata = { + # "some_additional_parameter": some_additional_parameter + # } + + return call_procedure("PUBLIC.CREATE_RESOURCE", + [ + varchar_argument(id), + variant_argument(resource_id), + variant_list_argument(ingestion_config), + varchar_argument(id), + "true" + # variant_argument(resource_metadata) + ]) + + +def fetch_resources(): + # TODO: To modify the information shown about each resource in the table this query needs to be modified + return session.sql( + """ + SELECT + id, + resource_id:resource_name::string AS resource_name, + IS_ENABLED + FROM PUBLIC.INGESTION_DEFINITIONS + """ + ) + + +def random_suffix(): + suffix = "".join(random.choice(string.ascii_uppercase + string.digits) for _ in range(9)) + return suffix diff --git a/templates/connectors-native-sdk-template/app/streamlit/streamlit_app.py b/templates/connectors-native-sdk-template/app/streamlit/streamlit_app.py new file mode 100644 index 0000000..6404531 --- /dev/null +++ b/templates/connectors-native-sdk-template/app/streamlit/streamlit_app.py @@ -0,0 +1,20 @@ +# Copyright (c) 2024 Snowflake Inc. 
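+# Entry point of the connector UI: loads the connector status, then routes either to the
+# configuration wizard or to the daily-use view.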
+ +import streamlit as st +from native_sdk_api.connector_status import ( + load_connector_statuses, + is_connector_in_error, + is_connector_configuring +) +from daily_use.daily_use_layout import daily_use_page +from wizard.wizard_layout import wizard_page + + +load_connector_statuses() + +if is_connector_in_error(): + st.error("Unexpected error has occurred, please reinstall the application") +elif is_connector_configuring(): + wizard_page() +else: + daily_use_page() diff --git a/templates/connectors-native-sdk-template/app/streamlit/utils/permission_sdk_utils.py b/templates/connectors-native-sdk-template/app/streamlit/utils/permission_sdk_utils.py new file mode 100644 index 0000000..789de5c --- /dev/null +++ b/templates/connectors-native-sdk-template/app/streamlit/utils/permission_sdk_utils.py @@ -0,0 +1,27 @@ +# Copyright (c) 2024 Snowflake Inc. + +import snowflake.permissions as permissions + + +REQUIRED_PRIVILEGES = ["CREATE DATABASE", "EXECUTE TASK"] +WAREHOUSE_REF = "WAREHOUSE_REFERENCE" + + +def get_held_account_privileges(): + return permissions.get_held_account_privileges(REQUIRED_PRIVILEGES) + + +def get_missing_privileges(): + return permissions.get_missing_account_privileges(REQUIRED_PRIVILEGES) + + +def request_required_privileges(): + permissions.request_account_privileges(get_missing_privileges()) + + +def get_warehouse_ref(): + return permissions.get_reference_associations(WAREHOUSE_REF) + + +def request_warehouse_ref(): + permissions.request_reference(WAREHOUSE_REF) diff --git a/templates/connectors-native-sdk-template/app/streamlit/utils/sf_utils.py b/templates/connectors-native-sdk-template/app/streamlit/utils/sf_utils.py new file mode 100644 index 0000000..995a2ae --- /dev/null +++ b/templates/connectors-native-sdk-template/app/streamlit/utils/sf_utils.py @@ -0,0 +1,33 @@ +# Copyright (c) 2024 Snowflake Inc. + +import re + + +__NON_QUOTED_RAW_PATTERN = "[a-zA-Z_][\\w$]*" +__QUOTED_RAW_PATTERN = "\"([^\"]|\"\")+\"" +__IDENTIFIER_RAW_PATTERN = f"({__NON_QUOTED_RAW_PATTERN})|({__QUOTED_RAW_PATTERN})" +__IDENTIFIER_PATTERN = re.compile(__IDENTIFIER_RAW_PATTERN) +__OBJECT_NAME_PATTERN = re.compile( + f"^(({__IDENTIFIER_RAW_PATTERN}\\.){0,2}|({__IDENTIFIER_RAW_PATTERN}\\.\\.))" + f"{__IDENTIFIER_RAW_PATTERN}$" +) +__FULLY_QUALIFIED_OBJECT_NAME_PATTERN = re.compile( + f"^{__IDENTIFIER_RAW_PATTERN}\\.{__IDENTIFIER_RAW_PATTERN}{0,1}" + f"\\.{__IDENTIFIER_RAW_PATTERN}$" +) + + +def validate_identifier(identifier: str): + return __IDENTIFIER_PATTERN.match(identifier) is not None + + +def validate_object_name(name: str): + return __OBJECT_NAME_PATTERN.match(name) is not None + + +def validate_fully_qualified_object_name(name: str): + return __FULLY_QUALIFIED_OBJECT_NAME_PATTERN.match(name) is not None + + +def escape_identifier(identifier: str): + return identifier.replace("'", "\\'").replace("\"", "\\\"") diff --git a/templates/connectors-native-sdk-template/app/streamlit/utils/sql_utils.py b/templates/connectors-native-sdk-template/app/streamlit/utils/sql_utils.py new file mode 100644 index 0000000..c6f67d1 --- /dev/null +++ b/templates/connectors-native-sdk-template/app/streamlit/utils/sql_utils.py @@ -0,0 +1,34 @@ +# Copyright (c) 2024 Snowflake Inc. 
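+# Helpers for building procedure argument literals and calling connector procedures
+# through the active Snowpark session.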
+ +import json +from native_sdk_api.connector_response import ConnectorResponse +from snowflake.snowpark.context import get_active_session + + +def select_all_from(table_name): + return get_active_session().sql(f"SELECT * FROM {table_name}").collect() + + +def call_procedure(procedure_name: str, arguments: list = ()) -> ConnectorResponse: + response = ( + get_active_session() + .sql(f"CALL {procedure_name}({__to_procedure_input(arguments)})") + .collect()[0][0] + ) + return ConnectorResponse(response) + + +def varchar_argument(argument: str): + return f"'{argument}'" + + +def variant_argument(arguments: dict): + return "PARSE_JSON('" + json.dumps(arguments) + "')" + + +def variant_list_argument(arguments: list): + return "PARSE_JSON('" + json.dumps(arguments) + "')" + + +def __to_procedure_input(arguments: list): + return ", ".join(arguments) diff --git a/templates/connectors-native-sdk-template/app/streamlit/utils/ui_utils.py b/templates/connectors-native-sdk-template/app/streamlit/utils/ui_utils.py new file mode 100644 index 0000000..03c6b0f --- /dev/null +++ b/templates/connectors-native-sdk-template/app/streamlit/utils/ui_utils.py @@ -0,0 +1,20 @@ +# Copyright (c) 2024 Snowflake Inc. + +import streamlit as st +from native_sdk_api.connector_response import ConnectorResponse + + +def show_vertical_space(empty_lines: int): + for _ in range(empty_lines): + st.write("") + + +def show_error(error: str): + st.error(error) + + +def show_error_response(response: ConnectorResponse, called_procedure: str): + st.error( + f"An error response with code {response.get_response_code()} has been returned by " + f"{called_procedure}: {response.get_message()}" + ) diff --git a/templates/connectors-native-sdk-template/app/streamlit/wizard/connection_config.py b/templates/connectors-native-sdk-template/app/streamlit/wizard/connection_config.py new file mode 100644 index 0000000..c523f9d --- /dev/null +++ b/templates/connectors-native-sdk-template/app/streamlit/wizard/connection_config.py @@ -0,0 +1,77 @@ +# Copyright (c) 2024 Snowflake Inc. + +import streamlit as st +import wizard.wizard_step as ws +from utils.ui_utils import show_vertical_space, show_error, show_error_response +from native_sdk_api.connector_status import is_connection_configured +from native_sdk_api.connection_config import ( + set_connection_configuration, + get_connection_configuration +) + + +def connection_config_page(): + load_current_config() + + st.header("Connect to External API") + st.caption( + "To setup the connection to the source system you might need to provide some additional configuration. " + "If you need external access integration or some secret they have to be created in the account and then specified here." + ) + st.caption("For more information please check the [documentation](https://other-docs.snowflake.com/LIMITEDACCESS/connector-sdk/flow/connection_configuration) and the [reference](https://other-docs.snowflake.com/LIMITEDACCESS/connector-sdk/reference/connection_configuration_reference)") + st.divider() + + st.subheader("Custom connection configuration parameter") + input_col, _ = st.columns([2, 1]) + with input_col: + st.text_input("", key="custom_connection_property", label_visibility="collapsed") + st.caption( + "Here you can specify some additional connection parameters like secrets, external access integrations etc. " + "This configuration step will save them and test the connection to the source system. " + "However, this connection check and any additional logic have to be implemented first. 
" + "To implement those classes look for the following comments in the Java code of the application" + ) + st.caption("- IMPLEMENT ME test connection") + st.caption("- IMPLEMENT ME connection configuration validate") + st.caption("- IMPLEMENT ME connection callback") + # TODO: Additional configuration properties can be added to the UI like this: + # st.subheader("Additional connection parameter") + # input_col, _ = st.columns([2, 1]) + # with input_col: + # st.text_input("", key="additional_connection_property", label_visibility="collapsed") + # st.caption( + # "Some description of the additional property" + # ) + show_vertical_space(3) + + _, btn_col = st.columns([3.45, 0.55]) + with btn_col: + st.button( + "Connect", + on_click=finish_config, + type="primary" + ) + + +def load_current_config(): + if is_connection_configured(): + current_config = get_connection_configuration() + + if not st.session_state.get("custom_connection_property"): + st.session_state["custom_connection_property"] = current_config.get("custom_connection_property", "") + + +def finish_config(): + try: + # TODO: If some additional properties were specified they need to be passed to the set_connection_configuration function. + # The properties can also be validated, for example, check whether they are not blank strings etc. + response = set_connection_configuration( + custom_connection_property=st.session_state["custom_connection_property"], + ) + + if response.is_ok(): + ws.change_step(ws.FINALIZE_CONFIG) + else: + show_error_response(response=response, called_procedure="SET_CONNECTION_CONFIGURATION") + except: + show_error("Unexpected error occurred, correct the provided data and try again") diff --git a/templates/connectors-native-sdk-template/app/streamlit/wizard/connector_config.py b/templates/connectors-native-sdk-template/app/streamlit/wizard/connector_config.py new file mode 100644 index 0000000..54ab70d --- /dev/null +++ b/templates/connectors-native-sdk-template/app/streamlit/wizard/connector_config.py @@ -0,0 +1,141 @@ +# Copyright (c) 2024 Snowflake Inc. + +import streamlit as st +import wizard.wizard_step as ws +from utils.sf_utils import validate_identifier +from utils.ui_utils import show_vertical_space, show_error, show_error_response +from native_sdk_api.connector_status import is_connector_configured +from native_sdk_api.connector_config import configure_connector, get_connector_configuration +from utils.permission_sdk_utils import ( + get_missing_privileges, + get_warehouse_ref, + get_held_account_privileges, + request_required_privileges, + request_warehouse_ref +) + + +def connector_config_page(): + load_current_config() + + st.header("Configure connector") + st.caption( + "To complete the configuration and run the connector, the following objects will be " + "created in your Snowflake environment." 
+ ) + st.caption("More can be found in the [documentation](https://other-docs.snowflake.com/LIMITEDACCESS/connector-sdk/flow/connector_configuration) and the [reference](https://other-docs.snowflake.com/LIMITEDACCESS/connector-sdk/reference/connector_configuration_reference)") + st.divider() + + st.subheader("Application privileges") + input_col, btn_col = st.columns([2, 1]) + with input_col: + st.text_input( + "", + value=", ".join(get_held_account_privileges()), + key="app_privileges", + disabled=True, + label_visibility="collapsed", + ) + with btn_col: + st.button( + "Grant privileges", + disabled=(not bool(get_missing_privileges())), + on_click=request_required_privileges + ) + st.caption("Specified privileges must be granted to the application before it can be started. This can be modified in the `manifest.yml` file and all the missing account level privileges will be requested automatically") + show_vertical_space(3) + + st.subheader("Warehouse reference") + wh_ref = get_warehouse_ref() + input_col, btn_col = st.columns([2, 1]) + with input_col: + st.text_input( + "", + value=(wh_ref[0] if wh_ref else ""), + key="warehouse", + disabled=True, + label_visibility="collapsed" + ) + with btn_col: + st.button( + "Choose warehouse", + on_click=request_warehouse_ref + ) + st.caption( + "A reference to an existing warehouse must be provided via the popup or in the " + "Security tab of your Native App. The USAGE privilege on the warehouse must also " + "be granted. This warehouse will be used by all Snowflake tasks created within the connector." + ) + show_vertical_space(3) + + st.subheader("Destination database") + input_col, _ = st.columns([2, 1]) + with input_col: + st.text_input("", key="dest_db", label_visibility="collapsed") + st.caption( + "Name of the new database, which will be created to store ingested data. The database " + "must not already exist in your Snowflake account" + ) + show_vertical_space(3) + + st.subheader("Destination schema") + input_col, _ = st.columns([2, 1]) + with input_col: + st.text_input("", key="dest_schema", label_visibility="collapsed") + st.caption("Name of the new schema, which will be created to store ingested data") + # TODO: Here you can add additional fields in connector configuration. 
Supported values are the following: warehouse, operational_warehouse, data_owner_role, agent_role, agent_username + # For example: + # st.subheader("Operational warehouse") + # input_col, _ = st.columns([2, 1]) + # with input_col: + # st.text_input("", key="operational_warehouse", label_visibility="collapsed") + # st.caption("Name of the operational warehouse to be used") + st.divider() + + _, btn_col = st.columns([3.4, 0.6]) + with btn_col: + st.button( + "Configure", + on_click=finish_config, + type="primary" + ) + + +def load_current_config(): + if is_connector_configured(): + current_config = get_connector_configuration() + + if not st.session_state.get("dest_db"): + st.session_state["dest_db"] = current_config.get("destination_database", "") + if not st.session_state.get("dest_schema"): + st.session_state["dest_schema"] = current_config.get("destination_schema", "") + + +def finish_config(): + if get_missing_privileges(): + show_error("Required privileges were not granted to the application") + return + if not get_warehouse_ref(): + show_error("Warehouse reference was not set in the application") + return + if not validate_identifier(st.session_state["dest_db"]): + show_error("Invalid identifier provided for the destination database") + return + if not validate_identifier(st.session_state["dest_schema"]): + show_error("Invalid identifier provided for the destination schema") + return + + try: + # TODO: If some additional properties were added they need to be passed to the configure_connector function + response = configure_connector( + dest_db=st.session_state["dest_db"], + # operational_warehouse = st.session_state["operational_warehouse"], + dest_schema=st.session_state["dest_schema"] + ) + + if response.is_ok(): + ws.change_step(ws.CONNECTION_CONFIG) + else: + show_error_response(response=response, called_procedure="CONFIGURE_CONNECTOR") + except: + show_error("Unexpected error occurred, correct the provided data and try again") diff --git a/templates/connectors-native-sdk-template/app/streamlit/wizard/finalize_config.py b/templates/connectors-native-sdk-template/app/streamlit/wizard/finalize_config.py new file mode 100644 index 0000000..1f90fdd --- /dev/null +++ b/templates/connectors-native-sdk-template/app/streamlit/wizard/finalize_config.py @@ -0,0 +1,71 @@ +# Copyright (c) 2024 Snowflake Inc. + +import streamlit as st +import wizard.wizard_step as ws +from utils.ui_utils import show_vertical_space, show_error +from native_sdk_api.finalize_config import finalize_connector_configuration + + +def finalize_config_page(): + st.header("Finalize connector configuration") + st.caption( + "Finalisation step allows the user to provide some additional properties needed by the connector. " + "Those properties are not saved by default in the database " + "and this behavior needs to be implemented in the internal callback if required. " + "Additionally, this step performs more sophisticated validations on the source system if needed. " + "Another responsibility of this step is to prepare sink database for the ingested date, " + "any other needed database entities and start task reactor instances and scheduler. 
" + ) + st.caption("For more information please check the [documentation](https://other-docs.snowflake.com/LIMITEDACCESS/connector-sdk/flow/finalize_configuration) and the [reference](https://other-docs.snowflake.com/LIMITEDACCESS/connector-sdk/reference/finalize_configuration_reference)") + st.divider() + + st.subheader("Custom property") + input_col, _ = st.columns([2, 1]) + with input_col: + st.text_input( + "", + key="custom_property", + label_visibility="collapsed", + value="some_value" + ) + st.caption( + "Custom property for finalization, if you want to use it somehow, it needs to be implemented." + "For custom implementations search for the following:" + ) + st.caption("- IMPLEMENT ME finalize internal") + st.caption("- IMPLEMENT ME validate source") + # TODO: Here you can add additional fields in finalize connector configuration. + # For example: + # st.subheader("Some additional property") + # input_col, _ = st.columns([2, 1]) + # with input_col: + # st.text_input("", key="some_additional_property", label_visibility="collapsed") + # st.caption("Description of some new additional property") + show_vertical_space(3) + st.divider() + + if st.session_state.get("show_main_error"): + st.error(st.session_state.get("error_msg")) + _, finalize_btn_col = st.columns([2.95, 1.05]) + with finalize_btn_col: + st.button( + "Finalize configuration", + on_click=finalize_configuration, + type="primary" + ) + + +def finalize_configuration(): + try: + st.session_state["show_main_error"] = False + # TODO: If some additional properties were introduced, they need to be passed to the finalize_connector_configuration function. + response = finalize_connector_configuration(st.session_state.get("custom_property")) + if response.is_ok(): + ws.change_step(ws.FINALIZE_CONFIG) + else: + st.session_state["error_msg"] = f"An error response with code {response.get_response_code()} " \ + f"has been returned by FINALIZE_CONNECTOR_CONFIGURATION: " \ + f"{response.get_message()}" + st.session_state["show_main_error"] = True + except: + show_error("Unexpected error occurred, correct the provided data and try again") diff --git a/templates/connectors-native-sdk-template/app/streamlit/wizard/prerequisites.py b/templates/connectors-native-sdk-template/app/streamlit/wizard/prerequisites.py new file mode 100644 index 0000000..cb9483b --- /dev/null +++ b/templates/connectors-native-sdk-template/app/streamlit/wizard/prerequisites.py @@ -0,0 +1,96 @@ +# Copyright (c) 2024 Snowflake Inc. + +import streamlit as st +import wizard.wizard_step as ws +from utils.ui_utils import show_error, show_error_response +from native_sdk_api.prerequisites import ( + fetch_prerequisites, + update_prerequisite, + mark_all_prerequisites_as_done, + complete_prerequisite_step, + Prerequisite +) + + +def prerequisites_page(): + prerequisites = fetch_prerequisites() + + st.header("Prerequisites") + st.caption( + "Before starting the actual connector configuration process, make sure that you meet all " + "the prerequisites listed below. You don't need to complete this step before starting the " + "configuration, but it will help in making you sure that you are ready to start the " + "configuration process." 
+ ) + st.caption("For more information on the prerequisites check the [documentation](https://other-docs.snowflake.com/LIMITEDACCESS/connector-sdk/flow/prerequisites) and the [reference](https://other-docs.snowflake.com/LIMITEDACCESS/connector-sdk/reference/prerequisites_reference)") + st.divider() + + for prerequisite in prerequisites: + single_prerequisite_layout(prerequisite) + + _, mark_btn_col, conf_btn_call = st.columns([2.6, 1.25, 1.15]) + with mark_btn_col: + st.button( + "Mark all as done", + on_click=call_mark_all_prerequisites_as_done, + key="mark_all_as_done_btn", + + ) + with conf_btn_call: + st.button( + "Start configuration", + on_click=call_complete_prerequisites_step, + key="complete_prerequisites_step_btn", + type="primary" + ) + + +def single_prerequisite_layout(prerequisite: Prerequisite): + desc_col, check_col = st.columns([9, 2]) + with desc_col: + st.subheader(prerequisite.get_title()) + st.caption(f"{prerequisite.get_description()}") + if prerequisite.get_documentation_url(): + st.markdown(f"[Learn more]({prerequisite.get_documentation_url()})") + with check_col: + checkbox_key = f"prerequisite_checkbox_{prerequisite.get_prerequisite_id()}" + st.text("Completed:") + st.checkbox( + "", + value=prerequisite.get_is_completed(), + key=checkbox_key, + label_visibility="hidden", + on_change=call_update_prerequisite, + args=( + checkbox_key, + prerequisite.get_prerequisite_id(), + prerequisite.get_is_completed() + ) + ) + st.divider() + + +def call_update_prerequisite(checkbox_key: str, prerequisite_id: str, is_completed: bool): + checkbox_state = st.session_state.get(checkbox_key) + if not checkbox_state == is_completed: + response = update_prerequisite(prerequisite_id, checkbox_state) + if not response.is_ok(): + show_error_response(response=response, called_procedure="UPDATE_PREREQUISITE") + + +def call_mark_all_prerequisites_as_done(): + response = mark_all_prerequisites_as_done() + if not response.is_ok(): + show_error_response(response=response, called_procedure="MARK_ALL_PREREQUISITES_AS_DONE") + + +def call_complete_prerequisites_step(): + try: + response = complete_prerequisite_step() + + if response.is_ok(): + ws.change_step(ws.CONNECTOR_CONFIG) + else: + show_error_response(response=response, called_procedure="COMPLETE_PREREQUISITES_STEP") + except: + show_error("Unexpected error occurred, correct the provided data and try again") diff --git a/templates/connectors-native-sdk-template/app/streamlit/wizard/wizard_layout.py b/templates/connectors-native-sdk-template/app/streamlit/wizard/wizard_layout.py new file mode 100644 index 0000000..00db9ba --- /dev/null +++ b/templates/connectors-native-sdk-template/app/streamlit/wizard/wizard_layout.py @@ -0,0 +1,46 @@ +# Copyright (c) 2024 Snowflake Inc. 
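+# Renders the wizard sidebar and dispatches to the page of the currently selected
+# configuration step.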
+ +import streamlit as st +import wizard.wizard_step as ws +from utils.ui_utils import show_error +from wizard.connection_config import connection_config_page +from wizard.connector_config import connector_config_page +from wizard.finalize_config import finalize_config_page +from wizard.prerequisites import prerequisites_page + + +def wizard_page(): + step = ws.get_current_step() + wizard_sidebar(step) + + if step == ws.PREREQUISITES: + prerequisites_page() + elif step == ws.CONNECTOR_CONFIG: + connector_config_page() + elif step == ws.CONNECTION_CONFIG: + connection_config_page() + elif step == ws.FINALIZE_CONFIG: + finalize_config_page() + else: + show_error("Unknown step for the configuration wizard") + + +def wizard_sidebar(step: ws.WizardStep): + with st.sidebar: + st.header("Configuration steps") + + prepare_step_button(step, ws.PREREQUISITES) + prepare_step_button(step, ws.CONNECTOR_CONFIG) + prepare_step_button(step, ws.CONNECTION_CONFIG) + prepare_step_button(step, ws.FINALIZE_CONFIG) + + +def prepare_step_button(current_step: ws.WizardStep, target_step: ws.WizardStep): + if current_step == target_step: + button_label = f":blue[{target_step.get_sidebar_label()}]" + else: + button_label = target_step.get_sidebar_label() + + is_disabled = not target_step.is_current_status_allowed() + if st.button(button_label, disabled=is_disabled, use_container_width=True): + ws.change_step(target_step, True) diff --git a/templates/connectors-native-sdk-template/app/streamlit/wizard/wizard_step.py b/templates/connectors-native-sdk-template/app/streamlit/wizard/wizard_step.py new file mode 100644 index 0000000..837e863 --- /dev/null +++ b/templates/connectors-native-sdk-template/app/streamlit/wizard/wizard_step.py @@ -0,0 +1,77 @@ +# Copyright (c) 2024 Snowflake Inc. + +import streamlit as st +from utils.ui_utils import show_error +from native_sdk_api.connector_status import get_configuration_status, ConfigurationStatus + + +class WizardStep: + def __init__(self, sidebar_label: str, allowed_statuses: list): + self._sidebar_label: str = sidebar_label + self._allowed_statuses: list = allowed_statuses + + def get_sidebar_label(self): + return self._sidebar_label + + def is_current_status_allowed(self): + return st.session_state["configuration_status"] in self._allowed_statuses + + +PREREQUISITES = WizardStep( + sidebar_label="1\. Prerequisites", + allowed_statuses=[ + ConfigurationStatus.INSTALLED, + ConfigurationStatus.PREREQUISITES_DONE, + ConfigurationStatus.CONFIGURED, + ConfigurationStatus.CONNECTED + ] +) + +CONNECTOR_CONFIG = WizardStep( + sidebar_label="2\. Connector configuration", + allowed_statuses=[ + ConfigurationStatus.INSTALLED, + ConfigurationStatus.PREREQUISITES_DONE, + ConfigurationStatus.CONFIGURED, + ConfigurationStatus.CONNECTED + ] +) + +CONNECTION_CONFIG = WizardStep( + sidebar_label="3\. Connection configuration", + allowed_statuses=[ConfigurationStatus.CONFIGURED, ConfigurationStatus.CONNECTED] +) + +FINALIZE_CONFIG = WizardStep( + sidebar_label="4\. 
Finalize configuration", + allowed_statuses=[ConfigurationStatus.CONNECTED] +) + +__CONFIG_STATUS_TO_WIZARD_STEP = { + ConfigurationStatus.INSTALLED: PREREQUISITES, + ConfigurationStatus.PREREQUISITES_DONE: CONNECTOR_CONFIG, + ConfigurationStatus.CONFIGURED: CONNECTION_CONFIG, + ConfigurationStatus.CONNECTED: FINALIZE_CONFIG +} + + +def determine_wizard_step(): + status = get_configuration_status() + determined_step = __CONFIG_STATUS_TO_WIZARD_STEP.get(status) + + if determined_step is not None: + return determined_step + else: + show_error(f"Configuration status {status} should not trigger the configuration wizard") + + +def get_current_step(): + if "wizard_step" not in st.session_state: + st.session_state["wizard_step"] = determine_wizard_step() + return st.session_state["wizard_step"] + + +def change_step(new_step: WizardStep, rerun: bool = False): + st.session_state["wizard_step"] = new_step + if rerun: + st.experimental_rerun() diff --git a/templates/connectors-native-sdk-template/build.gradle b/templates/connectors-native-sdk-template/build.gradle new file mode 100644 index 0000000..426ded9 --- /dev/null +++ b/templates/connectors-native-sdk-template/build.gradle @@ -0,0 +1,462 @@ +plugins { + id 'java' +} + +group = 'com.snowflake' +version = '1.0-SNAPSHOT' + +repositories { + mavenCentral() + mavenLocal() +} + +dependencies { + compileOnly 'com.snowflake:connectors-native-sdk:2.0.0' + + testImplementation platform('org.junit:junit-bom:5.9.1') + testImplementation 'org.junit.jupiter:junit-jupiter' + testImplementation 'com.snowflake:connectors-native-sdk-test:2.0.0' +} + +javadoc { + options { + links( + 'https://docs.oracle.com/en/java/javase/11/docs/api/', + 'https://docs.snowflake.com/developer-guide/snowpark/reference/java/', + 'https://www.javadoc.io/doc/com.fasterxml.jackson.core/jackson-databind/latest/' + ) + } +} + +test { + useJUnitPlatform() +} + +/** + * Copyright (c) 2024 Snowflake Inc. + * + * ********************************************** + * CONNECTOR LIFECYCLE TASKS + * ********************************************** + */ +import java.util.function.Supplier + +String defaultBuildDir = './sf_build' +String defaultSrcDir = './app' +String libraryName = 'connectors-native-sdk' +String defaultArtifactName = "${project.name}.jar" +String sdkComponentsDirName = 'native-connectors-sdk-components' + +project.tasks.register("copyInternalComponents") { + it.group = 'Snowflake' + it.description = "Copies jar artifact and all files from directory that contains internal custom connector components to connector build directory." + doLast() { + copyInternalComponents(defaultSrcDir, defaultArtifactName, defaultBuildDir) + } +} + +project.tasks.register('copySdkComponents') { + it.group = 'Snowflake' + it.description = "Copies .sql files from ${sdkComponentsDirName} directory to the connector build file." + doLast { + copySdkComponents(libraryName, defaultBuildDir, sdkComponentsDirName) + } +} + +project.tasks.register('prepareAppPackage') { + it.group = 'Snowflake' + it.description = 'Creates APPLICATION PACKAGE, SCHEMA and STAGE for connector artifacts deployment.' + doLast { + prepareAppPackage() + } +} + +project.tasks.register('deployConnector') { + it.group = 'Snowflake' + it.description = 'Put all files from given directory to the chosen stage with usage of snowsql tool.' + doLast { + deployConnector(defaultBuildDir) + } +} + +project.tasks.register('createNewVersion') { + it.group = 'Snowflake' + it.description = 'Creates new application VERSION from given version directory.' 
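+    // Example invocation (values mirror the defaults used in the Makefile of this template):
+    //   ./gradlew createNewVersion -Pconnection=native_sdk_connection -PappPackage=<app package name> \
+    //     -PversionDirPath=<version directory path on the stage> -PappVersion=1_0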
+ doLast { + createNewVersion() + } +} + +project.tasks.register('createAppInstance') { + it.group = 'Snowflake' + it.description = 'Creates new APPLICATION INSTANCE from given VERSION.' + doLast { + createAppInstance() + } +} + +/* +* ********************************************** +* TASK MAIN LOGIC +* ********************************************** +*/ + +private void copyInternalComponents(String defaultSrcDir, String defaultArtifactName, String defaultBuildDir) { + TaskLogger.info("Starting 'copyInternalComponents' task...") + def localSrcDir = getCommandArgument('srcDir', {defaultSrcDir}) + def artifact = getCommandArgument('artifact', {defaultArtifactName}) + def targetDir = getCommandArgument('targetDir', {defaultBuildDir}) + + Utils.isDirectoryOrExit(localSrcDir) + buildLocalJavaArtifact() + project.copy { + String originalArtifactName = "${project.name}-${project.version}.jar" + TaskLogger.info("Copying jar artifact [$originalArtifactName] of local project to [${targetDir}] as [$artifact].") + from layout.projectDirectory.file("build/libs/${originalArtifactName}") + into layout.projectDirectory.dir(targetDir.replace("./", "")) + rename ("^.*${project.name}-${project.version}.jar.*\$", artifact) + } + project.copy { + TaskLogger.info("Copying all files from local source directory [${defaultSrcDir}] to connector build directory [$targetDir].") + from layout.projectDirectory.dir(localSrcDir.replace("./", "")) + into layout.projectDirectory.dir(targetDir.replace("./", "")) + } + TaskLogger.success("Local projects' jar artifact and all files from [${localSrcDir}] copied to [$defaultBuildDir] directory.") +} + +private void copySdkComponents(String libraryName, String defaultBuildDir, String sdkComponentsDirName) { + TaskLogger.info("Starting 'copySdkComponents' task...") + def targetDir = getCommandArgument('targetDir', {defaultBuildDir}) + + try { + project.copy { + TaskLogger.info("Copying [${sdkComponentsDirName}] directory with .sql files to '${targetDir}'") + from project.zipTree(project.configurations.compileClasspath.find { + it.name.startsWith(libraryName)}) + into targetDir + include "${sdkComponentsDirName}/**" + } + } catch (IllegalArgumentException e) { + Utils.exitWithErrorLog("Unable to find [${libraryName}] in the compile classpath. 
Make sure that the library is " + + "published to Maven local repository and the proper dependency is added to the build.gradle file.") + } + project.copy { + TaskLogger.info("Copying [${libraryName}] jar file to [${targetDir}]") + from configurations.compileClasspath.find { + it.name.startsWith(libraryName) + } + into targetDir + rename ("^.*${libraryName}.*\$", "${libraryName}.jar") + } + TaskLogger.success("Copying sdk components finished successfully.") +} + +private void prepareAppPackage() { + TaskLogger.info("Starting 'prepareAppPackage' task...") + def appPackage = getCommandArgument("appPackage") + def schema = getCommandArgument("schema") + def stage = getCommandArgument("stage") + def connection = getCommandArgument("connection") + + SnowsqlQueryScript script = SnowsqlQueryScript.createScript(connection) + TaskLogger.info("Starting to create a new APPLICATION PACKAGE, SCHEMA and STAGE.") + script.addQuery("CREATE APPLICATION PACKAGE IF NOT EXISTS ${appPackage} DISTRIBUTION = INTERNAL;") + script.addQuery("CREATE SCHEMA ${appPackage}.${schema};") + script.addQuery("CREATE STAGE ${appPackage}.${schema}.${stage};") + script.executeAndDelete() + TaskLogger.info("Stage [${appPackage}.${schema}.${stage}] created in app package [${appPackage}]") + TaskLogger.success("Application package prepared for deployment successfully.") +} + +private void deployConnector(String defaultBuildDir) { + TaskLogger.info("Starting 'deployConnector' task...") + def buildDirPath = getCommandArgument("buildDirPath", {defaultBuildDir}) + def connection = getCommandArgument("connection") + def appPackage = getCommandArgument("appPackage") + def schema = getCommandArgument("schema") + def stage = getCommandArgument("stage") + def appVersion = getCommandArgument("appVersion", {getVersionFromManifest(buildDirPath)}) + + SnowsqlQueryScript queriesScript = SnowsqlQueryScript.createScript(connection) + File buildDir = new File(buildDirPath) + if (buildDir.exists() && buildDir.isDirectory()) { + TaskLogger.info("Discovering files to be deployed in [$buildDirPath] directory.") + preparePutFileQueries(buildDir, queriesScript, appPackage, schema, stage, appVersion, buildDirPath) + } else { + Utils.exitWithErrorLog("File [${buildDir}] does not exist or is not a directory.") + } + TaskLogger.info("Putting discovered files to [${appVersion}] directory on [${appPackage}.${schema}.${stage}] stage.") + queriesScript.executeAndDelete() + TaskLogger.success("Connector deployment process finished successfully. 
" + + "Version directory can be found at [@${appPackage}.${schema}.${stage}/${appVersion}].") +} + +private void createNewVersion() { + TaskLogger.info("Starting 'createAppInstance' task...") + def connection = getCommandArgument("connection") + def versionDirPath = getCommandArgument("versionDirPath") + def appVersion = getCommandArgument("appVersion") + def appPackage = getCommandArgument("appPackage") + + TaskLogger.info("Creating new application version [${appVersion}] from version directory [${versionDirPath}].") + String query = """ALTER APPLICATION PACKAGE ${appPackage} + ADD VERSION "${appVersion}" + USING '${versionDirPath}';""" + Process command = new ProcessBuilder(["snowsql", "-c", connection, "-q", query]).redirectErrorStream(true).start() + Utils.executeStandardSnowsqlCommand(command) + TaskLogger.success("Version [${appVersion}] cretead successfully.") +} + +private void createAppInstance() { + TaskLogger.info("Starting 'createAppInstance' task...") + def connection = getCommandArgument("connection") + def appPackage = getCommandArgument("appPackage") + def instanceNameSuffix = getCommandArgument("instanceNameSuffix", {""}) + def versionDirPath = java.util.Optional.ofNullable(getCommandArgument("versionDirPath", {null})) + def appVersion = java.util.Optional.ofNullable(getCommandArgument("appVersion", {null})) + def instanceName = getCommandArgument("instanceName", {appPackage+(instanceNameSuffix.isBlank() ? "_INSTANCE" : "_${instanceNameSuffix}")}) + + String query = createFromAppVersionOrVersionDir(instanceName, appPackage, versionDirPath, appVersion) + Process command = new ProcessBuilder("snowsql", "-c", connection, "-q", query).redirectErrorStream(true).start() + Utils.executeStandardSnowsqlCommand(command) + TaskLogger.success("Application instance [${instanceName}] created successfully.") +} + +/* +* ********************************************** +* TASK UTILS +* ********************************************** +*/ + +private String createFromAppVersionOrVersionDir(String instanceName, String appPackage, java.util.Optional versionDirPath, java.util.Optional appVersion) { + String createFromAppPackageQuery = """CREATE APPLICATION ${instanceName} + FROM APPLICATION PACKAGE ${appPackage} + USING ${versionDirPath.orElse("")};""" + String createFromAppVersionQuery = """CREATE APPLICATION ${instanceName} + FROM APPLICATION PACKAGE ${appPackage} + USING VERSION "${appVersion.orElse("")}";""" + String infoLog + String query + + versionDirPath.ifPresent(dirPath -> + { + infoLog = "Creating APPLICATION instance from version directory [${dirPath}.]" + query = createFromAppPackageQuery + }) + appVersion.ifPresent(version -> { + infoLog = "Creating APPLICATION instance from VERSION [${version}.]" + query = createFromAppVersionQuery + }) + if (infoLog ==null && query == null) { + Utils.exitWithErrorLog("Neither [appVersion] nor [versionDirPath] parameter was specified.") + } + TaskLogger.info(infoLog) + return query +} + +private void preparePutFileQueries(File dirToSearch, SnowsqlQueryScript script, String appPackage, String schema, String stage, String appVersion, String buildDirPath) { + def filesInside = dirToSearch.listFiles().toList() + filesInside.each { + if (it.isDirectory()) { + preparePutFileQueries(it, script, appPackage, schema, stage, appVersion, buildDirPath) + } else { + TaskLogger.info("Discovered file: [.${it.path-buildDirPath}].") + String query = """PUT file://${it.path.replace("./", "")} + 
@${appPackage}.${schema}.${stage}/${appVersion}${(it.path-buildDirPath).replace(it.name, "")} + AUTO_COMPRESS = FALSE + OVERWRITE = FALSE;""" + script.addQuery(query) + } + } +} + +private void buildLocalJavaArtifact() { + TaskLogger.info("Building local jar artifact from local project.") + var process = new ProcessBuilder(["./gradlew", "build"]).redirectErrorStream(true).start() + Utils.executeCommand(process, "BUILD FAILED in", {Utils.exitWithErrorLog("Gradle build failed. Cannot create a jar artifact.")}) +} + +private String getCommandArgument(String propertyName) { + if (project.hasProperty(propertyName)) { + return project.property(propertyName).toString() + } + Utils.exitWithErrorLog("Parameter '${propertyName}' is required. Add [-P${propertyName}=\"\"] to the command.") +} + +private String getCommandArgument(String propertyName, Supplier defaultValue) { + return project.hasProperty(propertyName) ? + project.property(propertyName) : defaultValue.get() +} + +private String getVersionFromManifest(String buildDirPath) { + TaskLogger.info("No value assigned to 'appVersion' parameter. Fetching version from manifest file.") + File manifestFile = findManifestInDir(buildDirPath) + def lines = manifestFile.readLines() + String appVersion + try { + appVersion = lines.subList(lines.indexOf("version:"), lines.size()) + .findAll {it.contains("name:")} + .first() + .with {it.substring(it.indexOf('"')+1, it.lastIndexOf('"'))} + } catch(IndexOutOfBoundsException e) { + Utils.exitWithErrorLog("'version:' property not found in the manifest file.") + } catch(NoSuchElementException e) { + Utils.exitWithErrorLog("'name:' key for 'version:' property not found in the manifest file.") + } + allowVersionFromManifestOrExit(appVersion) + return appVersion +} + +private void allowVersionFromManifestOrExit(String appVersion) { + String useVersionFromManifestDecision = Utils.getUserInput("Found version [${appVersion}] in the manifest file. Do you want to use this version? 
[y/n]") + switch (useVersionFromManifestDecision) { + case "y": + TaskLogger.info("Using version [$appVersion] from manifest file as a version directory name.") + break + case "n": + Utils.exitGracefully("Run the task once again and choose the version with [-PappVersion] parameter.") + break + default: + allowVersionFromManifestOrExit(appVersion) + } +} + +private File findManifestInDir(String buildDirPath) { + File buildDir = new File(buildDirPath) + Utils.isDirectoryOrExit(buildDirPath) + String manifestFilePath = buildDir.listFiles() + .toList() + .stream() + .filter {it.getName().contains("manifest.yml") || it.getName().contains("manifest.yaml")} + .map {it.getPath()} + .findFirst() + .orElseGet {Utils.exitWithErrorLog("Manifest file does not exist in [${buildDirPath}].")} + TaskLogger.info("Manifest file found in connector build directory [$manifestFilePath].") + return new File(manifestFilePath) +} + +class TaskLogger { + + private static String redText = "\u001B[31m" + private static String lightBlueText = "\u001B[96m" + private static String greenText = "\u001B[92m" + private static String blueText = "\u001B[36m" + private static String yellowText = "\u001B[93m" + private static String defaultText = "\u001B[0m" + + static void error(String log) { + println("${redText}[ERROR]: ${log}${defaultText}") + } + + static void info(String log) { + println("${lightBlueText}[INFO]: ${log}${defaultText}") + } + + static void success(String log) { + println("${greenText}[SUCCESS]: ${log}${defaultText}") + } + + static void external(String log) { + println("${blueText}[EXTERNAL_LOG]: ${log}${defaultText}") + } + + static void input(String log) { + println("${yellowText}[INPUT_REQUIRED]: ${log}${defaultText}") + } +} + +class SnowsqlQueryScript { + + private String connection + private File script + + private SnowsqlQueryScript(String connection, File script) { + this.connection = connection + this.script = script + } + + static SnowsqlQueryScript createScript(String connectionName) { + File queriesScript = new File("./queriesScript.sql") + queriesScript.delete() + queriesScript.createNewFile() + return new SnowsqlQueryScript(connectionName, queriesScript) + } + + void addQuery(String query) { + this.script.with { + it.append("${query}\n") + } + } + + void executeAndDelete() { + var process = new ProcessBuilder(["snowsql", "-c", this.connection, "-f", this.script.path]).redirectErrorStream(true).start() + String errorTriggerLine = "If the error message is unclear" + Utils.executeCommand(process, errorTriggerLine, + { + this.script.delete() + Utils.exitWithErrorLog("Encountered an error while using snowsql. 
Check snowsql logs in order to find error root cause.") + }) + deleteScript() + } + + void deleteScript() { + script.delete() + } +} + +class Utils { + + static void isDirectoryOrExit(String path) { + File buildDir = new File(path) + if (!buildDir.isDirectory() || !buildDir.exists()) { + exitWithErrorLog("File [${buildDir}] does not exist or is not a directory.") + } + } + + static void exitWithErrorLog(String log) { + TaskLogger.error(log) + TaskLogger.error("Task execution failed.") + throw new NativeSdkTaskException(log) + } + + static void exitGracefully(String log) { + TaskLogger.success(log) + throw new StopExecutionException() + } + + static List executeCommand(Process command, String errorLine, Runnable onErrorAction) { + var reader = new BufferedReader(new InputStreamReader(command.getInputStream())) + List commandOutput = new LinkedList<>() + String line + boolean encounteredError + while ((line = reader.readLine()) != null) { + TaskLogger.external(line) + commandOutput.add(line) + if (line.contains(errorLine)) { + encounteredError = true + } + } + if (encounteredError) { + onErrorAction.run() + } + return commandOutput + } + + static List executeStandardSnowsqlCommand(Process command) { + return executeCommand(command, + "If the error message is unclear", + {exitWithErrorLog("Encountered an error while using snowsql. Check snowsql logs in order to find error root cause.")}) + } + + static String getUserInput(String displayedMessage) { + Scanner scanner = new Scanner(System.in) + TaskLogger.input(displayedMessage) + return scanner.next() + } +} + +class NativeSdkTaskException extends RuntimeException { + NativeSdkTaskException(String message) { + super(message) + } +} diff --git a/templates/connectors-native-sdk-template/gradle/wrapper/gradle-wrapper.jar b/templates/connectors-native-sdk-template/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 0000000..d64cd49 Binary files /dev/null and b/templates/connectors-native-sdk-template/gradle/wrapper/gradle-wrapper.jar differ diff --git a/templates/connectors-native-sdk-template/gradle/wrapper/gradle-wrapper.properties b/templates/connectors-native-sdk-template/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 0000000..ebb5223 --- /dev/null +++ b/templates/connectors-native-sdk-template/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,6 @@ +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-8.3-bin.zip +validateDistributionUrl=true +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists diff --git a/templates/connectors-native-sdk-template/gradlew b/templates/connectors-native-sdk-template/gradlew new file mode 100755 index 0000000..1aa94a4 --- /dev/null +++ b/templates/connectors-native-sdk-template/gradlew @@ -0,0 +1,249 @@ +#!/bin/sh + +# +# Copyright © 2015-2021 the original authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +############################################################################## +# +# Gradle start up script for POSIX generated by Gradle. +# +# Important for running: +# +# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is +# noncompliant, but you have some other compliant shell such as ksh or +# bash, then to run this script, type that shell name before the whole +# command line, like: +# +# ksh Gradle +# +# Busybox and similar reduced shells will NOT work, because this script +# requires all of these POSIX shell features: +# * functions; +# * expansions «$var», «${var}», «${var:-default}», «${var+SET}», +# «${var#prefix}», «${var%suffix}», and «$( cmd )»; +# * compound commands having a testable exit status, especially «case»; +# * various built-in commands including «command», «set», and «ulimit». +# +# Important for patching: +# +# (2) This script targets any POSIX shell, so it avoids extensions provided +# by Bash, Ksh, etc; in particular arrays are avoided. +# +# The "traditional" practice of packing multiple parameters into a +# space-separated string is a well documented source of bugs and security +# problems, so this is (mostly) avoided, by progressively accumulating +# options in "$@", and eventually passing that to Java. +# +# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS, +# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly; +# see the in-line comments for details. +# +# There are tweaks for specific operating systems such as AIX, CygWin, +# Darwin, MinGW, and NonStop. +# +# (3) This script is generated from the Groovy template +# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# within the Gradle project. +# +# You can find Gradle at https://github.com/gradle/gradle/. +# +############################################################################## + +# Attempt to set APP_HOME + +# Resolve links: $0 may be a link +app_path=$0 + +# Need this for daisy-chained symlinks. +while + APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path + [ -h "$app_path" ] +do + ls=$( ls -ld "$app_path" ) + link=${ls#*' -> '} + case $link in #( + /*) app_path=$link ;; #( + *) app_path=$APP_HOME$link ;; + esac +done + +# This is normally unused +# shellcheck disable=SC2034 +APP_BASE_NAME=${0##*/} +# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036) +APP_HOME=$( cd "${APP_HOME:-./}" > /dev/null && pwd -P ) || exit + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD=maximum + +warn () { + echo "$*" +} >&2 + +die () { + echo + echo "$*" + echo + exit 1 +} >&2 + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "$( uname )" in #( + CYGWIN* ) cygwin=true ;; #( + Darwin* ) darwin=true ;; #( + MSYS* | MINGW* ) msys=true ;; #( + NONSTOP* ) nonstop=true ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD=$JAVA_HOME/jre/sh/java + else + JAVACMD=$JAVA_HOME/bin/java + fi + if [ ! 
-x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD=java + if ! command -v java >/dev/null 2>&1 + then + die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +fi + +# Increase the maximum file descriptors if we can. +if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then + case $MAX_FD in #( + max*) + # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC2039,SC3045 + MAX_FD=$( ulimit -H -n ) || + warn "Could not query maximum file descriptor limit" + esac + case $MAX_FD in #( + '' | soft) :;; #( + *) + # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC2039,SC3045 + ulimit -n "$MAX_FD" || + warn "Could not set maximum file descriptor limit to $MAX_FD" + esac +fi + +# Collect all arguments for the java command, stacking in reverse order: +# * args from the command line +# * the main class name +# * -classpath +# * -D...appname settings +# * --module-path (only if needed) +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables. + +# For Cygwin or MSYS, switch paths to Windows format before running java +if "$cygwin" || "$msys" ; then + APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) + CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" ) + + JAVACMD=$( cygpath --unix "$JAVACMD" ) + + # Now convert the arguments - kludge to limit ourselves to /bin/sh + for arg do + if + case $arg in #( + -*) false ;; # don't mess with options #( + /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath + [ -e "$t" ] ;; #( + *) false ;; + esac + then + arg=$( cygpath --path --ignore --mixed "$arg" ) + fi + # Roll the args list around exactly as many times as the number of + # args, so each arg winds up back in the position where it started, but + # possibly modified. + # + # NB: a `for` loop captures its iteration list before it begins, so + # changing the positional parameters here affects neither the number of + # iterations, nor the values presented in `arg`. + shift # remove old arg + set -- "$@" "$arg" # push replacement arg + done +fi + + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Collect all arguments for the java command: +# * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments, +# and any embedded shellness will be escaped. +# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be +# treated as '${Hostname}' itself on the command line. + +set -- \ + "-Dorg.gradle.appname=$APP_BASE_NAME" \ + -classpath "$CLASSPATH" \ + org.gradle.wrapper.GradleWrapperMain \ + "$@" + +# Stop when "xargs" is not available. +if ! command -v xargs >/dev/null 2>&1 +then + die "xargs is not available" +fi + +# Use "xargs" to parse quoted args. +# +# With -n1 it outputs one arg per line, with the quotes and backslashes removed. 
+# +# In Bash we could simply go: +# +# readarray ARGS < <( xargs -n1 <<<"$var" ) && +# set -- "${ARGS[@]}" "$@" +# +# but POSIX shell has neither arrays nor command substitution, so instead we +# post-process each arg (as a line of input to sed) to backslash-escape any +# character that might be a shell metacharacter, then use eval to reverse +# that process (while maintaining the separation between arguments), and wrap +# the whole thing up as a single "set" statement. +# +# This will of course break if any of these variables contains a newline or +# an unmatched quote. +# + +eval "set -- $( + printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" | + xargs -n1 | + sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' | + tr '\n' ' ' + )" '"$@"' + +exec "$JAVACMD" "$@" diff --git a/templates/connectors-native-sdk-template/gradlew.bat b/templates/connectors-native-sdk-template/gradlew.bat new file mode 100644 index 0000000..93e3f59 --- /dev/null +++ b/templates/connectors-native-sdk-template/gradlew.bat @@ -0,0 +1,92 @@ +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem + +@if "%DEBUG%"=="" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%"=="" set DIRNAME=. +@rem This is normally unused +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if %ERRORLEVEL% equ 0 goto execute + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto execute + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. 
+ +goto fail + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* + +:end +@rem End local scope for the variables with windows NT shell +if %ERRORLEVEL% equ 0 goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +set EXIT_CODE=%ERRORLEVEL% +if %EXIT_CODE% equ 0 set EXIT_CODE=1 +if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE% +exit /b %EXIT_CODE% + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/templates/connectors-native-sdk-template/settings.gradle b/templates/connectors-native-sdk-template/settings.gradle new file mode 100644 index 0000000..e2d2678 --- /dev/null +++ b/templates/connectors-native-sdk-template/settings.gradle @@ -0,0 +1 @@ +rootProject.name = 'connectors-native-sdk-template' diff --git a/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/ConnectorObjects.java b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/ConnectorObjects.java new file mode 100644 index 0000000..81509e3 --- /dev/null +++ b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/ConnectorObjects.java @@ -0,0 +1,33 @@ +/** Copyright (c) 2024 Snowflake Inc. */ +package com.snowflake.connectors.example; + +/** Simple constant aggregation class for various connector object names. */ +public class ConnectorObjects { + + /** Name of the table, where the ingested issues are stored. */ + public static final String DATA_TABLE = "DATA"; + + /** Name of the view, which provides structured issue data from the {@link #DATA_TABLE}. */ + public static final String DATA_VIEW = DATA_TABLE + "_VIEW"; + + /** Name of the task used by the scheduler system. */ + public static final String SCHEDULER_TASK = "SCHEDULER_TASK"; + + /** Name of the task reactor instance used by the connector. */ + public static final String TASK_REACTOR_INSTANCE = "EXAMPLE_CONNECTOR_TASK_REACTOR"; + + /** Name of the task reactor dispatcher task. */ + public static final String DISPATCHER_TASK = "DISPATCHER_TASK"; + + /** Name of the task reactor API schema. */ + public static final String TASK_REACTOR_SCHEMA = "TASK_REACTOR"; + + /** Name of the procedure used for task reactor instance initialization. */ + public static final String INITIALIZE_INSTANCE_PROCEDURE = "INITIALIZE_INSTANCE"; + + /** Name of the procedure used for task reactor worker number setting. */ + public static final String SET_WORKERS_NUMBER_PROCEDURE = "SET_WORKERS_NUMBER"; + + /** Name of the internal connector schema. */ + public static final String STATE_SCHEMA = "STATE"; +} diff --git a/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/configuration/connection/TemplateConnectionConfigurationCallback.java b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/configuration/connection/TemplateConnectionConfigurationCallback.java new file mode 100644 index 0000000..91bba3d --- /dev/null +++ b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/configuration/connection/TemplateConnectionConfigurationCallback.java @@ -0,0 +1,52 @@ +/** Copyright (c) 2024 Snowflake Inc. 
*/ +package com.snowflake.connectors.example.configuration.connection; + +import static java.lang.String.format; + +import com.snowflake.connectors.application.configuration.connection.ConnectionConfigurationCallback; +import com.snowflake.connectors.common.response.ConnectorResponse; +import com.snowflake.snowpark_java.Session; +import com.snowflake.snowpark_java.types.Variant; + +/** + * Custom implementation of {@link ConnectionConfigurationCallback}, used by the {@link + * TemplateConnectionConfigurationHandler}, providing external access configuration for the + * connector procedures. + */ +public class TemplateConnectionConfigurationCallback implements ConnectionConfigurationCallback { + + private final Session session; + + public TemplateConnectionConfigurationCallback(Session session) { + this.session = session; + } + + @Override + public ConnectorResponse execute(Variant config) { + // TODO: If you need to alter some procedures with external access you can use + // configureProcedure method or implement a similar method on your own. + // TODO: IMPLEMENT ME connection callback: Implement the custom logic of changes in application + // to be done after connection configuration, like altering procedures with external access. + // See more in docs: + // https://other-docs.snowflake.com/LIMITEDACCESS/connector-sdk/reference/connection_configuration_reference + // https://other-docs.snowflake.com/LIMITEDACCESS/connector-sdk/flow/connection_configuration + return ConnectorResponse.success( + "This method needs to be implemented. Search for 'IMPLEMENT ME connection callback'"); + } + + private void configureProcedure(String procedureName, Variant config) { + var configMap = config.asMap(); + + session + .sql( + format( + "ALTER PROCEDURE %s SET " + + "SECRETS=('%s' = %s) " + + "EXTERNAL_ACCESS_INTEGRATIONS=(%s)", + procedureName, + "secret variable name", + configMap.get("key from config with secret name").asString(), + configMap.get("key from config with external access name").asString())) + .collect(); + } +} diff --git a/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/configuration/connection/TemplateConnectionConfigurationHandler.java b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/configuration/connection/TemplateConnectionConfigurationHandler.java new file mode 100644 index 0000000..5a8f4df --- /dev/null +++ b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/configuration/connection/TemplateConnectionConfigurationHandler.java @@ -0,0 +1,28 @@ +/** Copyright (c) 2024 Snowflake Inc. */ +package com.snowflake.connectors.example.configuration.connection; + +import com.snowflake.connectors.application.configuration.connection.ConnectionConfigurationHandler; +import com.snowflake.snowpark_java.Session; +import com.snowflake.snowpark_java.types.Variant; + +/** + * Backend implementation for the custom {@code PUBLIC.SET_CONNECTION_CONFIGURATION} procedure, + * created using a custom implementation of {@link ConnectionConfigurationHandler}. + */ +public class TemplateConnectionConfigurationHandler { + + public static Variant setConnectionConfiguration(Session session, Variant config) { + // TODO: HINT: If you want to implement the interfaces yourself you need to provide them here to + // handler or specify your own handler. + // This method is referenced with full classpath from the `setup.sql` script. 
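    // Illustrative sketch of how setup.sql could wire a SQL procedure to this handler
    // (an assumption shown for orientation only; runtime, package and import options are omitted):
    //   CREATE OR REPLACE PROCEDURE PUBLIC.SET_CONNECTION_CONFIGURATION(connection_configuration VARIANT)
    //     RETURNS VARIANT
    //     LANGUAGE JAVA
    //     HANDLER = 'com.snowflake.connectors.example.configuration.connection.TemplateConnectionConfigurationHandler.setConnectionConfiguration';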
+ // See more in docs: + // https://other-docs.snowflake.com/LIMITEDACCESS/connector-sdk/reference/connection_configuration_reference + // https://other-docs.snowflake.com/LIMITEDACCESS/connector-sdk/flow/connection_configuration + var handler = + ConnectionConfigurationHandler.builder(session) + .withInputValidator(new TemplateConnectionConfigurationInputValidator()) + .withCallback(new TemplateConnectionConfigurationCallback(session)) + .build(); + return handler.setConnectionConfiguration(config).toVariant(); + } +} diff --git a/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/configuration/connection/TemplateConnectionConfigurationInputValidator.java b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/configuration/connection/TemplateConnectionConfigurationInputValidator.java new file mode 100644 index 0000000..fbb5014 --- /dev/null +++ b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/configuration/connection/TemplateConnectionConfigurationInputValidator.java @@ -0,0 +1,26 @@ +/** Copyright (c) 2024 Snowflake Inc. */ +package com.snowflake.connectors.example.configuration.connection; + +import com.snowflake.connectors.application.configuration.connection.ConnectionConfigurationInputValidator; +import com.snowflake.connectors.common.response.ConnectorResponse; +import com.snowflake.snowpark_java.types.Variant; + +/** + * Custom implementation of {@link ConnectionConfigurationInputValidator}, used by the {@link + * TemplateConnectionConfigurationHandler}. + */ +public class TemplateConnectionConfigurationInputValidator + implements ConnectionConfigurationInputValidator { + + @Override + public ConnectorResponse validate(Variant config) { + // TODO: IMPLEMENT ME connection configuration validate: If the connection configuration input + // requires some additional validation this is the place to implement this logic. + // See more in docs: + // https://other-docs.snowflake.com/LIMITEDACCESS/connector-sdk/reference/connection_configuration_reference + // https://other-docs.snowflake.com/LIMITEDACCESS/connector-sdk/flow/connection_configuration + return ConnectorResponse.success( + "This method needs to be implemented. Search for 'IMPLEMENT ME connection configuration" + + " validate'"); + } +} diff --git a/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/configuration/connection/TemplateConnectionValidator.java b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/configuration/connection/TemplateConnectionValidator.java new file mode 100644 index 0000000..9a42c5f --- /dev/null +++ b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/configuration/connection/TemplateConnectionValidator.java @@ -0,0 +1,34 @@ +/** Copyright (c) 2024 Snowflake Inc. */ +package com.snowflake.connectors.example.configuration.connection; + +import com.snowflake.connectors.application.configuration.connection.ConnectionConfigurationHandler; +import com.snowflake.connectors.common.response.ConnectorResponse; +import com.snowflake.snowpark_java.Session; +import com.snowflake.snowpark_java.types.Variant; + +/** + * Backend implementation for the custom {@code PUBLIC.TEST_CONNECTION} procedure, used by the + * {@link ConnectionConfigurationHandler} for initial external connection testing. + * + *

For this procedure to work - it must have been altered by the {@link + * TemplateConnectionConfigurationCallback} first. + */ +public class TemplateConnectionValidator { + + public static Variant testConnection(Session session) { + return testConnection().toVariant(); + } + + private static ConnectorResponse testConnection() { + // TODO: IMPLEMENT ME test connection: Implement the custom logic of testing the connection to + // the source system here. This usually requires connection to some webservice or other external + // system. It is suggested to perform only the basic connectivity validation here. + // If that's the case then this procedure must be altered in + // TemplateConnectionConfigurationCallback first. + // See more in docs: + // https://other-docs.snowflake.com/LIMITEDACCESS/connector-sdk/reference/connection_configuration_reference + // https://other-docs.snowflake.com/LIMITEDACCESS/connector-sdk/flow/connection_configuration + return ConnectorResponse.success( + "This method needs to be implemented. Search for 'IMPLEMENT ME test connection'"); + } +} diff --git a/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/configuration/finalize/TemplateAccessValidator.java b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/configuration/finalize/TemplateAccessValidator.java new file mode 100644 index 0000000..b512876 --- /dev/null +++ b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/configuration/finalize/TemplateAccessValidator.java @@ -0,0 +1,28 @@ +/** Copyright (c) 2024 Snowflake Inc. */ +package com.snowflake.connectors.example.configuration.finalize; + +import com.snowflake.connectors.application.configuration.finalization.SourceValidator; +import com.snowflake.connectors.common.response.ConnectorResponse; +import com.snowflake.snowpark_java.types.Variant; + +/** + * Custom implementation of {@link SourceValidator}, used by the {@link + * TemplateFinalizeConnectorConfigurationCustomHandler}, providing final validation external source + * system. + */ +public class TemplateAccessValidator implements SourceValidator { + + @Override + public ConnectorResponse validate(Variant variant) { + // TODO: IMPLEMENT ME validate source: Implement the custom logic of validating the source + // system. In some cases this can be the same validation that happened in + // TemplateConnectionValidator. + // However, it is suggested to perform more complex validations, like specific access rights to + // some specific resources here. + // See more in docs: + // https://other-docs.snowflake.com/LIMITEDACCESS/connector-sdk/reference/finalize_configuration_reference + // https://other-docs.snowflake.com/LIMITEDACCESS/connector-sdk/flow/finalize_configuration + return ConnectorResponse.success( + "This method needs to be implemented. 
Search for IMPLEMENT ME validate source"); + } +} diff --git a/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/configuration/finalize/TemplateFinalizeConnectorConfigurationCustomHandler.java b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/configuration/finalize/TemplateFinalizeConnectorConfigurationCustomHandler.java new file mode 100644 index 0000000..7d06d96 --- /dev/null +++ b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/configuration/finalize/TemplateFinalizeConnectorConfigurationCustomHandler.java @@ -0,0 +1,37 @@ +/** Copyright (c) 2024 Snowflake Inc. */ +package com.snowflake.connectors.example.configuration.finalize; + +import com.snowflake.connectors.application.configuration.finalization.FinalizeConnectorHandler; +import com.snowflake.connectors.application.scheduler.SchedulerCreator; +import com.snowflake.connectors.common.response.ConnectorResponse; +import com.snowflake.connectors.example.configuration.connection.TemplateConnectionConfigurationCallback; +import com.snowflake.snowpark_java.Session; +import com.snowflake.snowpark_java.types.Variant; + +/** + * Backend implementation for the custom {@code PUBLIC.FINALIZE_CONNECTOR_CONFIGURATION} procedure, + * created using a custom implementation of {@link FinalizeConnectorHandler}. + * + *

For this procedure to work - it must have been altered by the {@link + * TemplateConnectionConfigurationCallback} first. + */ +public class TemplateFinalizeConnectorConfigurationCustomHandler { + + public Variant finalizeConnectorConfiguration(Session session, Variant customConfig) { + // TODO: HINT: If you want to implement the interfaces yourself you need to provide them here to + // handler or specify your own handler. + // This method is referenced with full classpath from the `setup.sql` script. + // See more in docs: + // https://other-docs.snowflake.com/LIMITEDACCESS/connector-sdk/reference/finalize_configuration_reference + // https://other-docs.snowflake.com/LIMITEDACCESS/connector-sdk/flow/finalize_configuration + var schedulerCreator = SchedulerCreator.getInstance(session); + var handler = + FinalizeConnectorHandler.builder(session) + .withCallback( + new TemplateFinalizeConnectorConfigurationInternal(session, schedulerCreator)) + .withSourceValidator(new TemplateAccessValidator()) + .withInputValidator(v -> ConnectorResponse.success()) + .build(); + return handler.finalizeConnectorConfiguration(customConfig).toVariant(); + } +} diff --git a/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/configuration/finalize/TemplateFinalizeConnectorConfigurationInternal.java b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/configuration/finalize/TemplateFinalizeConnectorConfigurationInternal.java new file mode 100644 index 0000000..bacc79d --- /dev/null +++ b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/configuration/finalize/TemplateFinalizeConnectorConfigurationInternal.java @@ -0,0 +1,168 @@ +/** Copyright (c) 2024 Snowflake Inc. */ +package com.snowflake.connectors.example.configuration.finalize; + +import static com.snowflake.connectors.application.configuration.connector.ConnectorConfigurationKey.DESTINATION_DATABASE; +import static com.snowflake.connectors.application.configuration.connector.ConnectorConfigurationKey.DESTINATION_SCHEMA; +import static com.snowflake.connectors.example.ConnectorObjects.DATA_TABLE; +import static com.snowflake.connectors.example.ConnectorObjects.DATA_VIEW; +import static com.snowflake.connectors.example.ConnectorObjects.INITIALIZE_INSTANCE_PROCEDURE; +import static com.snowflake.connectors.example.ConnectorObjects.SET_WORKERS_NUMBER_PROCEDURE; +import static com.snowflake.connectors.example.ConnectorObjects.TASK_REACTOR_INSTANCE; +import static com.snowflake.connectors.example.ConnectorObjects.TASK_REACTOR_SCHEMA; +import static com.snowflake.connectors.util.sql.SqlTools.callProcedure; +import static com.snowflake.connectors.util.sql.SqlTools.varcharArgument; +import static java.lang.String.format; + +import com.snowflake.connectors.application.configuration.finalization.FinalizeConnectorCallback; +import com.snowflake.connectors.application.scheduler.SchedulerCreator; +import com.snowflake.connectors.common.object.ObjectName; +import com.snowflake.connectors.common.response.ConnectorResponse; +import com.snowflake.connectors.example.configuration.utils.Configuration; +import com.snowflake.snowpark_java.Session; +import com.snowflake.snowpark_java.types.Variant; + +/** + * Custom implementation of {@link FinalizeConnectorCallback}, used by the {@link + * TemplateFinalizeConnectorConfigurationCustomHandler}, providing final connector configuration + * logic: + * + *

+ * <ul>
+ *   <li>destination schema and database setup
+ *   <li>task reactor instance initialization
+ *   <li>scheduler initialization
+ * </ul>
+ */ +public class TemplateFinalizeConnectorConfigurationInternal implements FinalizeConnectorCallback { + + private static final String WAREHOUSE_REFERENCE = "reference(\\'WAREHOUSE_REFERENCE\\')"; + private static final String NULL_ARG = "null"; + + private final Session session; + private final SchedulerCreator schedulerCreator; + + public TemplateFinalizeConnectorConfigurationInternal( + Session session, SchedulerCreator schedulerCreator) { + this.session = session; + this.schedulerCreator = schedulerCreator; + } + + @Override + public ConnectorResponse execute(Variant variant) { + var connectorConfig = Configuration.fromConnectorConfig(session); + var destinationDatabase = connectorConfig.getValue(DESTINATION_DATABASE.getPropertyName()); + var destinationSchema = connectorConfig.getValue(DESTINATION_SCHEMA.getPropertyName()); + + if (destinationDatabase.isEmpty()) { + return Configuration.keyNotFoundResponse(DESTINATION_DATABASE.getPropertyName()); + } + + if (destinationSchema.isEmpty()) { + return Configuration.keyNotFoundResponse(DESTINATION_SCHEMA.getPropertyName()); + } + + var destinationTableName = + ObjectName.from(destinationDatabase.get(), destinationSchema.get(), DATA_TABLE) + .getEscapedName(); + var issuesView = + ObjectName.from(destinationDatabase.get(), destinationSchema.get(), DATA_VIEW) + .getEscapedName(); + + createDestinationDbObjects(destinationDatabase.get(), destinationSchema.get()); + createSinkTable(destinationTableName); + createSinkView(issuesView, destinationTableName); + + // Task reactor and scheduler components initialization. Those components handle work + // distribution and scheduling for the ingestion of configured resources. + // https://other-docs.snowflake.com/LIMITEDACCESS/connector-sdk/reference/task_reactor_reference + // https://other-docs.snowflake.com/LIMITEDACCESS/connector-sdk/reference/scheduler_reference + initializeTaskReactorInstance(); + initializeScheduler(); + setTaskReactorWorkersNumber(); + + // TODO: IMPLEMENT ME finalize internal: Implement any additional custom logic for finalization + // of the connector. For example saving the config + // See more in docs: + // https://other-docs.snowflake.com/LIMITEDACCESS/connector-sdk/reference/finalize_configuration_reference + // https://other-docs.snowflake.com/LIMITEDACCESS/connector-sdk/flow/finalize_configuration + return ConnectorResponse.success( + "This method needs to be implemented. Search for IMPLEMENT ME finalize internal"); + } + + private void createDestinationDbObjects(String destinationDatabase, String destinationSchema) { + session.sql(format("CREATE DATABASE IF NOT EXISTS %s", destinationDatabase)).collect(); + session + .sql(format("GRANT USAGE ON DATABASE %s TO APPLICATION ROLE ADMIN", destinationDatabase)) + .collect(); + session + .sql(format("CREATE SCHEMA IF NOT EXISTS %s.%s", destinationDatabase, destinationSchema)) + .collect(); + session + .sql( + format( + "GRANT USAGE ON SCHEMA %s.%s TO APPLICATION ROLE ADMIN", + destinationDatabase, destinationSchema)) + .collect(); + } + + private void createSinkTable(String destinationTable) { + // TODO: HINT: This implementation assumes a single sink table, that's why it is configured + // here. If + // data for each resource should be stored separately, then it's better to create the tables and + // view when scheduling a resource or during ingestion if needed. + // TODO: If data should be stored in some other way than just raw variant then it should be + // customized here. 
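    // For example (a hypothetical structured layout, mirroring the fields that createSinkView
    // extracts below), the sink could be created with explicit columns instead:
    //   CREATE TABLE IF NOT EXISTS %s (
    //     ID VARCHAR,
    //     TIMESTAMP TIMESTAMP_NTZ,
    //     RESOURCE_ID VARCHAR,
    //     RAW_DATA VARIANT
    //   )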
+ session + .sql(format("CREATE TABLE IF NOT EXISTS %s (RAW_DATA VARIANT)", destinationTable)) + .collect(); + session + .sql(format("GRANT ALL ON TABLE %s TO APPLICATION ROLE ADMIN", destinationTable)) + .collect(); + session + .sql(format("GRANT SELECT ON TABLE %s TO APPLICATION ROLE DATA_READER", destinationTable)) + .collect(); + } + + private void createSinkView(String issuesView, String destinationTable) { + session + .sql( + format( + "CREATE VIEW IF NOT EXISTS %s AS ( " + + " SELECT RAW_DATA:id as id,\n" + + " RAW_DATA:timestamp as timestamp,\n" + + " RAW_DATA:resource_id as resource_id,\n" + + " from %s " + + " )", + issuesView, destinationTable)) + .collect(); + session.sql(format("GRANT SELECT ON VIEW %s TO APPLICATION ROLE ADMIN", issuesView)).collect(); + session + .sql(format("GRANT SELECT ON VIEW %s TO APPLICATION ROLE DATA_READER", issuesView)) + .collect(); + } + + private void initializeTaskReactorInstance() { + callProcedure( + session, + TASK_REACTOR_SCHEMA, + INITIALIZE_INSTANCE_PROCEDURE, + varcharArgument(TASK_REACTOR_INSTANCE), + varcharArgument(WAREHOUSE_REFERENCE), + NULL_ARG, + NULL_ARG, + NULL_ARG, + NULL_ARG); + } + + private void initializeScheduler() { + schedulerCreator.createScheduler(); + } + + private void setTaskReactorWorkersNumber() { + session + .sql( + String.format( + "CALL %s.%s(%d, '%s')", + TASK_REACTOR_SCHEMA, SET_WORKERS_NUMBER_PROCEDURE, 5, TASK_REACTOR_INSTANCE)) + .collect(); + } +} diff --git a/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/configuration/utils/Configuration.java b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/configuration/utils/Configuration.java new file mode 100644 index 0000000..5158d5a --- /dev/null +++ b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/configuration/utils/Configuration.java @@ -0,0 +1,57 @@ +/** Copyright (c) 2024 Snowflake Inc. */ +package com.snowflake.connectors.example.configuration.utils; + +import static java.lang.String.format; + +import com.snowflake.connectors.application.configuration.ConfigurationRepository; +import com.snowflake.connectors.application.configuration.connection.ConnectionConfigurationNotFoundException; +import com.snowflake.connectors.application.configuration.connector.ConnectorConfigurationNotFoundException; +import com.snowflake.connectors.common.response.ConnectorResponse; +import com.snowflake.snowpark_java.Session; +import com.snowflake.snowpark_java.types.Variant; +import java.util.Map; +import java.util.Optional; + +/** Utility class for simple connector and connection configuration management. 
*/ +public class Configuration { + + private static final String CONNECTOR_CONFIGURATION_KEY = "connector_configuration"; + private static final String CONNECTION_CONFIGURATION_KEY = "connection_configuration"; + + private static final String KEY_NOT_FOUND_ERROR = "KEY_NOT_FOUND"; + private static final String KEY_NOT_FOUND_ERROR_MSG = + "Unable to find [%s] key in the provided configuration."; + private final Map config; + + public static Configuration fromCustomConfig(Variant variant) { + return new Configuration(variant); + } + + public static Configuration fromConnectorConfig(Session session) { + var config = + ConfigurationRepository.getInstance(session) + .fetch(CONNECTOR_CONFIGURATION_KEY, Variant.class) + .orElseThrow(ConnectorConfigurationNotFoundException::new); + return new Configuration(config); + } + + public static Configuration fromConnectionConfig(Session session) { + var config = + ConfigurationRepository.getInstance(session) + .fetch(CONNECTION_CONFIGURATION_KEY, Variant.class) + .orElseThrow(ConnectionConfigurationNotFoundException::new); + return new Configuration(config); + } + + private Configuration(Variant config) { + this.config = config.asMap(); + } + + public Optional getValue(String key) { + return Optional.ofNullable(config.get(key).asString()); + } + + public static ConnectorResponse keyNotFoundResponse(String key) { + return ConnectorResponse.error(KEY_NOT_FOUND_ERROR, format(KEY_NOT_FOUND_ERROR_MSG, key)); + } +} diff --git a/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/configuration/utils/ConnectorConfigurationPropertyNotFoundException.java b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/configuration/utils/ConnectorConfigurationPropertyNotFoundException.java new file mode 100644 index 0000000..5fbed08 --- /dev/null +++ b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/configuration/utils/ConnectorConfigurationPropertyNotFoundException.java @@ -0,0 +1,16 @@ +/** Copyright (c) 2024 Snowflake Inc. */ +package com.snowflake.connectors.example.configuration.utils; + +/** + * Exception thrown when the provided configuration property does not exist in the connector + * configuration table. + */ +public class ConnectorConfigurationPropertyNotFoundException extends RuntimeException { + + private static final String ERROR_MESSAGE = + "Property [%s] not found in the Connector Configuration"; + + public ConnectorConfigurationPropertyNotFoundException(String property) { + super(String.format(ERROR_MESSAGE, property)); + } +} diff --git a/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/ingestion/IngestionHelper.java b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/ingestion/IngestionHelper.java new file mode 100644 index 0000000..9c481de --- /dev/null +++ b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/ingestion/IngestionHelper.java @@ -0,0 +1,48 @@ +/** Copyright (c) 2024 Snowflake Inc. 
*/ +package com.snowflake.connectors.example.ingestion; + +import static com.snowflake.snowpark_java.Functions.col; + +import com.snowflake.snowpark_java.Row; +import com.snowflake.snowpark_java.Session; +import com.snowflake.snowpark_java.types.DataTypes; +import com.snowflake.snowpark_java.types.StructField; +import com.snowflake.snowpark_java.types.StructType; +import com.snowflake.snowpark_java.types.Variant; +import java.util.List; +import java.util.Map; + +/** A utility class for resource ingestion handling. */ +public final class IngestionHelper { + + private IngestionHelper() {} + + /** + * Save the raw data to a database table + * + * @param session Snowpark session object + * @param destTableName Target table name + * @param data Raw data to save + * @return number of rows saved in the destination table + */ + public static long saveRawData(Session session, String destTableName, List data) { + // TODO: This method is responsible for inserting/updating data in the sink table. If the data + // is stored in different format than raw variants the it needs to be customized + + var tableSchema = StructType.create(new StructField("RAW_DATA", DataTypes.VariantType)); + var dataRows = data.stream().map(Row::create).toArray(Row[]::new); + var source = session.createDataFrame(dataRows, tableSchema); + var table = session.table(destTableName); + var assignments = Map.of(col("RAW_DATA"), source.col("RAW_DATA")); + table + .merge( + source, + table.col("raw_data").subField("id").equal_to(source.col("raw_data").subField("id"))) + .whenMatched() + .update(assignments) + .whenNotMatched() + .insert(assignments) + .collect(); + return dataRows.length; + } +} diff --git a/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/ingestion/TemplateIngestion.java b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/ingestion/TemplateIngestion.java new file mode 100644 index 0000000..9af0b10 --- /dev/null +++ b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/ingestion/TemplateIngestion.java @@ -0,0 +1,134 @@ +/** Copyright (c) 2024 Snowflake Inc. 
*/ +package com.snowflake.connectors.example.ingestion; + +import static com.snowflake.connectors.application.configuration.connector.ConnectorConfigurationKey.DESTINATION_DATABASE; +import static com.snowflake.connectors.application.configuration.connector.ConnectorConfigurationKey.DESTINATION_SCHEMA; +import static com.snowflake.connectors.application.observability.IngestionRun.IngestionStatus.CANCELED; +import static com.snowflake.connectors.application.observability.IngestionRun.IngestionStatus.COMPLETED; +import static com.snowflake.connectors.application.observability.IngestionRun.IngestionStatus.FAILED; +import static com.snowflake.connectors.application.observability.IngestionRun.IngestionStatus.IN_PROGRESS; +import static com.snowflake.connectors.example.ConnectorObjects.DATA_TABLE; + +import com.snowflake.connectors.application.observability.IngestionRun.IngestionStatus; +import com.snowflake.connectors.application.observability.IngestionRunRepository; +import com.snowflake.connectors.common.object.ObjectName; +import com.snowflake.connectors.example.configuration.utils.Configuration; +import com.snowflake.connectors.example.configuration.utils.ConnectorConfigurationPropertyNotFoundException; +import com.snowflake.connectors.taskreactor.OnIngestionFinishedCallback; +import com.snowflake.connectors.taskreactor.worker.ingestion.Ingestion; +import com.snowflake.connectors.taskreactor.worker.queue.WorkItem; +import com.snowflake.snowpark_java.Session; +import com.snowflake.snowpark_java.types.Variant; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import net.snowflake.client.jdbc.internal.google.cloud.Timestamp; + +/** Custom implementation of {@link Ingestion}, used for ingestion of random data */ +public class TemplateIngestion implements Ingestion { + + private final Session session; + private final IngestionRunRepository ingestionRunRepository; + private final OnIngestionFinishedCallback onIngestionFinishedCallback; + + private String ingestionRunId; + private TemplateWorkItem templateWorkItem; + private long ingestedRows = 0L; + + public TemplateIngestion( + Session session, + IngestionRunRepository ingestionRunRepository, + OnIngestionFinishedCallback onIngestionFinishedCallback) { + this.session = session; + this.ingestionRunRepository = ingestionRunRepository; + this.onIngestionFinishedCallback = onIngestionFinishedCallback; + } + + @Override + public IngestionStatus initialState(WorkItem workItem) { + this.templateWorkItem = TemplateWorkItem.from(workItem); + return IN_PROGRESS; + } + + @Override + public void preIngestion(WorkItem workItem) { + this.ingestionRunId = + ingestionRunRepository.startRun( + templateWorkItem.getResourceIngestionDefinitionId(), + templateWorkItem.getIngestionConfigurationId(), + templateWorkItem.getProcessId(), + null); + } + + @Override + public IngestionStatus performIngestion(WorkItem workItem, IngestionStatus ingestionStatus) { + var destinationTable = prepareDestinationTableName().getEscapedName(); + + try { + // TODO: IMPLEMENT ME ingestion: The following data generates some random data to be stored. + // In a real use case + // scenario a service handling communication with the external source system should be called + // here. 
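      // A possible shape of such a call (sourceClient, fetchSince and lastSeenTimestamp are
      // hypothetical names, not part of the SDK):
      //   List<Variant> rawResults =
      //       sourceClient.fetchSince(
      //           templateWorkItem.getResourceIngestionDefinitionId(), lastSeenTimestamp);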
+ List<Variant> rawResults = randomData(); + + this.ingestedRows = IngestionHelper.saveRawData(this.session, destinationTable, rawResults); + return COMPLETED; + } catch (Exception exception) { + return FAILED; + } + } + + @Override + public boolean isIngestionCompleted(WorkItem workItem, IngestionStatus status) { + return status != IN_PROGRESS; + } + + @Override + public void postIngestion(WorkItem workItem, IngestionStatus ingestionStatus) { + ingestionRunRepository.endRun(ingestionRunId, ingestionStatus, ingestedRows, null); + onIngestionFinishedCallback.onIngestionFinished(templateWorkItem.getProcessId(), null); + } + + @Override + public void ingestionCancelled(WorkItem workItem, IngestionStatus lastState) { + ingestionRunRepository.endRun(ingestionRunId, CANCELED, ingestedRows, null); + } + + private ObjectName prepareDestinationTableName() { + var connectorConfig = Configuration.fromConnectorConfig(session); + var destinationDatabase = + connectorConfig + .getValue(DESTINATION_DATABASE.getPropertyName()) + .orElseThrow( + () -> + new ConnectorConfigurationPropertyNotFoundException( + DESTINATION_DATABASE.getPropertyName())); + var destinationSchema = + connectorConfig + .getValue(DESTINATION_SCHEMA.getPropertyName()) + .orElseThrow( + () -> + new ConnectorConfigurationPropertyNotFoundException( + DESTINATION_SCHEMA.getPropertyName())); + return ObjectName.from(destinationDatabase, destinationSchema, DATA_TABLE); + } + + public List<Variant> randomData() { + return IntStream.range(0, new Random().nextInt(128)) + .boxed() + .map( + it -> + new Variant( + Map.of( + "resource_id", + templateWorkItem.getResourceIngestionDefinitionId(), + "id", + UUID.randomUUID(), + "timestamp", + Timestamp.now().toString()))) + .collect(Collectors.toList()); + } +}
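performIngestion is the main extension point of this class: in a production connector the randomData() call would be replaced with a call to a service that talks to the external source system, while the surrounding run tracking (startRun, endRun, the finished callback) stays unchanged. A rough sketch of what that substitution might look like follows; the ExternalSourceClient abstraction and its fetchRecords method are hypothetical names used only for illustration and are not part of the SDK or this template.

// Hedged sketch: a possible shape for fetching real data inside TemplateIngestion.
// ExternalSourceClient and fetchRecords(...) are assumed names, not SDK types.
public interface ExternalSourceClient {
  List<Map<String, Object>> fetchRecords(String resourceIngestionDefinitionId);
}

private List<Variant> fetchFromExternalSource(ExternalSourceClient client) {
  // Each record returned by the external system is wrapped in a Variant, mirroring the
  // Map-based Variant construction used by randomData(), so that IngestionHelper.saveRawData
  // can merge it into the RAW_DATA column unchanged.
  return client.fetchRecords(templateWorkItem.getResourceIngestionDefinitionId()).stream()
      .map(record -> new Variant(record))
      .collect(Collectors.toList());
}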
diff --git a/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/ingestion/TemplateWorkItem.java b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/ingestion/TemplateWorkItem.java new file mode 100644 index 0000000..e52aeb5 --- /dev/null +++ b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/ingestion/TemplateWorkItem.java @@ -0,0 +1,60 @@ +/** Copyright (c) 2024 Snowflake Inc. */ +package com.snowflake.connectors.example.ingestion; + +import com.snowflake.connectors.example.ingestion.exception.WorkItemPayloadPropertyNotFoundException; +import com.snowflake.connectors.taskreactor.worker.queue.WorkItem; +import com.snowflake.snowpark_java.types.Variant; +import java.util.Optional; + +/** + * Class representing a template ingestion work item record which is inserted into the task reactor + * queue. + * + * <p>Fields of this class are populated using deserialized data from the payload of the task + * reactor work item. */ +public class TemplateWorkItem { + + private static final String RESOURCE_INGESTION_DEFINITION_ID_PROPERTY = + "resourceIngestionDefinitionId"; + private static final String INGESTION_CONFIGURATION_ID_PROPERTY = "ingestionConfigurationId"; + + private final String processId; + private final String resourceIngestionDefinitionId; + private final String ingestionConfigurationId; + + public static TemplateWorkItem from(WorkItem workItem) { + var resourceIngestionDefinitionId = + extractPropertyFromWorkItemPayload(workItem, RESOURCE_INGESTION_DEFINITION_ID_PROPERTY); + var ingestionConfigurationId = + extractPropertyFromWorkItemPayload(workItem, INGESTION_CONFIGURATION_ID_PROPERTY); + return new TemplateWorkItem( + workItem.resourceId, resourceIngestionDefinitionId, ingestionConfigurationId); + } + + private static String extractPropertyFromWorkItemPayload(WorkItem workItem, String propertyName) { + var properties = workItem.payload.asMap(); + return Optional.ofNullable(properties.get(propertyName)) + .map(Variant::asString) + .orElseThrow(() -> new WorkItemPayloadPropertyNotFoundException(propertyName)); + } + + private TemplateWorkItem( + String processId, String resourceIngestionDefinitionId, String ingestionConfigurationId) { + this.processId = processId; + this.resourceIngestionDefinitionId = resourceIngestionDefinitionId; + this.ingestionConfigurationId = ingestionConfigurationId; + } + + public String getProcessId() { + return processId; + } + + public String getResourceIngestionDefinitionId() { + return resourceIngestionDefinitionId; + } + + public String getIngestionConfigurationId() { + return ingestionConfigurationId; + } +}
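TemplateWorkItem.from only succeeds when the work item payload carries both properties named by the constants above; otherwise a WorkItemPayloadPropertyNotFoundException is thrown. A minimal sketch of a payload that satisfies this contract is shown below; the concrete values are made up for illustration.

// Hedged sketch: the minimal payload shape expected by TemplateWorkItem.from(...).
// Property names come from the constants above; the values are illustrative only.
Variant examplePayload =
    new Variant(
        Map.of(
            "resourceIngestionDefinitionId", "example_resource_definition_id",
            "ingestionConfigurationId", "example_ingestion_configuration_id"));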
diff --git a/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/ingestion/TemplateWorker.java b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/ingestion/TemplateWorker.java new file mode 100644 index 0000000..6d0a3e8 --- /dev/null +++ b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/ingestion/TemplateWorker.java @@ -0,0 +1,37 @@ +/** Copyright (c) 2024 Snowflake Inc. */ +package com.snowflake.connectors.example.ingestion; + +import com.snowflake.connectors.application.integration.SchedulerTaskReactorOnIngestionFinishedCallback; +import com.snowflake.connectors.application.observability.DefaultIngestionRunRepository; +import com.snowflake.connectors.common.object.Identifier; +import com.snowflake.connectors.example.configuration.connection.TemplateConnectionConfigurationCallback; +import com.snowflake.connectors.taskreactor.worker.WorkerId; +import com.snowflake.connectors.taskreactor.worker.ingestion.IngestionWorker; +import com.snowflake.snowpark_java.Session; + +/** + * Backend implementation for the custom {@code PUBLIC.TEMPLATE_WORKER} procedure, used by the task + * reactor to generate random data. + * + * <p>For this procedure to work, it must first be altered by the {@link + * TemplateConnectionConfigurationCallback}. + */ +public class TemplateWorker { + + public static String executeWork(Session session, int workerId, String taskReactorSchema) { + var ingestionRunRepository = new DefaultIngestionRunRepository(session); + var callback = SchedulerTaskReactorOnIngestionFinishedCallback.getInstance(session); + + var ingestion = new TemplateIngestion(session, ingestionRunRepository, callback); + var workerIdentifier = new WorkerId(workerId); + var schemaIdentifier = Identifier.fromWithAutoQuoting(taskReactorSchema); + var worker = IngestionWorker.from(session, ingestion, workerIdentifier, schemaIdentifier); + + try { + worker.run(); + return "Executed"; + } catch (Exception exception) { + throw new RuntimeException(exception.getMessage()); + } + } +} diff --git a/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/ingestion/exception/WorkItemPayloadPropertyNotFoundException.java b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/ingestion/exception/WorkItemPayloadPropertyNotFoundException.java new file mode 100644 index 0000000..23507df --- /dev/null +++ b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/ingestion/exception/WorkItemPayloadPropertyNotFoundException.java @@ -0,0 +1,12 @@ +/** Copyright (c) 2024 Snowflake Inc. */ +package com.snowflake.connectors.example.ingestion.exception; + +/** Exception thrown when the provided property does not exist in the work item payload. */ +public class WorkItemPayloadPropertyNotFoundException extends RuntimeException { + + private static final String ERROR_MESSAGE = "Property [%s] not found in the WorkItem payload"; + + public WorkItemPayloadPropertyNotFoundException(String property) { + super(String.format(ERROR_MESSAGE, property)); + } +} diff --git a/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/ingestion/exception/WorkItemResourceIdPropertyNotFoundException.java b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/ingestion/exception/WorkItemResourceIdPropertyNotFoundException.java new file mode 100644 index 0000000..c9c47f4 --- /dev/null +++ b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/ingestion/exception/WorkItemResourceIdPropertyNotFoundException.java @@ -0,0 +1,12 @@ +/** Copyright (c) 2024 Snowflake Inc. */ +package com.snowflake.connectors.example.ingestion.exception; + +/** Exception thrown when the provided property does not exist in the work item resource id.
*/ +public class WorkItemResourceIdPropertyNotFoundException extends RuntimeException { + + private static final String ERROR_MESSAGE = "Property [%s] not found in the WorkItem resourceId"; + + public WorkItemResourceIdPropertyNotFoundException(String property) { + super(String.format(ERROR_MESSAGE, property)); + } +} diff --git a/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/integration/SchedulerIntegratedWithTaskReactorHandler.java b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/integration/SchedulerIntegratedWithTaskReactorHandler.java new file mode 100644 index 0000000..1486d46 --- /dev/null +++ b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/integration/SchedulerIntegratedWithTaskReactorHandler.java @@ -0,0 +1,27 @@ +/** Copyright (c) 2024 Snowflake Inc. */ +package com.snowflake.connectors.example.integration; + +import com.snowflake.connectors.application.integration.SchedulerTaskReactorOnIngestionScheduled; +import com.snowflake.connectors.application.scheduler.RunSchedulerIterationHandler; +import com.snowflake.connectors.common.object.Identifier; +import com.snowflake.connectors.example.ConnectorObjects; +import com.snowflake.snowpark_java.Session; +import com.snowflake.snowpark_java.types.Variant; + +/** + * Backend implementation for the custom {@code PUBLIC.RUN_SCHEDULER_ITERATION} procedure, used by + * the scheduler system. + */ +public class SchedulerIntegratedWithTaskReactorHandler { + + public static Variant runIteration(Session session) { + var callback = + SchedulerTaskReactorOnIngestionScheduled.getInstance( + session, Identifier.from(ConnectorObjects.TASK_REACTOR_INSTANCE)); + return RunSchedulerIterationHandler.builder(session) + .withCallback(callback) + .build() + .runIteration() + .toVariant(); + } +} diff --git a/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/lifecycle/pause/InternalPauseConnectorCallback.java b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/lifecycle/pause/InternalPauseConnectorCallback.java new file mode 100644 index 0000000..dea19c0 --- /dev/null +++ b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/lifecycle/pause/InternalPauseConnectorCallback.java @@ -0,0 +1,39 @@ +/** Copyright (c) 2024 Snowflake Inc. */ +package com.snowflake.connectors.example.lifecycle.pause; + +import static com.snowflake.connectors.example.ConnectorObjects.SCHEDULER_TASK; +import static com.snowflake.connectors.example.ConnectorObjects.STATE_SCHEMA; + +import com.snowflake.connectors.application.lifecycle.pause.PauseConnectorCallback; +import com.snowflake.connectors.common.object.ObjectName; +import com.snowflake.connectors.common.response.ConnectorResponse; +import com.snowflake.connectors.common.task.TaskRepository; + +/** + * Custom implementation of {@link PauseConnectorCallback}, used by the {@link + * PauseConnectorCustomHandler}, providing suspension of the scheduler system. 
+ */ +public class InternalPauseConnectorCallback implements PauseConnectorCallback { + + private static final String ERROR_CODE = "PAUSE_CONNECTOR_FAILED"; + private static final String ERROR_MSG = "Unable to suspend all connector tasks"; + + private final TaskRepository taskRepository; + + public InternalPauseConnectorCallback(TaskRepository taskRepository) { + this.taskRepository = taskRepository; + } + + @Override + public ConnectorResponse execute() { + var schedulerTask = ObjectName.from(STATE_SCHEMA, SCHEDULER_TASK); + + try { + taskRepository.fetch(schedulerTask).suspend(); + } catch (Exception e) { + return ConnectorResponse.error(ERROR_CODE, ERROR_MSG); + } + + return ConnectorResponse.success(); + } +} diff --git a/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/lifecycle/pause/PauseConnectorCustomHandler.java b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/lifecycle/pause/PauseConnectorCustomHandler.java new file mode 100644 index 0000000..6a09dfa --- /dev/null +++ b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/lifecycle/pause/PauseConnectorCustomHandler.java @@ -0,0 +1,22 @@ +/** Copyright (c) 2024 Snowflake Inc. */ +package com.snowflake.connectors.example.lifecycle.pause; + +import com.snowflake.connectors.application.lifecycle.pause.PauseConnectorHandler; +import com.snowflake.connectors.common.response.ConnectorResponse; +import com.snowflake.connectors.common.task.TaskRepository; +import com.snowflake.snowpark_java.Session; +import com.snowflake.snowpark_java.types.Variant; + +/** Backend implementation for the custom {@code PUBLIC.PAUSE_CONNECTOR} procedure. */ +public class PauseConnectorCustomHandler { + + public Variant pauseConnector(Session session) { + var internalCallback = new InternalPauseConnectorCallback(TaskRepository.getInstance(session)); + var handler = + PauseConnectorHandler.builder(session) + .withStateValidator(ConnectorResponse::success) + .withCallback(internalCallback) + .build(); + return handler.pauseConnector().toVariant(); + } +} diff --git a/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/lifecycle/resume/InternalResumeConnectorCallback.java b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/lifecycle/resume/InternalResumeConnectorCallback.java new file mode 100644 index 0000000..441685e --- /dev/null +++ b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/lifecycle/resume/InternalResumeConnectorCallback.java @@ -0,0 +1,39 @@ +/** Copyright (c) 2024 Snowflake Inc. */ +package com.snowflake.connectors.example.lifecycle.resume; + +import static com.snowflake.connectors.example.ConnectorObjects.SCHEDULER_TASK; +import static com.snowflake.connectors.example.ConnectorObjects.STATE_SCHEMA; + +import com.snowflake.connectors.application.lifecycle.resume.ResumeConnectorCallback; +import com.snowflake.connectors.common.object.ObjectName; +import com.snowflake.connectors.common.response.ConnectorResponse; +import com.snowflake.connectors.common.task.TaskRepository; + +/** + * Custom implementation of {@link ResumeConnectorCallback}, used by the {@link + * ResumeConnectorCustomHandler}, providing resumption of the scheduler system. 
+ */ +public class InternalResumeConnectorCallback implements ResumeConnectorCallback { + + private static final String ERROR_CODE = "RESUME_CONNECTOR_FAILED"; + private static final String ERROR_MSG = "Unable to resume all connector tasks"; + + private final TaskRepository taskRepository; + + public InternalResumeConnectorCallback(TaskRepository taskRepository) { + this.taskRepository = taskRepository; + } + + @Override + public ConnectorResponse execute() { + var schedulerTask = ObjectName.from(STATE_SCHEMA, SCHEDULER_TASK); + + try { + taskRepository.fetch(schedulerTask).resume(); + } catch (Exception e) { + return ConnectorResponse.error(ERROR_CODE, ERROR_MSG); + } + + return ConnectorResponse.success(); + } +} diff --git a/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/lifecycle/resume/ResumeConnectorCustomHandler.java b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/lifecycle/resume/ResumeConnectorCustomHandler.java new file mode 100644 index 0000000..38a517f --- /dev/null +++ b/templates/connectors-native-sdk-template/src/main/java/com/snowflake/connectors/example/lifecycle/resume/ResumeConnectorCustomHandler.java @@ -0,0 +1,22 @@ +/** Copyright (c) 2024 Snowflake Inc. */ +package com.snowflake.connectors.example.lifecycle.resume; + +import com.snowflake.connectors.application.lifecycle.resume.ResumeConnectorHandler; +import com.snowflake.connectors.common.response.ConnectorResponse; +import com.snowflake.connectors.common.task.TaskRepository; +import com.snowflake.snowpark_java.Session; +import com.snowflake.snowpark_java.types.Variant; + +/** Backend implementation for the custom {@code PUBLIC.RESUME_CONNECTOR} procedure. */ +public class ResumeConnectorCustomHandler { + + public Variant resumeConnector(Session session) { + var internalCallback = new InternalResumeConnectorCallback(TaskRepository.getInstance(session)); + var handler = + ResumeConnectorHandler.builder(session) + .withStateValidator(ConnectorResponse::success) + .withCallback(internalCallback) + .build(); + return handler.resumeConnector().toVariant(); + } +}