移除 rust 多线程下载器

This commit is contained in:
2024-02-22 20:38:35 +08:00
parent 02101db99d
commit 46808e96c5
15 changed files with 78 additions and 1015 deletions

View File

@ -12,7 +12,6 @@ crate-type = ["cdylib", "staticlib"]
[dependencies]
flutter_rust_bridge = "=2.0.0-dev.24"
http-downloader = { version = "0.3.2", features = ["status-tracker", "speed-tracker", "breakpoint-resume", "bson-file-archiver"] }
tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros"] }
url = "2.5.0"
uuid = { version = "1.7.0", features = ["v4", "fast-rng", "macro-diagnostics"] }

View File

@ -1,11 +0,0 @@
use std::sync::Arc;
use crate::downloader::{do_cancel_download, do_start_download, DownloadCallbackData};
use crate::frb_generated::StreamSink;
pub async fn start_download(url: String, save_path: String, file_name: String, connection_count: u8, sink: StreamSink<DownloadCallbackData>) {
do_start_download(url, save_path, file_name, connection_count, Arc::new(sink)).await.unwrap();
}
pub async fn cancel_download(id: String) {
do_cancel_download(&id).await
}

View File

@ -1,6 +1,4 @@
//
// Do not put code in `mod.rs`, but put in e.g. `simple.rs`.
//
pub mod downloader_api;
pub mod http_api;

View File

