yt-lots.py (6190B)
"""Collect recent YouTube search results for a list of search terms.

Search terms are read from the ``query`` column of ``trends.csv``. For every
term the script scrapes the YouTube results page, follows continuation
tokens through the ``youtubei/v1/search`` endpoint, looks up the highest
resolution/bitrate advertised on each video's watch page, and appends one
row per video to ``lots-without-res.csv``.
"""
# Search terms come from:
# https://www.kaggle.com/datasets/muhammedtausif/data-science-trends-on-google
import csv
import json
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import quote_plus

import pandas as pd
import requests

# One session shared by every worker thread.
session = requests.Session()

HEADERS = {
    "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
                  "(KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36"
}

API_URL = "https://www.youtube.com/youtubei/v1/search"
API_KEY_RE = re.compile(r'"INNERTUBE_API_KEY":"(.*?)"')
CLIENT_CONTEXT = {
    "context": {
        "client": {
            "clientName": "WEB",
            "clientVersion": "2.20240722.00.00",
        }
    }
}

# Seconds to wait on any single HTTP request before giving up.
REQUEST_TIMEOUT = 10

# Each marker stops right before the opening brace of an embedded JSON object;
# the object itself is decoded by a real JSON parser (see _extract_json_after).
PLAYER_RESPONSE_RE = re.compile(r'ytInitialPlayerResponse\s*=\s*(?=\{)')
INITIAL_DATA_RE = re.compile(r'var ytInitialData = (?=\{)')

# Where the "next page" token sits in the first results page and in every
# continuation response, respectively.
_INITIAL_TOKEN_PATH = (
    'contents', 'twoColumnSearchResultsRenderer', 'primaryContents',
    'sectionListRenderer', 'contents', -1, 'continuationItemRenderer',
    'continuationEndpoint', 'continuationCommand', 'token',
)
_NEXT_TOKEN_PATH = (
    'onResponseReceivedCommands', 0, 'appendContinuationItemsAction',
    'continuationItems', -1, 'continuationItemRenderer',
    'continuationEndpoint', 'continuationCommand', 'token',
)

MAX_CONCURRENT_TERMS = 200
MAX_DETAIL_WORKERS = 200
MAX_VIDEOS_PER_TERM = 10000

_JSON_DECODER = json.JSONDecoder()


def _extract_json_after(marker_re, text):
    """Decode the JSON value that starts where ``marker_re`` stops matching.

    Returns the decoded value, or ``None`` when the marker is absent or the
    text following it is not valid JSON. A JSON parser is used instead of a
    non-greedy ``{.*?};`` regex because the latter cuts the object short at
    the first ``};`` that happens to occur inside a string value.
    """
    match = marker_re.search(text)
    if match is None:
        return None
    try:
        value, _ = _JSON_DECODER.raw_decode(text, match.end())
    except ValueError:
        return None
    return value


def _dig(obj, *path):
    """Follow ``path`` (dict keys / list indexes) into ``obj``; ``None`` if absent."""
    for key in path:
        try:
            obj = obj[key]
        except (KeyError, IndexError, TypeError):
            return None
    return obj


def _continuation_token(data, path):
    """Return the non-empty string token found at ``path`` in ``data``, else ``None``."""
    token = _dig(data, *path)
    return token if isinstance(token, str) and token else None


def _duration_to_seconds(length):
    """Convert a ``H:MM:SS`` / ``M:SS`` / ``SS`` label to seconds.

    Returns 0 for a missing label or one that is not purely numeric, so that
    a single odd entry cannot abort the parsing of a whole result page.
    """
    if not isinstance(length, str) or not length:
        return 0
    secs = 0
    for part in length.split(':'):
        try:
            secs = secs * 60 + int(part)
        except ValueError:
            return 0
    return secs


def _title_text(renderer):
    """Return the title of a renderer dict, or ``None`` when it has none."""
    title = renderer.get('title')
    if not isinstance(title, dict):
        return None
    runs = title.get('runs')
    if isinstance(runs, list) and runs and isinstance(runs[0], dict):
        text = runs[0].get('text')
        if text is not None:
            return text
    # Fall back to the flat form when the title is not split into runs.
    return title.get('simpleText')


def parse_videos(obj):
    """Recursively collect videos from a decoded YouTube JSON structure.

    Every dict that has a ``videoRenderer`` or ``shortsRenderer`` entry with a
    ``videoId`` contributes one tuple; nested dicts and lists are searched too.

    Args:
        obj: Any decoded JSON value (dict, list or scalar).

    Returns:
        A list of ``(title, watch_url, duration_seconds, video_id)`` tuples in
        document order. ``duration_seconds`` is 0 when no parseable length is
        present; ``title`` may be ``None``.
    """
    results = []
    if isinstance(obj, dict):
        renderer = obj.get('videoRenderer') or obj.get('shortsRenderer')
        if isinstance(renderer, dict):
            video_id = renderer.get('videoId')
            # Without an id there is no watch URL to report or to inspect.
            if video_id:
                length_text = renderer.get('lengthText')
                length = (length_text.get('simpleText')
                          if isinstance(length_text, dict) else None)
                results.append((
                    _title_text(renderer),
                    f"https://www.youtube.com/watch?v={video_id}",
                    _duration_to_seconds(length),
                    video_id,
                ))
        for value in obj.values():
            results.extend(parse_videos(value))
    elif isinstance(obj, list):
        for item in obj:
            results.extend(parse_videos(item))
    return results


def _summarize_formats(player_response):
    """Return ``(max_height, max_bitrate)`` over ``streamingData.formats``.

    Both values are 0 when the list is missing or has no numeric entries.
    """
    formats = _dig(player_response, "streamingData", "formats")
    max_res = 0
    max_bitrate = 0
    if isinstance(formats, list):
        for fmt in formats:
            if not isinstance(fmt, dict):
                continue
            height = fmt.get("height")
            bitrate = fmt.get("bitrate")
            if isinstance(height, (int, float)) and height > max_res:
                max_res = height
            if isinstance(bitrate, (int, float)) and bitrate > max_bitrate:
                max_bitrate = bitrate
    return max_res, max_bitrate


def fetch_resolution_and_bitrate(video_id):
    """Look up the highest resolution and bitrate listed for a video.

    Downloads the watch page, decodes the embedded ``ytInitialPlayerResponse``
    object and scans ``streamingData.formats``.

    NOTE(review): only the ``formats`` list is inspected; if the player
    response also carries higher-quality streams under another key they are
    not considered -- confirm this is intended.

    Args:
        video_id: The YouTube video id.

    Returns:
        A ``(resolution, bitrate)`` pair of strings such as
        ``("720p", "1500 kbps")``. Either element is ``"N/A"`` when it cannot
        be determined. This function is best-effort and does not raise on
        network or parsing failures.
    """
    url = f"https://www.youtube.com/watch?v={video_id}"
    try:
        resp = session.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT)
        resp.raise_for_status()
        page = resp.text
    except requests.RequestException:
        return "N/A", "N/A"

    data = _extract_json_after(PLAYER_RESPONSE_RE, page)
    if data is None:
        return "N/A", "N/A"

    max_res, max_bitrate = _summarize_formats(data)
    return (f"{max_res}p" if max_res else "N/A",
            f"{round(max_bitrate/1000)} kbps" if max_bitrate else "N/A")


def fetch_more_results(continuation, api_key):
    """Fetch one continuation page of search results.

    Args:
        continuation: Continuation token taken from the previous page.
        api_key: The ``INNERTUBE_API_KEY`` scraped from the results page.

    Returns:
        ``(next_token, videos)`` where ``next_token`` is ``None`` when there
        is no further page and ``videos`` is a list in the format produced by
        ``parse_videos``. On any request or decoding failure the result is
        ``(None, [])``.
    """
    try:
        resp = session.post(
            f"{API_URL}?key={api_key}",
            headers=HEADERS,
            json={**CLIENT_CONTEXT, "continuation": continuation},
            timeout=REQUEST_TIMEOUT,
        )
        resp.raise_for_status()
        data = resp.json()
    except (requests.RequestException, ValueError):
        return None, []
    return _continuation_token(data, _NEXT_TOKEN_PATH), parse_videos(data)


