Change proxies flow

This commit is contained in:
problematicconsumer
2023-08-29 19:32:31 +03:30
parent 375cb8a945
commit e8eb55ac8d
15 changed files with 335 additions and 229 deletions

View File

@@ -98,6 +98,42 @@ class CoreFacadeImpl with ExceptionHandler, InfraLogger implements CoreFacade {
);
}
@override
Stream<Either<CoreServiceFailure, List<OutboundGroup>>> watchOutbounds() {
return singbox.watchOutbounds().map((event) {
return (jsonDecode(event) as List).map((e) {
return OutboundGroup.fromJson(e as Map<String, dynamic>);
}).toList();
}).handleExceptions(
(error, stackTrace) {
loggy.warning("error watching outbounds", error, stackTrace);
return CoreServiceFailure.unexpected(error, stackTrace);
},
);
}
@override
TaskEither<CoreServiceFailure, Unit> selectOutbound(
String groupTag,
String outboundTag,
) {
return exceptionHandler(
() => singbox
.selectOutbound(groupTag, outboundTag)
.mapLeft(CoreServiceFailure.other)
.run(),
CoreServiceFailure.unexpected,
);
}
@override
TaskEither<CoreServiceFailure, Unit> urlTest(String groupTag) {
return exceptionHandler(
() => singbox.urlTest(groupTag).mapLeft(CoreServiceFailure.other).run(),
CoreServiceFailure.unexpected,
);
}
@override
Stream<Either<CoreServiceFailure, CoreStatus>> watchCoreStatus() {
return singbox.watchStatus().map((event) {

View File

@@ -0,0 +1,38 @@
import 'package:dartx/dartx.dart';
import 'package:freezed_annotation/freezed_annotation.dart';
import 'package:hiddify/domain/singbox/proxy_type.dart';
part 'outbounds.freezed.dart';
part 'outbounds.g.dart';
@freezed
class OutboundGroup with _$OutboundGroup {
@JsonSerializable(fieldRename: FieldRename.kebab)
const factory OutboundGroup({
required String tag,
@JsonKey(fromJson: _typeFromJson) required ProxyType type,
required String selected,
@Default([]) List<OutboundGroupItem> items,
}) = _OutboundGroup;
factory OutboundGroup.fromJson(Map<String, dynamic> json) =>
_$OutboundGroupFromJson(json);
}
@freezed
class OutboundGroupItem with _$OutboundGroupItem {
@JsonSerializable(fieldRename: FieldRename.kebab)
const factory OutboundGroupItem({
required String tag,
@JsonKey(fromJson: _typeFromJson) required ProxyType type,
required int urlTestDelay,
}) = _OutboundGroupItem;
factory OutboundGroupItem.fromJson(Map<String, dynamic> json) =>
_$OutboundGroupItemFromJson(json);
}
ProxyType _typeFromJson(dynamic type) =>
ProxyType.values
.firstOrNullWhere((e) => e.key == (type as String?)?.toLowerCase()) ??
ProxyType.unknown;

View File

@@ -0,0 +1,31 @@
enum ProxyType {
direct("Direct"),
block("Block"),
dns("DNS"),
socks("SOCKS"),
http("HTTP"),
vmess("VMess"),
trojan("Trojan"),
naive("Naive"),
wireguard("WireGuard"),
hysteria("Hysteria"),
tor("Tor"),
ssh("SSH"),
shadowtls("ShadowTLS"),
shadowsocksr("ShadowsocksR"),
vless("VLESS"),
tuic("TUIC"),
selector("Selector"),
urltest("URLTest"),
unknown("Unknown");
const ProxyType(this.label);
final String label;
String get key => name;
static List<ProxyType> groupValues = [selector, urltest];
}

View File

@@ -1,2 +1,3 @@
export 'core_status.dart';
export 'outbounds.dart';
export 'singbox_facade.dart';

View File

@@ -1,6 +1,7 @@
import 'package:fpdart/fpdart.dart';
import 'package:hiddify/domain/core_service_failure.dart';
import 'package:hiddify/domain/singbox/core_status.dart';
import 'package:hiddify/domain/singbox/outbounds.dart';
abstract interface class SingboxFacade {
TaskEither<CoreServiceFailure, Unit> setup();
@@ -13,6 +14,15 @@ abstract interface class SingboxFacade {
TaskEither<CoreServiceFailure, Unit> stop();
Stream<Either<CoreServiceFailure, List<OutboundGroup>>> watchOutbounds();
TaskEither<CoreServiceFailure, Unit> selectOutbound(
String groupTag,
String outboundTag,
);
TaskEither<CoreServiceFailure, Unit> urlTest(String groupTag);
Stream<Either<CoreServiceFailure, CoreStatus>> watchCoreStatus();
Stream<Either<CoreServiceFailure, String>> watchLogs();

View File

@@ -1,43 +0,0 @@
import 'package:combine/combine.dart';
import 'package:flutter/foundation.dart';
import 'package:freezed_annotation/freezed_annotation.dart';
import 'package:hiddify/domain/clash/clash.dart';
part 'group_with_proxies.freezed.dart';
@freezed
class GroupWithProxies with _$GroupWithProxies {
const GroupWithProxies._();
const factory GroupWithProxies({
required ClashProxyGroup group,
required List<ClashProxy> proxies,
}) = _GroupWithProxies;
static Future<List<GroupWithProxies>> fromProxies(
List<ClashProxy> proxies,
) async {
final stopWatch = Stopwatch()..start();
final res = await CombineWorker().execute(
() {
final result = <GroupWithProxies>[];
for (final proxy in proxies) {
if (proxy is ClashProxyGroup) {
// if (mode != TunnelMode.global && proxy.name == "GLOBAL") continue;
if (proxy.name == "GLOBAL") continue;
final current = <ClashProxy>[];
for (final name in proxy.all) {
current.addAll(proxies.where((e) => e.name == name).toList());
}
result.add(GroupWithProxies(group: proxy, proxies: current));
}
}
return result;
},
);
debugPrint(
"computed grouped proxies in [${stopWatch.elapsedMilliseconds}ms]",
);
return res;
}
}

View File

@@ -1 +0,0 @@
export 'group_with_proxies.dart';

View File

@@ -1,83 +0,0 @@
import 'dart:async';
import 'package:dartx/dartx.dart';
import 'package:hiddify/core/prefs/misc_prefs.dart';
import 'package:hiddify/data/data_providers.dart';
import 'package:hiddify/domain/clash/clash.dart';
import 'package:hiddify/features/common/active_profile/active_profile_notifier.dart';
import 'package:hiddify/utils/utils.dart';
import 'package:riverpod_annotation/riverpod_annotation.dart';
import 'package:rxdart/rxdart.dart';
part 'proxies_delay_notifier.g.dart';
// TODO: rewrite
@Riverpod(keepAlive: true)
class ProxiesDelayNotifier extends _$ProxiesDelayNotifier with AppLogger {
@override
Map<String, int> build() {
ref.onDispose(
() {
loggy.debug("disposing");
_currentTest?.cancel();
},
);
ref.listen(
activeProfileProvider.selectAsync((value) => value?.id),
(prev, next) async {
if (await prev != await next) ref.invalidateSelf();
},
);
return {};
}
ClashFacade get _clash => ref.read(coreFacadeProvider);
StreamSubscription? _currentTest;
Future<void> testDelay(Iterable<String> proxies) async {
final testUrl = ref.read(connectionTestUrlProvider);
final concurrent = ref.read(concurrentTestCountProvider);
loggy.info(
'testing delay for [${proxies.length}] proxies with [$testUrl], [$concurrent] at a time',
);
// cancel possible running test
await _currentTest?.cancel();
// reset previous
state = state.filterNot((entry) => proxies.contains(entry.key));
void setDelay(String name, int delay) {
state = {
...state
..update(
name,
(_) => delay,
ifAbsent: () => delay,
),
};
}
_currentTest = Stream.fromIterable(proxies)
.bufferCount(concurrent)
.asyncMap(
(chunk) => Future.wait(
chunk.map(
(e) async => setDelay(
e,
await _clash
.testDelay(e, testUrl: testUrl)
.getOrElse((l) => -1)
.run(),
),
),
),
)
.listen((event) {});
}
Future<void> cancelDelayTest() async => _currentTest?.cancel();
}

View File

@@ -1,11 +1,9 @@
import 'dart:async';
import 'package:fpdart/fpdart.dart';
import 'package:hiddify/data/data_providers.dart';
import 'package:hiddify/domain/clash/clash.dart';
import 'package:hiddify/domain/core_service_failure.dart';
import 'package:hiddify/domain/singbox/singbox.dart';
import 'package:hiddify/features/common/connectivity/connectivity_controller.dart';
import 'package:hiddify/features/proxies/model/model.dart';
import 'package:hiddify/utils/utils.dart';
import 'package:riverpod_annotation/riverpod_annotation.dart';
@@ -14,31 +12,51 @@ part 'proxies_notifier.g.dart';
@Riverpod(keepAlive: true)
class ProxiesNotifier extends _$ProxiesNotifier with AppLogger {
@override
Future<List<GroupWithProxies>> build() async {
loggy.debug('building');
if (!await ref.watch(serviceRunningProvider.future)) {
Stream<List<OutboundGroup>> build() async* {
loggy.debug("building");
final serviceRunning = await ref.watch(serviceRunningProvider.future);
if (!serviceRunning) {
throw const CoreServiceNotRunning();
}
return _clash.getProxies().flatMap(
(proxies) {
return TaskEither(
() async => right(await GroupWithProxies.fromProxies(proxies)),
yield* ref.watch(coreFacadeProvider).watchOutbounds().map(
(event) => event.getOrElse(
(f) {
loggy.warning("error receiving proxies", f);
throw f;
},
),
);
},
).getOrElse((l) {
loggy.warning("failed receiving proxies: $l");
throw l;
}).run();
}
ClashFacade get _clash => ref.read(coreFacadeProvider);
Future<void> changeProxy(String groupTag, String outboundTag) async {
loggy.debug(
"changing proxy, group: [$groupTag] - outbound: [$outboundTag]",
);
if (state case AsyncData(value: final outbounds)) {
await ref
.read(coreFacadeProvider)
.selectOutbound(groupTag, outboundTag)
.getOrElse((l) {
loggy.warning("error selecting outbound", l);
throw l;
}).run();
state = AsyncData(
[
...outbounds.map(
(e) => e.tag == groupTag ? e.copyWith(selected: outboundTag) : e,
),
],
).copyWithPrevious(state);
}
}
Future<void> changeProxy(String selectorName, String proxyName) async {
loggy.debug("changing proxy, selector: $selectorName - proxy: $proxyName ");
await _clash
.changeProxy(selectorName, proxyName)
.getOrElse((l) => throw l)
.run();
ref.invalidateSelf();
Future<void> urlTest(String groupTag) async {
loggy.debug("testing group: [$groupTag]");
if (state case AsyncData()) {
await ref.read(coreFacadeProvider).urlTest(groupTag).getOrElse((l) {
loggy.warning("error testing group", l);
throw l;
}).run();
}
}
}

View File

@@ -3,7 +3,6 @@ import 'package:hiddify/core/core_providers.dart';
import 'package:hiddify/domain/failures.dart';
import 'package:hiddify/features/common/common.dart';
import 'package:hiddify/features/proxies/notifier/notifier.dart';
import 'package:hiddify/features/proxies/notifier/proxies_delay_notifier.dart';
import 'package:hiddify/features/proxies/widgets/widgets.dart';
import 'package:hiddify/utils/utils.dart';
import 'package:hooks_riverpod/hooks_riverpod.dart';
@@ -18,7 +17,6 @@ class ProxiesPage extends HookConsumerWidget with PresLogger {
final asyncProxies = ref.watch(proxiesNotifierProvider);
final notifier = ref.watch(proxiesNotifierProvider.notifier);
final delays = ref.watch(proxiesDelayNotifierProvider);
final selectActiveProxyMutation = useMutation(
initialOnFailure: (error) =>
@@ -47,55 +45,33 @@ class ProxiesPage extends HookConsumerWidget with PresLogger {
);
}
final select = groups.first;
final group = groups.first;
return Scaffold(
body: CustomScrollView(
slivers: [
NestedTabAppBar(
title: Text(t.proxies.pageTitle.titleCase),
actions: [
PopupMenuButton(
itemBuilder: (_) {
return [
PopupMenuItem(
onTap: ref
.read(proxiesDelayNotifierProvider.notifier)
.cancelDelayTest,
child: Text(
t.proxies.cancelTestButtonText.sentenceCase,
),
),
];
},
),
],
),
NestedTabAppBar(title: Text(t.proxies.pageTitle.titleCase)),
SliverLayoutBuilder(
builder: (context, constraints) {
final width = constraints.crossAxisExtent;
if (!PlatformUtils.isDesktop && width < 648) {
return SliverList.builder(
itemBuilder: (_, index) {
final proxy = select.proxies[index];
final proxy = group.items[index];
return ProxyTile(
proxy,
selected: select.group.now == proxy.name,
delay: delays[proxy.name],
selected: group.selected == proxy.tag,
onSelect: () async {
if (selectActiveProxyMutation.state.isInProgress) {
return;
}
selectActiveProxyMutation.setFuture(
notifier.changeProxy(
select.group.name,
proxy.name,
),
notifier.changeProxy(group.tag, proxy.tag),
);
},
);
},
itemCount: select.proxies.length,
itemCount: group.items.length,
);
}
@@ -105,36 +81,31 @@ class ProxiesPage extends HookConsumerWidget with PresLogger {
mainAxisExtent: 68,
),
itemBuilder: (context, index) {
final proxy = select.proxies[index];
final proxy = group.items[index];
return ProxyTile(
proxy,
selected: select.group.now == proxy.name,
delay: delays[proxy.name],
selected: group.selected == proxy.tag,
onSelect: () async {
if (selectActiveProxyMutation.state.isInProgress) {
return;
}
selectActiveProxyMutation.setFuture(
notifier.changeProxy(
select.group.name,
proxy.name,
group.tag,
proxy.tag,
),
);
},
);
},
itemCount: select.proxies.length,
itemCount: group.items.length,
);
},
),
],
),
floatingActionButton: FloatingActionButton(
onPressed: () async =>
// TODO: improve
ref.read(proxiesDelayNotifierProvider.notifier).testDelay(
select.proxies.map((e) => e.name),
),
onPressed: () async => notifier.urlTest(group.tag),
tooltip: t.proxies.delayTestTooltip.titleCase,
child: const Icon(Icons.bolt),
),

View File

@@ -1,5 +1,5 @@
import 'package:flutter/material.dart';
import 'package:hiddify/domain/clash/clash.dart';
import 'package:hiddify/domain/singbox/singbox.dart';
import 'package:hooks_riverpod/hooks_riverpod.dart';
class ProxyTile extends HookConsumerWidget {
@@ -8,13 +8,11 @@ class ProxyTile extends HookConsumerWidget {
super.key,
required this.selected,
required this.onSelect,
this.delay,
});
final ClashProxy proxy;
final OutboundGroupItem proxy;
final bool selected;
final VoidCallback onSelect;
final int? delay;
@override
Widget build(BuildContext context, WidgetRef ref) {
@@ -23,10 +21,7 @@ class ProxyTile extends HookConsumerWidget {
return ListTile(
shape: RoundedRectangleBorder(borderRadius: BorderRadius.circular(12)),
title: Text(
switch (proxy) {
ClashProxyGroup(:final name) => name.toUpperCase(),
ClashProxyItem(:final name) => name,
},
proxy.tag,
overflow: TextOverflow.ellipsis,
),
leading: Padding(
@@ -40,38 +35,12 @@ class ProxyTile extends HookConsumerWidget {
),
),
),
subtitle: Text.rich(
TextSpan(
children: [
TextSpan(text: proxy.type.label),
// if (proxy.udp)
// WidgetSpan(
// child: Padding(
// padding: const EdgeInsets.symmetric(horizontal: 4),
// child: DecoratedBox(
// decoration: BoxDecoration(
// border: Border.all(
// color: theme.colorScheme.tertiaryContainer,
// ),
// borderRadius: BorderRadius.circular(6),
// ),
// child: Text(
// " UDP ",
// style: TextStyle(
// fontSize: theme.textTheme.labelSmall?.fontSize,
// ),
// ),
// ),
// ),
// ),
if (proxy case ClashProxyGroup(:final now)) ...[
TextSpan(text: " ($now)"),
],
],
),
subtitle: Text(
proxy.type.label,
overflow: TextOverflow.ellipsis,
),
trailing: delay != null ? Text(delay.toString()) : null,
trailing:
proxy.urlTestDelay != 0 ? Text(proxy.urlTestDelay.toString()) : null,
selected: selected,
onTap: onSelect,
horizontalTitleGap: 4,

View File

@@ -965,6 +965,38 @@ class SingboxNativeLibrary {
'stopCommandClient');
late final _stopCommandClient =
_stopCommandClientPtr.asFunction<ffi.Pointer<ffi.Char> Function(int)>();
ffi.Pointer<ffi.Char> selectOutbound(
ffi.Pointer<ffi.Char> groupTag,
ffi.Pointer<ffi.Char> outboundTag,
) {
return _selectOutbound(
groupTag,
outboundTag,
);
}
late final _selectOutboundPtr = _lookup<
ffi.NativeFunction<
ffi.Pointer<ffi.Char> Function(
ffi.Pointer<ffi.Char>, ffi.Pointer<ffi.Char>)>>('selectOutbound');
late final _selectOutbound = _selectOutboundPtr.asFunction<
ffi.Pointer<ffi.Char> Function(
ffi.Pointer<ffi.Char>, ffi.Pointer<ffi.Char>)>();
ffi.Pointer<ffi.Char> urlTest(
ffi.Pointer<ffi.Char> groupTag,
) {
return _urlTest(
groupTag,
);
}
late final _urlTestPtr = _lookup<
ffi.NativeFunction<
ffi.Pointer<ffi.Char> Function(ffi.Pointer<ffi.Char>)>>('urlTest');
late final _urlTest = _urlTestPtr
.asFunction<ffi.Pointer<ffi.Char> Function(ffi.Pointer<ffi.Char>)>();
}
typedef va_list = ffi.Pointer<ffi.Char>;

View File

@@ -18,6 +18,7 @@ class FFISingboxService with InfraLogger implements SingboxService {
static final SingboxNativeLibrary _box = _gen();
Stream<String>? _statusStream;
Stream<String>? _groupsStream;
static SingboxNativeLibrary _gen() {
String fullPath = "";
@@ -163,6 +164,85 @@ class FFISingboxService with InfraLogger implements SingboxService {
return _statusStream = statusStream;
}
@override
Stream<String> watchOutbounds() {
if (_groupsStream != null) return _groupsStream!;
final receiver = ReceivePort('outbounds receiver');
final groupsStream = receiver.asBroadcastStream(
onCancel: (_) {
_logger.debug("stopping group command client");
final err = _box.stopCommandClient(4).cast<Utf8>().toDartString();
if (err.isNotEmpty) {
_logger.warning("error stopping group client");
}
receiver.close();
_groupsStream = null;
},
).map(
(event) {
if (event case String _) {
if (event.startsWith('error:')) {
loggy.warning("[group client] error received: $event");
throw event.replaceFirst('error:', "");
}
return event;
}
loggy.warning("[group client] unexpected type, msg: $event");
throw "invalid type";
},
);
final err = _box
.startCommandClient(4, receiver.sendPort.nativePort)
.cast<Utf8>()
.toDartString();
if (err.isNotEmpty) {
loggy.warning("error starting group command: $err");
throw err;
}
return _groupsStream = groupsStream;
}
@override
TaskEither<String, Unit> selectOutbound(String groupTag, String outboundTag) {
return TaskEither(
() => CombineWorker().execute(
() {
final err = _box
.selectOutbound(
groupTag.toNativeUtf8().cast(),
outboundTag.toNativeUtf8().cast(),
)
.cast<Utf8>()
.toDartString();
if (err.isNotEmpty) {
return left(err);
}
return right(unit);
},
),
);
}
@override
TaskEither<String, Unit> urlTest(String groupTag) {
return TaskEither(
() => CombineWorker().execute(
() {
final err = _box
.urlTest(groupTag.toNativeUtf8().cast())
.cast<Utf8>()
.toDartString();
if (err.isNotEmpty) {
return left(err);
}
return right(unit);
},
),
);
}
@override
Stream<String> watchLogs(String path) {
var linesRead = 0;

View File

@@ -67,12 +67,53 @@ class MobileSingboxService with InfraLogger implements SingboxService {
);
}
@override
Stream<String> watchOutbounds() {
const channel = EventChannel("com.hiddify.app/groups");
loggy.debug("watching outbounds");
return channel.receiveBroadcastStream().map(
(event) {
if (event case String _) {
return event;
}
throw "invalid type";
},
);
}
@override
Stream<String> watchStatus() {
// TODO: implement watchStatus
return const Stream.empty();
}
@override
TaskEither<String, Unit> selectOutbound(String groupTag, String outboundTag) {
return TaskEither(
() async {
loggy.debug("selecting outbound");
await _methodChannel.invokeMethod(
"select_outbound",
{"groupTag": groupTag, "outboundTag": outboundTag},
);
return right(unit);
},
);
}
@override
TaskEither<String, Unit> urlTest(String groupTag) {
return TaskEither(
() async {
await _methodChannel.invokeMethod(
"url_test",
{"groupTag": groupTag},
);
return right(unit);
},
);
}
@override
Stream<String> watchLogs(String path) {
return _logsChannel.receiveBroadcastStream().map(

View File

@@ -26,6 +26,12 @@ abstract interface class SingboxService {
TaskEither<String, Unit> stop();
Stream<String> watchOutbounds();
TaskEither<String, Unit> selectOutbound(String groupTag, String outboundTag);
TaskEither<String, Unit> urlTest(String groupTag);
Stream<String> watchStatus();
Stream<String> watchLogs(String path);