diff --git a/lib/features/proxy/data/proxy_repository.dart b/lib/features/proxy/data/proxy_repository.dart index 640558f4..8d931d50 100644 --- a/lib/features/proxy/data/proxy_repository.dart +++ b/lib/features/proxy/data/proxy_repository.dart @@ -32,7 +32,7 @@ class ProxyRepositoryImpl @override Stream>> watchProxies() { - return singbox.watchOutbounds().map((event) { + return singbox.watchGroups().map((event) { final groupWithSelected = { for (final group in event) group.tag: group.selected, }; @@ -66,7 +66,7 @@ class ProxyRepositoryImpl @override Stream>> watchActiveProxies() { - return singbox.watchActiveOutbounds().map((event) { + return singbox.watchActiveGroups().map((event) { final groupWithSelected = { for (final group in event) group.tag: group.selected, }; diff --git a/lib/singbox/service/ffi_singbox_service.dart b/lib/singbox/service/ffi_singbox_service.dart index b07a575c..3626c5b9 100644 --- a/lib/singbox/service/ffi_singbox_service.dart +++ b/lib/singbox/service/ffi_singbox_service.dart @@ -237,7 +237,7 @@ class FFISingboxService with InfraLogger implements SingboxService { @override Stream watchStats() { if (_serviceStatsStream != null) return _serviceStatsStream!; - final receiver = ReceivePort('service stats receiver'); + final receiver = ReceivePort('stats'); final statusStream = receiver.asBroadcastStream( onCancel: (_) { _logger.debug("stopping stats command client"); @@ -277,67 +277,28 @@ class FFISingboxService with InfraLogger implements SingboxService { } @override - Stream> watchOutbounds() { + Stream> watchGroups() { + final logger = newLoggy("watchGroups"); if (_outboundsStream != null) return _outboundsStream!; - final receiver = ReceivePort('outbounds receiver'); + final receiver = ReceivePort('groups'); final outboundsStream = receiver.asBroadcastStream( onCancel: (_) { - _logger.debug("stopping group command client"); + logger.debug("stopping"); + receiver.close(); + _outboundsStream = null; final err = _box.stopCommandClient(4).cast().toDartString(); if (err.isNotEmpty) { _logger.error("error stopping group client"); } - receiver.close(); - _outboundsStream = null; }, ).map( (event) { if (event case String _) { if (event.startsWith('error:')) { - loggy.error("[group client] error received: $event"); + logger.error("error received: $event"); throw event.replaceFirst('error:', ""); } - return (jsonDecode(event) as List).map((e) { - return SingboxOutboundGroup.fromJson(e as Map); - }).toList(); - } - loggy.error("[group client] unexpected type, msg: $event"); - throw "invalid type"; - }, - ); - final err = _box - .startCommandClient(4, receiver.sendPort.nativePort) - .cast() - .toDartString(); - if (err.isNotEmpty) { - loggy.error("error starting group command: $err"); - throw err; - } - - return _outboundsStream = outboundsStream; - } - - @override - Stream> watchActiveOutbounds() { - final logger = newLoggy("[ActiveGroupsClient]"); - final receiver = ReceivePort('active groups receiver'); - final outboundsStream = receiver.asBroadcastStream( - onCancel: (_) { - logger.debug("stopping"); - final err = _box.stopCommandClient(12).cast().toDartString(); - if (err.isNotEmpty) { - logger.error("failed stopping: $err"); - } - receiver.close(); - }, - ).map( - (event) { - if (event case String _) { - if (event.startsWith('error:')) { - logger.error(event); - throw event.replaceFirst('error:', ""); - } return (jsonDecode(event) as List).map((e) { return SingboxOutboundGroup.fromJson(e as Map); }).toList(); @@ -347,14 +308,67 @@ class FFISingboxService with InfraLogger implements SingboxService { }, ); - final err = _box - .startCommandClient(12, receiver.sendPort.nativePort) - .cast() - .toDartString(); - if (err.isNotEmpty) { - logger.error("error starting: $err"); - throw err; + try { + final err = _box + .startCommandClient(4, receiver.sendPort.nativePort) + .cast() + .toDartString(); + if (err.isNotEmpty) { + logger.error("error starting group command: $err"); + throw err; + } + } catch (e) { + receiver.close(); + rethrow; } + + return _outboundsStream = outboundsStream; + } + + @override + Stream> watchActiveGroups() { + final logger = newLoggy("[ActiveGroupsClient]"); + final receiver = ReceivePort('active groups'); + final outboundsStream = receiver.asBroadcastStream( + onCancel: (_) { + logger.debug("stopping"); + receiver.close(); + final err = _box.stopCommandClient(12).cast().toDartString(); + if (err.isNotEmpty) { + logger.error("failed stopping: $err"); + } + }, + ).map( + (event) { + if (event case String _) { + if (event.startsWith('error:')) { + logger.error(event); + throw event.replaceFirst('error:', ""); + } + + return (jsonDecode(event) as List).map((e) { + return SingboxOutboundGroup.fromJson(e as Map); + }).toList(); + } + logger.error("unexpected type, msg: $event"); + throw "invalid type"; + }, + ); + + try { + final err = _box + .startCommandClient(12, receiver.sendPort.nativePort) + .cast() + .toDartString(); + if (err.isNotEmpty) { + logger.error("error starting: $err"); + throw err; + } + } catch (e) { + receiver.close(); + rethrow; + } + return outboundsStream; } diff --git a/lib/singbox/service/platform_singbox_service.dart b/lib/singbox/service/platform_singbox_service.dart index d0a3ad94..2aadacf5 100644 --- a/lib/singbox/service/platform_singbox_service.dart +++ b/lib/singbox/service/platform_singbox_service.dart @@ -13,12 +13,19 @@ import 'package:hiddify/utils/custom_loggers.dart'; import 'package:rxdart/rxdart.dart'; class PlatformSingboxService with InfraLogger implements SingboxService { - late final _methodChannel = const MethodChannel("com.hiddify.app/method"); - late final _statusChannel = - const EventChannel("com.hiddify.app/service.status", JSONMethodCodec()); - late final _alertsChannel = - const EventChannel("com.hiddify.app/service.alerts", JSONMethodCodec()); - late final _logsChannel = const EventChannel("com.hiddify.app/service.logs"); + static const channelPrefix = "com.hiddify.app"; + + static const methodChannel = MethodChannel("$channelPrefix/method"); + static const statusChannel = + EventChannel("$channelPrefix/service.status", JSONMethodCodec()); + static const alertsChannel = + EventChannel("$channelPrefix/service.alerts", JSONMethodCodec()); + static const statsChannel = + EventChannel("$channelPrefix/stats", JSONMethodCodec()); + static const groupsChannel = EventChannel("$channelPrefix/groups"); + static const activeGroupsChannel = + EventChannel("$channelPrefix/active-groups"); + static const logsChannel = EventChannel("$channelPrefix/service.logs"); late final ValueStream _status; @@ -26,26 +33,23 @@ class PlatformSingboxService with InfraLogger implements SingboxService { Future init() async { loggy.debug("initializing"); final status = - _statusChannel.receiveBroadcastStream().map(SingboxStatus.fromEvent); + statusChannel.receiveBroadcastStream().map(SingboxStatus.fromEvent); final alerts = - _alertsChannel.receiveBroadcastStream().map(SingboxStatus.fromEvent); + alertsChannel.receiveBroadcastStream().map(SingboxStatus.fromEvent); _status = ValueConnectableStream(Rx.merge([status, alerts])).autoConnect(); await _status.first; } @override - TaskEither setup( - Directories directories, - bool debug, - ) { + TaskEither setup(Directories directories, bool debug) { return TaskEither( () async { if (!Platform.isIOS) { return right(unit); } - await _methodChannel.invokeMethod("setup"); + await methodChannel.invokeMethod("setup"); return right(unit); }, ); @@ -59,7 +63,7 @@ class PlatformSingboxService with InfraLogger implements SingboxService { ) { return TaskEither( () async { - final message = await _methodChannel.invokeMethod( + final message = await methodChannel.invokeMethod( "parse_config", {"path": path, "tempPath": tempPath, "debug": debug}, ); @@ -73,7 +77,8 @@ class PlatformSingboxService with InfraLogger implements SingboxService { TaskEither changeOptions(SingboxConfigOption options) { return TaskEither( () async { - await _methodChannel.invokeMethod( + loggy.debug("changing options"); + await methodChannel.invokeMethod( "change_config_options", jsonEncode(options.toJson()), ); @@ -83,12 +88,11 @@ class PlatformSingboxService with InfraLogger implements SingboxService { } @override - TaskEither generateFullConfigByPath( - String path, - ) { + TaskEither generateFullConfigByPath(String path) { return TaskEither( () async { - final configJson = await _methodChannel.invokeMethod( + loggy.debug("generating full config by path"); + final configJson = await methodChannel.invokeMethod( "generate_config", {"path": path}, ); @@ -109,7 +113,7 @@ class PlatformSingboxService with InfraLogger implements SingboxService { return TaskEither( () async { loggy.debug("starting"); - await _methodChannel.invokeMethod( + await methodChannel.invokeMethod( "start", {"path": path, "name": name}, ); @@ -123,7 +127,7 @@ class PlatformSingboxService with InfraLogger implements SingboxService { return TaskEither( () async { loggy.debug("stopping"); - await _methodChannel.invokeMethod("stop"); + await methodChannel.invokeMethod("stop"); return right(unit); }, ); @@ -138,7 +142,7 @@ class PlatformSingboxService with InfraLogger implements SingboxService { return TaskEither( () async { loggy.debug("restarting"); - await _methodChannel.invokeMethod( + await methodChannel.invokeMethod( "restart", {"path": path, "name": name}, ); @@ -159,17 +163,16 @@ class PlatformSingboxService with InfraLogger implements SingboxService { } loggy.debug("resetting tunnel"); - await _methodChannel.invokeMethod("reset"); + await methodChannel.invokeMethod("reset"); return right(unit); }, ); } @override - Stream> watchOutbounds() { - const channel = EventChannel("com.hiddify.app/groups"); - loggy.debug("watching outbounds"); - return channel.receiveBroadcastStream().map( + Stream> watchGroups() { + loggy.debug("watching groups"); + return groupsChannel.receiveBroadcastStream().map( (event) { if (event case String _) { return (jsonDecode(event) as List).map((e) { @@ -183,10 +186,9 @@ class PlatformSingboxService with InfraLogger implements SingboxService { } @override - Stream> watchActiveOutbounds() { - const channel = EventChannel("com.hiddify.app/active-groups"); - loggy.debug("watching active outbounds"); - return channel.receiveBroadcastStream().map( + Stream> watchActiveGroups() { + loggy.debug("watching active groups"); + return activeGroupsChannel.receiveBroadcastStream().map( (event) { if (event case String _) { return (jsonDecode(event) as List).map((e) { @@ -204,9 +206,8 @@ class PlatformSingboxService with InfraLogger implements SingboxService { @override Stream watchStats() { - const channel = EventChannel("com.hiddify.app/stats", JSONMethodCodec()); loggy.debug("watching stats"); - return channel.receiveBroadcastStream().map( + return statsChannel.receiveBroadcastStream().map( (event) { if (event case Map _) { return SingboxStats.fromJson(event); @@ -224,7 +225,7 @@ class PlatformSingboxService with InfraLogger implements SingboxService { return TaskEither( () async { loggy.debug("selecting outbound"); - await _methodChannel.invokeMethod( + await methodChannel.invokeMethod( "select_outbound", {"groupTag": groupTag, "outboundTag": outboundTag}, ); @@ -237,7 +238,7 @@ class PlatformSingboxService with InfraLogger implements SingboxService { TaskEither urlTest(String groupTag) { return TaskEither( () async { - await _methodChannel.invokeMethod( + await methodChannel.invokeMethod( "url_test", {"groupTag": groupTag}, ); @@ -248,7 +249,7 @@ class PlatformSingboxService with InfraLogger implements SingboxService { @override Stream> watchLogs(String path) async* { - yield* _logsChannel + yield* logsChannel .receiveBroadcastStream() .map((event) => (event as List).map((e) => e as String).toList()); } @@ -257,7 +258,7 @@ class PlatformSingboxService with InfraLogger implements SingboxService { TaskEither clearLogs() { return TaskEither( () async { - await _methodChannel.invokeMethod("clear_logs"); + await methodChannel.invokeMethod("clear_logs"); return right(unit); }, ); diff --git a/lib/singbox/service/singbox_service.dart b/lib/singbox/service/singbox_service.dart index bf3f7f6c..895b74f7 100644 --- a/lib/singbox/service/singbox_service.dart +++ b/lib/singbox/service/singbox_service.dart @@ -21,11 +21,17 @@ abstract interface class SingboxService { Future init(); + /// setup directories and other initial platform services TaskEither setup( Directories directories, bool debug, ); + /// validates config by path and save it + /// + /// [path] is used to save validated config + /// [tempPath] includes base config, possibly invalid + /// [debug] indicates if debug mode (avoid in prod) TaskEither validateConfigByPath( String path, String tempPath, @@ -34,10 +40,17 @@ abstract interface class SingboxService { TaskEither changeOptions(SingboxConfigOption options); - TaskEither generateFullConfigByPath( - String path, - ); + /// generates full sing-box configuration + /// + /// [path] is the path to the base config file + /// returns full patched json config file as string + TaskEither generateFullConfigByPath(String path); + /// start sing-box service + /// + /// [path] is the path to the base config file (to be patched by previously set [SingboxConfigOption]) + /// [name] is the name of the active profile (not unique, used for presentation in platform specific ui) + /// [disableMemoryLimit] is used to disable service memory limit (mostly used in mobile platforms i.e. iOS) TaskEither start( String path, String name, @@ -46,6 +59,7 @@ abstract interface class SingboxService { TaskEither stop(); + /// similar to [start], but uses platform dependent behavior to restart the service TaskEither restart( String path, String name, @@ -54,16 +68,18 @@ abstract interface class SingboxService { TaskEither resetTunnel(); - Stream> watchOutbounds(); + Stream> watchGroups(); - Stream> watchActiveOutbounds(); + Stream> watchActiveGroups(); TaskEither selectOutbound(String groupTag, String outboundTag); TaskEither urlTest(String groupTag); + /// watch status of sing-box service (started, starting, etc.) Stream watchStatus(); + /// watch stats of sing-box service (uplink, downlink, etc.) Stream watchStats(); Stream> watchLogs(String path);