From 127fbdcdab02602b950fe9c5a70269890462d241 Mon Sep 17 00:00:00 2001 From: Allen Helton Date: Fri, 1 Nov 2024 16:05:40 -0500 Subject: [PATCH 01/11] initial commit --- docs/mediastore/concurrency/_category_.json | 9 ++ docs/mediastore/concurrency/account-level.md | 90 +++++++++++++++++++ .../{viewer-metrics.md => heartbeats.md} | 0 3 files changed, 99 insertions(+) create mode 100644 docs/mediastore/concurrency/_category_.json create mode 100644 docs/mediastore/concurrency/account-level.md rename docs/mediastore/enhancements/{viewer-metrics.md => heartbeats.md} (100%) diff --git a/docs/mediastore/concurrency/_category_.json b/docs/mediastore/concurrency/_category_.json new file mode 100644 index 0000000000..4ded40e97b --- /dev/null +++ b/docs/mediastore/concurrency/_category_.json @@ -0,0 +1,9 @@ +{ + "label": "Concurrency", + "position": 33, + "collapsible": true, + "collapsed": false, + "link": { + "description": "Learn about managing concurrency with Momento" + } +} diff --git a/docs/mediastore/concurrency/account-level.md b/docs/mediastore/concurrency/account-level.md new file mode 100644 index 0000000000..1b974f49c8 --- /dev/null +++ b/docs/mediastore/concurrency/account-level.md @@ -0,0 +1,90 @@ +--- +sidebar_position: 2 +sidebar_label: Device management +title: Concurrency tracking +description: Learn how to track and manage concurrent devices for an account +hide_title: true +keywords: + - momento + - mediastore + - zero buffer rate + - zbr + - streaming + - live + - elemental + - serverless + - metrics + - concurrency tracking +--- + +# Tracking concurrent devices for an account + +Tracking concurrent devices or streams per account is essential for applications where managing access, ensuring security, and optimizing resources are top priorities. In subscription-based services, enforcing device or stream limits restrict simultaneous access to content to a specified number of devices. **Concurrency tracking** also supports account security by preventing unauthorized sharing, safeguarding the service from potential misuse. This also enables resource optimization in high-traffic applications, especially within media and entertainment, by managing resources dynamically based on real-time demand. + +The pattern outlined below demonstrates how Momento provides **real-time session monitoring** without the need for complex infrastructure. + +## Overview + +```mermaid +flowchart TD + subgraph Account + A[Account ID] + end + subgraph Momento + B1[Store Heartbeats in Capped List] + B2[Evaluate Unique Devices on Entitlement Check] + end + subgraph Device + C1[Device 1] + C2[Device 2] + C3[Device N] + end + + C1 -- "Heartbeat ➡ Session ID + Timestamp" --> B1 + C2 -- "Heartbeat ➡ Session ID + Timestamp" --> B1 + C3 -- "Heartbeat ➡ Session ID + Timestamp" --> B1 + + B1 -- "Capped to Most Recent N Entries" --> B2 + A -- "Evaluate Active Concurrency" --> B2 + + Account -.->|Check Entitlement| A +``` + +Monitoring concurrency with Momento relies on [heartbeats](/mediastore/enhancements/heartbeats) to be emitted from connected players. A server component manages a [cache list](/cache/develop/basics/datatypes#lists) in Momento that tracks the most recent N heartbeats (N is based on your business requirements). During an [entitlement check](/mediastore/entitlements/about), the list is fetched, player ids are deduplicated, and the concurrency count is determined. + +The major components in concurrency tracking are: + +* **Device** - Each device or stream sends a heartbeat via [Momento Topics](/topics), which includes a unique session ID. +* **Momento** + * **Cache** - Stores recent heartbeats for each account in a dedicated cache list, discarding old entries when the list exceeds a certain length. + * **Auth** - Creates [session tokens](/cache/develop/authentication/tokens) for players, encoding the account id directly in the token. +* **Account** - Represents the user account in your system. + +## Building a concurrency tracker + +Four components are needed in this pattern + +* Token vending machine +* Device heartbeat +* Heartbeat handler +* Concurrency checker + +### Token vending machine + +A token vending machine is a pattern that dispenses short-lived session tokens with limited permissions. This is a server-side component, usually an API endpoint, that dynamically generates the token. Below is a snippet of code used to create a session token. This code should live inside of your API endpoint handler. + +```javascript +const scope = { permissions: [ + { + role: 'publishonly', + cache: 'video', + topic: mediaId + } +]}; + +const response = await authClient.generateDisposableToken(scope, ExpiresIn.minutes(30), { tokenId: accountId }); +if(response.type === GenerateDisposableTokenResponse.Success){ + return { token: response.authToken }; +} +``` + diff --git a/docs/mediastore/enhancements/viewer-metrics.md b/docs/mediastore/enhancements/heartbeats.md similarity index 100% rename from docs/mediastore/enhancements/viewer-metrics.md rename to docs/mediastore/enhancements/heartbeats.md From a0f241e64e5714b52e9a77ffabe79205909516e9 Mon Sep 17 00:00:00 2001 From: Allen Helton Date: Mon, 4 Nov 2024 07:07:44 -0600 Subject: [PATCH 02/11] add tabs for TVI code --- docs/mediastore/concurrency/account-level.md | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/docs/mediastore/concurrency/account-level.md b/docs/mediastore/concurrency/account-level.md index 1b974f49c8..c5c33f051d 100644 --- a/docs/mediastore/concurrency/account-level.md +++ b/docs/mediastore/concurrency/account-level.md @@ -17,6 +17,9 @@ keywords: - concurrency tracking --- +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + # Tracking concurrent devices for an account Tracking concurrent devices or streams per account is essential for applications where managing access, ensuring security, and optimizing resources are top priorities. In subscription-based services, enforcing device or stream limits restrict simultaneous access to content to a specified number of devices. **Concurrency tracking** also supports account security by preventing unauthorized sharing, safeguarding the service from potential misuse. This also enables resource optimization in high-traffic applications, especially within media and entertainment, by managing resources dynamically based on real-time demand. @@ -72,6 +75,8 @@ Four components are needed in this pattern ### Token vending machine A token vending machine is a pattern that dispenses short-lived session tokens with limited permissions. This is a server-side component, usually an API endpoint, that dynamically generates the token. Below is a snippet of code used to create a session token. This code should live inside of your API endpoint handler. + + ```javascript const scope = { permissions: [ @@ -87,4 +92,15 @@ if(response.type === GenerateDisposableTokenResponse.Success){ return { token: response.authToken }; } ``` - + + + + + + + +In the snippet above, we set explicit permissions to allow the user to *publish* messages to the `mediaId` topic. This is the way the player heartbeat will communicate with our handler. The token is configured to live for 30 minutes and has the user's `accountId` embedded in the token. The embedded account id will show up as an argument in our heartbeat subscription on the server, *preventing messages from being spoofed* and adding a layer of security to our solution. + +:::info +In a production scenario, this code might live in your existing authZ mechanism and return the generated token as a claim. Assumptions are made here that prior to the code snippet above, the user has been authenticated and you have access to their account id and have securely identified the content their are attempting to view. +::: From ec8b2f89f6599fc8435af453e340f719d896328a Mon Sep 17 00:00:00 2001 From: Allen Helton Date: Thu, 7 Nov 2024 09:58:05 -0600 Subject: [PATCH 03/11] adding heartbeat code --- docs/mediastore/concurrency/account-level.md | 213 +++++++++++++++++- .../mediastore/enhancements/live-reactions.md | 8 +- 2 files changed, 217 insertions(+), 4 deletions(-) diff --git a/docs/mediastore/concurrency/account-level.md b/docs/mediastore/concurrency/account-level.md index c5c33f051d..cab41a8480 100644 --- a/docs/mediastore/concurrency/account-level.md +++ b/docs/mediastore/concurrency/account-level.md @@ -75,6 +75,7 @@ Four components are needed in this pattern ### Token vending machine A token vending machine is a pattern that dispenses short-lived session tokens with limited permissions. This is a server-side component, usually an API endpoint, that dynamically generates the token. Below is a snippet of code used to create a session token. This code should live inside of your API endpoint handler. + @@ -83,7 +84,7 @@ const scope = { permissions: [ { role: 'publishonly', cache: 'video', - topic: mediaId + topic: 'heartbeat' } ]}; @@ -94,13 +95,221 @@ if(response.type === GenerateDisposableTokenResponse.Success){ ``` + +```go +resp, err := authClient.GenerateDisposableToken(ctx, &momento.GenerateDisposableTokenRequest{ + ExpiresIn: utils.ExpiresInMinutes(30), + Scope: momento.TopicPublishOnly( + momento.CacheName{Name: "video"}, + momento.TopicName{Name: "heartbeat"}, + ), + Props: momento.DisposableTokenProps{ + TokenId: &req.PlayerID, + }, + }) + + if err != nil { + http.Error(w, "Failed to generate token", http.StatusInternalServerError) + return + } + + switch r := resp.(type) { + case *auth_resp.GenerateDisposableTokenSuccess: + return r.ApiKey + default: + http.Error(w, "Failed to generate token", http.StatusInternalServerError) + } +``` + + +```csharp +var response = await _authClient.GenerateDispableTokenAsync( + DisposableTokenScopes.TopicPublishOnly("video", "heartbeat"), + ExpiresIn.Minutes(30) +); + +return response.AuthToken; +``` + -In the snippet above, we set explicit permissions to allow the user to *publish* messages to the `mediaId` topic. This is the way the player heartbeat will communicate with our handler. The token is configured to live for 30 minutes and has the user's `accountId` embedded in the token. The embedded account id will show up as an argument in our heartbeat subscription on the server, *preventing messages from being spoofed* and adding a layer of security to our solution. +In the snippet above, we set explicit permissions to allow the user to *publish* messages to the `heartbeat` topic. This is the way the player heartbeat will communicate with our handler. The token is configured to live for 30 minutes and has the user's `accountId` embedded in the token. The embedded account id will show up as an argument in our heartbeat subscription on the server, *preventing messages from being spoofed* and adding a layer of security to our solution. :::info In a production scenario, this code might live in your existing authZ mechanism and return the generated token as a claim. Assumptions are made here that prior to the code snippet above, the user has been authenticated and you have access to their account id and have securely identified the content their are attempting to view. ::: + +### Device heartbeat + +With the token vending machine in place, we can use it on the device to publish heartbeat on a regular interval. The heartbeat can contain any information about the media, player, or device based on your use case. For this simple walkthrough, we will provide the minimum amount of information and include only the device id. + + + + +```jsx +import React, { useEffect, useState, useMemo } from 'react'; +import ReactDOM from 'react-dom/client'; +import { TopicClient, CredentialProvider } from '@gomomento/sdk-web'; + +const HEARTBEAT_INTERVAL_MS = 5000; + +function getMediaIdFromQuery() { + const params = new URLSearchParams(window.location.search); + return params.get('id') +} + +function Device() { + const [topicClient, setTopicClient] = useState(null); + + const mediaId = useMemo(() => getMediaIdFromQuery(), []); + const deviceId = useMemo(() => { + const savedDeviceId = localStorage.getItem('deviceId'); + if (savedDeviceId) return savedDeviceId; + + const newDeviceId = crypto.randomUUID(); + localStorage.setItem('deviceId', newDeviceId); + return newDeviceId; + }, []); + + const message = useMemo(() => JSON.stringify({ deviceId, mediaId }), [deviceId, mediaId]); + + useEffect(() => { + async function initTopicClient() { + const response = await fetch('http://localhost:3000/tokens', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ accountId: 'account-id' }), + }); + if (response.ok) { + const { token } = await response.json(); + const topicClient = new TopicClient({ + credentialProvider: CredentialProvider.fromString(token), + }); + setTopicClient(topicClient); + } + } + + initTopicClient(); + }, [mediaId]); + + useEffect(() => { + if (topicClient) { + const intervalId = setInterval(() => { + topicClient.publish('video', 'heartbeat', message); + }, HEARTBEAT_INTERVAL_MS); + + return () => clearInterval(intervalId); + } + }, [topicClient, mediaId, message]); + + return ( +
+

