SDK

frank-sdk is the Python contract for custom transform authors. It gives Python-runner patterns a stable way to read runtime config, query data, emit metrics and lineage, and return structured results to the platform.

Install

From the shared utilities package:

bash
cd utils

# Core SDK
pip install -e ".[sdk]"

# With Trino support
pip install -e ".[sdk-trino]"

# With pandas and pyarrow
pip install -e ".[sdk-pandas]"

# Full local data stack
pip install -e ".[sdk-full]"

In packaged environments:

bash
pip install "frank-shared[sdk-full]"

Runtime contract

Python-runner transforms receive configuration through the TRANSFORM_CONFIG environment variable as a JSON document.

json
{
  "artifact_id": "uuid",
  "run_id": "uuid",
  "tenant_id": "uuid",
  "source_table": "iceberg.bronze.orders",
  "source_tables": ["iceberg.bronze.orders"],
  "target_table": "iceberg.silver.orders_clean",
  "params": {
    "filter_expression": "status != 'deleted'"
  },
  "cursors": {
    "input_cursors": {
      "iceberg.bronze.orders": {
        "mode": "delta",
        "cursor_field": "_extracted_at",
        "cursor_value": "2026-02-01T00:00:00Z"
      }
    },
    "cutoff_cursors": {
      "iceberg.bronze.orders": {
        "cursor_value": "2026-02-04T12:00:00Z"
      }
    }
  }
}

The transform writes one final FrankResult JSON object to stdout. Logs, metrics, progress, and lineage should go through SDK emitters.
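FrankContext.from_env() handles config parsing for you. As a rough sketch of what the contract implies (the helper name below is illustrative, not part of the SDK):

```python
import json
import os


def load_transform_config() -> dict:
    """Illustrative sketch: parse the TRANSFORM_CONFIG JSON the platform injects."""
    return json.loads(os.environ.get("TRANSFORM_CONFIG", "{}"))


if __name__ == "__main__":
    config = load_transform_config()
    # Pull out the same fields FrankContext exposes as properties.
    print(config.get("target_table"), config.get("params", {}))
```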

Minimal transform

python
from frank_sdk import FrankContext, FrankResult


def main():
    ctx = FrankContext.from_env()
    result = FrankResult.success(
        output_row_count=0,
        metrics={
            "mode": ctx.get_param("mode", "default"),
            "target_table": ctx.target_table,
        },
    )
    result.write_to_stdout()


if __name__ == "__main__":
    main()

Querying with Trino

python
from frank_sdk import FrankContext, FrankResult, emit_metric, emit_lineage


def main():
    ctx = FrankContext.from_env()
    conn = ctx.get_trino_connection()
    cur = conn.cursor()

    where = ctx.build_incremental_filter(ctx.source_table)
    cur.execute(f"""
        CREATE TABLE {ctx.target_table} AS
        SELECT *
        FROM {ctx.source_table}
        WHERE {where}
    """)

    cur.execute(f"SELECT COUNT(*) FROM {ctx.target_table}")
    count = cur.fetchone()[0]

    emit_metric("rows_processed", count)
    emit_lineage(
        source=ctx.source_table,
        target=ctx.target_table,
        operation="copy",
    )

    FrankResult.success(output_row_count=count).write_to_stdout()


if __name__ == "__main__":
    main()

Connection environment variables:

| Variable | Default |
| --- | --- |
| TRINO_HOST | localhost |
| TRINO_PORT | 8080 |
| TRINO_USER | frank-transform |
| TRINO_CATALOG | iceberg |
| TRINO_SCHEMA | unset |
| TRINO_PASSWORD | unset |
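A transform normally just calls ctx.get_trino_connection(), but if you need the resolved settings yourself, a stdlib-only sketch of the defaults above (the helper function is illustrative, not an SDK API):

```python
import os


def trino_settings() -> dict:
    """Illustrative sketch: resolve Trino settings using the documented defaults."""
    return {
        "host": os.environ.get("TRINO_HOST", "localhost"),
        "port": int(os.environ.get("TRINO_PORT", "8080")),
        "user": os.environ.get("TRINO_USER", "frank-transform"),
        "catalog": os.environ.get("TRINO_CATALOG", "iceberg"),
        "schema": os.environ.get("TRINO_SCHEMA"),      # unset by default
        "password": os.environ.get("TRINO_PASSWORD"),  # unset by default
    }
```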

FrankContext

Important properties:

| Property | Meaning |
| --- | --- |
| artifact_id | Transform artifact UUID. |
| run_id | Transform run UUID. |
| tenant_id | Tenant UUID. |
| source_table | Primary input table. |
| source_tables | All input tables. |
| target_table | Output table. |
| params | Pattern or transform parameters. |
| cursors | Input and cutoff cursor state. |
| raw_config | Full runtime config. |

Useful methods:

python
ctx.get_param("key", default=None)
ctx.require_param("key")
ctx.get_trino_connection()
ctx.get_input_cursor(table)
ctx.get_cutoff_cursor(table)
ctx.build_incremental_filter(table)
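For delta inputs, build_incremental_filter presumably combines the input cursor and cutoff cursor from the runtime contract into a WHERE fragment. The SDK does this for you; one plausible reading of the behavior (the bound semantics and the exact SQL are assumptions, not confirmed SDK output):

```python
from typing import Optional


def incremental_filter(input_cursor: Optional[dict], cutoff_cursor: Optional[dict]) -> str:
    """Illustrative sketch of an incremental WHERE fragment.

    Assumes an exclusive lower bound from the input cursor and an inclusive
    upper bound from the cutoff cursor; the SDK's actual semantics may differ.
    """
    clauses = []
    if input_cursor and input_cursor.get("mode") == "delta":
        field = input_cursor["cursor_field"]
        clauses.append(f"{field} > '{input_cursor['cursor_value']}'")
        if cutoff_cursor:
            clauses.append(f"{field} <= '{cutoff_cursor['cursor_value']}'")
    # Full-refresh inputs get a pass-through predicate.
    return " AND ".join(clauses) if clauses else "TRUE"
```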

FrankResult

Use factory helpers:

python
FrankResult.success(output_row_count=100, input_row_count=120)
FrankResult.failure("Invalid source data", {"column": "id"})
FrankResult.partial(output_row_count=80, error_message="20 rows skipped")

Add metrics and data quality:

python
from frank_sdk import FrankResult, DataQualityResult

result = FrankResult.success(output_row_count=100)
result.add_metric("duplicates_removed", 5)
result.add_data_quality_result(
    DataQualityResult(
        rule_name="non_null_id",
        passed=True,
        total_rows=100,
        failed_rows=0,
    )
)
result.write_to_stdout()

Output fields:

| Field | Meaning |
| --- | --- |
| status | success, failure, or partial. |
| output_row_count | Rows written. |
| input_row_count | Rows read. |
| rows_affected | Rows changed by update/delete style operations. |
| metrics | Custom JSON-serializable metrics. |
| data_quality | Data quality check results. |
| error_message | Human-readable error. |
| error_details | Structured error context. |
| output_snapshot_id | Iceberg snapshot ID. |
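Put together, the final object written to stdout plausibly looks like the following (field names come from the list above; the exact serialization is up to the SDK):

```json
{
  "status": "success",
  "output_row_count": 100,
  "input_row_count": 120,
  "metrics": {"duplicates_removed": 5},
  "data_quality": [
    {"rule_name": "non_null_id", "passed": true, "total_rows": 100, "failed_rows": 0}
  ],
  "error_message": null
}
```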

Structured logging, metrics, and lineage

python
from frank_sdk import emit_log, emit_warning, emit_metric, emit_progress, emit_lineage

emit_log("Starting transform", context={"table": "orders"})
emit_metric("rows_processed", 1000, tags={"stage": "clean"})
emit_progress(current=5, total=10, message="Processed batch 5")
emit_warning("Skipped rows with invalid dates", {"count": 3})
emit_lineage(
    source=["iceberg.bronze.orders", "iceberg.bronze.customers"],
    target="iceberg.gold.customer_orders",
    operation="join",
    columns={"customer_id": ["orders.customer_id", "customers.id"]},
)

Emitters write structured records to stderr for platform capture while keeping stdout reserved for the final FrankResult.
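The stderr record format is platform-internal, but the stdout/stderr split is easy to picture. A hypothetical sketch (the helper names and the {"type": ...} envelope are illustrative, not the SDK's actual wire format):

```python
import json
import sys


def format_record(record_type: str, payload: dict) -> str:
    """Illustrative: one JSON record per line, tagged with its type."""
    return json.dumps({"type": record_type, **payload})


def emit(record_type: str, payload: dict) -> None:
    # stderr carries structured records; stdout stays reserved for the final FrankResult.
    print(format_record(record_type, payload), file=sys.stderr)


emit("metric", {"name": "rows_processed", "value": 1000, "tags": {"stage": "clean"}})
```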

Local testing

Pair the SDK with the Python pattern CLI:

bash
frank init my-pattern --template python
cd patterns/my-pattern
frank validate .
frank test . --config tests/sample_config.json --env-file .env.test

Use test fixtures in tests/sample_config.json to create Trino tables, run assertions, and clean up after the transform.

Frank is built by aiaiai-pt.