Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: funnel actors queries on udf #25013

Merged
merged 3 commits into from
Oct 2, 2024
Merged

feat: funnel actors queries on udf #25013

merged 3 commits into from
Oct 2, 2024

Conversation

aspicer
Copy link
Contributor

@aspicer aspicer commented Sep 17, 2024

Move funnel actors queries over to UDF if you have the feature flag on.

Passes the UUID of each event to the UDF. The UDF returns, for each step, a list of event UUIDs that correspond to the funnel matches for each user.

Adds in timing fields used by correlation and user paths.

Also a tiny bit of cleanup of the superfluous leftovers in SQL that were hanging on from the old queries.

Testing

Tested with tests. Added UDF actors query unit tests. Also tested quite a bit on dev.

/* user_id:2 celery:posthog.tasks.tasks.process_query_task */ SELECT
    persons.id AS id,
    persons.created_at AS created_at,
    source.matching_events AS matching_events
FROM
    (SELECT
        aggregation_target AS actor_id,
        matched_events_array[3] AS matching_events
    FROM
        (SELECT
            arrayJoin(aggregate_funnel_array(3, 1209600, 'first_touch', 'ordered', [[]], arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), uuid, [], arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1), multiply(3, step_2)])))))) AS af_tuple,
            af_tuple.1 AS step_reached,
            af_tuple.2 AS breakdown,
            af_tuple.3 AS timings,
            af_tuple.4 AS matched_event_uuids_array_array,
            groupArray(tuple(timestamp, uuid, `$session_id`, `$window_id`)) AS user_events,
            mapFromArrays(arrayMap(x -> x.2, user_events), user_events) AS user_events_map,
            arrayMap(matched_event_uuids_array -> arrayMap(event_uuid -> user_events_map[event_uuid], matched_event_uuids_array), matched_event_uuids_array_array) AS matched_events_array,
            aggregation_target AS aggregation_target
        FROM
            (SELECT
                toTimeZone(e.timestamp, 'UTC') AS timestamp,
                if(not(empty(e__override.distinct_id)), e__override.person_id, e.person_id) AS aggregation_target,
                e.uuid AS uuid,
                e.`$session_id` AS `$session_id`,
                e.`$window_id` AS `$window_id`,
                if(and(equals(e.event, '$pageview'), and(313131313131, startsWith(toString(e.distinct_id), 'a'))), 1, 0) AS step_0,
                if(equals(e.event, '$pageview'), 1, 0) AS step_1,
                if(equals(e.event, '$pageview'), 1, 0) AS step_2
            FROM
                events AS e
                LEFT OUTER JOIN (SELECT
                    argMax(person_distinct_id_overrides.person_id, person_distinct_id_overrides.version) AS person_id,
                    person_distinct_id_overrides.distinct_id AS distinct_id
                FROM
                    person_distinct_id_overrides
                WHERE
                    equals(person_distinct_id_overrides.team_id, 2)
                GROUP BY
                    person_distinct_id_overrides.distinct_id
                HAVING
                    ifNull(equals(argMax(person_distinct_id_overrides.is_deleted, person_distinct_id_overrides.version), 0), 0)
                SETTINGS optimize_aggregation_in_order=1) AS e__override ON equals(e.distinct_id, e__override.distinct_id)
            WHERE
                and(equals(e.team_id, 2), and(and(greaterOrEquals(toTimeZone(e.timestamp, 'UTC'), toDateTime64('2024-09-10 00:00:00.000000', 6, 'UTC')), lessOrEquals(toTimeZone(e.timestamp, 'UTC'), toDateTime64('2024-09-17 23:59:59.999999', 6, 'UTC'))), in(e.event, tuple('$pageview'))), or(ifNull(equals(step_0, 1), 0), ifNull(equals(step_1, 1), 0), ifNull(equals(step_2, 1), 0))))
        GROUP BY
            aggregation_target
        HAVING
            ifNull(greaterOrEquals(step_reached, 0), 0))
    WHERE
        ifNull(greaterOrEquals(step_reached, 2), 0)
    ORDER BY
        aggregation_target ASC) AS source
    INNER JOIN (SELECT
        argMax(toTimeZone(person.created_at, 'UTC'), person.version) AS created_at,
        person.id AS id
    FROM
        person
    WHERE
        equals(person.team_id, 2)
    GROUP BY
        person.id
    HAVING
        and(ifNull(equals(argMax(person.is_deleted, person.version), 0), 0), ifNull(less(argMax(toTimeZone(person.created_at, 'UTC'), person.version), plus(now64(6, 'UTC'), toIntervalDay(1))), 0))
    SETTINGS optimize_aggregation_in_order=1) AS persons ON equals(persons.id, source.actor_id)
