背景

一开始是通过Api获取数据,但是最近他们增加X-Gnarly参数,而且在github上没有找有效的方案后,放弃api请求,改用页面爬取的方式。彻底避免参数加密校验。

我的环境

    python 3.11 
    selenium 4.39.0
    playwright 1.57.0

评论页面

实现啦抓取第一页和第二页的评论,你们要是抓更多页可以吧第二页改成循环。
执行脚本后会在当前目录生成一份json文件,里面是/api/comment/list/接口返回的数据。

 python3.11 comment_scraper.py "@mahi.islam.oliva/video/7565942090039954706"

image.png

代码如下:

import json
import time
import sys
import base64
import re,os
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import argparse



def merge_comments(first_page, second_page):
    """合并两页的评论数据"""
    merged_data = first_page.copy()
    if 'comments' in second_page:
        if 'comments' not in merged_data:
            merged_data['comments'] = []
        merged_data['comments'].extend(second_page['comments'])
    return merged_data

def extract_tiktok_filename(path: str) -> str:
    """
    从 TikTok 路径(如 '@username/video/123456')中提取 'username_123456'
    支持带或不带 @、带 URL 等情况
    """
    # 匹配模式:可选的 @ + 用户名(字母数字下划线.)+ /video/ + 数字ID
    match = re.search(r'@?([\w.]+)/video/(\d{16,})', path)
    if match:
        username = match.group(1)
        video_id = match.group(2)
        return f"{username}_{video_id}"
    else:
        # 如果格式不符,回退到清理后的通用方式
        safe = re.sub(r'[\\/:*?"<>|\s]+', '_', path.strip('@/'))
        return safe[:100]


