Access and Query Amazon Athena via DBI/JDBC
Nevar pievienot vairāk kā 25 tēmas. Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.

224 rindas
5.4KB

  # Lookup table from log-level name to the integer code that dbConnect()
  # sends to the driver as its `LogLevel` property.
  .ll_trans <- c(
    OFF = 0L, FATAL = 1L, ERROR = 2L, WARNING = 3L,
    INFO = 4L, DEBUG = 5L, TRACE = 6L
  )
  #' Athena JDBC driver class
  #'
  #' S4 driver object returned by [Athena()]. It extends RJDBC's `JDBCDriver`
  #' so that DBI generics such as `dbConnect()` can dispatch Athena-specific
  #' methods.
  #'
  #' @slot identifier.quote Character used to quote identifiers.
  #' @slot jdrv rJava object reference (`jobjRef`) to the Java driver.
  #' @export
  setClass(
    "AthenaDriver",
    contains = "JDBCDriver",
    slots = c(identifier.quote = "character", jdrv = "jobjRef")
  )
  #' Create an Athena JDBC driver
  #'
  #' Loads the Simba Athena JDBC driver jar bundled with this package and
  #' wraps the resulting RJDBC driver as an `AthenaDriver`.
  #'
  #' @param identifier.quote Character used to quote identifiers; passed to
  #'   `RJDBC::JDBC()`. Defaults to a backtick.
  #' @return An `AthenaDriver` object, to be passed to `dbConnect()`.
  #' @export
  Athena <- function(identifier.quote = "`") {
    jar <- system.file(
      "java", "AthenaJDBC42_2.0.6.jar", package = "metis.lite"
    )
    # system.file() returns "" instead of erroring when the file is absent;
    # stop here with a clear message rather than handing JDBC() an empty
    # class path, where the driver class would presumably not be found.
    if (!nzchar(jar)) {
      stop(
        "Could not locate the bundled Athena JDBC driver jar ",
        "(AthenaJDBC42_2.0.6.jar) in package 'metis.lite'.",
        call. = FALSE
      )
    }
    drv <- JDBC(
      driverClass = "com.simba.athena.jdbc.Driver",
      classPath = jar,
      identifier.quote = identifier.quote
    )
    as(drv, "AthenaDriver")
  }
  #' Connect to Athena
  #'
  #' Opens a JDBC connection to Amazon Athena through the Simba Athena JDBC
  #' driver and returns it as an `AthenaConnection`.
  #'
  #' @section Driver Configuration Options:
  #'
  #' These (and other driver properties) can be supplied through `...`:
  #'
  #' - `BinaryColumnLength`: <int> The maximum data length for `BINARY` columns. Default `32767L`
  #' - `ComplexTypeColumnLength`: <int> The maximum data length for `ARRAY`, `MAP`, and `STRUCT` columns. Default `65535L`
  #' - `StringColumnLength`: <int> The maximum data length for `STRING` columns. Default `255L`
  #'
  #' @param drv An `AthenaDriver`, e.g. from [Athena()].
  #' @param provider Fully qualified Java class name of the AWS credentials
  #'   provider, passed as the `AwsCredentialsProviderClass` property. The
  #'   default is usually what you want.
  #' @param region AWS region the Athena tables are in; used to build the
  #'   endpoint host `athena.<region>.amazonaws.com`.
  #' @param s3_staging_dir A writable S3 location you have permissions for;
  #'   passed as `S3OutputLocation`. Defaults to the `AWS_S3_STAGING_DIR`
  #'   environment variable.
  #' @param schema_name Default schema (database); used in the connection URL
  #'   and passed as the `Schema` property. NOTE: the original author reported
  #'   that this setting did not take effect with the Simba driver, so you may
  #'   still need to schema-qualify table names.
  #' @param fetch_size Value stored (coerced to integer) in the returned
  #'   connection's `fetch_size` slot.
  #' @param max_error_retries,connection_timeout,socket_timeout Passed as the
  #'   `MaxErrorRetry`, `ConnectTimeout` and `SocketTimeout` properties. Only
  #'   change these if you know what you're doing.
  #' @param log_path,log_level The Athena JDBC driver can write fairly detailed
  #'   logs. `log_path` (passed as `LogPath`) should be a temporary directory or
  #'   another location log4j can use. `log_level` (passed as `LogLevel`) is
  #'   either a level name ("OFF", "FATAL", "ERROR", "WARNING", "INFO", "DEBUG",
  #'   "TRACE"; case-insensitive) or the corresponding integer 0-6. Any other
  #'   value raises an error.
  #' @param ... Additional driver properties passed on to the driver. See the
  #'   Driver Configuration Options section.
  #' @return An `AthenaConnection`.
  #' @references [Connect with JDBC](https://docs.aws.amazon.com/athena/latest/ug/connect-with-jdbc.html);
  #' [Simba Athena JDBC Driver with SQL Connector Installation and Configuration Guide](https://s3.amazonaws.com/athena-downloads/drivers/JDBC/SimbaAthenaJDBC_2.0.6/docs/Simba+Athena+JDBC+Driver+Install+and+Configuration+Guide.pdf)
  #' @export
  setMethod(
    "dbConnect",
    "AthenaDriver",
    def = function(
      drv,
      provider = "com.simba.athena.amazonaws.auth.DefaultAWSCredentialsProviderChain",
      region = "us-east-1",
      s3_staging_dir = Sys.getenv("AWS_S3_STAGING_DIR"),
      schema_name = "default",
      fetch_size = 1000L,
      max_error_retries = 10,
      connection_timeout = 10000,
      socket_timeout = 10000,
      log_path = "",
      log_level = 0,
      ...
    ) {
      conn_string <- sprintf(
        "jdbc:awsathena://athena.%s.amazonaws.com:443/%s", region, schema_name
      )

      # Normalise log_level to a code 0-6. Without this validation an unknown
      # name is sent to the driver as NA, and an out-of-range number is
      # looked up by position in .ll_trans (e.g. 7 silently becomes TRACE).
      if (length(log_level) != 1L || is.na(log_level)) {
        stop(
          "`log_level` must be a single level name or an integer 0-6.",
          call. = FALSE
        )
      }
      if (!(log_level %in% 0:6)) {
        level_code <- .ll_trans[toupper(as.character(log_level))]
        if (is.na(level_code)) {
          stop(
            "Unknown `log_level` '", log_level, "'; use one of ",
            paste(names(.ll_trans), collapse = ", "), " or an integer 0-6.",
            call. = FALSE
          )
        }
        log_level <- unname(level_code)
      }

      jc <- callNextMethod(
        drv,
        conn_string,
        S3OutputLocation = s3_staging_dir,
        Schema = schema_name,
        MaxErrorRetry = max_error_retries,
        ConnectTimeout = connection_timeout,
        SocketTimeout = socket_timeout,
        LogPath = log_path,
        LogLevel = log_level,
        AwsCredentialsProviderClass = provider,
        ...
      )
      jc <- as(jc, "AthenaConnection")
      jc@fetch_size <- as.integer(fetch_size)
      jc
    }
  )
  #' Athena connection class
  #'
  #' S4 connection object produced by `dbConnect()` on an `AthenaDriver`.
  #' It extends RJDBC's `JDBCConnection` and adds a `fetch_size` slot.
  #'
  #' @slot jc rJava object reference (`jobjRef`) to the Java connection.
  #' @slot identifier.quote Character used to quote identifiers.
  #' @slot fetch_size Integer fetch size recorded at connect time.
  #' @export
  setClass(
    "AthenaConnection",
    contains = "JDBCConnection",
    slots = c(
      jc = "jobjRef",
      identifier.quote = "character",
      fetch_size = "integer"
    )
  )
  95. # setClass("AthenaConnection", contains = "JDBCConnection")
  #' Athena result class
  #'
  #' S4 result object returned by `dbSendQuery()` on an `AthenaConnection`.
  #' It is a subclass of RJDBC's `JDBCResult` and adds no slots of its own.
  #'
  #' @export
  setClass("AthenaResult", representation("JDBCResult"))
  #' Send a query to Athena
  #'
  #' Runs `statement` through the inherited `JDBCConnection` method and
  #' returns the result re-classed as an `AthenaResult`.
  #'
  #' @param conn An `AthenaConnection`.
  #' @param statement SQL statement (a character string).
  #' @param ... Forwarded unchanged to the inherited method.
  #' @return An `AthenaResult`.
  #' @export
  setMethod(
    "dbSendQuery",
    signature(conn = "AthenaConnection", statement = "character"),
    definition = function(conn, statement, ...) {
      # Called with no arguments, callNextMethod() reuses this call's
      # arguments (including `...`).
      parent_result <- callNextMethod()
      as(parent_result, "AthenaResult")
    }
  )
  #' List tables in Athena
  #'
  #' Issues `SHOW TABLES [IN schema] ['pattern']` and returns the `tab_name`
  #' column of the result.
  #'
  #' @param conn An `AthenaConnection`.
  #' @param pattern Optional table-name pattern, appended as a quoted string.
  #'   When omitted, no pattern is sent.
  #' @param schema Athena schema (database) name. Optional; when omitted the
  #'   `IN` clause is dropped, presumably so that Athena lists tables in the
  #'   connection's current schema (TODO confirm).
  #' @param ... Unused.
  #' @return A character vector of table names.
  #' @export
  setMethod(
    "dbListTables",
    signature(conn = "AthenaConnection"),
    definition = function(conn, pattern = "*", schema, ...) {
      query <- "SHOW TABLES"
      # Standard DBI calls omit `schema`; build the IN clause only when it is
      # given instead of failing on the missing argument.
      if (!missing(schema)) {
        query <- paste(query, "IN", schema)
      }
      if (!missing(pattern)) {
        query <- paste(query, dbQuoteString(conn, pattern))
      }
      res <- dbGetQuery(conn, query)
      # `[[` avoids `$`'s partial name matching on data frames.
      res[["tab_name"]]
    }
  )
  #' Check whether an Athena table exists
  #'
  #' @param conn An `AthenaConnection`.
  #' @param name Table name; passed to `dbListTables()` as its `pattern`.
  #' @param schema Athena schema (database) name.
  #' @param ... Unused.
  #' @return `TRUE` if `dbListTables()` returns at least one table for
  #'   `name`, otherwise `FALSE`.
  #' @export
  setMethod(
    "dbExistsTable",
    signature(conn = "AthenaConnection", name = "character"),
    definition = function(conn, name, schema, ...) {
      # NOTE(review): `name` is sent as a SHOW TABLES pattern, so any pattern
      # metacharacters in it are presumably interpreted by Athena rather than
      # matched literally.
      matches <- dbListTables(conn, pattern = name, schema = schema)
      length(matches) != 0L
    }
  )
  #' List the columns of an Athena table
  #'
  #' Runs `SELECT * ... LIMIT 1` against the table and returns the column
  #' names of the result.
  #'
  #' @param conn An `AthenaConnection`.
  #' @param name Table name (inserted into the SQL unquoted).
  #' @param schema Athena schema (database) name. Optional; when omitted the
  #'   table name is used unqualified.
  #' @param ... Unused.
  #' @return A character vector of column names.
  #' @export
  setMethod(
    "dbListFields",
    signature(conn = "AthenaConnection", name = "character"),
    definition = function(conn, name, schema, ...) {
      # Standard DBI calls omit `schema`; fall back to an unqualified table
      # name instead of failing on the missing argument.
      table_ref <- if (missing(schema)) name else paste0(schema, ".", name)
      query <- sprintf("SELECT * FROM %s LIMIT 1", table_ref)
      colnames(dbGetQuery(conn, query))
    }
  )
  #' Read an entire Athena table
  #'
  #' Runs `SELECT * FROM [schema.]name` and returns every row.
  #'
  #' @param conn An `AthenaConnection`.
  #' @param name Table name (inserted into the SQL unquoted).
  #' @param schema Athena schema (database) name. Optional; when omitted the
  #'   table name is used unqualified.
  #' @param ... Unused.
  #' @return A data frame containing the table's rows.
  #' @export
  setMethod(
    "dbReadTable",
    signature(conn = "AthenaConnection", name = "character"),
    definition = function(conn, name, schema, ...) {
      # The table must be referenced as an identifier: dbQuoteString() would
      # turn it into a string literal ('name'), which is not valid after FROM.
      # No LIMIT either, because dbReadTable() reads the whole table.
      table_ref <- if (missing(schema)) name else paste0(schema, ".", name)
      dbGetQuery(conn, sprintf("SELECT * FROM %s", table_ref))
    }
  )