Device {deviceId}: {topicClient ? 'Connected' : 'Not Connected'}

+
+ ); +} + +const root = ReactDOM.createRoot(document.getElementById('root')); +root.render(); + +``` + +
+ + +```html + + + + +
+

Device : Not Connected

+
+ + + + + +``` + +
+
+ +In the above examples, the player html only includes the heartbeat logic. It calls the token vending machine from step one that we put behind an API endpoint running locally to fetch a token. Once the player has the token, it begins publishing the device id and media id to the `heartbeat` topic. The heartbeat is sent every 5 seconds so the heartbeat handler can track active instances. + +Two things to note in the code for the device heartbeat: + +1. The account id being supplied to the token vending machine is hardcoded, in practice this would come from your AuthN mechanism. +2. When calling the Momento HTTP API, the base url is [region based](/platform/regions). Substitute the placeholder with the correct region endpoint for your use case. If you use the Momento SDK, region handling is managed for you. + +### Heartbeat handler diff --git a/docs/mediastore/enhancements/live-reactions.md b/docs/mediastore/enhancements/live-reactions.md index 7e373a9ab0..1c42033fa1 100644 --- a/docs/mediastore/enhancements/live-reactions.md +++ b/docs/mediastore/enhancements/live-reactions.md @@ -71,7 +71,7 @@ const authClient = new AuthClient(); const app = express(); app.use(express.json()); -app.post('/tokens', (req, res) => { +app.post('/tokens', async (req, res) => { const { playerId, streamId} = req.body; const scope = { permissions: [{ @@ -79,11 +79,15 @@ app.post('/tokens', (req, res) => { cache: 'video', topic: streamId }] - }, + }; const tokenResponse = await authClient.generateDisposableToken(scope, ExpiresIn.minutes(30), { tokenId: playerId }); res.status(201).json({ token: tokenResponse.authToken }); }); + +app.listen(3000, () => { + console.log('Server is running on port 3000'); +}); ``` From 727775e54efa87b388d9ede55ab034e1aecdce52 Mon Sep 17 00:00:00 2001 From: Allen Helton Date: Thu, 7 Nov 2024 14:32:26 -0600 Subject: [PATCH 04/11] concurrency checker code --- docs/mediastore/concurrency/account-level.md | 134 ++++++++++++++++++- 1 file changed, 130 insertions(+), 4 deletions(-) diff --git a/docs/mediastore/concurrency/account-level.md b/docs/mediastore/concurrency/account-level.md index cab41a8480..cb6f9b7872 100644 --- a/docs/mediastore/concurrency/account-level.md +++ b/docs/mediastore/concurrency/account-level.md @@ -34,7 +34,7 @@ flowchart TD A[Account ID] end subgraph Momento - B1[Store Heartbeats in Capped List] + B1[Store Heartbeats in Cache Dictionary] B2[Evaluate Unique Devices on Entitlement Check] end subgraph Device @@ -53,13 +53,13 @@ flowchart TD Account -.->|Check Entitlement| A ``` -Monitoring concurrency with Momento relies on [heartbeats](/mediastore/enhancements/heartbeats) to be emitted from connected players. A server component manages a [cache list](/cache/develop/basics/datatypes#lists) in Momento that tracks the most recent N heartbeats (N is based on your business requirements). During an [entitlement check](/mediastore/entitlements/about), the list is fetched, player ids are deduplicated, and the concurrency count is determined. +Monitoring concurrency with Momento relies on [heartbeats](/mediastore/enhancements/heartbeats) to be emitted from connected players. A server component manages [cache dictionaries](/cache/develop/basics/datatypes#dictionaries) in Momento that track the heartbeats from unique players over a given interval. During an [entitlement check](/mediastore/entitlements/about), the last *complete* interval dictionary is fetched and the concurrency count is determined. The major components in concurrency tracking are: * **Device** - Each device or stream sends a heartbeat via [Momento Topics](/topics), which includes a unique session ID. * **Momento** - * **Cache** - Stores recent heartbeats for each account in a dedicated cache list, discarding old entries when the list exceeds a certain length. + * **Cache** - Stores recent heartbeats for each account in interval based cache dictionaries. * **Auth** - Creates [session tokens](/cache/develop/authentication/tokens) for players, encoding the account id directly in the token. * **Account** - Represents the user account in your system. @@ -158,7 +158,7 @@ const HEARTBEAT_INTERVAL_MS = 5000; function getMediaIdFromQuery() { const params = new URLSearchParams(window.location.search); - return params.get('id') + return params.get('id'); } function Device() { @@ -312,4 +312,130 @@ Two things to note in the code for the device heartbeat: 1. The account id being supplied to the token vending machine is hardcoded, in practice this would come from your AuthN mechanism. 2. When calling the Momento HTTP API, the base url is [region based](/platform/regions). Substitute the placeholder with the correct region endpoint for your use case. If you use the Momento SDK, region handling is managed for you. +*For a complete example of a token vending machine, [check out this tutorial](/media-storage/enhancements/live-reactions#step-1-building-a-token-vending-machine).* + ### Heartbeat handler + +Devices for a specific account will be tracked in a series of cache dictionaries. A unique cache dictionary will be used to track device heartbeats over a given time interval. The time interval can vary based on your business requirements. Our example will be evaluating concurrency once a minute. + +The naming convention for the interval-based dictionaries is `{accountId}-${intervalTime}`. To calculate the interval time, get the time in ticks of a given minute and round down. + + + + +```javascript +function getIntervalMarker(minutesBack = 0) { + const now = new Date(); + now.setTime(now.getTime() - minutesBack * 60000); + now.setSeconds(0, 0); + return now.getTime(); +} +``` + + + + +```go +import ( + "fmt" + "time" +) + +func getIntervalMarker(minutesBack int) int64 { + now := time.Now().Add(-time.Duration(minutesBack) * time.Minute) + rounded := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), now.Minute(), 0, 0, now.Location()) + return rounded.UnixNano() / int64(time.Millisecond) +} +``` + + + + +```csharp +static long GetIntervalMarker(int minutesBack = 0) +{ + DateTime now = DateTime.UtcNow.AddMinutes(-minutesBack); + now = new DateTime(now.Year, now.Month, now.Day, now.Hour, now.Minute, 0, DateTimeKind.Utc); + return new DateTimeOffset(now).ToUnixTimeMilliseconds(); +} +``` + + + + +As heartbeats come in, the device id is stored as a value in the dictionary and a count is incremented. A [time to live (TTL)](/cache/learn/how-it-works/expire-data-with-ttl) is set on the dictionary for twice the interval length, so the data automatically cleans itself up when it is no longer needed. + +### Concurrency checker + +Lastly, we have the concurrency checker. Often rolled in as part of an [entitlement check](/media-storage/entitlements/about), this is the logic that reads the heartbeat dictionary and determines if an account is over their allowed limit. To check concurrency, we fetch the dictionary length from the *previous interval* and count the number of entries. + + + + +```javascript +async function getConcurrentDeviceCount(accountId) { + const interval = getIntervalMarker(1); + const intervalKey = `${accountId}-${interval}`; + let deviceCount = 0; + const response = await cacheClient.dictionaryLength('video', intervalKey); + if(response.type === CacheDictionaryLengthResponse.Hit){ + deviceCount = response.value(); + } + return deviceCount; +} +``` + + + + +```go +func getConcurrentDeviceCount(accountId string) int { + interval := getIntervalMarker(1) + intervalKey := fmt.Sprintf("%s-%d", accountId, interval) + deviceCount := 0 + + resp, err := client.DictionaryLength(ctx, &momento.DictionaryLengthRequest{ + CacheName: cacheName, + DictionaryName: intervalKey, + }) + if err != nil { + panic(err) + } + switch r := resp.(type) { + case *responses.DictionaryLengthHit: + deviceCount = int(r.Length()) + } + return deviceCount +} +``` + + + + +```csharp +static async Task GetConcurrentDeviceCount(CacheClient cacheClient, string accountId) +{ + long interval = GetIntervalMarker(1); + string intervalKey = $"{accountId}-{interval}"; + int deviceCount = 0; + + CacheDictionaryLengthResponse response = await cacheClient.DictionaryLengthAsync("video", intervalKey); + if (response is CacheDictionaryLengthResponse.Hit) + { + deviceCount = response.Length; + } + + return deviceCount; +} +``` + + + + +This function uses the `getIntervalMarker` method we created in the previous step to get the time of our last interval, then calls Momento Cache to see how many entries are in the dictionary. Remember, each device that reported a heartbeat counts as an entry in the dictionary, so the length directly maps to the number of concurrent players. + +The value is returned to the caller and it's up to standard business logic to take over from there. + +:::tip +Interested in something a little more managed? Check out our [helper library]() that does the hard work for you! +::: From 9ee726b051d4df78d69331702b444bb608c740f6 Mon Sep 17 00:00:00 2001 From: Allen Helton Date: Thu, 7 Nov 2024 15:20:39 -0600 Subject: [PATCH 05/11] move folder structure to new location --- docs/{mediastore => media-storage}/concurrency/_category_.json | 0 docs/{mediastore => media-storage}/concurrency/account-level.md | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename docs/{mediastore => media-storage}/concurrency/_category_.json (100%) rename docs/{mediastore => media-storage}/concurrency/account-level.md (99%) diff --git a/docs/mediastore/concurrency/_category_.json b/docs/media-storage/concurrency/_category_.json similarity index 100% rename from docs/mediastore/concurrency/_category_.json rename to docs/media-storage/concurrency/_category_.json diff --git a/docs/mediastore/concurrency/account-level.md b/docs/media-storage/concurrency/account-level.md similarity index 99% rename from docs/mediastore/concurrency/account-level.md rename to docs/media-storage/concurrency/account-level.md index cb6f9b7872..a08b744114 100644 --- a/docs/mediastore/concurrency/account-level.md +++ b/docs/media-storage/concurrency/account-level.md @@ -437,5 +437,5 @@ This function uses the `getIntervalMarker` method we created in the previous ste The value is returned to the caller and it's up to standard business logic to take over from there. :::tip -Interested in something a little more managed? Check out our [helper library]() that does the hard work for you! +Interested in something a little more managed? Check out our [helper library](/media-storage) that does the hard work for you! ::: From 0850c1679f7ddbe8e096afe5b80e6aa18cfcf486 Mon Sep 17 00:00:00 2001 From: Allen Helton Date: Mon, 11 Nov 2024 16:13:12 -0600 Subject: [PATCH 06/11] Add other part of heartbeat handler --- .../concurrency/account-level.md | 50 ++++++++++++++++++- 1 file changed, 48 insertions(+), 2 deletions(-) diff --git a/docs/media-storage/concurrency/account-level.md b/docs/media-storage/concurrency/account-level.md index a08b744114..78954bb648 100644 --- a/docs/media-storage/concurrency/account-level.md +++ b/docs/media-storage/concurrency/account-level.md @@ -363,6 +363,52 @@ static long GetIntervalMarker(int minutesBack = 0) +To create the cache key that tracks the device heartbeat count, you append the value from the function above to the end of a user's account id. Creating keys this way means you have a dedicated cache item per interval. Coupled with an appropriate TTL, the cache item will clean itself up automatically, simplifying the code needed for the pattern. + + + + +```javascript +// Get the account id through your business logic +const accountId = getAccountId(req); +const { deviceId } = req.body; +const key = `${accountId}-${getIntervalMarker()}`; + +await cacheClient.dictionaryIncrement('video', key, deviceId, 1); +``` + + + + +```go +accountId := getAccountId(req) +deviceId := req.DeviceID + +key := fmt.Sprintf("%s-%d", accountId, getIntervalMarker()) +_, err := cacheClient.DictionaryIncrement(ctx, &momento.DictionaryIncrementRequest{ + CacheName: momento.String("video"), + DictionaryName: key, + Field: deviceId, + Amount: 1 +}) +if err != nil { + fmt.Println("Error incrementing cache:", err) +} +``` + + + + +```csharp + var accountId = GetAccountId(req); + var deviceId = req.Body.DeviceId; + var key = $"{accountId}-{getIntervalMarker()}"; + + await cacheClient.DictionaryIncrementAsync("video", key, deviceId, 1); +``` + + + As heartbeats come in, the device id is stored as a value in the dictionary and a count is incremented. A [time to live (TTL)](/cache/learn/how-it-works/expire-data-with-ttl) is set on the dictionary for twice the interval length, so the data automatically cleans itself up when it is no longer needed. ### Concurrency checker @@ -395,7 +441,7 @@ func getConcurrentDeviceCount(accountId string) int { deviceCount := 0 resp, err := client.DictionaryLength(ctx, &momento.DictionaryLengthRequest{ - CacheName: cacheName, + CacheName: momento.String("video"), DictionaryName: intervalKey, }) if err != nil { @@ -437,5 +483,5 @@ This function uses the `getIntervalMarker` method we created in the previous ste The value is returned to the caller and it's up to standard business logic to take over from there. :::tip -Interested in something a little more managed? Check out our [helper library](/media-storage) that does the hard work for you! +For a complete example of this pattern in action, along with other practical patterns for media streaming, check out our [demo on GitHub](https://github.com/momentohq/demo-video-streaming)! ::: From d559cbce3c25f39ff085e38b6e7ed27eea737e9e Mon Sep 17 00:00:00 2001 From: Allen Helton Date: Mon, 11 Nov 2024 22:38:33 +0000 Subject: [PATCH 07/11] updates from proofread --- .../concurrency/{account-level.md => devices.md} | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) rename docs/media-storage/concurrency/{account-level.md => devices.md} (94%) diff --git a/docs/media-storage/concurrency/account-level.md b/docs/media-storage/concurrency/devices.md similarity index 94% rename from docs/media-storage/concurrency/account-level.md rename to docs/media-storage/concurrency/devices.md index 78954bb648..7c9b5012ca 100644 --- a/docs/media-storage/concurrency/account-level.md +++ b/docs/media-storage/concurrency/devices.md @@ -65,7 +65,7 @@ The major components in concurrency tracking are: ## Building a concurrency tracker -Four components are needed in this pattern +Four components are needed in this pattern: * Token vending machine * Device heartbeat @@ -316,9 +316,9 @@ Two things to note in the code for the device heartbeat: ### Heartbeat handler -Devices for a specific account will be tracked in a series of cache dictionaries. A unique cache dictionary will be used to track device heartbeats over a given time interval. The time interval can vary based on your business requirements. Our example will be evaluating concurrency once a minute. +Devices for a specific account will be tracked in a series of cache dictionaries. A unique cache dictionary is used to track device heartbeats over a given time interval. The time interval can vary based on your business requirements. Our example will be evaluating concurrency *once a minute*. -The naming convention for the interval-based dictionaries is `{accountId}-${intervalTime}`. To calculate the interval time, get the time in ticks of a given minute and round down. +The naming convention for the interval-based dictionaries is `{accountId}-{intervalTime}`. To calculate the interval time, get a number representation of time for a given minute and round down. @@ -336,11 +336,6 @@ function getIntervalMarker(minutesBack = 0) { ```go -import ( - "fmt" - "time" -) - func getIntervalMarker(minutesBack int) int64 { now := time.Now().Add(-time.Duration(minutesBack) * time.Minute) rounded := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), now.Minute(), 0, 0, now.Location()) @@ -363,7 +358,7 @@ static long GetIntervalMarker(int minutesBack = 0) -To create the cache key that tracks the device heartbeat count, you append the value from the function above to the end of a user's account id. Creating keys this way means you have a dedicated cache item per interval. Coupled with an appropriate TTL, the cache item will clean itself up automatically, simplifying the code needed for the pattern. +To create the cache key that tracks the device heartbeat count, you append the value from the function above to the end of a user's account id. Creating keys in this manner provides a *dedicated cache item per interval*. Coupled with an appropriate [time to live (TTL)](/cache/learn/how-it-works/expire-data-with-ttl), the cache item will clean itself up automatically, simplifying the code needed for the pattern. @@ -409,8 +404,9 @@ if err != nil { -As heartbeats come in, the device id is stored as a value in the dictionary and a count is incremented. A [time to live (TTL)](/cache/learn/how-it-works/expire-data-with-ttl) is set on the dictionary for twice the interval length, so the data automatically cleans itself up when it is no longer needed. +As heartbeats come in, the device id is stored as a value in the dictionary and a count is incremented. A TTL is set on the dictionary for twice the interval length, so the data automatically cleans itself up when it is no longer needed. +Now that the data is stored, let's move on to the code that checks how many devices are actively steaming. ### Concurrency checker Lastly, we have the concurrency checker. Often rolled in as part of an [entitlement check](/media-storage/entitlements/about), this is the logic that reads the heartbeat dictionary and determines if an account is over their allowed limit. To check concurrency, we fetch the dictionary length from the *previous interval* and count the number of entries. From 14ac6ad9506dcd0e6945a7994bb3237d15c590a9 Mon Sep 17 00:00:00 2001 From: Allen Helton Date: Mon, 11 Nov 2024 22:39:25 +0000 Subject: [PATCH 08/11] fix diagram label --- docs/media-storage/concurrency/devices.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/media-storage/concurrency/devices.md b/docs/media-storage/concurrency/devices.md index 7c9b5012ca..984c2801f3 100644 --- a/docs/media-storage/concurrency/devices.md +++ b/docs/media-storage/concurrency/devices.md @@ -43,9 +43,9 @@ flowchart TD C3[Device N] end - C1 -- "Heartbeat ➡ Session ID + Timestamp" --> B1 - C2 -- "Heartbeat ➡ Session ID + Timestamp" --> B1 - C3 -- "Heartbeat ➡ Session ID + Timestamp" --> B1 + C1 -- "Heartbeat ➡ Account ID + Timestamp" --> B1 + C2 -- "Heartbeat ➡ Account ID + Timestamp" --> B1 + C3 -- "Heartbeat ➡ Account ID + Timestamp" --> B1 B1 -- "Capped to Most Recent N Entries" --> B2 A -- "Evaluate Active Concurrency" --> B2 From c66fc5564bc228f893b8d403fa908244b4c7deb9 Mon Sep 17 00:00:00 2001 From: Allen Helton Date: Mon, 11 Nov 2024 16:45:05 -0600 Subject: [PATCH 09/11] fix broken links --- .../concurrency/account-level.md | 487 ++++++++++++++++++ 1 file changed, 487 insertions(+) create mode 100644 docs/media-storage/concurrency/account-level.md diff --git a/docs/media-storage/concurrency/account-level.md b/docs/media-storage/concurrency/account-level.md new file mode 100644 index 0000000000..3273e1dd29 --- /dev/null +++ b/docs/media-storage/concurrency/account-level.md @@ -0,0 +1,487 @@ +--- +sidebar_position: 2 +sidebar_label: Device management +title: Concurrency tracking +description: Learn how to track and manage concurrent devices for an account +hide_title: true +keywords: + - momento + - media storage + - zero buffer rate + - zbr + - streaming + - live + - elemental + - serverless + - metrics + - concurrency tracking +--- + +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + +# Tracking concurrent devices for an account + +Tracking concurrent devices or streams per account is essential for applications where managing access, ensuring security, and optimizing resources are top priorities. In subscription-based services, enforcing device or stream limits restrict simultaneous access to content to a specified number of devices. **Concurrency tracking** also supports account security by preventing unauthorized sharing, safeguarding the service from potential misuse. This also enables resource optimization in high-traffic applications, especially within media and entertainment, by managing resources dynamically based on real-time demand. + +The pattern outlined below demonstrates how Momento provides **real-time session monitoring** without the need for complex infrastructure. + +## Overview + +```mermaid +flowchart TD + subgraph Account + A[Account ID] + end + subgraph Momento + B1[Store Heartbeats in Cache Dictionary] + B2[Evaluate Unique Devices on Entitlement Check] + end + subgraph Device + C1[Device 1] + C2[Device 2] + C3[Device N] + end + + C1 -- "Heartbeat ➡ Session ID + Timestamp" --> B1 + C2 -- "Heartbeat ➡ Session ID + Timestamp" --> B1 + C3 -- "Heartbeat ➡ Session ID + Timestamp" --> B1 + + B1 -- "Capped to Most Recent N Entries" --> B2 + A -- "Evaluate Active Concurrency" --> B2 + + Account -.->|Check Entitlement| A +``` + +Monitoring concurrency with Momento relies on [heartbeats](/media-storage/enhancements/heartbeats) to be emitted from connected players. A server component manages [cache dictionaries](/cache/develop/basics/datatypes#dictionaries) in Momento that track the heartbeats from unique players over a given interval. During an [entitlement check](/media-storage/entitlements/about), the last *complete* interval dictionary is fetched and the concurrency count is determined. + +The major components in concurrency tracking are: + +* **Device** - Each device or stream sends a heartbeat via [Momento Topics](/topics), which includes a unique session ID. +* **Momento** + * **Cache** - Stores recent heartbeats for each account in interval based cache dictionaries. + * **Auth** - Creates [session tokens](/cache/develop/authentication/tokens) for players, encoding the account id directly in the token. +* **Account** - Represents the user account in your system. + +## Building a concurrency tracker + +Four components are needed in this pattern + +* Token vending machine +* Device heartbeat +* Heartbeat handler +* Concurrency checker + +### Token vending machine + +A token vending machine is a pattern that dispenses short-lived session tokens with limited permissions. This is a server-side component, usually an API endpoint, that dynamically generates the token. Below is a snippet of code used to create a session token. This code should live inside of your API endpoint handler. + + + + +```javascript +const scope = { permissions: [ + { + role: 'publishonly', + cache: 'video', + topic: 'heartbeat' + } +]}; + +const response = await authClient.generateDisposableToken(scope, ExpiresIn.minutes(30), { tokenId: accountId }); +if(response.type === GenerateDisposableTokenResponse.Success){ + return { token: response.authToken }; +} +``` + + + +```go +resp, err := authClient.GenerateDisposableToken(ctx, &momento.GenerateDisposableTokenRequest{ + ExpiresIn: utils.ExpiresInMinutes(30), + Scope: momento.TopicPublishOnly( + momento.CacheName{Name: "video"}, + momento.TopicName{Name: "heartbeat"}, + ), + Props: momento.DisposableTokenProps{ + TokenId: &req.PlayerID, + }, + }) + + if err != nil { + http.Error(w, "Failed to generate token", http.StatusInternalServerError) + return + } + + switch r := resp.(type) { + case *auth_resp.GenerateDisposableTokenSuccess: + return r.ApiKey + default: + http.Error(w, "Failed to generate token", http.StatusInternalServerError) + } +``` + + + + +```csharp +var response = await _authClient.GenerateDispableTokenAsync( + DisposableTokenScopes.TopicPublishOnly("video", "heartbeat"), + ExpiresIn.Minutes(30) +); + +return response.AuthToken; +``` + + + + +In the snippet above, we set explicit permissions to allow the user to *publish* messages to the `heartbeat` topic. This is the way the player heartbeat will communicate with our handler. The token is configured to live for 30 minutes and has the user's `accountId` embedded in the token. The embedded account id will show up as an argument in our heartbeat subscription on the server, *preventing messages from being spoofed* and adding a layer of security to our solution. + +:::info +In a production scenario, this code might live in your existing authZ mechanism and return the generated token as a claim. Assumptions are made here that prior to the code snippet above, the user has been authenticated and you have access to their account id and have securely identified the content their are attempting to view. +::: + +### Device heartbeat + +With the token vending machine in place, we can use it on the device to publish heartbeat on a regular interval. The heartbeat can contain any information about the media, player, or device based on your use case. For this simple walkthrough, we will provide the minimum amount of information and include only the device id. + + + + +```jsx +import React, { useEffect, useState, useMemo } from 'react'; +import ReactDOM from 'react-dom/client'; +import { TopicClient, CredentialProvider } from '@gomomento/sdk-web'; + +const HEARTBEAT_INTERVAL_MS = 5000; + +function getMediaIdFromQuery() { + const params = new URLSearchParams(window.location.search); + return params.get('id'); +} + +function Device() { + const [topicClient, setTopicClient] = useState(null); + + const mediaId = useMemo(() => getMediaIdFromQuery(), []); + const deviceId = useMemo(() => { + const savedDeviceId = localStorage.getItem('deviceId'); + if (savedDeviceId) return savedDeviceId; + + const newDeviceId = crypto.randomUUID(); + localStorage.setItem('deviceId', newDeviceId); + return newDeviceId; + }, []); + + const message = useMemo(() => JSON.stringify({ deviceId, mediaId }), [deviceId, mediaId]); + + useEffect(() => { + async function initTopicClient() { + const response = await fetch('http://localhost:3000/tokens', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ accountId: 'account-id' }), + }); + if (response.ok) { + const { token } = await response.json(); + const topicClient = new TopicClient({ + credentialProvider: CredentialProvider.fromString(token), + }); + setTopicClient(topicClient); + } + } + + initTopicClient(); + }, [mediaId]); + + useEffect(() => { + if (topicClient) { + const intervalId = setInterval(() => { + topicClient.publish('video', 'heartbeat', message); + }, HEARTBEAT_INTERVAL_MS); + + return () => clearInterval(intervalId); + } + }, [topicClient, mediaId, message]); + + return ( +
+

