diff --git a/.gitignore b/.gitignore index 200f8e9..4333d5e 100644 --- a/.gitignore +++ b/.gitignore @@ -262,3 +262,11 @@ cython_debug/ /report.csv /report.html dump.csv + +*.csv +*.parquet +*.pdf +*.html +*.ttf +*.ipynb +*.sketch \ No newline at end of file diff --git a/README.md b/README.md index e6ce852..61690bb 100755 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ A small backtesting utility. - [Dump](#dump) - [Influx](#influx) - [QuantStats](#quantstats) + - [PDF](#pdf) - [Specific Return](#specific-return) - [Data Sources](#data-sources) - [Yahoo](#yahoo) @@ -112,6 +113,20 @@ Generate a tearsheet from the backtest data. | `--quantstats-benchmark-ticker` | `` | `SPY` | `symbol` | Specify the ticker to use as a benchmark in the tearsheet. | | `--quantstats-auto-delete` | | `false` | | Automatically delete the previous report files if they are present. | +##### PDF + +Generate a tearsheet from a custom template. + +| Option | Value | Default | Format | Description | +| --- | --- | --- | --- | --- | +| `--pdf` | | `false` | | Enable the pdf exporter. | +| `--pdf-template` | `` | `tearsheet.sketch` | `path` | Specify the template file. | +| `--pdf-output-file` | `` | `report.pdf` | `path` | Specify the output file. | +| `--pdf-auto-delete` | | `false` | | Automatically delete the previous report file if present. | +| `--pdf-debug` | | `false` | | Enable the pdf renderer's debugging tools. | +| `--pdf-variable` | `[ ]` | `[]` | `string` `string` | Add a custom variable. | +| `--pdf-user-script` | `[]` | `[]` | `path` | Add a user script. | + ##### Specific Return Generate a tearsheet from the specific return backtest data. 
diff --git a/backtest/__main__.py b/backtest/__main__.py index 4e28416..4cafccb 100644 --- a/backtest/__main__.py +++ b/backtest/__main__.py @@ -1,3 +1,3 @@ -from .cli import main +from .cli import cli -main() +cli() diff --git a/backtest/cli.py b/backtest/cli.py index 6e75bac..1bd074f 100755 --- a/backtest/cli.py +++ b/backtest/cli.py @@ -1,16 +1,27 @@ import datetime +import logging import sys -import pandas as pd +import typing +import os +import importlib +import time +import webbrowser import click import dotenv +import pandas +import contexttimer +import watchdog +import watchdog.observers +import watchdog.events +import readwrite -from .utils import is_number +from .utils import is_number, use_attrs dotenv.load_dotenv() -@click.command() +@click.group(invoke_without_command=True) @click.option('--start', type=click.DateTime(formats=["%Y-%m-%d"]), default=None, help="Start date.") @click.option('--end', type=click.DateTime(formats=["%Y-%m-%d"]), default=None, help="End date.") @click.option('--offset-before-trading', type=int, default=1, show_default=True, help="Number of day to offset to push the signal before trading it.") @@ -48,6 +59,13 @@ @click.option('--quantstats-output-file-csv', type=str, default="report.csv", show_default=True, help="Specify the output csv file.") @click.option('--quantstats-benchmark-ticker', type=str, default="SPY", show_default=True, help="Specify the symbol to use as a benchmark.") @click.option('--quantstats-auto-delete', is_flag=True, help="Should conflicting files be automatically deleted?") +@click.option('--pdf', is_flag=True, help="Enable the quantstats exporter.") +@click.option('--pdf-template', type=str, default="tearsheet.sketch", show_default=True, help="Specify the template file.") +@click.option('--pdf-output-file', type=str, default="report.pdf", show_default=True, help="Specify the output pdf file.") +@click.option('--pdf-auto-delete', is_flag=True, help="Should aa conflicting file be automatically deleted?") 
+@click.option('--pdf-debug', is_flag=True, help="Enable renderer debugging.") +@click.option('--pdf-variable', "pdf_variables", nargs=2, multiple=True, type=(str, str), help="Specify custom variables.") +@click.option('--pdf-user-script', "pdf_user_script_paths", multiple=True, type=str, help="Specify custom scripts.") @click.option('--specific-return', type=str, help="Enable the specific return exporter by proving a .parquet.") @click.option('--specific-return-column-date', type=str, default="date", show_default=True, help="Specify the column name containing the dates.") @click.option('--specific-return-column-symbol', type=str, default="symbol", show_default=True, help="Specify the column name containing the symbols.") @@ -68,6 +86,12 @@ @click.option('--file-parquet-column-price', type=str, default="price", show_default=True, help="Specify the column name containing the prices.") @click.option('--rfr-file', type=str, help="Specify the path of the risk free rate file") @click.option('--rfr-file-column-date', type=str, default="date", help="Specify the date column of the risk free rate file") +@click.pass_context +def cli(ctx: click.Context, **kwargs): + if ctx.invoked_subcommand is None: + main(**kwargs) + + def main( start: datetime.datetime, end: datetime.datetime, offset_before_trading: int, @@ -82,6 +106,7 @@ def main( dump: str, dump_output_file: str, dump_auto_delete: bool, influx, influx_host, influx_port, influx_database, influx_measurement, influx_key, quantstats, quantstats_output_file_html, quantstats_output_file_csv, quantstats_benchmark_ticker, quantstats_auto_delete, + pdf: bool, pdf_template: str, pdf_output_file: str, pdf_auto_delete: bool, pdf_debug: bool, pdf_variables: typing.Tuple[typing.Tuple[str, str]], pdf_user_script_paths: str, specific_return: str, specific_return_column_date: str, specific_return_column_symbol: str, specific_return_column_value: str, specific_return_output_file_html: str, specific_return_output_file_csv: str, 
specific_return_auto_delete: bool, yahoo, coinmarketcap, coinmarketcap_force_mapping_refresh, coinmarketcap_page_size, @@ -89,6 +114,8 @@ def main( file_parquet, file_parquet_column_date, file_parquet_column_symbol, file_parquet_column_price, rfr_file: str, rfr_file_column_date: str ): + logging.getLogger('matplotlib.font_manager').setLevel(logging.ERROR) + now = datetime.date.today() quantity_in_decimal = quantity_mode == "percent" @@ -127,9 +154,9 @@ def main( end = end.date() else: end = dates[-1] - + end += datetime.timedelta(days=offset_before_ending) - + if end > now: end = now @@ -170,7 +197,8 @@ def main( ) if data_source is not None: - print(f"[info] multiple data source provider, delegating: {data_source.get_name()}", file=sys.stderr) + print( + f"[info] multiple data source provider, delegating: {data_source.get_name()}", file=sys.stderr) from .data.source import DelegateDataSource data_source = DelegateDataSource([ @@ -184,10 +212,12 @@ def main( from .data.source import YahooDataSource data_source = YahooDataSource() - print(f"[warning] no data source selected, defaulting to --yahoo", file=sys.stderr) + print( + f"[warning] no data source selected, defaulting to --yahoo", file=sys.stderr) from .price_provider import SymbolMapper - symbol_mapper = None if not symbol_mapping else SymbolMapper.from_file(symbol_mapping) + symbol_mapper = None if not symbol_mapping else SymbolMapper.from_file( + symbol_mapping) fee_model = None if fee_model_value: @@ -252,6 +282,29 @@ def main( auto_delete=specific_return_auto_delete, )) + if pdf: + from .export import QuantStatsExporter, DumpExporter + + quantstats_exporter = next( + filter(lambda x: isinstance(x, QuantStatsExporter), exporters), None) + dump_exporter = next( + filter(lambda x: isinstance(x, DumpExporter), exporters), None) + + template = _load_template(pdf_template) + user_scripts = _load_user_scripts(pdf_user_script_paths) + + from .export import PdfExporter + exporters.append(PdfExporter( + 
@cli.group(name="template")
def template_group():
    """Group the template sub-commands (``info``, ``render`` and ``identity``)."""


@template_group.command()
@click.argument('template-path', type=click.Path(exists=True, dir_okay=False), default="tearsheet.sketch")
def info(
    template_path: str
):
    """Print the name of a template and the `$`-prefixed slots it exposes."""
    template = _load_template(template_path)

    print(f"name: {template.name}")

    # Only slots whose key starts with "$" are listed, in sorted order.
    keys = sorted(
        key
        for key in template.slots.keys()
        if key.startswith("$")
    )

    print("variables:")
    for key in keys:
        print(f" {key}")


@template_group.command()
@click.option('--output-file', type=str, default="report.pdf", show_default=True, help="Specify the output pdf file.")
@click.option('--debug', is_flag=True, help="Enable renderer debugging.")
@click.option('--variable', "variables", nargs=2, multiple=True, type=(str, str), help="Specify custom variables.")
@click.option('--user-script', "user_script_paths", multiple=True, type=str, help="Specify custom scripts.")
@click.option('--dataframe-returns', "dataframe_returns_path", type=click.Path(exists=True), help="Specify the returns dataframe path (from quantstats exporter).")
@click.option('--dataframe-benchmark', "dataframe_benchmark_path", type=click.Path(exists=True), help="Specify benchmark dataframe path (from quantstats exporter).")
@click.option('--dataframe-dump', "dataframe_dump_path", type=click.Path(exists=True), help="Specify dump dataframe path (from dump exporter).")
@click.argument('template-path', type=click.Path(exists=True, dir_okay=False), default="tearsheet.sketch")
def render(
    output_file: str,
    debug: bool,
    variables: typing.Sequence[typing.Tuple[str, str]],
    user_script_paths: typing.Sequence[str],
    dataframe_returns_path: typing.Optional[str],
    dataframe_benchmark_path: typing.Optional[str],
    dataframe_dump_path: typing.Optional[str],
    template_path: str,
):
    """Render a template to a pdf from previously exported dataframes.

    Each ``--dataframe-*`` option is optional; a missing one is passed to the
    exporter as ``None``.
    """
    template = _load_template(template_path)
    user_scripts = _load_user_scripts(user_script_paths)

    # The options are optional: only hand a real path to the reader instead
    # of relying on how it reacts to None.
    dataframe_returns = readwrite.read(dataframe_returns_path) if dataframe_returns_path else None
    dataframe_benchmark = readwrite.read(dataframe_benchmark_path) if dataframe_benchmark_path else None
    dataframe_dump = readwrite.read(dataframe_dump_path) if dataframe_dump_path else None

    if dataframe_returns is not None:
        # Reduce the frame to the "daily_profit_pct" series indexed by date.
        dataframe_returns["date"] = pandas.to_datetime(dataframe_returns["date"])
        dataframe_returns.set_index("date", drop=True, inplace=True)
        dataframe_returns = dataframe_returns["daily_profit_pct"]

    if dataframe_benchmark is not None:
        # Reduce the frame to the "close" series indexed by plain dates.
        dataframe_benchmark["date"] = pandas.to_datetime(dataframe_benchmark["date"]).dt.date
        dataframe_benchmark.set_index("date", drop=True, inplace=True)
        dataframe_benchmark = dataframe_benchmark["close"]

    # Stand-ins exposing the attributes PdfExporter reads from real exporters.
    # NOTE(review): use_attrs presumably turns a dict into an attribute-access
    # object -- confirm against backtest.utils.
    quantstats_exporter = use_attrs({
        "returns": dataframe_returns,
        "benchmark": dataframe_benchmark,
    })

    if dataframe_dump is not None:
        dataframe_dump["date"] = pandas.to_datetime(dataframe_dump["date"]).dt.date

    dump_exporter = use_attrs({
        "dataframe": dataframe_dump,
    })

    from .export import PdfExporter
    PdfExporter(
        quantstats_exporter=quantstats_exporter,
        dump_exporter=dump_exporter,
        template=template,
        output_file=output_file,
        auto_delete=True,
        debug=debug,
        variables=_to_variables(variables),
        user_scripts=user_scripts
    ).finalize()


@template_group.command()
@click.option('--output-file', type=str, default="report.pdf", show_default=True, help="Specify the output pdf file.")
@click.option('--debug', is_flag=True, help="Enable debug rendering.")
@click.option('--watch', is_flag=True, help="Watch and continuously re-render.")
@click.option('--open', "open_after_render", is_flag=True, help="Open after render.")
@click.argument('template-path', type=click.Path(exists=True, dir_okay=False), default="tearsheet.sketch")
def identity(
    template_path: str,
    output_file: str,
    debug: bool,
    watch: bool,
    open_after_render: bool,
):
    """Render a template as-is, without substituting any slot.

    With ``--watch`` the template is rendered once, then re-rendered every time
    the template file is created or modified, until interrupted.
    """
    from .template import PdfTemplateRenderer
    renderer = PdfTemplateRenderer(debug=debug)

    def do_render():
        # The template is reloaded on every render so edits are picked up.
        with contexttimer.Timer(prefix="loading", output=sys.stderr):
            template = _load_template(template_path)

        with contexttimer.Timer(prefix="rendering", output=sys.stderr):
            with open(output_file, "wb") as fd:
                renderer.render(template, fd)

        if open_after_render:
            webbrowser.open(output_file)

    if not watch:
        do_render()
        return

    do_render()

    directory = os.path.dirname(template_path) or "."
    # Compare absolute paths. Joining the directory with the full template
    # path repeated the directory ("a/a/t.sketch"), so events for a template
    # outside the current directory never matched.
    watched = os.path.abspath(template_path)

    class Handler(watchdog.events.FileSystemEventHandler):

        def dispatch(self, event):
            # Ignore events for every other file of the watched directory.
            if os.path.abspath(os.fsdecode(event.src_path)) != watched:
                return

            super().dispatch(event)

        def on_modified(self, event):
            do_render()

        def on_created(self, event):
            do_render()

    observer = watchdog.observers.Observer()
    observer.schedule(Handler(), directory, recursive=False)
    observer.start()

    try:
        print(f"watching for changes on: {template_path}")

        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("exit")
    finally:
        observer.stop()
        observer.join()
+ path = os.path.join(directory, template_path) + + class Handler(watchdog.events.FileSystemEventHandler): + + def dispatch(self, event): + if event.src_path != path: + return + + super().dispatch(event) + + def on_modified(self, event): + do_render() + + def on_created(self, event): + do_render() + + event_handler = Handler() + + observer = watchdog.observers.Observer() + observer.schedule(event_handler, directory, recursive=False) + observer.start() + + try: + print(f"watching for changes on: {template_path}") + + while True: + time.sleep(1) + except KeyboardInterrupt: + print("exit") + finally: + observer.stop() + observer.join() + else: + do_render() + + +def _load_template(path: str): + if path.endswith(".sketch"): + from .template import SketchTemplateLoader + return SketchTemplateLoader().load(path) + else: + raise click.Abort(f"unsupported template: {path}") + + +def _load_user_scripts(paths: typing.List[str]): + modules = [] + + for index, path in enumerate(paths or list()): + directory = os.path.dirname(path) + + spec = importlib.util.spec_from_file_location( + f"user_code_{index}", + path + ) + + module = importlib.util.module_from_spec(spec) + + sys.path.insert(0, directory) + spec.loader.exec_module(module) + + modules.append(module) + + return modules + + +def _to_variables(variables: typing.Optional[typing.List[typing.Tuple[str, str]]]): + return { + f"${key}": value + for key, value in (variables or tuple()) + } diff --git a/backtest/export/__init__.py b/backtest/export/__init__.py index 456f1f6..ce83b2a 100644 --- a/backtest/export/__init__.py +++ b/backtest/export/__init__.py @@ -4,4 +4,5 @@ from .quants import QuantStatsExporter from .dump import DumpExporter from .specific_return import SpecificReturnExporter +from .pdf import PdfExporter from .model import Snapshot diff --git a/backtest/export/pdf.py b/backtest/export/pdf.py new file mode 100644 index 0000000..af8d20f --- /dev/null +++ b/backtest/export/pdf.py @@ -0,0 +1,171 @@ +import abc 
# Kept for backward compatibility; no longer used as a (shared) default value.
_EMPTY_DICT = dict()


class PdfExporter(BaseExporter):
    """Fill a template with backtest results and render it to a pdf file.

    The data is read, at ``finalize`` time, from the ``returns`` / ``benchmark``
    attributes of the quantstats exporter and from the ``dataframe`` attribute
    of the dump exporter. Both exporters are optional.
    """

    def __init__(
        self,
        quantstats_exporter: typing.Optional[QuantStatsExporter],
        dump_exporter: typing.Optional[DumpExporter],
        template: Template,
        output_file="report.pdf",
        auto_delete=False,
        debug=False,
        variables: typing.Optional[typing.Dict[str, str]] = None,
        user_scripts: typing.Optional[typing.List[typing.Any]] = None,
    ):
        """
        Args:
            quantstats_exporter: Source of the returns/benchmark series, or None.
            dump_exporter: Source of the dump dataframe, or None.
            template: Template whose slots get filled.
            output_file: Path of the pdf to write.
            auto_delete: Delete an existing output file without asking.
            debug: Forwarded to the pdf renderer.
            variables: Extra slot values applied after the built-in ones.
            user_scripts: Loaded modules; those exposing a callable
                ``get_template_values`` contribute additional slot values.
        """
        self.quantstats_exporter = quantstats_exporter
        self.dump_exporter = dump_exporter
        self.output_file = output_file
        self.auto_delete = auto_delete
        self.template = template
        # Fresh containers per instance instead of shared mutable defaults.
        self.variables = variables if variables is not None else {}
        self.user_scripts = user_scripts if user_scripts is not None else []

        self.renderer = PdfTemplateRenderer(
            debug=debug,
        )

    # NOTE: these are concrete implementations, so they must not be decorated
    # with abc.abstractmethod (under an ABCMeta base that would make the class
    # impossible to instantiate).
    def initialize(self) -> None:
        """Remove a pre-existing output file, asking first unless auto_delete is set."""
        if not os.path.exists(self.output_file):
            return

        can_delete = self.auto_delete
        if not can_delete:
            can_delete = input(
                f"{self.output_file}: delete file? [y/N]").lower() == 'y'

        if can_delete:
            os.remove(self.output_file)

    def finalize(self) -> None:
        """Compute the metrics, fill the template slots and write the pdf.

        NOTE: the local variables of this method are part of the user-script
        contract: they are passed as keyword arguments to each script's
        ``get_template_values`` (see the loop at the end). Renaming, adding or
        removing one changes what the scripts receive.
        """
        df_returns = self.quantstats_exporter.returns if self.quantstats_exporter else None
        df_benchmark = self.quantstats_exporter.benchmark if self.quantstats_exporter else None

        df_dump = None
        if self.dump_exporter and self.dump_exporter.dataframe is not None:
            df_dump = self.dump_exporter.dataframe.reset_index().sort_values(by='date')

        if df_benchmark is not None:
            # NOTE(review): hard-coded display name, whatever the benchmark is.
            df_benchmark.name = "RUSSELL (1000)"

        df_metrics, df_drowdowns = None, None
        if df_returns is not None:
            df_returns.name = "Strategy"
            df_metrics = quantstats.reports.metrics(df_returns, benchmark=df_benchmark, display=False, mode="full", as_pct=True)
            # Slugify rows and columns so they can be addressed from slot names.
            df_metrics.index = df_metrics.index.map(slugify.slugify)
            df_metrics.columns = df_metrics.columns.map(slugify.slugify)

            df_drowdowns = quantstats.stats.to_drawdown_series(df_returns)
            if not df_drowdowns.empty:
                details = quantstats.stats.drawdown_details(df_drowdowns)
                # Keep the 5 first rows once sorted ascending on the fifth
                # column (presumably "max drawdown", which is read below).
                df_drowdowns = details.sort_values(
                    by=details.columns[4],
                    ascending=True
                )[:5]

            if df_drowdowns.empty:
                df_drowdowns = None
            else:
                df_drowdowns["start"] = pandas.to_datetime(df_drowdowns["start"])
                df_drowdowns["end"] = pandas.to_datetime(df_drowdowns["end"])

        self.template.apply({
            "$date": datetime.date.today().isoformat(),
        })

        figsize = (8, 5)

        if df_returns is not None:
            # NOTE: "montly" is misspelled, but it is the slot name existing
            # templates use; renaming it would break them.
            self.template.apply({
                "$qs.montly-returns": lambda _: quantstats.plots.monthly_returns(df_returns, show=False, cbar=False, figsize=(figsize[0], figsize[0]*.5)),
                "$qs.cumulative-returns": lambda _: quantstats.plots.returns(df_returns, df_benchmark, show=False, subtitle=False),
                "$qs.cumulative-returns-volatility": lambda _: quantstats.plots.returns(df_returns, df_benchmark, match_volatility=df_benchmark is not None, show=False),
                "$qs.eoy-returns": lambda _: quantstats.plots.yearly_returns(df_returns, df_benchmark, show=False),
                "$qs.underwater-plot": lambda _: quantstats.plots.drawdown(df_returns, show=False),
            })

        # TODO Use name% format instead
        NOT_PERCENT = [
            "sharpe",
            "avg-drawdown-days",
            "kurtosis",
            "sortino",
        ]

        def get_metric(column: str, name: str):
            """Return one metric, suffixed with "%" unless listed in NOT_PERCENT."""
            if "." in name:
                print(f"[warning] invalid metric name: `{name}`: contains a dot")
                name = name.replace(".", "-")

            value = df_metrics.loc[name, column]
            if name in NOT_PERCENT:
                return value

            return f"{value}%"

        def get_drawdown(n: int, key: typing.Literal["dates", "value"]):
            """Return the dates or the value of the n-th (1-based) worst drawdown.

            A placeholder is returned when there is no such drawdown.
            """
            row = None
            # `n >= 1`: an index of 0 used to wrap around to the last row.
            if df_drowdowns is not None and 1 <= n <= len(df_drowdowns):
                row = df_drowdowns.iloc[n - 1]

            if key == "dates":
                if row is None:
                    return "----/--/-- - ----/--/--"

                start = row["start"].strftime('%Y/%m/%d')
                end = row["end"].strftime('%Y/%m/%d')

                return f"{start} - {end}"

            if row is None:
                return "--%"

            value = row["max drawdown"]
            return f"{value:.2f}%"

        # Without returns there are no metrics to look up: leave the metric
        # slots untouched instead of failing on `None.loc`.
        if df_metrics is not None:
            if df_benchmark is not None:
                self.template.apply_re({
                    r"\$qs\.metric\.(strategy|benchmark)\.(.+)": lambda _, column, metric: get_metric(column, metric),
                })
            else:
                self.template.apply_re({
                    r"\$qs\.metric\.strategy\.(.+)": lambda _, metric: get_metric("strategy", metric),
                })

        self.template.apply_re({
            # The separator dot is escaped so that it only matches a literal ".".
            r"\$qs\.worst-drawdowns\.(\d+)\.(dates|value)": lambda _, n, key: get_drawdown(int(n), key),
        })

        self.template.apply(self.variables)

        for user_script in self.user_scripts:
            get_template_values = getattr(user_script, "get_template_values", None)
            if not callable(get_template_values):
                continue

            # Every local of this method is handed over as a keyword argument.
            values, values_re = get_template_values(**locals())
            self.template.apply(values)
            self.template.apply_re(values_re)

        with open(self.output_file, "wb") as fd:
            self.renderer.render(self.template, fd)
Identifier = typing.Any
NaturalIdentifier = str

# Alias of the builtin: the `bytes` fields below shadow it inside the class
# bodies, which made `Optional[bytes]` evaluate to `Optional[None]`.
_BytesType = bytes


@dataclasses.dataclass
class Vector2:
    """A 2D point or size."""

    x: typing.Union[int, float]
    y: typing.Union[int, float]

    def __add__(self, other):
        """Add another vector component-wise, or a scalar to both components.

        Raises:
            ValueError: if ``other`` is neither a Vector2 nor a number.
        """
        if isinstance(other, Vector2):
            return Vector2(
                self.x + other.x,
                self.y + other.y,
            )

        if isinstance(other, (int, float)):
            return Vector2(
                self.x + other,
                self.y + other,
            )

        raise ValueError(f"unsupported operator with: {type(other)}")

    @staticmethod
    def zero():
        """Return the (0, 0) vector."""
        return Vector2(0, 0)


@dataclasses.dataclass
class Rectangle2:
    """An axis-aligned rectangle described by its origin and its size."""

    x: typing.Union[int, float]
    y: typing.Union[int, float]
    width: typing.Union[int, float]
    height: typing.Union[int, float]

    @property
    def tuple(self):
        """The rectangle as ``(x, y, width, height)``."""
        return (self.x, self.y, self.width, self.height)


@dataclasses.dataclass
class Color:
    """A color with red, green, blue and alpha components."""

    red: int
    green: int
    blue: int
    alpha: int

    @property
    def hex_string(self) -> str:
        """The color as ``#rrggbbaa``; components are truncated with ``int``."""
        return '#%02x%02x%02x%02x' % (
            int(self.red),
            int(self.green),
            int(self.blue),
            int(self.alpha),
        )

    @staticmethod
    def black() -> "Color":
        """Return opaque black."""
        return Color(0, 0, 0, 255)


@dataclasses.dataclass
class Element:
    """Base of everything that can be placed on a page."""

    id: Identifier
    natural_id: NaturalIdentifier
    position: Rectangle2


@dataclasses.dataclass
class Shape(Element):
    """A filled polygon, optionally clipped to a rectangle."""

    points: typing.List[Vector2]
    color: Color
    clip: typing.Optional[Rectangle2] = None


@dataclasses.dataclass
class Font:
    """A font family at a given size, optionally carrying the font file content."""

    family: str
    size: int
    bytes: typing.Optional[_BytesType] = None

    @property
    def file_name(self):
        """File name of the font, derived from the current family.

        Deliberately not cached: ``family`` is reassigned after creation (the
        pdf renderer falls back to another family), and a cached value would
        keep naming the previous family.
        """
        return f"{self.family}.ttf"


class Alignment(enum.Enum):
    """Horizontal alignment of a text."""

    LEFT = 0
    RIGHT = 1


@dataclasses.dataclass
class Span:
    """A run of characters of a text with an optional font/color override."""

    start: int
    length: int
    content: str
    font: typing.Optional[Font]
    color: typing.Optional[Color]

    @property
    def end(self):
        """Index right after the last character of the span."""
        return self.start + self.length


@dataclasses.dataclass
class Text(Element):
    """A text with its default style and its styled spans."""

    content: str
    color: Color
    font: Font
    alignment: Alignment
    spans: typing.List[Span]


@dataclasses.dataclass
class Image(Element):
    """A bitmap held in memory, with an alternative text."""

    bytes: io.BytesIO
    alternative: str


@dataclasses.dataclass
class Page:
    """A page: its size and the elements placed on it."""

    size: Vector2
    elements: typing.List[Element]


@dataclasses.dataclass
class Document:
    """An ordered list of pages."""

    pages: typing.List[Page]

    @property
    def fonts(self) -> typing.Iterator[Font]:
        """Yield the font of every text, followed by the fonts of its spans.

        Fonts are yielded once per use, so the same object can come up several
        times.
        """
        for page in self.pages:
            for element in page.elements:
                if not isinstance(element, Text):
                    continue

                yield element.font

                for span in element.spans:
                    if span.font:
                        yield span.font
@dataclasses.dataclass()
class Word:
    """A run of characters laid out at a position relative to its text box."""

    position: Rectangle2
    content: str
    font: Font
    color: Color


@dataclasses.dataclass()
class Line:
    """The words of one laid-out line and the size they take."""

    size: Vector2
    words: typing.List[Word]


class PdfTemplateRenderer(TemplateRenderer):
    """Render the document of a template to a pdf with fpdf."""

    # Drawing order: elements with a lower priority are drawn first.
    DEFAULT_PRIORITIES = {
        Text.__name__.lower(): 50,
        Shape.__name__.lower(): -20,
        Image.__name__.lower(): 10
    }

    def __init__(
        self,
        priorities: typing.Dict[str, int] = None,
        debug=False,
    ) -> None:
        """
        Args:
            priorities: Drawing priority per lower-cased element class name;
                DEFAULT_PRIORITIES is used when omitted.
            debug: Outline every element after drawing a page.
        """
        super().__init__()

        if priorities is None:
            priorities = PdfTemplateRenderer.DEFAULT_PRIORITIES

        self.priorities = priorities
        self.debug = debug

    def render(
        self,
        template: Template,
        output: io.FileIO
    ) -> None:
        """Draw every page of ``template.document`` and write the pdf to ``output``.

        Fonts are taken from a ``<family>.ttf`` file in the working directory,
        else from the bytes embedded in the font, else replaced by helvetica
        (the family of the font object is overwritten in that case).
        """
        pdf = fpdf.FPDF(
            unit="pt"
        )

        reported = set()

        for font in template.document.fonts:
            file_name = font.file_name

            if font.family.lower() in pdf.fonts:
                continue

            if os.path.exists(file_name):
                pdf.add_font(font.family, "", file_name)
            elif font.bytes:
                self._add_font_from_bytes(pdf, font)
            else:
                font.family = "helvetica"

                # Report each missing file once, however many texts use it.
                if file_name not in reported:
                    print(f"font {file_name} not found, using {font.family} instead")

                reported.add(file_name)

        pdf.set_auto_page_break(False)

        for page in template.document.pages:
            pdf.add_page(
                format=(page.size.x, page.size.y),
            )

            # sorted() is stable: elements sharing a priority keep their
            # document order, without tagging them with a temporary index.
            elements = sorted(page.elements, key=self._priority_of)

            for element in elements:
                pdf.set_xy(element.position.x, element.position.y)

                if isinstance(element, Shape):
                    self._render_shape(pdf, element)
                elif isinstance(element, Text):
                    self._render_text(pdf, element)
                elif isinstance(element, Image):
                    self._render_image(pdf, element)

            if self.debug:
                self._render_debug(pdf, elements)

        pdf.output(output)

    def _priority_of(self, element: Element) -> int:
        """Return the drawing priority of an element (0 for unknown classes)."""
        name = type(element).__name__.lower()
        fallback = PdfTemplateRenderer.DEFAULT_PRIORITIES.get(name, 0)
        return self.priorities.get(name, fallback)

    def _render_image(self, pdf: fpdf.FPDF, image: Image):
        """Draw an image inside its position rectangle."""
        pdf.image(
            image.bytes,
            image.position.x, image.position.y,
            image.position.width, image.position.height
        )

    def _compute_lines(self, pdf: fpdf.FPDF, text: Text) -> typing.List[Line]:
        """Split a text into lines of words that fit the width of its box.

        Word positions are relative to the text origin. Runs of spaces only
        move the cursor, runs of newlines close one line per newline, and a
        word is wrapped when it would end more than 2 units past the box width.
        """
        lines: typing.List[Line] = []
        words: typing.List[Word] = []

        def commit_line(min_height: float):
            """Close the current line; an empty one gets ``min_height``."""
            nonlocal words

            width = 0
            height = min_height

            if words:
                width = sum(word.position.width for word in words)
                height = max(word.position.height for word in words)

            lines.append(Line(
                size=Vector2(
                    width,
                    height
                ),
                words=words
            ))

            words = []

        font = text.font
        pdf.set_font(font.family, '', font.size)
        # Measured once, with the default font of the text.
        space_size = pdf.get_string_width(" ")

        x, y = 0, 0

        def find_span(start: int):
            """Return the last span starting at or before ``start`` and its index.

            ``(None, -1)`` is returned when no span does (no spans at all, or
            all of them start later); this used to fail on an unbound index.
            """
            found, found_index = None, -1

            for span_index, span in enumerate(text.spans):
                if span.start > start:
                    break

                found, found_index = span, span_index

            return found, found_index

        span_height = 0

        for index, word in _split_words(text.content):
            span, span_index = find_span(index)

            # Fall back to the style of the text when there is no span or when
            # the span does not override it.
            color = (span.color if span is not None else None) or text.color
            font = (span.font if span is not None else None) or text.font
            pdf.set_font(font.family, '', font.size)

            if self.debug:
                if span_index % 2:
                    pdf.set_text_color(255, 0, 0)
                else:
                    pdf.set_text_color(0, 255, 0)

            span_height = font.size

            if word[0] == "\n":
                line_count = len(word)

                for _ in range(line_count):
                    commit_line(span_height)

                x = 0
                y += font.size * line_count
            elif word[0] == " ":
                x += space_size * len(word)
            else:
                span_width = pdf.get_string_width(word)

                next_x = x + span_width

                if next_x > text.position.width + 2:
                    # Wrap: the word starts the next line.
                    commit_line(span_height)

                    x = 0
                    y += span_height
                    next_x = span_width

                words.append(Word(
                    position=Rectangle2(
                        x, y,
                        span_width, span_height
                    ),
                    content=word,
                    font=font,
                    color=color
                ))

                x = next_x

        commit_line(span_height)

        return lines

    def _render_text(self, pdf: fpdf.FPDF, text: Text):
        """Draw a text word by word, from the current position of the pdf.

        The vertical space left in the box is spread evenly before, between
        and after the lines.
        """
        right = text.alignment == Alignment.RIGHT

        # Same origin for both alignments.
        # NOTE(review): the -3 / +1 offsets presumably compensate the inner
        # margins of fpdf cells -- confirm.
        start_x = pdf.x - 3
        start_y = pdf.y + 1

        lines = self._compute_lines(pdf, text)

        height_sum = sum(line.size.y for line in lines)
        free_space = max(0, text.position.height - height_sum)
        extra_space = free_space / (len(lines) + 1)

        for index, line in enumerate(lines, 1):
            right_offset = 0
            if right:
                # Shift the line so that its last word ends at the right edge.
                last_x = max(
                    (word.position.x + word.position.width for word in line.words),
                    default=0
                )
                right_offset = text.position.width - last_x

            for word in line.words:
                pdf.set_text_color(word.color.red, word.color.green, word.color.blue)
                pdf.set_font(word.font.family, '', word.font.size)

                x = start_x + word.position.x + right_offset
                y = start_y + word.position.y + (index * extra_space)

                pdf.set_xy(x, y)
                pdf.cell(
                    word.position.width,
                    word.position.height,
                    word.content,
                )

    def _render_shape(self, pdf: fpdf.FPDF, shape: Shape):
        """Draw a closed, filled and stroke-less polygon, clipped if requested."""
        with pdf.new_path(shape.position.x, shape.position.y) as path:
            path.style.fill_color = shape.color.hex_string
            path.style.stroke_color = fpdf.drawing.gray8(0, 0)
            path.style.stroke_opacity = 0

            clip = shape.clip
            if clip:
                clipping_path = fpdf.drawing.ClippingPath()
                clipping_path.rectangle(
                    clip.x, clip.y,
                    clip.width, clip.height,
                )

                path.clipping_path = clipping_path

            for point in shape.points:
                path.line_to(point.x, point.y)

            path.close()

    def _render_debug(self, pdf: fpdf.FPDF, elements: typing.List[Element]):
        """Outline the elements: texts in red, images in green, shapes in blue."""
        for element in elements:
            if isinstance(element, Text):
                pdf.set_draw_color(255, 0, 0)
            elif isinstance(element, Image):
                pdf.set_draw_color(0, 255, 0)
            elif isinstance(element, Shape):
                pdf.set_draw_color(0, 0, 255)
            else:
                continue

            pdf.rect(
                element.position.x,
                element.position.y,
                element.position.width,
                element.position.height
            )

    def _add_font_from_bytes(self, pdf: fpdf.FPDF, font: Font):
        """
        Register a font from its in-memory bytes.

        Partially extracted from the official fpdf.FPDF.add_font function,
        keeping only the important part and using an io buffer instead of a
        file name. It writes into ``pdf.fonts`` directly, so it depends on the
        internals of the installed fpdf version.
        """
        # Imported here so the method does not depend on what the star imports
        # of this module happen to re-export.
        import collections
        import re
        import warnings

        from fontTools import ttLib
        from fpdf.fpdf import SubsetMap
        from fpdf.enums import FontDescriptorFlags, TextEmphasis
        from fpdf.output import PDFFontDescriptor

        fontkey = font.family.lower()
        if fontkey in pdf.fonts or fontkey in pdf.core_fonts:
            warnings.warn(f"Core font or font already added '{fontkey}': doing nothing")
            return

        ttfont = ttLib.TTFont(io.BytesIO(font.bytes), fontNumber=0, lazy=True)

        # Font units are scaled to 1000 units per em.
        scale = 1000 / ttfont["head"].unitsPerEm
        default_width = round(scale * ttfont["hmtx"].metrics[".notdef"][0])

        try:
            cap_height = ttfont["OS/2"].sCapHeight
        except AttributeError:
            cap_height = ttfont["hhea"].ascent

        flags = FontDescriptorFlags.SYMBOLIC
        if ttfont["post"].isFixedPitch:
            flags |= FontDescriptorFlags.FIXED_PITCH
        if ttfont["post"].italicAngle != 0:
            flags |= FontDescriptorFlags.ITALIC
        if ttfont["OS/2"].usWeightClass >= 600:
            flags |= FontDescriptorFlags.FORCE_BOLD

        desc = PDFFontDescriptor(
            ascent=round(ttfont["hhea"].ascent * scale),
            descent=round(ttfont["hhea"].descent * scale),
            cap_height=round(cap_height * scale),
            flags=flags,
            font_b_box=(
                f"[{ttfont['head'].xMin * scale:.0f} {ttfont['head'].yMin * scale:.0f}"
                f" {ttfont['head'].xMax * scale:.0f} {ttfont['head'].yMax * scale:.0f}]"
            ),
            italic_angle=int(ttfont["post"].italicAngle),
            stem_v=round(50 + int(pow((ttfont["OS/2"].usWeightClass / 65), 2))),
            missing_width=default_width,
        )

        char_widths = collections.defaultdict(lambda: default_width)
        # Looked up once instead of once per character.
        best_cmap = ttfont.getBestCmap()
        font_cmap = tuple(best_cmap.keys())
        for char in font_cmap:
            glyph = best_cmap[char]
            w = ttfont["hmtx"].metrics[glyph][0]
            if w == 65535:
                w = 0

            char_widths[char] = round(scale * w + 0.001)

        # Characters that are always part of the subset.
        sbarr = "\x00 "
        if pdf.str_alias_nb_pages:
            sbarr += "0123456789"
            sbarr += pdf.str_alias_nb_pages

        pdf.fonts[fontkey] = {
            "i": len(pdf.fonts) + 1,
            "type": "TTF",
            "name": re.sub("[ ()]", "", ttfont["name"].getBestFullName()),
            "desc": desc,
            "up": round(ttfont["post"].underlinePosition * scale),
            "ut": round(ttfont["post"].underlineThickness * scale),
            "cw": char_widths,
            "ttffile": io.BytesIO(font.bytes),
            "fontkey": fontkey,
            "emphasis": TextEmphasis.coerce(""),
            "subset": SubsetMap(map(ord, sbarr)),
            "cmap": font_cmap,
        }
round(font["post"].underlinePosition * scale), + "ut": round(font["post"].underlineThickness * scale), + "cw": char_widths, + "ttffile": io.BytesIO(font_.bytes), + "fontkey": fontkey, + "emphasis": TextEmphasis.coerce(""), + "subset": SubsetMap(map(ord, sbarr)), + "cmap": font_cmap, + } + + +def _split_words(text: str): + for _, group in itertools.groupby(enumerate(text), lambda x: (x[1] == " ", x[1] == "\n")): + index, part = next(group) + yield index, part + "".join(x for _, x in group) diff --git a/backtest/template/sketch.py b/backtest/template/sketch.py new file mode 100644 index 0000000..959ba3d --- /dev/null +++ b/backtest/template/sketch.py @@ -0,0 +1,310 @@ +import zipfile +import shutil +import json + +from .models import * +from .template import * + + +class SketchTemplateLoader(TemplateLoader): + + def _load(self, zipfd: zipfile.ZipFile, sketch: dict): + elements: typing.List[Element] + + def find(layer: dict, absolute_offset: Vector2, clip: Rectangle2): + id = layer["do_objectID"] + natural_id = layer["name"] + + local = self._get_frame_xy(layer) + size = self._get_frame_wh(layer) + absolute = local + absolute_offset + + position = Rectangle2( + absolute.x, absolute.y, + size.x, size.y, + ) + + class_ = layer["_class"] + if class_ == "shapePath": + ( + points, + color + ) = self._extract_shape_points( + layer, + absolute, + size + ) + + elements.append(Shape( + id=id, + natural_id=natural_id, + position=position, + points=points, + color=color, + clip=clip, + )) + + elif class_ == "text": + ( + content, + font, + color, + alignment, + spans, + ) = self._extract_text( + layer + ) + + elements.append(Text( + id=id, + natural_id=natural_id, + position=position, + content=content, + font=font, + color=color, + alignment=alignment, + spans=spans + )) + + elif class_ == "bitmap": + path = layer["image"]["_ref"] + + bytes = io.BytesIO() + with zipfd.open(path) as fd: + shutil.copyfileobj(fd, bytes) + + elements.append(Image( + id=id, + natural_id=natural_id, 
def load(self, path: str) -> Template:
    """Read the Sketch archive at *path* and build a template from it.

    The archive's ``document.json`` lists the pages; each referenced
    ``<ref>.json`` member is parsed and converted with ``self._load``.
    Afterwards the bytes of every font of the resulting document are looked
    up in the archive, first under the font's file name and then under
    ``fonts/<file name>``.

    Returns:
        A ``Template`` named after *path* wrapping the loaded document.
    """
    with zipfile.ZipFile(path) as archive:

        def try_open(member: str):
            # ZipFile.open raises KeyError when the member does not exist.
            try:
                return archive.open(member)
            except KeyError:
                return None

        with archive.open('document.json') as handle:
            meta = json.load(handle)

        page_dicts: typing.List[dict] = []
        for entry in meta["pages"]:
            reference = entry["_ref"]
            with archive.open(f"{reference}.json") as handle:
                page_dicts.append(json.load(handle))

        all_pages: typing.List[Page] = []
        for page_dict in page_dicts:
            all_pages.extend(self._load(archive, page_dict))

        document = Document(
            pages=all_pages
        )

        # file name -> font bytes, or False once a lookup has failed, so
        # each font file is searched for at most once.
        font_cache = {}
        for font in document.fonts:
            name = font.file_name
            cached = font_cache.get(name)

            if cached is False:
                continue

            if isinstance(cached, bytes):
                font.bytes = cached
                continue

            handle = try_open(name) or try_open(f"fonts/{name}")
            if not handle:
                font_cache[name] = False
                continue

            with handle:
                cached = handle.read()

            font.bytes = cached
            font_cache[name] = cached

        return Template(
            path,
            document
        )
layer["frame"]["width"], + layer["frame"]["height"], + ) + + def _extract_xy(self, input: str): + input = input.replace("{", "") + input = input.replace("}", "") + + return tuple(map(float, input.split(", "))) + + def _convert_color(self, input: dict) -> Color: + return Color( + input["red"] * 255, + input["green"] * 255, + input["blue"] * 255, + input["alpha"] * 255 + ) + + def _extract_shape_points( + self, + layer: dict, + absolute: Vector2, + size: Vector2 + ) -> Shape: + fill = next(iter(layer["style"]["fills"]), None) + color = self._convert_color(fill["color"]) if fill else Color.black() + + points: typing.List[Vector2] = [] + for point in layer["points"]: + x, y = self._extract_xy(point["curveTo"]) + + points.append(Vector2( + absolute.x + x * size.x, + absolute.y + y * size.y, + )) + + return points, color + + REPLACE_TABLE = { + "\uFB01": "fi", + "\uFB02": "fl", + "\u2019": "'", + "\u201C": "\"", + "\u201D": "\"", + "\t": " ", + } + + def _sanitize(self, input: str): + if input is None: + return None + + added = 0 + output = "" + + for character in input: + replacement = self.REPLACE_TABLE.get(character) + + if replacement is None: + output += character + continue + + length = len(replacement) + if length > 1: + added += length - 1 + + output += replacement + + return output, added + + def _extract_text( + self, + layer: dict, + ): + raw_string = layer["attributedString"]["string"] + content, _ = self._sanitize(raw_string) + + font = Font( + family=layer["style"]["textStyle"]["encodedAttributes"]["MSAttributedStringFontAttribute"]["attributes"]["name"], + size=layer["style"]["textStyle"]["encodedAttributes"]["MSAttributedStringFontAttribute"]["attributes"]["size"], + ) + + color = self._convert_color( + layer["style"]["textStyle"]["encodedAttributes"]["MSAttributedStringColorAttribute"] + ) + + alignment = Alignment.LEFT + if layer["style"]["textStyle"]["encodedAttributes"].get("paragraphStyle", {}).get("alignment", None) == 1: + alignment = 
Alignment.RIGHT + + spans = self._extract_spans(layer, raw_string) + + content_length = len(content) + span_length_sum = sum(span.length for span in spans) + if content_length != span_length_sum: + raise ValueError(f"spans are not same length as content: {content_length} != {span_length_sum}") + + return content, font, color, alignment, spans + + def _extract_span( + self, + attributes_item: dict, + text_string: str, + ) -> Span: + location = attributes_item["location"] + length = attributes_item["length"] + + raw_string = text_string[location:location + length] + content, added = self._sanitize(raw_string) + + font = Font( + family=attributes_item["attributes"]["MSAttributedStringFontAttribute"]["attributes"]["name"], + size=attributes_item["attributes"]["MSAttributedStringFontAttribute"]["attributes"]["size"], + ) + + color = self._convert_color( + attributes_item["attributes"]["MSAttributedStringColorAttribute"] + ) + + return Span( + start=location, + length=length + added, + content=content, + font=font, + color=color, + ) + + def _extract_spans( + self, + layer: dict, + text_string: str, + ): + attributes = layer["attributedString"]["attributes"] + + return [ + self._extract_span(item, text_string) + for item in attributes + ] diff --git a/backtest/template/template.py b/backtest/template/template.py new file mode 100644 index 0000000..49699e4 --- /dev/null +++ b/backtest/template/template.py @@ -0,0 +1,117 @@ +import io +import shutil +import re +import collections +import matplotlib.figure +import sys + +from .models import * + + +class Template: + + name: str + document: Document + slots: typing.Dict[ + typing.Union[NaturalIdentifier, Identifier], + typing.List[Element] + ] + + def __init__( + self, + name: str, + document: Document + ) -> None: + self.name = name + self.document = document + + self.slots = collections.defaultdict(list) + for page in document.pages: + for element in page.elements: + self.slots[element.id].append(element) + 
def apply_re(self, variables: typing.Dict[str, typing.Union[typing.Callable[..., typing.Any], typing.Any]]):
    """Set every slot whose key fully matches a regular expression.

    Args:
        variables: Maps a regular expression to the value to apply. A
            callable value is invoked as ``value(key, *groups)`` with the
            matched slot key and the pattern's capture groups, and its
            result is applied instead.

    A message is logged for each pattern that matches no slot key.
    """
    for pattern, value in variables.items():
        found = False

        for key, elements in self.slots.items():
            # fullmatch anchors the pattern as a whole. Wrapping it as
            # f"^{pattern}$" would anchor only the outermost alternatives
            # of a pattern such as "a|b", letting it match "ab".
            match = re.fullmatch(pattern, key)
            if match is None:
                continue

            found = True

            real_value = value(key, *match.groups()) if callable(value) else value
            self._set(elements, real_value, key)

        if not found:
            self.log(f"no element for pattern={pattern}")
class TemplateLoader:
    """Base class for objects that produce a ``Template`` from a path."""

    def load(self, path: str) -> "Template":
        """Load the template located at *path*.

        Raises:
            NotImplementedError: Always; subclasses must override this method.
        """
        # NotImplemented is a comparison sentinel and is not callable; the
        # exception meant for unimplemented methods is NotImplementedError.
        raise NotImplementedError()


class TemplateRenderer:
    """Base class for objects that render a ``Template`` to an output."""

    def render(self, template: "Template", output: io.FileIO) -> "Template":
        """Render *template* into *output*.

        Raises:
            NotImplementedError: Always; subclasses must override this method.
        """
        raise NotImplementedError()
backtest import cli -cli.main() \ No newline at end of file +cli.cli() diff --git a/requirements.txt b/requirements.txt index 7895817..2f6bcc6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,4 +12,10 @@ yfinance >=0.2.24, <0.2.27 python-dotenv >=0.20, <1.0.0 colorama >=0.4.4, <0.4.6 ipython==8.15.0 -seaborn>=0.12.0, <0.13.0 \ No newline at end of file +seaborn>=0.12.0, <0.13.0 +python-slugify +cached-property +fpdf2==2.7.4 +contexttimer +watchdog +fastparquet