apache / incubator-wayang

Apache Wayang(incubating) is the first cross-platform data processing system.

Home Page: https://wayang.incubator.apache.org/

License: Apache License 2.0

Shell 0.55% Java 81.73% ANTLR 0.03% Scala 10.56% Groovy 0.12% Ruby 0.02% HTML 0.28% SCSS 0.02% CSS 0.17% JavaScript 0.04% Python 6.48%
cross-platform data-processing apache big-data data-management-platform distributed-system hadoop java jdbc middleware

incubator-wayang's Introduction

Apache Wayang (incubating) Wayang logo

The first open-source cross-platform data processing system

Description

In contrast to traditional data processing systems that provide one dedicated execution engine, Apache Wayang (incubating) can transparently and seamlessly integrate multiple execution engines and use them to perform a single task. We call this cross-platform data processing. In Wayang, users can specify any data processing application using one of Wayang's APIs and then Wayang will choose the data processing platform(s), e.g., Postgres or Apache Spark, that best fits the application. Finally, Wayang will perform the execution, thereby hiding the different platform-specific APIs and coordinating inter-platform communication.
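
To make the idea of "choosing the platform that best fits" concrete, here is a toy sketch of a cost-based choice between two execution platforms. It is not Wayang's optimizer: the class names, platform names, and cost numbers are all hypothetical and exist only for illustration.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Toy illustration only: NOT Wayang's optimizer. All names and numbers are made up.
class PlatformChoiceSketch {

    /** A hypothetical cost model: fixed startup cost plus a cost per processed record. */
    static final class Platform {
        final String name;
        final double startupCost;
        final double costPerRecord;

        Platform(String name, double startupCost, double costPerRecord) {
            this.name = name;
            this.startupCost = startupCost;
            this.costPerRecord = costPerRecord;
        }

        double estimatedCost(long numRecords) {
            return startupCost + costPerRecord * numRecords;
        }
    }

    /** Two hypothetical platforms: cheap to start but slow per record, and the opposite. */
    static List<Platform> examplePlatforms() {
        return Arrays.asList(
                new Platform("single-node", 1.0, 0.01),
                new Platform("cluster", 1000.0, 0.001));
    }

    /** Picks the platform with the lowest estimated cost for the given input size. */
    static Platform cheapest(List<Platform> platforms, long numRecords) {
        return platforms.stream()
                .min(Comparator.comparingDouble((Platform p) -> p.estimatedCost(numRecords)))
                .orElseThrow(() -> new IllegalArgumentException("No platforms given"));
    }

    public static void main(String[] args) {
        List<Platform> platforms = examplePlatforms();
        System.out.println(cheapest(platforms, 100L).name);
        System.out.println(cheapest(platforms, 10_000_000L).name);
    }
}
```

With these made-up numbers, small inputs favor the platform with the low startup cost, while large inputs favor the one with the low per-record cost.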

Apache Wayang (incubating) aims to free data engineers and software developers from the burden of learning all the different data processing systems, their APIs, strengths, and weaknesses; from the intricacies of coordinating and integrating different processing platforms; and from the inflexibility of being tied to a fixed set of processing platforms. As of now, Wayang has built-in support for the following processing platforms:

Apache Wayang (incubating) can be used via the following APIs:

  • Java native
  • Java scala-like
  • Scala
  • SQL (limited support of simple select-project queries for now)

Quick Guide for Running Wayang

For a quick guide on how to run WordCount see here.

Quick Guide for Developing with Wayang

For a quick guide on how to use Wayang in your Java/Scala project see here.

Installing Wayang

You first have to build the binaries as shown here. Once you have the binaries built, follow these steps to install Wayang:

tar -xvf wayang-0.7.1-snapshot.tar.gz
cd wayang-0.7.1-SNAPSHOT

On Linux

echo "export WAYANG_HOME=$(pwd)" >> ~/.bashrc
echo 'export PATH=${PATH}:${WAYANG_HOME}/bin' >> ~/.bashrc
source ~/.bashrc

On macOS

echo "export WAYANG_HOME=$(pwd)" >> ~/.zshrc
echo 'export PATH=${PATH}:${WAYANG_HOME}/bin' >> ~/.zshrc
source ~/.zshrc

Requirements at Runtime

Since Apache Wayang (incubating) is not an execution engine itself but rather manages the execution engines for you, it is important to have the necessary requirements installed.

  • Apache Wayang supports Java versions 8 and above. However, the Wayang team recommends using Java version 11. Don’t forget to set the JAVA_HOME environment variable.
  • You need to install Apache Spark version 3 or higher. Don’t forget to set the SPARK_HOME environment variable.
  • You need to install Apache Hadoop version 3 or higher. Don’t forget to set the HADOOP_HOME environment variable.
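
The three environment variables named above can be checked before submitting a job. The following is a minimal sketch in plain Java; the class `RuntimeEnvCheck` is a hypothetical helper written for this example and is not part of Wayang.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Wayang: reports which required variables are unset.
class RuntimeEnvCheck {

    // The variables named in the runtime requirements above.
    static final List<String> REQUIRED = Arrays.asList("JAVA_HOME", "SPARK_HOME", "HADOOP_HOME");

    /** Returns the required variables that are unset or blank in the given environment. */
    static List<String> missing(Map<String, String> env) {
        List<String> result = new ArrayList<>();
        for (String name : REQUIRED) {
            String value = env.get(name);
            if (value == null || value.trim().isEmpty()) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> missing = missing(System.getenv());
        if (missing.isEmpty()) {
            System.out.println("All required environment variables are set.");
        } else {
            System.out.println("Missing environment variables: " + missing);
        }
    }
}
```

The check only tests that the variables are non-empty; it does not verify that they point to a working installation of the required version.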

Validating the installation

To run your first application with Apache Wayang, launch it with the 'wayang-submit' command:

bin/wayang-submit org.apache.wayang.apps.wordcount.Main java file://$(pwd)/README.md

Getting Started

Wayang is available via Maven Central. To use it with Maven, include the following snippet in your POM file:

<dependency>
  <groupId>org.apache.wayang</groupId>
  <artifactId>wayang-***</artifactId>
  <version>0.7.1</version>
</dependency>

Note the ***: Wayang ships with multiple modules that can be included in your app, depending on how you want to use it:

  • wayang-core: provides core data structures and the optimizer (required)
  • wayang-basic: provides common operators and data types for your apps (recommended)
  • wayang-api-scala-java: provides an easy-to-use Scala and Java API to assemble Wayang plans (recommended)
  • wayang-java, wayang-spark, wayang-graphchi, wayang-sqlite3, wayang-postgres: adapters for the various supported processing platforms
  • wayang-profiler: provides functionality to learn operator and UDF cost functions from historical execution data

NOTE: The module wayang-api-scala-java is intended to be used with Java 11 and Scala 2.12.

For the sake of version flexibility, you still have to declare your Hadoop (hadoop-hdfs and hadoop-common) and Spark (spark-core and spark-graphx) versions of choice in the POM file.

In addition, you can obtain the most recent snapshot version of Wayang via the Apache snapshot repository. Just include:

<repositories>
  <repository>
    <id>apache-snapshots</id>
    <name>Apache Foundation Snapshot Repository</name>
    <url>https://repository.apache.org/content/repositories/snapshots</url>
  </repository>
</repositories>

Prerequisites

Apache Wayang (incubating) is built with Java 11 and Scala 2.12. However, to run Apache Wayang it is sufficient to have just Java 11 installed. Please also consider that processing platforms employed by Wayang might have further requirements.

  • Java 11
  • Scala 2.12

NOTE: On Windows, you need to set the HADOOP_HOME variable to a directory that contains winutils.exe. An unofficial way to obtain it is this repository; alternatively, you can build your own winutils.exe by following the instructions in the repository. You may also need to install msvcr100.dll.

NOTE: Make sure that the JAVA_HOME environment variable points to Java 11. The prerequisite checker script currently supports versions up to Java 11, and it checks the latest Java version if you have a higher version installed. On Linux, it is preferable to export JAVA_HOME from inside the project folder. It is also recommended to run './mvnw clean install' before opening the project in IntelliJ.

Building

If you need to rebuild Wayang, e.g., to use a different Scala version, you can simply do so via Maven:

  1. Adapt the version variables (e.g., spark.version) in the main pom.xml file.
  2. Build Wayang with the adapted versions.
    git clone https://github.com/apache/incubator-wayang.git
    cd incubator-wayang
    ./mvnw clean install -DskipTests

NOTE: If you receive an error about not finding MathExBaseVisitor, then the problem might be that you are trying to build from IntelliJ, without Maven. MathExBaseVisitor is generated code, and a Maven build should generate it automatically.

NOTE: In the current Maven setup, the Scala version is tied to the Java version: you can compile the profile scala-11 with Java 8 and the profile scala-12 with Java 11.

NOTE: Compiling and testing the code requires Hadoop to be installed on your machine.

NOTE: The standalone profile fixes the Hadoop and Spark versions, so that Wayang apps do not need to declare the corresponding dependencies explicitly.

Also, note the distro profile, which assembles a binary Wayang distribution. To activate these profiles, specify them when running Maven, i.e.,

./mvnw clean install -DskipTests -P<profile name>

Running the tests

In the incubator-wayang root folder run:

./mvnw test

Example Applications

You can see examples on how to start using Wayang here

Contributing

Before submitting a PR, please read the Apache Wayang contributing guidelines here.

There is also a guide on how to compile your code here.

Authors

The list of contributors.

License

All files in this repository are licensed under the Apache Software License 2.0

Copyright 2020 - 2023 The Apache Software Foundation.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Acknowledgements

The Logo was donated by Brian Vera.

incubator-wayang's People

Contributors

2pk03, adeelaslamunimore, adityagoel11, atroudi, berttty, bgeng777, calvinkirs, chrisdutz, damik3, dependabot[bot], ggevay, glauesppen, hboutemy, ichbinrich, joker-star-l, jonasrk, jorgequiane, juripetersen, kamir, kbeedkar, khayyatzy, kmoltke, luckyasser, michellesackmann, msanyoto, ro-pardo, sekruse, todoroki11, yuhaoran1214, zkaoudi

incubator-wayang's Issues

validate if the implementation apply for the case

final Optional<FileSystem> fileSystem = FileSystems.getFileSystem(ObjectFileSource.this.inputUrl);
if (fileSystem.isPresent()) {
    // Construct a limited reader for the first 1 MiB of the file.
    final int KiB = 1024;
    final int MiB = 1024 * KiB;
    try (LimitedInputStream lis = new LimitedInputStream(fileSystem.get().open(
            ObjectFileSource.this.inputUrl), 1 * MiB)) {
        final BufferedReader bufferedReader = new BufferedReader(
                new InputStreamReader(lis, ObjectFileSource.this.encoding)
        );

        // Read as much as possible and count the line feeds.
        char[] cbuf = new char[1024];
        int numReadChars, numLineFeeds = 0;
        while ((numReadChars = bufferedReader.read(cbuf)) != -1) {
            for (int i = 0; i < numReadChars; i++) {
                if (cbuf[i] == '\n') {
                    numLineFeeds++;
                }
            }
        }

        if (numLineFeeds == 0) {
            ObjectFileSource.this.logger.warn("Could not find any newline character in {}.", ObjectFileSource.this.inputUrl);
            return OptionalDouble.empty();
        }
        return OptionalDouble.of((double) lis.getNumReadBytes() / numLineFeeds);
    } catch (IOException e) {
        ObjectFileSource.this.logger.error("Could not estimate bytes per line of an input file.", e);
    }
}
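
The sampling idea in the snippet above (read a bounded prefix of the file, count line feeds, divide bytes by lines) can be tried without any Wayang classes. The sketch below is a stand-alone approximation: `BytesPerLineSketch` is a hypothetical class, it reads raw bytes instead of using `LimitedInputStream`, and it assumes an ASCII-compatible encoding such as UTF-8, where the byte 0x0A occurs only as a line feed.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.OptionalDouble;

// Stand-alone approximation of the sampling logic; not the Wayang implementation.
class BytesPerLineSketch {

    /**
     * Reads at most maxBytes from the stream and returns (bytes read) / (line feeds seen).
     * Returns an empty result if no line feed was found or if reading failed.
     */
    static OptionalDouble estimateBytesPerLine(InputStream in, int maxBytes) {
        byte[] buffer = new byte[1024];
        long numReadBytes = 0;
        long numLineFeeds = 0;
        try {
            while (numReadBytes < maxBytes) {
                int toRead = (int) Math.min(buffer.length, maxBytes - numReadBytes);
                int n = in.read(buffer, 0, toRead);
                if (n == -1) {
                    break; // end of stream
                }
                for (int i = 0; i < n; i++) {
                    if (buffer[i] == '\n') {
                        numLineFeeds++;
                    }
                }
                numReadBytes += n;
            }
        } catch (IOException e) {
            return OptionalDouble.empty();
        }
        if (numLineFeeds == 0) {
            return OptionalDouble.empty();
        }
        return OptionalDouble.of((double) numReadBytes / numLineFeeds);
    }

    public static void main(String[] args) {
        byte[] data = "ab\ncd\nef\n".getBytes(StandardCharsets.UTF_8);
        System.out.println(estimateBytesPerLine(new ByteArrayInputStream(data), 1024 * 1024));
    }
}
```

Whether a bytes-per-line estimate is meaningful for an object file at all is exactly what the TODO asks; the sketch only shows what the disabled code would compute.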

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.basic.operators;

import java.util.Optional;
import java.util.OptionalDouble;
import java.util.OptionalLong;
import org.apache.commons.lang3.Validate;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.wayang.commons.util.profiledb.model.measurement.TimeMeasurement;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate;
import org.apache.wayang.core.plan.wayangplan.UnarySource;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.core.util.fs.FileSystems;

/**
 * This source reads a text file and outputs the lines as data units.
 */
public class ObjectFileSource<T> extends UnarySource<T> {

    private final Logger logger = LogManager.getLogger(this.getClass());

    private final String inputUrl;

    private final Class<T> tClass;

    public ObjectFileSource(String inputUrl, DataSetType<T> type) {
        super(type);
        this.inputUrl = inputUrl;
        this.tClass = type.getDataUnitType().getTypeClass();
    }

    public ObjectFileSource(String inputUrl, Class<T> tClass) {
        super(DataSetType.createDefault(tClass));
        this.inputUrl = inputUrl;
        this.tClass = tClass;
    }

    /**
     * Copies an instance (exclusive of broadcasts).
     *
     * @param that that should be copied
     */
    public ObjectFileSource(ObjectFileSource that) {
        super(that);
        this.inputUrl = that.getInputUrl();
        this.tClass = that.getTypeClass();
    }

    public String getInputUrl() {
        return this.inputUrl;
    }

    public Class<T> getTypeClass(){
        return this.tClass;
    }

    @Override
    public Optional<org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator> createCardinalityEstimator(
            final int outputIndex,
            final Configuration configuration) {
        Validate.inclusiveBetween(0, this.getNumOutputs() - 1, outputIndex);
        return Optional.of(new ObjectFileSource.CardinalityEstimator());
    }


    /**
     * Custom {@link org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator} for {@link ObjectFileSource}s.
     */
    protected class CardinalityEstimator implements org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator {

        public final CardinalityEstimate FALLBACK_ESTIMATE = new CardinalityEstimate(1000L, 100000000L, 0.7);

        public static final double CORRECTNESS_PROBABILITY = 0.95d;

        /**
         * We expect selectivities to be correct within a factor of {@value #EXPECTED_ESTIMATE_DEVIATION}.
         */
        public static final double EXPECTED_ESTIMATE_DEVIATION = 0.05;

        @Override
        public CardinalityEstimate estimate(OptimizationContext optimizationContext, CardinalityEstimate... inputEstimates) {
            //TODO validate if the implementation apply for the case
            Validate.isTrue(ObjectFileSource.this.getNumInputs() == inputEstimates.length);

            // see Job for StopWatch measurements
            final TimeMeasurement timeMeasurement = optimizationContext.getJob().getStopWatch().start(
                    "Optimization", "Cardinality&Load Estimation", "Push Estimation", "Estimate source cardinalities"
            );

            // Query the job cache first to see if there is already an estimate.
            String jobCacheKey = String.format("%s.estimate(%s)", this.getClass().getCanonicalName(), ObjectFileSource.this.inputUrl);
            CardinalityEstimate cardinalityEstimate = optimizationContext.queryJobCache(jobCacheKey, CardinalityEstimate.class);
            if (cardinalityEstimate != null) return  cardinalityEstimate;

            // Otherwise calculate the cardinality.
            // First, inspect the size of the file and its line sizes.
            OptionalLong fileSize = FileSystems.getFileSize(ObjectFileSource.this.inputUrl);
            if (!fileSize.isPresent()) {
                ObjectFileSource.this.logger.warn("Could not determine size of {}... deliver fallback estimate.",
                        ObjectFileSource.this.inputUrl);
                timeMeasurement.stop();
                return this.FALLBACK_ESTIMATE;

            } else if (fileSize.getAsLong() == 0L) {
                timeMeasurement.stop();
                return new CardinalityEstimate(0L, 0L, 1d);
            }

            OptionalDouble bytesPerLine = this.estimateBytesPerLine();
            if (!bytesPerLine.isPresent()) {
                ObjectFileSource.this.logger.warn("Could not determine average line size of {}... deliver fallback estimate.",
                        ObjectFileSource.this.inputUrl);
                timeMeasurement.stop();
                return this.FALLBACK_ESTIMATE;
            }

            // Extrapolate a cardinality estimate for the complete file.
            double numEstimatedLines = fileSize.getAsLong() / bytesPerLine.getAsDouble();
            double expectedDeviation = numEstimatedLines * EXPECTED_ESTIMATE_DEVIATION;
            cardinalityEstimate = new CardinalityEstimate(
                    (long) (numEstimatedLines - expectedDeviation),
                    (long) (numEstimatedLines + expectedDeviation),
                    CORRECTNESS_PROBABILITY
            );

            // Cache the result, so that it will not be recalculated again.
            optimizationContext.putIntoJobCache(jobCacheKey, cardinalityEstimate);

            timeMeasurement.stop();
            return cardinalityEstimate;
        }

        /**
         * Estimate the number of bytes that are in each line of a given file.
         *
         * @return the average number of bytes per line if it could be determined
         */
        private OptionalDouble estimateBytesPerLine() {
            //TODO validate if the implementation apply for the case
//            final Optional<FileSystem> fileSystem = FileSystems.getFileSystem(ObjectFileSource.this.inputUrl);
//            if (fileSystem.isPresent()) {
//
//                // Construct a limited reader for the first x KiB of the file.
//                final int KiB = 1024;
//                final int MiB = 1024 * KiB;
//                try (LimitedInputStream lis = new LimitedInputStream(fileSystem.get().open(
//                    ObjectFileSource.this.inputUrl), 1 * MiB)) {
//                    final BufferedReader bufferedReader = new BufferedReader(
//                            new InputStreamReader(lis, ObjectFileSource.this.encoding)
//                    );
//
//                    // Read as much as possible.
//                    char[] cbuf = new char[1024];
//                    int numReadChars, numLineFeeds = 0;
//                    while ((numReadChars = bufferedReader.read(cbuf)) != -1) {
//                        for (int i = 0; i < numReadChars; i++) {
//                            if (cbuf[i] == '\n') {
//                                numLineFeeds++;
//                            }
//                        }
//                    }
//
//                    if (numLineFeeds == 0) {
//                        ObjectFileSource.this.logger.warn("Could not find any newline character in {}.", ObjectFileSource.this.inputUrl);
//                        return OptionalDouble.empty();
//                    }
//                    return OptionalDouble.of((double) lis.getNumReadBytes() / numLineFeeds);
//                } catch (IOException e) {
//                    ObjectFileSource.this.logger.error("Could not estimate bytes per line of an input file.", e);
//                }
//            }

            return OptionalDouble.empty();
        }
    }

}
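
The extrapolation step in estimate() above is plain arithmetic: divide the file size by the average line size, then widen the result by the expected deviation on both sides. A minimal stand-alone sketch follows; `CardinalityBoundsSketch` is a hypothetical class, and only the 0.05 deviation constant is taken from the source.

```java
// Stand-alone sketch of the extrapolation arithmetic; not the Wayang implementation.
class CardinalityBoundsSketch {

    // Same value as EXPECTED_ESTIMATE_DEVIATION in the class above.
    static final double EXPECTED_ESTIMATE_DEVIATION = 0.05;

    /** Returns {lowerBound, upperBound} for the estimated number of lines in a file. */
    static long[] estimateLineBounds(long fileSizeBytes, double bytesPerLine) {
        double numEstimatedLines = fileSizeBytes / bytesPerLine;
        double expectedDeviation = numEstimatedLines * EXPECTED_ESTIMATE_DEVIATION;
        return new long[] {
                (long) (numEstimatedLines - expectedDeviation),
                (long) (numEstimatedLines + expectedDeviation)
        };
    }

    public static void main(String[] args) {
        // A 1,000,000-byte file with 50 bytes per line has about 20,000 lines, give or take 5%.
        long[] bounds = estimateLineBounds(1_000_000L, 50.0);
        System.out.println(bounds[0] + " .. " + bounds[1]);
    }
}
```

Because estimateBytesPerLine() in the source currently always returns an empty value, the real class never reaches this step and always delivers the fallback estimate.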

b203e321870282a9e7eb01c6d42418d849753199

should NOT be assigned an specific port, set port as 0 (zero)

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.api.python.executor;

import com.google.protobuf.ByteString;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.exception.WayangException;

import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

public class PythonProcessCaller {

    private Process process;
    private Socket socket;
    private ServerSocket serverSocket;
    private boolean ready;

    //TODO How to get the config
    private Configuration configuration;

    public PythonProcessCaller(ByteString serializedUDF){

        //TODO create documentation to how to the configuration in the code
        this.configuration = new Configuration("file:///Users/rodrigopardomeza/wayang/incubator-wayang/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties");
        this.ready = false;
        byte[] addr = new byte[4];
        addr[0] = 127; addr[1] = 0; addr[2] = 0; addr[3] = 1;

        try {
            /*TODO should NOT be assigned an specific port, set port as 0 (zero)*/
            this.serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(addr));
            ProcessBuilder pb = new ProcessBuilder(
                    Arrays.asList(
                            "python3",
                            this.configuration.getStringProperty("wayang.api.python.worker")
                    )
            );
            Map<String, String> workerEnv = pb.environment();
            workerEnv.put("PYTHON_WORKER_FACTORY_PORT", String.valueOf(this.serverSocket.getLocalPort()));

            // TODO See what is happening with ENV Python version
            workerEnv.put("PYTHONPATH", "/Users/rodrigopardomeza/wayang/incubator-wayang/pywayang/:/Users/rodrigopardomeza/opt/anaconda3/");

            pb.redirectOutput(ProcessBuilder.Redirect.INHERIT);
            pb.redirectError(ProcessBuilder.Redirect.INHERIT);
            this.process = pb.start();


            // Redirect worker stdout and stderr
            //IDK redirectStreamsToStderr(worker.getInputStream, worker.getErrorStream)

            // Wait for it to connect to our socket
            this.serverSocket.setSoTimeout(10000);

            try {
                this.socket = this.serverSocket.accept();
                this.serverSocket.setSoTimeout(0);

                if(socket.isConnected())
                    this.ready = true;

            } catch (Exception e) {
                System.out.println(e);
                throw new WayangException("Python worker failed to connect back.", e);
            }
        } catch (Exception e){
            System.out.println(e);
            throw new WayangException("Python worker failed", e);
        }
    }

    public Process getProcess() {
        return process;
    }

    public Socket getSocket() {
        return socket;
    }

    public boolean isReady(){
        return ready;
    }

    public void close(){
        try {
            this.process.destroy();
            this.socket.close();
            this.serverSocket.close();
            System.out.println("Everything closed");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
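
The constructor above already passes 0 as the port, which asks the operating system for any free port; the chosen port is then read back with getLocalPort() and handed to the worker. The sketch below isolates that pattern using only the JDK. `EphemeralPortSketch` is a hypothetical class name.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;

// Stand-alone sketch of binding to an OS-assigned port on the loopback interface.
class EphemeralPortSketch {

    /** Opens a loopback-only server socket on port 0, so the OS picks a free port. */
    static ServerSocket openOnFreePort() throws IOException {
        return new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
    }

    /** Opens and closes such a socket and returns the port the OS assigned, or -1 on failure. */
    static int probeFreePort() {
        try (ServerSocket serverSocket = openOnFreePort()) {
            return serverSocket.getLocalPort();
        } catch (IOException e) {
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println("OS-assigned port: " + probeFreePort());
    }
}
```

InetAddress.getLoopbackAddress() replaces the hand-built 127.0.0.1 byte array; in real use the socket must stay open while the worker connects, as the constructor above does.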

e53ef66982b0ddda9c06e9e07b72b10c30fe48b0

remove the set parallelism 1

        assert inputs.length == this.getNumInputs();
        assert outputs.length <= 1;
        final FileChannel.Instance output;
        final String targetPath;
        if(outputs.length == 1) {
            output = (FileChannel.Instance) outputs[0];
            targetPath = output.addGivenOrTempPath(this.textFileUrl, flinkExecutor.getConfiguration());
        }else{
            targetPath = this.textFileUrl;
        }

        //TODO: remove the set parallelism 1
        DataSetChannel.Instance input = (DataSetChannel.Instance) inputs[0];
        final DataSink<Type> tDataSink = input.<Type>provideDataSet()
                .write(new WayangFileOutputFormat<Type>(targetPath), targetPath, FileSystem.WriteMode.OVERWRITE)

034c188dd90f952249452462d3ed4c3af6231d37

How to get the config

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.api.python.executor;

import com.google.protobuf.ByteString;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.exception.WayangException;

import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

public class PythonProcessCaller {

    private Process process;
    private Socket socket;
    private ServerSocket serverSocket;
    private boolean ready;

    //TODO How to get the config
    private Configuration configuration;

    public PythonProcessCaller(ByteString serializedUDF){

        //TODO create documentation to how to the configuration in the code
        this.configuration = new Configuration("file:///Users/rodrigopardomeza/wayang/incubator-wayang/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties");
        this.ready = false;
        byte[] addr = new byte[4];
        addr[0] = 127; addr[1] = 0; addr[2] = 0; addr[3] = 1;

        try {
            /*TODO should NOT be assigned an specific port, set port as 0 (zero)*/
            this.serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(addr));
            ProcessBuilder pb = new ProcessBuilder(
                    Arrays.asList(
                            "python3",
                            this.configuration.getStringProperty("wayang.api.python.worker")
                    )
            );
            Map<String, String> workerEnv = pb.environment();
            workerEnv.put("PYTHON_WORKER_FACTORY_PORT", String.valueOf(this.serverSocket.getLocalPort()));

            // TODO See what is happening with ENV Python version
            workerEnv.put("PYTHONPATH", "/Users/rodrigopardomeza/wayang/incubator-wayang/pywayang/:/Users/rodrigopardomeza/opt/anaconda3/");

            pb.redirectOutput(ProcessBuilder.Redirect.INHERIT);
            pb.redirectError(ProcessBuilder.Redirect.INHERIT);
            this.process = pb.start();


            // Redirect worker stdout and stderr
            //IDK redirectStreamsToStderr(worker.getInputStream, worker.getErrorStream)

            // Wait for it to connect to our socket
            this.serverSocket.setSoTimeout(10000);

            try {
                this.socket = this.serverSocket.accept();
                this.serverSocket.setSoTimeout(0);

                if(socket.isConnected())
                    this.ready = true;

            } catch (Exception e) {
                System.out.println(e);
                throw new WayangException("Python worker failed to connect back.", e);
            }
        } catch (Exception e){
            System.out.println(e);
            throw new WayangException("Python worker failed", e);
        }
    }

    public Process getProcess() {
        return process;
    }

    public Socket getSocket() {
        return socket;
    }

    public boolean isReady(){
        return ready;
    }

    public void close(){
        try {
            this.process.destroy();
            this.socket.close();
            this.serverSocket.close();
            System.out.println("Everything closed");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
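
One possible direction for this TODO is to stop hard-coding a developer-specific absolute path and instead resolve the configuration location from a setting with a fallback. The sketch below shows only that lookup, in plain Java. The class `ConfigLocationSketch`, the property name `example.wayang.python.config`, and the fallback URL are hypothetical, chosen for this example; they are not existing Wayang settings.

```java
import java.util.Properties;

// Hypothetical sketch: resolve a configuration URL from a property, with a fallback.
class ConfigLocationSketch {

    // Made-up property name, used for this example only.
    static final String PROPERTY = "example.wayang.python.config";

    /** Returns the configured URL if the property is set and non-blank, else the fallback. */
    static String resolveConfigUrl(Properties properties, String fallbackUrl) {
        String value = properties.getProperty(PROPERTY);
        if (value == null || value.trim().isEmpty()) {
            return fallbackUrl;
        }
        return value.trim();
    }

    public static void main(String[] args) {
        // Example: java -Dexample.wayang.python.config=file:///etc/app.properties ConfigLocationSketch
        String url = resolveConfigUrl(System.getProperties(), "file:///path/to/defaults.properties");
        System.out.println("Using configuration at " + url);
    }
}
```

The resolved URL could then be handed to the existing `new Configuration(String)` call shown above. Another option would be for the caller to pass in a ready-made Configuration through the constructor.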

45abbc7fea306b9325653d54f42c157c5caffcd7

ADD id to executions

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.api.rest.server.spring.general;

import com.google.protobuf.ByteString;
import org.apache.wayang.api.python.function.WrappedPythonFunction;
import org.apache.wayang.api.rest.server.spring.decoder.WayangPlanBuilder;
import org.apache.wayang.basic.operators.*;
import org.apache.wayang.commons.serializable.OperatorProto;
import org.apache.wayang.commons.serializable.PlanProto;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.api.exception.WayangException;
import org.apache.wayang.core.function.MapPartitionsDescriptor;
import org.apache.wayang.core.plan.wayangplan.OperatorBase;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
import java.util.*;
import java.util.stream.Collectors;

import org.apache.wayang.core.plan.wayangplan.WayangPlan;
import org.apache.wayang.java.Java;
import org.apache.wayang.spark.Spark;

import org.apache.wayang.commons.serializable.WayangPlanProto;
import org.springframework.web.multipart.MultipartFile;


@RestController
public class WayangController {

    @GetMapping("/plan/create/fromfile")
    public String planFromFile(
            //@RequestParam("file") MultipartFile file
    ){

        try {
            FileInputStream inputStream = new FileInputStream(Paths.get(".").toRealPath() + "/protobuf/wayang_message");
            WayangPlanBuilder wpb = new WayangPlanBuilder(inputStream);

            /*TODO ADD id to executions*/
            wpb.getWayangContext().execute(wpb.getWayangPlan());

        } catch (IOException e) {
            e.printStackTrace();
        }

        return "Builder works";
    }

    @PostMapping("/plan/create")
    public String planFromMessage(
            @RequestParam("message") String message
    ){

        WayangPlanBuilder wpb = new WayangPlanBuilder(message);

        /*TODO ADD id to executions*/
        wpb.getWayangContext().execute(wpb.getWayangPlan());

        return "";
    }

    @GetMapping("/")
    public String all(){
        System.out.println("detected!");

        try {
            FileInputStream inputStream = new FileInputStream(Paths.get(".").toRealPath() + "/protobuf/wayang_message");
            WayangPlanProto plan = WayangPlanProto.parseFrom(inputStream);

            WayangContext wc = buildContext(plan);
            WayangPlan wp = buildPlan(plan);

            System.out.println("Plan!");
            System.out.println(wp.toString());

            wc.execute(wp);
            return("Works!");

        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

        return "Not working";
    }

    private WayangContext buildContext(WayangPlanProto plan){

        WayangContext ctx = new WayangContext();
        plan.getContext().getPlatformsList().forEach(platform -> {
            if (platform.getNumber() == 0)
                ctx.with(Java.basicPlugin());
            else if (platform.getNumber() == 1)
                ctx.with(Spark.basicPlugin());
        });
        //ctx.with(Spark.basicPlugin());

        return ctx;
    }

    private WayangPlan buildPlan(WayangPlanProto plan){

        System.out.println(plan);

        PlanProto planProto = plan.getPlan();
        LinkedList<OperatorProto> protoList = new LinkedList<>();
        planProto.getSourcesList().forEach(protoList::addLast);

        Map<String, OperatorBase> operators = new HashMap<>();
        List<OperatorBase> sinks = new ArrayList<>();
        while(! protoList.isEmpty()) {

            OperatorProto proto = protoList.pollFirst();

            /* Checking if protoOperator can be connected to the current WayangPlan*/
            boolean processIt;
            if(proto.getType().equals("source")) processIt = true;

            else {
                /* Checking if ALL predecessors were already processed */
                processIt = true;
                for(String predecessor : proto.getPredecessorsList()){
                    if (!operators.containsKey(predecessor)) {
                        processIt = false;
                        break;
                    }
                }
            }

            /* Operators should not be processed twice*/
            if(operators.containsKey(proto.getId())) processIt = false;

            if(processIt) {

                /* Create and store Wayang operator */
                OperatorBase operator = createOperatorByType(proto);
                operators.put(proto.getId(), operator);

                /*TODO Connect with predecessors requires more details in connection slot*/
                int order = 0;
                for (String pre_id : proto.getPredecessorsList()) {

                    OperatorBase predecessor = operators.get(pre_id);
                    /* Only works without replicate topology */
                    predecessor.connectTo(0, operator, order);
                    order++;

                    if(proto.getType().equals("sink")){
                        sinks.add(operator);
                        //if(!sinks.contains(operator)) {
                        //    sinks.add(operator);
                        //}
                    }
                }

                /*List of OperatorProto successors
                 * They will be added to the protoList
                 * nevertheless they must be processed only if the parents are in operators list */
                List<OperatorProto> listSuccessors = planProto.getOperatorsList()
                        .stream()
                        .filter(t -> proto.getSuccessorsList().contains(t.getId()))
                        .collect(Collectors.toList());
                for (OperatorProto successor : listSuccessors){
                    if(!protoList.contains(successor)){
                        protoList.addLast(successor);
                    }
                }

                List<OperatorProto> sinkSuccessors = planProto.getSinksList()
                        .stream()
                        .filter(t -> proto.getSuccessorsList().contains(t.getId()))
                        .collect(Collectors.toList());
                for (OperatorProto successor : sinkSuccessors){
                    if(!protoList.contains(successor)){
                        protoList.addLast(successor);
                    }
                }

            } else {

                /* In case we cannot process it yet, It must be added again at the end*/
                protoList.addLast(proto);
            }
        }

        WayangPlan wayangPlan = new WayangPlan(sinks.get(0));
        return wayangPlan;
    }

    public OperatorBase createOperatorByType(OperatorProto operator){

        System.out.println("Type: " + operator.getType());
        switch(operator.getType()){
            case "source":
                try {
                    String source_path = operator.getPath();
                    URL url = new File(source_path).toURI().toURL();
                    return new TextFileSource(url.toString());
                } catch (MalformedURLException e) {
                    e.printStackTrace();
                }
                break;
            case "sink":
                try {
                    String sink_path = operator.getPath();
                    URL url = new File(sink_path).toURI().toURL();
                    return new TextFileSink<String>(
                            url.toString(),
                            String.class
                    );

                } catch (MalformedURLException e) {
                    e.printStackTrace();
                }
                break;
            case "reduce_by_key":
                /* The UDF is applied in the Python workers; the parameters hold
                 * the dimension or positions that compose the group key */
                PyWayangReduceByOperator<String, String> op = new PyWayangReduceByOperator(
                        operator.getParametersMap(),
                        operator.getUdf(),
                        String.class,
                        String.class,
                        false
                );
                /* Return the reduce operator itself instead of a text file sink */
                return op;

            case "map_partition":
                return new MapPartitionsOperator<>(
                    new MapPartitionsDescriptor<String, String>(
                        new WrappedPythonFunction<String, String>(
                            l -> l,
                            operator.getUdf()
                        ),
                        String.class,
                        String.class
                    )
                );

            case "union":
                return new UnionAllOperator<String>(
                        String.class
                );

        }

        throw new WayangException("Operator Type not supported");
    }

    public static URI createUri(String resourcePath) {
        try {
            return Thread.currentThread().getClass().getResource(resourcePath).toURI();
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Illegal URI.", e);
        }

    }

}
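
The `buildPlan` loop above creates an operator only once all of its predecessors exist and re-queues it otherwise, so an operator whose predecessor never becomes available appears to be re-queued indefinitely. The following is a minimal sketch of the same readiness rule using dependency counts, written in Python for brevity. The function name `build_order` and the dict-of-lists input are assumptions made for illustration; they are not part of the Wayang code base.

```python
from collections import deque


def build_order(predecessors):
    """Order operator ids so that each id comes after all of its predecessors.

    `predecessors` (hypothetical input shape) maps an operator id to the
    list of ids it depends on; sources map to an empty list.
    """
    successors = {op: [] for op in predecessors}
    pending = {}
    for op, preds in predecessors.items():
        unique_preds = set(preds)
        pending[op] = len(unique_preds)
        for pred in unique_preds:
            if pred not in predecessors:
                raise KeyError(f"unknown predecessor {pred!r} of {op!r}")
            successors[pred].append(op)

    # Operators without pending predecessors (the sources) are ready first.
    ready = deque(op for op, count in pending.items() if count == 0)
    order = []
    while ready:
        op = ready.popleft()
        order.append(op)
        for succ in successors[op]:
            pending[succ] -= 1
            if pending[succ] == 0:
                ready.append(succ)

    # Anything left over can never become ready: report it instead of looping.
    if len(order) != len(predecessors):
        raise ValueError("plan contains a cycle or an unreachable operator")
    return order


plan = {"src1": [], "src2": [], "union": ["src1", "src2"],
        "map": ["union"], "sink": ["map"]}
order = build_order(plan)
```

Counting pending predecessors makes every operator enter the queue exactly once, which removes the need for the "already processed" check and turns a malformed plan into an error.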

b85732398b37855b6b38d657127ec8b731d0cb6c

use config buffer size

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.api.python.executor;

import com.google.protobuf.ByteString;
import org.apache.wayang.api.python.function.PythonUdf;
import org.apache.wayang.core.api.exception.WayangException;

import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Map;

public class ProcessFeeder<Input, Output> {

    private Socket socket;
    private PythonUdf<Input, Output> udf;
    private ByteString serializedUDF;
    private Iterable<Input> input;

    //TODO add to a config file
    int END_OF_DATA_SECTION = -1;
    int NULL = -5;

    public ProcessFeeder(
            Socket socket,
            PythonUdf<Input, Output> udf,
            ByteString serializedUDF,
            Iterable<Input> input){

        if(input == null) throw new WayangException("Nothing to process with Python API");

        this.socket = socket;
        this.udf = udf;
        this.serializedUDF = serializedUDF;
        this.input = input;

    }

    public void send(){

        try{
            //TODO use config buffer size
            int BUFFER_SIZE = 8192;

            BufferedOutputStream stream = new BufferedOutputStream(socket.getOutputStream(), BUFFER_SIZE);
            DataOutputStream dataOut = new DataOutputStream(stream);

            writeUDF(serializedUDF, dataOut);
            this.writeIteratorToStream(input.iterator(), dataOut);
            dataOut.writeInt(END_OF_DATA_SECTION);
            dataOut.flush();

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void writeUDF(ByteString serializedUDF, DataOutputStream dataOut){

        //write(serializedUDF.toByteArray(), dataOut);
        writeBytes(serializedUDF.toByteArray(), dataOut);
        System.out.println("UDF written");

    }

    public void writeIteratorToStream(Iterator<Input> iter, DataOutputStream dataOut){

        System.out.println("iterator being sent");
        for (Iterator<Input> it = iter; it.hasNext(); ) {
            Input elem = it.next();
            //System.out.println(elem.toString());
            write(elem, dataOut);
        }
    }

    /*TODO Missing case PortableDataStream */
    public void write(Object obj, DataOutputStream dataOut){
        try {

            if(obj == null)
                dataOut.writeInt(this.NULL);

            /**
             * Byte Array cases
             */
            else if (obj instanceof Byte[] || obj instanceof byte[]) {
                System.out.println("Writing Bytes");
                writeBytes(obj, dataOut);
            }
            /**
             * String case
             * */
            else if (obj instanceof String)
                writeUTF((String) obj, dataOut);

            /**
             * Key, Value case
             * */
            else if (obj instanceof Map.Entry)
                writeKeyValue((Map.Entry) obj, dataOut);

            else{
                throw new WayangException("Unexpected element type " + obj.getClass());
            }


        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void writeBytes(Object obj, DataOutputStream dataOut){

        try{

            if (obj instanceof Byte[]) {

                int length = ((Byte[]) obj).length;

                byte[] bytes = new byte[length];
                int j=0;

                // Unboxing Byte values. (Byte[] to byte[])
                for(Byte b: ((Byte[]) obj))
                    bytes[j++] = b.byteValue();

                dataOut.writeInt(length);
                dataOut.write(bytes);

            } else if (obj instanceof byte[]) {

                dataOut.writeInt(((byte[]) obj).length);
                dataOut.write(((byte[]) obj));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void writeUTF(String str, DataOutputStream dataOut){

        byte[] bytes = str.getBytes(StandardCharsets.UTF_8);

        try {

            dataOut.writeInt(bytes.length);
            dataOut.write(bytes);
        } catch (SocketException e){
            // Socket errors are swallowed here, so a closed connection drops the write silently.
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void writeKeyValue(Map.Entry obj, DataOutputStream dataOut){

        write(obj.getKey(), dataOut);
        write(obj.getValue(), dataOut);
    }

}
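
`ProcessFeeder` frames everything it sends as a 4-byte big-endian length (the format of `DataOutputStream.writeInt`) followed by the payload, with `-5` standing for a null element and `-1` closing the data section. The sketch below shows how a receiver could decode that framing in Python. It is an illustration under assumptions, not the actual pywayang worker: `read_int`, `read_frames` and `encode_frame` are hypothetical names, and the stream is assumed to be a buffered binary reader (for a socket, something like `sock.makefile("rb")`).

```python
import io
import struct

END_OF_DATA_SECTION = -1  # mirrors ProcessFeeder.END_OF_DATA_SECTION
NULL = -5                 # mirrors ProcessFeeder.NULL


def read_int(stream):
    """Read one big-endian signed 32-bit integer."""
    raw = stream.read(4)
    if len(raw) < 4:
        raise EOFError("stream ended inside a length prefix")
    return struct.unpack(">i", raw)[0]


def read_frames(stream):
    """Yield each payload as bytes (None for a null marker) until end of data."""
    while True:
        length = read_int(stream)
        if length == END_OF_DATA_SECTION:
            return
        if length == NULL:
            yield None
            continue
        if length < 0:
            raise ValueError(f"unexpected marker {length}")
        payload = stream.read(length)
        if len(payload) < length:
            raise EOFError("stream ended inside a payload")
        yield payload


def encode_frame(payload):
    """Demo helper: frame bytes the same way writeBytes and writeUTF do."""
    return struct.pack(">i", len(payload)) + payload


# Demo stream: the serialized UDF first, then one string, one null, end marker.
data = (encode_frame(b"udf")
        + encode_frame("héllo".encode("utf-8"))
        + struct.pack(">i", NULL)
        + struct.pack(">i", END_OF_DATA_SECTION))
frames = list(read_frames(io.BytesIO(data)))
```

The framing carries no type tag, so a receiver cannot tell a string from a byte array, or a key-value pair from two consecutive elements, without knowing what the sender wrote.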

1123270d0111584d60c442992a6fb02cee29f9b6

How to get the config

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.api.python.executor;

import com.google.protobuf.ByteString;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.exception.WayangException;

import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

public class PythonProcessCaller {

    private Process process;
    private Socket socket;
    private ServerSocket serverSocket;
    private boolean ready;

    //TODO How to get the config
    private Configuration configuration;

    public PythonProcessCaller(ByteString serializedUDF){

        //TODO document how to obtain the configuration in code
        this.configuration = new Configuration("file:///Users/rodrigopardomeza/wayang/incubator-wayang/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties");
        this.ready = false;
        byte[] addr = new byte[4];
        addr[0] = 127; addr[1] = 0; addr[2] = 0; addr[3] = 1;

        try {
            /*TODO should NOT be assigned a specific port, set port as 0 (zero)*/
            this.serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(addr));
            ProcessBuilder pb = new ProcessBuilder(
                    Arrays.asList(
                            "python3",
                            this.configuration.getStringProperty("wayang.api.python.worker")
                    )
            );
            Map<String, String> workerEnv = pb.environment();
            workerEnv.put("PYTHON_WORKER_FACTORY_PORT", String.valueOf(this.serverSocket.getLocalPort()));

            // TODO See what is happening with ENV Python version
            workerEnv.put("PYTHONPATH", "/Users/rodrigopardomeza/wayang/incubator-wayang/pywayang/:/Users/rodrigopardomeza/opt/anaconda3/");

            pb.redirectOutput(ProcessBuilder.Redirect.INHERIT);
            pb.redirectError(ProcessBuilder.Redirect.INHERIT);
            this.process = pb.start();


            // Redirect worker stdout and stderr
            //IDK redirectStreamsToStderr(worker.getInputStream, worker.getErrorStream)

            // Wait for it to connect to our socket
            this.serverSocket.setSoTimeout(10000);

            try {
                this.socket = this.serverSocket.accept();
                this.serverSocket.setSoTimeout(0);

                if(socket.isConnected())
                    this.ready = true;

            } catch (Exception e) {
                System.out.println(e);
                throw new WayangException("Python worker failed to connect back.", e);
            }
        } catch (Exception e){
            System.out.println(e);
            throw new WayangException("Python worker failed", e);
        }
    }

    public Process getProcess() {
        return process;
    }

    public Socket getSocket() {
        return socket;
    }

    public boolean isReady(){
        return ready;
    }

    public void close(){
        try {
            this.process.destroy();
            this.socket.close();
            this.serverSocket.close();
            System.out.println("Everything closed");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
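
`PythonProcessCaller` listens on a loopback port chosen by the operating system and announces it to the child process through the `PYTHON_WORKER_FACTORY_PORT` environment variable. A worker could connect back roughly as sketched below. This is a sketch under assumptions: the function name `connect_to_caller` and its parameters are hypothetical, and the real worker script may handle the handshake differently.

```python
import os
import socket


def connect_to_caller(env=None, timeout=10.0):
    """Open a TCP connection to the port announced by the Java side.

    `env` defaults to the process environment; passing a dict makes the
    function easy to exercise without spawning a worker process.
    """
    env = os.environ if env is None else env
    port = int(env["PYTHON_WORKER_FACTORY_PORT"])
    # The Java side binds to 127.0.0.1, so only a loopback connection can succeed.
    return socket.create_connection(("127.0.0.1", port), timeout=timeout)
```

The 10-second default mirrors the `setSoTimeout(10000)` accept timeout on the Java side, so both ends give up after a similar wait.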

45abbc7fea306b9325653d54f42c157c5caffcd7

Add unit tests for the elements in the file org.apache.wayang.api.JavaPlanBuilder.scala

package org.apache.wayang.api
/**
 * TODO: add unit tests for the elements in the file org.apache.wayang.api.JavaPlanBuilder.scala
 * labels: unitary-test,todo
 */
import java.util.{Collection => JavaCollection}
import org.apache.commons.lang3.Validate
import org.apache.wayang.api.util.DataQuantaBuilderCache

f749b48e33189a4fec6508a1af48e238265907df

ADD id to executions

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.api.rest.server.spring.general;

import com.google.protobuf.ByteString;
import org.apache.wayang.api.python.function.WrappedPythonFunction;
import org.apache.wayang.api.rest.server.spring.decoder.WayangPlanBuilder;
import org.apache.wayang.basic.operators.*;
import org.apache.wayang.commons.serializable.OperatorProto;
import org.apache.wayang.commons.serializable.PlanProto;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.api.exception.WayangException;
import org.apache.wayang.core.function.MapPartitionsDescriptor;
import org.apache.wayang.core.plan.wayangplan.OperatorBase;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
import java.util.*;
import java.util.stream.Collectors;

import org.apache.wayang.core.plan.wayangplan.WayangPlan;
import org.apache.wayang.java.Java;
import org.apache.wayang.spark.Spark;

import org.apache.wayang.commons.serializable.WayangPlanProto;
import org.springframework.web.multipart.MultipartFile;


@RestController
public class WayangController {

    @GetMapping("/plan/create/fromfile")
    public String planFromFile(
            //@RequestParam("file") MultipartFile file
    ){

        try {
            FileInputStream inputStream = new FileInputStream(Paths.get(".").toRealPath() + "/protobuf/wayang_message");
            WayangPlanBuilder wpb = new WayangPlanBuilder(inputStream);

            /*TODO ADD id to executions*/
            wpb.getWayangContext().execute(wpb.getWayangPlan());

        } catch (IOException e) {
            e.printStackTrace();
        }

        return "Builder works";
    }

    @PostMapping("/plan/create")
    public String planFromMessage(
            @RequestParam("message") String message
    ){

        WayangPlanBuilder wpb = new WayangPlanBuilder(message);

        /*TODO ADD id to executions*/
        wpb.getWayangContext().execute(wpb.getWayangPlan());

        return "";
    }

    @GetMapping("/")
    public String all(){
        System.out.println("detected!");

        try {
            FileInputStream inputStream = new FileInputStream(Paths.get(".").toRealPath() + "/protobuf/wayang_message");
            WayangPlanProto plan = WayangPlanProto.parseFrom(inputStream);

            WayangContext wc = buildContext(plan);
            WayangPlan wp = buildPlan(plan);

            System.out.println("Plan!");
            System.out.println(wp.toString());

            wc.execute(wp);
            return("Works!");

        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

        return "Not working";
    }

    private WayangContext buildContext(WayangPlanProto plan){

        WayangContext ctx = new WayangContext();
        plan.getContext().getPlatformsList().forEach(platform -> {
            if (platform.getNumber() == 0)
                ctx.with(Java.basicPlugin());
            else if (platform.getNumber() == 1)
                ctx.with(Spark.basicPlugin());
        });
        //ctx.with(Spark.basicPlugin());

        return ctx;
    }

    private WayangPlan buildPlan(WayangPlanProto plan){

        System.out.println(plan);

        PlanProto planProto = plan.getPlan();
        LinkedList<OperatorProto> protoList = new LinkedList<>();
        planProto.getSourcesList().forEach(protoList::addLast);

        Map<String, OperatorBase> operators = new HashMap<>();
        List<OperatorBase> sinks = new ArrayList<>();
        while(! protoList.isEmpty()) {

            OperatorProto proto = protoList.pollFirst();

            /* Checking if protoOperator can be connected to the current WayangPlan*/
            boolean processIt;
            if(proto.getType().equals("source")) processIt = true;

            else {
                /* Checking if ALL predecessors were already processed */
                processIt = true;
                for(String predecessor : proto.getPredecessorsList()){
                    if (!operators.containsKey(predecessor)) {
                        processIt = false;
                        break;
                    }
                }
            }

            /* Operators should not be processed twice*/
            if(operators.containsKey(proto.getId())) processIt = false;

            if(processIt) {

                /* Create and store Wayang operator */
                OperatorBase operator = createOperatorByType(proto);
                operators.put(proto.getId(), operator);

                /*TODO Connect with predecessors requires more details in connection slot*/
                int order = 0;
                for (String pre_id : proto.getPredecessorsList()) {

                    OperatorBase predecessor = operators.get(pre_id);
                    /* Only works without replicate topology */
                    predecessor.connectTo(0, operator, order);
                    order++;

                    if(proto.getType().equals("sink")){
                        sinks.add(operator);
                        //if(!sinks.contains(operator)) {
                        //    sinks.add(operator);
                        //}
                    }
                }

                /*List of OperatorProto successors
                 * They will be added to the protoList
                 * nevertheless they must be processed only if the parents are in operators list */
                List<OperatorProto> listSuccessors = planProto.getOperatorsList()
                        .stream()
                        .filter(t -> proto.getSuccessorsList().contains(t.getId()))
                        .collect(Collectors.toList());
                for (OperatorProto successor : listSuccessors){
                    if(!protoList.contains(successor)){
                        protoList.addLast(successor);
                    }
                }

                List<OperatorProto> sinkSuccessors = planProto.getSinksList()
                        .stream()
                        .filter(t -> proto.getSuccessorsList().contains(t.getId()))
                        .collect(Collectors.toList());
                for (OperatorProto successor : sinkSuccessors){
                    if(!protoList.contains(successor)){
                        protoList.addLast(successor);
                    }
                }

            } else {

                /* In case we cannot process it yet, It must be added again at the end*/
                protoList.addLast(proto);
            }
        }

        WayangPlan wayangPlan = new WayangPlan(sinks.get(0));
        return wayangPlan;
    }

    public OperatorBase createOperatorByType(OperatorProto operator){

        System.out.println("Type: " + operator.getType());
        switch(operator.getType()){
            case "source":
                try {
                    String source_path = operator.getPath();
                    URL url = new File(source_path).toURI().toURL();
                    return new TextFileSource(url.toString());
                } catch (MalformedURLException e) {
                    e.printStackTrace();
                }
                break;
            case "sink":
                try {
                    String sink_path = operator.getPath();
                    URL url = new File(sink_path).toURI().toURL();
                    return new TextFileSink<String>(
                            url.toString(),
                            String.class
                    );

                } catch (MalformedURLException e) {
                    e.printStackTrace();
                }
                break;
            case "reduce_by_key":
                /* The UDF is applied in the Python workers; the parameters hold
                 * the dimension or positions that compose the group key */
                PyWayangReduceByOperator<String, String> op = new PyWayangReduceByOperator(
                        operator.getParametersMap(),
                        operator.getUdf(),
                        String.class,
                        String.class,
                        false
                );
                /* Return the reduce operator itself instead of a text file sink */
                return op;

            case "map_partition":
                return new MapPartitionsOperator<>(
                    new MapPartitionsDescriptor<String, String>(
                        new WrappedPythonFunction<String, String>(
                            l -> l,
                            operator.getUdf()
                        ),
                        String.class,
                        String.class
                    )
                );

            case "union":
                return new UnionAllOperator<String>(
                        String.class
                );

        }

        throw new WayangException("Operator Type not supported");
    }

    public static URI createUri(String resourcePath) {
        try {
            return Thread.currentThread().getClass().getResource(resourcePath).toURI();
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Illegal URI.", e);
        }

    }

}

87ee4971b5736963ad1dcede16329f8f8f3163c9

this should iterate through previous REDESIGN

# TODO this should iterate through previous REDESIGN

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import pickle
import cloudpickle
from config.config_reader import get_source_types
from config.config_reader import get_sink_types
from config.config_reader import get_boundary_types
import logging

pickle_protocol = pickle.HIGHEST_PROTOCOL


# Describes an Operation over an intermediate result
# Each operation could be processed by Python or Java platforms
class Operator:

    def __init__(
            self, operator_type=None, udf=None, previous=None,
            iterator=None, python_exec=False
    ):

        # Operator ID
        self.id = id(self)

        # Operator Type
        self.operator_type = operator_type

        # Set Boundaries
        if self.operator_type in get_boundary_types():
            self.boundary = True
        else:
            self.boundary = False

        # UDF Function
        self.udf = udf

        # Source types must come with an Iterator
        self.iterator = iterator
        if operator_type in get_source_types():
            if iterator is None:
                # A bare `raise` has no active exception here; raise an explicit error instead
                raise ValueError("Source Operator Type without an Iterator")
            else:
                self.source = True
        else:
            self.source = False

        # Sink Operators
        if operator_type in get_sink_types():
            self.sink = True
        else:
            self.sink = False

        # TODO Why are previous and predecessors managed separately?
        self.previous = previous

        self.successor = []
        self.predecessor = []

        self.parameters = {}

        # Set predecessors and successors from previous
        if self.previous:
            for prev in self.previous:
                if prev is not None:
                    prev.set_successor(self)
                    self.set_predecessor(prev)

        self.python_exec = python_exec

        # Lazy %s formatting also avoids a TypeError when operator_type is None
        logging.info(
            "Operator: %s, type: %s, PythonExecutable: %s, is boundary: %s, is source: %s, is sink: %s",
            self.getID(), self.operator_type, self.python_exec,
            self.is_boundary(), self.source, self.sink
        )

    def getID(self):
        return self.id

    def is_source(self):
        return self.source

    def is_sink(self):
        return self.sink

    def is_boundary(self):
        return self.boundary

    def serialize_udf(self):
        self.udf = cloudpickle.dumps(self.udf)

    def getIterator(self):
        if self.is_source():
            return self.iterator
        # TODO this should iterate through previous REDESIGN
        return self.udf(self.previous[0].getIterator())

    def set_parameter(self, key, value):
        self.parameters[key] = value

    def set_successor(self, suc):
        if (not self.is_sink()) and self.successor.count(suc) == 0:
            self.successor.append(suc)

    def set_predecessor(self, pre):
        if self.predecessor.count(pre) == 0:
            self.predecessor.append(pre)

552a18ba849015c40334bc9e43f351b2794a005a

Why manage previous and predecessors separately?

# TODO Why manage previous and predecessors separately?

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import pickle
import cloudpickle
from config.config_reader import get_source_types
from config.config_reader import get_sink_types
from config.config_reader import get_boundary_types
import logging

pickle_protocol = pickle.HIGHEST_PROTOCOL


# Describes an Operation over an intermediate result
# Each operation could be processed by Python or Java platforms
class Operator:

    def __init__(
            self, operator_type=None, udf=None, previous=None,
            iterator=None, python_exec=False
    ):

        # Operator ID
        self.id = id(self)

        # Operator Type
        self.operator_type = operator_type

        # Set Boundaries
        if self.operator_type in get_boundary_types():
            self.boundary = True
        else:
            self.boundary = False

        # UDF Function
        self.udf = udf

        # Source types must come with an Iterator
        self.iterator = iterator
        if operator_type in get_source_types():
            if iterator is None:
                raise ValueError("Source Operator Type without an Iterator")
            else:
                self.source = True
        else:
            self.source = False

        # Sink Operators
        if operator_type in get_sink_types():
            self.sink = True
        else:
            self.sink = False

        # TODO Why manage previous and predecessors separately?
        self.previous = previous

        self.successor = []
        self.predecessor = []

        self.parameters = {}

        # Set predecessors and successors from previous
        if self.previous:
            for prev in self.previous:
                if prev is not None:
                    prev.set_successor(self)
                    self.set_predecessor(prev)

        self.python_exec = python_exec

        # Lazy %s formatting also avoids a TypeError when operator_type is None
        logging.info(
            "Operator: %s, type: %s, PythonExecutable: %s, is boundary: %s, is source: %s, is sink: %s",
            self.getID(), self.operator_type, self.python_exec,
            self.is_boundary(), self.source, self.sink
        )

    def getID(self):
        return self.id

    def is_source(self):
        return self.source

    def is_sink(self):
        return self.sink

    def is_boundary(self):
        return self.boundary

    def serialize_udf(self):
        self.udf = cloudpickle.dumps(self.udf)

    def getIterator(self):
        if self.is_source():
            return self.iterator
        # TODO this should iterate through previous REDESIGN
        return self.udf(self.previous[0].getIterator())

    def set_parameter(self, key, value):
        self.parameters[key] = value

    def set_successor(self, suc):
        if (not self.is_sink()) and self.successor.count(suc) == 0:
            self.successor.append(suc)

    def set_predecessor(self, pre):
        if self.predecessor.count(pre) == 0:
            self.predecessor.append(pre)

33b13ed166582bb29212019ff9be3dc264b98994

add unit tests for the elements in the file org.apache.wayang.api.DataQuantaBuilder.scala

* TODO: add unit tests for the elements in the file org.apache.wayang.api.DataQuantaBuilder.scala

package org.apache.wayang.api

/**
 * TODO: add unit tests for the elements in the file org.apache.wayang.api.DataQuantaBuilder.scala
 * labels: unitary-test,todo
 */
import java.util.function.{Consumer, IntUnaryOperator, Function => JavaFunction}
import java.util.{Collection => JavaCollection}
import org.apache.wayang.api.graph.{Edge, EdgeDataQuantaBuilder, EdgeDataQuantaBuilderDecorator}

c8c9df7f45ddaf1e16155a5a0e17c4deb7eb30a0

add the description for the implicits in org.apache.wayang.api.graph

* TODO: add the description for the implicits in org.apache.wayang.api.graph

  */
package object graph {

  /**
   * TODO: add the description for the implicits in org.apache.wayang.api.graph
   * labels: documentation,todo
   */

  type Vertex = java.lang.Long

  type Edge = T2[Vertex, Vertex]

b1e0e4b397bbbbba3042a694a04106bbeddc22db

See what is happening with ENV Python version

IDK redirectStreamsToStderr(worker.getInputStream, worker.getErrorStream)

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.api.python.executor;

import com.google.protobuf.ByteString;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.exception.WayangException;

import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

public class PythonProcessCaller {

    private Process process;
    private Socket socket;
    private ServerSocket serverSocket;
    private boolean ready;

    //TODO How to get the config
    private Configuration configuration;

    public PythonProcessCaller(ByteString serializedUDF){

        //TODO document how to set the configuration in the code
        this.configuration = new Configuration("file:///Users/rodrigopardomeza/wayang/incubator-wayang/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties");
        this.ready = false;
        byte[] addr = new byte[4];
        addr[0] = 127; addr[1] = 0; addr[2] = 0; addr[3] = 1;

        try {
            /*TODO should NOT be assigned a specific port, set port as 0 (zero)*/
            this.serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(addr));
            ProcessBuilder pb = new ProcessBuilder(
                    Arrays.asList(
                            "python3",
                            this.configuration.getStringProperty("wayang.api.python.worker")
                    )
            );
            Map<String, String> workerEnv = pb.environment();
            workerEnv.put("PYTHON_WORKER_FACTORY_PORT", String.valueOf(this.serverSocket.getLocalPort()));

            // TODO See what is happening with ENV Python version
            workerEnv.put("PYTHONPATH", "/Users/rodrigopardomeza/wayang/incubator-wayang/pywayang/:/Users/rodrigopardomeza/opt/anaconda3/");

            pb.redirectOutput(ProcessBuilder.Redirect.INHERIT);
            pb.redirectError(ProcessBuilder.Redirect.INHERIT);
            this.process = pb.start();


            // Redirect worker stdout and stderr
            //IDK redirectStreamsToStderr(worker.getInputStream, worker.getErrorStream)

            // Wait for it to connect to our socket
            this.serverSocket.setSoTimeout(10000);

            try {
                this.socket = this.serverSocket.accept();
                this.serverSocket.setSoTimeout(0);

                if(socket.isConnected())
                    this.ready = true;

            } catch (Exception e) {
                System.out.println(e);
                throw new WayangException("Python worker failed to connect back.", e);
            }
        } catch (Exception e){
            System.out.println(e);
            throw new WayangException("Python worker failed", e);
        }
    }

    public Process getProcess() {
        return process;
    }

    public Socket getSocket() {
        return socket;
    }

    public boolean isReady(){
        return ready;
    }

    public void close(){
        try {
            this.process.destroy();
            this.socket.close();
            this.serverSocket.close();
            System.out.println("Everything closed");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
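
The handshake in the constructor above works in three steps: the caller binds a loopback socket on port 0 so the operating system picks a free port, exports that port as `PYTHON_WORKER_FACTORY_PORT`, and waits for the worker to connect back. Seen from the worker, it might look like the minimal sketch below. This is an illustration, not the actual worker script: the function name `connect_to_factory` and the small demo that stands in for the Java side are assumptions.

```python
import os
import socket


def connect_to_factory(env=os.environ, timeout=10.0):
    """Connect back to the parent process on the loopback port it advertised."""
    # PYTHON_WORKER_FACTORY_PORT is the variable set by the Java caller above.
    port = int(env["PYTHON_WORKER_FACTORY_PORT"])
    return socket.create_connection(("127.0.0.1", port), timeout=timeout)


def demo():
    # Stand-in for the Java side: binding port 0 lets the OS choose a free port.
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))
    server.listen(1)
    server.settimeout(10.0)
    port = server.getsockname()[1]

    # Hypothetical worker side: read the port from the environment mapping.
    worker_side = connect_to_factory({"PYTHON_WORKER_FACTORY_PORT": str(port)})
    parent_side, _ = server.accept()

    worker_side.sendall(b"ready")
    message = parent_side.recv(5)

    for sock in (worker_side, parent_side, server):
        sock.close()
    return message


if __name__ == "__main__":
    print(demo())
```

Passing the environment as a plain mapping keeps the function testable without spawning a child process.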

ff647ebc44c8a28c02079340b5b775e2a5564fbd

create reduce by

plan_dataquanta_sink.console()

# TODO create reduce by

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from orchestrator.plan import Descriptor
from orchestrator.dataquanta import DataQuantaBuilder
import datetime


# Returns the Sink Executable Dataquanta of a DEMO plan
def plan_sort(descriptor):
    plan = DataQuantaBuilder(descriptor)
    sink_dataquanta = \
        plan.source("../test/words.txt") \
            .sort(lambda elem: elem.lower()) \
            .sink("../test/output.txt", end="")
    return sink_dataquanta


# Returns the Sink Executable Dataquanta of a DEMO plan
def plan_sort_filter(descriptor):
    plan = DataQuantaBuilder(descriptor)
    sink_dataquanta = \
        plan.source("../test/words.txt") \
            .sort(lambda elem: elem.lower()) \
            .filter(lambda elem: str(elem).startswith("f")) \
            .sink("../test/output.txt", end="")
    return sink_dataquanta


# Returns the Sink Executable Dataquanta of a DEMO plan
def plan_filter_text(descriptor):
    plan = DataQuantaBuilder(descriptor)

    sink_dataquanta = \
        plan.source("../test/words.txt") \
            .filter(lambda elem: str(elem).startswith("f")) \
            .sink("../test/output.txt", end="")

    return sink_dataquanta


# Returns the Sink Executable Dataquanta of a DEMO plan
def plan_filter(descriptor):
    plan = DataQuantaBuilder(descriptor)

    sink_dataquanta = \
        plan.source("../test/numbers.txt") \
            .filter(lambda elem: int(elem) % 2 != 0) \
            .sink("../test/output.txt", end="")

    return sink_dataquanta


# Returns the Sink Executable Dataquanta of a DEMO plan
def plan_basic(descriptor):
    plan = DataQuantaBuilder(descriptor)

    sink_dataquanta = \
        plan.source("../test/lines.txt") \
            .sink("../test/output.txt", end="")

    return sink_dataquanta


# Returns the Sink Executable Dataquanta of a DEMO plan
def plan_junction(descriptor):

    plan = DataQuantaBuilder(descriptor)

    dq_source_a = plan.source("../test/lines.txt")
    dq_source_b = plan.source("../test/morelines.txt") \
        .filter(lambda elem: str(elem).startswith("I"))
    dq_source_c = plan.source("../test/lastlines.txt") \
        .filter(lambda elem: str(elem).startswith("W"))

    sink_dataquanta = dq_source_a.union(dq_source_b) \
        .union(dq_source_c) \
        .sort(lambda elem: elem.lower()) \
        .sink("../test/output.txt", end="")

    return sink_dataquanta


def plan_java_junction(descriptor):

    plan = DataQuantaBuilder(descriptor)

    dq_source_a = plan.source("../test/lines.txt")
    dq_source_b = plan.source("../test/morelines.txt")
    sink_dataquanta = dq_source_a.union(dq_source_b) \
        .filter(lambda elem: str(elem).startswith("I")) \
        .sort(lambda elem: elem.lower()) \
        .sink("../test/output.txt", end="")

    return sink_dataquanta


def plan_tpch_q1(descriptor):

    # TODO create reduce by
    plan = DataQuantaBuilder(descriptor)

    def reducer(obj1, obj2):
        return obj1[0], obj1[1], obj1[2] + obj2[2], obj1[3] + obj2[3], obj1[4] + obj2[4], obj1[5] + obj2[5], \
               obj1[6] + obj2[6], obj1[7] + obj2[7], obj1[8] + obj2[8], obj1[9] + obj2[9]

    sink = plan.source("../test/lineitem.txt") \
        .map(lambda elem: elem.split("|")) \
        .sink("../test/output.txt", end="")
    """
        .filter(lambda elem: datetime.datetime.strptime(elem[10], '%Y-%m-%d') <= datetime.datetime.strptime('1998-09-02', '%Y-%m-%d')) \
        .map(lambda elem:
             [elem[8], elem[9], elem[4], elem[5],
              float(elem[5]) * (1 - float(elem[6])),
              float(elem[5]) * (1 - float(elem[6])) * (1 + float(elem[7])),
              elem[4], elem[5],
              elem[6], 1]) \
        .sink("../test/output.txt", end="")"""
        # .reduce_by_key([0, 1], reducer) \


    return sink


def plan_full_java(descriptor):

    plan = DataQuantaBuilder(descriptor)

    dq_source_a = plan.source("../test/lines.txt")
    dq_source_b = plan.source("../test/morelines.txt")
    sink_dataquanta = dq_source_a.union(dq_source_b) \
        .sink("../test/output.txt", end="")

    return sink_dataquanta


def plan_wordcount(descriptor):

    plan = DataQuantaBuilder(descriptor)
    sink_wordcount = plan.source("../test/lineitem.txt") \
        .filter(lambda elem: len(str(elem).split("|")[0]) < 4) \
        .flatmap(lambda elem: str(elem).split("|")) \
        .sink("../test/output.txt", end="")

    return sink_wordcount


if __name__ == '__main__':

    # Plan will contain general info about the Wayang Plan created here
    descriptor = Descriptor()
    descriptor.add_plugin(Descriptor.Plugin.spark)
    descriptor.add_plugin(Descriptor.Plugin.java)

    plan_dataquanta_sink = plan_wordcount(descriptor)
    # plan_dataquanta_sink.execute()
    # plan_dataquanta_sink.console()

    plan_dataquanta_sink.to_wayang_plan()
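
The open `reduce by` TODO and the commented-out `.reduce_by_key([0, 1], reducer)` call suggest folding all records that share the same key columns into one record. A plain-Python sketch of that idea follows. It is not the pywayang operator: the standalone function `reduce_by_key`, the helper `sum_tail`, and the sample rows are hypothetical and only illustrate the semantics.

```python
def reduce_by_key(records, key_positions, reducer):
    """Fold records that share the same key columns into one record per key."""
    reduced = {}
    for record in records:
        # The key is the tuple of values found at the given positions.
        key = tuple(record[i] for i in key_positions)
        if key in reduced:
            reduced[key] = reducer(reduced[key], record)
        else:
            reduced[key] = record
    # Dicts keep insertion order, so keys appear in first-seen order.
    return list(reduced.values())


def sum_tail(left, right):
    # Keep the two key columns and add the remaining numeric columns pairwise.
    return left[:2] + tuple(a + b for a, b in zip(left[2:], right[2:]))


rows = [("A", "F", 1, 10.0), ("N", "O", 2, 5.0), ("A", "F", 3, 2.5)]
print(reduce_by_key(rows, [0, 1], sum_tail))
# [('A', 'F', 4, 12.5), ('N', 'O', 2, 5.0)]
```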

7d744e87f3c7a2f596ae70bd906f86980648240c

create reduce by

.reduce_by(reducer) \

.flatmap(lambda elem: elem.split("|"))

.map(lambda elem: (elem, elem.split("|"))) \

L_RETURNFLAG 8

L_LINESTATUS 9

L_QUANTITY 4

L_EXTENDEDPRICE 5

discount 6

tax 7

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from orchestrator.plan import Descriptor
from orchestrator.dataquanta import DataQuantaBuilder
import datetime


# Returns the Sink Executable Dataquanta of a DEMO plan
def plan_sort(descriptor):
    plan = DataQuantaBuilder(descriptor)
    sink_dataquanta = \
        plan.source("../test/words.txt") \
            .sort(lambda elem: elem.lower()) \
            .sink("../test/output.txt", end="")
    return sink_dataquanta


# Returns the Sink Executable Dataquanta of a DEMO plan
def plan_sort_filter(descriptor):
    plan = DataQuantaBuilder(descriptor)
    sink_dataquanta = \
        plan.source("../test/words.txt") \
            .sort(lambda elem: elem.lower()) \
            .filter(lambda elem: str(elem).startswith("f")) \
            .sink("../test/output.txt", end="")
    return sink_dataquanta


# Returns the Sink Executable Dataquanta of a DEMO plan
def plan_filter_text(descriptor):
    plan = DataQuantaBuilder(descriptor)

    sink_dataquanta = \
        plan.source("../test/words.txt") \
            .filter(lambda elem: str(elem).startswith("f")) \
            .sink("../test/output.txt", end="")

    return sink_dataquanta


# Returns the Sink Executable Dataquanta of a DEMO plan
def plan_filter(descriptor):
    plan = DataQuantaBuilder(descriptor)

    sink_dataquanta = \
        plan.source("../test/numbers.txt") \
            .filter(lambda elem: int(elem) % 2 != 0) \
            .sink("../test/output.txt", end="")

    return sink_dataquanta


# Returns the Sink Executable Dataquanta of a DEMO plan
def plan_basic(descriptor):
    plan = DataQuantaBuilder(descriptor)

    sink_dataquanta = \
        plan.source("../test/lines.txt") \
            .sink("../test/output.txt", end="")

    return sink_dataquanta


# Returns the Sink Executable Dataquanta of a DEMO plan
def plan_junction(descriptor):

    plan = DataQuantaBuilder(descriptor)

    dq_source_a = plan.source("../test/lines.txt")
    dq_source_b = plan.source("../test/morelines.txt") \
        .filter(lambda elem: str(elem).startswith("I"))
    dq_source_c = plan.source("../test/lastlines.txt") \
        .filter(lambda elem: str(elem).startswith("W"))

    sink_dataquanta = dq_source_a.union(dq_source_b) \
        .union(dq_source_c) \
        .sort(lambda elem: elem.lower()) \
        .sink("../test/output.txt", end="")

    return sink_dataquanta


def plan_java_junction(descriptor):

    plan = DataQuantaBuilder(descriptor)

    dq_source_a = plan.source("../test/lines.txt")
    dq_source_b = plan.source("../test/morelines.txt")
    sink_dataquanta = dq_source_a.union(dq_source_b) \
        .filter(lambda elem: str(elem).startswith("I")) \
        .sort(lambda elem: elem.lower()) \
        .sink("../test/output.txt", end="")

    return sink_dataquanta


def plan_tpch_q1(descriptor):

    #TODO create reduce by
    plan = DataQuantaBuilder(descriptor)

    def reducer(obj1, obj2):
        return obj1[0]

    sink = plan.source("../test/lineitem.txt") \
        .map(lambda elem: elem.split("|")) \
        .filter(lambda elem: datetime.datetime.strptime(elem[10], '%Y-%m-%d') <= datetime.datetime.strptime("1998-09-02", '%Y-%m-%d')) \
        .map(lambda elem:
           [elem[8], elem[9], elem[4], elem[5],
            float(elem[5]) * (1 - float(elem[6])),
            float(elem[5]) * (1 - float(elem[6])) * (1 + float(elem[7])),
            elem[4], elem[5],
            elem[6], 1]) \
        .sink("../test/output.txt", end="")
        # .group_by(lambda elem: elem) \
        # .reduce_by(reducer) \
        # .flatmap(lambda elem: elem.split("|"))
        # .map(lambda elem: (elem, elem.split("|"))) \
        # L_RETURNFLAG 8
        # L_LINESTATUS 9
        # L_QUANTITY 4
        # L_EXTENDEDPRICE 5
        # discount 6
        # tax 7

    return sink


def plan_full_java(descriptor):

    plan = DataQuantaBuilder(descriptor)

    dq_source_a = plan.source("../test/lines.txt")
    dq_source_b = plan.source("../test/morelines.txt")
    sink_dataquanta = dq_source_a.union(dq_source_b) \
        .sink("../test/output.txt", end="")

    return sink_dataquanta


if __name__ == '__main__':

    # Plan will contain general info about the Wayang Plan created here
    descriptor = Descriptor()

    plan_dataquanta_sink = plan_tpch_q1(descriptor)
    plan_dataquanta_sink.execute()

aff53164d9e4797b22e4d4a00753ad8b2676ba93

add the documentation to the object org.apache.wayang.api.DataQuanta

* TODO: add the documentation to the object org.apache.wayang.api.DataQuanta

}

/**
 * TODO: add the documentation to the object org.apache.wayang.api.DataQuanta
 * labels: documentation,todo
 */
object DataQuanta {

  def create[T](output: OutputSlot[T])(implicit planBuilder: PlanBuilder): DataQuanta[_] =

6bcf18f428fa2a8cc8989894ff0101b956e79a61

maybe it would be better to just leave the number as the key

udf=func,

Nevertheless, current configuration runs it over Java

To execute the plan directly in the program driver

graph.print_adjlist()

Separates Python Plan into a List of Pipelines

UDF always will receive:

x: a Node object,

y: an object representing the result of the last iteration,

z: a collection to store final results inside your UDF

writer.write_message(self.descriptor)

# TODO maybe it would be better to just leave the number as the key

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from orchestrator.operator import Operator
from graph.graph import Graph
from graph.traversal import Traversal
from protobuf.planwriter import MessageWriter
import itertools
import collections
import logging
from functools import reduce
import operator


# Wraps a Source operation to create an iterable
class DataQuantaBuilder:
    def __init__(self, descriptor):
        self.descriptor = descriptor

    def source(self, source):

        if isinstance(source, str):
            source_ori = open(source, "r")
        else:
            source_ori = source
        return DataQuanta(
            Operator(
                operator_type="source",
                udf=source,
                iterator=iter(source_ori),
                previous=[],
                python_exec=False
            ),
            descriptor=self.descriptor
        )


# Wraps an operation over an iterable
class DataQuanta:
    def __init__(self, operator=None, descriptor=None):
        self.operator = operator
        self.descriptor = descriptor
        if self.operator.is_source():
            self.descriptor.add_source(self.operator)
        if self.operator.is_sink():
            self.descriptor.add_sink(self.operator)

    # Operational Functions
    def filter(self, udf):
        def func(iterator):
            return filter(udf, iterator)

        return DataQuanta(
            Operator(
                operator_type="filter",
                udf=func,
                previous=[self.operator],
                python_exec=True
            ),
            descriptor=self.descriptor
        )

    def flatmap(self, udf):

        def auxfunc(iterator):
            return itertools.chain.from_iterable(map(udf, iterator))

        def func(iterator):
            mapped = map(udf, iterator)
            flattened = flatten_single_dim(mapped)
            yield from flattened

        def flatten_single_dim(mapped):
            for item in mapped:
                for subitem in item:
                    yield subitem

        return DataQuanta(
            Operator(
                operator_type="flatmap",
                udf=func,
                previous=[self.operator],
                python_exec=True
            ),
            descriptor=self.descriptor
        )

    def group_by(self, udf):
        def func(iterator):
            # TODO key should be given by "udf"
            return itertools.groupby(iterator, key=operator.itemgetter(0))
            #return itertools.groupby(sorted(iterator), key=itertools.itemgetter(0))

        return DataQuanta(
            Operator(
                operator_type="group_by",
                udf=func,
                previous=[self.operator],
                python_exec=True
            ),
            descriptor=self.descriptor
        )

    def map(self, udf):
        def func(iterator):
            return map(udf, iterator)

        return DataQuanta(
            Operator(
                operator_type="map",
                udf=func,
                previous=[self.operator],
                python_exec=True
            ),
            descriptor=self.descriptor
        )

    # Key specifies pivot dimensions
    # UDF specifies reducer function
    def reduce_by_key(self, keys, udf):

        op = Operator(
            operator_type="reduce_by_key",
            udf=udf,
            previous=[self.operator],
            python_exec=False
        )

        logging.debug("reduce_by_key received %d keys: %s", len(keys), keys)
        for i in range(0, len(keys)):
            """if keys[i] is int:
                op.set_parameter("vector_position|"+str(i), keys[i])
            else:
                op.set_parameter("dimension_key|"+str(i), keys[i])"""

            # TODO maybe it would be better to just leave the number as the key
            op.set_parameter("dimension|"+str(i+1), keys[i])

        return DataQuanta(
            op,
            descriptor=self.descriptor
        )

    def reduce(self, udf):
        def func(iterator):
            return reduce(udf, iterator)

        return DataQuanta(
            Operator(
                operator_type="reduce",
                udf=func,
                previous=[self.operator],
                python_exec=True
            ),
            descriptor=self.descriptor
        )

    def sink(self, path, end="\n"):
        def consume(iterator):
            with open(path, 'w') as f:
                for x in iterator:
                    f.write(str(x) + end)

        def func(iterator):
            consume(iterator)
            # return self.__run(consume)

        return DataQuanta(
            Operator(
                operator_type="sink",

                udf=path,
                # To execute directly uncomment
                # udf=func,

                previous=[self.operator],
                python_exec=False
            ),
            descriptor=self.descriptor
        )

    def sort(self, udf):

        def func(iterator):
            return sorted(iterator, key=udf)

        return DataQuanta(
            Operator(
                operator_type="sort",
                udf=func,
                previous=[self.operator],
                python_exec=True
            ),
            descriptor=self.descriptor
        )

    # This function allows the union to be performed by Python
    # Nevertheless, the current configuration runs it over Java
    def union(self, other):

        def func(iterator):
            return itertools.chain(iterator, other.operator.getIterator())

        return DataQuanta(
            Operator(
                operator_type="union",
                udf=func,
                previous=[self.operator, other.operator],
                python_exec=False
            ),
            descriptor=self.descriptor
        )

    def __run(self, consumer):
        consumer(self.operator.getIterator())

    # Execution Functions
    def console(self, end="\n"):
        def consume(iterator):
            for x in iterator:
                print(x, end=end)

        self.__run(consume)

    # Only for debugging purposes!
    # To execute the plan directly in the program driver
    def execute(self):
        logging.warning("DEBUG Execution")
        logging.info("Reminder to swap SINK UDF value from path to func")
        logging.debug(self.operator.previous[0].operator_type)
        if self.operator.is_sink():
            logging.debug(self.operator.operator_type)
            logging.debug(self.operator.udf)
            logging.debug(len(self.operator.previous))
            self.operator.udf(self.operator.previous[0].getIterator())
        else:
            logging.error("Plan must call execute from SINK type of operator")
            raise RuntimeError("Plan must call execute from SINK type of operator")

    # Converts Python Functional Plan to valid Wayang Plan
    def to_wayang_plan(self):

        sinks = self.descriptor.get_sinks()
        if len(sinks) == 0:
            return

        graph = Graph()
        graph.populate(self.descriptor.get_sinks())

        # Uncomment to check the Graph built
        # graph.print_adjlist()

        # Function to be consumed by Traverse
        # Separates Python Plan into a List of Pipelines
        def define_pipelines(node1, current_pipeline, collection):
            def store_unique(pipe_to_insert):
                for pipe in collection:
                    if equivalent_lists(pipe, pipe_to_insert):
                        return
                collection.append(pipe_to_insert)

            def equivalent_lists(l1, l2):
                return collections.Counter(l1) == collections.Counter(l2)

            if not current_pipeline:
                current_pipeline = [node1]

            elif node1.operator.is_boundary():
                store_unique(current_pipeline.copy())
                current_pipeline.clear()
                current_pipeline.append(node1)

            else:
                current_pipeline.append(node1)

            if node1.operator.sink:
                store_unique(current_pipeline.copy())
                current_pipeline.clear()

            return current_pipeline

        # Works over the graph
        trans = Traversal(
            graph=graph,
            origin=self.descriptor.get_sources(),
            # udf=lambda x, y, z: d(x, y, z)
            # UDF always will receive:
            # x: a Node object,
            # y: an object representing the result of the last iteration,
            # z: a collection to store final results inside your UDF
            udf=lambda x, y, z: define_pipelines(x, y, z)
        )

        # Gets the results of the traverse process
        collected_stages = trans.get_collected_data()

        # Passing the Stages to a Wayang message writer
        writer = MessageWriter()
        # Stage is composed of class Node objects
        for a, stage in enumerate(collected_stages, start=1):
            logging.info("///")
            logging.info("stage" + str(a))
            writer.process_pipeline(stage)

        writer.set_dependencies()

        # Uses a file to provide the plan
        # writer.write_message(self.descriptor)

        # Send the plan to Wayang REST api directly
        writer.send_message(self.descriptor)
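
The splitting rule used by define_pipelines can be looked at in isolation. The following is a minimal sketch, assuming a linear chain of operators; `Op` is a hypothetical stand-in that carries only the boundary and sink flags, it is not the pywayang Node class, and the de-duplication done by store_unique is left out.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Op:
    """Hypothetical stand-in for a plan node (assumption, not pywayang API)."""
    name: str
    boundary: bool = False
    sink: bool = False


def split_into_pipelines(ops):
    """Cut a linear chain of operators into pipelines.

    A boundary operator closes the running pipeline and starts a new one;
    a sink closes the pipeline it belongs to.
    """
    pipelines = []
    current = []
    for op in ops:
        if current and op.boundary:
            pipelines.append(current)
            current = []
        current.append(op)
        if op.sink:
            pipelines.append(current)
            current = []
    return pipelines


chain = [
    Op("source"),
    Op("map"),
    Op("reduce_by", boundary=True),
    Op("filter"),
    Op("sink", sink=True),
]
stages = split_into_pipelines(chain)
names = [[op.name for op in stage] for stage in stages]
print(names)
```

As in the code above, a trailing pipeline that never reaches a sink is not stored.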

668a0429fb75425f983112ec63959c33341a58f7

add the documentation in the methods of org.apache.wayang.api.util.DataQuantaBuilderDecorator

* TODO: add the documentation in the methods of org.apache.wayang.api.util.DataQuantaBuilderDecorator

/**
  * Utility to extend a [[DataQuantaBuilder]]'s functionality by decoration.
  */
/**
 * TODO: add the documentation in the methods of org.apache.wayang.api.util.DataQuantaBuilderDecorator
 * labels: documentation,todo
 */
abstract class DataQuantaBuilderDecorator[This <: DataQuantaBuilder[This, Out], Out]
(baseBuilder: DataQuantaBuilder[_, Out])
  extends DataQuantaBuilder[This, Out] {

5e1d9e1f024145b62c1c86bef26e3deddf5f2340

Validate whether the implementation applies to this case

First, inspect the size of the file and its line sizes.

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.basic.operators;

import java.util.Optional;
import java.util.OptionalDouble;
import java.util.OptionalLong;
import org.apache.commons.lang3.Validate;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.wayang.commons.util.profiledb.model.measurement.TimeMeasurement;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate;
import org.apache.wayang.core.plan.wayangplan.UnarySource;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.core.util.fs.FileSystems;

/**
 * This source reads an object file and outputs the contained objects as data units.
 */
public class ObjectFileSource<T> extends UnarySource<T> {

    private final Logger logger = LogManager.getLogger(this.getClass());

    private final String inputUrl;

    private final Class<T> tClass;

    public ObjectFileSource(String inputUrl, DataSetType<T> type) {
        super(type);
        this.inputUrl = inputUrl;
        this.tClass = type.getDataUnitType().getTypeClass();
    }

    public ObjectFileSource(String inputUrl, Class<T> tClass) {
        super(DataSetType.createDefault(tClass));
        this.inputUrl = inputUrl;
        this.tClass = tClass;
    }

    /**
     * Copies an instance (exclusive of broadcasts).
     *
     * @param that the instance to copy
     */
    public ObjectFileSource(ObjectFileSource<T> that) {
        super(that);
        this.inputUrl = that.getInputUrl();
        this.tClass = that.getTypeClass();
    }

    public String getInputUrl() {
        return this.inputUrl;
    }

    public Class<T> getTypeClass(){
        return this.tClass;
    }

    @Override
    public Optional<org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator> createCardinalityEstimator(
            final int outputIndex,
            final Configuration configuration) {
        Validate.inclusiveBetween(0, this.getNumOutputs() - 1, outputIndex);
        return Optional.of(new ObjectFileSource.CardinalityEstimator());
    }


    /**
     * Custom {@link org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator} for {@link ObjectFileSource}s.
     */
    protected class CardinalityEstimator implements org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator {

        public final CardinalityEstimate FALLBACK_ESTIMATE = new CardinalityEstimate(1000L, 100000000L, 0.7);

        public static final double CORRECTNESS_PROBABILITY = 0.95d;

        /**
         * We expect selectivities to be correct within a factor of {@value #EXPECTED_ESTIMATE_DEVIATION}.
         */
        public static final double EXPECTED_ESTIMATE_DEVIATION = 0.05;

        @Override
        public CardinalityEstimate estimate(OptimizationContext optimizationContext, CardinalityEstimate... inputEstimates) {
            //TODO validate whether the implementation applies to this case
            Validate.isTrue(ObjectFileSource.this.getNumInputs() == inputEstimates.length);

            // see Job for StopWatch measurements
            final TimeMeasurement timeMeasurement = optimizationContext.getJob().getStopWatch().start(
                    "Optimization", "Cardinality&Load Estimation", "Push Estimation", "Estimate source cardinalities"
            );

            // Query the job cache first to see if there is already an estimate.
            String jobCacheKey = String.format("%s.estimate(%s)", this.getClass().getCanonicalName(), ObjectFileSource.this.inputUrl);
            CardinalityEstimate cardinalityEstimate = optimizationContext.queryJobCache(jobCacheKey, CardinalityEstimate.class);
            if (cardinalityEstimate != null) {
                // Stop the measurement on the cached path as well.
                timeMeasurement.stop();
                return cardinalityEstimate;
            }

            // Otherwise calculate the cardinality.
            // First, inspect the size of the file and its line sizes.
            OptionalLong fileSize = FileSystems.getFileSize(ObjectFileSource.this.inputUrl);
            if (!fileSize.isPresent()) {
                ObjectFileSource.this.logger.warn("Could not determine size of {}... deliver fallback estimate.",
                        ObjectFileSource.this.inputUrl);
                timeMeasurement.stop();
                return this.FALLBACK_ESTIMATE;

            } else if (fileSize.getAsLong() == 0L) {
                timeMeasurement.stop();
                return new CardinalityEstimate(0L, 0L, 1d);
            }

            OptionalDouble bytesPerLine = this.estimateBytesPerLine();
            if (!bytesPerLine.isPresent()) {
                ObjectFileSource.this.logger.warn("Could not determine average line size of {}... deliver fallback estimate.",
                        ObjectFileSource.this.inputUrl);
                timeMeasurement.stop();
                return this.FALLBACK_ESTIMATE;
            }

            // Extrapolate a cardinality estimate for the complete file.
            double numEstimatedLines = fileSize.getAsLong() / bytesPerLine.getAsDouble();
            double expectedDeviation = numEstimatedLines * EXPECTED_ESTIMATE_DEVIATION;
            cardinalityEstimate = new CardinalityEstimate(
                    (long) (numEstimatedLines - expectedDeviation),
                    (long) (numEstimatedLines + expectedDeviation),
                    CORRECTNESS_PROBABILITY
            );

            // Cache the result, so that it will not be recalculated again.
            optimizationContext.putIntoJobCache(jobCacheKey, cardinalityEstimate);

            timeMeasurement.stop();
            return cardinalityEstimate;
        }

        /**
         * Estimate the number of bytes that are in each line of a given file.
         *
         * @return the average number of bytes per line if it could be determined
         */
        private OptionalDouble estimateBytesPerLine() {
            //TODO validate whether the implementation applies to this case
//            final Optional<FileSystem> fileSystem = FileSystems.getFileSystem(ObjectFileSource.this.inputUrl);
//            if (fileSystem.isPresent()) {
//
//                // Construct a limited reader for the first x KiB of the file.
//                final int KiB = 1024;
//                final int MiB = 1024 * KiB;
//                try (LimitedInputStream lis = new LimitedInputStream(fileSystem.get().open(
//                    ObjectFileSource.this.inputUrl), 1 * MiB)) {
//                    final BufferedReader bufferedReader = new BufferedReader(
//                            new InputStreamReader(lis, ObjectFileSource.this.encoding)
//                    );
//
//                    // Read as much as possible.
//                    char[] cbuf = new char[1024];
//                    int numReadChars, numLineFeeds = 0;
//                    while ((numReadChars = bufferedReader.read(cbuf)) != -1) {
//                        for (int i = 0; i < numReadChars; i++) {
//                            if (cbuf[i] == '\n') {
//                                numLineFeeds++;
//                            }
//                        }
//                    }
//
//                    if (numLineFeeds == 0) {
//                        ObjectFileSource.this.logger.warn("Could not find any newline character in {}.", ObjectFileSource.this.inputUrl);
//                        return OptionalDouble.empty();
//                    }
//                    return OptionalDouble.of((double) lis.getNumReadBytes() / numLineFeeds);
//                } catch (IOException e) {
//                    ObjectFileSource.this.logger.error("Could not estimate bytes per line of an input file.", e);
//                }
//            }

            return OptionalDouble.empty();
        }
    }

}
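
As a worked example of the extrapolation step in estimate(): the estimated line count is the file size divided by the average bytes per line, widened by 5% in each direction. The sketch below redoes that arithmetic in Python; the file size and line size are made-up inputs, and only the two constants are taken from the class above.

```python
EXPECTED_ESTIMATE_DEVIATION = 0.05  # same value as the Java constant above
CORRECTNESS_PROBABILITY = 0.95      # same value as the Java constant above


def extrapolate_cardinality(file_size_bytes, bytes_per_line):
    """Return (lower, upper, probability) following the arithmetic in estimate()."""
    num_estimated_lines = file_size_bytes / bytes_per_line
    expected_deviation = num_estimated_lines * EXPECTED_ESTIMATE_DEVIATION
    # int() truncates toward zero, like the (long) casts in the Java code.
    lower = int(num_estimated_lines - expected_deviation)
    upper = int(num_estimated_lines + expected_deviation)
    return lower, upper, CORRECTNESS_PROBABILITY


# Illustrative input: a 1,000,000-byte file averaging 50 bytes per line.
estimate = extrapolate_cardinality(1_000_000, 50.0)
print(estimate)
```

Note that with estimateBytesPerLine() returning an empty value, as it does above, the Java code never reaches this step for non-empty files and delivers the fallback estimate instead.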

7731165e0c2dfeb54dd56b576a364f28aa4f9d1f

add unitary test to the elements in the file org.apache.wayang.api.DataQuanta.scala

* TODO: add unitary test to the elements in the file org.apache.wayang.api.DataQuanta.scala

package org.apache.wayang.api

/**
 * TODO: add unitary test to the elements in the file org.apache.wayang.api.DataQuanta.scala
 * labels: unitary-test,todo
 */
import _root_.java.lang.{Iterable => JavaIterable}
import _root_.java.util.function.{Consumer, IntUnaryOperator, BiFunction => JavaBiFunction, Function => JavaFunction}
import _root_.java.util.{Collection => JavaCollection}

802365979c9912b9cbb1eedd524c7da7390e8c94

use config buffer size

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.api.python.executor;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.Socket;
import java.util.Iterator;
/*TODO cannot be always string, include definition for every operator
*  like: map(udf, inputtype, outputtype)*/
public class ProcessReceiver<Output> {

    private ReaderIterator<Output> iterator;

    public ProcessReceiver(Socket socket){
        try{
            //TODO use config buffer size
            int BUFFER_SIZE = 8192;

            DataInputStream stream = new DataInputStream(new BufferedInputStream(socket.getInputStream(), BUFFER_SIZE));
            this.iterator = new ReaderIterator<>(stream);

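        } catch (IOException e) {
            // Fail fast: without the stream, the iterator would stay null.
            throw new java.io.UncheckedIOException("Could not open the input stream of the worker socket.", e);
        }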
    }

    public Iterable<Output> getIterable(){
        return () -> iterator;
    }

    public void print(){
        iterator.forEachRemaining(x -> System.out.println(x.toString()));

    }
}
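
One way to read the TODO is "look the buffer size up in the configuration and fall back to 8192 when it is missing". The following is a rough sketch of that lookup, written in Python for brevity. The property key `wayang.api.python.buffer.size` is hypothetical, and the small parser is a stand-in for a properties file, not Wayang's Configuration class.

```python
import io

DEFAULT_BUFFER_SIZE = 8192  # the value currently hard-coded in ProcessReceiver
BUFFER_SIZE_KEY = "wayang.api.python.buffer.size"  # hypothetical key (assumption)


def parse_properties(text):
    """Parse simple `key = value` lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def buffer_size_from(props):
    """Look up the buffer size; fall back to the default if absent or invalid."""
    raw = props.get(BUFFER_SIZE_KEY)
    try:
        size = int(raw)
    except (TypeError, ValueError):
        return DEFAULT_BUFFER_SIZE
    return size if size > 0 else DEFAULT_BUFFER_SIZE


props = parse_properties("# worker settings\nwayang.api.python.buffer.size = 16384\n")
reader = io.BufferedReader(io.BytesIO(b"payload"), buffer_size=buffer_size_from(props))
data = reader.read()
print(buffer_size_from(props), data)
```

Falling back silently keeps the current behaviour when the key is not set; whether an invalid value should instead raise an error is a design choice left open here.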

00041f78a7f7fe83b09721ebb1d71cd4c2f14bae

add the documentation in the implicit of org.apache.wayang.api

* TODO: add the documentation in the implicit of org.apache.wayang.api

/**
  * Provides implicits for the basic Wayang API.
  */
/**
 * TODO: add the documentation in the implicit of org.apache.wayang.api
 * labels: documentation,todo
 */
package object api {

  implicit def basicDataUnitType[T](implicit classTag: ClassTag[T]): BasicDataUnitType[T] = {

afd91b27aa8122e38852815a840fd64dd31ecd63

See what is happening with ENV Python version

IDK redirectStreamsToStderr(worker.getInputStream, worker.getErrorStream)

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.api.python.executor;

import com.google.protobuf.ByteString;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.exception.WayangException;

import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

public class PythonProcessCaller {

    private Process process;
    private Socket socket;
    private ServerSocket serverSocket;
    private boolean ready;

    //TODO How to get the config
    private Configuration configuration;

    public PythonProcessCaller(ByteString serializedUDF){

        //TODO document how to provide the configuration in the code
        this.configuration = new Configuration("file:///Users/rodrigopardomeza/wayang/incubator-wayang/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties");
        this.ready = false;
        byte[] addr = new byte[4];
        addr[0] = 127; addr[1] = 0; addr[2] = 0; addr[3] = 1;

        try {
            /*TODO should NOT be assigned a specific port, set port as 0 (zero)*/
            this.serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(addr));
            ProcessBuilder pb = new ProcessBuilder(
                    Arrays.asList(
                            "python3",
                            this.configuration.getStringProperty("wayang.api.python.worker")
                    )
            );
            Map<String, String> workerEnv = pb.environment();
            workerEnv.put("PYTHON_WORKER_FACTORY_PORT", String.valueOf(this.serverSocket.getLocalPort()));

            // TODO See what is happening with ENV Python version
            workerEnv.put("PYTHONPATH", "/Users/rodrigopardomeza/wayang/incubator-wayang/pywayang/:/Users/rodrigopardomeza/opt/anaconda3/");

            pb.redirectOutput(ProcessBuilder.Redirect.INHERIT);
            pb.redirectError(ProcessBuilder.Redirect.INHERIT);
            this.process = pb.start();


            // Redirect worker stdout and stderr
            //IDK redirectStreamsToStderr(worker.getInputStream, worker.getErrorStream)

            // Wait for it to connect to our socket
            this.serverSocket.setSoTimeout(10000);

            try {
                this.socket = this.serverSocket.accept();
                this.serverSocket.setSoTimeout(0);

                if(socket.isConnected())
                    this.ready = true;

            } catch (Exception e) {
                System.out.println(e);
                throw new WayangException("Python worker failed to connect back.", e);
            }
        } catch (Exception e){
            System.out.println(e);
            throw new WayangException("Python worker failed", e);
        }
    }

    public Process getProcess() {
        return process;
    }

    public Socket getSocket() {
        return socket;
    }

    public boolean isReady(){
        return ready;
    }

    public void close(){
        try {
            this.process.destroy();
            this.socket.close();
            this.serverSocket.close();
            System.out.println("Everything closed");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
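
The handshake above has two halves: the caller binds a server socket to port 0, announces the chosen port through PYTHON_WORKER_FACTORY_PORT, and waits for the worker to connect back. The sketch below plays both halves in one Python process to show the flow. `connect_back` is a hypothetical helper and not the actual pywayang worker code; the environment is passed as a plain dict rather than read from the real process environment.

```python
import os
import socket


def connect_back(env=os.environ, host="127.0.0.1", timeout=10.0):
    """Connect to the port announced in PYTHON_WORKER_FACTORY_PORT."""
    port = int(env["PYTHON_WORKER_FACTORY_PORT"])
    return socket.create_connection((host, port), timeout=timeout)


# Stand-in for the Java side: bind to port 0 so the OS picks a free port.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))
server.listen(1)
server.settimeout(10.0)  # mirrors setSoTimeout(10000) above
announced_port = server.getsockname()[1]

# Worker side: read the announced port and connect back.
worker_side = connect_back({"PYTHON_WORKER_FACTORY_PORT": str(announced_port)})
caller_side, _ = server.accept()

worker_side.sendall(b"ready")
received = caller_side.recv(5)
print(received)

for s in (worker_side, caller_side, server):
    s.close()
```

Binding to port 0 avoids collisions between concurrently started workers, since each caller gets its own ephemeral port.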

ff647ebc44c8a28c02079340b5b775e2a5564fbd

add unitary test to the elements in the file org.apache.wayang.api.RecordDataQuantaBuilder.scala

* TODO: add unitary test to the elements in the file org.apache.wayang.api.RecordDataQuantaBuilder.scala

 */

package org.apache.wayang.api
/**
 * TODO: add unitary test to the elements in the file org.apache.wayang.api.RecordDataQuantaBuilder.scala
 * labels: unitary-test,todo
 */
import org.apache.wayang.api.util.DataQuantaBuilderDecorator
import org.apache.wayang.basic.data.Record
import org.apache.wayang.basic.function.ProjectionDescriptor

8d9c1b73814e5cb75c62e20fe537b2ca7652677b

Connect with predecessors requires more details in connection slot

/*TODO Connect with predecessors requires more details in connection slot*/

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.api.rest.server.spring.general;

import com.google.protobuf.ByteString;
import org.apache.wayang.api.python.function.WrappedPythonFunction;
import org.apache.wayang.api.rest.server.spring.decoder.WayangPlanBuilder;
import org.apache.wayang.basic.operators.*;
import org.apache.wayang.commons.serializable.OperatorProto;
import org.apache.wayang.commons.serializable.PlanProto;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.api.exception.WayangException;
import org.apache.wayang.core.function.MapPartitionsDescriptor;
import org.apache.wayang.core.plan.wayangplan.OperatorBase;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
import java.util.*;
import java.util.stream.Collectors;

import org.apache.wayang.core.plan.wayangplan.WayangPlan;
import org.apache.wayang.java.Java;
import org.apache.wayang.spark.Spark;

import org.apache.wayang.commons.serializable.WayangPlanProto;
import org.springframework.web.multipart.MultipartFile;


@RestController
public class WayangController {

    @GetMapping("/plan/create/fromfile")
    public String planFromFile(
            //@RequestParam("file") MultipartFile file
    ){

        try {
            FileInputStream inputStream = new FileInputStream(Paths.get(".").toRealPath() + "/protobuf/wayang_message");
            WayangPlanBuilder wpb = new WayangPlanBuilder(inputStream);

            /*TODO ADD id to executions*/
            wpb.getWayangContext().execute(wpb.getWayangPlan());

        } catch (IOException e) {
            e.printStackTrace();
        }

        return "Builder works";
    }

    @PostMapping("/plan/create")
    public String planFromMessage(
            @RequestParam("message") String message
    ){

        WayangPlanBuilder wpb = new WayangPlanBuilder(message);

        /*TODO ADD id to executions*/
        wpb.getWayangContext().execute(wpb.getWayangPlan());

        return "";
    }

    @GetMapping("/")
    public String all(){
        System.out.println("detected!");

        try {
            FileInputStream inputStream = new FileInputStream(Paths.get(".").toRealPath() + "/protobuf/wayang_message");
            WayangPlanProto plan = WayangPlanProto.parseFrom(inputStream);

            WayangContext wc = buildContext(plan);
            WayangPlan wp = buildPlan(plan);

            System.out.println("Plan!");
            System.out.println(wp.toString());

            wc.execute(wp);
            return("Works!");

        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

        return "Not working";
    }

    private WayangContext buildContext(WayangPlanProto plan){

        WayangContext ctx = new WayangContext();
        plan.getContext().getPlatformsList().forEach(platform -> {
            if (platform.getNumber() == 0)
                ctx.with(Java.basicPlugin());
            else if (platform.getNumber() == 1)
                ctx.with(Spark.basicPlugin());
        });
        //ctx.with(Spark.basicPlugin());

        return ctx;
    }

    private WayangPlan buildPlan(WayangPlanProto plan){

        System.out.println(plan);

        PlanProto planProto = plan.getPlan();
        LinkedList<OperatorProto> protoList = new LinkedList<>();
        planProto.getSourcesList().forEach(protoList::addLast);

        Map<String, OperatorBase> operators = new HashMap<>();
        List<OperatorBase> sinks = new ArrayList<>();
        while(! protoList.isEmpty()) {

            OperatorProto proto = protoList.pollFirst();

            /* Checking if protoOperator can be connected to the current WayangPlan*/
            boolean processIt;
            if(proto.getType().equals("source")) processIt = true;

            else {
                /* Checking if ALL predecessors were already processed */
                processIt = true;
                for(String predecessor : proto.getPredecessorsList()){
                    if (!operators.containsKey(predecessor)) {
                        processIt = false;
                        break;
                    }
                }
            }

            /* Operators should not be processed twice*/
            if(operators.containsKey(proto.getId())) processIt = false;

            if(processIt) {

                /* Create and store Wayang operator */
                OperatorBase operator = createOperatorByType(proto);
                operators.put(proto.getId(), operator);

                /*TODO Connect with predecessors requires more details in connection slot*/
                int order = 0;
                for (String pre_id : proto.getPredecessorsList()) {

                    OperatorBase predecessor = operators.get(pre_id);
                    /* Only works without replicate topology */
                    predecessor.connectTo(0, operator, order);
                    order++;

                    if(proto.getType().equals("sink") && !sinks.contains(operator)){
                        /* A sink with several predecessors is registered only once */
                        sinks.add(operator);
                    }
                }

                /*List of OperatorProto successors
                 * They will be added to the protoList
                 * nevertheless they must be processed only if the parents are in operators list */
                List<OperatorProto> listSuccessors = planProto.getOperatorsList()
                        .stream()
                        .filter(t -> proto.getSuccessorsList().contains(t.getId()))
                        .collect(Collectors.toList());
                for (OperatorProto successor : listSuccessors){
                    if(!protoList.contains(successor)){
                        protoList.addLast(successor);
                    }
                }

                List<OperatorProto> sinkSuccessors = planProto.getSinksList()
                        .stream()
                        .filter(t -> proto.getSuccessorsList().contains(t.getId()))
                        .collect(Collectors.toList());
                for (OperatorProto successor : sinkSuccessors){
                    if(!protoList.contains(successor)){
                        protoList.addLast(successor);
                    }
                }

            } else {

                /* In case we cannot process it yet, It must be added again at the end*/
                protoList.addLast(proto);
            }
        }

        WayangPlan wayangPlan = new WayangPlan(sinks.get(0));
        return wayangPlan;
    }

    public OperatorBase createOperatorByType(OperatorProto operator){

        System.out.println("Type: " + operator.getType());
        switch(operator.getType()){
            case "source":
                try {
                    String source_path = operator.getPath();
                    URL url = new File(source_path).toURI().toURL();
                    return new TextFileSource(url.toString());
                } catch (MalformedURLException e) {
                    e.printStackTrace();
                }
                break;
            case "sink":
                try {
                    String sink_path = operator.getPath();
                    URL url = new File(sink_path).toURI().toURL();
                    return new TextFileSink<String>(
                            url.toString(),
                            String.class
                    );

                } catch (MalformedURLException e) {
                    e.printStackTrace();
                }
                break;
            case "reduce_by_key":
                try {
                    /* Function to be applied in Python workers */
                    ByteString function = operator.getUdf();

                    /* Has dimension or positions that compose GroupKey */
                    Map<String, String> parameters = operator.getParametersMap();

                    PyWayangReduceByOperator<String, String> op = new PyWayangReduceByOperator(
                        operator.getParametersMap(),
                        operator.getUdf() ,
                        String.class,
                        String.class,
                            false
                    );

                    String sink_path = operator.getPath();
                    URL url = new File(sink_path).toURI().toURL();
                    return new TextFileSink<String>(
                            url.toString(),
                            String.class
                    );

                } catch (MalformedURLException e) {
                    e.printStackTrace();
                }
                break;
            case "map_partition":
                return new MapPartitionsOperator<>(
                    new MapPartitionsDescriptor<String, String>(
                        new WrappedPythonFunction<String, String>(
                            l -> l,
                            operator.getUdf()
                        ),
                        String.class,
                        String.class
                    )
                );

            case "union":
                return new UnionAllOperator<String>(
                        String.class
                );

        }

        throw new WayangException("Operator Type not supported");
    }

    public static URI createUri(String resourcePath) {
        try {
            return Thread.currentThread().getClass().getResource(resourcePath).toURI();
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Illegal URI.", e);
        }

    }

}
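
The worklist loop in buildPlan creates an operator only once all of its predecessors exist, and re-queues it otherwise. The sketch below reproduces that ordering on plain operator ids, in Python for brevity. `processing_order` and the id-to-list maps are hypothetical stand-ins for the protobuf messages, and the stall check is an addition that the Java code above does not have (there, an id that never appears would keep the loop spinning).

```python
from collections import deque


def processing_order(predecessors, successors, sources):
    """Return operator ids in the order the worklist loop would create them.

    predecessors / successors map an operator id to a list of ids.
    """
    queue = deque(sources)
    done = []
    seen = set()
    stalled = 0  # consecutive re-queues without progress
    while queue:
        op = queue.popleft()
        if op in seen:
            continue
        if all(p in seen for p in predecessors.get(op, [])):
            seen.add(op)
            done.append(op)
            stalled = 0
            for nxt in successors.get(op, []):
                if nxt not in seen and nxt not in queue:
                    queue.append(nxt)
        else:
            queue.append(op)  # not ready yet, retry later
            stalled += 1
            if stalled > len(queue):
                raise ValueError("unresolvable predecessors: %s" % list(queue))
    return done


# Two sources feeding a union; the second branch has an extra map step.
preds = {"map": ["s2"], "union": ["s1", "map"], "sink": ["union"]}
succs = {"s1": ["union"], "s2": ["map"], "map": ["union"], "union": ["sink"]}
order = processing_order(preds, succs, ["s1", "s2"])
print(order)
```

Here "union" is dequeued once before "map" has been created, so it is pushed back and only created on the second attempt.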

46f8cdedd25da6c3205060ebd781dafe78222516

ADD id to executions

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.api.rest.server.spring.general;

import com.google.protobuf.ByteString;
import org.apache.wayang.api.python.function.WrappedPythonFunction;
import org.apache.wayang.api.rest.server.spring.decoder.WayangPlanBuilder;
import org.apache.wayang.basic.operators.*;
import org.apache.wayang.commons.serializable.OperatorProto;
import org.apache.wayang.commons.serializable.PlanProto;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.api.exception.WayangException;
import org.apache.wayang.core.function.MapPartitionsDescriptor;
import org.apache.wayang.core.plan.wayangplan.OperatorBase;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
import java.util.*;
import java.util.stream.Collectors;

import org.apache.wayang.core.plan.wayangplan.WayangPlan;
import org.apache.wayang.java.Java;
import org.apache.wayang.spark.Spark;

import org.apache.wayang.commons.serializable.WayangPlanProto;
import org.springframework.web.multipart.MultipartFile;


@RestController
public class WayangController {

    @GetMapping("/plan/create/fromfile")
    public String planFromFile(
            //@RequestParam("file") MultipartFile file
    ){

        try {
            FileInputStream inputStream = new FileInputStream(Paths.get(".").toRealPath() + "/protobuf/wayang_message");
            WayangPlanBuilder wpb = new WayangPlanBuilder(inputStream);

            /*TODO ADD id to executions*/
            wpb.getWayangContext().execute(wpb.getWayangPlan());

        } catch (IOException e) {
            e.printStackTrace();
        }

        return "Builder works";
    }

    @PostMapping("/plan/create")
    public String planFromMessage(
            @RequestParam("message") String message
    ){

        WayangPlanBuilder wpb = new WayangPlanBuilder(message);

        /*TODO ADD id to executions*/
        wpb.getWayangContext().execute(wpb.getWayangPlan());

        return "";
    }

    @GetMapping("/")
    public String all(){
        System.out.println("detected!");

        try {
            FileInputStream inputStream = new FileInputStream(Paths.get(".").toRealPath() + "/protobuf/wayang_message");
            WayangPlanProto plan = WayangPlanProto.parseFrom(inputStream);

            WayangContext wc = buildContext(plan);
            WayangPlan wp = buildPlan(plan);

            System.out.println("Plan!");
            System.out.println(wp.toString());

            wc.execute(wp);
            return("Works!");

        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

        return "Not working";
    }

    private WayangContext buildContext(WayangPlanProto plan){

        WayangContext ctx = new WayangContext();
        plan.getContext().getPlatformsList().forEach(platform -> {
            if (platform.getNumber() == 0)
                ctx.with(Java.basicPlugin());
            else if (platform.getNumber() == 1)
                ctx.with(Spark.basicPlugin());
        });
        //ctx.with(Spark.basicPlugin());

        return ctx;
    }

    private WayangPlan buildPlan(WayangPlanProto plan){

        System.out.println(plan);

        PlanProto planProto = plan.getPlan();
        LinkedList<OperatorProto> protoList = new LinkedList<>();
        planProto.getSourcesList().forEach(protoList::addLast);

        Map<String, OperatorBase> operators = new HashMap<>();
        List<OperatorBase> sinks = new ArrayList<>();
        while(! protoList.isEmpty()) {

            OperatorProto proto = protoList.pollFirst();

            /* Checking if protoOperator can be connected to the current WayangPlan*/
            boolean processIt;
            if(proto.getType().equals("source")) processIt = true;

            else {
                /* Checking if ALL predecessors were already processed */
                processIt = true;
                for(String predecessor : proto.getPredecessorsList()){
                    if (!operators.containsKey(predecessor)) {
                        processIt = false;
                        break;
                    }
                }
            }

            /* Operators should not be processed twice*/
            if(operators.containsKey(proto.getId())) processIt = false;

            if(processIt) {

                /* Create and store Wayang operator */
                OperatorBase operator = createOperatorByType(proto);
                operators.put(proto.getId(), operator);

                /*TODO Connect with predecessors requires more details in connection slot*/
                int order = 0;
                for (String pre_id : proto.getPredecessorsList()) {

                    OperatorBase predecessor = operators.get(pre_id);
                    /* Only works without replicate topology */
                    predecessor.connectTo(0, operator, order);
                    order++;

                    if(proto.getType().equals("sink")){
                        sinks.add(operator);
                        //if(!sinks.contains(operator)) {
                        //    sinks.add(operator);
                        //}
                    }
                }

                /*List of OperatorProto successors
                 * They will be added to the protoList
                 * nevertheless they must be processed only if the parents are in operators list */
                List<OperatorProto> listSuccessors = planProto.getOperatorsList()
                        .stream()
                        .filter(t -> proto.getSuccessorsList().contains(t.getId()))
                        .collect(Collectors.toList());
                for (OperatorProto successor : listSuccessors){
                    if(!protoList.contains(successor)){
                        protoList.addLast(successor);
                    }
                }

                List<OperatorProto> sinkSuccessors = planProto.getSinksList()
                        .stream()
                        .filter(t -> proto.getSuccessorsList().contains(t.getId()))
                        .collect(Collectors.toList());
                for (OperatorProto successor : sinkSuccessors){
                    if(!protoList.contains(successor)){
                        protoList.addLast(successor);
                    }
                }

            } else {

                /* In case we cannot process it yet, It must be added again at the end*/
                protoList.addLast(proto);
            }
        }

        WayangPlan wayangPlan = new WayangPlan(sinks.get(0));
        return wayangPlan;
    }

    public OperatorBase createOperatorByType(OperatorProto operator){

        System.out.println("Type: " + operator.getType());
        switch(operator.getType()){
            case "source":
                try {
                    String source_path = operator.getPath();
                    URL url = new File(source_path).toURI().toURL();
                    return new TextFileSource(url.toString());
                } catch (MalformedURLException e) {
                    e.printStackTrace();
                }
                break;
            case "sink":
                try {
                    String sink_path = operator.getPath();
                    URL url = new File(sink_path).toURI().toURL();
                    return new TextFileSink<String>(
                            url.toString(),
                            String.class
                    );

                } catch (MalformedURLException e) {
                    e.printStackTrace();
                }
                break;
            case "reduce_by_key":
                try {
                    /* Function to be applied in Python workers */
                    ByteString function = operator.getUdf();

                    /* Has dimension or positions that compose GroupKey */
                    Map<String, String> parameters = operator.getParametersMap();

                    PyWayangReduceByOperator<String, String> op = new PyWayangReduceByOperator(
                        operator.getParametersMap(),
                        operator.getUdf() ,
                        String.class,
                        String.class,
                            false
                    );

                    String sink_path = operator.getPath();
                    URL url = new File(sink_path).toURI().toURL();
                    return new TextFileSink<String>(
                            url.toString(),
                            String.class
                    );

                } catch (MalformedURLException e) {
                    e.printStackTrace();
                }
                break;
            case "map_partition":
                return new MapPartitionsOperator<>(
                    new MapPartitionsDescriptor<String, String>(
                        new WrappedPythonFunction<String, String>(
                            l -> l,
                            operator.getUdf()
                        ),
                        String.class,
                        String.class
                    )
                );

            case "union":
                return new UnionAllOperator<String>(
                        String.class
                );

        }

        throw new WayangException("Operator Type not supported");
    }

    public static URI createUri(String resourcePath) {
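        try {
            // Resolve against this class instead of Thread.class, whose loader may not see application resources.
            URL resource = WayangController.class.getResource(resourcePath);
            if (resource == null) {
                throw new IllegalArgumentException("Resource not found: " + resourcePath);
            }
            return resource.toURI();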
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Illegal URI.", e);
        }

    }

}
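
One possible way to address the `TODO ADD id to executions` note is to wrap each call to `execute` in a small registry that hands back a generated id. The sketch below is an assumption, not part of Wayang: `ExecutionRegistry`, `Status`, `submit`, and `statusOf` are hypothetical names, and the job is modelled as a plain `Runnable`.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry that tags each plan execution with a unique id. */
final class ExecutionRegistry {

    /** Illustrative states of a tracked execution. */
    enum Status { RUNNING, FINISHED, FAILED }

    private final Map<String, Status> executions = new ConcurrentHashMap<>();

    /** Runs the job synchronously and returns the id under which its status is recorded. */
    String submit(Runnable job) {
        String id = UUID.randomUUID().toString();
        executions.put(id, Status.RUNNING);
        try {
            job.run();
            executions.put(id, Status.FINISHED);
        } catch (RuntimeException e) {
            // Record the failure instead of losing it in a stack trace.
            executions.put(id, Status.FAILED);
        }
        return id;
    }

    /** Returns the recorded status, or null if the id is unknown. */
    Status statusOf(String id) {
        return executions.get(id);
    }

    public static void main(String[] args) {
        ExecutionRegistry registry = new ExecutionRegistry();
        String id = registry.submit(() -> System.out.println("executing plan"));
        System.out.println(id + " -> " + registry.statusOf(id));
    }
}
```

In the controller above, an endpoint could return the value of `registry.submit(() -> wpb.getWayangContext().execute(wpb.getWayangPlan()))` instead of an empty string; whether that fits the intended REST API is left open.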

87ee4971b5736963ad1dcede16329f8f8f3163c9

Missing case PortableDataStream


/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.api.python.executor;

import com.google.protobuf.ByteString;
import org.apache.wayang.api.python.function.PythonUdf;
import org.apache.wayang.core.api.exception.WayangException;

import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Map;

public class ProcessFeeder<Input, Output> {

    private Socket socket;
    private PythonUdf<Input, Output> udf;
    private ByteString serializedUDF;
    private Iterable<Input> input;

    //TODO add to a config file
    int END_OF_DATA_SECTION = -1;
    int NULL = -5;

    public ProcessFeeder(
            Socket socket,
            PythonUdf<Input, Output> udf,
            ByteString serializedUDF,
            Iterable<Input> input){

        if(input == null) throw new WayangException("Nothing to process with Python API");

        this.socket = socket;
        this.udf = udf;
        this.serializedUDF = serializedUDF;
        this.input = input;

    }

    public void send(){

        try{
            //TODO use config buffer size
            int BUFFER_SIZE = 8192;

            BufferedOutputStream stream = new BufferedOutputStream(socket.getOutputStream(), BUFFER_SIZE);
            DataOutputStream dataOut = new DataOutputStream(stream);

            writeUDF(serializedUDF, dataOut);
            this.writeIteratorToStream(input.iterator(), dataOut);
            dataOut.writeInt(END_OF_DATA_SECTION);
            dataOut.flush();

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void writeUDF(ByteString serializedUDF, DataOutputStream dataOut){

        //write(serializedUDF.toByteArray(), dataOut);
        writeBytes(serializedUDF.toByteArray(), dataOut);
        System.out.println("UDF written");

    }

    public void writeIteratorToStream(Iterator<Input> iter, DataOutputStream dataOut){

        System.out.println("Iterator being sent");
        for (Iterator<Input> it = iter; it.hasNext(); ) {
            Input elem = it.next();
            //System.out.println(elem.toString());
            write(elem, dataOut);
        }
    }

    /*TODO Missing case PortableDataStream */
    public void write(Object obj, DataOutputStream dataOut){
        try {

            if(obj == null)
                dataOut.writeInt(this.NULL);

            /**
             * Byte Array cases
             */
            else if (obj instanceof Byte[] || obj instanceof byte[]) {
                System.out.println("Writing Bytes");
                writeBytes(obj, dataOut);
            }
            /**
             * String case
             * */
            else if (obj instanceof String)
                writeUTF((String) obj, dataOut);

            /**
             * Key, Value case
             * */
            else if (obj instanceof Map.Entry)
                writeKeyValue((Map.Entry) obj, dataOut);

            else{
                throw new WayangException("Unexpected element type " + obj.getClass());
            }


        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void writeBytes(Object obj, DataOutputStream dataOut){

        try{

            if (obj instanceof Byte[]) {

                int length = ((Byte[]) obj).length;

                byte[] bytes = new byte[length];
                int j=0;

                // Unboxing Byte values. (Byte[] to byte[])
                for(Byte b: ((Byte[]) obj))
                    bytes[j++] = b.byteValue();

                dataOut.writeInt(length);
                dataOut.write(bytes);

            } else if (obj instanceof byte[]) {

                dataOut.writeInt(((byte[]) obj).length);
                dataOut.write(((byte[]) obj));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void writeUTF(String str, DataOutputStream dataOut){

        byte[] bytes = str.getBytes(StandardCharsets.UTF_8);

        try {

            dataOut.writeInt(bytes.length);
            dataOut.write(bytes);
        } catch (SocketException e){

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void writeKeyValue(Map.Entry obj, DataOutputStream dataOut){

        write(obj.getKey(), dataOut);
        write(obj.getValue(), dataOut);
    }

}
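
The `writeUTF` and `send` methods above frame every element as a 4-byte big-endian length followed by the raw bytes, and close the stream with `END_OF_DATA_SECTION` (-1). The sketch below reproduces that framing for strings only and adds a hypothetical Java reader to show how the receiving side can recover the elements. `FramingSketch`, `encode`, and `decode` are illustrative names; the real consumer is the Python worker, and the `NULL` marker (-5) and the key/value case are left out.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Illustrative length-prefixed framing, mirroring ProcessFeeder's string case. */
final class FramingSketch {

    static final int END_OF_DATA_SECTION = -1;

    /** Writes each string as [int length][UTF-8 bytes], then the end marker. */
    static byte[] encode(List<String> items) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            for (String item : items) {
                byte[] bytes = item.getBytes(StandardCharsets.UTF_8);
                out.writeInt(bytes.length);
                out.write(bytes);
            }
            out.writeInt(END_OF_DATA_SECTION);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Reads frames until the end marker is seen. */
    static List<String> decode(byte[] wire) {
        List<String> items = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire))) {
            int length;
            while ((length = in.readInt()) != END_OF_DATA_SECTION) {
                byte[] bytes = new byte[length];
                in.readFully(bytes);
                items.add(new String(bytes, StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return items;
    }
}
```

Because a valid length is never negative, negative integers are free to serve as control markers, which is why -1 and -5 can share the channel with the data.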

a39f09115503fc5acbf1b5f8db4e52ba6d47567d

Connect with predecessors requires more details in connection slot


/*TODO Connect with predecessors requires more details in connection slot*/

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.api.rest.server.spring.decoder;

import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.wayang.api.python.function.WrappedPythonFunction;
import org.apache.wayang.basic.operators.MapPartitionsOperator;
import org.apache.wayang.basic.operators.TextFileSink;
import org.apache.wayang.basic.operators.TextFileSource;
import org.apache.wayang.basic.operators.UnionAllOperator;
import org.apache.wayang.commons.serializable.OperatorProto;
import org.apache.wayang.commons.serializable.PlanProto;
import org.apache.wayang.commons.serializable.WayangPlanProto;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.api.exception.WayangException;
import org.apache.wayang.core.function.MapPartitionsDescriptor;
import org.apache.wayang.core.plan.wayangplan.OperatorBase;
import org.apache.wayang.core.plan.wayangplan.WayangPlan;
import org.apache.wayang.java.Java;
import org.apache.wayang.spark.Spark;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.*;
import java.util.stream.Collectors;
import java.util.Base64;

public class WayangPlanBuilder {

    private WayangPlan wayangPlan;
    private WayangContext wayangContext;

    public WayangPlanBuilder(FileInputStream planFile){
        try {

            WayangPlanProto plan = WayangPlanProto.parseFrom(planFile);

            this.wayangContext = buildContext(plan);
            this.wayangPlan = buildPlan(plan);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public WayangPlanBuilder(String writtenPlan){

        System.out.println(writtenPlan);
        byte[] message = Base64.getDecoder().decode(writtenPlan);
        System.out.println(message);

        try {
            WayangPlanProto plan = WayangPlanProto.parseFrom(message);

            this.wayangContext = buildContext(plan);
            this.wayangPlan = buildPlan(plan);
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }

    }

    private WayangContext buildContext(WayangPlanProto plan){

        WayangContext ctx = new WayangContext();
        plan.getContext().getPlatformsList().forEach(platform -> {
            if (platform.getNumber() == 0)
                ctx.with(Java.basicPlugin());
            else if (platform.getNumber() == 1)
                ctx.with(Spark.basicPlugin());
        });
        //ctx.with(Spark.basicPlugin());

        return ctx;
    }

    private WayangPlan buildPlan(WayangPlanProto plan){

        System.out.println(plan);

        PlanProto planProto = plan.getPlan();
        LinkedList<OperatorProto> protoList = new LinkedList<>();
        planProto.getSourcesList().forEach(protoList::addLast);

        Map<String, OperatorBase> operators = new HashMap<>();
        List<OperatorBase> sinks = new ArrayList<>();
        while(! protoList.isEmpty()) {

            OperatorProto proto = protoList.pollFirst();

            /* Checking if protoOperator can be connected to the current WayangPlan*/
            boolean processIt;
            if(proto.getType().equals("source")) processIt = true;

            else {
                /* Checking if ALL predecessors were already processed */
                processIt = true;
                for(String predecessor : proto.getPredecessorsList()){
                    if (!operators.containsKey(predecessor)) {
                        processIt = false;
                        break;
                    }
                }
            }

            /* Operators should not be processed twice*/
            if(operators.containsKey(proto.getId())) processIt = false;

            if(processIt) {

                /* Create and store Wayang operator */
                OperatorBase operator = createOperatorByType(proto);
                operators.put(proto.getId(), operator);

                /*TODO Connect with predecessors requires more details in connection slot*/
                int order = 0;
                for (String pre_id : proto.getPredecessorsList()) {

                    OperatorBase predecessor = operators.get(pre_id);
                    /* Only works without replicate topology */
                    predecessor.connectTo(0, operator, order);
                    order++;

                    if(proto.getType().equals("sink")){
                        sinks.add(operator);
                        //if(!sinks.contains(operator)) {
                        //    sinks.add(operator);
                        //}
                    }
                }

                /*List of OperatorProto successors
                 * They will be added to the protoList
                 * nevertheless they must be processed only if the parents are in operators list */
                List<OperatorProto> listSuccessors = planProto.getOperatorsList()
                        .stream()
                        .filter(t -> proto.getSuccessorsList().contains(t.getId()))
                        .collect(Collectors.toList());
                for (OperatorProto successor : listSuccessors){
                    if(!protoList.contains(successor)){
                        protoList.addLast(successor);
                    }
                }

                List<OperatorProto> sinkSuccessors = planProto.getSinksList()
                        .stream()
                        .filter(t -> proto.getSuccessorsList().contains(t.getId()))
                        .collect(Collectors.toList());
                for (OperatorProto successor : sinkSuccessors){
                    if(!protoList.contains(successor)){
                        protoList.addLast(successor);
                    }
                }

            } else {

                /* In case we cannot process it yet, It must be added again at the end*/
                protoList.addLast(proto);
            }
        }

        WayangPlan wayangPlan = new WayangPlan(sinks.get(0));
        return wayangPlan;
    }

    public OperatorBase createOperatorByType(OperatorProto operator){

        System.out.println("Type: " + operator.getType());
        switch(operator.getType()){
            case "source":
                try {
                    String source_path = operator.getPath();
                    URL url = new File(source_path).toURI().toURL();
                    return new TextFileSource(url.toString());
                } catch (MalformedURLException e) {
                    e.printStackTrace();
                }
                break;
            case "sink":
                try {
                    String sink_path = operator.getPath();
                    URL url = new File(sink_path).toURI().toURL();
                    return new TextFileSink<String>(
                            url.toString(),
                            String.class
                    );

                } catch (MalformedURLException e) {
                    e.printStackTrace();
                }
                break;
            case "map_partition":
                return new MapPartitionsOperator<>(
                        new MapPartitionsDescriptor<String, String>(
                                new WrappedPythonFunction<String, String>(
                                        l -> l,
                                        operator.getUdf()
                                ),
                                String.class,
                                String.class
                        )
                );

            case "union":
                return new UnionAllOperator<String>(
                        String.class
                );

        }

        throw new WayangException("Operator Type not supported");
    }

    public WayangContext getWayangContext() {
        return wayangContext;
    }

    public WayangPlan getWayangPlan() {
        return wayangPlan;
    }
}
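
The loop in `buildPlan` re-queues an operator until all of its predecessors exist, so it appears never to terminate when a predecessor id is missing from the plan. A minimal sketch of the same ordering with Kahn's algorithm, which fails fast in that case, follows. It works on plain operator ids only; `PlanOrderSketch` and `topologicalOrder` are hypothetical names, and the output-slot question raised in the TODO is not solved by it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative ordering of operators so that predecessors always come first. */
final class PlanOrderSketch {

    /**
     * @param predecessors operator id -> ids of its predecessors; every operator must be a key
     * @return ids in an order where each operator follows all of its predecessors
     */
    static List<String> topologicalOrder(Map<String, List<String>> predecessors) {
        Map<String, Integer> pending = new HashMap<>();
        Map<String, List<String>> successors = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : predecessors.entrySet()) {
            pending.put(entry.getKey(), entry.getValue().size());
            for (String pre : entry.getValue()) {
                successors.computeIfAbsent(pre, k -> new ArrayList<>()).add(entry.getKey());
            }
        }

        // Sources have no predecessors and are ready immediately.
        Deque<String> ready = new ArrayDeque<>();
        for (String id : predecessors.keySet()) {
            if (pending.get(id) == 0) ready.addLast(id);
        }

        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String id = ready.pollFirst();
            order.add(id);
            for (String next : successors.getOrDefault(id, List.of())) {
                // An operator becomes ready once its last predecessor is emitted.
                if (pending.merge(next, -1, Integer::sum) == 0) ready.addLast(next);
            }
        }

        if (order.size() != predecessors.size()) {
            throw new IllegalStateException("Plan has a cycle or an unknown predecessor");
        }
        return order;
    }
}
```

With such an order in hand, each operator could be created and connected in a single pass, using the position of a predecessor in the list as the input slot, as the existing `order` counter already does.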

74c68f9b6dfefde05989465bbcac62fa6ebc504d

create documentation to how to the configuration in the code


this.configuration = new Configuration("file:///Users/rodrigopardomeza/wayang/incubator-wayang/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties");

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.api.python.executor;

import com.google.protobuf.ByteString;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.exception.WayangException;

import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

public class PythonProcessCaller {

    private Process process;
    private Socket socket;
    private ServerSocket serverSocket;
    private boolean ready;

    //TODO How to get the config
    private Configuration configuration;

    public PythonProcessCaller(ByteString serializedUDF){

        //TODO create documentation to how to the configuration in the code
        this.configuration = new Configuration("file:///Users/rodrigopardomeza/wayang/incubator-wayang/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties");
        this.ready = false;
        byte[] addr = new byte[4];
        addr[0] = 127; addr[1] = 0; addr[2] = 0; addr[3] = 1;

        try {
            /*TODO should NOT be assigned a specific port, set port as 0 (zero)*/
            this.serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(addr));
            ProcessBuilder pb = new ProcessBuilder(
                    Arrays.asList(
                            "python3",
                            this.configuration.getStringProperty("wayang.api.python.worker")
                    )
            );
            Map<String, String> workerEnv = pb.environment();
            workerEnv.put("PYTHON_WORKER_FACTORY_PORT", String.valueOf(this.serverSocket.getLocalPort()));

            // TODO See what is happening with ENV Python version
            workerEnv.put("PYTHONPATH", "/Users/rodrigopardomeza/wayang/incubator-wayang/pywayang/:/Users/rodrigopardomeza/opt/anaconda3/");

            pb.redirectOutput(ProcessBuilder.Redirect.INHERIT);
            pb.redirectError(ProcessBuilder.Redirect.INHERIT);
            this.process = pb.start();


            // Redirect worker stdout and stderr
            //IDK redirectStreamsToStderr(worker.getInputStream, worker.getErrorStream)

            // Wait for it to connect to our socket
            this.serverSocket.setSoTimeout(10000);

            try {
                this.socket = this.serverSocket.accept();
                this.serverSocket.setSoTimeout(0);

                if(socket.isConnected())
                    this.ready = true;

            } catch (Exception e) {
                System.out.println(e);
                throw new WayangException("Python worker failed to connect back.", e);
            }
        } catch (Exception e){
            System.out.println(e);
            throw new WayangException("Python worker failed", e);
        }
    }

    public Process getProcess() {
        return process;
    }

    public Socket getSocket() {
        return socket;
    }

    public boolean isReady(){
        return ready;
    }

    public void close(){
        try {
            this.process.destroy();
            this.socket.close();
            this.serverSocket.close();
            System.out.println("Everything closed");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
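
The constructor above reads `wayang.api.python.worker` from a properties file addressed by an absolute path on one developer's machine. A minimal sketch of a more portable lookup follows, assuming the defaults file is available as a stream (for example from the classpath) and that a second set of properties may override it. `WorkerConfigSketch` and `resolveWorker` are hypothetical names, and the sketch uses `java.util.Properties` rather than Wayang's own `Configuration` class.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Illustrative lookup of the Python worker script without a hard-coded path. */
final class WorkerConfigSketch {

    /** Property key taken from PythonProcessCaller. */
    static final String WORKER_KEY = "wayang.api.python.worker";

    /**
     * @param defaults  stream of a .properties file, or null if none is available
     * @param overrides properties that take precedence over the defaults
     * @return the configured worker script location
     */
    static String resolveWorker(InputStream defaults, Properties overrides) {
        Properties props = new Properties();
        if (defaults != null) {
            try {
                props.load(defaults);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        // Overrides win; fall back to the defaults file otherwise.
        String value = overrides.getProperty(WORKER_KEY, props.getProperty(WORKER_KEY));
        if (value == null) {
            throw new IllegalStateException("Missing property " + WORKER_KEY);
        }
        return value;
    }
}
```

A caller could pass `WorkerConfigSketch.class.getResourceAsStream("/wayang-api-python-defaults.properties")` and `System.getProperties()`; the classpath location of the file is an assumption.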

4a0fbc9208e2cedccea3043c1a899cafbe73e522

Stop testing with Travis CI

Last month it was decided to start using GitHub Actions for our DevOps pipelines.
Nevertheless, Travis is still being executed in addition to GitHub Actions.

We should delete the Travis CI pipelines from the project.

Remove Logs during test mode

In TEST mode, all the logs from Flink, Spark, etc. are displayed by default, which is unnecessary.

Displaying them should first have to be enabled by the developer.

add to a config file


/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.api.python.executor;

import com.google.protobuf.ByteString;
import org.apache.wayang.api.python.function.PythonUdf;
import org.apache.wayang.core.api.exception.WayangException;

import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Map;

public class ProcessFeeder<Input, Output> {

    private Socket socket;
    private PythonUdf<Input, Output> udf;
    private ByteString serializedUDF;
    private Iterable<Input> input;

    //TODO add to a config file
    int END_OF_DATA_SECTION = -1;
    int NULL = -5;

    public ProcessFeeder(
            Socket socket,
            PythonUdf<Input, Output> udf,
            ByteString serializedUDF,
            Iterable<Input> input){

        if(input == null) throw new WayangException("Nothing to process with Python API");

        this.socket = socket;
        this.udf = udf;
        this.serializedUDF = serializedUDF;
        this.input = input;

    }

    public void send(){

        try{
            //TODO use config buffer size
            int BUFFER_SIZE = 8192;

            BufferedOutputStream stream = new BufferedOutputStream(socket.getOutputStream(), BUFFER_SIZE);
            DataOutputStream dataOut = new DataOutputStream(stream);

            writeUDF(serializedUDF, dataOut);
            this.writeIteratorToStream(input.iterator(), dataOut);
            dataOut.writeInt(END_OF_DATA_SECTION);
            dataOut.flush();

        } catch (IOException e) {
            throw new WayangException("Could not send data to the Python process", e);
        }
    }

    public void writeUDF(ByteString serializedUDF, DataOutputStream dataOut){

        //write(serializedUDF.toByteArray(), dataOut);
        writeBytes(serializedUDF.toByteArray(), dataOut);
        System.out.println("UDF written");

    }

    public void writeIteratorToStream(Iterator<Input> iter, DataOutputStream dataOut){

        System.out.println("iterator being sent");
        while (iter.hasNext()) {
            Input elem = iter.next();
            //System.out.println(elem.toString());
            write(elem, dataOut);
        }
    }

    /*TODO Missing case PortableDataStream */
    public void write(Object obj, DataOutputStream dataOut){
        try {

            if(obj == null)
                dataOut.writeInt(this.NULL);

            /**
             * Byte Array cases
             */
            else if (obj instanceof Byte[] || obj instanceof byte[]) {
                System.out.println("Writing Bytes");
                writeBytes(obj, dataOut);
            }
            /**
             * String case
             * */
            else if (obj instanceof String)
                writeUTF((String) obj, dataOut);

            /**
             * Key, Value case
             * */
            else if (obj instanceof Map.Entry)
                writeKeyValue((Map.Entry<?, ?>) obj, dataOut);

            else{
                throw new WayangException("Unexpected element type " + obj.getClass());
            }


        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void writeBytes(Object obj, DataOutputStream dataOut){

        try{

            if (obj instanceof Byte[]) {

                int length = ((Byte[]) obj).length;

                byte[] bytes = new byte[length];
                int j=0;

                // Unboxing Byte values. (Byte[] to byte[])
                for(Byte b: ((Byte[]) obj))
                    bytes[j++] = b.byteValue();

                dataOut.writeInt(length);
                dataOut.write(bytes);

            } else if (obj instanceof byte[]) {

                dataOut.writeInt(((byte[]) obj).length);
                dataOut.write(((byte[]) obj));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void writeUTF(String str, DataOutputStream dataOut){

        byte[] bytes = str.getBytes(StandardCharsets.UTF_8);

        try {

            dataOut.writeInt(bytes.length);
            dataOut.write(bytes);
        } catch (SocketException e){
            // NOTE: socket errors are silently ignored here
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void writeKeyValue(Map.Entry<?, ?> obj, DataOutputStream dataOut){

        write(obj.getKey(), dataOut);
        write(obj.getValue(), dataOut);
    }

}
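
The data-section framing that `ProcessFeeder` writes can be mirrored in a few lines of Python. The sketch below is an illustration, not part of Wayang: the helper names `frame_string` and `read_frames` are hypothetical, and it assumes only what the Java code above shows, namely that each element is a 4-byte big-endian length (as produced by `DataOutputStream.writeInt`) followed by that many bytes, with `-5` standing for null and `-1` closing the data section. The serialized UDF that precedes the data is left out.

```python
import io
import struct

END_OF_DATA_SECTION = -1  # same marker value as in ProcessFeeder
NULL = -5                 # same marker value as in ProcessFeeder


def frame_string(value):
    """Encode one element the way ProcessFeeder.write does for a String or null."""
    if value is None:
        return struct.pack("!i", NULL)
    payload = value.encode("utf-8")
    return struct.pack("!i", len(payload)) + payload


def read_frames(stream):
    """Yield decoded elements until the end-of-data marker is read."""
    while True:
        header = stream.read(4)
        if len(header) < 4:
            raise EOFError("stream ended before END_OF_DATA_SECTION")
        (length,) = struct.unpack("!i", header)
        if length == END_OF_DATA_SECTION:
            return
        if length == NULL:
            yield None
            continue
        yield stream.read(length).decode("utf-8")


# Hypothetical round trip over an in-memory buffer instead of a socket.
wire = b"".join(frame_string(v) for v in ["hello", None, "wörld"])
wire += struct.pack("!i", END_OF_DATA_SECTION)
decoded = list(read_frames(io.BytesIO(wire)))
```

Using negative lengths as control markers keeps the protocol to a single header type, at the cost that every reader must check for the markers before treating the integer as a size.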

10c00fe30db45034ba821b8fb31d86cc1dd94b3b

Protoc in GitHub Actions

The package 'protobuf-compiler' must be installed in the GitHub Action before the code is compiled.

ExecutionPlan.toJsonList add documentation

labels:documentation,todo

@return

        return sb.toString();
    }

    /**
     * TODO: ExecutionPlan.toJsonList add documentation
     * labels:documentation,todo
     *
     * @return
     */
    public List<Map> toJsonList() {
        Counter<ExecutionStage> stageActivationCounter = new Counter<>();
        Queue<ExecutionStage> activatedStages = new LinkedList<>(this.startingStages);

eb7c56cfe3a29a574d5f0fac8781f9389527d5ea

Connect with predecessors requires more details in connection slot

/*TODO Connect with predecessors requires more details in connection slot*/

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.api.rest.server.spring.decoder;

import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.wayang.api.python.function.WrappedPythonFunction;
import org.apache.wayang.basic.operators.MapPartitionsOperator;
import org.apache.wayang.basic.operators.TextFileSink;
import org.apache.wayang.basic.operators.TextFileSource;
import org.apache.wayang.basic.operators.UnionAllOperator;
import org.apache.wayang.commons.serializable.OperatorProto;
import org.apache.wayang.commons.serializable.PlanProto;
import org.apache.wayang.commons.serializable.WayangPlanProto;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.api.exception.WayangException;
import org.apache.wayang.core.function.MapPartitionsDescriptor;
import org.apache.wayang.core.plan.wayangplan.OperatorBase;
import org.apache.wayang.core.plan.wayangplan.WayangPlan;
import org.apache.wayang.java.Java;
import org.apache.wayang.spark.Spark;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.*;
import java.util.stream.Collectors;
import java.util.Base64;

public class WayangPlanBuilder {

    private WayangPlan wayangPlan;
    private WayangContext wayangContext;

    public WayangPlanBuilder(FileInputStream planFile){
        try {

            WayangPlanProto plan = WayangPlanProto.parseFrom(planFile);

            this.wayangContext = buildContext(plan);
            this.wayangPlan = buildPlan(plan);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public WayangPlanBuilder(String writtenPlan){

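        System.out.println(writtenPlan);
        byte[] message = Base64.getDecoder().decode(writtenPlan);
        // Printing the byte[] directly would only show its identity hash, so report its size instead
        System.out.println("Decoded plan: " + message.length + " bytes");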

        try {
            WayangPlanProto plan = WayangPlanProto.parseFrom(message);

            this.wayangContext = buildContext(plan);
            this.wayangPlan = buildPlan(plan);
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }

    }

    private WayangContext buildContext(WayangPlanProto plan){

        WayangContext ctx = new WayangContext();
        plan.getContext().getPlatformsList().forEach(platform -> {
            if (platform.getNumber() == 0)
                ctx.with(Java.basicPlugin());
            else if (platform.getNumber() == 1)
                ctx.with(Spark.basicPlugin());
        });
        //ctx.with(Spark.basicPlugin());

        return ctx;
    }

    private WayangPlan buildPlan(WayangPlanProto plan){

        System.out.println(plan);

        PlanProto planProto = plan.getPlan();
        LinkedList<OperatorProto> protoList = new LinkedList<>();
        planProto.getSourcesList().forEach(protoList::addLast);

        Map<String, OperatorBase> operators = new HashMap<>();
        List<OperatorBase> sinks = new ArrayList<>();
        while(! protoList.isEmpty()) {

            OperatorProto proto = protoList.pollFirst();

            /* Checking if protoOperator can be connected to the current WayangPlan*/
            boolean processIt;
            if(proto.getType().equals("source")) processIt = true;

            else {
                /* Checking if ALL predecessors were already processed */
                processIt = true;
                for(String predecessor : proto.getPredecessorsList()){
                    if (!operators.containsKey(predecessor)) {
                        processIt = false;
                        break;
                    }
                }
            }

            /* Operators should not be processed twice*/
            if(operators.containsKey(proto.getId())) processIt = false;

            if(processIt) {

                /* Create and store Wayang operator */
                OperatorBase operator = createOperatorByType(proto);
                operators.put(proto.getId(), operator);

                /*TODO Connect with predecessors requires more details in connection slot*/
                int order = 0;
                for (String pre_id : proto.getPredecessorsList()) {

                    OperatorBase predecessor = operators.get(pre_id);
                    /* Only works without replicate topology */
                    predecessor.connectTo(0, operator, order);
                    order++;
                }

                /* Register each sink once, regardless of how many predecessors it has */
                if(proto.getType().equals("sink")){
                    sinks.add(operator);
                }

                /*List of OperatorProto successors
                 * They will be added to the protoList
                 * nevertheless they must be processed only if the parents are in operators list */
                List<OperatorProto> listSuccessors = planProto.getOperatorsList()
                        .stream()
                        .filter(t -> proto.getSuccessorsList().contains(t.getId()))
                        .collect(Collectors.toList());
                for (OperatorProto successor : listSuccessors){
                    if(!protoList.contains(successor)){
                        protoList.addLast(successor);
                    }
                }

                List<OperatorProto> sinkSuccessors = planProto.getSinksList()
                        .stream()
                        .filter(t -> proto.getSuccessorsList().contains(t.getId()))
                        .collect(Collectors.toList());
                for (OperatorProto successor : sinkSuccessors){
                    if(!protoList.contains(successor)){
                        protoList.addLast(successor);
                    }
                }

            } else {

                /* In case we cannot process it yet, It must be added again at the end*/
                protoList.addLast(proto);
            }
        }

        if (sinks.isEmpty()) throw new WayangException("The plan does not contain any sink");
        return new WayangPlan(sinks.get(0));
    }

    public OperatorBase createOperatorByType(OperatorProto operator){

        System.out.println("Type: " + operator.getType());
        switch(operator.getType()){
            case "source":
                try {
                    String source_path = operator.getPath();
                    URL url = new File(source_path).toURI().toURL();
                    return new TextFileSource(url.toString());
                } catch (MalformedURLException e) {
                    throw new WayangException("Invalid source path: " + operator.getPath(), e);
                }
            case "sink":
                try {
                    String sink_path = operator.getPath();
                    URL url = new File(sink_path).toURI().toURL();
                    return new TextFileSink<String>(
                            url.toString(),
                            String.class
                    );

                } catch (MalformedURLException e) {
                    throw new WayangException("Invalid sink path: " + operator.getPath(), e);
                }
            case "map_partition":
                return new MapPartitionsOperator<>(
                        new MapPartitionsDescriptor<String, String>(
                                new WrappedPythonFunction<String, String>(
                                        l -> l,
                                        operator.getUdf()
                                ),
                                String.class,
                                String.class
                        )
                );

            case "union":
                return new UnionAllOperator<String>(
                        String.class
                );

        }

        throw new WayangException("Operator Type not supported");
    }

    public WayangContext getWayangContext() {
        return wayangContext;
    }

    public WayangPlan getWayangPlan() {
        return wayangPlan;
    }
}
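
The scheduling loop in `buildPlan` is essentially a worklist that instantiates an operator only once all of its predecessors exist. A minimal Python sketch of that ordering logic follows. It is not Wayang code: the function name `creation_order` and the dictionary-based plan representation are assumptions made for illustration, it starts from all operators rather than from the sources, and unlike the Java loop it raises an error instead of re-queueing forever when a predecessor can never be satisfied.

```python
from collections import deque


def creation_order(predecessors):
    """Return operator ids in an order where every id follows its predecessors.

    `predecessors` maps an operator id to the list of ids it depends on
    (a hypothetical stand-in for OperatorProto.getPredecessorsList()).
    """
    pending = deque(predecessors)  # ids still waiting to be created
    created = []                   # ids in creation order
    done = set()
    stalled = 0                    # consecutive re-queues without progress

    while pending:
        op_id = pending.popleft()
        if all(p in done for p in predecessors[op_id]):
            created.append(op_id)
            done.add(op_id)
            stalled = 0
        else:
            # Not ready yet: put it back at the end, as buildPlan does.
            pending.append(op_id)
            stalled += 1
            if stalled > len(pending):
                raise ValueError("unsatisfiable predecessors: %s" % sorted(pending))
    return created


# Hypothetical plan: two sources feed a union, which feeds a sink.
plan = {"sink": ["union"], "union": ["src1", "src2"], "src1": [], "src2": []}
order = creation_order(plan)
```

The stall counter is the design choice worth noting: once every pending operator has been re-queued in a row without any being created, no further progress is possible, so the loop can stop.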

74c68f9b6dfefde05989465bbcac62fa6ebc504d

cannot be always string, include definition for every operator

like: map(udf, inputtype, outputtype)

/*TODO cannot be always string, include definition for every operator

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.api.python.executor;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.Socket;
import java.util.Iterator;
/*TODO cannot be always string, include definition for every operator
*  like: map(udf, inputtype, outputtype)*/
public class ProcessReceiver<Output> {

    private ReaderIterator<Output> iterator;

    public ProcessReceiver(Socket socket){
        try{
            //TODO use config buffer size
            int BUFFER_SIZE = 8192;

            DataInputStream stream = new DataInputStream(new BufferedInputStream(socket.getInputStream(), BUFFER_SIZE));
            this.iterator = new ReaderIterator<>(stream);

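        } catch (IOException e) {
            // Fail fast: continuing would leave the iterator null and cause a NullPointerException later
            throw new java.io.UncheckedIOException("Could not open the input stream of the Python process", e);
        }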
    }

    public Iterable<Output> getIterable(){
        return () -> iterator;
    }

    public void print(){
        iterator.forEachRemaining(x -> System.out.println(x.toString()));

    }
}

8abd710475aacb398e08336b85000219e3f1432e

key should be given by "udf"

UDF specifies reducer function

# TODO key should be given by "udf"

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from orchestrator.operator import Operator
from graph.graph import Graph
from graph.traversal import Traversal
from protobuf.planwriter import MessageWriter
import itertools
import collections
import logging
from functools import reduce
import operator


# Wraps a Source operation to create an iterable
class DataQuantaBuilder:
    def __init__(self, descriptor):
        self.descriptor = descriptor

    def source(self, source):

        if isinstance(source, str):
            source_ori = open(source, "r")
        else:
            source_ori = source
        return DataQuanta(
            Operator(
                operator_type="source",
                udf=source,
                iterator=iter(source_ori),
                previous=[],
                python_exec=False
            ),
            descriptor=self.descriptor
        )


# Wraps an operation over an iterable
class DataQuanta:
    def __init__(self, operator=None, descriptor=None):
        self.operator = operator
        self.descriptor = descriptor
        if self.operator.is_source():
            self.descriptor.add_source(self.operator)
        if self.operator.is_sink():
            self.descriptor.add_sink(self.operator)

    # Operational Functions
    def filter(self, udf):
        def func(iterator):
            return filter(udf, iterator)

        return DataQuanta(
            Operator(
                operator_type="filter",
                udf=func,
                previous=[self.operator],
                python_exec=True
            ),
            descriptor=self.descriptor
        )

    def flatmap(self, udf):

        def auxfunc(iterator):
            return itertools.chain.from_iterable(map(udf, iterator))

        def func(iterator):
            mapped = map(udf, iterator)
            flattened = flatten_single_dim(mapped)
            yield from flattened

        def flatten_single_dim(mapped):
            for item in mapped:
                for subitem in item:
                    yield subitem

        return DataQuanta(
            Operator(
                operator_type="flatmap",
                udf=func,
                previous=[self.operator],
                python_exec=True
            ),
            descriptor=self.descriptor
        )

    def group_by(self, udf):
        def func(iterator):
            # TODO key should be given by "udf"
            return itertools.groupby(iterator, key=operator.itemgetter(0))
            # return itertools.groupby(sorted(iterator), key=operator.itemgetter(0))

        return DataQuanta(
            Operator(
                operator_type="group_by",
                udf=func,
                previous=[self.operator],
                python_exec=True
            ),
            descriptor=self.descriptor
        )

    def map(self, udf):
        def func(iterator):
            return map(udf, iterator)

        return DataQuanta(
            Operator(
                operator_type="map",
                udf=func,
                previous=[self.operator],
                python_exec=True
            ),
            descriptor=self.descriptor
        )

    # Key specifies pivot dimensions
    # UDF specifies reducer function
    def reduce_by_key(self, keys, udf):

        op = Operator(
            operator_type="reduce_by_key",
            udf=udf,
            previous=[self.operator],
            python_exec=False
        )

        print(len(keys), keys)
        for i in range(0, len(keys)):
            """if keys[i] is int:
                op.set_parameter("vector_position|"+str(i), keys[i])
            else:
                op.set_parameter("dimension_key|"+str(i), keys[i])"""

            # TODO maybe would be better just leave the number as key
            op.set_parameter("dimension|"+str(i+1), keys[i])

        return DataQuanta(
            op,
            descriptor=self.descriptor
        )

    def reduce(self, udf):
        def func(iterator):
            return reduce(udf, iterator)

        return DataQuanta(
            Operator(
                operator_type="reduce",
                udf=func,
                previous=[self.operator],
                python_exec=True
            ),
            descriptor=self.descriptor
        )

    def sink(self, path, end="\n"):
        def consume(iterator):
            with open(path, 'w') as f:
                for x in iterator:
                    f.write(str(x) + end)

        def func(iterator):
            consume(iterator)
            # return self.__run(consume)

        return DataQuanta(
            Operator(
                operator_type="sink",

                udf=path,
                # To execute directly uncomment
                # udf=func,

                previous=[self.operator],
                python_exec=False
            ),
            descriptor=self.descriptor
        )

    def sort(self, udf):

        def func(iterator):
            return sorted(iterator, key=udf)

        return DataQuanta(
            Operator(
                operator_type="sort",
                udf=func,
                previous=[self.operator],
                python_exec=True
            ),
            descriptor=self.descriptor
        )

    # This function allows the union to be performed by Python
    # Nevertheless, the current configuration runs it in Java
    def union(self, other):

        def func(iterator):
            return itertools.chain(iterator, other.operator.getIterator())

        return DataQuanta(
            Operator(
                operator_type="union",
                udf=func,
                previous=[self.operator, other.operator],
                python_exec=False
            ),
            descriptor=self.descriptor
        )

    def __run(self, consumer):
        consumer(self.operator.getIterator())

    # Execution Functions
    def console(self, end="\n"):
        def consume(iterator):
            for x in iterator:
                print(x, end=end)

        self.__run(consume)

    # Only for debugging purposes!
    # To execute the plan directly in the program driver
    def execute(self):
        logging.warning("DEBUG Execution")
        logging.info("Reminder to swap SINK UDF value from path to func")
        logging.debug(self.operator.previous[0].operator_type)
        if self.operator.is_sink():
            logging.debug(self.operator.operator_type)
            logging.debug(self.operator.udf)
            logging.debug(len(self.operator.previous))
            self.operator.udf(self.operator.previous[0].getIterator())
        else:
            logging.error("Plan must call execute from SINK type of operator")
            raise RuntimeError("execute() must be called on a sink operator")

    # Converts Python Functional Plan to valid Wayang Plan
    def to_wayang_plan(self):

        sinks = self.descriptor.get_sinks()
        if len(sinks) == 0:
            return

        graph = Graph()
        graph.populate(self.descriptor.get_sinks())

        # Uncomment to check the Graph built
        # graph.print_adjlist()

        # Function to be consumed by Traverse
        # Separates Python Plan into a List of Pipelines
        def define_pipelines(node1, current_pipeline, collection):
            def store_unique(pipe_to_insert):
                for pipe in collection:
                    if equivalent_lists(pipe, pipe_to_insert):
                        return
                collection.append(pipe_to_insert)

            def equivalent_lists(l1, l2):
                return collections.Counter(l1) == collections.Counter(l2)

            if not current_pipeline:
                current_pipeline = [node1]

            elif node1.operator.is_boundary():
                store_unique(current_pipeline.copy())
                current_pipeline.clear()
                current_pipeline.append(node1)

            else:
                current_pipeline.append(node1)

            if node1.operator.sink:
                store_unique(current_pipeline.copy())
                current_pipeline.clear()

            return current_pipeline

        # Works over the graph
        trans = Traversal(
            graph=graph,
            origin=self.descriptor.get_sources(),
            # udf=lambda x, y, z: d(x, y, z)
            # UDF always will receive:
            # x: a Node object,
            # y: an object representing the result of the last iteration,
            # z: a collection to store final results inside your UDF
            udf=lambda x, y, z: define_pipelines(x, y, z)
        )

        # Gets the results of the traverse process
        collected_stages = trans.get_collected_data()

        # Passing the Stages to a Wayang message writer
        writer = MessageWriter()
        a = 0
        # Stage is composed of class Node objects
        for stage in collected_stages:
            a += 1
            logging.info("///")
            logging.info("stage" + str(a))
            writer.process_pipeline(stage)

        writer.set_dependencies()

        # Uses a file to provide the plan
        # writer.write_message(self.descriptor)

        # Send the plan to Wayang REST api directly
        writer.send_message(self.descriptor)
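
One way the `group_by` TODO could be addressed is to let the caller's `udf` compute the grouping key. The sketch below is a standalone illustration rather than a patch to `DataQuanta`, and the name `group_by_key` is hypothetical. It sorts the input by the key first because `itertools.groupby` only merges adjacent elements with equal keys, which means the whole input is materialized in memory.

```python
import itertools


def group_by_key(iterator, udf):
    """Group elements by the key that `udf` extracts from each element.

    Returns a list of (key, [elements]) pairs ordered by key.
    """
    ordered = sorted(iterator, key=udf)
    return [(key, list(group)) for key, group in itertools.groupby(ordered, key=udf)]


# Hypothetical input: group words by their first letter.
words = ["apple", "avocado", "banana", "blueberry", "cherry", "apricot"]
by_initial = group_by_key(words, lambda w: w[0])
```

Because `sorted` is stable, elements within a group keep their original relative order.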

071d79ff81738e15f7e5cbbf8c883d315e8e1d61

First we must receive the operator + UDF

# TODO First we must receive the operator + UDF

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import socket
import struct
import pickle
import base64
import re
import sys


class SpecialLengths(object):
    END_OF_DATA_SECTION = -1
    PYTHON_EXCEPTION_THROWN = -2
    TIMING_DATA = -3
    END_OF_STREAM = -4
    NULL = -5
    START_ARROW_STREAM = -6


def read_int(stream):
    length = stream.read(4)
    if not length:
        raise EOFError
    res = struct.unpack("!i", length)[0]
    return res


class UTF8Deserializer:
    """
    Deserializes streams written by String.getBytes.
    """

    def __init__(self, use_unicode=True):
        self.use_unicode = use_unicode

    def loads(self, stream):
        length = read_int(stream)
        if length == SpecialLengths.END_OF_DATA_SECTION:
            raise EOFError
        elif length == SpecialLengths.NULL:
            return None
        s = stream.read(length)
        return s.decode("utf-8") if self.use_unicode else s

    def load_stream(self, stream):
        try:
            while True:
                yield self.loads(stream)
        except struct.error:
            return
        except EOFError:
            return

    def __repr__(self):
        return "UTF8Deserializer(%s)" % self.use_unicode


def write_int(p, outfile):
    outfile.write(struct.pack("!i", p))


def write_with_length(obj, stream):
    serialized = obj.encode('utf-8')
    if serialized is None:
        raise ValueError("serialized value should not be None")
    if len(serialized) >= (1 << 31):
        raise ValueError("can not serialize object larger than 2G")
    write_int(len(serialized), stream)
    stream.write(serialized)


def dump_stream(iterator, stream):

    for obj in iterator:
        if type(obj) is str:
            write_with_length(obj, stream)
        ## elif type(obj) is list:
        ##    write_with_length(obj, stream)
    print("Finished")
    write_int(SpecialLengths.END_OF_DATA_SECTION, stream)
    print("Wrote end marker")


def process(infile, outfile):
    """udf64 = os.environ["UDF"]
    print("udf64")
    print(udf64)
    #serialized_udf = binascii.a2b_base64(udf64)
    #serialized_udf = base64.b64decode(udf64)
    serialized_udf = bytearray(udf64, encoding='utf-16')
    # NOT VALID TO BE UTF8  serialized_udf = bytes(udf64, 'UTF-8')
    print("serialized_udf")
    print(serialized_udf)
    # input to be ast.literal_eval(serialized_udf)
    func = pickle.loads(serialized_udf, encoding="bytes")
    print ("func")
    print (func)
    print(func([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]))
    # func([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])"""



    # TODO First we must receive the operator + UDF
    """udf = lambda elem: elem.lower()

    def func(it):
        return sorted(it, key=udf)"""
    udf_length = read_int(infile)
    print("udf_length")
    print(udf_length)
    serialized_udf = infile.read(udf_length)
    print("serialized_udf")
    print(serialized_udf)
    #base64_message = base64.b64decode(serialized_udf + "===")
    #print("base64_message")
    #print(base64_message)
    func = pickle.loads(serialized_udf)
    #func = ori.lala(serialized_udf)
    #print (func)
    #for x in func([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]): print(x)

    """print("example")
    for x in func("2344|234|efrf|$#|ffrf"): print(x)"""
    # TODO Here we are temporarily assuming that the user is exclusively sending UTF8. User has several types
    iterator = UTF8Deserializer().load_stream(infile)
    # out_iter = sorted(iterator, key=lambda elem: elem.lower())
    out_iter = func(iterator)
    dump_stream(iterator=out_iter, stream=outfile)


def local_connect(port):
    sock = None
    errors = []
    # Support for both IPv4 and IPv6.
    # On most IPv6-ready systems, IPv6 will take precedence.
    for res in socket.getaddrinfo("127.0.0.1", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
        af, socktype, proto, _, sa = res
        try:
            sock = socket.socket(af, socktype, proto)
            # sock.settimeout(int(os.environ.get("SPARK_AUTH_SOCKET_TIMEOUT", 15)))
            sock.settimeout(30)
            sock.connect(sa)
            # sockfile = sock.makefile("rwb", int(os.environ.get("SPARK_BUFFER_SIZE", 65536)))
            sockfile = sock.makefile("rwb", 65536)
            # _do_server_auth(sockfile, auth_secret)
            return (sockfile, sock)
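        except socket.error as e:
            emsg = str(e)
            errors.append("tried to connect to %s, but an error occurred: %s" % (sa, emsg))
            # socket.socket() itself may have failed, in which case there is nothing to close
            if sock is not None:
                sock.close()
            sock = None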
    raise Exception("could not open socket: %s" % errors)


if __name__ == '__main__':
    print("Python version")
    print (sys.version)
    java_port = int(os.environ["PYTHON_WORKER_FACTORY_PORT"])
    sock_file, sock = local_connect(java_port)
    process(sock_file, sock_file)
    sock_file.flush()
    sys.exit()
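
The handshake that `process` expects (a length-prefixed pickled UDF, followed by length-prefixed UTF-8 strings and an end marker) can be exercised without a socket. The sketch below is an illustration under stated assumptions: `encode_request` and `run_worker` are hypothetical helpers that re-implement the framing locally, the built-in `sorted` stands in for a real UDF because plain `pickle` can serialize it by reference, and the stream is an in-memory `io.BytesIO`.

```python
import io
import pickle
import struct

END_OF_DATA_SECTION = -1  # same marker value as SpecialLengths.END_OF_DATA_SECTION


def encode_request(func, items):
    """Build the bytes a feeder would send: UDF frame, data frames, end marker."""
    out = io.BytesIO()
    udf_bytes = pickle.dumps(func)
    out.write(struct.pack("!i", len(udf_bytes)))
    out.write(udf_bytes)
    for item in items:
        payload = item.encode("utf-8")
        out.write(struct.pack("!i", len(payload)))
        out.write(payload)
    out.write(struct.pack("!i", END_OF_DATA_SECTION))
    return out.getvalue()


def run_worker(request):
    """Read the UDF, apply it to the decoded strings, and return the results."""
    stream = io.BytesIO(request)
    (udf_length,) = struct.unpack("!i", stream.read(4))
    func = pickle.loads(stream.read(udf_length))

    def elements():
        while True:
            (length,) = struct.unpack("!i", stream.read(4))
            if length == END_OF_DATA_SECTION:
                return
            yield stream.read(length).decode("utf-8")

    return list(func(elements()))


result = run_worker(encode_request(sorted, ["pear", "apple", "fig"]))
```

Because `pickle.loads` can execute arbitrary code, this pattern is only appropriate when both ends of the connection are trusted.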

af039622f101f845343bfb05bdcec8c3ef432251

Here we are temporarily assuming that the user is exclusively sending UTF8. User has several types

On most IPv6-ready systems, IPv6 will take precedence.

# TODO Here we are temporarily assuming that the user is exclusively sending UTF8. User has several types

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import socket
import struct
import pickle
import base64
import re
import sys


class SpecialLengths(object):
    END_OF_DATA_SECTION = -1
    PYTHON_EXCEPTION_THROWN = -2
    TIMING_DATA = -3
    END_OF_STREAM = -4
    NULL = -5
    START_ARROW_STREAM = -6


def read_int(stream):
    length = stream.read(4)
    if not length:
        raise EOFError
    res = struct.unpack("!i", length)[0]
    return res


class UTF8Deserializer:
    """
    Deserializes streams written by String.getBytes.
    """

    def __init__(self, use_unicode=True):
        self.use_unicode = use_unicode

    def loads(self, stream):
        length = read_int(stream)
        if length == SpecialLengths.END_OF_DATA_SECTION:
            raise EOFError
        elif length == SpecialLengths.NULL:
            return None
        s = stream.read(length)
        return s.decode("utf-8") if self.use_unicode else s

    def load_stream(self, stream):
        try:
            while True:
                yield self.loads(stream)
        except struct.error:
            return
        except EOFError:
            return

    def __repr__(self):
        return "UTF8Deserializer(%s)" % self.use_unicode


def write_int(p, outfile):
    outfile.write(struct.pack("!i", p))


def write_with_length(obj, stream):
    serialized = obj.encode('utf-8')
    # The length prefix is a signed 32-bit int, so the payload must stay below 2**31 bytes.
    if len(serialized) >= (1 << 31):
        raise ValueError("can not serialize object larger than 2G")
    write_int(len(serialized), stream)
    stream.write(serialized)


def dump_stream(iterator, stream):
    # Only str results are written; objects of any other type are skipped.
    for obj in iterator:
        if type(obj) is str:
            write_with_length(obj, stream)
        ## elif type(obj) is list:
        ##    write_with_length(obj, stream)
    print("Finished")
    write_int(SpecialLengths.END_OF_DATA_SECTION, stream)
    print("Wrote end-of-data marker")


def process(infile, outfile):
    """udf64 = os.environ["UDF"]
    print("udf64")
    print(udf64)
    #serialized_udf = binascii.a2b_base64(udf64)
    #serialized_udf = base64.b64decode(udf64)
    serialized_udf = bytearray(udf64, encoding='utf-16')
    # NOT VALID TO BE UTF8  serialized_udf = bytes(udf64, 'UTF-8')
    print("serialized_udf")
    print(serialized_udf)
    # input to be ast.literal_eval(serialized_udf)
    func = pickle.loads(serialized_udf, encoding="bytes")
    print ("func")
    print (func)
    print(func([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]))
    # func([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])"""



    # TODO First we must receive the operator + UDF
    """udf = lambda elem: elem.lower()

    def func(it):
        return sorted(it, key=udf)"""
    udf_length = read_int(infile)
    print("udf_length")
    print(udf_length)
    serialized_udf = infile.read(udf_length)
    print("serialized_udf")
    print(serialized_udf)
    #base64_message = base64.b64decode(serialized_udf + "===")
    #print("base64_message")
    #print(base64_message)
    func = pickle.loads(serialized_udf)
    #func = ori.lala(serialized_udf)
    #print (func)
    #for x in func([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]): print(x)

    """print("example")
    for x in func("2344|234|efrf|$#|ffrf"): print(x)"""
    # TODO Here we are temporarily assuming that the user is exclusively sending UTF8. User has several types
    iterator = UTF8Deserializer().load_stream(infile)
    # out_iter = sorted(iterator, key=lambda elem: elem.lower())
    out_iter = func(iterator)
    dump_stream(iterator=out_iter, stream=outfile)


def local_connect(port):
    sock = None
    errors = []
    # Support for both IPv4 and IPv6.
    # On most IPv6-ready systems, IPv6 will take precedence.
    for res in socket.getaddrinfo("127.0.0.1", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
        af, socktype, proto, _, sa = res
        try:
            sock = socket.socket(af, socktype, proto)
            # sock.settimeout(int(os.environ.get("SPARK_AUTH_SOCKET_TIMEOUT", 15)))
            sock.settimeout(30)
            sock.connect(sa)
            # sockfile = sock.makefile("rwb", int(os.environ.get("SPARK_BUFFER_SIZE", 65536)))
            sockfile = sock.makefile("rwb", 65536)
            # _do_server_auth(sockfile, auth_secret)
            return (sockfile, sock)
        except socket.error as e:
            emsg = str(e)
            errors.append("tried to connect to %s, but an error occurred: %s" % (sa, emsg))
            # socket.socket() itself may have failed, leaving no socket to close
            if sock is not None:
                sock.close()
            sock = None
    raise Exception("could not open socket: %s" % errors)


if __name__ == '__main__':
    print("Python version")
    print(sys.version)
    # The Java side passes the port of its listening socket through this variable.
    java_port = int(os.environ["PYTHON_WORKER_FACTORY_PORT"])
    sock_file, sock = local_connect(java_port)
    process(sock_file, sock_file)
    sock_file.flush()
    sys.exit()

f57e9e4e72ff5497ee6a645385c8fc0b3a44d5ba

Connect with predecessors requires more details in connection slot

/*TODO Connect with predecessors requires more details in connection slot*/

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.api.rest.server.spring.general;

import com.google.protobuf.ByteString;
import org.apache.wayang.api.python.function.WrappedPythonFunction;
import org.apache.wayang.api.rest.server.spring.decoder.WayangPlanBuilder;
import org.apache.wayang.basic.operators.*;
import org.apache.wayang.commons.serializable.OperatorProto;
import org.apache.wayang.commons.serializable.PlanProto;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.api.exception.WayangException;
import org.apache.wayang.core.function.MapPartitionsDescriptor;
import org.apache.wayang.core.plan.wayangplan.OperatorBase;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
import java.util.*;
import java.util.stream.Collectors;

import org.apache.wayang.core.plan.wayangplan.WayangPlan;
import org.apache.wayang.java.Java;
import org.apache.wayang.spark.Spark;

import org.apache.wayang.commons.serializable.WayangPlanProto;
import org.springframework.web.multipart.MultipartFile;


@RestController
public class WayangController {

    @GetMapping("/plan/create/fromfile")
    public String planFromFile(
            //@RequestParam("file") MultipartFile file
    ){

        try {
            FileInputStream inputStream = new FileInputStream(Paths.get(".").toRealPath() + "/protobuf/wayang_message");
            WayangPlanBuilder wpb = new WayangPlanBuilder(inputStream);

            /*TODO ADD id to executions*/
            wpb.getWayangContext().execute(wpb.getWayangPlan());

        } catch (IOException e) {
            e.printStackTrace();
        }

        return "Builder works";
    }

    @PostMapping("/plan/create")
    public String planFromMessage(
            @RequestParam("message") String message
    ){

        WayangPlanBuilder wpb = new WayangPlanBuilder(message);

        /*TODO ADD id to executions*/
        wpb.getWayangContext().execute(wpb.getWayangPlan());

        return "";
    }

    @GetMapping("/")
    public String all(){
        System.out.println("detected!");

        try {
            FileInputStream inputStream = new FileInputStream(Paths.get(".").toRealPath() + "/protobuf/wayang_message");
            WayangPlanProto plan = WayangPlanProto.parseFrom(inputStream);

            WayangContext wc = buildContext(plan);
            WayangPlan wp = buildPlan(plan);

            System.out.println("Plan!");
            System.out.println(wp.toString());

            wc.execute(wp);
            return("Works!");

        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

        return "Not working";
    }

    private WayangContext buildContext(WayangPlanProto plan){

        WayangContext ctx = new WayangContext();
        plan.getContext().getPlatformsList().forEach(platform -> {
            if (platform.getNumber() == 0)
                ctx.with(Java.basicPlugin());
            else if (platform.getNumber() == 1)
                ctx.with(Spark.basicPlugin());
        });
        //ctx.with(Spark.basicPlugin());

        return ctx;
    }

    private WayangPlan buildPlan(WayangPlanProto plan){

        System.out.println(plan);

        PlanProto planProto = plan.getPlan();
        LinkedList<OperatorProto> protoList = new LinkedList<>();
        planProto.getSourcesList().forEach(protoList::addLast);

        Map<String, OperatorBase> operators = new HashMap<>();
        List<OperatorBase> sinks = new ArrayList<>();
        while(! protoList.isEmpty()) {

            OperatorProto proto = protoList.pollFirst();

            /* Checking if protoOperator can be connected to the current WayangPlan*/
            boolean processIt;
            if(proto.getType().equals("source")) processIt = true;

            else {
                /* Checking if ALL predecessors were already processed */
                processIt = true;
                for(String predecessor : proto.getPredecessorsList()){
                    if (!operators.containsKey(predecessor)) {
                        processIt = false;
                        break;
                    }
                }
            }

            /* Operators should not be processed twice*/
            if(operators.containsKey(proto.getId())) processIt = false;

            if(processIt) {

                /* Create and store Wayang operator */
                OperatorBase operator = createOperatorByType(proto);
                operators.put(proto.getId(), operator);

                /*TODO Connect with predecessors requires more details in connection slot*/
                int order = 0;
                for (String pre_id : proto.getPredecessorsList()) {

                    OperatorBase predecessor = operators.get(pre_id);
                    /* Only works without replicate topology */
                    predecessor.connectTo(0, operator, order);
                    order++;
                }

                /* Register each sink once, not once per predecessor */
                if(proto.getType().equals("sink")){
                    sinks.add(operator);
                }

                /*List of OperatorProto successors
                 * They will be added to the protoList
                 * nevertheless they must be processed only if the parents are in operators list */
                List<OperatorProto> listSuccessors = planProto.getOperatorsList()
                        .stream()
                        .filter(t -> proto.getSuccessorsList().contains(t.getId()))
                        .collect(Collectors.toList());
                for (OperatorProto successor : listSuccessors){
                    if(!protoList.contains(successor)){
                        protoList.addLast(successor);
                    }
                }

                List<OperatorProto> sinkSuccessors = planProto.getSinksList()
                        .stream()
                        .filter(t -> proto.getSuccessorsList().contains(t.getId()))
                        .collect(Collectors.toList());
                for (OperatorProto successor : sinkSuccessors){
                    if(!protoList.contains(successor)){
                        protoList.addLast(successor);
                    }
                }

            } else {

                /* In case we cannot process it yet, It must be added again at the end*/
                protoList.addLast(proto);
            }
        }

        WayangPlan wayangPlan = new WayangPlan(sinks.get(0));
        return wayangPlan;
    }

    public OperatorBase createOperatorByType(OperatorProto operator){

        System.out.println("Type: " + operator.getType());
        switch(operator.getType()){
            case "source":
                try {
                    String source_path = operator.getPath();
                    URL url = new File(source_path).toURI().toURL();
                    return new TextFileSource(url.toString());
                } catch (MalformedURLException e) {
                    e.printStackTrace();
                }
                break;
            case "sink":
                try {
                    String sink_path = operator.getPath();
                    URL url = new File(sink_path).toURI().toURL();
                    return new TextFileSink<String>(
                            url.toString(),
                            String.class
                    );

                } catch (MalformedURLException e) {
                    e.printStackTrace();
                }
                break;
            case "reduce_by_key":
                try {
                    /* Function to be applied in Python workers */
                    ByteString function = operator.getUdf();

                    /* Has dimension or positions that compose GroupKey */
                    Map<String, String> parameters = operator.getParametersMap();

                    PyWayangReduceByOperator<String, String> op = new PyWayangReduceByOperator(
                        operator.getParametersMap(),
                        operator.getUdf() ,
                        String.class,
                        String.class,
                            false
                    );

                    /* NOTE: the operator built above is never returned; this branch returns a TextFileSink instead */
                    String sink_path = operator.getPath();
                    URL url = new File(sink_path).toURI().toURL();
                    return new TextFileSink<String>(
                            url.toString(),
                            String.class
                    );

                } catch (MalformedURLException e) {
                    e.printStackTrace();
                }
                break;
            case "map_partition":
                return new MapPartitionsOperator<>(
                    new MapPartitionsDescriptor<String, String>(
                        new WrappedPythonFunction<String, String>(
                            l -> l,
                            operator.getUdf()
                        ),
                        String.class,
                        String.class
                    )
                );

            case "union":
                return new UnionAllOperator<String>(
                        String.class
                );

        }

        throw new WayangException("Operator Type not supported");
    }

    public static URI createUri(String resourcePath) {
        try {
            return Thread.currentThread().getClass().getResource(resourcePath).toURI();
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Illegal URI.", e);
        }

    }

}
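
The worklist loop in `buildPlan` above can be sketched in isolation as follows. This is a simplified Python model, not the controller's code: operators are plain dicts with the hypothetical keys `id` and `predecessors`, and, unlike the original loop, the sketch raises an error when no further progress is possible instead of re-queuing forever.

```python
from collections import deque


def processing_order(operators):
    """Return operator ids so that each id appears after all its predecessors.

    `operators` is a list of dicts with the hypothetical keys
    "id" (str) and "predecessors" (list of str).
    """
    queue = deque(operators)
    done = []      # ids in the order they were processed
    seen = set()   # ids already processed
    stalled = 0    # consecutive re-queues without progress
    while queue:
        op = queue.popleft()
        if op["id"] in seen:
            continue  # operators are not processed twice
        if all(p in seen for p in op["predecessors"]):
            seen.add(op["id"])
            done.append(op["id"])
            stalled = 0
        else:
            # Not ready yet: put it back at the end of the queue.
            queue.append(op)
            stalled += 1
            if stalled > len(queue):
                raise ValueError("cycle or missing predecessor in plan")
    return done
```

For a plan with two sources feeding a union that feeds a sink, the function yields the sources first, then the union, then the sink, whatever order the operators were listed in.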

46f8cdedd25da6c3205060ebd781dafe78222516

Add unit tests for the elements in the file org.apache.wayang.api.PlanBuilder.scala

* TODO: add unitary test to the elements in the file org.apache.wayang.api.PlanBuilder.scala

 */

package org.apache.wayang.api
/**
 * TODO: add unitary test to the elements in the file org.apache.wayang.api.PlanBuilder.scala
 * labels: unitary-test,todo
 */
import org.apache.commons.lang3.Validate
import org.apache.wayang.api
import org.apache.wayang.basic.data.Record

16a4914ccf4698994676780dfe1ffcde45ca495e

Create documentation on how to obtain the configuration in the code

this.configuration = new Configuration("file:///Users/rodrigopardomeza/wayang/incubator-wayang/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties");

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.api.python.executor;

import com.google.protobuf.ByteString;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.exception.WayangException;

import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

public class PythonProcessCaller {

    private Process process;
    private Socket socket;
    private ServerSocket serverSocket;
    private boolean ready;

    //TODO How to get the config
    private Configuration configuration;

    public PythonProcessCaller(ByteString serializedUDF){

        //TODO create documentation to how to the configuration in the code
        this.configuration = new Configuration("file:///Users/rodrigopardomeza/wayang/incubator-wayang/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties");
        this.ready = false;
        byte[] addr = new byte[4];
        addr[0] = 127; addr[1] = 0; addr[2] = 0; addr[3] = 1;

        try {
            /*TODO should NOT be assigned a specific port, set port as 0 (zero)*/
            this.serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(addr));
            ProcessBuilder pb = new ProcessBuilder(
                    Arrays.asList(
                            "python3",
                            this.configuration.getStringProperty("wayang.api.python.worker")
                    )
            );
            Map<String, String> workerEnv = pb.environment();
            workerEnv.put("PYTHON_WORKER_FACTORY_PORT", String.valueOf(this.serverSocket.getLocalPort()));

            // TODO See what is happening with ENV Python version
            workerEnv.put("PYTHONPATH", "/Users/rodrigopardomeza/wayang/incubator-wayang/pywayang/:/Users/rodrigopardomeza/opt/anaconda3/");

            pb.redirectOutput(ProcessBuilder.Redirect.INHERIT);
            pb.redirectError(ProcessBuilder.Redirect.INHERIT);
            this.process = pb.start();


            // Redirect worker stdout and stderr
            //IDK redirectStreamsToStderr(worker.getInputStream, worker.getErrorStream)

            // Wait for it to connect to our socket
            this.serverSocket.setSoTimeout(10000);

            try {
                this.socket = this.serverSocket.accept();
                this.serverSocket.setSoTimeout(0);

                if(socket.isConnected())
                    this.ready = true;

            } catch (Exception e) {
                System.out.println(e);
                throw new WayangException("Python worker failed to connect back.", e);
            }
        } catch (Exception e){
            System.out.println(e);
            /* Keep the original exception as the cause so the failure can be diagnosed */
            throw new WayangException("Python worker failed", e);
        }
    }

    public Process getProcess() {
        return process;
    }

    public Socket getSocket() {
        return socket;
    }

    public boolean isReady(){
        return ready;
    }

    public void close(){
        try {
            this.process.destroy();
            this.socket.close();
            this.serverSocket.close();
            System.out.println("Everything closed");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
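
The connect-back handshake that `PythonProcessCaller` performs (listen on an ephemeral port, pass it to the worker through `PYTHON_WORKER_FACTORY_PORT`, then wait for the worker to connect) can be sketched in Python as below. It is an illustration only: the inline child script and the `handshake` helper are hypothetical stand-ins for the real worker and the Java class.

```python
import os
import socket
import subprocess
import sys

# Stand-in for the real worker: connect back and send one greeting.
CHILD = (
    "import os, socket\n"
    "port = int(os.environ['PYTHON_WORKER_FACTORY_PORT'])\n"
    "s = socket.create_connection(('127.0.0.1', port), timeout=10)\n"
    "s.sendall(b'ready')\n"
    "s.close()\n"
)


def handshake(timeout=10.0):
    """Start a child process and return the bytes it sends after connecting back."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        server.bind(("127.0.0.1", 0))   # port 0: let the OS pick a free port
        server.listen(1)
        server.settimeout(timeout)      # do not wait forever for the child
        env = dict(os.environ)
        env["PYTHON_WORKER_FACTORY_PORT"] = str(server.getsockname()[1])
        child = subprocess.Popen([sys.executable, "-c", CHILD], env=env)
        try:
            conn, _ = server.accept()
            with conn:
                conn.settimeout(timeout)
                data = b""
                while True:
                    chunk = conn.recv(1024)
                    if not chunk:
                        break
                    data += chunk
                return data
        finally:
            child.wait(timeout=timeout)
    finally:
        server.close()
```

Binding to port 0 avoids hard-coding a port, which is what the TODO in the constructor asks for; the chosen port is only known after `bind`, so it has to be read back before the child is started.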

4a0fbc9208e2cedccea3043c1a899cafbe73e522

cannot be always string, include definition for every operator

like: map(udf, inputtype, outputtype)

/*TODO cannot be always string, include definition for every operator

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.api.python.executor;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.Socket;
import java.util.Iterator;
/*TODO cannot be always string, include definition for every operator
*  like: map(udf, inputtype, outputtype)*/
public class ProcessReceiver<Output> {

    private ReaderIterator<Output> iterator;

    public ProcessReceiver(Socket socket){
        try{
            //TODO use config buffer size
            int BUFFER_SIZE = 8192;

            DataInputStream stream = new DataInputStream(new BufferedInputStream(socket.getInputStream(), BUFFER_SIZE));
            this.iterator = new ReaderIterator<>(stream);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public Iterable<Output> getIterable(){
        return () -> iterator;
    }

    public void print(){
        iterator.forEachRemaining(x -> System.out.println(x.toString()));

    }
}
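
A minimal sketch of what the TODO's `map(udf, inputtype, outputtype)` idea might look like, written in Python for brevity. `MapSpec` and `apply_map` are hypothetical names, not part of Wayang's API; the point is only that each operator carries its own input and output types instead of assuming strings everywhere.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Iterator


@dataclass(frozen=True)
class MapSpec:
    """Hypothetical operator description: a UDF plus its declared types."""
    udf: Callable
    input_type: type
    output_type: type


def apply_map(spec: MapSpec, items: Iterable) -> Iterator:
    """Apply spec.udf to each item, checking the declared types on both sides."""
    for item in items:
        if not isinstance(item, spec.input_type):
            raise TypeError("expected %s, got %s"
                            % (spec.input_type.__name__, type(item).__name__))
        result = spec.udf(item)
        if not isinstance(result, spec.output_type):
            raise TypeError("UDF returned %s, declared %s"
                            % (type(result).__name__, spec.output_type.__name__))
        yield result
```

For example, `list(apply_map(MapSpec(len, str, int), ["a", "bcd"]))` maps strings to their lengths, and a receiver that knows `output_type` could pick a matching deserializer rather than always decoding UTF-8.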

8abd710475aacb398e08336b85000219e3f1432e

First we must receive the operator + UDF

print("base64_message")

print(base64_message)

print (func)

for x in func([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]): print(x)

# TODO First we must receive the operator + UDF
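
The exchange this TODO describes, where the UDF arrives first and the data frames follow, can be sketched with an in-memory stream. The sketch reuses the 4-byte big-endian length prefix and the `-1` end marker from the worker file in this issue; `send_job`, `read_rows`, and `run_worker` are hypothetical helper names, and the built-in `sorted` stands in for a user function because the standard `pickle` module stores functions by reference, so a lambda would not survive the trip.

```python
import io
import pickle
import struct

END_OF_DATA_SECTION = -1


def write_int(value, stream):
    stream.write(struct.pack("!i", value))


def read_int(stream):
    raw = stream.read(4)
    if len(raw) < 4:
        raise EOFError
    return struct.unpack("!i", raw)[0]


def send_job(func, rows, stream):
    """Sender side: length-prefixed pickled UDF, then UTF-8 rows, then the end marker."""
    payload = pickle.dumps(func)
    write_int(len(payload), stream)
    stream.write(payload)
    for row in rows:
        data = row.encode("utf-8")
        write_int(len(data), stream)
        stream.write(data)
    write_int(END_OF_DATA_SECTION, stream)


def read_rows(stream):
    """Yield decoded rows until the end marker is read."""
    while True:
        length = read_int(stream)
        if length == END_OF_DATA_SECTION:
            return
        yield stream.read(length).decode("utf-8")


def run_worker(stream):
    """Worker side: read the UDF first, then apply it to the row iterator."""
    udf_length = read_int(stream)
    func = pickle.loads(stream.read(udf_length))  # only unpickle trusted input
    return list(func(read_rows(stream)))
```

Sending an operator description alongside the UDF, as the TODO suggests, would add one more length-prefixed header before the rows; that part is left out of the sketch.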

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import socket
import struct
import pickle
import base64
import re
import sys


class SpecialLengths(object):
    END_OF_DATA_SECTION = -1
    PYTHON_EXCEPTION_THROWN = -2
    TIMING_DATA = -3
    END_OF_STREAM = -4
    NULL = -5
    START_ARROW_STREAM = -6


def read_int(stream):
    length = stream.read(4)
    if not length:
        raise EOFError
    res = struct.unpack("!i", length)[0]
    return res


class UTF8Deserializer:
    """
    Deserializes streams written by String.getBytes.
    """

    def __init__(self, use_unicode=True):
        self.use_unicode = use_unicode

    def loads(self, stream):
        length = read_int(stream)
        if length == SpecialLengths.END_OF_DATA_SECTION:
            raise EOFError
        elif length == SpecialLengths.NULL:
            return None
        s = stream.read(length)
        return s.decode("utf-8") if self.use_unicode else s

    def load_stream(self, stream):
        try:
            while True:
                yield self.loads(stream)
        except struct.error:
            return
        except EOFError:
            return

    def __repr__(self):
        return "UTF8Deserializer(%s)" % self.use_unicode


def write_int(p, outfile):
    outfile.write(struct.pack("!i", p))


def write_with_length(obj, stream):
    serialized = obj.encode('utf-8')
    # The length prefix is a signed 32-bit int, so the payload must stay below 2**31 bytes.
    if len(serialized) >= (1 << 31):
        raise ValueError("can not serialize object larger than 2G")
    write_int(len(serialized), stream)
    stream.write(serialized)


def dump_stream(iterator, stream):
    # Only str results are written; objects of any other type are skipped.
    for obj in iterator:
        if type(obj) is str:
            write_with_length(obj, stream)
        ## elif type(obj) is list:
        ##    write_with_length(obj, stream)
    print("Finished")
    write_int(SpecialLengths.END_OF_DATA_SECTION, stream)
    print("Wrote end-of-data marker")


def process(infile, outfile):
    """udf64 = os.environ["UDF"]
    print("udf64")
    print(udf64)
    #serialized_udf = binascii.a2b_base64(udf64)
    #serialized_udf = base64.b64decode(udf64)
    serialized_udf = bytearray(udf64, encoding='utf-16')
    # NOT VALID TO BE UTF8  serialized_udf = bytes(udf64, 'UTF-8')
    print("serialized_udf")
    print(serialized_udf)
    # input to be ast.literal_eval(serialized_udf)
    func = pickle.loads(serialized_udf, encoding="bytes")
    print ("func")
    print (func)
    print(func([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]))
    # func([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])"""



    # TODO First we must receive the operator + UDF
    """udf = lambda elem: elem.lower()

    def func(it):
        return sorted(it, key=udf)"""
    udf_length = read_int(infile)
    print("udf_length")
    print(udf_length)
    serialized_udf = infile.read(udf_length)
    print("serialized_udf")
    print(serialized_udf)
    #base64_message = base64.b64decode(serialized_udf + "===")
    #print("base64_message")
    #print(base64_message)
    func = pickle.loads(serialized_udf)
    #func = ori.lala(serialized_udf)
    #print (func)
    #for x in func([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]): print(x)

    """print("example")
    for x in func("2344|234|efrf|$#|ffrf"): print(x)"""
    # TODO Here we are temporarily assuming that the user is exclusively sending UTF8. User has several types
    iterator = UTF8Deserializer().load_stream(infile)
    # out_iter = sorted(iterator, key=lambda elem: elem.lower())
    out_iter = func(iterator)
    dump_stream(iterator=out_iter, stream=outfile)


def local_connect(port):
    sock = None
    errors = []
    # Support for both IPv4 and IPv6.
    # On most IPv6-ready systems, IPv6 will take precedence.
    for res in socket.getaddrinfo("127.0.0.1", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
        af, socktype, proto, _, sa = res
        try:
            sock = socket.socket(af, socktype, proto)
            # sock.settimeout(int(os.environ.get("SPARK_AUTH_SOCKET_TIMEOUT", 15)))
            sock.settimeout(30)
            sock.connect(sa)
            # sockfile = sock.makefile("rwb", int(os.environ.get("SPARK_BUFFER_SIZE", 65536)))
            sockfile = sock.makefile("rwb", 65536)
            # _do_server_auth(sockfile, auth_secret)
            return (sockfile, sock)
        except socket.error as e:
            emsg = str(e)
            errors.append("tried to connect to %s, but an error occurred: %s" % (sa, emsg))
            # socket.socket() itself may have failed, leaving no socket to close
            if sock is not None:
                sock.close()
            sock = None
    raise Exception("could not open socket: %s" % errors)


if __name__ == '__main__':
    print("Python version")
    print(sys.version)
    # The Java side passes the port of its listening socket through this variable.
    java_port = int(os.environ["PYTHON_WORKER_FACTORY_PORT"])
    sock_file, sock = local_connect(java_port)
    process(sock_file, sock_file)
    sock_file.flush()
    sys.exit()

af039622f101f845343bfb05bdcec8c3ef432251

ADD id to executions
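
One possible shape for the execution ids this TODO asks for, sketched in Python: every submitted plan gets a UUID that the caller can later use to look up its status. `ExecutionRegistry` and its methods are hypothetical and do not exist in Wayang; the `run` callable stands in for the `wpb.getWayangContext().execute(...)` call in the controller.

```python
import uuid


class ExecutionRegistry:
    """Hypothetical in-memory registry mapping execution ids to their status."""

    def __init__(self):
        self._status = {}

    def submit(self, run):
        """Run the callable synchronously and return the id assigned to this execution."""
        execution_id = str(uuid.uuid4())
        self._status[execution_id] = "RUNNING"
        try:
            run()
            self._status[execution_id] = "FINISHED"
        except Exception:
            self._status[execution_id] = "FAILED"
        return execution_id

    def status(self, execution_id):
        """Return the recorded status, or None for an unknown id."""
        return self._status.get(execution_id)
```

Returning the id from the endpoint, instead of the fixed strings the controller returns today, would let a client poll for the outcome of a specific run.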

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.api.rest.server.spring.general;

import com.google.protobuf.ByteString;
import org.apache.wayang.api.python.function.WrappedPythonFunction;
import org.apache.wayang.api.rest.server.spring.decoder.WayangPlanBuilder;
import org.apache.wayang.basic.operators.*;
import org.apache.wayang.commons.serializable.OperatorProto;
import org.apache.wayang.commons.serializable.PlanProto;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.api.exception.WayangException;
import org.apache.wayang.core.function.MapPartitionsDescriptor;
import org.apache.wayang.core.plan.wayangplan.OperatorBase;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
import java.util.*;
import java.util.stream.Collectors;

import org.apache.wayang.core.plan.wayangplan.WayangPlan;
import org.apache.wayang.java.Java;
import org.apache.wayang.spark.Spark;

import org.apache.wayang.commons.serializable.WayangPlanProto;
import org.springframework.web.multipart.MultipartFile;


@RestController
public class WayangController {

    @GetMapping("/plan/create/fromfile")
    public String planFromFile(
            //@RequestParam("file") MultipartFile file
    ){

        try {
            FileInputStream inputStream = new FileInputStream(Paths.get(".").toRealPath() + "/protobuf/wayang_message");
            WayangPlanBuilder wpb = new WayangPlanBuilder(inputStream);

            /*TODO ADD id to executions*/
            wpb.getWayangContext().execute(wpb.getWayangPlan());

        } catch (IOException e) {
            e.printStackTrace();
        }

        return "Builder works";
    }

    @PostMapping("/plan/create")
    public String planFromMessage(
            @RequestParam("message") String message
    ){

        WayangPlanBuilder wpb = new WayangPlanBuilder(message);

        /*TODO ADD id to executions*/
        wpb.getWayangContext().execute(wpb.getWayangPlan());

        return "";
    }

    @GetMapping("/")
    public String all(){
        System.out.println("detected!");

        try {
            FileInputStream inputStream = new FileInputStream(Paths.get(".").toRealPath() + "/protobuf/wayang_message");
            WayangPlanProto plan = WayangPlanProto.parseFrom(inputStream);

            WayangContext wc = buildContext(plan);
            WayangPlan wp = buildPlan(plan);

            System.out.println("Plan!");
            System.out.println(wp.toString());

            wc.execute(wp);
            return("Works!");

        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

        return "Not working";
    }

    private WayangContext buildContext(WayangPlanProto plan){

        WayangContext ctx = new WayangContext();
        plan.getContext().getPlatformsList().forEach(platform -> {
            if (platform.getNumber() == 0)
                ctx.with(Java.basicPlugin());
            else if (platform.getNumber() == 1)
                ctx.with(Spark.basicPlugin());
        });
        //ctx.with(Spark.basicPlugin());

        return ctx;
    }

    private WayangPlan buildPlan(WayangPlanProto plan){

        System.out.println(plan);

        PlanProto planProto = plan.getPlan();
        LinkedList<OperatorProto> protoList = new LinkedList<>();
        planProto.getSourcesList().forEach(protoList::addLast);

        Map<String, OperatorBase> operators = new HashMap<>();
        List<OperatorBase> sinks = new ArrayList<>();
        while(! protoList.isEmpty()) {

            OperatorProto proto = protoList.pollFirst();

            /* Checking if protoOperator can be connected to the current WayangPlan*/
            boolean processIt;
            if(proto.getType().equals("source")) processIt = true;

            else {
                /* Checking if ALL predecessors were already processed */
                processIt = true;
                for(String predecessor : proto.getPredecessorsList()){
                    if (!operators.containsKey(predecessor)) {
                        processIt = false;
                        break;
                    }
                }
            }

            /* Operators should not be processed twice*/
            if(operators.containsKey(proto.getId())) processIt = false;

            if(processIt) {

                /* Create and store Wayang operator */
                OperatorBase operator = createOperatorByType(proto);
                operators.put(proto.getId(), operator);

                /*TODO Connect with predecessors requires more details in connection slot*/
                int order = 0;
                for (String pre_id : proto.getPredecessorsList()) {

                    OperatorBase predecessor = operators.get(pre_id);
                    /* Only works without replicate topology */
                    predecessor.connectTo(0, operator, order);
                    order++;

                }

                /* Register each sink once, regardless of how many predecessors it has */
                if(proto.getType().equals("sink")){
                    sinks.add(operator);
                }

                /*List of OperatorProto successors
                 * They will be added to the protoList
                 * nevertheless they must be processed only if the parents are in operators list */
                List<OperatorProto> listSuccessors = planProto.getOperatorsList()
                        .stream()
                        .filter(t -> proto.getSuccessorsList().contains(t.getId()))
                        .collect(Collectors.toList());
                for (OperatorProto successor : listSuccessors){
                    if(!protoList.contains(successor)){
                        protoList.addLast(successor);
                    }
                }

                List<OperatorProto> sinkSuccessors = planProto.getSinksList()
                        .stream()
                        .filter(t -> proto.getSuccessorsList().contains(t.getId()))
                        .collect(Collectors.toList());
                for (OperatorProto successor : sinkSuccessors){
                    if(!protoList.contains(successor)){
                        protoList.addLast(successor);
                    }
                }

            } else {

                /* In case we cannot process it yet, It must be added again at the end*/
                protoList.addLast(proto);
            }
        }

        WayangPlan wayangPlan = new WayangPlan(sinks.get(0));
        return wayangPlan;
    }

    public OperatorBase createOperatorByType(OperatorProto operator){

        System.out.println("Type: " + operator.getType());
        switch(operator.getType()){
            case "source":
                try {
                    String source_path = operator.getPath();
                    URL url = new File(source_path).toURI().toURL();
                    return new TextFileSource(url.toString());
                } catch (MalformedURLException e) {
                    e.printStackTrace();
                }
                break;
            case "sink":
                try {
                    String sink_path = operator.getPath();
                    URL url = new File(sink_path).toURI().toURL();
                    return new TextFileSink<String>(
                            url.toString(),
                            String.class
                    );

                } catch (MalformedURLException e) {
                    e.printStackTrace();
                }
                break;
            case "reduce_by_key":
                /* The UDF is applied in Python workers; the parameters hold the
                 * dimensions or positions that compose the group key */
                return new PyWayangReduceByOperator<String, String>(
                        operator.getParametersMap(),
                        operator.getUdf(),
                        String.class,
                        String.class,
                        false
                );
            case "map_partition":
                return new MapPartitionsOperator<>(
                    new MapPartitionsDescriptor<String, String>(
                        new WrappedPythonFunction<String, String>(
                            l -> l,
                            operator.getUdf()
                        ),
                        String.class,
                        String.class
                    )
                );

            case "union":
                return new UnionAllOperator<String>(
                        String.class
                );

        }

        throw new WayangException("Operator Type not supported");
    }

    public static URI createUri(String resourcePath) {
        try {
            return Thread.currentThread().getClass().getResource(resourcePath).toURI();
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Illegal URI.", e);
        }

    }

}

b85732398b37855b6b38d657127ec8b731d0cb6c
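
The ordering rule used by `buildPlan` above (an operator is created only after all of its predecessors exist, otherwise it is pushed back to the end of the queue) can be sketched in isolation. This is a minimal Python illustration, not the project's implementation: `build_order` and the plain dictionary of operator ids are hypothetical stand-ins for the `OperatorProto` messages, and the sketch assumes the plan is acyclic and every predecessor id is present in the dictionary.

```python
from collections import deque


def build_order(operators):
    """Return operator ids so that every predecessor precedes its successors.

    `operators` maps an operator id to the list of its predecessor ids
    (a hypothetical stand-in for the protobuf plan). Assumes an acyclic plan.
    """
    # Invert the predecessor lists so successors can be queued after a node.
    successors = {}
    for op_id, preds in operators.items():
        for pred in preds:
            successors.setdefault(pred, []).append(op_id)

    # Start from the sources, i.e. operators without predecessors.
    queue = deque(op_id for op_id, preds in operators.items() if not preds)
    processed = set()
    order = []
    while queue:
        op_id = queue.popleft()
        if op_id in processed:
            continue  # operators are never processed twice
        if all(pred in processed for pred in operators[op_id]):
            processed.add(op_id)
            order.append(op_id)
            for succ in successors.get(op_id, []):
                if succ not in queue:
                    queue.append(succ)
        else:
            queue.append(op_id)  # not ready yet, retry after the others
    return order


plan = {
    "src1": [],
    "src2": [],
    "union": ["src1", "src2"],
    "map": ["union"],
    "sink": ["map"],
}
print(build_order(plan))
```

As in the Java loop, a plan containing a cycle would keep re-queueing the same operators, so a production version would likely need a guard against that case.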

Here we are temporarily assuming that the user is exclusively sending UTF8. User has several types

On most of IPv6-ready systems, IPv6 will take precedence.

# TODO Here we are temporarily assuming that the user is exclusively sending UTF8. User has several types

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import socket
import struct
import pickle
import base64
import re
import sys


class SpecialLengths(object):
    END_OF_DATA_SECTION = -1
    PYTHON_EXCEPTION_THROWN = -2
    TIMING_DATA = -3
    END_OF_STREAM = -4
    NULL = -5
    START_ARROW_STREAM = -6


def read_int(stream):
    length = stream.read(4)
    if not length:
        raise EOFError
    res = struct.unpack("!i", length)[0]
    return res


class UTF8Deserializer:
    """
    Deserializes streams written by String.getBytes.
    """

    def __init__(self, use_unicode=True):
        self.use_unicode = use_unicode

    def loads(self, stream):
        length = read_int(stream)
        if length == SpecialLengths.END_OF_DATA_SECTION:
            raise EOFError
        elif length == SpecialLengths.NULL:
            return None
        s = stream.read(length)
        return s.decode("utf-8") if self.use_unicode else s

    def load_stream(self, stream):
        try:
            while True:
                yield self.loads(stream)
        except struct.error:
            return
        except EOFError:
            return

    def __repr__(self):
        return "UTF8Deserializer(%s)" % self.use_unicode


def write_int(p, outfile):
    outfile.write(struct.pack("!i", p))


def write_with_length(obj, stream):
    serialized = obj.encode('utf-8')
    if serialized is None:
        raise ValueError("serialized value should not be None")
    if len(serialized) >= (1 << 31):
        raise ValueError("cannot serialize object of 2G or larger")
    write_int(len(serialized), stream)
    stream.write(serialized)


def dump_stream(iterator, stream):

    for obj in iterator:
        if type(obj) is str:
            write_with_length(obj, stream)
        ## elif type(obj) is list:
        ##    write_with_length(obj, stream)
    print("Finished writing")
    write_int(SpecialLengths.END_OF_DATA_SECTION, stream)
    print("Wrote end-of-data marker")


def process(infile, outfile):
    """udf64 = os.environ["UDF"]
    print("udf64")
    print(udf64)
    #serialized_udf = binascii.a2b_base64(udf64)
    #serialized_udf = base64.b64decode(udf64)
    serialized_udf = bytearray(udf64, encoding='utf-16')
    # NOT VALID TO BE UTF8  serialized_udf = bytes(udf64, 'UTF-8')
    print("serialized_udf")
    print(serialized_udf)
    # input to be ast.literal_eval(serialized_udf)
    func = pickle.loads(serialized_udf, encoding="bytes")
    print ("func")
    print (func)
    print(func([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]))
    # func([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])"""



    # TODO First we must receive the operator + UDF
    """udf = lambda elem: elem.lower()

    def func(it):
        return sorted(it, key=udf)"""
    udf_length = read_int(infile)
    print("udf_length")
    print(udf_length)
    serialized_udf = infile.read(udf_length)
    print("serialized_udf")
    print(serialized_udf)
    #base64_message = base64.b64decode(serialized_udf + "===")
    #print("base64_message")
    #print(base64_message)
    func = pickle.loads(serialized_udf)
    #func = ori.lala(serialized_udf)
    #print (func)
    #for x in func([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]): print(x)

    """print("example")
    for x in func("2344|234|efrf|$#|ffrf"): print(x)"""
    # TODO Here we are temporarily assuming that the user is exclusively sending UTF8. User has several types
    iterator = UTF8Deserializer().load_stream(infile)
    # out_iter = sorted(iterator, key=lambda elem: elem.lower())
    out_iter = func(iterator)
    dump_stream(iterator=out_iter, stream=outfile)


def local_connect(port):
    sock = None
    errors = []
    # Support for both IPv4 and IPv6.
    # On most of IPv6-ready systems, IPv6 will take precedence.
    for res in socket.getaddrinfo("127.0.0.1", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
        af, socktype, proto, _, sa = res
        try:
            sock = socket.socket(af, socktype, proto)
            # sock.settimeout(int(os.environ.get("SPARK_AUTH_SOCKET_TIMEOUT", 15)))
            sock.settimeout(30)
            sock.connect(sa)
            # sockfile = sock.makefile("rwb", int(os.environ.get("SPARK_BUFFER_SIZE", 65536)))
            sockfile = sock.makefile("rwb", 65536)
            # _do_server_auth(sockfile, auth_secret)
            return (sockfile, sock)
        except socket.error as e:
            emsg = str(e)
            errors.append("tried to connect to %s, but an error occurred: %s" % (sa, emsg))
            # socket.socket() itself may have failed, leaving sock unset
            if sock is not None:
                sock.close()
            sock = None
    raise Exception("could not open socket: %s" % errors)


if __name__ == '__main__':
    print("Python version")
    print(sys.version)
    java_port = int(os.environ["PYTHON_WORKER_FACTORY_PORT"])
    sock_file, sock = local_connect(java_port)
    process(sock_file, sock_file)
    sock_file.flush()
    sys.exit()

f57e9e4e72ff5497ee6a645385c8fc0b3a44d5ba
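
The framing used by the worker above (a 4-byte big-endian signed length, then the UTF-8 payload, with `-1` marking the end of the data section) can be exercised without a socket. The following is a small sketch against an in-memory buffer; `write_frame`, `read_frames` and `roundtrip` are illustrative names that do not appear in the worker, and only the sentinel value and the `"!i"` format are taken from it.

```python
import io
import struct

END_OF_DATA_SECTION = -1  # same sentinel value as SpecialLengths in the worker


def write_frame(text, stream):
    """Write one string as <4-byte big-endian length><UTF-8 bytes>."""
    payload = text.encode("utf-8")
    stream.write(struct.pack("!i", len(payload)))
    stream.write(payload)


def read_frames(stream):
    """Yield decoded strings until the end marker or the end of the stream."""
    while True:
        header = stream.read(4)
        if len(header) < 4:
            return  # stream ended without an explicit marker
        (length,) = struct.unpack("!i", header)
        if length == END_OF_DATA_SECTION:
            return
        yield stream.read(length).decode("utf-8")


def roundtrip(items):
    """Serialize the items into a buffer and read them back."""
    buffer = io.BytesIO()
    for item in items:
        write_frame(item, buffer)
    buffer.write(struct.pack("!i", END_OF_DATA_SECTION))
    buffer.seek(0)
    return list(read_frames(buffer))


print(roundtrip(["hello", "wörld", ""]))
```

Note that the length counts encoded bytes, not characters, so non-ASCII strings produce a length larger than `len(text)`.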

Should NOT be assigned a specific port; set the port to 0 (zero)

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.api.python.executor;

import com.google.protobuf.ByteString;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.exception.WayangException;

import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

public class PythonProcessCaller {

    private Process process;
    private Socket socket;
    private ServerSocket serverSocket;
    private boolean ready;

    //TODO How to get the config
    private Configuration configuration;

    public PythonProcessCaller(ByteString serializedUDF){

        //TODO document how to load the configuration in the code
        this.configuration = new Configuration("file:///Users/rodrigopardomeza/wayang/incubator-wayang/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties");
        this.ready = false;
        byte[] addr = new byte[4];
        addr[0] = 127; addr[1] = 0; addr[2] = 0; addr[3] = 1;

        try {
            /*TODO should NOT be assigned a specific port, set port as 0 (zero)*/
            this.serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(addr));
            ProcessBuilder pb = new ProcessBuilder(
                    Arrays.asList(
                            "python3",
                            this.configuration.getStringProperty("wayang.api.python.worker")
                    )
            );
            Map<String, String> workerEnv = pb.environment();
            workerEnv.put("PYTHON_WORKER_FACTORY_PORT", String.valueOf(this.serverSocket.getLocalPort()));

            // TODO See what is happening with ENV Python version
            workerEnv.put("PYTHONPATH", "/Users/rodrigopardomeza/wayang/incubator-wayang/pywayang/:/Users/rodrigopardomeza/opt/anaconda3/");

            pb.redirectOutput(ProcessBuilder.Redirect.INHERIT);
            pb.redirectError(ProcessBuilder.Redirect.INHERIT);
            this.process = pb.start();


            // Redirect worker stdout and stderr
            //IDK redirectStreamsToStderr(worker.getInputStream, worker.getErrorStream)

            // Wait for it to connect to our socket
            this.serverSocket.setSoTimeout(10000);

            try {
                this.socket = this.serverSocket.accept();
                this.serverSocket.setSoTimeout(0);

                if(socket.isConnected())
                    this.ready = true;

            } catch (Exception e) {
                System.out.println(e);
                throw new WayangException("Python worker failed to connect back.", e);
            }
        } catch (Exception e){
            System.out.println(e);
            throw new WayangException("Python worker failed", e);
        }
    }

    public Process getProcess() {
        return process;
    }

    public Socket getSocket() {
        return socket;
    }

    public boolean isReady(){
        return ready;
    }

    public void close(){
        try {
            this.process.destroy();
            this.socket.close();
            this.serverSocket.close();
            System.out.println("Everything closed");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

e53ef66982b0ddda9c06e9e07b72b10c30fe48b0
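
The port handling in `PythonProcessCaller` above (bind to port 0 so the operating system picks a free port, then hand the chosen port to the worker through `PYTHON_WORKER_FACTORY_PORT`) can be illustrated with a short loopback sketch. It is written in Python for brevity and is only an approximation of the Java side: `open_ephemeral_listener` and `demo` are hypothetical names, and the worker is simulated by a client socket in the same process instead of a spawned `python3` process.

```python
import socket


def open_ephemeral_listener(host="127.0.0.1"):
    """Bind to port 0 so the OS assigns a free port; return (socket, port)."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind((host, 0))
    server.listen(1)
    server.settimeout(5)  # do not wait forever for the worker to connect back
    return server, server.getsockname()[1]


def demo():
    server, port = open_ephemeral_listener()
    # The real caller passes this through the child process environment.
    env = {"PYTHON_WORKER_FACTORY_PORT": str(port)}
    try:
        worker_port = int(env["PYTHON_WORKER_FACTORY_PORT"])
        client = socket.create_connection(("127.0.0.1", worker_port), timeout=5)
        conn, _ = server.accept()
        try:
            client.sendall(b"ping")
            with conn.makefile("rb") as reader:
                data = reader.read(4)  # blocks until 4 bytes or EOF
        finally:
            conn.close()
            client.close()
    finally:
        server.close()
    return port, data


if __name__ == "__main__":
    print(demo())
```

Letting the OS choose the port avoids collisions when several workers start at the same time, which is presumably why the TODO asks for port 0.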
