Giter VIP home page Giter VIP logo

spark-search's Introduction

Maven Central CI license LoC codecov

Spark Search brings advanced full text search features to your Dataframe, Dataset and RDD. Powered by Apache Lucene.

Context

Let's assume you have a billion records dataset you want to query on and match against another one using full text search... You do not expect an external datasource or database system than Spark, and of course with the best performances. Spark Search fits your needs: it builds for all parent RDD partitions a one-2-one volatile Lucene index available during the lifecycle of your spark session across your executors local directories and RAM. Strongly typed, Spark Search supports Java and Scala RDD and plans to support Python, Spark SQL and Dataset. Have a look and feel free to contribute!

Getting started

RDD API

  • Scala
import org.apache.spark.search.rdd._ // to implicitly enhance RDD with search features

// Load some Amazon computer user reviews
val computersReviews: RDD[Review] = loadReviews("**/*/reviews_Computers.json.gz") 
    // Number of partition is the number of Lucene index which will be created across your cluster
    .repartition(4)

// Count positive review: indexation + count matched doc with fuzzy matching
computersReviews.count("reviewText:happy OR reviewText:best OR reviewText:good OR reviewText:\"sounds great\"~")

// Search for key words
computersReviews.searchList("reviewText:\"World of Warcraft\" OR reviewText:\"Civilization IV\"",
 topK = 100, minScore = 10)
  .foreach(println)

// /!\ Important lucene indexation is done each time a SearchRDD is computed,
// if you do multiple operations on the same parent RDD, you might have a variable in the driver:
val computersReviewsSearchRDD: SearchRDD[Review] = computersReviewsRDD.searchRDD(
  SearchOptions.builder[Review]() // See all other options SearchOptions, IndexationOptions and ReaderOptions
    .read((r: ReaderOptions.Builder[Review]) => r.defaultFieldName("reviewText"))
    .analyzer(classOf[EnglishAnalyzer])
    .build())

// Boolean queries and boosting examples returning RDD
computersReviewsSearchRDD.search("(RAM OR memory) AND (CPU OR processor~)^4", 15)
        .collect()
        .foreach(println)

// Fuzzy matching
computersReviews.searchList("(reviewerName:Mikey~0.8) OR (reviewerName:Wiliam~0.4) OR (reviewerName:jonh~0.2)",
                                      topKByPartition = 10)
                        .map(doc => s"${doc.source.reviewerName}=${doc.score}")
                        .foreach(println)

// RDD full text joining - example here searches for persons
// who did both computer and software reviews with fuzzy matching on reviewer name
val softwareReviews: RDD[Review] = loadReviews("**/*/reviews_Software_10.json.gz")
val matchesReviewers: RDD[(Review, Array[SearchRecord[Review]])] = computersReviews.matches(
                             softwareReviewsRDD.filter(_.reviewerName != null).map(sr => (sr.asin, sr)),
                            (sr: Review) => "reviewerName:\"" + sr.reviewerName + "\"~0.4",
                             topK = 10)
                            .values
matchesReviewersRDD
  .filter(_._2.nonEmpty)
  .map(m => (s"Reviewer ${m._1.reviewerName} reviews computer ${m._1.asin} but also on software:",
          m._2.map(h => s"${h.source.reviewerName}=${h.score}=${h.source.asin}").toList))
  .collect()
  .foreach(println)

// Drop duplicates
println("Dropping duplicated reviewers:")
val distinctReviewers: RDD[String] = computersReviews.searchDropDuplicates[Int, Review](
 queryBuilder = queryStringBuilder(sr => "reviewerName:\"" + sr.reviewerName.replace('"', ' ') + "\"~0.4")
).map(sr => sr.reviewerName)
distinctReviewers.collect().foreach(println)

// Save then restore onto hdfs
matchesReviewersRDD.save("/tmp/hdfs-pathname")
val restoredSearchRDD: SearchRDD[Review] = SearchRDD.load[Review](sc, "/tmp/hdfs-pathname")

// Restored index can be used as classical rdd
val topReviewer = restoredSearchRDD.map(r => (r.reviewerID, 1))
        .reduceByKey(_ + _)
        .sortBy(_._2, ascending = false)
        .take(1).head
println(s"${topReviewer._1} has submitted ${topReviewer._2} reviews")

See Examples and Documentation for more details.

  • Java
import org.apache.spark.search.rdd.*;

