Skip to content

Commit

Permalink
feat: infer client for replay events
Browse files Browse the repository at this point in the history
cfoust committed Nov 22, 2023
1 parent 7ef910a commit ea567e2
Showing 4 changed files with 129 additions and 47 deletions.
7 changes: 1 addition & 6 deletions pkg/bind/module.go
Original file line number Diff line number Diff line change
@@ -23,7 +23,7 @@ type ActionEvent[T any] struct {
Source *trie.Trie[T]
Sequence []string
// Any regex traversals that matched
Args []string
Args []string
}

type Match[T any] struct {
@@ -37,11 +37,6 @@ type PartialEvent[T any] struct {
Matches []Match[T]
}

// All input that did not match anything in the scope
type RawEvent struct {
Data []byte
}

// Contains data for a single input event
type input taro.Msg

64 changes: 28 additions & 36 deletions pkg/cy/client.go
Original file line number Diff line number Diff line change
@@ -8,13 +8,11 @@ import (

"github.com/cfoust/cy/pkg/bind"
cyParams "github.com/cfoust/cy/pkg/cy/params"
"github.com/cfoust/cy/pkg/events"
"github.com/cfoust/cy/pkg/frames"
"github.com/cfoust/cy/pkg/geom"
P "github.com/cfoust/cy/pkg/io/protocol"
"github.com/cfoust/cy/pkg/io/ws"
"github.com/cfoust/cy/pkg/mux/screen"
"github.com/cfoust/cy/pkg/mux/screen/replay"
"github.com/cfoust/cy/pkg/mux/screen/server"
"github.com/cfoust/cy/pkg/mux/screen/splash"
"github.com/cfoust/cy/pkg/mux/screen/toasts"
@@ -31,14 +29,21 @@ import (

type Connection = ws.Client[P.Message]

type ClientID = int32

type Client struct {
deadlock.RWMutex
util.Lifetime

// the unique identifier for the client, forever
id ClientID

conn Connection

cy *Cy

// All of the environment variables in the client's original
// environment at connection time
env Environment

node tree.Node
@@ -112,39 +117,6 @@ func (c *Client) pollRender() {
}
}

func (c *Cy) pollNodeEvents(ctx context.Context, events <-chan events.Msg) {
for {
select {
case <-ctx.Done():
return
case event := <-events:
nodeEvent, ok := event.(tree.NodeEvent)
if !ok {
continue
}

c.RLock()
clients := c.clients
c.RUnlock()

// TODO(cfoust): 10/18/23 infer client
if len(clients) == 0 {
continue
}

client := clients[0]

switch event := nodeEvent.Event.(type) {
case replay.CopyEvent:
client.buffer = event.Text
case bind.BindEvent:
go client.runAction(event)
}

}
}
}

func (c *Client) runAction(event bind.BindEvent) {
args := make([]interface{}, 0)
for _, arg := range event.Args {
@@ -168,6 +140,17 @@ func (c *Client) runAction(event bind.BindEvent) {
))
}

func (c *Client) interact(out chan historyEvent) {
c.RLock()
node := c.node.Id()
c.RUnlock()
out <- historyEvent{
Client: c.id,
Node: node,
Stamp: time.Now(),
}
}

func (c *Client) pollEvents() {
for {
select {
@@ -179,7 +162,12 @@ func (c *Client) pollEvents() {
continue
}

// TODO(cfoust): 07/18/23 error handling
// We only consider key presses to be an interaction
// We don't want mouse motion to trigger this
if _, ok := event.(taro.KeyMsg); ok {
c.interact(c.cy.writes)
}

c.renderer.Send(event)
}
}
@@ -216,6 +204,8 @@ func (c *Cy) pollClient(ctx context.Context, client *Client) {
}
}

client.id = c.nextClientID.Add(1)

go client.pollEvents()
go client.binds.Poll(client.Ctx())

@@ -438,6 +428,8 @@ func (c *Client) Attach(node tree.Node) error {
c.history = append(c.history, node.Id())
c.Unlock()

c.interact(c.cy.visits)

// Update bindings
scopes := make([]*bind.BindScope, 0)
for _, pathNode := range path {
97 changes: 96 additions & 1 deletion pkg/cy/module.go
Original file line number Diff line number Diff line change
@@ -4,13 +4,16 @@ import (
"context"
"fmt"
"os"
"sync/atomic"
"time"

"github.com/cfoust/cy/pkg/bind"
"github.com/cfoust/cy/pkg/cy/cmd"
"github.com/cfoust/cy/pkg/events"
"github.com/cfoust/cy/pkg/geom"
"github.com/cfoust/cy/pkg/janet"
"github.com/cfoust/cy/pkg/mux/screen"
"github.com/cfoust/cy/pkg/mux/screen/replay"
"github.com/cfoust/cy/pkg/mux/screen/server"
"github.com/cfoust/cy/pkg/mux/screen/toasts"
"github.com/cfoust/cy/pkg/mux/screen/tree"
@@ -30,12 +33,20 @@ type Options struct {
DataDir string
}

type historyEvent struct {
Stamp time.Time
Client ClientID
Node tree.NodeID
}

type Cy struct {
util.Lifetime
deadlock.RWMutex

janet *janet.VM

nextClientID atomic.Int32

muxServer *server.Server

// The top-level fallback for all parameter queries. This is distinct
@@ -57,6 +68,12 @@ type Cy struct {

toast *ToastLogger
queuedToasts []toasts.Toast

// Every time a client writes to or visits a node, we make a note of it
// so we can infer who last used it
// (tmux does the same thing)
lastWrite, lastVisit map[tree.NodeID]historyEvent
writes, visits chan historyEvent
}

func (c *Cy) loadUserConfig(ctx context.Context) {
@@ -105,6 +122,78 @@ func (c *Cy) Shutdown() error {
return nil
}

func (c *Cy) pollInteractions(ctx context.Context, journal map[tree.NodeID]historyEvent, channel chan historyEvent) {
for {
select {
case <-ctx.Done():
return
case event := <-channel:
c.Lock()
journal[event.Node] = event
c.Unlock()
}
}
}

func (c *Cy) getClient(id ClientID) (client *Client, found bool) {
c.RLock()
defer c.RUnlock()

for _, otherClient := range c.clients {
if otherClient.id == id {
client = otherClient
found = true
return
}
}

return
}

func (c *Cy) inferClient(node tree.NodeID) (client *Client, found bool) {
c.RLock()
write, haveWrite := c.lastWrite[node]
visit, haveVisit := c.lastVisit[node]
c.RUnlock()

if !haveVisit {
return
}

if !haveWrite || write.Stamp.Before(visit.Stamp) {
return c.getClient(visit.Client)
}

return c.getClient(write.Client)
}

func (c *Cy) pollNodeEvents(ctx context.Context, events <-chan events.Msg) {
for {
select {
case <-ctx.Done():
return
case event := <-events:
nodeEvent, ok := event.(tree.NodeEvent)
if !ok {
continue
}

client, ok := c.inferClient(nodeEvent.Id)
if !ok {
continue
}

switch event := nodeEvent.Event.(type) {
case replay.CopyEvent:
client.buffer = event.Text
case bind.BindEvent:
go client.runAction(event)
}

}
}
}

func Start(ctx context.Context, options Options) (*Cy, error) {
replayBinds := bind.NewBindScope()

@@ -116,15 +205,21 @@ func Start(ctx context.Context, options Options) (*Cy, error) {
muxServer: server.New(),
replayBinds: replayBinds,
defaults: defaults,
lastVisit: make(map[tree.NodeID]historyEvent),
lastWrite: make(map[tree.NodeID]historyEvent),
writes: make(chan historyEvent),
visits: make(chan historyEvent),
}
cy.toast = NewToastLogger(cy.sendToast)
err := cy.setDefaults(options)
if err != nil {
return nil, err
return nil, err
}

subscriber := t.Subscribe(cy.Ctx())
go cy.pollNodeEvents(cy.Ctx(), subscriber.Recv())
go cy.pollInteractions(cy.Ctx(), cy.lastWrite, cy.writes)
go cy.pollInteractions(cy.Ctx(), cy.lastVisit, cy.visits)

replayable, _ := cmd.New(
cy.Ctx(),
8 changes: 4 additions & 4 deletions pkg/mux/screen/tree/tree.go
Original file line number Diff line number Diff line change
@@ -14,16 +14,16 @@ import (
type Tree struct {
deadlock.RWMutex
*mux.UpdatePublisher
root *Group
nodes map[NodeID]Node
nodeIndex atomic.Int32
root *Group
nodes map[NodeID]Node
nextNodeID atomic.Int32
}

func (t *Tree) newMetadata() *metaData {
t.Lock()
defer t.Unlock()

id := t.nodeIndex.Add(1)
id := t.nextNodeID.Add(1)
node := &metaData{
id: id,
binds: bind.NewBindScope(),

0 comments on commit ea567e2

Please sign in to comment.