Skip to content

Commit

Permalink
Add concurrency scope and enum converter
Browse files Browse the repository at this point in the history
  • Loading branch information
djfarrelly committed Jul 22, 2024
1 parent e891cb3 commit cd29418
Show file tree
Hide file tree
Showing 9 changed files with 80 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@ package com.inngest.testserver

import com.inngest.*

@FunctionConfig(id = "fn-follow-up", name = "My follow up function!")
@FunctionEventTrigger(event = "user.signup.completed")
@FunctionEventTrigger(event = "random-event")
class FollowupFunction : InngestFunction() {
override fun execute(
ctx: FunctionContext,
step: Step,
): LinkedHashMap<String, Any> {
println("-> follow up handler called " + ctx.event.name)
return ctx.event.data
}
}

//class FollowupFunction : InngestFunction() {
// override fun execute(
// ctx: FunctionContext,
// step: Step,
// ): LinkedHashMap<String, Any> {
// println("-> follow up handler called " + ctx.event.name)
// return ctx.event.data
// }
//}
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@ class ProcessAlbum : InngestFunction() {

override fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder {
return builder
.id("ProcessAlbum")
.name("Process Album!")
.triggerEvent("delivery/process.requested")
.triggerCron("5 0 * 8 *")
.trigger(
InngestFunctionTriggers.Cron("5 0 * 8 *"))
.trigger(InngestFunctionTriggers.Cron("5 0 * 8 *"))
.batchEvents(30, Duration.ofSeconds(10))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class RestoreFromGlacier : InngestFunction() {
.id("RestoreFromGlacier")
.name("Restore from Glacier")
.trigger(InngestFunctionTriggers.Event("delivery/restore.requested"))
.concurrency(10, null, ConcurrencyScope.ENVIRONMENT)
}

override fun execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ package com.inngest.testserver
import com.inngest.*
import java.time.Duration

@FunctionConfig(id = "fn-id-slug", name = "My function!")
@FunctionEventTrigger(event = "user-signup")
class UserSignupFunction : InngestFunction() {
class ProcessUserSignup : InngestFunction() {

override fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder {
return builder.id("process-user-signup")
.triggerEvent("user-signup")
}

override fun execute(
ctx: FunctionContext,
step: Step,
Expand Down
1 change: 1 addition & 0 deletions inngest/src/main/kotlin/com/inngest/Comm.kt
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class CommHandler(
try {
return Klaxon()
.fieldConverter(KlaxonDuration::class, durationConverter)
.fieldConverter(KlaxonConcurrencyScope::class, concurrencyScopeConverter)
.toJsonString(payload)
} catch (e: Exception) {
println(e);
Expand Down
4 changes: 3 additions & 1 deletion inngest/src/main/kotlin/com/inngest/Function.kt
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,13 @@ data class StepConfig(
val runtime: HashMap<String, String> = hashMapOf("type" to "http"),
)

class InternalFunctionConfig @JvmOverloads constructor(
internal class InternalFunctionConfig @JvmOverloads constructor(
val id: String,
val name: String?,
val triggers: MutableList<InngestFunctionTrigger>,
@Json(serializeNull = false)
val concurrency: MutableList<Concurrency>?,
@Json(serializeNull = false)
val batchEvents: BatchEvents?,
val steps: Map<String, StepConfig>,
) {
Expand Down
1 change: 1 addition & 0 deletions inngest/src/main/kotlin/com/inngest/HttpClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ internal class HttpClient(private val clientConfig: RequestConfig) {
): okhttp3.Request {
val jsonRequestBody = Klaxon()
.fieldConverter(KlaxonDuration::class, durationConverter)
.fieldConverter(KlaxonConcurrencyScope::class, concurrencyScopeConverter)
.toJsonString(payload)
val body = jsonRequestBody.toRequestBody(jsonMediaType)

Expand Down
4 changes: 3 additions & 1 deletion inngest/src/main/kotlin/com/inngest/InngestFunction.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ abstract class InngestFunction {
try {
return buildConfig().id!!
} catch (e: Exception) {
throw InngestInvalidConfigurationException("Function id must be configured via builder")
throw InngestInvalidConfigurationException(
"Function id must be configured via builder: ${this.javaClass.name}"
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class InngestFunctionConfigBuilder() {
var id: String? = null;
private var name: String? = null;
private var triggers: MutableList<InngestFunctionTrigger> = mutableListOf();
private var concurrency: MutableList<Concurrency>? = null;
private var batchEvents: BatchEvents? = null;

/**
Expand Down Expand Up @@ -77,6 +78,25 @@ class InngestFunctionConfigBuilder() {
return this;
}

/**
* Configure step concurrency limit
*
* @param limit Maximum number of concurrent executing steps across function type
* @param key A CEL expression to apply limit using event payload properties. Example: "event.data.destinationId"
* @param scope The scope to apply the limit to. Options
*/
fun concurrency(limit: Int, key: String? = null, scope: ConcurrencyScope? = null): InngestFunctionConfigBuilder {
val c = Concurrency(limit, key, scope)
// TODO - Limit concurrency length to 2
if (this.concurrency == null) {
this.concurrency = mutableListOf(c)
} else {
this.concurrency?.add(c)
}
println(scope?.value)
return this
}

private fun buildSteps(serveUrl: String): Map<String, StepConfig> {
val scheme = serveUrl.split("://")[0]
return mapOf(
Expand All @@ -98,15 +118,18 @@ class InngestFunctionConfigBuilder() {
)
}

fun build(appId: String, serverUrl: String): InternalFunctionConfig {
internal fun build(appId: String, serverUrl: String): InternalFunctionConfig {
if (id == null) {
throw InngestInvalidConfigurationException("Function id must be configured via builder")
}
val globalId = String.format("%s-%s", appId, id)
println("concurrency")
println(concurrency)
val config = InternalFunctionConfig(
globalId,
name,
triggers,
concurrency,
batchEvents,
steps = buildSteps(serverUrl)
)
Expand All @@ -127,7 +150,34 @@ val durationConverter = object : Converter {
override fun toJson(value: Any): String = """"${(value as Duration).seconds}s""""
}

data class BatchEvents
@Target(AnnotationTarget.FIELD)
annotation class KlaxonConcurrencyScope

val concurrencyScopeConverter = object : Converter {
override fun canConvert(cls: Class<*>): Boolean = cls.isEnum
override fun fromJson(jv: JsonValue): ConcurrencyScope = enumValueOf<ConcurrencyScope>(jv.string!!)
override fun toJson(value: Any): String = """"${(value as ConcurrencyScope).value}""""
}

// TODO - Convert enum element to value, not name
enum class ConcurrencyScope(val value: String) {
ACCOUNT("account"),
ENVIRONMENT("env"),
FUNCTION("fn"),
}

internal data class Concurrency
@JvmOverloads
constructor(
val limit: Int,
@Json(serializeNull = false)
val key: String? = null,
@Json(serializeNull = false)
@KlaxonConcurrencyScope
val scope: ConcurrencyScope? = null,
)

internal data class BatchEvents
@JvmOverloads
constructor(
val maxSize: Int,
Expand Down

0 comments on commit cd29418

Please sign in to comment.