diff --git a/src/connection.js b/src/connection.js index 80cf90f..4c98ee6 100644 --- a/src/connection.js +++ b/src/connection.js @@ -8,7 +8,7 @@ const AuthProvider = require('./authprovider'); const BasicProvider = require('./basicprovider'); const OidcProvider = require('./oidcprovider'); -const Capabilities = require('./capabilities'); +const { GdcCapabilities, Migrate } = require('./gdc'); const FileTypes = require('./filetypes'); const UserFile = require('./userfile'); const Job = require('./job'); @@ -18,6 +18,7 @@ const Service = require('./service'); const Builder = require('./builder/builder'); const BuilderNode = require('./builder/node'); + const CONFORMANCE_RELS = [ 'conformance', 'http://www.opengis.net/def/rel/ogc/1.0/conformance' @@ -120,7 +121,8 @@ class Connection { } } - this.capabilitiesObject = new Capabilities(data); + Migrate.connection = this; + this.capabilitiesObject = new GdcCapabilities(data); return this.capabilitiesObject; } @@ -802,6 +804,37 @@ class Connection { return await pg.describeUserProcess(); } + isOgcProcess(process) { + let nodes = Object.values(process.process_graph); + return Boolean(nodes.find(node => { + let process = this.processes.get(node.process_id); + return Utils.isObject(process) && Boolean(process.ogcapi); + })); + } + + async executeOgcProcess(process, abortController = null) { + let openEO = this._normalizeUserProcess(process) + let p = Object.values(openEO.process.process_graph).find(v => { + let spec = this.processes.get(v.process_id); + return Boolean(spec && spec.ogcapi); + }); + let requestBody = Migrate.execute(openEO); + console.log(p.process_id, requestBody); // @todo remove + let response = await this._post(`/processes/${p.process_id}/execution`, requestBody, Environment.getResponseType(), abortController); + let syncResult = { + data: response.data, + costs: null, + type: null, + logs: [] + }; + + if (typeof response.headers['content-type'] === 'string') { + syncResult.type = response.headers['content-type']; + } + + return syncResult; + } + /** * Executes a process synchronously and returns the result as the response. * @@ -822,6 +855,9 @@ class Connection { budget: budget } ); + if (this.isOgcProcess(requestBody.process)) { + return this.executeOgcProcess(process, abortController); + } let response = await this._post('/result', requestBody, Environment.getResponseType(), abortController); let syncResult = { data: response.data, diff --git a/src/gdc.js b/src/gdc.js new file mode 100644 index 0000000..d334263 --- /dev/null +++ b/src/gdc.js @@ -0,0 +1,410 @@ +const Capabilities = require("./capabilities"); +const Utils = require('@openeo/js-commons/src/utils'); +const StacMigrate = require('@radiantearth/stac-migrate'); + +class GdcCapabilities extends Capabilities { + + constructor(data) { + super(data); + Object.assign(this.featureMap, { + describeCoverage: 'get /collections/{collection_id}/coverage', + describeCoverageDomainset: 'get /collections/{collection_id}/coverage/domainset', + describeCoverageRangetype: 'get /collections/{collection_id}/coverage/rangetype', + describeCoverageRangeset: 'get /collections/{collection_id}/coverage/rangeset', + describeCoverageMetadata: 'get /collections/{collection_id}/coverage/metadata', + executeOgcProcess: 'post /processes/{processId}/execution', + }); + this.checkConformance(); + } + + hasConformance(uri) { + if(!Array.isArray(this.data.conformsTo)) { + return false; + } + return this.data.conformsTo.includes(uri); + } + + checkConformance() { + if (!Array.isArray(this.data.endpoints)) { + this.data.endpoints = []; + } + const isCoverage = this.hasConformance('http://www.opengis.net/spec/ogcapi-coverages-1/0.0/conf/geodata-coverage'); + const isFeatures = this.hasConformance('http://www.opengis.net/spec/ogcapi-features-1/1.0/conf/core'); + if (isCoverage || isFeatures) { + this.data.endpoints.push({ + "path": "/collections", + "methods": ["GET"] + }); + this.data.endpoints.push({ + "path": "/collections/{collection_id}", + "methods": ["GET"] + }); + } + // if (isFeatures) { + // this.data.endpoints.push({ + // "path": "/collections/{collection_id}/items", + // "methods": ["GET"] + // }); + // this.data.endpoints.push({ + // "path": "/collections/{collection_id}/items/{item_id}", + // "methods": ["GET"] + // }); + // } + if (isCoverage) { + this.data.endpoints.push({ + "path": "/collections/{collection_id}/coverage", + "methods": ["GET"] + }); + this.data.endpoints.push({ + "path": "/collections/{collection_id}/coverage", + "methods": ["GET"] + }); + this.data.endpoints.push({ + "path": "/collections/{collection_id}/coverage/domainset", + "methods": ["GET"] + }); + this.data.endpoints.push({ + "path": "/collections/{collection_id}/coverage/rangetype", + "methods": ["GET"] + }); + this.data.endpoints.push({ + "path": "/collections/{collection_id}/coverage/rangeset", + "methods": ["GET"] + }); + this.data.endpoints.push({ + "path": "/collections/{collection_id}/coverage/metadata", + "methods": ["GET"] + }); + } + const isProcessApi = this.hasConformance('http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/core'); + const processDismiss = this.hasConformance('http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/dismiss'); + const processJobList = this.hasConformance('http://www.opengis.net/spec/ogcapi-processes-1/1.0/req/job-list'); + if (isProcessApi) { + this.data.endpoints.push({ + "path": "/processes", + "methods": ["GET"] + }); + this.data.endpoints.push({ + "path": "/processes/{processId}", + "methods": ["GET"] + }); + this.data.endpoints.push({ + "path": "/processes/{processId}/execution", + "methods": ["POST"] + }); + let jobMethods = ["GET"]; + if (processDismiss) { // @todo Is dismiss equivalent to openEO job cancellation or deletion? + jobMethods.push("DELETE"); + } + this.data.endpoints.push({ + "path": "/jobs/{job_id}", + "methods": jobMethods + }); + this.data.endpoints.push({ + "path": "/jobs/{job_id}/results", + "methods": ["GET"] + }); + } + if (processJobList) { + this.data.endpoints.push({ + "path": "/jobs", + "methods": ["GET"] + }); + } + this.init(); + } + + /** + * Initializes the class. + * + * @protected + */ + init() { + if (Array.isArray(this.data.endpoints)) { + super.init(); + } + } + + /** + * Validates the capabilities. + * + * Throws an error in case of an issue, otherwise just passes. + * + * @protected + * @throws {Error} + */ + validate() { + if(!Utils.isObject(this.data)) { + throw new Error("No capabilities retrieved."); + } + } + + /** + * Returns the openEO API version implemented by the back-end. + * + * @returns {string} openEO API version number. + */ + apiVersion() { + return this.data.api_version; + } + + /** + * Returns the GDC API version implemented by the back-end. + * + * @returns {string} GDC API version number. + */ + gdcVersion() { + return this.data.gdc_version || "1.0.0-beta.1"; + } + + isEndpoint(response, method, endpoint) { + if (response.config.method !== method) { + return false; + } + if (endpoint.includes('{}')) { + let pattern = '^' + endpoint.replace('{}', '[^/]+') + '$'; + let regex = new RegExp(pattern); + return regex.test(response.config.url); + } + return endpoint === response.config.url; + } + + /** + * Migrates a response, if required. + * + * @param {AxiosResponse} response + * @protected + * @returns {AxiosResponse} + */ + migrate(response) { + if (this.isEndpoint(response, 'get', '/collections')) { + response.data.collections = response.data.collections.map(collection => Migrate.collection(collection, response)); + } + else if (this.isEndpoint(response, 'get', '/collections/{}')) { + response.data = Migrate.collection(response.data, response); + } + else if (this.isEndpoint(response, 'get', '/processes')) { + response.data.processes = response.data.processes.map(process => Migrate.process(process, response)); + } + else if (this.isEndpoint(response, 'get', '/jobs')) { + response.data.jobs = response.data.jobs.map(job => Migrate.job(job, response)); + } + else if (this.isEndpoint(response, 'get', '/jobs/{}')) { + response.data = Migrate.job(response.data, response); + } + + response = Migrate.all(response); + + return response; + } +} + +const JobStatusMap = { + accepted: 'created', + running: 'running', + successful: 'finished', + failed: 'error', + dismissed: 'canceled' +}; + +const Migrate = { + + connection: null, + + all(response) { + if (Array.isArray(response.data.links)) { + response.data.links = this.connection.makeLinksAbsolute(response.data.links, response); + } + return response; + }, + + collection(collection, response) { + if (collection.stac_version) { + return collection; + } + + // Make sure the required properties are present + collection = StacMigrate.collection(collection); + collection.ogcapi = true; + // Make links absolute + if (Array.isArray(collection.links)) { + collection.links = this.connection.makeLinksAbsolute(collection.links, response); + } + + return collection; + }, + + process(process, response) { + if (process.parameters || process.returns) { + return process; + } + + process.ogcapi = true; + process.summary = process.title; + + process.parameters = []; + for(let name in process.inputs) { + let input = process.inputs[name]; + process.parameters.push({ + name, + description: [input.title, input.description].filter(v => Boolean(v)).join("\n\n"), + schema: input.schema, + optional: typeof input.schema.default !== 'undefined' + }); + } + + let addOutputParam = (p, name, output) => { + output = Object.assign({}, output); + if (Array.isArray(output.schema.oneOf) && output.schema.oneOf.every(s => s.type === 'string' && Boolean(s.contentMediaType))) { + output.schema = { + type: 'string', + enum: output.schema.oneOf.map(s => s.contentMediaType) + }; + } + p.parameters.push(Object.assign({name: `output:${name}`}, output)); + }; + + if (Utils.size(process.outputs) === 1) { + let [name, output] = Object.entries(process.outputs)[0]; + process.returns = { + description: [output.title, output.description].filter(v => Boolean(v)).join("\n\n"), + schema: output.schema + }; + // @todo workaround for now + addOutputParam(process, name, output); + } + else { + process.returns = { + description: 'see process description', + schema: [] + }; + for(let name in process.outputs) { + let output = process.outputs[name]; + let schema = Object.assign({}, output.schema, {title: output.title, description: output.description}); + process.returns.schema.push(schema); + // @todo workaround for now + addOutputParam(process, name, output); + } + } + + // Make links absolute + if (Array.isArray(process.links)) { + process.links = this.connection.makeLinksAbsolute(process.links, response); + } + + return process; + }, + + job(job, response) { + if (job.type !== 'process') { + return job; + } + + job.ogcapi = true; + job.id = job.jobID; + job.process = { + process_graph: { + [job.processID]: { + process_id: job.processID, + arguments: {} + } + } + }; + job.status = JobStatusMap[job.status]; + job.created = job.started; + job.updated = job.finished; + job.description = job.message; + + if (Array.isArray(job.links)) { + job.links = this.connection.makeLinksAbsolute(job.links, response); + } + + return job; + }, + + execute(requestBody) { + let graph = Object.entries(requestBody.process.process_graph); + let count = graph.length; + let collection = graph.find(([k,v]) => v.process_id === 'load_collection'); + if (collection) { + count--; + } + let process = graph.find(([k,v]) => { + let spec = this.connection.processes.get(v.process_id); + return Boolean(spec && spec.ogcapi); + }); + let node; + if (process) { + count--; + node = process[1]; + } + else { + throw new Error('Synchronous execution with OGC API - Processes requires an OGC API Process.'); + } + if (count !== 0) { + throw new Error('Synchronous execution with OGC API - Processes only supports one OGC API Process and a collection at a time.'); + } + + // Clone argument so that we can make changes to them + let inputs = Utils.deepClone(node.arguments); + + // Get output parameters from arguments + let outputs = {}; + // @todo workaround for now + for(let key in node.arguments) { + if (key.startsWith('output:')) { + let name = key.substring(7); + delete inputs[key]; + outputs[name] = { + format: { + mediaType: node.arguments[key] + } + }; + } + } + + // Replace openEO fromNode with OAP href instances + if (collection) { + let replaceDeep = (x, parent = null) => { + if (Utils.isObject(x) && x.from_node === collection[0]) { + let c = collection[1].arguments; + let url = new URL(this.connection.getBaseUrl() + `/collections/${c.id}/coverage`); + if (Utils.isObject(c.spatial_extent) && typeof c.spatial_extent.west !== 'undefined') { + let bbox = c.spatial_extent; + let subset = `Lat(${bbox.west}:${bbox.east}),Lon(${bbox.south}:${bbox.north})`; + url.searchParams.append('subset', subset); + } + x = { + href: url.toString() + // @todo scaleFactor=256, f=image/tiff, temporal extent? + }; + // @todo GNOSIS needs this?! + if (!Array.isArray(parent)) { + x = [x]; + } + } + else if (x && typeof x === 'object') { + for(let key in x) { + x[key] = replaceDeep(x[key], x); + } + } + return x; + }; + + inputs = replaceDeep(inputs); + } + + // Construct OAP process + return { + "process": `${this.connection.getBaseUrl()}/processes/${node.process_id}`, + inputs, + outputs + } + }, + +}; + +module.exports = { + GdcCapabilities, + Migrate +}; \ No newline at end of file diff --git a/src/openeo.js b/src/openeo.js index acfabe9..6d108f7 100644 --- a/src/openeo.js +++ b/src/openeo.js @@ -27,6 +27,7 @@ const Formula = require('./builder/formula'); const MIN_API_VERSION = '1.0.0-rc.2'; const MAX_API_VERSION = '1.x.x'; +const GDC_VERSION = '1.0.0-beta.1'; /** * Main class to start with openEO. Allows to connect to a server. @@ -94,9 +95,12 @@ class OpenEO { // Check whether back-end is accessible and supports a compatible version. let capabilities = await connection.init(); - if (Versions.compare(capabilities.apiVersion(), MIN_API_VERSION, "<") || Versions.compare(capabilities.apiVersion(), MAX_API_VERSION, ">")) { + if (capabilities.apiVersion() && (Versions.compare(capabilities.apiVersion(), MIN_API_VERSION, "<") || Versions.compare(capabilities.apiVersion(), MAX_API_VERSION, ">"))) { throw new Error("Client only supports the API versions between " + MIN_API_VERSION + " and " + MAX_API_VERSION); } + if (!Versions.compare(capabilities.gdcVersion(), GDC_VERSION, "=")) { + throw new Error("Client only supports the GDC version " + GDC_VERSION); + } return connection; }