
linkeddataeventstreams's Introduction

The Linked Data Event Streams specification

A Linked Data Event Stream is a collection of immutable objects (such as version objects, sensor observations or archived representations). Each object is described in RDF.

The objective of a Linked Data Event Stream is to allow consumers to replicate all of its items and to stay in sync when items are added.

The HTML specification can be accessed from here.

This specification uses TREE for its collection and fragmentation features, which in turn is compatible with other specifications such as Activity Streams, DCAT-AP, LDP or Shape Trees.

If you are new to the concept of a Linked Data Event Stream or to Linked Data, this short training introduces the main concepts.

Build the spec

Install bikeshed and then run bikeshed watch eventstreams.bs

Contributions

  • Create an issue first
  • Gather community input, either by asking the editors via email, via the GitHub issue, or via the Matrix chat channel
  • Open a pull request extending the specification

Changelog

  • 2023-04-28 - Point in Time retention policy has been added
  • 2022-07-01 - tree:ViewDescription was introduced as a concept

Acknowledgements

This work is financed by the Interoperable Europe (SEMIC) programme of the European Commission

linkeddataeventstreams's People

Contributors

barthelemyf, ddvlanck, kasperzutterman, phochste, pietercolpaert, sdevalk, wlefever-cegeka, woutslabbinck


linkeddataeventstreams's Issues

Support streaming parsing of fragments (profile)

While the most common read pattern for clients will be to read at the end of the log, from time to time new clients will show up that want to sync over the full history of the stream.
As I explained in #40, the issue with open fragments is that the maximum size of a fragment has a quadratic impact on the necessary bandwidth and processing in the client. This would argue for creating smaller fragments; however, this also has downsides, as smaller fragments mean more requests to the server.
This is why it would be ideal to rewrite historical fragments (say, older than a day and immutable) into larger fragments.
Fetching bigger files (especially if the HTTP headers indicate relations, so this can add concurrency) is much more efficient than fetching many smaller ones (connection setup, higher compression rate, ...), but has a drawback in the current form: no tree:Node streaming parser exists, essentially requiring the entire graph (of one page) to be parsed in memory.
When compacting historical fragments into these larger graphs, this could be an issue.
This is why I would suggest a default way of structuring the data in a page, such that a stream-aware parser can stream-parse the document and emit members as they are processed.
This would significantly reduce the memory requirements in the case of large fragments.

The layout of a page (say, using the Turtle serialization, as it offers the best compression) could look something like this (a sketch follows the list):

  • First the stream membership statements, required to find the tree members
  • Then the members, one by one, ordered first by object id, then timestamp path
    ( this allows for member skipping if the client is interested in latest state only, reducing the number of upserts on the database the stream is projected in.)
  • Last the relation pointing to the next page
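
A minimal Turtle sketch of such a page layout (the member, observation and page URIs are purely illustrative):

# 1. Stream membership statements first, so a streaming parser knows which subjects are members.
<C1> a ldes:EventStream ;
     tree:member <Obs1-v1>, <Obs1-v2>, <Obs2-v1> .

# 2. The members, grouped per member, ordered first by object id, then by timestamp path.
<Obs1-v1> a sosa:Observation ;
     sosa:resultTime "2021-01-01T00:00:00Z"^^xsd:dateTime .
<Obs1-v2> a sosa:Observation ;
     sosa:resultTime "2021-01-02T00:00:00Z"^^xsd:dateTime .
<Obs2-v1> a sosa:Observation ;
     sosa:resultTime "2021-01-01T12:00:00Z"^^xsd:dateTime .

# 3. The relation pointing to the next page comes last.
<?page=1> tree:relation [
     a tree:Relation ;
     tree:node <?page=2>
] .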

As all member triples are 'grouped', the parser can read one member at a time.

As the document would be a normal RDF file, and the only semantics added are there to support the streaming behavior, this should be completely backwards compatible for clients that don't support streaming tree parsing.
The capability could be indicated by a statement on the view.

Hypermedia URIs should be relative URIs

When an LDES server outputs hypermedia URIs that will be dereferenced by the client, it would be best (RFC 2119 SHOULD?) to make those relative URIs.

This will allow for transparent proxy mappings without the need for the proxy to rewrite any content.
As relative URIs are interpreted relative to the URL of the document they occur in, this effectively resolves any potential proxy issues.
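
A minimal sketch of what this could look like on a page (the query-string paging is illustrative):

# Relative URIs resolve against the URL of the current document,
# so they keep working unchanged behind a reverse proxy.
<> tree:relation [
    a tree:Relation ;
    tree:node <?page=2>
] .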

Add and specify support for named graphs

There is currently a proposal to support named graph serializations like TriG and JSON-LD.

The proposal would be to allow something like this:

ex:C1 a ldes:EventStream ;
      tree:member ex:Graph1, ex:Graph2 .

ex:Graph1 prov:generatedAtTime "2022-12-25T12:00" ;
          foaf:primaryTopic ex:Streetname1 .

ex:Graph1 {
   ex:Streetname1 rdfs:label "Station Street" .
}

ex:Graph2 prov:generatedAtTime "2023-12-25T12:00" ;
          foaf:primaryTopic ex:Streetname1 .

ex:Graph2 {
   ex:Streetname1 rdfs:label "Station Square" .
}

Create a set of real world examples

It would be easier for readers to have access to a set of examples of how to use LDES. Also we might need to add additional comments to the examples in the spec for clarity.

Consider using prov:specializationOf in the examples

Although the ldes:versionKey predicate allows defining which properties to use as a version key, there's an implicit recommendation to use dct:isVersionOf because it's the only one mentioned in the examples.

The PROV-O ontology contains another useful predicate that could be used to link the version object to the main entity: specializationOf. Given that PROV-O is a W3C Recommendation, and that it might get used to describe projections as well, it may be interesting to consider this one instead of dct:isVersionOf.
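
A sketch of a version object linked to its main entity with prov:specializationOf instead of dct:isVersionOf (the ex: URIs are illustrative):

ex:AddressRecord1-v1 a ex:AddressRecord ;
    # links the version object to the entity it specializes
    prov:specializationOf ex:AddressRecord1 ;
    prov:generatedAtTime "2021-01-01T00:00:00Z"^^xsd:dateTime .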

Shape Trees specification link is broken

https://shapetrees.github.io/specification/spec possibly moved to https://shapetrees.org/TR/specification/

This specification uses the [TREE specification](https://treecg.github.io/specification) for its collection and fragmentation (or pagination) features, which in its turn is compatible to other specifications such as [[!activitystreams-core]], [[!VOCAB-DCAT-2]], [[!LDP]] or [Shape Trees](https://shapetrees.github.io/specification/spec). For the specific compatibility rules, read the [TREE specification](https://treecg.github.io/specification).

A note is cut

One of the notes currently ends "You can indicate that the new ldes:EventStream is derived from".

Some ideas about streams...

First of all, I'm really new to the spec, so I'm sure I'll make some wrong assumptions here...
But I have learned that the easiest way to learn something on the internet is to make a statement, and someone will try to prove you wrong.

So here it goes:
I am under the impression that the design of the event stream was focused on being bulk-loadable into a triplestore as-is, in such a way that the data can be queried without further processing; the state is present in the stream itself.
Although I think this is a really interesting feature, its biggest disadvantage is that the semantics of the event metadata and the entity data get mixed.
I'll try to argue here for an alternative approach, where the event stream is seen more as a transaction log.

So this is how I understood the spec; what I thought to be the requirements behind the definition of the spec:

  • Events are appended to the end of the stream
  • Events are not business events, but the representation of the full state of an entity.
    (I assume this because of the design where the stream is queryable in and by itself, more like a growing graph.)
  • Entities can be updated
  • Old versions that no longer contribute to the state of the system can be cleaned up, if the stream decides to do so.

So now for what I see to be the disadvantages to the current LDES approach:

  • The event data and metadata are mixed. Because of this, the stream semantics might need to be updated when the data model of the producer evolves.
    There is a risk of semantic collision between carrier and message: if, for instance, a DCAT Dataset were carried over a stream, the use of isVersionOf would conflict. This is worked around by the ldes:timestampPath and ldes:versionOfPath, but it leaves room for errors.
  • No order on the events can lead to inconsistency in the materialised state derived from the stream. Using time for ordering in a distributed system will lead to errors at some point.
  • Deletion of objects is implicit, through removing members from the stream.
  • Lack of transaction semantics. Since the state of objects is carried over, it might be that multiple objects need to be changed at once in order to remain adherent to the defined shapes.
    (This might not be relevant if the stream consists of sensor data, where each entity is an instance of the same class, but if the source is a business application the need will arise eventually.)
  • Due to the approach to versioning, the graph of all events can't be used directly; a subgraph needs to be derived through the indirection of the tree:member property of the event stream instance.
    This seems to imply that for each update two pages need to be fetched: the first containing the stream definition and the last one with the latest event.
    In case the stream grows to terabytes of data, the reprocessing of the (huge) EventStream instance could become an issue.
  • Another issue I see with the versioning is the mix between application versioning (an entity received updates through time, and I want to see previous versions of it), and infrastructure concerns (I want to be able to do point-in-time restores if the application update messes up the database).

So, after this, I would like to propose another event representation that tries to address these issues (cf. Example 1 of the spec):

ex:C1 a ldes:EventStream ;
    tree:shape ex:shape1.shacl .

[
    a ldes:Event ; # Could be implied if pages are dedicated?
    ldes:sequence "1"^^xsd:integer ;
    ldes:commitTime "2021-01-01T00:00:00Z"^^xsd:dateTime ; # Used as metadata in case a point-in-time recovery of the stream needs to be performed, and the recovery point needs to be determined.

    ldes:key ex:Observation1 ; # The subject of the object represented.
    ldes:value [ # value is optional; if not present this is a tombstone event (delete).
        a sosa:Observation ;
        sosa:resultTime "2021-01-01T00:00:00Z"^^xsd:dateTime ;
        sosa:hasSimpleResult "..."
    ]
] .

This also allows for representing the stream in a key-value ledger model as used by Apache Kafka.
The semantics for materialising this stream in a triplestore would be simple.
For each event in the stream:

  • Delete all triples in the DB with the event key as subject (ignoring nested blank nodes for a moment).
  • Take the ldes:value of the event, set the subject of its triples to the event key, and insert them into the DB.

Under this model, versioning of objects can be done at the application level using the isVersionOf approach if the versions concern the application (e.g. multiple versions are kept so users can roll back to a previous version of an entity), as well as by the log retention policy algorithm chosen at the stream level, whose only concern is being able to move back in time (for instance for point-in-time recovery) and which doesn't need to concern itself with application versioning.

A retention policy could keep the full state (the producer decides what makes up the state), as well as the transaction log for a certain time
(following the idea of how topic log compaction is done in Apache Kafka).

Additional properties could be used to signal transaction boundaries (if retention policies apply, the boundaries can only be used when reading the head of the log, where messages are sequential).
e.g. ldes:trx_id "1"
If the stream delivery method can't guarantee that events are committed to the stream representation in an atomic way, a transaction event count could be added to the first event of a transaction, to indicate the number of events that should be read as part of the transaction.
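
A rough Turtle sketch of such transaction markers; ldes:trx_id and ldes:trx_size are hypothetical properties used for illustration only:

# First event of a two-event transaction; trx_size tells the client how many
# events to read before applying the transaction atomically.
[ a ldes:Event ;
  ldes:sequence "7"^^xsd:integer ;
  ldes:trx_id "1" ;
  ldes:trx_size "2"^^xsd:integer ;
  ldes:key ex:Observation1 ;
  ldes:value [ a sosa:Observation ] ] .

# Second (and last) event of the same transaction.
[ a ldes:Event ;
  ldes:sequence "8"^^xsd:integer ;
  ldes:trx_id "1" ;
  ldes:key ex:Observation2 ;
  ldes:value [ a sosa:Observation ] ] .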

Move search form to the back

Search forms are a convenience tool, but certainly not a must or even a should. Let’s move that example to the back.

Consistent graph replication - RDF Dataset Canonicalization

When a client requires hard guarantees on consistency, the logic described in RDF Dataset Canonicalization could be used to provide hashes of the state that should be reached after applying a fragment, or even better, a transaction.
This becomes relevant in cases where LDES is used as a replication protocol for named graphs (the client should have an exact copy of the named graph the publisher intended). For instance, consistency could be lost if a client is offline for longer than allowed by the retention period, which could result in missed delete operations (tombstone events). If a checksum mismatch is detected, the client must restart replication from the start of the log to arrive at a consistent state.
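
A sketch of what announcing such a checksum could look like; ex:expectedStateHash is a hypothetical predicate and the digest value is a placeholder:

# After applying this fragment, the canonical (RDFC-1.0) hash of the client's
# replicated dataset should match the announced value.
<?page=4> a tree:Node ;
    ex:expectedStateHash "0f3a..." .  # placeholder digest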

Reference: https://www.w3.org/TR/rdf-canon/

Discovering an LDES from a subject or another tree:Collection

Discovering an LDES from 1 specific subject

Proposed solution: a new predicate ldes:eventStream to be introduced, linking a subject to one or more EventStreams that contains, or will contain, updates about this subject in the future.

?subject <https://w3id.org/ldes#eventStream> <target> .

This would imply that <target> a ldes:EventStream . and dereferencing the target results in an RDF resource describing an LDES. This LDES may contain members that link to the ?subject.

Discovering from an existing tree:Collection

When the same data collection has updates, a collection may also have precisely one event stream for updates about its members. We propose a ldes:versionStream predicate for this.

<collection> <https://w3id.org/ldes#versionStream> [
           dcat:accessURL <target> ;
           ldes:versionKey (dcterms:isVersionOf)
] .

The predicate ldes:versionKey on top of the VersionStreamDescription will explain how its members can be mapped to specific versions.

Add versioning metadata to the Collection

Publishing a dataset often entails adding some additional statements (e.g., dct:isVersionOf), but this data then becomes indistinguishable from the original data elements. This can be an issue for operations such as version materializations, which should yield the current version of each concept as it would appear without this LDES-specific metadata.

The specification currently describes how to add a version key to a retention policy, but this cannot be used for collections without a retention policy. Furthermore, the version key only specifies which predicate is used to link the version URI to the concept URI, but there's often another predicate that is used to assign a timestamp to this version. This timestamp metadata would also be useful for other issues, such as #16.

I would propose to move the versioning metadata (the version key and timestamp predicate) to the Collection description, and possibly make it mandatory. Perhaps it can become part of the shape description.
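
A sketch of what this could look like, reusing the ldes:versionOfPath and ldes:timestampPath properties directly on the collection description (the collection URI is illustrative):

ex:C1 a ldes:EventStream ;
    # predicate linking a version URI to its concept URI
    ldes:versionOfPath dct:isVersionOf ;
    # predicate holding the timestamp of the version
    ldes:timestampPath dct:created ;
    tree:view ex:View1 .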

Standardize client iterator

While the design of some LDES-es can be really simple, just keeping a history log of all pages and members processed to keep the state will not scale to really big LDES-es.

In earlier work, we have introduced the ldes:timestampPath to point at a property that can be used to keep the state in a different way: just resume from that timestamp.

It would be good to have a standard algorithm described in the spec that explains how a client must pause and resume from a certain state.

Coined by @sandervd

Status log of LDES consumers

Relevance:

  • In the Matrix channel, this was raised by @madnificent as something that we need in the spec
  • In DCAT-AP Feeds, this is left as an issue to this spec

Two parts we need:

  1. A summary on top of a republished view or LDES of when it was last synced (cf. the older issue #5)
  2. A detailed log of why certain members were adopted by the consumer, and why certain members have been ignored

1 can be derived from 2 as well

Domain of ldes:retentionPolicy does not exist in TREE vocabulary

The ldes:retentionPolicy property is defined as:

ldes:retentionPolicy a rdf:Property ;
   rdfs:label "has retention policy"@en;
   rdfs:comment "Links to a retention policy."@en ;
   rdfs:domain tree:View ;
   rdfs:range ldes:RetentionPolicy .

But there is no class tree:View defined in the TREE vocabulary. Given that this is an LDES-specific property, shouldn't its domain be ldes:EventStream instead?
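
For reference, the change suggested above would look like this (not a normative definition):

ldes:retentionPolicy a rdf:Property ;
   rdfs:label "has retention policy"@en ;
   rdfs:comment "Links to a retention policy."@en ;
   rdfs:domain ldes:EventStream ;   # suggested domain instead of tree:View
   rdfs:range ldes:RetentionPolicy .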

Using git protocol as LDES transmission protocol

I have had this idea for quite some time: to use Git as a way of synchronizing graphs over the network.
The data structure behind git, namely blob, tree and commit objects, could mix quite elegantly with the way LDES is structured.
(an old writing I did about this: https://github.com/sandervd/linkeddata-git )
As git has internal optimisations, such as packfiles (https://git-scm.com/book/en/v2/Git-Internals-Packfiles),
transferring deltas could be made much more efficient than fetching members one by one...

Anyone interested in exploring this topic? I haven't looked into the constraints of git (the most important one would be the max size of tree objects), and at the moment I don't have a use case; I just wanted to throw the idea out for now :)

Distributed transactions in LDES

How do we indicate that we have a consistent knowledge graph across LDESes?

For instance, what if we split Linked OpenStreetMap into 3 LDESes: one for nodes, one for ways and one for relations. Then, for each LDES, multiple objects are added into one transaction. Just processing one member would not be very valuable, as you would get an inconsistent replication: the osm:Way would for example not yet have the necessary osm:Nodes to point to.

A solution would be to have a transaction system that indicates whether the transaction is still in progress, and define the bounds (probably based on a timestamp?). The set of members can then only be processed into a derived view or service from the moment the transaction is marked as completed.

Any ideas on this?

A way to circumvent transactions in LDES is to just post members at exactly the same datetime. However, we might want to introduce parts of the knowledge graph changes at different times.

Related work:

Which members should be deleted with version-based retention policies

With time-based retention policies, a tree path and a duration are given. Based on those two triples, it is possible to reason about which members should be deleted when a retention policy is present.

For version-based retention policies, however, this is not clear. The only information given in this policy is an amount x and a versionKey, which state, per collection of versions identified by the version key, that a minimum of x should be preserved.

When that amount is reached, it is impossible to reason about which members should be deleted. As random deletion is not desirable, it might be interesting to delete the oldest members of the collection.

I suggest that, just like in time-based retention policies, deterministic deletion behavior can be achieved by adding a tree path that leads to a timestamp of a member. Based on that timestamp, the oldest member of the collection can be pruned by the retention policy.
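
A hedged Turtle sketch of such a policy; the placement of the tree:path triple is illustrative, not something the spec defines today:

ex:policy1 a ldes:RetentionPolicy ;
    ldes:amount 2 ;                       # keep at least 2 versions per object
    ldes:versionKey ( dct:isVersionOf ) ; # groups versions of the same object
    tree:path dct:created .               # illustrative: timestamp used to prune the oldest versions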

Default JSON-LD context

JSON-LD can lower the barrier for adoption of Linked Data when given clear rules on how to implement the JSON frame. @phochste proposed to also have, in this repository, a standard JSON-LD context that can be used for a default frame.

ActivityStreams-based retention policies for removed entities

Some back-end systems are able to expose an event stream of last-updated items, but don't keep track of things that have been deleted. For that case, a retention policy should exist that describes the fact that the LDES conceptually does contain the as:Remove activity, but that it hasn't been included, and that it can be inferred from the fact that the earlier included as:Create is no longer part of the stream.
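
A rough sketch of the removal a consumer could infer in that case (the entity URI is illustrative; the retention policy class that would announce this behaviour is not defined yet):

@prefix as: <https://www.w3.org/ns/activitystreams#> .

# The earlier as:Create for ex:Entity1 is no longer part of the stream,
# so the consumer may infer this as:Remove even though it was never published.
[] a as:Remove ;
   as:object ex:Entity1 .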

Use cases:

Latest processed item as a marker

There is a note suggesting to keep a list of already processed items. Would it be a recommendable alternative in some situations to instead keep the latest processed item as a marker of how far the stream has been processed?

Delayed flush strategy profile for views

As long as a fragment is being written to, the client needs to parse the entire fragment again, including the members it already processed.
In the worst-case scenario, when the client polling interval is <= the member addition interval, the number of effectively transferred and parsed members is given by max_fragment_size * (max_fragment_size + 1) / 2, where max_fragment_size is the maximum number of members allowed in a fragment.
Say 5 members are allowed per page, and the client is polling faster than members are written to the fragment (and ETags are used to avoid needless processing), then the number of effectively transferred and parsed members = 1 + 2 + 3 + 4 + 5 = 15.
If the max_fragment_size is set to 250 members, 31375 members are parsed to process 250 members.
At this point the efficiency of both data transfer and parsing compute has dropped to 0.80% = (max_fragment_size / (max_fragment_size * (max_fragment_size + 1) / 2)) * 100, leaving 99.2% of resources spent on algorithmic overhead.

These 31375 members have to be transferred over the web and parsed, hence a lot of bandwidth and CPU cycles are wasted.
In order to mitigate this, I would propose to introduce some semantics in LDES so that a server can indicate to a client that it follows the 'delayed flush strategy'.
So how would this work? A server would only write out fragments that are immutable. It can do this by buffering writes for a maximum time (let's go with 10 seconds for the sake of argument). After this time (or when the max fragment size is reached), all buffered members are flushed as an immutable fragment. This results in potentially smaller fragments being written, which is fine for those reading at the end of the log.
As a client knows (from the view definition) that fragments are written out in an atomic fashion, it only needs to request each fragment once.
If no relations are found in the fragment, the client can then fall back to polling using HEAD requests, provided the relations are present in the HTTP headers.

Of course, this means that data can be delayed during the buffer window, but the trade-off would be reasonable to make.
The profile could be announced on the view with a simple statement.
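
A minimal sketch of such an announcement; ldes:flushStrategy is a hypothetical property used for illustration only:

<view1> a tree:Node ;
    # signals that fragments are only published once they are immutable,
    # so a client needs to fetch each fragment at most once
    ldes:flushStrategy "delayed-flush" .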

Status indication of a derived View

Status indication

When replicating a stream on your own system, for example when building other tree:views on top of it with different fragmentations, you MAY track the progress of how many elements from the original stream have been processed into your own system.

This should become possible using an ldes:elementsProcessed property, as follows:

<Collection> a ldes:EventStream ;
    hydra:totalItems 500 ;
    tree:view :View1 .
:View1 a tree:Node ;
    ldes:elementsProcessed 250 .

Clarify example 3

I couldn't directly understand example 3. Is it implied that there are three documents, each consisting of the triples whose subject is <C1>, <?page=1> and <?page=2> respectively?

What does ... consist of? Will all observations be individually declared as members of <C1>, and will they be referenced by the first of the documents?

Add Additional Hyperlinks to README.md

Motivate streams not ordered by publish time

Intuitively, a "stream" refers to a collection of items ordered by the time they were published in the collection. Thus, the stream grows at its end. The specification seems to consider these an important, but not the only, relevant type of "stream". (There is a note saying "A 1-dimensional fragmentation based on creation time of the immutable objects is probably going to be the most interesting and highest priority fragmentation for an LDES" but then continuing "sometimes the back-end of an LDES server cannot guarantee that objects will be published chronologically".)

Should these two separate cases be motivated, perhaps already in the introduction?

Separating metadata from LDES members

LDES members are mangled with the LDES metadata, such as versionOf, timestamp, etc., which goes somewhat against the philosophy of the OSI model, which assumes you have an envelope with metadata in which the actual member data resides.

Problem

LDES member data is mixed with their metadata. For example:

<MemberJefke#8> a foaf:Person;
  foaf:name "Jefke";
  foaf:age "45";
  dct:created "2023-01-01T00:00:00.00Z";
  dct:versionOf <MemberJefke>;
.

In the example, the member describes a foaf:Person but also mixes in the metadata about when the member was created.
This gives a weird view of this person: for example, you could assume that it was created on 2023-01-01, yet its age is 45, which is not possible. The bigger problem here is that the metadata about the member is integrated into the member instead of being kept separately and pointing to the member, for example:

<MemberJefkeMetadata#8> 
  dct:created "2023-01-01T00:00:00.00Z";
  dct:versionOf <MemberJefke>;
  dct:subject <MemberJefke#8>;
.
<MemberJefke#8> a foaf:Person;
  foaf:name "Jefke";
  foaf:age "45";
.

Similar approaches

  • W3C Activity Streams does the same by relating to the object it is describing with as:object.
  • The OSI model encapsulates data in a similar way, with metadata around it. This is even possible across multiple layers.

CC: @sandervd

Consider forbidding indefinite caching of root nodes

A client that is already synchronized to an event stream may be disappointed if the data publisher decides to deprecate the fragmentation it's subscribed to, because it may just look like the event stream is no longer updated.

If root nodes can be cached, clients may never discover the new APIs at all. So should the specification forbid setting root nodes as immutable?

Add note on version objects

The ldes:timestampPath should be carefully assessed, as it's not always the case that the timestamp on the version object reflects a real-world change to the real-world entity.

Rewrite 4.3 Version subsets

First tell "why" you would be making version subsets, because an event stream in itself is versioned, so why make subsets?

In order to indicate you only keep 1 specific version, e.g., the latest, in your stream
-> 1 specific version of what?
-> 1 specific version for every object

What's the definition of "ldes:amount"? numberOfLatestVersionsPerObject?

How do we tackle other use cases, e.g., what if you only keep the versions between a certain time window and region?

JSON-LD context/frame for the LDES members

It would be interesting to be able to point to an optional JSON-LD context on the event stream that can be applied to LDES members to JSON-ify them.
This would allow using any serialization within a data pipeline, transforming the data as you go, while being able to create a JSON document from a member as needed.
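
A hedged Turtle sketch of such a pointer; ex:memberContext is a hypothetical predicate and the context URL is illustrative:

ex:C1 a ldes:EventStream ;
    # optional JSON-LD context that consumers can apply to members to JSON-ify them
    ex:memberContext <https://example.org/contexts/members.jsonld> .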

Polling interval algorithm and extra property called ldes:timeToLive

Currently, the expected behavior is as follows:

  1. When a Cache-Control: max-age=X and Age: Y have been set, poll the page again in X-Y seconds, or when it is immutable, don't poll it ever again.
  2. When there is no Cache-Control header present, poll the page according to an LDES-client-specific strategy, which could be influenced by a configuration flag

In the reference implementation of the LDES client, there is a flag for a pollingInterval. This is a problem, as it requires specific configuration for a specific kind of LDES.

There should be an algorithm in the LDES spec that indicates when you MUST poll again, even when caching headers are not set. This can first elaborate on how to interpret max-age and Age, but I would like to have an LDES-specific property on a tree:Node to either overwrite the max-age, or set the polling interval when it's not set.

# If set, you can ignore HTTP headers and not visit this page again
<> ldes:timeToLive "immutable" .
# If this is set, the time to live of the page is 60 seconds. It may however be possible that the page was generated 59 seconds ago. For more fine-grained control, you will need to check HTTP headers to then check the `Age` header.
<> ldes:timeToLive 60 . 

This may also be useful for hosting LDESes in systems in which the caching headers are not easy to change.

When there is no timeToLive, or max-age present, we should have a fall-back of a default value. I’m unsure what this default value should be then.
