Skip to content

Commit

Permalink
fix: moving definition of query into the pipe definition (influxdata#130
Browse files Browse the repository at this point in the history
)
  • Loading branch information
drdelambre authored Oct 7, 2020
1 parent 0abf6cb commit 3bee0aa
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 122 deletions.
140 changes: 20 additions & 120 deletions src/flows/components/header/Submit.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,10 @@ import {event} from 'src/cloud/utils/reporting'
// Types
import {RemoteDataState} from 'src/types'

const PREVIOUS_REGEXP = /__PREVIOUS_RESULT__/g
const COMMENT_REMOVER = /(\/\*([\s\S]*?)\*\/)|(\/\/(.*)$)/gm

const fakeNotify = notify

export const Submit: FC = () => {
const {query} = useContext(QueryContext)
const {query, generateMap} = useContext(QueryContext)
const {id, flow} = useContext(FlowContext)
const {add, update} = useContext(ResultsContext)
const {timeContext} = useContext(TimeContext)
Expand All @@ -42,133 +39,36 @@ export const Submit: FC = () => {

const submit = () => {
event('Flow Submit Button Clicked')
let queryIncludesPreviousResult = false
setLoading(RemoteDataState.Loading)
Promise.all(
flow.data.allIDs
.reduce((stages, pipeID, index) => {
flow.meta.update(pipeID, {loading: RemoteDataState.Loading})
const pipe = flow.data.get(pipeID)

if (pipe.type === 'query') {
let text = pipe.queries[pipe.activeQuery].text.replace(
COMMENT_REMOVER,
''
)
let requirements = {}

if (!text.replace(/\s/g, '').length) {
if (stages.length) {
stages[stages.length - 1].instances.push(pipeID)
}
return stages
}

if (PREVIOUS_REGEXP.test(text)) {
requirements = {
...(index === 0 ? {} : stages[stages.length - 1].requirements),
[`prev_${index}`]: stages[stages.length - 1].text,
}
text = text.replace(PREVIOUS_REGEXP, `prev_${index}`)
queryIncludesPreviousResult = true
}

stages.push({
text,
instances: [pipeID],
requirements,
})
} else if (pipe.type === 'data') {
const {bucketName} = pipe

const text = `from(bucket: "${bucketName}")|>range(start: v.timeRangeStart, stop: v.timeRangeStop)`

stages.push({
text,
instances: [pipeID],
requirements: {},
})
} else if (pipe.type === 'queryBuilder') {
const {aggregateFunction, bucket, field, measurement, tags} = pipe

let text = `from(bucket: "${bucket.name}")|>range(start: v.timeRangeStart, stop: v.timeRangeStop)`
if (measurement) {
text += `|> filter(fn: (r) => r["_measurement"] == "${measurement}")`
}
if (field) {
text += `|> filter(fn: (r) => r["_field"] == "${field}")`
}
if (tags && Object.keys(tags)?.length > 0) {
Object.keys(tags)
.filter((tagName: string) => !!tags[tagName])
.forEach((tagName: string) => {
const tagValues = tags[tagName]
if (tagValues.length === 1) {
text += `|> filter(fn: (r) => r["${tagName}"] == "${tagValues[0]}")`
} else {
tagValues.forEach((val, i) => {
if (i === 0) {
text += `|> filter(fn: (r) => r["${tagName}"] == "${val}"`
}
if (tagValues.length - 1 === i) {
text += ` or r["${tagName}"] == "${val}")`
} else {
text += ` or r["${tagName}"] == "${val}"`
}
})
}
})
}

if (aggregateFunction?.name) {
text += ` |> aggregateWindow(every: v.windowPeriod, fn: ${aggregateFunction.name}, createEmpty: false)
|> yield(name: "${aggregateFunction.name}")`
}
flow.data.allIDs.forEach(pipeID => {
flow.meta.update(pipeID, {loading: RemoteDataState.Loading})
})

stages.push({
text,
instances: [pipeID],
requirements: {},
})
} else if (stages.length) {
stages[stages.length - 1].instances.push(pipeID)
}
const map = generateMap()

return stages
}, [])
.map(queryStruct => {
const queryText =
Object.entries(queryStruct.requirements)
.map(([key, value]) => `${key} = (\n${value}\n)\n\n`)
.join('') + queryStruct.text

return query(queryText)
.then(response => {
queryStruct.instances.forEach(pipeID => {
forceUpdate(pipeID, response)
flow.meta.update(pipeID, {loading: RemoteDataState.Done})
})
Promise.all(
map.map(stage => {
return query(stage.text)
.then(response => {
stage.instances.forEach(pipeID => {
forceUpdate(pipeID, response)
flow.meta.update(pipeID, {loading: RemoteDataState.Done})
})
.catch(e => {
queryStruct.instances.forEach(pipeID => {
forceUpdate(pipeID, {
error: e.message,
})
flow.meta.update(pipeID, {loading: RemoteDataState.Error})
})
.catch(e => {
stage.instances.forEach(pipeID => {
forceUpdate(pipeID, {
error: e.message,
})
flow.meta.update(pipeID, {loading: RemoteDataState.Error})
})
})
})
})
)

.then(() => {
event('Flow Submit Resolved')

if (queryIncludesPreviousResult) {
event('flows_queryIncludesPreviousResult')
} else {
event('flows_queryExcludesPreviousResult')
}

setLoading(RemoteDataState.Done)
})
.catch(e => {
Expand Down
68 changes: 66 additions & 2 deletions src/flows/context/query.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,32 @@ import {TimeContext} from 'src/flows/context/time'
import {fromFlux as parse} from '@influxdata/giraffe'
import {event} from 'src/cloud/utils/reporting'
import {FluxResult} from 'src/types/flows'
import {PIPE_DEFINITIONS} from 'src/flows'

interface Stage {
text: string
instances: string[]
}

export interface QueryContextType {
query: (text: string) => Promise<FluxResult>
generateMap: () => Stage[]
}

export const DEFAULT_CONTEXT: QueryContextType = {
query: () => Promise.resolve({} as FluxResult),
generateMap: () => [],
}

export const QueryContext = React.createContext<QueryContextType>(
DEFAULT_CONTEXT
)

const PREVIOUS_REGEXP = /__PREVIOUS_RESULT__/g

type Props = StateProps
export const QueryProvider: FC<Props> = ({children, variables, org}) => {
const {id} = useContext(FlowContext)
const {id, flow} = useContext(FlowContext)
const {timeContext} = useContext(TimeContext)
const time = timeContext[id]

Expand All @@ -41,6 +51,58 @@ export const QueryProvider: FC<Props> = ({children, variables, org}) => {
variables.map(v => asAssignment(v))
}, [variables, time])

const generateMap = (): Stage[] => {
return flow.data.allIDs
.reduce((stages, pipeID) => {
const pipe = flow.data.get(pipeID)

const stage = {
text: '',
instances: [pipeID],
requirements: {},
}

const create = (text, loadPrevious) => {
if (loadPrevious && stages.length) {
stage.requirements = {
...stages[stages.length - 1].requirements,
[`prev_${stages.length}`]: stages[stages.length - 1].text,
}
stage.text = text.replace(PREVIOUS_REGEXP, `prev_${stages.length}`)
} else {
stage.text = text
}

stages.push(stage)
}

const append = () => {
if (stages.length) {
stages[stages.length - 1].instances.push(pipeID)
}
}

if (PIPE_DEFINITIONS[pipe.type].generateFlux) {
PIPE_DEFINITIONS[pipe.type].generateFlux(pipe, create, append)
} else {
append()
}

return stages
}, [])
.map(queryStruct => {
const queryText =
Object.entries(queryStruct.requirements)
.map(([key, value]) => `${key} = (\n${value}\n)\n\n`)
.join('') + queryStruct.text

return {
text: queryText,
instances: queryStruct.instances,
}
})
}

const query = (text: string) => {
const windowVars = getWindowVars(text, vars)
const extern = buildVarsOption([...vars, ...windowVars])
Expand Down Expand Up @@ -69,7 +131,9 @@ export const QueryProvider: FC<Props> = ({children, variables, org}) => {
}

return (
<QueryContext.Provider value={{query}}>{children}</QueryContext.Provider>
<QueryContext.Provider value={{query, generateMap}}>
{children}
</QueryContext.Provider>
)
}

Expand Down
39 changes: 39 additions & 0 deletions src/flows/pipes/Data/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,44 @@ export default register => {
tags: {},
aggregateFunction: FUNCTIONS[0],
},
generateFlux: (pipe, create, _append) => {
const {aggregateFunction, bucket, field, measurement, tags} = pipe

let text = `from(bucket: "${bucket.name}")|>range(start: v.timeRangeStart, stop: v.timeRangeStop)`
if (measurement) {
text += `|> filter(fn: (r) => r["_measurement"] == "${measurement}")`
}
if (field) {
text += `|> filter(fn: (r) => r["_field"] == "${field}")`
}
if (tags && Object.keys(tags)?.length > 0) {
Object.keys(tags)
.filter((tagName: string) => !!tags[tagName])
.forEach((tagName: string) => {
const tagValues = tags[tagName]
if (tagValues.length === 1) {
text += `|> filter(fn: (r) => r["${tagName}"] == "${tagValues[0]}")`
} else {
tagValues.forEach((val, i) => {
if (i === 0) {
text += `|> filter(fn: (r) => r["${tagName}"] == "${val}"`
}
if (tagValues.length - 1 === i) {
text += ` or r["${tagName}"] == "${val}")`
} else {
text += ` or r["${tagName}"] == "${val}"`
}
})
}
})
}

if (aggregateFunction?.name) {
text += ` |> aggregateWindow(every: v.windowPeriod, fn: ${aggregateFunction.name}, createEmpty: false)
|> yield(name: "${aggregateFunction.name}")`
}

create(text)
},
})
}
15 changes: 15 additions & 0 deletions src/flows/pipes/Query/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import View from './view'
import './style.scss'

const PREVIOUS_REGEXP = /__PREVIOUS_RESULT__/g
const COMMENT_REMOVER = /(\/\*([\s\S]*?)\*\/)|(\/\/(.*)$)/gm

export default register => {
register({
type: 'query',
Expand All @@ -23,5 +26,17 @@ export default register => {
},
],
},
generateFlux: (pipe, create, append) => {
const text = pipe.queries[pipe.activeQuery].text
.replace(COMMENT_REMOVER, '')
.replace(/\s/g, '')

if (!text.length) {
append()
return
}

create(text, PREVIOUS_REGEXP.test(text))
},
})
}
5 changes: 5 additions & 0 deletions src/types/flows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,9 @@ export interface TypeRegistration {
component: FunctionComponent<PipeProp> | ComponentClass<PipeProp> // the view component for rendering the interface
button: string // a human readable string for appending the type
initial: any // the default state for an add
generateFlux?: (
pipe: PipeData,
create: (text: string, loadPrevious?: boolean) => void,
append: () => void
) => void // Generates the flux used to grab data from the backend
}

0 comments on commit 3bee0aa

Please sign in to comment.