feat: UserMetadata sync (#19882)

* feat: UserMetadata sync

* refactor: sync table filters (#19887)
This commit is contained in:
Daniel Dietzler 2025-07-11 20:19:53 +02:00 committed by GitHub
parent 9e48ae3052
commit df581cc0d5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
20 changed files with 414 additions and 50 deletions

BIN
mobile/openapi/README.md generated

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View file

@ -13902,6 +13902,8 @@
"StackDeleteV1",
"PersonV1",
"PersonDeleteV1",
"UserMetadataV1",
"UserMetadataDeleteV1",
"SyncAckV1",
"SyncResetV1"
],
@ -14137,7 +14139,8 @@
"PartnerStacksV1",
"StacksV1",
"UsersV1",
"PeopleV1"
"PeopleV1",
"UserMetadataV1"
],
"type": "string"
},
@ -14213,6 +14216,40 @@
],
"type": "object"
},
"SyncUserMetadataDeleteV1": {
"properties": {
"key": {
"type": "string"
},
"userId": {
"type": "string"
}
},
"required": [
"key",
"userId"
],
"type": "object"
},
"SyncUserMetadataV1": {
"properties": {
"key": {
"type": "string"
},
"userId": {
"type": "string"
},
"value": {
"type": "object"
}
},
"required": [
"key",
"userId",
"value"
],
"type": "object"
},
"SyncUserV1": {
"properties": {
"deletedAt": {

View file

@ -4116,6 +4116,8 @@ export enum SyncEntityType {
StackDeleteV1 = "StackDeleteV1",
PersonV1 = "PersonV1",
PersonDeleteV1 = "PersonDeleteV1",
UserMetadataV1 = "UserMetadataV1",
UserMetadataDeleteV1 = "UserMetadataDeleteV1",
SyncAckV1 = "SyncAckV1",
SyncResetV1 = "SyncResetV1"
}
@ -4135,7 +4137,8 @@ export enum SyncRequestType {
PartnerStacksV1 = "PartnerStacksV1",
StacksV1 = "StacksV1",
UsersV1 = "UsersV1",
PeopleV1 = "PeopleV1"
PeopleV1 = "PeopleV1",
UserMetadataV1 = "UserMetadataV1"
}
export enum TranscodeHWAccel {
Nvenc = "nvenc",

View file

@ -10,7 +10,9 @@ import {
MemoryType,
SyncEntityType,
SyncRequestType,
UserMetadataKey,
} from 'src/enum';
import { UserMetadata } from 'src/types';
import { Optional, ValidateBoolean, ValidateDate, ValidateUUID } from 'src/validation';
export class AssetFullSyncDto {
@ -253,6 +255,19 @@ export class SyncPersonDeleteV1 {
personId!: string;
}
@ExtraModel()
export class SyncUserMetadataV1 {
userId!: string;
key!: string;
value!: UserMetadata[UserMetadataKey];
}
@ExtraModel()
export class SyncUserMetadataDeleteV1 {
userId!: string;
key!: string;
}
@ExtraModel()
export class SyncAckV1 {}
@ -295,6 +310,8 @@ export type SyncItem = {
[SyncEntityType.PartnerStackV1]: SyncStackV1;
[SyncEntityType.PersonV1]: SyncPersonV1;
[SyncEntityType.PersonDeleteV1]: SyncPersonDeleteV1;
[SyncEntityType.UserMetadataV1]: SyncUserMetadataV1;
[SyncEntityType.UserMetadataDeleteV1]: SyncUserMetadataDeleteV1;
[SyncEntityType.SyncAckV1]: SyncAckV1;
[SyncEntityType.SyncResetV1]: SyncResetV1;
};

View file

@ -589,6 +589,7 @@ export enum SyncRequestType {
StacksV1 = 'StacksV1',
UsersV1 = 'UsersV1',
PeopleV1 = 'PeopleV1',
UserMetadataV1 = 'UserMetadataV1',
}
export enum SyncEntityType {
@ -639,6 +640,9 @@ export enum SyncEntityType {
PersonV1 = 'PersonV1',
PersonDeleteV1 = 'PersonDeleteV1',
UserMetadataV1 = 'UserMetadataV1',
UserMetadataDeleteV1 = 'UserMetadataDeleteV1',
SyncAckV1 = 'SyncAckV1',
SyncResetV1 = 'SyncResetV1',
}

View file

@ -835,3 +835,30 @@ where
"updatedAt" < now() - interval '1 millisecond'
order by
"updateId" asc
-- SyncRepository.userMetadata.getDeletes
select
"id",
"userId",
"key"
from
"user_metadata_audit"
where
"userId" = $1
and "deletedAt" < now() - interval '1 millisecond'
order by
"id" asc
-- SyncRepository.userMetadata.getUpserts
select
"userId",
"key",
"value",
"updateId"
from
"user_metadata"
where
"userId" = $1
and "updatedAt" < now() - interval '1 millisecond'
order by
"updateId" asc

View file

@ -16,7 +16,8 @@ type AuditTables =
| 'memories_audit'
| 'memory_assets_audit'
| 'stacks_audit'
| 'person_audit';
| 'person_audit'
| 'user_metadata_audit';
type UpsertTables =
| 'users'
| 'partners'
@ -27,7 +28,8 @@ type UpsertTables =
| 'memories'
| 'memories_assets_assets'
| 'asset_stack'
| 'person';
| 'person'
| 'user_metadata';
@Injectable()
export class SyncRepository {
@ -47,6 +49,7 @@ export class SyncRepository {
people: PersonSync;
stack: StackSync;
user: UserSync;
userMetadata: UserMetadataSync;
constructor(@InjectKysely() private db: Kysely<DB>) {
this.album = new AlbumSync(this.db);
@ -65,32 +68,31 @@ export class SyncRepository {
this.people = new PersonSync(this.db);
this.stack = new StackSync(this.db);
this.user = new UserSync(this.db);
this.userMetadata = new UserMetadataSync(this.db);
}
}
class BaseSync {
constructor(protected db: Kysely<DB>) {}
protected auditTableFilters<T extends keyof Pick<DB, AuditTables>, D>(
qb: SelectQueryBuilder<DB, T, D>,
ack?: SyncAck,
) {
const builder = qb as SelectQueryBuilder<DB, AuditTables, D>;
return builder
.where('deletedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.$if(!!ack, (qb) => qb.where('id', '>', ack!.updateId))
.orderBy('id', 'asc') as SelectQueryBuilder<DB, T, D>;
protected auditTableFilters(ack?: SyncAck) {
return <T extends keyof Pick<DB, AuditTables>, D>(qb: SelectQueryBuilder<DB, T, D>) => {
const builder = qb as SelectQueryBuilder<DB, AuditTables, D>;
return builder
.where('deletedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.$if(!!ack, (qb) => qb.where('id', '>', ack!.updateId))
.orderBy('id', 'asc') as SelectQueryBuilder<DB, T, D>;
};
}
protected upsertTableFilters<T extends keyof Pick<DB, UpsertTables>, D>(
qb: SelectQueryBuilder<DB, T, D>,
ack?: SyncAck,
) {
const builder = qb as SelectQueryBuilder<DB, UpsertTables, D>;
return builder
.where('updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.$if(!!ack, (qb) => qb.where('updateId', '>', ack!.updateId))
.orderBy('updateId', 'asc') as SelectQueryBuilder<DB, T, D>;
protected upsertTableFilters(ack?: SyncAck) {
return <T extends keyof Pick<DB, UpsertTables>, D>(qb: SelectQueryBuilder<DB, T, D>) => {
const builder = qb as SelectQueryBuilder<DB, UpsertTables, D>;
return builder
.where('updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.$if(!!ack, (qb) => qb.where('updateId', '>', ack!.updateId))
.orderBy('updateId', 'asc') as SelectQueryBuilder<DB, T, D>;
};
}
}
@ -113,7 +115,7 @@ class AlbumSync extends BaseSync {
.selectFrom('albums_audit')
.select(['id', 'albumId'])
.where('userId', '=', userId)
.$call((qb) => this.auditTableFilters(qb, ack))
.$call(this.auditTableFilters(ack))
.stream();
}
@ -246,7 +248,7 @@ class AlbumToAssetSync extends BaseSync {
),
),
)
.$call((qb) => this.auditTableFilters(qb, ack))
.$call(this.auditTableFilters(ack))
.stream();
}
@ -303,7 +305,7 @@ class AlbumUserSync extends BaseSync {
),
),
)
.$call((qb) => this.auditTableFilters(qb, ack))
.$call(this.auditTableFilters(ack))
.stream();
}
@ -345,7 +347,7 @@ class AssetSync extends BaseSync {
.selectFrom('assets_audit')
.select(['id', 'assetId'])
.where('ownerId', '=', userId)
.$call((qb) => this.auditTableFilters(qb, ack))
.$call(this.auditTableFilters(ack))
.stream();
}
@ -356,7 +358,7 @@ class AssetSync extends BaseSync {
.select(columns.syncAsset)
.select('assets.updateId')
.where('ownerId', '=', userId)
.$call((qb) => this.upsertTableFilters(qb, ack))
.$call(this.upsertTableFilters(ack))
.stream();
}
}
@ -368,7 +370,7 @@ class PersonSync extends BaseSync {
.selectFrom('person_audit')
.select(['id', 'personId'])
.where('ownerId', '=', userId)
.$call((qb) => this.auditTableFilters(qb, ack))
.$call(this.auditTableFilters(ack))
.stream();
}
@ -391,7 +393,7 @@ class PersonSync extends BaseSync {
'faceAssetId',
])
.where('ownerId', '=', userId)
.$call((qb) => this.upsertTableFilters(qb, ack))
.$call(this.upsertTableFilters(ack))
.stream();
}
}
@ -404,7 +406,7 @@ class AssetExifSync extends BaseSync {
.select(columns.syncAssetExif)
.select('exif.updateId')
.where('assetId', 'in', (eb) => eb.selectFrom('assets').select('id').where('ownerId', '=', userId))
.$call((qb) => this.upsertTableFilters(qb, ack))
.$call(this.upsertTableFilters(ack))
.stream();
}
}
@ -416,7 +418,7 @@ class MemorySync extends BaseSync {
.selectFrom('memories_audit')
.select(['id', 'memoryId'])
.where('userId', '=', userId)
.$call((qb) => this.auditTableFilters(qb, ack))
.$call(this.auditTableFilters(ack))
.stream();
}
@ -440,7 +442,7 @@ class MemorySync extends BaseSync {
])
.select('updateId')
.where('ownerId', '=', userId)
.$call((qb) => this.upsertTableFilters(qb, ack))
.$call(this.upsertTableFilters(ack))
.stream();
}
}
@ -452,7 +454,7 @@ class MemoryToAssetSync extends BaseSync {
.selectFrom('memory_assets_audit')
.select(['id', 'memoryId', 'assetId'])
.where('memoryId', 'in', (eb) => eb.selectFrom('memories').select('id').where('ownerId', '=', userId))
.$call((qb) => this.auditTableFilters(qb, ack))
.$call(this.auditTableFilters(ack))
.stream();
}
@ -463,7 +465,7 @@ class MemoryToAssetSync extends BaseSync {
.select(['memoriesId as memoryId', 'assetsId as assetId'])
.select('updateId')
.where('memoriesId', 'in', (eb) => eb.selectFrom('memories').select('id').where('ownerId', '=', userId))
.$call((qb) => this.upsertTableFilters(qb, ack))
.$call(this.upsertTableFilters(ack))
.stream();
}
}
@ -487,7 +489,7 @@ class PartnerSync extends BaseSync {
.selectFrom('partners_audit')
.select(['id', 'sharedById', 'sharedWithId'])
.where((eb) => eb.or([eb('sharedById', '=', userId), eb('sharedWithId', '=', userId)]))
.$call((qb) => this.auditTableFilters(qb, ack))
.$call(this.auditTableFilters(ack))
.stream();
}
@ -497,7 +499,7 @@ class PartnerSync extends BaseSync {
.selectFrom('partners')
.select(['sharedById', 'sharedWithId', 'inTimeline', 'updateId'])
.where((eb) => eb.or([eb('sharedById', '=', userId), eb('sharedWithId', '=', userId)]))
.$call((qb) => this.upsertTableFilters(qb, ack))
.$call(this.upsertTableFilters(ack))
.stream();
}
}
@ -525,7 +527,7 @@ class PartnerAssetsSync extends BaseSync {
.where('ownerId', 'in', (eb) =>
eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId),
)
.$call((qb) => this.auditTableFilters(qb, ack))
.$call(this.auditTableFilters(ack))
.stream();
}
@ -538,7 +540,7 @@ class PartnerAssetsSync extends BaseSync {
.where('ownerId', 'in', (eb) =>
eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId),
)
.$call((qb) => this.upsertTableFilters(qb, ack))
.$call(this.upsertTableFilters(ack))
.stream();
}
}
@ -573,7 +575,7 @@ class PartnerAssetExifsSync extends BaseSync {
eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId),
),
)
.$call((qb) => this.upsertTableFilters(qb, ack))
.$call(this.upsertTableFilters(ack))
.stream();
}
}
@ -585,7 +587,7 @@ class StackSync extends BaseSync {
.selectFrom('stacks_audit')
.select(['id', 'stackId'])
.where('userId', '=', userId)
.$call((qb) => this.auditTableFilters(qb, ack))
.$call(this.auditTableFilters(ack))
.stream();
}
@ -596,7 +598,7 @@ class StackSync extends BaseSync {
.select(columns.syncStack)
.select('updateId')
.where('ownerId', '=', userId)
.$call((qb) => this.upsertTableFilters(qb, ack))
.$call(this.upsertTableFilters(ack))
.stream();
}
}
@ -610,7 +612,7 @@ class PartnerStackSync extends BaseSync {
.where('userId', 'in', (eb) =>
eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId),
)
.$call((qb) => this.auditTableFilters(qb, ack))
.$call(this.auditTableFilters(ack))
.stream();
}
@ -637,18 +639,15 @@ class PartnerStackSync extends BaseSync {
.where('ownerId', 'in', (eb) =>
eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId),
)
.$call((qb) => this.upsertTableFilters(qb, ack))
.$call(this.upsertTableFilters(ack))
.stream();
}
}
class UserSync extends BaseSync {
@GenerateSql({ params: [], stream: true })
getDeletes(ack?: SyncAck) {
return this.db
.selectFrom('users_audit')
.select(['id', 'userId'])
.$call((qb) => this.auditTableFilters(qb, ack))
.stream();
return this.db.selectFrom('users_audit').select(['id', 'userId']).$call(this.auditTableFilters(ack)).stream();
}
@GenerateSql({ params: [], stream: true })
@ -656,7 +655,29 @@ class UserSync extends BaseSync {
return this.db
.selectFrom('users')
.select(['id', 'name', 'email', 'deletedAt', 'updateId'])
.$call((qb) => this.upsertTableFilters(qb, ack))
.$call(this.upsertTableFilters(ack))
.stream();
}
}
class UserMetadataSync extends BaseSync {
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getDeletes(userId: string, ack?: SyncAck) {
return this.db
.selectFrom('user_metadata_audit')
.select(['id', 'userId', 'key'])
.where('userId', '=', userId)
.$call(this.auditTableFilters(ack))
.stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getUpserts(userId: string, ack?: SyncAck) {
return this.db
.selectFrom('user_metadata')
.select(['userId', 'key', 'value', 'updateId'])
.where('userId', '=', userId)
.$call(this.upsertTableFilters(ack))
.stream();
}
}

View file

@ -216,3 +216,16 @@ export const person_delete_audit = registerFunction({
RETURN NULL;
END`,
});
export const user_metadata_audit = registerFunction({
name: 'user_metadata_audit',
returnType: 'TRIGGER',
language: 'PLPGSQL',
body: `
BEGIN
INSERT INTO user_metadata_audit ("userId", "key")
SELECT "userId", "key"
FROM OLD;
RETURN NULL;
END`,
});

View file

@ -57,6 +57,7 @@ import { TagAssetTable } from 'src/schema/tables/tag-asset.table';
import { TagClosureTable } from 'src/schema/tables/tag-closure.table';
import { TagTable } from 'src/schema/tables/tag.table';
import { UserAuditTable } from 'src/schema/tables/user-audit.table';
import { UserMetadataAuditTable } from 'src/schema/tables/user-metadata-audit.table';
import { UserMetadataTable } from 'src/schema/tables/user-metadata.table';
import { UserTable } from 'src/schema/tables/user.table';
import { VersionHistoryTable } from 'src/schema/tables/version-history.table';
@ -108,6 +109,7 @@ export class ImmichDatabase {
TagClosureTable,
UserAuditTable,
UserMetadataTable,
UserMetadataAuditTable,
UserTable,
VersionHistoryTable,
];
@ -128,6 +130,7 @@ export class ImmichDatabase {
memory_assets_delete_audit,
stacks_delete_audit,
person_delete_audit,
users_delete_audit,
];
enum = [assets_status_enum, asset_face_source_type, asset_visibility_enum];
@ -182,6 +185,7 @@ export interface DB {
tags: TagTable;
tags_closure: TagClosureTable;
user_metadata: UserMetadataTable;
user_metadata_audit: UserMetadataAuditTable;
users: UserTable;
users_audit: UserAuditTable;
version_history: VersionHistoryTable;

View file

@ -0,0 +1,56 @@
import { Kysely, sql } from 'kysely';
export async function up(db: Kysely<any>): Promise<void> {
await sql`CREATE OR REPLACE FUNCTION user_metadata_audit()
RETURNS TRIGGER
LANGUAGE PLPGSQL
AS $$
BEGIN
INSERT INTO user_metadata_audit ("userId", "key")
SELECT "userId", "key"
FROM OLD;
RETURN NULL;
END
$$;`.execute(db);
await sql`CREATE TABLE "user_metadata_audit" (
"id" uuid NOT NULL DEFAULT immich_uuid_v7(),
"userId" uuid NOT NULL,
"key" character varying NOT NULL,
"deletedAt" timestamp with time zone NOT NULL DEFAULT clock_timestamp(),
CONSTRAINT "PK_15d5cc4d65ac966233b9921acac" PRIMARY KEY ("id")
);`.execute(db);
await sql`CREATE INDEX "IDX_user_metadata_audit_user_id" ON "user_metadata_audit" ("userId");`.execute(db);
await sql`CREATE INDEX "IDX_user_metadata_audit_key" ON "user_metadata_audit" ("key");`.execute(db);
await sql`CREATE INDEX "IDX_user_metadata_audit_deleted_at" ON "user_metadata_audit" ("deletedAt");`.execute(db);
await sql`ALTER TABLE "user_metadata" ADD "updateId" uuid NOT NULL DEFAULT immich_uuid_v7();`.execute(db);
await sql`ALTER TABLE "user_metadata" ADD "updatedAt" timestamp with time zone NOT NULL DEFAULT now();`.execute(db);
await sql`CREATE INDEX "IDX_user_metadata_update_id" ON "user_metadata" ("updateId");`.execute(db);
await sql`CREATE INDEX "IDX_user_metadata_updated_at" ON "user_metadata" ("updatedAt");`.execute(db);
await sql`CREATE OR REPLACE TRIGGER "user_metadata_audit"
AFTER DELETE ON "user_metadata"
REFERENCING OLD TABLE AS "old"
FOR EACH STATEMENT
WHEN (pg_trigger_depth() = 0)
EXECUTE FUNCTION user_metadata_audit();`.execute(db);
await sql`CREATE OR REPLACE TRIGGER "user_metadata_updated_at"
BEFORE UPDATE ON "user_metadata"
FOR EACH ROW
EXECUTE FUNCTION updated_at();`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('function_user_metadata_audit', '{"type":"function","name":"user_metadata_audit","sql":"CREATE OR REPLACE FUNCTION user_metadata_audit()\\n RETURNS TRIGGER\\n LANGUAGE PLPGSQL\\n AS $$\\n BEGIN\\n INSERT INTO user_metadata_audit (\\"userId\\", \\"key\\")\\n SELECT \\"userId\\", \\"key\\"\\n FROM OLD;\\n RETURN NULL;\\n END\\n $$;"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('trigger_user_metadata_audit', '{"type":"trigger","name":"user_metadata_audit","sql":"CREATE OR REPLACE TRIGGER \\"user_metadata_audit\\"\\n AFTER DELETE ON \\"user_metadata\\"\\n REFERENCING OLD TABLE AS \\"old\\"\\n FOR EACH STATEMENT\\n WHEN (pg_trigger_depth() = 0)\\n EXECUTE FUNCTION user_metadata_audit();"}'::jsonb);`.execute(db);
await sql`INSERT INTO "migration_overrides" ("name", "value") VALUES ('trigger_user_metadata_updated_at', '{"type":"trigger","name":"user_metadata_updated_at","sql":"CREATE OR REPLACE TRIGGER \\"user_metadata_updated_at\\"\\n BEFORE UPDATE ON \\"user_metadata\\"\\n FOR EACH ROW\\n EXECUTE FUNCTION updated_at();"}'::jsonb);`.execute(db);
}
export async function down(db: Kysely<any>): Promise<void> {
await sql`DROP TRIGGER "user_metadata_audit" ON "user_metadata";`.execute(db);
await sql`DROP TRIGGER "user_metadata_updated_at" ON "user_metadata";`.execute(db);
await sql`DROP INDEX "IDX_user_metadata_update_id";`.execute(db);
await sql`DROP INDEX "IDX_user_metadata_updated_at";`.execute(db);
await sql`ALTER TABLE "user_metadata" DROP COLUMN "updateId";`.execute(db);
await sql`ALTER TABLE "user_metadata" DROP COLUMN "updatedAt";`.execute(db);
await sql`DROP TABLE "user_metadata_audit";`.execute(db);
await sql`DROP FUNCTION user_metadata_audit;`.execute(db);
await sql`DELETE FROM "migration_overrides" WHERE "name" = 'function_user_metadata_audit';`.execute(db);
await sql`DELETE FROM "migration_overrides" WHERE "name" = 'trigger_user_metadata_audit';`.execute(db);
await sql`DELETE FROM "migration_overrides" WHERE "name" = 'trigger_user_metadata_updated_at';`.execute(db);
}

View file

@ -0,0 +1,17 @@
import { PrimaryGeneratedUuidV7Column } from 'src/decorators';
import { Column, CreateDateColumn, Generated, Table, Timestamp } from 'src/sql-tools';
@Table('user_metadata_audit')
export class UserMetadataAuditTable {
@PrimaryGeneratedUuidV7Column()
id!: Generated<string>;
@Column({ type: 'uuid', indexName: 'IDX_user_metadata_audit_user_id' })
userId!: string;
@Column({ indexName: 'IDX_user_metadata_audit_key' })
key!: string;
@CreateDateColumn({ default: () => 'clock_timestamp()', indexName: 'IDX_user_metadata_audit_deleted_at' })
deletedAt!: Generated<Timestamp>;
}

View file

@ -1,9 +1,27 @@
import { UpdatedAtTrigger, UpdateIdColumn } from 'src/decorators';
import { UserMetadataKey } from 'src/enum';
import { user_metadata_audit } from 'src/schema/functions';
import { UserTable } from 'src/schema/tables/user.table';
import { Column, ForeignKeyColumn, PrimaryColumn, Table } from 'src/sql-tools';
import {
AfterDeleteTrigger,
Column,
ForeignKeyColumn,
Generated,
PrimaryColumn,
Table,
Timestamp,
UpdateDateColumn,
} from 'src/sql-tools';
import { UserMetadata, UserMetadataItem } from 'src/types';
@UpdatedAtTrigger('user_metadata_updated_at')
@Table('user_metadata')
@AfterDeleteTrigger({
scope: 'statement',
function: user_metadata_audit,
referencingOldTableAs: 'old',
when: 'pg_trigger_depth() = 0',
})
export class UserMetadataTable<T extends keyof UserMetadata = UserMetadataKey> implements UserMetadataItem<T> {
@ForeignKeyColumn(() => UserTable, {
onUpdate: 'CASCADE',
@ -19,4 +37,10 @@ export class UserMetadataTable<T extends keyof UserMetadata = UserMetadataKey> i
@Column({ type: 'jsonb' })
value!: UserMetadata[T];
@UpdateIdColumn({ indexName: 'IDX_user_metadata_update_id' })
updateId!: Generated<string>;
@UpdateDateColumn({ indexName: 'IDX_user_metadata_updated_at' })
updatedAt!: Generated<Timestamp>;
}

View file

@ -70,6 +70,7 @@ export const SYNC_TYPES_ORDER = [
SyncRequestType.MemoriesV1,
SyncRequestType.MemoryToAssetsV1,
SyncRequestType.PeopleV1,
SyncRequestType.UserMetadataV1,
];
const throwSessionRequired = () => {
@ -155,6 +156,7 @@ export class SyncService extends BaseService {
[SyncRequestType.StacksV1]: () => this.syncStackV1(response, checkpointMap, auth),
[SyncRequestType.PartnerStacksV1]: () => this.syncPartnerStackV1(response, checkpointMap, auth, session.id),
[SyncRequestType.PeopleV1]: () => this.syncPeopleV1(response, checkpointMap, auth),
[SyncRequestType.UserMetadataV1]: () => this.syncUserMetadataV1(response, checkpointMap, auth),
};
for (const type of SYNC_TYPES_ORDER.filter((type) => dto.types.includes(type))) {
@ -604,6 +606,22 @@ export class SyncService extends BaseService {
}
}
private async syncUserMetadataV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto) {
const deleteType = SyncEntityType.UserMetadataDeleteV1;
const deletes = this.syncRepository.userMetadata.getDeletes(auth.user.id, checkpointMap[deleteType]);
for await (const { id, ...data } of deletes) {
send(response, { type: deleteType, ids: [id], data });
}
const upsertType = SyncEntityType.UserMetadataV1;
const upserts = this.syncRepository.userMetadata.getUpserts(auth.user.id, checkpointMap[upsertType]);
for await (const { updateId, ...data } of upserts) {
send(response, { type: upsertType, ids: [updateId], data });
}
}
private async upsertBackfillCheckpoint(item: { type: SyncEntityType; sessionId: string; createId: string }) {
const { type, sessionId, createId } = item;
await this.syncCheckpointRepository.upsertAll([

View file

@ -0,0 +1,123 @@
import { Kysely } from 'kysely';
import { SyncEntityType, SyncRequestType, UserMetadataKey } from 'src/enum';
import { UserRepository } from 'src/repositories/user.repository';
import { DB } from 'src/schema';
import { SyncTestContext } from 'test/medium.factory';
import { getKyselyDB } from 'test/utils';
let defaultDatabase: Kysely<DB>;
const setup = async (db?: Kysely<DB>) => {
const ctx = new SyncTestContext(db || defaultDatabase);
const { auth, user, session } = await ctx.newSyncAuthUser();
return { auth, user, session, ctx };
};
beforeAll(async () => {
defaultDatabase = await getKyselyDB();
});
describe(SyncEntityType.UserMetadataV1, () => {
it('should detect and sync new user metadata', async () => {
const { auth, user, ctx } = await setup();
const userRepo = ctx.get(UserRepository);
await userRepo.upsertMetadata(user.id, { key: UserMetadataKey.ONBOARDING, value: { isOnboarded: true } });
const response = await ctx.syncStream(auth, [SyncRequestType.UserMetadataV1]);
expect(response).toHaveLength(1);
expect(response).toEqual([
{
ack: expect.any(String),
data: {
key: UserMetadataKey.ONBOARDING,
userId: user.id,
value: { isOnboarded: true },
},
type: 'UserMetadataV1',
},
]);
await ctx.syncAckAll(auth, response);
await expect(ctx.syncStream(auth, [SyncRequestType.UserMetadataV1])).resolves.toEqual([]);
});
it('should update user metadata', async () => {
const { auth, user, ctx } = await setup();
const userRepo = ctx.get(UserRepository);
await userRepo.upsertMetadata(user.id, { key: UserMetadataKey.ONBOARDING, value: { isOnboarded: true } });
const response = await ctx.syncStream(auth, [SyncRequestType.UserMetadataV1]);
expect(response).toHaveLength(1);
expect(response).toEqual([
{
ack: expect.any(String),
data: {
key: UserMetadataKey.ONBOARDING,
userId: user.id,
value: { isOnboarded: true },
},
type: 'UserMetadataV1',
},
]);
await ctx.syncAckAll(auth, response);
await userRepo.upsertMetadata(user.id, { key: UserMetadataKey.ONBOARDING, value: { isOnboarded: false } });
const updatedResponse = await ctx.syncStream(auth, [SyncRequestType.UserMetadataV1]);
expect(updatedResponse).toEqual([
{
ack: expect.any(String),
data: {
key: UserMetadataKey.ONBOARDING,
userId: user.id,
value: { isOnboarded: false },
},
type: 'UserMetadataV1',
},
]);
await ctx.syncAckAll(auth, updatedResponse);
await expect(ctx.syncStream(auth, [SyncRequestType.UserMetadataV1])).resolves.toEqual([]);
});
});
describe(SyncEntityType.UserMetadataDeleteV1, () => {
it('should delete and sync user metadata', async () => {
const { auth, user, ctx } = await setup();
const userRepo = ctx.get(UserRepository);
await userRepo.upsertMetadata(user.id, { key: UserMetadataKey.ONBOARDING, value: { isOnboarded: true } });
const response = await ctx.syncStream(auth, [SyncRequestType.UserMetadataV1]);
expect(response).toHaveLength(1);
expect(response).toEqual([
{
ack: expect.any(String),
data: {
key: UserMetadataKey.ONBOARDING,
userId: user.id,
value: { isOnboarded: true },
},
type: 'UserMetadataV1',
},
]);
await ctx.syncAckAll(auth, response);
await userRepo.deleteMetadata(auth.user.id, UserMetadataKey.ONBOARDING);
await expect(ctx.syncStream(auth, [SyncRequestType.UserMetadataV1])).resolves.toEqual([
{
ack: expect.any(String),
data: {
userId: user.id,
key: UserMetadataKey.ONBOARDING,
},
type: 'UserMetadataDeleteV1',
},
]);
});
});