Skip to content

Commit

Permalink
add TSubscriptionRef (#2725)
Browse files Browse the repository at this point in the history
Co-authored-by: Michael Arnaldi <[email protected]>
Co-authored-by: Tim <[email protected]>
  • Loading branch information
3 people committed Oct 14, 2024
1 parent 9e2cc4e commit 5c05781
Show file tree
Hide file tree
Showing 15 changed files with 716 additions and 14 deletions.
5 changes: 5 additions & 0 deletions .changeset/cold-cougars-pretend.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": minor
---

add TSubscriptionRef
5 changes: 5 additions & 0 deletions .changeset/shiny-squids-sell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": minor
---

add Stream.fromTQueue & Stream.fromTPubSub
18 changes: 18 additions & 0 deletions packages/effect/src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import type * as Sink from "./Sink.js"
import type * as Emit from "./StreamEmit.js"
import type * as HaltStrategy from "./StreamHaltStrategy.js"
import type * as Take from "./Take.js"
import type { TPubSub } from "./TPubSub.js"
import type { TDequeue } from "./TQueue.js"
import type * as Tracer from "./Tracer.js"
import type { Covariant, NoInfer, TupleOf } from "./Types.js"
import type * as Unify from "./Unify.js"
Expand Down Expand Up @@ -2013,6 +2015,14 @@ export const fromPubSub: {
): Stream<A>
} = internal.fromPubSub

/**
* Creates a stream from a subscription to a `TPubSub`.
*
* @since 3.10.0
* @category constructors
*/
export const fromTPubSub: <A>(pubsub: TPubSub<A>) => Stream<A> = internal.fromTPubSub

/**
* Creates a new `Stream` from an iterable collection of values.
*
Expand Down Expand Up @@ -2094,6 +2104,14 @@ export const fromQueue: <A>(
}
) => Stream<A> = internal.fromQueue

/**
* Creates a stream from a TQueue of values
*
* @since 3.10.0
* @category constructors
*/
export const fromTQueue: <A>(queue: TDequeue<A>) => Stream<A> = internal.fromTQueue

/**
* Creates a stream from a `ReadableStream`.
*
Expand Down
9 changes: 9 additions & 0 deletions packages/effect/src/TPubSub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,15 @@ export const isEmpty: <A>(self: TPubSub<A>) => STM.STM<boolean> = internal.isEmp
*/
export const isFull: <A>(self: TPubSub<A>) => STM.STM<boolean> = internal.isFull

/**
* Interrupts any fibers that are suspended on `offer` or `take`. Future calls
* to `offer*` and `take*` will be interrupted immediately.
*
* @since 2.0.0
* @category utils
*/
export const shutdown: <A>(self: TPubSub<A>) => STM.STM<void> = internal.shutdown

/**
* Returns `true` if `shutdown` has been called, otherwise returns `false`.
*
Expand Down
14 changes: 7 additions & 7 deletions packages/effect/src/TQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ export const isTEnqueue: (u: unknown) => u is TEnqueue<unknown> = internal.isTEn
* @since 2.0.0
* @category mutations
*/
export const awaitShutdown: <A>(self: TQueue<A>) => STM.STM<void> = internal.awaitShutdown
export const awaitShutdown: <A>(self: TDequeue<A> | TEnqueue<A>) => STM.STM<void> = internal.awaitShutdown

/**
* Creates a bounded queue with the back pressure strategy. The queue will
Expand All @@ -226,7 +226,7 @@ export const bounded: <A>(requestedCapacity: number) => STM.STM<TQueue<A>> = int
* @since 2.0.0
* @category getters
*/
export const capacity: <A>(self: TQueue<A>) => number = internal.capacity
export const capacity: <A>(self: TDequeue<A> | TEnqueue<A>) => number = internal.capacity

