小鹅通 M3U8 视频下载器

共计 8123 个字符,预计需要花费 21 分钟才能阅读完成。

支持自动解密、多线程并发与浏览器联动的本地命令行下载工具,配合浏览器,能够无缝接管课程下载任务,真正进入高效的“边看边存”状态

核心特性
浏览器联动,一键发送:运行后在本地启动 HTTP 服务(端口 8910),配合浏览器脚本,在课程页面点击“发送到下载”即可自动拉起后台下载,零繁琐链接复制,彻底解放双手。
开箱即用,免配依赖:脚本内置环境自检逻辑。首次运行自动检测并安装缺失的 Python 依赖(requests, pycryptodome, tqdm)。
全自动解密与合并,效率翻倍:
自动解密 → 自动获取并解析 AES-128 密钥与 IV,在内存中完成 TS 切片解密。
智能合并 → 极速调用 FFmpeg 将切片合并为 MP4,内置“Stream Copy”与“重编码”双重回退机制,确保视频完美合并。
多线程并发,极速下载:采用 ThreadPoolExecutor 提供多线程并发下载,搭配 tqdm 进度条,直观展示下载进度与切片完成情况。
稳定可靠,日志追溯:提供“控制台 + 本地日志(downloader.log)”双重记录,所有下载历史有迹可循。

重点说明:
// @match        *://*.xiaoeknow.com/*
// @match        *://*.xet.tech/*
// @match        *://*.xiaoe-tech.com/*
// @match        *://*.xet.pomoho.com/*
// @match        *://*.xet-pc.citv.cn/*
因平台支持自定义域名,因此若脚本中不包含所购买的课程的自定义域名,则需要手动进行添加

使用步骤:一、环境与工具准备安装扩展:为实现网页端联动,用户需要手动在浏览器(如 Chrome、Edge)的扩展商店中安装 Tampermonkey(油猴)插件。添加脚本:在油猴插件中,添加并启用文件目录下的网页端用户脚本(用于在页面生成交互按钮)
小鹅通 M3U8 视频下载器

  • 免配置环境:程序已设置好本地 Winpython32-3.8.3.0 环境,无需手动创建环境,无需任何编译,真正做到开箱即用。
  • 启动服务:直接运行工具,程序会自动完成自检。当控制台出现“等待浏览器发送下载任务 …”提示时,代表本地监听服务已成功启动。

二、一键发送任务,全自动下载
保持下载器的命令行窗口在后台运行,切勿关闭
打开课程到播放页面,如需特定清晰度需要手动调整,等获取到视频地址后,直接点击由油猴脚本生成的“发送到下载”按钮。
接收到任务后,下载器将在本地 downloads 文件夹下全自动完成切片下载、密钥解密与最终的 MP4 视频合并导出。
小鹅通 M3U8 视频下载器 小鹅通 M3U8 视频下载器

205931asp5gozj4q1gvu44.webp (69.98 KB, 下载次数: 0)

下载附件    保存到相册

小鹅通  M3U8  视频下载器

昨天 20:59 上传

下载地址:https://wwamb.lanzoul.com/ijN623u0mxji
最低系统可支持到 win7 32 位,若存在不兼容问题可以直接替换环境
[Python]

"""小鹅通 M3U8 视频下载器 — 交互式命令行版"""
import os, time, shutil, subprocess, binascii, re
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urljoin
import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
from tqdm import tqdm
 
# 强制 stdout 行缓冲,确保 Windows CMD 中实时输出
if hasattr(sys.stdout, 'reconfigure'):
    sys.stdout.reconfigure(line_buffering=True)
 
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
OUT = os.path.join(BASE_DIR, 'downloads')
os.makedirs(OUT, exist_ok=True)
LOG_FILE = os.path.join(BASE_DIR, 'downloader.log')
 
 
def _console_encoding():
    """获取 Windows 控制台实际代码页,避免 UTF-8/GBK 猜错"""
    try:
        import ctypes
        cp = ctypes.windll.kernel32.GetConsoleOutputCP()
        if cp == 65001:
            return 'utf-8'
        elif cp == 936:
            return 'gbk'
        else:
            return f'cp{cp}'
    except Exception:
        return (sys.stdout.encoding or 'utf-8').lower()
 
 
CONSOLE_ENC = _console_encoding()
 
 
def log(msg, end='\n'):
    """同时输出到控制台和日志文件。控制台按实际代码页编码,避免乱码"""
    ts = time.strftime('%H:%M:%S')
    line = f'[{ts}] {msg}'
    # 1. 文件日志(始终可靠)try:
        with open(LOG_FILE, 'a', encoding='utf-8') as f:
            f.write(line)
            if end:
                f.write(end)
            f.flush()
    except Exception:
        pass
    # 2. 控制台:按真实代码页编码
    try:
        data = (line + end).encode(CONSOLE_ENC, errors='replace')
    except Exception:
        data = (line + end).encode('utf-8', errors='replace')
    try:
        os.write(1, data)
    except Exception:
        try:
            os.write(2, data)
        except Exception:
            pass
 
 
