Best Practices
Production-ready patterns and recommendations for using SoftQCOS effectively.
SDK Configuration
Environment-based Configuration
"""Production configuration pattern."""
import os
from softqcos_sdk import QCOSClient
def get_client() -> QCOSClient:
    """Build a ``QCOSClient`` configured for the current environment.

    The environment name is read from the ``ENVIRONMENT`` variable
    (default ``"development"``) and matched case-insensitively with
    surrounding whitespace ignored, so ``"Production "`` selects the
    production profile.

    Returns:
        A client constructed with the API key, timeout and retry count of
        the selected profile.

    Raises:
        KeyError: If ``ENVIRONMENT`` does not name a known profile. The
            message lists the accepted names.
    """
    env = os.environ.get("ENVIRONMENT", "development").strip().lower()

    # Per-environment profile: (API-key variable, timeout, max_retries).
    profiles = {
        "development": ("SOFTQCOS_DEV_API_KEY", 60, 1),
        "staging": ("SOFTQCOS_STAGING_API_KEY", 120, 2),
        "production": ("SOFTQCOS_API_KEY", 300, 3),
    }

    try:
        key_variable, timeout, max_retries = profiles[env]
    except KeyError:
        expected = ", ".join(sorted(profiles))
        # Same exception type as a plain dict lookup, but with a message
        # that tells the operator what went wrong and how to fix it.
        raise KeyError(
            f"Unknown ENVIRONMENT {env!r}; expected one of: {expected}"
        ) from None

    # Only the selected environment's key variable is read.
    return QCOSClient(
        api_key=os.environ.get(key_variable),
        timeout=timeout,
        max_retries=max_retries,
    )
# Usage: the profile is selected by the ENVIRONMENT variable that
# get_client() reads when it is called.
client = get_client()
Singleton Pattern
"""Singleton client for application-wide use."""
from softqcos_sdk import QCOSClient
from functools import lru_cache
@lru_cache(maxsize=1)
def get_qcos_client() -> QCOSClient:
    """Return the shared client, constructing it on the first call.

    The function takes no arguments, so the single ``lru_cache`` slot
    holds the one instance that every later call receives.
    """
    shared_client = QCOSClient()
    return shared_client
# Usage anywhere in your app: every call returns the same cached instance.
client = get_qcos_client()
# `circuit` is not defined in this snippet; it is presumably an OpenQASM
# source string supplied by the caller.
result = client.execute(qasm=circuit, shots=1024)
Error Handling
Comprehensive Error Handling
"""Production-grade error handling."""
from softqcos_sdk import (
QCOSClient,
QCOSError,
AuthenticationError,
RateLimitError,
QuotaExceededError,
CircuitValidationError,
TimeoutError,
AzureQuantumError,
)
import logging
import time
logger = logging.getLogger(__name__)
def execute_with_resilience(
    client: QCOSClient,
    circuit: str,
    shots: int = 1024,
    max_retries: int = 3,
) -> dict:
    """Execute a circuit, retrying transient failures with backoff.

    Authentication, circuit-validation and quota errors are logged and
    re-raised immediately because retrying cannot fix them. Rate-limit,
    timeout and other ``QCOSError`` failures are retried up to
    ``max_retries`` attempts in total. The wait is ``2 ** attempt``
    (taken from the loop index), or the error's ``retry_after`` when a
    rate-limit error provides one.

    Args:
        client: Client whose ``execute(qasm=..., shots=...)`` is called.
        circuit: Circuit source passed as ``qasm``.
        shots: Number of shots passed to ``execute``.
        max_retries: Total number of attempts to make.

    Returns:
        ``{"success": True, "result": ..., "attempts": n}`` on success, or
        ``{"success": False, "error": str, "attempts": max_retries}`` once
        every attempt has failed with a retryable error.

    Raises:
        AuthenticationError, CircuitValidationError, QuotaExceededError:
            Propagated unchanged after being logged.
    """
    last_error = None

    for attempt in range(max_retries):
        try:
            result = client.execute(qasm=circuit, shots=shots)
        except AuthenticationError as e:
            # Don't retry auth errors
            logger.error(f"Authentication failed: {e}")
            raise
        except CircuitValidationError as e:
            # Don't retry validation errors
            logger.error(f"Invalid circuit: {e}")
            raise
        except QuotaExceededError as e:
            # Don't retry quota errors
            logger.error(f"Quota exceeded: {e}")
            raise
        except RateLimitError as e:
            # Prefer the wait reported on the error; otherwise back off.
            last_error = e
            wait_time = e.retry_after or (2 ** attempt)
            notice = f"Rate limited. Waiting {wait_time}s..."
        except TimeoutError as e:
            last_error = e
            wait_time = 2 ** attempt
            notice = f"Timeout. Retrying in {wait_time}s..."
        except QCOSError as e:
            # Generic SDK error - retry with backoff
            last_error = e
            wait_time = 2 ** attempt
            notice = f"Error: {e}. Retrying in {wait_time}s..."
        else:
            return {
                "success": True,
                "result": result,
                "attempts": attempt + 1,
            }

        # Back off only when another attempt follows. Sleeping after the
        # final failure would just delay the caller and log a misleading
        # "Retrying" message.
        if attempt + 1 < max_retries:
            logger.warning(notice)
            time.sleep(wait_time)

    # All retries exhausted
    logger.error(f"All {max_retries} attempts failed. Last error: {last_error}")
    return {
        "success": False,
        "error": str(last_error),
        "attempts": max_retries,
    }
