From 1d61ea8d8303ba2aa496273b4e8666c53a2a470d Mon Sep 17 00:00:00 2001 From: Gronod Date: Tue, 19 May 2026 15:24:43 +0100 Subject: [PATCH] feat(webhooks): integrate receiver with cache + SSE (Phase 2) --- server/routes/webhook.js | 168 +++++++++++++++++++++++++++++++++------ 1 file changed, 145 insertions(+), 23 deletions(-) diff --git a/server/routes/webhook.js b/server/routes/webhook.js index 74a00cc..bd8bd64 100644 --- a/server/routes/webhook.js +++ b/server/routes/webhook.js @@ -1,10 +1,32 @@ // Copyright (c) 2026 Gordon Bolton. MIT License. const express = require('express'); const { logToFile } = require('../utils/logger'); -const { getWebhookSecret } = require('../utils/config'); +const { getWebhookSecret, getSonarrInstances, getRadarrInstances } = require('../utils/config'); +const cache = require('../utils/cache'); +const arrRetrieverRegistry = require('../utils/arrRetrievers'); +const { pollAllServices, POLL_INTERVAL, POLLING_ENABLED } = require('../utils/poller'); const router = express.Router(); +// Cache TTL mirrors poller.js logic: 3x poll interval when active, 30s when on-demand +const CACHE_TTL = POLLING_ENABLED ? POLL_INTERVAL * 3 : 30000; + +// Event classification — determines which cache keys to refresh +const QUEUE_EVENTS = new Set([ + 'Grab', + 'Download', + 'DownloadFailed', + 'ManualInteractionRequired' +]); + +const HISTORY_EVENTS = new Set([ + 'DownloadFolderImported', + 'ImportFailed', + 'EpisodeFileRenamed', + 'MovieFileRenamed', + 'EpisodeFileRenamedBySeries' +]); + /** * Validate webhook secret from the X-Sofarr-Webhook-Secret header * @param {Object} req - Express request object @@ -13,76 +35,176 @@ const router = express.Router(); function validateWebhookSecret(req) { const expectedSecret = getWebhookSecret(); const providedSecret = req.get('X-Sofarr-Webhook-Secret'); - + if (!expectedSecret) { logToFile('[Webhook] WARNING: SOFARR_WEBHOOK_SECRET not configured, rejecting webhook'); return false; } - + if (!providedSecret) { logToFile('[Webhook] WARNING: Missing X-Sofarr-Webhook-Secret header'); return false; } - + if (providedSecret !== expectedSecret) { logToFile('[Webhook] WARNING: Invalid webhook secret provided'); return false; } - + return true; } +/** + * Process a webhook event by refreshing the affected cache and broadcasting SSE. + * This is a fire-and-forget background task — callers must respond to the webhook + * sender before awaiting this function. + * + * Phase 2: lightweight refresh via arrRetrieverRegistry + cache update + SSE broadcast. + * + * @param {string} serviceType - 'sonarr' or 'radarr' + * @param {string} eventType - the eventType from the *arr webhook payload + */ +async function processWebhookEvent(serviceType, eventType) { + const affectsQueue = QUEUE_EVENTS.has(eventType); + const affectsHistory = HISTORY_EVENTS.has(eventType); + + if (!affectsQueue && !affectsHistory) { + logToFile(`[Webhook] Event ${eventType} does not affect queue or history, skipping refresh`); + return; + } + + logToFile(`[Webhook] ${serviceType} event "${eventType}" → queue=${affectsQueue}, history=${affectsHistory}`); + + // Ensure retrievers are initialized (idempotent) + await arrRetrieverRegistry.initialize(); + + if (serviceType === 'sonarr') { + const sonarrInstances = getSonarrInstances(); + + if (affectsQueue) { + const queuesByType = await arrRetrieverRegistry.getQueuesByType(); + const sonarrQueues = queuesByType.sonarr || []; + cache.set('poll:sonarr-queue', { + records: sonarrQueues.flatMap(q => { + const inst = sonarrInstances.find(i => i.id === q.instance); + const url = inst ? inst.url : null; + const key = inst ? inst.apiKey : null; + return (q.data.records || []).map(r => { + if (r.series) r.series._instanceUrl = url; + r._instanceUrl = url; + r._instanceKey = key; + return r; + }); + }) + }, CACHE_TTL); + logToFile(`[Webhook] Refreshed poll:sonarr-queue (${sonarrQueues.length} instance(s))`); + } + + if (affectsHistory) { + const historyByType = await arrRetrieverRegistry.getHistoryByType({ pageSize: 10 }); + const sonarrHistories = historyByType.sonarr || []; + cache.set('poll:sonarr-history', { + records: sonarrHistories.flatMap(h => h.data.records || []) + }, CACHE_TTL); + logToFile(`[Webhook] Refreshed poll:sonarr-history (${sonarrHistories.length} instance(s))`); + } + } else if (serviceType === 'radarr') { + const radarrInstances = getRadarrInstances(); + + if (affectsQueue) { + const queuesByType = await arrRetrieverRegistry.getQueuesByType(); + const radarrQueues = queuesByType.radarr || []; + cache.set('poll:radarr-queue', { + records: radarrQueues.flatMap(q => { + const inst = radarrInstances.find(i => i.id === q.instance); + const url = inst ? inst.url : null; + const key = inst ? inst.apiKey : null; + return (q.data.records || []).map(r => { + if (r.movie) r.movie._instanceUrl = url; + r._instanceUrl = url; + r._instanceKey = key; + return r; + }); + }) + }, CACHE_TTL); + logToFile(`[Webhook] Refreshed poll:radarr-queue (${radarrQueues.length} instance(s))`); + } + + if (affectsHistory) { + const historyByType = await arrRetrieverRegistry.getHistoryByType({ pageSize: 10 }); + const radarrHistories = historyByType.radarr || []; + cache.set('poll:radarr-history', { + records: radarrHistories.flatMap(h => h.data.records || []) + }, CACHE_TTL); + logToFile(`[Webhook] Refreshed poll:radarr-history (${radarrHistories.length} instance(s))`); + } + } + + // Broadcast to all SSE subscribers using the same mechanism poller.js uses. + // pollAllServices() refreshes all data, updates every cache key, and then + // iterates pollSubscribers to push fresh payloads to every open SSE connection. + // If a poll is already in progress this call is a no-op, but the cache keys + // above were already updated so the next broadcast (or dashboard request) + // will see fresh data. + logToFile('[Webhook] Triggering SSE broadcast via pollAllServices()'); + await pollAllServices(); +} + /** * POST /api/webhook/sonarr * Receives webhook events from Sonarr instances. - * Validates the secret, logs the event, and returns 200 immediately. - * - * Phase 1: Receiver only - no cache or SSE integration yet. - * Future phases will integrate with PALDRA registry for event-driven updates. + * Validates the secret, logs the event, refreshes cache, broadcasts SSE, and returns 200. + * + * Phase 2: integrated with PALDRA cache + SSE for real-time dashboard updates. */ router.post('/sonarr', (req, res) => { if (!validateWebhookSecret(req)) { return res.status(401).json({ error: 'Unauthorized' }); } - + try { const { eventType, instanceName } = req.body || {}; logToFile(`[Webhook] Sonarr event received - Type: ${eventType || 'unknown'}, Instance: ${instanceName || 'unknown'}`); logToFile(`[Webhook] Sonarr payload: ${JSON.stringify(req.body)}`); - - // Phase 1: Log and respond immediately - // Future phases will push to cache and trigger SSE + + // Phase 2: background cache refresh + SSE broadcast (fire-and-forget) + processWebhookEvent('sonarr', eventType).catch(err => { + logToFile(`[Webhook] Sonarr background refresh error: ${err.message}`); + }); + res.status(200).json({ received: true }); } catch (error) { logToFile(`[Webhook] Sonarr error: ${error.message}`); - res.status(200).json({ received: true }); // Still return 200 to avoid webhook retries + res.status(200).json({ received: true }); } }); /** * POST /api/webhook/radarr * Receives webhook events from Radarr instances. - * Validates the secret, logs the event, and returns 200 immediately. - * - * Phase 1: Receiver only - no cache or SSE integration yet. - * Future phases will integrate with PALDRA registry for event-driven updates. + * Validates the secret, logs the event, refreshes cache, broadcasts SSE, and returns 200. + * + * Phase 2: integrated with PALDRA cache + SSE for real-time dashboard updates. */ router.post('/radarr', (req, res) => { if (!validateWebhookSecret(req)) { return res.status(401).json({ error: 'Unauthorized' }); } - + try { const { eventType, instanceName } = req.body || {}; logToFile(`[Webhook] Radarr event received - Type: ${eventType || 'unknown'}, Instance: ${instanceName || 'unknown'}`); logToFile(`[Webhook] Radarr payload: ${JSON.stringify(req.body)}`); - - // Phase 1: Log and respond immediately - // Future phases will push to cache and trigger SSE + + // Phase 2: background cache refresh + SSE broadcast (fire-and-forget) + processWebhookEvent('radarr', eventType).catch(err => { + logToFile(`[Webhook] Radarr background refresh error: ${err.message}`); + }); + res.status(200).json({ received: true }); } catch (error) { logToFile(`[Webhook] Radarr error: ${error.message}`); - res.status(200).json({ received: true }); // Still return 200 to avoid webhook retries + res.status(200).json({ received: true }); } });