spider.py (16516B)
# GUARANTEES:
# - The crawler crawls every URL in the crawling queue at least once (possibly
#   more than once if a crawler crashes mid-batch).
# - The background cleanup job never deletes files that are referenced by the
#   indexing queue, but deletes all orphans. When it runs, it finds old files
#   (based on mtime) and checks whether each one is queued for indexing: if it
#   is, the file persists; if it isn't, the file is deleted.
#   mtimes matter here because there is a non-zero amount of time between the
#   filesystem write and the insertion into the indexing queue.
#   This guarantees that filesystem usage won't balloon over time unless the
#   indexing queue itself is growing.
# - We assume the indexer deletes records from the indexing queue (but not the
#   files on the filesystem) to support this.
# - Every successfully crawled URL makes its way into the indexing queue, and its
#   cached file lives as long as that queue record.
#
# TODO: To support the above guarantee, we must implement logic to unlock records
# that were marked as being processed in the past but whose crawler crashed
# before deleting the record from the queue.
import urllib.robotparser
from urllib.parse import urlparse
import urllib.request
import requests
import os
from urllib.parse import urljoin, urlparse
from lxml import html
import sys
import psycopg2
from crawling.constants import CRAWLING_DB
from crawling.constants import INDEXING_DB
from crawling.constants import CACHE_DIRECTORY
from crawling.constants import DB_PASSWORD_ENV_VAR
from crawling.constants import DB_USER_ENV_VAR
from crawling.constants import DB_HOST
from crawling.constants import DB_PORT
import urllib
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed

# Number of links claimed from the crawling queue per batch.
LINK_SELECTION_COUNT = 50000
# Pages whose serialized HTML is at least this many UTF-8 bytes are not cached.
MAX_SITE_SIZE = 2_000_000
# Maximum number of outgoing links collected from a single page.
MAX_URLS_PER_SITE = 100
# hrefs longer than this many characters are not queued.
MAX_LINK_LENGTH = 50
# Number of concurrent workers for the thread pool executor.
MAX_WORKERS = 200

USER_AGENT = 'Mozilla/5.0 (X11; Linux x86_64; rv:142.0) Gecko/20100101 Firefox/142.0'


# TODO: Only queue if we haven't already indexed it recently, or some other logic here.

def queue_urls_for_crawling(conn, urls, prior_depth):
    """Queue ``urls`` for crawling one level deeper than the page they came from.

    All URLs are upserted in one call to insert_or_increment_urls, i.e. a
    single commit for the whole batch rather than one commit per URL.

    Args:
        conn: psycopg2 connection to the crawling database.
        urls: iterable of URLs discovered on a crawled page.
        prior_depth: depth of the page the URLs were found on.
    """
    current_depth = prior_depth + 1
    to_insert = {url: current_depth for url in urls}
    if not to_insert:
        return
    insert_or_increment_urls(conn, to_insert)


def is_allowed(url, user_agent, timeout=1):
    """Return whether robots.txt of ``url``'s host permits ``user_agent`` to fetch it.

    Best-effort: any failure to fetch robots.txt (missing file, network
    error, timeout) is treated as "allowed".
    """
    try:
        parsed = urlparse(url)
        robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"
        rp = urllib.robotparser.RobotFileParser()
        rp.set_url(robots_url)
        # Fetch with the same User-Agent the crawler uses for pages; some
        # servers reject urllib's default agent, which would otherwise turn
        # into "allowed" via the except below.
        request = urllib.request.Request(robots_url, headers={'User-Agent': user_agent})
        with urllib.request.urlopen(request, timeout=timeout) as response:
            # errors='replace' so a single undecodable byte doesn't discard
            # the whole file (which would mean "allow everything").
            rp.parse(response.read().decode('utf-8', errors='replace').splitlines())
        return rp.can_fetch(user_agent, url)
    except Exception:
        return True