Circuit Validation
"""Always validate circuits before execution."""
def safe_execute(
    client: QCOSClient,
    circuit: str,
    shots: int = 1024,
    max_qubits: int = 25,
    max_depth: int = 1000,
):
    """Validate, analyze and then execute a circuit.

    Args:
        client: Client providing ``validate``, ``analyze`` and ``execute``.
        circuit: Circuit source to check and run.
        shots: Number of shots passed to ``execute``.
        max_qubits: Qubit count above which a warning is logged.
        max_depth: Circuit depth above which a warning is logged.

    Returns:
        Whatever ``client.execute(qasm=circuit, shots=shots)`` returns.

    Raises:
        ValueError: If validation reports the circuit as not valid.
    """
    # Step 1: Validate syntax
    validation = client.validate(circuit)
    if not validation.valid:
        raise ValueError(f"Invalid circuit: {validation.errors}")

    # Step 2: Analyze properties. Exceeding a threshold only warns; it
    # never blocks execution.
    analysis = client.analyze(circuit)
    if analysis.num_qubits > max_qubits:
        logger.warning(f"Large circuit: {analysis.num_qubits} qubits")
    if analysis.depth > max_depth:
        logger.warning(f"Deep circuit: {analysis.depth} depth")

    # Step 3: Execute
    return client.execute(qasm=circuit, shots=shots)
Performance Optimization
Batch Processing
"""Efficient batch processing patterns."""
from softqcos_sdk import QCOSClient, AsyncQCOSClient
import asyncio
from typing import List
def batch_execute_sync(
    client: QCOSClient,
    circuits: List[str],
    shots: int = 1024,
    chunk_size: int = 50,
) -> List[dict]:
    """Run circuits through the batch endpoint in fixed-size chunks.

    Args:
        client: Client providing ``execute_batch(circuits=..., shots=...)``.
        circuits: Circuit sources to execute, in order.
        shots: Number of shots passed with every chunk.
        chunk_size: Maximum number of circuits per batch call.

    Returns:
        The per-chunk results concatenated in submission order.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1.
    """
    # A negative step would make range() empty and silently drop every
    # circuit, and zero fails with an unrelated range() error. Reject both.
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be at least 1, got {chunk_size}")

    total = len(circuits)
    results = []
    for start in range(0, total, chunk_size):
        chunk = circuits[start:start + chunk_size]

        # Use batch endpoint for efficiency
        results.extend(client.execute_batch(circuits=chunk, shots=shots))

        # Log progress
        processed = min(start + chunk_size, total)
        print(f"Progress: {processed}/{total}")

    return results
