Connection Dropper
Today I was playing with Python and ctypes to build a connection dropper, because we (swirl and I) need a small library to embed in one of our tools.
Before proceeding, we need to know how the following Windows functions work: GetTcpTable (the GetTcpTable function retrieves the IPv4 TCP connection table) and SetTcpEntry (the SetTcpEntry function sets the state of a TCP connection).
So all we have to do is retrieve the connection list with GetTcpTable, select the connection we want to drop, and then change its state to MIB_TCP_STATE_DELETE_TCB with SetTcpEntry. Easy… let’s code…
First we have to create some defines…
#connection dropper - defines.py
# by ratsoul
from ctypes import *
# Win32 integer typedef names mapped onto the ctypes types used by the
# structures below.
BYTE, WORD, DWORD = c_ubyte, c_ushort, c_ulong

# Return code that the main script compares GetTcpTable's result against to
# detect a buffer that is too small.
ERROR_INSUFFICIENT_BUFFER = 122
# Short display names for the value found in MIB_TCPROW.dwState.
# Every documented MIB_TCP_STATE value (1..12) is listed so that a lookup only
# falls through to the caller's "unknown" handling for genuinely unexpected
# values; the four names that existed before are unchanged.
CONNECTION_STATUS = {
    1: "closed",
    2: "listen",
    3: "syn_sent",
    4: "syn_rcvd",
    5: "estab",
    6: "fin_wait1",
    7: "fin_wait2",
    8: "close_wait",
    9: "closing",
    10: "last_ack",
    11: "time_wait",
    12: "delete_tcb",
}

# State written into a row before handing it to SetTcpEntry in order to drop
# that connection. DROP is the short alias used by the main script.
DROP = MIB_TCP_STATE_DELETE_TCB = 12
class MIB_TCPROW(Structure):
    """One entry of the IPv4 TCP connection table.

    The main script reads these rows out of MIB_TCPTABLE.table and passes a
    row whose dwState was set to DROP to SetTcpEntry.
    """

    # The order of the members is the in-memory layout; do not rearrange.
    _fields_ = [
        ("dwState", DWORD),       # key into CONNECTION_STATUS
        ("dwLocalAddr", DWORD),   # local IPv4 address as a raw DWORD
        ("dwLocalPort", DWORD),
        ("dwRemoteAddr", DWORD),  # remote IPv4 address as a raw DWORD
        ("dwRemotePort", DWORD),
    ]
class MIB_TCPTABLE(Structure):
    """Buffer layout handed to GetTcpTable: a row count followed by the rows.

    The row array is declared with a fixed capacity of 4000 entries, which
    determines how many bytes an instance of this class reserves.
    """

    _fields_ = [
        ("dwNumEntries", DWORD),          # number of rows in `table` that are filled in
        ("table", MIB_TCPROW * 4000),     # fixed-capacity array of connection rows
    ]
class S_UN_B(Structure):
    """Byte-wise view of an IPv4 address: four consecutive single-byte members."""

    _fields_ = [
        ("s_b1", BYTE),
        ("s_b2", BYTE),
        ("s_b3", BYTE),
        ("s_b4", BYTE),
    ]
class S_UN_W(Structure):
    """Word-wise view of an IPv4 address: two consecutive 16-bit members.

    ctypes only honours the attribute named ``_fields_``; under any other
    name the list is ignored and the structure ends up with no members.
    The members are typed WORD, as their ``s_w`` names indicate, so this view
    spans the same four bytes as the byte-wise view.
    """

    _fields_ = [
        ("s_w1", WORD),
        ("s_w2", WORD),
    ]
class S_UN(Union):
    """Overlapping views of one IPv4 address.

    All members share the same storage, so writing ``s_addr`` (as the main
    script does) makes the same bytes visible through the other views.
    """

    _fields_ = [
        ("s_un_b", S_UN_B),   # byte-wise view
        ("s_un_w", S_UN_W),   # word-wise view
        ("s_addr", DWORD),    # whole address as a single DWORD
    ]
