
[BUG] Rare with group by causes job abort due to result size too large #611

Closed

seankao-az opened this issue Aug 29, 2024 · 4 comments

Labels: bug (Something isn't working), Lang:PPL (Pipe Processing Language support)

seankao-az (Collaborator) commented Aug 29, 2024

What is the bug?
Running source=myglue_test.default.http_logs | rare request by clientip
fails with the error:

{
  "status": "FAILED",
  "error": "{\"Message\":\"Spark exception. Cause: Job aborted due to stage failure: Total size of serialized results of 118 tasks (1032.9 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)\"}"
}

However, an equivalent SQL query, select count(*) as cnt, request, clientip from myglue_test.default.http_logs group by request, clientip order by cnt asc limit 10, returns the result correctly:

SQL query result
{
	"status": "SUCCESS",
	"schema": [
		{
			"name": "cnt",
			"type": "long"
		},
		{
			"name": "request",
			"type": "string"
		},
		{
			"name": "clientip",
			"type": "string"
		}
	],
	"datarows": [
		[
			1,
			"GET /images/102321.gif HTTP/1.0",
			"120.125.16.0"
		],
		[
			1,
			"GET /images/102327.gif HTTP/1.0",
			"183.200.6.0"
		],
		[
			1,
			"GET /images/102321.gif HTTP/1.0",
			"89.57.14.0"
		],
		[
			1,
			"GET /images/102321.gif HTTP/1.0",
			"147.116.16.0"
		],
		[
			1,
			"GET /images/102321.gif HTTP/1.1",
			"224.58.14.0"
		],
		[
			1,
			"GET /images/102321.gif HTTP/1.0",
			"178.72.4.0"
		],
		[
			1,
			"GET /english/images/comp_bu_stage1n.gif HTTP/1.0",
			"195.128.16.0"
		],
		[
			1,
			"GET /english/images/comp_bu_stage1n.gif HTTP/1.0",
			"26.52.13.0"
		],
		[
			1,
			"GET /images/102321.gif HTTP/1.1",
			"77.76.14.0"
		],
		[
			1,
			"GET /english/images/team_bu_roster_on.gif HTTP/1.1",
			"142.58.14.0"
		]
	],
	"total": 10,
	"size": 10
}

The LIMIT 10 clause might cause a difference in the SQL query, but I added it because the rare command also defaults to a size of 10.

Physical plan for PPL query:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(4) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false, true) AS value#32]
   +- MapPartitions org.apache.spark.sql.Dataset$$Lambda$3843/0x00007f070d20d1d0@4851585e, obj#31: java.lang.String
      +- DeserializeToObject createexternalrow(staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, count(request)#8L, true, false, true), request#12.toString, clientip#11.toString, StructField(count(request),LongType,false), StructField(request,StringType,true), StructField(clientip,StringType,true)), obj#30: org.apache.spark.sql.Row
         +- *(3) Sort [request#12 DESC NULLS LAST], true, 0
            +- AQEShuffleRead coalesced
               +- ShuffleQueryStage 1
                  +- Exchange rangepartitioning(request#12 DESC NULLS LAST, 1000), ENSURE_REQUIREMENTS, [plan_id=98]
                     +- *(2) HashAggregate(keys=[request#12, clientip#11], functions=[count(request#12)], output=[count(request)#8L, request#12, clientip#11], schema specialized)
                        +- AQEShuffleRead coalesced
                           +- ShuffleQueryStage 0
                              +- Exchange hashpartitioning(request#12, clientip#11, 1000), ENSURE_REQUIREMENTS, [plan_id=52]
                                 +- *(1) HashAggregate(keys=[request#12, clientip#11], functions=[partial_count(request#12)], output=[request#12, clientip#11, count#34L], schema specialized)
                                    +- *(1) Project [clientip#11, request#12]
                                       +- FileScan json spark_catalog.default.http_logs[clientip#11,request#12,year#15,month#16,day#17] Batched: false, DataFilters: [], Format: JSON, Location: CatalogFileIndex(1 paths)[s3://..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<clientip:string,request:string>

Physical plan for SQL query (note it uses TakeOrderedAndProject(limit=10), whereas the PPL plan above has a full Sort with no limit):

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(3) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false, true) AS value#31]
   +- MapPartitions org.apache.spark.sql.Dataset$$Lambda$3865/0x00007fa68d23a778@509772ad, obj#30: java.lang.String
      +- DeserializeToObject createexternalrow(staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, cnt#8L, true, false, true), request#12.toString, clientip#11.toString, StructField(cnt,LongType,false), StructField(request,StringType,true), StructField(clientip,StringType,true)), obj#29: org.apache.spark.sql.Row
         +- TakeOrderedAndProject(limit=10, orderBy=[cnt#8L ASC NULLS FIRST], output=[cnt#8L,request#12,clientip#11])
            +- *(2) HashAggregate(keys=[request#12, clientip#11], functions=[count(1)], output=[cnt#8L, request#12, clientip#11], schema specialized)
               +- AQEShuffleRead coalesced
                  +- ShuffleQueryStage 0
                     +- Exchange hashpartitioning(request#12, clientip#11, 1000), ENSURE_REQUIREMENTS, [plan_id=48]
                        +- *(1) HashAggregate(keys=[request#12, clientip#11], functions=[partial_count(1)], output=[request#12, clientip#11, count#33L], schema specialized)
                           +- *(1) Project [clientip#11, request#12]
                              +- FileScan json spark_catalog.default.http_logs[clientip#11,request#12,year#15,month#16,day#17] Batched: false, DataFilters: [], Format: JSON, Location: CatalogFileIndex(1 paths)[s3:..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<clientip:string,request:string>
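
For reference, a plan like the ones above can be printed from a spark-shell session; a minimal sketch, assuming a session that can resolve myglue_test.default.http_logs through the same Glue catalog (the final adaptive plan with isFinalPlan=true only appears after the query has actually run, e.g. in the Spark UI):

// Sketch: print the physical plan Spark picks for the SQL variant.
val df = spark.sql("""
  SELECT count(*) AS cnt, request, clientip
  FROM myglue_test.default.http_logs
  GROUP BY request, clientip
  ORDER BY cnt ASC
  LIMIT 10
""")
df.explain("formatted")   // "formatted" is one of the modes accepted by Dataset.explain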

What is your host/environment?

  • AOS: 2.15
  • EMR-S: emr-7.2.0
  • Flint: 0.5
seankao-az added the bug (Something isn't working) and untriaged labels Aug 29, 2024
penghuo (Collaborator) commented Aug 29, 2024

LIMIT 10 is not the same as Rare. To achieve the desired result, we should use a window function to limit the output to 10 records per group.

SELECT 
    request, 
    clientip, 
    cnt
FROM (
    SELECT 
        request, 
        clientip, 
        COUNT(*) AS cnt, 
        ROW_NUMBER() OVER (PARTITION BY request, clientip ORDER BY cnt ASC) AS rn
    FROM 
        myglue_test.default.http_logs
    GROUP BY 
        request, clientip
) AS RankedLogs
WHERE 
    rn <= 10
ORDER BY 
    cnt ASC;

seankao-az (Collaborator Author) commented:

Fail to analyze query. Cause: [UNSUPPORTED_FEATURE.LATERAL_COLUMN_ALIAS_IN_WINDOW] The feature is not supported: Referencing a lateral column alias `cnt` in window expression "row_number() OVER (PARTITION BY request, clientip ORDER BY lateralAliasReference(cnt) ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)"

Modifying the query to:

SELECT 
    request, 
    clientip, 
    cnt
FROM (
    SELECT 
        request, 
        clientip, 
        COUNT(*) AS cnt, 
        ROW_NUMBER() OVER (PARTITION BY request, clientip ORDER BY COUNT(*) ASC) AS rn
    FROM 
        myglue_test.default.http_logs
    GROUP BY 
        request, clientip
) AS RankedLogs
WHERE 
    rn <= 10
ORDER BY 
    cnt ASC;

results in:

{
	"status": "FAILED",
	"error": "{\"Message\":\"Spark exception. Cause: Job aborted due to stage failure: Total size of serialized results of 12 tasks (1905.8 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)\"}"
}

The physical plan being:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(5) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false, true) AS value#35]
   +- MapPartitions org.apache.spark.sql.Dataset$$Lambda$3910/0x00007f4501250f10@10b46c72, obj#34: java.lang.String
      +- DeserializeToObject createexternalrow(request#15.toString, clientip#14.toString, staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, cnt#8L, true, false, true), StructField(request,StringType,true), StructField(clientip,StringType,true), StructField(cnt,LongType,false)), obj#33: org.apache.spark.sql.Row
         +- *(4) Sort [cnt#8L ASC NULLS FIRST], true, 0
            +- AQEShuffleRead coalesced
               +- ShuffleQueryStage 1
                  +- Exchange rangepartitioning(cnt#8L ASC NULLS FIRST, 1000), ENSURE_REQUIREMENTS, [plan_id=205]
                     +- *(3) Project [request#15, clientip#14, cnt#8L]
                        +- *(3) Filter (rn#9 <= 10)
                           +- Window [row_number() windowspecdefinition(request#15, clientip#14, _w0#21L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#9], [request#15, clientip#14], [_w0#21L ASC NULLS FIRST]
                              +- WindowGroupLimit [request#15, clientip#14], [_w0#21L ASC NULLS FIRST], row_number(), 10, Final
                                 +- *(2) Sort [request#15 ASC NULLS FIRST, clientip#14 ASC NULLS FIRST, _w0#21L ASC NULLS FIRST], false, 0
                                    +- *(2) HashAggregate(keys=[request#15, clientip#14], functions=[count(1)], output=[request#15, clientip#14, cnt#8L, _w0#21L], schema specialized)
                                       +- AQEShuffleRead coalesced
                                          +- ShuffleQueryStage 0
                                             +- Exchange hashpartitioning(request#15, clientip#14, 1000), ENSURE_REQUIREMENTS, [plan_id=97]
                                                +- *(1) HashAggregate(keys=[request#15, clientip#14], functions=[partial_count(1)], output=[request#15, clientip#14, count#37L], schema specialized)
                                                   +- *(1) Project [clientip#14, request#15]
                                                      +- FileScan json spark_catalog.default.http_logs[clientip#14,request#15,year#18,month#19,day#20] Batched: false, DataFilters: [], Format: JSON, Location: CatalogFileIndex(1 paths)[s3:..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<clientip:string,request:string>

Now the plan looks quite different with the window function.

seankao-az (Collaborator Author) commented Aug 29, 2024

Regardless, the same failure can happen with SQL as well, so this is not a PPL rare command issue. It can be resolved by scaling up the driver node or increasing spark.driver.maxResultSize, as the error message suggests.
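
For example, a minimal sketch of raising the limit when the Spark session is built (the 2g value and the app name are illustrative, not tested recommendations):

import org.apache.spark.sql.SparkSession

// Sketch: spark.driver.maxResultSize must be set before the session/context
// is created; size the value to the driver's available memory.
val spark = SparkSession.builder()
  .appName("ppl-rare-repro")                    // arbitrary name for this illustration
  .config("spark.driver.maxResultSize", "2g")
  .getOrCreate()

The same setting can also be passed as --conf spark.driver.maxResultSize=2g to spark-submit, or set to 0 to remove the cap entirely (at the risk of driver OOM).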

seankao-az added the Lang:PPL (Pipe Processing Language support) label Aug 29, 2024
seankao-az moved this from Done to non-issue in PPL Commands Aug 29, 2024
YANG-DB removed the untriaged label Oct 21, 2024
YANG-DB self-assigned this Oct 21, 2024
YANG-DB reopened this Oct 21, 2024
YANG-DB (Member) commented Oct 21, 2024

The new sampletable command will help reduce the amount of scanned data in exchange for precision.
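
Until that is available, a similar precision-for-cost trade-off can be sketched with Spark SQL's built-in TABLESAMPLE; this is only an illustration of the idea, not the PPL sampletable syntax:

// Sketch: run the rare-style aggregation over a 1% sample of the table,
// trading exact counts for a much smaller scan and result size.
val approxRare = spark.sql("""
  SELECT count(*) AS cnt, request, clientip
  FROM myglue_test.default.http_logs TABLESAMPLE (1 PERCENT)
  GROUP BY request, clientip
  ORDER BY cnt ASC
  LIMIT 10
""")
approxRare.show()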

YANG-DB removed the untriaged label Oct 25, 2024
YANG-DB moved this from non-issue to In Progress in PPL Commands Oct 25, 2024
YANG-DB moved this from In Progress to Done in PPL Commands Nov 12, 2024
YANG-DB closed this as completed by moving to Done in PPL Commands Nov 12, 2024