Skip to content
This repository has been archived by the owner on Jul 30, 2024. It is now read-only.

Commit

Permalink
Merge pull request #299 from exoego/s3-upload
Browse files Browse the repository at this point in the history
[s3] Add s3 managed upload feature (s3.upload/s3.uploadFuture)
  • Loading branch information
exoego committed Nov 24, 2020
2 parents 3760e18 + 0fc7986 commit 0e9cd28
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 7 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ jobs:
node-version: '12'
- run: npm install
- name: Run Tests
run: sbt -jvm-opts .github/.jvmopts ++${{ matrix.scala }} core/test awsDynamoDB/test awsCloudFrontSigner/test all/test
run: sbt -jvm-opts .github/.jvmopts ++${{ matrix.scala }} core/test awsDynamoDB/test awsS3/test awsCloudFrontSigner/test all/test
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package facade.amazonaws.services.s3

import facade.amazonaws.{Error => AWSError}

import scala.concurrent.Future
import scala.scalajs.js
import scala.scalajs.js.annotation.JSImport

package object managedupload {

/** The managed uploader allows for easy and efficient uploading of buffers, blobs, or streams, using a configurable
* amount of concurrency to perform multipart uploads where possible. This abstraction also enables uploading streams
* of unknown size due to the use of multipart uploads.
*/
@js.native
@JSImport("aws-sdk/clients/s3", "ManagedUpload", "AWS.S3.ManagedUpload")
class ManagedUpload(options: ManagedUploadOptions) extends js.Object {
def abort(): Unit = js.native
def promise(): js.Promise[SendData] = js.native
def send(): Unit = js.native
def send(callback: js.Function2[AWSError, SendData, Unit]): Unit = js.native
def on(event: String, listener: js.Function1[Progress, Unit]): js.Dynamic = js.native

var maxTotalParts: Double = js.native
var minPartSize: Double = js.native
}

implicit final class ManagedUploadOps(private val instance: ManagedUpload) extends AnyVal {
def sendFuture(): Future[SendData] = instance.promise().toFuture
def onUploadProgress(handler: js.Function1[Progress, Unit]): Unit = instance.on("httpUploadProgress", handler)
}

@js.native
trait Progress extends js.Object {
var loaded: Double = js.native
var total: Double = js.native
}

object Progress {
def apply(
loaded: Double,
total: Double
): Progress = {
val _obj$ = js.Dynamic.literal(
"loaded" -> loaded.asInstanceOf[js.Any],
"total" -> total.asInstanceOf[js.Any]
)
_obj$.asInstanceOf[Progress]
}
}

@js.native
trait SendData extends js.Object {
var Location: String = js.native
var ETag: String = js.native
var Bucket: String = js.native
var Key: String = js.native
}

object SendData {
def apply(
Location: String,
ETag: String,
Bucket: String,
Key: String
): SendData = {
val _obj$ = js.Dynamic.literal(
"Location" -> Location.asInstanceOf[js.Any],
"ETag" -> ETag.asInstanceOf[js.Any],
"Bucket" -> Bucket.asInstanceOf[js.Any],
"Key" -> Key.asInstanceOf[js.Any]
)
_obj$.asInstanceOf[SendData]
}
}

@js.native
trait ManagedUploadOptions extends js.Object {
var params: js.UndefOr[PutObjectRequest] = js.native
var queueSize: js.UndefOr[Double] = js.native
var partSize: js.UndefOr[Double] = js.native
var leavePartsOnError: js.UndefOr[Boolean] = js.native
var service: js.UndefOr[S3] = js.native
var tags: js.UndefOr[js.Array[Tag]] = js.native
}

object ManagedUploadOptions {
def apply(
params: js.UndefOr[PutObjectRequest] = js.undefined,
queueSize: js.UndefOr[Double] = js.undefined,
partSize: js.UndefOr[Double] = js.undefined,
leavePartsOnError: js.UndefOr[Boolean] = js.undefined,
service: js.UndefOr[S3] = js.undefined,
tags: js.UndefOr[js.Array[Tag]] = js.undefined
): ManagedUploadOptions = {
val _obj$ = js.Dynamic.literal()
params.foreach(_v => _obj$.updateDynamic("params")(_v.asInstanceOf[js.Any]))
queueSize.foreach(_v => _obj$.updateDynamic("queueSize")(_v.asInstanceOf[js.Any]))
partSize.foreach(_v => _obj$.updateDynamic("partSize")(_v.asInstanceOf[js.Any]))
leavePartsOnError.foreach(_v => _obj$.updateDynamic("leavePartsOnError")(_v.asInstanceOf[js.Any]))
service.foreach(_v => _obj$.updateDynamic("service")(_v.asInstanceOf[js.Any]))
tags.foreach(_v => _obj$.updateDynamic("tags")(_v.asInstanceOf[js.Any]))
_obj$.asInstanceOf[ManagedUploadOptions]
}
}
}
46 changes: 40 additions & 6 deletions services/s3/src/main/scala/facade/amazonaws/services/S3.scala
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,34 @@ package object s3 {
loop()
}

