-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
149 lines (125 loc) · 4.52 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# python -m venv C:\Users\pc\MyCodebase\Streamlit\sltest
# activate
# python -m pip install --upgrade pip
# pip install streamlit
# pip install datetime
# pip install re
# pip install mechanicalsoup
# pip install requests
# pip freeze (per vedere le versioni da mettere in requirements.txt)
# streamlit run main.py
# ctrl+C
# deactivate
import streamlit as sl
import datetime
import re
import pandas as pd
from requests import get
from io import BytesIO
# @sl.cache
def locate_dump():
datadump = sl.secrets['files_position']['datadump']
if datadump.startswith('http'):
r = get(datadump)
data = pd.read_json(BytesIO(r.content))
else:
data = pd.read_json(datadump)
data.index.names = ['index']
return data
# DEFINIZIONE DEL FILTRO DA APPLICARE
def suitable(isin, data, dpolicy, scat, yold, discount, ter, dyeld, repl):
etf = data.loc[data['isin'] == isin].to_dict('records')
if len(etf):
etf = etf[0]
else:
return False
#print(etf)
# Distribution Policy
if etf['distribution_policy'] == dpolicy:
return False
# Capitalization
if etf['fund_size_category'] not in scat:
return False
# Index Replication
if repl=='Fisica':
if re.match(r"[pP]hysical", etf['replication']) is None:
return False
else:
if re.match(r"[pP]hysical", etf['replication']) is not None:
return False
# Age
dt = datetime.timedelta(days=yold*365) # at least 3 years old
# dt = datetime.timedelta(days=5*365) #at least 5 years old
# dt = datetime.timedelta(days=1*365) #at least 1 year old
# dt = datetime.timedelta(days=10*365) #at least 10 year old
if etf['inception_date'] > (datetime.datetime.now() - dt).strftime("%Y-%m-%d"):
return False
# Price
tiles = 5 # divisione in quintili
ntile = discount # primo quintile == fortemente in sconto, terzo quintile == mediamente incerto, sesto quintile == va bene qualsiasi prezzo
if etf['last_quote_value'] > (etf['one_year_low'] + ntile*(etf['one_year_high']-etf['one_year_low'])/tiles):
return False
# Gestione Economica
if etf['ter_percentage'] > (ter/100):
return False
# Yield da Distribuzione
if etf['distribution_policy'] == 'Distributing':
etf['distribution_yield'] = etf['one_year_distributions']/etf['last_quote_value']
if etf['distribution_yield'] <= (dyeld/1000):
return False
else:
etf['distribution_yield'] = 0.0
return etf
# START
sl.header('ETF Screener')
dataelab = sl.subheader('Attendi i risultati...')
data = locate_dump()
list_selection = sl.sidebar.selectbox(
"Scegli una lista", (
'Tutti gli Etf',
'A Distribuzione',
'Ad Accumulazione'
)
)
if list_selection == 'Tutti gli Etf':
FILEPATH = sl.secrets['files_position']['full_list']
dpolicy=""
elif list_selection == 'A Distribuzione':
FILEPATH = sl.secrets['files_position']['distributing_list']
dpolicy="Accumulating"
elif list_selection == 'Ad Accumulazione':
FILEPATH = sl.secrets['files_position']['accumulating_list']
dpolicy="Distributing"
# FILTRI
repl = sl.sidebar.radio("Tipo di replica",["Fisica", "Swap"])
scat = sl.sidebar.multiselect("Dimensioni", ['small cap', 'mid cap', 'high cap'])
yold = sl.sidebar.slider("Anni sul Mercato",0,30)
discount = sl.sidebar.slider("Categoria Sconto",6,1)
ter = sl.sidebar.slider("Total Expense Ratio ‰ massimo",1,100)
dyeld = sl.sidebar.slider("Yield da Distribuzione ‰ minimo",1,200)
# ELABORAZIONE
screened = []
r = get(FILEPATH)
lista = pd.read_csv(BytesIO(r.content), header=None)
# print(lista)
for index, line in lista.iterrows():
# print(line[0])
etf = suitable(line[0].strip(), data, dpolicy, scat, yold, discount, ter, dyeld, repl)
if etf:
screened.append([
etf['isin'],
etf['replication'],
etf['distribution_policy'],
etf['distribution_yield'],
etf['ter'],
etf['fund_size_category'],
etf['inception_date'],
etf['url']
])
else:
#print(f"ISIN not Valid: {line}")
pass
sl.subheader("ETF che rispettano il filtro")
df = pd.DataFrame(screened, columns=['Isin', 'Replica', 'Earnings Policy', 'Yield da Distribuzione','Ter', 'Dimensioni', 'Data Emissione', 'Link'])
dataelab.text('Attendi i risultati... Fatto!')
sl.dataframe(df)