Python API#

ai.chronon.group_by.Aggregation(input_column: Optional[str] = None, operation: Optional[Union[Operation, Tuple[Operation, Dict[str, str]]]] = None, windows: Optional[List[Window]] = None, buckets: Optional[List[str]] = None) Aggregation#
Parameters:
  • input_column (str) – Column on which the aggregation needs to be performed. This should be one of the input columns specified on the keys of the select in the Query’s Source

  • operation (ttypes.Operation) – Operation to use to aggregate the input columns. For example, MAX, MIN, COUNT Some operations have arguments, like last_k, approx_percentiles etc., Defaults to “LAST”.

  • windows (List[ttypes.Window]) – Length to window to calculate the aggregates on. Minimum window size is 1hr. Maximum can be arbitrary. When not defined, the computation is un-windowed.

  • buckets (List[str]) – Besides the GroupBy.keys, this is another level of keys for use under this aggregation. Using this would create an output as a map of string to aggregate.

Returns:

An aggregate defined with the specified operation.

ai.chronon.group_by.GroupBy(sources: Union[List[Union[Source, EventSource, EntitySource]], Source, EventSource, EntitySource], keys: List[str], aggregations: Optional[List[Aggregation]], online: Optional[bool] = None, production: Optional[bool] = None, backfill_start_date: Optional[str] = None, dependencies: Optional[List[str]] = None, env: Optional[Dict[str, Dict[str, str]]] = None, table_properties: Optional[Dict[str, str]] = None, output_namespace: Optional[str] = None, accuracy: Optional[Accuracy] = None, lag: int = 0, **kwargs) GroupBy#
Parameters:
  • sources (List[ai.chronon.api.ttypes.Events|ai.chronon.api.ttypes.Entities]) –

    can be constructed as entities or events:

    import ai.chronon.api.ttypes as chronon
    events = chronon.Source(events=chronon.Events(
        table=YOUR_TABLE,
        topic=YOUR_TOPIC #  <- OPTIONAL for serving
        query=chronon.Query(...)
        isCumulative=False  # <- defaults to false.
    ))
    Or
    entities = chronon.Source(events=chronon.Events(
        snapshotTable=YOUR_TABLE,
        mutationTopic=YOUR_TOPIC,
        mutationTable=YOUR_MUTATION_TABLE
        query=chronon.Query(...)
    ))
    

    Multiple sources can be supplied to backfill the historical values with their respective start and end partitions. However, only one source is allowed to be a streaming one.

  • keys (List[String]) – List of primary keys that defines the data that needs to be collected in the result table. Similar to the GroupBy in the SQL context.

  • aggregations (List[ai.chronon.api.ttypes.Aggregation]) –

    List of aggregations that needs to be computed for the data following the grouping defined by the keys:

    import ai.chronon.api.ttypes as chronon
    aggregations = [
        chronon.Aggregation(input_column="entity", operation=Operation.LAST),
        chronon.Aggregation(input_column="entity", operation=Operation.LAST, windows=[Window(7, TimeUnit.DAYS)])
    ],
    

  • online (bool) – Should we upload the result data of this conf into the KV store so that we can fetch/serve this GroupBy online. Once Online is set to True, you ideally should not change the conf.

  • production (bool) – This when set can be integrated to trigger alerts. You will have to integrate this flag into your alerting system yourself.

  • backfill_start_date (str) – Start date from which GroupBy data should be computed. This will determine how back of a time that Chronon would goto to compute the resultant table and its aggregations.

  • dependencies (List[str]) – This goes into MetaData.dependencies - which is a list of string representing which table partitions to wait for Typically used by engines like airflow to create partition sensors.

  • env (Dict[str, Dict[str, str]]) –

    This is a dictionary of “mode name” to dictionary of “env var name” to “env var value”. These vars are set in run.py and the underlying spark_submit.sh. There override vars set in teams.json/production/<MODE NAME> The priority order (descending) is:

    var set while using run.py "VAR=VAL run.py --mode=backfill <name>"
    var set here in Join's env param
    var set in team.json/<team>/<production>/<MODE NAME>
    var set in team.json/default/<production>/<MODE NAME>
    

  • table_properties (Dict[str, str]) – Specifies the properties on output hive tables. Can be specified in teams.json.

  • output_namespace (str) – In backfill mode, we will produce data into hive. This represents the hive namespace that the data will be written into. You can set this at the teams.json level.

  • accuracy (ai.chronon.api.ttypes.SNAPSHOT or ai.chronon.api.ttypes.TEMPORAL) – Defines the computing accuracy of the GroupBy. If “Snapshot” is selected, the aggregations are computed based on the partition identifier - “ds” time column. If “Temporal” is selected, the aggregations are computed based on the event time - “ts” time column.

  • lag (int) – Param that goes into customJson. You can pull this out of the json at path “metaData.customJson.lag” This is used by airflow integration to pick an older hive partition to wait on.

  • kwargs (Dict[str, str]) – Additional properties that would be passed to run.py if specified under additional_args property. And provides an option to pass custom values to the processing logic.

