X-Git-Url: http://git.rot13.org/?p=goodfet;a=blobdiff_plain;f=client%2FGoodFETCCSPI.py;h=518add32343696291392d8cbe4dc9abc82831660;hp=1d27c24e82178d6ca212394b156cf14a5cb2a464;hb=0a6754712a364a01d149dc518f44b258a3a37cf8;hpb=ef478608abc99029a46aaf682c0e2022a70877ed diff --git a/client/GoodFETCCSPI.py b/client/GoodFETCCSPI.py index 1d27c24..518add3 100644 --- a/client/GoodFETCCSPI.py +++ b/client/GoodFETCCSPI.py @@ -1,11 +1,11 @@ #!/usr/bin/env python # GoodFET Chipcon RF Radio Client # -# (C) 2009 Travis Goodspeed +# (C) 2009, 2012 Travis Goodspeed # # This code is being rewritten and refactored. You've been warned! -import sys, time, string, cStringIO, struct, glob, serial, os; +import sys, time, string, cStringIO, struct, glob, os; from GoodFET import GoodFET; @@ -19,10 +19,13 @@ class GoodFETCCSPI(GoodFET): #Set up the radio for ZigBee self.strobe(0x01); #SXOSCON - self.poke(0x11, 0x0AC2); #MDMCTRL0 + self.strobe(0x02); #SCAL + self.poke(0x11, 0x0AC2 & (~0x0800)); #MDMCTRL0, promiscuous self.poke(0x12, 0x0500); #MDMCTRL1 self.poke(0x1C, 0x007F); #IOCFG0 self.poke(0x19, 0x01C4); #SECCTRL0, disabling crypto + #self.poke(0x19, 0x0204); #SECCTRL0, as seen elsewhere. + #self.RF_setsync(); def ident(self): return self.peek(0x1E); #MANFIDL @@ -56,10 +59,10 @@ class GoodFETCCSPI(GoodFET): self.strobe(0x04); #0x05 for CCA def CC_RFST_RX(self): """Switch the radio to RX mode.""" - self.strobe(0x03); + self.strobe(0x03); #RX ON def CC_RFST_CAL(self): """Calibrate strobe the radio.""" - self.strobe(0x02); + self.strobe(0x02); #RX Calibrate def CC_RFST(self,state=0x00): self.strobe(state); return; @@ -82,22 +85,37 @@ class GoodFETCCSPI(GoodFET): """Write a CCSPI Register.""" data=[reg,(val>>8)&0xFF,val&0xFF]; self.writecmd(self.CCSPIAPP,0x03,len(data),data); - if self.peek(reg,bytes)!=val: + if self.peek(reg,bytes)!=val and reg!=0x18: print "Warning, failed to set r%02x=0x%04x, got %02x." %( reg, val, self.peek(reg,bytes)); - return; + return False; + return True; def status(self): """Read the status byte.""" + statusbits={0x80: "?", + 0x40: "XOSC16M_STABLE", + 0x20: "TX_UNDERFLOW", + 0x10: "ENC_BUSY", + 0x08: "TX_ACTIVE", + 0x04: "LOCK", + 0x02: "RSSI_VALID", + 0x01: "?"}; status=self.strobe(0x00); - print "Status=%02x" % status; + i=1; + str=""; + while i<0x100: + if status&i: + str="%s %s" % (statusbits[i],str); + i*=2; + return str; #Radio stuff begins here. def RF_setenc(self,code="802.15.4"): """Set the encoding type.""" - return; + return code; def RF_getenc(self): """Get the encoding type.""" return "802.15.4"; @@ -105,17 +123,47 @@ class GoodFETCCSPI(GoodFET): return 0; def RF_setrate(self,rate=0): return 0; + def RF_getsync(self): + return self.peek(0x14); + def RF_setsync(self,sync=0xa70F): + """Set the SYNC preamble. + Use 0xA70F for 0xA7.""" + self.poke(0x14,sync); + return; + + def RF_setkey(self,key): + """Sets the first key for encryption to the given argument.""" + print "ERROR: Forgot to set the key."; + + return; + def RF_setnonce(self,key): + """Sets the first key for encryption to the given argument.""" + print "ERROR: Forgot to set the nonce."; + + return; + def RF_setfreq(self,frequency): """Set the frequency in Hz.""" mhz=frequency/1000000; - fsctrl=self.peek(0x18)&~0x3FF; + #fsctrl=0x8000; # + fsctrl=self.peek(0x18)&(~0x3FF); fsctrl=fsctrl+int(mhz-2048) self.poke(0x18,fsctrl); + #self.CC_RFST_IDLE(); + self.strobe(0x02);#SCAL + time.sleep(0.01); + self.strobe(0x03);#SRXON def RF_getfreq(self): """Get the frequency in Hz.""" fsctrl=self.peek(0x18); mhz=2048+(fsctrl&0x3ff) return mhz*1000000; + def RF_setchan(self,channel): + """Set the ZigBee/802.15.4 channel number.""" + if channel < 11 or channel > 26: + print "Only 802.15.4 channels 11 to 26 are currently supported."; + else: + self.RF_setfreq( ( (channel-11)*5 + 2405 ) * 1000000 ); def RF_getsmac(self): """Return the source MAC address.""" return 0xdeadbeef; @@ -129,19 +177,28 @@ class GoodFETCCSPI(GoodFET): """Set the target MAC address.""" return 0xdeadbeef; def RF_getrssi(self): - """Returns the received signal strenght, with a weird offset.""" + """Returns the received signal strength, with a weird offset.""" rssival=self.peek(0x13)&0xFF; #raw RSSI register return rssival^0x80; + + def peekram(self,adr,count): + """Peeks data from CC2420 RAM.""" + data=[ + adr&0xFF,adr>>8, # Address first. + count&0xFF,count>>8 # Then length. + ]; + self.writecmd(self.CCSPIAPP,0x84,len(data),data); + return self.data; + def pokeram(self,adr,data): + """Pokes data into CC2420 RAM.""" + data=[adr&0xFF, adr>>8]+data; + self.writecmd(self.CCSPIAPP,0x85,len(data),data); + return; + lastpacket=range(0,0xff); def RF_rxpacket(self): - """Get a packet from the radio. Returns None if none is waiting. In - order to not require the SFD, FIFO, or FIFOP lines, this - implementation works by comparing the buffer to the older - contents. - """ - - # TODO -- Flush only if there's an overflow. - self.strobe(0x08); #SFLUSHRX + """Get a packet from the radio. Returns None if none is + waiting.""" data="\0"; self.data=data; @@ -151,41 +208,137 @@ class GoodFETCCSPI(GoodFET): self.lastpacket=buffer; if(len(buffer)==0): return None; - return buffer; - def RF_rxpacket_old(self): - """Get a packet from the radio. Returns None if none is waiting. In - order to not require the SFD, FIFO, or FIFOP lines, this - implementation works by comparing the buffer to the older - contents. - """ - self.strobe(0x03); #SRXON - self.strobe(0x08); #SFLUSHRX - buffer=range(0,0xff); - buffer[0]=0x3F | 0x40; #RXFIFO - buffer=self.trans(buffer); + return buffer; + def RF_rxpacketrepeat(self): + """Gets packets from the radio, ignoring all future requests so as + not to waste time. Call RF_rxpacket() after this.""" - new=False; - for foo in range(0,ord(buffer[0])): - if buffer[foo]!=self.lastpacket[foo]: - new=True; - if not new: - return None; + self.writecmd(self.CCSPIAPP,0x91,0,None); + return None; + + def RF_rxpacketdec(self): + """Get and decrypt a packet from the radio. Returns None if + none is waiting.""" + data="\0"; + self.data=data; + self.writecmd(self.CCSPIAPP,0x90,len(data),data); + buffer=self.data; self.lastpacket=buffer; + if(len(buffer)==0): + return None; + return buffer; + + def RF_txpacket(self,packet): + """Send a packet through the radio.""" + self.writecmd(self.CCSPIAPP,0x81,len(packet),packet); + #time.sleep(1); + #self.strobe(0x09); + return; + + def RF_reflexjam(self,duration=0): + """Place the device into reflexive jamming mode.""" + data = [duration&0xff, + (duration>>8)&0xff]; + self.writecmd(self.CCSPIAPP,0xA0,len(data),data); + return; + + def RF_reflexjam_autoack(self): + """Place the device into reflexive jamming mode + and that also sends a forged ACK if needed.""" + data = ""; + self.writecmd(self.CCSPIAPP,0xA1,len(data),data); + print "Got:", data, "and", self.data + return; + + def RF_modulated_spectrum(self): + """Hold a carrier wave on the present frequency.""" + # print "Don't know how to hold a carrier."; + # 33.1 p.55: + # reset chip + # SXOSCON + # set MDMCTRL1.TX_MODE to 3 0x12 3:2 + # STXON 0x04 + + mdmctrl1=self.peek(0x12); + #print "mdmctrl1 was %04x" % mdmctrl1; + mdmctrl1=mdmctrl1|0x00c0; #MDMCTRL1.TX_MODE = 3 + self.poke(0x12, mdmctrl1); #MDMCTRL1 + + mdmctrl1=self.peek(0x12); + #print "mdmctrl1 is %04x" % mdmctrl1; + + # http://e2e.ti.com/support/low_power_rf/f/155/t/15914.aspx?PageIndex=2 + # suggests this + self.strobe(0x02); #STXCAL + #print "STXCAL status: %s" % self.status() + + # is this necessary? + self.strobe(0x09); #SFLUSHTX + #print "SFLUSHTX status: %s" % self.status() + + self.strobe(0x04); #STXON + #print "STXON status: %s" % self.status() + def RF_carrier(self): """Hold a carrier wave on the present frequency.""" - print "Don't know how to hold a carrier."; + # print "Don't know how to hold a carrier."; + # 33.1 p.54: + # reset chip + # SXOSCON + # set MDMCTRL1.TX_MODE to 2 or 3 0x12 3:2 + # set DACTST to 0x1800 0x2E + # STXON 0x04 + + mdmctrl1=self.peek(0x12); + #print "mdmctrl1 was %04x" % mdmctrl1; + mdmctrl1=mdmctrl1|0x0080; + mdmctrl1=mdmctrl1&0x0080; #MDMCTRL1.TX_MODE = 2 + self.poke(0x12, mdmctrl1); #MDMCTRL1 + + mdmctrl1=self.peek(0x12); + #print "mdmctrl1 is %04x" % mdmctrl1; + + self.poke(0x2E, 0x1800); #DACTST + dactst=self.peek(0x2E); + #print "dactst is %04x" % dactst; + + # see above for why this is here + self.strobe(0x02); #STXCAL + #print "STXCAL status: %s" % self.status() + self.strobe(0x09); #SFLUSHTX + #print "SFLUSHTX status: %s" % self.status() + + self.strobe(0x04); #STXON + #print "STXON status: %s" % self.status() + def RF_promiscuity(self,promiscuous=1): mdmctrl0=self.peek(0x11); - print "mdmctrl0 was %04x" % mdmctrl0; - mdmctrl0=mdmctrl0&(~0x800); - print "mdmctrl0 is now %04x" % mdmctrl0; + if promiscuous>0: + mdmctrl0=mdmctrl0&(~0x800); + else: + mdmctrl0=mdmctrl0|0x800; + self.poke(0x11,mdmctrl0); + return; + def RF_autocrc(self,autocrc=1): + mdmctrl0=self.peek(0x11); + if autocrc==0: + mdmctrl0=mdmctrl0&(~0x0020); + else: + mdmctrl0=mdmctrl0|0x0020; + self.poke(0x11,mdmctrl0); + return; + def RF_autoack(self,autoack=1): + mdmctrl0=self.peek(0x11); + if autoack==0: + mdmctrl0=mdmctrl0&(~0x0010); + else: + mdmctrl0=mdmctrl0|0x0010; self.poke(0x11,mdmctrl0); return; - packetlen=16; def RF_setpacketlen(self,len=16): """Set the number of bytes in the expected payload.""" @@ -210,3 +363,25 @@ class GoodFETCCSPI(GoodFET): choice=choices[len]; self.poke(0x03,choice); self.maclen=len; + def printpacket(self,packet,prefix="#"): + print self.packet2str(packet,prefix); + def packet2str(self,packet,prefix="#"): + s=""; + i=0; + for foo in packet: + s="%s %02x" % (s,ord(foo)); + return "%s%s" % (prefix,s); + + def printdissect(self,packet): + try: + from scapy.all import Dot15d4 + except ImportError: + print "To use packet disection, Scapy must be installed and have the Dot15d4 extension present." + print "try: hg clone http://hg.secdev.org/scapy-com"; + print " sudo ./setup.py install"; + self.printpacket(packet); + try: + scapyd = Dot15d4(packet[1:]); + scapyd.show(); + except: + pass;