Skip to content

Commit

Permalink
feat: OTel rewrite w/o unliftio
Browse files Browse the repository at this point in the history
  • Loading branch information
develop7 committed Feb 12, 2024
1 parent 23737b3 commit 6b891c2
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 54 deletions.
55 changes: 28 additions & 27 deletions src/PostgREST/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ import qualified Data.List as L
import qualified Network.HTTP.Types as HTTP
import qualified Network.Socket as NS
import OpenTelemetry.Instrumentation.Wai (newOpenTelemetryWaiMiddleware)
import OpenTelemetry.Trace (defaultSpanArguments)
import PostgREST.OpenTelemetry (inSpan)
import Protolude hiding (Handler)
import System.TimeIt (timeItT)
import OpenTelemetry.Trace (inSpan, defaultSpanArguments)

type Handler = ExceptT Error

Expand Down Expand Up @@ -109,7 +110,7 @@ postgrest conf appState connWorker =
Logger.middleware (configLogLevel conf) $
-- fromJust can be used, because the auth middleware will **always** add
-- some AuthResult to the vault.
\req respond -> inSpan (getOTelTracer appState) "PostgREST.postgrest" defaultSpanArguments $
\req respond -> inSpan (getOTelTracer appState) "PostgREST.postgrest" defaultSpanArguments $
case fromJust $ Auth.getResult req of
Left err -> respond $ Error.errorResponseFor err
Right authResult -> do
Expand Down Expand Up @@ -174,54 +175,54 @@ handleRequest :: AuthResult -> AppConfig -> AppState.AppState -> Bool -> Bool ->
handleRequest AuthResult{..} conf appState authenticated prepared pgVer apiReq@ApiRequest{..} sCache jwtTime parseTime =
case (iAction, iTarget) of
(ActionRead headersOnly, TargetIdent identifier) -> do
(planTime', wrPlan) <- withOTel "" $ withTiming $ liftEither $ Plan.wrappedReadPlan identifier conf sCache apiReq
(txTime', resultSet) <- withTiming $ runQuery roleIsoLvl Nothing (Plan.wrTxMode wrPlan) $ Query.readQuery wrPlan conf apiReq
(respTime', pgrst) <- withTiming $ liftEither $ Response.readResponse wrPlan headersOnly identifier apiReq resultSet
(planTime', wrPlan) <- withOTel "plan" $ withTiming $ liftEither $ Plan.wrappedReadPlan identifier conf sCache apiReq
(txTime', resultSet) <- withOTel "query" $ withTiming $ runQuery roleIsoLvl Nothing (Plan.wrTxMode wrPlan) $ Query.readQuery wrPlan conf apiReq
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.readResponse wrPlan headersOnly identifier apiReq resultSet
return $ pgrstResponse (ServerTiming jwtTime parseTime planTime' txTime' respTime') pgrst

(ActionMutate MutationCreate, TargetIdent identifier) -> do
(planTime', mrPlan) <- withTiming $ liftEither $ Plan.mutateReadPlan MutationCreate apiReq identifier conf sCache
(txTime', resultSet) <- withTiming $ runQuery roleIsoLvl Nothing (Plan.mrTxMode mrPlan) $ Query.createQuery mrPlan apiReq conf
(respTime', pgrst) <- withTiming $ liftEither $ Response.createResponse identifier mrPlan apiReq resultSet
(planTime', mrPlan) <- withOTel "plan" $ withTiming $ liftEither $ Plan.mutateReadPlan MutationCreate apiReq identifier conf sCache
(txTime', resultSet) <- withOTel "query" $ withTiming $ runQuery roleIsoLvl Nothing (Plan.mrTxMode mrPlan) $ Query.createQuery mrPlan apiReq conf
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.createResponse identifier mrPlan apiReq resultSet
return $ pgrstResponse (ServerTiming jwtTime parseTime planTime' txTime' respTime') pgrst

(ActionMutate MutationUpdate, TargetIdent identifier) -> do
(planTime', mrPlan) <- withTiming $ liftEither $ Plan.mutateReadPlan MutationUpdate apiReq identifier conf sCache
(txTime', resultSet) <- withTiming $ runQuery roleIsoLvl Nothing (Plan.mrTxMode mrPlan) $ Query.updateQuery mrPlan apiReq conf
(respTime', pgrst) <- withTiming $ liftEither $ Response.updateResponse mrPlan apiReq resultSet
(planTime', mrPlan) <- withOTel "plan" $ withTiming $ liftEither $ Plan.mutateReadPlan MutationUpdate apiReq identifier conf sCache
(txTime', resultSet) <- withOTel "query" $ withTiming $ runQuery roleIsoLvl Nothing (Plan.mrTxMode mrPlan) $ Query.updateQuery mrPlan apiReq conf
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.updateResponse mrPlan apiReq resultSet
return $ pgrstResponse (ServerTiming jwtTime parseTime planTime' txTime' respTime') pgrst

(ActionMutate MutationSingleUpsert, TargetIdent identifier) -> do
(planTime', mrPlan) <- withTiming $ liftEither $ Plan.mutateReadPlan MutationSingleUpsert apiReq identifier conf sCache
(txTime', resultSet) <- withTiming $ runQuery roleIsoLvl Nothing (Plan.mrTxMode mrPlan) $ Query.singleUpsertQuery mrPlan apiReq conf
(respTime', pgrst) <- withTiming $ liftEither $ Response.singleUpsertResponse mrPlan apiReq resultSet
(planTime', mrPlan) <- withOTel "plan" $ withTiming $ liftEither $ Plan.mutateReadPlan MutationSingleUpsert apiReq identifier conf sCache
(txTime', resultSet) <- withOTel "query" $ withTiming $ runQuery roleIsoLvl Nothing (Plan.mrTxMode mrPlan) $ Query.singleUpsertQuery mrPlan apiReq conf
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.singleUpsertResponse mrPlan apiReq resultSet
return $ pgrstResponse (ServerTiming jwtTime parseTime planTime' txTime' respTime') pgrst

(ActionMutate MutationDelete, TargetIdent identifier) -> do
(planTime', mrPlan) <- withTiming $ liftEither $ Plan.mutateReadPlan MutationDelete apiReq identifier conf sCache
(txTime', resultSet) <- withTiming $ runQuery roleIsoLvl Nothing (Plan.mrTxMode mrPlan) $ Query.deleteQuery mrPlan apiReq conf
(respTime', pgrst) <- withTiming $ liftEither $ Response.deleteResponse mrPlan apiReq resultSet
(planTime', mrPlan) <- withOTel "plan" $ withTiming $ liftEither $ Plan.mutateReadPlan MutationDelete apiReq identifier conf sCache
(txTime', resultSet) <- withOTel "query" $ withTiming $ runQuery roleIsoLvl Nothing (Plan.mrTxMode mrPlan) $ Query.deleteQuery mrPlan apiReq conf
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.deleteResponse mrPlan apiReq resultSet
return $ pgrstResponse (ServerTiming jwtTime parseTime planTime' txTime' respTime') pgrst

(ActionInvoke invMethod, TargetProc identifier _) -> do
(planTime', cPlan) <- withTiming $ liftEither $ Plan.callReadPlan identifier conf sCache apiReq invMethod
(txTime', resultSet) <- withTiming $ runQuery (fromMaybe roleIsoLvl $ pdIsoLvl (Plan.crProc cPlan)) (pdTimeout $ Plan.crProc cPlan) (Plan.crTxMode cPlan) $ Query.invokeQuery (Plan.crProc cPlan) cPlan apiReq conf pgVer
(respTime', pgrst) <- withTiming $ liftEither $ Response.invokeResponse cPlan invMethod (Plan.crProc cPlan) apiReq resultSet
(planTime', cPlan) <- withOTel "plan" $ withTiming $ liftEither $ Plan.callReadPlan identifier conf sCache apiReq invMethod
(txTime', resultSet) <- withOTel "query" $ withTiming $ runQuery (fromMaybe roleIsoLvl $ pdIsoLvl (Plan.crProc cPlan)) (pdTimeout $ Plan.crProc cPlan) (Plan.crTxMode cPlan) $ Query.invokeQuery (Plan.crProc cPlan) cPlan apiReq conf pgVer
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.invokeResponse cPlan invMethod (Plan.crProc cPlan) apiReq resultSet
return $ pgrstResponse (ServerTiming jwtTime parseTime planTime' txTime' respTime') pgrst

(ActionInspect headersOnly, TargetDefaultSpec tSchema) -> do
(planTime', iPlan) <- withTiming $ liftEither $ Plan.inspectPlan apiReq
(txTime', oaiResult) <- withTiming $ runQuery roleIsoLvl Nothing (Plan.ipTxmode iPlan) $ Query.openApiQuery sCache pgVer conf tSchema
(respTime', pgrst) <- withTiming $ liftEither $ Response.openApiResponse (T.decodeUtf8 prettyVersion, docsVersion) headersOnly oaiResult conf sCache iSchema iNegotiatedByProfile
(planTime', iPlan) <- withOTel "plan" $ withTiming $ liftEither $ Plan.inspectPlan apiReq
(txTime', oaiResult) <- withOTel "query" $ withTiming $ runQuery roleIsoLvl Nothing (Plan.ipTxmode iPlan) $ Query.openApiQuery sCache pgVer conf tSchema
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.openApiResponse (T.decodeUtf8 prettyVersion, docsVersion) headersOnly oaiResult conf sCache iSchema iNegotiatedByProfile
return $ pgrstResponse (ServerTiming jwtTime parseTime planTime' txTime' respTime') pgrst

(ActionInfo, TargetIdent identifier) -> do
(respTime', pgrst) <- withTiming $ liftEither $ Response.infoIdentResponse identifier sCache
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.infoIdentResponse identifier sCache
return $ pgrstResponse (ServerTiming jwtTime parseTime Nothing Nothing respTime') pgrst

(ActionInfo, TargetProc identifier _) -> do
(planTime', cPlan) <- withTiming $ liftEither $ Plan.callReadPlan identifier conf sCache apiReq ApiRequest.InvHead
(respTime', pgrst) <- withTiming $ liftEither $ Response.infoProcResponse (Plan.crProc cPlan)
(planTime', cPlan) <- withOTel "plan" $ withTiming $ liftEither $ Plan.callReadPlan identifier conf sCache apiReq ApiRequest.InvHead
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.infoProcResponse (Plan.crProc cPlan)
return $ pgrstResponse (ServerTiming jwtTime parseTime planTime' Nothing respTime') pgrst

(ActionInfo, TargetDefaultSpec _) -> do
Expand All @@ -246,7 +247,7 @@ handleRequest AuthResult{..} conf appState authenticated prepared pgVer apiReq@A

withTiming = calcTiming $ configServerTimingEnabled conf

withOTel label = inSpan (getOTelTracer appState) label defaultSpanArguments
withOTel label = inSpan (getOTelTracer appState) label defaultSpanArguments

calcTiming :: Bool -> Handler IO a -> Handler IO (Maybe Double, a)
calcTiming timingEnabled f = if timingEnabled
Expand Down
2 changes: 1 addition & 1 deletion src/PostgREST/AppState.hs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ getSocketREST = stateSocketREST
getSocketAdmin :: AppState -> Maybe NS.Socket
getSocketAdmin = stateSocketAdmin

getOTelTracer :: AppState -> Tracer
getOTelTracer :: AppState -> Tracer
getOTelTracer = oTelTracer

-- | Log to stderr with local time
Expand Down
60 changes: 34 additions & 26 deletions src/PostgREST/OpenTelemetry.hs
Original file line number Diff line number Diff line change
@@ -1,20 +1,34 @@
{-# LANGUAGE ScopedTypeVariables, OverloadedLists #-}
module PostgREST.OpenTelemetry (withTracer) where

import OpenTelemetry.Trace (InstrumentationLibrary (..), Tracer,
initializeGlobalTracerProvider,
makeTracer, shutdownTracerProvider,
tracerOptions, SpanArguments (..), Span, createSpanWithoutCallStack, setStatus, SpanStatus (..), recordException, endSpan)
import PostgREST.Version (prettyVersion)
import qualified Data.HashMap.Strict as H
import Protolude
import OpenTelemetry.Attributes (Attribute, ToAttribute (..))
import qualified Data.Text as T
import Data.String (String)
import GHC.Stack
import OpenTelemetry.Context.ThreadLocal (getContext, adjustContext)
import OpenTelemetry.Context (insertSpan, lookupSpan, removeSpan)
import qualified Control.Exception as EUnsafe
{-# LANGUAGE OverloadedLists #-}
{-# LANGUAGE ScopedTypeVariables #-}
module PostgREST.OpenTelemetry (withTracer, inSpan, inSpan', inSpan'') where

import qualified Control.Exception as EUnsafe
import qualified Data.HashMap.Strict as H
import qualified Data.Text as T
import GHC.Base (String)
import GHC.Stack (SrcLoc (..))
import OpenTelemetry.Attributes (ToAttribute (..))
import OpenTelemetry.Context (insertSpan,
lookupSpan,
removeSpan)
import OpenTelemetry.Context.ThreadLocal (adjustContext,
getContext)
import OpenTelemetry.Trace (Attribute,
InstrumentationLibrary (..),
Span,
SpanArguments (..),
SpanStatus (..),
Tracer,
createSpanWithoutCallStack,
endSpan,
initializeGlobalTracerProvider,
makeTracer,
recordException,
setStatus,
shutdownTracerProvider,
tracerOptions)
import PostgREST.Version (prettyVersion)
import Protolude


withTracer :: Text -> (Tracer -> IO c) -> IO c
Expand All @@ -26,16 +40,10 @@ withTracer label f = bracket
instrumentationLibrary = InstrumentationLibrary {libraryName = label, libraryVersion = decodeUtf8 prettyVersion}


ownCodeAttributes :: (HasCallStack) => H.HashMap Text Attribute
ownCodeAttributes = case getCallStack callStack of
_ : caller : _ -> srcAttributes caller
_ -> mempty


callerAttributes :: (HasCallStack) => H.HashMap Text Attribute
callerAttributes = case getCallStack callStack of
_ : _ : caller : _ -> srcAttributes caller
_ -> mempty
_ -> mempty


srcAttributes :: (String, GHC.Stack.SrcLoc) -> H.HashMap Text Attribute
Expand Down Expand Up @@ -109,8 +117,8 @@ inSpan'' t n args f = do
@since 0.1.0.0
-}
bracketError :: (MonadIO m) => m a -> (Maybe SomeException -> a -> m b) -> (a -> m c) -> m c
bracketError before after thing = withRunInIO $ \run -> EUnsafe.mask $ \restore -> do
bracketError :: (Monad m) => m a -> (Maybe SomeException -> a -> m b) -> (a -> m c) -> m c
bracketError before after thing = liftIO $ \run -> EUnsafe.mask $ \restore -> do
x <- run before
res1 <- EUnsafe.try $ restore $ run $ thing x
case res1 of
Expand Down

0 comments on commit 6b891c2

Please sign in to comment.