
pycarol's Issues

Use connector name for all API calls instead of ID.

We should always use the connector name and let pycarol resolve the ID from the name, since the connector name is immutable across Carol apps.

conn = Connectors(login).get_by_name(connector_name)
connector_id = conn['mdmId']
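
A minimal sketch of a shared helper for this (the helper name and module path are assumptions):

from pycarol.connectors import Connectors  # assumed module path

def resolve_connector_id(login, connector_name):
    # Connector names are immutable across Carol apps, so they are a stable
    # handle; resolve to the mdmId that the API calls expect.
    conn = Connectors(login).get_by_name(connector_name)
    return conn['mdmId']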

New parameter for sendData function

It would be good if, instead of having to delete a data model from Carol with extra code before sending another one, we could just set a parameter to True when we want the delete to happen.
Ex.:

from pycarol.staging import Staging
staging = Staging(carol)
staging.sendData(staging_name = 'my_stag', data = json_ex, step_size = 2,
               connector_id=connectorId, print_stats = True, delete_dm = True)
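
A rough sketch of how sendData could honor the flag internally (the _delete_staging_data helper is hypothetical; the real deletion endpoint may differ):

def sendData(self, staging_name=None, data=None, step_size=100,
             connector_id=None, print_stats=False, delete_dm=False,
             **kwargs):
    if delete_dm:
        # Hypothetical cleanup: drop the existing records before ingesting
        # the new payload, so callers no longer delete by hand.
        self._delete_staging_data(staging_name, connector_id=connector_id)
    ...  # existing ingestion logic unchanged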

Automatically detect when a task is modified.

We could have an option in the pipeline that reuses only the targets whose task was not changed.
For this purpose, we can use the module "inspect" to recursively inspect all the members used in that task. For all these members we can generate a hash from their bytecode and include this hash as a luigi parameter.

This parameter should be exclusive to each task. Then we can choose whether to propagate it using "inherit_list". If we propagate it, a modification in one task forces the run of all depending tasks (like a remove_upstream), which is quite nice. In this case, we just need to implement a clean-up procedure so we don't overuse the storage.
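
A minimal sketch of the hashing idea, assuming plain functions and methods as members (closures, globals, and C extensions would need extra care, and true recursion over referenced members is omitted here):

import hashlib
import inspect

def task_code_hash(task_cls):
    # Hash the bytecode of every function/method defined on the task class;
    # if any of them changes, so does the hash, and therefore the luigi
    # parameter derived from it.
    h = hashlib.sha256()
    for name, member in inspect.getmembers(task_cls):
        func = getattr(member, '__func__', member)  # unwrap bound methods
        code = getattr(func, '__code__', None)
        if code is not None:
            h.update(code.co_code)
    return h.hexdigest()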

Create retry policies

Today we don't have any tool to handle transient connection problems that could be solved with some kind of retry policy.

import json

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def retry_session(retries, session=None, backoff_factor=0.3, status_forcelist=(500, 502, 503, 504)):
    session = session or requests.Session()
    retry = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session

session = retry_session(retries=5)
session.post(url=endpoint, data=json.dumps(x), headers=headers)

Less than 3 namespace error

Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.6/site-packages/tornado/web.py", line 1511, in _execute
    result = yield result
  File "/opt/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/anaconda3/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/anaconda3/lib/python3.6/site-packages/bokeh/server/views/autoload_js_handler.py", line 26, in get
    session = yield self.get_session()
  File "/opt/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/anaconda3/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/anaconda3/lib/python3.6/site-packages/bokeh/server/views/session_handler.py", line 43, in get_session
    session = yield self.application_context.create_session_if_needed(session_id, self.request)
  File "/opt/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/anaconda3/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "/opt/anaconda3/lib/python3.6/site-packages/bokeh/server/contexts.py", line 185, in create_session_if_needed
    self._application.initialize_document(doc)
  File "/opt/anaconda3/lib/python3.6/site-packages/bokeh/application/application.py", line 179, in initialize_document
    h.modify_document(doc)
  File "/opt/anaconda3/lib/python3.6/site-packages/bokeh/application/handlers/function.py", line 134, in modify_document
    self._func(doc)
  File "/home/rafael/pyCarol/pycarol/luigi_extension/taskviewer.py", line 142, in plot_task
    cm = self.get_colormapper()
  File "/home/rafael/pyCarol/pycarol/luigi_extension/taskviewer.py", line 129, in get_colormapper
    return CategoricalColorMapper(factors=factors, palette=Category10[nb_factors])
KeyError: 1
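
The failure is in the last frame: bokeh's Category10 palette is a dict keyed by palette sizes 3 through 10, so Category10[nb_factors] raises KeyError whenever there are fewer than 3 factors. A hedged sketch of a possible fix (written as a standalone function for clarity):

from bokeh.models import CategoricalColorMapper
from bokeh.palettes import Category10

def get_colormapper(factors):
    # Category10 only defines palettes for 3..10 entries; clamp the lookup
    # and slice back down so 1 or 2 factors still work.
    nb_factors = len(factors)
    palette = Category10[min(max(nb_factors, 3), 10)][:nb_factors]
    return CategoricalColorMapper(factors=factors, palette=palette)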

Task Visualization

Many times, when debugging or even when generating reports, we get the results from a task and create some visualizations to interpret them. The idea is to implement this inside the task, in a way that makes it easy to reuse the same visualization. Something like:

task = TaskA(**params)
task.visualize.plot_churn_vs_clv()
task.visualize.plot_churn_classification()
...

To implement that, I defined a main class for the visualization, with some common methods and configuration (e.g. the calling task, whether the output is a Jupyter notebook or something else, plt axes, plot_histogram, etc.) and specific classes for the different kinds of visualization that a task might have. E.g.:

from pycarol.luigi_extension import Visualization

class ModelChurnVisualization(Visualization):
    def plot_churn_vs_clv(self, param_a=None, param_b=None, ...):
        ...

In the Task:

class ClassifyChurn(Task):
    ...
    visualize = ModelChurnVisualization

To implement it the way I mentioned, we must also add it to the Target:

class PyCarolTarget(luigi.Target):
    def __init__(self, task, *args, **kwargs):
        ...
        self.visualize = task.visualize

I was doing it like that, but it is still under development. If you think of a better way to implement it, we should discuss it before actually adding it to pycarol. I really believe it will help a lot when our apps are in production, and it is a nice way to easily reuse the work that goes into creating visualizations.

Carol Init

Carol init is like this

 login = Carol(os.environ['CAROLTENANT'],
               APP_NAME,
               auth=ApiKeyAuth(os.environ['CAROLAPPOAUTH']),
               connector_id=os.environ['CAROLCONNECTORID'])

This is wrong, and it is not a matter of taste. It should be like this:

 login = Carol(os.environ['CAROLTENANT'],
               APP_NAME,
               auth=os.environ['CAROLAPPOAUTH'],
               connector_id=os.environ['CAROLCONNECTORID'])

If possible, we should drop APP_NAME

 login = Carol(os.environ['CAROLTENANT'],
               auth=os.environ['CAROLAPPOAUTH'],
               connector_id=os.environ['CAROLCONNECTORID'])

Better still would be something like this (header names shown as pseudocode, since X-Auth-Key is not a valid Python identifier):

 login = Carol(os.environ['CAROLTENANT'],
               X-Auth-Key=os.environ['CAROLAPPOAUTH'],
               X-Auth-ConnectorId=os.environ['CAROLCONNECTORID'])

Easy to fix, but I can't create a branch here.

Online Apps: Easier Pattern for Module Injection File

Modules that are stored in sub-folders today have to be referenced as ai-script.run_me.py so the lib can properly import them.

I know this is the default way Python handles module paths, but it would be nice to add some kind of self.module_name.replace("/", ".") in the function, so that in the JSON the user can refer to ai-script/run_me.py instead of ai-script.run_me.py.

self.imported_module = import_module(f'{self.file_path}{self.module_name}')
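
A minimal sketch of the normalization, assuming module_name may arrive with slashes and an optional .py suffix:

from importlib import import_module

def _normalize_module_name(module_name):
    # Accept 'ai-script/run_me.py' as well as 'ai-script.run_me': strip a
    # trailing .py and turn path separators into dots.
    if module_name.endswith('.py'):
        module_name = module_name[:-3]
    return module_name.replace('/', '.')

self.imported_module = import_module(f'{self.file_path}{_normalize_module_name(self.module_name)}')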

Create builder for filters / named queries

We should be able to write filters like on the backend:

FilterQueryObject query = new FilterQueryObject.Builder()
            .must(new FilterType(QueryFilterType.TYPE_FILTER, TypeConstants.VALUE, entityType))
            .should(new FilterType(QueryFilterType.WILDCARD_FILTER, Skill.NLP_NAME, "*" + name + "*"))
            .should(new FilterType(QueryFilterType.WILDCARD_FILTER, Skill.NLP_ANSWER_MODEL + "." + SkillAnswerModel.NLP_VOICE_MESSAGE, "*" + name + "*"))
            .should(new FilterType(QueryFilterType.WILDCARD_FILTER, Skill.NLP_EXAMPLE_QUESTION, "*" + name + "*"))
                .build();

SearchResults<Skill> results = searcher(tenantId)
            .pageSize(pageSize)
            .offset(offset)
            .type(entityType).query(query).all().search(SearchHitEntityTransformers.SKILL);
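
A hedged sketch of what a Python builder could look like in pycarol (the class, method names, and JSON keys below are proposals, not existing API):

class FilterBuilder:
    """Accumulates must/should clauses into a filter-query payload."""

    def __init__(self):
        self._must = []
        self._should = []

    def must(self, filter_type, key, value):
        self._must.append({'mdmFilterType': filter_type, 'mdmKey': key, 'mdmValue': value})
        return self

    def should(self, filter_type, key, value):
        self._should.append({'mdmFilterType': filter_type, 'mdmKey': key, 'mdmValue': value})
        return self

    def build(self):
        return {'mustList': self._must, 'shouldList': self._should}

# Illustrative usage mirroring the Java example above:
query = (FilterBuilder()
         .must('TYPE_FILTER', 'mdmValue', entity_type)
         .should('WILDCARD_FILTER', 'nlpName', f'*{name}*')
         .build())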

Remove old unused branches

Ideally, we should always remove branches that are related to previous versions, like 2.3-test.
Besides, we should take care when naming branches after versions: if we create a branch named 2.6, it will not be possible to make a release named 2.6.
In these cases, I use 2.6dev.

Load Datamodel from S3

This function assumes the bucket has this structure:
carol-tenantid/data/golden/data_model_name/

In the meantime, it can be called like this:
df = loadDataModel('docol', 'itemmanufacturing')

Code:

import boto3
import s3fs
import os
import pandas

def loadDataModel(tenantId, dataModel):
    """
    Load Specific DataModel from S3 Bucket to a Pandas Dataframe

    It will use the following structure:
        bucket-name: 'carolina-data'
        path: <tenantId>/data/golden/<dataModel>
    """

    bucket_name = 'carolina-data'
    s3 = s3fs.S3FileSystem(anon=False) 

    df_full = None

    s3_resource = boto3.resource('s3')
    bucket = s3_resource.Bucket(bucket_name)
    dataModelPath = os.path.join(tenantId, 'data/golden', dataModel)

    print(dataModelPath)

    # List only the objects under the data model prefix instead of
    # scanning the whole bucket.
    for key in bucket.objects.filter(Prefix=dataModelPath):
        file = os.path.join(bucket_name, key.key)

        with s3.open(file, 'rb') as f:
            df = pandas.read_json(f, lines=True)
            if df_full is None:
                df_full = df
            else:
                df_full = pandas.concat([df_full, df], ignore_index=True)
    return df_full

Send Data to staging is not working

There are two issues:

  • when passing a crosswalk list, we still fetch the crosswalk from the tenant
  • when fetching the crosswalk, we get a list of lists, and the pandas duplicated check inside the assert breaks

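A hedged sketch of a fix for the second point, assuming the crosswalk comes back as a list of lists of field names:

def _flatten_crosswalk(crosswalk):
    # The API may return e.g. [['customer_id', 'order_id']]; the duplicate
    # check needs the flat field list.
    if crosswalk and isinstance(crosswalk[0], (list, tuple)):
        return [field for group in crosswalk for field in group]
    return list(crosswalk)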

Online Apps: Validate if module exist

If the online app tries to import a nonexistent module, we don't do any validation on top of it, creating a false sense that the app is working when it isn't.

For example, if we tell it to import a file that doesn't exist in the filesystem, the app starts (Flask starts and the healthchecks pass) but the exposed APIs don't work.

def _dynamic_import(self):
    try:
        self.imported_module = import_module(f'{self.file_path}{self.module_name}')
    except Exception as e:
        self._log_append(f'Problem when importing file. Module: {self.module_name}. Error: {str(e)}')
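
A minimal sketch of an upfront check using importlib.util.find_spec, so a missing module fails loudly at startup instead of being swallowed (find_spec itself raises if a parent package is missing, which is also a loud failure):

from importlib import import_module
from importlib.util import find_spec

def _dynamic_import(self):
    name = f'{self.file_path}{self.module_name}'
    if find_spec(name) is None:
        # Fail at startup rather than serving healthchecks for an app
        # whose APIs can never work.
        raise ImportError(f'Module {name} not found; check the JSON config.')
    self.imported_module = import_module(name)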

Remove luigi build

Today we have the following line, which starts a luigi build every time we import luigi_extension:

luigi.build([], workers=1, local_scheduler=True)

As I understand it, this was intended to solve the problem of duplicated logging. If that is the case, I believe it is possible to solve it through a configuration file for the logging. E.g.:

[loggers]
keys=root,luigi,app,pycarol

[handlers]
keys=consoleHandler

[formatters]
keys=simpleFormatter

[logger_root]
level=INFO
handlers=consoleHandler

[logger_luigi]
level=DEBUG
handlers=consoleHandler
qualname=luigi-interface
propagate=0

[logger_pycarol]
level=DEBUG
handlers=consoleHandler
qualname=pycarol
propagate=0

[logger_app]
level=DEBUG
handlers=consoleHandler
qualname=app
propagate=0

[handler_consoleHandler]
class=StreamHandler
level=DEBUG
formatter=simpleFormatter
args=(sys.stdout,)

[formatter_simpleFormatter]
format=%(name)s: %(levelname)s: %(message)s
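
For reference, a config file like this could be applied once at import time with the stdlib loader (the file name is hypothetical):

import logging.config

# disable_existing_loggers=False keeps loggers created before this call
# (e.g. by luigi itself) working instead of silencing them.
logging.config.fileConfig('logging.ini', disable_existing_loggers=False)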

If this solves it for everybody, we can remove the build now. If the problem persists, I think we should take the time to investigate why it is happening.

Online Apps: Base URL

The base path for the implemented APIs is hard to hit, and since every online app has its own endpoint (DNS and URL), we can rely on the default path /, so users can implement their APIs this way:

Today: https://<tenant>-<app>-<version>-<process>.online.carol.ai/<tenant>/<app>/<version>/<process>/api/entities

Improved: https://<tenant>-<app>-<version>-<process>.online.carol.ai/api/entities

base_url = f'{self.domain}/{self.app_name}/{self.app_version}/{self.online_name}'

@flask.route('/', methods=['GET', 'POST'])
def base():
    return f'Running! Use http:// .../{base_url}/api/(endpoint)'

@flask.route(f'/{base_url}/api/<prediction_path>', methods=['GET', 'POST'])
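
A sketch of the proposed simplification, dropping the tenant/app/version prefix from the routes (flask here is the app instance, as in the snippet above):

# The external DNS already encodes tenant/app/version/process, so the
# routes only need the short path.
@flask.route('/', methods=['GET', 'POST'])
def base():
    return 'Running! Use /api/(endpoint)'

@flask.route('/api/<prediction_path>', methods=['GET', 'POST'])
def api(prediction_path):
    ...  # dispatch to the imported module as before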

Create schema without sending.

The following method of Staging should be split in two, so that a schema can be built without sending it:

def create_schema(self, fields_dict, staging_name, connector_id=None, mdm_flexible='false',
                  crosswalk_name=None, crosswalk_list=None, overwrite=False):
    assert fields_dict is not None

    if isinstance(fields_dict, list):
        fields_dict = fields_dict[0]

    if isinstance(fields_dict, dict):
        schema = carolSchemaGenerator(fields_dict)
        schema = schema.to_dict(mdmStagingType=staging_name, mdmFlexible=mdm_flexible,
                                crosswalkname=crosswalk_name, crosswalkList=crosswalk_list)
    elif isinstance(fields_dict, str):
        schema = carolSchemaGenerator.from_json(fields_dict)
        schema = schema.to_dict(mdmStagingType=staging_name, mdmFlexible=mdm_flexible,
                                crosswalkname=crosswalk_name, crosswalkList=crosswalk_list)
    else:
        # Failing fast here avoids a NameError on `schema` below.
        raise TypeError('Behavior for type %s not defined!' % type(fields_dict))

    if connector_id is None:
        connector_id = self.carol.connector_id
    query_string = {"connectorId": connector_id}

    has_schema = self.get_schema(staging_name, connector_id=connector_id) is not None
    method = 'PUT' if has_schema else 'POST'

    resp = self.carol.call_api('v2/staging/tables/{}/schema'.format(staging_name), data=schema,
                               method=method, params=query_string)
    if resp.get('mdmId'):
        print('Schema sent successfully!')
    else:
        # `resp` is a dict; concatenating it to a str would raise.
        print('Failed to send schema: {}'.format(resp))
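
A hedged sketch of the split; build_schema and send_schema are proposed names, not existing pycarol API:

def build_schema(self, fields_dict, staging_name, mdm_flexible='false',
                 crosswalk_name=None, crosswalk_list=None):
    # Pure half: turn a sample record (dict or JSON string) into the
    # schema payload, without touching the API.
    if isinstance(fields_dict, list):
        fields_dict = fields_dict[0]
    if isinstance(fields_dict, str):
        gen = carolSchemaGenerator.from_json(fields_dict)
    else:
        gen = carolSchemaGenerator(fields_dict)
    return gen.to_dict(mdmStagingType=staging_name, mdmFlexible=mdm_flexible,
                       crosswalkname=crosswalk_name, crosswalkList=crosswalk_list)

def send_schema(self, schema, staging_name, connector_id=None):
    # Side-effecting half: POST a new schema or PUT over an existing one.
    if connector_id is None:
        connector_id = self.carol.connector_id
    method = 'PUT' if self.get_schema(staging_name, connector_id=connector_id) else 'POST'
    return self.carol.call_api('v2/staging/tables/{}/schema'.format(staging_name),
                               data=schema, method=method,
                               params={'connectorId': connector_id})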

auth Class access

Please merge all the auth files into one single file.
The current access pattern is extremely confusing.

Error handling using decorators

We should create a decorator to handle the REST response, avoiding having to repeat

if not self.lastResponse.ok:
    # error handler for token
    if self.lastResponse.reason == 'Unauthorized':
        self.token_object.refreshToken()
        self.headers = {'Authorization': self.token_object.access_token, 'Content-Type': 'application/json'}
        continue

all the time.
This decorator should have a persist parameter (how many times to retry), maybe some delay between consecutive requests after an error, and an option to call different handler functions for a given error code/message.
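
A minimal sketch of such a decorator (the token-refresh hook mirrors the snippet above; names are assumptions about the surrounding class):

import time
from functools import wraps

def handle_response(retries=3, delay=1, on_status=None):
    # on_status maps an HTTP status code to a handler called before the
    # next attempt, covering the "different functions per error code" idea.
    def decorator(func):
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            for attempt in range(retries):
                response = func(self, *args, **kwargs)
                if response.ok:
                    return response
                if response.reason == 'Unauthorized':
                    # Refresh the token and retry, as in the repeated block.
                    self.token_object.refreshToken()
                    self.headers['Authorization'] = self.token_object.access_token
                handler = (on_status or {}).get(response.status_code)
                if handler:
                    handler(response)
                time.sleep(delay)
            return response
        return wrapper
    return decorator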

CustomTarget for very specific load/dump methods in Luigi Extension

I am facing a problem where I need to save a model that does not support pickling. The library that implements the model already provides a method to save the file, but it has some logic in it. One way to keep the luigi_extension features for target load/dump would be to create a new Target with that logic. That would work, but I think it would become messy with time, as more specific load/dump patterns will appear in the future. Another option would be to not use a luigi_extension Target, but then I lose its benefits.

The proposal is to create a CustomTarget. Since the LocalTarget already receives the Task related to the Target, the user could define the load and dump methods on the Task itself, named something like target_load and target_dump. That way there is a generic way of defining targets directly tied to the task's processes.

E.g.

class MyTask(Task):
    TARGET = CustomLocalTarget

    def easy_run(self, inputs):
        ...
        return model

    def target_load(self):
        ...  # logic for the loading process, very specific to a few use cases

    def target_dump(self, data):
        ...  # logic for the dump process, very specific to a few use cases
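
For the Target side, a hedged sketch of how a CustomLocalTarget could delegate to the task (assuming, as in PyCarolTarget above, that the target receives the task at construction):

import luigi

class CustomLocalTarget(luigi.LocalTarget):
    def __init__(self, task, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.task = task

    def load(self):
        # Delegate to whatever load logic the Task defined.
        return self.task.target_load()

    def dump(self, data):
        self.task.target_dump(data)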

Can I implement that or do you think there is an easier more reliable path? @rafonseca @rafarui

Luigi/Taskviewer: Persist stdout of tasks on S3

When a target is persisted on S3, we have access to the result (the target), but we don't have any info about the execution that generated it.
Note that this is different from logging, because we only want to save the information associated with the respective target.
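
A minimal sketch of the capture, using contextlib.redirect_stdout; the load_inputs and save_log calls are hypothetical placeholders for however the extension loads inputs and persists files next to the target:

import io
from contextlib import redirect_stdout

def run_with_captured_stdout(task):
    # Capture everything the task prints and persist it next to the
    # target, so the execution trace travels with the result.
    buffer = io.StringIO()
    with redirect_stdout(buffer):
        result = task.easy_run(task.load_inputs())  # hypothetical input loader
    task.storage.save_log(task, buffer.getvalue())  # hypothetical persistence call
    return result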

Make luigi extension compatible with `temporary_path`

temporary_path is not working; it seems that when the task is complete, it rewrites the file with None as output.

Here is an example:

    def easy_run(self, inputs):
        train, test, eval_train, user_index_test = inputs[0]
        space = [Integer(5, 50, name='epochs'),
                 Real(0.05, 0.1, 'log-uniform', name='learning_rate'),
                 Integer(15, 100, name='no_components'),
                 Real(10**-8, 10**-5, 'log-uniform', name='user_alpha'),
                 Real(10**-8, 10**-5, 'log-uniform', name='item_alpha'),
                 Integer(5, 15, name='max_sampled'),
                 ]

        with self.output().temporary_path() as self.temp_output_path:
            checkpoint_saver = skopt_stopper.CheckpointSaver(self.temp_output_path)
            delta_earlystop = DeltaYStopper(DELTA, N_BEST)
            res_fm = gp_minimize(objective, space, n_calls=N_CALLS,
                                 random_state=SEED_SKOPT,
                                 verbose=True, callback=[checkpoint_saver, delta_earlystop])

Option to use pycarol storage on google cloud

We are going to take some time to move everything to GCP. Until then, I think we could have an option on pycarol storage to use AWS or GCP cloud. This would be very useful to leave the luigi targets in the same cloud/region, when we start to run our notebooks and pipelines on GCP.

To avoid API changes, we could do this setting through an env variable. If this env variable is present, then we use GCP; otherwise we keep using AWS. This would not affect any project.
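
A minimal sketch of the switch, with a hypothetical variable name and storage classes:

import os

def get_storage_backend():
    # Opt-in via environment: presence of the variable selects GCP,
    # absence keeps today's AWS behavior, so no project has to change.
    if os.environ.get('PYCAROL_STORAGE_GCP'):
        return GCPStorage()   # hypothetical GCP-backed implementation
    return AWSStorage()       # current S3-backed implementation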

@wmsouza , @amalucelli , @rafarui . what do you think ?

Create on luigi-extension a ForceableTask

Create on luigi-extension a ForceableTask capable of running a task even if its targets already exist. Like this:

import luigi

class ForceableTask(luigi.Task):

    force = luigi.BoolParameter(significant=False, default=False)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # To force execution, we just remove all outputs before `complete()` is called
        if self.force is True:
            outputs = luigi.task.flatten(self.output())
            for out in outputs:
                if out.exists():
                    # Remove this particular output (the original snippet
                    # removed self.output().path, which breaks for tasks
                    # with multiple outputs).
                    out.remove()

add DM consumer function

the example seems to be wrong:

Map<String, Map<String, Long>>

a map of id to a map of id/long. I think you have to confirm the counter that you read, not only the id, because the record might have changed.

Right, so: Map<String, Long> goldenRecordIdWithCounter

@rafael.rui you'll have to get the counter for the record (mdmCounterForEntity) and send it like:

{
   "aaa": 150,
   "bcd": 151
}
...
