diff --git a/.gitignore b/.gitignore
index 99e2cea..1f65508 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,3 +6,4 @@
 /dev/
 /docs/build/
 /docs/site/
+datasets/
diff --git a/Project.toml b/Project.toml
index cdc0ad9..548e8ee 100644
--- a/Project.toml
+++ b/Project.toml
@@ -3,8 +3,17 @@ uuid = "ebb30991-6a3b-4324-962c-6bc29053301c"
 authors = ["Pedro Conrado"]
 version = "0.1.0"
 
+[deps]
+CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b"
+CSVFiles = "5d742f6a-9f54-50ce-8119-2520741973ca"
+DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
+MLJ = "add582a8-e3ab-11e8-2d5e-e98b27df1bc7"
+RDatasets = "ce6b1742-4840-55fa-b093-852dadbb1d8b"
+Revise = "295af30f-e4ad-537b-8983-00126c2a3abe"
+Tables = "bd369af6-aec1-5ad0-b16a-f7cc5008161c"
+
 [compat]
-julia = "0.1"
+julia = "1.0"
 
 [extras]
 Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
diff --git a/src/EasyStream.jl b/src/EasyStream.jl
index 80568f2..eb2834b 100644
--- a/src/EasyStream.jl
+++ b/src/EasyStream.jl
@@ -1,5 +1,10 @@
 module EasyStream
+    using DataFrames
 
-greet() = print("Hello World!")
+    include("source.jl")
+    include("stream.jl")
+    include("pool.jl")
+    include("datasets.jl")
+    using .DatasetsStreams
 
 end # module
