黄金价格消息推送

共计 13202 个字符,预计需要花费 34 分钟才能阅读完成。

一、背景需求
随着这两年黄金价格大涨,大家想实时关注金价但是有时又不能不方便实时查看手机或电脑,需要一个价格提醒的需求,虽然现有很多 app 提供价格提醒,但是可能我接触的少,发现很多软件都不是自己需要的,所以基于自己想法和需求想弄一个价格提醒系统。此程序适合做短线黄金交易。投资有风险,购买请谨慎!
二、技术实现
PS:作为一个白剽党而言想办法节省或者免费才是王道!哈
服务器:本地运行或云服务器域名:(可选)
开发语言:Python
主力开发:AI
消息推送:微信公众号(可申请微信公众平台的测试号 - 免费)
整个程序都是基于半吊子全栈开发“AI”训练而来,历经 15 天左右,因为是业余时间所以比较长哈,现在基本完成也测试了一个月左右,肯定有不尽人意的地方,但是还算满意(声明:本人对于 python 是大白,所以介意者可忽略本文及代码,自行实现)
三、开发思路整理
补充下一个重要说明,微信公众平台测试号有个坑,就是千万不要设置提醒过于频繁,每天最多 100 次左右吧,超过了他回缓存,递增给每天的配额,也就是说如果你一次性发 1000 条消息推送,那未来 10 天你都看不见新消息了,所以千万谨慎设置。
功能特点

  • 自动从新浪财经抓取实时黄金价格(每 5 分钟抓取一次)
  • 可配置的价格预警阈值
  • 通过微信公众号模板消息向用户发送价格提醒
  • 支持价格涨跌预警功能
  • 可配置推送间隔时间和推送次数
  • 支持定时推送(每小时的 01 分和 31 分)
  • 预警推送与定时推送互不影响(不太理想有 bug)
  • 动态更新基准价格(不太理想有 bug)
  • 增强的日志记录和监控
  • 简单的缓存机制提高性能
  • 支持缓存清除功能
  • 支持生成 HTML 文件用于 Web 预览
  • 支持生成 windows 桌面软件


https://gitee.com/hejsky/gold-price

黄金价格消息推送 黄金价格消息推送
代码 access_token.py

import requests
import time
from logger_config import get_logger
from config import APP_ID, APP_SECRET
 
# 获取日志记录器
logger = get_logger(__name__)
 
 
class AccessToken:
    """微信公众号 Access Token 管理类"""
 
    def __init__(self):
        self.access_token = None
        self.token_expire_time = 0
 
    def get_access_token(self):
        """
        获取 access_token
        :return: access_token 或 None
        """
        # 检查 token 是否仍然有效
        if self.access_token and time.time() < self.token_expire_time:
            logger.debug("使用缓存的 Access Token")
            return self.access_token
 
        url = 'https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid={}&secret={}'.format(APP_ID, APP_SECRET)
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)'
                          'Chrome/91.0.4472.124 Safari/537.36'
        }
        try:
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()  # 检查 HTTP 状态码
            result = response.json()
 
            if 'access_token' in result:
                self.access_token = result['access_token']
                # 设置过期时间(提前 5 分钟刷新)expires_in = result.get('expires_in', 7200)
                self.token_expire_time = time.time() + expires_in - 300
                logger.info("成功获取新的 Access Token")
                return self.access_token
            else:
                logger.error(f"获取 Access Token 失败: {result}")
                self._reset_token()
                return None
        except requests.exceptions.RequestException as e:
            logger.error(f"HTTP 请求异常: {e}")
            self._reset_token()
            return None
        except ValueError as e:
            logger.error(f"响应解析异常: {e}")
            self._reset_token()
            return None
        except Exception as e:
            logger.error(f"获取 Access Token 发生未知异常: {e}")
            self._reset_token()
            return None
             
    def _reset_token(self):
        """重置 token 信息"""
        self.access_token = None
        self.token_expire_time = 0
 
    def refresh_access_token(self):
        """手动刷新 access_token"""
        # 清除当前 token 信息
        self.access_token = None
        self.token_expire_time = 0
        self.get_access_token()

数据抓取来源 data_source.py

# coding: utf-8
"""
黄金价格数据源模块
负责获取和处理黄金价格数据
"""
 
import time
import requests
from logger_config import get_logger
from playwright.sync_api import sync_playwright
from config import PRICE_CACHE_EXPIRATION
 
# 获取日志记录器
logger = get_logger(__name__)
 