def _extract_links(doc, url):
    """Return up to MAX_URLS_PER_SITE distinct outgoing http(s) links from ``doc``.

    ``doc`` must already have had make_links_absolute applied. Fragments are
    stripped so ``page#a`` and ``page#b`` are queued once, as ``page``. Links
    back to ``url`` itself and hrefs longer than MAX_LINK_LENGTH are skipped.
    """
    links = set()
    current_url_without_fragment = urlparse(url)._replace(fragment='').geturl()
    for el in doc.iter('a'):
        if len(links) >= MAX_URLS_PER_SITE:
            break
        href = el.get('href')
        if not href or len(href) > MAX_LINK_LENGTH:
            continue
        try:
            parsed = urlparse(href)
        except ValueError:
            # Malformed URL (e.g. an unbalanced IPv6 bracket); skip it rather
            # than let the exception escape the worker thread.
            continue
        if parsed.scheme not in ('http', 'https'):
            continue
        url_without_fragment = parsed._replace(fragment='').geturl()
        if url_without_fragment == current_url_without_fragment:
            continue
        links.add(url_without_fragment)
    return links


# TODO: How can we limit the request size prior to loading it into memory?
def crawl_url(url, filepath):
    """Fetch ``url``, cache its HTML at ``filepath`` and collect its outgoing links.

    The page is skipped (nothing written) when:
    - the HEAD request fails;
    - Content-Language is present and not English;
    - robots.txt disallows it;
    - the GET fails or is not ok;
    - the response is not text/html;
    - the serialized HTML is at least MAX_SITE_SIZE bytes.

    Returns:
        ``(written_to_fs, links)``: ``written_to_fs`` is True only if the HTML
        was written to ``filepath``. ``links`` is a set of absolute http(s)
        URLs and is empty whenever ``written_to_fs`` is False.
    """
    headers = {
        'User-agent': USER_AGENT,
    }

    # TODO: Remove to support more languages.
    # TODO: Add Content-Length check for head (not always supported for shitty sites, like js stuff)
    try:
        head_response = requests.head(url, headers=headers, timeout=1)
    except Exception:
        return False, set()
    content_lang = head_response.headers.get('Content-Language', '')
    if content_lang and not content_lang.split(',')[0].strip().lower().startswith('en'):
        print(f'site not english {url}')
        return False, set()

    if not is_allowed(url, USER_AGENT):
        print(f"Can't crawl {url} due to robots.txt violation")
        return False, set()

    try:
        source_code = requests.get(url, headers=headers, timeout=1)
        if not source_code.ok:
            print(f'Status code not 2xx for {url}, returning.')
            return False, set()

        content_type = source_code.headers.get('Content-Type', '')
        if 'text/html' not in content_type:
            print(f'Content type for {url} not html, returning.')
            return False, set()
        doc = html.document_fromstring(source_code.content)
        doc.make_links_absolute(url)
        content = html.tostring(doc, pretty_print=True, encoding='unicode')
        if len(content.encode('utf-8')) >= MAX_SITE_SIZE:
            print(f'skipping fs write for {url}, too large')
            return False, set()
        # Explicit UTF-8: the size limit above is measured in UTF-8 bytes, and
        # the platform default encoding may be unable to represent the page.
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(content)
        print(f'Wrote {url} to {filepath}')
    except Exception as e:
        print(e)
        return False, set()

    return True, _extract_links(doc, url)


def crawl(url):
    """Crawl ``url`` into a new uniquely named file under CACHE_DIRECTORY.

    Returns:
        ``(url, success, filepath, links)``, where ``success`` means the page's
        HTML was written to ``filepath``.
    """
    # Plain concatenation (not os.path.join) is kept deliberately: this path
    # is stored in indexing_queue, and the cleanup job's comparison against
    # it is not visible here.
    filepath = CACHE_DIRECTORY + "/" + str(uuid.uuid4())
    success, links = crawl_url(url, filepath)
    return url, success, filepath, links


def insert_or_increment_urls(conn, urls_dict):
    """Upsert ``{url: depth}`` entries into queued_site and commit once.

    New URLs are inserted with the given depth. For URLs already queued, the
    stored depth becomes the smaller of the two and crawl_requests is
    incremented by one.
    """
    query = """
        INSERT INTO queued_site (url, depth)
        VALUES (%s, %s)
        ON CONFLICT (url) DO UPDATE
        SET depth = LEAST(queued_site.depth, EXCLUDED.depth),
            crawl_requests = queued_site.crawl_requests + 1
    """
    with conn.cursor() as cursor:
        for url, depth in urls_dict.items():
            cursor.execute(query, (url, depth))
    conn.commit()


