Add status command receiver

This commit is contained in:
problematicconsumer
2023-08-28 13:15:57 +03:30
parent 4f5c6c12a5
commit 01980ba28d
9 changed files with 147 additions and 6 deletions

View File

@@ -1,3 +1,5 @@
import 'dart:convert';
import 'package:fpdart/fpdart.dart'; import 'package:fpdart/fpdart.dart';
import 'package:hiddify/data/api/clash_api.dart'; import 'package:hiddify/data/api/clash_api.dart';
import 'package:hiddify/data/repository/exception_handlers.dart'; import 'package:hiddify/data/repository/exception_handlers.dart';
@@ -6,6 +8,7 @@ import 'package:hiddify/domain/connectivity/connection_status.dart';
import 'package:hiddify/domain/constants.dart'; import 'package:hiddify/domain/constants.dart';
import 'package:hiddify/domain/core_facade.dart'; import 'package:hiddify/domain/core_facade.dart';
import 'package:hiddify/domain/core_service_failure.dart'; import 'package:hiddify/domain/core_service_failure.dart';
import 'package:hiddify/domain/singbox/singbox.dart';
import 'package:hiddify/services/connectivity/connectivity.dart'; import 'package:hiddify/services/connectivity/connectivity.dart';
import 'package:hiddify/services/files_editor_service.dart'; import 'package:hiddify/services/files_editor_service.dart';
import 'package:hiddify/services/singbox/singbox_service.dart'; import 'package:hiddify/services/singbox/singbox_service.dart';
@@ -95,6 +98,19 @@ class CoreFacadeImpl with ExceptionHandler, InfraLogger implements CoreFacade {
); );
} }
@override
Stream<Either<CoreServiceFailure, CoreStatus>> watchCoreStatus() {
return singbox.watchStatus().map((event) {
final json = jsonDecode(event);
return CoreStatus.fromJson(json as Map<String, dynamic>);
}).handleExceptions(
(error, stackTrace) {
loggy.warning("error watching status", error, stackTrace);
return CoreServiceFailure.unexpected(error, stackTrace);
},
);
}
@override @override
Stream<Either<CoreServiceFailure, String>> watchLogs() { Stream<Either<CoreServiceFailure, String>> watchLogs() {
return singbox return singbox

View File

@@ -0,0 +1,20 @@
import 'package:freezed_annotation/freezed_annotation.dart';
part 'core_status.freezed.dart';
part 'core_status.g.dart';
@freezed
class CoreStatus with _$CoreStatus {
@JsonSerializable(fieldRename: FieldRename.kebab)
const factory CoreStatus({
required int connectionsIn,
required int connectionsOut,
required int uplink,
required int downlink,
required int uplinkTotal,
required int downlinkTotal,
}) = _CoreStatus;
factory CoreStatus.fromJson(Map<String, dynamic> json) =>
_$CoreStatusFromJson(json);
}

View File

@@ -1 +1,2 @@
export 'core_status.dart';
export 'singbox_facade.dart'; export 'singbox_facade.dart';

View File

@@ -1,5 +1,6 @@
import 'package:fpdart/fpdart.dart'; import 'package:fpdart/fpdart.dart';
import 'package:hiddify/domain/core_service_failure.dart'; import 'package:hiddify/domain/core_service_failure.dart';
import 'package:hiddify/domain/singbox/core_status.dart';
abstract interface class SingboxFacade { abstract interface class SingboxFacade {
TaskEither<CoreServiceFailure, Unit> setup(); TaskEither<CoreServiceFailure, Unit> setup();
@@ -12,5 +13,7 @@ abstract interface class SingboxFacade {
TaskEither<CoreServiceFailure, Unit> stop(); TaskEither<CoreServiceFailure, Unit> stop();
Stream<Either<CoreServiceFailure, CoreStatus>> watchCoreStatus();
Stream<Either<CoreServiceFailure, String>> watchLogs(); Stream<Either<CoreServiceFailure, String>> watchLogs();
} }

View File

@@ -17,12 +17,16 @@ class TrafficNotifier extends _$TrafficNotifier with AppLogger {
Stream<List<Traffic>> build() async* { Stream<List<Traffic>> build() async* {
final serviceRunning = await ref.watch(serviceRunningProvider.future); final serviceRunning = await ref.watch(serviceRunningProvider.future);
if (serviceRunning) { if (serviceRunning) {
yield* ref.watch(coreFacadeProvider).watchTraffic().map( // TODO: temporary!
(event) => _mapToState( yield* ref.watch(coreFacadeProvider).watchCoreStatus().map((event) {
event return event.map(
.getOrElse((_) => const ClashTraffic(upload: 0, download: 0)), (a) => ClashTraffic(upload: a.uplink, download: a.downlink),
), );
); }).map(
(event) => _mapToState(
event.getOrElse((_) => const ClashTraffic(upload: 0, download: 0)),
),
);
} else { } else {
yield* Stream.periodic(const Duration(seconds: 1)).asyncMap( yield* Stream.periodic(const Duration(seconds: 1)).asyncMap(
(_) async { (_) async {

View File

@@ -857,6 +857,20 @@ class SingboxNativeLibrary {
late final __FCmulcr = late final __FCmulcr =
__FCmulcrPtr.asFunction<_Fcomplex Function(_Fcomplex, double)>(); __FCmulcrPtr.asFunction<_Fcomplex Function(_Fcomplex, double)>();
void setupOnce(
ffi.Pointer<ffi.Void> api,
) {
return _setupOnce(
api,
);
}
late final _setupOncePtr =
_lookup<ffi.NativeFunction<ffi.Void Function(ffi.Pointer<ffi.Void>)>>(
'setupOnce');
late final _setupOnce =
_setupOncePtr.asFunction<void Function(ffi.Pointer<ffi.Void>)>();
void setup( void setup(
ffi.Pointer<ffi.Char> baseDir, ffi.Pointer<ffi.Char> baseDir,
ffi.Pointer<ffi.Char> workingDir, ffi.Pointer<ffi.Char> workingDir,
@@ -920,6 +934,37 @@ class SingboxNativeLibrary {
late final _stopPtr = late final _stopPtr =
_lookup<ffi.NativeFunction<ffi.Pointer<ffi.Char> Function()>>('stop'); _lookup<ffi.NativeFunction<ffi.Pointer<ffi.Char> Function()>>('stop');
late final _stop = _stopPtr.asFunction<ffi.Pointer<ffi.Char> Function()>(); late final _stop = _stopPtr.asFunction<ffi.Pointer<ffi.Char> Function()>();
ffi.Pointer<ffi.Char> startCommandClient(
int command,
int port,
) {
return _startCommandClient(
command,
port,
);
}
late final _startCommandClientPtr = _lookup<
ffi.NativeFunction<
ffi.Pointer<ffi.Char> Function(
ffi.Int, ffi.LongLong)>>('startCommandClient');
late final _startCommandClient = _startCommandClientPtr
.asFunction<ffi.Pointer<ffi.Char> Function(int, int)>();
ffi.Pointer<ffi.Char> stopCommandClient(
int command,
) {
return _stopCommandClient(
command,
);
}
late final _stopCommandClientPtr =
_lookup<ffi.NativeFunction<ffi.Pointer<ffi.Char> Function(ffi.Int)>>(
'stopCommandClient');
late final _stopCommandClient =
_stopCommandClientPtr.asFunction<ffi.Pointer<ffi.Char> Function(int)>();
} }
typedef va_list = ffi.Pointer<ffi.Char>; typedef va_list = ffi.Pointer<ffi.Char>;

View File

@@ -1,6 +1,7 @@
import 'dart:async'; import 'dart:async';
import 'dart:ffi'; import 'dart:ffi';
import 'dart:io'; import 'dart:io';
import 'dart:isolate';
import 'package:combine/combine.dart'; import 'package:combine/combine.dart';
import 'package:ffi/ffi.dart'; import 'package:ffi/ffi.dart';
@@ -16,6 +17,8 @@ final _logger = Loggy('FFISingboxService');
class FFISingboxService with InfraLogger implements SingboxService { class FFISingboxService with InfraLogger implements SingboxService {
static final SingboxNativeLibrary _box = _gen(); static final SingboxNativeLibrary _box = _gen();
Stream<String>? _statusStream;
static SingboxNativeLibrary _gen() { static SingboxNativeLibrary _gen() {
String fullPath = ""; String fullPath = "";
if (Platform.environment.containsKey('FLUTTER_TEST')) { if (Platform.environment.containsKey('FLUTTER_TEST')) {
@@ -47,6 +50,7 @@ class FFISingboxService with InfraLogger implements SingboxService {
workingDir.toNativeUtf8().cast(), workingDir.toNativeUtf8().cast(),
tempDir.toNativeUtf8().cast(), tempDir.toNativeUtf8().cast(),
); );
_box.setupOnce(NativeApi.initializeApiDLData);
return right(unit); return right(unit);
}, },
), ),
@@ -119,6 +123,46 @@ class FFISingboxService with InfraLogger implements SingboxService {
); );
} }
@override
Stream<String> watchStatus() {
if (_statusStream != null) return _statusStream!;
final receiver = ReceivePort('status receiver');
final statusStream = receiver.asBroadcastStream(
onCancel: (_) {
_logger.debug("stopping status command client");
final err = _box.stopCommandClient(1).cast<Utf8>().toDartString();
if (err.isNotEmpty) {
_logger.warning("error stopping status client");
}
receiver.close();
_statusStream = null;
},
).map(
(event) {
if (event case String _) {
if (event.startsWith('error:')) {
loggy.warning("[status client] error received: $event");
throw event.replaceFirst('error:', "");
}
return event;
}
loggy.warning("[status client] unexpected type, msg: $event");
throw "invalid type";
},
);
final err = _box
.startCommandClient(1, receiver.sendPort.nativePort)
.cast<Utf8>()
.toDartString();
if (err.isNotEmpty) {
loggy.warning("error starting status command: $err");
throw err;
}
return _statusStream = statusStream;
}
@override @override
Stream<String> watchLogs(String path) { Stream<String> watchLogs(String path) {
var linesRead = 0; var linesRead = 0;

View File

@@ -67,6 +67,12 @@ class MobileSingboxService with InfraLogger implements SingboxService {
); );
} }
@override
Stream<String> watchStatus() {
// TODO: implement watchStatus
return const Stream.empty();
}
@override @override
Stream<String> watchLogs(String path) { Stream<String> watchLogs(String path) {
return _logsChannel.receiveBroadcastStream().map( return _logsChannel.receiveBroadcastStream().map(

View File

@@ -26,5 +26,7 @@ abstract interface class SingboxService {
TaskEither<String, Unit> stop(); TaskEither<String, Unit> stop();
Stream<String> watchStatus();
Stream<String> watchLogs(String path); Stream<String> watchLogs(String path);
} }