Skip to main content

Error Handling

Production patterns for handling errors gracefully in SoftQCOS applications.

Overview​

Robust error handling is essential for production quantum computing applications. This guide covers:

  • All error types in the SDK
  • Retry strategies
  • Graceful degradation patterns
  • Logging and monitoring

Error Types​

"""All exception types in softqcos_sdk."""

from softqcos_sdk import (
# Base error
QCOSError,

# Authentication
AuthenticationError,

# Validation
CircuitValidationError,

# Rate limiting & quotas
RateLimitError,
QuotaExceededError,

# Execution
ExecutionError,
TimeoutError,

# Azure Quantum specific
AzureQuantumError,
ProviderUnavailableError,
)

Error Hierarchy​

QCOSError (base)
β”œβ”€β”€ AuthenticationError
β”œβ”€β”€ CircuitValidationError
β”œβ”€β”€ RateLimitError
β”œβ”€β”€ QuotaExceededError
β”œβ”€β”€ ExecutionError
β”‚ └── TimeoutError
└── AzureQuantumError
└── ProviderUnavailableError

Basic Error Handling​

"""Basic error handling pattern."""

from softqcos_sdk import (
QCOSClient,
QCOSError,
AuthenticationError,
CircuitValidationError,
)

client = QCOSClient()

circuit = """
OPENQASM 2.0;
include "qelib1.inc";
qreg q[2];
creg c[2];
h q[0];
cx q[0], q[1];
measure q -> c;
"""

try:
result = client.execute(qasm=circuit, shots=1024)
print(f"Success! Counts: {result.counts}")

except AuthenticationError as e:
# API key issues - cannot retry
print(f"❌ Authentication failed: {e}")
print(" Check your API key is valid")

except CircuitValidationError as e:
# Circuit syntax issues - cannot retry
print(f"❌ Invalid circuit: {e}")
print(" Fix the circuit and try again")

except QCOSError as e:
# Other errors - may be retryable
print(f"⚠️ Error: {e}")
print(" This may be temporary, try again later")

Retry Patterns​

Automatic Retry with Exponential Backoff​

"""Automatic retry with exponential backoff."""

from softqcos_sdk import QCOSClient, QCOSError, RateLimitError
import time
from functools import wraps
from typing import TypeVar, Callable

T = TypeVar('T')

def with_retry(
max_attempts: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
retryable_exceptions: tuple = (QCOSError, RateLimitError),
):
"""Decorator for automatic retry with exponential backoff."""

def decorator(func: Callable[..., T]) -> Callable[..., T]:
@wraps(func)
def wrapper(*args, **kwargs) -> T:
last_exception = None

for attempt in range(max_attempts):
try:
return func(*args, **kwargs)

except retryable_exceptions as e:
last_exception = e

# Check if error has retry_after hint
if hasattr(e, 'retry_after') and e.retry_after:
delay = e.retry_after
else:
delay = min(base_delay * (2 ** attempt), max_delay)

if attempt < max_attempts - 1:
print(f"Attempt {attempt + 1} failed. Retrying in {delay:.1f}s...")
time.sleep(delay)

raise last_exception

return wrapper
return decorator


# Usage
client = QCOSClient()

@with_retry(max_attempts=3, base_delay=2.0)
def execute_circuit(circuit: str, shots: int = 1024):
return client.execute(qasm=circuit, shots=shots)

# This will automatically retry up to 3 times
result = execute_circuit(circuit)

Selective Retry​

"""Only retry specific error types."""

from softqcos_sdk import (
QCOSClient,
QCOSError,
AuthenticationError,
CircuitValidationError,
RateLimitError,
TimeoutError,
)
import time

# Errors that should NOT be retried
NON_RETRYABLE = (
AuthenticationError, # Bad credentials
CircuitValidationError, # Invalid circuit
)

def execute_with_smart_retry(
client: QCOSClient,
circuit: str,
shots: int = 1024,
max_retries: int = 3,
):
"""Execute with intelligent retry logic."""

for attempt in range(max_retries):
try:
return client.execute(qasm=circuit, shots=shots)

except NON_RETRYABLE as e:
# Don't retry these - fail immediately
raise

except RateLimitError as e:
# Use server-provided retry delay
delay = e.retry_after or 30
print(f"Rate limited. Waiting {delay}s...")
time.sleep(delay)

