From d784d431d058cd3227eb6b41a5d2f0e22ad5fa03 Mon Sep 17 00:00:00 2001 From: Jason Rasmussen Date: Fri, 14 Nov 2025 14:42:00 -0500 Subject: [PATCH] refactor: job vs queue naming (#23902) --- e2e/src/api/specs/jobs.e2e-spec.ts | 86 ++--- e2e/src/api/specs/user-admin.e2e-spec.ts | 4 +- e2e/src/utils.ts | 20 +- mobile/openapi/README.md | Bin 45954 -> 45988 bytes mobile/openapi/lib/api.dart | Bin 14774 -> 14782 bytes mobile/openapi/lib/api/jobs_api.dart | Bin 5757 -> 5813 bytes mobile/openapi/lib/api_client.dart | Bin 36721 -> 36741 bytes mobile/openapi/lib/api_helper.dart | Bin 7200 -> 7208 bytes mobile/openapi/lib/model/job_command.dart | Bin 2898 -> 0 bytes mobile/openapi/lib/model/job_name.dart | Bin 4514 -> 0 bytes mobile/openapi/lib/model/queue_command.dart | Bin 0 -> 2958 bytes ...ommand_dto.dart => queue_command_dto.dart} | Bin 3423 -> 3465 bytes mobile/openapi/lib/model/queue_name.dart | Bin 0 -> 4618 bytes ...tatus_dto.dart => queue_response_dto.dart} | Bin 3089 -> 3177 bytes ...nts_dto.dart => queue_statistics_dto.dart} | Bin 3895 -> 4009 bytes ...onse_dto.dart => queues_response_dto.dart} | Bin 7743 -> 7757 bytes open-api/immich-openapi-specs.json | 348 +++++++++--------- open-api/typescript-sdk/src/fetch-client.ts | 64 ++-- server/src/app.module.ts | 12 +- server/src/controllers/job.controller.ts | 19 +- server/src/dtos/job.dto.ts | 96 +---- server/src/dtos/queue.dto.ts | 94 +++++ server/src/enum.ts | 2 +- server/src/services/api.service.ts | 2 - server/src/services/index.ts | 2 + server/src/services/job.service.spec.ts | 207 +---------- server/src/services/job.service.ts | 252 +------------ server/src/services/queue.service.spec.ts | 223 +++++++++++ server/src/services/queue.service.ts | 250 +++++++++++++ .../admin-settings/JobSettings.svelte | 40 +- web/src/lib/components/jobs/JobTile.svelte | 38 +- web/src/lib/components/jobs/JobsPanel.svelte | 86 +++-- web/src/lib/utils.ts | 42 +-- web/src/routes/admin/jobs-status/+page.svelte | 22 +- web/src/routes/admin/jobs-status/+page.ts | 4 +- .../admin/library-management/+page.svelte | 8 +- 36 files changed, 976 insertions(+), 945 deletions(-) delete mode 100644 mobile/openapi/lib/model/job_command.dart delete mode 100644 mobile/openapi/lib/model/job_name.dart create mode 100644 mobile/openapi/lib/model/queue_command.dart rename mobile/openapi/lib/model/{job_command_dto.dart => queue_command_dto.dart} (66%) create mode 100644 mobile/openapi/lib/model/queue_name.dart rename mobile/openapi/lib/model/{job_status_dto.dart => queue_response_dto.dart} (62%) rename mobile/openapi/lib/model/{job_counts_dto.dart => queue_statistics_dto.dart} (70%) rename mobile/openapi/lib/model/{all_job_status_response_dto.dart => queues_response_dto.dart} (57%) create mode 100644 server/src/dtos/queue.dto.ts create mode 100644 server/src/services/queue.service.spec.ts create mode 100644 server/src/services/queue.service.ts diff --git a/e2e/src/api/specs/jobs.e2e-spec.ts b/e2e/src/api/specs/jobs.e2e-spec.ts index a9afd8475..be7984404 100644 --- a/e2e/src/api/specs/jobs.e2e-spec.ts +++ b/e2e/src/api/specs/jobs.e2e-spec.ts @@ -1,4 +1,4 @@ -import { JobCommand, JobName, LoginResponseDto, updateConfig } from '@immich/sdk'; +import { LoginResponseDto, QueueCommand, QueueName, updateConfig } from '@immich/sdk'; import { cpSync, rmSync } from 'node:fs'; import { readFile } from 'node:fs/promises'; import { basename } from 'node:path'; @@ -17,28 +17,28 @@ describe('/jobs', () => { describe('PUT /jobs', () => { afterEach(async () => { - await utils.jobCommand(admin.accessToken, JobName.MetadataExtraction, { - command: JobCommand.Resume, + await utils.queueCommand(admin.accessToken, QueueName.MetadataExtraction, { + command: QueueCommand.Resume, force: false, }); - await utils.jobCommand(admin.accessToken, JobName.ThumbnailGeneration, { - command: JobCommand.Resume, + await utils.queueCommand(admin.accessToken, QueueName.ThumbnailGeneration, { + command: QueueCommand.Resume, force: false, }); - await utils.jobCommand(admin.accessToken, JobName.FaceDetection, { - command: JobCommand.Resume, + await utils.queueCommand(admin.accessToken, QueueName.FaceDetection, { + command: QueueCommand.Resume, force: false, }); - await utils.jobCommand(admin.accessToken, JobName.SmartSearch, { - command: JobCommand.Resume, + await utils.queueCommand(admin.accessToken, QueueName.SmartSearch, { + command: QueueCommand.Resume, force: false, }); - await utils.jobCommand(admin.accessToken, JobName.DuplicateDetection, { - command: JobCommand.Resume, + await utils.queueCommand(admin.accessToken, QueueName.DuplicateDetection, { + command: QueueCommand.Resume, force: false, }); @@ -59,8 +59,8 @@ describe('/jobs', () => { it('should queue metadata extraction for missing assets', async () => { const path = `${testAssetDir}/formats/raw/Nikon/D700/philadelphia.nef`; - await utils.jobCommand(admin.accessToken, JobName.MetadataExtraction, { - command: JobCommand.Pause, + await utils.queueCommand(admin.accessToken, QueueName.MetadataExtraction, { + command: QueueCommand.Pause, force: false, }); @@ -77,20 +77,20 @@ describe('/jobs', () => { expect(asset.exifInfo?.make).toBeNull(); } - await utils.jobCommand(admin.accessToken, JobName.MetadataExtraction, { - command: JobCommand.Empty, + await utils.queueCommand(admin.accessToken, QueueName.MetadataExtraction, { + command: QueueCommand.Empty, force: false, }); await utils.waitForQueueFinish(admin.accessToken, 'metadataExtraction'); - await utils.jobCommand(admin.accessToken, JobName.MetadataExtraction, { - command: JobCommand.Resume, + await utils.queueCommand(admin.accessToken, QueueName.MetadataExtraction, { + command: QueueCommand.Resume, force: false, }); - await utils.jobCommand(admin.accessToken, JobName.MetadataExtraction, { - command: JobCommand.Start, + await utils.queueCommand(admin.accessToken, QueueName.MetadataExtraction, { + command: QueueCommand.Start, force: false, }); @@ -124,8 +124,8 @@ describe('/jobs', () => { cpSync(`${testAssetDir}/formats/raw/Nikon/D80/glarus.nef`, path); - await utils.jobCommand(admin.accessToken, JobName.MetadataExtraction, { - command: JobCommand.Start, + await utils.queueCommand(admin.accessToken, QueueName.MetadataExtraction, { + command: QueueCommand.Start, force: false, }); @@ -144,8 +144,8 @@ describe('/jobs', () => { it('should queue thumbnail extraction for assets missing thumbs', async () => { const path = `${testAssetDir}/albums/nature/tanners_ridge.jpg`; - await utils.jobCommand(admin.accessToken, JobName.ThumbnailGeneration, { - command: JobCommand.Pause, + await utils.queueCommand(admin.accessToken, QueueName.ThumbnailGeneration, { + command: QueueCommand.Pause, force: false, }); @@ -153,32 +153,32 @@ describe('/jobs', () => { assetData: { bytes: await readFile(path), filename: basename(path) }, }); - await utils.waitForQueueFinish(admin.accessToken, JobName.MetadataExtraction); - await utils.waitForQueueFinish(admin.accessToken, JobName.ThumbnailGeneration); + await utils.waitForQueueFinish(admin.accessToken, QueueName.MetadataExtraction); + await utils.waitForQueueFinish(admin.accessToken, QueueName.ThumbnailGeneration); const assetBefore = await utils.getAssetInfo(admin.accessToken, id); expect(assetBefore.thumbhash).toBeNull(); - await utils.jobCommand(admin.accessToken, JobName.ThumbnailGeneration, { - command: JobCommand.Empty, + await utils.queueCommand(admin.accessToken, QueueName.ThumbnailGeneration, { + command: QueueCommand.Empty, force: false, }); - await utils.waitForQueueFinish(admin.accessToken, JobName.MetadataExtraction); - await utils.waitForQueueFinish(admin.accessToken, JobName.ThumbnailGeneration); + await utils.waitForQueueFinish(admin.accessToken, QueueName.MetadataExtraction); + await utils.waitForQueueFinish(admin.accessToken, QueueName.ThumbnailGeneration); - await utils.jobCommand(admin.accessToken, JobName.ThumbnailGeneration, { - command: JobCommand.Resume, + await utils.queueCommand(admin.accessToken, QueueName.ThumbnailGeneration, { + command: QueueCommand.Resume, force: false, }); - await utils.jobCommand(admin.accessToken, JobName.ThumbnailGeneration, { - command: JobCommand.Start, + await utils.queueCommand(admin.accessToken, QueueName.ThumbnailGeneration, { + command: QueueCommand.Start, force: false, }); - await utils.waitForQueueFinish(admin.accessToken, JobName.MetadataExtraction); - await utils.waitForQueueFinish(admin.accessToken, JobName.ThumbnailGeneration); + await utils.waitForQueueFinish(admin.accessToken, QueueName.MetadataExtraction); + await utils.waitForQueueFinish(admin.accessToken, QueueName.ThumbnailGeneration); const assetAfter = await utils.getAssetInfo(admin.accessToken, id); expect(assetAfter.thumbhash).not.toBeNull(); @@ -193,26 +193,26 @@ describe('/jobs', () => { assetData: { bytes: await readFile(path), filename: basename(path) }, }); - await utils.waitForQueueFinish(admin.accessToken, JobName.MetadataExtraction); - await utils.waitForQueueFinish(admin.accessToken, JobName.ThumbnailGeneration); + await utils.waitForQueueFinish(admin.accessToken, QueueName.MetadataExtraction); + await utils.waitForQueueFinish(admin.accessToken, QueueName.ThumbnailGeneration); const assetBefore = await utils.getAssetInfo(admin.accessToken, id); cpSync(`${testAssetDir}/albums/nature/notocactus_minimus.jpg`, path); - await utils.jobCommand(admin.accessToken, JobName.ThumbnailGeneration, { - command: JobCommand.Resume, + await utils.queueCommand(admin.accessToken, QueueName.ThumbnailGeneration, { + command: QueueCommand.Resume, force: false, }); // This runs the missing thumbnail job - await utils.jobCommand(admin.accessToken, JobName.ThumbnailGeneration, { - command: JobCommand.Start, + await utils.queueCommand(admin.accessToken, QueueName.ThumbnailGeneration, { + command: QueueCommand.Start, force: false, }); - await utils.waitForQueueFinish(admin.accessToken, JobName.MetadataExtraction); - await utils.waitForQueueFinish(admin.accessToken, JobName.ThumbnailGeneration); + await utils.waitForQueueFinish(admin.accessToken, QueueName.MetadataExtraction); + await utils.waitForQueueFinish(admin.accessToken, QueueName.ThumbnailGeneration); const assetAfter = await utils.getAssetInfo(admin.accessToken, id); diff --git a/e2e/src/api/specs/user-admin.e2e-spec.ts b/e2e/src/api/specs/user-admin.e2e-spec.ts index 2d6e08b5f..793c508a3 100644 --- a/e2e/src/api/specs/user-admin.e2e-spec.ts +++ b/e2e/src/api/specs/user-admin.e2e-spec.ts @@ -1,6 +1,6 @@ import { - JobName, LoginResponseDto, + QueueName, createStack, deleteUserAdmin, getMyUser, @@ -328,7 +328,7 @@ describe('/admin/users', () => { { headers: asBearerAuth(user.accessToken) }, ); - await utils.waitForQueueFinish(admin.accessToken, JobName.BackgroundTask); + await utils.waitForQueueFinish(admin.accessToken, QueueName.BackgroundTask); const { status, body } = await request(app) .delete(`/admin/users/${user.userId}`) diff --git a/e2e/src/utils.ts b/e2e/src/utils.ts index b33d6cb19..8f34bbe40 100644 --- a/e2e/src/utils.ts +++ b/e2e/src/utils.ts @@ -1,5 +1,4 @@ import { - AllJobStatusResponseDto, AssetMediaCreateDto, AssetMediaResponseDto, AssetResponseDto, @@ -7,11 +6,12 @@ import { CheckExistingAssetsDto, CreateAlbumDto, CreateLibraryDto, - JobCommandDto, - JobName, MetadataSearchDto, Permission, PersonCreateDto, + QueueCommandDto, + QueueName, + QueuesResponseDto, SharedLinkCreateDto, UpdateLibraryDto, UserAdminCreateDto, @@ -27,14 +27,14 @@ import { createStack, createUserAdmin, deleteAssets, - getAllJobsStatus, getAssetInfo, getConfig, getConfigDefaults, + getQueuesLegacy, login, + runQueueCommandLegacy, scanLibrary, searchAssets, - sendJobCommand, setBaseUrl, signUpAdmin, tagAssets, @@ -477,8 +477,8 @@ export const utils = { tagAssets: (accessToken: string, tagId: string, assetIds: string[]) => tagAssets({ id: tagId, bulkIdsDto: { ids: assetIds } }, { headers: asBearerAuth(accessToken) }), - jobCommand: async (accessToken: string, jobName: JobName, jobCommandDto: JobCommandDto) => - sendJobCommand({ id: jobName, jobCommandDto }, { headers: asBearerAuth(accessToken) }), + queueCommand: async (accessToken: string, name: QueueName, queueCommandDto: QueueCommandDto) => + runQueueCommandLegacy({ name, queueCommandDto }, { headers: asBearerAuth(accessToken) }), setAuthCookies: async (context: BrowserContext, accessToken: string, domain = '127.0.0.1') => await context.addCookies([ @@ -524,13 +524,13 @@ export const utils = { await updateConfig({ systemConfigDto: defaultConfig }, { headers: asBearerAuth(accessToken) }); }, - isQueueEmpty: async (accessToken: string, queue: keyof AllJobStatusResponseDto) => { - const queues = await getAllJobsStatus({ headers: asBearerAuth(accessToken) }); + isQueueEmpty: async (accessToken: string, queue: keyof QueuesResponseDto) => { + const queues = await getQueuesLegacy({ headers: asBearerAuth(accessToken) }); const jobCounts = queues[queue].jobCounts; return !jobCounts.active && !jobCounts.waiting; }, - waitForQueueFinish: (accessToken: string, queue: keyof AllJobStatusResponseDto, ms?: number) => { + waitForQueueFinish: (accessToken: string, queue: keyof QueuesResponseDto, ms?: number) => { // eslint-disable-next-line no-async-promise-executor return new Promise(async (resolve, reject) => { const timeout = setTimeout(() => reject(new Error('Timed out waiting for queue to empty')), ms || 10_000); diff --git a/mobile/openapi/README.md b/mobile/openapi/README.md index ff04a91defa0bdb14a6d3d3b88990008bb535ec7..5e93c571bd59ff4be58953b51591eeb412d16f62 100644 GIT binary patch delta 383 zcmZpA&a~t?(}wA~`hlgXrK!a}sp*Nym0DV{8Y%h7`ubk^NyUx@nR>Y?%IT>kga!mF9u9Ip^o*Cg!E!)(lh#)|?Dc2r)uap+-SVDgGVbX3xz+6$g1Id$y@h?rYQBe7~)oyB=x@)Md!_0-dMHrJ$=2jV$O=l8;3xP!v_Q zAJ9f*bs#>filEfug8aPV$(`-otY8Z!8&-;FgIU2Pi6xoEC7H>^Xttq9Opcgj$^ue9 SSzw|Fgkd;QY;*6VuL=N3V2t(v delta 360 zcmZ4ToT=$K(}wA~hK@NoUinGI!6k_$rNvrWu^K7)$@=;rA;*GDz1$S#^wg5XoSZD6 zKrvX&@H zuh;C!3buc;VZ9`jl9^mQ*^x_P@>4k#7Ldf`2eQhWIpx<%0RR!f BCUF1& delta 94 zcmdm2ysdbHHWyn?PJC8=(&XFxnv*ARvu-xvTF=FnpPQSQmohn#U4$7VXCWUji2mZ3d`3=gv!MK1DF9=MA{PJv diff --git a/mobile/openapi/lib/api/jobs_api.dart b/mobile/openapi/lib/api/jobs_api.dart index e783f93c7cfa15b31ff3aebd99c6c8314f48cdd5..906dce6d376298ebbac0eca9d22b0ad19a92eeb6 100644 GIT binary patch delta 532 zcmeyXvsHJ)Bo3ay($vz_VxQFX#N^7&^EvvNR3IWjsl^5PdBv$NCHeMt3hAjO$O<(k z?-y3(M-$xK$8E+Wi=kUxM^hoOSOF})`7LiKBR`nwmzbLxtB?mIC;Rfth=IkN^K)|( z^HP9T#wrwo+%>tHU(3CyG!I!8;?(fWk_?ZMk^;}Xw0sS?fnf7=Aa)@PgPL`hKUy7R zU#&Wq0vPBOr55BQCZ{^)7~An)ive7>1Y#Dqx(-6hEJGWi{^E>kTR0K1vPE&u=k delta 451 zcmdn0`&VbfBo1E3oE)$Gq~hR`#FEnD&GR_=nY1865RstN;)49V;#8NCe0w{E^wbg* z^^*g*HPK9Hz9p<{P}BjNCv6_$B71#wui{Ot#^d5d!j^^K)|( z^HPA;#wujxCr!@d*Rm*1%}YT@g=dyzc$AbBc;=<$Yd}o^8lapQHc) diff --git a/mobile/openapi/lib/api_client.dart b/mobile/openapi/lib/api_client.dart index cf6784fb8301f09b52d61cee5315877290822c58..373a4b9d8beaffed80fd4fee5d96c5042850bb3b 100644 GIT binary patch delta 179 zcmex3kEwM&(}qh5lP@T1kB~UR(Ic1=d0FXY$;>jBwpgIeEbvHlopJ)#NkZnTd delta 182 zcmZpj&-8H~(}qh5Tsb*j`ANYgi6y0z6%7^G9Krm_>vWAKH)yhKzNWw^!Iqz!o0yj} zSy4`e3#ir^%57BFX9jX7`N?s@MJGGRsj!22rFoMN7)U~7O7lvJCp*e1Prhv7&+M0& wJK4}o6r^=>zMt`AdCLF}ARFR>g9Z{HEnp5<%jSGbGm*)`{@$Cv_)oM40OhVgCIA2c diff --git a/mobile/openapi/lib/api_helper.dart b/mobile/openapi/lib/api_helper.dart index 1d197a8f91104d6a38dd5f037a288cbb63a1b6d4..5c21009a0b21367e094e5be1d2457551789b0ac2 100644 GIT binary patch delta 68 zcmZ2rvBF|QEca$fu*UXsgo19#dyFh=ltB<#JrTr8`$ijQWM#Pp{#j4&XfHG LI5xlMH{<~TKGqkh delta 48 zcmZ2svA|+OEH|@Pe$r$;elac}?VO*Ro0yj}c>|mM=eJ`9E-&=M7MrAUpWVi-mK`|fy> zEZbUEpot_t_rkm9p38JPnoi;Raq;l>xB312&+Enf25#>@&nIy60QV0I`0{Xbcl*~F z!U*{>7cR_xPJVhfqF41~tdnVBU0NtDFQGQImw76+ETsvP_Fa{gbz#s#=&jiKw5mhu z;W!@$PpVScxD(fRS5}rY>Qt z!Un=dLgnl;CY8hTS?F4>610oi_M9c@IBb+pBH{wWvFinz*Cv-v@059hcuJ`emZOxi zTuVnT3uT0cufcW5=;ot+md;^(gfxf2^27K7W*^vTgm%ZDyfw#%SlpnpCUJbh8qE$b z;LhXcb4(xL0A2|s!zny|9)LUP>v90k-vs0fR~tl`C%SjU zme|ZfWWl=KYn&p=rCp9!SF;lhJaUYZ$m4gcmH&lnzl}Q zkHWodYY{uXS`v6g$;Fy@hQqZP4N(l4Z0j}132V1%gdc6MQ4sgrj*2Rirm3r%v=L|24#w4iQe`eo?5zwWno=j;%&#RkdHb^npf$vfNYQs;T+7?KL&IRjRI& zC`Q!BPN+zQ2nZ)C;Bm=k8$Ru8Lnd(Fwfpe0>0d981m)BP);N-k2G0v|2p>;Q@XbOT zGT@^V3tovs2Jz$sf3OXQ%zBGxZ=B0Qpr~JB?c0A;!?Ui!{aO(s5ToGQJ>lLrAu?@z z{%F1#KaA<`|e0d zF-5uy5L@EC@W{J+?n%zhMrUVmbw9uR`Tgv6_W5c)yM}i+A7>M|zJuGlIefaizIpfe z3sfV`wCY@RiAiFQ#bcL+w0Gmt(gTcIw5axKZ#sN83h~+{nF98}L#Jigd#k z6|Yv@kiWH1Ou`pudQI=} zDJJB5uDC&gR&a4}7ITu(IKguPM1C-ztYnVOzB$8+jGI&;3J^(aKg`n^?o^MKv~jT2CqctP64 zVSUh z4KqRy3!jwsX4{**gtgI?hu?CKV-Ep0OaubzR%gDi`P?vNam-fSj8CUn&AQ~{gW1t~ zrEK>}0*P<$hVK7CsGq?1mDUmv1$K-I;sVx8T0VI?^%Jg}Er#1hx+w8@2ui(J!6(+Q2wWE8LLEj#QuJ-F9KxJ&%?CYS7A$652WJ9aLXj zfNG>X^aK!sp0cv+FDTdnJo(tW@9<{0YmDlwoTk%^yNfyRugLHJGGX#}pEUkkkcqbwFbhsGLZ-(eSL#4Isl4^5ZBO8cvx&3 zWUbh+hF3@=%kD-THPYoM*8HXhX7E}5Y5xk^EQ=TNYl&!3-CsO(K?$9CklxI08txGMk3}2jPKpd~&ZI6~bot8#k z=kOC@OkmgD9zWm;#5pKFBm-ewd_f$P+#wz6=Ja4bB5=1nfDkdJfCJUb=h^(4=*E8u C2ia}_ diff --git a/mobile/openapi/lib/model/queue_command.dart b/mobile/openapi/lib/model/queue_command.dart new file mode 100644 index 0000000000000000000000000000000000000000..f03ec6eccd0be2fbc0f40211f46a6225f8b98843 GIT binary patch literal 2958 zcmai0ZExE)5dQ98aRG+L0bF_8r^20^28*+$=#p)9J`9E-Fw!aJN|72##W0He_uY|_ zA~{+XAU5TD?oaXb%d=3|P>)W@#tZr6+UaVJ_aCQB8HG|7rxVc@!Z?~7%SAU(5 z8f$*ch4srHlONtq_)$J+V=*m^O$(`T0ky6j<|!&HQTti@EX&GRKWZV=UL0In)jqZO zPc1E(Yf;i?B`ke=w2quxH*TI8?Yz)Fl{p2Jg+v?7-Dof+HM&$8{XlAdCv^Jm`!w4L z>n0Nl=fMW?XcHzDQcS$<+JL!_3b4DSf^EElBb&ggYq<(-PA`dK)J62)W8 zJ)T2W@z_g*f7ph*HnIA?&!MoU4BgESG+)%u2sp+;UvYh?aBYQlG}I;9=~SFdOX@DvhgzvGidUt8T;dBZ79+Y&9hVukomlPBifIdt8jsiaCo zf+8~WkO>WhKW^6+=qv+4WyriSL?n3AP{J(uWLn$AMsJ}7NG`m9dy#!*?V)`#!iz`9 z32iMz=8ZkPjxdH`+THd1d^y6BN6v{f<5<4r)Bnk|6E-LUeOOCtv9F(Ff%;wM~F<%urL{)Vu!W!QRK)4HH~^NJYvNqyAD!#zT5 zNe-^gGIXvWCkLK$#sS&u073Q|k%V~yznVP~@6UKvjzWitdp&mJ`+sf_V>8Hh+0hp>6Dz_CUU2k6knN(h&lZ`Wz1Z%}Mbp8-O%ftn>m&0qjWiZ$|d?1jZ(YmOWC*449lj)fr7Ur-;~Kd$CUSL=S&Si>{A;KDx8dxG`> zb<@*3{Z$j@;=56V3e9F5pap!#y@@XZZH!k4nTeh6M~oY1?F)fOp(nZ>@Zb^G5cC_y SWlW81;UoR?XWDzu?EWt@zR*Vi literal 0 HcmV?d00001 diff --git a/mobile/openapi/lib/model/job_command_dto.dart b/mobile/openapi/lib/model/queue_command_dto.dart similarity index 66% rename from mobile/openapi/lib/model/job_command_dto.dart rename to mobile/openapi/lib/model/queue_command_dto.dart index 32274037f6d5b05f770b3ce04a9d03b417313e68..ded848c12f158b9d10b2d938ff944051cca03a54 100644 GIT binary patch delta 219 zcmcaF)hWF}osl)LG_^E!vKFHfl;zJT0cB+|Izm{R*D+p#DQsr`4Hax;v4#omXPpPt zlfY&RWi4P!gR(f;WuYuh_FgD!Cwn-QCCpI(Wwmi^fwF8lp*Bx$wosroqKWVZKqmmqucFxbuP0UMiDansj$jmD)Nz6-5)#Fl704XV8bOb5g zyp8b^L{BgCZ?Hfwi#0^xEbBb5VFhfqV0sf<8km-4mj%=I?7d+6B6~QP*5xPw(~CH^ nfaz#Xunm(JavlZqv$?i{X;p3)FrCMp3Z`#xp9j-xc?-RE3fj4m$V`pe?s$9MDl`KRl}{046CKFlX@^8oh`3;6hOb9ejK z3sfV`w>dLz_RI9=S0j2ATcr(OZM0c!gyfe{smk(v#U(Gea+Bs=6s0yUX~E>49qg*C z+{*BODq+xEu>${=%;2Zv#v-_}Y4c1g>zHyYk>iG9Be?PAHo2J;irbacYKv-a$JFZ8 zPpfRlj2(?|I|q6Ly5x$LVv4`7N25_DnYHj|#VdY~?!dPZzy%)tF6=9qI}PS~keILFNb@&c_Zm2)ExLT$lI$uZIm zn-siQb3=$5p_qiv&a|2y=B@t9jlu0vr4~9J9>(V|dlNvtYAb$kwHgXBv`%ET-)>hG zYsG|o#}zjy&>5TUP;K(Pf@-e=BCwu|D~ktHnis+1zb3Os#;SS5k8 zX4%u$Xe_z~vrmx|eR0CxI&IjNFL+T(^yZ`3o+yt~iV4i(BuFbsSlJU{^vfxIte}jU z=*Tv#CoH5`8)gox$fOn|Oq8a^gn7amwUJ#}mo>9IGVhBM_BxA?hijx+ zH=GZ``p(Csgi>28){=(~VL_`o00KmfW8wwz zB@gR^dm9#em$6;y->$ISxin!Rz64>tU)Z(riCPZQw)SdeUsv^WD4FomR7rU2r8q40 z(-zh+EA+7RiK%1T#_|d_Mi;(G=LZ~+*p|cGpmyGBEe<7L7^W=`4sD0=X~6m zF*Pz}y^|`C_6J)_c?!G=l8C(q}88a7kNa9c??C;&VRm4Mj5 zn6Lmxm@l`NFM-~t!rp5)FG_c4zz|Lw7>CJ*TT%&<>QUZo7$#$P1hpYTJ3n@4@34DN zelY`8N!i5$2#!x#p0{BNb^y=b`{pye8SW~h8Y`#iJmV%fmu;B*{?DUlB1#v~CMC?x zz$%b|ILMd+yj_%~CE&C0#3JJ~@;mnpoUTXt!~-&g)FLEz2~9=NPz5S2((V{fhcsqT zCq9pO!LarZ-zg?4wN>li)zxgsXa=xIa6*?E5T8?OH5UoN7`H_YbSzI>=poRhwt3z7 z`nJfy9-AomR4*S%c}am_$0HGE!gPecaEB4brlk{XZUry{W3c zC?wv2it~JKj5gtrJG>sM9||I!1BcMh10YAv53r}xAma5OcYt&m=`#i#qtqe3;eO#^ z@fsoP#Vc%hfiyI4B0~E3<-nkZ;m8LCPMgXSZ$)4&5v(ka^qCguN|2XrAzalVA2*q% z6gTauO(b?8N@LrdsP~W}M3>(pTSwC6rQMd0(V$I}*~>baN2jKV@fMmMHIW)Bo^)pk zq*yBg8jQvi%0MN8=nc{oKu;lM-XFRrAo@xXOlL@^06OBxG#=+Egjg#A>d*faNM9oY zILT)d$vveTfwmkY!P;=aruR#=DQv9a zJ9_AcIDNvDxP%|52L*oK?C~9TK`e#*L;4T~%4h79k~^eO$GJY3quAVQ?m%60NFWEw LSI0^Inn=ojaOVjH literal 0 HcmV?d00001 diff --git a/mobile/openapi/lib/model/job_status_dto.dart b/mobile/openapi/lib/model/queue_response_dto.dart similarity index 62% rename from mobile/openapi/lib/model/job_status_dto.dart rename to mobile/openapi/lib/model/queue_response_dto.dart index 18fab8dfb3fe2e11d3e880cf070af7ab3314e09b..b20449f7212d9fa62d84dd258c367d18c8a916f4 100644 GIT binary patch delta 408 zcmbOz@ls-gIwNmjX=-U|P-<~OeqM3vWCKQNZ4?oglKfbO%)H`~#JuEGJuU?W6xqpn zjNW`;k>HZVlFZ_g%;d=j8Jp1z^=0~wDze#^nFZbED3((bTGOzXkxa37fD0 delta 361 zcmaDUF;QZJI-|Q+eo}BrVo7PSOG&;$HJ5^dzP`RfP-;nOQC_h^qC#G3xk5CunplO* zyyB9?yyR3ppklbp8%BZ51*f@L}qPlnYD zi5JfruL(B>=spdgjS!FpwW-(&7B(EfW-UGmWjm6WnYWLQ|GWk;^lGd mL*iL-u0!H|;j~5K*>T13BD>CRav_K5b0fF`v0AbT%{VPTwsXv!w9=gdME zQsi2PE_9vC8eK?=dq29+L+m(1q^t#0#Sa5GVlcCd+Y~POj(eM^nC8j8Bmf E0AP=m%m4rY delta 312 zcmZ1}zg=#FI-{Fcev)&3XL2+l^{lW@-F4n9Rj0F~cw)c^nh diff --git a/mobile/openapi/lib/model/all_job_status_response_dto.dart b/mobile/openapi/lib/model/queues_response_dto.dart similarity index 57% rename from mobile/openapi/lib/model/all_job_status_response_dto.dart rename to mobile/openapi/lib/model/queues_response_dto.dart index 291bec4394ee34cf1deb0045560e034bfd2836aa..b20f8c3c09fc49d9efa6c9d2b3ca75db6751e4d0 100644 GIT binary patch delta 996 zcmaKqy-Ncz7>BuP5h{rNxNCbVo+64URw@Xkf`j#A^<&XR-`=I|(e_euInb$6LBuuS zD6US59o+1ugF5v8P#4z@Rt;j~o!gV={Uz@klE%XG!u5pYUw4p$SW+6}1d{qq^xa-ReVa>wS%zfUW+MeEa zZ-sT@Ua%1mz2H$Z4WxtD#xfAP*|EFNLBF4f1^d;M+q=+$Q_fmomR6C4yl%!${3iba zEq}t#alZGABLhicul=X|ztn<@oSuROPK%*&PK~(C-oibs7Md2!V`x?|v2a2#MtDXr zSK-MHBc73tOm)4f5{t8~xquqr@{`Xoi~GQNAk`_Q1v#0? zi6yBnsU@k&C7Jnos5(Gu(h`$#Nl&(Ak*Y^AFefvqD6yzA9@CiI)RM%M#F9kUijty4 z99HFKrWYk*cST-)NoE?zsX#XtBb%OD3ifw?auKp9M6ftDu_!qMi+E;AYH}hr4Y`R$ zCBay9O^#<(6Y?+9j~ z@I8gpknB?w6`4FiNC!iNW%3PPp2=^86wqY!g!R!x>V);sM9vCpqKSx$po=7mXrrlF zD}wH>FCwV6>!Rt%7nMa5Su3i9Ch|el7)``Q%o("/jobs", { ...opts })); @@ -2828,17 +2828,17 @@ export function createJob({ jobCreateDto }: { /** * Run jobs */ -export function sendJobCommand({ id, jobCommandDto }: { - id: JobName; - jobCommandDto: JobCommandDto; +export function runQueueCommandLegacy({ name, queueCommandDto }: { + name: QueueName; + queueCommandDto: QueueCommandDto; }, opts?: Oazapfts.RequestOpts) { return oazapfts.ok(oazapfts.fetchJson<{ status: 200; - data: JobStatusDto; - }>(`/jobs/${encodeURIComponent(id)}`, oazapfts.json({ + data: QueueResponseDto; + }>(`/jobs/${encodeURIComponent(name)}`, oazapfts.json({ ...opts, method: "PUT", - body: jobCommandDto + body: queueCommandDto }))); } /** @@ -5067,7 +5067,7 @@ export enum ManualJobName { MemoryCreate = "memory-create", BackupDatabase = "backup-database" } -export enum JobName { +export enum QueueName { ThumbnailGeneration = "thumbnailGeneration", MetadataExtraction = "metadataExtraction", VideoConversion = "videoConversion", @@ -5085,7 +5085,7 @@ export enum JobName { BackupDatabase = "backupDatabase", Ocr = "ocr" } -export enum JobCommand { +export enum QueueCommand { Start = "start", Pause = "pause", Resume = "resume", diff --git a/server/src/app.module.ts b/server/src/app.module.ts index 807944132..f80a47bb7 100644 --- a/server/src/app.module.ts +++ b/server/src/app.module.ts @@ -23,7 +23,7 @@ import { WebsocketRepository } from 'src/repositories/websocket.repository'; import { services } from 'src/services'; import { AuthService } from 'src/services/auth.service'; import { CliService } from 'src/services/cli.service'; -import { JobService } from 'src/services/job.service'; +import { QueueService } from 'src/services/queue.service'; import { getKyselyConfig } from 'src/utils/database'; const common = [...repositories, ...services, GlobalExceptionFilter]; @@ -52,11 +52,11 @@ class BaseModule implements OnModuleInit, OnModuleDestroy { constructor( @Inject(IWorker) private worker: ImmichWorker, logger: LoggingRepository, - private eventRepository: EventRepository, - private websocketRepository: WebsocketRepository, - private jobService: JobService, - private telemetryRepository: TelemetryRepository, private authService: AuthService, + private eventRepository: EventRepository, + private queueService: QueueService, + private telemetryRepository: TelemetryRepository, + private websocketRepository: WebsocketRepository, ) { logger.setAppName(this.worker); } @@ -64,7 +64,7 @@ class BaseModule implements OnModuleInit, OnModuleDestroy { async onModuleInit() { this.telemetryRepository.setup({ repositories }); - this.jobService.setServices(services); + this.queueService.setServices(services); this.websocketRepository.setAuthFn(async (client) => this.authService.authenticate({ diff --git a/server/src/controllers/job.controller.ts b/server/src/controllers/job.controller.ts index 34c6bdc27..977f1e0f1 100644 --- a/server/src/controllers/job.controller.ts +++ b/server/src/controllers/job.controller.ts @@ -1,15 +1,20 @@ import { Body, Controller, Get, HttpCode, HttpStatus, Param, Post, Put } from '@nestjs/common'; import { ApiTags } from '@nestjs/swagger'; import { Endpoint, HistoryBuilder } from 'src/decorators'; -import { AllJobStatusResponseDto, JobCommandDto, JobCreateDto, JobIdParamDto, JobStatusDto } from 'src/dtos/job.dto'; +import { JobCreateDto } from 'src/dtos/job.dto'; +import { QueueCommandDto, QueueNameParamDto, QueueResponseDto, QueuesResponseDto } from 'src/dtos/queue.dto'; import { ApiTag, Permission } from 'src/enum'; import { Authenticated } from 'src/middleware/auth.guard'; import { JobService } from 'src/services/job.service'; +import { QueueService } from 'src/services/queue.service'; @ApiTags(ApiTag.Jobs) @Controller('jobs') export class JobController { - constructor(private service: JobService) {} + constructor( + private service: JobService, + private queueService: QueueService, + ) {} @Get() @Authenticated({ permission: Permission.JobRead, admin: true }) @@ -18,8 +23,8 @@ export class JobController { description: 'Retrieve the counts of the current queue, as well as the current status.', history: new HistoryBuilder().added('v1').beta('v1').stable('v2'), }) - getAllJobsStatus(): Promise { - return this.service.getAllJobsStatus(); + getQueuesLegacy(): Promise { + return this.queueService.getAll(); } @Post() @@ -35,7 +40,7 @@ export class JobController { return this.service.create(dto); } - @Put(':id') + @Put(':name') @Authenticated({ permission: Permission.JobCreate, admin: true }) @Endpoint({ summary: 'Run jobs', @@ -43,7 +48,7 @@ export class JobController { 'Queue all assets for a specific job type. Defaults to only queueing assets that have not yet been processed, but the force command can be used to re-process all assets.', history: new HistoryBuilder().added('v1').beta('v1').stable('v2'), }) - sendJobCommand(@Param() { id }: JobIdParamDto, @Body() dto: JobCommandDto): Promise { - return this.service.handleCommand(id, dto); + runQueueCommandLegacy(@Param() { name }: QueueNameParamDto, @Body() dto: QueueCommandDto): Promise { + return this.queueService.runCommand(name, dto); } } diff --git a/server/src/dtos/job.dto.ts b/server/src/dtos/job.dto.ts index 5daaeacdd..794af6e5e 100644 --- a/server/src/dtos/job.dto.ts +++ b/server/src/dtos/job.dto.ts @@ -1,99 +1,7 @@ -import { ApiProperty } from '@nestjs/swagger'; -import { JobCommand, ManualJobName, QueueName } from 'src/enum'; -import { ValidateBoolean, ValidateEnum } from 'src/validation'; - -export class JobIdParamDto { - @ValidateEnum({ enum: QueueName, name: 'JobName' }) - id!: QueueName; -} - -export class JobCommandDto { - @ValidateEnum({ enum: JobCommand, name: 'JobCommand' }) - command!: JobCommand; - - @ValidateBoolean({ optional: true }) - force?: boolean; // TODO: this uses undefined as a third state, which should be refactored to be more explicit -} +import { ManualJobName } from 'src/enum'; +import { ValidateEnum } from 'src/validation'; export class JobCreateDto { @ValidateEnum({ enum: ManualJobName, name: 'ManualJobName' }) name!: ManualJobName; } - -export class JobCountsDto { - @ApiProperty({ type: 'integer' }) - active!: number; - @ApiProperty({ type: 'integer' }) - completed!: number; - @ApiProperty({ type: 'integer' }) - failed!: number; - @ApiProperty({ type: 'integer' }) - delayed!: number; - @ApiProperty({ type: 'integer' }) - waiting!: number; - @ApiProperty({ type: 'integer' }) - paused!: number; -} - -export class QueueStatusDto { - isActive!: boolean; - isPaused!: boolean; -} - -export class JobStatusDto { - @ApiProperty({ type: JobCountsDto }) - jobCounts!: JobCountsDto; - - @ApiProperty({ type: QueueStatusDto }) - queueStatus!: QueueStatusDto; -} - -export class AllJobStatusResponseDto implements Record { - @ApiProperty({ type: JobStatusDto }) - [QueueName.ThumbnailGeneration]!: JobStatusDto; - - @ApiProperty({ type: JobStatusDto }) - [QueueName.MetadataExtraction]!: JobStatusDto; - - @ApiProperty({ type: JobStatusDto }) - [QueueName.VideoConversion]!: JobStatusDto; - - @ApiProperty({ type: JobStatusDto }) - [QueueName.SmartSearch]!: JobStatusDto; - - @ApiProperty({ type: JobStatusDto }) - [QueueName.StorageTemplateMigration]!: JobStatusDto; - - @ApiProperty({ type: JobStatusDto }) - [QueueName.Migration]!: JobStatusDto; - - @ApiProperty({ type: JobStatusDto }) - [QueueName.BackgroundTask]!: JobStatusDto; - - @ApiProperty({ type: JobStatusDto }) - [QueueName.Search]!: JobStatusDto; - - @ApiProperty({ type: JobStatusDto }) - [QueueName.DuplicateDetection]!: JobStatusDto; - - @ApiProperty({ type: JobStatusDto }) - [QueueName.FaceDetection]!: JobStatusDto; - - @ApiProperty({ type: JobStatusDto }) - [QueueName.FacialRecognition]!: JobStatusDto; - - @ApiProperty({ type: JobStatusDto }) - [QueueName.Sidecar]!: JobStatusDto; - - @ApiProperty({ type: JobStatusDto }) - [QueueName.Library]!: JobStatusDto; - - @ApiProperty({ type: JobStatusDto }) - [QueueName.Notification]!: JobStatusDto; - - @ApiProperty({ type: JobStatusDto }) - [QueueName.BackupDatabase]!: JobStatusDto; - - @ApiProperty({ type: JobStatusDto }) - [QueueName.Ocr]!: JobStatusDto; -} diff --git a/server/src/dtos/queue.dto.ts b/server/src/dtos/queue.dto.ts new file mode 100644 index 000000000..1492e014d --- /dev/null +++ b/server/src/dtos/queue.dto.ts @@ -0,0 +1,94 @@ +import { ApiProperty } from '@nestjs/swagger'; +import { QueueCommand, QueueName } from 'src/enum'; +import { ValidateBoolean, ValidateEnum } from 'src/validation'; + +export class QueueNameParamDto { + @ValidateEnum({ enum: QueueName, name: 'QueueName' }) + name!: QueueName; +} + +export class QueueCommandDto { + @ValidateEnum({ enum: QueueCommand, name: 'QueueCommand' }) + command!: QueueCommand; + + @ValidateBoolean({ optional: true }) + force?: boolean; // TODO: this uses undefined as a third state, which should be refactored to be more explicit +} + +export class QueueStatisticsDto { + @ApiProperty({ type: 'integer' }) + active!: number; + @ApiProperty({ type: 'integer' }) + completed!: number; + @ApiProperty({ type: 'integer' }) + failed!: number; + @ApiProperty({ type: 'integer' }) + delayed!: number; + @ApiProperty({ type: 'integer' }) + waiting!: number; + @ApiProperty({ type: 'integer' }) + paused!: number; +} + +export class QueueStatusDto { + isActive!: boolean; + isPaused!: boolean; +} + +export class QueueResponseDto { + @ApiProperty({ type: QueueStatisticsDto }) + jobCounts!: QueueStatisticsDto; + + @ApiProperty({ type: QueueStatusDto }) + queueStatus!: QueueStatusDto; +} + +export class QueuesResponseDto implements Record { + @ApiProperty({ type: QueueResponseDto }) + [QueueName.ThumbnailGeneration]!: QueueResponseDto; + + @ApiProperty({ type: QueueResponseDto }) + [QueueName.MetadataExtraction]!: QueueResponseDto; + + @ApiProperty({ type: QueueResponseDto }) + [QueueName.VideoConversion]!: QueueResponseDto; + + @ApiProperty({ type: QueueResponseDto }) + [QueueName.SmartSearch]!: QueueResponseDto; + + @ApiProperty({ type: QueueResponseDto }) + [QueueName.StorageTemplateMigration]!: QueueResponseDto; + + @ApiProperty({ type: QueueResponseDto }) + [QueueName.Migration]!: QueueResponseDto; + + @ApiProperty({ type: QueueResponseDto }) + [QueueName.BackgroundTask]!: QueueResponseDto; + + @ApiProperty({ type: QueueResponseDto }) + [QueueName.Search]!: QueueResponseDto; + + @ApiProperty({ type: QueueResponseDto }) + [QueueName.DuplicateDetection]!: QueueResponseDto; + + @ApiProperty({ type: QueueResponseDto }) + [QueueName.FaceDetection]!: QueueResponseDto; + + @ApiProperty({ type: QueueResponseDto }) + [QueueName.FacialRecognition]!: QueueResponseDto; + + @ApiProperty({ type: QueueResponseDto }) + [QueueName.Sidecar]!: QueueResponseDto; + + @ApiProperty({ type: QueueResponseDto }) + [QueueName.Library]!: QueueResponseDto; + + @ApiProperty({ type: QueueResponseDto }) + [QueueName.Notification]!: QueueResponseDto; + + @ApiProperty({ type: QueueResponseDto }) + [QueueName.BackupDatabase]!: QueueResponseDto; + + @ApiProperty({ type: QueueResponseDto }) + [QueueName.Ocr]!: QueueResponseDto; +} diff --git a/server/src/enum.ts b/server/src/enum.ts index c706c1da7..f3814863b 100644 --- a/server/src/enum.ts +++ b/server/src/enum.ts @@ -603,7 +603,7 @@ export enum JobName { Ocr = 'Ocr', } -export enum JobCommand { +export enum QueueCommand { Start = 'start', Pause = 'pause', Resume = 'resume', diff --git a/server/src/services/api.service.ts b/server/src/services/api.service.ts index ee9b0e622..0ec2a65f9 100644 --- a/server/src/services/api.service.ts +++ b/server/src/services/api.service.ts @@ -7,7 +7,6 @@ import { ONE_HOUR } from 'src/constants'; import { ConfigRepository } from 'src/repositories/config.repository'; import { LoggingRepository } from 'src/repositories/logging.repository'; import { AuthService } from 'src/services/auth.service'; -import { JobService } from 'src/services/job.service'; import { SharedLinkService } from 'src/services/shared-link.service'; import { VersionService } from 'src/services/version.service'; import { OpenGraphTags } from 'src/utils/misc'; @@ -40,7 +39,6 @@ const render = (index: string, meta: OpenGraphTags) => { export class ApiService { constructor( private authService: AuthService, - private jobService: JobService, private sharedLinkService: SharedLinkService, private versionService: VersionService, private configRepository: ConfigRepository, diff --git a/server/src/services/index.ts b/server/src/services/index.ts index 9a8b0fb2b..8862a5b37 100644 --- a/server/src/services/index.ts +++ b/server/src/services/index.ts @@ -23,6 +23,7 @@ import { NotificationService } from 'src/services/notification.service'; import { OcrService } from 'src/services/ocr.service'; import { PartnerService } from 'src/services/partner.service'; import { PersonService } from 'src/services/person.service'; +import { QueueService } from 'src/services/queue.service'; import { SearchService } from 'src/services/search.service'; import { ServerService } from 'src/services/server.service'; import { SessionService } from 'src/services/session.service'; @@ -69,6 +70,7 @@ export const services = [ OcrService, PartnerService, PersonService, + QueueService, SearchService, ServerService, SessionService, diff --git a/server/src/services/job.service.spec.ts b/server/src/services/job.service.spec.ts index 7a300ae7a..c23b4f05d 100644 --- a/server/src/services/job.service.spec.ts +++ b/server/src/services/job.service.spec.ts @@ -1,6 +1,4 @@ -import { BadRequestException } from '@nestjs/common'; -import { defaults, SystemConfig } from 'src/config'; -import { ImmichWorker, JobCommand, JobName, JobStatus, QueueName } from 'src/enum'; +import { ImmichWorker, JobName, JobStatus, QueueName } from 'src/enum'; import { JobService } from 'src/services/job.service'; import { JobItem } from 'src/types'; import { assetStub } from 'test/fixtures/asset.stub'; @@ -20,209 +18,6 @@ describe(JobService.name, () => { expect(sut).toBeDefined(); }); - describe('onConfigUpdate', () => { - it('should update concurrency', () => { - sut.onConfigUpdate({ newConfig: defaults, oldConfig: {} as SystemConfig }); - - expect(mocks.job.setConcurrency).toHaveBeenCalledTimes(16); - expect(mocks.job.setConcurrency).toHaveBeenNthCalledWith(5, QueueName.FacialRecognition, 1); - expect(mocks.job.setConcurrency).toHaveBeenNthCalledWith(7, QueueName.DuplicateDetection, 1); - expect(mocks.job.setConcurrency).toHaveBeenNthCalledWith(8, QueueName.BackgroundTask, 5); - expect(mocks.job.setConcurrency).toHaveBeenNthCalledWith(9, QueueName.StorageTemplateMigration, 1); - }); - }); - - describe('handleNightlyJobs', () => { - it('should run the scheduled jobs', async () => { - await sut.handleNightlyJobs(); - - expect(mocks.job.queueAll).toHaveBeenCalledWith([ - { name: JobName.AssetDeleteCheck }, - { name: JobName.UserDeleteCheck }, - { name: JobName.PersonCleanup }, - { name: JobName.MemoryCleanup }, - { name: JobName.SessionCleanup }, - { name: JobName.AuditTableCleanup }, - { name: JobName.AuditLogCleanup }, - { name: JobName.MemoryGenerate }, - { name: JobName.UserSyncUsage }, - { name: JobName.AssetGenerateThumbnailsQueueAll, data: { force: false } }, - { name: JobName.FacialRecognitionQueueAll, data: { force: false, nightly: true } }, - ]); - }); - }); - - describe('getAllJobStatus', () => { - it('should get all job statuses', async () => { - mocks.job.getJobCounts.mockResolvedValue({ - active: 1, - completed: 1, - failed: 1, - delayed: 1, - waiting: 1, - paused: 1, - }); - mocks.job.getQueueStatus.mockResolvedValue({ - isActive: true, - isPaused: true, - }); - - const expectedJobStatus = { - jobCounts: { - active: 1, - completed: 1, - delayed: 1, - failed: 1, - waiting: 1, - paused: 1, - }, - queueStatus: { - isActive: true, - isPaused: true, - }, - }; - - await expect(sut.getAllJobsStatus()).resolves.toEqual({ - [QueueName.BackgroundTask]: expectedJobStatus, - [QueueName.DuplicateDetection]: expectedJobStatus, - [QueueName.SmartSearch]: expectedJobStatus, - [QueueName.MetadataExtraction]: expectedJobStatus, - [QueueName.Search]: expectedJobStatus, - [QueueName.StorageTemplateMigration]: expectedJobStatus, - [QueueName.Migration]: expectedJobStatus, - [QueueName.ThumbnailGeneration]: expectedJobStatus, - [QueueName.VideoConversion]: expectedJobStatus, - [QueueName.FaceDetection]: expectedJobStatus, - [QueueName.FacialRecognition]: expectedJobStatus, - [QueueName.Sidecar]: expectedJobStatus, - [QueueName.Library]: expectedJobStatus, - [QueueName.Notification]: expectedJobStatus, - [QueueName.BackupDatabase]: expectedJobStatus, - [QueueName.Ocr]: expectedJobStatus, - }); - }); - }); - - describe('handleCommand', () => { - it('should handle a pause command', async () => { - await sut.handleCommand(QueueName.MetadataExtraction, { command: JobCommand.Pause, force: false }); - - expect(mocks.job.pause).toHaveBeenCalledWith(QueueName.MetadataExtraction); - }); - - it('should handle a resume command', async () => { - await sut.handleCommand(QueueName.MetadataExtraction, { command: JobCommand.Resume, force: false }); - - expect(mocks.job.resume).toHaveBeenCalledWith(QueueName.MetadataExtraction); - }); - - it('should handle an empty command', async () => { - await sut.handleCommand(QueueName.MetadataExtraction, { command: JobCommand.Empty, force: false }); - - expect(mocks.job.empty).toHaveBeenCalledWith(QueueName.MetadataExtraction); - }); - - it('should not start a job that is already running', async () => { - mocks.job.getQueueStatus.mockResolvedValue({ isActive: true, isPaused: false }); - - await expect( - sut.handleCommand(QueueName.VideoConversion, { command: JobCommand.Start, force: false }), - ).rejects.toBeInstanceOf(BadRequestException); - - expect(mocks.job.queue).not.toHaveBeenCalled(); - expect(mocks.job.queueAll).not.toHaveBeenCalled(); - }); - - it('should handle a start video conversion command', async () => { - mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); - - await sut.handleCommand(QueueName.VideoConversion, { command: JobCommand.Start, force: false }); - - expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.AssetEncodeVideoQueueAll, data: { force: false } }); - }); - - it('should handle a start storage template migration command', async () => { - mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); - - await sut.handleCommand(QueueName.StorageTemplateMigration, { command: JobCommand.Start, force: false }); - - expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.StorageTemplateMigration }); - }); - - it('should handle a start smart search command', async () => { - mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); - - await sut.handleCommand(QueueName.SmartSearch, { command: JobCommand.Start, force: false }); - - expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.SmartSearchQueueAll, data: { force: false } }); - }); - - it('should handle a start metadata extraction command', async () => { - mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); - - await sut.handleCommand(QueueName.MetadataExtraction, { command: JobCommand.Start, force: false }); - - expect(mocks.job.queue).toHaveBeenCalledWith({ - name: JobName.AssetExtractMetadataQueueAll, - data: { force: false }, - }); - }); - - it('should handle a start sidecar command', async () => { - mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); - - await sut.handleCommand(QueueName.Sidecar, { command: JobCommand.Start, force: false }); - - expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.SidecarQueueAll, data: { force: false } }); - }); - - it('should handle a start thumbnail generation command', async () => { - mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); - - await sut.handleCommand(QueueName.ThumbnailGeneration, { command: JobCommand.Start, force: false }); - - expect(mocks.job.queue).toHaveBeenCalledWith({ - name: JobName.AssetGenerateThumbnailsQueueAll, - data: { force: false }, - }); - }); - - it('should handle a start face detection command', async () => { - mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); - - await sut.handleCommand(QueueName.FaceDetection, { command: JobCommand.Start, force: false }); - - expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.AssetDetectFacesQueueAll, data: { force: false } }); - }); - - it('should handle a start facial recognition command', async () => { - mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); - - await sut.handleCommand(QueueName.FacialRecognition, { command: JobCommand.Start, force: false }); - - expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.FacialRecognitionQueueAll, data: { force: false } }); - }); - - it('should handle a start backup database command', async () => { - mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); - - await sut.handleCommand(QueueName.BackupDatabase, { command: JobCommand.Start, force: false }); - - expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.DatabaseBackup, data: { force: false } }); - }); - - it('should throw a bad request when an invalid queue is used', async () => { - mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); - - await expect( - sut.handleCommand(QueueName.BackgroundTask, { command: JobCommand.Start, force: false }), - ).rejects.toBeInstanceOf(BadRequestException); - - expect(mocks.job.queue).not.toHaveBeenCalled(); - expect(mocks.job.queueAll).not.toHaveBeenCalled(); - }); - }); - describe('onJobRun', () => { it('should process a successful job', async () => { mocks.job.run.mockResolvedValue(JobStatus.Success); diff --git a/server/src/services/job.service.ts b/server/src/services/job.service.ts index c483155b7..b57a20378 100644 --- a/server/src/services/job.service.ts +++ b/server/src/services/job.service.ts @@ -1,28 +1,12 @@ import { BadRequestException, Injectable } from '@nestjs/common'; -import { ClassConstructor } from 'class-transformer'; -import { SystemConfig } from 'src/config'; import { OnEvent } from 'src/decorators'; import { mapAsset } from 'src/dtos/asset-response.dto'; -import { AllJobStatusResponseDto, JobCommandDto, JobCreateDto, JobStatusDto } from 'src/dtos/job.dto'; -import { - AssetType, - AssetVisibility, - BootstrapEventPriority, - CronJob, - DatabaseLock, - ImmichWorker, - JobCommand, - JobName, - JobStatus, - ManualJobName, - QueueCleanType, - QueueName, -} from 'src/enum'; -import { ArgOf, ArgsOf } from 'src/repositories/event.repository'; +import { JobCreateDto } from 'src/dtos/job.dto'; +import { AssetType, AssetVisibility, JobName, JobStatus, ManualJobName } from 'src/enum'; +import { ArgsOf } from 'src/repositories/event.repository'; import { BaseService } from 'src/services/base.service'; -import { ConcurrentQueueName, JobItem } from 'src/types'; +import { JobItem } from 'src/types'; import { hexOrBufferToBase64 } from 'src/utils/bytes'; -import { handlePromiseError } from 'src/utils/misc'; const asJobItem = (dto: JobCreateDto): JobItem => { switch (dto.name) { @@ -56,196 +40,12 @@ const asJobItem = (dto: JobCreateDto): JobItem => { } }; -const asNightlyTasksCron = (config: SystemConfig) => { - const [hours, minutes] = config.nightlyTasks.startTime.split(':').map(Number); - return `${minutes} ${hours} * * *`; -}; - @Injectable() export class JobService extends BaseService { - private services: ClassConstructor[] = []; - private nightlyJobsLock = false; - - @OnEvent({ name: 'ConfigInit' }) - async onConfigInit({ newConfig: config }: ArgOf<'ConfigInit'>) { - if (this.worker === ImmichWorker.Microservices) { - this.updateQueueConcurrency(config); - return; - } - - this.nightlyJobsLock = await this.databaseRepository.tryLock(DatabaseLock.NightlyJobs); - if (this.nightlyJobsLock) { - const cronExpression = asNightlyTasksCron(config); - this.logger.debug(`Scheduling nightly jobs for ${cronExpression}`); - this.cronRepository.create({ - name: CronJob.NightlyJobs, - expression: cronExpression, - start: true, - onTick: () => handlePromiseError(this.handleNightlyJobs(), this.logger), - }); - } - } - - @OnEvent({ name: 'ConfigUpdate', server: true }) - onConfigUpdate({ newConfig: config }: ArgOf<'ConfigUpdate'>) { - if (this.worker === ImmichWorker.Microservices) { - this.updateQueueConcurrency(config); - return; - } - - if (this.nightlyJobsLock) { - const cronExpression = asNightlyTasksCron(config); - this.logger.debug(`Scheduling nightly jobs for ${cronExpression}`); - this.cronRepository.update({ name: CronJob.NightlyJobs, expression: cronExpression, start: true }); - } - } - - @OnEvent({ name: 'AppBootstrap', priority: BootstrapEventPriority.JobService }) - onBootstrap() { - this.jobRepository.setup(this.services); - if (this.worker === ImmichWorker.Microservices) { - this.jobRepository.startWorkers(); - } - } - - private updateQueueConcurrency(config: SystemConfig) { - this.logger.debug(`Updating queue concurrency settings`); - for (const queueName of Object.values(QueueName)) { - let concurrency = 1; - if (this.isConcurrentQueue(queueName)) { - concurrency = config.job[queueName].concurrency; - } - this.logger.debug(`Setting ${queueName} concurrency to ${concurrency}`); - this.jobRepository.setConcurrency(queueName, concurrency); - } - } - - setServices(services: ClassConstructor[]) { - this.services = services; - } - async create(dto: JobCreateDto): Promise { await this.jobRepository.queue(asJobItem(dto)); } - async handleCommand(queueName: QueueName, dto: JobCommandDto): Promise { - this.logger.debug(`Handling command: queue=${queueName},command=${dto.command},force=${dto.force}`); - - switch (dto.command) { - case JobCommand.Start: { - await this.start(queueName, dto); - break; - } - - case JobCommand.Pause: { - await this.jobRepository.pause(queueName); - break; - } - - case JobCommand.Resume: { - await this.jobRepository.resume(queueName); - break; - } - - case JobCommand.Empty: { - await this.jobRepository.empty(queueName); - break; - } - - case JobCommand.ClearFailed: { - const failedJobs = await this.jobRepository.clear(queueName, QueueCleanType.Failed); - this.logger.debug(`Cleared failed jobs: ${failedJobs}`); - break; - } - } - - return this.getJobStatus(queueName); - } - - async getJobStatus(queueName: QueueName): Promise { - const [jobCounts, queueStatus] = await Promise.all([ - this.jobRepository.getJobCounts(queueName), - this.jobRepository.getQueueStatus(queueName), - ]); - - return { jobCounts, queueStatus }; - } - - async getAllJobsStatus(): Promise { - const response = new AllJobStatusResponseDto(); - for (const queueName of Object.values(QueueName)) { - response[queueName] = await this.getJobStatus(queueName); - } - return response; - } - - private async start(name: QueueName, { force }: JobCommandDto): Promise { - const { isActive } = await this.jobRepository.getQueueStatus(name); - if (isActive) { - throw new BadRequestException(`Job is already running`); - } - - await this.eventRepository.emit('QueueStart', { name }); - - switch (name) { - case QueueName.VideoConversion: { - return this.jobRepository.queue({ name: JobName.AssetEncodeVideoQueueAll, data: { force } }); - } - - case QueueName.StorageTemplateMigration: { - return this.jobRepository.queue({ name: JobName.StorageTemplateMigration }); - } - - case QueueName.Migration: { - return this.jobRepository.queue({ name: JobName.FileMigrationQueueAll }); - } - - case QueueName.SmartSearch: { - return this.jobRepository.queue({ name: JobName.SmartSearchQueueAll, data: { force } }); - } - - case QueueName.DuplicateDetection: { - return this.jobRepository.queue({ name: JobName.AssetDetectDuplicatesQueueAll, data: { force } }); - } - - case QueueName.MetadataExtraction: { - return this.jobRepository.queue({ name: JobName.AssetExtractMetadataQueueAll, data: { force } }); - } - - case QueueName.Sidecar: { - return this.jobRepository.queue({ name: JobName.SidecarQueueAll, data: { force } }); - } - - case QueueName.ThumbnailGeneration: { - return this.jobRepository.queue({ name: JobName.AssetGenerateThumbnailsQueueAll, data: { force } }); - } - - case QueueName.FaceDetection: { - return this.jobRepository.queue({ name: JobName.AssetDetectFacesQueueAll, data: { force } }); - } - - case QueueName.FacialRecognition: { - return this.jobRepository.queue({ name: JobName.FacialRecognitionQueueAll, data: { force } }); - } - - case QueueName.Library: { - return this.jobRepository.queue({ name: JobName.LibraryScanQueueAll, data: { force } }); - } - - case QueueName.BackupDatabase: { - return this.jobRepository.queue({ name: JobName.DatabaseBackup, data: { force } }); - } - - case QueueName.Ocr: { - return this.jobRepository.queue({ name: JobName.OcrQueueAll, data: { force } }); - } - - default: { - throw new BadRequestException(`Invalid job name: ${name}`); - } - } - } - @OnEvent({ name: 'JobRun' }) async onJobRun(...[queueName, job]: ArgsOf<'JobRun'>) { try { @@ -262,50 +62,6 @@ export class JobService extends BaseService { } } - private isConcurrentQueue(name: QueueName): name is ConcurrentQueueName { - return ![ - QueueName.FacialRecognition, - QueueName.StorageTemplateMigration, - QueueName.DuplicateDetection, - QueueName.BackupDatabase, - ].includes(name); - } - - async handleNightlyJobs() { - const config = await this.getConfig({ withCache: false }); - const jobs: JobItem[] = []; - - if (config.nightlyTasks.databaseCleanup) { - jobs.push( - { name: JobName.AssetDeleteCheck }, - { name: JobName.UserDeleteCheck }, - { name: JobName.PersonCleanup }, - { name: JobName.MemoryCleanup }, - { name: JobName.SessionCleanup }, - { name: JobName.AuditTableCleanup }, - { name: JobName.AuditLogCleanup }, - ); - } - - if (config.nightlyTasks.generateMemories) { - jobs.push({ name: JobName.MemoryGenerate }); - } - - if (config.nightlyTasks.syncQuotaUsage) { - jobs.push({ name: JobName.UserSyncUsage }); - } - - if (config.nightlyTasks.missingThumbnails) { - jobs.push({ name: JobName.AssetGenerateThumbnailsQueueAll, data: { force: false } }); - } - - if (config.nightlyTasks.clusterNewFaces) { - jobs.push({ name: JobName.FacialRecognitionQueueAll, data: { force: false, nightly: true } }); - } - - await this.jobRepository.queueAll(jobs); - } - /** * Queue follow up jobs */ diff --git a/server/src/services/queue.service.spec.ts b/server/src/services/queue.service.spec.ts new file mode 100644 index 000000000..1cc53df64 --- /dev/null +++ b/server/src/services/queue.service.spec.ts @@ -0,0 +1,223 @@ +import { BadRequestException } from '@nestjs/common'; +import { defaults, SystemConfig } from 'src/config'; +import { ImmichWorker, JobName, QueueCommand, QueueName } from 'src/enum'; +import { QueueService } from 'src/services/queue.service'; +import { newTestService, ServiceMocks } from 'test/utils'; + +describe(QueueService.name, () => { + let sut: QueueService; + let mocks: ServiceMocks; + + beforeEach(() => { + ({ sut, mocks } = newTestService(QueueService)); + + mocks.config.getWorker.mockReturnValue(ImmichWorker.Microservices); + }); + + it('should work', () => { + expect(sut).toBeDefined(); + }); + + describe('onConfigUpdate', () => { + it('should update concurrency', () => { + sut.onConfigUpdate({ newConfig: defaults, oldConfig: {} as SystemConfig }); + + expect(mocks.job.setConcurrency).toHaveBeenCalledTimes(16); + expect(mocks.job.setConcurrency).toHaveBeenNthCalledWith(5, QueueName.FacialRecognition, 1); + expect(mocks.job.setConcurrency).toHaveBeenNthCalledWith(7, QueueName.DuplicateDetection, 1); + expect(mocks.job.setConcurrency).toHaveBeenNthCalledWith(8, QueueName.BackgroundTask, 5); + expect(mocks.job.setConcurrency).toHaveBeenNthCalledWith(9, QueueName.StorageTemplateMigration, 1); + }); + }); + + describe('handleNightlyJobs', () => { + it('should run the scheduled jobs', async () => { + await sut.handleNightlyJobs(); + + expect(mocks.job.queueAll).toHaveBeenCalledWith([ + { name: JobName.AssetDeleteCheck }, + { name: JobName.UserDeleteCheck }, + { name: JobName.PersonCleanup }, + { name: JobName.MemoryCleanup }, + { name: JobName.SessionCleanup }, + { name: JobName.AuditTableCleanup }, + { name: JobName.AuditLogCleanup }, + { name: JobName.MemoryGenerate }, + { name: JobName.UserSyncUsage }, + { name: JobName.AssetGenerateThumbnailsQueueAll, data: { force: false } }, + { name: JobName.FacialRecognitionQueueAll, data: { force: false, nightly: true } }, + ]); + }); + }); + + describe('getAllJobStatus', () => { + it('should get all job statuses', async () => { + mocks.job.getJobCounts.mockResolvedValue({ + active: 1, + completed: 1, + failed: 1, + delayed: 1, + waiting: 1, + paused: 1, + }); + mocks.job.getQueueStatus.mockResolvedValue({ + isActive: true, + isPaused: true, + }); + + const expectedJobStatus = { + jobCounts: { + active: 1, + completed: 1, + delayed: 1, + failed: 1, + waiting: 1, + paused: 1, + }, + queueStatus: { + isActive: true, + isPaused: true, + }, + }; + + await expect(sut.getAll()).resolves.toEqual({ + [QueueName.BackgroundTask]: expectedJobStatus, + [QueueName.DuplicateDetection]: expectedJobStatus, + [QueueName.SmartSearch]: expectedJobStatus, + [QueueName.MetadataExtraction]: expectedJobStatus, + [QueueName.Search]: expectedJobStatus, + [QueueName.StorageTemplateMigration]: expectedJobStatus, + [QueueName.Migration]: expectedJobStatus, + [QueueName.ThumbnailGeneration]: expectedJobStatus, + [QueueName.VideoConversion]: expectedJobStatus, + [QueueName.FaceDetection]: expectedJobStatus, + [QueueName.FacialRecognition]: expectedJobStatus, + [QueueName.Sidecar]: expectedJobStatus, + [QueueName.Library]: expectedJobStatus, + [QueueName.Notification]: expectedJobStatus, + [QueueName.BackupDatabase]: expectedJobStatus, + [QueueName.Ocr]: expectedJobStatus, + }); + }); + }); + + describe('handleCommand', () => { + it('should handle a pause command', async () => { + await sut.runCommand(QueueName.MetadataExtraction, { command: QueueCommand.Pause, force: false }); + + expect(mocks.job.pause).toHaveBeenCalledWith(QueueName.MetadataExtraction); + }); + + it('should handle a resume command', async () => { + await sut.runCommand(QueueName.MetadataExtraction, { command: QueueCommand.Resume, force: false }); + + expect(mocks.job.resume).toHaveBeenCalledWith(QueueName.MetadataExtraction); + }); + + it('should handle an empty command', async () => { + await sut.runCommand(QueueName.MetadataExtraction, { command: QueueCommand.Empty, force: false }); + + expect(mocks.job.empty).toHaveBeenCalledWith(QueueName.MetadataExtraction); + }); + + it('should not start a job that is already running', async () => { + mocks.job.getQueueStatus.mockResolvedValue({ isActive: true, isPaused: false }); + + await expect( + sut.runCommand(QueueName.VideoConversion, { command: QueueCommand.Start, force: false }), + ).rejects.toBeInstanceOf(BadRequestException); + + expect(mocks.job.queue).not.toHaveBeenCalled(); + expect(mocks.job.queueAll).not.toHaveBeenCalled(); + }); + + it('should handle a start video conversion command', async () => { + mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); + + await sut.runCommand(QueueName.VideoConversion, { command: QueueCommand.Start, force: false }); + + expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.AssetEncodeVideoQueueAll, data: { force: false } }); + }); + + it('should handle a start storage template migration command', async () => { + mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); + + await sut.runCommand(QueueName.StorageTemplateMigration, { command: QueueCommand.Start, force: false }); + + expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.StorageTemplateMigration }); + }); + + it('should handle a start smart search command', async () => { + mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); + + await sut.runCommand(QueueName.SmartSearch, { command: QueueCommand.Start, force: false }); + + expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.SmartSearchQueueAll, data: { force: false } }); + }); + + it('should handle a start metadata extraction command', async () => { + mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); + + await sut.runCommand(QueueName.MetadataExtraction, { command: QueueCommand.Start, force: false }); + + expect(mocks.job.queue).toHaveBeenCalledWith({ + name: JobName.AssetExtractMetadataQueueAll, + data: { force: false }, + }); + }); + + it('should handle a start sidecar command', async () => { + mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); + + await sut.runCommand(QueueName.Sidecar, { command: QueueCommand.Start, force: false }); + + expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.SidecarQueueAll, data: { force: false } }); + }); + + it('should handle a start thumbnail generation command', async () => { + mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); + + await sut.runCommand(QueueName.ThumbnailGeneration, { command: QueueCommand.Start, force: false }); + + expect(mocks.job.queue).toHaveBeenCalledWith({ + name: JobName.AssetGenerateThumbnailsQueueAll, + data: { force: false }, + }); + }); + + it('should handle a start face detection command', async () => { + mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); + + await sut.runCommand(QueueName.FaceDetection, { command: QueueCommand.Start, force: false }); + + expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.AssetDetectFacesQueueAll, data: { force: false } }); + }); + + it('should handle a start facial recognition command', async () => { + mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); + + await sut.runCommand(QueueName.FacialRecognition, { command: QueueCommand.Start, force: false }); + + expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.FacialRecognitionQueueAll, data: { force: false } }); + }); + + it('should handle a start backup database command', async () => { + mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); + + await sut.runCommand(QueueName.BackupDatabase, { command: QueueCommand.Start, force: false }); + + expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.DatabaseBackup, data: { force: false } }); + }); + + it('should throw a bad request when an invalid queue is used', async () => { + mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false }); + + await expect( + sut.runCommand(QueueName.BackgroundTask, { command: QueueCommand.Start, force: false }), + ).rejects.toBeInstanceOf(BadRequestException); + + expect(mocks.job.queue).not.toHaveBeenCalled(); + expect(mocks.job.queueAll).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/server/src/services/queue.service.ts b/server/src/services/queue.service.ts new file mode 100644 index 000000000..bea665e8f --- /dev/null +++ b/server/src/services/queue.service.ts @@ -0,0 +1,250 @@ +import { BadRequestException, Injectable } from '@nestjs/common'; +import { ClassConstructor } from 'class-transformer'; +import { SystemConfig } from 'src/config'; +import { OnEvent } from 'src/decorators'; +import { QueueCommandDto, QueueResponseDto, QueuesResponseDto } from 'src/dtos/queue.dto'; +import { + BootstrapEventPriority, + CronJob, + DatabaseLock, + ImmichWorker, + JobName, + QueueCleanType, + QueueCommand, + QueueName, +} from 'src/enum'; +import { ArgOf } from 'src/repositories/event.repository'; +import { BaseService } from 'src/services/base.service'; +import { ConcurrentQueueName, JobItem } from 'src/types'; +import { handlePromiseError } from 'src/utils/misc'; + +const asNightlyTasksCron = (config: SystemConfig) => { + const [hours, minutes] = config.nightlyTasks.startTime.split(':').map(Number); + return `${minutes} ${hours} * * *`; +}; + +@Injectable() +export class QueueService extends BaseService { + private services: ClassConstructor[] = []; + private nightlyJobsLock = false; + + @OnEvent({ name: 'ConfigInit' }) + async onConfigInit({ newConfig: config }: ArgOf<'ConfigInit'>) { + if (this.worker === ImmichWorker.Microservices) { + this.updateConcurrency(config); + return; + } + + this.nightlyJobsLock = await this.databaseRepository.tryLock(DatabaseLock.NightlyJobs); + if (this.nightlyJobsLock) { + const cronExpression = asNightlyTasksCron(config); + this.logger.debug(`Scheduling nightly jobs for ${cronExpression}`); + this.cronRepository.create({ + name: CronJob.NightlyJobs, + expression: cronExpression, + start: true, + onTick: () => handlePromiseError(this.handleNightlyJobs(), this.logger), + }); + } + } + + @OnEvent({ name: 'ConfigUpdate', server: true }) + onConfigUpdate({ newConfig: config }: ArgOf<'ConfigUpdate'>) { + if (this.worker === ImmichWorker.Microservices) { + this.updateConcurrency(config); + return; + } + + if (this.nightlyJobsLock) { + const cronExpression = asNightlyTasksCron(config); + this.logger.debug(`Scheduling nightly jobs for ${cronExpression}`); + this.cronRepository.update({ name: CronJob.NightlyJobs, expression: cronExpression, start: true }); + } + } + + @OnEvent({ name: 'AppBootstrap', priority: BootstrapEventPriority.JobService }) + onBootstrap() { + this.jobRepository.setup(this.services); + if (this.worker === ImmichWorker.Microservices) { + this.jobRepository.startWorkers(); + } + } + + private updateConcurrency(config: SystemConfig) { + this.logger.debug(`Updating queue concurrency settings`); + for (const queueName of Object.values(QueueName)) { + let concurrency = 1; + if (this.isConcurrentQueue(queueName)) { + concurrency = config.job[queueName].concurrency; + } + this.logger.debug(`Setting ${queueName} concurrency to ${concurrency}`); + this.jobRepository.setConcurrency(queueName, concurrency); + } + } + + setServices(services: ClassConstructor[]) { + this.services = services; + } + + async runCommand(name: QueueName, dto: QueueCommandDto): Promise { + this.logger.debug(`Handling command: queue=${name},command=${dto.command},force=${dto.force}`); + + switch (dto.command) { + case QueueCommand.Start: { + await this.start(name, dto); + break; + } + + case QueueCommand.Pause: { + await this.jobRepository.pause(name); + break; + } + + case QueueCommand.Resume: { + await this.jobRepository.resume(name); + break; + } + + case QueueCommand.Empty: { + await this.jobRepository.empty(name); + break; + } + + case QueueCommand.ClearFailed: { + const failedJobs = await this.jobRepository.clear(name, QueueCleanType.Failed); + this.logger.debug(`Cleared failed jobs: ${failedJobs}`); + break; + } + } + + return this.getByName(name); + } + + async getAll(): Promise { + const response = new QueuesResponseDto(); + for (const name of Object.values(QueueName)) { + response[name] = await this.getByName(name); + } + return response; + } + + async getByName(name: QueueName): Promise { + const [jobCounts, queueStatus] = await Promise.all([ + this.jobRepository.getJobCounts(name), + this.jobRepository.getQueueStatus(name), + ]); + + return { jobCounts, queueStatus }; + } + + private async start(name: QueueName, { force }: QueueCommandDto): Promise { + const { isActive } = await this.jobRepository.getQueueStatus(name); + if (isActive) { + throw new BadRequestException(`Job is already running`); + } + + await this.eventRepository.emit('QueueStart', { name }); + + switch (name) { + case QueueName.VideoConversion: { + return this.jobRepository.queue({ name: JobName.AssetEncodeVideoQueueAll, data: { force } }); + } + + case QueueName.StorageTemplateMigration: { + return this.jobRepository.queue({ name: JobName.StorageTemplateMigration }); + } + + case QueueName.Migration: { + return this.jobRepository.queue({ name: JobName.FileMigrationQueueAll }); + } + + case QueueName.SmartSearch: { + return this.jobRepository.queue({ name: JobName.SmartSearchQueueAll, data: { force } }); + } + + case QueueName.DuplicateDetection: { + return this.jobRepository.queue({ name: JobName.AssetDetectDuplicatesQueueAll, data: { force } }); + } + + case QueueName.MetadataExtraction: { + return this.jobRepository.queue({ name: JobName.AssetExtractMetadataQueueAll, data: { force } }); + } + + case QueueName.Sidecar: { + return this.jobRepository.queue({ name: JobName.SidecarQueueAll, data: { force } }); + } + + case QueueName.ThumbnailGeneration: { + return this.jobRepository.queue({ name: JobName.AssetGenerateThumbnailsQueueAll, data: { force } }); + } + + case QueueName.FaceDetection: { + return this.jobRepository.queue({ name: JobName.AssetDetectFacesQueueAll, data: { force } }); + } + + case QueueName.FacialRecognition: { + return this.jobRepository.queue({ name: JobName.FacialRecognitionQueueAll, data: { force } }); + } + + case QueueName.Library: { + return this.jobRepository.queue({ name: JobName.LibraryScanQueueAll, data: { force } }); + } + + case QueueName.BackupDatabase: { + return this.jobRepository.queue({ name: JobName.DatabaseBackup, data: { force } }); + } + + case QueueName.Ocr: { + return this.jobRepository.queue({ name: JobName.OcrQueueAll, data: { force } }); + } + + default: { + throw new BadRequestException(`Invalid job name: ${name}`); + } + } + } + + private isConcurrentQueue(name: QueueName): name is ConcurrentQueueName { + return ![ + QueueName.FacialRecognition, + QueueName.StorageTemplateMigration, + QueueName.DuplicateDetection, + QueueName.BackupDatabase, + ].includes(name); + } + + async handleNightlyJobs() { + const config = await this.getConfig({ withCache: false }); + const jobs: JobItem[] = []; + + if (config.nightlyTasks.databaseCleanup) { + jobs.push( + { name: JobName.AssetDeleteCheck }, + { name: JobName.UserDeleteCheck }, + { name: JobName.PersonCleanup }, + { name: JobName.MemoryCleanup }, + { name: JobName.SessionCleanup }, + { name: JobName.AuditTableCleanup }, + { name: JobName.AuditLogCleanup }, + ); + } + + if (config.nightlyTasks.generateMemories) { + jobs.push({ name: JobName.MemoryGenerate }); + } + + if (config.nightlyTasks.syncQuotaUsage) { + jobs.push({ name: JobName.UserSyncUsage }); + } + + if (config.nightlyTasks.missingThumbnails) { + jobs.push({ name: JobName.AssetGenerateThumbnailsQueueAll, data: { force: false } }); + } + + if (config.nightlyTasks.clusterNewFaces) { + jobs.push({ name: JobName.FacialRecognitionQueueAll, data: { force: false, nightly: true } }); + } + + await this.jobRepository.queueAll(jobs); + } +} diff --git a/web/src/lib/components/admin-settings/JobSettings.svelte b/web/src/lib/components/admin-settings/JobSettings.svelte index fb8a11b33..94b4426db 100644 --- a/web/src/lib/components/admin-settings/JobSettings.svelte +++ b/web/src/lib/components/admin-settings/JobSettings.svelte @@ -4,8 +4,8 @@ import { SettingInputFieldType } from '$lib/constants'; import { featureFlagsManager } from '$lib/managers/feature-flags-manager.svelte'; import { systemConfigManager } from '$lib/managers/system-config-manager.svelte'; - import { getJobName } from '$lib/utils'; - import { JobName, type SystemConfigJobDto } from '@immich/sdk'; + import { getQueueName } from '$lib/utils'; + import { QueueName, type SystemConfigJobDto } from '@immich/sdk'; import { t } from 'svelte-i18n'; import { fade } from 'svelte/transition'; @@ -13,18 +13,18 @@ const config = $derived(systemConfigManager.value); let configToEdit = $state(systemConfigManager.cloneValue()); - const jobNames = [ - JobName.ThumbnailGeneration, - JobName.MetadataExtraction, - JobName.Library, - JobName.Sidecar, - JobName.SmartSearch, - JobName.FaceDetection, - JobName.FacialRecognition, - JobName.VideoConversion, - JobName.StorageTemplateMigration, - JobName.Migration, - JobName.Ocr, + const queueNames = [ + QueueName.ThumbnailGeneration, + QueueName.MetadataExtraction, + QueueName.Library, + QueueName.Sidecar, + QueueName.SmartSearch, + QueueName.FaceDetection, + QueueName.FacialRecognition, + QueueName.VideoConversion, + QueueName.StorageTemplateMigration, + QueueName.Migration, + QueueName.Ocr, ]; function isSystemConfigJobDto(jobName: string): jobName is keyof SystemConfigJobDto { @@ -35,22 +35,22 @@
event.preventDefault()}> - {#each jobNames as jobName (jobName)} + {#each queueNames as queueName (queueName)}
- {#if isSystemConfigJobDto(jobName)} + {#if isSystemConfigJobDto(queueName)} {:else} import Badge from '$lib/elements/Badge.svelte'; import { locale } from '$lib/stores/preferences.store'; - import { JobCommand, type JobCommandDto, type JobCountsDto, type QueueStatusDto } from '@immich/sdk'; + import { QueueCommand, type QueueCommandDto, type QueueStatisticsDto, type QueueStatusDto } from '@immich/sdk'; import { Icon, IconButton } from '@immich/ui'; import { mdiAlertCircle, @@ -22,21 +22,21 @@ title: string; subtitle: string | undefined; description: Component | undefined; - jobCounts: JobCountsDto; + statistics: QueueStatisticsDto; queueStatus: QueueStatusDto; icon: string; disabled?: boolean; allText: string | undefined; refreshText: string | undefined; missingText: string; - onCommand: (command: JobCommandDto) => void; + onCommand: (command: QueueCommandDto) => void; } let { title, subtitle, description, - jobCounts, + statistics, queueStatus, icon, disabled = false, @@ -46,7 +46,7 @@ onCommand, }: Props = $props(); - let waitingCount = $derived(jobCounts.waiting + jobCounts.paused + jobCounts.delayed); + let waitingCount = $derived(statistics.waiting + statistics.paused + statistics.delayed); let isIdle = $derived(!queueStatus.isActive && !queueStatus.isPaused); let multipleButtons = $derived(allText || refreshText); @@ -67,11 +67,11 @@ {title}
- {#if jobCounts.failed > 0} + {#if statistics.failed > 0}
- {$t('admin.jobs_failed', { values: { jobCount: jobCounts.failed.toLocaleString($locale) } })} + {$t('admin.jobs_failed', { values: { jobCount: statistics.failed.toLocaleString($locale) } })} onCommand({ command: JobCommand.ClearFailed, force: false })} + onclick={() => onCommand({ command: QueueCommand.ClearFailed, force: false })} />
{/if} - {#if jobCounts.delayed > 0} + {#if statistics.delayed > 0} - {$t('admin.jobs_delayed', { values: { jobCount: jobCounts.delayed.toLocaleString($locale) } })} + {$t('admin.jobs_delayed', { values: { jobCount: statistics.delayed.toLocaleString($locale) } })} {/if} @@ -111,7 +111,7 @@ >

{$t('active')}

- {jobCounts.active.toLocaleString($locale)} + {statistics.active.toLocaleString($locale)}

@@ -131,7 +131,7 @@ onCommand({ command: JobCommand.Start, force: false })} + onClick={() => onCommand({ command: QueueCommand.Start, force: false })} > {$t('disabled')} @@ -140,20 +140,20 @@ {#if !disabled && !isIdle} {#if waitingCount > 0} - onCommand({ command: JobCommand.Empty, force: false })}> + onCommand({ command: QueueCommand.Empty, force: false })}> {$t('clear')} {/if} {#if queueStatus.isPaused} {@const size = waitingCount > 0 ? '24' : '48'} - onCommand({ command: JobCommand.Resume, force: false })}> + onCommand({ command: QueueCommand.Resume, force: false })}> {$t('resume')} {:else} - onCommand({ command: JobCommand.Pause, force: false })}> + onCommand({ command: QueueCommand.Pause, force: false })}> {$t('pause')} @@ -162,25 +162,25 @@ {#if !disabled && multipleButtons && isIdle} {#if allText} - onCommand({ command: JobCommand.Start, force: true })}> + onCommand({ command: QueueCommand.Start, force: true })}> {allText} {/if} {#if refreshText} - onCommand({ command: JobCommand.Start, force: undefined })}> + onCommand({ command: QueueCommand.Start, force: undefined })}> {refreshText} {/if} - onCommand({ command: JobCommand.Start, force: false })}> + onCommand({ command: QueueCommand.Start, force: false })}> {missingText} {/if} {#if !disabled && !multipleButtons && isIdle} - onCommand({ command: JobCommand.Start, force: false })}> + onCommand({ command: QueueCommand.Start, force: false })}> {missingText} diff --git a/web/src/lib/components/jobs/JobsPanel.svelte b/web/src/lib/components/jobs/JobsPanel.svelte index e204c7664..99dfef3b5 100644 --- a/web/src/lib/components/jobs/JobsPanel.svelte +++ b/web/src/lib/components/jobs/JobsPanel.svelte @@ -1,8 +1,14 @@
{#each jobList as [jobName, { title, subtitle, description, disabled, allText, refreshText, missingText, icon, handleCommand: handleCommandOverride }] (jobName)} - {@const { jobCounts, queueStatus } = jobs[jobName]} + {@const { jobCounts: statistics, queueStatus } = jobs[jobName]} (handleCommandOverride || handleCommand)(jobName, command)} /> diff --git a/web/src/lib/utils.ts b/web/src/lib/utils.ts index 6e0a21647..87f0d7e7b 100644 --- a/web/src/lib/utils.ts +++ b/web/src/lib/utils.ts @@ -5,8 +5,8 @@ import { handleError } from '$lib/utils/handle-error'; import { AssetJobName, AssetMediaSize, - JobName, MemoryType, + QueueName, finishOAuth, getAssetOriginalPath, getAssetPlaybackPath, @@ -143,28 +143,28 @@ export const downloadRequest = (options: DownloadRequestOptions }); }; -export const getJobName = derived(t, ($t) => { - return (jobName: JobName) => { - const names: Record = { - [JobName.ThumbnailGeneration]: $t('admin.thumbnail_generation_job'), - [JobName.MetadataExtraction]: $t('admin.metadata_extraction_job'), - [JobName.Sidecar]: $t('admin.sidecar_job'), - [JobName.SmartSearch]: $t('admin.machine_learning_smart_search'), - [JobName.DuplicateDetection]: $t('admin.machine_learning_duplicate_detection'), - [JobName.FaceDetection]: $t('admin.face_detection'), - [JobName.FacialRecognition]: $t('admin.machine_learning_facial_recognition'), - [JobName.VideoConversion]: $t('admin.video_conversion_job'), - [JobName.StorageTemplateMigration]: $t('admin.storage_template_migration'), - [JobName.Migration]: $t('admin.migration_job'), - [JobName.BackgroundTask]: $t('admin.background_task_job'), - [JobName.Search]: $t('search'), - [JobName.Library]: $t('external_libraries'), - [JobName.Notifications]: $t('notifications'), - [JobName.BackupDatabase]: $t('admin.backup_database'), - [JobName.Ocr]: $t('admin.machine_learning_ocr'), +export const getQueueName = derived(t, ($t) => { + return (name: QueueName) => { + const names: Record = { + [QueueName.ThumbnailGeneration]: $t('admin.thumbnail_generation_job'), + [QueueName.MetadataExtraction]: $t('admin.metadata_extraction_job'), + [QueueName.Sidecar]: $t('admin.sidecar_job'), + [QueueName.SmartSearch]: $t('admin.machine_learning_smart_search'), + [QueueName.DuplicateDetection]: $t('admin.machine_learning_duplicate_detection'), + [QueueName.FaceDetection]: $t('admin.face_detection'), + [QueueName.FacialRecognition]: $t('admin.machine_learning_facial_recognition'), + [QueueName.VideoConversion]: $t('admin.video_conversion_job'), + [QueueName.StorageTemplateMigration]: $t('admin.storage_template_migration'), + [QueueName.Migration]: $t('admin.migration_job'), + [QueueName.BackgroundTask]: $t('admin.background_task_job'), + [QueueName.Search]: $t('search'), + [QueueName.Library]: $t('external_libraries'), + [QueueName.Notifications]: $t('notifications'), + [QueueName.BackupDatabase]: $t('admin.backup_database'), + [QueueName.Ocr]: $t('admin.machine_learning_ocr'), }; - return names[jobName]; + return names[name]; }; }); diff --git a/web/src/routes/admin/jobs-status/+page.svelte b/web/src/routes/admin/jobs-status/+page.svelte index 1204ff901..84586a8af 100644 --- a/web/src/routes/admin/jobs-status/+page.svelte +++ b/web/src/routes/admin/jobs-status/+page.svelte @@ -5,13 +5,7 @@ import JobCreateModal from '$lib/modals/JobCreateModal.svelte'; import { asyncTimeout } from '$lib/utils'; import { handleError } from '$lib/utils/handle-error'; - import { - getAllJobsStatus, - JobCommand, - sendJobCommand, - type AllJobStatusResponseDto, - type JobName, - } from '@immich/sdk'; + import { getQueuesLegacy, QueueCommand, QueueName, runQueueCommandLegacy, type QueuesResponseDto } from '@immich/sdk'; import { Button, HStack, modalManager, Text } from '@immich/ui'; import { mdiCog, mdiPlay, mdiPlus } from '@mdi/js'; import { onDestroy, onMount } from 'svelte'; @@ -24,23 +18,23 @@ let { data }: Props = $props(); - let jobs: AllJobStatusResponseDto | undefined = $state(); + let jobs: QueuesResponseDto | undefined = $state(); let running = true; const pausedJobs = $derived( Object.entries(jobs ?? {}) - .filter(([_, jobStatus]) => jobStatus.queueStatus?.isPaused) - .map(([jobName]) => jobName as JobName), + .filter(([_, queue]) => queue.queueStatus?.isPaused) + .map(([name]) => name as QueueName), ); const handleResumePausedJobs = async () => { try { - for (const jobName of pausedJobs) { - await sendJobCommand({ id: jobName, jobCommandDto: { command: JobCommand.Resume, force: false } }); + for (const name of pausedJobs) { + await runQueueCommandLegacy({ name, queueCommandDto: { command: QueueCommand.Resume, force: false } }); } // Refresh jobs status immediately after resuming - jobs = await getAllJobsStatus(); + jobs = await getQueuesLegacy(); } catch (error) { handleError(error, $t('admin.failed_job_command', { values: { command: 'resume', job: 'paused jobs' } })); } @@ -48,7 +42,7 @@ onMount(async () => { while (running) { - jobs = await getAllJobsStatus(); + jobs = await getQueuesLegacy(); await asyncTimeout(5000); } }); diff --git a/web/src/routes/admin/jobs-status/+page.ts b/web/src/routes/admin/jobs-status/+page.ts index 0d4ec8b41..90057ff96 100644 --- a/web/src/routes/admin/jobs-status/+page.ts +++ b/web/src/routes/admin/jobs-status/+page.ts @@ -1,12 +1,12 @@ import { authenticate } from '$lib/utils/auth'; import { getFormatter } from '$lib/utils/i18n'; -import { getAllJobsStatus } from '@immich/sdk'; +import { getQueuesLegacy } from '@immich/sdk'; import type { PageLoad } from './$types'; export const load = (async ({ url }) => { await authenticate(url, { admin: true }); - const jobs = await getAllJobsStatus(); + const jobs = await getQueuesLegacy(); const $t = await getFormatter(); return { diff --git a/web/src/routes/admin/library-management/+page.svelte b/web/src/routes/admin/library-management/+page.svelte index 039afc97b..600b6ff04 100644 --- a/web/src/routes/admin/library-management/+page.svelte +++ b/web/src/routes/admin/library-management/+page.svelte @@ -17,10 +17,10 @@ getAllLibraries, getLibraryStatistics, getUserAdmin, - JobCommand, - JobName, + QueueCommand, + QueueName, + runQueueCommandLegacy, scanLibrary, - sendJobCommand, updateLibrary, type LibraryResponseDto, type LibraryStatsResponseDto, @@ -151,7 +151,7 @@ const handleScanAll = async () => { try { - await sendJobCommand({ id: JobName.Library, jobCommandDto: { command: JobCommand.Start } }); + await runQueueCommandLegacy({ name: QueueName.Library, queueCommandDto: { command: QueueCommand.Start } }); toastManager.info($t('admin.refreshing_all_libraries')); } catch (error) {