Skip to content

Commit

Permalink
Merge pull request #204 from FNNDSC/cut-log
Browse files Browse the repository at this point in the history
Tail logs
  • Loading branch information
jennydaman authored Mar 19, 2022
2 parents 51e8a61 + 2e4bb35 commit c23f814
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 30 deletions.
9 changes: 6 additions & 3 deletions pman/abstractmgr.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import Generic, TypeVar, NewType, Optional, TypedDict
from typing import Generic, TypeVar, NewType, Optional, TypedDict, AnyStr
from dataclasses import dataclass
from enum import Enum

Expand Down Expand Up @@ -99,9 +99,12 @@ def get_job(self, name: JobName) -> J:
...

@abstractmethod
def get_job_logs(self, job: J) -> str:
def get_job_logs(self, job: J, tail: int) -> AnyStr:
"""
Get the logs string from a previously scheduled job object.
Get the logs (combined stdout+stderr) from a previously scheduled job object.
:param job: the job whose logs should be retrieved
:param tail: how many lines to read from the end of the logs (passed as a line count, e.g. Kubernetes ``tail_lines``).
"""
...

Expand Down
2 changes: 2 additions & 0 deletions pman/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ def __init__(self):
env = Env()
env.read_env() # also read .env file, if it exists

self.JOB_LOGS_TAIL = env.int('JOB_LOGS_TAIL', 1000)

self.CONTAINER_ENV = env('CONTAINER_ENV', 'swarm')
self.STORAGE_TYPE = env('STORAGE_TYPE', 'host')

Expand Down
2 changes: 1 addition & 1 deletion pman/cromwellmgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def get_job(self, name: JobName) -> WorkflowId:
return job.id
raise CromwellException(f'No job found for name="{name}"', status_code=404)

def get_job_logs(self, job: WorkflowId) -> str:
def get_job_logs(self, job: WorkflowId, tail: int) -> str:
# cromwell_tools.utilities.download
data = self.__client.logs_idc(job)
return (
Expand Down
34 changes: 16 additions & 18 deletions pman/kubernetesmgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
as manage their state in the cluster.
"""

from typing import AnyStr
import logging
import shlex

from kubernetes import client as k_client
Expand All @@ -11,6 +13,8 @@
from kubernetes.client.rest import ApiException
from .abstractmgr import AbstractManager, ManagerException, JobInfo, JobStatus, TimeStamp, JobName

logger = logging.getLogger(__name__)


class KubernetesManager(AbstractManager[V1Job]):

Expand Down Expand Up @@ -41,17 +45,15 @@ def get_job(self, name) -> V1Job:
raise ManagerException(str(e), status_code=status_code)
return job

def get_job_logs(self, job):
"""
Get the logs string from a previously scheduled job object.
"""
def get_job_logs(self, job: V1Job, tail: int) -> AnyStr:
# TODO: Think of a better way to abstract out logs in case of multiple pods running in parallel

logs = ''
pods = self.get_job_pods(job.metadata.name)
for pod_item in pods.items:
pod_name = pod_item.metadata.name
logs += self.get_pod_log(pod_name)
logs += self.get_pod_log(pod_name, tail)

return logs

def get_job_info(self, job) -> JobInfo:
Expand Down Expand Up @@ -198,21 +200,17 @@ def get_job_pods(self, name):
return self.kube_client.list_namespaced_pod(job_namespace,
label_selector='job-name='+name)

def get_pod_log(self, name, container_name=None):
"""
Get a pod log
"""
def get_pod_log(self, pod_name: str, tail: int) -> AnyStr:
job_namespace = self.config.get('JOB_NAMESPACE')
try:
if container_name:
log = self.kube_client.read_namespaced_pod_log(name=name,
namespace=job_namespace,
container=container_name)
else:
log = self.kube_client.read_namespaced_pod_log(name=name,
namespace=job_namespace)
except ApiException:
log = ''
log = self.kube_client.read_namespaced_pod_log(
name=pod_name,
namespace=job_namespace,
tail_lines=tail
)
except ApiException as e:
log = 'Error: check pman logs.'
logger.error('Exception getting logs for pod="%s": %s', pod_name, str(e))
return log

def get_pod_status(self, name):
Expand Down
2 changes: 1 addition & 1 deletion pman/openshiftmgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ def get_job(self, name):
return job


def get_job_logs(self,job):
def get_job_logs(self, job, tail):
#return ''
name = job.metadata.name
str_logs = ''
Expand Down
7 changes: 5 additions & 2 deletions pman/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def post(self):
job_info = compute_mgr.get_job_info(job)
logger.info(f'Successful job {job_id} schedule response from '
f'{self.container_env}: {job_info}')
job_logs = compute_mgr.get_job_logs(job)
job_logs = ''

return {
'jid': job_id,
Expand Down Expand Up @@ -148,6 +148,7 @@ def __init__(self):

self.container_env = app.config.get('CONTAINER_ENV')
self.compute_mgr = get_compute_mgr(self.container_env)
self.job_logs_tail = app.config.get('JOB_LOGS_TAIL')

def get(self, job_id):
job_id = job_id.lstrip('/')
Expand All @@ -161,7 +162,9 @@ def get(self, job_id):
job_info = self.compute_mgr.get_job_info(job)
logger.info(f'Successful job {job_id} status response from '
f'{self.container_env}: {job_info}')
job_logs = self.compute_mgr.get_job_logs(job)
job_logs = self.compute_mgr.get_job_logs(job, self.job_logs_tail)
if isinstance(job_logs, bytes):
job_logs = job_logs.decode(encoding='utf-8', errors='replace')

return {
'jid': job_id,
Expand Down
8 changes: 3 additions & 5 deletions pman/swarmmgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Swarm cluster manager module that provides functionality to schedule
jobs (short-lived services) as well as manage their state in the cluster.
"""
from typing import AnyStr, Sequence, Iterable

import docker
from docker.models.services import Service
Expand Down Expand Up @@ -52,11 +53,8 @@ def get_job(self, name) -> Service:
raise ManagerException(str(e), status_code=400)
return job

def get_job_logs(self, job):
"""
Get the logs from a previously scheduled job object.
"""
return ''.join([l.decode(errors='replace') for l in job.logs(stdout=True, stderr=True)])
def get_job_logs(self, job: Service, tail: int) -> AnyStr:
return b''.join(job.logs(stdout=True, stderr=True, tail=tail))

def get_job_info(self, job: Service) -> JobInfo:
"""
Expand Down

0 comments on commit c23f814

Please sign in to comment.