共计 13202 个字符,预计需要花费 34 分钟才能阅读完成。
一、背景需求
随着这两年黄金价格大涨,大家想实时关注金价但是有时又不能不方便实时查看手机或电脑,需要一个价格提醒的需求,虽然现有很多 app 提供价格提醒,但是可能我接触的少,发现很多软件都不是自己需要的,所以基于自己想法和需求想弄一个价格提醒系统。此程序适合做短线黄金交易。投资有风险,购买请谨慎!
二、技术实现
PS:作为一个白剽党而言想办法节省或者免费才是王道!哈
服务器:本地运行或云服务器域名:(可选)
开发语言:Python
主力开发:AI
消息推送:微信公众号(可申请微信公众平台的测试号 - 免费)
整个程序都是基于半吊子全栈开发“AI”训练而来,历经 15 天左右,因为是业余时间所以比较长哈,现在基本完成也测试了一个月左右,肯定有不尽人意的地方,但是还算满意(声明:本人对于 python 是大白,所以介意者可忽略本文及代码,自行实现)
三、开发思路整理
补充下一个重要说明,微信公众平台测试号有个坑,就是千万不要设置提醒过于频繁,每天最多 100 次左右吧,超过了他回缓存,递增给每天的配额,也就是说如果你一次性发 1000 条消息推送,那未来 10 天你都看不见新消息了,所以千万谨慎设置。
功能特点
- 自动从新浪财经抓取实时黄金价格(每 5 分钟抓取一次)
- 可配置的价格预警阈值
- 通过微信公众号模板消息向用户发送价格提醒
- 支持价格涨跌预警功能
- 可配置推送间隔时间和推送次数
- 支持定时推送(每小时的 01 分和 31 分)
- 预警推送与定时推送互不影响(不太理想有 bug)
- 动态更新基准价格(不太理想有 bug)
- 增强的日志记录和监控
- 简单的缓存机制提高性能
- 支持缓存清除功能
- 支持生成 HTML 文件用于 Web 预览
- 支持生成 windows 桌面软件
https://gitee.com/hejsky/gold-price

