Custom Source¶
Nilus includes a growing collection of built-in connectors for commonly used databases, APIs, and platforms. For data sources that are not natively supported, integration is enabled through Custom Sources.
Custom Sources allow the development of user-defined connectors while utilizing Nilus infrastructure for orchestration, incremental data loading, and movement into the DataOS Lakehouse or other Supported Destinations.
Implementation Overview of Custom Sources¶
- Custom Sources are used to extend the Nilus platform with additional connectors.
- A Custom Source is implemented by subclassing the `CustomSource` class and defining one or more resources.
- Configuration and parameterization are handled through URI-based connection strings.
- Incremental loading is supported when state tracking is enabled within the connector.
- Custom sources are deployed by referring to the custom source repository within a Nilus workflow.
Warning
Dependency Management for Custom Sources
- Only include new dependencies required by the custom source or destination.
- Do not add libraries that are already part of the existing environment.
- This prevents duplication and avoids version conflicts.
Pre-Installed Packages: See `requirements.txt` for the list of packages already installed in the environment.
Implementation Guide¶
Here's a streamlined guide to help you implement custom sources for user-defined connectors effectively.
Step 1: Base Class Requirements
All custom sources must subclass the `CustomSource` base class:

```python
from nilus import CustomSource

class MyCustomSource(CustomSource):
    def handles_incrementality(self) -> bool:
        return False

    def nilus_source(self, uri: str, table: str, **kwargs):
        return my_source_function(uri, table, **kwargs)
```
- `handles_incrementality()`: Returns `True` if the connector supports incremental loading.
- `nilus_source()`: Defines the entry point Nilus will invoke during execution.
Step 2: Resource Definition
Resources represent discrete data-fetching operations. Each resource must be defined as a Python generator function and registered using Nilus decorators:
```python
import nilus

@nilus.resource()
def fetch_data():
    for item in data:
        yield item

@nilus.source
def my_source_function(uri, table, **kwargs):
    return fetch_data()
```

- `@nilus.resource()`: Used to annotate resource functions.
- `@nilus.source`: Used to register the source entry point.
- Resource functions must yield one item at a time.
Step 3: URI and Parameter Handling
Nilus uses URI-based connection strings to configure custom sources. Parameters can be passed using standard query syntax:
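For instance, a connection string for a custom source might look like the following (the parameter names are illustrative):

```text
custom://MyCustomSource?param1=value1&param2=value2
```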
To extract parameters:
```python
from urllib.parse import urlparse, parse_qs

def nilus_source(self, uri: str, table: str, **kwargs):
    parsed_uri = urlparse(uri)
    params = parse_qs(parsed_uri.query)
    return my_source_function(
        uri=uri,
        table=table,
        param1=params.get("param1", [None])[0],
        param2=params.get("param2", [None])[0],
        **kwargs
    )
```
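As a quick illustration of the extraction pattern above, `urlparse` and `parse_qs` from the standard library split a connection string into its parts (the URI below is illustrative):

```python
from urllib.parse import urlparse, parse_qs

# An illustrative custom-source URI with query parameters.
uri = "custom://MyCustomSource?param1=value1&param2=value2"

parsed_uri = urlparse(uri)
params = parse_qs(parsed_uri.query)

# parse_qs returns a list of values per key, so take the first element.
param1 = params.get("param1", [None])[0]
param2 = params.get("param2", [None])[0]
param3 = params.get("param3", [None])[0]  # absent keys fall back to None
```

Note that `parse_qs` maps each key to a *list* of values, which is why the code indexes `[0]` and supplies `[None]` as the default.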
Step 4: Incremental Loading
To support incremental data extraction, state tracking must be implemented. Nilus persists resource state between runs:
```python
class MyIncrementalSource(CustomSource):
    def handles_incrementality(self) -> bool:
        return True

@nilus.resource()
def fetch_incremental_data(last_processed_id=None):
    state = nilus.current.resource_state()
    last_id = state.get("last_id", last_processed_id)
    data = get_data_after_id(last_id)
    if data:
        state["last_id"] = data[-1]["id"]
    for item in data:
        yield item
```
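The state API (`nilus.current.resource_state()`) is specific to the platform, but the incremental pattern itself can be sketched with a plain dict standing in for the persisted state. The `get_data_after_id` helper and the records below are hypothetical:

```python
# A plain dict stands in for the state Nilus would persist between runs.
state = {}

def get_data_after_id(last_id, source):
    # Hypothetical helper: return only records newer than the checkpoint.
    return [r for r in source if last_id is None or r["id"] > last_id]

def fetch_incremental(source):
    last_id = state.get("last_id")
    data = get_data_after_id(last_id, source)
    if data:
        # Advance the checkpoint to the last record seen.
        state["last_id"] = data[-1]["id"]
    yield from data

source = [{"id": 1}, {"id": 2}, {"id": 3}]
first = list(fetch_incremental(source))   # first run: all records

source.append({"id": 4})
second = list(fetch_incremental(source))  # second run: only the new record
```

The key design point is that the checkpoint is only advanced after records are selected, so a run that sees no new data leaves the state untouched.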
Step 5: Error Handling
Connection and data-fetching errors must be handled explicitly to ensure reliability:
Info
If `table_name` is not defined in the `@nilus.resource` decorator, it defaults to the function name. Check the Example Implementations section below for more details.
```python
@nilus.resource()
def fetch_data_with_error_handling():
    try:
        conn = establish_connection()
        try:
            data = conn.get_data()
            for item in data:
                yield item
        finally:
            conn.close()
    except Exception as e:
        logger.error(f"Error fetching data: {str(e)}")
        raise
```
Step 6: Performance Optimization
For large datasets, batching and pagination must be implemented to ensure efficiency:
```python
@nilus.resource()
def fetch_large_dataset(batch_size=1000):
    page = 1
    has_more = True
    while has_more:
        data, has_more = get_paginated_data(page=page, size=batch_size)
        for item in data:
            yield item
        page += 1
```
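The pagination loop above can be exercised end to end with an in-memory dataset standing in for a real backend; `get_paginated_data` here is a hypothetical helper written only to make the sketch runnable:

```python
# In-memory stand-in for a paginated backend.
DATASET = list(range(25))

def get_paginated_data(page, size):
    # Hypothetical helper: return one page and whether more pages remain.
    start = (page - 1) * size
    chunk = DATASET[start:start + size]
    has_more = start + size < len(DATASET)
    return chunk, has_more

def fetch_large_dataset(batch_size=10):
    page = 1
    has_more = True
    while has_more:
        data, has_more = get_paginated_data(page=page, size=batch_size)
        for item in data:
            yield item
        page += 1

result = list(fetch_large_dataset())
```

Because the resource yields one item at a time, memory usage stays bounded by the batch size rather than the total dataset.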
Deployment Configuration¶
After creating the source files for your custom sources, deploy them using the Workflow as follows:
Repository Structure¶
Custom sources must follow a standard Python package layout:
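An illustrative layout, using the file names from the example below (directory and module names are hypothetical):

```text
my-custom-sources/
├── __init__.py
├── nilus_custom_source.py   # CustomSource subclass (entry point)
├── random_users.py          # resource and source definitions
└── requirements.txt         # only the extra dependencies this source needs
```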
Workflow Configuration¶
To deploy the custom source, reference the repository in a Nilus Workflow:
```yaml
repo:
  url: "https://github.com/your-repo"
  syncFlags:
    - "--ref=main"
  baseDir: "my-custom-sources"
source:
  address: custom://MyCustomSource?param1=value1
  options:
    source-table: "my_data"
sink:
  address: dataos://lakehouse
  options:
    incremental-strategy: append
    dest-table: "raw"
```
Example Implementations¶
The following examples illustrate various methods for creating custom sources in Nilus. Each example shows how resources and sources can be defined for specific use cases.
- Static Source

  This example produces a small, fixed dataset.

  ```python
  @nilus.resource()
  def fetch_random_users():
      for d in [{"id": 1, "name": "Alice"}]:
          yield d

  @nilus.source
  def random_user_source(uri, table, **kwargs):
      return fetch_random_users()
  ```

  - `fetch_random_users`: Defines a resource that yields static data.
  - `random_user_source`: Registers the entry point Nilus uses to execute the resource.
- API Integration

  This example shows how to fetch data from an external API.

  ```python
  import requests

  @nilus.resource()
  def fetch_api_data(api_url, api_key, endpoint):
      headers = {"Authorization": f"Bearer {api_key}"}
      data = requests.get(f"{api_url}/{endpoint}", headers=headers).json()
      for item in data:
          yield item
  ```

  - API parameters (`api_url`, `api_key`, `endpoint`) are passed into the function.
  - The API response is iterated, and each record is yielded individually.
- Multi-Resource Source

  This example defines multiple resources within a single source.

  ```python
  @nilus.resource(write_disposition="replace")
  def users():
      yield from fetch_users()

  @nilus.resource(write_disposition="append")
  def activities():
      yield from fetch_activities()

  @nilus.source
  def multi_resource_source(uri, table, **kwargs):
      return users(), activities()
  ```

  - Users resource: Produces user records and replaces existing data.
  - Activities resource: Produces activity records and appends them to existing data.
  - `multi_resource_source`: Groups both resources so Nilus can run them together.
Creating a Random User Custom Source¶
This example shows how to implement a custom source that returns a static dataset of random users. It illustrates how to separate resource logic from the `CustomSource` implementation.
random_users.py

```python
import nilus

# Dummy static dataset
dummy_data = [
    {
        "gender": "female",
        "name": {"title": "Ms", "first": "Aisha", "last": "Patel"},
        "location": {
            "city": "Bengaluru",
            "state": "Karnataka",
            "country": "India"
        },
        "email": "aisha.patel@example.com",
        "dob": {"date": "1988-04-12T10:15:30.000Z", "age": 36},
        "phone": "080-12345678"
    },
    {
        "gender": "male",
        "name": {"title": "Mr", "first": "Rahul", "last": "Sharma"},
        "location": {
            "city": "Mumbai",
            "state": "Maharashtra",
            "country": "India"
        },
        "email": "rahul.sharma@example.com",
        "dob": {"date": "1992-11-05T08:05:10.000Z", "age": 32},
        "phone": "022-23456789"
    },
    {
        "gender": "female",
        "name": {"title": "Mrs", "first": "Neha", "last": "Reddy"},
        "location": {
            "city": "Chennai",
            "state": "Tamil Nadu",
            "country": "India"
        },
        "email": "neha.reddy@example.com",
        "dob": {"date": "1975-07-22T12:30:45.000Z", "age": 49},
        "phone": "044-34567890"
    }
]

@nilus.resource()
def fetch_random_users():
    for d in dummy_data:
        yield d

@nilus.source
def random_user_source(uri, table, **kwargs):
    return fetch_random_users()
```
nilus_custom_source.py

```python
from examples.custom_source.random_users import random_user_source
from nilus import CustomSource

class RandomUserSource(CustomSource):
    def handles_incrementality(self) -> bool:
        return False

    def nilus_source(self, uri: str, table: str, **kwargs):
        return random_user_source(uri, table, **kwargs)
```
How It Works¶
- random_users.py
  - Defines a resource (`fetch_random_users`) that yields static user data.
  - Registers a Nilus source (`random_user_source`) to expose the resource.
- nilus_custom_source.py
  - Wraps the source in a `CustomSource` class (`RandomUserSource`). Nilus expects this class when running the custom source.
- The destination table name is determined as follows:
  - Explicitly, through the `table_name` parameter in the `@nilus.resource` decorator.
  - If `table_name` is not defined in the `@nilus.resource` decorator, it defaults to the function name.
  - In `random_users.py`, since the `table_name` parameter is not defined in `@nilus.resource()`, the destination table name will be `fetch_random_users` (the function name).
- Nilus executes the custom source like any other built-in connector, streaming the yielded records into downstream pipelines.
Best Practices¶
When implementing custom sources, follow these guidelines to ensure maintainability, performance, and reliability:
- Code Organization
- Keep resources, sources, and utility functions in separate modules.
- Maintain a clear repository structure.
- Error Handling
- Implement logging for all failures.
- Ensure errors are handled gracefully without leaving open connections.
- Performance
- Use batching or pagination for large datasets.
- Apply indexing where applicable to optimize data retrieval.
- Testing
- Test sources independently before deploying in a workflow.
- Validate correctness and data completeness.
- Documentation
- Document all required parameters for each source.
- Provide usage examples where possible.
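For example, a resource's logic can be exercised directly before it is wired into a workflow. The snippet below is a plain-generator sketch of the static-dataset example; in a real source the function would carry the `@nilus.resource()` decorator:

```python
# A resource is ultimately a generator, so its logic can be tested
# without the platform running.
dummy_data = [
    {"id": 1, "name": "Alice"},
    {"id": 2, "name": "Bob"},
]

def fetch_random_users():
    for d in dummy_data:
        yield d

records = list(fetch_random_users())

# Validate correctness and data completeness.
assert len(records) == len(dummy_data)
assert all({"id", "name"} <= set(r) for r in records)
```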
Security Considerations¶
Custom Sources must follow secure development practices to protect credentials, control access, and ensure data compliance.
- Secrets Management
- Store sensitive credentials in DataOS Secrets or environment variables.
- Avoid hardcoding API keys or credentials in the source code.
- Rotate authentication tokens and API keys regularly.
- Access Control
- Apply the principle of least privilege to database or API users.
- Monitor and audit access logs for suspicious activity.
- Data Protection
- Use TLS or equivalent encryption when connecting to external APIs or databases.
- Mask, encrypt, or hash sensitive fields when required.
- Ensure compliance with relevant regulations such as GDPR or HIPAA.
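As a sketch of the secrets-management guidance above, credentials can be read from the environment at runtime rather than hardcoded; the variable name `MY_SOURCE_API_KEY` below is illustrative:

```python
import os

def get_api_key(env_var="MY_SOURCE_API_KEY"):
    """Read a credential from the environment instead of hardcoding it."""
    api_key = os.environ.get(env_var)
    if api_key is None:
        # Fail fast with a clear message instead of sending an empty token.
        raise RuntimeError(
            f"{env_var} is not set; configure it via DataOS Secrets "
            "or the environment"
        )
    return api_key

# Simulate the platform injecting the secret, then read it back.
os.environ["MY_SOURCE_API_KEY"] = "dummy-token"
headers = {"Authorization": f"Bearer {get_api_key()}"}
```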