552 lines
20 KiB
Python
552 lines
20 KiB
Python
|
#! python
|
||
|
# Python Serial Port Extension for Win32, Linux, BSD, Jython
|
||
|
# see __init__.py
|
||
|
#
|
||
|
# (C) 2001-2010 Chris Liechti <cliechti@gmx.net>
|
||
|
# this is distributed under a free software license, see license.txt
|
||
|
|
||
|
# compatibility for older Python < 2.6
|
||
|
try:
    # Merely referencing the names is enough: interpreters that lack them
    # raise NameError and fall through to the emulation below.
    bytes
    bytearray
except (NameError, AttributeError):
    # No native types available: plain ``str`` plays the role of ``bytes``.
    bytes = str

    class bytearray(list):
        """Minimal stand-in for the built-in ``bytearray``.

        The data is kept as a list of one-character strings, so that
        ``bytes(bytearray_instance)`` (i.e. ``str(...)``) can simply join
        the elements.
        """

        def __str__(self):
            # used for bytes(bytearray()) conversions
            return ''.join(self)

        def __repr__(self):
            return 'bytearray(%r)' % ''.join(self)

        def append(self, item):
            """Append one element; anything that is not a str goes through chr()."""
            if not isinstance(item, str):
                item = chr(item)
            list.append(self, item)

        def __iadd__(self, other):
            """Implement ``+=`` by appending the elements of other one by one."""
            for element in other:
                self.append(element)
            return self

        def __getslice__(self, i, j):
            # simple slices return a new emulated bytearray
            return bytearray(list.__getslice__(self, i, j))

        def __getitem__(self, item):
            """Return a bytearray for slice objects, ord() of the element otherwise."""
            value = list.__getitem__(self, item)
            if isinstance(item, slice):
                return bytearray(value)
            return ord(value)

        def __eq__(self, other):
            """Compare with another list; strings are converted to bytearray first."""
            if isinstance(other, basestring):
                return list.__eq__(self, bytearray(other))
            return list.__eq__(self, other)
|
||
|
|
||
|
# ``memoryview`` was introduced in Python 2.7 and ``bytes(some_memoryview)``
|
||
|
# isn't returning the contents (very unfortunate). Therefore we need special
|
||
|
# cases and test for it. Ensure that there is a ``memoryview`` object for older
|
||
|
# Python versions. This is easier than making every test dependent on its
|
||
|
# existence.
|
||
|
try:
    memoryview
except (NameError, AttributeError):
    class memoryview:
        # Placeholder used only as the target of isinstance() checks. Its
        # implementation does not matter as it is never really used; it just
        # must not inherit from any type this module cares about.
        pass
|
||
|
|
||
|
|
||
|
# all Python versions prior 3.x convert ``str([17])`` to '[17]' instead of '\x11'
|
||
|
# so a simple ``bytes(sequence)`` doesn't work for all versions
|
||
|
def to_bytes(seq):
    """Return the content of seq as a ``bytes`` object.

    ``bytes`` input is handed back unchanged, ``bytearray`` is converted with
    ``bytes()`` and ``memoryview`` through its ``tobytes()`` method. Any other
    iterable is copied element by element into a ``bytearray`` first.
    """
    if isinstance(seq, bytes):
        return seq
    if isinstance(seq, bytearray):
        return bytes(seq)
    if isinstance(seq, memoryview):
        return seq.tobytes()
    buf = bytearray()
    for element in seq:
        # append() accepts int and str with the emulated bytearray and ints
        # with the native Python 3.x type
        buf.append(element)
    return bytes(buf)
|
||
|
|
||
|
# create control bytes
|
||
|
# single-byte control characters, created as ``bytes`` objects
XON, XOFF = to_bytes([17]), to_bytes([19])    # byte values 17 and 19
CR, LF = to_bytes([13]), to_bytes([10])       # byte values 13 and 10
|
||
|
|
||
|
|
||
|
# parity settings, identified by a single letter
PARITY_NONE = 'N'
PARITY_EVEN = 'E'
PARITY_ODD = 'O'
PARITY_MARK = 'M'
PARITY_SPACE = 'S'

# number of stop bits
STOPBITS_ONE = 1
STOPBITS_ONE_POINT_FIVE = 1.5
STOPBITS_TWO = 2

# number of data bits
FIVEBITS = 5
SIXBITS = 6
SEVENBITS = 7
EIGHTBITS = 8

