Source code for pysamloader.samba

# Copyright (c) 2012-2019 Chintalagiri Shashank
#
# This file is part of pysamloader.

# pysamloader is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# pysamloader is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with pysamloader.  If not, see <http://www.gnu.org/licenses/>.

import logging
from time import sleep
from serial import Serial

from .samdevice import SAMDevice
from .chipid import SamChipID
from .efcdescriptor import EFCFlashDescriptor
from . import log

# Module-level logger used for all SAM-BA traffic and status messages in this
# file. It is also appended to `log.loggers`.
# NOTE(review): presumably `log.loggers` lets the package configure all of
# its loggers in one place -- confirm in the `log` module.
logger = logging.getLogger('samba')
log.loggers.append(logger)


class SamBAConnectionError(Exception):
    """
    Raised when the SAM-BA serial link cannot be opened or stops responding.

    The human-readable description is available both as ``msg`` and through
    the normal exception machinery (``str(exc)`` / ``exc.args``).
    """

    def __init__(self, msg):
        # Forward the message to Exception so that tracebacks and str(exc)
        # show it instead of an empty string.
        super(SamBAConnectionError, self).__init__(msg)
        self.msg = msg
class SamBAConnection(object):
    """
    Serial connection to the SAM-BA monitor of a target device.

    The connection sends SAM-BA text commands (terminated by ``#``) over a
    serial port and reads responses up to the ``>`` prompt. Register
    addresses and command codes are taken from the device description object
    held in ``self._device``.
    """

    # Kept as a class attribute for backward compatibility only. Each
    # instance creates its own port object in __init__, so that two
    # connections never share (and reconfigure) the same serial port.
    ser = None

    def __init__(self, port='/dev/ttyUSB1', baud=115200, device=None):
        """
        Open the serial port and establish the SAM-BA connection.

        :param port: Name of the serial port to open.
        :param baud: Baud rate to configure on the port.
        :param device: Device description class. It is called with no
                       arguments to create the description; ``SAMDevice``
                       is used when nothing is given.
        :raises SamBAConnectionError: if the port cannot be opened or
                                      SAM-BA does not respond.
        """
        self.ser = Serial()
        self.ser.baudrate = baud
        self.ser.port = port
        self.ser.timeout = 1
        try:
            self.ser.open()
        except Exception:
            # Any failure to open the port is reported uniformly; Ctrl-C and
            # interpreter exit are deliberately not intercepted.
            raise SamBAConnectionError(
                "Unable to open serial port.\n"
                "Check your connections and try again.")
        self._device = device() if device else SAMDevice()
        if self.ser.isOpen():
            self.make_connection(auto_baud=self._device.AutoBaud)
            sleep(1)

    # ------------------------------------------------------------------
    # Low level serial helpers
    # ------------------------------------------------------------------

    def retrieve_response(self):
        """
        Read a response from SAM-BA, delimited by ``>``.

        :return: Everything received before the ``>`` prompt (the prompt
                 itself is not included).
        :raises SamBAConnectionError: if a single-byte read returns nothing
                                      (read timeout). The port is closed
                                      before raising.
        """
        received = []
        while True:
            char = self.ser.read(1).decode()
            if not char:
                self.ser.close()
                raise SamBAConnectionError(
                    "Read byte timed out on SAM-BA. Check your connections "
                    "and device configuration and retry.")
            if char == '>':
                break
            received.append(char)
        data = ''.join(received)
        logger.debug("Got response : {0}".format(data.strip()))
        return data

    def _auto_baud(self):
        """
        Repeat the auto-baud sequence until SAM-BA answers with ``>``.

        The sequence is written as bytes: pyserial on Python 3 rejects text
        strings, and encoding ``'\\x80'`` as text would not yield the single
        byte 0x80. There is no retry limit; the loop only ends when the
        prompt is seen.
        """
        logger.info("Attempting Auto-Baud with SAM-BA")
        while True:
            self.ser.write(b'\x80')
            self.ser.write(b'\x80')
            self.ser.write(b'#')
            sleep(0.001)
            # Compared as raw bytes so that line noise received while the
            # baud rate is still settling cannot cause a decode error.
            if self.ser.read(1) == b'>':
                break
        logger.info("SAM-BA Auto-Baud Successful")

    def make_connection(self, auto_baud=False):
        """
        Test the connection to SAM-BA by reading its version.

        :param auto_baud: When exactly ``True``, run the auto-baud sequence
                          before querying the version.
        :raises SamBAConnectionError: if SAM-BA returns an empty response to
                                      the ``V#`` command, or a read times
                                      out.
        """
        logger.debug("Connecting to SAM-BA on {0} at {1}".format(
            self.ser.port, self.ser.baudrate))
        if auto_baud is True:
            self._auto_baud()
        # NOTE(review): the flush / drain / pause below are executed for
        # every connection. The source this was recovered from had lost its
        # indentation, so confirm they were not meant to run only after
        # auto-baud.
        self.flush_all()
        # Drain up to 22 pending bytes; the content is discarded, so it is
        # not decoded.
        self.ser.read(22)
        sleep(1)
        self.write_message("V#")
        sleep(0.01)
        resp = self.retrieve_response()
        logger.info("SAM-BA Version : ")
        logger.info(resp.strip())
        if not resp:
            self.ser.close()
            raise SamBAConnectionError("SAM-BA did not respond to V#")

    def close(self):
        """ Close the serial port. """
        self.ser.close()

    def flush_all(self):
        """ Flush serial communication buffers (input and output). """
        self.ser.flushInput()
        self.ser.flushOutput()

    def write_message(self, msg):
        """
        Send a command string to the device.

        :param msg: Text command; it is encoded to bytes before sending.
        :raises IOError: if the serial port is not open.
        """
        if not self.ser.isOpen():
            raise IOError("Serial port does not seem to be open!")
        self.flush_all()
        payload = msg.encode()
        logger.debug("Writing to device : {0}".format(payload))
        self.ser.write(payload)

    # ------------------------------------------------------------------
    # Memory access commands
    # ------------------------------------------------------------------

    def _write_value(self, opcode, label, address, contents):
        """ Send ``<opcode><address>,<contents>#`` and return the reply. """
        self.flush_all()
        logger.debug("Writing {0} at {1} : {2}"
                     "".format(label, address, contents))
        self.write_message("{0}{1},{2}#".format(opcode, address, contents))
        return self.retrieve_response()

    def _read_value(self, opcode, label, address):
        """ Send ``<opcode><address>,#`` and return the raw reply. """
        self.flush_all()
        msg = "{0}{1},#".format(opcode, address)
        logger.debug("Reading {0} with command : {1}".format(label, msg))
        self.write_message(msg)
        return self.retrieve_response()

    def write_byte(self, address, contents):
        """
        Write 1 byte at a specific address.
        Both address and contents expected to be character strings.
        Returns the response read back from SAM-BA.
        """
        return self._write_value('O', 'byte', address, contents)

    def write_hword(self, address, contents):
        """
        Write 2 bytes at a specific address.
        Both address and contents expected to be character strings.
        Returns the response read back from SAM-BA.
        """
        return self._write_value('H', 'half word', address, contents)

    def write_word(self, address, contents):
        """
        Write 4 bytes at a specific address.
        Both address and contents expected to be character strings.
        Returns the response read back from SAM-BA.
        """
        return self._write_value('W', 'word', address, contents)

    def read_byte(self, address):
        """
        Read 1 byte from a specific address.
        Both address and returned contents are character strings. The
        returned string has surrounding whitespace stripped.
        """
        return self._read_value('o', 'byte', address).strip()

    def read_hword(self, address):
        """
        Read 2 bytes from a specific address.
        Both address and returned contents are character strings. The
        returned string has surrounding whitespace stripped.
        """
        return self._read_value('h', 'half word', address).strip()

    def read_word(self, address):
        """
        Read 4 bytes from a specific address.
        Both address and returned contents are character strings. Unlike
        read_byte / read_hword the response is returned unstripped; callers
        strip it themselves.
        """
        return self._read_value('w', 'word', address)

    # ------------------------------------------------------------------
    # XMODEM support
    # ------------------------------------------------------------------

    def xm_init_sf(self, address):
        """ Initialize XMODEM file send to specified address """
        self.flush_all()
        msg = "S{0},#".format(address)
        logger.debug("Starting send file with command : {0}".format(msg))
        self.write_message(msg)
        # Two bytes are read back and discarded after the command.
        self.ser.read(2)

    def xm_init_rf(self, address, size):
        """
        Initialize XMODEM file read from specified address.
        Not implemented; does nothing.
        """
        pass

    def xm_getc(self, size, timeout=1):
        """
        getc function for the xmodem protocol.
        Reads up to ``size`` bytes from the port; ``timeout`` is accepted
        but not used.
        """
        data = self.ser.read(size)
        logger.debug("XM_RESP [{0:>3}] : {1}".format(len(data), data))
        return data

    def xm_putc(self, data, timeout=1):
        """
        putc function for the xmodem protocol.
        Writes ``data`` to the port and returns its length; ``timeout`` is
        accepted but not used.
        """
        logger.debug("XM_SEND [{0:>3}] : {1}".format(len(data), data))
        self.ser.write(data)
        return len(data)

    # ------------------------------------------------------------------
    # Flash controller (EFC) operations
    # ------------------------------------------------------------------

    def _efc_command(self, command, argument=0):
        """
        Write a command word to the EFC command register.

        The word is ``5A`` followed by ``argument`` as at least four
        lowercase hex digits and the device specific ``command`` code.
        """
        return self.write_word(
            self._device.EFC_FCR,
            '5A{0:04x}{1}'.format(argument, command))

    def efc_wready(self):
        """
        Wait for EFC to report ready.
        Polls the status every 10 ms; there is no timeout.
        """
        while not self.efc_rstat():
            logger.debug("Waiting for EFC")
            sleep(0.01)

    def efc_readfrr(self):
        """ Read the register at the device's EFC_FRR address. """
        return self.read_word(self._device.EFC_FRR)

    def efc_readfmr(self):
        """ Read the register at the device's EFC_FMR address. """
        return self.read_word(self._device.EFC_FMR)

    def efc_setfmr(self, mode):
        """ Write ``mode`` to the register at the device's EFC_FMR address. """
        return self.write_word(self._device.EFC_FMR, mode)

    def efc_ewp(self, pno):
        """
        EFC trigger write page. Pno is an integer.
        Does not wait for the EFC before or after issuing the command.
        """
        self._efc_command(self._device.WPC, pno)

    def efc_rstat(self):
        """
        Read EFC status.
        Returns True if EFC is ready, False if busy.

        The decision is taken from bit 0 of the hex digit at index 9 of the
        stripped response. Testing the bit (rather than comparing the digit
        with "1") keeps the result correct when other bits of that digit
        are set as well; otherwise efc_wready would wait forever.
        """
        efc_status = self.read_word(self._device.EFC_FSR).strip()
        logger.debug("EFC Status : {0}".format(efc_status))
        try:
            return bool(int(efc_status[9], 16) & 0x1)
        except ValueError:
            # A non-hex character here is treated as "not ready".
            return False

    def efc_cleargpnvm(self, bno):
        """
        EFC Function to clear specified GPNVM bit.
        bno is an integer
        """
        self.efc_wready()
        self._efc_command(self._device.CGPB_CMD, bno)
        self.efc_wready()

    def efc_setgpnvm(self, bno):
        """
        EFC Function to set specified GPNVM bit.
        bno is an integer
        """
        self.efc_wready()
        self._efc_command(self._device.SGPB_CMD, bno)
        self.efc_wready()

    def efc_eraseall(self):
        """ EFC Function to Erase All """
        self.efc_wready()
        self._efc_command(self._device.EAC)
        self.efc_wready()

    def getchipid(self):
        """
        Read the CHIPID_CIDR and CHIPID_EXID registers and wrap them in a
        SamChipID.
        """
        cidr = self.read_word(self._device.CHIPID_CIDR).strip()
        exid = self.read_word(self._device.CHIPID_EXID).strip()
        return SamChipID(cidr, exid)

    def efc_getflashdescriptor(self):
        """
        Issue the device's GD_CMD to the EFC and return an
        EFCFlashDescriptor built from this connection.
        """
        self.efc_wready()
        self._efc_command(self._device.GD_CMD)
        self.efc_wready()
        return EFCFlashDescriptor(self)

    def efc_getuid(self):
        """
        Read the unique identifier as a hex string.

        Issues the device's STUI_CMD, reads four consecutive words starting
        at the fixed address 0x80000, then issues SPUI_CMD and waits for the
        EFC. Each word has its first two characters dropped (the ``0x``
        prefix, assuming that is how responses are formatted) before the
        words are concatenated in read order.
        """
        self.efc_wready()
        self._efc_command(self._device.STUI_CMD)
        words = []
        for offset in range(0, 16, 4):
            address = '{0:x}'.format(0x80000 + offset)
            words.append(self.read_word(address).strip()[2:])
        self._efc_command(self._device.SPUI_CMD)
        self.efc_wready()
        return ''.join(words)