Returns:

A GroupBy object containing specified aggregations.

ai.chronon.join.Join(left: Source, right_parts: List[JoinPart], check_consistency: bool = False, additional_args: Optional[List[str]] = None, additional_env: Optional[List[str]] = None, dependencies: Optional[List[str]] = None, online: bool = False, production: bool = False, output_namespace: Optional[str] = None, table_properties: Optional[Dict[str, str]] = None, env: Optional[Dict[str, Dict[str, str]]] = None, lag: int = 0, skew_keys: Optional[Dict[str, List[str]]] = None, sample_percent: Optional[float] = None, **kwargs) Join#

Construct a join object. A join can pull together data from various GroupBy’s both offline and online. This is also the focal point for logging, data quality computation and monitoring. A join maps 1:1 to models in ML usage.

Parameters:
  • left (ai.chronon.api.Source) – The source on the left side, when Entities, all GroupBys are join with SNAPSHOT accuracy (midnight values). When left is events, if on the right, either when GroupBy’s are TEMPORAL, or when topic is specified, we perform a TEMPORAL / point-in-time join.

  • right_parts (List[ai.chronon.api.JoinPart]) – The list of groupBy’s to join with. GroupBy’s are wrapped in a JoinPart, which contains additional information on how to join the left side with the GroupBy.

  • check_consistency (bool) – If online serving data should be compared with backfill data - as online-offline-consistency metrics. The metrics go into hive and your configured kv store for further visualization and monitoring.

  • additional_args (List[str]) – Additional args go into customJson of ai.chronon.api.MetaData within the ai.chronon.api.Join object. This is a place for arbitrary information you want to tag your conf with.

  • additional_env (List[str]) – Deprecated, see env

  • dependencies (List[str]) – This goes into MetaData.dependencies - which is a list of string representing which table partitions to wait for Typically used by engines like airflow to create partition sensors.

  • online (bool) – Should we upload this conf into kv store so that we can fetch/serve this join online. Once Online is set to True, you ideally should not change the conf.

  • production (bool) – This when set can be integrated to trigger alerts. You will have to integrate this flag into your alerting system yourself.

  • output_namespace (str) – In backfill mode, we will produce data into hive. This represents the hive namespace that the data will be written into. You can set this at the teams.json level.

  • table_properties – Specifies the properties on output hive tables. Can be specified in teams.json.

  • env (Dict[str, Dict[str, str]]) –

    This is a dictionary of “mode name” to dictionary of “env var name” to “env var value”. These vars are set in run.py and the underlying spark_submit.sh. There override vars set in teams.json/production/<MODE NAME> The priority order (descending) is:

    var set while using run.py "VAR=VAL run.py --mode=backfill <name>"
    var set here in Join's env param
    var set in team.json/<team>/<production>/<MODE NAME>
    var set in team.json/default/<production>/<MODE NAME>
    

  • lag – Param that goes into customJson. You can pull this out of the json at path “metaData.customJson.lag” This is used by airflow integration to pick an older hive partition to wait on.

  • skew_keys – While back-filling, if there are known irrelevant keys - like user_id = 0 / NULL etc. You can specify them here. This is used to blacklist crawlers etc

  • sample_percent – Online only parameter. What percent of online serving requests to this join should be logged into warehouse.

