-
Notifications
You must be signed in to change notification settings - Fork 281
/
bqml_generate_text.sqlx
206 lines (178 loc) · 7.3 KB
/
bqml_generate_text.sqlx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
config { hasOutput: true }
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
CREATE OR REPLACE PROCEDURE ${self()}(source_table STRING, target_table STRING, ml_model STRING, content_column STRING, key_columns ARRAY<STRING>, options_string STRING)
OPTIONS (
description='''Iteratively executes the ML.GENERATE_TEXT function to ensure all source table rows have generated output in the destination table,
handling potential retryable errors gracefully along the way.
Any rows already present in the destination table are ignored, so this procedure is safe to call multiple times.
@param STRING source_table : The full path of the BigQuery table containing the input prompts for text generation. Path format - "project.dataset.table" or "dataset.table"
@param STRING target_table : The full path of the BigQuery table where the generated text will be stored. This table will be created if it does not exist.
@param STRING ml_model : The full path of the text model to be used.
@param STRING content_column : The name of the column in the `source_table` containing the text prompt.
@param ARRAY<STRING> key_columns : An array of column names in the `source_table` that uniquely identify each row. Must contain at least one column. '*' is not a valid value.
@param STRING options_string : A JSON string containing additional optional parameters for the text generation process. Set to '{}' if you want to use defaults for all options parameters.
A sample fully-filled JSON option string would look like:
'{
"batch_size": 500,
"termination_time_secs": 43200,
"source_filter": "LENGTH(text) < 1000",
"projection_columns": ["type", "text"],
"ml_options": "STRUCT(0.5 as temperature)"
}'
The parameters within the options string are documented below as:
INT64 batch_size : The number of rows to process in each child job during the procedure. A larger value will reduce the overhead of multiple child jobs, but needs to be small enough to complete in a single job run. Defaults to 1000.
INT64 termination_time_secs : The maximum time (in seconds) the script should run before terminating. Defaults to 82800 (23 hours).
STRING source_filter : An optional filter applied as a SQL WHERE clause to the source table before processing. Defaults to 'TRUE'.
ARRAY<STRING> projection_columns : An array of columns to copy from the source table into the destination table. The key columns are always copied. '*' is not a valid value.
STRING ml_options : A JSON string representing additional struct options defined in ML.GENERATE_TEXT. Must be of the form 'STRUCT(...)'. Defaults to 'STRUCT(TRUE AS flatten_json_output)'.
''')
BEGIN
  -- Number of source rows handed to ML.GENERATE_TEXT per loop iteration
  DECLARE batch_size DEFAULT 1000;
  -- The time to wait (in seconds) before the script terminates
  DECLARE termination_time_secs DEFAULT (23 * 60 * 60);
  -- An optional filter applied as a WHERE clause to the source table
  DECLARE source_filter DEFAULT 'TRUE';
  -- The columns to project from the source table to the target table
  DECLARE projection_columns ARRAY<STRING> DEFAULT ARRAY[];
  -- The ML options to use for the ML operation
  DECLARE ml_options DEFAULT 'STRUCT(TRUE AS flatten_json_output)';
  -- The SELECT over the source table that feeds ML.GENERATE_TEXT
  DECLARE ml_query STRING;
  -- The filter condition for accepting the ML result into the target table:
  -- rows whose status reports a retryable error are left out so that a
  -- later iteration (or a later call) picks them up again
  DECLARE accept_filter
    DEFAULT "ml_generate_text_status NOT LIKE 'A retryable error occurred:%'";
  -- Equality predicate over all key columns, used to match source rows (S)
  -- against rows already present in the target table (T)
  DECLARE key_cols_filter
    DEFAULT (
      SELECT
        STRING_AGG('S.' || key_col || ' = T.' || key_col, ' AND ')
      FROM
        UNNEST(key_columns) AS key_col
    );
  -- Parsed form of options_string
  DECLARE options JSON;

  -- Fail fast: without key columns the key filter is NULL, which would turn
  -- the dynamic SQL below into NULL and fail with an unhelpful error
  IF key_columns IS NULL OR ARRAY_LENGTH(key_columns) = 0 THEN
    RAISE USING MESSAGE = 'Invalid key_columns. It must contain at least one column name.';
  END IF;

  BEGIN
    SET options = PARSE_JSON(options_string);
  EXCEPTION WHEN ERROR THEN
    RAISE USING MESSAGE = 'Unable to parse options_string as JSON';
  END;

  -- Each option overrides its default only when present in the JSON
  BEGIN
    IF JSON_EXTRACT_SCALAR(options, '$.batch_size') IS NOT NULL THEN
      SET batch_size = CAST(JSON_EXTRACT_SCALAR(options, '$.batch_size') AS INT64);
    END IF;
  EXCEPTION WHEN ERROR THEN
    RAISE USING MESSAGE = 'Invalid batch_size. It must be an integer.';
  END;

  BEGIN
    IF JSON_EXTRACT_SCALAR(options, '$.termination_time_secs') IS NOT NULL THEN
      SET termination_time_secs = CAST(JSON_EXTRACT_SCALAR(options, '$.termination_time_secs') AS INT64);
    END IF;
  EXCEPTION WHEN ERROR THEN
    RAISE USING MESSAGE = 'Invalid termination_time_secs. It must be an integer.';
  END;

  BEGIN
    IF JSON_EXTRACT_SCALAR(options, '$.source_filter') IS NOT NULL THEN
      SET source_filter = CAST(JSON_EXTRACT_SCALAR(options, '$.source_filter') AS STRING);
    END IF;
  EXCEPTION WHEN ERROR THEN
    RAISE USING MESSAGE = 'Invalid source_filter. It must be a string.';
  END;

  BEGIN
    IF JSON_EXTRACT_STRING_ARRAY(options, '$.projection_columns') IS NOT NULL THEN
      SET projection_columns = JSON_EXTRACT_STRING_ARRAY(options, '$.projection_columns');
    END IF;
  EXCEPTION WHEN ERROR THEN
    RAISE USING MESSAGE = 'Invalid projection_columns. It must be an array of strings.';
  END;

  -- Always carry the key columns into the target table (needed by the
  -- NOT EXISTS check in the loop); DISTINCT avoids selecting a column twice
  SET projection_columns = (
    SELECT ARRAY_AGG(DISTINCT val)
    FROM UNNEST(ARRAY_CONCAT(key_columns, projection_columns)) AS val
  );

  BEGIN
    IF JSON_EXTRACT_SCALAR(options, '$.ml_options') IS NOT NULL THEN
      SET ml_options = CAST(JSON_EXTRACT_SCALAR(options, '$.ml_options') AS STRING);
    END IF;
  EXCEPTION WHEN ERROR THEN
    RAISE USING MESSAGE = 'Invalid ml_options. It must be a string.';
  END;

  -- The content column is exposed under the alias prompt
  SET ml_query = FORMAT(
    'SELECT %s, %s AS prompt FROM `%s` WHERE %s',
    ARRAY_TO_STRING(projection_columns, ','),
    content_column,
    source_table,
    source_filter);

  -- Indicate the parameters used in this script run
  SELECT source_table, target_table, ml_model, content_column, key_columns, options, batch_size, source_filter, termination_time_secs, projection_columns, ml_options, ml_query;

  -- Create the target table first if it does not exist, seeding it from a
  -- 10-row sample so that its schema matches the ML.GENERATE_TEXT output
  EXECUTE IMMEDIATE FORMAT(
    '''
    CREATE TABLE IF NOT EXISTS `%s` AS
      (SELECT *
       FROM ML.GENERATE_TEXT (MODEL `%s`,
         (SELECT *
          FROM (%s)
          LIMIT 10), %s)
       WHERE %s)''',
    target_table,
    ml_model,
    ml_query,
    ml_options,
    accept_filter);

  -- Iteratively populate the target table
  REPEAT
    DROP TABLE IF EXISTS _SESSION.text_batch;

    -- Identify new rows in the source table to generate text
    -- For throughput reasons, materialize these rows into a temp table before calling GENERATE_TEXT()
    -- The target table name is backtick-quoted, consistent with the other
    -- statements, so that paths such as hyphenated project ids resolve
    EXECUTE IMMEDIATE FORMAT(
      '''
      CREATE TEMP TABLE _SESSION.text_batch AS
        (SELECT *
         FROM (%s) AS S
         WHERE NOT EXISTS (SELECT * FROM `%s` AS T WHERE %s) LIMIT %d)
      ''',
      ml_query,
      target_table,
      key_cols_filter,
      batch_size);

    -- Generate text for these rows and insert them into the target table
    EXECUTE IMMEDIATE FORMAT(
      '''
      INSERT `%s`
      SELECT *
      FROM ML.GENERATE_TEXT (MODEL `%s`,
        TABLE _SESSION.text_batch, %s)
      WHERE %s
      ''',
      target_table,
      ml_model,
      ml_options,
      accept_filter);

  -- Stop once an iteration inserts nothing or the time budget is used up
  UNTIL
    (SELECT @@row_count) = 0
    OR TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), @@script.creation_time, SECOND) >= termination_time_secs
  END REPEAT;
END