Access and Query Amazon Athena via DBI/JDBC
選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。

jdbc.r 4.6KB


  # Lookup table translating log-level names into the integer codes 0-6.
  # `dbConnect()` indexes into it when `log_level` is given by name.
  .ll_trans <- c(
    OFF = 0L,
    FATAL = 1L,
    ERROR = 2L,
    WARNING = 3L,
    INFO = 4L,
    DEBUG = 5L,
    TRACE = 6L
  )
  #' Athena JDBC driver class
  #'
  #' S4 class that extends `JDBCDriver`. It declares an `identifier.quote`
  #' slot of type character and a `jdrv` slot of type `jobjRef`.
  #'
  #' @export
  setClass(
    Class = "AthenaDriver",
    representation = representation(
      "JDBCDriver",
      identifier.quote = "character",
      jdrv = "jobjRef"
    )
  )
  #' Create an Athena JDBC driver object
  #'
  #' Calls `JDBC()` with the `com.simba.athena.jdbc.Driver` driver class and the
  #' `AthenaJDBC42_2.0.6.jar` file looked up under the `java` directory of the
  #' `metis.lite` package, then coerces the result to an `AthenaDriver`.
  #'
  #' @param identifier.quote identifier quote character handed on to `JDBC()`
  #' @return an `AthenaDriver` object
  #' @export
  Athena <- function(identifier.quote = '`') {

    jar_path <- system.file(
      "java", "AthenaJDBC42_2.0.6.jar", package = "metis.lite"
    )

    jdbc_drv <- JDBC(
      driverClass = "com.simba.athena.jdbc.Driver",
      jar_path,
      identifier.quote = identifier.quote
    )

    as(jdbc_drv, "AthenaDriver")

  }
  #' Connect to Amazon Athena
  #'
  #' Builds the `jdbc:awsathena://` connection URL from `region` and
  #' `schema_name`, forwards the remaining settings to the inherited
  #' `dbConnect()` method as driver properties and returns the result as an
  #' `AthenaConnection`.
  #'
  #' @param drv an `AthenaDriver` object
  #' @param provider JDBC auth provider (ideally leave default); sent as the
  #'        `AwsCredentialsProviderClass` property
  #' @param region AWS region the Athena tables are in
  #' @param s3_staging_dir A write-able bucket on S3 that you have permissions
  #'        for; sent as the `S3OutputLocation` property. Defaults to the
  #'        `AWS_S3_STAGING_DIR` environment variable.
  #' @param schema_name schema name; used in the connection URL and also sent
  #'        as the `Schema` property
  #' @param max_error_retries,connection_timeout,socket_timeout
  #'        technical connection info that you should only muck with if you know
  #'        what you're doing.
  #' @param log_path,log_level Sent to the driver as the `LogPath` and `LogLevel`
  #'        properties. For `log_level` use one of the names
  #'        ("OFF", "FATAL", "ERROR", "WARNING", "INFO", "DEBUG", "TRACE"; case is
  #'        ignored) or their corresponding integer values 0-6. For compatibility
  #'        with earlier documentation "WARN" is accepted as "WARNING" and "ALL"
  #'        as "TRACE" (the highest level in the lookup table).
  #' @param ... passed on to the inherited `dbConnect()` method
  #' @return an `AthenaConnection` object
  #' @references <https://docs.aws.amazon.com/athena/latest/ug/connect-with-jdbc.html>
  #' @export
  setMethod(
    "dbConnect",
    "AthenaDriver",
    def = function(
      drv,
      provider = "com.simba.athena.amazonaws.auth.DefaultAWSCredentialsProviderChain",
      region = "us-east-1",
      s3_staging_dir = Sys.getenv("AWS_S3_STAGING_DIR"),
      schema_name = "default",
      max_error_retries = 10,
      connection_timeout = 10000,
      socket_timeout = 10000,
      log_path = "",
      log_level = 0,
      ...) {

      conn_string <- sprintf(
        'jdbc:awsathena://athena.%s.amazonaws.com:443/%s', region, schema_name
      )

      if (length(log_level) != 1L || is.na(log_level)) {
        stop(
          "`log_level` must be a single level name or an integer from 0 to 6.",
          call. = FALSE
        )
      }

      # Values already in 0-6 are passed through untouched; anything else is
      # treated as a level name and translated via the `.ll_trans` table.
      if (!(log_level %in% 0:6)) {

        level_name <- toupper(as.character(log_level))

        # Aliases that were documented but are absent from `.ll_trans`.
        level_name <- switch(
          level_name,
          WARN = "WARNING",
          ALL = "TRACE",
          level_name
        )

        # An unknown name used to index to NA silently; fail loudly instead.
        if (!(level_name %in% names(.ll_trans))) {
          stop(
            sprintf(
              "Unknown `log_level` '%s'; use an integer from 0 to 6 or one of: %s",
              as.character(log_level),
              paste(names(.ll_trans), collapse = ", ")
            ),
            call. = FALSE
          )
        }

        # `[[` yields a plain unnamed scalar for the driver property.
        log_level <- .ll_trans[[level_name]]

      }

      jc <- callNextMethod(
        drv,
        conn_string,
        S3OutputLocation = s3_staging_dir,
        Schema = schema_name,
        MaxErrorRetry = max_error_retries,
        ConnectTimeout = connection_timeout,
        SocketTimeout = socket_timeout,
        LogPath = log_path,
        LogLevel = log_level,
        AwsCredentialsProviderClass = provider,
        ...
      )

      as(jc, "AthenaConnection")

    }
  )
  #' Athena JDBC connection class
  #'
  #' S4 class extending `JDBCConnection`; the `dbConnect()` method for
  #' `AthenaDriver` coerces its result to this class.
  #'
  #' @export
  setClass(
    Class = "AthenaConnection",
    contains = "JDBCConnection"
  )
  #' Athena JDBC result class
  #'
  #' S4 class extending `JDBCResult`; the `dbSendQuery()` method for
  #' `AthenaConnection` coerces its result to this class.
  #'
  #' @export
  setClass(
    Class = "AthenaResult",
    contains = "JDBCResult"
  )
  #' Send a query to Athena
  #'
  #' Delegates to the inherited `dbSendQuery()` method and coerces what it
  #' returns to an `AthenaResult`.
  #'
  #' @param conn Athena connection
  #' @param statement SQL statement
  #' @param ... unused
  #' @return an `AthenaResult` object
  #' @export
  setMethod(
    "dbSendQuery",
    signature(conn = "AthenaConnection", statement = "character"),
    definition = function(conn, statement, ...) {
      inherited_res <- callNextMethod()
      as(inherited_res, "AthenaResult")
    }
  )
  #' List tables in an Athena schema
  #'
  #' Runs `SHOW TABLES IN <schema>` and returns the `tab_name` column of the
  #' result. The quoted `pattern` is appended to the statement only when the
  #' caller supplies `pattern` explicitly.
  #'
  #' @param conn Athena connection
  #' @param pattern table name pattern
  #' @param schema Athena schema name
  #' @param ... unused
  #' @return the `tab_name` column of the query result
  #' @export
  setMethod(
    "dbListTables",
    signature(conn = "AthenaConnection"),
    definition = function(conn, pattern = '*', schema, ...) {

      # Decide on the statement first so there is a single query call below.
      show_stmt <- if (missing(pattern)) {
        sprintf("SHOW TABLES IN %s", schema)
      } else {
        sprintf("SHOW TABLES IN %s %s", schema, dbQuoteString(conn, pattern))
      }

      tbls <- dbGetQuery(conn, show_stmt)

      tbls$tab_name

    }
  )
  #' Check whether a table exists in an Athena schema
  #'
  #' Calls `dbListTables()` with `name` as the pattern and reports whether
  #' anything came back.
  #'
  #' @param conn Athena connection
  #' @param name table name
  #' @param schema Athena schema name
  #' @param ... unused
  #' @return `TRUE` if at least one table was listed, otherwise `FALSE`
  #' @export
  setMethod(
    "dbExistsTable",
    signature(conn = "AthenaConnection", name = "character"),
    definition = function(conn, name, schema, ...) {
      found <- dbListTables(conn, pattern = name, schema = schema)
      length(found) > 0
    }
  )
  #' List the fields of an Athena table
  #'
  #' Selects with `LIMIT 1` from `schema.name` and returns the column names of
  #' the resulting data.
  #'
  #' @param conn Athena connection
  #' @param name table name
  #' @param schema Athena schema name
  #' @param ... unused
  #' @return the column names of the query result
  #' @export
  setMethod(
    "dbListFields",
    signature(conn = "AthenaConnection", name = "character"),
    definition = function(conn, name, schema, ...) {
      # A one-row sample is enough since only the column names are used.
      sample_row <- dbGetQuery(
        conn,
        sprintf("SELECT * FROM %s.%s LIMIT 1", schema, name)
      )
      colnames(sample_row)
    }
  )
  #' Read an Athena table
  #'
  #' Runs `SELECT * FROM <schema>.<name>` and returns the result of
  #' `dbGetQuery()`.
  #'
  #' @param conn Athena connection
  #' @param name table name
  #' @param schema Athena schema name
  #' @param ... unused
  #' @return the result of `dbGetQuery()` for the whole table
  #' @export
  setMethod(
    "dbReadTable",
    signature(conn = "AthenaConnection", name = "character"),
    definition = function(conn, name, schema, ...) {
      # The table name is interpolated the same way `dbListFields()` does it:
      # wrapping it with dbQuoteString() produced a string literal
      # (schema.'name'), which is not a valid table reference. The former
      # `LIMIT 1` is gone as well, since reading a table means returning all
      # of its rows rather than just the first one.
      query <- sprintf("SELECT * FROM %s.%s", schema, name)
      dbGetQuery(conn, query)
    }
  )