diff options
Diffstat (limited to 'crates/shirabe/src/downloader/file_downloader.rs')
| -rw-r--r-- | crates/shirabe/src/downloader/file_downloader.rs | 313 |
1 files changed, 252 insertions, 61 deletions
diff --git a/crates/shirabe/src/downloader/file_downloader.rs b/crates/shirabe/src/downloader/file_downloader.rs index 452ce8c..dc8c5a5 100644 --- a/crates/shirabe/src/downloader/file_downloader.rs +++ b/crates/shirabe/src/downloader/file_downloader.rs @@ -6,7 +6,6 @@ use indexmap::IndexMap; use std::sync::{LazyLock, Mutex}; use crate::util::Silencer; -use shirabe_external_packages::react::promise::resolve as react_promise_resolve; use shirabe_php_shim::{ DIRECTORY_SEPARATOR, InvalidArgumentException, PATHINFO_BASENAME, PATHINFO_EXTENSION, PHP_URL_PATH, PhpMixed, RuntimeException, UnexpectedValueException, array_search, array_shift, @@ -49,7 +48,7 @@ pub static DOWNLOAD_METADATA: LazyLock<Mutex<IndexMap<String, PhpMixed>>> = /// @var array<string, array<string>> /// @private /// @internal -pub static RESPONSE_HEADERS: LazyLock<Mutex<IndexMap<String, IndexMap<String, Vec<String>>>>> = +pub static RESPONSE_HEADERS: LazyLock<Mutex<IndexMap<String, Vec<String>>>> = LazyLock::new(|| Mutex::new(IndexMap::new())); /// Base downloader for files @@ -70,7 +69,12 @@ pub struct FileDownloader { /// @var ProcessExecutor pub(crate) process: std::rc::Rc<std::cell::RefCell<ProcessExecutor>>, /// @var array<string, string> Map of package name to cache key - last_cache_writes: IndexMap<String, String>, + /// + /// Behind a Mutex so `download()` can record cache writes through `&self`. `download()` blocks + /// for a while and (once the workers are parallelized) runs from several threads; requiring + /// `&mut self` would force locking the whole FileDownloader for each download's duration. Only + /// this write needs guarding, so it is the only field isolated behind a lock. + last_cache_writes: Mutex<IndexMap<String, String>>, /// @var array<string, string[]> Map of package name to list of paths additional_cleanup_paths: IndexMap<String, Vec<String>>, } @@ -115,7 +119,7 @@ impl FileDownloader { cache, process, filesystem, - last_cache_writes: IndexMap::new(), + last_cache_writes: Mutex::new(IndexMap::new()), additional_cleanup_paths: IndexMap::new(), }; @@ -140,6 +144,7 @@ impl FileDownloader { } } +#[async_trait::async_trait(?Send)] impl DownloaderInterface for FileDownloader { /// @inheritDoc fn get_installation_source(&self) -> String { @@ -199,16 +204,223 @@ impl DownloaderInterface for FileDownloader { .borrow_mut() .ensure_directory_exists(&dir_of_file)?; - // TODO(phase-c-promise): rewrite the accept/reject/retry promise orchestration as an async loop. - // TODO(plugin): inline closures rely on captured $accept/$reject/$urls/$retries. In Rust - // we'd need a struct holding shared state — left as a phase-b refactor. - let _ = (output, &urls, &mut retries, cache_key_generator, &file_name); - let _ = PluginEvents::PRE_FILE_DOWNLOAD; - let _ = PluginEvents::POST_FILE_DOWNLOAD; + // The PHP $download/$accept/$reject closures form a retry loop driven by recursion; here it + // is expressed as a loop. $reject's "return $download()" maps to `continue`, "throw" to + // `return Err`, and the success path runs the verification block and returns the file name. + let _ = cache_key_generator; + loop { + // === $download() === + let url = urls[0].clone(); + // TODO(plugin): dispatch PreFileDownloadEvent and apply its custom cache key / processed url. + urls[0] = url.clone(); - todo!( - "phase-b: orchestrate download/accept/reject closures and call download() returning a PromiseInterface" - ) + let checksum = package.get_dist_sha1_checksum().map(|s| s.to_string()); + let cache_key = url.cache_key.clone(); + + // use from cache if it is present and has a valid checksum or we have no checksum to check against + let mut from_cache = false; + if let Some(cache) = self.cache.clone() { + let checksum_matches = match checksum.as_deref() { + None | Some("") => true, + Some(c) => Some(c) == cache.borrow_mut().sha1(&cache_key).as_deref(), + }; + if checksum_matches && cache.borrow_mut().copy_to(&cache_key, &file_name)? { + from_cache = true; + } + } + + if from_cache { + if output { + self.io.write_error3( + &format!( + " - Loading <info>{}</info> (<comment>{}</comment>) from cache", + package.get_name(), + package.get_full_pretty_version(true, 0) + ), + true, + io_interface::VERY_VERBOSE, + ); + } + // mark the file as having been written in cache even though it is only read from cache, so that if + // the cache is corrupt the archive will be deleted and the next attempt will re-download it + // see https://github.com/composer/composer/issues/10028 + if let Some(cache) = self.cache.as_ref() { + if !cache.borrow().is_read_only() { + self.last_cache_writes + .lock() + .unwrap() + .insert(package.get_name().to_string(), cache_key.clone()); + } + } + } else { + if output { + self.io.write_error(&format!( + " - Downloading <info>{}</info> (<comment>{}</comment>)", + package.get_name(), + package.get_full_pretty_version(true, 0) + )); + } + + let add_copy_result = self + .http_downloader + .borrow_mut() + .add_copy(&url.processed, &file_name, package.get_transport_options()) + .await; + match add_copy_result { + Ok(mut response) => { + // === $accept($response) === + let cache_key = urls[0].cache_key.clone(); + let file_size = match filesize(&file_name) { + Some(size) => PhpMixed::Int(size), + None => PhpMixed::String( + response + .get_header("Content-Length") + .unwrap_or_else(|| "?".to_string()), + ), + }; + DOWNLOAD_METADATA + .lock() + .unwrap() + .insert(package.get_name().to_string(), file_size); + + if Platform::get_env("GITHUB_ACTIONS").is_some() + && Platform::get_env("COMPOSER_TESTS_ARE_RUNNING").is_none() + { + RESPONSE_HEADERS.lock().unwrap().insert( + package.get_name().to_string(), + response.get_headers().clone(), + ); + } + + if let Some(cache) = self.cache.as_ref() { + if !cache.borrow().is_read_only() { + self.last_cache_writes + .lock() + .unwrap() + .insert(package.get_name().to_string(), cache_key.clone()); + cache.borrow_mut().copy_from(&cache_key, &file_name); + } + } + + response.collect(); + } + Err(e) => { + // === $reject($e) === + // clean up + if file_exists(&file_name) { + self.filesystem.borrow().unlink(&file_name)?; + } + self.clear_last_cache_write(package); + + if e.downcast_ref::<IrrecoverableDownloadException>().is_some() { + return Err(e); + } + + if e.downcast_ref::<MaxFileSizeExceededException>().is_some() { + return Err(e); + } + + if let Some(te) = e.downcast_ref::<TransportException>() { + // if we got an http response with a proper code, then requesting again will probably not help, abort + if 0 != te.get_code() + && !in_array( + PhpMixed::Int(te.get_code()), + &PhpMixed::List(vec![ + Box::new(PhpMixed::Int(500)), + Box::new(PhpMixed::Int(502)), + Box::new(PhpMixed::Int(503)), + Box::new(PhpMixed::Int(504)), + ]), + true, + ) + { + retries = 0; + } + + // special error code returned when network is being artificially disabled + if te.get_status_code() == Some(499) { + retries = 0; + urls.clear(); + } + } + + if retries > 0 { + usleep(500000); + retries -= 1; + + continue; + } + + if !urls.is_empty() { + urls.remove(0); + } + if urls.len() > 0 { + let code = e + .downcast_ref::<TransportException>() + .map_or(0, |te| te.get_code()); + if self.io.is_debug() { + self.io.write_error(&format!( + " Failed downloading {}: [{}] {}: {}", + package.get_name(), + get_class(&PhpMixed::Null), + code, + e + )); + self.io.write_error(&format!( + " Trying the next URL for {}", + package.get_name() + )); + } else { + self.io.write_error(&format!( + " Failed downloading {}, trying the next URL ({}: {})", + package.get_name(), + code, + e + )); + } + + retries = 3; + usleep(100000); + + continue; + } + + return Err(e); + } + } + } + + // === $result->then(verify) === + if !file_exists(&file_name) { + return Err(UnexpectedValueException { + message: format!( + "{} could not be saved to {}, make sure the directory is writable and you have internet connectivity", + url.base, file_name + ), + code: 0, + } + .into()); + } + + if let Some(checksum) = checksum.as_deref() { + if !checksum.is_empty() + && hash_file("sha1", &file_name).as_deref() != Some(checksum) + { + return Err(UnexpectedValueException { + message: format!( + "The checksum verification of the file failed (downloaded from {})", + url.base + ), + code: 0, + } + .into()); + } + } + + // TODO(plugin): dispatch PostFileDownloadEvent. + + return Ok(Some(PhpMixed::String(file_name))); + } } /// @inheritDoc @@ -393,70 +605,52 @@ impl ChangeReportInterface for FileDownloader { package: &dyn PackageInterface, path: &str, ) -> Result<Option<String>> { - // TODO(phase-c-promise): get_local_changes drives promises via http_downloader/process wait(); - // converting requires deciding whether ChangeReportInterface::get_local_changes becomes async. Left as-is. // TODO(phase-b): swap self.io to NullIO and restore — needs a take/swap helper let mut null_io = NullIO::new(); null_io.load_configuration(&mut *self.config.borrow_mut())?; - // TODO(phase-b): `e` is captured by both the inner closure (assignment in error handler) - // and the outer block (read after the closure). PHP closures capture by reference (`use (&$e)`); - // emulate via Rc<RefCell> or restructure when proper async/promise types land. - let e: std::cell::RefCell<Option<anyhow::Error>> = std::cell::RefCell::new(None); - let output_cell: std::cell::RefCell<String> = std::cell::RefCell::new(String::new()); let target_dir = Filesystem::trim_trailing_slash(path); - let result: Result<()> = (|| -> Result<()> { + // PHP attaches an onRejected handler to capture the error and drives the promise via + // httpDownloader->wait() / process->wait(); the single-threaded sync bridge block_on's the + // download/install futures, so a rejection surfaces directly as the Err captured below. + let result: Result<String> = (|| -> Result<String> { if is_dir(&format!("{}_compare", target_dir)) { self.filesystem .borrow_mut() .remove_directory(&format!("{}_compare", target_dir))?; } - let promise = - self.download(package, &format!("{}_compare", target_dir), None, false)?; - promise.then_with( - None, - Some(Box::new(|ex: PhpMixed| { - let _ = ex; - PhpMixed::Null - })), - ); - self.http_downloader.borrow_mut().wait()?; - if e.borrow().is_some() { - return Err(e.borrow_mut().take().unwrap()); - } - let promise = self.install(package, &format!("{}_compare", target_dir), false)?; - promise.then_with( - None, - Some(Box::new(|ex: PhpMixed| { - let _ = ex; - PhpMixed::Null - })), - ); - self.process.borrow_mut().wait()?; - if e.borrow().is_some() { - return Err(e.borrow_mut().take().unwrap()); - } + tokio::runtime::Runtime::new() + .unwrap() + .block_on(self.download( + package, + &format!("{}_compare", target_dir), + None, + false, + ))?; + tokio::runtime::Runtime::new() + .unwrap() + .block_on(self.install(package, &format!("{}_compare", target_dir), false))?; let mut comparer = Comparer::new(); comparer.set_source(format!("{}_compare", target_dir)); comparer.set_update(target_dir.clone()); comparer.do_compare(); - *output_cell.borrow_mut() = comparer.get_changed_as_string(true, false); + let output = comparer.get_changed_as_string(true, false); self.filesystem .borrow_mut() .remove_directory(&format!("{}_compare", target_dir))?; - Ok(()) + Ok(output) })(); - if let Err(err) = result { - *e.borrow_mut() = Some(err); - } - let e = e.into_inner(); - let output = output_cell.into_inner(); // TODO(phase-b): restore self.io = prev_io + let (e, output) = match result { + Ok(output) => (None, output), + Err(err) => (Some(err), String::new()), + }; + if let Some(err) = e { if self.io.is_debug() { return Err(err); @@ -499,15 +693,12 @@ impl FileDownloader { .to_string() } - pub(crate) fn clear_last_cache_write(&mut self, package: &dyn PackageInterface) { - if self.cache.is_some() && self.last_cache_writes.contains_key(package.get_name()) { - let key = self - .last_cache_writes - .get(package.get_name()) - .unwrap() - .clone(); + pub(crate) fn clear_last_cache_write(&self, package: &dyn PackageInterface) { + let mut last_cache_writes = self.last_cache_writes.lock().unwrap(); + if self.cache.is_some() && last_cache_writes.contains_key(package.get_name()) { + let key = last_cache_writes.get(package.get_name()).unwrap().clone(); self.cache.as_ref().unwrap().borrow_mut().remove(&key); - self.last_cache_writes.shift_remove(package.get_name()); + last_cache_writes.shift_remove(package.get_name()); } } |
