about summary refs log tree commit diff homepage
path: root/crates/shirabe/src/downloader/file_downloader.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/shirabe/src/downloader/file_downloader.rs')
-rw-r--r-- crates/shirabe/src/downloader/file_downloader.rs | 313
1 files changed, 252 insertions, 61 deletions
diff --git a/crates/shirabe/src/downloader/file_downloader.rs b/crates/shirabe/src/downloader/file_downloader.rs
index 452ce8c..dc8c5a5 100644
--- a/crates/shirabe/src/downloader/file_downloader.rs
+++ b/crates/shirabe/src/downloader/file_downloader.rs
@@ -6,7 +6,6 @@ use indexmap::IndexMap;
use std::sync::{LazyLock, Mutex};
use crate::util::Silencer;
-use shirabe_external_packages::react::promise::resolve as react_promise_resolve;
use shirabe_php_shim::{
DIRECTORY_SEPARATOR, InvalidArgumentException, PATHINFO_BASENAME, PATHINFO_EXTENSION,
PHP_URL_PATH, PhpMixed, RuntimeException, UnexpectedValueException, array_search, array_shift,
@@ -49,7 +48,7 @@ pub static DOWNLOAD_METADATA: LazyLock<Mutex<IndexMap<String, PhpMixed>>> =
/// @var array<string, array<string>>
/// @private
/// @internal
-pub static RESPONSE_HEADERS: LazyLock<Mutex<IndexMap<String, IndexMap<String, Vec<String>>>>> =
+pub static RESPONSE_HEADERS: LazyLock<Mutex<IndexMap<String, Vec<String>>>> =
LazyLock::new(|| Mutex::new(IndexMap::new()));
/// Base downloader for files
@@ -70,7 +69,12 @@ pub struct FileDownloader {
/// @var ProcessExecutor
pub(crate) process: std::rc::Rc<std::cell::RefCell<ProcessExecutor>>,
/// @var array<string, string> Map of package name to cache key
- last_cache_writes: IndexMap<String, String>,
+ ///
+ /// Behind a Mutex so `download()` can record cache writes through `&self`. `download()` blocks
+ /// for a while and (once the workers are parallelized) runs from several threads; requiring
+ /// `&mut self` would force locking the whole FileDownloader for each download's duration. Only
+ /// this write needs guarding, so it is the only field isolated behind a lock.
+ last_cache_writes: Mutex<IndexMap<String, String>>,
/// @var array<string, string[]> Map of package name to list of paths
additional_cleanup_paths: IndexMap<String, Vec<String>>,
}
@@ -115,7 +119,7 @@ impl FileDownloader {
cache,
process,
filesystem,
- last_cache_writes: IndexMap::new(),
+ last_cache_writes: Mutex::new(IndexMap::new()),
additional_cleanup_paths: IndexMap::new(),
};
@@ -140,6 +144,7 @@ impl FileDownloader {
}
}
+#[async_trait::async_trait(?Send)]
impl DownloaderInterface for FileDownloader {
/// @inheritDoc
fn get_installation_source(&self) -> String {
@@ -199,16 +204,223 @@ impl DownloaderInterface for FileDownloader {
.borrow_mut()
.ensure_directory_exists(&dir_of_file)?;
- // TODO(phase-c-promise): rewrite the accept/reject/retry promise orchestration as an async loop.
- // TODO(plugin): inline closures rely on captured $accept/$reject/$urls/$retries. In Rust
- // we'd need a struct holding shared state — left as a phase-b refactor.
- let _ = (output, &urls, &mut retries, cache_key_generator, &file_name);
- let _ = PluginEvents::PRE_FILE_DOWNLOAD;
- let _ = PluginEvents::POST_FILE_DOWNLOAD;
+ // The PHP $download/$accept/$reject closures form a retry loop driven by recursion; here it
+ // is expressed as a loop. $reject's "return $download()" maps to `continue`, "throw" to
+ // `return Err`, and the success path runs the verification block and returns the file name.
+ let _ = cache_key_generator;
+ loop {
+ // === $download() ===
+ let url = urls[0].clone();
+ // TODO(plugin): dispatch PreFileDownloadEvent and apply its custom cache key / processed url.
+ urls[0] = url.clone();
- todo!(
- "phase-b: orchestrate download/accept/reject closures and call download() returning a PromiseInterface"
- )
+ let checksum = package.get_dist_sha1_checksum().map(|s| s.to_string());
+ let cache_key = url.cache_key.clone();
+
+ // use from cache if it is present and has a valid checksum or we have no checksum to check against
+ let mut from_cache = false;
+ if let Some(cache) = self.cache.clone() {
+ let checksum_matches = match checksum.as_deref() {
+ None | Some("") => true,
+ Some(c) => Some(c) == cache.borrow_mut().sha1(&cache_key).as_deref(),
+ };
+ if checksum_matches && cache.borrow_mut().copy_to(&cache_key, &file_name)? {
+ from_cache = true;
+ }
+ }
+
+ if from_cache {
+ if output {
+ self.io.write_error3(
+ &format!(
+ " - Loading <info>{}</info> (<comment>{}</comment>) from cache",
+ package.get_name(),
+ package.get_full_pretty_version(true, 0)
+ ),
+ true,
+ io_interface::VERY_VERBOSE,
+ );
+ }
+ // mark the file as having been written in cache even though it is only read from cache, so that if
+ // the cache is corrupt the archive will be deleted and the next attempt will re-download it
+ // see https://github.com/composer/composer/issues/10028
+ if let Some(cache) = self.cache.as_ref() {
+ if !cache.borrow().is_read_only() {
+ self.last_cache_writes
+ .lock()
+ .unwrap()
+ .insert(package.get_name().to_string(), cache_key.clone());
+ }
+ }
+ } else {
+ if output {
+ self.io.write_error(&format!(
+ " - Downloading <info>{}</info> (<comment>{}</comment>)",
+ package.get_name(),
+ package.get_full_pretty_version(true, 0)
+ ));
+ }
+
+ let add_copy_result = self
+ .http_downloader
+ .borrow_mut()
+ .add_copy(&url.processed, &file_name, package.get_transport_options())
+ .await;
+ match add_copy_result {
+ Ok(mut response) => {
+ // === $accept($response) ===
+ let cache_key = urls[0].cache_key.clone();
+ let file_size = match filesize(&file_name) {
+ Some(size) => PhpMixed::Int(size),
+ None => PhpMixed::String(
+ response
+ .get_header("Content-Length")
+ .unwrap_or_else(|| "?".to_string()),
+ ),
+ };
+ DOWNLOAD_METADATA
+ .lock()
+ .unwrap()
+ .insert(package.get_name().to_string(), file_size);
+
+ if Platform::get_env("GITHUB_ACTIONS").is_some()
+ && Platform::get_env("COMPOSER_TESTS_ARE_RUNNING").is_none()
+ {
+ RESPONSE_HEADERS.lock().unwrap().insert(
+ package.get_name().to_string(),
+ response.get_headers().clone(),
+ );
+ }
+
+ if let Some(cache) = self.cache.as_ref() {
+ if !cache.borrow().is_read_only() {
+ self.last_cache_writes
+ .lock()
+ .unwrap()
+ .insert(package.get_name().to_string(), cache_key.clone());
+ cache.borrow_mut().copy_from(&cache_key, &file_name);
+ }
+ }
+
+ response.collect();
+ }
+ Err(e) => {
+ // === $reject($e) ===
+ // clean up
+ if file_exists(&file_name) {
+ self.filesystem.borrow().unlink(&file_name)?;
+ }
+ self.clear_last_cache_write(package);
+
+ if e.downcast_ref::<IrrecoverableDownloadException>().is_some() {
+ return Err(e);
+ }
+
+ if e.downcast_ref::<MaxFileSizeExceededException>().is_some() {
+ return Err(e);
+ }
+
+ if let Some(te) = e.downcast_ref::<TransportException>() {
+ // if we got an http response with a proper code, then requesting again will probably not help, abort
+ if 0 != te.get_code()
+ && !in_array(
+ PhpMixed::Int(te.get_code()),
+ &PhpMixed::List(vec![
+ Box::new(PhpMixed::Int(500)),
+ Box::new(PhpMixed::Int(502)),
+ Box::new(PhpMixed::Int(503)),
+ Box::new(PhpMixed::Int(504)),
+ ]),
+ true,
+ )
+ {
+ retries = 0;
+ }
+
+ // special error code returned when network is being artificially disabled
+ if te.get_status_code() == Some(499) {
+ retries = 0;
+ urls.clear();
+ }
+ }
+
+ if retries > 0 {
+ usleep(500000);
+ retries -= 1;
+
+ continue;
+ }
+
+ if !urls.is_empty() {
+ urls.remove(0);
+ }
+ if urls.len() > 0 {
+ let code = e
+ .downcast_ref::<TransportException>()
+ .map_or(0, |te| te.get_code());
+ if self.io.is_debug() {
+ self.io.write_error(&format!(
+ " Failed downloading {}: [{}] {}: {}",
+ package.get_name(),
+ get_class(&PhpMixed::Null),
+ code,
+ e
+ ));
+ self.io.write_error(&format!(
+ " Trying the next URL for {}",
+ package.get_name()
+ ));
+ } else {
+ self.io.write_error(&format!(
+ " Failed downloading {}, trying the next URL ({}: {})",
+ package.get_name(),
+ code,
+ e
+ ));
+ }
+
+ retries = 3;
+ usleep(100000);
+
+ continue;
+ }
+
+ return Err(e);
+ }
+ }
+ }
+
+ // === $result->then(verify) ===
+ if !file_exists(&file_name) {
+ return Err(UnexpectedValueException {
+ message: format!(
+ "{} could not be saved to {}, make sure the directory is writable and you have internet connectivity",
+ url.base, file_name
+ ),
+ code: 0,
+ }
+ .into());
+ }
+
+ if let Some(checksum) = checksum.as_deref() {
+ if !checksum.is_empty()
+ && hash_file("sha1", &file_name).as_deref() != Some(checksum)
+ {
+ return Err(UnexpectedValueException {
+ message: format!(
+ "The checksum verification of the file failed (downloaded from {})",
+ url.base
+ ),
+ code: 0,
+ }
+ .into());
+ }
+ }
+
+ // TODO(plugin): dispatch PostFileDownloadEvent.
+
+ return Ok(Some(PhpMixed::String(file_name)));
+ }
}
/// @inheritDoc
@@ -393,70 +605,52 @@ impl ChangeReportInterface for FileDownloader {
package: &dyn PackageInterface,
path: &str,
) -> Result<Option<String>> {
- // TODO(phase-c-promise): get_local_changes drives promises via http_downloader/process wait();
- // converting requires deciding whether ChangeReportInterface::get_local_changes becomes async. Left as-is.
// TODO(phase-b): swap self.io to NullIO and restore — needs a take/swap helper
let mut null_io = NullIO::new();
null_io.load_configuration(&mut *self.config.borrow_mut())?;
- // TODO(phase-b): `e` is captured by both the inner closure (assignment in error handler)
- // and the outer block (read after the closure). PHP closures capture by reference (`use (&$e)`);
- // emulate via Rc<RefCell> or restructure when proper async/promise types land.
- let e: std::cell::RefCell<Option<anyhow::Error>> = std::cell::RefCell::new(None);
- let output_cell: std::cell::RefCell<String> = std::cell::RefCell::new(String::new());
let target_dir = Filesystem::trim_trailing_slash(path);
- let result: Result<()> = (|| -> Result<()> {
+ // PHP attaches an onRejected handler to capture the error and drives the promise via
+ // httpDownloader->wait() / process->wait(). The single-threaded sync bridge instead runs the
+ // download/install futures with block_on, so a rejection surfaces directly as the Err below.
+ let result: Result<String> = (|| -> Result<String> {
if is_dir(&format!("{}_compare", target_dir)) {
self.filesystem
.borrow_mut()
.remove_directory(&format!("{}_compare", target_dir))?;
}
- let promise =
- self.download(package, &format!("{}_compare", target_dir), None, false)?;
- promise.then_with(
- None,
- Some(Box::new(|ex: PhpMixed| {
- let _ = ex;
- PhpMixed::Null
- })),
- );
- self.http_downloader.borrow_mut().wait()?;
- if e.borrow().is_some() {
- return Err(e.borrow_mut().take().unwrap());
- }
- let promise = self.install(package, &format!("{}_compare", target_dir), false)?;
- promise.then_with(
- None,
- Some(Box::new(|ex: PhpMixed| {
- let _ = ex;
- PhpMixed::Null
- })),
- );
- self.process.borrow_mut().wait()?;
- if e.borrow().is_some() {
- return Err(e.borrow_mut().take().unwrap());
- }
+ tokio::runtime::Runtime::new()
+ .unwrap()
+ .block_on(self.download(
+ package,
+ &format!("{}_compare", target_dir),
+ None,
+ false,
+ ))?;
+ tokio::runtime::Runtime::new()
+ .unwrap()
+ .block_on(self.install(package, &format!("{}_compare", target_dir), false))?;
let mut comparer = Comparer::new();
comparer.set_source(format!("{}_compare", target_dir));
comparer.set_update(target_dir.clone());
comparer.do_compare();
- *output_cell.borrow_mut() = comparer.get_changed_as_string(true, false);
+ let output = comparer.get_changed_as_string(true, false);
self.filesystem
.borrow_mut()
.remove_directory(&format!("{}_compare", target_dir))?;
- Ok(())
+ Ok(output)
})();
- if let Err(err) = result {
- *e.borrow_mut() = Some(err);
- }
- let e = e.into_inner();
- let output = output_cell.into_inner();
// TODO(phase-b): restore self.io = prev_io
+ let (e, output) = match result {
+ Ok(output) => (None, output),
+ Err(err) => (Some(err), String::new()),
+ };
+
if let Some(err) = e {
if self.io.is_debug() {
return Err(err);
@@ -499,15 +693,12 @@ impl FileDownloader {
.to_string()
}
- pub(crate) fn clear_last_cache_write(&mut self, package: &dyn PackageInterface) {
- if self.cache.is_some() && self.last_cache_writes.contains_key(package.get_name()) {
- let key = self
- .last_cache_writes
- .get(package.get_name())
- .unwrap()
- .clone();
+ pub(crate) fn clear_last_cache_write(&self, package: &dyn PackageInterface) {
+ let mut last_cache_writes = self.last_cache_writes.lock().unwrap();
+ if self.cache.is_some() && last_cache_writes.contains_key(package.get_name()) {
+ let key = last_cache_writes.get(package.get_name()).unwrap().clone();
self.cache.as_ref().unwrap().borrow_mut().remove(&key);
- self.last_cache_writes.shift_remove(package.get_name());
+ last_cache_writes.shift_remove(package.get_name());
}
}