Riak with Dexy

March 18, 2011 · 1 Comment · Posted in Uncategorized

I have recently been playing with Riak, a key-value store inspired by Amazon’s Dynamo, and in particular with its map-reduce functionality. That gave me a good excuse to write this post, which describes the Python interface to Riak and demonstrates how Dexy can be used to run and document map-reduce jobs and to make use of the data they return.

I will implement the examples in this tutorial using the Riak Python client. I assume you already have Riak running and the Python client library installed.

To begin, we import the Python packages we will need:

import csv
import json
import riak

We read the CSV data file used for the examples and load it into a Riak bucket, using the date as the key. The script only imports a row if its key is not already in the bucket, because the script will be run more than once during the course of writing a Dexy document:

client = riak.RiakClient()
bucket = client.bucket('goog')

f = open("dexy--goog.csv", "r")
for row in csv.DictReader(f):
    key = row['Date'] # Dates are unique, use as keys.
    if not bucket.get(key).exists():
        item = bucket.new(key, data=row)
        item.store()

f.close()
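
The check-before-store pattern can be tried without a running Riak node. What follows is only a minimal sketch, written for Python 3, in which a plain dict stands in for the bucket. The names `load_rows` and `SAMPLE_CSV` are made up for illustration, the column order is my assumption, and the two rows are copied from output shown elsewhere in this post. Loading twice leaves the store unchanged, which is the behaviour we want when Dexy re-runs the script:

```python
import csv
import io

# Two rows in the same layout as goog.csv (values copied from this post;
# the column order is an assumption).
SAMPLE_CSV = (
    "Date,Open,High,Low,Close,Volume,Adj Close\n"
    "2010-05-05,500.98,515.72,500.47,509.76,4566900,509.76\n"
    "2007-09-27,571.73,571.74,565.78,567.50,2056300,567.50\n"
)

def load_rows(csv_text, store):
    """Insert each row under its Date key unless that key already exists.

    `store` is a plain dict standing in for a Riak bucket.
    Returns the number of rows that were newly stored.
    """
    added = 0
    for row in csv.DictReader(io.StringIO(csv_text)):
        key = row["Date"]  # dates are unique, so they work as keys
        if key not in store:
            store[key] = dict(row)
            added += 1
    return added

store = {}
first_run = load_rows(SAMPLE_CSV, store)   # stores both rows
second_run = load_rows(SAMPLE_CSV, store)  # finds both keys, stores nothing
```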

Once we have loaded the data, we can retrieve an individual record like so:

>>> bucket.get("2010-05-05").get_data()
{'High': '515.72', 'Adj Close': '509.76', 'Volume': '4566900', 'Low': '500.47', 'Date': '2010-05-05', 'Close': '509.76', 'Open': '500.98'}
>>> 

Next we’ll work through the examples in this screencast:

Simple Map

The first example is a simple map-only job that returns the entire data set. To run it, we pass the name of a built-in JavaScript function.

Here is the original JSON version from the demo:

{"inputs":"goog",
  "query":[{"map":{"language":"javascript",
                   "name":"Riak.mapValuesJson",
                   "keep":true}}
          ]
}

In Python we do:

>>> query = client.add('goog')
>>> query.map("Riak.mapValuesJson")
<riak.mapreduce.RiakMapReduce object at 0x67c930>
>>> results = query.run()
>>> 

We can see how many results are returned:

>>> len(results)
1438
>>> 

And we can peek at the structure of the first record:

>>> results[0]
{'High': '571.74', 'Adj Close': '567.50', 'Volume': '2056300', 'Low': '565.78', 'Date': '2007-09-27', 'Close': '567.50', 'Open': '571.73'}
>>> results[0]['Date']
'2007-09-27'
>>> results[0]['Volume']
'2056300'
>>> 
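
The JSON document and the Python calls in this section describe the same job. To make the correspondence concrete, here is a small sketch that assembles the JSON form from Python using only the standard `json` module. The helper `build_job` is a hypothetical name of my own, not part of the Riak client, and this is not a description of what the client does internally:

```python
import json

def build_job(inputs, phases):
    """Assemble a map-reduce job description as a plain dict.

    `inputs` is a bucket name or a list of [bucket, key] pairs;
    `phases` is a list of single-entry dicts such as {"map": {...}}.
    """
    return {"inputs": inputs, "query": phases}

simple_map = build_job(
    "goog",
    [{"map": {"language": "javascript",
              "name": "Riak.mapValuesJson",
              "keep": True}}],
)

# Serialise to the JSON text form used in the original demo.
job_json = json.dumps(simple_map, sort_keys=True)
```

Python’s `True` becomes JSON’s `true` on serialisation, so the result matches the demo file apart from whitespace and key order.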

Max High Value Specific Dates

The next example is a map-reduce job that returns the maximum daily high value in the first week of January. The “first week of January” dates are hard-coded to demonstrate that a map function can be run on a specific list of keys within a bucket.

First, let’s just see how to fetch all the data for specific keys:

>>> query = client.add("goog","2010-01-04")
>>> query.add("goog","2010-01-05")
<riak.mapreduce.RiakMapReduce object at 0x67cc50>
>>> query.add("goog","2010-01-06")
<riak.mapreduce.RiakMapReduce object at 0x67cc50>
>>> query.add("goog","2010-01-07")
<riak.mapreduce.RiakMapReduce object at 0x67cc50>
>>> query.add("goog","2010-01-08")
<riak.mapreduce.RiakMapReduce object at 0x67cc50>
>>> query.map("Riak.mapValuesJson")
<riak.mapreduce.RiakMapReduce object at 0x67cc50>
>>> results = query.run()
>>> for r in results:
...     print r
... 
{'High': '603.25', 'Adj Close': '602.02', 'Volume': '4724300', 'Low': '589.11', 'Date': '2010-01-08', 'Close': '602.02', 'Open': '592.00'}
{'High': '610.00', 'Adj Close': '594.10', 'Volume': '6414300', 'Low': '592.65', 'Date': '2010-01-07', 'Close': '594.10', 'Open': '609.40'}
{'High': '625.86', 'Adj Close': '608.26', 'Volume': '3978700', 'Low': '606.36', 'Date': '2010-01-06', 'Close': '608.26', 'Open': '625.86'}
{'High': '627.84', 'Adj Close': '623.99', 'Volume': '3004700', 'Low': '621.54', 'Date': '2010-01-05', 'Close': '623.99', 'Open': '627.18'}
{'High': '629.51', 'Adj Close': '626.75', 'Volume': '1956200', 'Low': '624.24', 'Date': '2010-01-04', 'Close': '626.75', 'Open': '626.95'}
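
The five `add` calls above differ only in the date. As a sketch, the keys could be generated rather than typed out. The helper name `weekday_keys` is hypothetical, and the code assumes the keys are ISO-formatted dates as in this post:

```python
from datetime import date, timedelta

def weekday_keys(start, days):
    """Return ISO-formatted date keys for `days` consecutive days from `start`,
    skipping Saturdays and Sundays (no trading, so no rows to look up)."""
    keys = []
    for offset in range(days):
        day = start + timedelta(days=offset)
        if day.weekday() < 5:  # Monday is 0, Friday is 4
            keys.append(day.isoformat())
    return keys

keys = weekday_keys(date(2010, 1, 4), 5)

# Bucket/key pairs in the shape used for "inputs" in the JSON form of a job.
inputs = [["goog", key] for key in keys]
```

With the client, each pair would then be passed to `query.add(bucket, key)` in a loop instead of in five separate statements.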

Now let’s do the task at hand: finding the maximum high across these dates. This involves both a map phase (to retrieve each day’s high) and a reduce phase (to pick the largest of the returned values).

Here is the original JSON:

{"inputs":[["goog","2010-01-04"],
           ["goog","2010-01-05"],
           ["goog","2010-01-06"],
           ["goog","2010-01-07"],
           ["goog","2010-01-08"]],
 "query":[{"map":{"language":"javascript",
                  "source":"function(value,keyData,arg){ var data = Riak.mapValuesJson(value)[0]; return [data.High];}"
                 }},
          {"reduce":{"language":"javascript","name":"Riak.reduceMax","keep":true}}]
}

And here is our Python version:

>>> query = client.add("goog","2010-01-04")
>>> query.add("goog","2010-01-05")
<riak.mapreduce.RiakMapReduce object at 0x67cab0>
>>> query.add("goog","2010-01-06")
<riak.mapreduce.RiakMapReduce object at 0x67cab0>
>>> query.add("goog","2010-01-07")
<riak.mapreduce.RiakMapReduce object at 0x67cab0>
>>> query.add("goog","2010-01-08")
<riak.mapreduce.RiakMapReduce object at 0x67cab0>
>>> query.map("""function(value) {
...     var data = Riak.mapValuesJson(value)[0];
...     return [data.High];
... }""")
<riak.mapreduce.RiakMapReduce object at 0x67cab0>
>>> query.reduce("Riak.reduceMax")
<riak.mapreduce.RiakMapReduce object at 0x67cab0>
>>> first_week_jan_max = query.run()[0]
>>> first_week_jan_max
'629.51'
>>> 

Rather than fetching all the data in the map phase, now we just fetch the High values, using a custom map function to do so rather than the standard function used before. The reduce phase returns the maximum value among the values returned by the map. Note that we aren’t asking about which date had the max High, or anything other than the value of this High, so we don’t need to return anything else from the map.
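
To see the shape of the two phases without Riak, here is a plain-Python sketch of the same job run over the five records shown earlier. The function names are my own, and the sketch converts each High to a float, whereas the Riak job above returned the string '629.51':

```python
# The five records returned for the first week of January 2010,
# reduced to the two fields this example needs.
rows = [
    {"Date": "2010-01-04", "High": "629.51"},
    {"Date": "2010-01-05", "High": "627.84"},
    {"Date": "2010-01-06", "High": "625.86"},
    {"Date": "2010-01-07", "High": "610.00"},
    {"Date": "2010-01-08", "High": "603.25"},
]

def map_high(row):
    """Map phase: emit a one-element list holding the day's High."""
    return [float(row["High"])]

def reduce_max(values):
    """Reduce phase: collapse all emitted values into a single maximum."""
    return [max(values)]

# Run the map over every row and flatten the per-row lists.
mapped = [value for row in rows for value in map_high(row)]
result = reduce_max(mapped)[0]
```

Both phases return lists, mirroring the JavaScript functions: a map may emit zero or more values per record, and the reduce receives them all flattened together.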

Map Highs By Month

This example illustrates a custom map followed by a custom reduce. We want to find the maximum high value in each month.

Here is the original JSON:

{
    "inputs":"goog",
    "query":[
        {"map":{"language":"javascript", "source":"function(value, keyData, arg){ var data = Riak.mapValuesJson(value)[0]; var month = value.key.split('-').slice(0,2).join('-'); var obj = {}; obj[month] = data.High; return [obj];}"}},
    {"reduce":{"language":"javascript", "source":"function(values, arg){ return [ values.reduce(function(acc, item){ for(var month in item){ if(acc[month]) { acc[month] = (acc[month] < item[month]) ? item[month] : acc[month]; } else { acc[month] = item[month]; } } return acc; })];}", "keep":true}}
]
}

And here is the Python implementation, showing the first few values returned:

>>> query = client.add('goog')
>>> query.map("""function(value, keyData, arg) {
...     var data = Riak.mapValuesJson(value)[0];
...     var month = value.key.split('-').slice(0,2).join('-');
...     var obj = {};
...     obj[month] = data.High;
...     return [obj];
... }""")
<riak.mapreduce.RiakMapReduce object at 0x1937810>
>>> query.reduce("""function(values, arg) {
...     return [ values.reduce(function(acc, item) {
...     for(var month in item) {
...         if(acc[month]) {
...             acc[month] = (acc[month] < item[month]) ? item[month] : acc[month];
...         } else {
...             acc[month] = item[month];
...         }
...     }
... return acc; })];
... }""")
<riak.mapreduce.RiakMapReduce object at 0x1937810>
>>> 
>>> monthly_highs = query.run()
>>> for k in sorted(monthly_highs[0].keys())[0:5]:
...     print "high in month", k, "was", monthly_highs[0][k]
... 
high in month 2004-08 was 113.48
high in month 2004-09 was 135.02
high in month 2004-10 was 199.95
high in month 2004-11 was 201.60
high in month 2004-12 was 199.88
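
The same map and reduce logic can be sketched in plain Python over a handful of rows taken from this post. The function names are hypothetical. One deliberate difference: the sketch converts each High to a float so that the comparison is numeric. The JavaScript reduce compares the values as stored, which here are strings, and I believe that gives the same answer only as long as all values have the same number of digits before the decimal point:

```python
rows = [
    {"Date": "2007-09-27", "High": "571.74"},
    {"Date": "2010-01-04", "High": "629.51"},
    {"Date": "2010-01-05", "High": "627.84"},
    {"Date": "2010-05-05", "High": "515.72"},
]

def map_month_high(row):
    """Emit {month: high}, where month is the YYYY-MM prefix of the key."""
    month = "-".join(row["Date"].split("-")[:2])
    return [{month: float(row["High"])}]

def reduce_month_max(values):
    """Merge the per-day dicts, keeping the largest High seen per month."""
    acc = {}
    for item in values:
        for month, high in item.items():
            if month not in acc or high > acc[month]:
                acc[month] = high
    return [acc]

mapped = [obj for row in rows for obj in map_month_high(row)]
monthly = reduce_month_max(mapped)[0]
```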

Days over $600

The last example in the screencast finds all days on which the High was over $600. This needs only a map phase; no reduce phase is required.

Here is the original JSON:

{"inputs":"goog",
 "query":[{"map":{"language":"javascript",
                  "source":"function(value, keyData, arg) { var data = Riak.mapValuesJson(value)[0]; if(data.High && parseFloat(data.High) > 600.00) return [value.key]; else return [];}",
                  "keep":true}}]
}

And here is the Python implementation:

>>> js = """function(value, keyData, arg) {
...   var data = Riak.mapValuesJson(value)[0];
...   if(data.High && data.High > 600.00)
...     return [value.key];
...   else
...     return [];
... }"""
>>> 
>>> query = client.add('goog')
>>> query.map(js)
<riak.mapreduce.RiakMapReduce object at 0x1937730>
>>> days_over_600 = query.run()
>>> len(days_over_600)
87
>>> 
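
The filtering idea here is that a map function may emit an empty list for records it wants to drop. A plain-Python sketch of that pattern, using rows from this post and a hypothetical function name, looks like this. It converts High explicitly with `float`, as the original JSON does with `parseFloat`:

```python
rows = [
    {"Date": "2007-09-27", "High": "571.74"},
    {"Date": "2010-01-04", "High": "629.51"},
    {"Date": "2010-01-07", "High": "610.00"},
    {"Date": "2010-05-05", "High": "515.72"},
]

def map_over_threshold(row, threshold=600.0):
    """Emit the row's key if its High exceeds the threshold, else nothing."""
    high = row.get("High")
    if high and float(high) > threshold:
        return [row["Date"]]
    return []

# Flattening the per-row lists drops the rows that emitted nothing.
days_over = [key for row in rows for key in map_over_threshold(row)]
```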

Save Data

Now that we’ve worked through these examples, let’s do a little more with them to take advantage of some of Dexy’s functionality. First, we’ll save the monthly high data we retrieved earlier to a CSV file. Dexy’s ‘fn’ filter helps with filenames: we specify filenames like

dexy--highs-by-month.csv

which will be converted to random filenames when we actually run this code. This way, we can easily access this filename later and also know that each time the script is run, the data is saved to a unique file, so if we change our script we can be confident that the data in the file is what we expect it to be.
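
To give a feel for this kind of substitution, here is a rough sketch of how placeholder filenames could be swapped for random ones. This is not Dexy’s implementation: the regular expression, the function `substitute_filenames` and the use of `uuid4` are all my own assumptions, chosen only because the generated names shown in this post look like UUIDs:

```python
import re
import uuid

# Matches placeholder names such as dexy--highs-by-month.csv
PLACEHOLDER = re.compile(r"dexy--([\w-]+)\.(\w+)")

def substitute_filenames(source, mapping=None):
    """Replace each dexy--name.ext placeholder with a random filename.

    The same placeholder always maps to the same generated name, so a
    later script can be given the mapping and find the file again.
    """
    mapping = {} if mapping is None else mapping

    def replace(match):
        placeholder = match.group(0)
        if placeholder not in mapping:
            mapping[placeholder] = "%s.%s" % (uuid.uuid4(), match.group(2))
        return mapping[placeholder]

    return PLACEHOLDER.sub(replace, source), mapping

code = 'f = open("dexy--highs-by-month.csv", "w")'
rewritten, mapping = substitute_filenames(code)
```

Passing the returned `mapping` into a second call is what would let another script, such as the R code later in this post, refer to the same generated file.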

Here’s the code as written:

f = open("dexy--highs-by-month.csv", "w")
w = csv.writer(f)
for k, v in monthly_highs[0].items():
    w.writerow([k, v])

f.close()


data = {
    "number-days-over-600" : len(days_over_600),
    "first-week-jan-max" : first_week_jan_max
}

f = open("dexy--map-reduce-results.json", "w")
json.dump(data,f)
f.close()

And here’s what it looks like when it is run, after the filenames have been substituted:

>>> f = open("af8c0906-1c6e-4b01-a62e-d7af505bbd53.csv", "w")
>>> w = csv.writer(f)
>>> for k, v in monthly_highs[0].items():
...     w.writerow([k, v])
... 
>>> f.close()
>>> 
>>> 
>>> data = {
...     "number-days-over-600" : len(days_over_600),
...     "first-week-jan-max" : first_week_jan_max
... }
>>> 
>>> f = open("27314a82-503b-40cc-9af1-e84b04ef69b9.json", "w")
>>> json.dump(data,f)
>>> f.close()
>>> 

You’ll notice that we saved the monthly highs to a CSV file, and saved some other data to a JSON file.
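
As an aside, the same saving step can be written with `with` blocks so the files are closed even if an error occurs part-way through. The sketch below is written for Python 3 and uses stand-in values and a temporary directory rather than Dexy-managed filenames. It also sorts the months, which is my own choice so that the CSV rows come out in chronological order:

```python
import csv
import json
import os
import tempfile

# Stand-in values; in the post these come from the Riak queries.
monthly_highs = [{"2004-09": "135.02", "2004-08": "113.48"}]
data = {"number-days-over-600": 87, "first-week-jan-max": "629.51"}

out_dir = tempfile.mkdtemp()
csv_path = os.path.join(out_dir, "highs-by-month.csv")
json_path = os.path.join(out_dir, "map-reduce-results.json")

# newline="" is the csv module's recommended setting for files
# that are handed to csv.writer.
with open(csv_path, "w", newline="") as f:
    writer = csv.writer(f)
    for month, high in sorted(monthly_highs[0].items()):
        writer.writerow([month, high])

with open(json_path, "w") as f:
    json.dump(data, f)

# Read the CSV back to confirm what was written.
with open(csv_path) as f:
    saved_rows = list(csv.reader(f))
```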

R

Now we can make use of this CSV file we’ve just created by opening it in R and analyzing and graphing the data.

> data = read.csv("af8c0906-1c6e-4b01-a62e-d7af505bbd53.csv",
+     col.names=c("Date", "High"),
+     header=FALSE)
> summary(data$High)
   Min. 1st Qu.  Median    Mean 3rd Qu.    Max. 
  113.5   373.0   459.4   446.4   539.5   747.2 
> 
> 
> library(zoo)
> zoo.data <- zoo(data$High, order.by=data$Date, frequency=12)
> 
> png(file="06ff3af8-26d7-4444-8383-f553924eb2f5.png", width=500, height=500)
> plot(
+     zoo.data,
+     col="purple",
+     main="Monthly Highs")
> dev.off()
null device 
          1 
> 

We’ll show the graph generated in the next section.

Writing

Dexy can help with writing tutorials and other software documentation, as in the first few sections of this post, where we showed Python scripts for interacting with Riak. When you are interested in the output of running code rather than just the technique, Dexy can also help with writing up reports. We ran these queries because we want to do something useful with their output. In data science, you usually want to report your analysis, to yourself or to others, so that you can make decisions or learn from what you’ve analyzed. Doing this with Dexy has the following advantages:

  • being able to re-run your report at any time to update it, either with improved analysis code or fresh data
  • being able to incorporate your results and the documentation for how you generated those results into the same document, so it’s easy to check, audit and share your methods
  • eliminating errors due to manually copying the results of calculations into reports

So, let’s do this now. I have highlighted the dynamically generated values in blue. Remember that the source code for this blog post (and any others that have been generated using Dexy) is available on Bitbucket. Here is our “report”, complete with graph:

There were 87 days during the period on which the share price was over 600. During the first week in January 2010, the maximum daily high was 629.51.

Here is a graph which shows the monthly highs during the period:

These numbers in blue came from the data we saved earlier to a JSON file. JSON is a convenient way to share data between different scripts or documents in a Dexy project, in this case to do some calculations in one place (the Python script) and to access the results of those calculations elsewhere (this blog post).
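
The idea of reading saved results into a document can be sketched in a few lines of plain Python. Dexy’s own templating is not shown in this post, so the sketch uses `str.format` as a stand-in, and the JSON text is typed in by hand with the same keys and values the Python script saved:

```python
import json

# JSON text with the same keys the Python script saved earlier.
saved = '{"number-days-over-600": 87, "first-week-jan-max": "629.51"}'
results = json.loads(saved)

template = (
    "There were {days} days during the period on which the share price "
    "was over 600. During the first week in January 2010, the maximum "
    "daily high was {high}."
)

# Fill the placeholders from the saved results rather than by hand.
report = template.format(
    days=results["number-days-over-600"],
    high=results["first-week-jan-max"],
)
```

Because the numbers are looked up rather than typed, re-running the queries on fresh data updates the sentence without any manual copying.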

Finally, here is the .dexy file for this blog post:

{
  "demo.py|fn|idio|pycon|pyg" : {
    "inputs" : ["goog.csv|dexy"]
  },
  "demo.R|fn|r|pyg" : {
    "inputs" : ["demo.py|fn|idio|pycon|pyg"]
  },
  "@simple-map.json|pyg" : {
    "url" : "http://wiki.basho.com/attachments/simple-map.json"
  },
  "@map-high.json|pyg" : {
    "url" : "http://wiki.basho.com/attachments/map-high.json"
  },
  "@map-highs-by-month.json|pyg" : {
    "url" : "http://wiki.basho.com/attachments/map-highs-by-month.json"
  },
  "@sample-highs-over-600.json|pyg" : {
    "url" : "http://wiki.basho.com/attachments/sample-highs-over-600.json"
  }
}
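
The keys in this file appear to follow a pattern of a file name followed by pipe-separated filter names. That reading is my assumption from the file itself, not something taken from Dexy’s documentation. Under that assumption, a short sketch with a hypothetical helper `split_key` can pull the two script entries apart:

```python
import json

# Two entries copied from the .dexy file above.
config_text = """
{
  "demo.py|fn|idio|pycon|pyg" : {
    "inputs" : ["goog.csv|dexy"]
  },
  "demo.R|fn|r|pyg" : {
    "inputs" : ["demo.py|fn|idio|pycon|pyg"]
  }
}
"""

def split_key(key):
    """Split 'file|filter|filter' into (file, [filters])."""
    parts = key.split("|")
    return parts[0], parts[1:]

config = json.loads(config_text)
pipelines = {}
for key in config:
    filename, filters = split_key(key)
    pipelines[filename] = filters
```

Read this way, the R script lists the processed Python script among its inputs, which fits the R code opening the CSV file that the Python code wrote.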

One Comment
  1. BradfordW

    Not sure if it would be useful to you or not, but I had implemented a bucket->csv tool for Riak, specifically out of a need I had to get my JSON docs out of riak and into R without all the decoding overhead. You can nab it here: https://github.com/bradfordw/riak_csv

    Nice post, I hope more people see the value in using R alongside Riak.