Loosely Coupled Record Automation

I want to automate the creation of records in Content Manager so that workflows can be initiated and other content can be captured. As records and workflows are processed by users within Content Manager, I want to export that activity to a graph database. That graph database will highlight relationships between records that might not otherwise be easily determined, and it can also aid in tracing lineage and relationships.

A few years ago I would have created a lightweight C# console app that implements the logic and directly integrates with Content Manager via the .Net or COM SDK. No longer though! Now I want to implement this using as many managed services and serverless components as possible.

This diagram depicts the final solution design…

CM Graph (2).png

What does each component do?

  • Cloud Scheduler — an online cron/task utility that is cheap and easy to use on GCP

  • Cloud Functions — light-weight, containerized bundles of code that can be versioned, deployed, and managed independently of the other components

  • Cloud PubSub — a message broker service that lets you quickly integrate software components. One system publishes to a topic; any number of other systems (zero or more) subscribe to that topic

  • Service API — REST API end-point that enables integration over HTTP

  • Content Manager Event Server — a custom .Net Event Processor Plugin that publishes new record metadata and workflow state to a PubSub topic

  • Graph Database — enables searching via cypher query syntax (think social network graph) across complex relationships

Why use this approach?

  • Centralized — Putting the scheduler outside of the CM server makes it easier to monitor centrally

  • Separation of Concerns — Separating the “Check Website” logic from the “Saving to CM” logic enables us to re-use each piece for other purposes

  • Asynchronous Processing — Putting PubSub between the functions lets them react in real-time and independently of each other

  • Scaling — Cloud Functions and PubSub can scale horizontally to handle billions of calls

  • Error handling — when errors happen in a function we can redirect to an error topic for review (which could kick-off a workflow)

  • Language Freedom — I can use Python, Node, or Go for the cloud functions; or I can use .Net (via Cloud Run instead of a Cloud Function)

Overall this is a pretty simple undertaking. It will grow much more complex as time progresses, but for now I can get building!


Fetching the records

This is super easy with Python! My source is a REST API that returns a bunch of data about firms. For each retrieved firm I’ll publish a message to a topic. Multiple things can then subscribe to that topic and react to the message.

First we’ll create the topic…

2019-06-21_18-33-09.jpg
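If you’d rather script the topic creation than click through the console, a minimal sketch using the Pub/Sub client library might look like the following (the project and topic names are the defaults used later in this post, and the exact create_topic signature varies between client library versions):

from google.cloud import pubsub

# sketch: create the new_firm topic in the CM-DEV project
# note: newer google-cloud-pubsub releases expect create_topic(name=topic_path)
publisher = pubsub.PublisherClient()
topic_path = publisher.topic_path('CM-DEV', 'new_firm')
publisher.create_topic(topic_path)
print('Created topic: {}'.format(topic_path))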

Next I write the logic in a python module…

import urllib.request as urllib2
import sys
import json
import requests
import gzip
import os
from google.cloud import pubsub
 
project_id = os.getenv('GOOGLE_CLOUD_PROJECT', 'CM-DEV')
topic_name = os.getenv('GOOGLE_CLOUD_TOPIC', 'new_firm')
 
def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())
 
def downloadFirms(args=None):
    # args is the HTTP request object when triggered as a cloud function; unused here
    request_headers = requests.utils.default_headers()
    request_headers.update({
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.9',
        'Cache-Control': 'max-age=0',
        'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0'
    })
    url = "https://api..."
    request = urllib2.Request(url, headers=request_headers)
    html = gzip.decompress(urllib2.urlopen(request).read()).decode('utf-8')
    data = json.loads(html)
    try:
        hits = data['hits']['hits']
        # publish each firm's metadata to the pubsub topic
        publisher = pubsub.PublisherClient()
        topic_path = publisher.topic_path(project_id, topic_name)
        for firm in hits:
            firm_data = firm['_source']
            firm_name = firm_data['firm_name']
            topic_message = json.dumps(firm_data).encode('utf-8')
            print(firm_name)
            msg = publisher.publish(topic_path, topic_message)
            msg.add_done_callback(callback)
    except Exception as exc:
        print('Error: ', exc)
    finally:
        print('Done!')
 
if __name__ == "__main__":
    downloadFirms()

Now that module can be placed into a cloud function…

2019-06-21_18-41-02.jpg
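One deployment detail worth calling out: the function’s entry point must be set to downloadFirms, and its requirements.txt must list the client libraries the module imports. A minimal sketch of that requirements file (assuming current package names) would be:

# requirements.txt for the download function (minimal sketch)
google-cloud-pubsub
requests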

Don’t forget to pass in the runtime environment variables (GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_TOPIC) so that the function posts to the correct topic in the correct project. You may change these during your testing process.

2019-06-21_18-42-55.jpg

With that saved we can now review the HTTP end-point address, which we’ll use when scheduling the routine download. Open the cloud function, click the Trigger tab, and copy the URL to your clipboard.

2019-06-21_18-46-06.jpg

In the cloud scheduler we just need to determine the frequency and the URL of the cloud function. I’ll post an empty json object as it’s required by the scheduler (even though I won’t consume it directly within the cloud function).

2019-06-21_18-49-34.jpg

Next I need a cloud function that subscribes to the topic and does something with the data. For quick demonstration purposes I’ll just write the data out to the console (which will materialize in Stackdriver as a log event).

2019-06-21_19-14-16.jpg
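A minimal sketch of that subscriber, assuming a Python background function triggered by the new_firm topic (the function name here is just illustrative), would be:

import base64
import json

def handle_new_firm(event, context):
    # Pub/Sub delivers the payload base64-encoded in event['data']
    payload = base64.b64decode(event['data']).decode('utf-8')
    firm = json.loads(payload)
    # writing to stdout surfaces the message as a Stackdriver log entry
    print('New firm received: {}'.format(firm.get('firm_name', 'unknown')))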

With that created I can now test the download function, which should result in new messages in the topic and then new output in Stackdriver. I can also create log-based metrics from the content of the logs. For instance, I can create a metric for the number of new firms, the number of errors, or the average execution duration (cloud functions have a capped maximum lifetime, so this is important to watch).

2019-06-21_19-17-43.jpg

Now I could just put the “create folder in CM” logic within my existing cloud function, but then I’m tightly coupling the download of the firms to the registration of folders. That would limit the extent to which I can re-use code and assemble new functionality. Tightly-coupled solutions are harder to maintain, support, and validate.

In the next post we’ll update the cloud function that pushes the firm into the Content Manager dataset!

Transcribing Audio Records with the Cloud Speech API

In this post I'll show how one might approach enriching their public archives with audio transcriptions provided by Google's Cloud Speech API.

First we can start with this collection of records:

Collection of Hearing Records

Simple meta-data for each file

For each of these audio files I'll have to download it, convert it, stage it in a bucket, pass it to the Speech API, and capture the results. I'll craft this in PowerShell for the moment and then later implement it as a cloud function. The results can then be added to the notes or attached as an OCR text rendition (or reviewed and then added).

2018-06-08_20-46-04.png

The speech API will give me chunks of text with an associated confidence level, as shown below:

2018-06-08_20-54-10.png

Which can all be mashed together for a pretty accurate transcription:

2018-06-08_21-17-59.png
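For reference, the long-running recognize command used later in this post writes its results as JSON, with each result holding one or more alternatives that carry a transcript and a confidence value. A quick Python sketch of that "mash together" step (the file name is purely illustrative) could be:

import json

# sketch: join the transcript chunks from a saved Speech API response
with open('speech_result.json', 'r', encoding='utf-8') as f:
    response = json.load(f)

transcript = ' '.join(
    alternative['transcript']
    for result in response.get('results', [])
    for alternative in result.get('alternatives', [])
)
print(transcript)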

Step by Step

My first step is to enable the Speech API within GCP:

 
2018-06-02_1-33-25.png
 

Then create a storage bucket to house the files. I could skip this and upload each file directly within a request, but staging them in a bucket makes it easier to later work with cloud functions.
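The bucket can be created in the console; if you'd prefer to script it, a minimal sketch with the google-cloud-storage client (the bucket name matches the one used in the script below) might be:

from google.cloud import storage

# sketch: create the staging bucket referenced later as gs://speech-api-cm-dev
client = storage.Client()
bucket = client.create_bucket('speech-api-cm-dev')
print('Created bucket: {}'.format(bucket.name))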

To convert the audio from mp3 to wav format with a single audio channel (the -ac 1 flag below) I used ffmpeg:

 
ffmpeg -hide_banner -loglevel panic -y -i %inputfile% -ac 1 %outputfile%

I like ffmpeg because it's easy to use on Windows servers and workstations. But there's also a good fluent-ffmpeg module available for Node.js, which would allow this to be built as a cloud function. For now here's my PowerShell function to convert the audio file...

function ConvertTo-Wav {
	Param([string]$inputFile)
	#few variables for local pathing
	$newFileName = [System.IO.Path]::getfilenamewithoutextension($inputFile) + ".wav"
	$fileDir = [System.IO.Path]::getdirectoryname($inputFile)
	$newFilePath = [System.IO.Path]::combine($fileDir, $newFileName)
	#once is enough
	Write-Debug("ConvertTo-Wav Target File: " + $newFilePath)
	if ( (Test-Path $newFilePath) -eq $false ) {
		#convert using open source ffmpeg
		$convertCmd = "ffmpeg -hide_banner -loglevel panic -y -i `"$inputFile`" -ac 1 `"$newFilePath`""
		Write-Debug ("ConvertTo-Wav Command: " + $convertCmd)
		Invoke-Expression $convertCmd
	}
	return $newFilePath
}

This function queries for the audio records from Content Manager and caches the result to disk (simply to speed up development of the script)...

function Get-CMAudioFilesJson {
	$localResponseFile = [System.IO.Path]::Combine($localTempPath,"audioRecords.json")
	$audioFiles = @()
	if ( (Test-Path $localResponseFile) -eq $false ) {
		#fetch if results not cached to disk
		$searchUri = ($baseCMUri + "/Record?q=container:11532546&format=json&pageSize=100000&properties=Uri,RecordNumber,Url,RecordExtension,RecordDocumentSize")
		Write-Debug "Searching for audio records: $searchUri"
		$response = Invoke-RestMethod -Uri $searchUri -Method Get -ContentType $ApplicationJson
		#flush to disk as raw json
		$response | ConvertTo-Json -Depth 6 | Set-Content $localResponseFile
	} else {
		#load and convert from json
		Write-Debug ("Loading Audio Records from local file: " + $localResponseFile)
		$response = Get-Content -Path $localResponseFile | ConvertFrom-Json
	}
	#if no results just error out
	if ( $response.Results -ne $null ) {
		Write-Debug ("Processing $($response.Results.length) audio records")
		$audioFiles = $response.Results
	} else {
		Write-Debug "Error"
		break
	}
	return $audioFiles
}

A function to submit the record's audio file to the Speech API (and capture the results):

function Get-AudioText 
{
    Param($audioRecord)
    #formulate a valid path for the local file system
    $localFileName = (""+$audioRecord.Uri+"."+$audioRecord.RecordExtension.Value)
    $localPath = ($localTempPath + "\" + $localFileName)
    $sourceAudioFileUri = ($audioRecord.Url.Value + "/File/Document")
    $speechApiResultPath = ($localTempPath + "\" + $audioRecord.Uri + ".txt")  
    $speechTextPath = ($localTempPath + "\" + $audioRecord.Uri + "_text.txt")
    #download the audio file if not already done so
    if ( (Test-Path $localPath) -eq $false ) {  
        Invoke-WebRequest -Uri $sourceAudioFileUri -OutFile $localPath
    }
    #convert file if necessary
    if ( ($audioRecord.RecordExtension.Value.ToLower()) -ne "wav" ) {
        $localPath = ConvertTo-Wav $localPath
        $localFileName = [System.IO.Path]::GetfileName($localPath)
        if ( (Test-Path $localPath) -eq $false ) {
            Write-Error "Error Converting $($localPath)"
            return
        }
    }
 
    #transcribe, if not already done so
    Write-Debug ("Checking Speech API Text: "+$speechApiResultPath)
    if ( (Test-Path $speechApiResultPath) -eq $false ) {
        try {
            $bucketFilePath = "$bucketPath/$localFileName"
            Put-BucketFile -bucketFilePath $bucketFilePath -bucketPath $bucketPath -localPath $localPath
            #invoke speech api
            $speechCmd = "gcloud ml speech recognize-long-running $bucketFilePath --language-code=en-US"
            Write-Debug ("Speech API Command: "+$speechCmd)
            Invoke-Expression $speechCmd -OutVariable speechResult | Tee-Object -FilePath $speechApiResultPath
            Write-Debug ("Speech API Result: " + $speechResult)
        } catch {
            Write-Error $_
        }
    }
 
    #process transcription result
    if ( (Test-Path $speechApiResultPath) -eq $true ) {
        Write-Debug ("Reading Speech Results File: " + $speechApiResultPath)
		#remove previous consolidated transcription file
		if ( (Test-Path) -eq $true ) {Remove-Item $speechTextPath -Force }
		#flush each transcript result to disk
		$content.results | ForEach-Object { $_.alternatives | ForEach-Object { Add-Content $speechTextPath ($_.transcript+' ')  }  }
    } else {
        Write-Debug ("No Speech API Results: " + $speechTextPath)
    }
}

And then some logic to parse the search results and invoke the Speech API:

#fetch the search results
$audioFiles = Get-CMAudioFilesJson
if ( $audioFiles -eq $null ) {
    Write-Error "No audio files found"
    exit
}
#process each
Write-Debug "Found $($audioFiles.Length) audio files"
foreach ( $audioFile in $audioFiles ) {
    Write-Host "Transcribing $($audioFile.RecordNumber.Value)"
    Get-AudioText  -audioRecord $audioFile
}

Here's the complete script:

Clear-Host
$DebugPreference = "Continue"
 
#variables and such
$AllProtocols = [System.Net.SecurityProtocolType]'Ssl3,Tls,Tls11,Tls12'
[System.Net.ServicePointManager]::SecurityProtocol = $AllProtocols
$ApplicationJson = "application/json"
$baseCMUri = "http://efiles.portlandoregon.gov"
$localTempPath = "C:\temp\speechapi"
$bucketPath = "gs://speech-api-cm-dev"
 
#create local staging area
if ( (Test-Path $localTempPath) -eq $false ) {
    New-Item $localTempPath -Type Directory
}
 
function Get-CMAudioFilesJson {
	$localResponseFile = [System.IO.Path]::Combine($localTempPath,"audioRecords.json")
	$audioFiles = @()
	if ( (Test-Path $localResponseFile) -eq $false ) {
		#fetch if results not cached to disk
		$searchUri = ($baseCMUri + "/Record?q=container:11532546&format=json&pageSize=100000&properties=Uri,RecordNumber,Url,RecordExtension,RecordDocumentSize")
		Write-Debug "Searching for audio records: $searchUri"
		$response = Invoke-RestMethod -Uri $searchUri -Method Get -ContentType $ApplicationJson
		#flush to disk as raw json
		$response | ConvertTo-Json -Depth 6 | Set-Content $localResponseFile
	} else {
		#load and convert from json
		Write-Debug ("Loading Audio Records from local file: " + $localResponseFile)
		$response = Get-Content -Path $localResponseFile | ConvertFrom-Json
	}
	#if no results just error out
	if ( $response.Results -ne $null ) {
		Write-Debug ("Processing $($response.Results.length) audio records")
		$audioFiles = $response.Results
	} else {
		Write-Debug "Error"
		break
	}
	return $audioFiles
}
 
function ConvertTo-Wav {
	Param([string]$inputFile)
	#few variables for local pathing
	$newFileName = [System.IO.Path]::getfilenamewithoutextension($inputFile) + ".wav"
	$fileDir = [System.IO.Path]::getdirectoryname($inputFile)
	$newFilePath = [System.IO.Path]::combine($fileDir, $newFileName)
	#once is enough
	Write-Debug("ConvertTo-Wav Target File: " + $newFilePath)
	if ( (Test-Path $newFilePath) -eq $false ) {
		#convert using open source ffmpeg
		$convertCmd = "ffmpeg -hide_banner -loglevel panic -y -i `"$inputFile`" -ac 1 `"$newFilePath`""
		Write-Debug ("ConvertTo-Wav Command: " + $convertCmd)
		Invoke-Expression $convertCmd
	}
	return $newFilePath
}
 
function Put-BucketFile {
	Param($bucketFilePath,$bucketPath,$localPath)
	#upload to bucket
    $checkCommand = "gsutil -q stat $bucketFilePath"
    $checkCommand += ';$?'
    Write-Debug ("GCS file check: " + $checkCommand)
    $fileCheck = Invoke-Expression $checkCommand
    #fileCheck is true if it exists, false otherwise
    if (-not $fileCheck ) {
        Write-Debug ("Uploading to bucket: gsutil cp " + $localPath + " " + $bucketPath)
        gsutil cp $localPath $bucketPath
    }
}
 
function Get-AudioText 
{
    Param($audioRecord)
    #formulate a valid path for the local file system
    $localFileName = (""+$audioRecord.Uri+"."+$audioRecord.RecordExtension.Value)
    $localPath = ($localTempPath + "\" + $localFileName)
    $sourceAudioFileUri = ($audioRecord.Url.Value + "/File/Document")
    $speechApiResultPath = ($localTempPath + "\" + $audioRecord.Uri + ".txt")  
    $speechTextPath = ($localTempPath + "\" + $audioRecord.Uri + "_text.txt")
    #download the audio file if not already done so
    if ( (Test-Path $localPath) -eq $false ) {  
        Invoke-WebRequest -Uri $sourceAudioFileUri -OutFile $localPath
    }
    #convert file if necessary
    if ( ($audioRecord.RecordExtension.Value.ToLower()) -ne "wav" ) {
        $localPath = ConvertTo-Wav $localPath
        $localFileName = [System.IO.Path]::GetfileName($localPath)
        if ( (Test-Path $localPath) -eq $false ) {
            Write-Error "Error Converting $($localPath)"
            return
        }
    }
 
    #transcribe, if not already done so
    Write-Debug ("Checking Speech API Text: "+$speechApiResultPath)
    if ( (Test-Path $speechApiResultPath) -eq $false ) {
        try {
            $bucketFilePath = "$bucketPath/$localFileName"
            Put-BucketFile -bucketFilePath $bucketFilePath -bucketPath $bucketPath -localPath $localPath
            #invoke speech api
            $speechCmd = "gcloud ml speech recognize-long-running $bucketFilePath --language-code=en-US"
            Write-Debug ("Speech API Command: "+$speechCmd)
            Invoke-Expression $speechCmd -OutVariable speechResult | Tee-Object -FilePath $speechApiResultPath
            Write-Debug ("Speech API Result: " + $speechResult)
        } catch {
            Write-Error $_
        }
    }
 
    #process transcription result
    if ( (Test-Path $speechApiResultPath) -eq $true ) {
        Write-Debug ("Reading Speech Results File: " + $speechApiResultPath)
		#remove previous consolidated transcription file
		if ( (Test-Path) -eq $true ) {Remove-Item $speechTextPath -Force }
		#flush each transcript result to disk
		$content.results | ForEach-Object { $_.alternatives | ForEach-Object { Add-Content $speechTextPath ($_.transcript+' ')  }  }
    } else {
        Write-Debug ("No Speech API Results: " + $speechTextPath)
    }
}
 
#fetch the search results
$audioFiles = Get-CMAudioFilesJson
if ( $audioFiles -eq $null ) {
    Write-Error "No audio files found"
    exit
}
#process each
Write-Debug "Found $($audioFiles.Length) audio files"
foreach ( $audioFile in $audioFiles ) {
    Write-Host "Transcribing $($audioFile.RecordNumber.Value)"
    Get-AudioText  -audioRecord $audioFile
}

Stress Testing GCP CM via ServiceAPI and jmeter

Let's see how my GCP-hosted CM infrastructure holds up under ridiculous load.

jmeter can be downloaded from the Apache JMeter website. Since this is a pure Java application you'll need to have Java installed (I'm using JRE 9, but most should use JDK 8 here). Note that JDK v10 is not yet officially supported.

After extracting jmeter you launch it by executing jmeter.bat...

2018-05-12_6-39-04.png

Every test plan should have a name, description, and a few core variables...

There are many, many possible ways to build out a test plan.  Each will have at least one thread group though, so I'll start by creating just one.  

2018-05-12_6-49-13.png

As shown below, I created a thread group devoted to creating folders. I also set the error action to Stop Test. Later I'll come back and increase the thread properties, but for now I only want one thread so that I can finish the configuration.

Next I'll add an HTTP Request Sampler, as shown below...

2018-05-12_7-37-48.png

The sampler is configured to submit a Json record definition, with values for Title and Record Type.  This is posted to the ServiceAPI end-point for records.  It's as basic as you can get!
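Outside of jmeter, the same request can be reproduced with a few lines of Python. Here's a rough sketch; the host name is a placeholder, RecordTypedTitle is the title property used in this post, and I'm assuming the record type is supplied as RecordRecordType (check the ServiceAPI documentation for the exact property names):

import requests

# sketch: post a new folder definition to the ServiceAPI record end-point
payload = {
    'RecordTypedTitle': 'Stress Test Folder 001',   # illustrative title
    'RecordRecordType': 'Folder',                   # assumed record type property
}
response = requests.post(
    'http://your-cm-server/ServiceAPI/Record',      # placeholder host
    json=payload,
    headers={'Accept': 'application/json'},
)
print(response.status_code)
print(response.json())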

I'll also need an HTTP Header Manager though, so that the ServiceAPI understands I'm intending to work with json data.

2018-05-12_7-34-29.png

Last, I'll add a View Results Tree Listener, like so...

2018-05-12_7-40-52.png

Now I can run the test plan and review the results...

2018-05-12_7-46-35.png

After fixing the issue with my json (I used "RecordRecordTitle" instead of "RecordTypedTitle"), I can run it again to see success.

2018-05-12_7-48-46.png

Now I'll disable the view results tree, add a few report listeners, and ramp up the number of threads to 5.  Each thread will submit 5 new folders.  With that I can see the average throughput.

2018-05-12_7-51-36.png

Next I'll increase the number of folders, tweak the ramp-up period, and delay thread creation until needed. Then I can run it and review the larger sample set.

2018-05-12_7-54-24.png

This is anecdotal though. I also need to monitor all of the resources within the stack. For instance, as shown below, I can see the impact of that minor load on the CPU of the workgroup server. I'd also want to monitor the workgroup server(s) for RAM, disk IO, network IO, and event log activity. The same goes for the database server and any other components of the stack.

2018-05-12_7-58-44.png

From here the stress test plan could include logic to also create documents within the folders, run complex searches, or perform actions.  There are many other considerations before running the test plan, such as: running the plan via command line instead of GUI, scheduling the plan across multiple workstations, calculating appropriate thread count/ramp-up period based on infrastructure, and chaining multiple HTTP requests to more aptly reflect end-user actions.

For fun though I'm going to create 25,000 folders and see what happens!

2018-05-12_8-32-43.png