Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ base ] Add non-blocking and timeout variants for channelGet #3435

Open
wants to merge 55 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 45 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
a4aae0a
Initial implementation of non-blocking version of blodwen-channel-get.
Matthew-Mosior Nov 25, 2024
f261821
Adding channelGetNonBlocking function to System.Concurrency.idr (non-…
Matthew-Mosior Nov 25, 2024
68ecf8b
Adding matching mutex-release call to initial mutex-acquire call.
Matthew-Mosior Nov 25, 2024
89f7991
Fixing indentation in blodwen-channel-get-non-blocking.
Matthew-Mosior Nov 25, 2024
9381947
Adding matching mutex-release call to initial mutex-acquire call (aga…
Matthew-Mosior Nov 25, 2024
a5610fa
Fixing return values for blodwen-channel-get-non-blocking.
Matthew-Mosior Nov 25, 2024
fb04471
Fixing extern type signature and channelGetNonBlocking function, alon…
Matthew-Mosior Nov 25, 2024
9a53f9a
Removing blocking aspect from blodwen-channel-get-non-blocking.
Matthew-Mosior Nov 25, 2024
818c5ba
Fixing styling of blodwen-channel-get-non-blocking.
Matthew-Mosior Nov 25, 2024
fe1dd2d
Removing extra mutex-release call.
Matthew-Mosior Nov 25, 2024
8754137
Removing duplicate val calls.
Matthew-Mosior Nov 25, 2024
7936ae1
Fixing return type.
Matthew-Mosior Nov 25, 2024
45dfec5
Fixing return type.
Matthew-Mosior Nov 25, 2024
21e877a
Fixing channelGetNonBlocking.
Matthew-Mosior Nov 25, 2024
82defee
Reverting return type fix.
Matthew-Mosior Nov 25, 2024
af8f499
Removing unnecessary comments.
Matthew-Mosior Nov 25, 2024
dbf8e83
Remove incorrect documentation.
Matthew-Mosior Nov 25, 2024
fadb1e9
Altering blodwen-channel-get-non-blocking to error when mutex cant be…
Matthew-Mosior Nov 25, 2024
fb8fb26
Cleaning up commented out code.
Matthew-Mosior Nov 25, 2024
2b6077b
Adding blodwen-channel-check scheme function to enable correct handli…
Matthew-Mosior Nov 25, 2024
f8c24af
Optimizing blodwen-channel-get-non-blocking.
Matthew-Mosior Nov 26, 2024
9e7ba48
Changing Boolean comparision to Int (compatible with expected Prim ty…
Matthew-Mosior Nov 26, 2024
0684a10
Adding decoding support for channelGetNonBlocking.
Matthew-Mosior Nov 27, 2024
56d42bd
Removing extra %foreign pragmas.
Matthew-Mosior Nov 27, 2024
e8a39f3
Removing extra %foreign.
Matthew-Mosior Nov 27, 2024
5a18386
Fixing blodwen-channel-get-non-blocking by removing extra set of pare…
Matthew-Mosior Nov 27, 2024
6e8c4a5
Adding channelGetNonBlocking test.
Matthew-Mosior Nov 27, 2024
0c4b2dd
Changing 0 to () for the return on failed mutex acquisition or an emp…
Matthew-Mosior Nov 27, 2024
042799d
Removing extraneous whitespace.
Matthew-Mosior Nov 27, 2024
636ca50
Removing unnecessary scheme decoding.
Matthew-Mosior Nov 27, 2024
2d1b96c
Fixing initial channelGetNonBlocking test.
Matthew-Mosior Nov 27, 2024
4a4699f
Fixing indentation for blodwen-channel-get-non-blocking.
Matthew-Mosior Nov 27, 2024
5db18d8
Adding back decoding support, fixing blodwen-channel-get-non-blocking…
Nov 28, 2024
87265de
Adding another test for channelGetNonBlocking.
Nov 28, 2024
714e5b1
Removing tabs from blodwen-channel-get-non-blocking.
Nov 28, 2024
126a52a
Adding initial skeleton for blodwen-channel-get-with-timeout.
Dec 2, 2024
8d78679
Adding initial blodwen-channel-get-with-timeout function and associat…
Matthew-Mosior Dec 3, 2024
b3fa699
Fixing small issue with blodwen-channel-get-with-timeout.
Dec 3, 2024
8ee6e4b
Adding corrected blodwen-channel-get-with-timeout implementation and …
Dec 4, 2024
fdc11b6
Updating documentation for channelGetWithTimeout function.
Dec 4, 2024
98b485a
Adding Scheme Nat implementation and fixing test.
Dec 4, 2024
8eb7457
Re-ordering imports for channel007 test.
Dec 4, 2024
4f85729
Adding changes of this PR to CHANGELOG_NEXT.md.
Dec 4, 2024
80fafa9
Merge branch 'main' into Issue-3424
Matthew-Mosior Dec 4, 2024
474bd6b
Fixing linting issues.
Dec 4, 2024
cd24d35
Adding additional sleep in channel007 test.
Dec 4, 2024
970ace4
Addressing comments.
Dec 5, 2024
0a79b46
Addressing comments (fixed).
Dec 5, 2024
6a5129f
Addressing comments (tests).
Dec 5, 2024
917b240
Fixing linting.
Dec 5, 2024
2a70bad
Addressing comments.
Matthew-Mosior Dec 6, 2024
6b13dc8
Addressing @cypheon's comments.
Matthew-Mosior Dec 7, 2024
fa50df7
Removing duplicate tests from allscheme.
Matthew-Mosior Dec 7, 2024
ef41d27
Merge branch 'main' into Issue-3424
Matthew-Mosior Dec 13, 2024
eddaf30
Changing Nat to Int for FFI type on prim__channelGetWithTimeout.
Matthew-Mosior Jan 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG_NEXT.md
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,10 @@ This CHANGELOG describes the merged but unreleased changes. Please see [CHANGELO

* Several functions like `pop`, `differenceMap` and `toSortedMap` were added to `Data.Sorted{Map|Set}`

* Added `System.Concurrency.channelGetNonBlocking` for the chez backend.

* Added `System.Concurrency.channelGetWithTimeout` for the chez backend.

#### Contrib

* `Data.List.Lazy` was moved from `contrib` to `base`.
Expand Down
225 changes: 221 additions & 4 deletions libs/base/System/Concurrency.idr
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,15 @@ data Condition : Type where [external]
%foreign "scheme,racket:blodwen-make-cv"
"scheme,chez:blodwen-make-condition"
prim__makeCondition : PrimIO Condition

%foreign "scheme,racket:blodwen-cv-wait"
"scheme,chez:blodwen-condition-wait"
prim__conditionWait : Condition -> Mutex -> PrimIO ()

%foreign "scheme,chez:blodwen-condition-wait-timeout"
-- "scheme,racket:blodwen-cv-wait-timeout"
prim__conditionWaitTimeout : Condition -> Mutex -> Int -> PrimIO ()

%foreign "scheme,racket:blodwen-cv-signal"
"scheme,chez:blodwen-condition-signal"
prim__conditionSignal : Condition -> PrimIO ()

%foreign "scheme,racket:blodwen-cv-broadcast"
"scheme,chez:blodwen-condition-broadcast"
prim__conditionBroadcast : Condition -> PrimIO ()
Expand Down Expand Up @@ -183,10 +179,169 @@ barrierWait barrier = primIO (prim__barrierWait barrier)
export
data Channel : Type -> Type where [external]

data ChannelObj : Type where [external]

data ChannelSchemeObj : Type where
Null : ChannelSchemeObj
Cons : ChannelSchemeObj -> ChannelSchemeObj -> ChannelSchemeObj
IntegerVal : Integer -> ChannelSchemeObj
FloatVal : Double -> ChannelSchemeObj
StringVal : String -> ChannelSchemeObj
CharVal : Char -> ChannelSchemeObj
Symbol : String -> ChannelSchemeObj
Box : ChannelSchemeObj -> ChannelSchemeObj
Vector : Integer -> List ChannelSchemeObj -> ChannelSchemeObj
Procedure : ChannelObj -> ChannelSchemeObj
Matthew-Mosior marked this conversation as resolved.
Show resolved Hide resolved

export
interface Scheme a where
Matthew-Mosior marked this conversation as resolved.
Show resolved Hide resolved
fromScheme : ChannelSchemeObj -> Maybe a

export
Scheme Integer where
fromScheme (IntegerVal x) = Just x
fromScheme _ = Nothing

export
Scheme Nat where
fromScheme (IntegerVal x) = Just $ integerToNat x
fromScheme _ = Nothing

export
Scheme Int where
fromScheme (IntegerVal x) = Just (cast x)
fromScheme _ = Nothing

export
Scheme Int8 where
fromScheme (IntegerVal x) = Just (cast x)
fromScheme _ = Nothing

export
Scheme Int16 where
fromScheme (IntegerVal x) = Just (cast x)
fromScheme _ = Nothing

export
Scheme Int32 where
fromScheme (IntegerVal x) = Just (cast x)
fromScheme _ = Nothing

export
Scheme Int64 where
fromScheme (IntegerVal x) = Just (cast x)
fromScheme _ = Nothing

export
Scheme Bits8 where
fromScheme (IntegerVal x) = Just (cast x)
fromScheme _ = Nothing

export
Scheme Bits16 where
fromScheme (IntegerVal x) = Just (cast x)
fromScheme _ = Nothing

export
Scheme Bits32 where
fromScheme (IntegerVal x) = Just (cast x)
fromScheme _ = Nothing

export
Scheme Bits64 where
fromScheme (IntegerVal x) = Just (cast x)
fromScheme _ = Nothing

export
Scheme String where
fromScheme (StringVal x) = Just x
fromScheme _ = Nothing

export
Scheme Double where
fromScheme (FloatVal x) = Just x
fromScheme _ = Nothing

export
Scheme Char where
fromScheme (CharVal x) = Just x
fromScheme _ = Nothing

export
Scheme Bool where
fromScheme (IntegerVal 0) = Just False
fromScheme (IntegerVal 1) = Just True
fromScheme _ = Nothing

export
Scheme a => Scheme (List a) where
fromScheme Null = Just []
fromScheme (Cons x xs) = Just $ !(fromScheme x) :: !(fromScheme xs)
fromScheme _ = Nothing

export
(Scheme a, Scheme b) => Scheme (a, b) where
fromScheme (Cons x y) = Just (!(fromScheme x), !(fromScheme y))
fromScheme _ = Nothing

export
Scheme a => Scheme (Maybe a) where
fromScheme Null = Just Nothing
fromScheme (Box x) = Just $ Just !(fromScheme x)
fromScheme _ = Nothing

%foreign "scheme:blodwen-is-number"
prim_isNumber : ChannelObj -> Int
%foreign "scheme:blodwen-is-integer"
prim_isInteger : ChannelObj -> Int
%foreign "scheme:blodwen-is-float"
prim_isFloat : ChannelObj -> Int
%foreign "scheme:blodwen-is-char"
prim_isChar : ChannelObj -> Int
%foreign "scheme:blodwen-is-string"
prim_isString : ChannelObj -> Int
%foreign "scheme:blodwen-is-procedure"
prim_isProcedure : ChannelObj -> Int
%foreign "scheme:blodwen-is-symbol"
prim_isSymbol : ChannelObj -> Int
%foreign "scheme:blodwen-is-nil"
prim_isNil : ChannelObj -> Int
%foreign "scheme:blodwen-is-pair"
prim_isPair : ChannelObj -> Int
%foreign "scheme:blodwen-is-vector"
prim_isVector : ChannelObj -> Int
%foreign "scheme:blodwen-id"
unsafeGetInteger : ChannelObj -> Integer
%foreign "scheme:blodwen-id"
unsafeGetString : ChannelObj -> String
%foreign "scheme:blodwen-id"
unsafeGetFloat : ChannelObj -> Double
%foreign "scheme:blodwen-id"
unsafeGetChar : ChannelObj -> Char
%foreign "scheme:car"
unsafeFst : ChannelObj -> ChannelObj
%foreign "scheme:cdr"
unsafeSnd : ChannelObj -> ChannelObj
%foreign "scheme:blodwen-vector-ref"
unsafeVectorRef : ChannelObj -> Integer -> ChannelObj
%foreign "scheme:blodwen-unbox"
unsafeUnbox : ChannelObj -> ChannelObj
%foreign "scheme:blodwen-vector-length"
unsafeVectorLength : ChannelObj -> Integer
%foreign "scheme:blodwen-vector-list"
unsafeVectorToList : ChannelObj -> List ChannelObj
%foreign "scheme:blodwen-read-symbol"
unsafeReadSymbol : ChannelObj -> String
%foreign "scheme:blodwen-is-box"
prim_isBox : ChannelObj -> Int
%foreign "scheme:blodwen-make-channel"
prim__makeChannel : PrimIO (Channel a)
%foreign "scheme:blodwen-channel-get"
prim__channelGet : Channel a -> PrimIO a
%foreign "scheme:blodwen-channel-get-non-blocking"
prim__channelGetNonBlocking : Channel a -> PrimIO ChannelObj
%foreign "scheme:blodwen-channel-get-with-timeout"
prim__channelGetWithTimeout : Channel a -> Nat -> PrimIO ChannelObj
%foreign "scheme:blodwen-channel-put"
prim__channelPut : Channel a -> a -> PrimIO ()

Expand All @@ -208,6 +363,68 @@ export
channelGet : HasIO io => (chan : Channel a) -> io a
channelGet chan = primIO (prim__channelGet chan)

||| Non-blocking version of channelGet.
|||
||| @ chan the channel to receive on
partial
export
channelGetNonBlocking : HasIO io => Scheme a => (chan : Channel a) -> io (Maybe a)
channelGetNonBlocking chan =
pure $ (fromScheme . decodeObj) !(primIO (prim__channelGetNonBlocking chan))
where
decodeObj : ChannelObj -> ChannelSchemeObj
decodeObj obj =
if prim_isInteger obj == 1 then IntegerVal (unsafeGetInteger obj)
else if prim_isVector obj == 1 then Vector (unsafeGetInteger (unsafeVectorRef obj 0))
(readVector (unsafeVectorLength obj) 1 obj)
else if prim_isPair obj == 1 then Cons (decodeObj (unsafeFst obj))
(decodeObj (unsafeSnd obj))
else if prim_isFloat obj == 1 then FloatVal (unsafeGetFloat obj)
else if prim_isString obj == 1 then StringVal (unsafeGetString obj)
else if prim_isChar obj == 1 then CharVal (unsafeGetChar obj)
else if prim_isSymbol obj == 1 then Symbol (unsafeReadSymbol obj)
else if prim_isProcedure obj == 1 then Procedure obj
else if prim_isBox obj == 1 then Box (decodeObj (unsafeUnbox obj))
else Null
Matthew-Mosior marked this conversation as resolved.
Show resolved Hide resolved
where
readVector : Integer -> Integer -> ChannelObj -> List ChannelSchemeObj
readVector len i obj
= if len == i
then []
else decodeObj (unsafeVectorRef obj i) :: readVector len (i + 1) obj

||| Timeout version of channelGet.
||| Continously loops with 1ms delays until `seconds` has elapsed, or a value is provided through `chan`.
|||
||| @ chan the channel to receive on
||| @ seconds how many seconds to wait until timeout
partial
export
channelGetWithTimeout : HasIO io => Scheme a => (chan : Channel a) -> (seconds : Nat) -> io (Maybe a)
channelGetWithTimeout chan seconds =
pure $ (fromScheme . decodeObj) !(primIO (prim__channelGetWithTimeout chan seconds))
where
decodeObj : ChannelObj -> ChannelSchemeObj
decodeObj obj =
if prim_isInteger obj == 1 then IntegerVal (unsafeGetInteger obj)
else if prim_isVector obj == 1 then Vector (unsafeGetInteger (unsafeVectorRef obj 0))
(readVector (unsafeVectorLength obj) 1 obj)
else if prim_isPair obj == 1 then Cons (decodeObj (unsafeFst obj))
(decodeObj (unsafeSnd obj))
else if prim_isFloat obj == 1 then FloatVal (unsafeGetFloat obj)
else if prim_isString obj == 1 then StringVal (unsafeGetString obj)
else if prim_isChar obj == 1 then CharVal (unsafeGetChar obj)
else if prim_isSymbol obj == 1 then Symbol (unsafeReadSymbol obj)
else if prim_isProcedure obj == 1 then Procedure obj
else if prim_isBox obj == 1 then Box (decodeObj (unsafeUnbox obj))
else Null
where
readVector : Integer -> Integer -> ChannelObj -> List ChannelSchemeObj
readVector len i obj
= if len == i
then []
else decodeObj (unsafeVectorRef obj i) :: readVector len (i + 1) obj

||| Puts a value on the given channel.
|||
||| @ chan the `Channel` to send the value over
Expand Down
46 changes: 45 additions & 1 deletion support/chez/support.ss
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,51 @@
(condition-signal read-cv)
the-val))

(define (blodwen-channel-get-non-blocking ty chan)
(if (mutex-acquire (channel-read-mut chan) #f)
(let* ([val-box (channel-val-box chan)]
[read-box (channel-read-box chan)]
[read-cv (channel-read-cv chan)]
[the-val (unbox val-box)]
)
(if (null? the-val)
(begin
(mutex-release (channel-read-mut chan))
'())
(begin
(set-box! val-box '())
(set-box! read-box #t)
(mutex-release (channel-read-mut chan))
(condition-signal read-cv)
the-val)
))
'()))

(define (blodwen-channel-get-with-timeout ty chan seconds)
(let loop ([start-time (current-time)]
)
(let ([elapsed (time-difference (current-time) start-time)])
(if (time>=? elapsed (make-time 'time-duration 0 seconds))
'()
(if (mutex-acquire (channel-read-mut chan) #f)
(let* ([val-box (channel-val-box chan)]
[the-val (unbox val-box)])
(if (null? the-val)
(begin
(mutex-release (channel-read-mut chan))
(sleep (make-time 'time-duration 1000000 0))
(loop start-time))
(let* ([read-box (channel-read-box chan)]
[read-cv (channel-read-cv chan)])
(set-box! val-box '())
(set-box! read-box #t)
(mutex-release (channel-read-mut chan))
(condition-signal read-cv)
the-val)))
(begin
(sleep (make-time 'time-duration 1000000 0))
(loop start-time)))))))

Matthew-Mosior marked this conversation as resolved.
Show resolved Hide resolved
;; Mutex

(define (blodwen-make-mutex)
Expand Down Expand Up @@ -499,7 +544,6 @@
(define (blodwen-clock-second time) (time-second time))
(define (blodwen-clock-nanosecond time) (time-nanosecond time))


(define (blodwen-arg-count)
(length (command-line)))

Expand Down
23 changes: 23 additions & 0 deletions tests/allschemes/channels007/Main.idr
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import System
import System.Concurrency

-- Test that using channelGetNonBlocking works as expected.
main : IO ()
main = do
chan <- makeChannel
threadID <- fork $ do
channelPut chan "Hello"
channelPut chan "Goodbye"
sleep 1
case !(channelGetNonBlocking chan) of
Nothing =>
putStrLn "Nothing"
Just val' =>
putStrLn val'
case !(channelGetNonBlocking chan) of
Nothing =>
putStrLn "Nothing"
Just val' =>
putStrLn val'
sleep 1

Matthew-Mosior marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 2 additions & 0 deletions tests/allschemes/channels007/expected
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Hello
Goodbye
3 changes: 3 additions & 0 deletions tests/allschemes/channels007/run
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
. ../../testutils.sh

run Main.idr
31 changes: 31 additions & 0 deletions tests/allschemes/channels008/Main.idr
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import System
import System.Concurrency

-- Simple producing thread.
producer : Channel Nat -> Nat -> IO ()
producer c n = ignore $ producer' n
where
producer' : Nat -> IO ()
producer' Z = pure ()
producer' (S n) = do
channelPut c n
sleep 1

-- Test that channelGetWithTimeout works as expected.
main : IO ()
main =
do c <- makeChannel
tids <- for [0..11] $ \n => fork $ producer c n
vals <- for [0..11] $ \_ => channelGetWithTimeout c 5
ignore $ traverse (\t => threadWait t) tids
Matthew-Mosior marked this conversation as resolved.
Show resolved Hide resolved
let vals' = map (\val => case val of
Nothing =>
0
Just val' =>
val'
) vals
s = sum vals'
Matthew-Mosior marked this conversation as resolved.
Show resolved Hide resolved
if s == 55
then putStrLn "Success!"
else putStrLn "How did we get here?"

1 change: 1 addition & 0 deletions tests/allschemes/channels008/expected
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Success!
3 changes: 3 additions & 0 deletions tests/allschemes/channels008/run
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
. ../../testutils.sh

run Main.idr
Loading