The Problem

Using HDFS commands (e.g., ‘hdfs dfs -put filename path’) can be frustratingly slow, especially when you’re trying to move many files to distinct locations. Each command can take three or so seconds simply to spin up a JVM process.

Solution: Use HTTPFS

A solution is to use httpfs. Of course, instead of forming relatively simple HDFS commands, we now need to form HTTP requests and submit them to an HDFS node running httpfs. We can do this with curl, and it works perfectly fine, but it’s tedious. So, we’ll instead bury the requests inside a script of our own and call its methods when needed. I’m using Python with the requests package. One could also use pycurl if you desire greater control at the expense of pain and suffering.
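
To see what the wrapper saves us from, here’s a rough sketch of a single raw httpfs request made with requests. The host, user, and path are placeholders, and the URL mirrors the make_httpfs_url helper in the script below:

import requests

# Hypothetical values -- substitute your own httpfs host, user, and HDFS path.
host = 'hadoop01'
user = 'root'
hdfs_path = '/user/root/alice.txt'

# httpfs listens on port 14000 by default and speaks the WebHDFS REST API.
url = 'http://%s:14000/webhdfs/v1%s?user.name=%s&op=GETFILESTATUS' % (host, hdfs_path, user)
resp = requests.get(url)
resp.raise_for_status()
print resp.json()['FileStatus']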

Plus, we can do awesome things previously not possible with HDFS commands, such as appending to files (with the contents of other files or with strings) and reading files directly into memory. So, for instance, we can load a JSON file from HDFS directly into Python as a dict. *head explodes*

A caveat: I haven’t included the ‘rm’ command yet. In a lot of Hadoop environments where safety is a concern (as it is where I’m testing this), I don’t want to make it that easy to blow data away. (You can figure it out from the other commands and an httpfs reference.)

Here’s the script, which is a bit long, followed by some usage examples.

github: httpfs_utils

#!/usr/bin/python2
 
# httpfs_utils.py
#
# Provides HDFS access via httpfs using Python's requests package.
 
 
import datetime
import requests
try:
    import simplejson as json
except ImportError:
    import json
 
 
###################################################################################################
# Helper functions                                                                                #
###################################################################################################
 
def _get_max_str_len_(filestatuses, key):
    """ Returns the max string value length for a list of dictionaries with 'field' as key.
 
    This is used to pretty print directory listings.
 
    INPUT
    -----
    filestatuses : dict
        The FileStatuses dictionary returned by the liststatus method.
    key : str
        The key for which we wish to find the maximum length value.
 
    OUTPUT
    ------
    int : The length of the longest value.
    """
    return max([len(str(B[key])) for B in filestatuses['FileStatuses']['FileStatus']])
 
 
def _perm_long_str_(type_str, perm_str):
    """ Forms the long string version of the permission string.
 
    INPUT
    -----
    type_str : str
        The type of object as given by list, e.g., 'FILE' or 'DIRECTORY'.
    perm_str : str
        The short form (numeric) version of the permission string.
 
    OUTPUT
    ------
    str : The long form version of the permission string.
    """
    # Determine if a directory is represented.
    if type_str == 'DIRECTORY':
        perm_str_long = 'd'
    else:
        perm_str_long = '-'
    # Convert each octal digit of the permission string to its 'rwx' form.
    for n in perm_str:
        bits = int(n)
        perm_str_long += 'r' if bits & 4 else '-'
        perm_str_long += 'w' if bits & 2 else '-'
        perm_str_long += 'x' if bits & 1 else '-'
 
    return perm_str_long
 
 
def make_httpfs_url(host, user, hdfs_path, op, port=14000):
    """ Forms the URL for httpfs requests.
 
    INPUT
    -----
    host : str
        The host to connect to for httpfs access to HDFS. (Can be 'localhost'.)
    user : str
        The user to use for httpfs connections.
    hdfs_path : str
        The full path of the file or directory being checked.
    op : str
        The httpfs operation string. E.g., 'GETFILESTATUS'.
    port : int
        The port to use for httpfs connections.
 
    OUTPUT
    ------
    str : The string to use for an HTTP request to httpfs.
    """
    url = 'http://' + user + '@' + host + ':' + str(port) + '/webhdfs/v1'
    url += hdfs_path + '?user.name=' + user + '&op=' + op
 
    return url
 
 
###################################################################################################
# Functions                                                                                       #
###################################################################################################
 
def append(host, user, hdfs_path, filename, port=14000):
    """ Appends contents of 'filename' to 'hdfs_path' on 'user'@'host':'port'.
 
    INPUT
    -----
    host : str
        The host to connect to for httpfs access to HDFS. (Can be 'localhost'.)
    user : str
        The user to use for httpfs connections.
    hdfs_path : str
        The full path of the file to be appended to in HDFS.
    filename : str
        The file with contents being appended to hdfs_path. Can be a local file or a full path.
    port : int : default=14000
        The port to use for httpfs connections.
    """
    # Form the URL.
    url = make_httpfs_url(
        host=host,
        user=user,
        hdfs_path=hdfs_path,
        op='APPEND&data=true',
        port=port
    )
    headers = {
        'Content-Type':'application/octet-stream'
    }
 
    resp = requests.post(url, data=open(filename,'rb'), headers=headers)
    if resp.status_code != 200:
        resp.raise_for_status()
 
 
def appends(host, user, hdfs_path, content, port=14000):
    """ Appends 'content' to 'hdfs_path' on 'user'@'host':'port'.
 
    This method is like 'append', but takes a string as input instead of a file name.
 
    INPUT
    -----
    host : str
        The host to connect to for httpfs access to HDFS. (Can be 'localhost'.)
    user : str
        The user to use for httpfs connections.
    hdfs_path : str
        The full path of the file to be appended to in HDFS.
    content : str
        The contents being appended to hdfs_path.
    port : int : default=14000
        The port to use for httpfs connections.
    """
    # Form the URL.
    url = make_httpfs_url(
        host=host,
        user=user,
        hdfs_path=hdfs_path,
        op='APPEND&data=true',
        port=port
    )
    headers = {
        'Content-Type':'application/octet-stream'
    }
 
    resp = requests.post(url, data=content, headers=headers)
    if resp.status_code != 200:
        resp.raise_for_status()
 
 
def copy_to_local(host, user, hdfs_path, filename, port=14000):
    """ Copies the file at 'hdfs_path' on 'user'@'host':'port' to 'filename' locally.
 
    INPUT
    -----
    host : str
        The host to connect to for httpfs access to HDFS. (Can be 'localhost'.)
    user : str
        The user to use for httpfs connections.
    hdfs_path : str
        The full path of the file in HDFS.
    filename : str
        The local file to write to. Can be a file name or a full path.
    port : int : default=14000
        The port to use for httpfs connections.
    """
    # Form the URL.
    url = make_httpfs_url(host=host, user=user, hdfs_path=hdfs_path, op='OPEN', port=port)
 
    # Form and issue the request.
    resp = requests.get(url, stream=True)
 
    if resp.status_code == 200:
        with open(filename, 'wb') as f_p:
            for chunk in resp:
                f_p.write(chunk)
    else:
        resp.raise_for_status()
 
 
def exists(host, user, hdfs_path, port=14000):
    """ Returns True if 'hdfs_path' (full path) exists in HDFS at user@host:port via httpfs.
 
    INPUT
    -----
    host : str
        The host to connect to for httpfs access to HDFS. (Can be 'localhost'.)
    user : str
        The user to use for httpfs connections.
    hdfs_path : str
        The full path of the file or directory being checked.
    port : int
        The port to use for httpfs connections.
 
    OUTPUT
    ------
    Boolean : True if 'hdfs_path' exists and can be accessed by 'user'; False otherwise.
    """
    op = 'GETFILESTATUS'
    url = make_httpfs_url(host=host, user=user, hdfs_path=hdfs_path, op=op, port=port)
    # Get the JSON response using httpfs; stores as a Python dict
    resp = requests.get(url)
    # If a 404 was returned, the file/path does not exist
    if resp.status_code == 404:
        return False
    # If a 200 was returned, the file/path does exist
    elif resp.status_code == 200:
        return True
    # Something else - raise status, or if all else fails return None
    else:
        resp.raise_for_status()
        return None
 
 
def get_blocksize(host, user, hdfs_path, port=14000):
    """ Returns the HDFS block size (bytes) of 'hdfs_path' in HDFS at user@host:port via httpfs.
 
    The returned block size is in bytes. For MiB, divide this value by 2**20=1048576.
 
    INPUT
    -----
    host : str
        The host to connect to for httpfs access to HDFS. (Can be 'localhost'.)
    user : str
        The user to use for httpfs connections.
    hdfs_path : str
        The full path of the file or directory being checked.
    port : int
        The port to use for httpfs connections.
 
    OUTPUT
    ------
    int/long : The block size in bytes.
    """
    op = 'GETFILESTATUS'
    url = make_httpfs_url(host=host, user=user, hdfs_path=hdfs_path, op=op, port=port)
    # Get the JSON response using httpfs; stores as a Python dict
    resp = requests.get(url)
    # If a 200 was returned, the file/path exists
    if resp.status_code == 200:
        return resp.json()['FileStatus']['blockSize']
    # Something else - raise status, or if all else fails return None
    else:
        resp.raise_for_status()
 
 
def get_size(host, user, hdfs_path, port=14000):
    """ Returns the size (bytes) of 'hdfs_path' in HDFS at user@host:port via httpfs.
 
    The returned size is in bytes. For MiB, divide this value by 2**20=1048576.
 
    INPUT
    -----
    host : str
        The host to connect to for httpfs access to HDFS. (Can be 'localhost'.)
    user : str
        The user to use for httpfs connections.
    hdfs_path : str
        The full path of the file or directory being checked.
    port : int
        The port to use for httpfs connections.
 
    OUTPUT
    ------
    int/long : The size in bytes.
    """
    op = 'GETFILESTATUS'
    url = make_httpfs_url(host=host, user=user, hdfs_path=hdfs_path, op=op, port=port)
    # Get the JSON response using httpfs; stores as a Python dict
    resp = requests.get(url)
    # If a 200 was returned, the file/path exists
    if resp.status_code == 200:
        return resp.json()['FileStatus']['length']
    # Something else - raise status, or if all else fails return None
    else:
        resp.raise_for_status()
 
 
def info(host, user, hdfs_path, port=14000):
    """ Returns a dictionary of info for 'hdfs_path' in HDFS at user@host:port via httpfs.
 
    This method is similar to 'liststatus', but only displays top-level information. If you need
    info about all of the files and subdirectories of a directory, use 'liststatus'.
 
    The returned dictionary contains keys: group, permission, blockSize, accessTime, pathSuffix,
    modificationTime, replication, length, owner, type.
 
    INPUT
    -----
    host : str
        The host to connect to for httpfs access to HDFS. (Can be 'localhost'.)
    user : str
        The user to use for httpfs connections.
    hdfs_path : str
        The full path of the file or directory being checked.
    port : int
        The port to use for httpfs connections.
 
    OUTPUT
    ------
    Dictionary : Information about 'hdfs_path'
    """
    op = 'GETFILESTATUS'
    url = make_httpfs_url(host=host, user=user, hdfs_path=hdfs_path, op=op, port=port)
    # Get the JSON response using httpfs; stores as a Python dict
    resp = requests.get(url)
    # If a 200 was returned, the file/path exists
    if resp.status_code == 200:
        return resp.json()
    # Something else - raise status, or if all else fails return None
    else:
        resp.raise_for_status()
 
 
def liststatus(host, user, hdfs_path, port=14000):
    """ Returns a dictionary of info for 'hdfs_path' in HDFS at user@host:port via httpfs.
 
    Returns a dictionary of information. When used on a file, the returned dictionary contains a
    copy of the dictionary returned by 'info.' When used on a directory, the returned dictionary
    contains a list of such dictionaries.
 
    INPUT
    -----
    host : str
        The host to connect to for httpfs access to HDFS. (Can be 'localhost'.)
    user : str
        The user to use for httpfs connections.
    hdfs_path : str
        The full path of the file or directory being checked.
    port : int
        The port to use for httpfs connections.
 
    OUTPUT
    ------
    Dictionary : Information about 'hdfs_path'
    """
    op = 'LISTSTATUS'
    url = make_httpfs_url(host=host, user=user, hdfs_path=hdfs_path, op=op, port=port)
    # Get the JSON response using httpfs; stores as a Python dict
    resp = requests.get(url)
    # If a 200 was returned, the file/path exists
    if resp.status_code == 200:
        return resp.json()
    # Something else - raise status, or if all else fails return None
    else:
        resp.raise_for_status()
 
 
def ls(host, user, hdfs_path, port=14000):
    """ Print info for 'hdfs_path' in HDFS at user@host:port via httpfs.
 
    A print function intended for interactive usage. Similar to 'ls -l' or 'hdfs dfs -ls'.
 
    INPUT
    -----
    host : str
        The host to connect to for httpfs access to HDFS. (Can be 'localhost'.)
    user : str
        The user to use for httpfs connections.
    hdfs_path : str
        The full path of the file or directory being checked.
    port : int
        The port to use for httpfs connections.
    """
    op = 'LISTSTATUS'
    url = make_httpfs_url(host=host, user=user, hdfs_path=hdfs_path, op=op, port=port)
    # Get the JSON response using httpfs; stores as a Python dict
    resp = requests.get(url)
    # If a 200 was returned, the file/path exists. Otherwise, raise error or exit.
    if resp.status_code != 200:
        resp.raise_for_status()
    else:
        filestatuses = resp.json()
        for obj in filestatuses['FileStatuses']['FileStatus']:
            obj_str = _perm_long_str_(type_str=obj['type'],perm_str=obj['permission'])
            obj_str += '%*s' % (
                _get_max_str_len_(filestatuses, 'replication')+3,
                obj['replication']
            )
            obj_str += '%*s' % (
                _get_max_str_len_(filestatuses, 'owner')+3,
                obj['owner']
            )
            obj_str += '%*s' % (
                _get_max_str_len_(filestatuses, 'group')+2,
                obj['group']
            )
            obj_str += '%*s' % (
                _get_max_str_len_(filestatuses, 'length')+4,
                obj['length']
            )
            obj_str += '%21s' % (
                datetime.datetime.utcfromtimestamp(
                    obj['modificationTime']/1000
                ).isoformat().replace('T',' ')
            )
            obj_str += ' ' + hdfs_path + '/' + obj['pathSuffix']
 
            print "%s" % obj_str
 
 
def mkdir(host, user, hdfs_path, port=14000):
    """ Creates the directory 'hdfs_path' on 'user'@'host':'port'.
 
    Directories are created recursively.
 
    INPUT
    -----
    host : str
        The host to connect to for httpfs access to HDFS. (Can be 'localhost'.)
    user : str
        The user to use for httpfs connections.
    hdfs_path : str
        The path of the directory to create in HDFS.
    port : int : default=14000
        The port to use for httpfs connections.
    """
    op = 'MKDIRS'
    url = make_httpfs_url(host=host, user=user, hdfs_path=hdfs_path, op=op, port=port)
 
    # Make the request
    resp = requests.put(url)
    # If a 200 was returned, the request succeeded; return the JSON response
    if resp.status_code == 200:
        return resp.json()
    # Something else - raise status, or if all else fails return None
    else:
        resp.raise_for_status()
 
 
def put(host, user, hdfs_path, filename, port=14000, perms=775):
    """ Puts 'filename' into 'hdfs_path' on 'user'@'host':'port'.
 
    INPUT
    -----
    host : str
        The host to connect to for httpfs access to HDFS. (Can be 'localhost'.)
    user : str
        The user to use for httpfs connections.
    hdfs_path : str
        The full path of the location to place the file in HDFS.
    filename : str
        The file to upload. Can be a local file or a full path.
    port : int : default=14000
        The port to use for httpfs connections.
    perms : str or int : default=775
        The permissions to use for the uploaded file in HDFS.
    """
    # Get the file name without base path.
    filename_short = filename.split('/')[-1]
    # Form the URL.
    url = make_httpfs_url(
        host=host,
        user=user,
        hdfs_path=hdfs_path + '/' + filename_short,
        op='CREATE&data=true&overwrite=true&permission=' + str(perms),
        port=port
    )
    headers = {
        'Content-Type':'application/octet-stream'
    }
    #files = {'file': open(filename,'rb')}
 
    resp = requests.put(url, data=open(filename,'rb'), headers=headers)
    if resp.status_code != 200:
        resp.raise_for_status()
 
 
def read(host, user, hdfs_path, port=14000):
    """ Reads file at 'hdfs_path' on 'user'@'host':'port'.
 
    This method allows the contents of a file in HDFS to be read into memory in Python.
 
    INPUT
    -----
    host : str
        The host to connect to for httpfs access to HDFS. (Can be 'localhost'.)
    user : str
        The user to use for httpfs connections.
    hdfs_path : str
        The full path of the file in HDFS.
    port : int : default=14000
        The port to use for httpfs connections.
 
    OUTPUT
    ------
    Text of the file.
    """
    # Form the URL.
    url = make_httpfs_url(host=host, user=user, hdfs_path=hdfs_path, op='OPEN', port=port)
 
    # Form and issue the request.
    resp = requests.get(url)
 
    if resp.status_code != 200:
        resp.raise_for_status()
 
    return resp.text
 
 
def read_json(host, user, hdfs_path, port=14000):
    """ Reads JSON file at 'hdfs_path' on 'user'@'host':'port' and returns a Python dict.
 
    This method reads the contents of a JSON file in HDFS into Python as a dictionary.
 
    INPUT
    -----
    host : str
        The host to connect to for httpfs access to HDFS. (Can be 'localhost'.)
    user : str
        The user to use for httpfs connections.
    hdfs_path : str
        The full path of the file in HDFS.
    port : int : default=14000
        The port to use for httpfs connections.
 
    OUTPUT
    ------
    Text of the file interpreted in JSON as a Python dict.
    """
    # Form the URL.
    url = make_httpfs_url(host=host, user=user, hdfs_path=hdfs_path, op='OPEN', port=port)
 
    # Form and issue the request.
    resp = requests.get(url)
 
    if resp.status_code != 200:
        resp.raise_for_status()

    return json.loads(resp.text)

Here’s an example of printing the contents of a directory in IPython. I should point out that this is across a network connection, yet it still takes only 0.08 seconds.

In [1]: import httpfs_utils as httpfs
 
In [2]: time httpfs.ls(host='hadoop01',user='root',hdfs_path='/user/root')
drwx------   0     root  supergroup        0  2015-01-29 20:21:17 /user/root/.Trash
drwx------   0     root  supergroup        0  2015-01-22 20:49:49 /user/root/.staging
-rwxrwxr-x   3     root  supergroup    74726  2015-07-06 16:50:08 /user/root/alice.txt
drwxr-xr-x   0     root  supergroup        0  2015-07-06 22:17:52 /user/root/config
drwxr-xr-x   0     root  supergroup        0  2015-03-03 19:38:21 /user/root/hdfs_bin
drwxr-xrwx   0     root  supergroup        0  2015-04-09 00:56:40 /user/root/referrer_host
drwxr-xr-x   0     root  supergroup        0  2015-02-26 22:58:04 /user/root/stage
-rwxrwxr-x   3     root  supergroup       49  2015-07-06 19:09:20 /user/root/test_append.txt
drwxr-xr-x   0     root  supergroup        0  2015-07-06 19:38:14 /user/root/testdir
drwxr-xr-x   0     root  supergroup        0  2015-03-04 22:04:37 /user/root/tmp
CPU times: user 0.02 s, sys: 0.02 s, total: 0.03 s
Wall time: 0.08 s

That was simply a print function. We can do much more useful things, such as uploading a file. Here, I’ll upload a smallish (1.6 MiB) CSV into HDFS. Then, we’ll verify that the file exists in HDFS.

In [3]: time httpfs.put(host='hadoop01',user='root',hdfs_path='/user/root',filename='/export/data-share/test.csv')
CPU times: user 0.00 s, sys: 0.02 s, total: 0.02 s
Wall time: 0.09 s
 
In [4]: time httpfs.exists(host='hadoop01',user='root',hdfs_path='/user/root/test.csv',)
CPU times: user 0.00 s, sys: 0.00 s, total: 0.00 s
Wall time: 0.02 s
Out[4]: True
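
The other metadata helpers work the same way. A quick sketch (not a recorded session; same hypothetical host and user as above):

# Top-level info for the file we just uploaded.
httpfs.info(host='hadoop01', user='root', hdfs_path='/user/root/test.csv')

# File size and HDFS block size, both in bytes.
httpfs.get_size(host='hadoop01', user='root', hdfs_path='/user/root/test.csv')
httpfs.get_blocksize(host='hadoop01', user='root', hdfs_path='/user/root/test.csv')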

Let’s read a JSON file directly into Python as a dictionary. (I’ve tracked all tweets mentioning the company for which I work, SpotXchange, for a year now. Here’s a random one.)

In [5]: time tweet = httpfs.read_json(host='hadoop01',user='root',hdfs_path='/user/root/spotxtweet.json')
CPU times: user 0.01 s, sys: 0.01 s, total: 0.02 s
Wall time: 0.66 s
 
In [8]: tweet
Out[8]: 
{u'contributors': None,
 u'coordinates': None,
 u'created_at': u'Mon Jul 14 22:25:14 +0000 2014',
 u'entities': {u'hashtags': [],
  u'symbols': [],
  u'urls': [{u'display_url': u'bit.ly/1qoqcDB',
    u'expanded_url': u'http://bit.ly/1qoqcDB',
    u'indices': [102, 124],
    u'url': u'http://t.co/yyMiXenMTJ'}],
  u'user_mentions': []},
 u'favorite_count': 0,
 u'favorited': False,
 u'filter_level': u'medium',
 u'geo': None,
 u'id': 488811485703835648,
 u'id_str': u'488811485703835648',
 u'in_reply_to_screen_name': None,
 u'in_reply_to_status_id': None,
 u'in_reply_to_status_id_str': None,
 u'in_reply_to_user_id': None,
 u'in_reply_to_user_id_str': None,
 u'lang': u'en',
 u'place': None,
 u'possibly_sensitive': False,
 u'retweet_count': 0,
 u'retweeted': False,
 u'source': u'<a href="http://www.hootsuite.com" rel="nofollow">Hootsuite</a>',
 u'text': u'Great insights from the CEO. Publishers Rapidly Adopting Programmatic Ad Sales: SpotXchange\u2019s Shehan: http://t.co/yyMiXenMTJ',
 u'truncated': False,
 u'user': {u'contributors_enabled': False,
  u'created_at': u'Thu Oct 31 17:50:48 +0000 2013',
  u'default_profile': False,
  u'default_profile_image': False,
  u'description': u'The Center of the Native Mobile Advertising Community',
  u'favourites_count': 1,
  u'follow_request_sent': None,
  u'followers_count': 152,
  u'following': None,
  u'friends_count': 83,
  u'geo_enabled': False,
  u'id': 2166999860,
  u'id_str': u'2166999860',
  u'is_translation_enabled': False,
  u'is_translator': False,
  u'lang': u'en',
  u'listed_count': 10,
  u'location': u'Anywhere and Everywhere',
  u'name': u'NativeMobile.com',
  u'notifications': None,
  u'profile_background_color': u'000000',
  u'profile_background_image_url': u'http://pbs.twimg.com/profile_background_images/378800000139148775/8OxxwKh3.jpeg',
  u'profile_background_image_url_https': u'https://pbs.twimg.com/profile_background_images/378800000139148775/8OxxwKh3.jpeg',
  u'profile_background_tile': False,
  u'profile_banner_url': u'https://pbs.twimg.com/profile_banners/2166999860/1386291602',
  u'profile_image_url': u'http://pbs.twimg.com/profile_images/378800000834565058/0948f6f3c16916fbc99746370d067d71_normal.png',
  u'profile_image_url_https': u'https://pbs.twimg.com/profile_images/378800000834565058/0948f6f3c16916fbc99746370d067d71_normal.png',
  u'profile_link_color': u'0084B4',
  u'profile_sidebar_border_color': u'FFFFFF',
  u'profile_sidebar_fill_color': u'DDEEF6',
  u'profile_text_color': u'333333',
  u'profile_use_background_image': True,
  u'protected': False,
  u'screen_name': u'nativemobile',
  u'statuses_count': 1841,
  u'time_zone': u'Pacific Time (US & Canada)',
  u'url': u'http://nativemobile.com',
  u'utc_offset': -25200,
  u'verified': False}}
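
The append functions aren’t demonstrated above, but they follow the same pattern. A minimal sketch (not a recorded session) against the test_append.txt file from the earlier listing; the local file name is hypothetical:

# Append the contents of a local file (hypothetical path), then a literal string.
httpfs.append(host='hadoop01', user='root', hdfs_path='/user/root/test_append.txt',
              filename='/tmp/extra_lines.txt')
httpfs.appends(host='hadoop01', user='root', hdfs_path='/user/root/test_append.txt',
               content='one more line\n')

# Read the result straight back into memory as a string.
text = httpfs.read(host='hadoop01', user='root', hdfs_path='/user/root/test_append.txt')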

Taking the results of an Impala query in the impala-shell and saving them as a TSV is easy. In my experience, this works better through the shell than through a service such as Hue. When I’ve done this in Hue, there have been issues with the name node running out of memory because the result set was so large. Dumping to TSV from the shell doesn’t seem to cause the same problem.

Here’s a brief explanation of the different options:
-i: As usual, this connects the shell to an impala daemon.
-o: Output to the following file.
-B: Turn off pretty printing. Use tab delimiters by default.
-f: Run the query in the following file.

The delimiter used can be changed using the --output_delimiter option. In the following example, I’m connecting to the data node at data_node_01.

$ impala-shell -i data_node_01 -o output_file.tsv -B -f impala_query.sql
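
For example, to get comma-separated output instead of tabs (same hypothetical host and query file as above):

$ impala-shell -i data_node_01 -B --output_delimiter=',' -o output_file.csv -f impala_query.sql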

A lot of times, I’ll create an externally managed Hive table as a step toward constructing something better (e.g., a Parquet, Snappy-compressed columnar table created by Hive for use in Impala or Spark). The data for that table is often broken down by day. Instead of writing an interactive Bash one-liner to iterate over dates and create the nested directory structure, I wrote the following script.

For instance, I want a root directory in HDFS (say, “/user/jason/my_root_dir”) to have date directories for all days in 2014, such as:
- /user/jason/my_root_dir/2014
- /user/jason/my_root_dir/2014/01
- /user/jason/my_root_dir/2014/01/01
- /user/jason/my_root_dir/2014/01/02
- /user/jason/my_root_dir/2014/01/03
- ...
- /user/jason/my_root_dir/2014/12/31

Running “./make_partitions /user/jason/my_root_dir 2014-01-01 2014-12-31” accomplishes this. Keep in mind that this takes a while, as the directories are checked and created across the cluster.

#!/bin/bash
 
# Usage: ./make_partitions HDFS_root_dir start_date end_date
# Example: ./make_partitions /user/root/mydir 2014-01-01 2014-12-31
# Creates nested year, month, day partitions for a sequence of dates (inclusive).
# Jason B. Hill - jason@jasonbhill.com
 
# Parse input options
HDFSWD=$1
START_DATE="$(date -d "$2" +%Y-%m-%d)"
END_DATE="$(date -d "$3 +1 days" +%Y-%m-%d)"
 
# Function to form directories based on a date
function mkdir_partition {
    # Input: $1 = date to form partition
 
    # Get date parameters
    YEAR=$(date -d "$1" +%Y)
    MONTH=$(date -d "$1" +%m)
    DAY=$(date -d "$1" +%d)
 
    # If the year doesn't exist, create it
    if ! hdfs dfs -test -e ${HDFSWD}/${YEAR}; then
        echo "-- creating HDFS directory: ${HDFSWD}/${YEAR}"
        hdfs dfs -mkdir ${HDFSWD}/${YEAR}
    fi
    # If the month doesn't exist, create it
    if ! hdfs dfs -test -e ${HDFSWD}/${YEAR}/${MONTH}; then
        echo "-- creating HDFS directory: ${HDFSWD}/${YEAR}/${MONTH}"
        hdfs dfs -mkdir ${HDFSWD}/${YEAR}/${MONTH}
    fi
    # If the day doesn't exist (it shouldn't), create it
    if ! hdfs dfs -test -e ${HDFSWD}/${YEAR}/${MONTH}/${DAY}; then
        echo "-- creating HDFS directory: ${HDFSWD}/${YEAR}/${MONTH}/${DAY}"
        hdfs dfs -mkdir ${HDFSWD}/${YEAR}/${MONTH}/${DAY}
    fi
}
 
# Iterate over dates and make partitions
ITER_DATE="${START_DATE}"
until [[ "${ITER_DATE}" == "${END_DATE}" ]]; do
    mkdir_partition ${ITER_DATE}
    ITER_DATE=$(date -d "${ITER_DATE} +1 days" +%Y-%m-%d)
done
 
exit 0
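
Since each ‘hdfs dfs’ call above pays the JVM start-up cost, the same structure could also be built with the httpfs wrapper from earlier in this post. A rough sketch, assuming the same hypothetical host and user as before:

import datetime

import httpfs_utils as httpfs

# Hypothetical parameters; adjust to your cluster and root directory.
host, user = 'hadoop01', 'root'
root_dir = '/user/jason/my_root_dir'

day = datetime.date(2014, 1, 1)
end = datetime.date(2014, 12, 31)
while day <= end:
    # MKDIRS creates parent directories as needed, so one call per day suffices.
    httpfs.mkdir(host=host, user=user,
                 hdfs_path='%s/%04d/%02d/%02d' % (root_dir, day.year, day.month, day.day))
    day += datetime.timedelta(days=1)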