
Stars : Data Science Remix

Stars automates data science stacks.

Stars’ end game is composing scalable data science stacks with predictability and reproducibility.

Stars was born two years ago as a small Mesos 0.21.0 cluster running Spark 1.4 on about 200 cores and 1.5 TB of RAM. That system was itself descended from earlier Mesos clusters built at RENCI.

It had a command line interface to Spark and enabled the team to explore a number of analytic modalities the Spark way, including MapReduce, SQL, graph analytics, machine learning, and streaming. We also used it to analyze GitHub data, serve micro-services, and construct word embedding models used in informatics research.

In hindsight, it seems quaint. We’ve learned a lot about how to run a data science platform in the meantime.

Along the way, we’ve made more extensive use of a number of technologies and added some new ones to the toolkit. Here’s a list of favorites:

  • Docker: Custom configurations of software components that can be scaled horizontally, configured for specific operating system versions, and isolated from other processes.
  • Ansible: Automates configuration of the container orchestration layer with DevOps rules written in distinct modules.
  • Notebooks: Apache Zeppelin and Jupyter Lab are extremely useful for collaboration and visualization.
  • Blazegraph: A high performance triplestore for semantic graphs with SPARQL support.
  • Livy: A programmatic REST-based interface to Spark, used to connect Jupyter Lab.
  • Lightning: A great visualization library that integrates nicely into the notebook computing setting.

What’s New?

So the latest version of Stars:

  • Uses the latest stable Zookeeper, Mesos, and Marathon.
  • Moves container discovery services to Mesos-DNS.
  • Moves to the latest version of Spark.
  • Adds Livy to allow Jupyter Lab applications to connect to Spark.
  • Automates deployment via Ansible.
  • Uses Docker as the default Mesos container provider.
  • Deploys some services via Docker.

Here’s Jupyter Lab (Jupyter’s new IDE environment) connecting to our Spark cluster over Livy. This will make it really easy to spin up a few hundred parallel compute jobs in an interactive environment and is compatible with our existing Jupyter notebooks.
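For readers who haven’t used Livy, here’s a rough sketch of the kind of interaction it enables, talking to Livy’s REST API directly with the Python requests library. Jupyter Lab typically goes through the sparkmagic extension instead, and the Livy host below is a placeholder, not our actual endpoint.

import requests

livy = "http://<livy-host>:8998"   # placeholder Livy server; 8998 is Livy's default port

# Create an interactive PySpark session
session = requests.post (livy + "/sessions", json={ "kind" : "pyspark" }).json ()

# Once the session reports state "idle" (polling omitted for brevity), submit a statement...
statement = requests.post (
    "{0}/sessions/{1}/statements".format (livy, session["id"]),
    json={ "code" : "sc.parallelize(range(1000)).count()" }).json ()

# ...and fetch its output
result = requests.get (
    "{0}/sessions/{1}/statements/{2}".format (livy, session["id"], statement["id"])).json ()
print (result)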


New Interface

A few notable aspects of the new interfaces of Marathon and Mesos are:

  • Mesos has explicit support for GPUs.
  • Marathon
    • Has better filtering capabilities and seems more robust.
    • Supports editing configurations.
    • Supports a much richer set of job configuration options.

Here are a couple of shots of the two:

Health monitors are pretty dynamic and accurate. If an app is not healthy or deploying, Marathon lets you know.

Granular control over resource allocation is reliable and well communicated.

The majority of the tasks in this shot of Mesos map directly to jobs in Marathon. The exception is the list of tasks with IDs 0-5. Those are Apache Spark jobs initiated from a Zeppelin notebook. Like Marathon, Zeppelin connects to Mesos as a framework to launch Spark jobs:

Status

The deployment package composes an Ansible playbook from a number of roles and the playbook architecture is described here.

It should be noted that this has only been tested in a CentOS 7 environment, though many of the Ansible plays are reused and are said to support Ubuntu.

Here’s a map of the current state of the prototype:

Common

  • Java JDK: 1.8.0
  • Python: 3.6.2
  • Spark: 2.1.1
  • Maven: 3.3.9

Master

  • Mesos (Master): 1.3.0
  • Chronos: 3.0.0
  • Marathon: 1.4.7
  • Mesos-DNS: 0.6.0
  • Zookeeper: 3.4.6

Worker

  • Mesos (Worker): 1.3.0
  • Docker: 1.12.6

Next

This has been tested on a development cluster. As soon as some networking issues get sorted out, production will be upgraded using the same playbook.

After that, some items of interest include:

  • The architecture of the playbook
  • Possibly moving more things to Docker
  • Configuring Mesos-DNS at the workers
  • Kubernetes on Mesos
  • Neo4J integration with Spark

If you have others you think should take priority, please comment.

I’ll also take this opportunity to say I’m not using DC/OS even though it looks pretty great mainly because it doesn’t seem open enough for our purposes. If you have evidence to the contrary, I’d be interested in learning more, so please share in the comments. Thanks.

Green Translator’s GraphQL Alpha

A Brief History of Federated Services

Developer types may prefer to skip this strictly historical preamble provided for context.

Programs on multiple computers cooperating is an ancient computer science problem. Before the web, raw Internet Protocol (IP) sockets, COM, CORBA, RMI, and other protocols took the first ugly steps onto the network. But at the end of the twentieth century, many folks found themselves federating services across the internet and ditched those approaches in favor of open data formats transmitted as documents over HTTP. Thus began XML/HTTP, which devolved into the unfortunately bloated SOAP protocol for a mercifully brief dark age.

Then the age of REST gave us JSON over HTTP.

That was OK, but there was little context for the data that came back. So HATEOAS became the first generation of self-describing REST services. REST services also did nothing to link data in one response to other data. So in the meantime, the semantic web community was developing RDF, OWL, and SPARQL. This stack provides remote query access to linked data with extensive knowledge about interconnections.

Soon after, we got Swagger, a metadata-rich way to define an API that is still technically REST but systematically defined, including endpoints and composite type definitions. It also provides extensive tooling, including code generation for servers and clients as well as a form of graphical interface. smartAPI layers additional semantic content on top of Swagger, now known as OpenAPI.

GraphQL

At one level, GraphQL could be seen as only the latest in a long line of technologies for remote procedure call over HTTP. As of this writing, its relative merits are hotly debated.

But GraphQL is different from recent ancestors like Swagger. Lots has been written on this so we’ll call attention to just a few items here:

  • It’s a feature rich query language that lets clients
    • Include, omit, and parameterize fields
    • Control the structure of the response
  • The type system
    • Is perhaps more robust than OpenAPI’s
    • Provides very good documentation support for rapid, easy comprehension by clients
    • Is supported by a formal interface definition language (IDL)
    • Supports type metadata introspection

Perhaps one way to think of GraphQL that distinguishes it from this lineage is that it is, first and foremost, a query language.

Green Translator via GraphQL

This is the graphical query interface for our new Green Translator GraphQL service (strictly a development server; it may go up and down).

The far left pane is a patient query specifying all available fields.

The middle pane shows the structure of a response.

And the third pane shows the hyperlinked API documentation.

On the query: GraphQL is a full featured query language allowing the inclusion, omission, and parameterization of arbitrary fields. So, as GraphQL describes itself, it may be better to think of it as a query language for an API than as an API in the traditional sense.

On the response: The main thing to note here is that the structure of a GraphQL response is a function of the query. The client need never receive more data than it requires.

On the API docs: While Swagger has definitions, the interface for browsing GraphQL types is very convenient, hyperlinked, and will be familiar to users of online documentation for Python, Java, and other languages.
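To make the query/response relationship concrete, here’s a hedged sketch of issuing one of the Appendix A queries over plain HTTP with Python; the endpoint URL is a placeholder for the development server mentioned above.

import requests

query = """
{
  drugsByCondition (conditions: [ "d001249" ] ) {
    genericName
  }
}"""

response = requests.post ("https://<green-translator-graphql-host>/graphql", json={ "query" : query })

# The response mirrors the query: only the requested field comes back, e.g.
# {"data": {"drugsByCondition": [{"genericName": "..."}, ...]}}
print (response.json ())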

Next

Linked Data: It’s still not obvious what the best way to associate JSON-LD with a GraphQL response is. Here’s one promising path.

Translator Registry: We want the JSON-LD annotation to facilitate incorporation in the Translator Registry. Undoubtedly, other issues will also surface.

Provenance: How do we incorporate provenance?

Client: Update the Green Translator API (and CQ notebooks) to use this endpoint.

Generalized Query: Look into the potential role of GraphQL, not as an alternative to OpenAPI, but as a query language over Translator as a whole.

Appendix A: Other Queries to Try:


{
  exposureConditions (chemicals: [ "D052638" ] ) {
    chemical
    gene
    pathway
    pathName
    pathID
    human
  } 
}

{
  drugsByCondition (conditions: [ "d001249" ] ) {
    genericName
  } 
}

{
  genePathsByDisease (diseases: [ "d001249" ] ) {
    uniprotGene
    keggPath
    pathName
  } 
}

{
  __type(name: "Prescription") {
    name
    fields {
      name
      type {
        name
        kind
      }
    }
  }
}


Playing With 4.5M Rows of Floats

Suppose

  • You had four and a half million rows of data
  • Each contained an id and three sizable arrays of floats
  • The CSV file they’re in is a little over a quarter terabyte
  • You want to play with it interactively. And you like Python.

For this experiment, we’ve configured the Zeppelin PySpark interpreter to run with 60 CPUs and 15 GB of RAM.

We’re also taking the additional step of wrapping each row in a Python object. This is done more out of curiosity than as a recommendation, since it adds overhead in both time and space.

So first, we include a Markdown note to describe the data, then a PySpark note to define the class abstraction for each row.

Above, we use the Spark CSV importer to specify our delimiter, strip the header and map our class to each row. Ultimately, we load all of this into the ‘curves’ RDD. Recall that in Spark, this sets up the execution chain the RDD represents but loads no data.

The execution of the pipeline happens when we attempt to use a result of an RDD operation. So counting the members runs the pipeline. Somewhat unexpectedly, Zeppelin is well integrated enough with Spark to render a reliable progress meter for the workflow (above).
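Since the screenshots don’t reproduce here, below is a minimal sketch of the kind of PySpark notes described above. The file path, delimiter, and LightCurve fields are illustrative assumptions, and sc is the SparkContext Zeppelin provides to the interpreter.

%pyspark
import numpy as np

class LightCurve(object):
    """ Wrap one row: an id plus three arrays of floats. """
    def __init__(self, fields):
        self.id = fields[0]
        self.times = np.array ([ float(x) for x in fields[1].split () ])
        self.fluxes = np.array ([ float(x) for x in fields[2].split () ])
        self.errors = np.array ([ float(x) for x in fields[3].split () ])

lines = sc.textFile ("/projects/stars/data/lightcurves.csv")   # hypothetical path
header = lines.first ()

# Strip the header, split on the delimiter, and wrap each row in a LightCurve.
# Nothing is loaded yet; this only sets up the execution chain.
curves = lines.filter (lambda line: line != header). \
               map (lambda line: line.split ('|')). \
               map (lambda fields: LightCurve (fields))

# Using a result forces the pipeline to run; Zeppelin shows a progress meter.
print (curves.count ())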

I’m pretty sure summing is not on the list of things the folks who produced this data want to do with it, but this demonstrates that our RDD now contains about four and a half million Python LightCurve objects with three Numpy arrays of floats, ready for serious analytic fun to begin.

Next

Unfortunately, it’s not clear that one can parameterize the PySpark interpreter for a single notebook separately. This means that when someone creates another PySpark interpreter it will also get 60 CPUs. Fortunately, few of our notebooks at the moment require Spark. And there may indeed be a way to do this that I’ve not stumbled across. If you know what that is, please share.


Provenance in a Data Lake

Goal

We’d like a rigorous model of the execution of a complex query. Ideally, it would be a structured document that can be serialized to formats including JSON and XML for automated analysis, and visualization by human users.

If “q” is a query we’ve executed against complex heterogeneous sources, it would be good to be able to do something like this to understand which data sources were analyzed, and what algorithms and assumptions were incorporated:

This post is an overview of a technical approach to enable this using W3C standards in a Python environment.

Motivation

We answer complicated science questions by integrating heterogeneous data sources. We link semantic web graph data, results from REST service calls, data from CSV files, and other sources to produce query results.

But once a query result is produced, how is the user to know what it means or how to value it? How is the user to know which sources were used to generate the result? Which assumptions and algorithms were incorporated in the process? How long did the query’s constituent activities each take?

These questions broadly fall into the realm of provenance.

W3C PROV

The World Wide Web Consortium created the PROV standard to describe these sorts of concepts. It defines abstractions and how they interrelate to generically describe provenance for a wide range of generative processes. The standard also defines a variety of output formats so that provenance data produced with PROV can be widely consumed.

For our purposes, it is convenient to interact with PROV from a Python environment. The prov module supports creating, serializing, and visualizing PROV documents.

Usage

We want a framework in which developers can query data sources without thinking about provenance, yet have it captured reliably and thoroughly. Developers should only think in terms of the domain content of their queries. They should get a structured document describing the details of the algorithms used and the data sources consulted.

To do that, we provide a Query class with methods for accessing heterogeneous resources in a data lake environment. Each data access method is annotated with a provenance decorator. The provenance decorator inspects each method invocation at runtime, recording facts about the invocation including arguments, data source names, times and so on. Readers familiar with aspect oriented programming will note that this decorator works much like an aspect.

It is also possible to inspect invocation arguments in detail. One use, in our prototype, is to parse SPARQL queries, identify the namespaces involved in a query, and to record those in our provenance context. We can also record the entire text of the SPARQL query in the provenance object, though this is omitted in the example below for brevity.

Usage in the prototype looks like this:
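The usage shown is essentially the test scenario reproduced at the end of Appendix A: the developer simply issues queries, and provenance accumulates as a side effect.

q = Query ()
q.get_exposure (start='1985-04-12', end='2017-04-12')
q.query_sparql (sparql_query)
print (q.provenance)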

Importantly, the developer using the query interface does nothing to record provenance data.

The document above could be used to generate more user friendly feedback to explain the query execution process.

Next, we’ll describe the approach a bit further.

Approach

Prov Abstractions

First we create abstractions for interacting with the prov library. QueryResponseProvenance wraps a prov document object. This lets us make some simplifying assumptions about how we’ll use provenance. AbstractEntity plays a similar role with respect to prov entities.

DataLake

Next, we create a DataLakeProvenance class derived from QueryResponseProvenance. Its main role is to examine specific kinds of data interactions and characterize them using the provenance document. For the initial implementation, it provides a method to parse SPARQL queries and log their characteristics, including the RDF IRIs of queried resources. One could imagine it being extended to provide similar semantics for other query settings like SQL, or an HTTP path, query string, or POST parameters.

GreenTranslatorProvenance

This class derives from DataLakeProvenance and principally parameterizes it with information specific to this use case. In particular, it specifies a list of namespaces for specific data sources and algorithms that can be used in Green Translator.

Decorator

The Python decoration capability lets us define a function that can be used to augment the execution of other functions. We define a provenance decorator which inspects the name of called functions and conditionally records facts about their invocation.
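As a generic illustration of the pattern (not the actual provenance decorator, which appears in Appendix A), a decorator can wrap any function and record facts about each call:

def traced ():
    def aspect (function):
        def wrapper (*args, **kwargs):
            print ("calling {0}".format (function.__name__))   # record a fact before the call
            result = function (*args, **kwargs)
            print ("finished {0}".format (function.__name__))  # and after it completes
            return result
        return wrapper
    return aspect

@traced()
def get_patients (condition):
    return [ "patient-1", "patient-2" ]

get_patients ("asthma")   # prints the calling/finished messages around the real call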

Query

The Query class defines methods for accessing data sources within the translator data lake. One executes a SPARQL query and another invokes a smartAPI REST service. In this prototype, the accessors are for demonstration purposes and do not have a full set of real world parameters.

Each accessor method is annotated with the provenance decorator function. This means that, transparent to developers who invoke the methods, provenance information is recorded about the interaction within the context of the query.

Next

While clearly a prototype, this is one way to track query execution that:

  • Provides granular logging of data sources and algorithms used
  • Outputs a structured document in XML, RDF, JSON, and other formats
  • Enables visualization of the process graph
  • Is transparent to developers using the query interface
  • Works seamlessly in a notebook environment

It’s unclear that the specific syntactic choices in this prototype are ideal. In particular, the use of a Python decorator object does substantially abstract details of provenance away from the query class. But it makes the decorator specific to the Query class’ implementation. And there’s nothing structural that would prevent us from just including the provenance code in the Query class. With the implemented method, though, it would be easier to swap out a different implementation of provenance or to add logging without bloating the Query logic. So that’s one potentially durable justification.

If W3C PROV does what we need, we’ll need to think for a bit about the classes of entities, activities, and other items to represent and how to do that. Perhaps there is an existing ontology we can use or extend.

Evaluation

Big questions to folks who have expressed a general desire for this sort of thing:

  • Does this do what we need?
  • Is there a substantially better approach?

Appendix A – Code


""" This part is generic provenance infrastructure """

import prov.model as prov
from datetime import datetime
from datetime import date

class QueryResponseProvenance(object):
    """ Abstract W3C PROV model of the provenance of components of a complex query. """
    def __init__(self, default_ns, namespaces=[]):
        """ Specify a default namespace and associated sub namespaces """
        self.document = prov.ProvDocument ()
        self.default_ns = default_ns
        self.document.set_default_namespace (self.default_ns)
        self.namespaces = namespaces
        self.subspaces = {}
        for namespace in self.namespaces:
            self.subspaces[namespace] = self.add_namespace (self.default_ns, namespace)
    def add_namespace (self, root, qualifier):
        subspace = "{0}{1}".format (root, qualifier)
        self.document.add_namespace (qualifier, subspace)
        return subspace
    def add_entity (self, name, tuples):
        self.document.entity (name, tuples)
    def add_data_source (self, name, entity):
        self.add_entity (name, entity.to_tuple ())
    def add_algorithm (self, name, start, end=None):
        assert name in self.namespaces, "Name must be in list of namespaces"
        if end:
            self.document.activity (name, start, end)
        else:
            self.document.activity (name, start)
    def get_time (self):
        return datetime.now ().strftime ("%Y-%m-%dT%H:%M:%S")
    def __str__(self):
        return self.__repr__()
    def __repr__(self):
        return self.document.get_provn ()

class AbstractEntity(object):
    def __init__(self, type, namespace, attributes=[]):
        self.attributes = []
        self.namespace = namespace
        self.attr_keys = {}
        self.add_attribute (prov.PROV_TYPE, type)
        for a in attributes:
            assert len(a) == 2, "Attribute components must be len==2 arrays"
            self.add_attribute (a[0], a[1])
    def add_attribute (self, iri, value):
        key = '{0}@{1}'.format (iri, value)
        if not key in self.attr_keys:
            self.attributes.append ((iri, value))
            self.attr_keys[key] = 1
    def to_tuple (self):
        return tuple(self.attributes)

import re

""" DataLake Support - encapsulate provenance behaviors specific to
    specific classes of data resources """

class DataLakeProvenance(QueryResponseProvenance):
    PREFIX = re.compile ('^prefix ', re.I)
    def __init__(self, default_ns, namespaces=[]):
        QueryResponseProvenance.__init__(self, default_ns, namespaces)

    def parse_sparql (self, text, source_map, type="data"):
        sources = {}
        for line in text.split ('\n'):
            line = ' '.join (line.strip ().split ())
            match = self.PREFIX.match (line)
            if match is None:
                continue
            parts = line.strip().split (' ')
            if len(parts) >= 3:
                iri = parts[2].strip ()
                e = None
                for k, v in source_map.iteritems ():
                    if k in iri:
                        if v in sources:
                            e = sources[v]
                        else:
                            e = AbstractEntity (type, '{0}{1}'.format (self.default_ns, v))
                            sources[v] = e
                        e.add_attribute ('src', v)
                if e is None:
                    e = AbstractEntity (type, 'http://purl.data.org/')
                    e.add_attribute ('src', iri)
                    self.add_entity ('data', e.to_tuple ())
        for k, v in sources.iteritems ():
            self.add_entity ('data', v.to_tuple ())

""" And now we get really specific about one data lake, the green translator. """

class GreenTranslatorProvenance (DataLakeProvenance):
    def __init__(self):
        DataLakeProvenance.__init__(
            self,
            default_ns = 'http://purl.translator.org/prov/',
            namespaces = [
                'clinical', 'enviro', 'medbiochem',                            # data sources
                'exposure.pm25-ozone', 'clinical.med.prescribed', 'blazegraph' # algorithms / assumptions
            ])
    def parse_sparql (self, query):
        super(GreenTranslatorProvenance, self).parse_sparql (
            query,
            source_map = {
                '<http://chem2bio2rdf.org/ctd/' : 'c2b2r.ctd',
                'GO_' : 'GO',
                '<http://chem2bio2rdf.org/drugbank' : 'c2b2r.drugbank',
                'monarch' : 'monarch'
            },
            type="medbiochem:data")

"""

Far from the only possible approach, this treats the collection of provenance like an aspect:
separate in implementation from the operations it decorates, it is woven in at runtime via
Python's decorator functionality.

"""

def provenance ():
    def provenance_aspect(function):
        def wrapper(*args, **kwargs):
            start = args[0].provenance.get_time ()
            result = function(*args, **kwargs)
            end = args[0].provenance.get_time ()
            if 'patient' in function.__name__ or 'clinical' in function.__name__:
                datasource = 'clinical'
                args[0].provenance.add_algorithm ('clinical.med.prescribed', start, end)

                e = AbstractEntity ('clinical:data', '{0}{1}'.format ('http://purl.clinical.org/', datasource))
                e.add_attribute ('src', datasource)
                args[0].provenance.add_entity ('clinical:data', e.to_tuple ())

            elif 'get_exposure' in function.__name__:
                args[0].provenance.add_algorithm ('exposure.pm25-ozone', start, end)

                e = AbstractEntity ('enviro:data', 'http://purl.exposure.org/exposure')
                e.add_attribute ('enviro:src', 'exposure')
                args[0].provenance.add_entity ('enviro:data', e.to_tuple ())

                e = AbstractEntity ('enviro:exposure', 'http://purl.exposure.org/call')
                e.add_attribute ('enviro:call', '{0}({1}{2})'.format (function.__name__, args[1:], kwargs))
                args[0].provenance.add_entity ('enviro:call', e.to_tuple ())

            elif 'query_sparql' in function.__name__:
                args[0].provenance.add_algorithm ('blazegraph', start, end)
                args[0].provenance.parse_sparql (args[1])
                
                if False:
                    e = AbstractEntity ('medbiochem', 'http://purl.medbiochem.org/query')
                    e.add_attribute ('medbiochem:query', '{0}'.format (args[1]))
                    args[0].provenance.add_entity ('query', e.to_tuple ())
            return result
        return wrapper
    return provenance_aspect

"""

Here's a very rough sample of a hypothetical interface for querying a variety of data sources.

With respect to the person whose focus is on querying data, this is still essentially infrastructure.

"""

from string import Template
from SPARQLWrapper import SPARQLWrapper2, JSON
import requests
import urllib2

proxy = urllib2.ProxyHandler({'http': 'gateway.ad.renci.org:8080'})
opener = urllib2.build_opener(proxy)
urllib2.install_opener(opener)

blazegraph_uri = "http://stars-blazegraph.renci.org/bigdata/sparql"
blazegraph = SPARQLWrapper2 (blazegraph_uri)

class Query (object):
    
    def __init__(self):
        self.provenance = GreenTranslatorProvenance ()

    @provenance()
    def get_exposure (self, start, end):
        return requests.post (
            "https://exposures.renci.org/v1/getExposureScore",
            data = {
              "etime": "1985-04-12",
              "exposure": "pm25",
              "loc": "35.720278,-79.176389,Sa,35.731944,-78.852778,Su",
              "stime": "1985-04-12",
              "tres": "string",
              "tscore": "string"
            }).json ()

    @provenance()
    def query_sparql (self, query, service=blazegraph):
        service.setQuery (query)
        service.setReturnFormat (JSON)
        return service.query().convert ()
        
    def plot (self):
        self.provenance.document.plot ()


""" TEST 

Finally, here's a usage scenario. This is what it should look like for a developer to use 
the Translator framework to query resources and get a provenance model with no explicit effort.

"""

sparql_query = """
PREFIX db_resource:      <http://chem2bio2rdf.org/drugbank/resource/>
PREFIX ctd_chem_disease: <http://chem2bio2rdf.org/ctd/resource/ctd_chem_disease/> 
PREFIX biordf:           <http://bio2rdf.org/>
PREFIX ctd:              <http://chem2bio2rdf.org/ctd/resource/>
SELECT DISTINCT ?chem_disease ?meshid ?compound ?drug
WHERE {
  ?chem_disease ctd:diseaseid ?meshid .
  ?chem_disease ctd:cid       ?compound .
  ?drug         db_resource:CID ?compound
}"""

q = Query ()
q.get_exposure (start='1985-04-12', end='2017-04-12')
q.query_sparql (sparql_query)
print (q.provenance)

Output: The default output format is a simplified hierarchical document.

document
  default <http://purl.translator.org/prov/>
  prefix exposure.pm25-ozone <http://purl.translator.org/prov/exposure.pm25-ozone>
  prefix clinical <http://purl.translator.org/prov/clinical>
  prefix medbiochem <http://purl.translator.org/prov/medbiochem>
  prefix blazegraph <http://purl.translator.org/prov/blazegraph>
  prefix enviro <http://purl.translator.org/prov/enviro>
  prefix clinical.med.prescribed <http://purl.translator.org/prov/clinical.med.prescribed>
  
  activity(exposure.pm25-ozone, 2017-03-25T11:56:12, 2017-03-25T11:56:13)
  entity(enviro:data, [prov:type="enviro:data", enviro:src="exposure"])
  entity(enviro:call, [prov:type="enviro:exposure", enviro:call="get_exposure((){'start': '1985-04-12', 'end': '2017-04-12'})"])
  activity(blazegraph, 2017-03-25T11:56:13, 2017-03-25T11:56:14)
  entity(data, [prov:type="medbiochem:data", src="<http://bio2rdf.org/>"])
  entity(data, [prov:type="medbiochem:data", src="c2b2r.drugbank"])
  entity(data, [prov:type="medbiochem:data", src="c2b2r.ctd"])
endDocument

And here’s the JSON version:

{
  "activity": {
    "blazegraph": {
      "prov:endTime": "2017-03-25T11:34:51", 
      "prov:startTime": "2017-03-25T11:34:48"
    }, 
    "exposure.pm25-ozone": {
      "prov:endTime": "2017-03-25T11:34:48", 
      "prov:startTime": "2017-03-25T11:34:48"
    }
  }, 
  "entity": {
    "data": [
      {
        "prov:type": "medbiochem:data", 
        "src": "<http://bio2rdf.org/>"
      }, 
      {
        "prov:type": "medbiochem:data", 
        "src": "c2b2r.drugbank"
      }, 
      {
        "prov:type": "medbiochem:data", 
        "src": "c2b2r.ctd"
      }
    ], 
    "enviro:call": {
      "enviro:call": "get_exposure((){'start': '1985-04-12', 'end': '2017-04-12'})", 
      "prov:type": "enviro:exposure"
    }, 
    "enviro:data": {
      "enviro:src": "exposure", 
      "prov:type": "enviro:data"
    }
  }, 
  "prefix": {
    "blazegraph": "http://purl.translator.org/prov/blazegraph", 
    "clinical": "http://purl.translator.org/prov/clinical", 
    "clinical.med.prescribed": "http://purl.translator.org/prov/clinical.med.prescribed", 
    "default": "http://purl.translator.org/prov/", 
    "enviro": "http://purl.translator.org/prov/enviro", 
    "exposure.pm25-ozone": "http://purl.translator.org/prov/exposure.pm25-ozone", 
    "medbiochem": "http://purl.translator.org/prov/medbiochem"
  }
}


Visualization in Zeppelin with Lightning and Bokeh

Motivation

Digital collaborators use RENCI’s Stars data science lab to execute analytics and present vivid results in the tightest loop possible.

And we all know data science is better with pictures.

Also, NCATS Translator has a hackathon planned in a few weeks. Hackathons work best when there are scientists, developers, lots of good data, and computers. Another helpful ingredient is plenty of easily available tools for visualization. Ideally, the hacker-scientist team would have a ready library of working visualizations with a clear indication of how to incorporate them with minimal effort. That way, folks stay focused on answering science questions, pick the most fitting visualization from the library, and plug it into their solution.

Approach

So early on, I looked at integrating some classic tools. Matplotlib and Seaborn both work great as demonstrated in an earlier post on Zeppelin. It turns out that D3 and other JavaScript libraries widely used in web development can also be used. But the context switch to JavaScript and away from Python is not a great experience. And the mechanisms for inserting that JavaScript into a notebook are also not straightforward.

So a new set of interfaces is emerging in the visualization world to support dynamic notebooks like Zeppelin. We’ll discuss Lightning and Bokeh.

You can find these examples (if you have a RENCI account) as notebooks under the Tutorials folder in Zeppelin as pictured here:


Lightning

The salient features of Lightning making it interesting for our purposes are:

  • Server deployment mode
  • Python interface
  • Minimalist interface
  • Variety of high quality visualizations
  • Strong documentation
  • Polyglot: Python, Scala, JavaScript, R official interfaces

The server deployment mode is good because, while there are libraries to import, we’re essentially only burdening the notebook with loading a client for Lightning rather than all the code needed to implement every visualization Lightning knows how to do. Since it has a Python interface, there’s no need to switch to JavaScript to get a good visualization. The user’s job is to get the data in the appropriate format, call the REST API, and render the result.

Now, we do still have a few details and particulars that must be handled to get a visualization rendered. These include the URL of the RENCI Lightning server and the mechanism for inserting the visualization into the note. To simplify usage models like the hackathon, we now have a StarsLightning class that abstracts these details away. See the Lightning tutorial for the code. The sample instantiates that class as “starlight”. Here’s a usage example of starlight.

Note that the first set of lines creates random data.
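As a rough stand-in for the screenshot, here’s what such a note might look like, using the StarsLightning helper reproduced in Appendix A; the random adjacency matrix is purely illustrative.

from numpy import random

mat = random.rand (60, 60)     # random data
mat[mat < 0.97] = 0            # sparsify so the adjacency plot stays readable

viz = starlight.service.adjacency (mat)   # ask the Lightning server for a visualization
starlight.render (viz)                    # embed the resulting iframe in the note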


The final line calls starlight.render(), passing it a visualization object returned by starlight.service.adjacency.

So starlight.service is a Lightning object. The output of the note above is:

From there, the note renders various visualizations:

Here’s a force directed graph. In this case, we use more sophisticated, non-random data to bridge the gap between the example code in the documentation and a real world graph.

And for our last Lightning visualization, we’ve rendered a scatterplot, again, using slightly more sophisticated data than the Python example in the documentation for a more real-world example.

Lightning visualizations tend to render fairly rapidly, usually less than five seconds.

Bokeh

Bokeh is another sophisticated visualization library. While Lightning and Bokeh overlap substantially, we’ll look here at two visualizations that Bokeh excels at. One thing to note is that we’re using Bokeh in an “in-browser” mode. While it has a variety of deployment models, including a server deployment mode, this one was the shortest path. One consequence might be that visualizations take longer to render than with our Lightning deployment.

First, there’s the Choropleth. As with Lightning, we’ve created a helper class. You can find (and reuse) the code at the top of the /Tutorial/Visualization/Bokeh note. It’s also in the appendix at the end of this post. With that installed, we can do this after setting up the choropleth:

starsbokeh.render (p, title="Choropleth Example")

Bokeh’s scatter plot is also worth a look. Here, we show the color scatter plot example from the documentation with the alteration that we use the “starsbokeh” object to render it.
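As a rough stand-in for that screenshot, the note is essentially Bokeh’s documented color scatter example with the final show replaced by the starsbokeh helper from the appendix below.

import numpy as np
from bokeh.plotting import figure

N = 4000
x = np.random.random (size=N) * 100
y = np.random.random (size=N) * 100
radii = np.random.random (size=N) * 1.5
colors = [ "#%02x%02x%02x" % (int(r), int(g), 150) for r, g in zip (50 + 2 * x, 30 + 2 * y) ]

p = figure (tools="pan,box_zoom,reset,save")
p.circle (x, y, radius=radii, fill_color=colors, fill_alpha=0.6, line_color=None)

starsbokeh.render (p, title="Color Scatter Example")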

Next

Lightning and Bokeh are both mature, capable visualization libraries. Both provide

  • An extensive array of visualization modalities
  • Python interfaces (among others)

Lightning, as it is deployed within Stars, offers a lower latency path to visualization. Bokeh, however, provides at least the Choropleth and color scatter plot visualizations that Lightning does not appear to have.

So the real next step is to use this stuff at the upcoming NCATS Translator hackathon. Beyond that, though, we expect to use these capabilities in many projects. And we expect to add capabilities as interesting ones arise.

Are there other visualization systems we should have a look at or capabilities that would be useful to add?

Appendix A

For folks who don’t have a RENCI account but would like to give the Lightning and Bokeh Zeppelin helpers a try:

Lightning

from lightning import Lightning

class StarsLightning (object):
    def __init__(self, host="https://stars-lightning.renci.org"):
        self.host = host
        self.service = Lightning(host=self.host)
    def render (self, vis, width=400, height=300):
        print ('\n%html <iframe src="{0}/visualizations/{1}/iframe" width="{2}" height="{3}" >'.format (self.host, vis.id, width, height))
        
starlight = StarsLightning ()

Bokeh


import os
import uuid
from bokeh.plotting import figure, output_file, show
class StarsBokeh(object):
    def __init__(self, data_dir="/<data-path>/bokeh"):
        self.data_dir = data_dir
    def output_file (self, file_name, title="Title"):
        output_file (os.path.join (self.data_dir, file_name), title=title)
    def show (self, p, file_name):
        show (p)
        path = os.path.join (self.data_dir, file_name)
        print ("\n%html {0}".format (open(path).read ()))
    def render (self, p, title="Title"):
        out_file = "vis-{0}".format (uuid.uuid1())
        print (out_file)
        self.output_file (out_file, title=title)
        print ("wrote")
        self.show (p, out_file)
starsbokeh = StarsBokeh ()


Zeppelin Alpha


Here’s a presentation on Zeppelin I gave at RENCI today. And here’s the PDF version. Please bear in mind that animated slides will look weird in PDF, so this is strictly provided as an alternate avenue for folks who don’t have PowerPoint.

If you are a RENCI person, you can try Zeppelin here. Remember to prefix your user name with “ad\”.

If you’re not, you can request an account here.

This is alpha. Feel free to leave a comment with issues, comments, or questions.

Microservice Continuous Deployment

CICD

RENCI Stars provides, among other services, a continuous integration and continuous deployment (CICD) environment. Continuous integration aims to verify that the system under construction is operating within intended parameters. The RENCI development community has used continuous integration extensively for over six years. Our new environment will also emphasize continuous deployment. CD systematizes and automates the deployment of software to a publicly visible production system. In the past, both of these activities have been largely manual, ad hoc, and frequently neglected with adverse consequences. Automating them frees developers to concentrate on features rather than on the system’s rote operational aspects.

Case Study : NCATS Translator Microservices

NCATS Translator is developing a set of microservices. In particular, Green Team will be exposing three public Swagger/smartAPI endpoints:

  1. Exposures: Environmental exposures at geographic coordinates.
  2. Clinical: Analyze clinical patient data.
  3. Pharmacogenomic: Navigate semantic web of drugs, genes, and diseases.

In this post, we’ll show a path to achieving a high degree of DevOps operational automation.

Stars CICD: Jenkins

Stars uses Jenkins as a CICD engine. We’ve configured Jenkins to use RENCI’s Active Directory authentication. The NCATS Translator continuous integration and deployment projects are:

[Screenshot: Jenkins projects overview]

Microservice CD

The Translator endpoints will be Python 3 Flask applications and they are stored in GitHub repositories. We’ve created a project called ExposuresAPI_CD to begin prototyping the ultimate continuous deployment model all of the Translator microservices will use.

Ultimately, we want to

  • Check: Check if there are changes in GitHub
  • Get: Fetch the changes
  • Test: Run automated tests on the new code
  • Deploy: If tests pass, deploy to production

We don’t have tests yet, but we can do the other three things now:

[Screenshot: the continuous deployment job’s build steps]

Here we see if there are differences with the master. Since the code being used is checked out only for CD, we can be reasonably confident that no one’s editing files there. So if there are no differences, there’s nothing to test and deploy.

If there are differences, we get the code with a git pull.

Then we use curl to send a message to Marathon to restart the ncats-exposures-api application.

Finally, we request the status of ncats-exposures-api (the name of the microservice in Marathon). The JSON below shows that three instances of the service have been restarted, along with a great deal of other information.
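For readers who prefer Python to curl, here’s a sketch of the same two steps against Marathon’s standard v2 REST API; the Marathon host is a placeholder.

import requests

marathon = "http://<marathon-host>:8080"   # placeholder Marathon endpoint
app_id = "ncats-exposures-api"

# Ask Marathon for a rolling restart of the app's instances
requests.post ("{0}/v2/apps/{1}/restart".format (marathon, app_id))

# Then fetch the app's status; the JSON below is an example of what comes back
status = requests.get ("{0}/v2/apps/{1}".format (marathon, app_id)).json ()
print (status["app"]["tasksRunning"])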

{
    "app": {
        "acceptedResourceRoles": null,
        "args": null,
        "backoffFactor": 1.15,
        "backoffSeconds": 1,
        "cmd": "su - evryscope -c \"/projects/stars/app/exposures-api/centos7-start.sh\"",
        "constraints": [],
        "container": null,
        "cpus": 0.5,
        "dependencies": [],
        "deployments": [],
        "disk": 0.0,
        "env": {},
        "executor": "",
        "healthChecks": [],
        "id": "/ncats-exposures-api",
        "instances": 3,
        "labels": {},
        "lastTaskFailure": {
            "appId": "/ncats-exposures-api",
            "host": "stars-c7.edc.renci.org",
            "message": "Command exited with status 126",
            "state": "TASK_FAILED",
            "taskId": "ncats-exposures-api.e24a0ad7-fbbe-11e6-9eb8-005056a2378c",
            "timestamp": "2017-02-26T00:59:54.379Z",
            "version": "2017-02-26T00:58:20.612Z"
        },
        "maxLaunchDelaySeconds": 3600,
        "mem": 16.0,
        "ports": [
            10001
        ],
        "requirePorts": false,
        "storeUrls": [],
        "tasks": [
            {
                "appId": "/ncats-exposures-api",
                "host": "stars-c7.edc.renci.org",
                "id": "ncats-exposures-api.f251944e-006c-11e7-9eb8-005056a2378c",
                "ports": [
                    31270
                ],
                "stagedAt": "2017-03-03T23:55:57.999Z",
                "startedAt": "2017-03-03T23:55:58.339Z",
                "version": "2017-03-03T23:55:57.448Z"
            },
            {
                "appId": "/ncats-exposures-api",
                "host": "stars-c6.edc.renci.org",
                "id": "ncats-exposures-api.f71c3bc1-006c-11e7-9eb8-005056a2378c",
                "ports": [
                    31579
                ],
                "stagedAt": "2017-03-03T23:56:06.039Z",
                "startedAt": "2017-03-03T23:56:06.431Z",
                "version": "2017-03-03T23:55:57.448Z"
            },
            {
                "appId": "/ncats-exposures-api",
                "host": "stars-c5.edc.renci.org",
                "id": "ncats-exposures-api.f38420df-006c-11e7-9eb8-005056a2378c",
                "ports": [
                    31933
                ],
                "stagedAt": "2017-03-03T23:56:00.009Z",
                "startedAt": "2017-03-03T23:56:00.336Z",
                "version": "2017-03-03T23:55:57.448Z"
            }
        ],
        "tasksHealthy": 0,
        "tasksRunning": 3,
        "tasksStaged": 0,
        "tasksUnhealthy": 0,
        "upgradeStrategy": {
            "maximumOverCapacity": 1.0,
            "minimumHealthCapacity": 1.0
        },
        "uris": [],
        "user": null,
        "version": "2017-03-03T23:55:57.448Z"
    }
}

Next

For this approach to mature, we’ll need:

  • Tests: Automated tests need to gate deployment.
  • SCM: Vet the process for checking for changes.


NCATS Translator Exposures API Microservice

The upshot of this relatively technical post is that we’ve just deployed the first incarnation of the NCATS Translator Green Team’s Environmental Exposures API. If you’re not terribly interested in how that works, this is a good place to stop reading.

NCATS Translator’s Green Team is charged with producing an application programming interface (API) for querying information about environmental exposures. Adhering to the current Translator blueprint, our partners at RTI used the smartAPI protocol to develop the endpoint. Here, we’ll describe the technical steps taken to deploy the service on Stars, in part to provide an overview of how Stars manages microservices.

Capabilities

Mesos’ basic capability is the sharing and management of computing resources in a data center across a variety of workload types. Marathon is a Mesos framework that manages fault tolerance, resource allocation, and distribution of long running services in a Mesos setting. When a service instance fails, Marathon restarts it elsewhere. It uses Linux containers to constrain resource utilization and enforce isolation between the service and adjacent processes. This fosters a more robust experience than manually administered single instance implementations.

Environment

Running a microservice on Stars entails:

  • Getting the code onto the cluster at a standard location
  • Getting the service working on one of the worker nodes
  • Applying any necessary environment changes globally
  • Configuring Marathon to execute the service
  • Verifying the application starts successfully in Mesos
  • Configuring the orchestration service to load balance it
  • Configuring the web server to serve the application

Specific to this service, we:

  • Installed Python 3 across cluster worker nodes
  • Patched CentOS 7’s broken Python implementation
  • Developed a script to apply that fix and start the application
  • Created a Marathon task to manage it
  • Added a new virtual host for additional APIs
  • Updated orchestration to load balance the instances

In general, we’d like our services to be restarted if they fail, be constrained within specific resource consumption parameters, and execute in security isolation from each other. Marathon and Mesos give us these capabilities. Here’s the process in a nutshell:

  • Create:
    • Create a Marathon app pointing at our microservice start script.
    • This constrains the resources the app has access to.

[Screenshot: Marathon app configuration]


  • Scale: Then we start the app by scaling it to three instances.
    • Marathon requests resources from Mesos to do this.

[Screenshot: Marathon instances]

  • Orchestration: Instances of a service must be load balanced to a unified external endpoint.
    • Marathon notifies an orchestration service of new instances
    • Orchestration dynamically reconfigures HAProxy
    • A public reverse proxy provides a public service endpoint
    • Marathon shows the status of running instances: [Screenshot: Marathon apps]
    • HAProxy shows connection status for proxied instances:

[Screenshot: HAProxy status]


  • Management: Developers need access to logs for troubleshooting.
    • Mesos shows where instances run and provides access to logs: [Screenshot: Mesos instances]
    • Selecting the sandbox for an instance shows stdout and stderr

[Screenshot: application stdout and stderr in the Mesos sandbox]

CentOS 7 and Python 3

For the purposes of this API, we learned a lot about issues with Python 3 on CentOS 7. The TL;DR version is that this script starts the exposures API:

#!/bin/bash

set -x
set -e

cd /projects/stars/app/exposures-api

if [ ! -f venv/bin/activate ]; then
    python3 -m venv --without-pip venv
    source venv/bin/activate
    curl https://bootstrap.pypa.io/get-pip.py | python
    deactivate
fi

./start-server.sh

Next Steps

Our main next step for this infrastructure is to upgrade the versions of Mesos and Marathon we’re using. That effort is under way.

Secondarily, we’ll be integrating our Jenkins environment to:

  • Continuously deploy services like the exposures API
  • Run builds on the Mesos infrastructure

Third, once we move to the latest Mesos and Marathon version, we’ll make more extensive use of Docker as our container environment.


Interactive Analytics

Data Science Notebooks

We’ve shown a few ways RENCI’s Stars cluster can be applied to sizable data sets. But running any of these systems involves logging in to a relatively low level interface. We SSH into the cluster and run scripts. And there’s nothing wrong with that.

But recently, data science has seen the rise of interactive, graphical notebooks. Many of their features are not novel with respect to functionality available from the shell. But others are. And taken together, they create powerful, collaborative development environments. The two most popular and robust notebook environments are Jupyter and Zeppelin. Notebooks typically provide:

  • Interactivity: Supports interpreted languages. Results are incremental.
  • Sharing: Share notebooks among authenticated, authorized collaborators.
  • Expressivity: Integration with a variety of programming languages.
  • Scale: Integration with cluster technologies like Spark and Mesos.
  • Polyglottic UX: Combine multiple languages in the same notebook.
  • Publishing: Notes can be published and reused within other applications.

While comparisons of Jupyter and Zeppelin exist, the pace of development renders many of them outdated and unhelpful. At the time of this writing, the main differentiators I’m aware of between these two strong offerings are:

  • Zeppelin
    • Supports multiple programming languages in a notebook.
    • Has a more modern user interface.
    • Has better support for publishing notes to external systems.
    • Is said by some to have better support for cluster computing environments.
    • Is an Apache project well integrated with the Stars stack.

That said, this is a highly dynamic space and these are only the most mature of many rapidly evolving contenders. So we’ve installed Zeppelin on Stars:

[Screenshot: Zeppelin login]

Zeppelin

Zeppelin’s welcome page shows the user’s list of notebooks.

[Screenshot: Zeppelin welcome page]

Notebooks are lists of notes where each note is prefixed by a tag specifying the programming language used in interpreting the text. In our first notebook, we’ll be using a combination of Markdown and Python with Spark integration, or PySpark.

So our first note uses Markdown, a lightweight syntax for creating HTML documents, to introduce our notebook and its goals. Here’s the output of executing the markdown:

[Screenshot: rendered Markdown note]

Next, we import some libraries and define a function for querying our graph database:

[Screenshot: imports and query function]

The %pyspark tag tells Zeppelin to connect to our Apache Spark cluster. Zeppelin’s architecture supports numerous back end interpreters. In the case of Spark, we specify the URL of our Mesos instance as the Spark master. This results in Zeppelin launching the Spark interpreter as a framework within Mesos. Given this arrangement, we can configure Zeppelin’s Spark interpreter to specify resource parameters including cores, memory, and additional Spark packages to load into the interpreter. The result of our current configuration is a Zeppelin framework registered with three cores and 1.4 GB of RAM as a persistent back end which will process execution of all PySpark notes in our notebook. Here’s how this is depicted in the Mesos user interface:

[Screenshot: the Zeppelin Spark interpreter registered as a framework in Mesos]

We then create a SPARQL connection object and a function for querying Blazegraph. Next, we define a SPARQL query template, substitute values into it, call it, and display results:

[Screenshot: SPARQL query template and results]
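For readers without access to the notebook, here’s a rough sketch of what those notes might contain. The Blazegraph URL is the one used elsewhere on this blog; the query template, its predicates, and the parameter value are illustrative.

%pyspark
from string import Template
from SPARQLWrapper import SPARQLWrapper2, JSON

blazegraph = SPARQLWrapper2 ("http://stars-blazegraph.renci.org/bigdata/sparql")

def query_sparql (text):
    blazegraph.setQuery (text)
    blazegraph.setReturnFormat (JSON)
    return blazegraph.query ().bindings

template = Template ("""
PREFIX ctd: <http://chem2bio2rdf.org/ctd/resource/>
SELECT ?chem_disease ?compound WHERE {
  ?chem_disease ctd:diseaseid "$disease" .
  ?chem_disease ctd:cid       ?compound .
} LIMIT 10
""")

rows = query_sparql (template.substitute (disease="D001249"))

# Output beginning with %table is rendered by Zeppelin as an interactive table
print ("%table chem_disease\tcompound")
for row in rows:
    print ("{0}\t{1}".format (row["chem_disease"].value, row["compound"].value))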

We begin our second query with some explanatory markdown:

[Screenshot: the second query’s Markdown]

And follow it with another PySpark block:

[Screenshot: the second query’s PySpark block]

Executing this renders a histogram immediately below the note:

[Screenshot: histogram rendered below the note]


Publishing

There are a few interesting implications of this model as it relates to integration with external web applications. Given the notebook described above, we can click on the menu for a note and see the following options:

[Screenshot: the note menu]

The highlighted “Link this paragraph” option displays a URL which can be embedded in external HTML documents to render the output of this note. The process is trivial and involves creating an IFrame with a source attribute pointing to the link. This enables a remarkably clean separation of labor between data scientists and the display technologies and environments that need to dynamically display results of their work. Notes from a Zeppelin notebook become reusable analytic components.

GitHub Integration

One potential drawback of the notebook approach to development is that the source code, unlike the text of conventional programming languages, is really only useful inside the notebook environment. It is stored as a JSON object with lots of escape characters and metadata that render it largely unreadable.

That said, we can still version control it. To do this, we clone a GitHub repository to the machine running Zeppelin. Then we configure a GitHub storage location path within Zeppelin:

[Screenshot: Zeppelin GitHub storage configuration]

From there, we can commit and push changes as with any other GitHub repository. This notebook is stored in the NCATS Translator Green Team’s repo (requires authorization).

Next Steps

We’re in the midst of adding authentication via Active Directory to the Stars Zeppelin instance. This will allow collaborators to log in with their RENCI credentials.

The examples above don’t yet make use of the Spark cluster but we expect to integrate it into our workflows.

Zeppelin also has support for AngularJS and the ability to render visualizations via D3 and similar libraries.


Semantic Similarity on Stars

Introduction

Semantic similarity is the notion that the relatedness of two words in a text corpus can be measured. This relatedness can be represented along a large number of dimensions to create a high dimensional matrix of words and their connections to each other. This model can, in turn, be used to make assessments of the semantics of terms within some domain. We’re interested in building semantic proximity models for the medical literature at multiple time scales. Here, we’ll show a Python based approach to doing this in parallel on the Stars cluster.

The Data

First, we downloaded over a million publicly available journal articles from PubMed Central. These were pre-processed to extract the article’s text and other metadata into a JSON file format where one JSON file corresponds to one of the original XML formatted journal articles. So our starting point is a collection of a little over a million files, each containing one JSON document with data from a single journal article.

Approach

The semantic similarity models we’re targeting are known as word embedding models and are perhaps most recognizably embodied by Word2Vec. We use a Python implementation of Word2Vec that’s part of the Gensim machine learning package.

To run the code in parallel, we use Apache Spark, part of the RENCI data team’s Stars cluster. So we need to:

  • Select the PubMed Central data along various timeframes
  • Present that data to Spark
  • Execute Word2Vec in parallel

The component for selecting articles has at least these objectives:

  • Scalability: It’s not feasible to load all documents into memory. Fortunately, Gensim provides a streaming API for processing data incrementally. So we’ll want to use that.
  • Flexibility: We want to specify multiple timeframes for journal articles. The component should provide a framework for doing that.

First we define a class with the necessary methods to conform to Gensim’s streaming API. An abbreviated version of the implementation is shown below. Full source is here.


class ArticleSentenceGenerator(object):
    tokenizer = TreebankWordTokenizer() 
    ...
    def __iter__(self):
        for file_name in self.files:
            if self.match (file_name):
                base = "{0}.json".format (os.path.basename (file_name))
                article_path = os.path.join(self.input_dir, base)
                article = SUtil.get_article (article_path)
                if article is not None:
                    sentence_tokens = [ self.tokenizer.tokenize(s) for s in sent_tokenize (article.raw) ]
                    sentences = [ self.gene_syn.make_canonical (s) for s in sentence_tokens ]
                    for s in sentences:
                        yield s
    def match (self, article):
        return True

This base class applies a generic match function to each file. If the file matches, we load the article’s full raw text and tokenize it using an NLTK tokenizer. We then turn each term into a canonical form by using a list of medical terminology synonyms.

Next, we present this data to Spark. The steps are:

  • Generate a set of timeframes: years and months
  • Permute those to generate a list of tuples that include:
    • A time range over which data should be collected
    • A mode that is word or bigram
    • A month or year
  • Exclude parameters for which a model has already been generated
  • Parallelize the list into Spark and apply the generate model function.

    years   = [ y for y in range (1900, datetime.datetime.now().year + 1) ]
    months  = [ ( y, m ) for y in years for m in range (1, 12 + 1) ]
    composite_elements = [ ( "year",       "word",   m ) for m in months  ] + \
                         [ ( "2year",      "word",   m ) for m in months  ] + \
                         [ ( "3year",      "word",   m ) for m in months  ] + \
                         [ ( "month",      "word",   m ) for m in months  ] + \
                         [ ( "2month",     "word",   m ) for m in months  ] + \
                         [ ( "cumulative", "word",   y ) for y in years   ]

    ''' Extend to include permutations with bigram phrase models. '''
    composite_elements = composite_elements + map (lambda t : (t[0], "bigram", t[2]), composite_elements)

    ''' Remove any parameters corresponding to existing files. '''
    composite = filter (lambda e : not os.path.exists (get_file_name (out_dir, e[0], e[1], e[2])),
                        composite_elements)

    if len(composite) == 0:
        print ("Models corresponding to all parameters already exist at {0}".format (out_dir))
    else:
        print ("Generating {0} w2v models.".format (len(composite)))

        '''
        Shuffle parameters to avoid queues of long running models on one executor.
        Distribute all randomized parameters across all partitions. Invoke generate model for each. 
        Dynamically construct an appropriate streaming sentence iterator for each based on its type.
        '''
        random.shuffle (composite)
        model_count = sc.parallelize (composite, numSlices=360). \
                      map (lambda c : generate_model (file_name = get_file_name (out_dir,
                                                                                 model_type=c[0],
                                                                                 mode=c[1],
                                                                                 data=c[2]),
                                                      sentences = sentence_gen [c[0]](in_dir, file_list, c[2], hgnc))). \
                      filter (lambda c : c is not None). \
                      sum ()

Within the call to generate_model(), we call get_file_name() to translate a parameter set into a disk location for the model, and we use a series of transformations to create an instance of a class derived from the ArticleSentenceGenerator class above. Each such class has a different method of selecting matching articles and a different notion of time ranges. These article selection schemes are passed to the generate model function to drive how Gensim selects and processes articles.

The actual code to generate the model is unremarkable and closely follows the Gensim documentation.
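For completeness, here’s a hedged sketch of what generate_model() might look like, closely following the Gensim documentation; the training parameters are illustrative rather than the values used in the full source.

from gensim.models import Word2Vec

def generate_model (file_name, sentences):
    """ Train a word embedding model from a streaming sentence iterator and save it. """
    if sentences is None:
        return None
    model = Word2Vec (sentences, size=100, window=5, min_count=5, workers=4)
    model.save (file_name)
    return 1   # summed by the Spark job above to count generated models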

Next Steps

We now have a collection of word embedding models on disk. These can be loaded and used by other applications to calculate semantic similarity for terms in the medical literature:
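A small sketch of what that looks like; the model path and query terms are hypothetical.

from gensim.models import Word2Vec

model = Word2Vec.load ("/projects/stars/models/pubmed-2016-word.w2v")   # hypothetical path
print (model.most_similar ("asthma", topn=5))      # nearest terms in the embedding space
print (model.similarity ("asthma", "albuterol"))   # cosine similarity of two terms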

[Screenshot: querying a word2vec model]