Skip to content

Commit

Permalink
Merge pull request #66 from koopjs/b/10625-hub-feeds-outage
Browse files Browse the repository at this point in the history
handle stream error
  • Loading branch information
sansth1010 authored Jun 4, 2024
2 parents 65ada54 + 0ad2890 commit 2f09e7c
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 4 deletions.
28 changes: 27 additions & 1 deletion src/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import * as mockSiteModel from './test-helpers/mock-site-model.json';
import { createMockKoopApp } from './test-helpers/create-mock-koop-app';
import { readableFromArray } from './test-helpers/stream-utils';
import { DcatUsError } from './dcat-us/dcat-us-error';
import { PassThrough } from 'stream';

function buildPluginAndApp(feedTemplate, feedTemplateTransforms) {
let Output;
Expand All @@ -21,9 +22,16 @@ function buildPluginAndApp(feedTemplate, feedTemplateTransforms) {
};

const app = createMockKoopApp();

app.get('/dcat', function (req, res, next) {
req.app.locals.feedTemplateTransforms = feedTemplateTransforms;
res.locals.feedTemplate = feedTemplate;
app.use((err, _req, res, _next) => {
res.status(err.status || 500)
res.send({
error: err.message
})
})
next();
}, plugin.serve.bind(plugin));

Expand Down Expand Up @@ -61,7 +69,6 @@ describe('Output Plugin', () => {
mockFetchSite = mocked(fetchSite);

mockFetchSite.mockResolvedValue(mockSiteModel);

[plugin, app] = buildPluginAndApp(dcatTemplate, {});
});

Expand Down Expand Up @@ -137,6 +144,25 @@ describe('Output Plugin', () => {
// TODO test stream error
});

it('returns error if stream emits an error', async () => {
const mockReadable = new PassThrough();

plugin.model.pullStream.mockResolvedValue(mockReadable);
const mockError = new Error('stream error')

setTimeout(() => {
mockReadable.emit('error', mockError)
}, 200)
await request(app)
.get('/dcat')
.set('host', siteHostName)
.expect('Content-Type', /application\/json/)
.expect(500)
.expect((res) => {
expect(res.body).toEqual({ error: 'stream error' });
});
});

it('returns 400 when searchRequest returns 400', async () => {
[plugin, app] = buildPluginAndApp({}, {});

Expand Down
8 changes: 5 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,11 @@ export = class OutputDcatUs11 {
const { stream: dcatStream } = getDataStreamDcatUs11(feedTemplate, feedTemplateTransforms);

const datasetStream = await this.getDatasetStream(req);
datasetStream
.pipe(dcatStream)
.pipe(res);
datasetStream.on('error', (err) => {
if (req.next) {
req.next(err);
}
}).pipe(dcatStream).pipe(res);

} catch (err) {
res.status(err.statusCode).send(this.getErrorResponse(err));
Expand Down

0 comments on commit 2f09e7c

Please sign in to comment.