#!/usr/bin/env python

# USB Mass Storage Emulator
# by Travis Goodspeed
# with thanks to Brandon Wilson and his Linky project.
#
# NOTE: this script targets Python 2 (frames are handled as str and decoded
# with ord()).  print is always called with a single parenthesised
# expression, which produces exactly the same output as the print statement.

import sys
import binascii
import array
import time

from GoodFETMAXUSB import *


class GoodFETMAXUSBMass(GoodFETMAXUSBDevice):
    """This emulates a USB Mass Storage device.

    Subclasses are expected to overload getSectorData(), putSectorData()
    and getSectorCount() to provide the backing store.  Register names
    (rEP0FIFO, rEP3INBC, ...) and SETUP-packet offsets (bmRequestType,
    bRequest, wIndexL) come from the GoodFETMAXUSB star import.
    """

    # Too much data to watch everything.
    usbverbose = False

    # Built-in 512-byte sector served when getSectorData() is not overloaded.
    # It carries an ASCII banner ("GoodDisk 0.01 ... by Travis Goodspeed")
    # followed by machine code, and ends in the 0x55 0xAA signature.
    _DEFAULT_SECTOR = [
        0xE9, 0x86, 0x00, 0x0A, 0x47, 0x6F, 0x6F, 0x64, 0x44, 0x69, 0x73, 0x6B, 0x20, 0x30, 0x2E, 0x30,
        0x31, 0x0A, 0x0D, 0x62, 0x79, 0x20, 0x54, 0x72, 0x61, 0x76, 0x69, 0x73, 0x20, 0x47, 0x6F, 0x6F,
        0x64, 0x73, 0x70, 0x65, 0x65, 0x64, 0x0A, 0x0A, 0x0D, 0x00, 0x59, 0x6F, 0x75, 0x20, 0x68, 0x61,
        0x76, 0x65, 0x20, 0x62, 0x65, 0x65, 0x6E, 0x20, 0x65, 0x61, 0x74, 0x65, 0x6E, 0x20, 0x62, 0x79,
        0x20, 0x61, 0x20, 0x67, 0x72, 0x75, 0x65, 0x2E, 0x20, 0x20, 0x53, 0x6F, 0x72, 0x72, 0x79, 0x2E,
        0x0A, 0x0D, 0x00, 0x31, 0x29, 0x20, 0x52, 0x65, 0x61, 0x64, 0x69, 0x6E, 0x67, 0x20, 0x6B, 0x65,
        0x72, 0x6E, 0x65, 0x6C, 0x20, 0x66, 0x72, 0x6F, 0x6D, 0x20, 0x64, 0x69, 0x73, 0x6B, 0x2E, 0x0A,
        0x0D, 0x00, 0x32, 0x29, 0x20, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x69, 0x6E, 0x67, 0x20, 0x6B,
        0x65, 0x72, 0x6E, 0x65, 0x6C, 0x2E, 0x0A, 0x0D, 0x00, 0xBE, 0x03, 0x7C, 0xE8, 0x41, 0x00, 0xE8,
        0x7B, 0x00, 0x31, 0xC0, 0x30, 0xD2, 0xCD, 0x13, 0x0F, 0x82, 0xE8, 0x00, 0xBE, 0x53, 0x7C, 0xE8,
        0x2E, 0x00, 0xB8, 0xE0, 0x07, 0x8E, 0xC0, 0x31, 0xDB, 0xB8, 0x10, 0x02, 0xB5, 0x00, 0xB1, 0x02,
        0xB6, 0x00, 0xB2, 0x00, 0xCD, 0x13, 0x0F, 0x82, 0xCA, 0x00, 0xB8, 0x00, 0x7E, 0x89, 0xC6, 0xE8,
        0x7C, 0x00, 0xBE, 0x72, 0x7C, 0xE8, 0x08, 0x00, 0xEA, 0x00, 0x00, 0xE0, 0x07, 0xE8, 0xB4, 0x00,
        0xAC, 0x3C, 0x00, 0x74, 0x06, 0xB4, 0x0E, 0xCD, 0x10, 0xEB, 0xF5, 0xC3, 0x30, 0x78, 0x00, 0x20,
        0x62, 0x79, 0x74, 0x65, 0x73, 0x20, 0x6F, 0x66, 0x20, 0x6D, 0x65, 0x6D, 0x6F, 0x72, 0x79, 0x20,
        0x64, 0x65, 0x74, 0x65, 0x63, 0x74, 0x65, 0x64, 0x2E, 0x0A, 0x0D, 0x00, 0x53, 0x65, 0x67, 0x6D,
        0x65, 0x6E, 0x74, 0x73, 0x3A, 0x20, 0x00, 0x2C, 0x20, 0x00, 0x0A, 0x0D, 0x00, 0xBE, 0xDC, 0x7C,
        0xE8, 0xBD, 0xFF, 0xE8, 0x63, 0x00, 0xE8, 0x07, 0x00, 0xBE, 0xDF, 0x7C, 0xE8, 0xB1, 0xFF, 0xC3,
        0x89, 0xC3, 0xC1, 0xE8, 0x0C, 0xE8, 0x39, 0x00, 0x89, 0xD8, 0xC1, 0xE8, 0x08, 0xE8, 0x31, 0x00,
        0x89, 0xD8, 0xC1, 0xE8, 0x04, 0xE8, 0x29, 0x00, 0x89, 0xD8, 0xE8, 0x24, 0x00, 0xC3, 0x31, 0xC9,
        0xAD, 0xE8, 0xDC, 0xFF, 0xE8, 0x2C, 0x00, 0x83, 0xC1, 0x02, 0x81, 0xF9, 0x00, 0x02, 0x75, 0xF0,
        0xC3, 0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x41, 0x42, 0x43, 0x44, 0x45,
        0x46, 0x50, 0x56, 0x83, 0xE0, 0x0F, 0x05, 0x51, 0x7D, 0x89, 0xC6, 0xAC, 0xB4, 0x0E, 0xCD, 0x10,
        0x5E, 0x58, 0xC3, 0xB8, 0x20, 0x0E, 0xCD, 0x10, 0xC3, 0x31, 0xC0, 0xCD, 0x12, 0x72, 0x05, 0x85,
        0xC0, 0x74, 0x01, 0xC3, 0xBE, 0x2A, 0x7C, 0xE8, 0x46, 0xFF, 0xEB, 0xFE, 0xEA, 0x00, 0x00, 0xFF,
        0xFF,
    ] + [0x00] * 109 + [   # zero fill up to the signature at offsets 510-511
        0x55, 0xAA,
    ]

    def getSectorData(self, lba):
        """Overload this to return data from a given 512-byte sector.

        The default implementation ignores lba, prints a reminder, and
        returns a fresh copy of the built-in sector as a list of ints.
        """
        print("You forgot to overload getSectorData().  Returning something neighborly.")
        return list(self._DEFAULT_SECTOR)

    def putSectorData(self, lba, block):
        """Overload this to write data to a given 512-byte sector.

        The default implementation prints a reminder and discards the data.
        """
        print("You forgot to overload putSectorData().  Ignoring sector write.")

    def getSectorCount(self):
        """Returns the number of viable Logical Block Addresses.

        The default implementation prints a reminder and returns 0x200.
        The value is reported to the host as the last LBA by the
        Read Capacity handler in handleCB().
        """
        print("You forgot to overload getSectorCount().  Guessing 0x200.")
        return 0x200

    def massinit(self):
        """Initialize a USB Mass Storage device.

        Disconnects, waits a second, reconnects, then enters massrun(),
        which does not return.
        """
        self.usb_disconnect()
        time.sleep(1)
        self.usb_connect()
        self.massrun()

    def massrun(self):
        """Main loop of the USB Mass Storage emulator; never returns."""
        print("Starting a Mass Storage device.")
        while True:
            sys.stdout.flush()
            self.service_irqs()

    def do_SETUP(self):
        """Handle USB Enumeration.

        Reads the 8-byte SETUP packet and dispatches on the type bits
        (mask 0x60) of bmRequestType: standard, class or vendor.
        Anything else stalls EP0.
        """
        # Grab the SETUP packet from the buffer.
        SUD = self.readbytes(rSUDFIFO, 8)

        # Parse the SETUP packet
        print("Handling a setup packet of %s" % self.setup2str(SUD))
        setuptype = ord(SUD[bmRequestType]) & 0x60
        if setuptype == 0x00:
            self.std_request(SUD)
        elif setuptype == 0x20:
            self.class_request(SUD)
        elif setuptype == 0x40:
            self.vendor_request(SUD)
        else:
            print("Unknown bmRequestType=0x%02x." % ord(SUD[bmRequestType]))
            self.STALL_EP0(SUD)

    def class_request(self, SUD):
        """Handle a class request.

        Answers Get Max LUN (0xA1/0xFE) with a single zero byte and
        acknowledges the Bulk-Only reset (0x21/0xFF); every other class
        request stalls EP0.
        """
        requesttype = ord(SUD[bmRequestType])
        request = ord(SUD[bRequest])

        if requesttype == 0xA1 and request == 0xFE:
            print("Reporting 0 as the maximum LUN.")
            # This is a Get Max LUN request.
            # Return 1-byte maximum Logical Unit Number
            self.wreg(rEP0FIFO, 0x00)    # Just one LUN.
            self.wregAS(rEP0BC, 1)       # ARM and fire!
            return                       # Don't stall.

        if requesttype == 0x21 and request == 0xff:
            print("Received BBB reset.")
            self.wregAS(rEP0BC, 0)       # ARM and fire!
            return                       # Don't stall.

        print("Stalling an unknown class request: %s" % self.setup2str(SUD))
        self.STALL_EP0(SUD)

    def vendor_request(self, SUD):
        """Handle a vendor request by acknowledging it with no data."""
        print("Why the hell is there a vendor request?")
        self.wregAS(rEP0BC, 0)

    def std_request(self, SUD):
        """Handles a standard setup request.

        Dispatches on bRequest.  Unknown requests are accepted with a
        zero-length reply rather than stalled.
        """
        setuptype = ord(SUD[bRequest])
        if setuptype == SR_GET_DESCRIPTOR:
            self.send_descriptor(SUD)
        elif setuptype == SR_SET_CONFIGURATION:
            self.set_configuration(SUD)
        elif setuptype == SR_GET_STATUS:
            self.get_status(SUD)
        elif setuptype == SR_SET_ADDRESS:
            self.rregAS(rFNADDR)
        elif setuptype == SR_GET_INTERFACE:
            self.get_interface(SUD)
        else:
            # Deliberately accepted instead of stalled.
            print("Accepting unknown standard setup request type %02x" % setuptype)
            self.wregAS(rEP0BC, 0)

    def get_interface(self, SUD):
        """Handles a setup request for SR_GET_INTERFACE.

        Replies with a single zero byte when wIndexL is 0, otherwise
        stalls EP0.
        """
        # The comparison must be applied to the decoded byte; the earlier
        # ord(SUD[wIndexL]==0) passed a bool to ord() and raised TypeError.
        if ord(SUD[wIndexL]) == 0:
            self.wreg(rEP0FIFO, 0)
            self.wregAS(rEP0BC, 1)
        else:
            self.STALL_EP0(SUD)

    # Device Descriptor
    DD = [
        0x12,                   # length
        0x01, 0x00, 0x02, 0x00, 0x00, 0x00, 0x40,
        0x81, 0x07,             # Sandisk
        0x50, 0x51,             # SDCZ2 Cruzer Mini Flash Drive (thin)
        0x00, 0x03,
        0x01, 0x02, 0x03,       # Strings
        0x01,
    ]

    # Configuration Descriptor
    CD = [
        0x09,                   # Length
        0x02,                   # Type
        0x20,                   # Total Length
        0x00, 0x01, 0x01, 0x00, 0xE0, 0x00,
        0x09, 0x04, 0x00, 0x00,
        0x02,                   # Num Endpoints
        0x08,                   # Mass Storage Bulk Only
        0x06,                   # SCSI
        0x50, 0x00,
        # OUT EP1
        0x07, 0x05, 0x01, 0x02, 0x40, 0x00, 0x00,
        # IN EP3
        0x07, 0x05, 0x83, 0x02, 0x40, 0x00, 0x00,
    ]

    strDesc = [
        # STRING descriptor 0--Language string
        # (bLength=4, bDescriptorType=string, wLANGID=0x0409 English-United States)
        "\x04\x03\x09\x04",
        # STRING descriptor 1--Manufacturer ID
        "\x10\x03G\x00o\x00o\x00d\x00F\x00E\x00T\x00",
        # STRING descriptor 2 - Product ID
        "\x1C\x03M\x00A\x00S\x00S\x00 \x00E\x00m\x00u\x00l\x00a\x00t\x00o\x00r\x00",
        # STRING descriptor 3 - Serial Number ID
        "\x14\x03S\x00/\x00N\x00 \x003\x004\x002\x000\x00E\x00",
    ]

    def get_status(self, SUD):
        """Get the USB Setup Status.

        Sends a two-byte status on EP0 for device (0x80), interface (0x81)
        and endpoint 0x83 (0x82) recipients; anything else stalls EP0.
        """
        testbyte = ord(SUD[bmRequestType])

        if testbyte == 0x80:
            # Toward Device
            self.wreg(rEP0FIFO, 0x03)    # Enable RWU and self-powered
            self.wreg(rEP0FIFO, 0x00)    # Second byte is always zero.
            self.wregAS(rEP0BC, 2)       # Load byte count, arm transfer, and ack CTL.
        elif testbyte == 0x81:
            # Toward Interface
            self.wreg(rEP0FIFO, 0x00)
            self.wreg(rEP0FIFO, 0x00)    # Second byte is always zero.
            self.wregAS(rEP0BC, 2)
        elif testbyte == 0x82:
            # Toward Endpoint
            if ord(SUD[wIndexL]) == 0x83:
                print("This code almost certainly doesn't work.")
                self.wreg(rEP0FIFO, 0x01)    # Stall EP3
                self.wreg(rEP0FIFO, 0x00)    # Second byte is always zero.
                self.wregAS(rEP0BC, 2)
            else:
                print("Stalling unknown status.")
                self.STALL_EP0(SUD)
        else:
            print("Stalling unknown status.")
            self.STALL_EP0(SUD)

    def do_IN3(self):
        """Handle IN3 input event."""
        # Do nothing here, as it'll be taken care of elsewhere.  The
        # interrupt just means that the buffer is empty, not that we
        # are expected to fill it.

    def do_OUT1(self):
        """Handle an OUT1 output event.

        Reads the pending EP1 OUT frame and hands it to handleCBW().
        """
        length = self.rreg(rEP1OUTBC)
        frame = self.readbytes(rEP1OUTFIFO, length)
        self.handleCBW(frame)

    # Most recent valid Command Block Wrapper; handleCB() reads the tag and
    # the LBA back out of it.
    lastCBW = ""

    def handleCBW(self, cbw):
        """Handles an incoming Command Block Wrapper.

        See USB Mass Storage Class for details.  Frames that are not
        exactly 31 bytes or do not start with "USBC" are reported and
        dropped.  Otherwise the wrapper is remembered in self.lastCBW and
        its command block is passed to handleCB().
        """
        if len(cbw) != 31:
            print("Invalid CBW length of %i bytes.  Aborting." % len(cbw))
            return
        sig = cbw[0:4]
        if sig != "USBC":
            print("Invalid CBW signature: %s.  Should be USBC; aborting." % sig)
            return
        self.lastCBW = cbw

        # Bytes 8..11: little-endian data transfer length.
        dtlen = (ord(cbw[8])
                 + (ord(cbw[9]) << 8)
                 + (ord(cbw[10]) << 16)
                 + (ord(cbw[11]) << 24))
        flags = ord(cbw[12])
        dtdir = flags & 0x80    # 0x80 for dev->host, 0x00 for host->dev
        # Byte 13 carries the LUN; it is ignored, as only one LUN is reported.
        cblen = ord(cbw[14]) & 0x1F
        cb = cbw[15:31]
        self.handleCB(cb, cblen, dtlen, dtdir)

    def handleCB(self, cb, cblen, dtlen, dtdir):
        """Handles a command block, then replies with a CSW.

        cb is the command block from the CBW, dtlen the transfer length
        the host announced.  cblen and dtdir are accepted but not used.
        Any data phase is written to EP3 IN, followed by a 13-byte CSW
        echoing the tag of self.lastCBW.
        """
        if self.usbverbose:
            print("Got command block, type 0x%02x requesting 0x%02x bytes" % (
                ord(cb[0]), dtlen))
        verb = ord(cb[0])
        status = 0    # good, set to 1 for bad.

        if verb == 0x00:      # Test Unit Ready
            # Send nothing, just the success code.
            status = 0
        elif verb == 0x03:    # Request Sense
            print("Responding to Request Sense.  Needed for Macs.")
            response = [0x70, 0x00, 0xFF, 0x00,
                        0x00, 0x00, 0x00, 0x0A,
                        0x00, 0x00, 0x00, 0x00,
                        0xFF, 0xFF, 0x00, 0x00,
                        0x00, 0x00, 0, 0, 0, 0, 0]
            self.writebytes(rEP3INFIFO, response)
            # Unlike the other replies, this one is armed with wreg().
            self.wreg(rEP3INBC, len(response))
        elif verb == 0x12:    # Inquiry
            response = [
                0x00,    # 00 for Direct, 1F for "no floppy"
                0x80,    # make 0x80 for removable media
                0x00,    # Version
                0x01,    # Response Data Format
                0x1f,    # Additional length.
                0x00, 0x00, 0x00,
            ]
            # Manufacturer
            response += [ord(c) for c in "GoodFET "]
            # Device name
            response += [ord(c) for c in "GoodFET         "]
            response += [ord(c) for c in "0.01"]
            self.writebytes(rEP3INFIFO, response)
            # The byte count is the host's requested length, not len(response).
            self.wregAS(rEP3INBC, dtlen)
        elif verb == 0x1e:    # Prevent/Allow Removal
            # Give a good status to pretend we understand.
            status = 0x00
        elif verb == 0x1A:    # Mode Sense (6)
            # I should probably send six bytes here.
            response = [0x12, 0, 0, 0, 0, 0, 0, 0x1C]
            self.writebytes(rEP3INFIFO, response)
            self.wregAS(rEP3INBC, len(response))
        elif verb == 0x23:    # Read Format Capacity
            response = [
                0x00, 0, 0x00, 0x08,    # Capacity list length.
                0, 0x00, 0x10, 0x00,    # Number of sectors, implying 10MB.
                0x01, 0x00,             # reserved/descriptor code.
                0x02, 0x00,             # 512 bytes/sector.  Why is this twice?
            ]
            self.writebytes(rEP3INFIFO, response)
            self.wregAS(rEP3INBC, len(response))
        elif verb == 0x25:    # Read Capacity
            lastlba = self.getSectorCount()
            response = [
                (lastlba >> 24) & 0xFF,    # Last LBA, big-endian
                (lastlba >> 16) & 0xFF,
                (lastlba >> 8) & 0xFF,
                lastlba & 0xFF,
                0x00, 0x00, 0x02, 0x00,    # Block length of 512 bytes.
            ]
            self.writebytes(rEP3INFIFO, response)
            self.wregAS(rEP3INBC, len(response))
        elif verb == 0x28:    # READ SECTOR
            cbw = self.lastCBW
            # Big-endian LBA held in bytes 17..20 of the CBW.
            baselba = (ord(cbw[20])
                       | (ord(cbw[19]) << 8)
                       | (ord(cbw[18]) << 16)
                       | (ord(cbw[17]) << 24))
            count = dtlen // 512
            print("Fetching %i blocks starting at LBA %i." % (count, baselba))
            if count > 32:
                count = 0
                status = 1    # Fail if we're asked to read more than 32 blocks.
                # Now we need to stall EP3.  It's not acceptable to just
                # forget to transmit.
                self.wreg(rEPSTALLS, 0x10)
            for i in range(count):
                data = self.getSectorData(baselba + i)
                for j in range(8):
                    # Transmit each 64-byte block fragment, then wait for next.
                    while not (self.rreg(rEPIRQ) & bmIN3BAVIRQ):
                        pass
                    response = data[j * 64:j * 64 + 64]
                    self.writebytes(rEP3INFIFO, response)
                    self.wregAS(rEP3INBC, 64)
        elif verb == 0x2A:    # WRITE SECTOR
            print("Haven't implemented WRITE SECTOR.")
        else:
            print("ERROR: Unknown SCSI command block verb %02x." % verb)
            status = 1    # Command Failed
            if dtlen > 0:
                print("Perhaps I should send %i bytes of dummy data here." % dtlen)
                # NOTE(review): the flattened source does not show whether
                # this exit was guarded by the dtlen check -- confirm.
                sys.exit(1)

        cbw = self.lastCBW
        # Now we need to send the CSW.
        csw = [
            # Standard prefix.
            ord('U'), ord('S'), ord('B'), ord('S'),
            # CBW key; must be the same as the one we're replying to.
            ord(cbw[4]), ord(cbw[5]), ord(cbw[6]), ord(cbw[7]),
            # CSW Data Residue, probably oughtn't be zeroed.
            0, 0, 0, 0,
            # Status byte: 00 for good, 01 for bad.
            status,
        ]
        self.writebytes(rEP3INFIFO, csw)
        self.wregAS(rEP3INBC, len(csw))


