From 1fdf473b25a0541fcece420d2b8a744c12008fc7 Mon Sep 17 00:00:00 2001 From: Martin Sottnik Date: Fri, 21 Jun 2024 13:55:41 +0200 Subject: [PATCH] FR-233 remove device stream implementation (#436) * remove device stream implementation * update stream mutation added * add migration --- .../migration.sql | 5 + prisma/schema.prisma | 19 ++- src/schema/api.graphql | 20 +++ src/schema/nexus-typegen.ts | 49 ++++++ src/schema/stream.ts | 145 ++++++++++++++++++ 5 files changed, 230 insertions(+), 8 deletions(-) create mode 100644 prisma/migrations/20240619074216_add_blueprint_to_stream/migration.sql diff --git a/prisma/migrations/20240619074216_add_blueprint_to_stream/migration.sql b/prisma/migrations/20240619074216_add_blueprint_to_stream/migration.sql new file mode 100644 index 00000000..897f4c87 --- /dev/null +++ b/prisma/migrations/20240619074216_add_blueprint_to_stream/migration.sql @@ -0,0 +1,5 @@ +-- AlterTable +ALTER TABLE "stream" ADD COLUMN "blueprint_id" TEXT; + +-- AddForeignKey +ALTER TABLE "stream" ADD CONSTRAINT "stream_blueprint_id_fkey" FOREIGN KEY ("blueprint_id") REFERENCES "device_blueprint"("id") ON DELETE SET NULL ON UPDATE CASCADE; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 542cd122..6173e1ac 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -113,6 +113,7 @@ model blueprint { tenantId String @map("tenant_id") template String device device[] + stream stream[] @@unique([name, tenantId], name: "udx_device_blueprint_name_tenant_id") @@index([tenantId], map: "idx_device_blueprint_tenant_id") @@ -120,14 +121,16 @@ model blueprint { } model stream { - id String @id @default(uuid()) - createdAt DateTime @default(now()) @map("created_at") - updatedAt DateTime @updatedAt @map("updated_at") - deviceName String @map("device_name") - streamName String @map("stream_name") - streamParameters Json? @map("stream_parameters") - tenantId String @map("tenant_id") - device device @relation(fields: [deviceName, tenantId], references: [name, tenantId]) + id String @id @default(uuid()) + createdAt DateTime @default(now()) @map("created_at") + updatedAt DateTime @updatedAt @map("updated_at") + deviceName String @map("device_name") + streamName String @map("stream_name") + streamParameters Json? @map("stream_parameters") + tenantId String @map("tenant_id") + device device @relation(fields: [deviceName, tenantId], references: [name, tenantId]) + blueprintId String? @map("blueprint_id") + blueprint blueprint? @relation(fields: [blueprintId], references: [id]) @@unique([deviceName, streamName, tenantId], name: "udx_device_name_stream_name_id") } diff --git a/src/schema/api.graphql b/src/schema/api.graphql index 2d14915a..c529f64c 100644 --- a/src/schema/api.graphql +++ b/src/schema/api.graphql @@ -55,6 +55,7 @@ type AddSnapshotPayload { } input AddStreamInput { + blueprintId: String deviceName: String! streamName: String! streamParameters: String @@ -229,6 +230,10 @@ type DeleteSnapshotPayload { snapshot: Snapshot } +type DeleteStreamPayload { + stream: Stream +} + type Device implements Node { address: String blueprint: Blueprint @@ -434,6 +439,7 @@ type Mutation { deleteDevice(id: String!): DeleteDevicePayload! deleteLabel(id: String!): DeleteLabelPayload! deleteSnapshot(input: DeleteSnapshotInput!): DeleteSnapshotPayload + deleteStream(id: String!): DeleteStreamPayload! importCSV(input: CSVImportInput!): CSVImport installDevice(id: String!): InstallDevicePayload! reconnectKafka: IsOkResponse @@ -445,6 +451,7 @@ type Mutation { updateDataStore(deviceId: String!, input: UpdateDataStoreInput!, transactionId: String!): UpdateDataStorePayload! updateDevice(id: String!, input: UpdateDeviceInput!): UpdateDevicePayload! updateGraphNodeCoordinates(input: UpdateGraphNodeCoordinatesInput!): UpdateGraphNodeCoordinatesPayload! + updateStream(id: String!, input: UpdateStreamInput!): UpdateStreamPayload! } type NetInterface { @@ -638,11 +645,13 @@ enum SortStreamBy { } type Stream implements Node { + blueprint: Blueprint createdAt: String! deviceName: String! id: ID! isActive: Boolean! streamName: String! + streamParameters: String updatedAt: String! } @@ -797,6 +806,17 @@ type UpdateGraphNodeCoordinatesPayload { deviceNames: [String!]! } +input UpdateStreamInput { + blueprintId: String + deviceName: String! + streamName: String! + streamParameters: String +} + +type UpdateStreamPayload { + stream: Stream +} + """ The `Upload` scalar type represents a file upload. """ diff --git a/src/schema/nexus-typegen.ts b/src/schema/nexus-typegen.ts index 79e98c31..bee25859 100644 --- a/src/schema/nexus-typegen.ts +++ b/src/schema/nexus-typegen.ts @@ -66,6 +66,7 @@ export interface NexusGenInputs { }; AddStreamInput: { // input type + blueprintId?: string | null; // String deviceName: string; // String! streamName: string; // String! streamParameters?: string | null; // String @@ -175,6 +176,13 @@ export interface NexusGenInputs { coordinates: NexusGenInputs['GraphNodeCoordinatesInput'][]; // [GraphNodeCoordinatesInput!]! layer?: NexusGenEnums['TopologyLayer'] | null; // TopologyLayer }; + UpdateStreamInput: { + // input type + blueprintId?: string | null; // String + deviceName: string; // String! + streamName: string; // String! + streamParameters?: string | null; // String + }; } export interface NexusGenEnums { @@ -325,6 +333,10 @@ export interface NexusGenObjects { // root type snapshot?: NexusGenRootTypes['Snapshot'] | null; // Snapshot }; + DeleteStreamPayload: { + // root type + stream?: NexusGenRootTypes['Stream'] | null; // Stream + }; Device: SourceTypes.Device; DeviceConnection: { // root type @@ -657,6 +669,10 @@ export interface NexusGenObjects { // root type deviceNames: string[]; // [String!]! }; + UpdateStreamPayload: { + // root type + stream?: NexusGenRootTypes['Stream'] | null; // Stream + }; Zone: SourceTypes.Zone; ZoneEdge: { // root type @@ -834,6 +850,10 @@ export interface NexusGenFieldTypes { // field return type snapshot: NexusGenRootTypes['Snapshot'] | null; // Snapshot }; + DeleteStreamPayload: { + // field return type + stream: NexusGenRootTypes['Stream'] | null; // Stream + }; Device: { // field return type address: string | null; // String @@ -985,6 +1005,7 @@ export interface NexusGenFieldTypes { deleteDevice: NexusGenRootTypes['DeleteDevicePayload']; // DeleteDevicePayload! deleteLabel: NexusGenRootTypes['DeleteLabelPayload']; // DeleteLabelPayload! deleteSnapshot: NexusGenRootTypes['DeleteSnapshotPayload'] | null; // DeleteSnapshotPayload + deleteStream: NexusGenRootTypes['DeleteStreamPayload']; // DeleteStreamPayload! importCSV: NexusGenRootTypes['CSVImport'] | null; // CSVImport installDevice: NexusGenRootTypes['InstallDevicePayload']; // InstallDevicePayload! reconnectKafka: NexusGenRootTypes['IsOkResponse'] | null; // IsOkResponse @@ -996,6 +1017,7 @@ export interface NexusGenFieldTypes { updateDataStore: NexusGenRootTypes['UpdateDataStorePayload']; // UpdateDataStorePayload! updateDevice: NexusGenRootTypes['UpdateDevicePayload']; // UpdateDevicePayload! updateGraphNodeCoordinates: NexusGenRootTypes['UpdateGraphNodeCoordinatesPayload']; // UpdateGraphNodeCoordinatesPayload! + updateStream: NexusGenRootTypes['UpdateStreamPayload']; // UpdateStreamPayload! }; NetInterface: { // field return type @@ -1155,11 +1177,13 @@ export interface NexusGenFieldTypes { }; Stream: { // field return type + blueprint: NexusGenRootTypes['Blueprint'] | null; // Blueprint createdAt: string; // String! deviceName: string; // String! id: string; // ID! isActive: boolean; // Boolean! streamName: string; // String! + streamParameters: string | null; // String updatedAt: string; // String! }; StreamConnection: { @@ -1271,6 +1295,10 @@ export interface NexusGenFieldTypes { // field return type deviceNames: string[]; // [String!]! }; + UpdateStreamPayload: { + // field return type + stream: NexusGenRootTypes['Stream'] | null; // Stream + }; Zone: { // field return type createdAt: string; // String! @@ -1448,6 +1476,10 @@ export interface NexusGenFieldTypeNames { // field return type name snapshot: 'Snapshot'; }; + DeleteStreamPayload: { + // field return type name + stream: 'Stream'; + }; Device: { // field return type name address: 'String'; @@ -1599,6 +1631,7 @@ export interface NexusGenFieldTypeNames { deleteDevice: 'DeleteDevicePayload'; deleteLabel: 'DeleteLabelPayload'; deleteSnapshot: 'DeleteSnapshotPayload'; + deleteStream: 'DeleteStreamPayload'; importCSV: 'CSVImport'; installDevice: 'InstallDevicePayload'; reconnectKafka: 'IsOkResponse'; @@ -1610,6 +1643,7 @@ export interface NexusGenFieldTypeNames { updateDataStore: 'UpdateDataStorePayload'; updateDevice: 'UpdateDevicePayload'; updateGraphNodeCoordinates: 'UpdateGraphNodeCoordinatesPayload'; + updateStream: 'UpdateStreamPayload'; }; NetInterface: { // field return type name @@ -1769,11 +1803,13 @@ export interface NexusGenFieldTypeNames { }; Stream: { // field return type name + blueprint: 'Blueprint'; createdAt: 'String'; deviceName: 'String'; id: 'ID'; isActive: 'Boolean'; streamName: 'String'; + streamParameters: 'String'; updatedAt: 'String'; }; StreamConnection: { @@ -1885,6 +1921,10 @@ export interface NexusGenFieldTypeNames { // field return type name deviceNames: 'String'; }; + UpdateStreamPayload: { + // field return type name + stream: 'Stream'; + }; Zone: { // field return type name createdAt: 'String'; @@ -2008,6 +2048,10 @@ export interface NexusGenArgTypes { // args input: NexusGenInputs['DeleteSnapshotInput']; // DeleteSnapshotInput! }; + deleteStream: { + // args + id: string; // String! + }; importCSV: { // args input: NexusGenInputs['CSVImportInput']; // CSVImportInput! @@ -2054,6 +2098,11 @@ export interface NexusGenArgTypes { // args input: NexusGenInputs['UpdateGraphNodeCoordinatesInput']; // UpdateGraphNodeCoordinatesInput! }; + updateStream: { + // args + id: string; // String! + input: NexusGenInputs['UpdateStreamInput']; // UpdateStreamInput! + }; }; Query: { blueprints: { diff --git a/src/schema/stream.ts b/src/schema/stream.ts index be2d862a..6ed8ce54 100644 --- a/src/schema/stream.ts +++ b/src/schema/stream.ts @@ -11,6 +11,8 @@ import { uninstallDeviceCache, } from '../external-api/uniconfig-cache'; import { getMountParamsForStream, getUniconfigStreamName } from '../helpers/stream-helpers'; +import config from '../config'; +import { Blueprint } from './blueprint'; export const StreamNode = objectType({ name: 'Stream', @@ -42,6 +44,27 @@ export const StreamNode = objectType({ return isActive; }, }); + t.string('streamParameters', { + resolve: async (root) => { + if (root.streamParameters != null) { + return JSON.stringify(root.streamParameters); + } + return null; + }, + }); + t.field('blueprint', { + type: Blueprint, + resolve: async (stream, _, { prisma }) => { + const { blueprintId } = stream; + + if (blueprintId == null) { + return null; + } + + const blueprint = await prisma.blueprint.findUnique({ where: { id: blueprintId } }); + return blueprint; + }, + }); }, }); @@ -116,6 +139,7 @@ export const AddStreamInput = inputObjectType({ t.nonNull.string('streamName'); t.nonNull.string('deviceName'); t.string('streamParameters'); + t.string('blueprintId'); }, }); @@ -136,11 +160,13 @@ export const AddStreamMutation = extendType({ }, resolve: async (_, args, { prisma, tenantId }) => { const { input } = args; + const nativeBlueprintId = input.blueprintId != null ? fromGraphId('Blueprint', input.blueprintId) : undefined; const stream = await prisma.stream.create({ data: { deviceName: input.deviceName, streamName: input.streamName, streamParameters: input.streamParameters != null ? JSON.parse(input.streamParameters) : undefined, + blueprintId: nativeBlueprintId, tenantId, }, }); @@ -241,3 +267,122 @@ export const DeactivateStreamMutation = extendType({ }); }, }); + +export const DeleteStreamPayload = objectType({ + name: 'DeleteStreamPayload', + definition: (t) => { + t.field('stream', { type: StreamNode }); + }, +}); + +export const DeleteStreamMutation = extendType({ + type: 'Mutation', + definition: (t) => { + t.nonNull.field('deleteStream', { + type: DeleteStreamPayload, + args: { + id: nonNull(stringArg()), + }, + resolve: async (_, args, { prisma, tenantId, kafka, inventoryKafka }) => { + const nativeId = fromGraphId('Stream', args.id); + const dbStream = await prisma.stream.findFirst({ + where: { id: nativeId, AND: { tenantId } }, + include: { device: true }, + }); + if (dbStream == null) { + throw new Error('device not found'); + } + const uniconfigURL = await getUniconfigURL(prisma, dbStream.device.uniconfigZoneId); + const isActive = await getCachedDeviceInstallStatus( + uniconfigURL, + getUniconfigStreamName(dbStream.streamName, dbStream.deviceName), + ); + if (isActive) { + throw new Error('stream is installed in UniConfig'); + } + + try { + const deletedStream = await prisma.stream.delete({ where: { id: nativeId } }); + + if (config.kafkaEnabled) { + await inventoryKafka?.produceDeviceRemovalEvent( + kafka, + getUniconfigStreamName(dbStream.streamName, dbStream.deviceName), + ); + } + + return { stream: deletedStream }; + } catch (error) { + throw new Error('Error deleting stream'); + } + }, + }); + }, +}); + +export const UpdateStreamInput = inputObjectType({ + name: 'UpdateStreamInput', + definition: (t) => { + t.nonNull.string('streamName'); + t.nonNull.string('deviceName'); + t.string('blueprintId'); + t.string('streamParameters'); + }, +}); +export const UpdateStreamPayload = objectType({ + name: 'UpdateStreamPayload', + definition: (t) => { + t.field('stream', { type: StreamNode }); + }, +}); +export const UpdateStreamMutation = extendType({ + type: 'Mutation', + definition: (t) => { + t.nonNull.field('updateStream', { + type: UpdateStreamPayload, + args: { + id: nonNull(stringArg()), + input: nonNull(arg({ type: UpdateStreamInput })), + }, + resolve: async (_, args, { prisma, tenantId }) => { + const nativeId = fromGraphId('Stream', args.id); + const dbStream = await prisma.stream.findFirst({ + where: { id: nativeId, tenantId }, + include: { device: true }, + }); + if (dbStream == null) { + throw new Error('stream not found'); + } + const uniconfigURL = await getUniconfigURL(prisma, dbStream.device.uniconfigZoneId); + const isActive = await getCachedDeviceInstallStatus( + uniconfigURL, + getUniconfigStreamName(dbStream.streamName, dbStream.deviceName), + ); + if (isActive) { + throw new Error('active is installed in UniConfig'); + } + const { input } = args; + const streamParameters = + input.streamParameters != null ? JSON.parse(input.streamParameters) : input.streamParameters; + + try { + const updatedStream = await prisma.stream.update({ + where: { id: nativeId }, + data: { + streamName: input.streamName, + deviceName: input.deviceName, + streamParameters, + // blueprint: input.blueprintId + // ? { connect: { id: fromGraphId('Blueprint', input.blueprintId) } } + // : undefined, + }, + }); + + return { stream: updatedStream }; + } catch (error) { + throw new Error('Error updating device'); + } + }, + }); + }, +});