Skip to content

Commit

Permalink
feat(data-warehouse): integrate model run (#25090)
Browse files Browse the repository at this point in the history
  • Loading branch information
EDsCODE authored Sep 24, 2024
1 parent c1aacbe commit 12ada6a
Show file tree
Hide file tree
Showing 12 changed files with 140 additions and 37 deletions.
3 changes: 3 additions & 0 deletions frontend/src/lib/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2084,6 +2084,9 @@ const api = {
): Promise<DataWarehouseSavedQuery> {
return await new ApiRequest().dataWarehouseSavedQuery(viewId).update({ data })
},
async run(viewId: DataWarehouseSavedQuery['id']): Promise<void> {
return await new ApiRequest().dataWarehouseSavedQuery(viewId).withAction('run').create()
},
async ancestors(viewId: DataWarehouseSavedQuery['id'], level?: number): Promise<Record<string, string[]>> {
return await new ApiRequest()
.dataWarehouseSavedQuery(viewId)
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/lib/utils.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ export function humanFriendlyDiff(from: dayjs.Dayjs | string, to: dayjs.Dayjs |
}

export function humanFriendlyDetailedTime(
date: dayjs.Dayjs | string | null,
date: dayjs.Dayjs | string | null | undefined,
formatDate = 'MMMM DD, YYYY',
formatTime = 'h:mm:ss A'
): string {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,23 @@ export const databaseTableListLogic = kea<databaseTableListLogicType>([
}, {} as Record<string, DatabaseSchemaDataWarehouseTable>)
},
],
dataWarehouseTablesMapById: [
(s) => [s.database],
(database): Record<string, DatabaseSchemaDataWarehouseTable> => {
if (!database || !database.tables) {
return {}
}

return Object.values(database.tables)
.filter(
(n): n is DatabaseSchemaDataWarehouseTable => n.type === 'data_warehouse' || n.type == 'view'
)
.reduce((acc, cur) => {
acc[cur.id] = database.tables[cur.name] as DatabaseSchemaDataWarehouseTable
return acc
}, {} as Record<string, DatabaseSchemaDataWarehouseTable>)
},
],
views: [
(s) => [s.database],
(database): DatabaseSchemaViewTable[] => {
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/scenes/data-model/Node.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ function GenericNode({ pref, className = '', children }: NodeProps): JSX.Element
return (
<div
ref={pref}
className={`flex max-w-[200px] px-4 py-3 justify-center items-center space-between gap-1 bg-bg-3000 border border-black border-2 rounded-lg truncate ${className}`}
className={`flex w-[200px] px-4 py-3 justify-center items-center space-between gap-1 bg-bg-3000 border border-black border-2 rounded-lg ${className}`}
>
{children}
</div>
Expand Down
53 changes: 50 additions & 3 deletions frontend/src/scenes/data-model/NodeCanvasWithTable.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import { LemonButton, LemonTag } from '@posthog/lemon-ui'
import clsx from 'clsx'
import { useActions, useValues } from 'kea'
import { humanFriendlyDetailedTime } from 'lib/utils'
import { useEffect, useRef, useState } from 'react'
import { dataWarehouseViewsLogic } from 'scenes/data-warehouse/saved_queries/dataWarehouseViewsLogic'
import { StatusTagSetting } from 'scenes/data-warehouse/settings/DataWarehouseManagedSourcesTable'

import GenericNode from './Node'
import { FixedField, JoinedField, TableFields } from './TableFields'
Expand Down Expand Up @@ -96,7 +101,9 @@ const calculateNodePositions = (nodesWithDepth: NodeWithDepth[]): NodePosition[]

const calculateTablePosition = (nodePositions: NodePosition[]): Position => {
// Find the node with the maximum x position
const farthestNode = nodePositions.reduce((max, node) => (node.position.x > max.position.x ? node : max))
const farthestNode = nodePositions.reduce((max, node) => (node.position.x > max.position.x ? node : max), {
position: { x: 0, y: 0 },
})

// Calculate the table position to be slightly to the right of the farthest node
const tablePosition: Position = {
Expand Down Expand Up @@ -207,6 +214,10 @@ const NodeCanvasWithTable = ({
joinedFields,
tableName,
}: ScrollableDraggableCanvasProps): JSX.Element => {
// would like to keep nodecanvas logicless
const { dataWarehouseSavedQueryMapById } = useValues(dataWarehouseViewsLogic)
const { runDataWarehouseSavedQuery } = useActions(dataWarehouseViewsLogic)

const canvasRef = useRef<HTMLCanvasElement | null>(null)
const [isDragging, setIsDragging] = useState(false)
const [offset, setOffset] = useState({ x: 0, y: 0 })
Expand Down Expand Up @@ -350,7 +361,7 @@ const NodeCanvasWithTable = ({
)
})}
</svg>
{nodePositions.map(({ name, position, nodeId }, idx) => {
{nodePositions.map(({ name, savedQueryId, position, nodeId }, idx) => {
return (
<div
key={nodeId}
Expand All @@ -367,7 +378,43 @@ const NodeCanvasWithTable = ({
nodeRefs.current[idx]?.setAttribute('id', nodeId)
}}
>
{name}
<div className="flex flex-col max-w-full">
<div className="flex flex-wrap justify-between gap-2">
<div className="font-bold break-words">{name}</div>
{savedQueryId && (
<LemonButton
type="primary"
size="xsmall"
onClick={() => runDataWarehouseSavedQuery(savedQueryId)}
>
Run
</LemonButton>
)}
</div>
{savedQueryId && dataWarehouseSavedQueryMapById[savedQueryId]?.status && (
<div className="text-xs mt-2 max-w-full">
<LemonTag
type={
(dataWarehouseSavedQueryMapById[savedQueryId]?.status &&
StatusTagSetting[
dataWarehouseSavedQueryMapById[savedQueryId].status as string
]) ||
'default'
}
className="break-words"
>
{dataWarehouseSavedQueryMapById[savedQueryId]?.status}
</LemonTag>
</div>
)}
{savedQueryId && dataWarehouseSavedQueryMapById[savedQueryId]?.last_run_at && (
<span className="text-xs mt-2 max-w-full break-words">
{`Last calculated ${humanFriendlyDetailedTime(
dataWarehouseSavedQueryMapById[savedQueryId]?.last_run_at
)}`}
</span>
)}
</div>
</GenericNode>
</div>
)
Expand Down
8 changes: 1 addition & 7 deletions frontend/src/scenes/data-model/TableFields.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,7 @@ export function TableFields({ fixedFields, joinedFields, rowsRefs, tableName }:
</div>
</div>
<div className="flex flex-col gap-1">
<div
ref={(el) => {
rowsRefs.current[joinedFields.length] = el
rowsRefs.current[joinedFields.length]?.setAttribute('id', 'schema')
}}
className="pl-4 mt-4"
>
<div className="pl-4 mt-4">
<h4>Schema</h4>
</div>
<LemonTable
Expand Down
26 changes: 12 additions & 14 deletions frontend/src/scenes/data-model/dataModelSceneLogic.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { Node } from './types'
export const dataModelSceneLogic = kea<dataModelSceneLogicType>([
path(['scenes', 'data-model', 'dataModelSceneLogic']),
connect(() => ({
values: [databaseTableListLogic, ['posthogTablesMap', 'viewsMapById']],
values: [databaseTableListLogic, ['posthogTablesMap', 'viewsMapById', 'dataWarehouseTablesMapById']],
})),
actions({
traverseAncestors: (viewId: DataWarehouseSavedQuery['id'], level: number) => ({ viewId, level }),
Expand All @@ -35,7 +35,11 @@ export const dataModelSceneLogic = kea<dataModelSceneLogicType>([
...values.nodeMap,
[ancestor]: {
nodeId: ancestor,
name: values.viewsMapById[ancestor]?.name || ancestor,
name:
values.viewsMapById[ancestor]?.name ||
values.dataWarehouseTablesMapById[ancestor]?.name ||
ancestor,
savedQueryId: values.viewsMapById[ancestor]?.id,
leaf: [...(values.nodeMap[ancestor]?.leaf || []), viewId],
},
})
Expand Down Expand Up @@ -68,17 +72,7 @@ export const dataModelSceneLogic = kea<dataModelSceneLogicType>([
table: field.name,
})) || [],
],
allNodes: [
(s) => [s.nodeMap],
(nodeMap) => [
{
nodeId: 'posthog',
name: 'PostHog',
leaf: ['schema'],
},
...Object.values(nodeMap),
],
],
allNodes: [(s) => [s.nodeMap], (nodeMap) => [...Object.values(nodeMap)]],
}),
subscriptions(({ actions, values }) => ({
joinedFields: (joinedFields) => {
Expand All @@ -87,7 +81,11 @@ export const dataModelSceneLogic = kea<dataModelSceneLogicType>([
...values.nodeMap,
[field.id]: {
nodeId: field.id,
name: values.viewsMapById[field.id]?.name || field.id,
name:
values.viewsMapById[field.id]?.name ||
values.dataWarehouseTablesMapById[field.id]?.name ||
field.id,
savedQueryId: values.viewsMapById[field.id]?.id,
leaf: [`${field.name}_joined`],
},
})
Expand Down
1 change: 1 addition & 0 deletions frontend/src/scenes/data-model/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export interface Position {
export interface Node {
nodeId: string
name: string
savedQueryId?: string
leaf: string[]
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { lemonToast } from '@posthog/lemon-ui'
import { connect, kea, listeners, path, selectors } from 'kea'
import { actions, connect, events, kea, listeners, path, selectors } from 'kea'
import { loaders } from 'kea-loaders'
import { router } from 'kea-router'
import api from 'lib/api'
Expand All @@ -8,6 +8,7 @@ import { urls } from 'scenes/urls'
import { userLogic } from 'scenes/userLogic'

import { DatabaseSchemaViewTable } from '~/queries/schema'
import { DataWarehouseSavedQuery } from '~/types'

import type { dataWarehouseViewsLogicType } from './dataWarehouseViewsLogicType'

Expand All @@ -17,37 +18,52 @@ export const dataWarehouseViewsLogic = kea<dataWarehouseViewsLogicType>([
values: [userLogic, ['user'], databaseTableListLogic, ['views', 'databaseLoading']],
actions: [databaseTableListLogic, ['loadDatabase']],
})),

loaders({
actions({
runDataWarehouseSavedQuery: (viewId: string) => ({ viewId }),
}),
loaders(({ values }) => ({
dataWarehouseSavedQueries: [
null,
[] as DataWarehouseSavedQuery[],
{
loadDataWarehouseSavedQueries: async () => {
const savedQueries = await api.dataWarehouseSavedQueries.list()
return savedQueries.results
},
createDataWarehouseSavedQuery: async (view: Partial<DatabaseSchemaViewTable>) => {
const newView = await api.dataWarehouseSavedQueries.create(view)

lemonToast.success(`${newView.name ?? 'View'} successfully created`)
router.actions.push(urls.dataWarehouseView(newView.id))

return null
return [...values.dataWarehouseSavedQueries, newView]
},
deleteDataWarehouseSavedQuery: async (viewId: string) => {
await api.dataWarehouseSavedQueries.delete(viewId)
return null
return values.dataWarehouseSavedQueries.filter((view) => view.id !== viewId)
},
updateDataWarehouseSavedQuery: async (view: DatabaseSchemaViewTable) => {
await api.dataWarehouseSavedQueries.update(view.id, view)
return null
const newView = await api.dataWarehouseSavedQueries.update(view.id, view)
return values.dataWarehouseSavedQueries.map((savedQuery) => {
if (savedQuery.id === view.id) {
return newView
}
return savedQuery
})
},
},
],
}),
})),
listeners(({ actions }) => ({
createDataWarehouseSavedQuerySuccess: () => {
actions.loadDatabase()
},
updateDataWarehouseSavedQuerySuccess: () => {
actions.loadDatabase()
},
runDataWarehouseSavedQuery: async ({ viewId }) => {
await api.dataWarehouseSavedQueries.run(viewId)
actions.loadDataWarehouseSavedQueries()
},
})),
selectors({
shouldShowEmptyState: [
Expand All @@ -56,5 +72,25 @@ export const dataWarehouseViewsLogic = kea<dataWarehouseViewsLogicType>([
return views?.length == 0 && !databaseLoading
},
],
dataWarehouseSavedQueryMapById: [
(s) => [s.dataWarehouseSavedQueries],
(dataWarehouseSavedQueries) => {
return dataWarehouseSavedQueries.reduce((acc, cur) => {
acc[cur.id] = cur
return acc
}, {} as Record<string, DataWarehouseSavedQuery>)
},
],
}),
events(({ actions, cache }) => ({
afterMount: () => {
actions.loadDataWarehouseSavedQueries()
if (!cache.pollingInterval) {
cache.pollingInterval = setInterval(actions.loadDataWarehouseSavedQueries, 5000)
}
},
beforeUnmount: () => {
clearInterval(cache.pollingInterval)
},
})),
])
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import { manualLinkSources, PipelineNodeTab, PipelineStage } from '~/types'
import { SOURCE_DETAILS } from '../new/sourceWizardLogic'
import { dataWarehouseSettingsLogic } from './dataWarehouseSettingsLogic'

const StatusTagSetting = {
export const StatusTagSetting: Record<string, 'primary' | 'success' | 'danger'> = {
Running: 'primary',
Completed: 'success',
Error: 'danger',
Expand Down
2 changes: 2 additions & 0 deletions frontend/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3899,6 +3899,8 @@ export interface DataWarehouseSavedQuery {
name: string
query: HogQLQuery
columns: DatabaseSchemaField[]
last_run_at?: string
status?: string
}

export interface DataWarehouseViewLink {
Expand Down
7 changes: 6 additions & 1 deletion posthog/warehouse/api/saved_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ class Meta:
"created_by",
"created_at",
"columns",
"status",
"last_run_at",
]
read_only_fields = ["id", "created_by", "created_at", "columns"]
read_only_fields = ["id", "created_by", "created_at", "columns", "status", "last_run_at"]

def get_columns(self, view: DataWarehouseSavedQuery) -> list[SerializedField]:
team_id = self.context["team_id"]
Expand Down Expand Up @@ -171,6 +173,9 @@ def run(self, request: request.Request, *args, **kwargs) -> response.Response:
select=[Selector(label=saved_query.id.hex, ancestors=ancestors, descendants=descendants)],
)
workflow_id = f"data-modeling-run-{saved_query.id.hex}"
saved_query.status = DataWarehouseSavedQuery.Status.RUNNING
saved_query.save()

async_to_sync(temporal.start_workflow)( # type: ignore
"data-modeling-run", # type: ignore
inputs, # type: ignore
Expand Down

0 comments on commit 12ada6a

Please sign in to comment.