ORDER BY
    persons.created_at DESC
LIMIT 101
OFFSET 0 SETTINGS readonly=2, max_execution_time=600, allow_experimental_object_type=1, format_csv_allow_double_quotes=0, max_ast_elements=4000000, max_expanded_ast_elements=4000000, max_bytes_before_external_group_by=0, allow_experimental_analyzer=1

@aspicer aspicer force-pushed the aspicer/funnel_actors branch from 2fce29a to 13f8a8a Compare September 17, 2024 04:55
@aspicer aspicer changed the title wip feat: funnel actors queries on udf Sep 17, 2024
@aspicer aspicer marked this pull request as ready for review September 17, 2024 08:37
@aspicer aspicer requested review from skoob13 and a team September 17, 2024 09:03
Copy link
Contributor

@skoob13 skoob13 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everything is perfect! I didn't manage to test associated session recordings locally. It's always empty, but it's also empty without UDFs. One potential low-hanging fruit is using the orjson module instead of json since we deal with massive data arrays. When Ansible scripts for UDFs are in place, modules should be easier to use.

posthog/hogql_queries/insights/funnels/funnel_udf.py Outdated Show resolved Hide resolved
@aspicer
Copy link
Contributor Author

aspicer commented Sep 18, 2024

@skoob13 Just did a quick benchmark to see what's what.

In the benchmark, I parsed a json blob, iterated through a 100 item array and multipled each item by 2, to emulate doing some logic in the UDF, and then printed out the result, 1,000,000 times.

Python JSON: 14.8s
Python orjson: 5.4s
nodejs: 7.2s
rust: 2.3s

A little surprised about the node performance, thought it would be a bit faster!

Using orjson definitely makes sense, but can you think of an easy way to administer it? Would we have to set up ansible to maintain a virtualenv at each clickhouse node?

Or maybe their distro provides a package they can install natively.

j = """{
        "id": "0178a3ab-d163-0000-4b55-bceadebb03fa",
        "name": "Hogflix Movies",
        "created_at": "2021-04-05T20:14:09.763753Z",
        "updated_at": "2021-04-05T20:14:25.443181Z",
        "membership_level": 15,
        "plugins_access_level": 9,
        "teams": [
            {
                "id": 1,
                "uuid": "0178a3ab-d1e5-0000-c5ca-da746c68f506",
                "organization": "0178a3ab-d163-0000-4b55-bceadebb03fa",
                "api_token": "tJy-b6mTLwvNP_ZJHrfgn99pQCYOGFE3-nwpb8utFa8",
                "name": "Hogflix Demo App",
                "completed_snippet_onboarding": true,
                "ingested_event": true,
                "is_demo": true,
                "timezone": "Europe/Kiev"
            }
        ],
        "available_product_features": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]

    }"""

Python

import json
for i in range(1000000):
    r = json.loads(j)
    r['available_product_features'] = [x * 2 for x in r['available_product_features']]
    print(json.dumps(r))

JS

