Skip to content

Commit

Permalink
chore: Scala client for certified events (#2045)
Browse files Browse the repository at this point in the history
* Scala client for certified events

* Fixing bug, where I was extracting region and pvi environment details from hadoop configuration. Changed it to retrieve rather from spark configuration

* This state represents code that works in notebook after porting section of the codes separately in Edog.

* Token expiry check, removing unused imports

* Creating JWT Token Parser. Removing dependencies that are no longer needed for token parsing.

* Restoring resthelpers.scala to prior state. Adding exception handling and few PR comments.

* 1) Restricting access level to class properties, and functions. 2) Cleaning unused imports. 3) Closing unused resources like file handler, etc. 4) And fixing few scala style checks like calling convention for 0 parameter func, etc.

* Refactoring to support single responsibility as much as possible and adding tests

* Checking an empty http response content, before parsing

* Fixing the early http client termination. At this point we are successfully emitting telemetry.

* Fixing typos

* Fixing test failure.

* At this point addressing just pr comments.

* Addressing PR comments

* Removing token used for test and created a dummy token creator.

* Adding abstract classes to represent certified event payload, adding test for it, addressing PR comments.

* Turning code immutable as much as possible and removing few tests and classes that were replaced by alternative approach.

* Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/Usage/UsageUtils.scala

Co-authored-by: Mark Hamilton <[email protected]>

* Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/Usage/UsageUtils.scala

Co-authored-by: Mark Hamilton <[email protected]>

* Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/Usage/TokenUtils.scala

Co-authored-by: Mark Hamilton <[email protected]>

* Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/Usage/TokenUtils.scala

Co-authored-by: Mark Hamilton <[email protected]>

* Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/Usage/TokenUtils.scala

Co-authored-by: Mark Hamilton <[email protected]>

* Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/Usage/HostEndpointUtils.scala

Co-authored-by: Mark Hamilton <[email protected]>

* Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/Usage/FabricTokenServiceClient.scala

Co-authored-by: Mark Hamilton <[email protected]>

* porting some api realted to web calls to WebUtils.scala, making few calls succinct, addressing potential null exceptions, etc.

* Fixing build error that was partly introduced from missing to remove uavailable reference and partly syncing to remote.

* refactoring to change few object into traits, and addressing few PR comments

* changing the case of certified event activity name

* Removing some class and associated test. Turning some variable lazy.

* Changing some parameter to Option and then removing unused imports

* 1) Removing token extraction via fabric token service.

* some cleanup

* Removing token caching.

* neaten PR

* Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/fabric/CertifiedEventClient.scala

* Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/fabric/CertifiedEventClient.scala

* add futures

* Adding Fabric environment check and using it to decide emitting certified event

* Adding platform check before emitting CE. Turning calls to log CE asynchronous.

* Modifying logic to determine if platform is Fabric only if it is Synapse internal

* Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/common/PlatformDetails.scala

* Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/common/PlatformDetails.scala

* Apply suggestions from code review

* Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/fabric/CertifiedEventClient.scala

* Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/fabric/CertifiedEventClient.scala

* Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/fabric/CertifiedEventClient.scala

* Update .gitignore

* Update tools/docgen/docgen/manifest.yaml

* Update environment.yml

* Update environment.yml

---------

Co-authored-by: Mark Hamilton <[email protected]>
Co-authored-by: Mark Hamilton <[email protected]>
  • Loading branch information
3 people authored Oct 10, 2023
1 parent 21cff04 commit 9a4800c
Show file tree
Hide file tree
Showing 5 changed files with 217 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ package com.microsoft.azure.synapse.ml.logging

import com.microsoft.azure.synapse.ml.build.BuildInfo
import com.microsoft.azure.synapse.ml.logging.common.SASScrubber
import com.microsoft.azure.synapse.ml.logging.fabric.CertifiedEventClient.logToCertifiedEvents
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import spray.json.DefaultJsonProtocol._
import spray.json._

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

case class RequiredLogFields(uid: String,
className: String,
Expand Down Expand Up @@ -110,16 +113,29 @@ trait SynapseMLLogging extends Logging {

protected def logBase(methodName: String,
numCols: Option[Int],
executionSeconds: Option[Double]
executionSeconds: Option[Double],
logCertifiedEvent: Boolean = false
): Unit = {
logBase(getPayload(
methodName,
numCols,
executionSeconds,
None))
None), logCertifiedEvent)
}

