Instrumentação manual de uma aplicação Python usando Open Telemetry

Introdução
Nos dois últimos artigos publicados, exploramos a auto-instrumentação e a instrumentação utilizando o OpenTelemetry Operator no Kubernetes. Agora, vamos abordar um caso de teste prático usando a instrumentação manual do OpenTelemetry.
A principal diferença desta abordagem em relação às anteriores é que ela nos proporciona controle total sobre os dados de telemetria da nossa aplicação. Com a instrumentação manual, conseguimos capturar apenas as métricas e traces realmente relevantes para o contexto do negócio, como por exemplo: verificar se uma função crítica está sendo executada dentro do tempo esperado, monitorar etapas específicas de um fluxo de trabalho ou adicionar contexto personalizado aos spans.
Essa granularidade permite uma observabilidade mais precisa e alinhada com as necessidades específicas da sua aplicação.
Outro ponto importante é que, na sua empresa, o backend que recebe os dados de observabilidade pode mudar ao longo do tempo. Por exemplo, você pode estar usando o Dynatrace hoje e, meses depois, migrar para o New Relic. Essa mudança normalmente exigiria um retrabalho significativo na instrumentação da aplicação. No entanto, com o OpenTelemetry, é possível instrumentar sua aplicação uma única vez e, posteriormente, apenas reconfigurar o destino dos dados (backend). Isso reduz drasticamente o retrabalho e aumenta a flexibilidade na escolha de ferramentas de observabilidade.
Estrutura básica da implementação
Existem dois componentes fundamentais quando falamos de instrumentação manual: API e SDK.
API: é responsável por instrumentar o código da aplicação. Através da API, você consegue criar e manipular estruturas de observabilidade como spans, subspans (spans filhos), definir atributos personalizados, registrar eventos de erro e incrementar contadores de métricas. Em resumo, a API fornece as ferramentas para capturar os dados de telemetria diretamente no código.
SDK: fazendo uma analogia, o SDK funciona como o "motor" do OpenTelemetry. É através dele que os dados instrumentados pela API são recebidos, processados, enriquecidos e, finalmente, exportados para o OpenTelemetry Collector ou diretamente para um backend de observabilidade.
Essa separação entre API e SDK segue o princípio de responsabilidade única: a API se preocupa apenas com a coleta dos dados, enquanto o SDK cuida de todo o pipeline de processamento e exportação.
Link do repositório do projeto: htttps://github.com/joaochiroli/otel-instrumentacao-python/pull/new/instrumentacao-manual
Passo 1: Instalar as dependências necessárias
Primeiro, você precisa adicionar os pacotes OpenTelemetry ao seu requirements.txt:
flask
Flask-SQLAlchemy
psycopg2-binary
flask_sqlalchemy
opentelemetry-api
opentelemetry-sdk
# Instrumentação automática para Flask e SQLAlchemy
opentelemetry-instrumentation-flask
opentelemetry-instrumentation-sqlalchemy
# Exporters (escolha baseado no seu backend)
opentelemetry-exporter-otlp-proto-grpc # Para gRPC
opentelemetry-exporter-otlp-proto-http # Para HTTP
# Opcional mas recomendado
opentelemetry-instrumentation-logging
Passo 2: Estrutura básica da instrumentação
Vou criar um arquivo separado otel_config.py para organizar melhor:
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource, SERVICE_NAME, SERVICE_VERSION
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
import os
import logging
logger = logging.getLogger(__name__)
def init_telemetry(app, db_engine):
"""
Inicializa toda a instrumentação OpenTelemetry
"""
# Passo 2.1: Criar o Resource (identifica seu serviço)
resource = Resource.create({
SERVICE_NAME: os.getenv("OTEL_SERVICE_NAME", "flask-app"),
SERVICE_VERSION: "1.0.0",
"environment": os.getenv("ENVIRONMENT", "development"),
"team": "platform-team"
})
# Passo 2.2: Configurar Traces
trace_provider = TracerProvider(resource=resource)
# Configurar o exporter para traces
otlp_trace_exporter = OTLPSpanExporter(
endpoint=os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317"),
insecure=True # Use False em produção com TLS
)
# Adicionar o processor
trace_provider.add_span_processor(
BatchSpanProcessor(otlp_trace_exporter)
)
# Registrar o provider globalmente
trace.set_tracer_provider(trace_provider)
# Passo 2.3: Configurar Metrics
metric_reader = PeriodicExportingMetricReader(
OTLPMetricExporter(
endpoint=os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317"),
insecure=True
),
export_interval_millis=60000 # Exporta a cada 60 segundos
)
metric_provider = MeterProvider(
resource=resource,
metric_readers=[metric_reader]
)
metrics.set_meter_provider(metric_provider)
# Passo 2.4: Instrumentação automática
FlaskInstrumentor().instrument_app(app)
SQLAlchemyInstrumentor().instrument(engine=db_engine)
logger.info("OpenTelemetry instrumentation initialized successfully")
return trace.get_tracer(__name__), metrics.get_meter(__name__)
Passo 3: Modificar o app.py para usar a instrumentação
Aqui está como integrar no seu código:
import logging
import os
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime
import sys
from sqlalchemy.sql import text
# ============================================
# IMPORTAR OpenTelemetry
# ============================================
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
app = Flask(__name__)
# Configuração do PostgreSQL via variáveis de ambiente
DB_USER = os.getenv('POSTGRES_USER')
DB_PASSWORD = os.getenv('POSTGRES_PASSWORD')
DB_HOST = os.getenv('POSTGRES_HOST')
DB_PORT = os.getenv('POSTGRES_PORT')
DB_NAME = os.getenv('POSTGRES_DB')
DATABASE_URL = f'postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}'
app.config['SQLALCHEMY_DATABASE_URI'] = DATABASE_URL
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('app.log'),
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
logger.info(f"Database configuration: Host={DB_HOST}, Port={DB_PORT}, DB={DB_NAME}, User={DB_USER}")
# Inicializar o banco de dados
db = SQLAlchemy(app)
# ============================================
# MODELS (definir antes de init_telemetry)
# ============================================
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(100), nullable=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
def to_dict(self):
return {
'id': self.id,
'name': self.name,
'created_at': self.created_at.isoformat()
}
class Message(db.Model):
id = db.Column(db.Integer, primary_key=True)
message = db.Column(db.Text, nullable=False)
ip_address = db.Column(db.String(50))
created_at = db.Column(db.DateTime, default=datetime.utcnow)
def to_dict(self):
return {
'id': self.id,
'message': self.message,
'ip_address': self.ip_address,
'created_at': self.created_at.isoformat()
}
class RequestLog(db.Model):
id = db.Column(db.Integer, primary_key=True)
method = db.Column(db.String(10), nullable=False)
endpoint = db.Column(db.String(200), nullable=False)
ip_address = db.Column(db.String(50))
status_code = db.Column(db.Integer)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
def to_dict(self):
return {
'id': self.id,
'method': self.method,
'endpoint': self.endpoint,
'ip_address': self.ip_address,
'status_code': self.status_code,
'created_at': self.created_at.isoformat()
}
# ============================================
# INICIALIZAR TELEMETRY DENTRO DO CONTEXTO
# ============================================
from otel_config import init_telemetry
# CRITICAL FIX: Inicializar dentro do app context
with app.app_context():
tracer, meter = init_telemetry(app, db.engine)
# Criar tabelas
try:
db.create_all()
logger.info("Database tables created successfully")
except Exception as e:
logger.error(f"Error creating database tables: {e}")
# Criar métricas customizadas (fora do context é ok)
request_counter = meter.create_counter(
name="http_requests_total",
description="Total de requisições HTTP",
unit="1"
)
message_counter = meter.create_counter(
name="messages_created_total",
description="Total de mensagens criadas",
unit="1"
)
user_counter = meter.create_counter(
name="users_created_total",
description="Total de usuários criados",
unit="1"
)
# ============================================
# ROUTES
# ============================================
@app.route('/')
def hello():
logger.info("Hello endpoint accessed")
return "Hello, World!"
@app.route('/user/<name>')
def get_user(name):
with tracer.start_as_current_span("get_user_operation") as span:
span.set_attribute("user.name", name)
span.set_attribute("user.name.length", len(name))
logger.info(f"User endpoint accessed for: {name}")
try:
if len(name) < 2:
span.set_status(Status(StatusCode.ERROR, "Name too short"))
raise ValueError("Name too short")
with tracer.start_as_current_span("database.check_user"):
user = User.query.filter_by(name=name).first()
if not user:
with tracer.start_as_current_span("database.create_user"):
user = User(name=name)
db.session.add(user)
db.session.commit()
logger.info(f"New user created: {name}")
user_counter.add(1, {"operation": "create"})
span.add_event("user_created", {"user_id": user.id})
span.set_attribute("user.id", user.id)
span.set_status(Status(StatusCode.OK))
return jsonify({
"message": f"Hello, {name}!",
"user": user.to_dict()
})
except ValueError as e:
logger.error(f"Validation error in get_user: {e}")
return jsonify({"error": "Name must be at least 2 characters"}), 400
except Exception as e:
span.set_status(Status(StatusCode.ERROR, str(e)))
span.record_exception(e)
logger.error(f"Database error in get_user: {e}")
return jsonify({"error": "Internal server error"}), 500
@app.route('/submit', methods=['POST'])
def submit_data():
with tracer.start_as_current_span("submit_data_operation") as span:
logger.info("Submit endpoint accessed")
try:
data = request.get_json()
if not data:
span.set_status(Status(StatusCode.ERROR, "No JSON data"))
return jsonify({"status": "error", "message": "No JSON data received"}), 400
if 'message' not in data:
span.set_status(Status(StatusCode.ERROR, "Missing message field"))
return jsonify({"status": "error", "message": "Missing 'message' field"}), 400
span.set_attribute("message.length", len(data['message']))
span.set_attribute("request.ip", request.remote_addr)
with tracer.start_as_current_span("database.insert_message"):
message_entry = Message(
message=data['message'],
ip_address=request.remote_addr
)
db.session.add(message_entry)
db.session.commit()
span.set_attribute("message.id", message_entry.id)
message_counter.add(1, {"source": "api"})
span.add_event("message_saved", {
"message_id": message_entry.id,
"timestamp": datetime.now().isoformat()
})
span.set_status(Status(StatusCode.OK))
logger.info(f"Successfully processed and saved message: {data['message']}")
return jsonify({
"status": "success",
"received_message": data['message'],
"message_id": message_entry.id,
"response": "Data received and saved successfully!",
"timestamp": datetime.now().isoformat()
})
except Exception as e:
span.set_status(Status(StatusCode.ERROR, str(e)))
span.record_exception(e)
logger.error(f"Unexpected error in submit_data: {e}")
db.session.rollback()
return jsonify({"status": "error", "message": "Internal server error"}), 500
@app.before_request
def before_request_telemetry():
request_counter.add(1, {
"method": request.method,
"endpoint": request.endpoint or "unknown"
})
@app.route('/health')
def health_check():
logger.info("Health check accessed")
try:
db.session.execute(text('SELECT 1'))
db_status = "connected"
user_count = User.query.count()
message_count = Message.query.count()
log_count = RequestLog.query.count()
except Exception as e:
db_status = f"error: {str(e)}"
user_count = message_count = log_count = 0
return jsonify({
"status": "healthy",
"database": db_status,
"database_stats": {
"users": user_count,
"messages": message_count,
"request_logs": log_count
},
"timestamp": datetime.now().isoformat(),
"version": "1.0.0"
})
if __name__ == '__main__':
host = os.getenv('FLASK_HOST', '0.0.0.0')
port = int(os.getenv('FLASK_PORT', '5000'))
logger.info(f"Starting Flask application on {host}:{port}...")
app.run(debug=False, host=host, port=port)
Os dados podem ser vistos acessando o Jaeger

