Retrieve all documents in your project using the Graphor Python SDK
The `client.sources.list()` method retrieves information about every document (source) in your Graphor project. It returns each document’s status, processing information, and metadata, so you can monitor and manage your document collection programmatically.
from graphor import Graphor

client = Graphor()

# List all sources in the project
sources = client.sources.list()
total = len(sources)
print(f"Found {total} sources")

# One line per source: file name followed by its processing status
for entry in sources:
    name, status = entry.file_name, entry.status
    print(f"{name} - {status}")
from graphor import Graphor

client = Graphor()
sources = client.sources.list()


def with_status(label):
    """Return the fetched sources whose status equals *label*."""
    return [item for item in sources if item.status == label]


# Filter by status
completed = with_status("Completed")
processing = with_status("Processing")
failed = with_status("Failed")
new = with_status("New")

print(f"Completed: {len(completed)}")
print(f"Processing: {len(processing)}")
print(f"Failed: {len(failed)}")
print(f"New: {len(new)}")
from graphor import Graphor

client = Graphor()
sources = client.sources.list()

IMAGE_TYPES = ("png", "jpg", "jpeg")
BYTES_PER_MB = 1024 * 1024

# Filter by file type (one pass; the three categories are mutually exclusive)
pdf_files, docx_files, images = [], [], []
for item in sources:
    kind = item.file_type
    if kind == "pdf":
        pdf_files.append(item)
    elif kind == "docx":
        docx_files.append(item)
    elif kind in IMAGE_TYPES:
        images.append(item)

print(f"PDFs: {len(pdf_files)}")
print(f"Word docs: {len(docx_files)}")
print(f"Images: {len(images)}")

# List all PDF files with their size converted from bytes to megabytes
for doc in pdf_files:
    megabytes = doc.file_size / BYTES_PER_MB
    print(f" {doc.file_name} ({megabytes:.2f} MB)")
from graphor import Graphor

client = Graphor()
sources = client.sources.list()


def sources_from(origin):
    """Return the fetched sources whose file_source equals *origin*."""
    return [item for item in sources if item.file_source == origin]


# Filter by source type
local_files = sources_from("local file")
url_sources = sources_from("url")
github_sources = sources_from("github")
youtube_sources = sources_from("youtube")

print(f"Local files: {len(local_files)}")
print(f"URL sources: {len(url_sources)}")
print(f"GitHub repos: {len(github_sources)}")
print(f"YouTube videos: {len(youtube_sources)}")
from graphor import Graphor

client = Graphor()


def monitor_processing_status():
    """Print a status report for all sources and return them grouped by status.

    Returns:
        A dict with the keys "completed", "processing", "new" and "failed",
        each mapping to the list of sources currently in that status.
    """
    all_sources = client.sources.list()

    def with_status(label):
        # Select the sources whose status matches the given label exactly.
        return [item for item in all_sources if item.status == label]

    report = {
        "completed": with_status("Completed"),
        "processing": with_status("Processing"),
        "new": with_status("New"),
        "failed": with_status("Failed"),
    }

    banner = "=" * 50
    print(banner)
    print("Processing Status Report")
    print(banner)
    print(f"✅ Completed: {len(report['completed'])}")
    print(f"⏳ Processing: {len(report['processing'])}")
    print(f"🆕 New: {len(report['new'])}")
    print(f"❌ Failed: {len(report['failed'])}")
    print(banner)

    # List files currently processing, with the partition method in use
    in_progress = report["processing"]
    if in_progress:
        print("\n📋 Currently Processing:")
        for item in in_progress:
            print(f" • {item.file_name} ({item.partition_method})")

    # List failed files that need attention, with the message attached to each
    broken = report["failed"]
    if broken:
        print("\n⚠️ Failed Files (need attention):")
        for item in broken:
            print(f" • {item.file_name}: {item.message}")

    return report


# Usage
status = monitor_processing_status()
from graphor import Graphor

client = Graphor()


def find_source(file_name: str):
    """Find a source by exact file name.

    Args:
        file_name: Name to compare against each source's ``file_name``.

    Returns:
        The first source whose ``file_name`` equals ``file_name``, or
        ``None`` when no source matches.
    """
    sources = client.sources.list()
    return next((s for s in sources if s.file_name == file_name), None)


def search_sources(query: str):
    """Search sources by case-insensitive partial name match.

    Args:
        query: Substring to look for in each source's ``file_name``.

    Returns:
        A list of every source whose lower-cased ``file_name`` contains the
        lower-cased ``query``.
    """
    # Lower-case the query once instead of once per source.
    needle = query.lower()
    sources = client.sources.list()
    return [s for s in sources if needle in s.file_name.lower()]


# Usage
# Find exact match
source = find_source("document.pdf")
if source:
    print(f"Found: {source.file_name} - {source.status}")
else:
    print("Source not found")

# Search by partial name
matches = search_sources("report")
print(f"Found {len(matches)} sources matching 'report'")
for match in matches:
    print(f" • {match.file_name}")
from collections import Counter
from dataclasses import dataclass
from typing import Optional

import graphor
from graphor import Graphor


@dataclass
class SourceSummary:
    """Aggregate counts and total size for a project's sources."""

    total: int
    completed: int
    processing: int
    failed: int
    new: int
    total_size_mb: float


