Skip to content

Commit

Permalink
Add input cli
Browse files Browse the repository at this point in the history
  • Loading branch information
tiagocoutinho committed Dec 4, 2023
1 parent cf87367 commit a3e895b
Showing 1 changed file with 77 additions and 0 deletions.
77 changes: 77 additions & 0 deletions linuxpy/input/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#
# This file is part of the linuxpy project
#
# Copyright (c) 2023 Tiago Coutinho
# Distributed under the GPLv3 license. See LICENSE for more info.

import argparse
import asyncio

from linuxpy.input.device import Device, async_event_batch_stream, event_batch_stream, find


def ls(_):
print(f"{'Name':^32} {'Location':<32} {'Version':<6} {'Filename':<32}")
for inp in sorted(find(find_all=True), key=lambda d: d.index):
with inp:
print(f"{inp.name:>32} {inp.physical_location:>32} {inp.version:6} {inp.filename}")


def listen(args):
print(f" # {'Name':^32} {'Type':^6} {'Code':<16} {'Value':<6}")
with Device.from_id(args.addr) as device:
for batch in event_batch_stream(device.fileno()):
for event in batch:
print(f"{device.index:<2} {device.name:>32} {event.type.name:<6} {event.code.name:<16} {event.value}")


async def async_listen(args):
print(f" # {'Name':^32} {'Type':^6} {'Code':<16} {'Value':<6}")
queue = asyncio.Queue()

async def go(addr):
with Device.from_id(addr) as device:
async for event in async_event_batch_stream(device.fileno()):
await queue.put((device, event))

[asyncio.create_task(go(addr)) for addr in args.addr]

while True:
device, batch = await queue.get()
for event in batch:
print(f"{device.index:<2} {device.name:>32} {event.type.name:<6} {event.code.name:<16} {event.value}")


def cli():
parser = argparse.ArgumentParser()
sub_parsers = parser.add_subparsers(
title="sub-commands", description="valid sub-commands", help="select one command", required=True, dest="command"
)
listen = sub_parsers.add_parser("listen", aliases=["dump"], help="listen for events on selected input")
listen.add_argument("addr", help="address", type=int)
alisten = sub_parsers.add_parser("alisten", aliases=["adump"], help="listen for events on selected input(s)")
alisten.add_argument("addr", help="address(es)", type=int, nargs="+")
sub_parsers.add_parser("ls", help="list inputs")
return parser


def run(args):
if args.command in {"listen", "dump"}:
listen(args)
elif args.command in {"alisten", "adump"}:
asyncio.run(async_listen(args))
elif args.command == "ls":
ls(args)


def main(args=None):
parser = cli()
args = parser.parse_args(args=args)
try:
run(args)
except KeyboardInterrupt:
print("\rCtrl-C pressed. Bailing out")


if __name__ == "__main__":
main()

0 comments on commit a3e895b

Please sign in to comment.