add start of 1brc example
briangu committed Jan 7, 2024
1 parent 82fc327 commit 88d214a
Showing 4 changed files with 131 additions and 0 deletions.
11 changes: 11 additions & 0 deletions examples/1brc/README.md
@@ -0,0 +1,11 @@
Example for the One Billion Row Challenge (1brc).

KlongPy isn't especially well suited to this problem because most of the work is parsing the data as it's read. For KlongPy to shine here, the data needs to already be loaded into memory so NumPy can do its magic.

So this example shows a few neat things you can do with KlongPy.

1. Use Python multiprocessing to read the data in parallel. This is the standard solution that's fast.
2. Instead of processing the data as it's read, we buffer it into batches and pass each batch to a KlongPy function, which computes the stats with vector operations (see the sketch after this list).
3. Results are aggregated in the primary process and reported.
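
As a rough illustration of the Python-to-KlongPy hand-off in step 2, here is a minimal sketch (not part of this commit; the `total` function and the sample temperatures are made up for illustration) of how a buffered batch can be handed to a Klong function as a NumPy array:

```python
import numpy as np
from klongpy import KlongInterpreter

# Hypothetical stand-in for worker.kg: a Klong function that sums a vector.
klong = KlongInterpreter()
klong('total::{+/x}')

# A buffered "batch" of already-parsed temperatures.
temps = np.asarray([12.3, -4.0, 7.5])

# Fetch the Klong function as a Python callable and apply it to the whole batch.
total = klong['total']
print(total(temps))  # 15.8, computed as a single vector operation
```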


4 changes: 4 additions & 0 deletions examples/1brc/par.kg
@@ -0,0 +1,4 @@
.py("par.py")

r::load(.os.argv@0)
.p(r)
109 changes: 109 additions & 0 deletions examples/1brc/par.py
@@ -0,0 +1,109 @@
# based on https://github.com/ifnesi/1brc/blob/main/calculateAverage.py

# time python3 calculateAverage.py
import os
import multiprocessing as mp
import numpy as np
from klongpy import KlongInterpreter

def chunks(
    file_name: str,
    max_cpu: int = 8,
) -> tuple:
    """Split file into chunks that start and end on line boundaries"""
    cpu_count = min(max_cpu, mp.cpu_count())

    file_size = os.path.getsize(file_name)
    chunk_size = file_size // cpu_count

    start_end = list()
    with open(file_name, "r") as f:

        def is_new_line(position):
            if position == 0:
                return True
            else:
                f.seek(position - 1)
                return f.read(1) == "\n"

        def next_line(position):
            f.seek(position)
            f.readline()
            return f.tell()

        chunk_start = 0
        while chunk_start < file_size:
            chunk_end = min(file_size, chunk_start + chunk_size)

            # Walk the chunk boundary back until it lands exactly on a newline.
            while not is_new_line(chunk_end):
                chunk_end -= 1

            # Degenerate case (a line longer than chunk_size): extend to the next newline.
            if chunk_start == chunk_end:
                chunk_end = next_line(chunk_end)

            start_end.append(
                (
                    file_name,
                    chunk_start,
                    chunk_end,
                )
            )

            chunk_start = chunk_end

    return cpu_count, start_end


def _process_file_chunk(
    file_name: str,
    chunk_start: int,
    chunk_end: int,
) -> dict:
    """Process each file chunk in a different process"""
    print(chunk_start, chunk_end)
    stations = []
    temps = []
    klong = KlongInterpreter()
    klong('.l("worker.kg")')
    fn = klong['stats']
    m = {}
    with open(file_name, "r") as f:
        f.seek(chunk_start)
        for line in f:
            chunk_start += len(line)
            if chunk_start > chunk_end:
                break
            location, measurement = line.split(";")
            # measurement stays a string here; np.asarray(..., dtype=float) converts the whole batch at once
            stations.append(location)
            temps.append(measurement)
            # Hand a full batch to the Klong 'stats' function as NumPy arrays.
            if len(stations) > 2**19:
                r = np.asarray([np.asarray(stations), np.asarray(temps, dtype=float)], dtype=object)
                fn(m, r)
                stations = []
                temps = []

    # Flush the final partial batch.
    r = np.asarray([np.asarray(stations), np.asarray(temps, dtype=float)], dtype=object)
    fn(m, r)
    return m


def process(
    cpu_count: int,
    start_end: list,
) -> list:
    """Process the data file chunks in a pool of worker processes"""
    with mp.Pool(cpu_count) as p:
        # Run chunks in parallel
        chunk_results = p.starmap(
            _process_file_chunk,
            start_end,
        )

    return chunk_results


def load(fname):
    """Split fname into chunks and process them in parallel, returning per-chunk stats"""
    cpu_count, start_end = chunks(fname)
    return process(cpu_count, start_end)


7 changes: 7 additions & 0 deletions examples/1brc/worker.kg
@@ -0,0 +1,7 @@
.pyf("numpy";["min" "max" "mean"])

collect::{min(x),max(x),(+/x),#x}
merge::{:[:_x;y;((x@0)&(y@0)),((x@1)|(y@1)),((x@2)+(y@2)),((x@3)+(y@3))]}
stats::{[m s t g];m::x;s::y@0;t::y@1;{[q];q::s@(x@0);m,q,,merge(m?q;collect(t@x))}'=s;m}
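
For readers who don't read Klong, here is a rough Python equivalent of what `collect`, `merge`, and `stats` compute (an illustrative sketch, not part of the commit): `collect` reduces a batch of temperatures to `[min, max, sum, count]`, `merge` folds a new tuple into a previously stored one (the `:_x` check handles stations with no entry yet), and `stats` groups a batch by station and updates the accumulator dictionary `m`.

```python
# Illustrative Python equivalents of the Klong helpers in worker.kg.

def collect(temps):
    """Reduce one batch of temperatures to [min, max, sum, count]."""
    return [min(temps), max(temps), sum(temps), len(temps)]

def merge(existing, new):
    """Fold a new [min, max, sum, count] into the stored one, if any."""
    if existing is None:  # corresponds to the :_x "undefined" check in Klong
        return new
    return [min(existing[0], new[0]),   # & is min in Klong
            max(existing[1], new[1]),   # | is max in Klong
            existing[2] + new[2],
            existing[3] + new[3]]

def stats(m, batch):
    """Group a batch by station and merge per-station stats into m."""
    stations, temps = batch
    groups = {}
    for station, temp in zip(stations, temps):
        groups.setdefault(station, []).append(temp)
    for station, values in groups.items():
        m[station] = merge(m.get(station), collect(values))
    return m
```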

