Execute deployed flows in your GraphorLM project to process queries and generate responses through your configured RAG pipeline.

Overview

The Run Flow endpoint allows you to execute deployed flows by sending queries and receiving processed results. Each flow represents a complete RAG pipeline that can include document retrieval, LLM processing, and custom logic.

  • Method: POST
  • URL: https://{flow_name}.flows.graphorlm.com
  • Authentication: Required (API Token)

Authentication

All requests must include a valid API token in the Authorization header:

Authorization: Bearer YOUR_API_TOKEN

Learn how to generate API tokens in the API Tokens guide.
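
As a minimal sketch, the request headers can be built from a token that is kept outside the source code. The helper name `build_headers` and the environment variable `GRAPHORLM_API_TOKEN` are assumptions made for this example; they are not part of the GraphorLM API.

```python
import os


def build_headers(api_token: str) -> dict:
    """Return the headers this endpoint requires for a JSON request."""
    if not api_token:
        raise ValueError("API token is empty")
    return {
        "Authorization": f"Bearer {api_token}",
        "Content-Type": "application/json",
    }


# GRAPHORLM_API_TOKEN is a hypothetical variable name chosen for this sketch.
token = os.environ.get("GRAPHORLM_API_TOKEN", "YOUR_API_TOKEN")
headers = build_headers(token)
```

Reading the token from the environment keeps it out of version control, which matches the advice to protect API tokens.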

Request Format

Headers

| Header | Value | Required |
|---|---|---|
| Authorization | Bearer YOUR_API_TOKEN | Yes |
| Content-Type | application/json | Yes |

Request Body

The request body should be a JSON object with the following optional fields:

| Field | Type | Required | Description |
|---|---|---|---|
| query | string | No | The query or question to process through the flow |
| page | integer | No | Page number for paginated results (default: 1) |
| page_size | integer | No | Number of items per page for results |
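
Because every field is optional, a client can send only the fields it actually sets. The sketch below shows one way to do that; `build_payload` is a hypothetical helper, and the checks that `page` and `page_size` are at least 1 are assumptions, since the reference does not state how the server validates them.

```python
import json
from typing import Optional


def build_payload(query: Optional[str] = None,
                  page: Optional[int] = None,
                  page_size: Optional[int] = None) -> dict:
    """Build the JSON body, leaving out any field the caller did not set."""
    # Assumption: page numbers and page sizes start at 1.
    if page is not None and page < 1:
        raise ValueError("page must be 1 or greater")
    if page_size is not None and page_size < 1:
        raise ValueError("page_size must be 1 or greater")
    fields = {"query": query, "page": page, "page_size": page_size}
    return {name: value for name, value in fields.items() if value is not None}


body = json.dumps(build_payload(query="What is machine learning?", page_size=5))
print(body)
```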

Example Request

{
  "query": "What are the key features of attention mechanisms in transformers?",
  "page": 1,
  "page_size": 10
}

Response Format

Success Response (200 OK)

The response contains processed results from your flow execution with pagination information:

{
  "items": [
    {
      "page_content": "Attention mechanisms allow models to focus on relevant parts of the input...",
      "metadata": {
        "source": "attention_paper.pdf",
        "page": 3,
        "chunk_id": "chunk_123",
        "relevance_score": 0.95
      }
    }
  ],
  "total": 1,
  "page": 1,
  "page_size": 10,
  "total_pages": 1
}

Response Fields

| Field | Type | Description |
|---|---|---|
| items | array | Array of result documents with content and metadata |
| total | integer | Total number of results found |
| page | integer | Current page number |
| page_size | integer | Number of items per page |
| total_pages | integer | Total number of pages available |

Document Object Structure

Each item in the items array contains:

| Field | Type | Description |
|---|---|---|
| page_content | string | The main content or answer from the document |
| metadata | object | Additional information about the document |

Common metadata fields include:

  • source: Source file or document name
  • page: Page number in the original document
  • chunk_id: Unique identifier for the text chunk
  • relevance_score: Relevance score for the query (if available)
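
The response shape described above can be checked and converted into typed objects before use. This is a sketch only: the class names `Document` and `FlowPage` and the function `parse_flow_response` are invented for the example, and the metadata keys are treated as optional because the reference calls them "common" rather than guaranteed.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class Document:
    page_content: str
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class FlowPage:
    items: List[Document]
    total: int
    page: int
    page_size: int
    total_pages: int


def parse_flow_response(data: Dict[str, Any]) -> FlowPage:
    """Convert the decoded JSON body into typed objects, failing early on missing keys."""
    required = ("items", "total", "page", "page_size", "total_pages")
    missing = [key for key in required if key not in data]
    if missing:
        raise ValueError(f"response is missing fields: {missing}")
    items = [
        Document(page_content=item["page_content"], metadata=item.get("metadata") or {})
        for item in data["items"]
    ]
    return FlowPage(items, data["total"], data["page"], data["page_size"], data["total_pages"])


sample = {
    "items": [{
        "page_content": "Attention mechanisms allow models to focus on relevant parts of the input...",
        "metadata": {"source": "attention_paper.pdf", "page": 3,
                     "chunk_id": "chunk_123", "relevance_score": 0.95},
    }],
    "total": 1, "page": 1, "page_size": 10, "total_pages": 1,
}
parsed = parse_flow_response(sample)
for doc in parsed.items:
    # Metadata keys are optional, so read them with .get().
    print(doc.metadata.get("source", "Unknown"), doc.metadata.get("relevance_score"))
```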

Code Examples

JavaScript/Node.js

async function runFlow(flowName, query) {
  const response = await fetch(`https://${flowName}.flows.graphorlm.com`, {
    method: 'POST',
    headers: {
      'Authorization': 'Bearer YOUR_API_TOKEN',
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      query: query,
      page: 1,
      page_size: 10
    })
  });

  if (!response.ok) {
    throw new Error(`HTTP error! status: ${response.status}`);
  }

  return await response.json();
}

// Usage
runFlow('my-rag-pipeline', 'What is machine learning?')
  .then(result => {
    console.log('Flow Results:', result);
    result.items.forEach(item => {
      console.log('Content:', item.page_content);
      console.log('Source:', item.metadata.source);
    });
  })
  .catch(error => console.error('Error:', error));

Python

import requests

def run_flow(flow_name, query, api_token, page=1, page_size=10):
    url = f"https://{flow_name}.flows.graphorlm.com"
    
    headers = {
        "Authorization": f"Bearer {api_token}",
        "Content-Type": "application/json"
    }
    
    payload = {
        "query": query,
        "page": page,
        "page_size": page_size
    }
    
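    # Set a timeout (30 seconds here; adjust as needed) so a stalled connection cannot block forever
    response = requests.post(url, headers=headers, json=payload, timeout=30)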
    response.raise_for_status()
    
    return response.json()

# Usage
try:
    result = run_flow(
        flow_name="my-rag-pipeline",
        query="What is machine learning?",
        api_token="YOUR_API_TOKEN"
    )
    
    print(f"Found {result['total']} results")
    for item in result['items']:
        print(f"Content: {item['page_content']}")
        print(f"Source: {item['metadata'].get('source', 'Unknown')}")
        print("---")
        
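except requests.exceptions.RequestException as e:
    # RequestException also covers connection failures and timeouts, not only HTTP error statuses
    print(f"Error: {e}")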

cURL

curl -X POST https://my-rag-pipeline.flows.graphorlm.com \
  -H "Authorization: Bearer YOUR_API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "query": "What is machine learning?",
    "page": 1,
    "page_size": 10
  }'

PHP

<?php
function runFlow($flowName, $query, $apiToken, $page = 1, $pageSize = 10) {
    $url = "https://{$flowName}.flows.graphorlm.com";
    
    $data = [
        'query' => $query,
        'page' => $page,
        'page_size' => $pageSize
    ];
    
    $options = [
        'http' => [
            'header' => [
                "Authorization: Bearer {$apiToken}",
                "Content-Type: application/json"
            ],
            'method' => 'POST',
            'content' => json_encode($data)
        ]
    ];
    
    $context = stream_context_create($options);
    $result = file_get_contents($url, false, $context);
    
    if ($result === FALSE) {
        throw new Exception('Failed to execute flow');
    }
    
    return json_decode($result, true);
}

// Usage
try {
    $result = runFlow(
        'my-rag-pipeline',
        'What is machine learning?',
        'YOUR_API_TOKEN'
    );
    
    echo "Found {$result['total']} results\n";
    foreach ($result['items'] as $item) {
        echo "Content: {$item['page_content']}\n";
        echo "Source: " . ($item['metadata']['source'] ?? 'Unknown') . "\n";
        echo "---\n";
    }
} catch (Exception $e) {
    echo "Error: " . $e->getMessage() . "\n";
}
?>

Error Responses

Common Error Codes

| Status Code | Description | Example Response |
|---|---|---|
| 400 | Bad Request - Invalid query parameters | {"detail": "Invalid request format"} |
| 401 | Unauthorized - Invalid or missing API token | {"detail": "Invalid authentication credentials"} |
| 404 | Not Found - Flow not found or not deployed | {"detail": "Flow not found"} |
| 500 | Internal Server Error - Flow execution failed | {"detail": "Error running flow: execution failed"} |

Error Response Format

{
  "detail": "Error message describing what went wrong"
}

Example Error Responses

Invalid API Token

{
  "detail": "Invalid authentication credentials"
}

Flow Not Found

{
  "detail": "Flow not found"
}

Flow Not Deployed

{
  "detail": "No deployed revision found for flow my-rag-pipeline"
}

Flow Execution Error

{
  "detail": "Error running flow: execution failed"
}
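
Since every error body carries a `detail` string, a client can turn a failed response into one exception type that keeps both the status code and the message. The sketch below is illustrative: `FlowError`, `error_from_response`, and `is_retryable` are hypothetical names, and treating only 5xx statuses as retryable is an assumption rather than documented behavior.

```python
import json


class FlowError(Exception):
    """Hypothetical exception type carrying the HTTP status and the `detail` message."""

    def __init__(self, status: int, detail: str):
        super().__init__(f"{status}: {detail}")
        self.status = status
        self.detail = detail


def error_from_response(status: int, body: str) -> FlowError:
    """Build a FlowError from a status code and the raw response body."""
    try:
        parsed = json.loads(body)
        detail = parsed.get("detail", body) if isinstance(parsed, dict) else body
    except json.JSONDecodeError:
        detail = body  # body was not JSON; keep the raw text
    return FlowError(status, str(detail))


def is_retryable(status: int) -> bool:
    # Assumption: only server-side failures (5xx) are worth retrying;
    # 400, 401 and 404 will fail the same way until the request is fixed.
    return status >= 500


err = error_from_response(404, '{"detail": "Flow not found"}')
print(err.status, err.detail, is_retryable(err.status))
```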

Flow Requirements

Before you can execute a flow, ensure:

  1. Flow is Deployed: The flow must have a deployed revision
  2. Valid Configuration: All flow components must be properly configured
  3. Source Documents: Required documents must be uploaded and processed
  4. API Access: Your API token must have access to the project

Use the Deploy Flow endpoint to deploy your flow before execution.

Pagination

The Run Flow endpoint supports pagination for large result sets:

Request Pagination Parameters

{
  "query": "your query here",
  "page": 2,
  "page_size": 20
}

Response Pagination Information

{
  "items": [...],
  "total": 156,
  "page": 2,
  "page_size": 20,
  "total_pages": 8
}

Pagination Best Practices

  • Use reasonable page sizes (10-50 items)
  • Handle empty result sets gracefully
  • Implement proper navigation between pages
  • Cache results when appropriate
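
One way to navigate between pages is a generator that keeps requesting the next page until `total_pages` is reached or a page comes back empty. In this sketch `iter_all_items` is a hypothetical helper, and `fake_fetch` stands in for a real network call so the example runs offline.

```python
from typing import Any, Callable, Dict, Iterator


def iter_all_items(fetch_page: Callable[[int, int], Dict[str, Any]],
                   page_size: int = 20) -> Iterator[Dict[str, Any]]:
    """Yield every item across pages, stopping at total_pages or on an empty page."""
    page = 1
    while True:
        result = fetch_page(page, page_size)
        items = result.get("items", [])
        if not items:
            return  # empty result set, or past the last page
        yield from items
        if page >= result.get("total_pages", page):
            return
        page += 1


# Stand-in for a real call that posts {"query": ..., "page": ..., "page_size": ...}.
def fake_fetch(page: int, page_size: int) -> Dict[str, Any]:
    data = [{"page_content": f"chunk {i}"} for i in range(5)]
    start = (page - 1) * page_size
    return {
        "items": data[start:start + page_size],
        "total": len(data),
        "page": page,
        "page_size": page_size,
        "total_pages": -(-len(data) // page_size),  # ceiling division
    }


all_items = list(iter_all_items(fake_fetch, page_size=2))
print(len(all_items))
```

Because the helper is a generator, a caller can stop early without fetching the remaining pages.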

Integration Examples

Complete RAG Query System

class GraphorLMClient {
  constructor(flowName, apiToken) {
    this.flowName = flowName;
    this.apiToken = apiToken;
    this.baseUrl = `https://${flowName}.flows.graphorlm.com`;
  }

  async query(question, options = {}) {
    const {
      page = 1,
      pageSize = 10,
      includeMetadata = true
    } = options;

    try {
      const response = await fetch(this.baseUrl, {
        method: 'POST',
        headers: {
          'Authorization': `Bearer ${this.apiToken}`,
          'Content-Type': 'application/json',
        },
        body: JSON.stringify({
          query: question,
          page,
          page_size: pageSize
        })
      });

      if (!response.ok) {
        throw new Error(`Flow execution failed: ${response.status}`);
      }

      const result = await response.json();
      
      return {
        answer: this.extractMainAnswer(result),
        sources: this.extractSources(result),
        pagination: {
          total: result.total,
          page: result.page,
          totalPages: result.total_pages
        },
        // When includeMetadata is true, attach the full raw API response
        metadata: includeMetadata ? result : null
      };
    } catch (error) {
      console.error('Query failed:', error);
      throw error;
    }
  }

  extractMainAnswer(result) {
    // Extract the primary answer from the first result
    return result.items[0]?.page_content || 'No answer found';
  }

  extractSources(result) {
    // Extract unique sources from all results
    const sources = new Set();
    result.items.forEach(item => {
      if (item.metadata?.source) {
        sources.add(item.metadata.source);
      }
    });
    return Array.from(sources);
  }
}

// Usage
const client = new GraphorLMClient('my-rag-pipeline', 'YOUR_API_TOKEN');

client.query('What are the benefits of using transformers?')
  .then(result => {
    console.log('Answer:', result.answer);
    console.log('Sources:', result.sources);
    console.log('Total Results:', result.pagination.total);
  })
  .catch(error => console.error('Error:', error));

Batch Query Processing

import asyncio
import aiohttp
from typing import List, Dict, Any

class FlowExecutor:
    def __init__(self, flow_name: str, api_token: str):
        self.flow_name = flow_name
        self.api_token = api_token
        self.url = f"https://{flow_name}.flows.graphorlm.com"
        
    async def execute_single_query(self, session: aiohttp.ClientSession, query: str) -> Dict[str, Any]:
        headers = {
            "Authorization": f"Bearer {self.api_token}",
            "Content-Type": "application/json"
        }
        
        payload = {
            "query": query,
            "page": 1,
            "page_size": 5
        }
        
        async with session.post(self.url, headers=headers, json=payload) as response:
            if response.status == 200:
                return {
                    "query": query,
                    "status": "success",
                    "result": await response.json()
                }
            else:
                error_text = await response.text()
                return {
                    "query": query,
                    "status": "error",
                    "error": error_text
                }
    
    async def execute_batch_queries(self, queries: List[str]) -> List[Dict[str, Any]]:
        async with aiohttp.ClientSession() as session:
            tasks = [
                self.execute_single_query(session, query) 
                for query in queries
            ]
            return await asyncio.gather(*tasks, return_exceptions=True)

# Usage
async def main():
    executor = FlowExecutor("my-rag-pipeline", "YOUR_API_TOKEN")
    
    queries = [
        "What is machine learning?",
        "How do neural networks work?",
        "What are the applications of AI?",
        "Explain deep learning concepts"
    ]
    
    results = await executor.execute_batch_queries(queries)
    
    for result in results:
        if isinstance(result, dict) and result["status"] == "success":
            print(f"Query: {result['query']}")
            print(f"Results: {len(result['result']['items'])}")
            print("---")
        else:
            print(f"Failed query: {result}")

# Run the batch processing when this file is executed as a script
if __name__ == "__main__":
    asyncio.run(main())

Best Practices

Query Optimization

  • Be Specific: More specific queries yield better results
  • Use Context: Include relevant context in your queries
  • Test Different Phrasings: Try variations of your questions

Performance

  • Use Pagination: Don’t request all results at once
  • Cache Results: Cache responses for repeated queries
  • Batch Processing: Use async/parallel processing for multiple queries
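
Caching repeated queries can be sketched with a small in-memory wrapper keyed on the query and pagination values. `CachedRunner` is a hypothetical class, the 300-second lifetime is an arbitrary default, and `run` is any function that performs the actual request.

```python
import time
from typing import Any, Callable, Dict, Tuple


class CachedRunner:
    """Wrap a query function with an in-memory cache whose entries expire."""

    def __init__(self, run: Callable[[str, int, int], Any],
                 ttl_seconds: float = 300.0,
                 clock: Callable[[], float] = time.monotonic):
        self._run = run
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so expiry can be tested without waiting
        self._entries: Dict[Tuple[str, int, int], Tuple[float, Any]] = {}

    def query(self, query: str, page: int = 1, page_size: int = 10) -> Any:
        key = (query, page, page_size)
        now = self._clock()
        cached = self._entries.get(key)
        if cached is not None and now - cached[0] < self._ttl:
            return cached[1]  # still fresh: skip the network call
        result = self._run(query, page, page_size)
        self._entries[key] = (now, result)
        return result


# Stand-in for a real request function, used so the sketch runs offline.
def fake_run(query: str, page: int, page_size: int) -> dict:
    return {"items": [], "total": 0, "page": page, "page_size": page_size, "total_pages": 0}


runner = CachedRunner(fake_run)
first = runner.query("What is machine learning?")
second = runner.query("What is machine learning?")  # served from the cache
```

This cache is unbounded; a long-running service would also want a size limit or eviction policy.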

Error Handling

  • Implement Retries: Add retry logic for transient failures
  • Validate Responses: Check response structure before processing
  • Log Errors: Keep detailed logs for debugging
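
Retry logic for transient failures is often written as exponential backoff. The sketch below assumes the caller raises a hypothetical `TransientError` for failures worth retrying, such as timeouts or HTTP 5xx responses; the attempt count and delays are illustrative defaults, not values recommended by GraphorLM.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical marker for failures worth retrying (timeouts, HTTP 5xx)."""


def with_retries(call: Callable[[], T], attempts: int = 3, base_delay: float = 0.5,
                 sleep: Callable[[float], None] = time.sleep) -> T:
    """Run call(), retrying on TransientError with exponentially growing delays."""
    for attempt in range(attempts):
        try:
            return call()
        except TransientError:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * (2 ** attempt))  # base_delay, then 2x, 4x, ...
    raise ValueError("attempts must be at least 1")


# Stand-in for a request that fails twice and then succeeds.
state = {"calls": 0}


def flaky_call() -> str:
    state["calls"] += 1
    if state["calls"] < 3:
        raise TransientError("temporary failure")
    return "ok"


delays = []
outcome = with_retries(flaky_call, attempts=3, base_delay=0.5, sleep=delays.append)
```

Errors that are not marked transient, such as a 401 or 404, propagate immediately because repeating the same request would not help.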

Security

  • Protect API Tokens: Never expose tokens in client-side code
  • Use HTTPS: Always use secure connections
  • Rate Limiting: Implement client-side rate limiting
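
Client-side rate limiting can be as simple as enforcing a minimum gap between requests. `MinIntervalLimiter` is a hypothetical class for illustration; this reference does not state the service's actual limits, so the interval is a value the caller must choose.

```python
import time
from typing import Callable, Optional


class MinIntervalLimiter:
    """Allow at most one call per `min_interval` seconds by sleeping when needed."""

    def __init__(self, min_interval: float,
                 clock: Callable[[], float] = time.monotonic,
                 sleep: Callable[[float], None] = time.sleep):
        self._min_interval = min_interval
        self._clock = clock
        self._sleep = sleep
        self._next_allowed: Optional[float] = None

    def wait(self) -> float:
        """Block until the next call is allowed; return the seconds slept."""
        now = self._clock()
        delay = 0.0
        if self._next_allowed is not None:
            delay = max(0.0, self._next_allowed - now)
        if delay > 0:
            self._sleep(delay)
        # The next call may start one interval after this one begins.
        self._next_allowed = now + delay + self._min_interval
        return delay


limiter = MinIntervalLimiter(min_interval=0.01)
limiter.wait()  # first call passes immediately
limiter.wait()  # second call sleeps for up to 0.01 seconds
```

Calling `limiter.wait()` right before each request keeps a loop of queries from exceeding the chosen rate.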

Troubleshooting

Next Steps

After successfully running flows, you might want to: