commit 2fd9d19d4eb4a93e01e89647fea0cd40de0850e3
parent 7067745bb56e6e07eabb2574447937fbc0ec3779
Author: Andrew Laack <andrew.laack@imbue.com>
Date: Mon, 5 Jan 2026 21:05:35 -0600
Made a bunch of changes to bootstrap tfidf for full text and url search.
Diffstat:
14 files changed, 872 insertions(+), 29 deletions(-)
diff --git a/Makefile b/Makefile
@@ -1,2 +1,3 @@
clean:
- python3 crawling/clean.py
+ python3 -m crawling.clean
+ python3 -m indexing.clean
diff --git a/TODO.md b/TODO.md
@@ -0,0 +1,21 @@
+1. crawler (A crawler bg job is responsible for cleaning up old files, even if they exist in the indexing_queue, crawl_cache is ephemeral)
+ - pulls link from queue
+ - pull data
+ - saves to disk
+ - extracts links to crawl next
+ - adds file to indexing queue
+2. indexer (NOTE: The indexer never changes crawl_cache data, even after using all of the information it needs)
+ - if the file exists on disk it loads it into memory / copies it to a persistent location for manipulation
+ - parses data and performs calculations for scoring
+ - deletes existing indexed data for the current url from the db and in the same transaction
+ writes all necessary data to support search engine functionality atomically to the db including snippets
+ (NOTE: The old data should still be deleted in certain circumstances even if there is nothing to insert)
+3. search engine
+ - ranks search results, uses indexed snippets and scoring.
+
+---
+
+- How to do incremental updates of idf over time?
+ - should this just be derived by querying across all terms on some timescale?
+ - that would be easier since all the data is known, and really isn't that bad because it is more of a suggestion than anything else
+ as long as it updates over time, that should suffice (take advantage of new information)
diff --git a/crawling/__init__.py b/crawling/__init__.py
diff --git a/crawling/clean.py b/crawling/clean.py
@@ -1,11 +1,11 @@
-from spider import get_crawling_db_connection
-import os
-from spider import get_indexing_db_connection
+from crawling.spider import get_crawling_db_connection
+import shutil
+from crawling.spider import get_indexing_db_connection
if __name__ == "__main__":
try:
- os.rmdir("/var/lib/search/crawl_cache")
+ shutil.rmtree("/var/lib/search/crawl_cache")
except FileNotFoundError as e:
print("Crawl cache directory doesn't exist, continuing with cleanup")
crawling_conn = get_crawling_db_connection()
diff --git a/crawling/clean_cache.py b/crawling/clean_cache.py
@@ -0,0 +1,58 @@
+from crawling.spider import get_indexing_db_connection
+import os
+import glob
+from crawling.constants import CACHE_DIRECTORY
+import time
+
+def get_queued_cache_files(conn):
+ cursor = conn.cursor()
+ query = """
+ SELECT filepath FROM indexing_queue
+ """
+ cursor.execute(query)
+ results = [res[0] for res in cursor.fetchall()]
+ return results
+
+def get_all_cache_files():
+ document_directory = CACHE_DIRECTORY
+ filepaths = []
+ for filepath in glob.iglob(document_directory + '**/**', recursive=True):
+ if os.path.isfile(filepath):
+ filepaths.append(filepath)
+ return filepaths
+
+def get_old_files(filepaths):
+
+ # This is ample time to ensure the files are not in the process of being inserted into the db.
+ one_hour_ago = time.time() - 3600
+ old_files = []
+ for filepath in filepaths:
+ if os.stat(filepath).st_ctime < one_hour_ago:
+ old_files.append(filepath)
+ return old_files
+
+
+def clean_cache():
+ conn = get_indexing_db_connection()
+ all_cache_files = get_all_cache_files()
+ queued_cache_files = set(get_queued_cache_files(conn))
+ unqueued_files = []
+
+ for filepath in all_cache_files:
+ if filepath not in queued_cache_files:
+ unqueued_files.append(filepath)
+
+ to_delete = get_old_files(unqueued_files)
+ print(f"Deleting {len(to_delete)} old files.")
+
+ deleted = 0
+ for filepath in to_delete:
+ try:
+ os.remove(filepath)
+ deleted += 1
+ except:
+ print(f"Failed to delete {filepath}, continuing on")
+ print(f"Deleted {deleted} old files.")
+
+if __name__ == "__main__":
+ clean_cache()
diff --git a/crawling/spider.py b/crawling/spider.py
@@ -1,3 +1,21 @@
+# GUARANTEE:
+ # the crawler will crawl every url in the crawling queue at least once
+ # can be more in cases of crashing
+ # the crawler bg job will not delete files that are in the indexing queue, but will delete all orphans
+ # When the bg job runs, it will find old files (based on mtime), and then
+ # check if they are queued for indexing, if they are, they persist. if they arent,
+ # they are deleted.
+ # mtimes are useful here because there is a non-zero amount of time between fs write and db queue insertion.
+ # This guarantees fs usage won't ballon for no reason over time unless the indexer
+ # queue is growing.
+ # we assume the indexer will delete records from the queue to support this, but not files on the fs
+ # every url that is crawled will make its way into the indexer queue with a lifetime
+ # associated with the lifetime of said record
+
+# TODO: To support the above guarantee, we must implement logic to unlock records that
+# were set as being processed in the past, but the execution of said crawler crashed
+# before deleting the record from the queue.
+
import urllib.robotparser
from urllib.parse import urlparse
import urllib.request
@@ -7,13 +25,13 @@ from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup
import sys
import psycopg2
-from constants import CRAWLING_DB
-from constants import INDEXING_DB
-from constants import CACHE_DIRECTORY
-from constants import DB_PASSWORD_ENV_VAR
-from constants import DB_USER_ENV_VAR
-from constants import DB_HOST
-from constants import DB_PORT
+from crawling.constants import CRAWLING_DB
+from crawling.constants import INDEXING_DB
+from crawling.constants import CACHE_DIRECTORY
+from crawling.constants import DB_PASSWORD_ENV_VAR
+from crawling.constants import DB_USER_ENV_VAR
+from crawling.constants import DB_HOST
+from crawling.constants import DB_PORT
import urllib
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed
@@ -25,7 +43,8 @@ MAX_URLS_PER_SITE = 100
# number of concurrent workers for thread pool executor
MAX_WORKERS = 50
-# TODO: Only queue if we haven't already indexed it recently.
+# TODO: Only queue if we haven't already indexed it recently, or some other logic here.
+
def queue_urls_for_crawling(conn, urls, prior_depth):
current_depth = prior_depth + 1
for url in urls:
@@ -55,7 +74,7 @@ def crawl_url(url, filepath):
headers = {
- 'User-Agent': user_agent,
+ 'User-agent': user_agent,
}
try:
@@ -100,6 +119,11 @@ def crawl_url(url, filepath):
if url_without_fragment == current_url_without_fragment:
continue
+
+ # TODO: Rank domains / subdomains or something like that
+ # there are some really long domains that seem to trap my crawlers.
+ if len(absolute_url) > 50:
+ continue
if parsed.scheme in ('http', 'https'):
if len(links) < MAX_URLS_PER_SITE:
@@ -139,30 +163,36 @@ def move_url_to_indexing_if_success(conn, url, filepath, success, conn_indexing)
cursor.execute(delete_query, (url,))
cursor.close()
+
if success:
- assert os.path.isfile(filepath)
+ assert os.path.isfile(filepath) # If this is not true, this file was created more than an hour ago and
+ # the cleanup job cleared it. Either that, or the file was never created.
+ # This is an assertion because the hour time was chosen to give ample time
+ # between file creation and insertion into the indexing_queue.
+
+ # The only way the filepath doesn't exist, but is still inserted into the db is if the file was created
+ # more than an hour ago, but was deleted by the background job between the above assertion and commiting
+ # this transaction. This would be exceedingly unlikely, so much so that it should be considered impossible
+ # as it would be insane that the file was created on the filessytem and a few operations took exactly an amount
+ # of time > 1 hour and identical to when the timer runs.
cursor_indexing = conn_indexing.cursor()
- cursor_indexing.execute("SELECT filepath FROM indexing_queue WHERE url = %s", (url,))
- existing = cursor_indexing.fetchone()
- old_filepath = existing[0] if existing else None
upsert_query = """
INSERT INTO indexing_queue (url, filepath) VALUES (%s, %s)
- ON CONFLICT (url) DO UPDATE SET filepath = EXCLUDED.filepath
"""
cursor_indexing.execute(upsert_query, (url, filepath))
- if old_filepath and os.path.isfile(old_filepath):
- os.remove(old_filepath)
cursor_indexing.close()
conn_indexing.commit()
conn.commit()
+
def get_k_urls_with_depth_from_db(conn, k):
cursor = conn.cursor()
# TODO: Improve this to make use of all attributes we have.
+
select_top_priority_elements_query = """
UPDATE queued_site
SET status = 'processing', claimed_at = NOW()
@@ -170,8 +200,9 @@ def get_k_urls_with_depth_from_db(conn, k):
SELECT url
FROM queued_site
WHERE status = 'pending'
- ORDER BY depth DESC, creation_timestamp ASC
+ ORDER BY depth ASC, creation_timestamp ASC
LIMIT %s
+ FOR UPDATE SKIP LOCKED
)
RETURNING url, depth;
"""
@@ -191,9 +222,20 @@ def ensure_indexing_queue_and_get_connection(db_name, db_user, db_password, db_h
port=db_port
)
cursor = conn.cursor()
+
+ # These statuses are **only** for the indexer, not for the crawler.
+ # This is the same pattern the crawler uses to ensure it only dispatches one indexer per record
+
+ # Also, the indexing queue should only be added to by the crawler. If something must be re-indexed,
+ # it should be added to the crawler queue and go through the entire process again.
+
create_table_query = """
CREATE TABLE IF NOT EXISTS indexing_queue (
- url TEXT PRIMARY KEY,
+ id SERIAL PRIMARY KEY,
+ url TEXT NOT NULL,
+ status TEXT DEFAULT 'pending' NOT NULL
+ CHECK (status IN ('pending', 'processing')),
+ claimed_at TIMESTAMPTZ,
filepath TEXT NOT NULL,
creation_timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP NOT NULL
);
@@ -237,6 +279,132 @@ def ensure_queued_sites_and_get_connection(db_name, db_user, db_password, db_hos
cursor.close()
return conn
+
+def ensure_page_table(conn):
+ cursor = conn.cursor()
+
+ create_table_query = """
+ CREATE TABLE IF NOT EXISTS page (
+ url TEXT PRIMARY KEY,
+ language TEXT NOT NULL,
+ term_count INTEGER NOT NULL CHECK (term_count >= 0),
+ url_term_count INTEGER NOT NULL CHECK (url_term_count >= 0),
+ last_updated_timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP NOT NULL
+ );
+ """
+
+ cursor.execute(create_table_query)
+ conn.commit()
+ cursor.close()
+ return conn
+
+
+def ensure_collection_table(conn):
+ cursor = conn.cursor()
+
+ # TODO: This should probably be a materialized view
+
+ # TODO: Average should probably be a float
+
+ create_table_query = """
+ CREATE TABLE IF NOT EXISTS collection (
+ id INTEGER PRIMARY KEY DEFAULT 0,
+ num_documents INTEGER NOT NULL CHECK (num_documents >= 0),
+ average_document_length INTEGER NOT NULL CHECK (average_document_length >= 0),
+ average_url_length INTEGER NOT NULL CHECK (average_url_length >= 0)
+ );
+ """
+
+ cursor.execute(create_table_query)
+ conn.commit()
+ cursor.close()
+ return conn
+
+def ensure_terms_table(conn):
+ cursor = conn.cursor()
+
+ # TODO: This should probably be a materialized view
+
+ create_table_query = """
+ CREATE TABLE IF NOT EXISTS term (
+ name TEXT PRIMARY KEY,
+ document_count INTEGER NOT NULL CHECK (document_count >= 0)
+ );
+ """
+
+ cursor.execute(create_table_query)
+ conn.commit()
+ cursor.close()
+ return conn
+
+
+def ensure_title_term_table(conn):
+ cursor = conn.cursor()
+
+ # inverse index
+ # TODO: Add cleanup for this (in clean.py)
+
+ create_table_query = """
+ CREATE TABLE IF NOT EXISTS title_term (
+ term TEXT,
+ url TEXT NOT NULL,
+ tf REAL NOT NULL,
+ positional_postings INTEGER [],
+ PRIMARY KEY(term, url)
+ );
+ CREATE INDEX IF NOT EXISTS idx_title_term ON title_term (term);
+ """
+
+ cursor.execute(create_table_query)
+ conn.commit()
+ cursor.close()
+ return conn
+
+def ensure_url_term_table(conn):
+ cursor = conn.cursor()
+
+ # inverse index
+ # TODO: Add cleanup for this (in clean.py)
+
+ create_table_query = """
+ CREATE TABLE IF NOT EXISTS url_term (
+ term TEXT,
+ url TEXT NOT NULL,
+ tf REAL NOT NULL,
+ positional_postings INTEGER [],
+ PRIMARY KEY(term, url)
+ );
+ CREATE INDEX IF NOT EXISTS idx_url_term ON url_term (term);
+ """
+
+ cursor.execute(create_table_query)
+ conn.commit()
+ cursor.close()
+ return conn
+
+
+def ensure_document_term_table(conn):
+ cursor = conn.cursor()
+
+ # inverse index
+ # TODO: Add cleanup for this (in clean.py)
+
+ create_table_query = """
+ CREATE TABLE IF NOT EXISTS document_term (
+ term TEXT,
+ url TEXT NOT NULL,
+ tf REAL NOT NULL,
+ positional_postings INTEGER [],
+ PRIMARY KEY(term, url)
+ );
+ CREATE INDEX IF NOT EXISTS idx_document_term ON document_term (term);
+ """
+
+ cursor.execute(create_table_query)
+ conn.commit()
+ cursor.close()
+ return conn
+
def insert_seed_file(filepath, conn):
with open(filepath, 'r') as f:
urls = f.readlines()
@@ -257,9 +425,16 @@ def get_indexing_db_connection():
password = os.getenv(DB_PASSWORD_ENV_VAR)
username = os.getenv(DB_USER_ENV_VAR)
conn_indexing_queue = ensure_indexing_queue_and_get_connection(INDEXING_DB, username, password, DB_HOST, DB_PORT)
+ ensure_terms_table(conn_indexing_queue)
+ ensure_url_term_table(conn_indexing_queue)
+ ensure_title_term_table(conn_indexing_queue)
+ ensure_document_term_table(conn_indexing_queue)
+ ensure_collection_table(conn_indexing_queue)
+ ensure_page_table(conn_indexing_queue)
return conn_indexing_queue
if __name__ == "__main__":
+
if not os.path.exists(CACHE_DIRECTORY):
os.makedirs(CACHE_DIRECTORY)
@@ -282,6 +457,7 @@ if __name__ == "__main__":
for future in as_completed(futures):
url, success, filepath, links = future.result()
# success means the html was written to the filepath
+ # if not success, just delete from the db
move_url_to_indexing_if_success(conn, url, filepath, success, conn_indexing_queue)
current_urls_depth = urls_dict[url]
queue_urls_for_crawling(conn, links, current_urls_depth)
diff --git a/indexing/README.md b/indexing/README.md
@@ -0,0 +1,48 @@
+# Indexing
+
+The indexer reads from the indexing queue and indexes results in said queue.
+
+## Guarantees
+
+- Every filepath / url pair added to the indexing queue will start the indexing process at least once
+ - NOTE: There is an early pruning process that may result in it not appearing in searches.
+ - This removes non-english documents and documents that are short after reading the plaintext.
+ - This is achieved with the pending status and pending status unlocking based on time passed
+ - It is uncertain if we will guarantee correct ordering, because there may be additional priorities
+ so if the same url is added with different filepaths it shouldn't be assumed which one will be indexed.
+- All information required for the search engine to function will be stored in the indexing database
+ - No necessary data, including snippets, will be stored on the filesystem
+ - There is still consideration for where things like pagerank will live, so those calculations may not be
+ part of indexing, but the information required to do that will reside in the indexing db.
+- Old indexed data will be removed from the database in the same transaction as new data being added for a given page
+- Old indexed data may be removed at any time, for any reason.
+
+## Metrics
+
+- BM25
+ - Calculation for query term q_i:
+ - IDF(q_i) * ((occurences in document * (k_1 + 1)) / (occurences in document + k_1) * (1 - b + b * (terms in d / average document length)))
+ - We note k_1 and b are hyperparameters often:
+ - k_1 \in [1.2, 2.0]
+ - b = 0.75
+ - Given the above, we must be able to determine how many instances of the term there are in a document, average document length, and the total number of terms in the document.
+ - We also need to idf which requires the total number of documents and the number of documents containing the term
+- Page rankings
+- Domain rankings
+
+## Schema
+
+(Consider how to rank domains / pages by quality beyond bm25 and such)
+(consider how to include serach information like paragraphs, titles, etc.)
+
+- indexing_queue(id, url, status, claimed_at, filepath, creation_timestamp)
+- page(url, language, url_term_count, term_count, last_updated_timestamp) -- currently only supports english
+- document_term(term, url, tf, positional_postings)
+- url_term(term, url, tf, positional_postings)
+- title_term(term, url, tf, positional_postings)
+- link(source, destination)
+- term(name, document_count) -- should this be a computed value instead of document_count?
+- collection(num_documents, average_document_length, average_url_length)
+
+
+TODO: Add tf to document_term table.
diff --git a/indexing/__init__.py b/indexing/__init__.py
diff --git a/indexing/clean.py b/indexing/clean.py
@@ -0,0 +1,17 @@
+from crawling.spider import get_indexing_db_connection
+
+
+if __name__ == "__main__":
+ indexing_conn = get_indexing_db_connection()
+ indexing_cur = indexing_conn.cursor()
+
+ indexing_cur.execute("""
+ DROP TABLE term;
+ """)
+ indexing_cur.close()
+ indexing_conn.commit()
+ indexing_conn.close()
+
+ print("Indexing datbase cleaned")
+ # We don't want to clear the queue here, run make clean if you want everything cleaned, this is just for
+ # indexed information.
diff --git a/indexing/page_parsing.py b/indexing/page_parsing.py
@@ -0,0 +1,348 @@
+from psycopg2.extras import execute_values
+import tldextract
+from urllib.parse import urlparse
+import time
+import sys
+from indexing.utils import get_plaintext, get_words, get_plaintext_words
+import gc
+from concurrent.futures import ThreadPoolExecutor, as_completed
+import random
+from indexing.utils import detect_language
+import re
+import tqdm
+from crawling.spider import get_indexing_db_connection
+
+MAX_WORKERS = 5
+BATCH_SIZE = 1000
+
+def get_term_list(filepath):
+
+ plaintext = get_plaintext(filepath)
+ cleaned = re.sub(r'[^a-zA-Z0-9\s]', ' ', plaintext).lower()
+ terms = cleaned.split()
+
+ final_terms = []
+ for term in terms:
+ term = term.strip()
+ if len(term) <= 50 and len(term) > 0:
+ final_terms.append(term)
+ return final_terms
+
+def remove_from_queue(conn, filepath):
+ cursor = conn.cursor()
+ # Delete by filepath because the same url can be queued multiple times.
+ query = """
+ DELETE FROM indexing_queue
+ WHERE filepath = %s
+ """
+ cursor.execute(query, (filepath,))
+ cursor.close()
+ conn.commit()
+
+# No need to remove existing ones, that will be handled gracefully.
+def add_terms(conn, terms):
+ cursor = conn.cursor()
+ query = """
+ INSERT INTO term (name)
+ VALUES (%s) ON CONFLICT (name) DO NOTHING
+ """
+ cursor.executemany(
+ query,
+ [(name,) for name in terms]
+ )
+ cursor.close()
+ conn.commit()
+
+def get_terms(conn):
+ cursor = conn.cursor()
+ query = """
+ SELECT name from term
+ """
+ cursor.execute(query)
+ results = [res[0] for res in cursor.fetchall()]
+ return results
+
+
+def process_file(filepath):
+ language = detect_language(filepath)
+ if language != 'en':
+ return filepath, True
+
+ text = get_plaintext(filepath)
+
+ if len(text) < 750:
+ return filepath, True
+ return filepath, False
+
+# TODO: Optimize this. we are repeatedly getting the plaintext here (if the lang isn't in the html, and for length.)
+
+def prune_documents(conn, filepaths, max_workers):
+
+ remaining = []
+
+ with ThreadPoolExecutor(max_workers=max_workers) as executor:
+ futures = {executor.submit(process_file, fp): fp for fp in filepaths}
+
+ for future in tqdm.tqdm(as_completed(futures), total=len(filepaths)):
+ filepath, should_delete = future.result()
+ if should_delete:
+ remove_from_queue(conn, filepath)
+ else:
+ remaining.append(filepath)
+ del future
+ return remaining
+
+# Use the existing data to derive term document_count per term
+def full_term_update(conn, term):
+
+ query = """
+ SELECT COUNT(url) FROM document_term WHERE term = %s;
+ """
+
+ cursor = conn.cursor()
+ cursor.execute(query, (term,))
+
+ count = cursor.fetchall()[0][0]
+
+
+ update_query = """
+ INSERT INTO term (name, document_count)
+ VALUES (%s, %s)
+ ON CONFLICT (name) DO UPDATE SET document_count = EXCLUDED.document_count
+ """
+
+ cursor.execute(update_query, (term, count))
+ cursor.close()
+ conn.commit()
+
+
+def get_k_documents(conn, k):
+ query = """
+ UPDATE indexing_queue
+ SET status = 'processing', claimed_at = NOW()
+ WHERE id IN(
+ SELECT id FROM indexing_queue
+ WHERE status = 'pending'
+ ORDER BY creation_timestamp asc
+ LIMIT %s
+ FOR UPDATE SKIP LOCKED
+ )
+ RETURNING url, filepath;
+ """
+
+ cursor = conn.cursor()
+ cursor.execute(query, (k,))
+ results = cursor.fetchall()
+ cursor.close()
+
+ urls = [res[0] for res in results]
+ filepaths = [res[1] for res in results]
+ return urls, filepaths
+
+# TODO: Should I apply stemming to this?
+
+def get_url_postings(url):
+ ext = tldextract.extract(url)
+ parts = [ext.subdomain, ext.domain]
+ parts = [p for p in parts if p and p != 'www']
+ path = urlparse(url).path
+ combined = '.'.join(parts) + path
+ words = re.sub(r'[^a-zA-Z0-9\s]', ' ', combined).lower().split()
+
+ postings = {}
+ for position, word in enumerate(words):
+ if word not in postings:
+ postings[word] = []
+ postings[word].append(position)
+ return postings
+
+def update_collection_metrics(conn):
+
+ # TODO: This should lock the db for consistency of terms and document count.
+ # Ehh, maybe close enough is good enough...
+
+ query = """
+ SELECT DISTINCT term FROM document_term;
+ """
+ cursor = conn.cursor()
+ cursor.execute(query)
+
+ distinct_terms = [res[0] for res in cursor.fetchall()]
+ print('Fetched distinct terms')
+
+ for term in distinct_terms:
+ full_term_update(conn, term)
+
+ query = """
+ SELECT count(url) FROM page;
+ """
+
+ cursor = conn.cursor()
+ cursor.execute(query)
+ distinct_urls = cursor.fetchall()[0][0]
+ cursor.close()
+
+ print('Fetched page count')
+
+ query = """
+ SELECT AVG(term_count) FROM page;
+ """
+
+ cursor = conn.cursor()
+ cursor.execute(query)
+ average_document_length = cursor.fetchall()[0][0]
+
+ cursor.close()
+
+ print('Fetched average document length')
+
+ query = """
+ SELECT AVG(url_term_count) FROM page;
+ """
+
+ cursor = conn.cursor()
+ cursor.execute(query)
+ average_url_length = cursor.fetchall()[0][0]
+ cursor.close()
+
+ query = """
+ INSERT INTO collection (num_documents, average_document_length, average_url_length)
+ VALUES (%s, %s, %s)
+ ON CONFLICT (id) DO UPDATE
+ SET num_documents = EXCLUDED.num_documents, average_document_length = EXCLUDED.average_document_length,
+ average_url_length = EXCLUDED.average_url_length
+ """
+
+ cursor = conn.cursor()
+ cursor.execute(query, (distinct_urls, average_document_length, average_url_length))
+ cursor.close()
+ conn.commit()
+ print('Inserted collection record')
+
+
+
+if __name__ == "__main__":
+
+
+
+ # {filepath : {term1: [position1, position2], term2: [position1]}}
+
+
+ conn = get_indexing_db_connection()
+ if len(sys.argv) > 1 and sys.argv[1] == "update":
+ update_collection_metrics(conn)
+ print('Updated collection metrics.')
+ exit()
+
+ # TODO: Add all the deletion logic. Still wip so I don't want to mess up my queue yet.
+ # TODO: Add unlocking for failed attempts (processing + timepassed)
+
+ while True:
+
+ urls, filepaths = get_k_documents(conn, BATCH_SIZE)
+
+ # TODO: Delete existing records during same write transaction.
+
+ if len(urls) == 0:
+ print('There are no more queued pages! Exiting')
+ exit()
+
+ filepath_lookup = {}
+
+ for i in range(len(filepaths)):
+ filepath_lookup[filepaths[i]] = urls[i]
+
+
+ remaining = filepaths
+ print("Pruning documents.")
+ before = len(filepaths)
+ remaining = prune_documents(conn, filepaths, MAX_WORKERS)
+ after = len(remaining)
+ print("Documents pruned.")
+ print(f'{before - after} documents removed')
+
+ document_positional_postings = {}
+ url_word_postings = {}
+
+ for filepath in remaining:
+ document_positional_postings[filepath] = get_plaintext_words(filepath)
+ url_word_postings[filepath] = get_url_postings(filepath_lookup[filepath])
+
+ count = 0
+
+ # TODO: On conflict handling makes this shit slow(er).
+
+ query = """
+ INSERT INTO document_term (term, url, tf, positional_postings)
+ VALUES %s
+ ON CONFLICT (term, url) DO UPDATE SET positional_postings = EXCLUDED.positional_postings
+ """
+
+ query_url_term = """
+ INSERT INTO url_term (term, url, tf, positional_postings)
+ VALUES %s
+ ON CONFLICT (term, url) DO UPDATE SET positional_postings = EXCLUDED.positional_postings
+ """
+
+ query_page = """
+ INSERT INTO page (url, language, url_term_count, term_count)
+ VALUES %s
+ ON CONFLICT (url) DO UPDATE
+ SET
+ last_updated_timestamp = NOW(),
+ term_count = EXCLUDED.term_count,
+ url_term_count = EXCLUDED.url_term_count,
+ language = EXCLUDED.language;
+ """
+
+ term_inserts = []
+ url_term_inserts = []
+ page_inserts = []
+
+ for filepath in remaining:
+ url = filepath_lookup[filepath]
+
+ doc_terms_data = document_positional_postings[filepath]
+ document_term_count = sum(len(positions) for positions in doc_terms_data.values())
+
+ for term, positional_postings in doc_terms_data.items():
+ tf = len(positional_postings) / document_term_count
+ term_inserts.append((term, url, tf, positional_postings))
+ count += 1
+
+ url_terms_data = url_word_postings[filepath]
+ url_term_count = sum(len(positions) for positions in url_terms_data.values())
+
+ for term, positional_postings in url_terms_data.items():
+ tf = len(positional_postings) / url_term_count if url_term_count > 0 else 0
+ url_term_inserts.append((term, url, tf, positional_postings))
+
+ page_inserts.append((url, 'en', url_term_count, document_term_count))
+
+ cursor = conn.cursor()
+
+ execute_values(
+ cursor,
+ query_url_term,
+ url_term_inserts,
+ page_size=100000
+ )
+
+ execute_values(
+ cursor,
+ query,
+ term_inserts,
+ page_size=100000 # TODO: Optimize this. It's kinda quick, but also random.
+ )
+ execute_values(
+ cursor,
+ query_page,
+ page_inserts,
+ page_size=100000
+ )
+
+ cursor.close()
+ conn.commit()
+ print(f'Inserted {count} terms with positional postings')
+
+ conn.close()
diff --git a/indexing/utils.py b/indexing/utils.py
@@ -0,0 +1,79 @@
+from langdetect import detect
+from bs4 import BeautifulSoup
+import re
+from bs4 import BeautifulSoup
+from nltk.stem import PorterStemmer
+
+ps = PorterStemmer()
+
+# Only include important information that is formatted well.
+# I want lots of quality information with minimal overhead.
+
+def get_plaintext(filepath):
+
+ with open(filepath, 'r') as f:
+ body = f.read()
+ soup = BeautifulSoup(body, 'html.parser')
+
+ for selector in ['nav', 'footer', 'header', '[role="navigation"]',
+ '.flash-error', '.js-header-wrapper', '.footer']:
+ for tag in soup.select(selector):
+ tag.decompose()
+
+ tags_to_extract = soup.find_all(['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li', 'td', 'blockquote', 'code'])
+ return u" ".join(tag.get_text(separator=" ", strip=True) for tag in tags_to_extract)
+
+def get_plaintext_words(filepath):
+ plaintext = get_plaintext(filepath)
+ return get_words(plaintext)
+
+def get_words(text):
+ current = {}
+ lines = text
+
+ # There are special characters with diacritics we probably still want.
+ # only word characters remain
+ # Replaces symbols and such
+
+ lines = re.sub(r"[^\w]+", " ", lines, flags=re.UNICODE)
+ lines = lines.split(' ')
+ for i in range(0, len(lines)):
+ if lines[i] != '':
+ current_word = ps.stem(lines[i].lower())
+ if current_word not in current:
+ current[current_word] = [i]
+ else:
+ current[current_word].append(i)
+ return current
+
+def get_html_language(filepath):
+ try:
+ with open(filepath, 'r', encoding='utf-8') as f:
+ soup = BeautifulSoup(f, 'html.parser')
+ html_tag = soup.find('html')
+ if html_tag and html_tag.get('lang'):
+ return html_tag.get('lang').split('-')[0].lower()
+ except OSError:
+ pass
+
+def detect_language(filepath):
+ result = get_html_language(filepath)
+ if result is not None:
+ return result
+ try:
+
+ # it might be worth considering more stringent criteria.
+ # Sometimes a site will be detected as english, but this is only because of other
+ # on screen artifacts (menus and such).
+
+ # TODO: Should this even exist? If a site doesn't have the path, do we want it?
+
+ detected = detect(get_plaintext(filepath))
+ return detected
+ except:
+
+ # Failed to detect
+ # This can happen if the plaintext is empty or a few other cases.
+ # Should be rare for quality sites.
+
+ return "zz"
diff --git a/run.sh b/run.sh
@@ -0,0 +1,11 @@
+python3 crawling/spider.py &
+python3 crawling/spider.py &
+python3 crawling/spider.py &
+python3 crawling/spider.py &
+python3 crawling/spider.py &
+python3 crawling/spider.py &
+python3 crawling/spider.py &
+python3 crawling/spider.py &
+python3 crawling/spider.py &
+python3 crawling/spider.py &
+python3 crawling/spider.py
diff --git a/search/query.py b/search/query.py
@@ -0,0 +1,88 @@
+from crawling.spider import get_indexing_db_connection
+import math
+import sys
+from indexing.page_parsing import get_words
+
+def get_idf(conn, term):
+ cursor = conn.cursor()
+
+ query = """
+ SELECT document_count FROM term
+ WHERE name = %s
+ """
+ cursor.execute(query, (term,))
+
+ result = cursor.fetchall()
+ cursor.close()
+
+
+ cursor = conn.cursor()
+
+ query = """
+ SELECT num_documents FROM collection
+ """
+ cursor.execute(query)
+ total_documents = cursor.fetchall()[0][0]
+ cursor.close()
+
+ documents_with_term = 0
+ if len(result) != 0:
+ documents_with_term = result[0][0]
+
+ if documents_with_term == 0:
+ return 0
+
+ idf = math.log(total_documents / documents_with_term)
+ return idf
+
+def get_url_tfs(conn, term):
+ cursor = conn.cursor()
+ query = """
+ SELECT url, tf FROM url_term
+ WHERE term = %s
+ """
+ cursor.execute(query, (term,))
+ result = cursor.fetchall()
+ result = {res[0]: res[1] for res in result}
+ cursor.close()
+ return result
+
+def get_tfs(conn, term):
+ cursor = conn.cursor()
+ query = """
+ SELECT url, tf FROM document_term
+ WHERE term = %s
+ """
+ cursor.execute(query, (term,))
+ result = cursor.fetchall()
+
+ result = {res[0] : res[1] for res in result}
+
+ cursor.close()
+ return result
+
+if __name__ == "__main__":
+ user_input = sys.argv[1]
+
+ user_input = list(get_words(user_input).keys())
+
+ conn = get_indexing_db_connection()
+
+ tf_idf = {}
+
+ for term_o in user_input:
+ idf = get_idf(conn, term_o)
+
+ tfs = get_tfs(conn, term_o)
+ for url in tfs:
+ tf_idf[url] = tf_idf.get(url, 0) + tfs[url] * idf
+
+ url_tfs = get_url_tfs(conn, term_o)
+ for url in url_tfs:
+ tf_idf[url] = tf_idf.get(url, 0) + 2 * url_tfs[url] * idf
+
+ k = 10
+
+ sorted_results = sorted(tf_idf.items(), key=lambda x: x[1], reverse=True)[:k]
+ for url, score in sorted_results:
+ print(f"{score:.4f} {url}")
diff --git a/seeds/code.txt b/seeds/code.txt
@@ -1,7 +1,3 @@
-https://www.die.net/
+https://landchad.net/
+https://die.net
https://en.wikipedia.org/wiki/List_of_programming_languages
-https://ziglang.org/
-https://rust-lang.org/
-https://www.python.org/
-https://cppreference.com/
-https://cplusplus.com/reference/