Introduction#

Chronon is a feature engineering framework that allows you to compute and store features for offline model training and online feature serving with unified feature definitions.

Chronon integrates with Airflow and Spark to automatically generate feature computation pipelines. Kafka and Hive can be used as the data sources that power these Spark pipelines.

Chronon supports:

  • Defining features over data sources that

    • Require real-time aggregations over raw data

    • Require midnight-accurate aggregations over raw data

    • Or contain pre-aggregated data.

  • A powerful aggregation primitive with windowing, bucketing and auto-unpacking support over arbitrarily complex/nested data types.

  • Spark SQL expressions with UDFs & the built-in function library for projections (select clauses) and filtering (where clauses).

  • Computing features in realtime based on traditional event streams or change-capture streams of databases.

  • Backfilling features without waiting to accumulate logs.

  • Data source types as first-class citizens. Data source type refers to patterns in which new data is ingested into the warehouse’s partitions.

    • Entities - new partitions contain a snapshot of the state of an entity - like user bank balances. Snapshots of DB tables and dimension tables are examples.

    • Events - new partitions contain all the events that occurred during the time span of the partition value (date, hour, etc.).

    • Cumulative Events - new partitions contain all events that have occurred from the beginning of time. Newer partitions are simply supersets of older partitions, so it is sufficient to always read just the latest partition.

  • Arbitrary Spark queries to implement complex logic that prepares data for non-realtime-accurate features.

  • Idempotent computation - new runs automatically fill all partitions starting from the last filled partition. This is counter to the Airflow convention of each run filling only the partition of that day.

Chronon comes packaged with tools to manage a repository of feature definitions. explore.py, compile.py and run.py are the primary tools provided via the chronon-ai pip package for discovering, validating and running feature definitions respectively. The repository has a notion of a team that can own feature definitions and configure the execution parameters of feature computation pipelines.

Example#

Let’s say you want to enrich a table of observations with two new columns representing

  1. User’s count of views of items in the last 5 hours - from a view event source.

  2. Item’s average rating in the last 90 days - from a database’s rating table.

The table of observations might look like

| user_id | item_id   | timestamp       |
|---------|-----------|-----------------|
| alice   | pizza     | 2021-09-30 5:24 |
| bob     | ice-cream | 2021-10-15 9:18 |
| carl    | pizza     | 2021-11-21 7:44 |

You would expect an output enriched with features that looks like

| user_id | item_id   | timestamp       | views_count_5h | avg_rating_90d |
|---------|-----------|-----------------|----------------|----------------|
| alice   | pizza     | 2021-09-30 5:24 | 10             | 3.7            |
| bob     | ice-cream | 2021-10-15 9:18 | 7              | 4.5            |
| carl    | pizza     | 2021-11-21 7:44 | 35             | 2.1            |

This data is typically used as features to train machine learning models. The same aggregations are also needed while serving those models to make predictions. So online we get a request that translates to something like - “produce the view count in the last 5 hours, and the average rating in the last 90 days, ending now, for a user named Bob”.

You would express this computation in Chronon using a GroupBy and a Join:

  • Average rating of the item in the last 90 days as a GroupBy

ratings_features = GroupBy(
    sources=[
        EntitySource(
            snapshotTable="item_info.ratings_snapshots_table",
            mutationTable="item_info.ratings_mutations_table",
            mutationTopic="ratings_mutations_topic",
            query=query.Query(
                selects={
                    "rating": "CAST(rating as DOUBLE)"
                }))
    ],
    keys=["item"],
    aggregations=[
        Aggregation(
            operation=Operation.AVERAGE,
            windows=[Window(length=90, timeUnit=TimeUnit.DAYS)])
    ])
  • Number of times the user viewed the item in the last 5 hours as a GroupBy

view_features = GroupBy(
    sources=[
        EventSource(
            table="user_activity.user_views_table",
            topic="user_views_stream",
            query=query.Query(
                selects={
                    "view": "if(context['activity_type'] = 'item_view', 1 , 0)",
                },
                wheres=["user != null"]
            ))
    ],
    keys=["user", "item"],
    aggregations=[
        Aggregation(
            operation=Operation.COUNT, 
            windows=[Window(length=5, timeUnit=TimeUnit.HOURS)]),
    ])
  • A GroupBy is essentially a collection of features that are aggregated from similar sources of data. You can put features from different GroupBys together using a Join.

item_rec_features = Join(
    left=EventSource(
        table="user_activity.view_purchases",
        query=query.Query(
            start_partition='2021-06-30'
        )
    ),
    # keys are automatically mapped from left to right_parts
    right_parts=[JoinPart(groupBy=view_features), JoinPart(groupBy=ratings_features)],
)

Full code example

More Examples#

The sections below go into a bit more detail about each of the concepts. You can find examples here instead.

Source#

A Source in Chronon refers to a logical group of underlying physical data sources. There are three types of sources:

  • Events

  • Entities

  • Cumulative

The following diagram visualizes these sources: Data Landscape

Events#

Events represent a log of immutable events - like user views, clicks, transactions, etc. Events are represented by two data elements:

  • table => A daily partitioned Hive table, with each partition containing only the events that occurred during that day. You can also use any fact table derived from other offline logic - if you don’t care about realtime-ness.

  • topic => A Kafka topic that the events flow through before ending up in Hive. This is optional, and only needed if you are trying to produce realtime features.

Usually you will have a naming convention between the topic name and table name. It is a good idea to capture that in a helper method that is re-used across your organization.

# Example assuming arbitrary naming convention
def MyEventSource(event_name, *args, **kwargs):
    return EventSource(table=f"some_namespace.{event_name}_log",                
                        topic=f"{event_name}_mutations/host=YOUR_KAFKA_HOST/port=YOUR_KAFKA_PORT",
                        *args, **kwargs)

Note: The dates and timestamps below are shortened for brevity. You should have dates in yyyy-MM-dd format, and timestamps as long values representing milliseconds since epoch in UTC. You can specify a time transformation as an argument to EventSource.query.timeColumn.
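For example, here is a minimal sketch of an EventSource whose raw timestamp needs such a transformation. The table, topic and column names are illustrative, and the snake_case time_column argument follows the Query helper convention used in the examples above.

# Illustrative sketch - table, topic and column names are hypothetical
view_source = EventSource(
    table="user_activity.user_views_table",
    topic="user_views_stream",
    query=query.Query(
        selects={"view": "1"},
        # derive epoch milliseconds from a raw timestamp column
        time_column="UNIX_TIMESTAMP(event_time) * 1000"))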

events example

Entities#

In purple (in the diagram above), we have a group of data elements that represent changing data in an online database table used by application services.

  • snapshotTable => A daily partitioned table in Hive with a full snapshot of the online table taken every midnight. A tool like Sqoop can produce these snapshot tables in Hive. You should already have a mechanism that captures online table snapshots into Hive periodically. This can also be any dim / dimension table.

  • mutationTopic => A topic in Kafka containing mutations to the table rows. A tool like Debezium can be attached to your online database to stream mutations into Kafka.

  • mutationTable => A daily partitioned Hive table where each partition contains the mutations that happened on that particular day.

Note that mutationTopic and mutationTable are only necessary for realtime feature serving and point-in-time-correct (PITC) backfills, respectively. If you don’t have these two sources, you can still use just the snapshotTable to construct midnight-accurate (non-realtime) features.
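For instance, a snapshot-only source could look like the sketch below (reusing the ratings table from the example section); without mutation data, features built from it can only be midnight accurate.

# Snapshot-only entity source - midnight-accurate features only (illustrative sketch)
ratings_snapshot_source = EntitySource(
    snapshotTable="item_info.ratings_snapshots_table",
    query=query.Query(
        selects={"rating": "CAST(rating AS DOUBLE)"}))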

If you have a convention around all DB-related tables, it might be worth creating a method that produces a full entity source with all three elements specified.

# Example assuming arbitrary naming convention
def DBExportSource(table_name, *args, **kwargs):
    return EntitySource(snapshotTable=f"{table_name}_snapshots",
                        mutationTable=f"{table_name}_mutations",
                        mutationTopic=f"{table_name}_mutations/host=YOUR_KAFKA_HOST/port=YOUR_KAFKA_PORT",
                        *args, **kwargs)

See example ratings table Entities Example

Note: Dates and timestamps are shortened for brevity. The mutationTable is an unpacked version of the mutation stream. There are three kinds of mutations: deletes, updates and inserts. mutationTs is shortened for brevity, but it should be a long representing the time at which the mutation occurred, in milliseconds since epoch.

  • Updates are unpacked into two rows - a before-update-row: is_before = True and an after-update-row: is_before = False.

  • Inserts are unpacked into a single row where is_before = False.

  • Deletes are unpacked into a single row where is_before = True.

You can expand the DBExportSource method above to fit your mutation data into these conventions once for your company.

Cumulative Events#

This is the case where only new rows are added into new partitions, and the columns you care about in old rows are never modified. This implies that the latest partition always contains all of the data from previous partitions. The advantage is that we can always look at just the latest partition instead of a historical range.

This is specified as a sub-case of Events as Events(cumulative=True).

See example Cumulative Example

Note: Declaring a source as cumulative comes with enormous performance benefits compared to an EntitySource. But you need to be 100% sure that the columns you want to consume are never modified.
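As a rough sketch, assuming the isCumulative flag on EventSource and a hypothetical table name, a cumulative source could be declared like this:

# Illustrative sketch - assumes the isCumulative flag on EventSource;
# the table name is hypothetical and its consumed columns must never be modified
purchases_cumulative = EventSource(
    table="payments.all_purchases_table",
    isCumulative=True,
    query=query.Query(
        selects={"price": "price"}))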

Query#

Once you have chosen the right source type, the next step is to specify a query that can scan the necessary data. That is where the Query comes in.
It performs row-level transformation and filtering. We use Spark SQL expressions to power this, and all built-in functions & UDFs are supported.
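As an illustration, the sketch below shows a Query doing projection and filtering with Spark SQL expressions, including built-in functions and a hypothetical UDF registered via the setups argument (mirroring the StagingQuery setups shown later). All column, function and class names here are made up.

# Illustrative sketch - column names, the UDF and its class path are hypothetical
purchase_query = query.Query(
    selects={
        "item": "item_id",
        "price_usd": "ROUND(price * fx_rate, 2)",           # Spark built-in functions
        "category": "NORMALIZE_CATEGORY(category_code)",    # hypothetical UDF registered below
    },
    wheres=["price > 0", "item_id IS NOT NULL"],
    setups=["CREATE TEMPORARY FUNCTION NORMALIZE_CATEGORY AS 'com.your_company.udf.NormalizeCategory'"],
    start_partition="2021-06-30")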


GroupBy#

A GroupBy is a group of Aggregations computed from a Source or similar Sources of data.

Consider the following group of aggregations from a user purchase stream (user, credit_card, merchant, price, timestamp) that are keyed on user (a code sketch follows the list).

  • average of purchase prices of a user in last 7d, 30d & 90d windows.

  • top_k(10) purchase prices of a user in the last 365d window

  • unique_count of merchants of a user - in all history.

  • average purchase price of a user bucketed by merchant - result is a map of merchant to average purchase price.
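A sketch of these aggregations as a GroupBy is shown below. The table and topic names are hypothetical, the top_k(10) aggregation is omitted (parameterized operations are covered in aggregations.md), and argument names follow the conventions of the examples above.

# Illustrative sketch - source names are hypothetical; top_k(10) omitted
purchase_features = GroupBy(
    sources=[
        EventSource(
            table="payments.user_purchases_table",   # hypothetical table name
            topic="user_purchases_stream",            # optional - only for realtime updates
            query=query.Query(
                selects={
                    "price": "price",
                    "merchant": "merchant",
                },
                wheres=["user IS NOT NULL"]))
    ],
    keys=["user"],
    aggregations=[
        # average purchase price over 7d, 30d & 90d windows
        Aggregation(
            input_column="price",
            operation=Operation.AVERAGE,
            windows=[Window(length=7, timeUnit=TimeUnit.DAYS),
                     Window(length=30, timeUnit=TimeUnit.DAYS),
                     Window(length=90, timeUnit=TimeUnit.DAYS)]),
        # unique count of merchants over all history - no window specified
        Aggregation(
            input_column="merchant",
            operation=Operation.UNIQUE_COUNT),
        # average purchase price bucketed by merchant -> map of merchant to average
        Aggregation(
            input_column="price",
            operation=Operation.AVERAGE,
            buckets=["merchant"]),
    ])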

Chronon can compute and serve the above aggregates in several contexts:

  • served online, updated in realtime - you can utilize the Chronon client (Java/Scala) to query for the aggregate values as of now. The client would reply with realtime-updated aggregate values. This requires a stream of user purchases and also a warehouse (Hive) table of historical user purchases.

  • served online, updated at midnight - you can utilize the client to query for the aggregate values as of today’s midnight. The values are only refreshed every midnight. This requires just the warehouse (Hive) table of historical user purchases that receives a new partition every midnight.

    • Note: Users can configure accuracy to be midnight or realtime

  • standalone backfilled - daily snapshots of aggregate values. The result is a date-partitioned Hive table where each partition contains aggregates as of that day, for each user that has a row within the largest window ending that day.

  • backfilled against another source - see Join below. Most commonly used to enrich labelled data with aggregates coming from many different sources & GroupBy’s at once.

Selecting the right Source is a crucial first step to correctly defining a GroupBy. See the Source section above for more info on the options and when to use each.

Often you might want to chain aggregations together (e.g., first run LAST, then run SUM on its output). This can be achieved by using the output of one GroupBy as the input to the next, as sketched below.
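Below is a purely illustrative sketch of such chaining. It assumes the first GroupBy's backfilled output lands in a hypothetical table with an output column named price_last; both the table name and the column-naming convention are assumptions.

# step 1 - latest purchase price per (user, merchant) - illustrative
latest_price = GroupBy(
    sources=[purchase_source],   # hypothetical source over the purchase stream
    keys=["user", "merchant"],
    aggregations=[Aggregation(input_column="price", operation=Operation.LAST)])

# step 2 - sum those latest prices per user, reading step 1's (hypothetical) output table
total_latest_price = GroupBy(
    sources=[
        EntitySource(
            snapshotTable="your_namespace.latest_price_output",          # hypothetical output table of latest_price
            query=query.Query(selects={"price_last": "price_last"}))     # assumed output column name
    ],
    keys=["user"],
    aggregations=[Aggregation(input_column="price_last", operation=Operation.SUM)])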

Aggregations#

Chronon supports a powerful aggregation primitive with time windows, bucketing and auto-exploding as first-class features. See aggregations.md for more details.

Accuracy#

accuracy is a toggle that can be supplied to GroupBy. It can be either SNAPSHOT or TEMPORAL. SNAPSHOT accuracy means that feature values are computed as of midnight only and refreshed once daily. TEMPORAL accuracy means that feature values are computed in realtime while serving, and in point-in-time-correct fashion while backfilling. When topic or mutationTopic is specified, we default to TEMPORAL otherwise SNAPSHOT.
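For example, to pin the behavior explicitly, you could set the toggle on the GroupBy (assuming the Accuracy enum exposed by the Python API):

your_gb = GroupBy(
  ...,
  accuracy=Accuracy.TEMPORAL  # or Accuracy.SNAPSHOT for midnight-only refreshes
)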

GroupBy Online (Serving)#

online is a toggle to specify if the pipelines necessary to maintain feature views should be scheduled. This is for online low-latency serving.

your_gb = GroupBy(
  ...,
  online=True
)

Note: Once a GroupBy is marked online, the compiler compile.py will prevent you from updating it. This is so that you don’t accidentally merge a change that releases modified features out-of-band with model updates. You can override this behavior by deleting the older compiled output. Our recommendation is to create a new version, your_gb_v2, instead.

Click here for more configuration examples#


Join#

A Join can pull data from GroupBys that are keyed on different entities. As you saw in the example section above, view_features is keyed by user & item while ratings_features is keyed only by item. A key feature of a Join is that it can operate both online and offline with consistency - meaning the offline backfilled values are the same as the online served values, and there is a single definition governing both.

Chronon runs daily pipelines that measure the inconsistency between an offline join and its online counterpart.

Join has essentially two things - a left source and right_parts.

  • left is the source against which aggregates are computed. The left source only matters for offline backfilling and is not used in serving.

    • One of the main things to determine about your left source is whether it is an event or an entity.

    • Only events can be used to trigger realtime / point-in-time-correct backfills. Entities always trigger a daily accurate backfill.

    • If you are predicting online - in a service in realtime, you usually want an EventSource as your left. In this case providing a timeColumn - as milliseconds in UTC since epoch - is essential.

    • If you are predicting offline - once every day, you want an Entity source on your left.

  • right_parts is a list of JoinParts containing the GroupBys to join with. Each JoinPart has an optional prefix and key re-mapping facility (see the sketch after this list).

    • prefix - just adds the specified string to the names of the columns from group_by

    • keyMapping - a map of string to string, used to re-map keys from the left side into the right side. You could have a GroupBy on the right keyed by user, while on the left you have chosen to call the user user_id or vendor. The remapping facility lets you specify this relation for each GroupBy.
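A sketch of both facilities, extending the earlier Join example: here the left table's user_id column is mapped to the GroupBy key user, and the view features get a prefix. Argument names follow the JoinPart usage shown earlier, and the left column name is an assumption.

# Illustrative sketch - assumes the left table calls the user column "user_id"
item_rec_features = Join(
    left=EventSource(
        table="user_activity.view_purchases",
        query=query.Query(start_partition='2021-06-30')),
    right_parts=[
        JoinPart(
            groupBy=view_features,
            prefix="viewer",                    # prepends "viewer" to the column names from view_features
            keyMapping={"user_id": "user"}),    # left column user_id -> GroupBy key user
        JoinPart(groupBy=ratings_features),     # "item" matches by name, so no mapping is needed
    ],
)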

NOTE: The online flag in a Join states that this join is going to be fetched online. Metadata about the join is uploaded and there are checks to make sure each GroupBy used by the Join is set to online as well.

NOTE: The production flag in a Join states that the join is not in development anymore and critical alerting and monitoring are expected. If a Join is marked as production it cannot reference any non-production GroupBys.

Click here for more configuration examples#


Staging Query#

A StagingQuery can be used to express free-form ETL (including joins / group bys) within Chronon. It is typically used to express more complex logic that won’t fit into the simple query facility provided by the source.

v1 = StagingQuery(
    query="select * from namespace.table a JOIN namespace.table_2 b on a.key=b.key ",
    startPartition="2020-04-01",
    setups=[
        "ADD JAR s3://path/to/your/jar",
        "CREATE TEMPORARY FUNCTION YOUR_FUNCTION AS 'com.you_company.udf.your_team.YourUdfClass'",
    ],
    metaData=MetaData(
        dependencies=["namespace.table", "namespace.table_2"],
    )
)

The StagingQuery’s output table can then be used as a source in both GroupBy and Join. For example:

from staging_queries.team_name_folder import file_name
from ai.chronon.utils import get_staging_query_output_table_name
v1 = Join(
    left=EventSource(
        table=get_staging_query_output_table_name(file_name.staging_query_var_name)
        ...
    )
)

Note: The output namespace of the staging query depends on the metaData value for output_namespace. By default, this metadata is extracted from the team’s entry in teams.json (or from the default team if one is not set).

Click here for more configuration examples#