diff --git a/packages/api/endpoints/reconciliation-reports.js b/packages/api/endpoints/reconciliation-reports.js index c878e55ad14..4da0c5fa3b2 100644 --- a/packages/api/endpoints/reconciliation-reports.js +++ b/packages/api/endpoints/reconciliation-reports.js @@ -173,10 +173,12 @@ async function deleteReport(req, res) { * Creates a new report * * @param {Object} req - express request object + * @param {RecReportParams} req.body * @param {Object} res - express response object * @returns {Promise} the promise of express response object */ async function createReport(req, res) { + /** @type NormalizedRecReportParams */ let validatedInput; try { validatedInput = normalizeEvent(req.body); diff --git a/packages/api/lambdas/create-reconciliation-report.js b/packages/api/lambdas/create-reconciliation-report.js index f72baba8285..209424e929e 100644 --- a/packages/api/lambdas/create-reconciliation-report.js +++ b/packages/api/lambdas/create-reconciliation-report.js @@ -1,3 +1,5 @@ +//@ts-check + 'use strict'; const cloneDeep = require('lodash/cloneDeep'); @@ -20,10 +22,10 @@ const { getFilesAndGranuleInfoQuery, getKnexClient, QuerySearchClient, + getUniqueCollectionsByGranuleFilter, + getGranulesByApiPropertiesQuery, + translatePostgresFileToApiFile, } = require('@cumulus/db'); -const { ESCollectionGranuleQueue } = require('@cumulus/es-client/esCollectionGranuleQueue'); -const Collection = require('@cumulus/es-client/collections'); -const { ESSearchQueue } = require('@cumulus/es-client/esSearchQueue'); const Logger = require('@cumulus/logger'); const { getEsClient } = require('@cumulus/es-client/search'); const { indexReconciliationReport } = require('@cumulus/es-client/indexer'); @@ -32,14 +34,12 @@ const { ReconciliationReportPgModel, translatePostgresReconReportToApiReconReport, } = require('@cumulus/db'); -const { createInternalReconciliationReport } = require('./internal-reconciliation-report'); const { createGranuleInventoryReport } = require('./reports/granule-inventory-report'); const { createOrcaBackupReconciliationReport } = require('./reports/orca-backup-reconciliation-report'); const { errorify, filenamify } = require('../lib/utils'); const { cmrGranuleSearchParams, - convertToESCollectionSearchParams, - convertToESGranuleSearchParams, + convertToDBGranuleSearchParams, initialReportHeader, } = require('../lib/reconciliationReport'); @@ -47,6 +47,60 @@ const log = new Logger({ sender: '@api/lambdas/create-reconciliation-report' }); const isDataBucket = (bucketConfig) => ['private', 'public', 'protected'].includes(bucketConfig.type); +// Typescript annotations +/** + * @typedef {typeof process.env } ProcessEnv + * @typedef {import('knex').Knex} Knex + * @typedef {import('@cumulus/es-client/search').EsClient} EsClient + * @typedef {import('../lib/types').NormalizedRecReportParams } NormalizedRecReportParams + * @typedef {import('@cumulus/cmr-client/CMR').CMRConstructorParams} CMRSettings + */ + +/** + * @typedef {Object} Env + * @property {string} [CONCURRENCY] - The concurrency level for processing. + * @property {string} [ES_INDEX] - The Elasticsearch index. + * @property {string} [AWS_REGION] - The AWS region. + * @property {string} [AWS_ACCESS_KEY_ID] - The AWS access key ID. + * @property {string} [AWS_SECRET_ACCESS_KEY] - The AWS secret access key. + * @property {string} [AWS_SESSION_TOKEN] - The AWS session token. + * @property {string} [NODE_ENV] - The Node.js environment (e.g., 'development', 'production'). + * @property {string} [DATABASE_URL] - The database connection URL. 
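+ * @property {string} [CMR_LIMIT] - Maximum number of search results returned from CMR queries.
+ * @property {string} [CMR_PAGE_SIZE] - Page size used for CMR queries.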
+ * @property {string} [key] - Any other environment variable, as a string.
+ */
+
+/**
+ * @typedef {Object} CMRCollectionItem
+ * @property {Object} umm - The UMM (Unified Metadata Model) object for the collection.
+ * @property {string} umm.ShortName - The short name of the collection.
+ * @property {string} umm.Version - The version of the collection.
+ * @property {Array} umm.RelatedUrls - The related URLs for the collection.
+ */
+
+/**
+ * @typedef {Object} CMRItem
+ * @property {Object} umm - The UMM (Unified Metadata Model) object for the granule.
+ * @property {string} umm.GranuleUR - The unique identifier for the granule in CMR.
+ * @property {Object} umm.CollectionReference - The collection reference object.
+ * @property {string} umm.CollectionReference.ShortName - The short name of the collection.
+ * @property {string} umm.CollectionReference.Version - The version of the collection.
+ * @property {Array} umm.RelatedUrls - The related URLs for the granule.
+ */
+
+/**
+ * @typedef {Object} DbItem
+ * @property {string} granule_id - The unique name for the granule (per collection).
+ * @property {number} cumulus_id - The unique identifier for the granule record in the database.
+ * @property {Date} updated_at - The last updated timestamp for the granule in the database.
+ */
+
+/**
+ * @typedef {Object} GranulesReport
+ * @property {number} okCount - The count of OK granules.
+ * @property {Array<{GranuleUR: string, ShortName: string, Version: string}>} onlyInCmr
+ * - The list of granules only in CMR.
+ * @property {Array<{granuleId: string, collectionId: string}>} onlyInCumulus
+ * - The list of granules only in Cumulus.
+ */
 /**
  *
  * @param {string} reportType - reconciliation report type
@@ -102,41 +156,32 @@ function isOneWayGranuleReport(reportParams) {
 }
 
 /**
- * Checks to see if the searchParams have any value that would require a
- * filtered search in ES
- * @param {Object} searchParams
- * @returns {boolean} returns true if searchParams contain a key that causes filtering to occur.
- */
-function shouldAggregateGranulesForCollections(searchParams) {
-  return [
-    'updatedAt__from',
-    'updatedAt__to',
-    'granuleId__in',
-    'provider__in',
-  ].some((e) => !!searchParams[e]);
-}
-
-/**
- * fetch CMR collections and filter the returned UMM CMR collections by the desired collectionIds
+ * Fetches collections from the CMR (Common Metadata Repository) and returns their IDs.
  *
- * @param {Object} recReportParams - input report params
- * @param {Array} recReportParams.collectionIds - array of collectionIds to keep
- * @returns {Array} filtered list of collectionIds returned from CMR
+ * @param {NormalizedRecReportParams} recReportParams - The parameters for the function.
+ * @returns {Promise<string[]>} A promise that resolves to an array of collection IDs from the CMR.
+ * + * @example + * await fetchCMRCollections({ collectionIds: ['COLLECTION_1', 'COLLECTION_2'] }); */ async function fetchCMRCollections({ collectionIds }) { const cmrSettings = await getCmrSettings(); - const cmrCollectionsIterator = new CMRSearchConceptQueue({ - cmrSettings, - type: 'collections', - format: 'umm_json', - }); + const cmrCollectionsIterator = /** @type {CMRSearchConceptQueue} */( + new CMRSearchConceptQueue({ + cmrSettings, + type: 'collections', + format: 'umm_json', + })); const allCmrCollectionIds = []; let nextCmrItem = await cmrCollectionsIterator.shift(); while (nextCmrItem) { - allCmrCollectionIds - .push(constructCollectionId(nextCmrItem.umm.ShortName, nextCmrItem.umm.Version)); - nextCmrItem = await cmrCollectionsIterator.shift(); // eslint-disable-line no-await-in-loop + allCmrCollectionIds.push( + constructCollectionId(nextCmrItem.umm.ShortName, nextCmrItem.umm.Version) + ); + nextCmrItem + // eslint-disable-next-line no-await-in-loop + = /** @type {CMRCollectionItem | null} */ (await cmrCollectionsIterator.shift()); } const cmrCollectionIds = allCmrCollectionIds.sort(); @@ -146,31 +191,46 @@ async function fetchCMRCollections({ collectionIds }) { } /** - * Fetch collections in Elasticsearch. - * @param {Object} recReportParams - input report params. - * @returns {Promise} - list of collectionIds that match input paramaters + * Fetches collections from the database based on the provided parameters. + * + * @param {NormalizedRecReportParams} recReportParams - The reconciliation report parameters. + * @param {Knex} knex - The Knex.js database connection. + * @returns {Promise} A promise that resolves to an array of collection IDs. */ -async function fetchESCollections(recReportParams) { - const esCollectionSearchParams = convertToESCollectionSearchParams(recReportParams); - const esGranuleSearchParams = convertToESGranuleSearchParams(recReportParams); - let esCollectionIds; - // [MHS, 09/02/2020] We are doing these two because we can't use - // aggregations on scrolls yet until we update elasticsearch version. 
-  if (shouldAggregateGranulesForCollections(esGranuleSearchParams)) {
-    // Build an ESCollection and call the aggregateGranuleCollections to
-    // get list of collection ids that have granules that have been updated
-    const esCollection = new Collection({ queryStringParameters: esGranuleSearchParams }, 'collection', process.env.ES_INDEX);
-    const esCollectionItems = await esCollection.aggregateGranuleCollections();
-    esCollectionIds = esCollectionItems.sort();
-  } else {
-    // return all ES collections
-    const esCollection = new ESSearchQueue(esCollectionSearchParams, 'collection', process.env.ES_INDEX);
-    const esCollectionItems = await esCollection.empty();
-    esCollectionIds = esCollectionItems.map(
-      (item) => constructCollectionId(item.name, item.version)
-    ).sort();
-  }
-  return esCollectionIds;
+async function fetchDbCollections(recReportParams, knex) {
+  const {
+    collectionIds,
+    granuleIds,
+    providers,
+    startTimestamp,
+    endTimestamp,
+  } = recReportParams;
+  if (providers || granuleIds || startTimestamp || endTimestamp) {
+    const filteredDbCollections = await getUniqueCollectionsByGranuleFilter({
+      knex: knex,
+      ...recReportParams,
+    });
+    return filteredDbCollections.map((collection) =>
+      constructCollectionId(collection.name, collection.version));
+  }
+  // Any timestamp, provider, or granule filter is handled above by
+  // getUniqueCollectionsByGranuleFilter, so only collectionIds can reach
+  // this fallback query.
+  const query = knex('collections')
+    .select('cumulus_id', 'name', 'version');
+  if (collectionIds) {
+    collectionIds.forEach((collectionId) => {
+      const { name, version } = deconstructCollectionId(collectionId);
+      query.orWhere({ name, version });
+    });
+  }
+  query.orderBy(['name', 'version']);
+  const dbCollections = await query;
+  return dbCollections.map((collection) =>
+    constructCollectionId(collection.name, collection.version));
 }
 
 /**
@@ -193,7 +253,7 @@ async function createReconciliationReportForBucket(Bucket, recReportParams) {
     sortColumns: ['key'],
     granuleColumns: ['granule_id'],
     collectionIds: recReportParams.collectionIds,
-    providers: recReportParams.providers,
+    providers: recReportParams.provider,
    granuleIds: recReportParams.granuleIds,
   });
 
@@ -282,18 +342,20 @@ async function createReconciliationReportForBucket(Bucket, recReportParams) {
 /**
  * Compare the collection holdings in CMR with Cumulus
  *
- * @param {Object} recReportParams - lambda's input filtering parameters to
+ * @param {NormalizedRecReportParams} recReportParams - lambda's input filtering parameters to
  *   narrow limit of report.
+ * @param {Knex} knex - Database client for interacting with PostgreSQL database
  * @returns {Promise<Object>} an object with the okCollections, onlyInCumulus and
  *   onlyInCmr
  */
-async function reconciliationReportForCollections(recReportParams) {
+async function reconciliationReportForCollections(recReportParams, knex) {
   // compare collection holdings:
   //   Get list of collections from CMR
   //   Get list of collections from CUMULUS
   //   Report collections only in CMR
   //   Report collections only in CUMULUS
   log.info(`reconciliationReportForCollections (${JSON.stringify(recReportParams)})`);
+  //TODO - remove
   const oneWayReport = isOneWayCollectionReport(recReportParams);
   log.debug(`Creating one way report: ${oneWayReport}`);
@@ -306,16 +368,18 @@ async function reconciliationReportForCollections(recReportParams) {
   // 'Version' as sort_key
   log.debug('Fetching collections from CMR.');
   const cmrCollectionIds = await fetchCMRCollections(recReportParams);
-  const esCollectionIds = await fetchESCollections(recReportParams);
-  log.info(`Comparing ${cmrCollectionIds.length} CMR collections to ${esCollectionIds.length} Elasticsearch collections`);
+  const dbCollectionIds = await fetchDbCollections(recReportParams, knex);
+  log.info(`Comparing ${cmrCollectionIds.length} CMR collections to ${dbCollectionIds.length} database collections`);
 
-  let nextDbCollectionId = esCollectionIds[0];
+  /** @type {string | undefined} */
+  let nextDbCollectionId = dbCollectionIds[0];
+  /** @type {string | undefined} */
   let nextCmrCollectionId = cmrCollectionIds[0];
 
   while (nextDbCollectionId && nextCmrCollectionId) {
     if (nextDbCollectionId < nextCmrCollectionId) {
       // Found an item that is only in Cumulus database and not in cmr
-      esCollectionIds.shift();
+      dbCollectionIds.shift();
       collectionsOnlyInCumulus.push(nextDbCollectionId);
     } else if (nextDbCollectionId > nextCmrCollectionId) {
       // Found an item that is only in cmr and not in Cumulus database
@@ -324,16 +388,16 @@ async function reconciliationReportForCollections(recReportParams) {
     } else {
       // Found an item that is in both cmr and database
       okCollections.push(nextDbCollectionId);
-      esCollectionIds.shift();
+      dbCollectionIds.shift();
       cmrCollectionIds.shift();
     }
 
-    nextDbCollectionId = (esCollectionIds.length !== 0) ? esCollectionIds[0] : undefined;
+    nextDbCollectionId = (dbCollectionIds.length !== 0) ? dbCollectionIds[0] : undefined;
     nextCmrCollectionId = (cmrCollectionIds.length !== 0) ?
      cmrCollectionIds[0] : undefined;
   }
 
   // Add any remaining database items to the report
-  collectionsOnlyInCumulus = collectionsOnlyInCumulus.concat(esCollectionIds);
+  collectionsOnlyInCumulus = collectionsOnlyInCumulus.concat(dbCollectionIds);
 
   // Add any remaining CMR items to the report
   if (!oneWayReport) collectionsOnlyInCmr = collectionsOnlyInCmr.concat(cmrCollectionIds);
@@ -361,6 +425,10 @@ async function reconciliationReportForCollections(recReportParams) {
  * @returns {Promise<Object>} - an object with the okCount, onlyInCumulus, onlyInCmr
  */
 async function reconciliationReportForGranuleFiles(params) {
+  if (!process.env.DISTRIBUTION_ENDPOINT) {
+    throw new Error('DISTRIBUTION_ENDPOINT is not defined in function environment variables, but is required');
+  }
+  const distEndpoint = process.env.DISTRIBUTION_ENDPOINT;
   const { granuleInDb, granuleInCmr, bucketsConfig, distributionBucketMap } = params;
   let okCount = 0;
   const onlyInCumulus = [];
@@ -390,7 +458,7 @@ async function reconciliationReportForGranuleFiles(params) {
       // not all files should be in CMR
       const distributionAccessUrl = await constructOnlineAccessUrl({
         file: granuleFiles[urlFileName],
-        distEndpoint: process.env.DISTRIBUTION_ENDPOINT,
+        distEndpoint,
         bucketTypes,
         urlType: 'distribution',
         distributionBucketMap,
       });
 
       const s3AccessUrl = await constructOnlineAccessUrl({
         file: granuleFiles[urlFileName],
-        distEndpoint: process.env.DISTRIBUTION_ENDPOINT,
+        distEndpoint,
         bucketTypes,
         urlType: 's3',
         distributionBucketMap,
@@ -465,14 +533,18 @@ exports.reconciliationReportForGranuleFiles = reconciliationReportForGranuleFile
 /**
  * Compare the granule holdings in CMR with Cumulus for a given collection
  *
- * @param {Object} params - parameters
- * @param {string} params.collectionId - the collection which has the granules to be
- *   reconciled
- * @param {Object} params.bucketsConfig - bucket configuration object
- * @param {Object} params.distributionBucketMap - mapping of bucket->distirubtion path values
- *   (e.g. { bucket: distribution path })
- * @param {Object} params.recReportParams - Lambda report paramaters for narrowing focus
- * @returns {Promise<Object>} - an object with the granulesReport and filesReport
+ * @param {Object} params - parameters
+ * @param {string} params.collectionId - the collection which has the granules to be
+ *   reconciled
+ * @param {Object} params.bucketsConfig - bucket configuration object
+ * @param {Object} params.distributionBucketMap - mapping of bucket->distribution path values
+ *   (e.g. { bucket: distribution path })
+ * @param {NormalizedRecReportParams} params.recReportParams - Lambda report parameters for
+ *   narrowing focus
+ * @param {Knex} params.knex - Database client for interacting with PostgreSQL
+ *   database
+ * @returns {Promise<Object>} - an object with the granulesReport and
+ *   filesReport
  */
 async function reconciliationReportForGranules(params) {
   // compare granule holdings:
@@ -481,51 +553,65 @@ async function reconciliationReportForGranules(params) {
   //   Report granules only in CMR
   //   Report granules only in CUMULUS
   log.info(`reconciliationReportForGranules(${params.collectionId})`);
-  const { collectionId, bucketsConfig, distributionBucketMap, recReportParams } = params;
+  const { collectionId, bucketsConfig, distributionBucketMap, recReportParams, knex } = params;
   const { name, version } = deconstructCollectionId(collectionId);
+
+  /** @type {GranulesReport} */
   const granulesReport = { okCount: 0, onlyInCumulus: [], onlyInCmr: [] };
+
   const filesReport = { okCount: 0, onlyInCumulus: [], onlyInCmr: [] };
 
   try {
-    const cmrSettings = await getCmrSettings();
-    const searchParams = new URLSearchParams({ short_name: name, version: version, sort_key: ['granule_ur'] });
+    const cmrSettings = /** @type CMRSettings */(await getCmrSettings());
+    const searchParams = new URLSearchParams({ short_name: name, version: version, sort_key: 'granule_ur' });
     cmrGranuleSearchParams(recReportParams).forEach(([paramName, paramValue]) => {
       searchParams.append(paramName, paramValue);
     });
 
     log.debug(`fetch CMRSearchConceptQueue(${collectionId}) with searchParams: ${JSON.stringify(searchParams)}`);
-    const cmrGranulesIterator = new CMRSearchConceptQueue({
+    const cmrGranulesIterator
+      = /** @type {CMRSearchConceptQueue} */(new CMRSearchConceptQueue({
       cmrSettings,
       type: 'granules',
      searchParams,
       format: 'umm_json',
+    }));
+
+    const dbSearchParams = convertToDBGranuleSearchParams({
+      ...recReportParams,
+      collectionIds: [collectionId],
+    });
+    // TODO: fix typing
+    const granulesSearchQuery = getGranulesByApiPropertiesQuery({
+      knex,
+      searchParams: dbSearchParams,
+      sortByFields: ['granule_id'],
     });
-    const esGranuleSearchParamsByCollectionId = convertToESGranuleSearchParams(
-      { ...recReportParams, collectionIds: [collectionId] }
-    );
+    const pgGranulesIterator = /** @type {QuerySearchClient} */(new QuerySearchClient(
+      granulesSearchQuery,
+      100 // arbitrary limit on how many items are fetched at once
+    ));
 
-    log.debug(`Create ES granule iterator with ${JSON.stringify(esGranuleSearchParamsByCollectionId)}`);
-    const esGranulesIterator = new ESCollectionGranuleQueue(
-      esGranuleSearchParamsByCollectionId, process.env.ES_INDEX
-    );
     const oneWay = isOneWayGranuleReport(recReportParams);
     log.debug(`is oneWay granule report: ${collectionId}, ${oneWay}`);
 
     let [nextDbItem, nextCmrItem] = await Promise.all(
-      [esGranulesIterator.peek(), cmrGranulesIterator.peek()]
+      [pgGranulesIterator.peek(), cmrGranulesIterator.peek()]
     );
 
     while (nextDbItem && nextCmrItem) {
-      const nextDbGranuleId = nextDbItem.granuleId;
+      const nextDbGranuleId = nextDbItem.granule_id; // TODO: fix typing
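+      // Both sources yield granules sorted by granule ID, so comparing the
+      // heads of the two iterators classifies each granule as only in the
+      // database, only in CMR, or present in both.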
      const nextCmrGranuleId = nextCmrItem.umm.GranuleUR;
 
       if (nextDbGranuleId < nextCmrGranuleId) {
         // Found an item that is only in Cumulus database and not in CMR
         granulesReport.onlyInCumulus.push({
           granuleId: nextDbGranuleId,
           collectionId: collectionId,
         });
-        await esGranulesIterator.shift(); // eslint-disable-line no-await-in-loop
+        await pgGranulesIterator.shift(); // eslint-disable-line no-await-in-loop
       } else if (nextDbGranuleId > nextCmrGranuleId) {
         // Found an item that is only in CMR and not in Cumulus database
         if (!oneWay) {
@@ -539,10 +625,16 @@ async function reconciliationReportForGranules(params) {
       } else {
         // Found an item that is in both CMR and Cumulus database
         granulesReport.okCount += 1;
+        // eslint-disable-next-line no-await-in-loop
+        const postgresGranuleFiles = await getFilesAndGranuleInfoQuery({
+          knex,
+          searchParams: { granule_cumulus_id: nextDbItem.cumulus_id },
+          sortColumns: ['key'],
+        });
         const granuleInDb = {
           granuleId: nextDbGranuleId,
           collectionId: collectionId,
-          files: nextDbItem.files,
+          files: postgresGranuleFiles.map((f) => translatePostgresFileToApiFile(f)),
         };
         const granuleInCmr = {
           GranuleUR: nextCmrGranuleId,
@@ -550,9 +642,10 @@ async function reconciliationReportForGranules(params) {
           Version: nextCmrItem.umm.CollectionReference.Version,
           RelatedUrls: nextCmrItem.umm.RelatedUrls,
         };
-        await esGranulesIterator.shift(); // eslint-disable-line no-await-in-loop
+        await pgGranulesIterator.shift(); // eslint-disable-line no-await-in-loop
         await cmrGranulesIterator.shift(); // eslint-disable-line no-await-in-loop
 
+        // TODO - this is an api granule file object.
         // compare the files now to avoid keeping the granules' information in memory
         // eslint-disable-next-line no-await-in-loop
         const fileReport = await reconciliationReportForGranuleFiles({
@@ -563,14 +656,17 @@ async function reconciliationReportForGranules(params) {
         filesReport.onlyInCmr = filesReport.onlyInCmr.concat(fileReport.onlyInCmr);
       }
 
-      [nextDbItem, nextCmrItem] = await Promise.all([esGranulesIterator.peek(), cmrGranulesIterator.peek()]); // eslint-disable-line max-len, no-await-in-loop
+      [nextDbItem, nextCmrItem] = await Promise.all([pgGranulesIterator.peek(), cmrGranulesIterator.peek()]); // eslint-disable-line max-len, no-await-in-loop
     }
 
-    // Add any remaining ES/PostgreSQL items to the report
-    while (await esGranulesIterator.peek()) { // eslint-disable-line no-await-in-loop
-      const dbItem = await esGranulesIterator.shift(); // eslint-disable-line no-await-in-loop
+    // Add any remaining PostgreSQL items to the report
+    while (await pgGranulesIterator.peek()) { // eslint-disable-line no-await-in-loop
+      const dbItem = await pgGranulesIterator.shift(); // eslint-disable-line no-await-in-loop
+      if (!dbItem) {
+        throw new Error('database returned item is null in reconciliationReportForGranules');
+      }
       granulesReport.onlyInCumulus.push({
-        granuleId: dbItem.granuleId,
+        granuleId: dbItem.granule_id,
         collectionId: collectionId,
       });
     }
@@ -579,6 +675,9 @@ async function reconciliationReportForGranules(params) {
     if (!oneWay) {
       while (await cmrGranulesIterator.peek()) { // eslint-disable-line no-await-in-loop
         const cmrItem = await cmrGranulesIterator.shift(); // eslint-disable-line no-await-in-loop
+        if (!cmrItem) {
+          throw new Error('CMR returned item is null in reconciliationReportForGranules');
+        }
         granulesReport.onlyInCmr.push({
           GranuleUR: cmrItem.umm.GranuleUR,
           ShortName: nextCmrItem.umm.CollectionReference.ShortName,
@@ -608,21 +707,19 @@ exports.reconciliationReportForGranules = reconciliationReportForGranules;
 /**
  * Compare the holdings in CMR with Cumulus' internal data store, report any discrepancies
  *
- * @param {Object} params . - parameters
- * @param {Object} params.bucketsConfig - bucket configuration object
- * @param {Object} params.distributionBucketMap - mapping of bucket->distirubtion path values
+ * @param {Object} params - parameters
+ * @param {Object} params.bucketsConfig - bucket configuration object
+ * @param {Object} params.distributionBucketMap - mapping of bucket->distribution path values
  *   (e.g. { bucket: distribution path })
- * @param {Object} [params.recReportParams] - optional Lambda endpoint's input params to
- *   narrow report focus
- * @param {number} [params.recReportParams.StartTimestamp]
- * @param {number} [params.recReportParams.EndTimestamp]
- * @param {string} [params.recReportparams.collectionIds]
- * @returns {Promise<Object>} - a reconciliation report
+ * @param {NormalizedRecReportParams} params.recReportParams - Lambda endpoint's input params to
+ *   narrow focus of report
+ * @returns {Promise<Object>} - a reconciliation report
  */
 async function reconciliationReportForCumulusCMR(params) {
   log.info(`reconciliationReportForCumulusCMR with params ${JSON.stringify(params)}`);
+  const knex = await getKnexClient();
   const { bucketsConfig, distributionBucketMap, recReportParams } = params;
-  const collectionReport = await reconciliationReportForCollections(recReportParams);
+  const collectionReport = await reconciliationReportForCollections(recReportParams, knex);
   const collectionsInCumulusCmr = {
     okCount: collectionReport.okCollections.length,
     onlyInCumulus: collectionReport.onlyInCumulus,
@@ -632,7 +729,7 @@ async function reconciliationReportForCumulusCMR(params) {
   // create granule and granule file report for collections in both Cumulus and CMR
   const promisedGranuleReports = collectionReport.okCollections.map(
     (collectionId) => reconciliationReportForGranules({
-      collectionId, bucketsConfig, distributionBucketMap, recReportParams,
+      collectionId, bucketsConfig, distributionBucketMap, recReportParams, knex,
     })
   );
   const granuleAndFilesReports = await Promise.all(promisedGranuleReports);
@@ -669,7 +766,7 @@ async function reconciliationReportForCumulusCMR(params) {
  * @param {Object} report - report to upload
  * @param {string} systemBucket - system bucket
  * @param {string} reportKey - report key
- * @returns {Promise}
+ * @returns {Promise}
  */
 function _uploadReportToS3(report, systemBucket, reportKey) {
   return s3().putObject({
@@ -682,16 +779,7 @@ function _uploadReportToS3(report, systemBucket, reportKey) {
 /**
  * Create a Reconciliation report and save it to S3
  *
- * @param {Object} recReportParams - params
- * @param {Object} recReportParams.reportType - the report type
- * @param {moment} recReportParams.createStartTime - when the report creation was begun
- * @param {moment} recReportParams.endTimestamp - ending report datetime ISO Timestamp
- * @param {string} recReportParams.location - location to inventory for report
- * @param {string} recReportParams.reportKey - the s3 report key
- * @param {string} recReportParams.stackName - the name of the CUMULUS stack
- * @param {moment} recReportParams.startTimestamp - beginning report datetime ISO timestamp
- * @param {string} recReportParams.systemBucket - the name of the CUMULUS system bucket
- * @param {Knex} recReportParams.knex - Database client for interacting with PostgreSQL database
+ * @param {NormalizedRecReportParams}
recReportParams - params * @returns {Promise} a Promise that resolves when the report has been * uploaded to S3 */ @@ -701,7 +789,6 @@ async function createReconciliationReport(recReportParams) { stackName, systemBucket, location, - knex, } = recReportParams; log.info(`createReconciliationReport (${JSON.stringify(recReportParams)})`); // Fetch the bucket names to reconcile @@ -714,6 +801,13 @@ async function createReconciliationReport(recReportParams) { const bucketsConfig = new BucketsConfig(bucketsConfigJson); // Write an initial report to S3 + /** + * @type {Object} + * @property {number} okCount + * @property {Object} okCountByGranule + * @property {string[]} onlyInS3 + * @property {Object[]} [onlyInDb] + */ const filesInCumulus = { okCount: 0, okCountByGranule: {}, @@ -726,6 +820,7 @@ async function createReconciliationReport(recReportParams) { onlyInCumulus: [], onlyInCmr: [], }; + let report = { ...initialReportHeader(recReportParams), filesInCumulus, @@ -743,7 +838,7 @@ async function createReconciliationReport(recReportParams) { // Create a report for each bucket const promisedBucketReports = dataBuckets.map( - (bucket) => createReconciliationReportForBucket(bucket, recReportParams, knex) + (bucket) => createReconciliationReportForBucket(bucket, recReportParams) ); const bucketReports = await Promise.all(promisedBucketReports); @@ -808,8 +903,10 @@ async function createReconciliationReport(recReportParams) { * @param {string} params.stackName - the name of the CUMULUS stack * @param {string} params.reportType - the type of reconciliation report * @param {string} params.reportName - the name of the report - * @param {Knex} params.knex - Knex client to interact with pg - * @returns {Object} report record saved to the database + * @param {Env} params.env - the environment variables + * @param {Knex} params.knex - Optional Instance of a Knex client for testing + * @param {EsClient} params.esClient - Optional Instance of an Elasticsearch client for testing + * @returns {Promise} report record saved to the database */ async function processRequest(params) { log.info(`processing reconciliation report request with params: ${JSON.stringify(params)}`); @@ -819,7 +916,7 @@ async function processRequest(params) { reportName, systemBucket, stackName, - knex = await getKnexClient(env), + knex = await getKnexClient({ env }), esClient = await getEsClient(), } = params; const createStartTime = moment.utc(); @@ -841,9 +938,10 @@ async function processRequest(params) { await indexReconciliationReport(esClient, reportApiRecord, process.env.ES_INDEX); log.info(`Report added to database as pending: ${JSON.stringify(reportApiRecord)}.`); - const concurrency = env.CONCURRENCY || 3; + const concurrency = env.CONCURRENCY || '3'; try { + /** @type NormalizedRecReportParams */ const recReportParams = { ...params, createStartTime, @@ -857,14 +955,18 @@ async function processRequest(params) { log.error( 'Internal Reconciliation Reports are no longer valid, as Cumulus is no longer utilizing Elasticsearch' ); + //TODO remove internal rec report code throw new Error('Internal Reconciliation Reports are no longer valid'); } else if (reportType === 'Granule Inventory') { await createGranuleInventoryReport(recReportParams); } else if (reportType === 'ORCA Backup') { await createOrcaBackupReconciliationReport(recReportParams); - } else { + } else if (['Inventory', 'Granule Not Found'].includes(reportType)) { // reportType is in ['Inventory', 'Granule Not Found'] - await createReconciliationReport(recReportParams); // 
TODO Update to not use elasticsearch + await createReconciliationReport(recReportParams); + } else { + // TODO make this a better error (res.boom?) + throw new Error(`Invalid report type: ${reportType}`); } const generatedRecord = { @@ -900,8 +1002,8 @@ async function processRequest(params) { async function handler(event) { // increase the limit of search result from CMR.searchCollections/searchGranules - process.env.CMR_LIMIT = process.env.CMR_LIMIT || 5000; - process.env.CMR_PAGE_SIZE = process.env.CMR_PAGE_SIZE || 200; + process.env.CMR_LIMIT = process.env.CMR_LIMIT || '5000'; + process.env.CMR_PAGE_SIZE = process.env.CMR_PAGE_SIZE || '200'; //TODO: Remove irrelevant env vars from terraform after ES reports are removed const varsToLog = ['CMR_LIMIT', 'CMR_PAGE_SIZE', 'ES_SCROLL', 'ES_SCROLL_SIZE']; diff --git a/packages/api/lambdas/reports/orca-backup-reconciliation-report.js b/packages/api/lambdas/reports/orca-backup-reconciliation-report.js index c22fecaabed..c91a57bd5a3 100644 --- a/packages/api/lambdas/reports/orca-backup-reconciliation-report.js +++ b/packages/api/lambdas/reports/orca-backup-reconciliation-report.js @@ -38,7 +38,11 @@ const ORCASearchCatalogQueue = require('../../lib/ORCASearchCatalogQueue'); * @property {string} reason */ -/** @typedef { import('@cumulus/db').PostgresGranuleRecord } PostgresGranuleRecord */ +/** + * @typedef { import('@cumulus/db').PostgresGranuleRecord } PostgresGranuleRecord + * @typedef {import('../../lib/types').NormalizedRecReportParams } NormalizedRecReportParams + */ + /** * @typedef {Object} GranuleReport * @property {boolean} ok @@ -98,8 +102,7 @@ const fileConflictTypes = { /** * Fetch orca configuration for all or specified collections * - * @param {Object} recReportParams - input report params - * @param {String[]} recReportParams.collectionIds - array of collectionIds + * @param {NormalizedRecReportParams} recReportParams - input report params * @returns {Promise} - list of { collectionId, orca configuration } */ async function fetchCollectionsConfig(recReportParams) { @@ -354,8 +357,7 @@ async function addGranuleToReport({ /** * Compare the granule holdings in Cumulus with ORCA * - * @param {Object} recReportParams - input report params - * @param {String[]} recReportParams.collectionIds - array of collectionIds + * @param {NormalizedRecReportParams} recReportParams - input report params * @returns {Promise} an object with the okCount, onlyInCumulus, onlyInOrca * and withConfilcts */ @@ -496,17 +498,7 @@ async function orcaReconciliationReportForGranules(recReportParams) { /** * Create an ORCA Backup Reconciliation report and save it to S3 * - * @param {Object} recReportParams - params - * @param {Object} recReportParams.collectionIds - array of collectionIds - * @param {Object} recReportParams.providers - array of providers - * @param {Object} recReportParams.granuleIds - array of granuleIds - * @param {Object} recReportParams.reportType - the report type - * @param {moment} recReportParams.createStartTime - when the report creation was begun - * @param {moment} recReportParams.endTimestamp - ending report datetime ISO Timestamp - * @param {string} recReportParams.reportKey - the s3 report key - * @param {string} recReportParams.stackName - the name of the CUMULUS stack - * @param {moment} recReportParams.startTimestamp - beginning report datetime ISO timestamp - * @param {string} recReportParams.systemBucket - the name of the CUMULUS system bucket + * @param {NormalizedRecReportParams} recReportParams - params * @returns {Promise} 
a Promise that resolves when the report has been * uploaded to S3 */ diff --git a/packages/api/lib/reconciliationReport.js b/packages/api/lib/reconciliationReport.js index 328ce25cfd9..18bb103c0b4 100644 --- a/packages/api/lib/reconciliationReport.js +++ b/packages/api/lib/reconciliationReport.js @@ -1,3 +1,5 @@ +//@ts-check + 'use strict'; const isEqual = require('lodash/isEqual'); @@ -9,6 +11,27 @@ const Logger = require('@cumulus/logger'); const log = new Logger({ sender: '@api/lambdas/create-reconciliation-report' }); +/** @typedef {import('../lib/types').RecReportParams } RecReportParams */ +/** @typedef {import('../lib/types').NormalizedRecReportParams } NormalizedRecReportParams */ + +/** + * @typedef {Object} ReportHeader + * @property {string | undefined} collectionId - The collection ID. + * @property {string | string[] | undefined} collectionIds - The collection IDs. + * @property {string | undefined} createEndTime - The end time of the report creation. + * @property {string} createStartTime - The start time of the report creation. + * @property {string | undefined} error - Any error that occurred. + * @property {string | undefined} granuleId - The granule ID. + * @property {string | string[] | undefined} granuleIds - The granule IDs. + * @property {string | string[] | undefined} provider - The provider. + * @property {string | string[] | undefined} providers - The providers. + * @property {string | undefined} location - The location. + * @property {string | undefined} reportEndTime - The end time of the report. + * @property {string | undefined} reportStartTime - The start time of the report. + * @property {string} reportType - The type of the report. + * @property {string} status - The status of the report. + */ + /** * Extra search params to add to the cmrGranules searchConceptQueue * @@ -23,6 +46,7 @@ function cmrGranuleSearchParams(recReportParams) { return []; } +// TODO: remove /** * Prepare a list of collectionIds into an _id__in object * @@ -36,7 +60,7 @@ function searchParamsForCollectionIdArray(collectionIds) { /** * @param {string} dateable - any input valid for a JS Date contstructor. - * @returns {number} - primitive value of input date string or undefined, if + * @returns {number | undefined} - primitive value of input date string or undefined, if * input string not convertable. */ function dateToValue(dateable) { @@ -49,6 +73,7 @@ function dateStringToDateOrNull(dateable) { return !Number.isNaN(date.valueOf()) ? date : undefined; } +//TODO Verify this function is still needed /** * * @param {Object} params - request params to convert to Elasticsearch params @@ -74,9 +99,9 @@ function convertToESCollectionSearchParams(params) { * @param {[Object]} params.collectionIds - List containing single Collection object * multiple or no collections will result in a * search object without a collection object - * @param {moment} params.endTimestamp - ending report datetime ISO Timestamp - * @param {moment} params.startTimestamp - beginning report datetime ISO timestamp - * @returns {[Object]} - array of objects of desired + * @param {string} params.endTimestamp - ending report datetime ISO Timestamp + * @param {string} params.startTimestamp - beginning report datetime ISO timestamp + * @returns {Object[]} - array of objects of desired * parameters formatted for database collection * search */ @@ -121,7 +146,7 @@ function convertToESGranuleSearchParams(params) { /** * Convert reconciliation report parameters to PostgreSQL database search params. 
* - * @param {Object} params - request params to convert to database params + * @param {NormalizedRecReportParams} params - request params to convert to database params * @returns {Object} object of desired parameters formated for database granule search */ function convertToDBGranuleSearchParams(params) { @@ -152,7 +177,7 @@ function convertToDBGranuleSearchParams(params) { * convert to es search parameters using createdAt for report time range * * @param {Object} params - request params to convert to Elasticsearch params - * @returns {Object} object of desired parameters formated for Elasticsearch. + * @returns {Object} object of desired parameters formatted for Elasticsearch. */ function convertToESGranuleSearchParamsWithCreatedAtRange(params) { const searchParamsWithUpdatedAt = convertToESGranuleSearchParams(params); @@ -167,7 +192,7 @@ function convertToESGranuleSearchParamsWithCreatedAtRange(params) { /** * * @param {Object} params - request params to convert to orca params - * @returns {Object} object of desired parameters formated for orca + * @returns {Object} object of desired parameters formatted for orca */ function convertToOrcaGranuleSearchParams(params) { const { collectionIds, granuleIds, providers, startTimestamp, endTimestamp } = params; @@ -183,12 +208,8 @@ function convertToOrcaGranuleSearchParams(params) { /** * create initial report header * - * @param {Object} recReportParams - params - * @param {Object} recReportParams.reportType - the report type - * @param {moment} recReportParams.createStartTime - when the report creation was begun - * @param {moment} recReportParams.endTimestamp - ending report datetime ISO Timestamp - * @param {moment} recReportParams.startTimestamp - beginning report datetime ISO timestamp - * @returns {Object} report header + * @param {NormalizedRecReportParams} recReportParams - params + * @returns {ReportHeader} report header */ function initialReportHeader(recReportParams) { const { diff --git a/packages/api/lib/reconciliationReport/normalizeEvent.js b/packages/api/lib/reconciliationReport/normalizeEvent.js index 88cd2283df1..5df330d3876 100644 --- a/packages/api/lib/reconciliationReport/normalizeEvent.js +++ b/packages/api/lib/reconciliationReport/normalizeEvent.js @@ -1,3 +1,5 @@ +//@ts-check + 'use strict'; /*eslint prefer-const: ["error", {"destructuring": "all"}]*/ @@ -5,11 +7,16 @@ const isString = require('lodash/isString'); const { removeNilProperties } = require('@cumulus/common/util'); const { InvalidArgument } = require('@cumulus/errors'); +/** + * @typedef {import('../types').RecReportParams } RecReportParams + * @typedef {import('../types').NormalizedRecReportParams } NormalizedRecReportParams + */ + /** * ensures input reportType can be handled by the lambda code. * * @param {string} reportType - * @returns {undefined} - if reportType is valid + * @returns {void} - if reportType is valid * @throws {InvalidArgument} - otherwise */ function validateReportType(reportType) { @@ -31,7 +38,7 @@ function validateReportType(reportType) { /** * Convert input to an ISO timestamp. * @param {any} dateable - any type convertable to JS Date - * @returns {string} - date formated as ISO timestamp; + * @returns {string | undefined} - date formated as ISO timestamp; */ function isoTimestamp(dateable) { if (dateable) { @@ -45,26 +52,19 @@ function isoTimestamp(dateable) { } /** - * Transforms input granuleId into correct parameters for use in the - * Reconciliation Report lambda. 
- * @param {Array|string} granuleId - list of granule Ids - * @param {Object} modifiedEvent - input event - * @returns {Object} updated input even with correct granuleId and granuleIds values. + * Normalizes the input into an array of granule IDs. + * + * @param {string|string[]|undefined} granuleId - The granule ID or an array of granule IDs. + * @returns {string[]|undefined} An array of granule IDs, or undefined if no granule ID is provided. */ -function updateGranuleIds(granuleId, modifiedEvent) { - let returnEvent = { ...modifiedEvent }; - if (granuleId) { - // transform input granuleId into an array on granuleIds - const granuleIds = isString(granuleId) ? [granuleId] : granuleId; - returnEvent = { ...modifiedEvent, granuleIds }; - } - return returnEvent; +function generateGranuleIds(granuleId) { + return granuleId ? (isString(granuleId) ? [granuleId] : granuleId) : undefined; } /** * Transforms input collectionId into correct parameters for use in the * Reconciliation Report lambda. - * @param {Array|string} collectionId - list of collection Ids + * @param {string[]|string | undefined} collectionId - list of collection Ids * @param {Object} modifiedEvent - input event * @returns {Object} updated input even with correct collectionId and collectionIds values. */ @@ -78,26 +78,32 @@ function updateCollectionIds(collectionId, modifiedEvent) { return returnEvent; } -function updateProviders(provider, modifiedEvent) { - let returnEvent = { ...modifiedEvent }; - if (provider) { - // transform input provider into an array on providers - const providers = isString(provider) ? [provider] : provider; - returnEvent = { ...modifiedEvent, providers }; - } - return returnEvent; +/** + * Normalizes the input provider into an array of providers. + * + * @param {string|string[]|undefined} provider - The provider or list of providers. + * @returns {string[]|undefined} An array of providers, or undefined if no provider is provided. + */ +function generateProviders(provider) { + return provider ? (isString(provider) ? [provider] : provider) : undefined; } /** * Converts input parameters to normalized versions to pass on to the report * functions. Ensures any input dates are formatted as ISO strings. 
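+ * Throws an InvalidArgument error when systemBucket or stackName cannot be
+ * resolved from the input event or the environment.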
* - * @param {Object} event - input payload - * @returns {Object} - Object with normalized parameters + * @param {RecReportParams} event - input payload + * @returns {NormalizedRecReportParams} - Object with normalized parameters */ function normalizeEvent(event) { const systemBucket = event.systemBucket || process.env.system_bucket; + if (!systemBucket) { + throw new InvalidArgument('systemBucket is required.'); + } const stackName = event.stackName || process.env.stackName; + if (!stackName) { + throw new InvalidArgument('stackName is required.'); + } const startTimestamp = isoTimestamp(event.startTimestamp); const endTimestamp = isoTimestamp(event.endTimestamp); @@ -120,16 +126,16 @@ function normalizeEvent(event) { throw new InvalidArgument(`${reportType} reports cannot be launched with more than one input (granuleId, collectionId, or provider).`); } modifiedEvent = updateCollectionIds(collectionId, modifiedEvent); - modifiedEvent = updateGranuleIds(granuleId, modifiedEvent); - modifiedEvent = updateProviders(provider, modifiedEvent); - return removeNilProperties({ + return (removeNilProperties({ ...modifiedEvent, systemBucket, stackName, startTimestamp, endTimestamp, reportType, - }); + granuleIds: generateGranuleIds(granuleId), + providers: generateProviders(provider), + })); } exports.normalizeEvent = normalizeEvent; diff --git a/packages/api/tests/lambdas/test-create-reconciliation-report-internals.js b/packages/api/tests/lambdas/test-create-reconciliation-report-internals.js index 819a1acc326..ab4e9b248e9 100644 --- a/packages/api/tests/lambdas/test-create-reconciliation-report-internals.js +++ b/packages/api/tests/lambdas/test-create-reconciliation-report-internals.js @@ -9,7 +9,6 @@ const CRP = rewire('../../lambdas/create-reconciliation-report'); const linkingFilesToGranules = CRP.__get__('linkingFilesToGranules'); const isOneWayCollectionReport = CRP.__get__('isOneWayCollectionReport'); const isOneWayGranuleReport = CRP.__get__('isOneWayGranuleReport'); -const shouldAggregateGranulesForCollections = CRP.__get__('shouldAggregateGranulesForCollections'); test( 'isOneWayCollectionReport returns true only when one or more specific parameters ' @@ -86,39 +85,6 @@ test( } ); -test( - 'shouldAggregateGranulesForCollections returns true only when one or more specific parameters ' - + ' are present on the reconciliation report object.', - (t) => { - const paramsThatShouldReturnTrue = ['updatedAt__to', 'updatedAt__from']; - const paramsThatShouldReturnFalse = [ - 'stackName', - 'systemBucket', - 'startTimestamp', - 'anythingAtAll', - ]; - - paramsThatShouldReturnTrue.map((p) => - t.true(shouldAggregateGranulesForCollections({ [p]: randomId('value') }))); - - paramsThatShouldReturnFalse.map((p) => - t.false(shouldAggregateGranulesForCollections({ [p]: randomId('value') }))); - - const allTrueKeys = paramsThatShouldReturnTrue.reduce( - (accum, current) => ({ ...accum, [current]: randomId('value') }), - {} - ); - t.true(shouldAggregateGranulesForCollections(allTrueKeys)); - - const allFalseKeys = paramsThatShouldReturnFalse.reduce( - (accum, current) => ({ ...accum, [current]: randomId('value') }), - {} - ); - t.false(shouldAggregateGranulesForCollections(allFalseKeys)); - t.true(shouldAggregateGranulesForCollections({ ...allTrueKeys, ...allFalseKeys })); - } -); - test('linkingFilesToGranules return values', (t) => { const reportTypesToReturnFalse = ['Granule Inventory', 'Internal', 'Inventory']; const reportTypesToReturnTrue = ['Granule Not Found']; diff --git 
a/packages/api/tests/lambdas/test-create-reconciliation-report.js b/packages/api/tests/lambdas/test-create-reconciliation-report.js index eab75462bca..7d4b97729ad 100644 --- a/packages/api/tests/lambdas/test-create-reconciliation-report.js +++ b/packages/api/tests/lambdas/test-create-reconciliation-report.js @@ -8,6 +8,7 @@ const pMap = require('p-map'); const omit = require('lodash/omit'); const range = require('lodash/range'); const sample = require('lodash/sample'); +const compact = require('lodash/compact'); const sinon = require('sinon'); const sortBy = require('lodash/sortBy'); const test = require('ava'); @@ -25,28 +26,27 @@ const { getBucketsConfigKey } = require('@cumulus/common/stack'); const { constructCollectionId } = require('@cumulus/message/Collections'); const { randomString, randomId } = require('@cumulus/common/test-utils'); const { - ProviderPgModel, - fakeProviderRecordFactory, - translateApiFiletoPostgresFile, - generateLocalTestDb, - destroyLocalTestDb, - localStackConnectionEnv, - migrationDir, CollectionPgModel, + destroyLocalTestDb, ExecutionPgModel, - FilePgModel, - GranulePgModel, - ReconciliationReportPgModel, fakeCollectionRecordFactory, fakeExecutionRecordFactory, fakeGranuleRecordFactory, - translatePostgresCollectionToApiCollection, + fakeProviderRecordFactory, + FilePgModel, + generateLocalTestDb, + GranulePgModel, + localStackConnectionEnv, + migrationDir, + ProviderPgModel, + ReconciliationReportPgModel, + translateApiCollectionToPostgresCollection, + translateApiFiletoPostgresFile, translateApiGranuleToPostgresGranule, - translatePostgresReconReportToApiReconReport + translatePostgresReconReportToApiReconReport, } = require('@cumulus/db'); const { getDistributionBucketMapKey } = require('@cumulus/distribution-utils'); -const indexer = require('@cumulus/es-client/indexer'); -const { Search, getEsClient } = require('@cumulus/es-client/search'); +const { Search } = require('@cumulus/es-client/search'); const { bootstrapElasticSearch } = require('@cumulus/es-client/bootstrap'); const { @@ -64,10 +64,14 @@ const handler = (event) => unwrappedHandler(normalizeEvent(event)); let esAlias; let esIndex; -let esClient; const createBucket = (Bucket) => awsServices.s3().createBucket({ Bucket }); -const testDbName = `create_rec_reports_${cryptoRandomString({ length: 10 })}`; +const requiredStaticCollectionFields = { + granuleIdExtraction: randomString(), + granuleId: randomString(), + sampleFileName: randomString(), + files: [], +}; function createDistributionBucketMapFromBuckets(buckets) { let bucketMap = {}; @@ -126,59 +130,114 @@ async function storeFilesToS3(files) { ); } -/** - * Index a single collection to elasticsearch. If the collection object has an - * updatedAt value, use a sinon stub to set the time of the granule to that - * input time. 
- * @param {Object} collection - a collection object -* @returns {Promise} - promise of indexed collection with active granule -*/ -async function storeCollection(collection) { - let stub; - if (collection.updatedAt) { - stub = sinon.stub(Date, 'now').returns(collection.updatedAt); - } - try { - await indexer.indexCollection(esClient, collection, esAlias); - return indexer.indexGranule( - esClient, - fakeGranuleFactoryV2({ - collectionId: constructCollectionId(collection.name, collection.version), - updatedAt: collection.updatedAt, - provider: randomString(), - }), - esAlias - ); - } finally { - if (collection.updatedAt) stub.restore(); - } +async function storeCollectionAndGranuleToPostgres(collection, context) { + const postgresCollection = translateApiCollectionToPostgresCollection({ + ...collection, + ...requiredStaticCollectionFields, + }); + const [pgCollectionRecord] = await context.collectionPgModel.create( + context.knex, + postgresCollection + ); + const [pgProviderRecord] = await context.providerPgModel.create( + context.knex, + fakeProviderRecordFactory(), + ['name', 'cumulus_id'] + ); + const collectionGranule = fakeGranuleRecordFactory({ + updated_at: pgCollectionRecord.updated_at, + created_at: pgCollectionRecord.created_at, + collection_cumulus_id: pgCollectionRecord.cumulus_id, + provider_cumulus_id: pgProviderRecord.cumulus_id, + }); + // TODO - verify edit, this was originally retruning granuleRows, + // and collection was granuleRows[0] :\ + await context.granulePgModel.create(context.knex, collectionGranule); + return { + granule: { + ...collectionGranule, + collectionId: `${collection.name}___${collection.version}`, + }, + collection: { + ...pgCollectionRecord, + providerName: pgProviderRecord.name, + }, + }; } -/** - * Index Dated collections to ES for testing timeranges. These need to happen - * in sequence because of the way we are stubbing Date.now() during indexing. 
- * - * @param {Array} collections - list of collection objects - * @returns {Promise} - Promise of collections indexed - */ -function storeCollectionsToElasticsearch(collections) { - let result = Promise.resolve(); - collections.forEach((collection) => { - result = result.then(() => storeCollection(collection)); - }); - return result; +async function storeCollectionsWithGranuleToPostgres(collections, context) { + const records = await Promise.all( + collections.map((collection) => storeCollectionAndGranuleToPostgres(collection, context)) + ); + return { + collections: records.map((record) => record.collection), + granules: records.map((record) => record.granule), + }; } -/** - * Index granules to ES for testing - * - * @param {Array} granules - list of granules objects - * @returns {Promise} - Promise of indexed granules - */ -async function storeGranulesToElasticsearch(granules) { - await Promise.all( - granules.map((granule) => indexer.indexGranule(esClient, granule, esAlias)) +async function generateRandomGranules(t, { + bucketRange = 2, + collectionRange = 10, + granuleRange = 10, + fileRange = 10, + stubCmr = true, +} = {}) { + const { filePgModel, granulePgModel, knex } = t.context; + + const dataBuckets = range(bucketRange).map(() => randomId('bucket')); + await Promise.all(dataBuckets.map((bucket) => + createBucket(bucket) + .then(() => t.context.bucketsToCleanup.push(bucket)))); + + // Write the buckets config to S3 + await storeBucketsConfigToS3( + dataBuckets, + t.context.systemBucket, + t.context.stackName + ); + + // Create collections that are in sync + const matchingColls = range(collectionRange).map(() => ({ + name: randomId('name'), + version: randomId('vers'), + })); + const { collections: postgresCollections } = + await storeCollectionsWithGranuleToPostgres(matchingColls, t.context); + const collectionCumulusId = postgresCollections[0].cumulus_id; + + // Create random files + const pgGranules = await granulePgModel.insert( + knex, + range(granuleRange).map(() => fakeGranuleRecordFactory({ + collection_cumulus_id: collectionCumulusId, + })), + ['cumulus_id', 'granule_id'] ); + const files = range(fileRange).map((i) => ({ + bucket: dataBuckets[i % dataBuckets.length], + key: randomId('key', 10), + granule_cumulus_id: pgGranules[i].cumulus_id, + })); + + // Store the files to S3 and postgres + await Promise.all([ + storeFilesToS3(files), + filePgModel.insert(knex, files), + ]); + + if (stubCmr) { + const cmrCollections = sortBy(matchingColls, ['name', 'version']) + .map((cmrCollection) => ({ + umm: { ShortName: cmrCollection.name, Version: cmrCollection.version }, + })); + CMR.prototype.searchConcept.restore(); + const cmrSearchStub = sinon.stub(CMR.prototype, 'searchConcept'); + cmrSearchStub.withArgs('collections').onCall(0).resolves(cmrCollections); + cmrSearchStub.withArgs('collections').onCall(1).resolves([]); + cmrSearchStub.withArgs('granules').resolves([]); + } + + return { files, granules: pgGranules, matchingColls, dataBuckets }; } async function fetchCompletedReport(reportRecord) { @@ -192,35 +251,6 @@ async function fetchCompletedReportString(reportRecord) { .then((response) => getObjectStreamContents(response.Body)); } -/** - * Looks up and returns the granulesIds given a list of collectionIds. 
- * @param {Array} collectionIds - list of collectionIds - * @returns {Array} list of matching granuleIds - */ -async function granuleIdsFromCollectionIds(collectionIds) { - const esValues = await (new Search( - { queryStringParameters: { collectionId__in: collectionIds.join(',') } }, - 'granule', - esAlias - )).query(); - return esValues.results.map((value) => value.granuleId); -} - -/** - * Looks up and returns the providers given a list of collectionIds. - * @param {Array} collectionIds - list of collectionIds - * @returns {Array} list of matching providers - */ -async function providersFromCollectionIds(collectionIds) { - const esValues = await (new Search( - { queryStringParameters: { collectionId__in: collectionIds.join(',') } }, - 'granule', - esAlias - )).query(); - - return esValues.results.map((value) => value.provider); -} - const randomBetween = (a, b) => Math.floor(Math.random() * (b - a + 1) + a); const randomTimeBetween = (t1, t2) => randomBetween(t1, t2); @@ -229,8 +259,11 @@ const randomTimeBetween = (t1, t2) => randomBetween(t1, t2); * random collections where some fall within the start and end timestamps. * Also creates a number that are only in ES, as well as some that are only * "returned by CMR" (as a stubbed function) - * @param {Object} t - AVA test context. - * @returns {Object} setupVars - Object with information about the current + * + * @param t.t + * @param {object} t - AVA test context. + * @param t.params + * @returns {object} setupVars - Object with information about the current * state of elasticsearch and CMR mock. * The object returned has: * + startTimestamp - beginning of matching timerange @@ -245,7 +278,7 @@ const randomTimeBetween = (t1, t2) => randomBetween(t1, t2); * excluded from CMR mock. (only in ES out of range) * + extraCmrCollections - collections not in ES but returned by the CMR mock. */ -const setupElasticAndCMRForTests = async ({ t, params = {} }) => { +const setupDatabaseAndCMRForTests = async ({ t, params = {} }) => { const dataBuckets = range(2).map(() => randomId('bucket')); await Promise.all( dataBuckets.map((bucket) => @@ -275,30 +308,35 @@ const setupElasticAndCMRForTests = async ({ t, params = {} }) => { // Create collections that are in sync ES/CMR during the time period const matchingCollections = range(numMatchingCollections).map((r) => ({ + ...requiredStaticCollectionFields, name: randomId(`name${r}-`), version: randomId('vers'), updatedAt: randomTimeBetween(startTimestamp, endTimestamp), })); // Create collections in sync ES/CMR outside of the timestamps range const matchingCollectionsOutsideRange = range(numMatchingCollectionsOutOfRange).map((r) => ({ + ...requiredStaticCollectionFields, name: randomId(`name${r}-`), version: randomId('vers'), updatedAt: randomTimeBetween(monthEarlier, startTimestamp - 1), })); // Create collections in ES only within the timestamp range const extraESCollections = range(numExtraESCollections).map((r) => ({ + ...requiredStaticCollectionFields, name: randomId(`extraES${r}-`), version: randomId('vers'), updatedAt: randomTimeBetween(startTimestamp, endTimestamp), })); // Create collections in ES only outside of the timestamp range const extraESCollectionsOutOfRange = range(numExtraESCollectionsOutOfRange).map((r) => ({ + ...requiredStaticCollectionFields, name: randomId(`extraES${r}-`), version: randomId('vers'), updatedAt: randomTimeBetween(endTimestamp + 1, monthLater), })); // create extra cmr collections that fall inside of the range. 
const extraCmrCollections = range(numExtraCmrCollections).map((r) => ({ + ...requiredStaticCollectionFields, name: randomId(`extraCmr${r}-`), version: randomId('vers'), updatedAt: randomTimeBetween(startTimestamp, endTimestamp), @@ -320,13 +358,21 @@ const setupElasticAndCMRForTests = async ({ t, params = {} }) => { cmrSearchStub.withArgs('collections').onCall(1).resolves([]); cmrSearchStub.withArgs('granules').resolves([]); - await storeCollectionsToElasticsearch( - matchingCollections - .concat(matchingCollectionsOutsideRange) - .concat(extraESCollections) - .concat(extraESCollectionsOutOfRange) - ); + const { collections: createdCollections, granules: collectionGranules } = + await storeCollectionsWithGranuleToPostgres( + matchingCollections + .concat(matchingCollectionsOutsideRange) + .concat(extraESCollections) + .concat(extraESCollectionsOutOfRange), + t.context + ); + const mappedProviders = {}; + createdCollections.forEach((collection) => { + mappedProviders[ + constructCollectionId(collection.name, collection.version) + ] = collection.providerName; + }); return { startTimestamp, endTimestamp, @@ -335,33 +381,36 @@ const setupElasticAndCMRForTests = async ({ t, params = {} }) => { extraESCollections, extraESCollectionsOutOfRange, extraCmrCollections, + collectionGranules, + mappedProviders, }; }; -test.before(async (t) => { - process.env = { - ...process.env, - ...localStackConnectionEnv, - PG_DATABASE: testDbName, - }; +test.before(async () => { process.env.cmr_password_secret_name = randomId('cmr-secret-name'); + process.env.DISTRIBUTION_ENDPOINT = 'TEST_ENDPOINT'; await awsServices.secretsManager().createSecret({ Name: process.env.cmr_password_secret_name, SecretString: randomId('cmr-password'), }); - const { knex, knexAdmin } = await generateLocalTestDb(testDbName, migrationDir); +}); + +test.beforeEach(async (t) => { + t.context.testDbName = `create_rec_reports_${cryptoRandomString({ length: 10 })}`; + process.env = { + ...process.env, + ...localStackConnectionEnv, + PG_DATABASE: t.context.testDbName, + }; + const { knex, knexAdmin } = await generateLocalTestDb(t.context.testDbName, migrationDir); t.context.knex = knex; t.context.knexAdmin = knexAdmin; - t.context.providerPgModel = new ProviderPgModel(); t.context.collectionPgModel = new CollectionPgModel(); t.context.executionPgModel = new ExecutionPgModel(); t.context.filePgModel = new FilePgModel(); t.context.granulePgModel = new GranulePgModel(); t.context.reconciliationReportPgModel = new ReconciliationReportPgModel(); -}); - -test.beforeEach(async (t) => { t.context.bucketsToCleanup = []; t.context.stackName = randomId('stack'); t.context.systemBucket = randomId('bucket'); @@ -382,13 +431,22 @@ test.beforeEach(async (t) => { index: esIndex, alias: esAlias, }); - esClient = await getEsClient(); t.context.esReportClient = new Search( {}, 'reconciliationReport', process.env.ES_INDEX ); + // write 4 providers to the database + t.context.providers = await Promise.all(new Array(4).fill().map(async () => { + const [pgProvider] = await t.context.providerPgModel.create( + t.context.knex, + fakeProviderRecordFactory(), + ['cumulus_id', 'name'] + ); + return pgProvider; + })); + t.context.execution = fakeExecutionRecordFactory(); const [pgExecution] = await t.context.executionPgModel.create( t.context.knex, @@ -399,26 +457,27 @@ test.beforeEach(async (t) => { }); test.afterEach.always(async (t) => { - await Promise.all(flatten(t.context.bucketsToCleanup.map(recursivelyDeleteS3Bucket))); + await Promise.all( + 
flatten(t.context.bucketsToCleanup.map(recursivelyDeleteS3Bucket)) + ); await t.context.executionPgModel.delete( t.context.knex, { cumulus_id: t.context.executionCumulusId } ); CMR.prototype.searchConcept.restore(); - await esClient.client.indices.delete({ index: esIndex }); + await destroyLocalTestDb({ + knex: t.context.knex, + knexAdmin: t.context.knexAdmin, + testDbName: t.context.testDbName, + }); }); -test.after.always(async (t) => { +test.after.always(async () => { await awsServices.secretsManager().deleteSecret({ SecretId: process.env.cmr_password_secret_name, ForceDeleteWithoutRecovery: true, }); delete process.env.cmr_password_secret_name; - await destroyLocalTestDb({ - knex: t.context.knex, - knexAdmin: t.context.knexAdmin, - testDbName, - }); }); test.serial('Generates valid reconciliation report for no buckets', async (t) => { @@ -462,68 +521,9 @@ test.serial('Generates valid reconciliation report for no buckets', async (t) => t.like(esRecord, reportRecord); }); +// TODO - use this to make generic the data to PG test.serial('Generates valid GNF reconciliation report when everything is in sync', async (t) => { - const { filePgModel, granulePgModel, knex } = t.context; - - const dataBuckets = range(2).map(() => randomId('bucket')); - await Promise.all(dataBuckets.map((bucket) => - createBucket(bucket) - .then(() => t.context.bucketsToCleanup.push(bucket)))); - - // Write the buckets config to S3 - await storeBucketsConfigToS3( - dataBuckets, - t.context.systemBucket, - t.context.stackName - ); - - // Create collections that are in sync - const matchingColls = range(10).map(() => ({ - name: randomId('name'), - version: randomId('vers'), - })); - await storeCollectionsToElasticsearch(matchingColls); - - const collection = fakeCollectionRecordFactory({ - name: matchingColls[0].name, - version: matchingColls[0].version, - }); - const [pgCollection] = await t.context.collectionPgModel.create( - t.context.knex, - collection - ); - const collectionCumulusId = pgCollection.cumulus_id; - - // Create random files - const pgGranules = await granulePgModel.insert( - knex, - range(10).map(() => fakeGranuleRecordFactory({ - collection_cumulus_id: collectionCumulusId, - })) - ); - const files = range(10).map((i) => ({ - bucket: dataBuckets[i % dataBuckets.length], - key: randomId('key'), - granule_cumulus_id: pgGranules[i].cumulus_id, - })); - - // Store the files to S3 and DynamoDB - await Promise.all([ - storeFilesToS3(files), - filePgModel.insert(knex, files), - ]); - - const cmrCollections = sortBy(matchingColls, ['name', 'version']) - .map((cmrCollection) => ({ - umm: { ShortName: cmrCollection.name, Version: cmrCollection.version }, - })); - - CMR.prototype.searchConcept.restore(); - const cmrSearchStub = sinon.stub(CMR.prototype, 'searchConcept'); - cmrSearchStub.withArgs('collections').onCall(0).resolves(cmrCollections); - cmrSearchStub.withArgs('collections').onCall(1).resolves([]); - cmrSearchStub.withArgs('granules').resolves([]); - + const { files, matchingColls } = await generateRandomGranules(t); const event = { systemBucket: t.context.systemBucket, stackName: t.context.stackName, @@ -562,68 +562,7 @@ test.serial('Generates valid GNF reconciliation report when everything is in syn }); test.serial('Generates a valid Inventory reconciliation report when everything is in sync', async (t) => { - const { filePgModel, granulePgModel, knex } = t.context; - - const dataBuckets = range(2).map(() => randomId('bucket')); - await Promise.all(dataBuckets.map((bucket) => - createBucket(bucket) 
- .then(() => t.context.bucketsToCleanup.push(bucket)))); - - // Write the buckets config to S3 - await storeBucketsConfigToS3( - dataBuckets, - t.context.systemBucket, - t.context.stackName - ); - - // Create collections that are in sync - const matchingColls = range(10).map(() => ({ - name: randomId('name'), - version: randomId('vers'), - })); - await storeCollectionsToElasticsearch(matchingColls); - - const collection = fakeCollectionRecordFactory({ - name: matchingColls[0].name, - version: matchingColls[0].version, - }); - const [pgCollection] = await t.context.collectionPgModel.create( - t.context.knex, - collection - ); - const collectionCumulusId = pgCollection.cumulus_id; - - // Create random files - const pgGranules = await granulePgModel.insert( - knex, - range(10).map(() => fakeGranuleRecordFactory({ - collection_cumulus_id: collectionCumulusId, - })) - ); - const files = range(10).map((i) => ({ - bucket: dataBuckets[i % dataBuckets.length], - key: randomId('key'), - granule_cumulus_id: pgGranules[i].cumulus_id, - })); - - // Store the files to S3 and DynamoDB - await Promise.all([ - storeFilesToS3(files), - filePgModel.insert(knex, files), - ]); - - const cmrCollections = sortBy(matchingColls, ['name', 'version']) - .map((cmrCollection) => ({ - umm: { ShortName: cmrCollection.name, Version: cmrCollection.version }, - })); - - CMR.prototype.searchConcept.restore(); - const cmrSearchStub = sinon.stub(CMR.prototype, 'searchConcept'); - cmrSearchStub.withArgs('collections').onCall(0).resolves(cmrCollections); - cmrSearchStub.withArgs('collections').onCall(1).resolves([]); - cmrSearchStub.withArgs('granules').resolves([]); - - await storeCollectionsToElasticsearch(matchingColls); + const { files, matchingColls } = await generateRandomGranules(t); const event = { systemBucket: t.context.systemBucket, @@ -655,46 +594,15 @@ test.serial('Generates a valid Inventory reconciliation report when everything i }); test.serial('Generates valid reconciliation report when there are extra internal S3 objects', async (t) => { - const { filePgModel, granulePgModel, knex } = t.context; - - const collection = fakeCollectionRecordFactory(); - const [pgCollection] = await t.context.collectionPgModel.create( - t.context.knex, - collection - ); - const collectionCumulusId = pgCollection.cumulus_id; - - const dataBuckets = range(2).map(() => randomId('bucket')); - await Promise.all(dataBuckets.map((bucket) => - createBucket(bucket) - .then(() => t.context.bucketsToCleanup.push(bucket)))); - - // Write the buckets config to S3 - await storeBucketsConfigToS3( - dataBuckets, - t.context.systemBucket, - t.context.stackName - ); - - // Create files that are in sync - const pgGranules = await granulePgModel.insert( - knex, - range(10).map(() => fakeGranuleRecordFactory({ - collection_cumulus_id: collectionCumulusId, - })) - ); - const matchingFiles = range(10).map((i) => ({ - bucket: sample(dataBuckets), - key: randomId('key'), - granule_cumulus_id: pgGranules[i].cumulus_id, - })); + const { dataBuckets, files } = await generateRandomGranules(t, { + collectionRange: 1, + stubCmr: false, + }); const extraS3File1 = { bucket: sample(dataBuckets), key: randomId('key') }; const extraS3File2 = { bucket: sample(dataBuckets), key: randomId('key') }; - // Store the files to S3 and Elasticsearch - await storeFilesToS3(matchingFiles.concat([extraS3File1, extraS3File2])); - await filePgModel.insert(knex, matchingFiles); + await storeFilesToS3(files.concat([extraS3File1, extraS3File2])); const event = { systemBucket: 
t.context.systemBucket, @@ -709,7 +617,7 @@ test.serial('Generates valid reconciliation report when there are extra internal const filesInCumulus = report.filesInCumulus; t.is(report.status, 'SUCCESS'); t.is(report.error, undefined); - t.is(filesInCumulus.okCount, matchingFiles.length); + t.is(filesInCumulus.okCount, files.length); const granuleIds = Object.keys(filesInCumulus.okCountByGranule); granuleIds.forEach((granuleId) => { @@ -728,61 +636,25 @@ test.serial('Generates valid reconciliation report when there are extra internal t.true(createStartTime <= createEndTime); }); -test.serial('Generates valid reconciliation report when there are extra internal DynamoDB objects', async (t) => { - const { filePgModel, granulePgModel, knex } = t.context; - - const dataBuckets = range(2).map(() => randomString()); - await Promise.all(dataBuckets.map((bucket) => - createBucket(bucket) - .then(() => t.context.bucketsToCleanup.push(bucket)))); - - // Write the buckets config to S3 - await storeBucketsConfigToS3( - dataBuckets, - t.context.systemBucket, - t.context.stackName - ); - - const collection = fakeCollectionRecordFactory(); - const [pgCollection] = await t.context.collectionPgModel.create( - t.context.knex, - collection - ); - const collectionCumulusId = pgCollection.cumulus_id; - - // Create files that are in sync - const granules = range(12).map(() => fakeGranuleRecordFactory({ - collection_cumulus_id: collectionCumulusId, - })); - const pgGranules = await granulePgModel.insert( - knex, - granules - ); - const matchingFiles = range(10).map((i) => ({ - bucket: sample(dataBuckets), - key: randomId('key'), - granule_cumulus_id: pgGranules[i].cumulus_id, - })); +test.serial('Generates valid reconciliation report when there are extra internal Postgres objects', async (t) => { + const { granules, files, dataBuckets } = await generateRandomGranules(t, { + collectionRange: 1, + granuleRange: 12, + }); + const [extraFileGranule1, extraFileGranule2] = granules.slice(10, 12); const extraDbFile1 = { bucket: sample(dataBuckets), key: randomString(), - granule_cumulus_id: pgGranules[10].cumulus_id, - granule_id: granules[10].granule_id, + granule_cumulus_id: extraFileGranule1.cumulus_id, }; const extraDbFile2 = { bucket: sample(dataBuckets), key: randomString(), - granule_cumulus_id: pgGranules[11].cumulus_id, - granule_id: granules[11].granule_id, + granule_cumulus_id: extraFileGranule2.cumulus_id, }; - // Store the files to S3 and DynamoDB - await storeFilesToS3(matchingFiles); - await filePgModel.insert(knex, matchingFiles.concat([ - omit(extraDbFile1, 'granule_id'), - omit(extraDbFile2, 'granule_id'), - ])); + await t.context.filePgModel.insert(t.context.knex, [extraDbFile1, extraDbFile2]); const event = { systemBucket: t.context.systemBucket, @@ -797,7 +669,7 @@ test.serial('Generates valid reconciliation report when there are extra internal const filesInCumulus = report.filesInCumulus; t.is(report.status, 'SUCCESS'); t.is(report.error, undefined); - t.is(filesInCumulus.okCount, matchingFiles.length); + t.is(filesInCumulus.okCount, files.length); t.is(filesInCumulus.onlyInS3.length, 0); const totalOkCount = Object.values(filesInCumulus.okCountByGranule).reduce( @@ -808,17 +680,17 @@ test.serial('Generates valid reconciliation report when there are extra internal t.is(filesInCumulus.onlyInDb.length, 2); t.truthy(filesInCumulus.onlyInDb.find((f) => f.uri === buildS3Uri(extraDbFile1.bucket, extraDbFile1.key) - && f.granuleId === extraDbFile1.granule_id)); + && f.granuleId === 
extraFileGranule1.granule_id));
   t.truthy(filesInCumulus.onlyInDb.find((f) =>
     f.uri === buildS3Uri(extraDbFile2.bucket, extraDbFile2.key)
-    && f.granuleId === extraDbFile2.granule_id));
+    && f.granuleId === extraFileGranule2.granule_id));
 
   const createStartTime = moment(report.createStartTime);
   const createEndTime = moment(report.createEndTime);
   t.true(createStartTime <= createEndTime);
 });
 
-test.serial('Generates valid reconciliation report when internally, there are both extra DynamoDB and extra S3 files', async (t) => {
+test.serial('Generates valid reconciliation report when internally, there are both extra Postgres and extra S3 files', async (t) => {
   const { filePgModel, granulePgModel, knex } = t.context;
 
   const collection = fakeCollectionRecordFactory();
@@ -870,7 +742,7 @@ test.serial('Generates valid reconciliation report when internally, there are bo
     granule_id: granules[11].granule_id,
   };
 
-  // Store the files to S3 and DynamoDB
+  // Store the files to S3 and Postgres
   await storeFilesToS3(matchingFiles.concat([extraS3File1, extraS3File2]));
   await filePgModel.insert(knex, matchingFiles.concat([
     omit(extraDbFile1, 'granule_id'),
@@ -914,13 +786,13 @@ test.serial('Generates valid reconciliation report when internally, there are bo
   t.true(createStartTime <= createEndTime);
 });
 
-test.serial('Generates valid reconciliation report when there are both extra ES and CMR collections', async (t) => {
+test.serial('Generates valid reconciliation report when there are both extra Postgres and CMR collections', async (t) => {
   const params = {
     numMatchingCollectionsOutOfRange: 0,
     numExtraESCollectionsOutOfRange: 0,
   };
 
-  const setupVars = await setupElasticAndCMRForTests({ t, params });
+  const setupVars = await setupDatabaseAndCMRForTests({ t, params });
 
   const event = {
     systemBucket: t.context.systemBucket,
@@ -953,9 +825,9 @@ test.serial('Generates valid reconciliation report when there are both extra ES
 });
 
 test.serial(
-  'With input time params, generates a valid filtered reconciliation report, when there are extra cumulus/ES and CMR collections',
+  'With input time params, generates a valid filtered reconciliation report, when there are extra cumulus database and CMR collections',
   async (t) => {
-    const { startTimestamp, endTimestamp, ...setupVars } = await setupElasticAndCMRForTests({ t });
+    const { startTimestamp, endTimestamp, ...setupVars } = await setupDatabaseAndCMRForTests({ t });
 
     const event = {
       systemBucket: t.context.systemBucket,
@@ -1001,7 +873,7 @@ test.serial(
 );
 
 test.serial(
-  'With location param as S3, generates a valid reconciliation report for only S3 and DynamoDB',
+  'With location param as S3, generates a valid reconciliation report for only S3 and Postgres',
   async (t) => {
     const { filePgModel, granulePgModel, knex } = t.context;
 
@@ -1082,7 +954,7 @@ test.serial(
       numExtraESCollectionsOutOfRange: 0,
     };
 
-    const setupVars = await setupElasticAndCMRForTests({ t, params });
+    const setupVars = await setupDatabaseAndCMRForTests({ t, params });
 
     const event = {
       systemBucket: t.context.systemBucket,
@@ -1113,9 +985,9 @@ test.serial(
 );
 
 test.serial(
-  'Generates valid reconciliation report without time params and there are extra cumulus/ES and CMR collections',
+  'Generates valid reconciliation report without time params and there are extra cumulus DB and CMR collections',
   async (t) => {
-    const setupVars = await setupElasticAndCMRForTests({ t });
+    const setupVars = await setupDatabaseAndCMRForTests({ t });
 
     const eventNoTimeStamps = {
       systemBucket: t.context.systemBucket,
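
Note: with the stricter normalizeEvent validation introduced later in this patch, every report event, like the ones these tests build, must carry both systemBucket and stackName. A minimal hedged sketch (values taken from the AVA test context):

    const event = {
      reportType: 'Inventory',
      systemBucket: t.context.systemBucket,
      stackName: t.context.stackName,
    };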
@@ -1136,7 +1008,7 @@ test.serial(
       setupVars.matchingCollections.length + setupVars.matchingCollectionsOutsideRange.length
     );
-    // all extra ES collections are found
+    // all extra DB collections are found
     t.is(
       collectionsInCumulusCmr.onlyInCumulus.length,
       setupVars.extraESCollections.length + setupVars.extraESCollectionsOutOfRange.length
@@ -1160,9 +1032,9 @@ test.serial(
 );
 
 test.serial(
-  'Generates valid ONE WAY reconciliation report with time params and filters by collectionIds when there are extra cumulus/ES and CMR collections',
+  'Generates valid ONE WAY reconciliation report with time params and filters by collectionIds when there are extra cumulus DB and CMR collections',
   async (t) => {
-    const { startTimestamp, endTimestamp, ...setupVars } = await setupElasticAndCMRForTests({ t });
+    const { startTimestamp, endTimestamp, ...setupVars } = await setupDatabaseAndCMRForTests({ t });
 
     const testCollection = [
       setupVars.matchingCollections[3],
@@ -1215,7 +1087,7 @@ test.serial(
 test.serial(
   'When a collectionId is in both CMR and Cumulus a valid bi-directional reconciliation report is created.',
   async (t) => {
-    const setupVars = await setupElasticAndCMRForTests({ t });
+    const setupVars = await setupDatabaseAndCMRForTests({ t });
 
     const testCollection = setupVars.matchingCollections[3];
     console.log(`testCollection: ${JSON.stringify(testCollection)}`);
@@ -1245,7 +1117,7 @@ test.serial(
 test.serial(
   'When an array of collectionId exists only in CMR, creates a valid bi-directional reconciliation report.',
   async (t) => {
-    const setupVars = await setupElasticAndCMRForTests({ t });
+    const setupVars = await setupDatabaseAndCMRForTests({ t });
 
     const testCollection = [
       setupVars.extraCmrCollections[3],
@@ -1283,7 +1155,7 @@ test.serial(
 test.serial(
   'When a filtered collectionId exists only in Cumulus, generates a valid bi-directional reconciliation report.',
   async (t) => {
-    const setupVars = await setupElasticAndCMRForTests({ t });
+    const setupVars = await setupDatabaseAndCMRForTests({ t });
 
     const testCollection = setupVars.extraESCollections[3];
     console.log(`testCollection: ${JSON.stringify(testCollection)}`);
@@ -1319,7 +1191,7 @@ test.serial(
 test.serial(
   'Generates valid ONE WAY reconciliation report with time params and filters by granuleIds when there are extra cumulus/ES and CMR collections',
   async (t) => {
-    const { startTimestamp, endTimestamp, ...setupVars } = await setupElasticAndCMRForTests({ t });
+    const { startTimestamp, endTimestamp, ...setupVars } = await setupDatabaseAndCMRForTests({ t });
 
     const testCollection = [
       setupVars.matchingCollections[3],
@@ -1329,7 +1201,11 @@ test.serial(
     ];
     const testCollectionIds = testCollection.map((c) => constructCollectionId(c.name, c.version));
 
-    const testGranuleIds = await granuleIdsFromCollectionIds(testCollectionIds);
+
+    // set testGranuleIds to all setupVars.collectionGranules that are in testCollectionIds
+    const testGranuleIds = setupVars.collectionGranules
+      .filter((g) => testCollectionIds.includes(g.collectionId))
+      .map((g) => g.granule_id);
 
     console.log(`granuleIds: ${JSON.stringify(testGranuleIds)}`);
 
@@ -1348,14 +1224,12 @@ test.serial(
     const collectionsInCumulusCmr = report.collectionsInCumulusCmr;
     t.is(report.status, 'SUCCESS');
     t.is(report.error, undefined);
-    t.is(collectionsInCumulusCmr.okCount, 1);
 
     // cumulus filters collections by granuleId and only returned test one
     t.is(collectionsInCumulusCmr.onlyInCumulus.length, 1);
     t.true(collectionsInCumulusCmr.onlyInCumulus.includes(testCollectionIds[2]));
 
-    // ONE WAY only comparison because of input timestampes
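    // (one-way report: when granuleId filters are supplied, the CMR-only side
    // is not evaluated, which is why onlyInCmr stays empty below)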
     t.is(collectionsInCumulusCmr.onlyInCmr.length, 0);
 
     const reportStartTime = report.reportStartTime;
@@ -1374,7 +1248,7 @@ test.serial(
 test.serial(
   'When an array of granuleId exists, creates a valid one-way reconciliation report.',
   async (t) => {
-    const setupVars = await setupElasticAndCMRForTests({ t });
+    const setupVars = await setupDatabaseAndCMRForTests({ t });
 
     const testCollection = [
       setupVars.extraCmrCollections[3],
@@ -1383,7 +1257,9 @@ test.serial(
     ];
     const testCollectionIds = testCollection.map((c) => constructCollectionId(c.name, c.version));
 
-    const testGranuleIds = await granuleIdsFromCollectionIds(testCollectionIds);
+    const testGranuleIds = setupVars.collectionGranules
+      .filter((g) => testCollectionIds.includes(g.collectionId))
+      .map((g) => g.granule_id);
 
     console.log(`testGranuleIds: ${JSON.stringify(testGranuleIds)}`);
 
@@ -1397,11 +1273,11 @@ test.serial(
     t.is(reportRecord.status, 'Generated');
 
     const report = await fetchCompletedReport(reportRecord);
-    const collectionsInCumulusCmr = report.collectionsInCumulusCmr;
     t.is(report.status, 'SUCCESS');
     t.is(report.error, undefined);
 
     // Filtered by input granuleIds
+    const collectionsInCumulusCmr = report.collectionsInCumulusCmr;
     t.is(collectionsInCumulusCmr.okCount, 1);
     t.is(collectionsInCumulusCmr.onlyInCumulus.length, 1);
     t.true(collectionsInCumulusCmr.onlyInCumulus.includes(testCollectionIds[2]));
@@ -1416,7 +1292,8 @@ test.serial(
 test.serial(
   'When an array of providers exists, creates a valid one-way reconciliation report.',
   async (t) => {
-    const setupVars = await setupElasticAndCMRForTests({ t });
+    const setupVars = await setupDatabaseAndCMRForTests({ t });
+    // TODO: collections work! Failures should be granules now.
 
     const testCollection = [
       setupVars.extraCmrCollections[3],
@@ -1425,7 +1302,9 @@ test.serial(
     ];
     const testCollectionIds = testCollection.map((c) => constructCollectionId(c.name, c.version));
 
-    const testProviders = await providersFromCollectionIds(testCollectionIds);
+    const testProviders = compact(testCollection.map(
+      (c) => setupVars.mappedProviders[constructCollectionId(c.name, c.version)]
+    ));
 
     const event = {
       systemBucket: t.context.systemBucket,
@@ -1447,7 +1326,6 @@ test.serial(
     t.is(collectionsInCumulusCmr.okCount, 1);
     t.is(collectionsInCumulusCmr.onlyInCumulus.length, 1);
     t.true(collectionsInCumulusCmr.onlyInCumulus.includes(testCollectionIds[2]));
-
     t.is(granulesInCumulusCmr.okCount, 0);
     t.is(granulesInCumulusCmr.onlyInCumulus.length, 1);
 
@@ -1460,11 +1338,23 @@ test.serial(
   }
 );
 
-test.serial('reconciliationReportForGranules reports discrepancy of granule holdings in CUMULUS and CMR', async (t) => {
+// TODO - this test feels *wholly* inadequate; are we relying on spec tests?
+// TODO - add tests for *multiple* collections, etc. Spec tests?
+test.serial('reconciliationReportForGranules reports discrepancy of granule holdings in CUMULUS and CMR for a single collection', async (t) => {
+  // TODO - common methods?
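  // One shape a shared helper for the TODO above could take (a sketch only,
  // reusing fakeCollectionRecordFactory and the models from test.beforeEach):
  //
  //   const createPgCollection = (t, name, version) =>
  //     t.context.collectionPgModel.create(
  //       t.context.knex,
  //       fakeCollectionRecordFactory({ name, version })
  //     );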
const shortName = randomString(); const version = randomString(); const collectionId = constructCollectionId(shortName, version); + const postgresCollectionRecord = fakeCollectionRecordFactory({ + name: shortName, + version, + }); + await t.context.collectionPgModel.create( + t.context.knex, + postgresCollectionRecord + ); + // create granules that are in sync const matchingGrans = range(10).map(() => fakeGranuleFactoryV2({ collectionId: collectionId, status: 'completed', files: [] })); @@ -1490,13 +1380,24 @@ test.serial('reconciliationReportForGranules reports discrepancy of granule hold cmrSearchStub.withArgs('granules').onCall(0).resolves(cmrGranules); cmrSearchStub.withArgs('granules').onCall(1).resolves([]); - await storeGranulesToElasticsearch(matchingGrans.concat(extraDbGrans)); + await Promise.all( + matchingGrans + .concat(extraDbGrans) + .map(async (granule) => { + const pgGranule = await translateApiGranuleToPostgresGranule({ + dynamoRecord: granule, + knexOrTransaction: t.context.knex, + }); + return await t.context.granulePgModel.create(t.context.knex, pgGranule); + }) + ); const { granulesReport, filesReport } = await reconciliationReportForGranules({ collectionId, bucketsConfig: new BucketsConfig({}), distributionBucketMap: {}, recReportParams: {}, + knex: t.context.knex, }); t.is(granulesReport.okCount, 10); @@ -1897,12 +1798,6 @@ test.serial('Creates a valid Granule Inventory report', async (t) => { collection ); const collectionCumulusId = pgCollection.cumulus_id; - await indexer.indexCollection( - esClient, - translatePostgresCollectionToApiCollection(pgCollection), - esAlias - ); - const matchingGrans = range(10).map(() => fakeGranuleRecordFactory({ collection_cumulus_id: collectionCumulusId, })); @@ -2075,7 +1970,7 @@ test.serial('Inventory reconciliation report JSON is formatted', async (t) => { cmrSearchStub.withArgs('collections').onCall(1).resolves([]); cmrSearchStub.withArgs('granules').resolves([]); - await storeCollectionsToElasticsearch(matchingColls); + await storeCollectionsWithGranuleToPostgres(matchingColls, t.context); const eventFormatted = { systemBucket: t.context.systemBucket, diff --git a/packages/api/tests/lambdas/test-granule-inventory-report.js b/packages/api/tests/lambdas/test-granule-inventory-report.js index 074bf81f33c..6830310c23b 100644 --- a/packages/api/tests/lambdas/test-granule-inventory-report.js +++ b/packages/api/tests/lambdas/test-granule-inventory-report.js @@ -87,7 +87,7 @@ test.serial('Writes a file containing all granules to S3.', async (t) => { const reportKey = `${t.context.stackName}/reconciliation-reports/${reportRecordName}.csv`; const systemBucket = t.context.systemBucket; const reportParams = { - ...normalizeEvent({ reportType: 'Granule Inventory' }), + ...normalizeEvent({ reportType: 'Granule Inventory', stackName: 'TestStack' }), reportKey, systemBucket, knex: t.context.knex, @@ -165,6 +165,7 @@ test.serial('Writes a file containing a filtered set of granules to S3.', async collectionId, status, granuleId: 'test', + stackName: 'testStack', }), reportKey, systemBucket, diff --git a/packages/api/tests/lambdas/test-internal-reconciliation-report.js b/packages/api/tests/lambdas/test-internal-reconciliation-report.js index c9496fd4596..00d90ba8d66 100644 --- a/packages/api/tests/lambdas/test-internal-reconciliation-report.js +++ b/packages/api/tests/lambdas/test-internal-reconciliation-report.js @@ -142,7 +142,7 @@ test.serial('internalRecReportForCollections reports discrepancy of collection h startTimestamp: 
moment.utc().subtract(1, 'hour').format(), endTimestamp: moment.utc().add(1, 'hour').format(), }; - report = await internalRecReportForCollections(normalizeEvent(searchParams)); + report = await internalRecReportForCollections(normalizeEvent({ ...searchParams, stackName: 'testStack' })); t.is(report.okCount, 10); t.is(report.onlyInEs.length, 2); t.is(report.onlyInDb.length, 2); @@ -154,7 +154,7 @@ test.serial('internalRecReportForCollections reports discrepancy of collection h endTimestamp: moment.utc().add(2, 'hour').format(), }; - report = await internalRecReportForCollections(normalizeEvent(paramsTimeOutOfRange)); + report = await internalRecReportForCollections(normalizeEvent({ ...paramsTimeOutOfRange, stackName: 'testStack' })); t.is(report.okCount, 0); t.is(report.onlyInEs.length, 0); t.is(report.onlyInDb.length, 0); @@ -164,7 +164,7 @@ test.serial('internalRecReportForCollections reports discrepancy of collection h const collectionId = constructCollectionId(conflictCollInDb.name, conflictCollInDb.version); const paramsCollectionId = { ...searchParams, collectionId: [collectionId, randomId('c')] }; - report = await internalRecReportForCollections(normalizeEvent(paramsCollectionId)); + report = await internalRecReportForCollections(normalizeEvent({ ...paramsCollectionId, stackName: 'testStack' })); t.is(report.okCount, 0); t.is(report.onlyInEs.length, 0); t.is(report.onlyInDb.length, 0); @@ -269,7 +269,7 @@ test.serial('internalRecReportForGranules reports discrepancy of granule holding endTimestamp: moment.utc().add(1, 'hour').format(), }; report = await internalRecReportForGranules({ - ...normalizeEvent(searchParams), + ...normalizeEvent({ ...searchParams, stackName: 'testStack' }), knex, }); t.is(report.okCount, 20); @@ -284,7 +284,7 @@ test.serial('internalRecReportForGranules reports discrepancy of granule holding }; report = await internalRecReportForGranules({ - ...normalizeEvent(outOfRangeParams), + ...normalizeEvent({ ...outOfRangeParams, stackName: 'testStack' }), knex, }); t.is(report.okCount, 0); @@ -295,7 +295,7 @@ test.serial('internalRecReportForGranules reports discrepancy of granule holding // collectionId, provider parameters const collectionProviderParams = { ...searchParams, collectionId, provider: provider.name }; report = await internalRecReportForGranules({ - ...normalizeEvent(collectionProviderParams), + ...normalizeEvent({ ...collectionProviderParams, stackName: 'testStack' }), knex, }); t.is(report.okCount, 10); @@ -310,7 +310,7 @@ test.serial('internalRecReportForGranules reports discrepancy of granule holding // provider parameter const providerParams = { ...searchParams, provider: [randomId('p'), provider.name] }; report = await internalRecReportForGranules({ - ...normalizeEvent(providerParams), + ...normalizeEvent({ ...providerParams, stackName: 'testStack' }), knex, }); t.is(report.okCount, 20); @@ -330,7 +330,7 @@ test.serial('internalRecReportForGranules reports discrepancy of granule holding collectionId: [collectionId, extraEsGrans[0].collectionId, extraEsGrans[1].collectionId], }; report = await internalRecReportForGranules({ - ...normalizeEvent(granuleIdParams), + ...normalizeEvent({ ...granuleIdParams, stackName: 'testStack' }), knex, }); t.is(report.okCount, 0); diff --git a/packages/api/tests/lib/reconciliationReport/test-normalizeEvent.js b/packages/api/tests/lib/reconciliationReport/test-normalizeEvent.js index c71989788f9..308b2d8dd94 100644 --- a/packages/api/tests/lib/reconciliationReport/test-normalizeEvent.js +++ 
b/packages/api/tests/lib/reconciliationReport/test-normalizeEvent.js
@@ -209,7 +209,7 @@ test('normalizeEvent throws error if provider and granuleId are passed to non-In
 test('Invalid report type throws InvalidArgument error', (t) => {
   const reportType = randomId('badType');
-  const inputEvent = { reportType };
+  const inputEvent = { reportType, systemBucket: 'systemBucket', stackName: 'stackName' };
 
   t.throws(() => normalizeEvent(inputEvent), {
     instanceOf: InvalidArgument,
@@ -220,6 +220,32 @@ test('Invalid report type throws InvalidArgument error', (t) => {
 test('valid Reports types from reconciliation schema do not throw an error.', (t) => {
   const validReportTypes = reconciliationReport.properties.type.enum;
   validReportTypes.forEach((reportType) => {
-    t.notThrows(() => normalizeEvent({ reportType }));
+    t.notThrows(() => normalizeEvent({ reportType, systemBucket: 'systemBucket', stackName: 'stackName' }));
+  });
+});
+
+test('normalizeEvent throws error if no systemBucket is provided', (t) => {
+  const inputEvent = {
+    endTimestamp: new Date().toISOString(),
+    reportType: 'Inventory',
+    stackName: 'stackName',
+    startTimestamp: new Date().toISOString(),
+  };
+  t.throws(() => normalizeEvent(inputEvent), {
+    instanceOf: InvalidArgument,
+    message: 'systemBucket is required.',
+  });
+});
+
+test('normalizeEvent throws error if no stackName is provided', (t) => {
+  const inputEvent = {
+    endTimestamp: new Date().toISOString(),
+    reportType: 'Inventory',
+    startTimestamp: new Date().toISOString(),
+    systemBucket: 'systemBucket',
+  };
+  t.throws(() => normalizeEvent(inputEvent), {
+    instanceOf: InvalidArgument,
+    message: 'stackName is required.',
   });
 });
diff --git a/packages/cmr-client/src/CMR.ts b/packages/cmr-client/src/CMR.ts
index b9a04b1bb4a..b8096f6efdd 100644
--- a/packages/cmr-client/src/CMR.ts
+++ b/packages/cmr-client/src/CMR.ts
@@ -41,7 +41,7 @@ export interface CMRConstructorParams {
   passwordSecretName?: string
   provider: string,
   token?: string,
-  username: string,
+  username?: string,
   oauthProvider: string,
 }
 
@@ -67,11 +67,13 @@ export interface CMRConstructorParams {
  *    clientId: 'my-clientId',
 *    token: 'cmr_or_launchpad_token'
 * });
+ * TODO: this should be subclassed or refactored to a functional style
+ * due to branch logic/complexity in token vs password/username handling
 */
export class CMR {
  clientId: string;
  provider: string;
-  username: string;
+  username?: string;
  oauthProvider: string;
  password?: string;
  passwordSecretName?: string;
@@ -79,17 +81,6 @@ export class CMR {
 
  /**
   * The constructor for the CMR class
-   *
-   * @param {Object} params
-   * @param {string} params.provider - the CMR provider id
-   * @param {string} params.clientId - the CMR clientId
-   * @param {string} params.username - CMR username, not used if token is provided
-   * @param {string} params.passwordSecretName - CMR password secret, not used if token is provided
-   * @param {string} params.password - CMR password, not used if token or
-   * passwordSecretName is provided
-   * @param {string} params.token - CMR or Launchpad token,
-   * if not provided, CMR username and password are used to get a cmr token
-   * @param {string} params.oauthProvider - Oauth provider: earthdata or launchpad
   */
  constructor(params: CMRConstructorParams) {
    this.clientId = params.clientId;
@@ -131,6 +122,12 @@ export class CMR {
   * @returns {Promise.<string>} the token
   */
  async getToken(): Promise<string | undefined> {
+    if (this.oauthProvider === 'launchpad') {
+      return this.token;
+    }
+    if (!this.username) {
+      throw new Error('Username not specified for non-launchpad CMR client');
+    }
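    // A hedged sketch of the two construction styles this branching supports
    // (field names come from CMRConstructorParams above):
    //   new CMR({ oauthProvider: 'launchpad', token, clientId, provider })
    //     returns the configured token directly;
    //   new CMR({ oauthProvider: 'earthdata', username, passwordSecretName, clientId, provider })
    //     falls through to mint a token from username/password below.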
    return this.token
      ? this.token
      : updateToken(this.username, await this.getCmrPassword());
diff --git a/packages/cmr-client/src/CMRSearchConceptQueue.ts b/packages/cmr-client/src/CMRSearchConceptQueue.ts
index ea0b142bd19..5d1a352399b 100644
--- a/packages/cmr-client/src/CMRSearchConceptQueue.ts
+++ b/packages/cmr-client/src/CMRSearchConceptQueue.ts
@@ -2,18 +2,12 @@ import { CMR, CMRConstructorParams } from './CMR';
 
 /**
  * Shim to correctly add a default provider_short_name to the input searchParams
- *
- * @param {Object} params
- * @param {URLSearchParams} params.searchParams - input search
- *   parameters for searchConceptQueue. This parameter can be either a
- *   URLSearchParam object or a plain Object.
- * @returns {URLSearchParams} - input object appeneded with a default provider_short_name
 */
export const providerParams = ({
  searchParams = new URLSearchParams(),
  cmrSettings,
}: {
-  searchParams: URLSearchParams,
+  searchParams?: URLSearchParams,
  cmrSettings: {
    provider: string
  }
@@ -28,7 +22,7 @@ export const providerParams = ({
export interface CMRSearchConceptQueueConstructorParams {
  cmrSettings: CMRConstructorParams,
  type: string,
-  searchParams: URLSearchParams,
+  searchParams?: URLSearchParams,
  format?: string
}
 
@@ -49,18 +43,18 @@ export interface CMRSearchConceptQueueConstructorParams {
 *    format: 'json'
 * });
 */
-export class CMRSearchConceptQueue {
+export class CMRSearchConceptQueue<T> {
  type: string;
  params: URLSearchParams;
  format?: string;
-  items: unknown[];
+  items: (T | null)[];
  CMR: CMR;
 
  /**
   * The constructor for the CMRSearchConceptQueue class
   *
   * @param {Object} params
-   * @param {string} params.cmrSettings - the CMR settings for the requests - the provider,
+   * @param {Object} params.cmrSettings - the CMR settings for the requests - the provider,
   * clientId, and either launchpad token or EDL username and password
   * @param {string} params.type - the type of search 'granule' or 'collection'
   * @param {URLSearchParams} [params.searchParams={}] - the search parameters
@@ -84,10 +78,12 @@ export class CMRSearchConceptQueue {
   * This does not remove the object from the queue.  When there are no more
   * items in the queue, returns 'null'.
   *
-   * @returns {Promise} an item from the CMR search
   */
-  async peek(): Promise<unknown> {
+  async peek(): Promise<T> {
    if (this.items.length === 0) await this.fetchItems();
+    if (this.items[0] === null) {
+      return null as unknown as T;
+    }
    return this.items[0];
  }
 
  /**
@@ -95,12 +91,15 @@ export class CMRSearchConceptQueue {
   * Remove the next item from the queue
   *
   * When there are no more items in the queue, returns `null`.
-   *
-   * @returns {Promise} an item from the CMR search
   */
-  async shift(): Promise<unknown> {
+  async shift(): Promise<T> {
    if (this.items.length === 0) await this.fetchItems();
-    return this.items.shift();
+    const item = this.items.shift();
+    // eslint-disable-next-line lodash/prefer-is-nil
+    if (item === null || item === undefined) {
+      return null as unknown as T;
+    }
+    return item;
  }
 
  /**
@@ -116,7 +115,7 @@ export class CMRSearchConceptQueue {
      this.format,
      false
    );
-    this.items = results;
+    this.items = results as T[];
 
    const paramsPageNum = this.params.get('page_num') ??
'0'; this.params.set('page_num', String(Number(paramsPageNum) + 1)); diff --git a/packages/cmr-client/tests/test-CMR.js b/packages/cmr-client/tests/test-CMR.js index ef1641c657b..31912efe169 100644 --- a/packages/cmr-client/tests/test-CMR.js +++ b/packages/cmr-client/tests/test-CMR.js @@ -166,7 +166,7 @@ test('getReadHeaders returns clientId and token for launchpad', (t) => { }); test.serial('ingestUMMGranule() returns CMRInternalError when CMR is down', async (t) => { - const cmrSearch = new CMR({ provider: 'my-provider', token: 'abc', clientId: 'client' }); + const cmrSearch = new CMR({ oauthProvider: 'launchpad', token: 'abc', clientId: 'client' }); const ummgMetadata = { GranuleUR: 'asdf' }; @@ -192,7 +192,7 @@ test.serial('ingestUMMGranule() returns CMRInternalError when CMR is down', asyn }); test.serial('ingestUMMGranule() throws an exception if the input fails validation', async (t) => { - const cmrSearch = new CMR({ provider: 'my-provider', token: 'abc', clientId: 'client' }); + const cmrSearch = new CMR({ oauthProvider: 'launchpad', token: 'abc', clientId: 'client' }); const ummgMetadata = { GranuleUR: 'asdf' }; diff --git a/packages/cmrjs/src/cmr-utils.js b/packages/cmrjs/src/cmr-utils.js index 59fa5ff81b6..a786c51f899 100644 --- a/packages/cmrjs/src/cmr-utils.js +++ b/packages/cmrjs/src/cmr-utils.js @@ -448,6 +448,14 @@ function generateFileUrl({ return undefined; } +/** + * @typedef {Object} OnlineAccessUrl + * @property {string} URL - The generated file URL. + * @property {string} URLDescription - The description of the URL (used by ECHO10). + * @property {string} Description - The description of the URL (used by UMMG). + * @property {string} Type - The type of the URL (used by ECHO10/UMMG). + */ + /** * Construct online access url for a given file and a url type. * @@ -458,8 +466,8 @@ function generateFileUrl({ * @param {Object} params.urlType - url type, distribution or s3 * @param {distributionBucketMap} params.distributionBucketMap - Object with bucket:tea-path mapping * for all distribution bucketss - * @param {boolean} params.useDirectS3Type - indicate if direct s3 access type is used - * @returns {(Object | undefined)} online access url object, undefined if no URL exists + * @param {boolean} [params.useDirectS3Type] - indicate if direct s3 access type is used + * @returns {(OnlineAccessUrl | undefined)} online access url object, undefined if no URL exists */ function constructOnlineAccessUrl({ file, @@ -741,15 +749,17 @@ async function updateUMMGMetadata({ * Helper to build an CMR settings object, used to initialize CMR. 
* * @param {Object} cmrConfig - CMR configuration object - * @param {string} cmrConfig.oauthProvider - Oauth provider: launchpad or earthdata - * @param {string} cmrConfig.provider - the CMR provider - * @param {string} cmrConfig.clientId - Client id for CMR requests - * @param {string} cmrConfig.passphraseSecretName - Launchpad passphrase secret name - * @param {string} cmrConfig.api - Launchpad api - * @param {string} cmrConfig.certificate - Launchpad certificate - * @param {string} cmrConfig.username - EDL username - * @param {string} cmrConfig.passwordSecretName - CMR password secret name - * @returns {Promise} object to create CMR instance - contains the + * @param {string} [cmrConfig.oauthProvider] - Oauth provider: launchpad or earthdata + * @param {string} [cmrConfig.provider] - the CMR provider + * @param {string} [cmrConfig.clientId] - Client id for CMR requests + * @param {string} [cmrConfig.passphraseSecretName] - Launchpad passphrase secret name + * @param {string} [cmrConfig.api] - Launchpad api + * @param {string} [cmrConfig.certificate] - Launchpad certificate + * @param {string} [cmrConfig.username] - EDL username + * @param {string} [cmrConfig.passwordSecretName] - CMR password secret name + * @returns {Promise} + * object to + * create CMR instance - contains the * provider, clientId, and either launchpad token or EDL username and * password */ diff --git a/packages/db/src/index.ts b/packages/db/src/index.ts index 97e693c3466..92a8f364cf2 100644 --- a/packages/db/src/index.ts +++ b/packages/db/src/index.ts @@ -114,6 +114,7 @@ export { export { getCollectionsByGranuleIds, + getUniqueCollectionsByGranuleFilter, } from './lib/collection'; export { diff --git a/packages/db/src/lib/QuerySearchClient.ts b/packages/db/src/lib/QuerySearchClient.ts index acb890e144d..470c7b30e9c 100644 --- a/packages/db/src/lib/QuerySearchClient.ts +++ b/packages/db/src/lib/QuerySearchClient.ts @@ -43,7 +43,6 @@ class QuerySearchClient { * * This does not remove the object from the queue. 
* - * @returns {Promise} - record from PostgreSQL table */ async peek() { if (this.records.length === 0) await this.fetchRecords(); @@ -53,7 +52,6 @@ class QuerySearchClient { /** * Remove and return the next item in the results * - * @returns {Promise} - record from PostgreSQL table */ async shift() { if (this.records.length === 0) await this.fetchRecords(); diff --git a/packages/db/src/lib/collection.ts b/packages/db/src/lib/collection.ts index f8b1db297f8..d43c1ab269d 100644 --- a/packages/db/src/lib/collection.ts +++ b/packages/db/src/lib/collection.ts @@ -1,6 +1,8 @@ import { Knex } from 'knex'; import Logger from '@cumulus/logger'; +import { deconstructCollectionId } from '@cumulus/message/Collections'; + import { RetryOnDbConnectionTerminateError } from './retry'; import { TableNames } from '../tables'; @@ -27,3 +29,56 @@ export const getCollectionsByGranuleIds = async ( .groupBy(`${collectionsTable}.cumulus_id`); return await RetryOnDbConnectionTerminateError(query, {}, log); }; + +// TODO - This function is going to be super-non-performant +// We need to identify the specific need here and see if we can optimize + +export const getUniqueCollectionsByGranuleFilter = async (params: { + startTimestamp?: string, + endTimestamp?: string, + collectionIds?: string[], + granuleIds?: string[], + providers?: string[], + knex: Knex, +}) => { + const { knex } = params; + const collectionsTable = TableNames.collections; + const granulesTable = TableNames.granules; + const providersTable = TableNames.providers; + + const query = knex(collectionsTable) + .distinct(`${collectionsTable}.*`) + .innerJoin(granulesTable, `${collectionsTable}.cumulus_id`, `${granulesTable}.collection_cumulus_id`); + + if (params.startTimestamp) { + query.where(`${granulesTable}.updated_at`, '>=', params.startTimestamp); + } + if (params.endTimestamp) { + query.where(`${granulesTable}.updated_at`, '<=', params.endTimestamp); + } + + // Filter by collectionIds + if (params.collectionIds && params.collectionIds.length > 0) { + const collectionNameVersionPairs = params.collectionIds.map((id) => + deconstructCollectionId(id)); + + query.whereIn( + [`${collectionsTable}.name`, `${collectionsTable}.version`], + collectionNameVersionPairs.map(({ name, version }) => [name, version]) + ); + } + + // Filter by granuleIds + if (params.granuleIds && params.granuleIds.length > 0) { + query.whereIn(`${granulesTable}.granule_id`, params.granuleIds); + } + + // Filter by provider names + if (params.providers && params.providers.length > 0) { + query.innerJoin(providersTable, `${granulesTable}.provider_cumulus_id`, `${providersTable}.cumulus_id`); + query.whereIn(`${providersTable}.name`, params.providers); + } + + query.orderBy([`${collectionsTable}.name`, `${collectionsTable}.version`]); + return query; +}; diff --git a/packages/db/src/lib/granule.ts b/packages/db/src/lib/granule.ts index 05da2df9ec3..a590fc16580 100644 --- a/packages/db/src/lib/granule.ts +++ b/packages/db/src/lib/granule.ts @@ -204,19 +204,6 @@ export const getApiGranuleExecutionCumulusIds = async ( /** * Helper to build a query to search granules by various API granule record properties. 
- * - * @param params - * @param params.knex - DB client - * @param params.searchParams - * @param [params.searchParams.collectionIds] - Collection ID - * @param [params.searchParams.granuleIds] - array of granule IDs - * @param [params.searchParams.providerNames] - Provider names - * @param [params.searchParams.updatedAtRange] - Date range for updated_at column - * @param [params.searchParams.status] - Granule status to search by - * @param [params.sortByFields] - Field(s) to sort by - * @param params.temporalBoundByCreatedAt -- If true, temporal bounds - * are applied to created_at column instead of updated_at column - * @returns {Knex.QueryBuilder} */ export const getGranulesByApiPropertiesQuery = ({ knex, @@ -233,7 +220,7 @@ export const getGranulesByApiPropertiesQuery = ({ status?: string }, sortByFields?: string | string[], - temporalBoundByCreatedAt: boolean, + temporalBoundByCreatedAt?: boolean, }) : Knex.QueryBuilder => { const { granules: granulesTable, diff --git a/packages/db/tests/lib/test-collection.js b/packages/db/tests/lib/test-collection.js index eeed7fc7a17..928273b723c 100644 --- a/packages/db/tests/lib/test-collection.js +++ b/packages/db/tests/lib/test-collection.js @@ -5,56 +5,85 @@ const sinon = require('sinon'); const cryptoRandomString = require('crypto-random-string'); const { - destroyLocalTestDb, - generateLocalTestDb, - GranulePgModel, CollectionPgModel, + destroyLocalTestDb, fakeCollectionRecordFactory, fakeGranuleRecordFactory, + fakeProviderRecordFactory, + generateLocalTestDb, getCollectionsByGranuleIds, + getUniqueCollectionsByGranuleFilter, + GranulePgModel, migrationDir, + ProviderPgModel, } = require('../../dist'); -const testDbName = `collection_${cryptoRandomString({ length: 10 })}`; - -test.before(async (t) => { +test.beforeEach(async (t) => { + t.context.testDbName = `collection_${cryptoRandomString({ length: 10 })}`; const { knexAdmin, knex } = await generateLocalTestDb( - testDbName, + t.context.testDbName, migrationDir ); t.context.knexAdmin = knexAdmin; t.context.knex = knex; t.context.collectionPgModel = new CollectionPgModel(); + t.context.providerPgModel = new ProviderPgModel(); t.context.granulePgModel = new GranulePgModel(); -}); - -test.after.always(async (t) => { - await destroyLocalTestDb({ - ...t.context, - testDbName, - }); -}); -test('getCollectionsByGranuleIds() returns collections for given granule IDs', async (t) => { - const collection1 = fakeCollectionRecordFactory(); - const collection2 = fakeCollectionRecordFactory(); + t.context.oldTimeStamp = '1950-01-01T00:00:00Z'; + t.context.newTimeStamp = '2020-01-01T00:00:00Z'; - const pgCollections = await t.context.collectionPgModel.insert( + t.context.collections = Array.from({ length: 3 }, (_, index) => { + const name = `collection${index + 1}`; + return fakeCollectionRecordFactory({ name, version: '001' }); + }); + t.context.pgCollections = await t.context.collectionPgModel.insert( t.context.knex, - [collection1, collection2], + t.context.collections, '*' ); + t.context.providers = Array.from({ length: 2 }, (_, index) => { + const name = `provider${index + 1}`; + return fakeProviderRecordFactory({ name }); + }); + t.context.pgProviders = await t.context.providerPgModel.create( + t.context.knex, + t.context.providers + ); - const granules = [ - fakeGranuleRecordFactory({ collection_cumulus_id: pgCollections[0].cumulus_id }), - fakeGranuleRecordFactory({ collection_cumulus_id: pgCollections[1].cumulus_id }), + t.context.granules = [ + fakeGranuleRecordFactory({ + 
collection_cumulus_id: t.context.pgCollections[0].cumulus_id, + provider_cumulus_id: t.context.pgProviders[0].cumulus_id, + updated_at: t.context.oldTimeStamp, + }), + fakeGranuleRecordFactory({ + collection_cumulus_id: t.context.pgCollections[1].cumulus_id, + provider_cumulus_id: t.context.pgProviders[1].cumulus_id, + updated_at: t.context.oldTimeStamp, + }), + fakeGranuleRecordFactory({ + collection_cumulus_id: t.context.pgCollections[2].cumulus_id, + provider_cumulus_id: t.context.pgProviders[1].cumulus_id, + updated_at: t.context.newTimeStamp, + }), ]; + await t.context.granulePgModel.insert( t.context.knex, - granules + t.context.granules ); +}); +test.afterEach.always(async (t) => { + await destroyLocalTestDb({ + ...t.context, + }); +}); + +test('getCollectionsByGranuleIds() returns collections for given granule IDs', async (t) => { + const { pgCollections, granules } = t.context; const collections = await getCollectionsByGranuleIds( t.context.knex, granules.map((granule) => granule.granule_id) @@ -64,25 +93,17 @@ test('getCollectionsByGranuleIds() returns collections for given granule IDs', a }); test('getCollectionsByGranuleIds() only returns unique collections', async (t) => { - const collection1 = fakeCollectionRecordFactory(); - const collection2 = fakeCollectionRecordFactory(); - - const pgCollections = await t.context.collectionPgModel.insert( - t.context.knex, - [collection1, collection2], - '*' - ); - - const granules = [ - fakeGranuleRecordFactory({ collection_cumulus_id: pgCollections[0].cumulus_id }), - fakeGranuleRecordFactory({ collection_cumulus_id: pgCollections[1].cumulus_id }), - fakeGranuleRecordFactory({ collection_cumulus_id: pgCollections[1].cumulus_id }), - ]; + const { pgCollections } = t.context; + const testGranule = fakeGranuleRecordFactory({ + collection_cumulus_id: pgCollections[1].cumulus_id, + }); await t.context.granulePgModel.insert( t.context.knex, - granules + [testGranule] ); + const granules = [...t.context.granules, testGranule]; + const collections = await getCollectionsByGranuleIds( t.context.knex, granules.map((granule) => granule.granule_id) @@ -92,21 +113,15 @@ test('getCollectionsByGranuleIds() only returns unique collections', async (t) = }); test.serial('getCollectionsByGranuleIds() retries on connection terminated unexpectedly error', async (t) => { - const { knex } = t.context; - const collection1 = fakeCollectionRecordFactory(); - const collection2 = fakeCollectionRecordFactory(); - - const pgCollections = await t.context.collectionPgModel.insert( - knex, - [collection1, collection2], - '*' + const { knex, pgCollections } = t.context; + const testGranule = fakeGranuleRecordFactory({ + collection_cumulus_id: pgCollections[1].cumulus_id, + }); + await t.context.granulePgModel.insert( + t.context.knex, + [testGranule] ); - - const granules = [ - fakeGranuleRecordFactory({ collection_cumulus_id: pgCollections[0].cumulus_id }), - fakeGranuleRecordFactory({ collection_cumulus_id: pgCollections[1].cumulus_id }), - fakeGranuleRecordFactory({ collection_cumulus_id: pgCollections[1].cumulus_id }), - ]; + const granules = [...t.context.granules, testGranule]; const knexStub = sinon.stub(knex, 'select').returns({ select: sinon.stub().returnsThis(), @@ -127,3 +142,100 @@ test.serial('getCollectionsByGranuleIds() retries on connection terminated unexp ); t.is(error.attemptNumber, 4); }); + +test('getUniqueCollectionsByGranuleFilter filters by startTimestamp', async (t) => { + const { knex } = t.context; + const params = { + startTimestamp: 
'2005-01-01T00:00:00Z', + knex, + }; + + const result = await getUniqueCollectionsByGranuleFilter(params); + t.is(result.length, 1); +}); + +test('getUniqueCollectionsByGranuleFilter filters by endTimestamp', async (t) => { + const { knex } = t.context; + const params = { + endTimestamp: '2005-01-01T00:00:00Z', + knex, + }; + const result = await getUniqueCollectionsByGranuleFilter(params); + t.is(result.length, 2); + t.is(result[0].name, 'collection1'); + t.is(result[1].name, 'collection2'); +}); + +test('getUniqueCollectionsByGranuleFilter filters by collectionIds', async (t) => { + const { knex } = t.context; + const params = { + collectionIds: ['collection1___001', 'collection2___001'], + knex, + }; + + const result = await getUniqueCollectionsByGranuleFilter(params); + t.is(result.length, 2); + t.is(result[0].name, 'collection1'); + t.is(result[0].version, '001'); + t.is(result[1].name, 'collection2'); + t.is(result[1].version, '001'); +}); + +test('getUniqueCollectionsByGranuleFilter filters by granuleIds', async (t) => { + const { knex, granules } = t.context; + const params = { + granuleIds: [granules[0].granule_id], + knex, + }; + + const result = await getUniqueCollectionsByGranuleFilter(params); + t.is(result.length, 1); + t.is(result[0].name, 'collection1'); + t.is(result[0].version, '001'); +}); + +test('getUniqueCollectionsByGranuleFilter filters by providers', async (t) => { + const { knex, providers } = t.context; + const params = { + providers: [providers[0].name], + knex, + }; + + const result = await getUniqueCollectionsByGranuleFilter(params); + t.is(result.length, 1); + t.is(result[0].name, 'collection1'); + t.is(result[0].version, '001'); +}); + +test('getUniqueCollectionsByGranuleFilter orders collections by name', async (t) => { + const { knex } = t.context; + const params = { + knex, + }; + + const result = await getUniqueCollectionsByGranuleFilter(params); + t.is(result.length, 3); + t.is(result[0].name, 'collection1'); + t.is(result[1].name, 'collection2'); + t.is(result[2].name, 'collection3'); +}); + +test('getUniqueCollectionsByGranuleFilter returns distinct collections', async (t) => { + const { knex } = t.context; + const params = { + knex, + }; + + const granule = fakeGranuleRecordFactory({ + collection_cumulus_id: t.context.pgCollections[0].cumulus_id, + provider_cumulus_id: t.context.pgProviders[0].cumulus_id, + updated_at: t.context.oldTimeStamp, + }); + await t.context.granulePgModel.insert( + t.context.knex, + [granule] + ); + + const result = await getUniqueCollectionsByGranuleFilter(params); + t.is(result.length, 3); +});
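
For reference, a hedged usage sketch of getUniqueCollectionsByGranuleFilter as the tests above exercise it (the knex client and provider records come from the test context; every filter is optional and filters may be combined):

    const collections = await getUniqueCollectionsByGranuleFilter({
      startTimestamp: '2005-01-01T00:00:00Z',
      providers: [providers[0].name],
      knex,
    });
    // distinct collection records joined through granules,
    // ordered by collection name and version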