class SourceManager:
    """Query helper around ``client.sources.list()`` with a local cache.

    The source list is fetched on first access and reused by every query
    method until ``refresh()`` is called again.
    """

    def __init__(self, api_key: Optional[str] = None):
        """Create the client, passing ``api_key`` only when one is given."""
        self.client = Graphor(api_key=api_key) if api_key else Graphor()
        self._cache = None

    def refresh(self):
        """Refresh the sources cache and return the newly fetched list."""
        self._cache = self.client.sources.list()
        return self._cache

    @property
    def sources(self):
        """Get sources (cached); fetches them on first access."""
        if self._cache is None:
            self.refresh()
        return self._cache

    def get_summary(self) -> SourceSummary:
        """Get a summary of all sources.

        Returns:
            A ``SourceSummary`` with the total number of sources, the number
            in each of the "Completed", "Processing", "Failed" and "New"
            statuses, and the combined ``file_size`` in megabytes rounded to
            two decimals.
        """
        sources = self.sources
        # One pass over the list; a missing status simply counts as 0.
        by_status = Counter(s.status for s in sources)
        total_size = sum(s.file_size for s in sources)

        return SourceSummary(
            total=len(sources),
            completed=by_status["Completed"],
            processing=by_status["Processing"],
            failed=by_status["Failed"],
            new=by_status["New"],
            total_size_mb=round(total_size / (1024 * 1024), 2),
        )

    def find_by_name(self, name: str):
        """Find a source by exact name; return ``None`` when nothing matches."""
        return next((s for s in self.sources if s.file_name == name), None)

    def search(self, query: str):
        """Search sources by case-insensitive partial name match."""
        # Lower-case the query once instead of once per source.
        needle = query.lower()
        return [s for s in self.sources if needle in s.file_name.lower()]

    def filter_by_status(self, status: str):
        """Filter sources by status (exact match)."""
        return [s for s in self.sources if s.status == status]

    def filter_by_type(self, file_type: str):
        """Filter sources by file type (exact match)."""
        return [s for s in self.sources if s.file_type == file_type]

    def get_failed(self):
        """Get all failed sources."""
        return self.filter_by_status("Failed")

    def get_processing(self):
        """Get all processing sources."""
        return self.filter_by_status("Processing")


# Usage
manager = SourceManager()

# Get summary
summary = manager.get_summary()
print(f"Total: {summary.total}, Completed: {summary.completed}, Failed: {summary.failed}")

# Find a specific source
source = manager.find_by_name("document.pdf")
if source:
    print(f"Found: {source.file_name} - {source.status}")

# Search sources
matches = manager.search("report")
print(f"Found {len(matches)} matches for 'report'")

# Get failed sources
failed = manager.get_failed()
print(f"Failed sources: {len(failed)}")
import time
from typing import Optional

import graphor
from graphor import Graphor

client = Graphor()


def continuous_monitoring(
    interval_seconds: int = 60,
    max_iterations: Optional[int] = None,
):
    """Continuously monitor source processing status.

    Polls ``client.sources.list()``, prints a one-line count of completed,
    processing and failed sources, and lists the failed file names whenever
    any source is in the "Failed" state.

    Args:
        interval_seconds: Seconds to sleep after each polling attempt.
        max_iterations: Maximum number of polling attempts, counting attempts
            that end in a connection error. ``None`` polls until interrupted.

    Connection errors are reported and the loop continues. Ctrl-C at any
    point in the loop, including while sleeping, prints a message and returns.
    """
    iteration = 0
    # The KeyboardInterrupt handler wraps the whole loop so that Ctrl-C is
    # handled no matter which statement (poll, print or sleep) is running.
    try:
        while max_iterations is None or iteration < max_iterations:
            try:
                sources = client.sources.list()
            except graphor.APIConnectionError as e:
                print(f"[{time.strftime('%H:%M:%S')}] Connection error: {e}")
            else:
                failed_sources = [s for s in sources if s.status == "Failed"]
                processing = sum(1 for s in sources if s.status == "Processing")
                completed = sum(1 for s in sources if s.status == "Completed")
                failed = len(failed_sources)

                print(f"[{time.strftime('%H:%M:%S')}] "
                      f"✅ {completed} | ⏳ {processing} | ❌ {failed}")

                # Alert whenever any source is currently in the Failed state
                if failed_sources:
                    print(f" ⚠️ Failed sources: {[s.file_name for s in failed_sources]}")

            # Count every attempt so a bounded run always terminates, even
            # when the API stays unreachable.
            iteration += 1
            time.sleep(interval_seconds)
    except KeyboardInterrupt:
        print("\nMonitoring stopped")


# Usage (monitor every 30 seconds, 10 times)
# continuous_monitoring(interval_seconds=30, max_iterations=10)
Cache results: Store the response locally when making multiple queries
Filter client-side: `client.sources.list()` returns every source in the project, so apply any filtering in your own code
Use async: Prefer asynchronous calls in applications that need to perform other work while waiting for the response
Copy
# Example: Cache sources for multiple operations
# (assumes `client` is an already-initialized Graphor client)
sources = client.sources.list()

# Size threshold for "large" files: 10 MB expressed in bytes
LARGE_FILE_BYTES = 10 * 1024 * 1024

# Now perform multiple filter operations without re-fetching
pdfs = [item for item in sources if item.file_type == "pdf"]
completed = [item for item in sources if item.status == "Completed"]
large_files = [item for item in sources if item.file_size > LARGE_FILE_BYTES]