Experimenting with testing incremental ingestion #97

Draft · wants to merge 5 commits into base: main
4 changes: 2 additions & 2 deletions docker-compose.yaml
@@ -2,7 +2,7 @@ version: "3.8"

services:
postgres:
-    image: "postgres:12"
+    image: "postgres:14"
container_name: postgres
restart: always
environment:
@@ -14,4 +14,4 @@ services:
- data:/var/lib/postgresql/data

volumes:
-  data:
+  data:
26 changes: 13 additions & 13 deletions plugins/incremental-ingestion-backend/README.md
@@ -39,15 +39,15 @@ This approach has the following benefits,
```ts
const builder = CatalogBuilder.create(env);
// incremental builder receives builder because it'll register
// incremental entity providers with the builder
const incrementalBuilder = IncrementalCatalogBuilder.create(env, builder);
```
3. Last step, add `await incrementalBuilder.build()` after `await builder.build()` to ensure that all `CatalogBuilder` migrations run before the `incrementalBuilder.build()` migrations run.
```ts
const { processingEngine, router } = await builder.build();

// this has to run after `await builder.build()` to ensure that catalog migrations are completed
// before incremental builder migrations are executed
await incrementalBuilder.build();
```
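For intuition, the incremental builder drives each provider through cursor-based pagination: it repeatedly asks for the next page, persisting the cursor between bursts, until the provider reports it is done. A minimal, Backstage-free sketch of that loop (all names here are illustrative, not the plugin's API):

```ts
// Illustrative sketch of cursor-driven ingestion, not the plugin's actual engine.
type Page = { items: string[]; cursor: number; done: boolean };

// Pretend remote API: serves `data` in fixed-size pages.
function fetchPage(data: string[], cursor: number, pageSize = 2): Page {
  const items = data.slice(cursor, cursor + pageSize);
  const next = cursor + items.length;
  return { items, cursor: next, done: next >= data.length };
}

// The engine's loop: keep asking for the next page until done,
// accumulating entities along the way. In the real plugin the cursor
// would be persisted to the database between bursts.
function ingestAll(data: string[]): string[] {
  const ingested: string[] = [];
  let cursor = 0;
  for (;;) {
    const page = fetchPage(data, cursor);
    ingested.push(...page.items);
    if (page.done) break;
    cursor = page.cursor;
  }
  return ingested;
}

console.log(ingestAll(['a', 'b', 'c', 'd', 'e'])); // all five entities, in order
```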

@@ -67,18 +67,18 @@ import { PluginEnvironment } from '../types';
export default async function createPlugin(
env: PluginEnvironment,
): Promise<Router> {

const builder = CatalogBuilder.create(env);
// incremental builder receives builder because it'll register
// incremental entity providers with the builder
const incrementalBuilder = IncrementalCatalogBuilder.create(env, builder);

builder.addProcessor(new ScaffolderEntitiesProcessor());

const { processingEngine, router } = await builder.build();

// this has to run after `await builder.build()` to ensure that catalog migrations are completed
// before incremental builder migrations are executed
await incrementalBuilder.build();

await processingEngine.start();
@@ -186,9 +186,9 @@ If you need to pass a token to your API, then you can create a constructor that

```ts
export class MyIncrementalEntityProvider implements IncrementalEntityProvider<Cursor, Context> {

token: string;

constructor(token: string) {
this.token = token;
}
@@ -211,9 +211,9 @@ The last step is to implement the actual `next` method that will accept the curs

```ts
export class MyIncrementalEntityProvider implements IncrementalEntityProvider<Cursor, Context> {

token: string;

constructor(token: string) {
this.token = token;
}
@@ -276,7 +276,7 @@ export class MyIncrementalEntityProvider implements IncrementalEntityProvider<Cursor, Context>
}
```

Now that you have your new Incremental Entity Provider, we can connect it to the catalog.

## Adding an Incremental Entity Provider to the catalog

16 changes: 12 additions & 4 deletions plugins/incremental-ingestion-backend/package.json
@@ -37,15 +37,23 @@
"yn": "^4.0.0"
},
  "devDependencies": {
+   "@backstage/backend-tasks": "^0.3.0",
    "@backstage/cli": "^0.17.0",
-   "@types/supertest": "^2.0.8",
+   "@backstage/plugin-permission-node": "^0.6.0",
+   "@effection/jest": "^2.0.2",
    "@types/luxon": "^2.0.4",
+   "@types/supertest": "^2.0.8",
    "@types/uuid": "^8.3.4",
-   "supertest": "^4.0.2",
-   "msw": "^0.35.0"
+   "effection": "^2.0.4",
+   "msw": "^0.35.0",
+   "supertest": "^4.0.2"
  },
  "files": [
    "dist",
    "migrations"
- ]
+ ],
+
+ "jest": {
+   "testTimeout": 15000
+ }
}
3 changes: 2 additions & 1 deletion plugins/incremental-ingestion-backend/src/catalog-builder.ts
@@ -32,6 +32,7 @@ export class IncrementalCatalogBuilder {
provider: IncrementalEntityProvider<T, C>,
options: IncrementalEntityProviderOptions,
) {
// TODO Check if build was called and throw error
const { burstInterval, burstLength, restLength } = options;
const { logger: catalogLogger, database, scheduler } = this.env;
const ready = this.ready;
@@ -57,4 +58,4 @@
},
});
}
}
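The `burstInterval`, `burstLength`, and `restLength` options destructured in this file describe a duty cycle: ingest for up to `burstLength`, spacing chunks by `burstInterval`, then rest for `restLength` before resuming. A toy, dependency-free simulation of that schedule (plain numbers standing in for the plugin's luxon `Duration`s; purely illustrative):

```ts
// Toy model of the burst/rest duty cycle implied by the options above.
// burstLength: how long to keep ingesting; restLength: pause between bursts;
// burstInterval: spacing between chunks inside a burst. Units are abstract ms.
interface DutyCycle {
  burstInterval: number;
  burstLength: number;
  restLength: number;
}

function schedule(totalChunks: number, opts: DutyCycle) {
  const timeline: Array<{ chunk: number; startsAt: number }> = [];
  let clock = 0;
  let burstStart = 0;
  for (let chunk = 0; chunk < totalChunks; chunk++) {
    if (clock - burstStart >= opts.burstLength) {
      clock += opts.restLength; // burst exhausted: rest before continuing
      burstStart = clock;
    }
    timeline.push({ chunk, startsAt: clock });
    clock += opts.burstInterval; // next chunk inside the burst
  }
  return timeline;
}

const starts = schedule(4, { burstInterval: 1, burstLength: 2, restLength: 10 });
console.log(starts.map(s => s.startsAt)); // chunks start at 0, 1, 12, 13
```

The point of the rest period is to keep long-running ingestions from monopolizing the upstream API or the catalog database.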
3 changes: 2 additions & 1 deletion plugins/incremental-ingestion-backend/src/iteration-db.ts
@@ -84,6 +84,7 @@ export async function createIterationDB(options: IterationDBOptions): Promise<IterationDB>
}
case 'ingest':
try {
// TODO should update current action to avoid race condition?
const done = await ingestOneBurst(ingestionId, signal, tx);
if (done) {
logger.info(`Ingestion is complete. Rest for ${restLength.toHuman()}`);
@@ -275,7 +276,7 @@ export async function createIterationDB(options: IterationDBOptions): Promise<IterationDB>
[provider.getProviderName()],
)
.whereNotIn(
-        'ref',
+        'entity_ref',
tx('ingestion.ingestion_marks')
.join(
'ingestion.ingestion_mark_entities',
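The `entity_ref` fix above feeds a mark-and-sweep deletion: after a complete ingestion, any entity whose ref was not marked during that run is removed from the catalog. As a plain set-difference sketch (illustrative, not the plugin's code):

```ts
// Mark-and-sweep deletion sketch: entities whose ref was not marked
// during the latest complete ingestion are candidates for deletion.
function sweep(existingRefs: string[], markedRefs: string[]): string[] {
  const marked = new Set(markedRefs);
  return existingRefs.filter(ref => !marked.has(ref));
}

console.log(sweep(
  ['component:a', 'component:b', 'component:c'],
  ['component:a', 'component:c'],
)); // [ 'component:b' ]
```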
79 changes: 79 additions & 0 deletions plugins/incremental-ingestion-backend/src/tests/config.ts
@@ -0,0 +1,79 @@
export const backstageConfig = {
backend: {
listen: { port: 8800 },
database: {
prefix: 'graphql_tests_',
client: 'pg',
connection: {
host: 'localhost',
port: '5432',
user: 'postgres',
password: 'postgres',
},
},
baseUrl: 'http://localhost:8800',
},
catalog: {
rules: [
{
allow: [
'Component',
'System',
'API',
'Group',
'User',
'Resource',
'Location'
],
},
],
locations: [
// {
// type: 'url',
// target:
// 'https://github.com/thefrontside/backstage/blob/main/catalog-info.yaml',
// },
// {
// type: 'url',
// target:
// 'https://github.com/backstage/backstage/blob/master/packages/catalog-model/examples/all-components.yaml',
// },
// {
// type: 'url',
// target:
// 'https://github.com/backstage/backstage/blob/master/packages/catalog-model/examples/all-systems.yaml',
// },
// {
// type: 'url',
// target:
// 'https://github.com/backstage/backstage/blob/master/packages/catalog-model/examples/all-apis.yaml',
// },
// {
// type: 'url',
// target:
// 'https://github.com/backstage/backstage/blob/master/packages/catalog-model/examples/all-resources.yaml',
// },
// {
// type: 'url',
// target:
// 'https://github.com/backstage/backstage/blob/master/packages/catalog-model/examples/acme/org.yaml',
// },
// {
// type: 'url',
// target:
// 'https://github.com/backstage/software-templates/blob/main/scaffolder-templates/react-ssr-template/template.yaml',
// rules: [{ allow: ['Template'] }]
// },{
// type: 'url',
// target:
// 'https://github.com/backstage/software-templates/blob/main/scaffolder-templates/springboot-grpc-template/template.yaml',
// rules: [{ allow: ['Template'] }]
// },{
// type: 'url',
// target:
// 'https://github.com/backstage/software-templates/blob/main/scaffolder-templates/docs-template/template.yaml',
// rules: [{ allow: ['Template'] }]
// },
],
},
};
@@ -0,0 +1,79 @@
/* eslint-disable func-names */
import { describe, beforeAll, beforeEach, it } from '@effection/jest';
import { TaskScheduler } from '@backstage/backend-tasks';
import { ServerPermissionClient } from '@backstage/plugin-permission-node';
import { CatalogClient } from '@backstage/catalog-client';
import { ConfigReader } from '@backstage/config';
import { backstageConfig } from './config';
import { DatabaseManager, ServerTokenManager, SingleHostDiscovery, UrlReaders } from '@backstage/backend-common';
import { ClientFactory, useCatalog, useIncrementalBuilder, useLogger } from './setupTests';
import { PluginEnvironment } from '../types';
import { Operation } from 'effection';

describe('incrementally ingest entities', () => {
const factory = new ClientFactory();
let catalog: CatalogClient;
let stop: () => Promise<void>;
let rebuild: () => Operation<void>;

beforeAll(function* () {
const logger = yield useLogger()
const config = new ConfigReader(backstageConfig);
const reader = UrlReaders.default({ logger, config });
const databaseManager = DatabaseManager.fromConfig(config);
const discovery = SingleHostDiscovery.fromConfig(config);
const tokenManager = ServerTokenManager.noop();
const permissions = ServerPermissionClient.fromConfig(config, { discovery, tokenManager });
const scheduler = TaskScheduler.fromConfig(config).forPlugin('catalog');
const env: PluginEnvironment = {
logger,
database: databaseManager.forPlugin('catalog'),
config,
reader,
permissions,
scheduler
};
const { builder, incrementalBuilder } = yield useIncrementalBuilder({ ...env, factory })
;({ stop, rebuild } = yield useCatalog({ ...env, builder, incrementalBuilder, discovery }));
catalog = new CatalogClient({ discoveryApi: discovery });
});
beforeEach(function* () {
yield stop();
yield rebuild();
})

// TODO example of scenario
// get 5 entities => get an error => start again => get 4 entities instead of 5

describe('successfully ingest data', function* () {
beforeAll(function* () {
yield factory.createClient([
{ id: 1, data: ['a', 'b', 'c', 'd', 'e'] },
]);
})

it.eventually('test', function* () {
const { items } = yield catalog.getEntities()

expect(items).toHaveLength(5)
})
});

jest.setTimeout(15000);
it('successfully ingest data', function* () {
yield factory.createClient([
{ id: 1, data: ['a', 'b', 'c', 'd', 'e'], delay: 10 },
]);
yield new Promise(resolve => setTimeout(resolve, 1000));
console.dir(yield catalog.getEntities(), { depth: 10 });
})
it('successfully ingest data 2', function* () {
yield factory.createClient([
{ id: 1, data: ['1', '2', '3', '4', '5'], delay: 10 },
]);
yield new Promise(resolve => setTimeout(resolve, 1000));
console.dir(yield catalog.getEntities(), { depth: 10 });
})
})
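The fixed `setTimeout` sleeps in the draft tests above are a common source of flaky tests; polling until the expected state appears is usually more robust. A generic, dependency-free helper along those lines (not part of this PR):

```ts
// Generic polling helper: retry a probe until it yields a value,
// instead of sleeping a fixed 1000ms and hoping ingestion finished.
async function waitFor<T>(
  probe: () => Promise<T | undefined>,
  { timeoutMs = 15000, intervalMs = 100 } = {},
): Promise<T> {
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    const result = await probe();
    if (result !== undefined) return result;
    if (Date.now() > deadline) {
      throw new Error(`condition not met within ${timeoutMs}ms`);
    }
    await new Promise(resolve => setTimeout(resolve, intervalMs));
  }
}

// Example: poll a counter that becomes ready on the third attempt.
async function demo() {
  let calls = 0;
  const value = await waitFor(async () => (++calls >= 3 ? calls : undefined));
  console.log(value); // 3
}
demo();
```

In the tests above, the probe could fetch entities from the catalog and return them only once the expected count is reached.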