/**
* Creates a bounded queue with the dropping strategy. The queue will drop new
Expand All @@ -245,7 +245,7 @@ export const dropping: <A>(requestedCapacity: number) => STM.STM<TQueue<A>> = in
* @since 2.0.0
* @category getters
*/
export const isEmpty: <A>(self: TQueue<A>) => STM.STM<boolean> = internal.isEmpty
export const isEmpty: <A>(self: TDequeue<A> | TEnqueue<A>) => STM.STM<boolean> = internal.isEmpty

/**
* Returns `true` if the `TQueue` contains at least one element, `false`
Expand All @@ -254,15 +254,15 @@ export const isEmpty: <A>(self: TQueue<A>) => STM.STM<boolean> = internal.isEmpt
* @since 2.0.0
* @category getters
*/
export const isFull: <A>(self: TQueue<A>) => STM.STM<boolean> = internal.isFull
export const isFull: <A>(self: TDequeue<A> | TEnqueue<A>) => STM.STM<boolean> = internal.isFull

/**
* Returns `true` if `shutdown` has been called, otherwise returns `false`.
*
* @since 2.0.0
* @category getters
*/
export const isShutdown: <A>(self: TQueue<A>) => STM.STM<boolean> = internal.isShutdown
export const isShutdown: <A>(self: TDequeue<A> | TEnqueue<A>) => STM.STM<boolean> = internal.isShutdown

/**
* Places one value in the queue.
Expand Down Expand Up @@ -345,7 +345,7 @@ export const seek: {
* @since 2.0.0
* @category mutations
*/
export const shutdown: <A>(self: TQueue<A>) => STM.STM<void> = internal.shutdown
export const shutdown: <A>(self: TDequeue<A> | TEnqueue<A>) => STM.STM<void> = internal.shutdown

/**
* Retrieves the size of the queue, which is equal to the number of elements
Expand All @@ -355,7 +355,7 @@ export const shutdown: <A>(self: TQueue<A>) => STM.STM<void> = internal.shutdown
* @since 2.0.0
* @category getters
*/
export const size: <A>(self: TQueue<A>) => STM.STM<number> = internal.size
export const size: <A>(self: TDequeue<A> | TEnqueue<A>) => STM.STM<number> = internal.size

