refactor: use qBittorrent Sync API (/api/v2/sync/maindata) with fallback
All checks were successful
Docs Check / Markdown lint (push) Successful in 51s
Licence Check / Licence compatibility and copyright header verification (push) Successful in 1m16s
CI / Tests & coverage (push) Successful in 1m37s
CI / Security audit (push) Successful in 1m44s
Docs Check / Mermaid diagram parse check (push) Successful in 1m52s
All checks were successful
Docs Check / Markdown lint (push) Successful in 51s
Licence Check / Licence compatibility and copyright header verification (push) Successful in 1m16s
CI / Tests & coverage (push) Successful in 1m37s
CI / Security audit (push) Successful in 1m44s
Docs Check / Mermaid diagram parse check (push) Successful in 1m52s
- QBittorrentClient now uses the incremental Sync API instead of repeatedly fetching the full torrent list via /api/v2/torrents/info. - Per-client state: lastRid, torrentMap, fallbackThisCycle. - Handles full_update, delta updates, and torrents_removed. - Falls back to legacy torrents/info at most once per poll cycle. - getAllTorrents() resets fallback flags before each cycle. - Added 9 new unit tests covering: first sync, delta merge, full_update, torrents_removed, fallback path, direct-legacy-after-fallback, 403 re-auth, completed-field computation, and fallback reset.
This commit is contained in:
@@ -226,7 +226,7 @@ sofarr/
|
||||
|
||||
**`poller.js`** — Background polling engine. Fetches data from all configured service instances in parallel with per-task timing. Stores results in the cache with a configurable TTL. Can be disabled entirely (`POLL_INTERVAL=0`), in which case data is fetched on-demand. After each successful poll it notifies all registered SSE subscriber callbacks so connected clients receive data immediately.
|
||||
|
||||
**`qbittorrent.js`** — `QBittorrentClient` class with cookie-based authentication, automatic re-auth on 403, and persistent client instances. Includes torrent-to-download mapping (`mapTorrentToDownload`) and formatting utilities (`formatBytes`, `formatSpeed`, `formatEta`).
|
||||
**`qbittorrent.js`** — `QBittorrentClient` class with cookie-based authentication, automatic re-auth on 403, and persistent client instances. **Uses the qBittorrent Sync API (`/api/v2/sync/maindata`) for incremental updates**: the first call sends `rid=0` for a full list; subsequent calls send the last `rid` to receive delta updates only (changed fields + removed hashes). If the Sync API fails, it falls back once per poll cycle to the legacy `GET /api/v2/torrents/info`. Includes torrent-to-download mapping (`mapTorrentToDownload`) and formatting utilities (`formatBytes`, `formatSpeed`, `formatEta`).
|
||||
|
||||
**`tokenStore.js`** — JSON file-backed store (`DATA_DIR/tokens.json`) for Emby `AccessToken`s. Tokens are stored server-side and **never sent to the client**. Writes are atomic (write to `.tmp` then rename). Entries expire after 31 days (slightly longer than the maximum 30-day cookie). Pruning runs on startup and hourly.
|
||||
|
||||
@@ -254,10 +254,32 @@ Every `POLL_INTERVAL` ms (default 5000), the poller fetches from all services in
|
||||
| Radarr Queue | `GET /api/v3/queue` | `includeMovie=true` |
|
||||
| Radarr History | `GET /api/v3/history` | `pageSize=10` |
|
||||
| Radarr Tags | `GET /api/v3/tag` | — |
|
||||
| qBittorrent | `GET /api/v2/torrents/info` | — |
|
||||
| qBittorrent | `GET /api/v2/sync/maindata?rid=N` | Fallback to `GET /api/v2/torrents/info` |
|
||||
|
||||
Results are stored in the cache under `poll:*` keys with a TTL of `POLL_INTERVAL × 3`.
|
||||
|
||||
#### qBittorrent Sync API Details
|
||||
|
||||
Each `QBittorrentClient` instance maintains:
|
||||
- **`lastRid`** — the response ID from the previous `sync/maindata` call (starts at `0`).
|
||||
- **`torrentMap`** — a `Map<hash, torrent>` holding the complete state for every known torrent on this qBittorrent instance.
|
||||
- **`fallbackThisCycle`** — boolean tracking whether this poll cycle has already fallen back to the legacy endpoint.
|
||||
|
||||
**Flow per poll cycle:**
|
||||
|
||||
1. `getAllTorrents()` resets `fallbackThisCycle = false` on every client.
|
||||
2. `client.getTorrents()` attempts `GET /api/v2/sync/maindata?rid={lastRid}`.
|
||||
3. qBittorrent returns:
|
||||
- `rid` — new response ID to use next time.
|
||||
- `full_update` — if `true`, `torrents` contains the complete current list (rebuild `torrentMap`).
|
||||
- `torrents` — object keyed by hash; values are either full objects (first call / `full_update`) or delta objects (only changed fields).
|
||||
- `torrents_removed` — array of hashes to delete from `torrentMap`.
|
||||
4. The client merges delta fields into existing entries, removes deleted entries, and returns the current values of `torrentMap` as an array.
|
||||
5. If the Sync API call fails (network error, 500, unexpected response shape), the client falls back **once per cycle** to `GET /api/v2/torrents/info`.
|
||||
6. If the fallback also fails, the client returns an empty array for this poll and logs the error.
|
||||
|
||||
**Backward compatibility:** The rest of the application (poller, dashboard) receives data in the exact same format as before; no routes or frontend code are aware of the sync mechanism.
|
||||
|
||||
### 5.2 SSE Stream
|
||||
|
||||
When a browser opens `GET /api/dashboard/stream` (after authentication):
|
||||
|
||||
@@ -11,6 +11,10 @@ class QBittorrentClient {
|
||||
this.username = instance.username;
|
||||
this.password = instance.password;
|
||||
this.authCookie = null;
|
||||
// Sync API incremental state
|
||||
this.lastRid = 0;
|
||||
this.torrentMap = new Map();
|
||||
this.fallbackThisCycle = false;
|
||||
}
|
||||
|
||||
async login() {
|
||||
@@ -80,19 +84,110 @@ class QBittorrentClient {
|
||||
}
|
||||
}
|
||||
|
||||
async getTorrents() {
|
||||
/**
|
||||
* Fetches incremental torrent data using the qBittorrent Sync API.
|
||||
*
|
||||
* The Sync API uses a response ID (rid) to send only changed fields:
|
||||
* - First call uses rid=0 to get the full torrent list.
|
||||
* - Subsequent calls send the last received rid; qBittorrent returns
|
||||
* delta updates (changed fields only), new torrents, and removed hashes.
|
||||
* - If full_update is true, the server is sending a full refresh and
|
||||
* we rebuild our local map from scratch.
|
||||
*
|
||||
* @returns {Promise<Array>} Array of complete torrent objects.
|
||||
*/
|
||||
async getMainData() {
|
||||
const response = await this.makeRequest(`/api/v2/sync/maindata?rid=${this.lastRid}`);
|
||||
const data = response.data;
|
||||
|
||||
if (data.full_update) {
|
||||
// Full refresh: rebuild the entire map
|
||||
this.torrentMap.clear();
|
||||
if (data.torrents) {
|
||||
for (const [hash, props] of Object.entries(data.torrents)) {
|
||||
this.torrentMap.set(hash, { ...props, hash });
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Delta update: merge changed fields into existing torrent objects
|
||||
if (data.torrents) {
|
||||
for (const [hash, delta] of Object.entries(data.torrents)) {
|
||||
const existing = this.torrentMap.get(hash) || { hash };
|
||||
this.torrentMap.set(hash, { ...existing, ...delta });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Remove torrents that the server reports as deleted
|
||||
if (data.torrents_removed) {
|
||||
for (const hash of data.torrents_removed) {
|
||||
this.torrentMap.delete(hash);
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure every torrent has a computed 'completed' field for downstream consumers
|
||||
for (const torrent of this.torrentMap.values()) {
|
||||
if (torrent.completed === undefined && torrent.size !== undefined && torrent.progress !== undefined) {
|
||||
torrent.completed = Math.round(torrent.size * torrent.progress);
|
||||
}
|
||||
}
|
||||
|
||||
this.lastRid = data.rid;
|
||||
return Array.from(this.torrentMap.values());
|
||||
}
|
||||
|
||||
/**
|
||||
* Legacy full-list fetch. Used as a fallback when the Sync API fails.
|
||||
*/
|
||||
async getTorrentsLegacy() {
|
||||
try {
|
||||
const response = await this.makeRequest('/api/v2/torrents/info');
|
||||
logToFile(`[qBittorrent:${this.name}] Retrieved ${response.data.length} torrents`);
|
||||
// Add instance info to each torrent
|
||||
return response.data.map(torrent => ({
|
||||
logToFile(`[qBittorrent:${this.name}] Retrieved ${response.data.length} torrents (legacy)`);
|
||||
return response.data;
|
||||
} catch (error) {
|
||||
logToFile(`[qBittorrent:${this.name}] Error fetching torrents (legacy): ${error.message}`);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current list of torrents for this instance.
|
||||
* Uses the Sync API for incremental updates; falls back to torrents/info
|
||||
* at most once per polling cycle if the Sync API call fails.
|
||||
*/
|
||||
async getTorrents() {
|
||||
try {
|
||||
if (this.fallbackThisCycle) {
|
||||
logToFile(`[qBittorrent:${this.name}] Already fell back this cycle, using legacy`);
|
||||
const torrents = await this.getTorrentsLegacy();
|
||||
return torrents.map(torrent => ({
|
||||
...torrent,
|
||||
instanceId: this.id,
|
||||
instanceName: this.name
|
||||
}));
|
||||
}
|
||||
|
||||
const torrents = await this.getMainData();
|
||||
logToFile(`[qBittorrent:${this.name}] Sync: ${torrents.length} torrents (rid=${this.lastRid})`);
|
||||
return torrents.map(torrent => ({
|
||||
...torrent,
|
||||
instanceId: this.id,
|
||||
instanceName: this.name
|
||||
}));
|
||||
} catch (error) {
|
||||
logToFile(`[qBittorrent:${this.name}] Error fetching torrents: ${error.message}`);
|
||||
return [];
|
||||
logToFile(`[qBittorrent:${this.name}] Sync failed, falling back to legacy: ${error.message}`);
|
||||
this.fallbackThisCycle = true;
|
||||
try {
|
||||
const torrents = await this.getTorrentsLegacy();
|
||||
return torrents.map(torrent => ({
|
||||
...torrent,
|
||||
instanceId: this.id,
|
||||
instanceName: this.name
|
||||
}));
|
||||
} catch (fallbackError) {
|
||||
logToFile(`[qBittorrent:${this.name}] Fallback also failed: ${fallbackError.message}`);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -117,7 +212,13 @@ async function getAllTorrents() {
|
||||
if (clients.length === 0) {
|
||||
return [];
|
||||
}
|
||||
|
||||
|
||||
// Reset fallback flags at the start of each poll cycle so every cycle
|
||||
// gets one chance to use the Sync API before falling back.
|
||||
for (const client of clients) {
|
||||
client.fallbackThisCycle = false;
|
||||
}
|
||||
|
||||
const results = await Promise.all(
|
||||
clients.map(client => client.getTorrents().catch(err => {
|
||||
logToFile(`[qBittorrent] Error from ${client.name}: ${err.message}`);
|
||||
@@ -216,5 +317,6 @@ module.exports = {
|
||||
mapTorrentToDownload,
|
||||
formatBytes,
|
||||
formatSpeed,
|
||||
formatEta
|
||||
formatEta,
|
||||
QBittorrentClient
|
||||
};
|
||||
|
||||
@@ -7,7 +7,8 @@
|
||||
* dashboard card rendering so correctness matters for UX.
|
||||
*/
|
||||
|
||||
import { mapTorrentToDownload, formatBytes, formatSpeed, formatEta } from '../../server/utils/qbittorrent.js';
|
||||
import { mapTorrentToDownload, formatBytes, formatSpeed, formatEta, QBittorrentClient } from '../../server/utils/qbittorrent.js';
|
||||
import nock from 'nock';
|
||||
|
||||
// Minimal torrent fixture that satisfies mapTorrentToDownload's expectations
|
||||
function makeTorrent(overrides = {}) {
|
||||
@@ -32,6 +33,35 @@ function makeTorrent(overrides = {}) {
|
||||
};
|
||||
}
|
||||
|
||||
const QBT_URL = 'http://qbittorrent.test:8080';
|
||||
|
||||
function makeClient(overrides = {}) {
|
||||
return new QBittorrentClient({
|
||||
id: 'test-qbt',
|
||||
name: 'TestQBT',
|
||||
url: QBT_URL,
|
||||
username: 'admin',
|
||||
password: 'adminadmin',
|
||||
...overrides
|
||||
});
|
||||
}
|
||||
|
||||
function mockLogin() {
|
||||
return nock(QBT_URL)
|
||||
.post('/api/v2/auth/login')
|
||||
.reply(200, {}, { 'set-cookie': ['SID=abc123; path=/'] });
|
||||
}
|
||||
|
||||
function mockSync(rid, response) {
|
||||
return nock(QBT_URL)
|
||||
.get(`/api/v2/sync/maindata?rid=${rid}`)
|
||||
.reply(200, response);
|
||||
}
|
||||
|
||||
afterEach(() => {
|
||||
nock.cleanAll();
|
||||
});
|
||||
|
||||
describe('formatBytes', () => {
|
||||
it('formats 0 bytes', () => expect(formatBytes(0)).toBe('0 B'));
|
||||
it('formats bytes', () => expect(formatBytes(512)).toBe('512 B'));
|
||||
@@ -110,3 +140,230 @@ describe('mapTorrentToDownload', () => {
|
||||
expect(result.savePath).toBe('/dl/');
|
||||
});
|
||||
});
|
||||
|
||||
describe('QBittorrentClient sync API', () => {
|
||||
it('first call uses rid=0 and returns full torrent list', async () => {
|
||||
mockLogin();
|
||||
const client = makeClient();
|
||||
await client.login();
|
||||
|
||||
mockSync(0, {
|
||||
rid: 1,
|
||||
full_update: true,
|
||||
torrents: {
|
||||
hash01: { name: 'Test1', state: 'downloading', size: 1000, progress: 0.5, dlspeed: 100, eta: 60, num_seeds: 5, num_leechs: 2, availability: 1.0 }
|
||||
}
|
||||
});
|
||||
|
||||
const torrents = await client.getTorrents();
|
||||
expect(torrents).toHaveLength(1);
|
||||
expect(torrents[0].name).toBe('Test1');
|
||||
expect(torrents[0].instanceId).toBe('test-qbt');
|
||||
expect(torrents[0].hash).toBe('hash01');
|
||||
expect(client.lastRid).toBe(1);
|
||||
});
|
||||
|
||||
it('subsequent call uses last rid and merges delta', async () => {
|
||||
mockLogin();
|
||||
const client = makeClient();
|
||||
await client.login();
|
||||
|
||||
// First call
|
||||
mockSync(0, {
|
||||
rid: 1,
|
||||
full_update: true,
|
||||
torrents: {
|
||||
hash01: { name: 'Test1', state: 'downloading', size: 1000, progress: 0.5, dlspeed: 100, eta: 60, num_seeds: 5, num_leechs: 2, availability: 1.0 }
|
||||
}
|
||||
});
|
||||
await client.getTorrents();
|
||||
|
||||
// Second call — delta
|
||||
mockSync(1, {
|
||||
rid: 2,
|
||||
full_update: false,
|
||||
torrents: {
|
||||
hash01: { dlspeed: 200 }
|
||||
}
|
||||
});
|
||||
|
||||
const torrents = await client.getTorrents();
|
||||
expect(torrents).toHaveLength(1);
|
||||
expect(torrents[0].dlspeed).toBe(200);
|
||||
expect(torrents[0].name).toBe('Test1');
|
||||
expect(client.lastRid).toBe(2);
|
||||
});
|
||||
|
||||
it('handles full_update=true on subsequent call', async () => {
|
||||
mockLogin();
|
||||
const client = makeClient();
|
||||
await client.login();
|
||||
|
||||
// First call
|
||||
mockSync(0, {
|
||||
rid: 1,
|
||||
full_update: true,
|
||||
torrents: {
|
||||
hash01: { name: 'Test1', state: 'downloading', size: 1000, progress: 0.5, dlspeed: 100, eta: 60, num_seeds: 5, num_leechs: 2, availability: 1.0 }
|
||||
}
|
||||
});
|
||||
await client.getTorrents();
|
||||
|
||||
// Server forces full refresh
|
||||
mockSync(1, {
|
||||
rid: 2,
|
||||
full_update: true,
|
||||
torrents: {
|
||||
hash02: { name: 'Test2', state: 'uploading', size: 2000, progress: 1.0, dlspeed: 0, eta: 0, num_seeds: 10, num_leechs: 0, availability: 1.0 }
|
||||
}
|
||||
});
|
||||
|
||||
const torrents = await client.getTorrents();
|
||||
expect(torrents).toHaveLength(1);
|
||||
expect(torrents[0].name).toBe('Test2');
|
||||
expect(torrents[0].hash).toBe('hash02');
|
||||
expect(client.lastRid).toBe(2);
|
||||
});
|
||||
|
||||
it('removes torrents when torrents_removed is present', async () => {
|
||||
mockLogin();
|
||||
const client = makeClient();
|
||||
await client.login();
|
||||
|
||||
mockSync(0, {
|
||||
rid: 1,
|
||||
full_update: true,
|
||||
torrents: {
|
||||
hash01: { name: 'Test1', state: 'downloading', size: 1000, progress: 0.5, dlspeed: 100, eta: 60, num_seeds: 5, num_leechs: 2, availability: 1.0 }
|
||||
}
|
||||
});
|
||||
await client.getTorrents();
|
||||
|
||||
mockSync(1, {
|
||||
rid: 2,
|
||||
full_update: false,
|
||||
torrents_removed: ['hash01']
|
||||
});
|
||||
|
||||
const torrents = await client.getTorrents();
|
||||
expect(torrents).toHaveLength(0);
|
||||
});
|
||||
|
||||
it('falls back to torrents/info when sync fails', async () => {
|
||||
mockLogin();
|
||||
const client = makeClient();
|
||||
await client.login();
|
||||
|
||||
// Sync fails with 500
|
||||
nock(QBT_URL)
|
||||
.get('/api/v2/sync/maindata?rid=0')
|
||||
.reply(500, { error: 'Internal Server Error' });
|
||||
|
||||
// Legacy succeeds
|
||||
nock(QBT_URL)
|
||||
.get('/api/v2/torrents/info')
|
||||
.reply(200, [
|
||||
{ name: 'Fallback', hash: 'fb01', state: 'downloading', size: 1073741824, progress: 0.5, dlspeed: 1048576, eta: 512, num_seeds: 10, num_leechs: 3, availability: 1.0 }
|
||||
]);
|
||||
|
||||
const torrents = await client.getTorrents();
|
||||
expect(torrents).toHaveLength(1);
|
||||
expect(torrents[0].name).toBe('Fallback');
|
||||
expect(client.fallbackThisCycle).toBe(true);
|
||||
});
|
||||
|
||||
it('uses legacy immediately if already fell back this cycle', async () => {
|
||||
mockLogin();
|
||||
const client = makeClient();
|
||||
await client.login();
|
||||
|
||||
client.fallbackThisCycle = true;
|
||||
|
||||
// Only legacy should be called
|
||||
nock(QBT_URL)
|
||||
.get('/api/v2/torrents/info')
|
||||
.reply(200, [
|
||||
{ name: 'DirectLegacy', hash: 'dl01', state: 'downloading', size: 1000, progress: 0.5, dlspeed: 100, eta: 60, num_seeds: 5, num_leechs: 2, availability: 1.0 }
|
||||
]);
|
||||
|
||||
// Ensure sync is NOT called
|
||||
const syncScope = nock(QBT_URL)
|
||||
.get('/api/v2/sync/maindata?rid=0')
|
||||
.reply(200, { rid: 1, full_update: true });
|
||||
|
||||
const torrents = await client.getTorrents();
|
||||
expect(torrents).toHaveLength(1);
|
||||
expect(torrents[0].name).toBe('DirectLegacy');
|
||||
expect(syncScope.isDone()).toBe(false);
|
||||
});
|
||||
|
||||
it('re-authenticates on 403 during sync and retries', async () => {
|
||||
mockLogin();
|
||||
const client = makeClient();
|
||||
await client.login();
|
||||
|
||||
// First sync call returns 403
|
||||
nock(QBT_URL)
|
||||
.get('/api/v2/sync/maindata?rid=0')
|
||||
.reply(403, {});
|
||||
|
||||
// Re-login
|
||||
nock(QBT_URL)
|
||||
.post('/api/v2/auth/login')
|
||||
.reply(200, {}, { 'set-cookie': ['SID=newtoken; path=/'] });
|
||||
|
||||
// Retry succeeds
|
||||
nock(QBT_URL)
|
||||
.get('/api/v2/sync/maindata?rid=0')
|
||||
.reply(200, {
|
||||
rid: 1,
|
||||
full_update: true,
|
||||
torrents: {
|
||||
hash01: { name: 'AfterReauth', state: 'downloading', size: 1000, progress: 0.5, dlspeed: 100, eta: 60, num_seeds: 5, num_leechs: 2, availability: 1.0 }
|
||||
}
|
||||
});
|
||||
|
||||
const torrents = await client.getTorrents();
|
||||
expect(torrents).toHaveLength(1);
|
||||
expect(torrents[0].name).toBe('AfterReauth');
|
||||
});
|
||||
|
||||
it('computes completed from size and progress when missing', async () => {
|
||||
mockLogin();
|
||||
const client = makeClient();
|
||||
await client.login();
|
||||
|
||||
mockSync(0, {
|
||||
rid: 1,
|
||||
full_update: true,
|
||||
torrents: {
|
||||
hash01: { name: 'NoCompleted', state: 'downloading', size: 1000, progress: 0.5, dlspeed: 100, eta: 60, num_seeds: 5, num_leechs: 2, availability: 1.0 }
|
||||
}
|
||||
});
|
||||
|
||||
const torrents = await client.getTorrents();
|
||||
expect(torrents[0].completed).toBe(500);
|
||||
});
|
||||
|
||||
it('resets fallback flag when getAllTorrents resets it', async () => {
|
||||
mockLogin();
|
||||
const client = makeClient();
|
||||
await client.login();
|
||||
client.fallbackThisCycle = true;
|
||||
|
||||
// After reset, sync should be attempted
|
||||
mockSync(0, {
|
||||
rid: 1,
|
||||
full_update: true,
|
||||
torrents: {
|
||||
hash01: { name: 'ResetWorks', state: 'downloading', size: 1000, progress: 0.5, dlspeed: 100, eta: 60, num_seeds: 5, num_leechs: 2, availability: 1.0 }
|
||||
}
|
||||
});
|
||||
|
||||
// Simulate the reset that getAllTorrents performs
|
||||
client.fallbackThisCycle = false;
|
||||
const torrents = await client.getTorrents();
|
||||
expect(torrents[0].name).toBe('ResetWorks');
|
||||
expect(client.fallbackThisCycle).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user