[DPE-2164] Add Spark SQL into Charmed Spark Rock image #74

Merged (15 commits) on Feb 19, 2024
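In short, this PR adds a spark-client.spark-sql entrypoint to the Charmed Spark rock, bumps the bundled spark8t toolkit from v0.0.3 to v0.0.5, and covers the new shell with an integration test. As a rough usage sketch (not part of the diff; the service account, namespace, and image are illustrative, and the flags shown are the ones exercised by the integration test below):

# pipe SQL statements over stdin to the new entrypoint; $IMAGE is a placeholder
# for the Charmed Spark container image used to run executors
echo "SELECT 1;" | spark-client.spark-sql \
  --username spark \
  --namespace spark \
  --conf spark.kubernetes.container.image=$IMAGE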
.gitignore (5 additions, 1 deletion)
@@ -1,4 +1,8 @@
.idea
*.rock
*.tar
.make_cache
env/
.make_cache/
derby.log
metastore_db/
spark-sql.out
files/spark/bin/spark-client.spark-sql (3 additions, 0 deletions)
@@ -0,0 +1,3 @@
#!/bin/bash

python3 -m spark8t.cli.spark_sql $*
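The wrapper forwards its arguments straight to spark8t's spark_sql CLI module. One shell detail worth noting (an observation, not a change made by this PR): unquoted $* re-splits arguments on whitespace, whereas "$@" preserves each argument as passed; the integration test below sidesteps this by piping SQL statements over stdin. A minimal illustration of the difference:

#!/bin/bash
# args-demo.sh (hypothetical): count how many arguments reach Python
python3 -c 'import sys; print(len(sys.argv) - 1)' $*      # ./args-demo.sh "a b"  -> 2
python3 -c 'import sys; print(len(sys.argv) - 1)' "$@"    # ./args-demo.sh "a b"  -> 1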
rockcraft.yaml (3 additions, 1 deletion)
@@ -147,7 +147,7 @@ parts:
- python3-pip
overlay-script: |
mkdir -p $CRAFT_PART_INSTALL/opt/spark8t/python/dist
pip install --target=${CRAFT_PART_INSTALL}/opt/spark8t/python/dist https://github.com/canonical/spark-k8s-toolkit-py/releases/download/v0.0.3/spark8t-0.0.3-py3-none-any.whl
pip install --target=${CRAFT_PART_INSTALL}/opt/spark8t/python/dist https://github.com/canonical/spark-k8s-toolkit-py/releases/download/v0.0.5/spark8t-0.0.5-py3-none-any.whl
rm usr/bin/pip*
stage:
- opt/spark8t/python/dist
@@ -183,6 +183,7 @@ parts:
bin/history-server.sh: opt/pebble/history-server.sh
bin/thrift-server.sh: opt/pebble/thrift-server.sh
bin/spark-client.pyspark: opt/spark-client/python/bin/spark-client.pyspark
bin/spark-client.spark-sql: opt/spark-client/python/bin/spark-client.spark-sql
bin/spark-client.service-account-registry: opt/spark-client/python/bin/spark-client.service-account-registry
bin/spark-client.spark-shell: opt/spark-client/python/bin/spark-client.spark-shell
bin/spark-client.spark-submit: opt/spark-client/python/bin/spark-client.spark-submit
@@ -192,6 +193,7 @@
- opt/pebble/history-server.sh
- opt/pebble/thrift-server.sh
- opt/spark-client/python/bin/spark-client.pyspark
- opt/spark-client/python/bin/spark-client.spark-sql
- opt/spark-client/python/bin/spark-client.service-account-registry
- opt/spark-client/python/bin/spark-client.spark-shell
- opt/spark-client/python/bin/spark-client.spark-submit
tests/integration/integration-tests.sh (59 additions, 4 deletions)
@@ -205,15 +205,15 @@ create_s3_bucket(){
# Creates a S3 bucket with the given name.
S3_ENDPOINT=$(get_s3_endpoint)
BUCKET_NAME=$1
aws --endpoint-url "http://$S3_ENDPOINT" s3api create-bucket --bucket "$BUCKET_NAME"
aws s3api create-bucket --bucket "$BUCKET_NAME"
echo "Created S3 bucket ${BUCKET_NAME}"
}

delete_s3_bucket(){
# Deletes a S3 bucket with the given name.
S3_ENDPOINT=$(get_s3_endpoint)
BUCKET_NAME=$1
aws --endpoint-url "http://$S3_ENDPOINT" s3 rb "s3://$BUCKET_NAME" --force
aws s3 rb "s3://$BUCKET_NAME" --force
echo "Deleted S3 bucket ${BUCKET_NAME}"
}

@@ -228,7 +228,7 @@ copy_file_to_s3_bucket(){
S3_ENDPOINT=$(get_s3_endpoint)

# Copy the file to S3 bucket
aws --endpoint-url "http://$S3_ENDPOINT" s3 cp $FILE_PATH s3://"$BUCKET_NAME"/"$BASE_NAME"
aws s3 cp $FILE_PATH s3://"$BUCKET_NAME"/"$BASE_NAME"
echo "Copied file ${FILE_PATH} to S3 bucket ${BUCKET_NAME}"
}

@@ -496,6 +496,56 @@ test_spark_shell_in_pod() {
run_spark_shell_in_pod $NAMESPACE spark
}

run_spark_sql_in_pod() {
echo "run_spark_sql_in_pod ${1} ${2}"

NAMESPACE=$1
USERNAME=$2

SPARK_SQL_COMMANDS=$(cat ./tests/integration/resources/test-spark-sql.sql)
create_s3_bucket test

echo -e "$(kubectl -n $NAMESPACE exec testpod -- \
env \
UU="$USERNAME" \
NN="$NAMESPACE" \
CMDS="$SPARK_SQL_COMMANDS" \
IM=$(spark_image) \
ACCESS_KEY=$(get_s3_access_key) \
SECRET_KEY=$(get_s3_secret_key) \
S3_ENDPOINT=$(get_s3_endpoint) \
/bin/bash -c 'echo "$CMDS" | spark-client.spark-sql \
--username $UU \
--namespace $NN \
--conf spark.kubernetes.container.image=$IM \
--conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider \
--conf spark.hadoop.fs.s3a.connection.ssl.enabled=false \
--conf spark.hadoop.fs.s3a.path.style.access=true \
--conf spark.hadoop.fs.s3a.endpoint=$S3_ENDPOINT \
--conf spark.hadoop.fs.s3a.access.key=$ACCESS_KEY \
--conf spark.hadoop.fs.s3a.secret.key=$SECRET_KEY \
--conf spark.driver.extraJavaOptions='-Dderby.system.home=/tmp/derby' \
--conf spark.sql.warehouse.dir=s3a://test/warehouse')" > spark-sql.out

# derby.system.home=/tmp/derby is needed because
# kubectl exec runs commands with `/` as working directory
# and by default derby.system.home has value `.`, the current working directory
# (on which the _daemon_ user has no write permission)

num_rows_inserted=$(cat spark-sql.out | grep "^Inserted Rows:" | rev | cut -d' ' -f1 | rev )
echo -e "${num_rows_inserted} rows were inserted."
rm spark-sql.out
delete_s3_bucket test
if [ "${num_rows_inserted}" != "3" ]; then
echo "ERROR: Testing spark-sql failed. ${num_rows_inserted} out of 3 rows were inserted. Aborting with exit code 1."
exit 1
fi
}

test_spark_sql_in_pod() {
run_spark_sql_in_pod tests spark
}

run_pyspark_in_pod() {
echo "run_pyspark_in_pod ${1} ${2}"

@@ -550,6 +600,12 @@ echo -e "##################################"

(setup_user_context && test_pyspark_in_pod && cleanup_user_success) || cleanup_user_failure_in_pod

echo -e "##################################"
echo -e "RUN SPARK SQL IN POD"
echo -e "##################################"

(setup_user_context && test_spark_sql_in_pod && cleanup_user_success) || cleanup_user_failure_in_pod

echo -e "##################################"
echo -e "RUN EXAMPLE JOB WITH POD TEMPLATE"
echo -e "##################################"
@@ -562,7 +618,6 @@ echo -e "########################################"

(setup_user_context && test_example_job_in_pod_with_metrics && cleanup_user_success) || cleanup_user_failure_in_pod


echo -e "########################################"
echo -e "RUN EXAMPLE JOB WITH ERRORS"
echo -e "########################################"
tests/integration/resources/test-spark-sql.sql (6 additions, 0 deletions)
@@ -0,0 +1,6 @@
CREATE DATABASE IF NOT EXISTS sparkdb;
USE sparkdb;
CREATE TABLE IF NOT EXISTS sparkdb.testTable (number Int, word String);
INSERT INTO sparkdb.testTable VALUES (1, "foo"), (2, "bar"), (3, "grok");
SELECT CONCAT("Inserted Rows: ", COUNT(*)) FROM sparkdb.testTable;
EXIT;
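The final SELECT is the hook the integration test keys on: it prints a line of the form "Inserted Rows: <count>", and run_spark_sql_in_pod takes the last whitespace-separated field of that line as the number of inserted rows. A standalone sketch of that extraction on a sample output line (the surrounding spark-sql console output is assumed):

echo "Inserted Rows: 3" | grep "^Inserted Rows:" | rev | cut -d' ' -f1 | rev    # prints 3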
tests/integration/setup-aws-cli.sh (5 additions, 2 deletions)
@@ -7,11 +7,14 @@ sudo snap install aws-cli --classic
ACCESS_KEY=$(kubectl get secret -n minio-operator microk8s-user-1 -o jsonpath='{.data.CONSOLE_ACCESS_KEY}' | base64 -d)
SECRET_KEY=$(kubectl get secret -n minio-operator microk8s-user-1 -o jsonpath='{.data.CONSOLE_SECRET_KEY}' | base64 -d)

S3_BUCKET="spark"
# Get S3 endpoint from MinIO
S3_ENDPOINT=$(kubectl get service minio -n minio-operator -o jsonpath='{.spec.clusterIP}')

DEFAULT_REGION="us-east-2"

# Configure AWS CLI credentials
aws configure set aws_access_key_id $ACCESS_KEY
aws configure set aws_secret_access_key $SECRET_KEY
aws configure set default.region $DEFAULT_REGION
echo "AWS CLI credentials set successfully"
aws configure set endpoint_url "http://$S3_ENDPOINT"
echo "AWS CLI credentials set successfully"