Access and Query Amazon Athena via DBI/JDBC
Nevar pievienot vairāk kā 25 tēmas Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.

129 rindas
3.3KB

  # Lookup table from JDBC type codes to the R coercion function applied to a
  # fetched column of that type. Keys are the integer `java.sql.Types` codes
  # rendered as character strings so they can be looked up with `[[`.
  # Binary, time, array and "other" values are kept as character.
  .jdbc_converters <- list(
    # bit / integer family
    "-7" = as.logical,            # BIT
    "-6" = as.integer,            # TINYINT
    "-5" = bit64::as.integer64,   # BIGINT
    # binary and long text
    "-4" = as.character,          # LONGVARBINARY
    "-3" = as.character,          # VARBINARY
    "-2" = as.character,          # BINARY
    "-1" = as.character,          # LONGVARCHAR
    "0" = as.character,           # NULL
    "1" = as.character,           # CHAR
    # numeric family
    "2" = as.double,              # NUMERIC
    "3" = as.double,              # DECIMAL
    "4" = as.integer,             # INTEGER
    "5" = as.integer,             # SMALLINT
    "6" = as.double,              # FLOAT
    "7" = as.double,              # REAL
    "8" = as.double,              # DOUBLE
    # text, boolean and temporal
    "12" = as.character,          # VARCHAR
    "16" = as_logical,            # BOOLEAN
    "91" = as_date,               # DATE
    "92" = as.character,          # TIME
    "93" = as_posixct,            # TIMESTAMP
    # structured / unknown
    "2003" = as.character,        # ARRAY
    "1111" = as.character         # OTHER
  )
  #' Retrieve driver metadata
  #'
  #' Returns a list with three elements: `name` (always `"AthenaJDBC"`),
  #' `driver_version` (as reported by `metis.jars::simba_driver_version()`)
  #' and `package_version` (the installed version of `metis.jars`).
  #'
  #' @param dbObj driver/connection
  #' @param ... unused
  #' @export
  #' @keywords internal
  setMethod(
    "dbGetInfo",
    "AthenaDriver",
    definition = function(dbObj, ...) {
      jdbc_version <- metis.jars::simba_driver_version()
      jars_pkg_version <- utils::packageVersion("metis.jars")

      list(
        name = "AthenaJDBC",
        driver_version = jdbc_version,
        package_version = jars_pkg_version
      )
    }
  )
  #' Retrieve connection metadata
  #'
  #' Returns a list with three elements: `name` (always `"AthenaJDBC"`),
  #' `driver_version` (the name of the first `.jar` file found in the `java`
  #' directory of the `metis.lite` package) and `package_version` (the
  #' installed version of `metis`).
  #'
  #' @param dbObj driver/connection
  #' @param ... unused
  #' @export
  #' @keywords internal
  setMethod(
    "dbGetInfo",
    "AthenaConnection",
    definition = function(dbObj, ...) {
      # NOTE(review): the AthenaDriver method reads its versions from
      # `metis.jars`; this one looks in `metis.lite` / `metis` -- confirm these
      # package names are still the intended ones.
      jar_dir <- system.file("java", package="metis.lite")
      jar_files <- list.files(jar_dir, "jar$")
      metis_version <- utils::packageVersion("metis")

      list(
        name = "AthenaJDBC",
        driver_version = jar_files[1],
        package_version = metis_version
      )
    }
  )
  #' Fetch records from a previously executed query
  #'
  #' Fetch the next `n` elements (rows) from the result set and return them
  #' as a data.frame. The rows are retrieved by the inherited `fetch` method;
  #' afterwards every column is coerced with the converter registered in
  #' `.jdbc_converters` for its JDBC type code. Columns whose type code has no
  #' registered converter are coerced with `as.character()`.
  #'
  #' @param res An object inheriting from [DBIResult-class], created by
  #' [dbSendQuery()].
  #' @param n maximum number of records to retrieve per fetch. Use `n = -1`
  #' or `n = Inf`
  #' to retrieve all pending records. Some implementations may recognize other
  #' special values.
  #' @param block block size, passed on to the inherited `fetch` method
  #' @param ... Other arguments passed on to methods.
  #' @return The data.frame returned by the inherited method, with each column
  #' converted according to its JDBC type.
  #' @export
  setMethod(
    "fetch",
    signature(res = "AthenaResult", n = "numeric"),
    definition = function(res, n, block = 1000L, ...) {
      n_cols <- .jcall(res@md, "I", "getColumnCount")

      # Preallocate one slot per column so labels and converters always stay
      # the same length, whatever the column types turn out to be.
      col_labels <- character(n_cols)
      converters <- vector("list", n_cols)

      # seq_len() yields an empty sequence for a zero-column result, whereas
      # 1:0 would try to read metadata for columns 1 and 0.
      for (i in seq_len(n_cols)) {
        type_code <- as.character(.jcall(res@md, "I", "getColumnType", i))

        # Resolve the catch-all here: storing NULL into a list slot would drop
        # the slot and leave fewer converters than column labels.
        converter <- .jdbc_converters[[type_code]]
        if (is.null(converter)) {
          converter <- as.character
        }

        converters[[i]] <- converter
        col_labels[i] <- .jcall(res@md, "S", "getColumnLabel", i)
      }
      names(converters) <- col_labels

      out <- callNextMethod(res = res, n = n, block = block, ...)

      # Columns are matched by label, so the converter found for a label is
      # applied to the column of the same name in the fetched data.
      for (nm in names(converters)) {
        out[[nm]] <- converters[[nm]](out[[nm]])
      }

      out
    }
  )
  #' Send a query to Athena and retrieve the complete result
  #'
  #' Submits `statement` with `dbSendQuery()`, fetches every pending row using
  #' the connection's `fetch_size` as the block size, and closes the underlying
  #' statement when the function exits (whether or not the fetch succeeded).
  #' The result is returned with the classes `tbl_df`, `tbl` and `data.frame`.
  #'
  #' @param conn Athena connection
  #' @param statement SQL statement
  #' @param ... passed on to `dbSendQuery()`
  #' @importFrom rJava .jcall
  #' @export
  setMethod(
    "dbGetQuery",
    signature(conn = "AthenaConnection", statement = "character"),
    definition = function(conn, statement, ...) {
      query_result <- dbSendQuery(conn, statement, ...)
      # Release the JDBC statement on the way out, even if fetching fails.
      on.exit(.jcall(query_result@stat, "V", "close"), add = TRUE)

      # n = -1 asks for all pending rows.
      rows <- fetch(query_result, n = -1, block = conn@fetch_size)
      class(rows) <- c("tbl_df", "tbl", "data.frame")
      rows
    }
  )