mirror of
https://github.com/immich-app/immich.git
synced 2025-03-11 15:09:45 +02:00
wip: user sync stream
This commit is contained in:
parent
8fb2da5ef7
commit
5ee072dabc
7
mobile/lib/domain/interfaces/sync-stream.interface.dart
Normal file
7
mobile/lib/domain/interfaces/sync-stream.interface.dart
Normal file
@ -0,0 +1,7 @@
|
||||
import 'package:immich_mobile/domain/interfaces/db.interface.dart';
|
||||
import 'package:openapi/api.dart';
|
||||
|
||||
abstract interface class ISyncStreamRepository implements IDatabaseRepository {
|
||||
Future<bool> updateUsersV1(SyncUserV1 data);
|
||||
Future<bool> deleteUsersV1(SyncUserDeleteV1 data);
|
||||
}
|
@ -1,13 +1,9 @@
|
||||
class SyncEvent {
|
||||
// dynamic
|
||||
// ignore: avoid-dynamic
|
||||
final dynamic data;
|
||||
|
||||
final String ack;
|
||||
|
||||
SyncEvent({
|
||||
required this.data,
|
||||
required this.ack,
|
||||
});
|
||||
const SyncEvent({required this.data, required this.ack});
|
||||
|
||||
@override
|
||||
String toString() => 'SyncEvent(data: $data, ack: $ack)';
|
||||
|
@ -1,43 +1,42 @@
|
||||
// ignore_for_file: avoid-passing-async-when-sync-expected
|
||||
|
||||
import 'dart:async';
|
||||
|
||||
import 'package:flutter/foundation.dart';
|
||||
import 'package:immich_mobile/domain/interfaces/sync-stream.interface.dart';
|
||||
import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart';
|
||||
import 'package:logging/logging.dart';
|
||||
import 'package:openapi/api.dart';
|
||||
|
||||
class SyncStreamService {
|
||||
final Logger _logger = Logger('SyncStreamService');
|
||||
|
||||
final ISyncApiRepository _syncApiRepository;
|
||||
|
||||
SyncStreamService(this._syncApiRepository);
|
||||
|
||||
final ISyncStreamRepository _syncStreamRepository;
|
||||
StreamSubscription? _userSyncSubscription;
|
||||
|
||||
SyncStreamService({
|
||||
required ISyncApiRepository syncApiRepository,
|
||||
required ISyncStreamRepository syncStreamRepository,
|
||||
}) : _syncApiRepository = syncApiRepository,
|
||||
_syncStreamRepository = syncStreamRepository;
|
||||
|
||||
void syncUsers() {
|
||||
_userSyncSubscription =
|
||||
_syncApiRepository.watchUserSyncEvent().listen((events) async {
|
||||
for (final event in events) {
|
||||
bool status = false;
|
||||
if (event.data is SyncUserV1) {
|
||||
final data = event.data as SyncUserV1;
|
||||
debugPrint("User Update: $data");
|
||||
|
||||
// final user = await _userRepository.get(data.id);
|
||||
|
||||
// if (user == null) {
|
||||
// continue;
|
||||
// }
|
||||
|
||||
// user.name = data.name;
|
||||
// user.email = data.email;
|
||||
// user.updatedAt = DateTime.now();
|
||||
|
||||
// await _userRepository.update(user);
|
||||
// await _syncApiRepository.ack(event.ack);
|
||||
_logger.fine("UserSyncUpdate: ${event.data}");
|
||||
status = await _syncStreamRepository.updateUsersV1(event.data);
|
||||
}
|
||||
|
||||
if (event.data is SyncUserDeleteV1) {
|
||||
final data = event.data as SyncUserDeleteV1;
|
||||
_logger.fine("SyncUserDelete: ${event.data}");
|
||||
status = await _syncStreamRepository.deleteUsersV1(event.data);
|
||||
}
|
||||
|
||||
debugPrint("User delete: $data");
|
||||
// await _syncApiRepository.ack(event.ack);
|
||||
if (status) {
|
||||
await _syncApiRepository.ack(event.ack);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
@ -0,0 +1,29 @@
|
||||
import 'package:drift/drift.dart';
|
||||
import 'package:immich_mobile/domain/interfaces/sync-stream.interface.dart';
|
||||
import 'package:immich_mobile/infrastructure/repositories/db.repository.dart';
|
||||
import 'package:openapi/api.dart';
|
||||
|
||||
class DriftSyncStreamRepository extends DriftDatabaseRepository
|
||||
implements ISyncStreamRepository {
|
||||
final Drift _db;
|
||||
|
||||
const DriftSyncStreamRepository(super.db) : _db = db;
|
||||
|
||||
@override
|
||||
Future<bool> deleteUsersV1(SyncUserDeleteV1 data) async {
|
||||
return (await _db.managers.userEntity
|
||||
.filter((row) => row.uid.equals(data.userId))
|
||||
.delete()) ==
|
||||
1;
|
||||
}
|
||||
|
||||
@override
|
||||
Future<bool> updateUsersV1(SyncUserV1 data) async {
|
||||
return (await _db.managers.userEntity
|
||||
.filter((row) => row.uid.equals(data.id))
|
||||
.update(
|
||||
(u) => u(email: Value(data.email), name: Value(data.name)),
|
||||
)) ==
|
||||
1;
|
||||
}
|
||||
}
|
@ -2,11 +2,11 @@ import 'dart:async';
|
||||
import 'dart:convert';
|
||||
|
||||
import 'package:flutter/foundation.dart';
|
||||
import 'package:http/http.dart' as http;
|
||||
import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart';
|
||||
import 'package:immich_mobile/domain/models/sync/sync_event.model.dart';
|
||||
import 'package:immich_mobile/services/api.service.dart';
|
||||
import 'package:openapi/api.dart';
|
||||
import 'package:http/http.dart' as http;
|
||||
|
||||
class SyncApiRepository implements ISyncApiRepository {
|
||||
final ApiService _api;
|
||||
@ -14,9 +14,7 @@ class SyncApiRepository implements ISyncApiRepository {
|
||||
|
||||
@override
|
||||
Stream<List<SyncEvent>> watchUserSyncEvent() {
|
||||
return _getSyncStream(
|
||||
SyncStreamDto(types: [SyncRequestType.usersV1]),
|
||||
);
|
||||
return _getSyncStream(SyncStreamDto(types: [SyncRequestType.usersV1]));
|
||||
}
|
||||
|
||||
@override
|
||||
@ -31,7 +29,7 @@ class SyncApiRepository implements ISyncApiRepository {
|
||||
final client = http.Client();
|
||||
final endpoint = "${_api.apiClient.basePath}/sync/stream";
|
||||
|
||||
final headers = <String, String>{
|
||||
final headers = {
|
||||
'Content-Type': 'application/json',
|
||||
'Accept': 'application/jsonlines+json',
|
||||
};
|
||||
@ -61,6 +59,7 @@ class SyncApiRepository implements ISyncApiRepository {
|
||||
|
||||
await for (final chunk in response.stream.transform(utf8.decoder)) {
|
||||
previousChunk += chunk;
|
||||
// ignore: move-variable-outside-iteration
|
||||
final parts = previousChunk.split('\n');
|
||||
previousChunk = parts.removeLast();
|
||||
lines.addAll(parts);
|
||||
@ -81,6 +80,7 @@ class SyncApiRepository implements ISyncApiRepository {
|
||||
}
|
||||
}
|
||||
|
||||
// ignore: avoid-dynamic
|
||||
const _kResponseMap = <SyncEntityType, Function(dynamic)>{
|
||||
SyncEntityType.userV1: SyncUserV1.fromJson,
|
||||
SyncEntityType.userDeleteV1: SyncUserDeleteV1.fromJson,
|
||||
@ -90,7 +90,7 @@ const _kResponseMap = <SyncEntityType, Function(dynamic)>{
|
||||
List<SyncEvent> _parseSyncResponse(List<String> lines) {
|
||||
final List<SyncEvent> data = [];
|
||||
|
||||
for (var line in lines) {
|
||||
for (final line in lines) {
|
||||
try {
|
||||
final jsonData = jsonDecode(line);
|
||||
final type = SyncEntityType.fromJson(jsonData['type'])!;
|
||||
|
@ -1,3 +1,4 @@
|
||||
import 'package:immich_mobile/infrastructure/repositories/db.repository.dart';
|
||||
import 'package:isar/isar.dart';
|
||||
import 'package:riverpod_annotation/riverpod_annotation.dart';
|
||||
|
||||
@ -5,3 +6,6 @@ part 'db.provider.g.dart';
|
||||
|
||||
@Riverpod(keepAlive: true)
|
||||
Isar isar(IsarRef ref) => throw UnimplementedError('isar');
|
||||
|
||||
@Riverpod(keepAlive: true)
|
||||
Drift drift(DriftRef _) => Drift();
|
||||
|
@ -20,5 +20,19 @@ final isarProvider = Provider<Isar>.internal(
|
||||
);
|
||||
|
||||
typedef IsarRef = ProviderRef<Isar>;
|
||||
String _$driftHash() => r'c1f614969c5af583671fc54700ec7387f1f17daf';
|
||||
|
||||
/// See also [drift].
|
||||
@ProviderFor(drift)
|
||||
final driftProvider = Provider<Drift>.internal(
|
||||
drift,
|
||||
name: r'driftProvider',
|
||||
debugGetCreateSourceHash:
|
||||
const bool.fromEnvironment('dart.vm.product') ? null : _$driftHash,
|
||||
dependencies: null,
|
||||
allTransitiveDependencies: null,
|
||||
);
|
||||
|
||||
typedef DriftRef = ProviderRef<Drift>;
|
||||
// ignore_for_file: type=lint
|
||||
// ignore_for_file: subtype_of_sealed_class, invalid_use_of_internal_member, invalid_use_of_visible_for_testing_member
|
||||
|
@ -2,23 +2,26 @@ import 'dart:async';
|
||||
|
||||
import 'package:hooks_riverpod/hooks_riverpod.dart';
|
||||
import 'package:immich_mobile/domain/services/sync_stream.service.dart';
|
||||
import 'package:immich_mobile/infrastructure/repositories/sync-stream.repository.dart';
|
||||
import 'package:immich_mobile/infrastructure/repositories/sync_api.repository.dart';
|
||||
import 'package:immich_mobile/providers/api.provider.dart';
|
||||
import 'package:immich_mobile/providers/infrastructure/db.provider.dart';
|
||||
|
||||
final syncStreamServiceProvider = Provider(
|
||||
(ref) {
|
||||
final instance = SyncStreamService(
|
||||
ref.watch(syncApiRepositoryProvider),
|
||||
);
|
||||
final syncStreamServiceProvider = Provider((ref) {
|
||||
final instance = SyncStreamService(
|
||||
syncApiRepository: ref.watch(syncApiRepositoryProvider),
|
||||
syncStreamRepository: ref.watch(syncStreamRepositoryProvider),
|
||||
);
|
||||
|
||||
ref.onDispose(() => unawaited(instance.dispose()));
|
||||
ref.onDispose(() => unawaited(instance.dispose()));
|
||||
|
||||
return instance;
|
||||
},
|
||||
);
|
||||
return instance;
|
||||
});
|
||||
|
||||
final syncApiRepositoryProvider = Provider(
|
||||
(ref) => SyncApiRepository(
|
||||
ref.watch(apiServiceProvider),
|
||||
),
|
||||
(ref) => SyncApiRepository(ref.watch(apiServiceProvider)),
|
||||
);
|
||||
|
||||
final syncStreamRepositoryProvider = Provider(
|
||||
(ref) => DriftSyncStreamRepository(ref.watch(driftProvider)),
|
||||
);
|
||||
|
Loading…
x
Reference in New Issue
Block a user