### IMPORTS
import warnings
warnings.filterwarnings("ignore")
import os
import sys
import copy
import math
import time
import scipy
import pickle
import random
import shutil
import logging
import argparse
import Bio.SeqIO
import numpy as np
import pandas as pd
import configparser
from Bio import PDB
from Bio import AlignIO
from Bio.PDB import Select
import scipy.stats as stats
from Bio.PDB import MMCIFParser
from Bio.PDB.MMCIF2Dict import MMCIF2Dict
from prointvar.pdbx import PDBXreader, PDBXwriter
from prointvar.dssp import DSSPrunner, DSSPreader
from prointvar.config import config as cfg
from prointvar.fetchers import download_structure_from_pdbe
import varalign.align_variants
import varalign.alignments
from urllib.error import HTTPError
from urllib.error import URLError
from config import BASE_DIR, OUTPUT_FOLDER, MOLS_FOLDER, INTERS_FOLDER, EXP_FOLDER, MATS_FOLDER, SEGMENT_FOLDER, STRUCTURE_FOLDER, ASYM_FOLDER, ASSEMBLY_FOLDER, CHAIN_REMAPPING_FOLDER, CIF_SIFTS_FOLDER
### SETTING UP LOGGER
logging.basicConfig(filename = "ligysis.log", format = '%(asctime)s %(name)s [%(levelname)-8s] - %(message)s', level = logging.INFO)
log = logging.getLogger("LIGYSIS")
## READ CONFIG FILE
config = configparser.ConfigParser()
config_path = os.path.join(BASE_DIR, "ligysis_config.txt")
config.read(config_path) # assuming this program is being executed one level above where this script and its config file are located
arpeggio_python_bin = config["paths"].get("arpeggio_python_bin") # location of python binary to run pdbe-arpeggio.
arpeggio_bin = config["paths"].get("arpeggio_bin") # location of pdbe-arpeggio binary.
ensembl_sqlite_path = config["paths"].get("ensembl_sqlite") # location of a local copy of ENSEMBL mappings from UniProt Accession to genome (sqlite)
gnomad_vcf = config["paths"].get("gnomad_vcf") # location of gnomAD VCF. This database is not updated.
swissprot = config["paths"].get("swissprot") # location of local SwissProt copy. This database is not updated, current version is Nov 2021.
max_retry = int(config["other"].get("max_retry")) # number of maximum attempts to make to retrieve a certain piece of data from PDBe API.
sleep_time = float(config["other"].get("sleep_time")) # time to sleep between queries to the PDBe API.
### VARIABLES
MSA_fmt = "stockholm"
biolip_data = "./BioLiP.pkl"
headings = ["ID", "RSA", "DS", "MES", "Size", "Cluster", "FS"]
bbone_atoms = ["C", "CA", "N", "O"]
pdb_resnames = [
"ALA", "CYS", "ASP", "GLU", "PHE", "GLY", "HIS", "ILE", "LYS", "LEU",
"MET", "ASN", "PRO", "GLN", "ARG", "SER", "THR", "VAL", "TRP", "TYR"
]
aas_1l= [
"A", "R", "N", "D", "C", "Q", "E", "G", "H", "I",
"L", "K", "M", "F", "P", "S", "T", "W", "Y", "V",
"-"
]
chimeraX_commands = [
"color white; set bgColor white",
"set silhouette ON; set silhouetteWidth 3; set silhouetteColor black",
"color byattribute binding_site palette paired-12; col ::binding_site==-1 grey",
"~disp; select ~protein; ~select : HOH; ~select ::binding_site==-1; disp sel; ~sel",
"surf; surface color white; transparency 0 s;"
]
exp_data_cols = [
"resolution", "resolution_low", "resolution_high",
"r_factor", "r_free", "r_work", "experimental_method", # columns of interest from experimental data table
"structure_determination_method"
]
dssp_cols = [
"PDB_ResNum", "SS", "ACC", "KAPPA", # selects subset of columns
"ALPHA", "PHI", "PSI", "RSA"
]
consvar_class_colours = [
"royalblue", "green", "grey", "firebrick", "orange"
]
i_cols = [
"pdbx_sifts_xref_db_acc",
"label_asym_id",
"auth_seq_id",
"pdbx_sifts_xref_db_num"
]
interaction_to_color = { # following Arpeggio's colour scheme
'clash': '#000000',
'covalent':'#999999',
'vdw_clash': '#999999',
'vdw': '#999999',
'proximal': '#999999',
'hbond': '#f04646',
'weak_hbond': '#fc7600',
'xbond': '#3977db', #halogen bond
'ionic': '#e3e159',
'metal_complex': '#800080',
'aromatic': '#00ccff',
'hydrophobic': '#006633',
'carbonyl': '#ff007f',
'polar': '#f04646',
'weak_polar': '#fc7600',
}
### FUNCTIONS
## UTILS
def is_dir_empty(dir_path):
return not os.listdir(dir_path) if os.path.exists(dir_path) else True
def get_status_code_data(status_code_file):
"""
Reads status code file and returns dictionaries
for accession and segment status codes.
"""
accs_status_dict = {}
segs_status_dict = {}
with open(status_code_file, "r") as f:
for line in f:
d = line.strip().split("\t")
if "_" in d[0]:
segs_status_dict[d[0]] = d[1]
else:
accs_status_dict[d[0]] = d[1]
return accs_status_dict, segs_status_dict
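# Illustrative sketch (hypothetical file name and values): the status code file is
# read as a two-column, tab-separated table where keys containing "_" are treated as
# segment identifiers and all other keys as protein accessions, e.g.
#
#     P00533      OK
#     P00533_1    OK
#     P00533_2    failed
#
# accs_status, segs_status = get_status_code_data("status_codes.tsv")
# accs_status  # -> {"P00533": "OK"}
# segs_status  # -> {"P00533_1": "OK", "P00533_2": "failed"}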
def cp_sqlite(wd, og_path = ensembl_sqlite_path):
"""
Copies ensembl_cache.sqlite to execution directory.
"""
hidden_var_dir = os.path.join(wd, ".varalign")
sqlite_name = os.path.basename(og_path)
if not os.path.isdir(hidden_var_dir):
os.mkdir(hidden_var_dir)
else:
pass
cp_path = os.path.join(hidden_var_dir, sqlite_name)
shutil.copy(og_path, cp_path)
return cp_path
def rm_sqlite(cp_path):
"""
Removes ensembl_cache.sqlite from execution directory.
"""
hidden_var_dir = os.path.dirname(cp_path)
os.remove(cp_path)
os.rmdir(hidden_var_dir)
def dump_pickle(data, f_out):
"""
Dumps data to pickle.
"""
with open(f_out, "wb") as f:
pickle.dump(data, f)
def load_pickle(f_in):
"""
Loads data from pickle.
"""
with open(f_in, "rb") as f:
data = pickle.load(f)
return data
## TRANSFORMING PDB FILES
def fmt_mat_in(mat_in):
"""
Formats a transformation matrix in a way that can be
used by the transformation function.
"""
mat_out = [
[mat_in[0][0], mat_in[1][0], mat_in[2][0]],
[mat_in[0][1], mat_in[1][1], mat_in[2][1]],
[mat_in[0][2], mat_in[1][2], mat_in[2][2]],
[mat_in[0][-1], mat_in[1][-1], mat_in[2][-1]]
]
matrix_dict = {
"rotation": [mat_out[0], mat_out[1], mat_out[2]],
"translation": mat_out[3]
}
return matrix_dict
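# Illustrative example (assumed input layout: three rows, each holding a 3x3 rotation
# row followed by the translation component). Note that the 3x3 rotation block is
# transposed, matching the coordinate-times-matrix convention used by Biopython's
# Atom.transform() in apply_transformation():
#
#     mat_in = [[1, 0, 0, 10.0],
#               [0, 1, 0, -5.0],
#               [0, 0, 1,  2.5]]
#     fmt_mat_in(mat_in)
#     # -> {"rotation": [[1, 0, 0], [0, 1, 0], [0, 0, 1]],
#     #     "translation": [10.0, -5.0, 2.5]}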
def get_segments_dict(supp_data, acc):
"""
Given a superimposition table for a protein, creates
a dictionary with information about the different
structure coverage segments.
"""
segment_dict = {}
for idx, row in supp_data.iterrows():
segment_dict[idx] = {}
clusters = row[acc]["clusters"]
for cluster in clusters:
for member in cluster:
pdb_id = member["pdb_id"]
chain_id = member["struct_asym_id"] # this corresponds to the original label_asym_id of the CIF file
unit_id = "{}_{}".format(pdb_id, chain_id)
segment_dict[idx][unit_id] = member
return segment_dict
def get_segment_membership(supp_data, acc):
"""
Given a superimposition table for a protein, creates
a dictionary that indicates which pdb chains are included
within each structural coverage segment.
"""
segment_membership = {}
for idx, row in supp_data.iterrows():
segment_membership[idx] = []
clusters = row[acc]["clusters"]
for cluster in clusters:
for member in cluster:
pdb_id = member["pdb_id"]
chain_id = member["struct_asym_id"] # this correspobds to the original label_asym_id of the CIF file
unit_id = "{}_{}".format(pdb_id, chain_id)
segment_membership[idx].append(unit_id)
return segment_membership
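# Illustrative shape of the returned membership dict (values hypothetical): one list
# of "<pdb_id>_<struct_asym_id>" unit identifiers per structural coverage segment:
#
#     {0: ["1abc_A", "1abc_B", "2xyz_A"], 1: ["3def_A"]}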
def parse_pdb_file(pdb_path, fmt):
"""
Parses a structure file with Biopython's MMCIFParser.
:param pdb_path: path to the input structure file
:param fmt: structure format (currently unused; mmCIF is assumed)
:return: Biopython structure object, or None if parsing fails
"""
parser = PDB.MMCIFParser()
pdb_id, _ = os.path.splitext(os.path.basename(pdb_path))
try:
structure = parser.get_structure(pdb_id, pdb_path)
return structure
except:
log.error("Could not parse {}".format(pdb_path)) # A0QSG2 can't parse the structure due to Blank altlocs in duplicate residues
return None
class HighestOccupancy(Select):
def __init__(self, structure, chain_id):
self.structure = structure
self.chain_id = chain_id
self.highest_occupancy_altlocs = self._find_highest_occupancy_altlocs()
def _find_highest_occupancy_altlocs(self):
highest_occupancy = {}
for model in self.structure:
for chain in model:
if chain.id == self.chain_id:
for residue in chain:
for atom in residue:
if atom.element == 'H':
continue # Skip hydrogen atoms
if not atom.is_disordered():
continue # Ignore non-disordered atoms
atom_id = (residue.get_id(), atom.get_name())
for altloc_id, altloc_atom in atom.child_dict.items():
if altloc_id == ' ':
continue # Skip the default blank altloc
if atom_id not in highest_occupancy or altloc_atom.get_occupancy() > highest_occupancy[atom_id][1]:
highest_occupancy[atom_id] = (altloc_id, altloc_atom.get_occupancy())
return highest_occupancy
def accept_atom(self, atom):
if atom.element == 'H':
return False # Skip hydrogen atoms
if atom.is_disordered():
atom_id = (atom.get_parent().get_id(), atom.get_name())
return atom.get_altloc() == self.highest_occupancy_altlocs.get(atom_id, (' ', 0))[0]
else:
return True # Accept non-disordered atoms
def count_accepted_atoms(self):
count = 0
for model in self.structure:
for chain in model:
if chain.id == self.chain_id:
for residue in chain:
for atom in residue:
if self.accept_atom(atom):
count += 1
return count
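# Minimal usage sketch (hypothetical paths): HighestOccupancy is a Bio.PDB Select
# subclass, so it can be handed to any PDBIO/MMCIFIO save call to write out only the
# highest-occupancy altloc of each disordered atom while skipping hydrogens:
#
#     structure = parse_pdb_file("1abc.cif", "cif")
#     io = PDB.MMCIFIO()
#     io.set_structure(structure)
#     io.save("1abc_A_highocc.cif", select=HighestOccupancy(structure, "A"))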
def apply_transformation(structure, matrix, output_path, chain_id, fmt):
"""
Transforms structure based on the transformation matrix
:param structure: biopython's structure object
:param matrix: transformation matrix dict
:param output_path: path to the output file
:param chain_id: chain ID to transform (By default it is AUTH_ASYM_ID)
:param fmt: structure format
:return: transformed structure
"""
rotation = matrix["rotation"]
translation = matrix["translation"]
#if fmt == "pdb":
# io = PDB.PDBIO()
#elif fmt == "cif":
io = PDB.MMCIFIO()
for model in structure:
for chain in model:
if chain.id == chain_id:
for residue in chain:
for atom in residue:
atom.transform(rotation, translation)
io.set_structure(chain)
high_occ = HighestOccupancy(structure, chain_id)
n_high_occ_atoms = high_occ.count_accepted_atoms()
# log.info("Number of atoms in chain {} with highest occupancy: {}".format(chain_id, n_high_occ_atoms))
io.save(output_path, select = high_occ) # it seems it is MMCIFIO() that is dropping _atom_site.auth_comp_id and _atom_site.auth_atom_id .
def pdb_transform(structure, output_path, matrix_raw, chain_id, fmt = 'cif'):
"""
Applies a transformation matrix to an already-parsed structure and writes the transformed chain to file.
:param structure: Biopython structure object
:param output_path: path to the output file
:param matrix_raw: raw transformation matrix (list of rows, rotation plus translation)
:param chain_id: chain ID to transform (auth_asym_id)
"""
#structure = parse_pdb_file(pdb_path, fmt)
#if structure == None:
# return 1
matrix_rf = fmt_mat_in(matrix_raw)
#print(pdb_path, output_path, chain_id, fmt, matrix_rf)
apply_transformation(structure, matrix_rf, output_path, chain_id, fmt) # by default has to be AUTH_ASYM_ID
# check number of rows in the output file
trans_cif = PDBXreader(inputfile = output_path).atoms(format_type = "mmcif", excluded=())
# print(len(trans_cif))
# print(trans_cif.head())
return 0
def transform_all_files(pdb_ids, matrices, struct_chains, auth_chains, asymmetric_dir, trans_dir, OVERRIDE_TRANS = False):
"""
Given a set of PDB IDs, matrices, and chains, transforms
the coordinates according to a transformation matrix
"""
no_trans = [] # capture files that are not transformed
for i, pdb_id in enumerate(pdb_ids):
asym_cif = os.path.join(asymmetric_dir, "{}.cif".format(pdb_id))
root, ext = os.path.splitext(os.path.basename(asym_cif))
trans_cif = os.path.join(trans_dir, "{}_{}_trans{}".format(root, struct_chains[i], ext))
if OVERRIDE_TRANS or not os.path.isfile(trans_cif):
structure = parse_pdb_file(asym_cif, 'cif') # by parsing structure here, we only parse it once
if structure == None:
log.error("{}_{} could not be transformed".format(pdb_id, struct_chains[i]))
no_trans.append(pdb_id)
else:
chain_cif = PDBXreader(inputfile = asym_cif).atoms(format_type = "mmcif", excluded=()).query('label_asym_id == @struct_chains[@i]').copy()
max_occ = chain_cif["occupancy"].max()
if max_occ < 1.0:
log.warning("Chain {} of {} has low occupancy (<1.0). Not transforming...".format(struct_chains[i], pdb_id))
no_trans.append(pdb_id)
continue
# print(chain_cif.head())
#print occupancies of chain_cif
# print(chain_cif["occupancy"].unique())
ec = pdb_transform(structure, trans_cif, matrices[i], auth_chains[i], fmt = 'cif')
if ec == 0:
log.info("{}_{} transformed".format(pdb_id, struct_chains[i]))
elif ec == 1:
log.error("{}_{} could not be transformed".format(pdb_id, struct_chains[i]))
no_trans.append(pdb_id)
else:
pass
return no_trans
## EXPERIMENTAL DATA AND VALIDATION
def get_experimental_data(pdb_ids, exp_data_dir, out):
"""
Retrieves experimental data from the PDBe REST API
and saves a subset of columns to a pandas dataframe and a pickle.
"""
exp_data_dfs = []
for pdb_id in pdb_ids:
pdb_expd_out = os.path.join(exp_data_dir, "{}_exp_data.json".format(pdb_id))
if os.path.isfile(pdb_expd_out):
exp_data_df = pd.read_json(pdb_expd_out, convert_axes = False, dtype = False)
exp_data_dfs.append(exp_data_df)
else:
try:
exp_data = pd.read_json("https://www.ebi.ac.uk/pdbe/api/pdb/entry/experiment/{}".format(pdb_id), convert_axes = False, dtype = False)
exp_data_dict = exp_data.loc[0, pdb_id]
exp_data_df = pd.DataFrame({k: v for k, v in exp_data_dict.items() if k in exp_data_cols}, index = [0])
exp_data_df["pdb_id"] = pdb_id
exp_data_df.to_json(pdb_expd_out)
exp_data_dfs.append(exp_data_df)
log.debug("Experimental data retrieved for {}".format(pdb_id))
time.sleep(sleep_time)
except HTTPError as e:
log.error("Experimental data not retrieved for {}: {}".format(pdb_id, e))
continue
except URLError as u:
log.error("Experimental data not retrieved for {}: {}".format(pdb_id, u))
continue
master_exp_data_df = pd.concat(exp_data_dfs)
master_exp_data_df.reset_index(drop = True, inplace = True)
master_exp_data_df.to_pickle(out)
return master_exp_data_df
## SIMPLIFYING PDB FILES
def get_simple_pdbs(trans_dir, simple_dir, OVERRIDE_SIMPLE = False):
"""
This function simplifies a group of CIF files that have been
transformed and are superimposed in space. It will only keep the
ATOM records of the first file. For the rest, it will only
save the HETATM records, corresponding to ligands.
"""
cif_files = [os.path.join(trans_dir, f) for f in os.listdir(trans_dir) if f.endswith("_trans.cif")]
first_simple = os.path.join(simple_dir, os.path.basename(cif_files[0]))
if OVERRIDE_SIMPLE or not os.path.isfile(first_simple):
shutil.copy(cif_files[0], first_simple)
else:
pass
# if os.path.isfile(first_simple):
# pass
# else:
# shutil.copy(cif_files[0], first_simple)
for cif_in in cif_files[1:]: #0 of os.listdir(trans_dir) will be the one showing the ribbon
pdb_id = os.path.basename(cif_in)[:6]
cif_out = os.path.join(simple_dir, os.path.basename(cif_in))
if OVERRIDE_SIMPLE or not os.path.isfile(cif_out):
#delete cif_out if it exists
if os.path.isfile(cif_out):
os.remove(cif_out)
cif_df = PDBXreader(inputfile = cif_in).atoms(format_type = "mmcif", excluded=())
hetatm_df = cif_df.query('group_PDB == "HETATM"')
if len(hetatm_df) == 0:
log.info("No HETATM records in {}".format(pdb_id))
continue
else:
hetatm_df = hetatm_df.replace({"label_alt_id": ""}, " ")
w = PDBXwriter(outputfile = cif_out)
w.run(hetatm_df, format_type = "mmcif") # category by default is "label". I think this is making 3DMol.js parser not work, as it uses "auth".
log.debug("{} simplified".format(pdb_id))
else:
continue
# if os.path.isfile(cif_out):
# continue
# # print(cif_in)
# cif_df = PDBXreader(inputfile = cif_in).atoms(format_type = "mmcif", excluded=())
# hetatm_df = cif_df.query('group_PDB == "HETATM"') #[pdb_df.group_PDB == "HETATM"]
# if len(hetatm_df) == 0:
# log.info("No HETATM records in {}".format(pdb_id))
# continue
# hetatm_df = hetatm_df.replace({"label_alt_id": ""}, " ")
# w = PDBXwriter(outputfile = cif_out)
# w.run(hetatm_df, format_type = "mmcif") # category by default is "label". I think this is making 3DMol.js parser not work, as it uses "auth".
# log.debug("{} simplified".format(pdb_id))
## RELATIVE INTERSECTION, AND METRIC FUNCTIONS
def get_intersect_rel_matrix(binding_ress):
"""
Given a set of ligand binding residues, calculates a
similarity matrix between all the different sets of ligand
binding residues.
"""
inters = {i: {} for i in range(len(binding_ress))}
for i in range(len(binding_ress)):
inters[i][i] = intersection_rel(binding_ress[i], binding_ress[i])
for j in range(i+1, len(binding_ress)):
inters[i][j] = intersection_rel(binding_ress[i], binding_ress[j])
inters[j][i] = inters[i][j]
return inters # implement different distances, e.g., jaccard_sim
def intersection_rel(l1, l2):
"""
Calculates relative intersection.
"""
len1 = len(l1)
len2 = len(l2)
I_max = min([len1, len2])
I = len(list(set(l1).intersection(l2)))
return I/I_max
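# Worked example: the relative intersection normalises the shared-residue count by
# the size of the smaller set, so a set fully contained in another scores 1.0:
#
#     intersection_rel([1, 2, 3, 4], [3, 4, 5])    # I = 2, I_max = 3 -> 0.667
#     intersection_rel([1, 2], [1, 2, 3, 4, 5])    # I = 2, I_max = 2 -> 1.0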
def download_and_move_files(pdb_ids, asymmetric_dir, bio=False, OVERRIDE_PDB=False):
"""
Downloads the CIF files of a series of PDB IDs and moves
them to a given directory. If OVERRIDE_PDB is True,
any existing file will be deleted before the download.
"""
cifs = []
for pdb_id in pdb_ids:
# Define file names
file_suffix = "_bio.cif" if bio else ".cif"
cif_in = os.path.join(cfg.db_root, cfg.db_pdbx, f"{pdb_id}{file_suffix}")
cif_out = os.path.join(asymmetric_dir, f"{pdb_id}{file_suffix}")
# Check if the file exists
file_exists = os.path.isfile(cif_out)
# If file exists and OVERRIDE_PDB is True, delete the file
if file_exists and OVERRIDE_PDB:
log.debug(f"{cif_out} exists and will be overridden.")
os.remove(cif_out)
file_exists = False # Update existence status after deletion
# If the file does not exist, proceed to download and move
if not file_exists:
download_structure_from_pdbe(pdb_id, bio=bio)
shutil.move(cif_in, cif_out)
else:
log.debug(f"{cif_out} already exists!")
cifs.append(cif_out)
return cifs
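# Usage sketch (hypothetical PDB IDs and directories): fetches asymmetric-unit or
# preferred-assembly CIF files via prointvar and returns their final paths:
#
#     asym_cifs = download_and_move_files(["1abc", "2xyz"], "./structures/asym", bio=False)
#     bio_cifs = download_and_move_files(["1abc", "2xyz"], "./structures/assembly", bio=True)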
def get_SIFTS_from_CIF(cif_df, pdb_id):
"""
Generates PDB2UP and UP2PDB SIFTS mapping
dictionaries given a dataframe and a PDB ID.
"""
df_chains = sorted(cif_df.label_asym_id.unique().tolist())
pdb2up = {pdb_id: {}}
up2pdb = {pdb_id: {}}
chain2acc = {}
for chain in df_chains:
#print(chain)
df_chain = cif_df.query('label_asym_id == @chain & group_PDB == "ATOM" & pdbx_sifts_xref_db_acc != "?" & pdbx_sifts_xref_db_acc != ""').copy()
#print(df_chain)
if df_chain.empty:
#log.warning("No atoms in chain {} of {}".format(chain, pdb_id))
continue
else:
df_filt = df_chain[i_cols].drop_duplicates()
df_filt = df_filt.query('pdbx_sifts_xref_db_num != "?"').copy()
try:
df_filt.auth_seq_id = df_filt.auth_seq_id.astype(int)
except:
raise
#print(pdb_id, chain, df_filt, df_chain)
#sys.exit(0)
df_filt.pdbx_sifts_xref_db_num = df_filt.pdbx_sifts_xref_db_num.astype(int)
pdb2up[pdb_id][chain] = df_filt.set_index('auth_seq_id')['pdbx_sifts_xref_db_num'].to_dict()
up2pdb[pdb_id][chain] = df_filt.set_index('pdbx_sifts_xref_db_num')['auth_seq_id'].to_dict()
up_accs_4_chain = df_filt.pdbx_sifts_xref_db_acc.unique().tolist()
try:
assert len(up_accs_4_chain) == 1
except AssertionError:
log.error("More than one UniProt accession for chain {} of {}".format(chain, pdb_id))
chain2acc[chain] = up_accs_4_chain[0] # dict from orig_label_asym_id to UniProt accession
return pdb2up, up2pdb, chain2acc
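# Shape of the returned mappings (values illustrative): pdb2up and up2pdb are nested
# dicts keyed by PDB ID and label_asym_id, mapping auth_seq_id <-> UniProt residue
# number; chain2acc maps each chain to its single UniProt accession:
#
#     pdb2up    == {"1abc": {"A": {12: 34, 13: 35}}}
#     up2pdb    == {"1abc": {"A": {34: 12, 35: 13}}}
#     chain2acc == {"A": "P00533"}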
def get_loi_data_from_assembly(assembly_files, biolip_dict, acc):
"""
Returns LOI name, chain ID, and ResNum of all LOI
molecules for a given list of CIF files and a biolip
dict.
"""
ligs_dict = {}
for assembly in assembly_files:
cif_id = os.path.basename(assembly).split("_")[0]
ligs_dict[cif_id] = []
lig_names = biolip_dict[acc][cif_id]
lig_names = [lig for lig in lig_names if lig not in pdb_resnames] # filtering out protein amino acids LOIs
# print(assembly, cif_id)
cif_df = PDBXreader(inputfile = assembly).atoms(format_type = "mmcif", excluded=())
for lig in lig_names:
cif_df.auth_seq_id = cif_df.auth_seq_id.astype(int)
lig_rows = cif_df.query('label_comp_id == @lig').copy().drop_duplicates(["auth_comp_id", "auth_asym_id","auth_seq_id"])
lig_data = list(lig_rows[["auth_comp_id","auth_asym_id","auth_seq_id"]].itertuples(index=False, name=None))
ligs_dict[cif_id].extend(lig_data)
return ligs_dict
def extract_assembly_metadata(assembly_path, section_name):
"""
Extracts assembly metadata, written specifically for
chain remapping from asymmetric unit to biological
assembly, so SIFTS mapping can be extrapolated from
asymmetric unit to preferred assembly.
"""
# Flag to indicate if we are in the relevant section
in_section = False
# List to hold the extracted data
data = []
columns = []
# Open the file and read line by line
with open(assembly_path, 'r') as file:
for line in file:
# Check if we've reached the relevant section
if line.startswith(section_name):
in_section = True
columns.append(line.strip().split(".")[-1])
continue
# Check if we've reached the end of the section
if line.startswith("#") and in_section:
break
# Extract data if in the relevant section
if in_section and not line.startswith(section_name):
data.append(line.strip().split())
df = pd.DataFrame(data, columns=columns)
return df
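# Usage sketch (hypothetical assembly file): the "_pdbe_chain_remapping" category of
# a PDBe assembly CIF is parsed into a dataframe whose columns come from the item
# names of that category (e.g. orig_label_asym_id, new_auth_asym_id), as used later
# in get_arpeggio_fingerprints():
#
#     chain_remap_df = extract_assembly_metadata("1abc_bio.cif", "_pdbe_chain_remapping")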
def run_arpeggio(pdb_path, lig_sel, out_dir):
"""
Runs pdbe-arpeggio on a structure for the given ligand selection and writes its output to out_dir.
"""
args = [
arpeggio_python_bin, arpeggio_bin, pdb_path,
"-s", lig_sel, "-o", out_dir, "--mute"
]
cmd = " ".join(args)
exit_code = os.system(cmd)
return exit_code, cmd
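# Example of the shell command this assembles (binaries, file and selections are
# illustrative placeholders read from ligysis_config.txt and ligs_dict):
#
#     <arpeggio_python_bin> <arpeggio_bin> 1abc_bio.cif -s /A/1001/ /B/1001/ -o <arpeggio_dir> --mute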
def switch_columns(df, names):
"""
Swaps the _bgn and _end columns of an Arpeggio dataframe for rows where the
interaction end is one of the given ligand names, so that ligand atoms always
sit at the _bgn end and protein atoms at the _end end.
"""
# Columns to switch
columns_to_switch = [
'auth_asym_id', 'auth_atom_id', 'auth_seq_id', 'label_comp_id'
]
# Iterate through the DataFrame and switch columns where necessary
for index, row in df.iterrows():
if row['label_comp_id_end'] in names:
for col in columns_to_switch:
bgn_col = f"{col}_bgn"
end_col = f"{col}_end"
df.at[index, bgn_col], df.at[index, end_col] = df.at[index, end_col], df.at[index, bgn_col]
return df
def map_values(row, pdb2up, pdb_id):
"""
Maps a residue of the Arpeggio dataframe to its UniProt ResNum using the SIFTS dictionary derived from the CIF file.
"""
try:
return pdb2up[pdb_id][row['orig_label_asym_id_end']][row['auth_seq_id_end']]
except KeyError:
log.debug("Residue {} {} has no mapping to UniProt".format(row['orig_label_asym_id_end'], row['auth_seq_id_end']))
return np.nan # if there is no mapping, return NaN
def map_values_dssp(row, pdb2up, pdb_id, remap_dict):
"""
Maps a DSSP residue to its UniProt ResNum using the SIFTS dictionary derived from the CIF file.
"""
try:
return pdb2up[pdb_id][remap_dict[row['THE_CHAIN']]][row['PDB_ResNum']]
except:
return np.nan
def process_arpeggio_df(arp_df, pdb_id, ligand_names, chain_remap_dict, pdb2up, chain2acc, acc, segment_start, segment_end):
"""
Processes the Arpeggio dataframe into the format needed
to extract fingerprints, and filters out non-relevant
interactions.
"""
#print(pdb_id)
#print(arp_df)
arp_df_end_expanded = arp_df['end'].apply(pd.Series)
arp_df_bgn_expanded = arp_df['bgn'].apply(pd.Series)
arp_df = arp_df.join(arp_df_end_expanded).drop(labels='end', axis=1)
arp_df = arp_df.join(arp_df_bgn_expanded, lsuffix = "_end", rsuffix = "_bgn").drop(labels='bgn', axis = 1)
inter_df = arp_df.query('interacting_entities == "INTER" & type == "atom-atom"').copy().reset_index(drop = True)
inter_df = inter_df[inter_df['contact'].apply(lambda x: 'clash' not in x)].copy().reset_index(drop = True) # filtering out clashes
inter_df = inter_df.query('label_comp_id_bgn in @pdb_resnames or label_comp_id_end in @pdb_resnames').copy().reset_index(drop = True) # filtering out ligand-ligand interactions
if inter_df.empty:
log.warning("No protein-ligand interaction for {}".format(pdb_id))
return inter_df, "no-PL-inters"
inter_df = inter_df.query('label_comp_id_bgn in @ligand_names or label_comp_id_end in @ligand_names').copy().reset_index(drop = True) # filtering out non-LOI interactions (only to avoid re-running Arpeggio, once it has been run with wrong selection)
#print(inter_df)
switched_df = switch_columns(inter_df, ligand_names)
#print(switched_df)
switched_df = switched_df.query('label_comp_id_end in @pdb_resnames').copy() # filtering out non-protein-ligand interactions
#print(pdb_id, ligand_names)
# Add original label_asym_id from asymmetric unit
switched_df["orig_label_asym_id_end"] = switched_df.auth_asym_id_end.map(chain_remap_dict)
#print(switched_df)
# Apply the function and create a new column
switched_df["UniProt_ResNum_end"] = switched_df.apply(lambda row: map_values(row, pdb2up, pdb_id), axis=1)
# Add original label_asym_id from asymmetric unit
switched_df["UniProt_acc_end"] = switched_df.orig_label_asym_id_end.map(chain2acc)
#print(switched_df)
prot_acc_inters = switched_df.query('UniProt_acc_end == @acc').copy() # filtering out non-POI interactions
if prot_acc_inters.empty:
log.warning("No interactions with protein atoms for {}".format(pdb_id))
return prot_acc_inters, "no-POI-inters"
segment_inters = prot_acc_inters.query('@segment_start <= UniProt_ResNum_end <= @segment_end').copy() # filtering out non-segment interactions
if segment_inters.empty:
log.warning("No interactions with protein segment atoms between {}-{} for {}".format(segment_start, segment_end, pdb_id))
return segment_inters, "no-SOI-inters"
segment_inters = segment_inters.sort_values(by=["auth_asym_id_end", "UniProt_ResNum_end", "auth_atom_id_end"]).reset_index(drop = True)
#print(segment_inters)
#print(segment_inters)
return segment_inters, "OK"
def generate_dictionary(mmcif_file):
"""
Generates coordinate dictionary from a mmCIF file.
"""
# Parse the mmCIF file
mmcif_dict = MMCIF2Dict(mmcif_file)
# Initialise the result dictionary
result = {}
# Iterate through the atoms and populate the dictionary
for i, auth_asym_id in enumerate(mmcif_dict["_atom_site.auth_asym_id"]):
label_comp_id_end = mmcif_dict["_atom_site.label_comp_id"][i]
auth_seq_id = mmcif_dict["_atom_site.auth_seq_id"][i]
auth_atom_id_end = mmcif_dict["_atom_site.auth_atom_id"][i]
x = mmcif_dict["_atom_site.Cartn_x"][i]
y = mmcif_dict["_atom_site.Cartn_y"][i]
z = mmcif_dict["_atom_site.Cartn_z"][i]
# Dictionary creation
result[auth_asym_id, label_comp_id_end, int(auth_seq_id), auth_atom_id_end] = [x, y, z]
return result
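# Key/value layout of the coordinate lookup (values illustrative): keys combine
# auth_asym_id, label_comp_id, auth_seq_id (as int) and auth_atom_id; values are the
# raw Cartesian coordinate strings from the mmCIF:
#
#     coords = generate_dictionary("1abc_bio.cif")
#     coords[("A", "ATP", 501, "PA")]   # -> ["12.345", "-3.210", "8.765"]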
def determine_width(interactions):
"""
Generates cylinder width for 3DMol.js interaction
representation depending on Arpeggio contact
fingerprint.
"""
return 0.125 if 'vdw_clash' in interactions else 0.0625
def determine_color(interactions):
"""
Generates cylinder colour for 3DMol.js interaction
representation depending on Arpeggio contact
fingerprint.
"""
undef = ['covalent', 'vdw', 'vdw_clash', 'proximal']
if len(interactions) == 1 and interactions[0] in undef:
return '#999999'
else:
colors = [interaction_to_color[interaction] for interaction in interactions if interaction in interaction_to_color and interaction not in undef]
if colors:
return colors[0]
else:
log.critical("No color found for {}".format(interactions))
return None # Return the first color found, or None if no match
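# Usage sketch: Arpeggio reports each atom-atom contact as a list of interaction
# types; the width flags vdw_clash contacts and the colour is the first specific
# (non-generic) interaction found in the palette above:
#
#     determine_width(["vdw_clash", "hbond"])   # -> 0.125
#     determine_color(["vdw", "hbond"])         # -> "#f04646" (hbond)
#     determine_color(["proximal"])             # -> "#999999"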
def get_arpeggio_fingerprints(pdb_ids, assembly_cif_dir, asymmetric_dir, arpeggio_dir, chain_remapping_dir, cif_sifts_dir, ligs_dict, acc, segment_start, segment_end, OVERRIDE = False, OVERRIDE_ARPEGGIO = False):
"""
Given a series of PDB IDs, runs Arpeggio on the preferred assemblies
of those IDs and gathers chain remapping data in order to filter the
interactions. Saves the CIF-derived SIFTS mappings, the chain remapping
dictionary and the processed Arpeggio table, and returns the fingerprint
dictionary.
"""
fp_dict = {}
no_mapping_pdbs = []
fp_status = {}
for pdb_id in pdb_ids:
assembly_path = os.path.join(assembly_cif_dir, "{}_bio.cif".format(pdb_id))
basename = os.path.splitext(os.path.basename(assembly_path))[0]
remapping_out = os.path.join(chain_remapping_dir, basename + "_chain_remapping.pkl")
input_struct = os.path.join(asymmetric_dir, "{}.cif".format(pdb_id))
pdb2up_out = os.path.join(cif_sifts_dir, "{}_pdb2up.pkl".format(pdb_id))
up2pdb_out = os.path.join(cif_sifts_dir, "{}_up2pdb.pkl".format(pdb_id))
chain2acc_out = os.path.join(cif_sifts_dir, "{}_chain2acc.pkl".format(pdb_id))
if ligs_dict[pdb_id] == []:
log.warning("No LOI in assembly for {}".format(pdb_id))
fp_status[pdb_id] = "No-LOI"
continue
lig_sel = " ".join(["/{}/{}/".format(el[1], el[2]) for el in ligs_dict[pdb_id]])
ligand_names = list(set([el[0] for el in ligs_dict[pdb_id]]))
# read assembly and check chain number
n_chains_assembly = len(PDBXreader(inputfile = assembly_path).atoms(format_type = "mmcif", excluded=()).query('group_PDB == "ATOM"').label_asym_id.unique()) # HARD THRESHOLD. NOT RUNNING ARPEGGIO ON ASSEMBLIES WITH > 24 CHAINS
if n_chains_assembly > 24:
log.warning("25 chains or more. NOT RUNNING ARPEGGIO for {}!".format(pdb_id))
fp_status[pdb_id] = "Many-chains"
continue
arpeggio_out = os.path.join(arpeggio_dir, basename + ".json")
if OVERRIDE_ARPEGGIO or not os.path.isfile(arpeggio_out): # changed from OVERRIDE to OVERRIDE_ARPEGGIO, to avoid re-running Arpeggio when it has already been run
ec, cmd = run_arpeggio(assembly_path, lig_sel, arpeggio_dir)
if ec != 0:
log.error("Arpeggio failed for {} with {}".format(pdb_id, cmd))
fp_status[pdb_id] = "Arpeggio-fail"
continue
else:
log.debug("{} already exists!".format(arpeggio_out))
pass
# print(arpeggio_out, os.path.getsize(arpeggio_out))
arp_df = pd.read_json(arpeggio_out)
#print(arp_df)
arpeggio_proc_df_out = os.path.join(arpeggio_dir, basename + "_proc.pkl")
if OVERRIDE or not os.path.isfile(arpeggio_proc_df_out):
if OVERRIDE or not os.path.isfile(remapping_out):
chain_remap_df = extract_assembly_metadata(
assembly_path,
"_pdbe_chain_remapping"
)
chain_remap_df.to_pickle(remapping_out)
else:
log.debug("Loading remapping dataframe!")
chain_remap_df = pd.read_pickle(remapping_out)
#print(pdb_id)
try:
chain_remap_dict = dict(zip(chain_remap_df["new_auth_asym_id"], chain_remap_df["orig_label_asym_id"])) # dict from new_auth_asym_id to orig_label_asym_id
except KeyError:
log.error("No chain remapping data for {}".format(pdb_id)) # example: 8gia. No chain remapping data, legacy CIF. Downloaded editing the URL, removing "-".
no_mapping_pdbs.append(pdb_id)
fp_status[pdb_id] = "No-mapping"
continue
if OVERRIDE or not os.path.isfile(pdb2up_out) or not os.path.isfile(up2pdb_out) or not os.path.isfile(chain2acc_out):
asym_cif_df = PDBXreader(inputfile = input_struct).atoms(format_type = "mmcif")
#print(input_struct)
pdb2up, up2pdb, chain2acc = get_SIFTS_from_CIF(asym_cif_df, pdb_id)
dump_pickle(pdb2up, pdb2up_out)
dump_pickle(up2pdb, up2pdb_out)
dump_pickle(chain2acc, chain2acc_out)
else:
log.debug("Loading CIF SIFTS mapping dicts!")
pdb2up = load_pickle(pdb2up_out)
up2pdb = load_pickle(up2pdb_out)
chain2acc = load_pickle(chain2acc_out)
proc_inters, fp_stat = process_arpeggio_df(
arp_df, pdb_id, ligand_names, chain_remap_dict,
pdb2up, chain2acc, acc, segment_start, segment_end
)
fp_status[pdb_id] = fp_stat
#if fp_stat != "OK":
# continue
#print(proc_inters)
coords_dict = generate_dictionary(assembly_path)
proc_inters["coords_end"] = proc_inters.set_index(["auth_asym_id_end", "label_comp_id_end", "auth_seq_id_end", "auth_atom_id_end"]).index.map(coords_dict.get)
proc_inters["coords_bgn"] = proc_inters.set_index(["auth_asym_id_bgn", "label_comp_id_bgn", "auth_seq_id_bgn", "auth_atom_id_bgn"]).index.map(coords_dict.get)
proc_inters["width"] = proc_inters["contact"].apply(determine_width)
proc_inters["color"] = proc_inters["contact"].apply(determine_color)
proc_inters.to_pickle(arpeggio_proc_df_out)
else:
log.debug("{} already exists!".format(arpeggio_proc_df_out))
proc_inters = pd.read_pickle(arpeggio_proc_df_out)
proc_inters_indexed = proc_inters.set_index(["label_comp_id_bgn", "auth_asym_id_bgn", "auth_seq_id_bgn"])
#print(proc_inters_indexed.head())
#print(proc_inters_indexed)
lig_fps_status = {}
for el in ligs_dict[pdb_id]:
try:
lig_rows = proc_inters_indexed.loc[[el], :].copy() # Happens for 7bf3 (all aa binding MG are artificial N-term), also for low-occupancy ligands? e.g., 5srs, 5sq5
#print(lig_rows)
#if pdb_id == "8dbs":
#print(el, lig_rows, lig_rows.isnull().values)
if lig_rows.isnull().values.all(): # need all so works only when WHOLE row is Nan
log.warning("No interactions for ligand {} in {}".format(el, pdb_id))
lig_fps_status[el] = "No-PLIs"
continue
###### CHECK IF LIGAND FINGERPRINT IS EMPTY ######
lig_rows.UniProt_ResNum_end = lig_rows.UniProt_ResNum_end.astype(int)
lig_fp = lig_rows.UniProt_ResNum_end.unique().tolist()
#print(lig_fp)
lig_key = "{}_".format(pdb_id) + "_".join([str(l) for l in el])
fp_dict[lig_key] = lig_fp
#lig_fps_status[lig_key] = "OK"
except:
#raise #ValueError("No interactions for ligand {} in {}".format(el, pdb_id))
log.warning("Empty fingerprint for ligand {} in {}".format(el, pdb_id))
continue
bad_lig_fps = [k for k, v in lig_fps_status.items() if v != "OK"]
if set(bad_lig_fps) == set(ligs_dict[pdb_id]):
#log.warning("No Segment-LOI interactions in {}".format(pdb_id))
fp_status[pdb_id] = "No-PLIs" # changing fp_status to reflect that there are no protein-ligand interactions
return fp_dict, no_mapping_pdbs, fp_status
def get_labs(fingerprints_dict):
"""
Returns all ligand labels from fingerprints dict.
"""
return [k for k in fingerprints_dict.keys()]
def get_inters(fingerprints_dict):
"""
Returns all ligand fingerprints from fingerprints dict.