except TimeoutError as e:
# Increase timeout on retry
print(f"Timeout on attempt {attempt + 1}. Retrying with longer timeout...")
# The next attempt might use extended timeout

except QCOSError as e:
# Generic error - exponential backoff
delay = 2 ** attempt
print(f"Error: {e}. Retrying in {delay}s...")
time.sleep(delay)

raise QCOSError(f"All {max_retries} attempts failed")

Graceful Degradation​

Fallback to Simulator​

"""Fall back to simulator when QPU is unavailable."""

from softqcos_sdk import (
QCOSClient,
AzureQuantumError,
ProviderUnavailableError,
QuotaExceededError,
)

def execute_with_fallback(
client: QCOSClient,
circuit: str,
shots: int = 1024,
preferred_target: str = "ionq.aria-1",
):
"""Try QPU first, fall back to simulator if needed."""

# Define fallback chain
targets = [
preferred_target,
"ionq.simulator", # IonQ simulator (free)
"aer", # Local Aer simulator
]

last_error = None

for target in targets:
try:
print(f"Trying target: {target}")

if target in ["aer", "cirq", "pennylane"]:
# Local simulator
result = client.execute(qasm=circuit, backend=target, shots=shots)
else:
# Azure Quantum
result = client.azure_quantum.execute(
circuit=circuit,
target=target,
shots=shots,
)

print(f"βœ… Success with {target}")
result.used_target = target
return result

except ProviderUnavailableError as e:
print(f"⚠️ {target} unavailable: {e}")
last_error = e

except QuotaExceededError as e:
print(f"⚠️ Quota exceeded for {target}: {e}")
last_error = e

except AzureQuantumError as e:
print(f"⚠️ Azure error for {target}: {e}")
last_error = e

raise AzureQuantumError(f"All targets failed. Last error: {last_error}")


# Usage
result = execute_with_fallback(
client,
circuit,
preferred_target="ionq.aria-1",
)
print(f"Executed on: {result.used_target}")

Circuit Complexity Fallback​

"""Automatically reduce circuit complexity on failure."""

from softqcos_sdk import QCOSClient, CircuitValidationError

def execute_with_optimization_fallback(
client: QCOSClient,
circuit: str,
shots: int = 1024,
target: str = "ionq",
):
"""Try original circuit, optimize if it fails."""

# Attempt 1: Original circuit
try:
return client.execute(qasm=circuit, shots=shots)
except CircuitValidationError as e:
print(f"Original circuit failed: {e}")

# Attempt 2: Optimized circuit
try:
print("Attempting with optimized circuit...")
optimized = client.optimize(circuit, target=target)
print(f"Optimization reduced gates by {optimized.improvements.get('gate_reduction', 'N/A')}")
return client.execute(qasm=optimized.optimized_qasm, shots=shots)
except CircuitValidationError as e:
print(f"Optimized circuit also failed: {e}")

# Attempt 3: Aggressive optimization
try:
print("Attempting with aggressive optimization...")
optimized = client.optimize(
circuit,
target=target,
optimization_level=3, # Maximum optimization
)
return client.execute(qasm=optimized.optimized_qasm, shots=shots)
except CircuitValidationError as e:
raise CircuitValidationError(
f"Circuit cannot be executed even after optimization: {e}"
)

Async Error Handling​

"""Error handling with async client."""

import asyncio
from softqcos_sdk import AsyncQCOSClient, QCOSError, RateLimitError

async def execute_batch_with_error_handling(
circuits: list[str],
shots: int = 1024,
concurrency: int = 5,
):
"""Execute batch with per-circuit error handling."""

results = []
semaphore = asyncio.Semaphore(concurrency)

async def execute_one(circuit: str, index: int):
async with semaphore:
try:
async with AsyncQCOSClient() as client:
result = await client.execute(qasm=circuit, shots=shots)
return {"index": index, "success": True, "result": result}

except RateLimitError as e:
# Wait and retry once
await asyncio.sleep(e.retry_after or 30)
try:
async with AsyncQCOSClient() as client:
result = await client.execute(qasm=circuit, shots=shots)
return {"index": index, "success": True, "result": result}
except QCOSError as e:
return {"index": index, "success": False, "error": str(e)}

except QCOSError as e:
return {"index": index, "success": False, "error": str(e)}

# Execute all with error isolation
tasks = [execute_one(c, i) for i, c in enumerate(circuits)]
results = await asyncio.gather(*tasks)

