diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 9752c6491cf7b..d406add82e4bc 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -239,4 +239,8 @@ message RDDStorageInfo {
 
 message RDDStorageInfoWrapper {
   RDDStorageInfo info = 1;
-}
\ No newline at end of file
+}
+
+message ResourceProfileWrapper {
+  ResourceProfileInfo rpInfo = 1;
+}
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
index 1649200b94a18..b83888c3a5f8d 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
@@ -97,7 +97,7 @@ object ApplicationEnvironmentInfoWrapperSerializer {
     builder.build()
   }
 
-  private def serializeResourceProfileInfo(info: ResourceProfileInfo):
+  private[status] def serializeResourceProfileInfo(info: ResourceProfileInfo):
     StoreTypes.ResourceProfileInfo = {
     val builder = StoreTypes.ResourceProfileInfo.newBuilder()
     builder.setId(info.id)
@@ -118,7 +118,7 @@ object ApplicationEnvironmentInfoWrapperSerializer {
     builder.build()
   }
 
-  private def deserializeResourceProfileInfo(info: StoreTypes.ResourceProfileInfo):
+  private[status] def deserializeResourceProfileInfo(info: StoreTypes.ResourceProfileInfo):
     ResourceProfileInfo = {
     new ResourceProfileInfo(
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializer.scala
index 3905a7ea4aa32..c71ab941035d1 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializer.scala
@@ -31,6 +31,7 @@ private[spark] class KVStoreProtobufSerializer extends KVStoreScalaSerializer {
     case a: ApplicationInfoWrapper => ApplicationInfoWrapperSerializer.serialize(a)
     case r: RDDStorageInfoWrapper => RDDStorageInfoWrapperSerializer.serialize(r)
+    case r: ResourceProfileWrapper => ResourceProfileWrapperSerializer.serialize(r)
     case other => super.serialize(other)
   }
 
@@ -47,6 +48,8 @@ private[spark] class KVStoreProtobufSerializer extends KVStoreScalaSerializer {
       ApplicationInfoWrapperSerializer.deserialize(data).asInstanceOf[T]
     case _ if classOf[RDDStorageInfoWrapper].isAssignableFrom(klass) =>
       RDDStorageInfoWrapperSerializer.deserialize(data).asInstanceOf[T]
+    case _ if classOf[ResourceProfileWrapper].isAssignableFrom(klass) =>
+      ResourceProfileWrapperSerializer.deserialize(data).asInstanceOf[T]
     case other => super.deserialize(data, klass)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/ResourceProfileWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/ResourceProfileWrapperSerializer.scala
new file mode 100644
index 0000000000000..fed59a8f30d75
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/ResourceProfileWrapperSerializer.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf
+
+import org.apache.spark.status.ResourceProfileWrapper
+import org.apache.spark.status.protobuf.ApplicationEnvironmentInfoWrapperSerializer.{deserializeResourceProfileInfo, serializeResourceProfileInfo}
+
+object ResourceProfileWrapperSerializer {
+  def serialize(input: ResourceProfileWrapper): Array[Byte] = {
+    val builder = StoreTypes.ResourceProfileWrapper.newBuilder()
+    builder.setRpInfo(serializeResourceProfileInfo(input.rpInfo))
+    builder.build().toByteArray
+  }
+
+  def deserialize(bytes: Array[Byte]): ResourceProfileWrapper = {
+    val wrapper = StoreTypes.ResourceProfileWrapper.parseFrom(bytes)
+    new ResourceProfileWrapper(
+      rpInfo = deserializeResourceProfileInfo(wrapper.getRpInfo)
+    )
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
index 35c30198a6c7a..996a4f0d70084 100644
--- a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
@@ -458,4 +458,46 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
       }
     }
   }
+
+  test("Resource Profile") {
+    val input = new ResourceProfileWrapper(
+      rpInfo = new ResourceProfileInfo(
+        id = 0,
+        executorResources = Map(
+          "0" -> new ExecutorResourceRequest(
+            resourceName = "exec1",
+            amount = 64,
+            discoveryScript = "script0",
+            vendor = "apache_2"),
+          "1" -> new ExecutorResourceRequest(
+            resourceName = "exec2",
+            amount = 65,
+            discoveryScript = "script1",
+            vendor = "apache_1")
+        ),
+        taskResources = Map(
+          "0" -> new TaskResourceRequest(resourceName = "exec1", amount = 1),
+          "1" -> new TaskResourceRequest(resourceName = "exec2", amount = 1)
+        )
+      )
+    )
+
+    val bytes = serializer.serialize(input)
+    val result = serializer.deserialize(bytes, classOf[ResourceProfileWrapper])
+    assert(result.rpInfo.id == input.rpInfo.id)
+    assert(result.rpInfo.executorResources.size == input.rpInfo.executorResources.size)
+    assert(result.rpInfo.executorResources.keys.size == input.rpInfo.executorResources.keys.size)
+    result.rpInfo.executorResources.keysIterator.foreach { k =>
+      assert(result.rpInfo.executorResources.contains(k))
+      assert(input.rpInfo.executorResources.contains(k))
+      assert(result.rpInfo.executorResources(k) == input.rpInfo.executorResources(k))
+    }
+    assert(result.rpInfo.taskResources.size == input.rpInfo.taskResources.size)
+    assert(result.rpInfo.taskResources.keys.size == input.rpInfo.taskResources.keys.size)
+    result.rpInfo.taskResources.keysIterator.foreach { k =>
+      assert(result.rpInfo.taskResources.contains(k))
+      assert(input.rpInfo.taskResources.contains(k))
+      assert(result.rpInfo.taskResources(k) == input.rpInfo.taskResources(k))
+    }
+  }
 }
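
For reference, a minimal round-trip sketch of the new ResourceProfileWrapper path through KVStoreProtobufSerializer (not part of the patch). The import locations are assumed from the surrounding Spark codebase, and the constructor shapes mirror the test above.

// Sketch only, under the assumptions stated above.
import org.apache.spark.resource.{ExecutorResourceRequest, TaskResourceRequest}
import org.apache.spark.status.ResourceProfileWrapper
import org.apache.spark.status.api.v1.ResourceProfileInfo
import org.apache.spark.status.protobuf.KVStoreProtobufSerializer

val serializer = new KVStoreProtobufSerializer()

// Build a wrapper with one executor resource and one task resource
// (resource name, script and vendor values here are hypothetical).
val wrapper = new ResourceProfileWrapper(
  rpInfo = new ResourceProfileInfo(
    id = 0,
    executorResources = Map(
      "gpu" -> new ExecutorResourceRequest(
        resourceName = "gpu",
        amount = 1,
        discoveryScript = "discover.sh",
        vendor = "nvidia")),
    taskResources = Map(
      "gpu" -> new TaskResourceRequest(resourceName = "gpu", amount = 1))))

// serialize() pattern-matches on the runtime type and routes the wrapper through
// ResourceProfileWrapperSerializer; deserialize() dispatches on the requested class.
val bytes: Array[Byte] = serializer.serialize(wrapper)
val restored = serializer.deserialize(bytes, classOf[ResourceProfileWrapper])
assert(restored.rpInfo.id == wrapper.rpInfo.id)
assert(restored.rpInfo.executorResources == wrapper.rpInfo.executorResources)
assert(restored.rpInfo.taskResources == wrapper.rpInfo.taskResources)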