for (let i = 0; i < 1000000; i++) {
    let r = JSON.parse(j)
    r.available_product_features = r.available_product_features.map((x) => x * 2)
    process.stdout.write(JSON.stringify(r) + '\n')
}

Rust

use serde_json::{from_str, to_string};
use serde::{Deserialize, Serialize};


#[derive(Serialize, Deserialize)]
struct Team {
    id: u64,
    uuid: String,
    organization: String,
    api_token: String,
    name: String,
    completed_snippet_onboarding: bool,
    ingested_event: bool,
    is_demo: bool,
    timezone: String
}

#[derive(Serialize, Deserialize)]
struct Person {
    id: String,
    name: String,
    created_at: String,
    updated_at: String,
    membership_level: u64,
    plugins_access_level: u64,
    teams: Vec<Team>,
    available_product_features: Vec<u64>,
}

fn main() {
    // JSON string
    let j = r#"{
        "id": "0178a3ab-d163-0000-4b55-bceadebb03fa",
        "name": "Hogflix Movies",
        "created_at": "2021-04-05T20:14:09.763753Z",
        "updated_at": "2021-04-05T20:14:25.443181Z",
        "membership_level": 15,
        "plugins_access_level": 9,
        "teams": [
            {
                "id": 1,
                "uuid": "0178a3ab-d1e5-0000-c5ca-da746c68f506",
                "organization": "0178a3ab-d163-0000-4b55-bceadebb03fa",
                "api_token": "tJy-b6mTLwvNP_ZJHrfgn99pQCYOGFE3-nwpb8utFa8",
                "name": "Hogflix Demo App",
                "completed_snippet_onboarding": true,
                "ingested_event": true,
                "is_demo": true,
                "timezone": "Europe/Kiev"
            }
        ],
        "available_product_features": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]

    }"#;

    // Loop 1,000,000 times
    for _ in 0..1_000_000 {
        // Deserialize the JSON string into a Rust value
        let mut r: Person = from_str(j).unwrap();

        r.available_product_features = r.available_product_features.into_iter().map(|x| x * 2).collect();

        // Serialize the Rust value back into a JSON string
        let z = to_string(&r).unwrap();
        println!("{}", z);
    }
}

@aspicer aspicer requested a review from a team September 22, 2024 21:18
if property not in self._extra_event_properties:
self._extra_event_properties.append(property)

# I think I can delete this
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will you? 😄

@@ -1,158 +1,44 @@
# serializer version: 1
# name: TestFunnelPersons.test_funnel_person_recordings
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like all the TestFunnelPersons turned into TestFunnelPersonsUDF – is this intended here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now I put the UDF class in its own file...

And the test_funnel_persons.ambr still has all the TestFunnelPersonsUDF stuff and test_funnel_persons_udf.ambr has TestFunnelPersons in it.

🤯

@skoob13
Copy link
Contributor

skoob13 commented Sep 23, 2024

@aspicer the other day, I was watching this video, which gives a nice overview of basic UDF optimizations. We'll need to run benchmarks for max_block_size, which is 65k by default, to estimate performance accurately.

I'm not sure how we can orchestrate venv though. Potentially, we can use another format like Native, which seems to be parseable with the in-built struct module.

Additionally, it seems that the most significant achievable perf benefit we can achieve with Python is to use executable_pool to avoid initialization costs, but the question is how we can find a suitable value for pool_size.

Perf is not important right now, so let's postpone this discussion.

@aspicer aspicer force-pushed the aspicer/funnel_actors branch from 3abc7a5 to ff7d4e7 Compare October 2, 2024 16:41
aspicer and others added 2 commits October 2, 2024 12:01
Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
@aspicer aspicer force-pushed the aspicer/funnel_actors branch from 8980cc6 to c9fc432 Compare October 2, 2024 20:35
@aspicer aspicer merged commit 35d015c into master Oct 2, 2024
173 checks passed
@aspicer aspicer deleted the aspicer/funnel_actors branch October 2, 2024 22:29
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants