-
Notifications
You must be signed in to change notification settings - Fork 0
/
search.py
283 lines (269 loc) · 15.4 KB
/
search.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
import os, sys, argparse
from custom_functions import *
from config import *
from colorama import Fore, init
# Initialize colorama. autoreset=True makes every print() end with a style
# reset, so colours never leak into the next line; strip=False and
# convert=False ask colorama to pass ANSI sequences through unchanged.
init(strip=False, convert=False, autoreset=True)
def found_smth() -> None:
    """Flag that the running search has produced at least one hit.

    Sets the module-level ``found`` variable to ``True``.
    """
    global found
    found = True
def search_dir(directory, term, case_sensitive):
    """Walk ``directory`` recursively and search every file for ``term``.

    A file is skipped when any component of its path (relative to the current
    working directory) appears in the ``default.ignore_dirs`` config entry.
    Files for which ``is_binary`` returns false are handed to
    ``search_in_file``; the remaining files can only match by path, and only
    when ``args.include_filename`` is set.

    Args:
        directory: Root directory passed to ``os.walk``.
        term: Text to look for.
        case_sensitive: When true the path match is exact; otherwise both the
            term and the path are lower-cased before comparing.
    """
    # Neither the ignore list nor the working directory changes during a
    # walk, so look them up once instead of once per file.
    ignored_dirs = config("read", "default.ignore_dirs")
    cwd = os.getcwd()
    skipping = False  # True while a run of consecutive files is being ignored

    for root, _, files in os.walk(directory):
        for file in files:
            file_path = os.path.join(root, file)
            clear_line()
            relative_file_path = os.path.relpath(file_path, start=cwd)
            path_parts = relative_file_path.split(os.path.sep)

            if any(ignored in path_parts for ignored in ignored_dirs):
                # Reset the status line only for the first file of the run.
                if not skipping:
                    clear_line()
                    print("\r", end="")
                    skipping = True
                continue
            skipping = False

            if args.verbose:
                clear_line()
                print(f"\r{relative_file_path}", end="")

            if not is_binary(file_path):
                search_in_file(file_path, term, case_sensitive)
                continue
            if not args.include_filename:
                continue

            # The original `(case_sensitive and ...) or lower-case test`
            # always fell through to the case-insensitive comparison; pick
            # exactly one comparison based on the flag.
            if case_sensitive:
                name_matches = term in relative_file_path
            else:
                name_matches = term.lower() in relative_file_path.lower()
            if name_matches:
                clear_line("-", "\n")
                print(f"\r{Fore.WHITE}Found {Fore.YELLOW}{term} {Fore.WHITE}in {Fore.GREEN}{relative_file_path}")
                found_smth()
def search_in_cwd(term, case_sensitive):
    """Search only the entries directly inside the current working directory.

    Regular files for which ``is_binary`` returns false are handed to
    ``search_in_file``.  Every other entry (including sub-directories) can
    only match by name, and only when ``args.include_filename`` is set.

    Args:
        term: Text to look for.
        case_sensitive: When true the name match is exact; otherwise both the
            term and the entry name are lower-cased before comparing, the
            same rule ``search_dir`` applies.
    """
    cwd = os.getcwd()
    for item in os.listdir(cwd):
        if args.verbose:
            clear_line()
            print(f"\r{item}", end="")

        if os.path.isfile(item) and not is_binary(item):
            search_in_file(os.path.join(cwd, item), term, case_sensitive)
            continue
        if not args.include_filename:
            continue

        # Previously this was always an exact-case test, so the
        # case_sensitive argument had no effect on name matches here.
        if case_sensitive:
            name_matches = term in item
        else:
            name_matches = term.lower() in item.lower()
        if name_matches:
            clear_line("-", "\n")
            print(f"\r{Fore.WHITE}Found {Fore.YELLOW}{term} {Fore.WHITE}in {Fore.GREEN}{item}")
            found_smth()