# 备选数据源配置
GOLD_PRICE_SOURCES = [
    {
        "name": "新浪财经 - 黄金频道",
        "url": "https://finance.sina.com.cn/nmetal/",
        "method": "playwright"
    },
    {
        "name": "新浪财经 - 黄金期货",
        "url": "https://finance.sina.com.cn/futures/quotes/XAU.shtml",
        "method": "playwright"
    },
    {
        "name": "新浪财经 -API",
        "url": "https://hq.sinajs.cn/list=njs_gold",
        "method": "api"
    }
]
 
# 时间变量
TIMEOUT = 10  # 通用超时时间(秒)RETRY_COUNT = 3  # 重试次数
RETRY_INTERVAL = 2  # 重试间隔(秒)# 缓存机制
class PriceCache:
    """
    价格缓存类,管理价格数据的缓存
    用于减少重复的网络请求,提高性能
    """
    def __init__(self, expiration=PRICE_CACHE_EXPIRATION):
        self._cache = {}  # 缓存字典
        self._expiration = expiration  # 缓存过期时间(秒)def get(self, key):
        """
        获取缓存数据,如果过期则返回 None
        :param key: 缓存键
        :return: 缓存值或 None
        """
        if key in self._cache:
            cached_data = self._cache[key]
            if time.time() < cached_data['expires_at']: return cached_data['value'] else: # 缓存已过期,删除它 del self._cache[key] return None def set(self, key, value): """设置缓存数据 :param key: 缓存键 :param value: 缓存值""" self._cache[key] = {'value': value, 'expires_at': time.time() + self._expiration } def clear(self): """清除所有缓存数据""" cache_size = len(self._cache) self._cache.clear() logger.info(f"缓存已全部清除,共清除 {cache_size} 条记录") def clear_key(self, key): """清除指定键的缓存数据 :param key: 缓存键""" if key in self._cache: del self._cache[key] logger.info(f"已清除缓存键: {key}") return True return False # 创建价格缓存实例 price_cache = PriceCache() # 使用配置文件中的缓存过期时间 def get_gold_price_from_sina_page_playwright(url): """使用 Playwright 从新浪财经页面获取黄金价格 :param url: 数据源 URL :return: 价格数据字典或 None""" browser = None page = None try: with sync_playwright() as p: # 启动浏览器 ( 增加超时时间) browser = p.chromium.launch(headless=True, timeout=15000) page = browser.new_page() # 设置用户代理 page.set_extra_http_headers({ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)" "Chrome/120.0.0.0 Safari/537.36" }) # 访问页面 (增加超时时间) page.goto(url, timeout=20000) # 等待元素出现 (增加超时时间) page.wait_for_selector("#realtimeGC", timeout=20000) # 获取价格链接元素,通过 CSS 选择器精确定位 price_link_element = page.locator("#realtimeGC > .r_g_price_c_r > a")
 
            # 获取人民币价格
            price_text = price_link_element.locator(".r_g_price_now").text_content(timeout=10000)
            # 获取价格变化
            price_change_text = price_link_element.locator(".r_g_price_change").text_content(timeout=10000)
 
            # 解析价格文本
            if price_text:
                # 移除可能的空格和换行符
                price_text = price_text.strip()
                try:
                    price = float(price_text)
                    if price <= 0: logger.error("Playwright 获取页面价格数据异常,获取到的价格为 0 或负数") return None # 返回包含所有价格相关信息的对象 result = {"price": price, "change": price_change_text.strip() if price_change_text else "","timestamp": int(time.time()),"readable_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) } return result except ValueError as e: logger.error(f" 价格文本转换为数字失败: {e}, 原始文本: {price_text}") return None else: logger.error(" 无法解析页面价格数据(Playwright 方式),未找到有效价格文本 ") return None except Exception as e: logger.error(f" 使用 Playwright 抓取页面失败: {e}") return None finally: # 确保浏览器和页面总是被关闭 # 注意:在 with sync_playwright() 上下文内,Playwright 会自动关闭资源 # 这里的关闭操作可能会因为事件循环已关闭而失败,所以添加 try-except if page: try: page.close() except Exception as e: logger.debug(f" 关闭页面失败(可能是因为事件循环已关闭): {e}") if browser: try: browser.close() except Exception as e: logger.debug(f" 关闭浏览器失败(可能是因为事件循环已关闭): {e}") def get_gold_price_from_api(url):""" 使用 API 方式获取黄金价格(更轻量级,速度更快):param url: API 数据源 URL :return: 价格数据字典或 None """try: # 发送 HTTP 请求获取数据 response = requests.get(url, timeout=TIMEOUT) response.raise_for_status() # 检查 HTTP 状态码 # 解析新浪财经 API 返回的数据 # 新浪财经 API 返回格式:var hq_str_njs_gold=" 黄金 T +D,453.50,453.50,453.50,453.50,0.00,0.00,09:29:59,2023-12-01"; data = response.text.strip() if data and"="in data and""" in data: # 提取引号内的数据部分 data_part = data.split("=")[1].strip().strip(""") # 分割数据字段 fields = data_part.split(",") if len(fields) >= 8:
                # 新浪财经 API 数据格式:产品名称, 最新价, 开盘价, 最高价, 最低价, 涨跌额, 涨跌幅, 时间, 日期
                price_str = fields[1].strip()
                try:
                    price = float(price_str)
                    if price <= 0: logger.error(f"API 获取价格数据异常,获取到的价格为 0 或负数: {price}") return None # 返回包含所有价格相关信息的对象 result = {"price": price, "change": fields[5].strip() if len(fields) > 5 else "","timestamp": int(time.time()),"readable_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                    }
                    return result
                except ValueError as e:
                    logger.error(f"API 价格文本转换为数字失败: {e}, 原始文本: {price_str}")
                    return None
            else:
                logger.error(f"API 返回数据格式异常,字段数量不足: {fields}")
                return None
        else:
            logger.error(f"API 返回数据格式异常,无法解析: {data}")
            return None
    except Exception as e:
        logger.error(f"使用 API 获取数据失败: {e}")
        return None
 
 
