Browse Source

cloudyr, more docs, cleanup

master
boB Rudis 3 years ago
parent
commit
b249779c43
No known key found for this signature in database GPG Key ID: 2A514A4997464560
11 changed files with 228 additions and 104 deletions
  1. +1
    -1
      DESCRIPTION
  2. +4
    -1
      NAMESPACE
  3. +3
    -1
      NEWS.md
  4. +45
    -20
      R/jdbc.r
  5. +17
    -1
      R/metis-package.R
  6. +49
    -36
      R/metis.r
  7. +18
    -15
      README.Rmd
  8. +39
    -13
      README.md
  9. +35
    -15
      man/athena_connect.Rd
  10. +5
    -1
      man/dbConnect-AthenaDriver-method.Rd
  11. +12
    -0
      man/use_credentials.Rd

+ 1
- 1
DESCRIPTION View File

@@ -18,5 +18,5 @@ Depends:
Imports:
DBI,
dplyr,
ini
aws.signature
RoxygenNote: 6.0.1

+ 4
- 1
NAMESPACE View File

@@ -2,6 +2,8 @@

export(Athena)
export(athena_connect)
export(read_credentials)
export(use_credentials)
exportClasses(AthenaConnection)
exportClasses(AthenaDriver)
exportClasses(AthenaResult)
@@ -11,4 +13,5 @@ exportMethods(dbSendQuery)
import(DBI)
import(RJDBC)
import(dplyr)
import(ini)
importFrom(aws.signature,read_credentials)
importFrom(aws.signature,use_credentials)

+ 3
- 1
NEWS.md View File

@@ -1,2 +1,4 @@
0.1.0
* Initial release
=========

- Using the `cloudyr` `aws.signature` package vs DIY

+ 45
- 20
R/jdbc.r View File

