Download this example as a Jupyter notebook or a Python script.


Granta MI Scripting Toolkit example#

Introduction#

This example uses the PyGranta Data Flow Extensions package to interact with a Granta MI system via the Granta MI Scripting Toolkit. It uploads the data payload received from MI Data Flow to the workflow record. This logic could be replaced with any other business logic that requires access to Granta MI resources.

Prerequisites#

This example assumes the workflow has been configured with the ‘Metals Pedigree’ table in the MI Training database, but it includes guidance on adjusting the Python code to work with a different database schema; the schema-specific names are summarized below.
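For reference, the following snippet collects the schema-specific names that appear in this example. If your workflow targets a different schema, substitute your own database key, table name, and long text attribute name wherever these values occur:

# Schema-specific names used in this example (from the MI Training database)
db_key = "MI_Training"  # database key (read from the payload at runtime)
table_name = "Metals Pedigree"  # table referenced by the workflow definition
attribute_name = "Additional Processing Notes"  # long text attribute written to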

Info:

Running this notebook requires the Granta MI Scripting Toolkit package. If you do not have access to the Granta MI Scripting Toolkit, contact your System Administrator.
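If you are unsure whether the Scripting Toolkit is available in your environment, a quick check such as the following can help. This is a minimal sketch; "GRANTA_MIScriptingToolkit" is the Scripting Toolkit's usual module name, so adjust it if your installation differs.

import importlib.util

# Look up the Scripting Toolkit module without importing it
if importlib.util.find_spec("GRANTA_MIScriptingToolkit") is None:
    raise SystemExit("Granta MI Scripting Toolkit is not installed.")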

Example script#

[1]:
import logging
import traceback

from ansys.grantami.dataflow_extensions import MIDataflowIntegration

# Create an instance of the root logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Add a StreamHandler to write the output to stderr
ch = logging.StreamHandler()
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
ch.setFormatter(formatter)
logger.addHandler(ch)


def main():
    """
    Initializes the MI Data Flow integration module, runs the business logic,
    and cleans up once execution has completed.
    """

    # Ansys strongly recommends using HTTPS in production environments.
    # If you are using an internal certificate, specify the CA certificate
    # with certificate_filename="my_cert_file.crt" and add the certificate
    # to the workflow as a supporting file, or provide an absolute
    # pathlib.Path object pointing to the file on disk.
    # Refer to the MIDataflowIntegration API reference page for more details.
    dataflow_integration = MIDataflowIntegration(use_https=False)
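
    # For example, an HTTPS configuration with an internal CA certificate
    # might look like this (the filename below is illustrative):
    # dataflow_integration = MIDataflowIntegration(
    #     use_https=True,
    #     certificate_filename="my_cert_file.crt",
    # )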

    try:
        step_logic(dataflow_integration)
        exit_code = 0
    except Exception:
        traceback.print_exc()
        exit_code = 1
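
    # Resume the workflow in MI Data Flow, reporting success (0) or failure (1)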
    dataflow_integration.resume_bookmark(exit_code)


def testing():
    """Contains a static copy of a Data Flow data payload for testing purposes"""
    dataflow_payload = {
        "WorkflowId": "67eb55ff-363a-42c7-9793-df363f1ecc83",
        "WorkflowDefinitionId": "Example; Version=1.0.0.0",
        "TransitionName": "Python_83e51914-3752-40d0-8350-c096674873e2",
        "Record": {
            "Database": "MI_Training",
            "Table": "Metals Pedigree",
            "RecordHistoryGuid": "d2f51a3d-c274-4a1e-b7c9-8ba2976202cc",
        },
        "WorkflowUrl": "http://my_server_name/mi_dataflow",
        "AuthorizationHeader": "",
        "ClientCredentialType": "Windows",
        "Attributes": {
            "Record": {"Value": ["d2f51a3d-c274-4a1e-b7c9-8ba2976202cc+MI_Training"]},
            "TransitionId": {"Value": "9f1bf6e7-0b05-4cd3-ac61-1d2d11a1d351"},
        },
        "CustomValues": {},
    }
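
    # Tip: to capture a payload from a real workflow run, log the value
    # returned by dataflow_integration.get_payload_as_string() during
    # execution and paste the result into "dataflow_payload" above.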

    # Call MIDataflowIntegration constructor with "dataflow_payload" argument
    # instead of reading data from Data Flow.
    dataflow_integration = MIDataflowIntegration.from_dict_payload(
        dataflow_payload=dataflow_payload,
        use_https=False,
    )
    step_logic(dataflow_integration)


def step_logic(dataflow_integration):
    """Contains the business logic to be executed as part of the workflow.

    In this example, the record that is the subject of the workflow
    operation is identified, and the Data Flow payload is uploaded to
    that record.

    Replace the code in this function with your custom business logic.
    """
    payload = dataflow_integration.get_payload_as_dict()
    mi_session = dataflow_integration.get_scripting_toolkit_session()

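    # Use the session to fetch the record identified in the payload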
    db_key = payload["Record"]["Database"]
    db = mi_session.get_db(db_key=db_key)
    record_hguid = payload["Record"]["RecordHistoryGuid"]
    rec = db.get_record_by_id(hguid=record_hguid)

    # Write the JSON received from the Data Flow API to the
    # "Additional Processing Notes" attribute
    data = dataflow_integration.get_payload_as_string(
        indent=True,
        include_credentials=False,
    )

    # To import the payload into a different attribute, change
    # the name here. Specify any long text attribute in the
    # table included in the workflow definition.
    attribute_name = "Additional Processing Notes"
    attribute = rec.attributes[attribute_name]
    attribute.value = data
    rec.set_attributes([attribute])

    # Update the record in the Granta MI database
    mi_session.update([rec])
    logger.info("Updated MI database")  # This output will be visible in the api/logs page


if __name__ == "__main__":
    # main()  # Used when running the script as part of a workflow
    testing()  # Used when testing the script manually
2025-06-06 13:49:50,002 - ansys.grantami.dataflow_extensions - INFO -
2025-06-06 13:49:50,002 - ansys.grantami.dataflow_extensions - INFO - ---------- Initializing new Data Flow Extensions instance ----------
2025-06-06 13:49:50,002 - ansys.grantami.dataflow_extensions - INFO - ------------------- Initialization complete --------------------
2025-06-06 13:49:50,002 - root - INFO - Updated MI database