diff --git a/e2e/src/utils.ts b/e2e/src/utils.ts index f045ea2ef..15bb112cd 100644 --- a/e2e/src/utils.ts +++ b/e2e/src/utils.ts @@ -12,7 +12,7 @@ import { PersonCreateDto, QueueCommandDto, QueueName, - QueuesResponseDto, + QueuesResponseLegacyDto, SharedLinkCreateDto, UpdateLibraryDto, UserAdminCreateDto, @@ -564,13 +564,13 @@ export const utils = { await updateConfig({ systemConfigDto: defaultConfig }, { headers: asBearerAuth(accessToken) }); }, - isQueueEmpty: async (accessToken: string, queue: keyof QueuesResponseDto) => { + isQueueEmpty: async (accessToken: string, queue: keyof QueuesResponseLegacyDto) => { const queues = await getQueuesLegacy({ headers: asBearerAuth(accessToken) }); const jobCounts = queues[queue].jobCounts; return !jobCounts.active && !jobCounts.waiting; }, - waitForQueueFinish: (accessToken: string, queue: keyof QueuesResponseDto, ms?: number) => { + waitForQueueFinish: (accessToken: string, queue: keyof QueuesResponseLegacyDto, ms?: number) => { // eslint-disable-next-line no-async-promise-executor return new Promise(async (resolve, reject) => { const timeout = setTimeout(() => reject(new Error('Timed out waiting for queue to empty')), ms || 10_000); diff --git a/mobile/openapi/README.md b/mobile/openapi/README.md index 52fbbc8e7..268c4849c 100644 Binary files a/mobile/openapi/README.md and b/mobile/openapi/README.md differ diff --git a/mobile/openapi/lib/api.dart b/mobile/openapi/lib/api.dart index a47d9ddf9..21730074a 100644 Binary files a/mobile/openapi/lib/api.dart and b/mobile/openapi/lib/api.dart differ diff --git a/mobile/openapi/lib/api/deprecated_api.dart b/mobile/openapi/lib/api/deprecated_api.dart index aaf7c074b..d0d92d804 100644 Binary files a/mobile/openapi/lib/api/deprecated_api.dart and b/mobile/openapi/lib/api/deprecated_api.dart differ diff --git a/mobile/openapi/lib/api/jobs_api.dart b/mobile/openapi/lib/api/jobs_api.dart index 906dce6d3..9dda59a88 100644 Binary files a/mobile/openapi/lib/api/jobs_api.dart and b/mobile/openapi/lib/api/jobs_api.dart differ diff --git a/mobile/openapi/lib/api/queues_api.dart b/mobile/openapi/lib/api/queues_api.dart new file mode 100644 index 000000000..50575ed70 Binary files /dev/null and b/mobile/openapi/lib/api/queues_api.dart differ diff --git a/mobile/openapi/lib/api_client.dart b/mobile/openapi/lib/api_client.dart index c0dcf542e..041be6701 100644 Binary files a/mobile/openapi/lib/api_client.dart and b/mobile/openapi/lib/api_client.dart differ diff --git a/mobile/openapi/lib/api_helper.dart b/mobile/openapi/lib/api_helper.dart index e6d39d5eb..2c97eeb31 100644 Binary files a/mobile/openapi/lib/api_helper.dart and b/mobile/openapi/lib/api_helper.dart differ diff --git a/mobile/openapi/lib/model/job_name.dart b/mobile/openapi/lib/model/job_name.dart new file mode 100644 index 000000000..038a17a8e Binary files /dev/null and b/mobile/openapi/lib/model/job_name.dart differ diff --git a/mobile/openapi/lib/model/permission.dart b/mobile/openapi/lib/model/permission.dart index 0a2f0d179..3b9a3964b 100644 Binary files a/mobile/openapi/lib/model/permission.dart and b/mobile/openapi/lib/model/permission.dart differ diff --git a/mobile/openapi/lib/model/queue_delete_dto.dart b/mobile/openapi/lib/model/queue_delete_dto.dart new file mode 100644 index 000000000..d319238f9 Binary files /dev/null and b/mobile/openapi/lib/model/queue_delete_dto.dart differ diff --git a/mobile/openapi/lib/model/queue_job_response_dto.dart b/mobile/openapi/lib/model/queue_job_response_dto.dart new file mode 100644 index 000000000..1bfaa5619 Binary files /dev/null and b/mobile/openapi/lib/model/queue_job_response_dto.dart differ diff --git a/mobile/openapi/lib/model/queue_job_status.dart b/mobile/openapi/lib/model/queue_job_status.dart new file mode 100644 index 000000000..03a1371cc Binary files /dev/null and b/mobile/openapi/lib/model/queue_job_status.dart differ diff --git a/mobile/openapi/lib/model/queue_response_dto.dart b/mobile/openapi/lib/model/queue_response_dto.dart index b20449f72..c5d4ed8e3 100644 Binary files a/mobile/openapi/lib/model/queue_response_dto.dart and b/mobile/openapi/lib/model/queue_response_dto.dart differ diff --git a/mobile/openapi/lib/model/queue_response_legacy_dto.dart b/mobile/openapi/lib/model/queue_response_legacy_dto.dart new file mode 100644 index 000000000..214b0b31f Binary files /dev/null and b/mobile/openapi/lib/model/queue_response_legacy_dto.dart differ diff --git a/mobile/openapi/lib/model/queue_status_dto.dart b/mobile/openapi/lib/model/queue_status_legacy_dto.dart similarity index 63% rename from mobile/openapi/lib/model/queue_status_dto.dart rename to mobile/openapi/lib/model/queue_status_legacy_dto.dart index 77591affe..88c4eac34 100644 Binary files a/mobile/openapi/lib/model/queue_status_dto.dart and b/mobile/openapi/lib/model/queue_status_legacy_dto.dart differ diff --git a/mobile/openapi/lib/model/queue_update_dto.dart b/mobile/openapi/lib/model/queue_update_dto.dart new file mode 100644 index 000000000..ce89e5187 Binary files /dev/null and b/mobile/openapi/lib/model/queue_update_dto.dart differ diff --git a/mobile/openapi/lib/model/queues_response_dto.dart b/mobile/openapi/lib/model/queues_response_legacy_dto.dart similarity index 55% rename from mobile/openapi/lib/model/queues_response_dto.dart rename to mobile/openapi/lib/model/queues_response_legacy_dto.dart index be40a56fb..4aab6d863 100644 Binary files a/mobile/openapi/lib/model/queues_response_dto.dart and b/mobile/openapi/lib/model/queues_response_legacy_dto.dart differ diff --git a/open-api/immich-openapi-specs.json b/open-api/immich-openapi-specs.json index ffaad8590..68af1438c 100644 --- a/open-api/immich-openapi-specs.json +++ b/open-api/immich-openapi-specs.json @@ -4929,6 +4929,7 @@ }, "/jobs": { "get": { + "deprecated": true, "description": "Retrieve the counts of the current queue, as well as the current status.", "operationId": "getQueuesLegacy", "parameters": [], @@ -4937,7 +4938,7 @@ "content": { "application/json": { "schema": { - "$ref": "#/components/schemas/QueuesResponseDto" + "$ref": "#/components/schemas/QueuesResponseLegacyDto" } } }, @@ -4957,7 +4958,8 @@ ], "summary": "Retrieve queue counts and status", "tags": [ - "Jobs" + "Jobs", + "Deprecated" ], "x-immich-admin-only": true, "x-immich-history": [ @@ -4972,10 +4974,14 @@ { "version": "v2", "state": "Stable" + }, + { + "version": "v2.4.0", + "state": "Deprecated" } ], "x-immich-permission": "job.read", - "x-immich-state": "Stable" + "x-immich-state": "Deprecated" }, "post": { "description": "Run a specific job. Most jobs are queued automatically, but this endpoint allows for manual creation of a handful of jobs, including various cleanup tasks, as well as creating a new database backup.", @@ -5032,6 +5038,7 @@ }, "/jobs/{name}": { "put": { + "deprecated": true, "description": "Queue all assets for a specific job type. Defaults to only queueing assets that have not yet been processed, but the force command can be used to re-process all assets.", "operationId": "runQueueCommandLegacy", "parameters": [ @@ -5059,7 +5066,7 @@ "content": { "application/json": { "schema": { - "$ref": "#/components/schemas/QueueResponseDto" + "$ref": "#/components/schemas/QueueResponseLegacyDto" } } }, @@ -5079,7 +5086,8 @@ ], "summary": "Run jobs", "tags": [ - "Jobs" + "Jobs", + "Deprecated" ], "x-immich-admin-only": true, "x-immich-history": [ @@ -5094,10 +5102,14 @@ { "version": "v2", "state": "Stable" + }, + { + "version": "v2.4.0", + "state": "Deprecated" } ], "x-immich-permission": "job.create", - "x-immich-state": "Stable" + "x-immich-state": "Deprecated" } }, "/libraries": { @@ -8064,6 +8076,303 @@ "x-immich-state": "Alpha" } }, + "/queues": { + "get": { + "description": "Retrieves a list of queues.", + "operationId": "getQueues", + "parameters": [], + "responses": { + "200": { + "content": { + "application/json": { + "schema": { + "items": { + "$ref": "#/components/schemas/QueueResponseDto" + }, + "type": "array" + } + } + }, + "description": "" + } + }, + "security": [ + { + "bearer": [] + }, + { + "cookie": [] + }, + { + "api_key": [] + } + ], + "summary": "List all queues", + "tags": [ + "Queues" + ], + "x-immich-admin-only": true, + "x-immich-history": [ + { + "version": "v2.4.0", + "state": "Added" + }, + { + "version": "v2.4.0", + "state": "Alpha" + } + ], + "x-immich-permission": "queue.read", + "x-immich-state": "Alpha" + } + }, + "/queues/{name}": { + "get": { + "description": "Retrieves a specific queue by its name.", + "operationId": "getQueue", + "parameters": [ + { + "name": "name", + "required": true, + "in": "path", + "schema": { + "$ref": "#/components/schemas/QueueName" + } + } + ], + "responses": { + "200": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/QueueResponseDto" + } + } + }, + "description": "" + } + }, + "security": [ + { + "bearer": [] + }, + { + "cookie": [] + }, + { + "api_key": [] + } + ], + "summary": "Retrieve a queue", + "tags": [ + "Queues" + ], + "x-immich-admin-only": true, + "x-immich-history": [ + { + "version": "v2.4.0", + "state": "Added" + }, + { + "version": "v2.4.0", + "state": "Alpha" + } + ], + "x-immich-permission": "queue.read", + "x-immich-state": "Alpha" + }, + "put": { + "description": "Change the paused status of a specific queue.", + "operationId": "updateQueue", + "parameters": [ + { + "name": "name", + "required": true, + "in": "path", + "schema": { + "$ref": "#/components/schemas/QueueName" + } + } + ], + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/QueueUpdateDto" + } + } + }, + "required": true + }, + "responses": { + "200": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/QueueResponseDto" + } + } + }, + "description": "" + } + }, + "security": [ + { + "bearer": [] + }, + { + "cookie": [] + }, + { + "api_key": [] + } + ], + "summary": "Update a queue", + "tags": [ + "Queues" + ], + "x-immich-admin-only": true, + "x-immich-history": [ + { + "version": "v2.4.0", + "state": "Added" + }, + { + "version": "v2.4.0", + "state": "Alpha" + } + ], + "x-immich-permission": "queue.update", + "x-immich-state": "Alpha" + } + }, + "/queues/{name}/jobs": { + "delete": { + "description": "Removes all jobs from the specified queue.", + "operationId": "emptyQueue", + "parameters": [ + { + "name": "name", + "required": true, + "in": "path", + "schema": { + "$ref": "#/components/schemas/QueueName" + } + } + ], + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/QueueDeleteDto" + } + } + }, + "required": true + }, + "responses": { + "204": { + "description": "" + } + }, + "security": [ + { + "bearer": [] + }, + { + "cookie": [] + }, + { + "api_key": [] + } + ], + "summary": "Empty a queue", + "tags": [ + "Queues" + ], + "x-immich-admin-only": true, + "x-immich-history": [ + { + "version": "v2.4.0", + "state": "Added" + }, + { + "version": "v2.4.0", + "state": "Alpha" + } + ], + "x-immich-permission": "queueJob.delete", + "x-immich-state": "Alpha" + }, + "get": { + "description": "Retrieves a list of queue jobs from the specified queue.", + "operationId": "getQueueJobs", + "parameters": [ + { + "name": "name", + "required": true, + "in": "path", + "schema": { + "$ref": "#/components/schemas/QueueName" + } + }, + { + "name": "status", + "required": false, + "in": "query", + "schema": { + "type": "array", + "items": { + "$ref": "#/components/schemas/QueueJobStatus" + } + } + } + ], + "responses": { + "200": { + "content": { + "application/json": { + "schema": { + "items": { + "$ref": "#/components/schemas/QueueJobResponseDto" + }, + "type": "array" + } + } + }, + "description": "" + } + }, + "security": [ + { + "bearer": [] + }, + { + "cookie": [] + }, + { + "api_key": [] + } + ], + "summary": "Retrieve queue jobs", + "tags": [ + "Queues" + ], + "x-immich-admin-only": true, + "x-immich-history": [ + { + "version": "v2.4.0", + "state": "Added" + }, + { + "version": "v2.4.0", + "state": "Alpha" + } + ], + "x-immich-permission": "queueJob.read", + "x-immich-state": "Alpha" + } + }, "/search/cities": { "get": { "description": "Retrieve a list of assets with each asset belonging to a different city. This endpoint is used on the places pages to show a single thumbnail for each city the user has assets in.", @@ -14043,6 +14352,10 @@ "name": "Plugins", "description": "A plugin is an installed module that makes filters and actions available for the workflow feature." }, + { + "name": "Queues", + "description": "Queues and background jobs are used for processing tasks asynchronously. Queues can be paused and resumed as needed." + }, { "name": "Search", "description": "Endpoints related to searching assets via text, smart search, optical character recognition (OCR), and other filters like person, album, and other metadata. Search endpoints usually support pagination and sorting." @@ -16291,6 +16604,66 @@ ], "type": "object" }, + "JobName": { + "enum": [ + "AssetDelete", + "AssetDeleteCheck", + "AssetDetectFacesQueueAll", + "AssetDetectFaces", + "AssetDetectDuplicatesQueueAll", + "AssetDetectDuplicates", + "AssetEncodeVideoQueueAll", + "AssetEncodeVideo", + "AssetEmptyTrash", + "AssetExtractMetadataQueueAll", + "AssetExtractMetadata", + "AssetFileMigration", + "AssetGenerateThumbnailsQueueAll", + "AssetGenerateThumbnails", + "AuditLogCleanup", + "AuditTableCleanup", + "DatabaseBackup", + "FacialRecognitionQueueAll", + "FacialRecognition", + "FileDelete", + "FileMigrationQueueAll", + "LibraryDeleteCheck", + "LibraryDelete", + "LibraryRemoveAsset", + "LibraryScanAssetsQueueAll", + "LibrarySyncAssets", + "LibrarySyncFilesQueueAll", + "LibrarySyncFiles", + "LibraryScanQueueAll", + "MemoryCleanup", + "MemoryGenerate", + "NotificationsCleanup", + "NotifyUserSignup", + "NotifyAlbumInvite", + "NotifyAlbumUpdate", + "UserDelete", + "UserDeleteCheck", + "UserSyncUsage", + "PersonCleanup", + "PersonFileMigration", + "PersonGenerateThumbnail", + "SessionCleanup", + "SendMail", + "SidecarQueueAll", + "SidecarCheck", + "SidecarWrite", + "SmartSearchQueueAll", + "SmartSearch", + "StorageTemplateMigration", + "StorageTemplateMigrationSingle", + "TagCleanup", + "VersionCheck", + "OcrQueueAll", + "Ocr", + "WorkflowRun" + ], + "type": "string" + }, "JobSettingsDto": { "properties": { "concurrency": { @@ -17583,6 +17956,12 @@ "userProfileImage.read", "userProfileImage.update", "userProfileImage.delete", + "queue.read", + "queue.update", + "queueJob.create", + "queueJob.read", + "queueJob.update", + "queueJob.delete", "workflow.create", "workflow.read", "workflow.update", @@ -18083,6 +18462,63 @@ ], "type": "object" }, + "QueueDeleteDto": { + "properties": { + "failed": { + "description": "If true, will also remove failed jobs from the queue.", + "type": "boolean", + "x-immich-history": [ + { + "version": "v2.4.0", + "state": "Added" + }, + { + "version": "v2.4.0", + "state": "Alpha" + } + ], + "x-immich-state": "Alpha" + } + }, + "type": "object" + }, + "QueueJobResponseDto": { + "properties": { + "data": { + "type": "object" + }, + "id": { + "type": "string" + }, + "name": { + "allOf": [ + { + "$ref": "#/components/schemas/JobName" + } + ] + }, + "timestamp": { + "type": "integer" + } + }, + "required": [ + "data", + "name", + "timestamp" + ], + "type": "object" + }, + "QueueJobStatus": { + "enum": [ + "active", + "failed", + "completed", + "delayed", + "waiting", + "paused" + ], + "type": "string" + }, "QueueName": { "enum": [ "thumbnailGeneration", @@ -18106,12 +18542,35 @@ "type": "string" }, "QueueResponseDto": { + "properties": { + "isPaused": { + "type": "boolean" + }, + "name": { + "allOf": [ + { + "$ref": "#/components/schemas/QueueName" + } + ] + }, + "statistics": { + "$ref": "#/components/schemas/QueueStatisticsDto" + } + }, + "required": [ + "isPaused", + "name", + "statistics" + ], + "type": "object" + }, + "QueueResponseLegacyDto": { "properties": { "jobCounts": { "$ref": "#/components/schemas/QueueStatisticsDto" }, "queueStatus": { - "$ref": "#/components/schemas/QueueStatusDto" + "$ref": "#/components/schemas/QueueStatusLegacyDto" } }, "required": [ @@ -18151,7 +18610,7 @@ ], "type": "object" }, - "QueueStatusDto": { + "QueueStatusLegacyDto": { "properties": { "isActive": { "type": "boolean" @@ -18166,58 +18625,66 @@ ], "type": "object" }, - "QueuesResponseDto": { + "QueueUpdateDto": { + "properties": { + "isPaused": { + "type": "boolean" + } + }, + "type": "object" + }, + "QueuesResponseLegacyDto": { "properties": { "backgroundTask": { - "$ref": "#/components/schemas/QueueResponseDto" + "$ref": "#/components/schemas/QueueResponseLegacyDto" }, "backupDatabase": { - "$ref": "#/components/schemas/QueueResponseDto" + "$ref": "#/components/schemas/QueueResponseLegacyDto" }, "duplicateDetection": { - "$ref": "#/components/schemas/QueueResponseDto" + "$ref": "#/components/schemas/QueueResponseLegacyDto" }, "faceDetection": { - "$ref": "#/components/schemas/QueueResponseDto" + "$ref": "#/components/schemas/QueueResponseLegacyDto" }, "facialRecognition": { - "$ref": "#/components/schemas/QueueResponseDto" + "$ref": "#/components/schemas/QueueResponseLegacyDto" }, "library": { - "$ref": "#/components/schemas/QueueResponseDto" + "$ref": "#/components/schemas/QueueResponseLegacyDto" }, "metadataExtraction": { - "$ref": "#/components/schemas/QueueResponseDto" + "$ref": "#/components/schemas/QueueResponseLegacyDto" }, "migration": { - "$ref": "#/components/schemas/QueueResponseDto" + "$ref": "#/components/schemas/QueueResponseLegacyDto" }, "notifications": { - "$ref": "#/components/schemas/QueueResponseDto" + "$ref": "#/components/schemas/QueueResponseLegacyDto" }, "ocr": { - "$ref": "#/components/schemas/QueueResponseDto" + "$ref": "#/components/schemas/QueueResponseLegacyDto" }, "search": { - "$ref": "#/components/schemas/QueueResponseDto" + "$ref": "#/components/schemas/QueueResponseLegacyDto" }, "sidecar": { - "$ref": "#/components/schemas/QueueResponseDto" + "$ref": "#/components/schemas/QueueResponseLegacyDto" }, "smartSearch": { - "$ref": "#/components/schemas/QueueResponseDto" + "$ref": "#/components/schemas/QueueResponseLegacyDto" }, "storageTemplateMigration": { - "$ref": "#/components/schemas/QueueResponseDto" + "$ref": "#/components/schemas/QueueResponseLegacyDto" }, "thumbnailGeneration": { - "$ref": "#/components/schemas/QueueResponseDto" + "$ref": "#/components/schemas/QueueResponseLegacyDto" }, "videoConversion": { - "$ref": "#/components/schemas/QueueResponseDto" + "$ref": "#/components/schemas/QueueResponseLegacyDto" }, "workflow": { - "$ref": "#/components/schemas/QueueResponseDto" + "$ref": "#/components/schemas/QueueResponseLegacyDto" } }, "required": [ diff --git a/open-api/typescript-sdk/src/fetch-client.ts b/open-api/typescript-sdk/src/fetch-client.ts index 0de0e7696..bbcc2311b 100644 --- a/open-api/typescript-sdk/src/fetch-client.ts +++ b/open-api/typescript-sdk/src/fetch-client.ts @@ -716,32 +716,32 @@ export type QueueStatisticsDto = { paused: number; waiting: number; }; -export type QueueStatusDto = { +export type QueueStatusLegacyDto = { isActive: boolean; isPaused: boolean; }; -export type QueueResponseDto = { +export type QueueResponseLegacyDto = { jobCounts: QueueStatisticsDto; - queueStatus: QueueStatusDto; + queueStatus: QueueStatusLegacyDto; }; -export type QueuesResponseDto = { - backgroundTask: QueueResponseDto; - backupDatabase: QueueResponseDto; - duplicateDetection: QueueResponseDto; - faceDetection: QueueResponseDto; - facialRecognition: QueueResponseDto; - library: QueueResponseDto; - metadataExtraction: QueueResponseDto; - migration: QueueResponseDto; - notifications: QueueResponseDto; - ocr: QueueResponseDto; - search: QueueResponseDto; - sidecar: QueueResponseDto; - smartSearch: QueueResponseDto; - storageTemplateMigration: QueueResponseDto; - thumbnailGeneration: QueueResponseDto; - videoConversion: QueueResponseDto; - workflow: QueueResponseDto; +export type QueuesResponseLegacyDto = { + backgroundTask: QueueResponseLegacyDto; + backupDatabase: QueueResponseLegacyDto; + duplicateDetection: QueueResponseLegacyDto; + faceDetection: QueueResponseLegacyDto; + facialRecognition: QueueResponseLegacyDto; + library: QueueResponseLegacyDto; + metadataExtraction: QueueResponseLegacyDto; + migration: QueueResponseLegacyDto; + notifications: QueueResponseLegacyDto; + ocr: QueueResponseLegacyDto; + search: QueueResponseLegacyDto; + sidecar: QueueResponseLegacyDto; + smartSearch: QueueResponseLegacyDto; + storageTemplateMigration: QueueResponseLegacyDto; + thumbnailGeneration: QueueResponseLegacyDto; + videoConversion: QueueResponseLegacyDto; + workflow: QueueResponseLegacyDto; }; export type JobCreateDto = { name: ManualJobName; @@ -966,6 +966,24 @@ export type PluginResponseDto = { updatedAt: string; version: string; }; +export type QueueResponseDto = { + isPaused: boolean; + name: QueueName; + statistics: QueueStatisticsDto; +}; +export type QueueUpdateDto = { + isPaused?: boolean; +}; +export type QueueDeleteDto = { + /** If true, will also remove failed jobs from the queue. */ + failed?: boolean; +}; +export type QueueJobResponseDto = { + data: object; + id?: string; + name: JobName; + timestamp: number; +}; export type SearchExploreItem = { data: AssetResponseDto; value: string; @@ -2925,7 +2943,7 @@ export function reassignFacesById({ id, faceDto }: { export function getQueuesLegacy(opts?: Oazapfts.RequestOpts) { return oazapfts.ok(oazapfts.fetchJson<{ status: 200; - data: QueuesResponseDto; + data: QueuesResponseLegacyDto; }>("/jobs", { ...opts })); @@ -2951,7 +2969,7 @@ export function runQueueCommandLegacy({ name, queueCommandDto }: { }, opts?: Oazapfts.RequestOpts) { return oazapfts.ok(oazapfts.fetchJson<{ status: 200; - data: QueueResponseDto; + data: QueueResponseLegacyDto; }>(`/jobs/${encodeURIComponent(name)}`, oazapfts.json({ ...opts, method: "PUT", @@ -3651,6 +3669,75 @@ export function getPlugin({ id }: { ...opts })); } +/** + * List all queues + */ +export function getQueues(opts?: Oazapfts.RequestOpts) { + return oazapfts.ok(oazapfts.fetchJson<{ + status: 200; + data: QueueResponseDto[]; + }>("/queues", { + ...opts + })); +} +/** + * Retrieve a queue + */ +export function getQueue({ name }: { + name: QueueName; +}, opts?: Oazapfts.RequestOpts) { + return oazapfts.ok(oazapfts.fetchJson<{ + status: 200; + data: QueueResponseDto; + }>(`/queues/${encodeURIComponent(name)}`, { + ...opts + })); +} +/** + * Update a queue + */ +export function updateQueue({ name, queueUpdateDto }: { + name: QueueName; + queueUpdateDto: QueueUpdateDto; +}, opts?: Oazapfts.RequestOpts) { + return oazapfts.ok(oazapfts.fetchJson<{ + status: 200; + data: QueueResponseDto; + }>(`/queues/${encodeURIComponent(name)}`, oazapfts.json({ + ...opts, + method: "PUT", + body: queueUpdateDto + }))); +} +/** + * Empty a queue + */ +export function emptyQueue({ name, queueDeleteDto }: { + name: QueueName; + queueDeleteDto: QueueDeleteDto; +}, opts?: Oazapfts.RequestOpts) { + return oazapfts.ok(oazapfts.fetchText(`/queues/${encodeURIComponent(name)}/jobs`, oazapfts.json({ + ...opts, + method: "DELETE", + body: queueDeleteDto + }))); +} +/** + * Retrieve queue jobs + */ +export function getQueueJobs({ name, status }: { + name: QueueName; + status?: QueueJobStatus[]; +}, opts?: Oazapfts.RequestOpts) { + return oazapfts.ok(oazapfts.fetchJson<{ + status: 200; + data: QueueJobResponseDto[]; + }>(`/queues/${encodeURIComponent(name)}/jobs${QS.query(QS.explode({ + status + }))}`, { + ...opts + })); +} /** * Retrieve assets by city */ @@ -5241,6 +5328,12 @@ export enum Permission { UserProfileImageRead = "userProfileImage.read", UserProfileImageUpdate = "userProfileImage.update", UserProfileImageDelete = "userProfileImage.delete", + QueueRead = "queue.read", + QueueUpdate = "queue.update", + QueueJobCreate = "queueJob.create", + QueueJobRead = "queueJob.read", + QueueJobUpdate = "queueJob.update", + QueueJobDelete = "queueJob.delete", WorkflowCreate = "workflow.create", WorkflowRead = "workflow.read", WorkflowUpdate = "workflow.update", @@ -5330,6 +5423,71 @@ export enum PluginContext { Album = "album", Person = "person" } +export enum QueueJobStatus { + Active = "active", + Failed = "failed", + Completed = "completed", + Delayed = "delayed", + Waiting = "waiting", + Paused = "paused" +} +export enum JobName { + AssetDelete = "AssetDelete", + AssetDeleteCheck = "AssetDeleteCheck", + AssetDetectFacesQueueAll = "AssetDetectFacesQueueAll", + AssetDetectFaces = "AssetDetectFaces", + AssetDetectDuplicatesQueueAll = "AssetDetectDuplicatesQueueAll", + AssetDetectDuplicates = "AssetDetectDuplicates", + AssetEncodeVideoQueueAll = "AssetEncodeVideoQueueAll", + AssetEncodeVideo = "AssetEncodeVideo", + AssetEmptyTrash = "AssetEmptyTrash", + AssetExtractMetadataQueueAll = "AssetExtractMetadataQueueAll", + AssetExtractMetadata = "AssetExtractMetadata", + AssetFileMigration = "AssetFileMigration", + AssetGenerateThumbnailsQueueAll = "AssetGenerateThumbnailsQueueAll", + AssetGenerateThumbnails = "AssetGenerateThumbnails", + AuditLogCleanup = "AuditLogCleanup", + AuditTableCleanup = "AuditTableCleanup", + DatabaseBackup = "DatabaseBackup", + FacialRecognitionQueueAll = "FacialRecognitionQueueAll", + FacialRecognition = "FacialRecognition", + FileDelete = "FileDelete", + FileMigrationQueueAll = "FileMigrationQueueAll", + LibraryDeleteCheck = "LibraryDeleteCheck", + LibraryDelete = "LibraryDelete", + LibraryRemoveAsset = "LibraryRemoveAsset", + LibraryScanAssetsQueueAll = "LibraryScanAssetsQueueAll", + LibrarySyncAssets = "LibrarySyncAssets", + LibrarySyncFilesQueueAll = "LibrarySyncFilesQueueAll", + LibrarySyncFiles = "LibrarySyncFiles", + LibraryScanQueueAll = "LibraryScanQueueAll", + MemoryCleanup = "MemoryCleanup", + MemoryGenerate = "MemoryGenerate", + NotificationsCleanup = "NotificationsCleanup", + NotifyUserSignup = "NotifyUserSignup", + NotifyAlbumInvite = "NotifyAlbumInvite", + NotifyAlbumUpdate = "NotifyAlbumUpdate", + UserDelete = "UserDelete", + UserDeleteCheck = "UserDeleteCheck", + UserSyncUsage = "UserSyncUsage", + PersonCleanup = "PersonCleanup", + PersonFileMigration = "PersonFileMigration", + PersonGenerateThumbnail = "PersonGenerateThumbnail", + SessionCleanup = "SessionCleanup", + SendMail = "SendMail", + SidecarQueueAll = "SidecarQueueAll", + SidecarCheck = "SidecarCheck", + SidecarWrite = "SidecarWrite", + SmartSearchQueueAll = "SmartSearchQueueAll", + SmartSearch = "SmartSearch", + StorageTemplateMigration = "StorageTemplateMigration", + StorageTemplateMigrationSingle = "StorageTemplateMigrationSingle", + TagCleanup = "TagCleanup", + VersionCheck = "VersionCheck", + OcrQueueAll = "OcrQueueAll", + Ocr = "Ocr", + WorkflowRun = "WorkflowRun" +} export enum SearchSuggestionType { Country = "country", State = "state", diff --git a/server/src/constants.ts b/server/src/constants.ts index 68534c00e..33f8e3b4c 100644 --- a/server/src/constants.ts +++ b/server/src/constants.ts @@ -163,6 +163,8 @@ export const endpointTags: Record = { 'A person is a collection of faces, which can be favorited and named. A person can also be merged into another person. People are automatically created via the face recognition job.', [ApiTag.Plugins]: 'A plugin is an installed module that makes filters and actions available for the workflow feature.', + [ApiTag.Queues]: + 'Queues and background jobs are used for processing tasks asynchronously. Queues can be paused and resumed as needed.', [ApiTag.Search]: 'Endpoints related to searching assets via text, smart search, optical character recognition (OCR), and other filters like person, album, and other metadata. Search endpoints usually support pagination and sorting.', [ApiTag.Server]: diff --git a/server/src/controllers/index.ts b/server/src/controllers/index.ts index d5811de48..6ba3d38a7 100644 --- a/server/src/controllers/index.ts +++ b/server/src/controllers/index.ts @@ -20,6 +20,7 @@ import { OAuthController } from 'src/controllers/oauth.controller'; import { PartnerController } from 'src/controllers/partner.controller'; import { PersonController } from 'src/controllers/person.controller'; import { PluginController } from 'src/controllers/plugin.controller'; +import { QueueController } from 'src/controllers/queue.controller'; import { SearchController } from 'src/controllers/search.controller'; import { ServerController } from 'src/controllers/server.controller'; import { SessionController } from 'src/controllers/session.controller'; @@ -59,6 +60,7 @@ export const controllers = [ PartnerController, PersonController, PluginController, + QueueController, SearchController, ServerController, SessionController, diff --git a/server/src/controllers/job.controller.ts b/server/src/controllers/job.controller.ts index 977f1e0f1..783d5a313 100644 --- a/server/src/controllers/job.controller.ts +++ b/server/src/controllers/job.controller.ts @@ -1,10 +1,12 @@ import { Body, Controller, Get, HttpCode, HttpStatus, Param, Post, Put } from '@nestjs/common'; import { ApiTags } from '@nestjs/swagger'; import { Endpoint, HistoryBuilder } from 'src/decorators'; +import { AuthDto } from 'src/dtos/auth.dto'; import { JobCreateDto } from 'src/dtos/job.dto'; -import { QueueCommandDto, QueueNameParamDto, QueueResponseDto, QueuesResponseDto } from 'src/dtos/queue.dto'; +import { QueueResponseLegacyDto, QueuesResponseLegacyDto } from 'src/dtos/queue-legacy.dto'; +import { QueueCommandDto, QueueNameParamDto } from 'src/dtos/queue.dto'; import { ApiTag, Permission } from 'src/enum'; -import { Authenticated } from 'src/middleware/auth.guard'; +import { Auth, Authenticated } from 'src/middleware/auth.guard'; import { JobService } from 'src/services/job.service'; import { QueueService } from 'src/services/queue.service'; @@ -21,10 +23,10 @@ export class JobController { @Endpoint({ summary: 'Retrieve queue counts and status', description: 'Retrieve the counts of the current queue, as well as the current status.', - history: new HistoryBuilder().added('v1').beta('v1').stable('v2'), + history: new HistoryBuilder().added('v1').beta('v1').stable('v2').deprecated('v2.4.0'), }) - getQueuesLegacy(): Promise { - return this.queueService.getAll(); + getQueuesLegacy(@Auth() auth: AuthDto): Promise { + return this.queueService.getAllLegacy(auth); } @Post() @@ -46,9 +48,12 @@ export class JobController { summary: 'Run jobs', description: 'Queue all assets for a specific job type. Defaults to only queueing assets that have not yet been processed, but the force command can be used to re-process all assets.', - history: new HistoryBuilder().added('v1').beta('v1').stable('v2'), + history: new HistoryBuilder().added('v1').beta('v1').stable('v2').deprecated('v2.4.0'), }) - runQueueCommandLegacy(@Param() { name }: QueueNameParamDto, @Body() dto: QueueCommandDto): Promise { - return this.queueService.runCommand(name, dto); + runQueueCommandLegacy( + @Param() { name }: QueueNameParamDto, + @Body() dto: QueueCommandDto, + ): Promise { + return this.queueService.runCommandLegacy(name, dto); } } diff --git a/server/src/controllers/queue.controller.ts b/server/src/controllers/queue.controller.ts new file mode 100644 index 000000000..1d8d918c5 --- /dev/null +++ b/server/src/controllers/queue.controller.ts @@ -0,0 +1,85 @@ +import { Body, Controller, Delete, Get, HttpCode, HttpStatus, Param, Put, Query } from '@nestjs/common'; +import { ApiTags } from '@nestjs/swagger'; +import { Endpoint, HistoryBuilder } from 'src/decorators'; +import { AuthDto } from 'src/dtos/auth.dto'; +import { + QueueDeleteDto, + QueueJobResponseDto, + QueueJobSearchDto, + QueueNameParamDto, + QueueResponseDto, + QueueUpdateDto, +} from 'src/dtos/queue.dto'; +import { ApiTag, Permission } from 'src/enum'; +import { Auth, Authenticated } from 'src/middleware/auth.guard'; +import { QueueService } from 'src/services/queue.service'; + +@ApiTags(ApiTag.Queues) +@Controller('queues') +export class QueueController { + constructor(private service: QueueService) {} + + @Get() + @Authenticated({ permission: Permission.QueueRead, admin: true }) + @Endpoint({ + summary: 'List all queues', + description: 'Retrieves a list of queues.', + history: new HistoryBuilder().added('v2.4.0').alpha('v2.4.0'), + }) + getQueues(@Auth() auth: AuthDto): Promise { + return this.service.getAll(auth); + } + + @Get(':name') + @Authenticated({ permission: Permission.QueueRead, admin: true }) + @Endpoint({ + summary: 'Retrieve a queue', + description: 'Retrieves a specific queue by its name.', + history: new HistoryBuilder().added('v2.4.0').alpha('v2.4.0'), + }) + getQueue(@Auth() auth: AuthDto, @Param() { name }: QueueNameParamDto): Promise { + return this.service.get(auth, name); + } + + @Put(':name') + @Authenticated({ permission: Permission.QueueUpdate, admin: true }) + @Endpoint({ + summary: 'Update a queue', + description: 'Change the paused status of a specific queue.', + history: new HistoryBuilder().added('v2.4.0').alpha('v2.4.0'), + }) + updateQueue( + @Auth() auth: AuthDto, + @Param() { name }: QueueNameParamDto, + @Body() dto: QueueUpdateDto, + ): Promise { + return this.service.update(auth, name, dto); + } + + @Get(':name/jobs') + @Authenticated({ permission: Permission.QueueJobRead, admin: true }) + @Endpoint({ + summary: 'Retrieve queue jobs', + description: 'Retrieves a list of queue jobs from the specified queue.', + history: new HistoryBuilder().added('v2.4.0').alpha('v2.4.0'), + }) + getQueueJobs( + @Auth() auth: AuthDto, + @Param() { name }: QueueNameParamDto, + @Query() dto: QueueJobSearchDto, + ): Promise { + return this.service.searchJobs(auth, name, dto); + } + + @Delete(':name/jobs') + @Authenticated({ permission: Permission.QueueJobDelete, admin: true }) + @HttpCode(HttpStatus.NO_CONTENT) + @Endpoint({ + summary: 'Empty a queue', + description: 'Removes all jobs from the specified queue.', + history: new HistoryBuilder().added('v2.4.0').alpha('v2.4.0'), + }) + emptyQueue(@Auth() auth: AuthDto, @Param() { name }: QueueNameParamDto, @Body() dto: QueueDeleteDto): Promise { + return this.service.emptyQueue(auth, name, dto); + } +} diff --git a/server/src/dtos/queue-legacy.dto.ts b/server/src/dtos/queue-legacy.dto.ts new file mode 100644 index 000000000..79155e3f7 --- /dev/null +++ b/server/src/dtos/queue-legacy.dto.ts @@ -0,0 +1,89 @@ +import { ApiProperty } from '@nestjs/swagger'; +import { QueueResponseDto, QueueStatisticsDto } from 'src/dtos/queue.dto'; +import { QueueName } from 'src/enum'; + +export class QueueStatusLegacyDto { + isActive!: boolean; + isPaused!: boolean; +} + +export class QueueResponseLegacyDto { + @ApiProperty({ type: QueueStatusLegacyDto }) + queueStatus!: QueueStatusLegacyDto; + + @ApiProperty({ type: QueueStatisticsDto }) + jobCounts!: QueueStatisticsDto; +} + +export class QueuesResponseLegacyDto implements Record { + @ApiProperty({ type: QueueResponseLegacyDto }) + [QueueName.ThumbnailGeneration]!: QueueResponseLegacyDto; + + @ApiProperty({ type: QueueResponseLegacyDto }) + [QueueName.MetadataExtraction]!: QueueResponseLegacyDto; + + @ApiProperty({ type: QueueResponseLegacyDto }) + [QueueName.VideoConversion]!: QueueResponseLegacyDto; + + @ApiProperty({ type: QueueResponseLegacyDto }) + [QueueName.SmartSearch]!: QueueResponseLegacyDto; + + @ApiProperty({ type: QueueResponseLegacyDto }) + [QueueName.StorageTemplateMigration]!: QueueResponseLegacyDto; + + @ApiProperty({ type: QueueResponseLegacyDto }) + [QueueName.Migration]!: QueueResponseLegacyDto; + + @ApiProperty({ type: QueueResponseLegacyDto }) + [QueueName.BackgroundTask]!: QueueResponseLegacyDto; + + @ApiProperty({ type: QueueResponseLegacyDto }) + [QueueName.Search]!: QueueResponseLegacyDto; + + @ApiProperty({ type: QueueResponseLegacyDto }) + [QueueName.DuplicateDetection]!: QueueResponseLegacyDto; + + @ApiProperty({ type: QueueResponseLegacyDto }) + [QueueName.FaceDetection]!: QueueResponseLegacyDto; + + @ApiProperty({ type: QueueResponseLegacyDto }) + [QueueName.FacialRecognition]!: QueueResponseLegacyDto; + + @ApiProperty({ type: QueueResponseLegacyDto }) + [QueueName.Sidecar]!: QueueResponseLegacyDto; + + @ApiProperty({ type: QueueResponseLegacyDto }) + [QueueName.Library]!: QueueResponseLegacyDto; + + @ApiProperty({ type: QueueResponseLegacyDto }) + [QueueName.Notification]!: QueueResponseLegacyDto; + + @ApiProperty({ type: QueueResponseLegacyDto }) + [QueueName.BackupDatabase]!: QueueResponseLegacyDto; + + @ApiProperty({ type: QueueResponseLegacyDto }) + [QueueName.Ocr]!: QueueResponseLegacyDto; + + @ApiProperty({ type: QueueResponseLegacyDto }) + [QueueName.Workflow]!: QueueResponseLegacyDto; +} + +export const mapQueueLegacy = (response: QueueResponseDto): QueueResponseLegacyDto => { + return { + queueStatus: { + isPaused: response.isPaused, + isActive: response.statistics.active > 0, + }, + jobCounts: response.statistics, + }; +}; + +export const mapQueuesLegacy = (responses: QueueResponseDto[]): QueuesResponseLegacyDto => { + const legacy = new QueuesResponseLegacyDto(); + + for (const response of responses) { + legacy[response.name] = mapQueueLegacy(response); + } + + return legacy; +}; diff --git a/server/src/dtos/queue.dto.ts b/server/src/dtos/queue.dto.ts index df00c5cfc..38a4a4ac6 100644 --- a/server/src/dtos/queue.dto.ts +++ b/server/src/dtos/queue.dto.ts @@ -1,5 +1,6 @@ import { ApiProperty } from '@nestjs/swagger'; -import { QueueCommand, QueueName } from 'src/enum'; +import { HistoryBuilder, Property } from 'src/decorators'; +import { JobName, QueueCommand, QueueJobStatus, QueueName } from 'src/enum'; import { ValidateBoolean, ValidateEnum } from 'src/validation'; export class QueueNameParamDto { @@ -15,6 +16,46 @@ export class QueueCommandDto { force?: boolean; // TODO: this uses undefined as a third state, which should be refactored to be more explicit } +export class QueueUpdateDto { + @ValidateBoolean({ optional: true }) + isPaused?: boolean; +} + +export class QueueDeleteDto { + @ValidateBoolean({ optional: true }) + @Property({ + description: 'If true, will also remove failed jobs from the queue.', + history: new HistoryBuilder().added('v2.4.0').alpha('v2.4.0'), + }) + failed?: boolean; +} + +export class QueueJobSearchDto { + @ValidateEnum({ enum: QueueJobStatus, name: 'QueueJobStatus', optional: true, each: true }) + status?: QueueJobStatus[]; +} +export class QueueJobResponseDto { + id?: string; + + @ValidateEnum({ enum: JobName, name: 'JobName' }) + name!: JobName; + + data!: object; + + @ApiProperty({ type: 'integer' }) + timestamp!: number; +} + +export class QueueResponseDto { + @ValidateEnum({ enum: QueueName, name: 'QueueName' }) + name!: QueueName; + + @ValidateBoolean() + isPaused!: boolean; + + statistics!: QueueStatisticsDto; +} + export class QueueStatisticsDto { @ApiProperty({ type: 'integer' }) active!: number; @@ -29,69 +70,3 @@ export class QueueStatisticsDto { @ApiProperty({ type: 'integer' }) paused!: number; } - -export class QueueStatusDto { - isActive!: boolean; - isPaused!: boolean; -} - -export class QueueResponseDto { - @ApiProperty({ type: QueueStatisticsDto }) - jobCounts!: QueueStatisticsDto; - - @ApiProperty({ type: QueueStatusDto }) - queueStatus!: QueueStatusDto; -} - -export class QueuesResponseDto implements Record { - @ApiProperty({ type: QueueResponseDto }) - [QueueName.ThumbnailGeneration]!: QueueResponseDto; - - @ApiProperty({ type: QueueResponseDto }) - [QueueName.MetadataExtraction]!: QueueResponseDto; - - @ApiProperty({ type: QueueResponseDto }) - [QueueName.VideoConversion]!: QueueResponseDto; - - @ApiProperty({ type: QueueResponseDto }) - [QueueName.SmartSearch]!: QueueResponseDto; - - @ApiProperty({ type: QueueResponseDto }) - [QueueName.StorageTemplateMigration]!: QueueResponseDto; - - @ApiProperty({ type: QueueResponseDto }) - [QueueName.Migration]!: QueueResponseDto; - - @ApiProperty({ type: QueueResponseDto }) - [QueueName.BackgroundTask]!: QueueResponseDto; - - @ApiProperty({ type: QueueResponseDto }) - [QueueName.Search]!: QueueResponseDto; - - @ApiProperty({ type: QueueResponseDto }) - [QueueName.DuplicateDetection]!: QueueResponseDto; - - @ApiProperty({ type: QueueResponseDto }) - [QueueName.FaceDetection]!: QueueResponseDto; - - @ApiProperty({ type: QueueResponseDto }) - [QueueName.FacialRecognition]!: QueueResponseDto; - - @ApiProperty({ type: QueueResponseDto }) - [QueueName.Sidecar]!: QueueResponseDto; - - @ApiProperty({ type: QueueResponseDto }) - [QueueName.Library]!: QueueResponseDto; - - @ApiProperty({ type: QueueResponseDto }) - [QueueName.Notification]!: QueueResponseDto; - - @ApiProperty({ type: QueueResponseDto }) - [QueueName.BackupDatabase]!: QueueResponseDto; - - @ApiProperty({ type: QueueResponseDto }) - [QueueName.Ocr]!: QueueResponseDto; - - @ApiProperty({ type: QueueResponseDto }) - [QueueName.Workflow]!: QueueResponseDto; -} diff --git a/server/src/enum.ts b/server/src/enum.ts index d397f9d2a..87ff282f3 100644 --- a/server/src/enum.ts +++ b/server/src/enum.ts @@ -248,6 +248,14 @@ export enum Permission { UserProfileImageUpdate = 'userProfileImage.update', UserProfileImageDelete = 'userProfileImage.delete', + QueueRead = 'queue.read', + QueueUpdate = 'queue.update', + + QueueJobCreate = 'queueJob.create', + QueueJobRead = 'queueJob.read', + QueueJobUpdate = 'queueJob.update', + QueueJobDelete = 'queueJob.delete', + WorkflowCreate = 'workflow.create', WorkflowRead = 'workflow.read', WorkflowUpdate = 'workflow.update', @@ -543,6 +551,15 @@ export enum QueueName { Workflow = 'workflow', } +export enum QueueJobStatus { + Active = 'active', + Failed = 'failed', + Complete = 'completed', + Delayed = 'delayed', + Waiting = 'waiting', + Paused = 'paused', +} + export enum JobName { AssetDelete = 'AssetDelete', AssetDeleteCheck = 'AssetDeleteCheck', @@ -624,9 +641,13 @@ export enum JobName { export enum QueueCommand { Start = 'start', + /** @deprecated Use `updateQueue` instead */ Pause = 'pause', + /** @deprecated Use `updateQueue` instead */ Resume = 'resume', + /** @deprecated Use `emptyQueue` instead */ Empty = 'empty', + /** @deprecated Use `emptyQueue` instead */ ClearFailed = 'clear-failed', } @@ -823,6 +844,7 @@ export enum ApiTag { Partners = 'Partners', People = 'People', Plugins = 'Plugins', + Queues = 'Queues', Search = 'Search', Server = 'Server', Sessions = 'Sessions', diff --git a/server/src/repositories/config.repository.ts b/server/src/repositories/config.repository.ts index 05d4bd2ac..60ec021b3 100644 --- a/server/src/repositories/config.repository.ts +++ b/server/src/repositories/config.repository.ts @@ -249,7 +249,7 @@ const getEnv = (): EnvData => { prefix: 'immich_bull', connection: { ...redisConfig }, defaultJobOptions: { - attempts: 3, + attempts: 1, removeOnComplete: true, removeOnFail: false, }, diff --git a/server/src/repositories/job.repository.ts b/server/src/repositories/job.repository.ts index cf2799a4c..b12accb68 100644 --- a/server/src/repositories/job.repository.ts +++ b/server/src/repositories/job.repository.ts @@ -5,11 +5,12 @@ import { JobsOptions, Queue, Worker } from 'bullmq'; import { ClassConstructor } from 'class-transformer'; import { setTimeout } from 'node:timers/promises'; import { JobConfig } from 'src/decorators'; -import { JobName, JobStatus, MetadataKey, QueueCleanType, QueueName } from 'src/enum'; +import { QueueJobResponseDto, QueueJobSearchDto } from 'src/dtos/queue.dto'; +import { JobName, JobStatus, MetadataKey, QueueCleanType, QueueJobStatus, QueueName } from 'src/enum'; import { ConfigRepository } from 'src/repositories/config.repository'; import { EventRepository } from 'src/repositories/event.repository'; import { LoggingRepository } from 'src/repositories/logging.repository'; -import { JobCounts, JobItem, JobOf, QueueStatus } from 'src/types'; +import { JobCounts, JobItem, JobOf } from 'src/types'; import { getKeyByValue, getMethodNames, ImmichStartupError } from 'src/utils/misc'; type JobMapItem = { @@ -115,13 +116,14 @@ export class JobRepository { worker.concurrency = concurrency; } - async getQueueStatus(name: QueueName): Promise { + async isActive(name: QueueName): Promise { const queue = this.getQueue(name); + const count = await queue.getActiveCount(); + return count > 0; + } - return { - isActive: !!(await queue.getActiveCount()), - isPaused: await queue.isPaused(), - }; + async isPaused(name: QueueName): Promise { + return this.getQueue(name).isPaused(); } pause(name: QueueName) { @@ -192,17 +194,28 @@ export class JobRepository { } async waitForQueueCompletion(...queues: QueueName[]): Promise { - let activeQueue: QueueStatus | undefined; - do { - const statuses = await Promise.all(queues.map((name) => this.getQueueStatus(name))); - activeQueue = statuses.find((status) => status.isActive); - } while (activeQueue); - { - this.logger.verbose(`Waiting for ${activeQueue} queue to stop...`); + const getPending = async () => { + const results = await Promise.all(queues.map(async (name) => ({ pending: await this.isActive(name), name }))); + return results.filter(({ pending }) => pending).map(({ name }) => name); + }; + + let pending = await getPending(); + + while (pending.length > 0) { + this.logger.verbose(`Waiting for ${pending[0]} queue to stop...`); await setTimeout(1000); + pending = await getPending(); } } + async searchJobs(name: QueueName, dto: QueueJobSearchDto): Promise { + const jobs = await this.getQueue(name).getJobs(dto.status ?? Object.values(QueueJobStatus), 0, 1000); + return jobs.map((job) => { + const { id, name, timestamp, data } = job; + return { id, name: name as JobName, timestamp, data }; + }); + } + private getJobOptions(item: JobItem): JobsOptions | null { switch (item.name) { case JobName.NotifyAlbumUpdate: { diff --git a/server/src/services/queue.service.spec.ts b/server/src/services/queue.service.spec.ts index 5dce9476e..f5cf20413 100644 --- a/server/src/services/queue.service.spec.ts +++ b/server/src/services/queue.service.spec.ts @@ -2,6 +2,7 @@ import { BadRequestException } from '@nestjs/common'; import { defaults, SystemConfig } from 'src/config'; import { ImmichWorker, JobName, QueueCommand, QueueName } from 'src/enum'; import { QueueService } from 'src/services/queue.service'; +import { factory } from 'test/small.factory'; import { newTestService, ServiceMocks } from 'test/utils'; describe(QueueService.name, () => { @@ -52,80 +53,64 @@ describe(QueueService.name, () => { describe('getAllJobStatus', () => { it('should get all job statuses', async () => { - mocks.job.getJobCounts.mockResolvedValue({ - active: 1, - completed: 1, - failed: 1, - delayed: 1, - waiting: 1, - paused: 1, - }); - mocks.job.getQueueStatus.mockResolvedValue({ - isActive: true, - isPaused: true, - }); + const stats = factory.queueStatistics({ active: 1 }); + const expected = { jobCounts: stats, queueStatus: { isActive: true, isPaused: true } }; - const expectedJobStatus = { - jobCounts: { - active: 1, - completed: 1, - delayed: 1, - failed: 1, - waiting: 1, - paused: 1, - }, - queueStatus: { - isActive: true, - isPaused: true, - }, - }; + mocks.job.getJobCounts.mockResolvedValue(stats); + mocks.job.isPaused.mockResolvedValue(true); - await expect(sut.getAll()).resolves.toEqual({ - [QueueName.BackgroundTask]: expectedJobStatus, - [QueueName.DuplicateDetection]: expectedJobStatus, - [QueueName.SmartSearch]: expectedJobStatus, - [QueueName.MetadataExtraction]: expectedJobStatus, - [QueueName.Search]: expectedJobStatus, - [QueueName.StorageTemplateMigration]: expectedJobStatus, - [QueueName.Migration]: expectedJobStatus, - [QueueName.ThumbnailGeneration]: expectedJobStatus, - [QueueName.VideoConversion]: expectedJobStatus, - [QueueName.FaceDetection]: expectedJobStatus, - [QueueName.FacialRecognition]: expectedJobStatus, - [QueueName.Sidecar]: expectedJobStatus, - [QueueName.Library]: expectedJobStatus, - [QueueName.Notification]: expectedJobStatus, - [QueueName.BackupDatabase]: expectedJobStatus, - [QueueName.Ocr]: expectedJobStatus, - [QueueName.Workflow]: expectedJobStatus, + await expect(sut.getAllLegacy(factory.auth())).resolves.toEqual({ + [QueueName.BackgroundTask]: expected, + [QueueName.DuplicateDetection]: expected, + [QueueName.SmartSearch]: expected, + [QueueName.MetadataExtraction]: expected, + [QueueName.Search]: expected, + [QueueName.StorageTemplateMigration]: expected, + [QueueName.Migration]: expected, + [QueueName.ThumbnailGeneration]: expected, + [QueueName.VideoConversion]: expected, + [QueueName.FaceDetection]: expected, + [QueueName.FacialRecognition]: expected, + [QueueName.Sidecar]: expected, + [QueueName.Library]: expected, + [QueueName.Notification]: expected, + [QueueName.BackupDatabase]: expected, + [QueueName.Ocr]: expected, + [QueueName.Workflow]: expected, }); }); }); describe('handleCommand', () => { it('should handle a pause command', async () => { - await sut.runCommand(QueueName.MetadataExtraction, { command: QueueCommand.Pause, force: false }); + mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics()); + + await sut.runCommandLegacy(QueueName.MetadataExtraction, { command: QueueCommand.Pause, force: false }); expect(mocks.job.pause).toHaveBeenCalledWith(QueueName.MetadataExtraction); }); it('should handle a resume command', async () => { - await sut.runCommand(QueueName.MetadataExtraction, { command: QueueCommand.Resume, force: false }); + mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics()); + + await sut.runCommandLegacy(QueueName.MetadataExtraction, { command: QueueCommand.Resume, force: false }); expect(mocks.job.resume).toHaveBeenCalledWith(QueueName.MetadataExtraction); }); it('should handle an empty command', async () => { - await sut.runCommand(QueueName.MetadataExtraction, { command: QueueCommand.Empty, force: false }); + mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics()); + + await sut.runCommandLegacy(QueueName.MetadataExtraction, { command: QueueCommand.Empty, force: false }); expect(mocks.job.empty).toHaveBeenCalledWith(QueueName.MetadataExtraction); }); it('should not start a job that is already running', async () => { - mocks.job.getQueueStatus.mockResolvedValue({ isActive: true, isPaused: false }); + mocks.job.isActive.mockResolvedValue(true); await expect( - sut.runCommand(QueueName.VideoConversion, { command: QueueCommand.Start, force: false }), + sut.runCommandLegacy(QueueName.VideoConversion, { command: QueueCommand.Start, force: false }), ).rejects.toBeInstanceOf(BadRequestException); expect(mocks.job.queue).not.toHaveBeenCalled(); @@ -133,33 +118,37 @@ describe(QueueService.name, () => { }); it('should handle a start video conversion command', async () => { - mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); + mocks.job.isActive.mockResolvedValue(false); + mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics()); - await sut.runCommand(QueueName.VideoConversion, { command: QueueCommand.Start, force: false }); + await sut.runCommandLegacy(QueueName.VideoConversion, { command: QueueCommand.Start, force: false }); expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.AssetEncodeVideoQueueAll, data: { force: false } }); }); it('should handle a start storage template migration command', async () => { - mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); + mocks.job.isActive.mockResolvedValue(false); + mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics()); - await sut.runCommand(QueueName.StorageTemplateMigration, { command: QueueCommand.Start, force: false }); + await sut.runCommandLegacy(QueueName.StorageTemplateMigration, { command: QueueCommand.Start, force: false }); expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.StorageTemplateMigration }); }); it('should handle a start smart search command', async () => { - mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); + mocks.job.isActive.mockResolvedValue(false); + mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics()); - await sut.runCommand(QueueName.SmartSearch, { command: QueueCommand.Start, force: false }); + await sut.runCommandLegacy(QueueName.SmartSearch, { command: QueueCommand.Start, force: false }); expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.SmartSearchQueueAll, data: { force: false } }); }); it('should handle a start metadata extraction command', async () => { - mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); + mocks.job.isActive.mockResolvedValue(false); + mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics()); - await sut.runCommand(QueueName.MetadataExtraction, { command: QueueCommand.Start, force: false }); + await sut.runCommandLegacy(QueueName.MetadataExtraction, { command: QueueCommand.Start, force: false }); expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.AssetExtractMetadataQueueAll, @@ -168,17 +157,19 @@ describe(QueueService.name, () => { }); it('should handle a start sidecar command', async () => { - mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); + mocks.job.isActive.mockResolvedValue(false); + mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics()); - await sut.runCommand(QueueName.Sidecar, { command: QueueCommand.Start, force: false }); + await sut.runCommandLegacy(QueueName.Sidecar, { command: QueueCommand.Start, force: false }); expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.SidecarQueueAll, data: { force: false } }); }); it('should handle a start thumbnail generation command', async () => { - mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); + mocks.job.isActive.mockResolvedValue(false); + mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics()); - await sut.runCommand(QueueName.ThumbnailGeneration, { command: QueueCommand.Start, force: false }); + await sut.runCommandLegacy(QueueName.ThumbnailGeneration, { command: QueueCommand.Start, force: false }); expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.AssetGenerateThumbnailsQueueAll, @@ -187,34 +178,37 @@ describe(QueueService.name, () => { }); it('should handle a start face detection command', async () => { - mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); + mocks.job.isActive.mockResolvedValue(false); + mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics()); - await sut.runCommand(QueueName.FaceDetection, { command: QueueCommand.Start, force: false }); + await sut.runCommandLegacy(QueueName.FaceDetection, { command: QueueCommand.Start, force: false }); expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.AssetDetectFacesQueueAll, data: { force: false } }); }); it('should handle a start facial recognition command', async () => { - mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); + mocks.job.isActive.mockResolvedValue(false); + mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics()); - await sut.runCommand(QueueName.FacialRecognition, { command: QueueCommand.Start, force: false }); + await sut.runCommandLegacy(QueueName.FacialRecognition, { command: QueueCommand.Start, force: false }); expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.FacialRecognitionQueueAll, data: { force: false } }); }); it('should handle a start backup database command', async () => { - mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); + mocks.job.isActive.mockResolvedValue(false); + mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics()); - await sut.runCommand(QueueName.BackupDatabase, { command: QueueCommand.Start, force: false }); + await sut.runCommandLegacy(QueueName.BackupDatabase, { command: QueueCommand.Start, force: false }); expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.DatabaseBackup, data: { force: false } }); }); it('should throw a bad request when an invalid queue is used', async () => { - mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); + mocks.job.isActive.mockResolvedValue(false); await expect( - sut.runCommand(QueueName.BackgroundTask, { command: QueueCommand.Start, force: false }), + sut.runCommandLegacy(QueueName.BackgroundTask, { command: QueueCommand.Start, force: false }), ).rejects.toBeInstanceOf(BadRequestException); expect(mocks.job.queue).not.toHaveBeenCalled(); diff --git a/server/src/services/queue.service.ts b/server/src/services/queue.service.ts index bea665e8f..cdfa2ad2e 100644 --- a/server/src/services/queue.service.ts +++ b/server/src/services/queue.service.ts @@ -2,7 +2,21 @@ import { BadRequestException, Injectable } from '@nestjs/common'; import { ClassConstructor } from 'class-transformer'; import { SystemConfig } from 'src/config'; import { OnEvent } from 'src/decorators'; -import { QueueCommandDto, QueueResponseDto, QueuesResponseDto } from 'src/dtos/queue.dto'; +import { AuthDto } from 'src/dtos/auth.dto'; +import { + mapQueueLegacy, + mapQueuesLegacy, + QueueResponseLegacyDto, + QueuesResponseLegacyDto, +} from 'src/dtos/queue-legacy.dto'; +import { + QueueCommandDto, + QueueDeleteDto, + QueueJobResponseDto, + QueueJobSearchDto, + QueueResponseDto, + QueueUpdateDto, +} from 'src/dtos/queue.dto'; import { BootstrapEventPriority, CronJob, @@ -86,7 +100,7 @@ export class QueueService extends BaseService { this.services = services; } - async runCommand(name: QueueName, dto: QueueCommandDto): Promise { + async runCommandLegacy(name: QueueName, dto: QueueCommandDto): Promise { this.logger.debug(`Handling command: queue=${name},command=${dto.command},force=${dto.force}`); switch (dto.command) { @@ -117,28 +131,60 @@ export class QueueService extends BaseService { } } + const response = await this.getByName(name); + + return mapQueueLegacy(response); + } + + async getAll(_auth: AuthDto): Promise { + return Promise.all(Object.values(QueueName).map((name) => this.getByName(name))); + } + + async getAllLegacy(auth: AuthDto): Promise { + const responses = await this.getAll(auth); + return mapQueuesLegacy(responses); + } + + get(auth: AuthDto, name: QueueName): Promise { return this.getByName(name); } - async getAll(): Promise { - const response = new QueuesResponseDto(); - for (const name of Object.values(QueueName)) { - response[name] = await this.getByName(name); + async update(auth: AuthDto, name: QueueName, dto: QueueUpdateDto): Promise { + if (dto.isPaused === true) { + if (name === QueueName.BackgroundTask) { + throw new BadRequestException(`The BackgroundTask queue cannot be paused`); + } + await this.jobRepository.pause(name); } - return response; + + if (dto.isPaused === false) { + await this.jobRepository.resume(name); + } + + return this.getByName(name); } - async getByName(name: QueueName): Promise { - const [jobCounts, queueStatus] = await Promise.all([ - this.jobRepository.getJobCounts(name), - this.jobRepository.getQueueStatus(name), - ]); + searchJobs(auth: AuthDto, name: QueueName, dto: QueueJobSearchDto): Promise { + return this.jobRepository.searchJobs(name, dto); + } - return { jobCounts, queueStatus }; + async emptyQueue(auth: AuthDto, name: QueueName, dto: QueueDeleteDto) { + await this.jobRepository.empty(name); + if (dto.failed) { + await this.jobRepository.clear(name, QueueCleanType.Failed); + } + } + + private async getByName(name: QueueName): Promise { + const [statistics, isPaused] = await Promise.all([ + this.jobRepository.getJobCounts(name), + this.jobRepository.isPaused(name), + ]); + return { name, isPaused, statistics }; } private async start(name: QueueName, { force }: QueueCommandDto): Promise { - const { isActive } = await this.jobRepository.getQueueStatus(name); + const isActive = await this.jobRepository.isActive(name); if (isActive) { throw new BadRequestException(`Job is already running`); } diff --git a/server/src/types.ts b/server/src/types.ts index dd3d25a7c..848d19177 100644 --- a/server/src/types.ts +++ b/server/src/types.ts @@ -291,11 +291,6 @@ export interface JobCounts { paused: number; } -export interface QueueStatus { - isActive: boolean; - isPaused: boolean; -} - export type JobItem = // Audit | { name: JobName.AuditTableCleanup; data?: IBaseJob } diff --git a/server/test/repositories/job.repository.mock.ts b/server/test/repositories/job.repository.mock.ts index f0f4fdda0..4fc5460c8 100644 --- a/server/test/repositories/job.repository.mock.ts +++ b/server/test/repositories/job.repository.mock.ts @@ -11,9 +11,11 @@ export const newJobRepositoryMock = (): Mocked Promise.resolve()), queueAll: vitest.fn().mockImplementation(() => Promise.resolve()), - getQueueStatus: vitest.fn(), + isActive: vitest.fn(), + isPaused: vitest.fn(), getJobCounts: vitest.fn(), clear: vitest.fn(), waitForQueueCompletion: vitest.fn(), diff --git a/server/test/small.factory.ts b/server/test/small.factory.ts index ea0df585e..a0de947b2 100644 --- a/server/test/small.factory.ts +++ b/server/test/small.factory.ts @@ -14,6 +14,7 @@ import { } from 'src/database'; import { MapAsset } from 'src/dtos/asset-response.dto'; import { AuthDto } from 'src/dtos/auth.dto'; +import { QueueStatisticsDto } from 'src/dtos/queue.dto'; import { AssetStatus, AssetType, AssetVisibility, MemoryType, Permission, UserMetadataKey, UserStatus } from 'src/enum'; import { OnThisDayData, UserMetadataItem } from 'src/types'; import { v4, v7 } from 'uuid'; @@ -139,6 +140,16 @@ const sessionFactory = (session: Partial = {}) => ({ ...session, }); +const queueStatisticsFactory = (dto?: Partial) => ({ + active: 0, + completed: 0, + failed: 0, + delayed: 0, + waiting: 0, + paused: 0, + ...dto, +}); + const stackFactory = () => ({ id: newUuid(), ownerId: newUuid(), @@ -353,6 +364,7 @@ export const factory = { library: libraryFactory, memory: memoryFactory, partner: partnerFactory, + queueStatistics: queueStatisticsFactory, session: sessionFactory, stack: stackFactory, user: userFactory, diff --git a/web/src/lib/components/jobs/JobTile.svelte b/web/src/lib/components/jobs/JobTile.svelte index 64a6db5b7..8bdd7c169 100644 --- a/web/src/lib/components/jobs/JobTile.svelte +++ b/web/src/lib/components/jobs/JobTile.svelte @@ -1,7 +1,7 @@