Comments (8)

hygor-knust commented on June 15, 2024

No, it is working normally.

from fs2-blobstore.

gafiatulin commented on June 15, 2024

Thank you for reporting. It probably doesn't matter much, but do you know which SFTP server is used on the other side?

hygor-knust commented on June 15, 2024

The server is running OpenSSH_4.3

gafiatulin commented on June 15, 2024

@hygorkn
Sorry for such a delayed response.

Could you please share your app setup (do you use IOApp or something else?) and your SFTP session setup, a rough estimate of the file sizes on the SFTP server, and the number of cores and amount of memory in the environment where your app is running?

I tried to replicate the issue but was not able to: blobstore-sftp-test. Here it is listing 50k files: https://github.com/gafiatulin/blobstore-sftp-test/runs/2763544039?check_suite_focus=true#step:5:17
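
For readers who want to run a similar check, here is a minimal sketch (not the code from the linked blobstore-sftp-test repository) that lists a directory recursively with SftpStore inside an IOApp and counts the entries with compile.count instead of collecting them into a List. The host, port, credentials, and folder are placeholders, and it assumes an fs2-blobstore version where SftpStore lives in blobstore.sftp and Path in blobstore.url, matching the API used later in this thread.

```scala
import blobstore.sftp.SftpStore
import blobstore.url.Path
import cats.effect.{ExitCode, IO, IOApp}
import com.jcraft.jsch.{JSch, Session}

object ListCount extends IOApp {
  // Placeholder connection details (hypothetical): point these at your own server.
  val session: IO[Session] = IO.delay {
    val session = new JSch().getSession("user", "localhost", 2222)
    session.setPassword("password")
    session.setConfig("StrictHostKeyChecking", "no") // acceptable for a local test only
    session
  }

  override def run(args: List[String]): IO[ExitCode] =
    SftpStore(session, Some(32))
      .use { store =>
        store
          .list(Path("/some/huge/directory"), recursive = true)
          .compile
          .count // counts entries as they stream in instead of collecting a List
          .flatMap(n => IO.println(s"listed $n entries"))
      }
      .as(ExitCode.Success)
}
```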

hygor-knust commented on June 15, 2024

Hi @gafiatulin,

I was mocking up a stream-processing pipeline to see whether I could replace an Akka Streams service I have, so I wrote a simple fs2 script using import cats.effect.unsafe.implicits.global.

As a test, I was downloading, gunzipping, and parsing 7,500 xml.gz files totaling ~500 MB.

But after checking the test you wrote, I tried removing the implicits and using IOApp instead. After that!

Here is what the code looked like:

  import blobstore.sftp.SftpStore
  import blobstore.url.Path
  import cats.effect.{IO, Resource}
  import cats.effect.unsafe.implicits.global // global runtime used by unsafeRunSync below
  import com.jcraft.jsch.{JSch, ProxySOCKS5, Session}
  import org.apache.commons.io.IOUtils

  val session: IO[Session] = IO {
    val jsch = new JSch()
    val session = jsch.getSession("sftp-user", "sftp-host", 22)
    val proxy = new ProxySOCKS5("proxy-host", 1080)

    proxy.setUserPasswd("proxy-user", "proxy-password")

    session.setConfig("StrictHostKeyChecking", "no")
    session.setPassword("sftp-password")
    session.setTimeout(60 * 60) // JSch timeouts are in milliseconds, so this is 3.6 s
    session.setProxy(proxy)

    session
  }

  val store: Resource[IO, SftpStore[IO]] = SftpStore(session, Some(32))
  val pmFolder = "/some/huge/directory"

  def main(args: Array[String]): Unit = {
    val xs = fs2.Stream
      .resource(store)
      .flatMap(_.list(Path(pmFolder), true))
      .evalMap(file => store.use { _.get(file, 1024).compile.toList })
      .flatMap(fs2.Stream.emits)
      .through(fs2.compression.Compression[IO].gunzip(1024))
      .flatMap(_.content)
      .through(fs2.io.toInputStream)
      .map(file => parse(IOUtils.toString(file)))
      .debug()
      .compile
      .toList
      .unsafeRunSync() // runs the IO on the global runtime imported above

    println(xs.length)
  }

Thanks for your support!

gafiatulin commented on June 15, 2024

@hygorkn Is it still blocking after you replaced cats.effect.unsafe.implicits.global with IOApp?
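
For context, a minimal sketch of the two entry points being compared: a plain main method that runs an IO with the runtime from cats.effect.unsafe.implicits.global via unsafeRunSync, and an IOApp that creates and manages its own runtime and evaluates run for you. The object names and the toy program are hypothetical, and the thread does not establish why the global-runtime version blocked in this case; the sketch only shows how the two setups differ (Cats Effect 3 assumed).

```scala
import cats.effect.{ExitCode, IO, IOApp}

// Option 1: a plain main method that runs the IO itself, using the global
// runtime brought in by cats.effect.unsafe.implicits.global.
object WithGlobalRuntime {
  def program: IO[Int] = IO.pure(21).map(_ * 2)

  def main(args: Array[String]): Unit = {
    import cats.effect.unsafe.implicits.global
    val result: Int = program.unsafeRunSync() // blocks the calling thread until done
    println(result)
  }
}

// Option 2: IOApp owns the runtime and evaluates `run` as the program.
object WithIOApp extends IOApp {
  override def run(args: List[String]): IO[ExitCode] =
    WithGlobalRuntime.program
      .flatMap(n => IO.println(n))
      .as(ExitCode.Success)
}
```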

gafiatulin commented on June 15, 2024

Great! I'm going to close the issue then.

@hygorkn A couple of notes regarding the code from this comment: #413 (comment):

  1. You probably want to reuse the same store instance you used for listing to get the data as well.
  2. It doesn't really make sense to compile a Stream[IO, Byte] to a List[Byte] and then unwrap the resulting list with Stream.emits back into a Stream[IO, Byte].
  3. You can use through(fs2.text.utf8Decode).compile.foldMonoid instead of going through an InputStream and (I assume) org.apache.commons.io.IOUtils to get the string content from the byte stream.

Maybe something like this:

import java.util.Properties

import blobstore.sftp.SftpStore
import blobstore.url.Path
import cats.effect.{ExitCode, IO, IOApp, Resource}
import cats.effect.std.Console
import com.jcraft.jsch.{JSch, Session}
import fs2.compression.Compression

object Main extends IOApp {

  val session: IO[Session] = IO.delay {
    val jsch = new JSch()
    val session = jsch.getSession("user", "localhost", 2222)
    session.setTimeout(10000) // milliseconds
    session.setPassword("password")
    val config = new Properties
    config.put("StrictHostKeyChecking", "no")
    session.setConfig(config)
    session
  }

  val store: Resource[IO, SftpStore[IO]] = SftpStore(session, Some(32))
  val pmFolder = "/some/huge/directory"

  // Not sure what this is: placeholders for your parser's result type and function
  type T = String
  def parse(str: String): T = ???
  // Not sure what this is

  def listContents(
      sftpStore: SftpStore[IO],
      folderPath: Path.Plain
  ): IO[List[T]] =
    sftpStore
      .list(folderPath, recursive = true)
      .evalMap { file => // Can also use parEvalMap/parEvalMapUnordered
        val fileContent: IO[String] = sftpStore
          .get(file, 1024)
          .through(Compression[IO].gunzip(1024))
          .flatMap(_.content)
          .through(fs2.text.utf8Decode)
          .compile
          .foldMonoid

        fileContent.map(parse)
      }
      .debug()
      .compile
      .toList

  override def run(args: List[String]): IO[ExitCode] = store
    .use { sftpStore =>
      for {
        l <- listContents(sftpStore, Path(pmFolder))
        _ <- Console[IO].println(l.length)
      } yield ()
    }
    .as(ExitCode.Success)
}
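
To make notes 2 and 3 concrete, here is a small self-contained sketch that uses an in-memory byte stream as a hypothetical stand-in for sftpStore.get(file, 1024). It contrasts the compile.toList / Stream.emits round trip with decoding via fs2.text.utf8Decode and compile.foldMonoid. It assumes fs2 3.x with Cats Effect 3; newer fs2 3.x releases also expose the same pipe as fs2.text.utf8.decode.

```scala
import java.nio.charset.StandardCharsets

import cats.effect.IO
import fs2.{Stream, text}

object DecodeDemo {
  // Hypothetical stand-in for sftpStore.get(file, 1024): a stream of raw bytes.
  val bytes: Stream[IO, Byte] =
    Stream.emits("<root>é</root>".getBytes(StandardCharsets.UTF_8).toList).covary[IO]

  // Note 2: materializes every byte into a List, then re-emits it as a stream.
  // The result is the same byte stream, but the whole file is buffered first.
  val roundTrip: Stream[IO, Byte] =
    Stream.eval(bytes.compile.toList).flatMap(Stream.emits(_))

  // Note 3: decode UTF-8 chunks as they arrive and concatenate the String pieces
  // with the String Monoid, with no InputStream or IOUtils involved.
  val content: IO[String] =
    bytes.through(text.utf8Decode).compile.foldMonoid
}
```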

hygor-knust commented on June 15, 2024

In the current solution, the parser is the bottleneck of the stream, so I separated the download and parse stages so I can control how many tasks each one runs concurrently. Something like:

parEvalMap(4)(download).parEvalMap(24)(parse)
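
A self-contained sketch of that two-stage layout, with hypothetical download and parse functions (simulated with IO.sleep and a string length) standing in for the real SFTP fetch and XML parser. Note that parEvalMap(n) bounds how many effects of that stage run concurrently rather than pinning a number of threads; in Cats Effect 3 the work is scheduled on the runtime's compute pool. parEvalMap also emits results in input order, while parEvalMapUnordered drops that guarantee.

```scala
import scala.concurrent.duration._

import cats.effect.{IO, IOApp}
import fs2.Stream

object TwoStagePipeline extends IOApp.Simple {
  // Hypothetical stage 1: stands in for fetching and gunzipping one file.
  def download(name: String): IO[String] =
    IO.sleep(10.millis).as(s"<doc name='$name'/>")

  // Hypothetical stage 2: stands in for the CPU-heavy parsing step.
  def parse(xml: String): IO[Int] =
    IO(xml.length)

  val files: Stream[IO, String] =
    Stream.range(0, 100).map(i => s"file-$i.xml.gz")

  // Each stage has its own concurrency limit: at most 4 downloads and
  // 24 parses in flight at once; outputs keep the order of `files`.
  val pipeline: Stream[IO, Int] =
    files.parEvalMap(4)(download).parEvalMap(24)(parse)

  override def run: IO[Unit] =
    pipeline.compile.toList.flatMap(xs => IO.println(s"parsed ${xs.size} files"))
}
```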

I'll try to apply your suggestions and see how it performs.

Thank you very much!
