Skip to content

Commit

Permalink
Fix so that tick stream can be properly closed
Browse files Browse the repository at this point in the history
  • Loading branch information
arowM committed Jun 6, 2024
1 parent e77dee9 commit 2b612bd
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 82 deletions.
150 changes: 76 additions & 74 deletions src/Internal/Core.elm
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ module Internal.Core exposing
, syncPromise
, liftPromiseMemory, maybeLiftPromiseMemory
, neverResolved
, portRequest, portStream, PortRequest, releasePorts
, portRequest, portStream, PortRequest
, httpRequest, httpBytesRequest, HttpRequestError(..), HttpRequest
, HttpRequestBody(..), RequestId(..)
, now, here
Expand All @@ -28,6 +28,7 @@ module Internal.Core exposing
, assertionError
, RandomValue(..), RandomRequest(..), RandomSpec(..), isRequestForSpec
, Stream(..)
, releaseStreamResources
, onGoingProcedure
, newLayer, onLayer
, init, update, NewState, Log(..)
Expand Down Expand Up @@ -58,7 +59,7 @@ module Internal.Core exposing
@docs syncPromise
@docs liftPromiseMemory, maybeLiftPromiseMemory
@docs neverResolved
@docs portRequest, portStream, PortRequest, releasePorts
@docs portRequest, portStream, PortRequest
@docs httpRequest, httpBytesRequest, HttpRequestError, HttpRequest
@docs HttpRequestBody, RequestId
@docs now, here
Expand Down Expand Up @@ -88,6 +89,7 @@ module Internal.Core exposing
# Stream
@docs Stream
@docs releaseStreamResources
# Helper Procedures
Expand Down Expand Up @@ -167,11 +169,19 @@ type alias Context m =
, nextRequestId : RequestId
, nextLayerId : LayerId
, nextIntervalId : Dict Int Int
, subs : List (m -> Maybe ( RequestId, Sub Msg ))
, ticks : List TickRequest
, ports : List PortRequest
}


{-| -}
type alias TickRequest =
{ request : RequestId
, layer : LayerId
, msec : Float
}


{-| -}
type alias PortRequest =
{ request : RequestId
Expand Down Expand Up @@ -925,7 +935,10 @@ onLayer_ lid param (Promise prom1) =
, nextLayerId =
context.nextLayerId
, nextIntervalId = context.nextIntervalId
, subs = []
, ticks =
List.filter
(\p -> p.layer /= lid)
context.ticks
, ports =
List.filter
(\p ->
Expand Down Expand Up @@ -955,13 +968,8 @@ onLayer_ lid param (Promise prom1) =
, nextRequestId = eff1.newContext.nextRequestId
, nextLayerId = eff1.newContext.nextLayerId
, nextIntervalId = eff1.newContext.nextIntervalId
, subs =
context.subs
++ List.map
(\f m ->
f { link = param.getLink m, body = Nothing }
)
eff1.newContext.subs
, ticks =
eff1.newContext.ticks
, ports =
eff1.newContext.ports
}
Expand Down Expand Up @@ -1030,7 +1038,7 @@ onLayer_ lid param (Promise prom1) =
, nextLayerId =
context.nextLayerId
, nextIntervalId = context.nextIntervalId
, subs = []
, ticks = context.ticks
, ports = context.ports
}

Expand Down Expand Up @@ -1077,18 +1085,8 @@ onLayer_ lid param (Promise prom1) =
, nextRequestId = eff1.newContext.nextRequestId
, nextLayerId = eff1.newContext.nextLayerId
, nextIntervalId = eff1.newContext.nextIntervalId
, subs =
context.subs
++ List.map
(\f m ->
f
{ link = param.getLink m
, body =
param.getBody m
|> Maybe.map layerStateOf
}
)
eff1.newContext.subs
, ticks =
eff1.newContext.ticks
, ports =
eff1.newContext.ports
}
Expand Down Expand Up @@ -1208,6 +1206,34 @@ releasePorts released =
}


{-| -}
releaseTicks : List RequestId -> Promise m ()
releaseTicks released =
Promise <|
\context ->
{ newContext =
{ context
| ticks =
List.filter
(\p -> not <| List.member p.request released)
context.ticks
}
, realCmds = []
, logs = []
, state = Resolved (LayerExist ())
}


{-| -}
releaseStreamResources : List RequestId -> Promise m ()
releaseStreamResources released =
releasePorts released
|> andThenPromise
(\_ ->
releaseTicks released
)