Outros arquivos usados no projeto
Dockerfile
FROM python:3.9-slim
# Defina o diretório de trabalho
WORKDIR /app
# Copie os arquivos necessários para o diretório de trabalho
COPY . /app
# Instale as dependências
RUN pip install -r requirements.txt
# Configura o OTLP
RUN opentelemetry-bootstrap -a install
# Exponha a porta que a aplicação vai rodar
EXPOSE 5000
# Comando para rodar a aplicação
CMD ["opentelemetry-instrument", "python", "app.py"]
Docker-compose
services:
db:
image: postgres:14.15
container_name: postgres-db
ports:
- "5432:5432"
environment:
POSTGRES_DB: simple_app
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck: # verifica se o db esta pronto para os outros servicos
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 5s
timeout: 5s
retries: 10
networks:
- app-network
app:
build: .
container_name: flask-app
volumes:
- .:/app
ports:
- "5000:5000"
environment:
POSTGRES_HOST: db
POSTGRES_PORT: 5432
POSTGRES_DB: simple_app
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
FLASK_ENV: development
FLASK_DEBUG: "true"
OTEL_SERVICE_NAME: flask-api
OTEL_EXPORTER_OTLP_ENDPOINT: otel-collector:4317
OTEL_EXPORTER_OTLP_PROTOCOL: grpc
OTEL_EXPORTER_OTLP_INSECURE: "true"
OTEL_TRACES_EXPORTER: console,otlp
OTEL_METRICS_EXPORTER: console,otlp
OTEL_LOGS_EXPORTER: console,otlp
OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED: "true"
OTEL_PYTHON_LOG_CORRELATION: "true"
OTEL_PYTHON_FLASK_ENABLED: "true"
OTEL_PYTHON_SQLALCHEMY_ENABLED: "true"
OTEL_PYTHON_REQUESTS_ENABLED: "true"
OTEL_TRACES_SAMPLER: always_on
OTEL_METRICS_SAMPLER: always_on
# OTEL_LOG_LEVEL: DEBUG
# OTEL_TRACES_SAMPLER: "always_on"
# OTEL_TRACES_SAMPLER_ARG: "1.0"
# OTEL_PYTHON_DISABLED_INSTRUMENTATIONS: ""
# OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST: ".*"
# OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE: ".*"
networks:
- app-network
depends_on:
db:
condition: service_healthy
otel-collector:
image: otel/opentelemetry-collector-contrib:latest
container_name: otel-collector
command: ["--config=/etc/otel-collector-config.yaml"]
volumes:
- ./otel-collector-config.yaml:/etc/otel-collector-config.yaml
ports:
- "4317:4317" # OTLP gRPC receiver
- "4318:4318" # OTLP HTTP receiver
- "8888:8888" # Prometheus metrics
- "8889:8889" # Prometheus exporter metrics
networks:
- app-network
restart: unless-stopped
jaeger:
image: jaegertracing/all-in-one:latest
container_name: jaeger
environment:
- COLLECTOR_OTLP_ENABLED=true
- LOG_LEVEL=debug
ports:
- "16686:16686" # Jaeger UI
# - "4317:4317" # OTLP gRPC receiver
# - "4318:4318" # OTLP HTTP receiver
- "14250:14250" # Jaeger gRPC
- "14268:14268" # Jaeger HTTP
networks:
- app-network
prometheus:
image: prom/prometheus:latest
container_name: prometheus
command:
- "--config.file=/etc/prometheus/prometheus.yml"
- "--storage.tsdb.path=/prometheus"
- "--web.console.libraries=/usr/share/prometheus/console_libraries"
- "--web.console.templates=/usr/share/prometheus/consoles"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
- prometheus-data:/prometheus
ports:
- "9090:9090"
networks:
- app-network
depends_on:
- otel-collector
restart: unless-stopped
volumes:
postgres_data:
prometheus-data:
networks:
app-network:
driver: bridge
otel-collector-config.yaml
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
http:
endpoint: 0.0.0.0:4318
processors:
batch:
timeout: 10s
send_batch_size: 1024
exporters:
debug:
verbosity: detailed
otlp:
endpoint: jaeger:4317
tls:
insecure: true
prometheus:
endpoint: 0.0.0.0:8889
namespace: flask_app
service:
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [debug, otlp]
metrics:
receivers: [otlp]
processors: [batch]
exporters: [debug, prometheus]
logs:
receivers: [otlp]
processors: [batch]
exporters: [debug]
Pra executar o projeto basta baixar o projeto do Github ou então criar os arquivos acima e executar um docker compose up
Fluxo do projeto

