Skip to content
This repository has been archived by the owner on Jan 10, 2023. It is now read-only.

ADB over serial port support. serial=/dev/ttyS1,115200 #178

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions adb/adb_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ def ConnectDevice(self, port_path=None, serial=None, default_timeout_ms=None, **

if serial and ':' in serial:
self._handle = common.TcpHandle(serial, timeout_ms=default_timeout_ms)
elif 'tty' in serial:
self._handle = common.SerialHandle(serial, timeout_ms=default_timeout_ms)
else:
self._handle = common.UsbHandle.FindAndOpen(
DeviceIsAvailable, port_path=port_path, serial=serial,
Expand Down
56 changes: 56 additions & 0 deletions adb/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import weakref
import select

import serial

import libusb1
import usb1

Expand Down Expand Up @@ -285,6 +287,60 @@ def FindDevices(cls, setting_matcher, device_matcher=None,
if device_matcher is None or device_matcher(handle):
yield handle

class SerialHandle(object):
def __init__(self, serial, timeout_ms=None):

if isinstance(serial, (bytes, bytearray)):
serial = serial.decode('utf-8')

if ',' in serial:
self.port, self.speed = serial.split(',')
else:
self.port = serial
self.speed = 115200

self._connection = None
self._serial_number = '%s,%s' % (self.port, self.speed)
self._timeout_ms = float(timeout_ms) if timeout_ms else None

self._connect()

def _connect(self):
timeout = self.TimeoutSeconds(self._timeout_ms)

self._connection = serial.Serial(self.port, self.speed, timeout=timeout)

@property
def serial_number(self):
return self._serial_number

def BulkWrite(self, data, timeout=None):
t = self.TimeoutSeconds(timeout)
_, writeable, _ = select.select([], [self._connection], [], t)
if writeable:
return self._connection.write(data)
msg = 'Sending data to {} timed out after {}s. No data was sent.'.format(
self.serial_number, t)
raise serial.SerialTimeoutException(msg)

def BulkRead(self, numbytes, timeout=None):
t = self.TimeoutSeconds(timeout)
readable, _, _ = select.select([self._connection], [], [], t)
if readable:
return self._connection.read(numbytes)
msg = 'Reading from {} timed out (Timeout {}s)'.format(
self._serial_number, t)
#raise serial.TcpTimeoutException(msg)

def Timeout(self, timeout_ms):
return float(timeout_ms) if timeout_ms is not None else self._timeout_ms

def TimeoutSeconds(self, timeout_ms):
timeout = self.Timeout(timeout_ms)
return timeout / 1000.0 if timeout is not None else timeout

def Close(self):
return self._connection.close()

class TcpHandle(object):
"""TCP connection object.
Expand Down