Skip to content

Commit

Permalink
samples: add samples for Cloud Storage ingestion, and a few small fix…
Browse files Browse the repository at this point in the history
…es (#1985)

* build: fix a small issue with the typeless bot invocation

* samples: add samples for cloud storage ingestion

* samples: fix a paste-o in Kinesis sample

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* samples: fix paste-o with incorrect sample function name

* tests: fix sample test quotes

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
feywind and gcf-owl-bot[bot] authored Oct 15, 2024
1 parent 7019003 commit fbd7359
Show file tree
Hide file tree
Showing 8 changed files with 297 additions and 13 deletions.
17 changes: 9 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,15 @@ npm install @google-cloud/pubsub

```javascript
// Imports the Google Cloud client library
const { PubSub } = require("@google-cloud/pubsub");
const {PubSub} = require('@google-cloud/pubsub');

async function quickstart(
projectId = 'your-project-id', // Your Google Cloud Platform project ID
topicNameOrId = 'my-topic', // Name for the new topic to create
subscriptionName = 'my-sub' // Name for the new subscription to create
projectId = 'your-project-id', // Your Google Cloud Platform project ID
topicNameOrId = 'my-topic', // Name for the new topic to create
subscriptionName = 'my-sub' // Name for the new subscription to create
) {
// Instantiates a client
const pubsub = new PubSub({ projectId });
const pubsub = new PubSub({projectId});

// Creates a new topic
const [topic] = await pubsub.createTopic(topicNameOrId);
Expand All @@ -84,19 +84,19 @@ subscriptionName = 'my-sub' // Name for the new subscription to create
const [subscription] = await topic.createSubscription(subscriptionName);

// Receive callbacks for new messages on the subscription
subscription.on('message', (message) => {
subscription.on('message', message => {
console.log('Received message:', message.data.toString());
process.exit(0);
});

// Receive callbacks for errors on the subscription
subscription.on('error', (error) => {
subscription.on('error', error => {
console.error('Received error:', error);
process.exit(1);
});

// Send a message to the topic
topic.publishMessage({ data: Buffer.from('Test message!') });
topic.publishMessage({data: Buffer.from('Test message!')});
}

```
Expand Down Expand Up @@ -138,6 +138,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/nodejs-pubsub/tree
| Create Subscription with ordering enabled | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscriptionWithOrdering.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscriptionWithOrdering.js,samples/README.md) |
| Create Subscription With Retry Policy | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscriptionWithRetryPolicy.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscriptionWithRetryPolicy.js,samples/README.md) |
| Create Topic | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createTopic.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createTopic.js,samples/README.md) |
| Create Topic With Cloud Storage Ingestion | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createTopicWithCloudStorageIngestion.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createTopicWithCloudStorageIngestion.js,samples/README.md) |
| Create Topic With Kinesis Ingestion | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createTopicWithKinesisIngestion.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createTopicWithKinesisIngestion.js,samples/README.md) |
| Create Topic With Schema | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createTopicWithSchema.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createTopicWithSchema.js,samples/README.md) |
| Create Topic With Schema Revisions | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createTopicWithSchemaRevisions.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createTopicWithSchemaRevisions.js,samples/README.md) |
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
"prelint": "cd samples; npm link ../; npm install",
"precompile": "gts clean",
"typeless": "npx typeless-sample-bot --outputpath samples --targets samples --recursive",
"posttypeless": "cd samples && npm i && cd .. && npx eslint --ignore-pattern owl-bot-staging --fix"
"posttypeless": "cd samples && npm i && cd .. && npx eslint --ignore-pattern owl-bot-staging --fix samples"
},
"dependencies": {
"@google-cloud/paginator": "^5.0.0",
Expand Down
20 changes: 20 additions & 0 deletions samples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ guides.
* [Create Subscription with ordering enabled](#create-subscription-with-ordering-enabled)
* [Create Subscription With Retry Policy](#create-subscription-with-retry-policy)
* [Create Topic](#create-topic)
* [Create Topic With Cloud Storage Ingestion](#create-topic-with-cloud-storage-ingestion)
* [Create Topic With Kinesis Ingestion](#create-topic-with-kinesis-ingestion)
* [Create Topic With Schema](#create-topic-with-schema)
* [Create Topic With Schema Revisions](#create-topic-with-schema-revisions)
Expand Down Expand Up @@ -389,6 +390,25 @@ __Usage:__



### Create Topic With Cloud Storage Ingestion

Creates a new topic, with Cloud Storage ingestion enabled.

View the [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createTopicWithCloudStorageIngestion.js).

[![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createTopicWithCloudStorageIngestion.js,samples/README.md)

__Usage:__


`node createTopicWithCloudStorageIngestion.js <topic-name> <bucket> <input-format> <text-delimiter> <match-glob> <minimum-object-creation-time>`


-----




### Create Topic With Kinesis Ingestion

Creates a new topic, with Kinesis ingestion enabled.
Expand Down
118 changes: 118 additions & 0 deletions samples/createTopicWithCloudStorageIngestion.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This is a generated sample, using the typeless sample bot. Please
// look for the source TypeScript sample (.ts) for modifications.
'use strict';

/**
* This sample demonstrates how to perform basic operations on topics with
* the Google Cloud Pub/Sub API.
*
* For more information, see the README.md under /pubsub and the documentation
* at https://cloud.google.com/pubsub/docs.
*/

// sample-metadata:
// title: Create Topic With Cloud Storage Ingestion
// description: Creates a new topic, with Cloud Storage ingestion enabled.
// usage: node createTopicWithCloudStorageIngestion.js <topic-name> <bucket> <input-format> <text-delimiter> <match-glob> <minimum-object-creation-time>

// [START pubsub_create_topic_with_cloud_storage_ingestion]
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bucket = 'YOUR_BUCKET_NAME';
// const inputFormat = 'text';
// const textDelimiter = '\n';
// const matchGlob = '**.txt';
// const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithCloudStorageIngestion(
topicNameOrId,
bucket,
inputFormat,
textDelimiter,
matchGlob,
minimumObjectCreateTime
) {
const minimumDate = Date.parse(minimumObjectCreateTime);
const topicMetadata = {
name: topicNameOrId,
ingestionDataSourceSettings: {
cloudStorage: {
bucket,
minimumObjectCreateTime: {
seconds: minimumDate / 1000,
nanos: (minimumDate % 1000) * 1000,
},
matchGlob,
},
},
};

// Make a format appropriately.
switch (inputFormat) {
case 'text':
topicMetadata.ingestionDataSourceSettings.cloudStorage.textFormat = {
delimiter: textDelimiter,
};
break;
case 'avro':
topicMetadata.ingestionDataSourceSettings.cloudStorage.avroFormat = {};
break;
case 'pubsub_avro':
topicMetadata.ingestionDataSourceSettings.cloudStorage.pubsubAvroFormat =
{};
break;
default:
console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
return;
}

// Creates a new topic with Cloud Storage ingestion.
await pubSubClient.createTopic(topicMetadata);
console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
}
// [END pubsub_create_topic_with_cloud_storage_ingestion]

function main(
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
bucket = 'YOUR_BUCKET_NAME',
inputFormat = 'text',
textDelimiter = '\n',
matchGlob = '**.txt',
minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ'
) {
createTopicWithCloudStorageIngestion(
topicNameOrId,
bucket,
inputFormat,
textDelimiter,
matchGlob,
minimumObjectCreateTime
).catch(err => {
console.error(err.message);
process.exitCode = 1;
});
}

main(...process.argv.slice(2));
3 changes: 1 addition & 2 deletions samples/createTopicWithKinesisIngestion.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ async function createTopicWithKinesisIngestion(
streamArn,
consumerArn
) {
// Creates a new topic with a schema. Note that you might also
// pass Encodings.Json or Encodings.Binary here.
// Creates a new topic with Kinesis ingestion.
await pubSubClient.createTopic({
name: topicNameOrId,
ingestionDataSourceSettings: {
Expand Down
33 changes: 33 additions & 0 deletions samples/system-test/topics.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

import {Message, PubSub, Topic, Subscription} from '@google-cloud/pubsub';
import {Bucket, Storage} from '@google-cloud/storage';
import {assert} from 'chai';
import {describe, it, after} from 'mocha';
import {execSync, commandFor} from './common';
Expand Down Expand Up @@ -52,6 +53,17 @@ describe('topics', () => {
return {t: topic, tname, s: sub};
}

async function createStorageBucket(testName: string): Promise<Bucket> {
const storage = new Storage({
projectId,
});

const name = resources.generateStorageName(testName);

const [bucket] = await storage.createBucket(name);
return bucket;
}

async function cleanSubs() {
const [subscriptions] = await pubsub.getSubscriptions();
await Promise.all(
Expand Down Expand Up @@ -121,6 +133,27 @@ describe('topics', () => {
assert.ok(exists, 'Topic was created');
});

it('should create a topic with cloud storage ingestion', async () => {
const testId = 'create-gcs-ingestion';
const name = topicName(testId);
const bucket = await createStorageBucket(testId);
const bucketName = bucket.name;

try {
const output = execSync(
`${commandFor('createTopicWithCloudStorageIngestion')} ${name} ${
bucketName
} text '\n' '**.txt' '2024-10-10T00:00:00Z'`
);
assert.include(output, `Topic ${name} created with Cloud Storage ingestion.`);
const [topics] = await pubsub.getTopics();
const exists = topics.some(t => t.name === fullTopicName(name));
assert.ok(exists, 'Topic was created');
} finally {
await bucket.delete();
}
});

it('should update a topic with kinesis integration', async () => {
const pair = await createPair('update-kinesis');
const output = execSync(
Expand Down
114 changes: 114 additions & 0 deletions samples/typescript/createTopicWithCloudStorageIngestion.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* This sample demonstrates how to perform basic operations on topics with
* the Google Cloud Pub/Sub API.
*
* For more information, see the README.md under /pubsub and the documentation
* at https://cloud.google.com/pubsub/docs.
*/

// sample-metadata:
// title: Create Topic With Cloud Storage Ingestion
// description: Creates a new topic, with Cloud Storage ingestion enabled.
// usage: node createTopicWithCloudStorageIngestion.js <topic-name> <bucket> <input-format> <text-delimiter> <match-glob> <minimum-object-creation-time>

// [START pubsub_create_topic_with_cloud_storage_ingestion]
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bucket = 'YOUR_BUCKET_NAME';
// const inputFormat = 'text';
// const textDelimiter = '\n';
// const matchGlob = '**.txt';
// const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithCloudStorageIngestion(
topicNameOrId: string,
bucket: string,
inputFormat: string,
textDelimiter: string,
matchGlob: string,
minimumObjectCreateTime: string
) {
const minimumDate = Date.parse(minimumObjectCreateTime);
const topicMetadata: TopicMetadata = {
name: topicNameOrId,
ingestionDataSourceSettings: {
cloudStorage: {
bucket,
minimumObjectCreateTime: {
seconds: minimumDate / 1000,
nanos: (minimumDate % 1000) * 1000,
},
matchGlob,
},
},
};

// Make a format appropriately.
switch (inputFormat) {
case 'text':
topicMetadata.ingestionDataSourceSettings!.cloudStorage!.textFormat = {
delimiter: textDelimiter,
};
break;
case 'avro':
topicMetadata.ingestionDataSourceSettings!.cloudStorage!.avroFormat = {};
break;
case 'pubsub_avro':
topicMetadata.ingestionDataSourceSettings!.cloudStorage!.pubsubAvroFormat =
{};
break;
default:
console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
return;
}

// Creates a new topic with Cloud Storage ingestion.
await pubSubClient.createTopic(topicMetadata);
console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
}
// [END pubsub_create_topic_with_cloud_storage_ingestion]

function main(
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
bucket = 'YOUR_BUCKET_NAME',
inputFormat = 'text',
textDelimiter = '\n',
matchGlob = '**.txt',
minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ'
) {
createTopicWithCloudStorageIngestion(
topicNameOrId,
bucket,
inputFormat,
textDelimiter,
matchGlob,
minimumObjectCreateTime
).catch(err => {
console.error(err.message);
process.exitCode = 1;
});
}

main(...process.argv.slice(2));
Loading

0 comments on commit fbd7359

Please sign in to comment.