Skip to content

Commit

Permalink
Support truncating bad padding
Browse files Browse the repository at this point in the history
- Add a fall-back mechanism that removes the last character from the base64
  string and re-attempts decoding if the first attempt fails.
- Add support for automatic detection/handling of URL-safe base64 encoding.
- Add some base64 padding decoding unit tests.
  • Loading branch information
lowell80 committed Aug 24, 2022
1 parent 80d42cb commit 389c502
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 25 deletions.
29 changes: 27 additions & 2 deletions bin/b64.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import os
import sys
import re
import binascii
import codecs

from base64 import b64decode, b64encode
Expand All @@ -28,6 +30,22 @@ def star_errors(exc):
# Register the star_errors handler under the name "replace_star" so it can be
# selected through the ``errors=`` argument of str.encode() / bytes.decode().
codecs.register_error("replace_star", star_errors)


def decode_base64(data, altchars='+/'):
    """Leniently decode base64 text, repairing or truncating bad padding.

    Every character outside the base64 alphabet (whitespace, any existing
    ``=`` padding, stray symbols) is discarded, the correct amount of padding
    is re-applied, and the result is decoded.  If decoding still fails, the
    trailing incomplete group is dropped and only the complete 4-character
    groups are decoded.

    Inspired by https://stackoverflow.com/a/9807138 and
    https://stackoverflow.com/a/2942127

    :param data: Base64 text as ``str`` (ASCII ``bytes``/``bytearray`` are
        also accepted).
    :param altchars: The two characters standing for values 62 and 63:
        ``'+/'`` for standard base64, ``'-_'`` for the URL-safe variant.
    :return: The decoded ``bytes``.
    :raises binascii.Error: Only if the truncated retry fails as well.
    """
    if isinstance(data, (bytes, bytearray)):
        # The regex below works on text.  Non-ASCII bytes can never be base64
        # symbols, so ignoring them here is equivalent to stripping them.
        data = bytes(data).decode("ascii", "ignore")

    # Normalize: keep only alphabet characters.  altchars is escaped so that
    # regex metacharacters cannot alter the character class.
    data = re.sub(r'[^a-zA-Z0-9%s]+' % re.escape(altchars), "", data)

    missing_padding = len(data) % 4
    if missing_padding:
        data += '=' * (4 - missing_padding)

    try:
        return b64decode(data, altchars)
    except binascii.Error:
        # The padded string is always a multiple of 4 long, so a failure here
        # means the final group held a single dangling character (6 bits,
        # not enough for a byte).  Drop that last group and decode the rest.
        return b64decode(data[:-4], altchars)


@Configuration()
class B64Command(StreamingCommand):
"""
Expand Down Expand Up @@ -90,16 +108,23 @@ def stream(self, records):

if self.action == "decode":
def fct(s):
    """Decode one base64 field value into text.

    The alphabet is chosen per value: any "-" or "_" selects the URL-safe
    alphabet, otherwise the standard "+/" alphabet is used.  The decoded
    bytes are converted to text with ``self.encoding`` and the ``errors``
    policy taken from the enclosing scope.
    """
    # Detect URL-safe base64 encodings
    if "-" in s or "_" in s:
        alt_chars = "-_"
    else:
        alt_chars = "+/"
    if self.fix_padding:
        # Lenient path: decode_base64 normalizes the text and repairs or
        # truncates the padding itself, so the raw string is handed over
        # untouched (no manual "===" suffix, no preliminary b64decode).
        s = decode_base64(s, alt_chars)
    else:
        s = b64decode(s, alt_chars)
    return s.decode(self.encoding, errors=errors)
else:
def fct(s):
    """Base64-encode one field value and return the result as text.

    Text input is first turned into bytes with ``self.encoding``; input
    that is not a ``str`` is passed to the encoder unchanged.
    """
    payload = s.encode(self.encoding) if isinstance(s, str) else s
    encoded = b64encode(payload)
    # Results go back to Splunk, which always exchanges data with us in UTF-8
    return encoded.decode("utf-8")

if self.mode == "append":
Expand Down
38 changes: 15 additions & 23 deletions tests/test_base64.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,24 @@
sys.path.append(os.path.join(os.path.dirname(__file__), "..", "bin")) # noqa


import b64

# COOKIECUTTER-TODO: Fill in unit tests logic, as required. Remove default tests
from b64 import decode_base64


class TestB64Command(unittest.TestCase):
    """Unit tests for the lenient ``decode_base64`` helper in ``b64``."""

    def test_decode_simple_ascii(self):
        """Correctly padded input decodes to the expected bytes."""
        self.assertEqual(decode_base64("ZnJlZA==").decode("utf-8"), "fred")
        self.assertEqual(decode_base64("TXkgRmFuY3kgQkFTRTY0IFRlWFQ="), b"My Fancy BASE64 TeXT")

    def test_padding_fix(self):
        """Missing or surplus '=' padding does not change the result."""
        self.assertEqual(decode_base64("V2VsY29tZQ=="), b"Welcome")   # Correct
        self.assertEqual(decode_base64("V2VsY29tZQ="), b"Welcome")
        self.assertEqual(decode_base64("V2VsY29tZQ"), b"Welcome")
        self.assertEqual(decode_base64("V2VsY29tZQ==="), b"Welcome")  # Ignore extras

    def test_fixing_utf16le(self):
        """A dangling final character is dropped so the rest still decodes."""
        self.assertEqual(decode_base64("fAAgAGUAdgBhAGwAIABzAGUAc").decode("utf-16le"),
                         "| eval se")

    def test_url_safe_alphabet(self):
        """The URL-safe alphabet is honoured when passed as altchars."""
        self.assertEqual(decode_base64("_-8", "-_"), b"\xff\xef")
        self.assertEqual(decode_base64("_-8=", "-_"), b"\xff\xef")


if __name__ == '__main__':
Expand Down

0 comments on commit 389c502

Please sign in to comment.