forked from atseanpaul/review-o-matic
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtrollreviewergit.py
151 lines (127 loc) · 4.97 KB
/
trollreviewergit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import sys
import urllib
import urllib.parse

import requests

from trollreview import ReviewType
from trollreviewer import ChangeReviewer
class GitChangeReviewer(ChangeReviewer):
    """Reviewer for changes that reference a commit in an upstream git tree.

    Resolves the upstream commit named by a change, records review comments
    about missing/invalid hashes, "Fixes:" references and diffs against
    upstream, and tries to attach a web link to the upstream commit.

    Attributes such as ``reviewer``, ``strings``, ``review_result``,
    ``gerrit_patch``, ``upstream_patch`` and ``diff`` are not defined in this
    class; they are presumably provided by ``ChangeReviewer`` (confirm there).
    """

    # Remote and branch used when a cherry-pick reference names neither.
    DEFAULT_REMOTE = (
        'git://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git')
    DEFAULT_BRANCH = 'master'

    # Seconds to wait for the upstream web viewer before giving up on the
    # link. Without a timeout requests.get() may block indefinitely.
    WEB_LINK_TIMEOUT = 30

    def __init__(self, reviewer, change, dry_run):
        """Initialise the base reviewer and clear the resolved upstream sha."""
        super().__init__(reviewer, change, dry_run)
        # Dict describing the upstream commit once get_upstream_patch() has
        # resolved it; read with keys 'sha', 'remote', 'branch' and
        # 'remote_name' elsewhere in this class.
        self.upstream_sha = None

    @staticmethod
    def can_review_change(change, days_since_last_review):
        """Not implemented at this level; always raises NotImplementedError."""
        raise NotImplementedError()

    def get_cgit_web_link_path(self):
        """Return the cgit-style path/query suffix for the upstream commit."""
        return '/commit/?head={}&id={}'.format(self.upstream_sha['branch'],
                                               self.upstream_sha['sha'])

    def get_upstream_web_link(self):
        """Attach a browsable link for the upstream commit to the review.

        The link is derived from the upstream remote's host. It is only added
        to ``self.review_result`` when fetching it returns HTTP 200. Unknown
        hosts, network failures and non-200 responses are reported on stderr
        and otherwise ignored, since the link is a nicety and not required
        for the review itself.
        """
        remote = self.upstream_sha['remote']
        parsed = urllib.parse.urlparse(remote)
        host = parsed.netloc
        commit_suffix = '/commit/{}'.format(self.upstream_sha['sha'])

        if host == 'git.kernel.org':
            link = 'https://' + host + parsed.path
            link += self.get_cgit_web_link_path()
        elif 'github.com' in host:
            link = 'https://' + host + parsed.path + commit_suffix
        elif 'anongit' in host:
            # The web viewer lives on the matching "cgit" host.
            link = 'https://' + host.replace('anongit', 'cgit') + parsed.path
            link += self.get_cgit_web_link_path()
        elif 'git.infradead.org' in host:
            # This host is linked over plain http rather than https.
            link = 'http://' + host + parsed.path + commit_suffix
        elif 'linuxtv.org' in host:
            link = 'https://' + 'git.linuxtv.org' + parsed.path
            link += self.get_cgit_web_link_path()
        else:
            sys.stderr.write(
                'ERROR: Could not parse web link for {}\n'.format(remote))
            return

        try:
            response = requests.get(link, timeout=self.WEB_LINK_TIMEOUT)
        except requests.RequestException as err:
            # Connection errors and timeouts must not abort the review.
            sys.stderr.write(
                'ERROR: Could not fetch {}: {}\n'.format(link, err))
            return

        if response.status_code == 200:
            self.review_result.add_web_link(link)
        else:
            sys.stderr.write(
                'ERROR: Got {} status for {}\n'.format(response.status_code,
                                                       link))

    def add_missing_hash_review(self):
        """Record a -1 review saying no upstream hash was found."""
        msg = self.strings.MISSING_HASH_HEADER
        msg += self.strings.HASH_EXAMPLE
        msg += self.strings.MISSING_HASH_FOOTER
        self.review_result.add_review(ReviewType.MISSING_HASH, msg, vote=-1,
                                      notify=True)

    def add_invalid_hash_review(self, hashes):
        """Record a -1 review listing every hash that could not be resolved.

        Args:
            hashes: iterable of dicts read with keys 'sha', 'remote' and
                'branch'.
        """
        msg = self.strings.INVALID_HASH_HEADER
        for h in hashes:
            remote_str = h['remote']
            if h['branch']:
                remote_str += ' branch {}'.format(h['branch'])
            msg += self.strings.INVALID_HASH_LINE.format(h['sha'], remote_str)
        msg += self.strings.INVALID_HASH_FOOTER
        msg += self.strings.HASH_EXAMPLE
        self.review_result.add_review(ReviewType.INVALID_HASH, msg, vote=-1,
                                      notify=True)

    def add_fixes_ref_review(self, fixes_ref):
        """Record a review listing upstream commits that reference this one.

        Args:
            fixes_ref: text with one reference per line.
        """
        msg = self.strings.FOUND_FIXES_REF_HEADER
        msg += ''.join(self.strings.FIXES_REF_LINE.format(line)
                       for line in fixes_ref.splitlines())
        msg += self.strings.FIXES_REF_FOOTER
        self.review_result.add_review(ReviewType.FIXES_REF, msg, notify=True)

    def add_altered_upstream_review(self):
        """Record a -1 review: the change differs from the upstream commit."""
        msg = self.strings.DIFFERS_HEADER
        msg += self.strings.ALTERED_UPSTREAM
        msg += self.format_diff()
        self.review_result.add_review(ReviewType.ALTERED_UPSTREAM,
                                      msg, vote=-1, notify=True)

    def add_backport_diff_review(self):
        """Record a review (no vote) showing how a backport differs."""
        msg = self.strings.DIFFERS_HEADER
        msg += self.strings.BACKPORT_DIFF
        msg += self.format_diff()
        self.review_result.add_review(ReviewType.BACKPORT, msg)

    def get_upstream_patch(self):
        """Resolve the upstream commit referenced by the change.

        Fills in missing remote/branch values with the class defaults, fetches
        each referenced remote, and stores the commit and its descriptor in
        ``self.upstream_patch`` / ``self.upstream_sha``. Adds a missing-hash
        review when the change references no sha, and an invalid-hash review
        when no upstream patch ends up being set.
        """
        upstream_shas = self.reviewer.get_cherry_pick_shas_from_patch(
            self.gerrit_patch)
        if not upstream_shas:
            self.add_missing_hash_review()
            return

        # NOTE(review): the loop does not stop at the first match, so every
        # candidate is fetched and the last candidate visited that is present
        # in its branch wins. Confirm whether a `break` was intended.
        for s in reversed(upstream_shas):
            if not s['remote']:
                s['remote'] = self.DEFAULT_REMOTE
            if not s['branch']:
                s['branch'] = self.DEFAULT_BRANCH
            s['remote_name'] = self.reviewer.generate_remote_name(s['remote'])

            self.reviewer.fetch_remote(s['remote_name'], s['remote'],
                                       s['branch'])

            if not self.reviewer.is_sha_in_branch(s['sha'], s['remote_name'],
                                                  s['branch']):
                continue

            self.upstream_patch = self.reviewer.get_commit_from_sha(s['sha'])
            self.upstream_sha = s

        if not self.upstream_patch:
            self.add_invalid_hash_review(upstream_shas)
            return

    def get_patches(self):
        """Load patches via the base class, then look for "Fixes:" references.

        When an upstream patch was resolved, asks the reviewer for commits
        that reference it and records a review if any are found.
        """
        super().get_patches()

        if self.upstream_patch and self.upstream_sha:
            fixes_ref = self.reviewer.find_fixes_reference(
                self.upstream_sha['sha'],
                self.upstream_sha['remote_name'],
                self.upstream_sha['branch'])
            if fixes_ref:
                self.add_fixes_ref_review(fixes_ref)

    def compare_patches_backport(self):
        """Review a backport: clean when the diff is empty, else show it."""
        if len(self.diff) == 0:
            self.add_clean_backport_review()
        else:
            self.add_backport_diff_review()

    def compare_patches_clean(self):
        """Review a clean pick: any diff against upstream is flagged."""
        if len(self.diff):
            self.add_altered_upstream_review()
        else:
            self.add_successful_review()