From: pete-cs Date: Fri, 14 Jun 2013 23:05:07 +0000 (+0000) Subject: new maxusb library X-Git-Url: http://git.rot13.org/?p=goodfet;a=commitdiff_plain;h=96ebf28fac2e03cfee954db85282eac7b8ec8015;hp=78533a51ab5421601b046a917dd0f6f01a402a49 new maxusb library git-svn-id: https://svn.code.sf.net/p/goodfet/code/trunk@1596 12e2690d-a6be-4b82-a7b7-67c4a43b65c8 --- diff --git a/client/Facedancer.py b/client/Facedancer.py new file mode 100644 index 0000000..a0cfc19 --- /dev/null +++ b/client/Facedancer.py @@ -0,0 +1,208 @@ +# Facedancer.py +# +# Contains class definitions for Facedancer, FacedancerCommand, FacedancerApp, +# and GoodFETMonitorApp. + +from util import * + +class Facedancer: + def __init__(self, serialport, verbose=0): + self.serialport = serialport + self.verbose = verbose + + self.reset() + self.monitor_app = GoodFETMonitorApp(self, verbose=self.verbose) + self.monitor_app.announce_connected() + + def halt(self): + self.serialport.setRTS(1) + self.serialport.setDTR(1) + + def reset(self): + if self.verbose > 1: + print("Facedancer resetting...") + + self.halt() + self.serialport.setDTR(0) + + c = self.readcmd() + + if self.verbose > 0: + print("Facedancer reset") + + def read(self, n): + """Read raw bytes.""" + + b = self.serialport.read(n) + + if self.verbose > 3: + print("Facedancer received", len(b), "bytes;", + self.serialport.inWaiting(), "bytes remaining") + + if self.verbose > 2: + print("Facedancer Rx:", bytes_as_hex(b)) + + return b + + def readcmd(self): + """Read a single command.""" + + b = self.read(4) + + app = b[0] + verb = b[1] + n = b[2] + (b[3] << 8) + + if n > 0: + data = self.read(n) + else: + data = b'' + + if len(data) != n: + raise ValueError('Facedancer expected ' + str(n) \ + + ' bytes but received only ' + str(len(data))) + + cmd = FacedancerCommand(app, verb, data) + + if self.verbose > 1: + print("Facedancer Rx command:", cmd) + + return cmd + + def write(self, b): + """Write raw bytes.""" + + if self.verbose > 2: + print("Facedancer Tx:", bytes_as_hex(b)) + + self.serialport.write(b) + + def writecmd(self, c): + """Write a single command.""" + self.write(c.as_bytestring()) + + if self.verbose > 1: + print("Facedancer Tx command:", c) + + +class FacedancerCommand: + def __init__(self, app=None, verb=None, data=None): + self.app = app + self.verb = verb + self.data = data + + def __str__(self): + s = "app 0x%02x, verb 0x%02x, len %d" % (self.app, self.verb, + len(self.data)) + + if len(self.data) > 0: + s += ", data " + bytes_as_hex(self.data) + + return s + + def long_string(self): + s = "app: " + str(self.app) + "\n" \ + + "verb: " + str(self.verb) + "\n" \ + + "len: " + str(len(self.data)) + + if len(self.data) > 0: + try: + s += "\n" + self.data.decode("utf-8") + except UnicodeDecodeError: + s += "\n" + bytes_as_hex(self.data) + + return s + + def as_bytestring(self): + n = len(self.data) + + b = bytearray(n + 4) + b[0] = self.app + b[1] = self.verb + b[2] = n & 0xff + b[3] = n >> 8 + b[4:] = self.data + + return b + + +class FacedancerApp: + app_name = "override this" + app_num = 0x00 + + def __init__(self, device, verbose=0): + self.device = device + self.verbose = verbose + + self.init_commands() + + if self.verbose > 0: + print(self.app_name, "initialized") + + def init_commands(self): + pass + + def enable(self): + for i in range(3): + self.device.writecmd(self.enable_app_cmd) + self.device.readcmd() + + if self.verbose > 0: + print(self.app_name, "enabled") + + +class GoodFETMonitorApp(FacedancerApp): + app_name = "GoodFET monitor" + app_num = 0x00 + + def read_byte(self, addr): + d = [ addr & 0xff, addr >> 8 ] + cmd = FacedancerCommand(0, 2, d) + + self.device.writecmd(cmd) + resp = self.device.readcmd() + + return resp.data[0] + + def get_infostring(self): + return bytes([ self.read_byte(0xff0), self.read_byte(0xff1) ]) + + def get_clocking(self): + return bytes([ self.read_byte(0x57), self.read_byte(0x56) ]) + + def print_info(self): + infostring = self.get_infostring() + clocking = self.get_clocking() + + print("MCU", bytes_as_hex(infostring, delim="")) + print("clocked at", bytes_as_hex(clocking, delim="")) + + def list_apps(self): + cmd = FacedancerCommand(self.app_num, 0x82, b'0x0') + self.device.writecmd(cmd) + + resp = self.device.readcmd() + print("build date:", resp.data.decode("utf-8")) + + print("firmware apps:") + while True: + resp = self.device.readcmd() + if len(resp.data) == 0: + break + print(resp.data.decode("utf-8")) + + def echo(self, s): + b = bytes(s, encoding="utf-8") + + cmd = FacedancerCommand(self.app_num, 0x81, b) + self.device.writecmd(cmd) + + resp = self.device.readcmd() + + return resp.data == b + + def announce_connected(self): + cmd = FacedancerCommand(self.app_num, 0xb1, b'') + self.device.writecmd(cmd) + resp = self.device.readcmd() + diff --git a/client/GoodFETMAXUSB.py b/client/GoodFETMAXUSB.py index c20a748..b918dd0 100644 --- a/client/GoodFETMAXUSB.py +++ b/client/GoodFETMAXUSB.py @@ -7,9 +7,14 @@ import sys, time, string, cStringIO, struct, glob, os; +import warnings from GoodFET import GoodFET; +warnings.warn( +"""This library will soon be deprecated in favor of the USB*.py libraries.""" +) + #Handy registers. rEP0FIFO=0 rEP1OUTFIFO=1 diff --git a/client/MAXUSBApp.py b/client/MAXUSBApp.py new file mode 100644 index 0000000..335c597 --- /dev/null +++ b/client/MAXUSBApp.py @@ -0,0 +1,217 @@ +# MAXUSBApp.py +# +# Contains class definition for MAXUSBApp. + +from util import * +from Facedancer import * +from USB import * +from USBDevice import USBDeviceRequest + +class MAXUSBApp(FacedancerApp): + app_name = "MAXUSB" + app_num = 0x40 + + reg_ep0_fifo = 0x00 + reg_ep1_out_fifo = 0x01 + reg_ep2_in_fifo = 0x02 + reg_ep3_in_fifo = 0x03 + reg_setup_data_fifo = 0x04 + reg_ep0_byte_count = 0x05 + reg_ep1_out_byte_count = 0x06 + reg_ep2_in_byte_count = 0x07 + reg_ep3_in_byte_count = 0x08 + reg_ep_stalls = 0x09 + reg_clr_togs = 0x0a + reg_endpoint_irq = 0x0b + reg_endpoint_interrupt_enable = 0x0c + reg_usb_irq = 0x0d + reg_usb_interrupt_enable = 0x0e + reg_usb_control = 0x0f + reg_cpu_control = 0x10 + reg_pin_control = 0x11 + reg_revision = 0x12 + reg_function_address = 0x13 + reg_io_pins = 0x14 + + # bitmask values for reg_endpoint_irq = 0x0b + is_setup_data_avail = 0x20 # SUDAVIRQ + is_in3_buffer_avail = 0x10 # IN3BAVIRQ + is_in2_buffer_avail = 0x08 # IN2BAVIRQ + is_out1_data_avail = 0x04 # OUT1DAVIRQ + is_out0_data_avail = 0x02 # OUT0DAVIRQ + is_in0_buffer_avail = 0x01 # IN0BAVIRQ + + # bitmask values for reg_usb_control = 0x0f + usb_control_vbgate = 0x40 + usb_control_connect = 0x08 + + # bitmask values for reg_pin_control = 0x11 + interrupt_level = 0x08 + full_duplex = 0x10 + + def __init__(self, device, verbose=0): + FacedancerApp.__init__(self, device, verbose) + + self.connected_device = None + + self.enable() + + if verbose > 0: + rev = self.read_register(self.reg_revision) + print(self.app_name, "revision", rev) + + # set duplex and negative INT level (from GoodFEDMAXUSB.py) + self.write_register(self.reg_pin_control, + self.full_duplex | self.interrupt_level) + + def init_commands(self): + self.read_register_cmd = FacedancerCommand(self.app_num, 0x00, b'') + self.write_register_cmd = FacedancerCommand(self.app_num, 0x00, b'') + self.enable_app_cmd = FacedancerCommand(self.app_num, 0x10, b'') + self.ack_cmd = FacedancerCommand(self.app_num, 0x00, b'\x01') + + def read_register(self, reg_num, ack=False): + if self.verbose > 1: + print(self.app_name, "reading register 0x%02x" % reg_num) + + self.read_register_cmd.data = bytearray([ reg_num << 3, 0 ]) + if ack: + self.write_register_cmd.data[0] |= 1 + + self.device.writecmd(self.read_register_cmd) + + resp = self.device.readcmd() + + if self.verbose > 2: + print(self.app_name, "read register 0x%02x has value 0x%02x" % + (reg_num, resp.data[1])) + + return resp.data[1] + + def write_register(self, reg_num, value, ack=False): + if self.verbose > 2: + print(self.app_name, "writing register 0x%02x with value 0x%02x" % + (reg_num, value)) + + self.write_register_cmd.data = bytearray([ (reg_num << 3) | 2, value ]) + if ack: + self.write_register_cmd.data[0] |= 1 + + self.device.writecmd(self.write_register_cmd) + self.device.readcmd() + + def get_version(self): + return self.read_register(self.reg_revision) + + def ack_status_stage(self): + if self.verbose > 5: + print(self.app_name, "sending ack!") + + self.device.writecmd(self.ack_cmd) + self.device.readcmd() + + def connect(self, usb_device): + self.write_register(self.reg_usb_control, self.usb_control_vbgate | + self.usb_control_connect) + + self.connected_device = usb_device + + if self.verbose > 0: + print(self.app_name, "connected device", self.connected_device.name) + + def disconnect(self): + self.write_register(self.reg_usb_control, self.usb_control_vbgate) + + if self.verbose > 0: + print(self.app_name, "disconnected device", self.connected_device.name) + self.connected_device = None + + def clear_irq_bit(self, reg, bit): + self.write_register(reg, bit) + + def read_bytes(self, reg, n): + if self.verbose > 2: + print(self.app_name, "reading", n, "bytes from register", reg) + + data = bytes([ (reg << 3) ] + ([0] * n)) + cmd = FacedancerCommand(self.app_num, 0x00, data) + + self.device.writecmd(cmd) + resp = self.device.readcmd() + + if self.verbose > 3: + print(self.app_name, "read", len(resp.data) - 1, "bytes from register", reg) + + return resp.data[1:] + + def write_bytes(self, reg, data): + data = bytes([ (reg << 3) | 3 ]) + data + cmd = FacedancerCommand(self.app_num, 0x00, data) + + self.device.writecmd(cmd) + self.device.readcmd() # null response + + if self.verbose > 3: + print(self.app_name, "wrote", len(data) - 1, "bytes to register", reg) + + # HACK: but given the limitations of the MAX chips, it seems necessary + def send_on_endpoint(self, ep_num, data): + if ep_num == 0: + fifo_reg = self.reg_ep0_fifo + bc_reg = self.reg_ep0_byte_count + elif ep_num == 2: + fifo_reg = self.reg_ep2_in_fifo + bc_reg = self.reg_ep2_in_byte_count + elif ep_num == 3: + fifo_reg = self.reg_ep3_in_fifo + bc_reg = self.reg_ep3_in_byte_count + else: + raise ValueError('endpoint ' + str(ep_num) + ' not supported') + + self.write_bytes(fifo_reg, data) + self.write_register(bc_reg, len(data), ack=True) + + if self.verbose > 1: + print(self.app_name, "wrote", bytes_as_hex(data), "to endpoint", + ep_num) + + # TODO + def read_from_endpoint(self, ep_num): + pass + + def stall_ep0(self): + if self.verbose > 0: + print(self.app_name, "stalling endpoint 0") + + self.write_register(self.reg_ep_stalls, 0x23) + + def service_irqs(self): + while True: + irq = self.read_register(self.reg_endpoint_irq) + + if self.verbose > 3: + print(self.app_name, "read endpoint irq: 0x%02x" % irq) + + if self.verbose > 2: + if irq & ~ (self.is_in0_buffer_avail \ + | self.is_in2_buffer_avail | self.is_in3_buffer_avail): + print(self.app_name, "notable irq: 0x%02x" % irq) + + if irq & self.is_setup_data_avail: + self.clear_irq_bit(self.reg_endpoint_irq, self.is_setup_data_avail) + + b = self.read_bytes(self.reg_setup_data_fifo, 8) + req = USBDeviceRequest(b) + self.connected_device.handle_request(req) + + if irq & self.is_out1_data_avail: + data = b'' # TODO: read from out1 + self.connected_device.handle_data_available(1, data) + self.clear_irq_bit(self.reg_endpoint_irq, self.is_out1_data_avail) + + if irq & self.is_in2_buffer_avail: + self.connected_device.handle_buffer_available(2) + + if irq & self.is_in3_buffer_avail: + self.connected_device.handle_buffer_available(3) + diff --git a/client/USB.py b/client/USB.py new file mode 100644 index 0000000..552b036 --- /dev/null +++ b/client/USB.py @@ -0,0 +1,53 @@ +# USB.py +# +# Contains definition of USB class, which is just a container for a bunch of +# constants/enums associated with the USB protocol. +# +# TODO: would be nice if this module could re-export the other USB* classes so +# one need import only USB to get all the functionality + +class USB: + state_detached = 0 + state_attached = 1 + state_powered = 2 + state_default = 3 + state_address = 4 + state_configured = 5 + state_suspended = 6 + + request_direction_host_to_device = 0 + request_direction_device_to_host = 1 + + request_type_standard = 0 + request_type_class = 1 + request_type_vendor = 2 + + request_recipient_device = 0 + request_recipient_interface = 1 + request_recipient_endpoint = 2 + request_recipient_other = 3 + + feature_endpoint_halt = 0 + feature_device_remote_wakeup = 1 + feature_test_mode = 2 + + desc_type_device = 1 + desc_type_configuration = 2 + desc_type_string = 3 + desc_type_interface = 4 + desc_type_endpoint = 5 + desc_type_device_qualifier = 6 + desc_type_other_speed_configuration = 7 + desc_type_interface_power = 8 + desc_type_hid = 33 + desc_type_report = 34 + + # while this holds for HID, it may not be a correct model for the USB + # ecosystem at large + if_class_to_desc_type = { + 3 : desc_type_hid + } + + def interface_class_to_descriptor_type(interface_class): + return USB.if_class_to_desc_type[interface_class] + diff --git a/client/USBConfiguration.py b/client/USBConfiguration.py new file mode 100644 index 0000000..cadf106 --- /dev/null +++ b/client/USBConfiguration.py @@ -0,0 +1,33 @@ +# USBConfiguration.py +# +# Contains class definition for USBConfiguration. + +class USBConfiguration: + def __init__(self, configuration_index, configuration_string, interfaces): + self.configuration_index = configuration_index + self.configuration_string = configuration_string + self.configuration_string_index = 0 + self.interfaces = interfaces + + def set_configuration_string_index(self, i): + self.configuration_string_index = i + + def get_descriptor(self): + interface_descriptors = bytearray() + for i in self.interfaces: + interface_descriptors += i.get_descriptor() + + total_len = len(interface_descriptors) + 7 + + d = bytes([ + 7, # length of descriptor in bytes + 2, # descriptor type 2 == configuration + total_len & 0xff, + (total_len >> 8) & 0xff, + len(self.interfaces), + self.configuration_index, + self.configuration_string_index + ]) + + return d + interface_descriptors + diff --git a/client/USBDevice.py b/client/USBDevice.py new file mode 100644 index 0000000..84e8b1f --- /dev/null +++ b/client/USBDevice.py @@ -0,0 +1,320 @@ +# USBDevice.py +# +# Contains class definitions for USBDevice and USBDeviceRequest. + +from USB import * + +class USBDevice: + name = "generic device" + + def __init__(self, maxusb_app, device_class, device_subclass, + protocol_rel_num, max_packet_size_ep0, vendor_id, product_id, + device_rev, manufacturer_string, product_string, + serial_number_string, configurations=[], descriptors={}, + verbose=0): + self.maxusb_app = maxusb_app + self.verbose = verbose + + self.strings = [ ] + + self.usb_spec_version = 0x0001 + self.device_class = device_class + self.device_subclass = device_subclass + self.protocol_rel_num = protocol_rel_num + self.max_packet_size_ep0 = max_packet_size_ep0 + self.vendor_id = vendor_id + self.product_id = product_id + self.device_rev = device_rev + self.manufacturer_string_id = self.get_string_id(manufacturer_string) + self.product_string_id = self.get_string_id(product_string) + self.serial_number_string_id = self.get_string_id(serial_number_string) + + # maps from USB.desc_type_* to bytearray OR callable + self.descriptors = descriptors + self.descriptors[USB.desc_type_device] = self.get_descriptor + self.descriptors[USB.desc_type_configuration] = self.handle_get_configuration_descriptor_request + self.descriptors[USB.desc_type_string] = self.handle_get_string_descriptor_request + + self.config_num = -1 + self.configuration = None + self.configurations = configurations + + for c in self.configurations: + csi = self.get_string_id(c.configuration_string) + c.set_configuration_string_index(csi) + + for i in c.interfaces: + i.device = self + + self.state = USB.state_detached + self.ready = False + + self.address = 0 + + self.setup_request_handlers() + + def get_string_id(self, s): + try: + i = self.strings.index(s) + except ValueError: + # string descriptors start at index 1 + self.strings.append(s) + i = len(self.strings) + + return i + + def setup_request_handlers(self): + # see table 9-4 of USB 2.0 spec, page 279 + self.request_handlers = { + 0 : self.handle_get_status_request, + 1 : self.handle_clear_feature_request, + 3 : self.handle_set_feature_request, + 5 : self.handle_set_address_request, + 6 : self.handle_get_descriptor_request, + 7 : self.handle_set_descriptor_request, + 8 : self.handle_get_configuration_request, + 9 : self.handle_set_configuration_request, + 10 : self.handle_get_interface_request, + 11 : self.handle_set_interface_request, + 12 : self.handle_synch_frame_request + } + + def connect(self): + self.maxusb_app.connect(self) + + # skipping USB.state_attached may not be strictly correct (9.1.1.{1,2}) + self.state = USB.state_powered + + def disconnect(self): + self.maxusb_app.disconnect() + + self.state = USB.state_detached + + def run(self): + self.maxusb_app.service_irqs() + + def ack_status_stage(self): + self.maxusb_app.ack_status_stage() + + def get_descriptor(self, n): + d = bytearray([ + 18, # length of descriptor in bytes + 1, # descriptor type 1 == device + (self.usb_spec_version >> 8) & 0xff, + self.usb_spec_version & 0xff, + self.device_class, + self.device_subclass, + self.protocol_rel_num, + self.max_packet_size_ep0, + (self.vendor_id >> 8) & 0xff, + self.vendor_id & 0xff, + (self.product_id >> 8) & 0xff, + self.product_id & 0xff, + (self.device_rev >> 8) & 0xff, + self.device_rev & 0xff, + self.manufacturer_string_id, + self.product_string_id, + self.serial_number_string_id, + len(self.configurations) + ]) + + return d + + # IRQ handlers + ##################################################### + + def handle_request(self, req): + if self.verbose > 3: + print(self.name, "received request", req) + + # figure out the intended recipient + if req.get_recipient() == USB.request_recipient_device: + recipient = self + elif req.get_recipient() == USB.request_recipient_interface: + recipient = self.configuration.interfaces[req.index] + elif req.get_recipient() == USB.request_recipient_endpoint: + recipient = self.configuration.endpoints[req.index] + + # and then the type + if req.get_type() == USB.request_type_standard: + handler = recipient.request_handlers[req.request] + handler(req) + elif req.get_type() == USB.request_type_class: + # HACK: evidently, FreeBSD doesn't pay attention to the device + # until it sends a GET_STATUS(class) message + self.ready = True + self.maxusb_app.stall_ep0() + elif req.get_type() == USB.request_type_vendor: + self.maxusb_app.stall_ep0() + + def handle_data_available(self, ep_num, data): + if self.ready and ep_num in self.endpoints: + endpoint = self.endpoints[ep_num] + endpoint.handler(data) + + def handle_buffer_available(self, ep_num): + if self.ready and ep_num in self.endpoints: + endpoint = self.endpoints[ep_num] + endpoint.handler() + + # standard request handlers + ##################################################### + + # USB 2.0 specification, section 9.4.5 (p 282 of pdf) + def handle_get_status_request(self, req): + print(self.name, "received GET_STATUS request") + + # self-powered and remote-wakeup (USB 2.0 Spec section 9.4.5) + response = b'\x03\x00' + self.maxusb_app.send_on_endpoint(0, response) + + # USB 2.0 specification, section 9.4.1 (p 280 of pdf) + def handle_clear_feature_request(self, req): + print(self.name, "received CLEAR_FEATURE request with type 0x%02x and value 0x%02x" \ + % (req.request_type, req.value)) + + # USB 2.0 specification, section 9.4.9 (p 286 of pdf) + def handle_set_feature_request(self, req): + print(self.name, "received SET_FEATURE request") + + # USB 2.0 specification, section 9.4.6 (p 284 of pdf) + def handle_set_address_request(self, req): + self.address = req.value + self.state = USB.state_address + self.ack_status_stage() + + if self.verbose > 2: + print(self.name, "received SET_ADDRESS request for address", + self.address) + + # USB 2.0 specification, section 9.4.3 (p 281 of pdf) + def handle_get_descriptor_request(self, req): + dtype = (req.value >> 8) & 0xff + dindex = req.value & 0xff + lang = req.index + n = req.length + + response = None + + if self.verbose > 2: + print(self.name, ("received GET_DESCRIPTOR req %d, index %d, " \ + + "language 0x%04x, length %d") \ + % (dtype, dindex, lang, n)) + + # TODO: handle KeyError + response = self.descriptors[dtype] + if callable(response): + response = response(dindex) + + if response: + n = min(n, len(response)) + self.maxusb_app.verbose += 1 + self.maxusb_app.send_on_endpoint(0, response[:n]) + self.maxusb_app.verbose -= 1 + + if self.verbose > 5: + print(self.name, "sent", n, "bytes in response") + + def handle_get_configuration_descriptor_request(self, num): + return self.configurations[num].get_descriptor() + + def handle_get_string_descriptor_request(self, num): + if num == 0: + # HACK: hard-coding baaaaad + d = bytes([ + 4, # length of descriptor in bytes + 3, # descriptor type 3 == string + 9, # language code 0, byte 0 + 4 # language code 0, byte 1 + ]) + else: + # string descriptors start at 1 + s = self.strings[num-1].encode('utf-16') + d = bytearray([ + len(s) + 2, # length of descriptor in bytes + 3 # descriptor type 3 == string + ]) + d += s + + return d + + # USB 2.0 specification, section 9.4.8 (p 285 of pdf) + def handle_set_descriptor_request(self, req): + print(self.name, "received SET_DESCRIPTOR request") + + # USB 2.0 specification, section 9.4.2 (p 281 of pdf) + def handle_get_configuration_request(self, req): + print(self.name, "received GET_CONFIGURATION request with data 0x%02x" \ + % req.data) + + # USB 2.0 specification, section 9.4.7 (p 285 of pdf) + def handle_set_configuration_request(self, req): + print(self.name, "received SET_CONFIGURATION request") + + # configs are one-based + self.config_num = req.value - 1 + self.configuration = self.configurations[self.config_num] + self.state = USB.state_configured + + # collate endpoint numbers + self.endpoints = { } + for i in self.configuration.interfaces: + for e in i.endpoints: + self.endpoints[e.number] = e + + # HACK: blindly acknowledge request + self.ack_status_stage() + + # USB 2.0 specification, section 9.4.4 (p 282 of pdf) + def handle_get_interface_request(self, req): + print(self.name, "received GET_INTERFACE request") + + if req.index == 0: + # HACK: currently only support one interface + self.maxusb_app.send_on_endpoint(0, b'\x00') + else: + self.maxusb_app.stall_ep0() + + # USB 2.0 specification, section 9.4.10 (p 288 of pdf) + def handle_set_interface_request(self, req): + print(self.name, "received SET_INTERFACE request") + + # USB 2.0 specification, section 9.4.11 (p 288 of pdf) + def handle_synch_frame_request(self, req): + print(self.name, "received SYNCH_FRAME request") + + +class USBDeviceRequest: + def __init__(self, raw_bytes): + """Expects raw 8-byte setup data request packet""" + + self.request_type = raw_bytes[0] + self.request = raw_bytes[1] + self.value = (raw_bytes[3] << 8) | raw_bytes[2] + self.index = (raw_bytes[5] << 8) | raw_bytes[4] + self.length = (raw_bytes[7] << 8) | raw_bytes[6] + + def __str__(self): + s = "dir=%d, type=%d, rec=%d, r=%d, v=%d, i=%d, l=%d" \ + % (self.get_direction(), self.get_type(), self.get_recipient(), + self.request, self.value, self.index, self.length) + return s + + def raw(self): + """returns request as bytes""" + b = bytes([ self.request_type, self.request, + (self.value >> 8) & 0xff, self.value & 0xff, + (self.index >> 8) & 0xff, self.index & 0xff, + (self.length >> 8) & 0xff, self.length & 0xff + ]) + return b + + def get_direction(self): + return (self.request_type >> 7) & 0x01 + + def get_type(self): + return (self.request_type >> 5) & 0x03 + + def get_recipient(self): + return self.request_type & 0x1f + diff --git a/client/USBEndpoint.py b/client/USBEndpoint.py new file mode 100644 index 0000000..a0d1c57 --- /dev/null +++ b/client/USBEndpoint.py @@ -0,0 +1,53 @@ +# USBEndpoint.py +# +# Contains class definition for USBEndpoint. + +class USBEndpoint: + direction_out = 0x00 + direction_in = 0x01 + + transfer_type_control = 0x00 + transfer_type_isochronous = 0x01 + transfer_type_bulk = 0x02 + transfer_type_interrupt = 0x03 + + sync_type_none = 0x00 + sync_type_async = 0x01 + sync_type_adaptive = 0x02 + sync_type_synchronous = 0x03 + + usage_type_data = 0x00 + usage_type_feedback = 0x01 + usage_type_implicit_feedback = 0x02 + + def __init__(self, number, direction, transfer_type, sync_type, + usage_type, max_packet_size, interval, handler): + + self.number = number + self.direction = direction + self.transfer_type = transfer_type + self.sync_type = sync_type + self.usage_type = usage_type + self.max_packet_size = max_packet_size + self.interval = interval + self.handler = handler + + # see Table 9-13 of USB 2.0 spec (pdf page 297) + def get_descriptor(self): + address = (self.number & 0x0f) | (self.direction << 7) + attributes = (self.transfer_type & 0x03) \ + | ((self.sync_type & 0x03) << 2) \ + | ((self.usage_type & 0x03) << 4) + + d = bytearray([ + 7, # length of descriptor in bytes + 5, # descriptor type 5 == endpoint + address, + attributes, + (self.max_packet_size >> 8) & 0xff, + self.max_packet_size & 0xff, + self.interval + ]) + + return d + diff --git a/client/USBInterface.py b/client/USBInterface.py new file mode 100644 index 0000000..0cedd6b --- /dev/null +++ b/client/USBInterface.py @@ -0,0 +1,82 @@ +# USBInterface.py +# +# Contains class definition for USBInterface. + +from USB import * + +class USBInterface: + name = "generic USB interface" + + def __init__(self, interface_number, interface_alternate, interface_class, + interface_subclass, interface_protocol, interface_string_index, + verbose=0, endpoints=[], descriptors={}): + + self.number = interface_number + self.alternate = interface_alternate + self.iclass = interface_class + self.subclass = interface_subclass + self.protocol = interface_protocol + self.string_index = interface_string_index + + self.endpoints = endpoints + self.descriptors = descriptors + + self.verbose = verbose + + self.descriptors[USB.desc_type_interface] = self.get_descriptor + + self.request_handlers = { + 6 : self.handle_get_descriptor_request + } + + # USB 2.0 specification, section 9.4.3 (p 281 of pdf) + # HACK: blatant copypasta from USBDevice pains me deeply + def handle_get_descriptor_request(self, req): + dtype = (req.value >> 8) & 0xff + dindex = req.value & 0xff + lang = req.index + n = req.length + + response = None + + if self.verbose > 2: + print(self.name, ("received GET_DESCRIPTOR req %d, index %d, " \ + + "language 0x%04x, length %d") \ + % (dtype, dindex, lang, n)) + + # TODO: handle KeyError + response = self.descriptors[dtype] + if callable(response): + response = response(dindex) + + if response: + n = min(n, len(response)) + self.device.maxusb_app.send_on_endpoint(0, response[:n]) + + if self.verbose > 5: + print(self.name, "sent", n, "bytes in response") + + # Table 9-12 of USB 2.0 spec (pdf page 296) + def get_descriptor(self): + + d = bytearray([ + 9, # length of descriptor in bytes + 4, # descriptor type 4 == interface + self.number, + self.alternate, + len(self.endpoints), + self.iclass, + self.subclass, + self.protocol, + self.string_index + ]) + + if self.iclass: + iclass_desc_num = USB.interface_class_to_descriptor_type(self.iclass) + d += self.descriptors[iclass_desc_num] + + for e in self.endpoints: + d += e.get_descriptor() + + return d + diff --git a/client/USBKeyboard.py b/client/USBKeyboard.py new file mode 100644 index 0000000..64f6ec0 --- /dev/null +++ b/client/USBKeyboard.py @@ -0,0 +1,93 @@ +# USBKeyboard.py +# +# Contains class definitions to implement a USB keyboard. + +from USB import * +from USBDevice import * +from USBConfiguration import * +from USBInterface import * +from USBEndpoint import * + +class USBKeyboardInterface(USBInterface): + name = "USB keyboard interface" + + hid_descriptor = b'\x09\x21\x10\x01\x00\x01\x22\x2b\x00' + report_descriptor = b'\x05\x01\x09\x06\xA1\x01\x05\x07\x19\xE0\x29\xE7\x15\x00\x25\x01\x75\x01\x95\x08\x81\x02\x95\x01\x75\x08\x81\x01\x19\x00\x29\x65\x15\x00\x25\x65\x75\x08\x95\x01\x81\x00\xC0' + + def __init__(self, verbose=0): + descriptors = { + USB.desc_type_hid : self.hid_descriptor, + USB.desc_type_report : self.report_descriptor + } + + endpoint = USBEndpoint( + 3, # endpoint number + USBEndpoint.direction_in, + USBEndpoint.transfer_type_interrupt, + USBEndpoint.sync_type_none, + USBEndpoint.usage_type_data, + 16384, # max packet size + 10, # polling interval, see USB 2.0 spec Table 9-13 + self.handle_buffer_available # handler function + ) + + # TODO: un-hardcode string index (last arg before "verbose") + USBInterface.__init__( + self, + 0, # interface number + 0, # alternate setting + 3, # interface class + 0, # subclass + 0, # protocol + 0, # string index + verbose, + [ endpoint ], + descriptors + ) + + # "ls" + self.text = [ chr(x) for x in [ 0x0f, 0x16, 0x28, 0x00 ] ] + + def handle_buffer_available(self): + if not self.text: + return + + letter = self.text.pop(0) + self.type_letter(letter) + + def type_letter(self, letter, modifiers=0): + data = bytes([ 0, 0, ord(letter) ]) + + if self.verbose > 2: + print(self.name, "sending keypress 0x%02x" % ord(letter)) + + self.device.maxusb_app.send_on_endpoint(3, data) + + +class USBKeyboardDevice(USBDevice): + name = "USB keyboard device" + + def __init__(self, maxusb_app, verbose=0): + config = USBConfiguration( + 1, # index + "Maxim Emulated Keyboard Configuration", # string desc + [ USBKeyboardInterface() ] # interfaces + ) + + USBDevice.__init__( + self, + maxusb_app, + 0, # device class + 0, # device subclass + 0, # protocol release number + 64, # max packet size for endpoint 0 + 0x610b, # vendor id + 0x4653, # product id + 0x3412, # device revision + "Maxim", # manufacturer string + "MAX3420E Enum Code", # product string + "S/N3420E", # serial number string + [ config ], + verbose=verbose + ) + diff --git a/client/facedancer-keyboard.py b/client/facedancer-keyboard.py new file mode 100755 index 0000000..85c8678 --- /dev/null +++ b/client/facedancer-keyboard.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python +# +# usb-test.py + +from serial import Serial, PARITY_NONE + +from Facedancer import * +from MAXUSBApp import * +from USBKeyboard import * + +sp = Serial("/dev/ttyUSB0", 115200, parity=PARITY_NONE, timeout=2) +fd = Facedancer(sp, verbose=1) +u = MAXUSBApp(fd, verbose=1) + +d = USBKeyboardDevice(u, verbose=4) + +d.connect() + +try: + d.run() +# SIGINT raises KeyboardInterrupt +except KeyboardInterrupt: + d.disconnect() + diff --git a/client/facedancer-monitor.py b/client/facedancer-monitor.py new file mode 100755 index 0000000..40b9a5e --- /dev/null +++ b/client/facedancer-monitor.py @@ -0,0 +1,21 @@ +#!/usr/bin/env python +# +# monitor-test.py + +from serial import Serial, PARITY_NONE + +from Facedancer import * + +sp = Serial("/dev/ttyUSB0", 115200, parity=PARITY_NONE, timeout=1) +fd = Facedancer(sp) + +fd.monitor_app.print_info() +fd.monitor_app.list_apps() + +res = fd.monitor_app.echo("I am the very model of a modern major general.") + +if res == 0: + print("echo failed") +else: + print("echo succeeded") + diff --git a/client/goodfet.maxusb b/client/goodfet.maxusb index 5b7372c..87b1bcd 100755 --- a/client/goodfet.maxusb +++ b/client/goodfet.maxusb @@ -6,9 +6,16 @@ import sys; import binascii; import array; +import warnings from GoodFETMAXUSB import GoodFETMAXUSB; +warnings.warn( +"""The libraries upon which this program depends will soon be deprecated in +favor of the USB*.py libraries. See facedancer-monitor.py for an example of +this program written using the new libraries.""" +) + if(len(sys.argv)==1): print "Usage: %s verb [objects]\n" % sys.argv[0]; print "%s info" % sys.argv[0]; diff --git a/client/goodfet.maxusbhid b/client/goodfet.maxusbhid index cc355f7..4396b1e 100755 --- a/client/goodfet.maxusbhid +++ b/client/goodfet.maxusbhid @@ -6,9 +6,16 @@ import sys; import binascii; import array; +import warnings from GoodFETMAXUSB import GoodFETMAXUSBHID; +warnings.warn( +"""The libraries upon which this program depends will soon be deprecated in +favor of the USB*.py libraries. See facedancer-keyboard.py for an example of +this program written using the new libraries.""" +) + #Initialize FET and set baud rate client=GoodFETMAXUSBHID(); client.serInit() diff --git a/client/goodfet.maxusbhost b/client/goodfet.maxusbhost index 9058f06..feba1e1 100755 --- a/client/goodfet.maxusbhost +++ b/client/goodfet.maxusbhost @@ -7,9 +7,16 @@ import sys; import binascii; import array; import time; +import warnings from GoodFETMAXUSB import GoodFETMAXUSBHost; +warnings.warn( +"""The libraries upon which this program depends will soon be deprecated in +favor of the USB*.py libraries. The new libraries do not yet support host +mode, but an example will be written and documented when they do.""" +) + if(len(sys.argv)==1): print "Usage: %s verb [objects]\n" % sys.argv[0]; print "%s info" % sys.argv[0]; diff --git a/client/goodfet.maxusbmass b/client/goodfet.maxusbmass index 9796971..8c12923 100755 --- a/client/goodfet.maxusbmass +++ b/client/goodfet.maxusbmass @@ -8,9 +8,15 @@ import sys; import binascii; import array; import time; +import warnings from GoodFETMAXUSB import *; +warnings.warn( +"""The libraries upon which this program depends will soon be deprecated in +favor of the USB*.py libraries. See facedancer-umass.py (forthcoming) for an +example of this program written using the new libraries.""" +) # This constant is kinda complicated and very ugly. The idea is that # if we take too long in any given transaction, the host will abort. diff --git a/client/util.py b/client/util.py new file mode 100644 index 0000000..da9d344 --- /dev/null +++ b/client/util.py @@ -0,0 +1,7 @@ +# util.py +# +# Random helpful functions. + +def bytes_as_hex(b, delim=" "): + return delim.join(["%02x" % x for x in b]) +