diff options
Diffstat (limited to 'crates/shirabe')
31 files changed, 1336 insertions, 893 deletions
diff --git a/crates/shirabe/src/downloader/archive_downloader.rs b/crates/shirabe/src/downloader/archive_downloader.rs index ac898d5..6c8a8f9 100644 --- a/crates/shirabe/src/downloader/archive_downloader.rs +++ b/crates/shirabe/src/downloader/archive_downloader.rs @@ -2,15 +2,17 @@ use anyhow::Result; use indexmap::IndexMap; -use shirabe_external_packages::symfony::component::finder::Finder; +use shirabe_external_packages::symfony::component::finder::{Finder, SplFileInfo}; use shirabe_php_shim::{ - DIRECTORY_SEPARATOR, RuntimeException, bin2hex, file_exists, is_dir, random_bytes, realpath, + DIRECTORY_SEPARATOR, PhpMixed, RuntimeException, basename, bin2hex, file_exists, is_dir, + random_bytes, realpath, }; use crate::dependency_resolver::operation::InstallOperation; use crate::downloader::DownloaderInterface; use crate::downloader::FileDownloader; use crate::package::PackageInterface; +use crate::util::Filesystem; use crate::util::Platform; pub trait ArchiveDownloader { @@ -122,22 +124,79 @@ pub trait ArchiveDownloader { .ensure_directory_exists(&temporary_dir); let file_name = self.inner().get_file_name(package, path); - let _ = file_name; + match self.extract(package, &file_name, &temporary_dir).await { + Err(e) => { + install_cleanup(self.inner_mut(), package, path, &temporary_dir)?; + Err(e) + } + Ok(_) => { + if file_exists(&file_name) { + self.inner().filesystem.borrow().unlink(&file_name)?; + } + + let mut rename_as_one = false; + if !file_exists(path) { + rename_as_one = true; + } else if self.inner().filesystem.borrow().is_dir_empty(path) { + let removed = self + .inner() + .filesystem + .borrow_mut() + .remove_directory_php(path); + match removed { + Ok(true) => { + rename_as_one = true; + } + Ok(false) => {} + Err(e) => { + // ignore error, and simply do not renameAsOne + if e.downcast_ref::<RuntimeException>().is_none() { + return Err(e); + } + } + } + } + + let content_dir = get_folder_content(&temporary_dir); + let single_dir_at_top_level = content_dir.len() == 1 + && content_dir + .first() + .map(|file| is_dir(&file.get_pathname())) + .unwrap_or(false); + + if rename_as_one { + // if the target $path is clear, we can rename the whole package in one go instead of looping over the contents + let extracted_dir = if single_dir_at_top_level { + content_dir.first().unwrap().get_pathname() + } else { + temporary_dir.clone() + }; + self.inner() + .filesystem + .borrow_mut() + .rename(&extracted_dir, path)?; + } else { + // only one dir in the archive, extract its contents out of it + let mut from = temporary_dir.clone(); + if single_dir_at_top_level { + from = content_dir.first().unwrap().get_pathname(); + } - // TODO(phase-c-promise): rewrite extract().then(onFulfilled/onRejected) + renameRecursively chain as an await sequence - let promise = self.extract(package, "", &temporary_dir)?; + rename_recursively(&self.inner().filesystem, package, &from, path)?; + } - // TODO(phase-b): the original PHP chains React promise `.then(onFulfilled, onRejected)` - // callbacks that capture `$this`, `$filesystem`, `$package`, `$path`, `$temporaryDir`, - // `$fileName`, and a recursive `$renameRecursively` closure. PromiseInterface::then in - // Rust expects `FnOnce(Option<PhpMixed>) -> Option<PhpMixed>` and the callbacks here - // need both `&mut self` access and to return another promise. This needs a structural - // rework (likely splitting the trait or adding a `then_boxed_result` adapter), plus a - // way to share `&mut self` with the closure (probably `Rc<RefCell<...>>`). - let _ = (&promise, &temporary_dir, package, path); - todo!( - "ArchiveDownloader::install: rewire .then(onFulfilled, onRejected) chain to match PromiseInterface signature" - ) + self.inner() + .filesystem + .borrow_mut() + .remove_directory_async(&temporary_dir) + .await?; + self.inner_mut() + .remove_cleanup_path(package, &temporary_dir); + self.inner_mut().remove_cleanup_path(package, path); + + Ok(None) + } + } } /// @inheritDoc @@ -145,3 +204,87 @@ pub trait ArchiveDownloader { ": Extracting archive" } } + +fn install_cleanup( + inner: &mut FileDownloader, + package: &dyn PackageInterface, + path: &str, + temporary_dir: &str, +) -> Result<()> { + // remove cache if the file was corrupted + inner.clear_last_cache_write(package); + + // clean up + inner + .filesystem + .borrow_mut() + .remove_directory(temporary_dir)?; + if is_dir(path) && realpath(path) != Some(Platform::get_cwd(false).unwrap_or_default()) { + inner.filesystem.borrow_mut().remove_directory(path)?; + } + inner.remove_cleanup_path(package, temporary_dir); + let realpath = realpath(path); + if let Some(realpath) = realpath { + inner.remove_cleanup_path(package, &realpath); + } + + Ok(()) +} + +/// Returns the folder content, excluding .DS_Store +fn get_folder_content(dir: &str) -> Vec<SplFileInfo> { + let mut finder = Finder::create(); + finder + .ignore_vcs(false) + .ignore_dot_files(false) + .not_name(".DS_Store") + .depth(0) + .r#in(dir); + + finder.iter().collect() +} + +/// Renames (and recursively merges if needed) a folder into another one +/// +/// For custom installers, where packages may share paths, and given Composer 2's parallelism, we need to make sure +/// that the source directory gets merged into the target one if the target exists. Otherwise rename() by default would +/// put the source into the target e.g. src/ => target/src/ (assuming target exists) instead of src/ => target/ +fn rename_recursively( + filesystem: &std::rc::Rc<std::cell::RefCell<Filesystem>>, + package: &dyn PackageInterface, + from: &str, + to: &str, +) -> Result<()> { + let content_dir = get_folder_content(from); + + // move files back out of the temp dir + for file in &content_dir { + let file = file.get_pathname(); + if is_dir(&format!("{}/{}", to, basename(&file))) { + if !is_dir(&file) { + return Err(RuntimeException { + message: format!( + "Installing {} would lead to overwriting the {}/{} directory with a file from the package, invalid operation.", + package, + to, + basename(&file) + ), + code: 0, + } + .into()); + } + rename_recursively( + filesystem, + package, + &file, + &format!("{}/{}", to, basename(&file)), + )?; + } else { + filesystem + .borrow_mut() + .rename(&file, &format!("{}/{}", to, basename(&file)))?; + } + } + + Ok(()) +} diff --git a/crates/shirabe/src/downloader/downloader_interface.rs b/crates/shirabe/src/downloader/downloader_interface.rs index 7b333e6..18026a4 100644 --- a/crates/shirabe/src/downloader/downloader_interface.rs +++ b/crates/shirabe/src/downloader/downloader_interface.rs @@ -3,6 +3,7 @@ use crate::package::PackageInterface; use shirabe_php_shim::PhpMixed; +#[async_trait::async_trait(?Send)] pub trait DownloaderInterface: std::fmt::Debug { fn get_installation_source(&self) -> String; @@ -21,7 +22,7 @@ pub trait DownloaderInterface: std::fmt::Debug { path: &str, prev_package: Option<&dyn PackageInterface>, ) -> anyhow::Result<Option<PhpMixed>> { - self.download(package, path, prev_package, true) + self.download(package, path, prev_package, true).await } async fn prepare( @@ -45,7 +46,7 @@ pub trait DownloaderInterface: std::fmt::Debug { package: &dyn PackageInterface, path: &str, ) -> anyhow::Result<Option<PhpMixed>> { - self.install(package, path, true) + self.install(package, path, true).await } async fn update( @@ -68,7 +69,7 @@ pub trait DownloaderInterface: std::fmt::Debug { package: &dyn PackageInterface, path: &str, ) -> anyhow::Result<Option<PhpMixed>> { - self.remove(package, path, true) + self.remove(package, path, true).await } async fn cleanup( diff --git a/crates/shirabe/src/downloader/file_downloader.rs b/crates/shirabe/src/downloader/file_downloader.rs index 452ce8c..dc8c5a5 100644 --- a/crates/shirabe/src/downloader/file_downloader.rs +++ b/crates/shirabe/src/downloader/file_downloader.rs @@ -6,7 +6,6 @@ use indexmap::IndexMap; use std::sync::{LazyLock, Mutex}; use crate::util::Silencer; -use shirabe_external_packages::react::promise::resolve as react_promise_resolve; use shirabe_php_shim::{ DIRECTORY_SEPARATOR, InvalidArgumentException, PATHINFO_BASENAME, PATHINFO_EXTENSION, PHP_URL_PATH, PhpMixed, RuntimeException, UnexpectedValueException, array_search, array_shift, @@ -49,7 +48,7 @@ pub static DOWNLOAD_METADATA: LazyLock<Mutex<IndexMap<String, PhpMixed>>> = /// @var array<string, array<string>> /// @private /// @internal -pub static RESPONSE_HEADERS: LazyLock<Mutex<IndexMap<String, IndexMap<String, Vec<String>>>>> = +pub static RESPONSE_HEADERS: LazyLock<Mutex<IndexMap<String, Vec<String>>>> = LazyLock::new(|| Mutex::new(IndexMap::new())); /// Base downloader for files @@ -70,7 +69,12 @@ pub struct FileDownloader { /// @var ProcessExecutor pub(crate) process: std::rc::Rc<std::cell::RefCell<ProcessExecutor>>, /// @var array<string, string> Map of package name to cache key - last_cache_writes: IndexMap<String, String>, + /// + /// Behind a Mutex so `download()` can record cache writes through `&self`. `download()` blocks + /// for a while and (once the workers are parallelized) runs from several threads; requiring + /// `&mut self` would force locking the whole FileDownloader for each download's duration. Only + /// this write needs guarding, so it is the only field isolated behind a lock. + last_cache_writes: Mutex<IndexMap<String, String>>, /// @var array<string, string[]> Map of package name to list of paths additional_cleanup_paths: IndexMap<String, Vec<String>>, } @@ -115,7 +119,7 @@ impl FileDownloader { cache, process, filesystem, - last_cache_writes: IndexMap::new(), + last_cache_writes: Mutex::new(IndexMap::new()), additional_cleanup_paths: IndexMap::new(), }; @@ -140,6 +144,7 @@ impl FileDownloader { } } +#[async_trait::async_trait(?Send)] impl DownloaderInterface for FileDownloader { /// @inheritDoc fn get_installation_source(&self) -> String { @@ -199,16 +204,223 @@ impl DownloaderInterface for FileDownloader { .borrow_mut() .ensure_directory_exists(&dir_of_file)?; - // TODO(phase-c-promise): rewrite the accept/reject/retry promise orchestration as an async loop. - // TODO(plugin): inline closures rely on captured $accept/$reject/$urls/$retries. In Rust - // we'd need a struct holding shared state — left as a phase-b refactor. - let _ = (output, &urls, &mut retries, cache_key_generator, &file_name); - let _ = PluginEvents::PRE_FILE_DOWNLOAD; - let _ = PluginEvents::POST_FILE_DOWNLOAD; + // The PHP $download/$accept/$reject closures form a retry loop driven by recursion; here it + // is expressed as a loop. $reject's "return $download()" maps to `continue`, "throw" to + // `return Err`, and the success path runs the verification block and returns the file name. + let _ = cache_key_generator; + loop { + // === $download() === + let url = urls[0].clone(); + // TODO(plugin): dispatch PreFileDownloadEvent and apply its custom cache key / processed url. + urls[0] = url.clone(); - todo!( - "phase-b: orchestrate download/accept/reject closures and call download() returning a PromiseInterface" - ) + let checksum = package.get_dist_sha1_checksum().map(|s| s.to_string()); + let cache_key = url.cache_key.clone(); + + // use from cache if it is present and has a valid checksum or we have no checksum to check against + let mut from_cache = false; + if let Some(cache) = self.cache.clone() { + let checksum_matches = match checksum.as_deref() { + None | Some("") => true, + Some(c) => Some(c) == cache.borrow_mut().sha1(&cache_key).as_deref(), + }; + if checksum_matches && cache.borrow_mut().copy_to(&cache_key, &file_name)? { + from_cache = true; + } + } + + if from_cache { + if output { + self.io.write_error3( + &format!( + " - Loading <info>{}</info> (<comment>{}</comment>) from cache", + package.get_name(), + package.get_full_pretty_version(true, 0) + ), + true, + io_interface::VERY_VERBOSE, + ); + } + // mark the file as having been written in cache even though it is only read from cache, so that if + // the cache is corrupt the archive will be deleted and the next attempt will re-download it + // see https://github.com/composer/composer/issues/10028 + if let Some(cache) = self.cache.as_ref() { + if !cache.borrow().is_read_only() { + self.last_cache_writes + .lock() + .unwrap() + .insert(package.get_name().to_string(), cache_key.clone()); + } + } + } else { + if output { + self.io.write_error(&format!( + " - Downloading <info>{}</info> (<comment>{}</comment>)", + package.get_name(), + package.get_full_pretty_version(true, 0) + )); + } + + let add_copy_result = self + .http_downloader + .borrow_mut() + .add_copy(&url.processed, &file_name, package.get_transport_options()) + .await; + match add_copy_result { + Ok(mut response) => { + // === $accept($response) === + let cache_key = urls[0].cache_key.clone(); + let file_size = match filesize(&file_name) { + Some(size) => PhpMixed::Int(size), + None => PhpMixed::String( + response + .get_header("Content-Length") + .unwrap_or_else(|| "?".to_string()), + ), + }; + DOWNLOAD_METADATA + .lock() + .unwrap() + .insert(package.get_name().to_string(), file_size); + + if Platform::get_env("GITHUB_ACTIONS").is_some() + && Platform::get_env("COMPOSER_TESTS_ARE_RUNNING").is_none() + { + RESPONSE_HEADERS.lock().unwrap().insert( + package.get_name().to_string(), + response.get_headers().clone(), + ); + } + + if let Some(cache) = self.cache.as_ref() { + if !cache.borrow().is_read_only() { + self.last_cache_writes + .lock() + .unwrap() + .insert(package.get_name().to_string(), cache_key.clone()); + cache.borrow_mut().copy_from(&cache_key, &file_name); + } + } + + response.collect(); + } + Err(e) => { + // === $reject($e) === + // clean up + if file_exists(&file_name) { + self.filesystem.borrow().unlink(&file_name)?; + } + self.clear_last_cache_write(package); + + if e.downcast_ref::<IrrecoverableDownloadException>().is_some() { + return Err(e); + } + + if e.downcast_ref::<MaxFileSizeExceededException>().is_some() { + return Err(e); + } + + if let Some(te) = e.downcast_ref::<TransportException>() { + // if we got an http response with a proper code, then requesting again will probably not help, abort + if 0 != te.get_code() + && !in_array( + PhpMixed::Int(te.get_code()), + &PhpMixed::List(vec![ + Box::new(PhpMixed::Int(500)), + Box::new(PhpMixed::Int(502)), + Box::new(PhpMixed::Int(503)), + Box::new(PhpMixed::Int(504)), + ]), + true, + ) + { + retries = 0; + } + + // special error code returned when network is being artificially disabled + if te.get_status_code() == Some(499) { + retries = 0; + urls.clear(); + } + } + + if retries > 0 { + usleep(500000); + retries -= 1; + + continue; + } + + if !urls.is_empty() { + urls.remove(0); + } + if urls.len() > 0 { + let code = e + .downcast_ref::<TransportException>() + .map_or(0, |te| te.get_code()); + if self.io.is_debug() { + self.io.write_error(&format!( + " Failed downloading {}: [{}] {}: {}", + package.get_name(), + get_class(&PhpMixed::Null), + code, + e + )); + self.io.write_error(&format!( + " Trying the next URL for {}", + package.get_name() + )); + } else { + self.io.write_error(&format!( + " Failed downloading {}, trying the next URL ({}: {})", + package.get_name(), + code, + e + )); + } + + retries = 3; + usleep(100000); + + continue; + } + + return Err(e); + } + } + } + + // === $result->then(verify) === + if !file_exists(&file_name) { + return Err(UnexpectedValueException { + message: format!( + "{} could not be saved to {}, make sure the directory is writable and you have internet connectivity", + url.base, file_name + ), + code: 0, + } + .into()); + } + + if let Some(checksum) = checksum.as_deref() { + if !checksum.is_empty() + && hash_file("sha1", &file_name).as_deref() != Some(checksum) + { + return Err(UnexpectedValueException { + message: format!( + "The checksum verification of the file failed (downloaded from {})", + url.base + ), + code: 0, + } + .into()); + } + } + + // TODO(plugin): dispatch PostFileDownloadEvent. + + return Ok(Some(PhpMixed::String(file_name))); + } } /// @inheritDoc @@ -393,70 +605,52 @@ impl ChangeReportInterface for FileDownloader { package: &dyn PackageInterface, path: &str, ) -> Result<Option<String>> { - // TODO(phase-c-promise): get_local_changes drives promises via http_downloader/process wait(); - // converting requires deciding whether ChangeReportInterface::get_local_changes becomes async. Left as-is. // TODO(phase-b): swap self.io to NullIO and restore — needs a take/swap helper let mut null_io = NullIO::new(); null_io.load_configuration(&mut *self.config.borrow_mut())?; - // TODO(phase-b): `e` is captured by both the inner closure (assignment in error handler) - // and the outer block (read after the closure). PHP closures capture by reference (`use (&$e)`); - // emulate via Rc<RefCell> or restructure when proper async/promise types land. - let e: std::cell::RefCell<Option<anyhow::Error>> = std::cell::RefCell::new(None); - let output_cell: std::cell::RefCell<String> = std::cell::RefCell::new(String::new()); let target_dir = Filesystem::trim_trailing_slash(path); - let result: Result<()> = (|| -> Result<()> { + // PHP attaches an onRejected handler to capture the error and drives the promise via + // httpDownloader->wait() / process->wait(); the single-threaded sync bridge block_on's the + // download/install futures, so a rejection surfaces directly as the Err captured below. + let result: Result<String> = (|| -> Result<String> { if is_dir(&format!("{}_compare", target_dir)) { self.filesystem .borrow_mut() .remove_directory(&format!("{}_compare", target_dir))?; } - let promise = - self.download(package, &format!("{}_compare", target_dir), None, false)?; - promise.then_with( - None, - Some(Box::new(|ex: PhpMixed| { - let _ = ex; - PhpMixed::Null - })), - ); - self.http_downloader.borrow_mut().wait()?; - if e.borrow().is_some() { - return Err(e.borrow_mut().take().unwrap()); - } - let promise = self.install(package, &format!("{}_compare", target_dir), false)?; - promise.then_with( - None, - Some(Box::new(|ex: PhpMixed| { - let _ = ex; - PhpMixed::Null - })), - ); - self.process.borrow_mut().wait()?; - if e.borrow().is_some() { - return Err(e.borrow_mut().take().unwrap()); - } + tokio::runtime::Runtime::new() + .unwrap() + .block_on(self.download( + package, + &format!("{}_compare", target_dir), + None, + false, + ))?; + tokio::runtime::Runtime::new() + .unwrap() + .block_on(self.install(package, &format!("{}_compare", target_dir), false))?; let mut comparer = Comparer::new(); comparer.set_source(format!("{}_compare", target_dir)); comparer.set_update(target_dir.clone()); comparer.do_compare(); - *output_cell.borrow_mut() = comparer.get_changed_as_string(true, false); + let output = comparer.get_changed_as_string(true, false); self.filesystem .borrow_mut() .remove_directory(&format!("{}_compare", target_dir))?; - Ok(()) + Ok(output) })(); - if let Err(err) = result { - *e.borrow_mut() = Some(err); - } - let e = e.into_inner(); - let output = output_cell.into_inner(); // TODO(phase-b): restore self.io = prev_io + let (e, output) = match result { + Ok(output) => (None, output), + Err(err) => (Some(err), String::new()), + }; + if let Some(err) = e { if self.io.is_debug() { return Err(err); @@ -499,15 +693,12 @@ impl FileDownloader { .to_string() } - pub(crate) fn clear_last_cache_write(&mut self, package: &dyn PackageInterface) { - if self.cache.is_some() && self.last_cache_writes.contains_key(package.get_name()) { - let key = self - .last_cache_writes - .get(package.get_name()) - .unwrap() - .clone(); + pub(crate) fn clear_last_cache_write(&self, package: &dyn PackageInterface) { + let mut last_cache_writes = self.last_cache_writes.lock().unwrap(); + if self.cache.is_some() && last_cache_writes.contains_key(package.get_name()) { + let key = last_cache_writes.get(package.get_name()).unwrap().clone(); self.cache.as_ref().unwrap().borrow_mut().remove(&key); - self.last_cache_writes.shift_remove(package.get_name()); + last_cache_writes.shift_remove(package.get_name()); } } diff --git a/crates/shirabe/src/downloader/fossil_downloader.rs b/crates/shirabe/src/downloader/fossil_downloader.rs index 25f3a31..1e164b8 100644 --- a/crates/shirabe/src/downloader/fossil_downloader.rs +++ b/crates/shirabe/src/downloader/fossil_downloader.rs @@ -250,6 +250,7 @@ impl FossilDownloader { // TODO(phase-b): wire up VcsDownloader trait properly. FossilDownloader extends VcsDownloader // which implements DownloaderInterface in PHP. Delegating each trait method to todo!() until the // inner VcsDownloaderBase exposes the matching impl surface. +#[async_trait::async_trait(?Send)] impl DownloaderInterface for FossilDownloader { fn get_installation_source(&self) -> String { todo!() diff --git a/crates/shirabe/src/downloader/git_downloader.rs b/crates/shirabe/src/downloader/git_downloader.rs index e43d31b..a5e3638 100644 --- a/crates/shirabe/src/downloader/git_downloader.rs +++ b/crates/shirabe/src/downloader/git_downloader.rs @@ -4,7 +4,6 @@ use crate::io::io_interface; use anyhow::Result; use indexmap::IndexMap; use shirabe_external_packages::composer::pcre::{CaptureKey, Preg}; -use shirabe_external_packages::react::promise; use shirabe_php_shim::{ PhpMixed, RuntimeException, array_map, basename, dirname, implode, in_array, is_dir, preg_quote, realpath, rtrim, sprintf, strlen, strpos, substr, trim, version_compare, @@ -1359,6 +1358,7 @@ impl DvcsDownloaderInterface for GitDownloader { // TODO(phase-b): GitDownloader extends VcsDownloader which implements DownloaderInterface. // Delegating each trait method to todo!() until the inner VcsDownloaderBase exposes the // matching impl surface. +#[async_trait::async_trait(?Send)] impl crate::downloader::DownloaderInterface for GitDownloader { fn get_installation_source(&self) -> String { todo!() diff --git a/crates/shirabe/src/downloader/gzip_downloader.rs b/crates/shirabe/src/downloader/gzip_downloader.rs index d852a00..fe44fed 100644 --- a/crates/shirabe/src/downloader/gzip_downloader.rs +++ b/crates/shirabe/src/downloader/gzip_downloader.rs @@ -129,6 +129,7 @@ impl GzipDownloader { } } +#[async_trait::async_trait(?Send)] impl crate::downloader::DownloaderInterface for GzipDownloader { fn get_installation_source(&self) -> String { self.inner.get_installation_source() diff --git a/crates/shirabe/src/downloader/hg_downloader.rs b/crates/shirabe/src/downloader/hg_downloader.rs index 56f3b6f..b12e348 100644 --- a/crates/shirabe/src/downloader/hg_downloader.rs +++ b/crates/shirabe/src/downloader/hg_downloader.rs @@ -220,6 +220,7 @@ impl HgDownloader { // TODO(phase-b): wire up VcsDownloader trait properly. HgDownloader extends VcsDownloader which // implements DownloaderInterface in PHP. Delegating each trait method to todo!() until the inner // VcsDownloaderBase exposes the matching impl surface. +#[async_trait::async_trait(?Send)] impl DownloaderInterface for HgDownloader { fn get_installation_source(&self) -> String { todo!() diff --git a/crates/shirabe/src/downloader/path_downloader.rs b/crates/shirabe/src/downloader/path_downloader.rs index f677a50..46090ac 100644 --- a/crates/shirabe/src/downloader/path_downloader.rs +++ b/crates/shirabe/src/downloader/path_downloader.rs @@ -537,6 +537,7 @@ impl VcsCapableDownloaderInterface for PathDownloader { // overrides download/install/remove with &mut self signatures that diverge from the trait. The // trait methods here delegate to the inner FileDownloader; the bespoke overrides on the struct // itself are not yet routed through the trait. +#[async_trait::async_trait(?Send)] impl DownloaderInterface for PathDownloader { fn get_installation_source(&self) -> String { self.inner.get_installation_source() diff --git a/crates/shirabe/src/downloader/perforce_downloader.rs b/crates/shirabe/src/downloader/perforce_downloader.rs index fdb0f0d..b3f7bc9 100644 --- a/crates/shirabe/src/downloader/perforce_downloader.rs +++ b/crates/shirabe/src/downloader/perforce_downloader.rs @@ -159,6 +159,7 @@ impl PerforceDownloader { // TODO(phase-b): wire up VcsDownloader trait properly. PerforceDownloader extends VcsDownloader // which implements DownloaderInterface in PHP. Delegating each trait method to todo!() until the // inner VcsDownloaderBase exposes the matching impl surface. +#[async_trait::async_trait(?Send)] impl DownloaderInterface for PerforceDownloader { fn get_installation_source(&self) -> String { todo!() diff --git a/crates/shirabe/src/downloader/phar_downloader.rs b/crates/shirabe/src/downloader/phar_downloader.rs index 9cfb18f..a598437 100644 --- a/crates/shirabe/src/downloader/phar_downloader.rs +++ b/crates/shirabe/src/downloader/phar_downloader.rs @@ -63,6 +63,7 @@ impl PharDownloader { } } +#[async_trait::async_trait(?Send)] impl DownloaderInterface for PharDownloader { fn get_installation_source(&self) -> String { self.inner.get_installation_source() diff --git a/crates/shirabe/src/downloader/rar_downloader.rs b/crates/shirabe/src/downloader/rar_downloader.rs index c6031d9..f482582 100644 --- a/crates/shirabe/src/downloader/rar_downloader.rs +++ b/crates/shirabe/src/downloader/rar_downloader.rs @@ -143,6 +143,7 @@ impl RarDownloader { } } +#[async_trait::async_trait(?Send)] impl crate::downloader::DownloaderInterface for RarDownloader { fn get_installation_source(&self) -> String { self.inner.get_installation_source() diff --git a/crates/shirabe/src/downloader/svn_downloader.rs b/crates/shirabe/src/downloader/svn_downloader.rs index f3d0fef..162e91c 100644 --- a/crates/shirabe/src/downloader/svn_downloader.rs +++ b/crates/shirabe/src/downloader/svn_downloader.rs @@ -3,7 +3,6 @@ use crate::io::io_interface; use indexmap::IndexMap; use shirabe_external_packages::composer::pcre::{CaptureKey, Preg}; -use shirabe_external_packages::react::promise; use shirabe_php_shim::{PhpMixed, RuntimeException, is_dir, version_compare}; use crate::config::Config; @@ -432,6 +431,7 @@ impl SvnDownloader { // TODO(phase-b): wire up VcsDownloader trait properly. SvnDownloader extends VcsDownloader which // implements DownloaderInterface in PHP. Delegating each trait method to todo!() until the inner // VcsDownloaderBase exposes the matching impl surface. +#[async_trait::async_trait(?Send)] impl DownloaderInterface for SvnDownloader { fn get_installation_source(&self) -> String { todo!() diff --git a/crates/shirabe/src/downloader/tar_downloader.rs b/crates/shirabe/src/downloader/tar_downloader.rs index c720ccf..09b6f5a 100644 --- a/crates/shirabe/src/downloader/tar_downloader.rs +++ b/crates/shirabe/src/downloader/tar_downloader.rs @@ -58,6 +58,7 @@ impl TarDownloader { } } +#[async_trait::async_trait(?Send)] impl DownloaderInterface for TarDownloader { fn get_installation_source(&self) -> String { self.inner.get_installation_source() diff --git a/crates/shirabe/src/downloader/xz_downloader.rs b/crates/shirabe/src/downloader/xz_downloader.rs index d15287d..ca867df 100644 --- a/crates/shirabe/src/downloader/xz_downloader.rs +++ b/crates/shirabe/src/downloader/xz_downloader.rs @@ -77,6 +77,7 @@ impl XzDownloader { } } +#[async_trait::async_trait(?Send)] impl crate::downloader::DownloaderInterface for XzDownloader { fn get_installation_source(&self) -> String { self.inner.get_installation_source() diff --git a/crates/shirabe/src/downloader/zip_downloader.rs b/crates/shirabe/src/downloader/zip_downloader.rs index 88b3d3a..eb057e0 100644 --- a/crates/shirabe/src/downloader/zip_downloader.rs +++ b/crates/shirabe/src/downloader/zip_downloader.rs @@ -14,7 +14,8 @@ use shirabe_external_packages::symfony::component::process::Process; use shirabe_php_shim::{ DIRECTORY_SEPARATOR, ErrorException, PhpMixed, RuntimeException, UnexpectedValueException, ZipArchive, bin2hex, class_exists, file_exists, file_get_contents, filesize, function_exists, - hash_file, is_file, json_encode, random_int, version_compare, + hash_file, is_file, json_encode, random_int, str_contains, str_replace, strlen, substr, + version_compare, }; use std::sync::Mutex; @@ -270,27 +271,153 @@ impl ZipDownloader { } } - // TODO(phase-c-promise): execute_async + .then fallback closure captures &mut self/io; - // recursive promise flattening, not a mechanical await chain. - // TODO(phase-b): full try_fallback closure deferred — PHP captures `$io`, `$self` - // and several locals by reference, conflicting with Rust's borrow checker because - // `extract_with_zip_archive` later needs `&mut self`. Restructure once the - // promise/closure plumbing supports that shape. - let _ = ( - is_last_chance, - file, - path, - executable, - package, - &command, - &self.inner.io, - ); - match self.inner.process.borrow_mut().execute_async(&command, ()) { - Ok(_promise) => todo!("phase-b: chain promise.then with fallback closure"), - Err(_e) => todo!("phase-b: pipe execute_async error into try_fallback"), + let process_result = self + .inner + .process + .borrow_mut() + .execute_async(&command, ()) + .await; + match process_result { + Ok(process) => { + if !process.is_successful() { + if self.cleanup_executed.contains_key(package.get_name()) { + return Err(RuntimeException { + message: format!( + "Failed to extract {} as the installation was aborted by another package operation.", + package.get_name() + ), + code: 0, + } + .into()); + } + + let output = process.get_error_output(); + let output = + str_replace(&format!(", {}.zip or {}.ZIP", file, file), "", &output); + + return self + .try_fallback( + RuntimeException { + message: format!( + "Failed to extract {}: ({}) {}\n\n{}", + package.get_name(), + process + .get_exit_code() + .map(|c| c.to_string()) + .unwrap_or_default(), + command.join(" "), + output + ), + code: 0, + } + .into(), + is_last_chance, + file, + path, + package, + &executable, + ) + .await; + } + + Ok(None) + } + Err(e) => { + self.try_fallback(e, is_last_chance, file, path, package, &executable) + .await + } } } + async fn try_fallback( + &mut self, + process_error: anyhow::Error, + is_last_chance: bool, + file: &str, + path: &str, + package: &dyn PackageInterface, + executable: &str, + ) -> Result<Option<PhpMixed>> { + if is_last_chance { + return Err(process_error); + } + + if str_contains(&process_error.to_string(), "zip bomb") { + return Err(process_error); + } + + if !is_file(file) { + self.inner + .io + .write_error(&format!(" <warning>{}</warning>", process_error)); + self.inner.io.write_error(" <warning>This most likely is due to a custom installer plugin not handling the returned Promise from the downloader</warning>"); + self.inner.io.write_error(" <warning>See https://github.com/composer/installers/commit/5006d0c28730ade233a8f42ec31ac68fb1c5c9bb for an example fix</warning>"); + } else { + self.inner + .io + .write_error(&format!(" <warning>{}</warning>", process_error)); + self.inner.io.write_error(" The archive may contain identical file names with different capitalization (which fails on case insensitive filesystems)"); + self.inner.io.write_error(&format!( + " Unzip with {} command failed, falling back to ZipArchive class", + executable + )); + + // additional debug data to try to figure out GH actions issues https://github.com/composer/composer/issues/11148 + if Platform::get_env("GITHUB_ACTIONS").is_some() + && Platform::get_env("COMPOSER_TESTS_ARE_RUNNING").is_none() + { + self.inner.io.write_error(" <warning>Additional debug info, please report to https://github.com/composer/composer/issues/11148 if you see this:</warning>"); + self.inner.io.write_error(&format!( + "File size: {}", + filesize(file).map(|s| s.to_string()).unwrap_or_default() + )); + self.inner.io.write_error(&format!( + "File SHA1: {}", + hash_file("sha1", file).unwrap_or_default() + )); + self.inner.io.write_error(&format!( + "First 100 bytes (hex): {}", + bin2hex( + substr(&file_get_contents(file).unwrap_or_default(), 0, Some(100)) + .as_bytes() + ) + )); + self.inner.io.write_error(&format!( + "Last 100 bytes (hex): {}", + bin2hex( + substr(&file_get_contents(file).unwrap_or_default(), -100, None).as_bytes() + ) + )); + if strlen(package.get_dist_url().unwrap_or("")) > 0 { + self.inner.io.write_error(&format!( + "Origin URL: {}", + self.inner + .process_url(package, package.get_dist_url().unwrap_or(""))? + )); + let headers = { + let response_headers = crate::downloader::file_downloader::RESPONSE_HEADERS + .lock() + .unwrap(); + match response_headers.get(package.get_name()) { + Some(list) => PhpMixed::List( + list.iter() + .map(|s| Box::new(PhpMixed::String(s.clone()))) + .collect(), + ), + None => PhpMixed::List(vec![]), + } + }; + self.inner.io.write_error(&format!( + "Response Headers: {}", + json_encode(&headers).unwrap_or_default() + )); + } + } + } + + self.extract_with_zip_archive(package, file, path).await + } + async fn extract_with_zip_archive( &mut self, package: &dyn PackageInterface, @@ -425,6 +552,7 @@ impl ZipDownloader { // TODO(phase-b): ZipDownloader::download is overridden with extra setup (UNZIP_COMMANDS init, // etc.). The trait method here delegates straight to the inner FileDownloader; the bespoke // override on the struct itself takes &mut self and is not yet routed through the trait. +#[async_trait::async_trait(?Send)] impl crate::downloader::DownloaderInterface for ZipDownloader { fn get_installation_source(&self) -> String { self.inner.get_installation_source() diff --git a/crates/shirabe/src/installer/installation_manager.rs b/crates/shirabe/src/installer/installation_manager.rs index ae0470a..7a05324 100644 --- a/crates/shirabe/src/installer/installation_manager.rs +++ b/crates/shirabe/src/installer/installation_manager.rs @@ -3,7 +3,6 @@ use crate::io::io_interface; use anyhow::Result; use indexmap::IndexMap; -use shirabe_external_packages::react::promise; use shirabe_external_packages::seld::signal::SignalHandler; use shirabe_php_shim::{ InvalidArgumentException, PhpMixed, array_search_mixed, array_splice, array_unshift, count, @@ -181,7 +180,10 @@ impl InstallationManager { // @var array<callable(): ?PromiseInterface<void|null>> $cleanupPromises let mut cleanup_promises: IndexMap< i64, - Box<dyn Fn() -> Option<Box<dyn PromiseInterface>>>, + Box< + dyn Fn() + -> Option<std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>>>>>, + >, > = IndexMap::new(); let signal_handler = SignalHandler::create( @@ -234,15 +236,17 @@ impl InstallationManager { } for batch_to_execute in batches { - self.download_and_execute_batch( - repo, - batch_to_execute, - &mut cleanup_promises, - dev_mode, - run_scripts, - download_only, - // TODO(phase-b): allOperations should be the original full list; would require clone - vec![], + tokio::runtime::Runtime::new().unwrap().block_on( + self.download_and_execute_batch( + repo, + batch_to_execute, + &mut cleanup_promises, + dev_mode, + run_scripts, + download_only, + // TODO(phase-b): allOperations should be the original full list; would require clone + vec![], + ), )?; } @@ -255,7 +259,9 @@ impl InstallationManager { match result { Ok(()) => {} Err(e) => { - self.run_cleanup(&cleanup_promises); + tokio::runtime::Runtime::new() + .unwrap() + .block_on(self.run_cleanup(&cleanup_promises)); return Err(e); } } @@ -275,20 +281,22 @@ impl InstallationManager { /// @param OperationInterface[] $operations List of operations to execute in this batch /// @param OperationInterface[] $allOperations Complete list of operations to be executed in the install job, used for event listeners /// @phpstan-param array<callable(): ?PromiseInterface<void|null>> $cleanupPromises - fn download_and_execute_batch( + async fn download_and_execute_batch( &mut self, repo: &mut dyn InstalledRepositoryInterface, operations: IndexMap<i64, Box<dyn OperationInterface>>, - cleanup_promises: &mut IndexMap<i64, Box<dyn Fn() -> Option<Box<dyn PromiseInterface>>>>, + cleanup_promises: &mut IndexMap< + i64, + Box< + dyn Fn() + -> Option<std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>>>>>, + >, + >, dev_mode: bool, run_scripts: bool, download_only: bool, all_operations: Vec<Box<dyn OperationInterface>>, ) -> Result<()> { - // TODO(phase-c-promise): Loop::wait-driven batch orchestration (download/cleanup promises); - // rewrite to await collected futures once the async Loop boundary lands. - let mut promises: Vec<Box<dyn PromiseInterface>> = vec![]; - for (index, operation) in &operations { let op_type = operation.get_operation_type(); @@ -316,35 +324,35 @@ impl InstallationManager { } let installer = self.get_installer(package.get_type())?; - // TODO(phase-b): closure captures installer + package; needs Arc/Rc for shared state + // TODO(phase-b): closure captures installer + package; needs Rc-shared installer/package let _ = installer; let op_type_clone = op_type.clone(); - let cleanup: Box<dyn Fn() -> Option<Box<dyn PromiseInterface>>> = - Box::new(move || -> Option<Box<dyn PromiseInterface>> { - // avoid calling cleanup if the download was not even initialized for a package - // as without installation source configured nothing will work - // TODO(phase-b): if (null === $package->getInstallationSource()) return \React\Promise\resolve(null); - let _ = &op_type_clone; - Some(promise::resolve(None)) - }); + let cleanup: Box< + dyn Fn() + -> Option<std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>>>>>, + > = Box::new(move || { + // avoid calling cleanup if the download was not even initialized for a package + // as without installation source configured nothing will work + // TODO(phase-b): if (null === $package->getInstallationSource()) return resolve(null); + let _ = &op_type_clone; + // TODO(phase-c-promise): build the real installer.cleanup() future once the installer + // can be shared into a 'static cleanup closure (Stage 2 Rc/Arc). + let fut: std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>>>> = + Box::pin(async { Ok(()) }); + Some(fut) + }); cleanup_promises.insert(*index, cleanup); if op_type != "uninstall" { + // TODO(phase-c-promise): PHP collects every download and runs them concurrently via + // Loop::wait; the single-threaded loop awaits each serially instead. let installer = self.get_installer(package.get_type())?; - let promise = installer.download(package, initial_package); - if let Ok(Some(p)) = promise { - promises.push(p); - } + installer.download(package, initial_package).await?; } } - // execute all downloads first - if (promises.len() as i64) > 0 { - self.wait_on_promises(promises); - } - if download_only { - self.run_cleanup(cleanup_promises); + self.run_cleanup(cleanup_promises).await; return Ok(()); } @@ -385,7 +393,8 @@ impl InstallationManager { dev_mode, run_scripts, &all_operations, - )?; + ) + .await?; } Ok(()) @@ -394,18 +403,21 @@ impl InstallationManager { /// @param OperationInterface[] $operations List of operations to execute in this batch /// @param OperationInterface[] $allOperations Complete list of operations to be executed in the install job, used for event listeners /// @phpstan-param array<callable(): ?PromiseInterface<void|null>> $cleanupPromises - fn execute_batch( + async fn execute_batch( &mut self, repo: &mut dyn InstalledRepositoryInterface, operations: IndexMap<i64, Box<dyn OperationInterface>>, - cleanup_promises: &IndexMap<i64, Box<dyn Fn() -> Option<Box<dyn PromiseInterface>>>>, + cleanup_promises: &IndexMap< + i64, + Box< + dyn Fn() + -> Option<std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>>>>>, + >, + >, dev_mode: bool, run_scripts: bool, all_operations: &[Box<dyn OperationInterface>], ) -> Result<()> { - // TODO(phase-c-promise): Loop::wait-driven batch orchestration (prepare/install promises); - // rewrite to await collected futures once the async Loop boundary lands. - let mut promises: Vec<Box<dyn PromiseInterface>> = vec![]; let mut post_exec_callbacks: Vec<Box<dyn Fn()>> = vec![]; for (index, operation) in operations { @@ -473,14 +485,13 @@ impl InstallationManager { let _io = self.io.as_ref(); let installer = self.get_installer(package.get_type())?; - let promise = installer.prepare(&op_type, package, initial_package); - let promise = match promise { - Ok(Some(p)) => p, - Ok(None) => promise::resolve(None), - Err(e) => return Err(e), - }; + // TODO(phase-c-promise): PHP chains prepare()->then(install/update/uninstall)->then(cleanup + // + repo.write); the single-threaded loop awaits prepare and leaves the rest as phase-b work. + installer + .prepare(&op_type, package, initial_package) + .await?; - // TODO(phase-b): chain `.then(cb1).then(cb2)` with cleanup_promises[index], repo.write, etc. + // TODO(phase-b): chain the install/update/uninstall step with cleanup_promises[index], repo.write, etc. let _ = cleanup_promises.get(&index); let event_name_post = match op_type.as_str() { @@ -497,13 +508,6 @@ impl InstallationManager { // dispatcher.dispatch_package_event(event_name_post, dev_mode, repo, all_operations, operation); })); } - - promises.push(promise); - } - - // execute all prepare => installs/updates/removes => cleanup steps - if (promises.len() as i64) > 0 { - self.wait_on_promises(promises); } Platform::workaround_filesystem_issues(); @@ -515,32 +519,6 @@ impl InstallationManager { Ok(()) } - /// @param array<PromiseInterface<void|null>> $promises - fn wait_on_promises(&mut self, promises: Vec<Box<dyn PromiseInterface>>) { - // TODO(phase-c-promise): thin wrapper over Loop::wait (the async boundary owned separately). - let mut progress: Option<()> = None; - // TODO(phase-b): self.io instanceof ConsoleIO downcast - let io_is_console = false; - if self.output_progress - && io_is_console - && Platform::get_env("CI").is_none() - && !self.io.is_debug() - && (promises.len() as i64) > 1 - { - // TODO(phase-b): progress = self.io.get_progress_bar(); - progress = Some(()); - } - // TODO(phase-b): pass actual ProgressBar when self.io.get_progress_bar() is implemented - let _ = self.loop_.borrow_mut().wait(promises, None); - if progress.is_some() { - // progress.clear(); - // ProgressBar in non-decorated output does not output a final line-break and clear() does nothing - if !self.io.is_decorated() { - self.io.write_error3("", true, io_interface::NORMAL); - } - } - } - /// Executes download operation. /// /// @phpstan-return PromiseInterface<void|null>|null @@ -653,10 +631,8 @@ impl InstallationManager { } pub fn notify_installs(&mut self, _io: &dyn IOInterface) { - // TODO(phase-c-promise): collects http_downloader.add() promises and drives them via Loop::wait; - // rewrite to await the collected futures once the async Loop boundary lands. - let mut promises: Vec<Box<dyn PromiseInterface>> = vec![]; - + // TODO(phase-c-promise): PHP collects every http_downloader.add() promise and runs them via + // Loop::wait; the single-threaded sync bridge block_on's each notification serially instead. let result: Result<()> = (|| -> Result<()> { for (repo_url, packages) in &self.notifiable_packages { // non-batch API, deprecated @@ -699,13 +675,13 @@ impl InstallationManager { ), ); - promises.push( + tokio::runtime::Runtime::new().unwrap().block_on( self.loop_ .borrow() .get_http_downloader() .borrow_mut() - .add(&url, opts)?, - ); + .add(&url, opts), + )?; } continue; @@ -771,17 +747,15 @@ impl InstallationManager { PhpMixed::Array(http.into_iter().map(|(k, v)| (k, Box::new(v))).collect()), ); - promises.push( + tokio::runtime::Runtime::new().unwrap().block_on( self.loop_ .borrow() .get_http_downloader() .borrow_mut() - .add(repo_url, opts)?, - ); + .add(repo_url, opts), + )?; } - let _ = self.loop_.borrow_mut().wait(promises, None); - Ok(()) })(); // PHP swallows the exception silently here @@ -800,28 +774,33 @@ impl InstallationManager { } /// @phpstan-param array<callable(): ?PromiseInterface<void|null>> $cleanupPromises - fn run_cleanup( + async fn run_cleanup( &mut self, - cleanup_promises: &IndexMap<i64, Box<dyn Fn() -> Option<Box<dyn PromiseInterface>>>>, + cleanup_promises: &IndexMap< + i64, + Box< + dyn Fn() + -> Option<std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>>>>>, + >, + >, ) { - // TODO(phase-c-promise): runs cleanup closures and drives them via Loop::wait; - // rewrite once the async Loop boundary and async cleanup closures land. - let mut promises: Vec<Box<dyn PromiseInterface>> = vec![]; + let mut promises: Vec<std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>>>>> = + vec![]; self.loop_.borrow().abort_jobs(); for (_, cleanup) in cleanup_promises { - // TODO(phase-b): React\Promise\Promise constructor with executor; emulate by wrapping cleanup() + // PHP wraps a missing cleanup promise in \React\Promise\resolve(null). let promise = cleanup(); if let Some(p) = promise { promises.push(p); } else { - promises.push(promise::resolve(None)); + promises.push(Box::pin(async { Ok(()) })); } } if (promises.len() as i64) > 0 { - let _ = self.loop_.borrow_mut().wait(promises, None); + let _ = self.loop_.borrow_mut().wait(promises, None).await; } } } diff --git a/crates/shirabe/src/installer/installer_interface.rs b/crates/shirabe/src/installer/installer_interface.rs index 29171cf..bb510b8 100644 --- a/crates/shirabe/src/installer/installer_interface.rs +++ b/crates/shirabe/src/installer/installer_interface.rs @@ -4,6 +4,7 @@ use crate::package::PackageInterface; use crate::repository::InstalledRepositoryInterface; use shirabe_php_shim::PhpMixed; +#[async_trait::async_trait(?Send)] pub trait InstallerInterface: std::fmt::Debug { fn supports(&self, package_type: &str) -> bool; diff --git a/crates/shirabe/src/installer/library_installer.rs b/crates/shirabe/src/installer/library_installer.rs index 0629de5..29cb455 100644 --- a/crates/shirabe/src/installer/library_installer.rs +++ b/crates/shirabe/src/installer/library_installer.rs @@ -5,7 +5,8 @@ use std::any::Any; use anyhow::Result; use shirabe_external_packages::composer::pcre::Preg; use shirabe_php_shim::{ - InvalidArgumentException, LogicException, is_link, preg_quote, realpath, rmdir, rtrim, strpos, + InvalidArgumentException, LogicException, PhpMixed, dirname, is_dir, is_link, preg_quote, + realpath, rmdir, rtrim, strpos, }; use crate::composer::PartialComposerWeakHandle; @@ -215,6 +216,7 @@ impl LibraryInstaller { } } +#[async_trait::async_trait(?Send)] impl InstallerInterface for LibraryInstaller { fn supports(&self, package_type: &str) -> bool { match &self.r#type { @@ -314,20 +316,16 @@ impl InstallerInterface for LibraryInstaller { self.binary_installer.remove_binaries(package); } - // TODO(phase-c-promise): rewrite install_code().then(installBinaries + repo.addPackage) as an await sequence. - let promise = self.install_code(package)?; - let promise = match promise { - Some(p) => p, - None => shirabe_external_packages::react::promise::resolve(None), - }; + let _ = self.install_code(package).await?; - // TODO(phase-b): promise.then expects Option<Box<dyn FnOnce(Option<PhpMixed>) -> Option<PhpMixed>>> - // arguments. The original PHP closure captures &mut self/binary_installer/repo/package; - // restructuring required. - let _ = promise; - Ok(Some(todo!( - "promise.then(...) chain to install_binaries + repo.add_package" - ))) + let install_path = self.get_install_path(package).unwrap(); + self.binary_installer + .install_binaries(package, &install_path, true); + if !repo.has_package(package) { + repo.add_package(package.clone_package_box()); + } + + Ok(None) } async fn update( @@ -348,20 +346,17 @@ impl InstallerInterface for LibraryInstaller { // self.initialize_vendor_dir(); self.binary_installer.remove_binaries(initial); - // TODO(phase-c-promise): rewrite update_code().then(installBinaries + repo updates) as an await sequence. - let promise = self.update_code(initial, target)?; - let promise = match promise { - Some(p) => p, - None => shirabe_external_packages::react::promise::resolve(None), - }; + let _ = self.update_code(initial, target).await?; + + let install_path = self.get_install_path(target).unwrap(); + self.binary_installer + .install_binaries(target, &install_path, true); + repo.remove_package(initial); + if !repo.has_package(target) { + repo.add_package(target.clone_package_box()); + } - // TODO(phase-b): promise.then expects Option<Box<dyn FnOnce(Option<PhpMixed>) -> Option<PhpMixed>>> - // arguments. Closure captures &mut self/binary_installer/repo/initial/target; - // restructuring required. - let _ = promise; - Ok(Some(todo!( - "promise.then(...) chain to install_binaries + repo updates" - ))) + Ok(None) } async fn uninstall( @@ -377,20 +372,25 @@ impl InstallerInterface for LibraryInstaller { .into()); } - // TODO(phase-c-promise): rewrite remove_code().then(remove_binaries/remove_package/rmdir) as an await sequence. - let promise = self.remove_code(package)?; - let promise = match promise { - Some(p) => p, - None => shirabe_external_packages::react::promise::resolve(None), - }; + let _ = self.remove_code(package).await?; + + let download_path = self.get_package_base_path(package); + self.binary_installer.remove_binaries(package); + repo.remove_package(package); + + if strpos(package.get_name(), "/").map_or(false, |pos| pos != 0) { + let package_vendor_dir = dirname(&download_path); + if is_dir(&package_vendor_dir) + && self.filesystem.borrow().is_dir_empty(&package_vendor_dir) + { + let _ = Silencer::call(|| { + rmdir(&package_vendor_dir); + Ok(()) + }); + } + } - // TODO(phase-b): promise.then expects Option<Box<dyn FnOnce(Option<PhpMixed>) -> Option<PhpMixed>>> - // arguments. Closure captures binary_installer/filesystem/download_path/package/repo; - // restructuring required. - let _ = promise; - Ok(Some(todo!( - "promise.then(...) chain to remove_binaries/remove_package/rmdir" - ))) + Ok(None) } fn get_install_path(&self, package: &dyn PackageInterface) -> Option<String> { diff --git a/crates/shirabe/src/installer/metapackage_installer.rs b/crates/shirabe/src/installer/metapackage_installer.rs index 23d92f2..2ea9685 100644 --- a/crates/shirabe/src/installer/metapackage_installer.rs +++ b/crates/shirabe/src/installer/metapackage_installer.rs @@ -22,6 +22,7 @@ impl MetapackageInstaller { } } +#[async_trait::async_trait(?Send)] impl InstallerInterface for MetapackageInstaller { fn supports(&self, package_type: &str) -> bool { package_type == "metapackage" diff --git a/crates/shirabe/src/installer/noop_installer.rs b/crates/shirabe/src/installer/noop_installer.rs index 68e4ef8..8297165 100644 --- a/crates/shirabe/src/installer/noop_installer.rs +++ b/crates/shirabe/src/installer/noop_installer.rs @@ -8,6 +8,7 @@ use shirabe_php_shim::{InvalidArgumentException, PhpMixed}; #[derive(Debug)] pub struct NoopInstaller; +#[async_trait::async_trait(?Send)] impl InstallerInterface for NoopInstaller { fn supports(&self, _package_type: &str) -> bool { true diff --git a/crates/shirabe/src/installer/plugin_installer.rs b/crates/shirabe/src/installer/plugin_installer.rs index 4f9918e..a540677 100644 --- a/crates/shirabe/src/installer/plugin_installer.rs +++ b/crates/shirabe/src/installer/plugin_installer.rs @@ -41,7 +41,7 @@ impl PluginInstaller { self.get_plugin_manager().borrow_mut().disable_plugins(); } - fn rollback_install( + async fn rollback_install( &mut self, e: anyhow::Error, repo: &mut dyn InstalledRepositoryInterface, @@ -51,7 +51,7 @@ impl PluginInstaller { "Plugin initialization failed ({}), uninstalling plugin", e )); - self.inner.uninstall(repo, package)?; + self.inner.uninstall(repo, package).await?; Err(e) } @@ -61,6 +61,7 @@ impl PluginInstaller { } } +#[async_trait::async_trait(?Send)] impl InstallerInterface for PluginInstaller { fn supports(&self, package_type: &str) -> bool { package_type == "composer-plugin" || package_type == "composer-installer" diff --git a/crates/shirabe/src/installer/project_installer.rs b/crates/shirabe/src/installer/project_installer.rs index 034a07a..8960854 100644 --- a/crates/shirabe/src/installer/project_installer.rs +++ b/crates/shirabe/src/installer/project_installer.rs @@ -29,6 +29,7 @@ impl ProjectInstaller { } } +#[async_trait::async_trait(?Send)] impl InstallerInterface for ProjectInstaller { fn supports(&self, _package_type: &str) -> bool { true diff --git a/crates/shirabe/src/package/archiver/archive_manager.rs b/crates/shirabe/src/package/archiver/archive_manager.rs index 390efc4..e198a03 100644 --- a/crates/shirabe/src/package/archiver/archive_manager.rs +++ b/crates/shirabe/src/package/archiver/archive_manager.rs @@ -164,16 +164,26 @@ impl ArchiveManager { filesystem.ensure_directory_exists(&source_path)?; let download_result = (|| -> anyhow::Result<()> { - let promise = - self.download_manager - .borrow() - .download(package, &source_path, None)?; - SyncHelper::r#await(&self.r#loop, Some(promise))?; - let promise = self - .download_manager - .borrow() - .install(package, &source_path)?; - SyncHelper::r#await(&self.r#loop, Some(promise))?; + SyncHelper::r#await( + &self.r#loop, + Some(Box::pin(async { + self.download_manager + .borrow() + .download(package, &source_path, None) + .await + .map(|_| ()) + })), + )?; + SyncHelper::r#await( + &self.r#loop, + Some(Box::pin(async { + self.download_manager + .borrow() + .install(package, &source_path) + .await + .map(|_| ()) + })), + )?; Ok(()) })(); diff --git a/crates/shirabe/src/package/version/version_guesser.rs b/crates/shirabe/src/package/version/version_guesser.rs index 79f9d6d..47054a4 100644 --- a/crates/shirabe/src/package/version/version_guesser.rs +++ b/crates/shirabe/src/package/version/version_guesser.rs @@ -518,18 +518,14 @@ impl VersionGuesser { strnatcasecmp(b, a) }); - // TODO(phase-c-promise): execute_async is now async; the .then continuation captures and mutates shared - // state (last_index, length, version, pretty_version, promises) and cancels sibling promises, while the - // loop is driven by process.wait(). This concurrent job/cancellation machinery needs design before it can - // become an await-based form, so the promise body (including the inner todo!()) is left as-is. - let mut promises: Vec< - Box<dyn shirabe_external_packages::react::promise::PromiseInterface>, - > = vec![]; self.process.borrow_mut().set_max_jobs(30); - // TODO(phase-b): try/finally with resetMaxJobs + // PHP runs every candidate diff in parallel and cancels the siblings once a zero-length + // diff is found; the single-threaded sync bridge block_on's each diff serially and stops + // at the first zero-length match. let result: Result<()> = (|| -> Result<()> { let mut last_index: i64 = -1; for (index, candidate) in branches.iter().enumerate() { + let index = index as i64; let candidate_version = Preg::replace(r"{^remotes/\S+/}", "", candidate).unwrap_or_default(); @@ -550,22 +546,29 @@ impl VersionGuesser { }, &scm_cmdline, ); - let async_promise = self.process.borrow_mut().execute_async(&cmd_line, path)?; - // TODO(phase-b): closure receives Process in PHP but PromiseInterface::then expects fn(Option<PhpMixed>) -> Option<PhpMixed>; - // closure captures need shared mutable state (last_index, length, version, pretty_version, promises) - promises.push(async_promise.then( - Some(Box::new( - move |_value: Option<PhpMixed>| -> Option<PhpMixed> { - todo!( - "mutate last_index/length/version/pretty_version and possibly cancel promises" - ) - }, - )), - None, - )); + let process = tokio::runtime::Runtime::new() + .unwrap() + .block_on(self.process.borrow_mut().execute_async(&cmd_line, path))?; + if !process.is_successful() { + continue; + } + + let output = process.get_output(); + // overwrite existing if we have a shorter diff, or we have an equal diff and an index that comes later in the array (i.e. older version) + // as newer versions typically have more commits, if the feature branch is based on a newer branch it should have a longer diff to the old version + // but if it doesn't and they have equal diffs, then it probably is based on the old version + if strlen(&output) < length || (strlen(&output) == length && last_index < index) + { + last_index = index; + length = strlen(&output); + version = Some(self.version_parser.normalize_branch(&candidate_version)?); + pretty_version = Some(format!("dev-{}", candidate_version)); + if length == 0 { + break; + } + } } - self.process.borrow_mut().wait(); Ok(()) })(); self.process.borrow_mut().reset_max_jobs(); diff --git a/crates/shirabe/src/repository/composer_repository.rs b/crates/shirabe/src/repository/composer_repository.rs index 3c5eb31..4bd5442 100644 --- a/crates/shirabe/src/repository/composer_repository.rs +++ b/crates/shirabe/src/repository/composer_repository.rs @@ -1103,10 +1103,6 @@ impl ComposerRepository { .map_or(false, |c| c.metadata) && (allow_partial_advisories || api_url.is_none()) { - // TODO(phase-c-promise): collects start_cached_async_download().then_boxed() promises and - // drives them via Loop::wait; rewrite to await the collected futures once the async Loop - // boundary lands. - let mut promises: Vec<Box<dyn PromiseInterface>> = Vec::new(); let names: Vec<String> = package_constraint_map.keys().cloned().collect(); for name in names { let name = strtolower(&name); @@ -1116,89 +1112,46 @@ impl ComposerRepository { continue; } - // TODO(phase-b): then_boxed expects closure returning Box<dyn PromiseInterface>, - // not anyhow::Result<()>; needs structural reshape of closure type - let promise = self - .start_cached_async_download(&name, Some(&name))? - .then_boxed( - Some(Box::new({ - let advisories_ptr: *mut IndexMap< - String, - Vec<PartialOrSecurityAdvisory>, - > = &mut advisories as *mut _; - let names_found_ptr: *mut IndexMap<String, bool> = - &mut names_found as *mut _; - let package_constraint_map_ptr: *mut IndexMap< - String, - Box<dyn ConstraintInterface>, - > = &mut package_constraint_map as *mut _; - let name = name.clone(); - // TODO(phase-b): create closure captures local references (semver_parser, repo_name, - // allow_partial_advisories) but is consumed by a 'static Box; needs restructuring - move |spec: PhpMixed| -> Box<dyn PromiseInterface> { - let _result: anyhow::Result<()> = (|| -> anyhow::Result<()> { - // [$response] = $spec; - let response = spec - .as_list() - .and_then(|l| l.first()) - .map(|b| (**b).clone()) - .unwrap_or(PhpMixed::Null); - let response_arr = match response.as_array() { - Some(a) => a.clone(), - None => return Ok(()), - }; - let sec_advs = match response_arr.get("security-advisories") { - Some(v) => v.clone(), - None => return Ok(()), - }; - let sec_advs_arr = match sec_advs.as_array() { - Some(a) => a.clone(), - None => return Ok(()), - }; - unsafe { - (*names_found_ptr).insert(name.clone(), true); - } - if !sec_advs_arr.is_empty() { - let mut entries: Vec<PartialOrSecurityAdvisory> = - Vec::new(); - for (_k, data_mixed) in sec_advs_arr.iter() { - if let Some(data) = data_mixed.as_array() { - let data_map: IndexMap<String, PhpMixed> = data - .iter() - .map(|(k, v)| (k.clone(), (**v).clone())) - .collect(); - let _pcm: &IndexMap< - String, - Box<dyn ConstraintInterface>, - > = unsafe { &*package_constraint_map_ptr }; - let _ = &data_map; - // TODO(phase-b): call create() closure; it captures references - if let Some(adv) = None::<PartialOrSecurityAdvisory> - { - entries.push(adv); - } - } - } - unsafe { - (*advisories_ptr).insert(name.clone(), entries); - } - } - unsafe { - (*package_constraint_map_ptr).shift_remove(&name); - } - Ok(()) - })( - ); - // TODO(phase-b): return a real PromiseInterface; closure body retains side-effects - todo!("return real PromiseInterface") + let spec = tokio::runtime::Runtime::new() + .unwrap() + .block_on(self.start_cached_async_download(&name, Some(&name)))?; + + // [$response] = $spec; + let response = spec + .as_list() + .and_then(|l| l.first()) + .map(|b| (**b).clone()) + .unwrap_or(PhpMixed::Null); + let response_arr = match response.as_array() { + Some(a) => a.clone(), + None => continue, + }; + let sec_advs_arr = match response_arr + .get("security-advisories") + .and_then(|v| v.as_array()) + { + Some(a) => a.clone(), + None => continue, + }; + + names_found.insert(name.clone(), true); + if !sec_advs_arr.is_empty() { + let mut entries: Vec<PartialOrSecurityAdvisory> = Vec::new(); + for (_k, data_mixed) in sec_advs_arr.iter() { + if let Some(data) = data_mixed.as_array() { + let data_map: IndexMap<String, PhpMixed> = data + .iter() + .map(|(k, v)| (k.clone(), (**v).clone())) + .collect(); + if let Some(adv) = create(&data_map, &name, &package_constraint_map)? { + entries.push(adv); } - })), - None, - ); - promises.push(promise); + } + } + advisories.insert(name.clone(), entries); + } + package_constraint_map.shift_remove(&name); } - - self.r#loop.borrow_mut().wait(promises, None)?; } if let Some(api_url) = api_url { @@ -1851,10 +1804,6 @@ impl ComposerRepository { let mut packages: IndexMap<String, Box<dyn BasePackage>> = IndexMap::new(); let mut names_found: IndexMap<String, bool> = IndexMap::new(); - // TODO(phase-c-promise): collects start_cached_async_download().then_boxed() promises and - // drives them via Loop::wait; rewrite to await the collected futures once the async Loop - // boundary lands. - let mut promises: Vec<Box<dyn PromiseInterface>> = Vec::new(); if self.lazy_providers_url.is_none() { return Err(LogicException { @@ -1908,183 +1857,148 @@ impl ComposerRepository { continue; } - // TODO(phase-b): Box<dyn PackageInterface> is not Clone; share via Rc - let already_loaded_clone: IndexMap< - String, - IndexMap<String, Box<dyn PackageInterface>>, - > = todo!("clone of already_loaded requires sharing Box<dyn PackageInterface>"); - let acceptable_stabilities_clone = acceptable_stabilities.cloned(); - let stability_flags_clone = stability_flags.cloned(); let version_parser = self.version_parser.clone(); - // TODO(phase-b): then_boxed expects closure returning Box<dyn PromiseInterface>, - // not anyhow::Result<()>; needs structural reshape - let promise = self - .start_cached_async_download(&name, Some(&real_name))? - .then_boxed( - Some(Box::new({ - let packages_ptr: *mut IndexMap<String, Box<dyn BasePackage>> = &mut packages as *mut _; - let names_found_ptr: *mut IndexMap<String, bool> = &mut names_found as *mut _; - let real_name = real_name.clone(); - let constraint = constraint; - move |spec: PhpMixed| -> Box<dyn PromiseInterface> { - let _result: anyhow::Result<()> = (|| -> anyhow::Result<()> { - let spec_list = spec.as_list().cloned().unwrap_or_default(); - let response = spec_list - .first() - .map(|b| (**b).clone()) - .unwrap_or(PhpMixed::Null); - let packages_source_val = spec_list - .get(1) - .map(|b| (**b).clone()) - .unwrap_or(PhpMixed::Null); - let packages_source: Option<String> = - packages_source_val.as_string().map(|s| s.to_string()); - if response.is_null() { - return Ok(()); - } - let response_arr = match response.as_array() { - Some(a) => a.clone(), - None => return Ok(()), - }; - let inner_packages = response_arr.get("packages"); - let versions_mixed = match inner_packages - .and_then(|v| v.as_array()) - .and_then(|a| a.get(&real_name)) - .cloned() - { - Some(b) => *b, - None => return Ok(()), - }; + let spec = tokio::runtime::Runtime::new() + .unwrap() + .block_on(self.start_cached_async_download(&name, Some(&real_name)))?; - let mut versions: Vec<IndexMap<String, PhpMixed>> = - match &versions_mixed { - PhpMixed::List(l) => l - .iter() - .filter_map(|v| { - v.as_array().map(|a| { - a.iter() - .map(|(k, v)| (k.clone(), (**v).clone())) - .collect::<IndexMap<String, PhpMixed>>() - }) - }) - .collect(), - PhpMixed::Array(a) => a - .values() - .filter_map(|v| { - v.as_array().map(|a| { - a.iter() - .map(|(k, v)| (k.clone(), (**v).clone())) - .collect::<IndexMap<String, PhpMixed>>() - }) - }) - .collect(), - _ => return Ok(()), - }; + // [$response, $packagesSource] = $spec; + let spec_list = spec.as_list().cloned().unwrap_or_default(); + let response = spec_list + .first() + .map(|b| (**b).clone()) + .unwrap_or(PhpMixed::Null); + let packages_source_val = spec_list + .get(1) + .map(|b| (**b).clone()) + .unwrap_or(PhpMixed::Null); + let packages_source: Option<String> = + packages_source_val.as_string().map(|s| s.to_string()); + if response.is_null() { + continue; + } + let response_arr = match response.as_array() { + Some(a) => a.clone(), + None => continue, + }; + let inner_packages = response_arr.get("packages"); + let versions_mixed = match inner_packages + .and_then(|v| v.as_array()) + .and_then(|a| a.get(&real_name)) + .cloned() + { + Some(b) => *b, + None => continue, + }; - let minified = response_arr - .get("minified") - .and_then(|v| v.as_string()) - .map_or(false, |s| s == "composer/2.0"); - if minified { - // TODO(phase-b): MetadataMinifier::expand expects/returns IndexMap but versions is Vec - versions = todo!("MetadataMinifier::expand signature mismatch with Vec<IndexMap>"); - } + let mut versions: Vec<IndexMap<String, PhpMixed>> = match &versions_mixed { + PhpMixed::List(l) => l + .iter() + .filter_map(|v| { + v.as_array().map(|a| { + a.iter() + .map(|(k, v)| (k.clone(), (**v).clone())) + .collect::<IndexMap<String, PhpMixed>>() + }) + }) + .collect(), + PhpMixed::Array(a) => a + .values() + .filter_map(|v| { + v.as_array().map(|a| { + a.iter() + .map(|(k, v)| (k.clone(), (**v).clone())) + .collect::<IndexMap<String, PhpMixed>>() + }) + }) + .collect(), + _ => continue, + }; - unsafe { - (*names_found_ptr).insert(real_name.clone(), true); - } - let mut versions_to_load: Vec<IndexMap<String, PhpMixed>> = Vec::new(); - for version in versions.into_iter() { - let mut version = version; - let has_vn = version.contains_key("version_normalized"); - if !has_vn { - let v = version - .get("version") - .and_then(|v| v.as_string()) - .unwrap_or("") - .to_string(); - let normalized = version_parser.normalize(&v, None)?; - version.insert( - "version_normalized".to_string(), - PhpMixed::String(normalized), - ); - } else if version - .get("version_normalized") - .and_then(|v| v.as_string()) - .map_or(false, |s| s == VersionParser::DEFAULT_BRANCH_ALIAS) - { - // handling of existing repos which need to remain composer v1 compatible, in case the version_normalized contained VersionParser::DEFAULT_BRANCH_ALIAS, we renormalize it - let v = version - .get("version") - .and_then(|v| v.as_string()) - .unwrap_or("") - .to_string(); - let normalized = version_parser.normalize(&v, None)?; - version.insert( - "version_normalized".to_string(), - PhpMixed::String(normalized), - ); - } + let minified = response_arr + .get("minified") + .and_then(|v| v.as_string()) + .map_or(false, |s| s == "composer/2.0"); + if minified { + // TODO(phase-b): MetadataMinifier::expand expects/returns IndexMap but versions is Vec + versions = todo!("MetadataMinifier::expand signature mismatch with Vec<IndexMap>"); + } - let version_normalized = version - .get("version_normalized") - .and_then(|v| v.as_string()) - .unwrap_or("") - .to_string(); - // avoid loading packages which have already been loaded - if already_loaded_clone - .get(&real_name) - .map_or(false, |m| m.contains_key(&version_normalized)) - { - continue; - } + names_found.insert(real_name.clone(), true); + let mut versions_to_load: Vec<IndexMap<String, PhpMixed>> = Vec::new(); + for version in versions.into_iter() { + let mut version = version; + let has_vn = version.contains_key("version_normalized"); + if !has_vn { + let v = version + .get("version") + .and_then(|v| v.as_string()) + .unwrap_or("") + .to_string(); + let normalized = version_parser.normalize(&v, None)?; + version.insert( + "version_normalized".to_string(), + PhpMixed::String(normalized), + ); + } else if version + .get("version_normalized") + .and_then(|v| v.as_string()) + .map_or(false, |s| s == VersionParser::DEFAULT_BRANCH_ALIAS) + { + // handling of existing repos which need to remain composer v1 compatible, in case the version_normalized contained VersionParser::DEFAULT_BRANCH_ALIAS, we renormalize it + let v = version + .get("version") + .and_then(|v| v.as_string()) + .unwrap_or("") + .to_string(); + let normalized = version_parser.normalize(&v, None)?; + version.insert( + "version_normalized".to_string(), + PhpMixed::String(normalized), + ); + } - let acceptable = ComposerRepository::is_version_acceptable_static( - constraint.as_deref(), - &real_name, - &version, - acceptable_stabilities_clone.as_ref(), - stability_flags_clone.as_ref(), - )?; - if acceptable { - versions_to_load.push(version); - } - } + let version_normalized = version + .get("version_normalized") + .and_then(|v| v.as_string()) + .unwrap_or("") + .to_string(); + // avoid loading packages which have already been loaded + if already_loaded + .get(&real_name) + .map_or(false, |m| m.contains_key(&version_normalized)) + { + continue; + } - let loaded_packages: Vec<Box<dyn BasePackage>> = - ComposerRepository::create_packages_static( - versions_to_load, - packages_source, - )?; - for mut package in loaded_packages.into_iter() { - package.set_repository_self(); - let hash_c = spl_object_hash(&*package); - if let Some(alias) = package.as_alias_package_mut() { - let aliased_hash = spl_object_hash(alias.get_alias_of()); - if !unsafe { (*packages_ptr).contains_key(&aliased_hash) } { - alias.get_alias_of_mut().set_repository_self(); - let aliased_clone = dyn_clone_box(alias.get_alias_of()); - unsafe { - (*packages_ptr).insert(aliased_hash, aliased_clone); - } - } - } - unsafe { - (*packages_ptr).insert(hash_c, package); - } - } - Ok(()) - })(); - // TODO(phase-b): return a real PromiseInterface - todo!("return real PromiseInterface") - } - })), - None, - ); - promises.push(promise); - } + let acceptable = ComposerRepository::is_version_acceptable_static( + constraint.as_deref(), + &real_name, + &version, + acceptable_stabilities, + stability_flags, + )?; + if acceptable { + versions_to_load.push(version); + } + } - self.r#loop.borrow_mut().wait(promises, None)?; + let loaded_packages: Vec<Box<dyn BasePackage>> = + ComposerRepository::create_packages_static(versions_to_load, packages_source)?; + for mut package in loaded_packages.into_iter() { + package.set_repository_self(); + let hash_c = spl_object_hash(&*package); + if let Some(alias) = package.as_alias_package_mut() { + let aliased_hash = spl_object_hash(alias.get_alias_of()); + if !packages.contains_key(&aliased_hash) { + alias.get_alias_of_mut().set_repository_self(); + let aliased_clone = dyn_clone_box(alias.get_alias_of()); + packages.insert(aliased_hash, aliased_clone); + } + } + packages.insert(hash_c, package); + } + } Ok(LoadAsyncPackagesResult { names_found, @@ -3290,10 +3204,6 @@ impl ComposerRepository { return Ok(PhpMixed::Bool(true)); } - // TODO(phase-c-promise): the live fetch path builds accept/reject closures (with raw-pointer - // shared state) and returns http_downloader.add(...).then_with_reject_boxed(accept, reject). - // Rewrite as an async fetch + inline accept/reject handling once the HttpDownloader async - // boundary lands. let mut filename = filename.to_string(); let mut options = self.options.clone(); if let Some(dispatcher) = self.event_dispatcher.as_ref() { @@ -3354,140 +3264,116 @@ impl ComposerRepository { } } - let filename_for_closures = filename.clone(); - let cache_key_owned = cache_key.to_string(); - let url_owned = self.url.clone(); - let last_modified_time_owned = last_modified_time.map(|s| s.to_string()); - - let packages_not_found_ptr: *mut IndexMap<String, bool> = - &mut self.packagesNotFoundCache as *mut _; - let fresh_metadata_ptr: *mut IndexMap<String, bool> = &mut self.freshMetadataUrls as *mut _; - let degraded_ptr: *mut bool = &mut self.degraded_mode as *mut _; - let cache_ptr: *mut Cache = &mut self.cache as *mut _; - let io_ptr = self.io.as_ref() as *const dyn IOInterface; - - let accept = { - let filename = filename_for_closures.clone(); - let cache_key_owned = cache_key_owned.clone(); - let url_owned = url_owned.clone(); - move |response_mixed: PhpMixed| -> anyhow::Result<PhpMixed> { - // emulate: $response is a Response object; status code/body/header accessed via methods - let mut response = Response::from_php_mixed(response_mixed); - // package not found is acceptable for a v2 protocol repository - if response.get_status_code() == 404 { - unsafe { - (*packages_not_found_ptr).insert(filename.clone(), true); - } + let response_result = self + .http_downloader + .borrow_mut() + .add(&filename, options) + .await; + match response_result { + Ok(response) => self.async_fetch_file_accept(response, &filename, cache_key), + Err(e) => self.async_fetch_file_reject(e, &filename, cache_key, last_modified_time), + } + } - let mut empty: IndexMap<String, PhpMixed> = IndexMap::new(); - empty.insert("packages".to_string(), PhpMixed::Array(IndexMap::new())); - return Ok(PhpMixed::Array( - empty.into_iter().map(|(k, v)| (k, Box::new(v))).collect(), - )); - } + /// The onFulfilled handler of `asyncFetchFile`: turns the HTTP response into decoded metadata, + /// caches it, and records the URL as fresh. + fn async_fetch_file_accept( + &mut self, + mut response: Response, + filename: &str, + cache_key: &str, + ) -> anyhow::Result<PhpMixed> { + // package not found is acceptable for a v2 protocol repository + if response.get_status_code() == 404 { + self.packagesNotFoundCache + .insert(filename.to_string(), true); - let mut json = response.get_body().unwrap_or("").to_string(); - if json.is_empty() && response.get_status_code() == 304 { - unsafe { - (*fresh_metadata_ptr).insert(filename.clone(), true); - } + let mut empty: IndexMap<String, PhpMixed> = IndexMap::new(); + empty.insert("packages".to_string(), PhpMixed::Array(IndexMap::new())); + return Ok(PhpMixed::Array( + empty.into_iter().map(|(k, v)| (k, Box::new(v))).collect(), + )); + } - return Ok(PhpMixed::Bool(true)); - } + let mut json = response.get_body().unwrap_or("").to_string(); + if json.is_empty() && response.get_status_code() == 304 { + self.freshMetadataUrls.insert(filename.to_string(), true); - // TODO(plugin): dispatch PostFileDownloadEvent + return Ok(PhpMixed::Bool(true)); + } - let decoded = response.decode_json()?; - let mut data: IndexMap<String, PhpMixed> = decoded - .as_array() - .map(|a| a.iter().map(|(k, v)| (k.clone(), (**v).clone())).collect()) - .unwrap_or_default(); - let io_ref = unsafe { &*io_ptr }; - HttpDownloader::output_warnings(io_ref, &url_owned, &data); + // TODO(plugin): dispatch PostFileDownloadEvent - let last_modified_date = response.get_header("last-modified"); - response.collect(); - if let Some(lmd) = last_modified_date { - data.insert("last-modified".to_string(), PhpMixed::String(lmd)); - let as_mixed = PhpMixed::Array( - data.iter() - .map(|(k, v)| (k.clone(), Box::new(v.clone()))) - .collect(), - ); - json = JsonFile::encode( - &as_mixed, - JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE, - ); - } - let is_ro = unsafe { (*cache_ptr).is_read_only() }; - if !is_ro { - unsafe { - (*cache_ptr).write(&cache_key_owned, &json); - } - } - unsafe { - (*fresh_metadata_ptr).insert(filename.clone(), true); - } + let decoded = response.decode_json()?; + let mut data: IndexMap<String, PhpMixed> = decoded + .as_array() + .map(|a| a.iter().map(|(k, v)| (k.clone(), (**v).clone())).collect()) + .unwrap_or_default(); + HttpDownloader::output_warnings(self.io.as_ref(), &self.url, &data); - Ok(PhpMixed::Array( - data.into_iter().map(|(k, v)| (k, Box::new(v))).collect(), - )) - } - }; + let last_modified_date = response.get_header("last-modified"); + response.collect(); + if let Some(lmd) = last_modified_date { + data.insert("last-modified".to_string(), PhpMixed::String(lmd)); + let as_mixed = PhpMixed::Array( + data.iter() + .map(|(k, v)| (k.clone(), Box::new(v.clone()))) + .collect(), + ); + json = JsonFile::encode(&as_mixed, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE); + } + if !self.cache.is_read_only() { + self.cache.write(cache_key, &json); + } + self.freshMetadataUrls.insert(filename.to_string(), true); - let reject = { - let filename = filename_for_closures.clone(); - let url_owned = url_owned.clone(); - let last_modified_time = last_modified_time_owned.clone(); - let accept_clone = accept.clone(); - move |e: anyhow::Error| -> anyhow::Result<PhpMixed> { - if let Some(te) = e.downcast_ref::<TransportException>() { - if te.get_status_code() == Some(404) { - unsafe { - (*packages_not_found_ptr).insert(filename.clone(), true); - } + Ok(PhpMixed::Array( + data.into_iter().map(|(k, v)| (k, Box::new(v))).collect(), + )) + } - return Ok(PhpMixed::Bool(false)); - } - } + /// The onRejected handler of `asyncFetchFile`: marks the package as not found / the repo as + /// degraded, and fakes a 304/404 response from cache where appropriate. + fn async_fetch_file_reject( + &mut self, + e: anyhow::Error, + filename: &str, + cache_key: &str, + last_modified_time: Option<&str>, + ) -> anyhow::Result<PhpMixed> { + if let Some(te) = e.downcast_ref::<TransportException>() { + if te.get_status_code() == Some(404) { + self.packagesNotFoundCache + .insert(filename.to_string(), true); - let is_degraded = unsafe { *degraded_ptr }; - if !is_degraded { - let io_ref = unsafe { &*io_ptr }; - io_ref.write_error(&format!( - "<warning>{} could not be fully loaded ({}), package information was loaded from the local cache and may be out of date</warning>", - url_owned, - e.to_string() - )); - } - unsafe { - *degraded_ptr = true; - } + return Ok(PhpMixed::Bool(false)); + } + } - // if the file is in the cache, we fake a 304 Not Modified to allow the process to continue - if last_modified_time.is_some() { - let resp = Response::new_fake(&url_owned, 304, IndexMap::new(), String::new()); - return accept_clone(resp.to_php_mixed()); - } + if !self.degraded_mode { + self.io.write_error(&format!( + "<warning>{} could not be fully loaded ({}), package information was loaded from the local cache and may be out of date</warning>", + self.url, + e + )); + } + self.degraded_mode = true; - // special error code returned when network is being artificially disabled - if let Some(te) = e.downcast_ref::<TransportException>() { - if te.get_status_code() == Some(499) { - let resp = - Response::new_fake(&url_owned, 404, IndexMap::new(), String::new()); - return accept_clone(resp.to_php_mixed()); - } - } + // if the file is in the cache, we fake a 304 Not Modified to allow the process to continue + if last_modified_time.is_some() { + let resp = Response::new_fake(&self.url, 304, IndexMap::new(), String::new()); + return self.async_fetch_file_accept(resp, filename, cache_key); + } - Err(e) + // special error code returned when network is being artificially disabled + if let Some(te) = e.downcast_ref::<TransportException>() { + if te.get_status_code() == Some(499) { + let resp = Response::new_fake(&self.url, 404, IndexMap::new(), String::new()); + return self.async_fetch_file_accept(resp, filename, cache_key); } - }; + } - let initial = self - .http_downloader - .borrow_mut() - .add(&filename, options.clone())?; - Ok(initial.then_with_reject_boxed(Box::new(accept), Box::new(reject))) + Err(e) } /// This initializes the packages key of a partial packages.json that contain some packages inlined + a providers-lazy-url @@ -3610,7 +3496,3 @@ fn dyn_clone_box(_pkg: &dyn BasePackage) -> Box<dyn BasePackage> { fn dyn_clone_constraint(_c: &dyn ConstraintInterface) -> Box<dyn ConstraintInterface> { todo!() } - -fn react_promise_resolve(_value: PhpMixed) -> Box<dyn PromiseInterface> { - todo!() -} diff --git a/crates/shirabe/src/util/filesystem.rs b/crates/shirabe/src/util/filesystem.rs index 1bf4673..73f21d6 100644 --- a/crates/shirabe/src/util/filesystem.rs +++ b/crates/shirabe/src/util/filesystem.rs @@ -149,43 +149,26 @@ impl Filesystem { vec!["rm".to_string(), "-rf".to_string(), directory.to_string()] }; - // TODO(phase-c-promise): execute_async is now async fn -> Result<Process>; the .then_boxed continuation that - // inspects the process result and flattens into a recursive removeDirectoryPhp fallback needs job-machine - // boundary design before it can become a flat await chain. - let promise = self.get_process().execute_async( - PhpMixed::List( - cmd.iter() - .map(|s| Box::new(PhpMixed::String(s.clone()))) - .collect(), - ), - (), - )?; + let process = self + .get_process() + .execute_async( + PhpMixed::List( + cmd.iter() + .map(|s| Box::new(PhpMixed::String(s.clone()))) + .collect(), + ), + (), + ) + .await?; - let directory_owned = directory.to_string(); - // TODO(plugin): closure capture of $this in PHP — port wires the same logic via a callback handle. - Ok(promise.then_boxed( - Some(Box::new( - move |process: PhpMixed| -> Box<dyn PromiseInterface> { - // clear stat cache because external processes aren't tracked by the php stat cache - clearstatcache2(false, ""); + // clear stat cache because external processes aren't tracked by the php stat cache + clearstatcache2(false, ""); - // TODO(phase-b): ArrayObject has no call_method; PHP-side calls $process->isSuccessful(). - let is_successful = matches!(process, PhpMixed::Bool(true)); - if is_successful && !is_dir(&directory_owned) { - return shirabe_external_packages::react::promise::resolve(Some( - PhpMixed::Bool(true), - )); - } + if process.is_successful() && !is_dir(directory) { + return Ok(true); + } - // PHP: \React\Promise\resolve($this->removeDirectoryPhp($directory)) - // The recursive PHP call doesn't have a clean async equivalent; we resort to a sync call. - let mut fs = Filesystem::new(None); - let res = fs.remove_directory_php(&directory_owned).unwrap_or(false); - shirabe_external_packages::react::promise::resolve(Some(PhpMixed::Bool(res))) - }, - )), - None, - )) + self.remove_directory_php(directory) } /// Returns null when no edge case was hit. Otherwise a bool whether removal was successful diff --git a/crates/shirabe/src/util/http/curl_downloader.rs b/crates/shirabe/src/util/http/curl_downloader.rs index bd6817c..c3a1227 100644 --- a/crates/shirabe/src/util/http/curl_downloader.rs +++ b/crates/shirabe/src/util/http/curl_downloader.rs @@ -36,7 +36,6 @@ use crate::util::Url; use crate::util::http::CurlResponse; use crate::util::http::ProxyManager; use crate::util::{AuthHelper, PromptAuthResult, StoreAuth}; -// use shirabe_external_packages::react::promise::Promise; // typehint only in PHP /// @phpstan-type Attributes array{retryAuthFailure: bool, redirects: int<0, max>, retries: int<0, max>, storeAuth: 'prompt'|bool, ipResolve: 4|6|null} /// @phpstan-type Job array{url: non-empty-string, origin: string, attributes: Attributes, options: mixed[], progress: mixed[], curlHandle: \CurlHandle, filename: string|null, headerHandle: resource, bodyHandle: resource, resolve: callable, reject: callable, primaryIp: string} diff --git a/crates/shirabe/src/util/http_downloader.rs b/crates/shirabe/src/util/http_downloader.rs index 6f769ce..b1cf921 100644 --- a/crates/shirabe/src/util/http_downloader.rs +++ b/crates/shirabe/src/util/http_downloader.rs @@ -16,9 +16,9 @@ use crate::composer; use crate::composer::ComposerHandle; use crate::config::Config; use crate::downloader::TransportException; -use crate::exception::IrrecoverableDownloadException; use crate::io::IOInterface; use crate::package::version::VersionParser; +use crate::util::GetResult; use crate::util::Platform; use crate::util::RemoteFilesystem; use crate::util::StreamContextFactory; @@ -60,11 +60,13 @@ struct Job { request: Request, sync: bool, origin: String, - resolve: Option<Box<dyn Fn(PhpMixed) + Send + Sync>>, - reject: Option<Box<dyn Fn(PhpMixed) + Send + Sync>>, curl_id: Option<i64>, response: Option<Response>, exception: Option<anyhow::Error>, + /// Completion slot written by the curl resolve/reject closures (driven by `curl.tick()`) + /// and read by `count_active_jobs`. Uses `Arc<Mutex>` because `CurlDownloader::download` + /// requires `Send + Sync` callbacks. + settled: std::sync::Arc<std::sync::Mutex<Option<anyhow::Result<Response>>>>, } impl std::fmt::Debug for Job { @@ -178,7 +180,7 @@ impl HttpDownloader { } .into()); } - let (job, promise) = self.add_job( + let job = self.add_job( Request { url: url.to_string(), options, @@ -186,13 +188,6 @@ impl HttpDownloader { }, true, )?; - promise.then_with( - None, - Some(Box::new(|_e: PhpMixed| { - // suppress error as it is rethrown to the caller by getResponse() a few lines below - PhpMixed::Null - })), - ); self.wait_id(Some(job.id))?; let response = self.get_response(job.id)?; @@ -213,7 +208,7 @@ impl HttpDownloader { } .into()); } - let (_, promise) = self.add_job( + let job = self.add_job( Request { url: url.to_string(), options, @@ -221,8 +216,9 @@ impl HttpDownloader { }, false, )?; + self.wait_id(Some(job.id))?; - Ok(promise) + self.get_response(job.id) } /// Copy a file synchronously @@ -239,7 +235,7 @@ impl HttpDownloader { } .into()); } - let (job, _) = self.add_job( + let job = self.add_job( Request { url: url.to_string(), options, @@ -266,7 +262,7 @@ impl HttpDownloader { } .into()); } - let (_, promise) = self.add_job( + let job = self.add_job( Request { url: url.to_string(), options, @@ -274,8 +270,9 @@ impl HttpDownloader { }, false, )?; + self.wait_id(Some(job.id))?; - Ok(promise) + self.get_response(job.id) } /// Retrieve the options set in the constructor @@ -289,27 +286,17 @@ impl HttpDownloader { } /// @phpstan-param Request $request - /// @return array{Job, PromiseInterface} - async fn add_job(&mut self, mut request: Request, sync: bool) -> Result<(JobHandle, Response)> { + /// + /// Queues a job and starts it if there is capacity. Mirrors PHP `addJob`: for non-curl (rfs) + /// jobs the work runs synchronously here (PHP runs it in the Promise resolver during + /// construction); for curl jobs the work is driven later by `start_job` / `count_active_jobs`. + fn add_job(&mut self, mut request: Request, sync: bool) -> Result<JobHandle> { request.options = array_replace_recursive(self.options.clone(), request.options); let id = self.id_gen; self.id_gen += 1; let origin = Url::get_origin(&*self.config.borrow(), &request.url); - let job = Job { - id, - status: Self::STATUS_QUEUED, - request: request.clone(), - sync, - origin: origin.clone(), - resolve: None, - reject: None, - curl_id: None, - response: None, - exception: None, - }; - if !sync && !self.allow_async { return Err(LogicException { message: @@ -346,34 +333,124 @@ impl HttpDownloader { ); } - // TODO(phase-b): build resolver/canceler closures bound to &mut self.jobs; needs Rc<RefCell> wiring - let _ = (&self.rfs, &self.curl); - - let resolver: Box<dyn Fn(Box<dyn Fn(PhpMixed)>, Box<dyn Fn(PhpMixed)>)> = - Box::new(|_resolve, _reject| { - // TODO(phase-b) - }); - let canceler: Box<dyn Fn()> = Box::new(|| { - // PHP canceler logic — TODO(phase-b) - let _ = IrrecoverableDownloadException(shirabe_php_shim::RuntimeException { - message: "Download canceled".to_string(), - code: 0, - }); - let _ = Url::sanitize(String::new()); - }); - let _ = (resolver, canceler); - - let promise = Promise::new(Box::new(|_resolve, _reject| {})); - // TODO(phase-b): wire promise.then() side-effects: mark job done & store response/exception - let promise: Box<dyn PromiseInterface> = Box::new(promise); - + let job = Job { + id, + status: Self::STATUS_QUEUED, + request: request.clone(), + sync, + origin, + curl_id: None, + response: None, + exception: None, + settled: std::sync::Arc::new(std::sync::Mutex::new(None)), + }; + let can_use_curl = self.can_use_curl(&job); self.jobs.insert(id, job); + // PHP runs the resolver synchronously while constructing the Promise. For non-curl jobs + // the resolver performs the blocking RemoteFilesystem download and resolves immediately. + if !can_use_curl { + self.run_rfs_job(id); + } + if self.running_jobs < self.max_jobs { self.start_job(id); } - Ok((JobHandle { id }, promise)) + Ok(JobHandle { id }) + } + + /// Mirrors the non-curl branch of PHP `addJob`'s Promise resolver plus the `.then` side + /// effects: performs the blocking RemoteFilesystem download and settles the job. + fn run_rfs_job(&mut self, id: i64) { + let (request, origin, url, options, copy_to) = { + let job = self.jobs.get(&id).unwrap(); + ( + job.request.clone(), + job.origin.clone(), + job.request.url.clone(), + job.request.options.clone(), + job.request.copy_to.clone(), + ) + }; + + if let Some(job) = self.jobs.get_mut(&id) { + job.status = Self::STATUS_STARTED; + } + + let result: anyhow::Result<Response> = { + let rfs = self.rfs.as_mut().unwrap(); + (|| -> anyhow::Result<Response> { + if let Some(copy_to) = copy_to.as_deref() { + rfs.copy(&origin, &url, copy_to, false, options.clone())?; + + let headers = rfs.get_last_headers().to_vec(); + let code = RemoteFilesystem::find_status_code(&headers); + let body = Some(format!("{}~", copy_to)); + match Response::new(Self::request_to_map(&request), code, headers, body)? { + Ok(r) => Ok(r), + Err(e) => Err(e.into()), + } + } else { + let body = match rfs.get_contents(&origin, &url, false, options.clone())? { + GetResult::Content(s) => Some(s), + _ => None, + }; + let headers = rfs.get_last_headers().to_vec(); + let code = RemoteFilesystem::find_status_code(&headers); + match Response::new(Self::request_to_map(&request), code, headers, body)? { + Ok(r) => Ok(r), + Err(e) => Err(e.into()), + } + } + })() + }; + + self.settle_job(id, result); + } + + /// PHP `new Response($job['request'], ...)` is fed the whole request array; reproduce it. + fn request_to_map(request: &Request) -> IndexMap<String, PhpMixed> { + let mut m: IndexMap<String, PhpMixed> = IndexMap::new(); + m.insert("url".to_string(), PhpMixed::String(request.url.clone())); + m.insert( + "options".to_string(), + PhpMixed::Array( + request + .options + .iter() + .map(|(k, v)| (k.clone(), Box::new(v.clone()))) + .collect(), + ), + ); + m.insert( + "copyTo".to_string(), + match &request.copy_to { + Some(s) => PhpMixed::String(s.clone()), + None => PhpMixed::Null, + }, + ); + m + } + + /// Applies the effect of PHP's promise `.then` handlers: records the response/exception, + /// transitions the job status and decrements the running-job counter. + fn settle_job(&mut self, id: i64, result: anyhow::Result<Response>) { + match result { + Ok(response) => { + if let Some(job) = self.jobs.get_mut(&id) { + job.status = Self::STATUS_COMPLETED; + job.response = Some(response); + } + } + Err(e) => { + if let Some(job) = self.jobs.get_mut(&id) { + job.status = Self::STATUS_FAILED; + job.exception = Some(e); + } + } + } + self.mark_job_done(); } fn start_job(&mut self, id: i64) { @@ -398,7 +475,6 @@ impl HttpDownloader { }; let url = request.url.clone(); let options = request.options.clone(); - let _ = origin; if self.disabled { let has_if_modified_since = { @@ -433,8 +509,13 @@ impl HttpDownloader { if has_if_modified_since { let mut req_map: IndexMap<String, PhpMixed> = IndexMap::new(); req_map.insert("url".to_string(), PhpMixed::String(url.clone())); - let _ = Response::new(req_map, Some(304), Vec::new(), Some(String::new())); - // job.resolve(response) — TODO(phase-b) + let response = + match Response::new(req_map, Some(304), Vec::new(), Some(String::new())) { + Ok(Ok(r)) => Ok(r), + Ok(Err(e)) => Err(e.into()), + Err(e) => Err(e), + }; + self.settle_job(id, response); } else { let mut e = TransportException::new( format!( @@ -444,15 +525,45 @@ impl HttpDownloader { 499, ); e.set_status_code(Some(499)); - // job.reject(e) — TODO(phase-b) - let _ = e; + self.settle_job(id, Err(e.into())); } return; } - let _ = copy_to; - // TODO(phase-b): try { curl->download(...) } catch (...) { reject(e) } + // curl branch: register the request with the curl multi handle. Completion is delivered + // asynchronously by curl.tick() into the job's `settled` slot (read by count_active_jobs). + // PHP catches any exception from download() and rejects the job. + let settled = self.jobs.get(&id).unwrap().settled.clone(); + let settled_for_reject = settled.clone(); + let resolve: Box<dyn Fn(PhpMixed) + Send + Sync> = Box::new(move |_response: PhpMixed| { + // TODO(phase-c-promise): curl.tick() delivers the response as PhpMixed here; convert it + // into a Response and store Ok(..) in `settled`. Bottoms at the todo!() curl I/O. + let _ = &settled; + }); + let reject: Box<dyn Fn(PhpMixed) + Send + Sync> = Box::new(move |_error: PhpMixed| { + // TODO(phase-c-promise): convert the PhpMixed error into anyhow::Error and store + // Err(..) in `settled`. Bottoms at the todo!() curl I/O. + let _ = &settled_for_reject; + }); + + let download_result = { + let curl = self.curl.as_mut().unwrap(); + curl.download(resolve, reject, &origin, &url, options, copy_to.as_deref()) + }; + match download_result { + Ok(Ok(curl_id)) => { + if let Some(job) = self.jobs.get_mut(&id) { + job.curl_id = Some(curl_id); + } + } + Ok(Err(e)) => { + self.settle_job(id, Err(e.into())); + } + Err(e) => { + self.settle_job(id, Err(e)); + } + } } fn mark_job_done(&mut self) { @@ -468,7 +579,7 @@ impl HttpDownloader { fn wait_id(&mut self, index: Option<i64>) -> Result<()> { loop { - let job_count = self.count_active_jobs(index); + let job_count = self.count_active_jobs(index)?; if job_count == 0 { break; } @@ -482,7 +593,7 @@ impl HttpDownloader { } /// @internal - pub fn count_active_jobs(&mut self, index: Option<i64>) -> i64 { + pub fn count_active_jobs(&mut self, index: Option<i64>) -> Result<i64> { if self.running_jobs < self.max_jobs { let queued_ids: Vec<i64> = self .jobs @@ -499,16 +610,32 @@ impl HttpDownloader { } if let Some(curl) = self.curl.as_mut() { - curl.tick(); + curl.tick()?; + } + + // Apply completions delivered by curl.tick() into each started job's `settled` slot. + // This reproduces the effect of PHP's resolve/reject callbacks firing during tick(). + let started_ids: Vec<i64> = self + .jobs + .values() + .filter(|j| j.status == Self::STATUS_STARTED) + .map(|j| j.id) + .collect(); + for id in started_ids { + let settled = self.jobs.get(&id).unwrap().settled.lock().unwrap().take(); + if let Some(result) = settled { + self.settle_job(id, result); + } } if let Some(index) = index { - return if self.jobs.get(&index).map(|j| j.status).unwrap_or(0) < Self::STATUS_COMPLETED - { - 1 - } else { - 0 - }; + return Ok( + if self.jobs.get(&index).map(|j| j.status).unwrap_or(0) < Self::STATUS_COMPLETED { + 1 + } else { + 0 + }, + ); } let mut active: i64 = 0; @@ -525,7 +652,7 @@ impl HttpDownloader { } } - active + Ok(active) } /// @param int $index Job id diff --git a/crates/shirabe/src/util/loop.rs b/crates/shirabe/src/util/loop.rs index 18f2359..5512211 100644 --- a/crates/shirabe/src/util/loop.rs +++ b/crates/shirabe/src/util/loop.rs @@ -3,25 +3,12 @@ use crate::util::HttpDownloader; use crate::util::ProcessExecutor; use anyhow::Result; -use indexmap::IndexMap; use shirabe_external_packages::symfony::component::console::helper::ProgressBar; -use shirabe_php_shim::microtime; +#[derive(Debug)] pub struct Loop { http_downloader: std::rc::Rc<std::cell::RefCell<HttpDownloader>>, process_executor: Option<std::rc::Rc<std::cell::RefCell<ProcessExecutor>>>, - current_promises: IndexMap<i64, Vec<Box<dyn PromiseInterface>>>, - wait_index: i64, -} - -impl std::fmt::Debug for Loop { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Loop") - .field("http_downloader", &self.http_downloader) - .field("process_executor", &self.process_executor) - .field("wait_index", &self.wait_index) - .finish() - } } impl Loop { @@ -39,8 +26,6 @@ impl Loop { Self { http_downloader, process_executor, - current_promises: IndexMap::new(), - wait_index: 0, } } @@ -54,65 +39,25 @@ impl Loop { self.process_executor.as_ref() } - pub fn wait( + pub async fn wait<'p>( &mut self, - promises: Vec<Box<dyn PromiseInterface>>, - mut progress: Option<&mut ProgressBar>, + promises: Vec<std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + 'p>>>, + _progress: Option<&mut ProgressBar>, ) -> Result<()> { - let uncaught: Option<anyhow::Error> = None; - - // TODO(phase-b): Promise::then captures uncaught by Fn; needs a Cell/RefCell wrapper - // and a thunk that matches FnOnce(Option<PhpMixed>) -> Option<PhpMixed>. - let _ = shirabe_external_packages::react::promise::all( - promises - .iter() - .map(|_| todo!("clone Box<dyn PromiseInterface>")) - .collect(), - ); - - // keep track of every group of promises that is waited on, so abortJobs can - // cancel them all, even if wait() was called within a wait() - let wait_index = self.wait_index; - self.wait_index += 1; - self.current_promises.insert(wait_index, promises); - - if let Some(ref mut progress) = progress { - let mut total_jobs: i64 = 0; - total_jobs += self.http_downloader.borrow_mut().count_active_jobs(None); - if let Some(ref pe) = self.process_executor { - total_jobs += pe.borrow_mut().count_active_jobs(None); - } - progress.start(Some(total_jobs)); - } - - let mut last_update: f64 = 0.0; - loop { - let mut active_jobs: i64 = 0; + let mut uncaught: Option<anyhow::Error> = None; - active_jobs += self.http_downloader.borrow_mut().count_active_jobs(None); - if let Some(ref pe) = self.process_executor { - active_jobs += pe.borrow_mut().count_active_jobs(None); - } - - if let Some(ref mut progress) = progress { - if microtime(true) - last_update > 0.1 { - last_update = microtime(true); - let new_progress = progress.get_max_steps() - active_jobs; - progress.set_progress(new_progress); + // TODO(phase-c-promise): the asynchronous worker classes (HttpDownloader / ProcessExecutor) + // run single-threaded for now, so the promises are consumed serially. Once the workers run + // on a multi-thread runtime these futures should be driven concurrently instead of in order. + // The PHP progress bar is tied to the worker active-job count and is also deferred until then. + for promise in promises { + if let Err(e) = promise.await { + if uncaught.is_none() { + uncaught = Some(e); } } - - if active_jobs == 0 { - break; - } } - // as we skip progress updates if they are too quick, make sure we do one last one here at 100% - if let Some(ref mut progress) = progress { - progress.finish(); - } - - self.current_promises.remove(&wait_index); if let Some(e) = uncaught { return Err(e); } @@ -121,11 +66,8 @@ impl Loop { } pub fn abort_jobs(&self) { - for promise_group in self.current_promises.values() { - for _promise in promise_group { - // TODO(phase-b): cancel requires CancellablePromiseInterface; PromiseInterface trait - // doesn't expose it. Drop the wrap+cancel until we have the right trait. - } - } + // TODO(phase-c-promise): no-op until a cancellation mechanism is introduced. PHP cancels + // every in-flight promise group it tracks in $currentPromises; reintroduce that tracking + // once the asynchronous workers support cancellation on a multi-thread runtime. } } diff --git a/crates/shirabe/src/util/process_executor.rs b/crates/shirabe/src/util/process_executor.rs index 1ee821c..74fc30c 100644 --- a/crates/shirabe/src/util/process_executor.rs +++ b/crates/shirabe/src/util/process_executor.rs @@ -53,8 +53,7 @@ struct Job { command: PhpMixed, cwd: Option<String>, process: Option<Process>, - resolve: Option<Box<dyn Fn(PhpMixed) + Send + Sync>>, - reject: Option<Box<dyn Fn(PhpMixed) + Send + Sync>>, + exception: Option<anyhow::Error>, } impl std::fmt::Debug for Job { @@ -396,32 +395,30 @@ impl ProcessExecutor { command, cwd: cwd_opt, process: None, - resolve: None, - reject: None, + exception: None, }; - // TODO(phase-b): build resolver/canceler closures bound to &mut self.jobs - let resolver: Box<dyn Fn(Option<PhpMixed>, Option<PhpMixed>)> = - Box::new(|_resolve, _reject| {}); - let canceler: Box<dyn Fn()> = Box::new(|| { - if defined("SIGINT") { - // job.process.signal(SIGINT) - } - // job.process.stop(1) - }); - let _ = (resolver, canceler); - - let promise = Promise::new(Box::new(|_resolve, _reject| {})); - // TODO(phase-b): wire promise.then() side-effects: mark job done & update status - let promise: Box<dyn PromiseInterface> = Box::new(promise); - self.jobs.insert(id, job); if self.running_jobs < self.max_jobs { self.start_job(id); } - Ok(promise) + // Drive the job to completion (serial pump). PHP resolves the promise with the Process + // once it stops running; here we await by pumping count_active_jobs and then hand back the + // Process (or the rejection captured during start_job). + self.wait_id(Some(id))?; + + let mut job = self.jobs.shift_remove(&id).unwrap(); + if let Some(process) = job.process.take() { + Ok(process) + } else if let Some(e) = job.exception.take() { + Err(e) + } else { + Err(anyhow::anyhow!( + "ProcessExecutor async job completed without a process" + )) + } } fn output_handler(&mut self, r#type: &str, buffer: &str) { @@ -496,8 +493,13 @@ impl ProcessExecutor { })(); let process = match process_result { Ok(p) => p, - Err(_e) => { - // job.reject(e) — TODO(phase-b) + Err(e) => { + // PHP: $job['reject']($e) — record the rejection and settle the job as failed. + if let Some(job) = self.jobs.get_mut(&id) { + job.status = Self::STATUS_FAILED; + job.exception = Some(e); + } + self.mark_job_done(); return; } }; @@ -544,7 +546,7 @@ impl ProcessExecutor { pub fn wait_id(&mut self, index: Option<i64>) -> Result<()> { loop { - if 0 == self.count_active_jobs(index) { + if 0 == self.count_active_jobs(index)? { return Ok(()); } @@ -558,7 +560,7 @@ impl ProcessExecutor { } /// @internal - pub fn count_active_jobs(&mut self, index: Option<i64>) -> i64 { + pub fn count_active_jobs(&mut self, index: Option<i64>) -> Result<i64> { // tick let ids: Vec<i64> = self.jobs.keys().copied().collect(); for id in &ids { @@ -575,18 +577,26 @@ impl ProcessExecutor { .map(|p| p.is_running()) .unwrap_or(false); if !is_running { - if let Some(job) = self.jobs.get(id) { - if let Some(resolve) = job.resolve.as_ref() { - let process_mixed = PhpMixed::Null; // TODO(phase-b): wrap Process as PhpMixed - resolve(process_mixed); - } + // PHP: call_user_func($job['resolve'], $job['process']) — the .then handler + // marks the job completed/failed based on the process exit status. + let successful = self + .jobs + .get(id) + .and_then(|j| j.process.as_ref()) + .map(|p| p.is_successful()) + .unwrap_or(false); + if let Some(job) = self.jobs.get_mut(id) { + job.status = if successful { + Self::STATUS_COMPLETED + } else { + Self::STATUS_FAILED + }; } + self.mark_job_done(); } - if let Some(job) = self.jobs.get_mut(id) { - if let Some(p) = job.process.as_mut() { - p.check_timeout(); - } + if let Some(p) = self.jobs.get(id).and_then(|j| j.process.as_ref()) { + p.check_timeout()?; } } } @@ -600,12 +610,13 @@ impl ProcessExecutor { } if let Some(index) = index { - return if self.jobs.get(&index).map(|j| j.status).unwrap_or(0) < Self::STATUS_COMPLETED - { - 1 - } else { - 0 - }; + return Ok( + if self.jobs.get(&index).map(|j| j.status).unwrap_or(0) < Self::STATUS_COMPLETED { + 1 + } else { + 0 + }, + ); } let mut active: i64 = 0; @@ -619,7 +630,7 @@ impl ProcessExecutor { } } - active + Ok(active) } fn mark_job_done(&mut self) { diff --git a/crates/shirabe/src/util/sync_helper.rs b/crates/shirabe/src/util/sync_helper.rs index 4c48c5c..7388b96 100644 --- a/crates/shirabe/src/util/sync_helper.rs +++ b/crates/shirabe/src/util/sync_helper.rs @@ -86,7 +86,6 @@ impl<'a> DownloaderOrManager<'a> { pub struct SyncHelper; impl SyncHelper { - // TODO(phase-c-promise): synchronous wrapper driving now-async downloader calls via Self::await (loop.wait); needs async/loop boundary design. pub fn download_and_install_package_sync( r#loop: &std::rc::Rc<std::cell::RefCell<Loop>>, downloader: DownloaderOrManager<'_>, @@ -100,21 +99,41 @@ impl SyncHelper { "install" }; - let result: Result<()> = (|| { + let result: Result<()> = (|| -> Result<()> { Self::r#await( r#loop, - Some(downloader.download(package, &path, prev_package)?), + Some(Box::pin(async { + downloader + .download(package, &path, prev_package) + .await + .map(|_| ()) + })), )?; Self::r#await( r#loop, - Some(downloader.prepare(r#type, package, &path, prev_package)?), + Some(Box::pin(async { + downloader + .prepare(r#type, package, &path, prev_package) + .await + .map(|_| ()) + })), )?; if r#type == "update" { if let Some(prev) = prev_package { - Self::r#await(r#loop, Some(downloader.update(package, prev, &path)?))?; + Self::r#await( + r#loop, + Some(Box::pin(async { + downloader.update(package, prev, &path).await.map(|_| ()) + })), + )?; } } else { - Self::r#await(r#loop, Some(downloader.install(package, &path)?))?; + Self::r#await( + r#loop, + Some(Box::pin(async { + downloader.install(package, &path).await.map(|_| ()) + })), + )?; } Ok(()) })(); @@ -122,25 +141,36 @@ impl SyncHelper { if result.is_err() { Self::r#await( r#loop, - Some(downloader.cleanup(r#type, package, &path, prev_package)?), + Some(Box::pin(async { + downloader + .cleanup(r#type, package, &path, prev_package) + .await + .map(|_| ()) + })), )?; return result; } Self::r#await( r#loop, - Some(downloader.cleanup(r#type, package, &path, prev_package)?), + Some(Box::pin(async { + downloader + .cleanup(r#type, package, &path, prev_package) + .await + .map(|_| ()) + })), )?; Ok(()) } - // TODO(phase-c-promise): loop-pump synchronous wait over a promise; driving mechanism needs design. pub fn r#await( r#loop: &std::rc::Rc<std::cell::RefCell<Loop>>, - promise: Option<Box<dyn PromiseInterface>>, + promise: Option<std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + '_>>>, ) -> Result<()> { if let Some(promise) = promise { - r#loop.borrow_mut().wait(vec![promise], None)?; + tokio::runtime::Runtime::new() + .unwrap() + .block_on(r#loop.borrow_mut().wait(vec![promise], None))?; } Ok(()) } |
