From df581cc0d53aae169e0ace0dededeff16f7d9b4c Mon Sep 17 00:00:00 2001 From: Daniel Dietzler <36593685+danieldietzler@users.noreply.github.com> Date: Fri, 11 Jul 2025 20:19:53 +0200 Subject: [PATCH] feat: UserMetadata sync (#19882) * feat: UserMetadata sync * refactor: sync table filters (#19887) --- mobile/openapi/README.md | Bin 38412 -> 38528 bytes mobile/openapi/lib/api.dart | Bin 13767 -> 13856 bytes mobile/openapi/lib/api_client.dart | Bin 34539 -> 34727 bytes .../openapi/lib/model/sync_entity_type.dart | Bin 8424 -> 8772 bytes .../openapi/lib/model/sync_request_type.dart | Bin 4852 -> 5013 bytes .../model/sync_user_metadata_delete_v1.dart | Bin 0 -> 3171 bytes .../lib/model/sync_user_metadata_v1.dart | Bin 0 -> 3268 bytes open-api/immich-openapi-specs.json | 39 +++++- open-api/typescript-sdk/src/fetch-client.ts | 5 +- server/src/dtos/sync.dto.ts | 17 +++ server/src/enum.ts | 4 + server/src/queries/sync.repository.sql | 27 ++++ server/src/repositories/sync.repository.ts | 115 +++++++++------- server/src/schema/functions.ts | 13 ++ server/src/schema/index.ts | 4 + .../1752250924342-UserMetadataSync.ts | 56 ++++++++ .../tables/user-metadata-audit.table.ts | 17 +++ .../src/schema/tables/user-metadata.table.ts | 26 +++- server/src/services/sync.service.ts | 18 +++ .../specs/sync/sync-user-metadata.spec.ts | 123 ++++++++++++++++++ 20 files changed, 414 insertions(+), 50 deletions(-) create mode 100644 mobile/openapi/lib/model/sync_user_metadata_delete_v1.dart create mode 100644 mobile/openapi/lib/model/sync_user_metadata_v1.dart create mode 100644 server/src/schema/migrations/1752250924342-UserMetadataSync.ts create mode 100644 server/src/schema/tables/user-metadata-audit.table.ts create mode 100644 server/test/medium/specs/sync/sync-user-metadata.spec.ts diff --git a/mobile/openapi/README.md b/mobile/openapi/README.md index 5b90e4a633cd25377129363ce4b669266c710557..5085052c05f1b9d4f750eb8f13fd54399102debc 100644 GIT binary patch delta 98 zcmeBK!_=^rX~XRn4&T(0#FWI6#K{Yrh227nQ;T2%E~z=GC8=SCdbuf@Tnf4h(ZQ8@ Y$;b-A3}ZD?@{{%T(IqyYXmJq)072g*^Z)<= delta 14 VcmZoz%ha=mX~XT7&Hk;1f&elP20{P; diff --git a/mobile/openapi/lib/api.dart b/mobile/openapi/lib/api.dart index 4de0614cf65bacd9e19b95715687603b777bc7d2..f30481ecce92042244a80ce960a6a214b325a850 100644 GIT binary patch delta 38 lcmX?}y&z{pg*r!WYDr>BVoBoU0Cf!nXQPVD=3?~_J^(Hx4ut>! delta 20 ccmZ3Gb3A)Ph5F=8DiV_gw0SmvRoCVN0AF$k@c;k- diff --git a/mobile/openapi/lib/api_client.dart b/mobile/openapi/lib/api_client.dart index 26113a11153defd200e1d30ea2e9d5623ebb3b44..f1cf05f1109d32dcd3984fef6b9b3c3db070f29f 100644 GIT binary patch delta 78 zcmaFe%e1_oX+vKUhi__0VoG93;^e>43Ou33sYP&sf@mF7frSZDr~((`)HZh|C8_`b D+Zi7( delta 14 VcmZ49&-A*NX+vMqX0PN}6#z1n1~dQw diff --git a/mobile/openapi/lib/model/sync_entity_type.dart b/mobile/openapi/lib/model/sync_entity_type.dart index 1d09a6dbe02f15ec636eeaf2e3b3fd64e5e21b25..61f94401c7c95ec29eac41258914fec802090361 100644 GIT binary patch delta 183 zcmaFic*JGHOKzUh;?yGF)RM%M#FE6x8<`|{LXiaonMF}mQUsU?Xii6x1X4>C#egdz!Sc4eEv$fuxytZ4Ifc0R7j VZw1&l2*qF1kb^r%+3HZopn6E9UJFQCjzE7MqJawBsW*3W8FXyZQR!k;kup zjgz%7w%5aS4)hXqA#+iv2tSv-UXlrG;o~k(K3QpQq!X!dVvHMf^7AXW?*R-Dh!4`0 zCbu9Ums^;3*)KqGxrvO>vCAr;r~ow_;MU4U|H@r>Vj1B!*D1Z(pJTZg)ESkp0L|Nv z-i%k?=x@>(gbR)Dw9aV5MmTL?I1HwDUu1$XxV1FrFdTu0H4#}rRfP4M@8A7DP(J|Y z#S1Q&gHipMfa-#ybzd31>ZnoW4p!2^TG;iUI+a)zhi&9Xs>@{}trccRszD*FrB#;A z0?ZiIzUT0ZU-Y>S>kp8rV07C52^fri7h`iwGJK9-sCXu1_}u?4`W*R26uMJO$s|7L>E_O=P9Curzv8#hmjJ>VAm^eQJ<7g0To8FEf-L>1?Ykw?m82 z!pJ3dC^xq$6BMV7NwzWex>HGvV_~gRBG@l8CsT?~UaG@bQ z>*Dws_7}A!?!?=Y_Hi$DD1ajdIb4(o>mG0!_)hgbPIIHP{D3o(A<}&T8&P~Fgs02d zu8>vSQ9zmr!As)&%ufmO@ti;ER!QaHS#5OB$t$rCU5!{O(BCWCT*qh}L58odPzguq z`&#dL@T`p9in|Ou8kQonGThf#dL)gsW#+gWx{}Nn2T~|BaYAL`jA4C8%HC=>*%WS9 z$3~uT)&*4xy~UBtrib@6lT0C|@7iKSI_KTtMk>$<_k%*thOF~;XqxM^tw^M)*HO#U zLryJ8agGS6yqu=(1vPXRQ2*m7+{zU@?U~cHgAqyUBu9109wxM68u)ApI{_KhQ;HC% z>A*UE$OhXDJzHJvg5#;WYg|BcEr{`C#tR5tH2kPPVis4;uI4tPCnasYD?-=r?CBhH zz=-Wzj6K3N0yp=L*uXTd+23pSKygj=5#i|CG#qXpA8C3rTH2q|TjkccM14jVmoAcD zdUN!y1~i^L=teg=ii%P@p%_wGcW<+9$mFDULU>w#WAe7D9!dE#+IpgFo+80&61Qf@ zqYA_Q9h2Y1P(&0qP_~2?Htl_A^srIii7A08TcR1jY%LpoH1|oAqQ!fIK>ACR&Yc82 wZ!M;!b`kLIKN;OKEPNU9=T5&mP54H7*~5YKAHw7Ub^rhX literal 0 HcmV?d00001 diff --git a/mobile/openapi/lib/model/sync_user_metadata_v1.dart b/mobile/openapi/lib/model/sync_user_metadata_v1.dart new file mode 100644 index 0000000000000000000000000000000000000000..0b060dc17c5467b7462f5e9a712f70bcecabe938 GIT binary patch literal 3268 zcmbVOZExE)5dQ98aVdgY!BlzOry`lX7E3auYhs{v3k-%KFcNLElSP%JY8a{i`|ggC zELZB1?L!=iyf;42b0>~QgV6}ye4I^R{Bd=6_5RK5>JqMR-d%-oIfc9F3~r~FH`jli zpcz@dPK9yfU!s?<2K1^{xi&IhY7;M2CeNYFODoe@W^yBQ7uN4;Q)uIQJw&z@I~y0J zi;etJDueF1*x+v=4E}f0SPX7#zk8x{>qPEim107*RMN1!?M;@cT-rF(`3lY4TIBJo zpW|dLj2#RxoddlDUC3M%D#E{u!63lN|Rd< zkjpL1yOs-3Tw)?4bm{H~6bYa{Jh-*8(cf|x_ADc~<|ggi`6{wf(#HM_tHwOesC)$& zr~T^vd1ZS4StbtHj~8C+uhJNV5xwuV&Ikx2oHj5X2bHyUYiZ74JOK}PBC>$$4WZik z_RT+m>ILojb3POs(-|=|l?X@czDU}to{o08VA|}jT;&c{(!pBT_3t{BSSzPY{? zWg@K=W=9%B*{r2imdygJCaQhU;3vQ6>*%g?35Vqlj1>t^`-gy$8+Iiy$E@R1{K1A{ zhR3J=f9Ov+KIKe{A!oQ1#jieK2bS9JL5YA(uRWkf13oesFf5T11<@C@6Qu)^xS6?G8SUk3p=qLc$g ziLkCiZr}$%3;1rffOC-vGJg&mQGBBLF6*PM;+VH$(QX4T<)Vxy!M`u(|Aj6GoRLT)Y{&WvK>Q+W?#eIg; z0+u4PGThg=d(@4zW#+h*y5Z-G19cRXn4?l~seBOY

}n<>dQ;NY=OIM>nV!NZhl<#;#m^(mB3N_p zh)qq?oc+Bv4}{jl9?^@gO*7%a@s;K$r-kgEo6L62jBE7Sp!X^TTBSM?+7P$5W(M&EnQXcvMMvvSX^dd_+W5 z17Ax}VbipS_6-}A_4Ga_Yl&k7v$bq=(A-*4au%--0?95>BKH#T%W5$#HH3h-AEtM7 p{jD^>pWEij`zQG}*ObWv1HyL_|IHazUy|yLWj*A(?!^FS&Oe{%7PSBX literal 0 HcmV?d00001 diff --git a/open-api/immich-openapi-specs.json b/open-api/immich-openapi-specs.json index 2e6c70ada..18204c21f 100644 --- a/open-api/immich-openapi-specs.json +++ b/open-api/immich-openapi-specs.json @@ -13902,6 +13902,8 @@ "StackDeleteV1", "PersonV1", "PersonDeleteV1", + "UserMetadataV1", + "UserMetadataDeleteV1", "SyncAckV1", "SyncResetV1" ], @@ -14137,7 +14139,8 @@ "PartnerStacksV1", "StacksV1", "UsersV1", - "PeopleV1" + "PeopleV1", + "UserMetadataV1" ], "type": "string" }, @@ -14213,6 +14216,40 @@ ], "type": "object" }, + "SyncUserMetadataDeleteV1": { + "properties": { + "key": { + "type": "string" + }, + "userId": { + "type": "string" + } + }, + "required": [ + "key", + "userId" + ], + "type": "object" + }, + "SyncUserMetadataV1": { + "properties": { + "key": { + "type": "string" + }, + "userId": { + "type": "string" + }, + "value": { + "type": "object" + } + }, + "required": [ + "key", + "userId", + "value" + ], + "type": "object" + }, "SyncUserV1": { "properties": { "deletedAt": { diff --git a/open-api/typescript-sdk/src/fetch-client.ts b/open-api/typescript-sdk/src/fetch-client.ts index f7fc9fe61..88dee9bf0 100644 --- a/open-api/typescript-sdk/src/fetch-client.ts +++ b/open-api/typescript-sdk/src/fetch-client.ts @@ -4116,6 +4116,8 @@ export enum SyncEntityType { StackDeleteV1 = "StackDeleteV1", PersonV1 = "PersonV1", PersonDeleteV1 = "PersonDeleteV1", + UserMetadataV1 = "UserMetadataV1", + UserMetadataDeleteV1 = "UserMetadataDeleteV1", SyncAckV1 = "SyncAckV1", SyncResetV1 = "SyncResetV1" } @@ -4135,7 +4137,8 @@ export enum SyncRequestType { PartnerStacksV1 = "PartnerStacksV1", StacksV1 = "StacksV1", UsersV1 = "UsersV1", - PeopleV1 = "PeopleV1" + PeopleV1 = "PeopleV1", + UserMetadataV1 = "UserMetadataV1" } export enum TranscodeHWAccel { Nvenc = "nvenc", diff --git a/server/src/dtos/sync.dto.ts b/server/src/dtos/sync.dto.ts index ff5df03ea..abd7d7b69 100644 --- a/server/src/dtos/sync.dto.ts +++ b/server/src/dtos/sync.dto.ts @@ -10,7 +10,9 @@ import { MemoryType, SyncEntityType, SyncRequestType, + UserMetadataKey, } from 'src/enum'; +import { UserMetadata } from 'src/types'; import { Optional, ValidateBoolean, ValidateDate, ValidateUUID } from 'src/validation'; export class AssetFullSyncDto { @@ -253,6 +255,19 @@ export class SyncPersonDeleteV1 { personId!: string; } +@ExtraModel() +export class SyncUserMetadataV1 { + userId!: string; + key!: string; + value!: UserMetadata[UserMetadataKey]; +} + +@ExtraModel() +export class SyncUserMetadataDeleteV1 { + userId!: string; + key!: string; +} + @ExtraModel() export class SyncAckV1 {} @@ -295,6 +310,8 @@ export type SyncItem = { [SyncEntityType.PartnerStackV1]: SyncStackV1; [SyncEntityType.PersonV1]: SyncPersonV1; [SyncEntityType.PersonDeleteV1]: SyncPersonDeleteV1; + [SyncEntityType.UserMetadataV1]: SyncUserMetadataV1; + [SyncEntityType.UserMetadataDeleteV1]: SyncUserMetadataDeleteV1; [SyncEntityType.SyncAckV1]: SyncAckV1; [SyncEntityType.SyncResetV1]: SyncResetV1; }; diff --git a/server/src/enum.ts b/server/src/enum.ts index 6d960e1fb..dca0f0955 100644 --- a/server/src/enum.ts +++ b/server/src/enum.ts @@ -589,6 +589,7 @@ export enum SyncRequestType { StacksV1 = 'StacksV1', UsersV1 = 'UsersV1', PeopleV1 = 'PeopleV1', + UserMetadataV1 = 'UserMetadataV1', } export enum SyncEntityType { @@ -639,6 +640,9 @@ export enum SyncEntityType { PersonV1 = 'PersonV1', PersonDeleteV1 = 'PersonDeleteV1', + UserMetadataV1 = 'UserMetadataV1', + UserMetadataDeleteV1 = 'UserMetadataDeleteV1', + SyncAckV1 = 'SyncAckV1', SyncResetV1 = 'SyncResetV1', } diff --git a/server/src/queries/sync.repository.sql b/server/src/queries/sync.repository.sql index f68be8c83..ac3632ad4 100644 --- a/server/src/queries/sync.repository.sql +++ b/server/src/queries/sync.repository.sql @@ -835,3 +835,30 @@ where "updatedAt" < now() - interval '1 millisecond' order by "updateId" asc + +-- SyncRepository.userMetadata.getDeletes +select + "id", + "userId", + "key" +from + "user_metadata_audit" +where + "userId" = $1 + and "deletedAt" < now() - interval '1 millisecond' +order by + "id" asc + +-- SyncRepository.userMetadata.getUpserts +select + "userId", + "key", + "value", + "updateId" +from + "user_metadata" +where + "userId" = $1 + and "updatedAt" < now() - interval '1 millisecond' +order by + "updateId" asc diff --git a/server/src/repositories/sync.repository.ts b/server/src/repositories/sync.repository.ts index 3bc09c97e..f9d65f724 100644 --- a/server/src/repositories/sync.repository.ts +++ b/server/src/repositories/sync.repository.ts @@ -16,7 +16,8 @@ type AuditTables = | 'memories_audit' | 'memory_assets_audit' | 'stacks_audit' - | 'person_audit'; + | 'person_audit' + | 'user_metadata_audit'; type UpsertTables = | 'users' | 'partners' @@ -27,7 +28,8 @@ type UpsertTables = | 'memories' | 'memories_assets_assets' | 'asset_stack' - | 'person'; + | 'person' + | 'user_metadata'; @Injectable() export class SyncRepository { @@ -47,6 +49,7 @@ export class SyncRepository { people: PersonSync; stack: StackSync; user: UserSync; + userMetadata: UserMetadataSync; constructor(@InjectKysely() private db: Kysely) { this.album = new AlbumSync(this.db); @@ -65,32 +68,31 @@ export class SyncRepository { this.people = new PersonSync(this.db); this.stack = new StackSync(this.db); this.user = new UserSync(this.db); + this.userMetadata = new UserMetadataSync(this.db); } } class BaseSync { constructor(protected db: Kysely) {} - protected auditTableFilters, D>( - qb: SelectQueryBuilder, - ack?: SyncAck, - ) { - const builder = qb as SelectQueryBuilder; - return builder - .where('deletedAt', '<', sql.raw("now() - interval '1 millisecond'")) - .$if(!!ack, (qb) => qb.where('id', '>', ack!.updateId)) - .orderBy('id', 'asc') as SelectQueryBuilder; + protected auditTableFilters(ack?: SyncAck) { + return , D>(qb: SelectQueryBuilder) => { + const builder = qb as SelectQueryBuilder; + return builder + .where('deletedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .$if(!!ack, (qb) => qb.where('id', '>', ack!.updateId)) + .orderBy('id', 'asc') as SelectQueryBuilder; + }; } - protected upsertTableFilters, D>( - qb: SelectQueryBuilder, - ack?: SyncAck, - ) { - const builder = qb as SelectQueryBuilder; - return builder - .where('updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) - .$if(!!ack, (qb) => qb.where('updateId', '>', ack!.updateId)) - .orderBy('updateId', 'asc') as SelectQueryBuilder; + protected upsertTableFilters(ack?: SyncAck) { + return , D>(qb: SelectQueryBuilder) => { + const builder = qb as SelectQueryBuilder; + return builder + .where('updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .$if(!!ack, (qb) => qb.where('updateId', '>', ack!.updateId)) + .orderBy('updateId', 'asc') as SelectQueryBuilder; + }; } } @@ -113,7 +115,7 @@ class AlbumSync extends BaseSync { .selectFrom('albums_audit') .select(['id', 'albumId']) .where('userId', '=', userId) - .$call((qb) => this.auditTableFilters(qb, ack)) + .$call(this.auditTableFilters(ack)) .stream(); } @@ -246,7 +248,7 @@ class AlbumToAssetSync extends BaseSync { ), ), ) - .$call((qb) => this.auditTableFilters(qb, ack)) + .$call(this.auditTableFilters(ack)) .stream(); } @@ -303,7 +305,7 @@ class AlbumUserSync extends BaseSync { ), ), ) - .$call((qb) => this.auditTableFilters(qb, ack)) + .$call(this.auditTableFilters(ack)) .stream(); } @@ -345,7 +347,7 @@ class AssetSync extends BaseSync { .selectFrom('assets_audit') .select(['id', 'assetId']) .where('ownerId', '=', userId) - .$call((qb) => this.auditTableFilters(qb, ack)) + .$call(this.auditTableFilters(ack)) .stream(); } @@ -356,7 +358,7 @@ class AssetSync extends BaseSync { .select(columns.syncAsset) .select('assets.updateId') .where('ownerId', '=', userId) - .$call((qb) => this.upsertTableFilters(qb, ack)) + .$call(this.upsertTableFilters(ack)) .stream(); } } @@ -368,7 +370,7 @@ class PersonSync extends BaseSync { .selectFrom('person_audit') .select(['id', 'personId']) .where('ownerId', '=', userId) - .$call((qb) => this.auditTableFilters(qb, ack)) + .$call(this.auditTableFilters(ack)) .stream(); } @@ -391,7 +393,7 @@ class PersonSync extends BaseSync { 'faceAssetId', ]) .where('ownerId', '=', userId) - .$call((qb) => this.upsertTableFilters(qb, ack)) + .$call(this.upsertTableFilters(ack)) .stream(); } } @@ -404,7 +406,7 @@ class AssetExifSync extends BaseSync { .select(columns.syncAssetExif) .select('exif.updateId') .where('assetId', 'in', (eb) => eb.selectFrom('assets').select('id').where('ownerId', '=', userId)) - .$call((qb) => this.upsertTableFilters(qb, ack)) + .$call(this.upsertTableFilters(ack)) .stream(); } } @@ -416,7 +418,7 @@ class MemorySync extends BaseSync { .selectFrom('memories_audit') .select(['id', 'memoryId']) .where('userId', '=', userId) - .$call((qb) => this.auditTableFilters(qb, ack)) + .$call(this.auditTableFilters(ack)) .stream(); } @@ -440,7 +442,7 @@ class MemorySync extends BaseSync { ]) .select('updateId') .where('ownerId', '=', userId) - .$call((qb) => this.upsertTableFilters(qb, ack)) + .$call(this.upsertTableFilters(ack)) .stream(); } } @@ -452,7 +454,7 @@ class MemoryToAssetSync extends BaseSync { .selectFrom('memory_assets_audit') .select(['id', 'memoryId', 'assetId']) .where('memoryId', 'in', (eb) => eb.selectFrom('memories').select('id').where('ownerId', '=', userId)) - .$call((qb) => this.auditTableFilters(qb, ack)) + .$call(this.auditTableFilters(ack)) .stream(); } @@ -463,7 +465,7 @@ class MemoryToAssetSync extends BaseSync { .select(['memoriesId as memoryId', 'assetsId as assetId']) .select('updateId') .where('memoriesId', 'in', (eb) => eb.selectFrom('memories').select('id').where('ownerId', '=', userId)) - .$call((qb) => this.upsertTableFilters(qb, ack)) + .$call(this.upsertTableFilters(ack)) .stream(); } } @@ -487,7 +489,7 @@ class PartnerSync extends BaseSync { .selectFrom('partners_audit') .select(['id', 'sharedById', 'sharedWithId']) .where((eb) => eb.or([eb('sharedById', '=', userId), eb('sharedWithId', '=', userId)])) - .$call((qb) => this.auditTableFilters(qb, ack)) + .$call(this.auditTableFilters(ack)) .stream(); } @@ -497,7 +499,7 @@ class PartnerSync extends BaseSync { .selectFrom('partners') .select(['sharedById', 'sharedWithId', 'inTimeline', 'updateId']) .where((eb) => eb.or([eb('sharedById', '=', userId), eb('sharedWithId', '=', userId)])) - .$call((qb) => this.upsertTableFilters(qb, ack)) + .$call(this.upsertTableFilters(ack)) .stream(); } } @@ -525,7 +527,7 @@ class PartnerAssetsSync extends BaseSync { .where('ownerId', 'in', (eb) => eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId), ) - .$call((qb) => this.auditTableFilters(qb, ack)) + .$call(this.auditTableFilters(ack)) .stream(); } @@ -538,7 +540,7 @@ class PartnerAssetsSync extends BaseSync { .where('ownerId', 'in', (eb) => eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId), ) - .$call((qb) => this.upsertTableFilters(qb, ack)) + .$call(this.upsertTableFilters(ack)) .stream(); } } @@ -573,7 +575,7 @@ class PartnerAssetExifsSync extends BaseSync { eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId), ), ) - .$call((qb) => this.upsertTableFilters(qb, ack)) + .$call(this.upsertTableFilters(ack)) .stream(); } } @@ -585,7 +587,7 @@ class StackSync extends BaseSync { .selectFrom('stacks_audit') .select(['id', 'stackId']) .where('userId', '=', userId) - .$call((qb) => this.auditTableFilters(qb, ack)) + .$call(this.auditTableFilters(ack)) .stream(); } @@ -596,7 +598,7 @@ class StackSync extends BaseSync { .select(columns.syncStack) .select('updateId') .where('ownerId', '=', userId) - .$call((qb) => this.upsertTableFilters(qb, ack)) + .$call(this.upsertTableFilters(ack)) .stream(); } } @@ -610,7 +612,7 @@ class PartnerStackSync extends BaseSync { .where('userId', 'in', (eb) => eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId), ) - .$call((qb) => this.auditTableFilters(qb, ack)) + .$call(this.auditTableFilters(ack)) .stream(); } @@ -637,18 +639,15 @@ class PartnerStackSync extends BaseSync { .where('ownerId', 'in', (eb) => eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId), ) - .$call((qb) => this.upsertTableFilters(qb, ack)) + .$call(this.upsertTableFilters(ack)) .stream(); } } + class UserSync extends BaseSync { @GenerateSql({ params: [], stream: true }) getDeletes(ack?: SyncAck) { - return this.db - .selectFrom('users_audit') - .select(['id', 'userId']) - .$call((qb) => this.auditTableFilters(qb, ack)) - .stream(); + return this.db.selectFrom('users_audit').select(['id', 'userId']).$call(this.auditTableFilters(ack)).stream(); } @GenerateSql({ params: [], stream: true }) @@ -656,7 +655,29 @@ class UserSync extends BaseSync { return this.db .selectFrom('users') .select(['id', 'name', 'email', 'deletedAt', 'updateId']) - .$call((qb) => this.upsertTableFilters(qb, ack)) + .$call(this.upsertTableFilters(ack)) + .stream(); + } +} + +class UserMetadataSync extends BaseSync { + @GenerateSql({ params: [DummyValue.UUID], stream: true }) + getDeletes(userId: string, ack?: SyncAck) { + return this.db + .selectFrom('user_metadata_audit') + .select(['id', 'userId', 'key']) + .where('userId', '=', userId) + .$call(this.auditTableFilters(ack)) + .stream(); + } + + @GenerateSql({ params: [DummyValue.UUID], stream: true }) + getUpserts(userId: string, ack?: SyncAck) { + return this.db + .selectFrom('user_metadata') + .select(['userId', 'key', 'value', 'updateId']) + .where('userId', '=', userId) + .$call(this.upsertTableFilters(ack)) .stream(); } } diff --git a/server/src/schema/functions.ts b/server/src/schema/functions.ts index 424ceb0d3..335201a6c 100644 --- a/server/src/schema/functions.ts +++ b/server/src/schema/functions.ts @@ -216,3 +216,16 @@ export const person_delete_audit = registerFunction({ RETURN NULL; END`, }); + +export const user_metadata_audit = registerFunction({ + name: 'user_metadata_audit', + returnType: 'TRIGGER', + language: 'PLPGSQL', + body: ` + BEGIN + INSERT INTO user_metadata_audit ("userId", "key") + SELECT "userId", "key" + FROM OLD; + RETURN NULL; + END`, +}); diff --git a/server/src/schema/index.ts b/server/src/schema/index.ts index f564e8c7f..384e47df7 100644 --- a/server/src/schema/index.ts +++ b/server/src/schema/index.ts @@ -57,6 +57,7 @@ import { TagAssetTable } from 'src/schema/tables/tag-asset.table'; import { TagClosureTable } from 'src/schema/tables/tag-closure.table'; import { TagTable } from 'src/schema/tables/tag.table'; import { UserAuditTable } from 'src/schema/tables/user-audit.table'; +import { UserMetadataAuditTable } from 'src/schema/tables/user-metadata-audit.table'; import { UserMetadataTable } from 'src/schema/tables/user-metadata.table'; import { UserTable } from 'src/schema/tables/user.table'; import { VersionHistoryTable } from 'src/schema/tables/version-history.table'; @@ -108,6 +109,7 @@ export class ImmichDatabase { TagClosureTable, UserAuditTable, UserMetadataTable, + UserMetadataAuditTable, UserTable, VersionHistoryTable, ]; @@ -128,6 +130,7 @@ export class ImmichDatabase { memory_assets_delete_audit, stacks_delete_audit, person_delete_audit, + users_delete_audit, ]; enum = [assets_status_enum, asset_face_source_type, asset_visibility_enum]; @@ -182,6 +185,7 @@ export interface DB { tags: TagTable; tags_closure: TagClosureTable; user_metadata: UserMetadataTable; + user_metadata_audit: UserMetadataAuditTable; users: UserTable; users_audit: UserAuditTable; version_history: VersionHistoryTable; diff --git a/server/src/schema/migrations/1752250924342-UserMetadataSync.ts b/server/src/schema/migrations/1752250924342-UserMetadataSync.ts new file mode 100644 index 000000000..20778d801 --- /dev/null +++ b/server/src/schema/migrations/1752250924342-UserMetadataSync.ts @@ -0,0 +1,56 @@ +import { Kysely, sql } from 'kysely'; + +export async function up(db: Kysely): Promise { + await sql`CREATE OR REPLACE FUNCTION user_metadata_audit() + RETURNS TRIGGER + LANGUAGE PLPGSQL + AS $$ + BEGIN + INSERT INTO user_metadata_audit ("userId", "key") + SELECT "userId", "key" + FROM OLD; + RETURN NULL; + END + $$;`.execute(db); + await sql`CREATE TABLE "user_metadata_audit" ( + "id" uuid NOT NULL DEFAULT immich_uuid_v7(), + "userId" uuid NOT NULL, + "key" character varying NOT NULL, + "deletedAt" timestamp with time zone NOT NULL DEFAULT clock_timestamp(), + CONSTRAINT "PK_15d5cc4d65ac966233b9921acac" PRIMARY KEY ("id") +);`.execute(db); + await sql`CREATE INDEX "IDX_user_metadata_audit_user_id" ON "user_metadata_audit" ("userId");`.execute(db); + await sql`CREATE INDEX "IDX_user_metadata_audit_key" ON "user_metadata_audit" ("key");`.execute(db); + await sql`CREATE INDEX "IDX_user_metadata_audit_deleted_at" ON "user_metadata_audit" ("deletedAt");`.execute(db); + await sql`ALTER TABLE "user_metadata" ADD "updateId" uuid NOT NULL DEFAULT immich_uuid_v7();`.execute(db); + await sql`ALTER TABLE "user_metadata" ADD "updatedAt" timestamp with time zone NOT NULL DEFAULT now();`.execute(db); + await sql`CREATE INDEX "IDX_user_metadata_update_id" ON "user_metadata" ("updateId");`.execute(db); + await sql`CREATE INDEX "IDX_user_metadata_updated_at" ON "user_metadata" ("updatedAt");`.execute(db); + await sql`CREATE OR REPLACE TRIGGER "user_metadata_audit" + AFTER DELETE ON "user_metadata" + REFERENCING OLD TABLE AS "old" + FOR EACH STATEMENT + WHEN (pg_trigger_depth() = 0) + EXECUTE FUNCTION user_metadata_audit();`.execute(db); + await sql`CREATE OR REPLACE TRIGGER "user_metadata_updated_at" + BEFORE UPDATE ON "user_metadata" + FOR EACH ROW + EXECUTE FUNCTION updated_at();`.execute(db); + await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('function_user_metadata_audit', '{"type":"function","name":"user_metadata_audit","sql":"CREATE OR REPLACE FUNCTION user_metadata_audit()\\n RETURNS TRIGGER\\n LANGUAGE PLPGSQL\\n AS $$\\n BEGIN\\n INSERT INTO user_metadata_audit (\\"userId\\", \\"key\\")\\n SELECT \\"userId\\", \\"key\\"\\n FROM OLD;\\n RETURN NULL;\\n END\\n $$;"}'::jsonb);`.execute(db); + await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('trigger_user_metadata_audit', '{"type":"trigger","name":"user_metadata_audit","sql":"CREATE OR REPLACE TRIGGER \\"user_metadata_audit\\"\\n AFTER DELETE ON \\"user_metadata\\"\\n REFERENCING OLD TABLE AS \\"old\\"\\n FOR EACH STATEMENT\\n WHEN (pg_trigger_depth() = 0)\\n EXECUTE FUNCTION user_metadata_audit();"}'::jsonb);`.execute(db); + await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('trigger_user_metadata_updated_at', '{"type":"trigger","name":"user_metadata_updated_at","sql":"CREATE OR REPLACE TRIGGER \\"user_metadata_updated_at\\"\\n BEFORE UPDATE ON \\"user_metadata\\"\\n FOR EACH ROW\\n EXECUTE FUNCTION updated_at();"}'::jsonb);`.execute(db); +} + +export async function down(db: Kysely): Promise { + await sql`DROP TRIGGER "user_metadata_audit" ON "user_metadata";`.execute(db); + await sql`DROP TRIGGER "user_metadata_updated_at" ON "user_metadata";`.execute(db); + await sql`DROP INDEX "IDX_user_metadata_update_id";`.execute(db); + await sql`DROP INDEX "IDX_user_metadata_updated_at";`.execute(db); + await sql`ALTER TABLE "user_metadata" DROP COLUMN "updateId";`.execute(db); + await sql`ALTER TABLE "user_metadata" DROP COLUMN "updatedAt";`.execute(db); + await sql`DROP TABLE "user_metadata_audit";`.execute(db); + await sql`DROP FUNCTION user_metadata_audit;`.execute(db); + await sql`DELETE FROM "migration_overrides" WHERE "name" = 'function_user_metadata_audit';`.execute(db); + await sql`DELETE FROM "migration_overrides" WHERE "name" = 'trigger_user_metadata_audit';`.execute(db); + await sql`DELETE FROM "migration_overrides" WHERE "name" = 'trigger_user_metadata_updated_at';`.execute(db); +} diff --git a/server/src/schema/tables/user-metadata-audit.table.ts b/server/src/schema/tables/user-metadata-audit.table.ts new file mode 100644 index 000000000..de7d21c87 --- /dev/null +++ b/server/src/schema/tables/user-metadata-audit.table.ts @@ -0,0 +1,17 @@ +import { PrimaryGeneratedUuidV7Column } from 'src/decorators'; +import { Column, CreateDateColumn, Generated, Table, Timestamp } from 'src/sql-tools'; + +@Table('user_metadata_audit') +export class UserMetadataAuditTable { + @PrimaryGeneratedUuidV7Column() + id!: Generated; + + @Column({ type: 'uuid', indexName: 'IDX_user_metadata_audit_user_id' }) + userId!: string; + + @Column({ indexName: 'IDX_user_metadata_audit_key' }) + key!: string; + + @CreateDateColumn({ default: () => 'clock_timestamp()', indexName: 'IDX_user_metadata_audit_deleted_at' }) + deletedAt!: Generated; +} diff --git a/server/src/schema/tables/user-metadata.table.ts b/server/src/schema/tables/user-metadata.table.ts index 04b457867..a453ec667 100644 --- a/server/src/schema/tables/user-metadata.table.ts +++ b/server/src/schema/tables/user-metadata.table.ts @@ -1,9 +1,27 @@ +import { UpdatedAtTrigger, UpdateIdColumn } from 'src/decorators'; import { UserMetadataKey } from 'src/enum'; +import { user_metadata_audit } from 'src/schema/functions'; import { UserTable } from 'src/schema/tables/user.table'; -import { Column, ForeignKeyColumn, PrimaryColumn, Table } from 'src/sql-tools'; +import { + AfterDeleteTrigger, + Column, + ForeignKeyColumn, + Generated, + PrimaryColumn, + Table, + Timestamp, + UpdateDateColumn, +} from 'src/sql-tools'; import { UserMetadata, UserMetadataItem } from 'src/types'; +@UpdatedAtTrigger('user_metadata_updated_at') @Table('user_metadata') +@AfterDeleteTrigger({ + scope: 'statement', + function: user_metadata_audit, + referencingOldTableAs: 'old', + when: 'pg_trigger_depth() = 0', +}) export class UserMetadataTable implements UserMetadataItem { @ForeignKeyColumn(() => UserTable, { onUpdate: 'CASCADE', @@ -19,4 +37,10 @@ export class UserMetadataTable i @Column({ type: 'jsonb' }) value!: UserMetadata[T]; + + @UpdateIdColumn({ indexName: 'IDX_user_metadata_update_id' }) + updateId!: Generated; + + @UpdateDateColumn({ indexName: 'IDX_user_metadata_updated_at' }) + updatedAt!: Generated; } diff --git a/server/src/services/sync.service.ts b/server/src/services/sync.service.ts index 2cec72a3b..9779498d7 100644 --- a/server/src/services/sync.service.ts +++ b/server/src/services/sync.service.ts @@ -70,6 +70,7 @@ export const SYNC_TYPES_ORDER = [ SyncRequestType.MemoriesV1, SyncRequestType.MemoryToAssetsV1, SyncRequestType.PeopleV1, + SyncRequestType.UserMetadataV1, ]; const throwSessionRequired = () => { @@ -155,6 +156,7 @@ export class SyncService extends BaseService { [SyncRequestType.StacksV1]: () => this.syncStackV1(response, checkpointMap, auth), [SyncRequestType.PartnerStacksV1]: () => this.syncPartnerStackV1(response, checkpointMap, auth, session.id), [SyncRequestType.PeopleV1]: () => this.syncPeopleV1(response, checkpointMap, auth), + [SyncRequestType.UserMetadataV1]: () => this.syncUserMetadataV1(response, checkpointMap, auth), }; for (const type of SYNC_TYPES_ORDER.filter((type) => dto.types.includes(type))) { @@ -604,6 +606,22 @@ export class SyncService extends BaseService { } } + private async syncUserMetadataV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto) { + const deleteType = SyncEntityType.UserMetadataDeleteV1; + const deletes = this.syncRepository.userMetadata.getDeletes(auth.user.id, checkpointMap[deleteType]); + + for await (const { id, ...data } of deletes) { + send(response, { type: deleteType, ids: [id], data }); + } + + const upsertType = SyncEntityType.UserMetadataV1; + const upserts = this.syncRepository.userMetadata.getUpserts(auth.user.id, checkpointMap[upsertType]); + + for await (const { updateId, ...data } of upserts) { + send(response, { type: upsertType, ids: [updateId], data }); + } + } + private async upsertBackfillCheckpoint(item: { type: SyncEntityType; sessionId: string; createId: string }) { const { type, sessionId, createId } = item; await this.syncCheckpointRepository.upsertAll([ diff --git a/server/test/medium/specs/sync/sync-user-metadata.spec.ts b/server/test/medium/specs/sync/sync-user-metadata.spec.ts new file mode 100644 index 000000000..bb4a500a6 --- /dev/null +++ b/server/test/medium/specs/sync/sync-user-metadata.spec.ts @@ -0,0 +1,123 @@ +import { Kysely } from 'kysely'; +import { SyncEntityType, SyncRequestType, UserMetadataKey } from 'src/enum'; +import { UserRepository } from 'src/repositories/user.repository'; +import { DB } from 'src/schema'; +import { SyncTestContext } from 'test/medium.factory'; +import { getKyselyDB } from 'test/utils'; + +let defaultDatabase: Kysely; + +const setup = async (db?: Kysely) => { + const ctx = new SyncTestContext(db || defaultDatabase); + const { auth, user, session } = await ctx.newSyncAuthUser(); + return { auth, user, session, ctx }; +}; + +beforeAll(async () => { + defaultDatabase = await getKyselyDB(); +}); + +describe(SyncEntityType.UserMetadataV1, () => { + it('should detect and sync new user metadata', async () => { + const { auth, user, ctx } = await setup(); + + const userRepo = ctx.get(UserRepository); + await userRepo.upsertMetadata(user.id, { key: UserMetadataKey.ONBOARDING, value: { isOnboarded: true } }); + + const response = await ctx.syncStream(auth, [SyncRequestType.UserMetadataV1]); + expect(response).toHaveLength(1); + expect(response).toEqual([ + { + ack: expect.any(String), + data: { + key: UserMetadataKey.ONBOARDING, + userId: user.id, + value: { isOnboarded: true }, + }, + type: 'UserMetadataV1', + }, + ]); + + await ctx.syncAckAll(auth, response); + await expect(ctx.syncStream(auth, [SyncRequestType.UserMetadataV1])).resolves.toEqual([]); + }); + + it('should update user metadata', async () => { + const { auth, user, ctx } = await setup(); + + const userRepo = ctx.get(UserRepository); + await userRepo.upsertMetadata(user.id, { key: UserMetadataKey.ONBOARDING, value: { isOnboarded: true } }); + + const response = await ctx.syncStream(auth, [SyncRequestType.UserMetadataV1]); + expect(response).toHaveLength(1); + expect(response).toEqual([ + { + ack: expect.any(String), + data: { + key: UserMetadataKey.ONBOARDING, + userId: user.id, + value: { isOnboarded: true }, + }, + type: 'UserMetadataV1', + }, + ]); + + await ctx.syncAckAll(auth, response); + + await userRepo.upsertMetadata(user.id, { key: UserMetadataKey.ONBOARDING, value: { isOnboarded: false } }); + + const updatedResponse = await ctx.syncStream(auth, [SyncRequestType.UserMetadataV1]); + expect(updatedResponse).toEqual([ + { + ack: expect.any(String), + data: { + key: UserMetadataKey.ONBOARDING, + userId: user.id, + value: { isOnboarded: false }, + }, + type: 'UserMetadataV1', + }, + ]); + + await ctx.syncAckAll(auth, updatedResponse); + await expect(ctx.syncStream(auth, [SyncRequestType.UserMetadataV1])).resolves.toEqual([]); + }); +}); + +describe(SyncEntityType.UserMetadataDeleteV1, () => { + it('should delete and sync user metadata', async () => { + const { auth, user, ctx } = await setup(); + + const userRepo = ctx.get(UserRepository); + await userRepo.upsertMetadata(user.id, { key: UserMetadataKey.ONBOARDING, value: { isOnboarded: true } }); + + const response = await ctx.syncStream(auth, [SyncRequestType.UserMetadataV1]); + expect(response).toHaveLength(1); + expect(response).toEqual([ + { + ack: expect.any(String), + data: { + key: UserMetadataKey.ONBOARDING, + userId: user.id, + value: { isOnboarded: true }, + }, + type: 'UserMetadataV1', + }, + ]); + + await ctx.syncAckAll(auth, response); + + await userRepo.deleteMetadata(auth.user.id, UserMetadataKey.ONBOARDING); + + await expect(ctx.syncStream(auth, [SyncRequestType.UserMetadataV1])).resolves.toEqual([ + { + ack: expect.any(String), + data: { + userId: user.id, + key: UserMetadataKey.ONBOARDING, + }, + type: 'UserMetadataDeleteV1', + }, + ]); + }); +});