FFMPEG = os.path.join(BASE_DIR, 'ffmpeg.exe')
 
 
def safe_name(name):
    """去除文件名中的非法字符"""
    return re.sub(r'[<>:"/\\|?*]','_', name).strip()
 
 
def find_ffmpeg():
    for p in [FFMPEG, shutil.which('ffmpeg'), shutil.which('ffmpeg.exe')]:
        if p and os.path.isfile(p):
            return p
    return None
 
 
def download_one(name, m3u8_url):
    """下载单个视频"""
    s = requests.Session()
    s.headers.update({'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Referer': 'https://xiaoe-tech.com/',
    })
 
    log(f'\n  {name}')
    log(f'{"─"* 50}')
 
    # 1. 下载 M3U8
    log('[1/4] 下载索引...', end=' ')
    try:
        resp = s.get(m3u8_url, timeout=30)
        resp.raise_for_status()
    except Exception as e:
        log(f'失败: {e}')
        return False
    m3u8 = resp.text
    base = '/'.join(m3u8_url.split('/')[:-1]) + '/'
 
    # 2. 解析
    segments = []
    key_url = None
    iv = b'\x00' * 16
    for line in m3u8.split('\n'):
        line = line.strip()
        if 'URI=' in line and 'AES-128' in line:
            a = line.find('URI="') + 5
            b = line.find('"', a)
            key_url = line[a:b]
            if not key_url.startswith('http'):
                key_url = urljoin(base, key_url)
            iv_s = line.find('IV=0x')
            if iv_s != -1:
                iv = binascii.unhexlify(line[iv_s + 5:iv_s + 37])
        elif line and not line.startswith('#'):
            u = line if line.startswith('http') else urljoin(base, line)
            segments.append(u)
 
    log(f'{len(segments)} 片段')
 
    # 3. 获取密钥
    log('[2/4] 获取密钥...', end=' ')
    try:
        key = s.get(key_url, timeout=15).content
        log(f'{len(key)} 字节')
    except Exception as e:
        log(f'失败: {e}')
        return False
 
    # 4. 并行下载
    tmp = os.path.join(OUT, f'tmp_{int(time.time())}')
    os.makedirs(tmp, exist_ok=True)
 
    def dl_one(url, key, iv, idx):
        for _ in range(3):
            try:
                data = s.get(url, timeout=60).content
                break
            except:
                time.sleep(1)
        else:
            return None
        seg_iv = iv[:12] + idx.to_bytes(4, 'big')
        dec = AES.new(key, AES.MODE_CBC, iv=seg_iv).decrypt(data)
        try:
            dec = unpad(dec, AES.block_size)
        except:
            pass
        fp = os.path.join(tmp, f's_{idx:05d}.ts')
        with open(fp, 'wb') as f:
            f.write(dec)
        return fp
 
    t0 = time.time()
    results = {}
    with ThreadPoolExecutor(max_workers=6) as ex:
        fut = {ex.submit(dl_one, u, key, iv, i): i for i, u in enumerate(segments)}
        with tqdm(total=len(segments), desc='[3/4] 下载中', unit='片',
                  ncols=60, bar_format='{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt}') as pbar:
            for f in as_completed(fut):
                if f.result():
                    results[fut[f]] = f.result()
                    pbar.update(1)
    elapsed = time.time() - t0
 
    if not results:
        log('所有片段下载失败')
        shutil.rmtree(tmp, ignore_errors=True)
        return False
 
    # 5. 合并
    log(f'[4/4] 合并 ({elapsed:.1f}s 下载, {len(results)}/{len(segments)} 成功)...', end=' ')
    files = [results[i] for i in sorted(results)]
    output = os.path.join(OUT, f'{safe_name(name)}.mp4')
 
    # Concat 列表
    lst = os.path.join(OUT, '_concat.txt')
    with open(lst, 'w', encoding='utf-8') as f:
        for fp in files:
            f.write(f"file'{fp.replace(os.sep, '/')}'\n")
 
    if os.path.exists(output):
        os.remove(output)
 
    cmd = [find_ffmpeg() or 'ffmpeg', '-f', 'concat', '-safe', '0',
           '-i', lst, '-c', 'copy', '-movflags', '+faststart', '-y', output]
    r = subprocess.run(cmd, capture_output=True, encoding='utf-8',
                       errors='replace', timeout=600)
    if os.path.exists(lst):
        os.remove(lst)
 
    if r.returncode != 0:
        # 重编码回退
        lst2 = os.path.join(OUT, '_concat2.txt')
        with open(lst2, 'w', encoding='utf-8') as f:
            for fp in files:
                f.write(f"file'{fp.replace(os.sep, '/')}'\n")
        cmd2 = [find_ffmpeg() or 'ffmpeg', '-f', 'concat', '-safe', '0',
                '-i', lst2, '-c:v', 'libx264', '-c:a', 'aac',
                '-movflags', '+faststart', '-y', output]
        subprocess.run(cmd2, capture_output=True, encoding='utf-8',
                       errors='replace', timeout=600)
        if os.path.exists(lst2):
            os.remove(lst2)
 
    shutil.rmtree(tmp, ignore_errors=True)
 
    if os.path.exists(output):
        mb = os.path.getsize(output) / 1048576
        log(f'{mb:.1f} MB  ->  {output}')
        return True
    else:
        log('合并失败')
        return False
 
 
