
scheduler's Introduction

scheduler's People

Contributors

fils, jmckenna, valentinedwv, ylyangtw

scheduler's Issues

Stack rework to use multiple usercode gRPC code sources

I think each set of 'workflow' code needs its own code repository, so that things do not get lost when
'user' code is updated.

https://docs.dagster.io/deployment/guides/docker#multi-container-docker-deployment

https://docs.dagster.io/concepts/code-locations/workspace-files#running-your-own-grpc-server

By default, Dagster tools automatically create a process on your local machine for each of your code locations. However, it's also possible to run your own gRPC server that's responsible for serving information about your code locations. This can be useful in more complex system architectures that deploy user code separately from the Dagster webserver.

https://github.com/dagster-io/dagster/tree/1.4.2/examples/deploy_docker

https://github.com/dagster-io/dagster/blob/1.4.2/examples/deploy_docker/Dockerfile_user_code
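Per the multi-container deployment guide above, the webserver's workspace.yaml would point at one gRPC code server per workflow repository. A minimal sketch, where the hostnames, ports, and location names are assumptions:

```yaml
# workspace.yaml sketch: one gRPC code server per 'workflow' code set.
# Hostnames/ports/location names below are placeholders, not the real stack.
load_from:
  - grpc_server:
      host: usercode_eco        # container running `dagster api grpc`
      port: 4000
      location_name: "eco_workflows"
  - grpc_server:
      host: usercode_iow
      port: 4000
      location_name: "iow_workflows"
```

Each user-code container would then run its own `dagster api grpc` process, so updating one set of workflow code does not touch the others.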

Unhardcode the Configuration paths

These need to be passed to the Docker containers:

GLEANERIO_NABU_CONFIG_PATH=/configs/nabu/nabuconfig.yaml
GLEANERIO_GLEANER_CONFIG_PATH=/configs/gleaner/gleanerconfig.yaml

then these need to be unhardcoded:

CMD = ["--cfg", "/gleaner/gleanerconfig.yaml","-source", source, "--rude"]

Thinking that when the docker.py-based branch becomes stable, we should inspect the container mounts to be sure the configs are actually mounted.
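Reading the paths from the environment could look like the sketch below; the env variable names come from this issue, while the default fallback and function name are assumptions:

```python
# Sketch: build the gleaner CMD from an environment variable instead of a
# hardcoded path. GLEANERIO_GLEANER_CONFIG_PATH is from the issue; the
# fallback default and helper name are illustrative only.
import os

def build_gleaner_cmd(source: str) -> list:
    cfg = os.environ.get("GLEANERIO_GLEANER_CONFIG_PATH",
                         "/gleaner/gleanerconfig.yaml")  # assumed default
    return ["--cfg", cfg, "-source", source, "--rude"]
```

The same pattern would apply to GLEANERIO_NABU_CONFIG_PATH for the nabu command.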

Revise pygen.py cron code

I think the logic here for trying to do a default distribution of the sources is wrong.

I want to say: given 21 days and 42 sources, a source would run every 12 hours.

around line 23-24 I do

        hours = int(days) * 24  # days is the cli param for the number of days to work over
        inc = round(hours / len(c["sources"])) # divide hours we want to run over by number of source to get increment

then around line 52 I do (where days is from the command line)

                    di = int(days)
                    q = (((i * inc) / 24) % di) // 1
                    r = (i * inc) % 24
                    new_cron_schedule = "0 {} {} * *".format(r, int(q)+1)

trying to get this to align with cron logic. This is now hacked a bit to make it work for the large number of sources in IoW, and I hope it doesn't affect ECO.

I'll try and get this resolved to something working and logical soon but comments or better solutions welcome.
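One possible cleaner formulation of the same distribution, as a single helper (a sketch, not the actual pygen.py code; the function name is made up):

```python
# Sketch: spread n_sources evenly over a window of `days` days, emitting
# one cron schedule per source at hour granularity. For 42 sources over
# 21 days this yields one source every 12 hours, as desired above.
def distribute_sources(n_sources: int, days: int) -> list:
    hours = days * 24
    inc = max(1, round(hours / n_sources))  # hours between consecutive sources
    schedules = []
    for i in range(n_sources):
        offset = (i * inc) % hours   # wrap if sources overflow the window
        day = offset // 24 + 1       # cron day-of-month is 1-based
        hour = offset % 24
        schedules.append("0 {} {} * *".format(hour, day))
    return schedules
```

Note the day-of-month field tops out at the window size, so windows longer than 28 days would still need care around short months.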

Containers: how to more dynamically add gleaner/nabu configs to the executing containers

Thought:

Put configs in a volume

  • what would happen if we defined a service in the docker stack that mounted a volume and copied the configs into it?

put in a volume at say: /configs

  • gleaner to /configs/gleaner/gleanerconfig.yaml
  • nabu to /configs/nabu/nabuconfig.yaml
  • schemahttp to /configs/assets/schemaorg-current-http.jsonld
  • schemahttps to /configs/assets/schemaorg-current-https.jsonld

oops, the schema files are too big (limit 1024000 bytes) to be Docker configs... so they would need to be compiled into the image.

gleaner, glcon, and nabu images would need to copy the schema assets to the container.

put in a volume at say: /configs

  • gleaner to /configs/gleanerconfig.yaml
  • nabu to /configs/nabuconfig.yaml
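The volume idea above could be sketched in compose as an init service that copies the configs into a shared volume the other containers mount. All service, image, and host-path names here are assumptions:

```yaml
# Compose sketch (assumed names/paths): populate a shared /configs volume.
volumes:
  gleaner_configs:

services:
  config_loader:
    image: busybox
    volumes:
      - gleaner_configs:/configs
      - ./gleanerconfig.yaml:/src/gleanerconfig.yaml:ro
      - ./nabuconfig.yaml:/src/nabuconfig.yaml:ro
    command: >
      sh -c "cp /src/gleanerconfig.yaml /configs/gleanerconfig.yaml &&
             cp /src/nabuconfig.yaml /configs/nabuconfig.yaml"
```

gleaner and nabu services would then mount `gleaner_configs` at `/configs`.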

Add a dagster_local_stop.sh script

add a dagster_local_stop.sh script ;) we probably don't even need the .env file:
docker compose -p dagster -f compose_local.yaml down

but we might want to add logic for a remove option:

docker compose -p dagster -f compose_local.yaml rm

SSL Error. Opentopo

We cannot directly fix this, for now.

The two report steps for opentopography fail with something related to reading the sitemap.

A couple of URLs to look at:
https://arista.my.site.com/AristaCommunity/s/article/Python-3-10-and-SSLV3-ALERT-HANDSHAKE-FAILURE-error
pandas-dev/pandas#47189

dagster._core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "harvest_opentopography.opentopography_missingreport_s3":

  File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/plan/execute_plan.py", line 273, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/plan/execute_step.py", line 369, in core_dagster_event_sequence_for_step
    for user_event in check.generator(
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/plan/execute_step.py", line 90, in _step_output_error_checked_user_event_sequence
    for user_event in user_event_sequence:
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/plan/compute.py", line 192, in execute_core_compute
    for step_output in _yield_compute_results(step_context, inputs, compute_fn):
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/plan/compute.py", line 161, in _yield_compute_results
    for event in iterate_with_context(
  File "/usr/local/lib/python3.10/site-packages/dagster/_utils/__init__.py", line 443, in iterate_with_context
    with context_fn():
  File "/usr/local/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/plan/utils.py", line 84, in op_execution_error_boundary
    raise error_cls(

The above exception was caused by the following exception:
urllib.error.URLError: <urlopen error [SSL: SSLV3_ALERT_HANDSHAKE_FAILURE] sslv3 alert handshake failure (_ssl.c:1007)>

  File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
    yield
  File "/usr/local/lib/python3.10/site-packages/dagster/_utils/__init__.py", line 445, in iterate_with_context
    next_output = next(iterator)
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/plan/compute_generator.py", line 124, in _coerce_op_compute_fn_to_iterator
    result = invoke_compute_fn(
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/plan/compute_generator.py", line 118, in invoke_compute_fn
    return fn(context, **args_to_pass) if context_arg_provided else fn(**args_to_pass)
  File "/usr/src/app/./ops/implnet_ops_opentopography.py", line 505, in opentopography_missingreport_s3
    returned_value = missingReport(source_url, bucket, source_name, s3Minio, graphendpoint, milled=milled, summon=summon)
  File "/usr/local/lib/python3.10/site-packages/ec/reporting/report.py", line 78, in missingReport
    sitemap = ec.sitemap.Sitemap(valid_sitemap_url)
  File "/usr/local/lib/python3.10/site-packages/ec/sitemap/sitemap.py", line 53, in __init__
    self.sitemap_df = adv.sitemap_to_df(sitemapurl)
  File "/usr/local/lib/python3.10/site-packages/advertools/sitemaps.py", line 491, in sitemap_to_df
    xml_text = urlopen(Request(sitemap_url, headers=headers))
  File "/usr/local/lib/python3.10/urllib/request.py", line 216, in urlopen
    return opener.open(url, data, timeout)
  File "/usr/local/lib/python3.10/urllib/request.py", line 519, in open
    response = self._open(req, data)
  File "/usr/local/lib/python3.10/urllib/request.py", line 536, in _open
    result = self._call_chain(self.handle_open, protocol, protocol +
  File "/usr/local/lib/python3.10/urllib/request.py", line 496, in _call_chain
    result = func(*args)
  File "/usr/local/lib/python3.10/urllib/request.py", line 1391, in https_open
    return self.do_open(http.client.HTTPSConnection, req,
  File "/usr/local/lib/python3.10/urllib/request.py", line 1351, in do_open
    raise URLError(err)

The above exception occurred during handling of the following exception:
ssl.SSLError: [SSL: SSLV3_ALERT_HANDSHAKE_FAILURE] sslv3 alert handshake failure (_ssl.c:1007)

  File "/usr/local/lib/python3.10/urllib/request.py", line 1348, in do_open
    h.request(req.get_method(), req.selector, req.data, headers,
  File "/usr/local/lib/python3.10/http/client.py", line 1283, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/usr/local/lib/python3.10/http/client.py", line 1329, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/usr/local/lib/python3.10/http/client.py", line 1278, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/usr/local/lib/python3.10/http/client.py", line 1038, in _send_output
    self.send(msg)
  File "/usr/local/lib/python3.10/http/client.py", line 976, in send
    self.connect()
  File "/usr/local/lib/python3.10/http/client.py", line 1455, in connect
    self.sock = self._context.wrap_socket(self.sock,
  File "/usr/local/lib/python3.10/ssl.py", line 513, in wrap_socket
    return self.sslsocket_class._create(
  File "/usr/local/lib/python3.10/ssl.py", line 1071, in _create
    self.do_handshake()
  File "/usr/local/lib/python3.10/ssl.py", line 1342, in do_handshake
    self._sslobj.do_handshake()
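Based on the first URL above, one possible workaround (untested against opentopography, and it does weaken TLS security for the request) is to lower OpenSSL's security level so legacy cipher suites are allowed:

```python
# Sketch of a possible workaround: Python 3.10's default SSL context
# rejects legacy cipher suites, which some older servers still require,
# producing SSLV3_ALERT_HANDSHAKE_FAILURE. SECLEVEL=1 re-enables them.
import ssl
import urllib.request

def legacy_ssl_context() -> ssl.SSLContext:
    ctx = ssl.create_default_context()
    ctx.set_ciphers("DEFAULT@SECLEVEL=1")  # permit legacy cipher suites
    return ctx

# usage sketch:
# urllib.request.urlopen(sitemap_url, context=legacy_ssl_context())
```

Since the urlopen happens inside advertools/ec, applying this may require patching upstream or monkey-patching the default context.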

Release file check Types

Earthcube Utilities is coded to use the https://schema.org namespace to generate the summarize and graph stats reports.

Against the release file, we need to call the all_count_types query and check the results,

or perhaps check against some verification list.

fragment from report

        {
            "report": "type_count",
            "processing_time": 30.65489888191223,
            "length": 50,
            "data": [
                {
                    "type": "http://schema.org/PropertyValue",
                    "scount": "2775"
                },
                {
                    "type": "https://schema.org/PropertyValue",
                    "scount": "1720"
                },
                {
                    "type": "http://schema.org/DataDownload",
                    "scount": "834"
                },
                {
                    "type": "http://schema.org/Person",
                    "scount": "750"
                },
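Since the fragment shows the same type split across http:// and https:// schema.org namespaces, a check would likely want to normalize them before comparing against a verification list. A sketch (field names follow the report fragment; the helper name is made up):

```python
# Sketch: fold http/https schema.org type URIs together and sum their
# counts, so a type_count report can be checked against an expected list.
from collections import Counter

def aggregate_types(report_data):
    counts = Counter()
    for row in report_data:
        # treat http://schema.org and https://schema.org as the same type
        t = row["type"].replace("https://schema.org/", "http://schema.org/")
        counts[t] += int(row["scount"])
    return counts
```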

Docker Compose Networking and added headless

Added a dagster_host network, so just the dagster containers talk to each other.
Added headless to the dagster stack to see if it can be configured to be accessible to this stack.

networks:
  traefik_proxy:
    external: true
  dagster_host:
    driver: overlay
    name: dagster-${PROJECT}
    attachable: true

services:
  headless:
    image: chromedp/headless-shell:latest
#    build:
#      context: .
#      shm_size: "2gb"
    restart: unless-stopped
    shm_size: "2gb"
    labels:
      - "traefik.backend=headlesschrome"
      - "traefik.port=9222"
    ports:
      - 9222:9222
    environment:
      - SERVICE_PORTS=9222
    networks:
      - dagster_host

Trap non-zero gleaner/nabu container exits

Gleaner returns 0 for things it should not... fixed in the dev_ec containers.

We then ignore the non-zero exit code, because we want the logs.

So when return code is not zero, raise an exception to stop the workflow.
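The above amounts to: capture the logs first, then enforce the exit code. A minimal sketch (the helper name and exception type are illustrative; in Dagster this would likely raise `dagster.Failure` instead):

```python
# Sketch: fail the workflow on a non-zero container exit code, but only
# after the logs have already been captured and uploaded.
def check_exit(exit_code: int, logs: str) -> str:
    # logs are assumed persisted by this point; now enforce the code
    if exit_code != 0:
        raise RuntimeError(
            "container exited with code {}; logs were captured".format(exit_code))
    return logs
```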

materialize reports as dagster s3 backed 'assets'

Presently, we use EC_utilities to put reports to S3.

For Dagster, it would be better to declare them as S3-backed assets, so they would be exposed in the Dagster interface, and we might do some other analyses and tests on them, e.g. to see whether there are pages without JSON-LD, or some summon network issue.

capture more logs, and put into directory

There are now separate runstats and repo_{name}_{loaded|issue}.log files.

Should these be uploaded?
Also, should we capture them to directories?

Option 1: just put everything for each source in its own source directory, all in one place

  • source/

Option 2:

  • run (gleaner/nabu) or run/source
  • runstats (place for just the stats, and also the ec utilities stats)
  • loaded/source (place for the repo_{loaded|issue} files)

Can the latest runstat be an artifact... that way we would not need to dig too far after a run
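The Option 2 layout could be captured in one small key-building helper (a sketch; the directory names mirror the bullets above, the function name is made up):

```python
# Sketch: build S3 object keys for the Option 2 directory layout.
def log_key(kind: str, source: str, filename: str) -> str:
    layouts = {
        "run": "run/{source}/{filename}",        # gleaner/nabu run logs
        "runstats": "runstats/{filename}",       # stats + ec utilities stats
        "loaded": "loaded/{source}/{filename}",  # repo_{loaded|issue} files
    }
    return layouts[kind].format(source=source, filename=filename)
```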

Add authentication to dagster

According to this issue, dagster doesn't have this feature yet. But we can secure dagster on traefik: link

Do we need this? It's nice to not have to log in... worst case scenario is that someone triggers a run.

Multiple deployments

This will require refactoring of the code to be less dependent on env variables, and more dependent on
dagster configs.

Multiple deployments of the same PROJECT to the same portainer:
we need to separate the PROJECT from the deployment. Right now, PROJECT is part of the traefik labels, and PROJECT drives the code generation.
Let's change to DAG_PROJECT.

Add DAG_DEPLOY variable so that we can deploy multiple dagsters on same portainer for same project.

Use that in the traefik labels, and maybe in container names.

  • "traefik.http.routers.sched-${DAG_DEPLOY:-deploy}.entrypoints=http"
  • the NETWORK and VOLUME shared with gleaner/nabu will need to be named for each 'stack'
  • It will need its own network, so that it does not get confused (aka same containers on same network with same name...). Using the project name is a bit tricky from dagster code, and the network needs to be external for a container to use it:

        dagster_host:
          driver: overlay
          name: dagster-${DAG_DEPLOY:-deploy}  # was: name: dagster_host
          attachable: true

  • volumes: I think the postgres local volume is ok; it's defined as part of the stack, so each stack is separate. The shared config volume:

        dagster-project:
          external: true
          name: dagster-${DAG_DEPLOY:-deploy}
          #name: dagster_gleaner_configs

change postRelease to use SPARQL UPDATE

Think this will make release loading generic across triplestores.

POST https://graph.geocodes-aws-dev.earthcube.org/blazegraph/namespace/test/sparql

x-www-form-urlencoded

update=LOAD <https://oss.geocodes-aws.earthcube.org/gleaner/graphs/latest/summonediris_2023-03-13-15-54-37_release.nq>

Not sure if the response will be the same.

log from postman, below

POST https://graph.geocodes-aws-dev.earthcube.org/blazegraph/namespace/test/sparql
200
394 ms
POST /blazegraph/namespace/test/sparql HTTP/1.1
User-Agent: PostmanRuntime/7.32.3
Accept: */*
Cache-Control: no-cache
Postman-Token: 797467e0-7902-4c4a-8f90-58563c374e7e
Host: graph.geocodes-aws-dev.earthcube.org
Accept-Encoding: gzip, deflate, br
Connection: keep-alive
Content-Type: application/x-www-form-urlencoded
Content-Length: 138
 
update=LOAD%20%3Chttps%3A%2F%2Foss.geocodes-aws.earthcube.org%2Fgleaner%2Fgraphs%2Flatest%2Fsummonediris_2023-03-13-15-54-37_release.nq%3E
 
HTTP/1.1 200 OK
Access-Control-Allow-Credentials: true
Access-Control-Allow-Headers: Authorization,Origin,Content-Type,Accept
Access-Control-Allow-Origin: *
Content-Length: 440
Content-Type: text/html;charset=utf-8
Server: Jetty(9.4.z-SNAPSHOT)
Vary: Origin
X-Frame-Options: SAMEORIGIN
Date: Thu, 10 Aug 2023 00:06:51 GMT
 
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"><html><head><meta http-equiv="Content-Type" content="text&#47;html;charset=UTF-8"><title>blazegraph&trade; by SYSTAP</title
></head
><body<p>totalElapsed=66ms, elapsed=65ms, connFlush=0ms, batchResolve=0, whereClause=0ms, deleteClause=0ms, insertClause=0ms</p
><hr><p>COMMIT: totalElapsed=153ms, commitTime=1691626574764, mutationCount=972</p
></html
>

the mutation count is what we want to look at
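The Postman exchange above could be reproduced in code roughly as follows. The endpoint and release URL are the examples from this issue; the `mutationCount` regex is an assumption about Blazegraph's HTML response format based on the log above:

```python
# Sketch: POST a SPARQL UPDATE `LOAD <release.nq>` to the endpoint and
# extract mutationCount from the HTML response.
import re
import urllib.parse
import urllib.request

def load_release(endpoint: str, release_url: str):
    body = urllib.parse.urlencode(
        {"update": "LOAD <{}>".format(release_url)}).encode()
    req = urllib.request.Request(
        endpoint, data=body,
        headers={"Content-Type": "application/x-www-form-urlencoded"})
    with urllib.request.urlopen(req) as resp:
        return parse_mutation_count(resp.read().decode())

def parse_mutation_count(html: str):
    # e.g. "COMMIT: totalElapsed=153ms, ..., mutationCount=972"
    m = re.search(r"mutationCount=(\d+)", html)
    return int(m.group(1)) if m else None
```

Other triplestores won't return Blazegraph's HTML, so the parsing step would need to stay store-specific even if the LOAD itself is generic.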

Testing/refactoring

For testing, it looks like the @graph needs to be moved out of the file with all the ops.

Something like this works with @graph removed:

from implnet_ops_geocodes_demo_datasets import geocodes_demo_datasets_gleaner
def test_geocodes_demo_datasets_gleaner():
    res = geocodes_demo_datasets_gleaner()
    assert res.success
    assert res.output_for_node("find_highest_protein_cereal") == "Special K"

Maybe we can use the same set of @ops with parameters/context passed.

Gleaner to Nabu improve workflows

Need to better trap gleaner errors. If it does not run, stop run.

Ideas:

  • (done) use a gRPC code server (dagster api...) to hold code so that the dagster daemon gets restarted less often
  • (no longer needed: logs are uploaded every 600 seconds) use sensors to monitor long-running jobs by name. Output an asset with name and container id
  • tag outputs as assets, and have separate end-of-load reports that run late at night if something was updated.
  • (done) have graph reports run over the release file, and not the graph.

separate summon from nabu dataloading

  • run sources once
  • create sensors that are community based and will add datasets to a community graph using release files

Need to set up a sensor so that when gleaner is run, it stores a container id; then a nabu sensor would wait until the run completes, then run the nabu steps.... Then if the nabu steps complete, the graph gets loaded, etc.

Right now if scheduler is updated and a gleaner/nabu is running, scheduler will never know that it was supposed to be watching that container.

409 conflict on delete

Containers are getting left around on delete, which is an issue.

urllib.error.HTTPError: HTTP Error 409: Conflict
caused by:

TypeError: HTTPConnection.getresponse() got an unexpected keyword argument 'buffering'
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 457, in _make_request
    httplib_response = conn.getresponse(buffering=True)

Changing to use the docker.py library.

Scheduler, CHECKs to add

Stop a run if:

  • post gleaner: no files in summoned s3 path.
  • move the release step to the front of the nabu steps; if there is no release, we don't need to run the other nabu steps
  • post release:
    • ideally, nabu release would return 1 if no file was created
    • if no release file exists, do not push to the graph (check whether a file is at the s3 url) or run reports. This might just be its own step.
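The checks above reduce to a small gating decision that the ops could consult. A sketch with made-up names, keeping the gating logic separate from the S3/graph calls:

```python
# Sketch: pure decision logic for the run gates described above.
def should_continue(summoned_count: int, release_exists: bool) -> dict:
    return {
        "run_nabu": summoned_count > 0,   # post gleaner: files in summoned s3 path?
        "push_to_graph": release_exists,  # post release: file at the s3 url?
        "run_reports": release_exists,
    }
```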

Add bucketutil cull

This was cull, but there are some cases where there may be more than one JSONLD from a page (aka org datacatalog and dataset).

Need to provide a report that there are duplicates, and let us decide if that is ok.

  • (option 1) just create one that culls dupes older than the reharvest period
  • (option 2) just cull files older than the reharvest period
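Option 2 is the simpler of the two; a sketch of its core selection (names and the tuple shape are assumptions; the duplicate report for option 1 would need grouping by page as well):

```python
# Sketch (option 2): split bucket objects into keep/cull by age against
# the reharvest period. `objects` is a list of (key, last_modified) pairs.
from datetime import datetime, timedelta

def cull_candidates(objects, reharvest_days: int, now: datetime):
    cutoff = now - timedelta(days=reharvest_days)
    keep, cull = [], []
    for key, modified in objects:
        (keep if modified >= cutoff else cull).append(key)
    return keep, cull
```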

Production Deployments

Stabilize a branch for use as 'production'

  • create production branch, add to workflow... then configure dagster stack.
  • Slow changing of generated code

wiki page

Docker/Portainer API start issue

   # {"message": "starting container with non-empty request body was deprecated since API v1.22 and removed in v1.24"}
  
    url = URL + 'containers/' + cid + '/start'
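Per the error message, the start endpoint must be POSTed with an empty body. A sketch of building that request with urllib (the base URL shape follows the snippet above):

```python
# Sketch: the Docker API removed non-empty request bodies on container
# start in API v1.24, so POST /containers/{id}/start with an empty body.
import urllib.request

def start_container_request(base_url: str, cid: str) -> urllib.request.Request:
    url = base_url + "containers/" + cid + "/start"
    # data=b"" sends Content-Length: 0 with no JSON payload
    return urllib.request.Request(url, data=b"", method="POST")
```

The docker.py (docker SDK) library handles this correctly on its own, which is another argument for the switch described in the 409-conflict issue.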

dagster: local deployment

Make a 'local' aka developers deployment

  • create a Dockerfile_local and customize the local docker compose so that the generated code is mounted, and therefore developers will not have to rebuild the containers every time the code is updated

Add Headless Container to each Gleaner run.

Headless is a one-'summoner'-at-a-time service. If more than one running gleaner accesses it, it can produce bad data.

So we need to create a single headless container Service with each run, or at least a container, that we start up and tear down since we don't need any configurations.

Needs to

  • have a unique name that can be referred to as http://headless_RUN:9222
  • a mount of /dev/shm to increase the size of the shm mount.
  • this is just needed for when a gleaner is running.
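The per-run container could be derived from the run id like this (a sketch; the naming scheme follows the http://headless_RUN:9222 convention above, the docker flags are assumptions):

```python
# Sketch: unique per-run headless container name, URL, and run arguments.
def headless_container(run_id: str):
    name = "headless_{}".format(run_id)
    args = ["docker", "run", "-d", "--rm",
            "--name", name,
            "--shm-size", "2g",  # larger /dev/shm for headless chrome
            "chromedp/headless-shell:latest"]
    url = "http://{}:9222".format(name)
    return args, url
```

Teardown would be a matching `docker stop` keyed on the same name once the gleaner run finishes.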

Study assets

Iedata, iris, designsafe, cchdo, and magic green ran, but they did nothing after gleaner. Need to think about assets, so that when the output does not change, downstream runs stop.

And maybe study error logs/failure hooks

DEBUG being ignored

Needs to get passed to the Dagster containers in compose_xxx.yaml.

fixed in dv_docker

Reports. Weekly task, create source and overall time history

This should be done in [dagster/implnets/workflows/tasks]
use dagster dev for development.

Read the missing reports for a repository, create a csv or json of the load information

repo, date, sitemap count, jsonld in s3 count, milled count, graph count
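Each history record could be emitted as one CSV row with the columns above (a sketch; the helper name is made up, and the counts would come from reading the missing reports):

```python
# Sketch: serialize one load-history record as a CSV row with the columns
# repo, date, sitemap count, jsonld in s3 count, milled count, graph count.
import csv
import io

def history_row(repo, date, sitemap, s3_jsonld, milled, graph) -> str:
    buf = io.StringIO()
    csv.writer(buf).writerow([repo, date, sitemap, s3_jsonld, milled, graph])
    return buf.getvalue().strip()
```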

remove msg parameter. prevents starting workflow in middle of run.

If we want to start with prune after a long gleaner run that got lost due to the server restarting, then having to pass a message causes a failure:

dagster._core.errors.DagsterExecutionLoadInputError: Error occurred while loading input "msg" of step "harvest_iedadata.iedadata_nabu_prune":

Naming convention for sources

Source names with symbols break the scheduler; we need a proper naming convention.

add data validation in the spreadsheet
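Alongside the spreadsheet validation, the generator could reject bad names up front. A sketch, where the allowed pattern (lowercase alphanumerics and underscores) is an assumption about what the generated Dagster op names tolerate:

```python
# Sketch: validate source names before code generation. The permitted
# character set here is an assumption, not an established convention.
import re

VALID_SOURCE = re.compile(r"^[a-z0-9_]+$")

def valid_source_name(name: str) -> bool:
    return bool(VALID_SOURCE.match(name))
```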

Push release graph to graphstore

branch off dev_eco

The releases created need to be pushed to Blazegraph, and an 'op' needs to be added

Suggested steps:

  • use postman to figure out how to push a 'release' file to Blazegraph.
  • add method to earthcube_utilites/graph/manageGraph... there may already be a method in the code
  • code operation in schedule template, preferably using the manageGraph so that we maintain one codebase.

Do we need a script in earthcube utilities to do this from the command line?
