Change service error handling

This commit is contained in:
problematicconsumer
2024-02-13 17:02:10 +03:30
parent 28ece8fb2e
commit 2293a390e5
4 changed files with 129 additions and 98 deletions

View File

@@ -32,7 +32,7 @@ class ProxyRepositoryImpl
@override
Stream<Either<ProxyFailure, List<ProxyGroupEntity>>> watchProxies() {
return singbox.watchOutbounds().map((event) {
return singbox.watchGroups().map((event) {
final groupWithSelected = {
for (final group in event) group.tag: group.selected,
};
@@ -66,7 +66,7 @@ class ProxyRepositoryImpl
@override
Stream<Either<ProxyFailure, List<ProxyGroupEntity>>> watchActiveProxies() {
return singbox.watchActiveOutbounds().map((event) {
return singbox.watchActiveGroups().map((event) {
final groupWithSelected = {
for (final group in event) group.tag: group.selected,
};

View File

@@ -237,7 +237,7 @@ class FFISingboxService with InfraLogger implements SingboxService {
@override
Stream<SingboxStats> watchStats() {
if (_serviceStatsStream != null) return _serviceStatsStream!;
final receiver = ReceivePort('service stats receiver');
final receiver = ReceivePort('stats');
final statusStream = receiver.asBroadcastStream(
onCancel: (_) {
_logger.debug("stopping stats command client");
@@ -277,67 +277,28 @@ class FFISingboxService with InfraLogger implements SingboxService {
}
@override
Stream<List<SingboxOutboundGroup>> watchOutbounds() {
Stream<List<SingboxOutboundGroup>> watchGroups() {
final logger = newLoggy("watchGroups");
if (_outboundsStream != null) return _outboundsStream!;
final receiver = ReceivePort('outbounds receiver');
final receiver = ReceivePort('groups');
final outboundsStream = receiver.asBroadcastStream(
onCancel: (_) {
_logger.debug("stopping group command client");
logger.debug("stopping");
receiver.close();
_outboundsStream = null;
final err = _box.stopCommandClient(4).cast<Utf8>().toDartString();
if (err.isNotEmpty) {
_logger.error("error stopping group client");
}
receiver.close();
_outboundsStream = null;
},
).map(
(event) {
if (event case String _) {
if (event.startsWith('error:')) {
loggy.error("[group client] error received: $event");
logger.error("error received: $event");
throw event.replaceFirst('error:', "");
}
return (jsonDecode(event) as List).map((e) {
return SingboxOutboundGroup.fromJson(e as Map<String, dynamic>);
}).toList();
}
loggy.error("[group client] unexpected type, msg: $event");
throw "invalid type";
},
);
final err = _box
.startCommandClient(4, receiver.sendPort.nativePort)
.cast<Utf8>()
.toDartString();
if (err.isNotEmpty) {
loggy.error("error starting group command: $err");
throw err;
}
return _outboundsStream = outboundsStream;
}
@override
Stream<List<SingboxOutboundGroup>> watchActiveOutbounds() {
final logger = newLoggy("[ActiveGroupsClient]");
final receiver = ReceivePort('active groups receiver');
final outboundsStream = receiver.asBroadcastStream(
onCancel: (_) {
logger.debug("stopping");
final err = _box.stopCommandClient(12).cast<Utf8>().toDartString();
if (err.isNotEmpty) {
logger.error("failed stopping: $err");
}
receiver.close();
},
).map(
(event) {
if (event case String _) {
if (event.startsWith('error:')) {
logger.error(event);
throw event.replaceFirst('error:', "");
}
return (jsonDecode(event) as List).map((e) {
return SingboxOutboundGroup.fromJson(e as Map<String, dynamic>);
}).toList();
@@ -347,6 +308,54 @@ class FFISingboxService with InfraLogger implements SingboxService {
},
);
try {
final err = _box
.startCommandClient(4, receiver.sendPort.nativePort)
.cast<Utf8>()
.toDartString();
if (err.isNotEmpty) {
logger.error("error starting group command: $err");
throw err;
}
} catch (e) {
receiver.close();
rethrow;
}
return _outboundsStream = outboundsStream;
}
@override
Stream<List<SingboxOutboundGroup>> watchActiveGroups() {
final logger = newLoggy("[ActiveGroupsClient]");
final receiver = ReceivePort('active groups');
final outboundsStream = receiver.asBroadcastStream(
onCancel: (_) {
logger.debug("stopping");
receiver.close();
final err = _box.stopCommandClient(12).cast<Utf8>().toDartString();
if (err.isNotEmpty) {
logger.error("failed stopping: $err");
}
},
).map(
(event) {
if (event case String _) {
if (event.startsWith('error:')) {
logger.error(event);
throw event.replaceFirst('error:', "");
}
return (jsonDecode(event) as List).map((e) {
return SingboxOutboundGroup.fromJson(e as Map<String, dynamic>);
}).toList();
}
logger.error("unexpected type, msg: $event");
throw "invalid type";
},
);
try {
final err = _box
.startCommandClient(12, receiver.sendPort.nativePort)
.cast<Utf8>()
@@ -355,6 +364,11 @@ class FFISingboxService with InfraLogger implements SingboxService {
logger.error("error starting: $err");
throw err;
}
} catch (e) {
receiver.close();
rethrow;
}
return outboundsStream;
}

View File

@@ -13,12 +13,19 @@ import 'package:hiddify/utils/custom_loggers.dart';
import 'package:rxdart/rxdart.dart';
class PlatformSingboxService with InfraLogger implements SingboxService {
late final _methodChannel = const MethodChannel("com.hiddify.app/method");
late final _statusChannel =
const EventChannel("com.hiddify.app/service.status", JSONMethodCodec());
late final _alertsChannel =
const EventChannel("com.hiddify.app/service.alerts", JSONMethodCodec());
late final _logsChannel = const EventChannel("com.hiddify.app/service.logs");
static const channelPrefix = "com.hiddify.app";
static const methodChannel = MethodChannel("$channelPrefix/method");
static const statusChannel =
EventChannel("$channelPrefix/service.status", JSONMethodCodec());
static const alertsChannel =
EventChannel("$channelPrefix/service.alerts", JSONMethodCodec());
static const statsChannel =
EventChannel("$channelPrefix/stats", JSONMethodCodec());
static const groupsChannel = EventChannel("$channelPrefix/groups");
static const activeGroupsChannel =
EventChannel("$channelPrefix/active-groups");
static const logsChannel = EventChannel("$channelPrefix/service.logs");
late final ValueStream<SingboxStatus> _status;
@@ -26,26 +33,23 @@ class PlatformSingboxService with InfraLogger implements SingboxService {
Future<void> init() async {
loggy.debug("initializing");
final status =
_statusChannel.receiveBroadcastStream().map(SingboxStatus.fromEvent);
statusChannel.receiveBroadcastStream().map(SingboxStatus.fromEvent);
final alerts =
_alertsChannel.receiveBroadcastStream().map(SingboxStatus.fromEvent);
alertsChannel.receiveBroadcastStream().map(SingboxStatus.fromEvent);
_status = ValueConnectableStream(Rx.merge([status, alerts])).autoConnect();
await _status.first;
}
@override
TaskEither<String, Unit> setup(
Directories directories,
bool debug,
) {
TaskEither<String, Unit> setup(Directories directories, bool debug) {
return TaskEither(
() async {
if (!Platform.isIOS) {
return right(unit);
}
await _methodChannel.invokeMethod("setup");
await methodChannel.invokeMethod("setup");
return right(unit);
},
);
@@ -59,7 +63,7 @@ class PlatformSingboxService with InfraLogger implements SingboxService {
) {
return TaskEither(
() async {
final message = await _methodChannel.invokeMethod<String>(
final message = await methodChannel.invokeMethod<String>(
"parse_config",
{"path": path, "tempPath": tempPath, "debug": debug},
);
@@ -73,7 +77,8 @@ class PlatformSingboxService with InfraLogger implements SingboxService {
TaskEither<String, Unit> changeOptions(SingboxConfigOption options) {
return TaskEither(
() async {
await _methodChannel.invokeMethod(
loggy.debug("changing options");
await methodChannel.invokeMethod(
"change_config_options",
jsonEncode(options.toJson()),
);
@@ -83,12 +88,11 @@ class PlatformSingboxService with InfraLogger implements SingboxService {
}
@override
TaskEither<String, String> generateFullConfigByPath(
String path,
) {
TaskEither<String, String> generateFullConfigByPath(String path) {
return TaskEither(
() async {
final configJson = await _methodChannel.invokeMethod<String>(
loggy.debug("generating full config by path");
final configJson = await methodChannel.invokeMethod<String>(
"generate_config",
{"path": path},
);
@@ -109,7 +113,7 @@ class PlatformSingboxService with InfraLogger implements SingboxService {
return TaskEither(
() async {
loggy.debug("starting");
await _methodChannel.invokeMethod(
await methodChannel.invokeMethod(
"start",
{"path": path, "name": name},
);
@@ -123,7 +127,7 @@ class PlatformSingboxService with InfraLogger implements SingboxService {
return TaskEither(
() async {
loggy.debug("stopping");
await _methodChannel.invokeMethod("stop");
await methodChannel.invokeMethod("stop");
return right(unit);
},
);
@@ -138,7 +142,7 @@ class PlatformSingboxService with InfraLogger implements SingboxService {
return TaskEither(
() async {
loggy.debug("restarting");
await _methodChannel.invokeMethod(
await methodChannel.invokeMethod(
"restart",
{"path": path, "name": name},
);
@@ -159,17 +163,16 @@ class PlatformSingboxService with InfraLogger implements SingboxService {
}
loggy.debug("resetting tunnel");
await _methodChannel.invokeMethod("reset");
await methodChannel.invokeMethod("reset");
return right(unit);
},
);
}
@override
Stream<List<SingboxOutboundGroup>> watchOutbounds() {
const channel = EventChannel("com.hiddify.app/groups");
loggy.debug("watching outbounds");
return channel.receiveBroadcastStream().map(
Stream<List<SingboxOutboundGroup>> watchGroups() {
loggy.debug("watching groups");
return groupsChannel.receiveBroadcastStream().map(
(event) {
if (event case String _) {
return (jsonDecode(event) as List).map((e) {
@@ -183,10 +186,9 @@ class PlatformSingboxService with InfraLogger implements SingboxService {
}
@override
Stream<List<SingboxOutboundGroup>> watchActiveOutbounds() {
const channel = EventChannel("com.hiddify.app/active-groups");
loggy.debug("watching active outbounds");
return channel.receiveBroadcastStream().map(
Stream<List<SingboxOutboundGroup>> watchActiveGroups() {
loggy.debug("watching active groups");
return activeGroupsChannel.receiveBroadcastStream().map(
(event) {
if (event case String _) {
return (jsonDecode(event) as List).map((e) {
@@ -204,9 +206,8 @@ class PlatformSingboxService with InfraLogger implements SingboxService {
@override
Stream<SingboxStats> watchStats() {
const channel = EventChannel("com.hiddify.app/stats", JSONMethodCodec());
loggy.debug("watching stats");
return channel.receiveBroadcastStream().map(
return statsChannel.receiveBroadcastStream().map(
(event) {
if (event case Map<String, dynamic> _) {
return SingboxStats.fromJson(event);
@@ -224,7 +225,7 @@ class PlatformSingboxService with InfraLogger implements SingboxService {
return TaskEither(
() async {
loggy.debug("selecting outbound");
await _methodChannel.invokeMethod(
await methodChannel.invokeMethod(
"select_outbound",
{"groupTag": groupTag, "outboundTag": outboundTag},
);
@@ -237,7 +238,7 @@ class PlatformSingboxService with InfraLogger implements SingboxService {
TaskEither<String, Unit> urlTest(String groupTag) {
return TaskEither(
() async {
await _methodChannel.invokeMethod(
await methodChannel.invokeMethod(
"url_test",
{"groupTag": groupTag},
);
@@ -248,7 +249,7 @@ class PlatformSingboxService with InfraLogger implements SingboxService {
@override
Stream<List<String>> watchLogs(String path) async* {
yield* _logsChannel
yield* logsChannel
.receiveBroadcastStream()
.map((event) => (event as List).map((e) => e as String).toList());
}
@@ -257,7 +258,7 @@ class PlatformSingboxService with InfraLogger implements SingboxService {
TaskEither<String, Unit> clearLogs() {
return TaskEither(
() async {
await _methodChannel.invokeMethod("clear_logs");
await methodChannel.invokeMethod("clear_logs");
return right(unit);
},
);

View File

@@ -21,11 +21,17 @@ abstract interface class SingboxService {
Future<void> init();
/// setup directories and other initial platform services
TaskEither<String, Unit> setup(
Directories directories,
bool debug,
);
/// validates config by path and save it
///
/// [path] is used to save validated config
/// [tempPath] includes base config, possibly invalid
/// [debug] indicates if debug mode (avoid in prod)
TaskEither<String, Unit> validateConfigByPath(
String path,
String tempPath,
@@ -34,10 +40,17 @@ abstract interface class SingboxService {
TaskEither<String, Unit> changeOptions(SingboxConfigOption options);
TaskEither<String, String> generateFullConfigByPath(
String path,
);
/// generates full sing-box configuration
///
/// [path] is the path to the base config file
/// returns full patched json config file as string
TaskEither<String, String> generateFullConfigByPath(String path);
/// start sing-box service
///
/// [path] is the path to the base config file (to be patched by previously set [SingboxConfigOption])
/// [name] is the name of the active profile (not unique, used for presentation in platform specific ui)
/// [disableMemoryLimit] is used to disable service memory limit (mostly used in mobile platforms i.e. iOS)
TaskEither<String, Unit> start(
String path,
String name,
@@ -46,6 +59,7 @@ abstract interface class SingboxService {
TaskEither<String, Unit> stop();
/// similar to [start], but uses platform dependent behavior to restart the service
TaskEither<String, Unit> restart(
String path,
String name,
@@ -54,16 +68,18 @@ abstract interface class SingboxService {
TaskEither<String, Unit> resetTunnel();
Stream<List<SingboxOutboundGroup>> watchOutbounds();
Stream<List<SingboxOutboundGroup>> watchGroups();
Stream<List<SingboxOutboundGroup>> watchActiveOutbounds();
Stream<List<SingboxOutboundGroup>> watchActiveGroups();
TaskEither<String, Unit> selectOutbound(String groupTag, String outboundTag);
TaskEither<String, Unit> urlTest(String groupTag);
/// watch status of sing-box service (started, starting, etc.)
Stream<SingboxStatus> watchStatus();
/// watch stats of sing-box service (uplink, downlink, etc.)
Stream<SingboxStats> watchStats();
Stream<List<String>> watchLogs(String path);