Skip to content

Commit

Permalink
Logging/Asset Modules (#103)
Browse files Browse the repository at this point in the history
  • Loading branch information
dehume authored Apr 18, 2023
1 parent 6fabeca commit 71cb1cd
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 2 deletions.
24 changes: 24 additions & 0 deletions week_3/workspaces/content/assets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import random

from dagster import AssetMaterialization, String, graph, op

FRUIT = ["apple", "orange", "lime", "lemon"]


@op
def random_asset(context):
context.log_event(
AssetMaterialization(
asset_key="my_asset",
description="Recording a random number and random fruit",
metadata={"random_number": random.randint(0, 10), "random_fruit": random.choice(FRUIT)},
)
)


@graph
def asset_graph():
random_asset()


asset_job = asset_graph.to_job()
14 changes: 12 additions & 2 deletions week_3/workspaces/content/deployment.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
from dagster import Definitions
from workspaces.content.assets import asset_job
from workspaces.content.etl import etl_docker, etl_local, etl_local_partitioned_schedule
from workspaces.content.io_retry import job_local_io_manager, job_local_io_manager_retry
from workspaces.content.logging import logging_dev_job, logging_prod_job

definition = Definitions(
schedules=[etl_local_partitioned_schedule],
jobs=[job_local_io_manager, job_local_io_manager_retry, etl_docker, etl_local],
)
jobs=[
job_local_io_manager,
job_local_io_manager_retry,
etl_docker,
etl_local,
asset_job,
logging_dev_job,
logging_prod_job,
],
)
68 changes: 68 additions & 0 deletions week_3/workspaces/content/logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import json
import logging

from dagster import Field, InitLoggerContext, graph, logger, op


def message_slack(api_key: str, message: str):
# Pretend API call
print(message)


class JsonFormatter(logging.Formatter):
def __init__(self, api_key: str):
self.api_key = api_key

def format(self, record):
message_slack(self.api_key, record.__dict__["msg"])
return json.dumps(record.__dict__)


@logger(
{
"api_key": Field(str, is_required=True),
"log_level": Field(str, is_required=False, default_value="INFO"),
"name": Field(str, is_required=False, default_value="CoRise"),
},
description="Our custom CoRise logger",
)
def corise_logger(init_context: InitLoggerContext):
api_key = init_context.logger_config["api_key"]
level = init_context.logger_config["log_level"]
name = init_context.logger_config["name"]

klass = logging.getLoggerClass()
logger_ = klass(name, level=level)

handler = logging.StreamHandler()

handler.setFormatter(JsonFormatter(api_key=api_key))
logger_.addHandler(handler)

return logger_


@op
def print_logging(context):
print("Using print()")


@op
def basic_logging(context, start_after):
context.log.info("Logging via context: info")
context.log.warning("Logging via context: warning")
context.log.error("Logging via context: error")


@graph
def logging_graph():
basic_logging(print_logging())


logging_dev_job = logging_graph.to_job(name="logging_dev_job")

logging_prod_job = logging_graph.to_job(
name="logging_prod_job",
config={"loggers": {"corise_logger": {"config": {"api_key": "XXX", "log_level": "ERROR"}}}},
logger_defs={"corise_logger": corise_logger},
)

0 comments on commit 71cb1cd

Please sign in to comment.