information-retrieval

Exploration of information retrieval topics
git clone git://git.laack.co/information-retrieval.git
Log | Files | Refs

spider.py (16516B)


      1 # GUARANTEE:
      2     # the crawler will crawl every url in the crawling queue at least once
      3         # can be more in cases of crashing
      4     # the crawler bg job will not delete files that are in the indexing queue, but will delete all orphans
      5         # When the bg job runs, it will find old files (based on mtime), and then
      6         # check if they are queued for indexing. If they are, they persist; if they aren't,
      7         # they are deleted.
      8             # mtimes are useful here because there is a non-zero amount of time between fs write and db queue insertion.
      9         # This guarantees fs usage won't balloon for no reason over time unless the indexer
     10         # queue is growing.
     11             # we assume the indexer will delete records from the queue to support this, but not files on the fs
     12     # every url that is crawled will make its way into the indexer queue with a lifetime
     13         # associated with the lifetime of said record
     14 
     15 # TODO: To support the above guarantee, we must implement logic to unlock records that 
     16 # were set as being processed in the past, but the execution of said crawler crashed
     17 # before deleting the record from the queue.
     18 
     19 import urllib.robotparser
     20 from urllib.parse import urlparse
     21 import urllib.request
     22 import requests
     23 import os
     24 from urllib.parse import urljoin, urlparse
     25 from lxml import html
     26 import sys
     27 import psycopg2
     28 from crawling.constants import CRAWLING_DB 
     29 from crawling.constants import INDEXING_DB
     30 from crawling.constants import CACHE_DIRECTORY
     31 from crawling.constants import DB_PASSWORD_ENV_VAR
     32 from crawling.constants import DB_USER_ENV_VAR
     33 from crawling.constants import DB_HOST
     34 from crawling.constants import DB_PORT
     35 import urllib
     36 import uuid
     37 from concurrent.futures import ThreadPoolExecutor, as_completed
     38 
     39 # this is the number of links we take out of the queue
     40 LINK_SELECTION_COUNT = 50000
     41 MAX_SITE_SIZE = 2_000_000
     42 MAX_URLS_PER_SITE = 100
     43 # number of concurrent workers for thread pool executor
     44 MAX_WORKERS = 200
     45 
     46 # TODO: Only queue if we haven't already indexed it recently, or some other logic here.
     47 
     48 def queue_urls_for_crawling(conn, urls, prior_depth):
     49     current_depth = prior_depth + 1
     50     for url in urls:
     51         insert_or_increment_urls(conn, {url: current_depth})
     52 
     53 def is_allowed(url, user_agent, timeout=1):
     54     try:
     55         parsed = urlparse(url)
     56         robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"
     57         rp = urllib.robotparser.RobotFileParser()
     58         rp.set_url(robots_url)
     59         with urllib.request.urlopen(robots_url, timeout=timeout) as response:
     60             rp.parse(response.read().decode('utf-8').splitlines())
     61         return rp.can_fetch(user_agent, url)
     62     except Exception:
     63         return True
     64 
     65 # TODO: How can we limit the request size prior to loading it into memory?
     66 def crawl_url(url, filepath):
     67     links = set()
     68     written_to_fs = False
     69 
     70     user_agent = 'Mozilla/5.0 (X11; Linux x86_64; rv:142.0) Gecko/20100101 Firefox/142.0'
     71 
     72     headers = {
     73         'User-agent': user_agent,
     74     }
     75 
     76 
     77     # TODO: Remove to support more languages.
     78     # TODO: Add Content-Length check for head (not always supported for shitty sites, like js stuff)
     79 
     80     try:
     81         head_response = requests.head(url, headers=headers, timeout=1)
     82         content_lang = head_response.headers.get('Content-Language', '')
     83         if content_lang:
     84             if not content_lang.split(',')[0].strip().lower().startswith('en'):
     85                 print(f'site not english {url}')
     86                 return written_to_fs, links
     87     except Exception:
     88         return written_to_fs, links
     89 
     90     if not is_allowed(url, user_agent):
     91         print(f"Can't crawl {url} due to robots.txt violation")
     92         return written_to_fs, links
     93     try:
     94         source_code = requests.get(url, headers=headers, timeout=1)
     95         if not source_code.ok:
     96             print(f'Status code not 2xx for {url}, returning.')
     97             return written_to_fs, links
     98         
     99         content_type = source_code.headers.get('Content-Type', '')
    100         if 'text/html' not in content_type:
    101             print(f'Content type for {url} not html, returning.')
    102             return written_to_fs, links
    103         doc = html.document_fromstring(source_code.content)
    104         doc.make_links_absolute(url)
    105         content = html.tostring(doc, pretty_print=True, encoding='unicode')
    106         if len(content.encode('utf-8')) < MAX_SITE_SIZE:
    107             with open(filepath, 'w') as f:
    108                 f.write(content)
    109                 written_to_fs = True
    110                 print(f'Wrote {url} to {filepath}')
    111         else:
    112             print(f'skipping fs write for {url}, too large')
    113             return written_to_fs, links
    114     except Exception as e:
    115         print(e)
    116         return written_to_fs, links
    117     current_url_without_fragment = urlparse(url)._replace(fragment='').geturl()
    118     for el in doc.iter('a'):
    119         href = el.get('href')
    120         if not href:
    121             continue
    122         
    123         parsed = urlparse(href)
    124         url_without_fragment = parsed._replace(fragment='').geturl()
    125         
    126         if url_without_fragment == current_url_without_fragment:
    127             continue
    128         
    129         if len(href) > 50:
    130             continue
    131         if parsed.scheme in ('http', 'https') and len(links) < MAX_URLS_PER_SITE:
    132             links.add(href)
    133     assert written_to_fs == True
    134     return written_to_fs, links
    135 
    136 def crawl(url):
    137     filepath = CACHE_DIRECTORY + "/" + str(uuid.uuid4())
    138     success, links = crawl_url(url, filepath)
    139     return url, success, filepath, links
    140 
    141 # urls: {url: depth}
    142 def insert_or_increment_urls(conn, urls_dict):
    143     cursor = conn.cursor()
    144     for url in urls_dict:
    145         depth = urls_dict[url]
    146         query = """
    147             INSERT INTO queued_site (url, depth)
    148             VALUES (%s, %s)
    149             ON CONFLICT (url) DO UPDATE
    150             SET depth = LEAST(queued_site.depth, EXCLUDED.depth),
    151             crawl_requests = queued_site.crawl_requests + 1
    152         """
    153         cursor.execute(query, (url, depth))
    154 
    155     cursor.close()
    156     conn.commit()
    157 
    158 def move_url_to_indexing_if_success(conn, url, filepath, success, conn_indexing):
    159     cursor = conn.cursor()
    160     delete_query = """
    161         DELETE FROM queued_site
    162         WHERE url = %s
    163     """
    164     cursor.execute(delete_query, (url,))
    165     cursor.close()
    166 
    167 
    168     if success:
    169         assert os.path.isfile(filepath) # If this is not true, this file was created more than an hour ago and 
    170                                         # the cleanup job cleared it. Either that, or the file was never created.
    171                                         # This is an assertion because the hour time was chosen to give ample time
    172                                         # between file creation and insertion into the indexing_queue.
    173         
    174         # The only way the filepath doesn't exist, but is still inserted into the db is if the file was created
    175         # more than an hour ago, but was deleted by the background job between the above assertion and commiting
    176         # this transaction. This would be exceedingly unlikely, so much so that it should be considered impossible
    177         # as it would be insane that the file was created on the filessytem and a few operations took exactly an amount
    178         # of time > 1 hour and identical to when the timer runs. 
    179 
    180         cursor_indexing = conn_indexing.cursor()
    181         upsert_query = """
    182             INSERT INTO indexing_queue (url, filepath) VALUES (%s, %s)
    183         """
    184         cursor_indexing.execute(upsert_query, (url, filepath))
    185         cursor_indexing.close()
    186         conn_indexing.commit()
    187 
    188     conn.commit()
    189 
    190 
    191 
    192 def get_k_urls_with_depth_from_db(conn, k):
    193     cursor = conn.cursor()
    194 
    195     # TODO: Improve this to make use of all attributes we have.
    196 
    197     select_top_priority_elements_query = """
    198         UPDATE queued_site 
    199         SET status = 'processing', claimed_at = NOW()
    200         WHERE (url) IN (
    201             SELECT url
    202             FROM queued_site 
    203             WHERE status = 'pending' 
    204             ORDER BY depth ASC, creation_timestamp ASC 
    205             LIMIT %s
    206             FOR UPDATE SKIP LOCKED
    207         )
    208         RETURNING url, depth;
    209     """
    210     cursor.execute(select_top_priority_elements_query, (k,))
    211     result = cursor.fetchall()
    212     conn.commit()
    213     result = {res[0] : res[1] for res in result}
    214     cursor.close()
    215     return result
    216 
    217 def ensure_indexing_queue_and_get_connection(db_name, db_user, db_password, db_host, db_port):
    218     conn = psycopg2.connect(
    219         database=db_name,
    220         user=db_user,
    221         password=db_password,
    222         host=db_host,
    223         port=db_port
    224     )
    225     cursor = conn.cursor()
    226 
    227     # These statuses are **only** for the indexer, not for the crawler.
    228     # This is the same pattern the crawler uses to ensure it only dispatches one indexer per record
    229 
    230     # Also, the indexing queue should only be added to by the crawler. If something must be re-indexed, 
    231     # it should be added to the crawler queue and go through the entire process again.
    232 
    233     create_table_query = """
    234         CREATE TABLE IF NOT EXISTS indexing_queue (
    235             id SERIAL PRIMARY KEY,
    236             url TEXT NOT NULL,
    237             status TEXT DEFAULT 'pending' NOT NULL
    238                 CHECK (status IN ('pending', 'processing')),
    239             claimed_at TIMESTAMPTZ,
    240             filepath TEXT NOT NULL,
    241             creation_timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP NOT NULL
    242         );
    243     """
    244 
    245     cursor.execute(create_table_query)
    246     conn.commit()
    247     cursor.close()
    248     return conn
    249 
    250 def ensure_queued_sites_and_get_connection(db_name, db_user, db_password, db_host, db_port):
    251     conn = psycopg2.connect(
    252         database=db_name,
    253         user=db_user,
    254         password=db_password,
    255         host=db_host,
    256         port=db_port
    257     )
    258     cursor = conn.cursor()
    259 
    260     # If a site can't be reached it will still be removed from the db.
    261     # The status is only for spiders that stop mid execution, and to ensure
    262     # multiple spiders don't grab the same url.
    263 
    264     # TODO: See above, add logic to unset claimed status after certain amount of time.
    265 
    266     create_table_query = """
    267         CREATE TABLE IF NOT EXISTS queued_site (
    268             url TEXT PRIMARY KEY,
    269             creation_timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP NOT NULL,
    270             status TEXT DEFAULT 'pending' NOT NULL
    271                 CHECK (status IN ('pending', 'processing')),
    272             claimed_at TIMESTAMPTZ,
    273             crawl_requests INTEGER DEFAULT 1 NOT NULL CHECK (crawl_requests >= 1),
    274             depth INTEGER NOT NULL CHECK (depth >= 0)
    275         );
    276         CREATE INDEX IF NOT EXISTS idx_queued_url ON queued_site (url);
    277         CREATE INDEX IF NOT EXISTS idx_queued_depth ON queued_site (depth);
    278     """
    279 
    280     cursor.execute(create_table_query)
    281     conn.commit()
    282     cursor.close()
    283     return conn
    284 
    285 
    286 def ensure_page_table(conn):
    287     cursor = conn.cursor()
    288 
    289     create_table_query = """
    290         CREATE TABLE IF NOT EXISTS page (
    291             url TEXT PRIMARY KEY,
    292             language TEXT NOT NULL,
    293             term_count INTEGER NOT NULL CHECK (term_count >= 0),
    294             url_term_count INTEGER NOT NULL CHECK (url_term_count >= 0),
    295             last_updated_timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP NOT NULL
    296         );
    297     """
    298 
    299     cursor.execute(create_table_query)
    300     conn.commit()
    301     cursor.close()
    302     return conn
    303 
    304 
    305 def ensure_collection_table(conn):
    306     cursor = conn.cursor()
    307 
    308     # TODO: This should probably be a materialized view
    309 
    310     # TODO: Average should probably be a float
    311 
    312     create_table_query = """
    313         CREATE TABLE IF NOT EXISTS collection (
    314             id INTEGER PRIMARY KEY DEFAULT 0,
    315             num_documents INTEGER NOT NULL CHECK (num_documents >= 0),
    316             average_document_length INTEGER NOT NULL CHECK (average_document_length >= 0),
    317             average_url_length INTEGER NOT NULL CHECK (average_url_length >= 0) 
    318         );
    319     """
    320 
    321     cursor.execute(create_table_query)
    322     conn.commit()
    323     cursor.close()
    324     return conn
    325 
    326 def ensure_terms_table(conn):
    327     cursor = conn.cursor()
    328 
    329     # TODO: This should probably be a materialized view
    330 
    331     create_table_query = """
    332         CREATE TABLE IF NOT EXISTS term (
    333             name TEXT PRIMARY KEY,
    334             document_count INTEGER NOT NULL CHECK (document_count >= 0) 
    335         );
    336     """
    337 
    338     cursor.execute(create_table_query)
    339     conn.commit()
    340     cursor.close()
    341     return conn
    342 
    343 
    344 def ensure_title_term_table(conn):
    345     cursor = conn.cursor()
    346 
    347     # inverse index
    348     # TODO: Add cleanup for this (in clean.py)
    349 
    350     create_table_query = """
    351         CREATE TABLE IF NOT EXISTS title_term (
    352             term TEXT,
    353             url TEXT NOT NULL,
    354             tf REAL NOT NULL,
    355             positional_postings INTEGER [],
    356             PRIMARY KEY(term, url)
    357         );
    358          CREATE INDEX IF NOT EXISTS idx_title_term ON title_term (term);
    359     """
    360 
    361     cursor.execute(create_table_query)
    362     conn.commit()
    363     cursor.close()
    364     return conn
    365 
    366 def ensure_url_term_table(conn):
    367     cursor = conn.cursor()
    368 
    369     # inverse index
    370     # TODO: Add cleanup for this (in clean.py)
    371 
    372     create_table_query = """
    373         CREATE TABLE IF NOT EXISTS url_term (
    374             term TEXT,
    375             url TEXT NOT NULL,
    376             tf REAL NOT NULL,
    377             positional_postings INTEGER [],
    378             PRIMARY KEY(term, url)
    379         );
    380          CREATE INDEX IF NOT EXISTS idx_url_term ON url_term (term);
    381     """
    382 
    383     cursor.execute(create_table_query)
    384     conn.commit()
    385     cursor.close()
    386     return conn
    387 
    388 
    389 def ensure_document_term_table(conn):
    390     cursor = conn.cursor()
    391 
    392     # inverse index
    393     # TODO: Add cleanup for this (in clean.py)
    394 
    395     create_table_query = """
    396         CREATE TABLE IF NOT EXISTS document_term (
    397             term TEXT,
    398             url TEXT NOT NULL,
    399             tf REAL NOT NULL,
    400             positional_postings INTEGER [],
    401             PRIMARY KEY(term, url)
    402         );
    403          CREATE INDEX IF NOT EXISTS idx_document_term ON document_term (term);
    404     """
    405 
    406     cursor.execute(create_table_query)
    407     conn.commit()
    408     cursor.close()
    409     return conn
    410 
    411 def insert_seed_file(filepath, conn):
    412     with open(filepath, 'r') as f:
    413         urls = f.readlines()
    414 
    415     to_insert = {}
    416     for url in urls:
    417         to_insert[url.strip()] = 0
    418 
    419     insert_or_increment_urls(conn, to_insert)
    420 
    421 def get_crawling_db_connection():
    422     password = os.getenv(DB_PASSWORD_ENV_VAR)
    423     username = os.getenv(DB_USER_ENV_VAR)
    424     conn = ensure_queued_sites_and_get_connection(CRAWLING_DB, username, password, DB_HOST, DB_PORT)
    425     return conn
    426 
    427 def get_indexing_db_connection():
    428     password = os.getenv(DB_PASSWORD_ENV_VAR)
    429     username = os.getenv(DB_USER_ENV_VAR)
    430     conn_indexing_queue = ensure_indexing_queue_and_get_connection(INDEXING_DB, username, password, DB_HOST, DB_PORT)
    431     ensure_terms_table(conn_indexing_queue)
    432     ensure_url_term_table(conn_indexing_queue)
    433     ensure_title_term_table(conn_indexing_queue)
    434     ensure_document_term_table(conn_indexing_queue)
    435     ensure_collection_table(conn_indexing_queue)
    436     ensure_page_table(conn_indexing_queue)
    437     return conn_indexing_queue
    438 
    439 if __name__ == "__main__":
    440 
    441     if not os.path.exists(CACHE_DIRECTORY):
    442         os.makedirs(CACHE_DIRECTORY)
    443 
    444     conn = get_crawling_db_connection()
    445     conn_indexing_queue = get_indexing_db_connection()
    446 
    447     if len(sys.argv) > 1:
    448         for filepath in sys.argv[1:]:
    449             insert_seed_file(filepath, conn)
    450             print(f"Inserted urls from {filepath}")
    451 
    452     while True:
    453         urls_dict = get_k_urls_with_depth_from_db(conn, LINK_SELECTION_COUNT)
    454         if len(urls_dict) == 0:
    455             print('No URLs to search... Exiting (if you are just starting, try passing in a seed file)')
    456             break
    457 
    458         with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    459             futures = {executor.submit(crawl, url): url for url in urls_dict}
    460             for future in as_completed(futures):
    461                 try:
    462 
    463                     url, success, filepath, links = future.result(timeout=20)
    464                     # success means the html was written to the filepath
    465                     # if not success, just delete from the db
    466                     move_url_to_indexing_if_success(conn, url, filepath, success, conn_indexing_queue)
    467                     current_urls_depth = urls_dict[url]
    468                     queue_urls_for_crawling(conn, links, current_urls_depth)
    469                 except TimeoutError:
    470                     print(f"Timeout, cancelling")
    471                     future.cancel()
    472 
    473     conn.close()
    474     conn_indexing_queue.close()