# ============================================================
#  HTTP 服务 — 接收浏览器发来的下载任务
# ============================================================
 
import json
import traceback
import threading
from http.server import HTTPServer, BaseHTTPRequestHandler
 
PORT = 8910
total_count = 0
count_lock = threading.Lock()
 
# 线程池:支持同时下载最多 3 个
download_pool = ThreadPoolExecutor(max_workers=3)
 
 
class Handler(BaseHTTPRequestHandler):
    def log_message(self, *args):
        pass
 
    def do_OPTIONS(self):
        log(f'HTTP OPTIONS {self.path}')
        self._cors()
 
    def do_GET(self):
        log(f'HTTP GET {self.path}')
        self._cors()
        self._json({'status': 'running', 'port': PORT, 'total': total_count})
 
    def do_POST(self):
        global total_count
        log(f'HTTP POST {self.path} 开始处理')
        try:
            length = int(self.headers.get('Content-Length', 0))
            raw = self.rfile.read(length)
            data = json.loads(raw)
            name = data.get('name', '').strip()
            url = data.get('url', '').strip()
 
            log(f'HTTP POST {self.path} body_len={length}')
            log(f'JSON: name={name!r} url={url[:60]}...')
 
            if not name or not url:
                log('拒绝任务:name 或 url 为空')
                self._cors()
                self._json({'status': 'error', 'msg': 'missing name or url'})
                return
 
            log(f'>>> 收到任务: [{name}]')
            log(f'URL: {url[:80]}...')
 
            self._cors()
            self._json({'status': 'accepted', 'msg': f'已接收: {name}'})
            log(f'已响应 accepted,提交后台下载...')
 
            download_pool.submit(self._do_download, name, url)
 
        except Exception as e:
            tb = traceback.format_exc()
            log(f'HTTP POST 异常: {e}')
            log(f'异常堆栈: {tb}')
            self._cors()
            self._json({'status': 'error', 'msg': str(e)})
 
    def _do_download(self, name, url):
        """在单独线程中执行下载"""
        global total_count
        log(f'开始下载: [{name}]')
        ok = download_one(name, url)
        with count_lock:
            if ok:
                total_count += 1
            log(f'--- 任务结束: [{name}] {" 成功 "if ok else" 失败 "} (累计: {total_count}) ---')
            log('等待新任务...')
 
    def _cors(self):
        self.send_response(200)
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'POST, GET, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type')
        self.send_header('Content-Type', 'application/json')
        self.end_headers()
 
    def _json(self, obj):
        body = json.dumps(obj, ensure_ascii=False).encode('utf-8')
        self.wfile.write(body)
 
 
def main():
    ffmpeg = find_ffmpeg()
    if not ffmpeg:
        log('未找到 ffmpeg.exe,请放到本目录下')
        sys.exit(1)
 
    server = HTTPServer(('127.0.0.1', PORT), Handler)
 
    log('=' * 50)
    log('小鹅通 M3U8 视频下载器 启动')
    log(f'ffmpeg: {ffmpeg}')
    log(f'输出:   {OUT}')
    log(f'端口:   localhost:{PORT}')
    log('等待浏览器发送下载任务...')
    log('在课程页面点击" 发送到下载 "即可')
    log('按 Ctrl+C 退出')
    log('=' * 50)
 
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        log('\n 退出。本次共下载 {} 个视频。'.format(total_count))
        server.shutdown()
 
 
if __name__ == '__main__':
    main()

正文完
 0
suyan
版权声明:本站原创文章,由 suyan 于2026-07-03发表,共计8123字。
转载说明:转载本网站任何内容,请按照转载方式正确书写本站原文地址。本站提供的一切软件、教程和内容信息仅限用于学习和研究目的;不得将上述内容用于商业或者非法用途,否则,一切后果请用户自负。本站信息来自网络,版权争议与本站无关。您必须在下载后的24个小时之内,从您的电脑或手机中彻底删除上述内容。如果您喜欢该程序,请支持正版,购买注册,得到更好的正版服务。如有侵权请邮件QQ邮箱:suyanw520@163.com 与我们联系处理。敬请谅解!
评论(没有评论)
验证码