当前仓库属于暂停状态,部分功能使用受限,详情请查阅 仓库状态说明
1 Star 2 Fork 0

zmr / AutoPubyun
暂停

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
AutoPubyun.py 6.14 KB
一键复制 编辑 原始数据 按行查看 历史
zmr 提交于 2021-03-06 10:36 . 修改bug
import urllib.request
import json
import logging
import os
import time
import threading
import sys
import socket
#import commands
import subprocess
import requests
# 默认配置
defaultConfig = {"DomainNames": {"www.xxxx.com": "Password", "www.xxxx.net": "Password"}, "ExcludeIP": ["192.168.0.21", "192.168.0.22", "192.168.0.11"],"RetryTimes": 3, "ConnectNetworkBeforeRetry": False, "CheckNetworkUrl": "www.baidu.com",
"ConnectNetworkCmd": "rasdial ConnectName UserName Password", "LogLevel": 40}
logFile = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logger.log')
# 配置日志参数
logging.basicConfig(filename=logFile,
format="%(asctime)s \n%(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
# filemode='w',
level=40)
# 用户配置(稍后从配置文件AutoPubyun.cfg加载)
userConfig = {}
def log(level, message):
"写日志"
logging.log(level, message)
return
def checkNetwork():
"检查网络是否通畅"
# 打包后不能运行,暂时仅用脚本执行
# 检查宽带连接
log(logging.DEBUG, "【开始】检查宽带连接命令: rasdial")
result = os.popen("rasdial")
r = result.read(3)
log(logging.DEBUG, "【结束】检查宽带连接结果: {0}".format(r))
if ("已连接" not in r):
return False
# 检查网络是否通畅
cmd = 'ping ' + userConfig["CheckNetworkUrl"]
log(logging.DEBUG, "【开始】执行ping命令: {0}".format(cmd))
exit_code = os.system(cmd)
log(logging.DEBUG, "【结束】执行ping命令结果: {0}".format(exit_code))
return exit_code == 0
def tryConnectNetwork():
"尝试拨号连接"
try:
cmd = userConfig["ConnectNetworkCmd"]
log(logging.DEBUG, "执行重连网络命令: {0}".format(cmd))
os.system(cmd)
except BaseException as e:
log(logging.ERROR, "重连网络【异常】{0}".format(e))
return
def checkDDN(domainName, allIP):
"检查域名是否指向正确"
try:
# 获取域名绑定的IP
domainIP = socket.gethostbyname(domainName)
log(logging.DEBUG, "域名【{0}】绑定的IP: {1}".format(domainName, domainIP))
# 需要排除的IP(通常是内网IP)
excludeIPs = userConfig.get("ExcludeIP", [])
# 检查本机IP中是否有域名IP
for ip in allIP:
# 排除内网IP
if (ip in excludeIPs):
continue
if (domainIP == ip):
log(logging.DEBUG, "本机IP: {0}".format(ip))
return True
except BaseException as e:
log(logging.ERROR, "检查域名指向【异常】{0}".format(e))
return False
return False
def updateIP(domainName, password):
"更新域名IP"
t = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
log(logging.DEBUG, "更新域名【开始】{0}\ndomainName = {1}, password = {2}".format(
t, domainName, password))
url = "http://members.3322.net/dyndns/update?system=dyndns&hostname={0}".format(
domainName)
user = "root"
try:
pwdmgr = urllib.request.HTTPPasswordMgrWithDefaultRealm()
pwdmgr.add_password(None, url, user, password)
auth_handler = urllib.request.HTTPBasicAuthHandler(pwdmgr)
opener = urllib.request.build_opener(auth_handler)
response = opener.open(url)
result = response.read().decode('utf-8')
result = str.strip(result)
log(logging.DEBUG, "更新域名{0}【结束】{1}".format(domainName, result))
return True
except BaseException as e:
log(logging.ERROR, "更新域名{0}【异常】{1}".format(domainName, e))
return False
def getAllIP():
"获取本机所有IP4(包括外网IP)"
allIP = []
try:
# 本机IP
allData = socket.getaddrinfo(socket.gethostname(), None)
for data in allData:
af = data[0]
if (af == socket.AF_INET):
localIP4 = data[4][0]
allIP.append(localIP4)
# 外网IP
publicIP = requests.get('http://ifconfig.me/ip', timeout=1).text.strip()
allIP.append(publicIP)
# 去重
allIP = list(set(allIP))
except BaseException as e:
log(logging.ERROR, "获取本机IP【异常】{0}".format(e))
return allIP
return allIP
def run():
"更新所有域名IP"
# 检查并连接网络
if (userConfig["ConnectNetworkBeforeRetry"]):
net = checkNetwork()
if (net == False):
tryConnectNetwork()
# 获取本机所有IP4(包括外网IP)
allIP = getAllIP()
# 更新所有域名
domainNames = userConfig.get("DomainNames", {})
for key in domainNames:
# 检查域名指向是否正确
if (checkDDN(key, allIP) == False):
# 更新域名
result = updateIP(key, domainNames[key])
# 重试
retryTimes = userConfig["RetryTimes"]
while (result == False and retryTimes > 0):
retryTimes -= 1
log(logging.DEBUG, "正在重试: {0}".format(key))
result = updateIP(key, domainNames[key])
return
# 加载配置
log(logging.DEBUG, "加载配置【开始】")
try:
configFile = os.path.join(os.path.dirname(os.path.abspath(__file__)), "AutoPubyun.cfg")
if os.path.exists(configFile) == False:
with open(configFile, "w", encoding="utf8") as f:
json.dump(defaultConfig, f, indent=4)
userConfig.update(defaultConfig)
else:
with open(configFile, 'r', encoding="utf8") as f:
config = json.load(f)
for key in defaultConfig:
userConfig[key] = config.get(key, defaultConfig[key])
except BaseException as e:
log(logging.ERROR, "加载配置【异常】{0}".format(e))
os._exit(0)
log(logging.DEBUG, "加载配置【结束】" + json.dumps(userConfig))
# 配置日志输出级别
logger = logging.getLogger()
logger.setLevel(userConfig.get("LogLevel", logging.ERROR))
# 开始更新操作
log(logging.DEBUG, "【开始】开始运行")
run()
log(logging.DEBUG, "【结束】结束运行\n")
Python
1
https://gitee.com/Zero_gitee/AutoPubyun.git
git@gitee.com:Zero_gitee/AutoPubyun.git
Zero_gitee
AutoPubyun
AutoPubyun
master

搜索帮助