9. Boas Práticas da Integração

Este capítulo detalha recomendações para integração robusta, eficiente e confiável.


9.1 Tamanho Ideal das Cargas

Recomendações por Tipo de Operação

Tipo Operação Itens Recomendados Razão
Individual POST / PUT único 1 Para testes ou situações urgentes
Batch Síncrono diário 100-500 Equilíbrio entre performance e timeout
Batch Síncrono semanal 500-1.000 Máximo permitido, otimal
Máximo Qualquer batch 1.000 Limite absoluto da API

Cálculo de Tempo de Processamento

Taxa de Processamento: ~3-5 itens/segundo (média)

Exemplos:
- 100 itens   → ~20-30 segundos
- 500 itens   → ~2-3 minutos
- 1.000 itens → ~3-5 minutos

Como otimizar:

# ❌ Enviar todos de uma vez (1.000 itens)
enviar_batch(todos_os_produtos)  # ~5 minutos

# ✅ Dividir em chunks (3-5 requisições paralelas)
for chunk in dividir_em_chunks(produtos, 300):
    enviar_batch(chunk)  # ~2 minutos por chunk, paralelo

9.2 Frequência Recomendada de Chamadas

Sincronização de Produtos

Frequência Caso de Uso Método
Inicial Primeira sincronização (histórico completo) Batch de 1.000
Diária Produtos modificados desde última execução Query incremental
Horária Apenas mudanças críticas (preço, estoque) GET latest + delta
Real-time Integrações de vendas/estoque instantâneas WebSocket (futuro)

Exemplo: Sincronização Incremental Eficiente

def sincronizar_incrementalmente():
    """
    Sincroniza apenas produtos modificados desde última execução
    """
    # PASSO 1: Obter data da última sincronização bem-sucedida
    ultima_data = obter_ultima_sincronizacao()
    print(f"Última sincronização: {ultima_data}")

    # PASSO 2: Buscar produtos modificados no seu ERP
    produtos_modificados = seu_erp.obter_produtos_apos(ultima_data)
    print(f"Produtos para sincronizar: {len(produtos_modificados)}")

    # PASSO 3: Enviar apenas os modificados
    if len(produtos_modificados) > 0:
        resposta = enviar_batch(produtos_modificados)
        if resposta.status_code == 202:
            print("✓ Sincronização iniciada")
            atualizar_ultima_sincronizacao()

def obter_ultima_sincronizacao():
    """
    Consulta API para obter data da última sincronização bem-sucedida
    """
    response = requests.get(
        'https://api.retailportal.com/erp-data/product/get-latest-integration',
        headers={'X-API-Key': api_key}
    )

    if response.status_code == 200:
        return response.json()['lastDateTime']
    return None

# Executar
sincronizar_incrementalmente()

Benefícios:

  • ✓ Reduz volume de dados
  • ✓ Processamento mais rápido
  • ✓ Menos requisições
  • ✓ Menos consumo de banda

9.3 Idempotência — Segurança contra Duplicatas

A API suporta idempotência: você pode reenviar a mesma requisição sem riscos de duplicação.

Como Funciona

REQUISIÇÃO 1: POST produtos [A, B, C]
  ↓
RESPOSTA: 202 Accepted, jobId=123

(Sua conexão cai, você não sabe se foi processado)

REQUISIÇÃO 2: POST produtos [A, B, C] (MESMA)
  ↓
API verifica: ErpIds 12345, 12346, 12347 já existem
  ↓
RESPOSTA: Atualiza registros em vez de criar duplicatas

Garantia de Idempotência

  • Campo de Identificação: erpId (deve ser único e imutável)
  • Comportamento: Se erpId já existe, atualiza em vez de criar
  • Segurança: Não há duplicatas mesmo com múltiplos envios

Implementação Segura

def enviar_com_segurança(produtos):
    """
    Envia batch com proteção contra duplicatas
    """
    # Sempre use erpId único e consistente
    for produto in produtos:
        assert 'erpId' in produto, "erpId é obrigatório"
        assert isinstance(produto['erpId'], int), "erpId deve ser inteiro"

    # Enviar
    resposta = enviar_batch(produtos)

    # Se falhar por timeout, é SEGURO reenviar
    if resposta.status_code == 202:
        job_id = resposta.json()['jobId']
        aguardar_batch(job_id)
        # Se timeout e reconectar, pode reenviar mesma requisição
        # A API não criará duplicatas

# Usar com segurança
produtos = [
    {'erpId': 12345, ...},
    {'erpId': 12346, ...},
    {'erpId': 12347, ...},
]

enviar_com_segurança(produtos)

9.4 Estratégias para Evitar Duplicidades

Estratégia 1: Usar erpId Como Chave Primária

