Move more logic from cmd/fetch to download_queue

This is the first step towards using the download queue for all
download operations e.g. `brew install`, `brew upgrade`,
`brew reinstall` etc.

While we're here, do some API cleanup and Sorbet type improvements.
This commit is contained in:
Mike McQuaid 2025-07-11 15:54:49 +01:00
parent 8856a609d6
commit f4e629331f
No known key found for this signature in database
3 changed files with 206 additions and 189 deletions

View File

@ -5,7 +5,6 @@ require "abstract_command"
require "formula" require "formula"
require "fetch" require "fetch"
require "cask/download" require "cask/download"
require "retryable_download"
require "download_queue" require "download_queue"
module Homebrew module Homebrew
@ -70,50 +69,6 @@ module Homebrew
named_args [:formula, :cask], min: 1 named_args [:formula, :cask], min: 1
end end
sig { returns(Integer) }
def concurrency
@concurrency ||= T.let(args.concurrency&.to_i || 1, T.nilable(Integer))
end
sig { returns(DownloadQueue) }
def download_queue
@download_queue ||= T.let(begin
DownloadQueue.new(concurrency)
end, T.nilable(DownloadQueue))
end
class Spinner
FRAMES = [
"",
"",
"",
"",
"",
"",
"",
"",
"",
"",
].freeze
sig { void }
def initialize
@start = T.let(Time.now, Time)
@i = T.let(0, Integer)
end
sig { returns(String) }
def to_s
now = Time.now
if @start + 0.1 < now
@start = now
@i = (@i + 1) % FRAMES.count
end
FRAMES.fetch(@i)
end
end
sig { override.void } sig { override.void }
def run def run
Formulary.enable_factory_cache! Formulary.enable_factory_cache!
@ -173,9 +128,9 @@ module Homebrew
end end
if (manifest_resource = bottle.github_packages_manifest_resource) if (manifest_resource = bottle.github_packages_manifest_resource)
fetch_downloadable(manifest_resource) download_queue.enqueue(manifest_resource)
end end
fetch_downloadable(bottle) download_queue.enqueue(bottle)
rescue Interrupt rescue Interrupt
raise raise
rescue => e rescue => e
@ -192,15 +147,15 @@ module Homebrew
next if fetched_bottle next if fetched_bottle
if (resource = formula.resource) if (resource = formula.resource)
fetch_downloadable(resource) download_queue.enqueue(resource)
end end
formula.resources.each do |r| formula.resources.each do |r|
fetch_downloadable(r) download_queue.enqueue(r)
r.patches.each { |patch| fetch_downloadable(patch.resource) if patch.external? } r.patches.each { |patch| download_queue.enqueue(patch.resource) if patch.external? }
end end
formula.patchlist.each { |patch| fetch_downloadable(patch.resource) if patch.external? } formula.patchlist.each { |patch| download_queue.enqueue(patch.resource) if patch.external? }
end end
end end
else else
@ -220,134 +175,34 @@ module Homebrew
quarantine = true if quarantine.nil? quarantine = true if quarantine.nil?
download = Cask::Download.new(cask, quarantine:) download = Cask::Download.new(cask, quarantine:)
fetch_downloadable(download) download_queue.enqueue(download)
end end
end end
end end
end end
if concurrency == 1 download_queue.start
downloads.each do |downloadable, promise|
promise.wait!
rescue ChecksumMismatchError => e
opoo "#{downloadable.download_type.capitalize} reports different checksum: #{e.expected}"
Homebrew.failed = true if downloadable.is_a?(Resource::Patch)
end
else
spinner = Spinner.new
remaining_downloads = downloads.dup.to_a
previous_pending_line_count = 0
begin
$stdout.print Tty.hide_cursor
$stdout.flush
output_message = lambda do |downloadable, future, last|
status = case future.state
when :fulfilled
"#{Tty.green}✔︎#{Tty.reset}"
when :rejected
"#{Tty.red}#{Tty.reset}"
when :pending, :processing
"#{Tty.blue}#{spinner}#{Tty.reset}"
else
raise future.state.to_s
end
message = "#{downloadable.download_type.capitalize} #{downloadable.name}"
$stdout.print "#{status} #{message}#{"\n" unless last}"
$stdout.flush
if future.rejected?
if (e = future.reason).is_a?(ChecksumMismatchError)
opoo "#{downloadable.download_type.capitalize} reports different checksum: #{e.expected}"
Homebrew.failed = true if downloadable.is_a?(Resource::Patch)
next 2
else
message = future.reason.to_s
onoe message
Homebrew.failed = true
next message.count("\n")
end
end
1
end
until remaining_downloads.empty?
begin
finished_states = [:fulfilled, :rejected]
finished_downloads, remaining_downloads = remaining_downloads.partition do |_, future|
finished_states.include?(future.state)
end
finished_downloads.each do |downloadable, future|
previous_pending_line_count -= 1
$stdout.print Tty.clear_to_end
$stdout.flush
output_message.call(downloadable, future, false)
end
previous_pending_line_count = 0
max_lines = [concurrency, Tty.height].min
remaining_downloads.each_with_index do |(downloadable, future), i|
break if previous_pending_line_count >= max_lines
$stdout.print Tty.clear_to_end
$stdout.flush
last = i == max_lines - 1 || i == remaining_downloads.count - 1
previous_pending_line_count += output_message.call(downloadable, future, last)
end
if previous_pending_line_count.positive?
if (previous_pending_line_count - 1).zero?
$stdout.print Tty.move_cursor_beginning
else
$stdout.print Tty.move_cursor_up_beginning(previous_pending_line_count - 1)
end
$stdout.flush
end
sleep 0.05
rescue Interrupt
remaining_downloads.each do |_, future|
# FIXME: Implement cancellation of running downloads.
end
download_queue.cancel
if previous_pending_line_count.positive?
$stdout.print Tty.move_cursor_down(previous_pending_line_count - 1)
$stdout.flush
end
raise
end
end
ensure
$stdout.print Tty.show_cursor
$stdout.flush
end
end
ensure ensure
download_queue.shutdown download_queue.shutdown
end end
private private
sig { returns(T::Hash[T.any(Resource, Bottle, Cask::Download), Concurrent::Promises::Future]) } sig { returns(Integer) }
def downloads def concurrency
@downloads ||= T.let({}, T.nilable(T::Hash[T.any(Resource, Bottle, Cask::Download), @concurrency ||= T.let(args.concurrency&.to_i || 1, T.nilable(Integer))
Concurrent::Promises::Future]))
end end
sig { params(downloadable: T.any(Resource, Bottle, Cask::Download)).void } sig { returns(Integer) }
def fetch_downloadable(downloadable) def retries
downloads[downloadable] ||= begin @retries ||= T.let(args.retry? ? FETCH_MAX_TRIES : 0, T.nilable(Integer))
tries = args.retry? ? {} : { tries: 1 } end
download_queue.enqueue(RetryableDownload.new(downloadable, **tries), force: args.force?)
end sig { returns(DownloadQueue) }
def download_queue
@download_queue ||= T.let(begin
DownloadQueue.new(concurrency:, retries:, force: args.force?)
end, T.nilable(DownloadQueue))
end end
end end
end end

