commit 8122fad87d302497e97e44d2ec8e709f92e61762
parent 3fc1fa6843dfe44231a12016cfc1b463f2425b1a
Author: andrewlaack-collab <andrew.laack@imbue.com>
Date: Thu, 29 Jan 2026 19:25:41 +0000
Remove unnecessary code + logging (#2)
* Removed unnecessary stuff
* Removed logging stuff, started refactoring error logging stuff (too much indirection right now).
* Stripped down logging more
* Formatted
Diffstat:
49 files changed, 535 insertions(+), 5012 deletions(-)
diff --git a/imbue_core/imbue_core/agents/agent_api/union_types.py b/imbue_core/imbue_core/agents/agent_api/union_types.py
@@ -1,12 +0,0 @@
-from typing import Annotated
-
-from pydantic import Tag
-
-from imbue_core.agents.agent_api.claude.data_types import ClaudeCodeOptions
-from imbue_core.agents.agent_api.codex.data_types import CodexOptions
-from imbue_core.pydantic_serialization import build_discriminator
-
-AgentOptionsUnion = Annotated[
- Annotated[ClaudeCodeOptions, Tag("ClaudeCodeOptions")] | Annotated[CodexOptions, Tag("CodexOptions")],
- build_discriminator(),
-]
diff --git a/imbue_core/imbue_core/agents/llm_apis/groq_api.py b/imbue_core/imbue_core/agents/llm_apis/groq_api.py
@@ -44,7 +44,7 @@ from imbue_core.secrets_utils import get_secret
# the reason is that these models are actually now mostly deterministic, and it is much easier to debug if we know what model was used
# also, there's no need to troll yourself by wondering why results have improved (or gotten worse) when you dont realized that the version has shifted under you
# if you want to use an upgraded model, just upgrade the model to the key displayed on the website
-# please do NOT set these back to the generic model names! Josh will be very annoyed
+# please do NOT set these back to the generic model names!
# TODO: there are likely more models to add
diff --git a/imbue_core/imbue_core/agents/llm_apis/openai_api.py b/imbue_core/imbue_core/agents/llm_apis/openai_api.py
@@ -55,7 +55,7 @@ from imbue_core.secrets_utils import get_secret
# the reason is that these models are actually now mostly deterministic, and it is much easier to debug if we know what model was used
# also, there's no need to troll yourself by wondering why results have improved (or gotten worse) when you dont realized that the version has shifted under you
# if you want to use an upgraded model, just upgrade the model to the key displayed here: https://platform.openai.com/docs/models/overview
-# please do NOT set these back to the generic model names! Josh will be very annoyed
+# please do NOT set these back to the generic model names!
FINE_TUNED_GPT4O_MINI_2024_07_18_PREFIX = "ft:gpt-4o-mini-2024-07-18"
FINE_TUNED_GPT4O_2024_08_06_PREFIX = "ft:gpt-4o-2024-08-06"
diff --git a/imbue_core/imbue_core/async_monkey_patches.py b/imbue_core/imbue_core/async_monkey_patches.py
@@ -6,24 +6,13 @@ from types import TracebackType
from typing import Any
from typing import Sequence
-import sentry_sdk
+from loguru import logger
-from imbue_core.constants import ExceptionPriority
-from imbue_core.error_utils import get_sentry_event_handler
-from imbue_core.error_utils import get_traceback_with_vars
from imbue_core.errors import ExpectedError
-from imbue_core.s3_uploader import EXTRAS_UPLOADED_FILES_KEY
-from imbue_core.s3_uploader import get_s3_upload_key
-from imbue_core.s3_uploader import get_s3_upload_url
-from imbue_core.s3_uploader import upload_to_s3_with_key
_IS_SHUTTING_DOWN = False
-# This is the name of the attribute we set on our exceptions to ensure they are logged (esp. to Sentry) at most once.
-EXCEPTION_LOGGED_FLAG = "_was_logged_by_log_exception"
-
-
def notify_task_groups_of_shutdown() -> None:
global _IS_SHUTTING_DOWN
_IS_SHUTTING_DOWN = True
@@ -339,101 +328,14 @@ def _filter_exception_group(exc_group: ExceptionGroup) -> list[Exception]:
return result
-def _upload_traceback(key: str, exception: BaseException) -> None:
- tb_with_vars = get_traceback_with_vars(exception)
- if tb_with_vars is not None:
- upload_to_s3_with_key(key, tb_with_vars.encode())
-
-
-def pre_filter_exception(exc: BaseException, message: str | None = None) -> bool:
- # deferred import, will have been imported anyway by this point
- from loguru import logger
-
- if getattr(exc, EXCEPTION_LOGGED_FLAG, False):
- logger.info("Skipping duplicate log of exception {} with message {!r}", exc, message)
- return True
- try:
- setattr(exc, EXCEPTION_LOGGED_FLAG, True)
- except AttributeError:
- logger.info("Unable to guarantee that {} will not be logged again", exc)
- return False
-
-
-def inject_exception_and_log(
- exc: BaseException,
- message: str,
- priority: ExceptionPriority | None = None,
- *args: Any,
- **kwargs: Any,
-) -> None:
- # deferred import, will have been imported anyway by this point
- from loguru import logger
-
- # inject received exception stack trace into logger error message
- options = (exc,) + logger._options[1:] # pyre-fixme[16]: pyre doesn't know that _options exists
- if priority is not None:
- level = priority.value
- else:
- level = "ERROR"
- logger._log(level, False, options, message, args, kwargs) # pyre-fixme[16]: pyre doesn't know that _log exists
-
-
def log_exception(
exc: BaseException,
message: str,
- priority: ExceptionPriority | None = None,
*args: Any,
- sentry_extra_uploads: dict[str, str] | None = None,
**kwargs: Any,
) -> None:
- """Josh doesn't like that `loguru.exception()` takes only a message, and grabs the current exception from sys.exc_info().
-
- So this is a more explicit alternative that takes the exception as an argument.
- """
- should_skip = pre_filter_exception(exc, message)
- if should_skip:
- return None
-
- # use a new scope to ensure these attachments don't bleed to other events that might have the same scope
- # TODO: unify the uploading logic that we have in Sculptor with this, avoid coupling through two global objects (sentry event handler, s3_upload)
- with sentry_sdk.new_scope() as scope:
- sentry_event_handler = get_sentry_event_handler()
- traceback_str = "".join(traceback.format_stack())
- message = f"{message}\n\nlog_exception CALL SITE TRACEBACK:\n\n{traceback_str}\nORIGINAL EXCEPTION TRACEBACK FOLLOWS:\n"
- if sentry_event_handler is not None:
- s3_uploads = []
- callbacks = []
- traceback_str_s3_key = get_s3_upload_key("logsite_traceback", ".txt")
-
- # attach traceback of log_exception callsite
- logsite_url = get_s3_upload_url(traceback_str_s3_key)
- s3_uploads.append(logsite_url)
- callbacks.append(
- lambda key=traceback_str_s3_key, data=traceback_str: upload_to_s3_with_key(key, data.encode())
- )
-
- # for original exception, get traceback with variables and attach
- traceback_with_variables_s3_key = get_s3_upload_key("original_exc_traceback_with_vars", ".txt")
- s3_uploads.append(get_s3_upload_url(traceback_with_variables_s3_key))
- callbacks.append(
- lambda key=traceback_with_variables_s3_key, exception=exc: _upload_traceback(key, exception)
- )
- # upload some extra data if provided
- if sentry_extra_uploads is not None:
- for key, value in sentry_extra_uploads.items():
- key = get_s3_upload_key(key, ".txt")
- s3_uploads.append(get_s3_upload_url(key))
- callbacks.append(lambda key=key, data=value: upload_to_s3_with_key(key, data.encode()))
-
- sentry_event_handler.schedule_callbacks(callbacks)
-
- # watch out; this will stomp on existing "extras" in the event
- s3_uploads = [upload for upload in s3_uploads if upload is not None]
- if s3_uploads:
- scope.set_extra(EXTRAS_UPLOADED_FILES_KEY, s3_uploads)
-
- # inject received exception stack trace into logger error message
- inject_exception_and_log(exc, message, priority, *args, **kwargs)
+ """Log an exception with its traceback to stderr via loguru."""
+ logger.opt(exception=exc).error(message, *args, **kwargs)
def apply() -> None:
diff --git a/imbue_core/imbue_core/async_monkey_patches_test.py b/imbue_core/imbue_core/async_monkey_patches_test.py
@@ -1,4 +1,6 @@
+import sys
from contextlib import contextmanager
+from contextvars import ContextVar
from typing import Any
from typing import Callable
from typing import Generator
@@ -7,47 +9,46 @@ from typing import Iterator
import pytest
from loguru import logger
-from imbue_core.async_monkey_patches import log_exception
-from imbue_core.constants import ExceptionPriority
-
class IncorrectErrorsLoggedDuringTesting(Exception):
pass
+_expecting_errors: ContextVar[bool] = ContextVar("expecting_errors", default=False)
+
+
@contextmanager
def check_logged_errors(check_func: Callable[[list[str]], None]) -> Iterator[None]:
- """Context manager that monkey patches logger._log to accumulate error messages instead of logging them.
- Then it runs the check function on the accumulated errors."""
- original_log_func = logger._log # pyre-fixme[16]: pyre doesn't know that _log exists
+ """Context manager that intercepts ERROR logs using loguru's sink system.
+ Then it runs the check function on the accumulated errors.
+
+ Sets the _expecting_errors context variable so that explode_on_error knows to
+ ignore errors during this block.
+ """
accumulated_errors: list[str] = []
- error_level_names = (
- "ERROR",
- ExceptionPriority.LOW_PRIORITY.value,
- ExceptionPriority.MEDIUM_PRIORITY.value,
- ExceptionPriority.HIGH_PRIORITY.value,
- )
+ token = _expecting_errors.set(True)
+
+ def error_catching_sink(message: Any) -> None:
+ record = message.record
+ if record["level"].name == "ERROR":
+ accumulated_errors.append(record["message"])
+ sys.stderr.write(f"CAUGHT ERROR LOG: {record['message'].splitlines()[0][:100]}\n")
+ else:
+ sys.stderr.write(str(message))
+
+ handler_id = logger.add(error_catching_sink, format="{message}", level="DEBUG")
+ try:
+ logger.remove(0)
+ except ValueError:
+ pass
- logger._log = lambda level, flag, options, message, args, kwargs: (
- (
- accumulated_errors.append(message) is None
- and original_log_func(
- "INFO",
- flag,
- options,
- "CAUGHT ERROR LOG: " + message.splitlines()[0][:100],
- args,
- kwargs,
- )
- )
- if level in error_level_names
- else original_log_func(level, flag, options, message, args, kwargs)
- )
try:
yield
finally:
- logger._log = original_log_func
+ _expecting_errors.reset(token)
+ logger.remove(handler_id)
+ logger.add(sys.stderr, level="DEBUG")
check_func(accumulated_errors)
@@ -72,7 +73,7 @@ def at_least_check_maker(expected_errors_set: set[str]) -> Callable[[list[str]],
@contextmanager
def expect_at_least_logged_errors(expected_errors: set[str]) -> Iterator[None]:
- """Context manager that monkey patches logger._log to accumulate error messages instead of logging them.
+ """Context manager that intercepts ERROR logs using loguru's sink system.
Checks that all expected errors are in the accumulated errors, in no particular order.
"""
check_func = at_least_check_maker(expected_errors)
@@ -99,57 +100,34 @@ def exact_check_maker(expected_errors: list[str]) -> Callable[[list[str]], None]
@contextmanager
def expect_exact_logged_errors(expected_errors: list[str]) -> Iterator[None]:
- """Context manager that monkey patches logger._log to accumulate error messages instead of logging them.
+ """Context manager that intercepts ERROR logs using loguru's sink system.
Checks that all expected errors are in the accumulated errors, in the same order."""
check_func = exact_check_maker(expected_errors)
with check_logged_errors(check_func):
yield
-def test_log_exception() -> None:
- with expect_exact_logged_errors(["Test log_exception"]):
- try:
- x = 1 / 0
- except Exception as e:
- log_exception(e, "Test log_exception")
- assert True # If we reach here, the test passes
- else:
- assert False, "log_exception did not raise an exception"
-
-
-def test_log_exception_with_priority() -> None:
- # ensure_core_log_levels_configured auto-used in conftest
- with expect_exact_logged_errors(["Test log_exception"]):
- try:
- x = 1 / 0
- except Exception as e:
- log_exception(e, "Test log_exception", priority=ExceptionPriority.LOW_PRIORITY)
- assert True # If we reach here, the test passes
- else:
- assert False, "log_exception did not raise an exception"
-
-
@pytest.fixture
def explode_on_error() -> Generator[None, None, None]:
- """Fixture to explode on error."""
- original_log_func = logger._log # pyre-fixme[16]: pyre doesn't know that _log exists
+ """Fixture to explode on error - fails the test if any ERROR logs are recorded."""
accumulated_errors: list[str] = []
- def _log_wrapper(
- level: str,
- flag: int,
- options: tuple[int, ...],
- message: str,
- args: tuple,
- kwargs: dict,
- ) -> Any:
- if level == "ERROR":
- accumulated_errors.append(message)
- new_options = list(options)
- new_options[1] = 1
- return original_log_func(level, flag, tuple(new_options), message, args, kwargs)
-
- logger._log = _log_wrapper
+ def error_catching_sink(message: Any) -> None:
+ record = message.record
+ if record["level"].name == "ERROR":
+ if not _expecting_errors.get():
+ accumulated_errors.append(record["message"])
+ sys.stderr.write(str(message))
+
+ handler_id = logger.add(
+ error_catching_sink,
+ format="{level} | {name}:{function}:{line} - {message}",
+ level="DEBUG",
+ )
+ try:
+ logger.remove(0)
+ except ValueError:
+ pass
try:
yield
@@ -159,7 +137,8 @@ def explode_on_error() -> Generator[None, None, None]:
if len(accumulated_errors) > 0:
raise IncorrectErrorsLoggedDuringTesting(f"Errors logged during testing: {accumulated_errors}")
finally:
- logger._log = original_log_func
+ logger.remove(handler_id)
+ logger.add(sys.stderr, level="DEBUG")
def test_log_error(explode_on_error: Any) -> None:
diff --git a/imbue_core/imbue_core/computing_environment/computing_environment.py b/imbue_core/imbue_core/computing_environment/computing_environment.py
@@ -519,7 +519,7 @@ async def force_push_commit(computing_environment: ComputingEnvironment, commit:
await computing_environment.run_git(["push", "-f", "origin", f"{commit}:{branch_name}"], is_error_logged=False)
except RunCommandError as e:
if "fatal: bad object" in e.stderr:
- # TODO (danielmewes): We're retrying failed fetches here. However, there is also a separate
+ # TODO: We're retrying failed fetches here. However, there is also a separate
# force_push_commit_with_retry method that retries the entire force_push_commit.
# We should probably try the fetch only once, and then rely on the outer
# force_push_commit_with_retry to retry the entire force_push_commit call when retrying is =
diff --git a/imbue_core/imbue_core/conftest.py b/imbue_core/imbue_core/conftest.py
@@ -6,14 +6,6 @@ from typing import Iterator
import pytest
-from imbue_core.log_utils import ensure_core_log_levels_configured
-from imbue_core.test_repo_utils import make_test_data_mock_repo
-
-
-@pytest.fixture(scope="session", autouse=True)
-def setup_logging_and_secrets() -> None:
- ensure_core_log_levels_configured()
-
@contextlib.contextmanager
def create_temp_file(contents: str, suffix: str, root_dir: Path) -> Generator[Path, None, None]:
@@ -25,9 +17,6 @@ def create_temp_file(contents: str, suffix: str, root_dir: Path) -> Generator[Pa
Path(temp_file.name).unlink()
-mock_repo = pytest.fixture(make_test_data_mock_repo)
-
-
@contextlib.contextmanager
def dummy_exception_manager() -> Iterator[None]:
"""
diff --git a/imbue_core/imbue_core/constants.py b/imbue_core/imbue_core/constants.py
@@ -1,21 +0,0 @@
-from enum import StrEnum
-from pathlib import Path
-
-IMBUE_TOML_PATH = Path("~/.imbue/config.toml").expanduser()
-
-DEFAULT_PROJECT_SECRET_PATH = Path(".env")
-
-LOW_PRIORITY_LEVEL = 37
-MEDIUM_PRIORITY_LEVEL = 38
-HIGH_PRIORITY_LEVEL = 39
-
-DISCORD_URL = "https://discord.gg/sBAVvHPUTE"
-
-
-class ExceptionPriority(StrEnum):
- # for issues that will result in the app crashing
- HIGH_PRIORITY = "HIGH_PRIORITY"
- # for issues that will cause major functionality to stop working
- MEDIUM_PRIORITY = "MEDIUM_PRIORITY"
- # everything else -- e.g. exception sites that are typically retriable, but may catch something unrecoverable
- LOW_PRIORITY = "LOW_PRIORITY"
diff --git a/imbue_core/imbue_core/error_utils.py b/imbue_core/imbue_core/error_utils.py
@@ -1,379 +1,14 @@
-import functools
-import os
-import re
+"""Error handling utilities."""
+
import sys
-import threading
-import time
import traceback
-from collections.abc import Callable
-from collections.abc import Collection
-from collections.abc import Hashable
-from enum import StrEnum
-from typing import Any
-from typing import Iterable
-from typing import Mapping
-from typing import MutableMapping
-from typing import assert_never
-import sentry_sdk
-import sentry_sdk.utils
import traceback_with_variables
-from loguru import logger
-from pydantic import Field
-from pydantic import PrivateAttr
-from sentry_sdk import HttpTransport
-from sentry_sdk.attachments import Attachment
-from sentry_sdk.consts import EndpointType
-from sentry_sdk.envelope import Envelope
-from sentry_sdk.integrations.stdlib import StdlibIntegration
-from sentry_sdk.types import Event
-from sentry_sdk.types import Hint
from traceback_with_variables import Format
-from imbue_core.common import truncate_string
-from imbue_core.pydantic_serialization import FrozenModel
-from imbue_core.pydantic_serialization import MutableModel
-from imbue_core.s3_uploader import upload_to_s3
-from imbue_core.sentry_loguru_handler import SENTRY_LOG_FORMAT
-from imbue_core.sentry_loguru_handler import SentryBreadcrumbHandler
-from imbue_core.sentry_loguru_handler import SentryEventHandler
-from imbue_core.sentry_loguru_handler import SentryLoguruLoggingLevels
-from imbue_core.sentry_loguru_handler import log_error_inside_sentry
-
-try:
- import brotli # type: ignore
-except ImportError:
- brotli = None
-
-
-# sentry's size limits are annoyingly hard to evaluate before sending the event. we'll just try to be conservative.
-# https://docs.sentry.io/concepts/data-management/size-limits/
-# https://develop.sentry.dev/sdk/data-model/envelopes/#size-limits
-MAX_SENTRY_ATTACHMENT_SIZE = 10 * 1024 * 1024
-
-
-class SentryEventRejected(Exception):
- pass
-
-
-class ExceptionKey(FrozenModel):
- exception_type: type[BaseException] | None
- exception_args: tuple[Hashable, ...]
-
- @classmethod
- def build_from_exception_or_fingerprint(
- cls, exception: BaseException | None, log_fingerprint: str | None
- ) -> "ExceptionKey":
- if exception is None:
- return cls(
- exception_type=None,
- exception_args=(log_fingerprint,),
- )
- else:
- return cls(
- exception_type=type(exception),
- # FIXME: we may grab things with references here unnecessarily. Let's store only the hash here and stringified representation.
- exception_args=tuple(arg for arg in exception.args if isinstance(arg, Hashable)),
- )
-
-
-class ExceptionHistory(MutableModel):
- total_sent: int = 0
- total_throttled: int = 0
-
- last_reported_at: float | None = None # monotonic clock value
- throttled_since_last_report: int = 0
-
- @property
- def since_last_report(self) -> float:
- last_reported_at = self.last_reported_at
- if last_reported_at is None:
- return float("inf")
- return time.monotonic() - last_reported_at
-
- def log_throttled(self):
- self.throttled_since_last_report += 1
- self.total_throttled += 1
-
- def log_reported(self):
- self.last_reported_at = time.monotonic()
- self.throttled_since_last_report = 0
- self.total_sent += 1
-
-
-def _first_line_of_log_message(event: Event) -> str | None:
- """Extracts the first line of the log message from the event, if any."""
- message = event.get("logentry", {}).get("message")
- if message and isinstance(message, str):
- message_lines = message.strip().splitlines()
- if message_lines:
- return message_lines[0]
- return None
-
-
-def _get_full_location_from_event(event: Event) -> str | None:
- """Extracts the `full_location` field that we are supposed to generate in our log handlers."""
- extra = event.get("extra", {}).get("extra")
- if extra and isinstance(extra, dict):
- full_location = extra.get("full_location")
- if full_location and isinstance(full_location, str):
- return full_location.strip() or None
- return None
-
-
-class _ReasonToAllowSendingEvent(StrEnum):
- PASS_THRU = "pass_thru"
- NO_RATE_LIMIT_INFO = "no_rate_limit_info"
- TOO_MANY_TRACKED_EXCEPTIONS = "too_many_tracked_exceptions"
- INITIAL = "initial"
- INITIAL_GRACE_PERIOD = "initial_grace_period"
- TIMEOUT_ELAPSED = "timeout_elapsed"
-
-
-class _SentryEventRateLimiter(MutableModel):
- """Prevent logging the same specific exceptions multiple times to sentry.
-
- Each allowed exception is assumed to be sent.
- """
-
- # these exception will never be rate limited
- pass_thru_exception_types: Collection[type[BaseException]] = Field(default_factory=set)
- # the number of initial reports to allow before starting to apply rate limiting
- initial_reports_without_rate_limiting: int = 2
- # the time (in seconds) that must pass since the last report of a given exception before allowing
- # another report it is multiplied by the number of times the exception has been passed-thru since
- # the app start after the first throttling event
- timeout_factor: float = 60.0
- # maximum number of different exceptions to track for rate limiting
- # once this number is exceeded, all events will be passed through unfiltered
- max_tracked_rate_limited_exceptions: int = 10_000
-
- # we should not be called in parallel, but better safe than sorry
- # this lock protects access to _exception_history, its contents, and the total counters
- _lock: threading.Lock = PrivateAttr(default_factory=threading.Lock)
- _exception_history: MutableMapping[ExceptionKey, ExceptionHistory] = PrivateAttr(default_factory=dict)
- _total_throttled: int = PrivateAttr(default=0)
- _total_sent: int = PrivateAttr(default=0)
-
- def _annotate_event(
- self,
- event: Event,
- reason_to_allow: _ReasonToAllowSendingEvent,
- past_history: ExceptionHistory | None = None,
- ) -> Event:
- logger.trace("Annotating event with rate limiter: {}", reason_to_allow)
-
- annotation: dict[str, Any] = {
- "reason_to_allow": reason_to_allow.value,
- "application": {
- "total_throttled": self._total_throttled,
- "total_sent": self._total_sent,
- "total_tracked": len(
- self._exception_history
- ), # thread-safe to read without lock since we don't care about consistency
- },
- }
- if past_history is not None:
- annotation["instance"] = {
- "since_last_report": past_history.since_last_report,
- "throttled_since_last_report": past_history.throttled_since_last_report,
- "total_throttled": past_history.total_throttled,
- "total_sent": past_history.total_sent,
- }
-
- event.setdefault("extra", {})
- event["extra"]["rate_limiter"] = annotation
-
- event.setdefault("tags", {})
- event["tags"]["rate_limiter_reason_to_allow"] = reason_to_allow
- return event
-
- def before_send(self, event: Event, hint: Hint) -> Event | None:
- annotated_event = self._before_send(event, hint)
- with self._lock:
- if annotated_event is None:
- self._total_throttled += 1
- else:
- self._total_sent += 1
-
- return annotated_event
-
- def _before_send(self, event: Event, hint: Hint) -> Event | None:
- exception = None
- exception_type = None
- # see sentry_sdk._types.ExcInfo which sadly we can't import
- if "exc_info" in hint:
- exception_type, exception, _ = hint["exc_info"]
-
- if (exception_type is not None) and (exception_type in self.pass_thru_exception_types):
- return self._annotate_event(event, _ReasonToAllowSendingEvent.PASS_THRU)
-
- first_line = _first_line_of_log_message(event)
- full_location = _get_full_location_from_event(event)
- if first_line and full_location:
- log_fingerprint = "\n".join([first_line, full_location])
- else:
- log_fingerprint = None
-
- if not (log_fingerprint or exception):
- # nothing to rate limit on
- return self._annotate_event(event, _ReasonToAllowSendingEvent.NO_RATE_LIMIT_INFO)
-
- key = ExceptionKey.build_from_exception_or_fingerprint(exception, log_fingerprint)
- with self._lock:
- if key not in self._exception_history:
- # we could LRU but if we got to this point, there's something else to figure out, like bad keying
- if len(self._exception_history) >= self.max_tracked_rate_limited_exceptions:
- return self._annotate_event(event, _ReasonToAllowSendingEvent.TOO_MANY_TRACKED_EXCEPTIONS)
- history = ExceptionHistory(last_reported_at=time.monotonic(), total_sent=1)
- self._exception_history[key] = history
- return self._annotate_event(event, _ReasonToAllowSendingEvent.INITIAL)
-
- history = self._exception_history[key]
- reason_to_allow: _ReasonToAllowSendingEvent | None = None
- if history.total_sent < self.initial_reports_without_rate_limiting:
- reason_to_allow = _ReasonToAllowSendingEvent.INITIAL_GRACE_PERIOD
- else:
- current_timeout = self.timeout_factor * max(
- 1,
- history.total_sent - self.initial_reports_without_rate_limiting + 1,
- )
- if history.since_last_report >= current_timeout:
- logger.trace("Timeout elapsed for event: {}, {}", key, current_timeout)
- reason_to_allow = _ReasonToAllowSendingEvent.TIMEOUT_ELAPSED
-
- if reason_to_allow:
- event = self._annotate_event(event, reason_to_allow=reason_to_allow, past_history=history)
- history.log_reported()
- return event
- history.log_throttled()
-
- logger.trace("Rate limiting event: {}", key)
- return None
-
-
-class ImbueSentryHttpTransport(HttpTransport):
- """The sentry python sdk has pretty lame behavior if the event is too large.
- It'll just drop it, and record stats indicating that an event was dropped.
- You can see these at `https://generally-intelligent-e3.sentry.io/stats`, category "invalid".
- But there's no way to recover any information about the dropped event.
-
- We could try to just ensure the events don't violate the size limit, which we try to do,
- but their size limits are a bit complicated and thus hard to pre-verify. So we also want to know if anything slips through.
-
- The actual sentry web API does return a status code (413) if the event was rejected,
- so we need to handle this at the level of the sentry HttpTransport and do something with it.
- """
-
- def _send_request(
- self,
- body: bytes,
- headers: dict[str, str],
- endpoint_type: EndpointType = EndpointType.ENVELOPE,
- envelope: Envelope | None = None,
- ) -> None:
- """This is a copy of the original `_send_request` method from the HttpTransport class,
- with a hook to call `on_too_large_event` added.
- """
-
- def record_loss(reason: str) -> None:
- if envelope is None:
- self.record_lost_event(reason, data_category="error")
- else:
- envelope_items = envelope.items
- assert envelope_items is not None
- for item in envelope_items:
- self.record_lost_event(reason, item=item)
-
- headers.update(
- {
- "User-Agent": str(self._auth.client),
- "X-Sentry-Auth": str(self._auth.to_header()),
- }
- )
- try:
- response = self._request(
- "POST",
- endpoint_type,
- body,
- headers,
- )
- except Exception:
- self.on_dropped_event("network")
- record_loss("network_error")
- raise
-
- try:
- self._update_rate_limits(response)
-
- if response.status == 429:
- # if we hit a 429. Something was rate limited but we already
- # acted on this in `self._update_rate_limits`. Note that we
- # do not want to record event loss here as we will have recorded
- # an outcome in relay already.
- self.on_dropped_event("status_429")
-
- elif response.status >= 300 or response.status < 200:
- sentry_sdk.utils.logger.error(
- "Unexpected status code: %s (body: %s)",
- response.status,
- getattr(response, "data", getattr(response, "content", None)),
- )
- self.on_dropped_event("status_{}".format(response.status))
- record_loss("network_error")
-
- if response.status == 413:
- assert envelope is not None
- self.on_too_large_event(body, envelope)
- finally:
- response.close()
-
- def on_too_large_event(self, body: bytes, envelope: Envelope) -> None:
- """we want to log _something_ to sentry, because otherwise we have no idea what happened,
- but we also need to be super careful that this fallback doesn't itself fail.
-
- exceptions raised here will simply get eaten and result in nothing getting logged to sentry,
- both due to sentry's usage of `capture_internal_exceptions`
- and that we're running in a worker thread and i don't think they make an effort to re-surface exceptions from threads.
- """
- msg = "request was too large to send to sentry"
- try:
- raise SentryEventRejected(msg)
- except SentryEventRejected as e:
- stripped_envelope = Envelope(headers=envelope.headers)
- attachment_sizes = {}
- envelope_items = envelope.items
- assert envelope_items is not None
- for item in envelope_items:
- if item.data_category == "attachment":
- payload = item.payload
- payload_bytes_len = len(payload.get_bytes() if not isinstance(payload, (bytes, str)) else payload)
- item_headers = item.headers
- assert item_headers is not None
- attachment_sizes[item_headers["filename"]] = payload_bytes_len
- continue
- stripped_envelope.add_item(item)
- # this is uncompressed (so we can inspect it)
- serialized_stripped_envelope = stripped_envelope.serialize()
-
- extra: dict[str, str | int] = {
- "uncompressed_attachment_sizes": str(attachment_sizes),
- "original_compressed_request_body_size": len(body),
- "uncompressed_stripped_envelope_size": len(serialized_stripped_envelope),
- }
-
- # send stripped envelope to S3 -- is preceding code now overkill?
- upload_name = upload_to_s3("stripped_envelope", ".txt", serialized_stripped_envelope)
-
- log_error_inside_sentry(
- e,
- msg,
- extra=extra,
- additional_s3_uploads=(upload_name,) if upload_name else None,
- )
-
def get_traceback_with_vars(exception: BaseException | None = None) -> str:
+
# be careful of potential performance regressions with increasing these limits
tb_format = Format(max_value_str_len=100_000, max_exc_str_len=2_000_000)
if exception is None:
@@ -391,180 +26,3 @@ def get_traceback_with_vars(exception: BaseException | None = None) -> str:
return (
f"got exception while formatting traceback with `traceback_with_variables`: {traceback.format_exception(e)}"
)
-
-
-def _default_sentry_add_extra_info_hook(event: Event, hint: Hint) -> tuple[Event, Hint, tuple[Callable, ...]]:
- """Add traceback with variables to the event as an attachment."""
- # TODO: We just use sentry attachments here; we could also upload to S3, but figure this hook is itself a fallback, so leaving it for now?
- expected_attachments = []
- tb_with_vars = truncate_string(get_traceback_with_vars(), MAX_SENTRY_ATTACHMENT_SIZE)
- hint["attachments"].append(Attachment(tb_with_vars.encode(), filename="traceback_with_variables.txt"))
- expected_attachments.append("traceback_with_variables.txt")
- # record the names of the expected attachments just in case there's any weirdness about attachments not showing up
- event.setdefault("extra", {})["expected_attachments"] = str(expected_attachments)
- return event, hint, ()
-
-
-# We define BeforeSendType here to be one or more callables that match the signature of sentry's before_send hook.
-# The event will be passed through each one in our wrapping code.
-BaseBeforeSendType = Callable[[Event, Hint], Event | None]
-BeforeSendType = BaseBeforeSendType | list[BaseBeforeSendType]
-
-
-# NOTE: if the actual event (without attachments) being too large is a problem, then it will be handled
-# in our custom logic in ImbueSentryHttpTransport above.
-def _before_send_wrapper(
- event: Event,
- hint: Hint,
- before_send_list: Iterable[BaseBeforeSendType],
-) -> Event | None:
- try:
- for before_send in before_send_list:
- # pyre-fixme[9]: the result of before_send can be None which is not compatible with event annotation.
- event = before_send(event, hint)
- if event is None:
- return None
-
- return event
- except Exception as e:
- # It is critical that we catch errors here and print them, because this is called from sentry
- # Failing to do so means that we will see NOTHING about the failure!
- # See this PR for more: https://gitlab.com/generally-intelligent/generally_intelligent/-/merge_requests/5789
- #
- # Questions to the above:
- # - why are we not relying on the Sentry's logger for this?
- # - won't the call to `logger.exception` itself try to send something to Sentry causing recursion?
- # - the following message will likely hit an error inside Loguru handler because it is not allowed
- # to call emit from inside emit (that's what we're in here).
- logger.exception("Failure when processing event in before_send hook: {}", e)
- # NOTE: this re-raise will get suppressed by Sentry and treated as if `before_send` returned `None`
- raise
-
-
-def _fixup_release_id(release_id: str) -> str:
- """
- For pre-release release candidate versions, Sentry requires the release ID to be in the semver format.
-
- E.g. "0.1.0rc1" should be converted to "0.1.0-rc.1".
-
- """
- return re.sub(r"(\d+\.\d+\.\d+)rc(\d+)", r"\1-rc.\2", release_id)
-
-
-def setup_sentry(
- dsn: str,
- release_id: str,
- global_user_context: Mapping[str, str] | None = None,
- integrations: tuple[Any, ...] = (),
- before_send: BeforeSendType | None = None,
- add_extra_info_hook: Callable[[Event, Hint], tuple[Event, Hint, tuple[Callable, ...]]] | None = None,
- environment: str | None = None,
-) -> None:
- """Sets up the main Sentry instance for this process.
-
- This should be done *after* setting up normal loguru loggers, to ensure that sentry handling happens after normal logging.
- In case the sentry stuff hangs or something odd, we want to make sure to at least get regular log output.
-
- Args:
- ...
- add_extra_info_hook: If provided, this function will be called with the event at Handle time.
- before_send: If provided, this function (or list of functions) will be called in order to handle and mutate the event before sending to Sentry.
- """
- assert (
- "SENTRY_DSN" not in os.environ
- ), "Please `unset SENTRY_DSN` in your environment. Set the DSN via the server settings FRONTEND_SENTRY_DSN and BACKEND_SENTRY_DSN instead."
-
- before_send_unrolled = []
-
- if isinstance(before_send, list):
- before_send_unrolled = list(before_send)
- elif callable(before_send):
- before_send_unrolled = [before_send]
- elif before_send is None:
- pass
- else:
- assert_never(before_send)
-
- # NOTE: the rate limiter object's lifetime is maintained by being captured in the
- # closure of the before_send function
- rate_limiter = _SentryEventRateLimiter()
- before_send_unrolled.append(rate_limiter.before_send)
-
- before_send = functools.partial(
- _before_send_wrapper,
- before_send_list=before_send_unrolled,
- )
-
- sentry_sdk.init(
- sample_rate=1.0,
- environment=environment,
- traces_sample_rate=1.0,
- # required for `logger.error` calls to include stacktraces
- attach_stacktrace=True,
- # note this will capture unhandled exceptions even if not explicitly logged, among other things
- # https://docs.sentry.io/platforms/python/integrations/default-integrations/
- default_integrations=True,
- # this doesn't affect the default integrations, but prevents any other ones from being added automatically
- auto_enabling_integrations=False,
- integrations=[
- *integrations,
- ],
- disabled_integrations=[
- # this only adds hooks to subprocess and httplib, which imo just adds noisy breadcrumbs.
- StdlibIntegration()
- ],
- dsn=dsn,
- # may want to get more restrictive about this in the future
- # see https://docs.sentry.io/platforms/python/data-management/data-collected/
- send_default_pii=True,
- # sentry has a max payload size of 1MB, so we can't make this infinite
- max_value_length=10_000,
- add_full_stack=True,
- before_send=before_send,
- release=_fixup_release_id(release_id),
- # default is 100; can't make it too large because total event size must be <1MB
- max_breadcrumbs=100,
- # if the locals is very large, sentry gets to be quite slow to log errors if this is enabled.
- # we log our own traceback_with_variables anyways.
- include_local_variables=False,
- transport=ImbueSentryHttpTransport,
- )
- logger.info("Sentry initialized")
-
- if global_user_context is not None:
- sentry_sdk.set_user(dict(global_user_context))
-
- # capture loguru errors/exceptions with a custom handler
- min_sentry_level: int = SentryLoguruLoggingLevels.LOW_PRIORITY.value
- handler = SentryEventHandler(
- level=min_sentry_level,
- add_extra_info_hook=add_extra_info_hook or _default_sentry_add_extra_info_hook,
- )
- register_sentry_event_handler(handler)
- logger.add(
- handler,
- level=min_sentry_level,
- diagnose=False,
- format=SENTRY_LOG_FORMAT,
- )
- # capture lower level loguru messages to add as breadcrumbs on events
- # the extra info is not helpful here and makes the breadcrumbs larger; they're still available in the log file attachment
- breadcrumb_level: int = SentryLoguruLoggingLevels.INFO.value
- logger.add(
- SentryBreadcrumbHandler(level=breadcrumb_level, strip_extra=True),
- level=breadcrumb_level,
- diagnose=False,
- format=SENTRY_LOG_FORMAT,
- )
-
-
-_SENTRY_EVENT_HANDLER: SentryEventHandler | None = None
-
-
-def register_sentry_event_handler(handler: SentryEventHandler) -> None:
- global _SENTRY_EVENT_HANDLER
- _SENTRY_EVENT_HANDLER = handler
-
-
-def get_sentry_event_handler() -> SentryEventHandler | None:
- return _SENTRY_EVENT_HANDLER
diff --git a/imbue_core/imbue_core/errors.py b/imbue_core/imbue_core/errors.py
@@ -3,8 +3,6 @@
Please subclass from one of the errors in this module for errors that you expect other Imbumans to handle.
"""
-from typing import Any
-
class ImbueRuntimeException(BaseException):
"""Base class for all things that could go wrong within Imbue code.
@@ -13,12 +11,7 @@ class ImbueRuntimeException(BaseException):
ImbueRuntimeExceptions.
"""
- _was_logged_by_log_exception: bool
-
- def __init__(self, *args: Any) -> None:
- super().__init__(*args)
- # New instances start out marked as not-yet-logged.
- self._was_logged_by_log_exception = False
+ pass
class ImbueError(ImbueRuntimeException, Exception):
diff --git a/imbue_core/imbue_core/imbue_cli/__init__.py b/imbue_core/imbue_core/imbue_cli/__init__.py
diff --git a/imbue_core/imbue_core/imbue_cli/action.py b/imbue_core/imbue_core/imbue_cli/action.py
@@ -1,71 +0,0 @@
-from typing import Annotated
-from typing import Literal
-
-from pydantic import Field
-from pydantic import Tag
-
-from imbue_core.imbue_cli.scout_message_types import ScoutMessageUnion
-from imbue_core.issues import CheckFailedIssue
-from imbue_core.issues import IdentifiedIssue
-from imbue_core.pydantic_serialization import SerializableModel
-from imbue_core.pydantic_serialization import build_discriminator
-from imbue_core.suggestions import CheckOutputID
-from imbue_core.suggestions import Suggestion
-
-
-class UserDisplayOutput(SerializableModel):
- object_type: str
-
-
-class ErroredOutput(UserDisplayOutput):
- object_type: Literal["ErroredOutput"] = "ErroredOutput"
- error_message: str
-
-
-class CommandTextOutput(UserDisplayOutput):
- object_type: Literal["CommandTextOutput"] = "CommandTextOutput"
- output: str
-
-
-class CommandHTMLOutput(UserDisplayOutput):
- object_type: Literal["CommandHTMLOutput"] = "CommandHTMLOutput"
- output: str
-
-
-UserDisplayOutputUnion = Annotated[
- Annotated[ErroredOutput, Tag("ErroredOutput")]
- | Annotated[CommandTextOutput, Tag("CommandTextOutput")]
- | Annotated[CommandHTMLOutput, Tag("CommandHTMLOutput")],
- build_discriminator(),
-]
-
-
-class RetrieveOutput(SerializableModel):
- object_type: str = "RetrieveOutput"
- files: tuple[str, ...]
-
-
-class ScoutOutput(SerializableModel):
- object_type: str = "ScoutOutput"
- id: CheckOutputID
- data: ScoutMessageUnion
-
-
-ActionOutputUnion = Annotated[
- (
- Annotated[ScoutOutput, Tag("ScoutOutput")]
- | Annotated[Suggestion, Tag("Suggestion")]
- | Annotated[CheckFailedIssue, Tag("CheckFailedIssue")]
- | Annotated[IdentifiedIssue, Tag("IdentifiedIssue")]
- | Annotated[RetrieveOutput, Tag("RetrieveOutput")]
- ),
- build_discriminator(),
-]
-
-
-class ActionOutput(SerializableModel):
- command: str = Field(description="The command that was executed to produce the output.")
- outputs: tuple[ActionOutputUnion, ...] = Field(description="The structured output data from the action.")
- user_display: UserDisplayOutputUnion = Field(
- description="The user display output from the action. This can be used by consumers to display a user-friendly version of the action output."
- )
diff --git a/imbue_core/imbue_core/imbue_cli/scout_data_types.py b/imbue_core/imbue_core/imbue_cli/scout_data_types.py
@@ -1,40 +0,0 @@
-from typing import Literal
-
-from imbue_core.pydantic_serialization import SerializableModel
-
-# TODO refactor evidence example to use different classes for different output types
-
-
-class ScoutEvidenceExample(SerializableModel):
- """A single example of evidence for the report."""
-
- description: str
- type: Literal["positive", "negative"]
- command: str | None = None
- output: str | None = None
- code: str | None = None
- image_path: str | None = None
- image_data: bytes | None = None
- image_format: str | None = None
- image_caption: str | None = None
-
-
-class ScoutEvidence(SerializableModel):
- """A piece of evidence for the report."""
-
- question: str
- action: str
- result: str
- score: Literal["Good", "Moderate", "Bad"]
- confidence: Literal["High", "Medium", "Low"]
- reference: str | None = None
- examples: list[ScoutEvidenceExample] | None = None
-
-
-class ScoutReport(SerializableModel):
- """A report of the scout analysis."""
-
- goal: str
- evidence: list[ScoutEvidence]
- total_cost: float
- total_time_taken: float
diff --git a/imbue_core/imbue_core/imbue_cli/scout_message_types.py b/imbue_core/imbue_core/imbue_cli/scout_message_types.py
@@ -1,96 +0,0 @@
-"""Message types for scout output."""
-
-import time
-from typing import Annotated
-from typing import Literal
-
-from pydantic import Field
-from pydantic import Tag
-from pydantic import TypeAdapter
-
-from imbue_core.imbue_cli.scout_data_types import ScoutEvidenceExample
-from imbue_core.pydantic_serialization import SerializableModel
-from imbue_core.pydantic_serialization import build_discriminator
-
-
-class ScoutMessage(SerializableModel):
- """Base class for all scout output messages."""
-
- object_type: str = Field(description="Discriminator field for message type")
- timestamp: float = Field(default_factory=time.time, description="Unix timestamp when message was created")
-
-
-class EvidenceMessage(ScoutMessage):
- """Message containing a piece of evidence."""
-
- object_type: Literal["EvidenceMessage"] = "EvidenceMessage"
-
- # Evidence fields (same as current ScoutEvidence)
- question: str
- action: str
- result: str
- score: Literal["Good", "Moderate", "Bad"]
- confidence: Literal["High", "Medium", "Low"]
- reference: str | None = None
- examples: list[ScoutEvidenceExample] | None = None
-
-
-class ScoreMessage(ScoutMessage):
- """Message containing an overall score assessment."""
-
- object_type: Literal["ScoreMessage"] = "ScoreMessage"
-
- overall_score: float # 0.0 to 1.0
- evidence_count: int # Number of evidence pieces contributing to this score
- score_breakdown: dict[str, int] # Distribution of Good/Moderate/Bad evidence counts
- confidence_breakdown: dict[str, int] # Distribution of High/Medium/Low confidence counts
- time_elapsed: float # Time elapsed since start in seconds
-
-
-class MetadataMessage(ScoutMessage):
- """Message containing metadata about the scout run."""
-
- object_type: Literal["MetadataMessage"] = "MetadataMessage"
-
- goal: str
- repo_path: str
- model: str
- started_at: float
-
-
-class CostMessage(ScoutMessage):
- """Message containing cost information."""
-
- object_type: Literal["CostMessage"] = "CostMessage"
-
- total_cost_usd: float
- tokens_used: int | None = None
-
-
-class StatusMessage(ScoutMessage):
- """Message containing status updates."""
-
- object_type: Literal["StatusMessage"] = "StatusMessage"
-
- status: Literal["started", "running", "completed", "failed"]
- message: str | None = None
-
-
-# Union type for all messages
-ScoutMessageUnion = Annotated[
- (
- Annotated[EvidenceMessage, Tag("EvidenceMessage")]
- | Annotated[ScoreMessage, Tag("ScoreMessage")]
- | Annotated[MetadataMessage, Tag("MetadataMessage")]
- | Annotated[CostMessage, Tag("CostMessage")]
- | Annotated[StatusMessage, Tag("StatusMessage")]
- ),
- build_discriminator(),
-]
-
-_scout_message_type_adapter = TypeAdapter(ScoutMessageUnion)
-
-
-def deserialize_scout_message_json(data: str) -> ScoutMessageUnion:
- print(f"Parsing scout message json: {data}")
- return _scout_message_type_adapter.validate_json(data)
diff --git a/imbue_core/imbue_core/issues.py b/imbue_core/imbue_core/issues.py
@@ -1,67 +0,0 @@
-from enum import StrEnum
-from typing import Annotated
-
-from pydantic import Tag
-
-from imbue_core.pydantic_serialization import SerializableModel
-from imbue_core.pydantic_serialization import build_discriminator
-
-
-class IssueSeverityLevel(StrEnum):
- CRITICAL = "CRITICAL"
- ERROR = "ERROR"
- WARNING = "WARNING"
- NIT = "NIT"
-
-
-class IssueKey(SerializableModel):
- # issues from different commands should not collide
- command: str
- # this should NOT contain line numbers, as we want it to be stable across changes as much as possible
- # NOTE: we do some initial formatting to avoid issues around message containing code
- identifier: str
-
-
-class Issue(SerializableModel):
- key: IssueKey
- severity: IssueSeverityLevel
-
- def is_equal(self, other: "Issue") -> bool:
- return self.key == other.key
-
- def summary_line(self) -> str:
- return f"{self.severity}: {self.key}"
-
-
-class CheckFailedIssue(Issue):
- object_type: str = "CheckFailedIssue"
- error_message: str
-
- raw: str | None = None
-
-
-class IdentifiedIssue(Issue):
- object_type: str = "IdentifiedIssue"
- issue_location: str | None = None
-
- # The oneliner message of the problem.
- # PYTEST: AssertionError
- # MYPY: error: Function is missing a type annotation
- message: str
-
- # The full description of the issue (can be many lines)
- # Examples:
- # PYTEST: the longrepr, which is "def test_pearson_correlation_basic_lists():\n x = [1, 2, 3, 4, 5]\n y = [2, 4, 6, 8, 10]\n expected_correlation = 0.9819805060619657\n> ..." # noqa
- # MYPY: "def calculate_pearson_correlation(x, y):"
- description: str
-
- def summary_line(self) -> str:
- return f"{super().summary_line()} message={self.message!r}" + (
- f" ({self.issue_location})" if self.issue_location else ""
- )
-
-
-IssueUnion = Annotated[
- (Annotated[CheckFailedIssue, Tag("CheckFailedIssue")] | Annotated[IdentifiedIssue, Tag("IdentifiedIssue")]),
- build_discriminator(),
-]
diff --git a/imbue_core/imbue_core/log_utils.py b/imbue_core/imbue_core/log_utils.py
@@ -1,146 +0,0 @@
-from __future__ import annotations
-
-import asyncio
-from pathlib import Path
-from typing import Any
-from typing import Callable
-from typing import Mapping
-from typing import TYPE_CHECKING
-
-import loguru
-
-from imbue_core.constants import ExceptionPriority
-from imbue_core.constants import HIGH_PRIORITY_LEVEL
-from imbue_core.constants import LOW_PRIORITY_LEVEL
-from imbue_core.constants import MEDIUM_PRIORITY_LEVEL
-
-if TYPE_CHECKING:
- loguru_record = loguru.Record
-else:
- loguru_record = dict[str, Any]
-FilterDict = dict[str | None, str | int | bool]
-FilterFunction = Callable[[loguru_record], bool]
-LOCATION_WIDTH = 60
-TRACE = "TRACE"
-
-# between DEBUG and INFO: https://loguru.readthedocs.io/en/stable/api/logger.html
-DETAIL = "DETAIL"
-DETAIL_LEVEL = 15
-
-# the first 4 chars are used for "tsk_" and the next ~7 bytes are used for the timestamp
-# thus we need at least a few extra characters to make sure we don't troll ourselves when two tasks are created
-# very close in time
-TASK_ID_MESSAGE_WIDTH = 16
-
-LOG_LEVEL_NO_COLOR_TUPLES = [
- (DETAIL, DETAIL_LEVEL, "<fg 128,128,128>"),
- (ExceptionPriority.LOW_PRIORITY.value, LOW_PRIORITY_LEVEL, "<yellow>"),
- (ExceptionPriority.MEDIUM_PRIORITY.value, MEDIUM_PRIORITY_LEVEL, "<fg 255,127,0>"),
- (ExceptionPriority.HIGH_PRIORITY.value, HIGH_PRIORITY_LEVEL, "<red>"),
-]
-
-
-def fix_full_location(record: "loguru.Record") -> str:
- """
- One goal of this function is to format the location in an IDE-friendly way,
- so that control-clicking on the logged location opens the correct file
- and puts the cursor at the correct line.
-
- `record` looks like this:
- ```
- {
- "elapsed": datetime.timedelta(seconds=5, microseconds=152312),
- "exception": None,
- "extra": {
- "machine": "32de5bcafaa8",
- "user": "user",
- "agent_type": None,
- "agent_id": None,
- "parent_id": None,
- "async_task_id": None,
- "formatted_task_id": "",
- "formatted_agent_id": "",
- "sandbox_id": None,
- "formatted_sandbox_id": "",
- },
- "file": (
- name="error_dump_utils.py",
- path="/thad/dropbox/Thad Hughes/src/generally_intelligent/computronium/computronium/common/error_dump_utils.py",
- ),
- "function": "write_exception",
- "level": (name="INFO", no=20, icon="ℹ️"),
- "line": 214,
- "message": "Full traceback for is available at http://node-004.snake-blues.ts.net:7777/exceptions/thad__notebook_2025_01_28_legolas/1000__2025_01_28_17_47_19_170102__P000_W0000/C0000__user_100.110.58.95_5045/2025_03_06_09_59_10_188008_KeyboardInterrupt_traceback.txt",
- "module": "error_dump_utils",
- "name": "computronium.common.error_dump_utils",
- "process": (id=3099015, name="MainProcess"),
- "thread": (id=140434013706048, name="MainThread"),
- "time": datetime(
- 2025, 3, 6, 9, 59, 10, 920101, tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=57600), "PST")
- ),
- }
- ```
- """
- log_path = Path(record["file"].path)
- try:
- cwd = Path.cwd()
- except FileNotFoundError:
- cwd = Path("/")
-
- if log_path.is_relative_to(cwd):
- log_path = log_path.relative_to(cwd)
- location: str = record["extra"].get("full_location", f"{str(log_path)}:{record['line']}:{record['function']}")
- while len(location) > LOCATION_WIDTH and "/" in location:
- location = location[location.find("/") + 1 :]
- return location[-LOCATION_WIDTH:].rjust(LOCATION_WIDTH)
-
-
-def format_task_id(async_task_id: str) -> str:
- return async_task_id[:TASK_ID_MESSAGE_WIDTH].rjust(TASK_ID_MESSAGE_WIDTH)
-
-
-def patch_log_context_in_place(record: "loguru.Record", format_task_id: Callable[[str], str] = format_task_id) -> None:
- record["extra"]["full_location"] = fix_full_location(record)
-
- async_task_id = None
- try:
- # get the task id
- current_task = asyncio.current_task()
- if current_task is not None:
- async_task_id = current_task.get_name()
- except RuntimeError:
- # we're not in an asyncio event loop
- pass
-
- if async_task_id:
- formatted_task_id = format_task_id(async_task_id)
- record["extra"]["formatted_task_id"] = f" [{formatted_task_id}]"
- record["extra"]["async_task_id"] = async_task_id
-
-
-# TODO: Consider moving all levels from computronium log_utils.py _ensure_levels_configured here
-def ensure_core_log_levels_configured(
- additional_log_levels: Mapping[str, int] | None = None,
-) -> None:
- from loguru import logger
-
- logger.trace("configuring detail and ExceptionPriority log levels")
- for level, no, color in LOG_LEVEL_NO_COLOR_TUPLES:
- try:
- logger.level(level, no=no, color=color)
- except (TypeError, ValueError) as e:
- is_level_already_set_thus_ok = "already exists, you can't update its severity no" in str(e)
- if not is_level_already_set_thus_ok:
- raise
-
- if additional_log_levels is None:
- return
-
- logger.trace("configuring additional log levels {}", additional_log_levels)
- for level_name, level_no in additional_log_levels.items():
- try:
- logger.level(level_name, no=level_no)
- except (TypeError, ValueError) as e:
- is_level_already_set_thus_ok = "already exists, you can't update its severity no" in str(e)
- if not is_level_already_set_thus_ok:
- raise
diff --git a/imbue_core/imbue_core/repo_state.py b/imbue_core/imbue_core/repo_state.py
@@ -1,109 +0,0 @@
-from enum import StrEnum
-from functools import cached_property
-from typing import Annotated
-from typing import Any
-
-from pydantic import Field
-from pydantic import Tag
-from pydantic import computed_field
-
-from imbue_core.frozen_utils import FrozenDict
-from imbue_core.pydantic_serialization import PydanticFrozenDictAnnotation
-from imbue_core.pydantic_serialization import SerializableModel
-from imbue_core.pydantic_serialization import build_discriminator
-
-ResourceURL = str
-
-
-class ConflictType(StrEnum):
- MERGE = "MERGE"
- REBASE = "REBASE"
- CHERRY_PICK = "CHERRY_PICK"
- APPLY = "APPLY"
- REVERT = "REVERT"
- BISECT = "BISECT"
-
-
-class RepoOperation(SerializableModel):
- pass
-
- @computed_field
- @cached_property
- def is_empty(self) -> bool:
- """Whether this repo operation leaves the repo unchanged.
-
- Defaults to False. But should be overridden by subclasses as appropriate.
- """
- return False
-
-
-class ConflictedRepoOperation(RepoOperation):
- object_type: str = "ConflictedRepoOperation"
-
- blob_content_by_hash: Annotated[FrozenDict[str, bytes], PydanticFrozenDictAnnotation]
- index_content: bytes
- modified_file_contents_by_path: Annotated[FrozenDict[str, bytes], PydanticFrozenDictAnnotation]
- conflict_type: ConflictType
- special_git_file_contents_by_path: Annotated[FrozenDict[str, bytes], PydanticFrozenDictAnnotation]
-
-
-class CleanRepoOperation(RepoOperation):
- """
- A clean repo operation is a repo operation that has no conflicts.
-
- It is a contains the staged diff, the unstaged diff, and the combination of the previous two.
- """
-
- object_type: str = "CleanRepoOperation"
- combined_diff: str
- staged_diff: str = ""
- unstaged_diff: str = ""
-
- # FIXME: this is now doing validation -- should be converted to the pydantic way of doing this!
- def model_post_init(self, __context: Any) -> None:
- super().model_post_init(__context)
- if self.combined_diff.strip() != "":
- assert (
- self.staged_diff.strip() != "" or self.unstaged_diff.strip() != ""
- ), "combined diff is not empty, so staged and unstaged diffs must be non-empty"
-
- @computed_field
- @cached_property
- def is_empty(self) -> bool:
- return self.combined_diff.strip() == ""
-
-
-class RepoState(SerializableModel):
- git_hash: str
- repo_operation: (
- Annotated[CleanRepoOperation, Tag("CleanRepoOperation")]
- | Annotated[ConflictedRepoOperation, Tag("ConflictedRepoOperation")]
- ) = Field(discriminator=build_discriminator())
-
- @computed_field
- @cached_property
- def is_conflicted(self) -> bool:
- return isinstance(self.repo_operation, ConflictedRepoOperation)
-
- @computed_field
- @cached_property
- def has_operations(self) -> bool:
- repo_operation = self.repo_operation
- return isinstance(repo_operation, ConflictedRepoOperation) or repo_operation.combined_diff.strip() != ""
-
- @computed_field
- @cached_property
- def type_name(self) -> str:
- return self.__class__.__name__
-
- def build_with_new_commit(self, git_hash: str) -> "RepoState":
- return RepoState(git_hash=git_hash, repo_operation=self.repo_operation)
-
-
-GIT_FILE_PATH_NAMES_BY_CONFLICT_TYPE: dict[ConflictType, tuple[str, ...]] = {
- ConflictType.MERGE: ("MERGE_HEAD", "AUTO_MERGE", "MERGE_MSG", "MERGE_MODE"),
- ConflictType.REBASE: ("REBASE_HEAD",),
- ConflictType.CHERRY_PICK: ("CHERRY_PICK_HEAD",),
- ConflictType.APPLY: (),
- ConflictType.REVERT: ("REVERT_HEAD",),
-}
diff --git a/imbue_core/imbue_core/s3_uploader.py b/imbue_core/imbue_core/s3_uploader.py
@@ -1,213 +0,0 @@
-import threading
-import time
-import typing
-import uuid
-from concurrent.futures import ThreadPoolExecutor
-from datetime import datetime
-from datetime import timezone
-
-import boto3
-from botocore import UNSIGNED
-from botocore.config import Config
-from loguru import logger
-
-if typing.TYPE_CHECKING:
- # type: ignore[import-not-found]: pyre on modal does't believe in mypy_boto3_s3
- from mypy_boto3_s3 import Client
-else:
- Client = object
-
-from pydantic import PrivateAttr
-
-from imbue_core.pydantic_serialization import FrozenModel
-
-EXTRAS_UPLOADED_FILES_KEY = "uploaded_files"
-
-PRODUCTION_UPLOADS_BUCKET = "traceback-uploads-production"
-STAGING_UPLOADS_BUCKET = "traceback-uploads-staging"
-
-DEFAULT_REGION = "us-west-2"
-MAXIMUM_QUEUED_S3_UPLOADS = (
- 50 # rather arbitrary but better to err on the side of caution when going from the current unbounded
-)
-
-
-class _S3Uploader(FrozenModel):
- bucket: str
- region: str
- maximum_concurrency: int = MAXIMUM_QUEUED_S3_UPLOADS
-
- _s3_client: Client = PrivateAttr() # type: ignore[valid-type]
-
- # protects access to the thread collections
- _thread_pool: ThreadPoolExecutor = PrivateAttr()
- _thread_limiter: threading.Semaphore = PrivateAttr()
-
- def model_post_init(self, context) -> None:
- # NOTE: we use an unsigned client to avoid the need to provide AWS credentials.
- self._s3_client = boto3.client("s3", region_name=self.region, config=Config(signature_version=UNSIGNED))
- self._thread_pool = ThreadPoolExecutor(max_workers=None, thread_name_prefix=f"s3_upload")
- # Unfortunately, there's no safe access to the queue size of the thread pool so calculating that number precisely
- # using a semaphore. Each queued up uploads acquires a single value and returns it only after its thread is done
- # interacting with S3. The value of the semaphore at any given time is the number of available work slots, and
- # it cannot go negative. The semaphore is bounded purely to track that there are no more releases than acquisitions.
- self._thread_limiter = threading.BoundedSemaphore(self.maximum_concurrency)
-
- def _upload_thread(self, key: str, contents: bytes) -> None:
- try:
- logger.debug("Uploading to s3://{}/{}", self.bucket, key)
- # NOTE: we use put_object instead of upload_file because we don't want multipart uploads
- # multipart uploads are not allowed for unsigned clients
- self._s3_client.put_object(
- Bucket=self.bucket,
- Key=key,
- Body=contents,
- )
- logger.debug("Done uploading to s3://{}/{}", self.bucket, key) # XXX remove before merge
- except Exception as e:
- logger.info("Failed to upload {} to S3: {}", key, e)
- # if re-raised, who would even catch this exception?
- finally:
- self._thread_limiter.release()
-
- def s3_uri_from_key(self, key: str) -> str:
- return f"s3://{self.bucket}/{key}"
-
- def upload_if_possible(self, key: str, contents: bytes) -> str | None:
- """Returns the S3 URL of the upload or None if the upload is not possible"""
- if not self._thread_limiter.acquire(timeout=0):
- logger.debug(
- "Skipping upload to {key}, maximum concurrent uploads in progress already (limit={limit})",
- key=key,
- limit=self.maximum_concurrency,
- )
- return None
-
- try:
- self._thread_pool.submit(self._upload_thread, key, contents)
- except Exception as e:
- # we have to release the semaphore since the thread didn't start
- # this shouldn't ever happen but the docs for `.submit` don't promise
- # anything
- self._thread_limiter.release()
- logger.debug("Failed to queue a thread for an upload to {key}: {e}", key=key, e=e)
- return None
-
- return self.s3_uri_from_key(key)
-
- def wait_for_all_uploads(self, timeout: float | None, is_shutting_down: bool) -> bool:
- """Waits for all the uploads that may still be in progress or queued.
-
- When is_shutting_down is True, the function will block until all uploads are completed and will disable any
- future use of the uploader.
-
- The is_shutting_down parameter is meant to be overridden only in tests as a way to checkpoint before
- proceeding to schedule more work.
- """
- deadline = None
- if timeout is not None:
- deadline = time.monotonic() + timeout
-
- # tracks the number of work slots from semaphore that this function already acquired
- # if all self.maximum_concurrency values are collected then we guarantee that no other
- # work is queue or in progress
- n = 0
-
- # a fast pass to collect available tickets before we start waiting and log messages to the user.
- while self._thread_limiter.acquire(timeout=0):
- n += 1
-
- while (deadline is None or time.monotonic() < deadline) and n < self.maximum_concurrency:
- if is_shutting_down:
- logger.info(
- "Please stand by: waiting for remaining uploads to finish! Still uploading: {} reports",
- self.maximum_concurrency - n,
- )
- timeout = None if deadline is None else deadline - time.monotonic()
- if self._thread_limiter.acquire(timeout=timeout):
- n += 1
-
- all_done = n == self.maximum_concurrency
- if is_shutting_down:
- # block more work from getting scheduled in case we didn't gobble up all the slots
- # this may also make the semaphore out of sync, as cancelled queued features will not
- # release theirs
- self._thread_pool.shutdown(wait=False, cancel_futures=True)
- if not all_done:
- logger.info(
- "Failed to upload the S3 reports after timeout reached (timeout={}), {} reports still uploading",
- timeout,
- self.maximum_concurrency - n,
- )
- elif n > 0:
- # allow further work
- logger.debug("Letting go of {n} slots", n=n)
- self._thread_limiter.release(n)
-
- return all_done
-
-
-# FIXME: move the methods below to error-handling specific module and get rid of this global variable if possible
-_S3_UPLOADER: _S3Uploader | None = None
-
-
-def setup_s3_uploads(is_production: bool = False) -> None:
- """Set up S3 upload settings."""
- global _S3_UPLOADER
- if _S3_UPLOADER is not None:
- logger.debug("S3 upload settings already initialized, skipping setup")
- return
- if is_production:
- bucket_name = PRODUCTION_UPLOADS_BUCKET
- else:
- bucket_name = STAGING_UPLOADS_BUCKET
- _S3_UPLOADER = _S3Uploader(bucket=bucket_name, region=DEFAULT_REGION)
-
-
-def get_s3_upload_key(key_prefix: str, key_suffix: str) -> str:
- """Get a URL for an S3 upload."""
- key = (
- "_".join(
- [
- key_prefix,
- datetime.now(timezone.utc).strftime("%Y-%m-%dT%H-%M-%S"),
- uuid.uuid4().hex,
- ]
- )
- + key_suffix
- )
- return key
-
-
-def get_s3_upload_url(key: str) -> str | None:
- """Get a URL for an S3 upload."""
- if _S3_UPLOADER is None:
- logger.info("S3 upload settings not initialized. Skipping upload.")
- return None
- return _S3_UPLOADER.s3_uri_from_key(key)
-
-
-def upload_to_s3_with_key(key: str, contents: bytes) -> str | None:
- """Upload a file to S3 and return the S3 URL. Returns None if upload is not possible."""
- if _S3_UPLOADER is None:
- logger.info("S3 upload settings not initialized. Skipping upload.")
- return None
- return _S3_UPLOADER.upload_if_possible(key, contents)
-
-
-def upload_to_s3(key_prefix: str, key_suffix: str, contents: bytes) -> str | None:
- """Upload a file to S3 in the background."""
- if _S3_UPLOADER is None:
- logger.info("S3 upload settings not initialized. Skipping upload.")
- return None
-
- key = get_s3_upload_key(key_prefix, key_suffix)
- return upload_to_s3_with_key(key, contents)
-
-
-def wait_for_s3_uploads(timeout: float | None, is_shutting_down: bool) -> bool | None:
- logger.info("Checking whether S3 uploads are still in progress!")
- if _S3_UPLOADER is None:
- return None
-
- return _S3_UPLOADER.wait_for_all_uploads(timeout=timeout, is_shutting_down=is_shutting_down)
diff --git a/imbue_core/imbue_core/sculptor/__init__.py b/imbue_core/imbue_core/sculptor/__init__.py
@@ -1,4 +0,0 @@
-"""
-This package defines Sculptor types and utilities that need to be shared with other projects,
-currently imbue_cli and imbue_verify.
-"""
diff --git a/imbue_core/imbue_core/sculptor/state/chat_state.py b/imbue_core/imbue_core/sculptor/state/chat_state.py
@@ -1,188 +0,0 @@
-from enum import StrEnum
-from typing import Annotated
-from typing import Any
-from typing import Literal
-
-from pydantic import Field
-from pydantic import Tag
-
-from imbue_core.agents.data_types.ids import AgentMessageID
-from imbue_core.agents.data_types.ids import TaskID
-from imbue_core.ids import ToolUseID
-from imbue_core.imbue_cli.action import ActionOutput as ImbueCLIActionOutput
-from imbue_core.pydantic_serialization import SerializableModel
-from imbue_core.pydantic_serialization import build_discriminator
-
-# ========================
-# Chat Type Definitions
-# ========================
-
-
-class ContentBlock(SerializableModel):
- object_type: str = Field(..., description="Type discriminator for content blocks")
- type: str = Field(..., description="Type discriminator for content blocks")
-
-
-class TextBlock(ContentBlock):
- object_type: str = "TextBlock"
- type: Literal["text"] = "text"
- text: str
-
-
-class ContextSummaryBlock(ContentBlock):
- object_type: str = "ContextSummaryBlock"
- type: Literal["context_summary"] = "context_summary"
- text: str
-
-
-class ResumeResponseBlock(ContentBlock):
- object_type: str = "ResumeResponseBlock"
- type: Literal["resume_response"] = "resume_response"
-
-
-class ForkedToBlock(ContentBlock):
- object_type: str = "ForkedToBlock"
- type: Literal["forked_to"] = "forked_to"
- forked_to_task_id: TaskID
-
-
-class ForkedFromBlock(ContentBlock):
- object_type: str = "ForkedFromBlock"
- type: Literal["forked_from"] = "forked_from"
- forked_from_task_id: TaskID
-
-
-class CommandBlock(ContentBlock):
- object_type: str = "CommandBlock"
- type: Literal["command"] = "command"
- command: str
- is_automated: bool = Field(default=False, description="Whether the command is automated")
-
-
-ToolInput = dict[str, Any]
-
-
-class ToolUseBlock(ContentBlock):
- object_type: str = "ToolUseBlock"
- type: Literal["tool_use"] = "tool_use"
- id: ToolUseID = Field(..., description="Unique identifier for this tool use")
- name: str = Field(..., description="Name of the tool being used")
- input: ToolInput = Field(default_factory=ToolInput, description="Input parameters for the tool")
-
-
-class ToolResultContent(SerializableModel):
- """Base class for tool result content with type discriminator"""
-
- content_type: str = Field(..., description="Type discriminator for tool result content")
-
-
-class SimpleToolContent(ToolResultContent):
- """Generic tool content, or information to reconstruct diff tool content"""
-
- content_type: Literal["simple"] = "simple"
- text: str = Field(..., description="The tool output as text")
- tool_input: ToolInput
- tool_content: Any
-
-
-class GenericToolContent(ToolResultContent):
- """Generic content for most tools - just a string"""
-
- content_type: Literal["generic"] = "generic"
- text: str = Field(..., description="The tool output as text")
-
-
-class DiffToolContent(ToolResultContent):
- """Content for diff-producing tools (Write, Edit, MultiEdit)"""
-
- content_type: Literal["diff"] = "diff"
- diff: str = Field(..., description="The git diff string")
- file_path: str = Field(..., description="The file that was modified")
-
-
-class ImbueCLIToolContent(ToolResultContent):
- """Content for Imbue CLI MCP results that may be composed of multiple actions"""
-
- content_type: Literal["imbue_cli"] = "imbue_cli"
-
- action_outputs: list[ImbueCLIActionOutput] = Field(
- ..., description="List of action outputs from the Imbue CLI tool"
- )
-
-
-ToolResultContentType = GenericToolContent | DiffToolContent | ImbueCLIToolContent
-
-
-class ToolResultBlockSimple(ContentBlock):
- object_type: str = "ToolResultBlockSimple"
- type: Literal["tool_result_simple"] = "tool_result_simple"
- tool_use_id: ToolUseID = Field(..., description="ID of the corresponding tool use")
- tool_name: str = Field(..., description="Name of the tool that was used")
- invocation_string: str = Field(..., description="String representation of how the tool was invoked")
- content: SimpleToolContent | ImbueCLIToolContent = Field(..., description="Result content from the tool execution")
- is_error: bool = Field(default=False, description="Whether the tool execution resulted in an error")
-
-
-class ToolResultBlock(ContentBlock):
- object_type: str = "ToolResultBlock"
- type: Literal["tool_result"] = "tool_result"
- tool_use_id: ToolUseID = Field(..., description="ID of the corresponding tool use")
- tool_name: str = Field(..., description="Name of the tool that was used")
- invocation_string: str = Field(..., description="String representation of how the tool was invoked")
- content: ToolResultContentType = Field(..., description="Result content from the tool execution")
- is_error: bool = Field(default=False, description="Whether the tool execution resulted in an error")
-
-
-class WarningBlock(ContentBlock):
- object_type: str = "WarningBlock"
- type: Literal["warning"] = "warning"
- message: str = Field(..., description="Warning message")
- traceback: str | None = Field(..., description="Warning traceback")
- warning_type: str | None = Field(..., description="Type of warning, i.e. name of the exception that was raised")
-
-
-class ErrorBlock(ContentBlock):
- object_type: str = "ErrorBlock"
- type: Literal["error"] = "error"
- message: str = Field(..., description="Error message")
- traceback: str = Field(..., description="Error traceback")
- error_type: str = Field(..., description="Type of error, i.e. name of the exception that was raised")
-
-
-class FileBlock(ContentBlock):
- object_type: str = "FileBlock"
- type: Literal["file"] = "file"
- source: str = Field(..., description="A file path on the users local machine.")
-
-
-ContentBlockTypes = Annotated[
- (
- Annotated[TextBlock, Tag("TextBlock")]
- | Annotated[CommandBlock, Tag("CommandBlock")]
- | Annotated[ToolUseBlock, Tag("ToolUseBlock")]
- | Annotated[ToolResultBlock, Tag("ToolResultBlock")]
- | Annotated[ErrorBlock, Tag("ErrorBlock")]
- | Annotated[WarningBlock, Tag("WarningBlock")]
- | Annotated[ContextSummaryBlock, Tag("ContextSummaryBlock")]
- | Annotated[ResumeResponseBlock, Tag("ResumeResponseBlock")]
- | Annotated[FileBlock, Tag("FileBlock")]
- | Annotated[ForkedToBlock, Tag("ForkedToBlock")]
- | Annotated[ForkedFromBlock, Tag("ForkedFromBlock")]
- ),
- build_discriminator(),
-]
-
-
-class ChatMessageRole(StrEnum):
- USER = "USER"
- ASSISTANT = "ASSISTANT"
-
-
-class ChatMessage(SerializableModel):
- """Chat message with content blocks. A ChatMessage corresponds to a single turn in the conversation."""
-
- role: ChatMessageRole
- id: AgentMessageID
- content: tuple[ContentBlockTypes, ...]
- snapshot_id: str | None = None
- did_snapshot_fail: bool = False
diff --git a/imbue_core/imbue_core/sculptor/state/messages.py b/imbue_core/imbue_core/sculptor/state/messages.py
@@ -1,141 +0,0 @@
-import datetime
-from enum import StrEnum
-from typing import Annotated
-from typing import Literal
-
-from pydantic import Field
-from pydantic import Tag
-
-from imbue_core.agents.data_types.ids import AgentMessageID
-from imbue_core.ids import AssistantMessageID
-from imbue_core.pydantic_serialization import SerializableModel
-from imbue_core.pydantic_serialization import build_discriminator
-from imbue_core.sculptor.state.chat_state import ContentBlockTypes
-from imbue_core.sculptor.telemetry import PosthogEventPayload
-from imbue_core.sculptor.telemetry_constants import ConsentLevel
-from imbue_core.sculptor.telemetry_utils import with_consent
-from imbue_core.sculptor.telemetry_utils import without_consent
-from imbue_core.time_utils import get_current_time
-
-
-class LLMModel(StrEnum):
- CLAUDE_4_OPUS = "CLAUDE-4-OPUS"
- CLAUDE_4_SONNET = "CLAUDE-4-SONNET"
- CLAUDE_4_HAIKU = "CLAUDE-4-HAIKU"
- GPT_5_1_CODEX = "GPT-5.1-CODEX"
- GPT_5_1_CODEX_MINI = "GPT-5.1-CODEX-MINI"
- GPT_5_1 = "GPT-5.1"
- GPT_5_2 = "GPT-5.2"
-
-
-# ==================================
-# Backend Message Type Definitions
-# ==================================
-
-
-class AgentMessageSource(StrEnum):
- """
- Messages can come the AGENT (in-container LLM), USER (chat messages & direct interactions), SCULPTOR_SYSTEM (multifaceted sculptor app and service code) and RUNNER (the process controlling a task on the server.)
- """
-
- # Messages coming directly from the agent from inside the environment.
- AGENT = "AGENT"
-
- # Messages coming directly from a user interacting with the interface, ie chat
- USER = "USER"
-
- # Messages coming from sculptor-mediated actions and automations, like local sync updates or manual sync operations.
- # If there is ambiguity, (ie, "the user _did_ click a button but we did a lot of magic in the resolution") prefer SCULPTOR_SYSTEM.
- SCULPTOR_SYSTEM = "SCULPTOR_SYSTEM"
-
- # Messages coming from the task runner wrapper, such as environment shutdown.
- # conceptually a subset of SCULPTOR_SYSTEM
- RUNNER = "RUNNER"
-
-
-class Message(SerializableModel):
- """Base class for all messages sent to or from the agent and user."""
-
- # used to dispatch and discover the type of message
- object_type: str
- # the unique ID of the message, used to track it across the system and prevent duplicates.
- # FIXME: get rid of the explicit passing of message_id
- message_id: AgentMessageID = Field(default_factory=AgentMessageID)
- # the source of the message, which can be either the agent, user, or runner.
- source: AgentMessageSource
- # roughly when the message was created, in UTC.
- # note that this is approximate due to clock skew -- these messages are created on different machines.
- # you should *not* sort by this field -- instead, rely on the order in which the messages are received.
- approximate_creation_time: datetime.datetime = Field(default_factory=get_current_time)
-
- # if the message is ephemeral, it will be logged but not saved to the database
- # if it is persistent, it will be logged AND saved to the database
- @property
- def is_ephemeral(self) -> bool:
- raise NotImplementedError("All messages must be subclassed off of PersistentMessage or EphemeralMessage")
-
-
-class PersistentMessage(Message):
- @property
- def is_ephemeral(self) -> bool:
- return False
-
-
-class PersistentUserMessage(PersistentMessage, PosthogEventPayload):
- """
- One of two base classes for messages sent from the user.
- Persistent user messages are saved to the database.
- Persistent user messages are queued in the task runner before they are sent to the agent.
- """
-
- # Override inherited fields with consent annotations
- # TODO (moishe): if other classes that derive from Message also start getting logged,
- # change the base Message class to derive from PosthogEventPayload. For now, doing
- # that is overkill and requires lots of annotations of irrelevant classes.
- #
- # TODO (mjr): We should really have `PersistentHoggableMessage` and `EphemeralHoggableMessage` or something
- object_type: str = without_consent(description="Type discriminator for user messages")
- message_id: AgentMessageID = without_consent(
- default_factory=AgentMessageID,
- description="Unique identifier for the user message",
- )
- source: AgentMessageSource = without_consent(default=AgentMessageSource.USER)
- approximate_creation_time: datetime.datetime = without_consent(
- default_factory=get_current_time,
- description="Approximate UTC timestamp when user message was created",
- )
-
-
-class ChatInputUserMessage(PersistentUserMessage):
- object_type: str = without_consent(default="ChatInputUserMessage")
- text: str = with_consent(ConsentLevel.LLM_LOGS, description="User input text content")
- model_name: LLMModel = with_consent(
- ConsentLevel.PRODUCT_ANALYTICS,
- default=None,
- description="Selected LLM model for the chat request",
- )
- files: list[str] = with_consent(
- ConsentLevel.LLM_LOGS,
- default_factory=list,
- description="List of file paths (images, PDFs, etc., stored in Electron app folder) attached to this message",
- )
-
-
-class PersistentAgentMessage(PersistentMessage):
- """Base class for messages sent from the agent."""
-
- source: AgentMessageSource = AgentMessageSource.AGENT
-
-
-class ResponseBlockAgentMessage(PersistentAgentMessage):
- object_type: str = "ResponseBlockAgentMessage"
- role: Literal["user", "assistant", "system"]
- assistant_message_id: AssistantMessageID
- content: tuple[ContentBlockTypes, ...]
-
-
-ConversationMessageUnion = Annotated[
- Annotated[ResponseBlockAgentMessage, Tag("ResponseBlockAgentMessage")]
- | Annotated[ChatInputUserMessage, Tag("ChatInputUserMessage")],
- build_discriminator(),
-]
diff --git a/imbue_core/imbue_core/sculptor/telemetry.py b/imbue_core/imbue_core/sculptor/telemetry.py
@@ -1,809 +0,0 @@
-"""This module exposes an interface for instrumenting telemetry in Sculptor. It's implemented in Imbue Core so that it
-may be re-used between both Sculptor and Imbue CLI.
-
-To use this module well, you MUST:
-
-* call either init_posthog or init_anonymous_posthog on application or lifetime startup.
-* Make sure to call shutdown_posthog() on application close.
-
-* emit_posthog_event() is a low-level library function to send an event to PostHog. If you are a product developer on
- Sculptor, you should probably use fire_posthog_event instead.
-
-Similary you can call
-
-* init_sentry()
-* and MUST call flush_sentry_and_exit_program()
-
-For some reason, flush_sentry_and_exit_program() also shuts down posthog. We will figure this out.
-"""
-
-import os
-import traceback
-from collections import defaultdict
-from contextlib import contextmanager
-from pathlib import Path
-from typing import Any
-from typing import Callable
-from typing import Generator
-from typing import Generic
-from typing import Mapping
-from typing import Optional
-from typing import Protocol
-from typing import TypeVar
-from typing import cast
-from typing import runtime_checkable
-
-import sentry_sdk
-from loguru import logger
-from posthog import Posthog
-from posthog.scopes import identify_context
-from posthog.scopes import new_context
-from pydantic import BaseModel
-from pydantic import ConfigDict
-from pydantic import Field
-from pydantic import ValidationError
-from pydantic import create_model
-from pydantic.fields import FieldInfo
-from sentry_sdk.types import Event
-from sentry_sdk.types import Hint
-
-from imbue_core.agents.data_types.ids import TaskID
-from imbue_core.async_monkey_patches import inject_exception_and_log
-from imbue_core.async_monkey_patches import log_exception
-from imbue_core.async_monkey_patches import pre_filter_exception
-from imbue_core.common import is_running_within_a_pytest_tree
-from imbue_core.constants import ExceptionPriority
-from imbue_core.pydantic_serialization import SerializableModel
-from imbue_core.pydantic_utils import model_update
-from imbue_core.sculptor.telemetry_constants import ConsentLevel
-from imbue_core.sculptor.telemetry_constants import ProductComponent
-from imbue_core.sculptor.telemetry_constants import SculptorPosthogEvent
-from imbue_core.sculptor.telemetry_constants import UserAction
-from imbue_core.sculptor.telemetry_utils import with_consent
-from imbue_core.sculptor.telemetry_utils import without_consent
-from imbue_core.sculptor.user_config import PrivacySettings
-from imbue_core.sculptor.user_config import UserConfig
-
-# This file is written into the state directory by the Sculptor server inside the task container
-# to provide it with the telemetry info for the task.
-TELEMETRY_TASK_INFO_JSON_STATE_FILE = "telemetry_task_info.json"
-
-
-class TelemetryInfo(SerializableModel):
- """Information needed for setting up telemetry.
-
- This data structure is generated once in the Sculptor server,
- and gets propagated elsewhere (such as to Imbue CLI).
- """
-
- # Putting the User Config into this object is a smell. The UserConfig can and will change idependently of this
- # model, and that can lead to all sorts of issues. Consider refactoring this code.
- user_config: UserConfig
- sculptor_version: str
- sculptor_git_sha: str
- sculptor_execution_instance_id: str
- posthog_token: str
- posthog_api_host: str
- sentry_dsn: str
-
-
-class TelemetryProjectInfo(SerializableModel):
- """Used to communicate project-level information tasks inside containers."""
-
- telemetry_info: TelemetryInfo
- project_id: str
-
- # Does not contain a token -- that should come through the environment.
- gitlab_mirror_repo_url: str | None
- original_git_repo_url: str | None
-
-
-class TelemetryTaskInfo(SerializableModel):
- """Used to communicate task-level information tasks inside containers."""
-
- telemetry_project_info: TelemetryProjectInfo
- task_id: TaskID
-
-
-class PosthogEventPayload(SerializableModel):
- """A base model for PostHog events that validates the presence of
- 'consent_level' metadata on each field.
- """
-
- def __init_subclass__(cls, **kwargs: Any) -> None:
- super().__init_subclass__(**kwargs) # pyre-fixme[6]: pyre can't type check this untyped dict
-
- # Run validation after subclass is defined
- cls._validate_class()
-
- @classmethod
- def _validate_class(cls) -> None:
- """
- Checks that every field has a 'consent_level' in its JSON schema metadata.
- Issues a UserWarning if the metadata is missing.
- """
- for field_name in cls.__annotations__.keys():
- field_info = cls.__dict__[field_name]
- # Check that we're using pydantic.Field
- assert isinstance(field_info, FieldInfo), "Field {} does not extend pydantic.Field".format(field_name)
- # Get the extra schema info, defaulting to an empty dict if it's None
- extra_schema: dict[str, Any] = {}
- match field_info.json_schema_extra:
- # If it's a callable we can call it to get the FieldInfo
- case None:
- pass
- case dict() as d:
- extra_schema = d
- case func if callable(func):
- maybe_extra_schema = func({})
- # TODO: func is supposed to be -> None, so the following should in theory never happen...
- if maybe_extra_schema is not None:
- extra_schema = cast(dict[str, Any], maybe_extra_schema)
- case _:
- pass
-
- assert (
- "consent_level" in extra_schema
- ), """Field '{}' in '{}' is missing the
-'consent_level' metadata. Please use the decorator
-with_consent or without_consent to populate the field annotation:
-`json_schema_extra={{'consent_level': ...}}`""".format(
- field_name, cls.__name__
- )
-
-
-# All data models sent to PostHog MUST define consent annotations and subclass PosthogEventPayload.
-T = TypeVar("T", bound=PosthogEventPayload)
-
-
-# Potentially we could have a ratchet test to remind folks to use
-# consent decorators.
-class PosthogEventModel(SerializableModel, Generic[T]):
- """
- Represents a PostHog event, with each field tagged
- with the minimum consent level required for logging.
- """
-
- # Always defined fields
- name: SculptorPosthogEvent = without_consent(description="Name of event, give it meaning!")
- component: ProductComponent = without_consent(description="App component")
-
- # User Activity field
- action: UserAction | None = with_consent(ConsentLevel.PRODUCT_ANALYTICS)
-
- # Task ID - should be set if this event is associated with a task.
- task_id: str | None = with_consent(
- ConsentLevel.PRODUCT_ANALYTICS,
- description="The task id if this event is task-specific",
- )
-
- # Payload field with consent level
- payload: T | None = without_consent(description="PostHog Event payload Model")
-
-
-def _create_posthog_event_payload_event_data_class(
- additional_field_definitions: Mapping[str, Any] | None = None,
-) -> type[PosthogEventPayload]:
- """Generates a subclass of PosthogEventPayload with the type annotations and consent declarations set up.
-
- The PosthogEventPayload type can very based on the Event, so we are going to provide a mechanism to call this for
- any Posthog Event that needs it.
- """
- field_definitions = {}
-
- additional_field_definitions = additional_field_definitions or {}
-
- for field_name, field_info in UserConfig.model_fields.items():
- field_type = field_info.annotation | None if field_info.annotation else None
- field_definitions[field_name] = (field_type, field_info)
-
- for field_name, field_tuple in additional_field_definitions.items():
- field_definitions[field_name] = field_tuple
-
- field_definitions["sculptor_version"] = (Optional[str], without_consent())
-
- return create_model(
- "TelemetryInfoEventData",
- __base__=PosthogEventPayload,
- **field_definitions,
- )
-
-
-# When we don't know what kind of Payload to use, we use this, which is the bare minimum TelemetryInfo data.
-TelemetryInfoEventData: type[PosthogEventPayload] = _create_posthog_event_payload_event_data_class()
-
-
-# For every Event, we define additional fields that it might have.
-# NOTE: This will need to be refactored into a proper covariant pattern, but this works for now.
-SCULPTOR_POSTHOG_EVENT_TO_PAYLOAD_TYPE = defaultdict(
- lambda: TelemetryInfoEventData,
- {
- SculptorPosthogEvent.ONBOARDING_EMAIL_CONFIRMATION: _create_posthog_event_payload_event_data_class(
- {"did_opt_in_to_marketing": (Optional[bool], without_consent())}
- )
- },
-)
-
-
-def make_telemetry_event_data(telemetry_info: TelemetryInfo) -> PosthogEventPayload:
- user_config_data = telemetry_info.user_config.model_dump()
- return TelemetryInfoEventData(**user_config_data)
-
-
-@runtime_checkable
-class PosthogProtocol(Protocol):
- """
- A protocol satisfied by the Posthog client class and a stub implementation.
- """
-
- host: str
-
- def capture(
- self,
- distinct_id=None,
- event=None,
- properties=None,
- timestamp=None,
- uuid=None,
- groups=None,
- send_feature_flags=False,
- disable_geoip=None,
- ) -> None:
- """
- Capture a telemetry event.
- """
-
- def identify(self, identifier, properties=None) -> None:
- """
- Identifies a user.
- """
-
- def alias(
- self,
- previous_id=None,
- distinct_id=None,
- context=None,
- timestamp=None,
- uuid=None,
- disable_geoip=None,
- ) -> None:
- """
- Links two users together, distinct_id -(maps)-> previous_id
- """
-
- def shutdown(self) -> None:
- """
- Flush all messages and cleanly shut down the client.
- """
-
- def capture_exception(self, exception: BaseException, distinct_id=None, properties=None) -> None:
- """Capture an exception event."""
-
-
-# TODO: Should this inherit from PosthogProtocol?
-class StubPosthog:
- host: str = "stub"
-
- def capture(
- self,
- distinct_id=None,
- event=None,
- properties=None,
- timestamp=None,
- uuid=None,
- groups=None,
- send_feature_flags=False,
- disable_geoip=None,
- ) -> None:
- # Do nothing for now.
- #
- # If we want to test calls to posthog.capture later,
- # we can augment this method to save the arguments internally.
- pass
-
- def identify(self, identifier, properties=None) -> None:
- pass
-
- def alias(
- self,
- previous_id=None,
- distinct_id=None,
- context=None,
- timestamp=None,
- uuid=None,
- disable_geoip=None,
- ) -> None:
- pass
-
- def shutdown(self) -> None:
- pass
-
- def capture_exception(self, exception: BaseException, distinct_id=None, properties=None) -> None:
- pass
-
-
-class PosthogUserInstance(BaseModel):
- model_config = ConfigDict(arbitrary_types_allowed=True, frozen=True)
- posthog_instance: PosthogProtocol
- is_anonymous: bool
-
- # Every PosthogUserInstance needs to access the UserConfig, this is a function which returns that.
- user_config_accessor: Callable[[], UserConfig]
-
- def access_user_config(self) -> UserConfig:
- return self.user_config_accessor()
-
-
-class AnonymousPosthogUserInstance(PosthogUserInstance):
- """This specific case is used to store a user when they happen to be initted."""
-
- initial_user_id: str
-
-
-# This private in-memory cached value is set or updated whenever we
-# * begin with an anonymous user (init_posthog)
-# * begin with an identified user (init_anonymous_posthog)
-# * convert an identified user into an anonymous user (identify_user)
-_POSTHOG_USER_INSTANCE: PosthogUserInstance | AnonymousPosthogUserInstance | None = None
-
-
-def is_posthog_identified() -> bool:
- return _POSTHOG_USER_INSTANCE is not None and not _POSTHOG_USER_INSTANCE.is_anonymous
-
-
-# This used to be cached, but we wanted the user to be able to change telemetry preferences within a container.
-def _get_telemetry_task_info_if_inside_container() -> TelemetryTaskInfo | None:
- """Mock this for testing.
-
- It is arranged thus because `get_telemetry_task_info_if_inside_container` is imported in many places,
- and to mock, monkeypatch would need to replace definitions at those locations.
-
- With this, monkeypatch only needs to replace this function."""
- telemetry_info_path = Path("/imbue_addons/state") / TELEMETRY_TASK_INFO_JSON_STATE_FILE
- if telemetry_info_path.exists():
- try:
- telemetry_task_info = TelemetryTaskInfo.model_validate_json(telemetry_info_path.read_text())
- return telemetry_task_info
- except ValidationError as e:
- log_exception(
- e,
- "Telemetry info file {telemetry_info_path} invalid, not initializing Posthog.",
- telemetry_info_path=telemetry_info_path,
- )
- return None
-
-
-def get_telemetry_task_info_if_inside_container() -> TelemetryTaskInfo | None:
- """Loads the telemetry task info from the expected location in the container.
- This is a no-op if the file doesn't exist."""
- return _get_telemetry_task_info_if_inside_container()
-
-
-# TODO (CAP-636): Remove upstream git repo logic once GitLab mirroring is completed.
-def get_original_git_repo_url_if_inside_container() -> str | None:
- telemetry_task_info = get_telemetry_task_info_if_inside_container()
- if telemetry_task_info and telemetry_task_info.telemetry_project_info.original_git_repo_url:
- return telemetry_task_info.telemetry_project_info.original_git_repo_url
- return None
-
-
-def init_posthog(
- info: TelemetryInfo,
- source: str,
- user_config_accessor: Callable[[], UserConfig] | None = None,
- is_anonymous: bool = False,
-) -> None:
- """Initialize Posthog for a _known_ user.
-
- After this function is called,
- get_user_posthog_instance and posthog_context can be used.
-
- This function lives here so that the Sculptor backend and Imbue CLI can initialize PostHog in the same way.
-
- Args:
- user_config_accessor: Primarily exists to provide the PosthogUser instance a way to get the _latest_ user config at any time.
- """
- global _POSTHOG_USER_INSTANCE
- if _POSTHOG_USER_INSTANCE is not None:
- raise RuntimeError("Posthog endpoint already initialized.")
-
- posthog: Posthog | StubPosthog
-
- # TODO: Try to remove this test-specific code if possible.
- if is_running_within_a_pytest_tree():
- posthog = StubPosthog()
- else:
- posthog = Posthog(
- info.posthog_token,
- host=info.posthog_api_host,
- super_properties={
- "source": source,
- "sculptor_version": info.sculptor_version,
- "session": {
- # In theory we should go through the accessor here,
- # but at init time, when the user start the first time,
- # they should be the same.
- "instance_id": info.user_config.instance_id,
- "execution_instance_id": info.sculptor_execution_instance_id,
- },
- },
- )
- if not is_anonymous:
- posthog.identify(
- info.user_config.user_id,
- {"email": info.user_config.user_email},
- )
-
- if is_anonymous:
- _POSTHOG_USER_INSTANCE = AnonymousPosthogUserInstance(
- posthog_instance=posthog, # pyre-fixme[6]: pyre seems confused by PosthogProtocol
- is_anonymous=True,
- initial_user_id=info.user_config.user_id,
- user_config_accessor=user_config_accessor or (lambda: info.user_config),
- )
- else:
- _POSTHOG_USER_INSTANCE = PosthogUserInstance(
- posthog_instance=posthog, # pyre-fixme[6]: pyre seems confused by PosthogProtocol
- is_anonymous=False,
- user_config_accessor=user_config_accessor or (lambda: info.user_config),
- )
-
-
-def identify_posthog_user(user_config_accessor: Callable[[], UserConfig]) -> None:
- """Update the initialized PostHog instance with user identity.
-
- At this point, the previous posthog instance should be an anonymous PostHog instance.
- This means logs emitted prior to this call will create a new person entry and not
- populate `person.properties.email` field.
-
- We will use PostHog client's alias function to associate two unique ids together to
- the same person entry.
- """
- global _POSTHOG_USER_INSTANCE
- if _POSTHOG_USER_INSTANCE is None:
- logger.error("Posthog endpoint not initialized")
- return
- if not _POSTHOG_USER_INSTANCE.is_anonymous:
- logger.error("Posthog endpoint already identified with user")
- return
-
- if not isinstance(_POSTHOG_USER_INSTANCE, AnonymousPosthogUserInstance):
- raise RuntimeError("Anonymous PosthogUser instance expected")
-
- # At this point, the previous DataModel holds the anonymous instance id generated.
- initial_user_id = _POSTHOG_USER_INSTANCE.initial_user_id
-
- # We can preserve the previously running posthog instance.
- posthog = _POSTHOG_USER_INSTANCE.posthog_instance
-
- _POSTHOG_USER_INSTANCE = PosthogUserInstance(
- posthog_instance=posthog,
- is_anonymous=False,
- user_config_accessor=user_config_accessor,
- )
-
- latest_user_config = _POSTHOG_USER_INSTANCE.access_user_config()
-
- # Identify should only be called once per instance lifetime.
- posthog.identify(
- initial_user_id,
- {"email": latest_user_config.user_email},
- )
- posthog.alias(previous_id=latest_user_config.user_id, distinct_id=initial_user_id)
-
- logger.info(
- "Associating identified user {} with current instance initial user {}",
- latest_user_config.user_id,
- initial_user_id,
- )
-
-
-def get_user_posthog_instance() -> PosthogUserInstance | None:
- """Returns the global PostHog client or None if it has not been configured"""
- return _POSTHOG_USER_INSTANCE
-
-
-@contextmanager
-def posthog_context(
- posthog_user_instance: PosthogUserInstance | None = None,
-) -> Generator[PosthogProtocol, None, None]:
- """A context manager that creates a PostHog context with the appropriate user ID.
-
- Must be called after init_posthog or with a passed in posthog_user_instance.
-
- TODO: Can we delete this in favor of emit_posthog_event? If not, explain here when to use this.
- TODO: Do we actually need to yield the PosthogProtocol?
- """
- posthog_user_instance = posthog_user_instance or _POSTHOG_USER_INSTANCE
- assert posthog_user_instance is not None
- with new_context():
- # Not having a distinct ID is troublesome...
- current_user_id = posthog_user_instance.access_user_config().user_id
- identify_context(current_user_id)
- try:
- yield posthog_user_instance.posthog_instance
- except Exception as e:
- log_exception(e, "Error in logging to posthog")
-
-
-def is_consent_allowable(required_consent: ConsentLevel | None, privacy_settings: PrivacySettings) -> bool:
- """Check the appropriate value of user consent fields to establish allowable consent."""
- if required_consent is None:
- return True
- elif required_consent == ConsentLevel.NONE:
- return True
- elif required_consent == ConsentLevel.ERROR_REPORTING:
- return privacy_settings.is_error_reporting_enabled
- elif required_consent == ConsentLevel.PRODUCT_ANALYTICS:
- return privacy_settings.is_product_analytics_enabled
- elif required_consent == ConsentLevel.LLM_LOGS:
- return privacy_settings.is_llm_logs_enabled
- elif required_consent == ConsentLevel.SESSION_RECORDING:
- return privacy_settings.is_session_recording_enabled
- elif required_consent == ConsentLevel.NEVER_PERSIST:
- return False
- else:
- logger.info("Unexpected consent level: {}", required_consent)
- return False
-
-
-def filter_model_by_consent(model: SerializableModel, privacy_settings: PrivacySettings) -> SerializableModel:
- """Recursively filter a SerializableModel based on consent toggles.
-
- Args:
- model: The model to filter
- user_config: The user's configuration with consent toggles
-
- Returns:
- SerializableModel with None for fields that don't meet the consent requirements.
- Tries to handle ValidationError gracefully by creating compatible models.
- """
- updates: dict[str, SerializableModel | list[SerializableModel] | None] = {}
-
- for field_name, field_info in model.__class__.model_fields.items():
- field_value = getattr(model, field_name)
-
- # Retrieve the metadata we attached using the decorator
- metadata = field_info.json_schema_extra or {}
- required_level = metadata.get("consent_level")
-
- # A field without a consent level is considered public OR
- # Include the field if the user's consent level is sufficient
- if is_consent_allowable(required_level, privacy_settings):
- # If the field value is also a SerializableModel, recursively filter it
- if isinstance(field_value, SerializableModel):
- updates[field_name] = filter_model_by_consent(field_value, privacy_settings)
- elif isinstance(field_value, list):
- filtered_list = []
- for item in field_value:
- if isinstance(item, SerializableModel):
- filtered_list.append(filter_model_by_consent(item, privacy_settings))
- else:
- filtered_list.append(item)
- updates[field_name] = filtered_list
- else:
- updates[field_name] = field_value
- else:
- # Set field to None if consent is not allowable
- updates[field_name] = None
-
- try:
- # Try the standard approach first
- return model_update(model, updates)
- except ValidationError:
- # If validation fails due to non-optional fields being set to None,
- # create a new model where those fields are Optional
-
- field_definitions: dict[str, tuple[type[Any] | None, FieldInfo | None]] = {}
- for field_name, field_info in model.__class__.model_fields.items():
- if field_name in updates and updates[field_name] is None:
- # Make this field Optional since we're setting it to None, preserving metadata
- field_definitions[field_name] = (
- field_info.annotation | None,
- Field(default=None, json_schema_extra=field_info.json_schema_extra),
- )
- else:
- # Keep original field definition
- field_definitions[field_name] = (field_info.annotation, field_info)
-
- # Create a new model class with filtered fields made Optional
- base_class = (
- model.__class__
- if hasattr(model.__class__, "__bases__") and model.__class__.__bases__
- else SerializableModel
- )
- filtered_model_class = create_model(
- f"{model.__class__.__name__}Filtered",
- __base__=base_class,
- **field_definitions, # pyre-ignore[6]: pyre can't check this since it's an untyped dict
- )
- return filtered_model_class(**updates) # pyre-ignore[6]: pyre can't check this since it's an untyped dict
-
-
-def emit_posthog_event(posthog_event: PosthogEventModel[Any]) -> None:
- """Filters properties from a Pydantic model instance based on the user's given consent level.
-
- This can be called both from inside the imblue-cli task container, or the Sculptor backend.
- If invoked from inside the imbue-cli task container, task_id is added to the event_data by calling
- get_telemetry_task_info_when_inside_container().
-
- If you are in the Sculptor backend, you should probably use fire_posthog_event instead, as that ensures you're using
- a known event, and attaches correct context.
- """
- posthog_user_instance = get_user_posthog_instance()
-
- if posthog_user_instance:
- user_config_instance = posthog_user_instance.access_user_config()
- privacy_settings = user_config_instance.privacy_settings
-
- with posthog_context(posthog_user_instance):
- if not is_consent_allowable(ConsentLevel.NONE, privacy_settings):
- # User did not opt-into any data collection.
- # We should not log user-identifiable PostHog events.
- return
- elif posthog_event.action is not None and not is_consent_allowable(
- ConsentLevel.PRODUCT_ANALYTICS, privacy_settings
- ):
- # If this is a user_activity event but user has not consented
- # to product analytics logging level, do not emit event.
- return
- event_name = posthog_event.name.value
-
- # Use the recursive filtering function
- try:
- filtered_model = filter_model_by_consent(posthog_event, privacy_settings)
- properties = filtered_model.model_dump()
- except Exception as e:
- logger.info("Failed to filter posthog event: {}", e)
- # We could also choose to drop the entire event, or replace payload with an error message.
- properties = posthog_event.model_dump()
-
- # some events don't have a payload, but they should still be logged.
- if properties.get("payload"):
- payload = properties["payload"]
- if not any(value is not None for value in payload.values()):
- logger.debug("No payload data to log for event of type {}", event_name)
- return
-
- # Check for task-specific telemetry info and add it to the properties
- telemetry_task_info = get_telemetry_task_info_if_inside_container()
- if telemetry_task_info:
- # I think we will further change where we put this.
- properties["task_id"] = str(telemetry_task_info.task_id)
-
- posthog_user_instance.posthog_instance.capture(event=event_name, properties=properties)
-
-
-def shutdown_posthog() -> None:
- """Flush all messages and cleanly shut down the client."""
- posthog_instance = get_user_posthog_instance()
- if posthog_instance is not None:
- posthog_instance.posthog_instance.shutdown()
-
-
-class PosthogExceptionPayload(PosthogEventPayload):
- exception_name: str = with_consent(ConsentLevel.ERROR_REPORTING, description="The name of the raised exception.")
- exception_value: str = with_consent(ConsentLevel.ERROR_REPORTING, description="The value of the raised exception.")
- exception_traceback: str | None = with_consent(
- ConsentLevel.ERROR_REPORTING,
- description="Formatted traceback of the raised exception.",
- )
- message: str | None = with_consent(
- ConsentLevel.ERROR_REPORTING,
- description="The message that accompanies the raised exception.",
- )
-
-
-def get_exception_payload(
- exception: BaseException,
- message: str | None = None,
- include_traceback: bool = False,
-) -> PosthogExceptionPayload:
- formatted_traceback = "".join(traceback.format_exception(type(exception), exception, exception.__traceback__))
- return PosthogExceptionPayload(
- exception_name=type(exception).__name__,
- exception_value=str(exception),
- exception_traceback=formatted_traceback if include_traceback else None,
- message=message,
- )
-
-
-def send_exception_to_posthog(
- error_source: SculptorPosthogEvent,
- exception: BaseException,
- message: str | None = None,
- include_traceback: bool = False,
- component: ProductComponent = ProductComponent.CROSS_COMPONENT,
- task_id: TaskID | None = None,
-) -> None:
- """Sends error details to PostHog for telemetry purposes.
-
- The idea is that for some exceptions, we don't want to send them to Sentry because we're not able to act on them anyway.
- But we should still keep an eye on how often they happen so we send them to PostHog instead.
- """
-
- # TODO: do we want to include this filtering even if we're sending it to posthog rather than sentry?
- should_skip = pre_filter_exception(exception, message)
- if should_skip:
- return
-
- emit_posthog_event(
- PosthogEventModel(
- name=error_source,
- component=component,
- payload=get_exception_payload(exception, message, include_traceback),
- task_id=str(task_id) if task_id else None,
- )
- )
-
- inject_exception_and_log(exception, message or "", priority=ExceptionPriority.LOW_PRIORITY)
-
-
-def flush_sentry_and_exit_program(exit_code: int, final_message: str) -> None:
- """Flush Sentry events and then immediately exit the program with a final message.
-
- We enforce the final message so that the last line that the user sees is relevant to the shutdown.
- """
- sentry_sdk.flush()
- shutdown_posthog()
- logger.info(final_message)
- os._exit(exit_code)
-
-
-def mirror_exception_to_posthog(event: Event, hint: Hint) -> Event:
- """Helper/utility function to mirror an exception from Sentry to PostHog.
-
- When this is wired up to the before_send hook in Sentry, it will send a correctly-shaped event to PostHog, and annotate the Sentry event with the PostHog user id.
- """
- # Only mirror error events
- if event.get("level") in ("warning", "error", "fatal") and hint and hint.get("exc_info"):
- logger.info("We are going to mirror this exception to posthog")
- _, exc_value, _ = hint["exc_info"]
- # Attach useful Sentry context as PostHog properties
- props = {
- "$exception_level": event.get("level"),
- "sentry_event_id": event.get("event_id"),
- "sentry_issue_id": event.get("contexts", {}).get("trace", {}).get("trace_id"),
- "tags": event.get("tags"),
- "release": event.get("release"),
- "environment": event.get("environment"),
- "log_message": event.get("logentry", {}).get("message"),
- }
-
- user_posthog_instance = get_user_posthog_instance()
- if user_posthog_instance:
- user_config = user_posthog_instance.access_user_config()
- try:
- user_posthog_instance.posthog_instance.capture_exception(
- exc_value,
- # We're relying on the fact that we are in a single-user context to always have a distinct_id.
- distinct_id=user_config.user_id,
- properties=props,
- )
-
- event.setdefault("tags", {})
- assert "tags" in event, "Only to shut up typechecker below"
-
- event["tags"]["posthog_exception_mirrored"] = "true"
-
- event.setdefault("extra", {})
- assert "extra" in event, "Only to shut up typechecker below"
-
- event["extra"]["posthog_user_id"] = (user_config.user_id,)
-
- posthog_app_domain = user_posthog_instance.posthog_instance.host.replace(
- ".i.posthog.com", ".posthog.com"
- )
-
- event["extra"]["posthog_user_link"] = f"{posthog_app_domain}/persons/{user_config.user_id}"
-
- except Exception as e:
- # We don't want to trigger an infinite loop of exceptions if PostHog is down. We're sending a message to
- # Sentry after all.
- logger.debug("Failed to mirror exception to PostHog: {}", e)
- finally:
- # We must return the event in all code paths to ensure sentry continues.
- return event
-
- # We must return the event in all code paths to ensure sentry continues.
- return event
diff --git a/imbue_core/imbue_core/sculptor/telemetry_constants.py b/imbue_core/imbue_core/sculptor/telemetry_constants.py
@@ -1,216 +0,0 @@
-from enum import Enum
-
-
-class ConsentLevel(Enum):
- """Defines the hierarchy of user consent levels."""
-
- NONE = 0
- ERROR_REPORTING = 1 # PostHog and Sentry’s error reporting
- PRODUCT_ANALYTICS = 2 # PostHog’s pageview and autocapture events
- LLM_LOGS = 3 # Capability logging
- SESSION_RECORDING = 4 # PostHog and Sentry’s session recording
-
- NEVER_PERSIST = "never_persist"
-
-
-class ProductComponent(Enum):
- AGENT_TASK = "agent_task"
- CHECKS = "checks"
- TASK = "task"
- ONBOARDING = "onboarding"
- STARTUP = "startup"
- ENVIRONMENT_SETUP = "environment_setup"
- FIX = "fix"
- CLAUDE_CODE = "claude_code"
- IMBUE_VERIFY = "imbue_verify"
- IMBUE_CLI = "imbue_cli"
- AUTH = "auth"
- DATABASE = "database"
- LOCAL_SYNC = "local_sync"
- MANUAL_SYNC = "manual_sync"
- # CROSS_COMPONENT is for logging concerns that are not local to a specific component.
- CROSS_COMPONENT = "cross_component"
- CONFIGURATION = "configuration"
-
-
-class UserAction(Enum):
- CLICKED = "clicked"
- CALLED = "called"
- # more to be defined later
-
-
-# Adding a new event? Please see _get_posthog_token_and_api_host for information about
-# using the developer posthog instance as you build/test your event.
-class SculptorPosthogEvent(Enum):
- """
- DO NOT MUTATE the string values!
-
- Mark as deprecated enums when no longer used.
- """
-
- # TESTING
- TEST_EVENT = "test_event"
-
- # ONBOARDING
- ONBOARDING_INITIALIZATION = "onboarding_initialization"
- ONBOARDING_CONFIGURATION_WIZARD = "onboarding_configuration_wizard"
- ONBOARDING_EMAIL_CONFIRMATION = "onboarding_email_confirmation"
- ONBOARDING_TELEMETRY_CONSENT = "onboarding_telemetry_consent"
- ONBOARDING_STARTUP_CHECKS = "onboarding_startup_checks"
- ONBOARDING_USER_CONFIG_SETTINGS = "onboarding_user_config_settings" # Deprecated, use the following one:
- ONBOARDING_USER_CONFIG_SETTINGS_LOADED = "onboarding_user_config_settings_loaded"
- ONBOARDING_COMPLETED = "onboarding_completed"
-
- ONBOARDING_ANTHROPIC_API_KEY_SET = "onboarding_anthropic_api_key_set"
- ONBOARDING_ANTHROPIC_CREDENTIALS_EXIST = (
- "onboarding_anthropic_credentials_exist" # This only means that oauth completed.
- )
- ONBOARDING_ANTHROPIC_OAUTH_STARTED = "onboarding_anthropic_oauth_started"
- ONBOARDING_ANTHROPIC_OAUTH_CANCELLED = "onboarding_anthropic_oauth_cancelled"
- ONBOARDING_ANTHROPIC_AUTHORIZED = (
- "onboarding_anthropic_authorized" # We've successfully authorized, whether via Oauth or API key
- )
- ONBOARDING_OPENAI_AUTHORIZED = "onboarding_openai_authorized"
- ONBOARDING_DOCKER_INSTALLED = "onboarding_docker_installed"
- ONBOARDING_DOCKER_STARTED = "onboarding_docker_started"
- ONBOARDING_GIT_INSTALLED = "onboarding_git_installed"
-
- # STARTUP
- STARTUP_REMOTE_URL = "startup_remote_url"
- DESKTOP_BACKEND_STARTED = "desktop_backend_started"
-
- # Settings, configuration and preferences
- USER_CONFIG_SETTINGS_EDITED = "user_config_settings_edited"
-
- # TASK
- TASK_PREDICT_BRANCH_NAME = "task_predict_branch_name"
- TASK_START_MESSAGE = "task_start_message"
- TASK_START_REQUESTED = "task_start_requested"
- TASK_FORK_REQUESTED = "task_fork_requested"
- TASK_RUN_TASK_STARTED = "task_run_task_started"
- TASK_USER_MESSAGE = "task_user_message"
- TASK_USER_COMMAND = "task_user_command"
- TASK_USER_FEEDBACK = "task_user_feedback"
-
- # ENVIRONMENT SETUP
- ENVIRONMENT_SETUP_REUSED_EXISTING_ENVIRONMENT = "environment_setup_reused_existing_environment"
- ENVIRONMENT_SETUP_FAILED_TO_REUSE_EXISTING_ENVIRONMENT = "environment_setup_failed_to_reuse_existing_environment"
- ENVIRONMENT_SETUP_IMAGE_CREATION_STARTED = "environment_setup_image_creation_started"
- ENVIRONMENT_SETUP_USING_EXISTING_IMAGE = "environment_setup_using_existing_image"
- ENVIRONMENT_SETUP_IMAGE_CREATION_FINISHED = "environment_setup_image_creation_finished"
- ENVIRONMENT_SETUP_IMAGE_ENSURED = "environment_setup_image_ensured"
- ENVIRONMENT_SETUP_HARD_OVERWROTE_WORKSPACE = "environment_setup_hard_overwrote_workspace"
- ENVIRONMENT_SETUP_DOCKER_CONTROL_PLANE_ALREADY_DOWNLOADED = (
- "environment_setup_docker_control_plane_already_downloaded"
- )
- ENVIRONMENT_SETUP_DOCKER_CONTROL_PLANE_DOWNLOAD_FINISHED = (
- "environment_setup_docker_control_plane_download_finished"
- )
- ENVIRONMENT_SETUP_WAITING_FOR_CONTROL_PLANE_SETUP = "environment_setup_waiting_for_control_plane_setup"
- ENVIRONMENT_SETUP_DOCKER_STARTED_EXISTING_CONTAINER = "environment_setup_docker_started_existing_container"
- ENVIRONMENT_SETUP_DOCKER_CONTAINER_CREATED = "environment_setup_docker_container_created"
- ENVIRONMENT_SETUP_DOCKER_CONTAINER_FINISHED_SETUP = "environment_setup_docker_container_finished_setup"
- ENVIRONMENT_SETUP_REPO_ARCHIVE_CREATED = "environment_setup_repo_archive_created"
- ENVIRONMENT_SETUP_IMAGE_CREATED = "environment_setup_image_created"
- ENVIRONMENT_SETUP_LOCAL_DOCKERFILE_BUILT = "environment_setup_local_dockerfile_built"
- ENVIRONMENT_SETUP_FELL_BACK_TO_DEFAULT_DEVCONTAINER = "environment_setup_fell_back_to_default_devcontainer"
- ENVIRONMENT_SETUP_WRAPPER_DOCKERFILE_BUILT = "environment_setup_wrapper_dockerfile_built"
-
- # TOOL READINESS
- TOOL_READINESS_EVENT_COMPLETED = "tool_readiness_event_completed"
-
- # AGENT_TASK
- AGENT_TASK_ENVIRONMENT_SETUP_FINISHED = "agent_task_environment_setup_finished"
- AGENT_TASK_GIT_SETUP_FINALIZED = "agent_task_git_setup_finalized"
- AGENT_TASK_RUNNING_IN_ENVIRONMENT = "agent_task_running_in_environment"
- AGENT_TASK_RECEIVED_FIRST_TOKEN_FROM_AGENT = "agent_task_received_first_token_from_agent"
-
- # FIX
- FIX_ISSUE_SELECT = "fix_issue_select"
-
- # AGENT RESPONSES
- AGENT_INIT = "agent_init"
- AGENT_ASSISTANT_MESSAGE = "agent_assistant_message"
- AGENT_TOOL_RESULT = "agent_tool_result"
- AGENT_SESSION_END = "agent_session_end"
-
- # USER MESSAGES
- USER_CHAT_INPUT = "user_chat_input"
- USER_COMMAND_INPUT = "user_command_input"
- USER_WRITE_FILE = "user_write_file"
- USER_STOP_AGENT = "user_stop_agent"
- USER_INTERRUPT_PROCESS = "user_interrupt_process"
- USER_FORK_AGENT = "user_fork_agent"
- USER_REMOVE_QUEUED_MESSAGE = "user_remove_queued_message"
- USER_GIT_COMMIT_AND_PUSH = "user_git_commit_and_push"
- USER_GIT_PULL = "user_git_pull"
- USER_COMPACT_TASK_MESSAGE = "user_compact_task_message"
- USER_CONFIGURATION_DATA = "user_configuration_data"
- PROJECT_CONFIGURATION_DATA = "project_configuration_data"
- COMPACTION_SUCCESS = "compaction_success"
-
- # CHECKS
- CHECK_STARTED = "check_started"
- USER_STOP_CHECK_MESSAGE = "user_stop_check_message"
- USER_RESTART_CHECK_MESSAGE = "user_restart_check_message"
-
- # SYSTEM MESSAGES (eg Local * Manual Sync)
- LOCAL_SYNC_SETUP_STARTED = "local_sync_setup_started"
- LOCAL_SYNC_SETUP_AND_ENABLED = "local_sync_setup_and_enabled"
- LOCAL_SYNC_UPDATE_PENDING = "local_sync_update_pending"
- LOCAL_SYNC_UPDATE_COMPLETED = "local_sync_update_completed"
- LOCAL_SYNC_UPDATE_PAUSED = "local_sync_update_paused"
- LOCAL_SYNC_DISABLED = "local_sync_disabled"
- MANUAL_SYNC_MERGE_INTO_USER_ATTEMPTED = "manual_sync_merge_into_user_attempted"
- MANUAL_SYNC_MERGE_INTO_AGENT_ATTEMPTED = "manual_sync_merge_into_agent_attempted"
- RUNNER_RESUME_USER_MESSAGE = "runner_resume_user_message"
- WARNING_AGENT_MESSAGE = "warning_agent_message"
-
- # This is poorly named; it refers to starting a claude -p command in the environment
- # CLAUDE MESSAGES
- CLAUDE_COMMAND = "claude_command"
-
- # This is poorly named; it refers to starting a codex exec command in the environment
- # CODEX MESSAGES
- CODEX_COMMAND = "codex_command"
-
- # IMBUE VERIFY
- IMBUE_VERIFY_CALLED = "imbue_verify_called"
- TRIMMED_IMBUE_VERIFY_CALLED = "trimmed_imbue_verify_called"
- IMBUE_VERIFY_FAILED = "imbue_verify_failed"
-
- # IMBUE CLI
- IMBUE_CLI_START = "imbue_cli_start"
- IMBUE_CLI_CHECK_INITIATED = "imbue_cli_check_initiated"
-
- # LOGIN
- LOGIN_INITIATED = "login_initiated"
- LOGIN_SUCCEEDED = "login_succeeded"
-
- # DATABASE
- DB_WRITE = "db_write"
-
- # RUNTIME TRACKING
- RUNTIME_MEASUREMENT = "runtime_measurement"
-
- # SPACE USAGE TRACKING
- SNAPSHOT_SIZE_MEASUREMENT = "snapshot_size_measurement"
- IMAGE_INFORMATION = "image_information"
-
- # EXCEPTIONS
- # NOTE: if you're adding a new call to log_error_to_posthog, you should most likely add a new value here!
- # this is the only way that we determine where the error originated (unless you set include_traceback=True),
- # so we don't want to reuse these without a good reason
- IRRECOVERABLE_EXCEPTION = (
- "irrecoverable_exception" # only use this if we have no other information on the error's source
- )
- SENTRY_EXCEPTION_DATA_COLLECTION_TOO_SLOW = "sentry_exception_data_collection_too_slow"
- CLAUDE_TRANSIENT_ERROR = "claude_transient_error"
- DATABASE_LOCK_ACQUISITION_TIMEOUT = "database_lock_acquisition_timeout"
- INCOMPATIBLE_DATABASE_LIKELY_FROM_DOWNGRADE = "incompatible_database_likely_from_downgrade"
- FAILED_TO_PARSE_LLM_RESPONSE_WHEN_GENERATING_ISSUES = "failed_to_parse_llm_response_when_generating_issues"
- INVALID_FILE_PATH_FROM_LLM_IN_ISSUE_LOCATION = "invalid_file_path_from_llm_in_issue_location"
- TASK_FAILED_WITH_EXPECTED_ERROR = "task_failed_with_expected_error"
- AGENT_RUNNER_FAILED_BECAUSE_DOCKER_IS_DOWN = "agent_runner_failed_because_docker_is_down"
- FAILED_TO_SNAPSHOT_IMAGE_DURING_SHUTDOWN = "failed_to_snapshot_image_during_shutdown"
- THREAD_IRRECOVERABLE_EXCEPTION = "thread_irrecoverable_exception"
diff --git a/imbue_core/imbue_core/sculptor/telemetry_utils.py b/imbue_core/imbue_core/sculptor/telemetry_utils.py
@@ -1,56 +0,0 @@
-from typing import Any
-
-from pydantic import Field
-
-from imbue_core.sculptor.telemetry_constants import ConsentLevel
-
-
-def _with_consent_level(
- level: ConsentLevel,
- default_factory: Any | None = None,
- default: Any | None = None,
- **kwargs: Any,
-) -> Any:
- """A Pydantic Field factory to annotate a field with a consent level.
- It attaches the level as metadata within the field's JSON schema extras.
- """
- if default_factory is not None:
- assert default is None, "Cannot specify both default and default_factory"
- # pyre-fixme[6]: pyre is confused by the dict literal here, especially since level is not a JsonValue
- return Field(
- default_factory=default_factory,
- json_schema_extra={"consent_level": level},
- **kwargs,
- )
-
- # pyre-fixme[6]: pyre is confused by the dict literal here, especially since level is not a JsonValue
- return Field(default, json_schema_extra={"consent_level": level}, **kwargs)
-
-
-def with_consent(
- level: ConsentLevel,
- default_factory: Any | None = None,
- default: Any | None = None,
- **kwargs: Any,
-) -> Any:
- """A Pydantic Field factory to annotate a field with a consent level.
- It attaches the level as metadata within the field's JSON schema extras.
- """
- return _with_consent_level(level, default_factory=default_factory, default=default, **kwargs)
-
-
-def without_consent(default: Any | None = None, default_factory: Any | None = None, **kwargs: Any) -> Any:
- """A Pydantic Field factory to annotate a field without a consent level."""
- return _with_consent_level(ConsentLevel.NONE, default_factory=default_factory, default=default, **kwargs)
-
-
-def never_log(default: Any | None = None, default_factory: Any | None = None, **kwargs: Any) -> Any:
- """A Pydantic Field factory to annotate a field that should never be logged.
- This is used for in-memory or temporary data that should not be stored long-term.
- """
- return _with_consent_level(
- ConsentLevel.NEVER_PERSIST,
- default_factory=default_factory,
- default=default,
- **kwargs,
- )
diff --git a/imbue_core/imbue_core/sculptor/user_config.py b/imbue_core/imbue_core/sculptor/user_config.py
@@ -1,252 +0,0 @@
-import sys
-from enum import StrEnum
-from typing import Any
-
-from pydantic import Field
-from pydantic.alias_generators import to_camel
-
-from imbue_core.pydantic_serialization import SerializableModel
-from imbue_core.sculptor.telemetry_constants import ConsentLevel
-from imbue_core.sculptor.telemetry_utils import never_log
-from imbue_core.sculptor.telemetry_utils import with_consent
-from imbue_core.sculptor.telemetry_utils import without_consent
-
-_DEFAULT_MODIFIER_KEY = "Cmd" if sys.platform == "darwin" else "Ctrl"
-
-
-class UpdateChannel(StrEnum):
- """Update channel for receiving Sculptor updates."""
-
- STABLE = "STABLE"
- ALPHA = "ALPHA"
-
-
-class PrivacySettings(SerializableModel):
- """This model contains a subset of the the privacy fields that we support."""
-
- is_error_reporting_enabled: bool = Field(False, description="Whether to enable error reporting, i.e. Sentry")
- is_product_analytics_enabled: bool = Field(
- False, description="Whether to enable product analytics, e.g. through PostHog"
- )
- is_llm_logs_enabled: bool = Field(False, description="Whether to enable LLM logs spooling to our systems")
- is_session_recording_enabled: bool = Field(False, description="Whether to enable session recording")
- is_repo_backup_enabled: bool = Field(False, description="Whether to enable repo backup")
- is_full_contribution: bool = Field(
- False,
- description="Synthetic field to let us know if the user has selected full contribution. This includes 'full LLM logs, including code' to train our agent.",
- )
- telemetry_consent_level: str = Field("", description="Telemetry level description")
-
-
-class UserConfig(SerializableModel):
- """Most configuration for user and for Sculptor app behavior should go here.
-
- All required fields must be provided or validation will fail.
-
- When you add a new field, you should add it as a field with a default value so that it is backwards compatible.
- """
-
- user_email: str = without_consent(..., description="User email address")
- user_full_name: str | None = without_consent(None, description="Full name of the user")
- user_git_username: str = without_consent(..., description="Git User name")
- user_id: str = without_consent(..., description="User ID")
- anonymous_access_token: str = never_log(
- ..., description="Unique and local anonymous access token for imbue_gateway"
- )
- organization_id: str = without_consent(..., description="Organization ID")
- instance_id: str = without_consent(..., description="Instance ID")
- is_error_reporting_enabled: bool = without_consent(False, description="Whether to enable error reporting")
- is_product_analytics_enabled: bool = without_consent(False, description="Whether to enable product analytics")
- is_llm_logs_enabled: bool = without_consent(False, description="Whether to enable LLM logs")
- is_session_recording_enabled: bool = without_consent(False, description="Whether to enable session recording")
- is_repo_backup_enabled: bool = without_consent(False, description="Whether to enable repo backup")
- is_full_contribution: bool = without_consent(
- False,
- description="Synthetic field to let us know if the user has selected full contribution. This includes 'full LLM logs, including code' to train our agent.",
- )
- telemetry_consent_level: str = without_consent("", description="Telemetry level description")
- # For now, we give users the option to opt-out of syncing their Claude settings with Sculptor.
- is_claude_configuration_synchronized: bool = with_consent(
- ConsentLevel.PRODUCT_ANALYTICS,
- default=True,
- description="Whether user's local Claude Code configuration is synchronized with Sculptor.",
- )
- anthropic_api_key: str | None = never_log(None, description="Anthropic API key")
- openai_api_key: str | None = never_log(None, description="OpenAI API key")
- gemini_api_key: str | None = never_log(None, description="Gemini API key")
- is_privacy_policy_consented: bool = without_consent(
- False, description="Whether the user consented to our privacy policy"
- )
- is_telemetry_level_set: bool = without_consent(
- False, description="Whether the user consented to our telemetry level"
- )
- # App configuration:
- app_theme: str = with_consent(
- ConsentLevel.PRODUCT_ANALYTICS,
- default="system",
- description="App theme: light, dark, or system",
- )
- does_send_message_shortcut_include_modifier: bool = with_consent(
- ConsentLevel.PRODUCT_ANALYTICS,
- default=True,
- description="True if the send message shortcut includes the modifier key. Eg. Cmd+Enter instead of Enter alone.)",
- )
- new_agent_shortcut: str = with_consent(
- ConsentLevel.PRODUCT_ANALYTICS,
- default=f"{_DEFAULT_MODIFIER_KEY}+N",
- description="Shortcut for creating a new agent",
- )
- search_agents_shortcut: str = with_consent(
- ConsentLevel.PRODUCT_ANALYTICS,
- default=f"{_DEFAULT_MODIFIER_KEY}+K",
- description="Shortcut for searching agents",
- )
- toggle_sidebar_shortcut: str = with_consent(
- ConsentLevel.PRODUCT_ANALYTICS,
- default=f"{_DEFAULT_MODIFIER_KEY}+S",
- description="Shortcut for toggling the sidebar",
- )
- global_hotkey: str = with_consent(
- ConsentLevel.PRODUCT_ANALYTICS,
- default="",
- description="Global hotkey to open Sculptor",
- )
- default_llm: str | None = with_consent(
- ConsentLevel.PRODUCT_ANALYTICS,
- default=None,
- description="Default LLM model for new agents. If None, then most recently used LLM will be used.",
- )
- has_seen_pairing_mode_modal: bool = with_consent(
- ConsentLevel.PRODUCT_ANALYTICS,
- default=False,
- description="Whether the user has seen the pairing mode modal",
- )
- are_suggestions_enabled: bool = with_consent(
- ConsentLevel.PRODUCT_ANALYTICS,
- default=True,
- description="Whether to enable the suggestions feature",
- )
- imbue_verify_run_frequency: str = with_consent(
- ConsentLevel.PRODUCT_ANALYTICS,
- default="auto",
- description="Frequency for running Imbue Verify: auto or manual",
- )
- imbue_verify_token_usage_requirement: str = with_consent(
- ConsentLevel.PRODUCT_ANALYTICS,
- default="low",
- description="Token threshold for running Imbue Verify: none, low, medium, or high",
- )
- is_forking_beta_feature_on: bool = with_consent(
- ConsentLevel.PRODUCT_ANALYTICS,
- default=False,
- description="Whether to enable the forking beta feature",
- )
- is_pairing_mode_stashing_beta_feature_on: bool = with_consent(
- ConsentLevel.PRODUCT_ANALYTICS,
- default=False,
- description="Whether to enable the pairing mode stashing beta feature",
- )
- is_pairing_mode_warning_before_stash_enabled: bool = with_consent(
- ConsentLevel.PRODUCT_ANALYTICS,
- default=True,
- description="Whether to show a warning dialog before stashing changes when starting pairing mode",
- )
- are_dev_suggestions_on: bool = with_consent(
- ConsentLevel.PRODUCT_ANALYTICS,
- default=False,
- description="Whether to enable the dev suggestions pane",
- )
- is_scout_beta_feature_on: bool = with_consent(
- ConsentLevel.PRODUCT_ANALYTICS,
- default=False,
- description="Whether to enable the scout beta feature",
- )
-
- # NOTE: The electron frontend might read this value directly in configFallback.ts. Please remember to keep them in sync.
- update_channel: UpdateChannel = with_consent(
- ConsentLevel.PRODUCT_ANALYTICS,
- default=UpdateChannel.STABLE,
- description="Update channel for receiving Sculptor updates (stable or alpha)",
- )
- max_snapshot_size_bytes: int = with_consent(
- ConsentLevel.PRODUCT_ANALYTICS,
- default=50 * 1024 * 1024,
- description="Maximum snapshot size in bytes.",
- )
- min_free_disk_gb: float = with_consent(
- ConsentLevel.PRODUCT_ANALYTICS,
- default=2.0,
- description="The minimum free disk space before Sculptor will stop allowing new tasks and messages",
- )
-
- @property
- def is_imbue_user(self) -> bool:
- return self.user_email.endswith("@imbue.com")
-
- @property
- def free_disk_gb_warn_limit(self) -> float:
- return self.min_free_disk_gb * 3.0
-
- @property
- def privacy_settings(self) -> PrivacySettings:
- """Retrieves the subset of fields associated with Privacy Settings"""
- return PrivacySettings(
- is_error_reporting_enabled=self.is_error_reporting_enabled,
- is_product_analytics_enabled=self.is_product_analytics_enabled,
- is_llm_logs_enabled=self.is_llm_logs_enabled,
- is_session_recording_enabled=self.is_session_recording_enabled,
- is_repo_backup_enabled=self.is_repo_backup_enabled,
- is_full_contribution=self.is_full_contribution,
- telemetry_consent_level=self.telemetry_consent_level,
- )
-
- @property
- def sentry_user_context(self) -> dict[str, str]:
- """Returns a dictionary of user context information for Sentry error reporting."""
- return {
- "id": self.user_id, # this is conveniently the same id as used by posthog client
- "email": self.user_email,
- "username": self.user_email, # traditionally what we have been setting as username
- }
-
-
-# At Runtime, ensure that all fields in PrivacySettings are also in UserConfig
-for field in PrivacySettings.model_fields:
- assert field in UserConfig.model_fields, f"PrivacySettings field {field} is missing from UserConfig"
-
-
-def _generate_user_config_field_enum() -> type[StrEnum]:
- """Generate UserConfigField enum from UserConfig model fields"""
- fields = {}
- for field_name in UserConfig.model_fields.keys():
- # Convert field name to SCREAMING_SNAKE_CASE for enum constant
- enum_name = field_name.upper()
- fields[enum_name] = to_camel(field_name)
- # pyre thinks this is an instance of a StrEnum because it doesn't understand enums
- return StrEnum("UserConfigField", fields) # pyre-ignore[7, 19]
-
-
-UserConfigField: type[StrEnum] = _generate_user_config_field_enum()
-
-
-def calculate_user_config_prior_values(
- old_config: UserConfig, new_config: UserConfig, privacy_settings: PrivacySettings
-) -> dict[str, Any]:
- from imbue_core.sculptor.telemetry import is_consent_allowable
-
- old_dict = old_config.model_dump()
- new_dict = new_config.model_dump()
- prior_values: dict[str, Any] = {}
-
- for field_name in old_dict:
- if old_dict[field_name] != new_dict[field_name]:
- field_info = UserConfig.model_fields.get(field_name)
- if field_info:
- metadata = field_info.json_schema_extra or {}
- required_level = metadata.get("consent_level")
- if is_consent_allowable(required_level, privacy_settings):
- prior_values[field_name] = old_dict[field_name]
- else:
- prior_values[field_name] = None
-
- return prior_values
diff --git a/imbue_core/imbue_core/sentry_loguru_handler.py b/imbue_core/imbue_core/sentry_loguru_handler.py
@@ -1,367 +0,0 @@
-"""
-inlines sentry_sdk.integrations.loguru and sentry_sdk.integrations.logging, so we can make some changes.
-i'm intentionally keeping most of the old logic so this still behaves roughly as expected/documented.
-
-we probably could/should go through and fully streamline this though to do just what we need.
-
-The changes so far (could be out of date):
-- adds `strip_extra` to the breadcrumb handler
-- adds `add_extra_info_hook` to the event handler, with a watchdog to make sure it doesn't slow things down
-"""
-
-import asyncio
-import enum
-import logging
-from concurrent.futures import Future
-from concurrent.futures import ThreadPoolExecutor
-from concurrent.futures import wait
-from datetime import datetime
-from datetime import timezone
-from fnmatch import fnmatch
-from typing import Any
-from typing import Callable
-from typing import Iterable
-from typing import Sequence
-
-import sentry_sdk
-from loguru import logger
-from sentry_sdk import new_scope
-
-# "This disables recording (both in breadcrumbs and as events) calls to a logger of a specific name. Among other uses, many of our integrations
-# use this to prevent their actions being recorded as breadcrumbs. Exposed to users as a way to quiet spammy loggers."
-# We have to import it so that existing setters work properly
-from sentry_sdk.integrations.logging import _IGNORED_LOGGERS
-from sentry_sdk.types import Event
-from sentry_sdk.types import Hint
-from sentry_sdk.utils import current_stacktrace
-from sentry_sdk.utils import event_from_exception
-from sentry_sdk.utils import to_string
-
-from imbue_core.constants import HIGH_PRIORITY_LEVEL
-from imbue_core.constants import LOW_PRIORITY_LEVEL
-from imbue_core.constants import MEDIUM_PRIORITY_LEVEL
-from imbue_core.s3_uploader import EXTRAS_UPLOADED_FILES_KEY
-
-# for formatting the log message. we don't want the timestamp/level because sentry already tracks that,
-# and it messes up event grouping since this string becomes the event title.
-SENTRY_LOG_FORMAT = "{name}:{function}:{line} - {message}"
-
-
-class SentryLoguruLoggingLevels(enum.IntEnum):
- TRACE = 5
- DEBUG = 10
- INFO = 20
- SUCCESS = 25
- WARNING = 30
- # Additional loguru levels for sentry hot-wiring that we also present with custom colors in the console.
- # The mapping to sentry levels for both breadcrumbs and reporting is done in map_to_sentry_name()
- LOW_PRIORITY = LOW_PRIORITY_LEVEL # pyre-ignore[8]: pyre doesn't understand enums
- MEDIUM_PRIORITY = MEDIUM_PRIORITY_LEVEL # pyre-ignore[8]: pyre doesn't understand enums
- HIGH_PRIORITY = HIGH_PRIORITY_LEVEL # pyre-ignore[8]: pyre doesn't understand enums
- ERROR = 40
- CRITICAL = 50
-
- def map_to_sentry_name(self) -> str:
- # Sentry only understands and respects "debug", "info", "warning", "error", "critical", "fatal"
- match self:
- case SentryLoguruLoggingLevels.TRACE | SentryLoguruLoggingLevels.DEBUG:
- return "debug"
- case SentryLoguruLoggingLevels.INFO | SentryLoguruLoggingLevels.SUCCESS:
- return "info"
- case SentryLoguruLoggingLevels.LOW_PRIORITY:
- return "info"
- case SentryLoguruLoggingLevels.MEDIUM_PRIORITY | SentryLoguruLoggingLevels.WARNING:
- return "warning"
- case SentryLoguruLoggingLevels.HIGH_PRIORITY | SentryLoguruLoggingLevels.ERROR:
- return "error"
- case SentryLoguruLoggingLevels.CRITICAL:
- return "critical"
- case _:
- return ""
-
-
-class _BaseHandler(logging.Handler):
- COMMON_RECORD_ATTRS = frozenset(
- (
- "args",
- "created",
- "exc_info",
- "exc_text",
- "filename",
- "funcName",
- "levelname",
- "levelno",
- "linenno",
- "lineno",
- "message",
- "module",
- "msecs",
- "msg",
- "name",
- "pathname",
- "process",
- "processName",
- "relativeCreated",
- "stack",
- "tags",
- "taskName",
- "thread",
- "threadName",
- "stack_info",
- )
- )
-
- def _can_record(self, record: logging.LogRecord) -> bool:
- """Prevents ignored loggers from recording"""
- for logger in _IGNORED_LOGGERS:
- if fnmatch(record.name, logger):
- return False
- return True
-
- def _extra_from_record(self, record: logging.LogRecord) -> dict[str, object]:
- return {
- k: v
- for k, v in vars(record).items()
- if k not in self.COMMON_RECORD_ATTRS and (not isinstance(k, str) or not k.startswith("_"))
- }
-
- def _logging_to_event_level(self, record: logging.LogRecord) -> str:
- try:
- return SentryLoguruLoggingLevels(record.levelno).map_to_sentry_name()
- except ValueError:
- return record.levelname.lower() if record.levelname else ""
-
-
-def _wrap_callback(callback: Callable) -> None:
- try:
- callback()
- except Exception as e:
- log_error_inside_sentry(e, "Sentry callback raised")
-
-
-class SentryEventHandler(_BaseHandler):
- """A logging handler that emits Sentry events for each log record."""
-
- def __init__(
- self,
- level: int = logging.NOTSET,
- add_extra_info_hook: Callable[[Event, Hint], tuple[Event, Hint, tuple[Callable, ...]]] | None = None,
- ) -> None:
- super().__init__(level=level)
- self.add_extra_info_hook = add_extra_info_hook
- self.add_extra_info_previously_timed_out = False
- self._executor: ThreadPoolExecutor | None = ThreadPoolExecutor()
- self._futures: list[Future] = []
-
- def emit(self, record: logging.LogRecord) -> Any:
- self.format(record)
- return self._emit(record)
-
- def schedule_callbacks(self, callbacks: Sequence[Callable]) -> None:
- executor = self._executor
- if executor is not None:
- logger.info(f"Sentry event handler registered {len(callbacks)} callbacks with an executor")
- for callback in callbacks:
- future = executor.submit(lambda c=callback: _wrap_callback(c))
- self._futures.append(future)
- else:
- logger.debug(
- f"Sentry event handler failed to register {len(callbacks)} callbacks because no executor was found"
- )
-
- def close(self) -> None:
- executor = self._executor
- if executor is not None:
- executor.shutdown(wait=False)
- wait(self._futures, timeout=5.0)
- self._executor = None
- super().close()
-
- def _emit(self, record: logging.LogRecord) -> None:
- if not self._can_record(record):
- return
-
- # Filter out KeyboardInterrupt and CancelledError exceptions from being logged to Sentry
- if record.exc_info and record.exc_info[0] is not None:
- exc_type = record.exc_info[0]
- if exc_type is KeyboardInterrupt or exc_type is asyncio.CancelledError:
- return
-
- client = sentry_sdk.get_client()
- if not client.is_active():
- return
-
- client_options = client.options
-
- # exc_info might be None or (None, None, None)
- #
- # exc_info may also be any falsy value due to Python stdlib being
- # liberal with what it receives and Celery's billiard being "liberal"
- # with what it sends. See
- # https://github.com/getsentry/sentry-python/issues/904
- if record.exc_info and record.exc_info[0] is not None:
- event, hint = event_from_exception(
- record.exc_info,
- client_options=client_options,
- mechanism={"type": "logging", "handled": True},
- )
- elif (record.exc_info and record.exc_info[0] is None) or record.stack_info:
- event: Event = {}
- hint: Hint = {}
- event["threads"] = {
- "values": [
- {
- "stacktrace": current_stacktrace(
- include_local_variables=client_options["include_local_variables"],
- max_value_length=client_options["max_value_length"],
- ),
- "crashed": False,
- "current": True,
- }
- ]
- }
- else:
- event: Event = {}
- hint: Hint = {}
-
- hint["log_record"] = record
-
- level = self._logging_to_event_level(record)
- if level in {"debug", "info", "warning", "error", "critical", "fatal"}:
- # standard levels that sentry understands, it ignores any other types
- event["level"] = level # type: ignore[typeddict-item]
-
- event["logger"] = record.name
-
- # Log records from `warnings` module as separate issues
- record_captured_from_warnings_module = record.name == "py.warnings" and record.msg == "%s"
- if record_captured_from_warnings_module:
- # use the actual message and not "%s" as the message
- # this prevents grouping all warnings under one "%s" issue
- msg = record.args[0] # type: ignore
-
- event["logentry"] = {
- "message": msg,
- "params": (),
- }
-
- else:
- event["logentry"] = {
- # TODO: a bit lame, but we don't have access to the unformatted message, so we just reverse our current format...
- "message": to_string(record.msg).split(" - ")[-1],
- "params": record.args,
- }
-
- event["extra"] = self._extra_from_record(record)
-
- if self.add_extra_info_hook:
- event, hint, callbacks = self.add_extra_with_watchdog(event, hint, timeout=1)
- self.schedule_callbacks(callbacks)
-
- sentry_sdk.capture_event(event, hint)
-
- def add_extra_with_watchdog(
- self, event: Event, hint: Hint, timeout: float
- ) -> tuple[Event, Hint, tuple[Callable, ...]]:
- """Call the add_extra_info_hook with a watchdog so we can skip it if it's slow, and get another sentry error about that."""
- if self.add_extra_info_previously_timed_out:
- event.setdefault("extra", {})["add_extra_info_previously_timed_out"] = True
- return event, hint, tuple()
- if "attachments" not in hint:
- hint["attachments"] = []
- executor = ThreadPoolExecutor()
- add_extra_info_hook = self.add_extra_info_hook
- assert add_extra_info_hook is not None
- future = executor.submit(add_extra_info_hook, event, hint)
- try:
- event, hint, callbacks = future.result(timeout=timeout)
- executor.shutdown()
- return event, hint, callbacks
- except TimeoutError as e:
- from imbue_core.sculptor.telemetry import ProductComponent
- from imbue_core.sculptor.telemetry import SculptorPosthogEvent
- from imbue_core.sculptor.telemetry import send_exception_to_posthog
-
- send_exception_to_posthog(
- SculptorPosthogEvent.SENTRY_EXCEPTION_DATA_COLLECTION_TOO_SLOW,
- e,
- component=ProductComponent.CROSS_COMPONENT,
- )
- # this will leave the thread still running; there's no real way to cancel it.
- # we'll at least set this flag so future errors don't try to run the (bugged?) hook again.
- executor.shutdown(wait=False)
- self.add_extra_info_previously_timed_out = True
-
- # continue with the main event without the extra info
- return event, hint, tuple()
-
-
-class SentryBreadcrumbHandler(_BaseHandler):
- """
- A logging handler that records breadcrumbs for each log record.
-
- Note that you do not have to use this class if the logging integration is enabled, which it is by default.
- """
-
- def __init__(self, level: int = logging.NOTSET, strip_extra: bool = False) -> None:
- super().__init__(level=level)
- self.strip_extra = strip_extra
-
- def emit(self, record: logging.LogRecord) -> Any:
- # is this needed? keeping in case there are side effects that we want to trigger here
- self.format(record)
- return self._emit(record)
-
- def _emit(self, record: logging.LogRecord) -> None:
- if not self._can_record(record):
- return
-
- sentry_sdk.add_breadcrumb(self._breadcrumb_from_record(record), hint={"log_record": record})
-
- def _breadcrumb_from_record(self, record: logging.LogRecord) -> dict[str, Any]:
- return {
- "type": "log",
- "level": self._logging_to_event_level(record),
- "category": record.name,
- "message": record.message,
- "timestamp": datetime.fromtimestamp(record.created, timezone.utc),
- "data": self._extra_from_record(record) if not self.strip_extra else {},
- }
-
-
-def log_error_inside_sentry(
- exception: Exception,
- message: str,
- extra: dict[str, str | int] | None = None,
- additional_s3_uploads: Iterable[str] | None = None,
-) -> None:
- """Log an error to sentry that happens during processing of a sentry event.
-
- This needs to be done very carefully to ensure it won't fail - we don't want to have to have a fallback-fallback handler.
- The caller should ensure everything passed into this is small so there's no chance of size issues.
- """
- client = sentry_sdk.get_client()
- # we want to get rid of any breadcrumbs, attachments, and other stuff that might have caused the original request to fail.
- # this will obviously make it harder to debug; we may want to selectively add some of this back.
- with new_scope() as scope:
- scope.clear()
- event, hint = event_from_exception(
- exception,
- client_options=client.options,
- mechanism={"type": "watchdog", "handled": True},
- )
- event["message"] = message
- if extra is not None:
- if "extra" not in event:
- event["extra"] = {}
- for k, v in extra.items():
- event["extra"][k] = v
-
- # record any other files uploaded to s3
- if additional_s3_uploads is not None:
- event["extra"][EXTRAS_UPLOADED_FILES_KEY + "_erred"] = str(list(additional_s3_uploads))
-
- # Note that new_scope() gives a new "current scope" but doesn't affect the global or isolation scope,
- # which is where most info is actually stored. Typically all 3 scopes are merged before logging the event.
- # So we'll make sure to call capture_event in such a way that this merging doesn't happen.
- client.capture_event(event=event, hint=hint, scope=scope)
diff --git a/imbue_core/imbue_core/serialization.py b/imbue_core/imbue_core/serialization.py
@@ -30,7 +30,6 @@ from yasoo.utils import is_obj_supported_primitive
from yasoo.utils import normalize_type
from yasoo.utils import resolve_types
-from imbue_core.async_monkey_patches import EXCEPTION_LOGGED_FLAG
from imbue_core.fixed_traceback import FixedTraceback
from imbue_core.pydantic_serialization import SerializableModel
from imbue_core.serialization_types import Serializable
@@ -253,7 +252,6 @@ class SerializedException(SerializableModel):
exception: str
args: "tuple[SerializedException | JsonTypeAlias, ...]" # pyre-ignore[11]: pyre doesn't like TypeAliasType
traceback_dict: JsonTypeAlias
- was_logged_by_log_exception: bool = False
@classmethod
def build(cls, exception: BaseException, traceback: TracebackType | None = None) -> "SerializedException":
@@ -269,7 +267,6 @@ class SerializedException(SerializableModel):
exception=get_fully_qualified_name_for_error(exception),
args=tuple(_convert_serialized_exception_args(x, traceback) for x in exception.args),
traceback_dict=FixedTraceback.from_tb(traceback).as_dict(),
- was_logged_by_log_exception=getattr(exception, EXCEPTION_LOGGED_FLAG, False),
)
@cached_property
@@ -309,12 +306,6 @@ class SerializedException(SerializableModel):
)
raise TypeError(" ".join(message_with_arg_info)) from e
- try:
- setattr(exception, EXCEPTION_LOGGED_FLAG, True)
- except AttributeError:
- # We could not set the flag correctly
- pass
-
return exception
def as_formatted_traceback(self) -> str:
diff --git a/imbue_core/imbue_core/subprocess_utils.py b/imbue_core/imbue_core/subprocess_utils.py
@@ -1,798 +0,0 @@
-from __future__ import annotations
-
-import os
-import shlex
-import subprocess
-import time
-from functools import cached_property
-from io import BytesIO
-from pathlib import Path
-from threading import Event
-from typing import Callable
-from typing import Final
-from typing import IO
-from typing import Mapping
-from typing import Protocol
-from typing import Sequence
-
-import attr
-from loguru import logger
-from typing_extensions import Self
-
-from imbue_core.async_monkey_patches import log_exception
-from imbue_core.constants import ExceptionPriority
-from imbue_core.context_managers import call_on_exit
-from imbue_core.errors import ExpectedError
-from imbue_core.event_utils import CompoundEvent
-from imbue_core.event_utils import MutableEvent
-from imbue_core.event_utils import ReadOnlyEvent
-from imbue_core.log_utils import DETAIL
-from imbue_core.log_utils import TRACE
-from imbue_core.pydantic_serialization import FrozenModel
-
-# Received a shutdown signal
-SUBPROCESS_STOPPED_BY_REQUEST_EXIT_CODE = -9999
-
-SSH_ERROR_RETURN_CODE = 255
-
-
-class HasStdoutAndStderr(Protocol):
- stdout: bytes
- stderr: bytes
-
-
-class HasSshLogs(HasStdoutAndStderr, Protocol):
- ssh_logs: str | None
-
-
-# Useful for streaming logs from a subprocess which may themselves be generated from log lines.
-def log_subprocess_output_line_for_sculptor(
- output_line: str,
- relog_loguru_lines: bool = False,
- is_logging_without_loguru_formatting: bool = False,
-) -> None:
- return log_subprocess_output_line(
- output_line=output_line,
- relog_loguru_lines=relog_loguru_lines,
- is_logging_without_loguru_formatting=is_logging_without_loguru_formatting,
- is_logging_traced=True,
- )
-
-
-def log_subprocess_output_line(
- output_line: str,
- relog_loguru_lines: bool = False,
- is_logging_without_loguru_formatting: bool = False,
- # TODO: remove this -- Sculptor code should not be calling this function. Will be fixed in a followup PR.
- is_logging_traced: bool = False,
-) -> None:
- log_level = TRACE if is_logging_traced else DETAIL
- output_line = output_line.rstrip("\n")
- # very brittle parsing of log format for recursive logging: ef460144-072f-4b74-a712-0f728fdd3f50
- if len(output_line) >= 36 and output_line[4] == "-" and output_line[7] == "-" and output_line[34:36] == "Tuple ":
- # these lines have already been logged in the child; only relog them if we really want to.
- if relog_loguru_lines:
- logger.opt(raw=True).log(log_level, output_line.rstrip("\n") + "\n")
- else:
- if is_logging_without_loguru_formatting:
- logger.opt(raw=True).log(log_level, output_line.rstrip("\n") + "\n")
- else:
- logger.log(log_level, "> " + output_line)
-
-
-def maybe_truncate_middle(output: str, size: int) -> str:
- assert size > 1000, "This doesn't handle small sizes nicely"
- if len(output) < size:
- return output
- # Note: not doing any sort of escaping or special formatting because this should only be for human consumption
- truncate_message = f"\n... OUTPUT TRUNCATED DUE TO BEING OVER {size:_} CHARACTERS ...\n"
- truncate_size = (size - len(truncate_message)) // 2 - 1
- return output[:truncate_size] + truncate_message + output[-truncate_size:]
-
-
-def _stdout_str(has_stdout: "HasStdoutAndStderr") -> str:
- return has_stdout.stdout.decode("utf-8", errors="replace")
-
-
-def _stderr_str(has_stderr: "HasStdoutAndStderr") -> str:
- return has_stderr.stderr.decode("utf-8", errors="replace")
-
-
-def _create_output_from_stdout_and_stderr(
- has_stdout_and_stderr: "HasStdoutAndStderr",
-) -> str:
- return _stdout_str(has_stdout_and_stderr) + _stderr_str(has_stdout_and_stderr)
-
-
-def _create_output_from_stdout_and_stderr_and_ssh_logs(has_ssh_logs: HasSshLogs) -> str:
- without_ssh_logs = _create_output_from_stdout_and_stderr(has_ssh_logs)
- if has_ssh_logs.ssh_logs is None:
- return without_ssh_logs
- return has_ssh_logs.ssh_logs + without_ssh_logs
-
-
-@attr.s(auto_exc=True, auto_attribs=True)
-class CommandError(Exception):
- returncode: int
- stdout: bytes
- stderr: bytes
- command: str
- is_output_already_logged: bool
-
- stdout_str = property(_stdout_str)
- stderr_str = property(_stderr_str)
- output = cached_property(_create_output_from_stdout_and_stderr)
-
- def __str__(self) -> str:
- s = f"Command failed with return code {self.returncode}. command=`{self.command}`"
- if not self.is_output_already_logged:
- # TODO: Consider making truncation configurable
- maybe_truncated_output = maybe_truncate_middle(self.output, 8_000)
- s += f"\noutput:\n{maybe_truncated_output}"
- return s
-
-
-@attr.s(auto_exc=True, auto_attribs=True)
-class RemoteCommandError(CommandError):
- """Remotely executed command failure not due to ssh, meaning `returncode not in (0, 255)`."""
-
- # TODO: include a machine on this so that we can print out the actual hostname!
-
- # We make these errors slightly differently from CommandErrors -
- # where those have the full command line as command, in this we set that to be just the remote command and this var will hold the ssh command.
- ssh_command: str
- ssh_logs: str | None
-
- output = cached_property(_create_output_from_stdout_and_stderr_and_ssh_logs)
-
- def full_command(self) -> str:
- return self.ssh_command + " " + self.command
-
- def __str__(self) -> str:
- s = f"Remote command failed with returncode {self.returncode}. ssh_command= `{self.ssh_command}`, command=`{self.command}`"
- if not self.is_output_already_logged:
- maybe_truncated_output = maybe_truncate_middle(self.output, 8_000)
- s += f"\noutput:\n{maybe_truncated_output}"
- if self.ssh_logs is not None:
- maybe_truncated_ssh_logs = maybe_truncate_middle(self.ssh_logs, 8_000)
- s += f"\nssh_logs:\n{maybe_truncated_ssh_logs}"
- return s
-
-
-@attr.s(auto_exc=True, auto_attribs=True)
-class SSHConnectionError(CommandError):
- """Error for ssh connections that return 255.
-
- No other shell commands should exit 255, as it will be conflated with ssh's error code.
- """
-
- # TODO: include a machine on this so that we can print out the actual hostname!
-
- returncode = SSH_ERROR_RETURN_CODE
- # We make these errors slightly differently from CommandErrors -
- # where those have the full command line as command, in this we set that to be just the remote command and this var will hold the ssh command.
- ssh_command: str
-
- ssh_logs: str | None
-
- output = cached_property(_create_output_from_stdout_and_stderr_and_ssh_logs)
-
- _CONFLATION_WARNING = "WARNING: the remote command may have returned 255 rather than SSH -- Don't do that!"
-
- def __str__(self) -> str:
- return f"SSH failed to connect with returncode {self.returncode}. ssh_command= `{self.ssh_command}`, command=`{self.command}`, ssh_logs=`{self.ssh_logs}`\n{self._CONFLATION_WARNING}"
-
-
-@attr.s(auto_attribs=True, kw_only=True)
-class CompletedProcess:
- """
- Mostly a reimplementation of subprocess.CompletedProcess but allows us to deal with some GI-specific concerns.
- A class to make process results easier to work with for us. We have a couple concerns that are different from typical:
- We run commands over SSH a lot and care about making sure that those errors clearly show both the command being run and the host being run on.
- """
-
- returncode: int
- stdout: bytes
- stderr: bytes
- command: str
- is_output_already_logged: bool
-
- stdout_str = property(_stdout_str)
- stderr_str = property(_stderr_str)
- output = cached_property(_create_output_from_stdout_and_stderr)
-
- def check(self) -> Self:
- if self.returncode != 0:
- error = CommandError(
- command=self.command,
- returncode=self.returncode,
- stdout=self.stdout,
- stderr=self.stderr,
- is_output_already_logged=self.is_output_already_logged,
- )
- if "output" in self.__dict__:
- # We've already calculated the output, so we can just set it here.
- error.output = self.output
- raise error
- # So that this can be chained. For example,
- # hostname = run_local_command("hostname").check().stdout
- return self
-
-
-class ProcessError(ExpectedError):
- def __init__(
- self,
- command: tuple[str, ...],
- stdout: str,
- stderr: str,
- returncode: int | None = None,
- is_output_already_logged: bool | None = False,
- message: str | None = "Command failed with non-zero exit code",
- ) -> None:
- self.returncode = returncode
- self.stdout = stdout
- self.stderr = stderr
- self.command = command
- self.is_output_already_logged = is_output_already_logged
- self.message = message
-
- def describe(self, is_output_included: bool, output_truncation: int | None = 8_000) -> str:
- command = " ".join(shlex.quote(arg) for arg in self.command)
- s = f"{self.message} {self.returncode}. command=`{command}`"
- if is_output_included:
- # TODO: Consider making truncation configurable
- output = self.stdout + "\n" + self.stderr
- maybe_truncated_output = maybe_truncate_middle(output, output_truncation) if output_truncation else output
- s += f"\noutput:\n{maybe_truncated_output}"
- return s
-
- def __str__(self) -> str:
- return self.describe(is_output_included=True)
-
-
-class ProcessTimeoutError(ProcessError):
- def __init__(
- self,
- command: tuple[str, ...],
- stdout: str,
- stderr: str,
- is_output_already_logged: bool = False,
- ) -> None:
- super().__init__(
- command,
- stdout,
- stderr,
- None,
- is_output_already_logged=is_output_already_logged,
- message="Command timed out",
- )
-
-
-class ProcessSetupError(ProcessError):
- def __init__(
- self,
- command: tuple[str, ...],
- stdout: str,
- stderr: str,
- is_output_already_logged: bool = False,
- ) -> None:
- super().__init__(
- command,
- stdout,
- stderr,
- None,
- is_output_already_logged=is_output_already_logged,
- message="Command failed to start",
- )
-
-
-class FinishedProcess(FrozenModel):
- """
- Mostly a reimplementation of subprocess.CompletedProcess but allows us to deal with some GI-specific concerns.
- A class to make process results easier to work with for us. We have a couple concerns that are different from typical:
- We run commands over SSH a lot and care about making sure that those errors clearly show both the command being run and the host being run on.
- """
-
- returncode: int | None = None
- stdout: str
- stderr: str
- command: tuple[str, ...]
- is_timed_out: bool = False
- is_output_already_logged: bool
-
- def check(self) -> Self:
- if self.is_timed_out:
- raise ProcessTimeoutError(
- command=self.command,
- stdout=self.stdout,
- stderr=self.stderr,
- is_output_already_logged=self.is_output_already_logged,
- )
- if self.returncode != 0:
- raise ProcessError(
- command=self.command,
- returncode=self.returncode,
- stdout=self.stdout,
- stderr=self.stderr,
- is_output_already_logged=self.is_output_already_logged,
- )
- # So that this can be chained. For example,
- # hostname = run_local_command("hostname").check().stdout
- return self
-
-
-@attr.s(auto_attribs=True, kw_only=True)
-class RemoteCompletedProcess(CompletedProcess):
- """
- A remote completed process. Beyond CompletedProcess, includes ssh information.
- """
-
- ssh_command: str
-
- # The ssh logs if they were split out.
- ssh_logs: str | None = None
-
- output = cached_property(_create_output_from_stdout_and_stderr_and_ssh_logs)
-
- def check(self) -> Self:
- if self.returncode == SSH_ERROR_RETURN_CODE:
- ssh_connection_error = SSHConnectionError(
- command=self.command,
- returncode=SSH_ERROR_RETURN_CODE,
- stdout=self.stdout,
- stderr=self.stderr,
- ssh_command=self.ssh_command,
- is_output_already_logged=self.is_output_already_logged,
- ssh_logs=self.ssh_logs,
- )
- if "output" in self.__dict__:
- # We've already calculated the output, so we can just set it here.
- ssh_connection_error.output = self.output
- raise ssh_connection_error
- if self.returncode != 0:
- remote_command_error = RemoteCommandError(
- command=self.command,
- ssh_command=self.ssh_command,
- returncode=self.returncode,
- stdout=self.stdout,
- stderr=self.stderr,
- is_output_already_logged=self.is_output_already_logged,
- ssh_logs=self.ssh_logs,
- )
- if "output" in self.__dict__:
- # We've already calculated the output, so we can just set it here.
- remote_command_error.output = self.output
- raise remote_command_error
- return self
-
-
-_READ_SIZE: Final[int] = 2**20
-
-
-@attr.s(auto_attribs=True)
-class PartialOutputContainer:
- """A helper class to make reconstructing log lines returned by pipe.read() easier."""
-
- buffer: BytesIO = attr.ib(factory=BytesIO)
- # Note: This in-memory line could become huge if no newlines are output
- in_progress_line: bytearray = attr.ib(factory=bytearray)
- on_complete_line: Callable[[str], None] | None = None
-
- def write(self, output: bytes) -> None:
- """`output` is the output of pipe.read(), ie a string that may contain newlines."""
- self.buffer.write(output)
- on_complete_line = self.on_complete_line
- if on_complete_line is None:
- # If we don't have a callback, we don't need to do anything else.
- return
-
- lines = output.splitlines(keepends=True)
- for line in lines:
- self.in_progress_line.extend(line)
- if line.endswith((b"\n", b"\r")):
- on_complete_line(self.in_progress_line.decode("utf-8", errors="replace"))
- self.in_progress_line.clear()
-
- def get_complete_output(self) -> bytes:
- return self.buffer.getvalue()
-
-
-@attr.s(auto_attribs=True)
-class OutputGatherer:
- stdout: IO[bytes]
- stderr: IO[bytes]
- stdout_container: PartialOutputContainer
- stderr_container: PartialOutputContainer
- shutdown_event: ReadOnlyEvent
-
- @classmethod
- def build_from_popen(
- cls,
- popen: subprocess.Popen[bytes],
- on_complete_line_from_stdout: Callable[[str], None] | None,
- on_complete_line_from_stderr: Callable[[str], None] | None,
- shutdown_event: ReadOnlyEvent,
- ) -> Self:
- stdout = popen.stdout
- stderr = popen.stderr
- assert stdout is not None
- assert stderr is not None
- # this makes reads on process.stdout nonblocking
- os.set_blocking(stdout.fileno(), False)
- os.set_blocking(stderr.fileno(), False)
-
- return cls(
- stdout=stdout,
- stderr=stderr,
- stdout_container=PartialOutputContainer(on_complete_line=on_complete_line_from_stdout),
- stderr_container=PartialOutputContainer(on_complete_line=on_complete_line_from_stderr),
- shutdown_event=shutdown_event,
- )
-
- def gather_output(self) -> None:
- is_more_from_stdout = True
- is_more_from_stderr = True
- # We may drop some output if the shutdown event is set, but that's okay.
- while not self.shutdown_event.is_set() and (is_more_from_stdout or is_more_from_stderr):
- # We always attempt to read from both streams to avoid starvation.
- partial_stdout = self.stdout.read(_READ_SIZE)
- if partial_stdout is not None:
- self.stdout_container.write(partial_stdout)
- is_more_from_stdout = len(partial_stdout) == _READ_SIZE
- else:
- is_more_from_stdout = False
- partial_stderr = self.stderr.read(_READ_SIZE)
- if partial_stderr is not None:
- self.stderr_container.write(partial_stderr)
- is_more_from_stderr = len(partial_stderr) == _READ_SIZE
- else:
- is_more_from_stderr = False
-
- def get_output(self) -> tuple[bytes, bytes]:
- return (
- self.stdout_container.get_complete_output(),
- self.stderr_container.get_complete_output(),
- )
-
- def get_incomplete_lines(self) -> tuple[str, str]:
- return self.stdout_container.in_progress_line.decode(
- "utf-8", errors="replace"
- ), self.stderr_container.in_progress_line.decode("utf-8", errors="replace")
-
-
-def _shutdown_popen(process: subprocess.Popen[bytes], command: str, shutdown_timeout_sec: float) -> int | None:
- logger.debug(
- f"run_local_command: aborting command (via sigterm to {process.pid}) due to signal...\n",
- command=truncate_command(command, 500),
- )
- # this sends SIGTERM, which is "the normal way to politely ask a program to terminate"
- process.terminate()
- try:
- process.wait(timeout=shutdown_timeout_sec)
- return process.returncode
- except subprocess.TimeoutExpired as e:
- extra = {"command": command, "shutdown_timeout_sec": str(shutdown_timeout_sec)}
- log_exception(
- e,
- "process didn't die within shutdown_timeout_sec of SIGTERM",
- extra=extra,
- priority=ExceptionPriority.LOW_PRIORITY,
- )
- # this sends SIGKILL which immediately kills the process
- process.kill()
- try:
- process.wait(timeout=2)
- return process.returncode
- except subprocess.TimeoutExpired as e:
- log_exception(
- e,
- "process didn't die after kill()",
- extra=extra,
- priority=ExceptionPriority.LOW_PRIORITY,
- )
- return None
-
-
-def _log_input_command(command: str) -> None:
- input_lines = command.splitlines()
- truncation_context = 2
- is_worth_truncating = len(input_lines) > 3 * truncation_context
- if is_worth_truncating:
- input_lines = (
- input_lines[:truncation_context]
- + [" (...content truncated...)"]
- + input_lines[-truncation_context:]
- )
- for line in input_lines:
- logger.trace("< " + line)
-
-
-def _is_timeout(timeout_time: float | None = None) -> bool:
- if timeout_time is None:
- return False
- else:
- return time.time() > timeout_time
-
-
-def run_local_command(
- command: str,
- is_checked: bool = True,
- timeout: float | None = None,
- trace_output: bool = True,
- cwd: Path | None = None,
- trace_on_complete_line_callback: Callable[[str], None] | None = log_subprocess_output_line,
- trace_log_context: Mapping[str, object] | None = None,
- shutdown_event: Event | CompoundEvent | None = None,
- shutdown_timeout_sec: float = 30.0,
-) -> CompletedProcess:
- """
- implementation notes:
- - this function is really tricky to implement well! check with josh, bawr, or zack before making nontrivial changes
- - the reason it's tricky is that we need to both monitor the shutdown event, while also reading the subprocess
- output in realtime to allow realtime log tracing of the output.
- - we previously had an implementation that used a helper thread to read the output, but never seemed to shutdown
- cleanly and left a mess of warnings in the logs, even though it seemed to be implemented properly.
- - the current implementation aims to use just the main thread to avoid this. but then you need to be very careful
- to avoid anything blocking.
- - thus we set the pipe to nonblocking mode so that reads are nonblocking, and we also don't use readline()
- as that could potentially block/deadlock if the process prints long lines with no newlines.
- - don't redirect the process output to a file, as then the command may detect an interactive terminal and use
- line buffering.
- - (bawr) DO NOT CHANGE STDIN TO ANYTHING BESIDES DEV NULL, that'll cause race conditions.
- - potentially there's a cleaner implementation using asyncio, but better the devil you know, yknow?
- """
- trace_log_context = trace_log_context if trace_log_context is not None else {}
- shutdown_event = shutdown_event or Event()
-
- if shutdown_event.is_set():
- result = CompletedProcess(
- returncode=SUBPROCESS_STOPPED_BY_REQUEST_EXIT_CODE,
- stdout=b"",
- stderr=b"",
- command=command,
- is_output_already_logged=trace_output,
- )
- if is_checked:
- result.check()
- return result
-
- if trace_output:
- _log_input_command(command)
-
- # with bufsize 0 and not setting text, encoding, or errors, the pipe objects will be RawIOBase.
- # use read(2**30) or similar if using this for a nonblocking read.
- # with nonzero bufsize, they will be BufferedIOBase. use read1() if using this for a nonblocking read.
- # with text, encoding, or errors, they will be TextIOBase.
- # this doesn't seem to play nice with nonblocking mode and read().
- process = subprocess.Popen(
- command,
- cwd=cwd,
- shell=True,
- executable="/bin/bash",
- bufsize=0,
- stdin=subprocess.DEVNULL,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- env={**os.environ, "TERM": "dumb"},
- )
-
- gatherer = OutputGatherer.build_from_popen(
- process,
- on_complete_line_from_stdout=(trace_on_complete_line_callback if trace_output else None),
- on_complete_line_from_stderr=(trace_on_complete_line_callback if trace_output else None),
- shutdown_event=shutdown_event,
- )
-
- timeout_time = time.time() + timeout if timeout is not None else None
-
- with logger.contextualize(**trace_log_context):
- while not shutdown_event.wait(0.001) and not _is_timeout(timeout_time):
- maybe_exit_code = process.poll()
- gatherer.gather_output()
- if maybe_exit_code is not None:
- exit_code = maybe_exit_code
- break
- else:
- # The shutdown event was set or a timeout limit has been reached,
- # so we should shutdown the process.
- _shutdown_popen(process, command, shutdown_timeout_sec)
- exit_code = SUBPROCESS_STOPPED_BY_REQUEST_EXIT_CODE
-
- stdout, stderr = gatherer.get_output()
- result = CompletedProcess(
- returncode=exit_code,
- stdout=stdout,
- stderr=stderr,
- command=command,
- is_output_already_logged=trace_output,
- )
- if is_checked:
- result.check()
-
- return result
-
-
-# NOTE: this function is largely duplicated with the above, but subtle changes to the types
-# most of the logic should be the same though, with the exception that we assume stdout and stderr are strings
-def run_local_command_modern_version(
- command: Sequence[str],
- is_checked: bool = True,
- timeout: float | None = None,
- trace_output: bool = False,
- cwd: Path | None = None,
- # this is called for each line, including the last line even if it doesn't end with a newline
- # if there is no output, it is never called
- trace_on_line_callback: Callable[[str, bool], None] | None = None,
- trace_log_context: Mapping[str, object] | None = None,
- shutdown_event: MutableEvent | None = None,
- shutdown_timeout_sec: float = 30.0,
- poll_time: float = 0.01,
- env: Mapping[str, str] | None = None,
- # This callback gets called once either the process is running or it failed to start.
- # The argument is None on success, or the Exception on failure.
- on_initialization_complete: Callable[[BaseException | None], None] = lambda success: None,
-) -> FinishedProcess:
- """
- implementation notes:
- - this function is really tricky to implement well! check with josh, bawr, or zack before making nontrivial changes
- - the reason it's tricky is that we need to both monitor the shutdown event, while also reading the subprocess
- output in realtime to allow realtime log tracing of the output.
- - we previously had an implementation that used a helper thread to read the output, but never seemed to shutdown
- cleanly and left a mess of warnings in the logs, even though it seemed to be implemented properly.
- - the current implementation aims to use just the main thread to avoid this. but then you need to be very careful
- to avoid anything blocking.
- - thus we set the pipe to nonblocking mode so that reads are nonblocking, and we also don't use readline()
- as that could potentially block/deadlock if the process prints long lines with no newlines.
- - don't redirect the process output to a file, as then the command may detect an interactive terminal and use
- line buffering.
- - (bawr) DO NOT CHANGE STDIN TO ANYTHING BESIDES DEV NULL, that'll cause race conditions.
- - potentially there's a cleaner implementation using asyncio, but better the devil you know, yknow?
- - if `env` is set, will overwrite contents passed into subprocess.Popen
-
- raises ProcessError
- """
- with call_on_exit(on_initialization_complete):
- trace_log_context = trace_log_context if trace_log_context is not None else {}
- shutdown_event = shutdown_event or Event()
- command_as_string = " ".join(shlex.quote(arg) for arg in command)
- # NOTE: We create the process even when shutdown_event is already set.
- # It will be terminated almost immediately after starting but the benefit is that the behavior stays consistent.
-
- if trace_output:
- _log_input_command(command_as_string)
-
- # with bufsize 0 and not setting text, encoding, or errors, the pipe objects will be RawIOBase.
- # use read(2**30) or similar if using this for a nonblocking read.
- # with nonzero bufsize, they will be BufferedIOBase. use read1() if using this for a nonblocking read.
- # with text, encoding, or errors, they will be TextIOBase.
- # this doesn't seem to play nice with nonblocking mode and read().
- try:
- process = subprocess.Popen(
- command,
- cwd=cwd,
- bufsize=0,
- stdin=subprocess.DEVNULL,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- env=env,
- )
- except (OSError, ValueError) as e:
- # Raise setup error if process fails to start.
- # OSError: subprocess.Popen fails to start the requested command
- # ValueError: subprocess.Popen malformed arguments
- raise ProcessSetupError(
- command=tuple(command),
- stdout="",
- stderr="",
- is_output_already_logged=trace_output,
- ) from e
-
- if trace_output:
- # NOTE: We could probably condense the truth value of trace_output to
- # just be bool(trace_on_line_callback). However, there's a lot of scary code
- # that depends on these values that I didn't want to audit just now.
- assert trace_on_line_callback, "Must pass trace_on_line_callback"
- on_complete_line_from_stdout = lambda line: trace_on_line_callback(line, True)
- on_complete_line_from_stderr = lambda line: trace_on_line_callback(line, False)
- else:
- on_complete_line_from_stdout = None
- on_complete_line_from_stderr = None
-
- gatherer = OutputGatherer.build_from_popen(
- process,
- on_complete_line_from_stdout=on_complete_line_from_stdout,
- on_complete_line_from_stderr=on_complete_line_from_stderr,
- shutdown_event=shutdown_event,
- )
-
- timeout_time = time.time() + timeout if timeout is not None else None
-
- with logger.contextualize(**trace_log_context):
- while not shutdown_event.wait(poll_time) and not _is_timeout(timeout_time):
- maybe_exit_code = process.poll()
- gatherer.gather_output()
- if maybe_exit_code is not None:
- exit_code = maybe_exit_code
- break
- else:
- # The shutdown event was set or a timeout limit has been reached,
- # so we should shutdown the process.
- exit_code = _shutdown_popen(process, command_as_string, shutdown_timeout_sec)
-
- stdout, stderr = gatherer.get_output()
-
- # send the final incomplete lines as well
- incomplete_stdout_line, incomplete_stderr_line = gatherer.get_incomplete_lines()
- if incomplete_stdout_line:
- if trace_on_line_callback:
- trace_on_line_callback(incomplete_stdout_line, True)
- if incomplete_stderr_line:
- if trace_on_line_callback:
- trace_on_line_callback(incomplete_stderr_line, False)
-
- result = FinishedProcess(
- returncode=exit_code,
- stdout=stdout.decode("utf-8", errors="replace"),
- stderr=stderr.decode("utf-8", errors="replace"),
- command=tuple(command),
- is_timed_out=_is_timeout(timeout_time),
- is_output_already_logged=trace_output,
- )
- if is_checked:
- result.check()
-
- return result
-
-
-def run_remote_command(
- machine_ssh_command: str,
- remote_command: str,
- is_checked: bool = True,
- trace_output: bool = True,
- trace_on_complete_line_callback: Callable[[str], None] = log_subprocess_output_line,
- trace_log_context: Mapping[str, object] | None = None,
- shutdown_event: Event | CompoundEvent | None = None,
- shutdown_timeout_sec: float = 30.0,
-) -> RemoteCompletedProcess:
- """
- :raises SSHConnectionError: if `is_checked and returncode == 255` (the ssh reserved error code)
- :raises RemoteCommandError: if `is_checked and returncode not in (0, 255)`
- """
- # Please please please use proper quoting tools
- escaped_remote_command = shlex.quote(remote_command)
- result = run_local_command(
- f"{machine_ssh_command} {escaped_remote_command}",
- is_checked=False,
- trace_output=trace_output,
- trace_on_complete_line_callback=trace_on_complete_line_callback,
- trace_log_context=trace_log_context,
- shutdown_event=shutdown_event,
- shutdown_timeout_sec=shutdown_timeout_sec,
- )
-
- remote_result = RemoteCompletedProcess(
- returncode=result.returncode,
- stdout=result.stdout,
- stderr=result.stderr,
- ssh_command=machine_ssh_command,
- command=remote_command,
- is_output_already_logged=result.is_output_already_logged,
- )
- if is_checked:
- remote_result.check()
- return remote_result
-
-
-def truncate_command(command: str, num_chars: int = 2000) -> str:
- """Truncates a command to include just the first `num_chars` of the first line."""
- truncated = False
- split_command = command.split("\n")
- if len(split_command) > 1:
- truncated = True
- command = split_command[0]
- if len(command) > num_chars:
- truncated = True
- command = command[:num_chars]
- if truncated:
- command += "... (truncated)"
- return command
diff --git a/imbue_core/imbue_core/suggestions.py b/imbue_core/imbue_core/suggestions.py
@@ -1,85 +0,0 @@
-from typing import Annotated
-from typing import Any
-
-from pydantic import AnyUrl
-from pydantic import Field
-from pydantic import Tag
-
-from imbue_core.agents.data_types.ids import ObjectID
-from imbue_core.data_types import IdentifiedVerifyIssue
-from imbue_core.pydantic_serialization import SerializableModel
-from imbue_core.pydantic_serialization import build_discriminator
-
-
-class SuggestionAction(SerializableModel):
- object_type: str
- # more important -> lower number. Think about it as "my first priority is to..."
- # different actions *may* share the same priority rank, in which case the ties will be broken by a canonical ordering of importance
- priority_rank: int = 0
-
-
-class UseSuggestionAction(SuggestionAction):
- object_type: str = "UseSuggestionAction"
- content: str
-
-
-class VisitLinkSuggestionAction(SuggestionAction):
- object_type: str = "VisitLinkSuggestionAction"
- url: AnyUrl
- link_text: str
-
-
-SuggestionActionTypes = Annotated[
- Annotated[UseSuggestionAction, Tag("UseSuggestionAction")]
- | Annotated[VisitLinkSuggestionAction, Tag("VisitLinkSuggestionAction")],
- build_discriminator(),
-]
-
-
-# FIXME(johnny): move to imbue_core.imbue_cli.action
-class CheckOutputID(ObjectID):
- tag: str = "chko"
-
-
-class Suggestion(SerializableModel):
- # TODO: this is here because we're treating this like an issue, but we may not want to do that
- object_type: str = "Suggestion"
- # TODO: remove the default factory once we have properly migrated to the new check output protocol
- # Also see sculptor/sculptor/tasks/handlers/run_agent/checks/check_process.py::275
- id: CheckOutputID = Field(default_factory=CheckOutputID)
- title: str = Field(min_length=1)
- description: str = ""
- # (if sculptor speaks true) pyre doesn't like float values for gt/ge/le because... it's looking for def __gt__(self: T, __other: T) -> bool and float has def __gt__(self, value: float, /) -> bool
-
- # will probably be technically implemented by asking an LLM to come up with a number between 1 and 10,
- # so probably really will range between 0.1 and 1.0
- severity_score: float = Field(
- ge=0.0, # pyre-ignore[6]
- le=1.0, # pyre-ignore[6]
- description="A score between 0.0 and 1.0 indicating how severe the issue is that this suggestion addresses.",
- )
- # unlike the severity, this is about how sure we are that this is a good suggestion,
- # for example, you can be confident that there is a real problem (high confidence_score)
- # but it might be about some edge case that doesn't matter (low severity_score)
- confidence_score: float = Field(
- ge=0.0, # pyre-ignore[6]
- le=1.0, # pyre-ignore[6]s
- description="A score between 0.0 and 1.0 indicating how confident we are that this suggestion addresses a real issue.",
- )
- # these are the possible actions that the user can take with this suggestion
- # for right now, the only ones implemented are "USE" and "COPY"
- actions: tuple[SuggestionActionTypes, ...]
- original_issues: tuple[IdentifiedVerifyIssue, ...]
-
-
-# FIXME(johnny): move these to the right location, use the right types, etc -- these are just placeholders to demonstrate how this works
-# FIXME(johnny): migrate to just using the ActionOutputUnion from imbue_core.imbue_cli.action
-CheckOutputTypes = Annotated[
- Annotated[Suggestion, Tag("Suggestion")],
- build_discriminator(),
-]
-
-
-# FIXME(johnny): remove this once we move to using the ActionOutputUnion from imbue_core.imbue_cli.action
-def is_check_output(output: Any) -> bool:
- return isinstance(output, Suggestion)
diff --git a/imbue_core/imbue_core/test_repo_utils.py b/imbue_core/imbue_core/test_repo_utils.py
@@ -5,11 +5,6 @@ import tempfile
from pathlib import Path
from typing import Generator
-from imbue_core.common import get_temp_dir
-from imbue_core.git import LocalGitRepo
-from imbue_core.git import get_git_repo_root
-from imbue_core.test_utils import create_temp_dir
-
def make_simple_test_git_repo() -> Generator[Path, None, None]:
"""Create a temporary git repository for testing.
@@ -39,19 +34,3 @@ def make_simple_test_git_repo() -> Generator[Path, None, None]:
subprocess.run(["git", "commit", "-m", "Initial commit file2"], cwd=repo_path, check=True)
yield repo_path
-
-
-def make_mock_repo(path: Path, is_recreating: bool = False) -> Generator[LocalGitRepo, None, None]:
- mock_repo = LocalGitRepo(base_path=path)
- with create_temp_dir(root_dir=Path(get_temp_dir())) as temp_dir:
- temp_repo = mock_repo.sync_copy_repo(temp_dir)
- temp_repo.sync_configure_git(
- git_user_name="AGI (Automated Software Inspector)",
- git_user_email="the_true_AGI@running.pytest.com",
- is_recreating=is_recreating,
- )
- yield temp_repo
-
-
-def make_test_data_mock_repo() -> Generator[LocalGitRepo, None, None]:
- yield from make_mock_repo(get_git_repo_root() / "imbue/imbue/test_data/mock_repo", is_recreating=False)
diff --git a/imbue_core/imbue_core/testing_utils.py b/imbue_core/imbue_core/testing_utils.py
@@ -1,189 +0,0 @@
-import asyncio
-import inspect
-import os
-import shutil
-from contextlib import asynccontextmanager
-from contextlib import contextmanager
-from pathlib import Path
-from typing import Any
-from typing import AsyncGenerator
-from typing import Callable
-from typing import ContextManager
-from typing import Coroutine
-from typing import Generator
-from typing import Iterable
-from typing import Protocol
-from typing import Sequence
-from typing import TYPE_CHECKING
-from typing import TypeVar
-from uuid import uuid4
-
-import anyio
-import pytest
-import pytest_asyncio
-from _pytest.fixtures import Config
-from _pytest.python import Function
-
-from imbue_core.common import get_temp_dir
-
-if TYPE_CHECKING:
- # noinspection PyProtectedMember
- from _pytest.fixtures import _ScopeName
-
-T = TypeVar("T")
-
-_TestFunc = Callable[..., None] | Callable[..., Coroutine[Any, Any, None]]
-
-
-def fixture(
- fixture_function: Any | None = None,
- *,
- scope: "_ScopeName | Callable[[str, Config], _ScopeName]" = "function",
- params: Iterable[object] | None = None,
- autouse: bool = False,
- ids: Sequence[object | None] | Callable[[Any], object | None] | None = None,
-) -> Any:
- def decorator(function: Any) -> Any:
- true_name = function.__name__[:-1]
- if inspect.iscoroutinefunction(function):
- return pytest_asyncio.fixture(
- function,
- name=true_name,
- scope=scope,
- params=params,
- autouse=autouse,
- ids=ids, # type: ignore
- )
- else:
- return pytest.fixture(
- function,
- name=true_name,
- scope=scope,
- params=params,
- autouse=autouse,
- ids=ids,
- )
-
- if fixture_function is not None and callable(fixture_function):
- return decorator(fixture_function)
-
- return decorator
-
-
-def placeholder_param_for_mark(
- marks: pytest.MarkDecorator | list[pytest.MarkDecorator],
-) -> object:
- """Returns a param for annotating a fixture with marks.
-
- It can be useful to add marks to a fixture that propagate to all functions that use it.
- However, decorating a fixture function with "@pytest.mark.foo_bar" has no effect.
-
- On the other hand, parameterized fixtures can have marks attached to each parameter;
- thus a workaround is to parameterize the fixture with a single placeholder param.
- Use this function like this:
-
- @pytest.fixture(params=placeholder_param_for_mark(pytest.mark.foo_bar))
-
- One slightly annoying side effect is that the param will show up in the test name.
- We can get rid of it with pytest_collection_modifyitems,
- but it's probably simpler to live with it.
-
- You don't need this function if the fixture is already parameterized;
- simply add the desired mark to the params:
-
- @pytest.fixture(params=[pytest.param(0, marks=pytest.mark.foo_bar),
- pytest.param(1, marks=pytest.mark.foo_bar)])
-
- See https://github.com/pytest-dev/pytest/issues/1368 for more context.
- """
- return pytest.param("placeholder_param", marks=marks)
-
-
-def use(*args: Callable[..., Any]) -> Any:
- true_names = [x.__name__[:-1] for x in args]
- return pytest.mark.usefixtures(*true_names)
-
-
-def integration_test(function: _TestFunc) -> Any:
- return pytest.mark.integration_test(function)
-
-
-def slow_integration_test(function: _TestFunc) -> Any:
- return pytest.mark.slow_integration_test(function)
-
-
-class RequestFixture(Protocol[T]):
- """Yes, there is a class called FixtureRequest, but the types are quite bad for it"""
-
- node: Function
- param: T
-
-
-@fixture
-def temp_file_path_() -> Generator[Path, None, None]:
- with create_temp_file_path() as output:
- yield output
-
-
-@fixture
-def temp_path_() -> Generator[Path, None, None]:
- with temp_dir(get_temp_dir()) as output:
- yield output
-
-
-def create_temp_file_path(cleanup: bool = True) -> ContextManager[Path]:
- @contextmanager
- def context() -> Generator[Path, None, None]:
- random_id = uuid4()
- output_path = os.path.join(get_temp_dir(), str(random_id))
- try:
- yield Path(output_path)
- finally:
- if cleanup and os.path.exists(output_path):
- if os.path.isfile(output_path):
- os.remove(output_path)
- else:
- shutil.rmtree(output_path)
-
- # noinspection PyTypeChecker
- return context()
-
-
-def temp_dir(base_dir: str, is_uuid_concatenated: bool = False) -> ContextManager[Path]:
- @contextmanager
- def context() -> Generator[Path, None, None]:
- random_id = uuid4()
- if is_uuid_concatenated:
- output_path = Path(base_dir.rstrip("/") + "_" + str(random_id))
- else:
- output_path = Path(base_dir) / str(random_id)
- output_path.mkdir(parents=True, exist_ok=True)
- try:
- yield output_path
- finally:
- if output_path.exists():
- try:
- shutil.rmtree(str(output_path))
- except OSError:
- os.unlink(str(output_path))
-
- # noinspection PyTypeChecker
- return context()
-
-
-@asynccontextmanager
-async def async_temp_dir(base_dir: str, is_uuid_concatenated: bool = False) -> AsyncGenerator[Path, None]:
- random_id = uuid4()
- if is_uuid_concatenated:
- output_path = anyio.Path(base_dir.rstrip("/") + "_" + str(random_id))
- else:
- output_path = anyio.Path(base_dir) / str(random_id)
- await output_path.mkdir(parents=True, exist_ok=True)
- try:
- yield Path(output_path)
- finally:
- if await output_path.exists():
- try:
- await asyncio.to_thread(shutil.rmtree, str(output_path))
- except OSError:
- await output_path.unlink()
diff --git a/imbue_tools/imbue_tools/capabilities_data_logging/common.py b/imbue_tools/imbue_tools/capabilities_data_logging/common.py
@@ -1,83 +0,0 @@
-import abc
-import datetime
-import subprocess
-from enum import StrEnum
-from pathlib import Path
-from typing import Any
-from typing import Iterable
-
-from loguru import logger
-from psycopg.sql import SQL
-
-from imbue_core.pydantic_serialization import SerializableModel
-
-IMBUE_AUTOMATIC_TESTING_ORGANIZATION_ID = "imbue-automatic-testing"
-NEON_PROJECT_ID = (
- "holy-butterfly-05886102" # This is the ID of the crafty project in neon.tech. TODO: reuse this for now
-)
-
-
-# TODO: this should be shared with `sculptor` but there's not a great shared module right now.
-# Make sure if this is updated the value in `sculptor` also gets updated
-# Path where imbue verify will log data and logged data will be expected to be found
-CAPABILITIES_DATA_LOGGING_PATH = Path("/tmp/sculptor/capabilities_logging")
-
-
-class ProductDataBaseEventRecord(SerializableModel, abc.ABC):
- """
- This is a base class for all product data events.
- It contains the fields that are common to all product data events.
- """
-
- id: str
- user_id: str
- organization_id: str
- created_at: datetime.datetime
-
-
-class SculptorDataTables(StrEnum):
- PRODUCT_TOOL_DATA = "PRODUCT_TOOL_DATA"
-
-
-def build_product_feature_data_query_args(
- user_id: str | None = None,
- creation_bounds: tuple[datetime.datetime | None, datetime.datetime | None] = (
- None,
- None,
- ),
- ids: Iterable[str] | None = None,
- filter_test_users: bool = False,
-) -> tuple[Any, tuple[Any, ...]]:
- where_clause: Any = SQL("1 = 1")
- where_args: tuple[Any, ...] = ()
- start, end = creation_bounds
- if start is not None:
- where_clause = SQL("{} AND {}").format(where_clause, SQL("created_at >= %s"))
- where_args += (start,)
- if end is not None:
- where_clause = SQL("{} AND {}").format(where_clause, SQL("created_at <= %s"))
- where_args += (end,)
- if user_id is not None:
- where_clause = SQL("{} AND {}").format(where_clause, SQL("user_id = %s"))
- where_args += (user_id,)
- if ids is not None:
- where_clause = SQL("{} AND {}").format(where_clause, SQL("id = ANY(%s)"))
- where_args += (ids,)
- if filter_test_users:
- where_clause = SQL("{} AND NOT {}").format(where_clause, SQL("organization_id = %s"))
- where_args += (IMBUE_AUTOMATIC_TESTING_ORGANIZATION_ID,)
- return where_clause, where_args
-
-
-UNKNOWN_USER_NAME = "unknown"
-
-
-def get_current_user_name() -> str:
- try:
- possible_username = subprocess.check_output(["git", "config", "user.name"], universal_newlines=True).strip()
- assert possible_username != ""
- return possible_username
- except subprocess.CalledProcessError:
- pass
- logger.info("Using UNKNOWN_USER_NAME")
- return UNKNOWN_USER_NAME
diff --git a/imbue_tools/imbue_tools/capabilities_data_logging/data_types.py b/imbue_tools/imbue_tools/capabilities_data_logging/data_types.py
@@ -1,250 +0,0 @@
-"""
-Any fields in this file that are optional are not strictly required to be present in the database.
-Any fields that are non-optional are required and should not be changed without talking to capabilities eval group.
-Much of this code is taken from an unmerged branch `pranali/backwards_compatibility`.
-The write_code event is also supported in the minimal format in that branch.
-"""
-
-import uuid
-from datetime import datetime
-from datetime import timezone
-from enum import StrEnum
-from typing import Annotated
-from typing import Any
-from typing import Callable
-from typing import Self
-from typing import TypeVar
-
-from pydantic import ConfigDict
-from pydantic import Field
-from pydantic import PlainValidator
-from pydantic import ValidationError
-from pydantic import field_validator
-from pydantic import model_validator
-
-from imbue_core.agents.configs import LanguageModelGenerationConfig
-from imbue_core.common import generate_id
-from imbue_core.data_types import IdentifiedVerifyIssue
-from imbue_core.data_types import LLMResponse
-from imbue_core.frozen_utils import FrozenDict
-from imbue_core.frozen_utils import empty_mapping
-from imbue_core.nested_evolver import assign
-from imbue_core.nested_evolver import chill
-from imbue_core.nested_evolver import evolver
-from imbue_core.pydantic_serialization import SerializableModel
-from imbue_core.repo_state import RepoState
-from imbue_core.sculptor.state.messages import ConversationMessageUnion
-from imbue_tools.repo_utils.context_prefix import SubrepoContext
-from imbue_tools.types.imbue_verify_config import ImbueVerifyConfig
-
-TypeInput = TypeVar("TypeInput")
-TypeOutput = TypeVar("TypeOutput")
-
-
-class UnknownFeatureType(Exception):
- """Exception raised when an unknown feature type is encountered."""
-
-
-class LoggedFeatureType(StrEnum):
- VERIFY_EXCEPTION = "VERIFY_EXCEPTION"
- COMMAND_RUN = "COMMAND_RUN"
- ISSUE_FEEDBACK = "ISSUE_FEEDBACK"
- UNKNOWN = "UNKNOWN"
-
-
-class CommandType(StrEnum):
- IMBUE_VERIFY = "IMBUE_VERIFY"
-
-
-# TODO this is WEIRD
-EVENT_BY_LOGGED_FEATURE_TYPE: dict[LoggedFeatureType, Callable[[], type["CapabilitiesLoggedEvent"]]] = {
- LoggedFeatureType.VERIFY_EXCEPTION: lambda: ImbueVerifyEvent,
- LoggedFeatureType.COMMAND_RUN: lambda: ImbueVerifyEvent,
- LoggedFeatureType.ISSUE_FEEDBACK: lambda: IssueFeedbackReport,
-}
-
-# TODO: had to pull in code from crafty in here to make this work, revisit what needs to stay, what should become shared, what we don't need
-
-
-def make_string_safe_for_formatting(s: str) -> str:
- """
- Make a string safe for things like str.format()
- """
- # Replace each '{' with '{{' and each '}' with '}}'
- return s.replace("{", "{{").replace("}", "}}")
-
-
-class IssueKey(SerializableModel):
- # TODO: this should likely be shared with the product code in v1
- issue_type: CommandType
- # this should NOT contain line numbers, as we want it to be stable across changes as much as possible
- # NOTE: we do some initial formatting to avoid issues around message containing code
- message: Annotated[str, PlainValidator(make_string_safe_for_formatting)]
-
- # NOTE: this is the error code for pyre, ruff, and imbue_verify, and the test name for pytest
- error_type: str | None = None
-
- def commit_message(self) -> str:
- return f"Fix {self.issue_type} issue: {self.message[:20]}"
-
-
-class CapabilitiesLoggedEvent(SerializableModel):
- # TODO: Maybe convert empty optional strings and tuples and other datatypes to be Nones so there is only one notion of emptiness
- # Though in some cases there actually is a difference: eg current_issues not existing or there being 0
- # Null values are generally going to correspond to (possibly intentionally) 'missing' data
- """
- Most types in this class are optional since they may not be present for every type of event.
- In the future additional fields may be added to this class to support new events, but they should be optional.
- """
-
- model_config = ConfigDict(frozen=True)
- created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
- id: str = Field(default_factory=lambda: uuid.uuid4().hex)
- user_id: str
- organization_id: str
- git_url: str | None = None
- git_hash: str | None = None
- subrepo_context: SubrepoContext | None = None
- instruction_context: SubrepoContext | None = None
- conversation_history: tuple[ConversationMessageUnion, ...] | None = None
-
- server_version: FrozenDict[str, Any] = Field(default_factory=empty_mapping)
-
- feature_name: LoggedFeatureType | None = None
- repo_state: RepoState | None = None
-
- # TODO: Add configuration settings about how sculptor was started
-
- # Command run specific fields
- diff: str | None = None
- command_type: CommandType | None = None
- task_description: str | None = None
- # For feedback
- issue_key: IssueKey | None = None
-
- # Output fields
- has_output: bool = False
- output_completion_time: datetime | None = None
- llm_response: str | None = Field(default=None, deprecated=True)
- llm_responses: tuple[LLMResponse, ...] | None = None
- # Command run output fields
- issues: tuple[IdentifiedVerifyIssue, ...] | None = None
-
- # Feedback fields
- feedback_rating: str | None = None
- feedback_text: str | None = None
-
- # Imbue verify specific fields
- generation_config: LanguageModelGenerationConfig | None = None
- imbue_verify_config: ImbueVerifyConfig | None = None
-
- # pyre-ignore[56]: pyre's stubs don't match pydantic v2 decorator signatures
- @field_validator("server_version", mode="before")
- @classmethod
- def validate_server_version(cls, v: Any) -> Any:
- if isinstance(v, dict):
- return FrozenDict(v)
- return v
-
- @classmethod
- def build_from_json(cls, json_data: str) -> "CapabilitiesLoggedEvent":
- # mypy is unhappy if this is -> Self because of event = event_type.build_from_json(json_data) below
- event = cls.model_validate_json(json_data)
- if event.feature_name:
- feature_name = event.feature_name
- if feature_name in EVENT_BY_LOGGED_FEATURE_TYPE:
- event_type = EVENT_BY_LOGGED_FEATURE_TYPE[feature_name]()
- try:
- event = event_type.build_from_json(json_data)
- except ValidationError as e:
- print(e)
- return event
-
- def build_new_event_with_outputs(self, outputs: Any) -> Self:
- raise NotImplementedError("Should be implemented by subclasses")
-
-
-class ImbueVerifyUsage(SerializableModel):
- model_config = ConfigDict(frozen=True)
-
- num_llm_calls: int
- total_cost: float
- total_input_tokens: int
- total_output_tokens: int
- total_cache_creation_tokens: int
- total_cache_read_tokens: int
-
-
-class CostedImbueVerifyEvent(SerializableModel):
- model_config = ConfigDict(frozen=True)
- event: "ImbueVerifyEvent"
- usage: ImbueVerifyUsage
-
-
-class ImbueVerifyEvent(CapabilitiesLoggedEvent):
- """
- Events for `imbue_verify`.
- """
-
- diff: str
- command_type: CommandType
- feature_name: LoggedFeatureType = LoggedFeatureType.COMMAND_RUN
-
- task_description: str
- generation_config: LanguageModelGenerationConfig
- imbue_verify_config: ImbueVerifyConfig = Field(default_factory=ImbueVerifyConfig)
- server_version: FrozenDict[str, Any] = Field(default_factory=empty_mapping)
- # TODO: Get the direct llm response for command run events as well
-
- exception_name: str | None = None
-
- # pyre-ignore[56]: pyre's stubs don't match pydantic v2 decorator signatures
- @model_validator(mode="after")
- def ensure_reasonable_output(self) -> Self:
- if self.has_output:
- if self.issues is None:
- raise ValueError("If has_output is True, there must be some issues")
- return self
-
- # TODO the only sites at which this is constructed have this info available.
- # should simplify by providing it at construction time rather than using evolver
- def build_new_event_with_outputs(
- self,
- outputs: tuple[tuple[IdentifiedVerifyIssue, ...], tuple[LLMResponse, ...], str | None],
- ) -> Self:
- issues, llm_responses, git_url = outputs
- event_evolver = evolver(self)
- assign(event_evolver.issues, lambda: issues)
- assign(event_evolver.llm_responses, lambda: llm_responses)
- assign(event_evolver.has_output, lambda: True)
- assign(event_evolver.id, lambda: generate_id())
- assign(event_evolver.output_completion_time, lambda: datetime.now(timezone.utc))
- assign(event_evolver.git_url, lambda: git_url)
- event_with_outputs = chill(event_evolver)
- return event_with_outputs
-
- @classmethod
- def build_from_json(cls, json_data: str) -> Self:
- return cls.model_validate_json(json_data)
-
-
-class IssueFeedbackReport(CapabilitiesLoggedEvent):
- # TODO: this is copied and not updated from crafty
- issue_key: IssueKey
- device_id: str | None = None
- session_id: str | None = None
- browser_id: str | None = None
- tab_id: str | None = None
- feature_name: LoggedFeatureType = LoggedFeatureType.ISSUE_FEEDBACK
-
- # pyre-ignore[56]: pyre's stubs don't match pydantic v2 decorator signatures
- @model_validator(mode="after")
- def has_some_feedback(self) -> Self:
- if self.feedback_rating is None and self.feedback_text is None:
- raise ValueError("At least one of feedback_rating or feedback_text must be set")
- return self
-
- @classmethod
- def build_from_json(cls, json_data: str) -> "IssueFeedbackReport":
- event = IssueFeedbackReport.model_validate_json(json_data)
- return event
diff --git a/imbue_tools/imbue_tools/get_conversation_history/get_conversation_history.py b/imbue_tools/imbue_tools/get_conversation_history/get_conversation_history.py
@@ -6,10 +6,10 @@ from loguru import logger
from pydantic import TypeAdapter
from pydantic import ValidationError
-from imbue_core.sculptor.state.chat_state import ContentBlockTypes
-from imbue_core.sculptor.state.messages import ChatInputUserMessage
-from imbue_core.sculptor.state.messages import ConversationMessageUnion
-from imbue_core.sculptor.state.messages import ResponseBlockAgentMessage
+from vet_types.chat_state import ContentBlockTypes
+from vet_types.messages import ChatInputUserMessage
+from vet_types.messages import ConversationMessageUnion
+from vet_types.messages import ResponseBlockAgentMessage
CONVERSATION_FILE_ENV_VAR = "CONVERSATION_FILE"
TASK_SOURCE_BRANCH_ENV_VAR = "TASK_SOURCE_BRANCH"
diff --git a/imbue_tools/imbue_tools/get_conversation_history/input_data_types.py b/imbue_tools/imbue_tools/get_conversation_history/input_data_types.py
@@ -4,7 +4,7 @@ from typing import TypeVar
from pydantic import model_validator
from imbue_core.pydantic_serialization import SerializableModel
-from imbue_core.sculptor.state.messages import ConversationMessageUnion
+from vet_types.messages import ConversationMessageUnion
class IdentifierInputsMissingError(Exception):
diff --git a/imbue_tools/imbue_tools/util_prompts/goal_from_conversation.py b/imbue_tools/imbue_tools/util_prompts/goal_from_conversation.py
@@ -5,7 +5,7 @@ from imbue_core.agents.llm_apis.build_apis import build_language_model_from_conf
from imbue_core.agents.llm_apis.data_types import CostedLanguageModelResponse
from imbue_core.agents.llm_apis.data_types import LanguageModelGenerationParams
from imbue_core.itertools import only
-from imbue_core.sculptor.state.messages import ConversationMessageUnion
+from vet_types.messages import ConversationMessageUnion
from imbue_tools.get_conversation_history.get_conversation_history import (
format_conversation_history_for_prompt,
)
diff --git a/imbue_verify/api.py b/imbue_verify/api.py
@@ -10,7 +10,7 @@ from loguru import logger
from imbue_core.data_types import IdentifiedVerifyIssue
from imbue_core.data_types import IssueIdentificationDebugInfo
-from imbue_core.sculptor.state.messages import ConversationMessageUnion
+from vet_types.messages import ConversationMessageUnion
from imbue_tools.get_conversation_history.get_conversation_history import (
ConversationLoadingError,
)
diff --git a/imbue_verify/cli/main.py b/imbue_verify/cli/main.py
@@ -12,7 +12,6 @@ from pathlib import Path
from loguru import logger
from imbue_core.data_types import IssueCode
-from imbue_core.log_utils import ensure_core_log_levels_configured
from imbue_tools.get_conversation_history.get_conversation_history import (
parse_conversation_history,
)
@@ -329,8 +328,6 @@ def apply_config_preset(args: argparse.Namespace, preset: CliConfigPreset) -> ar
def main(argv: list[str] | None = None) -> int:
- ensure_core_log_levels_configured()
-
parser = create_parser()
args = parser.parse_args(argv)
diff --git a/imbue_verify/conftest.py b/imbue_verify/conftest.py
@@ -4,7 +4,6 @@ from typing import Generator
import pytest
from imbue_core.async_monkey_patches_test import explode_on_error # noqa: F401
-from imbue_core.log_utils import ensure_core_log_levels_configured
from imbue_core.test_repo_utils import make_simple_test_git_repo
simple_test_git_repo = pytest.fixture(make_simple_test_git_repo)
@@ -23,9 +22,3 @@ def always_explode_on_error(
use the `expect_exact_logged_errors` decorator to suppress the logging of those errors.
"""
yield
-
-
-# copied from imbue_core/conftest.py
-@pytest.fixture(scope="session", autouse=True)
-def setup_logging_and_secrets() -> None:
- ensure_core_log_levels_configured()
diff --git a/imbue_verify/issue_identifiers/common.py b/imbue_verify/issue_identifiers/common.py
@@ -22,7 +22,6 @@ from imbue_core.agents.agent_api.data_types import READ_ONLY_TOOLS
from imbue_core.agents.llm_apis.anthropic_data_types import AnthropicCachingInfo
from imbue_core.agents.llm_apis.data_types import CostedLanguageModelResponse
from imbue_core.async_monkey_patches import log_exception
-from imbue_core.constants import ExceptionPriority
from imbue_core.data_types import ConfidenceScore
from imbue_core.data_types import IdentifiedVerifyIssue
from imbue_core.data_types import InvocationInfo
@@ -172,7 +171,6 @@ def convert_generated_issue_to_identified_issue(
log_exception(
e,
"Error processing issue data: {issue_data}, skipping",
- priority=ExceptionPriority.LOW_PRIORITY,
issue_data=issue_data,
)
return None
diff --git a/imbue_verify/issue_identifiers/harnesses/conversation_single_prompt_test.py b/imbue_verify/issue_identifiers/harnesses/conversation_single_prompt_test.py
@@ -1,12 +1,12 @@
import pytest
from imbue_core.data_types import IssueCode
-from imbue_core.ids import AssistantMessageID
-from imbue_core.sculptor.state.chat_state import TextBlock
-from imbue_core.sculptor.state.messages import AgentMessageSource
-from imbue_core.sculptor.state.messages import ChatInputUserMessage
-from imbue_core.sculptor.state.messages import LLMModel
-from imbue_core.sculptor.state.messages import ResponseBlockAgentMessage
+from vet_types.chat_state import TextBlock
+from vet_types.ids import AssistantMessageID
+from vet_types.messages import AgentMessageSource
+from vet_types.messages import ChatInputUserMessage
+from vet_types.messages import LLMModel
+from vet_types.messages import ResponseBlockAgentMessage
from imbue_tools.get_conversation_history.input_data_types import ConversationInputs
from imbue_tools.get_conversation_history.input_data_types import IdentifierInputs
from imbue_tools.get_conversation_history.input_data_types import (
diff --git a/imbue_verify/repo_utils.py b/imbue_verify/repo_utils.py
@@ -2,7 +2,6 @@ from pathlib import Path
from imbue_core.async_monkey_patches import log_exception
from imbue_core.computing_environment.data_types import RunCommandError
-from imbue_core.constants import ExceptionPriority
from imbue_core.simple_git import SyncLocalGitRepo
from imbue_tools.repo_utils.find_relative_to import find_relative_to_commit_hash
from imbue_verify.errors import GitException
@@ -51,9 +50,8 @@ def get_code_to_check(relative_to: str, repo_path: Path) -> tuple[str, str, str]
except RunCommandError as e:
log_exception(
e,
- "Skipping untracked file we couldn't diff.",
+ "Skipping untracked file we couldn't diff: {file_path}",
file_path=file_path,
- priority=ExceptionPriority.LOW_PRIORITY,
)
try:
@@ -62,9 +60,8 @@ def get_code_to_check(relative_to: str, repo_path: Path) -> tuple[str, str, str]
except RunCommandError as e:
log_exception(
e,
- "Skipping untracked file we couldn't diff (no binary).",
+ "Skipping untracked file we couldn't diff (no binary): {file_path}",
file_path=file_path,
- priority=ExceptionPriority.LOW_PRIORITY,
)
# Add untracked files to unstaged changes and the combined diff
diff --git a/pyproject.toml b/pyproject.toml
@@ -18,6 +18,7 @@ dependencies = [
"pytest",
"syrupy",
"together>=1.5.35",
+ "vet_types",
]
requires-python = ">=3.11"
@@ -33,6 +34,7 @@ include = ["imbue_verify*"]
[tool.uv.sources]
imbue_core = { path = "./imbue_core", editable = true }
imbue_tools = { path = "./imbue_tools", editable = true }
+vet_types = { path = "./vet_types", editable = true }
[dependency-groups]
dev = [
diff --git a/uv.lock b/uv.lock
@@ -988,6 +988,7 @@ dependencies = [
{ name = "pytest" },
{ name = "syrupy" },
{ name = "together" },
+ { name = "vet-types" },
]
[package.dev-dependencies]
@@ -1008,6 +1009,7 @@ requires-dist = [
{ name = "pytest" },
{ name = "syrupy" },
{ name = "together", specifier = ">=1.5.35" },
+ { name = "vet-types", editable = "vet_types" },
]
[package.metadata.requires-dev]
@@ -2767,6 +2769,23 @@ wheels = [
]
[[package]]
+name = "vet-types"
+version = "0.1.0"
+source = { editable = "vet_types" }
+dependencies = [
+ { name = "imbue-core" },
+ { name = "pydantic" },
+ { name = "typeid-python" },
+]
+
+[package.metadata]
+requires-dist = [
+ { name = "imbue-core" },
+ { name = "pydantic" },
+ { name = "typeid-python" },
+]
+
+[[package]]
name = "websockets"
version = "15.0.1"
source = { registry = "https://pypi.org/simple" }
diff --git a/vet_types/pyproject.toml b/vet_types/pyproject.toml
@@ -0,0 +1,20 @@
+[build-system]
+requires = ["setuptools", "wheel"]
+build-backend = "setuptools.build_meta"
+
+[project]
+name = "vet_types"
+version = "0.1.0"
+description = "Type definitions for VET (imbue-verify) without telemetry dependencies"
+dependencies = [
+ "pydantic",
+ "imbue_core",
+ "typeid-python",
+]
+requires-python = ">=3.11"
+
+[tool.setuptools]
+package-data.vet_types = ["py.typed"]
+
+[tool.setuptools.packages.find]
+include = ["vet_types*"]
diff --git a/vet_types/vet_types/__init__.py b/vet_types/vet_types/__init__.py
@@ -0,0 +1,33 @@
+"""Shared type definitions for imbue_verify."""
+
+from vet_types.chat_state import ContentBlock
+from vet_types.chat_state import ContentBlockTypes
+from vet_types.chat_state import TextBlock
+from vet_types.chat_state import ToolResultBlock
+from vet_types.chat_state import ToolUseBlock
+from vet_types.ids import AgentMessageID
+from vet_types.ids import AssistantMessageID
+from vet_types.ids import TaskID
+from vet_types.ids import ToolUseID
+from vet_types.messages import AgentMessageSource
+from vet_types.messages import ChatInputUserMessage
+from vet_types.messages import ConversationMessageUnion
+from vet_types.messages import LLMModel
+from vet_types.messages import ResponseBlockAgentMessage
+
+__all__ = [
+ "AgentMessageID",
+ "AgentMessageSource",
+ "AssistantMessageID",
+ "ChatInputUserMessage",
+ "ContentBlock",
+ "ContentBlockTypes",
+ "ConversationMessageUnion",
+ "LLMModel",
+ "ResponseBlockAgentMessage",
+ "TaskID",
+ "TextBlock",
+ "ToolResultBlock",
+ "ToolUseBlock",
+ "ToolUseID",
+]
diff --git a/vet_types/vet_types/chat_state.py b/vet_types/vet_types/chat_state.py
@@ -0,0 +1,153 @@
+"""Chat state types for imbue_verify."""
+
+from typing import Annotated
+from typing import Any
+from typing import Literal
+
+from pydantic import Field
+from pydantic import Tag
+
+from imbue_core.pydantic_serialization import SerializableModel
+from imbue_core.pydantic_serialization import build_discriminator
+from vet_types.ids import TaskID
+from vet_types.ids import ToolUseID
+
+
+# ========================
+# Chat Type Definitions
+# ========================
+
+
+class ContentBlock(SerializableModel):
+ object_type: str = Field(..., description="Type discriminator for content blocks")
+ type: str = Field(..., description="Type discriminator for content blocks")
+
+
+class TextBlock(ContentBlock):
+ object_type: str = "TextBlock"
+ type: Literal["text"] = "text"
+ text: str
+
+
+class ContextSummaryBlock(ContentBlock):
+ object_type: str = "ContextSummaryBlock"
+ type: Literal["context_summary"] = "context_summary"
+ text: str
+
+
+class ResumeResponseBlock(ContentBlock):
+ object_type: str = "ResumeResponseBlock"
+ type: Literal["resume_response"] = "resume_response"
+
+
+class ForkedToBlock(ContentBlock):
+ object_type: str = "ForkedToBlock"
+ type: Literal["forked_to"] = "forked_to"
+ forked_to_task_id: TaskID
+
+
+class ForkedFromBlock(ContentBlock):
+ object_type: str = "ForkedFromBlock"
+ type: Literal["forked_from"] = "forked_from"
+ forked_from_task_id: TaskID
+
+
+class CommandBlock(ContentBlock):
+ object_type: str = "CommandBlock"
+ type: Literal["command"] = "command"
+ command: str
+ is_automated: bool = Field(default=False, description="Whether the command is automated")
+
+
+ToolInput = dict[str, Any]
+
+
+class ToolUseBlock(ContentBlock):
+ object_type: str = "ToolUseBlock"
+ type: Literal["tool_use"] = "tool_use"
+ id: ToolUseID = Field(..., description="Unique identifier for this tool use")
+ name: str = Field(..., description="Name of the tool being used")
+ input: ToolInput = Field(default_factory=ToolInput, description="Input parameters for the tool")
+
+
+class ToolResultContent(SerializableModel):
+ """Base class for tool result content with type discriminator"""
+
+ content_type: str = Field(..., description="Type discriminator for tool result content")
+
+
+class SimpleToolContent(ToolResultContent):
+ """Generic tool content, or information to reconstruct diff tool content"""
+
+ content_type: Literal["simple"] = "simple"
+ text: str = Field(..., description="The tool output as text")
+ tool_input: ToolInput
+ tool_content: Any
+
+
+class GenericToolContent(ToolResultContent):
+ """Generic content for most tools - just a string"""
+
+ content_type: Literal["generic"] = "generic"
+ text: str = Field(..., description="The tool output as text")
+
+
+class DiffToolContent(ToolResultContent):
+ """Content for diff-producing tools (Write, Edit, MultiEdit)"""
+
+ content_type: Literal["diff"] = "diff"
+ diff: str = Field(..., description="The git diff string")
+ file_path: str = Field(..., description="The file that was modified")
+
+
+ToolResultContentType = GenericToolContent | DiffToolContent
+
+
+class ToolResultBlock(ContentBlock):
+ object_type: str = "ToolResultBlock"
+ type: Literal["tool_result"] = "tool_result"
+ tool_use_id: ToolUseID = Field(..., description="ID of the corresponding tool use")
+ tool_name: str = Field(..., description="Name of the tool that was used")
+ invocation_string: str = Field(..., description="String representation of how the tool was invoked")
+ content: ToolResultContentType = Field(..., description="Result content from the tool execution")
+ is_error: bool = Field(default=False, description="Whether the tool execution resulted in an error")
+
+
+class WarningBlock(ContentBlock):
+ object_type: str = "WarningBlock"
+ type: Literal["warning"] = "warning"
+ message: str = Field(..., description="Warning message")
+ traceback: str | None = Field(..., description="Warning traceback")
+ warning_type: str | None = Field(..., description="Type of warning, i.e. name of the exception that was raised")
+
+
+class ErrorBlock(ContentBlock):
+ object_type: str = "ErrorBlock"
+ type: Literal["error"] = "error"
+ message: str = Field(..., description="Error message")
+ traceback: str = Field(..., description="Error traceback")
+ error_type: str = Field(..., description="Type of error, i.e. name of the exception that was raised")
+
+
+class FileBlock(ContentBlock):
+ object_type: str = "FileBlock"
+ type: Literal["file"] = "file"
+ source: str = Field(..., description="A file path on the users local machine.")
+
+
+ContentBlockTypes = Annotated[
+ (
+ Annotated[TextBlock, Tag("TextBlock")]
+ | Annotated[CommandBlock, Tag("CommandBlock")]
+ | Annotated[ToolUseBlock, Tag("ToolUseBlock")]
+ | Annotated[ToolResultBlock, Tag("ToolResultBlock")]
+ | Annotated[ErrorBlock, Tag("ErrorBlock")]
+ | Annotated[WarningBlock, Tag("WarningBlock")]
+ | Annotated[ContextSummaryBlock, Tag("ContextSummaryBlock")]
+ | Annotated[ResumeResponseBlock, Tag("ResumeResponseBlock")]
+ | Annotated[FileBlock, Tag("FileBlock")]
+ | Annotated[ForkedToBlock, Tag("ForkedToBlock")]
+ | Annotated[ForkedFromBlock, Tag("ForkedFromBlock")]
+ ),
+ build_discriminator(),
+]
diff --git a/vet_types/vet_types/ids.py b/vet_types/vet_types/ids.py
@@ -0,0 +1,101 @@
+"""ID types for imbue_verify."""
+
+from abc import ABC
+from typing import Any
+from typing import Self
+
+from pydantic import GetCoreSchemaHandler
+from pydantic_core import core_schema
+from typeid import TypeID
+from typeid import get_prefix_and_suffix
+
+
+class TypeIDPrefixMismatchError(Exception):
+ pass
+
+
+class ObjectID(TypeID, ABC):
+ """
+ A convenience class for string-based object IDs.
+
+ Use in place of strings for IDs. (We don't use raw UUIDs because they are not supported by SQLite.)
+
+ Use `tag` to prefix the ID with the ID type. (We don't use `prefix` because it's already taken by the ancestor class.)
+ """
+
+ # Override this in subclasses to specify the ID type.
+ tag: str = "oid"
+
+ def __init__(self, value: str | None = None) -> None:
+ if value is not None:
+ prefix, suffix = get_prefix_and_suffix(value)
+ # For convenience, don't require the caller to strip the prefix from existing IDs.
+ if prefix is not None:
+ if prefix != self.tag:
+ raise TypeIDPrefixMismatchError(f"Expected prefix '{self.tag}', got '{prefix}'")
+ value = suffix
+ super().__init__(self.tag, value)
+
+ @classmethod
+ def __get_pydantic_core_schema__(cls, source_type: type, handler: GetCoreSchemaHandler) -> core_schema.CoreSchema:
+ """
+ Support transparently deserializing strings into ObjectID instances and vice versa.
+ """
+ return core_schema.no_info_before_validator_function(
+ lambda raw_value: (cls(raw_value) if isinstance(raw_value, str) else raw_value),
+ core_schema.union_schema(
+ [
+ core_schema.is_instance_schema(cls),
+ core_schema.str_schema(),
+ ]
+ ),
+ serialization=core_schema.plain_serializer_function_ser_schema(
+ lambda instance: str(instance), return_schema=core_schema.str_schema()
+ ),
+ )
+
+
+class TaskID(ObjectID):
+ tag: str = "tsk"
+
+
+class AgentMessageID(ObjectID):
+ tag: str = "agm"
+
+
+class NonEmptyStr(str):
+ def __new__(cls: type[Self], *args: Any, **kwargs: Any) -> Self:
+ value = str.__new__(cls, *args, **kwargs)
+ if len(value) == 0:
+ raise ValueError("NonEmptyStr cannot be empty")
+ return value
+
+ @classmethod
+ def __get_pydantic_core_schema__(cls, source_type: type, handler: GetCoreSchemaHandler) -> core_schema.CoreSchema:
+ """
+ Support transparently deserializing strings into NonEmptyStr instances and vice versa.
+ """
+ return core_schema.no_info_before_validator_function(
+ lambda raw_value: (cls(raw_value) if isinstance(raw_value, str) else raw_value),
+ core_schema.union_schema(
+ [
+ core_schema.is_instance_schema(cls),
+ core_schema.str_schema(),
+ ]
+ ),
+ serialization=core_schema.plain_serializer_function_ser_schema(
+ lambda instance: str(instance), return_schema=core_schema.str_schema()
+ ),
+ )
+
+
+class ExternalID(NonEmptyStr):
+ pass
+
+
+class AssistantMessageID(ExternalID):
+ pass
+
+
+class ToolUseID(ExternalID):
+ pass
diff --git a/vet_types/vet_types/messages.py b/vet_types/vet_types/messages.py
@@ -0,0 +1,132 @@
+"""Message types for imbue_verify conversation history.
+
+These are simplified versions that avoid dependencies on external telemetry libraries.
+"""
+
+import datetime
+from enum import StrEnum
+from typing import Annotated
+from typing import Literal
+
+from pydantic import Field
+from pydantic import Tag
+
+from imbue_core.pydantic_serialization import SerializableModel
+from imbue_core.pydantic_serialization import build_discriminator
+from imbue_core.time_utils import get_current_time
+from vet_types.chat_state import ContentBlockTypes
+from vet_types.ids import AgentMessageID
+from vet_types.ids import AssistantMessageID
+
+
+class LLMModel(StrEnum):
+ CLAUDE_4_OPUS = "CLAUDE-4-OPUS"
+ CLAUDE_4_SONNET = "CLAUDE-4-SONNET"
+ CLAUDE_4_HAIKU = "CLAUDE-4-HAIKU"
+ GPT_5_1_CODEX = "GPT-5.1-CODEX"
+ GPT_5_1_CODEX_MINI = "GPT-5.1-CODEX-MINI"
+ GPT_5_1 = "GPT-5.1"
+ GPT_5_2 = "GPT-5.2"
+
+
+# ==================================
+# Backend Message Type Definitions
+# ==================================
+
+
+class AgentMessageSource(StrEnum):
+ """
+ Messages can come from the AGENT (in-container LLM), USER (chat messages & direct interactions),
+ SCULPTOR_SYSTEM (multifaceted sculptor app and service code), and RUNNER (the process
+ controlling a task on the server).
+ """
+
+ # Messages coming directly from the agent from inside the environment.
+ AGENT = "AGENT"
+
+ # Messages coming directly from a user interacting with the interface, i.e. chat
+ USER = "USER"
+
+ # Messages coming from sculptor-mediated actions and automations, like local sync updates
+ # or manual sync operations.
+ SCULPTOR_SYSTEM = "SCULPTOR_SYSTEM"
+
+ # Messages coming from the task runner wrapper, such as environment shutdown.
+ RUNNER = "RUNNER"
+
+
+class Message(SerializableModel):
+ """Base class for all messages sent to or from the agent and user."""
+
+ # used to dispatch and discover the type of message
+ object_type: str
+ # the unique ID of the message, used to track it across the system and prevent duplicates.
+ message_id: AgentMessageID = Field(default_factory=AgentMessageID)
+ # the source of the message, which can be either the agent, user, or runner.
+ source: AgentMessageSource
+ # roughly when the message was created, in UTC.
+ approximate_creation_time: datetime.datetime = Field(default_factory=get_current_time)
+
+ @property
+ def is_ephemeral(self) -> bool:
+ raise NotImplementedError("All messages must be subclassed off of PersistentMessage or EphemeralMessage")
+
+
+class PersistentMessage(Message):
+ @property
+ def is_ephemeral(self) -> bool:
+ return False
+
+
+class PersistentUserMessage(PersistentMessage):
+ """
+ One of two base classes for messages sent from the user.
+ Persistent user messages are saved to the database.
+ """
+
+ object_type: str = Field(
+ default="PersistentUserMessage",
+ description="Type discriminator for user messages",
+ )
+ message_id: AgentMessageID = Field(
+ default_factory=AgentMessageID,
+ description="Unique identifier for the user message",
+ )
+ source: AgentMessageSource = Field(default=AgentMessageSource.USER)
+ approximate_creation_time: datetime.datetime = Field(
+ default_factory=get_current_time,
+ description="Approximate UTC timestamp when user message was created",
+ )
+
+
+class ChatInputUserMessage(PersistentUserMessage):
+ object_type: str = Field(default="ChatInputUserMessage")
+ text: str = Field(..., description="User input text content")
+ model_name: LLMModel | None = Field(
+ default=None,
+ description="Selected LLM model for the chat request",
+ )
+ files: list[str] = Field(
+ default_factory=list,
+ description="List of file paths attached to this message",
+ )
+
+
+class PersistentAgentMessage(PersistentMessage):
+ """Base class for messages sent from the agent."""
+
+ source: AgentMessageSource = AgentMessageSource.AGENT
+
+
+class ResponseBlockAgentMessage(PersistentAgentMessage):
+ object_type: str = "ResponseBlockAgentMessage"
+ role: Literal["user", "assistant", "system"]
+ assistant_message_id: AssistantMessageID
+ content: tuple[ContentBlockTypes, ...]
+
+
+ConversationMessageUnion = Annotated[
+ Annotated[ResponseBlockAgentMessage, Tag("ResponseBlockAgentMessage")]
+ | Annotated[ChatInputUserMessage, Tag("ChatInputUserMessage")],
+ build_discriminator(),
+]