/** Uploads an arbitrarily sized buffer, blob, or stream, using intelligent concurrent handling of parts if the payload is large enough.
* Note that this is the only operation for which the SDK can retry requests with stream bodies.
* @return The managed upload object that can call send() or track progress.
*/
def upload(params: PutObjectRequest): managedupload.ManagedUpload = service.asInstanceOf[js.Dynamic].upload(params).asInstanceOf[managedupload.ManagedUpload]

/** Uploads an arbitrarily sized buffer, blob, or stream, using intelligent concurrent handling of parts if the payload is large enough.
* You can configure the concurrent queue size by setting options.
* Note that this is the only operation for which the SDK can retry requests with stream bodies.
* @return The managed upload object that can call send() or track progress.
*/
def upload(params: PutObjectRequest, options: managedupload.ManagedUploadOptions): managedupload.ManagedUpload = service.asInstanceOf[js.Dynamic].upload(params, options).asInstanceOf[managedupload.ManagedUpload]

/** Uploads an arbitrarily sized buffer, blob, or stream, using intelligent concurrent handling of parts if the payload is large enough.
* Note that this is the only operation for which the SDK can retry requests with stream bodies.
* @return The response data from the successful upload
*/
def uploadFuture(params: PutObjectRequest): Future[managedupload.SendData] = service.upload(params).sendFuture()

/** Uploads an arbitrarily sized buffer, blob, or stream, using intelligent concurrent handling of parts if the payload is large enough.
* You can configure the concurrent queue size by setting options.
* Note that this is the only operation for which the SDK can retry requests with stream bodies.
* @return The response data from the successful upload
*/
def uploadFuture(params: PutObjectRequest, options: managedupload.ManagedUploadOptions): Future[managedupload.SendData] = service.upload(params, options).sendFuture()

}

@js.native
sealed trait Operation extends js.Any
object Operation {
Expand All @@ -377,41 +404,45 @@ package object s3 {
@inline def copyObject = "copyObject".asInstanceOf[Operation]
@inline def createBucket = "createBucket".asInstanceOf[Operation]
@inline def createMultipartUpload = "createMultipartUpload".asInstanceOf[Operation]
@inline def deleteBucket = "deleteBucket".asInstanceOf[Operation]
@inline def deleteBucketAnalyticsConfiguration = "deleteBucketAnalyticsConfiguration".asInstanceOf[Operation]
@inline def deleteBucketCors = "deleteBucketCors".asInstanceOf[Operation]
@inline def deleteBucketEncryption = "deleteBucketEncryption".asInstanceOf[Operation]
@inline def deleteBucket = "deleteBucket".asInstanceOf[Operation]
@inline def deleteBucketIntelligentTieringConfiguration = "deleteBucketIntelligentTieringConfiguration".asInstanceOf[Operation]
@inline def deleteBucketInventoryConfiguration = "deleteBucketInventoryConfiguration".asInstanceOf[Operation]
@inline def deleteBucketLifecycle = "deleteBucketLifecycle".asInstanceOf[Operation]
@inline def deleteBucketMetricsConfiguration = "deleteBucketMetricsConfiguration".asInstanceOf[Operation]
@inline def deleteBucketOwnershipControls = "deleteBucketOwnershipControls".asInstanceOf[Operation]
@inline def deleteBucketPolicy = "deleteBucketPolicy".asInstanceOf[Operation]
@inline def deleteBucketReplication = "deleteBucketReplication".asInstanceOf[Operation]
@inline def deleteBucketTagging = "deleteBucketTagging".asInstanceOf[Operation]
@inline def deleteBucketWebsite = "deleteBucketWebsite".asInstanceOf[Operation]
@inline def deleteObject = "deleteObject".asInstanceOf[Operation]
@inline def deleteObjectTagging = "deleteObjectTagging".asInstanceOf[Operation]
@inline def deleteObjects = "deleteObjects".asInstanceOf[Operation]
@inline def deleteObjectTagging = "deleteObjectTagging".asInstanceOf[Operation]
@inline def deletePublicAccessBlock = "deletePublicAccessBlock".asInstanceOf[Operation]
@inline def getBucketAccelerateConfiguration = "getBucketAccelerateConfiguration".asInstanceOf[Operation]
@inline def getBucketAcl = "getBucketAcl".asInstanceOf[Operation]
@inline def getBucketAnalyticsConfiguration = "getBucketAnalyticsConfiguration".asInstanceOf[Operation]
@inline def getBucketCors = "getBucketCors".asInstanceOf[Operation]
@inline def getBucketEncryption = "getBucketEncryption".asInstanceOf[Operation]
@inline def getBucketIntelligentTieringConfiguration = "getBucketIntelligentTieringConfiguration".asInstanceOf[Operation]
@inline def getBucketInventoryConfiguration = "getBucketInventoryConfiguration".asInstanceOf[Operation]
@inline def getBucketLifecycleConfiguration = "getBucketLifecycleConfiguration".asInstanceOf[Operation]
@inline def getBucketLocation = "getBucketLocation".asInstanceOf[Operation]
@inline def getBucketLogging = "getBucketLogging".asInstanceOf[Operation]
@inline def getBucketMetricsConfiguration = "getBucketMetricsConfiguration".asInstanceOf[Operation]
@inline def getBucketNotificationConfiguration = "getBucketNotificationConfiguration".asInstanceOf[Operation]
@inline def getBucketOwnershipControls = "getBucketOwnershipControls".asInstanceOf[Operation]
@inline def getBucketPolicy = "getBucketPolicy".asInstanceOf[Operation]
@inline def getBucketPolicyStatus = "getBucketPolicyStatus".asInstanceOf[Operation]
@inline def getBucketReplication = "getBucketReplication".asInstanceOf[Operation]
@inline def getBucketRequestPayment = "getBucketRequestPayment".asInstanceOf[Operation]
@inline def getBucketTagging = "getBucketTagging".asInstanceOf[Operation]
@inline def getBucketVersioning = "getBucketVersioning".asInstanceOf[Operation]
@inline def getBucketWebsite = "getBucketWebsite".asInstanceOf[Operation]
@inline def getObjectAcl = "getObjectAcl".asInstanceOf[Operation]
@inline def getObject = "getObject".asInstanceOf[Operation]
@inline def getObjectAcl = "getObjectAcl".asInstanceOf[Operation]
@inline def getObjectLegalHold = "getObjectLegalHold".asInstanceOf[Operation]
@inline def getObjectLockConfiguration = "getObjectLockConfiguration".asInstanceOf[Operation]
@inline def getObjectRetention = "getObjectRetention".asInstanceOf[Operation]
Expand All @@ -421,41 +452,44 @@ package object s3 {
@inline def headBucket = "headBucket".asInstanceOf[Operation]
@inline def headObject = "headObject".asInstanceOf[Operation]
@inline def listBucketAnalyticsConfigurations = "listBucketAnalyticsConfigurations".asInstanceOf[Operation]
@inline def listBucketIntelligentTieringConfigurations = "listBucketIntelligentTieringConfigurations".asInstanceOf[Operation]
@inline def listBucketInventoryConfigurations = "listBucketInventoryConfigurations".asInstanceOf[Operation]
@inline def listBucketMetricsConfigurations = "listBucketMetricsConfigurations".asInstanceOf[Operation]
@inline def listBuckets = "listBuckets".asInstanceOf[Operation]
@inline def listMultipartUploads = "listMultipartUploads".asInstanceOf[Operation]
@inline def listObjectVersions = "listObjectVersions".asInstanceOf[Operation]
@inline def listObjects = "listObjects".asInstanceOf[Operation]
@inline def listObjectsV2 = "listObjectsV2".asInstanceOf[Operation]
@inline def listObjectVersions = "listObjectVersions".asInstanceOf[Operation]
@inline def listParts = "listParts".asInstanceOf[Operation]
@inline def putBucketAccelerateConfiguration = "putBucketAccelerateConfiguration".asInstanceOf[Operation]
@inline def putBucketAcl = "putBucketAcl".asInstanceOf[Operation]
@inline def putBucketAnalyticsConfiguration = "putBucketAnalyticsConfiguration".asInstanceOf[Operation]
@inline def putBucketCors = "putBucketCors".asInstanceOf[Operation]
@inline def putBucketEncryption = "putBucketEncryption".asInstanceOf[Operation]
@inline def putBucketIntelligentTieringConfiguration = "putBucketIntelligentTieringConfiguration".asInstanceOf[Operation]
@inline def putBucketInventoryConfiguration = "putBucketInventoryConfiguration".asInstanceOf[Operation]
@inline def putBucketLifecycleConfiguration = "putBucketLifecycleConfiguration".asInstanceOf[Operation]
@inline def putBucketLogging = "putBucketLogging".asInstanceOf[Operation]
@inline def putBucketMetricsConfiguration = "putBucketMetricsConfiguration".asInstanceOf[Operation]
@inline def putBucketNotificationConfiguration = "putBucketNotificationConfiguration".asInstanceOf[Operation]
@inline def putBucketOwnershipControls = "putBucketOwnershipControls".asInstanceOf[Operation]
@inline def putBucketPolicy = "putBucketPolicy".asInstanceOf[Operation]
@inline def putBucketReplication = "putBucketReplication".asInstanceOf[Operation]
@inline def putBucketRequestPayment = "putBucketRequestPayment".asInstanceOf[Operation]
@inline def putBucketTagging = "putBucketTagging".asInstanceOf[Operation]
@inline def putBucketVersioning = "putBucketVersioning".asInstanceOf[Operation]
@inline def putBucketWebsite = "putBucketWebsite".asInstanceOf[Operation]
@inline def putObjectAcl = "putObjectAcl".asInstanceOf[Operation]
@inline def putObject = "putObject".asInstanceOf[Operation]
@inline def putObjectAcl = "putObjectAcl".asInstanceOf[Operation]
@inline def putObjectLegalHold = "putObjectLegalHold".asInstanceOf[Operation]
@inline def putObjectLockConfiguration = "putObjectLockConfiguration".asInstanceOf[Operation]
@inline def putObjectRetention = "putObjectRetention".asInstanceOf[Operation]
@inline def putObjectTagging = "putObjectTagging".asInstanceOf[Operation]
@inline def putPublicAccessBlock = "putPublicAccessBlock".asInstanceOf[Operation]
@inline def restoreObject = "restoreObject".asInstanceOf[Operation]
@inline def selectObjectContent = "selectObjectContent".asInstanceOf[Operation]
@inline def uploadPartCopy = "uploadPartCopy".asInstanceOf[Operation]
@inline def uploadPart = "uploadPart".asInstanceOf[Operation]
@inline def uploadPartCopy = "uploadPartCopy".asInstanceOf[Operation]
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package facade.amazonaws.services.s3.managedupload

import facade.amazonaws.AWSConfig
import facade.amazonaws.services.s3
import org.scalatest.funsuite.AsyncFunSuite

import scala.concurrent.ExecutionContext

class ManagedUploadTest extends AsyncFunSuite {
override implicit val executionContext = ExecutionContext.Implicits.global

test("ManagedUpload") {
val instance = new ManagedUpload(ManagedUploadOptions(
params = s3.PutObjectRequest(
Bucket = "test",
Key = "test",
Body = ""
),
service = new s3.S3(AWSConfig(endpoint = "localhost"))
))

assert(instance.maxTotalParts > 0)
assert(instance.minPartSize > 0)
instance.sendFuture().failed.map { e =>
succeed
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package facade.amazonaws.services.s3

import facade.amazonaws.AWSConfig
import org.scalatest.funsuite.AsyncFunSuite

import scala.concurrent.ExecutionContext

class S3Test extends AsyncFunSuite {
override implicit val executionContext = ExecutionContext.Implicits.global

val service = new S3(AWSConfig(endpoint = "localhost"))

test("upload") {
val upload = service.upload(PutObjectRequest(
Bucket = "test",
Key = "test",
Body = ""
))

assert(upload.maxTotalParts > 0)
assert(upload.minPartSize > 0)

upload.sendFuture().failed.map { e =>
succeed
}
}

test("uploadFuture") {
val uploadFuture = service.uploadFuture(PutObjectRequest(
Bucket = "test",
Key = "test",
Body = ""
))

uploadFuture.failed.map { e =>
succeed
}
}
}

0 comments on commit 0e9cd28

Please sign in to comment.