class SearchRDDJava {
 public void examples() {
  System.err.println("Loading reviews...");
  JavaRDD<Review> reviewsRDD = loadReviewRDD(spark, "http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Computers.json.gz");

  // Create the SearchRDD based on the JavaRDD to enjoy search features
  SearchRDDJava<Review> computerReviews = SearchRDDJava.of(reviewsRDD, Review.class);

  // Count matching docs
  System.err.println("Computer reviews with good recommendations: "
          + computerReviews.count("reviewText:good AND reviewText:quality"));
  
  // List matching docs
  System.err.println("Reviews with good recommendations and fuzzy: ");
  SearchRecordJava<Review>[] goodReviews = computerReviews
          .searchList("reviewText:recommend~0.8", 100, 0);
  Arrays.stream(goodReviews).forEach(r -> System.err.println(r));

  // Pass custom search options
  computerReviews = SearchRDDJava.<Review>builder()
          .rdd(reviewsRDD)
          .runtimeClass(Review.class)
          .options(SearchOptions.<Review>builder().analyzer(ShingleAnalyzerWrapper.class).build())
          .build();

  System.err.println("Top 100 reviews from Patosh with fuzzy with 0.5 minimum score:");
  computerReviews.search("reviewerName:Patrik~0.5", 100, 0.5)
          .map(SearchRecordJava::getSource)
          .map(Review::getReviewerName)
          .distinct()
          .collect()
          .foreach(r -> System.err.println(r));

  System.err.println("Loading software reviews...");
  JavaRDD<Review> softwareReviews = loadReviewRDD(spark, "http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Software_10.json.gz");

  System.err.println("Top 10 reviews from same reviewer between computer and software:");
  computerReviews.matches(softwareReviews.filter(r -> r.reviewerName != null && !r.reviewerName.isEmpty())
                          .mapToPair(sr -> new Tuple2<String, Review>(sr.asin, sr)),
                  r -> String.format("reviewerName:\"%s\"~0.4", r.reviewerName.replaceAll("[\"]", " ")), 10, 0)
          .values()
          .filter(matches -> matches._2.length > 0)
          .map(sameReviewerMatches -> String.format("Reviewer:%s reviews computer %s and software %s (score on names matching are %s)",
                  sameReviewerMatches._1.reviewerName,
                  sameReviewerMatches._1.asin,
                  Arrays.stream(sameReviewerMatches._2).map(h -> h.source.asin).collect(toList()),
                  Arrays.stream(sameReviewerMatches._2).map(h -> h.source.reviewerName + ":" + h.score).collect(toList())
          ))
          .collect()
          .foreach(matches -> System.err.println(matches));

  // Save and search reload example
  SearchRDDJava.of(softwareReviews.repartition(8), Review.class)
          .save("/tmp/hdfs-pathname");
  SearchRDDJava<Review> restoredSearchRDD = SearchRDDJava
          .load(sc, "/tmp/hdfs-pathname", Review.class);
  System.err.println("Software reviews with good recommendations: "
          + restoredSearchRDD.count("reviewText:good AND reviewText:quality"));
 }
}

See Examples and Documentationfor more details.

  • Python (In progress)
from pyspark import SparkContext
import pysparksearch

data = [{"firstName": "Geoorge", "lastName": "Michael"},
         {"firstName": "Bob", "lastName": "Marley"},
         {"firstName": "Agnès", "lastName": "Bartoll"}]

sc = SparkContext()

sc.parallelize(data).count("firstName:agnes~")

Dataset/DataFrame API (In progress)

  • Scala
import org.apache.spark.search.sql._

val sentences = spark.read.csv("...")
sentences.count("sentence:happy OR sentence:best OR sentence:good")

// coming soon: SearchSparkStrategy/LogicPlan & column enhanced with search
sentences.where($"sentence".matches($"searchKeyword" ))

Benchmark

All benchmarks run under AWS EMR with 3 Spark workers EC2 m5.xlarge and/or 3 r5.large.elasticsearch data nodes for AWS Elasticsearch. The general use cases is to match company names against two data sets (7M vs 600K rows)

Feature SearchRDD Elasticsearch Hadoop LuceneRDD Spark regex matches (no score)
Index + Count matches 51s 486s (*) 400s 12s
Index + Entity matching 128s 719s (*) 597s NA (>1h)

DISCLAIMER Benchmarks methodology or related results may improve, feel free to submit a pull request.

(*) Results of elasticsearch hadoop benchmark must be carefully reviewed, contribution welcomed

Release notes

v0.2.0
  • SearchRDD#searchJoin renamed to SearchRDD#matches as it does automatically the reduction in addition of a simple join.
  • Fix matches was using only one core & improve join and dropDuplicate performances drastically
  • Scala 2.12 by default
v0.1.9
  • Fix deployment descriptor for scala 2.11
v0.1.8
  • SearchRDD is now iterable as a classical RDD, reloaded RDD can now be used as any other RDD
  • Upgrade support matrix from spark-2.4.8 & hadoop-2.10.1 to spark-3.1.2 & hadoop-3.3.1, built by default for scala 2.12
