Skip to content

Commit

Permalink
Respect lines argument of resp_stream_lines
Browse files Browse the repository at this point in the history
  • Loading branch information
jcheng5 committed Sep 6, 2024
1 parent 6539d2f commit 03c5c32
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 29 deletions.
74 changes: 48 additions & 26 deletions R/req-perform-stream.R
Original file line number Diff line number Diff line change
Expand Up @@ -226,43 +226,64 @@ find_line_boundary <- function(buffer) {

#' @export
#' @rdname resp_stream_raw
#' @param lines How many lines to read
#' @param lines The maximum number of lines to return at once.
#' @param warn Like [readLines()]: warn if the connection ends without a final
#' EOL.
resp_stream_lines <- function(resp, lines = 1, max_size = Inf, warn = TRUE) {
check_streaming_response(resp)
check_number_whole(lines, min = 0, allow_infinite = TRUE)

check_number_whole(max_size, min = 1, allow_infinite = TRUE)
check_logical(warn)

if (lines == 0) {

Check warning on line 238 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L235-L238

Added lines #L235 - L238 were not covered by tests
# If you want to do that, who am I to judge?
return(character())

Check warning on line 240 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L240

Added line #L240 was not covered by tests
}

line_bytes <- resp_boundary_pushback(resp, max_size, find_line_boundary, include_trailer = TRUE)
if (length(line_bytes) == 0) {
return(character())
lines_read <- character(0)
while (lines > 0) {
line <- resp_stream_oneline(resp, max_size, warn)

Check warning on line 245 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L245

Added line #L245 was not covered by tests
if (length(line) == 0) {
# No more data, either because EOF or req_perform_connection(blocking=FALSE).
# Either way, return what we have

Check warning on line 248 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L247-L248

Added lines #L247 - L248 were not covered by tests
return(lines_read)
}
lines_read <- c(lines_read, line)
lines <- lines - 1
}
lines_read
}

Check warning on line 256 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L251-L256

Added lines #L251 - L256 were not covered by tests
eat_next_lf <- resp$cache$resp_stream_lines_eat_next_lf
resp$cache$resp_stream_lines_eat_next_lf <- FALSE

if (identical(line_bytes, as.raw(0x0A)) && isTRUE(eat_next_lf)) {
# We hit that special edge case, see below
return(resp_stream_lines(resp, lines, max_size, warn))
}
resp_stream_oneline <- function(resp, max_size, warn) {
repeat {
line_bytes <- resp_boundary_pushback(resp, max_size, find_line_boundary, include_trailer = TRUE)
if (length(line_bytes) == 0) {
return(character())
}

# If ending on \r, there's a special edge case here where if the
# next line begins with \n, that byte should be eaten.
if (tail(line_bytes, 1) == 0x0D) {
resp$cache$resp_stream_lines_eat_next_lf <- TRUE
}
eat_next_lf <- resp$cache$resp_stream_oneline_eat_next_lf

Check warning on line 264 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L260-L264

Added lines #L260 - L264 were not covered by tests
resp$cache$resp_stream_oneline_eat_next_lf <- FALSE

if (identical(line_bytes, as.raw(0x0A)) && isTRUE(eat_next_lf)) {

Check warning on line 267 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L266-L267

Added lines #L266 - L267 were not covered by tests
# We hit that special edge case, see below
next

Check warning on line 269 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L269

Added line #L269 was not covered by tests
}

# If ending on \r, there's a special edge case here where if the

Check warning on line 272 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L271-L272

Added lines #L271 - L272 were not covered by tests
# next line begins with \n, that byte should be eaten.
if (tail(line_bytes, 1) == 0x0D) {
resp$cache$resp_stream_oneline_eat_next_lf <- TRUE
}

# Use `resp$body` as the variable name so that if warn=TRUE, you get
# "incomplete final line found on 'resp$body'" as the warning message
`resp$body` <- line_bytes
line_con <- rawConnection(`resp$body`)
on.exit(close(line_con))
# TODO: Use iconv to convert from whatever encoding is specified in the
# response header, to UTF-8
readLines(line_con, n = 1, warn = warn)
# Use `resp$body` as the variable name so that if warn=TRUE, you get

Check warning on line 278 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L277-L278

Added lines #L277 - L278 were not covered by tests
# "incomplete final line found on 'resp$body'" as the warning message
`resp$body` <- line_bytes
line_con <- rawConnection(`resp$body`)
on.exit(close(line_con))
# TODO: Use iconv to convert from whatever encoding is specified in the
# response header, to UTF-8

Check warning on line 284 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L282-L284

Added lines #L282 - L284 were not covered by tests
return(readLines(line_con, n = 1, warn = warn))
}
}

Check warning on line 288 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L287-L288

Added lines #L287 - L288 were not covered by tests
# Slices the vector using the only sane semantics: start inclusive, end
Expand Down Expand Up @@ -412,7 +433,8 @@ resp_boundary_pushback <- function(resp, max_size, boundary_func, include_traile
}
}

Check warning on line 435 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L433-L435

Added lines #L433 - L435 were not covered by tests
#' @param max_size The maximum number of bytes to buffer; once
#' @param max_size The maximum number of bytes to buffer; once this number of
#' bytes has been exceeded without a line/event boundary, an error is thrown.
#' @export
#' @rdname resp_stream_raw
# TODO: max_size
Expand Down
10 changes: 7 additions & 3 deletions man/resp_stream_raw.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions tests/testthat/test-req-perform-stream.R
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,26 @@ test_that("streams the specified number of lines", {
resp_stream_lines(resp1, 3),
c("d", "e")
)
expect_equal(
resp_stream_lines(resp1, 3),
character()
)

resp2 <- req_perform_connection(req, blocking = FALSE)
withr::defer(close(resp2))
Sys.sleep(0.2)
expect_equal(
resp_stream_lines(resp2, 3),
c("a", "b", "c")
)
expect_equal(
resp_stream_lines(resp2, 3),
c("d", "e")
)
expect_equal(
resp_stream_lines(resp2, 3),
character()
)
})

test_that("can feed sse events one at a time", {
Expand Down

0 comments on commit 03c5c32

Please sign in to comment.