Error Handling
Production patterns for handling errors gracefully in SoftQCOS applications.
Overviewβ
Robust error handling is essential for production quantum computing applications. This guide covers:
- All error types in the SDK
- Retry strategies
- Graceful degradation patterns
- Logging and monitoring
Error Typesβ
"""All exception types in softqcos_sdk."""
from softqcos_sdk import (
# Base error
QCOSError,
# Authentication
AuthenticationError,
# Validation
CircuitValidationError,
# Rate limiting & quotas
RateLimitError,
QuotaExceededError,
# Execution
ExecutionError,
TimeoutError,
# Azure Quantum specific
AzureQuantumError,
ProviderUnavailableError,
)
Error Hierarchyβ
QCOSError (base)
βββ AuthenticationError
βββ CircuitValidationError
βββ RateLimitError
βββ QuotaExceededError
βββ ExecutionError
β βββ TimeoutError
βββ AzureQuantumError
βββ ProviderUnavailableError
Basic Error Handlingβ
"""Basic error handling pattern."""
from softqcos_sdk import (
QCOSClient,
QCOSError,
AuthenticationError,
CircuitValidationError,
)
client = QCOSClient()
circuit = """
OPENQASM 2.0;
include "qelib1.inc";
qreg q[2];
creg c[2];
h q[0];
cx q[0], q[1];
measure q -> c;
"""
try:
result = client.execute(qasm=circuit, shots=1024)
print(f"Success! Counts: {result.counts}")
except AuthenticationError as e:
# API key issues - cannot retry
print(f"β Authentication failed: {e}")
print(" Check your API key is valid")
except CircuitValidationError as e:
# Circuit syntax issues - cannot retry
print(f"β Invalid circuit: {e}")
print(" Fix the circuit and try again")
except QCOSError as e:
# Other errors - may be retryable
print(f"β οΈ Error: {e}")
print(" This may be temporary, try again later")
Retry Patternsβ
Automatic Retry with Exponential Backoffβ
"""Automatic retry with exponential backoff."""
from softqcos_sdk import QCOSClient, QCOSError, RateLimitError
import time
from functools import wraps
from typing import TypeVar, Callable
T = TypeVar('T')
def with_retry(
max_attempts: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
retryable_exceptions: tuple = (QCOSError, RateLimitError),
):
"""Decorator for automatic retry with exponential backoff."""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
@wraps(func)
def wrapper(*args, **kwargs) -> T:
last_exception = None
for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
except retryable_exceptions as e:
last_exception = e
# Check if error has retry_after hint
if hasattr(e, 'retry_after') and e.retry_after:
delay = e.retry_after
else:
delay = min(base_delay * (2 ** attempt), max_delay)
if attempt < max_attempts - 1:
print(f"Attempt {attempt + 1} failed. Retrying in {delay:.1f}s...")
time.sleep(delay)
raise last_exception
return wrapper
return decorator
# Usage
client = QCOSClient()
@with_retry(max_attempts=3, base_delay=2.0)
def execute_circuit(circuit: str, shots: int = 1024):
return client.execute(qasm=circuit, shots=shots)
# This will automatically retry up to 3 times
result = execute_circuit(circuit)
Selective Retryβ
"""Only retry specific error types."""
from softqcos_sdk import (
QCOSClient,
QCOSError,
AuthenticationError,
CircuitValidationError,
RateLimitError,
TimeoutError,
)
import time
# Errors that should NOT be retried
NON_RETRYABLE = (
AuthenticationError, # Bad credentials
CircuitValidationError, # Invalid circuit
)
def execute_with_smart_retry(
client: QCOSClient,
circuit: str,
shots: int = 1024,
max_retries: int = 3,
):
"""Execute with intelligent retry logic."""
for attempt in range(max_retries):
try:
return client.execute(qasm=circuit, shots=shots)
except NON_RETRYABLE as e:
# Don't retry these - fail immediately
raise
except RateLimitError as e:
# Use server-provided retry delay
delay = e.retry_after or 30
print(f"Rate limited. Waiting {delay}s...")
time.sleep(delay)
except TimeoutError as e:
# Increase timeout on retry
print(f"Timeout on attempt {attempt + 1}. Retrying with longer timeout...")
# The next attempt might use extended timeout
except QCOSError as e:
# Generic error - exponential backoff
delay = 2 ** attempt
print(f"Error: {e}. Retrying in {delay}s...")
time.sleep(delay)
raise QCOSError(f"All {max_retries} attempts failed")
Graceful Degradationβ
Fallback to Simulatorβ
"""Fall back to simulator when QPU is unavailable."""
from softqcos_sdk import (
QCOSClient,
AzureQuantumError,
ProviderUnavailableError,
QuotaExceededError,
)
def execute_with_fallback(
client: QCOSClient,
circuit: str,
shots: int = 1024,
preferred_target: str = "ionq.aria-1",
):
"""Try QPU first, fall back to simulator if needed."""
# Define fallback chain
targets = [
preferred_target,
"ionq.simulator", # IonQ simulator (free)
"aer", # Local Aer simulator
]
last_error = None
for target in targets:
try:
print(f"Trying target: {target}")
if target in ["aer", "cirq", "pennylane"]:
# Local simulator
result = client.execute(qasm=circuit, backend=target, shots=shots)
else:
# Azure Quantum
result = client.azure_quantum.execute(
circuit=circuit,
target=target,
shots=shots,
)
print(f"β
Success with {target}")
result.used_target = target
return result
except ProviderUnavailableError as e:
print(f"β οΈ {target} unavailable: {e}")
last_error = e
except QuotaExceededError as e:
print(f"β οΈ Quota exceeded for {target}: {e}")
last_error = e
except AzureQuantumError as e:
print(f"β οΈ Azure error for {target}: {e}")
last_error = e
raise AzureQuantumError(f"All targets failed. Last error: {last_error}")
# Usage
result = execute_with_fallback(
client,
circuit,
preferred_target="ionq.aria-1",
)
print(f"Executed on: {result.used_target}")
Circuit Complexity Fallbackβ
"""Automatically reduce circuit complexity on failure."""
from softqcos_sdk import QCOSClient, CircuitValidationError
def execute_with_optimization_fallback(
client: QCOSClient,
circuit: str,
shots: int = 1024,
target: str = "ionq",
):
"""Try original circuit, optimize if it fails."""
# Attempt 1: Original circuit
try:
return client.execute(qasm=circuit, shots=shots)
except CircuitValidationError as e:
print(f"Original circuit failed: {e}")
# Attempt 2: Optimized circuit
try:
print("Attempting with optimized circuit...")
optimized = client.optimize(circuit, target=target)
print(f"Optimization reduced gates by {optimized.improvements.get('gate_reduction', 'N/A')}")
return client.execute(qasm=optimized.optimized_qasm, shots=shots)
except CircuitValidationError as e:
print(f"Optimized circuit also failed: {e}")
# Attempt 3: Aggressive optimization
try:
print("Attempting with aggressive optimization...")
optimized = client.optimize(
circuit,
target=target,
optimization_level=3, # Maximum optimization
)
return client.execute(qasm=optimized.optimized_qasm, shots=shots)
except CircuitValidationError as e:
raise CircuitValidationError(
f"Circuit cannot be executed even after optimization: {e}"
)
Async Error Handlingβ
"""Error handling with async client."""
import asyncio
from softqcos_sdk import AsyncQCOSClient, QCOSError, RateLimitError
async def execute_batch_with_error_handling(
circuits: list[str],
shots: int = 1024,
concurrency: int = 5,
):
"""Execute batch with per-circuit error handling."""
results = []
semaphore = asyncio.Semaphore(concurrency)
async def execute_one(circuit: str, index: int):
async with semaphore:
try:
async with AsyncQCOSClient() as client:
result = await client.execute(qasm=circuit, shots=shots)
return {"index": index, "success": True, "result": result}
except RateLimitError as e:
# Wait and retry once
await asyncio.sleep(e.retry_after or 30)
try:
async with AsyncQCOSClient() as client:
result = await client.execute(qasm=circuit, shots=shots)
return {"index": index, "success": True, "result": result}
except QCOSError as e:
return {"index": index, "success": False, "error": str(e)}
except QCOSError as e:
return {"index": index, "success": False, "error": str(e)}
# Execute all with error isolation
tasks = [execute_one(c, i) for i, c in enumerate(circuits)]
results = await asyncio.gather(*tasks)
# Summary
successful = sum(1 for r in results if r["success"])
print(f"Completed: {successful}/{len(circuits)} successful")
return results
# Usage
circuits = [create_circuit(i) for i in range(100)]
results = asyncio.run(execute_batch_with_error_handling(circuits))
# Process results
for r in results:
if r["success"]:
print(f"Circuit {r['index']}: {r['result'].counts}")
else:
print(f"Circuit {r['index']}: FAILED - {r['error']}")
Logging Errorsβ
"""Structured error logging."""
import logging
import json
from datetime import datetime
from softqcos_sdk import QCOSError
# Configure structured logger
class StructuredLogger:
def __init__(self, name: str):
self.logger = logging.getLogger(name)
self.logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(message)s'))
self.logger.addHandler(handler)
def log_error(self, error: Exception, context: dict = None):
"""Log error with structured data."""
log_data = {
"timestamp": datetime.utcnow().isoformat(),
"level": "ERROR",
"error_type": type(error).__name__,
"error_message": str(error),
"context": context or {},
}
# Add error-specific fields
if hasattr(error, 'retry_after'):
log_data["retry_after"] = error.retry_after
if hasattr(error, 'status_code'):
log_data["status_code"] = error.status_code
self.logger.error(json.dumps(log_data))
def log_success(self, job_id: str, duration: float, context: dict = None):
"""Log successful execution."""
log_data = {
"timestamp": datetime.utcnow().isoformat(),
"level": "INFO",
"event": "execution_success",
"job_id": job_id,
"duration_ms": duration * 1000,
"context": context or {},
}
self.logger.info(json.dumps(log_data))
# Usage
logger = StructuredLogger("quantum_app")
try:
result = client.execute(qasm=circuit, shots=1024)
logger.log_success(result.job_id, execution_time, {"circuit_type": "bell_state"})
except QCOSError as e:
logger.log_error(e, {"circuit_type": "bell_state", "shots": 1024})
raise
Error Recovery Patternsβ
Checkpoint and Resumeβ
"""Checkpoint progress for long-running jobs."""
import json
from pathlib import Path
from softqcos_sdk import QCOSClient, QCOSError
class CheckpointedExecution:
def __init__(self, checkpoint_file: str = "checkpoint.json"):
self.checkpoint_file = Path(checkpoint_file)
self.client = QCOSClient()
def load_checkpoint(self) -> dict:
"""Load progress from checkpoint."""
if self.checkpoint_file.exists():
with open(self.checkpoint_file) as f:
return json.load(f)
return {"completed": [], "failed": [], "pending": []}
def save_checkpoint(self, state: dict):
"""Save progress to checkpoint."""
with open(self.checkpoint_file, "w") as f:
json.dump(state, f)
def execute_batch(self, circuits: list[dict]):
"""Execute batch with checkpoint recovery."""
state = self.load_checkpoint()
# Determine what's left to do
completed_ids = set(state["completed"])
pending = [c for c in circuits if c["id"] not in completed_ids]
print(f"Resuming: {len(completed_ids)} done, {len(pending)} remaining")
for circuit_data in pending:
circuit_id = circuit_data["id"]
try:
result = self.client.execute(
qasm=circuit_data["circuit"],
shots=circuit_data.get("shots", 1024),
)
state["completed"].append(circuit_id)
state[circuit_id] = result.counts
print(f"β
{circuit_id}")
except QCOSError as e:
state["failed"].append({
"id": circuit_id,
"error": str(e),
})
print(f"β {circuit_id}: {e}")
# Save after each circuit
self.save_checkpoint(state)
return state
# Usage
executor = CheckpointedExecution("my_experiment.json")
circuits = [
{"id": f"circuit_{i}", "circuit": create_circuit(i), "shots": 1024}
for i in range(100)
]
# Can be interrupted and resumed
results = executor.execute_batch(circuits)
Best Practices Summaryβ
- Catch specific exceptions - Handle each error type appropriately
- Use exponential backoff - For rate limits and transient errors
- Never retry auth/validation errors - These won't resolve themselves
- Implement fallbacks - QPU β Simulator chain
- Log everything - Structured logging for debugging
- Checkpoint long jobs - Enable resume after failures
- Set timeouts - Don't wait forever
- Test error paths - Mock errors in unit tests
Next Stepsβ
- Best Practices - Production patterns
- Batch Processing - Handle many circuits
- Troubleshooting - Common issues and solutions