def _collect_stream_details(executor, videos):
    """Attach resolution/bitrate to each video using ``executor``.

    Returns ``(title, url, secs, resolution, bitrate)`` tuples in completion
    order, which is not necessarily the order of ``videos``.
    """
    future_to_vid = {
        executor.submit(fetch_resolution_and_bitrate, vid_id): (title, url, secs)
        for title, url, secs, vid_id in videos
    }
    detailed = []
    for future in as_completed(future_to_vid):
        title, url, secs = future_to_vid[future]
        res, bitrate = future.result()
        detailed.append((title, url, secs, res, bitrate))
    return detailed


def get_recent_videos_for_term(term, max_videos=10000, detail_executor=None):
    """Return up to ``max_videos`` recent videos for a search term.

    Args:
        term: The search query text.
        max_videos: Upper bound on the number of videos returned.
        detail_executor: Optional executor used for the per-video watch-page
            lookups. When omitted, a private pool is created for this call.
            Passing one shared pool keeps the total thread count bounded when
            many terms are processed concurrently.

    Returns:
        A list of ``(title, url, duration_seconds, resolution, bitrate)``
        tuples, or an empty list when the results page cannot be parsed.

    Raises:
        requests.RequestException: If the initial results page cannot be
            downloaded.
    """
    # quote_plus turns spaces into '+' and also escapes characters such as
    # '&', '#' and '+' that would otherwise corrupt the query string.
    url = f"https://www.youtube.com/results?search_query={quote_plus(term)}&sp=CAI%253D"
    print(f"\nFetching for term '{term}': {url}")
    resp = session.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT)
    resp.raise_for_status()
    html = resp.text

    api_match = API_KEY_RE.search(html)
    if not api_match:
        print("Could not find API key for term:", term)
        return []
    api_key = api_match.group(1)

    data = _extract_json_after(INITIAL_DATA_RE, html)
    if data is None:
        print("No ytInitialData found for term:", term)
        return []

    videos = parse_videos(data)
    cont = _continuation_token(data, _INITIAL_TOKEN_PATH)

    # Remember tokens already requested: a server that hands the same token
    # back would otherwise keep this loop running forever.
    seen_tokens = set()
    while cont and cont not in seen_tokens and len(videos) < max_videos:
        seen_tokens.add(cont)
        cont, more = fetch_more_results(cont, api_key)
        room = max_videos - len(videos)
        videos.extend(more[:room])
        print(f" Got {len(videos)} so far for '{term}'...")

    selected = videos[:max_videos]
    if detail_executor is not None:
        return _collect_stream_details(detail_executor, selected)
    with ThreadPoolExecutor(max_workers=MAX_DETAIL_WORKERS) as own_executor:
        return _collect_stream_details(own_executor, selected)


def main():
    """Process every query in ``trends.csv`` and write ``lots-without-res.csv``."""
    df = pd.read_csv('trends.csv')
    # Empty cells load as NaN, which is not a usable search term.
    search_terms = df['query'].dropna().astype(str).to_list()
    print(len(search_terms))

    with open("lots-without-res.csv", "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(["term", "title", "url", "duration_seconds",
                         "highest_resolution", "max_bitrate"])

        # The detail pool is entered first so that it is shut down last:
        # term workers may still be submitting to it while the term pool
        # drains.
        with ThreadPoolExecutor(max_workers=MAX_DETAIL_WORKERS) as detail_pool, \
                ThreadPoolExecutor(max_workers=MAX_CONCURRENT_TERMS) as term_pool:
            future_to_term = {
                term_pool.submit(get_recent_videos_for_term, term,
                                 MAX_VIDEOS_PER_TERM, detail_pool): term
                for term in search_terms
            }

            # Rows are written from this thread only, so the csv writer is
            # never used concurrently.
            for future in as_completed(future_to_term):
                term = future_to_term[future]
                try:
                    vids = future.result()
                    if vids:
                        avg = sum(v[2] for v in vids) / len(vids)
                        print(f"Average for '{term}': {avg/60:.2f} minutes over {len(vids)} videos")

                        rows = [(term, title, url, secs, res, bitrate)
                                for title, url, secs, res, bitrate in vids]
                        writer.writerows(rows)
                        f.flush()
                    else:
                        print(f"No videos found for '{term}'.")
                except Exception as e:
                    # Top-level boundary: report the failure and keep going
                    # with the remaining terms.
                    print(f"Error fetching term '{term}': {e}")


if __name__ == "__main__":
    main()