forked from pulumi/examples
-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.ts
101 lines (90 loc) · 3.26 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
// Copyright 2016-2019, Pulumi Corporation. All rights reserved.
import * as azure from "@pulumi/azure";
import * as pulumi from "@pulumi/pulumi";
import { createSharedAccessToken } from "./token";
// Create an Azure Resource Group
const resourceGroup = new azure.core.ResourceGroup("streams-rg");
// Define an Event Hub Namespace with two Hubs for an input and an output data streams
const namespace = new azure.eventhub.EventHubNamespace("streams-ns", {
resourceGroupName: resourceGroup.name,
sku: "standard",
});
const inputHub = new azure.eventhub.EventHub("inputs", {
resourceGroupName: resourceGroup.name,
namespaceName: namespace.name,
partitionCount: 2,
messageRetention: 7,
});
const consumerGroup = new azure.eventhub.EventHubConsumerGroup("analytics", {
resourceGroupName: resourceGroup.name,
namespaceName: namespace.name,
eventhubName: inputHub.name,
});
const outputHub = new azure.eventhub.EventHub("outputs", {
resourceGroupName: resourceGroup.name,
namespaceName: namespace.name,
partitionCount: 2,
messageRetention: 7,
});
// Create a Stream Analytics Job that aggregates events per Make and 1-minute intervals
const job = new azure.streamanalytics.Job("job", {
resourceGroupName: resourceGroup.name,
compatibilityLevel: "1.1",
dataLocale: "en-GB",
eventsLateArrivalMaxDelayInSeconds: 60,
eventsOutOfOrderMaxDelayInSeconds: 50,
eventsOutOfOrderPolicy: "Adjust",
outputErrorPolicy: "Drop",
streamingUnits: 1,
transformationQuery: `
SELECT
Make,
SUM(Sales) AS Sales
INTO
[MyOutput]
FROM
[MyInput] TIMESTAMP BY Time
GROUP BY
Make,
TumblingWindow(minute, 1)
`,
});
// Input of the job: the Event Hub with raw events
const input = new azure.streamanalytics.StreamInputEventHub("input", {
name: "MyInput",
resourceGroupName: resourceGroup.name,
streamAnalyticsJobName: job.name,
servicebusNamespace: namespace.name,
eventhubName: inputHub.name,
eventhubConsumerGroupName: consumerGroup.name,
sharedAccessPolicyKey: namespace.defaultPrimaryKey,
sharedAccessPolicyName: "RootManageSharedAccessKey",
serialization: {
type: "Json",
encoding: "UTF8",
},
});
// Output of the job: the Event Hub with aggregated data
const output = new azure.streamanalytics.OutputEventHub("output", {
name: "MyOutput",
resourceGroupName: resourceGroup.name,
streamAnalyticsJobName: job.name,
servicebusNamespace: namespace.name,
eventhubName: outputHub.name,
sharedAccessPolicyKey: namespace.defaultPrimaryKey,
sharedAccessPolicyName: "RootManageSharedAccessKey",
serialization: {
type: "Json",
encoding: "UTF8",
format: "Array",
},
});
// Create an Azure Function to subscribe to the output and print all outputs of the job
outputHub.onEvent("analytics-output", {
callback: async (context, event) => {
console.log(JSON.stringify(event));
},
});
const url = pulumi.interpolate`https://${namespace.name}.servicebus.windows.net`;
export const sasToken = pulumi.all([url, namespace.defaultPrimaryKey]).apply(([u, pk]) => createSharedAccessToken(u, "RootManageSharedAccessKey", pk));
export const inputEndpoint = pulumi.interpolate`${url}/${inputHub.name}/messages?timeout=60&api-version=2014-01`;