diff --git a/e2e/src/api/specs/jobs.e2e-spec.ts b/e2e/src/api/specs/jobs.e2e-spec.ts index a9afd8475..be7984404 100644 --- a/e2e/src/api/specs/jobs.e2e-spec.ts +++ b/e2e/src/api/specs/jobs.e2e-spec.ts @@ -1,4 +1,4 @@ -import { JobCommand, JobName, LoginResponseDto, updateConfig } from '@immich/sdk'; +import { LoginResponseDto, QueueCommand, QueueName, updateConfig } from '@immich/sdk'; import { cpSync, rmSync } from 'node:fs'; import { readFile } from 'node:fs/promises'; import { basename } from 'node:path'; @@ -17,28 +17,28 @@ describe('/jobs', () => { describe('PUT /jobs', () => { afterEach(async () => { - await utils.jobCommand(admin.accessToken, JobName.MetadataExtraction, { - command: JobCommand.Resume, + await utils.queueCommand(admin.accessToken, QueueName.MetadataExtraction, { + command: QueueCommand.Resume, force: false, }); - await utils.jobCommand(admin.accessToken, JobName.ThumbnailGeneration, { - command: JobCommand.Resume, + await utils.queueCommand(admin.accessToken, QueueName.ThumbnailGeneration, { + command: QueueCommand.Resume, force: false, }); - await utils.jobCommand(admin.accessToken, JobName.FaceDetection, { - command: JobCommand.Resume, + await utils.queueCommand(admin.accessToken, QueueName.FaceDetection, { + command: QueueCommand.Resume, force: false, }); - await utils.jobCommand(admin.accessToken, JobName.SmartSearch, { - command: JobCommand.Resume, + await utils.queueCommand(admin.accessToken, QueueName.SmartSearch, { + command: QueueCommand.Resume, force: false, }); - await utils.jobCommand(admin.accessToken, JobName.DuplicateDetection, { - command: JobCommand.Resume, + await utils.queueCommand(admin.accessToken, QueueName.DuplicateDetection, { + command: QueueCommand.Resume, force: false, }); @@ -59,8 +59,8 @@ describe('/jobs', () => { it('should queue metadata extraction for missing assets', async () => { const path = `${testAssetDir}/formats/raw/Nikon/D700/philadelphia.nef`; - await utils.jobCommand(admin.accessToken, JobName.MetadataExtraction, { - command: JobCommand.Pause, + await utils.queueCommand(admin.accessToken, QueueName.MetadataExtraction, { + command: QueueCommand.Pause, force: false, }); @@ -77,20 +77,20 @@ describe('/jobs', () => { expect(asset.exifInfo?.make).toBeNull(); } - await utils.jobCommand(admin.accessToken, JobName.MetadataExtraction, { - command: JobCommand.Empty, + await utils.queueCommand(admin.accessToken, QueueName.MetadataExtraction, { + command: QueueCommand.Empty, force: false, }); await utils.waitForQueueFinish(admin.accessToken, 'metadataExtraction'); - await utils.jobCommand(admin.accessToken, JobName.MetadataExtraction, { - command: JobCommand.Resume, + await utils.queueCommand(admin.accessToken, QueueName.MetadataExtraction, { + command: QueueCommand.Resume, force: false, }); - await utils.jobCommand(admin.accessToken, JobName.MetadataExtraction, { - command: JobCommand.Start, + await utils.queueCommand(admin.accessToken, QueueName.MetadataExtraction, { + command: QueueCommand.Start, force: false, }); @@ -124,8 +124,8 @@ describe('/jobs', () => { cpSync(`${testAssetDir}/formats/raw/Nikon/D80/glarus.nef`, path); - await utils.jobCommand(admin.accessToken, JobName.MetadataExtraction, { - command: JobCommand.Start, + await utils.queueCommand(admin.accessToken, QueueName.MetadataExtraction, { + command: QueueCommand.Start, force: false, }); @@ -144,8 +144,8 @@ describe('/jobs', () => { it('should queue thumbnail extraction for assets missing thumbs', async () => { const path = `${testAssetDir}/albums/nature/tanners_ridge.jpg`; - await utils.jobCommand(admin.accessToken, JobName.ThumbnailGeneration, { - command: JobCommand.Pause, + await utils.queueCommand(admin.accessToken, QueueName.ThumbnailGeneration, { + command: QueueCommand.Pause, force: false, }); @@ -153,32 +153,32 @@ describe('/jobs', () => { assetData: { bytes: await readFile(path), filename: basename(path) }, }); - await utils.waitForQueueFinish(admin.accessToken, JobName.MetadataExtraction); - await utils.waitForQueueFinish(admin.accessToken, JobName.ThumbnailGeneration); + await utils.waitForQueueFinish(admin.accessToken, QueueName.MetadataExtraction); + await utils.waitForQueueFinish(admin.accessToken, QueueName.ThumbnailGeneration); const assetBefore = await utils.getAssetInfo(admin.accessToken, id); expect(assetBefore.thumbhash).toBeNull(); - await utils.jobCommand(admin.accessToken, JobName.ThumbnailGeneration, { - command: JobCommand.Empty, + await utils.queueCommand(admin.accessToken, QueueName.ThumbnailGeneration, { + command: QueueCommand.Empty, force: false, }); - await utils.waitForQueueFinish(admin.accessToken, JobName.MetadataExtraction); - await utils.waitForQueueFinish(admin.accessToken, JobName.ThumbnailGeneration); + await utils.waitForQueueFinish(admin.accessToken, QueueName.MetadataExtraction); + await utils.waitForQueueFinish(admin.accessToken, QueueName.ThumbnailGeneration); - await utils.jobCommand(admin.accessToken, JobName.ThumbnailGeneration, { - command: JobCommand.Resume, + await utils.queueCommand(admin.accessToken, QueueName.ThumbnailGeneration, { + command: QueueCommand.Resume, force: false, }); - await utils.jobCommand(admin.accessToken, JobName.ThumbnailGeneration, { - command: JobCommand.Start, + await utils.queueCommand(admin.accessToken, QueueName.ThumbnailGeneration, { + command: QueueCommand.Start, force: false, }); - await utils.waitForQueueFinish(admin.accessToken, JobName.MetadataExtraction); - await utils.waitForQueueFinish(admin.accessToken, JobName.ThumbnailGeneration); + await utils.waitForQueueFinish(admin.accessToken, QueueName.MetadataExtraction); + await utils.waitForQueueFinish(admin.accessToken, QueueName.ThumbnailGeneration); const assetAfter = await utils.getAssetInfo(admin.accessToken, id); expect(assetAfter.thumbhash).not.toBeNull(); @@ -193,26 +193,26 @@ describe('/jobs', () => { assetData: { bytes: await readFile(path), filename: basename(path) }, }); - await utils.waitForQueueFinish(admin.accessToken, JobName.MetadataExtraction); - await utils.waitForQueueFinish(admin.accessToken, JobName.ThumbnailGeneration); + await utils.waitForQueueFinish(admin.accessToken, QueueName.MetadataExtraction); + await utils.waitForQueueFinish(admin.accessToken, QueueName.ThumbnailGeneration); const assetBefore = await utils.getAssetInfo(admin.accessToken, id); cpSync(`${testAssetDir}/albums/nature/notocactus_minimus.jpg`, path); - await utils.jobCommand(admin.accessToken, JobName.ThumbnailGeneration, { - command: JobCommand.Resume, + await utils.queueCommand(admin.accessToken, QueueName.ThumbnailGeneration, { + command: QueueCommand.Resume, force: false, }); // This runs the missing thumbnail job - await utils.jobCommand(admin.accessToken, JobName.ThumbnailGeneration, { - command: JobCommand.Start, + await utils.queueCommand(admin.accessToken, QueueName.ThumbnailGeneration, { + command: QueueCommand.Start, force: false, }); - await utils.waitForQueueFinish(admin.accessToken, JobName.MetadataExtraction); - await utils.waitForQueueFinish(admin.accessToken, JobName.ThumbnailGeneration); + await utils.waitForQueueFinish(admin.accessToken, QueueName.MetadataExtraction); + await utils.waitForQueueFinish(admin.accessToken, QueueName.ThumbnailGeneration); const assetAfter = await utils.getAssetInfo(admin.accessToken, id); diff --git a/e2e/src/api/specs/user-admin.e2e-spec.ts b/e2e/src/api/specs/user-admin.e2e-spec.ts index 2d6e08b5f..793c508a3 100644 --- a/e2e/src/api/specs/user-admin.e2e-spec.ts +++ b/e2e/src/api/specs/user-admin.e2e-spec.ts @@ -1,6 +1,6 @@ import { - JobName, LoginResponseDto, + QueueName, createStack, deleteUserAdmin, getMyUser, @@ -328,7 +328,7 @@ describe('/admin/users', () => { { headers: asBearerAuth(user.accessToken) }, ); - await utils.waitForQueueFinish(admin.accessToken, JobName.BackgroundTask); + await utils.waitForQueueFinish(admin.accessToken, QueueName.BackgroundTask); const { status, body } = await request(app) .delete(`/admin/users/${user.userId}`) diff --git a/e2e/src/utils.ts b/e2e/src/utils.ts index b33d6cb19..8f34bbe40 100644 --- a/e2e/src/utils.ts +++ b/e2e/src/utils.ts @@ -1,5 +1,4 @@ import { - AllJobStatusResponseDto, AssetMediaCreateDto, AssetMediaResponseDto, AssetResponseDto, @@ -7,11 +6,12 @@ import { CheckExistingAssetsDto, CreateAlbumDto, CreateLibraryDto, - JobCommandDto, - JobName, MetadataSearchDto, Permission, PersonCreateDto, + QueueCommandDto, + QueueName, + QueuesResponseDto, SharedLinkCreateDto, UpdateLibraryDto, UserAdminCreateDto, @@ -27,14 +27,14 @@ import { createStack, createUserAdmin, deleteAssets, - getAllJobsStatus, getAssetInfo, getConfig, getConfigDefaults, + getQueuesLegacy, login, + runQueueCommandLegacy, scanLibrary, searchAssets, - sendJobCommand, setBaseUrl, signUpAdmin, tagAssets, @@ -477,8 +477,8 @@ export const utils = { tagAssets: (accessToken: string, tagId: string, assetIds: string[]) => tagAssets({ id: tagId, bulkIdsDto: { ids: assetIds } }, { headers: asBearerAuth(accessToken) }), - jobCommand: async (accessToken: string, jobName: JobName, jobCommandDto: JobCommandDto) => - sendJobCommand({ id: jobName, jobCommandDto }, { headers: asBearerAuth(accessToken) }), + queueCommand: async (accessToken: string, name: QueueName, queueCommandDto: QueueCommandDto) => + runQueueCommandLegacy({ name, queueCommandDto }, { headers: asBearerAuth(accessToken) }), setAuthCookies: async (context: BrowserContext, accessToken: string, domain = '127.0.0.1') => await context.addCookies([ @@ -524,13 +524,13 @@ export const utils = { await updateConfig({ systemConfigDto: defaultConfig }, { headers: asBearerAuth(accessToken) }); }, - isQueueEmpty: async (accessToken: string, queue: keyof AllJobStatusResponseDto) => { - const queues = await getAllJobsStatus({ headers: asBearerAuth(accessToken) }); + isQueueEmpty: async (accessToken: string, queue: keyof QueuesResponseDto) => { + const queues = await getQueuesLegacy({ headers: asBearerAuth(accessToken) }); const jobCounts = queues[queue].jobCounts; return !jobCounts.active && !jobCounts.waiting; }, - waitForQueueFinish: (accessToken: string, queue: keyof AllJobStatusResponseDto, ms?: number) => { + waitForQueueFinish: (accessToken: string, queue: keyof QueuesResponseDto, ms?: number) => { // eslint-disable-next-line no-async-promise-executor return new Promise(async (resolve, reject) => { const timeout = setTimeout(() => reject(new Error('Timed out waiting for queue to empty')), ms || 10_000); diff --git a/mobile/openapi/README.md b/mobile/openapi/README.md index ff04a91de..5e93c571b 100644 Binary files a/mobile/openapi/README.md and b/mobile/openapi/README.md differ diff --git a/mobile/openapi/lib/api.dart b/mobile/openapi/lib/api.dart index d0ac141ba..c64295837 100644 Binary files a/mobile/openapi/lib/api.dart and b/mobile/openapi/lib/api.dart differ diff --git a/mobile/openapi/lib/api/jobs_api.dart b/mobile/openapi/lib/api/jobs_api.dart index e783f93c7..906dce6d3 100644 Binary files a/mobile/openapi/lib/api/jobs_api.dart and b/mobile/openapi/lib/api/jobs_api.dart differ diff --git a/mobile/openapi/lib/api_client.dart b/mobile/openapi/lib/api_client.dart index cf6784fb8..373a4b9d8 100644 Binary files a/mobile/openapi/lib/api_client.dart and b/mobile/openapi/lib/api_client.dart differ diff --git a/mobile/openapi/lib/api_helper.dart b/mobile/openapi/lib/api_helper.dart index 1d197a8f9..5c21009a0 100644 Binary files a/mobile/openapi/lib/api_helper.dart and b/mobile/openapi/lib/api_helper.dart differ diff --git a/mobile/openapi/lib/model/job_command.dart b/mobile/openapi/lib/model/job_command.dart deleted file mode 100644 index 46ca7db68..000000000 Binary files a/mobile/openapi/lib/model/job_command.dart and /dev/null differ diff --git a/mobile/openapi/lib/model/job_name.dart b/mobile/openapi/lib/model/job_name.dart deleted file mode 100644 index bbb911110..000000000 Binary files a/mobile/openapi/lib/model/job_name.dart and /dev/null differ diff --git a/mobile/openapi/lib/model/queue_command.dart b/mobile/openapi/lib/model/queue_command.dart new file mode 100644 index 000000000..f03ec6ecc Binary files /dev/null and b/mobile/openapi/lib/model/queue_command.dart differ diff --git a/mobile/openapi/lib/model/job_command_dto.dart b/mobile/openapi/lib/model/queue_command_dto.dart similarity index 66% rename from mobile/openapi/lib/model/job_command_dto.dart rename to mobile/openapi/lib/model/queue_command_dto.dart index 32274037f..ded848c12 100644 Binary files a/mobile/openapi/lib/model/job_command_dto.dart and b/mobile/openapi/lib/model/queue_command_dto.dart differ diff --git a/mobile/openapi/lib/model/queue_name.dart b/mobile/openapi/lib/model/queue_name.dart new file mode 100644 index 000000000..7b8214e20 Binary files /dev/null and b/mobile/openapi/lib/model/queue_name.dart differ diff --git a/mobile/openapi/lib/model/job_status_dto.dart b/mobile/openapi/lib/model/queue_response_dto.dart similarity index 62% rename from mobile/openapi/lib/model/job_status_dto.dart rename to mobile/openapi/lib/model/queue_response_dto.dart index 18fab8dfb..b20449f72 100644 Binary files a/mobile/openapi/lib/model/job_status_dto.dart and b/mobile/openapi/lib/model/queue_response_dto.dart differ diff --git a/mobile/openapi/lib/model/job_counts_dto.dart b/mobile/openapi/lib/model/queue_statistics_dto.dart similarity index 70% rename from mobile/openapi/lib/model/job_counts_dto.dart rename to mobile/openapi/lib/model/queue_statistics_dto.dart index afc90d108..c27c4a589 100644 Binary files a/mobile/openapi/lib/model/job_counts_dto.dart and b/mobile/openapi/lib/model/queue_statistics_dto.dart differ diff --git a/mobile/openapi/lib/model/all_job_status_response_dto.dart b/mobile/openapi/lib/model/queues_response_dto.dart similarity index 57% rename from mobile/openapi/lib/model/all_job_status_response_dto.dart rename to mobile/openapi/lib/model/queues_response_dto.dart index 291bec439..b20f8c3c0 100644 Binary files a/mobile/openapi/lib/model/all_job_status_response_dto.dart and b/mobile/openapi/lib/model/queues_response_dto.dart differ diff --git a/open-api/immich-openapi-specs.json b/open-api/immich-openapi-specs.json index e4fc5ddd9..7ae48eaf8 100644 --- a/open-api/immich-openapi-specs.json +++ b/open-api/immich-openapi-specs.json @@ -4836,14 +4836,14 @@ "/jobs": { "get": { "description": "Retrieve the counts of the current queue, as well as the current status.", - "operationId": "getAllJobsStatus", + "operationId": "getQueuesLegacy", "parameters": [], "responses": { "200": { "content": { "application/json": { "schema": { - "$ref": "#/components/schemas/AllJobStatusResponseDto" + "$ref": "#/components/schemas/QueuesResponseDto" } } }, @@ -4936,17 +4936,17 @@ "x-immich-state": "Stable" } }, - "/jobs/{id}": { + "/jobs/{name}": { "put": { "description": "Queue all assets for a specific job type. Defaults to only queueing assets that have not yet been processed, but the force command can be used to re-process all assets.", - "operationId": "sendJobCommand", + "operationId": "runQueueCommandLegacy", "parameters": [ { - "name": "id", + "name": "name", "required": true, "in": "path", "schema": { - "$ref": "#/components/schemas/JobName" + "$ref": "#/components/schemas/QueueName" } } ], @@ -4954,7 +4954,7 @@ "content": { "application/json": { "schema": { - "$ref": "#/components/schemas/JobCommandDto" + "$ref": "#/components/schemas/QueueCommandDto" } } }, @@ -4965,7 +4965,7 @@ "content": { "application/json": { "schema": { - "$ref": "#/components/schemas/JobStatusDto" + "$ref": "#/components/schemas/QueueResponseDto" } } }, @@ -14084,77 +14084,6 @@ }, "type": "object" }, - "AllJobStatusResponseDto": { - "properties": { - "backgroundTask": { - "$ref": "#/components/schemas/JobStatusDto" - }, - "backupDatabase": { - "$ref": "#/components/schemas/JobStatusDto" - }, - "duplicateDetection": { - "$ref": "#/components/schemas/JobStatusDto" - }, - "faceDetection": { - "$ref": "#/components/schemas/JobStatusDto" - }, - "facialRecognition": { - "$ref": "#/components/schemas/JobStatusDto" - }, - "library": { - "$ref": "#/components/schemas/JobStatusDto" - }, - "metadataExtraction": { - "$ref": "#/components/schemas/JobStatusDto" - }, - "migration": { - "$ref": "#/components/schemas/JobStatusDto" - }, - "notifications": { - "$ref": "#/components/schemas/JobStatusDto" - }, - "ocr": { - "$ref": "#/components/schemas/JobStatusDto" - }, - "search": { - "$ref": "#/components/schemas/JobStatusDto" - }, - "sidecar": { - "$ref": "#/components/schemas/JobStatusDto" - }, - "smartSearch": { - "$ref": "#/components/schemas/JobStatusDto" - }, - "storageTemplateMigration": { - "$ref": "#/components/schemas/JobStatusDto" - }, - "thumbnailGeneration": { - "$ref": "#/components/schemas/JobStatusDto" - }, - "videoConversion": { - "$ref": "#/components/schemas/JobStatusDto" - } - }, - "required": [ - "backgroundTask", - "backupDatabase", - "duplicateDetection", - "faceDetection", - "facialRecognition", - "library", - "metadataExtraction", - "migration", - "notifications", - "ocr", - "search", - "sidecar", - "smartSearch", - "storageTemplateMigration", - "thumbnailGeneration", - "videoConversion" - ], - "type": "object" - }, "AssetBulkDeleteDto": { "properties": { "force": { @@ -15866,65 +15795,6 @@ ], "type": "string" }, - "JobCommand": { - "enum": [ - "start", - "pause", - "resume", - "empty", - "clear-failed" - ], - "type": "string" - }, - "JobCommandDto": { - "properties": { - "command": { - "allOf": [ - { - "$ref": "#/components/schemas/JobCommand" - } - ] - }, - "force": { - "type": "boolean" - } - }, - "required": [ - "command" - ], - "type": "object" - }, - "JobCountsDto": { - "properties": { - "active": { - "type": "integer" - }, - "completed": { - "type": "integer" - }, - "delayed": { - "type": "integer" - }, - "failed": { - "type": "integer" - }, - "paused": { - "type": "integer" - }, - "waiting": { - "type": "integer" - } - }, - "required": [ - "active", - "completed", - "delayed", - "failed", - "paused", - "waiting" - ], - "type": "object" - }, "JobCreateDto": { "properties": { "name": { @@ -15940,27 +15810,6 @@ ], "type": "object" }, - "JobName": { - "enum": [ - "thumbnailGeneration", - "metadataExtraction", - "videoConversion", - "faceDetection", - "facialRecognition", - "smartSearch", - "duplicateDetection", - "backgroundTask", - "storageTemplateMigration", - "migration", - "search", - "sidecar", - "library", - "notifications", - "backupDatabase", - "ocr" - ], - "type": "string" - }, "JobSettingsDto": { "properties": { "concurrency": { @@ -15973,21 +15822,6 @@ ], "type": "object" }, - "JobStatusDto": { - "properties": { - "jobCounts": { - "$ref": "#/components/schemas/JobCountsDto" - }, - "queueStatus": { - "$ref": "#/components/schemas/QueueStatusDto" - } - }, - "required": [ - "jobCounts", - "queueStatus" - ], - "type": "object" - }, "LibraryResponseDto": { "properties": { "assetCount": { @@ -17559,6 +17393,101 @@ }, "type": "object" }, + "QueueCommand": { + "enum": [ + "start", + "pause", + "resume", + "empty", + "clear-failed" + ], + "type": "string" + }, + "QueueCommandDto": { + "properties": { + "command": { + "allOf": [ + { + "$ref": "#/components/schemas/QueueCommand" + } + ] + }, + "force": { + "type": "boolean" + } + }, + "required": [ + "command" + ], + "type": "object" + }, + "QueueName": { + "enum": [ + "thumbnailGeneration", + "metadataExtraction", + "videoConversion", + "faceDetection", + "facialRecognition", + "smartSearch", + "duplicateDetection", + "backgroundTask", + "storageTemplateMigration", + "migration", + "search", + "sidecar", + "library", + "notifications", + "backupDatabase", + "ocr" + ], + "type": "string" + }, + "QueueResponseDto": { + "properties": { + "jobCounts": { + "$ref": "#/components/schemas/QueueStatisticsDto" + }, + "queueStatus": { + "$ref": "#/components/schemas/QueueStatusDto" + } + }, + "required": [ + "jobCounts", + "queueStatus" + ], + "type": "object" + }, + "QueueStatisticsDto": { + "properties": { + "active": { + "type": "integer" + }, + "completed": { + "type": "integer" + }, + "delayed": { + "type": "integer" + }, + "failed": { + "type": "integer" + }, + "paused": { + "type": "integer" + }, + "waiting": { + "type": "integer" + } + }, + "required": [ + "active", + "completed", + "delayed", + "failed", + "paused", + "waiting" + ], + "type": "object" + }, "QueueStatusDto": { "properties": { "isActive": { @@ -17574,6 +17503,77 @@ ], "type": "object" }, + "QueuesResponseDto": { + "properties": { + "backgroundTask": { + "$ref": "#/components/schemas/QueueResponseDto" + }, + "backupDatabase": { + "$ref": "#/components/schemas/QueueResponseDto" + }, + "duplicateDetection": { + "$ref": "#/components/schemas/QueueResponseDto" + }, + "faceDetection": { + "$ref": "#/components/schemas/QueueResponseDto" + }, + "facialRecognition": { + "$ref": "#/components/schemas/QueueResponseDto" + }, + "library": { + "$ref": "#/components/schemas/QueueResponseDto" + }, + "metadataExtraction": { + "$ref": "#/components/schemas/QueueResponseDto" + }, + "migration": { + "$ref": "#/components/schemas/QueueResponseDto" + }, + "notifications": { + "$ref": "#/components/schemas/QueueResponseDto" + }, + "ocr": { + "$ref": "#/components/schemas/QueueResponseDto" + }, + "search": { + "$ref": "#/components/schemas/QueueResponseDto" + }, + "sidecar": { + "$ref": "#/components/schemas/QueueResponseDto" + }, + "smartSearch": { + "$ref": "#/components/schemas/QueueResponseDto" + }, + "storageTemplateMigration": { + "$ref": "#/components/schemas/QueueResponseDto" + }, + "thumbnailGeneration": { + "$ref": "#/components/schemas/QueueResponseDto" + }, + "videoConversion": { + "$ref": "#/components/schemas/QueueResponseDto" + } + }, + "required": [ + "backgroundTask", + "backupDatabase", + "duplicateDetection", + "faceDetection", + "facialRecognition", + "library", + "metadataExtraction", + "migration", + "notifications", + "ocr", + "search", + "sidecar", + "smartSearch", + "storageTemplateMigration", + "thumbnailGeneration", + "videoConversion" + ], + "type": "object" + }, "RandomSearchDto": { "properties": { "albumIds": { diff --git a/open-api/typescript-sdk/src/fetch-client.ts b/open-api/typescript-sdk/src/fetch-client.ts index 9aec8b6f8..00a6eea95 100644 --- a/open-api/typescript-sdk/src/fetch-client.ts +++ b/open-api/typescript-sdk/src/fetch-client.ts @@ -699,7 +699,7 @@ export type AssetFaceDeleteDto = { export type FaceDto = { id: string; }; -export type JobCountsDto = { +export type QueueStatisticsDto = { active: number; completed: number; delayed: number; @@ -711,33 +711,33 @@ export type QueueStatusDto = { isActive: boolean; isPaused: boolean; }; -export type JobStatusDto = { - jobCounts: JobCountsDto; +export type QueueResponseDto = { + jobCounts: QueueStatisticsDto; queueStatus: QueueStatusDto; }; -export type AllJobStatusResponseDto = { - backgroundTask: JobStatusDto; - backupDatabase: JobStatusDto; - duplicateDetection: JobStatusDto; - faceDetection: JobStatusDto; - facialRecognition: JobStatusDto; - library: JobStatusDto; - metadataExtraction: JobStatusDto; - migration: JobStatusDto; - notifications: JobStatusDto; - ocr: JobStatusDto; - search: JobStatusDto; - sidecar: JobStatusDto; - smartSearch: JobStatusDto; - storageTemplateMigration: JobStatusDto; - thumbnailGeneration: JobStatusDto; - videoConversion: JobStatusDto; +export type QueuesResponseDto = { + backgroundTask: QueueResponseDto; + backupDatabase: QueueResponseDto; + duplicateDetection: QueueResponseDto; + faceDetection: QueueResponseDto; + facialRecognition: QueueResponseDto; + library: QueueResponseDto; + metadataExtraction: QueueResponseDto; + migration: QueueResponseDto; + notifications: QueueResponseDto; + ocr: QueueResponseDto; + search: QueueResponseDto; + sidecar: QueueResponseDto; + smartSearch: QueueResponseDto; + storageTemplateMigration: QueueResponseDto; + thumbnailGeneration: QueueResponseDto; + videoConversion: QueueResponseDto; }; export type JobCreateDto = { name: ManualJobName; }; -export type JobCommandDto = { - command: JobCommand; +export type QueueCommandDto = { + command: QueueCommand; force?: boolean; }; export type LibraryResponseDto = { @@ -2805,10 +2805,10 @@ export function reassignFacesById({ id, faceDto }: { /** * Retrieve queue counts and status */ -export function getAllJobsStatus(opts?: Oazapfts.RequestOpts) { +export function getQueuesLegacy(opts?: Oazapfts.RequestOpts) { return oazapfts.ok(oazapfts.fetchJson<{ status: 200; - data: AllJobStatusResponseDto; + data: QueuesResponseDto; }>("/jobs", { ...opts })); @@ -2828,17 +2828,17 @@ export function createJob({ jobCreateDto }: { /** * Run jobs */ -export function sendJobCommand({ id, jobCommandDto }: { - id: JobName; - jobCommandDto: JobCommandDto; +export function runQueueCommandLegacy({ name, queueCommandDto }: { + name: QueueName; + queueCommandDto: QueueCommandDto; }, opts?: Oazapfts.RequestOpts) { return oazapfts.ok(oazapfts.fetchJson<{ status: 200; - data: JobStatusDto; - }>(`/jobs/${encodeURIComponent(id)}`, oazapfts.json({ + data: QueueResponseDto; + }>(`/jobs/${encodeURIComponent(name)}`, oazapfts.json({ ...opts, method: "PUT", - body: jobCommandDto + body: queueCommandDto }))); } /** @@ -5067,7 +5067,7 @@ export enum ManualJobName { MemoryCreate = "memory-create", BackupDatabase = "backup-database" } -export enum JobName { +export enum QueueName { ThumbnailGeneration = "thumbnailGeneration", MetadataExtraction = "metadataExtraction", VideoConversion = "videoConversion", @@ -5085,7 +5085,7 @@ export enum JobName { BackupDatabase = "backupDatabase", Ocr = "ocr" } -export enum JobCommand { +export enum QueueCommand { Start = "start", Pause = "pause", Resume = "resume", diff --git a/server/src/app.module.ts b/server/src/app.module.ts index 807944132..f80a47bb7 100644 --- a/server/src/app.module.ts +++ b/server/src/app.module.ts @@ -23,7 +23,7 @@ import { WebsocketRepository } from 'src/repositories/websocket.repository'; import { services } from 'src/services'; import { AuthService } from 'src/services/auth.service'; import { CliService } from 'src/services/cli.service'; -import { JobService } from 'src/services/job.service'; +import { QueueService } from 'src/services/queue.service'; import { getKyselyConfig } from 'src/utils/database'; const common = [...repositories, ...services, GlobalExceptionFilter]; @@ -52,11 +52,11 @@ class BaseModule implements OnModuleInit, OnModuleDestroy { constructor( @Inject(IWorker) private worker: ImmichWorker, logger: LoggingRepository, - private eventRepository: EventRepository, - private websocketRepository: WebsocketRepository, - private jobService: JobService, - private telemetryRepository: TelemetryRepository, private authService: AuthService, + private eventRepository: EventRepository, + private queueService: QueueService, + private telemetryRepository: TelemetryRepository, + private websocketRepository: WebsocketRepository, ) { logger.setAppName(this.worker); } @@ -64,7 +64,7 @@ class BaseModule implements OnModuleInit, OnModuleDestroy { async onModuleInit() { this.telemetryRepository.setup({ repositories }); - this.jobService.setServices(services); + this.queueService.setServices(services); this.websocketRepository.setAuthFn(async (client) => this.authService.authenticate({ diff --git a/server/src/controllers/job.controller.ts b/server/src/controllers/job.controller.ts index 34c6bdc27..977f1e0f1 100644 --- a/server/src/controllers/job.controller.ts +++ b/server/src/controllers/job.controller.ts @@ -1,15 +1,20 @@ import { Body, Controller, Get, HttpCode, HttpStatus, Param, Post, Put } from '@nestjs/common'; import { ApiTags } from '@nestjs/swagger'; import { Endpoint, HistoryBuilder } from 'src/decorators'; -import { AllJobStatusResponseDto, JobCommandDto, JobCreateDto, JobIdParamDto, JobStatusDto } from 'src/dtos/job.dto'; +import { JobCreateDto } from 'src/dtos/job.dto'; +import { QueueCommandDto, QueueNameParamDto, QueueResponseDto, QueuesResponseDto } from 'src/dtos/queue.dto'; import { ApiTag, Permission } from 'src/enum'; import { Authenticated } from 'src/middleware/auth.guard'; import { JobService } from 'src/services/job.service'; +import { QueueService } from 'src/services/queue.service'; @ApiTags(ApiTag.Jobs) @Controller('jobs') export class JobController { - constructor(private service: JobService) {} + constructor( + private service: JobService, + private queueService: QueueService, + ) {} @Get() @Authenticated({ permission: Permission.JobRead, admin: true }) @@ -18,8 +23,8 @@ export class JobController { description: 'Retrieve the counts of the current queue, as well as the current status.', history: new HistoryBuilder().added('v1').beta('v1').stable('v2'), }) - getAllJobsStatus(): Promise { - return this.service.getAllJobsStatus(); + getQueuesLegacy(): Promise { + return this.queueService.getAll(); } @Post() @@ -35,7 +40,7 @@ export class JobController { return this.service.create(dto); } - @Put(':id') + @Put(':name') @Authenticated({ permission: Permission.JobCreate, admin: true }) @Endpoint({ summary: 'Run jobs', @@ -43,7 +48,7 @@ export class JobController { 'Queue all assets for a specific job type. Defaults to only queueing assets that have not yet been processed, but the force command can be used to re-process all assets.', history: new HistoryBuilder().added('v1').beta('v1').stable('v2'), }) - sendJobCommand(@Param() { id }: JobIdParamDto, @Body() dto: JobCommandDto): Promise { - return this.service.handleCommand(id, dto); + runQueueCommandLegacy(@Param() { name }: QueueNameParamDto, @Body() dto: QueueCommandDto): Promise { + return this.queueService.runCommand(name, dto); } } diff --git a/server/src/dtos/job.dto.ts b/server/src/dtos/job.dto.ts index 5daaeacdd..794af6e5e 100644 --- a/server/src/dtos/job.dto.ts +++ b/server/src/dtos/job.dto.ts @@ -1,99 +1,7 @@ -import { ApiProperty } from '@nestjs/swagger'; -import { JobCommand, ManualJobName, QueueName } from 'src/enum'; -import { ValidateBoolean, ValidateEnum } from 'src/validation'; - -export class JobIdParamDto { - @ValidateEnum({ enum: QueueName, name: 'JobName' }) - id!: QueueName; -} - -export class JobCommandDto { - @ValidateEnum({ enum: JobCommand, name: 'JobCommand' }) - command!: JobCommand; - - @ValidateBoolean({ optional: true }) - force?: boolean; // TODO: this uses undefined as a third state, which should be refactored to be more explicit -} +import { ManualJobName } from 'src/enum'; +import { ValidateEnum } from 'src/validation'; export class JobCreateDto { @ValidateEnum({ enum: ManualJobName, name: 'ManualJobName' }) name!: ManualJobName; } - -export class JobCountsDto { - @ApiProperty({ type: 'integer' }) - active!: number; - @ApiProperty({ type: 'integer' }) - completed!: number; - @ApiProperty({ type: 'integer' }) - failed!: number; - @ApiProperty({ type: 'integer' }) - delayed!: number; - @ApiProperty({ type: 'integer' }) - waiting!: number; - @ApiProperty({ type: 'integer' }) - paused!: number; -} - -export class QueueStatusDto { - isActive!: boolean; - isPaused!: boolean; -} - -export class JobStatusDto { - @ApiProperty({ type: JobCountsDto }) - jobCounts!: JobCountsDto; - - @ApiProperty({ type: QueueStatusDto }) - queueStatus!: QueueStatusDto; -} - -export class AllJobStatusResponseDto implements Record { - @ApiProperty({ type: JobStatusDto }) - [QueueName.ThumbnailGeneration]!: JobStatusDto; - - @ApiProperty({ type: JobStatusDto }) - [QueueName.MetadataExtraction]!: JobStatusDto; - - @ApiProperty({ type: JobStatusDto }) - [QueueName.VideoConversion]!: JobStatusDto; - - @ApiProperty({ type: JobStatusDto }) - [QueueName.SmartSearch]!: JobStatusDto; - - @ApiProperty({ type: JobStatusDto }) - [QueueName.StorageTemplateMigration]!: JobStatusDto; - - @ApiProperty({ type: JobStatusDto }) - [QueueName.Migration]!: JobStatusDto; - - @ApiProperty({ type: JobStatusDto }) - [QueueName.BackgroundTask]!: JobStatusDto; - - @ApiProperty({ type: JobStatusDto }) - [QueueName.Search]!: JobStatusDto; - - @ApiProperty({ type: JobStatusDto }) - [QueueName.DuplicateDetection]!: JobStatusDto; - - @ApiProperty({ type: JobStatusDto }) - [QueueName.FaceDetection]!: JobStatusDto; - - @ApiProperty({ type: JobStatusDto }) - [QueueName.FacialRecognition]!: JobStatusDto; - - @ApiProperty({ type: JobStatusDto }) - [QueueName.Sidecar]!: JobStatusDto; - - @ApiProperty({ type: JobStatusDto }) - [QueueName.Library]!: JobStatusDto; - - @ApiProperty({ type: JobStatusDto }) - [QueueName.Notification]!: JobStatusDto; - - @ApiProperty({ type: JobStatusDto }) - [QueueName.BackupDatabase]!: JobStatusDto; - - @ApiProperty({ type: JobStatusDto }) - [QueueName.Ocr]!: JobStatusDto; -} diff --git a/server/src/dtos/queue.dto.ts b/server/src/dtos/queue.dto.ts new file mode 100644 index 000000000..1492e014d --- /dev/null +++ b/server/src/dtos/queue.dto.ts @@ -0,0 +1,94 @@ +import { ApiProperty } from '@nestjs/swagger'; +import { QueueCommand, QueueName } from 'src/enum'; +import { ValidateBoolean, ValidateEnum } from 'src/validation'; + +export class QueueNameParamDto { + @ValidateEnum({ enum: QueueName, name: 'QueueName' }) + name!: QueueName; +} + +export class QueueCommandDto { + @ValidateEnum({ enum: QueueCommand, name: 'QueueCommand' }) + command!: QueueCommand; + + @ValidateBoolean({ optional: true }) + force?: boolean; // TODO: this uses undefined as a third state, which should be refactored to be more explicit +} + +export class QueueStatisticsDto { + @ApiProperty({ type: 'integer' }) + active!: number; + @ApiProperty({ type: 'integer' }) + completed!: number; + @ApiProperty({ type: 'integer' }) + failed!: number; + @ApiProperty({ type: 'integer' }) + delayed!: number; + @ApiProperty({ type: 'integer' }) + waiting!: number; + @ApiProperty({ type: 'integer' }) + paused!: number; +} + +export class QueueStatusDto { + isActive!: boolean; + isPaused!: boolean; +} + +export class QueueResponseDto { + @ApiProperty({ type: QueueStatisticsDto }) + jobCounts!: QueueStatisticsDto; + + @ApiProperty({ type: QueueStatusDto }) + queueStatus!: QueueStatusDto; +} + +export class QueuesResponseDto implements Record { + @ApiProperty({ type: QueueResponseDto }) + [QueueName.ThumbnailGeneration]!: QueueResponseDto; + + @ApiProperty({ type: QueueResponseDto }) + [QueueName.MetadataExtraction]!: QueueResponseDto; + + @ApiProperty({ type: QueueResponseDto }) + [QueueName.VideoConversion]!: QueueResponseDto; + + @ApiProperty({ type: QueueResponseDto }) + [QueueName.SmartSearch]!: QueueResponseDto; + + @ApiProperty({ type: QueueResponseDto }) + [QueueName.StorageTemplateMigration]!: QueueResponseDto; + + @ApiProperty({ type: QueueResponseDto }) + [QueueName.Migration]!: QueueResponseDto; + + @ApiProperty({ type: QueueResponseDto }) + [QueueName.BackgroundTask]!: QueueResponseDto; + + @ApiProperty({ type: QueueResponseDto }) + [QueueName.Search]!: QueueResponseDto; + + @ApiProperty({ type: QueueResponseDto }) + [QueueName.DuplicateDetection]!: QueueResponseDto; + + @ApiProperty({ type: QueueResponseDto }) + [QueueName.FaceDetection]!: QueueResponseDto; + + @ApiProperty({ type: QueueResponseDto }) + [QueueName.FacialRecognition]!: QueueResponseDto; + + @ApiProperty({ type: QueueResponseDto }) + [QueueName.Sidecar]!: QueueResponseDto; + + @ApiProperty({ type: QueueResponseDto }) + [QueueName.Library]!: QueueResponseDto; + + @ApiProperty({ type: QueueResponseDto }) + [QueueName.Notification]!: QueueResponseDto; + + @ApiProperty({ type: QueueResponseDto }) + [QueueName.BackupDatabase]!: QueueResponseDto; + + @ApiProperty({ type: QueueResponseDto }) + [QueueName.Ocr]!: QueueResponseDto; +} diff --git a/server/src/enum.ts b/server/src/enum.ts index c706c1da7..f3814863b 100644 --- a/server/src/enum.ts +++ b/server/src/enum.ts @@ -603,7 +603,7 @@ export enum JobName { Ocr = 'Ocr', } -export enum JobCommand { +export enum QueueCommand { Start = 'start', Pause = 'pause', Resume = 'resume', diff --git a/server/src/services/api.service.ts b/server/src/services/api.service.ts index ee9b0e622..0ec2a65f9 100644 --- a/server/src/services/api.service.ts +++ b/server/src/services/api.service.ts @@ -7,7 +7,6 @@ import { ONE_HOUR } from 'src/constants'; import { ConfigRepository } from 'src/repositories/config.repository'; import { LoggingRepository } from 'src/repositories/logging.repository'; import { AuthService } from 'src/services/auth.service'; -import { JobService } from 'src/services/job.service'; import { SharedLinkService } from 'src/services/shared-link.service'; import { VersionService } from 'src/services/version.service'; import { OpenGraphTags } from 'src/utils/misc'; @@ -40,7 +39,6 @@ const render = (index: string, meta: OpenGraphTags) => { export class ApiService { constructor( private authService: AuthService, - private jobService: JobService, private sharedLinkService: SharedLinkService, private versionService: VersionService, private configRepository: ConfigRepository, diff --git a/server/src/services/index.ts b/server/src/services/index.ts index 9a8b0fb2b..8862a5b37 100644 --- a/server/src/services/index.ts +++ b/server/src/services/index.ts @@ -23,6 +23,7 @@ import { NotificationService } from 'src/services/notification.service'; import { OcrService } from 'src/services/ocr.service'; import { PartnerService } from 'src/services/partner.service'; import { PersonService } from 'src/services/person.service'; +import { QueueService } from 'src/services/queue.service'; import { SearchService } from 'src/services/search.service'; import { ServerService } from 'src/services/server.service'; import { SessionService } from 'src/services/session.service'; @@ -69,6 +70,7 @@ export const services = [ OcrService, PartnerService, PersonService, + QueueService, SearchService, ServerService, SessionService, diff --git a/server/src/services/job.service.spec.ts b/server/src/services/job.service.spec.ts index 7a300ae7a..c23b4f05d 100644 --- a/server/src/services/job.service.spec.ts +++ b/server/src/services/job.service.spec.ts @@ -1,6 +1,4 @@ -import { BadRequestException } from '@nestjs/common'; -import { defaults, SystemConfig } from 'src/config'; -import { ImmichWorker, JobCommand, JobName, JobStatus, QueueName } from 'src/enum'; +import { ImmichWorker, JobName, JobStatus, QueueName } from 'src/enum'; import { JobService } from 'src/services/job.service'; import { JobItem } from 'src/types'; import { assetStub } from 'test/fixtures/asset.stub'; @@ -20,209 +18,6 @@ describe(JobService.name, () => { expect(sut).toBeDefined(); }); - describe('onConfigUpdate', () => { - it('should update concurrency', () => { - sut.onConfigUpdate({ newConfig: defaults, oldConfig: {} as SystemConfig }); - - expect(mocks.job.setConcurrency).toHaveBeenCalledTimes(16); - expect(mocks.job.setConcurrency).toHaveBeenNthCalledWith(5, QueueName.FacialRecognition, 1); - expect(mocks.job.setConcurrency).toHaveBeenNthCalledWith(7, QueueName.DuplicateDetection, 1); - expect(mocks.job.setConcurrency).toHaveBeenNthCalledWith(8, QueueName.BackgroundTask, 5); - expect(mocks.job.setConcurrency).toHaveBeenNthCalledWith(9, QueueName.StorageTemplateMigration, 1); - }); - }); - - describe('handleNightlyJobs', () => { - it('should run the scheduled jobs', async () => { - await sut.handleNightlyJobs(); - - expect(mocks.job.queueAll).toHaveBeenCalledWith([ - { name: JobName.AssetDeleteCheck }, - { name: JobName.UserDeleteCheck }, - { name: JobName.PersonCleanup }, - { name: JobName.MemoryCleanup }, - { name: JobName.SessionCleanup }, - { name: JobName.AuditTableCleanup }, - { name: JobName.AuditLogCleanup }, - { name: JobName.MemoryGenerate }, - { name: JobName.UserSyncUsage }, - { name: JobName.AssetGenerateThumbnailsQueueAll, data: { force: false } }, - { name: JobName.FacialRecognitionQueueAll, data: { force: false, nightly: true } }, - ]); - }); - }); - - describe('getAllJobStatus', () => { - it('should get all job statuses', async () => { - mocks.job.getJobCounts.mockResolvedValue({ - active: 1, - completed: 1, - failed: 1, - delayed: 1, - waiting: 1, - paused: 1, - }); - mocks.job.getQueueStatus.mockResolvedValue({ - isActive: true, - isPaused: true, - }); - - const expectedJobStatus = { - jobCounts: { - active: 1, - completed: 1, - delayed: 1, - failed: 1, - waiting: 1, - paused: 1, - }, - queueStatus: { - isActive: true, - isPaused: true, - }, - }; - - await expect(sut.getAllJobsStatus()).resolves.toEqual({ - [QueueName.BackgroundTask]: expectedJobStatus, - [QueueName.DuplicateDetection]: expectedJobStatus, - [QueueName.SmartSearch]: expectedJobStatus, - [QueueName.MetadataExtraction]: expectedJobStatus, - [QueueName.Search]: expectedJobStatus, - [QueueName.StorageTemplateMigration]: expectedJobStatus, - [QueueName.Migration]: expectedJobStatus, - [QueueName.ThumbnailGeneration]: expectedJobStatus, - [QueueName.VideoConversion]: expectedJobStatus, - [QueueName.FaceDetection]: expectedJobStatus, - [QueueName.FacialRecognition]: expectedJobStatus, - [QueueName.Sidecar]: expectedJobStatus, - [QueueName.Library]: expectedJobStatus, - [QueueName.Notification]: expectedJobStatus, - [QueueName.BackupDatabase]: expectedJobStatus, - [QueueName.Ocr]: expectedJobStatus, - }); - }); - }); - - describe('handleCommand', () => { - it('should handle a pause command', async () => { - await sut.handleCommand(QueueName.MetadataExtraction, { command: JobCommand.Pause, force: false }); - - expect(mocks.job.pause).toHaveBeenCalledWith(QueueName.MetadataExtraction); - }); - - it('should handle a resume command', async () => { - await sut.handleCommand(QueueName.MetadataExtraction, { command: JobCommand.Resume, force: false }); - - expect(mocks.job.resume).toHaveBeenCalledWith(QueueName.MetadataExtraction); - }); - - it('should handle an empty command', async () => { - await sut.handleCommand(QueueName.MetadataExtraction, { command: JobCommand.Empty, force: false }); - - expect(mocks.job.empty).toHaveBeenCalledWith(QueueName.MetadataExtraction); - }); - - it('should not start a job that is already running', async () => { - mocks.job.getQueueStatus.mockResolvedValue({ isActive: true, isPaused: false }); - - await expect( - sut.handleCommand(QueueName.VideoConversion, { command: JobCommand.Start, force: false }), - ).rejects.toBeInstanceOf(BadRequestException); - - expect(mocks.job.queue).not.toHaveBeenCalled(); - expect(mocks.job.queueAll).not.toHaveBeenCalled(); - }); - - it('should handle a start video conversion command', async () => { - mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); - - await sut.handleCommand(QueueName.VideoConversion, { command: JobCommand.Start, force: false }); - - expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.AssetEncodeVideoQueueAll, data: { force: false } }); - }); - - it('should handle a start storage template migration command', async () => { - mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); - - await sut.handleCommand(QueueName.StorageTemplateMigration, { command: JobCommand.Start, force: false }); - - expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.StorageTemplateMigration }); - }); - - it('should handle a start smart search command', async () => { - mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); - - await sut.handleCommand(QueueName.SmartSearch, { command: JobCommand.Start, force: false }); - - expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.SmartSearchQueueAll, data: { force: false } }); - }); - - it('should handle a start metadata extraction command', async () => { - mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); - - await sut.handleCommand(QueueName.MetadataExtraction, { command: JobCommand.Start, force: false }); - - expect(mocks.job.queue).toHaveBeenCalledWith({ - name: JobName.AssetExtractMetadataQueueAll, - data: { force: false }, - }); - }); - - it('should handle a start sidecar command', async () => { - mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); - - await sut.handleCommand(QueueName.Sidecar, { command: JobCommand.Start, force: false }); - - expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.SidecarQueueAll, data: { force: false } }); - }); - - it('should handle a start thumbnail generation command', async () => { - mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); - - await sut.handleCommand(QueueName.ThumbnailGeneration, { command: JobCommand.Start, force: false }); - - expect(mocks.job.queue).toHaveBeenCalledWith({ - name: JobName.AssetGenerateThumbnailsQueueAll, - data: { force: false }, - }); - }); - - it('should handle a start face detection command', async () => { - mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); - - await sut.handleCommand(QueueName.FaceDetection, { command: JobCommand.Start, force: false }); - - expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.AssetDetectFacesQueueAll, data: { force: false } }); - }); - - it('should handle a start facial recognition command', async () => { - mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); - - await sut.handleCommand(QueueName.FacialRecognition, { command: JobCommand.Start, force: false }); - - expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.FacialRecognitionQueueAll, data: { force: false } }); - }); - - it('should handle a start backup database command', async () => { - mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); - - await sut.handleCommand(QueueName.BackupDatabase, { command: JobCommand.Start, force: false }); - - expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.DatabaseBackup, data: { force: false } }); - }); - - it('should throw a bad request when an invalid queue is used', async () => { - mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); - - await expect( - sut.handleCommand(QueueName.BackgroundTask, { command: JobCommand.Start, force: false }), - ).rejects.toBeInstanceOf(BadRequestException); - - expect(mocks.job.queue).not.toHaveBeenCalled(); - expect(mocks.job.queueAll).not.toHaveBeenCalled(); - }); - }); - describe('onJobRun', () => { it('should process a successful job', async () => { mocks.job.run.mockResolvedValue(JobStatus.Success); diff --git a/server/src/services/job.service.ts b/server/src/services/job.service.ts index c483155b7..b57a20378 100644 --- a/server/src/services/job.service.ts +++ b/server/src/services/job.service.ts @@ -1,28 +1,12 @@ import { BadRequestException, Injectable } from '@nestjs/common'; -import { ClassConstructor } from 'class-transformer'; -import { SystemConfig } from 'src/config'; import { OnEvent } from 'src/decorators'; import { mapAsset } from 'src/dtos/asset-response.dto'; -import { AllJobStatusResponseDto, JobCommandDto, JobCreateDto, JobStatusDto } from 'src/dtos/job.dto'; -import { - AssetType, - AssetVisibility, - BootstrapEventPriority, - CronJob, - DatabaseLock, - ImmichWorker, - JobCommand, - JobName, - JobStatus, - ManualJobName, - QueueCleanType, - QueueName, -} from 'src/enum'; -import { ArgOf, ArgsOf } from 'src/repositories/event.repository'; +import { JobCreateDto } from 'src/dtos/job.dto'; +import { AssetType, AssetVisibility, JobName, JobStatus, ManualJobName } from 'src/enum'; +import { ArgsOf } from 'src/repositories/event.repository'; import { BaseService } from 'src/services/base.service'; -import { ConcurrentQueueName, JobItem } from 'src/types'; +import { JobItem } from 'src/types'; import { hexOrBufferToBase64 } from 'src/utils/bytes'; -import { handlePromiseError } from 'src/utils/misc'; const asJobItem = (dto: JobCreateDto): JobItem => { switch (dto.name) { @@ -56,196 +40,12 @@ const asJobItem = (dto: JobCreateDto): JobItem => { } }; -const asNightlyTasksCron = (config: SystemConfig) => { - const [hours, minutes] = config.nightlyTasks.startTime.split(':').map(Number); - return `${minutes} ${hours} * * *`; -}; - @Injectable() export class JobService extends BaseService { - private services: ClassConstructor[] = []; - private nightlyJobsLock = false; - - @OnEvent({ name: 'ConfigInit' }) - async onConfigInit({ newConfig: config }: ArgOf<'ConfigInit'>) { - if (this.worker === ImmichWorker.Microservices) { - this.updateQueueConcurrency(config); - return; - } - - this.nightlyJobsLock = await this.databaseRepository.tryLock(DatabaseLock.NightlyJobs); - if (this.nightlyJobsLock) { - const cronExpression = asNightlyTasksCron(config); - this.logger.debug(`Scheduling nightly jobs for ${cronExpression}`); - this.cronRepository.create({ - name: CronJob.NightlyJobs, - expression: cronExpression, - start: true, - onTick: () => handlePromiseError(this.handleNightlyJobs(), this.logger), - }); - } - } - - @OnEvent({ name: 'ConfigUpdate', server: true }) - onConfigUpdate({ newConfig: config }: ArgOf<'ConfigUpdate'>) { - if (this.worker === ImmichWorker.Microservices) { - this.updateQueueConcurrency(config); - return; - } - - if (this.nightlyJobsLock) { - const cronExpression = asNightlyTasksCron(config); - this.logger.debug(`Scheduling nightly jobs for ${cronExpression}`); - this.cronRepository.update({ name: CronJob.NightlyJobs, expression: cronExpression, start: true }); - } - } - - @OnEvent({ name: 'AppBootstrap', priority: BootstrapEventPriority.JobService }) - onBootstrap() { - this.jobRepository.setup(this.services); - if (this.worker === ImmichWorker.Microservices) { - this.jobRepository.startWorkers(); - } - } - - private updateQueueConcurrency(config: SystemConfig) { - this.logger.debug(`Updating queue concurrency settings`); - for (const queueName of Object.values(QueueName)) { - let concurrency = 1; - if (this.isConcurrentQueue(queueName)) { - concurrency = config.job[queueName].concurrency; - } - this.logger.debug(`Setting ${queueName} concurrency to ${concurrency}`); - this.jobRepository.setConcurrency(queueName, concurrency); - } - } - - setServices(services: ClassConstructor[]) { - this.services = services; - } - async create(dto: JobCreateDto): Promise { await this.jobRepository.queue(asJobItem(dto)); } - async handleCommand(queueName: QueueName, dto: JobCommandDto): Promise { - this.logger.debug(`Handling command: queue=${queueName},command=${dto.command},force=${dto.force}`); - - switch (dto.command) { - case JobCommand.Start: { - await this.start(queueName, dto); - break; - } - - case JobCommand.Pause: { - await this.jobRepository.pause(queueName); - break; - } - - case JobCommand.Resume: { - await this.jobRepository.resume(queueName); - break; - } - - case JobCommand.Empty: { - await this.jobRepository.empty(queueName); - break; - } - - case JobCommand.ClearFailed: { - const failedJobs = await this.jobRepository.clear(queueName, QueueCleanType.Failed); - this.logger.debug(`Cleared failed jobs: ${failedJobs}`); - break; - } - } - - return this.getJobStatus(queueName); - } - - async getJobStatus(queueName: QueueName): Promise { - const [jobCounts, queueStatus] = await Promise.all([ - this.jobRepository.getJobCounts(queueName), - this.jobRepository.getQueueStatus(queueName), - ]); - - return { jobCounts, queueStatus }; - } - - async getAllJobsStatus(): Promise { - const response = new AllJobStatusResponseDto(); - for (const queueName of Object.values(QueueName)) { - response[queueName] = await this.getJobStatus(queueName); - } - return response; - } - - private async start(name: QueueName, { force }: JobCommandDto): Promise { - const { isActive } = await this.jobRepository.getQueueStatus(name); - if (isActive) { - throw new BadRequestException(`Job is already running`); - } - - await this.eventRepository.emit('QueueStart', { name }); - - switch (name) { - case QueueName.VideoConversion: { - return this.jobRepository.queue({ name: JobName.AssetEncodeVideoQueueAll, data: { force } }); - } - - case QueueName.StorageTemplateMigration: { - return this.jobRepository.queue({ name: JobName.StorageTemplateMigration }); - } - - case QueueName.Migration: { - return this.jobRepository.queue({ name: JobName.FileMigrationQueueAll }); - } - - case QueueName.SmartSearch: { - return this.jobRepository.queue({ name: JobName.SmartSearchQueueAll, data: { force } }); - } - - case QueueName.DuplicateDetection: { - return this.jobRepository.queue({ name: JobName.AssetDetectDuplicatesQueueAll, data: { force } }); - } - - case QueueName.MetadataExtraction: { - return this.jobRepository.queue({ name: JobName.AssetExtractMetadataQueueAll, data: { force } }); - } - - case QueueName.Sidecar: { - return this.jobRepository.queue({ name: JobName.SidecarQueueAll, data: { force } }); - } - - case QueueName.ThumbnailGeneration: { - return this.jobRepository.queue({ name: JobName.AssetGenerateThumbnailsQueueAll, data: { force } }); - } - - case QueueName.FaceDetection: { - return this.jobRepository.queue({ name: JobName.AssetDetectFacesQueueAll, data: { force } }); - } - - case QueueName.FacialRecognition: { - return this.jobRepository.queue({ name: JobName.FacialRecognitionQueueAll, data: { force } }); - } - - case QueueName.Library: { - return this.jobRepository.queue({ name: JobName.LibraryScanQueueAll, data: { force } }); - } - - case QueueName.BackupDatabase: { - return this.jobRepository.queue({ name: JobName.DatabaseBackup, data: { force } }); - } - - case QueueName.Ocr: { - return this.jobRepository.queue({ name: JobName.OcrQueueAll, data: { force } }); - } - - default: { - throw new BadRequestException(`Invalid job name: ${name}`); - } - } - } - @OnEvent({ name: 'JobRun' }) async onJobRun(...[queueName, job]: ArgsOf<'JobRun'>) { try { @@ -262,50 +62,6 @@ export class JobService extends BaseService { } } - private isConcurrentQueue(name: QueueName): name is ConcurrentQueueName { - return ![ - QueueName.FacialRecognition, - QueueName.StorageTemplateMigration, - QueueName.DuplicateDetection, - QueueName.BackupDatabase, - ].includes(name); - } - - async handleNightlyJobs() { - const config = await this.getConfig({ withCache: false }); - const jobs: JobItem[] = []; - - if (config.nightlyTasks.databaseCleanup) { - jobs.push( - { name: JobName.AssetDeleteCheck }, - { name: JobName.UserDeleteCheck }, - { name: JobName.PersonCleanup }, - { name: JobName.MemoryCleanup }, - { name: JobName.SessionCleanup }, - { name: JobName.AuditTableCleanup }, - { name: JobName.AuditLogCleanup }, - ); - } - - if (config.nightlyTasks.generateMemories) { - jobs.push({ name: JobName.MemoryGenerate }); - } - - if (config.nightlyTasks.syncQuotaUsage) { - jobs.push({ name: JobName.UserSyncUsage }); - } - - if (config.nightlyTasks.missingThumbnails) { - jobs.push({ name: JobName.AssetGenerateThumbnailsQueueAll, data: { force: false } }); - } - - if (config.nightlyTasks.clusterNewFaces) { - jobs.push({ name: JobName.FacialRecognitionQueueAll, data: { force: false, nightly: true } }); - } - - await this.jobRepository.queueAll(jobs); - } - /** * Queue follow up jobs */ diff --git a/server/src/services/queue.service.spec.ts b/server/src/services/queue.service.spec.ts new file mode 100644 index 000000000..1cc53df64 --- /dev/null +++ b/server/src/services/queue.service.spec.ts @@ -0,0 +1,223 @@ +import { BadRequestException } from '@nestjs/common'; +import { defaults, SystemConfig } from 'src/config'; +import { ImmichWorker, JobName, QueueCommand, QueueName } from 'src/enum'; +import { QueueService } from 'src/services/queue.service'; +import { newTestService, ServiceMocks } from 'test/utils'; + +describe(QueueService.name, () => { + let sut: QueueService; + let mocks: ServiceMocks; + + beforeEach(() => { + ({ sut, mocks } = newTestService(QueueService)); + + mocks.config.getWorker.mockReturnValue(ImmichWorker.Microservices); + }); + + it('should work', () => { + expect(sut).toBeDefined(); + }); + + describe('onConfigUpdate', () => { + it('should update concurrency', () => { + sut.onConfigUpdate({ newConfig: defaults, oldConfig: {} as SystemConfig }); + + expect(mocks.job.setConcurrency).toHaveBeenCalledTimes(16); + expect(mocks.job.setConcurrency).toHaveBeenNthCalledWith(5, QueueName.FacialRecognition, 1); + expect(mocks.job.setConcurrency).toHaveBeenNthCalledWith(7, QueueName.DuplicateDetection, 1); + expect(mocks.job.setConcurrency).toHaveBeenNthCalledWith(8, QueueName.BackgroundTask, 5); + expect(mocks.job.setConcurrency).toHaveBeenNthCalledWith(9, QueueName.StorageTemplateMigration, 1); + }); + }); + + describe('handleNightlyJobs', () => { + it('should run the scheduled jobs', async () => { + await sut.handleNightlyJobs(); + + expect(mocks.job.queueAll).toHaveBeenCalledWith([ + { name: JobName.AssetDeleteCheck }, + { name: JobName.UserDeleteCheck }, + { name: JobName.PersonCleanup }, + { name: JobName.MemoryCleanup }, + { name: JobName.SessionCleanup }, + { name: JobName.AuditTableCleanup }, + { name: JobName.AuditLogCleanup }, + { name: JobName.MemoryGenerate }, + { name: JobName.UserSyncUsage }, + { name: JobName.AssetGenerateThumbnailsQueueAll, data: { force: false } }, + { name: JobName.FacialRecognitionQueueAll, data: { force: false, nightly: true } }, + ]); + }); + }); + + describe('getAllJobStatus', () => { + it('should get all job statuses', async () => { + mocks.job.getJobCounts.mockResolvedValue({ + active: 1, + completed: 1, + failed: 1, + delayed: 1, + waiting: 1, + paused: 1, + }); + mocks.job.getQueueStatus.mockResolvedValue({ + isActive: true, + isPaused: true, + }); + + const expectedJobStatus = { + jobCounts: { + active: 1, + completed: 1, + delayed: 1, + failed: 1, + waiting: 1, + paused: 1, + }, + queueStatus: { + isActive: true, + isPaused: true, + }, + }; + + await expect(sut.getAll()).resolves.toEqual({ + [QueueName.BackgroundTask]: expectedJobStatus, + [QueueName.DuplicateDetection]: expectedJobStatus, + [QueueName.SmartSearch]: expectedJobStatus, + [QueueName.MetadataExtraction]: expectedJobStatus, + [QueueName.Search]: expectedJobStatus, + [QueueName.StorageTemplateMigration]: expectedJobStatus, + [QueueName.Migration]: expectedJobStatus, + [QueueName.ThumbnailGeneration]: expectedJobStatus, + [QueueName.VideoConversion]: expectedJobStatus, + [QueueName.FaceDetection]: expectedJobStatus, + [QueueName.FacialRecognition]: expectedJobStatus, + [QueueName.Sidecar]: expectedJobStatus, + [QueueName.Library]: expectedJobStatus, + [QueueName.Notification]: expectedJobStatus, + [QueueName.BackupDatabase]: expectedJobStatus, + [QueueName.Ocr]: expectedJobStatus, + }); + }); + }); + + describe('handleCommand', () => { + it('should handle a pause command', async () => { + await sut.runCommand(QueueName.MetadataExtraction, { command: QueueCommand.Pause, force: false }); + + expect(mocks.job.pause).toHaveBeenCalledWith(QueueName.MetadataExtraction); + }); + + it('should handle a resume command', async () => { + await sut.runCommand(QueueName.MetadataExtraction, { command: QueueCommand.Resume, force: false }); + + expect(mocks.job.resume).toHaveBeenCalledWith(QueueName.MetadataExtraction); + }); + + it('should handle an empty command', async () => { + await sut.runCommand(QueueName.MetadataExtraction, { command: QueueCommand.Empty, force: false }); + + expect(mocks.job.empty).toHaveBeenCalledWith(QueueName.MetadataExtraction); + }); + + it('should not start a job that is already running', async () => { + mocks.job.getQueueStatus.mockResolvedValue({ isActive: true, isPaused: false }); + + await expect( + sut.runCommand(QueueName.VideoConversion, { command: QueueCommand.Start, force: false }), + ).rejects.toBeInstanceOf(BadRequestException); + + expect(mocks.job.queue).not.toHaveBeenCalled(); + expect(mocks.job.queueAll).not.toHaveBeenCalled(); + }); + + it('should handle a start video conversion command', async () => { + mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); + + await sut.runCommand(QueueName.VideoConversion, { command: QueueCommand.Start, force: false }); + + expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.AssetEncodeVideoQueueAll, data: { force: false } }); + }); + + it('should handle a start storage template migration command', async () => { + mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); + + await sut.runCommand(QueueName.StorageTemplateMigration, { command: QueueCommand.Start, force: false }); + + expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.StorageTemplateMigration }); + }); + + it('should handle a start smart search command', async () => { + mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); + + await sut.runCommand(QueueName.SmartSearch, { command: QueueCommand.Start, force: false }); + + expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.SmartSearchQueueAll, data: { force: false } }); + }); + + it('should handle a start metadata extraction command', async () => { + mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); + + await sut.runCommand(QueueName.MetadataExtraction, { command: QueueCommand.Start, force: false }); + + expect(mocks.job.queue).toHaveBeenCalledWith({ + name: JobName.AssetExtractMetadataQueueAll, + data: { force: false }, + }); + }); + + it('should handle a start sidecar command', async () => { + mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); + + await sut.runCommand(QueueName.Sidecar, { command: QueueCommand.Start, force: false }); + + expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.SidecarQueueAll, data: { force: false } }); + }); + + it('should handle a start thumbnail generation command', async () => { + mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); + + await sut.runCommand(QueueName.ThumbnailGeneration, { command: QueueCommand.Start, force: false }); + + expect(mocks.job.queue).toHaveBeenCalledWith({ + name: JobName.AssetGenerateThumbnailsQueueAll, + data: { force: false }, + }); + }); + + it('should handle a start face detection command', async () => { + mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); + + await sut.runCommand(QueueName.FaceDetection, { command: QueueCommand.Start, force: false }); + + expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.AssetDetectFacesQueueAll, data: { force: false } }); + }); + + it('should handle a start facial recognition command', async () => { + mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); + + await sut.runCommand(QueueName.FacialRecognition, { command: QueueCommand.Start, force: false }); + + expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.FacialRecognitionQueueAll, data: { force: false } }); + }); + + it('should handle a start backup database command', async () => { + mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); + + await sut.runCommand(QueueName.BackupDatabase, { command: QueueCommand.Start, force: false }); + + expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.DatabaseBackup, data: { force: false } }); + }); + + it('should throw a bad request when an invalid queue is used', async () => { + mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); + + await expect( + sut.runCommand(QueueName.BackgroundTask, { command: QueueCommand.Start, force: false }), + ).rejects.toBeInstanceOf(BadRequestException); + + expect(mocks.job.queue).not.toHaveBeenCalled(); + expect(mocks.job.queueAll).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/server/src/services/queue.service.ts b/server/src/services/queue.service.ts new file mode 100644 index 000000000..bea665e8f --- /dev/null +++ b/server/src/services/queue.service.ts @@ -0,0 +1,250 @@ +import { BadRequestException, Injectable } from '@nestjs/common'; +import { ClassConstructor } from 'class-transformer'; +import { SystemConfig } from 'src/config'; +import { OnEvent } from 'src/decorators'; +import { QueueCommandDto, QueueResponseDto, QueuesResponseDto } from 'src/dtos/queue.dto'; +import { + BootstrapEventPriority, + CronJob, + DatabaseLock, + ImmichWorker, + JobName, + QueueCleanType, + QueueCommand, + QueueName, +} from 'src/enum'; +import { ArgOf } from 'src/repositories/event.repository'; +import { BaseService } from 'src/services/base.service'; +import { ConcurrentQueueName, JobItem } from 'src/types'; +import { handlePromiseError } from 'src/utils/misc'; + +const asNightlyTasksCron = (config: SystemConfig) => { + const [hours, minutes] = config.nightlyTasks.startTime.split(':').map(Number); + return `${minutes} ${hours} * * *`; +}; + +@Injectable() +export class QueueService extends BaseService { + private services: ClassConstructor[] = []; + private nightlyJobsLock = false; + + @OnEvent({ name: 'ConfigInit' }) + async onConfigInit({ newConfig: config }: ArgOf<'ConfigInit'>) { + if (this.worker === ImmichWorker.Microservices) { + this.updateConcurrency(config); + return; + } + + this.nightlyJobsLock = await this.databaseRepository.tryLock(DatabaseLock.NightlyJobs); + if (this.nightlyJobsLock) { + const cronExpression = asNightlyTasksCron(config); + this.logger.debug(`Scheduling nightly jobs for ${cronExpression}`); + this.cronRepository.create({ + name: CronJob.NightlyJobs, + expression: cronExpression, + start: true, + onTick: () => handlePromiseError(this.handleNightlyJobs(), this.logger), + }); + } + } + + @OnEvent({ name: 'ConfigUpdate', server: true }) + onConfigUpdate({ newConfig: config }: ArgOf<'ConfigUpdate'>) { + if (this.worker === ImmichWorker.Microservices) { + this.updateConcurrency(config); + return; + } + + if (this.nightlyJobsLock) { + const cronExpression = asNightlyTasksCron(config); + this.logger.debug(`Scheduling nightly jobs for ${cronExpression}`); + this.cronRepository.update({ name: CronJob.NightlyJobs, expression: cronExpression, start: true }); + } + } + + @OnEvent({ name: 'AppBootstrap', priority: BootstrapEventPriority.JobService }) + onBootstrap() { + this.jobRepository.setup(this.services); + if (this.worker === ImmichWorker.Microservices) { + this.jobRepository.startWorkers(); + } + } + + private updateConcurrency(config: SystemConfig) { + this.logger.debug(`Updating queue concurrency settings`); + for (const queueName of Object.values(QueueName)) { + let concurrency = 1; + if (this.isConcurrentQueue(queueName)) { + concurrency = config.job[queueName].concurrency; + } + this.logger.debug(`Setting ${queueName} concurrency to ${concurrency}`); + this.jobRepository.setConcurrency(queueName, concurrency); + } + } + + setServices(services: ClassConstructor[]) { + this.services = services; + } + + async runCommand(name: QueueName, dto: QueueCommandDto): Promise { + this.logger.debug(`Handling command: queue=${name},command=${dto.command},force=${dto.force}`); + + switch (dto.command) { + case QueueCommand.Start: { + await this.start(name, dto); + break; + } + + case QueueCommand.Pause: { + await this.jobRepository.pause(name); + break; + } + + case QueueCommand.Resume: { + await this.jobRepository.resume(name); + break; + } + + case QueueCommand.Empty: { + await this.jobRepository.empty(name); + break; + } + + case QueueCommand.ClearFailed: { + const failedJobs = await this.jobRepository.clear(name, QueueCleanType.Failed); + this.logger.debug(`Cleared failed jobs: ${failedJobs}`); + break; + } + } + + return this.getByName(name); + } + + async getAll(): Promise { + const response = new QueuesResponseDto(); + for (const name of Object.values(QueueName)) { + response[name] = await this.getByName(name); + } + return response; + } + + async getByName(name: QueueName): Promise { + const [jobCounts, queueStatus] = await Promise.all([ + this.jobRepository.getJobCounts(name), + this.jobRepository.getQueueStatus(name), + ]); + + return { jobCounts, queueStatus }; + } + + private async start(name: QueueName, { force }: QueueCommandDto): Promise { + const { isActive } = await this.jobRepository.getQueueStatus(name); + if (isActive) { + throw new BadRequestException(`Job is already running`); + } + + await this.eventRepository.emit('QueueStart', { name }); + + switch (name) { + case QueueName.VideoConversion: { + return this.jobRepository.queue({ name: JobName.AssetEncodeVideoQueueAll, data: { force } }); + } + + case QueueName.StorageTemplateMigration: { + return this.jobRepository.queue({ name: JobName.StorageTemplateMigration }); + } + + case QueueName.Migration: { + return this.jobRepository.queue({ name: JobName.FileMigrationQueueAll }); + } + + case QueueName.SmartSearch: { + return this.jobRepository.queue({ name: JobName.SmartSearchQueueAll, data: { force } }); + } + + case QueueName.DuplicateDetection: { + return this.jobRepository.queue({ name: JobName.AssetDetectDuplicatesQueueAll, data: { force } }); + } + + case QueueName.MetadataExtraction: { + return this.jobRepository.queue({ name: JobName.AssetExtractMetadataQueueAll, data: { force } }); + } + + case QueueName.Sidecar: { + return this.jobRepository.queue({ name: JobName.SidecarQueueAll, data: { force } }); + } + + case QueueName.ThumbnailGeneration: { + return this.jobRepository.queue({ name: JobName.AssetGenerateThumbnailsQueueAll, data: { force } }); + } + + case QueueName.FaceDetection: { + return this.jobRepository.queue({ name: JobName.AssetDetectFacesQueueAll, data: { force } }); + } + + case QueueName.FacialRecognition: { + return this.jobRepository.queue({ name: JobName.FacialRecognitionQueueAll, data: { force } }); + } + + case QueueName.Library: { + return this.jobRepository.queue({ name: JobName.LibraryScanQueueAll, data: { force } }); + } + + case QueueName.BackupDatabase: { + return this.jobRepository.queue({ name: JobName.DatabaseBackup, data: { force } }); + } + + case QueueName.Ocr: { + return this.jobRepository.queue({ name: JobName.OcrQueueAll, data: { force } }); + } + + default: { + throw new BadRequestException(`Invalid job name: ${name}`); + } + } + } + + private isConcurrentQueue(name: QueueName): name is ConcurrentQueueName { + return ![ + QueueName.FacialRecognition, + QueueName.StorageTemplateMigration, + QueueName.DuplicateDetection, + QueueName.BackupDatabase, + ].includes(name); + } + + async handleNightlyJobs() { + const config = await this.getConfig({ withCache: false }); + const jobs: JobItem[] = []; + + if (config.nightlyTasks.databaseCleanup) { + jobs.push( + { name: JobName.AssetDeleteCheck }, + { name: JobName.UserDeleteCheck }, + { name: JobName.PersonCleanup }, + { name: JobName.MemoryCleanup }, + { name: JobName.SessionCleanup }, + { name: JobName.AuditTableCleanup }, + { name: JobName.AuditLogCleanup }, + ); + } + + if (config.nightlyTasks.generateMemories) { + jobs.push({ name: JobName.MemoryGenerate }); + } + + if (config.nightlyTasks.syncQuotaUsage) { + jobs.push({ name: JobName.UserSyncUsage }); + } + + if (config.nightlyTasks.missingThumbnails) { + jobs.push({ name: JobName.AssetGenerateThumbnailsQueueAll, data: { force: false } }); + } + + if (config.nightlyTasks.clusterNewFaces) { + jobs.push({ name: JobName.FacialRecognitionQueueAll, data: { force: false, nightly: true } }); + } + + await this.jobRepository.queueAll(jobs); + } +} diff --git a/web/src/lib/components/admin-settings/JobSettings.svelte b/web/src/lib/components/admin-settings/JobSettings.svelte index fb8a11b33..94b4426db 100644 --- a/web/src/lib/components/admin-settings/JobSettings.svelte +++ b/web/src/lib/components/admin-settings/JobSettings.svelte @@ -4,8 +4,8 @@ import { SettingInputFieldType } from '$lib/constants'; import { featureFlagsManager } from '$lib/managers/feature-flags-manager.svelte'; import { systemConfigManager } from '$lib/managers/system-config-manager.svelte'; - import { getJobName } from '$lib/utils'; - import { JobName, type SystemConfigJobDto } from '@immich/sdk'; + import { getQueueName } from '$lib/utils'; + import { QueueName, type SystemConfigJobDto } from '@immich/sdk'; import { t } from 'svelte-i18n'; import { fade } from 'svelte/transition'; @@ -13,18 +13,18 @@ const config = $derived(systemConfigManager.value); let configToEdit = $state(systemConfigManager.cloneValue()); - const jobNames = [ - JobName.ThumbnailGeneration, - JobName.MetadataExtraction, - JobName.Library, - JobName.Sidecar, - JobName.SmartSearch, - JobName.FaceDetection, - JobName.FacialRecognition, - JobName.VideoConversion, - JobName.StorageTemplateMigration, - JobName.Migration, - JobName.Ocr, + const queueNames = [ + QueueName.ThumbnailGeneration, + QueueName.MetadataExtraction, + QueueName.Library, + QueueName.Sidecar, + QueueName.SmartSearch, + QueueName.FaceDetection, + QueueName.FacialRecognition, + QueueName.VideoConversion, + QueueName.StorageTemplateMigration, + QueueName.Migration, + QueueName.Ocr, ]; function isSystemConfigJobDto(jobName: string): jobName is keyof SystemConfigJobDto { @@ -35,22 +35,22 @@
event.preventDefault()}> - {#each jobNames as jobName (jobName)} + {#each queueNames as queueName (queueName)}
- {#if isSystemConfigJobDto(jobName)} + {#if isSystemConfigJobDto(queueName)} {:else} import Badge from '$lib/elements/Badge.svelte'; import { locale } from '$lib/stores/preferences.store'; - import { JobCommand, type JobCommandDto, type JobCountsDto, type QueueStatusDto } from '@immich/sdk'; + import { QueueCommand, type QueueCommandDto, type QueueStatisticsDto, type QueueStatusDto } from '@immich/sdk'; import { Icon, IconButton } from '@immich/ui'; import { mdiAlertCircle, @@ -22,21 +22,21 @@ title: string; subtitle: string | undefined; description: Component | undefined; - jobCounts: JobCountsDto; + statistics: QueueStatisticsDto; queueStatus: QueueStatusDto; icon: string; disabled?: boolean; allText: string | undefined; refreshText: string | undefined; missingText: string; - onCommand: (command: JobCommandDto) => void; + onCommand: (command: QueueCommandDto) => void; } let { title, subtitle, description, - jobCounts, + statistics, queueStatus, icon, disabled = false, @@ -46,7 +46,7 @@ onCommand, }: Props = $props(); - let waitingCount = $derived(jobCounts.waiting + jobCounts.paused + jobCounts.delayed); + let waitingCount = $derived(statistics.waiting + statistics.paused + statistics.delayed); let isIdle = $derived(!queueStatus.isActive && !queueStatus.isPaused); let multipleButtons = $derived(allText || refreshText); @@ -67,11 +67,11 @@ {title}
- {#if jobCounts.failed > 0} + {#if statistics.failed > 0}
- {$t('admin.jobs_failed', { values: { jobCount: jobCounts.failed.toLocaleString($locale) } })} + {$t('admin.jobs_failed', { values: { jobCount: statistics.failed.toLocaleString($locale) } })} onCommand({ command: JobCommand.ClearFailed, force: false })} + onclick={() => onCommand({ command: QueueCommand.ClearFailed, force: false })} />
{/if} - {#if jobCounts.delayed > 0} + {#if statistics.delayed > 0} - {$t('admin.jobs_delayed', { values: { jobCount: jobCounts.delayed.toLocaleString($locale) } })} + {$t('admin.jobs_delayed', { values: { jobCount: statistics.delayed.toLocaleString($locale) } })} {/if} @@ -111,7 +111,7 @@ >

{$t('active')}

- {jobCounts.active.toLocaleString($locale)} + {statistics.active.toLocaleString($locale)}

@@ -131,7 +131,7 @@ onCommand({ command: JobCommand.Start, force: false })} + onClick={() => onCommand({ command: QueueCommand.Start, force: false })} > {$t('disabled')} @@ -140,20 +140,20 @@ {#if !disabled && !isIdle} {#if waitingCount > 0} - onCommand({ command: JobCommand.Empty, force: false })}> + onCommand({ command: QueueCommand.Empty, force: false })}> {$t('clear')} {/if} {#if queueStatus.isPaused} {@const size = waitingCount > 0 ? '24' : '48'} - onCommand({ command: JobCommand.Resume, force: false })}> + onCommand({ command: QueueCommand.Resume, force: false })}> {$t('resume')} {:else} - onCommand({ command: JobCommand.Pause, force: false })}> + onCommand({ command: QueueCommand.Pause, force: false })}> {$t('pause')} @@ -162,25 +162,25 @@ {#if !disabled && multipleButtons && isIdle} {#if allText} - onCommand({ command: JobCommand.Start, force: true })}> + onCommand({ command: QueueCommand.Start, force: true })}> {allText} {/if} {#if refreshText} - onCommand({ command: JobCommand.Start, force: undefined })}> + onCommand({ command: QueueCommand.Start, force: undefined })}> {refreshText} {/if} - onCommand({ command: JobCommand.Start, force: false })}> + onCommand({ command: QueueCommand.Start, force: false })}> {missingText} {/if} {#if !disabled && !multipleButtons && isIdle} - onCommand({ command: JobCommand.Start, force: false })}> + onCommand({ command: QueueCommand.Start, force: false })}> {missingText} diff --git a/web/src/lib/components/jobs/JobsPanel.svelte b/web/src/lib/components/jobs/JobsPanel.svelte index e204c7664..99dfef3b5 100644 --- a/web/src/lib/components/jobs/JobsPanel.svelte +++ b/web/src/lib/components/jobs/JobsPanel.svelte @@ -1,8 +1,14 @@
{#each jobList as [jobName, { title, subtitle, description, disabled, allText, refreshText, missingText, icon, handleCommand: handleCommandOverride }] (jobName)} - {@const { jobCounts, queueStatus } = jobs[jobName]} + {@const { jobCounts: statistics, queueStatus } = jobs[jobName]} (handleCommandOverride || handleCommand)(jobName, command)} /> diff --git a/web/src/lib/utils.ts b/web/src/lib/utils.ts index 6e0a21647..87f0d7e7b 100644 --- a/web/src/lib/utils.ts +++ b/web/src/lib/utils.ts @@ -5,8 +5,8 @@ import { handleError } from '$lib/utils/handle-error'; import { AssetJobName, AssetMediaSize, - JobName, MemoryType, + QueueName, finishOAuth, getAssetOriginalPath, getAssetPlaybackPath, @@ -143,28 +143,28 @@ export const downloadRequest = (options: DownloadRequestOptions }); }; -export const getJobName = derived(t, ($t) => { - return (jobName: JobName) => { - const names: Record = { - [JobName.ThumbnailGeneration]: $t('admin.thumbnail_generation_job'), - [JobName.MetadataExtraction]: $t('admin.metadata_extraction_job'), - [JobName.Sidecar]: $t('admin.sidecar_job'), - [JobName.SmartSearch]: $t('admin.machine_learning_smart_search'), - [JobName.DuplicateDetection]: $t('admin.machine_learning_duplicate_detection'), - [JobName.FaceDetection]: $t('admin.face_detection'), - [JobName.FacialRecognition]: $t('admin.machine_learning_facial_recognition'), - [JobName.VideoConversion]: $t('admin.video_conversion_job'), - [JobName.StorageTemplateMigration]: $t('admin.storage_template_migration'), - [JobName.Migration]: $t('admin.migration_job'), - [JobName.BackgroundTask]: $t('admin.background_task_job'), - [JobName.Search]: $t('search'), - [JobName.Library]: $t('external_libraries'), - [JobName.Notifications]: $t('notifications'), - [JobName.BackupDatabase]: $t('admin.backup_database'), - [JobName.Ocr]: $t('admin.machine_learning_ocr'), +export const getQueueName = derived(t, ($t) => { + return (name: QueueName) => { + const names: Record = { + [QueueName.ThumbnailGeneration]: $t('admin.thumbnail_generation_job'), + [QueueName.MetadataExtraction]: $t('admin.metadata_extraction_job'), + [QueueName.Sidecar]: $t('admin.sidecar_job'), + [QueueName.SmartSearch]: $t('admin.machine_learning_smart_search'), + [QueueName.DuplicateDetection]: $t('admin.machine_learning_duplicate_detection'), + [QueueName.FaceDetection]: $t('admin.face_detection'), + [QueueName.FacialRecognition]: $t('admin.machine_learning_facial_recognition'), + [QueueName.VideoConversion]: $t('admin.video_conversion_job'), + [QueueName.StorageTemplateMigration]: $t('admin.storage_template_migration'), + [QueueName.Migration]: $t('admin.migration_job'), + [QueueName.BackgroundTask]: $t('admin.background_task_job'), + [QueueName.Search]: $t('search'), + [QueueName.Library]: $t('external_libraries'), + [QueueName.Notifications]: $t('notifications'), + [QueueName.BackupDatabase]: $t('admin.backup_database'), + [QueueName.Ocr]: $t('admin.machine_learning_ocr'), }; - return names[jobName]; + return names[name]; }; }); diff --git a/web/src/routes/admin/jobs-status/+page.svelte b/web/src/routes/admin/jobs-status/+page.svelte index 1204ff901..84586a8af 100644 --- a/web/src/routes/admin/jobs-status/+page.svelte +++ b/web/src/routes/admin/jobs-status/+page.svelte @@ -5,13 +5,7 @@ import JobCreateModal from '$lib/modals/JobCreateModal.svelte'; import { asyncTimeout } from '$lib/utils'; import { handleError } from '$lib/utils/handle-error'; - import { - getAllJobsStatus, - JobCommand, - sendJobCommand, - type AllJobStatusResponseDto, - type JobName, - } from '@immich/sdk'; + import { getQueuesLegacy, QueueCommand, QueueName, runQueueCommandLegacy, type QueuesResponseDto } from '@immich/sdk'; import { Button, HStack, modalManager, Text } from '@immich/ui'; import { mdiCog, mdiPlay, mdiPlus } from '@mdi/js'; import { onDestroy, onMount } from 'svelte'; @@ -24,23 +18,23 @@ let { data }: Props = $props(); - let jobs: AllJobStatusResponseDto | undefined = $state(); + let jobs: QueuesResponseDto | undefined = $state(); let running = true; const pausedJobs = $derived( Object.entries(jobs ?? {}) - .filter(([_, jobStatus]) => jobStatus.queueStatus?.isPaused) - .map(([jobName]) => jobName as JobName), + .filter(([_, queue]) => queue.queueStatus?.isPaused) + .map(([name]) => name as QueueName), ); const handleResumePausedJobs = async () => { try { - for (const jobName of pausedJobs) { - await sendJobCommand({ id: jobName, jobCommandDto: { command: JobCommand.Resume, force: false } }); + for (const name of pausedJobs) { + await runQueueCommandLegacy({ name, queueCommandDto: { command: QueueCommand.Resume, force: false } }); } // Refresh jobs status immediately after resuming - jobs = await getAllJobsStatus(); + jobs = await getQueuesLegacy(); } catch (error) { handleError(error, $t('admin.failed_job_command', { values: { command: 'resume', job: 'paused jobs' } })); } @@ -48,7 +42,7 @@ onMount(async () => { while (running) { - jobs = await getAllJobsStatus(); + jobs = await getQueuesLegacy(); await asyncTimeout(5000); } }); diff --git a/web/src/routes/admin/jobs-status/+page.ts b/web/src/routes/admin/jobs-status/+page.ts index 0d4ec8b41..90057ff96 100644 --- a/web/src/routes/admin/jobs-status/+page.ts +++ b/web/src/routes/admin/jobs-status/+page.ts @@ -1,12 +1,12 @@ import { authenticate } from '$lib/utils/auth'; import { getFormatter } from '$lib/utils/i18n'; -import { getAllJobsStatus } from '@immich/sdk'; +import { getQueuesLegacy } from '@immich/sdk'; import type { PageLoad } from './$types'; export const load = (async ({ url }) => { await authenticate(url, { admin: true }); - const jobs = await getAllJobsStatus(); + const jobs = await getQueuesLegacy(); const $t = await getFormatter(); return { diff --git a/web/src/routes/admin/library-management/+page.svelte b/web/src/routes/admin/library-management/+page.svelte index 039afc97b..600b6ff04 100644 --- a/web/src/routes/admin/library-management/+page.svelte +++ b/web/src/routes/admin/library-management/+page.svelte @@ -17,10 +17,10 @@ getAllLibraries, getLibraryStatistics, getUserAdmin, - JobCommand, - JobName, + QueueCommand, + QueueName, + runQueueCommandLegacy, scanLibrary, - sendJobCommand, updateLibrary, type LibraryResponseDto, type LibraryStatsResponseDto, @@ -151,7 +151,7 @@ const handleScanAll = async () => { try { - await sendJobCommand({ id: JobName.Library, jobCommandDto: { command: JobCommand.Start } }); + await runQueueCommandLegacy({ name: QueueName.Library, queueCommandDto: { command: QueueCommand.Start } }); toastManager.info($t('admin.refreshing_all_libraries')); } catch (error) {