Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added indicators from tushare #661

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
162 changes: 62 additions & 100 deletions pandas_ta/momentum/kdj.py
Original file line number Diff line number Diff line change
@@ -1,100 +1,62 @@
# -*- coding: utf-8 -*-
from pandas import DataFrame
from pandas_ta.overlap import rma
from pandas_ta.utils import get_offset, non_zero_range, verify_series


def kdj(high=None, low=None, close=None, length=None, signal=None, offset=None, **kwargs):
"""Indicator: KDJ (KDJ)"""
# Validate Arguments
length = int(length) if length and length > 0 else 9
signal = int(signal) if signal and signal > 0 else 3
_length = max(length, signal)
high = verify_series(high, _length)
low = verify_series(low, _length)
close = verify_series(close, _length)
offset = get_offset(offset)

if high is None or low is None or close is None: return

# Calculate Result
highest_high = high.rolling(length).max()
lowest_low = low.rolling(length).min()

fastk = 100 * (close - lowest_low) / non_zero_range(highest_high, lowest_low)

k = rma(fastk, length=signal)
d = rma(k, length=signal)
j = 3 * k - 2 * d

# Offset
if offset != 0:
k = k.shift(offset)
d = d.shift(offset)
j = j.shift(offset)

# Handle fills
if "fillna" in kwargs:
k.fillna(kwargs["fillna"], inplace=True)
d.fillna(kwargs["fillna"], inplace=True)
j.fillna(kwargs["fillna"], inplace=True)
if "fill_method" in kwargs:
k.fillna(method=kwargs["fill_method"], inplace=True)
d.fillna(method=kwargs["fill_method"], inplace=True)
j.fillna(method=kwargs["fill_method"], inplace=True)

# Name and Categorize it
_params = f"_{length}_{signal}"
k.name = f"K{_params}"
d.name = f"D{_params}"
j.name = f"J{_params}"
k.category = d.category = j.category = "momentum"

# Prepare DataFrame to return
kdjdf = DataFrame({k.name: k, d.name: d, j.name: j})
kdjdf.name = f"KDJ{_params}"
kdjdf.category = "momentum"

return kdjdf


kdj.__doc__ = \
"""KDJ (KDJ)

The KDJ indicator is actually a derived form of the Slow
Stochastic with the only difference being an extra line
called the J line. The J line represents the divergence
of the %D value from the %K. The value of J can go
beyond [0, 100] for %K and %D lines on the chart.

Sources:
https://www.prorealcode.com/prorealtime-indicators/kdj/
https://docs.anychart.com/Stock_Charts/Technical_Indicators/Mathematical_Description#kdj

Calculation:
Default Inputs:
length=9, signal=3
LL = low for last 9 periods
HH = high for last 9 periods

FAST_K = 100 * (close - LL) / (HH - LL)

K = RMA(FAST_K, signal)
D = RMA(K, signal)
J = 3K - 2D

Args:
high (pd.Series): Series of 'high's
low (pd.Series): Series of 'low's
close (pd.Series): Series of 'close's
length (int): Default: 9
signal (int): Default: 3
offset (int): How many periods to offset the result. Default: 0

Kwargs:
fillna (value, optional): pd.DataFrame.fillna(value)
fill_method (value, optional): Type of fill method

Returns:
pd.Series: New feature generated.
"""

"""
KDJ

Not sure if this is the same as the existing Stochastic Indicator
Please reference https://www.moomoo.com/us/learn/detail-what-is-the-kdj-67019-220809006
"""
import numpy as np
import pandas as pd
from pandas_ta.utils import verify_series
def kdj(close, low, high):

# Validate arguments

close = verify_series(close)
low = verify_series(low)
high = verify_series(high)

if close is None: return
if low is None: return
if high is None: return

#calculate

rsv = (close - low) / (high - low) * 100
K, D, J = [], [], []


K.append( (2 / 3) * 50 + (1 / 3) * rsv[0])


D.append((2 / 3) * 50 + (1 / 3) * K[0])

J.append(3 * K[0] - 2 * D[0])

for i in range(1,len(rsv)):
K.append((2 / 3) * K[i-1] + (1 / 3) * rsv[i])
D.append((2 / 3) * D[i-1] + (1 / 3) * K[i])
J.append(3 * K[i] - 2 * D[i])

# Prepare DataFrame to return

K = pd.DataFrame(K)
D = pd.DataFrame(D)
J = pd.DataFrame(J)


K.name = 'STOCH_K_TUSHARE'
D.name = 'STOCH_D_TUSHARE'
J.name = 'STOCH_J_TUSHARE'

K.category = D.category = J.category = "momentum"


df = pd.DataFrame()
df["K"] = K
df["D"] = D
df["J"] = J