def get_gold_price():
    """
    获取黄金价格的主函数,尝试多个数据源
    使用缓存机制以提高性能
    :return: 黄金价格(元 / 克)或 None
    """
    # 检查缓存
    cached_price = price_cache.get('gold_price')
    if cached_price is not None:
        logger.info("使用缓存的金价数据")
        return cached_price
     
    # 遍历所有备选数据源
    for source in GOLD_PRICE_SOURCES:
        source_name = source["name"]
        source_url = source["url"]
        source_method = source["method"]
         
        logger.info(f"尝试从 {source_name} 获取黄金价格")
         
        # 根据数据源类型选择不同的获取方式
        for retry in range(RETRY_COUNT):
            try:
                if source_method == "api":
                    # 使用 API 方式获取数据(更轻量级)price_data = get_gold_price_from_api(source_url)
                else:
                    # 使用 Playwright 方式获取数据
                    price_data = get_gold_price_from_sina_page_playwright(source_url)
                 
                if price_data is not None:
                    # 更新缓存
                    price_cache.set('gold_price', price_data["price"])
                    logger.info(f"成功获取黄金价格: ¥{price_data['price']}/ 克")
                    return price_data["price"]
                else:
                    logger.warning(f"从 {source_name} 获取数据失败,第 {retry + 1} 次尝试")
                    if retry < RETRY_COUNT - 1:
                        time.sleep(RETRY_INTERVAL)
            except Exception as e:
                logger.error(f"从 {source_name} 获取数据时发生异常: {e}, 第 {retry + 1} 次尝试")
                if retry < RETRY_COUNT - 1: time.sleep(RETRY_INTERVAL) # 所有方案都失败 logger.error("所有数据源获取失败,无法获取黄金价格") return None def display_price_info(price, last_price=None): """显示价格信息和涨跌情况""" # 获取价格涨跌箭头 arrow, direction = get_price_arrow(price, last_price) # 获取当前时间 time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) return arrow, direction def get_price_arrow(price, last_price): """根据当前价格与基准价格比较,判断价格上涨或下跌 返回相应的箭头符号和描述文字""" # 如果没有上一次价格记录,则认为是持平 if last_price is None: return "(持平)", "持平" # 判断涨跌 if price > last_price:
        return "(上涨)", "上涨"
    elif price < last_price:
        return "(下跌)", "下跌"
    else:
        return "(持平)", "持平"

主程序 main.py

import time
from datetime import datetime, time as dt_time
 
# 导入配置文件
from config import PUSH_START_TIME, PUSH_END_TIME, REGULAR_PUSH_MINUTES, REGULAR_PUSH_WINDOW, DATA_FETCH_INTERVAL
# 延迟导入 TEST_MODE,需要时重新导入
import importlib
import config
# 导入数据获取和处理模块
from data_source import get_gold_price, display_price_info
# 导入黄金预警管理器
from gold_alert import gold_alert_manager
# 导入消息发送模块
from message import MessageSender
# 导入日志配置
from logger_config import get_logger
# 导入工具函数
from utils import build_message_data, send_push_message, is_push_blocked
 
# 获取日志记录器
logger = get_logger(__name__)
 
# 全局变量
last_price = None  # 记录上一次的价格,用于比较价格变化
first_run = True  # 标记是否为首次运行
 
