3 # Contains class definitions to implement a USB mass storage device.
9 from USBDevice import *
10 from USBConfiguration import *
11 from USBInterface import *
12 from USBEndpoint import *
13 from USBClass import *
17 class USBMassStorageClass(USBClass):
18 name = "USB mass storage class"
20 def setup_request_handlers(self):
21 self.request_handlers = {
22 0xFF : self.handle_bulk_only_mass_storage_reset_request,
23 0xFE : self.handle_get_max_lun_request
26 def handle_bulk_only_mass_storage_reset_request(self, req):
27 self.interface.configuration.device.maxusb_app.send_on_endpoint(0, b'')
29 def handle_get_max_lun_request(self, req):
30 self.interface.configuration.device.maxusb_app.send_on_endpoint(0, b'\x00')
33 class USBMassStorageInterface(USBInterface):
34 name = "USB mass storage interface"
36 def __init__(self, disk_image, verbose=0):
37 self.disk_image = disk_image
43 USBEndpoint.direction_out,
44 USBEndpoint.transfer_type_bulk,
45 USBEndpoint.sync_type_none,
46 USBEndpoint.usage_type_data,
47 16384, # max packet size
48 0, # polling interval, see USB 2.0 spec Table 9-13
49 self.handle_data_available # handler function
53 USBEndpoint.direction_in,
54 USBEndpoint.transfer_type_bulk,
55 USBEndpoint.sync_type_none,
56 USBEndpoint.usage_type_data,
57 16384, # max packet size
58 0, # polling interval, see USB 2.0 spec Table 9-13
59 None # handler function
63 # TODO: un-hardcode string index (last arg before "verbose")
64 USBInterface.__init__(
67 0, # alternate setting
68 8, # interface class: Mass Storage
69 6, # subclass: SCSI transparent command set
70 0x50, # protocol: bulk-only (BBB) transport
77 self.device_class = USBMassStorageClass()
78 self.device_class.set_interface(self)
80 self.is_write_in_progress = False
82 self.write_base_lba = 0
86 def handle_data_available(self, data):
87 print(self.name, "handling", len(data), "bytes of SCSI data")
89 cbw = CommandBlockWrapper(data)
92 status = 0 # default to success
93 response = None # with no response data
95 if self.is_write_in_progress:
97 print(self.name, "got", len(data), "bytes of SCSI write data")
99 self.write_data += data
101 if len(self.write_data) < self.write_length:
102 # more yet to read, don't send the CSW
105 self.disk_image.put_sector_data(self.write_base_lba, self.write_data)
108 self.is_write_in_progress = False
109 self.write_data = b''
111 elif opcode == 0x00: # Test Unit Ready: just return OK status
113 print(self.name, "got SCSI Test Unit Ready")
115 elif opcode == 0x03: # Request Sense
117 print(self.name, "got SCSI Request Sense, data",
118 bytes_as_hex(cbw.cb[1:]))
120 response = b'\x70\x00\xFF\x00\x00\x00\x00\x0A\x00\x00\x00\x00\xFF\xFF\x00\x00\x00\x00\x00\x00\x00\x00\x00'
122 elif opcode == 0x12: # Inquiry
124 print(self.name, "got SCSI Inquiry, data",
125 bytes_as_hex(cbw.cb[1:]))
128 0x00, # 00 for Direct, 1F for "no floppy"
129 0x00, # make 0x80 for removable media, 0x00 for fixed
131 0x01, # Response Data Format
132 0x14, # Additional length.
136 response += b'GoodFET ' # vendor
137 response += b'GoodFET ' # product id
138 response += b' ' # product revision
141 # pad up to data_transfer_length bytes
142 #diff = cbw.data_transfer_length - len(response)
143 #response += bytes([0] * diff)
145 elif opcode == 0x1e: # Prevent/Allow Removal: feign success
147 print(self.name, "got SCSI Prevent/Allow Removal")
149 #elif opcode == 0x1a or opcode == 0x5a: # Mode Sense (6 or 10)
152 elif opcode == 0x23: # Read Format Capacity
154 print(self.name, "got SCSI Read Format Capacity")
157 0x00, 0x00, 0x00, 0x08, # capacity list length
158 0x00, 0x00, 0x10, 0x00, # number of sectors (0x1000 = 10MB)
159 0x10, 0x00, # reserved/descriptor code
160 0x02, 0x00, # 512-byte sectors
163 elif opcode == 0x25: # Read Capacity
165 print(self.name, "got SCSI Read Capacity, data",
166 bytes_as_hex(cbw.cb[1:]))
168 lastlba = self.disk_image.get_sector_count()
171 (lastlba >> 24) & 0xff,
172 (lastlba >> 16) & 0xff,
173 (lastlba >> 8) & 0xff,
175 0x00, 0x00, 0x02, 0x00, # 512-byte blocks
178 elif opcode == 0x28: # Read (10)
179 base_lba = cbw.cb[2] << 24 \
184 num_blocks = cbw.cb[7] << 8 \
188 print(self.name, "got SCSI Read (10), lba", base_lba, "+",
189 num_blocks, "block(s)")
192 # Note that here we send the data directly rather than putting
193 # something in 'response' and letting the end of the switch send
194 for block_num in range(num_blocks):
195 data = self.disk_image.get_sector_data(base_lba + block_num)
196 self.configuration.device.maxusb_app.send_on_endpoint(3, data)
198 elif opcode == 0x2a: # Write (10)
200 print(self.name, "got SCSI Write (10), data",
201 bytes_as_hex(cbw.cb[1:]))
203 base_lba = cbw.cb[1] << 24 \
208 num_blocks = cbw.cb[7] << 8 \
212 print(self.name, "got SCSI Write (10), lba", base_lba, "+",
213 num_blocks, "block(s)")
217 self.write_base_lba = base_lba
218 self.write_length = num_blocks * self.disk_image.block_size
219 self.is_write_in_progress = True
221 # because we need to snarf up the data from wire before we reply
225 elif opcode == 0x35: # Synchronize Cache (10): blindly OK
227 print(self.name, "got Synchronize Cache (10)")
230 print(self.name, "received unsupported SCSI opcode 0x%x" % opcode)
231 status = 0x02 # command failed
232 if cbw.data_transfer_length > 0:
233 response = bytes([0] * cbw.data_transfer_length)
237 print(self.name, "responding with", len(response), "bytes:",
238 bytes_as_hex(response))
240 self.configuration.device.maxusb_app.send_on_endpoint(3, response)
243 ord('U'), ord('S'), ord('B'), ord('S'),
244 cbw.tag[0], cbw.tag[1], cbw.tag[2], cbw.tag[3],
245 0x00, 0x00, 0x00, 0x00,
250 print(self.name, "responding with status =", status)
252 self.configuration.device.maxusb_app.send_on_endpoint(3, csw)
256 def __init__(self, filename, block_size):
257 self.filename = filename
258 self.block_size = block_size
260 statinfo = os.stat(self.filename)
261 self.size = statinfo.st_size
263 self.file = open(self.filename, 'r+b')
264 self.image = mmap(self.file.fileno(), 0)
270 def get_sector_count(self):
271 return int(self.size / self.block_size) - 1
273 def get_sector_data(self, address):
274 block_start = address * self.block_size
275 block_end = (address + 1) * self.block_size # slices are NON-inclusive
277 return self.image[block_start:block_end]
279 def put_sector_data(self, address, data):
280 block_start = address * self.block_size
281 block_end = (address + 1) * self.block_size # slices are NON-inclusive
283 self.image[block_start:block_end] = data[:self.block_size]
287 class CommandBlockWrapper:
288 def __init__(self, bytestring):
289 self.signature = bytestring[0:4]
290 self.tag = bytestring[4:8]
291 self.data_transfer_length = bytestring[8] \
292 | bytestring[9] << 8 \
293 | bytestring[10] << 16 \
294 | bytestring[11] << 24
295 self.flags = int(bytestring[12])
296 self.lun = int(bytestring[13] & 0x0f)
297 self.cb_length = int(bytestring[14] & 0x1f)
298 #self.cb = bytestring[15:15+self.cb_length]
299 self.cb = bytestring[15:]
302 s = "sig: " + bytes_as_hex(self.signature) + "\n"
303 s += "tag: " + bytes_as_hex(self.tag) + "\n"
304 s += "data transfer len: " + str(self.data_transfer_length) + "\n"
305 s += "flags: " + str(self.flags) + "\n"
306 s += "lun: " + str(self.lun) + "\n"
307 s += "command block len: " + str(self.cb_length) + "\n"
308 s += "command block: " + bytes_as_hex(self.cb) + "\n"
313 class USBMassStorageDevice(USBDevice):
314 name = "USB mass storage device"
316 def __init__(self, maxusb_app, disk_image_filename, verbose=0):
317 self.disk_image = DiskImage(disk_image_filename, 512)
319 interface = USBMassStorageInterface(self.disk_image, verbose=verbose)
321 config = USBConfiguration(
323 "Maxim umass config", # string desc
324 [ interface ] # interfaces
332 0, # protocol release number
333 64, # max packet size for endpoint 0
334 0x8107, # vendor id: Sandisk
335 0x5051, # product id: SDCZ2 Cruzer Mini Flash Drive (thin)
336 0x0003, # device revision
337 "Maxim", # manufacturer string
338 "MAX3420E Enum Code", # product string
339 "S/N3420E", # serial number string
344 def disconnect(self):
345 self.disk_image.close()
346 USBDevice.disconnect(self)