async def batch_execute_async(
    circuits: List[str],
    shots: int = 1024,
    concurrency: int = 10,
) -> List[dict]:
    """Execute circuits concurrently through one async client.

    Args:
        circuits: Circuit sources to execute.
        shots: Number of shots passed with every circuit.
        concurrency: Maximum number of executions in flight at once.

    Returns:
        One entry per circuit, in input order. Because failures are
        gathered with ``return_exceptions=True``, an entry may be an
        exception instance instead of a result.

    Raises:
        ValueError: If ``concurrency`` is smaller than 1.
    """
    # Semaphore(0) can never be acquired, so every task would wait forever.
    if concurrency < 1:
        raise ValueError(f"concurrency must be at least 1, got {concurrency}")

    # Nothing to run, so do not open a client at all.
    if not circuits:
        return []

    semaphore = asyncio.Semaphore(concurrency)

    async def execute_one(client, circuit):
        # The semaphore caps how many executions are awaited at once.
        async with semaphore:
            return await client.execute(qasm=circuit, shots=shots)

    async with AsyncQCOSClient() as client:
        tasks = [execute_one(client, c) for c in circuits]
        return await asyncio.gather(*tasks, return_exceptions=True)
# Usage
# `create_circuit`, `parameter_sweep` and `client` are not defined in this
# snippet; they are expected to come from the surrounding application.
circuits = [create_circuit(params) for params in parameter_sweep]
# Sync version: chunks are submitted one after another.
results = batch_execute_sync(client, circuits)
# Async version (faster for I/O bound); entries may be exception instances
# because failures are gathered rather than raised.
results = asyncio.run(batch_execute_async(circuits))
Caching
"""Cache optimization results for repeated circuits."""
from functools import lru_cache
import hashlib
def circuit_hash(circuit: str) -> str:
    """Return a hex MD5 digest identifying ``circuit`` for cache lookups.

    Only layout is normalised: leading and trailing whitespace is dropped
    and every run of whitespace (spaces, tabs, newlines) becomes a single
    space. Token boundaries and letter case are kept, so sources that
    differ in identifiers or casing never share a key. Sharing one would
    make a cache return the wrong circuit.

    The digest is used as a cache key only, not for security.
    """
    normalized = " ".join(circuit.split())
    return hashlib.md5(normalized.encode()).hexdigest()
@lru_cache(maxsize=1000)
def get_optimized_circuit(circuit_hash: str, target: str) -> str:
    """Placeholder for an ``lru_cache``-backed optimized-circuit lookup.

    Not implemented: the function receives only the hash and target (the
    actual circuit would have to be passed separately because of
    ``lru_cache``) and always returns ``None``.
    """
    return None
class CachedOptimizer:
    """Memoise ``client.optimize`` results per (circuit hash, target).

    The cache is bounded: once ``max_entries`` results are stored, the
    least recently used one is evicted, so a long-running process that
    sees many distinct circuits does not grow without limit.
    """

    def __init__(self, client: QCOSClient, max_entries: int = 1000):
        """Store the client and create an empty cache.

        Args:
            client: Client providing ``optimize(circuit, target=...)``.
            max_entries: Maximum number of cached results to keep.

        Raises:
            ValueError: If ``max_entries`` is smaller than 1.
        """
        if max_entries < 1:
            raise ValueError(f"max_entries must be at least 1, got {max_entries}")
        self.client = client
        self.max_entries = max_entries
        # A plain dict keeps insertion order, so the first key is always
        # the least recently used one.
        self._cache = {}

    def optimize(self, circuit: str, target: str = "ionq") -> str:
        """Return the optimized QASM for ``circuit``, using the cache.

        Args:
            circuit: Circuit source to optimize.
            target: Target name forwarded to ``client.optimize``.

        Returns:
            The ``optimized_qasm`` of the optimization result.
        """
        key = (circuit_hash(circuit), target)

        try:
            # Pop so the re-insert below marks the entry most recently used.
            optimized = self._cache.pop(key)
        except KeyError:
            optimized = self.client.optimize(circuit, target=target).optimized_qasm
            if len(self._cache) >= self.max_entries:
                del self._cache[next(iter(self._cache))]

        self._cache[key] = optimized
        return optimized