# ✅ CORRETO: erpId é único
produtos = [
    {'erpId': 12345, 'description': 'Dipirona', ...},
    {'erpId': 12346, 'description': 'Ibuprofeno', ...},
]

# ❌ ERRADO: erpIds duplicados
produtos = [
    {'erpId': 12345, 'description': 'Dipirona 500mg', ...},
    {'erpId': 12345, 'description': 'Dipirona 600mg', ...},  # DUPLICADO!
]

Estratégia 2: Validar Antes de Enviar

def validar_duplicatas(produtos):
    """
    Verifica se há erpIds duplicados antes de enviar
    """
    erp_ids = [p['erpId'] for p in produtos]
    unicos = len(set(erp_ids))
    totais = len(erp_ids)

    if unicos != totais:
        duplicatas = totais - unicos
        print(f"❌ {duplicatas} erpIds duplicados detectados")
        return False

    print(f"✓ Todos os {totais} erpIds são únicos")
    return True

# Usar
produtos = obter_produtos_do_erp()
if validar_duplicatas(produtos):
    enviar_batch(produtos)
else:
    corrigir_dados()

Estratégia 3: Manutenção de Log de Sincronização

import sqlite3
from datetime import datetime

class LogSincronizacao:
    def __init__(self, db_path='sincronizacao.db'):
        self.conn = sqlite3.connect(db_path)
        self.criar_tabela()

    def criar_tabela(self):
        self.conn.execute('''
            CREATE TABLE IF NOT EXISTS sincronizacoes (
                id INTEGER PRIMARY KEY,
                job_id TEXT UNIQUE,
                tipo_entidade TEXT,
                quantidade_itens INTEGER,
                data_criacao TIMESTAMP,
                data_conclusao TIMESTAMP,
                status TEXT,
                erros_count INTEGER
            )
        ''')
        self.conn.commit()

    def registrar(self, job_id, tipo, quantidade):
        self.conn.execute('''
            INSERT INTO sincronizacoes
            (job_id, tipo_entidade, quantidade_itens, data_criacao, status)
            VALUES (?, ?, ?, ?, ?)
        ''', (job_id, tipo, quantidade, datetime.now(), 'Iniciado'))
        self.conn.commit()

    def marcar_concluido(self, job_id, status, erros=0):
        self.conn.execute('''
            UPDATE sincronizacoes
            SET status = ?, data_conclusao = ?, erros_count = ?
            WHERE job_id = ?
        ''', (status, datetime.now(), erros, job_id))
        self.conn.commit()

# Usar
log = LogSincronizacao()

resposta = enviar_batch(produtos)
if resposta.status_code == 202:
    job_id = resposta.json()['jobId']
    log.registrar(job_id, 'ErpProduct', len(produtos))

    resultado = aguardar_batch(job_id)
    log.marcar_concluido(
        job_id,
        resultado['state'],
        resultado['failureCount']
    )

9.5 Padrões de Conexão e Performance

Padrão 1: Connection Pooling

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def criar_sessao_otimizada():
    """
    Cria sessão HTTP com pool de conexões e retry automático
    """
    sessao = requests.Session()

    # Reutilizar conexões TCP
    adapter = HTTPAdapter(
        pool_connections=10,
        pool_maxsize=10,
        max_retries=Retry(
            total=3,
            backoff_factor=0.5,
            status_forcelist=[500, 502, 503, 504]
        )
    )

    sessao.mount('https://', adapter)
    sessao.mount('http://', adapter)

    return sessao

# Usar
sessao = criar_sessao_otimizada()

response = sessao.post(
    'https://api.retailportal.com/erp-data/product/batch',
    json=dados,
    headers={'X-API-Key': api_key}
)

Padrão 2: Processamento Paralelo

from concurrent.futures import ThreadPoolExecutor, as_completed

def enviar_batches_paralelo(todos_produtos, tamanho_chunk=500):
    """
    Envia múltiplos batches em paralelo
    """
    chunks = [
        todos_produtos[i:i+tamanho_chunk]
        for i in range(0, len(todos_produtos), tamanho_chunk)
    ]

    resultados = []

    with ThreadPoolExecutor(max_workers=3) as executor:
        # Submeter todas as tarefas
        futures = {
            executor.submit(enviar_batch, chunk): i
            for i, chunk in enumerate(chunks)
        }

        # Coletar resultados conforme completam
        for future in as_completed(futures):
            chunk_num = futures[future]
            try:
                resposta = future.result()
                job_id = resposta.json()['jobId']
                print(f"✓ Chunk {chunk_num + 1} enfileirado: {job_id}")
                resultados.append(job_id)
            except Exception as e:
                print(f"✗ Chunk {chunk_num + 1} falhou: {e}")

    # Aguardar conclusão de todos
    print("\nAguardando conclusão...")
    for job_id in resultados:
        resultado = aguardar_batch(job_id)
        print(f"  Job {job_id}: {resultado['state']}")

