turn ftdi driver into echo server
[goodfet] / client / USBDevice.py
1 # USBDevice.py
2 #
3 # Contains class definitions for USBDevice and USBDeviceRequest.
4
5 from USB import *
6 from USBClass import *
7
class USBDevice:
    """Emulated USB device driven through a `maxusb_app` backend object.

    Holds the device-descriptor fields, the string table and the list of
    configurations, and dispatches incoming control requests to the handler
    registered for them in `request_handlers`.
    """

    name = "generic device"

    def __init__(self, maxusb_app, device_class, device_subclass,
            protocol_rel_num, max_packet_size_ep0, vendor_id, product_id,
            device_rev, manufacturer_string, product_string,
            serial_number_string, configurations=None, descriptors=None,
            verbose=0):
        """Store the descriptor fields and register strings/configurations.

        configurations -- list of configuration objects; each one gets its
                          string index assigned and is bound to this device.
                          Defaults to a fresh empty list.
        descriptors    -- dict mapping USB.desc_type_* to a bytearray or a
                          callable taking the descriptor index.  The device,
                          configuration and string entries are (re)set here.
                          Defaults to a fresh empty dict.
        """
        self.maxusb_app = maxusb_app
        self.verbose = verbose

        self.strings = [ ]

        # Held byte-swapped: get_descriptor() emits the high byte first, so
        # the descriptor carries the bytes 0x00 0x01 (0x0100 little-endian).
        self.usb_spec_version           = 0x0001
        self.device_class               = device_class
        self.device_subclass            = device_subclass
        self.protocol_rel_num           = protocol_rel_num
        self.max_packet_size_ep0        = max_packet_size_ep0
        self.vendor_id                  = vendor_id
        self.product_id                 = product_id
        self.device_rev                 = device_rev
        self.manufacturer_string_id     = self.get_string_id(manufacturer_string)
        self.product_string_id          = self.get_string_id(product_string)
        self.serial_number_string_id    = self.get_string_id(serial_number_string)

        # maps from USB.desc_type_* to bytearray OR callable
        # (a None default avoids sharing one dict between all instances)
        self.descriptors = { } if descriptors is None else descriptors
        self.descriptors[USB.desc_type_device] = self.get_descriptor
        self.descriptors[USB.desc_type_configuration] = self.handle_get_configuration_descriptor_request
        self.descriptors[USB.desc_type_string] = self.handle_get_string_descriptor_request

        self.config_num = -1
        self.configuration = None
        self.configurations = [ ] if configurations is None else configurations

        # endpoint number -> endpoint object; filled in by SET_CONFIGURATION.
        # Created here so requests arriving earlier find an empty table
        # instead of a missing attribute.
        self.endpoints = { }

        for c in self.configurations:
            csi = self.get_string_id(c.configuration_string)
            c.set_configuration_string_index(csi)
            c.set_device(self)

        self.state = USB.state_detached
        self.ready = False

        self.address = 0

        self.setup_request_handlers()

    def get_string_id(self, s):
        """Return the 1-based string descriptor index for `s`.

        The string is appended to the table on first use; asking again for
        the same string yields the same index.
        """
        try:
            # list positions are 0-based, string descriptors start at index 1
            i = self.strings.index(s) + 1
        except ValueError:
            self.strings.append(s)
            i = len(self.strings)

        return i

    def setup_request_handlers(self):
        """Build the bRequest -> handler table for standard requests."""
        # see table 9-4 of USB 2.0 spec, page 279
        self.request_handlers = {
             0 : self.handle_get_status_request,
             1 : self.handle_clear_feature_request,
             3 : self.handle_set_feature_request,
             5 : self.handle_set_address_request,
             6 : self.handle_get_descriptor_request,
             7 : self.handle_set_descriptor_request,
             8 : self.handle_get_configuration_request,
             9 : self.handle_set_configuration_request,
            10 : self.handle_get_interface_request,
            11 : self.handle_set_interface_request,
            12 : self.handle_synch_frame_request
        }

    def connect(self):
        """Attach this device through the backend and mark it powered."""
        self.maxusb_app.connect(self)

        # skipping USB.state_attached may not be strictly correct (9.1.1.{1,2})
        self.state = USB.state_powered

    def disconnect(self):
        """Detach through the backend and mark the device detached."""
        self.maxusb_app.disconnect()

        self.state = USB.state_detached

    def run(self):
        """Hand control to the backend's IRQ service routine."""
        self.maxusb_app.service_irqs()

    def ack_status_stage(self):
        """Ask the backend to acknowledge the status stage."""
        self.maxusb_app.ack_status_stage()

    def get_descriptor(self, n):
        """Return the 18-byte device descriptor as a bytearray.

        `n` (the descriptor index) is accepted so this method fits the
        callable signature used in `self.descriptors`; it is not used.
        """
        d = bytearray([
            18,         # length of descriptor in bytes
            1,          # descriptor type 1 == device
            (self.usb_spec_version >> 8) & 0xff,
            self.usb_spec_version & 0xff,
            self.device_class,
            self.device_subclass,
            self.protocol_rel_num,
            self.max_packet_size_ep0,
            self.vendor_id & 0xff,
            (self.vendor_id >> 8) & 0xff,
            self.product_id & 0xff,
            (self.product_id >> 8) & 0xff,
            self.device_rev & 0xff,
            (self.device_rev >> 8) & 0xff,
            self.manufacturer_string_id,
            self.product_string_id,
            self.serial_number_string_id,
            len(self.configurations)
        ])

        return d

    # IRQ handlers
    #####################################################

    def handle_request(self, req):
        """Route a setup request to the matching handler, or stall EP0.

        The recipient (device, interface or endpoint) is picked first, then
        the handler entity according to the request type (standard, class or
        vendor), then the handler registered for `req.request`.  Any lookup
        that comes up empty stalls endpoint 0.
        """
        if self.verbose > 3:
            print(self.name, "received request", req)

        # figure out the intended recipient
        recipient_type = req.get_recipient()
        recipient = None
        index = req.get_index()
        if recipient_type == USB.request_recipient_device:
            recipient = self
        elif recipient_type == USB.request_recipient_interface:
            # no interfaces exist until a configuration has been selected
            if self.configuration is not None \
                    and index < len(self.configuration.interfaces):
                recipient = self.configuration.interfaces[index]
        elif recipient_type == USB.request_recipient_endpoint:
            recipient = self.endpoints.get(index, None)

        if not recipient:
            print(self.name, "invalid recipient, stalling")
            self.maxusb_app.stall_ep0()
            return

        # and then the type
        req_type = req.get_type()
        handler_entity = None
        if req_type == USB.request_type_standard:
            handler_entity = recipient
        elif req_type == USB.request_type_class:
            handler_entity = getattr(recipient, 'device_class', None)
        elif req_type == USB.request_type_vendor:
            # not every recipient defines a vendor handler object
            handler_entity = getattr(recipient, 'device_vendor', None)

        if not handler_entity:
            print(self.name, "invalid handler entity, stalling")
            self.maxusb_app.stall_ep0()
            return

        # the entity may be a plain value (e.g. a numeric class code) with no
        # handler table; treat that as "no handler" rather than crashing
        handlers = getattr(handler_entity, 'request_handlers', None)
        handler = handlers.get(req.request, None) if handlers else None

        if not handler:
            print(self.name, "invalid handler, stalling")
            self.maxusb_app.stall_ep0()
            return

        handler(req)

    def handle_data_available(self, ep_num, data):
        """Pass received `data` to the endpoint's handler once configured."""
        if self.state == USB.state_configured and ep_num in self.endpoints:
            endpoint = self.endpoints[ep_num]
            if callable(endpoint.handler):
                endpoint.handler(data)

    def handle_buffer_available(self, ep_num):
        """Call the endpoint's handler (no arguments) once configured."""
        if self.state == USB.state_configured and ep_num in self.endpoints:
            endpoint = self.endpoints[ep_num]
            if callable(endpoint.handler):
                endpoint.handler()

    # standard request handlers
    #####################################################

    # USB 2.0 specification, section 9.4.5 (p 282 of pdf)
    def handle_get_status_request(self, req):
        """Answer GET_STATUS with a fixed two-byte status word."""
        print(self.name, "received GET_STATUS request")

        # self-powered and remote-wakeup (USB 2.0 Spec section 9.4.5)
        response = b'\x03\x00'
        self.maxusb_app.send_on_endpoint(0, response)

    # USB 2.0 specification, section 9.4.1 (p 280 of pdf)
    def handle_clear_feature_request(self, req):
        """Log CLEAR_FEATURE; no further action is taken."""
        print(self.name, "received CLEAR_FEATURE request with type 0x%02x and value 0x%02x" \
                % (req.request_type, req.value))

    # USB 2.0 specification, section 9.4.9 (p 286 of pdf)
    def handle_set_feature_request(self, req):
        """Log SET_FEATURE; no further action is taken."""
        print(self.name, "received SET_FEATURE request")

    # USB 2.0 specification, section 9.4.6 (p 284 of pdf)
    def handle_set_address_request(self, req):
        """Record the assigned address, enter the address state and ack."""
        self.address = req.value
        self.state = USB.state_address
        self.ack_status_stage()

        if self.verbose > 2:
            print(self.name, "received SET_ADDRESS request for address",
                    self.address)

    # USB 2.0 specification, section 9.4.3 (p 281 of pdf)
    def handle_get_descriptor_request(self, req):
        """Send the requested descriptor on EP0, or stall if there is none.

        The descriptor type is the high byte of `req.value`, the index the
        low byte.  The reply is truncated to `req.length` bytes.
        """
        dtype  = (req.value >> 8) & 0xff
        dindex = req.value & 0xff
        lang   = req.index
        n      = req.length

        response = None

        if self.verbose > 2:
            print(self.name, ("received GET_DESCRIPTOR req %d, index %d, " \
                    + "language 0x%04x, length %d") \
                    % (dtype, dindex, lang, n))

        response = self.descriptors.get(dtype, None)
        if callable(response):
            response = response(dindex)

        if response:
            n = min(n, len(response))
            # temporarily raise backend verbosity for this transfer; restore
            # it even if the send fails
            self.maxusb_app.verbose += 1
            try:
                self.maxusb_app.send_on_endpoint(0, response[:n])
            finally:
                self.maxusb_app.verbose -= 1

            if self.verbose > 5:
                print(self.name, "sent", n, "bytes in response")
        else:
            self.maxusb_app.stall_ep0()

    def handle_get_configuration_descriptor_request(self, num):
        """Return the descriptor of configuration `num` (0-based).

        Returns None for an index with no configuration, which makes
        handle_get_descriptor_request stall instead of raising IndexError.
        """
        if not 0 <= num < len(self.configurations):
            return None

        return self.configurations[num].get_descriptor()

    def handle_get_string_descriptor_request(self, num):
        """Return string descriptor `num`, or None if it does not exist.

        Index 0 yields the language-ID descriptor; indices from 1 map onto
        `self.strings`.
        """
        if num == 0:
            # HACK: hard-coding baaaaad
            d = bytes([
                    4,      # length of descriptor in bytes
                    3,      # descriptor type 3 == string
                    9,      # language code 0, byte 0
                    4       # language code 0, byte 1
            ])
        elif num > len(self.strings):
            # unknown string index: let the caller stall
            d = None
        else:
            # string descriptors start at 1.  Encode explicitly as
            # little-endian: this emits no Byte Order Mark (which Linux
            # doesn't like) and does not depend on the host's byte order.
            s = self.strings[num-1].encode('utf-16-le')

            d = bytearray([
                    len(s) + 2,     # length of descriptor in bytes
                    3               # descriptor type 3 == string
            ])
            d += s

        return d

    # USB 2.0 specification, section 9.4.8 (p 285 of pdf)
    def handle_set_descriptor_request(self, req):
        """Log SET_DESCRIPTOR; no further action is taken."""
        print(self.name, "received SET_DESCRIPTOR request")

    # USB 2.0 specification, section 9.4.2 (p 281 of pdf)
    def handle_get_configuration_request(self, req):
        """Report the current configuration value in a one-byte reply.

        Configuration values are one-based, so an unconfigured device
        (config_num == -1) reports 0.
        """
        print(self.name, "received GET_CONFIGURATION request with data 0x%02x" \
                % req.value)

        self.maxusb_app.send_on_endpoint(0, bytes([self.config_num + 1]))

    # USB 2.0 specification, section 9.4.7 (p 285 of pdf)
    def handle_set_configuration_request(self, req):
        """Select a configuration (one-based), or deconfigure on value 0.

        A value with no matching configuration stalls EP0.
        """
        print(self.name, "received SET_CONFIGURATION request")

        if req.value == 0:
            # value 0 means "unconfigured": drop back to the address state
            self.config_num = -1
            self.configuration = None
            self.endpoints = { }
            self.state = USB.state_address
            self.ack_status_stage()
            return

        # configs are one-based
        config_index = req.value - 1
        if config_index >= len(self.configurations):
            self.maxusb_app.stall_ep0()
            return

        self.config_num = config_index
        self.configuration = self.configurations[self.config_num]
        self.state = USB.state_configured

        # collate endpoint numbers
        self.endpoints = { }
        for i in self.configuration.interfaces:
            for e in i.endpoints:
                self.endpoints[e.number] = e

        # HACK: blindly acknowledge request
        self.ack_status_stage()

    # USB 2.0 specification, section 9.4.4 (p 282 of pdf)
    def handle_get_interface_request(self, req):
        """Answer GET_INTERFACE for interface 0 with a zero byte; else stall."""
        print(self.name, "received GET_INTERFACE request")

        if req.index == 0:
            # HACK: currently only support one interface
            self.maxusb_app.send_on_endpoint(0, b'\x00')
        else:
            self.maxusb_app.stall_ep0()

    # USB 2.0 specification, section 9.4.10 (p 288 of pdf)
    def handle_set_interface_request(self, req):
        """Log SET_INTERFACE; no further action is taken."""
        print(self.name, "received SET_INTERFACE request")

    # USB 2.0 specification, section 9.4.11 (p 288 of pdf)
    def handle_synch_frame_request(self, req):
        """Log SYNCH_FRAME; no further action is taken."""
        print(self.name, "received SYNCH_FRAME request")
