Using psycopg2 with PostgreSQL

I had been using MySQL my whole life until I recently got my hands dirty with PostgreSQL in one of my projects. I must say, switching to PostgreSQL has been very easy. It has some very cool and robust features, but let’s not talk about those here. When using Python, psycopg2 is one of the most widely used database adapters. It is fairly stable and has good community support. We used aiopg, a library for accessing a PostgreSQL database with asyncio that is built on top of psycopg2. In this post, I will mention a few important things I came across.

1. DictCursor:

dict_cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)

helps in fetching rows from the database as dictionary-like objects, so columns can be read by name. A plain cursor returns tuples whose values can only be read by position, which can be painful. Say we have to fetch the row with id = 3 from the user table, and we need only three fields (name, age and gender) and not the 10 other fields. Using DictCursor we can get these values as:

row.get('name'), row.get('age') and row.get('gender')

instead of:

row[2], row[4] and row[10], where 2, 4 and 10 are the positions of the required columns in the result

Some code:

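# With aiopg, conn.cursor() has to be awaited too, just like execute() and fetchall().
cur = yield from conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
# The table name comes from a trusted constant; the value is passed as a bound parameter.
query = """SELECT * FROM {} WHERE user_id = %s""".format(DBOperations.TABLE_NAME)
yield from cur.execute(query, (10, ))
rows = yield from cur.fetchall()
return rows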

2. Single insert for multiple rows:

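We might want to insert multiple rows with a single query:

INSERT INTO user_address VALUES ('name1', 'address1'), ('name1', 'address2'), ('name1', 'address3')

We have to construct the VALUES list and execute the query, which can be done as below:

def set_address(self, cur, user_id, address_ids: list):
	# cur is an open aiopg cursor supplied by the caller.
	# One "(%s, %s)" group per row; the values are bound by the driver instead of
	# being formatted into the SQL string with str(), which is open to SQL injection.
	placeholders = ",".join(["(%s, %s)"] * len(address_ids))
	params = [value for aid in address_ids for value in (user_id, aid)]
	yield from cur.execute("INSERT INTO user_address VALUES " + placeholders, params)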
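
The string-building step can be tried out without a database. Below is a minimal sketch, assuming the driver uses %s placeholders as psycopg2 does; build_multi_insert is a hypothetical helper name made up for this example, and the table name is expected to come from trusted code, not from user input.

```python
def build_multi_insert(table, rows):
    """Return (sql, params) for one INSERT statement carrying every row in `rows`."""
    if not rows:
        raise ValueError("rows must not be empty")
    width = len(rows[0])
    if any(len(row) != width for row in rows):
        raise ValueError("all rows must have the same number of values")
    # One "(%s, %s, ...)" group per row; the values travel separately as parameters.
    group = "(" + ", ".join(["%s"] * width) + ")"
    sql = "INSERT INTO {} VALUES {}".format(table, ", ".join([group] * len(rows)))
    params = [value for row in rows for value in row]
    return sql, params


sql, params = build_multi_insert("user_address", [(7, 11), (7, 12), (7, 13)])
print(sql)
print(params)
```

The returned pair can then be handed to cur.execute(sql, params), so the values are never formatted into the SQL text by hand.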

3. Searching in a jsonb array:

One of the cool datatypes in PostgreSQL is jsonb, which can hold an array of JSON objects. PostgreSQL makes querying such an array easy. Sometimes we need to search for a particular key inside the objects: say each user has many addresses in various cities, and we need to find all the users who have an address in Mumbai.

def find_user_address_by_city(cls, cur, city: str):
        # "user" is a reserved word in PostgreSQL, so the table name has to be quoted.
        query = """SELECT * FROM "user" WHERE to_json(array(SELECT jsonb_array_elements(address) ->> 'city'))::jsonb ?|
         ARRAY[%s];"""
        # (city,) is a one-element tuple; tuple(city) would split the string into characters.
        yield from cur.execute(query, (city,))
        rows = yield from cur.fetchall()
        return rows
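
To make it clearer what this query matches, here is a minimal pure-Python sketch of the same predicate. It is only an illustration: cities_of, matches_any and the sample users are made up for this example, and nothing here talks to PostgreSQL.

```python
def cities_of(addresses):
    """Python counterpart of array(SELECT jsonb_array_elements(address) ->> 'city')."""
    return [entry.get("city") for entry in addresses]


def matches_any(addresses, wanted):
    """Python counterpart of the ?| operator: true if any wanted string is present."""
    present = set(cities_of(addresses))
    return any(city in present for city in wanted)


# Hypothetical sample data standing in for rows of the user table.
users = [
    {"name": "asha", "address": [{"city": "Mumbai"}, {"city": "Pune"}]},
    {"name": "ravi", "address": [{"city": "Delhi"}]},
    {"name": "meera", "address": [{"street": "MG Road"}]},  # no city key
]

in_mumbai = [u["name"] for u in users if matches_any(u["address"], ["Mumbai"])]
print(in_mumbai)

# Parameter shape: a one-element tuple needs the trailing comma.
print(("Mumbai",))
print(tuple("Mumbai"))
```

The last two lines show why the query parameters should be written as (city,): calling tuple() on a string yields one element per character.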

I will try to add other things as and when I get them.

A basic atomic number implementation

A basic atomic number implementation in Python.

from datetime import datetime, date
from functools import wraps
import threading



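def synchronized(function):
    # Run the wrapped method while holding the lock owned by the instance (self._lock).
    # Creating a new Lock on every call would never block anyone.
    @wraps(function)
    def synched_function(self, *args, **kwargs):
        with self._lock:
            return function(self, *args, **kwargs)
    return synched_function


class AtomicLong:
    def __init__(self, num):
        self._num = num
        # One lock per counter, shared by all synchronized methods.
        self._lock = threading.Lock()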

    @synchronized
    def increment_and_get(self):
        self._num += 1
        return self._num

    @synchronized
    def add_and_get(self, val):
        self._num += val
        return self._num

    @synchronized
    def set_value(self, val):
        self._num = val


class IdGenerator:
    RESET_MARKER = 101
    MAX_SEQUENCE_VALUE = 9999
    ATOMIC_LONG = AtomicLong(RESET_MARKER)

    @staticmethod
    def generate_id():
        # some business logic here
        sequence_number = IdGenerator.ATOMIC_LONG.increment_and_get()
        # Read the clock once so that all parts of the id come from the same instant.
        now = datetime.now()
        today = now.date()
        first_day = date(today.year, 1, 1)
        diff_days = (today - first_day).days
        year = today.year % 100
        midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
        seconds_passed_since_midnight = int((now - midnight).total_seconds())

        # new_id rather than id, so the built-in id() is not shadowed.
        new_id = "{0}{1}{2}{3}{4}-{5}".format("A", "O", diff_days, year,
                                              seconds_passed_since_midnight,
                                              sequence_number)
        print(new_id)
        if sequence_number > IdGenerator.MAX_SEQUENCE_VALUE:
            IdGenerator.ATOMIC_LONG.set_value(IdGenerator.RESET_MARKER)
        return new_id



if __name__ == '__main__':
    id_gen = IdGenerator()
    thread1 = threading.Thread(target=id_gen.generate_id)
    thread2 = threading.Thread(target=id_gen.generate_id)
    thread3 = threading.Thread(target=id_gen.generate_id)
    thread1.start()
    thread2.start()
    thread3.start()
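
A counter like this is only atomic if every call goes through one and the same lock object. The sketch below is a minimal standalone check of that idea; Counter and hammer are names made up for this example and are not part of the code above.

```python
import threading


class Counter:
    def __init__(self, start=0):
        self._value = start
        self._lock = threading.Lock()  # created once, shared by every call

    def increment_and_get(self):
        with self._lock:
            self._value += 1
            return self._value


def hammer(counter, threads=8, per_thread=1000):
    """Increment the counter from several threads and collect every value returned."""
    seen = []
    seen_lock = threading.Lock()

    def work():
        local = [counter.increment_and_get() for _ in range(per_thread)]
        with seen_lock:
            seen.extend(local)

    workers = [threading.Thread(target=work) for _ in range(threads)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    return seen


values = hammer(Counter())
print(len(values), len(set(values)), max(values))
```

If the lock works, no value is handed out twice, so the number of distinct values equals the number of calls.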

Pagination Using Redis

Continuing my last post on Redis, I was trying to achieve simple pagination using Redis. A use case can be showing reviews/comments on a web page.

When using a sorted set –

public static void testRedisPagination() {
    ShardedJedis redis = JEDIS_POOL.getResource();
    String redisKey = "temp";
    redis.del(redisKey);
    for (int i = 0; i < 38; i++) {
        String val = "val - " + i;
        redis.zadd(redisKey, i, val);
    }
    long total = redis.zcard(redisKey);
    System.out.println("Total value inserted - " + total);
    int perPage = 7;
    // Round up so that a partially filled last page is counted and no empty page is printed.
    long totalPages = (total + perPage - 1) / perPage;
    System.out.println("Total number of pages - " + totalPages);
    for (int pageNumber = 1; pageNumber <= totalPages; pageNumber++) {
        // The scores here are 0..total-1, so a score window doubles as a page window.
        long start = total - ((pageNumber - 1) * perPage + 1);
        long end = start - perPage;
        System.out.println(" ===== Printing page " + (pageNumber));
        Set<String> values = redis.zrevrangeByScore(redisKey, start, end + 1);
        for (String value : values) {
            System.out.println(value + "\t");
        }
        System.out.println("\n\n");
    }
    JEDIS_POOL.returnResource(redis);
}

When using a list –

public static void testRedisPagination() {
    ShardedJedis redis = JEDIS_POOL.getResource();
    String redisKey = "temp";
    // Start from an empty list so that repeated runs do not accumulate values.
    redis.del(redisKey);
    for (int i = 0; i < 38; i++) {
        String val = "val - " + i;
        redis.lpush(redisKey, val);
    }
    long total = redis.llen(redisKey);
    System.out.println("Total value inserted - " + total);
    int perPage = 5;
    // Round up, otherwise the last 3 of the 38 values would never be printed.
    long totalPages = (total + perPage - 1) / perPage;
    System.out.println("Total number of pages - " + totalPages);
    for (int pageNumber = 1; pageNumber <= totalPages; pageNumber++) {
        // LRANGE takes zero-based, inclusive indexes.
        int start = (pageNumber - 1) * perPage;
        int end = start + perPage - 1;
        System.out.println(" ===== Printing page " + (pageNumber));
        List<String> values = redis.lrange(redisKey, start, end);
        for (String value : values) {
            System.out.println(value + "\t");
        }
        System.out.println("\n\n");
    }
    JEDIS_POOL.returnResource(redis);
}
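
The index arithmetic behind both variants is easy to get wrong by one, so here it is on its own. This is a small sketch in Python that does not touch Redis; total_pages and page_bounds are hypothetical helper names, and the bounds are zero-based and inclusive, which is the convention LRANGE uses.

```python
def total_pages(total, per_page):
    """Number of pages needed for `total` items, rounding up."""
    return (total + per_page - 1) // per_page


def page_bounds(total, per_page, page):
    """Zero-based, inclusive (start, end) indexes of a 1-based page number."""
    if not 1 <= page <= total_pages(total, per_page):
        raise ValueError("page out of range")
    start = (page - 1) * per_page
    end = min(start + per_page, total) - 1  # the last page may be shorter
    return start, end


for page in range(1, total_pages(38, 5) + 1):
    print(page, page_bounds(38, 5, page))
```

Rounding the page count up is what keeps the final, partially filled page from being dropped.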

Using Jedis

Jedis is one of the Redis clients written in Java; I have not tried any other client. It is very easy to configure Jedis and use it as a client. Let’s quickly jump over to the code and understand things.
Using Maven dependency:
<dependency>
  <groupId>redis.clients</groupId>
  <artifactId>jedis</artifactId>
  <version>2.6.0</version>
</dependency>
Problem statement – We have to add items to a sorted set on the basis of some scores attached to the values. Also, there may be a maximum of 10 items in the sorted set. We will use the functions zadd, zcard and zremrangeByRank.
Some Java code:
Initializing the Redis client – Our server is up on localhost:6379 (the default). When multiple servers are running, we can add all of them to the pool.
public RedisCacheManager() {
    List<JedisShardInfo> shards = new ArrayList<JedisShardInfo>();
    for (String host : HOSTNAMES) {
        JedisShardInfo si = new JedisShardInfo(host, Constants.CACHE_PORT);
        shards.add(si);
        LOGGER.info("Added host to shard: " + host);
    }
    GenericObjectPoolConfig jedisConfig = new GenericObjectPoolConfig();
    JEDIS_POOL = new ShardedJedisPool(jedisConfig, shards);
}
Pushing items into a sorted set –
public boolean pushActivity(String activity, String activityId, double score, int ttl) {
    ShardedJedis redis = JEDIS_POOL.getResource();
    String redisKey = null;
    try {
        redisKey = activityId;
        redis.zadd(redisKey, score, activity);
        if (ttl > 0) {
            redis.expire(redisKey, ttl);
        }
        // zcard counts every member. Double.MIN_VALUE is the smallest positive double,
        // so a zcount starting from it would skip members with a score <= 0.
        Long count = redis.zcard(redisKey);
        if (count > 10) {
            // Trim by rank, not by score: drop the lowest-scored members so that the top 10 remain.
            redis.zremrangeByRank(redisKey, 0, count - 11);
        }
        return true;
    } catch (Exception e) {
        LOGGER.error("Unable to enter data into redis for - " + redisKey + " with activity - " + activity, e);
    } finally {
        JEDIS_POOL.returnResource(redis);
    }
    return false;
}
Retrieving item from Redis –
public Set<String> retrieveItem(int activityId, int maxIndex) {
    ShardedJedis redis = JEDIS_POOL.getResource();
    // An int cannot be assigned to a String directly.
    String redisKey = String.valueOf(activityId);
    Set<String> redisValue = Collections.emptySet();
    try {
        redisValue = redis.zrange(redisKey, 0, maxIndex);
    } catch (Exception e) {
        LOGGER.error("Unable to fetch data from redis for - " + redisKey, e);
    } finally {
        JEDIS_POOL.returnResource(redis);
    }
    return redisValue;
}
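
Capping a sorted set at 10 items is a matter of rank (position in score order), not of the score values themselves. Below is a minimal Python sketch of that trimming rule, using a plain dict in place of Redis; trim_to_top and the sample data are made up for this example.

```python
def trim_to_top(scores, limit):
    """Keep only the `limit` highest-scored members of a {member: score} mapping.

    Mirrors ZREMRANGEBYRANK key 0 -(limit + 1): members are ranked by
    ascending score and everything below the top `limit` ranks is removed.
    """
    # Redis orders members with equal scores lexicographically, hence the tie-break.
    ranked = sorted(scores, key=lambda member: (scores[member], member))
    excess = len(ranked) - limit
    for member in ranked[:max(excess, 0)]:
        del scores[member]
    return scores


# Hypothetical activities with scores from -5.0 to 9.0, including negative ones.
activities = {"a{}".format(i): float(i - 5) for i in range(15)}
trim_to_top(activities, 10)
print(sorted(activities.values()))
```

Because only the ordering matters, the rule behaves the same whether scores are negative, zero or very large.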

Using Spring with Hibernate and c3p0 Connection Pool.

c3p0 is a very nice tool for managing database connections. I had a hard time configuring Apache DBCP/2, so I tried c3p0. There are many config options, and they have to be set carefully so that we do not end up choking our database. Let us understand some of the config options.
  • testConnectionOnCheckin validates a connection when it is returned to the pool.
  • testConnectionOnCheckout validates a connection every time it is handed out, which guarantees a live connection before use but is too expensive to do on every checkout.
  • idleConnectionTestPeriod sets the interval, in seconds, at which idle connections in the pool are tested. Without preferredTestQuery, the default test is DatabaseMetaData.getTables(), which is database agnostic and, although a relatively expensive call, is probably fine for a relatively small database. If you’re paranoid about performance, use a query specific to your database (e.g. preferredTestQuery="SELECT 1").
  • maxIdleTimeExcessConnections sets how many seconds connections in excess of minPoolSize may stay idle. After a spike in activity, it brings the connection count back down to minPoolSize: the extra connections are removed from the pool and closed.
  • numHelperThreads sets the number of helper threads c3p0 spawns to carry out slow pool-maintenance operations in the background.

My spring configuration goes as –

<bean id="dataSource"  class="com.mchange.v2.c3p0.ComboPooledDataSource">
    <property name="driverClass">
        <value>com.mysql.jdbc.Driver</value>
    </property>
    <property name="jdbcUrl">
        <value>jdbc:mysql://localhost:3306/api</value>
    </property>
    <property name="user">
        <value>root</value>
    </property>
    <property name="password">
        <value></value>
    </property>
    <property name="idleConnectionTestPeriod">
        <value>300</value>
    </property>
    <property name="maxIdleTimeExcessConnections" value="180"/>
    <property name="maxPoolSize">
        <value>100</value>
    </property>
    <property name="acquireIncrement">
        <value>1</value>
    </property>
    <property name="maxStatements">
        <value>0</value>
    </property>
    <property name="minPoolSize">
        <value>10</value>
    </property>
    <property name="unreturnedConnectionTimeout">
        <value>3600</value>
    </property>
    <property name="preferredTestQuery">
        <value>SELECT 1</value>
    </property>
    <property name="initialPoolSize">
        <value>10</value>
    </property>
</bean>
Things to keep in mind:
  • To get an idea, check the number of connections the app holds open against the database. For MySQL try: SHOW STATUS WHERE variable_name = 'Threads_connected';
  • When using Hibernate, take care to open and close sessions properly. If sessions are not closed, their connections are never freed, and eventually this will choke the database.
Further reading: How to configure c3p0

Jackson JSON parser – Writing Custom Deserializer

One major difference between Jackson and Gson is how they map JSON onto a class object. By default Jackson checks strictly: if the JSON contains a property that the target class does not declare, deserialization fails, so the object the JSON was created from and the class it is mapped to have to be in sync. Gson is more lenient.

Consider the following example –

class Foo {
    private String value;
}
class Bar {
    private String value;
}

and

String json = "{\"value\" : \"whatever\"}";
new Gson().fromJson(json, Foo.class);
new Gson().fromJson(json, Bar.class);

Gson is set up to make a best effort to deserialize the given JSON into an instance of the given class. It maps as many fields as it finds. If none match, no mapping is done and the fields are simply left at their default values.

Coming to the custom deserializer: we may need to write one in a few cases, for example to parse dates in the JSON string that use our own custom format. To write a custom deserializer, we create a class that extends JsonDeserializer<T> and implement its deserialize(…) method, which contains our custom conversion code.

public class CustomJsonDateDeserializer extends JsonDeserializer<Date> {
    @Override
    public Date deserialize(JsonParser jsonParser, DeserializationContext context)
            throws IOException, JsonProcessingException {
        // SimpleDateFormat is not thread-safe, so a new instance is created per call.
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm a z");
        String date = jsonParser.getText();
        try {
            return format.parse(date);
        } catch (ParseException e) {
            throw new RuntimeException(e);
        }
    }
}

Now annotate the field in the POJO which has to be deserialized using the custom deserializer.

@JsonDeserialize(using = CustomJsonDateDeserializer.class) private Date createdAt;

That’s it. Pretty simple thing.

RestEasy 3.0

We were using RESTEasy 2.0 in one of our earlier projects. RESTEasy is a JBoss project that provides various frameworks to help you build RESTful Web Services and RESTful Java applications. The 2.0 version was quite simple to use. I decided to use version 3.0 in one of my recent projects, and the migration was not that tough. A lot of changes have been made in the newer version. I am going to mention the ones I found.

RESTEasy Client 

In version 2.0

        String url = "http://resetservice.com/api/testService";
        ClientRequest request = new ClientRequest(url);
        request.body("application/json", json);
        ClientResponse<?> response = request.post();
        int status = response.getStatus();
        if (status == 200) {
            String data = (String) response.getEntity(String.class);
            Boolean value = new Gson().fromJson(data, Boolean.class);
            return value;
        }
        return false;

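The same thing in 3.0 now changes to –

     // RESTEasy 3.0 follows the JAX-RS 2.0 client API: requests are built from the client,
     // e.g. client.target(url).request().post(Entity.json(json)).
     // getResponseString(...) and clientUtil.getObjectFromJson(...) are not RESTEasy calls;
     // they are application-side helpers (not shown here).
     ResteasyClient client = new ResteasyClientBuilder().build();
     String result = getResponseString("http://resetservice.com/api/testService");
     Boolean returnVal = clientUtil.getObjectFromJson(Boolean.class, result);
     return returnVal;

Authentication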

I also had to use an in-house authentication technique, and I used it in a very basic way: I check a particular request header (authorizationString) to authenticate the request. If that header is missing or does not match the configured token, the request gets aborted. Below is the code snippet for the same.

@Provider
@ServerInterceptor
public class AuthorizationRequestFilter implements ContainerRequestFilter {
    @Override
    public void filter(ContainerRequestContext requestContext) throws IOException {
        final SecurityContext securityContext = requestContext.getSecurityContext();
        String header = requestContext.getHeaderString("authorizationString");
        try {
            // Reject the request when the header is missing, empty, or does not match the token.
            // Strings are compared with isEmpty()/equals(); == only compares references.
            if (securityContext == null || header == null || header.isEmpty()
                    || !header.equals(ConfigReader.getInstance().getAuthToken())) {
                requestContext.abortWith(
                        Response.status(Response.Status.UNAUTHORIZED)
                                .entity("User cannot access the resource.").build());
            }
        } catch (ConfigurationException e) {
            // Fail closed: if the token cannot be read, do not let the request through.
            requestContext.abortWith(Response.status(Response.Status.INTERNAL_SERVER_ERROR).build());
        }
    }
}