Access and Query Amazon Athena via DBI/JDBC
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

129 lines
3.3 KiB

5 years ago
# Lookup table keyed by JDBC column type code (stored as a string). Each
# entry is the function `fetch()` applies to a retrieved column of that
# type; the trailing comment names the SQL type the code stands for.
# `as_logical`, `as_date` and `as_posixct` are not defined in this part of
# the file -- presumably package-internal helpers.
.jdbc_converters <- list(
  "-7"   = as.logical,           # BIT
  "-6"   = as.integer,           # TINYINT
  "-5"   = bit64::as.integer64,  # BIGINT
  "-4"   = as.character,         # LONGVARBINARY
  "-3"   = as.character,         # VARBINARY
  "-2"   = as.character,         # BINARY
  "-1"   = as.character,         # LONGVARCHAR
  "0"    = as.character,         # NULL
  "1"    = as.character,         # CHAR
  "2"    = as.double,            # NUMERIC
  "3"    = as.double,            # DECIMAL
  "4"    = as.integer,           # INTEGER
  "5"    = as.integer,           # SMALLINT
  "6"    = as.double,            # FLOAT
  "7"    = as.double,            # REAL
  "8"    = as.double,            # DOUBLE
  "12"   = as.character,         # VARCHAR
  "16"   = as_logical,           # BOOLEAN
  "91"   = as_date,              # DATE
  "92"   = as.character,         # TIME
  "93"   = as_posixct,           # TIMESTAMP
  "2003" = as.character,         # ARRAY
  "1111" = as.character          # OTHER
)
5 years ago
#' Retrieve driver metadata
#'
#' Returns the backend name, the driver version reported by
#' `metis.jars::simba_driver_version()` and the installed version of the
#' `metis.jars` package.
#'
#' @param dbObj driver/connection
#' @param ... unused
#' @return A named list with elements `name`, `driver_version` and
#'   `package_version`.
#' @export
#' @keywords internal
setMethod(
  "dbGetInfo",
  "AthenaDriver",
  definition = function(dbObj, ...) {
    driver_info <- list(
      name = "AthenaJDBC",
      driver_version = metis.jars::simba_driver_version(),
      package_version = utils::packageVersion("metis.jars")
    )
    driver_info
  }
)
5 years ago
5 years ago
#' Retrieve connection/driver/database metadata
#'
#' Returns the backend name, the driver version and a package version for
#' an open Athena connection.
#'
#' @param dbObj driver/connection
#' @param ... unused
#' @return A named list with elements `name`, `driver_version` and
#'   `package_version`.
#' @export
#' @keywords internal
setMethod(
  "dbGetInfo",
  "AthenaConnection",
  definition = function(dbObj, ...) {
    list(
      name = "AthenaJDBC",
      # Use the same version source as the "AthenaDriver" method. The
      # previous code listed jar files shipped by a package named
      # "metis.lite", which nothing else in this file refers to; when that
      # package is not installed the lookup silently yields NA.
      driver_version = metis.jars::simba_driver_version(),
      # NOTE(review): the "AthenaDriver" method reports the version of
      # "metis.jars" here -- confirm that "metis" is the intended package.
      package_version = utils::packageVersion("metis")
    )
  }
)
5 years ago
#' Fetch records from a previously executed query
#'
#' Fetch the next `n` elements (rows) from the result set and return them
#' as a data.frame. The rows are retrieved by the next `fetch` method; each
#' column is then coerced with the function registered in `.jdbc_converters`
#' for its JDBC column type. Columns whose type code has no registered
#' converter are coerced with `as.character()`.
#'
#' @param res An object inheriting from [DBIResult-class], created by
#'   [dbSendQuery()].
#' @param n maximum number of records to retrieve per fetch. Use `n = -1`
#'   or `n = Inf` to retrieve all pending records. Some implementations may
#'   recognize other special values.
#' @param block block size, passed through to the next `fetch` method
#' @param ... Other arguments passed on to methods.
#' @return The object returned by the next `fetch` method with its columns
#'   converted as described above.
#' @export
setMethod(
  "fetch",
  signature(res = "AthenaResult", n = "numeric"),
  definition = function(res, n, block = 1000L, ...) {

    n_cols <- .jcall(res@md, "I", "getColumnCount")

    # Preallocate so both containers always have exactly one slot per column.
    col_names <- character(n_cols)
    converters <- vector("list", n_cols)

    # seq_len() rather than 1:n_cols: a result with zero columns must not
    # iterate over c(1, 0).
    for (i in seq_len(n_cols)) {
      type_code <- as.character(.jcall(res@md, "I", "getColumnType", i))
      converter <- .jdbc_converters[[type_code]]
      # Resolve the fallback here. Assigning NULL into a list slot stores
      # nothing, which would leave the list shorter than `col_names` when
      # the last column has an unmapped type.
      converters[[i]] <- if (is.null(converter)) as.character else converter
      col_names[i] <- .jcall(res@md, "S", "getColumnLabel", i)
    }
    names(converters) <- col_names

    out <- callNextMethod(res = res, n = n, block = block, ...)

    for (nm in names(converters)) {
      f <- converters[[nm]]
      if (is.null(f)) f <- as.character # catchall in case AMZN is tricksy
      out[[nm]] <- f(out[[nm]])
    }

    out
  }
)
#' Send a statement and fetch every row of its result
#'
#' Submits `statement` with `dbSendQuery()`, fetches all rows using the
#' connection's `fetch_size` slot as the block size, and tags the result
#' with the classes `tbl_df`, `tbl` and `data.frame`. The `stat` slot of the
#' result object is closed when the function exits.
#'
#' @param conn Athena connection
#' @param statement SQL statement
#' @param ... unused
#' @return The fetched rows with class
#'   `c("tbl_df", "tbl", "data.frame")`.
#' @importFrom rJava .jcall
#' @export
setMethod(
  "dbGetQuery",
  signature(conn = "AthenaConnection", statement = "character"),
  definition = function(conn, statement, ...) {
    query_result <- dbSendQuery(conn, statement, ...)
    # Close the statement handle on exit, whether or not the fetch succeeds.
    on.exit(.jcall(query_result@stat, "V", "close"))

    rows <- fetch(query_result, -1, block = conn@fetch_size)
    class(rows) <- c("tbl_df", "tbl", "data.frame")
    rows
  }
)