代码 access_token.py
import requests
import time
from logger_config import get_logger
from config import APP_ID, APP_SECRET
# 获取日志记录器
logger = get_logger(__name__)
class AccessToken:
"""微信公众号 Access Token 管理类"""
def __init__(self):
self.access_token = None
self.token_expire_time = 0
def get_access_token(self):
"""
获取 access_token
:return: access_token 或 None
"""
# 检查 token 是否仍然有效
if self.access_token and time.time() < self.token_expire_time:
logger.debug("使用缓存的 Access Token")
return self.access_token
url = 'https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid={}&secret={}'.format(APP_ID, APP_SECRET)
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)'
'Chrome/91.0.4472.124 Safari/537.36'
}
try:
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status() # 检查 HTTP 状态码
result = response.json()
if 'access_token' in result:
self.access_token = result['access_token']
# 设置过期时间(提前 5 分钟刷新)expires_in = result.get('expires_in', 7200)
self.token_expire_time = time.time() + expires_in - 300
logger.info("成功获取新的 Access Token")
return self.access_token
else:
logger.error(f"获取 Access Token 失败: {result}")
self._reset_token()
return None
except requests.exceptions.RequestException as e:
logger.error(f"HTTP 请求异常: {e}")
self._reset_token()
return None
except ValueError as e:
logger.error(f"响应解析异常: {e}")
self._reset_token()
return None
except Exception as e:
logger.error(f"获取 Access Token 发生未知异常: {e}")
self._reset_token()
return None
def _reset_token(self):
"""重置 token 信息"""
self.access_token = None
self.token_expire_time = 0
def refresh_access_token(self):
"""手动刷新 access_token"""
# 清除当前 token 信息
self.access_token = None
self.token_expire_time = 0
self.get_access_token()
数据抓取来源 data_source.py
# coding: utf-8
"""
黄金价格数据源模块
负责获取和处理黄金价格数据
"""
import time
import requests
from logger_config import get_logger
from playwright.sync_api import sync_playwright
from config import PRICE_CACHE_EXPIRATION
# 获取日志记录器
logger = get_logger(__name__)
# 备选数据源配置
GOLD_PRICE_SOURCES = [
{
"name": "新浪财经 - 黄金频道",
"url": "https://finance.sina.com.cn/nmetal/",
"method": "playwright"
},
{
"name": "新浪财经 - 黄金期货",
"url": "https://finance.sina.com.cn/futures/quotes/XAU.shtml",
"method": "playwright"
},
{
"name": "新浪财经 -API",
"url": "https://hq.sinajs.cn/list=njs_gold",
"method": "api"
}
]
# 时间变量
TIMEOUT = 10 # 通用超时时间(秒)RETRY_COUNT = 3 # 重试次数
RETRY_INTERVAL = 2 # 重试间隔(秒)# 缓存机制
class PriceCache:
"""
价格缓存类,管理价格数据的缓存
用于减少重复的网络请求,提高性能
"""
def __init__(self, expiration=PRICE_CACHE_EXPIRATION):
self._cache = {} # 缓存字典
self._expiration = expiration # 缓存过期时间(秒)def get(self, key):
"""
获取缓存数据,如果过期则返回 None
:param key: 缓存键
:return: 缓存值或 None
"""
if key in self._cache:
cached_data = self._cache[key]
if time.time() < cached_data['expires_at']: return cached_data['value'] else: # 缓存已过期,删除它 del self._cache[key] return None def set(self, key, value): """设置缓存数据 :param key: 缓存键 :param value: 缓存值""" self._cache[key] = {'value': value, 'expires_at': time.time() + self._expiration } def clear(self): """清除所有缓存数据""" cache_size = len(self._cache) self._cache.clear() logger.info(f"缓存已全部清除,共清除 {cache_size} 条记录") def clear_key(self, key): """清除指定键的缓存数据 :param key: 缓存键""" if key in self._cache: del self._cache[key] logger.info(f"已清除缓存键: {key}") return True return False # 创建价格缓存实例 price_cache = PriceCache() # 使用配置文件中的缓存过期时间 def get_gold_price_from_sina_page_playwright(url): """使用 Playwright 从新浪财经页面获取黄金价格 :param url: 数据源 URL :return: 价格数据字典或 None""" browser = None page = None try: with sync_playwright() as p: # 启动浏览器 ( 增加超时时间) browser = p.chromium.launch(headless=True, timeout=15000) page = browser.new_page() # 设置用户代理 page.set_extra_http_headers({ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)" "Chrome/120.0.0.0 Safari/537.36" }) # 访问页面 (增加超时时间) page.goto(url, timeout=20000) # 等待元素出现 (增加超时时间) page.wait_for_selector("#realtimeGC", timeout=20000) # 获取价格链接元素,通过 CSS 选择器精确定位 price_link_element = page.locator("#realtimeGC > .r_g_price_c_r > a")
# 获取人民币价格
price_text = price_link_element.locator(".r_g_price_now").text_content(timeout=10000)
# 获取价格变化
price_change_text = price_link_element.locator(".r_g_price_change").text_content(timeout=10000)
# 解析价格文本
if price_text:
# 移除可能的空格和换行符
price_text = price_text.strip()
try:
price = float(price_text)
if price <= 0: logger.error("Playwright 获取页面价格数据异常,获取到的价格为 0 或负数") return None # 返回包含所有价格相关信息的对象 result = {"price": price, "change": price_change_text.strip() if price_change_text else "","timestamp": int(time.time()),"readable_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) } return result except ValueError as e: logger.error(f" 价格文本转换为数字失败: {e}, 原始文本: {price_text}") return None else: logger.error(" 无法解析页面价格数据(Playwright 方式),未找到有效价格文本 ") return None except Exception as e: logger.error(f" 使用 Playwright 抓取页面失败: {e}") return None finally: # 确保浏览器和页面总是被关闭 # 注意:在 with sync_playwright() 上下文内,Playwright 会自动关闭资源 # 这里的关闭操作可能会因为事件循环已关闭而失败,所以添加 try-except if page: try: page.close() except Exception as e: logger.debug(f" 关闭页面失败(可能是因为事件循环已关闭): {e}") if browser: try: browser.close() except Exception as e: logger.debug(f" 关闭浏览器失败(可能是因为事件循环已关闭): {e}") def get_gold_price_from_api(url):""" 使用 API 方式获取黄金价格(更轻量级,速度更快):param url: API 数据源 URL :return: 价格数据字典或 None """try: # 发送 HTTP 请求获取数据 response = requests.get(url, timeout=TIMEOUT) response.raise_for_status() # 检查 HTTP 状态码 # 解析新浪财经 API 返回的数据 # 新浪财经 API 返回格式:var hq_str_njs_gold=" 黄金 T +D,453.50,453.50,453.50,453.50,0.00,0.00,09:29:59,2023-12-01"; data = response.text.strip() if data and"="in data and""" in data: # 提取引号内的数据部分 data_part = data.split("=")[1].strip().strip(""") # 分割数据字段 fields = data_part.split(",") if len(fields) >= 8:
# 新浪财经 API 数据格式:产品名称, 最新价, 开盘价, 最高价, 最低价, 涨跌额, 涨跌幅, 时间, 日期
price_str = fields[1].strip()
try:
price = float(price_str)
if price <= 0: logger.error(f"API 获取价格数据异常,获取到的价格为 0 或负数: {price}") return None # 返回包含所有价格相关信息的对象 result = {"price": price, "change": fields[5].strip() if len(fields) > 5 else "","timestamp": int(time.time()),"readable_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
}
return result
except ValueError as e:
logger.error(f"API 价格文本转换为数字失败: {e}, 原始文本: {price_str}")
return None
else:
logger.error(f"API 返回数据格式异常,字段数量不足: {fields}")
return None
else:
logger.error(f"API 返回数据格式异常,无法解析: {data}")
return None
except Exception as e:
logger.error(f"使用 API 获取数据失败: {e}")
return None
def get_gold_price():
"""
获取黄金价格的主函数,尝试多个数据源
使用缓存机制以提高性能
:return: 黄金价格(元 / 克)或 None
"""
# 检查缓存
cached_price = price_cache.get('gold_price')
if cached_price is not None:
logger.info("使用缓存的金价数据")
return cached_price
# 遍历所有备选数据源
for source in GOLD_PRICE_SOURCES:
source_name = source["name"]
source_url = source["url"]
source_method = source["method"]
logger.info(f"尝试从 {source_name} 获取黄金价格")
# 根据数据源类型选择不同的获取方式
for retry in range(RETRY_COUNT):
try:
if source_method == "api":
# 使用 API 方式获取数据(更轻量级)price_data = get_gold_price_from_api(source_url)
else:
# 使用 Playwright 方式获取数据
price_data = get_gold_price_from_sina_page_playwright(source_url)
if price_data is not None:
# 更新缓存
price_cache.set('gold_price', price_data["price"])
logger.info(f"成功获取黄金价格: ¥{price_data['price']}/ 克")
return price_data["price"]
else:
logger.warning(f"从 {source_name} 获取数据失败,第 {retry + 1} 次尝试")
if retry < RETRY_COUNT - 1:
time.sleep(RETRY_INTERVAL)
except Exception as e:
logger.error(f"从 {source_name} 获取数据时发生异常: {e}, 第 {retry + 1} 次尝试")
if retry < RETRY_COUNT - 1: time.sleep(RETRY_INTERVAL) # 所有方案都失败 logger.error("所有数据源获取失败,无法获取黄金价格") return None def display_price_info(price, last_price=None): """显示价格信息和涨跌情况""" # 获取价格涨跌箭头 arrow, direction = get_price_arrow(price, last_price) # 获取当前时间 time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) return arrow, direction def get_price_arrow(price, last_price): """根据当前价格与基准价格比较,判断价格上涨或下跌 返回相应的箭头符号和描述文字""" # 如果没有上一次价格记录,则认为是持平 if last_price is None: return "(持平)", "持平" # 判断涨跌 if price > last_price:
return "(上涨)", "上涨"
elif price < last_price:
return "(下跌)", "下跌"
else:
return "(持平)", "持平"
主程序 main.py
import time
from datetime import datetime, time as dt_time
# 导入配置文件
from config import PUSH_START_TIME, PUSH_END_TIME, REGULAR_PUSH_MINUTES, REGULAR_PUSH_WINDOW, DATA_FETCH_INTERVAL
# 延迟导入 TEST_MODE,需要时重新导入
import importlib
import config
# 导入数据获取和处理模块
from data_source import get_gold_price, display_price_info
# 导入黄金预警管理器
from gold_alert import gold_alert_manager
# 导入消息发送模块
from message import MessageSender
# 导入日志配置
from logger_config import get_logger
# 导入工具函数
from utils import build_message_data, send_push_message, is_push_blocked
# 获取日志记录器
logger = get_logger(__name__)
# 全局变量
last_price = None # 记录上一次的价格,用于比较价格变化
first_run = True # 标记是否为首次运行
# 消息发送实例 - 延迟初始化,在 run_gold_price_monitor 函数中创建
message_sender = None
def is_within_push_time():
"""
检查当前时间是否在推送时间范围内(工作日):return: bool 是否在推送时间范围内
"""
# 获取当前日期和时间
current_datetime = datetime.now()
now = current_datetime.time()
weekday = current_datetime.weekday() # 获取星期几,0= 周一,4= 周五,5= 周六,6= 周日
# 检查是否为工作日(周一到周五)if weekday > 4: # 周六或周日
logger.debug(f"当前为非工作日({weekday}),跳过推送")
return False
# 如果是测试模式,记录日志但不忽略时间限制
if config.TEST_MODE:
logger.info("[ 测试模式] 按照正常时间范围进行推送")
start_time = dt_time.fromisoformat(PUSH_START_TIME)
end_time = dt_time.fromisoformat(PUSH_END_TIME)
# 处理跨日期的情况(例如:22:00 到 06:00)if start_time <= end_time:
# 不跨日期,正常区间判断
return start_time <= now <= end_time else: # 跨日期,例如 22:00 到 06:00 return now >= start_time or now <= end_time def is_push_minute(): """检查当前时间是否应该进行定期推送 逻辑:1. 检查当前时间是否在配置的推送分钟点的时间窗口内 2. 确保每个推送周期只推送一次 :return: bool 是否为推送分钟""" now = datetime.now() current_hour = now.hour current_minute = now.minute # 获取上次推送的状态 last_push_status = gold_alert_manager.push_status_manager.last_regular_push_minute # 解析上次推送的小时和分钟 if last_push_status > 100:
last_push_hour = last_push_status // 100
last_push_min = last_push_status % 100
else:
# 兼容旧格式
last_push_hour = -1
last_push_min = last_push_status
# 检查当前推送周期是否已经推送过
# 推送周期是指:当前小时的当前分钟(测试模式下)或配置的推送分钟点(正式模式下)if config.TEST_MODE:
# 测试模式下,每个小时只推送一次
current_push_cycle = current_hour
last_push_cycle = last_push_hour
logger.info("[ 测试模式] 忽略推送分钟限制,允许随时推送,但每个小时只推送一次")
else:
# 正式模式下,检查当前时间是否在配置的推送分钟点的时间窗口内
# 计算当前应该推送的分钟点
should_push = False
target_minute = None
for push_min in REGULAR_PUSH_MINUTES:
# 计算推送窗口的开始和结束时间
window_start = push_min
window_end = push_min + REGULAR_PUSH_WINDOW
# 检查当前分钟是否在推送窗口内
if window_start <= current_minute <= window_end:
should_push = True
target_minute = push_min
break
if not should_push:
return False
# 正式模式下,推送周期是当前小时的当前推送分钟点
current_push_cycle = current_hour * 100 + target_minute
last_push_cycle = last_push_hour * 100 + last_push_min if last_push_hour != -1 else -1
# 如果当前推送周期已经推送过,直接返回 False
if current_push_cycle == last_push_cycle:
logger.debug(f"当前推送周期 {current_push_cycle} 已经推送过,跳过本次推送")
return False
# 当前时间符合推送条件,允许推送
return True
def send_regular_price_update(price, arrow):
"""
发送定期价格更新消息
:param price: 当前价格
:param arrow: 价格变化箭头
"""
# 使用工具函数构建消息数据
message_data = build_message_data(price, arrow, gold_alert_manager, push_type="regular")
# 使用通用推送函数发送消息
result = send_push_message(message_sender, message_data, push_type="regular")
return result['success']
def run_gold_price_monitor():
"""
运行黄金价格监控循环
根据需求调整逻辑:1. 每 5 分钟抓取一次黄金价格数据
2. 在指定时间点(每小时 01 分和 31 分)进行定期推送
3. 当价格达到预警阈值时立即推送(不影响定时推送规则)4. 生成 HTML 文件用于 Web 预览(根据配置)5. 编译 Windows 运行文件(根据配置)"""
# 声明全局变量
global last_price, message_sender, first_run
# 初始化消息发送实例
if message_sender is None:
message_sender = MessageSender()
# 编译 Windows 运行文件(根据配置,只执行一次)import config
if config.GENERATE_TYPE == 2:
logger.info("开始编译 Windows 运行文件...")
from windows_compile import WindowsCompiler
compiler = WindowsCompiler()
if compiler.compile(onefile=True, console=True):
logger.info("Windows 运行文件编译成功")
else:
logger.error("Windows 运行文件编译失败")
logger.error("编译失败,程序将停止运行")
return # 编译失败,停止程序运行
# 主循环
while True:
try:
# 检查是否被禁止推送
if is_push_blocked():
logger.error("当天已被禁止推送,程序将停止运行")
return # 被禁止推送,停止程序运行
# 1. 获取当前金价
price = get_gold_price()
if price is not None:
# 2. 显示价格信息并获取价格变化趋势
arrow, direction = display_price_info(price, last_price)
# 3. 记录当前价格用于下次比较
last_price = price
# 4. 获取当前时间信息
now = datetime.now()
current_minute = now.minute
# 5. 保存原始的 dynamic_base_price,用于 HTML 生成
# 这样可以在预警触发时正确显示预警信息
original_base_price = gold_alert_manager.dynamic_base_price
# 6. 检查预警条件(立即推送)# 这会更新 dynamic_base_price
within_push_time = is_within_push_time()
if not is_push_blocked() and within_push_time:
alert_result = gold_alert_manager.check_alert_conditions(price)
# 7. 生成 HTML 文件(根据配置,不受时间范围和微信推送逻辑影响)import config
if config.GENERATE_TYPE == 1:
from generate_html import HTMLGenerator
html_generator = HTMLGenerator()
# 生成 HTML 时使用原始的 base_price,这样可以显示预警信息
html_data = {
'current_price': price,
'last_price': last_price,
'base_price': original_base_price
}
html_path = html_generator.generate_html(html_data)
if html_path:
logger.info(f"成功生成 HTML 文件: {html_path}")
else:
logger.error("生成 HTML 文件失败")
logger.error("HTML 生成失败,程序将停止运行")
return # HTML 生成失败,停止程序运行
# 8. 检查是否需要进行定期推送
if within_push_time:
# 检查当前是否为推送分钟
if is_push_minute():
# 发送定期价格更新
if send_regular_price_update(price, arrow):
# 更新推送状态
gold_alert_manager.push_status_manager.reset_regular_push_status(current_minute, price)
logger.info(f"定期推送完成,重置推送状态: 分钟 ={current_minute}")
else:
# 如果定期推送失败,可能表示推送服务出现问题
logger.warning("定期推送失败,检查是否被禁止推送")
if is_push_blocked():
logger.error("当天已被禁止推送,程序将停止运行")
return # 被禁止推送,停止程序运行
# 9. 每 5 分钟抓取一次数据
time.sleep(DATA_FETCH_INTERVAL)
else:
# 10. 无法获取金价数据时的处理
logger.error(f"无法获取金价数据,程序将停止运行")
return # 无法获取价格数据,停止程序运行
except KeyboardInterrupt:
# 9. 处理用户中断
logger.info("程序已退出")
break
except Exception as e:
# 10. 处理其他异常
logger.error(f"程序运行异常: {e}", exc_info=True) # 记录完整的异常信息
# 检查是否被禁止推送
if is_push_blocked():
logger.error("当天已被禁止推送,程序将停止运行")
return
time.sleep(10) # 短暂休眠后重试
if __name__ == "__main__":
run_gold_price_monitor()
整套程序下载地址:https://pan.quark.cn/s/48dc0f054c6d(含有部署到宝塔教程!)
正文完
