totvslabs / pycarol
Python projects for Carol
We should always use the connector name and let pycarol resolve the ID from the name, since the connector name is immutable between Carol apps:
conn = Connectors(login).get_by_name(connector_name)
connector_id = conn['mdmId']
It would be good if, instead of deleting a data model from Carol with code before sending another one, we could just set a parameter to True when we want the deletion.
Ex.:
from pycarol.staging import Staging
staging = Staging(carol)
staging.sendData(staging_name = 'my_stag', data = json_ex, step_size = 2,
connector_id=connectorId, print_stats = True, delete_dm = True)
We can have an option in the pipeline that reuses only the targets of tasks that have not changed.
For this purpose, we can use the module "inspect" to recursively inspect all the members used in a task. For all these members we can generate a hash from their bytecodes and include this hash as a luigi parameter.
This parameter should be exclusive to each task. Then we can choose whether or not to propagate it using "inherit_list". If we propagate it, a modification in one task would force the run of all dependent tasks (like a remove_upstream), which is quite nice. In this case, we just need to implement a clean-up procedure to not overuse the storage.
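A minimal sketch of the hashing, assuming we only hash the functions defined on the task class itself (the recursive walk over everything the task uses is left out here):
import hashlib
import inspect

def task_code_hash(task_cls):
    # Hash the bytecode of every function defined on the task class; the
    # digest changes whenever any of those functions is modified.
    h = hashlib.sha256()
    for _, member in inspect.getmembers(task_cls, inspect.isfunction):
        h.update(member.__code__.co_code)
    return h.hexdigest()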
Today we don't have any tool to handle connection problems that could be solved with some kind of retry:
from urllib3.util.retry import Retry
import requests
from requests.adapters import HTTPAdapter
import json

def retry_session(retries, session=None, backoff_factor=0.3,
                  status_forcelist=(500, 502, 503, 504)):
    """Build a requests session that retries on connection errors and 5xx responses."""
    session = session or requests.Session()
    retry = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session

session = retry_session(retries=5)
session.post(url=endpoint, data=json.dumps(x), headers=headers)
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.6/site-packages/tornado/web.py", line 1511, in _execute
    result = yield result
  File "/opt/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/anaconda3/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/anaconda3/lib/python3.6/site-packages/bokeh/server/views/autoload_js_handler.py", line 26, in get
    session = yield self.get_session()
  File "/opt/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/anaconda3/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/anaconda3/lib/python3.6/site-packages/bokeh/server/views/session_handler.py", line 43, in get_session
    session = yield self.application_context.create_session_if_needed(session_id, self.request)
  File "/opt/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/anaconda3/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "/opt/anaconda3/lib/python3.6/site-packages/bokeh/server/contexts.py", line 185, in create_session_if_needed
    self._application.initialize_document(doc)
  File "/opt/anaconda3/lib/python3.6/site-packages/bokeh/application/application.py", line 179, in initialize_document
    h.modify_document(doc)
  File "/opt/anaconda3/lib/python3.6/site-packages/bokeh/application/handlers/function.py", line 134, in modify_document
    self._func(doc)
  File "/home/rafael/pyCarol/pycarol/luigi_extension/taskviewer.py", line 142, in plot_task
    cm = self.get_colormapper()
  File "/home/rafael/pyCarol/pycarol/luigi_extension/taskviewer.py", line 129, in get_colormapper
    return CategoricalColorMapper(factors=factors, palette=Category10[nb_factors])
KeyError: 1
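The cause: bokeh.palettes.Category10 is a dict keyed by palette size and only defines sizes 3 through 10, so indexing it with nb_factors = 1 raises KeyError. A possible fix (a sketch, not necessarily how taskviewer should do it; factors is the variable already used in get_colormapper) is to clamp the lookup:
from bokeh.models import CategoricalColorMapper
from bokeh.palettes import Category10

# Category10 only has keys 3..10: clamp the lookup and slice the palette
# down to the number of factors actually present.
nb_factors = len(factors)
palette = Category10[max(3, min(nb_factors, 10))][:nb_factors]
cm = CategoricalColorMapper(factors=factors, palette=palette)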
A Swagger page can be implemented on the base route to expose to the user all the implemented APIs that the online app can handle.
Caps should be reserved for constants.
Moreover, we have many things named simply target; target_type or something similar is more meaningful.
What do you think @gabrielgonzaga , @rafarui ?
Many times when debugging, or even to generate reports, we get the results from a task and create some visualizations to interpret them. The idea is to implement this inside the task, in a way that makes it easy to reuse the same visualization. Something like:
task = TaskA(**params)
task.visualize.plot_churn_vs_clv()
task.visualize.plot_churn_classification()
...
To implement that, I defined a main class for the visualization, with some common methods and configuration (e.g. the calling task, whether the output is a Jupyter notebook or something else, the plt axis, plot_histogram, etc.) and specific classes for the different kinds of visualization that a task might have. E.g.:
from pycarol.luigi_extension import Visualization
class ModelChurnVisualization(Visualization):
    def plot_churn_vs_clv(self, param_a=None, param_b ...):
...
In the Task:
class ClassifyChurn(Task):
    ...
    visualize = ModelChurnVisualization
To implement it the way I mentioned, we must add it to the Target:
class PyCarolTarget(luigi.Target):
    def __init__(self, task, *args, **kwargs):
        ...
        self.visualize = task.visualize
I was doing it like that, but it is still under development. If you think of a better way to implement it, we should discuss it before actually adding it to pycarol, but I really believe it will help a lot when our apps are in production, and it is a nice way to reuse the work of creating visualizations.
Carol init is like this
login = Carol(os.environ['CAROLTENANT'],
              APP_NAME,
              auth=ApiKeyAuth(os.environ['CAROLAPPOAUTH']),
              connector_id=os.environ['CAROLCONNECTORID'])
This is wrong, and it is not a matter of taste. It should be like this:
login = Carol(os.environ['CAROLTENANT'],
              APP_NAME,
              auth=os.environ['CAROLAPPOAUTH'],
              connector_id=os.environ['CAROLCONNECTORID'])
If possible, we should drop APP_NAME
login = Carol(os.environ['CAROLTENANT'],
              auth=os.environ['CAROLAPPOAUTH'],
              connector_id=os.environ['CAROLCONNECTORID'])
Better still, it would look like this:
login = Carol(os.environ['CAROLTENANT'],
              x_auth_key=os.environ['CAROLAPPOAUTH'],             # sent as the X-Auth-Key header
              x_auth_connector_id=os.environ['CAROLCONNECTORID'])  # sent as the X-Auth-ConnectorId header
Easy to fix, but I can't create a branch here.
This way we can get the staging data:
{
  "mustList": [
    { "mdmFilterType": "TYPE_FILTER", "mdmValue": "producteanMaster" }
  ]
}
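A sketch of sending that filter from pycarol; the endpoint path and the use of login.call_api here are assumptions, not a confirmed API:
json_query = {
    "mustList": [
        {"mdmFilterType": "TYPE_FILTER", "mdmValue": "producteanMaster"}
    ]
}
# login is the Carol instance created as shown elsewhere on this page
resp = login.call_api('v2/queries/filter', method='POST', data=json_query)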
In version 2.7, we will no longer support the old style of defining targets.
old style:
class DataSet(Task):
    set_target = Task.keras_target
new style:
class DataSet(Task):
    TARGET = KerasLocalTarget
@gabrielgonzaga , @rafarui , @tdonato , @ayeright , @marielen , @GuilhermeSpadaccia
Today, when a data frame is passed, we convert the whole of it to JSON before sending. We should convert only the part being sent.
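A sketch of what per-chunk conversion could look like (df and step_size are assumed to be the frame and chunk size already passed to sendData):
for start in range(0, len(df), step_size):
    chunk = df.iloc[start:start + step_size]
    payload = chunk.to_json(orient='records')  # convert only this chunk
    # ... send payload ...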
Modules that are stored in sub-folders today have to be referenced as ai-script.run_me.py so the lib can properly import them. I know this is the default way Python handles files, but it would be nice to add something like self.module_name.replace("/", ".") in the function, so that in the JSON the user can refer to the module as ai-script/run_me.py instead of ai-script.run_me.py.
pyCarol/pycarol/app/online_api.py
Line 76 in 92259dc
We should be able to write filters like on the backend:
FilterQueryObject query = new FilterQueryObject.Builder()
        .must(new FilterType(QueryFilterType.TYPE_FILTER, TypeConstants.VALUE, entityType))
        .should(new FilterType(QueryFilterType.WILDCARD_FILTER, Skill.NLP_NAME, "*" + name + "*"))
        .should(new FilterType(QueryFilterType.WILDCARD_FILTER, Skill.NLP_ANSWER_MODEL + "." + SkillAnswerModel.NLP_VOICE_MESSAGE, "*" + name + "*"))
        .should(new FilterType(QueryFilterType.WILDCARD_FILTER, Skill.NLP_EXAMPLE_QUESTION, "*" + name + "*"))
        .build();
SearchResults<Skill> results = searcher(tenantId)
        .pageSize(pageSize)
        .offset(offset)
        .type(entityType).query(query).all().search(SearchHitEntityTransformers.SKILL);
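A sketch of a small Python builder producing the same JSON shape shown elsewhere on this page; the class, the shouldList/mdmKey fields, and the example values are illustrative assumptions, not pycarol's current API:
class FilterBuilder:
    """Illustrative builder for Carol-style filter JSON."""
    def __init__(self):
        self._query = {'mustList': [], 'shouldList': []}

    def _clause(self, filter_type, value, key):
        clause = {'mdmFilterType': filter_type, 'mdmValue': value}
        if key is not None:
            clause['mdmKey'] = key
        return clause

    def must(self, filter_type, value, key=None):
        self._query['mustList'].append(self._clause(filter_type, value, key))
        return self

    def should(self, filter_type, value, key=None):
        self._query['shouldList'].append(self._clause(filter_type, value, key))
        return self

    def build(self):
        return self._query

# Example usage, mirroring the Java snippet above (names are placeholders):
query = (FilterBuilder()
         .must('TYPE_FILTER', 'skillMaster')
         .should('WILDCARD_FILTER', '*name*', key='mdmGoldenFieldAndValues.nlpName')
         .build())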
Return the query results after calling .newQuery(), .namedQuery(), etc.
Currently we build the taskviewer graph from only one task. We should allow building it from a list of tasks, like luigi.build.
Ideally, we should always remove branches that are related to previous versions, like 2.3-test.
Besides, we should take care when naming branches after versions: if we create a branch named 2.6, it will not be possible to make a release named 2.6.
In these cases, I use 2.6dev.
This function is prepared for when the bucket has this structure:
carol-tenantid/data/golden/data_model_name/
In the meantime, it can be called like this:
df = loadDataModel('docol', 'itemmanufacturing')
Code:
import boto3
import s3fs
import os
import pandas

def loadDataModel(tenantId, dataModel):
    """
    Load a specific data model from the S3 bucket into a pandas DataFrame.
    It will use the following structure:
    bucket-name: 'carolina-data'
    path: <tenantId>/data/golden/<dataModel>
    """
    bucket_name = 'carolina-data'
    s3 = s3fs.S3FileSystem(anon=False)
    df_full = None
    s3_resource = boto3.resource('s3')
    bucket = s3_resource.Bucket(bucket_name)
    dataModelPath = os.path.join(tenantId, 'data/golden', dataModel)
    print(dataModelPath)
    for key in bucket.objects.all():
        if key.key.startswith(dataModelPath):
            file = os.path.join(bucket_name, key.key)
            with s3.open(file, 'rb') as f:
                df = pandas.read_json(f, lines=True)
            if df_full is None:
                df_full = df
            else:
                df_full = pandas.concat([df_full, df], ignore_index=True)
    return df_full
https://github.com/google/yapf/
It is a Python formatter. If we always use it, we will have a unified style, and reviewing PRs will be much easier.
This method returns a dict containing only the parameters related to that task.
If the online app tries to import a nonexistent module, we don't do any validation on top of it, creating a false sense that the app is working when it isn't.
For example, if we tell it to import a file that doesn't exist in the filesystem, the app starts (Flask starts and the healthchecks pass) but the exposed APIs don't work. See the sketch after the code reference below.
pyCarol/pycarol/app/online_api.py
Lines 74 to 78 in bb562d7
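A sketch of a fail-fast check at startup, assuming the module names come from the app's JSON config:
import importlib.util

def validate_modules(module_names):
    # Fail at startup instead of letting Flask come up with broken endpoints.
    missing = [m for m in module_names if importlib.util.find_spec(m) is None]
    if missing:
        raise ImportError('Online-app modules not found: %s' % ', '.join(missing))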
Remove test-related stuff, or move it to the dev branch, until it is implemented.
Need to see why no error is shown until all async tasks are done
Today Date is sent as a string.
Can we close the old luigi-extension repo?
@rafarui , @gabrielgonzaga , @tdonato , @GuilhermeSpadaccia , @ayeright , @marielen , @wmsouza
Today we have the following line, which starts a luigi build every time we import luigi_extension:
luigi.build([], workers=1, local_scheduler=True)
As I understand it, this was intended to solve the problem of duplicated logging. If that is the case, I believe it is possible to solve it through the logging configuration file. E.g.:
[loggers]
keys=root,luigi,app,pycarol
[handlers]
keys=consoleHandler
[formatters]
keys=simpleFormatter
[logger_root]
level=INFO
handlers=consoleHandler
[logger_luigi]
level=DEBUG
handlers=consoleHandler
qualname=luigi-interface
propagate=0
[logger_pycarol]
level=DEBUG
handlers=consoleHandler
qualname=pycarol
propagate=0
[logger_app]
level=DEBUG
handlers=consoleHandler
qualname=app
propagate=0
[handler_consoleHandler]
class=StreamHandler
level=DEBUG
formatter=simpleFormatter
args=(sys.stdout,)
[formatter_simpleFormatter]
format=%(name)s: %(levelname)s: %(message)s
If this solves it for everybody, we can remove the line now. If the problem persists, I think we should take the time to investigate why this is happening.
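For reference, loading that config (assuming it is saved as logging.conf) is a one-time call:
import logging.config

logging.config.fileConfig('logging.conf', disable_existing_loggers=False)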
This default logger is already implemented in carol-ds-education. We should make it available for all projects.
The base paths for the implemented APIs are hard to hit, and since every online app has its own endpoint (DNS and URL), we can rely on the default path /, so users can implement their APIs this way:
Today: https://<tenant>-<app>-<version>-<process>.online.carol.ai/<tenant>/<app>/<version>/<process>/api/entities
Improved: https://<tenant>-<app>-<version>-<process>.online.carol.ai/api/entities
pyCarol/pycarol/app/online_api.py
Lines 99 to 105 in bb562d7
Today a ... is presented to the user, when the Host header of the HTTP connection could be used to provide the final URL.
In Flask, something like the one provided here can be used to solve this (see the sketch after the code reference below):
pyCarol/pycarol/app/online_api.py
Line 103 in bb562d7
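A sketch of the Flask side, deriving the base URL from the incoming Host header (the route here is illustrative only):
from flask import Flask, request

app = Flask(__name__)

@app.route('/api/info')
def info():
    # request.host carries the Host header, so no hard-coded base URL is needed
    return {'base_url': '%s://%s' % (request.scheme, request.host)}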
The following method of Staging should be split in two:
def create_schema(self, fields_dict, staging_name, connector_id=None, mdm_flexible='false',
                  crosswalk_name=None, crosswalk_list=None, overwrite=False):
    assert fields_dict is not None
    if isinstance(fields_dict, list):
        fields_dict = fields_dict[0]
    if isinstance(fields_dict, dict):
        schema = carolSchemaGenerator(fields_dict)
        schema = schema.to_dict(mdmStagingType=staging_name, mdmFlexible=mdm_flexible,
                                crosswalkname=crosswalk_name, crosswalkList=crosswalk_list)
    elif isinstance(fields_dict, str):
        schema = carolSchemaGenerator.from_json(fields_dict)
        schema = schema.to_dict(mdmStagingType=staging_name, mdmFlexible=mdm_flexible,
                                crosswalkname=crosswalk_name, crosswalkList=crosswalk_list)
    else:
        print('Behavior for type %s not defined!' % type(fields_dict))
    query_string = {"connectorId": connector_id}
    if connector_id is None:
        connector_id = self.carol.connector_id
        query_string = {"connectorId": connector_id}
    has_schema = self.get_schema(staging_name, connector_id=connector_id) is not None
    if has_schema:
        method = 'PUT'
    else:
        method = 'POST'
    resp = self.carol.call_api('v2/staging/tables/{}/schema'.format(staging_name),
                               data=schema, method=method, params=query_string)
    if resp.get('mdmId'):
        print('Schema sent successfully!')
    else:
        print('Failed to send schema: ' + resp)
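A sketch of the proposed split, reusing the names from the method above; build_schema and send_schema are suggested names only, not an existing API:
def build_schema(self, fields_dict, staging_name, mdm_flexible='false',
                 crosswalk_name=None, crosswalk_list=None):
    # Pure schema construction, no HTTP involved.
    if isinstance(fields_dict, list):
        fields_dict = fields_dict[0]
    if isinstance(fields_dict, str):
        schema = carolSchemaGenerator.from_json(fields_dict)
    elif isinstance(fields_dict, dict):
        schema = carolSchemaGenerator(fields_dict)
    else:
        raise TypeError('Behavior for type %s not defined!' % type(fields_dict))
    return schema.to_dict(mdmStagingType=staging_name, mdmFlexible=mdm_flexible,
                          crosswalkname=crosswalk_name, crosswalkList=crosswalk_list)

def send_schema(self, schema, staging_name, connector_id=None):
    # POST a new schema, or PUT over an existing one.
    connector_id = connector_id or self.carol.connector_id
    method = 'PUT' if self.get_schema(staging_name, connector_id=connector_id) else 'POST'
    return self.carol.call_api('v2/staging/tables/{}/schema'.format(staging_name),
                               data=schema, method=method,
                               params={'connectorId': connector_id})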
Please merge all of the auth files into one single file. The current access is extremely confusing.
Sphinx, pydoc, Doxygen, ...
I have never used any of them. Any opinions?
We should create a decorator to handle the REST response, avoiding having to repeat
if not self.lastResponse.ok:
    # error handler for token
    if self.lastResponse.reason == 'Unauthorized':
        self.token_object.refreshToken()
        self.headers = {'Authorization': self.token_object.access_token, 'Content-Type': 'application/json'}
        continue
all the time.
This decorator must have a persist parameter (how many times to retry), maybe some delay between consecutive requests after an error, and an option to call different functions for a given error code/message.
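A sketch of such a decorator; the name, signature, and retry policy here are assumptions:
import functools
import time

def handle_response(persist=3, delay=0.5):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            resp = None
            for _ in range(persist):
                resp = func(self, *args, **kwargs)
                if resp.ok:
                    return resp
                if resp.reason == 'Unauthorized':
                    # refresh the token and retry with fresh headers
                    self.token_object.refreshToken()
                    self.headers = {'Authorization': self.token_object.access_token,
                                    'Content-Type': 'application/json'}
                time.sleep(delay)
            return resp
        return wrapper
    return decorator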
I am facing a problem where I need to save a model that does not support pickling. The library that implements this model already provides a method to save the file, but it has some logic in it. One way to benefit from luigi_extension's target load/dump features would be to create a new Target with the provided logic. That would work, but I think it would become messy with time, as more specific load/dump patterns will appear in the future. The other option would be to not use a luigi_extension Target at all, but then I lose its benefits.
The proposal is to create a CustomTarget. Since the LocalTarget already receives the Task related to the Target, the user would define in the Task the load and dump methods for this CustomTarget, named something like target_load and target_dump. That way, there is a generic way of defining targets tied directly to the task's processes.
E.g.
class MyTask(Task):
    TARGET = CustomLocalTarget

    def easy_run(self, inputs):
        ...
        return model

    def target_load(self):
        ...  # logic for the loading process, very specific for a few use cases

    def target_dump(self, data):
        ...  # logic for the dump process, very specific for a few use cases
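A sketch of how CustomLocalTarget could delegate to those methods, assuming the target keeps a reference to its task like PyCarolTarget above:
class CustomLocalTarget(PyCarolTarget):
    def __init__(self, task, *args, **kwargs):
        super().__init__(task, *args, **kwargs)
        self.task = task

    def load(self):
        return self.task.target_load()

    def dump(self, data):
        self.task.target_dump(data)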
Can I implement that, or do you think there is an easier, more reliable path? @rafonseca @rafarui
pycarol is not saving the results in the "resolve" field, since it only keeps mdmGoldenFieldAndValues (at least when the mapping is 1-1). Saving it would be useful for debugging new mdm features locally.
When a target is persisted on S3, we have access to the result (the target itself), but we don't have any info about the execution that generated it.
Note that this is different from logging, because we only want to save the information associated with the respective target.
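A sketch of the idea: persist a small metadata file next to each target (the field choice is an assumption, and a real version would write through the storage API rather than locally):
import datetime
import getpass
import json
import platform

def save_target_metadata(target_path, task):
    # Only the info tied to this specific target/execution, not a full log.
    meta = {
        'task': task.get_task_family(),
        'params': task.to_str_params(),
        'finished_at': datetime.datetime.utcnow().isoformat(),
        'user': getpass.getuser(),
        'host': platform.node(),
    }
    with open(target_path + '.meta.json', 'w') as f:
        json.dump(meta, f, indent=2)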
temporary_path is not working; it seems that when the task is complete, it rewrites the file with None as output.
Here is an example:
def easy_run(self, inputs):
    train, test, eval_train, user_index_test = inputs[0]
    space = [Integer(5, 50, name='epochs'),                          # epochs
             Real(0.05, 0.1, 'log-uniform', name='learning_rate'),   # learning_rate
             Integer(15, 100, name='no_components'),                 # no_components
             Real(10**-8, 10**-5, 'log-uniform', name='user_alpha'), # user_alpha
             Real(10**-8, 10**-5, 'log-uniform', name='item_alpha'), # item_alpha
             Integer(5, 15, name='max_sampled'),
             ]
    with self.output().temporary_path() as self.temp_output_path:
        checkpoint_saver = skopt_stopper.CheckpointSaver(self.temp_output_path)
        delta_earlystop = DeltaYStopper(DELTA, N_BEST)
        res_fm = gp_minimize(objective, space, n_calls=N_CALLS,
                             random_state=SEED_SKOPT,
                             verbose=True, callback=[checkpoint_saver, delta_earlystop])
We are going to take some time to move everything to GCP. Until then, I think we could have an option in pycarol storage to use either the AWS or the GCP cloud. This would be very useful to keep the luigi targets in the same cloud/region when we start to run our notebooks and pipelines on GCP.
To avoid API changes, we could do this setting through an env variable: if the variable is present, we use GCP; otherwise we keep using AWS. This would not affect any project. A sketch follows the mentions below.
@wmsouza , @amalucelli , @rafarui . What do you think?
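A minimal sketch of the env-var switch; the variable name and the two backend classes are placeholders, not existing pycarol names:
import os

class AWSStorage:
    """Placeholder for the current AWS-backed storage."""

class GCPStorage:
    """Placeholder for a GCP-backed storage."""

def get_storage_backend():
    # Default stays AWS, so no existing project is affected.
    if os.environ.get('PYCAROL_STORAGE', 'AWS').upper() == 'GCP':
        return GCPStorage()
    return AWSStorage()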
Give the option to automatically create the schema, if it does not exist, when sending data.
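A sketch of the idea, reusing the methods quoted elsewhere on this page (staging, staging_name, data, and connector_id are assumed from the caller's context):
if staging.get_schema(staging_name, connector_id=connector_id) is None:
    # infer the schema from the first record before the first send
    staging.create_schema(data[0], staging_name, connector_id=connector_id)
staging.sendData(staging_name=staging_name, data=data, connector_id=connector_id)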
Create in luigi-extension a ForceableTask capable of running a task even if its targets already exist. Like this:
import os

import luigi

class ForceableTask(luigi.Task):
    force = luigi.BoolParameter(significant=False, default=False)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # To force execution, we just remove all outputs before complete() is called
        if self.force is True:
            outputs = luigi.task.flatten(self.output())
            for out in outputs:
                if out.exists():
                    os.remove(out.path)  # remove each output, not just the first
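A forced run would then look like this (MyTask is a placeholder for any ForceableTask subclass):
luigi.build([MyTask(force=True)], workers=1, local_scheduler=True)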
In order to have a nice UX in the taskviewer, this method should be faster. Usually we check all targets of a given pipeline to update the viewer; this takes more than 30 seconds and blocks the process. We can surely improve it.
Line 116 in 48c45a6
We are not properly handling when a token expires.
Today we have a parameter to get aggregations. I think we should use the fields to sort that out.
The example seems to be wrong:
Map<String, Map<String, Long>>
is a map of id to a map of id/long. I think you have to confirm the counter that you read, not only the id, because the record might have changed.
Right:
Map<String, Long> goldenRecordIdWithCounter
@rafael.rui you'll have to get the counter for the record (mdmCounterForEntity) and send it like:
{
  "aaa": 150,
  "bcd": 151
}
...
The clone-tenant methods should tell the user when they are done.