feat: replace client polling with Server-Sent Events (SSE)
Server: - poller.js: add pollSubscribers Set with onPollComplete/offPollComplete; notify all SSE callbacks immediately after every successful poll - dashboard.js: add GET /api/dashboard/stream endpoint (text/event-stream) - requireAuth enforced via cookie (no CSRF needed — GET is a safe method) - X-Accel-Buffering: no for nginx proxy compatibility - 25s heartbeat comments to survive proxy idle timeouts - initial payload sent immediately on connect - cleanup on req.close: deregister callback, stop heartbeat, remove client - active client tracking updated: type='sse', connectedAt, no refreshRateMs Frontend: - app.js: replace setInterval/fetchUserDownloads with EventSource - startSSE() opens /api/dashboard/stream; stopSSE() closes it - first incoming message hides loading spinner - showAll toggle re-opens stream with ?showAll=true param - logout calls stopSSE() before POST /api/auth/logout - status panel: fixed 5s refresh, shows SSE clients + connect duration - statusRefreshHandle now always 5s, not tied to old refresh-rate selector - index.html: remove now-unused refresh-rate <select> element Docs: - ARCHITECTURE.md §4.3: update poller description - ARCHITECTURE.md §5: rename to SSE Stream (§5.2) + Download Matching (§5.3) - ARCHITECTURE.md §7: update active client tracking description - ARCHITECTURE.md §9: add /stream endpoint, update /status clients schema - ARCHITECTURE.md §10: update key functions table; replace Auto-Refresh section with Live Push via SSE - class-server.puml: add /stream to dashboard routes; update ClientInfo - component.puml: annotate dashboard with SSE note; update label
This commit is contained in:
@@ -221,7 +221,7 @@ sofarr/
|
|||||||
|
|
||||||
**`cache.js`** — Singleton `MemoryCache` backed by a `Map`. Each entry has an expiration timestamp. Provides `get`, `set`, `invalidate`, `clear`, and `getStats` (returns per-key size, item count, TTL remaining).
|
**`cache.js`** — Singleton `MemoryCache` backed by a `Map`. Each entry has an expiration timestamp. Provides `get`, `set`, `invalidate`, `clear`, and `getStats` (returns per-key size, item count, TTL remaining).
|
||||||
|
|
||||||
**`poller.js`** — Background polling engine. Fetches data from all configured service instances in parallel with per-task timing. Stores results in the cache with a configurable TTL. Can be disabled entirely (`POLL_INTERVAL=0`), in which case data is fetched on-demand per dashboard request.
|
**`poller.js`** — Background polling engine. Fetches data from all configured service instances in parallel with per-task timing. Stores results in the cache with a configurable TTL. Can be disabled entirely (`POLL_INTERVAL=0`), in which case data is fetched on-demand. After each successful poll it notifies all registered SSE subscriber callbacks so connected clients receive data immediately.
|
||||||
|
|
||||||
**`qbittorrent.js`** — `QBittorrentClient` class with cookie-based authentication, automatic re-auth on 403, and persistent client instances. Includes torrent-to-download mapping (`mapTorrentToDownload`) and formatting utilities (`formatBytes`, `formatSpeed`, `formatEta`).
|
**`qbittorrent.js`** — `QBittorrentClient` class with cookie-based authentication, automatic re-auth on 403, and persistent client instances. Includes torrent-to-download mapping (`mapTorrentToDownload`) and formatting utilities (`formatBytes`, `formatSpeed`, `formatEta`).
|
||||||
|
|
||||||
@@ -253,18 +253,31 @@ Every `POLL_INTERVAL` ms (default 5000), the poller fetches from all services in
|
|||||||
|
|
||||||
Results are stored in the cache under `poll:*` keys with a TTL of `POLL_INTERVAL × 3`.
|
Results are stored in the cache under `poll:*` keys with a TTL of `POLL_INTERVAL × 3`.
|
||||||
|
|
||||||
### 5.2 Dashboard Request
|
### 5.2 SSE Stream
|
||||||
|
|
||||||
When a user requests `/api/dashboard/user-downloads`:
|
When a browser opens `GET /api/dashboard/stream` (after authentication):
|
||||||
|
|
||||||
1. Read all `poll:*` keys from cache
|
1. Server sets `Content-Type: text/event-stream`, disables buffering (`X-Accel-Buffering: no`)
|
||||||
2. Build `seriesMap` and `moviesMap` from embedded objects in queue records
|
2. Immediately builds and sends the first payload (same matching logic as below)
|
||||||
3. Build `sonarrTagMap` and `radarrTagMap` from tag data
|
3. Registers a callback with the poller's `onPollComplete` subscriber set
|
||||||
4. For each SABnzbd queue slot → try to match against Sonarr/Radarr queue records by title
|
4. After every subsequent poll cycle completes the callback fires, rebuilds the payload, and writes a `data:` SSE frame
|
||||||
5. For each SABnzbd history slot → try to match against Sonarr/Radarr history records
|
5. A 25-second heartbeat comment (`: heartbeat`) keeps the connection alive through proxies
|
||||||
6. For each qBittorrent torrent → try to match against Sonarr/Radarr queue, then history
|
6. On client disconnect: deregisters callback, stops heartbeat, removes from active-client map
|
||||||
7. For each match, resolve the series/movie, extract the user tag, check if it belongs to the requesting user
|
|
||||||
8. Return only the user's downloads (or all, if admin with `showAll=true`)
|
The browser's native `EventSource` API handles reconnection automatically on network interruption.
|
||||||
|
|
||||||
|
### 5.3 Download Matching
|
||||||
|
|
||||||
|
For each connected user the server:
|
||||||
|
|
||||||
|
1. Reads all `poll:*` keys from cache
|
||||||
|
2. Builds `seriesMap` and `moviesMap` from embedded objects in queue records
|
||||||
|
3. Builds `sonarrTagMap` and `radarrTagMap` from tag data
|
||||||
|
4. For each SABnzbd queue slot → tries to match against Sonarr/Radarr queue records by title
|
||||||
|
5. For each SABnzbd history slot → tries to match against Sonarr/Radarr history records
|
||||||
|
6. For each qBittorrent torrent → tries to match against Sonarr/Radarr queue, then history
|
||||||
|
7. For each match, resolves the series/movie, extracts the user tag, checks if it belongs to the requesting user
|
||||||
|
8. Returns only the user's downloads (or all, if admin with `showAll=true`)
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
@@ -336,7 +349,7 @@ Users are matched to downloads via tags in Sonarr/Radarr:
|
|||||||
|
|
||||||
### Active Client Tracking
|
### Active Client Tracking
|
||||||
|
|
||||||
Each dashboard request reports the client's refresh rate. The server tracks active clients in a `Map<username, { user, refreshRateMs, lastSeen }>`, pruning entries older than 30 seconds. This data powers the admin status panel's "effective refresh mode" display.
|
SSE connections are tracked precisely: registered on connect, removed on disconnect. The admin status panel shows each connected user and how long they have been connected. The `type: 'sse'` field distinguishes SSE clients from any legacy HTTP clients.
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
@@ -476,15 +489,38 @@ Clear session and revoke the Emby token server-side. Does **not** require a CSRF
|
|||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
|
### `GET /api/dashboard/stream`
|
||||||
|
|
||||||
|
Server-Sent Events stream. Opens a persistent connection that pushes download data on every poll cycle.
|
||||||
|
|
||||||
|
**Query Parameters:**
|
||||||
|
| Param | Type | Description |
|
||||||
|
|-------|------|-------------|
|
||||||
|
| `showAll` | `"true"` | (Admin) Include all users' downloads |
|
||||||
|
|
||||||
|
**Response:** `Content-Type: text/event-stream`
|
||||||
|
|
||||||
|
Each event is a `data:` frame containing JSON:
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"user": "Alice",
|
||||||
|
"isAdmin": false,
|
||||||
|
"downloads": [ /* download objects — same shape as /user-downloads */ ]
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
The connection is kept alive with `: heartbeat` comments every 25 seconds. The browser's `EventSource` reconnects automatically on failure.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
### `GET /api/dashboard/user-downloads`
|
### `GET /api/dashboard/user-downloads`
|
||||||
|
|
||||||
Fetch downloads for the authenticated user.
|
Fetch downloads for the authenticated user (single HTTP request, no streaming).
|
||||||
|
|
||||||
**Query Parameters:**
|
**Query Parameters:**
|
||||||
| Param | Type | Description |
|
| Param | Type | Description |
|
||||||
|-------|------|-------------|
|
|-------|------|-------------|
|
||||||
| `showAll` | `"true"` | (Admin) Show all users' downloads |
|
| `showAll` | `"true"` | (Admin) Show all users' downloads |
|
||||||
| `refreshRate` | number (ms) | Client's current refresh rate for tracking |
|
|
||||||
|
|
||||||
**Response (200):**
|
**Response (200):**
|
||||||
```json
|
```json
|
||||||
@@ -531,7 +567,7 @@ Admin-only server status.
|
|||||||
]
|
]
|
||||||
},
|
},
|
||||||
"clients": [
|
"clients": [
|
||||||
{ "user": "Alice", "refreshRateMs": 5000, "lastSeen": 1715817600000 }
|
{ "user": "Alice", "type": "sse", "connectedAt": 1715817600000, "lastSeen": 1715817600000 }
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
@@ -577,13 +613,13 @@ The frontend is a **vanilla JavaScript SPA** with no build step. All logic resid
|
|||||||
|----------|---------|
|
|----------|---------|
|
||||||
| `checkAuthentication()` | On load: check session → show dashboard or login |
|
| `checkAuthentication()` | On load: check session → show dashboard or login |
|
||||||
| `handleLogin()` | Authenticate, fade login → splash → dashboard |
|
| `handleLogin()` | Authenticate, fade login → splash → dashboard |
|
||||||
| `fetchUserDownloads()` | GET `/user-downloads`, update state, re-render |
|
| `startSSE()` | Open `EventSource` to `/stream`; handles incoming data + first-message loading hide |
|
||||||
|
| `stopSSE()` | Close `EventSource` and cancel reconnect timer |
|
||||||
| `renderDownloads()` | Diff-based card rendering (create/update/remove) |
|
| `renderDownloads()` | Diff-based card rendering (create/update/remove) |
|
||||||
| `createDownloadCard()` | Build DOM for a single download card; renders tag badges |
|
| `createDownloadCard()` | Build DOM for a single download card; renders tag badges |
|
||||||
| `updateDownloadCard()` | Update existing card in-place (progress, speed, etc.) |
|
| `updateDownloadCard()` | Update existing card in-place (progress, speed, etc.) |
|
||||||
| `toggleStatusPanel()` | Show/hide admin status panel |
|
| `toggleStatusPanel()` | Show/hide admin status panel |
|
||||||
| `renderStatusPanel()` | Build status HTML (server, polling, cache, clients) |
|
| `renderStatusPanel()` | Build status HTML (server, polling, SSE clients, cache) |
|
||||||
| `startAutoRefresh()` | Start periodic `fetchUserDownloads` |
|
|
||||||
| `initThemeSwitcher()` | Light / Dark / Mono theme support |
|
| `initThemeSwitcher()` | Light / Dark / Mono theme support |
|
||||||
|
|
||||||
### Themes
|
### Themes
|
||||||
@@ -604,9 +640,11 @@ Download cards render tag badges in the card header:
|
|||||||
- Tags with **no matching Emby user** → amber badge showing the raw tag label (leftmost)
|
- Tags with **no matching Emby user** → amber badge showing the raw tag label (leftmost)
|
||||||
- Tags **matched to a known Emby user** → accent badge showing the Emby display name (rightmost)
|
- Tags **matched to a known Emby user** → accent badge showing the Emby display name (rightmost)
|
||||||
|
|
||||||
### Auto-Refresh
|
### Live Push via SSE
|
||||||
|
|
||||||
The dashboard polls the server at the user-selected interval (1s, 5s, 10s, or Off). The status panel auto-refreshes in sync. Client refresh rate is sent to the server on each request for active-client tracking.
|
The dashboard receives updates via a persistent `EventSource` connection to `GET /api/dashboard/stream`. The server pushes a new `data:` event immediately after every poll cycle completes — there is no client-side timer. The browser's `EventSource` implementation handles reconnection automatically on network interruption.
|
||||||
|
|
||||||
|
The status panel refreshes on a fixed 5-second timer and shows each SSE client with its connect duration.
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
|
|||||||
@@ -42,9 +42,11 @@ package "server/routes" {
|
|||||||
- activeClients : Map<string, ClientInfo>
|
- activeClients : Map<string, ClientInfo>
|
||||||
- CLIENT_STALE_MS : 30000
|
- CLIENT_STALE_MS : 30000
|
||||||
--
|
--
|
||||||
|
+ GET /stream (SSE, text/event-stream)
|
||||||
+ GET /user-downloads
|
+ GET /user-downloads
|
||||||
+ GET /user-summary
|
+ GET /user-summary
|
||||||
+ GET /status
|
+ GET /status
|
||||||
|
+ GET /cover-art
|
||||||
--
|
--
|
||||||
- getCoverArt(item) : string|null
|
- getCoverArt(item) : string|null
|
||||||
- extractAllTags(tags, tagMap) : string[]
|
- extractAllTags(tags, tagMap) : string[]
|
||||||
@@ -224,7 +226,8 @@ package "server/utils" {
|
|||||||
|
|
||||||
class "ClientInfo" as ci <<value>> {
|
class "ClientInfo" as ci <<value>> {
|
||||||
+ user : string
|
+ user : string
|
||||||
+ refreshRateMs : number
|
+ type : 'sse'
|
||||||
|
+ connectedAt : number (timestamp)
|
||||||
+ lastSeen : number (timestamp)
|
+ lastSeen : number (timestamp)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -30,7 +30,7 @@ package "Express Server" as server {
|
|||||||
|
|
||||||
package "Routes" as routes {
|
package "Routes" as routes {
|
||||||
[auth.js\n/api/auth\n(pre-CSRF)] as auth
|
[auth.js\n/api/auth\n(pre-CSRF)] as auth
|
||||||
[dashboard.js\n/api/dashboard] as dashboard
|
[dashboard.js\n/api/dashboard\n(+SSE /stream)] as dashboard
|
||||||
[emby.js\n/api/emby] as emby_route
|
[emby.js\n/api/emby] as emby_route
|
||||||
[sabnzbd.js\n/api/sabnzbd] as sab_route
|
[sabnzbd.js\n/api/sabnzbd] as sab_route
|
||||||
[sonarr.js\n/api/sonarr] as sonarr_route
|
[sonarr.js\n/api/sonarr] as sonarr_route
|
||||||
@@ -86,6 +86,9 @@ package "Express Server" as server {
|
|||||||
|
|
||||||
auth ..> sanitize
|
auth ..> sanitize
|
||||||
dashboard ..> sanitize
|
dashboard ..> sanitize
|
||||||
|
|
||||||
|
note "Browser uses EventSource\n(SSE) to /stream.\nServer pushes on each\npoll cycle — no client timer." as sseNote
|
||||||
|
sseNote .. dashboard
|
||||||
}
|
}
|
||||||
|
|
||||||
cloud "External Services" as external {
|
cloud "External Services" as external {
|
||||||
|
|||||||
150
public/app.js
150
public/app.js
@@ -1,12 +1,15 @@
|
|||||||
let currentUser = null;
|
let currentUser = null;
|
||||||
let downloads = [];
|
let downloads = [];
|
||||||
let refreshInterval = null;
|
|
||||||
let currentRefreshRate = 5000; // default 5 seconds
|
|
||||||
let isAdmin = false;
|
let isAdmin = false;
|
||||||
let showAll = false;
|
let showAll = false;
|
||||||
let csrfToken = null; // double-submit CSRF token, sent as X-CSRF-Token on mutating requests
|
let csrfToken = null; // double-submit CSRF token, sent as X-CSRF-Token on mutating requests
|
||||||
const SPLASH_MIN_MS = 1200; // minimum splash display time
|
const SPLASH_MIN_MS = 1200; // minimum splash display time
|
||||||
|
|
||||||
|
// SSE stream state
|
||||||
|
let sseSource = null;
|
||||||
|
let sseReconnectTimer = null;
|
||||||
|
const SSE_RECONNECT_MS = 3000; // browser already retries, but we add explicit backoff too
|
||||||
|
|
||||||
// Apply saved theme immediately (before DOMContentLoaded to avoid flash)
|
// Apply saved theme immediately (before DOMContentLoaded to avoid flash)
|
||||||
(function() {
|
(function() {
|
||||||
const saved = localStorage.getItem('sofarr-theme') || 'light';
|
const saved = localStorage.getItem('sofarr-theme') || 'light';
|
||||||
@@ -20,7 +23,6 @@ document.addEventListener('DOMContentLoaded', () => {
|
|||||||
|
|
||||||
document.getElementById('login-form').addEventListener('submit', handleLogin);
|
document.getElementById('login-form').addEventListener('submit', handleLogin);
|
||||||
document.getElementById('logout-btn').addEventListener('click', handleLogout);
|
document.getElementById('logout-btn').addEventListener('click', handleLogout);
|
||||||
document.getElementById('refresh-rate').addEventListener('change', handleRefreshRateChange);
|
|
||||||
document.getElementById('show-all-toggle').addEventListener('change', handleShowAllToggle);
|
document.getElementById('show-all-toggle').addEventListener('change', handleShowAllToggle);
|
||||||
document.getElementById('status-btn').addEventListener('click', toggleStatusPanel);
|
document.getElementById('status-btn').addEventListener('click', toggleStatusPanel);
|
||||||
});
|
});
|
||||||
@@ -41,37 +43,51 @@ function setTheme(theme) {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
function startAutoRefresh() {
|
// --- SSE connection management ---
|
||||||
if (refreshInterval) clearInterval(refreshInterval);
|
|
||||||
if (currentRefreshRate > 0) {
|
function startSSE() {
|
||||||
refreshInterval = setInterval(() => fetchUserDownloads(false), currentRefreshRate);
|
stopSSE();
|
||||||
}
|
const params = showAll ? '?showAll=true' : '';
|
||||||
|
const source = new EventSource('/api/dashboard/stream' + params);
|
||||||
|
sseSource = source;
|
||||||
|
|
||||||
|
let firstMessage = true;
|
||||||
|
source.onmessage = (event) => {
|
||||||
|
try {
|
||||||
|
const data = JSON.parse(event.data);
|
||||||
|
currentUser = data.user;
|
||||||
|
isAdmin = !!data.isAdmin;
|
||||||
|
downloads = data.downloads;
|
||||||
|
document.getElementById('currentUser').textContent = currentUser || '-';
|
||||||
|
renderDownloads();
|
||||||
|
hideError();
|
||||||
|
if (firstMessage) { firstMessage = false; hideLoading(); }
|
||||||
|
} catch (err) {
|
||||||
|
console.error('[SSE] Failed to parse message:', err);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
source.onerror = () => {
|
||||||
|
// EventSource retries automatically; we just log and show a reconnecting indicator
|
||||||
|
console.warn('[SSE] Connection lost, browser will retry...');
|
||||||
|
};
|
||||||
|
|
||||||
|
console.log('[SSE] Stream connected');
|
||||||
}
|
}
|
||||||
|
|
||||||
function handleRefreshRateChange(e) {
|
function stopSSE() {
|
||||||
const rate = parseInt(e.target.value);
|
if (sseReconnectTimer) { clearTimeout(sseReconnectTimer); sseReconnectTimer = null; }
|
||||||
currentRefreshRate = rate;
|
if (sseSource) {
|
||||||
startAutoRefresh();
|
sseSource.close();
|
||||||
// Restart status panel refresh if it's open
|
sseSource = null;
|
||||||
const statusPanel = document.getElementById('status-panel');
|
console.log('[SSE] Stream closed');
|
||||||
if (statusPanel && statusPanel.style.display !== 'none') {
|
|
||||||
if (statusRefreshHandle) { clearInterval(statusRefreshHandle); statusRefreshHandle = null; }
|
|
||||||
if (currentRefreshRate > 0) {
|
|
||||||
statusRefreshHandle = setInterval(refreshStatusPanel, currentRefreshRate);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function handleShowAllToggle(e) {
|
function handleShowAllToggle(e) {
|
||||||
showAll = e.target.checked;
|
showAll = e.target.checked;
|
||||||
fetchUserDownloads(true);
|
// Re-open stream with updated showAll param
|
||||||
}
|
startSSE();
|
||||||
|
|
||||||
function stopAutoRefresh() {
|
|
||||||
if (refreshInterval) {
|
|
||||||
clearInterval(refreshInterval);
|
|
||||||
refreshInterval = null;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
function fadeOutLogin() {
|
function fadeOutLogin() {
|
||||||
@@ -132,8 +148,8 @@ async function checkAuthentication() {
|
|||||||
currentUser = data.user;
|
currentUser = data.user;
|
||||||
isAdmin = !!data.user.isAdmin;
|
isAdmin = !!data.user.isAdmin;
|
||||||
showDashboard();
|
showDashboard();
|
||||||
await fetchUserDownloads(true);
|
showLoading();
|
||||||
startAutoRefresh();
|
startSSE();
|
||||||
await dismissSplash(splashStart);
|
await dismissSplash(splashStart);
|
||||||
} else {
|
} else {
|
||||||
await dismissSplash(splashStart);
|
await dismissSplash(splashStart);
|
||||||
@@ -169,7 +185,7 @@ async function handleLogin(e) {
|
|||||||
isAdmin = !!data.user.isAdmin;
|
isAdmin = !!data.user.isAdmin;
|
||||||
// Store CSRF token returned by login for use in subsequent requests
|
// Store CSRF token returned by login for use in subsequent requests
|
||||||
if (data.csrfToken) csrfToken = data.csrfToken;
|
if (data.csrfToken) csrfToken = data.csrfToken;
|
||||||
// Fade out login, then show splash while loading data.
|
// Fade out login, then show splash while opening SSE stream.
|
||||||
// requestAnimationFrame ensures the browser paints the splash at
|
// requestAnimationFrame ensures the browser paints the splash at
|
||||||
// opacity:1 before dismissSplash adds fade-out, so the CSS
|
// opacity:1 before dismissSplash adds fade-out, so the CSS
|
||||||
// transition fires and transitionend is guaranteed.
|
// transition fires and transitionend is guaranteed.
|
||||||
@@ -177,9 +193,9 @@ async function handleLogin(e) {
|
|||||||
showSplash();
|
showSplash();
|
||||||
await new Promise(r => requestAnimationFrame(() => requestAnimationFrame(r)));
|
await new Promise(r => requestAnimationFrame(() => requestAnimationFrame(r)));
|
||||||
showDashboard();
|
showDashboard();
|
||||||
|
showLoading();
|
||||||
const splashStart = Date.now();
|
const splashStart = Date.now();
|
||||||
await fetchUserDownloads(true);
|
startSSE();
|
||||||
startAutoRefresh();
|
|
||||||
await dismissSplash(splashStart);
|
await dismissSplash(splashStart);
|
||||||
} else {
|
} else {
|
||||||
showLoginError(data.error || 'Login failed');
|
showLoginError(data.error || 'Login failed');
|
||||||
@@ -192,7 +208,7 @@ async function handleLogin(e) {
|
|||||||
|
|
||||||
async function handleLogout() {
|
async function handleLogout() {
|
||||||
try {
|
try {
|
||||||
stopAutoRefresh();
|
stopSSE();
|
||||||
await fetch('/api/auth/logout', {
|
await fetch('/api/auth/logout', {
|
||||||
method: 'POST',
|
method: 'POST',
|
||||||
headers: csrfToken ? { 'X-CSRF-Token': csrfToken } : {}
|
headers: csrfToken ? { 'X-CSRF-Token': csrfToken } : {}
|
||||||
@@ -230,40 +246,8 @@ function hideLoginError() {
|
|||||||
errorDiv.style.display = 'none';
|
errorDiv.style.display = 'none';
|
||||||
}
|
}
|
||||||
|
|
||||||
async function fetchUserDownloads(isInitialLoad = false) {
|
// fetchUserDownloads is kept for the showAll toggle re-connection case
|
||||||
if (isInitialLoad) {
|
// but the primary data path is now via SSE (startSSE / EventSource).
|
||||||
showLoading();
|
|
||||||
}
|
|
||||||
hideError();
|
|
||||||
|
|
||||||
try {
|
|
||||||
const params = new URLSearchParams();
|
|
||||||
if (showAll) params.set('showAll', 'true');
|
|
||||||
params.set('refreshRate', currentRefreshRate);
|
|
||||||
const url = '/api/dashboard/user-downloads?' + params.toString();
|
|
||||||
const response = await fetch(url);
|
|
||||||
const data = await response.json();
|
|
||||||
|
|
||||||
currentUser = data.user;
|
|
||||||
isAdmin = !!data.isAdmin;
|
|
||||||
downloads = data.downloads;
|
|
||||||
|
|
||||||
// Debug: log first download to see what fields are present
|
|
||||||
if (downloads.length > 0) {
|
|
||||||
console.log('[Dashboard] Download data:', JSON.stringify(downloads[0]));
|
|
||||||
}
|
|
||||||
|
|
||||||
document.getElementById('currentUser').textContent = currentUser || '-';
|
|
||||||
renderDownloads();
|
|
||||||
} catch (err) {
|
|
||||||
showError('Failed to fetch downloads. Make sure all services are configured.');
|
|
||||||
console.error(err);
|
|
||||||
} finally {
|
|
||||||
if (isInitialLoad) {
|
|
||||||
hideLoading();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function renderDownloads() {
|
function renderDownloads() {
|
||||||
const downloadsList = document.getElementById('downloads-list');
|
const downloadsList = document.getElementById('downloads-list');
|
||||||
@@ -628,6 +612,7 @@ function escapeHtml(str) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let statusRefreshHandle = null;
|
let statusRefreshHandle = null;
|
||||||
|
const STATUS_REFRESH_MS = 5000;
|
||||||
|
|
||||||
async function toggleStatusPanel() {
|
async function toggleStatusPanel() {
|
||||||
const panel = document.getElementById('status-panel');
|
const panel = document.getElementById('status-panel');
|
||||||
@@ -638,11 +623,8 @@ async function toggleStatusPanel() {
|
|||||||
}
|
}
|
||||||
panel.style.display = 'block';
|
panel.style.display = 'block';
|
||||||
await refreshStatusPanel();
|
await refreshStatusPanel();
|
||||||
// Auto-refresh in sync with dashboard refresh rate
|
|
||||||
if (statusRefreshHandle) clearInterval(statusRefreshHandle);
|
if (statusRefreshHandle) clearInterval(statusRefreshHandle);
|
||||||
if (currentRefreshRate > 0) {
|
statusRefreshHandle = setInterval(refreshStatusPanel, STATUS_REFRESH_MS);
|
||||||
statusRefreshHandle = setInterval(refreshStatusPanel, currentRefreshRate);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
function closeStatusPanel() {
|
function closeStatusPanel() {
|
||||||
@@ -693,11 +675,7 @@ function renderStatusPanel(data, panel) {
|
|||||||
|
|
||||||
const pollIntervalMs = data.polling.intervalMs;
|
const pollIntervalMs = data.polling.intervalMs;
|
||||||
const clients = data.clients || [];
|
const clients = data.clients || [];
|
||||||
const activeRefreshers = clients.filter(c => c.refreshRateMs > 0);
|
const sseClients = clients.filter(c => c.type === 'sse');
|
||||||
const fastestClient = activeRefreshers.length > 0
|
|
||||||
? activeRefreshers.reduce((min, c) => c.refreshRateMs < min.refreshRateMs ? c : min)
|
|
||||||
: null;
|
|
||||||
const hasForegroundClient = fastestClient && data.polling.enabled && fastestClient.refreshRateMs < pollIntervalMs;
|
|
||||||
|
|
||||||
if (data.polling.enabled) {
|
if (data.polling.enabled) {
|
||||||
html += `<div class="status-row"><span>Background poll</span><span>${pollIntervalMs / 1000}s</span></div>`;
|
html += `<div class="status-row"><span>Background poll</span><span>${pollIntervalMs / 1000}s</span></div>`;
|
||||||
@@ -705,19 +683,15 @@ function renderStatusPanel(data, panel) {
|
|||||||
html += `<div class="status-row"><span>Background poll</span><span>Disabled</span></div>`;
|
html += `<div class="status-row"><span>Background poll</span><span>Disabled</span></div>`;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (hasForegroundClient) {
|
const mode = sseClients.length > 0
|
||||||
html += `<div class="status-row"><span>Effective mode</span><span class="status-fg-badge">Foreground ${fastestClient.refreshRateMs / 1000}s</span></div>`;
|
? `<span class="status-fg-badge">SSE push</span>`
|
||||||
} else if (activeRefreshers.length > 0) {
|
: (data.polling.enabled ? 'Background' : 'On-demand (idle)');
|
||||||
html += `<div class="status-row"><span>Effective mode</span><span>${data.polling.enabled ? 'Background' : 'On-demand'}</span></div>`;
|
html += `<div class="status-row"><span>Delivery mode</span><span>${mode}</span></div>`;
|
||||||
} else {
|
|
||||||
html += `<div class="status-row"><span>Effective mode</span><span>Idle (no active clients)</span></div>`;
|
|
||||||
}
|
|
||||||
|
|
||||||
html += `<div class="status-row"><span>Active clients</span><span>${clients.length}</span></div>`;
|
html += `<div class="status-row"><span>SSE clients</span><span>${sseClients.length}</span></div>`;
|
||||||
for (const c of clients) {
|
for (const c of sseClients) {
|
||||||
const rate = c.refreshRateMs > 0 ? (c.refreshRateMs / 1000) + 's' : 'Off';
|
const age = Math.round((Date.now() - c.connectedAt) / 1000);
|
||||||
const age = Math.round((Date.now() - c.lastSeen) / 1000);
|
html += `<div class="status-row status-row-sub"><span>${escapeHtml(c.user)}</span><span>connected ${age}s ago</span></div>`;
|
||||||
html += `<div class="status-row status-row-sub"><span>${escapeHtml(c.user)}</span><span>${rate} (${age}s ago)</span></div>`;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
html += `</div>`;
|
html += `</div>`;
|
||||||
|
|||||||
@@ -53,15 +53,6 @@
|
|||||||
<button class="theme-btn" data-theme="dark">Dark</button>
|
<button class="theme-btn" data-theme="dark">Dark</button>
|
||||||
<button class="theme-btn" data-theme="mono">Mono</button>
|
<button class="theme-btn" data-theme="mono">Mono</button>
|
||||||
</div>
|
</div>
|
||||||
<div class="refresh-control">
|
|
||||||
<label for="refresh-rate">Refresh:</label>
|
|
||||||
<select id="refresh-rate">
|
|
||||||
<option value="1000">1s</option>
|
|
||||||
<option value="5000" selected>5s</option>
|
|
||||||
<option value="10000">10s</option>
|
|
||||||
<option value="0">Off</option>
|
|
||||||
</select>
|
|
||||||
</div>
|
|
||||||
<div id="admin-controls" class="admin-controls" style="display: none;">
|
<div id="admin-controls" class="admin-controls" style="display: none;">
|
||||||
<label class="toggle-label">
|
<label class="toggle-label">
|
||||||
<input type="checkbox" id="show-all-toggle">
|
<input type="checkbox" id="show-all-toggle">
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ const requireAuth = require('../middleware/requireAuth');
|
|||||||
const axios = require('axios');
|
const axios = require('axios');
|
||||||
const { mapTorrentToDownload } = require('../utils/qbittorrent');
|
const { mapTorrentToDownload } = require('../utils/qbittorrent');
|
||||||
const cache = require('../utils/cache');
|
const cache = require('../utils/cache');
|
||||||
const { pollAllServices, getLastPollTimings, POLLING_ENABLED } = require('../utils/poller');
|
const { pollAllServices, getLastPollTimings, onPollComplete, offPollComplete, POLLING_ENABLED } = require('../utils/poller');
|
||||||
const { getSonarrInstances, getRadarrInstances } = require('../utils/config');
|
const { getSonarrInstances, getRadarrInstances } = require('../utils/config');
|
||||||
const sanitizeError = require('../utils/sanitizeError');
|
const sanitizeError = require('../utils/sanitizeError');
|
||||||
|
|
||||||
@@ -129,15 +129,18 @@ function buildTagBadges(allTags, embyUserMap) {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// Track active dashboard clients: Map<username, { refreshRateMs, lastSeen }>
|
// Track active dashboard clients.
|
||||||
|
// SSE connections: registered on connect, removed on close — always accurate.
|
||||||
|
// Legacy HTTP poll clients: pruned after CLIENT_STALE_MS of inactivity.
|
||||||
const activeClients = new Map();
|
const activeClients = new Map();
|
||||||
const CLIENT_STALE_MS = 30000; // consider client gone after 30s of no requests
|
const CLIENT_STALE_MS = 30000;
|
||||||
|
|
||||||
function getActiveClients() {
|
function getActiveClients() {
|
||||||
const now = Date.now();
|
const now = Date.now();
|
||||||
// Prune stale clients
|
|
||||||
for (const [key, client] of activeClients.entries()) {
|
for (const [key, client] of activeClients.entries()) {
|
||||||
if (now - client.lastSeen > CLIENT_STALE_MS) activeClients.delete(key);
|
if (client.type !== 'sse' && now - client.lastSeen > CLIENT_STALE_MS) {
|
||||||
|
activeClients.delete(key);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return Array.from(activeClients.values());
|
return Array.from(activeClients.values());
|
||||||
}
|
}
|
||||||
@@ -758,4 +761,269 @@ router.get('/cover-art', requireAuth, async (req, res) => {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// SSE stream — pushes download data to the client on every poll cycle.
|
||||||
|
// Uses the browser's built-in EventSource API (no library required).
|
||||||
|
// Auth is enforced by requireAuth (emby_user cookie sent with the upgrade request).
|
||||||
|
// No CSRF token needed — SSE is a GET request (safe method, no state change).
|
||||||
|
router.get('/stream', requireAuth, async (req, res) => {
|
||||||
|
const user = req.user;
|
||||||
|
const username = user.name.toLowerCase();
|
||||||
|
const showAll = !!user.isAdmin && req.query.showAll === 'true';
|
||||||
|
|
||||||
|
// SSE headers — disable buffering at every layer
|
||||||
|
res.setHeader('Content-Type', 'text/event-stream');
|
||||||
|
res.setHeader('Cache-Control', 'no-cache, no-transform');
|
||||||
|
res.setHeader('Connection', 'keep-alive');
|
||||||
|
res.setHeader('X-Accel-Buffering', 'no'); // nginx: disable proxy buffering
|
||||||
|
res.flushHeaders();
|
||||||
|
|
||||||
|
// Register as an active SSE client
|
||||||
|
activeClients.set(username, { user: user.name, type: 'sse', connectedAt: Date.now(), lastSeen: Date.now() });
|
||||||
|
console.log(`[SSE] Client connected: ${user.name}`);
|
||||||
|
|
||||||
|
// Helper: build and send the downloads payload for this user
|
||||||
|
async function sendDownloads() {
|
||||||
|
try {
|
||||||
|
// On-demand: trigger a fresh poll if cache is stale and polling is disabled
|
||||||
|
if (!POLLING_ENABLED && !cache.get('poll:sab-queue')) {
|
||||||
|
await pollAllServices();
|
||||||
|
}
|
||||||
|
|
||||||
|
const sabQueueData = cache.get('poll:sab-queue') || { slots: [] };
|
||||||
|
const sabHistoryData = cache.get('poll:sab-history') || { slots: [] };
|
||||||
|
const sonarrTagsResults = cache.get('poll:sonarr-tags') || [];
|
||||||
|
const sonarrQueueData = cache.get('poll:sonarr-queue') || { records: [] };
|
||||||
|
const sonarrHistoryData = cache.get('poll:sonarr-history') || { records: [] };
|
||||||
|
const radarrQueueData = cache.get('poll:radarr-queue') || { records: [] };
|
||||||
|
const radarrHistoryData = cache.get('poll:radarr-history') || { records: [] };
|
||||||
|
const radarrTagsData = cache.get('poll:radarr-tags') || [];
|
||||||
|
const qbittorrentTorrents = cache.get('poll:qbittorrent') || [];
|
||||||
|
|
||||||
|
const sabnzbdQueue = { data: { queue: sabQueueData } };
|
||||||
|
const sabnzbdHistory = { data: { history: sabHistoryData } };
|
||||||
|
const sonarrQueue = { data: sonarrQueueData };
|
||||||
|
const sonarrHistory = { data: sonarrHistoryData };
|
||||||
|
const radarrQueue = { data: radarrQueueData };
|
||||||
|
const radarrHistory = { data: radarrHistoryData };
|
||||||
|
const radarrTags = { data: radarrTagsData };
|
||||||
|
|
||||||
|
const seriesMap = new Map();
|
||||||
|
for (const r of sonarrQueue.data.records) {
|
||||||
|
if (r.series && r.seriesId) seriesMap.set(r.seriesId, r.series);
|
||||||
|
}
|
||||||
|
for (const r of sonarrHistory.data.records) {
|
||||||
|
if (r.series && r.seriesId && !seriesMap.has(r.seriesId)) seriesMap.set(r.seriesId, r.series);
|
||||||
|
}
|
||||||
|
const moviesMap = new Map();
|
||||||
|
for (const r of radarrQueue.data.records) {
|
||||||
|
if (r.movie && r.movieId) moviesMap.set(r.movieId, r.movie);
|
||||||
|
}
|
||||||
|
for (const r of radarrHistory.data.records) {
|
||||||
|
if (r.movie && r.movieId && !moviesMap.has(r.movieId)) moviesMap.set(r.movieId, r.movie);
|
||||||
|
}
|
||||||
|
|
||||||
|
const sonarrTagMap = new Map(sonarrTagsResults.flatMap(t => t.data || []).map(t => [t.id, t.label]));
|
||||||
|
const radarrTagMap = new Map(radarrTags.data.map(t => [t.id, t.label]));
|
||||||
|
const embyUserMap = showAll ? await getEmbyUsers() : new Map();
|
||||||
|
|
||||||
|
// Inline the matching logic (same as /user-downloads)
|
||||||
|
const userDownloads = [];
|
||||||
|
const isAdmin = !!user.isAdmin;
|
||||||
|
const usernameSanitized = sanitizeTagLabel(user.name);
|
||||||
|
const queueStatus = sabnzbdQueue.data.queue ? sabnzbdQueue.data.queue.status : null;
|
||||||
|
const queueSpeed = sabnzbdQueue.data.queue ? sabnzbdQueue.data.queue.speed : null;
|
||||||
|
const queueKbpersec = sabnzbdQueue.data.queue ? sabnzbdQueue.data.queue.kbpersec : null;
|
||||||
|
|
||||||
|
function getSlotStatusAndSpeed(slot) {
|
||||||
|
if (queueStatus === 'Paused') return { status: 'Paused', speed: '0' };
|
||||||
|
return { status: slot.status || 'Unknown', speed: queueSpeed || queueKbpersec || '0' };
|
||||||
|
}
|
||||||
|
|
||||||
|
// SABnzbd queue
|
||||||
|
if (sabnzbdQueue.data.queue && sabnzbdQueue.data.queue.slots) {
|
||||||
|
for (const slot of sabnzbdQueue.data.queue.slots) {
|
||||||
|
const nzbName = slot.filename || slot.nzbname;
|
||||||
|
if (!nzbName) continue;
|
||||||
|
const slotState = getSlotStatusAndSpeed(slot);
|
||||||
|
const nzbNameLower = nzbName.toLowerCase();
|
||||||
|
|
||||||
|
const sonarrMatch = sonarrQueue.data.records.find(r => {
|
||||||
|
const rTitle = (r.title || r.sourceTitle || '').toLowerCase();
|
||||||
|
return rTitle && (rTitle.includes(nzbNameLower) || nzbNameLower.includes(rTitle));
|
||||||
|
});
|
||||||
|
if (sonarrMatch && sonarrMatch.seriesId) {
|
||||||
|
const series = seriesMap.get(sonarrMatch.seriesId) || sonarrMatch.series;
|
||||||
|
if (series) {
|
||||||
|
const allTags = extractAllTags(series.tags, sonarrTagMap);
|
||||||
|
const matchedUserTag = extractUserTag(series.tags, sonarrTagMap, username);
|
||||||
|
if (showAll ? allTags.length > 0 : !!matchedUserTag) {
|
||||||
|
const dlObj = { type: 'series', title: nzbName, coverArt: getCoverArt(series), status: slotState.status, progress: slot.percentage, mb: slot.mb, mbmissing: slot.mbmissing, size: slot.size, speed: slotState.speed, eta: slot.timeleft, seriesName: series.title, episodeInfo: sonarrMatch, allTags, matchedUserTag: matchedUserTag || null, tagBadges: showAll ? buildTagBadges(allTags, embyUserMap) : undefined };
|
||||||
|
const issues = getImportIssues(sonarrMatch);
|
||||||
|
if (issues) dlObj.importIssues = issues;
|
||||||
|
if (isAdmin) { dlObj.downloadPath = slot.storage || null; dlObj.targetPath = series.path || null; dlObj.arrLink = getSonarrLink(series); }
|
||||||
|
userDownloads.push(dlObj);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const radarrMatch = radarrQueue.data.records.find(r => {
|
||||||
|
const rTitle = (r.title || r.sourceTitle || '').toLowerCase();
|
||||||
|
return rTitle && (rTitle.includes(nzbNameLower) || nzbNameLower.includes(rTitle));
|
||||||
|
});
|
||||||
|
if (radarrMatch && radarrMatch.movieId) {
|
||||||
|
const movie = moviesMap.get(radarrMatch.movieId) || radarrMatch.movie;
|
||||||
|
if (movie) {
|
||||||
|
const allTags = extractAllTags(movie.tags, radarrTagMap);
|
||||||
|
const matchedUserTag = extractUserTag(movie.tags, radarrTagMap, username);
|
||||||
|
if (showAll ? allTags.length > 0 : !!matchedUserTag) {
|
||||||
|
const dlObj = { type: 'movie', title: nzbName, coverArt: getCoverArt(movie), status: slotState.status, progress: slot.percentage, mb: slot.mb, mbmissing: slot.mbmissing, size: slot.size, speed: slotState.speed, eta: slot.timeleft, movieName: movie.title, movieInfo: radarrMatch, allTags, matchedUserTag: matchedUserTag || null, tagBadges: showAll ? buildTagBadges(allTags, embyUserMap) : undefined };
|
||||||
|
const issues = getImportIssues(radarrMatch);
|
||||||
|
if (issues) dlObj.importIssues = issues;
|
||||||
|
if (isAdmin) { dlObj.downloadPath = slot.storage || null; dlObj.targetPath = movie.path || null; dlObj.arrLink = getRadarrLink(movie); }
|
||||||
|
userDownloads.push(dlObj);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// SABnzbd history
|
||||||
|
if (sabnzbdHistory.data.history && sabnzbdHistory.data.history.slots) {
|
||||||
|
for (const slot of sabnzbdHistory.data.history.slots) {
|
||||||
|
const nzbName = slot.name || slot.nzb_name || slot.nzbname;
|
||||||
|
if (!nzbName) continue;
|
||||||
|
const nzbNameLower = nzbName.toLowerCase();
|
||||||
|
|
||||||
|
const sonarrMatch = sonarrHistory.data.records.find(r => {
|
||||||
|
const rTitle = (r.sourceTitle || r.title || '').toLowerCase();
|
||||||
|
return rTitle && (rTitle.includes(nzbNameLower) || nzbNameLower.includes(rTitle));
|
||||||
|
});
|
||||||
|
if (sonarrMatch && sonarrMatch.seriesId) {
|
||||||
|
const series = seriesMap.get(sonarrMatch.seriesId) || sonarrMatch.series;
|
||||||
|
if (series) {
|
||||||
|
const allTags = extractAllTags(series.tags, sonarrTagMap);
|
||||||
|
const matchedUserTag = extractUserTag(series.tags, sonarrTagMap, username);
|
||||||
|
if (showAll ? allTags.length > 0 : !!matchedUserTag) {
|
||||||
|
const dlObj = { type: 'series', title: nzbName, coverArt: getCoverArt(series), status: slot.status, size: slot.size, completedAt: slot.completed_time, seriesName: series.title, episodeInfo: sonarrMatch, allTags, matchedUserTag: matchedUserTag || null, tagBadges: showAll ? buildTagBadges(allTags, embyUserMap) : undefined };
|
||||||
|
if (isAdmin) { dlObj.downloadPath = slot.storage || null; dlObj.targetPath = series.path || null; dlObj.arrLink = getSonarrLink(series); }
|
||||||
|
userDownloads.push(dlObj);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const radarrMatch = radarrHistory.data.records.find(r => {
|
||||||
|
const rTitle = (r.sourceTitle || r.title || '').toLowerCase();
|
||||||
|
return rTitle && (rTitle.includes(nzbNameLower) || nzbNameLower.includes(rTitle));
|
||||||
|
});
|
||||||
|
if (radarrMatch && radarrMatch.movieId) {
|
||||||
|
const movie = moviesMap.get(radarrMatch.movieId) || radarrMatch.movie;
|
||||||
|
if (movie) {
|
||||||
|
const allTags = extractAllTags(movie.tags, radarrTagMap);
|
||||||
|
const matchedUserTag = extractUserTag(movie.tags, radarrTagMap, username);
|
||||||
|
if (showAll ? allTags.length > 0 : !!matchedUserTag) {
|
||||||
|
const dlObj = { type: 'movie', title: nzbName, coverArt: getCoverArt(movie), status: slot.status, size: slot.size, completedAt: slot.completed_time, movieName: movie.title, movieInfo: radarrMatch, allTags, matchedUserTag: matchedUserTag || null, tagBadges: showAll ? buildTagBadges(allTags, embyUserMap) : undefined };
|
||||||
|
if (isAdmin) { dlObj.downloadPath = slot.storage || null; dlObj.targetPath = movie.path || null; dlObj.arrLink = getRadarrLink(movie); }
|
||||||
|
userDownloads.push(dlObj);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// qBittorrent
|
||||||
|
for (const torrent of qbittorrentTorrents) {
|
||||||
|
const torrentName = torrent.name || '';
|
||||||
|
if (!torrentName) continue;
|
||||||
|
const torrentNameLower = torrentName.toLowerCase();
|
||||||
|
|
||||||
|
const sonarrMatch = sonarrQueue.data.records.find(r => { const rTitle = (r.title || r.sourceTitle || '').toLowerCase(); return rTitle && (rTitle.includes(torrentNameLower) || torrentNameLower.includes(rTitle)); });
|
||||||
|
if (sonarrMatch && sonarrMatch.seriesId) {
|
||||||
|
const series = seriesMap.get(sonarrMatch.seriesId) || sonarrMatch.series;
|
||||||
|
if (series) {
|
||||||
|
const allTags = extractAllTags(series.tags, sonarrTagMap);
|
||||||
|
const matchedUserTag = extractUserTag(series.tags, sonarrTagMap, username);
|
||||||
|
if (showAll ? allTags.length > 0 : !!matchedUserTag) {
|
||||||
|
const download = mapTorrentToDownload(torrent);
|
||||||
|
Object.assign(download, { type: 'series', coverArt: getCoverArt(series), seriesName: series.title, episodeInfo: sonarrMatch, allTags, matchedUserTag: matchedUserTag || null, tagBadges: showAll ? buildTagBadges(allTags, embyUserMap) : undefined });
|
||||||
|
const issues = getImportIssues(sonarrMatch); if (issues) download.importIssues = issues;
|
||||||
|
if (isAdmin) { download.downloadPath = download.savePath || null; download.targetPath = series.path || null; download.arrLink = getSonarrLink(series); }
|
||||||
|
userDownloads.push(download); continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const radarrMatch = radarrQueue.data.records.find(r => { const rTitle = (r.title || r.sourceTitle || '').toLowerCase(); return rTitle && (rTitle.includes(torrentNameLower) || torrentNameLower.includes(rTitle)); });
|
||||||
|
if (radarrMatch && radarrMatch.movieId) {
|
||||||
|
const movie = moviesMap.get(radarrMatch.movieId) || radarrMatch.movie;
|
||||||
|
if (movie) {
|
||||||
|
const allTags = extractAllTags(movie.tags, radarrTagMap);
|
||||||
|
const matchedUserTag = extractUserTag(movie.tags, radarrTagMap, username);
|
||||||
|
if (showAll ? allTags.length > 0 : !!matchedUserTag) {
|
||||||
|
const download = mapTorrentToDownload(torrent);
|
||||||
|
Object.assign(download, { type: 'movie', coverArt: getCoverArt(movie), movieName: movie.title, movieInfo: radarrMatch, allTags, matchedUserTag: matchedUserTag || null, tagBadges: showAll ? buildTagBadges(allTags, embyUserMap) : undefined });
|
||||||
|
const issues = getImportIssues(radarrMatch); if (issues) download.importIssues = issues;
|
||||||
|
if (isAdmin) { download.downloadPath = download.savePath || null; download.targetPath = movie.path || null; download.arrLink = getRadarrLink(movie); }
|
||||||
|
userDownloads.push(download); continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const sonarrHistoryMatch = sonarrHistory.data.records.find(r => { const rTitle = (r.sourceTitle || r.title || '').toLowerCase(); return rTitle && (rTitle.includes(torrentNameLower) || torrentNameLower.includes(rTitle)); });
|
||||||
|
if (sonarrHistoryMatch && sonarrHistoryMatch.seriesId) {
|
||||||
|
const series = seriesMap.get(sonarrHistoryMatch.seriesId) || sonarrHistoryMatch.series;
|
||||||
|
if (series) {
|
||||||
|
const allTags = extractAllTags(series.tags, sonarrTagMap);
|
||||||
|
const matchedUserTag = extractUserTag(series.tags, sonarrTagMap, username);
|
||||||
|
if (showAll ? allTags.length > 0 : !!matchedUserTag) {
|
||||||
|
const download = mapTorrentToDownload(torrent);
|
||||||
|
Object.assign(download, { type: 'series', coverArt: getCoverArt(series), seriesName: series.title, episodeInfo: sonarrHistoryMatch, allTags, matchedUserTag: matchedUserTag || null, tagBadges: showAll ? buildTagBadges(allTags, embyUserMap) : undefined });
|
||||||
|
if (isAdmin) { download.downloadPath = download.savePath || null; download.targetPath = series.path || null; download.arrLink = getSonarrLink(series); }
|
||||||
|
userDownloads.push(download); continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const radarrHistoryMatch = radarrHistory.data.records.find(r => { const rTitle = (r.sourceTitle || r.title || '').toLowerCase(); return rTitle && (rTitle.includes(torrentNameLower) || torrentNameLower.includes(rTitle)); });
|
||||||
|
if (radarrHistoryMatch && radarrHistoryMatch.movieId) {
|
||||||
|
const movie = moviesMap.get(radarrHistoryMatch.movieId) || radarrHistoryMatch.movie;
|
||||||
|
if (movie) {
|
||||||
|
const allTags = extractAllTags(movie.tags, radarrTagMap);
|
||||||
|
const matchedUserTag = extractUserTag(movie.tags, radarrTagMap, username);
|
||||||
|
if (showAll ? allTags.length > 0 : !!matchedUserTag) {
|
||||||
|
const download = mapTorrentToDownload(torrent);
|
||||||
|
Object.assign(download, { type: 'movie', coverArt: getCoverArt(movie), movieName: movie.title, movieInfo: radarrHistoryMatch, allTags, matchedUserTag: matchedUserTag || null, tagBadges: showAll ? buildTagBadges(allTags, embyUserMap) : undefined });
|
||||||
|
if (isAdmin) { download.downloadPath = download.savePath || null; download.targetPath = movie.path || null; download.arrLink = getRadarrLink(movie); }
|
||||||
|
userDownloads.push(download);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write SSE event
|
||||||
|
res.write(`data: ${JSON.stringify({ user: user.name, isAdmin, downloads: userDownloads })}\n\n`);
|
||||||
|
} catch (err) {
|
||||||
|
console.error('[SSE] Error building payload:', sanitizeError(err));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send initial data immediately
|
||||||
|
await sendDownloads();
|
||||||
|
|
||||||
|
// Subscribe to poll-complete notifications
|
||||||
|
onPollComplete(sendDownloads);
|
||||||
|
|
||||||
|
// 25s heartbeat comment to keep the connection alive through proxies/load-balancers
|
||||||
|
const heartbeat = setInterval(() => {
|
||||||
|
try { res.write(': heartbeat\n\n'); } catch { /* ignore — cleanup below handles it */ }
|
||||||
|
}, 25000);
|
||||||
|
|
||||||
|
// Cleanup on client disconnect
|
||||||
|
req.on('close', () => {
|
||||||
|
clearInterval(heartbeat);
|
||||||
|
offPollComplete(sendDownloads);
|
||||||
|
activeClients.delete(username);
|
||||||
|
console.log(`[SSE] Client disconnected: ${user.name}`);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
module.exports = router;
|
module.exports = router;
|
||||||
|
|||||||
@@ -16,6 +16,12 @@ const POLLING_ENABLED = POLL_INTERVAL > 0;
|
|||||||
let polling = false;
|
let polling = false;
|
||||||
let lastPollTimings = null;
|
let lastPollTimings = null;
|
||||||
|
|
||||||
|
// SSE subscribers: Set of () => void callbacks, each registered by an open /stream connection
|
||||||
|
const pollSubscribers = new Set();
|
||||||
|
|
||||||
|
function onPollComplete(cb) { pollSubscribers.add(cb); }
|
||||||
|
function offPollComplete(cb) { pollSubscribers.delete(cb); }
|
||||||
|
|
||||||
// Timed fetch helper: runs a fetch and records how long it took
|
// Timed fetch helper: runs a fetch and records how long it took
|
||||||
async function timed(label, fn) {
|
async function timed(label, fn) {
|
||||||
const t0 = Date.now();
|
const t0 = Date.now();
|
||||||
@@ -184,6 +190,11 @@ async function pollAllServices() {
|
|||||||
|
|
||||||
const elapsed = Date.now() - start;
|
const elapsed = Date.now() - start;
|
||||||
console.log(`[Poller] Poll complete in ${elapsed}ms`);
|
console.log(`[Poller] Poll complete in ${elapsed}ms`);
|
||||||
|
|
||||||
|
// Notify all SSE stream connections so they push fresh data immediately
|
||||||
|
for (const cb of pollSubscribers) {
|
||||||
|
try { cb(); } catch { /* subscriber already disconnected */ }
|
||||||
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.error(`[Poller] Poll error:`, err.message);
|
console.error(`[Poller] Poll error:`, err.message);
|
||||||
} finally {
|
} finally {
|
||||||
@@ -216,4 +227,4 @@ function getLastPollTimings() {
|
|||||||
return lastPollTimings;
|
return lastPollTimings;
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = { startPoller, stopPoller, pollAllServices, getLastPollTimings, POLL_INTERVAL, POLLING_ENABLED };
|
module.exports = { startPoller, stopPoller, pollAllServices, getLastPollTimings, onPollComplete, offPollComplete, POLL_INTERVAL, POLLING_ENABLED };
|
||||||
|
|||||||
Reference in New Issue
Block a user