From 3c0c80abd249923d56ca96fd7c369ab30e9fd609 Mon Sep 17 00:00:00 2001 From: xkeyC <3334969096@qq.com> Date: Sun, 25 Feb 2024 10:30:20 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AD=90=E8=BF=9B=E7=A8=8B=E5=88=87=E6=8D=A2?= =?UTF-8?q?=E5=88=B0=20rust=20win32job=20=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/common/io/aria2c.dart | 75 +++++++++++++----------- lib/common/rust/api/process_api.dart | 18 ++++++ lib/common/rust/frb_generated.dart | 38 +++++++++++++ lib/common/rust/frb_generated.io.dart | 1 + lib/common/rust/frb_generated.web.dart | 1 + pubspec.yaml | 1 - rust/Cargo.toml | 3 +- rust/src/api/http_api.rs | 1 - rust/src/api/mod.rs | 2 + rust/src/api/process_api.rs | 79 ++++++++++++++++++++++++++ rust/src/frb_generated.rs | 48 ++++++++++++++++ rust/src/lib.rs | 4 +- 12 files changed, 232 insertions(+), 39 deletions(-) create mode 100644 lib/common/rust/api/process_api.dart create mode 100644 rust/src/api/process_api.rs diff --git a/lib/common/io/aria2c.dart b/lib/common/io/aria2c.dart index 209ff3d..674569d 100644 --- a/lib/common/io/aria2c.dart +++ b/lib/common/io/aria2c.dart @@ -1,4 +1,3 @@ -import 'dart:convert'; import 'dart:io'; import 'dart:math'; @@ -11,8 +10,11 @@ import 'package:starcitizen_doctor/common/conf/app_conf.dart'; import 'package:starcitizen_doctor/common/conf/binary_conf.dart'; import 'package:starcitizen_doctor/common/helper/system_helper.dart'; +import 'package:starcitizen_doctor/common/rust/api/process_api.dart' + as rs_process; + class Aria2cManager { - static int? _daemonPID; + static bool _isDaemonRunning = false; static final String _aria2cDir = "${AppConf.applicationSupportDir}\\modules\\aria2c"; @@ -24,7 +26,7 @@ class Aria2cManager { throw "not connect!"; } - static bool get isAvailable => _daemonPID != null && _aria2c != null; + static bool get isAvailable => _isDaemonRunning && _aria2c != null; static Future checkLazyLoad() async { try { @@ -40,7 +42,7 @@ class Aria2cManager { } static Future launchDaemon() async { - if (_daemonPID != null) return; + if (_isDaemonRunning) return; await BinaryModuleConf.extractModule(["aria2c"]); /// skip for debug hot reload @@ -63,9 +65,10 @@ class Aria2cManager { final trackerList = await Api.getTorrentTrackerList(); dPrint("trackerList === $trackerList"); dPrint("Aria2cManager .----- aria2c start $port------"); - final p = await Process.start( - exePath, - [ + + final stream = rs_process.startProcess( + executable: exePath, + arguments: [ "-V", "-c", "-x 10", @@ -82,36 +85,24 @@ class Aria2cManager { "--seed-time=0", ], workingDirectory: _aria2cDir); - p.stdout.transform(utf8.decoder).listen((event) async { - if (event.trim().isEmpty) return; - dPrint("[aria2c]: ${event.trim()}"); - if (event.contains("IPv4 RPC: listening on TCP port")) { - _daemonPID = p.pid; - _aria2c = Aria2c("ws://127.0.0.1:$port/jsonrpc", "websocket", pwd); - _aria2c!.getVersion().then((value) { - dPrint("Aria2cManager.connected! version == ${value.version}"); - }); - final box = await Hive.openBox("app_conf"); - _aria2c!.changeGlobalOption(Aria2Option() - ..maxOverallUploadLimit = - textToByte(box.get("downloader_up_limit", defaultValue: "0")) - ..maxOverallDownloadLimit = - textToByte(box.get("downloader_down_limit", defaultValue: "0")) - ..btTracker = trackerList); + + stream.listen((event) { + dPrint("Aria2cManager.rs_process event === $event"); + if (event.startsWith("output:")) { + if (event.contains("IPv4 RPC: listening on TCP port")) { + _onLaunch(port, pwd, trackerList); + } + } else if (event.startsWith("error:")) { + _isDaemonRunning = false; + _aria2c = null; + } else if (event.startsWith("exit:")) { + _isDaemonRunning = false; + _aria2c = null; } - }, onDone: () { - dPrint("[aria2c] onDone: "); - _daemonPID = null; - }, onError: (e) { - dPrint("[aria2c] stdout ERROR: $e"); - _daemonPID = null; - }); - p.pid; - p.stderr.transform(utf8.decoder).listen((event) { - dPrint("[aria2c] stderr ERROR : $event"); }); + while (true) { - if (_daemonPID != null) return; + if (_isDaemonRunning) return; await Future.delayed(const Duration(milliseconds: 100)); } } @@ -150,4 +141,20 @@ class Aria2cManager { } return 0; } + + static Future _onLaunch( + int port, String pwd, String trackerList) async { + _isDaemonRunning = true; + _aria2c = Aria2c("ws://127.0.0.1:$port/jsonrpc", "websocket", pwd); + _aria2c!.getVersion().then((value) { + dPrint("Aria2cManager.connected! version == ${value.version}"); + }); + final box = await Hive.openBox("app_conf"); + _aria2c!.changeGlobalOption(Aria2Option() + ..maxOverallUploadLimit = + textToByte(box.get("downloader_up_limit", defaultValue: "0")) + ..maxOverallDownloadLimit = + textToByte(box.get("downloader_down_limit", defaultValue: "0")) + ..btTracker = trackerList); + } } diff --git a/lib/common/rust/api/process_api.dart b/lib/common/rust/api/process_api.dart new file mode 100644 index 0000000..f9dcf2e --- /dev/null +++ b/lib/common/rust/api/process_api.dart @@ -0,0 +1,18 @@ +// This file is automatically generated, so please do not edit it. +// Generated by `flutter_rust_bridge`@ 2.0.0-dev.24. + +// ignore_for_file: invalid_use_of_internal_member, unused_import, unnecessary_import + +import '../frb_generated.dart'; +import 'package:flutter_rust_bridge/flutter_rust_bridge_for_generated.dart'; + +Stream startProcess( + {required String executable, + required List arguments, + required String workingDirectory, + dynamic hint}) => + RustLib.instance.api.startProcess( + executable: executable, + arguments: arguments, + workingDirectory: workingDirectory, + hint: hint); diff --git a/lib/common/rust/frb_generated.dart b/lib/common/rust/frb_generated.dart index bafab51..1c680a5 100644 --- a/lib/common/rust/frb_generated.dart +++ b/lib/common/rust/frb_generated.dart @@ -4,6 +4,7 @@ // ignore_for_file: unused_import, unused_element, unnecessary_import, duplicate_ignore, invalid_use_of_internal_member, annotate_overrides, non_constant_identifier_names, curly_braces_in_flow_control_structures, prefer_const_literals_to_create_immutables, unused_field import 'api/http_api.dart'; +import 'api/process_api.dart'; import 'dart:async'; import 'dart:convert'; import 'frb_generated.io.dart' if (dart.library.html) 'frb_generated.web.dart'; @@ -75,6 +76,12 @@ abstract class RustLibApi extends BaseApi { Future setDefaultHeader( {required Map headers, dynamic hint}); + Stream startProcess( + {required String executable, + required List arguments, + required String workingDirectory, + dynamic hint}); + RustArcIncrementStrongCountFnType get rust_arc_increment_strong_count_ReqwestVersion; @@ -177,6 +184,37 @@ class RustLibApiImpl extends RustLibApiImplPlatform implements RustLibApi { argNames: ["headers"], ); + @override + Stream startProcess( + {required String executable, + required List arguments, + required String workingDirectory, + dynamic hint}) { + return handler.executeStream(StreamTask( + callFfi: (port_) { + final serializer = SseSerializer(generalizedFrbRustBinding); + sse_encode_String(executable, serializer); + sse_encode_list_String(arguments, serializer); + sse_encode_String(workingDirectory, serializer); + pdeCallFfi(generalizedFrbRustBinding, serializer, + funcId: 4, port: port_); + }, + codec: SseCodec( + decodeSuccessData: sse_decode_String, + decodeErrorData: null, + ), + constMeta: kStartProcessConstMeta, + argValues: [executable, arguments, workingDirectory], + apiImpl: this, + hint: hint, + )); + } + + TaskConstMeta get kStartProcessConstMeta => const TaskConstMeta( + debugName: "start_process", + argNames: ["executable", "arguments", "workingDirectory"], + ); + RustArcIncrementStrongCountFnType get rust_arc_increment_strong_count_ReqwestVersion => wire .rust_arc_increment_strong_count_RustOpaque_flutter_rust_bridgefor_generatedrust_asyncRwLockreqwestVersion; diff --git a/lib/common/rust/frb_generated.io.dart b/lib/common/rust/frb_generated.io.dart index 28eda35..ae3ae3d 100644 --- a/lib/common/rust/frb_generated.io.dart +++ b/lib/common/rust/frb_generated.io.dart @@ -4,6 +4,7 @@ // ignore_for_file: unused_import, unused_element, unnecessary_import, duplicate_ignore, invalid_use_of_internal_member, annotate_overrides, non_constant_identifier_names, curly_braces_in_flow_control_structures, prefer_const_literals_to_create_immutables, unused_field import 'api/http_api.dart'; +import 'api/process_api.dart'; import 'dart:async'; import 'dart:convert'; import 'dart:ffi' as ffi; diff --git a/lib/common/rust/frb_generated.web.dart b/lib/common/rust/frb_generated.web.dart index e5a13b1..c0447a9 100644 --- a/lib/common/rust/frb_generated.web.dart +++ b/lib/common/rust/frb_generated.web.dart @@ -4,6 +4,7 @@ // ignore_for_file: unused_import, unused_element, unnecessary_import, duplicate_ignore, invalid_use_of_internal_member, annotate_overrides, non_constant_identifier_names, curly_braces_in_flow_control_structures, prefer_const_literals_to_create_immutables, unused_field import 'api/http_api.dart'; +import 'api/process_api.dart'; import 'dart:async'; import 'dart:convert'; import 'frb_generated.dart'; diff --git a/pubspec.yaml b/pubspec.yaml index 2946cd1..9f6a32e 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -76,7 +76,6 @@ dependencies: #git: https://github.com/xkeyC/dart_aria2_rpc.git path: ../../xkeyC/dart_aria2_rpc intl: ^0.18.0 - dependency_overrides: http: ^1.1.2 diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 3ec2a7d..07f00d9 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -12,7 +12,7 @@ crate-type = ["cdylib", "staticlib"] [dependencies] flutter_rust_bridge = "=2.0.0-dev.24" -tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros"] } +tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros","process"] } url = "2.5.0" uuid = { version = "1.7.0", features = ["v4", "fast-rng", "macro-diagnostics"] } async-std = "1.12.0" @@ -21,3 +21,4 @@ once_cell = "1.19.0" reqwest = { version = "0.11", features = ["rustls-tls-native-roots", "cookies", "gzip", "json","stream"] } hickory-resolver = {version = "0.24.0"} anyhow = "1.0" +win32job = "2" diff --git a/rust/src/api/http_api.rs b/rust/src/api/http_api.rs index cf92406..18e4ba1 100644 --- a/rust/src/api/http_api.rs +++ b/rust/src/api/http_api.rs @@ -3,7 +3,6 @@ use hyper::Method; use crate::http_package; use crate::http_package::RustHttpResponse; - pub enum MyMethod { Options, Gets, diff --git a/rust/src/api/mod.rs b/rust/src/api/mod.rs index 4aad6df..e1be596 100644 --- a/rust/src/api/mod.rs +++ b/rust/src/api/mod.rs @@ -2,3 +2,5 @@ // Do not put code in `mod.rs`, but put in e.g. `simple.rs`. // pub mod http_api; + +pub mod process_api; \ No newline at end of file diff --git a/rust/src/api/process_api.rs b/rust/src/api/process_api.rs new file mode 100644 index 0000000..15b6713 --- /dev/null +++ b/rust/src/api/process_api.rs @@ -0,0 +1,79 @@ +use std::sync::Arc; +use tokio::io::{AsyncBufReadExt, BufReader}; +use crate::frb_generated::StreamSink; + +pub async fn start_process( + executable: String, + arguments: Vec, + working_directory: String, + stream_sink: StreamSink, +) { + let stream_sink_arc = Arc::from(stream_sink); + + let mut command = tokio::process::Command::new(&executable); + command + .args(arguments) + .current_dir(working_directory) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .kill_on_drop(true); + + command.creation_flags(0x08000000); + + let job = win32job::Job::create().unwrap(); + let mut info = job.query_extended_limit_info().unwrap(); + info.limit_kill_on_job_close(); + job.set_extended_limit_info(&mut info).unwrap(); + job.assign_current_process().unwrap(); + + + if let Ok(mut child) = command.spawn() { + let stdout = child.stdout.take().expect("Failed to open stdout"); + let stderr = child.stderr.take().expect("Failed to open stderr"); + let output_task = tokio::spawn(process_output(stdout, stream_sink_arc.clone())); + let error_task = tokio::spawn(process_error(stderr, stream_sink_arc.clone())); + + tokio::select! { + _ = output_task => (), + _ = error_task => (), + } + + let exit_status = child.wait().await.expect("Failed to wait for child process"); + if !exit_status.success() { + eprintln!("Child process exited with an error: {:?}", exit_status); + stream_sink_arc.add("exit:".to_string()).unwrap(); + } + } else { + eprintln!("Failed to start {}", executable); + stream_sink_arc.add("error:Failed to start".to_string()).unwrap(); + } +} + +async fn process_output(stdout: R, stream_sink: Arc>) + where + R: tokio::io::AsyncRead + Unpin, +{ + let reader = BufReader::new(stdout); + let mut lines = reader.lines(); + + while let Some(line) = lines.next_line().await.unwrap() { + if line.trim().is_empty() { + continue; + } + println!("{}", line.trim()); + stream_sink.add("output:".to_string() + &*line.trim().to_string()).unwrap(); + } +} + +async fn process_error(stderr: R, stream_sink: Arc>) + where + R: tokio::io::AsyncRead + Unpin, +{ + let reader = BufReader::new(stderr); + let mut lines = reader.lines(); + while let Some(line) = lines.next_line().await.unwrap() { + println!("{}", line.trim()); + stream_sink.add("error:".to_string() + &*line.trim().to_string()).unwrap(); + } +} + diff --git a/rust/src/frb_generated.rs b/rust/src/frb_generated.rs index 253b969..adc876a 100644 --- a/rust/src/frb_generated.rs +++ b/rust/src/frb_generated.rs @@ -150,6 +150,53 @@ fn wire_set_default_header_impl( }, ) } +fn wire_start_process_impl( + port_: flutter_rust_bridge::for_generated::MessagePort, + ptr_: flutter_rust_bridge::for_generated::PlatformGeneralizedUint8ListPtr, + rust_vec_len_: i32, + data_len_: i32, +) { + FLUTTER_RUST_BRIDGE_HANDLER.wrap_async::( + flutter_rust_bridge::for_generated::TaskInfo { + debug_name: "start_process", + port: Some(port_), + mode: flutter_rust_bridge::for_generated::FfiCallMode::Stream, + }, + move || { + let message = unsafe { + flutter_rust_bridge::for_generated::Dart2RustMessageSse::from_wire( + ptr_, + rust_vec_len_, + data_len_, + ) + }; + let mut deserializer = + flutter_rust_bridge::for_generated::SseDeserializer::new(message); + let api_executable = ::sse_decode(&mut deserializer); + let api_arguments = >::sse_decode(&mut deserializer); + let api_working_directory = ::sse_decode(&mut deserializer); + deserializer.end(); + move |context| async move { + transform_result_sse( + (move || async move { + Result::<_, ()>::Ok( + crate::api::process_api::start_process( + api_executable, + api_arguments, + api_working_directory, + StreamSink::new( + context.rust2dart_context().stream_sink::<_, String>(), + ), + ) + .await, + ) + })() + .await, + ) + } + }, + ) +} // Section: related_funcs @@ -382,6 +429,7 @@ fn pde_ffi_dispatcher_primary_impl( 3 => wire_dns_lookup_txt_impl(port, ptr, rust_vec_len, data_len), 2 => wire_fetch_impl(port, ptr, rust_vec_len, data_len), 1 => wire_set_default_header_impl(port, ptr, rust_vec_len, data_len), + 4 => wire_start_process_impl(port, ptr, rust_vec_len, data_len), _ => unreachable!(), } } diff --git a/rust/src/lib.rs b/rust/src/lib.rs index bec7fbc..51d22c2 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -1,3 +1,3 @@ mod frb_generated; -mod api; -mod http_package; +pub mod api; +pub mod http_package;