Sunday, September 28, 2014

Exporting GAE Datastore data to MongoDB for reporting


GAE's Datastore is a high-performance distributed NoSQL DB that can support pretty much anything you throw at it. The problem is: it's virtually impossible to query!

1. Getting large data sets out once they're in there simply does not work using the documented ways. After quite a bit of working around issues, we put up a nice little open source Remote API utility (we like Golang), but even that cannot handle downloading datastore records when they come by the millions. We also took the approach of writing JSON as logs, then put up a service which runs the GAE logs downloader and dumps the result into a DB - but this also fails when the logs are too large (> 1 GB).
2. Queries can only be written in GQL, which is very limited in its syntax and cannot be used even for very simple queries (you cannot even use COUNT), not to mention complex ones (groups, pivots, etc.).

As long as you use Datastore as an "operational" DB - the DB that the production system inserts data into - it's actually quite a good datastore. This is pretty much true for every large-scale system: you would never use your "operational" DB as your reporting DB. It simply would not work: reporting-friendly DBs are slow, and read and write locks would drive you insane and cause data loss.

So what can be done? Well, you can choose to stream your data to BigQuery directly (if you are doing that with Golang, check out our go-gae-bigquery package). Otherwise, one possible solution is the following:

1. HTTP Event Handlers Persist Data to the Datastore

Using the Golang datastore package, it's super easy to write amazingly fast tracking handlers. The StreamRail Golang GAE handler can handle around 500 req/sec while using just 8-14 MB of RAM on a single GAE instance. Writes to the Datastore are super fast, and if a lot of work needs to be done before inserting, it's possible to defer the task to a queue using the delay package.

2. Exporting a Backup of the Entities to Google Cloud Storage

Using the datastore admin, we export a backup of the desired entities. On Google Cloud Storage, we have a designated bucket for each entity, and the backups are performed regularly. The backup process launches MapReduce jobs on your GAE task queues, which means you are billed for the operation, but it's fast, reliable, and requires zero effort on your part (except for maybe keeping an eye on the job progress if it's taking too long).

It is sometimes advisable to create different entity kinds to store the data - for example "TrackingMetrics_yyyyMMdd_hhmmss". This will allow you to back up only the bits of the data that you need for the report. It will also make the backup process faster and cheaper (it will also make the reports simpler, but that's a different story).
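As a rough illustration of the naming scheme above, here is a minimal Python sketch that builds such a kind name from a timestamp. The helper name kind_name is hypothetical, and it assumes the "hhmmss" part is a 24-hour time.

```python
from datetime import datetime


def kind_name(prefix, ts):
    """Return a time-stamped entity kind name, e.g. 'TrackingMetrics_20140928_134500'."""
    # Assumption: 24-hour clock for the hhmmss part of the pattern
    return "{}_{}".format(prefix, ts.strftime("%Y%m%d_%H%M%S"))


print(kind_name("TrackingMetrics", datetime(2014, 9, 28, 13, 45, 0)))
# prints TrackingMetrics_20140928_134500
```

Because the names sort chronologically, picking the kinds that cover a report period becomes a simple string comparison.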

After the backup process is done, you can easily download the backup from Cloud Storage by executing a gsutil command.
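A minimal sketch of what that download might look like when driven from Python. The bucket name, backup path and local directory are placeholders, not the ones we actually use; the sketch only assembles a recursive gsutil copy command and leaves the actual call commented out.

```python
import subprocess


def gsutil_download_cmd(bucket, prefix, dest_dir):
    """Build a gsutil command that recursively copies a backup to dest_dir."""
    source = "gs://{}/{}".format(bucket, prefix.strip("/"))
    # -m runs the copy in parallel; cp -r copies recursively
    return ["gsutil", "-m", "cp", "-r", source, dest_dir]


# Hypothetical bucket and paths, for illustration only
cmd = gsutil_download_cmd("my-backups-bucket", "TrackingMetrics/", "./leveldb/")
print(" ".join(cmd))

# To actually run the download (requires gsutil to be installed and authenticated):
# subprocess.check_call(cmd)
```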

3. Loading the Data to MongoDB

The backup is a bunch of files in Google's leveldb format. We use a simple Python parser to read the contents of the files and load them into MongoDB. We load in batches of 10K for better monitoring of progress. This is the script, roughly:

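import sys
import os
import json
from bson import json_util
from pymongo import MongoClient

# make the App Engine SDK importable (adjust to your SDK location)
sys.path.append('/usr/local/google_appengine')
from google.appengine.api.files import records
from google.appengine.datastore import entity_pb
from google.appengine.api import datastore

db_dir = './leveldb/'
db_name = 'dbName'
collection_name = 'collectionName'

cl = MongoClient()
coll = cl[db_name][collection_name]

def flush(entities):
    # insert the pending batch, if any, and report progress
    if entities:
        # insert_many needs PyMongo 3+; on PyMongo 2.x use coll.insert(entities)
        coll.insert_many(entities)
        print('bulk inserted ' + str(len(entities)) + ' entities')

# the backup data files are the ones with 'output' in their name
files = [os.path.join(db_dir, f) for f in os.listdir(db_dir) if 'output' in f]
entities = []
bulk_size = 10000
for f in files:
    # the records are binary, so open the file in binary mode
    with open(f, 'rb') as raw:
        reader = records.RecordsReader(raw)
        for record in reader:
            entity_proto = entity_pb.EntityProto(contents=record)
            entity = datastore.Entity.FromPb(entity_proto)
            entity['Date'] = str(entity['Date'])
            # round-trip through JSON to get a plain dict that MongoDB can store
            j = json.dumps(entity, default=json_util.default)
            entities.append(json.loads(j, object_hook=json_util.object_hook))
            # flush only after appending, so that no record is skipped
            if len(entities) >= bulk_size:
                flush(entities)
                entities = []
# insert whatever is left over from the last batch
flush(entities)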
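The batching idea in the script above can also be sketched in isolation as a small generator. This is just an illustration with a hypothetical helper name (batches), not part of our loader; the point is that every item lands in exactly one batch, including the final partial one.

```python
def batches(items, size):
    """Yield lists of up to `size` items, including a final partial batch."""
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) >= size:
            yield batch
            batch = []
    if batch:
        yield batch


# Stand-in for the parsed entities; each batch would go to one bulk insert
for batch in batches(range(25), 10):
    print("would insert {} entities".format(len(batch)))
```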

5. We have some scripts that execute complex queries against MongoDB, which is very well suited for that. Typically, a script would create a new collection with the data to be exported as the report.
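To give a feel for such a query, here is a minimal sketch of an aggregation pipeline that groups, sums, and writes the result into a new collection via the $out stage (available from MongoDB 2.6). The field names "Campaign" and "Impressions" and the collection name "report_by_campaign" are made up for the example; our real reports use different data.

```python
def build_report_pipeline(group_field, value_field, out_collection):
    """Build a pipeline that sums value_field per group_field and
    writes the result to out_collection."""
    return [
        {"$group": {
            "_id": "$" + group_field,
            "total": {"$sum": "$" + value_field},
            "count": {"$sum": 1},
        }},
        {"$sort": {"total": -1}},
        # $out must be the last stage; it replaces out_collection with the result
        {"$out": out_collection},
    ]


# Hypothetical field and collection names, for illustration only
pipeline = build_report_pipeline("Campaign", "Impressions", "report_by_campaign")
print(pipeline)

# With PyMongo, assuming `coll` is the collection loaded in step 3:
# coll.aggregate(pipeline)
```

This is the kind of GROUP BY style query that GQL cannot express, and the resulting collection is what gets exported in the next step.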

6. Using mongoexport, we export the report collection to a CSV file, which is sent by email as the report to the interested parties. This is the script, roughly:

#!/bin/bash

OIFS=$IFS;
IFS=",";

dbname=DBNAME
host=localhost:27017
collection=$1
DATE_EXACT=$(date +%Y%m%d_%H%M%S)
DATE=$(date +%Y-%m-%d)

# -p creates the parent directory too and does not fail if the directories already exist
mkdir -p "../Reports/$DATE"
# get a comma-separated list of keys by peeking at the most recent document in the collection and taking its set of keys
keys=$(mongo "$host/$dbname" --eval "rs.slaveOk();var keys = []; for(var key in db.$collection.find().sort({_id: -1}).limit(1)[0]) { keys.push(key); }; keys;" --quiet);
# now use mongoexport with the set of keys to export the collection to csv
# (mongoexport 3.0 and later use --type=csv instead of --csv)
mongoexport --host "$host" -d "$dbname" -c "$collection" --fields "$keys" --csv --out "../Reports/$DATE/$dbname.$collection.$DATE_EXACT.csv";

IFS=$OIFS;
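The same "take the columns from one document" approach can be sketched in Python with the standard csv module. This is only an illustration of the idea, not a replacement for mongoexport; the helper name write_csv and the sample documents are made up.

```python
import csv
import io


def write_csv(docs, out):
    """Write dict documents to `out` as CSV, using the keys of the
    last document as the columns."""
    if not docs:
        return []
    fields = list(docs[-1].keys())
    # Keys missing from a document become empty cells; extra keys are ignored
    writer = csv.DictWriter(out, fieldnames=fields, extrasaction="ignore", restval="")
    writer.writeheader()
    for doc in docs:
        writer.writerow(doc)
    return fields


# Hypothetical report documents, for illustration only
docs = [
    {"_id": 1, "Campaign": "a", "Impressions": 10},
    {"_id": 2, "Campaign": "b", "Impressions": 20, "Clicks": 3},
]
buf = io.StringIO()
write_csv(docs, buf)
print(buf.getvalue())
```

Note the limitation this shares with the shell script: any field that does not appear in the document used to pick the columns is silently left out of the CSV, so this works best when all documents in the report collection have the same shape.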
