fs2-blobstore's Introduction

fs2-blobstore

Join the chat at https://gitter.im/fs2-blobstore/Lobby

Unified Scala interface based on fs2 for hierarchical and flat object stores. This library lets you integrate fs2 programs with various storage technologies such as S3, GCS, Azure Blob Storage, SFTP and Box. It also offers an interface that abstracts over the underlying storage technology, which lets you write fs2 programs that are agnostic to the storage provider that hosts the files.

Quick Start

libraryDependencies ++= Seq(
  "com.github.fs2-blobstore" %% "core"  % "<version>",
  "com.github.fs2-blobstore" %% "sftp"  % "<version>",
  "com.github.fs2-blobstore" %% "s3"    % "<version>",
  "com.github.fs2-blobstore" %% "gcs"   % "<version>",
  "com.github.fs2-blobstore" %% "azure" % "<version>",
  "com.github.fs2-blobstore" %% "box"   % "<version>",
) 

Head over to the microsite for documentation

fs2-blobstore's People

Contributors

daddykotex, daenyth, dependabot[bot], dougc, gafiatulin, igosuki, jgogstad, kubukoz, loran-steinberger, lumengxi, mergify[bot], moradology, robertbutacu, rolandomanrique, scala-steward, stew, teddywilson, tpolecat

fs2-blobstore's Issues

Putting a file to a directory path when absRoot is set doesn't work (regression)

The implementation of SftpStore.mkdirs changed in the 0.6.1 release (042ac30) and broke the way directories are created when putting a file to an absolute directory path.

For example, say you are using a user with a home dir of /home/user1/. If you create an SftpStore with an absRoot value of /foo/bar and put a file using a path of tmp, it will be correctly placed in /foo/bar/tmp.

However, if you attempt to put a file to a path of baz/tmp then the store will create a directory called /home/usr1/home/user1/baz and then the underlying ChannelSftp.put will fail with a File not found error.

The problem lies in the SftpStore.mkdirs function. It takes /foo/bar/baz/tmp, splits it on '/' to get ["", "foo", "bar", "baz", "tmp"], then discards empty strings to get ["foo", "bar", "baz", "tmp"] and starts to build up paths for mkdir as "foo", "foo/bar", and "foo/bar/baz". It loses the leading slash, so these all become directories relative to the current directory, i.e. the user's home dir.
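One way to keep the leading slash can be sketched with the standard library alone. `parentDirs` below is a hypothetical helper, not the library's `mkdirs`; it only computes the directory paths that would be handed to mkdir, assuming the last segment is the file name.

```scala
object MkdirsSketch {
  // Hypothetical helper (not SftpStore.mkdirs): returns every parent
  // directory of `path`, shortest first, preserving a leading slash.
  def parentDirs(path: String): List[String] = {
    val prefix = if (path.startsWith("/")) "/" else ""
    // Drop empty segments, then drop the final segment (the file name).
    val dirs = path.split('/').toList.filter(_.nonEmpty).dropRight(1)
    dirs
      .scanLeft("")((acc, seg) => if (acc.isEmpty) seg else s"$acc/$seg")
      .drop(1) // discard the empty seed produced by scanLeft
      .map(prefix + _)
  }
}
```

With this, an absolute input yields absolute directories ("/foo", "/foo/bar", "/foo/bar/baz"), while a relative input such as baz/tmp stays relative.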

Azure: get seems to sometimes fail

During a heavy test, I sometimes see get fail:

java.lang.Error: received record [java.nio.HeapByteBuffer[pos=0 lim=615 cap=615]] in invalid state [UpstreamCompletion]
	at fs2.interop.reactivestreams.StreamSubscriber$.$anonfun$fsm$5(StreamSubscriber.scala:119)
	at cats.effect.concurrent.Ref$SyncRef.spin$4(Ref.scala:316)
	at cats.effect.concurrent.Ref$SyncRef.$anonfun$modify$1(Ref.scala:320)
	at zio.internal.FiberContext.evaluateNow(FiberContext.scala:361)
	at zio.internal.FiberContext.$anonfun$evaluateLater$1(FiberContext.scala:775)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:832)

I'm making a test case which will hopefully demonstrate the issue.

Scala 3 support

I started on this, but at the moment we have a dependency on shapeless for the url module.

What would you propose to get things working?

Support for Multipart Uploads for content of a known size in S3Store

Hi,

For reference, I've been using the following artefacts for this example:

"com.github.fs2-blobstore" %% "core" % "0.7.3",
"com.github.fs2-blobstore" %% "s3"   % "0.7.3"

Apologies if this is already covered somewhere. I've been playing around with the S3Store a bit and it seems that it's only possible to initiate a multipart upload for an object when directed down the putUnknownSize branch. It's possible that I'm just doing something odd, as this is my first real exposure to fs2 and the V2 AWS Java SDK, so as a crude example:

import blobstore.Path
import blobstore.s3.S3Store
import cats.effect.{ExitCode, IO, IOApp, Resource}
import fs2.{Chunk, Stream}
import software.amazon.awssdk.auth.credentials.{AwsSessionCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.S3AsyncClient

import scala.util.Random

object TestApp extends IOApp {

  private val GB = 1024L * 1024L * 1024L
  private val ObjectSize = (5L * GB) + 1L

  private val myBucket: String = ??? // TODO - fill in
  private val myKey: String = ??? // TODO - fill in
  private val myRegion: Region = ??? // TODO - fill in

  override def run(args: List[String]): IO[ExitCode] =
    s3AsyncClientResource
      .flatMap(s3Client => Resource.liftF(S3Store[IO](s3Client, None, None)))
      .use { s3Store =>
        fileInputStream
//          .through(s3Store.put(Path(s"s3://$myBucket/$myKey")))
          .through(s3Store.put(Path(s"s3://$myBucket/$myKey").withSize(Some(ObjectSize))))
          .compile
          .drain
      }
      .as(ExitCode.Success)

  private def s3AsyncClientResource: Resource[IO, S3AsyncClient] =
    Resource.fromAutoCloseable(IO(S3AsyncClient.builder().region(myRegion).build()))

  // Just some random bytes, I don't really care for the purposes of this example
  private def fileInputStream: Stream[IO, Byte] =
    Stream.constant[IO, Unit](())
      .flatMap(_ => Stream.eval(IO(Random.nextBytes(1024))))
      .flatMap(bytes => Stream.chunk(Chunk.bytes(bytes)))
      .take(ObjectSize)

}

This currently fails when trying to upload to S3 with an S3 error with the message Your proposed upload exceeds the maximum allowed size. If I change the through to .through(s3Store.put(Path(s"s3://$myBucket/$myKey"))) (i.e. we no longer specify the object size) then the multipart upload starts as expected.

Would it be possible to add support for MPUs with a known content size? Ideally, as part of this, it would also be possible to avoid buffering the entirety of each part into memory for each UploadPartRequest. We'll know the total size of the object and how many bytes we've processed so far, so I think we could work out the size of the upcoming part and hopefully do something similar to the request in putKnownSize (which doesn't seem to require buffering the entire object into memory). This is less of a problem with smaller objects (since we can use a smaller buffer size), but with objects near the S3 object limit (5TB) we are limited to 10,000 parts, so we must use a large buffer size (e.g. 500 MB), which is a significant extra overhead.
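The part-size arithmetic described above can be sketched as follows. This is not the library's code: `PartSizing` and its members are hypothetical names, and the constants are S3's documented multipart limits (at most 10,000 parts, at least 5 MiB per part except the last).

```scala
object PartSizing {
  val MiB: Long = 1024L * 1024L
  val MaxParts: Long = 10000L      // S3 multipart upload part limit
  val MinPartSize: Long = 5L * MiB // S3 minimum size for every part but the last

  // Smallest part size that fits the object into at most MaxParts parts.
  def partSize(objectSize: Long): Long = {
    val needed = (objectSize + MaxParts - 1) / MaxParts // ceiling division
    math.max(MinPartSize, needed)
  }

  // Size of each part in upload order; only the last part may be shorter.
  def partSizes(objectSize: Long): Iterator[Long] = {
    val size = partSize(objectSize)
    Iterator
      .iterate(0L)(_ + size)
      .takeWhile(_ < objectSize)
      .map(offset => math.min(size, objectSize - offset))
  }
}
```

Because every part size is known before any bytes arrive, each part could in principle be streamed with a declared content length instead of being buffered first.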

Path append is removing leading slash

Hi!

Thanks for the great library. I am playing around with it a little bit, so sorry if this is misuse.

I noticed that if an absolute path is appended to a file:// url, the leading slash is removed.

In my case I was parsing the absolute path (/my/dir/test/) from config, and appending to the url, and it wasn't finding anything.

import blobstore.fs.FileStore
import blobstore.url.Url
import cats.effect.IO
import cats.implicits._
import cats.effect.unsafe.implicits.global

val suffix = "/my/dir/test/"
// suffix: String = "/my/dir/test/"
val a      = Url.unsafe("file://") / suffix
// a: Url.Plain = Url(
//   scheme = "file",
//   authority = Authority(
//     host = Hostname(labels = Singleton(a = Label(value = "localhost"))),
//     userInfo = None,
//     port = None
//   ),
//   path = RootlessPath(
//     representation = "my/dir/test/",
//     segments = Append(leftNE = Wrap(seq = List("my", "dir", "test")), rightNE = Singleton(a = ""))
//   )
// )
val b      = Url.unsafe(s"file://$suffix")
// b: Url.Plain = Url(
//   scheme = "file",
//   authority = Authority(
//     host = Hostname(labels = Singleton(a = Label(value = "localhost"))),
//     userInfo = None,
//     port = None
//   ),
//   path = AbsolutePath(
//     representation = "/my/dir/test/",
//     segments = Wrap(seq = List("my", "dir", "test", ""))
//   )
// )

a == b
// res0: Boolean = false

val fs = FileStore[IO].lift(_.path.valid)
// fs: blobstore.Store[IO, blobstore.fs.NioPath] = blobstore.Store$DelegatingStore@26024048

fs.list(a).compile.toList.unsafeRunSync()
// res1: List[Url[blobstore.fs.NioPath]] = List()
fs.list(b).compile.toList.unsafeRunSync()
// res2: List[Url[blobstore.fs.NioPath]] = List(
//   Url(
//     scheme = "file",
//     authority = Authority(
//       host = Hostname(labels = Singleton(a = Label(value = "localhost"))),
//       userInfo = None,
//       port = None
//     ),
//     path = AbsolutePath(
//       representation = NioPath(
//         path = /my/dir/test/test1.txt,
//         size = Some(value = 0L),
//         isDir = false,
//         lastModified = Some(value = 2023-09-19T08:25:14.874880710Z)
//       ),
//       segments = Wrap(seq = List("my", "dir", "test", "test1.txt"))
//     )
//   )
// )

Anyway, I'm not sure if this is intended. Thanks again.

Stream not completing when cancelling a CompletableFuture after passing to blobstore.util.liftJavaFuture

Hey,

I've been using the following artefacts for this example:

"com.github.fs2-blobstore" %% "core" % "0.7.3",
"com.github.fs2-blobstore" %% "s3"   % "0.7.3"

I've been fiddling around with the S3Store a bit more and ran into some unexpected behaviour with what appeared to be the HTTP client from the S3 client cancelling the future and the stream not terminating. I've had trouble reproducing exactly, but I've tried to create a simpler example to see if this could have happened.

NB: It's definitely possible that I'm either misunderstanding something, or that my expectations on what should happen are wrong - apologies if so.

import java.util.concurrent.CompletableFuture

import cats.effect.{ExitCode, IO, IOApp, Resource}

object Test extends IOApp {
  import scala.concurrent.duration._

  private val cancelFuture = (c: CompletableFuture[String]) => IO(c.cancel(true)).void
  private val successfulFuture = (c: CompletableFuture[String]) => IO(c.complete("Success!")).void
  private val failFuture = (c: CompletableFuture[String]) => IO(c.completeExceptionally(new Exception("Oh no!"))).void

  override def run(args: List[String]): IO[ExitCode] = {
    def stream(completeFuture: CompletableFuture[String] => IO[Unit]): IO[Unit] =
      fs2.Stream(123)
        .evalMap(_ => doInCompletableFuture(completeFuture))
        .compile
        .drain

    for {
      s <- stream(successfulFuture).attempt
      _ <- IO(println(s"successfulFuture completed with $s"))
      f <- stream(failFuture).attempt
      _ <- IO(println(s"failFuture completed with $f"))
      c <- stream(cancelFuture).attempt
      _ <- IO(println(s"cancelFuture completed with $c"))
    } yield ExitCode.Success
  }

  private def doInCompletableFuture(completeFuture: CompletableFuture[String] => IO[Unit]): IO[Unit] = {
    val c = new CompletableFuture[String]()

    val r = for {
      // Just doing it in the background to try to make sure that the CompletableFuture is not completed when 
      // we enter the liftJavaFuture method
      _ <- IO.sleep(5.seconds).flatMap(_ => completeFuture(c)).background
      _ <- Resource.eval(blobstore.util.liftJavaFuture(IO.pure(c)))
    } yield ()

    r.use(IO.pure)
  }

}

I'd expected the program to complete, but it hangs indefinitely on stream(cancelFuture).attempt. Is this expected behaviour, or am I just doing something silly?
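For context, the JDK itself treats cancellation as a form of completion: `CompletableFuture.cancel` completes the future exceptionally with a `CancellationException`, so a completion callback does fire. The sketch below is not `blobstore.util.liftJavaFuture` and does not diagnose it; `FutureLiftSketch` is a hypothetical, stdlib-only bridge that shows a cancelled future surfacing as a failure.

```scala
import java.util.concurrent.CompletableFuture
import java.util.function.BiConsumer
import scala.concurrent.{Future, Promise}

object FutureLiftSketch {
  // Hypothetical bridge: completes a Scala Future from the Java future's
  // completion callback, which is also invoked on cancellation.
  def toScalaFuture[A](cf: CompletableFuture[A]): Future[A] = {
    val promise = Promise[A]()
    cf.whenComplete(new BiConsumer[A, Throwable] {
      def accept(value: A, error: Throwable): Unit = {
        // On cancel, `error` is a CancellationException and `value` is null.
        if (error != null) promise.failure(error) else promise.success(value)
        ()
      }
    })
    promise.future
  }
}
```

A lift that registers its callback this way terminates with an error when the future is cancelled rather than waiting forever.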

Inconsistent and undefined behavior on put to path with pre-existing content

When putting to a pre-existing Path, different stores behave differently:
FileStore prepends new content in front of the old content, while all other stores overwrite.

Overwriting is one valid option, as is failing the stream.
We need to decide on the desired behavior and make all stores adhere to it.
We might want to support both by adding an overwrite: Boolean parameter to the put method.
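For the local file case, the two behaviors map directly onto `java.nio` open options. This is a minimal sketch under that assumption, not the FileStore implementation; `PutSketch.put` is a hypothetical, non-streaming stand-in for the proposed parameter.

```scala
import java.nio.file.{Files, OpenOption, Path}
import java.nio.file.StandardOpenOption._

object PutSketch {
  // overwrite = true  -> replace any existing content
  // overwrite = false -> fail with FileAlreadyExistsException if the file exists
  def put(path: Path, bytes: Array[Byte], overwrite: Boolean): Unit = {
    val options: Seq[OpenOption] =
      if (overwrite) Seq(CREATE, WRITE, TRUNCATE_EXISTING)
      else Seq(CREATE_NEW, WRITE)
    Files.write(path, bytes, options: _*)
    ()
  }
}
```

TRUNCATE_EXISTING is what prevents old bytes from surviving next to the new ones.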

Delete recursively

  • Synonymous with delete a prefix on GCS and S3. Client might need some special handling with special options set.
  • Needs walking the file tree on other stores

SftpStore cannot remove directories

The SftpStore.remove method always calls the underlying channel.rm function, even if the isDir value in the Path is set to true. So attempting to delete a directory on the remote system will always fail.

The solution would seem to be to check isDir and call channel.rmdir if it is true.
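The proposed check can be sketched as below. The `Channel` trait is a hypothetical stand-in that exposes only the two operations named above, so the example runs without an SFTP connection; it is not the SftpStore code.

```scala
object RemoveSketch {
  // Hypothetical stand-in for the two channel operations mentioned above.
  trait Channel {
    def rm(path: String): Unit
    def rmdir(path: String): Unit
  }

  // Dispatch on isDir instead of always calling rm.
  def remove(channel: Channel, path: String, isDir: Boolean): Unit =
    if (isDir) channel.rmdir(path) else channel.rm(path)
}
```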

List recursively

Support the use case of listing all leaves (files) under a given path:

.../folder/a
.../folder/b
.../folder/c
.../folder/sub-folder/d
.../folder/sub-folder/sub-sub-folder/e
=>
list(folder, recursive = false) -> [a, b, c, sub-folder]
list(folder, recursive = true)  -> [a, b, c, d, e]

All stores should be able to list every file reachable from the given path, no matter how deep in the sub-folders the file is located.

Blob stores (gcs, s3, azure) support this natively; sftp, box and fs can do a recursive descent.

No paths where isDir == true should be returned.
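For the local file system, the requested semantics can be sketched with `java.nio`. `ListSketch.list` is a hypothetical helper rather than the Store API, and the example assumes Scala 2.13 or later for `scala.jdk.CollectionConverters`.

```scala
import java.nio.file.{Files, Path}
import scala.jdk.CollectionConverters._

object ListSketch {
  // recursive = false: direct children, directories included.
  // recursive = true: every regular file below dir, no directories.
  def list(dir: Path, recursive: Boolean): List[Path] = {
    val entries = if (recursive) Files.walk(dir) else Files.list(dir)
    try {
      val all = entries.iterator().asScala.toList
      if (recursive) all.filter(p => Files.isRegularFile(p)) else all
    } finally entries.close() // both calls hold an open directory handle
  }
}
```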

Add support for ranged read.

Some blob stores support ranged gets a la HTTP Range header.

Currently this can be emulated by dropping and taking on returned stream, but this still requires downloading all data to be dropped later. Consider for example extreme case of downloading entire content just to check if csv file has trailing newline while knowing the size of that file.

Can expose this in a Store[F].get by falling back to dropping and taking for stores that don't support ranged access.
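The fallback amounts to a drop followed by a take. The sketch below models the byte stream as a plain `Iterator[Byte]` so that it runs without fs2; `RangeSketch` and its methods are hypothetical names, and on an fs2 stream the same shape would be a drop followed by a take.

```scala
object RangeSketch {
  // Emulated ranged read: skip `offset` bytes, then keep `length` bytes.
  // Everything before the range is still consumed, which is the cost
  // that native ranged gets avoid.
  def ranged(bytes: Iterator[Byte], offset: Int, length: Int): Iterator[Byte] =
    bytes.drop(offset).take(length)

  // The trailing-newline check from above, given a known size.
  def endsWithNewline(bytes: Iterator[Byte], size: Int): Boolean =
    size > 0 && ranged(bytes, size - 1, 1).toList == List('\n'.toByte)
}
```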

Relevant information about ranged downloads:

Refactor type checks, casts and throws in BoxStore

Scalafix in #17 complains about throws, type checks and casts in BoxStore.

Make BoxStore's functions pure and total. Remove the @SuppressWarnings, run sbt scalafix and fix.

I need to set up a Box account before working on it.

`S3Store.stat` always returns a Url even if the key doesn't exist in the s3 bucket

override def stat[A](url: Url[A]): Stream[F, Url[S3Blob]] =

Documentation on Store.stat is a little lacking, but my understanding is that it should return a stream with a single url if the object exists in the store, and an empty stream if the object does not exist, so that it's possible to determine if an object is in the store before trying to, say, get it.

The implementation of S3Store.stat always returns a stream containing a single url, even if the key does not exist in s3.

Batch operations

Would it be possible to support batch operations? We now have a case where we upload ~1600 individual files of 2 kilobytes each, which is a bit slow.

  • Do existing HTTP 1.x APIs support this?
  • Would it work over HTTP/2?
  • Which stores would support this?

Java 8 Support

With v0.8.3, my application throws java.lang.NoSuchMethodError: java.nio.ByteBuffer.clear()Ljava/nio/ByteBuffer when using S3Store on Java 8. Has Java 8 support been dropped?
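A common cause of this exact error, offered as an assumption about this report rather than a confirmed diagnosis: since JDK 9, `ByteBuffer` overrides `clear()` with a `ByteBuffer` return type, so bytecode compiled against a newer JDK without targeting the Java 8 API refers to a method that Java 8 does not have. Code that must run on both usually calls the method through the `Buffer` supertype, as in this sketch (`BufferCompat` is a hypothetical name).

```scala
import java.nio.{Buffer, ByteBuffer}

object BufferCompat {
  // Widening to Buffer makes the call resolve to Buffer.clear(),
  // which exists on Java 8 and on later versions alike.
  def clear(bb: ByteBuffer): ByteBuffer = {
    (bb: Buffer).clear()
    bb
  }
}
```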

Support store specific path metadata

The Path abstraction is used across all stores and represents the common denominator for paths:

final case class Path(root: String, key: String, size: Option[Long], isDir: Boolean, lastModified: Option[Date]) {
  override def toString: String = s"$root/$key"
}

See brief discussion in #5

It has a couple of issues:

  1. root and key assume that paths have two components. This is always the case for blob stores, but not necessarily the case for file stores. Note that the current workaround key="" works fine; we just need to filter for blank Strings in file stores, and the current toString is a bit broken. It feels a bit clunky though.
  2. No support for per-path configuration. It's possible (common?) in S3 and GCS to configure ACLs/IAM policies, encryption, life cycle management, storage classes, versioning, and so on, on a per-object basis.

Come up with a new design that allows for the following use cases:

  • List blobs on S3 and filter based on storage class. For instance, "list all files that are not on Glacier"
  • Write files to S3 with per-object configuration, for instance "encrypt objects matching predicate with some key"

This should of course generalize to GCS as well.

Existing use cases should still be supported, for instance

s3Store
  .list(Path("foo/bar"))
  .map(blob => s3Store.get(blob).through(gcsStore.put(blob)))

Source compatibility is not a goal, but it should be straightforward to list files in one store and write them to another.
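One possible shape for such a design, offered purely as an illustration and not as the design the project adopted, is a path type parameterized by store-specific metadata. `BlobPath`, `S3Meta` and `notOnGlacier` are all hypothetical names.

```scala
object PathSketch {
  // A path that carries store-specific metadata of type A.
  final case class BlobPath[A](root: String, key: String, meta: A) {
    // Forget the metadata so the path can be handed to any other store.
    def plain: BlobPath[Unit] = BlobPath(root, key, ())
  }

  // Hypothetical S3 metadata; a real one would also hold ACLs, encryption, etc.
  final case class S3Meta(storageClass: String)

  // "List all files that are not on Glacier."
  def notOnGlacier(blobs: List[BlobPath[S3Meta]]): List[BlobPath[S3Meta]] =
    blobs.filter(_.meta.storageClass != "GLACIER")
}
```

Dropping the metadata with `plain` keeps the existing list-then-copy use case simple, since the target store only needs root and key.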

Path breaks on file name with spaces in them

When there is a space in the Path, it throws an IllegalArgumentException:

Exception in thread "main" java.lang.IllegalArgumentException: Unable to extract path from 'tmp/docs/reports/0003b97c-d6bc-4496-96b4-3dc128fbdc49-my report.csv'
	at blobstore.Path$.$anonfun$apply$1(Path.scala:219)
	at scala.Option.getOrElse(Option.scala:201)
	at blobstore.Path$.apply(Path.scala:219)
	at blobstore.PathOps$AppendOps.$div(PathOps.scala:18)
	at com.garnercorp.Main$.delayedEndpoint$com$garnercorp$Main$1(Main.scala:11)
	at com.garnercorp.Main$delayedInit$body.apply(Main.scala:6)
	at scala.Function0.apply$mcV$sp(Function0.scala:39)
	at scala.Function0.apply$mcV$sp$(Function0.scala:39)
	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
	at scala.App.$anonfun$main$1(App.scala:73)
	at scala.App.$anonfun$main$1$adapted(App.scala:73)
	at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553)
	at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:921)
	at scala.App.main(App.scala:73)
	at scala.App.main$(App.scala:71)
	at com.garnercorp.Main$.main(Main.scala:6)
	at com.garnercorp.Main.main(Main.scala)

To reproduce:

import blobstore.Path
import blobstore.PathOps._

object Main extends App {
  val root = "/tmp/docs/reports"
  val goodFileName = "0003b97c-d6bc-4496-96b4-3dc128fbdc49-my-report.csv"
  val badFileName = "0003b97c-d6bc-4496-96b4-3dc128fbdc49-my report.csv"
  println(Path(root) / goodFileName) // fine
  println(Path(root) / badFileName) // blows up
}

SftpStore list hangs when there are too many files in directory

I'm developing a process to list changes on an SFTP server and download the new files.

However, when there are too many files in a directory, the list method hangs.

val store = SftpStore(sftpConnection, Some(32))
  val x = store
    .allocated
    .map(_._1)
    .unsafeRunSync()
    .list(Path(hugeFolder))
    .take(10)
    .compile
    .toList
    .unsafeRunSync()

  println(x) // Never reaches here
  val y = store.use { client =>
    client
      .list(Path(hugeFolder))
      .take(10)
      .compile
      .toList
  }.unsafeRunSync()

  println(y) // Never reaches here
  val z = Stream
    .resource(store)
    .flatMap(f => f.list(Path(hugeFolder)))
    .take(10)
    .compile
    .toList
    .unsafeRunSync()

  println(z) // Never reaches here

The hugeFolder dir contains around 7k files. If I change it to a small directory (10 files), the code works fine.

These are the dependencies:

  def ivyDeps = Agg(
    ivy"co.fs2::fs2-core:3.0.4",
    ivy"co.fs2::fs2-io:3.0.4",
    ivy"org.typelevel::cats-effect:3.1.0",
    ivy"com.github.fs2-blobstore::core:0.9.0-beta3",
    ivy"com.github.fs2-blobstore::sftp:0.9.0-beta3"
  )

Thanks.
