Loosely Coupled Record Automation
I want to automate the creation of records in Content Manager so that workflows can be initiated and other content can be captured. As records and workflows are processed by users within Content Manager I want to export that activity to a graph database. That graph database will highlight relationships between records that might not otherwise be easily determined. It can also aide in tracing lineage and relationships.
I few years ago I would have created a lightweight C# console app that implements the logic and directly integrates with Content Manager via the .Net or COM SDK. No longer though! Now I want to implement this using as many managed services and server-less components as possible.
This diagram depicts the final solution design…
What does each component do?
Cloud Scheduler — an online cron/task utility that is cheap and easy to use on GCP
Cloud Functions — light-weight, containerized bundles of code that can be versioned, deployed, and managed independently of the other components
Cloud PubSub — this is a message broker service that allows you to quickly integrate software components together. One system may publish to a topic. Other systems (0+) will subscribe to those topics
Service API — REST API end-point that enables integration over HTTP
Content Manager Event Server — custom .Net Event Processer Plugin that publishes new record meta-data and workflow state to a PubSub topic
Graph Database — enables searching via cypher query syntax (think social network graph) across complex relationships
Why use this approach?
Centralized — Putting the scheduler outside of the CM server makes it easier to monitor centrally
Separation of Concerns — Separating the “Check Website” logic from the “Saving to CM” logic enables us re-use the logic for other purposes
Asynchronous Processing — Putting PubSub between the functions let’s them react in real-time and independently of each other
Scaling — cloud functions and pubsub can scale horizontally to billions of calls
Error handling — when errors happen in a function we can redirect to an error topic for review (which could kick-off a workflow)
Language Freedom — I can use python, node, or Go for the cloud functions; or I can use .Net (via Cloud Run instead of as a Cloud Functions)
Overall this is a pretty simple undertaking. It will grow much more complex as time progresses, but for now I can get building!
Fetching the records
This is super easy with python! My source is a REST API that will contain a bunch of data about firms. For each retrieved firm I’ll publish a message to a topic. Multiple things could then subscribe to that topic and react to the message.
First we’ll create the topic…
Next I write the logic in a python module…
import urllib.request as urllib2 import sys import json import requests import gzip import os from google.cloud import pubsub project_id = os.getenv('GOOGLE_CLOUD_PROJECT') if os.getenv('GOOGLE_CLOUD_PROJECT') else 'CM-DEV' topic_name = os.getenv('GOOGLE_CLOUD_TOPIC') if os.getenv('GOOGLE_CLOUD_TOPIC') else 'new_firm' def callback(message_future): # When timeout is unspecified, the exception method waits indefinitely. if message_future.exception(timeout=30): print('Publishing message on {} threw an Exception {}.'.format( topic_name, message_future.exception())) else: print(message_future.result()) def downloadFirms(args): request_headers = requests.utils.default_headers() request_headers.update({ 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8', 'Accept-Language': 'en-US,en;q=0.9', 'Cache-Control': 'max-age=0', 'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0' }) url = "https://api..." request = urllib2.Request(url, headers=request_headers) html = gzip.decompress(urllib2.urlopen(request).read()).decode('utf-8') data = json.loads(html) try: hits = data['hits']['hits'] publisher = pubsub.PublisherClient() topic_path = publisher.topic_path(project_id, topic_name) for firm in hits: firm_data = firm['_source'] firm_name = firm_data['firm_name'] topic_message = json.dumps(firm_data).encode('utf-8') print(firm_name) msg = publisher.publish(topic_path, topic_message) msg.add_done_callback(callback) except Exception as exc: print('Error: ', exc) finally: print('Done!') if __name__ == "__main__": downloadFirms()
Now that module can be placed into a cloud function…
Don’t forget to pass in the run-time parameters so that the function can post to the correct topic in the correct project. You may change these during your testing process.
With that saved we can now review the HTTP end-point address, which we’ll use when scheduling the routine download. Open the cloud function and click onto the trigger tab, then copy the URL to your clipboard.
In the cloud scheduler we just need to determine the frequency and the URL of the cloud function. I’ll post an empty json object as it’s required by the scheduler (even though I won’t consume it directly within the cloud function).
Next I need a cloud function that subscribes to the topic and does something with the data. For quick demonstration purposes I’ll just write the data out to the console (which will materialize in stackdriver as a log event).
With that created I can now test the download function, which should result in new messages in the topic, and then new output in Stackdriver. I can also create log metrics based on the content of the log. For instance, I can create a metric for number of new firms, number of errors, or average runtime execution duration (cloud functions cap out in terms of their lifetime, so this is important to consider).
Now I could just put the “create folder in CM” logic within my existing cloud function, but then I’m tightly-coupling the download of the firms to the registration of folders. That would limit the extent to which I can re-use code and cobble together new feature functionality. Tightly-coupled solutions are harder to maintain, support, and validate.
In the next post we’ll update the cloud function that pushes the firm into the Content Manager dataset!