#!/usr/bin/python2 # -*- coding: utf-8 -*- vim: set fileencoding=utf-8 : """ A collection of utility commands for Pencil Code coders working with git: - git pc checkout - git pc tag-wip - git pc panic - git pc ff-update - git pc reverse-merge - git pc reverse-pull - git-pc-update-and-push Use git pc -h to get an overview. """ # To do: # - Add 'git tag-wip --cleanup' flag # - Support the compact syntax '--log-level=N' # - Test whether things still work with Lucid # - Better integrate the Lucid tests (e.g. commit my proboscis-dummy module and # always load it when 'import proboscis' fails) # - Could this work on Hardy? # - Can this easily be made to work with Python3? import subprocess import sys import time from abc import ABCMeta, abstractmethod from distutils import spawn from optparse import OptionParser if not (sys.hexversion >= 0x02060000): sys.exit('Python 2.6 or later is required') # Backport Python 2.7 subprocess commands if necessary: try: from subprocess import CalledProcessError except ImportError: # Use definition from Python 2.7 subprocess module: class CalledProcessError(Exception): def __init__(self, returncode, cmd, output=None): self.returncode = returncode self.cmd = cmd self.output = output def __str__(self): return "Command '%s' returned non-zero exit status %d" \ % (self.cmd, self.returncode) usage_text = '\n'.join([ 'Usage:', ' git pc [options] [command_options] [arguments]', 'where is one of', '%s', 'Options:', ' --log-level N -- set the log level to N, an integer out of', ' 0: don\'t print anything unnecessary,', ' 1: print output from git commands,', ' 2: also print the git commands before executing.', '', 'Run', ' git pc --help', 'to get help for a specific git-pc command', ]) # Log level: # 0: be quiet if operations succeed, but print all output from failed git # commands # 1: print output from all git commands # 2: print the git commands BE_QUIET = 0 PRINT_OUTPUT = 1 PRINT_GIT_COMMANDS = 2 log_level = BE_QUIET def main(): args = sys.argv[1:] if len(args) < 1 or args[0] == '-h' or args[0] == '--help': abort_with_usage(0) global log_level if args[0] == '--log-level': args.pop(0) log_level = int(args.pop(0)) (cmd, cmd_args) = (args[0], args[1:]) for command in create_command_list(): if cmd == command.name: command.execute(cmd_args) sys.exit(0) print 'Unknown subcommand \'%s\'. Run\n %s' % (cmd, 'git pc -h', ) print 'to get a list of available subcommands' sys.exit(1) def abort_with_usage(exit_status): global usage_text cmd_overview = '' for cmd in create_command_list(): cmd_overview += ' %-15s -- %-10s\n' % (cmd.name, cmd.overview) print usage_text % (cmd_overview, ) sys.exit(exit_status) def create_command_list(): return [ CheckoutCommand(), TagWipCommand(), PanicCommand(), FfUpdateCommand(), ReverseMergeCommand(), ReversePullCommand(), UpdateAndPushCommand(), ] def create_tag_name(prefix): """Create a tag name including a time stamp.""" tag = '%s-%s' % ( prefix, time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime()) ) return tag def git_head(): """Return the SHA1 of the current HEAD""" return git_hash('HEAD') def git_branch_name(): """Return the name of the current branch, or None (if detached)""" try: long_name = git_output(['symbolic-ref', '-q', 'HEAD']) return long_name.replace('refs/heads/', '') except CalledProcessError, e: if e.returncode == 1: # detached HEAD return None else: raise Exception(e) def git_branch_exists(branch_name): return git( ['rev-parse', '--verify', branch_name], catch_errors=True ) def git_commits_equal(c1, c2): """Do the two commit-ishs represent the same commit?""" return git_hash(c1) == git_hash(c2) def git_hash(commit): """Return the SHA1 of the current HEAD""" return git_output(['rev-parse', '--verify', '%s^{commit}' % (commit, )]) def git(cmd_list, catch_errors=False): """Run git with the given commands. E.g. git(['checkout', 'master']) Prints all output from the command to the terminal and aborts if the git command returned a non-zero exit status. Arguments: cmd_line -- the git subcommand to run catch_errors -- if True, catch process errors Return: True -- if command execution was successful False -- if catch_errors is set and an error occurred """ cmd_line = ['git'] cmd_line.extend(cmd_list) return run_system_cmd(cmd_line, catch_errors) def git_output(cmd_list): """Run git with the given commands; return the first line of output. E.g. head = git_output(['rev-parse', 'HEAD']) """ outputs = git_outputs(cmd_list) if outputs: return outputs[0] else: return None def git_outputs(cmd_list): """Run git with the given commands; return the output as array of lines. E.g. status_lines = git_output(['status', '--short']) """ cmd_line = ['git'] cmd_line.extend(cmd_list) return run_system_cmd_get_output(cmd_line) def run_system_cmd(cmd_line, catch_errors=False): """Run a system command, writing output to the terminal. Arguments: cmd_line -- the command to run catch_errors -- catch any process errors Return: True -- if command execution was successful False -- if catch_errors is set and an error occurred """ retcode, std_output = _run(cmd_line) if retcode: if catch_errors: return False raise CalledProcessError(retcode, cmd_line) else: return True def run_system_cmd_get_output(cmd_line): """Run a system command and return output as array of lines""" retcode, std_output = _run(cmd_line) if retcode: raise CalledProcessError(retcode, cmd_line) else: return std_output def _run(cmd_line): if log_level >= PRINT_GIT_COMMANDS: print ' '.join(cmd_line) process = subprocess.Popen( cmd_line, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) # FIXME: for huge outpout, this will hang std_output, err_output = process.communicate() std_output = std_output.splitlines() err_output = err_output.splitlines() retcode = process.poll() if retcode or log_level >= PRINT_OUTPUT: log_output('OUT:', std_output) log_output('ERR:', err_output) return retcode, std_output def log_output(prefix, lines): for line in lines: print prefix, line class Command(object): """A subcommand of 'git pc'""" __metaclass__ = ABCMeta def __init__(self, name, overview, usage, flags=()): """Describe the subcommand and define its interface. Options: name -- name for calling the subcommand overview -- overview line for 'git pc -h' usage -- the usage text flags -- a list of OptionFlag objects defining the options of the subcommand args -- the raw arguments given on the command line """ self.name = name self.overview = overview self.parser = OptionParser(usage=usage) for flag in flags: flag.add_to(self.parser) def execute(self, args): (options, args) = self.parser.parse_args(args) self.run(options, args) @abstractmethod def run(self, options, args): pass def usage(self, exit_status): self.parser.print_help() sys.exit(exit_status) class CheckoutCommand(Command): def __init__(self): Command.__init__( self, name='checkout', overview='clone the pencil repo from github', usage='\n' + ' git pc checkout \n' + 'Check out the Pencil Code repository from github.\n' + '\n' + 'Arguments:\n' + ' -- check out to that directory [default: ./pencil-code]' ) def run(self, options, args): git_cmd = ['clone', 'https://github.com/pencil-code/pencil-code'] if len(args) == 0: pass elif len(args) == 1: checkout_dir = args[0] git_cmd.append(checkout_dir) else: self.usage(1) git(git_cmd) class TagWipCommand(Command): def __init__(self): Command.__init__( self, name='tag-wip', overview='create a tag containing all of your uncommitted changes', usage='\n' + ' git pc tag-wip\n' + 'Create a tag representing all of your work-in-progress (WIP).' ) def run(self, options, args): if len(args) > 0: self.usage(1) tag = create_tag_name('WIP') unrecorded = UnrecordedChanges(reset=False) if unrecorded.empty: print 'Tagging HEAD with %s' % (tag, ) git(['tag', tag, 'HEAD']) else: print 'Tagging temporary commit with %s' % (tag, ) git(['tag', tag, unrecorded.tip]) unrecorded.restore() class PanicCommand(Command): def __init__(self): Command.__init__( self, name='panic', overview='show all changes and files that were ever known to git', usage='\n' + ' git pc panic [--full] [--visual|--curses|--graph|--log]\n' + 'show all changes and files that were ever known to git.', flags=[ OptionFlag( '-f', '--full', 'Try as hard as you can', action='store_true', dest='full', default=False ), OptionFlag( '-v', '--visual', 'Use \'gitk\' to display', action='store_true', dest='visual', default=False ), OptionFlag( '-c', '--curses', 'Use \'tig\' to display', action='store_true', dest='curses', default=False ), OptionFlag( '-g', '--graph', 'Use \'git log --graph\' to display', action='store_true', dest='graph', default=False ), OptionFlag( '-l', '--log', 'Use plain \'git log\' do display', action='store_true', dest='log', default=False ), ] ) self.interpreters = { 'visual': ['gitk'], 'curses': ['tig'], 'graph': [ 'git', 'log', '--graph', '--pretty=format:%h %aN%d %s' ], 'log': ['git', 'log'], } def run(self, options, args): if len(args) > 0: self.usage(1) style = 'visual' # default fallback = 'log' if options.curses: style = 'curses' if options.graph: style = 'graph' elif options.log: style = 'log' cmd = self.interpreters[style] if not spawn.find_executable(cmd[0]): fallback_cmd = self.interpreters[fallback] print '\'%s\' Command not found, falling back on \'%s\'' \ % (cmd[0], ' '.join(fallback_cmd)) cmd = fallback_cmd cmd.extend(self._get_common_args(options.full)) run_system_cmd(cmd) def _get_common_args(self, include_dangling): """Return a list of arguments. These arguments make the 'git log' family of commands show practically every kown commit. """ args = ['--reflog', '--all'] stashes = [ line.split(':')[0] for line in git_outputs(['stash', 'list']) ] args.extend(stashes) if include_dangling: print 'Looking for dangling commits. This may take a while...' args.extend(self._dangling_commits()) return args def _dangling_commits(self): """Return a list of dangling commits. This includes in particular dropped stashes. Note that 'git fsck --dangling' would do, but git 1.7 doesn't know that option yet. """ return [ line.split()[2] for line in git_outputs([ 'fsck', '--unreachable', '--no-reflog' ]) if 'dangling commit' in line ] class FfUpdateCommand(Command): """Fast-forward update a branch without checking it out. Adopted and simplified from http://stackoverflow.com/questions/4156957/ /merging-branches-without-checkout/4157435#4157435 """ def __init__(self): Command.__init__( self, name='ff-update', overview='update a branch without checking it out', usage='\n' + ' git pc ff-update []\n' + 'Update from upstream without checking it out,\n' + 'provided that this update is a fast-forward.\n' + 'If it is not, cry foul.\n' + '\n' + 'Arguments:\n' + ' -- the branch to update\n' + ' -- the commit to update branch to (defaults' + ' to branch@{u})' ) def run(self, options, args): if len(args) == 1: branch = args[0] upstream = '%s@{u}' % (branch, ) elif len(args) == 2: branch, upstream = args else: self.usage(1) branch, upstream = None # make syntax-check happy FfUpdateCommand._merge_ff(branch, upstream) @staticmethod def _merge_ff(branch, upstream): if git_commits_equal(branch, upstream): print 'Branch %s is already up to date' % (branch, ) return if git_commits_equal('HEAD', branch): # Update checked-out branch git(['merge', '--ff-only', upstream]) return merge_base = git_output(['merge-base', branch, upstream]) if git_commits_equal(merge_base, upstream): print 'Branch %s is ahead of upstream' % (branch, ) return is_ff = git_commits_equal(merge_base, branch) if is_ff: reason = 'Fast-forward merge %s into %s' % (upstream, branch, ) ref = 'refs/heads/%s' % (branch, ) git(['update-ref', '-m', reason, ref, upstream]) else: print 'Not a fast-forward: Merging %s into %s' \ % (upstream, branch, ) sys.exit(1) class ReverseMergeCommand(Command): def __init__(self): Command.__init__( self, name='reverse-merge', overview='merge the current into the given branch.', usage='\n' + ' git pc reverse-merge [--autostash] \n' + 'Merge the current branch into the given branch ' + '(default: upstream).\n' + 'This is most useful when trying to integrate upstream changes ' + 'with local\n' + '(committed) changes:\n' + ' git commit -m \'Last commit, will push now\'\n' + ' git push\n' + ' # [fails: upstream changes]\n' + ' git fetch\n' + ' git pc reverse-merge\n' + ' # [test before pushing ...]\n' + ' git push\n' + '\n' + 'Arguments:\n' + ' -- the branch to merge into (defaults to @{u}, ' + 'i.e. the current\n' + ' upstream branch)', flags=[ OptionFlag( '', '--autostash', 'Stash away any unrecorded changes before the operation' + ' begins and restore them\n' + 'after it ends, so you can run reverse-merge on a dirty ' + ' worktree.\n' + 'However, the final restoration step may result in' + ' non-trivial conflicts.', action='store_true', dest='autostash', default=False, status='experimental' ), ] ) def run(self, options, args): work_branch = git_branch_name() if len(args) == 0: other_branch = '%s@{u}' % (work_branch, ) elif len(args) == 1: other_branch = args[0] else: self.usage(1) other_branch = None # make syntax-check happy unrecorded = None # make syntax-check happy if options.autostash: unrecorded = UnrecordedChanges(reset=True) temp_tag = create_tag_name('before-reverse-merge') git(['tag', temp_tag]) temp_branch = 'reverse-merge-work-%s' % (work_branch, ) if git_branch_exists(temp_branch): git(['branch', '-D', 'temp_branch']) git(['checkout', '-b', temp_branch, other_branch]) message = 'Merge branch \'%s\' into %s using \'%s\'' \ % (work_branch, other_branch, 'git pc reverse-merge') git(['merge', work_branch, '-m', message]) # Now what happens in case of a conflict?? merge_commit = git_head() git(['checkout', work_branch]) git(['reset', '--hard', merge_commit]) if options.autostash: unrecorded.restore() git(['branch', '-D', temp_branch]) git(['tag', '-d', temp_tag]) class ReversePullCommand(Command): def __init__(self): Command.__init__( self, name='reverse-pull', overview='fetch changes and merge current into remote branch', usage='\n' + ' git pc reverse-pull [--autostash] []\n' + 'Fetch upstream changes and merge the current branch into' + ' the remote branch.\n' + 'This is most useful when trying to integrate upstream changes ' + 'with local\n' + '(committed) changes:\n' + ' git commit -m \'Last commit, will push now\'\n' + ' git push\n' + ' # [fails: upstream changes]\n' + ' git pc reverse-pull\n' + ' # [test before pushing ...]\n' + ' git push\n' + '\n' + 'Arguments:\n' + ' -- the repository to fetch from', flags=[ OptionFlag( '', '--autostash', 'Stash away any unrecorded changes before the operation' + ' begins and restore them\n' + 'after it ends, so you can run reverse-pull on a dirty ' + ' worktree.\n' + 'However, the final restoration step may result in' + ' non-trivial conflicts.', action='store_true', dest='autostash', default=False, status='experimental' ), ] ) def run(self, options, args): if len(args) == 0: git(['fetch']) elif len(args) == 1: repo = args[0] git(['fetch', repo]) else: self.usage(1) ReverseMergeCommand().run(options, []) class UpdateAndPushCommand(Command): def __init__(self): Command.__init__( self, name='update-and-push', overview='push commits to upstream on top of' + ' yet unpulled changes', usage='\n' + ' git pc update-and-push \n' + 'Push commits on top of yet unpulled changes,' + ' even from a dirty index and\n' + 'file-tree. This is achieved by\n' + '1. stashing away unrecorded changes,\n' + '2. running \'pull --rebase\',\n' + '3. pushing,\n' + '4. restoring the stashed changes.\n' + '\n' + 'Note that starting wcith git 2.6, you can achieve the' + ' same result by setting\n' + 'the configuration variable rebase.autostash=true and running' + ' \'git pull --rebase;\n' + 'git push\'.\n' + '\n' + 'Arguments:\n' + ' -- where to push to. Default: the upstream' + ' branch associated\n' + ' with the current branch' ) def run(self, options, args): if len(args) == 0: repository = None elif len(args) == 1: repository = args[0] else: self.usage(1) repository = None # make syntax-check happy unrecorded = UnrecordedChanges(reset=True) if repository: git(['pull', '--rebase', repository]) else: git(['pull', '--rebase']) git(['push']) unrecorded.restore() class OptionFlag(object): """Specification for a command-line option""" def __init__( self, short, long, help, action=None, dest=None, default=None, status=None ): self.short = short self.long = long self.help = help self.action = action self.dest = dest self.default = default self.status = status if self.status: if self.status == 'experimental': self.help = '[Experimental] ' + self.help else: raise Exception('Unknown status %s' % (str(status), )) def add_to(self, parser): """Add this option to an optparse.OptionParser""" parser.add_option( self.short, self.long, action=self.action, dest=self.dest, default=self.default, help=self.help ) class UnrecordedChanges(object): """Record and remove unrecorded changes that we want to restore later. Conceptually, this is similar to a stash, but our implementation does not use 'git stash --include-untracked', because the untracked files would end up in separate commit that is not returned by the stash command. Normally, creating an UnrecordedChanges object removes the unrecorded changes from the file tree, and the index. It creates a detached commit (which is not stored anywhere in the ref namespace). """ def __init__(self, reset=False): """Store away any unrecorded changes. Parameters ---------- reset : bool If True, reset head, index and file system to the last head commit, i.e. all changes are discarded, but stored as detached commits. If False, head, index and file system remain untouched. """ self.reset = reset if git_outputs(['status', '--porcelain']): self.empty = False else: self.empty = True head = git_head() # Commit changes in the index if self._index_dirty(): self._commit('Changes already in the index') self.index = git_head() # Add changes to tracked files if self._untracked_changes(): git(['add', '--update']) self._commit('Changes to tracked files') self.tracked = git_head() # Add untracked files if self._untracked_files(): git(['add', '--all']) # Untracked files may be huge, so add a warning to the commit # message: self._commit('Untracked files [DON\'T PUSH!]') self.untracked = git_head() self.tip = self.untracked self.tag = create_tag_name('unrecorded-changes') git(['tag', '-f', self.tag]) if self.reset: # Reset to HEAD git(['reset', '--hard', head]) else: git(['reset', '--mixed', self.index]) git(['reset', '--soft', head]) def _commit(self, message): """Commit the current state of the index using the given message. The tricky part here is to circumvent all checks and hooks. '--no-verify' is not enough, as it still runs the prepare-commit-msg hook, so we use the core.hooksPath configuration variable to make sure Git sees no hooks at all. """ git([ '-c', 'core.hooksPath=/dev/null', 'commit', '--allow-empty', '--message', message ]) def _index_dirty(self): return not git(['diff', '--cached', '--quiet'], catch_errors=True) def _untracked_changes(self): return not git(['diff', '--quiet'], catch_errors=True) def _untracked_files(self): git_cmd = [ 'ls-files', '--other', '--exclude-standard', '--directory' ] if git_output(git_cmd): return True else: return False def restore(self): """Restore any changes that were stored away with the UnrecordedChanges() constructor, and remove the tag we set there. """ if self.reset: new_head = git_head() git(['cherry-pick', '-x', self.index]) new_index = git_head() git(['cherry-pick', '-x', self.tracked]) git(['cherry-pick', '-x', self.untracked]) git(['reset', '--mixed', new_index]) git(['reset', '--soft', new_head]) else: # Nothing to restore pass git(['tag', '-d', self.tag]) if __name__ == '__main__': main()