From ac36effb4574f76ea29e4b6e56b22b2d7ae5c792 Mon Sep 17 00:00:00 2001 From: Zack Pollard Date: Fri, 21 Feb 2025 04:37:57 +0000 Subject: [PATCH] feat: sync implementation for the user entity (#16234) * ci: print out typeorm generation changes * feat: sync implementation for the user entity wip --------- Co-authored-by: Jason Rasmussen --- .github/workflows/test.yml | 1 + mobile/openapi/README.md | Bin 32428 -> 33112 bytes mobile/openapi/lib/api.dart | Bin 11769 -> 12055 bytes mobile/openapi/lib/api/sync_api.dart | Bin 3887 -> 8738 bytes mobile/openapi/lib/api_client.dart | Bin 30184 -> 30844 bytes mobile/openapi/lib/api_helper.dart | Bin 6386 -> 6600 bytes .../lib/model/sync_ack_delete_dto.dart | Bin 0 -> 2847 bytes mobile/openapi/lib/model/sync_ack_dto.dart | Bin 0 -> 2890 bytes .../openapi/lib/model/sync_ack_set_dto.dart | Bin 0 -> 2870 bytes .../openapi/lib/model/sync_entity_type.dart | Bin 0 -> 2679 bytes .../openapi/lib/model/sync_request_type.dart | Bin 0 -> 2557 bytes mobile/openapi/lib/model/sync_stream_dto.dart | Bin 0 -> 2805 bytes .../lib/model/sync_user_delete_v1.dart | Bin 0 -> 2832 bytes mobile/openapi/lib/model/sync_user_v1.dart | Bin 0 -> 3475 bytes open-api/immich-openapi-specs.json | 243 ++++++++++++++++++ open-api/typescript-sdk/src/fetch-client.ts | 55 ++++ server/src/app.module.ts | 2 +- server/src/controllers/sync.controller.ts | 52 +++- server/src/database.ts | 3 + server/src/db.d.ts | 18 +- server/src/dtos/sync.dto.ts | 53 +++- server/src/entities/index.ts | 4 + server/src/entities/sync-checkpoint.entity.ts | 24 ++ server/src/entities/user-audit.entity.ts | 14 + server/src/entities/user.entity.ts | 2 + server/src/enum.ts | 9 + .../src/middleware/global-exception.filter.ts | 7 + ...001232576-AddSessionSyncCheckpointTable.ts | 22 ++ .../1740064899123-AddUsersAuditTable.ts | 34 +++ server/src/repositories/index.ts | 2 + server/src/repositories/sync.repository.ts | 80 ++++++ server/src/services/base.service.ts | 2 + server/src/services/sync.service.ts | 100 ++++++- server/src/types.ts | 7 + server/src/utils/misc.ts | 2 + server/src/utils/sync.ts | 30 +++ .../test/repositories/sync.repository.mock.ts | 13 + server/test/utils.ts | 4 + 38 files changed, 773 insertions(+), 10 deletions(-) create mode 100644 mobile/openapi/lib/model/sync_ack_delete_dto.dart create mode 100644 mobile/openapi/lib/model/sync_ack_dto.dart create mode 100644 mobile/openapi/lib/model/sync_ack_set_dto.dart create mode 100644 mobile/openapi/lib/model/sync_entity_type.dart create mode 100644 mobile/openapi/lib/model/sync_request_type.dart create mode 100644 mobile/openapi/lib/model/sync_stream_dto.dart create mode 100644 mobile/openapi/lib/model/sync_user_delete_v1.dart create mode 100644 mobile/openapi/lib/model/sync_user_v1.dart create mode 100644 server/src/entities/sync-checkpoint.entity.ts create mode 100644 server/src/entities/user-audit.entity.ts create mode 100644 server/src/migrations/1740001232576-AddSessionSyncCheckpointTable.ts create mode 100644 server/src/migrations/1740064899123-AddUsersAuditTable.ts create mode 100644 server/src/repositories/sync.repository.ts create mode 100644 server/src/utils/sync.ts create mode 100644 server/test/repositories/sync.repository.mock.ts diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index a8a8f0e5c..9d89063f2 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -504,6 +504,7 @@ jobs: run: | echo "ERROR: Generated migration files not up to date!" echo "Changed files: ${{ steps.verify-changed-files.outputs.changed_files }}" + cat ./src/migrations/*-TestMigration.ts exit 1 - name: Run SQL generation diff --git a/mobile/openapi/README.md b/mobile/openapi/README.md index d006ef38bb3ea1c08c8979bf02a51a1d87dba3c4..e86ac93350804ee4ec3bc4c54ab175aadd95aaca 100644 GIT binary patch delta 673 zcmZ4Um+?ju(}va7ZYilbsU@kwm3hgI$=O<3u^K7)$@=+u+ro?3!pio0tFvJpU&xwOFA3o^BU zoM|@bEXhY#6P#Ltp$sC1tg=4Y zHLoPIq%x$kAQj1SgdnowG}v5!Yt!LSF#@xA+U~ diff --git a/mobile/openapi/lib/api.dart b/mobile/openapi/lib/api.dart index 2a2b6d46a496d05f5923ea2c6f9fc9f61334336b..e5794a26940bca0b837a14819b6cc2a112022911 100644 GIT binary patch delta 134 zcmewvJw0xNvjS^ga(rU)^OP+jJT!9Cyq$stp zG_|;7GNXhf7g(UUq$o9U@ diff --git a/mobile/openapi/lib/api/sync_api.dart b/mobile/openapi/lib/api/sync_api.dart index f94eb88081a15660b7453a2d335b6d2a12e4db14..49a4963bffd39776a42e077515ab3adc9b27a229 100644 GIT binary patch delta 953 zcmZ24x5#C~4n_eNS0C39R|Wmz%DiO##N_PBXBj=^gMl2!lH1B&BOU9^RD24n5!d5rzMlWjO8dEH$@P(n&;@?#DW z5o}>Fc{-C1)R)$iZJ3u$p1@_nOiO_2Q3**QipxNrg?a$!S(aL^S}p*UG+);M delta 15 WcmZ4FvR-b(4#v$6Y^A(RwOjx(Tm>Nj diff --git a/mobile/openapi/lib/api_client.dart b/mobile/openapi/lib/api_client.dart index 49fbe9464b7dba8d7243fcd28aa3075465490e2f..54a8959f6a11860d23c4aeb940de85c02d600b8c 100644 GIT binary patch delta 237 zcmaFyn(@yE#tnypSn`q`lPBMom*Wnu1TnK+Qgc!#8_KG(L-~^*21_ynWhN)ssd7QZ zf>TRC!t7uT!KsrKqScu4k|#f~m*WJfaLp^pESWst#sq9;P->tX#`Am?EG5~{pS9$;d delta 18 acmezKf$_y_#tnypCL4rvZB7W4Q3e2Cvne}iu7!Kj;pZV;?U(=iE$E*2t0@rWfPeYi@;AS?553|YJ>rc!T+T&`0b>z7~I&??ujm}6NQUaiV4+9NyF~;WU^cp(#E+i)@bH3QN%BQ zjgw3mI~ZU(2YLm%l!Yi&gr8S~L6Qq=;dWOfSIL(N_T}WnX}BK%3=x2j(p9FgAfS+2 zSoH0dpt!t5M(EIg8c-Afmnj>$T`4V$fct^PGQ>yw3oHQOy|V5#`A)Gxx0|Eo7}%a4 zztP{MG1xwu-f5jv14cM)U^EJ5cb{c~ZMjUE3mA>T!<2|TptNE0h~;WOSsdLCHc?_Y6|z!nR0&f(9OXnWw*n;6y#+tH0Zar>V%1fod6kCFxD zEPNArg{oN@y{Tg@_!i1;g$C!O%LVqh456rUR0hdlt1PRrgMMk`3I~FeY8ZWO4Te>@HX^0T3An%zp7-f_j=a(|VCCM1q=>aQ zApnku;4o1lXf&Aq1A&282nDMl@8=3Q6B#gDm)y@`x}qE>?ixYmhKTtCuNO3!^jDX= z=gT?|f!tDCL)A`icYloGC`E%Ar3&Gmr+97jR@~({EU>~sli|KW!sB41t#Zeb>@BD9 zVtIg)LK2cx2ArL-`XXg-w3}{9w`*b|y0|ccI<(&6-eS|k^9Fwg9@F+QjYx$I7#3cG zLpUAyd^Tbgc0&?F)J7!I)U)34^w83vP@E&6YcHp%t)PMG5}IEeg$Jx=p*!gP%Ezz~ zWlMLLi>6c$3tBPUwcH+rX;uMs+`x``e~LQWjXWzq5;HD|`k=jp_DqcNSi;knPFcSt zj~K+eX4h~S(Up?6>4wns9X-w^2a4FP!^|U`B2WwO_(kq$6SBY8h*~09L?#qg`%O9PAGy@#3$_0jIu81(MfGQ zbxtv9S2aIK^?$VUL|HsVV)ahz?0`oVgnb>8+-Kr=i*|$*wqtr2*sxXCiD}1F9Z?Kn zp2?Q~=A98xY8H=NfizbrjXMc=s##1+V<6z&?d*o$!D{yXYgXg@H~F@Q`}iL1@L0$F F@h@|Dn|J^K literal 0 HcmV?d00001 diff --git a/mobile/openapi/lib/model/sync_ack_dto.dart b/mobile/openapi/lib/model/sync_ack_dto.dart new file mode 100644 index 0000000000000000000000000000000000000000..c7fafa17d20d4e8729b35811e67bf9fdffb4a233 GIT binary patch literal 2890 zcmbVO+iu%N5PjEIjDsMiQHE;!6v&ktqn3-Li3KETfkF_7HMtZwrnt=RGJ@df-#atQ zn`$*N`VdE4&h?x#vmB2{<1t+Qxm>*beSSOtaJ8IY!_Av_^AN5VaJyK-`^EK}o4?P{ zj4WTL!noHFIp%?U!f=WX$KU z^%rRj0)Z}fS{Kxc5l$PJPJ_kWCz)V_E|=y4rZez>C87wZlCXXA^{f8{>Zt7(FSty0 zr>Zd|)dNT8R0+K}S&=FoY@~x+*!(x0qT!4~b}}OMlroXl3bQBnkVMQIaZ|fzxlT zE-{WLGgGA~;1eFf6dqYW94@&^V2hPG$3*9Av}~#W*K11NzxN<|$&l1I2zeYx5>J#M zh%yB~eHN6n@I@50w2&FSZKACB2vwV*!8r+fg{`kcDC+_xM0(jN%VHi-RT-J#4DtAx zIzf@ynB*7ZG&+^UIJmh^i8_BU7=&LrJLA~k>Z@u&t*ea@sl;ImxU>+S^^85ki6Bg{ zTK|@GiuJb607sl~fG82xJz^*Dvo!#?&k}GBdWDotU@NM>2)9MHM2eb7vw(OH!P9zi z=I5Avh`9+j+)rslh;z^6g<9}r_ijyJ2hMDGxiAAA;|-4Q6!p^WqZt%t%Fl^%&S^^JHWUwCTvEn^WqDQVk22tnV=x0vG)5nBv+_zr^z z&dNL9%RTKz_V?QBiE9Zwq7hxY;lPIRk(L+Zqy1&KQ!d9Nnh?6#d+J_#1GH~8A})3` zq>melgOWMHJgH?kdCWf@p431HPn$kWH&n-?B>zVvPo)1TQfe0AV6wlf6zu4j(jNDS zsOaDt2qx@?uOGv(Q_+bT#v}vs$FR(0$8-0Ig_5#(*$O1JMhVfq~!~+%I~+!1*KXa)is`AK4|BF#rGn literal 0 HcmV?d00001 diff --git a/mobile/openapi/lib/model/sync_ack_set_dto.dart b/mobile/openapi/lib/model/sync_ack_set_dto.dart new file mode 100644 index 0000000000000000000000000000000000000000..0d9eedc3892961d297d4de33d10bab56754116ef GIT binary patch literal 2870 zcmbVOZExE)5dQ98aV>^e22^?5r^1=t28%PKYh$3vJ`9E-FcNLCkwulHY8a{i`|ggC zEH&N|EI<;Qy4UA*ef9S% zG$YIRsW5K(YxK+80ew}QLK_*cwTag%mlLRp%E~mBx!lUah0S-hEwyncJw(0}dmERP zi;etRDTD5X*y7((82s;~u^8Og)9#5btP_QcRf-AKT1mt1_GGeL71GAJE;eZ9GEu~D ze~yz(7&{nXItO|Ux|D?|RfNCigF%uDYvFcZBp1oUt#p@8!{Y#8jNN~ht}=xM0fpSb zvS+jc#f2p@LhIh6fcybmrfejV2Mbf+E?~I|@yTI?X#qYe>wdd+MirYGkQtx8*WaWu z*ehDxYn@ZIMmTL?It>>0Uu1$kxJ;T6OlRORNkkq{vaosb{k#7J>c`H`&bYKVm9m_F ztwgTeK0;PRZUYX>1&3UZFSA>aDjaO2gG|`$cb!Tsfy1>zA$=4wk=6>cCw`H#Oj>2x zETAr<#`$S@4+c2xKLp(3um|muLQH?6zpEhw_DPgAiXo(X6qZV67~+G~1qSd;tW_a$ z_<~1B#S`nt!#S4+Y_T*$iauPSY0IEDFf0eQqJsmm_;Q(mC{plqVnI0z-$Y(X3u~jd zb)Y35p|aL!FeKeAvAtCYMU|rxi1wYbEVvHjrIBk?J4?(|32Md0q@oz-#;GL65zBN+ z6!?R_Sl@903|9@GFRa0?DmO-?62~Io>Oy$kW8^jRNNUHLy#a|1t8V-OdgybEC=spp zkE{h=5frROx|b;6H1w4CnfyQ&CcVZ@BJOjVoa4_s2#EJ1tKw)l>eev}Mv>Drl)=X6 z9R-@e8s|cedI2ZBS*Aoa00%a!D<{HQjnuIFf$uJw8RVGn3I))^p9&7)k;lKO1=YyK zgog%L537;3${owP7kRll(n9fw6)FJE&KS8!*&FTV+tTfufXEcCnV^oVcevo#^!U8t zl1apzJ$@n*GWUV-)tC@GM`660vXHy+h-zve5^3t?)A96((x6eC9=^5L6T%v}I0_GF z4Ssh`^va&WR+J|xXzKKUpcSLc3p?z$K5JG8_1d6~8UGV(wwrn~{%sy`z0_Cm1lrp& z#tRGYWx9X;UhNTwr)J+!7tuE*ZPNdC-tj^1X@9c6*Wr=D zhRGhE=-Q48_KaU?`!Ph?UwS*`GCZPzp-a5S;Tvy*F7;N$3jl5DRfa;Ld`_s~#5l4R z==`J}UOJMPv8$9*dH#=HUMPi^$f{mwopFCwHP}?2+a3WC(b9s}5l;xsL{8dIVXLSU zgN=DQLKwqcCR@&%*GE8kS-hMDvRk1X?j_(IXfZA6UciUj#SOib)uj7(ug3XL@@?M& RW{-@8zw-Iz?0kT0;~xYqmB0W1 literal 0 HcmV?d00001 diff --git a/mobile/openapi/lib/model/sync_entity_type.dart b/mobile/openapi/lib/model/sync_entity_type.dart new file mode 100644 index 0000000000000000000000000000000000000000..ed82205a37a1f2bafc676eae39be70f414b74efe GIT binary patch literal 2679 zcmai0Yfs}w6#brGagnMeNU789rxm+d7KzYO?LxJLN>vnDgFPnGb!Oa|F_E?6zxUi3 z+axAsksuG>=eg&e%XB)LPU-w^ar5(U^XvK7^TqsvF0Ve%Cv%B^TzTGc+a z^50rnxNEV&-%41#dTAYkTX)z!GunBfeX4RKRE3f@xVwYNN~NVsOQRoP=GQ`}?|wmhDLC2*#3eUoH2? z6yV;gAt2~b_=kPVk4xktihWJ`_gyjk%_`K>M%(v3qr#d^KrL^;UEmZ4$@uP@GJ6(Q zXor>ANIO1Ba+#T2j&~;C=HRx6Wm1VRFNewg!MrB){oa_89<134253fwD4m=ErER7@cke8>yrq9p2tdY4d z_|n4$=Rg=PAeIn4j%K9mvfK?w6kA9{p7*FlKcc75LiZX!S?O!5J1=jzsc{F>X%AS) z|1^4HU7J!rBd{J=L=k{sc*=w(u#JbdBD_~Y0y4>aIpiOC(=Od4L=_rJ5C^`Zjw-qE zg6>83jje|fl7sp)U{M%wAu?}l^CP}7CDXp2NZfs!0GGE3$O{cg4rIy>R*8Ke$d_JVZMhpnJpc2Ga0PJ{fjUsy zIc0GV(Ja;tcMSV(DuuTLSR!OSXDhAME-(ThpXJD#952` zxSKf-yHmA(nh-9YWiM*1R%}Qwl)~W%XY2TMS3ff0B=u+Nv>oE0m?U0;sE8~sJ6_#U zIDXByGkT7Y%R)eTPtkby!^I!@EXHv20QV0|`0L^3?)K9K znX%>jOc*!&HGchO#E){Tw813TCdp-iQ>avBF-x$(Jt{YDpXI*P#tq&O#X;0IDJz#4 z{I8ORt}C&p_fi=8bZ9I&H+J}Xs+Dy@xkP3ZQ05YizitPE6;h#13az$e=5|6QZ+=PA zoiKJZqHqp$4!T4oN*UAN+tFy07Q$M1tX2Av(nIT(b&2q01TY~0?m~1a9D)Fa2S8Qr z;UL`(+zz2M`k6AzVD&7D3fD2+^C|Z|#!2LM(#GEGiUJ4!OY%{v3{6o>wFQ65NwRGS zwa5Ji4g1Zd5(PXu(^+~q@AWepLrIWHbLe#V5?#UUU2y76yW>BtR{cYQ*~zrYp1N%8 zKd)gnP=~pE3a6LHIzCA2!4*XTrM6aX3JifQNEOOiF?8!y4{8tDSFt?^mO}s5Gkv$B z5&@dr#`LiEWi_{Z)^o^>-up_knixm51@IY@o-Ox$X@s)G^gWvBD$X#~8Ad1VYNBwZ zXNf^Uzuxx4{Yxm1;mbzr0=7mU1cNezToe|^FIS#qZCef6s&IovQ;}S@%pqcrK+N!P zzj_7y97}tz-C|$5x^+oTwSg!Qh5~a7*z|bbh7E)}?{Wtn-KH*ce135b;mXAfR8ZAb?Bv>cL-vYIUM@<7dN?Ag)@Z zJSo;IHeNGWqF*Wt7_~3F!yR>mq(UF=i9>5H zZB9;VmILMo-B+HP>@_e9vl#x+2PB;wGlhGh{XBj@cH{d88HCu(!Y4%=G3-dp7RG23 zirmpSsjs2n4bV%7@~nZDS60J;t09lqf8PP^G1_YcJE3&4i^4oou{3tHh-mN)?rA4v zEh231H4}tEuuhUjSC*FuIo1U^E6d|X;~#j0ILjU><_+*s+jVYtD{x&qNyE6AoxZVV zupyjDhNU%{?!xJ*kfg#Q9JkE0X=6W~xLE{8dCD*sq@q(wy?t@?i!Qg*C~V=lA#iQB z75_Wq8x>tXsn3eq6xMG*mlz#5H-(?L7T$bqgzst_Yb2YJcm;Ys67B^$Qm($9^yZWZ U)94p>VtB>%uFQ|x@s{QCe}F|*k^lez literal 0 HcmV?d00001 diff --git a/mobile/openapi/lib/model/sync_stream_dto.dart b/mobile/openapi/lib/model/sync_stream_dto.dart new file mode 100644 index 0000000000000000000000000000000000000000..28fd3dfaeed9601174ed0e971ec9bb94ed8681ab GIT binary patch literal 2805 zcmbVOU2oeq6n*!vxGjc60aSV0Q<2nOi^Un*wK0%*4})O{j6_@PWKt!m8b<2>zI!hv zSz5g%SOCWsc|XrNm(+MX8js=KAB*{`-)6V7yLXG(HQc=aIE&$W4!83Ke41aszxne8 znvvz(Tv$K(Ir-`Jh#u8O8!OYbvFTcs@)BxYJDI1llv}BN+&-&qWvoBxAqe4$+8_my-o0d519s~EQq7nm2|qjLTYIo=^G()k6NPJtbG|E>8d zt;M#{_Q9Bv3b(==3zJDSfA}mjY{wVUUch7u0iH~h5#^2Bop0a5FHrw%^6ZSOOmiu# z*_T?B%I_1bM3gS#u%Y%iR1`|?m!!0Zjr33mSNv*niA8Y0PBkPJEi>tyuzMmK`75MT zj?E%&G-?IHTAwc*5ITo2IS!O}@Kuy3 z1Xx?MZ2~QM3AMIHgL6{i5*u8_pz9I^LG168V+rS|*}E^$yIt}cd`16E!lm!yBJIT(=ku&GKnsDVV}d7!%I84#tRpg0438!vCDr>F(#653xJg@>sDzdzK2vZt^WRY!53 zg0{{82wE}9jlhO6+JUEDTeLCbe}c_+lR(D5X%~-z<^sKh?ygJmy1`qOZqINK4v525 zyKkvW=t)V}wn8|DcAlV+qaTyo$)ZM5HH_5%eRoGm zmStzj7QnGZ-W~6r=eeW${a(KhZ@)|@FaH?djX%Aejz@5F`(YfyXaaYWDSVuaZg2iP zLo>2`lM3SozeKNI_vleAb8Te2&?a7}OkP2mmsX~+%;Z|;F07x`y3od*^bpxbY;9bW zE;jP-QW3oT1ZYA>g z_0Ms#62|s=c$@>h0A0vj6e_~cb+4CX!dm#e&66)ynh_S}RLz zE;lgi+RZ_6dWnqCp}QMU6o9&Ha4Tga#LwKOA@1xh@qF}hM&(QJ>%8(tf0xE!!MJ;` zbw(*0;k1FlAeh{LlL;2$R?=L;UfJ&}x2P7HDum;+tWW^APef zLopEZ8)aE?EeZ-F7pQt}7*i%F7aNn1V(cHMk{DZUrBkBBFC5ABH_m~9tAO;m)hL$5 z(uh=!oDMj_5MFdCdV$=Mkt43RB)Im5!M}%5_pyP{-GW`N;lw>$iVY; z1#3}!B`_z8>9&wn$YDUdhtSY^c}C4ge^s(OeyoTkh#O)npgt%{urnftkm2$TvUY%& zKkR|qc4_oR+-KM?un?J*;jYHPqhO>hGslwb2A|FMKvGD;kX&k zOvDx^L{Np+8yrn+y5FzSXV~%BZb=bokgLJOD{u&>J)36()?hm%%{M9|5^3r=Z+UuX zsZA)(0nn9~(^OVaLv;o9FOI_PRk6@+^ls)u;{GHTb*df~v|_k<YjRIzP3Zx#=-M<8mW_`z!5Ag&@2-tr*_mt(r%U`ajxvrYxQ#v1+EZX26pQ z!nzJg?lKV(MGbTI|hX3aM2&HE6#uZ3&iPE^2fOne3 oW2p@Uy!$-4qt~p8eZJ_Lm#g>W+g$F$2Zp1*$@y*Tx`)%`A5`s-JOBUy literal 0 HcmV?d00001 diff --git a/mobile/openapi/lib/model/sync_user_v1.dart b/mobile/openapi/lib/model/sync_user_v1.dart new file mode 100644 index 0000000000000000000000000000000000000000..b9b41bb723961bd62249f258547f070ab2b896b1 GIT binary patch literal 3475 zcmbVOZBH9H5dNNDF;uA8=(Y>ohZCZgP;|hFULjHml}-^_nO!GwZ?jHqr%^=n-*0BT zFOHHc=spnE<9YKl&y0O^)HynWi-*zh-Je%?SD!CNSC?>o^XV#v%OTtiM{qm5yt)4S z0M$tILoST#|C*efc4(=lr8Y91Xp>GkKrdt|7AnEN_nl5w2y5YfU1kqfnlC5t+yOYkqkooeX-W$MO1Xk@ ztMLemQ_5t5s;xXCOMv>2;AYAuIU?m`e&OOhn{qBxv2R1=`=o>#$u~XB8{kqnIZ|_Z z4owi2-#eqqDX{ZUpz?4<&UpPnf0xD}y6Eyr>w@qx!f6Bjel&dgDl||_~E}mZDINLEuWgrTPV0l}y z%!Hl&p>tFma6%2UiO{9Yq_x7ViIB*#nY7BXTGRl`5tc;(=Wq;XZT1IA?YalQcpJ`q zSFG|~8RgtccdAhp(rNz@aRIt5;l@pR{ln%n_R#eYIgc3e>K`JV7@z(jMSSdXq+7B0 z-8+sUU*m?#2H^BZt4oaJxja#&DBvrKpbtCR&ztvrWHkY1Lr~!ZEC|9?`vLYbHr*!8 z?jw5K?vR7T=?9lZJ^X)T_33ZNC-uR7oDImr$|^N1;eMwSx6VR0ymUI^)M9wkQt}OsCW4G8!MijNFyGo3V2e_$l4QcV9dbo}$dR)O z;5JIcF=(sd(;3XgqMnEmF*b&fZnWz5{8S}Fu09)mA;^c5(Rv}PBnFMWr%dmlwNv?v zMo6O2f{5~>4Rfots!-hUIjiH>yk>q?Rr{m}x#sQ~K!Jj5B+pj)JQ~kD+^0sb#8ZKj z5GDewpAr(R8$I$lbC6rD0`>f)qLUBwKps@-XKxsRIl*P zWYu=Rrh=zQKVV=&^yd>`vK16mvzAmlWRp6Prbm>14+F}id(Q&R!-A_q8gqv zsK3}thuPt0% zz^yW(X)t_g7|^I>e6)YVtdyIfh&qH`fi3dyyaMW1^A~SHRHTP6`9aE5-F6$?n8cQ_ z^9H0!!o3`b;dR}IK05LEVnbd4 literal 0 HcmV?d00001 diff --git a/open-api/immich-openapi-specs.json b/open-api/immich-openapi-specs.json index 14245e11b..1d0f06599 100644 --- a/open-api/immich-openapi-specs.json +++ b/open-api/immich-openapi-specs.json @@ -5802,6 +5802,107 @@ ] } }, + "/sync/ack": { + "delete": { + "operationId": "deleteSyncAck", + "parameters": [], + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/SyncAckDeleteDto" + } + } + }, + "required": true + }, + "responses": { + "204": { + "description": "" + } + }, + "security": [ + { + "bearer": [] + }, + { + "cookie": [] + }, + { + "api_key": [] + } + ], + "tags": [ + "Sync" + ] + }, + "get": { + "operationId": "getSyncAck", + "parameters": [], + "responses": { + "200": { + "content": { + "application/json": { + "schema": { + "items": { + "$ref": "#/components/schemas/SyncAckDto" + }, + "type": "array" + } + } + }, + "description": "" + } + }, + "security": [ + { + "bearer": [] + }, + { + "cookie": [] + }, + { + "api_key": [] + } + ], + "tags": [ + "Sync" + ] + }, + "post": { + "operationId": "sendSyncAck", + "parameters": [], + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/SyncAckSetDto" + } + } + }, + "required": true + }, + "responses": { + "204": { + "description": "" + } + }, + "security": [ + { + "bearer": [] + }, + { + "cookie": [] + }, + { + "api_key": [] + } + ], + "tags": [ + "Sync" + ] + } + }, "/sync/delta-sync": { "post": { "operationId": "getDeltaSync", @@ -5889,6 +5990,41 @@ ] } }, + "/sync/stream": { + "post": { + "operationId": "getSyncStream", + "parameters": [], + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/SyncStreamDto" + } + } + }, + "required": true + }, + "responses": { + "200": { + "description": "" + } + }, + "security": [ + { + "bearer": [] + }, + { + "cookie": [] + }, + { + "api_key": [] + } + ], + "tags": [ + "Sync" + ] + } + }, "/system-config": { "get": { "operationId": "getConfig", @@ -11696,6 +11832,113 @@ }, "type": "object" }, + "SyncAckDeleteDto": { + "properties": { + "types": { + "items": { + "$ref": "#/components/schemas/SyncEntityType" + }, + "type": "array" + } + }, + "type": "object" + }, + "SyncAckDto": { + "properties": { + "ack": { + "type": "string" + }, + "type": { + "allOf": [ + { + "$ref": "#/components/schemas/SyncEntityType" + } + ] + } + }, + "required": [ + "ack", + "type" + ], + "type": "object" + }, + "SyncAckSetDto": { + "properties": { + "acks": { + "items": { + "type": "string" + }, + "type": "array" + } + }, + "required": [ + "acks" + ], + "type": "object" + }, + "SyncEntityType": { + "enum": [ + "UserV1", + "UserDeleteV1" + ], + "type": "string" + }, + "SyncRequestType": { + "enum": [ + "UsersV1" + ], + "type": "string" + }, + "SyncStreamDto": { + "properties": { + "types": { + "items": { + "$ref": "#/components/schemas/SyncRequestType" + }, + "type": "array" + } + }, + "required": [ + "types" + ], + "type": "object" + }, + "SyncUserDeleteV1": { + "properties": { + "userId": { + "type": "string" + } + }, + "required": [ + "userId" + ], + "type": "object" + }, + "SyncUserV1": { + "properties": { + "deletedAt": { + "format": "date-time", + "nullable": true, + "type": "string" + }, + "email": { + "type": "string" + }, + "id": { + "type": "string" + }, + "name": { + "type": "string" + } + }, + "required": [ + "deletedAt", + "email", + "id", + "name" + ], + "type": "object" + }, "SystemConfigBackupsDto": { "properties": { "database": { diff --git a/open-api/typescript-sdk/src/fetch-client.ts b/open-api/typescript-sdk/src/fetch-client.ts index 9ff35331f..8b2e88183 100644 --- a/open-api/typescript-sdk/src/fetch-client.ts +++ b/open-api/typescript-sdk/src/fetch-client.ts @@ -1104,6 +1104,16 @@ export type StackCreateDto = { export type StackUpdateDto = { primaryAssetId?: string; }; +export type SyncAckDeleteDto = { + types?: SyncEntityType[]; +}; +export type SyncAckDto = { + ack: string; + "type": SyncEntityType; +}; +export type SyncAckSetDto = { + acks: string[]; +}; export type AssetDeltaSyncDto = { updatedAfter: string; userIds: string[]; @@ -1119,6 +1129,9 @@ export type AssetFullSyncDto = { updatedUntil: string; userId?: string; }; +export type SyncStreamDto = { + types: SyncRequestType[]; +}; export type DatabaseBackupConfig = { cronExpression: string; enabled: boolean; @@ -2912,6 +2925,32 @@ export function updateStack({ id, stackUpdateDto }: { body: stackUpdateDto }))); } +export function deleteSyncAck({ syncAckDeleteDto }: { + syncAckDeleteDto: SyncAckDeleteDto; +}, opts?: Oazapfts.RequestOpts) { + return oazapfts.ok(oazapfts.fetchText("/sync/ack", oazapfts.json({ + ...opts, + method: "DELETE", + body: syncAckDeleteDto + }))); +} +export function getSyncAck(opts?: Oazapfts.RequestOpts) { + return oazapfts.ok(oazapfts.fetchJson<{ + status: 200; + data: SyncAckDto[]; + }>("/sync/ack", { + ...opts + })); +} +export function sendSyncAck({ syncAckSetDto }: { + syncAckSetDto: SyncAckSetDto; +}, opts?: Oazapfts.RequestOpts) { + return oazapfts.ok(oazapfts.fetchText("/sync/ack", oazapfts.json({ + ...opts, + method: "POST", + body: syncAckSetDto + }))); +} export function getDeltaSync({ assetDeltaSyncDto }: { assetDeltaSyncDto: AssetDeltaSyncDto; }, opts?: Oazapfts.RequestOpts) { @@ -2936,6 +2975,15 @@ export function getFullSyncForUser({ assetFullSyncDto }: { body: assetFullSyncDto }))); } +export function getSyncStream({ syncStreamDto }: { + syncStreamDto: SyncStreamDto; +}, opts?: Oazapfts.RequestOpts) { + return oazapfts.ok(oazapfts.fetchText("/sync/stream", oazapfts.json({ + ...opts, + method: "POST", + body: syncStreamDto + }))); +} export function getConfig(opts?: Oazapfts.RequestOpts) { return oazapfts.ok(oazapfts.fetchJson<{ status: 200; @@ -3548,6 +3596,13 @@ export enum Error2 { NoPermission = "no_permission", NotFound = "not_found" } +export enum SyncEntityType { + UserV1 = "UserV1", + UserDeleteV1 = "UserDeleteV1" +} +export enum SyncRequestType { + UsersV1 = "UsersV1" +} export enum TranscodeHWAccel { Nvenc = "nvenc", Qsv = "qsv", diff --git a/server/src/app.module.ts b/server/src/app.module.ts index a4518598a..b02d869a1 100644 --- a/server/src/app.module.ts +++ b/server/src/app.module.ts @@ -29,7 +29,7 @@ import { AuthService } from 'src/services/auth.service'; import { CliService } from 'src/services/cli.service'; import { DatabaseService } from 'src/services/database.service'; -const common = [...repositories, ...services]; +const common = [...repositories, ...services, GlobalExceptionFilter]; const middleware = [ FileUploadInterceptor, diff --git a/server/src/controllers/sync.controller.ts b/server/src/controllers/sync.controller.ts index 4d970a710..0945810be 100644 --- a/server/src/controllers/sync.controller.ts +++ b/server/src/controllers/sync.controller.ts @@ -1,15 +1,28 @@ -import { Body, Controller, HttpCode, HttpStatus, Post } from '@nestjs/common'; +import { Body, Controller, Delete, Get, Header, HttpCode, HttpStatus, Post, Res } from '@nestjs/common'; import { ApiTags } from '@nestjs/swagger'; +import { Response } from 'express'; import { AssetResponseDto } from 'src/dtos/asset-response.dto'; import { AuthDto } from 'src/dtos/auth.dto'; -import { AssetDeltaSyncDto, AssetDeltaSyncResponseDto, AssetFullSyncDto } from 'src/dtos/sync.dto'; +import { + AssetDeltaSyncDto, + AssetDeltaSyncResponseDto, + AssetFullSyncDto, + SyncAckDeleteDto, + SyncAckDto, + SyncAckSetDto, + SyncStreamDto, +} from 'src/dtos/sync.dto'; import { Auth, Authenticated } from 'src/middleware/auth.guard'; +import { GlobalExceptionFilter } from 'src/middleware/global-exception.filter'; import { SyncService } from 'src/services/sync.service'; @ApiTags('Sync') @Controller('sync') export class SyncController { - constructor(private service: SyncService) {} + constructor( + private service: SyncService, + private errorService: GlobalExceptionFilter, + ) {} @Post('full-sync') @HttpCode(HttpStatus.OK) @@ -24,4 +37,37 @@ export class SyncController { getDeltaSync(@Auth() auth: AuthDto, @Body() dto: AssetDeltaSyncDto): Promise { return this.service.getDeltaSync(auth, dto); } + + @Post('stream') + @Header('Content-Type', 'application/jsonlines+json') + @HttpCode(HttpStatus.OK) + @Authenticated() + async getSyncStream(@Auth() auth: AuthDto, @Res() res: Response, @Body() dto: SyncStreamDto) { + try { + await this.service.stream(auth, res, dto); + } catch (error: Error | any) { + res.setHeader('Content-Type', 'application/json'); + this.errorService.handleError(res, error); + } + } + + @Get('ack') + @Authenticated() + getSyncAck(@Auth() auth: AuthDto): Promise { + return this.service.getAcks(auth); + } + + @Post('ack') + @HttpCode(HttpStatus.NO_CONTENT) + @Authenticated() + sendSyncAck(@Auth() auth: AuthDto, @Body() dto: SyncAckSetDto) { + return this.service.setAcks(auth, dto); + } + + @Delete('ack') + @HttpCode(HttpStatus.NO_CONTENT) + @Authenticated() + deleteSyncAck(@Auth() auth: AuthDto, @Body() dto: SyncAckDeleteDto) { + return this.service.deleteAcks(auth, dto); + } } diff --git a/server/src/database.ts b/server/src/database.ts index 4fcab0fd6..c3fb4cbab 100644 --- a/server/src/database.ts +++ b/server/src/database.ts @@ -1,3 +1,4 @@ +import { sql } from 'kysely'; import { Permission } from 'src/enum'; export type AuthUser = { @@ -29,6 +30,8 @@ export type AuthSession = { }; export const columns = { + ackEpoch: (columnName: 'createdAt' | 'updatedAt' | 'deletedAt') => + sql.raw(`extract(epoch from "${columnName}")::text`).as('ackEpoch'), authUser: [ 'users.id', 'users.name', diff --git a/server/src/db.d.ts b/server/src/db.d.ts index 2e10e1ade..255ac8cd2 100644 --- a/server/src/db.d.ts +++ b/server/src/db.d.ts @@ -4,7 +4,7 @@ */ import type { ColumnType } from 'kysely'; -import { Permission } from 'src/enum'; +import { Permission, SyncEntityType } from 'src/enum'; export type ArrayType = ArrayTypeImpl extends (infer U)[] ? U[] : ArrayTypeImpl; @@ -294,6 +294,15 @@ export interface Sessions { userId: string; } +export interface SessionSyncCheckpoints { + ack: string; + createdAt: Generated; + sessionId: string; + type: SyncEntityType; + updatedAt: Generated; +} + + export interface SharedLinkAsset { assetsId: string; sharedLinksId: string; @@ -384,6 +393,11 @@ export interface Users { updatedAt: Generated; } +export interface UsersAudit { + userId: string; + deletedAt: Generated; +} + export interface VectorsPgVectorIndexStat { idx_growing: ArrayType | null; idx_indexing: boolean | null; @@ -429,6 +443,7 @@ export interface DB { partners: Partners; person: Person; sessions: Sessions; + session_sync_checkpoints: SessionSyncCheckpoints; shared_link__asset: SharedLinkAsset; shared_links: SharedLinks; smart_search: SmartSearch; @@ -440,6 +455,7 @@ export interface DB { typeorm_metadata: TypeormMetadata; user_metadata: UserMetadata; users: Users; + users_audit: UsersAudit; 'vectors.pg_vector_index_stat': VectorsPgVectorIndexStat; version_history: VersionHistory; } diff --git a/server/src/dtos/sync.dto.ts b/server/src/dtos/sync.dto.ts index 820de8d6c..0628a566c 100644 --- a/server/src/dtos/sync.dto.ts +++ b/server/src/dtos/sync.dto.ts @@ -1,7 +1,8 @@ import { ApiProperty } from '@nestjs/swagger'; -import { IsInt, IsPositive } from 'class-validator'; +import { IsEnum, IsInt, IsPositive, IsString } from 'class-validator'; import { AssetResponseDto } from 'src/dtos/asset-response.dto'; -import { ValidateDate, ValidateUUID } from 'src/validation'; +import { SyncEntityType, SyncRequestType } from 'src/enum'; +import { Optional, ValidateDate, ValidateUUID } from 'src/validation'; export class AssetFullSyncDto { @ValidateUUID({ optional: true }) @@ -32,3 +33,51 @@ export class AssetDeltaSyncResponseDto { upserted!: AssetResponseDto[]; deleted!: string[]; } + +export class SyncUserV1 { + id!: string; + name!: string; + email!: string; + deletedAt!: Date | null; +} + +export class SyncUserDeleteV1 { + userId!: string; +} + +export type SyncItem = { + [SyncEntityType.UserV1]: SyncUserV1; + [SyncEntityType.UserDeleteV1]: SyncUserDeleteV1; +}; + +const responseDtos = [ + // + SyncUserV1, + SyncUserDeleteV1, +]; + +export const extraSyncModels = responseDtos; + +export class SyncStreamDto { + @IsEnum(SyncRequestType, { each: true }) + @ApiProperty({ enumName: 'SyncRequestType', enum: SyncRequestType, isArray: true }) + types!: SyncRequestType[]; +} + +export class SyncAckDto { + @ApiProperty({ enumName: 'SyncEntityType', enum: SyncEntityType }) + type!: SyncEntityType; + ack!: string; +} + +export class SyncAckSetDto { + @IsString({ each: true }) + acks!: string[]; +} + +export class SyncAckDeleteDto { + @IsEnum(SyncEntityType, { each: true }) + @ApiProperty({ enumName: 'SyncEntityType', enum: SyncEntityType, isArray: true }) + @Optional() + types?: SyncEntityType[]; +} diff --git a/server/src/entities/index.ts b/server/src/entities/index.ts index 75e92038a..a1df269c0 100644 --- a/server/src/entities/index.ts +++ b/server/src/entities/index.ts @@ -20,8 +20,10 @@ import { SessionEntity } from 'src/entities/session.entity'; import { SharedLinkEntity } from 'src/entities/shared-link.entity'; import { SmartSearchEntity } from 'src/entities/smart-search.entity'; import { StackEntity } from 'src/entities/stack.entity'; +import { SessionSyncCheckpointEntity } from 'src/entities/sync-checkpoint.entity'; import { SystemMetadataEntity } from 'src/entities/system-metadata.entity'; import { TagEntity } from 'src/entities/tag.entity'; +import { UserAuditEntity } from 'src/entities/user-audit.entity'; import { UserMetadataEntity } from 'src/entities/user-metadata.entity'; import { UserEntity } from 'src/entities/user.entity'; import { VersionHistoryEntity } from 'src/entities/version-history.entity'; @@ -44,12 +46,14 @@ export const entities = [ MoveEntity, PartnerEntity, PersonEntity, + SessionSyncCheckpointEntity, SharedLinkEntity, SmartSearchEntity, StackEntity, SystemMetadataEntity, TagEntity, UserEntity, + UserAuditEntity, UserMetadataEntity, SessionEntity, LibraryEntity, diff --git a/server/src/entities/sync-checkpoint.entity.ts b/server/src/entities/sync-checkpoint.entity.ts new file mode 100644 index 000000000..2a91d2386 --- /dev/null +++ b/server/src/entities/sync-checkpoint.entity.ts @@ -0,0 +1,24 @@ +import { SessionEntity } from 'src/entities/session.entity'; +import { SyncEntityType } from 'src/enum'; +import { Column, CreateDateColumn, Entity, ManyToOne, PrimaryColumn, UpdateDateColumn } from 'typeorm'; + +@Entity('session_sync_checkpoints') +export class SessionSyncCheckpointEntity { + @ManyToOne(() => SessionEntity, { onDelete: 'CASCADE', onUpdate: 'CASCADE' }) + session?: SessionEntity; + + @PrimaryColumn() + sessionId!: string; + + @PrimaryColumn({ type: 'varchar' }) + type!: SyncEntityType; + + @CreateDateColumn({ type: 'timestamptz' }) + createdAt!: Date; + + @UpdateDateColumn({ type: 'timestamptz' }) + updatedAt!: Date; + + @Column() + ack!: string; +} diff --git a/server/src/entities/user-audit.entity.ts b/server/src/entities/user-audit.entity.ts new file mode 100644 index 000000000..305994a6d --- /dev/null +++ b/server/src/entities/user-audit.entity.ts @@ -0,0 +1,14 @@ +import { Column, CreateDateColumn, Entity, Index, PrimaryGeneratedColumn } from 'typeorm'; + +@Entity('users_audit') +@Index('IDX_users_audit_deleted_at_asc_user_id_asc', ['deletedAt', 'userId']) +export class UserAuditEntity { + @PrimaryGeneratedColumn('increment') + id!: number; + + @Column({ type: 'uuid' }) + userId!: string; + + @CreateDateColumn({ type: 'timestamptz' }) + deletedAt!: Date; +} diff --git a/server/src/entities/user.entity.ts b/server/src/entities/user.entity.ts index 3f5b470ce..b597d15cf 100644 --- a/server/src/entities/user.entity.ts +++ b/server/src/entities/user.entity.ts @@ -10,12 +10,14 @@ import { CreateDateColumn, DeleteDateColumn, Entity, + Index, OneToMany, PrimaryGeneratedColumn, UpdateDateColumn, } from 'typeorm'; @Entity('users') +@Index('IDX_users_updated_at_asc_id_asc', ['updatedAt', 'id']) export class UserEntity { @PrimaryGeneratedColumn('uuid') id!: string; diff --git a/server/src/enum.ts b/server/src/enum.ts index 0c1fb01a1..b99518c4f 100644 --- a/server/src/enum.ts +++ b/server/src/enum.ts @@ -537,3 +537,12 @@ export enum DatabaseLock { GetSystemConfig = 69, BackupDatabase = 42, } + +export enum SyncRequestType { + UsersV1 = 'UsersV1', +} + +export enum SyncEntityType { + UserV1 = 'UserV1', + UserDeleteV1 = 'UserDeleteV1', +} diff --git a/server/src/middleware/global-exception.filter.ts b/server/src/middleware/global-exception.filter.ts index 7d7ade471..a8afa91cb 100644 --- a/server/src/middleware/global-exception.filter.ts +++ b/server/src/middleware/global-exception.filter.ts @@ -22,6 +22,13 @@ export class GlobalExceptionFilter implements ExceptionFilter { } } + handleError(res: Response, error: Error) { + const { status, body } = this.fromError(error); + if (!res.headersSent) { + res.status(status).json({ ...body, statusCode: status, correlationId: this.cls.getId() }); + } + } + private fromError(error: Error) { logGlobalError(this.logger, error); diff --git a/server/src/migrations/1740001232576-AddSessionSyncCheckpointTable.ts b/server/src/migrations/1740001232576-AddSessionSyncCheckpointTable.ts new file mode 100644 index 000000000..ef75dd7c0 --- /dev/null +++ b/server/src/migrations/1740001232576-AddSessionSyncCheckpointTable.ts @@ -0,0 +1,22 @@ +import { MigrationInterface, QueryRunner } from "typeorm"; + +export class AddSessionSyncCheckpointTable1740001232576 implements MigrationInterface { + name = 'AddSessionSyncCheckpointTable1740001232576' + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(`CREATE TABLE "session_sync_checkpoints" ("sessionId" uuid NOT NULL, "type" character varying NOT NULL, "createdAt" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(), "updatedAt" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(), "ack" character varying NOT NULL, CONSTRAINT "PK_b846ab547a702863ef7cd9412fb" PRIMARY KEY ("sessionId", "type"))`); + await queryRunner.query(`ALTER TABLE "session_sync_checkpoints" ADD CONSTRAINT "FK_d8ddd9d687816cc490432b3d4bc" FOREIGN KEY ("sessionId") REFERENCES "sessions"("id") ON DELETE CASCADE ON UPDATE CASCADE`); + await queryRunner.query(` + create trigger session_sync_checkpoints_updated_at + before update on session_sync_checkpoints + for each row execute procedure updated_at() + `); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`drop trigger session_sync_checkpoints_updated_at on session_sync_checkpoints`); + await queryRunner.query(`ALTER TABLE "session_sync_checkpoints" DROP CONSTRAINT "FK_d8ddd9d687816cc490432b3d4bc"`); + await queryRunner.query(`DROP TABLE "session_sync_checkpoints"`); + } + +} diff --git a/server/src/migrations/1740064899123-AddUsersAuditTable.ts b/server/src/migrations/1740064899123-AddUsersAuditTable.ts new file mode 100644 index 000000000..b8f2ce5e3 --- /dev/null +++ b/server/src/migrations/1740064899123-AddUsersAuditTable.ts @@ -0,0 +1,34 @@ +import { MigrationInterface, QueryRunner } from "typeorm"; + +export class AddUsersAuditTable1740064899123 implements MigrationInterface { + name = 'AddUsersAuditTable1740064899123' + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(`CREATE INDEX IF NOT EXISTS "IDX_users_updated_at_asc_id_asc" ON "users" ("updatedAt" ASC, "id" ASC);`) + await queryRunner.query(`CREATE TABLE "users_audit" ("id" SERIAL NOT NULL, "userId" uuid NOT NULL, "deletedAt" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(), CONSTRAINT "PK_e9b2bdfd90e7eb5961091175180" PRIMARY KEY ("id"))`); + await queryRunner.query(`CREATE INDEX IF NOT EXISTS "IDX_users_audit_deleted_at_asc_user_id_asc" ON "users_audit" ("deletedAt" ASC, "userId" ASC);`) + await queryRunner.query(`CREATE OR REPLACE FUNCTION users_delete_audit() RETURNS TRIGGER AS + $$ + BEGIN + INSERT INTO users_audit ("userId") + SELECT "id" + FROM OLD; + RETURN NULL; + END; + $$ LANGUAGE plpgsql` + ); + await queryRunner.query(`CREATE OR REPLACE TRIGGER users_delete_audit + AFTER DELETE ON users + REFERENCING OLD TABLE AS OLD + FOR EACH STATEMENT + EXECUTE FUNCTION users_delete_audit(); + `); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP TRIGGER users_delete_audit`); + await queryRunner.query(`DROP FUNCTION users_delete_audit`); + await queryRunner.query(`DROP TABLE "users_audit"`); + } + +} diff --git a/server/src/repositories/index.ts b/server/src/repositories/index.ts index d3a8aeeb6..180d8ccd4 100644 --- a/server/src/repositories/index.ts +++ b/server/src/repositories/index.ts @@ -30,6 +30,7 @@ import { SessionRepository } from 'src/repositories/session.repository'; import { SharedLinkRepository } from 'src/repositories/shared-link.repository'; import { StackRepository } from 'src/repositories/stack.repository'; import { StorageRepository } from 'src/repositories/storage.repository'; +import { SyncRepository } from 'src/repositories/sync.repository'; import { SystemMetadataRepository } from 'src/repositories/system-metadata.repository'; import { TagRepository } from 'src/repositories/tag.repository'; import { TelemetryRepository } from 'src/repositories/telemetry.repository'; @@ -71,6 +72,7 @@ export const repositories = [ SharedLinkRepository, StackRepository, StorageRepository, + SyncRepository, SystemMetadataRepository, TagRepository, TelemetryRepository, diff --git a/server/src/repositories/sync.repository.ts b/server/src/repositories/sync.repository.ts new file mode 100644 index 000000000..4023bf890 --- /dev/null +++ b/server/src/repositories/sync.repository.ts @@ -0,0 +1,80 @@ +import { Injectable } from '@nestjs/common'; +import { Insertable, Kysely, sql } from 'kysely'; +import { InjectKysely } from 'nestjs-kysely'; +import { columns } from 'src/database'; +import { DB, SessionSyncCheckpoints } from 'src/db'; +import { SyncEntityType } from 'src/enum'; +import { SyncAck } from 'src/types'; + +@Injectable() +export class SyncRepository { + constructor(@InjectKysely() private db: Kysely) {} + + getCheckpoints(sessionId: string) { + return this.db + .selectFrom('session_sync_checkpoints') + .select(['type', 'ack']) + .where('sessionId', '=', sessionId) + .execute(); + } + + upsertCheckpoints(items: Insertable[]) { + return this.db + .insertInto('session_sync_checkpoints') + .values(items) + .onConflict((oc) => + oc.columns(['sessionId', 'type']).doUpdateSet((eb) => ({ + ack: eb.ref('excluded.ack'), + })), + ) + .execute(); + } + + deleteCheckpoints(sessionId: string, types?: SyncEntityType[]) { + return this.db + .deleteFrom('session_sync_checkpoints') + .where('sessionId', '=', sessionId) + .$if(!!types, (qb) => qb.where('type', 'in', types!)) + .execute(); + } + + getUserUpserts(ack?: SyncAck) { + return this.db + .selectFrom('users') + .select(['id', 'name', 'email', 'deletedAt']) + .select(columns.ackEpoch('updatedAt')) + .$if(!!ack, (qb) => + qb.where((eb) => + eb.or([ + eb(eb.fn('to_timestamp', [sql.val(ack!.ackEpoch)]), '<', eb.ref('updatedAt')), + eb.and([ + eb(eb.fn('to_timestamp', [sql.val(ack!.ackEpoch)]), '<=', eb.ref('updatedAt')), + eb('id', '>', ack!.ids[0]), + ]), + ]), + ), + ) + .orderBy(['updatedAt asc', 'id asc']) + .stream(); + } + + getUserDeletes(ack?: SyncAck) { + return this.db + .selectFrom('users_audit') + .select(['userId']) + .select(columns.ackEpoch('deletedAt')) + .$if(!!ack, (qb) => + qb.where((eb) => + eb.or([ + eb(eb.fn('to_timestamp', [sql.val(ack!.ackEpoch)]), '<', eb.ref('deletedAt')), + eb.and([ + eb(eb.fn('to_timestamp', [sql.val(ack!.ackEpoch)]), '<=', eb.ref('deletedAt')), + eb('userId', '>', ack!.ids[0]), + ]), + ]), + ), + ) + .orderBy(['deletedAt asc', 'userId asc']) + .stream(); + } +} diff --git a/server/src/services/base.service.ts b/server/src/services/base.service.ts index f476adba1..63cca43cc 100644 --- a/server/src/services/base.service.ts +++ b/server/src/services/base.service.ts @@ -38,6 +38,7 @@ import { SessionRepository } from 'src/repositories/session.repository'; import { SharedLinkRepository } from 'src/repositories/shared-link.repository'; import { StackRepository } from 'src/repositories/stack.repository'; import { StorageRepository } from 'src/repositories/storage.repository'; +import { SyncRepository } from 'src/repositories/sync.repository'; import { SystemMetadataRepository } from 'src/repositories/system-metadata.repository'; import { TagRepository } from 'src/repositories/tag.repository'; import { TelemetryRepository } from 'src/repositories/telemetry.repository'; @@ -85,6 +86,7 @@ export class BaseService { protected sharedLinkRepository: SharedLinkRepository, protected stackRepository: StackRepository, protected storageRepository: StorageRepository, + protected syncRepository: SyncRepository, protected systemMetadataRepository: SystemMetadataRepository, protected tagRepository: TagRepository, protected telemetryRepository: TelemetryRepository, diff --git a/server/src/services/sync.service.ts b/server/src/services/sync.service.ts index fe967e37e..b94e8cfcb 100644 --- a/server/src/services/sync.service.ts +++ b/server/src/services/sync.service.ts @@ -1,18 +1,112 @@ -import { Injectable } from '@nestjs/common'; +import { ForbiddenException, Injectable } from '@nestjs/common'; +import { Insertable } from 'kysely'; import { DateTime } from 'luxon'; +import { Writable } from 'node:stream'; import { AUDIT_LOG_MAX_DURATION } from 'src/constants'; +import { SessionSyncCheckpoints } from 'src/db'; import { AssetResponseDto, mapAsset } from 'src/dtos/asset-response.dto'; import { AuthDto } from 'src/dtos/auth.dto'; -import { AssetDeltaSyncDto, AssetDeltaSyncResponseDto, AssetFullSyncDto } from 'src/dtos/sync.dto'; -import { DatabaseAction, EntityType, Permission } from 'src/enum'; +import { + AssetDeltaSyncDto, + AssetDeltaSyncResponseDto, + AssetFullSyncDto, + SyncAckDeleteDto, + SyncAckSetDto, + SyncStreamDto, +} from 'src/dtos/sync.dto'; +import { DatabaseAction, EntityType, Permission, SyncEntityType, SyncRequestType } from 'src/enum'; import { BaseService } from 'src/services/base.service'; +import { SyncAck } from 'src/types'; import { getMyPartnerIds } from 'src/utils/asset.util'; import { setIsEqual } from 'src/utils/set'; +import { fromAck, serialize } from 'src/utils/sync'; const FULL_SYNC = { needsFullSync: true, deleted: [], upserted: [] }; +const SYNC_TYPES_ORDER = [ + // + SyncRequestType.UsersV1, +]; + +const throwSessionRequired = () => { + throw new ForbiddenException('Sync endpoints cannot be used with API keys'); +}; @Injectable() export class SyncService extends BaseService { + getAcks(auth: AuthDto) { + const sessionId = auth.session?.id; + if (!sessionId) { + return throwSessionRequired(); + } + + return this.syncRepository.getCheckpoints(sessionId); + } + + async setAcks(auth: AuthDto, dto: SyncAckSetDto) { + // TODO ack validation + + const sessionId = auth.session?.id; + if (!sessionId) { + return throwSessionRequired(); + } + + const checkpoints: Insertable[] = []; + for (const ack of dto.acks) { + const { type } = fromAck(ack); + checkpoints.push({ sessionId, type, ack }); + } + + await this.syncRepository.upsertCheckpoints(checkpoints); + } + + async deleteAcks(auth: AuthDto, dto: SyncAckDeleteDto) { + const sessionId = auth.session?.id; + if (!sessionId) { + return throwSessionRequired(); + } + + await this.syncRepository.deleteCheckpoints(sessionId, dto.types); + } + + async stream(auth: AuthDto, response: Writable, dto: SyncStreamDto) { + const sessionId = auth.session?.id; + if (!sessionId) { + return throwSessionRequired(); + } + + const checkpoints = await this.syncRepository.getCheckpoints(sessionId); + const checkpointMap: Partial> = Object.fromEntries( + checkpoints.map(({ type, ack }) => [type, fromAck(ack)]), + ); + + // TODO pre-filter/sort list based on optimal sync order + + for (const type of SYNC_TYPES_ORDER.filter((type) => dto.types.includes(type))) { + switch (type) { + case SyncRequestType.UsersV1: { + const deletes = this.syncRepository.getUserDeletes(checkpointMap[SyncEntityType.UserDeleteV1]); + for await (const { ackEpoch, ...data } of deletes) { + response.write(serialize({ type: SyncEntityType.UserDeleteV1, ackEpoch, ids: [data.userId], data })); + } + + const upserts = this.syncRepository.getUserUpserts(checkpointMap[SyncEntityType.UserV1]); + for await (const { ackEpoch, ...data } of upserts) { + response.write(serialize({ type: SyncEntityType.UserV1, ackEpoch, ids: [data.id], data })); + } + + break; + } + + default: { + this.logger.warn(`Unsupported sync type: ${type}`); + break; + } + } + } + + response.end(); + } + async getFullSync(auth: AuthDto, dto: AssetFullSyncDto): Promise { // mobile implementation is faster if this is a single id const userId = dto.userId || auth.user.id; diff --git a/server/src/types.ts b/server/src/types.ts index 3a331127e..544d35524 100644 --- a/server/src/types.ts +++ b/server/src/types.ts @@ -4,6 +4,7 @@ import { ImageFormat, JobName, QueueName, + SyncEntityType, TranscodeTarget, VideoCodec, } from 'src/enum'; @@ -409,3 +410,9 @@ export interface IBulkAsset { addAssetIds: (id: string, assetIds: string[]) => Promise; removeAssetIds: (id: string, assetIds: string[]) => Promise; } + +export type SyncAck = { + type: SyncEntityType; + ackEpoch: string; + ids: string[]; +}; diff --git a/server/src/utils/misc.ts b/server/src/utils/misc.ts index 13969543e..e07d0fe03 100644 --- a/server/src/utils/misc.ts +++ b/server/src/utils/misc.ts @@ -12,6 +12,7 @@ import { writeFileSync } from 'node:fs'; import path from 'node:path'; import { SystemConfig } from 'src/config'; import { CLIP_MODEL_INFO, serverVersion } from 'src/constants'; +import { extraSyncModels } from 'src/dtos/sync.dto'; import { ImmichCookie, ImmichHeader, MetadataKey } from 'src/enum'; import { LoggingRepository } from 'src/repositories/logging.repository'; @@ -245,6 +246,7 @@ export const useSwagger = (app: INestApplication, { write }: { write: boolean }) const options: SwaggerDocumentOptions = { operationIdFactory: (controllerKey: string, methodKey: string) => methodKey, + extraModels: extraSyncModels, }; const specification = SwaggerModule.createDocument(app, config, options); diff --git a/server/src/utils/sync.ts b/server/src/utils/sync.ts new file mode 100644 index 000000000..8e426ab86 --- /dev/null +++ b/server/src/utils/sync.ts @@ -0,0 +1,30 @@ +import { SyncItem } from 'src/dtos/sync.dto'; +import { SyncEntityType } from 'src/enum'; +import { SyncAck } from 'src/types'; + +type Impossible = { + [P in K]: never; +}; + +type Exact = U & Impossible>; + +export const fromAck = (ack: string): SyncAck => { + const [type, timestamp, ...ids] = ack.split('|'); + return { type: type as SyncEntityType, ackEpoch: timestamp, ids }; +}; + +export const toAck = ({ type, ackEpoch, ids }: SyncAck) => [type, ackEpoch, ...ids].join('|'); + +export const mapJsonLine = (object: unknown) => JSON.stringify(object) + '\n'; + +export const serialize = ({ + type, + ackEpoch, + ids, + data, +}: { + type: T; + ackEpoch: string; + ids: string[]; + data: Exact; +}) => mapJsonLine({ type, data, ack: toAck({ type, ackEpoch, ids }) }); diff --git a/server/test/repositories/sync.repository.mock.ts b/server/test/repositories/sync.repository.mock.ts new file mode 100644 index 000000000..fbb8ec2f6 --- /dev/null +++ b/server/test/repositories/sync.repository.mock.ts @@ -0,0 +1,13 @@ +import { SyncRepository } from 'src/repositories/sync.repository'; +import { RepositoryInterface } from 'src/types'; +import { Mocked, vitest } from 'vitest'; + +export const newSyncRepositoryMock = (): Mocked> => { + return { + getCheckpoints: vitest.fn(), + upsertCheckpoints: vitest.fn(), + deleteCheckpoints: vitest.fn(), + getUserUpserts: vitest.fn(), + getUserDeletes: vitest.fn(), + }; +}; diff --git a/server/test/utils.ts b/server/test/utils.ts index d1dda3eed..ca2272f6b 100644 --- a/server/test/utils.ts +++ b/server/test/utils.ts @@ -34,6 +34,7 @@ import { SessionRepository } from 'src/repositories/session.repository'; import { SharedLinkRepository } from 'src/repositories/shared-link.repository'; import { StackRepository } from 'src/repositories/stack.repository'; import { StorageRepository } from 'src/repositories/storage.repository'; +import { SyncRepository } from 'src/repositories/sync.repository'; import { SystemMetadataRepository } from 'src/repositories/system-metadata.repository'; import { TagRepository } from 'src/repositories/tag.repository'; import { TelemetryRepository } from 'src/repositories/telemetry.repository'; @@ -75,6 +76,7 @@ import { newSessionRepositoryMock } from 'test/repositories/session.repository.m import { newSharedLinkRepositoryMock } from 'test/repositories/shared-link.repository.mock'; import { newStackRepositoryMock } from 'test/repositories/stack.repository.mock'; import { newStorageRepositoryMock } from 'test/repositories/storage.repository.mock'; +import { newSyncRepositoryMock } from 'test/repositories/sync.repository.mock'; import { newSystemMetadataRepositoryMock } from 'test/repositories/system-metadata.repository.mock'; import { newTagRepositoryMock } from 'test/repositories/tag.repository.mock'; import { ITelemetryRepositoryMock, newTelemetryRepositoryMock } from 'test/repositories/telemetry.repository.mock'; @@ -178,6 +180,7 @@ export const newTestService = ( const sharedLinkMock = newSharedLinkRepositoryMock(); const stackMock = newStackRepositoryMock(); const storageMock = newStorageRepositoryMock(); + const syncMock = newSyncRepositoryMock(); const systemMock = newSystemMetadataRepositoryMock(); const tagMock = newTagRepositoryMock(); const telemetryMock = newTelemetryRepositoryMock(); @@ -219,6 +222,7 @@ export const newTestService = ( sharedLinkMock as RepositoryInterface as SharedLinkRepository, stackMock as RepositoryInterface as StackRepository, storageMock as RepositoryInterface as StorageRepository, + syncMock as RepositoryInterface as SyncRepository, systemMock as RepositoryInterface as SystemMetadataRepository, tagMock as RepositoryInterface as TagRepository, telemetryMock as unknown as TelemetryRepository,