Access and Query Amazon Athena via DBI/JDBC
Você não pode selecionar mais de 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.

246 linhas
6.3KB

  # Lookup table that translates log-level names into the integer codes 0-6.
  .ll_trans <- c(
    OFF = 0L,
    FATAL = 1L,
    ERROR = 2L,
    WARNING = 3L,
    INFO = 4L,
    DEBUG = 5L,
    TRACE = 6L
  )
  #' AthenaJDBC
  #'
  #' S4 driver class for Athena. It extends `JDBCDriver` and declares the
  #' `identifier.quote` (character) and `jdrv` (jobjRef) slots.
  #'
  #' @export
  setClass(
    Class = "AthenaDriver",
    contains = "JDBCDriver",
    slots = c(
      identifier.quote = "character",
      jdrv = "jobjRef"
    )
  )
  #' AthenaJDBC
  #'
  #' Build a JDBC driver for the Simba Athena driver class, using the jar path
  #' reported by `metis.jars::metis_jar_path()`, and return it as an
  #' `AthenaDriver`.
  #'
  #' @param identifier.quote how to quote identifiers
  #' @return an `AthenaDriver` object
  #' @export
  Athena <- function(identifier.quote = '`') {
    jdbc_driver <- JDBC(
      driverClass = "com.simba.athena.jdbc.Driver",
      metis.jars::metis_jar_path(),
      identifier.quote = identifier.quote
    )
    as(jdbc_driver, "AthenaDriver")
  }
  #' AthenaJDBC
  #'
  #' Connect to Athena
  #'
  #' Mandatory JDBC connection parameters are also named function
  #' parameters. You can use `...` to supply additional/optional
  #' parameters.
  #'
  #' @section Highlighted Extra Driver Configuration Options:
  #'
  #' These are taken from the second item in References. See that resource
  #' for more information.
  #'
  #' - `BinaryColumnLength`: <int> The maximum data length for `BINARY` columns. Default `32767L`
  #' - `ComplexTypeColumnLength`: <int> The maximum data length for `ARRAY`, `MAP`, and `STRUCT` columns. Default `65535L`
  #' - `StringColumnLength`: <int> The maximum data length for `STRING` columns. Default `255L`
  #'
  #' @param drv driver
  #' @param Schema The name of the database schema to use when a schema is not explicitly
  #'        specified in a query. You can still issue queries on other schemas by explicitly
  #'        specifying the schema in the query.
  #' @param AwsRegion AWS region the Athena tables are in
  #' @param AwsCredentialsProviderClass JDBC auth provider; You can add a
  #'        length-1 character vector named parameter `AwsCredentialsProviderArguments`
  #'        to the `dbConnect()` call to use alternate auth providers. Use a
  #'        comma-separated list of String arguments.
  #' @param S3OutputLocation A write-able bucket on S3 that you have permissions for.
  #'        Defaults to the `AWS_S3_STAGING_DIR` environment variable.
  #' @param MaxErrorRetry,ConnectTimeout,SocketTimeout
  #'        technical connection info that you should only muck with if you know what you're doing.
  #' @param LogLevel,LogPath The Athena JDBC driver can provide a decent bit
  #'        of data in logs. Set `LogPath` to a temporary directory or something `log4j` can use.
  #'        For `LogLevel` use the names ("`OFF`", "`FATAL`", "`ERROR`", "`WARNING`", "`INFO`",
  #'        "`DEBUG`", "`TRACE`"; matched case-insensitively) or their corresponding integer
  #'        values 0-6. "`WARN`" is accepted as an alias for "`WARNING`" and "`ALL`" as an
  #'        alias for "`TRACE`". Any other value is an error.
  #' @param fetch_size Athena results fetch size
  #' @param ... passed on to the driver. See Details.
  #' @return an `AthenaConnection` object
  #' @references [Connect with JDBC](https://docs.aws.amazon.com/athena/latest/ug/connect-with-jdbc.html);
  #'             [Simba Athena JDBC Driver with SQL Connector Installation and Configuration Guide](https://s3.amazonaws.com/athena-downloads/drivers/JDBC/SimbaAthenaJDBC_2.0.6/docs/Simba+Athena+JDBC+Driver+Install+and+Configuration+Guide.pdf)
  #' @export
  setMethod(
    "dbConnect",
    "AthenaDriver",
    def = function(
      drv,
      Schema = "default",
      AwsRegion = "us-east-1",
      AwsCredentialsProviderClass = paste0(c(
        "com", "simba", "athena", "amazonaws","auth",
        "DefaultAWSCredentialsProviderChain"
      ), collapse = "."),
      S3OutputLocation = Sys.getenv("AWS_S3_STAGING_DIR", unset = ""),
      MaxErrorRetry = 10,
      ConnectTimeout = 10000,
      SocketTimeout = 10000,
      LogPath = "",
      LogLevel = 0,
      fetch_size = 1000L,
      ...) {

      conn_string <- sprintf(
        'jdbc:awsathena://athena.%s.amazonaws.com:443/%s', AwsRegion, Schema
      )

      if (length(LogLevel) != 1L) {
        stop("`LogLevel` must be a single value.", call. = FALSE)
      }

      # Values already in 0-6 are passed through untouched; anything else is
      # treated as a level name and translated via the `.ll_trans` lookup table.
      if (!(LogLevel %in% 0:6)) {
        level_name <- toupper(as.character(LogLevel))

        # Names that the documentation has historically advertised but that are
        # not keys of `.ll_trans`.
        level_aliases <- c(WARN = "WARNING", ALL = "TRACE")
        if (level_name %in% names(level_aliases)) {
          level_name <- level_aliases[[level_name]]
        }

        # An unknown name used to become NA and was handed to the driver as-is;
        # fail loudly instead.
        if (!(level_name %in% names(.ll_trans))) {
          stop(
            "`LogLevel` must be an integer in 0-6 or one of: ",
            paste(names(.ll_trans), collapse = ", "),
            call. = FALSE
          )
        }

        LogLevel <- .ll_trans[[level_name]]
      }

      jc <- callNextMethod(
        drv,
        conn_string,
        S3OutputLocation = S3OutputLocation,
        Schema = Schema,
        AwsRegion = AwsRegion,
        MaxErrorRetry = MaxErrorRetry,
        ConnectTimeout = ConnectTimeout,
        SocketTimeout = SocketTimeout,
        LogPath = LogPath,
        LogLevel = LogLevel,
        AwsCredentialsProviderClass = AwsCredentialsProviderClass,
        ...
      )

      # Re-class the parent method's connection and record the fetch size on it.
      jc <- as(jc, "AthenaConnection")
      jc@fetch_size <- as.integer(fetch_size)

      jc
    }
  )
  #' AthenaJDBC
  #'
  #' S4 connection class for Athena. It extends `JDBCConnection`.
  #'
  #' @slot jc job ref
  #' @slot identifier.quote how to quote identifiers
  #' @slot fetch_size Athena results fetch size
  #' @export
  setClass(
    Class = "AthenaConnection",
    contains = "JDBCConnection",
    slots = c(
      jc = "jobjRef",
      identifier.quote = "character",
      fetch_size = "integer"
    )
  )
  117. # setClass("AthenaConnection", contains = "JDBCConnection")
  #' AthenaJDBC
  #'
  #' S4 result class for Athena queries. It extends `JDBCResult` and adds no
  #' slots of its own.
  #'
  #' @export
  setClass(
    Class = "AthenaResult",
    contains = "JDBCResult"
  )
  #' AthenaJDBC
  #'
  #' Delegate to the next `dbSendQuery` method and re-class its result as an
  #' `AthenaResult`.
  #'
  #' @param conn Athena connection
  #' @param statement SQL statement
  #' @param ... unused
  #' @return an `AthenaResult` object
  #' @export
  setMethod(
    "dbSendQuery",
    signature(conn = "AthenaConnection", statement = "character"),
    definition = function(conn, statement, ...) {
      jdbc_result <- callNextMethod()
      as(jdbc_result, "AthenaResult")
    }
  )
  #' AthenaJDBC
  #'
  #' List tables by issuing a `SHOW TABLES` statement and returning the
  #' `tab_name` column of the result.
  #'
  #' @param conn Athena connection
  #' @param pattern table name pattern; only added to the statement when
  #'        supplied explicitly
  #' @param schema Athena schema name. Optional: when omitted, no `IN` clause is
  #'        added to the statement.
  #' @param ... unused
  #' @return the `tab_name` column of the query result
  #' @export
  setMethod(
    "dbListTables",
    signature(conn = "AthenaConnection"),
    definition = function(conn, pattern='*', schema, ...) {

      # `schema` has no default; previously omitting it failed with a
      # missing-argument error, which broke the plain `dbListTables(conn)` call.
      if (missing(schema)) {
        sql <- "SHOW TABLES"
      } else {
        sql <- sprintf("SHOW TABLES IN %s", schema)
      }

      # The pattern is sent as a quoted string literal after the optional
      # IN clause.
      if (!missing(pattern)) {
        sql <- paste(sql, dbQuoteString(conn, pattern))
      }

      tables <- dbGetQuery(conn, sql)

      # `[[` avoids the partial name matching that `$` performs on data frames.
      tables[["tab_name"]]
    }
  )
  #' AthenaJDBC
  #'
  #' Report whether listing the tables of `schema` with `name` as the pattern
  #' returns at least one entry.
  #'
  #' @param conn Athena connection
  #' @param name table name
  #' @param schema Athena schema name
  #' @param ... unused
  #' @return `TRUE` if at least one table is listed, otherwise `FALSE`
  #' @export
  setMethod(
    "dbExistsTable",
    signature(conn = "AthenaConnection", name = "character"),
    definition = function(conn, name, schema, ...) {
      matching_tables <- dbListTables(conn, schema=schema, pattern=name)
      length(matching_tables) > 0
    }
  )
  #' AthenaJDBC
  #'
  #' Fetch a single row from `schema.name` and return the column names of the
  #' result.
  #'
  #' @param conn Athena connection
  #' @param name table name
  #' @param schema Athena schema name
  #' @param ... unused
  #' @return character vector of column names
  #' @export
  setMethod(
    "dbListFields",
    signature(conn = "AthenaConnection", name = "character"),
    definition = function(conn, name, schema, ...) {
      one_row <- dbGetQuery(
        conn,
        sprintf("SELECT * FROM %s.%s LIMIT 1", schema, name)
      )
      colnames(one_row)
    }
  )
  #' AthenaJDBC
  #'
  #' Read every row of `schema.name` with a `SELECT *` query.
  #'
  #' @param conn Athena connection
  #' @param name table name
  #' @param schema Athena schema name
  #' @param ... unused
  #' @return the result of `dbGetQuery()` for the whole table
  #' @export
  setMethod(
    "dbReadTable",
    signature(conn = "AthenaConnection", name = "character"),
    definition = function(conn, name, schema, ...) {
      # Two fixes relative to the previous version:
      #  * the table name was passed through dbQuoteString(), which turned it
      #    into a string literal (schema.'name') rather than a table reference;
      #    it is now interpolated the same way dbListFields() does;
      #  * a stray "LIMIT 1" meant only a single row was ever returned.
      query <- sprintf("SELECT * FROM %s.%s", schema, name)
      dbGetQuery(conn, query)
    }
  )