Amazon Athena JDBC Driver Wrapper Supporting the 'metis' Package
Ви не можете вибрати більше 25 тем Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.

92 рядки
2.1KB

  1. #' AthenaJDBC
  2. #'
  3. #' @export
  4. setClass("AthenaDriver", representation("JDBCDriver", identifier.quote="character", jdrv="jobjRef"))
  5. #' AthenaJDBC
  6. #'
  7. #' @export
  8. Athena <- function(identifier.quote='`') {
  9. drv <- JDBC(driverClass="com.amazonaws.athena.jdbc.AthenaDriver",
  10. system.file("AthenaJDBC41-1.0.1.jar", package="metis"),
  11. identifier.quote="'")
  12. return(as(drv, "AthenaDriver"))
  13. }
  14. #' AthenaJDBC
  15. #'
  16. #' @export
  17. setMethod(
  18. "dbConnect",
  19. "AthenaDriver",
  20. def = function(drv,
  21. provider = "com.amazonaws.athena.jdbc.shaded.com.amazonaws.auth.EnvironmentVariableCredentialsProvider",
  22. conn_string = 'jdbc:awsathena://athena.us-east-1.amazonaws.com:443/',
  23. schema_name, ...) {
  24. if (!is.null(provider)) {
  25. jc <- callNextMethod(drv, conn_string,
  26. s3_staging_dir=Sys.getenv("AWS_S3_STAGING_DIR"),
  27. schema_name=schema_name,
  28. aws_credentials_provider_class=provider, ...)
  29. } else {
  30. jc <- callNextMethod(drv,
  31. 'jdbc:awsathena://athena.us-east-1.amazonaws.com:443/',
  32. s3_staging_dir=Sys.getenv("AWS_S3_STAGING_DIR"),
  33. schema_name=schema_name,
  34. user = Sys.getenv("AWS_ACCESS_KEY_ID"),
  35. password = Sys.getenv("AWS_SECRET_ACCESS_KEY"))
  36. }
  37. return(as(jc, "AthenaConnection"))
  38. }
  39. )
  40. #' AthenaJDBC
  41. #'
  42. #' @export
  43. setClass("AthenaConnection", contains = "JDBCConnection")
  44. #' AthenaJDBC
  45. #'
  46. #' @export
  47. setClass("AthenaResult", contains = "JDBCResult")
  48. #' AthenaJDBC
  49. #'
  50. #' @export
  51. setMethod(
  52. "dbSendQuery",
  53. "AthenaDriver",
  54. def = function(conn, statement, ...) {
  55. return(as(callNextMethod(), "AthenaResult"))
  56. }
  57. )
  58. #' AthenaJDBC
  59. #'
  60. #' @export
  61. setMethod(
  62. "dbGetQuery",
  63. signature(conn="AthenaConnection", statement="character"),
  64. def = function(conn, statement, ...) {
  65. r <- dbSendQuery(conn, statement, ...)
  66. on.exit(.jcall(r@stat, "V", "close"))
  67. dplyr::tbl_df(fetch(r, -1, block=256))
  68. }
  69. )