class TiktokScraper:
    def __init__(self):
        self.comments_data = []
        self.setup_driver()


    def setup_driver(self):

        chrome_options = Options()
        chrome_options.set_capability("goog:loggingPrefs", {"performance": "ALL"})

        chrome_options.add_argument("--start-maximized")
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--headless=new")
        chrome_options.add_argument("--disable-dev-shm-usage")
        chrome_options.add_argument("--disable-blink-features=AutomationControlled")
        chrome_options.add_argument("--disable-infobars")
        chrome_options.add_argument("--disable-extensions")
        chrome_options.add_argument("--disable-gpu")  # 减少 WebGL 差异(可选)
        chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
        chrome_options.add_experimental_option('useAutomationExtension', False)

        user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36"
        chrome_options.add_argument('user-agent={0}'.format(user_agent))

        self.driver = webdriver.Chrome(options=chrome_options)

        self.driver.execute_cdp_cmd("Emulation.setDeviceMetricsOverride", {
            "width": 1440,
            "height": 900,
            "deviceScaleFactor": 2,  # macOS Retina
            "mobile": False
        })

        # 覆盖 WebGL 参数(关键!)
        self.driver.execute_cdp_cmd("Emulation.setHardwareConcurrencyOverride", {"hardwareConcurrency": 8})
        # 1. 设置基础 UA(CDP 安全方式)
        self.driver.execute_cdp_cmd("Emulation.setUserAgentOverride", {
            "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "platform": "MacIntel"
        })

        # 2. 用 JS 覆盖高级指纹(包括 userAgentData)
        self.driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
            "source": """
#             delete navigator.__proto__.webdriver;

            Object.defineProperty(navigator, 'platform', { get: () => 'MacIntel' });
            Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] });

            // 伪造 userAgentData
            if (!navigator.userAgentData) {
                Object.defineProperty(navigator, 'userAgentData', {
                    value: {
                        brands: [
                            { brand: "Chromium", version: "120" },
                            { brand: "Google Chrome", version: "120" },
                            { brand: "Not:A-Brand", version: "99" }
                        ],
                        mobile: false,
                        platform: "macOS",
                        getHighEntropyValues: async (hints) => ({
                            architecture: "x86_64",
                            model: "",
                            platform: "macOS",
                            platformVersion: "13.5",
                            uaFullVersion: "120.0.6099.0"
                        })
                    },
                    writable: false,
                    configurable: false
                });
            }
            """
        })

        # 覆盖 WebGL 渲染器(防指纹关键)
        self.driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
            "source": """
            const getParameter = WebGLRenderingContext.prototype.getParameter;
            WebGLRenderingContext.prototype.getParameter = function(param) {
                if (param === 37445) return 'Apple Inc.'; // UNMASKED_VENDOR_WEBGL
                if (param === 37446) return 'Apple GPU';   // UNMASKED_RENDERER_WEBGL
                return getParameter.call(this, param);
            };
            """
        })
        self.driver.execute_cdp_cmd("Emulation.setTimezoneOverride", {"timezoneId": "America/New_York"})
        self.driver.execute_cdp_cmd("Emulation.setLocaleOverride", {"locale": "en-US"})

    def extract_comment_response_from_logs(self):
        """从 performance 日志中提取评论 API 的完整响应"""
        try:
            logs = self.driver.get_log("performance")
        except Exception as e:
            print(f"获取日志失败: {e}")
            return None

        request_id_to_url = {}
        finished_request_ids = set()

        for entry in logs:
            try:
                message = json.loads(entry["message"])
                method = message.get("message", {}).get("method")
                params = message.get("message", {}).get("params", {})

                if method == "Network.responseReceived":
                    url = params.get("response", {}).get("url", "")
                    request_id = params.get("requestId")
                    if request_id and re.search(r'comment.*list|comments.*aweme', url, re.I):
                        request_id_to_url[request_id] = url

                elif method == "Network.loadingFinished":
                    request_id = params.get("requestId")
                    if request_id:
                        finished_request_ids.add(request_id)
            except Exception:
                continue

        for req_id, url in request_id_to_url.items():
            if req_id in finished_request_ids:
                try:
                    body = self.driver.execute_cdp_cmd(
                        "Network.getResponseBody",
                        {"requestId": req_id}
                    )
                    raw = body.get("body", "{}")
                    if body.get("base64Encoded"):
                        raw = base64.b64decode(raw).decode("utf-8")
                    data = json.loads(raw)
                    if isinstance(data, dict) and ("comments" in data or "item_comments" in data):
                        print(f"✅ 捕获评论接口: {url}")
                        return data
                except Exception as e:
                    print(f"获取响应体失败 (req_id={req_id}): {e}")

        return None

    def scroll_comment_section(self):
        """在 .TUXTabBar-content 内部查找并滚动真正的评论列表容器"""
        script = """
            const tabContent = document.querySelector('.TUXTabBar-content');
            if (!tabContent) {
                console.log('❌ .TUXTabBar-content not found');
                return false;
            }

            // 获取所有子 div
            const candidates = Array.from(tabContent.querySelectorAll('div'));

            // 按 DOM 层级深度排序(优先选深层级的,通常是列表)
            candidates.sort((a, b) => {
                let depthA = 0, depthB = 0;
                let p = a; while (p && p !== tabContent) { depthA++; p = p.parentElement; }
                p = b; while (p && p !== tabContent) { depthB++; p = p.parentElement; }
                return depthB - depthA; // 深的优先
            });

            for (const el of candidates) {
                const style = window.getComputedStyle(el);
                const overflowY = style.overflowY;
                // 必须满足:可滚动 + 有溢出内容
                if ((overflowY === 'auto' || overflowY === 'scroll') &&
                    el.scrollHeight > el.clientHeight) {
                    el.scrollTop = el.scrollHeight+100;
                    console.log('✅ Scrolled real comment container');
                    return true;
                }
            }

            console.log('⚠️ No scrollable child found in .TUXTabBar-content');
            return false;
        """
        try:
            result = self.driver.execute_script(script)
            return result is True
        except Exception as e:
            print(f"滚动执行异常: {e}")
            return False

    def auto_play_and_load_more_comments(self, user_input):

        url = 'https://www.tiktok.com/' + user_input
        print(f"打开视频页面: {url}")
        self.driver.get(url)
        wait = WebDriverWait(self.driver, 20)
        # wait.until(EC.presence_of_element_located((By.TAG_NAME, "video")))
        # 等待评论tab加载完毕
        # wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "div.TUXTabBar-list")))
        wait.until(EC.presence_of_element_located((By.XPATH, "//span[@data-e2e='comment-icon']")))

        print("视频评论已加载")

        # 点击评论按钮
        try:
            comment_span = wait.until(
                EC.element_to_be_clickable((By.XPATH, '//span[@data-e2e="comment-icon"]'))
            )
            print("正在点击评论图标 (span[@data-e2e='comment-icon'])...")
            self.driver.execute_script("arguments[0].click();", comment_span)
        except Exception as e:
            print(f"无法点击评论按钮: {e}")
            return

