diff --git a/.env.example b/.env.example index 62d11dc5..e5dee99f 100644 --- a/.env.example +++ b/.env.example @@ -109,6 +109,10 @@ ZAI_PLAN_API_KEY= TAVILY_API_KEY= JINA_API_KEY= FIRECRAWL_API_KEY= +# fastCRW (Firecrawl-compatible web scraper). CRW_API_URL defaults to the +# managed cloud; override it to point at a self-hosted server. +CRW_API_KEY= +CRW_API_URL= # Tracing backend (optional). Only one runs at a time: Langfuse takes # precedence when its keys are set, otherwise LangSmith is used. Values are diff --git a/intentkit/config/config.py b/intentkit/config/config.py index efe6479f..cd8dd33e 100644 --- a/intentkit/config/config.py +++ b/intentkit/config/config.py @@ -228,6 +228,10 @@ def __init__(self) -> None: self.jina_api_key: str | None = self.load("JINA_API_KEY") self.cookiefun_api_key: str | None = self.load("COOKIEFUN_API_KEY") self.firecrawl_api_key: str | None = self.load("FIRECRAWL_API_KEY") + # fastCRW (Firecrawl-compatible web scraper; single binary, self-host or + # cloud). CRW_API_URL overrides the base for self-hosted deployments. + self.crw_api_key: str | None = self.load("CRW_API_KEY") + self.crw_api_url: str = self.load("CRW_API_URL", "https://fastcrw.com/api") self.cryptopanic_api_key: str | None = self.load("CRYPTOPANIC_API_KEY") self.unrealspeech_api_key: str | None = self.load("UNREALSPEECH_API_KEY") self.dune_api_key: str | None = self.load("DUNE_API_KEY") diff --git a/intentkit/tools/crw/README.md b/intentkit/tools/crw/README.md new file mode 100644 index 00000000..7c8e2e29 --- /dev/null +++ b/intentkit/tools/crw/README.md @@ -0,0 +1,79 @@ +# fastCRW Tools + +The fastCRW tools provide web scraping and content indexing using [fastCRW](https://fastcrw.com), +a Firecrawl-compatible web data engine that ships as a single binary. Run it +self-hosted (free, open core) or against the managed cloud. These tools mirror +the Firecrawl provider and are additive — both providers can be enabled side by +side. + +## Tools Overview + +### 1. crw_scrape +Scrapes a single webpage and REPLACES any existing indexed content for that URL, preventing duplicates. + +**Parameters:** +- `url` (required): The URL to scrape +- `formats` (optional): Output formats - markdown, html, rawHtml, screenshot, links, json (default: ["markdown"]) +- `only_main_content` (optional): Extract only main content (default: true) +- `include_tags` (optional): HTML tags to include (e.g., ["h1", "h2", "p"]) +- `exclude_tags` (optional): HTML tags to exclude +- `wait_for` (optional): Wait time in milliseconds before scraping +- `timeout` (optional): Maximum timeout in milliseconds (default: 30000) +- `index_content` (optional): Whether to index content for querying (default: true) +- `chunk_size` (optional): Size of text chunks for indexing (default: 1000) +- `chunk_overlap` (optional): Overlap between chunks (default: 200) + +### 2. crw_crawl +Crawls multiple pages from a website and indexes all content. + +**Parameters:** +- `url` (required): The base URL to start crawling +- `limit` (optional): Maximum number of pages to crawl (default: 10) +- `include_paths` (optional): URL patterns to include (e.g., ["/docs/*"]) +- `exclude_paths` (optional): URL patterns to exclude +- `max_depth` (optional): Maximum crawl depth +- `index_content` (optional): Whether to index content for querying (default: true) +- `chunk_size` (optional): Size of text chunks for indexing (default: 1000) +- `chunk_overlap` (optional): Overlap between chunks (default: 200) + +### 3. crw_query_indexed_content +Queries previously indexed fastCRW content using semantic search. + +**Parameters:** +- `query` (required): The search query +- `max_results` (optional): Maximum number of results to return (1-10, default: 4) + +### 4. crw_clear_indexed_content +Clears all previously indexed fastCRW content from the vector store. + +**Parameters:** +- `confirm` (required): Must be set to true to confirm the deletion (default: false) + +**Note:** This action is permanent and cannot be undone. + +## Configuration + +fastCRW is Firecrawl-compatible; the integration defaults to the managed cloud +and lets you override the base URL for a self-hosted server. + +```bash +# Managed cloud (default base URL https://fastcrw.com/api) +export CRW_API_KEY=your-api-key-here + +# Self-hosted server (auth optional; CRW_API_KEY may be omitted) +export CRW_API_URL=http://localhost:3000 +``` + +- `CRW_API_KEY` — Bearer token. Optional for self-hosted instances that run without auth. +- `CRW_API_URL` — Base URL, default `https://fastcrw.com/api`. Set this to point at a self-hosted server. + +Content indexing uses OpenAI embeddings, so `OPENAI_API_KEY` must also be configured. + +## Features and Benefits + +- **Firecrawl-compatible**: Same REST surface and data shapes as Firecrawl. +- **Single binary**: Self-host the open core for free, or use the managed cloud. +- **JavaScript Rendering**: Handles SPAs and dynamic content. +- **Intelligent Chunking**: Optimized text splitting for better search. +- **Content Replacement**: Replace mode prevents duplicate/stale content on re-scrape. +- **Semantic Search**: Uses OpenAI embeddings for intelligent querying. diff --git a/intentkit/tools/crw/__init__.py b/intentkit/tools/crw/__init__.py new file mode 100644 index 00000000..84eb8208 --- /dev/null +++ b/intentkit/tools/crw/__init__.py @@ -0,0 +1,102 @@ +"""fastCRW tools for web scraping and crawling. + +fastCRW is a Firecrawl-compatible web scraper that ships as a single binary and +runs self-hosted or on the managed cloud at https://fastcrw.com. This toolset +mirrors the Firecrawl provider with a different base URL (CRW_API_URL) and key +(CRW_API_KEY); it is additive and does not affect the Firecrawl toolset. +""" + +import logging +from typing import NotRequired, TypedDict + +from intentkit.config.config import config as system_config +from intentkit.tools.base import ToolsetConfig, ToolState +from intentkit.tools.crw.base import CrwBaseTool +from intentkit.tools.crw.clear import CrwClearIndexedContent +from intentkit.tools.crw.crawl import CrwCrawl +from intentkit.tools.crw.query import CrwQueryIndexedContent +from intentkit.tools.crw.scrape import CrwScrape + +# Cache tools at the system level, because they are stateless +_cache: dict[str, CrwBaseTool] = {} + +logger = logging.getLogger(__name__) + + +class ToolStates(TypedDict): + crw_scrape: ToolState + crw_crawl: ToolState + crw_query_indexed_content: ToolState + crw_clear_indexed_content: ToolState + + +class Config(ToolsetConfig): + """Configuration for fastCRW tools.""" + + states: ToolStates + rate_limit_number: NotRequired[int] + rate_limit_minutes: NotRequired[int] + + +async def get_tools( + config: "Config", + is_private: bool, + **_, +) -> list[CrwBaseTool]: + """Get all fastCRW tools. + + Args: + config: The configuration for fastCRW tools. + is_private: Whether to include private tools. + + Returns: + A list of fastCRW tools. + """ + available_tools = [] + + # Include tools based on their state + for tool_name, state in config["states"].items(): + if state == "disabled": + continue + elif state == "public" or (state == "private" and is_private): + available_tools.append(tool_name) + + # Get each tool using the cached getter + return [s for name in available_tools if (s := get_crw_tool(name))] + + +def get_crw_tool( + name: str, +) -> CrwBaseTool | None: + """Get a fastCRW tool by name.""" + if name == "crw_scrape": + if name not in _cache: + _cache[name] = CrwScrape() + return _cache[name] + elif name == "crw_crawl": + if name not in _cache: + _cache[name] = CrwCrawl() + return _cache[name] + elif name == "crw_query_indexed_content": + if name not in _cache: + _cache[name] = CrwQueryIndexedContent() + return _cache[name] + elif name == "crw_clear_indexed_content": + if name not in _cache: + _cache[name] = CrwClearIndexedContent() + return _cache[name] + else: + logger.warning("Unknown fastCRW tool: %s", name) + return None + + +def available() -> bool: + """Check if this toolset is available based on system config. + + fastCRW self-host may run without auth, and CRW_API_URL always has a default, + so the toolset is available when a key or a custom base URL is configured. + """ + return bool(system_config.crw_api_key) or bool( + system_config.crw_api_url + and system_config.crw_api_url != "https://fastcrw.com/api" + ) diff --git a/intentkit/tools/crw/base.py b/intentkit/tools/crw/base.py new file mode 100644 index 00000000..b447afd6 --- /dev/null +++ b/intentkit/tools/crw/base.py @@ -0,0 +1,24 @@ +from langchain_core.tools.base import ToolException + +from intentkit.config.config import config +from intentkit.tools.base import IntentKitTool + + +class CrwBaseTool(IntentKitTool): + """Base class for fastCRW tools. + + fastCRW is a Firecrawl-compatible web scraper that ships as a single binary + and runs self-hosted or on the managed cloud. The REST surface mirrors + Firecrawl, so these tools mirror the Firecrawl provider with a different + base URL (CRW_API_URL, default https://fastcrw.com/api) and key (CRW_API_KEY). + """ + + def get_api_key(self): + # Self-hosted fastCRW may run without auth, so the key is optional there. + return config.crw_api_key + + def get_api_url(self) -> str: + base = (config.crw_api_url or "https://fastcrw.com/api").rstrip("/") + return base + + category: str = "crw" diff --git a/intentkit/tools/crw/clear.py b/intentkit/tools/crw/clear.py new file mode 100644 index 00000000..0c2506ab --- /dev/null +++ b/intentkit/tools/crw/clear.py @@ -0,0 +1,79 @@ +import logging + +from langchain_core.tools import ArgsSchema +from langchain_core.tools.base import ToolException +from pydantic import BaseModel, Field + +from intentkit.models.tool import AgentToolData +from intentkit.tools.crw.base import CrwBaseTool + +logger = logging.getLogger(__name__) + + +class CrwClearInput(BaseModel): + """Input for fastCRW clear tool.""" + + confirm: bool = Field( + description="Must be true to confirm deletion.", + default=False, + ) + + +class CrwClearIndexedContent(CrwBaseTool): + """Tool for clearing all indexed fastCRW content. + + This tool removes all previously indexed content from the fastCRW vector store, + allowing for a fresh start with new content. + """ + + name: str = "crw_clear_indexed_content" + description: str = "Permanently clear all indexed fastCRW content from the vector store. Cannot be undone." + args_schema: ArgsSchema | None = CrwClearInput + + async def _arun( + self, + confirm: bool = False, + **kwargs, + ) -> str: + """Clear all indexed fastCRW content for the agent. + + Args: + confirm: Must be True to confirm the deletion + config: The configuration for the tool call + + Returns: + str: Confirmation message + """ + context = self.get_context() + agent_id = context.agent_id + + if not agent_id: + raise ToolException("Error: Agent ID not available for clearing content.") + if not confirm: + raise ToolException( + "Error: You must set confirm=true to clear all indexed content." + ) + logger.info( + f"crw_clear: Starting clear indexed content operation for agent {agent_id}" + ) + + try: + # Delete vector store data (using web_scraper storage format for compatibility) + vector_store_key = f"vector_store_{agent_id}" + await AgentToolData.delete(agent_id, "web_scraper", vector_store_key) + + # Delete metadata + metadata_key = f"indexed_urls_{agent_id}" + await AgentToolData.delete(agent_id, "web_scraper", metadata_key) + + logger.info( + f"crw_clear: Successfully cleared all indexed content for agent {agent_id}" + ) + return "Successfully cleared all fastCRW indexed content. The vector store is now empty and ready for new content." + + except Exception as e: + logger.error( + f"crw_clear: Error clearing indexed content for agent {agent_id}: {e}", + exc_info=True, + ) + raise ToolException(f"Error clearing indexed content: {str(e)}") diff --git a/intentkit/tools/crw/crawl.py b/intentkit/tools/crw/crawl.py new file mode 100644 index 00000000..ed6478ea --- /dev/null +++ b/intentkit/tools/crw/crawl.py @@ -0,0 +1,397 @@ +import asyncio +import logging +from decimal import Decimal +from typing import Any + +import httpx +from langchain_core.documents import Document +from langchain_core.tools import ArgsSchema +from langchain_core.tools.base import ToolException +from pydantic import BaseModel, Field + +from intentkit.tools.crw.base import CrwBaseTool + +logger = logging.getLogger(__name__) + + +class CrwCrawlInput(BaseModel): + """Input for fastCRW crawl tool.""" + + url: str = Field(description="Base URL to crawl.") + limit: int = Field(description="Max pages to crawl.", default=10, ge=1, le=1000) + formats: list[str] = Field( + description="Output formats: markdown, html, rawHtml, screenshot, links, json.", + default=["markdown"], + ) + include_paths: list[str] | None = Field( + description="Regex patterns for paths to include.", + default=None, + ) + exclude_paths: list[str] | None = Field( + description="Regex patterns for paths to exclude.", + default=None, + ) + max_depth: int | None = Field( + description="Max crawl depth from base URL.", + default=None, + ge=1, + le=10, + ) + allow_backward_links: bool = Field( + description="Allow crawling parent/sibling URLs.", + default=False, + ) + allow_external_links: bool = Field( + description="Allow crawling external domains.", default=False + ) + allow_subdomains: bool = Field( + description="Allow crawling subdomains.", default=False + ) + only_main_content: bool = Field( + description="Extract only main content, excluding nav/footer.", + default=True, + ) + index_content: bool = Field( + description="Index crawled content for later querying.", + default=True, + ) + chunk_size: int = Field( + description="Text chunk size for indexing.", + default=1000, + ge=100, + le=4000, + ) + chunk_overlap: int = Field( + description="Overlap between chunks.", + default=200, + ge=0, + le=1000, + ) + + +class CrwCrawl(CrwBaseTool): + """Tool for crawling entire websites using fastCRW. + + This tool uses fastCRW's API to crawl websites and extract content from multiple pages. + It can handle JavaScript-rendered content, follow links, and extract structured data + from entire websites. + + Attributes: + name: The name of the tool. + description: A description of what the tool does. + args_schema: The schema for the tool's input arguments. + """ + + name: str = "crw_crawl" + description: str = ( + "Crawl a website to extract content from multiple pages. " + "Optionally indexes content for querying via crw_query_indexed_content." + ) + price: Decimal = Decimal("100") + args_schema: ArgsSchema | None = CrwCrawlInput + + async def _arun( + self, + url: str, + limit: int = 10, + formats: list[str] | None = None, + include_paths: list[str] | None = None, + exclude_paths: list[str] | None = None, + max_depth: int | None = None, + allow_backward_links: bool = False, + allow_external_links: bool = False, + allow_subdomains: bool = False, + only_main_content: bool = True, + index_content: bool = True, + chunk_size: int = 1000, + chunk_overlap: int = 200, + **kwargs, + ) -> str: + """Implementation of the fastCRW crawl tool. + + Args: + url: The base URL to crawl. + limit: Maximum number of pages to crawl. + formats: Output formats to include in the response. + include_paths: Regex patterns to include in the crawl. + exclude_paths: Regex patterns to exclude from the crawl. + max_depth: Maximum depth to crawl from the base URL. + allow_backward_links: Allow crawling parent and sibling URLs. + allow_external_links: Allow crawling external domains. + allow_subdomains: Allow crawling subdomains. + only_main_content: Whether to extract only main content. + config: The configuration for the tool call. + + Returns: + str: Formatted crawled content from all pages. + """ + context = self.get_context() + tool_config = context.agent.tool_config(self.category) + logger.debug("crw_crawl: Running crawl with context %s", context) + + if tool_config.get("rate_limit_number") and tool_config.get( + "rate_limit_minutes" + ): + await self.user_rate_limit_by_category( + tool_config["rate_limit_number"], + tool_config["rate_limit_minutes"] * 60, + ) + + # Get the API key from the agent's configuration. fastCRW self-host may + # run without auth, so an absent key is allowed. + api_key = self.get_api_key() + headers = {"Content-Type": "application/json"} + if api_key: + headers["Authorization"] = f"Bearer {api_key}" + api_url = self.get_api_url() + # Validate and set defaults + if formats is None: + formats = ["markdown"] + + # Validate formats + valid_formats = ["markdown", "html", "rawHtml", "screenshot", "links", "json"] + formats = [f for f in formats if f in valid_formats] + if not formats: + formats = ["markdown"] + + # Prepare the request payload + payload: dict[str, Any] = { + "url": url, + "limit": min(limit, 1000), # Cap at 1000 for safety + "scrapeOptions": {"formats": formats, "onlyMainContent": only_main_content}, + } + + if include_paths: + payload["includePaths"] = include_paths + if exclude_paths: + payload["excludePaths"] = exclude_paths + if max_depth: + payload["maxDepth"] = max_depth + if allow_backward_links: + payload["allowBackwardLinks"] = allow_backward_links + if allow_external_links: + payload["allowExternalLinks"] = allow_external_links + if allow_subdomains: + payload["allowSubdomains"] = allow_subdomains + + # Call fastCRW crawl API + try: + async with httpx.AsyncClient(timeout=120.0) as client: + # Start the crawl + response = await client.post( + f"{api_url}/v1/crawl", + json=payload, + headers=headers, + ) + + if response.status_code != 200: + logger.error( + f"crw_crawl: Error from fastCRW API: {response.status_code} - {response.text}" + ) + raise ToolException( + f"Error starting crawl: {response.status_code} - {response.text}" + ) + crawl_data = response.json() + + if not crawl_data.get("success"): + error_msg = crawl_data.get("error", "Unknown error occurred") + raise ToolException(f"Error starting crawl: {error_msg}") + crawl_id = crawl_data.get("id") + if not crawl_id: + raise ToolException( + "Error: No crawl ID returned from fastCRW API" + ) + # Poll for crawl completion + max_polls = 60 # Maximum 5 minutes of polling (60 * 5 seconds) + poll_count = 0 + + while poll_count < max_polls: + # Check crawl status + status_response = await client.get( + f"{api_url}/v1/crawl/{crawl_id}", + headers=headers, + ) + + if status_response.status_code != 200: + logger.error( + f"crw_crawl: Error checking crawl status: {status_response.status_code} - {status_response.text}" + ) + raise ToolException( + f"Error checking crawl status: {status_response.status_code} - {status_response.text}" + ) + status_data = status_response.json() + status = status_data.get("status") + + if status == "completed": + # Crawl completed successfully + pages_data = status_data.get("data", []) + total_pages = status_data.get("total", 0) + completed_pages = status_data.get("completed", 0) + + # Format the results + formatted_result = f"Successfully crawled: {url}\n" + formatted_result += f"Total pages found: {total_pages}\n" + formatted_result += f"Pages completed: {completed_pages}\n\n" + + # Process each page + for i, page_data in enumerate( + pages_data[:10], 1 + ): # Limit to first 10 pages for output + page_url = page_data.get("metadata", {}).get( + "sourceURL", "Unknown URL" + ) + formatted_result += f"## Page {i}: {page_url}\n" + + if "markdown" in formats and page_data.get("markdown"): + content = page_data["markdown"][ + :500 + ] # Limit content length + formatted_result += f"{content}" + if len(page_data["markdown"]) > 500: + formatted_result += "... (content truncated)" + formatted_result += "\n\n" + + # Add page metadata + metadata = page_data.get("metadata", {}) + if metadata.get("title"): + formatted_result += f"Title: {metadata['title']}\n" + if metadata.get("description"): + formatted_result += ( + f"Description: {metadata['description']}\n" + ) + formatted_result += "\n" + + if len(pages_data) > 10: + formatted_result += ( + f"... and {len(pages_data) - 10} more pages\n" + ) + + # Index content if requested + if index_content and pages_data: + try: + # Import indexing utilities from crw utils + from intentkit.tools.crw.utils import ( + CrwMetadataManager, + CrwVectorStoreManager, + index_documents, + ) + + # Create documents from crawled content + documents = [] + for page_data in pages_data: + if page_data.get("markdown"): + metadata = page_data.get("metadata", {}) + document = Document( + page_content=page_data["markdown"], + metadata={ + "source": metadata.get( + "sourceURL", "Unknown URL" + ), + "title": metadata.get("title", ""), + "description": metadata.get( + "description", "" + ), + "language": metadata.get( + "language", "" + ), + "source_type": "crw_crawl", + "indexed_at": str(context.agent_id), + }, + ) + documents.append(document) + + # Get agent ID for indexing + agent_id = context.agent_id + if agent_id and documents: + vector_manager = CrwVectorStoreManager() + + # Index all documents + total_chunks, was_merged = await index_documents( + documents, + agent_id, + vector_manager, + chunk_size, + chunk_overlap, + ) + + # Update metadata + urls = [doc.metadata["source"] for doc in documents] + new_metadata = ( + CrwMetadataManager.create_url_metadata( + urls, documents, "crw_crawl" + ) + ) + await CrwMetadataManager.update_metadata( + agent_id, new_metadata + ) + + formatted_result += "\n## Content Indexing\n" + formatted_result += "Successfully indexed crawled content into vector store:\n" + formatted_result += ( + f"- Pages indexed: {len(documents)}\n" + ) + formatted_result += ( + f"- Total chunks created: {total_chunks}\n" + ) + formatted_result += f"- Chunk size: {chunk_size}\n" + formatted_result += ( + f"- Chunk overlap: {chunk_overlap}\n" + ) + formatted_result += f"- Content merged with existing: {'Yes' if was_merged else 'No'}\n" + formatted_result += "Use the 'crw_query_indexed_content' tool to search this content.\n" + + logger.info( + f"crw_crawl: Successfully indexed {len(documents)} pages with {total_chunks} total chunks" + ) + else: + formatted_result += "\n## Content Indexing\n" + formatted_result += "Warning: Could not index content - agent ID not available or no content to index.\n" + + except Exception as index_error: + logger.error( + f"crw_crawl: Error indexing content: {index_error}" + ) + formatted_result += "\n## Content Indexing\n" + formatted_result += f"Warning: Failed to index content for later querying: {str(index_error)}\n" + + return formatted_result.strip() + + elif status == "failed": + error_msg = status_data.get("error", "Crawl failed") + raise ToolException(f"Crawl failed: {error_msg}") + + elif status in ["scraping", "active"]: + # Still in progress, wait and poll again + completed = status_data.get("completed", 0) + total = status_data.get("total", 0) + logger.debug( + f"crw_crawl: Crawl in progress: {completed}/{total} pages" + ) + + # Wait 5 seconds before next poll + await asyncio.sleep(5) + poll_count += 1 + + else: + # Unknown status + logger.warning( + f"crw_crawl: Unknown crawl status: {status}" + ) + await asyncio.sleep(5) + poll_count += 1 + + # If we've exceeded max polls, raise timeout + raise ToolException( + f"Crawl timeout: The crawl of {url} is taking longer than expected. Please try again later or reduce the crawl limit." + ) + + except ToolException: + raise + except httpx.TimeoutException: + logger.error("crw_crawl: Timeout crawling URL: %s", url) + raise ToolException( + f"Timeout error: The request to crawl {url} took too long to complete." + ) + except Exception as e: + logger.error("crw_crawl: Error crawling URL: %s", e, exc_info=True) + raise ToolException(f"An error occurred while crawling the URL: {e!s}") diff --git a/intentkit/tools/crw/crw.png b/intentkit/tools/crw/crw.png new file mode 100644 index 00000000..78ee35c5 Binary files /dev/null and b/intentkit/tools/crw/crw.png differ diff --git a/intentkit/tools/crw/query.py b/intentkit/tools/crw/query.py new file mode 100644 index 00000000..2fc9b300 --- /dev/null +++ b/intentkit/tools/crw/query.py @@ -0,0 +1,111 @@ +import logging + +from langchain_core.tools import ArgsSchema +from langchain_core.tools.base import ToolException +from pydantic import BaseModel, Field + +from intentkit.tools.crw.base import CrwBaseTool + +logger = logging.getLogger(__name__) + + +class CrwQueryInput(BaseModel): + """Input for fastCRW query tool.""" + + query: str = Field( + description="Search query for indexed content.", + min_length=1, + max_length=500, + ) + max_results: int = Field( + description="Max relevant documents to return.", + default=4, + ge=1, + le=10, + ) + + +class CrwQueryIndexedContent(CrwBaseTool): + """Tool for querying previously indexed fastCRW content. + + This tool searches through content that was previously scraped and indexed + using the crw_scrape or crw_crawl tools to answer questions or find relevant information. + """ + + name: str = "crw_query_indexed_content" + description: str = ( + "Search previously indexed fastCRW content to find relevant information." + ) + args_schema: ArgsSchema | None = CrwQueryInput + + async def _arun( + self, + query: str, + max_results: int = 4, + **kwargs, + ) -> str: + """Query the indexed fastCRW content.""" + try: + context = self.get_context() + if not context or not context.agent_id: + raise ToolException( + "Agent ID is required but not found in configuration" + ) + + agent_id = context.agent_id + + logger.info( + "[%s] Starting fastCRW query operation: '%s'", agent_id, query + ) + + # Import query utilities from crw utils + from intentkit.tools.crw.utils import ( + CrwDocumentProcessor, + CrwVectorStoreManager, + query_indexed_content, + ) + + # Query the indexed content + vector_manager = CrwVectorStoreManager() + docs = await query_indexed_content( + query, agent_id, vector_manager, max_results + ) + + if not docs: + logger.info("[%s] No relevant documents found for query", agent_id) + return f"No relevant information found for your query: '{query}'. The indexed content may not contain information related to your search." + + # Format results + results = [] + for i, doc in enumerate(docs, 1): + # Sanitize content to prevent database storage errors + content = CrwDocumentProcessor.sanitize_for_database( + doc.page_content.strip() + ) + source = doc.metadata.get("source", "Unknown") + source_type = doc.metadata.get("source_type", "unknown") + + # Add source type indicator for fastCRW content + if source_type.startswith("crw"): + source_indicator = ( + f"[fastCRW {source_type.replace('crw_', '').title()}]" + ) + else: + source_indicator = "" + + results.append( + f"**Source {i}:** {source} {source_indicator}\n{content}" + ) + + response = "\n\n".join(results) + logger.info( + f"[{agent_id}] fastCRW query completed successfully, returning {len(response)} chars" + ) + + return response + + except ToolException: + raise + except Exception as e: + logger.error("Error in CrwQueryIndexedContent: %s", e, exc_info=True) + raise ToolException(f"Failed to query indexed content: {e!s}") diff --git a/intentkit/tools/crw/schema.json b/intentkit/tools/crw/schema.json new file mode 100644 index 00000000..058f7196 --- /dev/null +++ b/intentkit/tools/crw/schema.json @@ -0,0 +1,109 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "title": "fastCRW Web Scraping and Crawling", + "description": "Firecrawl-compatible web scraping and crawling via fastCRW (single binary; self-host or cloud at https://fastcrw.com)", + "x-icon": "/tools/crw/crw.png", + "x-tags": [ + "Knowledge Base" + ], + "properties": { + "enabled": { + "type": "boolean", + "title": "Enabled", + "description": "Whether this tool is enabled", + "default": false + }, + "states": { + "type": "object", + "properties": { + "crw_scrape": { + "type": "string", + "title": "fastCRW Scrape", + "enum": [ + "disabled", + "public", + "private" + ], + "x-enum-title": [ + "Disabled", + "Agent Owner + All Users", + "Agent Owner Only" + ], + "description": "Scrape single web pages and REPLACE any existing indexed content for that URL. Unlike regular scrape, this prevents duplicate content when re-scraping the same page. Use this to refresh/update content from a previously scraped URL.", + "default": "disabled" + }, + "crw_crawl": { + "type": "string", + "title": "fastCRW Crawl", + "enum": [ + "disabled", + "public", + "private" + ], + "x-enum-title": [ + "Disabled", + "Agent Owner + All Users", + "Agent Owner Only" + ], + "description": "Crawl entire websites and extract content from multiple pages. Can follow links, handle JavaScript-rendered content, and extract structured data from entire websites.", + "default": "disabled" + }, + "crw_query_indexed_content": { + "type": "string", + "title": "Query Indexed Content", + "enum": [ + "disabled", + "public", + "private" + ], + "x-enum-title": [ + "Disabled", + "Agent Owner + All Users", + "Agent Owner Only" + ], + "description": "Query previously indexed fastCRW content to find relevant information and answer questions. Use this to search through content that was scraped and indexed using fastCRW tools.", + "default": "disabled" + }, + "crw_clear_indexed_content": { + "type": "string", + "title": "Clear Indexed Content", + "enum": [ + "disabled", + "public", + "private" + ], + "x-enum-title": [ + "Disabled", + "Agent Owner + All Users", + "Agent Owner Only" + ], + "description": "Clear all previously indexed fastCRW content from the vector store. This will permanently delete all indexed content and cannot be undone. Use this tool when you want to start fresh with new content.", + "default": "disabled" + } + }, + "description": "States for each fastCRW tool (disabled, public, or private)" + }, + "rate_limit_number": { + "type": "integer", + "title": "Rate Limit Number", + "description": "Number of requests allowed per time window", + "minimum": 1, + "maximum": 1000, + "default": 100 + }, + "rate_limit_minutes": { + "type": "integer", + "title": "Rate Limit Minutes", + "description": "Time window in minutes for rate limiting", + "minimum": 1, + "maximum": 1440, + "default": 60 + } + }, + "required": [ + "states", + "enabled" + ], + "additionalProperties": true +} diff --git a/intentkit/tools/crw/scrape.py b/intentkit/tools/crw/scrape.py new file mode 100644 index 00000000..9211a430 --- /dev/null +++ b/intentkit/tools/crw/scrape.py @@ -0,0 +1,419 @@ +import logging +from decimal import Decimal + +import httpx +from langchain_core.documents import Document +from langchain_core.tools import ArgsSchema +from langchain_core.tools.base import ToolException +from pydantic import BaseModel, Field + +from intentkit.tools.crw.base import CrwBaseTool + +logger = logging.getLogger(__name__) + + +class CrwScrapeInput(BaseModel): + """Input for fastCRW scrape tool.""" + + url: str = Field(description="URL to scrape.") + formats: list[str] = Field( + description="Output formats: markdown, html, rawHtml, screenshot, links, json.", + default=["markdown"], + ) + only_main_content: bool = Field( + description="Extract only main content, excluding nav/footer.", + default=True, + ) + include_tags: list[str] | None = Field( + description="HTML tags/classes/IDs to include.", + default=None, + ) + exclude_tags: list[str] | None = Field( + description="HTML tags/classes/IDs to exclude.", + default=None, + ) + wait_for: int = Field( + description="Wait ms before scraping. Use as last resort.", + default=0, + ge=0, + ) + timeout: int = Field( + description="Max timeout in ms.", + default=30000, + ge=1000, + le=120000, + ) + index_content: bool = Field( + description="Index scraped content for later querying.", + default=True, + ) + chunk_size: int = Field( + description="Text chunk size for indexing.", + default=1000, + ge=100, + le=4000, + ) + chunk_overlap: int = Field( + description="Overlap between chunks.", + default=200, + ge=0, + le=1000, + ) + + +class CrwScrape(CrwBaseTool): + """Tool for scraping web pages using fastCRW with REPLACE behavior. + + This tool uses fastCRW's API to scrape web pages and REPLACES any existing + indexed content for the same URL instead of appending to it. This prevents + duplicate content when re-scraping the same page. + + Attributes: + name: The name of the tool. + description: A description of what the tool does. + args_schema: The schema for the tool's input arguments. + """ + + name: str = "crw_scrape" + description: str = ( + "Scrape a web page and replace any existing indexed content for that URL. " + "Handles JS-rendered content, PDFs, and dynamic sites." + ) + price: Decimal = Decimal("100") + args_schema: ArgsSchema | None = CrwScrapeInput + + async def _arun( + self, + url: str, + formats: list[str] | None = None, + only_main_content: bool = True, + include_tags: list[str] | None = None, + exclude_tags: list[str] | None = None, + wait_for: int = 0, + timeout: int = 30000, + index_content: bool = True, + chunk_size: int = 1000, + chunk_overlap: int = 200, + **kwargs, + ) -> str: + """Implementation of the fastCRW scrape tool. + + Args: + url: The URL to scrape. + formats: Output formats to include in the response. + only_main_content: Whether to extract only main content. + include_tags: HTML tags/classes/IDs to include. + exclude_tags: HTML tags/classes/IDs to exclude. + wait_for: Wait time in milliseconds before scraping. + timeout: Maximum timeout in milliseconds. + index_content: Whether to index the content for later querying. + chunk_size: Size of text chunks for indexing. + chunk_overlap: Overlap between chunks. + config: The configuration for the tool call. + + Returns: + str: Formatted scraped content based on the requested formats. + """ + context = self.get_context() + tool_config = context.agent.tool_config(self.category) + logger.debug("crw_scrape: Running scrape with context %s", context) + + if tool_config.get("rate_limit_number") and tool_config.get( + "rate_limit_minutes" + ): + await self.user_rate_limit_by_category( + tool_config["rate_limit_number"], + tool_config["rate_limit_minutes"] * 60, + ) + + # Get the API key from the agent's configuration. fastCRW self-host may + # run without auth, so an absent key is allowed. + api_key = self.get_api_key() + headers = {"Content-Type": "application/json"} + if api_key: + headers["Authorization"] = f"Bearer {api_key}" + # Validate and set defaults + if formats is None: + formats = ["markdown"] + + # Validate formats + valid_formats = ["markdown", "html", "rawHtml", "screenshot", "links", "json"] + formats = [f for f in formats if f in valid_formats] + if not formats: + formats = ["markdown"] + + # Prepare the request payload + payload = { + "url": url, + "formats": formats, + "onlyMainContent": only_main_content, + "timeout": timeout, + } + + if include_tags: + payload["includeTags"] = include_tags + if exclude_tags: + payload["excludeTags"] = exclude_tags + if wait_for > 0: + payload["waitFor"] = wait_for + + # Call fastCRW scrape API + try: + async with httpx.AsyncClient(timeout=timeout / 1000 + 10) as client: + response = await client.post( + f"{self.get_api_url()}/v1/scrape", + json=payload, + headers=headers, + ) + + if response.status_code != 200: + logger.error( + f"crw_scrape: Error from fastCRW API: {response.status_code} - {response.text}" + ) + raise ToolException( + f"Error scraping URL: {response.status_code} - {response.text}" + ) + + data = response.json() + + if not data.get("success"): + error_msg = data.get("error", "Unknown error occurred") + raise ToolException(f"Error scraping URL: {error_msg}") + result_data = data.get("data", {}) + + # Format the results based on requested formats + formatted_result = f"Successfully scraped (REPLACE mode): {url}\n\n" + + if "markdown" in formats and result_data.get("markdown"): + formatted_result += "## Markdown Content\n" + formatted_result += result_data["markdown"][:2000] # Limit length + if len(result_data["markdown"]) > 2000: + formatted_result += "... (content truncated)" + formatted_result += "\n\n" + + if "html" in formats and result_data.get("html"): + formatted_result += "## HTML Content\n" + formatted_result += f"HTML content available ({len(result_data['html'])} characters)\n\n" + + if "links" in formats and result_data.get("links"): + formatted_result += "## Extracted Links\n" + links = result_data["links"][:10] # Limit to first 10 links + for link in links: + formatted_result += f"- {link}\n" + if len(result_data["links"]) > 10: + formatted_result += ( + f"... and {len(result_data['links']) - 10} more links\n" + ) + formatted_result += "\n" + + if "json" in formats and result_data.get("json"): + formatted_result += "## Structured Data (JSON)\n" + formatted_result += str(result_data["json"])[:1000] # Limit length + if len(str(result_data["json"])) > 1000: + formatted_result += "... (data truncated)" + formatted_result += "\n\n" + + if "screenshot" in formats and result_data.get("screenshot"): + formatted_result += "## Screenshot\n" + formatted_result += ( + f"Screenshot available at: {result_data['screenshot']}\n\n" + ) + + # Add metadata information + metadata = result_data.get("metadata", {}) + if metadata: + formatted_result += "## Page Metadata\n" + if metadata.get("title"): + formatted_result += f"Title: {metadata['title']}\n" + if metadata.get("description"): + formatted_result += f"Description: {metadata['description']}\n" + if metadata.get("language"): + formatted_result += f"Language: {metadata['language']}\n" + formatted_result += "\n" + + # Index content if requested - REPLACE MODE + if index_content and result_data.get("markdown"): + try: + # Import indexing utilities + from langchain_community.vectorstores import FAISS + + from intentkit.tools.crw.utils import ( + CrwDocumentProcessor, + CrwMetadataManager, + CrwVectorStoreManager, + ) + + # Create document from scraped content + document = Document( + page_content=result_data["markdown"], + metadata={ + "source": url, + "title": metadata.get("title", ""), + "description": metadata.get("description", ""), + "language": metadata.get("language", ""), + "source_type": "crw_scrape", + "indexed_at": str(context.agent_id), + }, + ) + + # Get agent ID for indexing + agent_id = context.agent_id + if agent_id: + # Initialize vector store manager + vs_manager = CrwVectorStoreManager() + + # Load existing vector store + existing_vector_store = await vs_manager.load_vector_store( + agent_id + ) + + # Split the new document into chunks + split_docs = CrwDocumentProcessor.split_documents( + [document], chunk_size, chunk_overlap + ) + + # Create embeddings + embeddings = vs_manager.create_embeddings() + + if existing_vector_store: + # Get all existing documents and filter out those from the same URL + try: + # Retrieve all documents via public API: + # index_to_docstore_id maps FAISS index + # positions to docstore IDs, and + # docstore.search() looks up by ID. + # Single pass: look up each doc by ID, + # keep only valid Documents not from this URL. + preserved_docs = [ + doc + for doc_id in existing_vector_store.index_to_docstore_id.values() + if isinstance( + doc + := existing_vector_store.docstore.search( + doc_id + ), + Document, + ) + and doc.metadata.get("source") != url + ] + + logger.info( + f"crw_scrape: Preserving {len(preserved_docs)} docs from other URLs, " + f"replacing content from {url}" + ) + + # Create new vector store with preserved docs + new docs + if preserved_docs: + # Combine preserved and new documents + all_documents = preserved_docs + split_docs + new_vector_store = FAISS.from_documents( + all_documents, embeddings + ) + formatted_result += "\n## Content Replacement\n" + formatted_result += f"Replaced existing content for URL: {url}\n" + num_preserved_urls = len( + set( + doc.metadata.get("source", "") + for doc in preserved_docs + ) + ) + formatted_result += f"Preserved content from {num_preserved_urls} other URLs\n" + else: + # No other documents to preserve, just create from new docs + new_vector_store = FAISS.from_documents( + split_docs, embeddings + ) + formatted_result += "\n## Content Replacement\n" + formatted_result += f"Created new index with content from: {url}\n" + except Exception as e: + logger.warning( + f"Could not preserve other URLs, creating fresh index: {e}" + ) + # Fallback: create new store with just the new documents + new_vector_store = FAISS.from_documents( + split_docs, embeddings + ) + formatted_result += "\n## Content Replacement\n" + formatted_result += f"Created fresh index with content from: {url}\n" + else: + # No existing store, create new one + new_vector_store = FAISS.from_documents( + split_docs, embeddings + ) + formatted_result += "\n## Content Indexing\n" + formatted_result += ( + f"Created new index with content from: {url}\n" + ) + + # Save the new vector store + await vs_manager.save_vector_store( + agent_id, new_vector_store, chunk_size, chunk_overlap + ) + + # Update metadata to track all URLs + # Get existing metadata to preserve other URLs + metadata_key = f"indexed_urls_{agent_id}" + existing_metadata = await self.get_agent_tool_data_raw( + "crw", metadata_key + ) + + if existing_metadata and existing_metadata.get("urls"): + # Remove the current URL and add it back (to update timestamp) + existing_urls = [ + u for u in existing_metadata["urls"] if u != url + ] + existing_urls.append(url) + updated_metadata = { + "urls": existing_urls, + "document_count": len(existing_urls), + "source_type": "crw_mixed", + "indexed_at": str(len(existing_urls)), + } + else: + # Create new metadata + updated_metadata = ( + CrwMetadataManager.create_url_metadata( + [url], [document], "crw_scrape" + ) + ) + + await CrwMetadataManager.update_metadata( + agent_id, updated_metadata + ) + + formatted_result += "\n## Content Indexing (REPLACE MODE)\n" + formatted_result += "Successfully REPLACED indexed content in vector store:\n" + formatted_result += f"- Chunks created: {len(split_docs)}\n" + formatted_result += f"- Chunk size: {chunk_size}\n" + formatted_result += f"- Chunk overlap: {chunk_overlap}\n" + formatted_result += ( + "- Previous content for this URL: REPLACED\n" + ) + formatted_result += "Use the 'crw_query_indexed_content' tool to search this content.\n" + + logger.info( + f"crw_scrape: Successfully replaced content for {url} with {len(split_docs)} chunks" + ) + else: + formatted_result += "\n## Content Indexing\n" + formatted_result += "Warning: Could not index content - agent ID not available.\n" + + except Exception as index_error: + logger.error( + f"crw_scrape: Error indexing content: {index_error}" + ) + formatted_result += "\n## Content Indexing\n" + formatted_result += f"Warning: Failed to index content for later querying: {str(index_error)}\n" + + return formatted_result.strip() + + except httpx.TimeoutException: + logger.error("crw_scrape: Timeout scraping URL: %s", url) + raise ToolException( + f"Timeout error: The request to scrape {url} took too long to complete." + ) + except ToolException: + raise + except Exception as e: + logger.error("crw_scrape: Error scraping URL: %s", e, exc_info=True) + raise ToolException(f"An error occurred while scraping the URL: {str(e)}") diff --git a/intentkit/tools/crw/utils.py b/intentkit/tools/crw/utils.py new file mode 100644 index 00000000..d7e07576 --- /dev/null +++ b/intentkit/tools/crw/utils.py @@ -0,0 +1,315 @@ +"""Utilities for fastCRW tool content indexing and querying.""" + +import logging +import re +from typing import Any + +from langchain_community.vectorstores import FAISS +from langchain_core.documents import Document +from langchain_core.tools.base import ToolException +from langchain_openai import OpenAIEmbeddings +from langchain_text_splitters import RecursiveCharacterTextSplitter +from pydantic import SecretStr + +from intentkit.config.config import config +from intentkit.models.tool import AgentToolData, AgentToolDataCreate + +logger = logging.getLogger(__name__) + + +class CrwDocumentProcessor: + """Handles document processing and sanitization for fastCRW content.""" + + @staticmethod + def sanitize_for_database(text: str) -> str: + """Sanitize text content to prevent database storage errors.""" + if not text: + return "" + + # Remove null bytes and other problematic characters + text = text.replace("\x00", "") + text = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]", "", text) + + # Normalize whitespace + text = re.sub(r"\s+", " ", text) + text = text.strip() + + return text + + @staticmethod + def split_documents( + documents: list[Document], chunk_size: int = 1000, chunk_overlap: int = 200 + ) -> list[Document]: + """Split documents into smaller chunks for better indexing.""" + text_splitter = RecursiveCharacterTextSplitter( + chunk_size=chunk_size, + chunk_overlap=chunk_overlap, + length_function=len, + ) + + split_docs = [] + for doc in documents: + # Sanitize content before splitting + sanitized_content = CrwDocumentProcessor.sanitize_for_database( + doc.page_content + ) + doc.page_content = sanitized_content + + # Split the document + chunks = text_splitter.split_documents([doc]) + split_docs.extend(chunks) + + return split_docs + + +class CrwVectorStoreManager: + """Manages vector store operations for fastCRW content.""" + + def __init__(self, embedding_api_key: str | None = None): + self._embedding_api_key = embedding_api_key + + def _resolve_api_key(self) -> str: + """Resolve the API key to use for embeddings.""" + if self._embedding_api_key: + return self._embedding_api_key + if config.openai_api_key: + return config.openai_api_key + raise ToolException("OpenAI API key not found in system configuration") + + def create_embeddings(self) -> OpenAIEmbeddings: + """Create OpenAI embeddings instance.""" + openai_api_key = self._resolve_api_key() + return OpenAIEmbeddings( + api_key=SecretStr(openai_api_key), model="text-embedding-3-small" + ) + + def encode_vector_store(self, vector_store: FAISS) -> dict[str, str]: + """Encode FAISS vector store to base64 for storage (compatible with web_scraper).""" + import base64 + import os + import tempfile + + try: + with tempfile.TemporaryDirectory() as temp_dir: + vector_store.save_local(temp_dir) + + encoded_files = {} + for filename in os.listdir(temp_dir): + file_path = os.path.join(temp_dir, filename) + if os.path.isfile(file_path): + with open(file_path, "rb") as f: + encoded_files[filename] = base64.b64encode(f.read()).decode( + "utf-8" + ) + + return encoded_files + except Exception as e: + logger.error("Error encoding vector store: %s", e) + raise + + def decode_vector_store( + self, encoded_files: dict[str, str], embeddings: OpenAIEmbeddings + ) -> FAISS: + """Decode base64 files back to FAISS vector store (compatible with web_scraper).""" + import base64 + import os + import tempfile + + try: + with tempfile.TemporaryDirectory() as temp_dir: + # Decode and write files + for filename, encoded_content in encoded_files.items(): + file_path = os.path.join(temp_dir, filename) + with open(file_path, "wb") as f: + f.write(base64.b64decode(encoded_content)) + + # Load vector store + return FAISS.load_local( + temp_dir, + embeddings, + allow_dangerous_deserialization=True, + ) + except Exception as e: + logger.error("Error decoding vector store: %s", e) + raise + + async def load_vector_store(self, agent_id: str) -> FAISS | None: + """Load existing vector store for an agent.""" + try: + vector_store_key = f"vector_store_{agent_id}" + stored_data = await AgentToolData.get( + agent_id, "web_scraper", vector_store_key + ) + + if not stored_data or "faiss_files" not in stored_data: + return None + + embeddings = self.create_embeddings() + return self.decode_vector_store(stored_data["faiss_files"], embeddings) + + except Exception as e: + logger.error("Error loading vector store for agent %s: %s", agent_id, e) + return None + + async def save_vector_store( + self, + agent_id: str, + vector_store: FAISS, + chunk_size: int = 1000, + chunk_overlap: int = 200, + ) -> None: + """Save vector store for an agent (compatible with web_scraper format).""" + try: + vector_store_key = f"vector_store_{agent_id}" + encoded_files = self.encode_vector_store(vector_store) + + # Use the same data structure as web_scraper + storage_data = { + "faiss_files": encoded_files, + "chunk_size": chunk_size, + "chunk_overlap": chunk_overlap, + } + + tool_data = AgentToolDataCreate( + agent_id=agent_id, + tool="web_scraper", + key=vector_store_key, + data=storage_data, + ) + await tool_data.save() + + except Exception as e: + logger.error("Error saving vector store for agent %s: %s", agent_id, e) + raise + + +class CrwMetadataManager: + """Manages metadata for fastCRW indexed content.""" + + @staticmethod + def create_url_metadata( + urls: list[str], documents: list[Document], source_type: str + ) -> dict[str, Any]: + """Create metadata for indexed URLs.""" + return { + "urls": urls, + "document_count": len(documents), + "source_type": source_type, + "indexed_at": str(len(urls)), # Simple counter + } + + @staticmethod + async def update_metadata(agent_id: str, new_metadata: dict[str, Any]) -> None: + """Update metadata for an agent.""" + try: + metadata_key = f"indexed_urls_{agent_id}" + tool_data = AgentToolDataCreate( + agent_id=agent_id, + tool="web_scraper", + key=metadata_key, + data=new_metadata, + ) + await tool_data.save() + except Exception as e: + logger.error("Error updating metadata for agent %s: %s", agent_id, e) + raise + + +async def index_documents( + documents: list[Document], + agent_id: str, + vector_manager: CrwVectorStoreManager, + chunk_size: int = 1000, + chunk_overlap: int = 200, +) -> tuple[int, bool]: + """ + Index documents into the fastCRW vector store. + + Args: + documents: List of documents to index + agent_id: Agent ID for storage + vector_manager: Vector store manager + chunk_size: Size of text chunks + chunk_overlap: Overlap between chunks + + Returns: + Tuple of (total_chunks, was_merged_with_existing) + """ + try: + # Initialize managers + # Split documents into chunks + split_docs = CrwDocumentProcessor.split_documents( + documents, chunk_size, chunk_overlap + ) + + if not split_docs: + logger.warning("No documents to index after splitting") + return 0, False + + # Create embeddings + embeddings = vector_manager.create_embeddings() + + # Try to load existing vector store + existing_vector_store = await vector_manager.load_vector_store(agent_id) + + if existing_vector_store: + # Add to existing vector store + existing_vector_store.add_documents(split_docs) + vector_store = existing_vector_store + was_merged = True + else: + # Create new vector store + vector_store = FAISS.from_documents(split_docs, embeddings) + was_merged = False + + # Save the vector store + await vector_manager.save_vector_store( + agent_id, vector_store, chunk_size, chunk_overlap + ) + + logger.info( + "Successfully indexed %s chunks for agent %s", len(split_docs), agent_id + ) + return len(split_docs), was_merged + + except Exception as e: + logger.error("Error indexing documents for agent %s: %s", agent_id, e) + raise + + +async def query_indexed_content( + query: str, + agent_id: str, + vector_manager: CrwVectorStoreManager, + max_results: int = 4, +) -> list[Document]: + """ + Query the fastCRW indexed content. + + Args: + query: Search query + agent_id: Agent ID + vector_manager: Manager for vector store persistence + max_results: Maximum number of results to return + + Returns: + List of relevant documents + """ + try: + # Initialize vector store manager + # Load vector store + vector_store = await vector_manager.load_vector_store(agent_id) + + if not vector_store: + logger.warning("No vector store found for agent %s", agent_id) + return [] + + # Perform similarity search + docs = vector_store.similarity_search(query, k=max_results) + + logger.info("Found %s documents for query: %s", len(docs), query) + return docs + + except Exception as e: + logger.error("Error querying indexed content for agent %s: %s", agent_id, e) + raise