blog

Personal blog
git clone git://git.laack.co/blog.git
Log | Files | Refs

yt-lots.py (6190B)


      1 # https://www.kaggle.com/datasets/muhammedtausif/data-science-trends-on-google
import csv
import json
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import quote_plus

import pandas as pd
import requests
      8 
      9 df = pd.read_csv('trends.csv')
     10 search_terms = df['query'].to_list()
     11 print(len(search_terms))
     12 
     13 session = requests.Session()
     14 
     15 HEADERS = {
     16     "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
     17                   "(KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36"
     18 }
     19 
     20 API_URL = "https://www.youtube.com/youtubei/v1/search"
     21 API_KEY_RE = re.compile(r'"INNERTUBE_API_KEY":"(.*?)"')
     22 CLIENT_CONTEXT = {
     23     "context": {
     24         "client": {
     25             "clientName": "WEB",
     26             "clientVersion": "2.20240722.00.00"
     27         }
     28     }
     29 }
     30 
     31 def parse_videos(obj):
     32     results = []
     33     if isinstance(obj, dict):
     34         if 'videoRenderer' in obj or 'shortsRenderer' in obj:
     35             v = obj.get('videoRenderer') or obj.get('shortsRenderer')
     36             title = v.get('title', {}).get('runs', [{}])[0].get('text')
     37             video_id = v.get('videoId')
     38             length = v.get('lengthText', {}).get('simpleText') if 'lengthText' in v else None
     39             secs = 0
     40             if length:
     41                 parts = length.split(':')
     42                 for p in parts:
     43                     secs = secs * 60 + int(p)
     44             results.append((title, f"https://www.youtube.com/watch?v={video_id}", secs, video_id))
     45         for v in obj.values():
     46             results.extend(parse_videos(v))
     47     elif isinstance(obj, list):
     48         for v in obj:
     49             results.extend(parse_videos(v))
     50     return results
     51 
     52 def fetch_resolution_and_bitrate(video_id):
     53 #    return "Not including"
     54     url = f"https://www.youtube.com/watch?v={video_id}"
     55     try:
     56         resp = session.get(url, headers=HEADERS, timeout=10)
     57         resp.raise_for_status()
     58         match = re.search(r'ytInitialPlayerResponse\s*=\s*({.*?});', resp.text)
     59         if not match:
     60             return "N/A", "N/A"
     61         data = json.loads(match.group(1))
     62         formats = data.get("streamingData", {}).get("formats", [])
     63         max_res = 0
     64         max_bitrate = 0
     65         for f in formats:
     66             if "height" in f and f["height"] > max_res:
     67                 max_res = f["height"]
     68             if "bitrate" in f and f["bitrate"] > max_bitrate:
     69                 max_bitrate = f["bitrate"]
     70         return (f"{max_res}p" if max_res else "N/A",
     71                 f"{round(max_bitrate/1000)} kbps" if max_bitrate else "N/A")
     72     except Exception:
     73         return "N/A", "N/A"
     74 
     75 def fetch_more_results(continuation, api_key):
     76     try:
     77         resp = session.post(
     78             f"{API_URL}?key={api_key}",
     79             headers=HEADERS,
     80             json={**CLIENT_CONTEXT, "continuation": continuation},
     81             timeout=10
     82         )
     83         resp.raise_for_status()
     84         data = resp.json()
     85         videos = parse_videos(data)
     86         cont = None
     87         try:
     88             cont = data['onResponseReceivedCommands'][0]['appendContinuationItemsAction'] \
     89                 ['continuationItems'][-1]['continuationItemRenderer'] \
     90                 ['continuationEndpoint']['continuationCommand']['token']
     91         except Exception:
     92             pass
     93         return cont, videos
     94     except Exception:
     95         return None, []
     96 
     97 def get_recent_videos_for_term(term, max_videos=10000):
     98     url = f"https://www.youtube.com/results?search_query={term.replace(' ', '+')}&sp=CAI%253D"
     99     print(f"\nFetching for term '{term}': {url}")
    100     resp = session.get(url, headers=HEADERS)
    101     html = resp.text
    102 
    103     api_match = API_KEY_RE.search(html)
    104     if not api_match:
    105         print("Could not find API key for term:", term)
    106         return []
    107     api_key = api_match.group(1)
    108 
    109     data_match = re.search(r'var ytInitialData = ({.*?});</script>', html, re.DOTALL)
    110     if not data_match:
    111         print("No ytInitialData found for term:", term)
    112         return []
    113     data = json.loads(data_match.group(1))
    114 
    115     videos = parse_videos(data)
    116 
    117     try:
    118         cont = data['contents']['twoColumnSearchResultsRenderer']['primaryContents'] \
    119             ['sectionListRenderer']['contents'][-1]['continuationItemRenderer'] \
    120             ['continuationEndpoint']['continuationCommand']['token']
    121     except Exception:
    122         cont = None
    123 
    124     while cont and len(videos) < max_videos:
    125         cont, more = fetch_more_results(cont, api_key)
    126         for vid in more:
    127             if len(videos) < max_videos:
    128                 videos.append(vid)
    129         print(f"  Got {len(videos)} so far for '{term}'...")
    130 
    131     final_videos = []
    132     with ThreadPoolExecutor(max_workers=200) as executor:  
    133         future_to_vid = {executor.submit(fetch_resolution_and_bitrate, vid_id): (title, url, secs)
    134                          for title, url, secs, vid_id in videos[:max_videos]}
    135         for future in as_completed(future_to_vid):
    136             title, url, secs = future_to_vid[future]
    137             res, bitrate = future.result()
    138             final_videos.append((title, url, secs, res, bitrate))
    139 
    140     return final_videos
    141 
    142 MAX_CONCURRENT_TERMS = 200
    143 
    144 with open("lots-without-res.csv", "w", newline="", encoding="utf-8") as f:
    145     writer = csv.writer(f)
    146     writer.writerow(["term", "title", "url", "duration_seconds", "highest_resolution", "max_bitrate"])
    147 
    148     with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_TERMS) as executor:
    149         future_to_term = {
    150             executor.submit(get_recent_videos_for_term, term, 10000): term
    151             for term in search_terms
    152         }
    153 
    154         for future in as_completed(future_to_term):
    155             term = future_to_term[future]
    156             try:
    157                 vids = future.result()
    158                 if vids:
    159                     avg = sum(v[2] for v in vids) / len(vids)
    160                     print(f"Average for '{term}': {avg/60:.2f} minutes over {len(vids)} videos")
    161 
    162                     rows = [(term, title, url, secs, res, bitrate) for title, url, secs, res, bitrate in vids]
    163                     writer.writerows(rows)
    164                     f.flush()
    165                 else:
    166                     print(f"No videos found for '{term}'.")
    167             except Exception as e:
    168                 print(f"Error fetching term '{term}': {e}")