# 消息发送实例 - 延迟初始化,在 run_gold_price_monitor 函数中创建
message_sender = None
 
 
 
def is_within_push_time():
    """
    检查当前时间是否在推送时间范围内(工作日):return: bool 是否在推送时间范围内
    """
    # 获取当前日期和时间
    current_datetime = datetime.now()
    now = current_datetime.time()
    weekday = current_datetime.weekday()  # 获取星期几,0= 周一,4= 周五,5= 周六,6= 周日
     
    # 检查是否为工作日(周一到周五)if weekday > 4:  # 周六或周日
        logger.debug(f"当前为非工作日({weekday}),跳过推送")
        return False
     
    # 如果是测试模式,记录日志但不忽略时间限制
    if config.TEST_MODE:
        logger.info("[ 测试模式] 按照正常时间范围进行推送")
     
    start_time = dt_time.fromisoformat(PUSH_START_TIME)
    end_time = dt_time.fromisoformat(PUSH_END_TIME)
     
    # 处理跨日期的情况(例如:22:00 到 06:00)if start_time <= end_time:
        # 不跨日期,正常区间判断
        return start_time <= now <= end_time else: # 跨日期,例如 22:00 到 06:00 return now >= start_time or now <= end_time def is_push_minute(): """检查当前时间是否应该进行定期推送 逻辑:1. 检查当前时间是否在配置的推送分钟点的时间窗口内 2. 确保每个推送周期只推送一次 :return: bool 是否为推送分钟""" now = datetime.now() current_hour = now.hour current_minute = now.minute # 获取上次推送的状态 last_push_status = gold_alert_manager.push_status_manager.last_regular_push_minute # 解析上次推送的小时和分钟 if last_push_status > 100:
        last_push_hour = last_push_status // 100
        last_push_min = last_push_status % 100
    else:
        # 兼容旧格式
        last_push_hour = -1
        last_push_min = last_push_status
     
    # 检查当前推送周期是否已经推送过
    # 推送周期是指:当前小时的当前分钟(测试模式下)或配置的推送分钟点(正式模式下)if config.TEST_MODE:
        # 测试模式下,每个小时只推送一次
        current_push_cycle = current_hour
        last_push_cycle = last_push_hour
        logger.info("[ 测试模式] 忽略推送分钟限制,允许随时推送,但每个小时只推送一次")
    else:
        # 正式模式下,检查当前时间是否在配置的推送分钟点的时间窗口内
        # 计算当前应该推送的分钟点
        should_push = False
        target_minute = None
         
        for push_min in REGULAR_PUSH_MINUTES:
            # 计算推送窗口的开始和结束时间
            window_start = push_min
            window_end = push_min + REGULAR_PUSH_WINDOW
             
            # 检查当前分钟是否在推送窗口内
            if window_start <= current_minute <= window_end:
                should_push = True
                target_minute = push_min
                break
         
        if not should_push:
            return False
         
        # 正式模式下,推送周期是当前小时的当前推送分钟点
        current_push_cycle = current_hour * 100 + target_minute
        last_push_cycle = last_push_hour * 100 + last_push_min if last_push_hour != -1 else -1
     
    # 如果当前推送周期已经推送过,直接返回 False
    if current_push_cycle == last_push_cycle:
        logger.debug(f"当前推送周期 {current_push_cycle} 已经推送过,跳过本次推送")
        return False
     
    # 当前时间符合推送条件,允许推送
    return True
 
 
 
def send_regular_price_update(price, arrow):
    """
    发送定期价格更新消息
    :param price: 当前价格
    :param arrow: 价格变化箭头
    """
    # 使用工具函数构建消息数据
    message_data = build_message_data(price, arrow, gold_alert_manager, push_type="regular")
     
    # 使用通用推送函数发送消息
    result = send_push_message(message_sender, message_data, push_type="regular")
    return result['success']
 
 
 
