共计 8123 个字符,预计需要花费 21 分钟才能阅读完成。
支持自动解密、多线程并发与浏览器联动的本地命令行下载工具,配合浏览器,能够无缝接管课程下载任务,真正进入高效的“边看边存”状态
核心特性
浏览器联动,一键发送:运行后在本地启动 HTTP 服务(端口 8910),配合浏览器脚本,在课程页面点击“发送到下载”即可自动拉起后台下载,零繁琐链接复制,彻底解放双手。
开箱即用,免配依赖:脚本内置环境自检逻辑。首次运行自动检测并安装缺失的 Python 依赖(requests, pycryptodome, tqdm)。
全自动解密与合并,效率翻倍:
自动解密 → 自动获取并解析 AES-128 密钥与 IV,在内存中完成 TS 切片解密。
智能合并 → 极速调用 FFmpeg 将切片合并为 MP4,内置“Stream Copy”与“重编码”双重回退机制,确保视频完美合并。
多线程并发,极速下载:采用 ThreadPoolExecutor 提供多线程并发下载,搭配 tqdm 进度条,直观展示下载进度与切片完成情况。
稳定可靠,日志追溯:提供“控制台 + 本地日志(downloader.log)”双重记录,所有下载历史有迹可循。
重点说明:
// @match *://*.xiaoeknow.com/*
// @match *://*.xet.tech/*
// @match *://*.xiaoe-tech.com/*
// @match *://*.xet.pomoho.com/*
// @match *://*.xet-pc.citv.cn/*
因平台支持自定义域名,因此若脚本中不包含所购买的课程的自定义域名,则需要手动进行添加
使用步骤:一、环境与工具准备安装扩展:为实现网页端联动,用户需要手动在浏览器(如 Chrome、Edge)的扩展商店中安装 Tampermonkey(油猴)插件。添加脚本:在油猴插件中,添加并启用文件目录下的网页端用户脚本(用于在页面生成交互按钮)

- 免配置环境:程序已设置好本地 Winpython32-3.8.3.0 环境,无需手动创建环境,无需任何编译,真正做到开箱即用。
- 启动服务:直接运行工具,程序会自动完成自检。当控制台出现“等待浏览器发送下载任务 …”提示时,代表本地监听服务已成功启动。
二、一键发送任务,全自动下载
保持下载器的命令行窗口在后台运行,切勿关闭。
打开课程到播放页面,如需特定清晰度需要手动调整,等获取到视频地址后,直接点击由油猴脚本生成的“发送到下载”按钮。
接收到任务后,下载器将在本地 downloads 文件夹下全自动完成切片下载、密钥解密与最终的 MP4 视频合并导出。