diff --git a/src/datasets.jl b/src/datasets.jl
new file mode 100644
index 0000000..66a3de6
--- /dev/null
+++ b/src/datasets.jl
@@ -0,0 +1,67 @@
+module DatasetsStreams
+    using CSV
+    # Relative path: this module is nested inside `EasyStream`.
+    using ..EasyStream
+
+    export Dataset1CDT, DatasetUG_2C_5D
+
+    # Default dataset cache: the `datasets` directory one level above this file.
+    const defdir = joinpath(dirname(@__FILE__), "..", "datasets")
+
+    # Download `1CDT.csv` into `joinpath(dir, "synthetic")`, creating it if needed.
+    function get1cdtdata(dir)
+        target = joinpath(dir, "synthetic")
+        mkpath(target)
+        path = download("https://raw.githubusercontent.com/Conradox/datastreams/master/sinthetic/1CDT.csv")
+        return mv(path, joinpath(target, "1CDT.csv"))
+    end
+
+    # Download `UG_2C_5D.csv` into `joinpath(dir, "synthetic")`, creating it if needed.
+    function getug2c5ddata(dir)
+        target = joinpath(dir, "synthetic")
+        mkpath(target)
+        path = download("https://raw.githubusercontent.com/Conradox/datastreams/master/sinthetic/UG_2C_5D.csv")
+        return mv(path, joinpath(target, "UG_2C_5D.csv"))
+    end
+
+    """
+        Dataset1CDT(initial_size::Int, flux_size::Int)
+        Dataset1CDT()
+
+    Read the `1CDT` CSV (downloaded into `defdir` when the file is missing) and wrap
+    it in an `EasyStream.Source` built with `initial_size` and `flux_size`.
+    The zero-argument method uses `initial_size = 150` and `flux_size = 1`.
+    """
+    function Dataset1CDT(initial_size::Int, flux_size::Int)::EasyStream.Source
+        filename = joinpath(defdir, "synthetic", "1CDT.csv")
+
+        isfile(filename) || get1cdtdata(defdir)
+
+        data = CSV.read(filename; header = false)
+
+        # `Source`'s `(table, initial_size, batch)` constructor logs the size warnings.
+        return EasyStream.Source(data, initial_size, flux_size)
+    end
+
+    Dataset1CDT() = Dataset1CDT(150, 1)
+
+    """
+        DatasetUG_2C_5D(initial_size::Int, flux_size::Int)
+        DatasetUG_2C_5D()
+
+    Read the `UG_2C_5D` CSV (downloaded into `defdir` when the file is missing) and
+    wrap it in an `EasyStream.Source` built with `initial_size` and `flux_size`.
+    The zero-argument method uses `initial_size = 150` and `flux_size = 1`.
+    """
+    function DatasetUG_2C_5D(initial_size::Int, flux_size::Int)::EasyStream.Source
+        filename = joinpath(defdir, "synthetic", "UG_2C_5D.csv")
+
+        isfile(filename) || getug2c5ddata(defdir)
+
+        data = CSV.read(filename; header = false)
+
+        return EasyStream.Source(data, initial_size, flux_size)
+    end
+
+    DatasetUG_2C_5D() = DatasetUG_2C_5D(150, 1)
+end
diff --git a/src/pool.jl b/src/pool.jl
new file mode 100644
index 0000000..553bd26
--- /dev/null
+++ b/src/pool.jl
@@ -0,0 +1,3 @@
+mutable struct Pool
+    data::Vector
+end
\ No newline at end of file
diff --git a/src/pool_bk.jl b/src/pool_bk.jl
new file mode 100644
index 0000000..3dc2d97
--- /dev/null
+++ b/src/pool_bk.jl
@@ -0,0 +1,136 @@
+# Accumulates the chunks pulled from a stream together with a per-row visibility mask.
+#   stream  - stream new chunks are pulled from
+#   data    - one entry per chunk received so far
+#   mapping - one Bool vector per chunk; `true` rows are visible when indexing
+#   size    - total number of rows received so far
+#   N       - container type, instantiated as `pool.N()` to collect indexing results
+mutable struct Pool{T <: Stream, N}
+    stream::T
+    data::Vector{N}
+    mapping::Vector{Vector{Bool}}
+    size::Int64
+    N
+end
+
+# Build a pool from the first chunk of `stream`, marking every row as visible.
+function Pool(stream::Stream)
+    data = Vector{DataFrame}()
+    streamdata = next!(stream)
+    push!(data, streamdata)
+    mapping = Vector{Vector{Bool}}()
+    push!(mapping, ones(Bool, size(streamdata, 1)))
+
+    return Pool(stream, data, mapping, size(streamdata, 1), DataFrame)
+end
+
+# Pull the next chunk from the underlying stream, store it and return it.
+function next!(pool::Pool)
+    streamdata = next!(pool.stream)
+    pool.size += size(streamdata, 1)
+
+    #push!(pool.mapping, rand(Bool, size(streamdata, 1)))
+    push!(pool.mapping, ones(Bool, size(streamdata, 1)))
+    push!(pool.data, streamdata)
+    return streamdata
+end
+
+
+##Utils
+# Number of rows, across all chunks, whose mapping entry is `true`.
+function useble_length(pool)
+    count = 0
+    for i=1:size(pool.data, 1)
+        for j=1:size(pool.data[i], 1)
+            if pool.mapping[i][j]
+                count += 1
+            end
+        end
+    end
+    return count
+end
+
+##Indexing - Using one index to move in data through the instances
+# Return the `instance`-th visible row; throws `BoundsError` when there is none.
+function Base.getindex(pool::Pool, instance::Int)
+    count = 1
+    for i=1:size(pool.data, 1)
+        for j=1:size(pool.data[i], 1)
+            if pool.mapping[i][j]
+                if count == instance
+                    return pool.data[i][j, :]
+                end
+                count += 1
+            end
+        end
+    end
+    # No visible row has this position: fail loudly instead of returning `nothing`.
+    throw(BoundsError(pool, instance))
+end
+
+function Base.getindex(pool::Pool, i::Colon)
+    data = pool.N()
+    for i=1:useble_length(pool)
+        push!(data, pool[i])
+    end
+    return data
+end
+
+function Base.getindex(pool::Pool, range::UnitRange{Int64})
+    data = pool.N()
+    for i in range
+        push!(data, pool[i])
+    end
+    return data
+end
+
+##Indexing - Using two indexes to move in data through the instances and features
+
+Base.getindex(pool::Pool, instance::Int, feature::Int) = pool[instance][feature]
+
+Base.getindex(pool::Pool, instance::Int, c::Colon) = pool[instance]
+
+Base.getindex(pool::Pool, instance::Int, range::UnitRange{Int64}) = pool[instance][range]
+
+Base.getindex(pool::Pool, c::Colon, feature::Int) = pool[:][:, feature]
+
+Base.getindex(pool::Pool, c::Colon, range::UnitRange{Int64}) = pool[:][:, range]
+
+Base.getindex(pool::Pool, c1::Colon, c2::Colon) = pool[:]
+
+Base.getindex(pool::Pool, range::UnitRange{Int64}, feature::Int) = pool[range][:, feature]
+
+Base.getindex(pool::Pool, range::UnitRange{Int64}, c::Colon) = pool[range]
+
+Base.getindex(pool::Pool, range::UnitRange{Int64}, range2::UnitRange{Int64}) = pool[range][:, range2]
+
+##Indexing - Using three indexes to move in data through the instances, features, samples
+
+
+# All visible rows of chunk `sample`.
+function Base.getindex(pool::Pool, instance::Colon, feature::Colon, sample::Int)
+    data = pool.N()
+    for j=1:size(pool.data[sample], 1)
+        if pool.mapping[sample][j]
+            push!(data, pool.data[sample][j, :])
+        end
+    end
+    return data
+end
+
+# All visible rows of the chunks whose indices are in `sample`.
+function Base.getindex(pool::Pool, instance::Colon, feature::Colon, sample::UnitRange{Int64})
+    data = pool.N()
+    # Iterate over `sample`, the requested chunk indices.
+    for i in sample
+        for j=1:size(pool.data[i], 1)
+            if pool.mapping[i][j]
+                push!(data, pool.data[i][j, :])
+            end
+        end
+    end
+    return data
+end
+
+Base.getindex(pool::Pool, instance::Colon, feature::Colon, sample::Colon) = pool[:]
+
+Base.getindex(pool::Pool, instance, feature, sample) = pool[:, :, sample][instance, feature]
diff --git a/src/source.jl b/src/source.jl
new file mode 100644
index 0000000..151e1cf
--- /dev/null
+++ b/src/source.jl
@@ -0,0 +1,65 @@
+using Tables
+
+abstract type AbstractSource end
+
+# Cursor over a table that is handed out in chunks.
+#   table        - underlying data, sliced as `table[rows, :]`
+#   position     - index of the last row already handed out (0 before the first read)
+#   initial_size - number of rows in the first chunk
+#   batch        - number of rows in each following chunk
+mutable struct Source <: AbstractSource
+    table
+    position::Int
+    initial_size::Int
+    batch::Int
+end
+
+# Check the arguments (problems are logged, not thrown) and start at position 0.
+# An `initial_size` larger than the table is clamped to the number of rows.
+function Source(table, initial_size::Int, batch::Int)
+    if !Tables.istable(table)
+        @error "não é um tipo table"
+    end
+
+    if initial_size > size(table, 1)
+        initial_size = size(table, 1)
+        @warn "initial size é maior que o arquivo e será definido para o tamanho do arquivo"
+    end
+
+    if initial_size == 0
+        @warn "initial size é zero"
+    end
+
+    if batch == 0
+        @warn "flux size é zero"
+    end
+
+    return Source(table, 0, initial_size, batch)
+end
+
+"""
+    next(source::Source)
+
+Return the next chunk of `source.table` and advance `source.position`.
+
+While `position < initial_size` the chunk is rows `1:initial_size`; afterwards it is up
+to `batch` rows (fewer at the end of the table). Returns `nothing` once all rows are read.
+"""
+function next(source::Source)
+    if source.position < source.initial_size
+        source.position = source.initial_size
+        return source.table[1:source.initial_size, :]
+    end
+
+    nrows = size(source.table, 1)
+    if source.position >= nrows
+        return nothing
+    end
+
+    # Clamp the upper bound so a trailing partial batch does not index past the table.
+    last_row = min(source.position + source.batch, nrows)
+    index = (source.position + 1):last_row
+    source.position = last_row
+
+    return source.table[index, :]
+end
\ No newline at end of file
diff --git a/src/stream.jl b/src/stream.jl
new file mode 100644
index 0000000..9576e92
--- /dev/null
+++ b/src/stream.jl
@@ -0,0 +1,41 @@
+# A source plus the list of tables that receive the chunks read from it.
+struct Stream
+    source::AbstractSource
+    data_tables::Vector
+end
+
+Stream(source::AbstractSource) = Stream(source, Vector{Any}())
+
+"""
+    next(stream::Stream; f::Function = copyall)
+
+Read the next chunk from `stream.source` and append its rows to the published tables.
+
+`f(n_rows, n_tables)` must return an `n_rows × n_tables` Bool matrix; column `i`
+selects the rows appended to `stream.data_tables[i]`. Returns the chunk, or `nothing`
+when the source is exhausted.
+"""
+function next(stream::Stream; f::Function = copyall)
+    data = next(stream.source)
+
+    # An exhausted source yields `nothing`; there is nothing to distribute.
+    data === nothing && return nothing
+
+    elements = f(size(data, 1), length(stream.data_tables))
+
+    for i in eachindex(stream.data_tables)
+        append!(stream.data_tables[i], data[elements[:, i], :])
+    end
+
+    return data
+end
+
+# Default selector: every row goes to every table.
+copyall(qnt_elements, qnt_tables) = ones(Bool, qnt_elements, qnt_tables)
+
+# Register `data_tables` so that later `next(stream)` calls feed them.
+function publish(stream::Stream, data_tables...)
+    for data_table in data_tables
+        push!(stream.data_tables, data_table)
+    end
+end
diff --git a/test/runtests.jl b/test/runtests.jl
index c7de039..f6f3d03 100644
--- a/test/runtests.jl
+++ b/test/runtests.jl
@@ -1,6 +1,117 @@
-using EasyStream
 using Test
