Access and Query Amazon Athena via DBI/JDBC
Ви не можете вибрати більше 25 тем Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.

246 рядки
6.3 KiB

5 роки тому
structure(
0:6,
5 роки тому
.Names = c(
"OFF", "FATAL", "ERROR", "WARNING", "INFO", "DEBUG", "TRACE"
)
)-> .ll_trans
7 роки тому
#' AthenaJDBC
#'
#' @export
6 роки тому
setClass(
"AthenaDriver",
6 роки тому
representation(
"JDBCDriver",
identifier.quote = "character",
jdrv = "jobjRef"
)
6 роки тому
)
7 роки тому
#' AthenaJDBC
#'
5 роки тому
#' @param identifier.quote how to quote identifiers
7 роки тому
#' @export
6 роки тому
Athena <- function(identifier.quote = '`') {
6 роки тому
6 роки тому
JDBC(
driverClass = "com.simba.athena.jdbc.Driver",
5 роки тому
metis.jars::metis_jar_path(),
6 роки тому
identifier.quote = identifier.quote
) -> drv
6 роки тому
7 роки тому
return(as(drv, "AthenaDriver"))
6 роки тому
7 роки тому
}
#' AthenaJDBC
#'
5 роки тому
#' Connect to Athenadb
5 роки тому
#'
5 роки тому
#' Mandatory JDBC connection parameters are also named function
#' parameters. You can use `...` to supply additional/optional
#' parameters.
#'
#' @section Higlighted Extra Driver Configuration Options:
#'
#' These are take from the second item in References. See that resource
#' for more information.
5 роки тому
#'
#' - `BinaryColumnLength`: <int> The maximum data length for `BINARY` columns. Default `32767L`
#' - `ComplexTypeColumnLength`: <int> The maximum data length for `ARRAY`, `MAP`, and `STRUCT` columns. Default `65535L`
#' - `StringColumnLength`: <int> The maximum data length for `STRING` columns. Default `255L`
#'
5 роки тому
#' @param drv driver
5 роки тому
#' @param Schema The name of the database schema to use when a schema is not explicitly
#' specified in a query. You can still issue queries on other schemas by explicitly
#' specifying the schema in the query.
#' @param AwsRegion AWS region the Athena tables are in
#' @param AwsCredentialsProviderClass JDBC auth provider; You can add a
#' lengrh1 character vecrtor named parameter `AwsCredentialsProviderArguments`
#' to the `dbConnect()` call to use alternate auth providers. Use a
#' comma-separated list of String arguments.
#' @param S3OutputLocation A write-able bucket on S3 that you have permissions for
#' @param MaxErrorRetry,ConnectTimeout,SocketTimeout
6 роки тому
#' technical connection info that you should only muck with if you know what you're doing.
5 роки тому
#' @param LogLevel,LogPath The Athena JDBC driver can provide a decent bit
#' of data in logs. Set this to a temporary directory or something `log4j` can use. For
#' `LogPath` use the names ("`INFO`", "`DEBUG`", "`WARN`", "`ERROR`", "`ALL`", "`OFF`", "`FATAL`", "`TRACE`") or
#' their corresponding integer values 0-6.
5 роки тому
#' @param fetch_size Athena results fetch size
5 роки тому
#' @param ... passed on to the driver. See Details.
#' @references [Connect with JDBC](https://docs.aws.amazon.com/athena/latest/ug/connect-with-jdbc.html);
#' [Simba Athena JDBC Driver with SQL Connector Installation and Configuration Guide](https://s3.amazonaws.com/athena-downloads/drivers/JDBC/SimbaAthenaJDBC_2.0.6/docs/Simba+Athena+JDBC+Driver+Install+and+Configuration+Guide.pdf)
7 роки тому
#' @export
setMethod(
"dbConnect",
"AthenaDriver",
def = function(
drv,
5 роки тому
Schema = "default",
AwsRegion = "us-east-1",
5 роки тому
AwsCredentialsProviderClass = paste0(c(
"com", "simba", "athena", "amazonaws","auth",
"DefaultAWSCredentialsProviderChain"
), collapse = "."),
5 роки тому
S3OutputLocation = Sys.getenv("AWS_S3_STAGING_DIR", unset = ""),
MaxErrorRetry = 10,
ConnectTimeout = 10000,
SocketTimeout = 10000,
LogPath = "",
LogLevel = 0,
5 роки тому
fetch_size = 1000L,
...) {
7 роки тому
6 роки тому
conn_string = sprintf(
5 роки тому
'jdbc:awsathena://athena.%s.amazonaws.com:443/%s', AwsRegion, Schema
6 роки тому
)
7 роки тому
5 роки тому
if (!(LogLevel %in% 0:6)) LogLevel <- .ll_trans[LogLevel]
6 роки тому
callNextMethod(
drv,
conn_string,
5 роки тому
S3OutputLocation = S3OutputLocation,
Schema = Schema,
AwsRegion = AwsRegion,
MaxErrorRetry = MaxErrorRetry,
ConnectTimeout = ConnectTimeout,
SocketTimeout = SocketTimeout,
LogPath = LogPath,
LogLevel = LogLevel,
AwsCredentialsProviderClass = AwsCredentialsProviderClass,
6 роки тому
...
) -> jc
7 роки тому
5 роки тому
jc <- as(jc, "AthenaConnection")
jc@fetch_size <- as.integer(fetch_size)
return(jc)
7 роки тому
}
)
#' AthenaJDBC
#'
5 роки тому
#' @param jc job ref
#' @param identifier.quote how to quote identifiers
#' @param fetch_size Athena results fetch size
7 роки тому
#' @export
5 роки тому
setClass("AthenaConnection", representation("JDBCConnection", jc="jobjRef", identifier.quote="character", fetch_size="integer"))
# setClass("AthenaConnection", contains = "JDBCConnection")
7 роки тому
#' AthenaJDBC
#'
#' @export
setClass("AthenaResult", contains = "JDBCResult")
#' AthenaJDBC
#'
6 роки тому
#' @param conn Athena connection
#' @param statement SQL statement
#' @param ... unused
7 роки тому
#' @export
setMethod(
"dbSendQuery",
6 роки тому
signature(conn="AthenaConnection", statement="character"),
7 роки тому
6 роки тому
definition = function(conn, statement, ...) {
7 роки тому
return(as(callNextMethod(), "AthenaResult"))
}
)
#' AthenaJDBC
#'
6 роки тому
#' @param conn Athena connection
#' @param pattern table name pattern
#' @param schema Athena schema name
#' @param ... unused
#' @export
setMethod(
"dbListTables",
signature(conn="AthenaConnection"),
6 роки тому
definition = function(conn, pattern='*', schema, ...) {
6 роки тому
if (missing(pattern)) {
dbGetQuery(
conn, sprintf("SHOW TABLES IN %s", schema)
) -> x
} else {
dbGetQuery(
conn, sprintf("SHOW TABLES IN %s %s", schema, dbQuoteString(conn, pattern))
) -> x
}
x$tab_name
}
)
#' AthenaJDBC
#'
#' @param conn Athena connection
#' @param name table name
#' @param schema Athena schema name
#' @param ... unused
#' @export
setMethod(
"dbExistsTable",
signature(conn="AthenaConnection", name="character"),
6 роки тому
definition = function(conn, name, schema, ...) {
6 роки тому
length(dbListTables(conn, schema=schema, pattern=name)) > 0
}
)
#' AthenaJDBC
#'
#' @param conn Athena connection
#' @param name table name
#' @param schema Athena schema name
#' @param ... unused
#' @export
setMethod(
"dbListFields",
signature(conn="AthenaConnection", name="character"),
6 роки тому
definition = function(conn, name, schema, ...) {
6 роки тому
query <- sprintf("SELECT * FROM %s.%s LIMIT 1", schema, name)
res <- dbGetQuery(conn, query)
colnames(res)
}
)
#' AthenaJDBC
#'
#' @param conn Athena connection
#' @param name table name
#' @param schema Athena schema name
#' @param ... unused
#' @export
setMethod(
"dbReadTable",
signature(conn="AthenaConnection", name="character"),
6 роки тому
definition = function(conn, name, schema, ...) {
6 роки тому
query <- sprintf("SELECT * FROM %s.%s LIMIT 1", schema, dbQuoteString(conn, name))
dbGetQuery(conn, query)
7 роки тому
}
)