blog

Personal blog
git clone git://git.laack.co/blog.git
Log | Files | Refs

youtube-scraping-only-lengths-comprehensive.py (5574B)


      1 # https://www.kaggle.com/datasets/muhammedtausif/data-science-trends-on-google
      2 import re
      3 import json
      4 import csv
      5 import requests
      6 from concurrent.futures import ThreadPoolExecutor, as_completed
      7 import pandas as pd
      8 
      # Load the search terms; trends.csv must sit in the working directory
      # and contain a `query` column.
      df = pd.read_csv('trends.csv')

      # Missing queries are read back as NaN floats, which would crash the
      # later `term.replace(...)` / URL-building step, so drop them and make
      # sure every remaining term is a str.
      search_terms = df['query'].dropna().astype(str).to_list()
      print(len(search_terms))

      # Single HTTP session shared by every request the script makes.
      session = requests.Session()
     15 
     # Desktop-Chrome User-Agent header sent with both the HTML search request
     # and the InnerTube continuation requests.
     HEADERS = {
         "User-Agent": (
             "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
             "(KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36"
         ),
     }

     # Endpoint that continuation (next-page) search requests are POSTed to.
     API_URL = "https://www.youtube.com/youtubei/v1/search"

     # Captures the value of INNERTUBE_API_KEY embedded in the results HTML.
     API_KEY_RE = re.compile(r'"INNERTUBE_API_KEY":"(.*?)"')

     # Client description merged into the JSON body of every continuation POST.
     CLIENT_CONTEXT = {
         "context": {
             "client": dict(clientName="WEB", clientVersion="2.20240722.00.00"),
         },
     }
     31 
     def parse_videos(obj):
         """Collect every video entry found anywhere inside a decoded JSON tree.

         Walks nested dicts and lists depth-first, in document order. Each dict
         that carries a ``videoRenderer`` or ``shortsRenderer`` dict contributes
         one tuple; the renderer itself is then searched like any other child.

         Args:
             obj: Any decoded JSON value (dict, list, or scalar).

         Returns:
             A list of ``(title, watch_url, duration_seconds, video_id)`` tuples.
             ``title`` is None when no title text is present and
             ``duration_seconds`` is 0 when the length is missing or not a
             numeric ``[[H:]M:]S`` string.
         """
         results = []
         _collect_videos(obj, results)
         return results


     def _collect_videos(node, out):
         """Append the video tuples found under *node* to *out*, in order."""
         if isinstance(node, dict):
             renderer = node.get('videoRenderer') or node.get('shortsRenderer')
             # The renderer key can be present but hold None or a non-dict;
             # such entries carry no usable data and are skipped, not crashed on.
             if isinstance(renderer, dict):
                 video_id = renderer.get('videoId')
                 length = renderer.get('lengthText')
                 length_text = length.get('simpleText') if isinstance(length, dict) else None
                 out.append((
                     _renderer_title(renderer),
                     f"https://www.youtube.com/watch?v={video_id}",
                     _duration_to_seconds(length_text),
                     video_id,
                 ))
             for child in node.values():
                 _collect_videos(child, out)
         elif isinstance(node, list):
             for child in node:
                 _collect_videos(child, out)


     def _renderer_title(renderer):
         """Return the title text of a renderer dict, or None if it has none."""
         title = renderer.get('title')
         if not isinstance(title, dict):
             return None
         runs = title.get('runs')
         if isinstance(runs, list) and runs and isinstance(runs[0], dict):
             return runs[0].get('text')
         # Fall back to the flat form when there is no usable `runs` list.
         return title.get('simpleText')


     def _duration_to_seconds(text):
         """Convert a clock string such as '1:02:03' or '4:05' to seconds.

         Returns 0 for empty, non-string, or non-numeric input.
         """
         if not isinstance(text, str) or not text:
             return 0
         total = 0
         for part in text.split(':'):
             try:
                 total = total * 60 + int(part)
             except ValueError:
                 return 0
         return total
     52 
     def fetch_highest_resolution(video_id):
         """Placeholder for the per-video resolution lookup.

         This run only collects durations, so no request is made: *video_id*
         is ignored and a fixed marker string is returned instead.
         """
         placeholder = "Not Used In This Run"
         return placeholder
     55 
     def fetch_more_results(continuation, api_key):
         """Fetch one continuation (next) page of search results.

         Args:
             continuation: Continuation token taken from the previous page.
             api_key: API key value scraped from the results HTML.

         Returns:
             ``(next_token, videos)``. ``next_token`` is the token for the
             following page, or None when the response does not contain one.
             ``videos`` is the list produced by ``parse_videos``. If the
             request, the JSON decoding, or the parsing fails, the failure is
             printed and ``(None, [])`` is returned so the caller simply stops
             paginating.
         """
         try:
             resp = session.post(
                 API_URL,
                 params={"key": api_key},
                 headers=HEADERS,
                 json={**CLIENT_CONTEXT, "continuation": continuation},
                 timeout=10,
             )
             resp.raise_for_status()
             data = resp.json()
             videos = parse_videos(data)
         except Exception as exc:
             # Deliberately best-effort: one bad page must not abort the whole
             # term, but the failure is reported instead of being hidden.
             print(f"  Continuation request failed: {exc}")
             return None, []

         try:
             cont = data['onResponseReceivedCommands'][0]['appendContinuationItemsAction'] \
                 ['continuationItems'][-1]['continuationItemRenderer'] \
                 ['continuationEndpoint']['continuationCommand']['token']
         except (KeyError, IndexError, TypeError):
             # No further-page token in this response: this was the last page.
             cont = None
         return cont, videos
     77 
     def get_recent_videos_for_term(term, max_videos=1000):
         """Scrape up to *max_videos* search results for one search term.

         Loads the HTML results page, extracts the API key and the embedded
         ``ytInitialData`` JSON, then follows continuation tokens until enough
         videos are collected or no further page is offered.

         Args:
             term: Search term to query.
             max_videos: Upper bound on the number of videos returned.

         Returns:
             A list of ``(title, url, duration_seconds, highest_resolution)``
             tuples in the order the search returned them. Returns an empty
             list when the API key or the initial data cannot be found.

         Raises:
             Request errors (including the 10-second timeout) and JSON
             decoding errors from the initial page load propagate to the
             caller.
         """
         # Local import so this function does not depend on the file's import block.
         from urllib.parse import quote_plus

         # quote_plus turns spaces into '+' (as before) and also escapes
         # characters such as '&', '#' and '+' that would otherwise corrupt the
         # query string. NOTE(review): `sp=CAI%253D` is presumably the
         # "sort by upload date" filter -- confirm.
         url = f"https://www.youtube.com/results?search_query={quote_plus(str(term))}&sp=CAI%253D"
         print(f"\nFetching for term '{term}': {url}")
         resp = session.get(url, headers=HEADERS, timeout=10)
         html = resp.text

         api_match = API_KEY_RE.search(html)
         if not api_match:
             print("Could not find API key for term:", term)
             return []
         api_key = api_match.group(1)

         data_match = re.search(r'var ytInitialData = ({.*?});</script>', html, re.DOTALL)
         if not data_match:
             print("No ytInitialData found for term:", term)
             return []
         data = json.loads(data_match.group(1))

         videos = parse_videos(data)[:max_videos]

         try:
             cont = data['contents']['twoColumnSearchResultsRenderer']['primaryContents'] \
                 ['sectionListRenderer']['contents'][-1]['continuationItemRenderer'] \
                 ['continuationEndpoint']['continuationCommand']['token']
         except (KeyError, IndexError, TypeError):
             cont = None

         # Remember the tokens already requested so that a response which
         # hands back an old token cannot make this loop spin forever.
         seen_tokens = set()
         while cont and len(videos) < max_videos and cont not in seen_tokens:
             seen_tokens.add(cont)
             cont, more = fetch_more_results(cont, api_key)
             videos.extend(more[:max_videos - len(videos)])
             print(f"  Got {len(videos)} so far for '{term}'...")

         if not videos:
             return []

         # Executor.map yields results in submission order, so the output keeps
         # the order in which the search returned the videos.
         with ThreadPoolExecutor(max_workers=min(200, len(videos))) as executor:
             resolutions = list(
                 executor.map(fetch_highest_resolution, (vid_id for *_, vid_id in videos))
             )

         return [
             (title, link, secs, res)
             for (title, link, secs, _vid_id), res in zip(videos, resolutions)
         ]
    122 
    all_rows = []
    MAX_CONCURRENT_TERMS = 200
    OUTPUT_CSV = "results-only-lengths-comp.csv"

    # Scrape all terms concurrently. Rows are gathered as each term finishes,
    # so every future is consumed (and every error reported) exactly once.
    with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_TERMS) as executor:
        future_to_term = {
            executor.submit(get_recent_videos_for_term, term, 1000): term
            for term in search_terms
        }
        for future in as_completed(future_to_term):
            term = future_to_term[future]
            try:
                vids = future.result()
            except Exception as e:
                # One failing term must not abort the whole run.
                print(f"Error fetching term '{term}': {e}")
                continue
            if not vids:
                print(f"No videos found for '{term}'.")
                continue
            # Each entry is (title, url, duration_seconds, highest_resolution).
            avg = sum(v[2] for v in vids) / len(vids)
            print(f"Average for '{term}': {avg/60:.2f} minutes over {len(vids)} videos")
            all_rows.extend((term, *vid) for vid in vids)

    # Write the rows collected above instead of re-reading every future.
    with open(OUTPUT_CSV, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(["term", "title", "url", "duration_seconds", "highest_resolution"])
        writer.writerows(all_rows)

    # Report the file that was actually written.
    print(f"\nWrote results to {OUTPUT_CSV}")