page_parsing.py (11391B)
"""Index queued pages: prune unusable documents and store positional postings."""

import gc
import random
import re
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse

import lxml.html
import tldextract
import tqdm
from psycopg2.extras import execute_values

from crawling.spider import get_indexing_db_connection
from indexing.utils import (
    detect_language,
    get_plaintext,
    get_plaintext_words,
    get_words,
)

MAX_WORKERS = 5
BATCH_SIZE = 10

# Documents whose plaintext is shorter than this many characters are pruned.
MIN_DOCUMENT_LENGTH = 750
# Terms longer than this many characters are dropped from term lists.
MAX_TERM_LENGTH = 50


def get_term_list(filepath):
    """Return the lower-cased alphanumeric terms found in a document.

    Every character that is not an ASCII letter, digit or whitespace is
    replaced by a space before the text is split on whitespace. Terms longer
    than ``MAX_TERM_LENGTH`` characters are discarded.

    Args:
        filepath: Path of the stored page, passed to ``get_plaintext``.

    Returns:
        The terms in document order, duplicates included.
    """
    plaintext = get_plaintext(filepath)
    cleaned = re.sub(r'[^a-zA-Z0-9\s]', ' ', plaintext).lower()
    # str.split() without arguments never yields empty or padded tokens, so
    # only the upper length bound needs checking.
    return [term for term in cleaned.split() if len(term) <= MAX_TERM_LENGTH]


def remove_from_queue(conn, filepath):
    """Delete every ``indexing_queue`` row for ``filepath`` and commit.

    Rows are deleted by filepath because the same url can be queued multiple
    times.

    Args:
        conn: Open database connection (psycopg2-style).
        filepath: Filepath whose queue entries are removed.
    """
    query = """
        DELETE FROM indexing_queue
        WHERE filepath = %s
    """
    with conn.cursor() as cursor:
        cursor.execute(query, (filepath,))
    conn.commit()


def add_terms(conn, terms):
    """Insert the given term names into ``term`` and commit.

    Existing names do not need to be filtered out beforehand: the insert uses
    ``ON CONFLICT (name) DO NOTHING``.

    Args:
        conn: Open database connection (psycopg2-style).
        terms: Iterable of term names.
    """
    query = """
        INSERT INTO term (name)
        VALUES (%s) ON CONFLICT (name) DO NOTHING
    """
    with conn.cursor() as cursor:
        cursor.executemany(query, [(name,) for name in terms])
    conn.commit()


def get_terms(conn):
    """Return the ``name`` of every row in the ``term`` table.

    Args:
        conn: Open database connection (psycopg2-style).

    Returns:
        A list of term names.
    """
    query = """
        SELECT name from term
    """
    # The context manager closes the cursor, including when the query fails.
    with conn.cursor() as cursor:
        cursor.execute(query)
        return [row[0] for row in cursor.fetchall()]


def process_file(filepath):
    """Decide whether a queued document should be dropped before indexing.

    A document is dropped when its detected language is not ``'en'`` or when
    its plaintext has fewer than ``MIN_DOCUMENT_LENGTH`` characters.

    Args:
        filepath: Path of the stored page.

    Returns:
        A ``(filepath, should_delete)`` tuple.
    """
    if detect_language(filepath) != 'en':
        return filepath, True

    # TODO: Optimize this. We are repeatedly getting the plaintext here
    # (if the lang isn't in the html, and for length).
    text = get_plaintext(filepath)
    return filepath, len(text) < MIN_DOCUMENT_LENGTH
# Rows sent per statement by execute_values.
# TODO: Optimize this. It's kinda quick, but the number is arbitrary.
INSERT_PAGE_SIZE = 100000

# TODO: On conflict handling makes the inserts slow(er).
# tf is refreshed together with the postings so that a re-indexed page never
# keeps a term frequency computed from its previous content.
_POSTING_UPSERT_TEMPLATE = """
    INSERT INTO {table} (term, url, tf, positional_postings)
    VALUES %s
    ON CONFLICT (term, url) DO UPDATE
    SET tf = EXCLUDED.tf,
        positional_postings = EXCLUDED.positional_postings
"""

# The table names are fixed literals, never user input.
QUERY_DOCUMENT_TERM = _POSTING_UPSERT_TEMPLATE.format(table="document_term")
QUERY_URL_TERM = _POSTING_UPSERT_TEMPLATE.format(table="url_term")
QUERY_TITLE_TERM = _POSTING_UPSERT_TEMPLATE.format(table="title_term")

QUERY_PAGE = """
    INSERT INTO page (url, language, url_term_count, term_count)
    VALUES %s
    ON CONFLICT (url) DO UPDATE
    SET
        last_updated_timestamp = NOW(),
        term_count = EXCLUDED.term_count,
        url_term_count = EXCLUDED.url_term_count,
        language = EXCLUDED.language;
"""


