Mike McQuaid f2103c7138
utils/github/api: fix Sorbet types.
Let's not use `T.unsafe(nil)` but instead an empty hash.
2025-06-17 14:59:46 +01:00

468 lines
16 KiB
Ruby

# typed: strict
# frozen_string_literal: true
require "system_command"
module GitHub
sig { params(scopes: T::Array[String]).returns(String) }
def self.pat_blurb(scopes = ALL_SCOPES)
require "utils/formatter"
require "utils/shell"
<<~EOS
Create a GitHub personal access token:
#{Formatter.url(
"https://github.com/settings/tokens/new?scopes=#{scopes.join(",")}&description=Homebrew",
)}
#{Utils::Shell.set_variable_in_profile("HOMEBREW_GITHUB_API_TOKEN", "your_token_here")}
EOS
end
API_URL = T.let("https://api.github.com", String)
API_MAX_PAGES = T.let(50, Integer)
private_constant :API_MAX_PAGES
API_MAX_ITEMS = T.let(5000, Integer)
private_constant :API_MAX_ITEMS
PAGINATE_RETRY_COUNT = T.let(3, Integer)
private_constant :PAGINATE_RETRY_COUNT
CREATE_GIST_SCOPES = T.let(["gist"].freeze, T::Array[String])
CREATE_ISSUE_FORK_OR_PR_SCOPES = T.let(["repo"].freeze, T::Array[String])
CREATE_WORKFLOW_SCOPES = T.let(["workflow"].freeze, T::Array[String])
ALL_SCOPES = T.let((CREATE_GIST_SCOPES + CREATE_ISSUE_FORK_OR_PR_SCOPES + CREATE_WORKFLOW_SCOPES).freeze,
T::Array[String])
private_constant :ALL_SCOPES
GITHUB_PERSONAL_ACCESS_TOKEN_REGEX = T.let(/^(?:[a-f0-9]{40}|(?:gh[pousr]|github_pat)_\w{36,251})$/, Regexp)
private_constant :GITHUB_PERSONAL_ACCESS_TOKEN_REGEX
# Helper functions for accessing the GitHub API.
#
# @api internal
module API
extend SystemCommand::Mixin
# Generic API error.
class Error < RuntimeError
sig { returns(T.nilable(String)) }
attr_reader :github_message
sig { params(message: T.nilable(String), github_message: String).void }
def initialize(message = nil, github_message = T.unsafe(nil))
@github_message = T.let(github_message, T.nilable(String))
super(message)
end
end
# Error when the requested URL is not found.
class HTTPNotFoundError < Error
sig { params(github_message: String).void }
def initialize(github_message)
super(nil, github_message)
end
end
# Error when the API rate limit is exceeded.
class RateLimitExceededError < Error
sig { params(reset: Integer, github_message: String).void }
def initialize(reset, github_message)
new_pat_message = ", or:\n#{GitHub.pat_blurb}" if API.credentials.blank?
message = <<~EOS
GitHub API Error: #{github_message}
Try again in #{pretty_ratelimit_reset(reset)}#{new_pat_message}
EOS
super(message, github_message)
end
sig { params(reset: Integer).returns(String) }
def pretty_ratelimit_reset(reset)
pretty_duration(Time.at(reset) - Time.now)
end
end
GITHUB_IP_ALLOWLIST_ERROR = T.let(
Regexp.new(
"Although you appear to have the correct authorization credentials, " \
"the `(.+)` organization has an IP allow list enabled, " \
"and your IP address is not permitted to access this resource",
).freeze,
Regexp,
)
NO_CREDENTIALS_MESSAGE = T.let <<~MESSAGE.freeze, String
No GitHub credentials found in macOS Keychain, GitHub CLI or the environment.
#{GitHub.pat_blurb}
MESSAGE
# Error when authentication fails.
class AuthenticationFailedError < Error
sig { params(credentials_type: Symbol, github_message: String).void }
def initialize(credentials_type, github_message)
message = "GitHub API Error: #{github_message}\n"
message << case credentials_type
when :github_cli_token
<<~EOS
Your GitHub CLI login session may be invalid.
Refresh it with:
gh auth login --hostname github.com
EOS
when :keychain_username_password
<<~EOS
The GitHub credentials in the macOS keychain may be invalid.
Clear them with:
printf "protocol=https\\nhost=github.com\\n" | git credential-osxkeychain erase
EOS
when :env_token
require "utils/formatter"
<<~EOS
HOMEBREW_GITHUB_API_TOKEN may be invalid or expired; check:
#{Formatter.url("https://github.com/settings/tokens")}
EOS
when :none
NO_CREDENTIALS_MESSAGE
end
super message.freeze, github_message
end
end
# Error when the user has no GitHub API credentials set at all (macOS keychain, GitHub CLI or envvar).
class MissingAuthenticationError < Error
sig { void }
def initialize
super NO_CREDENTIALS_MESSAGE
end
end
# Error when the API returns a validation error.
class ValidationFailedError < Error
sig { params(github_message: String, errors: T::Array[String]).void }
def initialize(github_message, errors)
github_message = "#{github_message}: #{errors}" unless errors.empty?
super(github_message, github_message)
end
end
ERRORS = T.let([
AuthenticationFailedError,
HTTPNotFoundError,
RateLimitExceededError,
Error,
JSON::ParserError,
].freeze, T::Array[T.any(T.class_of(Error), T.class_of(JSON::ParserError))])
# Gets the token from the GitHub CLI for github.com.
sig { returns(T.nilable(String)) }
def self.github_cli_token
require "utils/uid"
Utils::UID.drop_euid do
# Avoid `Formula["gh"].opt_bin` so this method works even with `HOMEBREW_DISABLE_LOAD_FORMULA`.
env = {
"PATH" => PATH.new(HOMEBREW_PREFIX/"opt/gh/bin", ENV.fetch("PATH")),
"HOME" => Utils::UID.uid_home,
}.compact
gh_out, _, result = system_command "gh",
args: ["auth", "token", "--hostname", "github.com"],
env:,
print_stderr: false
return unless result.success?
gh_out.chomp.presence
end
end
# Gets the password field from `git-credential-osxkeychain` for github.com,
# but only if that password looks like a GitHub Personal Access Token.
sig { returns(T.nilable(String)) }
def self.keychain_username_password
require "utils/uid"
Utils::UID.drop_euid do
git_credential_out, _, result = system_command "git",
args: ["credential-osxkeychain", "get"],
input: ["protocol=https\n", "host=github.com\n"],
env: { "HOME" => Utils::UID.uid_home }.compact,
print_stderr: false
return unless result.success?
git_credential_out.force_encoding("ASCII-8BIT")
github_username = git_credential_out[/^username=(.+)/, 1]
github_password = git_credential_out[/^password=(.+)/, 1]
return unless github_username
# Don't use passwords from the keychain unless they look like
# GitHub Personal Access Tokens:
# https://github.com/Homebrew/brew/issues/6862#issuecomment-572610344
return unless GITHUB_PERSONAL_ACCESS_TOKEN_REGEX.match?(github_password)
github_password.presence
end
end
sig { returns(T.nilable(String)) }
def self.credentials
@credentials ||= T.let(nil, T.nilable(String))
@credentials ||= Homebrew::EnvConfig.github_api_token.presence
@credentials ||= github_cli_token
@credentials ||= keychain_username_password
end
sig { returns(Symbol) }
def self.credentials_type
if Homebrew::EnvConfig.github_api_token.present?
:env_token
elsif github_cli_token.present?
:github_cli_token
elsif keychain_username_password.present?
:keychain_username_password
else
:none
end
end
CREDENTIAL_NAMES = T.let({
env_token: "HOMEBREW_GITHUB_API_TOKEN",
github_cli_token: "GitHub CLI login",
keychain_username_password: "macOS Keychain GitHub",
}.freeze, T::Hash[Symbol, String])
# Given an API response from GitHub, warn the user if their credentials
# have insufficient permissions.
sig { params(response_headers: T::Hash[String, String], needed_scopes: T::Array[String]).void }
def self.credentials_error_message(response_headers, needed_scopes)
return if response_headers.empty?
scopes = response_headers["x-accepted-oauth-scopes"].to_s.split(", ").presence
needed_scopes = Set.new(scopes || needed_scopes)
credentials_scopes = response_headers["x-oauth-scopes"]
return if needed_scopes.subset?(Set.new(credentials_scopes.to_s.split(", ")))
github_permission_link = GitHub.pat_blurb(needed_scopes.to_a)
needed_scopes = needed_scopes.to_a.join(", ").presence || "none"
credentials_scopes = "none" if credentials_scopes.blank?
what = CREDENTIAL_NAMES.fetch(credentials_type)
@credentials_error_message ||= T.let(begin
error_message = <<~EOS
Your #{what} credentials do not have sufficient scope!
Scopes required: #{needed_scopes}
Scopes present: #{credentials_scopes}
#{github_permission_link}
EOS
onoe error_message
error_message
end, T.nilable(String))
end
sig {
params(
url: T.any(String, URI::Generic),
data: T::Hash[Symbol, T.untyped],
data_binary_path: String,
request_method: Symbol,
scopes: T::Array[String],
parse_json: T::Boolean,
_block: T.nilable(
T.proc
.params(data: T::Hash[String, T.untyped])
.returns(T.untyped),
),
).returns(T.untyped)
}
def self.open_rest(url, data: T.unsafe(nil), data_binary_path: T.unsafe(nil), request_method: T.unsafe(nil),
scopes: [].freeze, parse_json: true, &_block)
# This is a no-op if the user is opting out of using the GitHub API.
return block_given? ? yield({}) : {} if Homebrew::EnvConfig.no_github_api?
# This is a Curl format token, not a Ruby one.
# rubocop:disable Style/FormatStringToken
args = ["--header", "Accept: application/vnd.github+json", "--write-out", "\n%{http_code}"]
# rubocop:enable Style/FormatStringToken
token = credentials
args += ["--header", "Authorization: token #{token}"] if credentials_type != :none
args += ["--header", "X-GitHub-Api-Version:2022-11-28"]
require "tempfile"
data_tmpfile = nil
if data
begin
data = JSON.pretty_generate data
data_tmpfile = Tempfile.new("github_api_post", HOMEBREW_TEMP)
rescue JSON::ParserError => e
raise Error, "Failed to parse JSON request:\n#{e.message}\n#{data}", e.backtrace
end
end
if data_binary_path.present?
args += ["--data-binary", "@#{data_binary_path}"]
args += ["--header", "Content-Type: application/gzip"]
end
headers_tmpfile = Tempfile.new("github_api_headers", HOMEBREW_TEMP)
begin
if data_tmpfile
data_tmpfile.write data
data_tmpfile.close
args += ["--data", "@#{data_tmpfile.path}"]
args += ["--request", request_method.to_s] if request_method
end
args += ["--dump-header", T.must(headers_tmpfile.path)]
require "utils/curl"
result = Utils::Curl.curl_output("--location", url.to_s, *args, secrets: [token])
output, _, http_code = result.stdout.rpartition("\n")
output, _, http_code = output.rpartition("\n") if http_code == "000"
headers = headers_tmpfile.read
ensure
if data_tmpfile
data_tmpfile.close
data_tmpfile.unlink
end
headers_tmpfile.close
headers_tmpfile.unlink
end
begin
if !http_code.start_with?("2") || !result.status.success?
raise_error(output, result.stderr, http_code, headers || "", scopes)
end
return if http_code == "204" # No Content
output = JSON.parse output if parse_json
if block_given?
yield output
else
output
end
rescue JSON::ParserError => e
raise Error, "Failed to parse JSON response\n#{e.message}", e.backtrace
end
end
sig {
params(
url: T.any(String, URI::Generic),
additional_query_params: String,
per_page: Integer,
scopes: T::Array[String],
_block: T.proc
.params(result: T.untyped, page: Integer)
.returns(T.untyped),
).void
}
def self.paginate_rest(url, additional_query_params: T.unsafe(nil), per_page: 100, scopes: [].freeze, &_block)
(1..API_MAX_PAGES).each do |page|
retry_count = 1
result = begin
API.open_rest("#{url}?per_page=#{per_page}&page=#{page}&#{additional_query_params}", scopes:)
rescue Error
if retry_count < PAGINATE_RETRY_COUNT
retry_count += 1
retry
end
raise
end
break if result.blank?
yield(result, page)
end
end
sig {
params(
query: String,
variables: T::Hash[Symbol, T.untyped],
scopes: T::Array[String],
raise_errors: T::Boolean,
).returns(T.untyped)
}
def self.open_graphql(query, variables: {}, scopes: [].freeze, raise_errors: true)
data = { query:, variables: }
result = open_rest("#{API_URL}/graphql", scopes:, data:, request_method: :POST)
if raise_errors
raise Error, result["errors"].map { |e| e["message"] }.join("\n") if result["errors"].present?
result["data"]
else
result
end
end
sig {
params(
query: String,
variables: T::Hash[Symbol, T.untyped],
scopes: T::Array[String],
raise_errors: T::Boolean,
_block: T.proc.params(data: T::Hash[String, T.untyped]).returns(T.untyped),
).void
}
def self.paginate_graphql(query, variables: {}, scopes: [].freeze, raise_errors: true, &_block)
result = API.open_graphql(query, variables:, scopes:, raise_errors:)
has_next_page = T.let(true, T::Boolean)
while has_next_page
page_info = yield result
has_next_page = page_info["hasNextPage"]
if has_next_page
variables[:after] = page_info["endCursor"]
result = API.open_graphql(query, variables:, scopes:, raise_errors:)
end
end
end
sig {
params(
output: String,
errors: String,
http_code: String,
headers: String,
scopes: T::Array[String],
).void
}
def self.raise_error(output, errors, http_code, headers, scopes)
json = begin
JSON.parse(output)
rescue
nil
end
message = json&.[]("message") || "curl failed! #{errors}"
meta = {}
headers.lines.each do |l|
key, _, value = l.delete(":").partition(" ")
key = key.downcase.strip
next if key.empty?
meta[key] = value.strip
end
credentials_error_message(meta, scopes)
case http_code
when "401"
raise AuthenticationFailedError.new(credentials_type, message)
when "403"
if meta.fetch("x-ratelimit-remaining", 1).to_i <= 0
reset = meta.fetch("x-ratelimit-reset").to_i
raise RateLimitExceededError.new(reset, message)
end
raise AuthenticationFailedError.new(credentials_type, message)
when "404"
raise MissingAuthenticationError if credentials_type == :none && scopes.present?
raise HTTPNotFoundError, message
when "422"
errors = json&.[]("errors") || []
raise ValidationFailedError.new(message, errors)
else
raise Error, message
end
end
end
end