def move_url_to_indexing_if_success(conn, url, filepath, success, conn_indexing):
    """Delete ``url`` from queued_site and, if ``success``, enqueue it for indexing.

    The indexing-queue insert is committed *before* the crawling-queue delete.
    If the process dies between the two commits, the URL is therefore not
    lost: it stays in queued_site, and may end up in indexing_queue twice.

    Args:
        conn: connection to the crawling database (queued_site).
        url: the crawled URL.
        filepath: cached HTML path; only used when ``success`` is True.
        success: whether crawl_url wrote the page to ``filepath``.
        conn_indexing: connection to the indexing database (indexing_queue).
    """
    delete_query = """
        DELETE FROM queued_site
        WHERE url = %s
    """
    with conn.cursor() as cursor:
        cursor.execute(delete_query, (url,))

    if success:
        assert os.path.isfile(filepath)  # If this is not true, this file was created more than an hour ago and
        # the cleanup job cleared it. Either that, or the file was never created.
        # This is an assertion because the hour time was chosen to give ample time
        # between file creation and insertion into the indexing_queue.

        # The only way the filepath doesn't exist, but is still inserted into the db, is if the file was created
        # more than an hour ago but was deleted by the background job between the above assertion and committing
        # this transaction. This would be so unlikely that it should be considered impossible: it would require
        # the few operations after the filesystem write to take more than an hour and to coincide exactly with
        # a run of the cleanup job.

        insert_query = """
            INSERT INTO indexing_queue (url, filepath) VALUES (%s, %s)
        """
        with conn_indexing.cursor() as cursor_indexing:
            cursor_indexing.execute(insert_query, (url, filepath))
        conn_indexing.commit()

    conn.commit()


def get_k_urls_with_depth_from_db(conn, k):
    """Atomically claim up to ``k`` pending URLs and return them as ``{url: depth}``.

    Claimed rows are marked 'processing' with claimed_at = NOW(). Rows are
    ordered by shallowest depth, then oldest creation time. FOR UPDATE SKIP
    LOCKED lets concurrent spiders claim disjoint sets of rows.
    """
    # TODO: Improve this to make use of all attributes we have.
    select_top_priority_elements_query = """
        UPDATE queued_site
        SET status = 'processing', claimed_at = NOW()
        WHERE (url) IN (
            SELECT url
            FROM queued_site
            WHERE status = 'pending'
            ORDER BY depth ASC, creation_timestamp ASC
            LIMIT %s
            FOR UPDATE SKIP LOCKED
        )
        RETURNING url, depth;
    """
    with conn.cursor() as cursor:
        cursor.execute(select_top_priority_elements_query, (k,))
        rows = cursor.fetchall()
    conn.commit()
    return {url: depth for url, depth in rows}


def _connect(db_name, db_user, db_password, db_host, db_port):
    """Open a new psycopg2 connection to ``db_name``."""
    return psycopg2.connect(
        database=db_name,
        user=db_user,
        password=db_password,
        host=db_host,
        port=db_port,
    )


def _execute_ddl(conn, query):
    """Execute ``query`` (one or more DDL statements) on ``conn``, commit, and return ``conn``."""
    with conn.cursor() as cursor:
        cursor.execute(query)
    conn.commit()
    return conn


def ensure_indexing_queue_and_get_connection(db_name, db_user, db_password, db_host, db_port):
    """Connect to the indexing database, creating indexing_queue if needed; return the connection."""
    conn = _connect(db_name, db_user, db_password, db_host, db_port)

    # These statuses are **only** for the indexer, not for the crawler.
    # This is the same pattern the crawler uses to ensure it only dispatches one indexer per record.

    # Also, the indexing queue should only be added to by the crawler. If something must be re-indexed,
    # it should be added to the crawler queue and go through the entire process again.

    create_table_query = """
        CREATE TABLE IF NOT EXISTS indexing_queue (
            id SERIAL PRIMARY KEY,
            url TEXT NOT NULL,
            status TEXT DEFAULT 'pending' NOT NULL
                CHECK (status IN ('pending', 'processing')),
            claimed_at TIMESTAMPTZ,
            filepath TEXT NOT NULL,
            creation_timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP NOT NULL
        );
    """
    return _execute_ddl(conn, create_table_query)


