La solution ayant déjà été trouvée, je poste tout de même une réponse n'utilisant pas Popen ou autre (j'étais curieux de savoir comment faire ça ; n'étant pas un utilisateur de Windows à la base, j'ai fouillé un peu sur le net et je suis arrivé à une solution), en espérant que ça serve un jour à quelqu'un.
import struct
from ctypes import *
from ctypes.wintypes import *
from socket import inet_aton, inet_ntoa, htons
from socket import AF_INET
from win32com.client import GetObject
# Selectors passed as ``table_class`` to GetExtendedTcpTable. The values are
# consecutive integers starting at 0, so they are bound in one unpacking.
(
    TCP_TABLE_BASIC_LISTENER,
    TCP_TABLE_BASIC_CONNECTIONS,
    TCP_TABLE_BASIC_ALL,
    TCP_TABLE_OWNER_PID_LISTENER,
    TCP_TABLE_OWNER_PID_CONNECTIONS,
    TCP_TABLE_OWNER_PID_ALL,
    TCP_TABLE_OWNER_MODULE_LISTENER,
    TCP_TABLE_OWNER_MODULE_CONNECTIONS,
    TCP_TABLE_OWNER_MODULE_ALL,
) = range(9)

# Status codes compared against the value returned by the iphlpapi call.
NO_ERROR, ERROR_INVALID_PARAMETER, ERROR_INSUFFICIENT_BUFFER = 0, 87, 122
# Display label for each ``dwState`` value found in a TCP table row.
# The codes are consecutive and start at 1, hence enumerate(..., start=1).
STATES = dict(enumerate(
    (
        "CLOSED",
        "LISTENING",
        "SYN_SENT",
        "SYN_RCVD",
        "ESTABLISHED",
        "FIN_WAIT",
        "FIN_WAIT_2",
        "CLOSE_WAIT",
        "CLOSING",
        "LAST_ACK",
        "TIME_WAIT",
        "DELETE_TCB",
    ),
    start=1,
))
class socket_info(object):
    """Plain record describing one TCP table entry.

    Every keyword argument given to the constructor is stored as an
    attribute of the same name. Attributes that are not supplied fall back
    to the class-level default of ``None``.
    """

    State = None
    LocalAddr = None
    LocalPort = None
    RemoteAddr = None
    RemotePort = None
    # GetTcpTableForPid filters on this attribute, so it needs a default like
    # the other fields; otherwise an instance built without it raises
    # AttributeError when filtered.
    OwningPid = None

    def __init__(self, **kwargs):
        """Store each ``key=value`` pair from *kwargs* as an attribute."""
        for key, word in kwargs.items():
            setattr(self, key, word)

    def __repr__(self):
        """Return ``socket_info(name=value, ...)`` for the instance attributes."""
        shown = ", ".join(
            "%s=%r" % pair for pair in sorted(vars(self).items())
        )
        return "%s(%s)" % (type(self).__name__, shown)
class MIB_TCPROW_OWNER_PID(Structure):
    """ctypes layout of one TCP table row: six consecutive DWORD members."""

    # All members share the DWORD type, so only the names are spelled out.
    _fields_ = [
        (member, DWORD)
        for member in (
            "dwState",
            "dwLocalAddr",
            "dwLocalPort",
            "dwRemoteAddr",
            "dwRemotePort",
            "dwOwningPid",
        )
    ]
class MIB_TCPTABLE_OWNER_PID(Structure):
    """ctypes layout of the table header: an entry count followed by rows."""

    _fields_ = [
        ("dwNumEntries", DWORD),
        # Zero-length array that only marks where the rows start.
        # GetExtendedTcpTable looks this member up by the row class name,
        # so the field name is derived from that same class.
        (MIB_TCPROW_OWNER_PID.__name__, MIB_TCPROW_OWNER_PID * 0),
    ]
def formatip(row):
    """Return the row's addresses as dotted-quad strings.

    :param row: object exposing integer ``dwLocalAddr`` and ``dwRemoteAddr``
        attributes (for example a MIB_TCPROW_OWNER_PID instance).
    :returns: tuple ``(local_address, remote_address)`` of strings.
    """
    # "=L" keeps the native byte order, so the bytes come out as they sit in
    # memory, but always packs exactly 4 bytes. Plain "L" uses the native
    # size, which is 8 bytes on LP64 platforms and makes inet_ntoa fail.
    pack_addr = struct.Struct("=L").pack
    return (inet_ntoa(pack_addr(row.dwLocalAddr)),
            inet_ntoa(pack_addr(row.dwRemoteAddr)))
