The List sources endpoint returns every source in the authenticated project's knowledge graph. Each item includes file metadata (ID, name, size, type, origin), the current processing status, and a human-readable message. You can optionally filter by file_ids.

Endpoint overview

HTTP method: GET
URL: https://sources.graphorlm.com

Authentication

This endpoint requires authentication using an API token. Include your API token as a Bearer token in the Authorization header.
Learn how to create and manage API tokens in the API Tokens guide.
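As a minimal sketch, the only required header is the Bearer token (the auth_headers helper name and the token value are illustrative):

```python
def auth_headers(api_token):
    """Build the Authorization header expected by the API."""
    return {"Authorization": f"Bearer {api_token}"}

# Example: auth_headers("grlm_your_api_token_here")
```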

Request format

Headers

| Header | Value | Required |
|---|---|---|
| Authorization | Bearer YOUR_API_TOKEN | Yes |

Query parameters

| Parameter | Type | Required | Description |
|---|---|---|---|
| file_ids | list of strings | No | If provided, only sources whose file_id is in this list are returned. Repeat the parameter for multiple IDs (e.g. ?file_ids=id1&file_ids=id2). When file_ids is omitted, the response includes all sources in the project. |
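To illustrate the repeated-parameter encoding, here is a stdlib-only sketch (the build_list_url helper name is illustrative):

```python
from urllib.parse import urlencode

def build_list_url(base="https://sources.graphorlm.com", file_ids=None):
    """Encode file_ids as repeated query parameters: ?file_ids=a&file_ids=b."""
    if not file_ids:
        return base
    return base + "?" + urlencode([("file_ids", fid) for fid in file_ids])
```

For example, build_list_url(file_ids=["id1", "id2"]) yields https://sources.graphorlm.com?file_ids=id1&file_ids=id2.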

Response format

Success response (200 OK)

The endpoint returns a JSON array of source objects. Each object includes file metadata, status, and message:
[
  {
    "status": "New",
    "message": "Source uploaded, awaiting processing",
    "file_id": "file_abc123",
    "file_name": "document.pdf",
    "file_size": 2048576,
    "file_type": "pdf",
    "file_source": "local file",
    "project_id": "550e8400-e29b-41d4-a716-446655440000",
    "project_name": "My Project",
    "method": "fast"
  },
  {
    "status": "Completed",
    "message": "Source processed successfully",
    "file_id": "file_def456",
    "file_name": "presentation.pptx",
    "file_size": 1024000,
    "file_type": "pptx",
    "file_source": "local file",
    "project_id": "550e8400-e29b-41d4-a716-446655440000",
    "project_name": "My Project",
    "method": "balanced"
  },
  {
    "status": "Processing",
    "message": "Source is being processed",
    "file_id": "file_ghi789",
    "file_name": "webpage-content",
    "file_size": 0,
    "file_type": "pdf",
    "file_source": "url",
    "project_id": "550e8400-e29b-41d4-a716-446655440000",
    "project_name": "My Project",
    "method": "accurate"
  }
]

Response fields

| Field | Type | Description |
|---|---|---|
| status | string | Current processing status (from backend), e.g. New, Processing, Processed, Completed, Failed, Processing failed, or unknown |
| message | string | Human-readable status message |
| file_id | string | Unique identifier for the source (use for subsequent API calls) |
| file_name | string | Display name of the source file or identifier |
| file_size | integer | File size in bytes (0 for URL/GitHub/YouTube sources) |
| file_type | string | File extension or type |
| file_source | string | Origin: local file, url, github, or youtube |
| project_id | string | UUID of the project |
| project_name | string | Name of the project |
| method | string or null | Partitioning strategy used: fast, balanced, accurate, vlm, or agentic (when available) |

Status values

The status field is returned with capitalisation as stored in the backend (e.g. New, Completed). Typical values:

| Status | Message | Description |
|---|---|---|
| New | "Source uploaded, awaiting processing" | Upload/ingestion accepted; processing has not started yet |
| Processing | "Source is being processed" | Pipeline is running (partitioning, chunking, embedding) |
| Processed | "Source has been processed" | Processing finished (intermediate or final state) |
| Completed | "Source processed successfully" | Ready for ask, extract, retrieve, and get elements |
| Failed | "Source processing failed" | Processing encountered an error; consider reprocessing |
| Processing failed | "Source status: Processing failed" | Pipeline failed during processing |
| unknown | "Source status: …" | Status could not be determined (e.g. when status is null) |
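When branching on these values in client code, it can help to collapse them into coarse buckets. A sketch (the classify_status helper and bucket names are illustrative):

```python
TERMINAL_OK = {"Completed", "Processed"}
TERMINAL_FAILED = {"Failed", "Processing failed"}

def classify_status(status):
    """Collapse a raw backend status into 'ready', 'failed', or 'pending'."""
    if status in TERMINAL_OK:
        return "ready"
    if status in TERMINAL_FAILED:
        return "failed"
    return "pending"  # New, Processing, unknown, etc.
```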

File source types

| Source type | Description | Typical use cases |
|---|---|---|
| local file | Files uploaded directly from your computer | Documents, PDFs, images, spreadsheets |
| url | Content imported from web URLs | Web pages, articles, online documents |
| github | Content imported from GitHub repositories | Code documentation, README files, wikis |
| youtube | Content imported from YouTube videos | Video transcripts, educational content |

Code examples

JavaScript/Node.js

const listSources = async (apiToken, fileIds = null) => {
  const url = new URL("https://sources.graphorlm.com");
  if (fileIds && fileIds.length > 0) {
    fileIds.forEach((id) => url.searchParams.append("file_ids", id));
  }
  const response = await fetch(url, {
    method: "GET",
    headers: { Authorization: `Bearer ${apiToken}` },
  });

  if (!response.ok) {
    const error = await response.text();
    throw new Error(`Failed to list sources: ${response.status} ${error}`);
  }
  const sources = await response.json();
  console.log(`Found ${sources.length} sources`);
  return sources;
};

// Usage: list all sources
listSources("grlm_your_api_token_here").then((sources) => {
  sources.forEach((s) => console.log(`${s.file_name} - ${s.status}`));
});

// Usage: filter by file_ids
listSources("grlm_your_api_token_here", ["file_abc123", "file_def456"]);

Python

import requests

def list_sources(api_token, file_ids=None):
    url = "https://sources.graphorlm.com"
    headers = {"Authorization": f"Bearer {api_token}"}
    params = {}
    if file_ids:
        params["file_ids"] = file_ids  # list: requests sends as file_ids=id1&file_ids=id2
    response = requests.get(url, headers=headers, params=params, timeout=30)
    response.raise_for_status()
    return response.json()

# Usage: list all sources
sources = list_sources("grlm_your_api_token_here")
for source in sources:
    print(f"{source['file_name']} - {source['status']}")

# Usage: filter by file_ids
sources = list_sources("grlm_your_api_token_here", file_ids=["file_abc123", "file_def456"])

cURL

# List all sources
curl -X GET https://sources.graphorlm.com \
  -H "Authorization: Bearer grlm_your_api_token_here"

# List only specific sources (repeat file_ids for each)
curl -X GET "https://sources.graphorlm.com?file_ids=file_abc123&file_ids=file_def456" \
  -H "Authorization: Bearer grlm_your_api_token_here"

Error responses

Common error codes

| Status code | Description |
|---|---|
| 401 | Invalid or missing API token |
| 403 | Access denied to the project |
| 500 | Unexpected internal error while retrieving sources |

Error response format

{
  "detail": "Failed to retrieve sources"
}
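Since error bodies carry a detail field, clients can surface it instead of raw text. A stdlib-only sketch (extract_error_detail is an illustrative helper you would call on response.text for non-2xx responses):

```python
import json

def extract_error_detail(body_text):
    """Return the 'detail' field from an error body, or the raw text as a fallback."""
    try:
        parsed = json.loads(body_text)
    except ValueError:
        return body_text
    if isinstance(parsed, dict):
        return parsed.get("detail", body_text)
    return body_text
```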

Response analysis

Filtering and processing results

def analyze_sources(sources):
    """Analyze sources by status and type."""
    status_counts = {}
    type_counts = {}
    total_size = 0
    for source in sources:
        status = source.get("status", "unknown")
        status_counts[status] = status_counts.get(status, 0) + 1
        file_type = source.get("file_type", "unknown")
        type_counts[file_type] = type_counts.get(file_type, 0) + 1
        total_size += source.get("file_size", 0)
    return {
        "total_sources": len(sources),
        "status_breakdown": status_counts,
        "type_breakdown": type_counts,
        "total_size_mb": round(total_size / (1024 * 1024), 2),
    }

sources = list_sources("grlm_your_token")
analysis = analyze_sources(sources)
print("Analysis:", analysis)

Status monitoring

const monitorProcessingStatus = async (apiToken) => {
  const sources = await listSources(apiToken);
  const processing = sources.filter((s) => s.status === "Processing");
  const failed = sources.filter((s) => s.status === "Failed" || s.status === "Processing failed");
  const completed = sources.filter((s) => s.status === "Completed" || s.status === "Processed");
  console.log(`Processing: ${processing.length}, Failed: ${failed.length}, Completed: ${completed.length}`);
  if (failed.length > 0) {
    failed.forEach((s) => console.log(`Failed: ${s.file_name} (${s.file_type})`));
  }
  return { processing, failed, completed };
};

Integration examples

Project health check

import requests
from datetime import datetime

def project_health_check(api_token):
    """Perform a comprehensive health check of the project."""
    try:
        sources = list_sources(api_token)
        
        health_report = {
            'timestamp': datetime.now().isoformat(),
            'total_sources': len(sources),
            'status_summary': {},
            'issues': [],
            'recommendations': []
        }
        
        # Analyze status distribution
        for source in sources:
            status = source.get('status', 'unknown')
            health_report['status_summary'][status] = health_report['status_summary'].get(status, 0) + 1
            
            # Identify issues
            if status in ("Failed", "Processing failed"):
                health_report["issues"].append(f"Failed processing: {source['file_name']}")
            elif status == "unknown":
                health_report["issues"].append(f"Unknown status: {source['file_name']}")
        
        failed_count = health_report["status_summary"].get("Failed", 0) + health_report["status_summary"].get("Processing failed", 0)
        if failed_count > 0:
            health_report["recommendations"].append(f"Reprocess {failed_count} failed documents")
        
        processing_count = health_report["status_summary"].get("Processing", 0)
        if processing_count > 5:
            health_report["recommendations"].append("Monitor processing queue - high volume detected")
        
        return health_report
        
    except Exception as e:
        return {'error': str(e), 'timestamp': datetime.now().isoformat()}

# Usage
health = project_health_check("grlm_your_token")
print(f"Project Health Report: {health}")

Source management dashboard

class SourceManager {
  constructor(apiToken) {
    this.apiToken = apiToken;
  }
  
  async getDashboardData() {
    const sources = await listSources(this.apiToken);
    
    return {
      overview: this.getOverview(sources),
      recentUploads: this.getRecentUploads(sources),
      processingQueue: this.getProcessingQueue(sources),
      failedSources: this.getFailedSources(sources)
    };
  }
  
  getOverview(sources) {
    const totalSize = sources.reduce((sum, s) => sum + (s.file_size || 0), 0);
    
    return {
      totalSources: sources.length,
      totalSizeMB: Math.round(totalSize / (1024 * 1024)),
      byStatus: this.groupByField(sources, 'status'),
      byType: this.groupByField(sources, 'file_type'),
      bySource: this.groupByField(sources, 'file_source')
    };
  }
  
  getRecentUploads(sources, limit = 10) {
    return sources
      .filter(s => s.status === "New" || s.status === "Processed" || s.status === "Completed")
      .slice(0, limit)
      .map(s => ({
        name: s.file_name,
        status: s.status,
        type: s.file_type,
        sizeMB: Math.round((s.file_size || 0) / (1024 * 1024))
      }));
  }
  
  getProcessingQueue(sources) {
    return sources
      .filter(s => s.status === "Processing")
      .map(s => ({
        name: s.file_name,
        type: s.file_type,
        method: s.method
      }));
  }
  
  getFailedSources(sources) {
    return sources
      .filter(s => s.status === "Failed" || s.status === "Processing failed")
      .map(s => ({
        name: s.file_name,
        type: s.file_type,
        message: s.message
      }));
  }
  
  groupByField(sources, field) {
    return sources.reduce((acc, source) => {
      const value = source[field] || 'unknown';
      acc[value] = (acc[value] || 0) + 1;
      return acc;
    }, {});
  }
}

// Usage
const manager = new SourceManager('grlm_your_token');
manager.getDashboardData()
  .then(dashboard => console.log('Dashboard:', dashboard))
  .catch(error => console.error('Error:', error));

Automated processing pipeline

import time
import logging

class ProcessingPipeline:
    def __init__(self, api_token):
        self.api_token = api_token
        self.logger = logging.getLogger(__name__)
    
    def monitor_and_process(self, check_interval=60):
        """Monitor sources and automatically handle processing."""
        while True:
            try:
                sources = list_sources(self.api_token)
                
                new_sources = [s for s in sources if s["status"] == "New"]
                if new_sources:
                    self.logger.info(f"Found {len(new_sources)} new sources")
                
                failed_sources = [s for s in sources if s["status"] in ("Failed", "Processing failed")]
                if failed_sources:
                    self.logger.warning(f"Found {len(failed_sources)} failed sources")
                    self.handle_failed_sources(failed_sources)
                
                processing_sources = [s for s in sources if s["status"] == "Processing"]
                if processing_sources:
                    self.logger.info(f"{len(processing_sources)} sources currently processing")
                
                time.sleep(check_interval)
                
            except Exception as e:
                self.logger.error(f"Pipeline error: {e}")
                time.sleep(check_interval)
    
    def handle_failed_sources(self, failed_sources):
        """Handle failed sources - could implement retry logic."""
        for source in failed_sources:
            self.logger.info(f"Failed source: {source['file_name']} - {source['message']}")
            # Implement retry logic here if needed

# Usage
pipeline = ProcessingPipeline("grlm_your_token")
# pipeline.monitor_and_process()  # Uncomment to run monitoring

Best practices

Performance optimization

  • Cache results: Store the response locally for a reasonable period
  • Filter client-side: Process the full list to extract specific information you need
  • Monitor regularly: Set up automated checks for processing status
  • Batch operations: Use the list to plan batch processing operations
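The caching advice above can be sketched as a small TTL wrapper around any list_sources-style callable (the class and parameter names are illustrative):

```python
import time

class CachedSourceList:
    """Cache a source listing for ttl_seconds to avoid repeated API calls."""

    def __init__(self, fetch, ttl_seconds=60):
        self.fetch = fetch              # e.g. lambda: list_sources(token)
        self.ttl = ttl_seconds
        self._cached = None
        self._fetched_at = 0.0

    def get(self):
        now = time.time()
        if self._cached is None or now - self._fetched_at > self.ttl:
            self._cached = self.fetch()
            self._fetched_at = now
        return self._cached
```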

Data management

  • Track processing times: Monitor how long documents take to process
  • Identify patterns: Look for file types or sizes that frequently fail
  • Maintain logs: Keep records of source management activities
  • Plan capacity: Use file counts and sizes for storage planning

Error handling

  • Implement retries: Handle temporary network issues with exponential backoff
  • Log failures: Keep detailed logs of API failures for debugging
  • Monitor status: Regularly check for failed processing jobs
  • Graceful degradation: Have fallback plans when the API is unavailable
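The retry advice can be sketched as a generic exponential-backoff wrapper (with_retries is an illustrative helper, not part of the API):

```python
import time

def with_retries(fn, attempts=4, base_delay=1.0):
    """Call fn, retrying on any exception with delays of 1s, 2s, 4s, ..."""
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))
```

For example, with_retries(lambda: list_sources(token)) absorbs transient network failures while still raising after the final attempt.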

Troubleshooting

Slow responses or timeouts
Causes: large number of sources, server load, or network issues.
Solutions:
  • Implement request timeouts (30+ seconds recommended)
  • Use response caching for non-critical applications
  • Consider pagination if available in future API versions

Empty response
Causes: no sources in the project, wrong API token, or permission issues.
Solutions:
  • Verify you have uploaded documents to your project
  • Check that your API token is correct and active
  • Ensure you’re accessing the right project

Status doesn’t match the dashboard
Causes: processing lag, system sync issues, or database inconsistencies.
Solutions:
  • Wait a few minutes and retry the request
  • Check the Graphor dashboard for accurate status
  • Contact support if inconsistencies persist

Authentication errors (401)
Causes: invalid token, expired token, or revoked access.
Solutions:
  • Verify token format and validity
  • Check the token hasn’t been revoked in the dashboard
  • Generate a new token if necessary
Next steps

After listing your sources:

  • Upload sources: Ingest files, URLs, GitHub repos, or YouTube videos (async)
  • Reprocess source: Re-process an existing source with a different partition method (async)
  • Delete source: Remove a source from your project
  • Get build status: Poll status and optional elements for an async ingestion or reprocess