下载地址:https://wwamb.lanzoul.com/ijN623u0mxji
最低系统可支持到 win7 32 位,若存在不兼容问题可以直接替换环境
[Python]
"""小鹅通 M3U8 视频下载器 — 交互式命令行版"""
import os, time, shutil, subprocess, binascii, re
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urljoin
import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
from tqdm import tqdm
# 强制 stdout 行缓冲,确保 Windows CMD 中实时输出
if hasattr(sys.stdout, 'reconfigure'):
sys.stdout.reconfigure(line_buffering=True)
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
OUT = os.path.join(BASE_DIR, 'downloads')
os.makedirs(OUT, exist_ok=True)
LOG_FILE = os.path.join(BASE_DIR, 'downloader.log')
def _console_encoding():
"""获取 Windows 控制台实际代码页,避免 UTF-8/GBK 猜错"""
try:
import ctypes
cp = ctypes.windll.kernel32.GetConsoleOutputCP()
if cp == 65001:
return 'utf-8'
elif cp == 936:
return 'gbk'
else:
return f'cp{cp}'
except Exception:
return (sys.stdout.encoding or 'utf-8').lower()
CONSOLE_ENC = _console_encoding()
def log(msg, end='\n'):
"""同时输出到控制台和日志文件。控制台按实际代码页编码,避免乱码"""
ts = time.strftime('%H:%M:%S')
line = f'[{ts}] {msg}'
# 1. 文件日志(始终可靠)try:
with open(LOG_FILE, 'a', encoding='utf-8') as f:
f.write(line)
if end:
f.write(end)
f.flush()
except Exception:
pass
# 2. 控制台:按真实代码页编码
try:
data = (line + end).encode(CONSOLE_ENC, errors='replace')
except Exception:
data = (line + end).encode('utf-8', errors='replace')
try:
os.write(1, data)
except Exception:
try:
os.write(2, data)
except Exception:
pass
FFMPEG = os.path.join(BASE_DIR, 'ffmpeg.exe')
def safe_name(name):
"""去除文件名中的非法字符"""
return re.sub(r'[<>:"/\\|?*]','_', name).strip()
def find_ffmpeg():
for p in [FFMPEG, shutil.which('ffmpeg'), shutil.which('ffmpeg.exe')]:
if p and os.path.isfile(p):
return p
return None
def download_one(name, m3u8_url):
"""下载单个视频"""
s = requests.Session()
s.headers.update({'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Referer': 'https://xiaoe-tech.com/',
})
log(f'\n {name}')
log(f'{"─"* 50}')
# 1. 下载 M3U8
log('[1/4] 下载索引...', end=' ')
try:
resp = s.get(m3u8_url, timeout=30)
resp.raise_for_status()
except Exception as e:
log(f'失败: {e}')
return False
m3u8 = resp.text
base = '/'.join(m3u8_url.split('/')[:-1]) + '/'
# 2. 解析
segments = []
key_url = None
iv = b'\x00' * 16
for line in m3u8.split('\n'):
line = line.strip()
if 'URI=' in line and 'AES-128' in line:
a = line.find('URI="') + 5
b = line.find('"', a)
key_url = line[a:b]
if not key_url.startswith('http'):
key_url = urljoin(base, key_url)
iv_s = line.find('IV=0x')
if iv_s != -1:
iv = binascii.unhexlify(line[iv_s + 5:iv_s + 37])
elif line and not line.startswith('#'):
u = line if line.startswith('http') else urljoin(base, line)
segments.append(u)
log(f'{len(segments)} 片段')
# 3. 获取密钥
log('[2/4] 获取密钥...', end=' ')
try:
key = s.get(key_url, timeout=15).content
log(f'{len(key)} 字节')
except Exception as e:
log(f'失败: {e}')
return False
# 4. 并行下载
tmp = os.path.join(OUT, f'tmp_{int(time.time())}')
os.makedirs(tmp, exist_ok=True)
def dl_one(url, key, iv, idx):
for _ in range(3):
try:
data = s.get(url, timeout=60).content
break
except:
time.sleep(1)
else:
return None
seg_iv = iv[:12] + idx.to_bytes(4, 'big')
dec = AES.new(key, AES.MODE_CBC, iv=seg_iv).decrypt(data)
try:
dec = unpad(dec, AES.block_size)
except:
pass
fp = os.path.join(tmp, f's_{idx:05d}.ts')
with open(fp, 'wb') as f:
f.write(dec)
return fp
t0 = time.time()
results = {}
with ThreadPoolExecutor(max_workers=6) as ex:
fut = {ex.submit(dl_one, u, key, iv, i): i for i, u in enumerate(segments)}
with tqdm(total=len(segments), desc='[3/4] 下载中', unit='片',
ncols=60, bar_format='{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt}') as pbar:
for f in as_completed(fut):
if f.result():
results[fut[f]] = f.result()
pbar.update(1)
elapsed = time.time() - t0
if not results:
log('所有片段下载失败')
shutil.rmtree(tmp, ignore_errors=True)
return False
# 5. 合并
log(f'[4/4] 合并 ({elapsed:.1f}s 下载, {len(results)}/{len(segments)} 成功)...', end=' ')
files = [results[i] for i in sorted(results)]
output = os.path.join(OUT, f'{safe_name(name)}.mp4')
# Concat 列表
lst = os.path.join(OUT, '_concat.txt')
with open(lst, 'w', encoding='utf-8') as f:
for fp in files:
f.write(f"file'{fp.replace(os.sep, '/')}'\n")
if os.path.exists(output):
os.remove(output)
cmd = [find_ffmpeg() or 'ffmpeg', '-f', 'concat', '-safe', '0',
'-i', lst, '-c', 'copy', '-movflags', '+faststart', '-y', output]
r = subprocess.run(cmd, capture_output=True, encoding='utf-8',
errors='replace', timeout=600)
if os.path.exists(lst):
os.remove(lst)
if r.returncode != 0:
# 重编码回退
lst2 = os.path.join(OUT, '_concat2.txt')
with open(lst2, 'w', encoding='utf-8') as f:
for fp in files:
f.write(f"file'{fp.replace(os.sep, '/')}'\n")
cmd2 = [find_ffmpeg() or 'ffmpeg', '-f', 'concat', '-safe', '0',
'-i', lst2, '-c:v', 'libx264', '-c:a', 'aac',
'-movflags', '+faststart', '-y', output]
subprocess.run(cmd2, capture_output=True, encoding='utf-8',
errors='replace', timeout=600)
if os.path.exists(lst2):
os.remove(lst2)
shutil.rmtree(tmp, ignore_errors=True)
if os.path.exists(output):
mb = os.path.getsize(output) / 1048576
log(f'{mb:.1f} MB -> {output}')
return True
else:
log('合并失败')
return False
# ============================================================
# HTTP 服务 — 接收浏览器发来的下载任务
# ============================================================
import json
import traceback
import threading
from http.server import HTTPServer, BaseHTTPRequestHandler
PORT = 8910
total_count = 0
count_lock = threading.Lock()
# 线程池:支持同时下载最多 3 个
download_pool = ThreadPoolExecutor(max_workers=3)
class Handler(BaseHTTPRequestHandler):
def log_message(self, *args):
pass
def do_OPTIONS(self):
log(f'HTTP OPTIONS {self.path}')
self._cors()
def do_GET(self):
log(f'HTTP GET {self.path}')
self._cors()
self._json({'status': 'running', 'port': PORT, 'total': total_count})
def do_POST(self):
global total_count
log(f'HTTP POST {self.path} 开始处理')
try:
length = int(self.headers.get('Content-Length', 0))
raw = self.rfile.read(length)
data = json.loads(raw)
name = data.get('name', '').strip()
url = data.get('url', '').strip()
log(f'HTTP POST {self.path} body_len={length}')
log(f'JSON: name={name!r} url={url[:60]}...')
if not name or not url:
log('拒绝任务:name 或 url 为空')
self._cors()
self._json({'status': 'error', 'msg': 'missing name or url'})
return
log(f'>>> 收到任务: [{name}]')
log(f'URL: {url[:80]}...')
self._cors()
self._json({'status': 'accepted', 'msg': f'已接收: {name}'})
log(f'已响应 accepted,提交后台下载...')
download_pool.submit(self._do_download, name, url)
except Exception as e:
tb = traceback.format_exc()
log(f'HTTP POST 异常: {e}')
log(f'异常堆栈: {tb}')
self._cors()
self._json({'status': 'error', 'msg': str(e)})
def _do_download(self, name, url):
"""在单独线程中执行下载"""
global total_count
log(f'开始下载: [{name}]')
ok = download_one(name, url)
with count_lock:
if ok:
total_count += 1
log(f'--- 任务结束: [{name}] {" 成功 "if ok else" 失败 "} (累计: {total_count}) ---')
log('等待新任务...')
def _cors(self):
self.send_response(200)
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Access-Control-Allow-Methods', 'POST, GET, OPTIONS')
self.send_header('Access-Control-Allow-Headers', 'Content-Type')
self.send_header('Content-Type', 'application/json')
self.end_headers()
def _json(self, obj):
body = json.dumps(obj, ensure_ascii=False).encode('utf-8')
self.wfile.write(body)
def main():
ffmpeg = find_ffmpeg()
if not ffmpeg:
log('未找到 ffmpeg.exe,请放到本目录下')
sys.exit(1)
server = HTTPServer(('127.0.0.1', PORT), Handler)
log('=' * 50)
log('小鹅通 M3U8 视频下载器 启动')
log(f'ffmpeg: {ffmpeg}')
log(f'输出: {OUT}')
log(f'端口: localhost:{PORT}')
log('等待浏览器发送下载任务...')
log('在课程页面点击" 发送到下载 "即可')
log('按 Ctrl+C 退出')
log('=' * 50)
try:
server.serve_forever()
except KeyboardInterrupt:
log('\n 退出。本次共下载 {} 个视频。'.format(total_count))
server.shutdown()
if __name__ == '__main__':
main()
