9. Boas Práticas da Integração
Este capítulo detalha recomendações para integração robusta, eficiente e confiável.
9.1 Tamanho Ideal das Cargas
Recomendações por Tipo de Operação
| Tipo | Operação | Itens Recomendados | Razão |
|---|---|---|---|
| Individual | POST / PUT único | 1 | Para testes ou situações urgentes |
| Batch | Síncrono diário | 100-500 | Equilíbrio entre performance e timeout |
| Batch | Síncrono semanal | 500-1.000 | Máximo permitido, otimal |
| Máximo | Qualquer batch | 1.000 | Limite absoluto da API |
Cálculo de Tempo de Processamento
Taxa de Processamento: ~3-5 itens/segundo (média)
Exemplos:
- 100 itens → ~20-30 segundos
- 500 itens → ~2-3 minutos
- 1.000 itens → ~3-5 minutos
Como otimizar:
# ❌ Enviar todos de uma vez (1.000 itens)
enviar_batch(todos_os_produtos) # ~5 minutos
# ✅ Dividir em chunks (3-5 requisições paralelas)
for chunk in dividir_em_chunks(produtos, 300):
enviar_batch(chunk) # ~2 minutos por chunk, paralelo
9.2 Frequência Recomendada de Chamadas
Sincronização de Produtos
| Frequência | Caso de Uso | Método |
|---|---|---|
| Inicial | Primeira sincronização (histórico completo) | Batch de 1.000 |
| Diária | Produtos modificados desde última execução | Query incremental |
| Horária | Apenas mudanças críticas (preço, estoque) | GET latest + delta |
| Real-time | Integrações de vendas/estoque instantâneas | WebSocket (futuro) |
Exemplo: Sincronização Incremental Eficiente
def sincronizar_incrementalmente():
"""
Sincroniza apenas produtos modificados desde última execução
"""
# PASSO 1: Obter data da última sincronização bem-sucedida
ultima_data = obter_ultima_sincronizacao()
print(f"Última sincronização: {ultima_data}")
# PASSO 2: Buscar produtos modificados no seu ERP
produtos_modificados = seu_erp.obter_produtos_apos(ultima_data)
print(f"Produtos para sincronizar: {len(produtos_modificados)}")
# PASSO 3: Enviar apenas os modificados
if len(produtos_modificados) > 0:
resposta = enviar_batch(produtos_modificados)
if resposta.status_code == 202:
print("✓ Sincronização iniciada")
atualizar_ultima_sincronizacao()
def obter_ultima_sincronizacao():
"""
Consulta API para obter data da última sincronização bem-sucedida
"""
response = requests.get(
'https://api.retailportal.com/erp-data/product/get-latest-integration',
headers={'X-API-Key': api_key}
)
if response.status_code == 200:
return response.json()['lastDateTime']
return None
# Executar
sincronizar_incrementalmente()
Benefícios:
- ✓ Reduz volume de dados
- ✓ Processamento mais rápido
- ✓ Menos requisições
- ✓ Menos consumo de banda
9.3 Idempotência — Segurança contra Duplicatas
A API suporta idempotência: você pode reenviar a mesma requisição sem riscos de duplicação.
Como Funciona
REQUISIÇÃO 1: POST produtos [A, B, C]
↓
RESPOSTA: 202 Accepted, jobId=123
(Sua conexão cai, você não sabe se foi processado)
REQUISIÇÃO 2: POST produtos [A, B, C] (MESMA)
↓
API verifica: ErpIds 12345, 12346, 12347 já existem
↓
RESPOSTA: Atualiza registros em vez de criar duplicatas
Garantia de Idempotência
- Campo de Identificação:
erpId(deve ser único e imutável) - Comportamento: Se erpId já existe, atualiza em vez de criar
- Segurança: Não há duplicatas mesmo com múltiplos envios
Implementação Segura
def enviar_com_segurança(produtos):
"""
Envia batch com proteção contra duplicatas
"""
# Sempre use erpId único e consistente
for produto in produtos:
assert 'erpId' in produto, "erpId é obrigatório"
assert isinstance(produto['erpId'], int), "erpId deve ser inteiro"
# Enviar
resposta = enviar_batch(produtos)
# Se falhar por timeout, é SEGURO reenviar
if resposta.status_code == 202:
job_id = resposta.json()['jobId']
aguardar_batch(job_id)
# Se timeout e reconectar, pode reenviar mesma requisição
# A API não criará duplicatas
# Usar com segurança
produtos = [
{'erpId': 12345, ...},
{'erpId': 12346, ...},
{'erpId': 12347, ...},
]
enviar_com_segurança(produtos)
9.4 Estratégias para Evitar Duplicidades
Estratégia 1: Usar erpId Como Chave Primária
# ✅ CORRETO: erpId é único
produtos = [
{'erpId': 12345, 'description': 'Dipirona', ...},
{'erpId': 12346, 'description': 'Ibuprofeno', ...},
]
# ❌ ERRADO: erpIds duplicados
produtos = [
{'erpId': 12345, 'description': 'Dipirona 500mg', ...},
{'erpId': 12345, 'description': 'Dipirona 600mg', ...}, # DUPLICADO!
]
Estratégia 2: Validar Antes de Enviar
def validar_duplicatas(produtos):
"""
Verifica se há erpIds duplicados antes de enviar
"""
erp_ids = [p['erpId'] for p in produtos]
unicos = len(set(erp_ids))
totais = len(erp_ids)
if unicos != totais:
duplicatas = totais - unicos
print(f"❌ {duplicatas} erpIds duplicados detectados")
return False
print(f"✓ Todos os {totais} erpIds são únicos")
return True
# Usar
produtos = obter_produtos_do_erp()
if validar_duplicatas(produtos):
enviar_batch(produtos)
else:
corrigir_dados()
Estratégia 3: Manutenção de Log de Sincronização
import sqlite3
from datetime import datetime
class LogSincronizacao:
def __init__(self, db_path='sincronizacao.db'):
self.conn = sqlite3.connect(db_path)
self.criar_tabela()
def criar_tabela(self):
self.conn.execute('''
CREATE TABLE IF NOT EXISTS sincronizacoes (
id INTEGER PRIMARY KEY,
job_id TEXT UNIQUE,
tipo_entidade TEXT,
quantidade_itens INTEGER,
data_criacao TIMESTAMP,
data_conclusao TIMESTAMP,
status TEXT,
erros_count INTEGER
)
''')
self.conn.commit()
def registrar(self, job_id, tipo, quantidade):
self.conn.execute('''
INSERT INTO sincronizacoes
(job_id, tipo_entidade, quantidade_itens, data_criacao, status)
VALUES (?, ?, ?, ?, ?)
''', (job_id, tipo, quantidade, datetime.now(), 'Iniciado'))
self.conn.commit()
def marcar_concluido(self, job_id, status, erros=0):
self.conn.execute('''
UPDATE sincronizacoes
SET status = ?, data_conclusao = ?, erros_count = ?
WHERE job_id = ?
''', (status, datetime.now(), erros, job_id))
self.conn.commit()
# Usar
log = LogSincronizacao()
resposta = enviar_batch(produtos)
if resposta.status_code == 202:
job_id = resposta.json()['jobId']
log.registrar(job_id, 'ErpProduct', len(produtos))
resultado = aguardar_batch(job_id)
log.marcar_concluido(
job_id,
resultado['state'],
resultado['failureCount']
)
9.5 Padrões de Conexão e Performance
Padrão 1: Connection Pooling
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def criar_sessao_otimizada():
"""
Cria sessão HTTP com pool de conexões e retry automático
"""
sessao = requests.Session()
# Reutilizar conexões TCP
adapter = HTTPAdapter(
pool_connections=10,
pool_maxsize=10,
max_retries=Retry(
total=3,
backoff_factor=0.5,
status_forcelist=[500, 502, 503, 504]
)
)
sessao.mount('https://', adapter)
sessao.mount('http://', adapter)
return sessao
# Usar
sessao = criar_sessao_otimizada()
response = sessao.post(
'https://api.retailportal.com/erp-data/product/batch',
json=dados,
headers={'X-API-Key': api_key}
)
Padrão 2: Processamento Paralelo
from concurrent.futures import ThreadPoolExecutor, as_completed
def enviar_batches_paralelo(todos_produtos, tamanho_chunk=500):
"""
Envia múltiplos batches em paralelo
"""
chunks = [
todos_produtos[i:i+tamanho_chunk]
for i in range(0, len(todos_produtos), tamanho_chunk)
]
resultados = []
with ThreadPoolExecutor(max_workers=3) as executor:
# Submeter todas as tarefas
futures = {
executor.submit(enviar_batch, chunk): i
for i, chunk in enumerate(chunks)
}
# Coletar resultados conforme completam
for future in as_completed(futures):
chunk_num = futures[future]
try:
resposta = future.result()
job_id = resposta.json()['jobId']
print(f"✓ Chunk {chunk_num + 1} enfileirado: {job_id}")
resultados.append(job_id)
except Exception as e:
print(f"✗ Chunk {chunk_num + 1} falhou: {e}")
# Aguardar conclusão de todos
print("\nAguardando conclusão...")
for job_id in resultados:
resultado = aguardar_batch(job_id)
print(f" Job {job_id}: {resultado['state']}")
# Usar (5.000 produtos em 5 batches paralelos)
produtos = obter_todos_produtos() # 5.000
enviar_batches_paralelo(produtos) # Muito mais rápido!
9.6 Monitoramento e Logging
Estratégia de Logging da API
A API implementa um sistema de logging condicional para reduzir ruído e evitar duplicação:
Para operações individuais (POST/PUT não-batch):
- ✅ Logs de ERRO — Sempre registrados (validação, exceções, conflitos)
- ❌ Logs de SUCESSO — NÃO registrados (reduz ruído)
- Objetivo: Facilitar identificação e diagnóstico de problemas
Para operações em lote (POST/PUT batch):
- ✅ Log agregado — UM registro por lote com resumo completo
- ❌ Logs individuais — NÃO executados (evita duplicação)
- Objetivo: Rastreabilidade sem ruído de 1.000+ logs
- Contém: Total, sucessos, falhas e detalhes de erros
Template de Logging Completo (Cliente)
import logging
import json
from datetime import datetime
class LoggerIntegracaoERP:
def __init__(self, arquivo_log='integracao_erp.log'):
self.logger = logging.getLogger('ERP_Integration')
handler = logging.FileHandler(arquivo_log)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.logger.setLevel(logging.DEBUG)
def iniciar_batch(self, job_id, tipo, quantidade):
self.logger.info(f"""
BATCH_INICIADO
- JobId: {job_id}
- Tipo: {tipo}
- Quantidade: {quantidade}
- Timestamp: {datetime.now().isoformat()}
""")
def batch_concluido(self, job_id, resultado):
self.logger.info(f"""
BATCH_CONCLUIDO
- JobId: {job_id}
- State: {resultado['state']}
- Sucesso: {resultado['successCount']}/{resultado['totalItems']}
- Falhas: {resultado['failureCount']}
- Duração: {resultado['duration']}
""")
def operacao_individual_falhou(self, tipo_erro, detalhes):
"""
Operações individuais bem-sucedidas NÃO geram log no servidor.
Apenas ERROS são registrados. Implemente este método no cliente.
"""
self.logger.error(f"""
OPERACAO_INDIVIDUAL_ERRO
- Tipo: {tipo_erro}
- Detalhes: {json.dumps(detalhes, indent=2)}
""")
def erro_detectado(self, tipo_erro, detalhes):
self.logger.error(f"""
ERRO_DETECTADO
- Tipo: {tipo_erro}
- Detalhes: {json.dumps(detalhes, indent=2)}
""")
# Usar
logger = LoggerIntegracaoERP()
logger.iniciar_batch(job_id, 'ErpProduct', 500)
# ... processamento ...
logger.batch_concluido(job_id, resultado)
IMPORTANTE: O servidor Portal Retail mantém logs no banco de dados (_Erp_IntegrationHistory) seguindo a estratégia condicional acima. Se você precisa rastrear operações bem-sucedidas (individual ou batch), implemente logging no seu sistema cliente.
9.7 Alertas e Notificações
Implementar Notificação de Falhas
import smtplib
from email.mime.text import MIMEText
def notificar_falha(tipo_erro, detalhes, destinatarios):
"""
Envia email notificando falha de integração
"""
assunto = f"🚨 Integração ERP Portal Retail — FALHA"
corpo = f"""
Tipo de Erro: {tipo_erro}
Data/Hora: {datetime.now().isoformat()}
Detalhes:
{json.dumps(detalhes, indent=2)}
Ação Recomendada:
- Verifique o histórico em: https://api.retailportal.com/erp-data/integration-history
- Contacte: suporte@retailportal.com
"""
mensagem = MIMEText(corpo)
mensagem['Subject'] = assunto
# Enviar
with smtplib.SMTP('smtp.seudominio.com') as smtp:
smtp.send_message(
mensagem,
from_addr='integracao@seudominio.com',
to_addrs=destinatarios
)
print(f"✓ Notificação enviada para {destinatarios}")
def consultar_historico_integracao(tipo_entidade, horas=24):
"""
Consulta histórico de integração para detectar falhas.
NOTA: Apenas ERROS e resumos de BATCH aparecem no histórico.
Sucessos individuais NÃO são registrados (por design).
"""
response = requests.get(
f'https://api.retailportal.com/erp-data/integration-history/{tipo_entidade}',
params={'horas': horas},
headers={'X-API-Key': api_key}
)
return response.json()
# Usar
try:
resultado = aguardar_batch(job_id)
if resultado['failureCount'] > 0:
notificar_falha(
'Sucesso Parcial',
{
'jobId': job_id,
'sucessos': resultado['successCount'],
'falhas': resultado['failureCount']
},
['operacoes@empresa.com', 'ti@empresa.com']
)
except Exception as e:
notificar_falha(
'Erro Crítico',
{'erro': str(e)},
['ti@empresa.com']
)
# Verificar histórico por tipo de entidade
historico = consultar_historico_integracao('ErpProduct')
erros_recentes = [h for h in historico if not h['success']]
if erros_recentes:
print(f"⚠️ {len(erros_recentes)} erros detectados nas últimas 24 horas")
Observação Importante sobre Logging
A API implementa logging condicional. Ao verificar o histórico:
- Operações individuais com ERRO → Aparecem no histórico
- Operações individuais com SUCESSO → NÃO aparecem
- Batch com resumo → UM registro com: total, sucessos, falhas
- Batch itens individuais → NÃO aparecem (evita 1.000+ logs)
Esta estratégia reduz significativamente o ruído no banco de dados enquanto mantém rastreabilidade completa de falhas e resumos de batches.
9.8 Checklist de Implementação
Antes de colocar em produção:
Autenticação e Segurança
- [ ] API Key armazenada em variável de ambiente
- [ ] Certificado SSL validado
- [ ] HTTPS obrigatório (nunca HTTP)
- [ ] Chave rotacionada a cada 90 dias
- [ ] Logs não expõem chave completa
Qualidade de Dados
- [ ] Validação local de dados antes de enviar
- [ ] erpId é único (não há duplicatas)
- [ ] Datas em formato ISO 8601 com Z
- [ ] Tipos de dados corretos (número, string, boolean)
- [ ] Campos obrigatórios sempre preenchidos
Performance e Confiabilidade
- [ ] Batch com máximo 1.000 itens
- [ ] Connection pooling ativado
- [ ] Retry automático com backoff exponencial
- [ ] Timeout configurado (30s para síncrono)
- [ ] Processamento paralelo para múltiplos batches
Monitoramento
- [ ] Entender estratégia de logging condicional (erros sim, sucessos não)
- [ ] Implementar logging do lado cliente para rastrear sucessos individuais
- [ ] Rastreamento de jobIds para consulta de status
- [ ] Alertas para falhas críticas (consultando histórico de integração)
- [ ] Relatório diário de sincronizações (usando endpoint /batch-job-report)
- [ ] Métricas de sucesso/falha registradas no lado cliente
Tratamento de Erro
- [ ] Diagnóstico automático de tipo de erro
- [ ] Reenvio inteligente conforme tipo
- [ ] Notificação em caso de falha persistente
- [ ] Documentação de resoluções comuns
- [ ] Contato com suporte para erros não resolvidos
Documentação
- [ ] Procedimentos de integração documentados
- [ ] Runbook para troubleshooting
- [ ] Contatos de suporte atualizados
- [ ] Exemplos de requisições/respostas
- [ ] Política de SLA definida
9.9 SLA Recomendado
| Métrica | Objetivo |
|---|---|
| Disponibilidade | 99.5% (máx 3,6 horas/mês indisponível) |
| Latência de Resposta | < 5 segundos para GET, < 30 segundos para POST/PUT |
| Tempo de Processamento (Batch) | < 5 minutos para 1.000 itens |
| Tempo de Resolução (Erro 5xx) | < 15 minutos |
| Limite de Taxa | 100 requisições/min por chave de API |
9.10 Suporte e Escalação
Quando Contactar Suporte
- [ ] Erro 500 persistente após 5 tentativas
- [ ] Taxa de erro > 10% consistentemente
- [ ] Dúvidas sobre implementação
- [ ] Solicitação de aumento de limite
- [ ] Problemas de performance inexplicáveis
Informações para Fornecer ao Suporte
Ao contactar suporte, inclua:
{
"problema": "Descrição clara do problema",
"frequencia": "Acontece sempre / Ocasionalmente",
"jobId": "hangfire-job-1234567890",
"timestamp": "2024-12-06T10:30:00Z",
"tipo_operacao": "POST /product/batch",
"quantidade_itens": 500,
"codigo_http": 500,
"mensagem_erro": "Database connection failed",
"historico": "Funcionava, quebrou após mudança X",
"logs_anexados": "integracao_erp_2024-12-06.log"
}
Contato de Suporte
- Email: suporte@retailportal.com
- Telefone: +55 (XX) XXXX-XXXX
- Portal: https://support.retailportal.com
- SLA: Resposta em até 4 horas
Próximas Seções
- Índice — Voltar ao menu principal
- Documentação Técnica Completa
- Exemplos de Código Avançados