diff --git a/07-train-pipeline.ipynb b/07-train-pipeline.ipynb index c4c8a1e..282e93f 100644 --- a/07-train-pipeline.ipynb +++ b/07-train-pipeline.ipynb @@ -43,7 +43,7 @@ }, { "cell_type": "code", - "execution_count": 51, + "execution_count": 2, "id": "ffea60e3-5cbe-43b6-bcea-9d83c3e162ff", "metadata": {}, "outputs": [ @@ -73,7 +73,7 @@ }, { "cell_type": "code", - "execution_count": 52, + "execution_count": 3, "id": "cec2257a-f493-42c7-9429-7543af864ff7", "metadata": {}, "outputs": [ @@ -95,7 +95,7 @@ }, { "cell_type": "code", - "execution_count": 53, + "execution_count": 4, "id": "a06fbe24-6018-4d6b-8f81-42516b880696", "metadata": {}, "outputs": [ @@ -173,7 +173,7 @@ }, { "cell_type": "code", - "execution_count": 54, + "execution_count": 5, "id": "79aa47fd-8aa6-4126-b167-bfc678c94e60", "metadata": {}, "outputs": [], @@ -185,7 +185,7 @@ }, { "cell_type": "code", - "execution_count": 55, + "execution_count": 6, "id": "68a4fb97-bed6-4533-92d3-dbc79e134f5c", "metadata": {}, "outputs": [], @@ -229,7 +229,7 @@ }, { "cell_type": "code", - "execution_count": 56, + "execution_count": 7, "id": "0b0bdf3f-6834-4ae0-ab0e-18e64e58031e", "metadata": {}, "outputs": [], @@ -239,7 +239,7 @@ }, { "cell_type": "code", - "execution_count": 57, + "execution_count": 8, "id": "fd32b85b-b44a-4e17-9c23-98fad8efe064", "metadata": {}, "outputs": [ @@ -266,7 +266,7 @@ }, { "cell_type": "code", - "execution_count": 58, + "execution_count": 9, "id": "3868029b-b1ce-45a9-aade-ebdfec8f110f", "metadata": { "tags": [] @@ -278,7 +278,7 @@ }, { "cell_type": "code", - "execution_count": 59, + "execution_count": 10, "id": "aa1292ae-b51d-42bc-83c5-996e827c2633", "metadata": {}, "outputs": [], @@ -289,7 +289,7 @@ }, { "cell_type": "code", - "execution_count": 60, + "execution_count": 11, "id": "6893f359-64de-458e-a964-d42ce581fd86", "metadata": {}, "outputs": [], @@ -308,7 +308,7 @@ }, { "cell_type": "code", - "execution_count": 61, + "execution_count": 12, "id": "b8ab9fab-7a7e-4d7b-b241-19f4d39eb266", "metadata": {}, "outputs": [ @@ -387,7 +387,7 @@ }, { "cell_type": "code", - "execution_count": 62, + "execution_count": 13, "id": "bc26d281-a72f-4825-a5f8-33ff35d2fb03", "metadata": {}, "outputs": [ @@ -556,7 +556,7 @@ }, { "cell_type": "code", - "execution_count": 63, + "execution_count": 14, "id": "bf697751-337f-4757-a5b4-fedc2246cc66", "metadata": {}, "outputs": [ @@ -844,7 +844,7 @@ }, { "cell_type": "code", - "execution_count": 64, + "execution_count": 15, "id": "b7a65b44-3ad4-4168-af40-67289000a434", "metadata": {}, "outputs": [ @@ -961,7 +961,7 @@ }, { "cell_type": "code", - "execution_count": 65, + "execution_count": 16, "id": "758b11c7-0595-4787-a238-11bcbf3378ce", "metadata": {}, "outputs": [ @@ -1068,7 +1068,7 @@ }, { "cell_type": "code", - "execution_count": 66, + "execution_count": 17, "id": "fa1f7c39-6fd8-43c0-ab6a-21fabcd57da4", "metadata": {}, "outputs": [ @@ -1156,7 +1156,7 @@ }, { "cell_type": "code", - "execution_count": 67, + "execution_count": 18, "id": "e00bf2fa-bc5c-4289-b6a0-7cf3b6e525f3", "metadata": {}, "outputs": [ @@ -1244,7 +1244,7 @@ }, { "cell_type": "code", - "execution_count": 68, + "execution_count": 19, "id": "f2a60680-96c2-4aaa-bea9-579bc26f6edf", "metadata": {}, "outputs": [ @@ -1345,7 +1345,7 @@ }, { "cell_type": "code", - "execution_count": 69, + "execution_count": 20, "id": "f073f3ad-c410-4c08-bd50-1b3ffcbe728c", "metadata": {}, "outputs": [ @@ -1443,7 +1443,7 @@ }, { "cell_type": "code", - "execution_count": 70, + "execution_count": 21, "id": "050e6925-788b-4760-b659-ef212c3270ff", "metadata": {}, "outputs": [ @@ -1673,7 +1673,7 @@ }, { "cell_type": "code", - "execution_count": 71, + "execution_count": 22, "id": "c88bec31-9f44-4d8b-a2bd-fce98bfe8c7b", "metadata": {}, "outputs": [ @@ -1831,9 +1831,17 @@ " logging.info(f\"endpoint test complete - {count} predictions sent\")" ] }, + { + "cell_type": "markdown", + "id": "c4942c99-bcd0-4a85-be7b-7bb0d19aab46", + "metadata": {}, + "source": [ + "## Send skewed traffic" + ] + }, { "cell_type": "code", - "execution_count": 72, + "execution_count": 23, "id": "6061d9d3-825a-47e5-ba53-dbae1a9b4b70", "metadata": {}, "outputs": [ @@ -1938,8 +1946,28 @@ " _endpoint = vertex_ai.Endpoint(_endpoint_uri)\n", " logging.info(f\"_endpoint defined\")\n", " \n", + " # ===================================================\n", + " # load test instance\n", + " # ===================================================\n", + " LOCAL_INSTANCE_FILE = 'test_instance_list.pkl'\n", + " GCS_PATH_TO_BLOB = f'{experiment_name}/{experiment_run}/{many_test_instances_gcs_filename}'\n", + " LOADED_TEST_LIST = f'loaded_{LOCAL_INSTANCE_FILE}'\n", + " \n", + " loaded_test_instance = download_blob(\n", + " bucket_name=train_output_gcs_bucket,\n", + " source_gcs_obj=GCS_PATH_TO_BLOB,\n", + " local_filename=LOADED_TEST_LIST\n", + " )\n", + " logging.info(f'loaded_test_instance: {loaded_test_instance}')\n", + " \n", + " filehandler = open(LOADED_TEST_LIST, 'rb')\n", + " LIST_OF_DICTS = pkl.load(filehandler)\n", + " filehandler.close()\n", + " \n", + " logging.info(f'len(LIST_OF_DICTS): {len(LIST_OF_DICTS)}')\n", + " \n", " # ====================================================\n", - " # Send skewed predictions\n", + " # load skew features stats\n", " # ====================================================\n", " SKEW_FEATURES_STATS_FILE = 'skew_feat_stats.pkl'\n", " GCS_PATH_TO_BLOB = f'{experiment_name}/{experiment_run}/{SKEW_FEATURES_STATS_FILE}'\n", @@ -1953,9 +1981,9 @@ " )\n", " logging.info(f'loaded_skew_test_instance: {loaded_skew_test_instance}')\n", " \n", - " filehandler = open(LOADED_SKEW_FEATURES_STATS_FILE, 'rb')\n", - " SKEW_FEATURES = pkl.load(filehandler)\n", - " filehandler.close()\n", + " filehandler_v2 = open(LOADED_SKEW_FEATURES_STATS_FILE, 'rb')\n", + " SKEW_FEATURES = pkl.load(filehandler_v2)\n", + " filehandler_v2.close()\n", " \n", " mean_durations, std_durations = SKEW_FEATURES['pl_duration_ms_new']\n", " mean_num_songs, std_num_songs = SKEW_FEATURES['num_pl_songs_new']\n", @@ -2011,7 +2039,7 @@ " # send skewed traffic\n", " monitoring_test(\n", " endpoint=_endpoint, \n", - " instances=LIST_OF_INSTANCES,\n", + " instances=LIST_OF_DICTS,\n", " skew_feat_stat=SKEW_FEATURES,\n", " start=2, \n", " end=8\n", @@ -2028,7 +2056,7 @@ }, { "cell_type": "code", - "execution_count": 73, + "execution_count": 24, "id": "53209e5c-2050-4333-a994-4133b8ee58b5", "metadata": {}, "outputs": [ @@ -2305,7 +2333,7 @@ }, { "cell_type": "code", - "execution_count": 74, + "execution_count": 25, "id": "18a86934-2700-4ce3-b0c9-673fdce5a430", "metadata": {}, "outputs": [ @@ -2342,7 +2370,7 @@ }, { "cell_type": "code", - "execution_count": 75, + "execution_count": 26, "id": "8a0a9956-fb25-4d7a-9754-97d5868f7ffd", "metadata": {}, "outputs": [ @@ -2388,7 +2416,7 @@ }, { "cell_type": "code", - "execution_count": 76, + "execution_count": 27, "id": "28d8b91d-9772-49da-b7a4-4ab33a12c0c4", "metadata": {}, "outputs": [ @@ -2426,7 +2454,7 @@ }, { "cell_type": "code", - "execution_count": 77, + "execution_count": 28, "id": "cc68e120-39c8-4974-92a2-228cd0173853", "metadata": {}, "outputs": [ @@ -2458,7 +2486,7 @@ }, { "cell_type": "code", - "execution_count": 78, + "execution_count": 29, "id": "b3beb06e-05a8-470a-9d54-ff02a03758a9", "metadata": {}, "outputs": [ @@ -2489,7 +2517,7 @@ }, { "cell_type": "code", - "execution_count": 79, + "execution_count": 30, "id": "f9c1846e-e63f-4c63-8c9c-049f99d503c3", "metadata": {}, "outputs": [ @@ -2521,7 +2549,7 @@ " 'track_time_signature_can': FixedLenFeature(shape=(), dtype=tf.string, default_value=None)}" ] }, - "execution_count": 79, + "execution_count": 30, "metadata": {}, "output_type": "execute_result" } @@ -2533,7 +2561,7 @@ }, { "cell_type": "code", - "execution_count": 80, + "execution_count": 31, "id": "ab61ac70-9da5-4ed6-b1f8-aecc167e8192", "metadata": {}, "outputs": [], @@ -2563,7 +2591,7 @@ }, { "cell_type": "code", - "execution_count": 81, + "execution_count": 32, "id": "84158b9e-45d9-4ed3-802d-a2eab948535a", "metadata": {}, "outputs": [], @@ -2576,7 +2604,7 @@ }, { "cell_type": "code", - "execution_count": 82, + "execution_count": 33, "id": "75b11787-0c10-4f3b-b76f-0b10dbfd5868", "metadata": {}, "outputs": [], @@ -2609,7 +2637,7 @@ }, { "cell_type": "code", - "execution_count": 83, + "execution_count": 34, "id": "cf9ff3ab-ed67-4b0f-90f9-ed97c1e88a58", "metadata": {}, "outputs": [ @@ -2634,7 +2662,7 @@ }, { "cell_type": "code", - "execution_count": 84, + "execution_count": 35, "id": "6587613f-0351-4fa2-8fd2-aeed12b1eab1", "metadata": {}, "outputs": [], @@ -2653,7 +2681,7 @@ }, { "cell_type": "code", - "execution_count": 85, + "execution_count": 36, "id": "8bc940d1-aecb-480b-9b00-cb5c3afcda81", "metadata": {}, "outputs": [ @@ -2688,7 +2716,7 @@ }, { "cell_type": "code", - "execution_count": 86, + "execution_count": 37, "id": "aef27ee8-e094-4293-a14d-23e87f687a75", "metadata": {}, "outputs": [ @@ -2777,7 +2805,7 @@ }, { "cell_type": "code", - "execution_count": 87, + "execution_count": 38, "id": "c29169fd-6a15-48a4-99f1-c4bdfb754f89", "metadata": {}, "outputs": [ @@ -2815,7 +2843,7 @@ }, { "cell_type": "code", - "execution_count": 88, + "execution_count": 39, "id": "d8fa7a02-6d4d-43ba-bce4-5289f86db2ce", "metadata": {}, "outputs": [ @@ -2937,7 +2965,7 @@ }, { "cell_type": "code", - "execution_count": 89, + "execution_count": 40, "id": "e4d826dc-0dfc-44de-99f1-79d68ecd559d", "metadata": {}, "outputs": [ @@ -2971,7 +2999,7 @@ }, { "cell_type": "code", - "execution_count": 90, + "execution_count": 41, "id": "f18863e6-1940-468a-adcf-985d879bb21a", "metadata": {}, "outputs": [ @@ -2981,7 +3009,7 @@ "'gs://ndr-v1-hybrid-vertex-bucket/tfrs-pipe-v1/run-20230926-120445'" ] }, - "execution_count": 90, + "execution_count": 41, "metadata": {}, "output_type": "execute_result" } @@ -3011,7 +3039,7 @@ }, { "cell_type": "code", - "execution_count": 91, + "execution_count": 42, "id": "913e0f5d-abc0-4d78-915e-7a28489e3d23", "metadata": {}, "outputs": [ @@ -3051,7 +3079,7 @@ }, { "cell_type": "code", - "execution_count": 92, + "execution_count": 43, "id": "41530012-e7c5-4574-b1e5-e40b51a1f90b", "metadata": {}, "outputs": [ @@ -3093,7 +3121,7 @@ }, { "cell_type": "code", - "execution_count": 93, + "execution_count": 44, "id": "f1eb197b-ea56-4b83-aa92-8198a75711ee", "metadata": {}, "outputs": [ @@ -3124,7 +3152,7 @@ }, { "cell_type": "code", - "execution_count": 95, + "execution_count": 45, "id": "5e6f5b61-ab53-4619-b854-8c5a4da210e4", "metadata": {}, "outputs": [], @@ -3516,10 +3544,19 @@ }, { "cell_type": "code", - "execution_count": 96, + "execution_count": 46, "id": "65c65526-e42c-459c-a3a2-db8c84b6cc43", "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/home/jupyter/.local/lib/python3.7/site-packages/kfp/v2/compiler/compiler.py:1293: FutureWarning: APIs imported from the v1 namespace (e.g. kfp.dsl, kfp.components, etc) will not be supported by the v2 compiler since v2.0.0\n", + " category=FutureWarning,\n" + ] + } + ], "source": [ "# ! rm -f custom_container_pipeline_spec.json\n", "\n", @@ -3542,7 +3579,7 @@ }, { "cell_type": "code", - "execution_count": 97, + "execution_count": 47, "id": "b3c4df38-23c9-449c-bbd2-bb6432d6e6a6", "metadata": {}, "outputs": [ @@ -3565,7 +3602,7 @@ }, { "cell_type": "code", - "execution_count": 98, + "execution_count": 48, "id": "c6dba00a-d42b-40f5-8135-777caaae79a2", "metadata": {}, "outputs": [ diff --git a/README.md b/README.md index cb68a76..eb0425d 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ The end to end example (with public data) follows this architecture: ![](img/arch.png) -### Notebook Overview +### Notebook Overview - TODO: needs update 0. [00-load-core-data-to-bq](00-load-core-data-to-bq.ipynb) Extract from the zip file and upload to BQ. This notebook then enriches features for the playlist songs diff --git a/src/train_pipes/send_skewed_traffic.py b/src/train_pipes/send_skewed_traffic.py new file mode 100644 index 0000000..7900459 --- /dev/null +++ b/src/train_pipes/send_skewed_traffic.py @@ -0,0 +1,189 @@ + +import kfp +from typing import Any, Callable, Dict, NamedTuple, Optional, List +from kfp.v2.dsl import ( + Artifact, Dataset, Input, InputPath, + Model, Output, OutputPath, component, Metrics +) +@kfp.v2.dsl.component( + base_image="python:3.9", + packages_to_install=[ + 'google-cloud-aiplatform==1.26.1', + 'google-cloud-pipeline-components', + 'google-cloud-storage', + 'tensorflow==2.11.0', + 'numpy' + ], +) +def send_skewed_traffic( + project: str, + location: str, + version: str, + train_output_gcs_bucket: str, + experiment_name: str, + experiment_run: str, + endpoint: str, # Input[Artifact], + # feature_dict: dict, + # metrics: Output[Metrics], +): + + import logging + from datetime import datetime + import time + import numpy as np + import pickle as pkl + + from google.cloud import aiplatform as vertex_ai + + from google.cloud import storage + from google.cloud.storage.bucket import Bucket + from google.cloud.storage.blob import Blob + + from google.protobuf import json_format + from google.protobuf.json_format import Parse + from google.protobuf.struct_pb2 import Value + from google_cloud_pipeline_components.proto.gcp_resources_pb2 import GcpResources + + import tensorflow as tf + + logging.getLogger().setLevel(logging.INFO) + + vertex_ai.init( + project=project, + location=location, + ) + storage_client = storage.Client(project=project) + + # ==================================================== + # helper functions + # ==================================================== + def download_blob(bucket_name, source_gcs_obj, local_filename): + """Uploads a file to the bucket.""" + # storage_client = storage.Client(project=project_number) + bucket = storage_client.bucket(bucket_name) + blob = bucket.blob(source_gcs_obj) + blob.download_to_filename(local_filename) + + filehandler = open(f'{local_filename}', 'rb') + loaded_dict = pkl.load(filehandler) + filehandler.close() + + logging.info(f"File {local_filename} downloaded from gs://{bucket_name}/{source_gcs_obj}") + + return loaded_dict + + # ==================================================== + # get deployed model endpoint + # ==================================================== + logging.info(f"Endpoint = {endpoint}") + gcp_resources = Parse(endpoint, GcpResources()) + logging.info(f"gcp_resources = {gcp_resources}") + + _endpoint_resource = gcp_resources.resources[0].resource_uri + logging.info(f"_endpoint_resource = {_endpoint_resource}") + + _endpoint_uri = "/".join(_endpoint_resource.split("/")[-8:-2]) + logging.info(f"_endpoint_uri = {_endpoint_uri}") + + # define endpoint resource in component + _endpoint = vertex_ai.Endpoint(_endpoint_uri) + logging.info(f"_endpoint defined") + + # =================================================== + # load test instance + # =================================================== + LOCAL_INSTANCE_FILE = 'test_instance_list.pkl' + GCS_PATH_TO_BLOB = f'{experiment_name}/{experiment_run}/{many_test_instances_gcs_filename}' + LOADED_TEST_LIST = f'loaded_{LOCAL_INSTANCE_FILE}' + + loaded_test_instance = download_blob( + bucket_name=train_output_gcs_bucket, + source_gcs_obj=GCS_PATH_TO_BLOB, + local_filename=LOADED_TEST_LIST + ) + logging.info(f'loaded_test_instance: {loaded_test_instance}') + + filehandler = open(LOADED_TEST_LIST, 'rb') + LIST_OF_DICTS = pkl.load(filehandler) + filehandler.close() + + logging.info(f'len(LIST_OF_DICTS): {len(LIST_OF_DICTS)}') + + # ==================================================== + # load skew features stats + # ==================================================== + SKEW_FEATURES_STATS_FILE = 'skew_feat_stats.pkl' + GCS_PATH_TO_BLOB = f'{experiment_name}/{experiment_run}/{SKEW_FEATURES_STATS_FILE}' + LOADED_SKEW_FEATURES_STATS_FILE = f"loaded_{SKEW_FEATURES_STATS_FILE}" + logging.info(f'loading: {LOADED_SKEW_FEATURES_STATS_FILE}') + + loaded_skew_test_instance = download_blob( + bucket_name=train_output_gcs_bucket, + source_gcs_obj=GCS_PATH_TO_BLOB, + local_filename=LOADED_SKEW_FEATURES_STATS_FILE + ) + logging.info(f'loaded_skew_test_instance: {loaded_skew_test_instance}') + + filehandler_v2 = open(LOADED_SKEW_FEATURES_STATS_FILE, 'rb') + SKEW_FEATURES = pkl.load(filehandler_v2) + filehandler_v2.close() + + mean_durations, std_durations = SKEW_FEATURES['pl_duration_ms_new'] + mean_num_songs, std_num_songs = SKEW_FEATURES['num_pl_songs_new'] + mean_num_artists, std_num_artists = SKEW_FEATURES['num_pl_artists_new'] + mean_num_albums, std_num_albums = SKEW_FEATURES['num_pl_albums_new'] + + logging.info(f"std_durations : {round(std_durations, 0)}") + logging.info(f"std_num_songs : {round(std_num_songs, 0)}") + logging.info(f"std_num_artists : {round(std_num_artists, 0)}") + logging.info(f"std_num_albums : {round(std_num_albums, 0)}\n") + + def monitoring_test(endpoint, instances, skew_feat_stat, start=2, end=4): + + mean_durations, std_durations = skew_feat_stat['pl_duration_ms_new'] + mean_num_songs, std_num_songs = skew_feat_stat['num_pl_songs_new'] + mean_num_artists, std_num_artists = skew_feat_stat['num_pl_artists_new'] + mean_num_albums, std_num_albums = skew_feat_stat['num_pl_albums_new'] + + logging.info(f"std_durations : {round(std_durations, 0)}") + logging.info(f"std_num_songs : {round(std_num_songs, 0)}") + logging.info(f"std_num_artists : {round(std_num_artists, 0)}") + logging.info(f"std_num_albums : {round(std_num_albums, 0)}\n") + + total_preds = 0 + + for multiplier in range(start, end+1): + + print(f"multiplier: {multiplier}") + + pred_count = 0 + + for example in instances: + list_dict = {} + + example['pl_duration_ms_new'] = round(std_durations * multiplier, 0) + example['num_pl_songs_new'] = round(std_num_songs * multiplier, 0) + example['num_pl_artists_new'] = round(std_num_artists * multiplier, 0) + example['num_pl_albums_new'] = round(std_num_albums * multiplier, 0) + # list_of_skewed_instances.append(example) + + response = endpoint.predict(instances=[example]) + + if pred_count > 0 and pred_count % 250 == 0: + print(f"pred_count: {pred_count}") + + pred_count += 1 + total_preds += 1 + + logging.info(f"sent {pred_count} pred requests with {multiplier}X multiplier") + + logging.info(f"sent {total_preds} total pred requests") + + # send skewed traffic + monitoring_test( + endpoint=_endpoint, + instances=LIST_OF_DICTS, + skew_feat_stat=SKEW_FEATURES, + start=2, + end=8 + )