# Usar (5.000 produtos em 5 batches paralelos)
produtos = obter_todos_produtos()  # 5.000
enviar_batches_paralelo(produtos)  # Muito mais rápido!

9.6 Monitoramento e Logging

Estratégia de Logging da API

A API implementa um sistema de logging condicional para reduzir ruído e evitar duplicação:

Para operações individuais (POST/PUT não-batch):

  • Logs de ERRO — Sempre registrados (validação, exceções, conflitos)
  • Logs de SUCESSO — NÃO registrados (reduz ruído)
  • Objetivo: Facilitar identificação e diagnóstico de problemas

Para operações em lote (POST/PUT batch):

  • Log agregado — UM registro por lote com resumo completo
  • Logs individuais — NÃO executados (evita duplicação)
  • Objetivo: Rastreabilidade sem ruído de 1.000+ logs
  • Contém: Total, sucessos, falhas e detalhes de erros

Template de Logging Completo (Cliente)

import logging
import json
from datetime import datetime

class LoggerIntegracaoERP:
    def __init__(self, arquivo_log='integracao_erp.log'):
        self.logger = logging.getLogger('ERP_Integration')
        handler = logging.FileHandler(arquivo_log)
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)
        self.logger.setLevel(logging.DEBUG)

    def iniciar_batch(self, job_id, tipo, quantidade):
        self.logger.info(f"""
        BATCH_INICIADO
        - JobId: {job_id}
        - Tipo: {tipo}
        - Quantidade: {quantidade}
        - Timestamp: {datetime.now().isoformat()}
        """)

    def batch_concluido(self, job_id, resultado):
        self.logger.info(f"""
        BATCH_CONCLUIDO
        - JobId: {job_id}
        - State: {resultado['state']}
        - Sucesso: {resultado['successCount']}/{resultado['totalItems']}
        - Falhas: {resultado['failureCount']}
        - Duração: {resultado['duration']}
        """)

    def operacao_individual_falhou(self, tipo_erro, detalhes):
        """
        Operações individuais bem-sucedidas NÃO geram log no servidor.
        Apenas ERROS são registrados. Implemente este método no cliente.
        """
        self.logger.error(f"""
        OPERACAO_INDIVIDUAL_ERRO
        - Tipo: {tipo_erro}
        - Detalhes: {json.dumps(detalhes, indent=2)}
        """)

    def erro_detectado(self, tipo_erro, detalhes):
        self.logger.error(f"""
        ERRO_DETECTADO
        - Tipo: {tipo_erro}
        - Detalhes: {json.dumps(detalhes, indent=2)}
        """)

# Usar
logger = LoggerIntegracaoERP()

logger.iniciar_batch(job_id, 'ErpProduct', 500)
# ... processamento ...
logger.batch_concluido(job_id, resultado)

IMPORTANTE: O servidor Portal Retail mantém logs no banco de dados (_Erp_IntegrationHistory) seguindo a estratégia condicional acima. Se você precisa rastrear operações bem-sucedidas (individual ou batch), implemente logging no seu sistema cliente.


9.7 Alertas e Notificações

Implementar Notificação de Falhas

import smtplib
from email.mime.text import MIMEText

def notificar_falha(tipo_erro, detalhes, destinatarios):
    """
    Envia email notificando falha de integração
    """
    assunto = f"🚨 Integração ERP Portal Retail — FALHA"
    corpo = f"""
    Tipo de Erro: {tipo_erro}
    Data/Hora: {datetime.now().isoformat()}

    Detalhes:
    {json.dumps(detalhes, indent=2)}

    Ação Recomendada:
    - Verifique o histórico em: https://api.retailportal.com/erp-data/integration-history
    - Contacte: suporte@retailportal.com
    """

    mensagem = MIMEText(corpo)
    mensagem['Subject'] = assunto

    # Enviar
    with smtplib.SMTP('smtp.seudominio.com') as smtp:
        smtp.send_message(
            mensagem,
            from_addr='integracao@seudominio.com',
            to_addrs=destinatarios
        )

    print(f"✓ Notificação enviada para {destinatarios}")

def consultar_historico_integracao(tipo_entidade, horas=24):
    """
    Consulta histórico de integração para detectar falhas.
    NOTA: Apenas ERROS e resumos de BATCH aparecem no histórico.
    Sucessos individuais NÃO são registrados (por design).
    """
    response = requests.get(
        f'https://api.retailportal.com/erp-data/integration-history/{tipo_entidade}',
        params={'horas': horas},
        headers={'X-API-Key': api_key}
    )
    return response.json()