def ensure_queued_sites_and_get_connection(db_name, db_user, db_password, db_host, db_port):
    """Connect to the crawling database, creating queued_site and its indexes if needed; return the connection."""
    conn = _connect(db_name, db_user, db_password, db_host, db_port)

    # If a site can't be reached it will still be removed from the db.
    # The status is only for spiders that stop mid execution, and to ensure
    # multiple spiders don't grab the same url.

    # TODO: See above, add logic to unset claimed status after certain amount of time.

    # NOTE(review): idx_queued_url duplicates the index PostgreSQL already
    # builds for the PRIMARY KEY on url.
    create_table_query = """
        CREATE TABLE IF NOT EXISTS queued_site (
            url TEXT PRIMARY KEY,
            creation_timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP NOT NULL,
            status TEXT DEFAULT 'pending' NOT NULL
                CHECK (status IN ('pending', 'processing')),
            claimed_at TIMESTAMPTZ,
            crawl_requests INTEGER DEFAULT 1 NOT NULL CHECK (crawl_requests >= 1),
            depth INTEGER NOT NULL CHECK (depth >= 0)
        );
        CREATE INDEX IF NOT EXISTS idx_queued_url ON queued_site (url);
        CREATE INDEX IF NOT EXISTS idx_queued_depth ON queued_site (depth);
    """
    return _execute_ddl(conn, create_table_query)


def ensure_page_table(conn):
    """Create the page table if it does not exist; return ``conn``."""
    create_table_query = """
        CREATE TABLE IF NOT EXISTS page (
            url TEXT PRIMARY KEY,
            language TEXT NOT NULL,
            term_count INTEGER NOT NULL CHECK (term_count >= 0),
            url_term_count INTEGER NOT NULL CHECK (url_term_count >= 0),
            last_updated_timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP NOT NULL
        );
    """
    return _execute_ddl(conn, create_table_query)


def ensure_collection_table(conn):
    """Create the collection table if it does not exist; return ``conn``."""
    # TODO: This should probably be a materialized view

    # TODO: Average should probably be a float

    create_table_query = """
        CREATE TABLE IF NOT EXISTS collection (
            id INTEGER PRIMARY KEY DEFAULT 0,
            num_documents INTEGER NOT NULL CHECK (num_documents >= 0),
            average_document_length INTEGER NOT NULL CHECK (average_document_length >= 0),
            average_url_length INTEGER NOT NULL CHECK (average_url_length >= 0)
        );
    """
    return _execute_ddl(conn, create_table_query)


def ensure_terms_table(conn):
    """Create the term table if it does not exist; return ``conn``."""
    # TODO: This should probably be a materialized view

    create_table_query = """
        CREATE TABLE IF NOT EXISTS term (
            name TEXT PRIMARY KEY,
            document_count INTEGER NOT NULL CHECK (document_count >= 0)
        );
    """
    return _execute_ddl(conn, create_table_query)


def ensure_title_term_table(conn):
    """Create the title_term inverted-index table and its index if needed; return ``conn``."""
    # inverse index
    # TODO: Add cleanup for this (in clean.py)

    create_table_query = """
        CREATE TABLE IF NOT EXISTS title_term (
            term TEXT,
            url TEXT NOT NULL,
            tf REAL NOT NULL,
            positional_postings INTEGER [],
            PRIMARY KEY(term, url)
        );
        CREATE INDEX IF NOT EXISTS idx_title_term ON title_term (term);
    """
    return _execute_ddl(conn, create_table_query)


