利用Python实现网络测试的脚本分享

yipeiwu_com6年前Python基础

前言

最近同学让我帮忙写一个测试网络的工具。由于工作上的事情,断断续续地拖了很久才给出一个相对完整的版本。其实,我Python用的比较少,所以基本都是边查资料边写程序。

程序的主要逻辑如下:

读取一个excel文件中的ip列表,然后使用多线程调用ping统计每个ip的网络参数,最后把结果输出到excel文件中。

代码如下所示:

#! /usr/bin/env python
# -*- coding: UTF-8 -*-
# File: pingtest_test.py
# Date: 2008-09-28
# Author: Michael Field
# Modified By:intheworld
# Date: 2017-4-17
import sys
import os
import getopt
import commands
import subprocess
import re
import time
import threading
import xlrd
import xlwt

TEST = [
  '220.181.57.217',
  '166.111.8.28',
  '202.114.0.242',
  '202.117.0.20',
  '202.112.26.34',
  '202.203.128.33',
  '202.115.64.33',
  '202.201.48.2',
  '202.114.0.242',
  '202.116.160.33',
  '202.202.128.33',
]
RESULT={}
def usage():
 print "USEAGE:"
 print "\t%s -n TEST|excel name [-t times of ping] [-c concurrent number(thread nums)]" %sys.argv[0]
 print "\t TEST为简单测试的IP列表"
 print "\t-t times 测试次数;默认为1000;"
 print "\t-c concurrent number 并行线程数目:默认为10"
 print "\t-h|-?, 帮助信息"
 print "\t 输出为当前目录文件ping_result.txt 和 ping_result.xls"
 print "for example:"
 print "\t./ping_test.py -n TEST -t 1 -c 10"

def div_list(ls,n):
 if not isinstance(ls,list) or not isinstance(n,int):
  return []
 ls_len = len(ls)
 print 'ls length = %s' %ls_len
 if n<=0 or 0==ls_len:
  return []
 if n > ls_len:
  return []
 elif n == ls_len:
  return [[i] for i in ls]
 else:
  j = ls_len/n
  k = ls_len%n
  ### j,j,j,...(前面有n-1个j),j+k
  #步长j,次数n-1
  ls_return = []
  for i in xrange(0,(n-1)*j,j):
   ls_return.append(ls[i:i+j])
  #算上末尾的j+k
  ls_return.append(ls[(n-1)*j:])
  return ls_return

def pin(IP):
 try:
  xpin=subprocess.check_output("ping -n 1 -w 100 %s" %IP, shell=True)
 except Exception:
  xpin = 'empty'
 ms = '=[0-9]+ms'.decode("utf8")
 print "%s" %ms
 print "%s" %xpin
 mstime=re.search(ms,xpin)
 if not mstime:
  MS='timeout'
  return MS
 else:
  MS=mstime.group().split('=')[1]
  return MS.strip('ms')
def count(total_count,I):
 global RESULT
 nowsecond = int(time.time())
 nums = 0
 oknums = 0
 timeout = 0
 lostpacket = 0.0
 total_ms = 0.0
 avgms = 0.0
 maxms = -1
 while nums < total_count:
  nums += 1
  MS = pin(I)
  print 'pin output = %s' %MS
  if MS == 'timeout':
   timeout += 1
   lostpacket = timeout*100.0 / nums
  else:
   oknums += 1
   total_ms = total_ms + float(MS)
   if oknums == 0:
    oknums = 1
    maxms = float(MS)
    avgms = total_ms / oknums
   else:
    avgms = total_ms / oknums
    maxms = max(maxms, float(MS))
  RESULT[I] = (I, avgms, maxms, lostpacket)

def thread_func(t, ipList):
 if not isinstance(ipList,list):
  return
 else:
  for ip in ipList:
   count(t, ip)

def readIpsInFile(excelName):
 data = xlrd.open_workbook(excelName)
 table = data.sheets()[0]
 nrows = table.nrows
 print 'nrows %s' %nrows
 ips = []
 for i in range(nrows):
  ips.append(table.cell_value(i, 0))
  print table.cell_value(i, 0)
 return ips
 