# Usar
try:
    resultado = aguardar_batch(job_id)
    if resultado['failureCount'] > 0:
        notificar_falha(
            'Sucesso Parcial',
            {
                'jobId': job_id,
                'sucessos': resultado['successCount'],
                'falhas': resultado['failureCount']
            },
            ['operacoes@empresa.com', 'ti@empresa.com']
        )
except Exception as e:
    notificar_falha(
        'Erro Crítico',
        {'erro': str(e)},
        ['ti@empresa.com']
    )

# Verificar histórico por tipo de entidade
historico = consultar_historico_integracao('ErpProduct')
erros_recentes = [h for h in historico if not h['success']]
if erros_recentes:
    print(f"⚠️ {len(erros_recentes)} erros detectados nas últimas 24 horas")

Observação Importante sobre Logging

A API implementa logging condicional. Ao verificar o histórico:

  • Operações individuais com ERRO → Aparecem no histórico
  • Operações individuais com SUCESSO → NÃO aparecem
  • Batch com resumo → UM registro com: total, sucessos, falhas
  • Batch itens individuais → NÃO aparecem (evita 1.000+ logs)

Esta estratégia reduz significativamente o ruído no banco de dados enquanto mantém rastreabilidade completa de falhas e resumos de batches.


9.8 Checklist de Implementação

Antes de colocar em produção:

Autenticação e Segurança

  • [ ] API Key armazenada em variável de ambiente
  • [ ] Certificado SSL validado
  • [ ] HTTPS obrigatório (nunca HTTP)
  • [ ] Chave rotacionada a cada 90 dias
  • [ ] Logs não expõem chave completa

Qualidade de Dados

  • [ ] Validação local de dados antes de enviar
  • [ ] erpId é único (não há duplicatas)
  • [ ] Datas em formato ISO 8601 com Z
  • [ ] Tipos de dados corretos (número, string, boolean)
  • [ ] Campos obrigatórios sempre preenchidos

Performance e Confiabilidade

  • [ ] Batch com máximo 1.000 itens
  • [ ] Connection pooling ativado
  • [ ] Retry automático com backoff exponencial
  • [ ] Timeout configurado (30s para síncrono)
  • [ ] Processamento paralelo para múltiplos batches

Monitoramento

  • [ ] Entender estratégia de logging condicional (erros sim, sucessos não)
  • [ ] Implementar logging do lado cliente para rastrear sucessos individuais
  • [ ] Rastreamento de jobIds para consulta de status
  • [ ] Alertas para falhas críticas (consultando histórico de integração)
  • [ ] Relatório diário de sincronizações (usando endpoint /batch-job-report)
  • [ ] Métricas de sucesso/falha registradas no lado cliente

Tratamento de Erro

  • [ ] Diagnóstico automático de tipo de erro
  • [ ] Reenvio inteligente conforme tipo
  • [ ] Notificação em caso de falha persistente
  • [ ] Documentação de resoluções comuns
  • [ ] Contato com suporte para erros não resolvidos

Documentação

  • [ ] Procedimentos de integração documentados
  • [ ] Runbook para troubleshooting
  • [ ] Contatos de suporte atualizados
  • [ ] Exemplos de requisições/respostas
  • [ ] Política de SLA definida

9.9 SLA Recomendado

Métrica Objetivo
Disponibilidade 99.5% (máx 3,6 horas/mês indisponível)
Latência de Resposta < 5 segundos para GET, < 30 segundos para POST/PUT
Tempo de Processamento (Batch) < 5 minutos para 1.000 itens
Tempo de Resolução (Erro 5xx) < 15 minutos
Limite de Taxa 100 requisições/min por chave de API

9.10 Suporte e Escalação

Quando Contactar Suporte

  • [ ] Erro 500 persistente após 5 tentativas
  • [ ] Taxa de erro > 10% consistentemente
  • [ ] Dúvidas sobre implementação
  • [ ] Solicitação de aumento de limite
  • [ ] Problemas de performance inexplicáveis

Informações para Fornecer ao Suporte

Ao contactar suporte, inclua:

{
  "problema": "Descrição clara do problema",
  "frequencia": "Acontece sempre / Ocasionalmente",
  "jobId": "hangfire-job-1234567890",
  "timestamp": "2024-12-06T10:30:00Z",
  "tipo_operacao": "POST /product/batch",
  "quantidade_itens": 500,
  "codigo_http": 500,
  "mensagem_erro": "Database connection failed",
  "historico": "Funcionava, quebrou após mudança X",
  "logs_anexados": "integracao_erp_2024-12-06.log"
}

Contato de Suporte

  • Email: suporte@retailportal.com
  • Telefone: +55 (XX) XXXX-XXXX
  • Portal: https://support.retailportal.com
  • SLA: Resposta em até 4 horas

Próximas Seções

  • Índice — Voltar ao menu principal
  • Documentação Técnica Completa
  • Exemplos de Código Avançados