@@ -24,26 +24,51 @@ setMethod(
def = function(drv,
provider = "com.amazonaws.athena.jdbc.shaded.com.amazonaws.auth.EnvironmentVariableCredentialsProvider",
region = "us-east-1",
schema_name = "default", ...) {

conn_string = sprintf('jdbc:awsathena://athena.%s.amazonaws.com:443/', region)

if (!is.null(provider)) {

jc <- callNextMethod(drv, conn_string,
s3_staging_dir=Sys.getenv("AWS_S3_STAGING_DIR"),
schema_name=schema_name,
aws_credentials_provider_class=provider, ...)

} else {

jc <- callNextMethod(drv, conn_string,
s3_staging_dir=Sys.getenv("AWS_S3_STAGING_DIR"),
schema_name=schema_name,
user = Sys.getenv("AWS_ACCESS_KEY_ID"),
password = Sys.getenv("AWS_SECRET_ACCESS_KEY"))

}
s3_staging_dir = Sys.getenv("AWS_S3_STAGING_DIR"),
schema_name = "default",
max_error_retries = 10,
connection_timeout = 10000,
socket_timeout = 10000,
retry_base_delay = 100,
retry_max_backoff_time = 1000,
log_path,
log_level,
...) {

conn_string = sprintf('jdbc:awsathena://athena.%s.amazonaws.com:443/%s', region, schema_name)

# if (!is.null(provider)) {

jc <- callNextMethod(drv, conn_string,
s3_staging_dir = s3_staging_dir,
schema_name = schema_name,
max_error_retries = max_error_retries,
connection_timeout = connection_timeout,
socket_timeout = socket_timeout,
retry_base_delay = retry_base_delay,
retry_max_backoff_time = retry_max_backoff_time,
log_path = log_path,
log_level = log_level,
aws_credentials_provider_class = provider,
...)

# } else {
#
# jc <- callNextMethod(drv, conn_string,
# schema_name = schema_name,
# max_error_retries = max_error_retries,
# connection_timeout = connection_timeout,
# socket_timeout = socket_timeout,
# retry_base_delay = retry_base_delay,
# retry_max_backoff_time = retry_max_backoff_time,
# s3_staging_dir = s3_staging_dir,
# schema_name = schema_name,
# log_path = log_path,
# log_level = log_level,
# user = Sys.getenv("AWS_ACCESS_KEY_ID"),
# password = Sys.getenv("AWS_SECRET_ACCESS_KEY"))
#
# }

return(as(jc, "AthenaConnection"))



+ 17
- 1
R/metis-package.R View File

@@ -8,5 +8,21 @@
#' @import RJDBC
#' @import DBI
#' @import dplyr
#' @import ini
#' @importFrom aws.signature use_credentials read_credentials
NULL


#' Use Credentials from .aws/credentials File
#'
#' @md
#' @references [aws.signature::use_credentials()] / [aws.signature::read_credentials()]
#' @name use_credentials
#' @rdname use_credentials
#' @inheritParams aws.signature::use_credentials
#' @export
NULL

#' @name read_credentials
#' @rdname use_credentials
#' @export
NULL

+ 49
- 36
R/metis.r View File

@@ -2,50 +2,63 @@
#'
#' Handles the up-front JDBC config
#'
#' For all connection types it is expected that you have the following environment variables
#' defined (a good place is `~/.Renviron`):
#' @md
#' @param default_schema default schema (you'll still need to fully qualify non-default schema table names)
#' @param region AWS region (Ref: <http://docs.aws.amazon.com/general/latest/gr/rande.html#athena>)
#' @param s3_staging_dir the Amazon S3 location to which your query output is written. The JDBC driver then asks Athena to read the results and provide rows of data back to the user.
#' @param max_error_retries the maximum number of retries that the JDBC client attempts to make a request to Athena.
#' @param connection_timeout the maximum amount of time, in milliseconds, to make a successful connection to Athena before an attempt is terminated.
#' @param socket_timeout the maximum amount of time, in milliseconds, to wait for a socket in order to send data to Athena.
#' @param retry_base_delay minimum delay amount, in milliseconds, between retrying attempts to connect Athena.
#' @param retry_max_backoff_time maximum delay amount, in milliseconds, between retrying attempts to connect Athena.
#' @param log_path local path of the Athena JDBC driver logs. If no log path is provided, then no log files are created.
#' @param log_level log level of the Athena JDBC driver logs.
#' @export
#' @examples \dontrun{
#' use_credentials("personal")
#'
#' - `AWS_S3_STAGING_DIR`: the name of the S3 bucket where Athena can write stuff
#' - `AWS_PROFILE`: the AWS profile ID in `~/.aws/credentials` (defaults to `default` if not present)
#' ath <- athena_connect(default_schema = "sampledb",
#' s3_staging_dir = "s3://accessible-bucket",
#' log_path = "/tmp/athena.log",
#' log_level = "DEBUG")
#'
#' For `simple` == `FALSE` the expectation is that you're working with a managed
#' `~/.aws/credentials` file.
#' dbListTables(ath)
#'
#' There's a high likelihood of params changing in the near term as I work this out, but I'm
#' not very keen on parameter-izing things like id/secret.
#' dbGetQuery(ath, "SELECT * FROM sampledb.elb_logs LIMIT 1")
#'
#' @md
#' @param default_schema def schema
#' @param region AWS region (Ref: <http://docs.aws.amazon.com/general/latest/gr/rande.html#athena>)
#' @param simple pickup id/secret only or use temp token? (this will become more robust)
#' @export
athena_connect <- function(default_schema = "default", region = "us-east-1", simple=FALSE) {
#' }
athena_connect <- function(default_schema = "default",
region = c("us-east-1", "us-east-2", "us-west-2"),
s3_staging_dir = Sys.getenv("AWS_S3_STAGING_DIR"),
max_error_retries = 10,
connection_timeout = 10000,
socket_timeout = 10000,
retry_base_delay = 100,
retry_max_backoff_time = 1000,
log_path = "",
log_level = c("INFO", "DEBUG", "WARN", "ERROR", "ALL", "OFF", "FATAL", "TRACE")) {

athena_jdbc <- Athena()

aws_config <- ini::read.ini(path.expand("~/.aws/credentials"))
aws_profile <- aws_config[Sys.getenv("AWS_PROFILE", "default")][[1]]

Sys.unsetenv("AWS_ACCESS_KEY_ID")
Sys.unsetenv("AWS_SECRET_ACCESS_KEY")

Sys.setenv(AWS_ACCESS_KEY_ID = aws_profile$aws_access_key_id)
Sys.setenv(AWS_SECRET_ACCESS_KEY = aws_profile$aws_secret_access_key)

con <- NULL

if (!simple) {

Sys.unsetenv("AWS_SESSION_TOKEN")
Sys.setenv(AWS_SESSION_TOKEN = aws_profile$aws_session_token)

con <- dbConnect(athena_jdbc, schema_name = default_schema, region = region)

} else {

con <- dbConnect(athena_jdbc, provider = NULL, schema_name = default_schema, region = region)
region <- match.arg(region, c("us-east-1", "us-east-2", "us-west-2"))
log_level <- match.arg(log_level, c("INFO", "DEBUG", "WARN", "ERROR", "ALL", "OFF", "FATAL", "TRACE"))

}
# if (!simple) {
con <- dbConnect(athena_jdbc,
schema_name = default_schema,
region = region,
s3_staging_dir = s3_staging_dir,
max_error_retries = max_error_retries,
connection_timeout = connection_timeout,
socket_timeout = socket_timeout,
retry_base_delay = retry_base_delay,
retry_max_backoff_time = retry_max_backoff_time,
log_path = log_path,
log_level = log_level)
# } else {
# con <- dbConnect(athena_jdbc, provider = NULL, schema_name = default_schema, region = region,
# s3_staging_dir = s3_staging_dir, log_path = log_path, log_level = log_level)
# }

con



+ 18
- 15
README.Rmd View File

@@ -11,17 +11,19 @@ Including a lightweight RJDBC shim.
THIS IS SUPER ALPHA QUALITY. NOTHING TO SEE HERE. MOVE ALONG.

The goal will be to get around enough of the "gotchas" that are preventing raw RJDBC Athena
connecitons from "just working" with `dplyr` v0.6.0+ and also get around the [`fetchSize` problem](https://www.reddit.com/r/aws/comments/6aq22b/fetchsize_limit/) without having to not use `dbGetQuery()`.
connections from "just working" with `dplyr` v0.6.0+ and also get around the [`fetchSize` problem](https://www.reddit.com/r/aws/comments/6aq22b/fetchsize_limit/) without having to not use `dbGetQuery()`.

It will also support more than the vanilla id/secret auth mechism (it currently support the default basic auth and temp token auth, the latter via environment variables).

This package includes the `AthenaJDBC41-1.0.1.jar` JAR file out of convenience but that will likely move to a separate package as this gets closer to prime time.
This package includes the `AthenaJDBC41-1.1.0.jar` JAR file out of convenience but that will likely move to a separate package as this gets closer to prime time.

See the **Usage** section for an example.

The following functions are implemented:

- `athena_connect`: Make a JDBC connection to Athena (this returns an `AthenaConnection` object which is a super-class of it's RJDBC vanilla counterpart)
- `read_credentials`: Use Credentials from .aws/credentials File
- `use_credentials`: Use Credentials from .aws/credentials File
- `Athena`: AthenaJDBC`
- `AthenaConnection-class`: AthenaJDBC
- `AthenaDriver-class`: AthenaJDBC
@@ -44,22 +46,23 @@ options(width=120)

```{r message=FALSE, warning=FALSE, error=FALSE}
library(metis)
library(dplyr)
library(tidyverse)

# current verison
packageVersion("metis")
```

```{r message=FALSE, warning=FALSE, error=FALSE, eval=FALSE}
ath <- athena_connect("your_schema_name")

res <- dbGetQuery(ath, "
SELECT format_datetime(timestamp, 'yyyy-MM-dd HH:00:00') timestamp,
port as field, count(port) cnt_field FROM your_schema_name.your_table_name
WHERE CONTAINS(ARRAY['201705'], date)
AND port IN (445, 139, 3389)
AND timestamp > date '2017-05-01'
AND timestamp <= date '2017-05-22'
GROUP BY format_datetime(timestamp, 'yyyy-MM-dd HH:00:00'), port LIMIT 1000000
")
```{r message=FALSE, warning=FALSE, error=FALSE}
use_credentials("personal")

ath <- athena_connect(default_schema = "sampledb",
s3_staging_dir = "s3://accessible-bucket",
log_path = "/tmp/athena.log",
log_level = "DEBUG")

dbListTables(ath)

dbGetQuery(ath, "SELECT * FROM sampledb.elb_logs LIMIT 10") %>%
type_convert() %>%
glimpse()
```

+ 39
- 13
README.md View File

@@ -11,13 +11,15 @@ The goal will be to get around enough of the "gotchas" that are preventing raw R

It will also support more than the vanilla id/secret auth mechism (it currently support the default basic auth and temp token auth, the latter via environment variables).

This package includes the `AthenaJDBC41-1.0.1.jar` JAR file out of convenience but that will likely move to a separate package as this gets closer to prime time.
This package includes the `AthenaJDBC41-1.1.0.jar` JAR file out of convenience but that will likely move to a separate package as this gets closer to prime time.

See the **Usage** section for an example.

The following functions are implemented:

- `athena_connect`: Make a JDBC connection to Athena (this returns an `AthenaConnection` object which is a super-class of it's RJDBC vanilla counterpart)
- `read_credentials`: Use Credentials from .aws/credentials File
- `use_credentials`: Use Credentials from .aws/credentials File
- `Athena`: AthenaJDBC\`
- `AthenaConnection-class`: AthenaJDBC
- `AthenaDriver-class`: AthenaJDBC
@@ -36,7 +38,7 @@ devtools::install_github("hrbrmstr/metis")

``` r
library(metis)
library(dplyr)
library(tidyverse)

# current verison
packageVersion("metis")
@@ -45,15 +47,39 @@ packageVersion("metis")
## [1] '0.1.0'

``` r
ath <- athena_connect("your_schema_name")

res <- dbGetQuery(ath, "
SELECT format_datetime(timestamp, 'yyyy-MM-dd HH:00:00') timestamp,
port as field, count(port) cnt_field FROM your_schema_name.your_table_name
WHERE CONTAINS(ARRAY['201705'], date)
AND port IN (445, 139, 3389)
AND timestamp > date '2017-05-01'
AND timestamp <= date '2017-05-22'
GROUP BY format_datetime(timestamp, 'yyyy-MM-dd HH:00:00'), port LIMIT 1000000
")
use_credentials("personal")

ath <- athena_connect(default_schema = "sampledb",
s3_staging_dir = "s3://accessible-bucket",
log_path = "/tmp/athena.log",
log_level = "DEBUG")

dbListTables(ath)
```

## [1] "elb_logs"

``` r
dbGetQuery(ath, "SELECT * FROM sampledb.elb_logs LIMIT 10") %>%
type_convert() %>%
glimpse()
```

## Observations: 10
## Variables: 16
## $ timestamp <dttm> 2014-09-26 23:00:27, 2014-09-26 23:00:40, 2014-09-26 23:00:57, 2014-09-26 23:01:10, ...
## $ elbname <chr> "lb-demo", "lb-demo", "lb-demo", "lb-demo", "lb-demo", "lb-demo", "lb-demo", "lb-demo...
## $ requestip <chr> "250.244.91.129", "245.151.134.252", "250.144.139.199", "242.201.196.167", "253.206.1...
## $ requestport <dbl> 30339, 30339, 30339, 30339, 30339, 30339, 30339, 30339, 30339, 30339
## $ backendip <chr> "249.70.230.218", "240.86.17.179", "240.204.230.249", "254.88.26.13", "245.227.117.51...
## $ backendport <dbl> 8000, 80, 8888, 8899, 8888, 8888, 8888, 8888, 8888, 8888
## $ requestprocessingtime <dbl> 0.000082, 0.000093, 0.000094, 0.000101, 0.000092, 0.000091, 0.000093, 0.000089, 0.000...
## $ backendprocessingtime <dbl> 0.047690, 0.047722, 0.039022, 0.046465, 0.046841, 0.042139, 0.040092, 0.048087, 0.039...
## $ clientresponsetime <dbl> 7.2e-05, 5.1e-05, 6.4e-05, 6.4e-05, 4.9e-05, 4.7e-05, 5.1e-05, 7.4e-05, 5.4e-05, 5.4e-05
## $ elbresponsecode <int> 200, 200, 200, 200, 200, 200, 200, 200, 200, 200
## $ backendresponsecode <int> 200, 200, 404, 400, 500, 200, 200, 200, 200, 200
## $ receivedbytes <dbl> 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
## $ sentbytes <dbl> 2, 2, 2, 2, 2, 2, 2, 2, 2, 2
## $ requestverb <chr> "GET", "GET", "GET", "GET", "GET", "GET", "GET", "GET", "GET", "GET"
## $ url <chr> "http://www.abcxyz.com:80/jobbrowser/?format=json&state=running&user=15llx5s", "http:...
## $ protocol <chr> "HTTP/1.1", "HTTP/1.1", "HTTP/1.1", "HTTP/1.1", "HTTP/1.1", "HTTP/1.1", "HTTP/1.1", "...

+ 35
- 15
man/athena_connect.Rd View File

@@ -4,30 +4,50 @@
\alias{athena_connect}
\title{Make a JDBC connection to Athena}
\usage{
athena_connect(default_schema = "default", region = "us-east-1",
simple = FALSE)
athena_connect(default_schema = "default", region = c("us-east-1",
"us-east-2", "us-west-2"),
s3_staging_dir = Sys.getenv("AWS_S3_STAGING_DIR"), max_error_retries = 10,
connection_timeout = 10000, socket_timeout = 10000,
retry_base_delay = 100, retry_max_backoff_time = 1000, log_path = "",
log_level = c("INFO", "DEBUG", "WARN", "ERROR", "ALL", "OFF", "FATAL",
"TRACE"))
}
\arguments{
\item{default_schema}{def schema}
\item{default_schema}{default schema (you'll still need to fully qualify non-default schema table names)}

\item{region}{AWS region (Ref: \url{http://docs.aws.amazon.com/general/latest/gr/rande.html#athena})}

\item{simple}{pickup id/secret only or use temp token? (this will become more robust)}
\item{s3_staging_dir}{the Amazon S3 location to which your query output is written. The JDBC driver then asks Athena to read the results and provide rows of data back to the user.}

\item{max_error_retries}{the maximum number of retries that the JDBC client attempts to make a request to Athena.}

\item{connection_timeout}{the maximum amount of time, in milliseconds, to make a successful connection to Athena before an attempt is terminated.}

\item{socket_timeout}{the maximum amount of time, in milliseconds, to wait for a socket in order to send data to Athena.}

\item{retry_base_delay}{minimum delay amount, in milliseconds, between retrying attempts to connect Athena.}

\item{retry_max_backoff_time}{maximum delay amount, in milliseconds, between retrying attempts to connect Athena.}

\item{log_path}{local path of the Athena JDBC driver logs. If no log path is provided, then no log files are created.}

\item{log_level}{log level of the Athena JDBC driver logs.}
}
\description{
Handles the up-front JDBC config
}
\details{
For all connection types it is expected that you have the following environment variables
defined (a good place is \code{~/.Renviron}):
\itemize{
\item \code{AWS_S3_STAGING_DIR}: the name of the S3 bucket where Athena can write stuff
\item \code{AWS_PROFILE}: the AWS profile ID in \code{~/.aws/credentials} (defaults to \code{default} if not present)
}
\examples{
\dontrun{
use_credentials("personal")

ath <- athena_connect(default_schema = "sampledb",
s3_staging_dir = "s3://accessible-bucket",
log_path = "/tmp/athena.log",
log_level = "DEBUG")

For \code{simple} == \code{FALSE} the expectation is that you're working with a managed
\code{~/.aws/credentials} file.
dbListTables(ath)

There's a high likelihood of params changing in the near term as I work this out, but I'm
not very keen on parameter-izing things like id/secret.
dbGetQuery(ath, "SELECT * FROM sampledb.elb_logs LIMIT 1")

}
}

+ 5
- 1
man/dbConnect-AthenaDriver-method.Rd View File

@@ -7,7 +7,11 @@
\usage{
\S4method{dbConnect}{AthenaDriver}(drv,
provider = "com.amazonaws.athena.jdbc.shaded.com.amazonaws.auth.EnvironmentVariableCredentialsProvider",
region = "us-east-1", schema_name = "default", ...)
region = "us-east-1", s3_staging_dir = Sys.getenv("AWS_S3_STAGING_DIR"),
schema_name = "default", max_error_retries = 10,
connection_timeout = 10000, socket_timeout = 10000,
retry_base_delay = 100, retry_max_backoff_time = 1000, log_path,
log_level, ...)
}
\description{
AthenaJDBC


+ 12
- 0
man/use_credentials.Rd View File

@@ -0,0 +1,12 @@
% Generated by roxygen2: do not edit by hand
% Please edit documentation in R/metis-package.R
\name{use_credentials}
\alias{use_credentials}
\alias{read_credentials}
\title{Use Credentials from .aws/credentials File}
\description{
Use Credentials from .aws/credentials File
}
\references{
\code{\link[aws.signature:use_credentials]{aws.signature::use_credentials()}} / \code{\link[aws.signature:read_credentials]{aws.signature::read_credentials()}}
}

Loading…
Cancel
Save