|
| 1 | +#!/usr/bin/env python |
| 2 | +import sys |
| 3 | +import logging |
| 4 | +log = logging.getLogger('git_repo.gogs') |
| 5 | + |
| 6 | +from ..service import register_target, RepositoryService, os |
| 7 | +from ...exceptions import ResourceError, ResourceExistsError, ResourceNotFoundError |
| 8 | + |
| 9 | +import gogs_client |
| 10 | +import requests |
| 11 | +from urllib.parse import urlparse, urlunparse |
| 12 | +import functools |
| 13 | + |
| 14 | +from git import config as git_config |
| 15 | +from git.exc import GitCommandError |
| 16 | + |
| 17 | +@register_target('gg', 'gogs') |
| 18 | +class GogsService(RepositoryService): |
| 19 | + fqdn = 'try.gogs.io' |
| 20 | + #fqdn = 'http://127.0.0.1:3000' |
| 21 | + gg = None |
| 22 | + |
| 23 | + def __init__(self, *args, **kwargs): |
| 24 | + self.session = requests.Session() |
| 25 | + RepositoryService.__init__(self, *args, **kwargs) |
| 26 | + self.ensure_init() |
| 27 | + |
| 28 | + def ensure_init(self): |
| 29 | + if self.gg is not None: |
| 30 | + return |
| 31 | + self.url_base, self.fqdn = self._url_parse(self.fqdn) |
| 32 | + if 'insecure' not in self.config: |
| 33 | + self.insecure = self.fqdn != 'try.gogs.io' |
| 34 | + self.session.verify = not self.insecure |
| 35 | + if 'server-cert' in self.config: |
| 36 | + self.session.verify = self.config['server-cert'] |
| 37 | + self.default_private = self.config.get('default-private', 'false').lower() not in ('0','no','false') |
| 38 | + self.ssh_url = self.config.get('ssh-url', None) or self.fqdn |
| 39 | + if not self.repository: |
| 40 | + config = git_config.GitConfigParser(os.path.join(os.environ['HOME'], '.gitconfig'), True) |
| 41 | + else: |
| 42 | + config = self.repository.config_reader() |
| 43 | + proxies = {} |
| 44 | + for scheme in 'http https'.split(): |
| 45 | + proxy = config.get_value(scheme, 'proxy', '') |
| 46 | + if proxy: |
| 47 | + proxies[scheme] = proxy |
| 48 | + self.session.proxies.update(proxies) |
| 49 | + self.gg = gogs_client.GogsApi(self.url_base, self.session) |
| 50 | + #if ':' in self._privatekey: |
| 51 | + # self.auth = gogs_client.UsernamePassword(*self._privatekey.split(':',1)) |
| 52 | + #else: |
| 53 | + self.auth = gogs_client.Token(self._privatekey) |
| 54 | + |
| 55 | + @classmethod |
| 56 | + def _url_parse(cls, url): |
| 57 | + if '://' not in url: |
| 58 | + url = 'https://'+url |
| 59 | + parse = urlparse(url) |
| 60 | + url_base = urlunparse((parse.scheme, parse.netloc)+('',)*4) |
| 61 | + fqdn = parse.hostname |
| 62 | + return url_base, fqdn |
| 63 | + |
| 64 | + @property |
| 65 | + def url_ro(self): |
| 66 | + return self.url_base |
| 67 | + |
| 68 | + @property |
| 69 | + def url_rw(self): |
| 70 | + url = self.ssh_url |
| 71 | + if '@' in url: |
| 72 | + return url |
| 73 | + return '@'.join([self.git_user, url]) |
| 74 | + |
| 75 | + @classmethod |
| 76 | + def get_auth_token(cls, login, password, prompt=None): |
| 77 | + import platform |
| 78 | + name = 'git-repo2 token used on {}'.format(platform.node()) |
| 79 | + if '/' in login: |
| 80 | + url, login = login.rsplit('/', 1) |
| 81 | + else: |
| 82 | + url = input('URL [{}]> '.format(cls.fqdn)) or cls.fqdn |
| 83 | + url_base, fqdn = cls._url_parse(url) |
| 84 | + gg = gogs_client.GogsApi(url_base) |
| 85 | + auth = gogs_client.UsernamePassword(login, password) |
| 86 | + tokens = gg.get_tokens(auth, login) |
| 87 | + tokens = dict((token.name, token.token) for token in tokens) |
| 88 | + if name in tokens: |
| 89 | + return tokens[name] |
| 90 | + if 'git-repo2 token' in tokens: |
| 91 | + return tokens['git-repo2 token'] |
| 92 | + token = gg.create_token(auth, name, login) |
| 93 | + return token.token |
| 94 | + |
| 95 | + @property |
| 96 | + def user(self): |
| 97 | + return self.gg.authenticated_user(self.auth).username |
| 98 | + |
| 99 | + def orgs(self): |
| 100 | + orgs = self.gg._check_ok(self.gg._get('/user/orgs', auth=self.auth)).json() |
| 101 | + #return [gogs_client.GogsUser.from_json(org) for org in orgs] |
| 102 | + return [org['username'] for org in orgs] |
| 103 | + |
| 104 | + def connect(self): |
| 105 | + self.ensure_init() |
| 106 | + try: |
| 107 | + if self.insecure: |
| 108 | + try: |
| 109 | + try: |
| 110 | + urllib3 = requests.packages.urllib3 |
| 111 | + except Exception: |
| 112 | + import urllib3 |
| 113 | + urllib3.disable_warnings() |
| 114 | + except ImportError: |
| 115 | + pass |
| 116 | + self.username = self.user # Call to self.gg.authenticated_user() |
| 117 | + except requests.HTTPError as err: |
| 118 | + if err.response is not None and err.response.status_code == 401: |
| 119 | + if not self._privatekey: |
| 120 | + raise ConnectionError('Could not connect to GoGS. ' |
| 121 | + 'Please configure .gitconfig ' |
| 122 | + 'with your gogs private key.') from err |
| 123 | + else: |
| 124 | + raise ConnectionError('Could not connect to GoGS. ' |
| 125 | + 'Check your configuration and try again.') from err |
| 126 | + else: |
| 127 | + raise err |
| 128 | + |
| 129 | + def create(self, user, repo, add=False): |
| 130 | + try: |
| 131 | + if user == self.username: |
| 132 | + repository = self.gg.create_repo(self.auth, name=repo, private=self.default_private) |
| 133 | + elif user in self.orgs(): |
| 134 | + data = dict(name=repo, private=self.default_private) |
| 135 | + response = self.gg._post('/org/{}/repos'.format(user), auth=self.auth, data=data) |
| 136 | + repository = gogs_client.GogsRepo.from_json(self.gg._check_ok(response).json()) |
| 137 | + else: |
| 138 | + data = dict(name=repo, private=self.default_private) |
| 139 | + response = self.gg._post('/admin/users/{}/repos'.format(user), auth=self.auth, data=data) |
| 140 | + repository = gogs_client.GogsRepo.from_json(self.gg._check_ok(response).json()) |
| 141 | + except gogs_client.ApiFailure as err: |
| 142 | + if err.status_code == 422: |
| 143 | + raise ResourceExistsError("Project already exists.") from err |
| 144 | + else: |
| 145 | + raise ResourceError("Unhandled error.") from err |
| 146 | + except Exception as err: |
| 147 | + raise ResourceError("Unhandled exception: {}".format(err)) from err |
| 148 | + if add: |
| 149 | + self.add(user=self.username, repo=repo, tracking=self.name) |
| 150 | + |
| 151 | + def fork(self, user, repo): |
| 152 | + raise NotImplementedError |
| 153 | + |
| 154 | + def delete(self, repo, user=None): |
| 155 | + if not user: |
| 156 | + user = self.username |
| 157 | + try: |
| 158 | + self.gg.delete_repo(self.auth, user, repo) |
| 159 | + except gogs_client.ApiFailure as err: |
| 160 | + if err.status_code == 404: |
| 161 | + raise ResourceNotFoundError("Cannot delete: repository {}/{} does not exists.".format(user, repo)) from err |
| 162 | + elif err.status_code == 403: |
| 163 | + raise ResourcePermissionError("You don't have enough permissions for deleting the repository. Check the namespace or the private token's privileges") from err |
| 164 | + elif err.status_code == 422: |
| 165 | + raise ResourceNotFoundError("Cannot delete repository {}/{}: user {} does not exists.".format(user, repo, user)) from err |
| 166 | + raise ResourceError("Unhandled error: {}".format(err)) from err |
| 167 | + except Exception as err: |
| 168 | + raise ResourceError("Unhandled exception: {}".format(err)) from err |
| 169 | + |
| 170 | + def list(self, user, _long=False): |
| 171 | + import shutil, sys |
| 172 | + from datetime import datetime |
| 173 | + term_width = shutil.get_terminal_size((80, 20)).columns |
| 174 | + def col_print(lines, indent=0, pad=2): |
| 175 | + # prints a list of items in a fashion similar to the dir command |
| 176 | + # borrowed from https://gist.github.com/critiqjo/2ca84db26daaeb1715e1 |
| 177 | + n_lines = len(lines) |
| 178 | + if n_lines == 0: |
| 179 | + return |
| 180 | + col_width = max(len(line) for line in lines) |
| 181 | + n_cols = int((term_width + pad - indent)/(col_width + pad)) |
| 182 | + n_cols = min(n_lines, max(1, n_cols)) |
| 183 | + col_len = int(n_lines/n_cols) + (0 if n_lines % n_cols == 0 else 1) |
| 184 | + if (n_cols - 1) * col_len >= n_lines: |
| 185 | + n_cols -= 1 |
| 186 | + cols = [lines[i*col_len : i*col_len + col_len] for i in range(n_cols)] |
| 187 | + rows = list(zip(*cols)) |
| 188 | + rows_missed = zip(*[col[len(rows):] for col in cols[:-1]]) |
| 189 | + rows.extend(rows_missed) |
| 190 | + for row in rows: |
| 191 | + print(" "*indent + (" "*pad).join(line.ljust(col_width) for line in row)) |
| 192 | + |
| 193 | + r = self.gg._get('/user/repos', auth=self.auth) |
| 194 | + repositories = self.gg._check_ok(r).json() |
| 195 | + repositories = [repo for repo in repositories if repo['owner']['username'] == user] |
| 196 | + if user != self.username and not repositories and user not in self.orgs: |
| 197 | + raise ResourceNotFoundError("Unable to list namespace {} - only authenticated user and orgs available for listing.".format(user)) |
| 198 | + if not _long: |
| 199 | + col_print([repo['full_name'] for repo in repositories]) |
| 200 | + else: |
| 201 | + print('Status\tCommits\tReqs\tIssues\tForks\tCoders\tWatch\tLikes\tLang\tModif\t\t\t\tName', file=sys.stderr) |
| 202 | + for repo in repositories: |
| 203 | + status = ''.join([ |
| 204 | + 'F' if repo['fork'] else ' ', # is a fork? |
| 205 | + 'P' if repo['private'] else ' ', # is private? |
| 206 | + ]) |
| 207 | + try: |
| 208 | + issues = self.gg._check_ok(self.gg._get('/repos/{}/issues'.format(repo['full_name']), auth=self.auth)).json() |
| 209 | + except Exception: |
| 210 | + issues = [] |
| 211 | + print('\t'.join([ |
| 212 | + # status |
| 213 | + status, |
| 214 | + # stats |
| 215 | + str(len(list(()))), # number of commits |
| 216 | + str(len(list(()))), # number of pulls |
| 217 | + str(len(list(issues))), # number of issues |
| 218 | + str(repo.get('forks_count') or 0), # number of forks |
| 219 | + str(len(list(()))), # number of contributors |
| 220 | + str(repo.get('watchers_count') or 0), # number of subscribers |
| 221 | + str(repo.get('stars_count') or 0), # number of ♥ |
| 222 | + # info |
| 223 | + repo.get('language') or '?', # language |
| 224 | + repo['updated_at'], # date |
| 225 | + repo['full_name'], # name |
| 226 | + ])) |
| 227 | + |
| 228 | + def get_repository(self, user, repo): |
| 229 | + try: |
| 230 | + return self.gg.get_repo(self.auth, user, repo) |
| 231 | + except gogs_client.ApiFailure as err: |
| 232 | + if err.status_code == 404: |
| 233 | + raise ResourceNotFoundError("Cannot get: repository {}/{} does not exists.".format(user, repo)) from err |
| 234 | + raise ResourceError("Unhandled error: {}".format(err)) from err |
| 235 | + except Exception as err: |
| 236 | + raise ResourceError("Unhandled exception: {}".format(err)) from err |
| 237 | + |
| 238 | + def gist_list(self, gist=None): |
| 239 | + raise NotImplementedError |
| 240 | + |
| 241 | + def gist_fetch(self, gist, fname=None): |
| 242 | + raise NotImplementedError |
| 243 | + |
| 244 | + def gist_clone(self, gist): |
| 245 | + raise NotImplementedError |
| 246 | + |
| 247 | + def gist_create(self, gist_pathes, description, secret=False): |
| 248 | + raise NotImplementedError |
| 249 | + |
| 250 | + def gist_delete(self, gist_id): |
| 251 | + raise NotImplementedError |
| 252 | + |
| 253 | + def request_create(self, user, repo, local_branch, remote_branch, title, description=None): |
| 254 | + raise NotImplementedError |
| 255 | + |
| 256 | + def request_list(self, user, repo): |
| 257 | + raise NotImplementedError |
| 258 | + |
| 259 | + def request_fetch(self, user, repo, request, pull=False): |
| 260 | + raise NotImplementedError |
0 commit comments