protected def logBase(info: Map[String, String]): Unit = {
protected def logBase(info: Map[String, String], logCertifiedEvent: Boolean): Unit = {
if (logCertifiedEvent) {
Future {
logToCertifiedEvents(
info("libraryName"),
info("method"),
info -- Seq("libraryName", "method")
)
}.failed.map {
case e: Exception => logErrorBase("certifiedEventLogging", e)
}
}

logInfo(info.toJson.compactPrint)
}

Expand All @@ -130,23 +146,23 @@ trait SynapseMLLogging extends Logging {
}

def logClass(): Unit = {
logBase("constructor", None, None)
logBase("constructor", None, None, true)
}

def logFit[T](f: => T, columns: Int): T = {
logVerb("fit", f, Some(columns))
def logFit[T](f: => T, columns: Int, logCertifiedEvent: Boolean = true): T = {
logVerb("fit", f, columns, logCertifiedEvent)
}

def logTransform[T](f: => T, columns: Int): T = {
logVerb("transform", f, Some(columns))
def logTransform[T](f: => T, columns: Int, logCertifiedEvent: Boolean = true): T = {
logVerb("transform", f, columns, logCertifiedEvent)
}

def logVerb[T](verb: String, f: => T, columns: Option[Int] = None): T = {
def logVerb[T](verb: String, f: => T, columns: Int = -1, logCertifiedEvent: Boolean = false): T = {
val startTime = System.nanoTime()
try {
val ret = f
logBase(verb, columns, Some((System.nanoTime() - startTime) / 1e9))
ret
// Begin emitting certified event.
logBase(verb, Some(columns), Some((System.nanoTime() - startTime) / 1e9), logCertifiedEvent)
f
} catch {
case e: Exception =>
logErrorBase(verb, e)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (C) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See LICENSE in project root for information.

package com.microsoft.azure.synapse.ml.logging.common

import org.apache.spark.sql.SparkSession

object PlatformDetails {
val PlatformSynapseInternal = "synapse_internal"
val PlatformSynapse = "synapse"
val PlatformBinder = "binder"
val PlatformDatabricks = "databricks"
val PlatformUnknown = "unknown"
val SynapseProjectName = "Microsoft.ProjectArcadia"

def currentPlatform(): String = {
val azureService = sys.env.get("AZURE_SERVICE")
azureService match {
case Some(serviceName) if serviceName == SynapseProjectName =>
val spark = SparkSession.builder.getOrCreate()
val clusterType = spark.conf.get("spark.cluster.type")
if (clusterType == "synapse") PlatformSynapse else PlatformSynapseInternal
case _ if new java.io.File("/dbfs").exists() => PlatformDatabricks
case _ if sys.env.get("BINDER_LAUNCH_HOST").isDefined => PlatformBinder
case _ => PlatformUnknown
}
}

def runningOnSynapseInternal(): Boolean = currentPlatform() == PlatformSynapseInternal

def runningOnSynapse(): Boolean = currentPlatform() == PlatformSynapse

def runningOnFabric(): Boolean = runningOnSynapseInternal
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright (C) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See LICENSE in project root for information.

package com.microsoft.azure.synapse.ml.logging.fabric

import org.apache.spark.sql.SparkSession
import spray.json.DefaultJsonProtocol.{StringJsonFormat, _}
import spray.json._

import java.time.Instant
import java.util.UUID
import scala.reflect.runtime.currentMirror
import scala.reflect.runtime.universe._

import com.microsoft.azure.synapse.ml.logging.common.PlatformDetails.runningOnFabric

object CertifiedEventClient extends RESTUtils {

private val PbiGlobalServiceEndpoints = Map(
"public" -> "https://api.powerbi.com/",
"fairfax" -> "https://api.powerbigov.us",
"mooncake" -> "https://api.powerbi.cn",
"blackforest" -> "https://app.powerbi.de",
"msit" -> "https://api.powerbi.com/",
"prod" -> "https://api.powerbi.com/",
"int3" -> "https://biazure-int-edog-redirect.analysis-df.windows.net/",
"dxt" -> "https://powerbistagingapi.analysis.windows.net/",
"edog" -> "https://biazure-int-edog-redirect.analysis-df.windows.net/",
"dev" -> "https://onebox-redirect.analysis.windows-int.net/",
"console" -> "http://localhost:5001/",
"daily" -> "https://dailyapi.powerbi.com/")


private lazy val CertifiedEventUri = getCertifiedEventUri

private def getAccessToken: String = {
val objectName = "com.microsoft.azure.trident.tokenlibrary.TokenLibrary"
val mirror = currentMirror
val module = mirror.staticModule(objectName)
val obj = mirror.reflectModule(module).instance
val objType = mirror.reflect(obj).symbol.toType
val methodName = "getAccessToken"
val methodSymbols = objType.decl(TermName(methodName)).asTerm.alternatives
val argType = typeOf[String]
val selectedMethodSymbol = methodSymbols.find { m =>
m.asMethod.paramLists match {
case List(List(param)) => param.typeSignature =:= argType
case _ => false
}
}.getOrElse(throw new NoSuchMethodException(s"Method $methodName with argument type $argType not found"))
val methodMirror = mirror.reflect(obj).reflectMethod(selectedMethodSymbol.asMethod)
methodMirror("pbi").asInstanceOf[String]
}

private def getHeaders: Map[String, String] = {
Map(
"Authorization" -> s"Bearer $getAccessToken",
"RequestId" -> UUID.randomUUID().toString,
"Content-Type" -> "application/json",
"x-ms-workload-resource-moniker" -> UUID.randomUUID().toString
)
}

private def getCertifiedEventUri: String = {
val sc = SparkSession.builder().getOrCreate().sparkContext
val workspaceId = sc.hadoopConfiguration.get("trident.artifact.workspace.id")
val capacityId = sc.hadoopConfiguration.get("trident.capacity.id")
val pbiEnv = sc.getConf.get("spark.trident.pbienv").toLowerCase()

val clusterDetailUrl = s"${PbiGlobalServiceEndpoints(pbiEnv)}powerbi/globalservice/v201606/clusterDetails"
val headers = getHeaders

val clusterUrl = usageGet(clusterDetailUrl, headers)
.asJsObject.fields("clusterUrl").convertTo[String]
val tokenUrl: String = s"$clusterUrl/metadata/v201606/generatemwctokenv2"

val payload =
s"""{
|"capacityObjectId": "$capacityId",
|"workspaceObjectId": "$workspaceId",
|"workloadType": "ML"
|}""".stripMargin


val host = usagePost(tokenUrl, payload, headers)
.asJsObject.fields("TargetUriHost").convertTo[String]

s"https://$host/webapi/Capacities/$capacityId/workloads/ML/MLAdmin/Automatic/workspaceid/$workspaceId/telemetry"
}


private[ml] def logToCertifiedEvents(featureName: String,
activityName: String,
attributes: Map[String, String]): Unit = {

if (runningOnFabric) {
val payload =
s"""{
|"timestamp":${Instant.now().getEpochSecond},
|"feature_name":"$featureName",
|"activity_name":"$activityName",
|"attributes":${attributes.toJson.compactPrint}
|}""".stripMargin

usagePost(CertifiedEventUri, payload, getHeaders)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (C) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See LICENSE in project root for information.

package com.microsoft.azure.synapse.ml.logging.fabric

import com.microsoft.azure.synapse.ml.io.http.RESTHelpers
import org.apache.commons.io.IOUtils
import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet, HttpPost}
import org.apache.http.entity.StringEntity
import spray.json.{JsObject, JsValue, _}

trait RESTUtils {
def usagePost(url: String, body: String, headers: Map[String, String]): JsValue = {
val request = new HttpPost(url)

for ((k, v) <- headers)
request.addHeader(k, v)

request.setEntity(new StringEntity(body))

val response = RESTHelpers.safeSend(request, close = false)
val parsedResponse = parseResponse(response)
response.close()
parsedResponse
}

def usageGet(url: String, headers: Map[String, String]): JsValue = {
val request = new HttpGet(url)
for ((k, v) <- headers)
request.addHeader(k, v)

val response = RESTHelpers.safeSend(request, close = false)
val result = parseResponse(response)
response.close()
result
}

private def parseResponse(response: CloseableHttpResponse): JsValue = {
val content: String = IOUtils.toString(response.getEntity.getContent, "utf-8")
if (content.nonEmpty) {
content.parseJson
} else {
JsObject()
}
}

}
1 change: 0 additions & 1 deletion environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,3 @@ dependencies:
- pypandoc
- markdownify
- traitlets

0 comments on commit 9a4800c

Please sign in to comment.