# typed: strict # frozen_string_literal: true require "system_command" module GitHub sig { params(scopes: T::Array[String]).returns(String) } def self.pat_blurb(scopes = ALL_SCOPES) require "utils/formatter" require "utils/shell" <<~EOS Create a GitHub personal access token: #{Formatter.url( "https://github.com/settings/tokens/new?scopes=#{scopes.join(",")}&description=Homebrew", )} #{Utils::Shell.set_variable_in_profile("HOMEBREW_GITHUB_API_TOKEN", "your_token_here")} EOS end API_URL = T.let("https://api.github.com", String) API_MAX_PAGES = T.let(50, Integer) private_constant :API_MAX_PAGES API_MAX_ITEMS = T.let(5000, Integer) private_constant :API_MAX_ITEMS PAGINATE_RETRY_COUNT = T.let(3, Integer) private_constant :PAGINATE_RETRY_COUNT CREATE_GIST_SCOPES = T.let(["gist"].freeze, T::Array[String]) CREATE_ISSUE_FORK_OR_PR_SCOPES = T.let(["repo"].freeze, T::Array[String]) CREATE_WORKFLOW_SCOPES = T.let(["workflow"].freeze, T::Array[String]) ALL_SCOPES = T.let((CREATE_GIST_SCOPES + CREATE_ISSUE_FORK_OR_PR_SCOPES + CREATE_WORKFLOW_SCOPES).freeze, T::Array[String]) private_constant :ALL_SCOPES GITHUB_PERSONAL_ACCESS_TOKEN_REGEX = T.let(/^(?:[a-f0-9]{40}|(?:gh[pousr]|github_pat)_\w{36,251})$/, Regexp) private_constant :GITHUB_PERSONAL_ACCESS_TOKEN_REGEX # Helper functions for accessing the GitHub API. # # @api internal module API extend SystemCommand::Mixin # Generic API error. class Error < RuntimeError sig { returns(T.nilable(String)) } attr_reader :github_message sig { params(message: T.nilable(String), github_message: String).void } def initialize(message = nil, github_message = T.unsafe(nil)) @github_message = T.let(github_message, T.nilable(String)) super(message) end end # Error when the requested URL is not found. class HTTPNotFoundError < Error sig { params(github_message: String).void } def initialize(github_message) super(nil, github_message) end end # Error when the API rate limit is exceeded. class RateLimitExceededError < Error sig { params(reset: Integer, github_message: String).void } def initialize(reset, github_message) new_pat_message = ", or:\n#{GitHub.pat_blurb}" if API.credentials.blank? message = <<~EOS GitHub API Error: #{github_message} Try again in #{pretty_ratelimit_reset(reset)}#{new_pat_message} EOS super(message, github_message) end sig { params(reset: Integer).returns(String) } def pretty_ratelimit_reset(reset) pretty_duration(Time.at(reset) - Time.now) end end GITHUB_IP_ALLOWLIST_ERROR = T.let( Regexp.new( "Although you appear to have the correct authorization credentials, " \ "the `(.+)` organization has an IP allow list enabled, " \ "and your IP address is not permitted to access this resource", ).freeze, Regexp, ) NO_CREDENTIALS_MESSAGE = T.let <<~MESSAGE.freeze, String No GitHub credentials found in macOS Keychain, GitHub CLI or the environment. #{GitHub.pat_blurb} MESSAGE # Error when authentication fails. class AuthenticationFailedError < Error sig { params(credentials_type: Symbol, github_message: String).void } def initialize(credentials_type, github_message) message = "GitHub API Error: #{github_message}\n" message << case credentials_type when :github_cli_token <<~EOS Your GitHub CLI login session may be invalid. Refresh it with: gh auth login --hostname github.com EOS when :keychain_username_password <<~EOS The GitHub credentials in the macOS keychain may be invalid. Clear them with: printf "protocol=https\\nhost=github.com\\n" | git credential-osxkeychain erase EOS when :env_token require "utils/formatter" <<~EOS HOMEBREW_GITHUB_API_TOKEN may be invalid or expired; check: #{Formatter.url("https://github.com/settings/tokens")} EOS when :none NO_CREDENTIALS_MESSAGE end super message.freeze, github_message end end # Error when the user has no GitHub API credentials set at all (macOS keychain, GitHub CLI or envvar). class MissingAuthenticationError < Error sig { void } def initialize super NO_CREDENTIALS_MESSAGE end end # Error when the API returns a validation error. class ValidationFailedError < Error sig { params(github_message: String, errors: T::Array[String]).void } def initialize(github_message, errors) github_message = "#{github_message}: #{errors}" unless errors.empty? super(github_message, github_message) end end ERRORS = T.let([ AuthenticationFailedError, HTTPNotFoundError, RateLimitExceededError, Error, JSON::ParserError, ].freeze, T::Array[T.any(T.class_of(Error), T.class_of(JSON::ParserError))]) # Gets the token from the GitHub CLI for github.com. sig { returns(T.nilable(String)) } def self.github_cli_token require "utils/uid" Utils::UID.drop_euid do # Avoid `Formula["gh"].opt_bin` so this method works even with `HOMEBREW_DISABLE_LOAD_FORMULA`. env = { "PATH" => PATH.new(HOMEBREW_PREFIX/"opt/gh/bin", ENV.fetch("PATH")), "HOME" => Utils::UID.uid_home, }.compact gh_out, _, result = system_command "gh", args: ["auth", "token", "--hostname", "github.com"], env:, print_stderr: false return unless result.success? gh_out.chomp.presence end end # Gets the password field from `git-credential-osxkeychain` for github.com, # but only if that password looks like a GitHub Personal Access Token. sig { returns(T.nilable(String)) } def self.keychain_username_password require "utils/uid" Utils::UID.drop_euid do git_credential_out, _, result = system_command "git", args: ["credential-osxkeychain", "get"], input: ["protocol=https\n", "host=github.com\n"], env: { "HOME" => Utils::UID.uid_home }.compact, print_stderr: false return unless result.success? git_credential_out.force_encoding("ASCII-8BIT") github_username = git_credential_out[/^username=(.+)/, 1] github_password = git_credential_out[/^password=(.+)/, 1] return unless github_username # Don't use passwords from the keychain unless they look like # GitHub Personal Access Tokens: # https://github.com/Homebrew/brew/issues/6862#issuecomment-572610344 return unless GITHUB_PERSONAL_ACCESS_TOKEN_REGEX.match?(github_password) github_password.presence end end sig { returns(T.nilable(String)) } def self.credentials @credentials ||= T.let(nil, T.nilable(String)) @credentials ||= Homebrew::EnvConfig.github_api_token.presence @credentials ||= github_cli_token @credentials ||= keychain_username_password end sig { returns(Symbol) } def self.credentials_type if Homebrew::EnvConfig.github_api_token.present? :env_token elsif github_cli_token.present? :github_cli_token elsif keychain_username_password.present? :keychain_username_password else :none end end CREDENTIAL_NAMES = T.let({ env_token: "HOMEBREW_GITHUB_API_TOKEN", github_cli_token: "GitHub CLI login", keychain_username_password: "macOS Keychain GitHub", }.freeze, T::Hash[Symbol, String]) # Given an API response from GitHub, warn the user if their credentials # have insufficient permissions. sig { params(response_headers: T::Hash[String, String], needed_scopes: T::Array[String]).void } def self.credentials_error_message(response_headers, needed_scopes) return if response_headers.empty? scopes = response_headers["x-accepted-oauth-scopes"].to_s.split(", ").presence needed_scopes = Set.new(scopes || needed_scopes) credentials_scopes = response_headers["x-oauth-scopes"] return if needed_scopes.subset?(Set.new(credentials_scopes.to_s.split(", "))) github_permission_link = GitHub.pat_blurb(needed_scopes.to_a) needed_scopes = needed_scopes.to_a.join(", ").presence || "none" credentials_scopes = "none" if credentials_scopes.blank? what = CREDENTIAL_NAMES.fetch(credentials_type) @credentials_error_message ||= T.let(begin error_message = <<~EOS Your #{what} credentials do not have sufficient scope! Scopes required: #{needed_scopes} Scopes present: #{credentials_scopes} #{github_permission_link} EOS onoe error_message error_message end, T.nilable(String)) end sig { params( url: T.any(String, URI::Generic), data: T::Hash[Symbol, T.untyped], data_binary_path: String, request_method: Symbol, scopes: T::Array[String], parse_json: T::Boolean, _block: T.nilable( T.proc .params(data: T::Hash[String, T.untyped]) .returns(T.untyped), ), ).returns(T.untyped) } def self.open_rest(url, data: T.unsafe(nil), data_binary_path: T.unsafe(nil), request_method: T.unsafe(nil), scopes: [].freeze, parse_json: true, &_block) # This is a no-op if the user is opting out of using the GitHub API. return block_given? ? yield({}) : {} if Homebrew::EnvConfig.no_github_api? # This is a Curl format token, not a Ruby one. # rubocop:disable Style/FormatStringToken args = ["--header", "Accept: application/vnd.github+json", "--write-out", "\n%{http_code}"] # rubocop:enable Style/FormatStringToken token = credentials args += ["--header", "Authorization: token #{token}"] if credentials_type != :none args += ["--header", "X-GitHub-Api-Version:2022-11-28"] require "tempfile" data_tmpfile = nil if data begin data = JSON.pretty_generate data data_tmpfile = Tempfile.new("github_api_post", HOMEBREW_TEMP) rescue JSON::ParserError => e raise Error, "Failed to parse JSON request:\n#{e.message}\n#{data}", e.backtrace end end if data_binary_path.present? args += ["--data-binary", "@#{data_binary_path}"] args += ["--header", "Content-Type: application/gzip"] end headers_tmpfile = Tempfile.new("github_api_headers", HOMEBREW_TEMP) begin if data_tmpfile data_tmpfile.write data data_tmpfile.close args += ["--data", "@#{data_tmpfile.path}"] args += ["--request", request_method.to_s] if request_method end args += ["--dump-header", T.must(headers_tmpfile.path)] require "utils/curl" result = Utils::Curl.curl_output("--location", url.to_s, *args, secrets: [token]) output, _, http_code = result.stdout.rpartition("\n") output, _, http_code = output.rpartition("\n") if http_code == "000" headers = headers_tmpfile.read ensure if data_tmpfile data_tmpfile.close data_tmpfile.unlink end headers_tmpfile.close headers_tmpfile.unlink end begin if !http_code.start_with?("2") || !result.status.success? raise_error(output, result.stderr, http_code, headers || "", scopes) end return if http_code == "204" # No Content output = JSON.parse output if parse_json if block_given? yield output else output end rescue JSON::ParserError => e raise Error, "Failed to parse JSON response\n#{e.message}", e.backtrace end end sig { params( url: T.any(String, URI::Generic), additional_query_params: String, per_page: Integer, scopes: T::Array[String], _block: T.proc .params(result: T.untyped, page: Integer) .returns(T.untyped), ).void } def self.paginate_rest(url, additional_query_params: T.unsafe(nil), per_page: 100, scopes: [].freeze, &_block) (1..API_MAX_PAGES).each do |page| retry_count = 1 result = begin API.open_rest("#{url}?per_page=#{per_page}&page=#{page}&#{additional_query_params}", scopes:) rescue Error if retry_count < PAGINATE_RETRY_COUNT retry_count += 1 retry end raise end break if result.blank? yield(result, page) end end sig { params( query: String, variables: T::Hash[Symbol, T.untyped], scopes: T::Array[String], raise_errors: T::Boolean, ).returns(T.untyped) } def self.open_graphql(query, variables: {}, scopes: [].freeze, raise_errors: true) data = { query:, variables: } result = open_rest("#{API_URL}/graphql", scopes:, data:, request_method: :POST) if raise_errors raise Error, result["errors"].map { |e| e["message"] }.join("\n") if result["errors"].present? result["data"] else result end end sig { params( query: String, variables: T::Hash[Symbol, T.untyped], scopes: T::Array[String], raise_errors: T::Boolean, _block: T.proc.params(data: T::Hash[String, T.untyped]).returns(T.untyped), ).void } def self.paginate_graphql(query, variables: {}, scopes: [].freeze, raise_errors: true, &_block) result = API.open_graphql(query, variables:, scopes:, raise_errors:) has_next_page = T.let(true, T::Boolean) while has_next_page page_info = yield result has_next_page = page_info["hasNextPage"] if has_next_page variables[:after] = page_info["endCursor"] result = API.open_graphql(query, variables:, scopes:, raise_errors:) end end end sig { params( output: String, errors: String, http_code: String, headers: String, scopes: T::Array[String], ).void } def self.raise_error(output, errors, http_code, headers, scopes) json = begin JSON.parse(output) rescue nil end message = json&.[]("message") || "curl failed! #{errors}" meta = {} headers.lines.each do |l| key, _, value = l.delete(":").partition(" ") key = key.downcase.strip next if key.empty? meta[key] = value.strip end credentials_error_message(meta, scopes) case http_code when "401" raise AuthenticationFailedError.new(credentials_type, message) when "403" if meta.fetch("x-ratelimit-remaining", 1).to_i <= 0 reset = meta.fetch("x-ratelimit-reset").to_i raise RateLimitExceededError.new(reset, message) end raise AuthenticationFailedError.new(credentials_type, message) when "404" raise MissingAuthenticationError if credentials_type == :none && scopes.present? raise HTTPNotFoundError, message when "422" errors = json&.[]("errors") || [] raise ValidationFailedError.new(message, errors) else raise Error, message end end end end