A Ligação Entre os Arquivos
1. otel_config.py - O Configurador
Este arquivo contém a função init_telemetry() que:
Configura o OpenTelemetry SDK
Define para onde enviar os traces (exporters)
Instrumenta automaticamente Flask e SQLAlchemy
Retorna o
traceremeterconfigurados
2. app.py - O Consumidor
Importa e usa a configuração:
python
# IMPORTA a configuração
from otel_config import init_telemetry
# INICIALIZA a telemetria passando app Flask e DB engine
with app.app_context():
tracer, meter = init_telemetry(app, db.engine)
📍 Onde os Traces São Gerados
Traces Automáticos (configurados no otel_config.py):
HTTP Requests - Todas as rotas Flask são automaticamente rastreadas
SQL Queries - Todas as queries do SQLAlchemy são automaticamente rastreadas
Traces Manuais (criados no app.py):
python
# Exemplo na rota /user/<name>
with tracer.start_as_current_span("get_user_operation") as span:
span.set_attribute("user.name", name) # Adiciona metadados
# Span filho para operação específica
with tracer.start_as_current_span("database.check_user"):
user = User.query.filter_by(name=name).first()
🔄 O Fluxo Completo
1. Request chega → Flask (instrumentado automaticamente)
↓
2. Cria span root automático "HTTP GET /user/john"
↓
3. Seu código cria span filho "get_user_operation"
↓
4. Dentro dele, cria span neto "database.check_user"
↓
5. SQLAlchemy (instrumentado) cria span "SELECT FROM users..."
↓
6. Todos os spans são enviados para o coletor configurado
🎯 Para Onde Vão os Traces
O otel_config.py provavelmente configura um exporter que envia para:
Jaeger (porta 4317 ou 14250)
OpenTelemetry Collector (porta 4317/4318)
Zipkin (porta 9411)
Ou outro backend de observabilidade
📊 Estrutura da Telemetria
Trace (representa toda a requisição)
├── Span: HTTP GET /user/john [automático]
│ ├── Span: get_user_operation [manual]
│ │ ├── Span: database.check_user [manual]
│ │ │ └── Span: SELECT * FROM users [automático]
│ │ └── Span: database.create_user [manual]
│ │ └── Span: INSERT INTO users [automático]
🔍 Momentos-Chave da Complementação
Inicialização (linha ~95-96):
Durante Requisições:
Instrumentação automática captura tudo
Spans manuais adicionam contexto de negócio
Métricas Customizadas:
python
message_counter.add(1, {"source": "api"}) # Conta mensagens
user_counter.add(1, {"operation": "create"}) # Conta usuários
O otel_config.py é a infraestrutura (como um sistema de câmeras), enquanto o app.py é o uso dessa infraestrutura (onde você coloca as câmeras e o que filma com elas)
Exemplo de Melhoria no Contexto de Métricas e Traces de Negócio
Você pode enriquecer a observabilidade do seu código configurando pontos estratégicos onde deseja obter mais informações sobre o comportamento da aplicação. Isso inclui:
Adicionar contadores customizados para rastrear eventos específicos do negócio
Implementar validadores para garantir a qualidade dos dados coletados
Otimizar o número de spans, mantendo apenas aqueles que realmente agregam valor ao troubleshooting e à análise de performance
Criar métricas de negócio que reflitam KPIs importantes para a operação
A instrumentação personalizada oferece possibilidades praticamente ilimitadas para adaptar sua estratégia de observabilidade às necessidades específicas do seu contexto.
@app.route('/user/<name>')
def get_user(name):
# Span customizado com informações do SEU negócio
with tracer.start_as_current_span("get_user_operation") as span:
# 🎯 ADICIONAR MAIS ATRIBUTOS AO SPAN
span.set_attribute("user.name", name)
span.set_attribute("user.type", "premium") # ✨ Nova métrica
span.set_attribute("request.source", request.headers.get('User-Agent')) # ✨
span.set_attribute("feature.flag", "new_ui_enabled") # ✨
# 🎯 EVENTOS IMPORTANTES DO NEGÓCIO
span.add_event("validation_started", {
"validator": "name_length",
"min_length": 2
})
# 🎯 SPANS FILHOS PARA OPERAÇÕES ESPECÍFICAS
with tracer.start_as_current_span("cache_check") as cache_span:
cache_span.set_attribute("cache.key", f"user:{name}")
# Simular check de cache
cache_hit = False
cache_span.set_attribute("cache.hit", cache_hit)
if not cache_hit:
span.add_event("cache_miss", {"key": f"user:{name}"})
# 🎯 MÉTRICAS DE PERFORMANCE
import time
db_start = time.time()
with tracer.start_as_current_span("database.check_user") as db_span:
user = User.query.filter_by(name=name).first()
db_latency = (time.time() - db_start) * 1000
db_span.set_attribute("db.latency_ms", db_latency)
db_span.set_attribute("db.rows_examined", 1)
# 🎯 MÉTRICAS CONDICIONAIS
if db_latency > 100:
db_span.set_attribute("performance.slow_query", True)
span.add_event("slow_query_detected", {
"latency_ms": db_latency,
"query_type": "user_lookup"
})
Conclusão
Chegamos ao final desta série de artigos sobre observabilidade e OpenTelemetry. Ao longo de quatro artigos, um introdutório e três práticos, abordamos temas fundamentais para implementar observabilidade efetiva em suas aplicações:
Fundamentos: O que é auto-instrumentação e como ela funciona
Prática com instrumentação automática: utilizando OTLP para auto-instrumentação em ambientes em qualquer tipo de ambiente
Prática com Kubernetes: Como utilizar o OpenTelemetry Operator para auto-instrumentação em ambientes Kubernetes
Instrumentação Manual: Técnicas para instrumentar manualmente suas aplicações quando você precisa de maior controle e customização
Espero que este conteúdo tenha proporcionado uma base sólida para você começar ou aprimorar sua jornada com OpenTelemetry e observabilidade.