def build_positional_postings(words):
    """Map each word to the list of positions at which it occurs.

    Args:
        words: Iterable of words in reading order.

    Returns:
        ``{word: [position, ...]}`` with positions counted from 0.
    """
    postings = {}
    for position, word in enumerate(words):
        postings.setdefault(word, []).append(position)
    return postings


def build_term_rows(url, postings):
    """Turn positional postings into insertable ``(term, url, tf, positions)`` rows.

    ``tf`` is the number of occurrences of the term divided by the total
    number of occurrences in ``postings``; it is 0 when that total is 0.

    Args:
        url: Url the postings belong to.
        postings: ``{term: [position, ...]}`` mapping.

    Returns:
        A ``(rows, total_term_count)`` tuple.
    """
    total = sum(len(positions) for positions in postings.values())
    rows = [
        (term, url, len(positions) / total if total > 0 else 0, positions)
        for term, positions in postings.items()
    ]
    return rows, total


def _fetch_scalar(conn, query, params=None):
    """Run ``query`` and return the first column of its first row."""
    with conn.cursor() as cursor:
        cursor.execute(query, params)
        return cursor.fetchone()[0]


def prune_documents(conn, filepaths, max_workers):
    """Drop queued documents that ``process_file`` marks for deletion.

    ``process_file`` runs in a thread pool; queue deletions are issued from
    the calling thread as results arrive, so ``conn`` is only used by one
    thread.

    Args:
        conn: Open database connection (psycopg2-style).
        filepaths: Filepaths of the claimed documents.
        max_workers: Size of the thread pool.

    Returns:
        The filepaths that were kept, in completion order.
    """
    remaining = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(process_file, fp) for fp in filepaths]

        for future in tqdm.tqdm(as_completed(futures), total=len(futures)):
            filepath, should_delete = future.result()
            if should_delete:
                remove_from_queue(conn, filepath)
            else:
                remaining.append(filepath)
    return remaining


def full_term_update(conn, term):
    """Recompute and store ``document_count`` for one term, then commit.

    The count is derived from the existing ``document_term`` rows and upserted
    into ``term``.

    Args:
        conn: Open database connection (psycopg2-style).
        term: Term whose document count is refreshed.
    """
    # TODO: Probably want to do url term count here as well for idf later.
    count = _fetch_scalar(
        conn,
        """
        SELECT COUNT(url) FROM document_term WHERE term = %s;
        """,
        (term,),
    )

    update_query = """
        INSERT INTO term (name, document_count)
        VALUES (%s, %s)
        ON CONFLICT (name) DO UPDATE SET document_count = EXCLUDED.document_count
    """
    with conn.cursor() as cursor:
        cursor.execute(update_query, (term, count))
    conn.commit()


def get_k_documents(conn, k):
    """Claim up to ``k`` pending queue entries and mark them as processing.

    Only the oldest pending entry per url is eligible, because duplicate urls
    in the same batch can be painful. Candidate rows are selected with
    ``FOR UPDATE SKIP LOCKED``. This function does not commit; the status
    change becomes permanent with the next commit on ``conn``.

    Args:
        conn: Open database connection (psycopg2-style).
        k: Maximum number of entries to claim.

    Returns:
        A ``(urls, filepaths)`` tuple of equally long lists.
    """
    query = """
        WITH unique_urls AS (
            SELECT DISTINCT ON (url) id
            FROM indexing_queue
            WHERE status = 'pending'
            ORDER BY url, creation_timestamp ASC
        ),
        to_process AS (
            SELECT id FROM indexing_queue
            WHERE id IN (SELECT id FROM unique_urls)
            ORDER BY creation_timestamp ASC
            LIMIT %s
            FOR UPDATE SKIP LOCKED
        )
        UPDATE indexing_queue
        SET status = 'processing', claimed_at = NOW()
        WHERE id IN (SELECT id FROM to_process)
        RETURNING url, filepath;
    """

    with conn.cursor() as cursor:
        cursor.execute(query, (k,))
        results = cursor.fetchall()

    urls = [row[0] for row in results]
    filepaths = [row[1] for row in results]
    return urls, filepaths


def get_title_postings(filepath):
    """Return positional postings for the words of a page's ``<title>``.

    The result has the same ``{word: [position, ...]}`` shape as
    ``get_url_postings``, which is what the indexing loop consumes. A page
    without a ``<title>`` element, or with an empty one, yields ``{}``.

    NOTE(review): assumes ``get_words`` returns the words in order, as it is
    used in ``get_url_postings`` - confirm against ``indexing.utils``.

    Args:
        filepath: Path of the stored HTML page.

    Returns:
        ``{word: [position, ...]}`` for the title text.
    """
    title_element = lxml.html.parse(filepath).find('.//title')
    if title_element is None or not title_element.text:
        return {}
    return build_positional_postings(get_words(title_element.text))


