diff --git a/Library/Homebrew/cmd/fetch.rb b/Library/Homebrew/cmd/fetch.rb index a7b13180db..6c12d1c233 100644 --- a/Library/Homebrew/cmd/fetch.rb +++ b/Library/Homebrew/cmd/fetch.rb @@ -5,7 +5,6 @@ require "abstract_command" require "formula" require "fetch" require "cask/download" -require "retryable_download" require "download_queue" module Homebrew @@ -70,50 +69,6 @@ module Homebrew named_args [:formula, :cask], min: 1 end - sig { returns(Integer) } - def concurrency - @concurrency ||= T.let(args.concurrency&.to_i || 1, T.nilable(Integer)) - end - - sig { returns(DownloadQueue) } - def download_queue - @download_queue ||= T.let(begin - DownloadQueue.new(concurrency) - end, T.nilable(DownloadQueue)) - end - - class Spinner - FRAMES = [ - "⠋", - "⠙", - "⠚", - "⠞", - "⠖", - "⠦", - "⠴", - "⠲", - "⠳", - "⠓", - ].freeze - - sig { void } - def initialize - @start = T.let(Time.now, Time) - @i = T.let(0, Integer) - end - - sig { returns(String) } - def to_s - now = Time.now - if @start + 0.1 < now - @start = now - @i = (@i + 1) % FRAMES.count - end - - FRAMES.fetch(@i) - end - end - sig { override.void } def run Formulary.enable_factory_cache! @@ -173,9 +128,9 @@ module Homebrew end if (manifest_resource = bottle.github_packages_manifest_resource) - fetch_downloadable(manifest_resource) + download_queue.enqueue(manifest_resource) end - fetch_downloadable(bottle) + download_queue.enqueue(bottle) rescue Interrupt raise rescue => e @@ -192,15 +147,15 @@ module Homebrew next if fetched_bottle if (resource = formula.resource) - fetch_downloadable(resource) + download_queue.enqueue(resource) end formula.resources.each do |r| - fetch_downloadable(r) - r.patches.each { |patch| fetch_downloadable(patch.resource) if patch.external? } + download_queue.enqueue(r) + r.patches.each { |patch| download_queue.enqueue(patch.resource) if patch.external? } end - formula.patchlist.each { |patch| fetch_downloadable(patch.resource) if patch.external? } + formula.patchlist.each { |patch| download_queue.enqueue(patch.resource) if patch.external? } end end else @@ -220,134 +175,34 @@ module Homebrew quarantine = true if quarantine.nil? download = Cask::Download.new(cask, quarantine:) - fetch_downloadable(download) + download_queue.enqueue(download) end end end end - if concurrency == 1 - downloads.each do |downloadable, promise| - promise.wait! - rescue ChecksumMismatchError => e - opoo "#{downloadable.download_type.capitalize} reports different checksum: #{e.expected}" - Homebrew.failed = true if downloadable.is_a?(Resource::Patch) - end - else - spinner = Spinner.new - remaining_downloads = downloads.dup.to_a - previous_pending_line_count = 0 - - begin - $stdout.print Tty.hide_cursor - $stdout.flush - - output_message = lambda do |downloadable, future, last| - status = case future.state - when :fulfilled - "#{Tty.green}✔︎#{Tty.reset}" - when :rejected - "#{Tty.red}✘#{Tty.reset}" - when :pending, :processing - "#{Tty.blue}#{spinner}#{Tty.reset}" - else - raise future.state.to_s - end - - message = "#{downloadable.download_type.capitalize} #{downloadable.name}" - $stdout.print "#{status} #{message}#{"\n" unless last}" - $stdout.flush - - if future.rejected? - if (e = future.reason).is_a?(ChecksumMismatchError) - opoo "#{downloadable.download_type.capitalize} reports different checksum: #{e.expected}" - Homebrew.failed = true if downloadable.is_a?(Resource::Patch) - next 2 - else - message = future.reason.to_s - onoe message - Homebrew.failed = true - next message.count("\n") - end - end - - 1 - end - - until remaining_downloads.empty? - begin - finished_states = [:fulfilled, :rejected] - - finished_downloads, remaining_downloads = remaining_downloads.partition do |_, future| - finished_states.include?(future.state) - end - - finished_downloads.each do |downloadable, future| - previous_pending_line_count -= 1 - $stdout.print Tty.clear_to_end - $stdout.flush - output_message.call(downloadable, future, false) - end - - previous_pending_line_count = 0 - max_lines = [concurrency, Tty.height].min - remaining_downloads.each_with_index do |(downloadable, future), i| - break if previous_pending_line_count >= max_lines - - $stdout.print Tty.clear_to_end - $stdout.flush - last = i == max_lines - 1 || i == remaining_downloads.count - 1 - previous_pending_line_count += output_message.call(downloadable, future, last) - end - - if previous_pending_line_count.positive? - if (previous_pending_line_count - 1).zero? - $stdout.print Tty.move_cursor_beginning - else - $stdout.print Tty.move_cursor_up_beginning(previous_pending_line_count - 1) - end - $stdout.flush - end - - sleep 0.05 - rescue Interrupt - remaining_downloads.each do |_, future| - # FIXME: Implement cancellation of running downloads. - end - - download_queue.cancel - - if previous_pending_line_count.positive? - $stdout.print Tty.move_cursor_down(previous_pending_line_count - 1) - $stdout.flush - end - - raise - end - end - ensure - $stdout.print Tty.show_cursor - $stdout.flush - end - end + download_queue.start ensure download_queue.shutdown end private - sig { returns(T::Hash[T.any(Resource, Bottle, Cask::Download), Concurrent::Promises::Future]) } - def downloads - @downloads ||= T.let({}, T.nilable(T::Hash[T.any(Resource, Bottle, Cask::Download), - Concurrent::Promises::Future])) + sig { returns(Integer) } + def concurrency + @concurrency ||= T.let(args.concurrency&.to_i || 1, T.nilable(Integer)) end - sig { params(downloadable: T.any(Resource, Bottle, Cask::Download)).void } - def fetch_downloadable(downloadable) - downloads[downloadable] ||= begin - tries = args.retry? ? {} : { tries: 1 } - download_queue.enqueue(RetryableDownload.new(downloadable, **tries), force: args.force?) - end + sig { returns(Integer) } + def retries + @retries ||= T.let(args.retry? ? FETCH_MAX_TRIES : 0, T.nilable(Integer)) + end + + sig { returns(DownloadQueue) } + def download_queue + @download_queue ||= T.let(begin + DownloadQueue.new(concurrency:, retries:, force: args.force?) + end, T.nilable(DownloadQueue)) end end end diff --git a/Library/Homebrew/download_queue.rb b/Library/Homebrew/download_queue.rb index 8340e7347d..0de8add528 100644 --- a/Library/Homebrew/download_queue.rb +++ b/Library/Homebrew/download_queue.rb @@ -1,39 +1,138 @@ -# typed: true # rubocop:todo Sorbet/StrictSigil +# typed: strict # frozen_string_literal: true require "downloadable" require "concurrent/promises" require "concurrent/executors" +require "retryable_download" module Homebrew class DownloadQueue - sig { returns(Concurrent::FixedThreadPool) } - attr_reader :pool - private :pool - - sig { params(size: Integer).void } - def initialize(size = 1) - @pool = Concurrent::FixedThreadPool.new(size) + sig { params(concurrency: Integer, retries: Integer, force: T::Boolean).void } + def initialize(concurrency:, retries:, force:) + @concurrency = concurrency + @quiet = T.let(concurrency > 1, T::Boolean) + @tries = T.let(retries + 1, Integer) + @force = force + @pool = T.let(Concurrent::FixedThreadPool.new(concurrency), Concurrent::FixedThreadPool) end - sig { params(downloadable: Downloadable, force: T::Boolean).returns(Concurrent::Promises::Future) } - def enqueue(downloadable, force: false) - quiet = pool.max_length > 1 - # Passing in arguments from outside into the future is a common `concurrent-ruby` pattern. - # rubocop:disable Lint/ShadowingOuterLocalVariable - Concurrent::Promises.future_on(pool, downloadable, force, quiet) do |downloadable, force, quiet| - downloadable.clear_cache if force - downloadable.fetch(quiet:) + sig { params(downloadable: T.any(Resource, Bottle, Cask::Download)).void } + def enqueue(downloadable) + downloads[downloadable] ||= Concurrent::Promises.future_on( + pool, RetryableDownload.new(downloadable, tries:), force, quiet + ) do |download, force, quiet| + download.clear_cache if force + download.fetch(quiet:) end - # rubocop:enable Lint/ShadowingOuterLocalVariable end sig { void } - def cancel - # FIXME: Implement graceful cancellaction of running downloads based on - # https://ruby-concurrency.github.io/concurrent-ruby/HEAD/Concurrent/Cancellation.html - # instead of killing the whole thread pool. - pool.kill + def start + if concurrency == 1 + downloads.each do |downloadable, promise| + promise.wait! + rescue ChecksumMismatchError => e + opoo "#{downloadable.download_type.capitalize} reports different checksum: #{e.expected}" + Homebrew.failed = true if downloadable.is_a?(Resource::Patch) + end + else + spinner = Spinner.new + remaining_downloads = downloads.dup.to_a + previous_pending_line_count = 0 + + begin + $stdout.print Tty.hide_cursor + $stdout.flush + + output_message = lambda do |downloadable, future, last| + status = case future.state + when :fulfilled + "#{Tty.green}✔︎#{Tty.reset}" + when :rejected + "#{Tty.red}✘#{Tty.reset}" + when :pending, :processing + "#{Tty.blue}#{spinner}#{Tty.reset}" + else + raise future.state.to_s + end + + message = "#{downloadable.download_type.capitalize} #{downloadable.name}" + $stdout.print "#{status} #{message}#{"\n" unless last}" + $stdout.flush + + if future.rejected? + if (e = future.reason).is_a?(ChecksumMismatchError) + opoo "#{downloadable.download_type.capitalize} reports different checksum: #{e.expected}" + Homebrew.failed = true if downloadable.is_a?(Resource::Patch) + next 2 + else + message = future.reason.to_s + onoe message + Homebrew.failed = true + next message.count("\n") + end + end + + 1 + end + + until remaining_downloads.empty? + begin + finished_states = [:fulfilled, :rejected] + + finished_downloads, remaining_downloads = remaining_downloads.partition do |_, future| + finished_states.include?(future.state) + end + + finished_downloads.each do |downloadable, future| + previous_pending_line_count -= 1 + $stdout.print Tty.clear_to_end + $stdout.flush + output_message.call(downloadable, future, false) + end + + previous_pending_line_count = 0 + max_lines = [concurrency, Tty.height].min + remaining_downloads.each_with_index do |(downloadable, future), i| + break if previous_pending_line_count >= max_lines + + $stdout.print Tty.clear_to_end + $stdout.flush + last = i == max_lines - 1 || i == remaining_downloads.count - 1 + previous_pending_line_count += output_message.call(downloadable, future, last) + end + + if previous_pending_line_count.positive? + if (previous_pending_line_count - 1).zero? + $stdout.print Tty.move_cursor_beginning + else + $stdout.print Tty.move_cursor_up_beginning(previous_pending_line_count - 1) + end + $stdout.flush + end + + sleep 0.05 + rescue Interrupt + remaining_downloads.each do |_, future| + # FIXME: Implement cancellation of running downloads. + end + + cancel + + if previous_pending_line_count.positive? + $stdout.print Tty.move_cursor_down(previous_pending_line_count - 1) + $stdout.flush + end + + raise + end + end + ensure + $stdout.print Tty.show_cursor + $stdout.flush + end + end end sig { void } @@ -41,5 +140,68 @@ module Homebrew pool.shutdown pool.wait_for_termination end + + private + + sig { void } + def cancel + # FIXME: Implement graceful cancellation of running downloads based on + # https://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Cancellation.html + # instead of killing the whole thread pool. + pool.kill + end + + sig { returns(Concurrent::FixedThreadPool) } + attr_reader :pool + + sig { returns(Integer) } + attr_reader :concurrency + + sig { returns(Integer) } + attr_reader :tries + + sig { returns(T::Boolean) } + attr_reader :force + + sig { returns(T::Boolean) } + attr_reader :quiet + + sig { returns(T::Hash[T.any(Resource, Bottle, Cask::Download), Concurrent::Promises::Future]) } + def downloads + @downloads ||= T.let({}, T.nilable(T::Hash[T.any(Resource, Bottle, Cask::Download), + Concurrent::Promises::Future])) + end + + class Spinner + FRAMES = [ + "⠋", + "⠙", + "⠚", + "⠞", + "⠖", + "⠦", + "⠴", + "⠲", + "⠳", + "⠓", + ].freeze + + sig { void } + def initialize + @start = T.let(Time.now, Time) + @i = T.let(0, Integer) + end + + sig { returns(String) } + def to_s + now = Time.now + if @start + 0.1 < now + @start = now + @i = (@i + 1) % FRAMES.count + end + + FRAMES.fetch(@i) + end + end end end diff --git a/Library/Homebrew/retryable_download.rb b/Library/Homebrew/retryable_download.rb index 63fd5ff5fe..8bcb6e9126 100644 --- a/Library/Homebrew/retryable_download.rb +++ b/Library/Homebrew/retryable_download.rb @@ -19,7 +19,7 @@ module Homebrew def mirrors = downloadable.mirrors sig { params(downloadable: Downloadable, tries: Integer).void } - def initialize(downloadable, tries: 3) + def initialize(downloadable, tries:) super() @downloadable = downloadable