[296] Concurrent LinkedHashSet
Author: Dr Heinz M. Kabutz | Date: 2021-12-31 | Category: Concurrency | Java Version: 17
Abstract:
LinkedHashSet is a set that also maintains insertion order. To make it thread-safe, we can wrap it with Collections.synchronizedSet(). However, this is not a good option, because iteration would still be fail-fast. And what's the point of a thread-safe LinkedHashSet that we cannot iterate over? In this newsletter we try to create a concurrent set that behaves like a LinkedHashSet, but with minimal locking and with weakly-consistent iteration.
Welcome to the 296th edition of The Java(tm) Specialists' Newsletter, and warm greetings from a wet and cold Island of Crete. Our summer tourists would be quite amazed at how drenched we get. Which other place has had its bad weather inscribed into the Holy Bible? (Saint Paul got caught in a hurricane while trying to navigate the South of Crete and after a shipwreck ended up on Malta.) Oh, and we have earthquakes galore. One of my friends sent me this email a few days ago: "Earthquakes happen very often now... Is it safe to stay there?" Good question indeed. No idea. But it is a nice place to live when it ain't rockin' and rollin', or raining cats and dogs.
And don't forget - today is the last day of 2021 and your last chance to spend 2021 training budget. For example, by grabbing our Java Specialists Superpack :-)
javaspecialists.teachable.com: Please visit our new self-study course catalog to see how you can upskill your Java knowledge.
Concurrent LinkedHashSet
What's wrong with this code?
    private final Set<Connection> connections =
        Collections.synchronizedSet(new LinkedHashSet<>());
During a recent talk, a friend showed a class with such a field. Someone complained. This would "encourage programmers to use Collections.synchronizedSet() in production code. A concurrent collection would be preferable."
Since I have a particular interest in concurrency, I jumped to my friend's defense. It is difficult writing slides. Not every snippet of code we show is an example of perfect production code. Besides, a synchronized collection is not always slower than a ConcurrentHashMap. After all, the ConcurrentHashMap itself maintains its invariants with synchronized.
I do not know whether the synchronized wrapped LinkedHashSet would be slow or not. Uncontended synchronization is fast. Since we are doing networking, I would expect that to be far more costly than a little lock. A bigger concern to me is that the LinkedHashSet iteration is fail-fast. The synchronized wrapper would not protect us against a ConcurrentModificationException. We could lock the entire set during iteration, but iteration costs O(n), so holding the lock for that long might not be such a good idea. In a threaded environment, weakly-consistent iteration is preferable.
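To make the fail-fast problem concrete, here is a minimal sketch using only java.util classes. The class and method names (FailFastIterationSketch, failsFast, countWhileLocked) are made up for this illustration. The first method modifies the wrapped set after an iterator has been created, the second shows the workaround of holding the wrapper's lock for the whole traversal.

```java
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical demo class, not part of the newsletter's code
class FailFastIterationSketch {
    // Returns true if modifying the set after creating the iterator
    // makes the next call to next() throw a ConcurrentModificationException.
    static boolean failsFast() {
        Set<String> set = Collections.synchronizedSet(new LinkedHashSet<>());
        set.add("hello");
        set.add("world");
        Iterator<String> it = set.iterator(); // the iterator is not synchronized
        set.add("Goodbye");                   // structural modification
        try {
            it.next();
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Workaround: hold the wrapper's own lock for the entire O(n) traversal.
    // All other threads that touch the set are blocked until we are done.
    static int countWhileLocked(Set<String> syncSet) {
        int count = 0;
        synchronized (syncSet) {
            for (String ignored : syncSet) {
                count++;
            }
        }
        return count;
    }

    public static void main(String... args) {
        System.out.println("fails fast: " + failsFast()); // fails fast: true
        Set<String> set = Collections.synchronizedSet(new LinkedHashSet<>());
        set.add("a");
        set.add("b");
        System.out.println("count: " + countWhileLocked(set)); // count: 2
    }
}
```

The locked traversal is safe, but every add() and remove() from other threads has to wait until it finishes, which is the cost the paragraph above worries about.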
I began to wonder how we could create a concurrent form of LinkedHashSet. Instead of TreeSet, we can use the concurrent, thread-safe ConcurrentSkipListSet. Similarly, instead of HashSet, we could use ConcurrentHashMap.newKeySet(). But LinkedHashSet did not seem to have an obvious concurrent alternative.
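As a rough illustration of the two existing alternatives, the sketch below adds an element after the iterator has been created and then drains the iterator. The class and method names are invented for this example. Neither set throws a ConcurrentModificationException, but neither keeps insertion order: the skip list sorts its elements and the hash-based key set has no defined order. Whether the late element shows up is left open by weakly-consistent iteration, so the code makes no promise about it.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;

// Hypothetical demo class, not part of the newsletter's code
class ConcurrentSetAlternativesSketch {
    // Adds an element after the iterator was created, then drains the iterator.
    static List<String> drainWhileModifying(Set<String> set) {
        set.add("hello");
        set.add("world");
        Iterator<String> it = set.iterator();
        set.add("Goodbye"); // allowed: the iterator is weakly consistent
        List<String> seen = new ArrayList<>();
        while (it.hasNext()) {
            seen.add(it.next());
        }
        return seen;
    }

    public static void main(String... args) {
        // Concurrent stand-in for TreeSet: sorted, not insertion-ordered
        System.out.println(drainWhileModifying(new ConcurrentSkipListSet<>()));
        // Concurrent stand-in for HashSet: no defined order at all
        System.out.println(drainWhileModifying(ConcurrentHashMap.newKeySet()));
    }
}
```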
Here is my attempt, combining a ConcurrentHashMap with a ConcurrentSkipListSet. Our class offers a reduced interface of Set, thus only:
add(e)
remove(e)
contains(e)
stream()
clear()
iterator()
toString()
The ConcurrentSkipListSet maintains the insertion order. The ConcurrentHashMap ensures that elements are distinct. We store the element and insertion order inside the OrderedHolder record. These are then stored inside the ConcurrentSkipListSet, sorted by the insertion order. We store our element as a key inside the ConcurrentHashMap. The values are the same OrderedHolders that are inside the ConcurrentSkipListSet.
When we add an element, we call the computeIfAbsent() method on our map. If the element does not exist in the map yet, we create our OrderedHolder and add it to the set. When we want to remove it again, we use computeIfPresent() to also remove it from the set. Both of these compute methods are performed atomically on a ConcurrentHashMap.
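The atomicity of the two compute methods can be checked with a small experiment. This is only a sketch with invented names (AtomicComputeSketch, mappingFunctionCalls, removeViaCompute). Several threads race to insert the same key and we count how often the mapping function runs. The ConcurrentHashMap Javadoc promises at most once per key. The second method shows that returning null from computeIfPresent() removes the entry.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the newsletter's code
class AtomicComputeSketch {
    // All threads try to insert the same key; returns how many times
    // the mapping function was actually applied.
    static int mappingFunctionCalls(int threads) {
        Map<String, Long> map = new ConcurrentHashMap<>();
        AtomicInteger calls = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.submit(() -> {
                map.computeIfAbsent("key", k -> {
                    calls.incrementAndGet(); // side effect runs under the bin lock
                    return System.nanoTime();
                });
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return calls.get();
    }

    // Returning null from the remapping function removes the entry.
    static boolean removeViaCompute(Map<String, Long> map, String key) {
        var removed = new AtomicBoolean(false);
        map.computeIfPresent(key, (k, v) -> {
            removed.set(true);
            return null;
        });
        return removed.get();
    }

    public static void main(String... args) {
        System.out.println(mappingFunctionCalls(8)); // 1
        Map<String, Long> map = new ConcurrentHashMap<>(Map.of("key", 42L));
        System.out.println(removeViaCompute(map, "key")); // true
        System.out.println(removeViaCompute(map, "key")); // false
    }
}
```

Because the side effect inside the lambda runs while the map holds the lock for that key, a second collection can be updated in the same step, provided the lambda stays short and does not touch other keys of the same map.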
Here is the class. Please shout if you can think of a better approach for a thread-safe concurrent LinkedHashSet. I have not done extensive testing on this class. Neither have I benchmarked the performance. I thus have no idea whether it is faster or slower than the original synchronized wrapped LinkedHashSet. I'm not even sure that the code is correct. Please do not use it in production without extensive testing. And when (not if) you find glaring errors, please let me know so that I can update this newsletter :-)
    import java.util.*;
    import java.util.concurrent.*;
    import java.util.concurrent.atomic.*;
    import java.util.stream.*;

    public class ConcurrentLinkedReducedHashSet<E>
        implements Iterable<E> {
      private final Map<E, OrderedHolder<E>> identityHolder =
          new ConcurrentHashMap<>();

      private record OrderedHolder<T>(T value, long order) {
        private static final AtomicLong nextOrder =
            new AtomicLong();

        private OrderedHolder(T value) {
          this(value, nextOrder.incrementAndGet());
        }
      }

      private final Set<OrderedHolder<E>> order =
          new ConcurrentSkipListSet<>(
              Comparator.comparingLong(OrderedHolder::order));

      public boolean add(E e) {
        var added = new AtomicBoolean(false);
        identityHolder.computeIfAbsent(
            e, key -> {
              var holder = new OrderedHolder<>(e);
              order.add(holder);
              added.set(true);
              return holder;
            }
        );
        return added.get();
      }

      public boolean remove(E e) {
        var removed = new AtomicBoolean(false);
        identityHolder.computeIfPresent(e, (key, holder) -> {
          order.remove(holder);
          removed.set(true);
          return null; // will remove the entry
        });
        return removed.get();
      }

      public boolean contains(E e) {
        return identityHolder.containsKey(e);
      }

      public Stream<E> stream() {
        return order.stream().map(OrderedHolder::value);
      }

      public void clear() {
        // slow, but ensures we remove all entries in both collections
        stream().forEach(this::remove);
      }

      @Override
      public Iterator<E> iterator() {
        return stream().iterator();
      }

      @Override
      public String toString() {
        return stream()
            .map(String::valueOf)
            .collect(Collectors.joining(", ", "[", "]"));
      }
    }
In this demo, we iterate in insertion order without a ConcurrentModificationException:
    import java.util.*;
    import java.util.concurrent.*;

    public class WeaklyConsistentOrderedDemo {
      public static void main(String... args) {
        // var set = Collections.synchronizedSet(new LinkedHashSet<String>()); // CME
        // var set = ConcurrentHashMap.<String>newKeySet(); // works, wrong order
        var set = new ConcurrentLinkedReducedHashSet<String>(); // Perfect (maybe)
        set.add("hello");
        set.add("world");
        Iterator<String> it = set.iterator();
        set.add("Goodbye");
        while (it.hasNext()) {
          String next = it.next();
          System.out.println(next);
        }
      }
    }
Output from our reduced set is:
    hello
    world
    Goodbye
In the next demo, we remove "hello" after having iterated past it, and add it again. We would expect "hello" to thus show up at the end, and it does indeed.
    import java.util.*;

    public class WeaklyConsistentOrderedDemoComplex {
      public static void main(String... args) {
        var set = new ConcurrentLinkedReducedHashSet<String>();
        set.add("hello");
        set.add("world");
        set.add("Goodbye");
        Iterator<String> it = set.iterator();
        System.out.println(it.next()); // hello
        System.out.println(it.next()); // world
        set.remove("hello");
        set.add("hello");
        System.out.println(it.next()); // Goodbye
        System.out.println(it.next()); // hello
        System.out.println(it.hasNext()); // false
      }
    }
The output is:
    hello
    world
    Goodbye
    hello
    false
So far, so good. Next we have a demo in which 16 threads each remove and add 100,000 random numbers between 0 and 9, thus 1.6 million in total. There should be no duplicates at the end of the run, if everything is working.
    import java.util.concurrent.*;

    public class ConcurrentUpdatesDemo {
      public static void main(String... args)
          throws InterruptedException {
        var set = new ConcurrentLinkedReducedHashSet<Integer>();
        ExecutorService pool = Executors.newFixedThreadPool(16);
        for (int i = 0; i < 16; i++) {
          pool.submit(() -> {
            ThreadLocalRandom random = ThreadLocalRandom.current();
            for (int j = 0; j < 100_000; j++) {
              int value = random.nextInt(0, 10);
              set.remove(value);
              set.add(value);
            }
          });
        }
        pool.shutdown();
        while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
          System.out.println("Waiting for pool to shut down");
        }
        System.out.println("set = " + set);
      }
    }
That also seems to work:
set = [7, 6, 1, 3, 5, 2, 8, 9, 0, 4]
At the outset, I said that this would be a reduced set that only has the methods that we are going to use. But what if we need a java.util.Set? We could throw UnsupportedOperationException for all the methods, besides those we implemented.
This is surprisingly easy to do if you've read my book on dynamic proxies :-) [Or grab my course on dynamic proxies in Java.] We first create a Set that throws UnsupportedOperationException for all methods:
    Set<String> angrySet = Proxies.castProxy(
        Set.class,
        (p, m, a) -> {
          throw new UnsupportedOperationException(
              m.getName() + "() not implemented");
        }
    );
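For readers who do not have the dynamic proxies library on the classpath, roughly the same "angry" Set can be sketched with the standard java.lang.reflect.Proxy. This is an assumption about equivalent behavior, not a description of how Proxies.castProxy() works internally, and the class name AngrySetSketch is made up for this illustration.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Set;

// Hypothetical demo class using only JDK classes
class AngrySetSketch {
    @SuppressWarnings("unchecked")
    static <E> Set<E> angrySet() {
        // Every call is routed here, including toString(), hashCode()
        // and equals(), so even printing this proxy throws.
        InvocationHandler handler = (proxy, method, args) -> {
            throw new UnsupportedOperationException(
                method.getName() + "() not implemented");
        };
        return (Set<E>) Proxy.newProxyInstance(
            AngrySetSketch.class.getClassLoader(),
            new Class<?>[] {Set.class},
            handler);
    }

    public static void main(String... args) {
        Set<String> angry = angrySet();
        try {
            angry.add("hello");
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage()); // add() not implemented
        }
    }
}
```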
Next we wrap that inside a dynamic object adapter, with our ConcurrentLinkedReducedHashSet as the adapter. The dynamic object adapter will give precedence to our methods. It will thus throw an UnsupportedOperationException whenever we try to call any other method of the Set. The resulting type will be a Set and we could add more methods as needed.
    Set<String> set = Proxies.adapt(
        Set.class, // target interface
        angrySet, // adaptee
        new ConcurrentLinkedReducedHashSet<>() // adapter
    );
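The idea behind the adapter can also be approximated with plain JDK reflection. The sketch below is an assumption about the general technique, not the implementation of Proxies.adapt(): it looks for a public method with the same name and parameter types on the adapter and throws UnsupportedOperationException if there is none. JdkAdapterSketch and TinyBag are hypothetical names, and TinyBag is a small, non-thread-safe stand-in for ConcurrentLinkedReducedHashSet to keep the example self-contained.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Hypothetical demo class using only JDK classes
class JdkAdapterSketch {
    // Stand-in for the reduced set: only add(), iterator(), toString()
    public static class TinyBag<E> implements Iterable<E> {
        private final List<E> items = new ArrayList<>();

        public boolean add(E e) {
            return !items.contains(e) && items.add(e);
        }

        @Override
        public Iterator<E> iterator() {
            return items.iterator();
        }

        @Override
        public String toString() {
            return items.toString();
        }
    }

    @SuppressWarnings("unchecked")
    static <T> T adapt(Class<T> target, Object adapter) {
        InvocationHandler handler = (proxy, method, args) -> {
            Method match;
            try {
                // Look for a public method with a matching signature
                match = adapter.getClass().getMethod(
                    method.getName(), method.getParameterTypes());
            } catch (NoSuchMethodException e) {
                throw new UnsupportedOperationException(
                    method.getName() + "() not implemented");
            }
            try {
                return match.invoke(adapter, args);
            } catch (InvocationTargetException e) {
                throw e.getCause(); // rethrow what the adapter threw
            }
        };
        return (T) Proxy.newProxyInstance(
            JdkAdapterSketch.class.getClassLoader(),
            new Class<?>[] {target},
            handler);
    }

    public static void main(String... args) {
        @SuppressWarnings("unchecked")
        Set<String> set = adapt(Set.class, new TinyBag<String>());
        set.add("hello");
        set.add("world");
        System.out.println(set); // [hello, world]
        try {
            set.size();
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage()); // size() not implemented
        }
    }
}
```

A reflective lookup on every call is slow. A real adapter would cache the Method objects, which is one of the things a dedicated library can take care of.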
A complete demo would look like this:
    // Maven: eu.javaspecialists.books.dynamicproxies:core:2.0.0
    import eu.javaspecialists.books.dynamicproxies.*;

    import java.util.*;

    public class DynamicProxiesDemo {
      public static void main(String... args) {
        Set<String> angrySet = Proxies.castProxy(
            Set.class,
            (p, m, a) -> {
              throw new UnsupportedOperationException(
                  m.getName() + "() not implemented");
            }
        );
        Set<String> set = Proxies.adapt(
            Set.class, // target interface
            angrySet, // adaptee
            new ConcurrentLinkedReducedHashSet<>() // adapter
        );
        set.add("hello");
        set.add("world");
        Iterator<String> it = set.iterator();
        set.add("Goodbye");
        while (it.hasNext()) {
          String next = it.next();
          System.out.println(next);
        }
        set.clear();
        set.addAll(List.of("one")); // UnsupportedOperationException
      }
    }
Again, no guarantees about how fast this will be, nor whether it works at all. I would be delighted to see a better solution, using the standard JDK classes.
Kind regards
Heinz