diff --git a/mobile/openapi/README.md b/mobile/openapi/README.md index 5b90e4a63..5085052c0 100644 Binary files a/mobile/openapi/README.md and b/mobile/openapi/README.md differ diff --git a/mobile/openapi/lib/api.dart b/mobile/openapi/lib/api.dart index 4de0614cf..f30481ecc 100644 Binary files a/mobile/openapi/lib/api.dart and b/mobile/openapi/lib/api.dart differ diff --git a/mobile/openapi/lib/api_client.dart b/mobile/openapi/lib/api_client.dart index 26113a111..f1cf05f11 100644 Binary files a/mobile/openapi/lib/api_client.dart and b/mobile/openapi/lib/api_client.dart differ diff --git a/mobile/openapi/lib/model/sync_entity_type.dart b/mobile/openapi/lib/model/sync_entity_type.dart index 1d09a6dbe..61f94401c 100644 Binary files a/mobile/openapi/lib/model/sync_entity_type.dart and b/mobile/openapi/lib/model/sync_entity_type.dart differ diff --git a/mobile/openapi/lib/model/sync_request_type.dart b/mobile/openapi/lib/model/sync_request_type.dart index 0b121d96c..75ce852f9 100644 Binary files a/mobile/openapi/lib/model/sync_request_type.dart and b/mobile/openapi/lib/model/sync_request_type.dart differ diff --git a/mobile/openapi/lib/model/sync_user_metadata_delete_v1.dart b/mobile/openapi/lib/model/sync_user_metadata_delete_v1.dart new file mode 100644 index 000000000..e9dd73329 Binary files /dev/null and b/mobile/openapi/lib/model/sync_user_metadata_delete_v1.dart differ diff --git a/mobile/openapi/lib/model/sync_user_metadata_v1.dart b/mobile/openapi/lib/model/sync_user_metadata_v1.dart new file mode 100644 index 000000000..0b060dc17 Binary files /dev/null and b/mobile/openapi/lib/model/sync_user_metadata_v1.dart differ diff --git a/open-api/immich-openapi-specs.json b/open-api/immich-openapi-specs.json index 2e6c70ada..18204c21f 100644 --- a/open-api/immich-openapi-specs.json +++ b/open-api/immich-openapi-specs.json @@ -13902,6 +13902,8 @@ "StackDeleteV1", "PersonV1", "PersonDeleteV1", + "UserMetadataV1", + "UserMetadataDeleteV1", "SyncAckV1", "SyncResetV1" ], @@ -14137,7 +14139,8 @@ "PartnerStacksV1", "StacksV1", "UsersV1", - "PeopleV1" + "PeopleV1", + "UserMetadataV1" ], "type": "string" }, @@ -14213,6 +14216,40 @@ ], "type": "object" }, + "SyncUserMetadataDeleteV1": { + "properties": { + "key": { + "type": "string" + }, + "userId": { + "type": "string" + } + }, + "required": [ + "key", + "userId" + ], + "type": "object" + }, + "SyncUserMetadataV1": { + "properties": { + "key": { + "type": "string" + }, + "userId": { + "type": "string" + }, + "value": { + "type": "object" + } + }, + "required": [ + "key", + "userId", + "value" + ], + "type": "object" + }, "SyncUserV1": { "properties": { "deletedAt": { diff --git a/open-api/typescript-sdk/src/fetch-client.ts b/open-api/typescript-sdk/src/fetch-client.ts index f7fc9fe61..88dee9bf0 100644 --- a/open-api/typescript-sdk/src/fetch-client.ts +++ b/open-api/typescript-sdk/src/fetch-client.ts @@ -4116,6 +4116,8 @@ export enum SyncEntityType { StackDeleteV1 = "StackDeleteV1", PersonV1 = "PersonV1", PersonDeleteV1 = "PersonDeleteV1", + UserMetadataV1 = "UserMetadataV1", + UserMetadataDeleteV1 = "UserMetadataDeleteV1", SyncAckV1 = "SyncAckV1", SyncResetV1 = "SyncResetV1" } @@ -4135,7 +4137,8 @@ export enum SyncRequestType { PartnerStacksV1 = "PartnerStacksV1", StacksV1 = "StacksV1", UsersV1 = "UsersV1", - PeopleV1 = "PeopleV1" + PeopleV1 = "PeopleV1", + UserMetadataV1 = "UserMetadataV1" } export enum TranscodeHWAccel { Nvenc = "nvenc", diff --git a/server/src/dtos/sync.dto.ts b/server/src/dtos/sync.dto.ts index ff5df03ea..abd7d7b69 100644 --- a/server/src/dtos/sync.dto.ts +++ b/server/src/dtos/sync.dto.ts @@ -10,7 +10,9 @@ import { MemoryType, SyncEntityType, SyncRequestType, + UserMetadataKey, } from 'src/enum'; +import { UserMetadata } from 'src/types'; import { Optional, ValidateBoolean, ValidateDate, ValidateUUID } from 'src/validation'; export class AssetFullSyncDto { @@ -253,6 +255,19 @@ export class SyncPersonDeleteV1 { personId!: string; } +@ExtraModel() +export class SyncUserMetadataV1 { + userId!: string; + key!: string; + value!: UserMetadata[UserMetadataKey]; +} + +@ExtraModel() +export class SyncUserMetadataDeleteV1 { + userId!: string; + key!: string; +} + @ExtraModel() export class SyncAckV1 {} @@ -295,6 +310,8 @@ export type SyncItem = { [SyncEntityType.PartnerStackV1]: SyncStackV1; [SyncEntityType.PersonV1]: SyncPersonV1; [SyncEntityType.PersonDeleteV1]: SyncPersonDeleteV1; + [SyncEntityType.UserMetadataV1]: SyncUserMetadataV1; + [SyncEntityType.UserMetadataDeleteV1]: SyncUserMetadataDeleteV1; [SyncEntityType.SyncAckV1]: SyncAckV1; [SyncEntityType.SyncResetV1]: SyncResetV1; }; diff --git a/server/src/enum.ts b/server/src/enum.ts index 6d960e1fb..dca0f0955 100644 --- a/server/src/enum.ts +++ b/server/src/enum.ts @@ -589,6 +589,7 @@ export enum SyncRequestType { StacksV1 = 'StacksV1', UsersV1 = 'UsersV1', PeopleV1 = 'PeopleV1', + UserMetadataV1 = 'UserMetadataV1', } export enum SyncEntityType { @@ -639,6 +640,9 @@ export enum SyncEntityType { PersonV1 = 'PersonV1', PersonDeleteV1 = 'PersonDeleteV1', + UserMetadataV1 = 'UserMetadataV1', + UserMetadataDeleteV1 = 'UserMetadataDeleteV1', + SyncAckV1 = 'SyncAckV1', SyncResetV1 = 'SyncResetV1', } diff --git a/server/src/queries/sync.repository.sql b/server/src/queries/sync.repository.sql index f68be8c83..ac3632ad4 100644 --- a/server/src/queries/sync.repository.sql +++ b/server/src/queries/sync.repository.sql @@ -835,3 +835,30 @@ where "updatedAt" < now() - interval '1 millisecond' order by "updateId" asc + +-- SyncRepository.userMetadata.getDeletes +select + "id", + "userId", + "key" +from + "user_metadata_audit" +where + "userId" = $1 + and "deletedAt" < now() - interval '1 millisecond' +order by + "id" asc + +-- SyncRepository.userMetadata.getUpserts +select + "userId", + "key", + "value", + "updateId" +from + "user_metadata" +where + "userId" = $1 + and "updatedAt" < now() - interval '1 millisecond' +order by + "updateId" asc diff --git a/server/src/repositories/sync.repository.ts b/server/src/repositories/sync.repository.ts index 3bc09c97e..f9d65f724 100644 --- a/server/src/repositories/sync.repository.ts +++ b/server/src/repositories/sync.repository.ts @@ -16,7 +16,8 @@ type AuditTables = | 'memories_audit' | 'memory_assets_audit' | 'stacks_audit' - | 'person_audit'; + | 'person_audit' + | 'user_metadata_audit'; type UpsertTables = | 'users' | 'partners' @@ -27,7 +28,8 @@ type UpsertTables = | 'memories' | 'memories_assets_assets' | 'asset_stack' - | 'person'; + | 'person' + | 'user_metadata'; @Injectable() export class SyncRepository { @@ -47,6 +49,7 @@ export class SyncRepository { people: PersonSync; stack: StackSync; user: UserSync; + userMetadata: UserMetadataSync; constructor(@InjectKysely() private db: Kysely) { this.album = new AlbumSync(this.db); @@ -65,32 +68,31 @@ export class SyncRepository { this.people = new PersonSync(this.db); this.stack = new StackSync(this.db); this.user = new UserSync(this.db); + this.userMetadata = new UserMetadataSync(this.db); } } class BaseSync { constructor(protected db: Kysely) {} - protected auditTableFilters, D>( - qb: SelectQueryBuilder, - ack?: SyncAck, - ) { - const builder = qb as SelectQueryBuilder; - return builder - .where('deletedAt', '<', sql.raw("now() - interval '1 millisecond'")) - .$if(!!ack, (qb) => qb.where('id', '>', ack!.updateId)) - .orderBy('id', 'asc') as SelectQueryBuilder; + protected auditTableFilters(ack?: SyncAck) { + return , D>(qb: SelectQueryBuilder) => { + const builder = qb as SelectQueryBuilder; + return builder + .where('deletedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .$if(!!ack, (qb) => qb.where('id', '>', ack!.updateId)) + .orderBy('id', 'asc') as SelectQueryBuilder; + }; } - protected upsertTableFilters, D>( - qb: SelectQueryBuilder, - ack?: SyncAck, - ) { - const builder = qb as SelectQueryBuilder; - return builder - .where('updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) - .$if(!!ack, (qb) => qb.where('updateId', '>', ack!.updateId)) - .orderBy('updateId', 'asc') as SelectQueryBuilder; + protected upsertTableFilters(ack?: SyncAck) { + return , D>(qb: SelectQueryBuilder) => { + const builder = qb as SelectQueryBuilder; + return builder + .where('updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .$if(!!ack, (qb) => qb.where('updateId', '>', ack!.updateId)) + .orderBy('updateId', 'asc') as SelectQueryBuilder; + }; } } @@ -113,7 +115,7 @@ class AlbumSync extends BaseSync { .selectFrom('albums_audit') .select(['id', 'albumId']) .where('userId', '=', userId) - .$call((qb) => this.auditTableFilters(qb, ack)) + .$call(this.auditTableFilters(ack)) .stream(); } @@ -246,7 +248,7 @@ class AlbumToAssetSync extends BaseSync { ), ), ) - .$call((qb) => this.auditTableFilters(qb, ack)) + .$call(this.auditTableFilters(ack)) .stream(); } @@ -303,7 +305,7 @@ class AlbumUserSync extends BaseSync { ), ), ) - .$call((qb) => this.auditTableFilters(qb, ack)) + .$call(this.auditTableFilters(ack)) .stream(); } @@ -345,7 +347,7 @@ class AssetSync extends BaseSync { .selectFrom('assets_audit') .select(['id', 'assetId']) .where('ownerId', '=', userId) - .$call((qb) => this.auditTableFilters(qb, ack)) + .$call(this.auditTableFilters(ack)) .stream(); } @@ -356,7 +358,7 @@ class AssetSync extends BaseSync { .select(columns.syncAsset) .select('assets.updateId') .where('ownerId', '=', userId) - .$call((qb) => this.upsertTableFilters(qb, ack)) + .$call(this.upsertTableFilters(ack)) .stream(); } } @@ -368,7 +370,7 @@ class PersonSync extends BaseSync { .selectFrom('person_audit') .select(['id', 'personId']) .where('ownerId', '=', userId) - .$call((qb) => this.auditTableFilters(qb, ack)) + .$call(this.auditTableFilters(ack)) .stream(); } @@ -391,7 +393,7 @@ class PersonSync extends BaseSync { 'faceAssetId', ]) .where('ownerId', '=', userId) - .$call((qb) => this.upsertTableFilters(qb, ack)) + .$call(this.upsertTableFilters(ack)) .stream(); } } @@ -404,7 +406,7 @@ class AssetExifSync extends BaseSync { .select(columns.syncAssetExif) .select('exif.updateId') .where('assetId', 'in', (eb) => eb.selectFrom('assets').select('id').where('ownerId', '=', userId)) - .$call((qb) => this.upsertTableFilters(qb, ack)) + .$call(this.upsertTableFilters(ack)) .stream(); } } @@ -416,7 +418,7 @@ class MemorySync extends BaseSync { .selectFrom('memories_audit') .select(['id', 'memoryId']) .where('userId', '=', userId) - .$call((qb) => this.auditTableFilters(qb, ack)) + .$call(this.auditTableFilters(ack)) .stream(); } @@ -440,7 +442,7 @@ class MemorySync extends BaseSync { ]) .select('updateId') .where('ownerId', '=', userId) - .$call((qb) => this.upsertTableFilters(qb, ack)) + .$call(this.upsertTableFilters(ack)) .stream(); } } @@ -452,7 +454,7 @@ class MemoryToAssetSync extends BaseSync { .selectFrom('memory_assets_audit') .select(['id', 'memoryId', 'assetId']) .where('memoryId', 'in', (eb) => eb.selectFrom('memories').select('id').where('ownerId', '=', userId)) - .$call((qb) => this.auditTableFilters(qb, ack)) + .$call(this.auditTableFilters(ack)) .stream(); } @@ -463,7 +465,7 @@ class MemoryToAssetSync extends BaseSync { .select(['memoriesId as memoryId', 'assetsId as assetId']) .select('updateId') .where('memoriesId', 'in', (eb) => eb.selectFrom('memories').select('id').where('ownerId', '=', userId)) - .$call((qb) => this.upsertTableFilters(qb, ack)) + .$call(this.upsertTableFilters(ack)) .stream(); } } @@ -487,7 +489,7 @@ class PartnerSync extends BaseSync { .selectFrom('partners_audit') .select(['id', 'sharedById', 'sharedWithId']) .where((eb) => eb.or([eb('sharedById', '=', userId), eb('sharedWithId', '=', userId)])) - .$call((qb) => this.auditTableFilters(qb, ack)) + .$call(this.auditTableFilters(ack)) .stream(); } @@ -497,7 +499,7 @@ class PartnerSync extends BaseSync { .selectFrom('partners') .select(['sharedById', 'sharedWithId', 'inTimeline', 'updateId']) .where((eb) => eb.or([eb('sharedById', '=', userId), eb('sharedWithId', '=', userId)])) - .$call((qb) => this.upsertTableFilters(qb, ack)) + .$call(this.upsertTableFilters(ack)) .stream(); } } @@ -525,7 +527,7 @@ class PartnerAssetsSync extends BaseSync { .where('ownerId', 'in', (eb) => eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId), ) - .$call((qb) => this.auditTableFilters(qb, ack)) + .$call(this.auditTableFilters(ack)) .stream(); } @@ -538,7 +540,7 @@ class PartnerAssetsSync extends BaseSync { .where('ownerId', 'in', (eb) => eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId), ) - .$call((qb) => this.upsertTableFilters(qb, ack)) + .$call(this.upsertTableFilters(ack)) .stream(); } } @@ -573,7 +575,7 @@ class PartnerAssetExifsSync extends BaseSync { eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId), ), ) - .$call((qb) => this.upsertTableFilters(qb, ack)) + .$call(this.upsertTableFilters(ack)) .stream(); } } @@ -585,7 +587,7 @@ class StackSync extends BaseSync { .selectFrom('stacks_audit') .select(['id', 'stackId']) .where('userId', '=', userId) - .$call((qb) => this.auditTableFilters(qb, ack)) + .$call(this.auditTableFilters(ack)) .stream(); } @@ -596,7 +598,7 @@ class StackSync extends BaseSync { .select(columns.syncStack) .select('updateId') .where('ownerId', '=', userId) - .$call((qb) => this.upsertTableFilters(qb, ack)) + .$call(this.upsertTableFilters(ack)) .stream(); } } @@ -610,7 +612,7 @@ class PartnerStackSync extends BaseSync { .where('userId', 'in', (eb) => eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId), ) - .$call((qb) => this.auditTableFilters(qb, ack)) + .$call(this.auditTableFilters(ack)) .stream(); } @@ -637,18 +639,15 @@ class PartnerStackSync extends BaseSync { .where('ownerId', 'in', (eb) => eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId), ) - .$call((qb) => this.upsertTableFilters(qb, ack)) + .$call(this.upsertTableFilters(ack)) .stream(); } } + class UserSync extends BaseSync { @GenerateSql({ params: [], stream: true }) getDeletes(ack?: SyncAck) { - return this.db - .selectFrom('users_audit') - .select(['id', 'userId']) - .$call((qb) => this.auditTableFilters(qb, ack)) - .stream(); + return this.db.selectFrom('users_audit').select(['id', 'userId']).$call(this.auditTableFilters(ack)).stream(); } @GenerateSql({ params: [], stream: true }) @@ -656,7 +655,29 @@ class UserSync extends BaseSync { return this.db .selectFrom('users') .select(['id', 'name', 'email', 'deletedAt', 'updateId']) - .$call((qb) => this.upsertTableFilters(qb, ack)) + .$call(this.upsertTableFilters(ack)) + .stream(); + } +} + +class UserMetadataSync extends BaseSync { + @GenerateSql({ params: [DummyValue.UUID], stream: true }) + getDeletes(userId: string, ack?: SyncAck) { + return this.db + .selectFrom('user_metadata_audit') + .select(['id', 'userId', 'key']) + .where('userId', '=', userId) + .$call(this.auditTableFilters(ack)) + .stream(); + } + + @GenerateSql({ params: [DummyValue.UUID], stream: true }) + getUpserts(userId: string, ack?: SyncAck) { + return this.db + .selectFrom('user_metadata') + .select(['userId', 'key', 'value', 'updateId']) + .where('userId', '=', userId) + .$call(this.upsertTableFilters(ack)) .stream(); } } diff --git a/server/src/schema/functions.ts b/server/src/schema/functions.ts index 424ceb0d3..335201a6c 100644 --- a/server/src/schema/functions.ts +++ b/server/src/schema/functions.ts @@ -216,3 +216,16 @@ export const person_delete_audit = registerFunction({ RETURN NULL; END`, }); + +export const user_metadata_audit = registerFunction({ + name: 'user_metadata_audit', + returnType: 'TRIGGER', + language: 'PLPGSQL', + body: ` + BEGIN + INSERT INTO user_metadata_audit ("userId", "key") + SELECT "userId", "key" + FROM OLD; + RETURN NULL; + END`, +}); diff --git a/server/src/schema/index.ts b/server/src/schema/index.ts index f564e8c7f..384e47df7 100644 --- a/server/src/schema/index.ts +++ b/server/src/schema/index.ts @@ -57,6 +57,7 @@ import { TagAssetTable } from 'src/schema/tables/tag-asset.table'; import { TagClosureTable } from 'src/schema/tables/tag-closure.table'; import { TagTable } from 'src/schema/tables/tag.table'; import { UserAuditTable } from 'src/schema/tables/user-audit.table'; +import { UserMetadataAuditTable } from 'src/schema/tables/user-metadata-audit.table'; import { UserMetadataTable } from 'src/schema/tables/user-metadata.table'; import { UserTable } from 'src/schema/tables/user.table'; import { VersionHistoryTable } from 'src/schema/tables/version-history.table'; @@ -108,6 +109,7 @@ export class ImmichDatabase { TagClosureTable, UserAuditTable, UserMetadataTable, + UserMetadataAuditTable, UserTable, VersionHistoryTable, ]; @@ -128,6 +130,7 @@ export class ImmichDatabase { memory_assets_delete_audit, stacks_delete_audit, person_delete_audit, + users_delete_audit, ]; enum = [assets_status_enum, asset_face_source_type, asset_visibility_enum]; @@ -182,6 +185,7 @@ export interface DB { tags: TagTable; tags_closure: TagClosureTable; user_metadata: UserMetadataTable; + user_metadata_audit: UserMetadataAuditTable; users: UserTable; users_audit: UserAuditTable; version_history: VersionHistoryTable; diff --git a/server/src/schema/migrations/1752250924342-UserMetadataSync.ts b/server/src/schema/migrations/1752250924342-UserMetadataSync.ts new file mode 100644 index 000000000..20778d801 --- /dev/null +++ b/server/src/schema/migrations/1752250924342-UserMetadataSync.ts @@ -0,0 +1,56 @@ +import { Kysely, sql } from 'kysely'; + +export async function up(db: Kysely): Promise { + await sql`CREATE OR REPLACE FUNCTION user_metadata_audit() + RETURNS TRIGGER + LANGUAGE PLPGSQL + AS $$ + BEGIN + INSERT INTO user_metadata_audit ("userId", "key") + SELECT "userId", "key" + FROM OLD; + RETURN NULL; + END + $$;`.execute(db); + await sql`CREATE TABLE "user_metadata_audit" ( + "id" uuid NOT NULL DEFAULT immich_uuid_v7(), + "userId" uuid NOT NULL, + "key" character varying NOT NULL, + "deletedAt" timestamp with time zone NOT NULL DEFAULT clock_timestamp(), + CONSTRAINT "PK_15d5cc4d65ac966233b9921acac" PRIMARY KEY ("id") +);`.execute(db); + await sql`CREATE INDEX "IDX_user_metadata_audit_user_id" ON "user_metadata_audit" ("userId");`.execute(db); + await sql`CREATE INDEX "IDX_user_metadata_audit_key" ON "user_metadata_audit" ("key");`.execute(db); + await sql`CREATE INDEX "IDX_user_metadata_audit_deleted_at" ON "user_metadata_audit" ("deletedAt");`.execute(db); + await sql`ALTER TABLE "user_metadata" ADD "updateId" uuid NOT NULL DEFAULT immich_uuid_v7();`.execute(db); + await sql`ALTER TABLE "user_metadata" ADD "updatedAt" timestamp with time zone NOT NULL DEFAULT now();`.execute(db); + await sql`CREATE INDEX "IDX_user_metadata_update_id" ON "user_metadata" ("updateId");`.execute(db); + await sql`CREATE INDEX "IDX_user_metadata_updated_at" ON "user_metadata" ("updatedAt");`.execute(db); + await sql`CREATE OR REPLACE TRIGGER "user_metadata_audit" + AFTER DELETE ON "user_metadata" + REFERENCING OLD TABLE AS "old" + FOR EACH STATEMENT + WHEN (pg_trigger_depth() = 0) + EXECUTE FUNCTION user_metadata_audit();`.execute(db); + await sql`CREATE OR REPLACE TRIGGER "user_metadata_updated_at" + BEFORE UPDATE ON "user_metadata" + FOR EACH ROW + EXECUTE FUNCTION updated_at();`.execute(db); + await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('function_user_metadata_audit', '{"type":"function","name":"user_metadata_audit","sql":"CREATE OR REPLACE FUNCTION user_metadata_audit()\\n RETURNS TRIGGER\\n LANGUAGE PLPGSQL\\n AS $$\\n BEGIN\\n INSERT INTO user_metadata_audit (\\"userId\\", \\"key\\")\\n SELECT \\"userId\\", \\"key\\"\\n FROM OLD;\\n RETURN NULL;\\n END\\n $$;"}'::jsonb);`.execute(db); + await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('trigger_user_metadata_audit', '{"type":"trigger","name":"user_metadata_audit","sql":"CREATE OR REPLACE TRIGGER \\"user_metadata_audit\\"\\n AFTER DELETE ON \\"user_metadata\\"\\n REFERENCING OLD TABLE AS \\"old\\"\\n FOR EACH STATEMENT\\n WHEN (pg_trigger_depth() = 0)\\n EXECUTE FUNCTION user_metadata_audit();"}'::jsonb);`.execute(db); + await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('trigger_user_metadata_updated_at', '{"type":"trigger","name":"user_metadata_updated_at","sql":"CREATE OR REPLACE TRIGGER \\"user_metadata_updated_at\\"\\n BEFORE UPDATE ON \\"user_metadata\\"\\n FOR EACH ROW\\n EXECUTE FUNCTION updated_at();"}'::jsonb);`.execute(db); +} + +export async function down(db: Kysely): Promise { + await sql`DROP TRIGGER "user_metadata_audit" ON "user_metadata";`.execute(db); + await sql`DROP TRIGGER "user_metadata_updated_at" ON "user_metadata";`.execute(db); + await sql`DROP INDEX "IDX_user_metadata_update_id";`.execute(db); + await sql`DROP INDEX "IDX_user_metadata_updated_at";`.execute(db); + await sql`ALTER TABLE "user_metadata" DROP COLUMN "updateId";`.execute(db); + await sql`ALTER TABLE "user_metadata" DROP COLUMN "updatedAt";`.execute(db); + await sql`DROP TABLE "user_metadata_audit";`.execute(db); + await sql`DROP FUNCTION user_metadata_audit;`.execute(db); + await sql`DELETE FROM "migration_overrides" WHERE "name" = 'function_user_metadata_audit';`.execute(db); + await sql`DELETE FROM "migration_overrides" WHERE "name" = 'trigger_user_metadata_audit';`.execute(db); + await sql`DELETE FROM "migration_overrides" WHERE "name" = 'trigger_user_metadata_updated_at';`.execute(db); +} diff --git a/server/src/schema/tables/user-metadata-audit.table.ts b/server/src/schema/tables/user-metadata-audit.table.ts new file mode 100644 index 000000000..de7d21c87 --- /dev/null +++ b/server/src/schema/tables/user-metadata-audit.table.ts @@ -0,0 +1,17 @@ +import { PrimaryGeneratedUuidV7Column } from 'src/decorators'; +import { Column, CreateDateColumn, Generated, Table, Timestamp } from 'src/sql-tools'; + +@Table('user_metadata_audit') +export class UserMetadataAuditTable { + @PrimaryGeneratedUuidV7Column() + id!: Generated; + + @Column({ type: 'uuid', indexName: 'IDX_user_metadata_audit_user_id' }) + userId!: string; + + @Column({ indexName: 'IDX_user_metadata_audit_key' }) + key!: string; + + @CreateDateColumn({ default: () => 'clock_timestamp()', indexName: 'IDX_user_metadata_audit_deleted_at' }) + deletedAt!: Generated; +} diff --git a/server/src/schema/tables/user-metadata.table.ts b/server/src/schema/tables/user-metadata.table.ts index 04b457867..a453ec667 100644 --- a/server/src/schema/tables/user-metadata.table.ts +++ b/server/src/schema/tables/user-metadata.table.ts @@ -1,9 +1,27 @@ +import { UpdatedAtTrigger, UpdateIdColumn } from 'src/decorators'; import { UserMetadataKey } from 'src/enum'; +import { user_metadata_audit } from 'src/schema/functions'; import { UserTable } from 'src/schema/tables/user.table'; -import { Column, ForeignKeyColumn, PrimaryColumn, Table } from 'src/sql-tools'; +import { + AfterDeleteTrigger, + Column, + ForeignKeyColumn, + Generated, + PrimaryColumn, + Table, + Timestamp, + UpdateDateColumn, +} from 'src/sql-tools'; import { UserMetadata, UserMetadataItem } from 'src/types'; +@UpdatedAtTrigger('user_metadata_updated_at') @Table('user_metadata') +@AfterDeleteTrigger({ + scope: 'statement', + function: user_metadata_audit, + referencingOldTableAs: 'old', + when: 'pg_trigger_depth() = 0', +}) export class UserMetadataTable implements UserMetadataItem { @ForeignKeyColumn(() => UserTable, { onUpdate: 'CASCADE', @@ -19,4 +37,10 @@ export class UserMetadataTable i @Column({ type: 'jsonb' }) value!: UserMetadata[T]; + + @UpdateIdColumn({ indexName: 'IDX_user_metadata_update_id' }) + updateId!: Generated; + + @UpdateDateColumn({ indexName: 'IDX_user_metadata_updated_at' }) + updatedAt!: Generated; } diff --git a/server/src/services/sync.service.ts b/server/src/services/sync.service.ts index 2cec72a3b..9779498d7 100644 --- a/server/src/services/sync.service.ts +++ b/server/src/services/sync.service.ts @@ -70,6 +70,7 @@ export const SYNC_TYPES_ORDER = [ SyncRequestType.MemoriesV1, SyncRequestType.MemoryToAssetsV1, SyncRequestType.PeopleV1, + SyncRequestType.UserMetadataV1, ]; const throwSessionRequired = () => { @@ -155,6 +156,7 @@ export class SyncService extends BaseService { [SyncRequestType.StacksV1]: () => this.syncStackV1(response, checkpointMap, auth), [SyncRequestType.PartnerStacksV1]: () => this.syncPartnerStackV1(response, checkpointMap, auth, session.id), [SyncRequestType.PeopleV1]: () => this.syncPeopleV1(response, checkpointMap, auth), + [SyncRequestType.UserMetadataV1]: () => this.syncUserMetadataV1(response, checkpointMap, auth), }; for (const type of SYNC_TYPES_ORDER.filter((type) => dto.types.includes(type))) { @@ -604,6 +606,22 @@ export class SyncService extends BaseService { } } + private async syncUserMetadataV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto) { + const deleteType = SyncEntityType.UserMetadataDeleteV1; + const deletes = this.syncRepository.userMetadata.getDeletes(auth.user.id, checkpointMap[deleteType]); + + for await (const { id, ...data } of deletes) { + send(response, { type: deleteType, ids: [id], data }); + } + + const upsertType = SyncEntityType.UserMetadataV1; + const upserts = this.syncRepository.userMetadata.getUpserts(auth.user.id, checkpointMap[upsertType]); + + for await (const { updateId, ...data } of upserts) { + send(response, { type: upsertType, ids: [updateId], data }); + } + } + private async upsertBackfillCheckpoint(item: { type: SyncEntityType; sessionId: string; createId: string }) { const { type, sessionId, createId } = item; await this.syncCheckpointRepository.upsertAll([ diff --git a/server/test/medium/specs/sync/sync-user-metadata.spec.ts b/server/test/medium/specs/sync/sync-user-metadata.spec.ts new file mode 100644 index 000000000..bb4a500a6 --- /dev/null +++ b/server/test/medium/specs/sync/sync-user-metadata.spec.ts @@ -0,0 +1,123 @@ +import { Kysely } from 'kysely'; +import { SyncEntityType, SyncRequestType, UserMetadataKey } from 'src/enum'; +import { UserRepository } from 'src/repositories/user.repository'; +import { DB } from 'src/schema'; +import { SyncTestContext } from 'test/medium.factory'; +import { getKyselyDB } from 'test/utils'; + +let defaultDatabase: Kysely; + +const setup = async (db?: Kysely) => { + const ctx = new SyncTestContext(db || defaultDatabase); + const { auth, user, session } = await ctx.newSyncAuthUser(); + return { auth, user, session, ctx }; +}; + +beforeAll(async () => { + defaultDatabase = await getKyselyDB(); +}); + +describe(SyncEntityType.UserMetadataV1, () => { + it('should detect and sync new user metadata', async () => { + const { auth, user, ctx } = await setup(); + + const userRepo = ctx.get(UserRepository); + await userRepo.upsertMetadata(user.id, { key: UserMetadataKey.ONBOARDING, value: { isOnboarded: true } }); + + const response = await ctx.syncStream(auth, [SyncRequestType.UserMetadataV1]); + expect(response).toHaveLength(1); + expect(response).toEqual([ + { + ack: expect.any(String), + data: { + key: UserMetadataKey.ONBOARDING, + userId: user.id, + value: { isOnboarded: true }, + }, + type: 'UserMetadataV1', + }, + ]); + + await ctx.syncAckAll(auth, response); + await expect(ctx.syncStream(auth, [SyncRequestType.UserMetadataV1])).resolves.toEqual([]); + }); + + it('should update user metadata', async () => { + const { auth, user, ctx } = await setup(); + + const userRepo = ctx.get(UserRepository); + await userRepo.upsertMetadata(user.id, { key: UserMetadataKey.ONBOARDING, value: { isOnboarded: true } }); + + const response = await ctx.syncStream(auth, [SyncRequestType.UserMetadataV1]); + expect(response).toHaveLength(1); + expect(response).toEqual([ + { + ack: expect.any(String), + data: { + key: UserMetadataKey.ONBOARDING, + userId: user.id, + value: { isOnboarded: true }, + }, + type: 'UserMetadataV1', + }, + ]); + + await ctx.syncAckAll(auth, response); + + await userRepo.upsertMetadata(user.id, { key: UserMetadataKey.ONBOARDING, value: { isOnboarded: false } }); + + const updatedResponse = await ctx.syncStream(auth, [SyncRequestType.UserMetadataV1]); + expect(updatedResponse).toEqual([ + { + ack: expect.any(String), + data: { + key: UserMetadataKey.ONBOARDING, + userId: user.id, + value: { isOnboarded: false }, + }, + type: 'UserMetadataV1', + }, + ]); + + await ctx.syncAckAll(auth, updatedResponse); + await expect(ctx.syncStream(auth, [SyncRequestType.UserMetadataV1])).resolves.toEqual([]); + }); +}); + +describe(SyncEntityType.UserMetadataDeleteV1, () => { + it('should delete and sync user metadata', async () => { + const { auth, user, ctx } = await setup(); + + const userRepo = ctx.get(UserRepository); + await userRepo.upsertMetadata(user.id, { key: UserMetadataKey.ONBOARDING, value: { isOnboarded: true } }); + + const response = await ctx.syncStream(auth, [SyncRequestType.UserMetadataV1]); + expect(response).toHaveLength(1); + expect(response).toEqual([ + { + ack: expect.any(String), + data: { + key: UserMetadataKey.ONBOARDING, + userId: user.id, + value: { isOnboarded: true }, + }, + type: 'UserMetadataV1', + }, + ]); + + await ctx.syncAckAll(auth, response); + + await userRepo.deleteMetadata(auth.user.id, UserMetadataKey.ONBOARDING); + + await expect(ctx.syncStream(auth, [SyncRequestType.UserMetadataV1])).resolves.toEqual([ + { + ack: expect.any(String), + data: { + userId: user.id, + key: UserMetadataKey.ONBOARDING, + }, + type: 'UserMetadataDeleteV1', + }, + ]); + }); +});