diff --git a/package.json b/package.json index 5c910a1..83c52bd 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "nuclia-sync-agent-app", - "version": "1.2.21", + "version": "1.3.0", "description": "This is a Nuclia Sync Agent App", "main": "build/index.js", "scripts": { diff --git a/server/CHANGELOG.md b/server/CHANGELOG.md index d6d11aa..416692b 100644 --- a/server/CHANGELOG.md +++ b/server/CHANGELOG.md @@ -1,3 +1,7 @@ +# 1.3.0 (2024-04-25) + +- Support deletion syncing (when a file is deleted in a source, the corresponding resource is deleted in the Nuclia Knowledge Box) + # 1.2.21 (2024-04-18) - Do not override title on existing resource diff --git a/server/src/logic/connector/domain/connector.ts b/server/src/logic/connector/domain/connector.ts index f320272..3893ff6 100644 --- a/server/src/logic/connector/domain/connector.ts +++ b/server/src/logic/connector/domain/connector.ts @@ -29,6 +29,7 @@ export const SyncItemValidator = z.object({ mimeType: z.string().optional(), isFolder: z.boolean().optional(), parents: z.array(z.string()).optional(), + deleted: z.boolean().optional(), }); export type SyncItem = z.infer; @@ -55,7 +56,7 @@ export interface IConnector { getParameters(): ConnectorParameters; getFolders(query?: string): Observable; getFilesFromFolders(folders: SyncItem[]): Observable; - getLastModified(since: string, folders?: SyncItem[]): Observable; + getLastModified(since: string, folders?: SyncItem[], existings?: string[]): Observable; // we cannot use the TextField from the SDK because we want to keep connectors independant download(resource: SyncItem): Observable; getLink(resource: SyncItem): Observable; diff --git a/server/src/logic/connector/infrastructure/connectors/confluence.connector.ts b/server/src/logic/connector/infrastructure/connectors/confluence.connector.ts index 50c068f..c45f775 100644 --- a/server/src/logic/connector/infrastructure/connectors/confluence.connector.ts +++ b/server/src/logic/connector/infrastructure/connectors/confluence.connector.ts @@ -60,15 +60,32 @@ export class ConfluenceImpl implements IConnector { ); } - getLastModified(since: string, folders?: SyncItem[]): Observable { + getLastModified(since: string, folders?: SyncItem[], existings?: string[]): Observable { if (!folders || folders.length === 0) { return of({ items: [], }); } else { - return forkJoin((folders || []).map((folder) => this._getFiles('', false, folder.originalId, since))).pipe( + const newFiles = forkJoin( + (folders || []).map((folder) => this._getFiles('', false, folder.originalId, since)), + ).pipe( map((results) => ({ items: results.reduce((acc, result) => acc.concat(result.items), [] as SyncItem[]) })), ); + const deletedFiles = forkJoin( + (folders || []).map((folder) => this._getFiles('', false, folder.originalId, since, true)), + ).pipe( + map((results) => ({ items: results.reduce((acc, result) => acc.concat(result.items), [] as SyncItem[]) })), + ); + return forkJoin([newFiles, deletedFiles]).pipe( + map(([newFiles, deletedFiles]) => { + const deleted = deletedFiles.items + .filter((item) => existings?.includes(item.originalId)) + .map((item) => ({ ...item, deleted: true })); + return { + items: [...newFiles.items, ...deleted], + }; + }), + ); } } @@ -77,6 +94,7 @@ export class ConfluenceImpl implements IConnector { loadFolders = false, folder = '', lastModified?: string, + deleted = false, start?: number, previous?: SearchResults, ): Observable { @@ -115,6 +133,9 @@ export class ConfluenceImpl implements IConnector { } else { endpoint += '/rest/api/content?'; } + if (deleted) { + endpoint += '&status=trashed'; + } } return from( fetch(`${endpoint}&limit=${BATCH_SIZE}&start=${start || 0}`, { @@ -132,7 +153,7 @@ export class ConfluenceImpl implements IConnector { const items = [...(previous?.items || []), ...newItems]; const next = (start || 0) + BATCH_SIZE; return result._links.next - ? this._getFiles(query, loadFolders, folder, lastModified, next, { items }) + ? this._getFiles(query, loadFolders, folder, lastModified, deleted, next, { items }) : of({ items }); }), ); @@ -141,7 +162,8 @@ export class ConfluenceImpl implements IConnector { /* eslint-disable @typescript-eslint/no-explicit-any */ private mapResults(raw: any, isFolder = false): SyncItem { const isAttachment = raw.type === 'attachment'; - const itemOriginalId = isAttachment ? raw._links?.webui?.split('pageId=')[1]?.split('&')[0] || '' : raw.id; + const pageId = raw._links?.webui?.split('pageId=')[1]?.split('&')[0] || ''; + const itemOriginalId = isAttachment ? `${pageId}|${raw.id}` || '' : raw.id; return { title: (isFolder ? raw.name : raw.title) || '', originalId: (isFolder ? raw.key : itemOriginalId) || '', @@ -157,8 +179,9 @@ export class ConfluenceImpl implements IConnector { ): Observable<{ body: string; format?: 'PLAIN' | 'MARKDOWN' | 'HTML' } | Blob | undefined> { try { if (resource.metadata.type === 'attachment') { + const id = resource.originalId.split('|')[0]; return from( - fetch(`${this.params.url}/download/attachments/${resource.originalId}/${resource.title}`, { + fetch(`${this.params.url}/download/attachments/${id}/${resource.title}`, { method: 'GET', headers: { Authorization: `Basic ${btoa(this.params.user + ':' + this.params.token)}`, diff --git a/server/src/logic/connector/infrastructure/connectors/dropbox.connector.ts b/server/src/logic/connector/infrastructure/connectors/dropbox.connector.ts index 687096e..6d2590b 100644 --- a/server/src/logic/connector/infrastructure/connectors/dropbox.connector.ts +++ b/server/src/logic/connector/infrastructure/connectors/dropbox.connector.ts @@ -40,7 +40,7 @@ export class DropboxImpl extends OAuthBaseConnector implements IConnector { return true; } - getLastModified(since: string, folders?: SyncItem[] | undefined): Observable { + getLastModified(since: string, folders?: SyncItem[] | undefined, existings?: string[]): Observable { if ((folders ?? []).length === 0) { return of({ items: [], @@ -48,12 +48,14 @@ export class DropboxImpl extends OAuthBaseConnector implements IConnector { } return forkJoin((folders || []).map((folder) => this._getFiles('', false, folder.originalId))).pipe( map((results) => { - const items = results.reduce( - (acc, result) => acc.concat(result.items.filter((item) => item.modifiedGMT && item.modifiedGMT > since)), - [] as SyncItem[], - ); + const items = results.reduce((acc, result) => acc.concat(result.items), [] as SyncItem[]); + const currentIds = items.map((item) => item.originalId); + const newItems = items.filter((item) => item.modifiedGMT && item.modifiedGMT > since); + const toDelete = (existings || []) + .filter((id) => !currentIds?.includes(id)) + .map((id) => ({ uuid: id, originalId: id, title: '', metadata: {}, deleted: true })); return { - items, + items: [...newItems, ...toDelete], }; }), ); diff --git a/server/src/logic/connector/infrastructure/connectors/folder.connector.ts b/server/src/logic/connector/infrastructure/connectors/folder.connector.ts index 0688ca4..93375a1 100644 --- a/server/src/logic/connector/infrastructure/connectors/folder.connector.ts +++ b/server/src/logic/connector/infrastructure/connectors/folder.connector.ts @@ -1,7 +1,7 @@ import { Blob as FSBlob } from 'buffer'; import * as fs from 'fs'; import path from 'path'; -import { forkJoin, from, map, Observable, of, switchMap } from 'rxjs'; +import { forkJoin, from, map, Observable, of, switchMap, tap } from 'rxjs'; import { ConnectorParameters, FileStatus, IConnector, Link, SearchResults, SyncItem } from '../../domain/connector'; import { SourceConnectorDefinition } from '../factory'; import { lookup } from 'mime-types'; @@ -59,16 +59,18 @@ class FolderImpl implements IConnector { ); } - getLastModified(since: string, folders?: SyncItem[]): Observable { + getLastModified(since: string, folders?: SyncItem[], existings?: string[]): Observable { if ((folders ?? []).length === 0) { return of({ items: [], }); } + let currentIds: string[] = []; return forkJoin( (folders || []).map((folder) => this._getFiles(folder.originalId).pipe( + tap((result) => (currentIds = [...currentIds, ...result.items.map((item) => item.originalId)])), switchMap((results) => from(this.getFilesModifiedSince(results.items, since)).pipe( map((items) => ({ items, error: results.error })), @@ -83,6 +85,10 @@ class FolderImpl implements IConnector { .map((result) => result.error) .filter((error) => !!error) .join('. '); + if (existings && existings.length > 0) { + const toDelete = existings.filter((id) => !currentIds.includes(id)); + items.push(...toDelete.map((id) => ({ uuid: id, originalId: id, title: '', metadata: {}, deleted: true }))); + } return { items, error: errors, diff --git a/server/src/logic/connector/infrastructure/connectors/gdrive.connector.ts b/server/src/logic/connector/infrastructure/connectors/gdrive.connector.ts index c2cb1be..8de31d8 100644 --- a/server/src/logic/connector/infrastructure/connectors/gdrive.connector.ts +++ b/server/src/logic/connector/infrastructure/connectors/gdrive.connector.ts @@ -41,7 +41,7 @@ export class GDriveImpl extends OAuthBaseConnector implements IConnector { return true; } - getLastModified(since: string, folders?: SyncItem[] | undefined): Observable { + getLastModified(since: string, folders?: SyncItem[] | undefined, existings?: string[]): Observable { if ((folders ?? []).length === 0) { return of({ items: [], @@ -49,12 +49,14 @@ export class GDriveImpl extends OAuthBaseConnector implements IConnector { } return forkJoin((folders || []).map((folder) => this._getFileItems('', folder.uuid))).pipe( map((results) => { - const items = results.reduce( - (acc, result) => acc.concat(result.items.filter((item) => item.modifiedGMT && item.modifiedGMT > since)), - [] as SyncItem[], - ); + const items = results.reduce((acc, result) => acc.concat(result.items), [] as SyncItem[]); + const currentIds = items.map((item) => item.originalId); + const newItems = items.filter((item) => item.modifiedGMT && item.modifiedGMT > since); + const toDelete = (existings || []) + .filter((id) => !currentIds?.includes(id)) + .map((id) => ({ uuid: id, originalId: id, title: '', metadata: {}, deleted: true })); return { - items, + items: [...newItems, ...toDelete], }; }), ); @@ -237,7 +239,7 @@ export class GDriveImpl extends OAuthBaseConnector implements IConnector { previous?: SearchResults, ): Observable { let path = - 'https://www.googleapis.com/drive/v3/files?pageSize=50&fields=nextPageToken,files(id,name,mimeType,modifiedTime,parents)'; + 'https://www.googleapis.com/drive/v3/files?pageSize=50&fields=nextPageToken,files(id,name,mimeType,modifiedTime,createdTime,trashed,parents)'; const allDrives = '&corpora=allDrives&supportsAllDrives=true&includeItemsFromAllDrives=true'; path += allDrives; if (query) { @@ -272,7 +274,10 @@ export class GDriveImpl extends OAuthBaseConnector implements IConnector { } } else { const nextPage = res['nextPageToken']; - const items = (res.files || []).map((item: unknown) => this.mapToSyncItem(item)); + const items = (res.files || []) + // eslint-disable-next-line @typescript-eslint/no-explicit-any + .filter((item: any) => !item.trashed) + .map((item: unknown) => this.mapToSyncItem(item)); const results = { items: [...(previous?.items || []), ...items], nextPage, @@ -290,7 +295,7 @@ export class GDriveImpl extends OAuthBaseConnector implements IConnector { uuid: item.id, title: item.name, originalId: item.id, - modifiedGMT: item.modifiedTime, + modifiedGMT: item.modifiedTime > item.createdTime ? item.modifiedTime : item.createdTime, parents: item.parents, mimeType: needsPdfConversion ? 'application/pdf' : item.mimeType, metadata: { diff --git a/server/src/logic/connector/infrastructure/connectors/onedrive.connector.ts b/server/src/logic/connector/infrastructure/connectors/onedrive.connector.ts index 36d052f..e9e7ae2 100644 --- a/server/src/logic/connector/infrastructure/connectors/onedrive.connector.ts +++ b/server/src/logic/connector/infrastructure/connectors/onedrive.connector.ts @@ -40,7 +40,7 @@ export class OneDriveImpl extends OAuthBaseConnector implements IConnector { return true; } - getLastModified(since: string, folders?: SyncItem[] | undefined): Observable { + getLastModified(since: string, folders?: SyncItem[] | undefined, existings?: string[]): Observable { if ((folders ?? []).length === 0) { return of({ items: [], @@ -48,12 +48,14 @@ export class OneDriveImpl extends OAuthBaseConnector implements IConnector { } return forkJoin((folders || []).map((folder) => this._getItems('', folder.uuid))).pipe( map((results) => { - const items = results.reduce( - (acc, result) => acc.concat(result.items.filter((item) => item.modifiedGMT && item.modifiedGMT > since)), - [] as SyncItem[], - ); + const items = results.reduce((acc, result) => acc.concat(result.items), [] as SyncItem[]); + const currentIds = items.map((item) => item.originalId); + const newItems = items.filter((item) => item.modifiedGMT && item.modifiedGMT > since); + const toDelete = (existings || []) + .filter((id) => !currentIds?.includes(id)) + .map((id) => ({ uuid: id, originalId: id, title: '', metadata: {}, deleted: true })); return { - items, + items: [...newItems, ...toDelete], }; }), ); diff --git a/server/src/logic/connector/infrastructure/connectors/rss.connector.ts b/server/src/logic/connector/infrastructure/connectors/rss.connector.ts index d29a7a3..502d34b 100644 --- a/server/src/logic/connector/infrastructure/connectors/rss.connector.ts +++ b/server/src/logic/connector/infrastructure/connectors/rss.connector.ts @@ -106,7 +106,8 @@ class RSSImpl implements IConnector { } // eslint-disable-next-line @typescript-eslint/no-unused-vars - getLastModified(since: string, folders?: SyncItem[]): Observable { + getLastModified(since: string, folders?: SyncItem[], existings?: string[]): Observable { + // we do not manage deletions in this connectors as RSS will only push recent items return this._getFiles().pipe( map((searchResults) => ({ ...searchResults, diff --git a/server/src/logic/connector/infrastructure/connectors/sitemap.connector.ts b/server/src/logic/connector/infrastructure/connectors/sitemap.connector.ts index e4c2972..732d347 100644 --- a/server/src/logic/connector/infrastructure/connectors/sitemap.connector.ts +++ b/server/src/logic/connector/infrastructure/connectors/sitemap.connector.ts @@ -115,14 +115,22 @@ class SitemapImpl implements IConnector { } // eslint-disable-next-line @typescript-eslint/no-unused-vars - getLastModified(since: string, folders?: SyncItem[]): Observable { + getLastModified(since: string, folders?: SyncItem[], existings?: string[]): Observable { return this._getFiles().pipe( - map((searchResults) => ({ - ...searchResults, - items: searchResults.items.filter( + map((searchResults) => { + const currentIds = searchResults.items.map((item) => item.originalId); + const toSync = searchResults.items.filter( (item) => !item.metadata['lastModified'] || item.metadata['lastModified'] > since, - ), - })), + ); + const toDelete = existings?.filter((id) => !currentIds.includes(id)) ?? []; + return { + ...searchResults, + items: [ + ...toSync, + ...toDelete.map((id) => ({ uuid: id, originalId: id, title: '', metadata: {}, deleted: true })), + ], + }; + }), ); } diff --git a/server/src/logic/sync/domain/dto/create-sync.dto.ts b/server/src/logic/sync/domain/dto/create-sync.dto.ts index bf7f9e3..dcac835 100644 --- a/server/src/logic/sync/domain/dto/create-sync.dto.ts +++ b/server/src/logic/sync/domain/dto/create-sync.dto.ts @@ -16,6 +16,7 @@ export class CreateSyncDto { filters: this.options.filters, disabled: this.options.disabled, syncSecurityGroups: this.options.syncSecurityGroups, + originalIds: this.options.originalIds, }; if (this.options.labels) returnObj.labels = this.options.labels; diff --git a/server/src/logic/sync/domain/dto/update-sync.dto.ts b/server/src/logic/sync/domain/dto/update-sync.dto.ts index 83c77ba..d0a978f 100644 --- a/server/src/logic/sync/domain/dto/update-sync.dto.ts +++ b/server/src/logic/sync/domain/dto/update-sync.dto.ts @@ -21,6 +21,7 @@ export class UpdateSyncDto { if (this.options.filters) returnObj.filters = this.options.filters; if (this.options.disabled !== undefined) returnObj.disabled = this.options.disabled; if (this.options.syncSecurityGroups !== undefined) returnObj.syncSecurityGroups = this.options.syncSecurityGroups; + if (this.options.originalIds !== undefined) returnObj.originalIds = this.options.originalIds; return returnObj; } diff --git a/server/src/logic/sync/domain/nuclia-cloud.ts b/server/src/logic/sync/domain/nuclia-cloud.ts index a38f9b7..a1674ce 100644 --- a/server/src/logic/sync/domain/nuclia-cloud.ts +++ b/server/src/logic/sync/domain/nuclia-cloud.ts @@ -204,6 +204,11 @@ export class NucliaCloud { ); } + delete(originalId: string): Observable { + const slug = sha256(originalId); + return this.getKb().pipe(switchMap((kb) => kb.getResourceFromData({ id: '', slug }).delete())); + } + private getKb(): Observable { if (this.kb) { return of(this.kb); @@ -237,6 +242,9 @@ export class NucliaCloud { if (metadata?.groups) { resource.security = { access_groups: metadata.groups }; } + if (metadata?.sourceId) { + resource.origin = { ...(resource.origin || {}), source_id: metadata.sourceId }; + } return resource; } } diff --git a/server/src/logic/sync/domain/sync.entity.ts b/server/src/logic/sync/domain/sync.entity.ts index 50f52ea..e9f4f49 100644 --- a/server/src/logic/sync/domain/sync.entity.ts +++ b/server/src/logic/sync/domain/sync.entity.ts @@ -85,6 +85,7 @@ export interface ISyncEntity { filters?: Filters; disabled?: boolean; syncSecurityGroups?: boolean; + originalIds?: string[]; } export class SyncEntity { @@ -99,10 +100,22 @@ export class SyncEntity { public filters?: Filters; public disabled?: boolean; public syncSecurityGroups?: boolean; + public originalIds?: string[]; constructor(options: ISyncEntity) { - const { connector, kb, labels, title, id, lastSyncGMT, foldersToSync, filters, disabled, syncSecurityGroups } = - options; + const { + connector, + kb, + labels, + title, + id, + lastSyncGMT, + foldersToSync, + filters, + disabled, + syncSecurityGroups, + originalIds, + } = options; this.connector = connector; this.kb = kb; this.labels = labels; @@ -113,6 +126,7 @@ export class SyncEntity { this.filters = filters; this.disabled = disabled; this.syncSecurityGroups = syncSecurityGroups; + this.originalIds = originalIds; this.setConnectorDefinition(); } @@ -134,18 +148,21 @@ export class SyncEntity { return this.sourceConnector.getFolders(); } - getLastModified(): Observable<{ success: boolean; results: SyncItem[]; error?: string }> { + getLastModified(existings?: string[]): Observable<{ success: boolean; results: SyncItem[]; error?: string }> { const foldersToSyncPending: SyncItem[] = (this.foldersToSync ?? []).filter( (folder) => folder.status === FileStatus.PENDING || folder.status === undefined, ); const foldersToSyncUpdated: SyncItem[] = (this.foldersToSync ?? []).filter( (folder) => folder.status === FileStatus.UPLOADED, ); - const getFilesFoldersUpdated = - foldersToSyncUpdated.length > 0 - ? this.sourceConnector!.getLastModified(this.lastSyncGMT || '2000-01-01T00:00:00.000Z', foldersToSyncUpdated) - : of({ items: [] } as SearchResults); - + let getFilesFoldersUpdated = of({ items: [] } as SearchResults); + if (foldersToSyncUpdated.length > 0) { + getFilesFoldersUpdated = this.sourceConnector!.getLastModified( + this.lastSyncGMT || '2000-01-01T00:00:00.000Z', + foldersToSyncUpdated, + existings, + ); + } const getFilesFolderPending = foldersToSyncPending.length > 0 ? this.sourceConnector!.getFilesFromFolders(foldersToSyncPending) diff --git a/server/src/logic/sync/domain/use-cases/sync-all-folders-data.use-case.ts b/server/src/logic/sync/domain/use-cases/sync-all-folders-data.use-case.ts index 80e6b2c..ad05af0 100644 --- a/server/src/logic/sync/domain/use-cases/sync-all-folders-data.use-case.ts +++ b/server/src/logic/sync/domain/use-cases/sync-all-folders-data.use-case.ts @@ -1,5 +1,5 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ -import { concatMap, delay, filter, lastValueFrom, map, of, switchMap, tap, toArray } from 'rxjs'; +import { concatMap, delay, filter, from, lastValueFrom, map, of, switchMap, toArray } from 'rxjs'; import { EVENTS } from '../../../../events/events'; import { eventEmitter } from '../../../../server'; @@ -21,12 +21,18 @@ export interface SyncAllFoldersUseCase { export class SyncAllFolders implements SyncAllFoldersUseCase { constructor(private readonly repository: ISyncRepository) {} - callbackFinishSync = (syncEntity: SyncEntity, processed: string[], successCount: number, error?: string) => { + callbackFinishSync = ( + syncEntity: SyncEntity, + processed: string[], + deleted: string[], + successCount: number, + error?: string, + ) => { eventEmitter.emit(EVENTS.FINISH_SYNCHRONIZATION_SYNC_OBJECT, { from: syncEntity.id, to: syncEntity.kb?.knowledgeBox || 'Unknown kb', date: new Date().toISOString(), - processed, + processed: [...processed, ...deleted], successCount, error, }); @@ -36,20 +42,22 @@ export class SyncAllFolders implements SyncAllFoldersUseCase { return folder; }); + const ids = new Set([...(syncEntity.originalIds || []).filter((id) => !deleted.includes(id)), ...processed]); const [message, updateSyncDto] = UpdateSyncDto.create({ lastSyncGMT: new Date().toISOString(), id: syncEntity.id, foldersToSync: foldersToSyncCopy, + originalIds: [...ids], }); if (updateSyncDto) { - new UpdateSync(this.repository).execute(updateSyncDto); + return from(new UpdateSync(this.repository).execute(updateSyncDto)); } else { throw new Error(`Error updating sync: ${message}`); } }; processSyncEntity(syncEntity: SyncEntity) { - return syncEntity.getLastModified().pipe( + return syncEntity.getLastModified(syncEntity.originalIds).pipe( map((result) => { return { result, syncEntity }; }), @@ -62,8 +70,7 @@ export class SyncAllFolders implements SyncAllFoldersUseCase { }); if (!result.success || result.results.length === 0) { - this.callbackFinishSync(syncEntity, [], 0, result.error); - return of(undefined); + return this.callbackFinishSync(syncEntity, [], [], 0, result.error); } return this.processItems(syncEntity, result.results); }), @@ -95,7 +102,7 @@ export class SyncAllFolders implements SyncAllFoldersUseCase { }), concatMap((item) => new SyncSingleFile(syncEntity, item).execute().pipe( - map((res) => ({ id: item.originalId, success: res.success })), + map((res) => ({ id: item.originalId, success: res.success, action: item.deleted ? 'delete' : 'upload' })), // do not overwhelm the source delay(500), ), @@ -110,17 +117,21 @@ export class SyncAllFolders implements SyncAllFoldersUseCase { if (syncObjectValues.length > 0) { await lastValueFrom( of(...syncObjectValues).pipe( - switchMap((syncObj) => new RefreshAccessToken(this.repository).execute(new SyncEntity(syncObj))), - switchMap((syncEntity) => + concatMap((syncObj) => new RefreshAccessToken(this.repository).execute(new SyncEntity(syncObj))), + concatMap((syncEntity) => this.processSyncEntity(syncEntity).pipe( - tap((result) => { + concatMap((result) => { if (result) { - const processed = result.map((res) => res.id); + const processed = result.filter((res) => res.success && res.action === 'upload').map((res) => res.id); + const deleted = result.filter((res) => res.success && res.action === 'delete').map((res) => res.id); const successCount = result.filter((res) => res.success).length; console.log('processed', processed); + console.log('deleted', deleted); console.log('successCount', successCount); - this.callbackFinishSync(syncEntity, processed, successCount, ''); + return this.callbackFinishSync(syncEntity, processed, deleted, successCount, ''); + } else { + return of(undefined); } }), ), diff --git a/server/src/logic/sync/domain/use-cases/sync-single-file.use-case.ts b/server/src/logic/sync/domain/use-cases/sync-single-file.use-case.ts index 031ceff..4848a5d 100644 --- a/server/src/logic/sync/domain/use-cases/sync-single-file.use-case.ts +++ b/server/src/logic/sync/domain/use-cases/sync-single-file.use-case.ts @@ -65,64 +65,23 @@ export class SyncSingleFile implements SyncSingleFileUseCase { if (!sync.kb) { return of({ success: false }); } - const nucliaConnector = new NucliaCloud(sync.kb); - return downloadFileOrLink(sync, item).pipe( - switchMap((data) => { - try { - if (data.type === ContentType.blob && data.blob) { - return from(data.blob.arrayBuffer()).pipe( - switchMap((arrayBuffer) => { - return nucliaConnector.upload(item.originalId, item.title, { - buffer: arrayBuffer, - metadata: { ...item.metadata, labels: sync.labels, groups: data.extra?.groups }, - mimeType: item.mimeType, - }); - }), - ); - } else if (data.type === ContentType.text && data.text) { - return nucliaConnector.upload(item.originalId, item.title, { - text: data.text, - metadata: { ...item.metadata, labels: sync.labels, groups: data.extra?.groups }, - }); - } else if (data.type === ContentType.link && data.link) { - const mimeType = - item.mimeType !== TO_BE_CHECKED ? of(item.mimeType || 'text/html') : this.checkMimetype(data.link.uri); - return mimeType.pipe( - switchMap((type) => - nucliaConnector.uploadLink( - item.originalId, - item.title, - data.link, - type, - { ...item.metadata, labels: sync.labels, groups: data.extra?.groups }, - { - headers: sync.connector.parameters.headers, - cookies: sync.connector.parameters.cookies, - localstorage: sync.connector.parameters.localstorage, - }, - ), - ), - map(() => ({ success: true, message: '' })), - ); - } else { - return of({ success: false, message: '' }); - } - } catch (err) { - return of({ success: false, message: `${err}` }); - } - }), + return this.uploadOrDelete(item, sync).pipe( tap((res) => { if (res.success) { - console.log(`Uploaded ${item.originalId} with success`); + const message = + res.action === 'delete' + ? `Deleted ${item.originalId} with success` + : `Uploaded ${item.originalId} with success`; + console.log(message); eventEmitter.emit(EVENTS.FINISH_SYNCHRONIZATION_SINGLE_FILE, { from: sync.id, to: sync.kb?.knowledgeBox || 'Unknown kb', date: new Date().toISOString(), status: 'success', - message: `Uploaded ${item.originalId} with success`, + message, }); } else { - console.warn(`Failed to upload ${item.originalId}`); + console.warn(`Failed to ${res.action} ${item.originalId}`); eventEmitter.emit(EVENTS.FINISH_SYNCHRONIZATION_SINGLE_FILE, { from: sync.id, to: sync.kb?.knowledgeBox || 'Unknown kb', @@ -135,6 +94,74 @@ export class SyncSingleFile implements SyncSingleFileUseCase { ); } + private uploadOrDelete( + item: SyncItem, + sync: SyncEntity, + ): Observable<{ success: boolean; message?: string; action: string }> { + const nucliaConnector = new NucliaCloud(sync.kb); + if (item.deleted) { + return nucliaConnector.delete(item.originalId).pipe( + map(() => ({ success: true, message: '', action: 'delete' })), + catchError((err) => of({ success: false, message: `${err}`, action: 'delete' })), + ); + } else { + return downloadFileOrLink(sync, item).pipe( + switchMap((data) => { + try { + if (data.type === ContentType.blob && data.blob) { + return from(data.blob.arrayBuffer()).pipe( + switchMap((arrayBuffer) => { + return nucliaConnector.upload(item.originalId, item.title, { + buffer: arrayBuffer, + metadata: { + ...item.metadata, + labels: sync.labels, + groups: data.extra?.groups, + sourceId: sync.id, + }, + mimeType: item.mimeType, + }); + }), + map((res) => ({ ...res, action: 'upload' })), + ); + } else if (data.type === ContentType.text && data.text) { + return nucliaConnector + .upload(item.originalId, item.title, { + text: data.text, + metadata: { ...item.metadata, labels: sync.labels, groups: data.extra?.groups, sourceId: sync.id }, + }) + .pipe(map((res) => ({ ...res, action: 'upload' }))); + } else if (data.type === ContentType.link && data.link) { + const mimeType = + item.mimeType !== TO_BE_CHECKED ? of(item.mimeType || 'text/html') : this.checkMimetype(data.link.uri); + return mimeType.pipe( + switchMap((type) => + nucliaConnector.uploadLink( + item.originalId, + item.title, + data.link, + type, + { ...item.metadata, labels: sync.labels, groups: data.extra?.groups, sourceId: sync.id }, + { + headers: sync.connector.parameters.headers, + cookies: sync.connector.parameters.cookies, + localstorage: sync.connector.parameters.localstorage, + }, + ), + ), + map(() => ({ success: true, message: '', action: 'upload' })), + ); + } else { + return of({ success: false, message: '', action: 'upload' }); + } + } catch (err) { + return of({ success: false, message: `${err}`, action: 'upload' }); + } + }), + ); + } + } + private checkMimetype(url: string): Observable { try { return from(fetch(url, { method: 'HEAD' })).pipe( diff --git a/server/src/logic/sync/presentation/routes.ts b/server/src/logic/sync/presentation/routes.ts index c9dff4e..ef40465 100644 --- a/server/src/logic/sync/presentation/routes.ts +++ b/server/src/logic/sync/presentation/routes.ts @@ -53,6 +53,9 @@ export class SyncFileSystemRoutes { id: sync.id, title: sync.title, connector: sync.connector.name, + lastSyncGMT: sync.lastSyncGMT, + disabled: sync.disabled, + totalSyncedResources: sync.originalIds?.length || 0, })); res.status(200).send(kbSyncs); } catch (error) { @@ -87,6 +90,8 @@ export class SyncFileSystemRoutes { try { await this.checkAuth(id, req.headers.token as string, syncRepository); const data = await new GetSync(syncRepository).execute(id); + // remove originalIds from response (too big) + delete data.originalIds; res.status(200).send(data); } catch (error) { this.handleError(res, error);