From fbd7359abd5943b41fc6d04a94e40737b1f8bffd Mon Sep 17 00:00:00 2001 From: Megan Potter <57276408+feywind@users.noreply.github.com> Date: Tue, 15 Oct 2024 15:56:07 -0400 Subject: [PATCH] samples: add samples for Cloud Storage ingestion, and a few small fixes (#1985) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * build: fix a small issue with the typeless bot invocation * samples: add samples for cloud storage ingestion * samples: fix a paste-o in Kinesis sample * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * samples: fix paste-o with incorrect sample function name * tests: fix sample test quotes --------- Co-authored-by: Owl Bot --- README.md | 17 +-- package.json | 2 +- samples/README.md | 20 +++ .../createTopicWithCloudStorageIngestion.js | 118 ++++++++++++++++++ samples/createTopicWithKinesisIngestion.js | 3 +- samples/system-test/topics.test.ts | 33 +++++ .../createTopicWithCloudStorageIngestion.ts | 114 +++++++++++++++++ .../createTopicWithKinesisIngestion.ts | 3 +- 8 files changed, 297 insertions(+), 13 deletions(-) create mode 100644 samples/createTopicWithCloudStorageIngestion.js create mode 100644 samples/typescript/createTopicWithCloudStorageIngestion.ts diff --git a/README.md b/README.md index 210ec6d79..db9c9ef33 100644 --- a/README.md +++ b/README.md @@ -66,15 +66,15 @@ npm install @google-cloud/pubsub ```javascript // Imports the Google Cloud client library -const { PubSub } = require("@google-cloud/pubsub"); +const {PubSub} = require('@google-cloud/pubsub'); async function quickstart( -projectId = 'your-project-id', // Your Google Cloud Platform project ID -topicNameOrId = 'my-topic', // Name for the new topic to create -subscriptionName = 'my-sub' // Name for the new subscription to create + projectId = 'your-project-id', // Your Google Cloud Platform project ID + topicNameOrId = 'my-topic', // Name for the new topic to create + subscriptionName = 'my-sub' // Name for the new subscription to create ) { // Instantiates a client - const pubsub = new PubSub({ projectId }); + const pubsub = new PubSub({projectId}); // Creates a new topic const [topic] = await pubsub.createTopic(topicNameOrId); @@ -84,19 +84,19 @@ subscriptionName = 'my-sub' // Name for the new subscription to create const [subscription] = await topic.createSubscription(subscriptionName); // Receive callbacks for new messages on the subscription - subscription.on('message', (message) => { + subscription.on('message', message => { console.log('Received message:', message.data.toString()); process.exit(0); }); // Receive callbacks for errors on the subscription - subscription.on('error', (error) => { + subscription.on('error', error => { console.error('Received error:', error); process.exit(1); }); // Send a message to the topic - topic.publishMessage({ data: Buffer.from('Test message!') }); + topic.publishMessage({data: Buffer.from('Test message!')}); } ``` @@ -138,6 +138,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/nodejs-pubsub/tree | Create Subscription with ordering enabled | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscriptionWithOrdering.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscriptionWithOrdering.js,samples/README.md) | | Create Subscription With Retry Policy | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscriptionWithRetryPolicy.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscriptionWithRetryPolicy.js,samples/README.md) | | Create Topic | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createTopic.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createTopic.js,samples/README.md) | +| Create Topic With Cloud Storage Ingestion | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createTopicWithCloudStorageIngestion.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createTopicWithCloudStorageIngestion.js,samples/README.md) | | Create Topic With Kinesis Ingestion | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createTopicWithKinesisIngestion.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createTopicWithKinesisIngestion.js,samples/README.md) | | Create Topic With Schema | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createTopicWithSchema.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createTopicWithSchema.js,samples/README.md) | | Create Topic With Schema Revisions | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createTopicWithSchemaRevisions.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createTopicWithSchemaRevisions.js,samples/README.md) | diff --git a/package.json b/package.json index b3558bf47..a4ad5eafc 100644 --- a/package.json +++ b/package.json @@ -46,7 +46,7 @@ "prelint": "cd samples; npm link ../; npm install", "precompile": "gts clean", "typeless": "npx typeless-sample-bot --outputpath samples --targets samples --recursive", - "posttypeless": "cd samples && npm i && cd .. && npx eslint --ignore-pattern owl-bot-staging --fix" + "posttypeless": "cd samples && npm i && cd .. && npx eslint --ignore-pattern owl-bot-staging --fix samples" }, "dependencies": { "@google-cloud/paginator": "^5.0.0", diff --git a/samples/README.md b/samples/README.md index c3e17d913..dc6caf537 100644 --- a/samples/README.md +++ b/samples/README.md @@ -35,6 +35,7 @@ guides. * [Create Subscription with ordering enabled](#create-subscription-with-ordering-enabled) * [Create Subscription With Retry Policy](#create-subscription-with-retry-policy) * [Create Topic](#create-topic) + * [Create Topic With Cloud Storage Ingestion](#create-topic-with-cloud-storage-ingestion) * [Create Topic With Kinesis Ingestion](#create-topic-with-kinesis-ingestion) * [Create Topic With Schema](#create-topic-with-schema) * [Create Topic With Schema Revisions](#create-topic-with-schema-revisions) @@ -389,6 +390,25 @@ __Usage:__ +### Create Topic With Cloud Storage Ingestion + +Creates a new topic, with Cloud Storage ingestion enabled. + +View the [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createTopicWithCloudStorageIngestion.js). + +[![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createTopicWithCloudStorageIngestion.js,samples/README.md) + +__Usage:__ + + +`node createTopicWithCloudStorageIngestion.js ` + + +----- + + + + ### Create Topic With Kinesis Ingestion Creates a new topic, with Kinesis ingestion enabled. diff --git a/samples/createTopicWithCloudStorageIngestion.js b/samples/createTopicWithCloudStorageIngestion.js new file mode 100644 index 000000000..c66beeb6d --- /dev/null +++ b/samples/createTopicWithCloudStorageIngestion.js @@ -0,0 +1,118 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This is a generated sample, using the typeless sample bot. Please +// look for the source TypeScript sample (.ts) for modifications. +'use strict'; + +/** + * This sample demonstrates how to perform basic operations on topics with + * the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// sample-metadata: +// title: Create Topic With Cloud Storage Ingestion +// description: Creates a new topic, with Cloud Storage ingestion enabled. +// usage: node createTopicWithCloudStorageIngestion.js + +// [START pubsub_create_topic_with_cloud_storage_ingestion] +/** + * TODO(developer): Uncomment these variables before running the sample. + */ +// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID'; +// const bucket = 'YOUR_BUCKET_NAME'; +// const inputFormat = 'text'; +// const textDelimiter = '\n'; +// const matchGlob = '**.txt'; +// const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ; + +// Imports the Google Cloud client library +const {PubSub} = require('@google-cloud/pubsub'); + +// Creates a client; cache this for further use +const pubSubClient = new PubSub(); + +async function createTopicWithCloudStorageIngestion( + topicNameOrId, + bucket, + inputFormat, + textDelimiter, + matchGlob, + minimumObjectCreateTime +) { + const minimumDate = Date.parse(minimumObjectCreateTime); + const topicMetadata = { + name: topicNameOrId, + ingestionDataSourceSettings: { + cloudStorage: { + bucket, + minimumObjectCreateTime: { + seconds: minimumDate / 1000, + nanos: (minimumDate % 1000) * 1000, + }, + matchGlob, + }, + }, + }; + + // Make a format appropriately. + switch (inputFormat) { + case 'text': + topicMetadata.ingestionDataSourceSettings.cloudStorage.textFormat = { + delimiter: textDelimiter, + }; + break; + case 'avro': + topicMetadata.ingestionDataSourceSettings.cloudStorage.avroFormat = {}; + break; + case 'pubsub_avro': + topicMetadata.ingestionDataSourceSettings.cloudStorage.pubsubAvroFormat = + {}; + break; + default: + console.error('inputFormat must be in ("text", "avro", "pubsub_avro")'); + return; + } + + // Creates a new topic with Cloud Storage ingestion. + await pubSubClient.createTopic(topicMetadata); + console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`); +} +// [END pubsub_create_topic_with_cloud_storage_ingestion] + +function main( + topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID', + bucket = 'YOUR_BUCKET_NAME', + inputFormat = 'text', + textDelimiter = '\n', + matchGlob = '**.txt', + minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ' +) { + createTopicWithCloudStorageIngestion( + topicNameOrId, + bucket, + inputFormat, + textDelimiter, + matchGlob, + minimumObjectCreateTime + ).catch(err => { + console.error(err.message); + process.exitCode = 1; + }); +} + +main(...process.argv.slice(2)); diff --git a/samples/createTopicWithKinesisIngestion.js b/samples/createTopicWithKinesisIngestion.js index 3a77dcfa6..d9a50322e 100644 --- a/samples/createTopicWithKinesisIngestion.js +++ b/samples/createTopicWithKinesisIngestion.js @@ -52,8 +52,7 @@ async function createTopicWithKinesisIngestion( streamArn, consumerArn ) { - // Creates a new topic with a schema. Note that you might also - // pass Encodings.Json or Encodings.Binary here. + // Creates a new topic with Kinesis ingestion. await pubSubClient.createTopic({ name: topicNameOrId, ingestionDataSourceSettings: { diff --git a/samples/system-test/topics.test.ts b/samples/system-test/topics.test.ts index de31c71f3..54c28ee7f 100644 --- a/samples/system-test/topics.test.ts +++ b/samples/system-test/topics.test.ts @@ -13,6 +13,7 @@ // limitations under the License. import {Message, PubSub, Topic, Subscription} from '@google-cloud/pubsub'; +import {Bucket, Storage} from '@google-cloud/storage'; import {assert} from 'chai'; import {describe, it, after} from 'mocha'; import {execSync, commandFor} from './common'; @@ -52,6 +53,17 @@ describe('topics', () => { return {t: topic, tname, s: sub}; } + async function createStorageBucket(testName: string): Promise { + const storage = new Storage({ + projectId, + }); + + const name = resources.generateStorageName(testName); + + const [bucket] = await storage.createBucket(name); + return bucket; + } + async function cleanSubs() { const [subscriptions] = await pubsub.getSubscriptions(); await Promise.all( @@ -121,6 +133,27 @@ describe('topics', () => { assert.ok(exists, 'Topic was created'); }); + it('should create a topic with cloud storage ingestion', async () => { + const testId = 'create-gcs-ingestion'; + const name = topicName(testId); + const bucket = await createStorageBucket(testId); + const bucketName = bucket.name; + + try { + const output = execSync( + `${commandFor('createTopicWithCloudStorageIngestion')} ${name} ${ + bucketName + } text '\n' '**.txt' '2024-10-10T00:00:00Z'` + ); + assert.include(output, `Topic ${name} created with Cloud Storage ingestion.`); + const [topics] = await pubsub.getTopics(); + const exists = topics.some(t => t.name === fullTopicName(name)); + assert.ok(exists, 'Topic was created'); + } finally { + await bucket.delete(); + } + }); + it('should update a topic with kinesis integration', async () => { const pair = await createPair('update-kinesis'); const output = execSync( diff --git a/samples/typescript/createTopicWithCloudStorageIngestion.ts b/samples/typescript/createTopicWithCloudStorageIngestion.ts new file mode 100644 index 000000000..c30035bba --- /dev/null +++ b/samples/typescript/createTopicWithCloudStorageIngestion.ts @@ -0,0 +1,114 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This sample demonstrates how to perform basic operations on topics with + * the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// sample-metadata: +// title: Create Topic With Cloud Storage Ingestion +// description: Creates a new topic, with Cloud Storage ingestion enabled. +// usage: node createTopicWithCloudStorageIngestion.js + +// [START pubsub_create_topic_with_cloud_storage_ingestion] +/** + * TODO(developer): Uncomment these variables before running the sample. + */ +// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID'; +// const bucket = 'YOUR_BUCKET_NAME'; +// const inputFormat = 'text'; +// const textDelimiter = '\n'; +// const matchGlob = '**.txt'; +// const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ; + +// Imports the Google Cloud client library +import {PubSub, TopicMetadata} from '@google-cloud/pubsub'; + +// Creates a client; cache this for further use +const pubSubClient = new PubSub(); + +async function createTopicWithCloudStorageIngestion( + topicNameOrId: string, + bucket: string, + inputFormat: string, + textDelimiter: string, + matchGlob: string, + minimumObjectCreateTime: string +) { + const minimumDate = Date.parse(minimumObjectCreateTime); + const topicMetadata: TopicMetadata = { + name: topicNameOrId, + ingestionDataSourceSettings: { + cloudStorage: { + bucket, + minimumObjectCreateTime: { + seconds: minimumDate / 1000, + nanos: (minimumDate % 1000) * 1000, + }, + matchGlob, + }, + }, + }; + + // Make a format appropriately. + switch (inputFormat) { + case 'text': + topicMetadata.ingestionDataSourceSettings!.cloudStorage!.textFormat = { + delimiter: textDelimiter, + }; + break; + case 'avro': + topicMetadata.ingestionDataSourceSettings!.cloudStorage!.avroFormat = {}; + break; + case 'pubsub_avro': + topicMetadata.ingestionDataSourceSettings!.cloudStorage!.pubsubAvroFormat = + {}; + break; + default: + console.error('inputFormat must be in ("text", "avro", "pubsub_avro")'); + return; + } + + // Creates a new topic with Cloud Storage ingestion. + await pubSubClient.createTopic(topicMetadata); + console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`); +} +// [END pubsub_create_topic_with_cloud_storage_ingestion] + +function main( + topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID', + bucket = 'YOUR_BUCKET_NAME', + inputFormat = 'text', + textDelimiter = '\n', + matchGlob = '**.txt', + minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ' +) { + createTopicWithCloudStorageIngestion( + topicNameOrId, + bucket, + inputFormat, + textDelimiter, + matchGlob, + minimumObjectCreateTime + ).catch(err => { + console.error(err.message); + process.exitCode = 1; + }); +} + +main(...process.argv.slice(2)); diff --git a/samples/typescript/createTopicWithKinesisIngestion.ts b/samples/typescript/createTopicWithKinesisIngestion.ts index 7c225b227..6b42d4dc1 100644 --- a/samples/typescript/createTopicWithKinesisIngestion.ts +++ b/samples/typescript/createTopicWithKinesisIngestion.ts @@ -48,8 +48,7 @@ async function createTopicWithKinesisIngestion( streamArn: string, consumerArn: string ) { - // Creates a new topic with a schema. Note that you might also - // pass Encodings.Json or Encodings.Binary here. + // Creates a new topic with Kinesis ingestion. await pubSubClient.createTopic({ name: topicNameOrId, ingestionDataSourceSettings: {