Comments (8)
No, it is working normally.
from fs2-blobstore.
Thank you for reporting. It probably doesn't matter much, but do you know which SFTP server is used on the other side?
from fs2-blobstore.
The server is running OpenSSH_4.3
from fs2-blobstore.
@hygorkn
Sorry for such a delayed response.
Can you please share how your app is set up (do you use IOApp or something else), how the SFTP session is configured, a rough estimate of the file sizes on the SFTP server, and the number of cores and amount of memory in the environment where your app is running?
I tried to replicate the issue and was not able to: blobstore-sftp-test. Here it is listing 50k files: https://github.com/gafiatulin/blobstore-sftp-test/runs/2763544039?check_suite_focus=true#step:5:17
from fs2-blobstore.
Hi @gafiatulin,
I was trying to mock up a stream-processing pipeline to see if I could replace an Akka Streams service that I have, so I wrote a simple fs2 script using import cats.effect.unsafe.implicits.global.
To test it, I was trying to download, gunzip and parse 7500 xml.gz files, totaling ~500 MB.
But after checking the test you wrote, I tried removing the implicits and using IOApp instead. After that!
Here is what the code looked like:
val session: IO[Session] = IO {
  val jsch = new JSch()
  val session = jsch.getSession("sftp-user", "sftp-host", 22)
  val proxy = new ProxySOCKS5("proxy-host", 1080)
  proxy.setUserPasswd("proxy-user", "proxy-password")
  session.setConfig("StrictHostKeyChecking", "no")
  session.setPassword("sftp-password")
  session.setTimeout(60 * 60)
  session.setProxy(proxy);
  session
}
val store: Resource[IO, SftpStore[IO]] = SftpStore(session, Some(32))
val pmFolder = "/some/huge/directory"
def main(args: List[String]): Unit = {
  val xs = fs2.Stream
    .resource(store)
    .flatMap(_.list(Path(pmFolder), true))
    .evalMap(file => store.use { _.get(file, 1024).compile.toList })
    .flatMap(fs2.Stream.emits)
    .through(fs2.compression.Compression[IO].gunzip(1024))
    .flatMap(_.content)
    .through(fs2.io.toInputStream)
    .map(file => parse(IOUtils.toString(file)))
    .debug()
    .compile
    .toList()
  println(xs.length)
}
Thanks for your support!
from fs2-blobstore.
@hygorkn Is it still blocking after you replaced cats.effect.unsafe.implicits.global with IOApp?
from fs2-blobstore.
Great! I'm going to close the issue then.
@hygorkn A couple of notes regarding the code from this comment: #413 (comment):
- You probably want to reuse the same store instance you used for listing to get the data as well.
- It doesn't really make sense to compile Stream[O, Byte] to List[Byte] and then unwrap the resulting list using Stream.emits back into Stream[O, Byte].
- You can use through(fs2.text.utf8Decode).compile.foldMonoid instead of going through InputStream and, I assume, org.apache.commons.io.IOUtils to get the string content from the byte stream.
Maybe something like this:
val session: IO[Session] = IO.delay {
  val jsch = new JSch()
  val session = jsch.getSession("user", "localhost", 2222)
  session.setTimeout(10000)
  session.setPassword("password")
  val config = new Properties
  config.put("StrictHostKeyChecking", "no")
  session.setConfig(config)
  session
}
val store: Resource[IO, SftpStore[IO]] = SftpStore(session, Some(32))
val pmFolder = "/some/huge/directory"
// Not sure what is this
type T = ???
def parse(str: String): T = ???
// Not sure what is this
def listContents(
  sftpStore: SftpStore[IO],
  folderPath: Path.Plain
): IO[List[T]] =
  sftpStore
    .list(folderPath, recursive = true)
    .evalMap { file => // Can also use parEvalMap/parEvalMapUnordered
      val fileContent: IO[String] = sftpStore
        .get(file, 1024)
        .through(Compression[IO].gunzip(1024))
        .flatMap(_.content)
        .through(fs2.text.utf8Decode)
        .compile
        .foldMonoid
      fileContent.map(parse)
    }
    .debug()
    .compile
    .toList
override def run(args: List[String]): IO[ExitCode] = store
  .use { sftpStore =>
    for {
      l <- listContents(sftpStore, Path(pmFolder))
      _ <- Console[IO].println(l.length)
    } yield ()
  }
  .as(ExitCode.Success)
from fs2-blobstore.
In the current solution, the parser is the bottleneck of the stream. So I separated the download and parse pipes so I can control how many threads each one uses. Something like:
parEvalMap(4)(download).parEvalMap(24)(parse)
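A minimal sketch of that two-stage shape, assuming fs2 3.x on Cats Effect 3. The download and parse functions here are hypothetical stand-ins (they only build and measure a string), not the real SFTP or XML code, and the file names are made up for illustration:

```scala
import cats.effect.{IO, IOApp}
import fs2.Stream

object PipelineSketch extends IOApp.Simple {
  // Hypothetical stand-in for fetching one file; IO.blocking marks it as blocking work.
  def download(name: String): IO[String] = IO.blocking(s"<xml>$name</xml>")

  // Hypothetical stand-in for the parser; here it only measures the payload.
  def parse(xml: String): IO[Int] = IO(xml.length)

  // Up to 4 downloads and up to 24 parses may be in flight at once;
  // parEvalMap emits results in the order of the input elements.
  val lengths: IO[List[Int]] =
    Stream
      .emits(List("a.xml.gz", "b.xml.gz", "c.xml.gz"))
      .covary[IO]
      .parEvalMap(4)(download)
      .parEvalMap(24)(parse)
      .compile
      .toList

  val run: IO[Unit] = lengths.flatMap(ls => IO.println(ls.length))
}
```

Note that the arguments to parEvalMap bound the number of concurrently running effects per stage rather than the number of OS threads; if output order does not matter, parEvalMapUnordered is the unordered variant.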
I'll try to apply your suggestions and see how it performs.
Thank you very much!
from fs2-blobstore.