diff --git a/mobile/openapi/README.md b/mobile/openapi/README.md index 66c264cd7..3a3a3bc6c 100644 Binary files a/mobile/openapi/README.md and b/mobile/openapi/README.md differ diff --git a/mobile/openapi/lib/api.dart b/mobile/openapi/lib/api.dart index 893587e7f..04dc43f88 100644 Binary files a/mobile/openapi/lib/api.dart and b/mobile/openapi/lib/api.dart differ diff --git a/mobile/openapi/lib/api_client.dart b/mobile/openapi/lib/api_client.dart index 7c2dc5345..4d837ccb9 100644 Binary files a/mobile/openapi/lib/api_client.dart and b/mobile/openapi/lib/api_client.dart differ diff --git a/mobile/openapi/lib/model/sync_entity_type.dart b/mobile/openapi/lib/model/sync_entity_type.dart index ed82205a3..5d130f7f9 100644 Binary files a/mobile/openapi/lib/model/sync_entity_type.dart and b/mobile/openapi/lib/model/sync_entity_type.dart differ diff --git a/mobile/openapi/lib/model/sync_partner_delete_v1.dart b/mobile/openapi/lib/model/sync_partner_delete_v1.dart new file mode 100644 index 000000000..f5e10d657 Binary files /dev/null and b/mobile/openapi/lib/model/sync_partner_delete_v1.dart differ diff --git a/mobile/openapi/lib/model/sync_partner_v1.dart b/mobile/openapi/lib/model/sync_partner_v1.dart new file mode 100644 index 000000000..e551c4c83 Binary files /dev/null and b/mobile/openapi/lib/model/sync_partner_v1.dart differ diff --git a/mobile/openapi/lib/model/sync_request_type.dart b/mobile/openapi/lib/model/sync_request_type.dart index d7f1bde54..c35b17dea 100644 Binary files a/mobile/openapi/lib/model/sync_request_type.dart and b/mobile/openapi/lib/model/sync_request_type.dart differ diff --git a/open-api/immich-openapi-specs.json b/open-api/immich-openapi-specs.json index 2728fb9c9..7212d3b7f 100644 --- a/open-api/immich-openapi-specs.json +++ b/open-api/immich-openapi-specs.json @@ -12052,13 +12052,50 @@ "SyncEntityType": { "enum": [ "UserV1", - "UserDeleteV1" + "UserDeleteV1", + "PartnerV1", + "PartnerDeleteV1" ], "type": "string" }, + "SyncPartnerDeleteV1": { + "properties": { + "sharedById": { + "type": "string" + }, + "sharedWithId": { + "type": "string" + } + }, + "required": [ + "sharedById", + "sharedWithId" + ], + "type": "object" + }, + "SyncPartnerV1": { + "properties": { + "inTimeline": { + "type": "boolean" + }, + "sharedById": { + "type": "string" + }, + "sharedWithId": { + "type": "string" + } + }, + "required": [ + "inTimeline", + "sharedById", + "sharedWithId" + ], + "type": "object" + }, "SyncRequestType": { "enum": [ - "UsersV1" + "UsersV1", + "PartnersV1" ], "type": "string" }, diff --git a/open-api/typescript-sdk/src/fetch-client.ts b/open-api/typescript-sdk/src/fetch-client.ts index 7e6164099..8aecbe981 100644 --- a/open-api/typescript-sdk/src/fetch-client.ts +++ b/open-api/typescript-sdk/src/fetch-client.ts @@ -3645,10 +3645,13 @@ export enum Error2 { } export enum SyncEntityType { UserV1 = "UserV1", - UserDeleteV1 = "UserDeleteV1" + UserDeleteV1 = "UserDeleteV1", + PartnerV1 = "PartnerV1", + PartnerDeleteV1 = "PartnerDeleteV1" } export enum SyncRequestType { - UsersV1 = "UsersV1" + UsersV1 = "UsersV1", + PartnersV1 = "PartnersV1" } export enum TranscodeHWAccel { Nvenc = "nvenc", diff --git a/server/src/db.d.ts b/server/src/db.d.ts index 7fb073d8c..4c75562ba 100644 --- a/server/src/db.d.ts +++ b/server/src/db.d.ts @@ -272,6 +272,13 @@ export interface NaturalearthCountries { type: string; } +export interface PartnersAudit { + deletedAt: Generated; + id: Generated; + sharedById: string; + sharedWithId: string; +} + export interface Partners { createdAt: Generated; inTimeline: Generated; @@ -316,7 +323,6 @@ export interface SessionSyncCheckpoints { updateId: Generated; } - export interface SharedLinkAsset { assetsId: string; sharedLinksId: string; @@ -462,6 +468,7 @@ export interface DB { migrations: Migrations; move_history: MoveHistory; naturalearth_countries: NaturalearthCountries; + partners_audit: PartnersAudit; partners: Partners; person: Person; sessions: Sessions; diff --git a/server/src/dtos/sync.dto.ts b/server/src/dtos/sync.dto.ts index 0628a566c..d191c82bb 100644 --- a/server/src/dtos/sync.dto.ts +++ b/server/src/dtos/sync.dto.ts @@ -45,15 +45,30 @@ export class SyncUserDeleteV1 { userId!: string; } +export class SyncPartnerV1 { + sharedById!: string; + sharedWithId!: string; + inTimeline!: boolean; +} + +export class SyncPartnerDeleteV1 { + sharedById!: string; + sharedWithId!: string; +} + export type SyncItem = { [SyncEntityType.UserV1]: SyncUserV1; [SyncEntityType.UserDeleteV1]: SyncUserDeleteV1; + [SyncEntityType.PartnerV1]: SyncPartnerV1; + [SyncEntityType.PartnerDeleteV1]: SyncPartnerDeleteV1; }; const responseDtos = [ // SyncUserV1, SyncUserDeleteV1, + SyncPartnerV1, + SyncPartnerDeleteV1, ]; export const extraSyncModels = responseDtos; diff --git a/server/src/entities/partner-audit.entity.ts b/server/src/entities/partner-audit.entity.ts new file mode 100644 index 000000000..a731e017d --- /dev/null +++ b/server/src/entities/partner-audit.entity.ts @@ -0,0 +1,19 @@ +import { Column, CreateDateColumn, Entity, Index, PrimaryColumn } from 'typeorm'; + +@Entity('partners_audit') +export class PartnerAuditEntity { + @PrimaryColumn({ type: 'uuid', nullable: false, default: () => 'immich_uuid_v7()' }) + id!: string; + + @Index('IDX_partners_audit_shared_by_id') + @Column({ type: 'uuid' }) + sharedById!: string; + + @Index('IDX_partners_audit_shared_with_id') + @Column({ type: 'uuid' }) + sharedWithId!: string; + + @Index('IDX_partners_audit_deleted_at') + @CreateDateColumn({ type: 'timestamptz', default: () => 'clock_timestamp()' }) + deletedAt!: Date; +} diff --git a/server/src/enum.ts b/server/src/enum.ts index 95168b175..483bae2fc 100644 --- a/server/src/enum.ts +++ b/server/src/enum.ts @@ -548,9 +548,12 @@ export enum DatabaseLock { export enum SyncRequestType { UsersV1 = 'UsersV1', + PartnersV1 = 'PartnersV1', } export enum SyncEntityType { UserV1 = 'UserV1', UserDeleteV1 = 'UserDeleteV1', + PartnerV1 = 'PartnerV1', + PartnerDeleteV1 = 'PartnerDeleteV1', } diff --git a/server/src/migrations/1740739778549-CreatePartnersAuditTable.ts b/server/src/migrations/1740739778549-CreatePartnersAuditTable.ts new file mode 100644 index 000000000..d9c9dc194 --- /dev/null +++ b/server/src/migrations/1740739778549-CreatePartnersAuditTable.ts @@ -0,0 +1,38 @@ +import { MigrationInterface, QueryRunner } from "typeorm"; + +export class CreatePartnersAuditTable1740739778549 implements MigrationInterface { + name = 'CreatePartnersAuditTable1740739778549' + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(`CREATE TABLE "partners_audit" ("id" uuid NOT NULL DEFAULT immich_uuid_v7(), "sharedById" uuid NOT NULL, "sharedWithId" uuid NOT NULL, "deletedAt" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT clock_timestamp(), CONSTRAINT "PK_952b50217ff78198a7e380f0359" PRIMARY KEY ("id"))`); + await queryRunner.query(`CREATE INDEX "IDX_partners_audit_shared_by_id" ON "partners_audit" ("sharedById") `); + await queryRunner.query(`CREATE INDEX "IDX_partners_audit_shared_with_id" ON "partners_audit" ("sharedWithId") `); + await queryRunner.query(`CREATE INDEX "IDX_partners_audit_deleted_at" ON "partners_audit" ("deletedAt") `); + await queryRunner.query(`CREATE OR REPLACE FUNCTION partners_delete_audit() RETURNS TRIGGER AS + $$ + BEGIN + INSERT INTO partners_audit ("sharedById", "sharedWithId") + SELECT "sharedById", "sharedWithId" + FROM OLD; + RETURN NULL; + END; + $$ LANGUAGE plpgsql` + ); + await queryRunner.query(`CREATE OR REPLACE TRIGGER partners_delete_audit + AFTER DELETE ON partners + REFERENCING OLD TABLE AS OLD + FOR EACH STATEMENT + EXECUTE FUNCTION partners_delete_audit(); + `); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP INDEX "public"."IDX_partners_audit_deleted_at"`); + await queryRunner.query(`DROP INDEX "public"."IDX_partners_audit_shared_with_id"`); + await queryRunner.query(`DROP INDEX "public"."IDX_partners_audit_shared_by_id"`); + await queryRunner.query(`DROP TRIGGER partners_delete_audit`); + await queryRunner.query(`DROP FUNCTION partners_delete_audit`); + await queryRunner.query(`DROP TABLE "partners_audit"`); + } + +} diff --git a/server/src/repositories/sync.repository.ts b/server/src/repositories/sync.repository.ts index bde4b9f10..f2c5a1fc1 100644 --- a/server/src/repositories/sync.repository.ts +++ b/server/src/repositories/sync.repository.ts @@ -56,4 +56,26 @@ export class SyncRepository { .orderBy(['id asc']) .stream(); } + + getPartnerUpserts(userId: string, ack?: SyncAck) { + return this.db + .selectFrom('partners') + .select(['sharedById', 'sharedWithId', 'inTimeline', 'updateId']) + .$if(!!ack, (qb) => qb.where('updateId', '>', ack!.updateId)) + .where((eb) => eb.or([eb('sharedById', '=', userId), eb('sharedWithId', '=', userId)])) + .where('updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .orderBy(['updateId asc']) + .stream(); + } + + getPartnerDeletes(userId: string, ack?: SyncAck) { + return this.db + .selectFrom('partners_audit') + .select(['id', 'sharedById', 'sharedWithId']) + .$if(!!ack, (qb) => qb.where('id', '>', ack!.updateId)) + .where((eb) => eb.or([eb('sharedById', '=', userId), eb('sharedWithId', '=', userId)])) + .where('deletedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .orderBy(['id asc']) + .stream(); + } } diff --git a/server/src/services/sync.service.ts b/server/src/services/sync.service.ts index b756c11ef..45b1b7ff8 100644 --- a/server/src/services/sync.service.ts +++ b/server/src/services/sync.service.ts @@ -25,6 +25,7 @@ const FULL_SYNC = { needsFullSync: true, deleted: [], upserted: [] }; const SYNC_TYPES_ORDER = [ // SyncRequestType.UsersV1, + SyncRequestType.PartnersV1, ]; const throwSessionRequired = () => { @@ -81,8 +82,6 @@ export class SyncService extends BaseService { checkpoints.map(({ type, ack }) => [type, fromAck(ack)]), ); - // TODO pre-filter/sort list based on optimal sync order - for (const type of SYNC_TYPES_ORDER.filter((type) => dto.types.includes(type))) { switch (type) { case SyncRequestType.UsersV1: { @@ -99,6 +98,23 @@ export class SyncService extends BaseService { break; } + case SyncRequestType.PartnersV1: { + const deletes = this.syncRepository.getPartnerDeletes( + auth.user.id, + checkpointMap[SyncEntityType.PartnerDeleteV1], + ); + for await (const { id, ...data } of deletes) { + response.write(serialize({ type: SyncEntityType.PartnerDeleteV1, updateId: id, data })); + } + + const upserts = this.syncRepository.getPartnerUpserts(auth.user.id, checkpointMap[SyncEntityType.PartnerV1]); + for await (const { updateId, ...data } of upserts) { + response.write(serialize({ type: SyncEntityType.PartnerV1, updateId, data })); + } + + break; + } + default: { this.logger.warn(`Unsupported sync type: ${type}`); break; diff --git a/server/test/factory.ts b/server/test/factory.ts index 983b7cbb7..a682ad48f 100644 --- a/server/test/factory.ts +++ b/server/test/factory.ts @@ -1,11 +1,12 @@ import { Insertable, Kysely } from 'kysely'; import { randomBytes, randomUUID } from 'node:crypto'; import { Writable } from 'node:stream'; -import { Assets, DB, Sessions, Users } from 'src/db'; +import { Assets, DB, Partners, Sessions, Users } from 'src/db'; import { AuthDto } from 'src/dtos/auth.dto'; import { AssetType } from 'src/enum'; import { AlbumRepository } from 'src/repositories/album.repository'; import { AssetRepository } from 'src/repositories/asset.repository'; +import { PartnerRepository } from 'src/repositories/partner.repository'; import { SessionRepository } from 'src/repositories/session.repository'; import { SyncRepository } from 'src/repositories/sync.repository'; import { UserRepository } from 'src/repositories/user.repository'; @@ -30,6 +31,7 @@ class CustomWritable extends Writable { type Asset = Insertable; type User = Partial>; type Session = Omit, 'token'> & { token?: string }; +type Partner = Insertable; export const newUuid = () => randomUUID() as string; @@ -37,6 +39,7 @@ export class TestFactory { private assets: Asset[] = []; private sessions: Session[] = []; private users: User[] = []; + private partners: Partner[] = []; private constructor(private context: TestContext) {} @@ -100,6 +103,17 @@ export class TestFactory { }; } + static partner(partner: Partner) { + const defaults = { + inTimeline: true, + }; + + return { + ...defaults, + ...partner, + }; + } + withAsset(asset: Asset) { this.assets.push(asset); return this; @@ -115,6 +129,11 @@ export class TestFactory { return this; } + withPartner(partner: Partner) { + this.partners.push(partner); + return this; + } + async create() { for (const asset of this.assets) { await this.context.createAsset(asset); @@ -124,6 +143,10 @@ export class TestFactory { await this.context.createUser(user); } + for (const partner of this.partners) { + await this.context.createPartner(partner); + } + for (const session of this.sessions) { await this.context.createSession(session); } @@ -138,6 +161,7 @@ export class TestContext { albumRepository: AlbumRepository; sessionRepository: SessionRepository; syncRepository: SyncRepository; + partnerRepository: PartnerRepository; private constructor(private db: Kysely) { this.userRepository = new UserRepository(this.db); @@ -145,6 +169,7 @@ export class TestContext { this.albumRepository = new AlbumRepository(this.db); this.sessionRepository = new SessionRepository(this.db); this.syncRepository = new SyncRepository(this.db); + this.partnerRepository = new PartnerRepository(this.db); } static from(db: Kysely) { @@ -159,6 +184,10 @@ export class TestContext { return this.userRepository.create(TestFactory.user(user)); } + createPartner(partner: Partner) { + return this.partnerRepository.create(TestFactory.partner(partner)); + } + createAsset(asset: Asset) { return this.assetRepository.create(TestFactory.asset(asset)); } diff --git a/server/test/medium/specs/sync.service.spec.ts b/server/test/medium/specs/sync.service.spec.ts index bab979410..7cd849c6f 100644 --- a/server/test/medium/specs/sync.service.spec.ts +++ b/server/test/medium/specs/sync.service.spec.ts @@ -17,6 +17,8 @@ const setup = async () => { const testSync = async (auth: AuthDto, types: SyncRequestType[]) => { const stream = TestFactory.stream(); + // Wait for 1ms to ensure all updates are available + await new Promise((resolve) => setTimeout(resolve, 1)); await sut.stream(auth, stream, { types }); return stream.getResponse(); @@ -186,4 +188,178 @@ describe(SyncService.name, () => { ); }); }); + + describe.concurrent('partners', () => { + it('should detect and sync the first partner', async () => { + const { auth, context, sut, testSync } = await setup(); + + const user1 = auth.user; + const user2 = await context.createUser(); + + const partner = await context.createPartner({ sharedById: user2.id, sharedWithId: user1.id }); + + const initialSyncResponse = await testSync(auth, [SyncRequestType.PartnersV1]); + + expect(initialSyncResponse).toHaveLength(1); + expect(initialSyncResponse).toEqual( + expect.arrayContaining([ + { + ack: expect.any(String), + data: { + inTimeline: partner.inTimeline, + sharedById: partner.sharedById, + sharedWithId: partner.sharedWithId, + }, + type: 'PartnerV1', + }, + ]), + ); + + const acks = [initialSyncResponse[0].ack]; + await sut.setAcks(auth, { acks }); + + const ackSyncResponse = await testSync(auth, [SyncRequestType.PartnersV1]); + + expect(ackSyncResponse).toHaveLength(0); + }); + + it('should detect and sync a deleted partner', async () => { + const { auth, context, sut, testSync } = await setup(); + + const user1 = auth.user; + const user2 = await context.createUser(); + + const partner = await context.createPartner({ sharedById: user2.id, sharedWithId: user1.id }); + await context.partnerRepository.remove(partner); + + const response = await testSync(auth, [SyncRequestType.PartnersV1]); + + expect(response).toHaveLength(1); + expect(response).toEqual( + expect.arrayContaining([ + { + ack: expect.any(String), + data: { + sharedById: partner.sharedById, + sharedWithId: partner.sharedWithId, + }, + type: 'PartnerDeleteV1', + }, + ]), + ); + + const acks = response.map(({ ack }) => ack); + await sut.setAcks(auth, { acks }); + + const ackSyncResponse = await testSync(auth, [SyncRequestType.PartnersV1]); + + expect(ackSyncResponse).toHaveLength(0); + }); + + it('should detect and sync a partner share both to and from another user', async () => { + const { auth, context, sut, testSync } = await setup(); + + const user1 = auth.user; + const user2 = await context.createUser(); + + const partner1 = await context.createPartner({ sharedById: user2.id, sharedWithId: user1.id }); + const partner2 = await context.createPartner({ sharedById: user1.id, sharedWithId: user2.id }); + + const response = await testSync(auth, [SyncRequestType.PartnersV1]); + + expect(response).toHaveLength(2); + expect(response).toEqual( + expect.arrayContaining([ + { + ack: expect.any(String), + data: { + inTimeline: partner1.inTimeline, + sharedById: partner1.sharedById, + sharedWithId: partner1.sharedWithId, + }, + type: 'PartnerV1', + }, + { + ack: expect.any(String), + data: { + inTimeline: partner2.inTimeline, + sharedById: partner2.sharedById, + sharedWithId: partner2.sharedWithId, + }, + type: 'PartnerV1', + }, + ]), + ); + + await sut.setAcks(auth, { acks: [response[1].ack] }); + + const ackSyncResponse = await testSync(auth, [SyncRequestType.PartnersV1]); + + expect(ackSyncResponse).toHaveLength(0); + }); + + it('should sync a partner and then an update to that same partner', async () => { + const { auth, context, sut, testSync } = await setup(); + + const user1 = auth.user; + const user2 = await context.createUser(); + + const partner = await context.createPartner({ sharedById: user2.id, sharedWithId: user1.id }); + + const initialSyncResponse = await testSync(auth, [SyncRequestType.PartnersV1]); + + expect(initialSyncResponse).toHaveLength(1); + expect(initialSyncResponse).toEqual( + expect.arrayContaining([ + { + ack: expect.any(String), + data: { + inTimeline: partner.inTimeline, + sharedById: partner.sharedById, + sharedWithId: partner.sharedWithId, + }, + type: 'PartnerV1', + }, + ]), + ); + + const acks = [initialSyncResponse[0].ack]; + await sut.setAcks(auth, { acks }); + + const updated = await context.partnerRepository.update( + { sharedById: partner.sharedById, sharedWithId: partner.sharedWithId }, + { inTimeline: true }, + ); + + const updatedSyncResponse = await testSync(auth, [SyncRequestType.PartnersV1]); + + expect(updatedSyncResponse).toHaveLength(1); + expect(updatedSyncResponse).toEqual( + expect.arrayContaining([ + { + ack: expect.any(String), + data: { + inTimeline: updated.inTimeline, + sharedById: updated.sharedById, + sharedWithId: updated.sharedWithId, + }, + type: 'PartnerV1', + }, + ]), + ); + }); + + it('should not sync a partner for an unrelated user', async () => { + const { auth, context, testSync } = await setup(); + + const user2 = await context.createUser(); + const user3 = await context.createUser(); + + await context.createPartner({ sharedById: user2.id, sharedWithId: user3.id }); + + const response = await testSync(auth, [SyncRequestType.PartnersV1]); + + expect(response).toHaveLength(0); + }); + }); }); diff --git a/server/test/repositories/sync.repository.mock.ts b/server/test/repositories/sync.repository.mock.ts index fbb8ec2f6..6d94f6e03 100644 --- a/server/test/repositories/sync.repository.mock.ts +++ b/server/test/repositories/sync.repository.mock.ts @@ -9,5 +9,7 @@ export const newSyncRepositoryMock = (): Mocked