information-retrieval

Exploration of information retrieval topics
git clone git://git.laack.co/information-retrieval.git
Log | Files | Refs

page_parsing.py (11391B)


      1 from psycopg2.extras import execute_values
      2 import lxml.html
      3 import tldextract
      4 from urllib.parse import urlparse
      5 import time
      6 import sys
      7 from indexing.utils import get_plaintext, get_words, get_plaintext_words
      8 import gc
      9 from concurrent.futures import ThreadPoolExecutor, as_completed
     10 import random
     11 from indexing.utils import detect_language
     12 import re
     13 import tqdm
     14 from crawling.spider import get_indexing_db_connection
     15 
     16 MAX_WORKERS = 5
     17 BATCH_SIZE = 10
     18 
     19 def get_term_list(filepath):
     20 
     21     plaintext = get_plaintext(filepath)
     22     cleaned = re.sub(r'[^a-zA-Z0-9\s]', ' ', plaintext).lower()
     23     terms = cleaned.split()
     24 
     25     final_terms = []
     26     for term in terms:
     27         term = term.strip()
     28         if len(term) <= 50 and len(term) > 0:
     29             final_terms.append(term)
     30     return final_terms
     31 
     32 def remove_from_queue(conn, filepath):
     33     cursor = conn.cursor()
     34     # Delete by filepath because the same url can be queued multiple times.
     35     query = """
     36         DELETE FROM indexing_queue 
     37         WHERE filepath = %s
     38     """
     39     cursor.execute(query, (filepath,))
     40     cursor.close()
     41     conn.commit()
     42 
     43 # No need to remove existing ones, that will be handled gracefully.
     44 def add_terms(conn, terms):
     45     cursor = conn.cursor()
     46     query = """
     47         INSERT INTO term (name)
     48         VALUES (%s) ON CONFLICT (name) DO NOTHING
     49     """
     50     cursor.executemany(
     51         query,
     52         [(name,) for name in terms]
     53     )
     54     cursor.close()
     55     conn.commit()
     56 
     57 def get_terms(conn):
     58     cursor = conn.cursor()
     59     query = """
     60         SELECT name from term
     61     """
     62     cursor.execute(query)
     63     results = [res[0] for res in cursor.fetchall()]
     64     return results
     65 
     66 
     67 def process_file(filepath):
     68     language = detect_language(filepath)
     69     if language != 'en':
     70         return filepath, True
     71 
     72     text = get_plaintext(filepath)
     73 
     74     if len(text) < 750:
     75         return filepath, True
     76     return filepath, False
     77 
     78 # TODO: Optimize this. we are repeatedly getting the plaintext here (if the lang isn't in the html, and for length.)
     79 
     80 def prune_documents(conn, filepaths, max_workers):
     81 
     82     remaining = []
     83 
     84     with ThreadPoolExecutor(max_workers=max_workers) as executor:
     85         futures = {executor.submit(process_file, fp): fp for fp in filepaths}
     86         
     87         for future in tqdm.tqdm(as_completed(futures), total=len(filepaths)):
     88             filepath, should_delete = future.result()
     89             if should_delete:
     90                 remove_from_queue(conn, filepath)
     91             else:
     92                 remaining.append(filepath)
     93             del future
     94     return remaining
     95     
     96 # Use the existing data to derive term document_count per term
     97 def full_term_update(conn, term):
     98 
     99     # TODO: Probably want to do url term count here as well for idf later.
    100 
    101     query = """
    102         SELECT COUNT(url) FROM document_term WHERE term = %s;
    103     """
    104 
    105     cursor = conn.cursor()
    106     cursor.execute(query, (term,))
    107 
    108     count = cursor.fetchall()[0][0]
    109 
    110     
    111     update_query = """
    112         INSERT INTO term (name, document_count)
    113         VALUES (%s, %s) 
    114         ON CONFLICT (name) DO UPDATE SET document_count = EXCLUDED.document_count
    115     """
    116 
    117     cursor.execute(update_query, (term, count))
    118     cursor.close()
    119     conn.commit()
    120 
    121 
    122 def get_k_documents(conn, k):
    123 
    124     # Duplicate urls in the same batch can be painful.
    125 
    126     query = """
    127         WITH unique_urls AS (
    128             SELECT DISTINCT ON (url) id
    129             FROM indexing_queue
    130             WHERE status = 'pending'
    131             ORDER BY url, creation_timestamp ASC
    132         ),
    133         to_process AS (
    134             SELECT id FROM indexing_queue
    135             WHERE id IN (SELECT id FROM unique_urls)
    136             ORDER BY creation_timestamp ASC
    137             LIMIT %s
    138             FOR UPDATE SKIP LOCKED
    139         )
    140         UPDATE indexing_queue
    141         SET status = 'processing', claimed_at = NOW()
    142         WHERE id IN (SELECT id FROM to_process)
    143         RETURNING url, filepath;
    144     """
    145 
    146     cursor = conn.cursor()
    147     cursor.execute(query, (k,))
    148     results = cursor.fetchall()
    149     cursor.close()
    150 
    151     urls = [res[0] for res in results]
    152     filepaths  = [res[1] for res in results]
    153     return urls, filepaths
    154 
    155 def get_title_postings(filepath):
    156     title = lxml.html.parse(filepath).find('.//title').text
    157     words = get_words(title)
    158     return words
    159 
    160 def get_url_postings(url):
    161     ext = tldextract.extract(url)
    162     parts = [ext.subdomain, ext.domain]
    163     parts = [p for p in parts if p and p != 'www']
    164     path = urlparse(url).path
    165     combined = '.'.join(parts) + path
    166     words = get_words(combined)
    167 
    168     postings = {}
    169     for position, word in enumerate(words):
    170         if word not in postings:
    171             postings[word] = []
    172         postings[word].append(position)
    173     return postings
    174 
    175 def update_collection_metrics(conn):
    176 
    177     # TODO: This should lock the db for consistency of terms and document count.
    178     # Ehh, maybe close enough is good enough... 
    179 
    180     query = """
    181         SELECT DISTINCT term FROM document_term;
    182     """
    183     cursor = conn.cursor()
    184     cursor.execute(query)
    185     
    186     distinct_terms = [res[0] for res in cursor.fetchall()]
    187     print('Fetched distinct terms')
    188 
    189     for term in distinct_terms:
    190         full_term_update(conn, term)
    191 
    192     query = """
    193         SELECT count(url) FROM page;
    194     """
    195 
    196     cursor = conn.cursor()
    197     cursor.execute(query)
    198     distinct_urls = cursor.fetchall()[0][0]
    199     cursor.close()
    200 
    201     print('Fetched page count')
    202 
    203     query = """
    204         SELECT AVG(term_count) FROM page;
    205     """
    206 
    207     cursor = conn.cursor()
    208     cursor.execute(query)
    209     average_document_length = cursor.fetchall()[0][0]
    210 
    211     cursor.close()
    212 
    213     print('Fetched average document length')
    214 
    215     query = """
    216         SELECT AVG(url_term_count) FROM page;
    217     """
    218 
    219     cursor = conn.cursor()
    220     cursor.execute(query)
    221     average_url_length = cursor.fetchall()[0][0]
    222     cursor.close()
    223 
    224     query = """
    225         INSERT INTO collection (num_documents, average_document_length, average_url_length)
    226         VALUES (%s, %s, %s)
    227         ON CONFLICT (id) DO UPDATE
    228             SET num_documents = EXCLUDED.num_documents, average_document_length = EXCLUDED.average_document_length,
    229             average_url_length = EXCLUDED.average_url_length
    230     """
    231 
    232     cursor = conn.cursor()
    233     cursor.execute(query, (distinct_urls, average_document_length, average_url_length))
    234     cursor.close()
    235     conn.commit()
    236     print('Inserted collection record')
    237 
    238 
    239 
    240 if __name__ == "__main__":
    241 
    242 
    243 
    244     # {filepath : {term1: [position1, position2], term2: [position1]}}
    245 
    246 
    247     conn = get_indexing_db_connection()
    248     if len(sys.argv) > 1 and sys.argv[1] == "update":
    249         update_collection_metrics(conn)
    250         print('Updated collection metrics.')
    251         exit()
    252 
    253     # TODO: Add all the deletion logic. Still wip so I don't want to mess up my queue yet.
    254     # TODO: Add unlocking for failed attempts (processing + timepassed)
    255 
    256     while True:
    257 
    258         urls, filepaths = get_k_documents(conn, BATCH_SIZE)
    259 
    260         # TODO: Delete existing records during same write transaction.
    261 
    262         if len(urls) == 0:
    263             print('There are no more queued pages! Exiting')
    264             exit()
    265 
    266         filepath_lookup = {}
    267 
    268         for i in range(len(filepaths)):
    269             filepath_lookup[filepaths[i]] = urls[i]
    270 
    271 
    272         remaining = filepaths
    273         print("Pruning documents.")
    274         before = len(filepaths)
    275         remaining = prune_documents(conn, filepaths, MAX_WORKERS)
    276         after = len(remaining)
    277         print("Documents pruned.")
    278         print(f'{before - after} documents removed')
    279 
    280         document_positional_postings = {}
    281         url_word_postings = {}
    282         title_word_postings = {}
    283 
    284         for filepath in remaining:
    285             document_positional_postings[filepath] = get_plaintext_words(filepath)
    286             url_word_postings[filepath] = get_url_postings(filepath_lookup[filepath])
    287             title_word_postings[filepath] = get_title_postings(filepath)
    288 
    289         count = 0
    290 
    291         # TODO: On conflict handling makes this shit slow(er).
    292 
    293         query = """
    294             INSERT INTO document_term (term, url, tf, positional_postings)
    295             VALUES %s
    296             ON CONFLICT (term, url) DO UPDATE SET positional_postings = EXCLUDED.positional_postings
    297         """
    298 
    299         query_url_term = """
    300             INSERT INTO url_term (term, url, tf, positional_postings)
    301             VALUES %s
    302             ON CONFLICT (term, url) DO UPDATE SET positional_postings = EXCLUDED.positional_postings
    303             """
    304 
    305         query_title_term = """
    306             INSERT INTO title_term (term, url, tf, positional_postings)
    307             VALUES %s
    308             ON CONFLICT (term, url) DO UPDATE SET positional_postings = EXCLUDED.positional_postings
    309             """
    310 
    311         query_page = """
    312             INSERT INTO page (url, language, url_term_count, term_count)
    313             VALUES %s
    314             ON CONFLICT (url) DO UPDATE
    315             SET
    316               last_updated_timestamp = NOW(),
    317               term_count = EXCLUDED.term_count,
    318               url_term_count = EXCLUDED.url_term_count,
    319               language = EXCLUDED.language;
    320         """
    321 
    322         term_inserts = []
    323         url_term_inserts = []
    324         title_term_inserts = []
    325         page_inserts = []
    326 
    327         for filepath in remaining:
    328             url = filepath_lookup[filepath]
    329             
    330             doc_terms_data = document_positional_postings[filepath]
    331             document_term_count = sum(len(positions) for positions in doc_terms_data.values())
    332             
    333             for term, positional_postings in doc_terms_data.items():
    334                 tf = len(positional_postings) / document_term_count
    335                 term_inserts.append((term, url, tf, positional_postings))
    336                 count += 1
    337             
    338             url_terms_data = url_word_postings[filepath]
    339             title_terms_data = title_word_postings[filepath]
    340             title_term_count = sum(len(positions) for positions in title_terms_data.values())
    341             url_term_count = sum(len(positions) for positions in url_terms_data.values())
    342 
    343             for term, positional_postings in title_terms_data.items():
    344                 tf = len(positional_postings) / title_term_count if title_term_count > 0 else 0
    345                 title_term_inserts.append((term, url, tf, positional_postings))
    346             
    347             for term, positional_postings in url_terms_data.items():
    348                 tf = len(positional_postings) / url_term_count if url_term_count > 0 else 0
    349                 url_term_inserts.append((term, url, tf, positional_postings))
    350             
    351             page_inserts.append((url, 'en', url_term_count, document_term_count))
    352 
    353         cursor = conn.cursor()
    354 
    355         execute_values(
    356             cursor,
    357             query_title_term,
    358             title_term_inserts,
    359             page_size=100000
    360         )
    361 
    362         execute_values(
    363             cursor,
    364             query_url_term,
    365             url_term_inserts,
    366             page_size=100000
    367         )
    368 
    369         execute_values(
    370             cursor,
    371             query,
    372             term_inserts,
    373             page_size=100000 # TODO: Optimize this. It's kinda quick, but also random, the number that is.
    374         )
    375         execute_values(
    376             cursor,
    377             query_page,
    378             page_inserts,
    379             page_size=100000
    380         )
    381 
    382         cursor.close()
    383         conn.commit()
    384         print(f'Inserted {count} terms with positional postings')
    385         
    386     conn.close()