win32pdh获取进程网络速率

发布于 2021-04-02  348 次阅读


1.windows性能监视器

我的电脑->管理->计算机管理(本地)->性能->监视工具->性能监视器

2.计数器功能

性能对象 计数器 介绍
Process IO Data Bytes/sec 进程在 I/O 操作中读取和写入字节的速率。此计数器统计该进程产生的所有 I/O 活动,包括文件、网络和设备 I/O。
Process IO Other Bytes/sec 进程向不涉及数据的 I/O 操作(如控制操作)发出字节的速率。此计数器统计该进程产生的所有 I/O 活动,包括文件、网络和设备 I/O。

3.实现

import os
import time
from typing import Optional

import psutil
import win32pdh
class Process:
    """Query Windows "Process" performance counters (PDH) and convert byte units.

    An instance remembers one PID (``ProcessPid``); the counter helpers take
    the PDH instance name and index explicitly.
    """

    def __init__(self, ProcessPid):
        """Store the PID this instance refers to.

        Args:
            ProcessPid: Process id that ``getCurrentPidName`` looks up.
        """
        self.ProcessPid = ProcessPid

    @staticmethod
    def getCurrentPid() -> int:
        """Return the PID of the running interpreter, as given by ``os.getpid()``."""
        return os.getpid()

    def getCurrentPidName(self) -> Optional[str]:
        """Return the name of the process whose PID equals ``self.ProcessPid``.

        Returns:
            The process name with a trailing ".exe" removed (e.g. "python"),
            or ``None`` when no running process has that PID.
        """
        suffix = ".exe"
        for proc in psutil.process_iter():
            if proc.pid != self.ProcessPid:
                continue
            name = proc.name()
            # Strip only the extension at the end of the name; a ".exe" that
            # appears in the middle is part of the name and must be kept.
            if name.endswith(suffix):
                name = name[: -len(suffix)]
            return name
        return None

    @staticmethod
    def _sampleCounter(path, interval):
        """Sample one PDH counter path twice and return its value as a long.

        Args:
            path: Counter path built with ``win32pdh.MakeCounterPath``.
            interval: Seconds to sleep between the two data collections.

        Returns:
            The value from ``GetFormattedCounterValue`` with ``PDH_FMT_LONG``.
        """
        query = win32pdh.OpenQuery()
        try:
            counter = win32pdh.AddCounter(query, path)
            win32pdh.CollectQueryData(query)
            time.sleep(interval)
            win32pdh.CollectQueryData(query)
            _, value = win32pdh.GetFormattedCounterValue(
                counter, win32pdh.PDH_FMT_LONG
            )
            win32pdh.RemoveCounter(counter)
            return value
        finally:
            # Always release the query handle, even when a PDH call raised.
            win32pdh.CloseQuery(query)

    def getSameNameProcessIndex(self, ProcessName) -> list:
        """List the PID and instance index of every instance named ``ProcessName``.

        Several instances of the "Process" object can share one name, so each
        match is queried separately for its "ID Process" counter.

        Args:
            ProcessName: Instance name to look for among the "Process" instances.

        Returns:
            A list of ``{"pid": <ID Process value>, "pidIndex": <index>}`` dicts,
            one per matching instance, in enumeration order.
        """
        _, instances = win32pdh.EnumObjectItems(
            None, None, "Process", win32pdh.PERF_DETAIL_WIZARD
        )
        found = []
        # The index counts only instances named ProcessName, starting at 0,
        # and is passed as the instance index in the counter path.
        index = 0
        for name in instances:
            if name != ProcessName:
                continue
            path = win32pdh.MakeCounterPath(
                (None, "Process", ProcessName, None, index, "ID Process")
            )
            pid = self._sampleCounter(path, 0.01)
            found.append({"pid": pid, "pidIndex": index})
            index += 1
        return found

    def getItemInfo(self, Item, ProcessName, Index, Interval=1) -> int:
        """Return the value of one "Process" counter for a given instance.

        Args:
            Item: Counter name, e.g. "IO Data Bytes/sec" or "IO Other Bytes/sec".
            ProcessName: Instance name.
            Index: Instance index among the instances sharing ``ProcessName``.
            Interval: Seconds to wait between the two samples (default 1).

        Returns:
            The counter value formatted as a long.
        """
        path = win32pdh.MakeCounterPath(
            (None, "Process", ProcessName, None, Index, Item)
        )
        return self._sampleCounter(path, Interval)

    def Bytes_to_KBytes(self, Bytes) -> float:
        """Convert a byte count to kilobytes (1 KB = 1024 bytes), rounded to 2 places."""
        return round(Bytes / 1024, 2)

    def Bytes_to_MBytes(self, Bytes) -> float:
        """Convert a byte count to megabytes (1 MB = 1024 ** 2 bytes), rounded to 2 places."""
        return round(Bytes / (1024 ** 2), 2)