View File

@ -1,39 +1,138 @@
# typed: true # rubocop:todo Sorbet/StrictSigil # typed: strict
# frozen_string_literal: true # frozen_string_literal: true
require "downloadable" require "downloadable"
require "concurrent/promises" require "concurrent/promises"
require "concurrent/executors" require "concurrent/executors"
require "retryable_download"
module Homebrew module Homebrew
class DownloadQueue class DownloadQueue
sig { returns(Concurrent::FixedThreadPool) } sig { params(concurrency: Integer, retries: Integer, force: T::Boolean).void }
attr_reader :pool def initialize(concurrency:, retries:, force:)
private :pool @concurrency = concurrency
@quiet = T.let(concurrency > 1, T::Boolean)
sig { params(size: Integer).void } @tries = T.let(retries + 1, Integer)
def initialize(size = 1) @force = force
@pool = Concurrent::FixedThreadPool.new(size) @pool = T.let(Concurrent::FixedThreadPool.new(concurrency), Concurrent::FixedThreadPool)
end end
sig { params(downloadable: Downloadable, force: T::Boolean).returns(Concurrent::Promises::Future) } sig { params(downloadable: T.any(Resource, Bottle, Cask::Download)).void }
def enqueue(downloadable, force: false) def enqueue(downloadable)
quiet = pool.max_length > 1 downloads[downloadable] ||= Concurrent::Promises.future_on(
# Passing in arguments from outside into the future is a common `concurrent-ruby` pattern. pool, RetryableDownload.new(downloadable, tries:), force, quiet
# rubocop:disable Lint/ShadowingOuterLocalVariable ) do |download, force, quiet|
Concurrent::Promises.future_on(pool, downloadable, force, quiet) do |downloadable, force, quiet| download.clear_cache if force
downloadable.clear_cache if force download.fetch(quiet:)
downloadable.fetch(quiet:)
end end
# rubocop:enable Lint/ShadowingOuterLocalVariable
end end
sig { void } sig { void }
def cancel def start
# FIXME: Implement graceful cancellaction of running downloads based on if concurrency == 1
# https://ruby-concurrency.github.io/concurrent-ruby/HEAD/Concurrent/Cancellation.html downloads.each do |downloadable, promise|
# instead of killing the whole thread pool. promise.wait!
pool.kill rescue ChecksumMismatchError => e
opoo "#{downloadable.download_type.capitalize} reports different checksum: #{e.expected}"
Homebrew.failed = true if downloadable.is_a?(Resource::Patch)
end
else
spinner = Spinner.new
remaining_downloads = downloads.dup.to_a
previous_pending_line_count = 0
begin
$stdout.print Tty.hide_cursor
$stdout.flush
output_message = lambda do |downloadable, future, last|
status = case future.state
when :fulfilled
"#{Tty.green}✔︎#{Tty.reset}"
when :rejected
"#{Tty.red}#{Tty.reset}"
when :pending, :processing
"#{Tty.blue}#{spinner}#{Tty.reset}"
else
raise future.state.to_s
end
message = "#{downloadable.download_type.capitalize} #{downloadable.name}"
$stdout.print "#{status} #{message}#{"\n" unless last}"
$stdout.flush
if future.rejected?
if (e = future.reason).is_a?(ChecksumMismatchError)
opoo "#{downloadable.download_type.capitalize} reports different checksum: #{e.expected}"
Homebrew.failed = true if downloadable.is_a?(Resource::Patch)
next 2
else
message = future.reason.to_s
onoe message
Homebrew.failed = true
next message.count("\n")
end
end
1
end
until remaining_downloads.empty?
begin
finished_states = [:fulfilled, :rejected]
finished_downloads, remaining_downloads = remaining_downloads.partition do |_, future|
finished_states.include?(future.state)
end
finished_downloads.each do |downloadable, future|
previous_pending_line_count -= 1
$stdout.print Tty.clear_to_end
$stdout.flush
output_message.call(downloadable, future, false)
end
previous_pending_line_count = 0
max_lines = [concurrency, Tty.height].min
remaining_downloads.each_with_index do |(downloadable, future), i|
break if previous_pending_line_count >= max_lines
$stdout.print Tty.clear_to_end
$stdout.flush
last = i == max_lines - 1 || i == remaining_downloads.count - 1
previous_pending_line_count += output_message.call(downloadable, future, last)
end
if previous_pending_line_count.positive?
if (previous_pending_line_count - 1).zero?
$stdout.print Tty.move_cursor_beginning
else
$stdout.print Tty.move_cursor_up_beginning(previous_pending_line_count - 1)
end
$stdout.flush
end
sleep 0.05
rescue Interrupt
remaining_downloads.each do |_, future|
# FIXME: Implement cancellation of running downloads.
end
cancel
if previous_pending_line_count.positive?
$stdout.print Tty.move_cursor_down(previous_pending_line_count - 1)
$stdout.flush
end
raise
end
end
ensure
$stdout.print Tty.show_cursor
$stdout.flush
end
end
end end
sig { void } sig { void }
@ -41,5 +140,68 @@ module Homebrew
pool.shutdown pool.shutdown
pool.wait_for_termination pool.wait_for_termination
end end
private
sig { void }
def cancel
# FIXME: Implement graceful cancellation of running downloads based on
# https://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Cancellation.html
# instead of killing the whole thread pool.
pool.kill
end
sig { returns(Concurrent::FixedThreadPool) }
attr_reader :pool
sig { returns(Integer) }
attr_reader :concurrency
sig { returns(Integer) }
attr_reader :tries
sig { returns(T::Boolean) }
attr_reader :force
sig { returns(T::Boolean) }
attr_reader :quiet
sig { returns(T::Hash[T.any(Resource, Bottle, Cask::Download), Concurrent::Promises::Future]) }
def downloads
@downloads ||= T.let({}, T.nilable(T::Hash[T.any(Resource, Bottle, Cask::Download),
Concurrent::Promises::Future]))
end
class Spinner
FRAMES = [
"",
"",
"",
"",
"",
"",
"",
"",
"",
"",
].freeze
sig { void }
def initialize
@start = T.let(Time.now, Time)
@i = T.let(0, Integer)
end
sig { returns(String) }
def to_s
now = Time.now
if @start + 0.1 < now
@start = now
@i = (@i + 1) % FRAMES.count
end
FRAMES.fetch(@i)
end
end
end end
end end

View File

@ -19,7 +19,7 @@ module Homebrew
def mirrors = downloadable.mirrors def mirrors = downloadable.mirrors
sig { params(downloadable: Downloadable, tries: Integer).void } sig { params(downloadable: Downloadable, tries: Integer).void }
def initialize(downloadable, tries: 3) def initialize(downloadable, tries:)
super() super()
@downloadable = downloadable @downloadable = downloadable