v0.1.7
  • Enable caching of search index rdd only for yarn cluster, and as an option.
  • Remove scala binary version in parent module artifact name
  • Expose SearchRDD as a public API to ease Dataset binding and hdfs reloading
  • Fix and enhance Search Java RDD API
  • Fix string query builder does not support analyzer
v0.1.6
  • Switch to multi modules build: core, sql, examples, benchmark
  • Improve the github build with running examples against a spark cluster in docker
  • Improve licence header checking
  • RDD lineage works the same on all DAG Scheduler (Yarn/Standalone): SearchIndexRDD computes zipped index per partition for the next rdd
  • CI tests examples under Yarn and Standalone cluster mode
  • Fix default field where not used under certain circumstances
v0.1.5
  • Fix SearchRDD#searchDropDuplicate method
  • Save/Restore search RDD to/from HDF
  • Yarn support and tested over AWS EMR
  • Adding and running benchmark examples with alternatives libraries on AWS EMR
  • Support of spark 3.0.0
v0.1.4
  • Optimize searchJoin for small num partition
v0.1.3
  • Fix searchJoin on multiple partitions
v0.1.2
  • Released to maven central
v0.1.1
  • First stable version of the Scala Spark Search RDD
  • Support of SearchRDD#searchJoin(RDD, S => String) - join 2 RDD by matching queries
  • Support of SearchRDD#dropDuplicates(S => String) - deduplicate an RDD based on matching query
v0.1.0
  • Support of SearchRDD#count(String) - count matching hits
  • Support of SearchRDD#searchList(String) - search matching records as list
  • Support of SearchRDD#search(String) - search matching records as RDD

Installation Spark Search

  • Maven
<dependency>
  <groupId>io.github.phymbert</groupId>
  <artifactId>spark-search_2.12</artifactId>
  <version>${spark.search.version}</version>
</dependency>
  • Gradle
implementation 'io.github.phymbert:spark-search_2.12:$sparkSearchVersion'

Building Spark Search

git clone https://github.com/phymbert/spark-search.git
cd spark-search
mvn clean verify

Known alternatives

spark-search's People

Contributors

dependabot[bot] avatar phymbert avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar

spark-search's Issues

Indexation fails with NoSuchFileException while indexing huge data

For small amounts of data not getting this issue. But if we try to process huge data, then we get the following exception. Any guidance here could be great help.

22/09/16 08:17:34 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 32.0 in stage 47.0 (TID 1449) (offerexposure-cluster-naidu-1-w-10.c.wmt-mtech-offerexposure-stg.internal executor 1): org.apache.spark.search.SearchException: indexation failed on partition 1 and directory /tmp/spark-search/application_1663309524066_0008-sparksearch-rdd149-index-1
	at org.apache.spark.search.rdd.SearchPartitionIndex.monitorIndexation(SearchPartitionIndex.java:145)
	at org.apache.spark.search.rdd.SearchPartitionIndex.index(SearchPartitionIndex.java:82)
	at org.apache.spark.search.rdd.SearchRDDIndexer.compute(SearchRDDIndexer.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$6(BlockManager.scala:1461)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$6$adapted(BlockManager.scala:1459)
	at org.apache.spark.storage.DiskStore.put(DiskStore.scala:70)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1459)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
	at org.apache.spark.search.rdd.SearchRDDCartesian.compute(SearchRDDCartesian.scala:54)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.nio.file.NoSuchFileException: /tmp/spark-search/application_1663309524066_0008-sparksearch-rdd149-index-1/pending_segments_1
	at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
	at sun.nio.fs.UnixFileSystemProvider.implDelete(UnixFileSystemProvider.java:244)
	at sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)
	at java.nio.file.Files.delete(Files.java:1126)
	at org.apache.lucene.store.FSDirectory.privateDeleteFile(FSDirectory.java:370)
	at org.apache.lucene.store.FSDirectory.deleteFile(FSDirectory.java:339)
	at org.apache.lucene.store.LockValidatingDirectoryWrapper.deleteFile(LockValidatingDirectoryWrapper.java:38)
	at org.apache.lucene.index.IndexFileDeleter.deleteFile(IndexFileDeleter.java:705)
	at org.apache.lucene.index.IndexFileDeleter.deleteFiles(IndexFileDeleter.java:699)
	at org.apache.lucene.index.IndexFileDeleter.<init>(IndexFileDeleter.java:238)
	at org.apache.lucene.index.IndexWriter.<init>(IndexWriter.java:1089)
	at org.apache.spark.search.rdd.SearchPartitionIndex.lambda$index$1(SearchPartitionIndex.java:90)
	at org.apache.spark.search.rdd.SearchPartitionIndex.monitorIndexation(SearchPartitionIndex.java:128)
	... 29 more