# Raw GetExtendedTcpTable export of iphlpapi, reached through ctypes' windll.
# The Python-level GetExtendedTcpTable function in this file wraps it.
_GetExtendedTcpTable = windll.iphlpapi.GetExtendedTcpTable
def GetExtendedTcpTable(vip=AF_INET,
                        table_class=TCP_TABLE_OWNER_PID_ALL):
    """Return the system TCP table as a list of socket_info objects.

    :param vip: address family; only ``AF_INET`` is supported.
    :param table_class: one of the ``TCP_TABLE_*`` selectors. The rows are
        decoded as MIB_TCPROW_OWNER_PID, so an ``OWNER_PID`` selector is
        expected.
    :returns: list of socket_info with ``State``, ``LocalAddr``,
        ``LocalPort``, ``RemoteAddr``, ``RemotePort`` and ``OwningPid`` set.
    :raises ValueError: if *vip* is not ``AF_INET``.
    :raises OSError: built by ``WinError`` when the API reports a failure.
    """
    if vip != AF_INET:
        raise ValueError('ip version must be AF_INET')
    table_type = MIB_TCPTABLE_OWNER_PID
    row_type = MIB_TCPROW_OWNER_PID
    format_func = formatip

    table = table_type()
    size = DWORD()
    order = True

    # Probe with a NULL buffer: the call fails with ERROR_INSUFFICIENT_BUFFER
    # and writes the required byte count into ``size``.
    failure = _GetExtendedTcpTable(None,
                                   byref(size),
                                   order,
                                   vip,
                                   table_class,
                                   0)
    # Connections can appear between the probe and the fetch, so the buffer
    # may be too small again; keep growing it until the call stops asking.
    while failure == ERROR_INSUFFICIENT_BUFFER:
        resize(table, size.value)
        memset(byref(table), 0, sizeof(table))
        failure = _GetExtendedTcpTable(byref(table),
                                       byref(size),
                                       order,
                                       vip,
                                       table_class,
                                       0)
    if failure != NO_ERROR:
        raise WinError(failure)

    # The structure declares a zero-length row array; reinterpret it as an
    # array holding the number of rows the API actually wrote.
    ptr_type = POINTER(row_type * table.dwNumEntries)
    tables = cast(getattr(table, row_type.__name__), ptr_type)[0]

    pytables = []
    for row in tables:
        LocalAddr, RemoteAddr = format_func(row)
        pytables.append(
            socket_info(
                State=STATES.get(row.dwState, "UNKNOWN_STATE_%s" % (str(row.dwState))),
                LocalAddr=LocalAddr,
                # Only the low 16 bits of the port DWORDs are meaningful (see
                # the MIB_TCPROW_OWNER_PID documentation); unmasked garbage in
                # the upper half would make htons raise OverflowError.
                # htons is its own inverse, so it also converts from network
                # to host byte order.
                LocalPort=htons(row.dwLocalPort & 0xFFFF),
                RemoteAddr=RemoteAddr,
                RemotePort=htons(row.dwRemotePort & 0xFFFF),
                OwningPid=int(row.dwOwningPid)
            )
        )
    return pytables
def GetTcpTableForPid(pid,
                      vip=AF_INET,
                      table_class=TCP_TABLE_OWNER_PID_ALL):
    """Return the TCP table entries whose ``OwningPid`` equals *pid*.

    *vip* and *table_class* are forwarded unchanged to GetExtendedTcpTable.
    """
    owned = []
    for entry in GetExtendedTcpTable(vip, table_class):
        if entry.OwningPid == pid:
            owned.append(entry)
    return owned
def GetPid(name):
    """Return the process ID of the first process named *name*.

    Processes are enumerated through WMI (``Win32_Process``) and the names
    are compared case-insensitively.

    :raises ValueError: if no process has that name.
    """
    service = GetObject('winmgmts:')
    matching_pids = []
    for process in service.InstancesOf('Win32_Process'):
        if process.Properties_("Name").Value.lower() == name.lower():
            matching_pids.append(process.Properties_("ProcessID").Value)
    if len(matching_pids) == 0:
        raise ValueError("Unable to find pid for %s" % (name,))
    return matching_pids[0]
if __name__ == '__main__':
    # Script entry point: print the listening TCP sockets, for every process
    # or only for the process whose name is given as the first argument.
    # sys is imported here because the file's import block does not provide
    # it; without this import, sys.argv raises NameError.
    import sys

    argv = sys.argv[1:]
    if not argv:
        tcp_info = GetExtendedTcpTable(table_class=TCP_TABLE_OWNER_PID_LISTENER)
    else:
        tcp_info = GetTcpTableForPid(GetPid(argv[0]), table_class=TCP_TABLE_OWNER_PID_LISTENER)
    print('%-25s%-25s%-30s%s\n' % ('Local', 'Foreign', 'State', 'PID'))
    for item in tcp_info:
        LocalAddr = '{}:{}'.format(item.LocalAddr, item.LocalPort)
        RemoteAddr = '{}:{}'.format(item.RemoteAddr, item.RemotePort)
        print('%-25s%-25s%-30s%s' % (LocalAddr, RemoteAddr,
                                     item.State, item.OwningPid))
refs:
http://stackoverflow.com/questions/18062175/error-using-getextendedtcptable-in-python
http://stackoverflow.com/questions/550653/cross-platform-way-to-get-pids-by-process-name-in-python