1. Windows 性能监视器
我的电脑
->管理
->计算机管理(本地)
->性能
->监视工具
->性能监视器
2.计数器功能
性能对象 | 计数器 | 介绍 |
---|---|---|
Process | IO Other Bytes/sec | 进程向不涉及数据的 I/O 操作(如控制操作)发出字节的速率。该计数器统计由此进程产生的所有 I/O 活动,包括文件、网络和设备 I/O。 |
3.实现
import os
import time
from typing import Optional

import psutil
import win32pdh
class Process:
    """Read per-process Windows performance counters through ``win32pdh``.

    Counter paths built here identify a process by its instance name plus
    an instance index (several processes may share one name), so the class
    offers helpers to go from a PID to a name and from a name to the
    ``(pid, index)`` pairs of every instance carrying that name.
    """

    def __init__(self, ProcessPid: int):
        """Remember the PID of the process to inspect.

        Args:
            ProcessPid: PID of the target process.
        """
        self.ProcessPid = ProcessPid

    @staticmethod
    def getCurrentPid() -> int:
        """Return the PID of the running Python process, e.g. ``10000``."""
        return os.getpid()

    def getCurrentPidName(self) -> Optional[str]:
        """Return the name of the process whose PID is ``self.ProcessPid``.

        A trailing ``.exe`` extension is stripped, e.g. ``python.exe``
        becomes ``python``.

        Returns:
            The process name without its ``.exe`` extension, or ``None``
            when no running process has that PID.
        """
        for proc in psutil.process_iter():
            if self.ProcessPid != proc.pid:
                continue
            name = proc.name()
            # Strip only the trailing extension; ".exe" occurring in the
            # middle of an image name is part of the name itself.
            if name.endswith(".exe"):
                name = name[:-len(".exe")]
            return name
        return None

    @staticmethod
    def _queryCounterValue(path, interval):
        """Sample one counter twice, ``interval`` seconds apart, and return it.

        The counter is formatted with ``PDH_FMT_LONG``. The counter and the
        query handle are released even when a PDH call raises.
        """
        query = win32pdh.OpenQuery()
        try:
            counter = win32pdh.AddCounter(query, path)
            try:
                win32pdh.CollectQueryData(query)
                time.sleep(interval)
                win32pdh.CollectQueryData(query)
                _, value = win32pdh.GetFormattedCounterValue(
                    counter, win32pdh.PDH_FMT_LONG
                )
            finally:
                win32pdh.RemoveCounter(counter)
        finally:
            win32pdh.CloseQuery(query)
        return value

    def getSameNameProcessIndex(self, ProcessName) -> list:
        """List the PID and instance index of every instance named ``ProcessName``.

        Args:
            ProcessName: Instance name of the "Process" performance object.

        Returns:
            One dict per matching instance, e.g.
            ``[{"pid": 1234, "pidIndex": 0}, {"pid": 5678, "pidIndex": 1}]``.
            The list is empty when no instance has that name.
        """
        _, instances = win32pdh.EnumObjectItems(
            None, None, "Process", win32pdh.PERF_DETAIL_WIZARD
        )
        matches = []
        # The index numbers instances that share the same name, so it only
        # advances when a matching instance has been handled.
        index = 0
        for name in instances:
            if name != ProcessName:
                continue
            path = win32pdh.MakeCounterPath(
                (None, "Process", ProcessName, None, index, "ID Process")
            )
            pid = self._queryCounterValue(path, 0.01)
            matches.append({"pid": pid, "pidIndex": index})
            index += 1
        return matches

    def getItemInfo(self, Item, ProcessName, Index) -> int:
        """Return the value of one "Process" counter for one instance.

        Blocks for about one second: the counter is sampled twice, one
        second apart, before the value is formatted.

        Args:
            Item: Counter name, e.g. ``"IO Other Bytes/sec"`` or
                ``"IO Data Bytes/sec"``.
            ProcessName: Instance name of the process.
            Index: Instance index among the instances sharing that name.

        Returns:
            The formatted counter value, e.g. ``131304``.
        """
        path = win32pdh.MakeCounterPath(
            (None, "Process", ProcessName, None, Index, Item)
        )
        return self._queryCounterValue(path, 1)

    def Bytes_to_KBytes(self, Bytes) -> float:
        """Convert a byte count to kilobytes, rounded to two decimals.

        Example: ``1536`` -> ``1.5``.
        """
        return round(Bytes / 1024, 2)

    def Bytes_to_MBytes(self, Bytes) -> float:
        """Convert a byte count to megabytes, rounded to two decimals.

        Example: ``1048576`` -> ``1.0``.
        """
        # One megabyte is 1024 * 1024 bytes (not 1024 * 2).
        return round(Bytes / (1024 * 1024), 2)