From ea567e23ac64fbde6e9679bf5b7213d3ca917a9f Mon Sep 17 00:00:00 2001 From: Caleb Foust Date: Wed, 22 Nov 2023 07:51:40 -0500 Subject: [PATCH] feat: infer client for replay events --- pkg/bind/module.go | 7 +-- pkg/cy/client.go | 64 +++++++++++------------- pkg/cy/module.go | 97 ++++++++++++++++++++++++++++++++++++- pkg/mux/screen/tree/tree.go | 8 +-- 4 files changed, 129 insertions(+), 47 deletions(-) diff --git a/pkg/bind/module.go b/pkg/bind/module.go index e033857a..39973a19 100644 --- a/pkg/bind/module.go +++ b/pkg/bind/module.go @@ -23,7 +23,7 @@ type ActionEvent[T any] struct { Source *trie.Trie[T] Sequence []string // Any regex traversals that matched - Args []string + Args []string } type Match[T any] struct { @@ -37,11 +37,6 @@ type PartialEvent[T any] struct { Matches []Match[T] } -// All input that did not match anything in the scope -type RawEvent struct { - Data []byte -} - // Contains data for a single input event type input taro.Msg diff --git a/pkg/cy/client.go b/pkg/cy/client.go index 4bc90f21..ab2eccc8 100644 --- a/pkg/cy/client.go +++ b/pkg/cy/client.go @@ -8,13 +8,11 @@ import ( "github.com/cfoust/cy/pkg/bind" cyParams "github.com/cfoust/cy/pkg/cy/params" - "github.com/cfoust/cy/pkg/events" "github.com/cfoust/cy/pkg/frames" "github.com/cfoust/cy/pkg/geom" P "github.com/cfoust/cy/pkg/io/protocol" "github.com/cfoust/cy/pkg/io/ws" "github.com/cfoust/cy/pkg/mux/screen" - "github.com/cfoust/cy/pkg/mux/screen/replay" "github.com/cfoust/cy/pkg/mux/screen/server" "github.com/cfoust/cy/pkg/mux/screen/splash" "github.com/cfoust/cy/pkg/mux/screen/toasts" @@ -31,14 +29,21 @@ import ( type Connection = ws.Client[P.Message] +type ClientID = int32 + type Client struct { deadlock.RWMutex util.Lifetime + // the unique identifier for the client, forever + id ClientID + conn Connection cy *Cy + // All of the environment variables in the client's original + // environment at connection time env Environment node tree.Node @@ -112,39 +117,6 @@ func (c *Client) pollRender() { } } -func (c *Cy) pollNodeEvents(ctx context.Context, events <-chan events.Msg) { - for { - select { - case <-ctx.Done(): - return - case event := <-events: - nodeEvent, ok := event.(tree.NodeEvent) - if !ok { - continue - } - - c.RLock() - clients := c.clients - c.RUnlock() - - // TODO(cfoust): 10/18/23 infer client - if len(clients) == 0 { - continue - } - - client := clients[0] - - switch event := nodeEvent.Event.(type) { - case replay.CopyEvent: - client.buffer = event.Text - case bind.BindEvent: - go client.runAction(event) - } - - } - } -} - func (c *Client) runAction(event bind.BindEvent) { args := make([]interface{}, 0) for _, arg := range event.Args { @@ -168,6 +140,17 @@ func (c *Client) runAction(event bind.BindEvent) { )) } +func (c *Client) interact(out chan historyEvent) { + c.RLock() + node := c.node.Id() + c.RUnlock() + out <- historyEvent{ + Client: c.id, + Node: node, + Stamp: time.Now(), + } +} + func (c *Client) pollEvents() { for { select { @@ -179,7 +162,12 @@ func (c *Client) pollEvents() { continue } - // TODO(cfoust): 07/18/23 error handling + // We only consider key presses to be an interaction + // We don't want mouse motion to trigger this + if _, ok := event.(taro.KeyMsg); ok { + c.interact(c.cy.writes) + } + c.renderer.Send(event) } } @@ -216,6 +204,8 @@ func (c *Cy) pollClient(ctx context.Context, client *Client) { } } + client.id = c.nextClientID.Add(1) + go client.pollEvents() go client.binds.Poll(client.Ctx()) @@ -438,6 +428,8 @@ func (c *Client) Attach(node tree.Node) error { c.history = append(c.history, node.Id()) c.Unlock() + c.interact(c.cy.visits) + // Update bindings scopes := make([]*bind.BindScope, 0) for _, pathNode := range path { diff --git a/pkg/cy/module.go b/pkg/cy/module.go index 8b8d1e41..35b329ac 100644 --- a/pkg/cy/module.go +++ b/pkg/cy/module.go @@ -4,13 +4,16 @@ import ( "context" "fmt" "os" + "sync/atomic" "time" "github.com/cfoust/cy/pkg/bind" "github.com/cfoust/cy/pkg/cy/cmd" + "github.com/cfoust/cy/pkg/events" "github.com/cfoust/cy/pkg/geom" "github.com/cfoust/cy/pkg/janet" "github.com/cfoust/cy/pkg/mux/screen" + "github.com/cfoust/cy/pkg/mux/screen/replay" "github.com/cfoust/cy/pkg/mux/screen/server" "github.com/cfoust/cy/pkg/mux/screen/toasts" "github.com/cfoust/cy/pkg/mux/screen/tree" @@ -30,12 +33,20 @@ type Options struct { DataDir string } +type historyEvent struct { + Stamp time.Time + Client ClientID + Node tree.NodeID +} + type Cy struct { util.Lifetime deadlock.RWMutex janet *janet.VM + nextClientID atomic.Int32 + muxServer *server.Server // The top-level fallback for all parameter queries. This is distinct @@ -57,6 +68,12 @@ type Cy struct { toast *ToastLogger queuedToasts []toasts.Toast + + // Every time a client writes to or visits a node, we make a note of it + // so we can infer who last used it + // (tmux does the same thing) + lastWrite, lastVisit map[tree.NodeID]historyEvent + writes, visits chan historyEvent } func (c *Cy) loadUserConfig(ctx context.Context) { @@ -105,6 +122,78 @@ func (c *Cy) Shutdown() error { return nil } +func (c *Cy) pollInteractions(ctx context.Context, journal map[tree.NodeID]historyEvent, channel chan historyEvent) { + for { + select { + case <-ctx.Done(): + return + case event := <-channel: + c.Lock() + journal[event.Node] = event + c.Unlock() + } + } +} + +func (c *Cy) getClient(id ClientID) (client *Client, found bool) { + c.RLock() + defer c.RUnlock() + + for _, otherClient := range c.clients { + if otherClient.id == id { + client = otherClient + found = true + return + } + } + + return +} + +func (c *Cy) inferClient(node tree.NodeID) (client *Client, found bool) { + c.RLock() + write, haveWrite := c.lastWrite[node] + visit, haveVisit := c.lastVisit[node] + c.RUnlock() + + if !haveVisit { + return + } + + if !haveWrite || write.Stamp.Before(visit.Stamp) { + return c.getClient(visit.Client) + } + + return c.getClient(write.Client) +} + +func (c *Cy) pollNodeEvents(ctx context.Context, events <-chan events.Msg) { + for { + select { + case <-ctx.Done(): + return + case event := <-events: + nodeEvent, ok := event.(tree.NodeEvent) + if !ok { + continue + } + + client, ok := c.inferClient(nodeEvent.Id) + if !ok { + continue + } + + switch event := nodeEvent.Event.(type) { + case replay.CopyEvent: + client.buffer = event.Text + case bind.BindEvent: + go client.runAction(event) + } + + } + } +} + func Start(ctx context.Context, options Options) (*Cy, error) { replayBinds := bind.NewBindScope() @@ -116,15 +205,21 @@ func Start(ctx context.Context, options Options) (*Cy, error) { muxServer: server.New(), replayBinds: replayBinds, defaults: defaults, + lastVisit: make(map[tree.NodeID]historyEvent), + lastWrite: make(map[tree.NodeID]historyEvent), + writes: make(chan historyEvent), + visits: make(chan historyEvent), } cy.toast = NewToastLogger(cy.sendToast) err := cy.setDefaults(options) if err != nil { - return nil, err + return nil, err } subscriber := t.Subscribe(cy.Ctx()) go cy.pollNodeEvents(cy.Ctx(), subscriber.Recv()) + go cy.pollInteractions(cy.Ctx(), cy.lastWrite, cy.writes) + go cy.pollInteractions(cy.Ctx(), cy.lastVisit, cy.visits) replayable, _ := cmd.New( cy.Ctx(), diff --git a/pkg/mux/screen/tree/tree.go b/pkg/mux/screen/tree/tree.go index b646e395..b75b41ae 100644 --- a/pkg/mux/screen/tree/tree.go +++ b/pkg/mux/screen/tree/tree.go @@ -14,16 +14,16 @@ import ( type Tree struct { deadlock.RWMutex *mux.UpdatePublisher - root *Group - nodes map[NodeID]Node - nodeIndex atomic.Int32 + root *Group + nodes map[NodeID]Node + nextNodeID atomic.Int32 } func (t *Tree) newMetadata() *metaData { t.Lock() defer t.Unlock() - id := t.nodeIndex.Add(1) + id := t.nextNodeID.Add(1) node := &metaData{ id: id, binds: bind.NewBindScope(),