Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic document queue #1364

Open
wants to merge 5 commits into
base: development
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions apps/server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,16 @@ import { getPinoTransport } from '@hyperdx/node-opentelemetry'
import { PRODUCTION } from '@magickml/config'

if (PRODUCTION) {
initLogger({
name: 'cloud-agent-worker',
transport: {
targets: [
getPinoTransport('info')
]
},
level: 'info',
})
initLogger({
name: 'cloud-agent-worker',
transport: {
targets: [getPinoTransport('info')]
},
level: 'info'
})
} else {
initLogger({ name: 'cloud-agent-worker' })
}
initLogger({ name: 'cloud-agent-worker' })
}
const logger = getLogger()

// log handle errors
Expand Down Expand Up @@ -69,7 +67,7 @@ const routes: Route[] = [...spells, ...apis, ...serverRoutes]
* form and multipart-json requests, and routes.
*/
async function init() {
await initApp()
await initApp('server')
await initAgentCommander()
// load plugins
await (async () => {
Expand Down
6 changes: 5 additions & 1 deletion packages/core/server/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ import { authenticateApiKey } from './hooks/authenticateApiKey'
// Initialize the Feathers Koa app
export const app: Application = koa(feathers())

export type Environment = 'default' | 'server' | 'agent'

declare module './declarations' {
interface Configuration {
vectordb: PostgresVectorStoreCustom | any
Expand All @@ -51,10 +53,11 @@ declare module './declarations' {
redis: Redis
isAgent?: boolean
agentCommander: AgentCommander
environment: Environment
}
}

export async function initApp() {
export async function initApp(environment: Environment = 'default') {
const logger = getLogger()
logger.info('Initializing feathers app...')
globalsManager.register('feathers', app)
Expand All @@ -70,6 +73,7 @@ export async function initApp() {
max: paginateMax,
}
app.set('paginate', paginate)
app.set('environment', environment)

// Koa middleware
app.use(cors({ origin: '*' }))
Expand Down
71 changes: 65 additions & 6 deletions packages/core/server/src/services/documents/documents.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@
// This module provides a document service for managing documents with embedding and pagination support
// For more information about this file see https://dove.feathersjs.com/guides/cli/service.class.html#database-services

import AWS from 'aws-sdk'
import * as BullMQ from 'bullmq'
import {
AWS_BUCKET_NAME,
AWS_ACCESS_KEY,
AWS_REGION,
AWS_SECRET_KEY,
AWS_BUCKET_ENDPOINT,
} from '@magickml/config'

import type { Params } from '@feathersjs/feathers'
import type { KnexAdapterOptions, KnexAdapterParams } from '@feathersjs/knex'
import { KnexService } from '@feathersjs/knex'
Expand All @@ -23,6 +33,24 @@ export type DocumentParams = KnexAdapterParams<DocumentQuery>

const embeddingSize = 1000

export const DOCUMENT_QUEUE = 'document:process'

type Element = {
date: string
type: string
projectId: string
id: string
metadata: {
fileName: string
fileType: string
}
embeddings: {
documentId: string
index: string
content: string
}[]
}

/**
* DocumentService class
* Implements the custom document service extending the base Knex service
Expand All @@ -32,6 +60,34 @@ const embeddingSize = 1000
export class DocumentService<
ServiceParams extends Params = DocumentParams
> extends KnexService<Document, DocumentData, ServiceParams, DocumentPatch> {
s3: AWS.S3
uploader: any
bucketName: string = AWS_BUCKET_NAME
documentQueue: BullMQ.Queue = new BullMQ.Queue(DOCUMENT_QUEUE)

constructor(args) {
super(args)
// Set up AWS S3
AWS.config.update({
accessKeyId: AWS_ACCESS_KEY,
secretAccessKey: AWS_SECRET_KEY,
region: AWS_REGION,
})
this.s3 = new AWS.S3({
endpoint: AWS_BUCKET_ENDPOINT,
s3ForcePathStyle: true,
})
}

// Not the best typing here
async create(data: DocumentData[] | any): Promise<any> {
console.log('Adding document to queue', data)
const job = this.documentQueue.add('create', data)
return {
job,
}
}

/**
* Creates a new document
* @param data {DocumentData} The document data to create
Expand All @@ -40,14 +96,15 @@ export class DocumentService<

// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
async create(data: DocumentData): Promise<any> {
async createWorker(data: DocumentData, job: BullMQ.Job): Promise<any> {
console.log('Work received in create worker')
const embeddingdb = app.get('embeddingdb')
const { modelName, secrets, files, ...docData } = data as DocumentData & {
modelName: string
secrets: string
}

let elements = [] as any[]
let elements = [] as Element[]
if (docData.content) {
elements = [
...elements,
Expand All @@ -68,11 +125,14 @@ export class DocumentService<
}

for (const element of elements) {
// report the progress of the jpb

const { embeddings, ...document } = element
embeddings //remove linting error lmao
await embeddingdb.from('documents').insert(document)
//create embeddings
for (const embedding of element.embeddings) {
for (let i = 0; i < element.embeddings.length; i++) {
const embedding = element.embeddings[i]
if (!embedding.content || embedding.content?.length === 0) continue
if (data.hasOwnProperty('secrets')) {
await embeddingdb.fromString(embedding.content, embedding, {
Expand All @@ -83,6 +143,7 @@ export class DocumentService<
} else {
await embeddingdb.from('embeddings').insert(embedding)
}
await job.updateProgress((i / element.embeddings.length) * 100)
}
}

Expand Down Expand Up @@ -126,8 +187,6 @@ export class DocumentService<
) {
const param = params.query

console.log('param!!!!!!!', param)

const querys = await db('documents')
.joinRaw(
'inner join embeddings on documents.id = embeddings."documentId" and embeddings.index = 0'
Expand Down Expand Up @@ -292,7 +351,7 @@ const getUnstructuredData = async (files, docData) => {
return elements
}

const createElement = (element, docData) => {
const createElement = (element, docData): Element => {
const documentId = uuidv4()
const embeddings: any[] = []
for (const i in element) {
Expand Down
29 changes: 27 additions & 2 deletions packages/core/server/src/services/documents/documents.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
// DOCUMENTED
import * as BullMQ from 'bullmq'
import { hooks as schemaHooks } from '@feathersjs/schema'
import pgvector from 'pgvector/pg'
import { Application, HookContext } from '../../declarations'
import { DocumentService, getOptions } from './documents.class'
import { DOCUMENT_QUEUE, DocumentService, getOptions } from './documents.class'
import {
documentPatchResolver,
documentPatchValidator,
documentQueryResolver,
documentQueryValidator,
} from './documents.schema'
import { event } from '../events/events'

Check warning on line 13 in packages/core/server/src/services/documents/documents.ts

View workflow job for this annotation

GitHub Actions / ESLint

'event' is defined but never used

Check warning on line 13 in packages/core/server/src/services/documents/documents.ts

View workflow job for this annotation

GitHub Actions / ESLint

'event' is defined but never used

// Array with 1536 elements containing 0
const nullArray = new Array(1536).fill(0)
Expand All @@ -24,9 +26,32 @@
// Register our service on the Feathers application
app.use('documents', new DocumentService(getOptions(app)), {
methods: ['find', 'get', 'create', 'patch', 'remove'],
events: [],
events: ['finished', 'progress'],
})

if (app.get('environment') === 'server') {
// Set up document queue to process document jobs
const worker = new BullMQ.Worker(DOCUMENT_QUEUE, async job => {

Check warning on line 34 in packages/core/server/src/services/documents/documents.ts

View workflow job for this annotation

GitHub Actions / ESLint

'worker' is assigned a value but never used

Check warning on line 34 in packages/core/server/src/services/documents/documents.ts

View workflow job for this annotation

GitHub Actions / ESLint

'worker' is assigned a value but never used
const { data } = await app
.service('documents')
.createWorker(job.data, job)
return data
})

// event queues to send events to the client
const eventQueue = new BullMQ.QueueEvents(DOCUMENT_QUEUE)

eventQueue.on('completed', (jobId, returnvalue) => {
console.log('DOCUMENT COMPLETED', jobId, returnvalue)
app.service('documents').emit('finished', { jobId, returnvalue })
})

eventQueue.on('progress', (jobId, progress) => {
console.log('progress', jobId, progress)
app.service('documents').emit('progress', { jobId, progress })
})
}

// Initialize hooks
app.service('documents').hooks({
around: {
Expand Down
Loading