Complete API documentation for FinOpsMetrics
IMPORTANT: ObservabilityHub receives data from telemetry agents. You don't manually track costs - agents do this automatically.
from finopsmetrics import ObservabilityHub
# Server-side initialization
hub = ObservabilityHub()
Parameter | Type | Description |
---|---|---|
cluster_id | str | Unique cluster identifier |
nodes | List[str] | List of node hostnames or IPs |
region | str | Cloud region (default: us-east-1) |
hub.register_cluster(
cluster_id="gpu-cluster-1",
nodes=["node-1", "node-2", "node-3"],
region="us-west-2"
)
# Query health (populated by agents)
health = hub.get_cluster_health_summary()
for cluster_id, metrics in health.items():
print(f"{cluster_id}: {metrics['health_status']}")
print(f" CPU: {metrics['avg_cpu_usage']:.1f}%")
print(f" GPU: {metrics['avg_gpu_usage']:.1f}%")
Specialized monitoring for Large Language Model training and RAG pipelines.
from finopsmetrics.observability.llm_observability import LLMObservabilityHub
llm_hub = LLMObservabilityHub()
from finopsmetrics.observability.llm_observability import LLMTrainingMetrics
import time
metrics = LLMTrainingMetrics(
run_id="llm-training-001",
model_name="gpt-custom-7b",
epoch=5,
step=1000,
training_loss=0.245,
validation_loss=0.289,
learning_rate=0.0001,
gpu_memory_mb=42000,
batch_size=32,
throughput_samples_per_sec=128.5,
timestamp=time.time()
)
llm_hub.collect_llm_training_metrics(metrics)
summary = llm_hub.get_training_summary("llm-training-001")
print(f"Model: {summary['model_name']}")
print(f"Total Steps: {summary['total_steps']}")
print(f"Best Loss: {summary['best_loss']:.4f}")
Centralized cost tracking and budget management. Receives cost data from telemetry agents.
from finopsmetrics.observability.cost_observatory import CostObservatory
cost_obs = CostObservatory()
from finopsmetrics.observability.cost_observatory import Budget
import time
budget = Budget(
budget_id="monthly-ai-budget",
name="AI/ML Monthly Budget",
amount_usd=50000.0,
period="monthly",
start_time=time.time(),
scope={"provider": "aws", "tags": {"team": "ml-research"}},
alert_threshold=0.8 # Alert at 80%
)
cost_obs.create_budget(budget)
summary = cost_obs.get_cost_summary(time_range_hours=24)
print(f"Total: ${summary['total_cost']:.2f}")
for provider, cost in summary['by_provider'].items():
print(f"{provider}: ${cost:.2f}")
for service, cost in summary['by_service'].items():
print(f"{service}: ${cost:.2f}")
status = cost_obs.get_budget_status()
for budget_id, info in status.items():
print(f"{info['name']}:")
print(f" Budget: ${info['amount']:.2f}")
print(f" Spent: ${info['spent']:.2f}")
print(f" Status: {info['status']}")
NEW IN v0.3.0
Plugin system for extending FinOpsMetrics with custom telemetry collectors, cost attribution logic, recommendations, dashboards, and integrations.
from finopsmetrics.plugins import registry, TelemetryPlugin
class MyPlugin(TelemetryPlugin):
# Plugin implementation
pass
# Register the plugin
registry.register(MyPlugin)
Parameter | Type | Description |
---|---|---|
plugin_name | str | Name of the plugin to load |
config | Dict[str, Any] | Plugin configuration dictionary |
plugin = registry.load_plugin("my-plugin", config={
"api_key": "your-api-key",
"endpoint": "https://api.example.com"
})
# Use the plugin
data = plugin.collect_telemetry()
from finopsmetrics.plugins import (
TelemetryPlugin, # Custom data collectors
AttributionPlugin, # Cost attribution logic
RecommendationPlugin, # Optimization recommendations
DashboardPlugin, # Custom widgets
IntegrationPlugin, # External tool integrations
NotificationPlugin, # Alert channels
PolicyPlugin # Governance rules
)
NEW IN v0.3.0
Persona-specific insights tailored to different organizational roles (CFO, Engineer, Finance, Business Lead).
from finopsmetrics.insights import InsightEngine
from finopsmetrics import ObservabilityHub, CostObservatory
hub = ObservabilityHub()
cost_obs = CostObservatory()
engine = InsightEngine(hub=hub, cost_obs=cost_obs)
Parameter | Type | Description |
---|---|---|
persona | str | Target persona: "cfo", "engineer", "finance", or "business_lead" |
time_range | str | Time range for analysis (e.g., "7d", "30d", "90d") |
# Generate CFO insights
cfo_insights = engine.generate_insights(persona="cfo", time_range="30d")
for insight in cfo_insights:
print(f"📊 {insight.title}")
print(f" {insight.description}")
print(f" Impact: {insight.impact}")
print(f" Priority: {insight.priority}")
print()
from finopsmetrics.insights import (
CFOInsightGenerator,
EngineerInsightGenerator,
FinanceInsightGenerator,
BusinessLeadInsightGenerator
)
# Use specific persona generator
cfo_gen = CFOInsightGenerator(hub=hub, cost_obs=cost_obs)
insights = cfo_gen.generate(time_range="30d")
NEW IN v0.2.1
Historical data persistence with support for SQLite, PostgreSQL, and TimescaleDB backends.
from finopsmetrics.observability.persistence import PersistenceConfig, StorageBackend
# SQLite configuration
config = PersistenceConfig(
backend=StorageBackend.SQLITE,
connection_string="sqlite:///finopsmetrics.db",
retention_days=90
)
# PostgreSQL configuration
config = PersistenceConfig(
backend=StorageBackend.POSTGRESQL,
connection_string="postgresql://user:pass@localhost:5432/finopsmetrics",
retention_days=365
)
# TimescaleDB with compression
config = PersistenceConfig(
backend=StorageBackend.TIMESCALEDB,
connection_string="postgresql://user:pass@localhost:5432/finopsmetrics",
retention_days=730,
enable_compression=True
)
Parameter | Type | Description |
---|---|---|
backend | StorageBackend | IN_MEMORY, SQLITE, POSTGRESQL, or TIMESCALEDB |
connection_string | str | Database connection string |
retention_days | int | Data retention period in days (default: 90) |
batch_size | int | Batch size for writes (default: 100) |
enable_compression | bool | Enable compression (TimescaleDB only) |
import time
# Initialize with persistence
hub = ObservabilityHub(persistence_config=config)
# Query last 30 days
thirty_days_ago = time.time() - (30 * 24 * 3600)
metrics = hub.query_historical_metrics(
start_time=thirty_days_ago,
cluster_id="production",
limit=10000
)
# Query historical costs
costs = cost_obs.query_historical_costs(
start_time=thirty_days_ago,
category="compute"
)
Role-based dashboard components for executives and team leads.
from finopsmetrics.dashboard import CFODashboard
cfo_dash = CFODashboard(hub)
# Generate financial report
report = cfo_dash.generate_financial_report()
print(f"Total Spend: ${report.total_spend}")
print(f"AI/ML ROI: {report.ai_ml_roi}%")
print(f"Budget Status: {report.budget_utilization}%")
from finopsmetrics.dashboard import COODashboard
coo_dash = COODashboard(hub)
# Get operational metrics
metrics = coo_dash.get_operational_metrics()
print(f"SLA Compliance: {metrics.sla_compliance}%")
print(f"Efficiency Score: {metrics.operational_efficiency}%")
from finopsmetrics.dashboard import InfrastructureLeaderDashboard
infra_dash = InfrastructureLeaderDashboard(hub)
# Get resource utilization
util = infra_dash.get_resource_utilization()
print(f"CPU: {util.cpu_percent}%")
print(f"Memory: {util.memory_percent}%")
print(f"Storage: {util.storage_percent}%")
Deploy agents as separate processes to automatically discover resources, collect metrics, and calculate costs.
Parameter | Type | Description |
---|---|---|
finopsmetrics_endpoint | str | FinOpsMetrics server URL |
aws_region | str | AWS region (e.g., 'us-west-2') |
from agents.aws_telemetry_agent import AWSTelemetryAgent
# Initialize agent (uses boto3 credential chain)
agent = AWSTelemetryAgent(
finopsmetrics_endpoint="http://localhost:8080",
aws_region="us-west-2"
)
# Register with server
if agent.register_agent():
print("✓ Agent registered")
# Run continuous collection
# Automatically discovers & calculates costs for:
# - EC2 instances, EKS clusters, Lambda functions
# - RDS databases, S3 buckets
agent.run_continuous(interval_seconds=300)
from agents.azure_telemetry_agent import AzureTelemetryAgent
agent = AzureTelemetryAgent(
finopsmetrics_endpoint="http://localhost:8080",
subscription_id="your-subscription-id"
)
agent.register_agent()
agent.run_continuous(interval_seconds=300)
from agents.gcp_telemetry_agent import GCPTelemetryAgent
agent = GCPTelemetryAgent(
finopsmetrics_endpoint="http://localhost:8080",
project_id="your-project-id"
)
agent.register_agent()
agent.run_continuous(interval_seconds=300)
Collect Databricks DBU consumption, cluster costs, job execution metrics, and SQL warehouse usage.
Parameter | Type | Description |
---|---|---|
finopsmetrics_endpoint | str | FinOpsMetrics server URL (e.g., http://localhost:8080) |
databricks_host | str | Databricks workspace URL (e.g., https://your-workspace.cloud.databricks.com) |
databricks_token | str | Databricks personal access token |
workspace_name | str (optional) | Workspace identifier (default: derived from host) |
from agents.databricks_telemetry_agent import DatabricksTelemetryAgent
# Initialize agent
agent = DatabricksTelemetryAgent(
finopsmetrics_endpoint="http://localhost:8080",
databricks_host="https://your-workspace.cloud.databricks.com",
databricks_token="dapi***",
workspace_name="production"
)
# Register and run
if agent.register_agent():
print("✓ Databricks agent registered")
# Collect every 5 minutes
agent.run_continuous(interval_seconds=300)
Metric | Description |
---|---|
cluster_metrics | Cluster uptime, instance types, worker count, DBU consumption |
job_metrics | Job runtime, success/failure, execution costs |
sql_warehouse_metrics | Warehouse size, state, estimated costs |
Cost Calculation: The agent automatically calculates costs based on DBU pricing:
Collect metrics from all clusters in the workspace.
# Returns cluster metrics
{
"total_clusters": 5,
"running_clusters": 3,
"total_estimated_cost_usd": 145.67,
"clusters": [
{
"cluster_name": "prod-cluster",
"state": "RUNNING",
"instance_type": "m5.xlarge",
"num_workers": 4,
"uptime_hours": 8.5,
"estimated_cost_usd": 32.45
}
]
}
Collect job execution metrics and costs.
# Returns job metrics for last 24 hours
{
"total_runs": 42,
"successful_runs": 38,
"failed_runs": 4,
"total_estimated_cost_usd": 89.23,
"jobs": [
{
"job_id": 123,
"run_name": "ETL Pipeline",
"state": "SUCCESS",
"runtime_seconds": 1845,
"estimated_cost_usd": 12.34
}
]
}
# Install dependencies
pip install databricks-sdk requests
# Set credentials
export DATABRICKS_HOST=https://your-workspace.cloud.databricks.com
export DATABRICKS_TOKEN=dapi***
# Run agent
python agents/databricks_telemetry_agent.py \
--finopsmetrics-endpoint http://localhost:8080 \
--databricks-host $DATABRICKS_HOST \
--databricks-token $DATABRICKS_TOKEN \
--interval 300
Monitor Snowflake credit consumption, warehouse usage, storage costs, and query patterns.
Parameter | Type | Description |
---|---|---|
finopsmetrics_endpoint | str | FinOpsMetrics server URL |
snowflake_account | str | Snowflake account identifier (e.g., xy12345.us-east-1) |
snowflake_user | str | Snowflake username |
snowflake_password | str | Snowflake password |
snowflake_warehouse | str | Warehouse name (default: COMPUTE_WH) |
edition | str | Edition for pricing: 'standard', 'enterprise', 'business_critical' |
from agents.snowflake_telemetry_agent import SnowflakeTelemetryAgent
# Initialize agent
agent = SnowflakeTelemetryAgent(
finopsmetrics_endpoint="http://localhost:8080",
snowflake_account="xy12345.us-east-1",
snowflake_user="admin_user",
snowflake_password="***",
snowflake_warehouse="COMPUTE_WH",
edition="enterprise"
)
# Register and run
if agent.register_agent():
print("✓ Snowflake agent registered")
agent.run_continuous(interval_seconds=300)
Metric | Description |
---|---|
warehouse_metrics | Credit consumption (compute + cloud services), active hours |
storage_metrics | Database storage, failsafe storage, total TB |
query_metrics | Query count, execution time, data scanned |
user_attribution | Cost breakdown by user and warehouse |
Cost Calculation: Automatic cost calculation based on Snowflake edition:
Collect warehouse credit consumption for past 24 hours.
# Returns warehouse metrics
{
"total_warehouses": 3,
"total_credits_used": 145.67,
"total_estimated_cost_usd": 437.01,
"warehouses": [
{
"warehouse_name": "COMPUTE_WH",
"total_credits_used": 89.45,
"compute_credits": 78.23,
"cloud_service_credits": 11.22,
"active_hours": 18,
"estimated_cost_usd": 268.35
}
]
}
Collect storage usage across all databases.
# Returns storage metrics
{
"total_storage_tb": 12.456,
"estimated_monthly_cost_usd": 498.24,
"databases": [
{
"database_name": "PROD_DB",
"current_storage_tb": 8.234,
"database_tb": 7.123,
"failsafe_tb": 1.111,
"estimated_monthly_cost_usd": 329.36
}
]
}
# Install dependencies
pip install snowflake-connector-python requests
# Set credentials
export SNOWFLAKE_USER=admin_user
export SNOWFLAKE_PASSWORD=***
# Run agent
python agents/snowflake_telemetry_agent.py \
--finopsmetrics-endpoint http://localhost:8080 \
--snowflake-account xy12345.us-east-1 \
--snowflake-warehouse COMPUTE_WH \
--edition enterprise \
--interval 300
Multi-service agent for monitoring MongoDB Atlas, Redis Cloud, GitHub Actions, DataDog, and more.
Parameter | Type | Description |
---|---|---|
finopsmetrics_endpoint | str | FinOpsMetrics server URL |
config_file | str | Path to JSON configuration file |
Create a JSON configuration file to enable specific services:
{
"mongodb_atlas": {
"enabled": true,
"public_key": "your_public_key",
"private_key": "your_private_key",
"project_id": "your_project_id"
},
"redis_cloud": {
"enabled": true,
"api_key": "your_api_key",
"secret_key": "your_secret_key",
"account_id": "your_account_id"
},
"github_actions": {
"enabled": true,
"token": "ghp_your_token",
"org_name": "your_organization"
},
"datadog": {
"enabled": true,
"api_key": "your_api_key",
"app_key": "your_app_key"
}
}
from agents.saas_services_telemetry_agent import SaaSServicesTelemetryAgent
# Initialize agent with config
agent = SaaSServicesTelemetryAgent(
finopsmetrics_endpoint="http://localhost:8080",
config_file="saas_config.json"
)
# Register and run (collect hourly)
if agent.register_agent():
print("✓ SaaS services agent registered")
agent.run_continuous(interval_seconds=3600)
Service | Metrics Collected | Cost Estimation |
---|---|---|
mongodb_atlas | Cluster size, replication, sharding, storage | M10: $0.08/hr, M30: $0.54/hr |
redis_cloud | Subscriptions, databases, throughput | Based on subscription pricing |
github_actions | Workflow minutes, billable time | Linux: $0.008/min, macOS: $0.08/min |
datadog | Host count, usage metrics | ~$15/host/month |
# MongoDB Atlas metrics structure
{
"service": "mongodb_atlas",
"total_clusters": 3,
"total_daily_cost_usd": 47.52,
"clusters": [
{
"cluster_name": "prod-cluster",
"instance_size": "M30",
"num_shards": 1,
"replication_factor": 3,
"provider": "AWS",
"region": "us-east-1",
"hourly_cost_usd": 1.62,
"daily_cost_usd": 38.88
}
]
}
# GitHub Actions metrics structure
{
"service": "github_actions",
"total_minutes_used": 12450,
"included_minutes": 3000,
"billable_minutes": 9450,
"estimated_monthly_cost_usd": 75.60,
"minutes_used_breakdown": {
"UBUNTU": 8000,
"MACOS": 1450
}
}
# Create sample configuration
python agents/saas_services_telemetry_agent.py \
--create-config saas_config.json
# Edit the configuration file with your credentials
# Then run the agent
python agents/saas_services_telemetry_agent.py \
--finopsmetrics-endpoint http://localhost:8080 \
--config saas_config.json \
--interval 3600
Extend the agent by creating custom collectors:
from agents.saas_services_telemetry_agent import SaaSServicesTelemetryAgent
class CustomServiceCollector:
def __init__(self, api_key: str):
self.api_key = api_key
def collect_metrics(self) -> dict:
# Your custom collection logic
return {
"service": "custom_service",
"total_cost": 123.45,
"metrics": {...}
}
# Add to agent configuration
agent = SaaSServicesTelemetryAgent(
finopsmetrics_endpoint="http://localhost:8080",
config_file="config.json"
)
agent.collectors.append(("custom", CustomServiceCollector("api_key")))
agent.run_continuous(interval_seconds=3600)
Set up the FinOpsMetrics server to receive telemetry from agents and serve dashboards.
The server provides both web dashboards and API endpoints for telemetry ingestion.
# Start server on default port 8080
finopsmetrics-dashboard
# Start on custom host/port
finopsmetrics-dashboard --host 0.0.0.0 --port 8080
# Alternative command
finopsmetrics-server --host 0.0.0.0 --port 8080
from finopsmetrics.webui import start_server
# Start server
start_server(
host='0.0.0.0',
port=8080,
debug=False
)
# Check health endpoint
curl http://localhost:8080/api/health
# Expected response: {"status": "healthy"}
# Verify server in logs
# You should see:
# 🚀 Starting FinOpsMetrics Web UI Server...
# Host: 0.0.0.0
# Port: 8080
The server automatically exposes telemetry endpoints when started. Agents use these to send data.
Endpoint | Method | Description |
---|---|---|
/api/v1/agents/register | POST | Agent registration endpoint |
/api/v1/telemetry/ingest | POST | Telemetry data ingestion |
/api/health | GET | Server health check |
/ | GET | Overview dashboard |
/dashboard/cfo | GET | CFO executive dashboard |
/dashboard/coo | GET | COO operational dashboard |
/dashboard/infrastructure | GET | Infrastructure leader dashboard |
The server automatically initializes these components, but you can access them programmatically.
from finopsmetrics import ObservabilityHub
from finopsmetrics.observability.cost_observatory import CostObservatory
from finopsmetrics.observability.llm_observability import LLMObservabilityHub
# Initialize components (done automatically by server)
hub = ObservabilityHub()
cost_obs = CostObservatory()
llm_hub = LLMObservabilityHub()
# These will receive data from agents automatically
# No manual configuration needed
Once the server is running, deploy agents to start collecting data.
# deploy_agent.py
from agents.aws_telemetry_agent import AWSTelemetryAgent
# Point agent to your server
agent = AWSTelemetryAgent(
finopsmetrics_endpoint="http://localhost:8080", # Your server URL
aws_region="us-west-2"
)
# Register with server
if agent.register_agent():
print("✓ Agent registered with server")
# Start continuous collection
# Agent will POST to http://localhost:8080/api/v1/telemetry/ingest
agent.run_continuous(interval_seconds=300)
# 1. Start server (in one terminal/process)
# finopsmetrics-dashboard --port 8080
# 2. Deploy agent (in another terminal/process)
from agents.aws_telemetry_agent import AWSTelemetryAgent
agent = AWSTelemetryAgent(
finopsmetrics_endpoint="http://localhost:8080",
aws_region="us-west-2"
)
agent.register_agent()
agent.run_continuous(interval_seconds=300)
# 3. Access dashboards in browser
# http://localhost:8080/
# http://localhost:8080/dashboard/cfo
# http://localhost:8080/dashboard/coo
For production, run the server with proper configuration.
# Using systemd service
# /etc/systemd/system/finopsmetrics-server.service
[Unit]
Description=FinOpsMetrics Server
After=network.target
[Service]
Type=simple
User=finopsmetrics
WorkingDirectory=/opt/finopsmetrics
Environment="HOST=0.0.0.0"
Environment="PORT=8080"
ExecStart=/usr/bin/finopsmetrics-dashboard --host 0.0.0.0 --port 8080
Restart=always
[Install]
WantedBy=multi-user.target
# Enable and start
sudo systemctl enable finopsmetrics-server
sudo systemctl start finopsmetrics-server
# Using Docker
docker run -d \
--name finopsmetrics-server \
-p 8080:8080 \
-e HOST=0.0.0.0 \
-e PORT=8080 \
finopsmetrics/server:latest
# Using Docker Compose
# docker-compose.yml
version: '3.8'
services:
finopsmetrics-server:
image: finopsmetrics/server:latest
ports:
- "8080:8080"
environment:
- HOST=0.0.0.0
- PORT=8080
restart: always
Ensure agents can reach the server on the configured port.
# Open firewall port (if needed)
sudo ufw allow 8080/tcp
# Verify server is listening
netstat -tulpn | grep 8080
# Test from agent machine
curl http://your-server-ip:8080/api/health
Flask-based web server with real-time WebSocket updates.
Parameter | Type | Description |
---|---|---|
host | str | Host to bind to (default: '0.0.0.0') |
port | int | Port to bind to (default: 8080) |
debug | bool | Enable debug mode (default: False) |
The server pushes real-time updates to connected clients every 5 seconds.
// Client-side WebSocket connection
const socket = io('http://localhost:8080');
// Listen for cost updates
socket.on('cost_update', (data) => {
console.log('New cost data:', data);
updateDashboard(data);
});
// Listen for metric updates
socket.on('metrics_update', (data) => {
console.log('New metrics:', data);
updateCharts(data);
});
Route | Description |
---|---|
/ | Overview dashboard with key metrics |
/dashboard/cfo | CFO executive financial dashboard |
/dashboard/coo | COO operational efficiency dashboard |
/dashboard/infrastructure | Infrastructure leader technical dashboard |