How to Create an Effective Enterprise Data Strategy: Part 2
A Guide to How We Developed a Scalable, Secure, and Efficient Data Sharing Application to Serve Our Enterprise Data Strategy
TLDR
Introduction
Recap of Part 1
In Part 1, we discussed why having a strong enterprise data strategy is important and the architectural choices we made to fit our needs.
A big part of an enterprise data strategy is data sharing. In this article, we'll dive into how we built our data sharing application, the tools and technology we used, and the valuable lessons we picked up along the way.
The need
Before we jump in, I want to discuss the most important requirements we were aiming to solve.
We chose the Databricks Lakehouse platform for data storage. It's based on the Spark ecosystem, using Spark SQL or PySpark for data interaction. However, most users and tools don't primarily use these methods. Most of our users are financial analysts who have long used MS Excel for their workflows. We aimed to create a solution that securely provides them with authorized data access.
System Backend Architecture
To select the backend architecture, we took inspiration from how two services communicate with each other - APIs. We explored multiple API-driven architectures like REST API, GraphQL, and gRPC, but ultimately decided to go ahead with REST API due to its widespread compatibility with tools (including Excel).
We used FastAPI as the REST API framework.
We followed DRY (Don't Repeat Yourself) and DIRFT (Do It Right the First Time). Establishing the project's structure is crucial, as it ensures a streamlined API design that maintains simplicity while offering the flexibility needed to incorporate new features seamlessly.
Project Structure
├── data_sharing/
│   ├── dependency/
│   │   ├── auth.py
│   │   └── verification.py
│   ├── routers/
│   │   ├── admin.py
│   │   ├── users.py
│   │   └── table.py
│   ├── models.py
│   └── config.py
└── main.py
We designed the structure of the project keeping modularity as a priority. Not everything is crammed into main.py. Follow this doc for more info.
All the routers have their own modules. Similarly, the dependencies have their own modules.
We also have a dedicated models module to declare all the Pydantic, Enum, Dataclass, etc. models, and a module for the environment config to make the codebase truly environment-agnostic without changing a single line of code.
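To give a feel for that config module, here is a minimal sketch assuming pydantic-settings (our exact implementation may differ and the field names are illustrative, though env_config.page_size and env_config.databricks_host do appear later in the article).

# data_sharing/config.py - a minimal sketch, assuming pydantic-settings; field names are illustrative
from pydantic_settings import BaseSettings, SettingsConfigDict

class EnvConfig(BaseSettings):
    # values are picked up from the environment (or a .env file), so the same code runs in any environment
    model_config = SettingsConfigDict(env_prefix="DATA_SHARING_", env_file=".env")

    environment: str = "dev"
    databricks_host: str = ""   # e.g. read from DATA_SHARING_DATABRICKS_HOST
    page_size: int = 1000       # default page size used by the pagination logic

env_config = EnvConfig()

Everything else in the codebase imports env_config, so switching environments becomes purely a matter of environment variables.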
API Endpoints
These are some of the important endpoints:
GET /api/v1/users: Get a list of all user details.
GET /api/v1/users/{user_id}: Get the details of a specific user_id.
PUT /api/v1/users/{user_id}/rbac: Override the RBAC of the specific user_id with the provided values.
PATCH /api/v1/users/{user_id}/rbac?policy-level: Change a specific RBAC policy level of the given user_id.
DELETE /api/v1/users/{user_id}: Delete the given user_id from the system.
GET /api/v1/dataset: Get a list of all dataset details.
GET /api/v1/dataset/{dataset}: Get the details of the given dataset.
GET /api/v1/dataset/{dataset}/table: Get the list of all tables in the given dataset.
GET /api/v1/dataset/{dataset}/table/{table}: Get the table data residing in the given dataset.
POST /api/v1/dataset/{dataset}/table/{table}: Write data as a payload to the given table of the given dataset.
POST /api/v1/dataset/{dataset}/table/{table}/file: Write data as a file upload using form data to the given table of the given dataset.
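To make these endpoints concrete, here is a purely illustrative example of how a client could call the read endpoint with Python's requests library; the host name, dataset/table names, and the way the bearer token is obtained are assumptions, and the query parameter mirrors the filter options described later in the article.

# Illustrative client-side call (host, names and token acquisition are assumptions)
import requests

BASE_URL = "https://data-sharing.example.com/api/v1"
TOKEN = "<bearer token obtained via the Auth0 flow described below>"

response = requests.get(
    f"{BASE_URL}/dataset/finance/table/transactions",
    headers={"Authorization": f"Bearer {TOKEN}"},
    params={"page": 1},  # paginated read; see the filter query params later in the article
    timeout=30,
)
response.raise_for_status()
rows = response.json()  # list of row dicts returned by the API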
Authentication
The most important part of any authentication logic is getting the identity of the user. We are using Auth0 and the SSO it provides. Let's see how to implement it in FastAPI.
The biggest challenge we had was integrating SSO login with the Swagger docs. We started with the official docs provided by Auth0, but that approach is more on the manual side, geared toward the U2M OAuth flow. As I said earlier, our goal was to use SSO login instead. So we used the fastapi-auth0 Python library, which enabled us to use SSO login right inside the Swagger docs. To abstract all the complexity from the end user, we combined Security with the Dependency Injection of FastAPI.
💡 FastAPI doesn't really restrict us with Dependency Injection. We can go crazy creative using it. It's one of the best features that FastAPI provides us.
# For Auth we need a couple of models. This is written in data_sharing/models.py
from datetime import datetime
from typing import Any

from pydantic import (
    BaseModel,
    EmailStr,
    Field,
    RootModel,
)

class RootAuth0User(RootModel):
    root: dict[str, Any]

    def __iter__(self):
        return iter(self.root)

    def __getitem__(self, item):
        return self.root[item]

class Auth0APIUser(BaseModel):
    user_id: str
    primary_email: EmailStr
    sub: str
    aud: str
    iat: datetime
    exp: datetime
    azp: str
# This SSO Auth code is written in data_sharing/dependency/auth.py
from typing import Annotated

from fastapi import Depends, HTTPException, Security, status
from fastapi.security import APIKeyHeader, HTTPBasic
from fastapi_auth0.auth import Auth0, JwksDict

from data_sharing.models import (
    Auth0APIUser,
    RootAuth0User,
)

# Security flow to authenticate users using Auth0
auth0_authentication = Auth0(
    domain="<your auth0 domain>",
    api_audience="<your auth0 api audience>",
    auth0user_model=RootAuth0User,  # type: ignore
    auto_error=False,
)

async def _get_user_using_auth0_sso_flow(
    user: Annotated[RootAuth0User, Security(auth0_authentication.implicit_scheme)],
) -> RootAuth0User:
    """FastAPI Security injection to perform user auth using the Auth0 SSO flow."""
    return user

async def _get_user_using_auth0_implicit_flow(
    user: Annotated[RootAuth0User, Security(auth0_authentication.get_user)],
) -> RootAuth0User:
    """FastAPI Security injection to perform user auth using the Auth0 implicit (Bearer token) flow."""
    return user

# Once the SSO login & code exchange with Auth0 succeed, we have to convert the response to an actual user
async def get_user(
    sso_user: Annotated[RootAuth0User, Depends(_get_user_using_auth0_sso_flow)],
    implicit_user: Annotated[
        RootAuth0User, Depends(_get_user_using_auth0_implicit_flow)
    ],
) -> Auth0APIUser:
    """FastAPI Security injection to get the user based on the various auth flows."""
    # NOTE - keep adding all the auth mechanisms here to get the user
    if sso_user:
        # If you don't get a response that follows the Auth0APIUser model then you
        # may need to write a small function to convert it, like below
        # return auth0_api_response_to_user(sso_user.model_dump())
        return Auth0APIUser(**sso_user.model_dump())
    elif implicit_user:
        # return auth0_api_response_to_user(implicit_user.model_dump())
        return Auth0APIUser(**implicit_user.model_dump())
    else:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="No authorization token provided",
        )
Authorization
We cannot treat authorization as an afterthought. It's equally important. As stated earlier, we use an RBAC strategy for authorization.
Its implementation is again powered by FastAPI dependencies. (See, that's why I say I love it ❤️)
# All this authorization & verification code is written in data_sharing/dependency/verification.py
from typing import Annotated

import polars as pl
from fastapi import Depends, HTTPException, Request, status

from data_sharing.dependency.auth import get_user
from data_sharing.models import Auth0APIUser, DatasetInfoV2, UserInfo
# RBACPolicyClient, PolicyLevel, RBACPolicyViolation, FixedString, decrypt_data,
# the SQL filter helpers, logger, etc. come from our internal helper modules (not shown)

async def role_verification(user: UserInfo, claim: str) -> UserInfo:
    # checking if the requested role is present
    user_rbac = RBACPolicyClient(user_policy=user.model_dump())
    try:
        user_rbac.enforce_claim(policy_level=PolicyLevel.role, claim=claim)
    except RBACPolicyViolation as e:
        logger.warning(f"{user.primary_email} not approved for {claim}")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"{decrypt_data(ENCRYPTION_KEY, user.primary_email)} does not have role: {claim}",
        ) from e
    return user

async def scope_verification(user: UserInfo, claim: str) -> UserInfo:
    # checking if the requested scope is present
    user_rbac = RBACPolicyClient(user_policy=user.model_dump())
    try:
        user_rbac.enforce_claim(policy_level=PolicyLevel.scope, claim=claim)
    except RBACPolicyViolation as e:
        logger.warning(f"{user.primary_email} not approved for {claim}")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"{user.primary_email} does not have scope: {claim}",
        ) from e
    return user

async def permission_verification(user: UserInfo, permission: str) -> UserInfo:
    # checking if the requested permission is present
    if user.permission is not None and permission not in user.permission:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"user does not have permission: {permission}",
        )
    return user

async def user_presence_dependency(
    request: Request, user: Annotated[Auth0APIUser, Depends(get_user)],
) -> UserInfo:
    current_user_df = pl.read_database(
        current_user_filter(user.user_id), request.app.state.cursor
    )
    if current_user_df.is_empty():
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"{user.primary_email} not found",
        )
    return UserInfo.from_polars(current_user_df)

async def dataset_presence_dependency(
    request: Request, dataset: str
) -> DatasetInfoV2:
    current_dataset_df = pl.read_database(
        current_dataset_filter(dataset), request.app.state.cursor
    )
    if current_dataset_df.is_empty():
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"{dataset} not found",
        )
    return DatasetInfoV2.from_polars(current_dataset_df)

async def table_presence_dependency(
    dataset: Annotated[DatasetInfoV2, Depends(dataset_presence_dependency)], table: str
) -> str:
    if dataset.table is None or table not in dataset.table:
        raise HTTPException(
            status.HTTP_404_NOT_FOUND,
            detail=f"table: {table} not found in dataset: {dataset.dataset}",
        )
    return table

async def write_table_scope_dependency(
    user: Annotated[UserInfo, Depends(user_presence_dependency)],
    dataset: str,
    table: str,
) -> UserInfo:
    scope_condition = f"{dataset}:{table}::write"
    return await scope_verification(user=user, claim=scope_condition)

async def read_table_scope_dependency(
    user: Annotated[UserInfo, Depends(user_presence_dependency)],
    dataset: str,
    table: str,
) -> UserInfo:
    scope_condition = f"{dataset}:{table}::read"  # a read scope is present by default
    return await scope_verification(user=user, claim=scope_condition)

async def engineering_admin_role_dependency(
    user: Annotated[UserInfo, Depends(user_presence_dependency)],
) -> UserInfo:
    return await role_verification(user=user, claim=FixedString.ENGINEERING_ADMIN.value)

async def dataset_owner_role_dependency(
    user: Annotated[UserInfo, Depends(user_presence_dependency)], dataset: str
) -> UserInfo:
    return await role_verification(user=user, claim=f"{dataset}::dataset_owner")
Routers in action
Let's see a few examples of how we can combine all of the above to create API routers.
# All the user router code is written in data_sharing/routers/users.py
# SECTION - User router
router = APIRouter(prefix="/api/v1", tags=[APITags.user])

@router.get("/user/{user_id}")
async def _get_user(
    user: Annotated[UserInfo, Depends(user_presence_dependency)],
    user_id: str,
    request: Request,
) -> UserInfoV2:
    """Get the details of the specified `user` along with their access"""
    # Your respective logic to get user details will come here
    ...

@router.put(
    "/user/{user_id}/rbac",
    status_code=status.HTTP_201_CREATED,
    dependencies=[Depends(engineering_admin_role_dependency)],
)
async def _replace_user_rbac_v2(
    user_id: str, user_rbac: UserRBACInfo
) -> dict[str, str]:
    """Replace the user's complete RBAC policy.

    >**NOTE:** This will completely overwrite the existing RBAC policy of the user. Use with caution.
    """
    # Your logic to override user RBAC policies will come here
    ...
You can see how we used the right dependency injection to handle both authentication and authorization. It looks pretty neat, doesn't it? Let's check out some more examples.
@router.get(
    "/dataset/{dataset}/table/{table}",
    dependencies=[Depends(read_table_scope_dependency)],
    response_class=ORJSONResponse,
)
async def read_data(
    dataset: Annotated[DatasetInfoV2, Depends(dataset_presence_dependency)],
    table: Annotated[str, Depends(table_presence_dependency)],
    filter_query: Annotated[FilterQueryParams, Query()],
    request: Request,
) -> ORJSONResponse:
    """Get data from the desired dataset & table"""
    # Code to read the table data (producing the `query_data` dataframe) will come here
    return ORJSONResponse(query_data.to_dicts())
Here you can see we used three dependency injections in two different ways:
read_table_scope_dependency: declared in the router decorator, as this dependency is not expected to return anything.
dataset_presence_dependency & table_presence_dependency: declared as path parameters, because they return values that are used downstream.
You will also note that no user RBAC related dependency is used here. That's because the dependencies here are multilevel nested dependencies - a dependency one level above performs the user authentication & authorization.
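To illustrate how the same pieces compose for writes, here is a sketch of what the corresponding write endpoint could look like; the dependencies are the ones defined earlier, while the TableWritePayload model and the handler body are hypothetical.

# A sketch of the write endpoint; TableWritePayload and the handler body are hypothetical
@router.post(
    "/dataset/{dataset}/table/{table}",
    status_code=status.HTTP_201_CREATED,
    dependencies=[Depends(write_table_scope_dependency)],  # auth + write-scope check, nothing returned
)
async def write_data(
    dataset: Annotated[DatasetInfoV2, Depends(dataset_presence_dependency)],
    table: Annotated[str, Depends(table_presence_dependency)],
    payload: TableWritePayload,  # hypothetical Pydantic model describing the rows to write
    request: Request,
) -> dict[str, str]:
    """Write the payload data to the desired dataset & table"""
    # The write logic (e.g. the Delta merge shown later) will come here
    return {"detail": f"data written to {dataset.dataset}.{table}"}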
Interact with Databricks Lakehouse
This is the heart of the data sharing system: providing the ability to read & write data in the Lakehouse outside of the Databricks Spark ecosystem. I have already discussed the API endpoint part of it; now let's dive into the core logic.
Phase 1: Databricks Delta Sharing
Databricks has a decent native sharing product - Delta Sharing. Follow the docs on how to create a Share & Recipient (that is outside the scope of this article). Once that was done, we had to do the following.
To consume the data in Python we had two options - Spark & Pandas. We obviously chose Pandas, because that was the whole point. The syntax to read the data looks pretty straightforward on the surface:
import delta_sharing
# <profile-path>: the location of the credential file.
# <share-name>: the value of share= for the table.
# <schema-name>: the value of schema= for the table.
# <table-name>: the value of name= for the table.
data = delta_sharing.load_as_pandas(
f"<profile-path>#<share-name>.<schema-name>.<table-name>"
)
But in the practical world, it is never this straightforward. As a good governance policy, we maintained a dedicated share for every schema and a dedicated recipient for every share. This allowed us to streamline our governance policies. The goal of the API was to make all data accessible regardless of which schema it belongs to. So, we had to store multiple credential files and load them dynamically based on table names.
# We stored all the credentials (encrypted) in the database. So based on the
# given schema, the right one was read from the database
current_cred_config = await shareCredential.find_one(
    shareCredential.schema_name == schema
)
# getting the delta share credential
cred = current_cred_config.share_credential.model_dump()
# NOTE - the token is encrypted, so we need to decrypt it first
cred["bearerToken"] = decrypt_data(ENCRYPTION_KEY, cred["bearerToken"])
# WARNING - the delta sharing client needs the credential file to be physically present,
# so we delete it as soon as the client is created
with open("./config.share", "w") as f:
    json.dump(cred, f)
# NOTE - below is the specific `URL` format that the delta sharing client needs
table_url = f"./config.share#{current_cred_config.share_name}.{schema}.{table}"
data = delta_sharing.load_as_pandas(table_url)
logger.info(f"successfully retrieved {schema}.{table} for {user_info.email_id}")
# deleting the credential file
cred_config_clean_up()
Pros
Readily available sharing capability provided by Databricks.
No limit to how many catalogs can be used to share. No limit on Share & Recipient creation.
Cons
Databricks often updates the Reader and Writer protocols with new features. This means Delta Sharing needs to catch up, and we must ensure we aren't using unsupported table properties.
The credentials created in Databricks for a Recipient do expire, so we need to rotate them frequently. This can be cumbersome.
By far the biggest problem (which was eventually a deal breaker for us) with its pandas driver is that there is no support for offset. It does support limit, but without offset, limit has very limited benefits. We could not use pagination; we literally had to load all the data into memory, which is not practical.
It may not be a con, but Delta Sharing is designed for just reading data, so we could not add write functionality.
Due to all these issues, we had to make architectural changes.
Phase 2: Open source tools
As I stated in Part 1, one of the primary reasons for selecting the Databricks Lakehouse was that it uses the open-source Delta Lake format to store table data. So we started to look for other open-source tools that we could utilize here. We settled on the wonderful Polars library (if you're not already using it, you should definitely start right away! I can't recommend it enough).
It has built-in support for reading Delta Lake tables, thanks to the deltalake library. It also supports lazy reads, which work like Python generators, along with offset and limit. Plus, since we were already using Polars, we didn't need to make any big changes to our workflows, which was a huge plus for us!
# Reading Delta table data with a few filter options
import polars as pl

# setting up options for reading a delta table using polars
storage_options = {
    "account_name": "<storage_account_name>",
    "account_key": "<storage_account_key>",
}
pyarrow_options = {"parquet_read_options": {"coerce_int96_timestamp_unit": "ms"}}  # make sure to set the correct timestamp unit
data = pl.scan_delta(
    source="<path_to_delta_table>",  # EG - "abfss://<container>@<storage_account_name>.dfs.core.windows.net/path_to/delta_table"
    pyarrow_options=pyarrow_options,
    storage_options=storage_options,
)
# Creating the query plan to get the data based on the query params.
if columns:  # NOTE - We used this as a query param in fastapi to fetch only the required columns
    data = data.select(columns)
if row_count:  # NOTE - We used this as a query param in fastapi to fetch only the required number of rows
    data = data.limit(row_count)
elif page:  # NOTE - We used this as a query param in fastapi to fetch data based on pagination
    data = data.slice(
        offset=page * env_config.page_size, length=env_config.page_size
    )
# executing the query plan
query_data = await data.collect_async(streaming=True)
# NOTE - If not using async then use the code below
# query_data = data.collect(streaming=True)
# WARNING - fastapi cannot handle a polars dataframe while sending the response back, so converting
# it to python dicts
json_data = query_data.to_dicts()

# Writing data using Delta Merge
storage_options = {
    "account_name": "<storage_account>",
    "account_key": "<storage_key>",
}
merge_ops_output = (
    data.write_delta(  # NOTE - data is a Polars dataframe
        target="<storage_location>",
        mode="merge",
        storage_options=storage_options,
        delta_merge_options={
            "predicate": "<predicate>",  # condition to determine upsert requirement, e.g. "source.id = target.id"
            "source_alias": "source",
            "target_alias": "target",
        },
    )
    .when_matched_update_all()  # updating all columns for a row already present
    .when_not_matched_insert_all()  # inserting all columns for a new row
    # Delta tables support a lot more merge operations, check them out based
    # on your requirements
    .execute()
)
Pros
This uses open-source tools.
It supports proper offset & limit, which allowed us to develop pagination.
It supports all Delta Merge operations out of the box, which allowed us to develop the write data functionality.
Cons
To read or write data, we need to know the storage location of the table in advance. To simplify this for users, who only need to provide the table name, we keep a database with the storage locations of all tables. Additionally, there is a process that regularly updates this database.
The biggest issue with this approach is again the same one we had with the previous approach: the Reader & Writer protocol versions. Here too, support for the latest versions lagged behind, and we could not use the latest features developed by Databricks. As a temporary fix, we maintained two tables - the original table with all the latest table properties, and a copy stripped of those table properties (a rough sketch of that workaround is shown below).
Phase 3: Databricks Connect
They say the third time is the charm. That was the case for us too.
As discussed in the previous two phases, we had to struggle with multiple pain points. We managed them for some time, but none of them were permanent, foolproof solutions. So while maintaining the current architecture, we kept our research engine going. We tried multiple approaches & finally settled on Databricks Connect.
It's a tool provided by Databricks to run Spark (both SQL & PySpark) jobs on Databricks infrastructure remotely. It includes the Databricks SQL Connector to work with Spark SQL, and Databricks Connect for Python (based on Spark Connect) to work with PySpark.
Let's explore how to create the cursor to interact with the Delta tables in the Lakehouse. Databricks offers a bunch of connection and authentication options, which you can check out here. Since we're using it in an application, we went with OAuth machine-to-machine (M2M) authentication.
from contextlib import asynccontextmanager

from databricks.sdk.core import Config, oauth_service_principal
from databricks.sql import connect
from databricks.sql.exc import RequestError
from fastapi import FastAPI

@asynccontextmanager
async def setup_databricks_cursor(app: FastAPI):
    def _oauth_cred():
        return oauth_service_principal(db_cfg)  # db_cfg is the Databricks SDK Config shown further below

    def _get_cursor():
        try:
            return connect(
                server_hostname="<databricks warehouse/cluster host>",  # make sure to grab the serverless warehouse
                http_path="<databricks http path>",  # make sure to grab the serverless warehouse
                credentials_provider=_oauth_cred,
            ).cursor()
        except RequestError:
            logger.debug("either the token expired or the connection with databricks was lost, getting a new one")
            return _get_cursor()

    # Adding the cursor to the FastAPI app state so that it becomes available to all routers
    app.state.cursor = _get_cursor()
    logger.debug("successfully connected to databricks")
    yield
    app.state.cursor.close()
    logger.debug("successfully closed databricks connection")

app = FastAPI(
    debug=True,
    title="Your title",
    lifespan=setup_databricks_cursor,  # NOTE - here we are passing the async context manager created above as the lifespan
)
# For reading we used the Databricks SQL Connector
import polars as pl
from sqlglot import Dialects, select
from sqlglot.errors import ParseError

try:
    columns = select(*filter_query.column) if filter_query.column else select("*")  # NOTE - We used this as a query param in fastapi to fetch only the required columns
    query = columns.from_(f"{dataset.catalog}.{dataset.dataset}.{table}")  # Full table name. EG - "catalog_name.dataset_name.table_name"
    if filter_query.filter:  # NOTE - We used this as a query param in fastapi to filter the data based on a SQL WHERE clause
        query = query.where(filter_query.filter)
    if filter_query.order_by:  # NOTE - We used this as a query param in fastapi to order the data based on a SQL ORDER BY clause
        query = query.order_by(filter_query.order_by)
    if filter_query.row_count:  # NOTE - We used this as a query param in fastapi to limit the number of rows fetched
        query = query.limit(filter_query.row_count)
    elif filter_query.page:  # NOTE - We used this as a query param in fastapi to enable pagination
        query = query.offset((filter_query.page - 1) * 1000).limit(1000)
    # executing the query plan
    query_data = pl.read_database(
        query.sql(dialect=Dialects.DATABRICKS), request.app.state.cursor  # SQLGlot converts the above query to the Databricks SQL dialect
    )
    data = ORJSONResponse(query_data.to_dicts())  # FastAPI has native support for ORJSONResponse, which is faster than JSONResponse
# Error coming from SQLGlot due to an incorrect sql query
except ParseError as e:
    # You can add your custom exception handling here, e.g.
    raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) from e
One important point to note when using Databricks Connect is that the connection gets destroyed after 10 minutes of inactivity, and we have to manually create a new connection again. To solve this we used a Python context manager.
# Setting up Databricks Connect to execute PySpark code
from contextlib import contextmanager

from databricks.connect import DatabricksSession
from databricks.sdk.core import Config

@contextmanager
def spark_manager(profile: str | Config = "DEFAULT"):
    if isinstance(profile, str):
        cfg = _get_databricks_config(profile)
    # NOTE - It's not necessary to use a Config. But I am using it to make it more flexible &
    # able to use any profile/environment as needed.
    if isinstance(profile, Config):
        cfg = profile
    spark = DatabricksSession.builder.sdkConfig(cfg).getOrCreate()
    yield spark
    # NOTE - We don't need to close the session as it will be automatically closed after 10
    # minutes of inactivity by databricks. Also, one more added benefit is that some other workload
    # can use the same session if it's not closed.
    # WARNING - If you still want to close the session, you can uncomment the below line.
    # spark.stop()

db_cfg = Config(
    host=env_config.databricks_host,
    client_id="<databricks service principal client id>",
    client_secret="<databricks service principal client secret>",
    serverless_compute_id="auto",  # To use a serverless cluster
)

# This will make sure we always get a connection. If there is already one,
# it will be used. If not, a new one will be created & used.
with spark_manager(db_cfg) as spark:
    # your PySpark code will come here
    ...
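For completeness, here is a hedged sketch of how a write could look through this PySpark path; the payload_rows, the catalog/dataset/table names, and the id merge key are assumptions, not our exact code.

# Hypothetical write through Databricks Connect; names and the merge key are assumptions
with spark_manager(db_cfg) as spark:
    source_df = spark.createDataFrame(payload_rows)      # rows coming from the POST request body
    source_df.createOrReplaceTempView("incoming_rows")   # expose the payload to Spark SQL
    spark.sql(
        f"""
        MERGE INTO {catalog}.{dataset}.{table} AS target
        USING incoming_rows AS source
        ON source.id = target.id
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
        """
    )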
Pros
No limitations on Delta tables with any table properties. In fact, we can even query views, which was not possible in the first two phases.
Since we are using Databricks SQL, it brings all sorts of SQL capabilities like SELECT, OFFSET, LIMIT, WHERE, ORDER BY, etc.
We can use Databricks serverless compute, which is shared with other workloads, so it requires no additional infrastructure cost.
No need to maintain any temporary solutions.
Cons
Vendor lock-in, as we have to use tools provided by Databricks.
Conclusion
Wrapping up, creating a solid enterprise data strategy is all about planning and executing well across different stages.
Our journey from Databricks Delta Sharing to open-source tools like Polars, and finally to Databricks Connect, shows how important it is to stay flexible and keep improving to tackle challenges like data access and performance.
By focusing on things like modularity, authentication, and authorization, and embracing open-source solutions, businesses can create a scalable and efficient data strategy that keeps up with what users need.