youtube-scraping-only-lengths-comprehensive.py (5574B)
# Scrape YouTube search results for a list of search terms and record the
# duration (in seconds) of the videos found for each term.
#
# The search terms are read from the "query" column of trends.csv, taken from:
# https://www.kaggle.com/datasets/muhammedtausif/data-science-trends-on-google
import re
import json
import csv
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import quote_plus
import pandas as pd

df = pd.read_csv('trends.csv')

# Missing queries load as NaN (a float); drop them so every term is a string.
search_terms = df['query'].dropna().astype(str).to_list()
print(len(search_terms))

# One shared session so connections to youtube.com are reused across requests.
session = requests.Session()

HEADERS = {
    "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
                  "(KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36"
}

API_URL = "https://www.youtube.com/youtubei/v1/search"
API_KEY_RE = re.compile(r'"INNERTUBE_API_KEY":"(.*?)"')
INITIAL_DATA_RE = re.compile(r'var ytInitialData = ({.*?});</script>', re.DOTALL)
CLIENT_CONTEXT = {
    "context": {
        "client": {
            "clientName": "WEB",
            "clientVersion": "2.20240722.00.00"
        }
    }
}

# Seconds to wait for any single HTTP request before giving up.
REQUEST_TIMEOUT = 10
# Upper bound on the per-term thread pool used for resolution lookups.
MAX_RESOLUTION_WORKERS = 16
OUTPUT_CSV = "results-only-lengths-comp.csv"


def _parse_duration(text):
    """Convert a "H:MM:SS" / "M:SS" / "SS" string to a number of seconds.

    Returns 0 when *text* is empty/None or contains a non-numeric component,
    so an unexpected length label never aborts parsing of a whole page.
    """
    if not text:
        return 0
    secs = 0
    try:
        for part in str(text).split(':'):
            secs = secs * 60 + int(part)
    except ValueError:
        return 0
    return secs


def _video_from_renderer(renderer):
    """Build a (title, url, seconds, video_id) tuple from one renderer dict.

    Returns None when *renderer* is not a dict or carries no ``videoId``;
    such an entry cannot be turned into a usable watch URL.
    """
    if not isinstance(renderer, dict):
        return None
    video_id = renderer.get('videoId')
    if not video_id:
        return None

    title = None
    title_obj = renderer.get('title')
    if isinstance(title_obj, dict):
        runs = title_obj.get('runs')
        if isinstance(runs, list) and runs and isinstance(runs[0], dict):
            title = runs[0].get('text')
        if title is None:
            # Fall back to the plain-text form when no "runs" are present.
            title = title_obj.get('simpleText')

    length_obj = renderer.get('lengthText')
    length = length_obj.get('simpleText') if isinstance(length_obj, dict) else None

    url = f"https://www.youtube.com/watch?v={video_id}"
    return (title, url, _parse_duration(length), video_id)


def parse_videos(obj):
    """Collect every video found anywhere inside a decoded JSON structure.

    Walks *obj* (nested dicts/lists) depth-first in document order and, for
    each dict holding a ``videoRenderer`` or ``shortsRenderer`` key, extracts
    one entry.

    Returns:
        A list of ``(title, url, duration_seconds, video_id)`` tuples.
        ``duration_seconds`` is 0 when no parsable length is present.
        Entries without a video id are skipped.
    """
    results = []
    # Explicit stack instead of recursion: same pre-order traversal, but no
    # dependence on the interpreter's recursion limit.
    stack = [obj]
    while stack:
        node = stack.pop()
        if isinstance(node, dict):
            if 'videoRenderer' in node or 'shortsRenderer' in node:
                renderer = node.get('videoRenderer') or node.get('shortsRenderer')
                video = _video_from_renderer(renderer)
                if video is not None:
                    results.append(video)
            # Reversed so children are popped in their original order.
            stack.extend(reversed(list(node.values())))
        elif isinstance(node, list):
            stack.extend(reversed(node))
    return results


def fetch_highest_resolution(video_id):
    """Placeholder for a per-video resolution lookup; always returns a fixed marker."""
    return "Not Used In This Run"