if __name__ == '__main__':
 file = 'ping_result.txt'
 times = 10
 network = ''
 thread_num = 10
 args = sys.argv[1:]
 try:
  (opts, getopts) = getopt.getopt(args, 'n:t:c:h?')
 except:
  print "\nInvalid command line option detected."
  usage()
  sys.exit(1)
 for opt, arg in opts:
  if opt in ('-n'):
   network = arg
  if opt in ('-h', '-?'):
   usage()
   sys.exit(0)
  if opt in ('-t'):
   times = int(arg)
  if opt in ('-c'):
   thread_num = int(arg)
 f = open(file, 'w')
 workbook = xlwt.Workbook()
 sheet1 = workbook.add_sheet("sheet1", cell_overwrite_ok=True)
 if not isinstance(times,int):
  usage()
  sys.exit(0)
 if network not in ['TEST'] and not os.path.exists(os.path.join(os.path.dirname(__file__), network)):
  print "The network is wrong or excel file does not exist. please check it."
  usage()
  sys.exit(0)
 else:
  if network == 'TEST':
   ips = TEST
  else:
   ips = readIpsInFile(network)
  print 'Starting...'
  threads = []
  nest_list = div_list(ips, thread_num)
  loops = range(len(nest_list))
  print 'Total %s Threads is working...' %len(nest_list)
  for ipList in nest_list:
   t = threading.Thread(target=thread_func,args=(times,ipList))
   threads.append(t)
  for i in loops:
   threads[i].start()
  for i in loops:
   threads[i].join()
  it = 0
  for line in RESULT:
   value = RESULT[line]
   sheet1.write(it, 0, line)
   sheet1.write(it, 1, str('%.2f'%value[1]))
   sheet1.write(it, 2, str('%.2f'%value[2]))
   sheet1.write(it, 3, str('%.2f'%value[3]))
   it+=1
   f.write(line + '\t'+ str('%.2f'%value[1]) + '\t'+ str('%.2f'%value[2]) + '\t'+ str('%.2f'%value[3]) + '\n')
  f.close()
  workbook.save('ping_result.xls')
  print 'Work Done. please check result %s and ping_result.xls.'%file

这段代码参照了别人的实现,虽然不是特别复杂,这里还是简单解释一下。

  •     excel读写使用了xlrd和xlwt,基本都是使用了一些简单的api。
  •     使用了threading实现多线程并发,和POSIX标准接口非常相似。thread_func是线程的处理函数,它的输入包含了一个ip的List,所以在函数内部通过循环处理各个ip。
  •     此外,Python的commands在Windows下并不兼容,所以使用了subprocess模块。

到目前为止,我对Python里面字符集的理解还不到位,所以正则表达式匹配的代码并不够强壮,不过目前勉强可以工作,以后有必要再改咯!

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流,谢谢大家对【听图阁-专注于Python设计】的支持。

相关文章

python 反编译exe文件为py文件的实例代码

我们用pyinstaller把朋友文件打包成exe文件,但有时候我们需要还原,我们可以用pyinstxtractor.py 用法: python pyinstxtractor.py xx...

python实现自主查询实时天气

python实现自主查询实时天气

本文实例为大家分享了python实现自主查询实时天气的具体代码,供大家参考,具体内容如下 用到了urllib2 json  很简单的一个应用 如下 获取城市编号 #cod...

python中dict使用方法详解

dict的特性 dict是python中的一个可变的数据类型,用{}表示,dict的key必须是不可变的数据类型,而value的数据类型可以任意。 格式:{key:value,key...

一个可以套路别人的python小程序实例代码

先简要介绍一下程序。  程序是使用pycharm工具,python语言所写。程序包括客户端 client.py 和服务器端 server.py 两部分,利用了python中的s...

Python实现Mysql数据库连接池实例详解

Python实现Mysql数据库连接池实例详解

python连接Mysql数据库: Python编程中可以使用MySQLdb进行数据库的连接及诸如查询/插入/更新等操作,但是每次连接MySQL数据库请求时,都是独立的去请求访问,相当...