# display names for the parity settings
PARITY_NAMES = {
    PARITY_NONE:  'None',
    PARITY_EVEN:  'Even',
    PARITY_ODD:   'Odd',
    PARITY_MARK:  'Mark',
    PARITY_SPACE: 'Space',
}
|
||
|
|
||
|
|
||
|
class SerialException(IOError):
    """Common base class of all exceptions related to serial ports.

    It derives from IOError, so handlers for IOError catch it as well.
    """
|
||
|
|
||
|
|
||
|
class SerialTimeoutException(SerialException):
    """Exception that signals a timeout during a write operation."""
|
||
|
|
||
|
|
||
|
# ready-made exception instances, created once when the module is loaded
writeTimeoutError = SerialTimeoutException('Write timeout')
portNotOpenError = SerialException(
    'Attempting to use a port that is not open'
)
|
||
|
|
||
|
|
||
|
class FileLike(object):
    """Abstract file-like helper class.

    It implements readline(), readlines() and xreadlines() on top of
    read(), and writelines() on top of write(). It is used to provide
    these functions for serial port objects. read(), write() and the
    ``timeout`` attribute are not defined here; they are expected to be
    provided by the class this one is combined with.

    Note that when the serial port was opened _without_ a timeout,
    readline() blocks until it sees the end-of-line sequence (or until the
    specified size is reached). readlines() would never return in that
    case and therefore refuses to work (it raises an exception).
    """

    def __init__(self):
        self.closed = True

    def close(self):
        """Mark the object as closed."""
        self.closed = True

    # so that ports are closed when objects are discarded
    def __del__(self):
        """Destructor. Calls close()."""
        # This may run at program exit time, when globals may already have
        # been deleted and close() can therefore fail. Nothing can be done
        # about such failures and the tracebacks only annoy end users, so
        # they are deliberately suppressed.
        try:
            self.close()
        except:
            pass

    def writelines(self, sequence):
        """Pass every item of sequence to write(), in order."""
        for item in sequence:
            self.write(item)

    def flush(self):
        """flush of file like objects"""
        pass

    # iterator for e.g. "for line in Serial(0): ..." usage
    def next(self):
        """Return the next line; an empty read ends the iteration."""
        data = self.readline()
        if data:
            return data
        raise StopIteration

    def __iter__(self):
        return self

    def readline(self, size=None, eol=LF):
        """Read a line, one byte at a time.

        Reading stops when the data ends with the end-of-line sequence eol
        (LF by default), when size bytes have been collected (if size is
        given) or when read() returns nothing (timeout). The collected
        data is returned as bytes.
        """
        eol_length = len(eol)
        buf = bytearray()
        while True:
            char = self.read(1)
            if not char:
                # empty read: stop with whatever was received so far
                break
            buf += char
            if buf[-eol_length:] == eol or (size is not None and len(buf) >= size):
                break
        return bytes(buf)

    def readlines(self, sizehint=None, eol=LF):
        """Read lines until a timeout occurs and return them as a list.

        sizehint is ignored. A ValueError is raised when no timeout is
        configured.
        """
        if self.timeout is None:
            raise ValueError("Serial port MUST have enabled timeout for this function!")
        eol_length = len(eol)
        collected = []
        while True:
            chunk = self.readline(eol=eol)
            if not chunk:
                break
            collected.append(chunk)
            # a line without the terminator means the read ended early
            # (timeout), so stop here
            if chunk[-eol_length:] != eol:
                break
        return collected

    def xreadlines(self, sizehint=None):
        """Generator variant of readlines(): yield lines until readline()
        returns nothing (timeout). sizehint is ignored."""
        chunk = self.readline()
        while chunk:
            yield chunk
            chunk = self.readline()

    # other functions of file-likes - not used by pySerial

    #~ readinto(b)

    def seek(self, pos, whence=0):
        """Not supported, always raises IOError."""
        raise IOError("file is not seekable")

    def tell(self):
        """Not supported, always raises IOError."""
        raise IOError("file is not seekable")

    def truncate(self, n=None):
        """Not supported, always raises IOError."""
        raise IOError("file is not seekable")

    def isatty(self):
        """Always returns False."""
        return False