{-| -}
maybeLiftPromiseMemory :
{ get : m -> Maybe m1
Expand Down Expand Up @@ -1248,7 +1274,7 @@ maybeLiftPromiseMemory o (Promise prom1) =
, nextRequestId = context.nextRequestId
, nextLayerId = context.nextLayerId
, nextIntervalId = context.nextIntervalId
, subs = []
, ticks = context.ticks
, ports = context.ports
}
in
Expand All @@ -1269,13 +1295,7 @@ maybeLiftPromiseMemory o (Promise prom1) =
, nextRequestId = eff1.newContext.nextRequestId
, nextLayerId = eff1.newContext.nextLayerId
, nextIntervalId = eff1.newContext.nextIntervalId
, subs =
context.subs
++ List.map
(\f m ->
o.get m |> Maybe.andThen f
)
eff1.newContext.subs
, ticks = eff1.newContext.ticks
, ports = eff1.newContext.ports
}
, realCmds = eff1.realCmds
Expand Down Expand Up @@ -1331,7 +1351,7 @@ liftPromiseMemory o (Promise prom1) =
, nextRequestId = context.nextRequestId
, nextLayerId = context.nextLayerId
, nextIntervalId = context.nextIntervalId
, subs = []
, ticks = context.ticks
, ports = context.ports
}
in
Expand All @@ -1352,13 +1372,7 @@ liftPromiseMemory o (Promise prom1) =
, nextRequestId = eff1.newContext.nextRequestId
, nextLayerId = eff1.newContext.nextLayerId
, nextIntervalId = eff1.newContext.nextIntervalId
, subs =
context.subs
++ List.map
(\f m ->
o.get m |> f
)
eff1.newContext.subs
, ticks = eff1.newContext.ticks
, ports = eff1.newContext.ports
}
, realCmds = eff1.realCmds
Expand Down Expand Up @@ -1919,22 +1933,14 @@ listenTimeEvery interval handler =
newContext =
{ context
| nextRequestId = incRequestId context.nextRequestId
, subs =
(\_ ->
Just
( myRequestId
, Time.every (toFloat interval) toIntervalMsg
)
)
:: context.subs
, ticks =
{ request = myRequestId
, layer = thisLayerId
, msec = toFloat interval
}
:: context.ticks
}

toIntervalMsg timestamp =
IntervalMsg
{ requestId = myRequestId
, timestamp = timestamp
}

awaitForever : Msg -> m -> Promise m ()
awaitForever msg _ =
case msg of
Expand Down Expand Up @@ -1985,14 +1991,12 @@ tick interval =
{ context
| nextRequestId = incRequestId context.nextRequestId
, nextIntervalId = Dict.insert interval (thisIntervalId + 1) context.nextIntervalId
, subs =
(\_ ->
Just
( myRequestId
, Time.every (toKeyedInterval interval) toIntervalMsg
)
)
:: context.subs
, ticks =
{ request = myRequestId
, layer = thisLayerId
, msec = toKeyedInterval interval
}
:: context.ticks
}

-- Work around for bug about `Time.every`.
Expand All @@ -2002,12 +2006,6 @@ tick interval =
toKeyedInterval n =
toFloat n + (toFloat thisIntervalId * 1.0e-6)

toIntervalMsg timestamp =
IntervalMsg
{ requestId = myRequestId
, timestamp = timestamp
}

nextStream : () -> Stream Posix
nextStream () =
ActiveStream
Expand Down Expand Up @@ -2728,7 +2726,7 @@ initContext memory =
, nextRequestId = initRequestId
, nextLayerId = incLayerId initLayerId
, nextIntervalId = Dict.empty
, subs = []
, ticks = []
, ports = []
}

Expand Down Expand Up @@ -2808,12 +2806,16 @@ documentView f model =
{-| -}
subscriptions : Model memory -> Sub Msg
subscriptions (Model model) =
[ List.filterMap
(\f ->
f model.context.layer.state
[ List.map
(\req ->
Time.every req.msec <|
\timestamp ->
IntervalMsg
{ requestId = req.request
, timestamp = timestamp
}
)
model.context.subs
|> List.map Tuple.second
model.context.ticks
, List.map .sub model.context.ports
]
|> List.concat
Expand Down
16 changes: 8 additions & 8 deletions src/Tepa/Stream.elm
Original file line number Diff line number Diff line change
Expand Up @@ -361,11 +361,11 @@ run f stream =
-- IGNORE TCO
case stream of
Core.EndOfStream param ->
Core.releasePorts param.released
Core.releaseStreamResources param.released

Core.ActiveStream param ->
Tepa.sequence
[ Core.releasePorts param.released
[ Core.releaseStreamResources param.released
, Core.justAwaitPromise <|
\msg _ ->
let
Expand Down Expand Up @@ -435,15 +435,15 @@ awaitAll_ reversed stream =
-- IGNORE TCO
case stream of
Core.EndOfStream param ->
Core.releasePorts param.released
Core.releaseStreamResources param.released
|> Tepa.andThen
(\() ->
Tepa.succeed <|
List.reverse reversed
)

Core.ActiveStream param ->
Core.releasePorts param.released
Core.releaseStreamResources param.released
|> Tepa.andThen
(\() ->
Core.justAwaitPromise <|
Expand All @@ -469,15 +469,15 @@ awaitWhile_ reversed p stream =
-- IGNORE TCO
case stream of
Core.EndOfStream param ->
Core.releasePorts param.released
Core.releaseStreamResources param.released
|> Tepa.andThen
(\() ->
Tepa.succeed <|
List.reverse reversed
)

Core.ActiveStream param ->
Core.releasePorts param.released
Core.releaseStreamResources param.released
|> Tepa.andThen
(\() ->
Core.justAwaitPromise <|
Expand Down Expand Up @@ -553,10 +553,10 @@ while f stream =
-- IGNORE TCO
case stream of
Core.EndOfStream param ->
Core.releasePorts param.released
Core.releaseStreamResources param.released

Core.ActiveStream param ->
Core.releasePorts param.released
Core.releaseStreamResources param.released
|> Tepa.andThen
(\() ->
Core.justAwaitPromise <|
Expand Down

0 comments on commit 2b612bd

Please sign in to comment.