+using EasyStream
+
+# NOTE(review): `EasyStream.next!` and the `Pool(stream)` constructor used below only
+# appear in src/pool_bk.jl, which src/EasyStream.jl does not include - confirm the API.
+@testset "Dataset Test" begin
+    pool = EasyStream.Dataset1CDT() |> EasyStream.Pool
+    @test size(pool[:], 1) == 150
+
+    initial_size = 200
+    flux_size = 5
+    pool = EasyStream.Dataset1CDT(initial_size, flux_size) |> EasyStream.Pool
+    @test size(pool[:], 1) == 200
+
+    pool = EasyStream.DatasetUG_2C_5D() |> EasyStream.Pool
+    @test size(pool[:], 1) == 150
+
+    initial_size = 200
+    flux_size = 5
+    pool = EasyStream.DatasetUG_2C_5D(initial_size, flux_size) |> EasyStream.Pool
+    @test size(pool[:], 1) == 200
+end
+
+@testset "Stream Test" begin
+    stream = EasyStream.Dataset1CDT()
+    @test size(EasyStream.next!(stream), 1) == 150
+    x = EasyStream.next!(stream)
+
+    @test size(EasyStream.next!(stream), 1) == 1
+    @test size(EasyStream.next!(stream), 1) == 1
+
+    initial_size = 200
+    flux_size = 5
+    stream = EasyStream.Dataset1CDT(initial_size, flux_size)
+    @test size(EasyStream.next!(stream), 1) == initial_size
+    @test size(EasyStream.next!(stream), 1) == flux_size
+    @test size(EasyStream.next!(stream), 1) == flux_size
+
+    initial_size = 16000
+    flux_size = 1
+    stream = EasyStream.Dataset1CDT(initial_size, flux_size)
+    @test size(EasyStream.next!(stream), 1) == initial_size
+    @test EasyStream.next!(stream) === nothing
+
+    initial_size = 16001
+    flux_size = 1
+    stream = EasyStream.Dataset1CDT(initial_size, flux_size)
+    @test size(EasyStream.next!(stream), 1) == initial_size - 1
+    @test EasyStream.next!(stream) === nothing
+
+    @test_logs (:warn, "initial size é zero") EasyStream.Dataset1CDT(0, 1)
+    @test_logs (:warn, "flux size é zero") EasyStream.Dataset1CDT(1, 0)
+end
+
+@testset "Pool Indexing" begin
+    @testset "Test using one index" begin
+        pool = EasyStream.Dataset1CDT() |> EasyStream.Pool
+        test_data = pool.data[1]
+
+        data_size = size(test_data, 1)
+        for i=1:data_size
+            @test pool[i] == test_data[i,:]
+        end
+
+        @test pool[:] == test_data[:, :]
+
+        for i=1:data_size
+            @test pool[1:i] == test_data[1:i,:]
+        end
+
+        ## Testing mapper
+        counter = pool.stream.initial_size
+        for i=1:20
+            EasyStream.next!(pool)
+            if pool.mapping[i+1][1]
+                counter += 1
+            end
+            @test size(pool[:], 1) == counter
+        end
+
+
+        #@test_throws BoundsError pool[-1]
+
+        #@test_throws BoundsError pool[data_size + 1]
+    end
+
+    @testset "Test using two index " begin
+        pool = EasyStream.Dataset1CDT() |> EasyStream.Pool
+
+        @test pool[1, :]== pool[1]
+
+        for i=1:length(pool[1])
+            @test pool[1, i] == pool[1][i]
+        end
+
+        for i=1:length(pool[1])
+            @test pool[:, i] == pool[:][:, i]
+        end
+
+        @test pool[:] == pool[:, :]
+
+        #TODO Create unit tests for accessing the pool using a range
+        #=
+        N_INSTANCES = size(pool[:], 1)
+        N_FEATURES = length(pool[1])
+
+        @test_throws BoundsError pool[1, N_FEATURES + 1]
+        @test_throws BoundsError pool[:, N_FEATURES + 1]
+        @test_throws BoundsError pool[N_INSTANCES + 1, :]
+
+        @test_throws BoundsError pool[1, -1]
+        @test_throws BoundsError pool[:, -1]
+        @test_throws BoundsError pool[-1, :]
+        =#
+    end
 