def search_in_file(file_path, term, case_sensitive):
    """Report matches of ``term`` in one file's path and/or contents.

    The file is ignored unless ``args.file_name`` is ``"*"`` or equals the
    file's base name.  When ``args.include_filename`` is set, the path
    relative to the current working directory is tested for the term.  When
    ``args.search_content`` is set, every matching line is printed with
    ``args.context`` lines of context on each side; each line is printed at
    most once and matching lines are marked with ``>``.

    Args:
        file_path: Path of the file to inspect.
        term: Text to look for.
        case_sensitive: When true, matching is exact; otherwise the term and
            the candidate text are lower-cased before comparing.
    """
    # Honour the --file-name filter ("*" means every file).
    if args.file_name != "*" and os.path.basename(file_path) != args.file_name:
        return

    needle = term if case_sensitive else term.lower()

    def contains_term(text):
        """Return True when ``text`` contains the term under the active case rule."""
        return needle in (text if case_sensitive else text.lower())

    relative_path = os.path.relpath(file_path, start=os.getcwd())
    found_banner = f"\r{Fore.WHITE}Found {Fore.YELLOW}{term} {Fore.WHITE}in {Fore.GREEN}{relative_path}"

    # Match against the relative path that is displayed, not the absolute
    # path: otherwise a term that occurs in a parent directory of the working
    # directory would "match" every single file.
    if args.include_filename and contains_term(relative_path):
        clear_line("-", "\n")
        print(found_banner)
        found_smth()

    if not args.search_content:
        return

    with open(file_path, 'r', errors='ignore') as f:
        lines = f.readlines()

    context = max(0, args.context)  # a negative context would hide the hit itself
    banner_printed = False
    printed_line_numbers = set()  # 1-based numbers of lines already shown
    hit_colours = plain_colours = None

    for line_number, line in enumerate(lines, start=1):
        if not contains_term(line):
            continue

        if not banner_printed:
            clear_line("-", "\n")
            print(found_banner)
            banner_printed = True
            found_smth()
            # Theme colours are only needed once a file has a hit, and they
            # do not change while the file is being printed.
            hit_colours = (
                config("read", "clr.has_term.line_number", is_theme=True),
                config("read", "clr.has_term.line", is_theme=True),
            )
            plain_colours = (
                config("read", "clr.no_term.line_number", is_theme=True),
                config("read", "clr.no_term.line", is_theme=True),
            )

        start_line = max(0, line_number - 1 - context)
        end_line = min(len(lines), line_number + context)
        for i in range(start_line, end_line):
            shown_number = i + 1
            if shown_number in printed_line_numbers:
                continue
            is_hit = shown_number == line_number
            if not is_hit and contains_term(lines[i]):
                if shown_number > line_number:
                    # A later hit prints itself (with marker and its own
                    # context) when the outer loop reaches it.
                    break
                continue  # an earlier hit was already reported

            line_marker = ">" if is_hit else " "
            number_colour, text_colour = hit_colours if is_hit else plain_colours
            # rstrip instead of [:-1]: the last line of a file may have no
            # trailing newline, and slicing would eat a real character.
            text = lines[i].rstrip("\n")
            print(f"{number_colour}{line_marker} {shown_number}{Fore.WHITE}\t: {text_colour}{text}{reset()}")
            printed_line_numbers.add(shown_number)
def _version_key(version):
    """Convert a dotted version string such as ``"1.10.2"`` into ``(1, 10, 2)``.

    Only the leading ASCII digits of each dot-separated piece are used; a
    piece without leading digits counts as 0.
    """
    key = []
    for piece in str(version).strip().split("."):
        digits = ""
        for char in piece:
            if char not in "0123456789":
                break
            digits += char
        key.append(int(digits) if digits else 0)
    return tuple(key)


def _is_newer_version(remote, local):
    """Return True when ``remote`` is a strictly higher version than ``local``."""
    remote_key = _version_key(remote)
    local_key = _version_key(local)
    # Pad with zeros so that "1.2" and "1.2.0" compare as equal.
    width = max(len(remote_key), len(local_key))
    remote_key += (0,) * (width - len(remote_key))
    local_key += (0,) * (width - len(local_key))
    return remote_key > local_key


