Skip to content

Commit

Permalink
Merge branch 'master' of github.com:undera/pylgbst
Browse files Browse the repository at this point in the history
  • Loading branch information
undera committed Feb 15, 2023
2 parents 754add6 + 1d77645 commit 27e2ffe
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 62 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ If you have Vernie assembled, you might run scripts from [`examples/vernie`](exa
[![Color Pin Bot](http://img.youtube.com/vi/QY6nRYXQw_U/0.jpg)](https://youtu.be/QY6nRYXQw_U)
[![BB-8 Joystick](http://img.youtube.com/vi/55kE9I4IQSU/0.jpg)](https://youtu.be/55kE9I4IQSU)

[Dancing Vernie](https://youtu.be/Cp2gDleP8_M)


## Features

Expand Down Expand Up @@ -106,3 +108,4 @@ hub = MoveHub(conn)
- https://github.com/nathankellenicki/node-poweredup - JavaScript version of library
- https://github.com/spezifisch/sphero-python/blob/master/BB8joyDrive.py - example with another approach to bluetooth libs
- https://github.com/virantha/bricknil - for the lovers of async Python, alternative implementation of library to control PoweredUp Hubs
- https://virantha.github.io/bricknil/lego_api/lego.html - good infor about modes by BrickNil
14 changes: 9 additions & 5 deletions pylgbst/hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

PERIPHERAL_TYPES = {
DevTypes.MOTOR: Motor,
DevTypes.SYSTEM_TRAIN_MOTOR: EncodedMotor,
DevTypes.SYSTEM_TRAIN_MOTOR: TrainMotor,
DevTypes.MOTOR_EXTERNAL_TACHO: EncodedMotor,
DevTypes.MOTOR_INTERNAL_TACHO: EncodedMotor,
DevTypes.VISION_SENSOR: VisionSensor,
Expand Down Expand Up @@ -152,8 +152,11 @@ def _handle_action(self, msg):

def _handle_device_change(self, msg):
if msg.event == MsgHubAttachedIO.EVENT_DETACHED:
log.debug("Detaching peripheral: %s", self.peripherals[msg.port])
self.peripherals.pop(msg.port)
if msg.port not in self.peripherals:
log.warning("Strange: got detach command for port %s that is not attached, will ignore it", msg.port)
else:
log.info("Detaching peripheral: %s", self.peripherals[msg.port])
self.peripherals.pop(msg.port)
return

assert msg.event in (msg.EVENT_ATTACHED, msg.EVENT_ATTACHED_VIRTUAL)
Expand Down Expand Up @@ -219,8 +222,8 @@ class MoveHub(Hub):
:type current: Current
:type voltage: Voltage
:type vision_sensor: pylgbst.peripherals.VisionSensor
:type port_C: Peripheral
:type port_D: Peripheral
:type port_C: pylgbst.peripherals.Peripheral
:type port_D: pylgbst.peripherals.Peripheral
:type motor_A: EncodedMotor
:type motor_B: EncodedMotor
:type motor_AB: EncodedMotor
Expand Down Expand Up @@ -335,6 +338,7 @@ def _handle_device_change(self, msg):
class SmartHub(Hub):
"""
Class implementing Lego SmartHub specifics
https://www.lego.com/en-pt/product/hub-88009
:type led: LEDRGB
:type current: Current
Expand Down
123 changes: 67 additions & 56 deletions pylgbst/peripherals.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import logging
import math
import time
import traceback
from struct import pack, unpack
Expand All @@ -15,7 +14,7 @@
MsgPortModeInfo,
MsgPortInputFmtSingle,
)
from pylgbst.utilities import queue, str2hex, usbyte, ushort, usint
from pylgbst.utilities import queue, str2hex, usbyte, ushort, usint, abs_scaled_100

log = logging.getLogger("peripherals")

Expand Down Expand Up @@ -75,8 +74,8 @@ def __init__(self, parent, port):

self._incoming_port_data = queue.Queue(1) # limit 1 means we drop data if we can't handle it fast enough
thr = Thread(target=self._queue_reader)
thr.setDaemon(True)
thr.setName("Port data queue: %s" % self)
thr.daemon = True
thr.name = "Port data queue: %s" % self
thr.start()

def __repr__(self):
Expand All @@ -97,9 +96,9 @@ def set_port_mode(self, mode, send_updates=None, update_delta=None):
log.debug("Implied update delta=%s", update_delta)

if (
self._port_mode.mode == mode
and self._port_mode.upd_enabled == send_updates
and self._port_mode.upd_delta == update_delta
self._port_mode.mode == mode
and self._port_mode.upd_enabled == send_updates
and self._port_mode.upd_delta == update_delta
):
log.debug("Already in target mode, no need to switch")
return
Expand Down Expand Up @@ -246,10 +245,10 @@ def set_color(self, color):
assert len(color) == 3, "RGB color has to have 3 values"
self.set_port_mode(self.MODE_RGB)
payload = (
pack("<B", self.MODE_RGB)
+ pack("<B", color[0])
+ pack("<B", color[1])
+ pack("<B", color[2])
pack("<B", self.MODE_RGB)
+ pack("<B", color[0])
+ pack("<B", color[1])
+ pack("<B", color[2])
)
else:
if color == COLOR_NONE:
Expand Down Expand Up @@ -283,7 +282,7 @@ def _decode_port_data(self, msg):
usbyte(msg.payload, 2),
)
else:
return (usbyte(msg.payload, 0),)
return usbyte(msg.payload, 0),

# Set color of the RGB LED (no getter)
color = property(fset=set_color)
Expand All @@ -304,9 +303,9 @@ def set_brightness(self, brightness):
:type brightness: <int> or <float>
"""
if (
not isinstance(brightness, (int, float))
or brightness > 100
or brightness < 0
not isinstance(brightness, (int, float))
or brightness > 100
or brightness < 0
):
raise ValueError("Brightness must be a number between 0 and 100")

Expand All @@ -331,10 +330,36 @@ def brightness(self, value):
self.set_brightness(value)

def _decode_port_data(self, msg):
return (usbyte(msg.payload, 0),)
return usbyte(msg.payload, 0),


class Motor(Peripheral):
class BaseMotor(Peripheral):
def _write_direct_mode(self, subcmd, params):
params = pack("<B", subcmd) + params
msg = MsgPortOutput(self.port, MsgPortOutput.WRITE_DIRECT_MODE_DATA, params)
self._send_output(msg)


class TrainMotor(BaseMotor):
"""
Simple DC motor (Lego 88011).
See https://github.com/undera/pylgbst/issues/129
"""
SUBCMD_POWER = 0x00
SUBCMD_1 = 0x01 # TODO: figure out what it does. We know it's not sensor mode.

def power(self, param=1.0):
"""
Power the motor, with value -1.0..1.0
"""
params = pack("<b", abs_scaled_100(param))
self._write_direct_mode(self.SUBCMD_POWER, params)

def stop(self):
self.power(0)


class Motor(BaseMotor):
SUBCMD_START_POWER = 0x01
SUBCMD_START_POWER_GROUPED = 0x02
SUBCMD_SET_ACC_TIME = 0x05
Expand All @@ -352,27 +377,13 @@ def __init__(self, parent, port):
super().__init__(parent, port)
self.cmd_in_progress = False

def _speed_abs(self, relative):
def _speed_abs(self, relative): # FIXME: it's not "speed", rather it's a "power"
if relative == Motor.END_STATE_BRAKE or relative == Motor.END_STATE_HOLD:
# special value for BRAKE
# https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#output-sub-command-startpower-power
return relative

if relative < -1:
log.warning("Speed cannot be less than -1")
relative = -1

if relative > 1:
log.warning("Speed cannot be more than 1")
relative = 1

absolute = math.ceil(relative * 100) # scale of 100 is proven by experiments
return int(absolute)

def _write_direct_mode(self, subcmd, params):
params = pack("<B", subcmd) + params
msg = MsgPortOutput(self.port, MsgPortOutput.WRITE_DIRECT_MODE_DATA, params)
self._send_output(msg)
return abs_scaled_100(relative)

def _send_cmd(self, subcmd, params, wait_complete=True):
if self.virtual_ports:
Expand Down Expand Up @@ -530,10 +541,10 @@ def _decode_port_data(self, msg):
data = msg.payload
if self._port_mode.mode == self.SENSOR_ANGLE:
angle = unpack("<l", data[0:4])[0]
return (angle,)
return angle,
elif self._port_mode.mode == self.SENSOR_SPEED:
speed = unpack("<b", data[0:1])[0]
return (speed,)
return speed,
else:
log.debug("Got motor sensor data while in unexpected mode: %r", self._port_mode)
return ()
Expand Down Expand Up @@ -604,29 +615,29 @@ def _decode_port_data(self, msg):
if self._port_mode.mode == self.MODE_2AXIS_ANGLE:
roll = unpack("<b", data[0:1])[0]
pitch = unpack("<b", data[1:2])[0]
return (roll, pitch)
return roll, pitch
elif self._port_mode.mode == self.MODE_3AXIS_SIMPLE:
state = usbyte(data, 0)
return (state,)
return state,
elif self._port_mode.mode == self.MODE_2AXIS_SIMPLE:
state = usbyte(data, 0)
return (state,)
return state,
elif self._port_mode.mode == self.MODE_IMPACT_COUNT:
bump_count = usint(data, 0)
return (bump_count,)
return bump_count,
elif self._port_mode.mode == self.MODE_3AXIS_ACCEL:
roll = unpack("<b", data[0:1])[0]
pitch = unpack("<b", data[1:2])[0]
yaw = unpack("<b", data[2:3])[0] # did I get the order right?
return (roll, pitch, yaw)
return roll, pitch, yaw
elif self._port_mode.mode == self.MODE_ORIENT_CF:
state = usbyte(data, 0)
return (state,)
return state,
elif self._port_mode.mode == self.MODE_IMPACT_CF:
state = usbyte(data, 0)
return (state,)
return state,
elif self._port_mode.mode == self.MODE_CALIBRATION:
return (usbyte(data, 0), usbyte(data, 1), usbyte(data, 2))
return usbyte(data, 0), usbyte(data, 1), usbyte(data, 2)
else:
log.debug("Got tilt sensor data while in unexpected mode: %r", self._port_mode)
return ()
Expand Down Expand Up @@ -659,35 +670,35 @@ def _decode_port_data(self, msg):
data = msg.payload
if self._port_mode.mode == self.COLOR_INDEX:
color = usbyte(data, 0)
return (color,)
return color,
elif self._port_mode.mode == self.COLOR_DISTANCE_FLOAT:
color = usbyte(data, 0)
val = usbyte(data, 1)
partial = usbyte(data, 3)
if partial:
val += 1.0 / partial
return (color, float(val))
return color, float(val)
elif self._port_mode.mode == self.DISTANCE_INCHES:
val = usbyte(data, 0)
return (val,)
return val,
elif self._port_mode.mode == self.DISTANCE_REFLECTED:
val = usbyte(data, 0) / 100.0
return (val,)
return val,
elif self._port_mode.mode == self.AMBIENT_LIGHT:
val = usbyte(data, 0) / 100.0
return (val,)
return val,
elif self._port_mode.mode == self.COUNT_2INCH:
count = usint(data, 0)
return (count,)
return count,
elif self._port_mode.mode == self.COLOR_RGB:
val1 = int(255 * ushort(data, 0) / 1023.0)
val2 = int(255 * ushort(data, 2) / 1023.0)
val3 = int(255 * ushort(data, 4) / 1023.0)
return (val1, val2, val3)
return val1, val2, val3
elif self._port_mode.mode == self.DEBUG:
val1 = 10 * ushort(data, 0) / 1023.0
val2 = 10 * ushort(data, 2) / 1023.0
return (val1, val2)
return val1, val2
elif self._port_mode.mode == self.CALIBRATE:
return [ushort(data, x * 2) for x in range(8)]
else:
Expand Down Expand Up @@ -783,7 +794,7 @@ def _decode_port_data(self, msg):
data = msg.payload
val = ushort(data, 0)
volts = 9600.0 * val / 3893.0 / 1000.0
return (volts,)
return volts,

@property
def voltage(self):
Expand All @@ -807,7 +818,7 @@ def __init__(self, parent, port):
def _decode_port_data(self, msg):
val = ushort(msg.payload, 0)
milliampers = 2444 * val / 4095.0
return (milliampers,)
return milliampers,

@property
def current(self):
Expand Down Expand Up @@ -846,8 +857,8 @@ def _props_msg(self, msg):
:type msg: MsgHubProperties
"""
if (
msg.property == MsgHubProperties.BUTTON
and msg.operation == MsgHubProperties.UPSTREAM_UPDATE
msg.property == MsgHubProperties.BUTTON
and msg.operation == MsgHubProperties.UPSTREAM_UPDATE
):
self._notify_subscribers(usbyte(msg.parameters, 0))

Expand All @@ -862,7 +873,7 @@ def __init__(self, parent, port):
def _decode_port_data(self, msg):
# Fix temp with a small offset to get the real temperature
magic_offset = 2.1
return ((unpack("<h", msg.payload)[0] / 10) - magic_offset,)
return (unpack("<h", msg.payload)[0] / 10) - magic_offset,

@property
def temperature(self):
Expand Down
16 changes: 15 additions & 1 deletion pylgbst/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import binascii
import logging
import math
import sys
from struct import unpack

Expand All @@ -19,7 +20,7 @@

def check_unpack(seq, index, pattern, size):
"""Check that we got size bytes, if so, unpack using pattern"""
data = seq[index : index + size]
data = seq[index: index + size]
assert len(data) == size, "Unexpected data len %d, expected %d" % (len(data), size)
return unpack(pattern, data)[0]

Expand All @@ -43,3 +44,16 @@ def str2hex(data): # we need it for python 2+3 compatibility
data = bytes(data, "ascii")
hexed = binascii.hexlify(data)
return hexed


def abs_scaled_100(relative):
if relative < -1.0:
log.warning("Speed cannot be less than -1")
relative = -1.0

if relative > 1.0:
log.warning("Speed cannot be more than 1")
relative = 1.0

absolute = math.ceil(relative * 100) # scale of 100 is proven by experiments
return int(absolute)

0 comments on commit 27e2ffe

Please sign in to comment.