Problem

One polling message triggered sequential processing of all entities. A slow provider (5s) blocked all fast providers (0.1s). Pods 2 and 3 sat idle.

Fix: publish one message per entity per task type

```mermaid
flowchart TD
    subgraph before [Before: 1 aggregate message]
        M1[1 message] --> P1[Pod 1]
        P1 --> A1[Provider A 5s]
        A1 --> B1[Provider B 0.1s]
        B1 --> C1[Provider C 0.1s]
        C1 --> D1[Provider D 0.1s]
    end

    subgraph after ["After: 1 message per entity per task type"]
        MA[msg A] --> PA["Pod 1<br/>Provider A 5s"]
        MB[msg B] --> PB["Pod 2<br/>Provider B 0.1s"]
        MC[msg C] --> PC["Pod 3<br/>Provider C 0.1s"]
        MD[msg D] --> PB
    end
```

Let RabbitMQ’s competing consumers do the load balancing.

Scheduler architecture

polling_scheduler maintains in-memory entity lists per module type, sourced from the DB and refreshed every 5 minutes:

```mermaid
flowchart TD
    start["Start()"]
    start --> refresh1["refreshEntities()<br/>DB → group by PollingEnabledModules"]
    start --> initial["triggerInitialPolling()<br/>publish all task types immediately<br/>(no waiting for first tick)"]
    refresh1 --> cache["locationEntities<br/>sessionEntities<br/>tariffEntities<br/>cdrEntities"]
    ticker5["5-min ticker"] --> refresh2["refreshEntities() → update cache"]
    tickerN["interval ticker<br/>(per task type)"] --> publish["publishPollingMessages(entities, taskType)"]
    publish --> setnx["ShouldPublish?<br/>Redis SETNX per entity per task type per window"]
    setnx -->|yes| msg["publish GOCPITaskMsg<br/>{TaskType, EntityParty, LastExecutedAt}"]
    setnx -->|no| skip[skip]
```

Key design decisions

GOCPITaskMsg carries the full EntityParty — the consumer processes exactly what’s in the message and never queries the DB for entities.

Entity list cached in scheduler — avoids a DB query on every tick. Refreshed every 5 minutes from is_polling_enabled=true rows.
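One way to hold the cached lists, assuming ticker goroutines read them while the 5-minute refresh replaces them, is a read-write mutex around a map from module type to entity codes. `entityCache`, `replace`, and `entities` are hypothetical names; the real scheduler may keep one field per module type (`locationEntities`, `sessionEntities`, and so on) instead of a map.

```go
package main

import "sync"

// entityCache holds the scheduler's per-module entity lists between refreshes.
type entityCache struct {
	mu       sync.RWMutex
	byModule map[string][]string // module type -> entity codes
}

// replace swaps in a freshly loaded snapshot (called by the periodic refresh).
func (c *entityCache) replace(fresh map[string][]string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.byModule = fresh
}

// entities returns a copy so callers never share a backing array with the cache.
func (c *entityCache) entities(module string) []string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return append([]string(nil), c.byModule[module]...)
}
```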

One entity can appear in multiple lists — an entity with PollingEnabledModules = "LOCATIONS,TARIFFS" is polled independently for each module type.
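Grouping a row into every module it enables is a small loop over the comma-separated value. A sketch, assuming `PollingEnabledModules` is a comma-separated string as in the example above; the `Entity` type and `groupByModule` are hypothetical.

```go
package main

import "strings"

// Entity is a hypothetical row from the entities table.
type Entity struct {
	Code                  string
	PollingEnabledModules string // e.g. "LOCATIONS,TARIFFS"
}

// groupByModule adds each entity to every module list it enables, so an entity
// with "LOCATIONS,TARIFFS" lands in both lists and is polled once per module.
func groupByModule(rows []Entity) map[string][]Entity {
	groups := make(map[string][]Entity)
	for _, e := range rows {
		for _, m := range strings.Split(e.PollingEnabledModules, ",") {
			m = strings.TrimSpace(m)
			if m == "" {
				continue // an empty value enables nothing
			}
			groups[m] = append(groups[m], e)
		}
	}
	return groups
}
```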

Redis key scoped per entity per task type (gocpi:publish:{entityCode}:{taskType}:{alignedTimestamp}): each entity × task type combination gets its own idempotency window.

Sessions polling gated on ENABLE_SESSIONS_POLLING — env flag controls whether session entities are included in the polling cycle.

Why not separate queues per entity per task type?

  • Adding a new entity would require a new queue — an infrastructure change instead of a DB config change
  • Competing consumers on a single queue achieve the same parallelism naturally

See also