PyStreamAPI is a stream library for Python inspired by the Java Stream API. It implements almost exactly the same method names and functionality as the Java Stream API.
PyStreamAPI uses lazy execution and offers sequential as well as parallel streams.
Now you might think: Why another library? There are so many!
Here are a few of the advantages this implementation has:
- Sequential as well as parallel version
- Lazy execution
- High speed
- 100% test coverage
- Pythonic implementation
- Clean and easy to read code
Here is a small example:
from pystreamapi import Stream
Stream.parallel_of([" ", '3', None, "2", 1, ""]) \
.filter(lambda x: x is not None) \
.map(str) \
.map(lambda x: x.strip()) \
.filter(lambda x: len(x) > 0) \
.map(int) \
.sorted() \
.for_each(print) # Output: 1 2 3
The same code in Java:
Object[] words = { " ", '3', null, "2", 1, "" };
Arrays.stream( words )
.filter( Objects::nonNull )
.map( Objects::toString )
.map( String::trim )
.filter( s -> ! s.isEmpty() )
.map( Integer::parseInt )
.sorted()
.forEach( System.out::println ); // Output: 1 2 3
A stream is a pipeline, in which elements from an Iterable are computed on demand. It is similar to SQL queries and is used to manipulate data.
For example, to get the second-highest salary from the Employee table in SQL:
SELECT DISTINCT Salary FROM Employee e1
WHERE 2 = (SELECT COUNT(DISTINCT Salary)
           FROM Employee e2 WHERE e1.Salary <= e2.Salary);
Now the same thing in Python:
employees = [...] # A list with employee objects
Stream.of(employees) \
.map(lambda x: x.salary) \
.sorted() \
.reversed() \
.to_list()[1] # Returns the second-highest salary
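As a cross-check, the same result can be computed with plain built-ins. The sketch below assumes a hypothetical `Employee` dataclass with a `salary` attribute; neither the class nor the sample data come from PyStreamAPI. It removes duplicates before sorting, mirroring the `distinct` in the SQL query, so that two employees sharing the top salary are counted once.

```python
from dataclasses import dataclass


@dataclass
class Employee:  # hypothetical record type, for illustration only
    name: str
    salary: int


employees = [
    Employee("Ann", 5000),
    Employee("Bob", 7000),
    Employee("Cid", 7000),
    Employee("Dee", 6000),
]

# Deduplicate, sort in descending order, then pick the second entry.
distinct_salaries = sorted({e.salary for e in employees}, reverse=True)
second_highest = distinct_salaries[1]
print(second_highest)  # 6000
```

Without the deduplication step, index 1 would be 7000 for this sample data, which is the highest salary again.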
pystreamapi.Stream represents a stream on which one or more operations can be performed. Stream operations are either intermediate or terminal.
The terminal operations return a result of a specific type, and intermediate operations return the stream itself, so we can chain multiple methods together to perform the operation in multiple steps.
Again the example from above:
(Stream.of(employees)          # Create a BaseStream object
    .map(lambda x: x.salary)   # Intermediate operation
    .sorted()                  # Intermediate operation
    .reversed()                # Intermediate operation
    .to_list()[1])             # Terminal operation
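To make the intermediate/terminal distinction concrete, here is a minimal toy sketch. `MiniStream` is a hypothetical class written for this illustration and is not how PyStreamAPI is implemented; it only shows the general pattern of intermediate methods queuing work and returning the stream, while a terminal method runs the queued work.

```python
class MiniStream:
    """Toy illustration only; not PyStreamAPI's actual implementation."""

    def __init__(self, source):
        self._source = source
        self._steps = []  # queued intermediate operations

    def map(self, fn):
        # Intermediate: record the step and return the stream for chaining.
        self._steps.append(lambda items: (fn(x) for x in items))
        return self

    def filter(self, predicate):
        # Intermediate: record the step and return the stream for chaining.
        self._steps.append(lambda items: (x for x in items if predicate(x)))
        return self

    def to_list(self):
        # Terminal: run all queued steps and return a concrete list.
        items = iter(self._source)
        for step in self._steps:
            items = step(items)
        return list(items)


seen = []  # records which elements the mapping function has touched


def times_ten(x):
    seen.append(x)
    return x * 10


stream = MiniStream([1, 2, 3, 4]).map(times_ten).filter(lambda x: x > 10)
print(seen)    # [] because no terminal operation has been called yet
result = stream.to_list()
print(result)  # [20, 30, 40]
print(seen)    # [1, 2, 3, 4]
```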
Operations can be performed on a stream in parallel or sequentially. A stream that processes its elements in parallel is called a parallel stream; otherwise it is a sequential stream.
Based on the above points, a stream is:
- Not a data structure
- Not offering indexed access
- Designed for lambdas
- Easy to aggregate as lists or tuples/sets
- Parallelizable
- Lazily processed
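The "computed on demand" property in the list above can be illustrated with plain Python generators. This sketch does not use PyStreamAPI; the helper names `numbers` and `doubled` are made up for the example. It logs each step to show that only the elements actually requested are ever produced.

```python
from itertools import islice

log = []


def numbers():
    # A large source; elements are produced only when requested.
    for n in range(1, 1_000_000):
        log.append(f"produce {n}")
        yield n


def doubled(items):
    for x in items:
        log.append(f"double {x}")
        yield x * 2


pipeline = doubled(numbers())           # nothing has been computed yet
first_two = list(islice(pipeline, 2))   # pull exactly two elements

print(first_two)  # [2, 4]
print(log)        # ['produce 1', 'double 1', 'produce 2', 'double 2']
```

Each element travels through the whole pipeline before the next one is produced, and the remaining elements of the source are never touched.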
To start using PyStreamAPI just install the module with this command:
pip install streams.py
Afterwards you can import it with:
from pystreamapi import Stream
🎉 PyStreamAPI is now ready to process your data
There are a few factory methods that create new Streams.
Stream.of([1, 2, 3]) # Can return a sequential or a parallel stream
Using the of() method will let the implementation decide which Stream to use.
Note: Currently, it always returns a SequentialStream.
Stream.parallel_of([1, 2, 3]) # Returns a parallel stream
Stream.sequential_of([1, 2, 3]) # Returns a sequential stream
Stream.of_noneable([1, 2, 3]) # Can return a sequential or a parallel stream
If the source is None, you get an empty Stream.
Stream.iterate(0, lambda n: n + 2)
Creates a Stream from an infinite iterator, such as 0, 2, 4, 6, 8, 10, 12, 14...
Note: Do not forget to limit the stream with .limit().
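Why the limit matters can be seen in a standard-library analogue. The `iterate` function below is a stand-in written for this example, not PyStreamAPI's implementation, and `itertools.islice` plays the role of `.limit()`.

```python
from itertools import islice


def iterate(seed, step):
    """Yield seed, step(seed), step(step(seed)), ... without end."""
    value = seed
    while True:
        yield value
        value = step(value)


# Collecting the generator directly with list() would never finish,
# so the number of elements is bounded first.
evens = list(islice(iterate(0, lambda n: n + 2), 5))
print(evens)  # [0, 2, 4, 6, 8]
```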
Stream.concat(Stream.of([1, 2]), Stream.of([3, 4]))
# Like Stream.of([1, 2, 3, 4])
Creates a new Stream from multiple Streams. The order of the elements does not change.
Returns a stream consisting of the elements of this stream that match the given predicate.
Stream.of([1, 2, 3, None]) \
.filter(lambda x: x is not None) \
.for_each(print) # 1 2 3
Returns a stream consisting of the results of applying the given function to the elements of this stream.
Stream.of([1, "2", 3.0, None]) \
.map(str) \
.to_list() # ["1", "2", "3.0", "None"]
Returns a stream consisting of the results of applying the int() function to the elements of this stream. Note that this method is not None-safe.
Stream.of([1, "2", 3.0]) \
.map_to_int() \
.to_list() # [1, 2, 3]
Returns a stream consisting of the results of applying the str() function to the elements of this stream.
Stream.of([1, 2, 3]) \
.map_to_str() \
.to_list() # ["1", "2", "3"]
Returns a stream consisting of the results of replacing each element of this stream with the contents of a mapped stream produced by applying the provided mapping function to each element.
Stream.of([1, 2, 3]) \
.flat_map(lambda x: Stream.of([x, x])) \
.to_list() # [1, 1, 2, 2, 3, 3]
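The difference between mapping and flat-mapping is easiest to see side by side. The following sketch uses `itertools.chain.from_iterable` from the standard library; the `flat_map` helper is hypothetical and operates on plain iterables rather than on PyStreamAPI streams.

```python
from itertools import chain


def flat_map(fn, items):
    """Apply fn to each item and flatten the resulting iterables by one level."""
    return chain.from_iterable(fn(x) for x in items)


def duplicate(x):
    return [x, x]


nested = list(map(duplicate, [1, 2, 3]))
flat = list(flat_map(duplicate, [1, 2, 3]))

print(nested)  # [[1, 1], [2, 2], [3, 3]]
print(flat)    # [1, 1, 2, 2, 3, 3]
```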
Returns a stream consisting of the distinct elements of this stream.
Stream.of([1, 1, 2, 3]) \
.distinct() \
.to_list() # [1, 2, 3]
Returns a stream consisting of the elements of this stream, sorted according to natural order.
Stream.of([2, 9, 1]) \
.sorted() \
.to_list() # [1, 2, 9]
Returns a stream consisting of the elements of this stream in reverse order.
Stream.of([1, 2, 3]) \
.reversed() \
.to_list() # [3, 2, 1]
Returns a stream consisting of the elements of this stream, additionally performing the provided action on each element as elements are consumed from the resulting stream.
(Stream.of([2, 1, 3])
    .sorted()
    .peek(print)       # 1, 2, 3
    .reversed()
    .for_each(print))  # 3, 2, 1
Returns a stream consisting of the elements of this stream, truncated to be no longer than maxSize in length.
Stream.of([1, 2, 3]) \
.limit(2) \
.to_list() # [1, 2]
Returns a stream consisting of the remaining elements of this stream after discarding the first n elements of the stream.
Stream.of([1, 2, 3]) \
.skip(2) \
.to_list() # [3]
Returns, if this stream is ordered, a stream consisting of the longest prefix of elements taken from this stream that match the given predicate.
Stream.of([1, 2, 3]) \
.take_while(lambda x: x < 3) \
.to_list() # [1, 2]
Returns, if this stream is ordered, a stream consisting of the remaining elements of this stream after dropping the longest prefix of elements that match the given predicate.
Stream.of([1, 2, 3]) \
.drop_while(lambda x: x < 3) \
.to_list() # [3]
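Both operations work on a prefix, which makes them different from a filter: once the predicate fails for the first time, later matching elements are no longer considered. The sketch below shows this with `itertools.takewhile` and `itertools.dropwhile` from the standard library, which are assumed here to be reasonable analogues of `take_while` and `drop_while`.

```python
from itertools import dropwhile, takewhile


def is_small(x):
    return x < 3


data = [1, 2, 3, 1]  # note the trailing 1 after the first non-match

taken = list(takewhile(is_small, data))
dropped = list(dropwhile(is_small, data))
filtered = [x for x in data if is_small(x)]

print(taken)     # [1, 2]
print(dropped)   # [3, 1]
print(filtered)  # [1, 2, 1]
```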
These operations will trigger the pipeline's execution.
Returns whether all elements of this stream match the provided predicate.
Stream.of([1, 2, 3]) \
.all_match(lambda x: x > 0) # True
Returns whether any elements of this stream match the provided predicate.
Stream.of([1, 2, 3]) \
.any_match(lambda x: x < 0) # False
Returns whether no elements of this stream match the provided predicate.
Stream.of([1, 2, 3]) \
.none_match(lambda x: x < 0) # True
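The three match operations correspond closely to Python's built-in `all()` and `any()`. The sketch below uses only built-ins, not PyStreamAPI, and shows that "none match" is the negation of "any match".

```python
values = [1, 2, 3]

all_positive = all(x > 0 for x in values)        # like all_match
any_negative = any(x < 0 for x in values)        # like any_match
none_negative = not any(x < 0 for x in values)   # like none_match

print(all_positive, any_negative, none_negative)  # True False True
```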
Returns the number of elements in this stream.
Stream.of([1, 2, 3]) \
.count() # 3
Returns the minimum element of this stream.
Stream.of([1, 2, 3]) \
.min() # 1
Returns the maximum element of this stream.
Stream.of([1, 2, 3]) \
.max() # 3
Returns the result of reducing the elements of this stream to a single value using the provided reducer.
Stream.of([1, 2, 3]) \
.reduce(lambda x, y: x + y) # 6
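A reducer combines the running result with the next element, from left to right. The standard-library sketch below uses `functools.reduce` for the final value and `itertools.accumulate` to expose the intermediate results; it illustrates the general technique and makes no claim about how PyStreamAPI implements `reduce`.

```python
from functools import reduce
from itertools import accumulate


def add(x, y):
    return x + y


values = [1, 2, 3]

total = reduce(add, values)            # evaluated as (1 + 2) + 3
steps = list(accumulate(values, add))  # running results after each element

print(total)  # 6
print(steps)  # [1, 3, 6]
```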
Performs the provided action for each element of this stream.
Stream.of([1, 2, 3]) \
.for_each(print) # 1 2 3
Returns a list containing the elements of this stream.
Stream.of([1, 2, 3]) \
.to_list() # [1, 2, 3]
Returns a set containing the elements of this stream.
Stream.of([1, 2, 3]) \
.to_set() # {1, 2, 3}
Returns a tuple containing the elements of this stream.
Stream.of([1, 2, 3]) \
.to_tuple() # (1, 2, 3)
Returns an Optional describing the first element of this stream, or an empty Optional if the stream is empty.
Stream.of([1, 2, 3]) \
.find_first() # Optional[1]
Returns an Optional describing an arbitrary element of this stream, or an empty Optional if the stream is empty.
Stream.of([1, 2, 3]) \
.find_any() # Optional[1]
Stream.parallel_of([" ", '3', None, "2", 1, ""]) \
.filter(lambda x: x is not None) \
.map(str) \
.map(lambda x: x.strip()) \
.filter(lambda x: len(x) > 0) \
.map(int) \
.sorted() \
.for_each(print) # 1 2 3
def fib():
    a, b = 0, 1
    while True:
        yield a
        a, b = b, a + b

Stream.of(fib()) \
.limit(10) \
.for_each(print) # 0 1 1 2 3 5 8 13 21 34
Note that parallel Streams are not always faster than sequential Streams. Especially when the number of elements is small, we can expect sequential Streams to be faster.
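One way to check this for a given workload is to time both variants. The sketch below is a generic measurement using `concurrent.futures.ThreadPoolExecutor` from the standard library as a stand-in for parallel execution; it does not use PyStreamAPI's own parallel machinery, and the helper names `square` and `timed` are made up for the example. The timings depend on the machine, so no expected numbers are given.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def square(x):
    return x * x


def timed(fn):
    """Run fn() and return its result together with the elapsed seconds."""
    start = time.perf_counter()
    result = fn()
    return result, time.perf_counter() - start


data = list(range(1000))  # a small input, where scheduling overhead matters

sequential, t_seq = timed(lambda: [square(x) for x in data])

with ThreadPoolExecutor(max_workers=4) as pool:
    parallel, t_par = timed(lambda: list(pool.map(square, data)))

print(f"sequential: {t_seq:.6f}s, parallel: {t_par:.6f}s")
```

Both variants produce the same result; only the elapsed time differs, and for cheap per-element work the cost of distributing tasks can outweigh the gain.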
Bug reports can be submitted in GitHub's issue tracker.
Contributions are welcome! Please submit a pull request or open an issue.