313
314
class USBDeviceRequest:
    """Parsed form of an 8-byte USB setup packet."""

    def __init__(self, raw_bytes):
        """Expects raw 8-byte setup data request packet"""

        # the three 16-bit fields arrive low byte first
        self.request_type   = raw_bytes[0]
        self.request        = raw_bytes[1]
        self.value          = (raw_bytes[3] << 8) | raw_bytes[2]
        self.index          = (raw_bytes[5] << 8) | raw_bytes[4]
        self.length         = (raw_bytes[7] << 8) | raw_bytes[6]

    def __str__(self):
        """Return a one-line decimal summary of all request fields."""
        s = "dir=%d, type=%d, rec=%d, r=%d, v=%d, i=%d, l=%d" \
                % (self.get_direction(), self.get_type(), self.get_recipient(),
                   self.request, self.value, self.index, self.length)
        return s

    def raw(self):
        """returns request as bytes

        The 16-bit fields are written low byte first, mirroring the order
        __init__ reads them in, so the result equals the packet this object
        was built from.
        """
        b = bytes([ self.request_type, self.request,
                    self.value  & 0xff, (self.value  >> 8) & 0xff,
                    self.index  & 0xff, (self.index  >> 8) & 0xff,
                    self.length & 0xff, (self.length >> 8) & 0xff
                  ])
        return b

    def get_direction(self):
        """Return bit 7 of request_type."""
        return (self.request_type >> 7) & 0x01

    def get_type(self):
        """Return bits 6..5 of request_type."""
        return (self.request_type >> 5) & 0x03

    def get_recipient(self):
        """Return bits 4..0 of request_type."""
        return self.request_type & 0x1f

    # meaning of bits in wIndex changes whether we're talking about an
    # interface or an endpoint (see USB 2.0 spec section 9.3.4)
    def get_index(self):
        """Return the recipient number taken from the index field.

        Interface recipients get the whole index, endpoint recipients the
        low four bits; any other recipient yields None.
        """
        rec = self.get_recipient()
        if rec == 1:                # interface
            return self.index
        if rec == 2:                # endpoint
            return self.index & 0x0f
        return None
357