Device {deviceId}: {topicClient ? 'Connected' : 'Not Connected'}

+
+ ); +} + +const root = ReactDOM.createRoot(document.getElementById('root')); +root.render(); + +``` + +
+ + +```html + + + + +
+

Device : Not Connected

+
+ + + + + +``` + +
+
+ +In the above examples, the player html only includes the heartbeat logic. It calls the token vending machine from step one that we put behind an API endpoint running locally to fetch a token. Once the player has the token, it begins publishing the device id and media id to the `heartbeat` topic. The heartbeat is sent every 5 seconds so the heartbeat handler can track active instances. + +Two things to note in the code for the device heartbeat: + +1. The account id being supplied to the token vending machine is hardcoded, in practice this would come from your AuthN mechanism. +2. When calling the Momento HTTP API, the base url is [region based](/platform/regions). Substitute the placeholder with the correct region endpoint for your use case. If you use the Momento SDK, region handling is managed for you. + +*For a complete example of a token vending machine, [check out this tutorial](/media-storage/enhancements/live-reactions#step-1-building-a-token-vending-machine).* + +### Heartbeat handler + +Devices for a specific account will be tracked in a series of cache dictionaries. A unique cache dictionary will be used to track device heartbeats over a given time interval. The time interval can vary based on your business requirements. Our example will be evaluating concurrency once a minute. + +The naming convention for the interval-based dictionaries is `{accountId}-${intervalTime}`. To calculate the interval time, get the time in ticks of a given minute and round down. + + + + +```javascript +function getIntervalMarker(minutesBack = 0) { + const now = new Date(); + now.setTime(now.getTime() - minutesBack * 60000); + now.setSeconds(0, 0); + return now.getTime(); +} +``` + + + + +```go +import ( + "fmt" + "time" +) + +func getIntervalMarker(minutesBack int) int64 { + now := time.Now().Add(-time.Duration(minutesBack) * time.Minute) + rounded := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), now.Minute(), 0, 0, now.Location()) + return rounded.UnixNano() / int64(time.Millisecond) +} +``` + + + + +```csharp +static long GetIntervalMarker(int minutesBack = 0) +{ + DateTime now = DateTime.UtcNow.AddMinutes(-minutesBack); + now = new DateTime(now.Year, now.Month, now.Day, now.Hour, now.Minute, 0, DateTimeKind.Utc); + return new DateTimeOffset(now).ToUnixTimeMilliseconds(); +} +``` + + + + +To create the cache key that tracks the device heartbeat count, you append the value from the function above to the end of a user's account id. Creating keys this way means you have a dedicated cache item per interval. Coupled with an appropriate TTL, the cache item will clean itself up automatically, simplifying the code needed for the pattern. + + + + +```javascript +// Get the account id through your business logic +const accountId = getAccountId(req); +const { deviceId } = req.body; +const key = `${accountId}-${getIntervalMarker()}`; + +await cacheClient.dictionaryIncrement('video', key, deviceId, 1); +``` + + + + +```go +accountId := getAccountId(req) +deviceId := req.DeviceID + +key := fmt.Sprintf("%s-%d", accountId, getIntervalMarker()) +_, err := cacheClient.DictionaryIncrement(ctx, &momento.DictionaryIncrementRequest{ + CacheName: momento.String("video"), + DictionaryName: key, + Field: deviceId, + Amount: 1 +}) +if err != nil { + fmt.Println("Error incrementing cache:", err) +} +``` + + + + +```csharp + var accountId = GetAccountId(req); + var deviceId = req.Body.DeviceId; + var key = $"{accountId}-{getIntervalMarker()}"; + + await cacheClient.DictionaryIncrementAsync("video", key, deviceId, 1); +``` + + + +As heartbeats come in, the device id is stored as a value in the dictionary and a count is incremented. A [time to live (TTL)](/cache/learn/how-it-works/expire-data-with-ttl) is set on the dictionary for twice the interval length, so the data automatically cleans itself up when it is no longer needed. + +### Concurrency checker + +Lastly, we have the concurrency checker. Often rolled in as part of an [entitlement check](/media-storage/entitlements/about), this is the logic that reads the heartbeat dictionary and determines if an account is over their allowed limit. To check concurrency, we fetch the dictionary length from the *previous interval* and count the number of entries. + + + + +```javascript +async function getConcurrentDeviceCount(accountId) { + const interval = getIntervalMarker(1); + const intervalKey = `${accountId}-${interval}`; + let deviceCount = 0; + const response = await cacheClient.dictionaryLength('video', intervalKey); + if(response.type === CacheDictionaryLengthResponse.Hit){ + deviceCount = response.value(); + } + return deviceCount; +} +``` + + + + +```go +func getConcurrentDeviceCount(accountId string) int { + interval := getIntervalMarker(1) + intervalKey := fmt.Sprintf("%s-%d", accountId, interval) + deviceCount := 0 + + resp, err := client.DictionaryLength(ctx, &momento.DictionaryLengthRequest{ + CacheName: momento.String("video"), + DictionaryName: intervalKey, + }) + if err != nil { + panic(err) + } + switch r := resp.(type) { + case *responses.DictionaryLengthHit: + deviceCount = int(r.Length()) + } + return deviceCount +} +``` + + + + +```csharp +static async Task GetConcurrentDeviceCount(CacheClient cacheClient, string accountId) +{ + long interval = GetIntervalMarker(1); + string intervalKey = $"{accountId}-{interval}"; + int deviceCount = 0; + + CacheDictionaryLengthResponse response = await cacheClient.DictionaryLengthAsync("video", intervalKey); + if (response is CacheDictionaryLengthResponse.Hit) + { + deviceCount = response.Length; + } + + return deviceCount; +} +``` + + + + +This function uses the `getIntervalMarker` method we created in the previous step to get the time of our last interval, then calls Momento Cache to see how many entries are in the dictionary. Remember, each device that reported a heartbeat counts as an entry in the dictionary, so the length directly maps to the number of concurrent players. + +The value is returned to the caller and it's up to standard business logic to take over from there. + +:::tip +For a complete example of this pattern in action, along with other practical patterns for media streaming, check out our [demo on GitHub](https://github.com/momentohq/demo-video-streaming)! +::: From 0de12ba3e55b5d1d0d699b3034ae28e3510e5073 Mon Sep 17 00:00:00 2001 From: Allen Helton Date: Tue, 12 Nov 2024 09:27:33 -0600 Subject: [PATCH 10/11] fix broken links for real --- docs/media-storage/concurrency/devices.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/media-storage/concurrency/devices.md b/docs/media-storage/concurrency/devices.md index 984c2801f3..e7edd21ac9 100644 --- a/docs/media-storage/concurrency/devices.md +++ b/docs/media-storage/concurrency/devices.md @@ -6,7 +6,7 @@ description: Learn how to track and manage concurrent devices for an account hide_title: true keywords: - momento - - mediastore + - media storage - zero buffer rate - zbr - streaming @@ -53,7 +53,7 @@ flowchart TD Account -.->|Check Entitlement| A ``` -Monitoring concurrency with Momento relies on [heartbeats](/mediastore/enhancements/heartbeats) to be emitted from connected players. A server component manages [cache dictionaries](/cache/develop/basics/datatypes#dictionaries) in Momento that track the heartbeats from unique players over a given interval. During an [entitlement check](/mediastore/entitlements/about), the last *complete* interval dictionary is fetched and the concurrency count is determined. +Monitoring concurrency with Momento relies on [heartbeats](/media-storage/enhancements/heartbeats) to be emitted from connected players. A server component manages [cache dictionaries](/cache/develop/basics/datatypes#dictionaries) in Momento that track the heartbeats from unique players over a given interval. During an [entitlement check](/media-storage/entitlements/about), the last *complete* interval dictionary is fetched and the concurrency count is determined. The major components in concurrency tracking are: From 3ff742f643a58dbcc066e75b6959fb2a22fafe67 Mon Sep 17 00:00:00 2001 From: Allen Helton Date: Tue, 12 Nov 2024 10:28:52 -0600 Subject: [PATCH 11/11] fix unrelated broken japanese link --- .../current/platform/connectivity/private-link.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/i18n/ja/docusaurus-plugin-content-docs/current/platform/connectivity/private-link.md b/i18n/ja/docusaurus-plugin-content-docs/current/platform/connectivity/private-link.md index 3bc2818e7c..6b9f26fb6c 100644 --- a/i18n/ja/docusaurus-plugin-content-docs/current/platform/connectivity/private-link.md +++ b/i18n/ja/docusaurus-plugin-content-docs/current/platform/connectivity/private-link.md @@ -25,7 +25,7 @@ keywords: # AWS PrivateLinkとMomentoのプライベートアクセスオプションで安全に接続 -Momentoは、セキュリティと柔軟性を優先し、成長に合わせて拡張する場合でも、コンプライアンスに合わせて最適化する場合でも、インフラストラクチャのニーズを満たします。Momentoは、プライベートVPCにデプロイされていないアプリケーションのために、デフォルトでセキュアな[パブリックエンドポイント](/プラットフォーム/リージョン)を提供しています。しかし、VPC*内からのアクセスが必要なお客様には、[AWS PrivateLink](https://aws.amazon.com/privatelink/)を介してセキュアに接続するオプションを提供しています。 +Momentoは、セキュリティと柔軟性を優先し、成長に合わせて拡張する場合でも、コンプライアンスに合わせて最適化する場合でも、インフラストラクチャのニーズを満たします。Momentoは、プライベートVPCにデプロイされていないアプリケーションのために、デフォルトでセキュアな[パブリックエンドポイント](/platform/regions)を提供しています。しかし、VPC*内からのアクセスが必要なお客様には、[AWS PrivateLink](https://aws.amazon.com/privatelink/)を介してセキュアに接続するオプションを提供しています。 このオプションを使用すると、Amazon VPCからMomentoのサービスに直接プライベートな接続を確立することができます。