Skip to content

Commit

Permalink
Add connectors native sdk template (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-twazny authored Jun 3, 2024
1 parent e7980ea commit 79cea83
Show file tree
Hide file tree
Showing 56 changed files with 3,197 additions and 0 deletions.
113 changes: 113 additions & 0 deletions templates/connectors-native-sdk-template/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
CONNECTION=native_sdk_connection
USERNAME=$(shell whoami)
APP_PACKAGE_NAME=NATIVE_SDK_CONNECTOR_TEMPLATE_$(USERNAME)
INSTANCE_NAME=$(APP_PACKAGE_NAME)_INSTANCE
SCHEMA_NAME=TEST_SCHEMA
STAGE_NAME=TEST_STAGE
VERSION=1_0
VERSION_DIR_PATH=@$(APP_PACKAGE_NAME).$(SCHEMA_NAME).$(STAGE_NAME)/$(VERSION)

# ***********
# CORE FLOW
# ***********

.PHONY: copy_internal_components
copy_internal_components:
./gradlew copyInternalComponents

.PHONY: copy_sdk_components
copy_sdk_components:
./gradlew copySdkComponents

.PHONY: prepare_app_package
prepare_app_package:
./gradlew prepareAppPackage \
-Pconnection=$(CONNECTION) \
-PappPackage=$(APP_PACKAGE_NAME) \
-Pschema=$(SCHEMA_NAME) \
-Pstage=$(STAGE_NAME)

.PHONY: deploy_connector
deploy_connector:
./gradlew deployConnector \
-Pconnection=$(CONNECTION) \
-PappPackage=$(APP_PACKAGE_NAME) \
-Pschema=$(SCHEMA_NAME) \
-Pstage=$(STAGE_NAME) \
-PappVersion=$(VERSION)

# ****************************************
# CREATE INSTANCE FROM VERSION DIRECTORY
# ****************************************

.PHONY: create_app_instance_from_version_dir
create_app_instance_from_version_dir:
./gradlew createAppInstance \
-Pconnection=$(CONNECTION) \
-PappPackage=$(APP_PACKAGE_NAME) \
-PversionDirPath=$(VERSION_DIR_PATH)

.PHONY: complex_create_app_instance_from_version_dir
complex_create_app_instance_from_version_dir:
make copy_internal_components
make copy_sdk_components
make prepare_app_package
make deploy_connector
make create_app_instance_from_version_dir

# **********************************
# CREATE INSTANCE FROM APP VERSION
# **********************************

.PHONY: create_new_version
create_new_version:
./gradlew createNewVersion \
-Pconnection=$(CONNECTION) \
-PappPackage=$(APP_PACKAGE_NAME) \
-PversionDirPath=$(VERSION_DIR_PATH) \
-PappVersion=$(VERSION)

.PHONY: create_app_instance_from_app_version
create_app_instance_from_app_version:
./gradlew createAppInstance \
-Pconnection=$(CONNECTION) \
-PappPackage=$(APP_PACKAGE_NAME) \
-PappVersion=$(VERSION)

.PHONY: complex_create_app_instance_from_app_version
complex_create_app_instance_from_app_version:
make copy_internal_components
make copy_sdk_components
make prepare_app_package
make deploy_connector
make create_new_version
make create_app_instance_from_app_version

# ******************
# ADDITIONAL TASKS
# ******************

.PHONY: drop_application_package
drop_application_package:
snowsql -c $(CONNECTION) \
-q "DROP APPLICATION PACKAGE IF EXISTS $(APP_PACKAGE_NAME)"

.PHONY: drop_application_instance
drop_application_instance:
snowsql -c $(CONNECTION) \
-q "DROP APPLICATION IF EXISTS $(INSTANCE_NAME) CASCADE"

.PHONY: drop_application
drop_application:
snowsql -c $(CONNECTION) \
-q "DROP APPLICATION IF EXISTS $(INSTANCE_NAME) CASCADE; DROP APPLICATION PACKAGE IF EXISTS $(APP_PACKAGE_NAME)"

.PHONY: reinstall_application_from_version_dir
reinstall_application_from_version_dir:
make drop_application
make complex_create_app_instance_from_version_dir

.PHONY: reinstall_application_from_app_version
reinstall_application_from_app_version:
make drop_application
make complex_create_app_instance_from_app_version
27 changes: 27 additions & 0 deletions templates/connectors-native-sdk-template/app/manifest.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Copyright (c) 2024 Snowflake Inc.

manifest_version: 1
artifacts:
setup_script: setup.sql
extension_code: true
default_streamlit: STREAMLIT.NATIVE_SDK_TEMPLATE_ST
version:
name: "1.0"
label: "connectors-native-sdk-connector-template"
comment: "A template providing a working connector out of the box to accelerate initial development"
configuration:
trace_level: ON_EVENT
log_level: info
privileges:
- EXECUTE TASK:
description: "Needed to run ingestion tasks"
- CREATE DATABASE:
description: "Needed to create a database for ingested data"
references:
- WAREHOUSE_REFERENCE:
label: "Warehouse used for ingestion"
description: "Warehouse, which will be used to schedule ingestion tasks"
privileges:
- USAGE
object_type: WAREHOUSE
register_callback: PUBLIC.REGISTER_REFERENCE
130 changes: 130 additions & 0 deletions templates/connectors-native-sdk-template/app/setup.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
-- Copyright (c) 2024 Snowflake Inc.

-- CONNECTORS-NATIVE-SDK
EXECUTE IMMEDIATE FROM 'native-connectors-sdk-components/all.sql';
EXECUTE IMMEDIATE FROM 'native-connectors-sdk-components/task_reactor.sql';

-- CUSTOM CONNECTOR OBJECTS
CREATE OR ALTER VERSIONED SCHEMA STREAMLIT;
GRANT USAGE ON SCHEMA STREAMLIT TO APPLICATION ROLE ADMIN;

CREATE OR REPLACE STREAMLIT STREAMLIT.NATIVE_SDK_TEMPLATE_ST
FROM '/streamlit'
MAIN_FILE = 'streamlit_app.py';
GRANT USAGE ON STREAMLIT STREAMLIT.NATIVE_SDK_TEMPLATE_ST TO APPLICATION ROLE ADMIN;

-- SNOWFLAKE REFERENCE MECHANISM
CREATE PROCEDURE PUBLIC.REGISTER_REFERENCE(ref_name STRING, operation STRING, ref_or_alias STRING)
RETURNS STRING
LANGUAGE SQL
AS
BEGIN
CASE (operation)
WHEN 'ADD' THEN
SELECT SYSTEM$SET_REFERENCE(:ref_name, :ref_or_alias);
WHEN 'REMOVE' THEN
SELECT SYSTEM$REMOVE_REFERENCE(:ref_name);
WHEN 'CLEAR' THEN
SELECT SYSTEM$REMOVE_REFERENCE(:ref_name);
ELSE RETURN 'unknown operation: ' || operation;
END CASE;
RETURN NULL;
END;
GRANT USAGE ON PROCEDURE PUBLIC.REGISTER_REFERENCE(STRING, STRING, STRING) TO APPLICATION ROLE ADMIN;

-----------------WIZARD-----------------
-- PREREQUISITES
MERGE INTO STATE.PREREQUISITES AS dest
USING (SELECT * FROM VALUES
('1',
'Sample prerequisite',
'Prerequisites can be used to notice the end user of the connector about external configurations. Read more in the SDK documentation below. This content can be modified inside `setup.sql` script',
'https://other-docs.snowflake.com/LIMITEDACCESS/connector-sdk/flow/prerequisites',
NULL,
NULL,
1
)
) AS src (id, title, description, documentation_url, learnmore_url, guide_url, position)
ON dest.id = src.id
WHEN NOT MATCHED THEN
INSERT (id, title, description, documentation_url, learnmore_url, guide_url, position)
VALUES (src.id, src.title, src.description, src.documentation_url, src.learnmore_url, src.guide_url, src.position);

-- CONNECTION CONFIGURATION
CREATE OR REPLACE PROCEDURE PUBLIC.SET_CONNECTION_CONFIGURATION(connection_configuration VARIANT)
RETURNS VARIANT
LANGUAGE JAVA
RUNTIME_VERSION = '11'
PACKAGES = ('com.snowflake:snowpark:1.11.0')
IMPORTS = ('/connectors-native-sdk-template.jar', '/connectors-native-sdk.jar')
HANDLER = 'com.snowflake.connectors.example.configuration.connection.TemplateConnectionConfigurationHandler.setConnectionConfiguration';
GRANT USAGE ON PROCEDURE PUBLIC.SET_CONNECTION_CONFIGURATION(VARIANT) TO APPLICATION ROLE ADMIN;

CREATE OR REPLACE PROCEDURE PUBLIC.TEST_CONNECTION()
RETURNS VARIANT
LANGUAGE JAVA
RUNTIME_VERSION = '11'
PACKAGES = ('com.snowflake:snowpark:1.11.0')
IMPORTS = ('/connectors-native-sdk-template.jar', '/connectors-native-sdk.jar')
HANDLER = 'com.snowflake.connectors.example.configuration.connection.TemplateConnectionValidator.testConnection';

-- FINALIZE CONFIGURATION
CREATE OR REPLACE PROCEDURE PUBLIC.FINALIZE_CONNECTOR_CONFIGURATION(CUSTOM_CONFIGURATION VARIANT)
RETURNS VARIANT
LANGUAGE JAVA
RUNTIME_VERSION = '11'
PACKAGES = ('com.snowflake:snowpark:1.11.0')
IMPORTS = ('/connectors-native-sdk-template.jar', '/connectors-native-sdk.jar')
HANDLER = 'com.snowflake.connectors.example.configuration.finalize.TemplateFinalizeConnectorConfigurationCustomHandler.finalizeConnectorConfiguration';
GRANT USAGE ON PROCEDURE PUBLIC.FINALIZE_CONNECTOR_CONFIGURATION(VARIANT) TO APPLICATION ROLE ADMIN;

-----------------TASK REACTOR-----------------
CREATE OR REPLACE PROCEDURE PUBLIC.TEMPLATE_WORKER(worker_id number, task_reactor_schema string)
RETURNS STRING
LANGUAGE JAVA
RUNTIME_VERSION = '11'
PACKAGES = ('com.snowflake:snowpark:1.11.0', 'com.snowflake:telemetry:latest')
IMPORTS = ('/connectors-native-sdk.jar', '/connectors-native-sdk-template.jar')
HANDLER = 'com.snowflake.connectors.example.ingestion.TemplateWorker.executeWork';

CALL TASK_REACTOR.CREATE_INSTANCE_OBJECTS(
'EXAMPLE_CONNECTOR_TASK_REACTOR',
'PUBLIC.TEMPLATE_WORKER',
'VIEW',
'EXAMPLE_CONNECTOR_TASK_REACTOR.WORK_SELECTOR_VIEW',
-- TODO: Below NULL causes the application to create and use default EMPTY_EXPIRED_WORK_SELECTOR.
-- If Task Reactor work items shall be removed after some time of not being taken up by the instance,
-- then this default view can be replaced with the custom implementation
-- or this parameter can be changed to the name of the custom view.
-- You can pass the name of the view that will be created even after this procedure finishes its execution.
NULL
);

CREATE VIEW EXAMPLE_CONNECTOR_TASK_REACTOR.WORK_SELECTOR_VIEW AS SELECT * FROM EXAMPLE_CONNECTOR_TASK_REACTOR.QUEUE ORDER BY RESOURCE_ID;

CREATE OR REPLACE PROCEDURE PUBLIC.RUN_SCHEDULER_ITERATION()
RETURNS VARIANT
LANGUAGE JAVA
RUNTIME_VERSION = '11'
PACKAGES = ('com.snowflake:snowpark:1.11.0')
IMPORTS = ('/connectors-native-sdk.jar', '/connectors-native-sdk-template.jar')
HANDLER = 'com.snowflake.connectors.example.integration.SchedulerIntegratedWithTaskReactorHandler.runIteration';

-----------------LIFECYCLE-----------------
CREATE OR REPLACE PROCEDURE PUBLIC.PAUSE_CONNECTOR()
RETURNS VARIANT
LANGUAGE JAVA
RUNTIME_VERSION = '11'
PACKAGES = ('com.snowflake:snowpark:1.11.0')
IMPORTS = ('/connectors-native-sdk.jar', '/connectors-native-sdk-template.jar')
HANDLER = 'com.snowflake.connectors.example.lifecycle.pause.PauseConnectorCustomHandler.pauseConnector';
GRANT USAGE ON PROCEDURE PUBLIC.PAUSE_CONNECTOR() TO APPLICATION ROLE ADMIN;

CREATE OR REPLACE PROCEDURE PUBLIC.RESUME_CONNECTOR()
RETURNS VARIANT
LANGUAGE JAVA
RUNTIME_VERSION = '11'
PACKAGES = ('com.snowflake:snowpark:1.11.0')
IMPORTS = ('/connectors-native-sdk.jar', '/connectors-native-sdk-template.jar')
HANDLER = 'com.snowflake.connectors.example.lifecycle.resume.ResumeConnectorCustomHandler.resumeConnector';
GRANT USAGE ON PROCEDURE PUBLIC.RESUME_CONNECTOR() TO APPLICATION ROLE ADMIN;
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Copyright (c) 2024 Snowflake Inc.

import streamlit as st
from daily_use.home_page import home_page
from daily_use.data_sync_page import data_sync_page
from daily_use.settings_page import settings_page


def daily_use_page():
home_tab, data_sync_tab, settings_tab = st.tabs(["Home", "Data Sync", "Settings"])

with home_tab:
home_page()

with data_sync_tab:
data_sync_page()

with settings_tab:
settings_page()
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Copyright (c) 2024 Snowflake Inc.

import streamlit as st
from daily_use.sync_status_bar import sync_status_bar
from native_sdk_api.resource_management import (
create_resource,
fetch_resources
)


def queue_resource():
# TODO: add additional properties here and pass them to create_resource function
resource_name = st.session_state.get("resource_name")

if not resource_name:
st.error("Resource name cannot be empty")
return

result = create_resource(resource_name)
if result.is_ok():
st.success("Resource created")
else:
st.error(result.get_message())


def data_sync_page():
sync_status_bar()
st.subheader("Enabled resources")
df = fetch_resources().to_pandas()

with st.form("add_new_resource_form", clear_on_submit=True):
st.caption("Enable new resource. Fields required to enable a parameter might differ between the various source systems, so the field list will have to be customized here.")
st.caption("For more information on resource definitions check the [documentation](https://other-docs.snowflake.com/LIMITEDACCESS/connector-sdk/flow/resource_definition_and_ingestion_processes) and the [reference](https://other-docs.snowflake.com/LIMITEDACCESS/connector-sdk/reference/resource_definition_and_ingestion_processes_reference)")
# TODO: specify all the properties needed to define a resource in the source system. A subset of those properties should allow for a identification of a single resource, be it a table, endpoint, repository or some other data storage abstraction
st.text_input(
"Resource name",
key="resource_name",
)
_ = st.form_submit_button(
"Queue ingestion",
on_click=queue_resource
)

st.table(df)
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Copyright (c) 2024 Snowflake Inc.

import streamlit as st
from native_sdk_api.observability import get_aggregated_connector_stats


def home_page():
data_frame = get_aggregated_connector_stats()
if not data_frame.empty:
st.vega_lite_chart(
data_frame,
{
"mark": {
"type": "bar",
"width": {
"band": 0.8 if len(data_frame.index) == 1 else 0.95,
},
},
"encoding": {
"x": {"field": "RUN_DATE", "type": "temporal", "timeUnit": "dayhours"},
"y": {"field": "UPDATED_ROWS", "type": "quantitative", "aggregate": "mean"},
},
},
use_container_width=True)
else:
st.info("No ingested rows in the chosen period of time. Start the ingestion in order to display the chart.")
Loading

0 comments on commit 79cea83

Please sign in to comment.