class IN_ADDR(Structure):
    """IPv4 address wrapper whose only member is the S_UN union.

    The main script stores a row's address in ``s_un.s_addr`` and then passes
    the whole object to socket.inet_ntoa to obtain the dotted-quad text.
    """

    _fields_ = [
        ("s_un", S_UN),
    ]
Then we can proceed with the main code…
#connection dropper - cdrop.py
# by ratsoul
from ctypes import *
from defines import *
import socket
#dll
# Only the IP Helper DLL is needed: GetTcpTable and SetTcpEntry are both
# called through it.
iph = windll.Iphlpapi

#struct
ipaddr = IN_ADDR()       # scratch object used to turn a DWORD address into text
mtable = MIB_TCPTABLE()  # buffer that receives the connection table

#misc
# In/out byte count for GetTcpTable: on input the size of the buffer, and
# (per the GetTcpTable documentation) on ERROR_INSUFFICIENT_BUFFER the size
# that would have been required.
size = c_ulong(sizeof(mtable))
# Remote connections offered to the user; the position in the list is the id
# shown on screen.
connections_list = []


def _dotted_quad(raw_addr):
    """Return the dotted-quad text for a DWORD address taken from a table row."""
    ipaddr.s_un.s_addr = raw_addr
    return socket.inet_ntoa(ipaddr)


def _host_name(address):
    """Return the reverse-DNS name of *address*, or *address* itself when the
    lookup fails, so that one unresolvable host cannot abort the listing."""
    try:
        return socket.gethostbyaddr(address)[0]
    except socket.error:
        return address


#GetTcpTable
ret = iph.GetTcpTable(byref(mtable), byref(size), True)
retries = 0
while ret == ERROR_INSUFFICIENT_BUFFER and retries < 3:
    # Allocating another table of the same size cannot succeed; grow the
    # existing buffer to the size the API asked for and try again. The loop
    # covers the table growing between two calls, and is bounded so that a
    # misbehaving API cannot spin forever.
    resize(mtable, max(size.value, sizeof(MIB_TCPTABLE)))
    ret = iph.GetTcpTable(byref(mtable), byref(size), True)
    retries += 1
if ret != 0:
    # Anything other than success would leave the table contents undefined.
    raise SystemExit("[!] GetTcpTable failed with error code %d" % ret)

# View exactly dwNumEntries rows in place. The declared `table` member stops
# at its fixed capacity, which would make rows of a resized buffer unreachable.
mt = (MIB_TCPROW * mtable.dwNumEntries).from_address(
    addressof(mtable) + MIB_TCPTABLE.table.offset)

print("> Connections list:")
for t in mt:
    #skip local
    remote_address = _dotted_quad(t.dwRemoteAddr)
    if remote_address in ("0.0.0.0", "127.0.0.1"):
        continue
    local_address = _dotted_quad(t.dwLocalAddr)
    #save current MIB_TCPROW (it still refers to the memory inside mtable)
    conn_id = len(connections_list)
    connections_list.append(t)
    #print some info about current MIB_TCPROW
    print("\t%d) link: %s -> %s" % (
        conn_id, _host_name(local_address), _host_name(remote_address)))
print("> ---")

if not connections_list:
    # Nothing to choose from; prompting for "[0--1]" would be meaningless.
    print("> no remote connections found")
    raise SystemExit(0)

# raw_input only exists on Python 2; fall back to input on Python 3.
try:
    read_line = raw_input
except NameError:
    read_line = input

answer = read_line("> connection to drop [0-%d]: " % (len(connections_list) - 1))
try:
    val = int(answer)
except ValueError:
    val = -1
if not 0 <= val < len(connections_list):
    raise SystemExit("[!] invalid connection id: %r" % answer)

conn2drop = connections_list[val]
conn2drop.dwState = DROP

#SetTcpEntry
# A non-zero return value is treated as failure.
if iph.SetTcpEntry(byref(conn2drop)):
    print("[!] unable to drop selected connection id=%d (on Vista run as Admin)" % val)
else:
    print("[*] bye bye connection id=%d" % val)
Full source code here.
Have fun ;)

