Oracle Workflow Persistence Service for Workflows

Posted on June 10, 2008. Filed under: c#, workflow | Tags: , , , , |

Workflow comes with inbuilt MSSQL persistence Service. For other databases you have to create a custom persistence service. I have tried to do the same. Presented below is the basic most Oracle Workflow persistence Service. It has all the elements you need for a basic purpose. I have put appropriate comments with in the code.

To write a Custom Workflow Persistence Service you need to implement abstract class WorkflowPersistenceService and provide implmentations to the methods it contains.

A brief about all the methods is as follows:

  • SaveWorkflowInstanceState: Saves the current state of the workflow to the database
  • LoadWorkflowInstanceState: Load an entire workflow from the database, on receiving any events.
  • SaveCompletedContextActivity: Once an activity is completed persist it to the database.
  • LoadCompletedContextActivity: Load an activity from the database, on receiving any events.
  • UnloadOnIdle: Always set it to true to unload and persist your workflow when its idled.
  • Serialize: To insert/update in to the database
  • Deserialize: To retrieve from the database
  • DeleteWorkflow: Delete a completed workflow and all the related activities
  • GetAllWorkflows: Getting all workflows persisted in the database.
using System;
using System.IO;
using System.Collections.Generic;
using System.Workflow.Runtime;
using System.Workflow.Runtime.Hosting;
using System.Workflow.ComponentModel;
using System.Data.OleDb;
using System.Data;
using System.Data.OracleClient;

namespace SharedWorkflows
{
    ///
    /// Oracle based workflow persistence service: Created by Sunil
    ///
    public class OracleWorkflowPersistenceService: WorkflowPersistenceService
    {
        private string dbconnectionString;
        public OracleWorkflowPersistenceService(string connectionString)
        {
            this.dbconnectionString = connectionString;
        }

        #region Abstract method implementations

        ///
        /// Persist the current state of the entire workflow
        ///
        protected override void SaveWorkflowInstanceState(Activity rootActivity, bool unlock)
        {
            //get the workflow instance Id
            Guid instanceId = WorkflowEnvironment.WorkflowInstanceId;

            //determine the status of the workflow
            WorkflowStatus status = WorkflowPersistenceService.GetWorkflowStatus(rootActivity);
            switch (status)
            {
                case WorkflowStatus.Completed:
                case WorkflowStatus.Terminated:
                    //delete the persisted workflow
                    DeleteWorkflow(instanceId);
                    break;
                default:
                    //save the workflow
                    Serialize(instanceId, Guid.Empty, rootActivity);
                    break;
            }
        }

        ///
        /// Load an entire workflow
        ///
        protected override Activity LoadWorkflowInstanceState(
            Guid instanceId)
        {
            Activity activity = Deserialize(instanceId, Guid.Empty, null);
            if (activity == null)
            {
                ThrowException(instanceId, "Unable to deserialize workflow", null);
            }
            return activity;
        }

        ///
        /// Persist a completed activity context
        ///
        ///
        /// This persists completed activities that were part
        /// of an execution scope. Example:  Activities
        /// within a CompensatableSequenceActivity.
        ///
        protected override void SaveCompletedContextActivity(
            Activity activity)
        {
            //get the workflow instance Id
            Guid instanceId = WorkflowEnvironment.WorkflowInstanceId;

            //get the context Id which identifies the activity scope
            //within the workflow instance
            Guid contextId = (Guid)activity.GetValue(Activity.ActivityContextGuidProperty);

            //persist the activity for this workflow
            Serialize(instanceId, contextId, activity);
        }

        ///
        /// Load an activity context
        ///
        protected override Activity LoadCompletedContextActivity(Guid scopeId, Activity outerActivity)
        {
            //get the workflow instance Id
            Guid instanceId = WorkflowEnvironment.WorkflowInstanceId;

            Activity activity = Deserialize(instanceId, scopeId, outerActivity);
            if (activity == null)
            {
                ThrowException(instanceId, "Unable to deserialize activity", null);
            }
            return activity;
        }

        protected override void UnlockWorkflowInstanceState(Activity rootActivity)
        {
            //I have not implemented locking
        }

        protected override bool UnloadOnIdle(Activity activity)
        {
            //always unload on idle
            return true;
        }

        #endregion

        #region Persistence and Oracle Management