def main(namespace_arguments):
    """Run the requested action: edit the config, self-update, and/or search.

    Order of operations:
      1. If ``--config`` was given, delegate to ``configure`` and exit.
      2. If ``--update`` was given or ``updater.auto_update`` is enabled,
         compare local and remote versions, ask for confirmation, then
         rewrite ``config.json`` and download the script files.
      3. Otherwise (or when the user chooses to continue after the updater)
         search for ``args.term`` in the working directory, recursively
         unless ``args.in_cwd`` is set.

    Args:
        namespace_arguments: Parsed ``argparse`` namespace; it is stored in
            the module-level ``args`` that the search helpers read.

    Raises:
        ReferenceError: A remote file answered with a non-200 status; the
            exception argument is the file name.
        SystemExit: After ``--config``, after the updater finishes, or when
            the remote cannot be reached.
    """
    global args, found
    args = namespace_arguments
    print()
    if args.config is not None:
        configure(args.config)
        sys.exit(0)

    try:
        if args.update or config("read", "updater.auto_update"):
            remote_url = "https://raw.githubusercontent.com/NSPC911/search/main/"
            import requests

            try:
                response = requests.get(remote_url + "config.json")
            except requests.exceptions.ConnectionError:
                print(f"{Fore.RED}ConnectionError: Max retries exceeded, likely due to no connection.")
                print(f"{Fore.YELLOW}\tTurn off Flight Mode and connect to a WiFi, or restart your device.")
                sys.exit(1)
            if response.status_code != 200:
                raise ReferenceError("config.json")

            remote_config = response.json()
            current_config = load_json(config_path)

            # Update Checker
            # Versions are compared numerically, component by component. The
            # previous string-based `or` chain treated "10" as smaller than
            # "9", could flag an older remote as newer, and indexed past the
            # end of the stable version (it was split on whitespace).
            canary_is_newer = _is_newer_version(
                remote_config["updater.env.canary_version"],
                current_config["updater.env.canary_version"],
            )
            stable_is_newer = _is_newer_version(
                remote_config["updater.env.current_version"],
                current_config["updater.env.current_version"],
            )
            if current_config["updater.canary"] and canary_is_newer:
                print(f"{Fore.GREEN}New canary version found!")
                # Show the latest commit so the user can see what changed.
                ghapi_response = requests.get("https://api.github.com/repos/NSPC911/search/commits")
                if ghapi_response.status_code == 200:
                    latest_commit = ghapi_response.json()[0]
                    print(f"Latest commit:\n{Fore.CYAN}{latest_commit['commit']['message']}")
                    # The commit hash is the top-level "sha";
                    # commit.tree.sha identifies the tree, not the commit.
                    print(f"Commit Hash: {Fore.LIGHTMAGENTA_EX}{latest_commit['sha']}")
                canary_current = "canary"
            elif stable_is_newer:
                print(f"{Fore.GREEN}New stable version found")
                print(f"{Fore.YELLOW}Release notes: https://github.com/NSPC911/search/releases/tag/v{remote_config['updater.env.current_version']}")
                canary_current = "current"
            else:
                # Nothing newer upstream. This also covers a local version
                # that is ahead of the remote, which previously fell through
                # with `canary_current` unbound.
                print(f"{Fore.GREEN}Stable version up-to-date!")
                if remote_config["updater.env.canary_version"] == current_config["updater.env.canary_version"]:
                    print(f"{Fore.GREEN}Canary version up-to-date!")
                else:
                    print(f"{Fore.LIGHTMAGENTA_EX}Canary version has changes!")
                    print(f"{Fore.YELLOW}Run `{Fore.CYAN}search --config set updater.canary true{Fore.YELLOW}` to get canary updates.")
                sys.exit(0)

            # Update Chooser
            notes_response = requests.get(remote_url + "notes.json")
            if notes_response.status_code == 200:
                notes = notes_response.json()
                try:
                    note = notes[remote_config["updater.env.current_version"]]
                except KeyError:
                    note = None  # no note published for this version
                if note is not None:
                    print("Updater Notes:")
                    print('-' * (len(note) + 4))
                    print(f'| {note} |')
                    print('-' * (len(note) + 4))
            if canary_current == "canary":
                print(f"{Fore.YELLOW}Canary versions are bound to have errors. You will need to set {Fore.LIGHTBLUE_EX}`updater.env.current_version`{Fore.YELLOW} to an older version and disable {Fore.LIGHTBLUE_EX}`updater.canary`{Fore.YELLOW} to revert to the stable version.")

            should_update = ""
            while not should_update.lower().startswith(("y", "n")):
                should_update = input(f"Update from {Fore.LIGHTRED_EX}v{current_config[f'updater.env.{canary_current}_version']}{Fore.RESET} to {Fore.GREEN}v{remote_config[f'updater.env.{canary_current}_version']}{Fore.RESET}? [Y/n] ")
            if should_update.lower().startswith("n"):
                print(f"{Fore.RED}Update cancelled!")
                if config("read", "updater.auto_update") and args.term != "":
                    print(f"{Fore.LIGHTGREEN_EX}Continuing search...")
                    # ChildProcessError is used purely as a jump out of the
                    # updater and into the search below.
                    raise ChildProcessError
                sys.exit(0)

            # Update
            print(f"{Fore.YELLOW}Updating...\n")
            # Local settings win; keys that only exist remotely are added.
            new_config = {**remote_config, **current_config}
            if current_config["updater.canary"]:
                new_config["updater.env.canary_version"] = remote_config["updater.env.canary_version"]
            else:
                # Stable installs download from the release tag, not main.
                remote_url = f"https://raw.githubusercontent.com/NSPC911/search/refs/tags/v{remote_config['updater.env.current_version']}/"
                new_config["updater.env.current_version"] = remote_config["updater.env.current_version"]
            dump_json(config_path, new_config)
            print(f"{Fore.GREEN}Updated config.json from remote!")

            for file in ("config.py", "custom_functions.py", "search.py"):
                response = requests.get(remote_url + file)
                if response.status_code != 200:
                    raise ReferenceError(file)
                with open(file, "w") as f:
                    f.write(response.text)
                print(f"{Fore.GREEN}Updated {file} from remote!")
            print(f"{Fore.GREEN}Updated all scripts from remote!")

            if args.term != "" and config("read", "updater.auto_update"):
                should_continue = ""
                while not should_continue.lower().startswith(("y", "n")):
                    should_continue = input(f"{Fore.YELLOW}Continue search? [Y/n] {Fore.RESET}")
                if should_continue.lower().startswith("y"):
                    raise ChildProcessError
            sys.exit(0)
    except ChildProcessError:
        # Raised above to mean "leave the updater and carry on searching".
        pass

    delattr(args, "config")
    delattr(args, "update")
    found = False
    if args.in_cwd:
        print(f"{Fore.WHITE}Searching for {Fore.LIGHTBLUE_EX}{args.term} {Fore.WHITE}in {Fore.YELLOW}{os.getcwd()}")
        search_in_cwd(args.term, args.case_sensitive)
    else:
        print(f"{Fore.WHITE}Searching for {Fore.LIGHTBLUE_EX}{args.term}")
        search_dir(os.getcwd(), args.term, args.case_sensitive)

    if not found:
        clear_line()
        print(f"\n{Fore.YELLOW}Couldn't find {Fore.LIGHTBLUE_EX}{args.term}")
    else:
        clear_line("-")
    print()