Returns:

A join object that can be used to backfill or serve data. For ML use-cases this should map 1:1 to model.

ai.chronon.join.JoinPart(group_by: GroupBy, key_mapping: Optional[Dict[str, str]] = None, prefix: Optional[str] = None) JoinPart#

Specifies HOW to join the left of a Join with GroupBy’s.

Parameters:
  • group_by (ai.chronon.api.GroupBy) – The GroupBy object to join with. Keys on left are used to equi join with keys on right. When left is entities all GroupBy’s are computed as of midnight. When left is events, we do a point-in-time join when right.accuracy == TEMPORAL OR right.source.topic != null

  • key_mapping (Dict[str, str]) – Names of keys don’t always match on left and right, this mapping tells us how to map when they don’t.

  • prefix – All the output columns of the groupBy will be prefixed with this string. This is used when you need to join the same groupBy more than once with left. Say on the left you have seller and buyer, on the group you have a user’s avg_price, and you want to join the left (seller, buyer) with (seller_avg_price, buyer_avg_price) you would use key_mapping and prefix parameters.

Returns:

JoinPart specifies how the left side of a join, or the query in online setting, would join with the right side components like GroupBys.

ai.chronon.query.Query(selects: Optional[Dict[str, str]] = None, wheres: Optional[List[str]] = None, start_partition: Optional[str] = None, end_partition: Optional[str] = None, time_column: Optional[str] = None, setups: List[str] = [], mutation_time_column: Optional[str] = None, reversal_column: Optional[str] = None) Query#

Create a query object that is used to scan data from various data sources. This contains partition ranges, row level transformations and filtering logic. Additionally we also require a time_column for TEMPORAL events, mutation_time_column & reversal for TEMPORAL entities.

Parameters:
  • selects (List[str], optional) –

    Spark sql expressions with only arithmetic, function application & inline lambdas. You can also apply udfs see setups param below.:

    Example: {
        "alias": "built_in_function(col1) * my_udf(col2)",
        "alias1": "aggregate(array_col, 0, (acc, x) -> acc + x)"
    }
    

    See: https://spark.apache.org/docs/latest/api/sql/#built-in-functions When none, we will assume that no transformations are needed and will pick columns necessary for aggregations.

  • wheres (List[str], optional) – Used for filtering. Same as above, but each expression must return boolean. Expressions are joined using AND.

  • start_partition (str, optional) – From which partition of the source is the data valid from - inclusive. When absent we will consider all available data is usable.

  • end_partition (str, optional) – Till what partition of the source is the data valid till - inclusive. Not specified unless you know for a fact that a particular source has expired after a partition and you should instead use another source after this partition.

  • time_column (str, optional) – a single expression to produce time as ** milliseconds since epoch**.

  • setups (List[str], optional) – you can register UDFs using setups [“ADD JAR YOUR_JAR”, “create temporary function YOU_UDF_NAME as YOUR_CLASS”]

  • mutation_time_column (str, optional) – For entities, with real time accuracy, you need to specify an expression that represents mutation time. Time should be milliseconds since epoch. This is not necessary for event sources, defaults to “mutation_ts”

  • reversal_column – str, optional For entities with realtime accuracy, we divide updates into two additions & reversal. updates have two rows - one with is_before = True (the old value) & is_before = False (the new value) inserts only have is_before = false (just the new value). deletes only have is_before = true (just the old value). This is not necessary for event sources.

  • reversal_column – str, optional (defaults to “is_before”)

Returns:

A Query object that Chronon can use to scan just the necessary data efficiently.

class ai.chronon.api.ttypes.Aggregation(inputColumn=None, operation=None, argMap=None, windows=None, buckets=None)#

Chronon provides a powerful aggregations primitive - that takes the familiar aggregation operation, via groupBy in SQL and extends it with three things - windowing, bucketing and auto-explode.

