commit 7844a7045ce08b808a43213e2f2a24588b3109cd
parent 26aaac222ee693bd1d6ec1306ec135468ddb3deb
Author: Andrew Laack <andrew.laack@imbue.com>
Date: Thu, 1 Jan 2026 15:56:18 -0600
Reworked spider to queue links
Diffstat:
4 files changed, 129 insertions(+), 46 deletions(-)
diff --git a/TODO.md b/TODO.md
@@ -27,3 +27,20 @@
- currently just based on length, but I could see information content being useful too
- since we have words, the information content could be computed based on word frequency
- actually, that'd be a bit different because we don't have word frequency globally
+
+
+---
+
+- url lookup table
+ - fixes some of the memory issues
+- ensure pruning prior to writing
+ - should there be a penalty for this? probably
+- Make everything incremental
+ - this is getting too slow to do tf / idf on everything...
+ - I probably don't want to do indexing and crawling at the same time for perf sake so there should be a field added to the site table for 'indexed'.
+
+- How to do eviction of old urls / sites that are queried?
+ - this might be early, but this can ballon quickly (100s of gb added every few days)
+
+- forward / backlink calculation
+ - maybe after crawling as these are derived / indexing type values
diff --git a/collection/spider.py b/collection/spider.py
@@ -1,5 +1,15 @@
+# TODO: Create another database (it should be unique in case it needs to be flushed, it is basically tmp after all)
+# that is a priority queue for links to be parsed. This requires consideration for how we determine what should be
+# searched and when it should be searched.
+
+# MVP: Create db to store links to follow. Add to the bottom and take from the top.
+
+# Q: How should we handle the query timelines?
+# A: We have a day check for selections, but I don't like this. We should instead be
+# using a queue to remove this additional logic. We can reuse that to populate the queue, but that is
+# unrelated.
+
import requests
-import time
import os
import datetime
import uuid
@@ -9,6 +19,8 @@ from bs4 import BeautifulSoup
import sys
import base64
import sqlite3
+import random
+from prune import process_file
# Layout:
# - sites
@@ -22,12 +34,17 @@ import sqlite3
# tables:
# site
# url, filepath, date
+ # tf
+ # TODO
+ # - urls.db
+ # - this is only for url lookups. this table can be considered ephemeral
+ # tables:
+ # url, priority, (possible add distance from authority here as well, uncertain)
# bytes
MAX_SIZE = 2_000_000
MAX_WORKERS = 5
-MAX_URLS_PER_LEVEL = 100_000
-MAX_URLS_PER_SITE = 500
+MAX_URLS_PER_SITE = 100
def url_to_filename(url):
return base64.urlsafe_b64encode(url.encode()).decode() + ".html"
@@ -36,7 +53,7 @@ def filename_to_url(filename):
return base64.urlsafe_b64decode(filename[:-5]).decode()
def search_url(url, filepath):
- links = []
+ links = set()
headers = {
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:142.0) Gecko/20100101 Firefox/142.0',
}
@@ -49,25 +66,53 @@ def search_url(url, filepath):
with open(filepath, 'w') as f:
f.write(content)
print(f'Wrote {url} to {filepath}')
+ deleted = process_file(filepath)
+
+ # we don't want to spider from bad sites.
+ # process_file does some regexp checks on the site to see if it is short / bad in some other way.
+
+ if deleted:
+ return "", "", []
else:
print(f'skipping fs write for {url}, too large')
except Exception as e:
print(e)
return "", "", []
- base_domain = urlparse(url).netloc
+ current_url_without_fragment = urlparse(url)._replace(fragment='').geturl()
+
+ # find all links < max_urls_per_site that direct to a different page.
for link in soup.find_all('a', href=True):
href = link.get('href')
+
+ if href.startswith('#'):
+ continue
+
absolute_url = urljoin(url, href)
parsed = urlparse(absolute_url)
- if parsed.scheme in ('http', 'https') and parsed.netloc == base_domain:
+
+ url_without_fragment = parsed._replace(fragment='').geturl()
+
+ if url_without_fragment == current_url_without_fragment:
+ continue
+
+ if parsed.scheme in ('http', 'https'):
if len(links) < MAX_URLS_PER_SITE:
- links.append(absolute_url)
+ links.add(absolute_url)
return filepath, url, links
+def get_links(num_links, cur_link):
+ cur_link.execute("""
+ SELECT url FROM link ORDER BY priority DESC LIMIT ?
+ """, (num_links,))
+ return {row[0] for row in cur_link.fetchall()}
+
+
if __name__ == "__main__":
- seed_filename = sys.argv[1]
+ seed_filename = ""
+ if len(sys.argv) == 2:
+ seed_filename = sys.argv[1]
con = sqlite3.connect('database/manifest.db', timeout=60)
con.execute('PRAGMA journal_mode=WAL')
cur = con.cursor()
@@ -76,24 +121,34 @@ if __name__ == "__main__":
cur.execute("CREATE INDEX IF NOT EXISTS idx_site_url ON site(url)")
cur.execute("CREATE INDEX IF NOT EXISTS idx_site_filepath ON site(filepath)")
- urls = []
- with open(seed_filename, 'r') as f:
- urls = f.readlines()
- for i in range(len(urls)):
- urls[i] = urls[i].strip()
- print(urls)
- searched = set()
+ con_links = sqlite3.connect('database/urls.db', timeout=60)
+ con_links.execute('PRAGMA journal_mode=WAL')
+ cur_link = con_links.cursor()
+ cur_link.execute("CREATE TABLE IF NOT EXISTS link(url UNIQUE, priority)")
+ cur_link.execute("CREATE INDEX IF NOT EXISTS idx_link_url ON link(url)")
+ cur_link.execute("CREATE INDEX IF NOT EXISTS idx_link_priority ON link(priority)")
+ urls = set()
+
+ if seed_filename != "":
+ urlLs = []
+ with open(seed_filename, 'r') as f:
+ urlLs = f.readlines()
+ for i in range(len(urlLs)):
+ urls.add(urlLs[i].strip())
+ print(f"Loaded seed file with {len(urls)} urls")
save_location = 'sites/'
- depth = 0
- max_depth = int(sys.argv[2])
- # TODO: How can we only search sites we haven't seen recently?
- while len(urls) != 0 and depth < max_depth:
+ # TODO: better stopping. only stops when all links have been traversed
+ while True:
+ if len(urls) == 0:
+ urls = get_links(MAX_WORKERS, cur_link)
+ if len(urls) == 0:
+ print("NO MORE QUEUED LINKS TO SEARCH, EXITING")
+ break
+ print(f"Loaded {len(urls)} urls from queue")
- print(f"Depth {depth}: processing {len(urls)} URLs")
- next_urls = set()
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
now = datetime.datetime.now()
pth = save_location + now.strftime("%Y-%m-%d") + "/"
@@ -105,27 +160,29 @@ if __name__ == "__main__":
for future in as_completed(futures):
filepath, url, links = future.result()
-
- for link in links:
- if link not in searched and link not in urls and len(next_urls) < MAX_URLS_PER_LEVEL:
- next_urls.add(link)
-
- time = datetime.datetime.now().timestamp()
if filepath != '' and url != '':
+ # dequeue: this assumes we can only queue once. it might make sense to not do this in the future.
+ cur_link.execute("""
+ DELETE FROM link where url = ?
+ """, (url, ))
+ con_links.commit()
+
+ # insert into site list
cur.execute("""
INSERT INTO site VALUES (?, ?, ?)
- """, (url, filepath, time))
+ """, (url, filepath, datetime.datetime.now().timestamp()))
con.commit()
-
- one_day_ago = (datetime.datetime.now() - datetime.timedelta(days=1)).timestamp()
- fresh_urls = []
- for url in next_urls:
- cur.execute("SELECT 1 FROM site WHERE url = ? AND date > ?", (url, one_day_ago))
- if cur.fetchone() is None:
- fresh_urls.append(url)
- else:
- print('Already traversed today, skipping')
-
- urls = fresh_urls # haven't traversed in the last day.
-
- depth += 1
+ for link in links:
+ # TODO: Make priority better, also speed this up with transactions
+ # also, do we want duplicates? we assume earlier ones are better than the current, but that is weird
+ cur_link.execute("""
+ INSERT OR IGNORE INTO link VALUES (?, ?)
+ """, (link, random.randint(0,10000)))
+ con_links.commit()
+ urls = set()
+
+
+ cur_link.close()
+ con_links.close()
+ cur.close()
+ con.close()
diff --git a/indexing/tf.py b/indexing/tf.py
@@ -10,11 +10,13 @@ if __name__ == "__main__":
# returns a set of all indexed terms
terms = get_terms('database/manifest.db')
+ print(f'fetched {len(terms)} terms')
# tfs is a dict:
# term : tf
# tf(document_path, term, value)
+ # having the one db makes the deletion very slow.
con = sqlite3.connect('database/manifest.db', timeout=60)
con.execute('PRAGMA journal_mode=WAL')
cur = con.cursor()
@@ -23,7 +25,7 @@ if __name__ == "__main__":
cur.execute("CREATE INDEX IF NOT EXISTS idx_tf_term ON tf(term)")
cur.execute("DELETE FROM tf")
- # the reason we want a term list is because that gives us a guarantee about having an idf
+ # the reason we want a term list is because that gives us a guarantee about having an idf.
# considerations can be made for the necessity of this, but I think this is safe for now.
# TODO: Possibly add idf imputation during for searches on terms that don't exist in the term table.
diff --git a/indexing/utils.py b/indexing/utils.py
@@ -13,7 +13,11 @@ nltk.download('stopwords')
# get term frequencies for file given our list of terms (we only care about indexed terms which doesn't include certain terms like stop words)
def get_tfs(filepath, terms):
plaintext = get_plaintext(filepath)
- return get_word_counts_in_included(plaintext, terms)
+ term_counts, total_terms = get_word_counts_in_included_and_total_words(plaintext, terms)
+ tfs = {}
+ for term in term_counts:
+ tfs[term] = term_counts[term] / total_terms
+ return tfs
# select all terms from manifest.db
def get_terms(db_path):
@@ -48,7 +52,9 @@ def get_plaintext_words(filepath):
plaintext = get_plaintext(filepath)
return get_words(plaintext)
-def get_word_counts_in_included(text, included):
+# NOTE: The total number of terms is based on the inclusion list
+def get_word_counts_in_included_and_total_words(text, included):
+ total_terms = 0
current = {}
lines = text
lines = re.sub('[^0-9a-zA-Z]+', ' ', lines)
@@ -61,7 +67,8 @@ def get_word_counts_in_included(text, included):
current[cw] = 1
else:
current[cw] += 1
- return current
+ total_terms += 1
+ return current, total_terms
@@ -94,7 +101,7 @@ def get_filepaths(document_directory):
# given all of this, it seems right for now to simply index on each token. further considerations can be made later, but this seems reasonable, especially if we are only indexing large sites.
-def get_log_doc_freqs(filepaths, lower_bound_percentage=0, upper_bound_percentage=0, filter_stop_words=False):
+def get_log_doc_freqs(filepaths, lower_bound_percentage=0, upper_bound_percentage=1, filter_stop_words=False):
stop_words = set()
if filter_stop_words:
stop_words = set(stopwords.words('english'))