        ///
        /// Serialize the workflow or an activity context
        ///
        private void Serialize(Guid instanceId, Guid contextId, Activity activity)
        {
            try
            {
                MemoryStream memoryStream = new MemoryStream();
                activity.Save(memoryStream);
                byte[] wfState = memoryStream.GetBuffer();

                OracleConnection connection = new OracleConnection(this.dbconnectionString);
                OracleCommand command  = new OracleCommand();
                command.CommandText = "INSERT INTO WFPERSISTENCE (WFID, WFSTATE, CONTEXTID) VALUES ('"
                                      + instanceId.ToString()
                                      + "',:WFSTATE,'" + contextId.ToString() +"')";
                OracleParameter param = new OracleParameter();
                param.OracleType = OracleType.Blob;
                param.ParameterName = "WFSTATE";
                param.Value = wfState;
                command.Parameters.Add(param);

                command.Connection = connection;
                connection.Open();
                command.ExecuteNonQuery();
                connection.Close();
            }
            catch (ArgumentException e)
            {
                ThrowException(instanceId, "Serialize: Method has invalid argument", e);
            }
            catch (Exception e)
            {
                ThrowException(instanceId, "Serialize: Unknown exception", e);
            }
        }

        ///
        /// Deserialize a workflow or an activity context
        ///
        private Activity Deserialize(Guid instanceId, Guid contextId, Activity rootActivity)
        {
            Activity activity = null;
            try
            {
                OracleConnection connection = new OracleConnection(this.dbconnectionString);
                OracleCommand command = new OracleCommand();
                command.CommandText = "SELECT WFSTATE FROM WFPERSISTENCE WHERE WFID='"
                                      + instanceId.ToString() + "' AND CONTEXTID='"
                                      + contextId.ToString() + "'";
                command.Connection = connection;
                connection.Open();
                byte[] wfState = (byte[]) command.ExecuteScalar();

                connection.Close();
                MemoryStream memoryStream = new MemoryStream(wfState);
                activity = Activity.Load(memoryStream, rootActivity);
            }
            catch (ArgumentException e)
            {
                ThrowException(instanceId, "Deserialize: Path has invalid argument", e);
            }

            catch (Exception e)
            {
                ThrowException(instanceId, "Deserialize: Unknown exception", e);
            }
            return activity;
        }

        ///
        /// Delete a workflow and any related activity context files
        ///
        private void DeleteWorkflow(Guid instanceId)
        {
            try
            {
                OracleConnection connection = new OracleConnection(this.dbconnectionString);
                OracleCommand command = new OracleCommand();
                command.CommandText = "DELETE FROM WFPERSISTENCE WHERE WFID='"
                                      + instanceId.ToString() + "'";
                command.Connection = connection;
                connection.Open();
                command.ExecuteNonQuery();
                connection.Close();
            }
            catch (ArgumentException e)
            {
                ThrowException(instanceId, "Delete: Method has invalid argument", e);
            }

            catch (Exception e)
            {
                ThrowException(instanceId, "Delete: Unknown exception", e);
            }
        }

        #endregion

        #region Existing Workflow Management

        ///
        /// Return a list of all workflow Ids that are persisted
        ///
        public List GetAllWorkflows()
        {
            List workflows = new List();

            OracleConnection connection = new OracleConnection(this.dbconnectionString);
            OracleCommand command = new OracleCommand();
            command.CommandText = "SELECT DISTINCT WFID FROM WFPERSISTENCE";
            command.Connection = connection;
            DataSet dataSet = new DataSet();
            OracleDataAdapter dataAdapter = new OracleDataAdapter();
            connection.Open();
            dataAdapter.SelectCommand = command;
            dataAdapter.Fill(dataSet);
            connection.Close();

            for (int i = 0; i < dataSet.Tables[0].Rows.Count; i++)
            {
                Guid instanceId = new Guid(dataSet.Tables[0].Rows[i]["WFID"].ToString());
                workflows.Add(instanceId);
            }
            return workflows;
        }

        #endregion

        #region Common Error handling

        ///
        /// Throw an exception due to an error
        ///
        private void ThrowException(Guid instanceId, String message, Exception inner)
        {
            if (inner == null)
            {
                throw new PersistenceException(String.Format("Workflow: {0} Error: {1}",
                                               instanceId, message));
            }
            else
            {
                throw new PersistenceException(String.Format("Workflow: {0} Error: {1}: Inner: {2}",
                                               instanceId, message, inner.Message), inner);
            }
        }

        #endregion
    }
}

The corresponding database Table Create Script is as follows:

CREATE TABLE WFPERSISTENCE
(
 WFID       VARCHAR2(36 BYTE)  NOT NULL,
 WFSTATE    BLOB,
 CONTEXTID  VARCHAR2(36 BYTE)
)

Hope this gets you started on Custom Persistence Services for Oracle.

Make a Comment

Make a Comment: ( 1 so far )

You must be logged in to post a comment.

One Response to “Oracle Workflow Persistence Service for Workflows”

RSS Feed for My .NET Blog Comments RSS Feed

Hi there,

FYI we have a far more complete implementation over at CodePlex – http://www.codeplex.com/WFTools. Also has support for MySql

Cheers,

Dean


Where's The Comment Form?

Liked it here?
Why not try sites on the blogroll...