# Summary
successful = sum(1 for r in results if r["success"])
print(f"Completed: {successful}/{len(circuits)} successful")

return results


# Usage
circuits = [create_circuit(i) for i in range(100)]
results = asyncio.run(execute_batch_with_error_handling(circuits))

# Process results
for r in results:
if r["success"]:
print(f"Circuit {r['index']}: {r['result'].counts}")
else:
print(f"Circuit {r['index']}: FAILED - {r['error']}")

Logging Errors​

"""Structured error logging."""

import logging
import json
from datetime import datetime
from softqcos_sdk import QCOSError

# Configure structured logger
class StructuredLogger:
def __init__(self, name: str):
self.logger = logging.getLogger(name)
self.logger.setLevel(logging.INFO)

handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(message)s'))
self.logger.addHandler(handler)

def log_error(self, error: Exception, context: dict = None):
"""Log error with structured data."""
log_data = {
"timestamp": datetime.utcnow().isoformat(),
"level": "ERROR",
"error_type": type(error).__name__,
"error_message": str(error),
"context": context or {},
}

# Add error-specific fields
if hasattr(error, 'retry_after'):
log_data["retry_after"] = error.retry_after
if hasattr(error, 'status_code'):
log_data["status_code"] = error.status_code

self.logger.error(json.dumps(log_data))

def log_success(self, job_id: str, duration: float, context: dict = None):
"""Log successful execution."""
log_data = {
"timestamp": datetime.utcnow().isoformat(),
"level": "INFO",
"event": "execution_success",
"job_id": job_id,
"duration_ms": duration * 1000,
"context": context or {},
}
self.logger.info(json.dumps(log_data))


# Usage
logger = StructuredLogger("quantum_app")

try:
result = client.execute(qasm=circuit, shots=1024)
logger.log_success(result.job_id, execution_time, {"circuit_type": "bell_state"})
except QCOSError as e:
logger.log_error(e, {"circuit_type": "bell_state", "shots": 1024})
raise

Error Recovery Patterns​

Checkpoint and Resume​

"""Checkpoint progress for long-running jobs."""

import json
from pathlib import Path
from softqcos_sdk import QCOSClient, QCOSError

class CheckpointedExecution:
def __init__(self, checkpoint_file: str = "checkpoint.json"):
self.checkpoint_file = Path(checkpoint_file)
self.client = QCOSClient()

def load_checkpoint(self) -> dict:
"""Load progress from checkpoint."""
if self.checkpoint_file.exists():
with open(self.checkpoint_file) as f:
return json.load(f)
return {"completed": [], "failed": [], "pending": []}

def save_checkpoint(self, state: dict):
"""Save progress to checkpoint."""
with open(self.checkpoint_file, "w") as f:
json.dump(state, f)

def execute_batch(self, circuits: list[dict]):
"""Execute batch with checkpoint recovery."""

state = self.load_checkpoint()

# Determine what's left to do
completed_ids = set(state["completed"])
pending = [c for c in circuits if c["id"] not in completed_ids]

print(f"Resuming: {len(completed_ids)} done, {len(pending)} remaining")

for circuit_data in pending:
circuit_id = circuit_data["id"]

try:
result = self.client.execute(
qasm=circuit_data["circuit"],
shots=circuit_data.get("shots", 1024),
)

state["completed"].append(circuit_id)
state[circuit_id] = result.counts
print(f"βœ… {circuit_id}")

except QCOSError as e:
state["failed"].append({
"id": circuit_id,
"error": str(e),
})
print(f"❌ {circuit_id}: {e}")

# Save after each circuit
self.save_checkpoint(state)

return state


# Usage
executor = CheckpointedExecution("my_experiment.json")

circuits = [
{"id": f"circuit_{i}", "circuit": create_circuit(i), "shots": 1024}
for i in range(100)
]

# Can be interrupted and resumed
results = executor.execute_batch(circuits)

Best Practices Summary​

  1. Catch specific exceptions - Handle each error type appropriately
  2. Use exponential backoff - For rate limits and transient errors
  3. Never retry auth/validation errors - These won't resolve themselves
  4. Implement fallbacks - QPU β†’ Simulator chain
  5. Log everything - Structured logging for debugging
  6. Checkpoint long jobs - Enable resume after failures
  7. Set timeouts - Don't wait forever
  8. Test error paths - Mock errors in unit tests

Next Steps​