# Usage
# `client` and `circuit` are not defined in this snippet; they are expected
# to come from the surrounding application.
optimizer = CachedOptimizer(client)
optimized = optimizer.optimize(circuit, "ionq") # First call: API request
optimized = optimizer.optimize(circuit, "ionq") # Second call: cached
Connection Pooling
"""Reuse HTTP connections for better performance."""
from softqcos_sdk import QCOSClient
import httpx
# Client automatically pools connections, but you can configure:
# NOTE(review): the keyword names below are taken from this page as-is;
# confirm them and their units against the installed SDK version.
client = QCOSClient(
    http2=True, # Use HTTP/2 for multiplexing
    connection_pool_size=20, # Max concurrent connections
    keepalive_timeout=30, # Keep connections alive
)
# For high-throughput applications: a larger pool, with the request
# timeout set explicitly.
high_perf_client = QCOSClient(
    http2=True,
    connection_pool_size=50,
    timeout=60,
)
Cost Management
Budget Controls
"""Implement cost controls for production."""
from softqcos_sdk import QCOSClient, QuotaExceededError
from dataclasses import dataclass
from datetime import datetime, timedelta
@dataclass
class BudgetConfig:
    """Spending limits, in euros, enforced by ``BudgetControlledClient``."""

    # Maximum total spend per calendar day.
    daily_limit_eur: float = 10.0
    # Maximum total spend per calendar month.
    monthly_limit_eur: float = 100.0
    # Maximum estimated cost allowed for a single job.
    max_cost_per_job_eur: float = 5.0


class BudgetControlledClient:
    """Wrap a client so Azure Quantum jobs are refused when over budget.

    Spend is tracked in memory from cost estimates. It is neither
    persisted nor shared between instances.
    """

    def __init__(self, client: QCOSClient, config: BudgetConfig):
        """Store the wrapped client and limits; start with zero spend."""
        self.client = client
        self.config = config
        self._daily_spend = 0.0
        self._monthly_spend = 0.0
        self._last_reset = datetime.now()

    def _reset_elapsed_periods(self) -> None:
        """Zero the counters whose calendar day or month has ended."""
        now = datetime.now()
        last = self._last_reset
        if now.date() <= last.date():
            return
        # A new month always starts on a new day, so the month check only
        # needs to run when the day has changed.
        if (now.year, now.month) > (last.year, last.month):
            self._monthly_spend = 0.0
        self._daily_spend = 0.0
        self._last_reset = now

    def _check_budget(self, estimated_cost: float):
        """Raise if ``estimated_cost`` would break any configured limit.

        Raises:
            QuotaExceededError: If the job exceeds the per-job limit, or
                would push the daily or monthly spend over its limit.
        """
        # Reset daily/monthly counters if needed
        self._reset_elapsed_periods()

        # Check per-job limit
        if estimated_cost > self.config.max_cost_per_job_eur:
            raise QuotaExceededError(
                f"Job cost €{estimated_cost:.2f} exceeds limit €{self.config.max_cost_per_job_eur:.2f}"
            )

        # Check daily limit
        if self._daily_spend + estimated_cost > self.config.daily_limit_eur:
            raise QuotaExceededError(
                f"Daily budget exhausted. Spent: €{self._daily_spend:.2f}"
            )

        # Check monthly limit. It is configured in BudgetConfig, so it
        # must be enforced here as well.
        if self._monthly_spend + estimated_cost > self.config.monthly_limit_eur:
            raise QuotaExceededError(
                f"Monthly budget exhausted. Spent: €{self._monthly_spend:.2f}"
            )

    def execute_azure(self, circuit: str, target: str, shots: int):
        """Estimate, budget-check and then execute a job on Azure Quantum.

        Returns:
            Whatever ``client.azure_quantum.execute`` returns.

        Raises:
            QuotaExceededError: If the estimate does not fit the budget.
        """
        # Estimate cost
        estimate = self.client.azure_quantum.estimate_cost(
            circuit=circuit,
            target=target,
            shots=shots,
        )

        # Check budget
        self._check_budget(estimate.cost_eur)

        # Execute
        result = self.client.azure_quantum.execute(
            circuit=circuit,
            target=target,
            shots=shots,
        )

        # Track spending only after a successful execute. The estimate is
        # used because no billed amount is read from the result here.
        self._daily_spend += estimate.cost_eur
        self._monthly_spend += estimate.cost_eur
        return result