def ensure_url_term_table(conn):
    """Create the url_term inverted-index table and its index if needed; return ``conn``."""
    # inverse index
    # TODO: Add cleanup for this (in clean.py)

    create_table_query = """
        CREATE TABLE IF NOT EXISTS url_term (
            term TEXT,
            url TEXT NOT NULL,
            tf REAL NOT NULL,
            positional_postings INTEGER [],
            PRIMARY KEY(term, url)
        );
        CREATE INDEX IF NOT EXISTS idx_url_term ON url_term (term);
    """
    return _execute_ddl(conn, create_table_query)


def ensure_document_term_table(conn):
    """Create the document_term inverted-index table and its index if needed; return ``conn``."""
    # inverse index
    # TODO: Add cleanup for this (in clean.py)

    create_table_query = """
        CREATE TABLE IF NOT EXISTS document_term (
            term TEXT,
            url TEXT NOT NULL,
            tf REAL NOT NULL,
            positional_postings INTEGER [],
            PRIMARY KEY(term, url)
        );
        CREATE INDEX IF NOT EXISTS idx_document_term ON document_term (term);
    """
    return _execute_ddl(conn, create_table_query)


def insert_seed_file(filepath, conn):
    """Queue every non-blank line of ``filepath`` as a depth-0 URL."""
    with open(filepath, 'r', encoding='utf-8') as f:
        # Skip blank lines (e.g. a trailing newline) so '' is never queued as a URL.
        to_insert = {line.strip(): 0 for line in f if line.strip()}

    insert_or_increment_urls(conn, to_insert)


def get_crawling_db_connection():
    """Return a connection to CRAWLING_DB with queued_site ensured, using credentials from the environment."""
    password = os.getenv(DB_PASSWORD_ENV_VAR)
    username = os.getenv(DB_USER_ENV_VAR)
    conn = ensure_queued_sites_and_get_connection(CRAWLING_DB, username, password, DB_HOST, DB_PORT)
    return conn


def get_indexing_db_connection():
    """Return a connection to INDEXING_DB with indexing_queue and all index tables ensured."""
    password = os.getenv(DB_PASSWORD_ENV_VAR)
    username = os.getenv(DB_USER_ENV_VAR)
    conn_indexing_queue = ensure_indexing_queue_and_get_connection(INDEXING_DB, username, password, DB_HOST, DB_PORT)
    ensure_terms_table(conn_indexing_queue)
    ensure_url_term_table(conn_indexing_queue)
    ensure_title_term_table(conn_indexing_queue)
    ensure_document_term_table(conn_indexing_queue)
    ensure_collection_table(conn_indexing_queue)
    ensure_page_table(conn_indexing_queue)
    return conn_indexing_queue


def main():
    """Seed the queue from files named in argv, then crawl batches until the queue is empty."""
    os.makedirs(CACHE_DIRECTORY, exist_ok=True)

    conn = get_crawling_db_connection()
    conn_indexing_queue = get_indexing_db_connection()
    try:
        for filepath in sys.argv[1:]:
            insert_seed_file(filepath, conn)
            print(f"Inserted urls from {filepath}")

        while True:
            urls_dict = get_k_urls_with_depth_from_db(conn, LINK_SELECTION_COUNT)
            if not urls_dict:
                print('No URLs to search... Exiting (if you are just starting, try passing in a seed file)')
                break

            with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
                futures = {executor.submit(crawl, url): url for url in urls_dict}
                # as_completed yields only finished futures, so result() never
                # blocks here; a per-future timeout would be meaningless.
                for future in as_completed(futures):
                    try:
                        url, success, filepath, links = future.result()
                    except Exception as e:
                        # Don't let one bad page kill the whole batch. The row
                        # stays 'processing', exactly as if this spider had
                        # crashed (see the stale-claim TODO).
                        print(f"Crawl of {futures[future]} raised {e!r}, leaving it claimed")
                        continue
                    # success means the html was written to the filepath;
                    # if not success, the url is just deleted from the db.
                    move_url_to_indexing_if_success(conn, url, filepath, success, conn_indexing_queue)
                    queue_urls_for_crawling(conn, links, urls_dict[url])
    finally:
        conn.close()
        conn_indexing_queue.close()


if __name__ == "__main__":
    main()