Skip to content

Commit

Permalink
refactor(sistana): new snapshot model
Browse files Browse the repository at this point in the history
fixme: poor performance
todo: _compact_keywords
todo: separator_optbind
todo: sort some non-pattern attrs
  • Loading branch information
GreyElaina committed Oct 4, 2024
1 parent fa20e28 commit b245fd0
Show file tree
Hide file tree
Showing 7 changed files with 303 additions and 435 deletions.
2 changes: 0 additions & 2 deletions src/arclet/alconna/sistana/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
from .model import Mix as Mix
from .model import ObjectCapture as ObjectCapture
from .model import OptionPattern as OptionPattern
from .model import OptionTraverse as OptionTraverse
from .model import PlainCapture as PlainCapture
from .model import Pointer as Pointer
from .model import Preset as Preset
Expand All @@ -42,6 +41,5 @@
from .model import RxPut as RxPut
from .model import SimpleCapture as SimpleCapture
from .model import SubcommandPattern as SubcommandPattern
from .model import SubcommandTraverse as SubcommandTraverse
from .model import Track as Track
from .some import Value as Value
253 changes: 99 additions & 154 deletions src/arclet/alconna/sistana/analyzer.py

Large diffs are not rendered by default.

2 changes: 0 additions & 2 deletions src/arclet/alconna/sistana/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,3 @@
from .receiver import RxPrev as RxPrev
from .receiver import RxPut as RxPut
from .snapshot import AnalyzeSnapshot as AnalyzeSnapshot
from .snapshot import OptionTraverse as OptionTraverse
from .snapshot import SubcommandTraverse as SubcommandTraverse
218 changes: 85 additions & 133 deletions src/arclet/alconna/sistana/model/mix.py
Original file line number Diff line number Diff line change
@@ -1,61 +1,60 @@
from __future__ import annotations

from collections import deque
from typing import TYPE_CHECKING, Any, Iterable
from typing import TYPE_CHECKING, Any

from .pointer import Pointer
from ..err import CaptureRejected, ReceivePanic, TransformPanic, ValidateRejected
from ..some import Some, Value
from ..some import Value
from .fragment import _Fragment, assert_fragments_order

if TYPE_CHECKING:
from elaina_segment import Buffer

from .receiver import RxPrev, RxPut


class Track:
__slots__ = ("fragments", "assignes", "header")
__slots__ = ("fragments", "header", "cursor", "max_length", "emitted")

header: _Fragment | None
fragments: deque[_Fragment]
assignes: dict[str, Any]
fragments: tuple[_Fragment, ...]
cursor: int
max_length: int
emitted: bool

def __init__(self, fragments: Iterable[_Fragment], assignes: dict[str, Any] | None = None, header: _Fragment | None = None):
self.fragments = deque(fragments)
self.assignes = assignes or {}
def __init__(self, fragments: tuple[_Fragment, ...], header: _Fragment | None = None):
self.fragments = fragments
self.header = header
self.cursor = 0
self.max_length = len(self.fragments)
self.emitted = False

@property
def satisfied(self):
if not self.fragments:
return True
return self.cursor >= self.max_length or self.fragments[0].default is not None or self.fragments[0].variadic

first = self.fragments[0]
return first.default is not None or first.variadic

def apply_defaults(self):
def apply_defaults(self, mix: Mix):
for frag in self.fragments:
if frag.name not in self.assignes and frag.default is not None:
self.assignes[frag.name] = frag.default.value

if self.header is not None and self.header.name not in self.assignes and self.header.default is not None:
self.assignes[self.header.name] = self.header.default.value
if frag.name not in mix.assignes and frag.default is not None:
mix.assignes[frag.name] = frag.default.value

def complete(self):
self.apply_defaults()
if self.header is not None and self.header.name not in mix.assignes and self.header.default is not None:
mix.assignes[self.header.name] = self.header.default.value

if self.fragments:
first = self.fragments[-1]
if first.variadic and first.name not in self.assignes:
self.assignes[first.name] = []
def complete(self, mix: Mix):
if not self.fragments:
return

self.apply_defaults(mix)

first = self.fragments[-1]
if first.variadic and first.name not in mix.assignes:
mix.assignes[first.name] = []

def fetch(
self,
mix: Mix,
frag: _Fragment,
buffer: Buffer,
upper_separators: str,
rxprev: RxPrev[Any] | None = None, # type: ignore
rxput: RxPut[Any] | None = None, # type: ignore
):
tail = None
token = None
Expand Down Expand Up @@ -84,24 +83,19 @@ def rxfetch():

return val

if rxprev is None:

def rxprev():
if frag.name in self.assignes:
return Value(self.assignes[frag.name])
def rxprev():
if frag.name in mix.assignes:
return Value(mix.assignes[frag.name])

if rxput is None:
if frag.variadic:
if frag.variadic:
def rxput(val):
if frag.name not in mix.assignes:
mix.assignes[frag.name] = []

def rxput(val):
if frag.name not in self.assignes:
self.assignes[frag.name] = []