|
||
|
|
||
|
|
||
|
class SerialBase(object):
    """Serial port base class. Provides the __init__ function and the
    properties used to get/set port settings.

    The class only stores and validates the settings. The methods open(),
    close(), read(), makeDeviceName() and _reconfigurePort() are called here
    but not defined; they are expected to be supplied by a subclass.
    """

    # default values, may be overridden in subclasses that do not support all values
    BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800,
                 9600, 19200, 38400, 57600, 115200, 230400, 460800, 500000,
                 576000, 921600, 1000000, 1152000, 1500000, 2000000, 2500000,
                 3000000, 3500000, 4000000)
    BYTESIZES = (FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS)
    PARITIES = (PARITY_NONE, PARITY_EVEN, PARITY_ODD, PARITY_MARK, PARITY_SPACE)
    STOPBITS = (STOPBITS_ONE, STOPBITS_ONE_POINT_FIVE, STOPBITS_TWO)

    def __init__(self,
                 port=None,             # number of device, numbering starts at
                                        # zero. if everything fails, the user
                                        # can specify a device string, note
                                        # that this isn't portable anymore
                                        # port will be opened if one is specified
                 baudrate=9600,         # baud rate
                 bytesize=EIGHTBITS,    # number of data bits
                 parity=PARITY_NONE,    # enable parity checking
                 stopbits=STOPBITS_ONE, # number of stop bits
                 timeout=None,          # set a timeout value, None to wait forever
                 xonxoff=False,         # enable software flow control
                 rtscts=False,          # enable RTS/CTS flow control
                 writeTimeout=None,     # set a timeout for writes
                 dsrdtr=False,          # None: use rtscts setting, dsrdtr override if True or False
                 interCharTimeout=None  # Inter-character timeout, None to disable
                 ):
        """Initialize comm port object. If a port is given, then the port will be
        opened immediately. Otherwise a Serial port object in closed state
        is returned."""

        self._isOpen = False
        # placeholders; the real values are assigned below through the
        # properties so that every value passes through its validating setter
        self._port = None
        self._baudrate = None
        self._bytesize = None
        self._parity = None
        self._stopbits = None
        self._timeout = None
        self._writeTimeout = None
        self._xonxoff = None
        self._rtscts = None
        self._dsrdtr = None
        self._interCharTimeout = None

        # assign values using get/set methods using the properties feature
        self.port = port
        self.baudrate = baudrate
        self.bytesize = bytesize
        self.parity = parity
        self.stopbits = stopbits
        self.timeout = timeout
        self.writeTimeout = writeTimeout
        self.xonxoff = xonxoff
        self.rtscts = rtscts
        self.dsrdtr = dsrdtr
        self.interCharTimeout = interCharTimeout

        if port is not None:
            self.open()

    def isOpen(self):
        """Check if the port is opened."""
        return self._isOpen

    #  - - - - - - - - - - - - - - - - - - - - - - - -

    # TODO: these are not really needed as there are the BAUDRATES etc.
    # attributes... maybe they get removed before the final release...

    def getSupportedBaudrates(self):
        """Return a list of (name, value) tuples for all entries of BAUDRATES."""
        return [(str(b), b) for b in self.BAUDRATES]

    def getSupportedByteSizes(self):
        """Return a list of (name, value) tuples for all entries of BYTESIZES."""
        return [(str(b), b) for b in self.BYTESIZES]

    def getSupportedStopbits(self):
        """Return a list of (name, value) tuples for all entries of STOPBITS."""
        return [(str(b), b) for b in self.STOPBITS]

    def getSupportedParities(self):
        """Return a list of (name, value) tuples for all entries of PARITIES;
        the names are taken from PARITY_NAMES."""
        return [(PARITY_NAMES[b], b) for b in self.PARITIES]

    #  - - - - - - - - - - - - - - - - - - - - - - - -

    def setPort(self, port):
        """Change the port. The attribute portstr is set to a string that
        contains the name of the port.

        An open port is closed before the change and opened again afterwards.
        """
        was_open = self._isOpen
        if was_open: self.close()
        if port is not None:
            if isinstance(port, basestring):
                self.portstr = port
            else:
                # anything that is not a string is translated to a device name
                self.portstr = self.makeDeviceName(port)
        else:
            self.portstr = None
        self._port = port
        self.name = self.portstr
        if was_open: self.open()

    def getPort(self):
        """Get the current port setting. The value that was passed on init or using
        setPort() is passed back. See also the attribute portstr which contains
        the name of the port as a string."""
        return self._port

    port = property(getPort, setPort, doc="Port setting")

    def setBaudrate(self, baudrate):
        """Change baud rate.

        The value is converted with int(). A ValueError is raised when the
        conversion fails or when the result is not greater than zero. If
        the port is open, it is reconfigured with the new value.
        """
        try:
            b = int(baudrate)
        except (TypeError, ValueError):
            # report unconvertible values (None, 'abc', ...) with the same
            # message as out-of-range values
            raise ValueError("Not a valid baudrate: %r" % (baudrate,))
        else:
            if b <= 0:
                raise ValueError("Not a valid baudrate: %r" % (baudrate,))
            self._baudrate = b
            if self._isOpen: self._reconfigurePort()

    def getBaudrate(self):
        """Get the current baud rate setting."""
        return self._baudrate

    baudrate = property(getBaudrate, setBaudrate, doc="Baud rate setting")

    def setByteSize(self, bytesize):
        """Change byte size. Raises ValueError if it is not in BYTESIZES."""
        if bytesize not in self.BYTESIZES: raise ValueError("Not a valid byte size: %r" % (bytesize,))
        self._bytesize = bytesize
        if self._isOpen: self._reconfigurePort()

    def getByteSize(self):
        """Get the current byte size setting."""
        return self._bytesize

    bytesize = property(getByteSize, setByteSize, doc="Byte size setting")

    def setParity(self, parity):
        """Change parity setting. Raises ValueError if it is not in PARITIES."""
        if parity not in self.PARITIES: raise ValueError("Not a valid parity: %r" % (parity,))
        self._parity = parity
        if self._isOpen: self._reconfigurePort()

    def getParity(self):
        """Get the current parity setting."""
        return self._parity

    parity = property(getParity, setParity, doc="Parity setting")

    def setStopbits(self, stopbits):
        """Change stop bits size. Raises ValueError if it is not in STOPBITS."""
        if stopbits not in self.STOPBITS: raise ValueError("Not a valid stop bit size: %r" % (stopbits,))
        self._stopbits = stopbits
        if self._isOpen: self._reconfigurePort()

    def getStopbits(self):
        """Get the current stop bits setting."""
        return self._stopbits

    stopbits = property(getStopbits, setStopbits, doc="Stop bits setting")

    def setTimeout(self, timeout):
        """Change timeout setting.

        None is accepted as is; any other value must be a non-negative
        number, otherwise a ValueError is raised.
        """
        if timeout is not None:
            try:
                timeout + 1     # test if it's a number, will throw a TypeError if not...
            except TypeError:
                raise ValueError("Not a valid timeout: %r" % (timeout,))
            if timeout < 0: raise ValueError("Not a valid timeout: %r" % (timeout,))
        self._timeout = timeout
        if self._isOpen: self._reconfigurePort()

    def getTimeout(self):
        """Get the current timeout setting."""
        return self._timeout

    timeout = property(getTimeout, setTimeout, doc="Timeout setting for read()")

    def setWriteTimeout(self, timeout):
        """Change write timeout setting.

        None is accepted as is; any other value must be a non-negative
        number, otherwise a ValueError is raised.
        """
        if timeout is not None:
            # the numeric test comes first (same order as in setTimeout) so
            # that non-numbers are always reported as ValueError
            try:
                timeout + 1     # test if it's a number, will throw a TypeError if not...
            except TypeError:
                # the value is wrapped in a tuple so that tuple arguments
                # are formatted too instead of breaking the % operator
                raise ValueError("Not a valid timeout: %r" % (timeout,))
            if timeout < 0: raise ValueError("Not a valid timeout: %r" % (timeout,))

        self._writeTimeout = timeout
        if self._isOpen: self._reconfigurePort()

    def getWriteTimeout(self):
        """Get the current write timeout setting."""
        return self._writeTimeout

    writeTimeout = property(getWriteTimeout, setWriteTimeout, doc="Timeout setting for write()")

    def setXonXoff(self, xonxoff):
        """Change XON/XOFF setting."""
        self._xonxoff = xonxoff
        if self._isOpen: self._reconfigurePort()

    def getXonXoff(self):
        """Get the current XON/XOFF setting."""
        return self._xonxoff

    xonxoff = property(getXonXoff, setXonXoff, doc="XON/XOFF setting")

    def setRtsCts(self, rtscts):
        """Change RTS/CTS flow control setting."""
        self._rtscts = rtscts
        if self._isOpen: self._reconfigurePort()

    def getRtsCts(self):
        """Get the current RTS/CTS flow control setting."""
        return self._rtscts

    rtscts = property(getRtsCts, setRtsCts, doc="RTS/CTS flow control setting")

    def setDsrDtr(self, dsrdtr=None):
        """Change DsrDtr flow control setting."""
        if dsrdtr is None:
            # if not set, keep backwards compatibility and follow rtscts setting
            self._dsrdtr = self._rtscts
        else:
            # if defined independently, follow its value
            self._dsrdtr = dsrdtr
        if self._isOpen: self._reconfigurePort()

    def getDsrDtr(self):
        """Get the current DSR/DTR flow control setting."""
        return self._dsrdtr

    # the description must be passed by keyword: the third positional
    # parameter of property() is the deleter, not the doc string
    dsrdtr = property(getDsrDtr, setDsrDtr, doc="DSR/DTR flow control setting")

    def setInterCharTimeout(self, interCharTimeout):
        """Change inter-character timeout setting.

        None is accepted as is; any other value must be a non-negative
        number, otherwise a ValueError is raised.
        """
        if interCharTimeout is not None:
            # same validation order and message formatting as in setTimeout
            try:
                interCharTimeout + 1     # test if it's a number, will throw a TypeError if not...
            except TypeError:
                raise ValueError("Not a valid timeout: %r" % (interCharTimeout,))
            if interCharTimeout < 0: raise ValueError("Not a valid timeout: %r" % (interCharTimeout,))

        self._interCharTimeout = interCharTimeout
        if self._isOpen: self._reconfigurePort()

    def getInterCharTimeout(self):
        """Get the current inter-character timeout setting."""
        return self._interCharTimeout

    interCharTimeout = property(getInterCharTimeout, setInterCharTimeout, doc="Inter-character timeout setting for read()")

    #  - - - - - - - - - - - - - - - - - - - - - - - -

    _SETTINGS = ('baudrate', 'bytesize', 'parity', 'stopbits', 'xonxoff',
                 'dsrdtr', 'rtscts', 'timeout', 'writeTimeout', 'interCharTimeout')

    def getSettingsDict(self):
        """Get current port settings as a dictionary. For use with
        applySettingsDict."""
        return dict([(key, getattr(self, '_'+key)) for key in self._SETTINGS])

    def applySettingsDict(self, d):
        """Apply stored settings from a dictionary returned by
        getSettingsDict. It is allowed to delete keys from the dictionary;
        the corresponding settings are simply left unchanged."""
        for key in self._SETTINGS:
            # missing keys are skipped; only values that differ from the
            # internal "_" value are written
            if key in d and d[key] != getattr(self, '_'+key):
                setattr(self, key, d[key])  # set non "_" value to use properties write function

    #  - - - - - - - - - - - - - - - - - - - - - - - -

    def __repr__(self):
        """String representation of the current port settings and its state."""
        return "%s<id=0x%x, open=%s>(port=%r, baudrate=%r, bytesize=%r, parity=%r, stopbits=%r, timeout=%r, xonxoff=%r, rtscts=%r, dsrdtr=%r)" % (
            self.__class__.__name__,
            id(self),
            self._isOpen,
            self.portstr,
            self.baudrate,
            self.bytesize,
            self.parity,
            self.stopbits,
            self.timeout,
            self.xonxoff,
            self.rtscts,
            self.dsrdtr,
        )

    #  - - - - - - - - - - - - - - - - - - - - - - - -
    # compatibility with io library

    def readable(self): return True

    def writable(self): return True

    def seekable(self): return False

    def readinto(self, b):
        """Read up to len(b) bytes into the buffer b and return the number
        of bytes that were stored."""
        import array
        data = self.read(len(b))
        n = len(data)
        try:
            b[:n] = data
        except TypeError:
            # array.array buffers need the data wrapped in an array for the
            # slice assignment; for any other buffer type the original
            # error is re-raised with its traceback intact
            if not isinstance(b, array.array):
                raise
            b[:n] = array.array('b', data)
        return n
|
||
|
|
||
|
|
||
|
if __name__ == '__main__':
    # small demo: print the defaults of a port object that was never opened
    import sys
    s = SerialBase()
    report = (
        ('port name: %s\n', s.portstr),
        ('baud rates: %s\n', s.getSupportedBaudrates()),
        ('byte sizes: %s\n', s.getSupportedByteSizes()),
        ('parities: %s\n', s.getSupportedParities()),
        ('stop bits: %s\n', s.getSupportedStopbits()),
        ('%s\n', s),
    )
    for template, value in report:
        # wrapping the value in a tuple keeps the formatting identical for
        # every value type
        sys.stdout.write(template % (value,))
|