#         time.sleep(2)
#         debug_prefix = extract_tiktok_filename(user_input)
#         try:
#             # 保存 HTML
#             with open(f"{debug_prefix}_after_click.html", "w", encoding="utf-8") as f:
#                 f.write(self.driver.page_source)
#             print(f"页面 HTML 已保存: {debug_prefix}_after_click.html")
#
#             # 保存截图
#             self.driver.save_screenshot(f"{debug_prefix}_after_click.png")
#             print(f"页面截图已保存: {debug_prefix}_after_click.png")
#         except Exception as e:
#             print(f"保存调试文件失败: {e}")

        # 加载第一页评论
        first_page_data = self.wait_for_comments(10)
        if not first_page_data:
            print("未捕获到第一页评论")
            return
        self.comments_data.append(first_page_data)

        # 模拟滚动加载更多评论
        # self.driver.execute_script("document.querySelector('.TUXTabBar-content').scrollTo(0, document.querySelector('.TUXTabBar-content').scrollHeight);")
        # 改为调用新方法
        time.sleep(1)
        if self.scroll_comment_section():
            print("已滚动加载更多评论...")
            time.sleep(1)  # 等待新评论加载
        else:
            print("无法滚动评论区,可能结构变化")

        # 加载第二页评论
        second_page_data = self.wait_for_comments(10)
        if second_page_data:
            # 假设每页返回的数据结构相似,合并 comments 字段
            merged_comments = merge_comments(first_page_data, second_page_data)
        else:
            merged_comments = first_page_data
            print("未捕获到第二页评论")


        filename = f"{extract_tiktok_filename(user_input)}.json"
        print(filename)
        with open(filename, "w", encoding="utf-8") as f:
            json.dump(merged_comments, f, ensure_ascii=False, indent=2)
        print(f"评论数据已保存到: {filename}")
        print(f"   共 {len(merged_comments.get('comments', []))} 条评论")

    def wait_for_comments(self, timeout_seconds=10):
        """等待并捕获评论API响应"""
        start_time = time.time()
        while time.time() - start_time < timeout_seconds:
            comment_data = self.extract_comment_response_from_logs()
            if comment_data:
                return comment_data
            time.sleep(0.5)
        return None

    def close(self):
        if hasattr(self, "driver"):
            self.driver.quit()


def main():
    parser = argparse.ArgumentParser(
        description="Scrape TikTok comments via /api/comment/list/ ")
    parser.add_argument(
        "video_input",
        help="TikTok video URL or video_id, e.g., '/@user/video/7318855966163275054' "
    )
    args = parser.parse_args()

    video_input = args.video_input.strip()
    print(video_input)

    if not video_input:
        print("Error: Video input cannot be empty")
        sys.exit(1)


    scraper = TiktokScraper()
    try:
        scraper.auto_play_and_load_more_comments(video_input)
        time.sleep(1)  # 保持窗口打开以便观察
    finally:
        scraper.close()

    sys.exit(0)

if __name__ == "__main__":
    main()

用户页面发布的视频

这里只实现啦只第一页接口的数据, /api/post/item_list/把这个接口的数据放到啦一个json文件中。
这个页面我做了根据cookie的登陆,其实不登陆应该也可以。cookie 文件是通过chrome扩展 Cookies.txt 生成。登陆TikTok后点击这个扩展下载文件下来就行。
image.png

python3.11 post_item_list.py @dlw2026
image.png

post_item_list.py 代码如下:

# scraper.py
import asyncio
import json
import sys
import argparse
from playwright.async_api import async_playwright
from cookies import load_cookies_safely


# 这是用来抓取用户主页的 /api/post/item_list/


