
app.agents.agent_system

Agent system utilities for orchestrating multi-agent workflows.

This module provides functions and helpers to create, configure, and run agent systems using Pydantic AI. It supports delegation of tasks to research, analysis, and synthesis agents, and manages agent configuration, environment setup, and execution.

Args:

provider (str): The name of the provider.
provider_config (ProviderConfig): Configuration settings for the provider.
api_key (str): API key for authentication with the provider.
prompts (dict[str, str]): Configuration for prompts.
include_researcher (bool): Flag to include the researcher agent.
include_analyst (bool): Flag to include the analyst agent.
include_synthesiser (bool): Flag to include the synthesiser agent.
query (str | list[dict[str, str]]): The query or messages for the agent.
chat_config (ChatConfig): The configuration object for agents and providers.
usage_limits (UsageLimits): Usage limits for agent execution.
pydantic_ai_stream (bool): Whether to use Pydantic AI streaming.

Functions:

get_manager: Initializes and returns a manager agent with the specified configuration.
run_manager: Asynchronously runs the manager agent with the given query and provider.
setup_agent_env: Sets up the environment for an agent by configuring provider settings, prompts, API key, and usage limits.

Classes

Functions

get_manager(provider, provider_config, api_key, prompts, include_researcher=False, include_analyst=False, include_synthesiser=False, enable_review_tools=False)

Initializes and returns an Agent manager with the specified configuration.

Args:

provider (str): The name of the provider.
provider_config (ProviderConfig): Configuration settings for the provider.
api_key (str | None): API key for authentication with the provider.
prompts (dict[str, str]): Configuration for prompts.
include_researcher (bool, optional): Flag to include the researcher model. Defaults to False.
include_analyst (bool, optional): Flag to include the analyst model. Defaults to False.
include_synthesiser (bool, optional): Flag to include the synthesiser model. Defaults to False.
enable_review_tools (bool, optional): Flag to add PeerRead review tools to the manager. Defaults to False.

Returns:

Agent: The initialized Agent manager.

Source code in src/app/agents/agent_system.py
def get_manager(
    provider: str,
    provider_config: ProviderConfig,
    api_key: str | None,
    prompts: dict[str, str],
    include_researcher: bool = False,
    include_analyst: bool = False,
    include_synthesiser: bool = False,
    enable_review_tools: bool = False,
) -> Agent[None, BaseModel]:
    """
    Initializes and returns a Agent manager with the specified configuration.
    Args:
        provider (str): The name of the provider.
        provider_config (ProviderConfig): Configuration settings for the provider.
        api_key (str): API key for authentication with the provider.
        prompts (PromptsConfig): Configuration for prompts.
        include_researcher (bool, optional): Flag to include analyst model.
            Defaults to False.
        include_analyst (bool, optional): Flag to include analyst model.
            Defaults to False.
        include_synthesiser (bool, optional): Flag to include synthesiser model.
            Defaults to False.
    Returns:
        Agent: The initialized Agent manager.
    """

    # FIXME context manager try-catch
    # with error_handling_context("get_manager()"):
    model_config = EndpointConfig.model_validate(
        {
            "provider": provider,
            "prompts": prompts,
            "api_key": api_key,
            "provider_config": provider_config,
        }
    )
    models = get_models(
        model_config, include_researcher, include_analyst, include_synthesiser
    )
    manager = _create_manager(prompts, models, provider, enable_review_tools)

    # Conditionally add review tools based on flag
    def conditionally_add_review_tools(
        manager: Agent[None, BaseModel],
        enable: bool = False,
        max_content_length: int = 15000,
    ):
        """Conditionally add review persistence tools to the manager.

        Args:
            manager: The manager agent to potentially add tools to.
            enable: Flag to determine whether to add review tools.
            max_content_length: The maximum number of characters to include in the
                prompt.
        """
        if enable:
            add_peerread_review_tools_to_manager(
                manager, max_content_length=max_content_length
            )
        return manager

    max_content_length = provider_config.max_content_length or 15000

    return conditionally_add_review_tools(
        manager,
        enable=enable_review_tools,
        max_content_length=max_content_length,
    )
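
A minimal usage sketch, assuming the package is importable as app.agents.agent_system and app.data_models.app_models; the provider name, model name, endpoint, API key, and prompt key below are placeholders, not values defined by this module:

from app.agents.agent_system import get_manager
from app.data_models.app_models import ProviderConfig

# Placeholder provider configuration; real values come from the chat config file.
provider_config = ProviderConfig(
    model_name="gpt-4o-mini",               # placeholder model name
    base_url="https://api.openai.com/v1",   # placeholder endpoint
)
prompts = {"system_prompt_manager": "You are the manager agent."}  # placeholder prompt key

manager = get_manager(
    provider="openai",
    provider_config=provider_config,
    api_key="sk-your-key",                  # normally supplied via setup_agent_env / AppEnv
    prompts=prompts,
    include_researcher=True,                # also build a researcher agent for delegation
)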

run_manager(manager, query, provider, usage_limits, pydantic_ai_stream=False) async

Asynchronously runs the manager with the given query and provider, handling errors and printing results.

Args:

manager (Agent): The system agent responsible for running the query.
query (UserPromptType): The query to be processed by the manager.
provider (str): The provider to be used for the query.
usage_limits (UsageLimits | None): The usage limits to be applied during the query execution.
pydantic_ai_stream (bool, optional): Flag to enable or disable Pydantic AI streaming. Defaults to False.

Returns:

None

Source code in src/app/agents/agent_system.py
async def run_manager(
    manager: Agent[None, BaseModel],
    query: UserPromptType,
    provider: str,
    usage_limits: UsageLimits | None,
    pydantic_ai_stream: bool = False,
) -> None:
    """
    Asynchronously runs the manager with the given query and provider, handling errors
        and printing results.
    Args:
        manager (Agent): The system agent responsible for running the query.
        query (str): The query to be processed by the manager.
        provider (str): The provider to be used for the query.
        usage_limits (UsageLimits): The usage limits to be applied during the query
            execution.
        pydantic_ai_stream (bool, optional): Flag to enable or disable Pydantic AI
            stream. Defaults to False.
    Returns:
        None
    """

    # FIXME context manager try-catch
    # with out ? error_handling_context("run_manager()"):
    model_name = getattr(manager, "model")._model_name
    mgr_cfg = {"user_prompt": query, "usage_limits": usage_limits}
    logger.info(f"Researching with {provider}({model_name}) and Topic: {query} ...")

    try:
        if pydantic_ai_stream:
            raise NotImplementedError(
                "Streaming currently only possible for Agents with "
                "output_type str not pydantic model"
            )
            # logger.info("Streaming model response ...")
            # result = await manager.run(**mgr_cfg)
            # async for chunk in result.stream_text():  # .run(**mgr_cfg) as result:
            # async with manager.run_stream(user_prompt=query) as stream:
            #    async for chunk in stream.stream_text():
            #        logger.info(str(chunk))
            # result = await stream.get_result()
        else:
            logger.info("Waiting for model response ...")
            # FIXME deprecated warning manager.run(), query unknown type
            # FIXME [call-overload] error: No overload variant of "run" of "Agent"
            # matches argument type "dict[str, list[dict[str, str]] |
            # Sequence[str | ImageUrl | AudioUrl | DocumentUrl | VideoUrl |
            # BinaryContent] | UsageLimits | None]"
            result = await manager.run(**mgr_cfg)  # type: ignore[reportDeprecated,reportUnknownArgumentType,reportCallOverload,call-overload]
        logger.info(f"Result: {result}")
        # FIXME  # type: ignore
        logger.info(f"Usage statistics: {result.usage()}")  # type: ignore
    except Exception as e:
        logger.error(f"Error in run_manager: {e}")
        raise
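
A usage sketch for running the manager asynchronously; UsageLimits is pydantic-ai's usage-limit settings (the import path may differ between pydantic-ai versions), and the query is a placeholder:

import asyncio

from pydantic_ai.usage import UsageLimits

# Cap requests and total tokens for a single run (example values).
limits = UsageLimits(request_limit=10, total_tokens_limit=20000)

asyncio.run(
    run_manager(
        manager,                             # an Agent built by get_manager
        query="Summarize recent work on multi-agent evaluation.",
        provider="openai",
        usage_limits=limits,
        pydantic_ai_stream=False,            # streaming raises NotImplementedError here
    )
)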

setup_agent_env(provider, query, chat_config, chat_env_config)

Sets up the environment for an agent by configuring provider settings, prompts, API key, and usage limits.

Parameters:

provider (str): The name of the provider. Required.
query (UserPromptType): The messages or queries to be sent to the agent. Required.
chat_config (ChatConfig | BaseModel): The configuration object containing provider and prompt settings. Required.
chat_env_config (AppEnv): The application environment configuration containing API keys. Required.

Returns:

EndpointConfig: The configuration object for the agent.

Source code in src/app/agents/agent_system.py
def setup_agent_env(
    provider: str,
    query: UserPromptType,
    chat_config: ChatConfig | BaseModel,
    chat_env_config: AppEnv,
) -> EndpointConfig:
    """
    Sets up the environment for an agent by configuring provider settings, prompts,
    API key, and usage limits.

    Args:
        provider (str): The name of the provider.
        query (UserPromptType): The messages or queries to be sent to the agent.
        chat_config (ChatConfig | BaseModel): The configuration object containing
            provider and prompt settings.
        chat_env_config (AppEnv): The application environment configuration
            containing API keys.

    Returns:
        EndpointConfig: The configuration object for the agent.
    """

    if not isinstance(chat_config, ChatConfig):
        raise TypeError("'chat_config' of invalid type: ChatConfig expected")
    msg: str | None
    # FIXME context manager try-catch
    # with error_handling_context("setup_agent_env()"):
    provider_config = get_provider_config(provider, chat_config.providers)

    prompts = chat_config.prompts
    is_api_key, api_key_msg = get_api_key(provider, chat_env_config)

    # Set up LLM environment with all available API keys
    api_keys = {
        "openai": chat_env_config.OPENAI_API_KEY,
        "anthropic": chat_env_config.ANTHROPIC_API_KEY,
        "gemini": chat_env_config.GEMINI_API_KEY,
        "github": chat_env_config.GITHUB_API_KEY,
        "grok": chat_env_config.GROK_API_KEY,
        "huggingface": chat_env_config.HUGGINGFACE_API_KEY,
        "openrouter": chat_env_config.OPENROUTER_API_KEY,
        "perplexity": chat_env_config.PERPLEXITY_API_KEY,
        "together": chat_env_config.TOGETHER_API_KEY,
    }
    setup_llm_environment(api_keys)

    if provider.lower() != "ollama" and not is_api_key:
        msg = f"API key for provider '{provider}' is not set."
        logger.error(msg)
        raise ValueError(msg)

    # TODO Separate Gemini request into function
    # FIXME GeminiModel not compatible with pydantic-ai OpenAIModel
    # ModelRequest not iterable
    # Input should be 'STOP', 'MAX_TOKENS' or 'SAFETY'
    # [type=literal_error, input_value='MALFORMED_FUNCTION_CALL', input_type=str]
    # For further information visit https://errors.pydantic.dev/2.11/v/literal_error
    # if provider.lower() == "gemini":
    #     if isinstance(query, str):
    #         query = ModelRequest.user_text_prompt(query)
    #     elif isinstance(query, list):  # type: ignore[reportUnnecessaryIsInstance]
    #         # query = [
    #         #    ModelRequest.user_text_prompt(
    #         #        str(msg.get("content", ""))
    #         #    )  # type: ignore[reportUnknownArgumentType]
    #         #    if isinstance(msg, dict)
    #         #    else msg
    #         #    for msg in query
    #         # ]
    #         raise NotImplementedError("Currently conflicting with UserPromptType")
    #     else:
    #         msg = f"Unsupported query type for Gemini: {type(query)}"
    #         logger.error(msg)
    #         raise TypeError(msg)

    # Load usage limits from config instead of hardcoding
    usage_limits = None
    if provider_config.usage_limits is not None:
        usage_limits = UsageLimits(
            request_limit=10, total_tokens_limit=provider_config.usage_limits
        )

    return EndpointConfig.model_validate(
        {
            "provider": provider,
            "query": query,
            "api_key": api_key_msg,
            "prompts": prompts,
            "provider_config": provider_config,
            "usage_limits": usage_limits,
        }
    )
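
The three functions in this module are typically wired together in the same way as main() in app.app; a sketch with an in-memory ChatConfig and placeholder provider, model, and prompt values:

from app.agents.agent_system import get_manager, run_manager, setup_agent_env
from app.data_models.app_models import AppEnv, ChatConfig, ProviderConfig

# Minimal in-memory configuration; real runs load this from the chat config file.
chat_config = ChatConfig(
    providers={
        "openai": ProviderConfig(
            model_name="gpt-4o-mini",               # placeholder model
            base_url="https://api.openai.com/v1",   # placeholder endpoint
        )
    },
    inference={},
    prompts={"system_prompt_manager": "You are the manager agent."},  # placeholder key
)

agent_env = setup_agent_env(
    provider="openai",
    query="What are open problems in agent evaluation?",
    chat_config=chat_config,
    chat_env_config=AppEnv(),                       # API keys from the environment or .env
)
manager = get_manager(
    agent_env.provider,
    agent_env.provider_config,
    agent_env.api_key,
    agent_env.prompts,
)
# Inside an async function (or via asyncio.run):
# await run_manager(manager, agent_env.query, agent_env.provider, agent_env.usage_limits)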

app.agents.llm_model_funs

LLM model functions for integrating with various LLM providers.

This module provides functions to retrieve API keys, provider configurations, and to create model instances for supported LLM providers such as Gemini and OpenAI. It also includes logic for assembling model dictionaries for system agents.

Classes

Functions

get_api_key(provider, chat_env_config)

Retrieve API key from chat env config variable.

Source code in src/app/agents/llm_model_funs.py
def get_api_key(
    provider: str,
    chat_env_config: AppEnv,
) -> tuple[bool, str]:
    """Retrieve API key from chat env config variable."""
    provider = provider.upper()

    # Provider mapping for environment variable keys
    provider_key_mapping = {
        "OPENAI": "OPENAI_API_KEY",
        "ANTHROPIC": "ANTHROPIC_API_KEY",
        "GEMINI": "GEMINI_API_KEY",
        "GITHUB": "GITHUB_API_KEY",
        "GROK": "GROK_API_KEY",
        "HUGGINGFACE": "HUGGINGFACE_API_KEY",
        "OPENROUTER": "OPENROUTER_API_KEY",
        "PERPLEXITY": "PERPLEXITY_API_KEY",
        "TOGETHER": "TOGETHER_API_KEY",
        "OLLAMA": None,  # Ollama doesn't require an API key
    }

    if provider == "OLLAMA":
        return (False, "Ollama does not require an API key.")

    key_name = provider_key_mapping.get(provider)
    if not key_name:
        return (False, f"Provider '{provider}' is not supported.")

    key_content = getattr(chat_env_config, key_name, None)
    if key_content and key_content.strip():
        logger.info(f"Found API key for provider: '{provider}'")
        return (True, key_content)
    else:
        return (
            False,
            f"API key for provider '{provider}' not found in configuration.",
        )
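
For example, checking whether a key is configured before building an agent (a sketch, assuming the import paths app.agents.llm_model_funs and app.data_models.app_models):

from app.agents.llm_model_funs import get_api_key
from app.data_models.app_models import AppEnv

env = AppEnv()  # loads OPENAI_API_KEY etc. from the environment or a .env file
has_key, key_or_msg = get_api_key("openai", env)
if has_key:
    api_key = key_or_msg   # the key itself
else:
    print(key_or_msg)      # human-readable message explaining what is missing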

get_models(endpoint_config, include_researcher=False, include_analyst=False, include_synthesiser=False)

Get the models for the system agents.

Parameters:

endpoint_config (EndpointConfig): Configuration for the model. Required.
include_researcher (bool): Whether to include the researcher model. Defaults to False.
include_analyst (bool): Whether to include the analyst model. Defaults to False.
include_synthesiser (bool): Whether to include the synthesiser model. Defaults to False.

Returns:

ModelDict: A dictionary containing compatible models for the system agents.

Source code in src/app/agents/llm_model_funs.py
def get_models(
    endpoint_config: EndpointConfig,
    include_researcher: bool = False,
    include_analyst: bool = False,
    include_synthesiser: bool = False,
) -> ModelDict:
    """
    Get the models for the system agents.

    Args:
        endpoint_config (EndpointConfig): Configuration for the model.
        include_researcher (bool): Whether to include the researcher model.
        include_analyst (bool): Whether to include the analyst model.
        include_synthesiser (bool): Whether to include the synthesiser model.

    Returns:
        ModelDict: A dictionary containing compatible models for the system
            agents.
    """

    model = _create_llm_model(endpoint_config)
    return ModelDict.model_validate(
        {
            "model_manager": model,
            "model_researcher": model if include_researcher else None,
            "model_analyst": model if include_analyst else None,
            "model_synthesiser": model if include_synthesiser else None,
        }
    )

get_provider_config(provider, providers)

Retrieve configuration settings for the specified provider.

Source code in src/app/agents/llm_model_funs.py
def get_provider_config(
    provider: str, providers: dict[str, ProviderConfig]
) -> ProviderConfig:
    """Retrieve configuration settings for the specified provider."""
    try:
        return providers[provider]
    except KeyError as e:
        msg = get_key_error(str(e))
        logger.error(msg)
        raise KeyError(msg)
    except Exception as e:
        msg = generic_exception(str(e))
        logger.exception(msg)
        raise Exception(msg)

setup_llm_environment(api_keys)

Set up LLM environment variables for API keys.

Parameters:

api_keys (dict[str, str]): Dictionary mapping provider names to API keys. Required.
Source code in src/app/agents/llm_model_funs.py
def setup_llm_environment(api_keys: dict[str, str]) -> None:
    """
    Set up LLM environment variables for API keys.

    Args:
        api_keys: Dictionary mapping provider names to API keys.
    """
    import os

    # Set environment variables for LLM
    for provider, api_key in api_keys.items():
        if api_key and api_key.strip():
            env_var = f"{provider.upper()}_API_KEY"
            os.environ[env_var] = api_key
            logger.info(f"Set environment variable: {env_var}")

app.agents.peerread_tools

PeerRead agent tools for multi-agent system integration.

This module provides agent tools that enable the manager agent to interact with the PeerRead dataset for paper retrieval, querying, and review evaluation.

Classes

Functions

add_peerread_review_tools_to_manager(manager_agent, max_content_length=15000)

Add PeerRead review generation and persistence tools to the manager agent.

Parameters:

manager_agent (Agent[None, BaseModel]): The manager agent to which review tools will be added. Required.
max_content_length (int): The maximum number of characters to include in the prompt. Defaults to 15000.
Source code in src/app/agents/peerread_tools.py
def add_peerread_review_tools_to_manager(
    manager_agent: Agent[None, BaseModel], max_content_length: int = 15000
):
    """Add PeerRead review generation and persistence tools to the manager agent.

    Args:
        manager_agent: The manager agent to which review tools will be added.
        max_content_length: The maximum number of characters to include in the prompt.
    """

    @manager_agent.tool
    async def generate_paper_review_content_from_template(  # type: ignore[reportUnusedFunction]
        ctx: RunContext[None],
        paper_id: str,
        review_focus: str = "comprehensive",
        tone: str = "professional",
    ) -> str:
        """Create a review template for a specific paper.

        WARNING: This function does NOT generate actual reviews. It creates a
        structured template that would need to be filled in manually or by
        another AI system. This is a demonstration/template function only.

        Args:
            paper_id: Unique identifier for the paper being reviewed.
            review_focus: Type of review (comprehensive, technical, high-level).
            tone: Tone of the review (professional, constructive, critical).

        Returns:
            str: Review template with paper information and placeholder sections
                 that need to be manually completed.
        """
        try:
            config = load_peerread_config()
            loader = PeerReadLoader(config)
            paper = loader.get_paper_by_id(paper_id)

            if not paper:
                raise ValueError(f"Paper {paper_id} not found in PeerRead dataset")

            # Load paper content for the template
            paper_content_for_template = loader.load_parsed_pdf_content(paper_id)

            if not paper_content_for_template:
                logger.warning(
                    f"No parsed PDF content found for paper {paper_id}. "
                    "Attempting to read raw PDF."
                )
                raw_pdf_path = loader.get_raw_pdf_path(paper_id)
                if raw_pdf_path:
                    try:
                        paper_content_for_template = read_paper_pdf(ctx, raw_pdf_path)
                        logger.info(f"Successfully read raw PDF for paper {paper_id}.")
                    except Exception as e:
                        logger.warning(
                            f"Failed to read raw PDF for paper {paper_id}: {e}. "
                            "Using abstract as fallback."
                        )
                        paper_content_for_template = paper.abstract
                else:
                    logger.warning(
                        f"No raw PDF found for paper {paper_id}. "
                        "Using abstract as fallback."
                    )
                    paper_content_for_template = paper.abstract

            # Use centralized path resolution for template
            template_path = get_review_template_path()

            try:
                with open(template_path, encoding="utf-8") as f:
                    template_content = f.read()
                # TODO max content length handling for models
                # full_input_context_len > max_content_length

                # Format the template with paper information including full content
                review_template = template_content.format(
                    paper_title=paper.title,
                    paper_abstract=paper.abstract,
                    paper_full_content=paper_content_for_template,
                    tone=tone,
                    review_focus=review_focus,
                )

            except FileNotFoundError:
                logger.error(f"Review template file not found at {template_path}")
                raise ValueError(
                    f"Review template configuration file missing: {template_path}"
                )
            except Exception as e:
                logger.error(f"Error loading review template: {e}")
                raise ValueError(f"Failed to load review template: {str(e)}")

            logger.info(
                f"Created review template for paper {paper_id} (NOT a real review)"
            )
            return review_template

        except Exception as e:
            logger.error(f"Error creating review template: {e}")
            raise ValueError(f"Failed to create review template: {str(e)}")

    @manager_agent.tool
    async def save_paper_review(  # type: ignore[reportUnusedFunction]
        ctx: RunContext[None],
        paper_id: str,
        review_text: str,
        recommendation: str = "",
        confidence: float = 0.0,
    ) -> str:
        """Save agent-generated review to persistent storage.

        Args:
            paper_id: Unique identifier for the paper being reviewed.
            review_text: Review text generated by the agent.
            recommendation: Review recommendation (accept/reject/etc).
            confidence: Confidence score for the review (0.0-1.0).

        Returns:
            str: Path to the saved review file.
        """
        try:
            # Create PeerReadReview object
            review = PeerReadReview(
                impact="N/A",
                substance="N/A",
                appropriateness="N/A",
                meaningful_comparison="N/A",
                presentation_format="N/A",
                comments=review_text,
                soundness_correctness="N/A",
                originality="N/A",
                recommendation=recommendation or "N/A",
                clarity="N/A",
                reviewer_confidence=str(confidence) if confidence > 0 else "N/A",
            )

            # Save to persistent storage
            persistence = ReviewPersistence()
            filepath = persistence.save_review(paper_id, review)

            logger.info(f"Saved review for paper {paper_id} to {filepath}")
            return filepath

        except Exception as e:
            logger.error(f"Error saving paper review: {e}")
            raise ValueError(f"Failed to save review: {str(e)}")

    @manager_agent.tool
    async def save_structured_review(  # type: ignore[reportUnusedFunction]
        ctx: RunContext[None],
        paper_id: str,
        structured_review: GeneratedReview,
    ) -> str:
        """Save a structured review object to persistent storage.

        Args:
            paper_id: Unique identifier for the paper being reviewed.
            structured_review: GeneratedReview object with validated fields.

        Returns:
            str: Path to the saved review file.
        """
        try:
            from datetime import UTC, datetime

            # Convert structured review to PeerReadReview format for persistence
            peerread_format = structured_review.to_peerread_format()
            # Create PeerReadReview with proper type conversion
            review = PeerReadReview(
                impact=peerread_format["IMPACT"] or "N/A",
                substance=peerread_format["SUBSTANCE"] or "N/A",
                appropriateness=peerread_format["APPROPRIATENESS"] or "N/A",
                meaningful_comparison=peerread_format["MEANINGFUL_COMPARISON"] or "N/A",
                presentation_format=peerread_format["PRESENTATION_FORMAT"] or "Poster",
                comments=peerread_format["comments"] or "No comments provided",
                soundness_correctness=peerread_format["SOUNDNESS_CORRECTNESS"] or "N/A",
                originality=peerread_format["ORIGINALITY"] or "N/A",
                recommendation=peerread_format["RECOMMENDATION"] or "N/A",
                clarity="N/A",
                reviewer_confidence=peerread_format["REVIEWER_CONFIDENCE"] or "N/A",
                is_meta_review=None,
            )

            # Save to persistent storage
            persistence = ReviewPersistence()
            filepath = persistence.save_review(paper_id, review)

            # Also save the original structured format for validation
            timestamp = datetime.now(UTC).strftime("%Y-%m-%dT%H-%M-%SZ")
            result = ReviewGenerationResult(
                paper_id=paper_id,
                review=structured_review,
                timestamp=timestamp,
                model_info="GPT-4o via PydanticAI",
            )

            # Save structured version alongside
            structured_path = filepath.replace(".json", "_structured.json")
            with open(structured_path, "w", encoding="utf-8") as f:
                dump(result.model_dump(), f, indent=2, ensure_ascii=False)

            logger.info(f"Saved structured review for paper {paper_id} to {filepath}")
            return filepath

        except Exception as e:
            logger.error(f"Error saving structured review: {e}")
            raise ValueError(f"Failed to save structured review: {str(e)}")

add_peerread_tools_to_manager(manager_agent)

Add PeerRead dataset tools to the manager agent.

Parameters:

manager_agent (Agent[None, BaseModel]): The manager agent to which PeerRead tools will be added. Required.
Source code in src/app/agents/peerread_tools.py
def add_peerread_tools_to_manager(manager_agent: Agent[None, BaseModel]):
    """Add PeerRead dataset tools to the manager agent.

    Args:
        manager_agent: The manager agent to which PeerRead tools will be added.
    """

    @manager_agent.tool
    async def get_peerread_paper(ctx: RunContext[None], paper_id: str) -> PeerReadPaper:  # type: ignore[reportUnusedFunction]
        """Get a specific paper from the PeerRead dataset.

        Args:
            paper_id: Unique identifier for the paper.

        Returns:
            PeerReadPaper with title, abstract, and reviews.
        """
        try:
            config = load_peerread_config()
            loader = PeerReadLoader(config)

            paper = loader.get_paper_by_id(paper_id)
            if not paper:
                raise ValueError(f"Paper {paper_id} not found in PeerRead dataset")

            logger.info(f"Retrieved paper {paper_id}: {paper.title[:50]}...")
            return paper

        except Exception as e:
            logger.error(f"Error retrieving paper: {e}")
            raise ValueError(f"Failed to retrieve paper: {str(e)}")

    @manager_agent.tool
    async def query_peerread_papers(  # type: ignore[reportUnusedFunction]
        ctx: RunContext[None], venue: str = "", min_reviews: int = 1
    ) -> list[PeerReadPaper]:
        """Query papers from PeerRead dataset with filters.

        Args:
            venue: Filter by conference venue (empty for all venues).
            min_reviews: Minimum number of reviews required per paper.

        Returns:
            List of PeerReadPaper objects matching the criteria.
        """
        try:
            config = load_peerread_config()
            loader = PeerReadLoader(config)

            # Query papers with filters
            papers = loader.query_papers(
                venue=venue if venue else None,
                min_reviews=min_reviews,
                limit=config.max_papers_per_query,
            )

            logger.info(f"Found {len(papers)} papers matching criteria")
            return papers

        except Exception as e:
            logger.error(f"Error querying papers: {e}")
            raise ValueError(f"Failed to query papers: {str(e)}")

    @manager_agent.tool
    async def read_paper_pdf_tool(  # type: ignore[reportUnusedFunction]
        ctx: RunContext[None],
        pdf_path: str,
    ) -> str:
        """Read text content from a PDF file using MarkItDown.

        Note: MarkItDown extracts the entire PDF content as a single text block.
        Page-level extraction is not supported by the underlying library.

        Args:
            pdf_path: Path to the PDF file.

        Returns:
            str: Extracted text content from the entire PDF in Markdown format.
        """
        return read_paper_pdf(ctx, pdf_path)
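
The dataset tools are registered the same way; a sketch:

from app.agents.peerread_tools import add_peerread_tools_to_manager

# manager: an Agent[None, BaseModel] built by get_manager.
# Gives the manager access to get_peerread_paper, query_peerread_papers,
# and read_paper_pdf_tool during agent runs.
add_peerread_tools_to_manager(manager)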

read_paper_pdf(ctx, pdf_path)

Read text content from a PDF file using MarkItDown.

Note: MarkItDown extracts the entire PDF content as a single text block. Page-level extraction is not supported by the underlying library.

Parameters:

ctx (RunContext[None] | None): RunContext (unused but required for tool compatibility). Required.
pdf_path (str | Path): Path to the PDF file. Required.

Returns:

str: Extracted text content from the entire PDF in Markdown format.

Raises:

FileNotFoundError: If the PDF file doesn't exist.
ValueError: If the file is not a PDF or conversion fails.

Source code in src/app/agents/peerread_tools.py
def read_paper_pdf(
    ctx: RunContext[None] | None,
    pdf_path: str | Path,
) -> str:
    """Read text content from a PDF file using MarkItDown.

    Note: MarkItDown extracts the entire PDF content as a single text block.
    Page-level extraction is not supported by the underlying library.

    Args:
        ctx: RunContext (unused but required for tool compatibility).
        pdf_path: Path to the PDF file.

    Returns:
        str: Extracted text content from the entire PDF in Markdown format.

    Raises:
        FileNotFoundError: If the PDF file doesn't exist.
        ValueError: If the file is not a PDF or conversion fails.
    """
    if isinstance(pdf_path, str):
        pdf_file = Path(pdf_path)
    else:
        pdf_file = pdf_path
    if not pdf_file.exists():
        raise FileNotFoundError(f"PDF file not found: {pdf_file}")
    if pdf_file.suffix.lower() != ".pdf":
        raise ValueError(f"Not a PDF file: {pdf_file}")

    try:
        md_converter = MarkItDown()
        result = md_converter.convert(pdf_file)
        logger.info(f"Extracted text from {pdf_file}")
        return result.text_content.strip()

    except Exception as e:
        logger.error(f"Error reading PDF with MarkItDown: {e}")
        raise ValueError(f"Failed to read PDF: {str(e)}")

app.app

Main entry point for the Agents-eval application.

This module initializes the agentic system, loads configuration files, handles user input, and orchestrates the multi-agent workflow using asynchronous execution. It integrates logging, tracing, and authentication, and supports both CLI and programmatic execution.

Classes

Functions

main(chat_provider=CHAT_DEFAULT_PROVIDER, query='', include_researcher=False, include_analyst=False, include_synthesiser=False, pydantic_ai_stream=False, chat_config_file=None, enable_review_tools=False, paper_number=None, download_peerread_full_only=False, download_peerread_samples_only=False, peerread_max_papers_per_sample_download=5) async

Main entry point for the application.

Returns:

None

Source code in src/app/app.py
@op()
async def main(
    chat_provider: str = CHAT_DEFAULT_PROVIDER,
    query: str = "",
    include_researcher: bool = False,
    include_analyst: bool = False,
    include_synthesiser: bool = False,
    pydantic_ai_stream: bool = False,
    chat_config_file: str | Path | None = None,
    enable_review_tools: bool = False,
    paper_number: str | None = None,
    download_peerread_full_only: bool = False,
    download_peerread_samples_only: bool = False,
    peerread_max_papers_per_sample_download: int | None = 5,
    # chat_config_path: str | Path,
) -> None:
    """
    Main entry point for the application.

    Args:
        See `--help`.

    Returns:
        None
    """

    logger.info(f"Starting app '{PROJECT_NAME}' v{__version__}")

    # Handle download-only mode (setup phase)
    if download_peerread_full_only:
        logger.info("Full download-only mode activated")
        try:
            download_peerread_dataset(peerread_max_papers_per_sample_download=None)
            logger.info("Setup completed successfully. Exiting.")
            return
        except Exception as e:
            logger.error(f"Setup failed: {e}")
            raise

    if download_peerread_samples_only:
        logger.info(
            f"Downloading only {peerread_max_papers_per_sample_download} samples"
        )
        try:
            download_peerread_dataset(peerread_max_papers_per_sample_download)
            logger.info("Setup completed successfully. Exiting.")
            return
        except Exception as e:
            logger.error(f"Setup failed: {e}")
            raise

    try:
        if chat_config_file is None:
            chat_config_file = resolve_config_path(CHAT_CONFIG_FILE)
        logger.info(f"Chat config file: {chat_config_file}")
        with span("main()"):
            if not chat_provider:
                chat_provider = input("Which inference chat_provider to use? ")

            chat_config = load_config(chat_config_file, ChatConfig)
            # FIXME remove type ignore and cast and properly type
            prompts: dict[str, str] = cast(dict[str, str], chat_config.prompts)  # type: ignore[reportUnknownMemberType,reportAttributeAccessIssue]

            # Handle paper review workflow
            if paper_number:
                enable_review_tools = True
                if not query:
                    paper_review_template = prompts.get(
                        "paper_review_query",
                        "Generate a structured peer review for paper '{paper_number}' "
                        "from PeerRead dataset.",
                    )
                    query = paper_review_template.format(paper_number=paper_number)
                logger.info(f"Paper review mode enabled for paper {paper_number}")
            elif not query:
                # Prompt user for input when no query is provided
                default_prompt = prompts.get(
                    "default_query", "What would you like to research? "
                )
                query = input(f"{default_prompt} ")
            chat_env_config = AppEnv()
            agent_env = setup_agent_env(
                chat_provider, query, chat_config, chat_env_config
            )

            # FIXME enhance login, not every run?
            login(PROJECT_NAME, chat_env_config)

            manager = get_manager(
                agent_env.provider,
                agent_env.provider_config,
                agent_env.api_key,
                agent_env.prompts,
                include_researcher,
                include_analyst,
                include_synthesiser,
                enable_review_tools,
            )
            await run_manager(
                manager,
                agent_env.query,
                agent_env.provider,
                agent_env.usage_limits,
                pydantic_ai_stream,
            )
            logger.info(f"Exiting app '{PROJECT_NAME}'")

    except Exception as e:
        msg = generic_exception(f"Aborting app '{PROJECT_NAME}' with: {e}")
        logger.exception(msg)
        raise Exception(msg) from e
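
The entry point can also be invoked programmatically; a sketch, assuming the module is importable as app.app and using placeholder provider and paper values:

import asyncio

from app.app import main

# Runs a single paper-review workflow; passing paper_number enables the review tools.
asyncio.run(
    main(
        chat_provider="openai",   # placeholder provider
        paper_number="104",       # placeholder PeerRead paper id
        include_researcher=True,
    )
)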

app.config.config_app

Configuration constants for the application.

app.data_models.app_models

Data models for agent system configuration and results.

This module defines Pydantic models for representing research and analysis results, summaries, provider and agent configurations, and model dictionaries used throughout the application. These models ensure type safety and validation for data exchanged between agents and system components.

Classes

AgentConfig

Bases: BaseModel

Configuration for an agent

Source code in src/app/data_models/app_models.py
class AgentConfig(BaseModel):
    """Configuration for an agent"""

    model: Model  # (1) Instance expected
    output_type: type[BaseModel]  # (2) Class expected
    system_prompt: str
    # FIXME tools: list[Callable[..., Awaitable[Any]]]
    tools: list[Any] = []  # (3) List of tools will be validated at creation
    retries: int = 3

    # Avoid pydantic.errors.PydanticSchemaGenerationError:
    # Unable to generate pydantic-core schema for <class 'openai.AsyncOpenAI'>.
    # Avoid Pydantic errors related to non-Pydantic types
    model_config = ConfigDict(
        arbitrary_types_allowed=True
    )  # (4) Suppress Error non-Pydantic types caused by <class 'openai.AsyncOpenAI'>

    @field_validator("tools", mode="before")
    def validate_tools(cls, v: list[Any]) -> list[Tool | None]:
        """Validate that all tools are instances of Tool."""
        if not v:
            return []
        if not all(isinstance(t, Tool) for t in v):
            raise ValueError("All tools must be Tool instances")
        return v
Functions
validate_tools(v)

Validate that all tools are instances of Tool.

Source code in src/app/data_models/app_models.py
@field_validator("tools", mode="before")
def validate_tools(cls, v: list[Any]) -> list[Tool | None]:
    """Validate that all tools are instances of Tool."""
    if not v:
        return []
    if not all(isinstance(t, Tool) for t in v):
        raise ValueError("All tools must be Tool instances")
    return v

AnalysisResult

Bases: BaseModel

Analysis results from the analysis agent.

Source code in src/app/data_models/app_models.py
class AnalysisResult(BaseModel):
    """Analysis results from the analysis agent."""

    insights: list[str]
    recommendations: list[str]
    approval: bool

AppEnv

Bases: BaseSettings

Application environment settings loaded from environment variables or .env file.

This class uses Pydantic’s BaseSettings to manage API keys and configuration for various inference endpoints, tools, and logging/monitoring services. Environment variables are loaded from a .env file by default.

Source code in src/app/data_models/app_models.py
class AppEnv(BaseSettings):
    """
    Application environment settings loaded from environment variables or .env file.

    This class uses Pydantic's BaseSettings to manage API keys and configuration
    for various inference endpoints, tools, and logging/monitoring services.
    Environment variables are loaded from a .env file by default.
    """

    # Inference endpoints
    ANTHROPIC_API_KEY: str = ""
    GEMINI_API_KEY: str = ""
    GITHUB_API_KEY: str = ""
    GROK_API_KEY: str = ""
    HUGGINGFACE_API_KEY: str = ""
    OPENAI_API_KEY: str = ""
    OPENROUTER_API_KEY: str = ""
    PERPLEXITY_API_KEY: str = ""
    RESTACK_API_KEY: str = ""
    TOGETHER_API_KEY: str = ""

    # Tools
    TAVILY_API_KEY: str = ""

    # Logging/Monitoring/Tracing
    AGENTOPS_API_KEY: str = ""
    LOGFIRE_API_KEY: str = ""
    WANDB_API_KEY: str = ""

    model_config = SettingsConfigDict(
        env_file=".env", env_file_encoding="utf-8", extra="ignore"
    )
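
Because AppEnv subclasses BaseSettings, instantiating it reads the variables; a short sketch:

from app.data_models.app_models import AppEnv

# Values come from the process environment or a local .env file.
env = AppEnv()
if not env.OPENAI_API_KEY:
    print("OPENAI_API_KEY is not set")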

ChatConfig

Bases: BaseModel

Configuration settings for agents and model providers

Source code in src/app/data_models/app_models.py
class ChatConfig(BaseModel):
    """Configuration settings for agents and model providers"""

    providers: dict[str, ProviderConfig]
    inference: dict[str, str | int]
    prompts: dict[str, str]

EndpointConfig

Bases: BaseModel

Configuration for an agent

Source code in src/app/data_models/app_models.py
class EndpointConfig(BaseModel):
    """Configuration for an agent"""

    provider: str
    query: UserPromptType = None
    api_key: str | None
    prompts: dict[str, str]
    provider_config: ProviderConfig
    usage_limits: UsageLimits | None = None

ModelDict

Bases: BaseModel

Dictionary of models used to create agent systems

Source code in src/app/data_models/app_models.py
class ModelDict(BaseModel):
    """Dictionary of models used to create agent systems"""

    model_manager: Model
    model_researcher: Model | None
    model_analyst: Model | None
    model_synthesiser: Model | None
    model_config = ConfigDict(arbitrary_types_allowed=True)

ProviderConfig

Bases: BaseModel

Configuration for a model provider

Source code in src/app/data_models/app_models.py
class ProviderConfig(BaseModel):
    """Configuration for a model provider"""

    model_name: str
    base_url: HttpUrl
    usage_limits: int | None = None
    max_content_length: int | None = 15000
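
Provider entries in the chat configuration validate against this model; a sketch with placeholder values:

from app.data_models.app_models import ProviderConfig

cfg = ProviderConfig(
    model_name="llama3.1:8b",               # placeholder model name
    base_url="http://localhost:11434/v1",   # placeholder endpoint
    usage_limits=20000,                     # used downstream as the total token limit
)
print(cfg.max_content_length)  # 15000 by default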

ResearchResult

Bases: BaseModel

Research results from the research agent with flexible structure.

Source code in src/app/data_models/app_models.py
class ResearchResult(BaseModel):
    """Research results from the research agent with flexible structure."""

    topic: str | dict[str, str]
    findings: list[str] | dict[str, str | list[str]]
    sources: list[str | HttpUrl] | dict[str, str | HttpUrl | list[str | HttpUrl]]

ResearchResultSimple

Bases: BaseModel

Simplified research results for Gemini compatibility.

Source code in src/app/data_models/app_models.py
class ResearchResultSimple(BaseModel):
    """Simplified research results for Gemini compatibility."""

    topic: str
    findings: list[str]
    sources: list[str]

ResearchSummary

Bases: BaseModel

Expected model response of research on a topic

Source code in src/app/data_models/app_models.py
class ResearchSummary(BaseModel):
    """Expected model response of research on a topic"""

    topic: str
    key_points: list[str]
    key_points_explanation: list[str]
    conclusion: str
    sources: list[str]

app.data_models.peerread_evaluation_models

PeerRead evaluation data models.

This module defines Pydantic models specifically for evaluation results when comparing agent-generated reviews against PeerRead ground truth.

Classes

PeerReadEvalResult

Bases: BaseModel

Result of evaluating agent review against PeerRead ground truth.

Source code in src/app/data_models/peerread_evaluation_models.py
class PeerReadEvalResult(BaseModel):
    """Result of evaluating agent review against PeerRead ground truth."""

    paper_id: str = Field(description="Paper being evaluated")
    agent_review: str = Field(description="Review generated by agent")
    ground_truth_reviews: list[PeerReadReview] = Field(
        description="Original peer reviews from dataset"
    )
    similarity_scores: dict[str, float] = Field(
        description="Similarity metrics (semantic, cosine, jaccard)"
    )
    overall_similarity: float = Field(
        description="Weighted overall similarity score (0-1)"
    )
    recommendation_match: bool = Field(
        description="Whether agent recommendation matches ground truth"
    )

app.data_models.peerread_models

PeerRead dataset data models.

This module defines Pydantic models for representing PeerRead scientific paper review data structures. These models ensure type safety and validation for papers, reviews, and evaluation results used in the multi-agent system evaluation.

The models are based on the actual PeerRead dataset structure validated from: https://raw.githubusercontent.com/allenai/PeerRead/master/data/acl_2017/train/reviews/104.json

This module also includes structured data models for LLM-generated reviews, ensuring consistency and validation against the PeerRead format.

Classes

DownloadResult

Bases: BaseModel

Result of dataset download operation.

Source code in src/app/data_models/peerread_models.py
class DownloadResult(BaseModel):
    """Result of dataset download operation."""

    success: bool = Field(description="Whether download was successful")
    cache_path: str = Field(description="Path to cached data")
    papers_downloaded: int = Field(default=0, description="Number of papers downloaded")
    error_message: str | None = Field(
        default=None, description="Error message if download failed"
    )

GeneratedReview

Bases: BaseModel

Structured data model for LLM-generated reviews.

This model enforces the PeerRead review format and ensures all required fields are present with proper validation.

Source code in src/app/data_models/peerread_models.py
class GeneratedReview(BaseModel):
    """
    Structured data model for LLM-generated reviews.

    This model enforces the PeerRead review format and ensures
    all required fields are present with proper validation.
    """

    impact: int = Field(
        ..., ge=1, le=5, description="Impact rating (1=minimal, 5=high impact)"
    )

    substance: int = Field(
        ..., ge=1, le=5, description="Substance/depth rating (1=shallow, 5=substantial)"
    )

    appropriateness: int = Field(
        ...,
        ge=1,
        le=5,
        description="Venue appropriateness rating (1=inappropriate, 5=appropriate)",
    )

    meaningful_comparison: int = Field(
        ...,
        ge=1,
        le=5,
        description="Related work comparison rating (1=poor, 5=excellent)",
    )

    presentation_format: Literal["Poster", "Oral"] = Field(
        ..., description="Recommended presentation format"
    )

    comments: str = Field(
        ...,
        min_length=100,
        description="Detailed review comments covering contributions, strengths, "
        "weaknesses, technical soundness, clarity, and suggestions",
    )

    soundness_correctness: int = Field(
        ...,
        ge=1,
        le=5,
        description="Technical soundness rating (1=many errors, 5=very sound)",
    )

    originality: int = Field(
        ...,
        ge=1,
        le=5,
        description="Originality rating (1=not original, 5=highly original)",
    )

    recommendation: int = Field(
        ...,
        ge=1,
        le=5,
        description="Overall recommendation (1=strong reject, 2=reject, "
        "3=borderline, 4=accept, 5=strong accept)",
    )

    clarity: int = Field(
        ...,
        ge=1,
        le=5,
        description="Presentation clarity rating (1=very unclear, 5=very clear)",
    )

    reviewer_confidence: int = Field(
        ...,
        ge=1,
        le=5,
        description="Reviewer confidence rating (1=low confidence, 5=high confidence)",
    )

    @field_validator("comments")
    def validate_comments_structure(cls, v: str) -> str:
        """Ensure comments contain key review sections."""
        required_sections = [
            "contributions",
            "strengths",
            "weaknesses",
            "technical",
            "clarity",
        ]

        v_lower = v.lower()
        missing_sections = [
            section for section in required_sections if section not in v_lower
        ]

        if missing_sections:
            # Just warn but don't fail - LLM might use different wording
            pass

        return v

    def to_peerread_format(self) -> dict[str, str | None]:
        """Convert to PeerRead dataset format for compatibility."""
        return {
            "IMPACT": str(self.impact),
            "SUBSTANCE": str(self.substance),
            "APPROPRIATENESS": str(self.appropriateness),
            "MEANINGFUL_COMPARISON": str(self.meaningful_comparison),
            "PRESENTATION_FORMAT": self.presentation_format,
            "comments": self.comments,
            "SOUNDNESS_CORRECTNESS": str(self.soundness_correctness),
            "ORIGINALITY": str(self.originality),
            "RECOMMENDATION": str(self.recommendation),
            "CLARITY": str(self.clarity),
            "REVIEWER_CONFIDENCE": str(self.reviewer_confidence),
            "is_meta_review": None,
        }
Functions
to_peerread_format()

Convert to PeerRead dataset format for compatibility.

Source code in src/app/data_models/peerread_models.py
def to_peerread_format(self) -> dict[str, str | None]:
    """Convert to PeerRead dataset format for compatibility."""
    return {
        "IMPACT": str(self.impact),
        "SUBSTANCE": str(self.substance),
        "APPROPRIATENESS": str(self.appropriateness),
        "MEANINGFUL_COMPARISON": str(self.meaningful_comparison),
        "PRESENTATION_FORMAT": self.presentation_format,
        "comments": self.comments,
        "SOUNDNESS_CORRECTNESS": str(self.soundness_correctness),
        "ORIGINALITY": str(self.originality),
        "RECOMMENDATION": str(self.recommendation),
        "CLARITY": str(self.clarity),
        "REVIEWER_CONFIDENCE": str(self.reviewer_confidence),
        "is_meta_review": None,
    }
validate_comments_structure(v)

Ensure comments contain key review sections.

Source code in src/app/data_models/peerread_models.py
@field_validator("comments")
def validate_comments_structure(cls, v: str) -> str:
    """Ensure comments contain key review sections."""
    required_sections = [
        "contributions",
        "strengths",
        "weaknesses",
        "technical",
        "clarity",
    ]

    v_lower = v.lower()
    missing_sections = [
        section for section in required_sections if section not in v_lower
    ]

    if missing_sections:
        # Just warn but don't fail - LLM might use different wording
        pass

    return v
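
Constructing a review programmatically and converting it to the PeerRead format; a sketch assuming the module is importable as app.data_models.peerread_models (comments must be at least 100 characters):

from app.data_models.peerread_models import GeneratedReview

review = GeneratedReview(
    impact=4,
    substance=4,
    appropriateness=5,
    meaningful_comparison=3,
    presentation_format="Poster",
    comments=(
        "The paper's contributions are clearly stated and its strengths outweigh the "
        "weaknesses; the technical approach appears sound and the clarity of the "
        "presentation is good, though the related-work comparison could be expanded."
    ),
    soundness_correctness=4,
    originality=3,
    recommendation=4,
    clarity=4,
    reviewer_confidence=3,
)
legacy = review.to_peerread_format()
print(legacy["RECOMMENDATION"])  # "4"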

PeerReadConfig

Bases: BaseModel

Configuration for PeerRead dataset management.

Source code in src/app/data_models/peerread_models.py
class PeerReadConfig(BaseModel):
    """Configuration for PeerRead dataset management."""

    base_url: str = Field(
        default="https://github.com/allenai/PeerRead/tree/master/data",
        description="Base URL for PeerRead dataset",
    )
    github_api_base_url: str = Field(
        default="https://api.github.com/repos/allenai/PeerRead/contents/data",
        description="Base URL for GitHub API to list PeerRead dataset contents",
    )
    raw_github_base_url: str = Field(
        default="https://raw.githubusercontent.com/allenai/PeerRead/master/data",
        description="Base URL for raw GitHub content of PeerRead dataset",
    )
    cache_directory: str = Field(
        default=DATASETS_PEERREAD_PATH,
        description="Local directory for caching downloaded data",
    )
    venues: list[str] = Field(
        default=["acl_2017", "conll_2016", "iclr_2017"],
        description="Available conference venues",
    )
    splits: list[str] = Field(
        default=["train", "test", "dev"], description="Available data splits"
    )
    max_papers_per_query: int = Field(
        default=100, description="Maximum papers to return per query"
    )
    download_timeout: int = Field(
        default=30, description="Timeout for download requests in seconds"
    )
    max_retries: int = Field(
        default=5, description="Maximum number of retry attempts for downloads"
    )
    retry_delay_seconds: int = Field(
        default=5, description="Delay in seconds between retry attempts"
    )
    similarity_metrics: dict[str, float] = Field(
        default={"cosine_weight": 0.6, "jaccard_weight": 0.4},
        description="Weights for similarity metrics",
    )

PeerReadPaper

Bases: BaseModel

Scientific paper from PeerRead dataset.

Source code in src/app/data_models/peerread_models.py
class PeerReadPaper(BaseModel):
    """Scientific paper from PeerRead dataset."""

    paper_id: str = Field(description="Unique paper identifier")
    title: str = Field(description="Paper title")
    abstract: str = Field(description="Paper abstract")
    reviews: list[PeerReadReview] = Field(description="Peer reviews for this paper")
    review_histories: list[str] = Field(
        default_factory=list, description="Paper revision histories"
    )

PeerReadReview

Bases: BaseModel

Individual peer review from PeerRead dataset.

Source code in src/app/data_models/peerread_models.py
class PeerReadReview(BaseModel):
    """Individual peer review from PeerRead dataset."""

    impact: str = Field(description="Impact score (1-5)")
    substance: str = Field(description="Substance score (1-5)")
    appropriateness: str = Field(description="Appropriateness score (1-5)")
    meaningful_comparison: str = Field(description="Meaningful comparison score (1-5)")
    presentation_format: str = Field(description="Presentation format (Poster/Oral)")
    comments: str = Field(description="Detailed review comments")
    soundness_correctness: str = Field(description="Soundness/correctness score (1-5)")
    originality: str = Field(description="Originality score (1-5)")
    recommendation: str = Field(description="Overall recommendation score (1-5)")
    clarity: str = Field(description="Clarity score (1-5)")
    reviewer_confidence: str = Field(description="Reviewer confidence score (1-5)")
    is_meta_review: bool | None = Field(
        default=None, description="Whether this is a meta review"
    )

ReviewGenerationResult

Bases: BaseModel

Complete result from the review generation process.

Contains the structured review along with metadata.

Source code in src/app/data_models/peerread_models.py
class ReviewGenerationResult(BaseModel):
    """
    Complete result from the review generation process.

    Contains the structured review along with metadata.
    """

    paper_id: str = Field(
        ..., description=("The unique paper identifier provided by PeerRead")
    )
    review: GeneratedReview = Field(
        ..., description="The structured review povided by LLM"
    )
    timestamp: str = Field(..., description="Generation timestamp in ISO format")
    model_info: str = Field(
        ...,
        description=(
            "Information about the generating model: your model name, version, etc."
        ),
    )

app.data_utils.datasets_peerread

PeerRead dataset core utilities for download and loading.

This module provides pure dataset functionality for downloading, caching, and loading the PeerRead scientific paper review dataset. It contains no evaluation logic - only data access and management.

Classes

PeerReadDownloader

Downloads PeerRead dataset files with caching and validation.

Handles direct download from GitHub repository with progress tracking, error recovery, and integrity verification.

Source code in src/app/data_utils/datasets_peerread.py
class PeerReadDownloader:
    """Downloads PeerRead dataset files with caching and validation.

    Handles direct download from GitHub repository with progress tracking,
    error recovery, and integrity verification.
    """

    def __init__(self, config: PeerReadConfig):
        """Initialize downloader with configuration.

        Args:
            config: PeerRead dataset configuration.
        """
        self.config = config
        # Resolve cache directory relative to project root
        self.cache_dir = resolve_project_path(config.cache_directory)
        headers: dict[str, str] = {}
        if chat_config.GITHUB_API_KEY:
            logger.info("Using GitHub API key for authenticated requests")
            headers["Authorization"] = f"token {chat_config.GITHUB_API_KEY}"
        self.client = Client(headers=headers)

    def _construct_url(
        self,
        venue: str,
        split: str,
        data_type: str,
        paper_id: str,
    ) -> str:
        """Construct download URL for specific file.

        Args:
            venue: Conference venue (e.g., 'acl_2017').
            split: Data split ('train', 'test', 'dev').
            data_type: Type of data ('reviews', 'parsed_pdfs', 'pdfs').
            paper_id: Unique paper identifier.

        Returns:
            Complete download URL.

        Raises:
            ValueError: If venue or split is invalid.
        """
        if venue not in self.config.venues:
            raise ValueError(
                f"Invalid venue: {venue}. Valid venues: {self.config.venues}"
            )

        if split not in self.config.splits:
            raise ValueError(
                f"Invalid split: {split}. Valid splits: {self.config.splits}"
            )

        # Construct filename based on data type
        if data_type == "reviews":
            filename = f"{paper_id}.json"
        elif data_type == "parsed_pdfs":
            filename = f"{paper_id}.pdf.json"
        elif data_type == "pdfs":
            filename = f"{paper_id}.pdf"
        else:
            raise ValueError(
                f"Invalid data_type: {data_type}. Valid types: reviews, "
                f"parsed_pdfs, pdfs"
            )

        return (
            f"{self.config.raw_github_base_url}/{venue}/{split}/{data_type}/{filename}"
        )

    def _discover_available_files(
        self,
        venue: str,
        split: str,
        data_type: str,
    ) -> list[str]:
        """Discover available files in a GitHub repository directory.

        Args:
            venue: Conference venue (e.g., 'acl_2017').
            split: Data split ('train', 'test', 'dev').
            data_type: Type of data ('reviews', 'parsed_pdfs', 'pdfs').

        Returns:
            List of paper IDs (without extensions) available in the directory.
        """
        # Use GitHub API to list directory contents
        api_url = f"{self.config.github_api_base_url}/{venue}/{split}/{data_type}"

        try:
            logger.info(
                f"Discovering {data_type} files in {venue}/{split} via GitHub API"
            )
            response = self.client.get(api_url, timeout=self.config.download_timeout)
            response.raise_for_status()

            files_data = response.json()

            # Extract paper IDs from filenames based on data type
            paper_ids: list[str] = []
            for file_info in files_data:
                if file_info.get("type") == "file":
                    filename = file_info.get("name", "")
                    if data_type == "reviews" and filename.endswith(".json"):
                        paper_id = filename[:-5]  # Remove .json extension
                        paper_ids.append(paper_id)
                    elif data_type == "parsed_pdfs" and filename.endswith(".pdf.json"):
                        paper_id = filename[:-9]  # Remove .pdf.json extension
                        paper_ids.append(paper_id)
                    elif data_type == "pdfs" and filename.endswith(".pdf"):
                        paper_id = filename[:-4]  # Remove .pdf extension
                        paper_ids.append(paper_id)

            logger.info(f"Found {len(paper_ids)} {data_type} files in {venue}/{split}")
            return sorted(paper_ids)

        except RequestError as e:
            logger.error(
                f"Failed to discover {data_type} files for {venue}/{split}: {e}"
            )
            return []
        except (KeyError, ValueError) as e:
            logger.error(
                f"Failed to parse GitHub API response for "
                f"{venue}/{split}/{data_type}: {e}"
            )
            return []

    def download_file(
        self,
        venue: str,
        split: str,
        data_type: str,
        paper_id: str,
    ) -> bytes | dict[str, Any] | None:
        """Download a single file.

        Args:
            venue: Conference venue.
            split: Data split.
            data_type: Type of data ('reviews', 'parsed_pdfs', 'pdfs').
            paper_id: Paper identifier.

        Returns:
            File content (JSON dict for .json files, bytes for PDFs),
            or None if download fails.

        Raises:
            ValueError: If venue/split is invalid.
        """
        url = self._construct_url(venue, split, data_type, paper_id)
        for attempt in range(self.config.max_retries):
            try:
                logger.info(
                    f"Downloading {data_type}/{paper_id} from {url} "
                    f"(Attempt {attempt + 1}/{self.config.max_retries})"
                )

                response = self.client.get(url, timeout=self.config.download_timeout)
                response.raise_for_status()

                # Return JSON for .json files, bytes for PDFs
                if data_type in ["reviews", "parsed_pdfs"]:
                    return response.json()
                else:  # PDFs
                    return response.content

            except HTTPStatusError as e:
                if e.response.status_code == 429:
                    logger.warning(
                        f"Rate limit hit for {data_type}/{paper_id}. "
                        f"Retrying in {self.config.retry_delay_seconds} seconds..."
                    )
                    sleep(self.config.retry_delay_seconds)
                else:
                    logger.error(f"Failed to download {data_type}/{paper_id}: {e}")
                    return None
            except RequestError as e:
                logger.error(f"Failed to download {data_type}/{paper_id}: {e}")
                return None
            except JSONDecodeError as e:
                logger.error(f"Invalid JSON for {data_type}/{paper_id}: {e}")
                return None
        logger.error(
            f"Failed to download {data_type}/{paper_id} after "
            f"{self.config.max_retries} attempts."
        )
        return None

    def download_venue_split(
        self,
        venue: str,
        split: str,
        max_papers: int | None = None,
    ) -> DownloadResult:
        """Download all files for a venue/split combination across all data types.

        Args:
            venue: Conference venue.
            split: Data split.
            max_papers: Maximum number of papers to download.

        Returns:
            DownloadResult with download statistics.
        """
        # Create base cache directory structure
        base_cache_path = self.cache_dir / venue / split

        downloaded = 0
        errors: list[str] = []
        data_types = ["reviews", "parsed_pdfs", "pdfs"]

        # Discover available papers from reviews (use as master list)
        available_paper_ids = self._discover_available_files(venue, split, "reviews")

        if not available_paper_ids:
            error_msg = f"No review files discovered for {venue}/{split}"
            logger.error(error_msg)
            return DownloadResult(
                success=False,
                cache_path=str(base_cache_path),
                papers_downloaded=0,
                error_message=error_msg,
            )

        # Apply max_papers limit if specified
        max_papers = max_papers or self.config.max_papers_per_query
        paper_ids_to_download = available_paper_ids[:max_papers]
        logger.info(
            f"Will download {len(paper_ids_to_download)} of "
            f"{len(available_paper_ids)} available papers across all data types"
        )

        # Download all data types for each paper
        for paper_id in paper_ids_to_download:
            paper_downloaded = False

            for data_type in data_types:
                # Create data type directory
                data_type_path = base_cache_path / data_type
                data_type_path.mkdir(parents=True, exist_ok=True)

                # Determine cache filename based on data type
                if data_type == "reviews":
                    cache_filename = f"{paper_id}.json"
                elif data_type == "parsed_pdfs":
                    cache_filename = f"{paper_id}.pdf.json"
                elif data_type == "pdfs":
                    cache_filename = f"{paper_id}.pdf"
                else:
                    # This case should not be reached if data_types list is correct
                    logger.warning(f"Unsupported data_type: {data_type}")
                    continue

                cache_file = data_type_path / cache_filename

                if cache_file.exists():
                    logger.debug(f"{data_type}/{paper_id} already cached")
                    if not paper_downloaded:
                        paper_downloaded = True
                    continue

                # Download the file
                file_data = self.download_file(venue, split, data_type, paper_id)
                if file_data is not None:
                    if data_type in ["reviews", "parsed_pdfs"]:
                        # JSON data
                        with open(cache_file, "w", encoding="utf-8") as f:
                            dump(file_data, f, indent=2)
                    elif isinstance(file_data, bytes):
                        # PDF binary data
                        with open(cache_file, "wb") as f:
                            f.write(file_data)

                    logger.info(f"Cached {data_type}/{paper_id}")
                    if not paper_downloaded:
                        paper_downloaded = True
                else:
                    errors.append(f"Failed to download {data_type}/{paper_id}")

            if paper_downloaded:
                downloaded += 1

        success = downloaded > 0
        error_message = None if success else "; ".join(errors[:5])

        return DownloadResult(
            success=success,
            cache_path=str(base_cache_path),
            papers_downloaded=downloaded,
            error_message=error_message,
        )
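
A minimal usage sketch for the downloader (venue, split, and paper limit are illustrative; assumes the app package is importable and that GitHub is reachable):

from app.data_utils.datasets_peerread import PeerReadDownloader, load_peerread_config

config = load_peerread_config()
downloader = PeerReadDownloader(config)

# Fetch reviews, parsed PDFs, and raw PDFs for a handful of papers
result = downloader.download_venue_split("acl_2017", "train", max_papers=5)
if result.success:
    print(f"Cached {result.papers_downloaded} papers under {result.cache_path}")
else:
    print(f"Download failed: {result.error_message}")
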
Functions
__init__(config)

Initialize downloader with configuration.

Parameters:

Name Type Description Default
config PeerReadConfig

PeerRead dataset configuration.

required
Source code in src/app/data_utils/datasets_peerread.py
def __init__(self, config: PeerReadConfig):
    """Initialize downloader with configuration.

    Args:
        config: PeerRead dataset configuration.
    """
    self.config = config
    # Resolve cache directory relative to project root
    self.cache_dir = resolve_project_path(config.cache_directory)
    headers: dict[str, str] = {}
    if chat_config.GITHUB_API_KEY:
        logger.info("Using GitHub API key for authenticated requests")
        headers["Authorization"] = f"token {chat_config.GITHUB_API_KEY}"
    self.client = Client(headers=headers)
download_file(venue, split, data_type, paper_id)

Download a single file.

Parameters:

Name Type Description Default
venue str

Conference venue.

required
split str

Data split.

required
data_type str

Type of data (‘reviews’, ‘parsed_pdfs’, ‘pdfs’).

required
paper_id str

Paper identifier.

required

Returns:

Type Description
bytes | dict[str, Any] | None

File content (JSON dict for .json files, bytes for PDFs),

bytes | dict[str, Any] | None

or None if download fails.

Raises:

Type Description
ValueError

If venue/split is invalid.

Source code in src/app/data_utils/datasets_peerread.py
def download_file(
    self,
    venue: str,
    split: str,
    data_type: str,
    paper_id: str,
) -> bytes | dict[str, Any] | None:
    """Download a single file.

    Args:
        venue: Conference venue.
        split: Data split.
        data_type: Type of data ('reviews', 'parsed_pdfs', 'pdfs').
        paper_id: Paper identifier.

    Returns:
        File content (JSON dict for .json files, bytes for PDFs),
        or None if download fails.

    Raises:
        ValueError: If venue/split is invalid.
    """
    url = self._construct_url(venue, split, data_type, paper_id)
    for attempt in range(self.config.max_retries):
        try:
            logger.info(
                f"Downloading {data_type}/{paper_id} from {url} "
                f"(Attempt {attempt + 1}/{self.config.max_retries})"
            )

            response = self.client.get(url, timeout=self.config.download_timeout)
            response.raise_for_status()

            # Return JSON for .json files, bytes for PDFs
            if data_type in ["reviews", "parsed_pdfs"]:
                return response.json()
            else:  # PDFs
                return response.content

        except HTTPStatusError as e:
            if e.response.status_code == 429:
                logger.warning(
                    f"Rate limit hit for {data_type}/{paper_id}. "
                    f"Retrying in {self.config.retry_delay_seconds} seconds..."
                )
                sleep(self.config.retry_delay_seconds)
            else:
                logger.error(f"Failed to download {data_type}/{paper_id}: {e}")
                return None
        except RequestError as e:
            logger.error(f"Failed to download {data_type}/{paper_id}: {e}")
            return None
        except JSONDecodeError as e:
            logger.error(f"Invalid JSON for {data_type}/{paper_id}: {e}")
            return None
    logger.error(
        f"Failed to download {data_type}/{paper_id} after "
        f"{self.config.max_retries} attempts."
    )
    return None
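
A single-file sketch (the paper id is illustrative): JSON data types come back as a dict, while pdfs come back as raw bytes.

from app.data_utils.datasets_peerread import PeerReadDownloader, load_peerread_config

downloader = PeerReadDownloader(load_peerread_config())

review_json = downloader.download_file("acl_2017", "train", "reviews", "104")
if isinstance(review_json, dict):
    print(f"Review JSON with {len(review_json.get('reviews', []))} referee reviews")

pdf_bytes = downloader.download_file("acl_2017", "train", "pdfs", "104")
if isinstance(pdf_bytes, bytes):
    print(f"PDF payload of {len(pdf_bytes)} bytes")
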
download_venue_split(venue, split, max_papers=None)

Download all files for a venue/split combination across all data types.

Parameters:

Name Type Description Default
venue str

Conference venue.

required
split str

Data split.

required
max_papers int | None

Maximum number of papers to download.

None

Returns:

Type Description
DownloadResult

DownloadResult with download statistics.

Source code in src/app/data_utils/datasets_peerread.py
def download_venue_split(
    self,
    venue: str,
    split: str,
    max_papers: int | None = None,
) -> DownloadResult:
    """Download all files for a venue/split combination across all data types.

    Args:
        venue: Conference venue.
        split: Data split.
        max_papers: Maximum number of papers to download.

    Returns:
        DownloadResult with download statistics.
    """
    # Create base cache directory structure
    base_cache_path = self.cache_dir / venue / split

    downloaded = 0
    errors: list[str] = []
    data_types = ["reviews", "parsed_pdfs", "pdfs"]

    # Discover available papers from reviews (use as master list)
    available_paper_ids = self._discover_available_files(venue, split, "reviews")

    if not available_paper_ids:
        error_msg = f"No review files discovered for {venue}/{split}"
        logger.error(error_msg)
        return DownloadResult(
            success=False,
            cache_path=str(base_cache_path),
            papers_downloaded=0,
            error_message=error_msg,
        )

    # Apply max_papers limit if specified
    max_papers = max_papers or self.config.max_papers_per_query
    paper_ids_to_download = available_paper_ids[:max_papers]
    logger.info(
        f"Will download {len(paper_ids_to_download)} of "
        f"{len(available_paper_ids)} available papers across all data types"
    )

    # Download all data types for each paper
    for paper_id in paper_ids_to_download:
        paper_downloaded = False

        for data_type in data_types:
            # Create data type directory
            data_type_path = base_cache_path / data_type
            data_type_path.mkdir(parents=True, exist_ok=True)

            # Determine cache filename based on data type
            if data_type == "reviews":
                cache_filename = f"{paper_id}.json"
            elif data_type == "parsed_pdfs":
                cache_filename = f"{paper_id}.pdf.json"
            elif data_type == "pdfs":
                cache_filename = f"{paper_id}.pdf"
            else:
                # This case should not be reached if data_types list is correct
                logger.warning(f"Unsupported data_type: {data_type}")
                continue

            cache_file = data_type_path / cache_filename

            if cache_file.exists():
                logger.debug(f"{data_type}/{paper_id} already cached")
                if not paper_downloaded:
                    paper_downloaded = True
                continue

            # Download the file
            file_data = self.download_file(venue, split, data_type, paper_id)
            if file_data is not None:
                if data_type in ["reviews", "parsed_pdfs"]:
                    # JSON data
                    with open(cache_file, "w", encoding="utf-8") as f:
                        dump(file_data, f, indent=2)
                elif isinstance(file_data, bytes):
                    # PDF binary data
                    with open(cache_file, "wb") as f:
                        f.write(file_data)

                logger.info(f"Cached {data_type}/{paper_id}")
                if not paper_downloaded:
                    paper_downloaded = True
            else:
                errors.append(f"Failed to download {data_type}/{paper_id}")

        if paper_downloaded:
            downloaded += 1

    success = downloaded > 0
    error_message = None if success else "; ".join(errors[:5])

    return DownloadResult(
        success=success,
        cache_path=str(base_cache_path),
        papers_downloaded=downloaded,
        error_message=error_message,
    )

PeerReadLoader

Loads and queries PeerRead dataset with structured access.

Source code in src/app/data_utils/datasets_peerread.py
class PeerReadLoader:
    """Loads and queries PeerRead dataset with structured access."""

    def __init__(self, config: PeerReadConfig | None = None):
        """Initialize loader with configuration.

        Args:
            config: PeerRead dataset configuration. Loads from file if None.
        """
        self.config = config or load_peerread_config()
        # Resolve cache directory relative to project root
        self.cache_dir = resolve_project_path(self.config.cache_directory)

    def load_parsed_pdf_content(self, paper_id: str) -> str | None:
        """Load the text content from the parsed PDF for a given paper ID.

        Assumes parsed PDF files are JSON and contain a 'sections' key with 'text'
        within. Defaults to the latest revision if multiple exist (by filename).

        Args:
            paper_id: Unique identifier for the paper.

        Returns:
            str: The extracted text content, or None if not found/parsed.
        """
        for venue in self.config.venues:
            for split in self.config.splits:
                parsed_pdfs_path = self.cache_dir / venue / split / "parsed_pdfs"
                if parsed_pdfs_path.exists():
                    # Find all parsed PDF files for this paper_id
                    # Assuming filenames are like 'PAPER_ID.pdf.json'
                    # If multiple revisions, we'll just take the first one found for now
                    parsed_files = sorted(
                        parsed_pdfs_path.glob(f"{paper_id}.pdf.json"), reverse=True
                    )
                    if parsed_files:
                        latest_parsed_file = parsed_files[0]
                        try:
                            with open(latest_parsed_file, encoding="utf-8") as f:
                                parsed_data = load(f)

                            # Extract and concatenate text from all sections
                            full_text: list[str] = []
                            for section in parsed_data.get("metadata", {}).get(
                                "sections", []
                            ):
                                if "text" in section:
                                    full_text.append(section["text"])
                            return "\n".join(full_text).strip()
                        except Exception as e:
                            logger.warning(
                                f"Failed to load/parse {latest_parsed_file}: {e}"
                            )
        return None

    def get_raw_pdf_path(self, paper_id: str) -> str | None:
        """Get the absolute path to the raw PDF file for a given paper ID.

        Args:
            paper_id: Unique identifier for the paper.

        Returns:
            str: The absolute path to the PDF file, or None if not found.
        """
        for venue in self.config.venues:
            for split in self.config.splits:
                pdf_path = self.cache_dir / venue / split / "pdfs" / f"{paper_id}.pdf"
                if pdf_path.exists():
                    return str(pdf_path)
        return None

    def _validate_papers(
        self,
        papers_data: list[dict[str, Any]],
    ) -> list[PeerReadPaper]:
        """Validate and convert paper data to Pydantic models.

        Args:
            papers_data: List of paper dictionaries.

        Returns:
            List of validated PeerReadPaper models.
        """
        validated_papers: list[PeerReadPaper] = []

        for paper_data in papers_data:
            try:
                # Convert from PeerRead format to our model format
                reviews = [
                    PeerReadReview(
                        impact=r["IMPACT"],
                        substance=r["SUBSTANCE"],
                        appropriateness=r["APPROPRIATENESS"],
                        meaningful_comparison=r["MEANINGFUL_COMPARISON"],
                        presentation_format=r["PRESENTATION_FORMAT"],
                        comments=r["comments"],
                        soundness_correctness=r["SOUNDNESS_CORRECTNESS"],
                        originality=r["ORIGINALITY"],
                        recommendation=r["RECOMMENDATION"],
                        clarity=r["CLARITY"],
                        reviewer_confidence=r["REVIEWER_CONFIDENCE"],
                        is_meta_review=r.get("is_meta_review"),
                    )
                    for r in paper_data.get("reviews", [])
                ]

                paper = PeerReadPaper(
                    paper_id=str(paper_data["id"]),
                    title=paper_data["title"],
                    abstract=paper_data["abstract"],
                    reviews=reviews,
                    review_histories=[
                        " ".join(map(str, h)) for h in paper_data.get("histories", [])
                    ],
                )
                validated_papers.append(paper)

            except Exception as e:
                logger.warning(
                    f"Failed to validate paper {paper_data.get('id', 'unknown')}: {e}"
                )
                continue

        return validated_papers

    def load_papers(
        self,
        venue: str = "acl_2017",
        split: str = "train",
    ) -> list[PeerReadPaper]:
        """Load papers from cached data or download if needed.

        Args:
            venue: Conference venue.
            split: Data split.

        Returns:
            List of validated PeerReadPaper models.

        Raises:
            FileNotFoundError: If the cached dataset for the venue/split has not
                been downloaded.
        """
        cache_path = self.cache_dir / venue / split

        if not cache_path.exists():
            error_msg = (
                f"PeerRead dataset not found for {venue}/{split}. "
                f"Please download the dataset first using: "
                f"'python src/app/main.py --download-peerread-only' or "
                f"'make run_cli ARGS=\"--download-peerread-only\"'"
            )
            logger.error(error_msg)
            raise FileNotFoundError(error_msg)

        # Load all cached papers from reviews directory
        reviews_path = cache_path / "reviews"

        if not reviews_path.exists():
            error_msg = (
                f"PeerRead reviews not found for {venue}/{split}. "
                f"Please download the dataset first using: "
                f"'python src/app/main.py --download-peerread-only' or "
                f"'make run_cli ARGS=\"--download-peerread-only\"'"
            )
            logger.error(error_msg)
            raise FileNotFoundError(error_msg)

        papers_data: list[dict[str, Any]] = []
        for json_file in reviews_path.glob("*.json"):
            try:
                with open(json_file, encoding="utf-8") as f:
                    papers_data.append(load(f))
            except Exception as e:
                logger.warning(f"Failed to load {json_file}: {e}")
                continue

        return self._validate_papers(papers_data)

    def get_paper_by_id(self, paper_id: str) -> PeerReadPaper | None:
        """Get a specific paper by ID.

        Args:
            paper_id: Paper identifier.

        Returns:
            PeerReadPaper if found, None otherwise.
        """
        # Search across all venues and splits in reviews directory
        for venue in self.config.venues:
            for split in self.config.splits:
                cache_path = (
                    self.cache_dir / venue / split / "reviews" / f"{paper_id}.json"
                )
                if cache_path.exists():
                    try:
                        with open(cache_path, encoding="utf-8") as f:
                            data: dict[str, Any] = load(f)
                        papers = self._validate_papers([data])
                        return papers[0] if papers else None
                    except Exception as e:
                        logger.warning(f"Failed to load paper {paper_id}: {e}")
                        continue
        return None

    def query_papers(
        self,
        venue: str | None = None,
        min_reviews: int = 1,
        limit: int | None = None,
    ) -> list[PeerReadPaper]:
        """Query papers with filters.

        Args:
            venue: Filter by venue (None for all venues).
            min_reviews: Minimum number of reviews required.
            limit: Maximum number of papers to return.

        Returns:
            List of filtered PeerReadPaper models.
        """
        all_papers: list[PeerReadPaper] = []
        venues_to_search = [venue] if venue else self.config.venues

        for search_venue in venues_to_search:
            for split in self.config.splits:
                try:
                    papers = self.load_papers(search_venue, split)
                    all_papers.extend(papers)
                except Exception as e:
                    logger.warning(f"Failed to load {search_venue}/{split}: {e}")
                    continue

        # Apply filters
        filtered_papers = [
            paper for paper in all_papers if len(paper.reviews) >= min_reviews
        ]

        # Apply limit
        if limit:
            filtered_papers = filtered_papers[:limit]

        return filtered_papers
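
A minimal usage sketch for the loader (requires a previously downloaded cache; filter values are illustrative):

from app.data_utils.datasets_peerread import PeerReadLoader

loader = PeerReadLoader()  # loads the config from file when none is passed

# Papers with at least two ground-truth reviews, capped at ten results
papers = loader.query_papers(venue="acl_2017", min_reviews=2, limit=10)
for paper in papers:
    print(paper.paper_id, len(paper.reviews), paper.title)
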
Functions
__init__(config=None)

Initialize loader with configuration.

Parameters:

Name Type Description Default
config PeerReadConfig | None

PeerRead dataset configuration. Loads from file if None.

None
Source code in src/app/data_utils/datasets_peerread.py
def __init__(self, config: PeerReadConfig | None = None):
    """Initialize loader with configuration.

    Args:
        config: PeerRead dataset configuration. Loads from file if None.
    """
    self.config = config or load_peerread_config()
    # Resolve cache directory relative to project root
    self.cache_dir = resolve_project_path(self.config.cache_directory)
get_paper_by_id(paper_id)

Get a specific paper by ID.

Parameters:

Name Type Description Default
paper_id str

Paper identifier.

required

Returns:

Type Description
PeerReadPaper | None

PeerReadPaper if found, None otherwise.

Source code in src/app/data_utils/datasets_peerread.py
def get_paper_by_id(self, paper_id: str) -> PeerReadPaper | None:
    """Get a specific paper by ID.

    Args:
        paper_id: Paper identifier.

    Returns:
        PeerReadPaper if found, None otherwise.
    """
    # Search across all venues and splits in reviews directory
    for venue in self.config.venues:
        for split in self.config.splits:
            cache_path = (
                self.cache_dir / venue / split / "reviews" / f"{paper_id}.json"
            )
            if cache_path.exists():
                try:
                    with open(cache_path, encoding="utf-8") as f:
                        data: dict[str, Any] = load(f)
                    papers = self._validate_papers([data])
                    return papers[0] if papers else None
                except Exception as e:
                    logger.warning(f"Failed to load paper {paper_id}: {e}")
                    continue
    return None
get_raw_pdf_path(paper_id)

Get the absolute path to the raw PDF file for a given paper ID.

Parameters:

Name Type Description Default
paper_id str

Unique identifier for the paper.

required

Returns:

Name Type Description
str str | None

The absolute path to the PDF file, or None if not found.

Source code in src/app/data_utils/datasets_peerread.py
def get_raw_pdf_path(self, paper_id: str) -> str | None:
    """Get the absolute path to the raw PDF file for a given paper ID.

    Args:
        paper_id: Unique identifier for the paper.

    Returns:
        str: The absolute path to the PDF file, or None if not found.
    """
    for venue in self.config.venues:
        for split in self.config.splits:
            pdf_path = self.cache_dir / venue / split / "pdfs" / f"{paper_id}.pdf"
            if pdf_path.exists():
                return str(pdf_path)
    return None
load_papers(venue='acl_2017', split='train')

Load papers from cached data; the dataset must be downloaded first.

Parameters:

Name Type Description Default
venue str

Conference venue.

'acl_2017'
split str

Data split.

'train'

Returns:

Type Description
list[PeerReadPaper]

List of validated PeerReadPaper models.

Raises:

Type Description
FileNotFoundError

If the cached dataset for the venue/split has not been downloaded.

Source code in src/app/data_utils/datasets_peerread.py
def load_papers(
    self,
    venue: str = "acl_2017",
    split: str = "train",
) -> list[PeerReadPaper]:
    """Load papers from cached data or download if needed.

    Args:
        venue: Conference venue.
        split: Data split.

    Returns:
        List of validated PeerReadPaper models.

    Raises:
        FileNotFoundError: If the cached dataset for the venue/split has not
            been downloaded.
    """
    cache_path = self.cache_dir / venue / split

    if not cache_path.exists():
        error_msg = (
            f"PeerRead dataset not found for {venue}/{split}. "
            f"Please download the dataset first using: "
            f"'python src/app/main.py --download-peerread-only' or "
            f"'make run_cli ARGS=\"--download-peerread-only\"'"
        )
        logger.error(error_msg)
        raise FileNotFoundError(error_msg)

    # Load all cached papers from reviews directory
    reviews_path = cache_path / "reviews"

    if not reviews_path.exists():
        error_msg = (
            f"PeerRead reviews not found for {venue}/{split}. "
            f"Please download the dataset first using: "
            f"'python src/app/main.py --download-peerread-only' or "
            f"'make run_cli ARGS=\"--download-peerread-only\"'"
        )
        logger.error(error_msg)
        raise FileNotFoundError(error_msg)

    papers_data: list[dict[str, Any]] = []
    for json_file in reviews_path.glob("*.json"):
        try:
            with open(json_file, encoding="utf-8") as f:
                papers_data.append(load(f))
        except Exception as e:
            logger.warning(f"Failed to load {json_file}: {e}")
            continue

    return self._validate_papers(papers_data)
load_parsed_pdf_content(paper_id)

Load the text content from the parsed PDF for a given paper ID.

Assumes parsed PDF files are JSON and contain a ‘sections’ key with ‘text’ within. Defaults to the latest revision if multiple exist (by filename).

Parameters:

Name Type Description Default
paper_id str

Unique identifier for the paper.

required

Returns:

Name Type Description
str str | None

The extracted text content, or None if not found/parsed.

Source code in src/app/data_utils/datasets_peerread.py
def load_parsed_pdf_content(self, paper_id: str) -> str | None:
    """Load the text content from the parsed PDF for a given paper ID.

    Assumes parsed PDF files are JSON and contain a 'sections' key with 'text'
    within. Defaults to the latest revision if multiple exist (by filename).

    Args:
        paper_id: Unique identifier for the paper.

    Returns:
        str: The extracted text content, or None if not found/parsed.
    """
    for venue in self.config.venues:
        for split in self.config.splits:
            parsed_pdfs_path = self.cache_dir / venue / split / "parsed_pdfs"
            if parsed_pdfs_path.exists():
                # Find all parsed PDF files for this paper_id
                # Assuming filenames are like 'PAPER_ID.pdf.json'
                # If multiple revisions, we'll just take the first one found for now
                parsed_files = sorted(
                    parsed_pdfs_path.glob(f"{paper_id}.pdf.json"), reverse=True
                )
                if parsed_files:
                    latest_parsed_file = parsed_files[0]
                    try:
                        with open(latest_parsed_file, encoding="utf-8") as f:
                            parsed_data = load(f)

                        # Extract and concatenate text from all sections
                        full_text: list[str] = []
                        for section in parsed_data.get("metadata", {}).get(
                            "sections", []
                        ):
                            if "text" in section:
                                full_text.append(section["text"])
                        return "\n".join(full_text).strip()
                    except Exception as e:
                        logger.warning(
                            f"Failed to load/parse {latest_parsed_file}: {e}"
                        )
    return None
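
A short sketch pairing the parsed-text lookup with the raw-PDF path lookup for the same paper (the paper id is illustrative; both calls return None when the file is not cached):

from app.data_utils.datasets_peerread import PeerReadLoader

loader = PeerReadLoader()
paper_id = "104"  # illustrative PeerRead paper id

text = loader.load_parsed_pdf_content(paper_id)
pdf_path = loader.get_raw_pdf_path(paper_id)

print(f"Parsed text length: {len(text) if text else 0}")
print(f"Raw PDF path: {pdf_path or 'not cached'}")
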
query_papers(venue=None, min_reviews=1, limit=None)

Query papers with filters.

Parameters:

Name Type Description Default
venue str | None

Filter by venue (None for all venues).

None
min_reviews int

Minimum number of reviews required.

1
limit int | None

Maximum number of papers to return.

None

Returns:

Type Description
list[PeerReadPaper]

List of filtered PeerReadPaper models.

Source code in src/app/data_utils/datasets_peerread.py
def query_papers(
    self,
    venue: str | None = None,
    min_reviews: int = 1,
    limit: int | None = None,
) -> list[PeerReadPaper]:
    """Query papers with filters.

    Args:
        venue: Filter by venue (None for all venues).
        min_reviews: Minimum number of reviews required.
        limit: Maximum number of papers to return.

    Returns:
        List of filtered PeerReadPaper models.
    """
    all_papers: list[PeerReadPaper] = []
    venues_to_search = [venue] if venue else self.config.venues

    for search_venue in venues_to_search:
        for split in self.config.splits:
            try:
                papers = self.load_papers(search_venue, split)
                all_papers.extend(papers)
            except Exception as e:
                logger.warning(f"Failed to load {search_venue}/{split}: {e}")
                continue

    # Apply filters
    filtered_papers = [
        paper for paper in all_papers if len(paper.reviews) >= min_reviews
    ]

    # Apply limit
    if limit:
        filtered_papers = filtered_papers[:limit]

    return filtered_papers

Functions

download_peerread_dataset(peerread_max_papers_per_sample_download=None)

Download PeerRead dataset and verify the download.

This function handles the setup phase separately from MAS execution, following the Separation of Concerns principle. It downloads the dataset to the configured path and verifies that the download was successful.

Parameters:

Name Type Description Default
peerread_max_papers_per_sample_download int | None

The maximum number of papers to download per venue/split. If None, falls back to the configured max_papers_per_query.

None

Raises:

Type Description
Exception

If download or verification fails.

Source code in src/app/data_utils/datasets_peerread.py
def download_peerread_dataset(
    peerread_max_papers_per_sample_download: int | None = None,
) -> None:
    """
    Download PeerRead dataset and verify the download.

    This function handles the setup phase separately from MAS execution,
    following the Separation of Concerns principle. It downloads the dataset
    to the configured path and verifies the download was successful.

    Args:
        peerread_max_papers_per_sample_download: The maximum number of papers to
            download per venue/split. If None, falls back to the configured
            max_papers_per_query.

    Raises:
        Exception: If download or verification fails.
    """
    logger.info("Starting PeerRead dataset download (setup mode)")

    try:
        # Load configuration
        config = load_peerread_config()
        logger.info(
            f"Loaded PeerRead config: {len(config.venues)} venues, "
            f"{len(config.splits)} splits"
        )

        # Initialize downloader
        downloader = PeerReadDownloader(config)
        logger.info(f"Download target directory: {downloader.cache_dir}")

        # Track download statistics
        total_downloaded = 0
        failed_downloads: list[str] = []

        # Determine max papers to download
        max_papers = (
            peerread_max_papers_per_sample_download
            if peerread_max_papers_per_sample_download is not None
            else config.max_papers_per_query
        )

        # Download dataset for each venue/split combination
        for venue in config.venues:
            for split in config.splits:
                logger.info(f"Downloading {venue}/{split}...")
                result = downloader.download_venue_split(
                    venue, split, max_papers=max_papers
                )

                if result.success:
                    logger.info(
                        f"✓ {venue}/{split}: {result.papers_downloaded} downloaded"
                    )
                    total_downloaded += result.papers_downloaded
                else:
                    error_msg = f"✗ {venue}/{split}: {result.error_message}"
                    logger.error(error_msg)
                    failed_downloads.append(f"{venue}/{split}")

        # Verify download by attempting to load papers
        logger.info("Verifying download integrity...")
        loader = PeerReadLoader(config)

        verification_count = 0
        for venue in config.venues:
            for split in config.splits:
                try:
                    papers = loader.load_papers(venue, split)
                    verification_count += len(papers)
                    logger.info(
                        f"✓ Verified {venue}/{split}: {len(papers)} papers loaded"
                    )
                except Exception as e:
                    logger.error(f"✗ Verification failed for {venue}/{split}: {e}")
                    failed_downloads.append(f"{venue}/{split} (verification)")

        # Summary report
        logger.info("=== Download Summary ===")
        logger.info(f"Total papers downloaded: {total_downloaded}")
        logger.info(f"Total papers verified: {verification_count}")
        logger.info(f"Download directory: {downloader.cache_dir}")

        if failed_downloads:
            logger.warning(f"Failed downloads/verifications: {failed_downloads}")
            # Partial failures are treated as errors so the caller can react
            logger.warning(
                "Some downloads or verifications failed; raising an exception"
            )
            raise Exception(f"Failed to download from {len(failed_downloads)} sources.")

        if total_downloaded == 0 and verification_count == 0:
            raise Exception("No papers were downloaded or verified successfully")

        logger.info(
            "✓ PeerRead dataset download and verification completed successfully"
        )

    except Exception as e:
        error_msg = f"PeerRead dataset download failed: {e}"
        logger.error(error_msg)
        raise Exception(error_msg) from e
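
A setup-phase sketch (the paper limit is illustrative; the function raises on failure, so wrap the call if the process should continue anyway):

from app.data_utils.datasets_peerread import download_peerread_dataset

try:
    # Keep the sample download small for a quick setup run
    download_peerread_dataset(peerread_max_papers_per_sample_download=5)
except Exception as exc:
    print(f"Dataset setup failed: {exc}")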

load_peerread_config()

Load PeerRead dataset configuration from config file.

Returns:

Name Type Description
PeerReadConfig PeerReadConfig

Validated configuration object.

Raises:

Type Description
FileNotFoundError

If config file doesn’t exist.

ValidationError

If config data is invalid.

Source code in src/app/data_utils/datasets_peerread.py
def load_peerread_config() -> PeerReadConfig:
    """Load PeerRead dataset configuration from config file.

    Returns:
        PeerReadConfig: Validated configuration object.

    Raises:
        FileNotFoundError: If config file doesn't exist.
        ValidationError: If config data is invalid.
    """
    # Get absolute path to config file
    ds_cfg_file_path = resolve_config_path(DATASETS_CONFIG_FILE)
    try:
        # Load as raw JSON data first
        with open(ds_cfg_file_path, encoding="utf-8") as f:
            data = load(f)
        return PeerReadConfig.model_validate(data["peerread"])
    except Exception as e:
        logger.error(f"Failed to load PeerRead config: {e}")
        raise
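
A small sketch reading the validated configuration (the field names are those used by the downloader and loader above):

from app.data_utils.datasets_peerread import load_peerread_config

config = load_peerread_config()
print(config.venues, config.splits)
print(config.cache_directory, config.max_papers_per_query)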

app.data_utils.review_loader

Review loading utilities for external evaluation system.

Classes

ReviewLoader

Loads MAS-generated reviews for external evaluation system.

Source code in src/app/data_utils/review_loader.py
class ReviewLoader:
    """Loads MAS-generated reviews for external evaluation system."""

    def __init__(self, reviews_dir: str = MAS_REVIEWS_PATH):
        """Initialize with reviews directory path.

        Args:
            reviews_dir: Directory containing review files
        """
        # ReviewPersistence will handle path resolution
        self.persistence = ReviewPersistence(reviews_dir)

    def load_review_for_paper(self, paper_id: str) -> PeerReadReview | None:
        """Load the latest review for a specific paper.

        Args:
            paper_id: Paper identifier

        Returns:
            PeerReadReview object if found, None otherwise
        """
        latest_file = self.persistence.get_latest_review(paper_id)
        if not latest_file:
            return None

        _, review = self.persistence.load_review(latest_file)
        return review

    def load_all_reviews(self) -> dict[str, PeerReadReview]:
        """Load all available reviews grouped by paper ID.

        Returns:
            dict: Mapping of paper_id -> latest PeerReadReview
        """
        reviews: dict[str, PeerReadReview] = {}

        # Get all review files
        all_files = self.persistence.list_reviews()

        # Group by paper ID and get latest for each
        paper_ids: set[str] = set()
        for filepath in all_files:
            filename = Path(filepath).stem
            paper_id: str = filename.split("_")[0]  # Extract paper_id from filename
            paper_ids.add(paper_id)

        # Load latest review for each paper
        for paper_id in paper_ids:
            review = self.load_review_for_paper(paper_id)
            if review:
                reviews[paper_id] = review

        return reviews

    def get_available_paper_ids(self) -> list[str]:
        """Get list of paper IDs that have reviews available.

        Returns:
            list: Paper identifiers with available reviews
        """
        all_files = self.persistence.list_reviews()
        paper_ids: set[str] = set()

        for filepath in all_files:
            filename = Path(filepath).stem
            paper_id: str = filename.split("_")[0]  # Extract paper_id from filename
            paper_ids.add(paper_id)

        return sorted(list(paper_ids))
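
A minimal sketch for handing MAS-generated reviews to an external evaluation step (assumes reviews were previously saved via ReviewPersistence):

from app.data_utils.review_loader import ReviewLoader

loader = ReviewLoader()  # defaults to MAS_REVIEWS_PATH

for paper_id in loader.get_available_paper_ids():
    review = loader.load_review_for_paper(paper_id)
    if review is not None:
        print(paper_id, review.recommendation)
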
Functions
__init__(reviews_dir=MAS_REVIEWS_PATH)

Initialize with reviews directory path.

Parameters:

Name Type Description Default
reviews_dir str

Directory containing review files

MAS_REVIEWS_PATH
Source code in src/app/data_utils/review_loader.py
def __init__(self, reviews_dir: str = MAS_REVIEWS_PATH):
    """Initialize with reviews directory path.

    Args:
        reviews_dir: Directory containing review files
    """
    # ReviewPersistence will handle path resolution
    self.persistence = ReviewPersistence(reviews_dir)
get_available_paper_ids()

Get list of paper IDs that have reviews available.

Returns:

Name Type Description
list list[str]

Paper identifiers with available reviews

Source code in src/app/data_utils/review_loader.py
def get_available_paper_ids(self) -> list[str]:
    """Get list of paper IDs that have reviews available.

    Returns:
        list: Paper identifiers with available reviews
    """
    all_files = self.persistence.list_reviews()
    paper_ids: set[str] = set()

    for filepath in all_files:
        filename = Path(filepath).stem
        paper_id: str = filename.split("_")[0]  # Extract paper_id from filename
        paper_ids.add(paper_id)

    return sorted(list(paper_ids))
load_all_reviews()

Load all available reviews grouped by paper ID.

Returns:

Name Type Description
dict dict[str, PeerReadReview]

Mapping of paper_id -> latest PeerReadReview

Source code in src/app/data_utils/review_loader.py
def load_all_reviews(self) -> dict[str, PeerReadReview]:
    """Load all available reviews grouped by paper ID.

    Returns:
        dict: Mapping of paper_id -> latest PeerReadReview
    """
    reviews: dict[str, PeerReadReview] = {}

    # Get all review files
    all_files = self.persistence.list_reviews()

    # Group by paper ID and get latest for each
    paper_ids: set[str] = set()
    for filepath in all_files:
        filename = Path(filepath).stem
        paper_id: str = filename.split("_")[0]  # Extract paper_id from filename
        paper_ids.add(paper_id)

    # Load latest review for each paper
    for paper_id in paper_ids:
        review = self.load_review_for_paper(paper_id)
        if review:
            reviews[paper_id] = review

    return reviews
load_review_for_paper(paper_id)

Load the latest review for a specific paper.

Parameters:

Name Type Description Default
paper_id str

Paper identifier

required

Returns:

Type Description
PeerReadReview | None

PeerReadReview object if found, None otherwise

Source code in src/app/data_utils/review_loader.py
def load_review_for_paper(self, paper_id: str) -> PeerReadReview | None:
    """Load the latest review for a specific paper.

    Args:
        paper_id: Paper identifier

    Returns:
        PeerReadReview object if found, None otherwise
    """
    latest_file = self.persistence.get_latest_review(paper_id)
    if not latest_file:
        return None

    _, review = self.persistence.load_review(latest_file)
    return review

app.data_utils.review_persistence

Review persistence interface for MAS and evaluation system integration.

Classes

ReviewPersistence

Handles saving and loading of MAS-generated reviews.

Source code in src/app/data_utils/review_persistence.py
class ReviewPersistence:
    """Handles saving and loading of MAS-generated reviews."""

    def __init__(self, reviews_dir: str = MAS_REVIEWS_PATH):
        """Initialize with reviews directory path.

        Args:
            reviews_dir: Directory to store review files
        """
        # Resolve reviews directory relative to src/app
        self.reviews_dir = resolve_app_path(reviews_dir)
        self.reviews_dir.mkdir(parents=True, exist_ok=True)

    def save_review(
        self, paper_id: str, review: PeerReadReview, timestamp: str | None = None
    ) -> str:
        """Save a review to the reviews directory.

        Args:
            paper_id: Unique identifier for the paper
            review: The generated review object
            timestamp: Optional timestamp, defaults to current UTC time

        Returns:
            str: Path to the saved review file
        """
        if timestamp is None:
            timestamp = datetime.now(UTC).strftime("%Y-%m-%dT%H-%M-%SZ")

        filename = f"{paper_id}_{timestamp}.json"
        filepath = self.reviews_dir / filename

        # Convert review to dict for JSON serialization
        review_data = {
            "paper_id": paper_id,
            "timestamp": timestamp,
            "review": review.model_dump(),
        }

        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(review_data, f, indent=2, ensure_ascii=False)

        return str(filepath)

    def load_review(self, filepath: str) -> tuple[str, PeerReadReview]:
        """Load a review from file.

        Args:
            filepath: Path to the review file

        Returns:
            tuple: (paper_id, PeerReadReview object)
        """
        with open(filepath, encoding="utf-8") as f:
            review_data = json.load(f)

        paper_id = review_data["paper_id"]
        review = PeerReadReview.model_validate(review_data["review"])

        return paper_id, review

    def list_reviews(self, paper_id: str | None = None) -> list[str]:
        """List available review files.

        Args:
            paper_id: Optional filter by paper ID

        Returns:
            list: Paths to matching review files
        """
        pattern = f"{paper_id}_*.json" if paper_id else "*.json"
        return [str(p) for p in self.reviews_dir.glob(pattern)]

    def get_latest_review(self, paper_id: str) -> str | None:
        """Get the most recent review file for a paper.

        Args:
            paper_id: Paper identifier

        Returns:
            str: Path to latest review file, or None if not found
        """
        reviews = self.list_reviews(paper_id)
        if not reviews:
            return None

        # Sort by timestamp in filename (newest first)
        reviews.sort(reverse=True)
        return reviews[0]
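
A round-trip sketch: save a review, then read back the newest file for the same paper. The persist_and_reload helper is hypothetical, and the PeerReadReview import location is assumed to be the peerread_models module documented above:

from app.data_models.peerread_models import PeerReadReview
from app.data_utils.review_persistence import ReviewPersistence


def persist_and_reload(paper_id: str, review: PeerReadReview) -> PeerReadReview:
    persistence = ReviewPersistence()  # defaults to MAS_REVIEWS_PATH
    saved_path = persistence.save_review(paper_id, review)
    latest = persistence.get_latest_review(paper_id)
    # Fall back to the file we just wrote if listing finds nothing newer
    _, loaded_review = persistence.load_review(latest or saved_path)
    return loaded_review
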
Functions
__init__(reviews_dir=MAS_REVIEWS_PATH)

Initialize with reviews directory path.

Parameters:

Name Type Description Default
reviews_dir str

Directory to store review files

MAS_REVIEWS_PATH
Source code in src/app/data_utils/review_persistence.py
def __init__(self, reviews_dir: str = MAS_REVIEWS_PATH):
    """Initialize with reviews directory path.

    Args:
        reviews_dir: Directory to store review files
    """
    # Resolve reviews directory relative to src/app
    self.reviews_dir = resolve_app_path(reviews_dir)
    self.reviews_dir.mkdir(parents=True, exist_ok=True)
get_latest_review(paper_id)

Get the most recent review file for a paper.

Parameters:

Name Type Description Default
paper_id str

Paper identifier

required

Returns:

Name Type Description
str str | None

Path to latest review file, or None if not found

Source code in src/app/data_utils/review_persistence.py
def get_latest_review(self, paper_id: str) -> str | None:
    """Get the most recent review file for a paper.

    Args:
        paper_id: Paper identifier

    Returns:
        str: Path to latest review file, or None if not found
    """
    reviews = self.list_reviews(paper_id)
    if not reviews:
        return None

    # Sort by timestamp in filename (newest first)
    reviews.sort(reverse=True)
    return reviews[0]
list_reviews(paper_id=None)

List available review files.

Parameters:

Name Type Description Default
paper_id str | None

Optional filter by paper ID

None

Returns:

Name Type Description
list list[str]

Paths to matching review files

Source code in src/app/data_utils/review_persistence.py
def list_reviews(self, paper_id: str | None = None) -> list[str]:
    """List available review files.

    Args:
        paper_id: Optional filter by paper ID

    Returns:
        list: Paths to matching review files
    """
    pattern = f"{paper_id}_*.json" if paper_id else "*.json"
    return [str(p) for p in self.reviews_dir.glob(pattern)]
load_review(filepath)

Load a review from file.

Parameters:

Name Type Description Default
filepath str

Path to the review file

required

Returns:

Name Type Description
tuple tuple[str, PeerReadReview]

(paper_id, PeerReadReview object)

Source code in src/app/data_utils/review_persistence.py
def load_review(self, filepath: str) -> tuple[str, PeerReadReview]:
    """Load a review from file.

    Args:
        filepath: Path to the review file

    Returns:
        tuple: (paper_id, PeerReadReview object)
    """
    with open(filepath, encoding="utf-8") as f:
        review_data = json.load(f)

    paper_id = review_data["paper_id"]
    review = PeerReadReview.model_validate(review_data["review"])

    return paper_id, review
save_review(paper_id, review, timestamp=None)

Save a review to the reviews directory.

Parameters:

Name Type Description Default
paper_id str

Unique identifier for the paper

required
review PeerReadReview

The generated review object

required
timestamp str | None

Optional timestamp, defaults to current UTC time

None

Returns:

Name Type Description
str str

Path to the saved review file

Source code in src/app/data_utils/review_persistence.py
def save_review(
    self, paper_id: str, review: PeerReadReview, timestamp: str | None = None
) -> str:
    """Save a review to the reviews directory.

    Args:
        paper_id: Unique identifier for the paper
        review: The generated review object
        timestamp: Optional timestamp, defaults to current UTC time

    Returns:
        str: Path to the saved review file
    """
    if timestamp is None:
        timestamp = datetime.now(UTC).strftime("%Y-%m-%dT%H-%M-%SZ")

    filename = f"{paper_id}_{timestamp}.json"
    filepath = self.reviews_dir / filename

    # Convert review to dict for JSON serialization
    review_data = {
        "paper_id": paper_id,
        "timestamp": timestamp,
        "review": review.model_dump(),
    }

    with open(filepath, "w", encoding="utf-8") as f:
        json.dump(review_data, f, indent=2, ensure_ascii=False)

    return str(filepath)

app.evals.metrics

Functions

output_similarity(agent_output, expected_answer)

Determine whether the agent’s output matches the expected answer (exact match after stripping surrounding whitespace).

Parameters:

Name Type Description Default
agent_output str

The output produced by the agent.

required
expected_answer str

The correct or expected answer.

required

Returns:

Name Type Description
bool bool

True if the output matches the expected answer, False otherwise.

Source code in src/app/evals/metrics.py
def output_similarity(agent_output: str, expected_answer: str) -> bool:
    """
    Determine whether the agent's output matches the expected answer
    (exact match after stripping surrounding whitespace).

    Args:
        agent_output (str): The output produced by the agent.
        expected_answer (str): The correct or expected answer.

    Returns:
        bool: True if the output matches the expected answer, False otherwise.
    """

    # TODO score instead of bool
    return agent_output.strip() == expected_answer.strip()
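
A tiny illustration of the current exact-match behaviour (inputs are made up):

from app.evals.metrics import output_similarity

print(output_similarity("  42  ", "42"))     # True: surrounding whitespace is stripped
print(output_similarity("forty-two", "42"))  # False: no partial credit yet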

time_taken(start_time, end_time)

Calculate duration between start and end timestamps

Parameters:

Name Type Description Default
start_time float

Timestamp when execution started

required
end_time float

Timestamp when execution completed

required

Returns:

Type Description
float

Duration in seconds with microsecond precision

Source code in src/app/evals/metrics.py
def time_taken(start_time: float, end_time: float) -> float:
    """Calculate duration between start and end timestamps

    Args:
        start_time: Timestamp when execution started
        end_time: Timestamp when execution completed

    Returns:
        Duration in seconds with microsecond precision
    """

    # TODO implement
    return end_time - start_time
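
A minimal timing sketch; perf_counter is one reasonable choice for the two timestamps:

from time import perf_counter

from app.evals.metrics import time_taken

start = perf_counter()
# ... run the agent or any other workload here ...
end = perf_counter()
print(f"Duration: {time_taken(start, end):.6f} s")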

app.evals.peerread_evaluation

PeerRead evaluation utilities for comparing agent reviews against ground truth.

This module provides functionality to evaluate agent-generated scientific paper reviews against the peer reviews in the PeerRead dataset. It includes similarity metrics and structured comparison results.

Functions

calculate_cosine_similarity(text1, text2)

Calculate cosine similarity between two text strings.

Parameters:

Name Type Description Default
text1 str

First text string.

required
text2 str

Second text string.

required

Returns:

Type Description
float

Cosine similarity score (0-1).

Source code in src/app/evals/peerread_evaluation.py
def calculate_cosine_similarity(text1: str, text2: str) -> float:
    """Calculate cosine similarity between two text strings.

    Args:
        text1: First text string.
        text2: Second text string.

    Returns:
        Cosine similarity score (0-1).
    """
    # Simple implementation using word overlap
    # In production, use proper embeddings or TF-IDF
    words1 = set(re.findall(r"\w+", text1.lower()))
    words2 = set(re.findall(r"\w+", text2.lower()))

    if not words1 or not words2:
        return 0.0

    intersection = len(words1 & words2)
    union = len(words1 | words2)

    if union == 0:
        return 0.0

    return intersection / union
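
A short sketch (texts are made up). As the code's own comment notes, this is a word-overlap approximation rather than a true cosine over embeddings, so paraphrases with little shared vocabulary score low:

from app.evals.peerread_evaluation import calculate_cosine_similarity

a = "The paper presents a novel approach to dependency parsing"
b = "A novel dependency parsing approach is presented in this paper"
print(f"{calculate_cosine_similarity(a, b):.2f}")  # word-overlap score in [0, 1]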

calculate_jaccard_similarity(text1, text2)

Calculate Jaccard similarity between two text strings.

Parameters:

Name Type Description Default
text1 str

First text string.

required
text2 str

Second text string.

required

Returns:

Type Description
float

Jaccard similarity score (0-1).

Source code in src/app/evals/peerread_evaluation.py
def calculate_jaccard_similarity(text1: str, text2: str) -> float:
    """Calculate Jaccard similarity between two text strings.

    Args:
        text1: First text string.
        text2: Second text string.

    Returns:
        Jaccard similarity score (0-1).
    """
    words1 = set(re.findall(r"\w+", text1.lower()))
    words2 = set(re.findall(r"\w+", text2.lower()))

    if not words1 and not words2:
        return 1.0

    intersection = len(words1 & words2)
    union = len(words1 | words2)

    return intersection / union if union > 0 else 0.0
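
Example (a small sketch comparing the two lexical metrics on short strings; the module path app.evals.peerread_evaluation is assumed):

from app.evals.peerread_evaluation import (
    calculate_cosine_similarity,
    calculate_jaccard_similarity,
)

a = "The paper presents a novel evaluation framework."
b = "A novel framework for evaluation is presented in the paper."

# Both scores are in [0, 1] and are based on word overlap, not semantics.
print(calculate_cosine_similarity(a, b))
print(calculate_jaccard_similarity(a, b))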

create_evaluation_result(paper_id, agent_review, ground_truth_reviews)

Create evaluation result comparing agent review to ground truth.

Parameters:

Name Type Description Default
paper_id str

Paper identifier.

required
agent_review str

Review generated by agent.

required
ground_truth_reviews list[PeerReadReview]

Original peer reviews.

required

Returns:

Type Description
PeerReadEvalResult

PeerReadEvalResult with similarity metrics.

Source code in src/app/evals/peerread_evaluation.py
def create_evaluation_result(
    paper_id: str,
    agent_review: str,
    ground_truth_reviews: list[PeerReadReview],
) -> PeerReadEvalResult:
    """Create evaluation result comparing agent review to ground truth.

    Args:
        paper_id: Paper identifier.
        agent_review: Review generated by agent.
        ground_truth_reviews: Original peer reviews.

    Returns:
        PeerReadEvalResult with similarity metrics.
    """
    # Calculate similarity against all ground truth reviews
    similarities: list[float] = []
    for gt_review in ground_truth_reviews:
        sim = evaluate_review_similarity(agent_review, gt_review.comments)
        similarities.append(sim)

    overall_similarity = max(similarities) if similarities else 0.0

    # Simple recommendation matching (could be more sophisticated)
    agent_sentiment = "positive" if "good" in agent_review.lower() else "negative"
    gt_recommendations = [float(r.recommendation) for r in ground_truth_reviews]

    if len(gt_recommendations) == 0:
        # No ground truth to compare - default to False
        recommendation_match = False
    else:
        avg_gt_recommendation = sum(gt_recommendations) / len(gt_recommendations)
        recommendation_match = (
            agent_sentiment == "positive" and avg_gt_recommendation >= 3.0
        ) or (agent_sentiment == "negative" and avg_gt_recommendation < 3.0)

    return PeerReadEvalResult(
        paper_id=paper_id,
        agent_review=agent_review,
        ground_truth_reviews=ground_truth_reviews,
        similarity_scores={
            "cosine": max(
                [
                    calculate_cosine_similarity(agent_review, r.comments)
                    for r in ground_truth_reviews
                ],
                default=0.0,
            ),
            "jaccard": max(
                [
                    calculate_jaccard_similarity(agent_review, r.comments)
                    for r in ground_truth_reviews
                ],
                default=0.0,
            ),
        },
        overall_similarity=overall_similarity,
        recommendation_match=recommendation_match,
    )

evaluate_review_similarity(agent_review, ground_truth)

Evaluate similarity between agent review and ground truth.

Parameters:

Name Type Description Default
agent_review str

Review text generated by agent.

required
ground_truth str

Ground truth review text.

required

Returns:

Type Description
float

Weighted similarity score (0-1).

Source code in src/app/evals/peerread_evaluation.py
def evaluate_review_similarity(agent_review: str, ground_truth: str) -> float:
    """Evaluate similarity between agent review and ground truth.

    Args:
        agent_review: Review text generated by agent.
        ground_truth: Ground truth review text.

    Returns:
        Weighted similarity score (0-1).
    """
    # Simple implementation - in production, use semantic embeddings
    cosine_sim = calculate_cosine_similarity(agent_review, ground_truth)
    jaccard_sim = calculate_jaccard_similarity(agent_review, ground_truth)

    # Weighted combination (weights from config)
    config = load_peerread_config()
    cosine_weight = config.similarity_metrics["cosine_weight"]
    jaccard_weight = config.similarity_metrics["jaccard_weight"]

    # For now, use only cosine and jaccard (semantic would require embeddings)
    total_weight = cosine_weight + jaccard_weight

    return (cosine_sim * cosine_weight + jaccard_sim * jaccard_weight) / total_weight

app.utils.error_messages

Error message utilities for the Agents-eval application.

This module provides concise helper functions for generating standardized error messages related to configuration loading and validation.

Functions

api_connection_error(error)

Generate an error message for an API connection error.

Source code in src/app/utils/error_messages.py
def api_connection_error(error: str) -> str:
    """
    Generate an error message for an API connection error.
    """
    return f"API connection error: {error}"

failed_to_load_config(error)

Generate an error message for a configuration loading failure.

Source code in src/app/utils/error_messages.py
def failed_to_load_config(error: str) -> str:
    """
    Generate an error message for a configuration loading failure.
    """
    return f"Failed to load config: {error}"

file_not_found(file_path)

Generate an error message for a missing configuration file.

Source code in src/app/utils/error_messages.py
def file_not_found(file_path: str | Path) -> str:
    """
    Generate an error message for a missing configuration file.
    """
    return f"File not found: {file_path}"

generic_exception(error)

Generate a generic error message.

Source code in src/app/utils/error_messages.py
def generic_exception(error: str) -> str:
    """
    Generate a generic error message.
    """
    return f"Exception: {error}"

get_key_error(error)

Generate an error message for a key error.

Source code in src/app/utils/error_messages.py
def get_key_error(error: str) -> str:
    """
    Generate an error message for a key error.
    """
    return f"Key Error: {error}"

invalid_data_model_format(error)

Generate an error message for invalid pydantic data model format.

Source code in src/app/utils/error_messages.py
def invalid_data_model_format(error: str) -> str:
    """
    Generate an error message for invalid pydantic data model format.
    """
    return f"Invalid pydantic data model format: {error}"

invalid_json(error)

Generate an error message for invalid JSON in a configuration file.

Source code in src/app/utils/error_messages.py
def invalid_json(error: str) -> str:
    """
    Generate an error message for invalid JSON in a configuration file.
    """
    return f"Invalid JSON: {error}"

invalid_type(expected_type, actual_type)

Generate an error message for invalid Type.

Source code in src/app/utils/error_messages.py
def invalid_type(expected_type: str, actual_type: str) -> str:
    """
    Generate an error message for invalid Type.
    """
    return f"Type Error: Expected {expected_type}, got {actual_type} instead."
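
Example (these helpers only format strings; the pattern below is a hypothetical illustration of logging the message before raising):

from pathlib import Path

from app.utils.error_messages import file_not_found

config_path = Path("does/not/exist.json")  # hypothetical path
if not config_path.exists():
    msg = file_not_found(config_path)
    print(msg)  # in application code this is logger.error(msg) before raising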

app.utils.load_configs

Configuration loading utilities.

Provides a generic function for loading and validating JSON configuration files against Pydantic models, with error handling and logging support.

Functions

load_config(config_path, data_model)

Generic configuration loader that validates against any Pydantic model.

Parameters:

Name Type Description Default
config_path str | Path

Path to the JSON configuration file

required
data_model type[BaseModel]

Pydantic model class for validation

required

Returns:

Type Description
BaseModel

Validated configuration instance

Source code in src/app/utils/load_configs.py
def load_config(config_path: str | Path, data_model: type[BaseModel]) -> BaseModel:
    """
    Generic configuration loader that validates against any Pydantic model.

    Args:
        config_path: Path to the JSON configuration file
        data_model: Pydantic model class for validation

    Returns:
        Validated configuration instance
    """

    try:
        with open(config_path, encoding="utf-8") as f:
            data = json.load(f)
        return data_model.model_validate(data)
    except FileNotFoundError as e:
        msg = file_not_found(config_path)
        logger.error(msg)
        raise FileNotFoundError(msg) from e
    except json.JSONDecodeError as e:
        msg = invalid_json(str(e))
        logger.error(msg)
        raise ValueError(msg) from e
    except ValidationError as e:
        msg = invalid_data_model_format(str(e))
        logger.error(msg)
        # Re-raise the original error; ValidationError cannot be
        # constructed from a plain message string.
        raise
    except Exception as e:
        msg = failed_to_load_config(str(e))
        logger.exception(msg)
        raise Exception(msg) from e
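
Example (a hedged sketch with a small hypothetical Pydantic model and JSON file; the real call sites pass the application's own config models and paths):

import json
from pathlib import Path

from pydantic import BaseModel

from app.utils.load_configs import load_config


class DemoConfig(BaseModel):
    name: str
    retries: int = 3


# Hypothetical file written only for this demonstration.
demo_path = Path("demo_config.json")
demo_path.write_text(json.dumps({"name": "example"}), encoding="utf-8")

cfg = load_config(demo_path, DemoConfig)
assert isinstance(cfg, DemoConfig) and cfg.retries == 3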

app.utils.load_settings

Utility functions and classes for loading application settings and configuration.

This module defines the AppEnv class for managing environment variables using Pydantic, and provides a function to load and validate application configuration from a JSON file.

Classes

AppEnv

Bases: BaseSettings

Application environment settings loaded from environment variables or .env file.

This class uses Pydantic’s BaseSettings to manage API keys and configuration for various inference endpoints, tools, and logging/monitoring services. Environment variables are loaded from a .env file by default.

Source code in src/app/utils/load_settings.py
class AppEnv(BaseSettings):
    """
    Application environment settings loaded from environment variables or .env file.

    This class uses Pydantic's BaseSettings to manage API keys and configuration
    for various inference endpoints, tools, and logging/monitoring services.
    Environment variables are loaded from a .env file by default.
    """

    # Inference endpoints
    GEMINI_API_KEY: str = ""
    GITHUB_API_KEY: str = ""
    GROK_API_KEY: str = ""
    HUGGINGFACE_API_KEY: str = ""
    OPENROUTER_API_KEY: str = ""
    PERPLEXITY_API_KEY: str = ""
    RESTACK_API_KEY: str = ""
    TOGETHER_API_KEY: str = ""

    # Tools
    TAVILY_API_KEY: str = ""

    # Logging/Monitoring/Tracing
    AGENTOPS_API_KEY: str = ""
    LOGFIRE_TOKEN: str = ""
    WANDB_API_KEY: str = ""

    model_config = SettingsConfigDict(
        env_file=".env", env_file_encoding="utf-8", extra="ignore"
    )
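
Example (a minimal sketch; values come from the process environment or a local .env file, so keys may be empty strings if nothing is configured):

from app.utils.load_settings import AppEnv

env = AppEnv()  # reads .env if present, otherwise falls back to the environment
if env.GEMINI_API_KEY:
    print("Gemini key configured")
else:
    print("No Gemini key set")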

Functions

load_config(config_path)

Load and validate application configuration from a JSON file.

Parameters:

Name Type Description Default
config_path str | Path

Path to the JSON configuration file.

required

Returns:

Name Type Description
ChatConfig ChatConfig

An instance of ChatConfig with validated configuration data.

Raises:

Type Description
FileNotFoundError

If the configuration file does not exist.

JSONDecodeError

If the file contains invalid JSON.

Exception

For any other unexpected errors during loading or validation.

Source code in src/app/utils/load_settings.py
def load_config(config_path: str | Path) -> ChatConfig:
    """
    Load and validate application configuration from a JSON file.

    Args:
        config_path (str | Path): Path to the JSON configuration file.

    Returns:
        ChatConfig: An instance of ChatConfig with validated configuration data.

    Raises:
        FileNotFoundError: If the configuration file does not exist.
        json.JSONDecodeError: If the file contains invalid JSON.
        Exception: For any other unexpected errors during loading or validation.
    """

    try:
        with open(config_path) as f:
            config_data = json.load(f)
    except FileNotFoundError as e:
        msg = file_not_found(config_path)
        logger.error(msg)
        raise FileNotFoundError(msg) from e
    except json.JSONDecodeError as e:
        msg = invalid_json(str(e))
        logger.error(msg)
        raise json.JSONDecodeError(msg, str(config_path), 0) from e
    except Exception as e:
        msg = failed_to_load_config(str(e))
        logger.exception(msg)
        raise Exception(msg) from e

    return ChatConfig.model_validate(config_data)
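
Example (a hedged sketch combining this loader with the path helpers documented under app.utils.paths; it assumes config_chat.json exists in src/app/config):

from app.utils.load_settings import load_config
from app.utils.paths import resolve_config_path

chat_config = load_config(resolve_config_path("config_chat.json"))
print(list(chat_config.providers.keys()))  # configured inference providers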

app.utils.log

Set up the logger with custom settings. Logs are written to a file with automatic rotation.

app.utils.login

This module provides utility functions for managing login state and initializing the environment for a given project. It includes functionality to load and save login state, perform a one-time login, and check if the user is logged in.

Classes

Functions

login(project_name, chat_env_config)

Logs in to the workspace and initializes the environment for the given project. Args: project_name (str): The name of the project to initialize. chat_env_config (AppEnv): The application environment configuration containing the API keys. Returns: None

Source code in src/app/utils/login.py
def login(project_name: str, chat_env_config: AppEnv):
    """
    Logs in to the workspace and initializes the environment for the given project.
    Args:
        project_name (str): The name of the project to initialize.
        chat_env_config (AppEnv): The application environment configuration
            containing the API keys.
    Returns:
        None
    """

    try:
        logger.info(f"Logging in to the workspaces for project: {project_name}")
        is_api_key, api_key_msg = get_api_key("AGENTOPS", chat_env_config)
        if is_api_key:
            # TODO agentops log to local file
            environ["AGENTOPS_LOGGING_TO_FILE"] = "FALSE"
            agentops_init(
                default_tags=[project_name],
                api_key=api_key_msg,
            )
        is_api_key, api_key_msg = get_api_key("LOGFIRE", chat_env_config)
        if is_api_key:
            logfire_conf(token=api_key_msg)
        is_api_key, api_key_msg = get_api_key("WANDB", chat_env_config)
        if is_api_key:
            wandb_login(key=api_key_msg)
            weave_init(project_name)
    except Exception as e:
        msg = generic_exception(str(e))
        logger.exception(e)
        raise Exception(msg) from e
    finally:
        api_key_msg = ""

app.utils.paths

Centralized path resolution utilities for the application.

Functions

get_app_root()

Get the application root directory (src/app).

Returns:

Name Type Description
Path Path

Absolute path to the src/app directory.

Source code in src/app/utils/paths.py
def get_app_root() -> Path:
    """Get the application root directory (src/app).

    Returns:
        Path: Absolute path to the src/app directory.
    """

    return Path(__file__).parent.parent

get_config_dir()

Get the application config directory (src/app/config).

Returns:

Name Type Description
Path Path

Absolute path to the src/app/config directory.

Source code in src/app/utils/paths.py
def get_config_dir() -> Path:
    """Get the application config directory (src/app/config).

    Returns:
        Path: Absolute path to the src/app/config directory.
    """
    return get_app_root() / CONFIGS_PATH

get_project_root()

Get the project root directory.

Returns:

Name Type Description
Path Path

Absolute path to the project root directory.

Source code in src/app/utils/paths.py
def get_project_root() -> Path:
    """Get the project root directory.

    Returns:
        Path: Absolute path to the project root directory.
    """
    return get_app_root().parent.parent

get_review_template_path()

Get the path to the review template file.

Returns:

Name Type Description
Path Path

Absolute path to the REVIEW_PROMPT_TEMPLATE file.

Source code in src/app/utils/paths.py
def get_review_template_path() -> Path:
    """Get the path to the review template file.

    Returns:
        Path: Absolute path to the REVIEW_PROMPT_TEMPLATE file.
    """
    return get_config_dir() / REVIEW_PROMPT_TEMPLATE

resolve_app_path(relative_path)

Resolve a path relative to the application root.

Parameters:

Name Type Description Default
relative_path str

Path relative to src/app directory.

required

Returns:

Name Type Description
Path Path

Absolute path resolved from the application root.

Example

resolve_app_path(“datasets/peerread”) -> /full/path/to/src/app/datasets/peerread

Source code in src/app/utils/paths.py
def resolve_app_path(relative_path: str) -> Path:
    """Resolve a path relative to the application root.

    Args:
        relative_path: Path relative to src/app directory.

    Returns:
        Path: Absolute path resolved from the application root.

    Example:
        resolve_app_path("datasets/peerread") -> /full/path/to/src/app/datasets/peerread
    """

    return get_app_root() / relative_path

resolve_config_path(filename)

Resolve a config file path within the config directory.

Parameters:

Name Type Description Default
filename str

Name of the config file (e.g., “config_chat.json”).

required

Returns:

Name Type Description
Path Path

Absolute path to the config file.

Example

resolve_config_path(“config_chat.json”) -> /full/path/to/src/app/config/config_chat.json

Source code in src/app/utils/paths.py
def resolve_config_path(filename: str) -> Path:
    """Resolve a config file path within the config directory.

    Args:
        filename: Name of the config file (e.g., "config_chat.json").

    Returns:
        Path: Absolute path to the config file.

    Example:
        resolve_config_path("config_chat.json") ->
        /full/path/to/src/app/config/config_chat.json
    """
    return get_config_dir() / filename

resolve_project_path(relative_path)

Resolve a path relative to the project root.

Parameters:

Name Type Description Default
relative_path str

Path relative to the project root directory.

required

Returns:

Name Type Description
Path Path

Absolute path resolved from the project root.

Source code in src/app/utils/paths.py
def resolve_project_path(relative_path: str) -> Path:
    """Resolve a path relative to the project root.

    Args:
        relative_path: Path relative to the project root directory.

    Returns:
        Path: Absolute path resolved from the project root.
    """
    return get_project_root() / relative_path
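
Example (a short sketch of how the helpers compose; the absolute prefixes depend on where the repository is checked out):

from app.utils.paths import (
    get_app_root,
    get_config_dir,
    resolve_app_path,
    resolve_config_path,
)

print(get_app_root())                          # .../src/app
print(get_config_dir())                        # .../src/app/config
print(resolve_app_path("datasets/peerread"))   # .../src/app/datasets/peerread
print(resolve_config_path("config_chat.json")) # .../src/app/config/config_chat.json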

app.utils.utils

This module provides utility functions and context managers for handling configurations, error handling, and setting up agent environments.

Functions:

Name Description
load_config

Load and validate configuration from a JSON file.

print_research_Result

Output structured summary of the research topic.

error_handling_context

Context manager for handling errors during operations.

setup_agent_env

Set up the agent environment based on the provided configuration.

Classes

Functions

log_research_result(summary, usage)

Logs the research summary and usage details in a formatted manner.

Parameters:

Name Type Description Default
summary ResearchSummary

The research summary with fields 'topic', 'key_points', 'key_points_explanation', and 'conclusion'.

required
usage Usage

An object containing usage details to be printed.

required
Source code in src/app/utils/utils.py
def log_research_result(summary: ResearchSummary, usage: Usage) -> None:
    """
    Logs the research summary and usage details in a formatted manner.

    Args:
        summary (ResearchSummary): The research summary with fields 'topic',
            'key_points', 'key_points_explanation', and 'conclusion'.
        usage (Usage): An object containing usage details to be printed.
    """

    logger.info(f"\n=== Research Summary: {summary.topic} ===")
    logger.info("\nKey Points:")
    for i, point in enumerate(summary.key_points, 1):
        logger.info(f"{i}. {point}")
    logger.info("\nKey Points Explanation:")
    for i, point in enumerate(summary.key_points_explanation, 1):
        logger.info(f"{i}. {point}")
    logger.info(f"\nConclusion: {summary.conclusion}")
    logger.info(f"\nResponse structure: {list(dict(summary).keys())}")
    logger.info(usage)

examples.run_simple_agent_no_tools

A simple example of using a Pydantic AI agent to generate a structured summary of a research topic.

Functions

main()

Main function to run the research agent.

Source code in src/examples/run_simple_agent_no_tools.py
def main():
    """Main function to run the research agent."""

    config_path = path.join(path.dirname(__file__), CONFIG_FILE)
    config = load_config(config_path)

    provider = input("Which inference provider to use? ")
    topic = input("What topic would you like to research? ")

    api_key = get_api_key(provider)
    provider_config = get_provider_config(provider, config)

    result = get_research(topic, config.prompts, provider, provider_config, api_key)
    print_research_Result(result.data, result.usage())

examples.run_simple_agent_system

This example demonstrates how to run a simple agent system that consists of a manager agent, a research agent, and an analysis agent. The manager agent delegates research and analysis tasks to the corresponding agents and combines the results to provide a comprehensive answer to the user query. https://ai.pydantic.dev/multi-agent-applications/#agent-delegation

Classes

Functions

get_manager(model_manager, model_researcher, model_analyst, prompts)

Get the agents for the system.

Source code in src/examples/run_simple_agent_system.py
def get_manager(
    model_manager: OpenAIModel,
    model_researcher: OpenAIModel,
    model_analyst: OpenAIModel,
    prompts: dict[str, str],
) -> SystemAgent:
    """Get the agents for the system."""
    researcher = SystemAgent(
        model_researcher,
        ResearchResult,
        prompts["system_prompt_researcher"],
        [duckduckgo_search_tool()],
    )
    analyst = SystemAgent(
        model_analyst, AnalysisResult, prompts["system_prompt_analyst"]
    )
    manager = SystemAgent(
        model_manager, ResearchResult, prompts["system_prompt_manager"]
    )
    add_tools_to_manager_agent(manager, researcher, analyst)
    return manager

get_models(model_config)

Get the models for the system agents.

Source code in src/examples/run_simple_agent_system.py
def get_models(model_config: dict) -> tuple[OpenAIModel, OpenAIModel, OpenAIModel]:
    """Get the models for the system agents."""
    model_researcher = create_model(**model_config)
    model_analyst = create_model(**model_config)
    model_manager = create_model(**model_config)
    # Return in the order expected by get_manager(model_manager, model_researcher, model_analyst, ...)
    return model_manager, model_researcher, model_analyst

main() async

Main function to run the research system.

Source code in src/examples/run_simple_agent_system.py
async def main():
    """Main function to run the research system."""

    provider = input("Which inference provider to use? ")
    query = input("What would you like to research? ")

    config_path = path.join(path.dirname(__file__), CONFIG_FILE)
    config = load_config(config_path)

    api_key = get_api_key(provider)
    provider_config = get_provider_config(provider, config)
    usage_limits = UsageLimits(request_limit=10, total_tokens_limit=4000)

    model_config = {
        "base_url": provider_config["base_url"],
        "model_name": provider_config["model_name"],
        "api_key": api_key,
        "provider": provider,
    }
    manager = get_manager(*get_models(model_config), config.prompts)

    print(f"\nResearching: {query}...")

    try:
        result = await manager.run(query, usage_limits=usage_limits)
    except (UnexpectedModelBehavior, UnprocessableEntityError) as e:
        print(f"Error: Model returned unexpected result: {e}")
    except UsageLimitExceeded as e:
        print(f"Usage limit exceeded: {e}")
    else:
        print(f"\nFindings: {result.data.findings}")
        print(f"Sources: {result.data.sources}")
        print("\nUsage statistics:")
        print(result.usage())

examples.run_simple_agent_tools

Run the dice game agent using simple tools.

Functions

main()

Run the dice game agent.

Source code in src/examples/run_simple_agent_tools.py
def main():
    """Run the dice game agent."""

    provider = input("Which inference provider to use? ")
    player_name = input("Enter your name: ")
    guess = input("Guess a number between 1 and 6: ")

    config_path = path.join(path.dirname(__file__), CONFIG_FILE)
    config = load_config(config_path)

    api_key = get_api_key(provider)
    provider_config = get_provider_config(provider, config)

    result = get_dice(
        player_name, guess, system_prompt, provider, api_key, provider_config
    )
    print(result.data)
    print(f"{result._result_tool_name=}")
    print(result.usage())

examples.utils.agent_simple_no_tools

This module contains a function to create a research agent with the specified model, result type, and system prompt.

Classes

Functions

get_research(topic, prompts, provider, provider_config, api_key)

Run the research agent to generate a structured summary of a research topic.

Source code in src/examples/utils/agent_simple_no_tools.py
def get_research(
    topic: str,
    prompts: dict[str, str],
    provider: str,
    provider_config: Config,
    api_key: str,
) -> AgentRunResult:
    """Run the research agent to generate a structured summary of a research topic."""

    model = create_model(
        provider_config["base_url"], provider_config["model_name"], api_key, provider
    )
    agent = _create_research_agent(model, ResearchSummary, prompts["system_prompt"])

    print(f"\nResearching {topic}...")
    try:
        result = agent.run_sync(f"{prompts['user_prompt']} {topic}")
    except APIConnectionError as e:
        print(f"Error connecting to API: {e}")
        exit()
    except Exception as e:
        print(f"Unexpected error: {e}")
        exit()
    else:
        return result

examples.utils.agent_simple_system

This module contains a simple system of agents that can be used to research and analyze data.

Classes

SystemAgent

Bases: Agent

A generic system agent that can be used to research and analyze data.

Source code in src/examples/utils/agent_simple_system.py
class SystemAgent(Agent):
    """A generic system agent that can be used to research and analyze data."""

    def __init__(
        self,
        model: OpenAIModel,
        result_type: type[ResearchResult] | type[AnalysisResult],
        system_prompt: str,
        result_retries: int = 3,
        tools: list | None = None,
    ):
        super().__init__(
            model,
            result_type=result_type,
            system_prompt=system_prompt,
            result_retries=result_retries,
            tools=tools or [],
        )

Functions

add_tools_to_manager_agent(manager_agent, research_agent, analysis_agent)

Add research and analysis delegation tools to the manager agent.

Source code in src/examples/utils/agent_simple_system.py
def add_tools_to_manager_agent(
    manager_agent: SystemAgent, research_agent: SystemAgent, analysis_agent: SystemAgent
) -> None:
    """Add research and analysis delegation tools to the manager agent."""

    @manager_agent.tool
    async def delegate_research(ctx: RunContext[None], query: str) -> ResearchResult:
        """Delegate research task to ResearchAgent."""
        result = await research_agent.run(query, usage=ctx.usage)
        return result.data

    @manager_agent.tool
    async def delegate_analysis(ctx: RunContext[None], data: str) -> AnalysisResult:
        """Delegate analysis task to AnalysisAgent."""
        result = await analysis_agent.run(data, usage=ctx.usage)
        return result.data

examples.utils.agent_simple_tools

Simple agent for the dice game example.

Functions

get_dice(player_name, guess, system_prompt, provider, api_key, config)

Run the dice game agent.

Source code in src/examples/utils/agent_simple_tools.py
def get_dice(
    player_name: str,
    guess: str,
    system_prompt: str,
    provider: str,
    api_key: str,
    config: dict,
) -> AgentRunResult:
    """Run the dice game agent."""

    model = create_model(config["base_url"], config["model_name"], api_key, provider)
    agent = _DiceGameAgent(model, system_prompt)

    try:
        # usage_limits=UsageLimits(request_limit=5, total_tokens_limit=300),
        result = agent.run_sync(f"Player is guessing {guess}...", deps=player_name)
    except APIConnectionError as e:
        print(f"Error connecting to API: {e}")
        exit()
    except Exception as e:
        print(f"Unexpected error: {e}")
        exit()
    else:
        return result

examples.utils.data_models

Example of a module with data models

Classes

AnalysisResult

Bases: BaseModel

Analysis results from the analysis agent.

Source code in src/examples/utils/data_models.py
class AnalysisResult(BaseModel):
    """Analysis results from the analysis agent."""

    insights: list[str]
    recommendations: list[str]

Config

Bases: BaseModel

Configuration settings for the research agent and model providers

Source code in src/examples/utils/data_models.py
class Config(BaseModel):
    """Configuration settings for the research agent and model providers"""

    providers: dict[str, ProviderConfig]
    prompts: dict[str, str]

ProviderConfig

Bases: BaseModel

Configuration for a model provider

Source code in src/examples/utils/data_models.py
class ProviderConfig(BaseModel):
    """Configuration for a model provider"""

    model_name: str
    base_url: str

ResearchResult

Bases: BaseModel

Research results from the research agent.

Source code in src/examples/utils/data_models.py
class ResearchResult(BaseModel):
    """Research results from the research agent."""

    topic: str
    findings: list[str]
    sources: list[str]

ResearchSummary

Bases: BaseModel

Expected model response of research on a topic

Source code in src/examples/utils/data_models.py
class ResearchSummary(BaseModel):
    """Expected model response of research on a topic"""

    topic: str
    key_points: list[str]
    key_points_explanation: list[str]
    conclusion: str

examples.utils.tools

Example tools for the utils example.

Functions

get_player_name(ctx)

Get the player’s name from the context.

Source code in src/examples/utils/tools.py
def get_player_name(ctx: RunContext[str]) -> str:
    """Get the player's name from the context."""
    return ctx.deps

roll_die()

Tool to roll a die.

Source code in src/examples/utils/tools.py
def roll_die() -> str:
    """Tool to roll a die."""

    # Roll a six-sided die and return the result as a string.
    return str(randint(1, 6))

examples.utils.utils

Utility functions for running the research agent example.

Classes

Functions

create_model(base_url, model_name, api_key=None, provider=None)

Create a model that uses base_url as inference API

Source code in src/examples/utils/utils.py
def create_model(
    base_url: str,
    model_name: str,
    api_key: str | None = None,
    provider: str | None = None,
) -> OpenAIModel:
    """Create a model that uses base_url as inference API"""

    # Ollama runs locally and does not need an API key; all other providers do.
    if api_key is None and (provider is None or provider.lower() != "ollama"):
        raise ValueError("API key is required for model.")
    return OpenAIModel(
        model_name, provider=OpenAIProvider(base_url=base_url, api_key=api_key)
    )
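
Example (a hedged sketch for a local Ollama endpoint, where no API key is required; the base URL and model name below are assumptions, not values from this project):

from examples.utils.utils import create_model

model = create_model(
    base_url="http://localhost:11434/v1",  # assumed default Ollama OpenAI-compatible endpoint
    model_name="llama3.1",                 # hypothetical local model name
    api_key=None,
    provider="ollama",
)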

get_api_key(provider)

Retrieve API key from environment variable.

Source code in src/examples/utils/utils.py
def get_api_key(provider: str) -> str | None:
    """Retrieve API key from environment variable."""

    # TODO replace with pydantic-settings ?
    load_dotenv()

    if provider.lower() == "ollama":
        return None
    else:
        return getenv(f"{provider.upper()}{API_SUFFIX}")

get_provider_config(provider, config)

Retrieve configuration settings for the specified provider.

Source code in src/examples/utils/utils.py
def get_provider_config(provider: str, config: Config) -> dict[str, str]:
    """Retrieve configuration settings for the specified provider."""

    try:
        model_name = config.providers[provider].model_name
        base_url = config.providers[provider].base_url
    except KeyError as e:
        raise ValueError(f"Missing configuration for {provider}: {e}.") from e
    except Exception as e:
        raise Exception(f"Error loading provider configuration: {e}") from e
    else:
        return {
            "model_name": model_name,
            "base_url": base_url,
        }

load_config(config_path)

Load and validate configuration from a JSON file.

Source code in src/examples/utils/utils.py
def load_config(config_path: str) -> Config:
    """Load and validate configuration from a JSON file."""

    try:
        with open(config_path) as file:
            config_data = load(file)
        config = Config.model_validate(config_data)
    except FileNotFoundError as e:
        raise FileNotFoundError(f"Configuration file not found: {config_path}") from e
    except ValidationError as e:
        raise ValueError(f"Invalid configuration format: {e}") from e
    except Exception as e:
        raise Exception(f"Error loading configuration: {e}") from e
    else:
        return config

print_research_Result(summary, usage)

Output structured summary of the research topic.

Source code in src/examples/utils/utils.py
def print_research_Result(summary: dict, usage: Usage) -> None:
    """Output structured summary of the research topic."""

    print(f"\n=== Research Summary: {summary.topic} ===")
    print("\nKey Points:")
    for i, point in enumerate(summary.key_points, 1):
        print(f"{i}. {point}")
    print("\nKey Points Explanation:")
    for i, point in enumerate(summary.key_points_explanation, 1):
        print(f"{i}. {point}")
    print(f"\nConclusion: {summary.conclusion}")

    print(f"\nResponse structure: {list(dict(summary).keys())}")
    print(usage)

gui.components.footer

Functions

render_footer(footer_caption)

Render the page footer.

Source code in src/gui/components/footer.py
def render_footer(footer_caption: str):
    """Render the page footer."""
    divider()
    caption(footer_caption)

gui.components.header

Functions

render_header(header_title)

Render the page header with title.

Source code in src/gui/components/header.py
def render_header(header_title: str):
    """Render the page header with title."""
    title(header_title)
    divider()

gui.components.output

Functions

render_output(result=None, info_str=None, type=None)

Renders the output in a Streamlit app based on the provided type.

Parameters:

Name Type Description Default
result Any

The content to be displayed. Can be JSON, code markdown, or plain text.

None
info_str str

The information message to be displayed if result is None.

None
type str

The type of the result content. Can be ‘json’, ‘code’, ‘md’, or other for plain text.

None

Returns:

None

Source code in src/gui/components/output.py
def render_output(
    result: Any = None, info_str: str | None = None, type: str | None = None
):
    """
    Renders the output in a Streamlit app based on the provided type.

    Args:
        result (Any, optional): The content to be displayed. Can be JSON, code
            markdown, or plain text.
        info_str (str, optional): The information message to be displayed if result is None.
        type (str, optional): The type of the result content. Can be 'json', 'code',
            'md', or other for plain text.

    Returns:
        None
    """

    if result:
        output_container = empty()
        output_container.write(result)
        # match type:
        #     case "json":
        #         json(result)
        #     case "code":
        #         code(result)
        #     case "md":
        #         markdown(result)
        #     case _:
        #         text(result)
        #         # st.write(result)
    else:
        info(info_str)

gui.components.prompts

gui.components.sidebar

gui.config.config

gui.config.styling

gui.config.text

gui.pages.home

gui.pages.prompts

Streamlit component for editing agent system prompts.

This module provides a function to render and edit prompt configurations for agent roles using a Streamlit-based UI. It validates the input configuration, displays warnings if prompts are missing, and allows interactive editing of each prompt.

Classes

Functions

render_prompts(chat_config)

Render and edit the prompt configuration for agent roles in the Streamlit UI.

Source code in src/gui/pages/prompts.py
def render_prompts(chat_config: ChatConfig | BaseModel):  # -> dict[str, str]:
    """
    Render and edit the prompt configuration for agent roles in the Streamlit UI.
    """

    header(PROMPTS_HEADER)

    if not isinstance(chat_config, ChatConfig):
        msg = invalid_type("ChatConfig", type(chat_config).__name__)
        logger.error(msg)
        error(msg)
        return None

    # updated = False
    prompts = chat_config.prompts

    if not prompts:
        warning(PROMPTS_WARNING)
        prompts = PROMPTS_DEFAULT

    updated_prompts = prompts.copy()

    # Edit prompts
    for prompt_key, prompt_value in prompts.items():
        new_value = render_prompt_editor(prompt_key, prompt_value, height=200)
        if new_value != prompt_value and new_value is not None:
            updated_prompts[prompt_key] = new_value

gui.pages.run_app

Streamlit interface for running the agentic system interactively.

This module defines the render_app function, which provides a Streamlit-based UI for users to select a provider, enter a query, and execute the main agent workflow. Results and errors are displayed in real time, supporting asynchronous execution.

Functions

render_app(provider=None, chat_config_file=None) async

Render the main app interface for running agentic queries via Streamlit.

Displays input fields for provider and query, a button to trigger execution, and an area for output or error messages. Handles async invocation of the main agent workflow and logs any exceptions.

Source code in src/gui/pages/run_app.py
async def render_app(
    provider: str | None = None, chat_config_file: str | Path | None = None
):
    """
    Render the main app interface for running agentic queries via Streamlit.

    Displays input fields for provider and query, a button to trigger execution,
    and an area for output or error messages. Handles async invocation of the
    main agent workflow and logs any exceptions.
    """

    header(RUN_APP_HEADER)
    if provider is None:
        provider = text_input(RUN_APP_PROVIDER_PLACEHOLDER)
    query = text_input(RUN_APP_QUERY_PLACEHOLDER)

    subheader(OUTPUT_SUBHEADER)
    if button(RUN_APP_BUTTON):
        if query:
            info(f"{RUN_APP_QUERY_RUN_INFO} {query}")
            try:
                result = await main(
                    chat_provider=provider,
                    query=query,
                    chat_config_file=chat_config_file,
                )
                render_output(result)
            except Exception as e:
                render_output(None)
                exception(e)
                logger.exception(e)
        else:
            warning(RUN_APP_QUERY_WARNING)
    else:
        render_output(RUN_APP_OUTPUT_PLACEHOLDER)

gui.pages.settings

Streamlit settings UI for provider and agent configuration.

This module provides a function to render and edit agent system settings, including provider selection and related options, within the Streamlit GUI. It validates the input configuration and ensures correct typing before rendering.

Classes

Functions

render_settings(chat_config)

Render and edit agent system settings in the Streamlit UI.

Displays a header and a selectbox for choosing the inference provider. Validates that the input is a ChatConfig instance and displays an error if not.

Source code in src/gui/pages/settings.py
def render_settings(chat_config: ChatConfig | BaseModel) -> str:
    """
    Render and edit agent system settings in the Streamlit UI.

    Displays a header and a selectbox for choosing the inference provider.
    Validates that the input is a ChatConfig instance and displays an error if not.
    """
    header(SETTINGS_HEADER)

    # updated = False
    # updated_config = config.copy()

    if not isinstance(chat_config, ChatConfig):
        msg = invalid_type("ChatConfig", type(chat_config).__name__)
        logger.error(msg)
        error(msg)
        return msg

    provider = selectbox(
        label=SETTINGS_PROVIDER_LABEL,
        options=chat_config.providers.keys(),
    )

    # Run options
    # col1, col2 = st.columns(2)
    # with col1:
    #     streamed_output = st.checkbox(
    #         "Stream Output", value=config.get("streamed_output", False)
    #     )
    # with col2:
    #     st.checkbox("Include Sources", value=True)  # include_sources

    # Allow adding new providers
    # new_provider = st.text_input("Add New Provider")
    # api_key = st.text_input(f"{provider} API Key", type="password")
    # if st.button("Add Provider") and new_provider and new_provider not in providers:
    #     providers.append(new_provider)
    #     updated_config["providers"] = providers
    #     updated_config["api_key"] = api_key
    #     updated = True
    #     st.success(f"Added provider: {new_provider}")

    # # Update config if changed
    # if (
    #     include_a != config.get("include_a", False)
    #     or include_b != config.get("include_b", False)
    #     or streamed_output != config.get("streamed_output", False)
    # ):
    #     updated_config["include_a"] = include_a
    #     updated_config["include_b"] = include_b
    #     updated_config["streamed_output"] = streamed_output
    #     updated = True

    return provider

run_cli

Lightweight CLI wrapper for the Agents-eval application.

This wrapper handles help and basic argument parsing quickly without loading heavy dependencies. It only imports the main application when actual processing is needed.

Functions

parse_args(argv)

Parse command line arguments into a dictionary.

This function processes a list of command-line arguments, extracting recognized options and their values. Supported arguments include flags (e.g., --help, --include-researcher) and key-value pairs (e.g., --chat-provider=ollama). If the --help flag is present, a list of available commands and their descriptions is printed, and an empty dictionary is returned.

Returns:

Type Description
dict[str, str | bool]

A dictionary mapping argument names (with leading '--' removed and hyphens replaced by underscores) to their values (str for key-value pairs, bool for flags). Returns an empty dict if --help is specified.

Example

parse_args(['--chat-provider=ollama', '--include-researcher']) returns {'chat_provider': 'ollama', 'include_researcher': True}

Source code in src/run_cli.py
def parse_args(argv: list[str]) -> dict[str, str | bool]:
    """
    Parse command line arguments into a dictionary.

    This function processes a list of command-line arguments,
    extracting recognized options and their values.
    Supported arguments include flags (e.g., `--help`, `--include-researcher`)
    and key-value pairs (e.g., `--chat-provider=ollama`).
    If the `--help` flag is present, a list of available commands and their
    descriptions is printed, and an empty dictionary is returned.

    Returns:
        `dict[str, str | bool]`: A dictionary mapping argument names
        (with leading '--' removed and hyphens replaced by underscores)
        to their values (`str` for key-value pairs, `bool` for flags).
        Returns an empty dict if `--help` is specified.

    Example:
        >>> `parse_args(['--chat-provider=ollama', '--include-researcher'])`
        returns `{'chat_provider': 'ollama', 'include_researcher': True}`
    """

    commands = {
        "--help": "Display help information",
        "--version": "Display version information",
        "--chat-provider": "Specify the chat provider to use",
        "--query": "Specify the query to process",
        "--include-researcher": "Include the researcher agent",
        "--include-analyst": "Include the analyst agent",
        "--include-synthesiser": "Include the synthesiser agent",
        "--no-stream": "Disable streaming output",
        "--chat-config-file": "Specify the path to the chat configuration file",
        "--paper-number": "Specify paper number for PeerRead review generation",
        "--download-peerread-full-only": (
            "Download all of the PeerRead dataset and exit (setup mode)"
        ),
        "--download-peerread-samples-only": (
            "Download a small sample of the PeerRead dataset and exit (setup mode)"
        ),
        "--peerread-max-papers-per-sample-download": (
            "Specify max papers to download per split, overrides sample default"
        ),
    }

    # output help and exit
    if "--help" in argv:
        print("Available commands:")
        for cmd, desc in commands.items():
            print(f"{cmd}: {desc}")
        exit(0)

    parsed_args: dict[str, str | bool] = {}

    # parse arguments for key-value pairs and flags
    for arg in argv:
        if arg.split("=", 1)[0] in commands.keys():
            key, value = arg.split("=", 1) if "=" in arg else (arg, True)
            key = key.lstrip("--").replace("-", "_")
            parsed_args[key] = value

    if parsed_args:
        logger.info(f"Used arguments: {parsed_args}")

    return parsed_args

run_gui

This module sets up and runs a Streamlit application for a Multi-Agent System.

The application includes the following components:

- Header
- Sidebar for configuration options
- Main content area for prompts
- Footer

The main function loads the configuration, renders the UI components, and handles the execution of the Multi-Agent System based on user input.

Functions:

- run_app(): Placeholder function to run the main application logic.
- main(): Main function to set up and run the Streamlit application.

Classes

Functions