Paramiko是一个用于执行SSH命令的Python第三方库,使用该库可实现自动化运维的所有任务,如下是一些常用代码的封装方式,多数代码为半成品,只是敲代码时的备份副本防止丢失,仅供参考。
目前本人巡检百台设备完全无压力,如果要巡检过千台则需要多线程的支持,过万台则需要加入智能判断等。
实现命令执行: 直接使用过程化封装,执行CMD命令.
import paramiko ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) def BatchCMD(address,username,password,port,command): try: ssh.connect(hostname=address,username=username,password=password,port=port,timeout=2) stdin , stdout , stderr = ssh.exec_command(command) result = stdout.read() if len(result) != 0: result = str(result).replace("\\n", "\n") result = result.<p>本文来源gao!%daima.com搞$代*!码9网(</p>replace("b'", "").replace("'", "") return result else: return None except Exception: return None
实现磁盘巡检: 获取磁盘空间并返回字典格式
def GetAllDiskSpace(address,username,password,port): ref_dict = {} cmd_dict = {"Linux\n" : "df | grep -v 'Filesystem' | awk '{print $5 \":\" $6}'", "AIX\n" : "df | grep -v 'Filesystem' | awk '{print $4 \":\" $7}'" } # 首先检测系统版本 os_version = BatchCMD(address,username,password,port,"uname") for version,run_cmd in cmd_dict.items(): if(version == os_version): # 根据不同版本选择不同的命令 os_ref = BatchCMD(address,username,password,port,run_cmd) ref_list= os_ref.split("\n") # 循环将其转换为字典 for each in ref_list: # 判断最后是否为空,过滤最后一项 if each != "": ref_dict[str(each.split(":")[1])] = str(each.split(":")[0]) return ref_dict # 磁盘巡检总函数 def DiskMain(): with open("db.json", "r", encoding="utf-8") as read_fp: load_json = read_fp.read() js = json.loads(load_json) base = js.get("base") count = len(base) for each in range(0,count): print("\033[37m-\033[0m" * 80) print("\033[35m 检测地址: {0:10} \t 用户名: {1:10} \t 密码: {2:10} \t 端口: {3:4}\033[0m". format(base[each][1],base[each][2],base[each][3],base[each][4])) print("\033[37m-\033[0m" * 80) ref = GetAllDiskSpace(base[each][1],base[each][2],base[each][3],base[each][4]) for k,v in ref.items(): # 判断是否存在空盘 if( v.split("%")[0] != "-"): # 将占用百分比转换为整数 space_ret = int(v.split("%")[0]) if space_ret >= 70: print("\033[31m 磁盘分区: {0:30} \t 磁盘占用: {1:5} \033[0m".format(k,v)) continue if space_ret >= 50: print("\033[33m 磁盘分区: {0:30} \t 磁盘占用: {1:5} \033[0m".format(k, v)) continue else: print("\033[34m 磁盘分区: {0:30} \t 磁盘占用: {1:5} \033[0m".format(k, v)) continue print() # 组内传递用户名密码时调用此方法 def GroupDiskMain(address,username,password,port): ref = GetAllDiskSpace(address,username,password,port) for k, v in ref.items(): if (v.split("%")[0] != "-"): space_ret = int(v.split("%")[0]) if space_ret >= 70: print("磁盘分区: {0:30} \t 磁盘占用: {1:5} -> [警告]".format(k, v)) continue if space_ret >= 50: print("磁盘分区: {0:30} \t 磁盘占用: {1:5} -> [警惕]".format(k, v)) continue else: print("磁盘分区: {0:30} \t 磁盘占用: {1:5} -> [正常]".format(k, v)) continue print()