
pycarol's Introduction

PyCarol

Getting Started

Run pip install pycarol to install the latest stable version from PyPI. Documentation for the latest release is hosted on readthedocs.

This will install the minimal dependencies. To install pyCarol with the dataframe dependencies, use pip install pycarol[dataframe]; to install with the dask+pipeline dependencies, use pip install pycarol[pipeline,dask].

The available extras are: complete, dataframe, onlineapp, dask, and pipeline.

To install from source:

  1. pip install -r requirements.txt to install the minimal requirements;
  2. pip install -e . ".[dev]" to install the minimal requirements + dev libs;
  3. pip install -e . ".[pipeline]" to install the minimal requirements + pipelines dependencies;
  4. pip install -e . ".[complete]" to install all dependencies;

Initializing pyCarol

Carol is the main object used to access pyCarol and all of Carol's APIs.

from pycarol import PwdAuth, Carol
carol = Carol(domain=TENANT_NAME, app_name=APP_NAME,
              auth=PwdAuth(USERNAME, PASSWORD), organization=ORGANIZATION)

where domain is the tenant name, app_name is the Carol app name (if any), auth is the authentication method to be used (user/password in this case), and organization is the organization one wants to connect to. Carol's URL is built as www.ORGANIZATION.carol.ai/TENANT_NAME.

It is also possible to initialize the object with a token generated via user/password. This is useful when creating an online app that interacts with Carol:

from pycarol import PwdKeyAuth, Carol
carol = Carol(domain=TENANT_NAME, app_name=APP_NAME,
              auth=PwdKeyAuth(pwd_auth_token), organization=ORGANIZATION)

Using API Key

To use API keys instead of username and password:

from pycarol import ApiKeyAuth, Carol

carol = Carol(domain=DOMAIN,
              app_name=APP_NAME,
              auth=ApiKeyAuth(api_key=X_AUTH_KEY),
              connector_id=CONNECTOR, organization=ORGANIZATION)

In this case one changes the authentication method to ApiKeyAuth. Notice that one needs to pass the connector_id too: an API key is always associated with a connector ID.

It is also possible to use pyCarol to generate an API key:

from pycarol import PwdAuth, ApiKeyAuth, Carol

carol = Carol(domain=TENANT_NAME, app_name=APP_NAME, organization=ORGANIZATION,
              auth=PwdAuth(USERNAME, PASSWORD), connector_id=CONNECTOR)
api_key = carol.issue_api_key()

print(f"This is a API key {api_key['X-Auth-Key']}")
print(f"This is the connector Id {api_key['X-Auth-ConnectorId']}")

To get the details of the API key you can do:

details = carol.api_key_details(APIKEY, CONNECTORID)

Finally, to revoke an API key:

carol.api_key_revoke(CONNECTORID)

Good practice using token

Never write your password or API token in plain text in your application; use environment variables instead. pyCarol can read them automatically. When no parameters are passed to the Carol constructor, pyCarol will look for:

  1. CAROLTENANT for domain
  2. CAROLAPPNAME for app_name
  3. CAROL_DOMAIN for environment
  4. CAROLORGANIZATION for organization
  5. CAROLAPPOAUTH for auth
  6. CAROLCONNECTORID for connector_id
  7. CAROLUSER for carol user email
  8. CAROLPWD for user password.

e.g., one can create a .env file like this:

CAROLAPPNAME=myApp
CAROLTENANT=myTenant
CAROLORGANIZATION=myOrganization
CAROLAPPOAUTH=myAPIKey
CAROLCONNECTORID=myConnector

and then

from pycarol import Carol
from dotenv import load_dotenv
load_dotenv(".env") #this will import these env variables to your execution.
carol = Carol()

Ingesting data

From both Staging Tables and Data Models (CDS Layer)

Use this method when you need to read most of the records and columns from the source.

from pycarol import Carol, Staging

staging = Staging(Carol())
df = staging.fetch_parquet(
    staging_name="execution_history",
    connector_name="model"
)

From both Staging Tables and Data Models (BQ Layer)

Use this method when you need to read only a subset of records and columns or when data transformation is needed.

from pycarol import BQ, Carol

bq = BQ(Carol())
query_str = "SELECT * FROM stg_connectorname_table_name"
results = bq.query(query_str)

In case one needs a service account with access to BigQuery, the following code can be used:

from pycarol import Carol
from pycarol.bigquery import TokenManager

tm = TokenManager(Carol())
service_account = tm.get_token().service_account

After each execution of BQ.query, the BQ object will have an attribute called job. This attribute is of type bigquery.job.query.QueryJob and may be useful for monitoring/debugging jobs.
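For example, after running the query above one could inspect the job for debugging (a minimal sketch; job_id, state, total_bytes_processed and cache_hit are standard attributes of google-cloud-bigquery's QueryJob):

# Inspect the QueryJob created by the last bq.query() call.
job = bq.job
print(job.job_id)                 # BigQuery job identifier
print(job.state)                  # e.g. 'DONE'
print(job.total_bytes_processed)  # helps to keep an eye on query cost
print(job.cache_hit)              # True when the result came from the query cache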

pyCarol also provides access to the BigQuery Storage API. It allows much faster reads, but with limited querying capabilities. For instance, only tables are readable, so 'ingestion_stg_model_deep_audit' works, but 'stg_model_deep_audit' does not (it is a view).

from pycarol import BQStorage, Carol

bq = BQStorage(Carol())
table_name = "ingestion_stg_model_deep_audit"
col_names = ["request_id", "version"]
restriction = "branch = '01'"
sample_percentage = 10  # illustrative value: read roughly 10% of the rows
df = bq.query(
    table_name,
    col_names,
    row_restriction=restriction,
    sample_percentage=sample_percentage,
    return_dataframe=True
)

From Data Models (RT Layer): Filter queries

Use this when you need low latency (only if RT layer is enabled).

from pycarol.filter import TYPE_FILTER, TERM_FILTER, Filter
from pycarol import Query
json_query = Filter.Builder() \
    .must(TYPE_FILTER(value='ratings' + "Golden")) \
    .must(TERM_FILTER(key='mdmGoldenFieldAndValues.userid.raw',value='123'))\
    .build().to_json()

FIELDS_ITEMS = ['mdmGoldenFieldAndValues.mdmaddress.coordinates']
query = Query(carol, page_size=10, print_status=True, only_hits=True,
              fields=FIELDS_ITEMS, max_hits=200).query(json_query).go()
query.results

The result will be up to 200 hits (max_hits) for json_query above, returned in pages of 10 records each (page_size). Only the fields listed in FIELDS_ITEMS will be returned.

The parameter only_hits=True ensures that only records under the path $hits.mdmGoldenFieldAndValues are returned. If one wants the whole response, use only_hits=False. Also, if your filter has an aggregation, one should use only_hits=False and get_aggs=True, e.g.:

from pycarol import Query
from pycarol.filter import TYPE_FILTER, Filter, CARDINALITY

json_query = Filter.Builder() \
    .must(TYPE_FILTER(value='datamodelname' + "Golden")) \
    .aggregation(CARDINALITY(name='cardinality', params = ["mdmGoldenFieldAndValues.taxid.raw"], size=40))\
    .build().to_json()

query = Query(carol, get_aggs=True, only_hits=False)
query.query(json_query).go()
query.results

From Data Models (RT Layer): Named queries

from pycarol import Query

named_query = 'revenueHist'  # named query name
params = {"bin":"1d","cnpj":"24386434000130"}  #query parameters to send.
results = Query(carol).named(named_query, params=params).go().results

It is possible to use all the parameters available in the filter query, e.g., only_hits, save_results, etc. For more information on the possible input parameters, check the docstring.

What if one does not remember the parameters for a given named query?

named_query = 'revenueHist'  # named query name
Query(carol).named_query_params(named_query)
> {'revenueHist': ['*cnpj', 'dateFrom', 'dateTo', '*bin']}  # Parameters starting with * are mandatory.

Sending data

The first step to send data to Carol is to create a connector.

from pycarol import Connectors
connector_id = Connectors(carol).create(name='my_connector', label="connector_label", group_name="GroupName")
print(f"This is the connector id: {connector_id}")

With the connector ID in hand, we can create the staging schema and then create the staging table, assuming we have a sample of the data we want to send.

from pycarol import Staging

json_ex = {"name":'Rafael',"email": {"type": "email", "email": '[email protected]'} }

staging = Staging(carol)
staging.create_schema(staging_name='my_stag', data=json_ex,
                      crosswalk_name='my_crosswalk', crosswalk_list=['name'],
                      connector_name='my_connector')

The JSON schema will be available in the variable schema.schema. The code above will create the following schema:

{
  'mdmCrosswalkTemplate': {
    'mdmCrossreference': {
      'my_crosswalk': [
        'name'
      ]
    }
  },
  'mdmFlexible': 'false',
  'mdmStagingMapping': {
    'properties': {
      'email': {
        'properties': {
          'email': {
            'type': 'string'
          },
          'type': {
            'type': 'string'
          }
        },
        'type': 'nested'
      },
      'name': {
        'type': 'string'
      }
    }
  },
  'mdmStagingType': 'my_stag'
}

To send the data (assuming we have a JSON with the records we want to send):

from pycarol import Staging

json_ex = [{"name":'Rafael',"email": {"type": "email", "email": '[email protected]'}   },
           {"name":'Leandro',"email": {"type": "email", "email": '[email protected]'}   },
           {"name":'Joao',"email": {"type": "email", "email": '[email protected]'}   },
           {"name":'Marcelo',"email": {"type": "email", "email": '[email protected]'}   }]


staging = Staging(carol)
staging.send_data(staging_name='my_stag', data=json_ex, step_size=2,
                  connector_id=connector_id, print_stats=True)

The parameter step_size sets how many records are sent per request. Remember that the max size per payload is 5MB. The parameter data can also be a pandas DataFrame (see the sketch below).
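For instance, a minimal sketch sending a pandas DataFrame to the same staging table created above (the records and e-mail values are placeholders):

import pandas as pd

from pycarol import Staging

# Placeholder records mirroring the JSON example above.
df = pd.DataFrame([
    {"name": "Rafael", "email": {"type": "email", "email": "rafael@example.com"}},
    {"name": "Leandro", "email": {"type": "email", "email": "leandro@example.com"}},
])

staging = Staging(carol)
staging.send_data(staging_name='my_stag', data=df, step_size=2,
                  connector_id=connector_id, print_stats=True)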

Note: it is not possible to create a mapping using pyCarol; the mapping has to be done via the UI.

Logging

To log messages to Carol:

from pycarol import Carol, CarolHandler
import logging

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
carol = CarolHandler(Carol())
carol.setLevel(logging.INFO)
logger.addHandler(carol)

logger.debug('This is a debug message') #This will not be logged in Carol. Level is set to INFO
logger.info('This is an info message')
logger.warning('This is a warning message')
logger.error('This is an error message')
logger.critical('This is a critical message')

These methods will use the current long task ID provided by Carol when running your application. For local environments you need to set it manually at the beginning of your code:

import os
os.environ['LONGTASKID'] = task_id

We recommend logging only INFO-level and above to Carol. If no task ID is set, the handler works as a console handler.

Settings

We can use pyCarol to access the settings of your Carol App.

from pycarol.apps import Apps
app = Apps(carol)
settings = app.get_settings(app_name='my_app')
print(settings)

The settings are returned as a dictionary where the keys are the parameter names and the values are the values set for each parameter. Please note that your app must already be created in Carol.
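For example, reading a single parameter from the returned dictionary (the setting name below is hypothetical; use a key defined in your own app):

# 'score_threshold' is a hypothetical setting name defined in the Carol app.
threshold = settings.get('score_threshold', 0.5)
print(f"Running with threshold={threshold}")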

Useful Functions

  1. track_tasks: track a list of tasks.

from pycarol import Carol
from pycarol.functions import track_tasks

carol = Carol()

def callback(task_list):
    print(task_list)

track_tasks(carol=carol, task_list=['task_id_1', 'task_id_2'], callback=callback)

Release process

  1. Open a PR with your change against the master branch;
  2. Once approved, merge into master;
  3. If there are any changes to the default release notes, please update them.

pycarol's People

Contributors

amalucelli, bvolpato, caarlos0, darolt, felipebmendes, ffjacob, gabrielgonzaga, guilhermespadaccia, gzamboni, helloitu, juvenalduarte, luizm, machseven, pedrobuzzi, rafarui, rafonseca, tdonato, wmsouza


pycarol's Issues

Option to use pycarol storage on google cloud

We are going to take some time to move everything to GCP. Until then, I think we could have an option in pycarol storage to use either the AWS or the GCP cloud. This would be very useful to keep the luigi targets in the same cloud/region when we start running our notebooks and pipelines on GCP.

To avoid API changes, we could do this setting through an environment variable. If this env variable is present, we use GCP; otherwise we keep using AWS. This would not affect any project.
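A minimal sketch of the proposed switch (the environment variable name below is hypothetical, not an existing pycarol setting):

import os

# Hypothetical variable: if set to GCP, store luigi targets in Google Cloud Storage,
# otherwise keep the current AWS/S3 behaviour.
backend = os.environ.get('PYCAROL_STORAGE_BACKEND', 'AWS').upper()
use_gcp = backend == 'GCP'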

@wmsouza, @amalucelli, @rafarui, what do you think?

Carol Init

Carol init is like this

 login = Carol(os.environ['CAROLTENANT'],
               APP_NAME,
               auth=ApiKeyAuth(os.environ['CAROLAPPOAUTH']),
               connector_id=os.environ['CAROLCONNECTORID'])

This is wrong. It's not a matter of taste. It should be like this:

 login = Carol(os.environ['CAROLTENANT'],
               APP_NAME,
               auth=os.environ['CAROLAPPOAUTH'],
               connector_id=os.environ['CAROLCONNECTORID'])

If possible, we should drop APP_NAME

 login = Carol(os.environ['CAROLTENANT'],
               auth=os.environ['CAROLAPPOAUTH'],
               connector_id=os.environ['CAROLCONNECTORID'])

Better still, it would be like this:

 login = Carol(os.environ['CAROLTENANT'],
               X-Auth-Key=os.environ['CAROLAPPOAUTH'],
               X-Auth-ConnectorId=os.environ['CAROLCONNECTORID'])

easy to fix, but I can't create a branch here

Load Datamodel from S3

This function is prepared for when the bucket is in this structure:
carol-tenantid/data/golden/data_model_name/

In the meantime, it can be called like this:
df = loadDataModel('docol', 'itemmanufacturing')

Code:

import boto3
import s3fs
import os
import pandas

def loadDataModel(tenantId, dataModel):
    """
    Load Specific DataModel from S3 Bucket to a Pandas Dataframe

    It will use the following structure:
        bucket-name: 'carolina-data'
        path: <tenantId>/data/golden/<dataModel>
    """

    bucket_name = 'carolina-data'
    s3 = s3fs.S3FileSystem(anon=False) 

    df_full = None

    s3_resource = boto3.resource('s3')
    bucket = s3_resource.Bucket(bucket_name)
    dataModelPath = os.path.join(tenantId, 'data/golden', dataModel)

    print(dataModelPath)

    for key in bucket.objects.all():
        if (key.key.startswith(dataModelPath)):
            file = os.path.join(bucket_name, key.key)

            with s3.open(file, 'rb') as f:
                df = pandas.read_json(f, lines=True)
                if df_full is None:
                    df_full = df
                else:
                    df_full = pandas.concat([df_full, df], ignore_index=True)
    return df_full

Make luigi extension compatible with `temporary_path`

temporary_path is not working; it seems that when the task is complete, it rewrites the file with None as output.

Here is an example:

    def easy_run(self, inputs):

        train, test, eval_train, user_index_test = inputs[0]
        space = [Integer(5, 50, name='epochs'),                           # epochs
                 Real(0.05, 0.1, 'log-uniform', name='learning_rate'),    # learning_rate
                 Integer(15, 100, name='no_components'),                  # no_components
                 Real(10**-8, 10**-5, 'log-uniform', name='user_alpha'),  # user_alpha
                 Real(10**-8, 10**-5, 'log-uniform', name='item_alpha'),  # item_alpha
                 Integer(5, 15, name='max_sampled'),
                 ]

        with self.output().temporary_path() as self.temp_output_path:
            checkpoint_saver = skopt_stopper.CheckpointSaver(self.temp_output_path)
            delta_earlystop = DeltaYStopper(DELTA, N_BEST)
            res_fm = gp_minimize(objective, space, n_calls=N_CALLS,
                                 random_state=SEED_SKOPT,
                                 verbose=True, callback=[checkpoint_saver, delta_earlystop])

Create retry policies

Today we don't have any tool to handle connection problems that could be solved with some kind of retry policy.

import json

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def retry_session(retries, session=None, backoff_factor=0.3, status_forcelist=(500, 502, 503, 504)):
    session = session or requests.Session()
    retry = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session

session = retry_session(retries=5)
session.post(url=endpoint, data=json.dumps(x), headers=headers)  # endpoint, x and headers defined elsewhere

Use connector name for all API calls instead of ID.

We should always use the connector name and let pycarol get the ID from the name, since the connector name is immutable between Carol apps:

conn = Connectors(login).get_by_name(connector_name)
connector_id = conn['mdmId']

Remove old unused branches

Ideally, we should always remove branches that are related to previous versions, like 2.3-test.
Besides, we should be careful when naming branches after versions: if we create a branch named 2.6, it will not be possible to make a release named 2.6.
In these cases, I use 2.6dev.

Less than 3 namespace error

Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.6/site-packages/tornado/web.py", line 1511, in _execute
    result = yield result
  File "/opt/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/anaconda3/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/anaconda3/lib/python3.6/site-packages/bokeh/server/views/autoload_js_handler.py", line 26, in get
    session = yield self.get_session()
  File "/opt/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/anaconda3/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/anaconda3/lib/python3.6/site-packages/bokeh/server/views/session_handler.py", line 43, in get_session
    session = yield self.application_context.create_session_if_needed(session_id, self.request)
  File "/opt/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/anaconda3/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "/opt/anaconda3/lib/python3.6/site-packages/bokeh/server/contexts.py", line 185, in create_session_if_needed
    self._application.initialize_document(doc)
  File "/opt/anaconda3/lib/python3.6/site-packages/bokeh/application/application.py", line 179, in initialize_document
    h.modify_document(doc)
  File "/opt/anaconda3/lib/python3.6/site-packages/bokeh/application/handlers/function.py", line 134, in modify_document
    self._func(doc)
  File "/home/rafael/pyCarol/pycarol/luigi_extension/taskviewer.py", line 142, in plot_task
    cm = self.get_colormapper()
  File "/home/rafael/pyCarol/pycarol/luigi_extension/taskviewer.py", line 129, in get_colormapper
    return CategoricalColorMapper(factors=factors, palette=Category10[nb_factors])
KeyError: 1
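The error comes from bokeh's Category10 palette dictionary, which only has entries for 3 to 10 categories, so indexing it with a single factor raises KeyError: 1. A possible fix (a sketch, not the current taskviewer code) is to clamp the palette size:

from bokeh.models import CategoricalColorMapper
from bokeh.palettes import Category10

factors = ['TaskA']        # example: a single task category
nb_factors = len(factors)

# Category10 only has keys 3..10, so clamp before indexing and slice to the
# actual number of factors.
palette = Category10[max(3, min(nb_factors, 10))][:nb_factors]
cm = CategoricalColorMapper(factors=factors, palette=palette)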

Online Apps: Validate if module exists

If the online app tries to import a nonexistent module, we don't do any validation on top of it, creating a false sense that the app is working when it isn't.

For example, if we point it at a file that doesn't exist in the filesystem, the app starts (Flask starts and the healthchecks run) but the exposed APIs don't work.

def _dynamic_import(self):
    try:
        self.imported_module = import_module(f'{self.file_path}{self.module_name}')
    except Exception as e:
        self._log_append(f'Problem when importing file. Module: {self.module_name}. Error: {str(e)}')
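One possible sketch of such a validation (not the current implementation): resolve the module first and fail fast if it cannot be found.

from importlib import import_module
from importlib.util import find_spec

def _dynamic_import(self):
    module_path = f'{self.file_path}{self.module_name}'
    # Fail fast instead of starting Flask with a broken app.
    if find_spec(module_path) is None:
        raise ImportError(f'Module not found: {module_path}')
    self.imported_module = import_module(module_path)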

auth Class access

Please merge all of the auth files into a single file.
The current access is extremely confusing.

Luigi/Taskviewer: Persist stdout of tasks on S3

When a target is persisted on S3, we have access to the result (the target), but we don't have any info about the execution that generated it.
Note that this is different from logging, because we only want to save the information associated with the respective target.
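A rough sketch of the idea (hypothetical helper, not an existing pycarol API): capture the task's stdout while it runs and persist it next to the target.

import contextlib
import io

def run_and_capture_stdout(task_fn, *args, **kwargs):
    """Run task_fn and return (result, captured stdout)."""
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        result = task_fn(*args, **kwargs)
    return result, buffer.getvalue()

# The captured text could then be written alongside the target,
# e.g. to '<target_path>.stdout' on S3.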

CustomTarget for very specific load/dump methods in Luigi Extension

I am facing a problem where I need to save a model, but it does not support the pickle process. The library that implements this model already provides a method to save the file, but it has some logic in it. One way to benefit from the luigi_extension's features for target load/dump would be to create a new Target with the provided logic. That would work, but I think it would become messy over time, as more specific load/dump patterns appear in the future. Another way would be to not use a luigi_extension Target, but then I lose its benefits.

The proposal is to create a CustomTarget. Since the LocalTarget already receives the Task related to the Target, the user would define, in the Task, the load and dump methods for this CustomTarget (named, say, target_load and target_dump). That way, there is a generic way of defining targets directly in (and related to) the task's processes.

E.g.

class MyTask(Task):
    TARGET = CustomLocalTarget

    def easy_run(self, inputs):
        ...
        return model

    def target_load(self):
        ...  # logic for the loading process, very specific for a few use cases

    def target_dump(self, data):
        ...  # logic for the dump process, very specific for a few use cases

Can I implement that or do you think there is an easier more reliable path? @rafonseca @rafarui

Error handling using decorators

We should create a decorator to handle the REST response, avoiding having to repeat

if not self.lastResponse.ok:
    # error handler for token
    if self.lastResponse.reason == 'Unauthorized':
        self.token_object.refreshToken()
        self.headers = {'Authorization': self.token_object.access_token, 'Content-Type': 'application/json'}
        continue

all the time.
This decorator should have a persist parameter (how many times to retry), maybe a delay between consecutive requests after an error, and an option to call different functions for a given error code/message.
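A possible shape for such a decorator (a sketch, not an existing pycarol helper; it assumes it wraps a request method on the client object):

import time
from functools import wraps

def handle_response(persist=3, delay=1.0, handlers=None):
    """Retry the wrapped request up to `persist` times; `handlers` maps an HTTP
    status code to a function called on the client before retrying."""
    handlers = handlers or {}

    def decorator(func):
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            response = None
            for attempt in range(persist):
                response = func(self, *args, **kwargs)
                if response.ok:
                    return response
                handler = handlers.get(response.status_code)
                if handler is not None:
                    handler(self)  # e.g. refresh the token on 401
                time.sleep(delay * (attempt + 1))  # back off between retries
            return response
        return wrapper
    return decorator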

Add DM consumer function

The example seems to be wrong:

Map<String, Map<String, Long>>

a map of id to a map of id/long. I think you have to confirm the counter that you read, not only the id, because the record might have changed. Right:

Map<String, Long> goldenRecordIdWithCounter

@rafael.rui you'll have to get the counter for the record (mdmCounterForEntity) and send it like:

{
   "aaa": 150,
   "bcd": 151
}

...

New parameter for sendData function

It would be good if, instead of having to delete a data model from Carol with code before sending another one, we could just set a parameter to True when we want to delete it.
Ex.:

from pycarol.staging import Staging
staging = Staging(carol)
staging.sendData(staging_name = 'my_stag', data = json_ex, step_size = 2,
               connector_id=connectorId, print_stats = True, delete_dm = True)

Create schema without sending.

The following method of Staging should be split in two:

def create_schema(self, fields_dict, staging_name,connector_id=None, mdm_flexible='false',
                     crosswalk_name=None, crosswalk_list=None, overwrite=False):
        assert fields_dict is not None



        if isinstance(fields_dict, list):
            fields_dict = fields_dict[0]

        if isinstance(fields_dict, dict):
            schema = carolSchemaGenerator(fields_dict)
            schema = schema.to_dict(mdmStagingType=staging_name, mdmFlexible=mdm_flexible,
                                    crosswalkname=crosswalk_name,crosswalkList=crosswalk_list)
        elif isinstance(fields_dict, str):
            schema = carolSchemaGenerator.from_json(fields_dict)
            schema = schema.to_dict(mdmStagingType=staging_name, mdmFlexible=mdm_flexible,
                                    crosswalkname=crosswalk_name, crosswalkList=crosswalk_list)
        else:
            print('Behavior for type %s not defined!' % type(fields_dict))

        query_string = {"connectorId": connector_id}
        if connector_id is None:
            connector_id = self.carol.connector_id
            query_string = {"connectorId": connector_id}

        has_schema = self.get_schema(staging_name,connector_id=connector_id) is not None
        if has_schema:
            method = 'PUT'
        else:
            method = 'POST'

        resp = self.carol.call_api('v2/staging/tables/{}/schema'.format(staging_name), data=schema, method=method,
                                   params=query_string)
        if resp.get('mdmId'):
            print('Schema sent successfully!')
        else:
            print('Failed to send schema: ' + resp)
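A sketch of how the split could look (hypothetical method names, reusing the same helpers as the method above):

def build_schema(self, fields_dict, staging_name, mdm_flexible='false',
                 crosswalk_name=None, crosswalk_list=None):
    """Only generate the schema dict, without calling the API."""
    if isinstance(fields_dict, list):
        fields_dict = fields_dict[0]
    schema = carolSchemaGenerator(fields_dict)
    return schema.to_dict(mdmStagingType=staging_name, mdmFlexible=mdm_flexible,
                          crosswalkname=crosswalk_name, crosswalkList=crosswalk_list)

def send_schema(self, schema, staging_name, connector_id=None):
    """Only send a previously built schema to Carol."""
    connector_id = connector_id or self.carol.connector_id
    method = 'PUT' if self.get_schema(staging_name, connector_id=connector_id) else 'POST'
    return self.carol.call_api(f'v2/staging/tables/{staging_name}/schema', data=schema,
                               method=method, params={"connectorId": connector_id})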

Create on luigi-extension a ForceableTask

Create on luigi-extension a ForceableTask capable of running a task even if its targets already exist. Like this:

import os

import luigi


class ForceableTask(luigi.Task):

    force = luigi.BoolParameter(significant=False, default=False)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # To force execution, we just remove all outputs before `complete()` is called
        if self.force is True:
            outputs = luigi.task.flatten(self.output())
            for out in outputs:
                if out.exists():
                    os.remove(out.path)

Send Data to staging is not working

There are two issues:

  • when passing a crosswalk list, we are still fetching the crosswalk from the tenant
  • when fetching the crosswalk, we are getting a list of lists, and the pandas duplicated check inside the assert breaks


Remove luigi build

Today we have the following line, which starts a luigi build every time we import luigi_extension:

luigi.build([], workers=1, local_scheduler=True)

As I understand it, it was intended to solve the problem of duplicated logging. If that is the case, I believe it is possible to solve this through the logging configuration file, e.g.:

[loggers]
keys=root,luigi,app,pycarol

[handlers]
keys=consoleHandler

[formatters]
keys=simpleFormatter

[logger_root]
level=INFO
handlers=consoleHandler

[logger_luigi]
level=DEBUG
handlers=consoleHandler
qualname=luigi-interface
propagate=0

[logger_pycarol]
level=DEBUG
handlers=consoleHandler
qualname=pycarol
propagate=0

[logger_app]
level=DEBUG
handlers=consoleHandler
qualname=app
propagate=0

[handler_consoleHandler]
class=StreamHandler
level=DEBUG
formatter=simpleFormatter
args=(sys.stdout,)

[formatter_simpleFormatter]
format=%(name)s: %(levelname)s: %(message)s

If this solves it for everybody, we can remove the call now. If the problem persists, I think we should take the time to investigate why this is happening.

Create builder for filters / named queries

We should be able to write filters like on the backend:

FilterQueryObject query = new FilterQueryObject.Builder()
            .must(new FilterType(QueryFilterType.TYPE_FILTER, TypeConstants.VALUE, entityType))
            .should(new FilterType(QueryFilterType.WILDCARD_FILTER, Skill.NLP_NAME, "*" + name + "*"))
            .should(new FilterType(QueryFilterType.WILDCARD_FILTER, Skill.NLP_ANSWER_MODEL + "." + SkillAnswerModel.NLP_VOICE_MESSAGE, "*" + name + "*"))
            .should(new FilterType(QueryFilterType.WILDCARD_FILTER, Skill.NLP_EXAMPLE_QUESTION, "*" + name + "*"))
                .build();

SearchResults<Skill> results = searcher(tenantId)
            .pageSize(pageSize)
            .offset(offset)
            .type(entityType).query(query).all().search(SearchHitEntityTransformers.SKILL);

Online Apps: Base URL

The base path for the implemented APIs is hard to hit, and since every online app has its own endpoint (DNS and URL), we can rely on the default path /, so the user can implement their APIs this way:

Today: https://<tenant>-<app>-<version>-<process>.online.carol.ai/<tenant>/<app>/<version>/<process>/api/entities

Improved: https://<tenant>-<app>-<version>-<process>.online.carol.ai/api/entities

base_url = f'{self.domain}/{self.app_name}/{self.app_version}/{self.online_name}'

@flask.route('/', methods=['GET', 'POST'])
def base():
    return f'Running! Use http:// .../{base_url}/api/(endpoint)'

@flask.route(f'/{base_url}/api/<prediction_path>', methods=['GET', 'POST'])

Online Apps: Easier Pattern for Module Injection File

Modules that are stored in sub-folders today have to be referenced as ai-script.run_me.py so the lib can properly import them.

I know this is the default way Python handles module paths, but it would be nice to add some kind of self.module_name.replace("/", ".") in the function, so that in the JSON the user can refer to ai-script/run_me.py instead of ai-script.run_me.py.

self.imported_module = import_module(f'{self.file_path}{self.module_name}')
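A small sketch of the suggested conversion (illustrative only):

# Convert a path-style reference into a dotted module name,
# e.g. 'ai-script/run_me.py' -> 'ai-script.run_me'
module_ref = "ai-script/run_me.py"
if module_ref.endswith(".py"):
    module_ref = module_ref[:-3]
module_name = module_ref.replace("/", ".")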

Automatically detect when a task is modified.

We can have an option in the pipeline that reuses only the targets for which the task was not changed.
For this purpose, we can use the "inspect" module to recursively inspect all the members used by that task. For all these members we can generate a hash from their bytecodes and include this hash as a luigi parameter.

This parameter should be exclusive to each task. Then, we can choose whether to propagate it using "inherit_list". If we propagate it, a modification in one task forces the run of all depending tasks (like a remove_upstream), which is quite nice. In that case, we just need to implement a clean-up procedure so we don't overuse the storage.
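A minimal sketch of the hashing idea (illustrative, not the final design):

import hashlib
import inspect

def member_hash(member):
    """Hash a member's bytecode (functions) or source (classes, etc.)."""
    if inspect.isfunction(member):
        payload = member.__code__.co_code
    else:
        payload = inspect.getsource(member).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()

# The resulting hash could be exposed as a luigi parameter of the task so that
# a code change invalidates the cached target.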

Task Visualization

Many times when debugging, or even to generate reports, we get the results from a task and create some visualizations to interpret them. The idea is to have this implemented inside the task, so that it is easy to reuse the same visualization. Something like:

task = TaskA(**params)
task.visualize.plot_churn_vs_clv()
task.visualize.plot_churn_classification()
...

To implement that, I defined a main class for the visualization, with some common methods and configuration (e.g. the calling task, whether the output goes to a Jupyter notebook or elsewhere, plt axes, plot_histogram, etc.), and specific classes for the different kinds of visualization that a task might have. E.g.:

from pycarol.luigi_extension import Visualization

class ModelChurnVisualization(Visualization):
    def plot_churn_vs_clv(self, param_a=None, param_b=None):
        ...

In the Task:

class ClassifyChurn(Task):
    ...
    visualize = ModelChurnVisualization

To implement it like I mentioned, we must have it added to the Target:

class PyCarolTarget(luigi.Target):
    def __init__(self, task, *args, **kwargs):
        ...
        self.visualize = task.visualize

I was doing it like that, but it is still under development. If you think of a better way to implement this, we should discuss it before actually adding it to pycarol, but I really believe it will help a lot when our apps are in production, and it is a nice way to reuse the work of creating visualizations.
