Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ericbrehault/sc 9824/manage deletions in sync agent #55

Merged
merged 6 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "nuclia-sync-agent-app",
"version": "1.2.21",
"version": "1.3.0",
"description": "This is a Nuclia Sync Agent App",
"main": "build/index.js",
"scripts": {
Expand Down
4 changes: 4 additions & 0 deletions server/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# 1.3.0 (2024-04-25)

- Support deletion syncing (when a file is deleted in a source, the corresponding resource is deleted in the Nuclia Knowledge Box)

# 1.2.21 (2024-04-18)

- Do not override title on existing resource
Expand Down
3 changes: 2 additions & 1 deletion server/src/logic/connector/domain/connector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export const SyncItemValidator = z.object({
mimeType: z.string().optional(),
isFolder: z.boolean().optional(),
parents: z.array(z.string()).optional(),
deleted: z.boolean().optional(),
});
export type SyncItem = z.infer<typeof SyncItemValidator>;

Expand All @@ -55,7 +56,7 @@ export interface IConnector {
getParameters(): ConnectorParameters;
getFolders(query?: string): Observable<SearchResults>;
getFilesFromFolders(folders: SyncItem[]): Observable<SearchResults>;
getLastModified(since: string, folders?: SyncItem[]): Observable<SearchResults>;
getLastModified(since: string, folders?: SyncItem[], existings?: string[]): Observable<SearchResults>;
// we cannot use the TextField from the SDK because we want to keep connectors independant
download(resource: SyncItem): Observable<Blob | { body: string; format?: 'PLAIN' | 'MARKDOWN' | 'HTML' } | undefined>;
getLink(resource: SyncItem): Observable<Link>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,32 @@ export class ConfluenceImpl implements IConnector {
);
}

getLastModified(since: string, folders?: SyncItem[]): Observable<SearchResults> {
getLastModified(since: string, folders?: SyncItem[], existings?: string[]): Observable<SearchResults> {
if (!folders || folders.length === 0) {
return of({
items: [],
});
} else {
return forkJoin((folders || []).map((folder) => this._getFiles('', false, folder.originalId, since))).pipe(
const newFiles = forkJoin(
(folders || []).map((folder) => this._getFiles('', false, folder.originalId, since)),
).pipe(
map((results) => ({ items: results.reduce((acc, result) => acc.concat(result.items), [] as SyncItem[]) })),
);
const deletedFiles = forkJoin(
(folders || []).map((folder) => this._getFiles('', false, folder.originalId, since, true)),
).pipe(
map((results) => ({ items: results.reduce((acc, result) => acc.concat(result.items), [] as SyncItem[]) })),
);
return forkJoin([newFiles, deletedFiles]).pipe(
map(([newFiles, deletedFiles]) => {
const deleted = deletedFiles.items
.filter((item) => existings?.includes(item.originalId))
.map((item) => ({ ...item, deleted: true }));
return {
items: [...newFiles.items, ...deleted],
};
}),
);
}
}

Expand All @@ -77,6 +94,7 @@ export class ConfluenceImpl implements IConnector {
loadFolders = false,
folder = '',
lastModified?: string,
deleted = false,
start?: number,
previous?: SearchResults,
): Observable<SearchResults> {
Expand Down Expand Up @@ -115,6 +133,9 @@ export class ConfluenceImpl implements IConnector {
} else {
endpoint += '/rest/api/content?';
}
if (deleted) {
endpoint += '&status=trashed';
}
}
return from(
fetch(`${endpoint}&limit=${BATCH_SIZE}&start=${start || 0}`, {
Expand All @@ -132,7 +153,7 @@ export class ConfluenceImpl implements IConnector {
const items = [...(previous?.items || []), ...newItems];
const next = (start || 0) + BATCH_SIZE;
return result._links.next
? this._getFiles(query, loadFolders, folder, lastModified, next, { items })
? this._getFiles(query, loadFolders, folder, lastModified, deleted, next, { items })
: of({ items });
}),
);
Expand All @@ -141,7 +162,8 @@ export class ConfluenceImpl implements IConnector {
/* eslint-disable @typescript-eslint/no-explicit-any */
private mapResults(raw: any, isFolder = false): SyncItem {
const isAttachment = raw.type === 'attachment';
const itemOriginalId = isAttachment ? raw._links?.webui?.split('pageId=')[1]?.split('&')[0] || '' : raw.id;
const pageId = raw._links?.webui?.split('pageId=')[1]?.split('&')[0] || '';
const itemOriginalId = isAttachment ? `${pageId}|${raw.id}` || '' : raw.id;
return {
title: (isFolder ? raw.name : raw.title) || '',
originalId: (isFolder ? raw.key : itemOriginalId) || '',
Expand All @@ -157,8 +179,9 @@ export class ConfluenceImpl implements IConnector {
): Observable<{ body: string; format?: 'PLAIN' | 'MARKDOWN' | 'HTML' } | Blob | undefined> {
try {
if (resource.metadata.type === 'attachment') {
const id = resource.originalId.split('|')[0];
return from(
fetch(`${this.params.url}/download/attachments/${resource.originalId}/${resource.title}`, {
fetch(`${this.params.url}/download/attachments/${id}/${resource.title}`, {
method: 'GET',
headers: {
Authorization: `Basic ${btoa(this.params.user + ':' + this.params.token)}`,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,22 @@ export class DropboxImpl extends OAuthBaseConnector implements IConnector {
return true;
}

getLastModified(since: string, folders?: SyncItem[] | undefined): Observable<SearchResults> {
getLastModified(since: string, folders?: SyncItem[] | undefined, existings?: string[]): Observable<SearchResults> {
if ((folders ?? []).length === 0) {
return of({
items: [],
});
}
return forkJoin((folders || []).map((folder) => this._getFiles('', false, folder.originalId))).pipe(
map((results) => {
const items = results.reduce(
(acc, result) => acc.concat(result.items.filter((item) => item.modifiedGMT && item.modifiedGMT > since)),
[] as SyncItem[],
);
const items = results.reduce((acc, result) => acc.concat(result.items), [] as SyncItem[]);
const currentIds = items.map((item) => item.originalId);
const newItems = items.filter((item) => item.modifiedGMT && item.modifiedGMT > since);
const toDelete = (existings || [])
.filter((id) => !currentIds?.includes(id))
.map((id) => ({ uuid: id, originalId: id, title: '', metadata: {}, deleted: true }));
return {
items,
items: [...newItems, ...toDelete],
};
}),
);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Blob as FSBlob } from 'buffer';
import * as fs from 'fs';
import path from 'path';
import { forkJoin, from, map, Observable, of, switchMap } from 'rxjs';
import { forkJoin, from, map, Observable, of, switchMap, tap } from 'rxjs';
import { ConnectorParameters, FileStatus, IConnector, Link, SearchResults, SyncItem } from '../../domain/connector';
import { SourceConnectorDefinition } from '../factory';
import { lookup } from 'mime-types';
Expand Down Expand Up @@ -59,16 +59,18 @@ class FolderImpl implements IConnector {
);
}

getLastModified(since: string, folders?: SyncItem[]): Observable<SearchResults> {
getLastModified(since: string, folders?: SyncItem[], existings?: string[]): Observable<SearchResults> {
if ((folders ?? []).length === 0) {
return of({
items: [],
});
}
let currentIds: string[] = [];

return forkJoin(
(folders || []).map((folder) =>
this._getFiles(folder.originalId).pipe(
tap((result) => (currentIds = [...currentIds, ...result.items.map((item) => item.originalId)])),
switchMap((results) =>
from(this.getFilesModifiedSince(results.items, since)).pipe(
map((items) => ({ items, error: results.error })),
Expand All @@ -83,6 +85,10 @@ class FolderImpl implements IConnector {
.map((result) => result.error)
.filter((error) => !!error)
.join('. ');
if (existings && existings.length > 0) {
const toDelete = existings.filter((id) => !currentIds.includes(id));
items.push(...toDelete.map((id) => ({ uuid: id, originalId: id, title: '', metadata: {}, deleted: true })));
}
return {
items,
error: errors,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,22 @@ export class GDriveImpl extends OAuthBaseConnector implements IConnector {
return true;
}

getLastModified(since: string, folders?: SyncItem[] | undefined): Observable<SearchResults> {
getLastModified(since: string, folders?: SyncItem[] | undefined, existings?: string[]): Observable<SearchResults> {
if ((folders ?? []).length === 0) {
return of({
items: [],
});
}
return forkJoin((folders || []).map((folder) => this._getFileItems('', folder.uuid))).pipe(
map((results) => {
const items = results.reduce(
(acc, result) => acc.concat(result.items.filter((item) => item.modifiedGMT && item.modifiedGMT > since)),
[] as SyncItem[],
);
const items = results.reduce((acc, result) => acc.concat(result.items), [] as SyncItem[]);
const currentIds = items.map((item) => item.originalId);
const newItems = items.filter((item) => item.modifiedGMT && item.modifiedGMT > since);
const toDelete = (existings || [])
.filter((id) => !currentIds?.includes(id))
.map((id) => ({ uuid: id, originalId: id, title: '', metadata: {}, deleted: true }));
return {
items,
items: [...newItems, ...toDelete],
};
}),
);
Expand Down Expand Up @@ -237,7 +239,7 @@ export class GDriveImpl extends OAuthBaseConnector implements IConnector {
previous?: SearchResults,
): Observable<SearchResults> {
let path =
'https://www.googleapis.com/drive/v3/files?pageSize=50&fields=nextPageToken,files(id,name,mimeType,modifiedTime,parents)';
'https://www.googleapis.com/drive/v3/files?pageSize=50&fields=nextPageToken,files(id,name,mimeType,modifiedTime,createdTime,trashed,parents)';
const allDrives = '&corpora=allDrives&supportsAllDrives=true&includeItemsFromAllDrives=true';
path += allDrives;
if (query) {
Expand Down Expand Up @@ -272,7 +274,10 @@ export class GDriveImpl extends OAuthBaseConnector implements IConnector {
}
} else {
const nextPage = res['nextPageToken'];
const items = (res.files || []).map((item: unknown) => this.mapToSyncItem(item));
const items = (res.files || [])
// eslint-disable-next-line @typescript-eslint/no-explicit-any
.filter((item: any) => !item.trashed)
.map((item: unknown) => this.mapToSyncItem(item));
const results = {
items: [...(previous?.items || []), ...items],
nextPage,
Expand All @@ -290,7 +295,7 @@ export class GDriveImpl extends OAuthBaseConnector implements IConnector {
uuid: item.id,
title: item.name,
originalId: item.id,
modifiedGMT: item.modifiedTime,
modifiedGMT: item.modifiedTime > item.createdTime ? item.modifiedTime : item.createdTime,
parents: item.parents,
mimeType: needsPdfConversion ? 'application/pdf' : item.mimeType,
metadata: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,22 @@ export class OneDriveImpl extends OAuthBaseConnector implements IConnector {
return true;
}

getLastModified(since: string, folders?: SyncItem[] | undefined): Observable<SearchResults> {
getLastModified(since: string, folders?: SyncItem[] | undefined, existings?: string[]): Observable<SearchResults> {
if ((folders ?? []).length === 0) {
return of({
items: [],
});
}
return forkJoin((folders || []).map((folder) => this._getItems('', folder.uuid))).pipe(
map((results) => {
const items = results.reduce(
(acc, result) => acc.concat(result.items.filter((item) => item.modifiedGMT && item.modifiedGMT > since)),
[] as SyncItem[],
);
const items = results.reduce((acc, result) => acc.concat(result.items), [] as SyncItem[]);
const currentIds = items.map((item) => item.originalId);
const newItems = items.filter((item) => item.modifiedGMT && item.modifiedGMT > since);
const toDelete = (existings || [])
.filter((id) => !currentIds?.includes(id))
.map((id) => ({ uuid: id, originalId: id, title: '', metadata: {}, deleted: true }));
return {
items,
items: [...newItems, ...toDelete],
};
}),
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ class RSSImpl implements IConnector {
}

// eslint-disable-next-line @typescript-eslint/no-unused-vars
getLastModified(since: string, folders?: SyncItem[]): Observable<SearchResults> {
getLastModified(since: string, folders?: SyncItem[], existings?: string[]): Observable<SearchResults> {
// we do not manage deletions in this connectors as RSS will only push recent items
return this._getFiles().pipe(
map((searchResults) => ({
...searchResults,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,22 @@ class SitemapImpl implements IConnector {
}

// eslint-disable-next-line @typescript-eslint/no-unused-vars
getLastModified(since: string, folders?: SyncItem[]): Observable<SearchResults> {
getLastModified(since: string, folders?: SyncItem[], existings?: string[]): Observable<SearchResults> {
return this._getFiles().pipe(
map((searchResults) => ({
...searchResults,
items: searchResults.items.filter(
map((searchResults) => {
const currentIds = searchResults.items.map((item) => item.originalId);
const toSync = searchResults.items.filter(
(item) => !item.metadata['lastModified'] || item.metadata['lastModified'] > since,
),
})),
);
const toDelete = existings?.filter((id) => !currentIds.includes(id)) ?? [];
return {
...searchResults,
items: [
...toSync,
...toDelete.map((id) => ({ uuid: id, originalId: id, title: '', metadata: {}, deleted: true })),
],
};
}),
);
}

Expand Down
1 change: 1 addition & 0 deletions server/src/logic/sync/domain/dto/create-sync.dto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export class CreateSyncDto {
filters: this.options.filters,
disabled: this.options.disabled,
syncSecurityGroups: this.options.syncSecurityGroups,
originalIds: this.options.originalIds,
};

if (this.options.labels) returnObj.labels = this.options.labels;
Expand Down
1 change: 1 addition & 0 deletions server/src/logic/sync/domain/dto/update-sync.dto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ export class UpdateSyncDto {
if (this.options.filters) returnObj.filters = this.options.filters;
if (this.options.disabled !== undefined) returnObj.disabled = this.options.disabled;
if (this.options.syncSecurityGroups !== undefined) returnObj.syncSecurityGroups = this.options.syncSecurityGroups;
if (this.options.originalIds !== undefined) returnObj.originalIds = this.options.originalIds;

return returnObj;
}
Expand Down
8 changes: 8 additions & 0 deletions server/src/logic/sync/domain/nuclia-cloud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,11 @@ export class NucliaCloud {
);
}

delete(originalId: string): Observable<void> {
const slug = sha256(originalId);
return this.getKb().pipe(switchMap((kb) => kb.getResourceFromData({ id: '', slug }).delete()));
}

private getKb(): Observable<WritableKnowledgeBox> {
if (this.kb) {
return of(this.kb);
Expand Down Expand Up @@ -237,6 +242,9 @@ export class NucliaCloud {
if (metadata?.groups) {
resource.security = { access_groups: metadata.groups };
}
if (metadata?.sourceId) {
resource.origin = { ...(resource.origin || {}), source_id: metadata.sourceId };
}
return resource;
}
}
Loading