df.name = 'KDJ'
df.category = K.category
return df
40 changes: 40 additions & 0 deletions pandas_ta/trend/adxr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""
ADXR
"""

import numpy as np
import pandas as pd
from pandas_ta.utils import verify_series
from pandas_ta.trend import adx
def adxr(high, low, close, n=None):

# Validate arguments
close = verify_series(close)
low = verify_series(low)
high = verify_series(high)


if close is None: return
if low is None: return
if high is None: return

n = n if n and n > 0 else 6

ADX = adx(high, low, close, length=n).iloc[:, 0]
print(ADX)
ADXR = []

for i in range(0,len(close)):

if i>= n:
adxr = (ADX[i] + ADX[i - n]) / 2
ADXR.append(adxr)
else:
ADXR.append(0)

# Prepare DataFrame to return

ADXR = pd.DataFrame(ADXR)
ADXR.name = 'ADXR'
ADXR.category = 'trend'
return ADXR
42 changes: 42 additions & 0 deletions pandas_ta/trend/bbi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""
Indicator: BBI Bull Bear Index
While this was inspired by Tushare, I found a different way to calculate this at https://www.tradingview.com/script/KvLCcuHB-BBI-4MA-4-overlay/

"""
from pandas_ta.overlap import ma
import numpy as np
import pandas as pd
from pandas_ta.utils import verify_series

def bbi(close, shorter=None, short=None, long=None, longer=None):
# Validate arguments
close = verify_series(close)
if close is None: return
#usually a 3, 6, 12, and 24 day sma is used here I think
#but it may be more useful to allow this to be changed
shorter = shorter if shorter and shorter > 0 else 3
short = short if short and short > 0 else 6
long = long if long and long > 0 else 12
longer = longer if longer and longer > 0 else 24



CS = []
BBI = []
for i in range(0,len(close)):
CS.append(close[i])

if len(CS) < 24:
BBI.append(close[i])
else:
#bbi = np.average([np.average(CS[-3:]), np.average(CS[-6:]), np.average(CS[-12:]), np.average(CS[-24:])])
bbi = (ma("sma", CS[-shorter:], length=shorter)+ma("sma", CS[-short:], length=short)+ma("sma", CS[-long:], length=long)+ma("sma", CS[-longer:], length=longer)/4)
BBI.append(bbi)

# Prepare DataFrame to return
BBI = pd.DataFrame(BBI)
BBI.name = 'BBI'
BBI.category = 'trend'
return BBI


47 changes: 47 additions & 0 deletions pandas_ta/trend/wr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""
WR Williams Overbought/Oversold Index


"""
import pandas as pd
from pandas_ta.utils import verify_series

def wnr(close, low, high, n=None):

# Validate arguments

close = verify_series(close)
low = verify_series(low)
high = verify_series(high)

if close is None: return
if low is None: return
if open is None: return
n = n if n and n > 0 else 14

#calculate

high_prices = []
low_prices = []
WNR = []

for i in range(0,len(close)):
high_prices.append(high[i])
if len(high_prices) == n:
del high_prices[0]
low_prices.append(low[i])
if len(low_prices) == n:
del low_prices[0]

highest = max(high_prices)
lowest = min(low_prices)

wnr = (highest - close[i]) / (highest - lowest) * 100
WNR.append(wnr)
# Prepare DataFrame to return
WNR = pd.DataFrame(WNR)
WNR.category = "trend"
WNR.name = "Williams_Index"

return WNR

65 changes: 65 additions & 0 deletions pandas_ta/volatility/asi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""
Indicator: ASI Accumulation Swing Index
Inspired by tushare

"""
import numpy as np
from pandas_ta.overlap import ma
import pandas as pd
from pandas_ta.utils import verify_series

def asi(close, low, high, open, n=None):

# Validate arguments

close = verify_series(close)
low = verify_series(low)
high = verify_series(high)
open = verify_series(open)

if close is None: return
if low is None: return
if high is None: return
if open is None: return

n = n if n and n > 0 else 5
#calculate
SI = []
SI.append(0.)

for i in range(1,len(close)):
a = abs(close[i] - close[i-1])
b = abs(low[i] - close[i-1])
c = abs(high[i] - close[i-1])
d = abs(close[i-1] - open[i-1])

if b > a and b > c:
r = b + (1 / 2) * a + (1 / 4) * d
elif c > a and c > b:
r = c + (1 / 4) * d
else:
r = 0

e = close[i] - close[i-1]
f = close[i] - open[i-1]
g = close[i-1] - open[i-1]

x = e + (1 / 2) * f + g
k = max(a, b)
l = 3

if np.isclose(r, 0) or np.isclose(l, 0):
si = 0
else:
si = 50 * (x / r) * (k / l)

SI.append(si)

# Prepare DataFrame to return
SI = pd.DataFrame(SI)

ASI = SI.rolling(n).mean()
ASI.name = 'ASI'
ASI.category = 'volatility'
return ASI

Loading