# Usage
# max_cost_per_job_eur keeps its default of 5.0 here.
config = BudgetConfig(daily_limit_eur=50.0, monthly_limit_eur=500.0)
# `client` and `circuit` are not defined in this snippet; they are expected
# to come from the surrounding application.
controlled_client = BudgetControlledClient(client, config)
result = controlled_client.execute_azure(circuit, "ionq.aria-1", 100)
Cost Estimation Workflow
"""Always estimate before executing on QPUs."""
def cost_aware_execute(
    client: QCOSClient,
    circuit: str,
    preferred_target: str,
    shots: int,
    max_cost_eur: float = 5.0,
) -> dict:
    """Execute on the first candidate target whose cost fits the budget.

    Candidates are tried in priority order: ``preferred_target``, the same
    name with ``".aria-1"`` replaced by ``".simulator"``, then the local
    ``"aer"`` simulator. Each distinct name is tried once.

    Args:
        client: Client providing ``execute`` and ``azure_quantum``.
        circuit: Circuit source to execute.
        preferred_target: Target to try first.
        shots: Number of shots to run.
        max_cost_eur: Highest estimated cost that is acceptable.

    Returns:
        The result of the execute call on the chosen target.

    Raises:
        ValueError: If no candidate could be used within the budget.
    """
    # Define fallback targets in priority order. dict.fromkeys drops
    # duplicates (e.g. when the name holds no ".aria-1") and keeps order.
    fallback_targets = list(dict.fromkeys([
        preferred_target,
        preferred_target.replace(".aria-1", ".simulator"),  # IonQ simulator
        "aer",  # Local simulator
    ]))

    for target in fallback_targets:
        is_azure = any(
            vendor in target for vendor in ("azure", "ionq", "quantinuum")
        )
        try:
            if is_azure:
                cost = client.azure_quantum.estimate_cost(
                    circuit=circuit,
                    target=target,
                    shots=shots,
                ).cost_eur
            else:
                cost = 0.0  # Local simulators are free

            if cost > max_cost_eur:
                continue

            print(f"Using {target} (cost: €{cost:.4f})")

            # Route by where the target lives, not by its price. A free
            # Azure target must still run on Azure, not locally.
            if is_azure:
                return client.azure_quantum.execute(
                    circuit=circuit,
                    target=target,
                    shots=shots,
                )
            return client.execute(qasm=circuit, shots=shots)

        except Exception as e:
            # Deliberate best-effort: any failure on one target moves on
            # to the next candidate.
            print(f"Target {target} unavailable: {e}")
            continue

    raise ValueError(f"No target available within budget €{max_cost_eur}")
Security Best Practices
API Key Management
"""Secure API key handling."""
import os
from pathlib import Path
def get_api_key() -> str:
    """Return the API key from the first source that provides one.

    Sources, in priority order:
      1. The ``SOFTQCOS_API_KEY`` environment variable (if non-empty).
      2. AWS Secrets Manager secret ``softqcos/api-key``, when ``boto3``
         is importable. Errors raised by the AWS call propagate.
      3. The ``api_key`` entry of ``~/.softqcos/config.json``
         (for development only).

    Returns:
        The API key.

    Raises:
        ValueError: If none of the sources yields a key.
    """
    # Priority 1: Environment variable
    if key := os.environ.get("SOFTQCOS_API_KEY"):
        return key

    # Priority 2: Secrets manager (AWS example). Only a missing boto3 is
    # treated as "source unavailable".
    try:
        import boto3
    except ImportError:
        boto3 = None
    if boto3 is not None:
        secrets_client = boto3.client("secretsmanager")
        response = secrets_client.get_secret_value(SecretId="softqcos/api-key")
        return response["SecretString"]

    # Priority 3: Config file (for development only)
    config_path = Path.home() / ".softqcos" / "config.json"
    if config_path.exists():
        import json

        with open(config_path, encoding="utf-8") as f:
            # A file without a usable "api_key" must fall through to the
            # error below instead of returning None to the caller.
            if key := json.load(f).get("api_key"):
                return key

    raise ValueError("No API key found. Set SOFTQCOS_API_KEY environment variable.")
