# Integration Guide
Chronon can essentially operate in two modes - online and offline. In this doc we are going to talk about how to integrate Chronon online. Online, Chronon maintains “views” that are refreshed in real time or daily.
For example:

- Number of views of an item in the last 30 days - from an item view stream.
- Average review length of an item - from a review database table.

The computations are expressed in a Python API with Spark SQL expressions.
## Initial Setup
Install the Chronon pip package - a one-time setup per user machine:
```sh
pip install chronon-ai
```
Pick where you want to host, or are already hosting, the Chronon configs, and add that location to `PYTHONPATH`. We recommend adding the Chronon configs to your Airflow repo for ease of deployment.
```sh
export CHRONON_REPO_PATH=/to/where/you/want_or_have/chronon/configs
export PYTHONPATH=$CHRONON_REPO_PATH:$PYTHONPATH
```
Set up the Chronon definition repo - a one-time setup per company:
```sh
mkdir -p $CHRONON_REPO_PATH
cd $CHRONON_REPO_PATH
git init
git remote add -f origin git@github.com:airbnb/chronon.git
git config core.sparseCheckout true
echo "api/py/test/sample" >> .git/info/sparse-checkout
git pull origin master
rm -rf .git
mv -v api/py/test/sample/* ./
rm -rf api
```
You can look at the examples under `group_bys`, `joins` or `staging_queries`.
## Compile
Compilation to JSON is done via `compile.py`. Chronon can interpret the compiled artifacts for managing metadata, creating data processing pipelines, online serving, etc.
```sh
compile.py --conf=<conf_type>/<team>/<conf_name>.py
```
- `conf_type` can be “staging_queries”, “group_bys” or “joins”.
- `team` is your team name as registered in `teams.json`.
- `conf_name.py` is the Python file where you put your compute definition.
## Run
There are three main API concepts that can be executed in Chronon - GroupBy, Join & StagingQuery.
Please familiarize yourself with the Python API.
There are a few modes in which these primitives can be executed using `run.py`:

- GroupBy - backfill, upload, streaming, fetching, analyze
- Join - backfill, fetching, analyze, consistency
- StagingQuery - backfill
Each of these modes takes the compiled JSON of the Python API and performs a specific action.
### Offline modes
- analyze - does a full validation of the SQL expressions used in the Python API, prints the result schema, and optionally does heavy hitter/hot key detection.
- backfill - reads data from Hive and produces data into Hive according to the compute definition.
### Online modes
- upload - reads data from Hive and bulk uploads it into the KV store.
- streaming - reads data from Kafka and writes it into the KV store.
- fetch - reads values from the KV store, does some compute if necessary, and returns a result.
- consistency - runs a job to compare online-served values against backfilled values. See the Online-Offline Consistency doc.
You can run the configs in these modes using the run command:
```sh
run.py --mode=<MODE> --conf=<PATH>/<TO>/<YOUR>/<DEFINITION>
```
You can directly tune the parameters that are set as env vars in `spark_submit.sh` or `spark_submit_streaming.sh` via the `run.py` script:
```sh
EXECUTOR_MEMORY=16G PARALLELISM=2000 run.py --mode=backfill --conf=production/joins/<your_team>/<your_join>
# or
EXECUTOR_MEMORY=2G PARALLELISM=8 run.py --mode=streaming --conf=production/group_bys/<your_team>/<your_group_by>
```
## Explore
Finally, if you want to explore existing feature definitions in Chronon, use `explore.py` with any keyword like `user`, `views`, etc. This will list the instances where the source table, group_by name, aggregate column name or join name contains the keyword.
```sh
explore.py <KEYWORD>
```
## Integrations
![Architecture](../images/Overall Architecture.png)
There are essentially four integration points:
- Chronon Repository - This is where your users will define Chronon configurations. We recommend that this live within an Airflow pipeline (or your own data pipeline scheduler's) repository to make deployment easy. Once you have the repository set up, you can begin using Chronon for offline batch pipelines.

For online serving:

- KVStore - for storing and serving features at low latency. This can be any KV store that supports point writes, point lookups, scans and bulk writes.
- StreamDecoder - for reading bytes from Kafka and converting them into a Chronon Event or a Chronon Mutation. If you have a convention for how data in Kafka is converted into data in the warehouse, you need to follow that same convention to decode here as well.
- Airflow - for scheduling Spark pipelines that periodically check for and trigger missing tasks: join backfills, group-by uploads, metadata uploads, etc.
### Repository Setup
You can pull in a template for setting up the initial Chronon repository.
```sh
wget https://github.com/chronon-ai/conf-template/archive/refs/heads/main.zip
unzip main.zip
```
### KV Store API
The KVStore API has three methods for you to implement (once per company):

- `multiPut` - of a series of `(key, value, time)` triples, called `TimedValue`s.
- `multiGet` - of a series of `(key, Option[time])` tuples, which return `TimedValue`s. You can map time to a secondary key, which comes standard in NoSQL stores.
- `bulkPut` - to upload a Hive table into your KV store. If you have another mechanism (like an Airflow upload operator) to upload data from Hive into your KV store, you don't need to implement this method.
```scala
object KVStore {
  // `afterTsMillis` implies that this is a range scan of all values with `timestamp` >= the specified one.
  // This can be implemented efficiently if `timestamp` can be a secondary key.
  // Some databases have a native version id concept which can also map to timestamp.
  case class GetRequest(keyBytes: Array[Byte], dataset: String, afterTsMillis: Option[Long] = None)

  // the response is a series of timed values
  case class TimedValue(bytes: Array[Byte], millis: Long)

  case class GetResponse(request: GetRequest, values: Try[Seq[TimedValue]]) {
    def latest: Try[TimedValue] = values.map(_.maxBy(_.millis))
  }

  case class PutRequest(keyBytes: Array[Byte], valueBytes: Array[Byte], dataset: String, tsMillis: Option[Long] = None)
}

trait KVStore {
  def create(dataset: String): Unit

  // Used by the Chronon client to fetch features
  def multiGet(requests: Seq[GetRequest]): Future[Seq[GetResponse]]

  // Used by the Spark streaming job to write values
  def multiPut(keyValueDatasets: Seq[PutRequest]): Future[Seq[Boolean]]

  // Used by the Spark upload job to bulk upload data into the KV store
  def bulkPut(sourceOfflineTable: String, destinationOnlineDataSet: String, partition: String): Unit
}
```
### Stream Decoder API
This is a simple deserializer API to decode bytes in a Kafka stream into Java values. The Java values are expected to be a `Mutation`, which captures two kinds of data:

- Events - e.g., bank transactions: `source_account`, `target_account`, `amount`, `timestamp`
- Mutations - e.g., a bank balance table with the row: `account`, `balance`, `updated_at`
A mutation event would contain a `before` row and an `after` row. Both `before` and `after` have `account`, `balance`, `updated_at`.

- If the mutation is a DB table insert - `before` is null, `after` should contain values.
- If the mutation is a table delete - `after` is null, `before` should contain values.
- If the mutation is an update - both `before` and `after` should be present.

Events are a special case of Mutations (inserts): `before` is always null, only `after` is specified. These cases are illustrated in the sketch after the definitions below.

You can use Apache Debezium if you don't have a mutation capture system in place. Until then, you can simply use Chronon for the Events case for online serving of realtime features.
```scala
case class Mutation(schema: StructType = null, before: Array[Any] = null, after: Array[Any] = null)

abstract class StreamDecoder extends Serializable {
  def decode(bytes: Array[Byte]): Mutation
  def schema: StructType
}
```
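For instance, the bank-balance cases above would map onto `Mutation` values roughly as follows. The row values are made up for illustration, and `schema` is left at its default of null to keep the sketch short; a real decoder should attach the schema described in the next section.

```scala
// Illustrative rows for the (account, balance, updated_at) example; the values are made up.
val oldRow: Array[Any] = Array("acct_123", java.lang.Double.valueOf(100.0), java.lang.Long.valueOf(1718000000000L))
val newRow: Array[Any] = Array("acct_123", java.lang.Double.valueOf(250.0), java.lang.Long.valueOf(1718000100000L))

// Schema is omitted here (it defaults to null) purely for brevity.
val insert = Mutation(before = null,   after = newRow)  // db insert: no prior row
val delete = Mutation(before = oldRow, after = null)    // db delete: no new row
val update = Mutation(before = oldRow, after = newRow)  // db update: both sides present
val event  = Mutation(before = null,   after = newRow)  // plain event: behaves like an insert
```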
Chronon has a type system that can map to Spark's or Avro's type system. The schema is based on the table below, which lists the Java types corresponding to the Chronon schema types. The StreamDecoder should produce mutations that comply.
| Chronon Type | Java Type |
|---|---|
| IntType | java.lang.Integer |
| LongType | java.lang.Long |
| DoubleType | java.lang.Double |
| FloatType | java.lang.Float |
| ShortType | java.lang.Short |
| BooleanType | java.lang.Boolean |
| ByteType | java.lang.Byte |
| StringType | java.lang.String |
| BinaryType | Array[Byte] |
| ListType | java.util.List[Byte] |
| MapType | java.util.Map[Byte] |
| StructType | Array[Any] |
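As a concrete, hedged example, a decoder for the bank-balance topic might look like the sketch below. The CSV payload format, the class name, and the `StructType`/`StructField` construction are assumptions made for illustration; check the Chronon API for the exact schema-building constructors.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical decoder for a topic carrying bank-balance updates as UTF-8 CSV:
//   "account,balance,updated_at_millis"
class BankBalanceDecoder extends StreamDecoder {

  // Assumed StructType/StructField constructor shapes - verify against the Chronon API.
  override def schema: StructType =
    StructType("bank_balance", Array(
      StructField("account", StringType),
      StructField("balance", DoubleType),
      StructField("updated_at", LongType)
    ))

  override def decode(bytes: Array[Byte]): Mutation = {
    val Array(account, balance, updatedAt) = new String(bytes, StandardCharsets.UTF_8).split(",")
    // Values must use the Java types from the table above: String, java.lang.Double, java.lang.Long.
    val row: Array[Any] = Array(
      account,
      java.lang.Double.valueOf(balance),
      java.lang.Long.valueOf(updatedAt)
    )
    // This topic only carries inserts/events, so `before` stays null.
    Mutation(schema = schema, before = null, after = row)
  }
}
```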
Putting the stream decoder and the KV store together, you need to implement the abstract class below.
```scala
abstract class Api(userConf: Map[String, String]) extends Serializable {
  def streamDecoder(groupByServingInfoParsed: GroupByServingInfoParsed): StreamDecoder
  def genKvStore: KVStore
}
```
`userConf` is captured from command-line arguments to the `run.py` script, or to the `chronon-uber-jar` with `ai.chronon.spark.Driver` as the main class. `-Zkey1=value1 -Zkey2=value2` becomes the `{key1: value1, key2: value2}` initializer argument to the Api class. You can use that to set KVStore params, or Kafka params for streaming jobs or bulk upload jobs.
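Tying the pieces together, an `Api` implementation could look like the hedged sketch below, reusing the hypothetical `InMemoryKVStore` and `BankBalanceDecoder` sketches from the previous sections. The class name and the config keys read from `userConf` are examples, not Chronon conventions.

```scala
// Illustrative wiring of the sketches above into the Api entry point.
class MyCompanyApi(userConf: Map[String, String]) extends Api(userConf) {

  // e.g. run.py ... -Zkvstore.hosts=host1,host2  ->  userConf("kvstore.hosts") == "host1,host2"
  private val kvStoreHosts: String = userConf.getOrElse("kvstore.hosts", "localhost")

  // In practice you would pick a decoder based on the group-by's topic and schema.
  override def streamDecoder(groupByServingInfoParsed: GroupByServingInfoParsed): StreamDecoder =
    new BankBalanceDecoder()

  // Swap in your real store here, configured with `kvStoreHosts`, Kafka params, etc.
  override def genKvStore: KVStore =
    new InMemoryKVStore()
}
```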