-@testset "EasyStream.jl" begin
-    # Write your own tests here.
 end
diff --git a/test/test.jl b/test/test.jl
new file mode 100644
index 0000000..646cca0
--- /dev/null
+++ b/test/test.jl
@@ -0,0 +1,65 @@
+using Revise
+using Pkg
+
+Pkg.activate(".")
+
+using EasyStream
+
+stream = EasyStream.Dataset1CDT()
+
+stream
+
+# NOTE(review): `pool` is never assigned anywhere in this script.
+pool[150,:]
+
+EasyStream.next!(stream)
+
+stream[1,:]
+
+
+using DataFrames
+
+# Small user/item/rating table used to try `EasyStream.Source` by hand.
+function createdummydatasetone()
+    return DataFrame(
+        user = [1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 4, 4, 4, 4, 4, 5, 5, 5, 5, 5, 5, 6, 6, 6, 6, 6, 7, 7, 7],
+        item = [1, 1, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 1, 2, 4, 5, 6, 2, 4, 5],
+        rating = [2.5, 3.5, 3.0, 3.5, 2.5, 3.0, 3, 3.5, 1.5, 5, 3, 3.5, 2.5, 3.0, 3.5, 4.0, 3.5, 3.0, 4.0, 2.5, 4.5, 3.0, 4.0, 2.0, 3.0, 2.0, 3.0, 3.0, 4.0, 5.0, 3.5, 3.0, 4.5, 4.0, 1.0],
+    )
+end
+
+df = createdummydatasetone()
+EasyStream.Source(df, 100, 10)
+
+using CSV
+
+data = CSV.read("datasets/synthetic/1CDT.csv"; header = true)
+
+data = CSV.File("datasets/synthetic/1CDT.csv"; header = false)
+
+data = CSV.Rows("datasets/synthetic/1CDT.csv"; header = true, ignoreemptylines = true)
+data
+for row in CSV.Rows("datasets/synthetic/1CDT.csv")
+    println("$row")
+    break
+end
+
+using Revise
+using EasyStream
+using CSV
+using DataFrames
+
+data = CSV.read("datasets/synthetic/1CDT.csv"; header = false)
+source = EasyStream.Source(data, 10, 5)
+
+stream = EasyStream.Stream(source)
+
+table1 = Vector()
+
+EasyStream.publish(stream, table1)
+stream.data_tables
+
+EasyStream.next(stream)
+length(table1)
+
+table1[1]
\ No newline at end of file