# Never do this: a key written into source ends up in version control.
# client = QCOSClient(api_key="sk_live_abc123") # ❌ Hardcoded key
# Always do this: resolve the key at runtime from get_api_key().
client = QCOSClient(api_key=get_api_key()) # ✅ Secure retrieval
Logging Without Secrets
"""Configure logging to avoid exposing secrets."""
import logging
class SecretFilter(logging.Filter):
    """Redact log records whose message mentions a sensitive keyword.

    Matching is a case-insensitive substring test on the formatted
    message. A matching record is kept, but its whole message is replaced
    by a fixed placeholder.
    """

    def __init__(self, patterns=None):
        """Create the filter.

        Args:
            patterns: Keywords that mark a message as sensitive. Defaults
                to ``api_key``, ``token``, ``password`` and ``secret``.
        """
        super().__init__()
        chosen = patterns or ["api_key", "token", "password", "secret"]
        # The message is lowercased before matching, so the patterns must
        # be lowercased too or a pattern such as "API_KEY" never matches.
        self.patterns = [pattern.lower() for pattern in chosen]

    def filter(self, record):
        """Redact ``record`` in place when needed; never drop it."""
        message = record.getMessage().lower()
        if any(pattern in message for pattern in self.patterns):
            record.msg = "[REDACTED - contains sensitive data]"
            # Clear the args so the placeholder is not %-formatted.
            record.args = ()
        return True
# Apply filter
# NOTE(review): a filter attached to a logger is consulted only for records
# logged directly on that logger, not for records propagated from child
# loggers such as "softqcos_sdk.http". If the SDK logs through child
# loggers, attach the filter to the handlers instead. Confirm against the
# SDK's logger names.
logger = logging.getLogger("softqcos_sdk")
logger.addFilter(SecretFilter())
Testing
Unit Testing with Mocks
"""Test your quantum code without API calls."""
import pytest
from unittest.mock import Mock, patch
from softqcos_sdk import QCOSClient
class MockResult:
    """Stand-in for an execution result, carrying fixed job metadata."""

    def __init__(self, counts):
        """Store ``counts`` alongside a canned job id and status."""
        self.counts = counts
        self.status = "completed"
        self.job_id = "mock_job_123"
@pytest.fixture
def mock_client():
    """Provide a ``QCOSClient``-shaped mock with canned responses."""
    fake = Mock(spec=QCOSClient)
    # health() reports OK; execute() yields a Bell-state-like histogram.
    fake.health.return_value = {"status": "ok"}
    fake.execute.return_value = MockResult({"00": 500, "11": 524})
    return fake
def test_bell_state_analysis(mock_client):
    """Check that the Bell-state counts contain only correlated outcomes."""
    circuit = """
OPENQASM 2.0;
include "qelib1.inc";
qreg q[2]; creg c[2];
h q[0]; cx q[0], q[1];
measure q -> c;
"""
    counts = mock_client.execute(qasm=circuit, shots=1024).counts

    # Verify entanglement signature: the anti-correlated outcomes must be
    # absent or zero, and both correlated outcomes must be present.
    assert counts.get("01", 0) == 0
    assert counts.get("10", 0) == 0
    assert "00" in counts
    assert "11" in counts
def test_with_real_api():
    """Integration test against the real API (use sparingly)."""
    # Simple health check on a default-configured client.
    status = QCOSClient().health()["status"]
    assert status == "ok"
Property-Based Testing
"""Use property-based testing for quantum circuits."""
from hypothesis import given, strategies as st
from softqcos_sdk import QCOSClient
@given(n_qubits=st.integers(min_value=1, max_value=5))
def test_superposition_distribution(n_qubits):
    """Build an n-qubit superposition circuit and check the state count.

    The circuit is only assembled here, not executed. The single
    assertion is on the number of basis states, ``2 ** n_qubits``.
    """
    # Create n-qubit superposition: header, one Hadamard per qubit, then
    # a full-register measurement.
    header = f"""
OPENQASM 2.0;
include "qelib1.inc";
qreg q[{n_qubits}];
creg c[{n_qubits}];
"""
    hadamards = "".join(f"h q[{i}];\n" for i in range(n_qubits))
    circuit = header + hadamards + "measure q -> c;"

    # Mock execution
    expected_states = 2 ** n_qubits

    # Each state should appear with probability 1/2^n
    # This is a property that should always hold
    assert expected_states > 0
Monitoring
Structured Logging
"""Production logging configuration."""
import logging
import json
from datetime import datetime
class JSONFormatter(logging.Formatter):
    """Format log records as single-line JSON objects."""

    # Optional attributes (supplied through ``extra=``) that are copied
    # into the output when present on the record.
    _EXTRA_FIELDS = ("job_id", "circuit_hash", "duration_ms")

    def format(self, record):
        """Return ``record`` serialised as a JSON string.

        The timestamp is the record's creation time as a timezone-aware
        UTC ISO-8601 string. It is not the time at which the record
        happens to be formatted.
        """
        from datetime import timezone

        created = datetime.fromtimestamp(record.created, tz=timezone.utc)
        log_data = {
            "timestamp": created.isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
        }

        # Add extra fields
        for name in self._EXTRA_FIELDS:
            if hasattr(record, name):
                log_data[name] = getattr(record, name)

        return json.dumps(log_data)
