|
|
@ -23,45 +23,89 @@ list( |
|
|
|
"1111" = as.character # OTHER |
|
|
|
) -> .jdbc_converters |
|
|
|
|
|
|
|
#' AthenaJDBC |
|
|
|
#' |
|
|
|
#' @param conn Athena connection |
|
|
|
#' @param statement SQL statement |
|
|
|
#' @param ... unused |
|
|
|
#' @importFrom rJava .jcall |
|
|
|
#' @export |
|
|
|
setMethod( |
|
|
|
#' @keywords internal |
|
|
|
setMethod("dbGetInfo", "AthenaDriver", def=function(dbObj, ...) |
|
|
|
list( |
|
|
|
name = "AthenaJDBC", |
|
|
|
driver_version = list.files(system.file("java", package="metis.lite"), "jar$")[1], |
|
|
|
package_version = utils::packageVersion("metis.lite") |
|
|
|
) |
|
|
|
) |
|
|
|
|
|
|
|
"dbGetQuery", |
|
|
|
signature(conn="AthenaConnection", statement="character"), |
|
|
|
|
|
|
|
definition = function(conn, statement, type_convert=FALSE, ...) { |
|
|
|
#' @export |
|
|
|
#' @keywords internal |
|
|
|
setMethod("dbGetInfo", "AthenaConnection", def=function(dbObj, ...) |
|
|
|
list( |
|
|
|
name = "AthenaJDBC", |
|
|
|
driver_version = list.files(system.file("java", package="metis.lite"), "jar$")[1], |
|
|
|
package_version = utils::packageVersion("metis.lite") |
|
|
|
) |
|
|
|
) |
|
|
|
|
|
|
|
r <- dbSendQuery(conn, statement, ...) |
|
|
|
|
|
|
|
on.exit(.jcall(r@stat, "V", "close")) |
|
|
|
|
|
|
|
#message("dbGetQuery()") |
|
|
|
#' Fetch records from a previously executed query |
|
|
|
#' |
|
|
|
#' Fetch the next `n` elements (rows) from the result set and return them |
|
|
|
#' as a data.frame. |
|
|
|
#' |
|
|
|
#' @param res An object inheriting from [DBIResult-class], created by |
|
|
|
#' [dbSendQuery()]. |
|
|
|
#' @param n maximum number of records to retrieve per fetch. Use `n = -1` |
|
|
|
#' or `n = Inf` |
|
|
|
#' to retrieve all pending records. Some implementations may recognize other |
|
|
|
#' special values. |
|
|
|
#' @param ... Other arguments passed on to methods. |
|
|
|
#' @export |
|
|
|
setMethod( |
|
|
|
"fetch", |
|
|
|
signature(res="AthenaResult", n="numeric"), |
|
|
|
def = function(res, n, block = 1000L, ...) { |
|
|
|
|
|
|
|
nms <- c() |
|
|
|
athena_type_convert <- list() |
|
|
|
|
|
|
|
cols <- .jcall(r@md, "I", "getColumnCount") |
|
|
|
cols <- .jcall(res@md, "I", "getColumnCount") |
|
|
|
|
|
|
|
for (i in 1:cols) { |
|
|
|
ct <- as.character(.jcall(r@md, "I", "getColumnType", i)) |
|
|
|
ct <- as.character(.jcall(res@md, "I", "getColumnType", i)) |
|
|
|
athena_type_convert[[i]] <- .jdbc_converters[[ct]] |
|
|
|
nms <- c(nms, .jcall(r@md, "S", "getColumnLabel", i)) |
|
|
|
nms <- c(nms, .jcall(res@md, "S", "getColumnLabel", i)) |
|
|
|
} |
|
|
|
|
|
|
|
athena_type_convert <- set_names(athena_type_convert, nms) |
|
|
|
|
|
|
|
res <- fetch(r, -1, block = 1000) |
|
|
|
out <- callNextMethod(res = res, n = n, block = block, ...) |
|
|
|
|
|
|
|
for (nm in names(athena_type_convert)) { |
|
|
|
res[[nm]] <- athena_type_convert[[nm]](res[[nm]]) |
|
|
|
out[[nm]] <- athena_type_convert[[nm]](out[[nm]]) |
|
|
|
} |
|
|
|
|
|
|
|
out |
|
|
|
|
|
|
|
} |
|
|
|
) |
|
|
|
|
|
|
|
#' AthenaJDBC |
|
|
|
#' |
|
|
|
#' @param conn Athena connection |
|
|
|
#' @param statement SQL statement |
|
|
|
#' @param ... unused |
|
|
|
#' @importFrom rJava .jcall |
|
|
|
#' @export |
|
|
|
setMethod( |
|
|
|
|
|
|
|
"dbGetQuery", |
|
|
|
signature(conn="AthenaConnection", statement="character"), |
|
|
|
|
|
|
|
definition = function(conn, statement, type_convert=FALSE, ...) { |
|
|
|
|
|
|
|
r <- dbSendQuery(conn, statement, ...) |
|
|
|
|
|
|
|
on.exit(.jcall(r@stat, "V", "close")) |
|
|
|
|
|
|
|
res <- fetch(r, -1, block = 1000L) |
|
|
|
|
|
|
|
class(res) <- c("tbl_df", "tbl", "data.frame") |
|
|
|
|
|
|
|
res |
|
|
|