Access and Query Amazon Athena via DBI/JDBC
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

230 lines
5.6KB

  # Lookup table from log-level name to its integer code (0 = "OFF" through
  # 6 = "TRACE"); used to translate a `log_level` that was supplied by name.
  .ll_trans <- c(
    OFF = 0L, FATAL = 1L, ERROR = 2L, WARNING = 3L,
    INFO = 4L, DEBUG = 5L, TRACE = 6L
  )
  #' AthenaJDBC
  #'
  #' S4 driver class for Athena. It extends `JDBCDriver` and declares the
  #' `identifier.quote` (character) and `jdrv` (`jobjRef`) slots.
  #'
  #' @export
  setClass(
    Class = "AthenaDriver",
    contains = "JDBCDriver",
    slots = c(
      identifier.quote = "character",
      jdrv = "jobjRef"
    )
  )
  #' AthenaJDBC
  #'
  #' Builds a JDBC driver for the `com.simba.athena.jdbc.Driver` class, using
  #' the jar path reported by `metis.jars::metis_jar_path()`, and converts it
  #' to an `AthenaDriver`.
  #'
  #' @param identifier.quote how to quote identifiers
  #' @return an `AthenaDriver` object
  #' @export
  Athena <- function(identifier.quote = '`') {
    jdbc_driver <- JDBC(
      driverClass = "com.simba.athena.jdbc.Driver",
      classPath = metis.jars::metis_jar_path(),
      identifier.quote = identifier.quote
    )
    as(jdbc_driver, "AthenaDriver")
  }
  #' AthenaJDBC
  #'
  #' Connect to Athena
  #'
  #' @section Driver Configuration Options:
  #'
  #' - `BinaryColumnLength`: <int> The maximum data length for `BINARY` columns. Default `32767L`
  #' - `ComplexTypeColumnLength`: <int> The maximum data length for `ARRAY`, `MAP`, and `STRUCT` columns. Default `65535L`
  #' - `StringColumnLength`: <int> The maximum data length for `STRING` columns. Default `255L`
  #'
  #' @param drv driver
  #' @param provider JDBC auth provider (ideally leave default)
  #' @param region AWS region the Athena tables are in
  #' @param s3_staging_dir A write-able bucket on S3 that you have permissions for
  #' @param schema_name LOL if only this actually worked with Amazon's hacked Presto driver
  #' @param max_error_retries,connection_timeout,socket_timeout
  #'        technical connection info that you should only muck with if you know what you're doing.
  #' @param log_path,log_level The Athena JDBC driver can (shockingly) provide a decent bit
  #'        of data in logs. Set `log_path` to a temporary directory or something log4j can use.
  #'        For `log_level` use one of the names "OFF", "FATAL", "ERROR", "WARNING", "INFO",
  #'        "DEBUG", "TRACE" (case-insensitive) or the corresponding integer value 0-6.
  #'        "WARN" is accepted as an alias for "WARNING" and "ALL" as an alias for "TRACE"
  #'        (the highest level in the table). Any other value is an error.
  #' @param fetch_size Athena results fetch size
  #' @param ... passed on to the driver. See the Driver Configuration Options section.
  #' @return an `AthenaConnection` whose `fetch_size` slot holds `fetch_size` as an integer
  #' @references [Connect with JDBC](https://docs.aws.amazon.com/athena/latest/ug/connect-with-jdbc.html);
  #'             [Simba Athena JDBC Driver with SQL Connector Installation and Configuration Guide](https://s3.amazonaws.com/athena-downloads/drivers/JDBC/SimbaAthenaJDBC_2.0.6/docs/Simba+Athena+JDBC+Driver+Install+and+Configuration+Guide.pdf)
  #' @export
  setMethod(
    "dbConnect",
    "AthenaDriver",
    def = function(
      drv,
      provider = "com.simba.athena.amazonaws.auth.DefaultAWSCredentialsProviderChain",
      region = "us-east-1",
      s3_staging_dir = Sys.getenv("AWS_S3_STAGING_DIR"),
      schema_name = "default",
      fetch_size = 1000L,
      max_error_retries = 10,
      connection_timeout = 10000,
      socket_timeout = 10000,
      log_path = "",
      log_level = 0,
      ...) {

      conn_string <- sprintf(
        'jdbc:awsathena://athena.%s.amazonaws.com:443/%s', region, schema_name
      )

      if (length(log_level) != 1L || is.na(log_level)) {
        stop(
          "`log_level` must be a single level name or an integer from 0 to 6.",
          call. = FALSE
        )
      }

      # Values already in 0-6 are passed through untouched; anything else is
      # treated as a level name and translated via `.ll_trans`.
      if (!(log_level %in% 0:6)) {
        level_name <- toupper(trimws(as.character(log_level)))

        # Names advertised in the docs that have no entry of their own in
        # `.ll_trans`; without these the lookup would silently yield NA.
        aliases <- c(WARN = "WARNING", ALL = "TRACE")
        if (level_name %in% names(aliases)) level_name <- aliases[[level_name]]

        if (!(level_name %in% names(.ll_trans))) {
          stop(
            sprintf(
              "Unknown `log_level` '%s'; use an integer from 0 to 6 or one of: %s.",
              as.character(log_level),
              paste(names(.ll_trans), collapse = ", ")
            ),
            call. = FALSE
          )
        }

        log_level <- .ll_trans[[level_name]]
      }

      jc <- callNextMethod(
        drv,
        conn_string,
        S3OutputLocation = s3_staging_dir,
        Schema = schema_name,
        MaxErrorRetry = max_error_retries,
        ConnectTimeout = connection_timeout,
        SocketTimeout = socket_timeout,
        LogPath = log_path,
        LogLevel = log_level,
        AwsCredentialsProviderClass = provider,
        ...
      )

      jc <- as(jc, "AthenaConnection")
      jc@fetch_size <- as.integer(fetch_size)

      jc
    }
  )
  #' AthenaJDBC
  #'
  #' S4 connection class for Athena: a `JDBCConnection` that also carries a
  #' `fetch_size` slot.
  #'
  #' @slot jc job ref (a `jobjRef`)
  #' @slot identifier.quote how to quote identifiers
  #' @slot fetch_size Athena results fetch size
  #' @export
  setClass(
    Class = "AthenaConnection",
    contains = "JDBCConnection",
    slots = c(
      jc = "jobjRef",
      identifier.quote = "character",
      fetch_size = "integer"
    )
  )
  # setClass("AthenaConnection", contains = "JDBCConnection")
  #' AthenaJDBC
  #'
  #' S4 result class for Athena queries; extends `JDBCResult` without adding
  #' any slots.
  #'
  #' @export
  setClass(Class = "AthenaResult", contains = "JDBCResult")
  #' AthenaJDBC
  #'
  #' Sends a statement through the inherited `dbSendQuery` method and converts
  #' what comes back to an `AthenaResult`.
  #'
  #' @param conn Athena connection
  #' @param statement SQL statement
  #' @param ... unused
  #' @return an `AthenaResult`
  #' @export
  setMethod(
    f = "dbSendQuery",
    signature = signature(conn = "AthenaConnection", statement = "character"),
    definition = function(conn, statement, ...) {
      inherited_result <- callNextMethod()
      as(inherited_result, "AthenaResult")
    }
  )
  #' AthenaJDBC
  #'
  #' Lists tables by running a `SHOW TABLES` statement and returning the
  #' `tab_name` column of the result.
  #'
  #' @param conn Athena connection
  #' @param pattern table name pattern; only added to the statement (as a
  #'        quoted string) when supplied explicitly
  #' @param schema Athena schema name. Optional: when omitted the statement is
  #'        issued without an `IN <schema>` clause, leaving the choice of
  #'        database to Athena.
  #' @param ... unused
  #' @return the `tab_name` column of the `SHOW TABLES` result
  #' @export
  setMethod(
    "dbListTables",
    signature(conn = "AthenaConnection"),
    definition = function(conn, pattern = '*', schema, ...) {

      # Build the statement incrementally so that both optional clauses are
      # only present when the caller asked for them. Previously a missing
      # `schema` made the plain DBI call `dbListTables(conn)` fail.
      sql <- "SHOW TABLES"

      if (!missing(schema)) {
        sql <- sprintf("%s IN %s", sql, schema)
      }

      if (!missing(pattern)) {
        sql <- sprintf("%s %s", sql, dbQuoteString(conn, pattern))
      }

      tables <- dbGetQuery(conn, sql)

      tables[["tab_name"]]
    }
  )
  #' AthenaJDBC
  #'
  #' Reports whether listing the tables of `schema` with `name` as the pattern
  #' returns at least one entry.
  #'
  #' @param conn Athena connection
  #' @param name table name
  #' @param schema Athena schema name
  #' @param ... unused
  #' @return `TRUE` if `dbListTables()` returned one or more tables, otherwise `FALSE`
  #' @export
  setMethod(
    f = "dbExistsTable",
    signature = signature(conn = "AthenaConnection", name = "character"),
    definition = function(conn, name, schema, ...) {
      matching_tables <- dbListTables(conn, schema = schema, pattern = name)
      length(matching_tables) > 0
    }
  )
  #' AthenaJDBC
  #'
  #' Returns the column names of `schema.name` by selecting a single row from
  #' the table and reading the column names of the result.
  #'
  #' @param conn Athena connection
  #' @param name table name
  #' @param schema Athena schema name
  #' @param ... unused
  #' @return the column names of the query result
  #' @export
  setMethod(
    f = "dbListFields",
    signature = signature(conn = "AthenaConnection", name = "character"),
    definition = function(conn, name, schema, ...) {
      one_row <- dbGetQuery(
        conn,
        sprintf("SELECT * FROM %s.%s LIMIT 1", schema, name)
      )
      colnames(one_row)
    }
  )
  #' AthenaJDBC
  #'
  #' Reads the whole of `schema.name` with a `SELECT *` query.
  #'
  #' @param conn Athena connection
  #' @param name table name; interpolated into the query as-is (the same way
  #'        `dbListFields` does), so quote it yourself if it needs quoting
  #' @param schema Athena schema name
  #' @param ... unused
  #' @return the result of `dbGetQuery()` for the full-table query
  #' @export
  setMethod(
    "dbReadTable",
    signature(conn = "AthenaConnection", name = "character"),
    definition = function(conn, name, schema, ...) {
      # No LIMIT clause: reading a table must return every row. The table name
      # is used as an identifier, not wrapped as a string literal, because a
      # quoted string is not a valid relation name in the FROM clause.
      query <- sprintf("SELECT * FROM %s.%s", schema, name)
      dbGetQuery(conn, query)
    }
  )