# Configure logging: a stream handler that emits JSON, attached to the
# application logger at INFO level.
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger = logging.getLogger("quantum_app")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
# Usage: keys passed via `extra` become attributes on the log record,
# where the formatter picks them up.
# `result`, `circuit` and `execution_time` are not defined in this snippet;
# execution_time is presumably in seconds, given the * 1000 below.
logger.info("Executing circuit", extra={
    "job_id": result.job_id,
    "circuit_hash": circuit_hash(circuit),
    "duration_ms": execution_time * 1000,
})
Metrics Collection
"""Collect metrics for monitoring dashboards."""
from dataclasses import dataclass, field
from typing import Dict, List
from datetime import datetime
import statistics
@dataclass
class ExecutionMetrics:
    """Running counters and timings for circuit executions."""

    total_executions: int = 0
    successful_executions: int = 0
    failed_executions: int = 0
    total_cost_eur: float = 0.0
    execution_times: List[float] = field(default_factory=list)
    errors_by_type: Dict[str, int] = field(default_factory=dict)

    @property
    def success_rate(self) -> float:
        """Fraction of executions that succeeded; 0.0 before any ran."""
        total = self.total_executions
        return self.successful_executions / total if total else 0.0

    @property
    def avg_execution_time(self) -> float:
        """Mean recorded duration; 0.0 when nothing has been recorded."""
        durations = self.execution_times
        return statistics.mean(durations) if durations else 0.0

    def record_execution(self, success: bool, duration: float, cost: float = 0.0, error_type: str = None):
        """Record one execution: its outcome, duration and optional cost.

        ``error_type`` is tallied only for failed executions, and only
        when it is non-empty.
        """
        self.total_cost_eur += cost
        self.execution_times.append(duration)
        self.total_executions += 1

        if success:
            self.successful_executions += 1
            return

        self.failed_executions += 1
        if error_type:
            tally = self.errors_by_type
            tally[error_type] = tally.get(error_type, 0) + 1

    def to_dict(self) -> dict:
        """Return a summary dict suitable for export to monitoring."""
        summary = {}
        summary["total_executions"] = self.total_executions
        summary["success_rate"] = self.success_rate
        # Scaled by 1000 to match the "_ms" key.
        summary["avg_execution_time_ms"] = self.avg_execution_time * 1000
        summary["total_cost_eur"] = self.total_cost_eur
        summary["errors_by_type"] = self.errors_by_type
        return summary
# Global metrics instance
metrics = ExecutionMetrics()

# Usage in execution
import time

# perf_counter() is monotonic, so the measured duration cannot be skewed
# (or go negative) when the system clock is adjusted mid-run.
start = time.perf_counter()
try:
    result = client.execute(qasm=circuit, shots=1024)
except QCOSError as e:
    # Record the failure under the exception's class name.
    metrics.record_execution(
        False,
        time.perf_counter() - start,
        error_type=type(e).__name__,
    )
else:
    metrics.record_execution(True, time.perf_counter() - start)

# Export for monitoring
print(metrics.to_dict())
Deployment Checklist
Pre-Production Checklist
- API Keys: Using production keys, stored securely
- Error Handling: Comprehensive error handling implemented
- Logging: Structured logging configured
- Monitoring: Metrics collection in place
- Budget Controls: Spending limits configured
- Rate Limiting: Retry logic implemented
- Testing: Unit and integration tests passing
- Circuit Validation: All circuits validated before execution
- Timeouts: Appropriate timeouts configured
- Fallbacks: Fallback targets defined for Azure Quantum
Environment Variables
# Required
SOFTQCOS_API_KEY=sk_live_...
# Optional
SOFTQCOS_TIMEOUT=300
SOFTQCOS_MAX_RETRIES=3
SOFTQCOS_LOG_LEVEL=INFO
# Budget controls
SOFTQCOS_DAILY_BUDGET=50.0
SOFTQCOS_MONTHLY_BUDGET=500.0