Asynchronous Agent Actor Critic (A3C)

I read the interesting article Asynchronous Methods for Deep Reinforcement Learning by the Google DeepMind group, and I’d like to share the insights I got from it.

Article

The same group published the Deep Q-learning method to play Atari games with superhuman performance some time ago.

Reinforcement Learning refresher

Let’s briefly review what reinforcement learning is and what problems it tries to solve.

The general idea is to have an agent that can interact with an environment by performing some action $a$. The effect of performing such an action is to receive a reward $r$ and a new state $s’$, so that the cycle continues.

More formally we have:

  • a discrete time indexed by $t$
  • an environment $E$, made up of a set of states $\{s_t\}$
  • a set of actions $\{a_t\}$
  • a policy $\pi: s_t \longrightarrow a_t$, which describes which action should be taken in each state
  • a reward $r_t$ given after each action is performed

At each time step $t$, the agent will try to maximize the future discounted reward defined as

$$R_t = \sum_{k=0}^{\infty} \gamma^k \, r_{t+k}$$

where $\gamma \in [0,1]$ is the discount factor.

Please note that the previous sum is in reality a finite sum because we can assume that the episode will terminate at some point. In the case of Atari games, each game will terminate either with a win or loss.
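
As a quick, made-up numerical example (not from the paper): with $\gamma = 0.9$ and rewards $r_t = 1$, $r_{t+1} = 0$, $r_{t+2} = 2$ before the episode ends, the discounted reward is

$$R_t = 1 + 0.9 \cdot 0 + 0.9^2 \cdot 2 = 2.62$$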

Approaches to Reinforcement Learning

In general there are a few ways to attack the problem. We can divide them into the following categories:

  • Value based: optimize some value function
  • Policy based: optimize the policy function
  • Model based: model the environment

The problem with modeling the environment is that each different problem (e.g. each Atari game) will require a different model. Value and policy based methods are much more powerful (and interesting) because they don’t require any prior knowledge of the world they are trying to model. Note that this does not mean that the same (trained) model will be used for all the games. An agent that can play Breakout is unable to play Pong and vice versa. The point I’m trying to make is that both of them can be trained using the same technique.

Value based: (Deep) Q-Learning

In this case we are trying to maximize the value function $Q$ defined as:

$$Q(s_t, a_t) = \mathbb{E}\left[ R_t \mid s_t, a_t \right]$$

This function expresses the prediction of the future reward obtained by performing action $a_t$ in state $s_t$. While this is a nice and sensible mathematical definition, it does not give us any practical value, as that expected value is impossible to calculate directly. There is a closed form for such an equation, called the Bellman equation:

$$Q(s, a) = \mathbb{E}\left[ r + \gamma \max_{a'} Q(s', a') \right]$$
Now if we have a small number of states and actions, we could implement $Q$ as a matrix. It is possible to show that a randomly initialized matrix will converge to $Q$. In the case of the Atari games, the number of states is intractable. Each state consists of 4 frames of the game (to give the direction of the ball), where each frame has $84 \times 84$ pixels. Therefore the total number of states is:

$$256^{4 \times 84 \times 84}$$

and this is assuming that each pixel is grayscale only (256 possible values per pixel)!
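
To make the tabular idea concrete, here is a toy sketch (made-up sizes and names, not the paper’s code) of how such a $Q$ matrix could be updated towards the Bellman target:

import numpy as np

num_states, num_actions = 10, 4               # toy sizes, just for illustration
Q = np.random.rand(num_states, num_actions)   # randomly initialized Q "matrix"
gamma, alpha = 0.99, 0.1                      # discount factor and learning rate

def q_update(s, a, r, s_next):
    """One tabular Q-learning step: move Q[s, a] towards r + gamma * max_a' Q(s_next, a')."""
    target = r + gamma * Q[s_next].max()
    Q[s, a] += alpha * (target - Q[s, a])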

Since the problem is very sparse (not all possible combinations of pixels actually occur), a deep neural network $Q = Q(s, a, w)$ was introduced to approximate the value function.

In particular, the loss function is given by

$$L(w) = \mathbb{E}\left[\left(r + \gamma \max_{a'} Q(s', a', w) - Q(s, a, w)\right)^2\right]$$

Getting the best policy out of $Q$

Why is $Q$ useful? Given $Q$, it is now easy to find the best policy for each state:

First let’s define $Q^*$, the optimal value function, as

$$Q^*(s) = \max_a Q(s, a)$$

which is the best value that we can get out of all possible actions.

For each state $s$ the action that maximizes the reward is then given by

$$\pi^*(s) = \operatorname{argmax}_a Q(s, a)$$

To recap, we just showed that if we are able to build $Q$, we will in turn be able to maximize the reward by selecting the best of all possible actions.
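
Reusing the toy $Q$ matrix from the tabular sketch above, extracting the greedy policy is a one-liner (again, purely illustrative):

greedy_policy = Q.argmax(axis=1)   # best action index for every state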

Let’s look at the problem from the other point of view now: let’s start from the policy function and see how we can optimize it directly.

Policy based: Policy gradients

First of all we want to note that a policy can be either:

  • deterministic, $a = \pi(s)$
  • stochastic, $\pi(a|s) = \mathbb{P}[a \mid s]$

In the first case, the same action is selected each time we are in state $s$, while in the second case we have a distribution over all the possible actions. What we really want is to change this distribution so that better actions become more probable and bad ones less likely.

First of all, let us introduce another function, called the value function:

$$V^{\pi}(s) = \mathbb{E}_{\pi}\left[ R_t \mid s_t = s \right]$$

Let’s spend one minute to fully understand what’s going on here.

Given a (stochastic) policy $\pi$ over all possible actions, $V$ is the expected value of the (discounted) future reward over all the possible actions. Or: what’s the average reward I will obtain in this state given my current policy?

As before, we have a closed form for $V$ that will greatly simplify our life:

$$V^{\pi}(s) = \mathbb{E}_{\pi}\left[ r + \gamma V^{\pi}(s') \right]$$

Side note, but important for what follows: we can now rewrite $Q$ as:

$$Q(s, a) = \mathbb{E}\left[ r + \gamma V(s') \right]$$

Advantage function

If $Q$ represents the value we get by taking action $a$ in state $s$, and $V$ the average value of that state, we can define the advantage function as

$$A(s, a) = Q(s, a) - V(s)$$

which tells us how good the action $a$ performed in state $s$ is compared with the average. In particular, $A(s, a) > 0$ when the action is better than the average action in that state, and $A(s, a) < 0$ when it is worse.

Policy gradients

Consider the quantity

$$J(\pi) = \mathbb{E}_{s_0 \sim \rho^{s_0}}\left[ V^{\pi}(s_0) \right]$$

where $\rho^{s_0}$ is a distribution over starting states.

$J$ expresses how good a policy is: it’s the expected reward of the policy, weighted by the distribution of starting states.

We want to estimate the gradient of $J$ so we can make good policies (i.e. the ones with higher final reward) more likely.

Let’s now assume we are modeling $\pi$ with a deep neural network with weights $w$, i.e. $\pi(a|s, w)$. Then the gradient of $J$ w.r.t. $w$ is

$$\nabla_w J = \mathbb{E}\left[ \nabla_w \log \pi(a|s, w) \, A(s, a) \right]$$
Actor critic model

Let’s look at the previous equation:

  • $\nabla_w \log \pi(a|s)$ is the actor, because it shows in which direction the (log) probability of taking action $a$ in state $s$ increases
  • $A(s, a)$ is the critic: it’s a scalar that tells us what the advantage of taking this action in this context is (the same action in a different state will result in a different reward)

The only missing piece of the puzzle is $A$: how can we calculate it? Let’s go back to the definition:

$$A(s, a) = Q(s, a) - V(s) = r + \gamma V(s') - V(s)$$

Which means once we have $V$, we know $A$!

In practice, the same neural network approximates both the policy $\pi$ and the value function $V$, which gives more stability and faster training. After all, they are modeling the same underlying environment, so a lot of the features are shared.
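
As a rough illustration of this shared architecture (not the architecture used in the paper; the layer sizes are arbitrary and I’m assuming the Keras 2 functional API), a network with a shared body and two heads could look like this:

from keras.models import Model
from keras.layers import Input, Dense

num_actions = 4       # hypothetical action set size
state_dim = 84 * 84   # hypothetical flattened input

state = Input(shape=(state_dim,))
shared = Dense(256, activation='relu')(state)                              # features shared by both heads
policy = Dense(num_actions, activation='softmax', name='policy')(shared)   # the actor: pi(a|s)
value = Dense(1, activation='linear', name='value')(shared)                # the critic: V(s)

model = Model(inputs=state, outputs=[policy, value])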

Asynchronous

The last word of the title that we haven’t explained is the asynchronous one, but it’s also the easiest.

The idea is not new, and was proposed in the Gorila (General Reinforcement Learning Architecture) framework. It is very simple and effective:

  • have several agents exploring the environment at the same time (each agent owns a copy of the full environment)
  • give them different starting policies, so that the agents are not correlated
  • after a while, update the global model (the shared parameters) with the contributions of each agent and restart the process

The original Gorila idea was to leverage distributed systems, while in the DeepMind paper multiple cores from the same CPU are used. This comes with the advantage of saving time (and complexity) in moving data around.
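
A toy sketch of the asynchronous update scheme (purely illustrative: a random vector stands in for the real policy/value gradients):

import threading
import numpy as np

shared_weights = np.zeros(10)   # toy global parameters shared by all workers
lock = threading.Lock()

def worker(seed, steps=100, lr=0.01):
    # in the real algorithm each worker plays in its own copy of the environment
    # and computes policy/value gradients; here a random vector stands in for them
    rng = np.random.RandomState(seed)   # different seed -> decorrelated behaviour
    for _ in range(steps):
        fake_gradient = rng.randn(*shared_weights.shape)
        with lock:
            shared_weights[:] = shared_weights - lr * fake_gradient  # contribute to the global model

threads = [threading.Thread(target=worker, args=(seed,)) for seed in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()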

Results and conclusions

The results are impressive. By using just a 16-core CPU, they are able to beat the state-of-the-art Deep Q-Learning results and speed up training thanks to the async trick.

These are the scores for 5 different games, as reported in the original paper.

[Figure: scores on five Atari games, from the original paper]

Generate a new job posting à la Hacker News with LSTM and Keras

In this post, we are going to show how to train a character-level deep learning model to hallucinate new job postings.

In order to train the model, we need the data from the previous Hacker News postings. From now on, we are assuming that such data is contained in a local Elasticsearch index, as described in this blog post. To gather the data, we only need to perform a ‘match all’ query and grab the text field from the source.

from elasticsearch import Elasticsearch
from elasticsearch import helpers

es = Elasticsearch()

body= {
    "query" : {
        "match_all" : {}
    }
}

res = list(helpers.scan(es,  query=body))
all_listings = [d['_source']['text'] for d in res if 'text' in d['_source']]

Note that we used the function scan (found in the helpers module) rather than search because of the potentially large output.

Organizing the text and counting the characters

Since we want to train a character-level network, we need to create a mapping from characters to integers. Training the network on all the characters would be way too wasteful (if possible at all), and would produce very noisy results. Therefore we are going to perform the following steps:

  • use only one type of apostrophe

  • lowercase all the text

  • define a subset of all the characters (bad_chars) which will contain the least used characters (note that the threshold for that was selected manually after printing counter.most_common())

  • define good_chars (i.e. the characters that will actually go into the model) as the complementary set to bad_chars

  • pick a special start_char, end_char and unknown_char after making sure they don’t belong in the good_chars set

  • replace each character from the bad_chars list with the unknown_char one

The reason why we need the start and stop tokens is that we want to be able to generate a full listing (i.e. from beginning to end), so we need to teach the model when a listing starts and ends.

from collections import Counter

joined_listing = "".join(all_listings)
counter = Counter(joined_listing.lower().replace("\"", "'").replace("’", "'"))
chars = set(joined_listing)

bad_chars = [c for c, v in counter.most_common() if v < 2000] + ['—', '•']
good_chars = list(set(counter) - set(bad_chars))

start_char = '\x02'
end_char = '\x03'
unknown_char = '\x04'

# we don't want to pick characters that are already used
assert start_char not in good_chars
assert end_char not in good_chars
assert unknown_char not in good_chars

good_chars.extend([start_char, end_char, unknown_char])

We can now create the mapping from character to index (and vice versa, which will be useful at text generation time).

char_to_int = {ch: i for i, ch in enumerate(good_chars)}
int_to_char = {i: ch for i, ch in enumerate(good_chars)}

Tensorizing the text

It is now time to transform a list of strings (i.e. a list where each element is a different posting) into a 3-dimensional tensor.

We know that the input has to have shape (num_sequences, seq_len, num_chars), where seq_len is an arbitrary number: it represents the length of the sequences the model learns from.

Step size

The number of sequences (i.e. the number of different training examples) depends on the length of the text and the step size we decide to use.

Let us consider an example:

text_example = 'in another moment down went Alice after'.lower()
seq_len = 30
step = 2
divided = []
for i in range(0, len(text_example) - seq_len, step):
    divided.append(text_example[i : i + seq_len + 1])
divided
['in another moment down went ali',
 ' another moment down went alice',
 'nother moment down went alice a',
 'ther moment down went alice aft',
 'er moment down went alice after']

If we now change the step size, we will obtain a different number of sequences:

step = 3
divided = []
for i in range(0, len(text_example) - seq_len, step):
    divided.append(text_example[i : i + seq_len + 1])
divided
['in another moment down went ali',
 'another moment down went alice ',
 'ther moment down went alice aft']

So the smaller the step size, the more sequences we will obtain. Because of memory and time constraints, we are going to use a step size of 3.

Preprocessing the text

For each posting, we want to prepend the start_char and append the end_char token. Then, for each character in the text, we want to replace it with its index. Remember that each char in the bad_chars set will default to the same index (the one of unknown_char).

seq_len = 100
step  = 3

def process(doc):
    doc = start_char + doc.lower() + end_char
    return [
        [to_int_func(z) for z in doc[i:i + seq_len + 1]]
        for i in range(0, len(doc) - seq_len, step)
    ]

def to_int_func(char):
    # checking if it's a good or bad char
    if char not in char_to_int:
        char = unknown_char
    return char_to_int[char]

Also note that process returns a list of lists, where each piece of text has length seq_len + 1, one more than expected. This is due to the fact that we need labeled examples to train the network (we are doing supervised learning after all). Therefore, we are going to pick all but the last element as the input, and the last one as the desired output. To obtain this, we can just slice a numpy.array, which is a very elegant and concise way of doing it.

At this point we need to concatenate the output of the process function when mapped over the list of postings. A flatmap function is what we need. Unfortunately it’s not in the standard library, so we are going to borrow this implementation from StackOverflow:

import itertools
def flatmap(func, *iterable):
    return itertools.chain.from_iterable(map(func, *iterable))

We can finally process the text:

import numpy as np

transformed = np.array(list(flatmap(process, all_listings)))
transformed.shape
(5926775, 101)

Each line contains a sequence of length 101. Remember that the first 100 elements represent the feature vector, while the last one is the desired output. Since we have used an np.array, it is now very easy to separate them.

x_ = transformed[:, :seq_len]
y_ = transformed[:, seq_len]

Again, each line of x_ contains the indexes of the characters contained in the sequence, and y_ the corresponding output.

To match the input dimensions, we are going to use a one-hot encoding for our data.

X = np.zeros((len(x_), seq_len, len(good_chars)), dtype=np.bool)
Y = np.zeros((len(y_), len(good_chars)), dtype=np.bool)

for time, sentence in enumerate(x_):
    for index, char_index in enumerate(sentence):
        X[time, index, char_index] = 1
    Y[time, y_[time]] = 1
X.shape, Y.shape
((5926775, 100, 63), (5926775, 63))

Defining the Deep Learning model

Now that our data is ready, we can just define the model in Keras and start training:

import random
import numpy as np
from keras.models import Sequential
from keras.layers.recurrent import LSTM
from keras.layers.core import Dense, Activation, Dropout

model = Sequential()
model.add(LSTM(512, return_sequences=True, input_shape=(seq_len, len(good_chars))))
model.add(Dropout(0.4))
model.add(LSTM(512, return_sequences=False))
model.add(Dropout(0.4))
model.add(Dense(len(good_chars)))
model.add(Activation('softmax'))
model.compile(loss='categorical_crossentropy', optimizer='rmsprop')

This is the beauty of Keras: we used only 8 lines to build our model in a Lego-like fashion.

The model we used here is 2 LSTM layers with dropout (with probability 0.4 each). The last layer is a dense layer of dimension len(good_chars), one output for each character (if you remember, we used the same character for a bunch of noisy elements). Note that we didn’t use any embedding layer, because we did the vectorizing manually.

Let’s start training, saving the model after each epoch:

epochs = 100
batch_size = 128
for epoch in range(epochs):
    model.fit(X, Y, batch_size=batch_size, nb_epoch=1)  # one epoch at a time so we can checkpoint
    file_name = '{}.hdf5'.format(epoch)
    model.save(file_name)

Now it’s time to wait for the model to train. Please come back later for a new post with some (hopefully interesting) examples of hallucinated job postings.
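
In the meantime, here is a rough sketch of how one could sample a new posting from the trained model (my own illustration, not part of the original pipeline; it reuses model, seq_len, good_chars, char_to_int, int_to_char and the special characters defined above):

import numpy as np

def sample_posting(model, max_len=1000, temperature=1.0):
    """Generate characters one at a time until end_char (or max_len) is reached."""
    generated = start_char
    while len(generated) < max_len and generated[-1] != end_char:
        window = generated[-seq_len:]
        # one-hot encode the current window, right-aligned in the seq_len slots
        x = np.zeros((1, seq_len, len(good_chars)))
        for i, ch in enumerate(window):
            x[0, seq_len - len(window) + i, char_to_int.get(ch, char_to_int[unknown_char])] = 1
        preds = model.predict(x, verbose=0)[0].astype('float64')
        # temperature sampling: higher temperature -> more surprising text
        preds = np.log(preds + 1e-8) / temperature
        preds = np.exp(preds) / np.sum(np.exp(preds))
        next_index = np.random.choice(len(good_chars), p=preds)
        generated += int_to_char[next_index]
    return generated.strip(start_char + end_char)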

How to continuously update the Elasticsearch index

We saw earlier how to create an index containing the posts from the Hacker News ‘Who is hiring?’ thread.

Since new posts (and thus new jobs) are added during the course of the month, we want to update the script so that it will add only the new postings without overwriting the ones that are already there.

The first step is again to query the Hacker News API to see what posts are currently online, just as before.

thread = 12627852

currents_posts = fetch_hn_data(thread)['kids']

For the definition of fetch_hn_data, please refer to the previous post or the corresponding GitHub repo. The function does what you would expect, with some text cleaning on top.

Now we need to figure out which items are already in our index. The first thing that comes to mind is to do a query that returns every element of the index, i.e.

{
    'query': {
        "match_all" : {}
    }
}

While this would technically work (and for now there would be no difference since we have only indexed the posts from one month), we want to do better.

Each child post has a parent field, so we can just impose this extra condition in the query:

{
    'query': {
        'term': {"parent": thread}
    }
}

Instead of querying the index via the usual search method, we want to use the scroll API (through the scan helper), which handles bigger results better. The Python bindings are really easy to use.

query = {
    'query': {
        'term': {"parent": thread}
    }
}

older_posts_gen = helpers.scan(es, query)

As it stands, older_posts_gen is a generator where each item is a dict that contains the metadata and the full data for each of the posts. Since we only need the post id (stored as the _id metadata value), we can suppress the full data in the response (corresponding to the _source value).

query = {
    "_source": False,
    'query': {
        'term': {"parent": thread}
    }
}

older_posts_gen = helpers.scan(es, query)

Let’s now grab only the ids.

old_posts_ids = {int(item['_id']) for item in older_posts_gen}

We can now find which posts are new, and index only those:

new_posts_ids = set(currents_posts) - old_posts_ids
if new_posts_ids:
    print("There are {} new posts!".format(len(new_posts_ids)))
    actions = [format_data_for_action(r) for r in new_posts_ids]
    list(parallel_bulk(es, actions))

Note that using parallel_bulk here is overkill, but we want to maximize code reuse.

Let’s put this code into a function. Given a thread id, the function will index all the posts that are new to the index and not deleted from the website.

def update_thread(thread, es=None):
    """Put into the index new child posts"""

    if not es:
        es = Elasticsearch()
    currents_posts = fetch_hn_data(thread)['kids']
    query = {
        "_source": False,
        'query': {
            'term': {"parent": thread}
        }
    }
    older_posts_gen = helpers.scan(es, query)
    old_posts_ids = {int(item['_id']) for item in older_posts_gen}
    new_posts_ids = set(currents_posts) - old_posts_ids
    if new_posts_ids:
        print("There are {} new posts!".format(len(new_posts_ids)))
        actions = [format_data_for_action(r) for r in new_posts_ids]
        actions = [a for a in actions if a['_source']]  # skip deleted posts (empty _source)
        list(parallel_bulk(es, actions))

What month is it?


There is one last piece of information we want to retrieve: the thread id of the latest ‘Who is hiring?’ thread. Luckily such postings are done by the same user (whoishiring), and the API lets us perform a query per user.

From the bot’s activity you can see that we need to retrieve the latest 3 posts and find the correct one among those.

First of all, we need a more generic version of the fetch data function.

def fetch_hn_data(thread, type_='item'):
    url = "https://hacker-news.firebaseio.com/v0/{}/{}.json".format(type_, thread)
    r = requests.get(url)
    assert r.status_code == 200
    data = r.json()
    if type_ == 'item':
        data = clean_text(data)
    return data

Then we can find the correct thread:

def find_hiring_thread():
    all_posts = fetch_hn_data('whoishiring', type_='user')['submitted']
    last_three_posts = sorted(all_posts)[-3:]
    for post in last_three_posts:
        data = fetch_hn_data(post)
        if 'is hiring?' in data['title']:  # avoid fetching the same post twice
            break
    return post

We can now complete the script with:

if __name__ == '__main__':
    es = Elasticsearch()
    assert es.ping(), "Elasticsearch not started properly"
    if not es.indices.exists(index='hn'):
        print('The index does not exist, creating one')
        es.indices.create(index='hn', body=mappings)
    print('Looking for the latest hiring thread...')
    thread = find_hiring_thread()
    print('updating index...')
    update_thread(thread, es=es)

And make sure it runs once the computer is started. We won’t give explicit instructions about it, because this is system dependent, and there are plenty of resources online for that. Also, if you never turn off your computer, you might want to set up a cron job to run this regularly.

Please remember that the code is available in this GitHub repo.

How to combine boolean queries in Elasticsearch

In a previous post we saw how to use Elasticsearch to search for our dream job among the ones posted on Hacker News.

Since Elasticsearch queries are basically JSON, it’s really easy to lose track when we start nesting them.

In this post we are going to define a Python class that will create the required query (read: JSON) on demand.

Anatomy of an Elasticsearch query

A query in Elasticsearch in its general form is:

{
   'query': {
        'bool': {
            'must': [ ... ],
            'should': [ ... ],
            'must_not': [ ... ]
        }
    }
}

Where must, should and must_not correspond to the AND, OR and AND NOT operators respectively.

To use an earlier example, the following query will require both ‘san francisco’ and ‘machine learning’ to be present in the text field.

{
    'query': {
        'bool': {
            'must': [
                {'match': {'text': 'san francisco'}},
                {'match': {'text': 'machine learning'}}
            ]
        }
    }
}

So far so good. But what if we want to nest a boolean query inside another boolean query?

Deeper

For example, what if we want to find something that matches (‘san francisco’ OR ‘bay area’) AND (‘machine learning’ OR ‘data analysis’)?

The resulting query will be something like this:

{
    'query': {
        'bool': {
            'must': [
                {'bool': {
                    'should': [
                        {'match': {'text': 'san francisco'}},
                        {'match': {'text': 'bay area'}}
                    ]
                }},
                {'bool': {
                    'should': [
                        {'match': {'text': 'machine learning'}},
                        {'match': {'text': 'data analysis'}}
                    ]
                }}
            ]
        }
    }
}

You can see how it’s easy to get lost here.

Python to the rescue!

We want to represent the queries in a more human-friendly way.

Before we start coding, let’s design the class. This will save us a lot of time afterwards. What are the requirements, i.e. what is the expected behavior of our class (and its methods)?

  • it should have arguments that correspond to the three boolean operators

  • each operator can accept one or more terms

  • we need to be able to specify which field we want to search in

  • each term can be another query

Let’s write down some obvious use cases.

# easy case: one field, one term per field
query = Query(must={'text': 'san francisco'}, should_not={'text': 'new york'})

# one field, more terms
query_2 = Query(should={'text': ['san francisco', 'bay area']})

# multiple fields, multiple terms
query_3 = Query(must={'text': ['san francisco', 'bay area'], 'title': 'hiring'})

# query in a query
inner = query_2
query_outer = Query(must={'query': inner, 'title': 'hiring'})

Most important of all, our class needs to have a to_elasticsearch method to produce the desired JSON on demand.

Let’s start by coding something that handles the first query, and then improve on it.

Since we are in the easy case, we can assume that the values passed are plain dicts. In order to build the ES query, we then need to figure out which arguments have been passed (i.e. are not None) and put them in the query.

class Query:

    def __init__(self, must=None, should=None, should_not=None):
        self.must = must
        self.should = should
        self.should_not = should_not

    def to_elasticsearch(self):
        # Elasticsearch calls the negative clause 'must_not',
        # while our keyword argument is named should_not
        names = ['must', 'should', 'must_not']
        values = [self.must, self.should, self.should_not]
        query = {name: value for name, value in zip(names, values) if value}
        query = {
            'query': {
                'bool': query
            }
        }
        return query

query = Query(must={'text': 'san francisco'}, should_not={'text': 'new york'})
query.to_elasticsearch()

So far so good. But what if the same field has multiple values, as in query_2? Our function needs to adapt. For one, we need to use the same key for all the values. So in the example above the key text has to be applied to both 'san francisco' and 'bay area'.

def to_elasticsearch(self):
    query = {}
    names = ['must', 'should', 'must_not']
    values = [self.must, self.should, self.should_not]
    for name, value in zip(names, values):
        if not value:
            continue
        for field_name, field_values in value.items():
            # field_name = 'text', field_values = ['san francisco', 'bay area']
            query[name] = [{'match': {field_name: v}} for v in field_values]

    query = {
        'query': {
            'bool': query
        }
    }
    return query

Query.to_elasticsearch = to_elasticsearch
query_2 = Query(should={'text': ['san francisco', 'bay area']})
query_2.to_elasticsearch()

Now we have it working for lists. What happens if we try a mixed case like

query_3 = Query(must={'text': ['san francisco', 'bay area'], 'title': 'hiring'})

There are 2 things that will break:

  • query[name] = ... will overwrite previous results (in our case the title field will overwrite the text one)

  • the for v in field_values part would not behave as expected (e.g. it will unpack a string into single characters)

To fix the first problem, we can just make query a defaultdict and extend it. To avoid problems communicating with the Elasticsearch client, we will convert back to a regular dict before returning.

from collections import defaultdict

def to_elasticsearch(self):
    query = defaultdict(list)
    names = ['must', 'should', 'must_not']
    values = [self.must, self.should, self.should_not]
    for name, value in zip(names, values):
        if not value:
            continue
        for field_name, field_values in value.items():
            # field_name = 'text', field_values = ['san francisco', 'bay area']
            query[name].extend([{'match': {field_name: v}} for v in field_values])

    query = {
        'query': {
            'bool': dict(query)
        }
    }
    return query

Query.to_elasticsearch = to_elasticsearch
query_3 = Query(must={'text': ['san francisco', 'bay area'], 'title': 'hiring'})
query_3.to_elasticsearch()

The most elegant way of solving the second problem is to transform the input into a standard form. That way our to_elasticsearch method will be independent of the original input format.

For each (non null) argument, we want to make sure that its values are wrapped in a list.

def __init__(self, must=None, should=None, should_not=None):
    self.must = self.preprocess(must)
    self.should = self.preprocess(should)
    self.should_not = self.preprocess(should_not)

def preprocess(self, field):
    if not field:
        return None
    for key, value in field.items():
        if not isinstance(value, list):
            field[key] = [value]
    return field

Query.__init__ = __init__
Query.preprocess = preprocess

query_3 = Query(must={'text': ['san francisco', 'bay area'], 'title': 'hiring'})
query_3.to_elasticsearch()

Or, in a more compact way:

def preprocess(self, field):
    return {k: v if isinstance(v, list) else [v]
            for k, v in field.items()} if field else None

Some stuff to note here:

  • if the field is None, we want to keep it that way, not wrap it in a list. Returning {} would be fine too

  • [value] is not the same as list(value) (see the short example below)

  • the output of our first query is now slightly different (there is an extra list), but it’s still a valid ES query, and it returns the same result.
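
To see why the second point matters, here is a tiny illustration:

list('san francisco')   # ['s', 'a', 'n', ' ', 'f', 'r', ...] -- the string gets exploded
['san francisco']       # ['san francisco'] -- what we actually want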

It’s turtles all the way down

Now the interesting part: we want to combine a query within a query.

Let’s do this step by step. First of all, we want the method to_elasticsearch to expand any inner queries, if present. Since inner queries are still instances of Query, we can call the same method on them. Therefore, we need to distinguish between a real Query object and a plain query term.

The minimal edit to the previous code will be something like this:

def to_elasticsearch(self):
    query = defaultdict(list)
    names = ['must', 'should', 'must_not']
    values = [self.must, self.should, self.should_not]
    for name, value in zip(names, values):
        if not value:
            continue
        for field_name, field_values in value.items():
            # field_name = 'text', field_values = ['san francisco', 'bay area']
            # OR
            # field_name = 'query', field_values = some instance of Query
            query[name].extend([
                    v.to_elasticsearch() if isinstance(v, Query)
                    else {'match': {field_name: v}}
                    for v in field_values])

    query = {
        'query': {
            'bool': dict(query)
        }
    }
    return query

Just to reiterate: everything is the same as before, except we now check if we encounter another instance of Query. If that’s the case, the instance itself will take care of transforming its portion of the query, which might cause another instance to be found…

Also note that if there is another instance of Query, we just ignore the field_name variable. This means that you could pass the internal query as {'foo': query_2}.

Unfortunately this does not work as expected:

inner = Query(must={'text': ['san francisco', 'bay area']})
query_outer = Query(should={'query': inner, 'title': 'hiring'})
query_outer.to_elasticsearch()
>>> {
        'query': {
            'bool': {
                'should': [
                    {'match': {'title': 'hiring'}},
                    { 'query': {
                        'bool': {
                            'must': [
                                {'match': {'text': 'san francisco'}},
                                {'match': {'text': 'bay area'}}
                            ]
                        }
                    }
                }]
            }
        }
    }

Can you spot the mistake? The key query appears twice, so the previous output is not a valid ES query.

A little refactoring will bring us where we want.

def expand_query(self):
    query = defaultdict(list)
    names = ['must', 'should', 'must_not']
    values = [self.must, self.should, self.should_not]
    for name, value in zip(names, values):
        if not value:
            continue
        for field_name, field_values in value.items():
            # field_name = 'text', field_values = ['san francisco', 'bay area']
            # OR
            # field_name = 'query', field_values = some instance of Query
            query[name].extend([
                    v.expand_query() if isinstance(v, Query)
                    else {'match': {field_name: v}}
                    for v in field_values])

    return {'bool': dict(query)}


def to_elasticsearch(self):
    return {'query': self.expand_query()}

Let’s test it:

Query.expand_query = expand_query
Query.to_elasticsearch = to_elasticsearch
query_outer.to_elasticsearch()
>>> {
        'query': {
            'bool': {
                'should': [
                    {'match': {'title': 'hiring'}},
                    {'bool': {
                        'must': [
                            {'match': {'text': 'san francisco'}},
                            {'match': {'text': 'bay area'}}
                        ]
                    }}
                ]
            }
        }
    }

It worked!

The next step is to use this to increase the querying power over the Hacker News jobs index that we built earlier.
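
For instance, the (‘san francisco’ OR ‘bay area’) AND (‘machine learning’ OR ‘data analysis’) search that motivated all of this could now be written like this (a sketch; it assumes the es client and the hn index from the previous post):

locations = Query(should={'text': ['san francisco', 'bay area']})
topics = Query(should={'text': ['machine learning', 'data analysis']})
combined = Query(must={'query': [locations, topics]})

es.search(index='hn', body=combined.to_elasticsearch())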

How I found a job in San Francisco with Hackernews and Elasticsearch

Recently I have decided to move to San Francisco, and I had only one problem: finding a job.

I knew two things:

  • I didn’t want to apply to one of the big companies

  • I wanted a Machine Learning job

After doing some obvious Google searches, I decided to find a better way, especially considering that anonymous online applicants are rarely considered. (To my surprise, I did obtain an onsite interview with a company I applied to online.)

What I wanted was a list of companies offering machine learning positions, and possibly a smoother application process.

I decided to use the monthly Hacker News ‘Who is hiring?’ thread to help me with the search. Usually you can contact somebody who is already working on the team (and not a recruiter), so it’s a better introduction.

At the time of writing, the latest such thread is located here.

Since this thread is now very popular, there are way too many postings to look at one by one, so I needed to filter out the uninteresting ones. What I wanted to keep was:

  • a top-level post: this is where jobs are usually posted; replies are mostly used to ask for additional info

  • a job located in SF or in the Bay Area

  • a job related to my area of interest

Furthermore, some jobs are added during the course of the month, so I needed a way of keeping track of what was already scraped and what was new.

Finally, I wanted an easy way to read through the jobs, so that I could pick the ones that were interesting to me and move on from there.

First of all: the data

Of course, the first thing we need is to get the data from the thread. Hacker News conveniently offers an API that gives all the first-level posts (called kids in the response) of a given story. This is exactly what we need.

import requests
from bs4 import BeautifulSoup

thread = 12627852

def clean_text(data):
    try:
        clean_text = BeautifulSoup(data['text'], "html.parser").text
        data['text'] = clean_text
        return data
    except KeyError:
        return {}

def fetch_hn_data(thread):
    url = "https://hacker-news.firebaseio.com/v0/item/{}.json".format(thread)
    r = requests.get(url)
    assert r.status_code == 200
    data = r.json()
    data = clean_text(data)
    return data
data = fetch_hn_data(thread)

The text value from the API response is still in HTML, which means there is a lot of unnecessary noise. We use BeautifulSoup to extract the plain text. We lose paragraph / newline dividers, but this doesn’t seem to be a problem for now. The text field is not always present, for example when the post has been deleted. If that’s the case, we return an empty dict, for reasons that will become clear in a moment.

In order to be able to search for the right posting (and therefore job), we are going to feed all such posts to Elasticsearch, a fully fledged text search engine. All the filtering and querying will happen via the Python Elasticsearch package.

Putting the data into Elasticsearch

Before indexing the documents, we need to create the index with mappings (basically a schema). Elasticsearch is generally smart enough to figure out what type of content a field has, but we need to define mappings if we want some sort of special treatment for it. In our case we want data['time'] to be recognized as a (unix) timestamp.

from elasticsearch import Elasticsearch
# client init
es = Elasticsearch()

mappings = {
    "mappings" : {
        "post" : {
            "properties" : {
                "time" : {
                    "type" : "date",
                    "format" : "strict_date_optional_time||epoch_millis"
                }
            }
        }
    }
}
es.indices.create(index='hn', body=mappings)

Note that we created an index called hn and we claimed (in the mappings dict) that such an index will have a document type called post.

Let’s add our documents now. Note that we are not including the top level thread, because it does not contain jobs.

from elasticsearch.helpers import parallel_bulk

def format_data_for_action(post_id):
    return {
        '_index': 'hn',
        '_type': 'post',
        '_id': post_id,
        '_source': fetch_hn_data(post_id)
    }

actions = [format_data_for_action(r) for r in data['kids']]
actions = [a for a in actions if a['_source']]  # skip deleted posts (they have an empty _source)
list(parallel_bulk(es, actions))

Since we have a lot of postings, we are using the parallel_bulk helper. The bottleneck is fetching the data, because we are sending one request at a time. This is the only case where crawling the web page gives an advantage over the API: all the data is already present in the first response. On the other hand, it requires way more human time (to figure out how to successfully extract the right elements), so it’s OK to have the machine wait for us. Maybe one day we could update the fetching functionality using the new Python async and await keywords (keep in mind that this would put more stress on the server).
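
For the curious, here is a rough sketch of what that could look like with asyncio and the aiohttp library (not used in this post; the function and variable names are my own):

import asyncio
import aiohttp

async def fetch_item(session, post_id):
    url = "https://hacker-news.firebaseio.com/v0/item/{}.json".format(post_id)
    async with session.get(url) as resp:
        return await resp.json()

async def fetch_all(post_ids):
    # fire all the requests concurrently instead of one at a time
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_item(session, pid) for pid in post_ids]
        return await asyncio.gather(*tasks)

# posts = asyncio.get_event_loop().run_until_complete(fetch_all(data['kids']))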

If everything worked properly, the index will contain all the published postings and we can start querying it.

Querying the index

Let’s do a sanity check to make sure we are not off track, and search for San Francisco jobs. I’m not sure how many we should find, but I am expecting more than 10.

query = {
    "query" : {
        "bool" : {
            "must" : {
                "match": { 'text' : 'san francisco'}
            }
        }
    }}

response = es.search(index='hn', body=query)
response['hits']['total']
>>> 180

And it worked! This seems like a very reasonable result. Note that your mileage may vary, because posts get added / deleted all the time.

So this matches any job posting that mentions San Francisco. What if we want a San Francisco based job that also contains the ‘machine learning’ keyword?

We just need to add another clause to the match query:

query = {"query" : {
        "bool" : {
            "must" : [{
                "match": {'text' : 'san francisco'}},
                {"match": {'text' : 'machine learning'}}
            ]

        }

    }}

es.search(index='hn', body=query)['hits']['total']
>>> 40

That’s great! Now we have a list of potentially interesting jobs that we can look at.

What’s next?

While this simple method works, it is still too manual to be really useful. For example, the query above is way too restrictive. In general we want something like (‘san francisco’ OR ‘bay area’) AND (‘machine learning’ OR ‘data analysis’). Those Elasticsearch JSON-based queries can get really nested, and it’s easy to get lost in them.

In the next posts we are going to address the following issues:

  • building nested boolean queries in a more manageable way
  • keeping the index up to date as new postings are added during the month

The code used in this post can be found here.