@ -1,180 +0,0 @@
use std::collections::HashMap;
use std::num::{NonZeroU8, NonZeroUsize};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use async_std::sync::Mutex;
use flutter_rust_bridge::for_generated::lazy_static;
use http_downloader::{breakpoint_resume::DownloadBreakpointResumeExtension, ExtendedHttpFileDownloader, HttpDownloaderBuilder, speed_tracker::DownloadSpeedTrackerExtension, status_tracker::DownloadStatusTrackerExtension};
use http_downloader::bson_file_archiver::{ArchiveFilePath, BsonFileArchiverBuilder};
use url::Url;
use crate::frb_generated::StreamSink;
use http_downloader::status_tracker::{DownloaderStatus, NetworkItemPendingType};
use uuid::Uuid;
pub enum MyNetworkItemPendingType {
QueueUp,
Starting,
Stopping,
Initializing,
}
pub enum MyDownloaderStatus {
NoStart,
Running,
Pending(MyNetworkItemPendingType),
Error(String),
Finished,
}
pub struct DownloadCallbackData {
pub id: String,
pub total: u64,
pub progress: u64,
pub speed: u64,
pub status: MyDownloaderStatus,
}
impl DownloadCallbackData {
pub fn new(id: String, total: u64) -> Self {
DownloadCallbackData {
id,
total,
progress: 0,
speed: 0,
status: MyDownloaderStatus::NoStart,
}
}
}
lazy_static! {
static ref DOWNLOADERS_MAP: Mutex<HashMap<String, Arc<ExtendedHttpFileDownloader>>> = {
let map = HashMap::new();
Mutex::new(map)
};
}
pub async fn do_cancel_download(id: &str) {
let d = get_downloader(id).await;
if d.is_none() {
return;
}
d.unwrap().cancel().await
}
pub async fn do_start_download(url: String, save_path: String, file_name: String, connection_count: u8, sink: Arc<StreamSink<DownloadCallbackData>>) -> anyhow::Result<()> {
let save_dir = PathBuf::from(save_path);
let test_url = Url::parse(&*url)?;
let (mut downloader, (status_state, speed_state, ..)) =
HttpDownloaderBuilder::new(test_url, save_dir)
.chunk_size(NonZeroUsize::new(1024 * 1024 * 10).unwrap()) // 块大小
.download_connection_count(NonZeroU8::new(connection_count).unwrap())
.file_name(Option::from(file_name))
.build((
// 下载状态追踪扩展
// by cargo feature "status-tracker" enable
DownloadStatusTrackerExtension { log: true },
// 下载速度追踪扩展
// by cargo feature "speed-tracker" enable
DownloadSpeedTrackerExtension { log: true },
// 断点续传扩展,
// by cargo feature "breakpoint-resume" enable
DownloadBreakpointResumeExtension {
// BsonFileArchiver by cargo feature "bson-file-archiver" enable
download_archiver_builder: BsonFileArchiverBuilder::new(ArchiveFilePath::Suffix("bson".to_string()))
}
));
let status_state_arc = Arc::new(status_state);
let status_state_clone = Arc::clone(&status_state_arc);
let id = Uuid::new_v4();
// info!("Prepare download准备下载");
let download_future = downloader.prepare_download()?;
let sink_clone = sink.clone();
add_downloader(&id.to_string(), Arc::new(downloader)).await;
// 打印下载进度
// Print download Progress
tokio::spawn({
let mut downloaded_len_receiver = get_downloader(&id.to_string()).await.unwrap().downloaded_len_receiver().clone();
let total_size_future = get_downloader(&id.to_string()).await.unwrap().total_size_future();
async move {
let total_len = total_size_future.await;
if let Some(total_len) = total_len {
// info!("Total size: {:.2} Mb",total_len.get() as f64 / 1024_f64/ 1024_f64);
sink_clone.add(DownloadCallbackData::new(id.to_string(), total_len.get())).unwrap();
}
while downloaded_len_receiver.changed().await.is_ok() {
let p = *downloaded_len_receiver.borrow();
let _status = status_state_clone.status(); // get download status 获取状态
let _byte_per_second = speed_state.download_speed(); // get download speedByte per second获取速度字节每秒
if let Some(total_len) = total_len {
sink_clone.add(DownloadCallbackData {
id: id.to_string(),
total: total_len.get(),
progress: p,
speed: _byte_per_second,
status: get_my_status(_status),
}).unwrap();
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
});
download_future.await?;
let _status = status_state_arc.status();
sink.add(DownloadCallbackData {
id: id.to_string(),
total: 0,
progress: 0,
speed: 0,
status: get_my_status(_status),
}).unwrap();
remove_downloader(&id.to_string()).await;
println!("rust downloader download complete");
Ok(())
}
async fn remove_downloader(id: &str) {
let mut downloader_map = DOWNLOADERS_MAP.lock().await;
downloader_map.remove(id);
}
async fn get_downloader(id: &str) -> Option<Arc<ExtendedHttpFileDownloader>> {
let downloader_map = DOWNLOADERS_MAP.lock().await;
return if let Some(downloader) = downloader_map.get(id) {
Some(downloader.clone())
} else {
None
};
}
async fn add_downloader(id: &str, d: Arc<ExtendedHttpFileDownloader>) {
let mut downloader_map = DOWNLOADERS_MAP.lock().await;
downloader_map.insert(id.to_string(), d);
}
fn get_my_status(_status: DownloaderStatus) -> MyDownloaderStatus {
match _status {
DownloaderStatus::NoStart => { MyDownloaderStatus::NoStart }
DownloaderStatus::Running => { MyDownloaderStatus::Running }
DownloaderStatus::Pending(n) => { MyDownloaderStatus::Pending(get_my_network_type(n)) }
DownloaderStatus::Error(e) => { MyDownloaderStatus::Error(e) }
DownloaderStatus::Finished => { MyDownloaderStatus::Finished }
}
}
fn get_my_network_type(n: NetworkItemPendingType) -> MyNetworkItemPendingType {
match n {
NetworkItemPendingType::QueueUp => { MyNetworkItemPendingType::QueueUp }
NetworkItemPendingType::Starting => { MyNetworkItemPendingType::Starting }
NetworkItemPendingType::Stopping => { MyNetworkItemPendingType::Stopping }
NetworkItemPendingType::Initializing => { MyNetworkItemPendingType::Initializing }
}
}

View File

@ -39,61 +39,6 @@ flutter_rust_bridge::frb_generated_default_handler!();
// Section: wire_funcs
fn wire_cancel_download_impl(
port_: flutter_rust_bridge::for_generated::MessagePort,
ptr_: flutter_rust_bridge::for_generated::PlatformGeneralizedUint8ListPtr,
rust_vec_len_: i32,
data_len_: i32,
) {
FLUTTER_RUST_BRIDGE_HANDLER.wrap_async::<flutter_rust_bridge::for_generated::SseCodec, _, _, _>(
flutter_rust_bridge::for_generated::TaskInfo {
debug_name: "cancel_download",
port: Some(port_),
mode: flutter_rust_bridge::for_generated::FfiCallMode::Normal,
},
move || {
let message = unsafe {
flutter_rust_bridge::for_generated::Dart2RustMessageSse::from_wire(
ptr_,
rust_vec_len_,
data_len_,
)
};
let mut deserializer =
flutter_rust_bridge::for_generated::SseDeserializer::new(message);
let api_id = <String>::sse_decode(&mut deserializer);
deserializer.end();
move |context| async move {
transform_result_sse(
(move || async move {
Result::<_, ()>::Ok(
crate::api::downloader_api::cancel_download(api_id).await,
)
})()
.await,
)
}
},
)
}
fn wire_start_download_impl(
port_: flutter_rust_bridge::for_generated::MessagePort,
ptr_: flutter_rust_bridge::for_generated::PlatformGeneralizedUint8ListPtr,
rust_vec_len_: i32,
data_len_: i32,
) {
FLUTTER_RUST_BRIDGE_HANDLER.wrap_async::<flutter_rust_bridge::for_generated::SseCodec,_,_,_>(flutter_rust_bridge::for_generated::TaskInfo{ debug_name: "start_download", port: Some(port_), mode: flutter_rust_bridge::for_generated::FfiCallMode::Stream }, move || {
let message = unsafe { flutter_rust_bridge::for_generated::Dart2RustMessageSse::from_wire(ptr_, rust_vec_len_, data_len_) };
let mut deserializer = flutter_rust_bridge::for_generated::SseDeserializer::new(message);
let api_url = <String>::sse_decode(&mut deserializer);
let api_save_path = <String>::sse_decode(&mut deserializer);
let api_file_name = <String>::sse_decode(&mut deserializer);
let api_connection_count = <u8>::sse_decode(&mut deserializer);deserializer.end(); move |context| async move {
transform_result_sse((move || async move {
Result::<_,()>::Ok(crate::api::downloader_api::start_download(api_url, api_save_path, api_file_name, api_connection_count, StreamSink::new(context.rust2dart_context().stream_sink::<_,crate::downloader::DownloadCallbackData>())).await)
})().await)
} })
}
fn wire_dns_lookup_txt_impl(
port_: flutter_rust_bridge::for_generated::MessagePort,
ptr_: flutter_rust_bridge::for_generated::PlatformGeneralizedUint8ListPtr,
@ -257,24 +202,6 @@ impl SseDecode for String {
}
}
impl SseDecode for crate::downloader::DownloadCallbackData {
// Codec=Sse (Serialization based), see doc to use other codecs
fn sse_decode(deserializer: &mut flutter_rust_bridge::for_generated::SseDeserializer) -> Self {
let mut var_id = <String>::sse_decode(deserializer);
let mut var_total = <u64>::sse_decode(deserializer);
let mut var_progress = <u64>::sse_decode(deserializer);
let mut var_speed = <u64>::sse_decode(deserializer);
let mut var_status = <crate::downloader::MyDownloaderStatus>::sse_decode(deserializer);
return crate::downloader::DownloadCallbackData {
id: var_id,
total: var_total,
progress: var_progress,
speed: var_speed,
status: var_status,
};
}
}
impl SseDecode for i32 {
// Codec=Sse (Serialization based), see doc to use other codecs
fn sse_decode(deserializer: &mut flutter_rust_bridge::for_generated::SseDeserializer) -> Self {
@ -318,36 +245,6 @@ impl SseDecode for Vec<(String, String)> {
}
}
impl SseDecode for crate::downloader::MyDownloaderStatus {
// Codec=Sse (Serialization based), see doc to use other codecs
fn sse_decode(deserializer: &mut flutter_rust_bridge::for_generated::SseDeserializer) -> Self {
let mut tag_ = <i32>::sse_decode(deserializer);
match tag_ {
0 => {
return crate::downloader::MyDownloaderStatus::NoStart;
}
1 => {
return crate::downloader::MyDownloaderStatus::Running;
}
2 => {
let mut var_field0 =
<crate::downloader::MyNetworkItemPendingType>::sse_decode(deserializer);
return crate::downloader::MyDownloaderStatus::Pending(var_field0);
}
3 => {
let mut var_field0 = <String>::sse_decode(deserializer);
return crate::downloader::MyDownloaderStatus::Error(var_field0);
}
4 => {
return crate::downloader::MyDownloaderStatus::Finished;
}
_ => {
unimplemented!("");
}
}
}
}
impl SseDecode for crate::api::http_api::MyMethod {
// Codec=Sse (Serialization based), see doc to use other codecs
fn sse_decode(deserializer: &mut flutter_rust_bridge::for_generated::SseDeserializer) -> Self {
@ -367,20 +264,6 @@ impl SseDecode for crate::api::http_api::MyMethod {
}
}
impl SseDecode for crate::downloader::MyNetworkItemPendingType {
// Codec=Sse (Serialization based), see doc to use other codecs
fn sse_decode(deserializer: &mut flutter_rust_bridge::for_generated::SseDeserializer) -> Self {
let mut inner = <i32>::sse_decode(deserializer);
return match inner {
0 => crate::downloader::MyNetworkItemPendingType::QueueUp,
1 => crate::downloader::MyNetworkItemPendingType::Starting,
2 => crate::downloader::MyNetworkItemPendingType::Stopping,
3 => crate::downloader::MyNetworkItemPendingType::Initializing,
_ => unreachable!("Invalid variant for MyNetworkItemPendingType: {}", inner),
};
}
}
impl SseDecode for Option<std::collections::HashMap<String, String>> {
// Codec=Sse (Serialization based), see doc to use other codecs
fn sse_decode(deserializer: &mut flutter_rust_bridge::for_generated::SseDeserializer) -> Self {
@ -429,7 +312,7 @@ impl SseDecode for crate::http_package::RustHttpResponse {
// Codec=Sse (Serialization based), see doc to use other codecs
fn sse_decode(deserializer: &mut flutter_rust_bridge::for_generated::SseDeserializer) -> Self {
let mut var_statusCode = <u16>::sse_decode(deserializer);
let mut var_headers = <HashMap<String, String>>::sse_decode(deserializer);
let mut var_headers = <std::collections::HashMap<String, String>>::sse_decode(deserializer);
let mut var_url = <String>::sse_decode(deserializer);
let mut var_contentLength = <Option<u64>>::sse_decode(deserializer);
let mut var_version = <reqwest::Version>::sse_decode(deserializer);
@ -496,11 +379,9 @@ fn pde_ffi_dispatcher_primary_impl(
) {
// Codec=Pde (Serialization + dispatch), see doc to use other codecs
match func_id {
2 => wire_cancel_download_impl(port, ptr, rust_vec_len, data_len),
1 => wire_start_download_impl(port, ptr, rust_vec_len, data_len),
5 => wire_dns_lookup_txt_impl(port, ptr, rust_vec_len, data_len),
4 => wire_fetch_impl(port, ptr, rust_vec_len, data_len),
3 => wire_set_default_header_impl(port, ptr, rust_vec_len, data_len),
3 => wire_dns_lookup_txt_impl(port, ptr, rust_vec_len, data_len),
2 => wire_fetch_impl(port, ptr, rust_vec_len, data_len),
1 => wire_set_default_header_impl(port, ptr, rust_vec_len, data_len),
_ => unreachable!(),
}
}
@ -550,57 +431,6 @@ impl
}
}
// Codec=Dco (DartCObject based), see doc to use other codecs
impl flutter_rust_bridge::IntoDart for crate::downloader::DownloadCallbackData {
fn into_dart(self) -> flutter_rust_bridge::for_generated::DartAbi {
[
self.id.into_into_dart().into_dart(),
self.total.into_into_dart().into_dart(),
self.progress.into_into_dart().into_dart(),
self.speed.into_into_dart().into_dart(),
self.status.into_into_dart().into_dart(),
]
.into_dart()
}
}
impl flutter_rust_bridge::for_generated::IntoDartExceptPrimitive
for crate::downloader::DownloadCallbackData
{
}
impl flutter_rust_bridge::IntoIntoDart<crate::downloader::DownloadCallbackData>
for crate::downloader::DownloadCallbackData
{
fn into_into_dart(self) -> crate::downloader::DownloadCallbackData {
self
}
}
// Codec=Dco (DartCObject based), see doc to use other codecs
impl flutter_rust_bridge::IntoDart for crate::downloader::MyDownloaderStatus {
fn into_dart(self) -> flutter_rust_bridge::for_generated::DartAbi {
match self {
crate::downloader::MyDownloaderStatus::NoStart => [0.into_dart()].into_dart(),
crate::downloader::MyDownloaderStatus::Running => [1.into_dart()].into_dart(),
crate::downloader::MyDownloaderStatus::Pending(field0) => {
[2.into_dart(), field0.into_into_dart().into_dart()].into_dart()
}
crate::downloader::MyDownloaderStatus::Error(field0) => {
[3.into_dart(), field0.into_into_dart().into_dart()].into_dart()
}
crate::downloader::MyDownloaderStatus::Finished => [4.into_dart()].into_dart(),
}
}
}
impl flutter_rust_bridge::for_generated::IntoDartExceptPrimitive
for crate::downloader::MyDownloaderStatus
{
}
impl flutter_rust_bridge::IntoIntoDart<crate::downloader::MyDownloaderStatus>
for crate::downloader::MyDownloaderStatus
{
fn into_into_dart(self) -> crate::downloader::MyDownloaderStatus {
self
}
}
// Codec=Dco (DartCObject based), see doc to use other codecs
impl flutter_rust_bridge::IntoDart for crate::api::http_api::MyMethod {
fn into_dart(self) -> flutter_rust_bridge::for_generated::DartAbi {
match self {
@ -628,28 +458,6 @@ impl flutter_rust_bridge::IntoIntoDart<crate::api::http_api::MyMethod>
}
}
// Codec=Dco (DartCObject based), see doc to use other codecs
impl flutter_rust_bridge::IntoDart for crate::downloader::MyNetworkItemPendingType {
fn into_dart(self) -> flutter_rust_bridge::for_generated::DartAbi {
match self {
Self::QueueUp => 0.into_dart(),
Self::Starting => 1.into_dart(),
Self::Stopping => 2.into_dart(),
Self::Initializing => 3.into_dart(),
}
}
}
impl flutter_rust_bridge::for_generated::IntoDartExceptPrimitive
for crate::downloader::MyNetworkItemPendingType
{
}
impl flutter_rust_bridge::IntoIntoDart<crate::downloader::MyNetworkItemPendingType>
for crate::downloader::MyNetworkItemPendingType
{
fn into_into_dart(self) -> crate::downloader::MyNetworkItemPendingType {
self
}
}
// Codec=Dco (DartCObject based), see doc to use other codecs
impl flutter_rust_bridge::IntoDart for crate::http_package::RustHttpResponse {
fn into_dart(self) -> flutter_rust_bridge::for_generated::DartAbi {
[
@ -715,17 +523,6 @@ impl SseEncode for String {
}
}
impl SseEncode for crate::downloader::DownloadCallbackData {
// Codec=Sse (Serialization based), see doc to use other codecs
fn sse_encode(self, serializer: &mut flutter_rust_bridge::for_generated::SseSerializer) {
<String>::sse_encode(self.id, serializer);
<u64>::sse_encode(self.total, serializer);
<u64>::sse_encode(self.progress, serializer);
<u64>::sse_encode(self.speed, serializer);
<crate::downloader::MyDownloaderStatus>::sse_encode(self.status, serializer);
}
}
impl SseEncode for i32 {
// Codec=Sse (Serialization based), see doc to use other codecs
fn sse_encode(self, serializer: &mut flutter_rust_bridge::for_generated::SseSerializer) {
@ -763,31 +560,6 @@ impl SseEncode for Vec<(String, String)> {
}
}
impl SseEncode for crate::downloader::MyDownloaderStatus {
// Codec=Sse (Serialization based), see doc to use other codecs
fn sse_encode(self, serializer: &mut flutter_rust_bridge::for_generated::SseSerializer) {
match self {
crate::downloader::MyDownloaderStatus::NoStart => {
<i32>::sse_encode(0, serializer);
}
crate::downloader::MyDownloaderStatus::Running => {
<i32>::sse_encode(1, serializer);
}
crate::downloader::MyDownloaderStatus::Pending(field0) => {
<i32>::sse_encode(2, serializer);
<crate::downloader::MyNetworkItemPendingType>::sse_encode(field0, serializer);
}
crate::downloader::MyDownloaderStatus::Error(field0) => {
<i32>::sse_encode(3, serializer);
<String>::sse_encode(field0, serializer);
}
crate::downloader::MyDownloaderStatus::Finished => {
<i32>::sse_encode(4, serializer);
}
}
}
}
impl SseEncode for crate::api::http_api::MyMethod {
// Codec=Sse (Serialization based), see doc to use other codecs
fn sse_encode(self, serializer: &mut flutter_rust_bridge::for_generated::SseSerializer) {
@ -811,24 +583,6 @@ impl SseEncode for crate::api::http_api::MyMethod {
}
}
impl SseEncode for crate::downloader::MyNetworkItemPendingType {
// Codec=Sse (Serialization based), see doc to use other codecs
fn sse_encode(self, serializer: &mut flutter_rust_bridge::for_generated::SseSerializer) {
<i32>::sse_encode(
match self {
crate::downloader::MyNetworkItemPendingType::QueueUp => 0,
crate::downloader::MyNetworkItemPendingType::Starting => 1,
crate::downloader::MyNetworkItemPendingType::Stopping => 2,
crate::downloader::MyNetworkItemPendingType::Initializing => 3,
_ => {
unimplemented!("");
}
},
serializer,
);
}
}
impl SseEncode for Option<std::collections::HashMap<String, String>> {
// Codec=Sse (Serialization based), see doc to use other codecs
fn sse_encode(self, serializer: &mut flutter_rust_bridge::for_generated::SseSerializer) {

View File

@ -28,6 +28,7 @@ lazy_static! {
.use_rustls_tls()
.connect_timeout(Duration::from_secs(10))
.gzip(true)
.no_proxy()
.build()
.unwrap()
};

View File

@ -1,4 +1,3 @@
mod frb_generated;
mod api;
mod downloader;
mod http_package;