//! ref: composer/src/Composer/Util/HttpDownloader.php use anyhow::Result; use indexmap::IndexMap; use crate::util::Silencer; use shirabe_external_packages::composer::pcre::{CaptureKey, Preg}; use shirabe_external_packages::react::promise::Promise; use shirabe_external_packages::react::promise::PromiseInterface; use shirabe_php_shim::{ InvalidArgumentException, LogicException, PhpMixed, array_replace_recursive, chr, extension_loaded, file_get_contents, function_exists, implode, is_numeric, max, min, rawurldecode, stream_context_create, stripos, strpos, substr, ucfirst, }; use shirabe_semver::constraint::Constraint; use crate::composer::Composer; use crate::config::Config; use crate::downloader::TransportException; use crate::exception::IrrecoverableDownloadException; use crate::io::IOInterface; use crate::package::version::VersionParser; use crate::util::Platform; use crate::util::RemoteFilesystem; use crate::util::StreamContextFactory; use crate::util::Url; use crate::util::http::CurlDownloader; use crate::util::http::Response; /// @phpstan-type Request array{url: non-empty-string, options: mixed[], copyTo: string|null} /// @phpstan-type Job array{id: int, status: int, request: Request, sync: bool, origin: string, resolve?: callable, reject?: callable, curl_id?: int, response?: Response, exception?: \Throwable} #[derive(Debug)] pub struct HttpDownloader { /// @var IOInterface io: Box, /// @var Config config: std::rc::Rc>, /// @var array jobs: IndexMap, /// @var mixed[] options: IndexMap, /// @var int running_jobs: i64, /// @var int max_jobs: i64, /// @var ?CurlDownloader curl: Option, /// @var ?RemoteFilesystem rfs: Option, /// @var int id_gen: i64, /// @var bool disabled: bool, /// @var bool allow_async: bool, } struct Job { id: i64, status: i64, request: Request, sync: bool, origin: String, resolve: Option>, reject: Option>, curl_id: Option, response: Option, exception: Option, } impl std::fmt::Debug for Job { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Job") .field("id", &self.id) .field("status", &self.status) .field("request", &self.request) .field("sync", &self.sync) .field("origin", &self.origin) .field("curl_id", &self.curl_id) .field("response", &self.response) .field("exception", &self.exception) .finish() } } #[derive(Debug, Clone)] struct Request { url: String, options: IndexMap, copy_to: Option, } impl HttpDownloader { const STATUS_QUEUED: i64 = 1; const STATUS_STARTED: i64 = 2; const STATUS_COMPLETED: i64 = 3; const STATUS_FAILED: i64 = 4; const STATUS_ABORTED: i64 = 5; /// @param IOInterface $io The IO instance /// @param Config $config The config /// @param mixed[] $options The options pub fn new( io: Box, config: std::rc::Rc>, options: IndexMap, disable_tls: bool, ) -> Self { let disabled = Platform::get_env("COMPOSER_DISABLE_NETWORK") .map_or(false, |s| !s.is_empty() && s != "0"); // Setup TLS options // The cafile option can be set via config.json let mut self_options: IndexMap = IndexMap::new(); if disable_tls == false { self_options = StreamContextFactory::get_tls_defaults(&options, Some(&*io)).unwrap_or_default(); } // handle the other externally set options normally. self_options = array_replace_recursive(self_options, options.clone()); let curl = if Self::is_curl_enabled() { Some(CurlDownloader::new( io.clone_box(), std::rc::Rc::clone(&config), options.clone(), disable_tls, )) } else { None }; let rfs = Some(RemoteFilesystem::new( io.clone_box(), std::rc::Rc::clone(&config), options.clone(), disable_tls, None, )); let mut max_jobs: i64 = 12; let max_jobs_env = Platform::get_env("COMPOSER_MAX_PARALLEL_HTTP"); let max_jobs_env_mixed = match &max_jobs_env { Some(s) => PhpMixed::String(s.clone()), None => PhpMixed::Bool(false), }; if is_numeric(&max_jobs_env_mixed) { max_jobs = max( 1, min( 50, max_jobs_env.as_deref().unwrap_or("0").parse().unwrap_or(0), ), ); } Self { io, config, jobs: IndexMap::new(), options: self_options, running_jobs: 0, max_jobs, curl, rfs, id_gen: 0, disabled, allow_async: false, } } /// Download a file synchronously pub fn get(&mut self, url: &str, options: IndexMap) -> Result { if "" == url { return Err(InvalidArgumentException { message: "$url must not be an empty string".to_string(), code: 0, } .into()); } let (job, promise) = self.add_job( Request { url: url.to_string(), options, copy_to: None, }, true, )?; promise.then_with( None, Some(Box::new(|_e: PhpMixed| { // suppress error as it is rethrown to the caller by getResponse() a few lines below PhpMixed::Null })), ); self.wait_id(Some(job.id))?; let response = self.get_response(job.id)?; Ok(response) } /// Create an async download operation pub fn add( &mut self, url: &str, options: IndexMap, ) -> Result> { if "" == url { return Err(InvalidArgumentException { message: "$url must not be an empty string".to_string(), code: 0, } .into()); } let (_, promise) = self.add_job( Request { url: url.to_string(), options, copy_to: None, }, false, )?; Ok(promise) } /// Copy a file synchronously pub fn copy( &mut self, url: &str, to: &str, options: IndexMap, ) -> Result { if "" == url { return Err(InvalidArgumentException { message: "$url must not be an empty string".to_string(), code: 0, } .into()); } let (job, _) = self.add_job( Request { url: url.to_string(), options, copy_to: Some(to.to_string()), }, true, )?; self.wait_id(Some(job.id))?; self.get_response(job.id) } /// Create an async copy operation pub fn add_copy( &mut self, url: &str, to: &str, options: IndexMap, ) -> Result> { if "" == url { return Err(InvalidArgumentException { message: "$url must not be an empty string".to_string(), code: 0, } .into()); } let (_, promise) = self.add_job( Request { url: url.to_string(), options, copy_to: Some(to.to_string()), }, false, )?; Ok(promise) } /// Retrieve the options set in the constructor pub fn get_options(&self) -> &IndexMap { &self.options } /// Merges new options pub fn set_options(&mut self, options: IndexMap) { self.options = array_replace_recursive(self.options.clone(), options); } /// @phpstan-param Request $request /// @return array{Job, PromiseInterface} fn add_job( &mut self, mut request: Request, sync: bool, ) -> Result<(JobHandle, Box)> { request.options = array_replace_recursive(self.options.clone(), request.options); let id = self.id_gen; self.id_gen += 1; let origin = Url::get_origin(&*self.config.borrow(), &request.url); let job = Job { id, status: Self::STATUS_QUEUED, request: request.clone(), sync, origin: origin.clone(), resolve: None, reject: None, curl_id: None, response: None, exception: None, }; if !sync && !self.allow_async { return Err(LogicException { message: "You must use the HttpDownloader instance which is part of a Composer\\Loop instance to be able to run async http requests" .to_string(), code: 0, } .into()); } // capture username/password from URL if there is one let mut m: IndexMap = IndexMap::new(); if Preg::is_match_strict_groups3( r"{^https?://([^:/]+):([^@/]+)@([^/]+)}i", &request.url, Some(&mut m), ) .unwrap_or(false) { self.io.set_authentication( origin.clone(), rawurldecode( m.get(&CaptureKey::ByIndex(1)) .cloned() .unwrap_or_default() .as_str(), ), Some(rawurldecode( m.get(&CaptureKey::ByIndex(2)) .cloned() .unwrap_or_default() .as_str(), )), ); } // TODO(phase-b): build resolver/canceler closures bound to &mut self.jobs; needs Rc wiring let _ = (&self.rfs, &self.curl); let resolver: Box, Box)> = Box::new(|_resolve, _reject| { // TODO(phase-b) }); let canceler: Box = Box::new(|| { // PHP canceler logic — TODO(phase-b) let _ = IrrecoverableDownloadException(shirabe_php_shim::RuntimeException { message: "Download canceled".to_string(), code: 0, }); let _ = Url::sanitize(String::new()); }); let _ = (resolver, canceler); let promise = Promise::new(Box::new(|_resolve, _reject| {})); // TODO(phase-b): wire promise.then() side-effects: mark job done & store response/exception let promise: Box = Box::new(promise); self.jobs.insert(id, job); if self.running_jobs < self.max_jobs { self.start_job(id); } Ok((JobHandle { id }, promise)) } fn start_job(&mut self, id: i64) { let job_status = self.jobs.get(&id).map(|j| j.status); if job_status != Some(Self::STATUS_QUEUED) { return; } // start job if let Some(job) = self.jobs.get_mut(&id) { job.status = Self::STATUS_STARTED; } self.running_jobs += 1; let (request, origin, copy_to) = { let job = self.jobs.get(&id).unwrap(); ( job.request.clone(), job.origin.clone(), job.request.copy_to.clone(), ) }; let url = request.url.clone(); let options = request.options.clone(); let _ = origin; if self.disabled { let has_if_modified_since = { let http_header = options .get("http") .and_then(|v| match v { PhpMixed::Array(m) => m.get("header"), _ => None, }) .cloned(); if let Some(PhpMixed::List(list)) = http_header.as_deref() { let joined = implode( "", &list .iter() .map(|v| v.as_string().unwrap_or("").to_string()) .collect::>(), ); stripos(&joined, "if-modified-since").is_some() } else if let Some(PhpMixed::Array(m)) = http_header.as_deref() { let joined = implode( "", &m.values() .map(|v| v.as_string().unwrap_or("").to_string()) .collect::>(), ); stripos(&joined, "if-modified-since").is_some() } else { false } }; if has_if_modified_since { let mut req_map: IndexMap = IndexMap::new(); req_map.insert("url".to_string(), PhpMixed::String(url.clone())); let _ = Response::new(req_map, Some(304), Vec::new(), Some(String::new())); // job.resolve(response) — TODO(phase-b) } else { let mut e = TransportException::new( format!( "Network disabled, request canceled: {}", Url::sanitize(url.clone()) ), 499, ); e.set_status_code(Some(499)); // job.reject(e) — TODO(phase-b) let _ = e; } return; } let _ = copy_to; // TODO(phase-b): try { curl->download(...) } catch (...) { reject(e) } } fn mark_job_done(&mut self) { self.running_jobs -= 1; } /// Wait for current async download jobs to complete /// /// @param int|null $index For internal use only, the job id pub fn wait(&mut self) -> Result<()> { self.wait_id(None) } fn wait_id(&mut self, index: Option) -> Result<()> { loop { let job_count = self.count_active_jobs(index); if job_count == 0 { break; } } Ok(()) } /// @internal pub fn enable_async(&mut self) { self.allow_async = true; } /// @internal pub fn count_active_jobs(&mut self, index: Option) -> i64 { if self.running_jobs < self.max_jobs { let queued_ids: Vec = self .jobs .values() .filter(|j| j.status == Self::STATUS_QUEUED) .map(|j| j.id) .collect(); for id in queued_ids { if self.running_jobs >= self.max_jobs { break; } self.start_job(id); } } if let Some(curl) = self.curl.as_mut() { curl.tick(); } if let Some(index) = index { return if self.jobs.get(&index).map(|j| j.status).unwrap_or(0) < Self::STATUS_COMPLETED { 1 } else { 0 }; } let mut active: i64 = 0; let ids: Vec = self.jobs.keys().copied().collect(); for id in ids { let (status, sync) = { let j = self.jobs.get(&id).unwrap(); (j.status, j.sync) }; if status < Self::STATUS_COMPLETED { active += 1; } else if !sync { self.jobs.shift_remove(&id); } } active } /// @param int $index Job id fn get_response(&mut self, index: i64) -> Result { if !self.jobs.contains_key(&index) { return Err(LogicException { message: "Invalid request id".to_string(), code: 0, } .into()); } if self.jobs.get(&index).unwrap().status == Self::STATUS_FAILED { // PHP: assert(isset($this->jobs[$index]['exception'])) let mut job = self.jobs.shift_remove(&index).unwrap(); return Err(job.exception.take().unwrap()); } if self.jobs.get(&index).unwrap().response.is_none() { return Err(LogicException { message: "Response not available yet, call wait() first".to_string(), code: 0, } .into()); } let mut job = self.jobs.shift_remove(&index).unwrap(); let resp = job.response.take().unwrap(); Ok(resp) } /// @internal /// /// @param array{warning?: string, info?: string, warning-versions?: string, info-versions?: string, warnings?: array, infos?: array} $data pub fn output_warnings( io: &dyn IOInterface, url: &str, data: &IndexMap, ) -> Result<()> { let clean_message = |msg: &str| -> anyhow::Result { if !io.is_decorated() { return Preg::replace(&format!("{{{}{}}}u", chr(27), "\\[[;\\d]*m"), "", msg); } Ok(msg.to_string()) }; // legacy warning/info keys for r#type in ["warning", "info"].iter() { let entry = data.get(*r#type); if entry.is_none() || shirabe_php_shim::empty(entry.unwrap()) { continue; } let versions_key = format!("{}-versions", r#type); if let Some(versions_value) = data.get(&versions_key) { if !shirabe_php_shim::empty(versions_value) { // TODO(phase-b): VersionParser::new let version_parser: VersionParser = todo!("VersionParser::new()"); let constraint = version_parser .parse_constraints(versions_value.as_string().unwrap_or(""))?; let composer_constraint = Constraint::new( "==", &version_parser.normalize(&Composer::get_version(), None)?, ); if !constraint.matches(&composer_constraint) { continue; } } } io.write_error(&format!( "<{tp}>{capitalized} from {url}: {msg}", tp = r#type, capitalized = ucfirst(r#type), url = Url::sanitize(url.to_string()), msg = clean_message(entry.unwrap().as_string().unwrap_or(""))? )); } // modern Composer 2.2+ format with support for multiple warning/info messages for key in ["warnings", "infos"].iter() { let entry = data.get(*key); if entry.is_none() || shirabe_php_shim::empty(entry.unwrap()) { continue; } // TODO(phase-b): VersionParser::new let version_parser: VersionParser = todo!("VersionParser::new()"); if let Some(PhpMixed::List(list)) = entry { for spec in list { let r#type = substr(key, 0, Some(-1)); if let PhpMixed::Array(spec_map) = spec.as_ref() { let constraint = version_parser.parse_constraints( spec_map .get("versions") .and_then(|v| v.as_string()) .unwrap_or(""), )?; let composer_constraint = Constraint::new( "==", &version_parser.normalize(&Composer::get_version(), None)?, ); if !constraint.matches(&composer_constraint) { continue; } io.write_error(&format!( "<{tp}>{capitalized} from {url}: {msg}", tp = r#type, capitalized = ucfirst(&r#type), url = Url::sanitize(url.to_string()), msg = clean_message( spec_map .get("message") .and_then(|v| v.as_string()) .unwrap_or("") )? )); } } } } Ok(()) } /// @internal /// /// @return ?string[] pub fn get_exception_hints(e: &anyhow::Error) -> Option> { // TODO(phase-b): `$e instanceof TransportException` let e_as_transport: Option<&TransportException> = e.downcast_ref::(); if e_as_transport.is_none() { return None; } let e_as_transport = e_as_transport.unwrap(); if false != strpos(e_as_transport.get_message(), "Resolving timed out").is_some() || false != strpos(e_as_transport.get_message(), "Could not resolve host").is_some() { Silencer::suppress(None); let mut ctx_options: IndexMap = IndexMap::new(); let mut ssl_map: IndexMap> = IndexMap::new(); ssl_map.insert("verify_peer".to_string(), Box::new(PhpMixed::Bool(false))); ctx_options.insert("ssl".to_string(), PhpMixed::Array(ssl_map)); let mut http_map: IndexMap> = IndexMap::new(); http_map.insert( "follow_location".to_string(), Box::new(PhpMixed::Bool(false)), ); http_map.insert("ignore_errors".to_string(), Box::new(PhpMixed::Bool(true))); ctx_options.insert("http".to_string(), PhpMixed::Array(http_map)); // TODO(phase-b): file_get_contents only takes a path; stream context arg dropped. let _ = stream_context_create(&ctx_options, None); let test_connectivity = file_get_contents("https://8.8.8.8"); Silencer::restore(); if test_connectivity.is_some() { return Some(vec![ "The following exception probably indicates you have misconfigured DNS resolver(s)".to_string(), ]); } return Some(vec![ "The following exception probably indicates you are offline or have misconfigured DNS resolver(s)".to_string(), ]); } None } /// @param Job $job fn can_use_curl(&self, job: &Job) -> bool { if self.curl.is_none() { return false; } if !Preg::is_match(r"{^https?://}i", &job.request.url).unwrap_or(false) { return false; } let allow_self_signed = job.request.options.get("ssl").and_then(|v| match v { PhpMixed::Array(m) => m.get("allow_self_signed").cloned(), _ => None, }); if let Some(v) = allow_self_signed { if !shirabe_php_shim::empty(&v) { return false; } } true } /// @internal pub fn is_curl_enabled() -> bool { extension_loaded("curl") && function_exists("curl_multi_exec") && function_exists("curl_multi_init") } } #[derive(Debug, Clone, Copy)] struct JobHandle { id: i64, }