The List Sources endpoint allows you to retrieve information about all documents in your GraphorLM project. This endpoint provides comprehensive details about each document’s status, processing information, and metadata, enabling you to monitor and manage your document collection programmatically.

Endpoint Overview

HTTP Method

GET https://sources.graphorlm.com

Authentication

This endpoint requires authentication using an API token. You must include your API token as a Bearer token in the Authorization header.
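
For example, every request must carry a header of the form shown below (the token value is a placeholder):

Authorization: Bearer grlm_your_api_token_here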

Learn how to create and manage API tokens in the API Tokens guide.

Request Format

Headers

| Header        | Value                 | Required |
|---------------|-----------------------|----------|
| Authorization | Bearer YOUR_API_TOKEN | ✅ Yes   |

Query Parameters

This endpoint does not accept any query parameters. It returns all sources in the project associated with your API token.

Response Format

Success Response (200 OK)

The endpoint returns an array of source objects with comprehensive information about each document:

[
  {
    "status": "New",
    "message": "Source uploaded, awaiting processing",
    "file_name": "document.pdf",
    "file_size": 2048576,
    "file_type": "pdf",
    "file_source": "local file",
    "project_id": "550e8400-e29b-41d4-a716-446655440000",
    "project_name": "My Project",
    "partition_method": "basic"
  },
  {
    "status": "Completed",
    "message": "Source processed successfully",
    "file_name": "presentation.pptx",
    "file_size": 1024000,
    "file_type": "pptx",
    "file_source": "local file",
    "project_id": "550e8400-e29b-41d4-a716-446655440000",
    "project_name": "My Project",
    "partition_method": "yolox"
  },
  {
    "status": "Processing",
    "message": "Source is being processed",
    "file_name": "webpage-content",
    "file_size": 0,
    "file_type": "pdf",
    "file_source": "url",
    "project_id": "550e8400-e29b-41d4-a716-446655440000",
    "project_name": "My Project",
    "partition_method": "advanced"
  }
]

Response Fields

| Field            | Type    | Description                                         |
|------------------|---------|-----------------------------------------------------|
| status           | string  | Current processing status (see status values below) |
| message          | string  | Human-readable status description                   |
| file_name        | string  | Name of the source file or identifier               |
| file_size        | integer | Size of the file in bytes (0 for URLs)              |
| file_type        | string  | File extension or type                              |
| file_source      | string  | Source type: local file, url, github, or youtube    |
| project_id       | string  | UUID of the project                                 |
| project_name     | string  | Name of the project                                 |
| partition_method | string  | Processing method used or applied                   |

Status Values

The status field reflects where each document is in the processing lifecycle:

| Status     | Description                                          |
|------------|------------------------------------------------------|
| New        | Source uploaded and awaiting processing              |
| Processing | Source is currently being processed                  |
| Completed  | Source processed successfully and ready for use      |
| Failed     | Processing failed; see the message field for details |

File Source Types

| Source Type | Description                                | Typical Use Cases                       |
|-------------|--------------------------------------------|-----------------------------------------|
| local file  | Files uploaded directly from your computer | Documents, PDFs, images, spreadsheets   |
| url         | Content imported from web URLs             | Web pages, articles, online documents   |
| github      | Content imported from GitHub repositories  | Code documentation, README files, wikis |
| youtube     | Content imported from YouTube videos       | Video transcripts, educational content  |
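
As a quick illustration, the file_source field makes it easy to tally where your documents came from. This sketch assumes sources already holds the parsed response array (for example, the return value of the list_sources helper shown under Code Examples below):

from collections import Counter

def count_by_origin(sources):
    """Tally sources by their file_source field (local file, url, github, youtube)."""
    return Counter(source.get("file_source", "unknown") for source in sources)

# Example output: Counter({'local file': 2, 'url': 1})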

Code Examples

JavaScript/Node.js

const listSources = async (apiToken) => {
  const response = await fetch('https://sources.graphorlm.com', {
    method: 'GET',
    headers: {
      'Authorization': `Bearer ${apiToken}`
    }
  });

  if (response.ok) {
    const sources = await response.json();
    console.log(`Found ${sources.length} sources`);
    return sources;
  } else {
    const error = await response.text();
    throw new Error(`Failed to list sources: ${response.status} ${error}`);
  }
};

// Usage
listSources('grlm_your_api_token_here')
  .then(sources => {
    sources.forEach(source => {
      console.log(`${source.file_name} - ${source.status}`);
    });
  })
  .catch(error => console.error('Error:', error));

Python

import requests

def list_sources(api_token):
    url = "https://sources.graphorlm.com"
    
    headers = {
        "Authorization": f"Bearer {api_token}"
    }
    
    response = requests.get(url, headers=headers, timeout=30)
    
    if response.status_code == 200:
        sources = response.json()
        print(f"Found {len(sources)} sources")
        return sources
    else:
        response.raise_for_status()

# Usage
try:
    sources = list_sources("grlm_your_api_token_here")
    for source in sources:
        print(f"{source['file_name']} - {source['status']}")
except requests.exceptions.RequestException as e:
    print(f"Error listing sources: {e}")

cURL

curl -X GET https://sources.graphorlm.com \
  -H "Authorization: Bearer grlm_your_api_token_here"

PHP

<?php
function listSources($apiToken) {
    $url = "https://sources.graphorlm.com";
    
    $headers = [
        "Authorization: Bearer " . $apiToken
    ];
    
    $ch = curl_init();
    curl_setopt($ch, CURLOPT_URL, $url);
    curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
    curl_setopt($ch, CURLOPT_TIMEOUT, 30);
    
    $response = curl_exec($ch);
    $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
    curl_close($ch);
    
    if ($httpCode === 200) {
        return json_decode($response, true);
    } else {
        throw new Exception("Failed to list sources. HTTP code: " . $httpCode);
    }
}

// Usage
try {
    $sources = listSources("grlm_your_api_token_here");
    echo "Found " . count($sources) . " sources\n";
    
    foreach ($sources as $source) {
        echo $source['file_name'] . " - " . $source['status'] . "\n";
    }
} catch (Exception $e) {
    echo "Error: " . $e->getMessage() . "\n";
}
?>

Error Responses

Common Error Codes

| Status Code | Error Type            | Description                            |
|-------------|-----------------------|----------------------------------------|
| 401         | Unauthorized          | Invalid or missing API token           |
| 403         | Forbidden             | Access denied to the specified project |
| 500         | Internal Server Error | Server-side error retrieving sources   |

Error Response Format

{
  "detail": "Invalid authentication credentials"
}
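
If you want to surface the detail field in your own client, a minimal sketch (assuming the error body is JSON, with a plain-text fallback) might look like this:

def raise_for_detail(response):
    """Raise a descriptive error using the API's detail field when available."""
    if response.status_code != 200:
        try:
            detail = response.json().get("detail", response.text)
        except ValueError:
            detail = response.text
        raise RuntimeError(f"Request failed ({response.status_code}): {detail}")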

Response Analysis

Filtering and Processing Results

def analyze_sources(sources):
    """Analyze sources by status and type."""
    status_counts = {}
    type_counts = {}
    total_size = 0
    
    for source in sources:
        # Count by status
        status = source.get('status', 'unknown')
        status_counts[status] = status_counts.get(status, 0) + 1
        
        # Count by file type
        file_type = source.get('file_type', 'unknown')
        type_counts[file_type] = type_counts.get(file_type, 0) + 1
        
        # Calculate total size
        total_size += source.get('file_size', 0)
    
    return {
        'total_sources': len(sources),
        'status_breakdown': status_counts,
        'type_breakdown': type_counts,
        'total_size_mb': round(total_size / (1024 * 1024), 2)
    }

# Usage
sources = list_sources("grlm_your_token")
analysis = analyze_sources(sources)
print(f"Analysis: {analysis}")

Status Monitoring

const monitorProcessingStatus = async (apiToken) => {
  const sources = await listSources(apiToken);
  
  const processing = sources.filter(s => s.status === 'Processing');
  const failed = sources.filter(s => s.status === 'Failed');
  const completed = sources.filter(s => s.status === 'Completed');
  
  console.log(`Processing: ${processing.length}`);
  console.log(`Failed: ${failed.length}`);
  console.log(`Completed: ${completed.length}`);
  
  // List files that need attention
  if (failed.length > 0) {
    console.log('\nFailed files that need attention:');
    failed.forEach(source => {
      console.log(`- ${source.file_name} (${source.file_type})`);
    });
  }
  
  return { processing, failed, completed };
};

Integration Examples

Project Health Check

import requests
from datetime import datetime

def project_health_check(api_token):
    """Perform a comprehensive health check of the project."""
    try:
        sources = list_sources(api_token)
        
        health_report = {
            'timestamp': datetime.now().isoformat(),
            'total_sources': len(sources),
            'status_summary': {},
            'issues': [],
            'recommendations': []
        }
        
        # Analyze status distribution
        for source in sources:
            status = source.get('status', 'unknown')
            health_report['status_summary'][status] = health_report['status_summary'].get(status, 0) + 1
            
            # Identify issues
            if status == 'Failed':
                health_report['issues'].append(f"Failed processing: {source['file_name']}")
            elif status == 'unknown':
                health_report['issues'].append(f"Unknown status: {source['file_name']}")
        
        # Generate recommendations
        failed_count = health_report['status_summary'].get('Failed', 0)
        if failed_count > 0:
            health_report['recommendations'].append(f"Reprocess {failed_count} failed documents")
        
        processing_count = health_report['status_summary'].get('Processing', 0)
        if processing_count > 5:
            health_report['recommendations'].append("Monitor processing queue - high volume detected")
        
        return health_report
        
    except Exception as e:
        return {'error': str(e), 'timestamp': datetime.now().isoformat()}

# Usage
health = project_health_check("grlm_your_token")
print(f"Project Health Report: {health}")

Source Management Dashboard

class SourceManager {
  constructor(apiToken) {
    this.apiToken = apiToken;
  }
  
  async getDashboardData() {
    const sources = await listSources(this.apiToken);
    
    return {
      overview: this.getOverview(sources),
      recentUploads: this.getRecentUploads(sources),
      processingQueue: this.getProcessingQueue(sources),
      failedSources: this.getFailedSources(sources)
    };
  }
  
  getOverview(sources) {
    const totalSize = sources.reduce((sum, s) => sum + (s.file_size || 0), 0);
    
    return {
      totalSources: sources.length,
      totalSizeMB: Math.round(totalSize / (1024 * 1024)),
      byStatus: this.groupByField(sources, 'status'),
      byType: this.groupByField(sources, 'file_type'),
      bySource: this.groupByField(sources, 'file_source')
    };
  }
  
  getRecentUploads(sources, limit = 10) {
    return sources
      .filter(s => s.status === 'New' || s.status === 'Completed')
      .slice(0, limit)
      .map(s => ({
        name: s.file_name,
        status: s.status,
        type: s.file_type,
        sizeMB: Math.round((s.file_size || 0) / (1024 * 1024))
      }));
  }
  
  getProcessingQueue(sources) {
    return sources
      .filter(s => s.status === 'Processing')
      .map(s => ({
        name: s.file_name,
        type: s.file_type,
        method: s.partition_method
      }));
  }
  
  getFailedSources(sources) {
    return sources
      .filter(s => s.status === 'Failed')
      .map(s => ({
        name: s.file_name,
        type: s.file_type,
        message: s.message
      }));
  }
  
  groupByField(sources, field) {
    return sources.reduce((acc, source) => {
      const value = source[field] || 'unknown';
      acc[value] = (acc[value] || 0) + 1;
      return acc;
    }, {});
  }
}

// Usage
const manager = new SourceManager('grlm_your_token');
manager.getDashboardData()
  .then(dashboard => console.log('Dashboard:', dashboard))
  .catch(error => console.error('Error:', error));

Automated Processing Pipeline

import time
import logging

class ProcessingPipeline:
    def __init__(self, api_token):
        self.api_token = api_token
        self.logger = logging.getLogger(__name__)
    
    def monitor_and_process(self, check_interval=60):
        """Monitor sources and automatically handle processing."""
        while True:
            try:
                sources = list_sources(self.api_token)
                
                # Handle new uploads
                new_sources = [s for s in sources if s['status'] == 'New']
                if new_sources:
                    self.logger.info(f"Found {len(new_sources)} new sources")
                    # Add logic to trigger processing if needed
                
                # Handle failed sources
                failed_sources = [s for s in sources if s['status'] == 'Failed']
                if failed_sources:
                    self.logger.warning(f"Found {len(failed_sources)} failed sources")
                    self.handle_failed_sources(failed_sources)
                
                # Check processing status
                processing_sources = [s for s in sources if s['status'] == 'Processing']
                if processing_sources:
                    self.logger.info(f"{len(processing_sources)} sources currently processing")
                
                time.sleep(check_interval)
                
            except Exception as e:
                self.logger.error(f"Pipeline error: {e}")
                time.sleep(check_interval)
    
    def handle_failed_sources(self, failed_sources):
        """Handle failed sources - could implement retry logic."""
        for source in failed_sources:
            self.logger.info(f"Failed source: {source['file_name']} - {source['message']}")
            # Implement retry logic here if needed

# Usage
pipeline = ProcessingPipeline("grlm_your_token")
# pipeline.monitor_and_process()  # Uncomment to run monitoring

Best Practices

Performance Optimization

  • Cache results: Store the response locally for a reasonable period (a caching sketch follows this list)
  • Filter client-side: Process the full list to extract specific information you need
  • Monitor regularly: Set up automated checks for processing status
  • Batch operations: Use the list to plan batch processing operations
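
A minimal sketch of client-side caching, reusing the list_sources helper from the Python example above (the 60-second TTL is an arbitrary choice, not an API requirement):

import time

_cache = {"sources": None, "fetched_at": 0.0}

def list_sources_cached(api_token, ttl_seconds=60):
    """Return cached sources if they are still fresh; otherwise refetch."""
    now = time.time()
    if _cache["sources"] is None or now - _cache["fetched_at"] > ttl_seconds:
        _cache["sources"] = list_sources(api_token)
        _cache["fetched_at"] = now
    return _cache["sources"]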

Data Management

  • Track processing times: Monitor how long documents take to process
  • Identify patterns: Look for file types or sizes that frequently fail (see the example after this list)
  • Maintain logs: Keep records of source management activities
  • Plan capacity: Use file counts and sizes for storage planning
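
For instance, to spot file types that fail disproportionately often, you can compute a failure rate per type. A sketch, assuming sources is the parsed response array:

def failure_rate_by_type(sources):
    """Return the fraction of failed sources per file type."""
    totals, failures = {}, {}
    for source in sources:
        file_type = source.get("file_type", "unknown")
        totals[file_type] = totals.get(file_type, 0) + 1
        if source.get("status") == "Failed":
            failures[file_type] = failures.get(file_type, 0) + 1
    return {t: failures.get(t, 0) / totals[t] for t in totals}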

Error Handling

  • Implement retries: Handle temporary network issues with exponential backoff (a retry sketch follows this list)
  • Log failures: Keep detailed logs of API failures for debugging
  • Monitor status: Regularly check for failed processing jobs
  • Graceful degradation: Have fallback plans when the API is unavailable
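
A minimal retry wrapper with exponential backoff around the list_sources helper (the attempt count and base delay are illustrative, not API requirements):

import time
import requests

def list_sources_with_retry(api_token, max_attempts=3, base_delay=1.0):
    """Retry transient failures with exponential backoff: 1s, 2s, 4s, ..."""
    for attempt in range(max_attempts):
        try:
            return list_sources(api_token)
        except requests.exceptions.RequestException as e:
            if attempt == max_attempts - 1:
                raise
            delay = base_delay * (2 ** attempt)
            print(f"Attempt {attempt + 1} failed ({e}); retrying in {delay:.0f}s")
            time.sleep(delay)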

Troubleshooting

  • 401 Unauthorized: Verify that your API token is valid and sent as Authorization: Bearer YOUR_API_TOKEN
  • Empty array returned: The project has no sources yet; upload documents before listing
  • Failed status: Inspect the message field for the failure reason before reprocessing
  • Sources stuck in Processing: Check again after a short interval; large files may take longer to process

Next Steps

After successfully listing your sources, you can upload new documents, monitor processing status using the patterns shown above, and investigate or reprocess any sources that failed.