self.assignes[frag.name].append(val)
else:

def rxput(val):
self.assignes[frag.name] = val
mix.assignes[frag.name].append(val)
else:
def rxput(val):
mix.assignes[frag.name] = val

try:
frag.receiver.receive(rxfetch, rxprev, rxput)
Expand All @@ -118,37 +112,35 @@ def rxput(val):

def forward(
self,
mix: Mix,
buffer: Buffer,
separators: str,
rxprev: RxPrev[Any] | None = None,
rxput: RxPut[Any] | None = None,
):
if not self.fragments:
if self.cursor >= self.max_length:
return

first = self.fragments[0]
self.fetch(first, buffer, separators, rxprev, rxput)
first = self.fragments[self.cursor]
self.fetch(mix, first, buffer, separators)

if not first.variadic:
self.fragments.popleft()
self.cursor += 1

return first

def emit_header(self, segment: str, rx_prev: RxPrev[Any] | None = None):
def emit_header(self, mix: Mix, segment: str):
self.emitted = True

if self.header is None:
return

header = self.header

if rx_prev is None:
def rxprev():
if header.name in self.assignes:
return Value(self.assignes[header.name])
else:
rxprev = rx_prev
def rxprev():
if header.name in mix.assignes:
return Value(mix.assignes[header.name])

def rxput(val):
self.assignes[header.name] = val
mix.assignes[header.name] = val

try:
header.receiver.receive(
Expand All @@ -163,97 +155,57 @@ def rxput(val):

@property
def assignable(self):
return bool(self.fragments)
return self.cursor < self.max_length

def copy(self):
return Track(fragments=self.fragments.copy(), assignes=self.assignes.copy(), header=self.header)
return Track(self.fragments, self.header)

def reset(self):
self.cursor = 0

def copy_spec(self):
return Track(fragments=self.fragments.copy(), header=self.header)
def __bool__(self):
return bool(self.fragments)


class Preset:
__slots__ = ("tracks", "net")
__slots__ = ("subcommand_track", "option_tracks")

tracks: dict[str, Track]
subcommand_track: Track
option_tracks: dict[str, Track]

def __init__(self, tracks: dict[str, Track] | None = None):
self.tracks = tracks or {}
def __init__(self, subcommand_track: Track, option_tracks: dict[str, Track]):
self.subcommand_track = subcommand_track
self.option_tracks = option_tracks

for track in self.tracks.values():
assert_fragments_order(subcommand_track.fragments)
for track in self.option_tracks.values():
assert_fragments_order(track.fragments)

def new_track(self, name: str) -> Track:
return self.tracks[name].copy_spec()

def new_mix(self) -> Mix:
return Mix(self)


class Mix:
__slots__ = ("preset", "tracks", "net")

preset: Preset
tracks: dict[str, Track]

def __init__(self, preset: Preset):
self.preset = preset
self.tracks = {name: self.preset.new_track(name) for name in self.preset.tracks}

def pop_track(self, name: str, keep_assignes: bool = False) -> Track:
track = self.tracks.pop(name)
self.tracks[name] = self.preset.new_track(name)

if keep_assignes:
self.tracks[name].assignes.update(track.assignes)

return track

def reset_track(self, name: str):
if name not in self.tracks:
raise KeyError # invalid track name

track = self.tracks[name]
track.fragments = self.preset.tracks[name].fragments.copy()
track.assignes.clear()
__slots__ = ("assignes", "tracks")

def get_track(self, name: str) -> Track:
return self.tracks[name]
assignes: dict[str, Any]
tracks: dict[Pointer, Track]

def is_track_satisfied(self, name: str) -> bool:
return self.tracks[name].satisfied
def __init__(self):
self.assignes = {}
self.tracks = {}

@property
def satisfied(self) -> bool:
def complete(self):
for track in self.tracks.values():
if not track.satisfied:
return False
track.complete(self)

return True
def reset_track(self, ref: Pointer):
track = self.tracks[ref]
track.reset()

def complete_track(self, name: str):
self.tracks[name].complete()

def complete_all(self):
for track in self.tracks.values():
track.complete()
@property
def satisfied(self):
return all(track.satisfied for track in self.tracks.values())

def export_track(self, name: str) -> dict[str, Some[Any]]:
track = self.tracks[name]
result = {}
for frag in self.preset.tracks[name].fragments:
if not frag.export:
continue
def update(self, root: Pointer, preset: Preset):
self.tracks[root] = preset.subcommand_track.copy()

if frag.name in track.assignes:
result[frag.name] = Value(track.assignes[frag.name])
else:
result[frag.name] = None

if track.header is not None and track.header.export:
if track.header.name in track.assignes:
result[track.header.name] = Value(track.assignes[track.header.name])
else:
result[track.header.name] = None

return result
for track_id, track in preset.option_tracks.items():
self.tracks[root.option(track_id)] = track.copy()
Loading

0 comments on commit b245fd0

Please sign in to comment.