def get_url_postings(url):
    """Return positional postings for the words of a url.

    The words come from the subdomain (``www`` is ignored) and domain joined
    with dots, followed by the url path. The public suffix, query string and
    fragment are not used.

    Args:
        url: Url of the page.

    Returns:
        ``{word: [position, ...]}`` for the url text.
    """
    ext = tldextract.extract(url)
    parts = [p for p in (ext.subdomain, ext.domain) if p and p != 'www']
    combined = '.'.join(parts) + urlparse(url).path
    return build_positional_postings(get_words(combined))


def update_collection_metrics(conn):
    """Refresh per-term document counts and the collection-wide statistics.

    Every distinct term in ``document_term`` gets its ``document_count``
    recomputed (one commit per term), then the page count and the average
    ``term_count`` / ``url_term_count`` over ``page`` are upserted into
    ``collection``.

    Args:
        conn: Open database connection (psycopg2-style).
    """
    # TODO: This should lock the db for consistency of terms and document
    # count. Ehh, maybe close enough is good enough...
    with conn.cursor() as cursor:
        cursor.execute(
            """
            SELECT DISTINCT term FROM document_term;
            """
        )
        distinct_terms = [row[0] for row in cursor.fetchall()]
    print('Fetched distinct terms')

    for term in distinct_terms:
        full_term_update(conn, term)

    num_documents = _fetch_scalar(
        conn,
        """
        SELECT count(url) FROM page;
        """,
    )
    print('Fetched page count')

    average_document_length = _fetch_scalar(
        conn,
        """
        SELECT AVG(term_count) FROM page;
        """,
    )
    print('Fetched average document length')

    average_url_length = _fetch_scalar(
        conn,
        """
        SELECT AVG(url_term_count) FROM page;
        """,
    )

    query = """
        INSERT INTO collection (num_documents, average_document_length, average_url_length)
        VALUES (%s, %s, %s)
        ON CONFLICT (id) DO UPDATE
        SET num_documents = EXCLUDED.num_documents, average_document_length = EXCLUDED.average_document_length,
        average_url_length = EXCLUDED.average_url_length
    """
    with conn.cursor() as cursor:
        cursor.execute(
            query,
            (num_documents, average_document_length, average_url_length),
        )
    conn.commit()
    print('Inserted collection record')


def index_next_batch(conn):
    """Claim, prune and index one batch of queued documents.

    For every document that survives pruning, the body, url and title
    postings are written to ``document_term``, ``url_term`` and
    ``title_term``, and a ``page`` row is upserted. Everything is committed
    in one transaction at the end of the batch.

    Args:
        conn: Open database connection (psycopg2-style).

    Returns:
        ``False`` when the queue had nothing to claim, ``True`` otherwise.
    """
    urls, filepaths = get_k_documents(conn, BATCH_SIZE)

    # TODO: Delete existing records during same write transaction.
    if not urls:
        return False

    filepath_lookup = dict(zip(filepaths, urls))

    print("Pruning documents.")
    remaining = prune_documents(conn, filepaths, MAX_WORKERS)
    print("Documents pruned.")
    print(f'{len(filepaths) - len(remaining)} documents removed')

    term_inserts = []
    url_term_inserts = []
    title_term_inserts = []
    page_inserts = []

    for filepath in remaining:
        url = filepath_lookup[filepath]

        # Each postings mapping looks like
        # {term1: [position1, position2], term2: [position1]}.
        doc_rows, document_term_count = build_term_rows(
            url, get_plaintext_words(filepath)
        )
        url_rows, url_term_count = build_term_rows(url, get_url_postings(url))
        title_rows, _ = build_term_rows(url, get_title_postings(filepath))

        term_inserts.extend(doc_rows)
        url_term_inserts.extend(url_rows)
        title_term_inserts.extend(title_rows)
        # Pruning only keeps documents detected as English.
        page_inserts.append((url, 'en', url_term_count, document_term_count))

    batches = (
        (QUERY_TITLE_TERM, title_term_inserts),
        (QUERY_URL_TERM, url_term_inserts),
        (QUERY_DOCUMENT_TERM, term_inserts),
        (QUERY_PAGE, page_inserts),
    )
    with conn.cursor() as cursor:
        for query, rows in batches:
            execute_values(cursor, query, rows, page_size=INSERT_PAGE_SIZE)
    conn.commit()
    print(f'Inserted {len(term_inserts)} terms with positional postings')
    return True


def main():
    """Run the indexer, or refresh collection metrics when called with ``update``."""
    conn = get_indexing_db_connection()
    try:
        if len(sys.argv) > 1 and sys.argv[1] == "update":
            update_collection_metrics(conn)
            print('Updated collection metrics.')
            return

        # TODO: Add all the deletion logic. Still wip so I don't want to mess
        # up my queue yet.
        # TODO: Add unlocking for failed attempts (processing + timepassed)
        while index_next_batch(conn):
            pass
        print('There are no more queued pages! Exiting')
    finally:
        # Reached on every exit path, including errors.
        conn.close()


if __name__ == "__main__":
    main()