Thanks,
Naidu

Examples are not properly working on CI

See action #130

Same for java & scala examples:

2021-08-20T22:36:18.0615405Z Some typo in names:
2021-08-20T22:36:24.0708492Z Downloading software reviews...
2021-08-20T22:36:27.3572928Z Joined software and computer reviews by reviewer names:
2021-08-20T22:36:36.7498019Z Dropping duplicated reviewers:
2021-08-20T22:37:23.4888234Z Restoring from previous indexation:
2021-08-20T22:37:30.1019131Z 8103 positive reviews after restoration

Document how to deploy to maven central

Link to:
https://oss.sonatype.org/
https://central.sonatype.org/publish/publish-maven/
https://central.sonatype.org/publish/publish-guide/#releasing-to-central
https://oss.sonatype.org/#view-repositories;releases~browsestorage~io/github/phymbert
https://issues.sonatype.org/browse/OSSRH-58231

Fix sources & javadoc should be done during package lifecycle and during deploy and maven:release (need to be added to parent & modules).

At the moment:

mvn clean source:jar javadoc:jar verify nexus-staging:deploy nexus-staging:deploy-staged nexus-staging:release -DskipTests -P deploy,scala-2.11

To deploy not the parent and other scala version

clean source:jar javadoc:jar verify nexus-staging:deploy nexus-staging:deploy-staged nexus-staging:release -DskipTests -pl core,sql  -P deploy,scala-2.12

Upon release, your component will be published to Central: this typically occurs within 30 minutes, though updates to search can take up to four hours.

SearchJavaRDD error: had a not serializable result: java.lang.Object

When running SearchRDDJavaExamples:

Serialization stack:
	- object not serializable (class: java.lang.Object, value: java.lang.Object@6528d1e1)
	- field (class: org.apache.spark.search.package$SearchRecord, name: source, type: class java.lang.Object)
	- object (class org.apache.spark.search.package$SearchRecord, SearchRecord(8399,1,1.7722454071044922,0,java.lang.Object@6528d1e1))
	- element of array (index: 0)
	- array (class [Lorg.apache.spark.search.package$SearchRecord;, size 100)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)

On Yarn partition are indexed multiple time

If SearchIndexedRDD is not cached on yarn, child RDD is not run on the same workers, triggering new indexation.
The opposite, if it is cached, MatchRDD partition is not located on the same executor and the index directory is not found.

Standalone Spark when no cache:

20/06/28 21:41:31 WARN TaskSetManager: Lost task 0.0 in stage 14.0 (TID 46, 172.21.0.6, executor 2): org.apache.lucene.index.IndexNotFoundException: no segments* file found in MMapDirectory@/tmp/spark-search-rdd9-index-0 lockFactory=org.apache.lucene.store.NoLockFactory@253f0c73: files: []
        at org.apache.lucene.index.SegmentInfos$FindSegmentsFile.run(SegmentInfos.java:675)
        at org.apache.lucene.index.StandardDirectoryReader.open(StandardDirectoryReader.java:84)
        at org.apache.lucene.index.DirectoryReader.open(DirectoryReader.java:76)
        at org.apache.lucene.index.DirectoryReader.open(DirectoryReader.java:64)
        at org.apache.spark.search.rdd.SearchPartitionReader.<init>(SearchPartitionReader.java:67)
        at org.apache.spark.search.rdd.SearchRDD.reader(SearchRDD.scala:206)
        at org.apache.spark.search.rdd.MatchRDD.compute(MatchRDD.scala:55)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)

IndexNotFoundException

hello - im attempting to use this library v0.2 in a yarn, with my driver running on the cluster

I am encountering the following exception -

Caused by: org.apache.lucene.index.IndexNotFoundException: no segments* file found in MMapDirectory@/local/hadoop/disksdl/yarn/nodemanager/usercache/spotci/appcache/application_1617967855014_1171701/container_e136_1617967855014_1171701_02_000001/tmp/spark-search/application_1617967855014_1171701-sparksearch-rdd0-index-3 lockFactory=org.apache.lucene.store.NoLockFactory@4a1941a4: files: [] at org.apache.lucene.index.SegmentInfos$FindSegmentsFile.run(SegmentInfos.java:715) at org.apache.lucene.index.StandardDirectoryReader.open(StandardDirectoryReader.java:84) at org.apache.lucene.index.DirectoryReader.open(DirectoryReader.java:64)

Im wondering if there was any info on where to start looking for why it would be empty?

thanks

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.