/**
* Creates a bounded queue with the sliding strategy. The queue will add new
Expand Down
3 changes: 2 additions & 1 deletion packages/effect/src/TRef.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type * as TxnId from "./internal/stm/stm/txnId.js"
import type * as Versioned from "./internal/stm/stm/versioned.js"
import * as internal from "./internal/stm/tRef.js"
import type * as Option from "./Option.js"
import type { Pipeable } from "./Pipeable.js"
import type * as STM from "./STM.js"
import type * as Types from "./Types.js"

Expand Down Expand Up @@ -34,7 +35,7 @@ export type TRefTypeId = typeof TRefTypeId
* @since 2.0.0
* @category models
*/
export interface TRef<in out A> extends TRef.Variance<A> {
export interface TRef<in out A> extends TRef.Variance<A>, Pipeable {
/**
* Note: the method is unbound, exposed only for potential extensions.
*/
Expand Down
192 changes: 192 additions & 0 deletions packages/effect/src/TSubscriptionRef.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/**
* @since 3.10.0
*/
import type * as Effect from "./Effect.js"
import * as internal from "./internal/stm/tSubscriptionRef.js"
import type * as Option from "./Option.js"
import type * as Scope from "./Scope.js"
import type * as STM from "./STM.js"
import type * as Stream from "./Stream.js"
import type * as TPubSub from "./TPubSub.js"
import type * as TQueue from "./TQueue.js"
import type * as TRef from "./TRef.js"
import type * as Types from "./Types.js"

/**
* @since 3.10.0
* @category symbols
*/
export const TSubscriptionRefTypeId: unique symbol = internal.TSubscriptionRefTypeId

/**
* @since 3.10.0
* @category symbols
*/
export type TSubscriptionRefTypeId = typeof TSubscriptionRefTypeId

/**
* A `TSubscriptionRef<A>` is a `TRef` that can be subscribed to in order to
* receive a `TDequeue<A>` of the current value and all committed changes to the value.
*
* @since 3.10.0
* @category models
*/
export interface TSubscriptionRef<in out A> extends TSubscriptionRef.Variance<A>, TRef.TRef<A> {
/** @internal */
readonly ref: TRef.TRef<A>
/** @internal */
readonly pubsub: TPubSub.TPubSub<A>
/** @internal */
modify<B>(f: (a: A) => readonly [B, A]): STM.STM<B>

/**
* A TDequeue containing the current value of the `Ref` as well as all changes
* to that value.
*/
readonly changes: STM.STM<TQueue.TDequeue<A>>
}

/**
* @since 3.10.0
*/
export declare namespace TSubscriptionRef {
/**
* @since 3.10.0
* @category models
*/
export interface Variance<in out A> {
readonly [TSubscriptionRefTypeId]: {
readonly _A: Types.Invariant<A>
}
}
}

/**
* @since 3.10.0
* @category mutations
*/
export const get: <A>(self: TSubscriptionRef<A>) => STM.STM<A> = internal.get

/**
* @since 3.10.0
* @category mutations
*/
export const getAndSet: {
<A>(value: A): (self: TSubscriptionRef<A>) => STM.STM<A>
<A>(self: TSubscriptionRef<A>, value: A): STM.STM<A>
} = internal.getAndSet

/**
* @since 3.10.0
* @category mutations
*/
export const getAndUpdate: {
<A>(f: (a: A) => A): (self: TSubscriptionRef<A>) => STM.STM<A>
<A>(self: TSubscriptionRef<A>, f: (a: A) => A): STM.STM<A>
} = internal.getAndUpdate

/**
* @since 3.10.0
* @category mutations
*/
export const getAndUpdateSome: {
<A>(f: (a: A) => Option.Option<A>): (self: TSubscriptionRef<A>) => STM.STM<A>
<A>(self: TSubscriptionRef<A>, f: (a: A) => Option.Option<A>): STM.STM<A>
} = internal.getAndUpdateSome

/**
* @since 3.10.0
* @category constructors
*/
export const make: <A>(value: A) => STM.STM<TSubscriptionRef<A>> = internal.make

/**
* @since 3.10.0
* @category mutations
*/
export const modify: {
<A, B>(f: (a: A) => readonly [B, A]): (self: TSubscriptionRef<A>) => STM.STM<B>
<A, B>(self: TSubscriptionRef<A>, f: (a: A) => readonly [B, A]): STM.STM<B>
} = internal.modify

/**
* @since 3.10.0
* @category mutations
*/
export const modifySome: {
<A, B>(fallback: B, f: (a: A) => Option.Option<readonly [B, A]>): (self: TSubscriptionRef<A>) => STM.STM<B>
<A, B>(self: TSubscriptionRef<A>, fallback: B, f: (a: A) => Option.Option<readonly [B, A]>): STM.STM<B>
} = internal.modifySome

/**
* @since 3.10.0
* @category mutations
*/
export const set: {
<A>(value: A): (self: TSubscriptionRef<A>) => STM.STM<void>
<A>(self: TSubscriptionRef<A>, value: A): STM.STM<void>
} = internal.set

/**
* @since 3.10.0
* @category mutations
*/
export const setAndGet: {
<A>(value: A): (self: TSubscriptionRef<A>) => STM.STM<A>
<A>(self: TSubscriptionRef<A>, value: A): STM.STM<A>
} = internal.setAndGet

/**
* @since 3.10.0
* @category mutations
*/
export const update: {
<A>(f: (a: A) => A): (self: TSubscriptionRef<A>) => STM.STM<void>
<A>(self: TSubscriptionRef<A>, f: (a: A) => A): STM.STM<void>
} = internal.update

/**
* @since 3.10.0
* @category mutations
*/
export const updateAndGet: {
<A>(f: (a: A) => A): (self: TSubscriptionRef<A>) => STM.STM<A>
<A>(self: TSubscriptionRef<A>, f: (a: A) => A): STM.STM<A>
} = internal.updateAndGet

/**
* @since 3.10.0
* @category mutations
*/
export const updateSome: {
<A>(f: (a: A) => Option.Option<A>): (self: TSubscriptionRef<A>) => STM.STM<void>
<A>(self: TSubscriptionRef<A>, f: (a: A) => Option.Option<A>): STM.STM<void>
} = internal.updateSome

/**
* @since 3.10.0
* @category mutations
*/
export const updateSomeAndGet: {
<A>(f: (a: A) => Option.Option<A>): (self: TSubscriptionRef<A>) => STM.STM<A>
<A>(self: TSubscriptionRef<A>, f: (a: A) => Option.Option<A>): STM.STM<A>
} = internal.updateSomeAndGet

/**
* @since 3.10.0
* @category mutations
*/
export const changesScoped: <A>(self: TSubscriptionRef<A>) => Effect.Effect<TQueue.TDequeue<A>, never, Scope.Scope> =
internal.changesScoped

/**
* @since 3.10.0
* @category mutations
*/
export const changesStream: <A>(self: TSubscriptionRef<A>) => Stream.Stream<A> = internal.changesStream

/**
* @since 3.10.0
* @category mutations
*/
export const changes: <A>(self: TSubscriptionRef<A>) => STM.STM<TQueue.TDequeue<A>> = (self) => self.changes
5 changes: 5 additions & 0 deletions packages/effect/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,11 @@ export * as TSemaphore from "./TSemaphore.js"
*/
export * as TSet from "./TSet.js"

/**
* @since 3.10.0
*/
export * as TSubscriptionRef from "./TSubscriptionRef.js"

/**
* @since 2.0.0
*/
Expand Down
3 changes: 1 addition & 2 deletions packages/effect/src/internal/stm/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@ import { pipeArguments } from "../../Pipeable.js"
import { hasProperty } from "../../Predicate.js"
import type * as Scheduler from "../../Scheduler.js"
import type * as STM from "../../STM.js"
import { StreamTypeId } from "../../Stream.js"
import { YieldWrap } from "../../Utils.js"
import { ChannelTypeId } from "../core-stream.js"
import { withFiberRuntime } from "../core.js"
import { effectVariance } from "../effectable.js"
import { effectVariance, StreamTypeId } from "../effectable.js"
import { OP_COMMIT } from "../opCodes/effect.js"
import { SingleShotGen } from "../singleShotGen.js"
import { SinkTypeId } from "../sink.js"
Expand Down
1 change: 1 addition & 0 deletions packages/effect/src/internal/stm/tPubSub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ class TPubSubSubscriptionImpl<in out A> implements TQueue.TDequeue<A> {
capacity(): number {
return this.requestedCapacity
}

size: STM.STM<number> = core.withSTMRuntime((runtime) => {
let currentSubscriberHead = tRef.unsafeGet(this.subscriberHead, runtime.journal)
if (currentSubscriberHead === undefined) {
Expand Down
4 changes: 2 additions & 2 deletions packages/effect/src/internal/stm/tQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import * as Chunk from "../../Chunk.js"
import { dual, pipe } from "../../Function.js"
import * as Option from "../../Option.js"
import { hasProperty, type Predicate } from "../../Predicate.js"
import * as STM from "../../STM.js"
import type * as STM from "../../STM.js"
import type * as TQueue from "../../TQueue.js"
import type * as TRef from "../../TRef.js"
import * as core from "./core.js"
Expand Down Expand Up @@ -99,7 +99,7 @@ class TQueueImpl<in out A> implements TQueue.TQueue<A> {
size: STM.STM<number> = core.withSTMRuntime((runtime) => {
const queue = tRef.unsafeGet(this.ref, runtime.journal)
if (queue === undefined) {
return STM.interruptAs(runtime.fiberId)
return core.interruptAs(runtime.fiberId)
}
return core.succeed(queue.length)
})
Expand Down
Loading

0 comments on commit 5c05781

Please sign in to comment.