新增 Rust 实现的多线程下载器,优化下载可靠性

This commit is contained in:
2023-11-05 15:56:48 +08:00
parent 3409a8597f
commit 5bc7024fe7
17 changed files with 1379 additions and 421 deletions

View File

@ -8,4 +8,9 @@ crate-type = ["staticlib", "cdylib"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
flutter_rust_bridge = "1"
flutter_rust_bridge = "1"
http-downloader = { version = "0.3.2", features = ["status-tracker", "speed-tracker", "breakpoint-resume", "bson-file-archiver"] }
tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros"] }
url = "2.4.1"
uuid = { version = "1.5.0", features = ["v4", "fast-rng", "macro-diagnostics"] }
async-std = "1.12.0"

View File

@ -1,63 +1,16 @@
// This is the entry point of your Rust library.
// When adding new code to your project, note that only items used
// here will be transformed to their Dart equivalents.
use std::sync::Arc;
use async_std::task;
use flutter_rust_bridge::StreamSink;
use crate::downloader::{do_cancel_download, do_start_download, DownloadCallbackData};
// A plain enum without any fields. This is similar to Dart- or C-style enums.
// flutter_rust_bridge is capable of generating code for enums with fields
// (@freezed classes in Dart and tagged unions in C).
pub enum Platform {
Unknown,
Android,
Ios,
Windows,
Unix,
MacIntel,
MacApple,
Wasm,
pub fn ping() -> String {
return String::from("PONG");
}
// A function definition in Rust. Similar to Dart, the return type must always be named
// and is never inferred.
pub fn platform() -> Platform {
// This is a macro, a special expression that expands into code. In Rust, all macros
// end with an exclamation mark and can be invoked with all kinds of brackets (parentheses,
// brackets and curly braces). However, certain conventions exist, for example the
// vector macro is almost always invoked as vec![..].
//
// The cfg!() macro returns a boolean value based on the current compiler configuration.
// When attached to expressions (#[cfg(..)] form), they show or hide the expression at compile time.
// Here, however, they evaluate to runtime values, which may or may not be optimized out
// by the compiler. A variety of configurations are demonstrated here which cover most of
// the modern oeprating systems. Try running the Flutter application on different machines
// and see if it matches your expected OS.
//
// Furthermore, in Rust, the last expression in a function is the return value and does
// not have the trailing semicolon. This entire if-else chain forms a single expression.
if cfg!(windows) {
Platform::Windows
} else if cfg!(target_os = "android") {
Platform::Android
} else if cfg!(target_os = "ios") {
Platform::Ios
} else if cfg!(all(target_os = "macos", target_arch = "aarch64")) {
Platform::MacApple
} else if cfg!(target_os = "macos") {
Platform::MacIntel
} else if cfg!(target_family = "wasm") {
Platform::Wasm
} else if cfg!(unix) {
Platform::Unix
} else {
Platform::Unknown
}
pub fn start_download(url: String, save_path: String, file_name: String, connection_count: u8, sink: StreamSink<DownloadCallbackData>) {
let _ = do_start_download(url, save_path, file_name, connection_count, Arc::new(sink));
}
pub fn add(left: usize, right: usize) -> usize {
left + right
}
// The convention for Rust identifiers is the snake_case,
// and they are automatically converted to camelCase on the Dart side.
pub fn rust_release_mode() -> bool {
cfg!(not(debug_assertions))
pub fn cancel_download(id: String) {
task::block_on(do_cancel_download(&id))
}

View File

@ -2,28 +2,65 @@ use super::*;
// Section: wire functions
#[no_mangle]
pub extern "C" fn wire_platform(port_: i64) {
wire_platform_impl(port_)
pub extern "C" fn wire_ping(port_: i64) {
wire_ping_impl(port_)
}
#[no_mangle]
pub extern "C" fn wire_add(port_: i64, left: usize, right: usize) {
wire_add_impl(port_, left, right)
pub extern "C" fn wire_start_download(
port_: i64,
url: *mut wire_uint_8_list,
save_path: *mut wire_uint_8_list,
file_name: *mut wire_uint_8_list,
connection_count: u8,
) {
wire_start_download_impl(port_, url, save_path, file_name, connection_count)
}
#[no_mangle]
pub extern "C" fn wire_rust_release_mode(port_: i64) {
wire_rust_release_mode_impl(port_)
pub extern "C" fn wire_cancel_download(port_: i64, id: *mut wire_uint_8_list) {
wire_cancel_download_impl(port_, id)
}
// Section: allocate functions
#[no_mangle]
pub extern "C" fn new_uint_8_list_0(len: i32) -> *mut wire_uint_8_list {
let ans = wire_uint_8_list {
ptr: support::new_leak_vec_ptr(Default::default(), len),
len,
};
support::new_leak_box_ptr(ans)
}
// Section: related functions
// Section: impl Wire2Api
impl Wire2Api<String> for *mut wire_uint_8_list {
fn wire2api(self) -> String {
let vec: Vec<u8> = self.wire2api();
String::from_utf8_lossy(&vec).into_owned()
}
}
impl Wire2Api<Vec<u8>> for *mut wire_uint_8_list {
fn wire2api(self) -> Vec<u8> {
unsafe {
let wrap = support::box_from_leak_ptr(self);
support::vec_from_leak_ptr(wrap.ptr, wrap.len)
}
}
}
// Section: wire structs
#[repr(C)]
#[derive(Clone)]
pub struct wire_uint_8_list {
ptr: *mut u8,
len: i32,
}
// Section: impl NewWithNullPtr
pub trait NewWithNullPtr {

View File

@ -1,43 +0,0 @@
use super::*;
// Section: wire functions
#[no_mangle]
pub extern "C" fn wire_add(port_: i64,left: usize,right: usize) {
wire_add_impl(port_,left,right)
}
// Section: allocate functions
// Section: related functions
// Section: impl Wire2Api
// Section: wire structs
// Section: impl NewWithNullPtr
pub trait NewWithNullPtr {
fn new_with_null_ptr() -> Self;
}
impl<T> NewWithNullPtr for *mut T {
fn new_with_null_ptr() -> Self {
std::ptr::null_mut()
}
}
// Section: sync execution mode utility
#[no_mangle]
pub extern "C" fn free_WireSyncReturn(ptr: support::WireSyncReturn) {
unsafe { let _ = support::box_from_leak_ptr(ptr); };
}

View File

@ -20,46 +20,65 @@ use std::sync::Arc;
// Section: imports
use crate::downloader::DownloadCallbackData;
use crate::downloader::MyDownloaderStatus;
use crate::downloader::MyNetworkItemPendingType;
// Section: wire functions
fn wire_platform_impl(port_: MessagePort) {
FLUTTER_RUST_BRIDGE_HANDLER.wrap::<_, _, _, Platform, _>(
fn wire_ping_impl(port_: MessagePort) {
FLUTTER_RUST_BRIDGE_HANDLER.wrap::<_, _, _, String, _>(
WrapInfo {
debug_name: "platform",
debug_name: "ping",
port: Some(port_),
mode: FfiCallMode::Normal,
},
move || move |task_callback| Result::<_, ()>::Ok(platform()),
move || move |task_callback| Result::<_, ()>::Ok(ping()),
)
}
fn wire_add_impl(
fn wire_start_download_impl(
port_: MessagePort,
left: impl Wire2Api<usize> + UnwindSafe,
right: impl Wire2Api<usize> + UnwindSafe,
url: impl Wire2Api<String> + UnwindSafe,
save_path: impl Wire2Api<String> + UnwindSafe,
file_name: impl Wire2Api<String> + UnwindSafe,
connection_count: impl Wire2Api<u8> + UnwindSafe,
) {
FLUTTER_RUST_BRIDGE_HANDLER.wrap::<_, _, _, usize, _>(
FLUTTER_RUST_BRIDGE_HANDLER.wrap::<_, _, _, (), _>(
WrapInfo {
debug_name: "add",
debug_name: "start_download",
port: Some(port_),
mode: FfiCallMode::Stream,
},
move || {
let api_url = url.wire2api();
let api_save_path = save_path.wire2api();
let api_file_name = file_name.wire2api();
let api_connection_count = connection_count.wire2api();
move |task_callback| {
Result::<_, ()>::Ok(start_download(
api_url,
api_save_path,
api_file_name,
api_connection_count,
task_callback.stream_sink::<_, DownloadCallbackData>(),
))
}
},
)
}
fn wire_cancel_download_impl(port_: MessagePort, id: impl Wire2Api<String> + UnwindSafe) {
FLUTTER_RUST_BRIDGE_HANDLER.wrap::<_, _, _, (), _>(
WrapInfo {
debug_name: "cancel_download",
port: Some(port_),
mode: FfiCallMode::Normal,
},
move || {
let api_left = left.wire2api();
let api_right = right.wire2api();
move |task_callback| Result::<_, ()>::Ok(add(api_left, api_right))
let api_id = id.wire2api();
move |task_callback| Result::<_, ()>::Ok(cancel_download(api_id))
},
)
}
fn wire_rust_release_mode_impl(port_: MessagePort) {
FLUTTER_RUST_BRIDGE_HANDLER.wrap::<_, _, _, bool, _>(
WrapInfo {
debug_name: "rust_release_mode",
port: Some(port_),
mode: FfiCallMode::Normal,
},
move || move |task_callback| Result::<_, ()>::Ok(rust_release_mode()),
)
}
// Section: wrapper structs
// Section: static checks
@ -82,30 +101,66 @@ where
(!self.is_null()).then(|| self.wire2api())
}
}
impl Wire2Api<usize> for usize {
fn wire2api(self) -> usize {
impl Wire2Api<u8> for u8 {
fn wire2api(self) -> u8 {
self
}
}
// Section: impl IntoDart
impl support::IntoDart for Platform {
impl support::IntoDart for DownloadCallbackData {
fn into_dart(self) -> support::DartAbi {
vec![
self.id.into_into_dart().into_dart(),
self.total.into_into_dart().into_dart(),
self.progress.into_into_dart().into_dart(),
self.speed.into_into_dart().into_dart(),
self.status.into_into_dart().into_dart(),
]
.into_dart()
}
}
impl support::IntoDartExceptPrimitive for DownloadCallbackData {}
impl rust2dart::IntoIntoDart<DownloadCallbackData> for DownloadCallbackData {
fn into_into_dart(self) -> Self {
self
}
}
impl support::IntoDart for MyDownloaderStatus {
fn into_dart(self) -> support::DartAbi {
match self {
Self::Unknown => 0,
Self::Android => 1,
Self::Ios => 2,
Self::Windows => 3,
Self::Unix => 4,
Self::MacIntel => 5,
Self::MacApple => 6,
Self::Wasm => 7,
Self::NoStart => vec![0.into_dart()],
Self::Running => vec![1.into_dart()],
Self::Pending(field0) => vec![2.into_dart(), field0.into_into_dart().into_dart()],
Self::Error(field0) => vec![3.into_dart(), field0.into_into_dart().into_dart()],
Self::Finished => vec![4.into_dart()],
}
.into_dart()
}
}
impl support::IntoDartExceptPrimitive for Platform {}
impl rust2dart::IntoIntoDart<Platform> for Platform {
impl support::IntoDartExceptPrimitive for MyDownloaderStatus {}
impl rust2dart::IntoIntoDart<MyDownloaderStatus> for MyDownloaderStatus {
fn into_into_dart(self) -> Self {
self
}
}
impl support::IntoDart for MyNetworkItemPendingType {
fn into_dart(self) -> support::DartAbi {
match self {
Self::QueueUp => 0,
Self::Starting => 1,
Self::Stopping => 2,
Self::Initializing => 3,
}
.into_dart()
}
}
impl support::IntoDartExceptPrimitive for MyNetworkItemPendingType {}
impl rust2dart::IntoIntoDart<MyNetworkItemPendingType> for MyNetworkItemPendingType {
fn into_into_dart(self) -> Self {
self
}

183
rust/src/downloader/mod.rs Normal file
View File

@ -0,0 +1,183 @@
use std::collections::HashMap;
use std::error::Error;
use std::num::{NonZeroU8, NonZeroUsize};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use async_std::sync::Mutex;
use http_downloader::{breakpoint_resume::DownloadBreakpointResumeExtension, ExtendedHttpFileDownloader, HttpDownloaderBuilder, speed_tracker::DownloadSpeedTrackerExtension, status_tracker::DownloadStatusTrackerExtension};
use http_downloader::bson_file_archiver::{ArchiveFilePath, BsonFileArchiverBuilder};
use url::Url;
use flutter_rust_bridge::StreamSink;
use flutter_rust_bridge::support::lazy_static;
use http_downloader::status_tracker::{DownloaderStatus, NetworkItemPendingType};
use uuid::Uuid;
pub enum MyNetworkItemPendingType {
QueueUp,
Starting,
Stopping,
Initializing,
}
pub enum MyDownloaderStatus {
NoStart,
Running,
Pending(MyNetworkItemPendingType),
Error(String),
Finished,
}
pub struct DownloadCallbackData {
pub id: String,
pub total: u64,
pub progress: u64,
pub speed: u64,
pub status: MyDownloaderStatus,
}
impl DownloadCallbackData {
pub fn new(id: String, total: u64) -> Self {
DownloadCallbackData {
id,
total,
progress: 0,
speed: 0,
status: MyDownloaderStatus::NoStart,
}
}
}
lazy_static! {
static ref DOWNLOADERS_MAP: Mutex<HashMap<String, Arc<ExtendedHttpFileDownloader>>> = {
let map = HashMap::new();
Mutex::new(map)
};
}
pub async fn do_cancel_download(id: &str) {
let d = get_downloader(id).await;
if d.is_none() {
return;
}
d.unwrap().cancel().await
}
#[tokio::main]
pub async fn do_start_download(url: String, save_path: String, file_name: String, connection_count: u8, sink: Arc<StreamSink<DownloadCallbackData>>) -> Result<(), Box<dyn Error>> {
let save_dir = PathBuf::from(save_path);
let test_url = Url::parse(&*url)?;
let (mut downloader, (status_state, speed_state, ..)) =
HttpDownloaderBuilder::new(test_url, save_dir)
.chunk_size(NonZeroUsize::new(1024 * 1024 * 10).unwrap()) // 块大小
.download_connection_count(NonZeroU8::new(connection_count).unwrap())
.file_name(Option::from(file_name))
.build((
// 下载状态追踪扩展
// by cargo feature "status-tracker" enable
DownloadStatusTrackerExtension { log: true },
// 下载速度追踪扩展
// by cargo feature "speed-tracker" enable
DownloadSpeedTrackerExtension { log: true },
// 断点续传扩展,
// by cargo feature "breakpoint-resume" enable
DownloadBreakpointResumeExtension {
// BsonFileArchiver by cargo feature "bson-file-archiver" enable
download_archiver_builder: BsonFileArchiverBuilder::new(ArchiveFilePath::Suffix("bson".to_string()))
}
));
let status_state_arc = Arc::new(status_state);
let status_state_clone = Arc::clone(&status_state_arc);
let id = Uuid::new_v4();
// info!("Prepare download准备下载");
let download_future = downloader.prepare_download()?;
let sink_clone = sink.clone();
add_downloader(&id.to_string(), Arc::new(downloader)).await;
// 打印下载进度
// Print download Progress
tokio::spawn({
let mut downloaded_len_receiver = get_downloader(&id.to_string()).await.unwrap().downloaded_len_receiver().clone();
let total_size_future = get_downloader(&id.to_string()).await.unwrap().total_size_future();
async move {
let total_len = total_size_future.await;
if let Some(total_len) = total_len {
// info!("Total size: {:.2} Mb",total_len.get() as f64 / 1024_f64/ 1024_f64);
sink_clone.add(DownloadCallbackData::new(id.to_string(), total_len.get()));
}
while downloaded_len_receiver.changed().await.is_ok() {
let p = *downloaded_len_receiver.borrow();
let _status = status_state_clone.status(); // get download status 获取状态
let _byte_per_second = speed_state.download_speed(); // get download speedByte per second获取速度字节每秒
if let Some(total_len) = total_len {
sink_clone.add(DownloadCallbackData {
id: id.to_string(),
total: total_len.get(),
progress: p,
speed: _byte_per_second,
status: get_my_status(_status),
});
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
});
download_future.await?;
let _status = status_state_arc.status();
sink.add(DownloadCallbackData {
id: id.to_string(),
total: 0,
progress: 0,
speed: 0,
status: get_my_status(_status),
});
sink.close();
remove_downloader(&id.to_string()).await;
println!("rust downloader download complete");
Ok(())
}
async fn remove_downloader(id: &str) {
let mut downloader_map = DOWNLOADERS_MAP.lock().await;
downloader_map.remove(id);
}
async fn get_downloader(id: &str) -> Option<Arc<ExtendedHttpFileDownloader>> {
let downloader_map = DOWNLOADERS_MAP.lock().await;
return if let Some(downloader) = downloader_map.get(id) {
Some(downloader.clone())
} else {
None
};
}
async fn add_downloader(id: &str, d: Arc<ExtendedHttpFileDownloader>) {
let mut downloader_map = DOWNLOADERS_MAP.lock().await;
downloader_map.insert(id.to_string(), d);
}
fn get_my_status(_status: DownloaderStatus) -> MyDownloaderStatus {
match _status {
DownloaderStatus::NoStart => { MyDownloaderStatus::NoStart }
DownloaderStatus::Running => { MyDownloaderStatus::Running }
DownloaderStatus::Pending(n) => { MyDownloaderStatus::Pending(get_my_network_type(n)) }
DownloaderStatus::Error(e) => { MyDownloaderStatus::Error(e) }
DownloaderStatus::Finished => { MyDownloaderStatus::Finished }
}
}
fn get_my_network_type(n: NetworkItemPendingType) -> MyNetworkItemPendingType {
match n {
NetworkItemPendingType::QueueUp => { MyNetworkItemPendingType::QueueUp }
NetworkItemPendingType::Starting => { MyNetworkItemPendingType::Starting }
NetworkItemPendingType::Stopping => { MyNetworkItemPendingType::Stopping }
NetworkItemPendingType::Initializing => { MyNetworkItemPendingType::Initializing }
}
}

View File

@ -1,2 +1,3 @@
mod api;
mod bridge_generated; /* AUTO INJECTED BY flutter_rust_bridge. This line may not be accurate, and you can change it according to your needs. */
mod bridge_generated;
mod downloader;