class GoodFETMAXUSBMassFile(GoodFETMAXUSBMass):
    """This emulates a USB Mass Storage Device, providing a file as
    its image.  Writes are not yet supported, and this is very
    slow.  Performance hacks will come after the code stabilizes."""

    # Open image file object, set by openImage().
    datafile = None
    # Cached image length in bytes, filled in lazily by getSectorCount().
    datafilelen = None

    def openImage(self, filename):
        """Opens an image for use.  Call this *before* massinit()."""
        self.datafile = open(filename, "rb")
        print("Opened an image with %i blocks." % self.getSectorCount())

    def putSectorData(self, lba, block):
        """Writes a 512-byte sector to the lba address.

        Not implemented: prints a notice and discards the data.
        """
        print("Writes aren't yet supported.")

    def getSectorData(self, lba):
        """Returns data from a 512-byte sector.

        The result is always a list of 512 ints.  If the image ends
        early, a warning is printed and the remainder is zero-filled so
        that the 64-byte fragments sliced by the read path are complete.
        """
        offset = lba * 512

        # Seek to the appropriate block.
        self.datafile.seek(offset, 0)
        pos = self.datafile.tell()
        if pos != offset:
            print("SEEK ERROR: Seeked to %i (lba=%i), but now I'm at %i (lba=%i)" % (
                offset, lba, pos, pos // 512))

        raw = self.datafile.read(512)
        if len(raw) < 512:
            print("Holy hell, I only have %i bytes of 512." % len(raw))

        sector = list(bytearray(raw))
        sector.extend([0x00] * (512 - len(sector)))
        return sector

    def getSectorCount(self):
        """Returns the number of viable Logical Block Addresses.

        Computed as (image length // 512) - 1 and cached after the first
        call.  Exits the program if the image length is not a multiple
        of 512 bytes.
        """
        if self.datafilelen is None:
            # Seek to the end (whence=2) and ask for the offset rather
            # than reading the whole image just to measure it.
            self.datafile.seek(0, 2)
            self.datafilelen = self.datafile.tell()
            self.datafile.seek(0)
        if self.datafilelen % 512 != 0:
            print("ERROR: Image does not have an integer number of blocks!")
            # '%%' is the literal percent sign; the earlier '\%' raised
            # ValueError instead of printing the diagnostic.
            print("%i %% 512 == %i" % (self.datafilelen,
                                       self.datafilelen % 512))
            sys.exit()
        return self.datafilelen // 512 - 1


if len(sys.argv) == 1:
    print("Usage: %s disk.img\n" % sys.argv[0])
    sys.exit()

# Initialize FET and set baud rate
client = GoodFETMAXUSBMassFile()
client.serInit()

client.openImage(sys.argv[1])

client.MAXUSBsetup()
client.massinit()