if __name__ == "__main__":
    # Command-line entry point. Each boolean option is registered in the
    # form that *flips* its configured default, so an enabled default exposes
    # a negating flag (e.g. --quiet) and a disabled default exposes the
    # enabling flag (e.g. --verbose).
    try:
        parser = argparse.ArgumentParser(description=f"Find.exe but {Fore.CYAN}better{Fore.RESET}")
        parser.add_argument("term", nargs="*", help="Term to search for")
        if config("read", "default.show_current"):
            parser.add_argument("--quiet", "-q", action="store_true", default=False, help=f"{Fore.YELLOW}Don't show current file{Fore.RESET}")
        else:
            parser.add_argument("--verbose", "-v", action="store_true", default=False, help=f"{Fore.YELLOW}Show current file{Fore.RESET} being searched")
        if config("read", "default.search_content"):
            parser.add_argument("--no-search-content", "-nsc", action="store_true", default=False, help=f"{Fore.YELLOW}Exclude file content{Fore.RESET} from the search")
        else:
            parser.add_argument("--search-content", "-sc", action="store_true", default=False, help=f"{Fore.YELLOW}Include file content{Fore.RESET} in the search")
        if config("read", "default.include_filename"):
            parser.add_argument("--exclude-filename", "-nf", action="store_true", default=False, help=f"{Fore.YELLOW}Exclude file names{Fore.RESET} from the search")
        else:
            parser.add_argument("--include-filename", "-f", action="store_true", default=False, help=f"{Fore.YELLOW}Include file names{Fore.RESET} in the search")
        if config("read", "default.in_cwd"):
            parser.add_argument("--recursive", "-r", action="store_true", default=False, help=f"Search {Fore.YELLOW}in sub-directories{Fore.RESET}")
        else:
            parser.add_argument("--in-cwd", "-nr", action="store_true", default=False, help=f"Search {Fore.YELLOW}only{Fore.RESET} in the current directory")
        parser.add_argument("--context", "-c", type=int, default=config("read", "default.context"), help=f"Number of {Fore.YELLOW}context lines{Fore.RESET} to show")
        parser.add_argument("--file-name", "-fn", default="*", help=f"Search only in the {Fore.YELLOW}specified file name{Fore.RESET}")
        if config("read", "default.case_sensitive"):
            parser.add_argument("--case-insensitive", "-ncs", action="store_true", default=False, help="Disable case-sensitive searching")
        else:
            parser.add_argument("--case-sensitive", "-cs", action="store_true", default=False, help="Enable case-sensitive searching")
        parser.add_argument("--config", nargs=argparse.REMAINDER, metavar=('modifier', 'key', 'value'), help=f"Extra args: [{Fore.LIGHTBLUE_EX}set{Fore.WHITE}/{Fore.CYAN}read{Fore.WHITE}/where/list/reset] [{Fore.LIGHTBLUE_EX}key{Fore.WHITE}/{Fore.CYAN}key{Fore.WHITE}] [{Fore.LIGHTBLUE_EX}value{Fore.WHITE}]")
        parser.add_argument("--update", "-u", action="store_true", help="Update files from remote")
        args = parser.parse_args()

        # Normalise the namespace so the rest of the program always sees the
        # positive attribute, whichever flag form was registered above.
        inverted_flags = (
            ("quiet", "verbose"),
            ("exclude_filename", "include_filename"),
            ("no_search_content", "search_content"),
            ("recursive", "in_cwd"),
            ("case_insensitive", "case_sensitive"),
        )
        for negated, positive in inverted_flags:
            if hasattr(args, negated):
                setattr(args, positive, not getattr(args, negated))
                delattr(args, negated)

        if len(sys.argv) == 1:
            parser.print_help()
            sys.exit(0)
        # nargs="*" yields a list of words; join them back into one term.
        args.term = " ".join(args.term)
        main(args)
    except KeyboardInterrupt:
        pass
    except ReferenceError as fetch_error:
        # main() raises ReferenceError(<file name>) when a download fails.
        # Report that file, not args.file_name, which is the unrelated
        # --file-name search filter ("*" by default).
        print(f"{Fore.RED}RequestError: Couldn't fetch data from remote for {Fore.YELLOW}{fetch_error}")
        print(f"{Fore.RED}Please check your internet connection and try again.")
        sys.exit(1)