def fetch_more_results(continuation, api_key):
    """Fetch one continuation page of search results.

    Args:
        continuation: Continuation token taken from the previous page.
        api_key: API key scraped from the search results HTML.

    Returns:
        ``(next_token, videos)`` where ``next_token`` is None when no further
        token could be found and ``videos`` is the ``parse_videos`` output for
        this page. On a request or decoding failure the error is printed and
        ``(None, [])`` is returned, which ends pagination for the caller.
    """
    try:
        resp = session.post(
            API_URL,
            params={"key": api_key},
            headers=HEADERS,
            json={**CLIENT_CONTEXT, "continuation": continuation},
            timeout=REQUEST_TIMEOUT,
        )
        resp.raise_for_status()
        data = resp.json()
    except (requests.RequestException, ValueError) as exc:
        # Best-effort pagination: report the failure and stop this term's paging.
        print(f"Continuation request failed: {exc}")
        return None, []

    videos = parse_videos(data)
    try:
        cont = data['onResponseReceivedCommands'][0]['appendContinuationItemsAction'] \
            ['continuationItems'][-1]['continuationItemRenderer'] \
            ['continuationEndpoint']['continuationCommand']['token']
    except (KeyError, IndexError, TypeError):
        # Response does not have the expected shape: treat as "no more pages".
        cont = None
    return cont, videos


def get_recent_videos_for_term(term, max_videos=1000):
    """Collect up to *max_videos* search results for *term*.

    Loads the search results page, extracts the API key and the embedded
    ``ytInitialData`` JSON, then follows continuation tokens until enough
    videos are gathered or no further token is available.

    Args:
        term: Search phrase.
        max_videos: Maximum number of videos to return.

    Returns:
        A list of ``(title, url, duration_seconds, resolution)`` tuples in the
        order the videos were found; empty when the page could not be parsed.

    Raises:
        requests.RequestException: If the initial search page request fails
            or returns an HTTP error status.
    """
    # quote_plus turns spaces into '+' and also escapes characters such as
    # '&' or '#' that would otherwise corrupt the query string.
    # NOTE(review): "sp=CAI%253D" is presumably the sort-by-upload-date
    # filter -- confirm.
    url = f"https://www.youtube.com/results?search_query={quote_plus(term)}&sp=CAI%253D"
    print(f"\nFetching for term '{term}': {url}")
    resp = session.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT)
    resp.raise_for_status()
    html = resp.text

    api_match = API_KEY_RE.search(html)
    if not api_match:
        print("Could not find API key for term:", term)
        return []
    api_key = api_match.group(1)

    data_match = INITIAL_DATA_RE.search(html)
    if not data_match:
        print("No ytInitialData found for term:", term)
        return []
    try:
        data = json.loads(data_match.group(1))
    except json.JSONDecodeError:
        print("Could not parse ytInitialData for term:", term)
        return []

    videos = parse_videos(data)

    try:
        cont = data['contents']['twoColumnSearchResultsRenderer']['primaryContents'] \
            ['sectionListRenderer']['contents'][-1]['continuationItemRenderer'] \
            ['continuationEndpoint']['continuationCommand']['token']
    except (KeyError, IndexError, TypeError):
        cont = None

    # Remember used tokens so a server that keeps returning the same token
    # cannot trap this loop forever.
    seen_tokens = set()
    while cont and cont not in seen_tokens and len(videos) < max_videos:
        seen_tokens.add(cont)
        cont, more = fetch_more_results(cont, api_key)
        videos.extend(more[:max_videos - len(videos)])
        print(f" Got {len(videos)} so far for '{term}'...")

    videos = videos[:max_videos]
    if not videos:
        return []

    # A bounded pool: many terms are processed concurrently, so a large
    # per-term pool would multiply into thousands of threads. map() keeps the
    # results aligned with the input order.
    workers = min(MAX_RESOLUTION_WORKERS, len(videos))
    with ThreadPoolExecutor(max_workers=workers) as executor:
        resolutions = list(executor.map(fetch_highest_resolution,
                                        (video[3] for video in videos)))

    return [
        (title, video_url, secs, res)
        for (title, video_url, secs, _vid_id), res in zip(videos, resolutions)
    ]


all_rows = []
MAX_CONCURRENT_TERMS = 200

with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_TERMS) as executor:
    future_to_term = {
        executor.submit(get_recent_videos_for_term, term, 1000): term
        for term in search_terms
    }
    for future in as_completed(future_to_term):
        term = future_to_term[future]
        try:
            vids = future.result()
        except Exception as e:
            # Top-level boundary: one failing term must not abort the run.
            print(f"Error fetching term '{term}': {e}")
            continue
        if not vids:
            print(f"No videos found for '{term}'.")
            continue
        avg = sum(v[2] for v in vids) / len(vids)
        print(f"Average for '{term}': {avg/60:.2f} minutes over {len(vids)} videos")
        all_rows.extend((term, *row) for row in vids)

# Write the rows gathered above; each future is consumed exactly once, so
# errors are reported a single time.
with open(OUTPUT_CSV, "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f)
    writer.writerow(["term", "title", "url", "duration_seconds", "highest_resolution"])
    writer.writerows(all_rows)

print(f"\nWrote results to {OUTPUT_CSV}")