Parameters:
  • inputColumn – The column as specified in source.query.selects - on which we need to aggregate with.

  • operation – The type of aggregation that needs to be performed on the inputColumn.

  • argMap – Extra arguments that needs to be passed to some of the operations like LAST_K, APPROX_PERCENTILE.

  • windows

    For TEMPORAL case windows are sawtooth. Meaning head slides ahead continuously in time, whereas, the tail only hops ahead, at discrete points in time. Hop is determined by the window size automatically. The maximum hop size is 1/12 of window size. You can specify multiple such windows at once.

    Window > 12 days -> Hop Size = 1 day

    Window > 12 hours -> Hop Size = 1 hr

    Window > 1hr -> Hop Size = 5 minutes

  • buckets – This is an additional layer of aggregation. You can key a group_by by user, and bucket a “item_view” count by “item_category”. This will produce one row per user, with column containing map of “item_category” to “view_count”. You can specify multiple such buckets at once.

Attributes:
  • inputColumn

  • operation

  • argMap

  • windows

  • buckets

class ai.chronon.api.ttypes.EntitySource(snapshotTable=None, mutationTable=None, mutationTopic=None, query=None)#

Entity Sources represent data that gets mutated over-time - at row-level. This is a group of three data elements. snapshotTable, mutationTable and mutationTopic. mutationTable and mutationTopic are only necessary if we are trying to create realtime or point-in-time aggregations over these sources. Entity sources usually map 1:1 with a database tables in your OLTP store that typically serves live application traffic. When mutation data is absent they map 1:1 to dim tables in star schema.

Parameters:
  • snapshotTable – Table currently needs to be a ‘ds’ (date string - yyyy-MM-dd) partitioned hive table.

  • mutationTable – Topic is a kafka table. The table contains all the events historically came through this topic.

  • mutationTopic – The logic used to scan both the table and the topic. Contains row level transformations and filtering expressed as Spark SQL statements.

  • isCumulative – If each new hive partition contains not just the current day’s events but the entire set of events since the begininng. The key property is that the events are not mutated across partitions.

Attributes:
  • snapshotTable

  • mutationTable

  • mutationTopic

  • query

class ai.chronon.api.ttypes.EventSource(table=None, topic=None, query=None, isCumulative=None)#

Event source captures data that is essentially immutable - like user clicks, impressions etc. It has two parts, an offline table and an online topic. The term fact table from star-schema also maps to this concepts. But lacks a notion of topic.

Parameters:
  • table – Table currently needs to be a ‘ds’ (date string - yyyy-MM-dd) partitioned hive table.

  • topic – Topic is a kafka table. The table contains all the events historically came through this topic.

  • query – The logic used to scan both the table and the topic. Contains row level transformations and filtering expressed as Spark SQL statements.

  • isCumulative – If each new hive partition contains not just the current day’s events but the entire set of events since the begininng. The key property is that the events are not mutated across partitions.

Attributes:
  • table

  • topic

  • query

  • isCumulative

class ai.chronon.api.ttypes.StagingQuery(metaData=None, query=None, startPartition=None, setups=None)#

Staging Query encapsulates arbitrary spark computation. One key feature is that the computation follows a “fill-what’s-missing” pattern. Basically instead of explicitly specifying dates you specify two macros. {{ start_date }} and {{end_date}}. Chronon will pass in earliest-missing-partition for start_date and execution-date / today for end_date. So the query will compute multiple partitions at once.

Parameters:
  • metaData – Contains name, team, output_namespace, execution parameters etc. Things that don’t change the semantics of the computation itself.

  • query – The spark sql query with date templates.

  • startPartition – This is where start_date will be applied from. We expect the query to also produce output hive tables partitioned data starting from startPartition.

  • setups – Spark SQL setup statements. Used typically to register UDFs.

Attributes:
  • metaData

  • query

  • startPartition

  • setups

class ai.chronon.api.ttypes.Window(length=None, timeUnit=None)#
Attributes:
  • length

  • timeUnit