Skip to content

Commit

Permalink
Added a guide & sample for a custom logger client implementation.
Browse files Browse the repository at this point in the history
Signed-off-by: Djcarrillo6 <[email protected]>

Black formatter

Signed-off-by: Djcarrillo6 <[email protected]>
  • Loading branch information
Djcarrillo6 committed Nov 13, 2023
1 parent f02d6de commit e35700f
Show file tree
Hide file tree
Showing 3 changed files with 239 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Added `samples`, `benchmarks` and `docs` to `nox -rs format` ([#556](https://github.com/opensearch-project/opensearch-py/pull/556))
- Added guide on the document lifecycle API(s) ([#559](https://github.com/opensearch-project/opensearch-py/pull/559))
- Added Windows CI ([#569](https://github.com/opensearch-project/opensearch-py/pull/569))
- Added guide on using a Python logging integration with OpenSearch logs ([#]())
### Changed
- Generate `tasks` client from API specs ([#508](https://github.com/opensearch-project/opensearch-py/pull/508))
- Generate `ingest` client from API specs ([#513](https://github.com/opensearch-project/opensearch-py/pull/513))
Expand Down
131 changes: 131 additions & 0 deletions guides/log_collection.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# Log Collection Guide
- [Import Required Modules](#import-required-modules)
- [Setup Connection with OpenSearch Cluster](#setup-connection-with-opensearch-cluster)
- [Initialize Logger](#initialize-logger)
- [Define Custom Handler for OpenSearch](#define-custom-handler-for-opensearch)
- [Create OpenSearch Handler and Add to Logger](#create-opensearch-handler-and-add-to-logger)
- [Setup Asynchronous Logging Using Queues](#setup-asynchronous-logging-using-queues)
- [Clean Up](#clean-up)

# Log Collection Guide
In this guide, we will look at how to collect logs from your application and send them to OpenSearch.

# Import Required Modules
Let's import the required modules:

```python
import urllib3
urllib3.disable_warnings()
from datetime import datetime
import logging
import queue
from opensearchpy import OpenSearch
from logging.handlers import QueueHandler, QueueListener
```

# Setup Connection with OpenSearch Cluster
Let's create a client instance:

```python
opensearch_client = OpenSearch(
"https://admin:admin@localhost:9200",
use_ssl=True,
verify_certs=False,
ssl_show_warn=False,
http_auth=("admin", "admin")
)
```

# Initialize Logger
Now, let's initialize a logger named "OpenSearchLogs" for OpenSearch and set the log level to INFO:

```python
# Initialize a logger named "OpenSearchLogs" for OpenSearch & set log level to INFO
print("Initializing logger...")
os_logger = logging.getLogger("OpenSearchLogs")
os_logger.setLevel(logging.INFO)
```

# Define Custom Handler for OpenSearch
Next, let's define a custom handler that logs to OpenSearch:

```python
# Define a custom handler that logs to OpenSearch
class OpenSearchHandler(logging.Handler):
# Initializer / Instance attributes
def __init__(self, opensearch_client):
logging.Handler.__init__(self)
self.os_client = opensearch_client

# Build index name (e.g., "logs-YYYY-MM-DD")
def _build_index_name(self):
return f"logs-{datetime.date(datetime.now())}"

# Emit logs to the OpenSearch cluster
def emit(self, record):
document = {
"timestamp": datetime.fromtimestamp(record.created).isoformat(),
"name": record.name,
"level": record.levelname,
"message": record.getMessage(),
"source": {
"file": record.pathname,
"line": record.lineno,
"function": record.funcName
},
"process": {
"id": record.process,
"name": record.processName
},
"thread": {
"id": record.thread,
"name": record.threadName
}
}

# Write the log entry to OpenSearch, handle exceptions
try:
self.os_client.index(index="movies", id=1, body={'title': 'Beauty and the Beast', 'year': 1991})
except Exception as e:
print(f"Failed to send log to OpenSearch: {e}")
```

# Create OpenSearch Handler and Add to Logger
Now, let's create an instance of OpenSearchHandler and add it to the logger:

```python
print("Creating an instance of OpenSearchHandler and adding it to the logger...")
# Create an instance of OpenSearchHandler and add it to the logger
os_handler = OpenSearchHandler(opensearch_client)
os_logger.addHandler(os_handler)
```

# Setup Asynchronous Logging Using Queues
Finally, let's setup asynchronous logging using Queues:

```python
print("Setting up asynchronous logging using Queues...")
# Setup asynchronous logging using Queues
log_queue = queue.Queue(-1) # no limit on size
os_queue_handler = QueueHandler(log_queue)
os_queue_listener = QueueListener(log_queue, os_handler)

# Add queue handler to the logger
os_logger.addHandler(os_queue_handler)

# Start listening on the queue using the os_queue_listener
os_queue_listener.start()
```

# Clean Up
Finally, let's clean up by stopping the queue listener:

```python
print("Cleaning up...")
# Stop listening on the queue
os_queue_listener.stop()
print("Log Collection Guide has completed running")
```

# Sample Code
See [log_collection_sample.py](/samples/logging/log_collection_sample.py) for a working sample of the concepts in this guide.
107 changes: 107 additions & 0 deletions samples/logging/log_collection_sample.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#!/usr/bin/env python

# -*- coding: utf-8 -*-
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.

from datetime import datetime
import logging
import queue
from opensearchpy import OpenSearch
from logging.handlers import QueueHandler, QueueListener

# For cleaner output, comment in the two lines below to disable warnings and informational messages
# import urllib3
# urllib3.disable_warnings()


def run_log_collection_guide() -> None:
print("Running Log Collection Guide")

# Setup connection with the OpenSearch cluster
print("Setting up connection with OpenSearch cluster...")
opensearch_client = OpenSearch(
"https://admin:admin@localhost:9200",
use_ssl=True,
verify_certs=False,
ssl_show_warn=False,
http_auth=("admin", "admin"),
)

# Initialize a logger named "OpenSearchLogs" for OpenSearch
print("Initializing logger...")
os_logger = logging.getLogger("OpenSearchLogs")
os_logger.setLevel(logging.INFO)

# Define a custom handler that logs to OpenSearch
class OpenSearchHandler(logging.Handler):
# Initializer / Instance attributes
def __init__(self, opensearch_client):
logging.Handler.__init__(self)
self.os_client = opensearch_client

# Build index name (e.g., "logs-YYYY-MM-DD")
def _build_index_name(self):
return f"logs-{datetime.date(datetime.now())}"

# Emit logs to the OpenSearch cluster
def emit(self, record):
document = {
"timestamp": datetime.fromtimestamp(record.created).isoformat(),
"name": record.name,
"level": record.levelname,
"message": record.getMessage(),
"source": {
"file": record.pathname,
"line": record.lineno,
"function": record.funcName,
},
"process": {"id": record.process, "name": record.processName},
"thread": {"id": record.thread, "name": record.threadName},
}

# Write the log entry to OpenSearch, handle exceptions
try:
self.os_client.index(
index="movies",
id=1,
body={"title": "Beauty and the Beast", "year": 1991},
)
except Exception as e:
print(f"Failed to send log to OpenSearch: {e}")

print("Creating an instance of OpenSearchHandler and adding it to the logger...")
# Create an instance of OpenSearchHandler and add it to the logger
os_handler = OpenSearchHandler(opensearch_client)
os_logger.addHandler(os_handler)

print("Setting up asynchronous logging using Queues...")
# Setup asynchronous logging using Queues
log_queue = queue.Queue(-1) # no limit on size
os_queue_handler = QueueHandler(log_queue)
os_queue_listener = QueueListener(log_queue, os_handler)

# Add queue handler to the logger
os_logger.addHandler(os_queue_handler)

# Start listening on the queue using the os_queue_listener
os_queue_listener.start()

print("Logger is set up and listener has started. Sending a test log...")
# Logging a test message
os_logger.info("This is a test log message")

print("Cleaning up...")
# Stop listening on the queue
os_queue_listener.stop()
print("Log Collection Guide has completed running")


if __name__ == "__main__":
run_log_collection_guide()

0 comments on commit e35700f

Please sign in to comment.