async def scrape_tiktok_user(username):
    target_responses = []
    clean_username = username.lstrip("@")
    output_json = clean_username + "_posts.json"
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        context = await browser.new_context(
            viewport={"width": 1920, "height": 1080},
            user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36"
        )

        cookies = load_cookies_safely()
        await context.add_cookies(cookies)
        page = await context.new_page()

         # 隐藏自动化特征
        await page.add_init_script("""
            // 隐藏 webdriver 标志
            delete navigator.__proto__.webdriver;
            window.chrome = { runtime: {} };
            // 伪造 platform 为 Mac
            Object.defineProperty(navigator, 'platform', {
                get: () => 'MacIntel'
            });
            // 伪造 userAgentData(高熵指纹)
            if (!navigator.userAgentData) {
                Object.defineProperty(navigator, 'userAgentData', {
                    value: {
                        brands: [
                            { brand: "Chromium", version: "120" },
                            { brand: "Google Chrome", version: "120" },
                            { brand: "Not:A-Brand", version: "99" }
                        ],
                        mobile: false,
                        platform: "macOS",
                        getHighEntropyValues: async (hints) => ({
                            architecture: "x86_64",
                            model: "",
                            platform: "macOS",
                            platformVersion: "13.5",
                            uaFullVersion: "120.0.6099.0"
                        })
                    },
                    writable: false,
                    configurable: false
                });
            }
        """)

        # ✅ 关键:宽松匹配 API(不再检查 content-type)
        def handle_response(response):
            url = response.url
            if (
                    "/api/post/item_list/" in url
                    and response.status == 200
                    and "tiktok.com" in url
            ):
                if not target_responses:
                    target_responses.append(response)
                    print(f"捕获 API: {url.split('?')[0]}")

        page.on("response", handle_response)

        url = f"https://www.tiktok.com/{username}"
        print(f"打开页面: {url}")
        await page.goto(url, wait_until="domcontentloaded", timeout=50000)

        # 等待用户信息出现
        try:
            await page.wait_for_selector('h1[data-e2e="user-title"]', timeout=15000)
            print("用户主页加载成功")
        except:
            print("用户信息未加载,继续尝试...")

        # 滚动一下,触发懒加载(重要!)
        await page.evaluate("window.scrollTo(0, document.body.scrollHeight / 3)")

        # 等待 API(最多 20 秒)
        for i in range(40):
            if target_responses:
                break
            if i % 10 == 0:
                print(f"⏳ 等待 API 中... ({i * 0.5}s)")
            await asyncio.sleep(0.5)

        api_data = None
        if target_responses:
            try:
                api_data = await target_responses[0].json()
                print("✅ 成功解析 JSON 数据")
            except Exception as e:
                # 如果 .json() 失败,可能是 text/plain,手动解析
                try:
                    text = await target_responses[0].text()
                    api_data = json.loads(text)
                    print("✅ 通过 .text() 成功解析 JSON")
                except:
                    print(f"❌ 完全无法解析响应: {e}")

        if api_data:
            with open(output_json, "w", encoding="utf-8") as f:
                json.dump(api_data, f, ensure_ascii=False, indent=2)
            items = api_data.get("itemList", [])
            print(f"抓取到 {len(items)} 个视频,已保存至 {output_json}")
        else:
            print("未捕获到任何 API 数据")
            # 调试:打印所有请求(可选)
#             await page.route("**/*", lambda route: print("REQ:", route.request.url) or route.continue_())
#         screenshot_path = f"{clean_username}_homepage.png"
#         await page.screenshot(path=screenshot_path, full_page=True)
#         print(f"已保存页面截图: {screenshot_path}")

        await page.wait_for_timeout(5000)
        await browser.close()
        if api_data:
            return True
        else:
            return False


def main():
    parser = argparse.ArgumentParser(description="Scrape TikTok user profile")
    parser.add_argument("username", help="TikTok username (with or without @), e.g., @dishilife or dishilife")
    args = parser.parse_args()

    username = args.username.strip()
    if not username:
        print("Error: Username cannot be empty")
        sys.exit(1)
    if not username.startswith('@'):
        username = '@' + username
    success = asyncio.run(scrape_tiktok_user(username))
    sys.exit(0 if success else 1)


if __name__ == "__main__":
    main()

cookies.py脚本:

import os
from datetime import datetime



COOKIES_FILE = "cookies.txt"

def load_cookies_safely():
    filepath = COOKIES_FILE
    if not os.path.exists(filepath):
        raise FileNotFoundError(f"❌ Cookie 文件不存在: {os.path.abspath(filepath)}")

    cookies = []
    current_ts = int(datetime.now().timestamp())
    tiktok_domains = {".tiktok.com", "www.tiktok.com"}

    with open(filepath, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            parts = line.split("\t")
            if len(parts) < 7:
                continue

            domain = parts[0]
            if domain.startswith("#HttpOnly_"):
                domain = domain[len("#HttpOnly_"):]
            if not domain.startswith("."):
                domain = "." + domain.lstrip(".")

            if not any(t in domain for t in tiktok_domains):
                continue

            cookie = {
                "name": parts[5],
                "value": parts[6],
                "domain": domain,
                "path": parts[2],
                "secure": parts[3].upper() == "TRUE",
            }

            expires_str = parts[4]
            if expires_str.isdigit():
                expires = int(expires_str)
                if expires > current_ts:
                    cookie["expires"] = expires

            cookies.append(cookie)

    if not cookies:
        raise ValueError("❌ 未加载有效 Cookie!请确认包含 sessionid。")
    return cookies

if __name__ == "__main__":
    print('不可以直接执行')

标签: tiktok, python, selenium, Web Scraping, Playwright, Automation, Browser Automation, Data Crawling, Comments Scraper, Anti-detection

添加新评论