def run_gold_price_monitor():
    """
    运行黄金价格监控循环
    根据需求调整逻辑:1. 每 5 分钟抓取一次黄金价格数据
    2. 在指定时间点(每小时 01 分和 31 分)进行定期推送
    3. 当价格达到预警阈值时立即推送(不影响定时推送规则)4. 生成 HTML 文件用于 Web 预览(根据配置)5. 编译 Windows 运行文件(根据配置)"""
    # 声明全局变量
    global last_price, message_sender, first_run
     
    # 初始化消息发送实例
    if message_sender is None:
        message_sender = MessageSender()
     
    # 编译 Windows 运行文件(根据配置,只执行一次)import config
    if config.GENERATE_TYPE == 2:
        logger.info("开始编译 Windows 运行文件...")
        from windows_compile import WindowsCompiler
        compiler = WindowsCompiler()
        if compiler.compile(onefile=True, console=True):
            logger.info("Windows 运行文件编译成功")
        else:
            logger.error("Windows 运行文件编译失败")
            logger.error("编译失败,程序将停止运行")
            return  # 编译失败,停止程序运行
     
    # 主循环
    while True:
        try:
            # 检查是否被禁止推送
            if is_push_blocked():
                logger.error("当天已被禁止推送,程序将停止运行")
                return  # 被禁止推送,停止程序运行
             
            # 1. 获取当前金价
            price = get_gold_price()
 
            if price is not None:
                # 2. 显示价格信息并获取价格变化趋势
                arrow, direction = display_price_info(price, last_price)
                 
                # 3. 记录当前价格用于下次比较
                last_price = price
                 
                # 4. 获取当前时间信息
                now = datetime.now()
                current_minute = now.minute
                 
                # 5. 保存原始的 dynamic_base_price,用于 HTML 生成
                # 这样可以在预警触发时正确显示预警信息
                original_base_price = gold_alert_manager.dynamic_base_price
                 
                # 6. 检查预警条件(立即推送)# 这会更新 dynamic_base_price
                within_push_time = is_within_push_time()
                if not is_push_blocked() and within_push_time:
                    alert_result = gold_alert_manager.check_alert_conditions(price)
                 
                # 7. 生成 HTML 文件(根据配置,不受时间范围和微信推送逻辑影响)import config
                if config.GENERATE_TYPE == 1:
                    from generate_html import HTMLGenerator
                    html_generator = HTMLGenerator()
                     
                    # 生成 HTML 时使用原始的 base_price,这样可以显示预警信息
                    html_data = {
                        'current_price': price,
                        'last_price': last_price,
                        'base_price': original_base_price
                    }
                    html_path = html_generator.generate_html(html_data)
                    if html_path:
                        logger.info(f"成功生成 HTML 文件: {html_path}")
                    else:
                        logger.error("生成 HTML 文件失败")
                        logger.error("HTML 生成失败,程序将停止运行")
                        return  # HTML 生成失败,停止程序运行
 
                # 8. 检查是否需要进行定期推送
                if within_push_time:
                    # 检查当前是否为推送分钟
                    if is_push_minute():
                        # 发送定期价格更新
                        if send_regular_price_update(price, arrow):
                            # 更新推送状态
                            gold_alert_manager.push_status_manager.reset_regular_push_status(current_minute, price)
                            logger.info(f"定期推送完成,重置推送状态: 分钟 ={current_minute}")
                        else:
                            # 如果定期推送失败,可能表示推送服务出现问题
                            logger.warning("定期推送失败,检查是否被禁止推送")
                            if is_push_blocked():
                                logger.error("当天已被禁止推送,程序将停止运行")
                                return  # 被禁止推送,停止程序运行
                 
                # 9. 每 5 分钟抓取一次数据
                time.sleep(DATA_FETCH_INTERVAL)
            else:
                # 10. 无法获取金价数据时的处理
                logger.error(f"无法获取金价数据,程序将停止运行")
                return  # 无法获取价格数据,停止程序运行
 
        except KeyboardInterrupt:
            # 9. 处理用户中断
            logger.info("程序已退出")
            break
        except Exception as e:
            # 10. 处理其他异常
            logger.error(f"程序运行异常: {e}", exc_info=True)  # 记录完整的异常信息
            # 检查是否被禁止推送
            if is_push_blocked():
                logger.error("当天已被禁止推送,程序将停止运行")
                return
            time.sleep(10)  # 短暂休眠后重试
 
 
if __name__ == "__main__":
    run_gold_price_monitor()

整套程序下载地址:https://pan.quark.cn/s/48dc0f054c6d(含有部署到宝塔教程!)

正文完
 0
suyan
版权声明:本站原创文章,由 suyan 于2026-01-31发表,共计13202字。
转载说明:转载本网站任何内容,请按照转载方式正确书写本站原文地址。本站提供的一切软件、教程和内容信息仅限用于学习和研究目的;不得将上述内容用于商业或者非法用途,否则,一切后果请用户自负。本站信息来自网络,版权争议与本站无关。您必须在下载后的24个小时之内,从您的电脑或手机中彻底删除上述内容。如果您喜欢该程序,请支持正版,购买注册,得到更好的正版服务。如有侵权请邮件QQ邮箱:suyanw520@163.com 与我们联系处理。敬请谅解!
评论(没有评论)
验证码