Skip to content

Commit

Permalink
Adding a more adapt python tooling for Advanced Logger v4 (#415)
Browse files Browse the repository at this point in the history
# Preface

Please ensure you have read the [contribution
docs](https://github.com/microsoft/mu/blob/master/CONTRIBUTING.md) prior
to submitting the pull request. In particular,
[pull request
guidelines](https://github.com/microsoft/mu/blob/master/CONTRIBUTING.md#pull-request-best-practices).

## Description

This change will allow the advanced logger parser to handle the v4,
which would get lines from the same UEFI boot phase without intercepting
each other. It will also search for the beginning of returned buffer
with more sophisticated logic.

Resolves #401

For each item, place an "x" in between `[` and `]` if true. Example:
`[x]`.
_(you can also check items in the GitHub UI)_

- [x] Impacts functionality?
- **Functionality** - Does the change ultimately impact how firmware
functions?
- Examples: Add a new library, publish a new PPI, update an algorithm,
...
- [ ] Impacts security?
- **Security** - Does the change have a direct security impact on an
application,
    flow, or firmware?
  - Examples: Crypto algorithm change, buffer overflow fix, parameter
    validation improvement, ...
- [ ] Breaking change?
- **Breaking change** - Will anyone consuming this change experience a
break
    in build or boot behavior?
- Examples: Add a new library class, move a module to a different repo,
call
    a function in a new library class in a pre-existing module, ...
- [ ] Includes tests?
  - **Tests** - Does the change include any explicit test code?
  - Examples: Unit tests, integration tests, robot tests, ...
- [ ] Includes documentation?
- **Documentation** - Does the change contain explicit documentation
additions
    outside direct code modifications (and comments)?
- Examples: Update readme file, add feature readme file, link to
documentation
    on an a separate Web page, ...

## How This Was Tested

This was tested on both QEMU Q35 and proprietary hardware platform
returned advanced logger buffer binary.

## Integration Instructions

N/A

---------

Co-authored-by: Oliver Smith-Denny <[email protected]>
  • Loading branch information
kuqin12 and os-d authored Jan 26, 2024
1 parent 832500f commit 12f028a
Show file tree
Hide file tree
Showing 3 changed files with 211 additions and 18 deletions.
221 changes: 207 additions & 14 deletions AdvLoggerPkg/Application/DecodeUefiLog/DecodeUefiLog.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import struct
import argparse
import tempfile
import traceback
import copy

from win32com.shell import shell

Expand Down Expand Up @@ -422,7 +424,7 @@ def _ReadMessageEntry(self, LoggerInfo):
InFile.seek(EntryStart + MessageEntry["MessageOffset"])
MessageEntry["MessageText"] = InFile.read(MessageEntry["MessageLen"]).decode('utf-8', 'replace')
else:
raise Exception("Message Block has wrong signature at offset 0x%X" % InFile.tell())
return (None, None)

Skip = InFile.tell()
Norm = int((int((Skip + 7) / 8)) * 8)
Expand Down Expand Up @@ -589,6 +591,181 @@ def _GetPhaseString(self, Phase):
PhaseString = self.PHASE_STRING_LIST[Phase]
return PhaseString

#
# This helper function will help to identify the
# first valid block in the logger buffer. This
# is needed when the buffer wraps around for
# version 4+.
#
def _FindFirstValidBlock(self, LoggerInfo):
# start.
OriginalOffset = LoggerInfo["InFile"].tell()
StartOffset = LoggerInfo["LogCurrent"]
while True:
LoggerInfo["InFile"].seek(StartOffset)
CurrentOffset = StartOffset
ThisIsGood = True
# We need to verify that from this point on, we have only eligible
# entries. If not, we need to move along the pointer.
while CurrentOffset < LoggerInfo["LogBufferSize"]:
(MessageEntry, _) = self._ReadMessageEntry(LoggerInfo)
CurrentOffset = LoggerInfo["InFile"].tell()
if MessageEntry is None:
ThisIsGood = False
break

if ThisIsGood:
LoggerInfo["InFile"].seek(StartOffset)
break
else:
# We found the first legible line, so we can start from here.
LogStream = LoggerInfo["InFile"].read()
StartOffset = LogStream.find(b'ALM2')
if StartOffset == -1:
StartOffset = LogStream.find(b'ALMS')
if StartOffset == -1:
print("DecodeUefiLog unable to find ALM* signature. Using the beginning as start")
StartOffset = OriginalOffset
break
else:
StartOffset += CurrentOffset
else:
StartOffset += CurrentOffset

return StartOffset

#
# Helper function to break up a single message entry
# into multiple lines based on the line break.
#
def _BreakUpMessageEntry(self, MessageEntry):
MessageEntries = []

# Break a line if we see a line break in the content.
t_text = MessageEntry["MessageText"]
if t_text == "":
# This is an intentional empty line, we will keep it.
MessageEntries.append(MessageEntry)
return MessageEntries

linebreak = t_text.find("\n")
while linebreak != -1:
newMessageEntry = copy.deepcopy(MessageEntry)
newMessageEntry["MessageText"] = t_text[:linebreak + 1]
MessageEntries.append(newMessageEntry)
t_text = t_text[linebreak + 1:]
linebreak = t_text.find("\n")

if t_text != "":
newMessageEntry = copy.deepcopy(MessageEntry)
newMessageEntry["MessageText"] = t_text
MessageEntries.append(newMessageEntry)

return MessageEntries

#
# Get the message entries from the logger buffer
# starting from the given offset. This function
# will wrap around the buffer end and return all
# the message entries until the LogCurrent
# pointer.
#
def _FetchAllMessageEntries(self, LoggerInfo, OriginalOffset, StartOffset):
MessageEntries = []
if StartOffset != OriginalOffset:
MaximumOffset = LoggerInfo["LogBufferSize"]
NeedWrap = True
else:
# To be honest, there might be some prints that we missed during
# querying the logger buffer. We can deal with this nuance later.
MaximumOffset = LoggerInfo["LogCurrent"]
NeedWrap = False

# Ok, let's go
LoggerInfo["InFile"].seek(StartOffset)
CurrentOffset = StartOffset
while CurrentOffset < MaximumOffset:
(MessageEntry, _) = self._ReadMessageEntry(LoggerInfo)
CurrentOffset = LoggerInfo["InFile"].tell()
if MessageEntry is None:
break

# Break a line if we see a line break in the content.
entries = self._BreakUpMessageEntry(MessageEntry)
MessageEntries.extend(entries)

# If we need to wrap, we need to start from the beginning.
if NeedWrap:
CurrentOffset = OriginalOffset
MaximumOffset = LoggerInfo["LogCurrent"]
LoggerInfo["InFile"].seek(CurrentOffset)
while CurrentOffset < MaximumOffset:
(MessageEntry, _) = self._ReadMessageEntry(LoggerInfo)
CurrentOffset = LoggerInfo["InFile"].tell()
if MessageEntry is None:
print(f"Warning at {CurrentOffset}: MessageEntry is None!!!")
break

# Break a line if we see a line break in the content.
entries = self._BreakUpMessageEntry(MessageEntry)
MessageEntries.extend(entries)

# Experimental: Try to creep through from here to the StartOffset,
# or until the end of this log, but we bail at the first failure.
CurrentOffset = LoggerInfo["LogCurrent"]
if NeedWrap:
# This means the start is in the middle of the buffer.
MaximumOffset = StartOffset
else:
# Otherwise, we search through the end.
MaximumOffset = LoggerInfo["LogBufferSize"]

LoggerInfo["InFile"].seek(CurrentOffset)
while CurrentOffset < MaximumOffset:
(MessageEntry, _) = self._ReadMessageEntry(LoggerInfo)
CurrentOffset = LoggerInfo["InFile"].tell()
if MessageEntry is None:
break

# Break a line if we see a line break in the content.
entries = self._BreakUpMessageEntry(MessageEntry)
MessageEntries.extend(entries)

return MessageEntries

#
# Parse all the message entries and format them into
# lines based on line ending as well as phase information
# in the message entries list.
#
def _ParseAllMessageEntries(self, lines, LoggerInfo, MessageEntries, StartLine=0, CurrentLine=0):
for i in range(len(MessageEntries)):
if MessageEntries[i] is None:
continue

Ticks = MessageEntries[i]["TimeStamp"]
PhaseString = self._GetPhaseString(MessageEntries[i]["Phase"])
line = self._GetTimeStamp(Ticks, LoggerInfo["Frequency"], LoggerInfo["BaseTime"]) + PhaseString
j = i
tempEntry = MessageEntries[i]
while j < len(MessageEntries):
# We will keep globbering the message until we see a new line of the same phase.
if MessageEntries[j] is not None and MessageEntries[j]["Phase"] == tempEntry["Phase"]:
line += MessageEntries[j]["MessageText"]
MessageEntries[j] = None
if line.endswith('\n'):
# Finally a line break, we are done.
break
j += 1

if CurrentLine >= StartLine:
# Regardless of whether we have a line break or not, we need to move the cursor
lines.append(line.rstrip("\r\n") + '\n')

CurrentLine += 1

LoggerInfo["CurrentLine"] = CurrentLine

#
# Get all of the formated message lines
#
Expand All @@ -598,22 +775,38 @@ def _GetLines(self, lines, LoggerInfo):
CurrentLine = LoggerInfo["CurrentLine"]
StartLine = LoggerInfo["StartLine"]

while (Status == self.SUCCESS):
(Status, MessageLine) = self._GetNextFormattedLine(MessageLine, LoggerInfo)
if Status != self.SUCCESS:
if Status != self.END_OF_FILE:
print(f"Error {Status} from GetNextFormattedLine")
break
if LoggerInfo["Version"] == self.V4_LOGGER_INFO_VERSION:
# There is a potential that we may have enabled auto wrap.
# If so, we need to find the first legible line. Given that
# we read the logger information from the head of the buffer,
# we can start from this cursor as an acceptable estimated
# start.
OriginalOffset = LoggerInfo["InFile"].tell()
StartOffset = self._FindFirstValidBlock(LoggerInfo)

# Navigate to the identified StartOffset.
MessageEntries = self._FetchAllMessageEntries(LoggerInfo, OriginalOffset, StartOffset)

# Now we have all the message entries, let's format them
# into lines.
self._ParseAllMessageEntries(lines, LoggerInfo, MessageEntries, StartLine, CurrentLine)
else:
while (Status == self.SUCCESS):
(Status, MessageLine) = self._GetNextFormattedLine(MessageLine, LoggerInfo)
if Status != self.SUCCESS:
if Status != self.END_OF_FILE:
print(f"Error {Status} from GetNextFormattedLine")
break

if CurrentLine >= StartLine:
Ticks = MessageLine["TimeStamp"]
PhaseString = self._GetPhaseString(MessageLine["Phase"])
NewLine = self._GetTimeStamp(Ticks, LoggerInfo["Frequency"], LoggerInfo["BaseTime"]) + PhaseString + MessageLine["Message"].rstrip("\r\n")
lines.append(NewLine + '\n')
if CurrentLine >= StartLine:
Ticks = MessageLine["TimeStamp"]
PhaseString = self._GetPhaseString(MessageLine["Phase"])
NewLine = self._GetTimeStamp(Ticks, LoggerInfo["Frequency"], LoggerInfo["BaseTime"]) + PhaseString + MessageLine["Message"].rstrip("\r\n")
lines.append(NewLine + '\n')

CurrentLine += 1
CurrentLine += 1

LoggerInfo["CurrentLine"] = CurrentLine
LoggerInfo["CurrentLine"] = CurrentLine

return lines

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ AdvLoggerAccessInit (
V2 ...
...
Vxx - returns the last few bytes of the log
Vxx - returns the last few bytes of the logging area
Vxx+1 = returns EFI_NOT_FOUND.
Expand Down Expand Up @@ -221,7 +221,7 @@ AdvLoggerAccessGetVariable (
}

LogBufferStart = (UINT8 *)mLoggerInfo;
LogBufferEnd = (UINT8 *)PTR_FROM_PA (mLoggerInfo->LogCurrent);
LogBufferEnd = (UINT8 *)PTR_FROM_PA (mMaxAddress);
LogBufferStart += (BlockNumber * mLoggerTransferSize);

if (LogBufferStart >= LogBufferEnd) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ AdvLoggerAccessInit (
V2 ...
...
Vxx - returns the last few bytes of the log
Vxx - returns the last few bytes of the logging area
Vxx+1 = returns EFI_NOT_FOUND.
Expand Down Expand Up @@ -221,7 +221,7 @@ AdvLoggerAccessGetVariable (
}

LogBufferStart = (UINT8 *)mLoggerInfo;
LogBufferEnd = (UINT8 *)PTR_FROM_PA (mLoggerInfo->LogCurrent);
LogBufferEnd = (UINT8 *)PTR_FROM_PA (mMaxAddress);
LogBufferStart += (BlockNumber * mLoggerTransferSize);

if (LogBufferStart >= LogBufferEnd) {
Expand Down

0 comments on commit 12f028a

Please sign in to comment.