From 3bee0aab6585fc6b9c1b26b48706078d3c2b0494 Mon Sep 17 00:00:00 2001 From: Alex Boatwright Date: Wed, 7 Oct 2020 14:01:03 -0700 Subject: [PATCH] fix: moving definition of query into the pipe definition (#130) --- src/flows/components/header/Submit.tsx | 140 ++++--------------------- src/flows/context/query.tsx | 68 +++++++++++- src/flows/pipes/Data/index.ts | 39 +++++++ src/flows/pipes/Query/index.ts | 15 +++ src/types/flows.ts | 5 + 5 files changed, 145 insertions(+), 122 deletions(-) diff --git a/src/flows/components/header/Submit.tsx b/src/flows/components/header/Submit.tsx index 986b4cf5b2..5b04a24b44 100644 --- a/src/flows/components/header/Submit.tsx +++ b/src/flows/components/header/Submit.tsx @@ -14,13 +14,10 @@ import {event} from 'src/cloud/utils/reporting' // Types import {RemoteDataState} from 'src/types' -const PREVIOUS_REGEXP = /__PREVIOUS_RESULT__/g -const COMMENT_REMOVER = /(\/\*([\s\S]*?)\*\/)|(\/\/(.*)$)/gm - const fakeNotify = notify export const Submit: FC = () => { - const {query} = useContext(QueryContext) + const {query, generateMap} = useContext(QueryContext) const {id, flow} = useContext(FlowContext) const {add, update} = useContext(ResultsContext) const {timeContext} = useContext(TimeContext) @@ -42,133 +39,36 @@ export const Submit: FC = () => { const submit = () => { event('Flow Submit Button Clicked') - let queryIncludesPreviousResult = false setLoading(RemoteDataState.Loading) - Promise.all( - flow.data.allIDs - .reduce((stages, pipeID, index) => { - flow.meta.update(pipeID, {loading: RemoteDataState.Loading}) - const pipe = flow.data.get(pipeID) - - if (pipe.type === 'query') { - let text = pipe.queries[pipe.activeQuery].text.replace( - COMMENT_REMOVER, - '' - ) - let requirements = {} - - if (!text.replace(/\s/g, '').length) { - if (stages.length) { - stages[stages.length - 1].instances.push(pipeID) - } - return stages - } - - if (PREVIOUS_REGEXP.test(text)) { - requirements = { - ...(index === 0 ? {} : stages[stages.length - 1].requirements), - [`prev_${index}`]: stages[stages.length - 1].text, - } - text = text.replace(PREVIOUS_REGEXP, `prev_${index}`) - queryIncludesPreviousResult = true - } - - stages.push({ - text, - instances: [pipeID], - requirements, - }) - } else if (pipe.type === 'data') { - const {bucketName} = pipe - - const text = `from(bucket: "${bucketName}")|>range(start: v.timeRangeStart, stop: v.timeRangeStop)` - - stages.push({ - text, - instances: [pipeID], - requirements: {}, - }) - } else if (pipe.type === 'queryBuilder') { - const {aggregateFunction, bucket, field, measurement, tags} = pipe - - let text = `from(bucket: "${bucket.name}")|>range(start: v.timeRangeStart, stop: v.timeRangeStop)` - if (measurement) { - text += `|> filter(fn: (r) => r["_measurement"] == "${measurement}")` - } - if (field) { - text += `|> filter(fn: (r) => r["_field"] == "${field}")` - } - if (tags && Object.keys(tags)?.length > 0) { - Object.keys(tags) - .filter((tagName: string) => !!tags[tagName]) - .forEach((tagName: string) => { - const tagValues = tags[tagName] - if (tagValues.length === 1) { - text += `|> filter(fn: (r) => r["${tagName}"] == "${tagValues[0]}")` - } else { - tagValues.forEach((val, i) => { - if (i === 0) { - text += `|> filter(fn: (r) => r["${tagName}"] == "${val}"` - } - if (tagValues.length - 1 === i) { - text += ` or r["${tagName}"] == "${val}")` - } else { - text += ` or r["${tagName}"] == "${val}"` - } - }) - } - }) - } - if (aggregateFunction?.name) { - text += ` |> aggregateWindow(every: v.windowPeriod, fn: ${aggregateFunction.name}, createEmpty: false) - |> yield(name: "${aggregateFunction.name}")` - } + flow.data.allIDs.forEach(pipeID => { + flow.meta.update(pipeID, {loading: RemoteDataState.Loading}) + }) - stages.push({ - text, - instances: [pipeID], - requirements: {}, - }) - } else if (stages.length) { - stages[stages.length - 1].instances.push(pipeID) - } + const map = generateMap() - return stages - }, []) - .map(queryStruct => { - const queryText = - Object.entries(queryStruct.requirements) - .map(([key, value]) => `${key} = (\n${value}\n)\n\n`) - .join('') + queryStruct.text - - return query(queryText) - .then(response => { - queryStruct.instances.forEach(pipeID => { - forceUpdate(pipeID, response) - flow.meta.update(pipeID, {loading: RemoteDataState.Done}) - }) + Promise.all( + map.map(stage => { + return query(stage.text) + .then(response => { + stage.instances.forEach(pipeID => { + forceUpdate(pipeID, response) + flow.meta.update(pipeID, {loading: RemoteDataState.Done}) }) - .catch(e => { - queryStruct.instances.forEach(pipeID => { - forceUpdate(pipeID, { - error: e.message, - }) - flow.meta.update(pipeID, {loading: RemoteDataState.Error}) + }) + .catch(e => { + stage.instances.forEach(pipeID => { + forceUpdate(pipeID, { + error: e.message, }) + flow.meta.update(pipeID, {loading: RemoteDataState.Error}) }) - }) + }) + }) ) - .then(() => { event('Flow Submit Resolved') - if (queryIncludesPreviousResult) { - event('flows_queryIncludesPreviousResult') - } else { - event('flows_queryExcludesPreviousResult') - } - setLoading(RemoteDataState.Done) }) .catch(e => { diff --git a/src/flows/context/query.tsx b/src/flows/context/query.tsx index 1e6af84e52..b96a9b0328 100644 --- a/src/flows/context/query.tsx +++ b/src/flows/context/query.tsx @@ -12,22 +12,32 @@ import {TimeContext} from 'src/flows/context/time' import {fromFlux as parse} from '@influxdata/giraffe' import {event} from 'src/cloud/utils/reporting' import {FluxResult} from 'src/types/flows' +import {PIPE_DEFINITIONS} from 'src/flows' + +interface Stage { + text: string + instances: string[] +} export interface QueryContextType { query: (text: string) => Promise + generateMap: () => Stage[] } export const DEFAULT_CONTEXT: QueryContextType = { query: () => Promise.resolve({} as FluxResult), + generateMap: () => [], } export const QueryContext = React.createContext( DEFAULT_CONTEXT ) +const PREVIOUS_REGEXP = /__PREVIOUS_RESULT__/g + type Props = StateProps export const QueryProvider: FC = ({children, variables, org}) => { - const {id} = useContext(FlowContext) + const {id, flow} = useContext(FlowContext) const {timeContext} = useContext(TimeContext) const time = timeContext[id] @@ -41,6 +51,58 @@ export const QueryProvider: FC = ({children, variables, org}) => { variables.map(v => asAssignment(v)) }, [variables, time]) + const generateMap = (): Stage[] => { + return flow.data.allIDs + .reduce((stages, pipeID) => { + const pipe = flow.data.get(pipeID) + + const stage = { + text: '', + instances: [pipeID], + requirements: {}, + } + + const create = (text, loadPrevious) => { + if (loadPrevious && stages.length) { + stage.requirements = { + ...stages[stages.length - 1].requirements, + [`prev_${stages.length}`]: stages[stages.length - 1].text, + } + stage.text = text.replace(PREVIOUS_REGEXP, `prev_${stages.length}`) + } else { + stage.text = text + } + + stages.push(stage) + } + + const append = () => { + if (stages.length) { + stages[stages.length - 1].instances.push(pipeID) + } + } + + if (PIPE_DEFINITIONS[pipe.type].generateFlux) { + PIPE_DEFINITIONS[pipe.type].generateFlux(pipe, create, append) + } else { + append() + } + + return stages + }, []) + .map(queryStruct => { + const queryText = + Object.entries(queryStruct.requirements) + .map(([key, value]) => `${key} = (\n${value}\n)\n\n`) + .join('') + queryStruct.text + + return { + text: queryText, + instances: queryStruct.instances, + } + }) + } + const query = (text: string) => { const windowVars = getWindowVars(text, vars) const extern = buildVarsOption([...vars, ...windowVars]) @@ -69,7 +131,9 @@ export const QueryProvider: FC = ({children, variables, org}) => { } return ( - {children} + + {children} + ) } diff --git a/src/flows/pipes/Data/index.ts b/src/flows/pipes/Data/index.ts index 765c549e89..e7aeef5149 100644 --- a/src/flows/pipes/Data/index.ts +++ b/src/flows/pipes/Data/index.ts @@ -15,5 +15,44 @@ export default register => { tags: {}, aggregateFunction: FUNCTIONS[0], }, + generateFlux: (pipe, create, _append) => { + const {aggregateFunction, bucket, field, measurement, tags} = pipe + + let text = `from(bucket: "${bucket.name}")|>range(start: v.timeRangeStart, stop: v.timeRangeStop)` + if (measurement) { + text += `|> filter(fn: (r) => r["_measurement"] == "${measurement}")` + } + if (field) { + text += `|> filter(fn: (r) => r["_field"] == "${field}")` + } + if (tags && Object.keys(tags)?.length > 0) { + Object.keys(tags) + .filter((tagName: string) => !!tags[tagName]) + .forEach((tagName: string) => { + const tagValues = tags[tagName] + if (tagValues.length === 1) { + text += `|> filter(fn: (r) => r["${tagName}"] == "${tagValues[0]}")` + } else { + tagValues.forEach((val, i) => { + if (i === 0) { + text += `|> filter(fn: (r) => r["${tagName}"] == "${val}"` + } + if (tagValues.length - 1 === i) { + text += ` or r["${tagName}"] == "${val}")` + } else { + text += ` or r["${tagName}"] == "${val}"` + } + }) + } + }) + } + + if (aggregateFunction?.name) { + text += ` |> aggregateWindow(every: v.windowPeriod, fn: ${aggregateFunction.name}, createEmpty: false) + |> yield(name: "${aggregateFunction.name}")` + } + + create(text) + }, }) } diff --git a/src/flows/pipes/Query/index.ts b/src/flows/pipes/Query/index.ts index 7aee15d6e7..3f1ef579a3 100644 --- a/src/flows/pipes/Query/index.ts +++ b/src/flows/pipes/Query/index.ts @@ -1,6 +1,9 @@ import View from './view' import './style.scss' +const PREVIOUS_REGEXP = /__PREVIOUS_RESULT__/g +const COMMENT_REMOVER = /(\/\*([\s\S]*?)\*\/)|(\/\/(.*)$)/gm + export default register => { register({ type: 'query', @@ -23,5 +26,17 @@ export default register => { }, ], }, + generateFlux: (pipe, create, append) => { + const text = pipe.queries[pipe.activeQuery].text + .replace(COMMENT_REMOVER, '') + .replace(/\s/g, '') + + if (!text.length) { + append() + return + } + + create(text, PREVIOUS_REGEXP.test(text)) + }, }) } diff --git a/src/types/flows.ts b/src/types/flows.ts index c0d2dc2228..a1aff69824 100644 --- a/src/types/flows.ts +++ b/src/types/flows.ts @@ -122,4 +122,9 @@ export interface TypeRegistration { component: FunctionComponent | ComponentClass // the view component for rendering the interface button: string // a human readable string for appending the type initial: any // the default state for an add + generateFlux?: ( + pipe: PipeData, + create: (text: string, loadPrevious?: boolean) => void, + append: () => void + ) => void // Generates the flux used to grab data from the backend }