Skip to content

Commit

Permalink
[SPARK-41426][UI] Protobuf serializer for ResourceProfileWrapper
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Add Protobuf serializer for ResourceProfileWrapper

### Why are the changes needed?
Support fast and compact serialization/deserialization for ResourceProfileWrapper over RocksDB.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
New UT

Closes apache#39105 from techaddict/SPARK-41426-ResourceProfileWrapper.

Authored-by: Sandeep Singh <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
  • Loading branch information
techaddict authored and gengliangwang committed Dec 21, 2022
1 parent e23983a commit 7efc6f4
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -239,4 +239,8 @@ message RDDStorageInfo {

message RDDStorageInfoWrapper {
RDDStorageInfo info = 1;
}
}

// Protobuf counterpart of org.apache.spark.status.ResourceProfileWrapper,
// used for compact serialization of resource profile data in the UI store.
message ResourceProfileWrapper {
// The wrapped resource profile info (mirrors the Scala wrapper's rpInfo field).
ResourceProfileInfo rpInfo = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ object ApplicationEnvironmentInfoWrapperSerializer {
builder.build()
}

private def serializeResourceProfileInfo(info: ResourceProfileInfo):
private[status] def serializeResourceProfileInfo(info: ResourceProfileInfo):
StoreTypes.ResourceProfileInfo = {
val builder = StoreTypes.ResourceProfileInfo.newBuilder()
builder.setId(info.id)
Expand All @@ -118,7 +118,7 @@ object ApplicationEnvironmentInfoWrapperSerializer {
builder.build()
}

private def deserializeResourceProfileInfo(info: StoreTypes.ResourceProfileInfo):
private[status] def deserializeResourceProfileInfo(info: StoreTypes.ResourceProfileInfo):
ResourceProfileInfo = {

new ResourceProfileInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ private[spark] class KVStoreProtobufSerializer extends KVStoreScalaSerializer {
case a: ApplicationInfoWrapper => ApplicationInfoWrapperSerializer.serialize(a)
case r: RDDStorageInfoWrapper =>
RDDStorageInfoWrapperSerializer.serialize(r)
case r: ResourceProfileWrapper => ResourceProfileWrapperSerializer.serialize(r)
case other => super.serialize(other)
}

Expand All @@ -47,6 +48,8 @@ private[spark] class KVStoreProtobufSerializer extends KVStoreScalaSerializer {
ApplicationInfoWrapperSerializer.deserialize(data).asInstanceOf[T]
case _ if classOf[RDDStorageInfoWrapper].isAssignableFrom(klass) =>
RDDStorageInfoWrapperSerializer.deserialize(data).asInstanceOf[T]
case _ if classOf[ResourceProfileWrapper].isAssignableFrom(klass) =>
ResourceProfileWrapperSerializer.deserialize(data).asInstanceOf[T]
case other => super.deserialize(data, klass)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.status.protobuf

import org.apache.spark.status.ResourceProfileWrapper
import org.apache.spark.status.protobuf.ApplicationEnvironmentInfoWrapperSerializer.{deserializeResourceProfileInfo, serializeResourceProfileInfo}

/**
 * Converts [[ResourceProfileWrapper]] instances to and from their protobuf
 * wire representation for storage in the UI KVStore.
 *
 * Delegates the nested `ResourceProfileInfo` conversion to the shared
 * helpers on [[ApplicationEnvironmentInfoWrapperSerializer]].
 */
object ResourceProfileWrapperSerializer {

  /** Encodes the wrapper's resource profile info into protobuf bytes. */
  def serialize(input: ResourceProfileWrapper): Array[Byte] = {
    StoreTypes.ResourceProfileWrapper.newBuilder()
      .setRpInfo(serializeResourceProfileInfo(input.rpInfo))
      .build()
      .toByteArray
  }

  /** Decodes bytes produced by [[serialize]] back into a wrapper. */
  def deserialize(bytes: Array[Byte]): ResourceProfileWrapper = {
    val proto = StoreTypes.ResourceProfileWrapper.parseFrom(bytes)
    new ResourceProfileWrapper(
      rpInfo = deserializeResourceProfileInfo(proto.getRpInfo))
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -458,4 +458,46 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
}
}
}

test("Resource Profile") {
  // Round-trip a ResourceProfileWrapper through the protobuf serializer and
  // verify every field survives unchanged.
  val input = new ResourceProfileWrapper(
    rpInfo = new ResourceProfileInfo(
      id = 0,
      executorResources = Map(
        "0" -> new ExecutorResourceRequest(
          resourceName = "exec1",
          amount = 64,
          discoveryScript = "script0",
          vendor = "apache_2"),
        "1" -> new ExecutorResourceRequest(
          resourceName = "exec2",
          amount = 65,
          discoveryScript = "script1",
          vendor = "apache_1")
      ),
      taskResources = Map(
        "0" -> new TaskResourceRequest(resourceName = "exec1", amount = 1),
        "1" -> new TaskResourceRequest(resourceName = "exec2", amount = 1)
      )
    )
  )

  val bytes = serializer.serialize(input)
  val result = serializer.deserialize(bytes, classOf[ResourceProfileWrapper])
  assert(result.rpInfo.id == input.rpInfo.id)
  // Iterate the INPUT's entries (the original iterated the result's own keys
  // and asserted the result contained them — trivially true). Checking every
  // input entry is present and equal in the result, together with the size
  // check, establishes the two maps are identical.
  assert(result.rpInfo.executorResources.size == input.rpInfo.executorResources.size)
  input.rpInfo.executorResources.foreach { case (k, v) =>
    assert(result.rpInfo.executorResources.get(k).contains(v))
  }
  assert(result.rpInfo.taskResources.size == input.rpInfo.taskResources.size)
  input.rpInfo.taskResources.foreach { case (k, v) =>
    assert(result.rpInfo.taskResources.get(k).contains(v))
  }
}
}

0 comments on commit 7efc6f4

Please sign in to comment.